This article collects typical usage examples of the Java class com.twitter.util.Function. If you are wondering how the Function class is used in practice, the selected examples below may help.
The Function class belongs to the com.twitter.util package. Twenty code examples of the class are shown below, sorted by popularity by default.
Example 1: asyncGetLogRecordCount
import com.twitter.util.Function; // import the required package/class
/**
* Get a count of records between beginDLSN and the end of the stream.
*
* @param beginDLSN dlsn marking the start of the range
* @return the count of records present in the range
*/
public Future<Long> asyncGetLogRecordCount(final DLSN beginDLSN) {
return checkLogStreamExistsAsync().flatMap(new Function<Void, Future<Long>>() {
public Future<Long> apply(Void done) {
return asyncGetFullLedgerList(true, false).flatMap(new Function<List<LogSegmentMetadata>, Future<Long>>() {
public Future<Long> apply(List<LogSegmentMetadata> ledgerList) {
List<Future<Long>> futureCounts = new ArrayList<Future<Long>>(ledgerList.size());
for (LogSegmentMetadata ledger : ledgerList) {
if (ledger.getLogSegmentSequenceNumber() >= beginDLSN.getLogSegmentSequenceNo()) {
futureCounts.add(asyncGetLogRecordCount(ledger, beginDLSN));
}
}
return Future.collect(futureCounts).map(new Function<List<Long>, Long>() {
public Long apply(List<Long> counts) {
return sum(counts);
}
});
}
});
}
});
}
Developer ID: twitter, project: distributedlog, lines of code: 31, source: BKLogHandler.java
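The fan-out in Example 1 (one future per log segment, then Future.collect followed by a summing map) can be sketched with the JDK alone. This is a minimal sketch, not the DistributedLog implementation: CompletableFuture stands in for com.twitter.util.Future, and the names CollectAndSumSketch and sumAll are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in: CompletableFuture plays the role of com.twitter.util.Future.
final class CollectAndSumSketch {
    /** Waits for every per-segment count, then adds them up. */
    static CompletableFuture<Long> sumAll(List<CompletableFuture<Long>> counts) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(counts.toArray(new CompletableFuture[0]));
        return all.thenApply(done -> {
            long total = 0L;
            for (CompletableFuture<Long> count : counts) {
                total += count.join(); // every future is complete here, so join() does not block
            }
            return total;
        });
    }

    public static void main(String[] args) {
        List<CompletableFuture<Long>> counts = new ArrayList<>();
        counts.add(CompletableFuture.completedFuture(3L));
        counts.add(CompletableFuture.completedFuture(4L));
        Long total = sumAll(counts).join();
        System.out.println(total); // prints 7
    }
}
```

As with Future.collect, the combined future fails if any of the per-segment futures fails.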
Example 2: addWatch
import com.twitter.util.Function; // import the required package/class
/**
* Adds a watch on the specified file. The file must exist, otherwise a FileNotFoundException
* is returned. If the file is deleted after a watch is established, the watcher will log errors
* but continue to monitor it, and resume watching if it is recreated.
*
* @param filePath path to the file to watch.
* @param onUpdate function to call when a change is detected to the file. The entire contents
* of the file will be passed in to the function. Note that onUpdate will be
* called once before this call completes, which facilitates the initial load of data.
* This callback is executed synchronously on the watcher thread - it is
* important that the function be non-blocking.
*/
public synchronized void addWatch(String filePath, Function<byte[], Void> onUpdate)
throws IOException {
MorePreconditions.checkNotBlank(filePath);
Preconditions.checkNotNull(onUpdate);
// Read the file and make the initial onUpdate call.
File file = new File(filePath);
ByteSource byteSource = Files.asByteSource(file);
onUpdate.apply(byteSource.read());
// Add the file to our map if it isn't already there, and register the new change watcher.
ConfigFileInfo configFileInfo = watchedFileMap.get(filePath);
if (configFileInfo == null) {
configFileInfo = new ConfigFileInfo(file.lastModified(), byteSource.hash(HASH_FUNCTION));
watchedFileMap.put(filePath, configFileInfo);
}
configFileInfo.changeWatchers.add(onUpdate);
}
Developer ID: pinterest-attic, project: pinlater, lines of code: 31, source: ConfigFileWatcher.java
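The contract in Example 2's Javadoc (call onUpdate once with the current contents, then keep the callback for later changes) can be sketched with the JDK alone. This is a simplified illustration under stated assumptions, not the PinLater watcher: java.util.function.Consumer stands in for Function<byte[], Void>, contents are passed in rather than read from disk, and WatchRegistrySketch, the currentContents parameter, and notifyChanged are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: Consumer<byte[]> replaces com.twitter.util.Function<byte[], Void>.
final class WatchRegistrySketch {
    private final Map<String, List<Consumer<byte[]>>> watchers = new HashMap<>();

    /** Delivers the current contents once, then remembers the callback for later changes. */
    synchronized void addWatch(String key, byte[] currentContents, Consumer<byte[]> onUpdate) {
        onUpdate.accept(currentContents); // initial load happens before this method returns
        watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(onUpdate);
    }

    /** Would be called by a (hypothetical) polling thread when new contents are detected. */
    synchronized void notifyChanged(String key, byte[] newContents) {
        for (Consumer<byte[]> watcher : watchers.getOrDefault(key, List.of())) {
            watcher.accept(newContents);
        }
    }

    public static void main(String[] args) {
        WatchRegistrySketch registry = new WatchRegistrySketch();
        registry.addWatch("config", "v1".getBytes(StandardCharsets.UTF_8),
                bytes -> System.out.println(new String(bytes, StandardCharsets.UTF_8)));
        registry.notifyChanged("config", "v2".getBytes(StandardCharsets.UTF_8));
    }
}
```

Because the callbacks run while the registry lock is held, they should return quickly, which mirrors the non-blocking requirement stated in the original Javadoc.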
Example 3: createQueueImpl
import com.twitter.util.Function; // import the required package/class
@Override
protected void createQueueImpl(final String queueName) throws Exception {
// Add the queueName to the queueNames sorted set in each shard.
final double currentTimeSeconds = System.currentTimeMillis() / 1000.0;
for (final ImmutableMap.Entry<String, RedisPools> shard : shardMap.entrySet()) {
final String queueNamesRedisKey = RedisBackendUtils.constructQueueNamesRedisKey(
shard.getKey());
RedisUtils.executeWithConnection(
shard.getValue().getGeneralRedisPool(),
new Function<Jedis, Void>() {
@Override
public Void apply(Jedis conn) {
if (conn.zscore(queueNamesRedisKey, queueName) == null) {
conn.zadd(queueNamesRedisKey, currentTimeSeconds, queueName);
}
return null;
}
});
}
reloadQueueNames();
}
Developer ID: pinterest-attic, project: pinlater, lines of code: 22, source: PinLaterRedisBackend.java
Example 4: cleanUpAllShards
import com.twitter.util.Function; // import the required package/class
/**
* Clean up all the keys in each shard. This method is only for test use.
*/
@VisibleForTesting
public Future<Void> cleanUpAllShards() {
return futurePool.apply(new ExceptionalFunction0<Void>() {
@Override
public Void applyE() throws Throwable {
for (final ImmutableMap.Entry<String, RedisPools> shard : shardMap.entrySet()) {
RedisUtils.executeWithConnection(
shard.getValue().getGeneralRedisPool(),
new Function<Jedis, Void>() {
@Override
public Void apply(Jedis conn) {
conn.flushAll();
return null;
}
});
}
return null;
}
});
}
Developer ID: pinterest-attic, project: pinlater, lines of code: 24, source: PinLaterRedisBackend.java
Example 5: reloadQueueNames
import com.twitter.util.Function; // import the required package/class
/**
* Reload queue names from redis to local cache.
*/
private synchronized void reloadQueueNames() throws Exception {
ImmutableSet.Builder<String> builder = new ImmutableSet.Builder<String>();
if (!shardMap.isEmpty()) {
final Map.Entry<String, RedisPools> randomShard = getRandomShard(true);
if (randomShard == null) {
throw new PinLaterException(ErrorCode.NO_HEALTHY_SHARDS, "Unable to find healthy shard");
}
Set<String> newQueueNames = RedisUtils.executeWithConnection(
randomShard.getValue().getGeneralRedisPool(),
new Function<Jedis, Set<String>>() {
@Override
public Set<String> apply(Jedis conn) {
return RedisBackendUtils.getQueueNames(conn, randomShard.getKey());
}
});
builder.addAll(newQueueNames);
}
queueNames.set(builder.build());
}
Developer ID: pinterest-attic, project: pinlater, lines of code: 23, source: PinLaterRedisBackend.java
Example 6: removeJobHash
import com.twitter.util.Function; // import the required package/class
/**
* Remove the job hash from redis. This function is used in test to simulate the case where the
* job id is still in the queue, while the job hash is evicted by redis LRU.
*/
@VisibleForTesting
public Future<Void> removeJobHash(String jobDescriptor) {
final PinLaterJobDescriptor jobDesc = new PinLaterJobDescriptor(jobDescriptor);
return futurePool.apply(new ExceptionalFunction0<Void>() {
@Override
public Void applyE() throws Throwable {
RedisUtils.executeWithConnection(
shardMap.get(jobDesc.getShardName()).getGeneralRedisPool(),
new Function<Jedis, Void>() {
@Override
public Void apply(Jedis conn) {
String hashRedisKey = RedisBackendUtils.constructHashRedisKey(
jobDesc.getQueueName(), jobDesc.getShardName(), jobDesc.getLocalId());
conn.del(hashRedisKey);
return null;
}
});
return null;
}
});
}
Developer ID: pinterest-attic, project: pinlater, lines of code: 26, source: PinLaterRedisBackend.java
Example 7: clientPing
import com.twitter.util.Function; // import the required package/class
public boolean clientPing(JedisPool jedisPool) {
boolean result = false;
try {
result = RedisUtils.executeWithConnection(
jedisPool,
new Function<Jedis, String>() {
@Override
public String apply(Jedis conn) {
return conn.ping();
}
}
).equals("PONG");
} catch (Exception e) {
// failed ping
}
return result;
}
Developer ID: pinterest-attic, project: pinlater, lines of code: 18, source: JedisClientHelper.java
Example 8: executeWithConnection
import com.twitter.util.Function; // import the required package/class
/**
* Gets the connection from the connection pool and adds the wrapper catch/finally block for the
* given function.
*
* This helper method saves the trouble of dealing with the Redis connection. If a
* JedisConnectionException is thrown, the connection is discarded. Otherwise, the connection
* is returned to the connection pool.
*
* @param jedisPool Jedis connection pool
* @param redisDBNum Redis DB number (index) (if redisDBNum == -1, don't select a DB )
* @param func The function to execute inside the catch/finally block.
* @return A Resp object, which is the return value of wrapped function.
*/
public static <Resp> Resp executeWithConnection(JedisPool jedisPool,
int redisDBNum,
Function<Jedis, Resp> func) {
Preconditions.checkNotNull(jedisPool);
Preconditions.checkNotNull(func);
Jedis conn = null;
boolean gotJedisConnException = false;
try {
conn = jedisPool.getResource();
selectRedisDB(conn, redisDBNum);
return func.apply(conn);
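} catch (JedisConnectionException e) {
// getResource() itself may have thrown, in which case there is no connection to discard.
if (conn != null) {
jedisPool.returnBrokenResource(conn);
}
gotJedisConnException = true;
throw e;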
} finally {
if (conn != null && !gotJedisConnException) {
jedisPool.returnResource(conn);
}
}
}
Developer ID: pinterest-attic, project: pinlater, lines of code: 35, source: RedisUtils.java
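The borrow, apply, and return-or-discard flow of Example 8 can be sketched without Jedis. This is an illustration under stated assumptions, not the Jedis API: the Pool class, its String "connections", and BrokenConnectionException are hypothetical stand-ins for JedisPool, Jedis, and JedisConnectionException, and java.util.function.Function replaces com.twitter.util.Function.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;

final class BorrowReturnSketch {
    /** Hypothetical stand-in for JedisConnectionException. */
    static final class BrokenConnectionException extends RuntimeException {
        BrokenConnectionException(String message) { super(message); }
    }

    /** Hypothetical minimal pool whose "connections" are plain strings. */
    static final class Pool {
        final Deque<String> idle = new ArrayDeque<>();
        int discarded = 0;

        String borrow() { return idle.pop(); }
        void giveBack(String conn) { idle.push(conn); }
        void discard(String conn) { discarded++; }
    }

    /** Borrows a connection, applies func, and returns or discards the connection. */
    static <R> R executeWithConnection(Pool pool, Function<String, R> func) {
        String conn = null;
        boolean broken = false;
        try {
            conn = pool.borrow();
            return func.apply(conn);
        } catch (BrokenConnectionException e) {
            broken = true;
            if (conn != null) {
                pool.discard(conn); // a broken connection must not go back to the pool
            }
            throw e;
        } finally {
            if (conn != null && !broken) {
                pool.giveBack(conn); // healthy connection: reuse it
            }
        }
    }

    public static void main(String[] args) {
        Pool pool = new Pool();
        pool.giveBack("conn-1");
        Integer length = executeWithConnection(pool, String::length);
        System.out.println(length); // prints 6
    }
}
```

Keeping the pool bookkeeping in one helper means each caller only supplies the function to run, as Examples 3 through 7 do.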
Example 9: executePartitioned
import com.twitter.util.Function; // import the required package/class
/**
* Executes a batch of requests asynchronously in a partitioned manner,
* with the specified parallelism.
*
* @param requests List of requests to execute.
* @param parallelism Desired parallelism (must be > 0).
* @param executeBatch Function to execute each partitioned batch of requests.
* @param <Req> Request type.
* @param <Resp> Response type.
* @return List of response futures.
*/
public static <Req, Resp> List<Future<Resp>> executePartitioned(
List<Req> requests,
int parallelism,
Function<List<Req>, Future<Resp>> executeBatch) {
MorePreconditions.checkNotBlank(requests);
Preconditions.checkArgument(parallelism > 0);
Preconditions.checkNotNull(executeBatch);
int sizePerPartition = Math.max(requests.size() / parallelism, 1);
List<List<Req>> partitions = Lists.partition(requests, sizePerPartition);
List<Future<Resp>> futures = Lists.newArrayListWithCapacity(partitions.size());
for (final List<Req> request : partitions) {
futures.add(executeBatch.apply(request));
}
return futures;
}
Developer ID: pinterest-attic, project: pinlater, lines of code: 28, source: PinLaterBackendUtils.java
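The partition arithmetic in Example 9 is worth a worked example. Because the partition size is computed with integer division, max(size / parallelism, 1), the number of batches can exceed the requested parallelism: 10 requests with parallelism 3 give a size of 3 and therefore 4 batches. The sketch below reproduces only that arithmetic with the JDK; PartitionSketch and partition are hypothetical names, and subList replaces Guava's Lists.partition.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionSketch {
    /** Splits items into consecutive chunks of max(size / parallelism, 1) elements. */
    static <T> List<List<T>> partition(List<T> items, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be > 0");
        }
        int sizePerPartition = Math.max(items.size() / parallelism, 1);
        List<List<T>> partitions = new ArrayList<>();
        for (int start = 0; start < items.size(); start += sizePerPartition) {
            int end = Math.min(start + sizePerPartition, items.size());
            partitions.add(new ArrayList<>(items.subList(start, end)));
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Integer> requests = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        List<List<Integer>> batches = partition(requests, 3);
        System.out.println(batches); // prints [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
    }
}
```

If a strict upper bound on the number of batches is needed, one option is to round the partition size up instead of down; that would be a change in behavior relative to the original code.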
Example 10: dequeueJobs
import com.twitter.util.Function; // import the required package/class
@Override
public Future<PinLaterDequeueResponse> dequeueJobs(
RequestContext context, final PinLaterDequeueRequest request) {
if (!queueConfig.allowDequeue(request.getQueueName(), request.getLimit())) {
Stats.incr(request.getQueueName() + "_dequeue_requests_rate_limited");
return Future.exception(new PinLaterException(ErrorCode.DEQUEUE_RATE_LIMITED,
"Dequeue rate limit exceeded for queue: " + request.getQueueName()));
}
return Stats.timeFutureMillis(
"PinLaterService.dequeueJobs",
backend.dequeueJobs(context.getSource(), request).onSuccess(
new Function<PinLaterDequeueResponse, BoxedUnit>() {
@Override
public BoxedUnit apply(PinLaterDequeueResponse response) {
Stats.incr(request.getQueueName() + "_dequeue", response.getJobsSize());
return BoxedUnit.UNIT;
}
}).rescue(new LogAndWrapException<PinLaterDequeueResponse>(
context, "dequeueJobs", request.toString())));
}
Developer ID: pinterest-attic, project: pinlater, lines of code: 22, source: PinLaterServiceImpl.java
Example 11: ackDequeuedJobs
import com.twitter.util.Function; // import the required package/class
@Override
public Future<Void> ackDequeuedJobs(RequestContext context, final PinLaterJobAckRequest request) {
return Stats.timeFutureMillis(
"PinLaterService.ackDequeuedJobs",
backend.ackDequeuedJobs(request).onSuccess(
new Function<Void, BoxedUnit>() {
@Override
public BoxedUnit apply(Void aVoid) {
Stats.incr(request.getQueueName() + "_ack_succeeded",
request.getJobsSucceededSize());
Stats.incr(request.getQueueName() + "_ack_failed", request.getJobsFailedSize());
return BoxedUnit.UNIT;
}
}).rescue(new LogAndWrapException<Void>(context, "ackDequeuedJobs",
request.toString())));
}
Developer ID: pinterest-attic, project: pinlater, lines of code: 17, source: PinLaterServiceImpl.java
Example 12: setAcceptNewStream
import com.twitter.util.Function; // import the required package/class
@Override
public Future<Void> setAcceptNewStream(boolean enabled) {
Map<SocketAddress, ProxyClient> snapshot = clientManager.getAllClients();
List<Future<Void>> futures = new ArrayList<Future<Void>>(snapshot.size());
for (Map.Entry<SocketAddress, ProxyClient> entry : snapshot.entrySet()) {
futures.add(entry.getValue().getService().setAcceptNewStream(enabled));
}
return Future.collect(futures).map(new Function<List<Void>, Void>() {
@Override
public Void apply(List<Void> list) {
return null;
}
});
}
Developer ID: twitter, project: distributedlog, lines of code: 15, source: DistributedLogClientImpl.java
Example 13: purgeLogSegmentsOlderThanTimestamp
import com.twitter.util.Function; // import the required package/class
Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTimestamp(final long minTimestampToKeep) {
if (minTimestampToKeep >= Utils.nowInMillis()) {
return Future.exception(new IllegalArgumentException(
"Invalid timestamp " + minTimestampToKeep + " to purge logs for " + getFullyQualifiedName()));
}
return asyncGetFullLedgerList(false, false).flatMap(
new Function<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
@Override
public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
List<LogSegmentMetadata> purgeList = new ArrayList<LogSegmentMetadata>(logSegments.size());
int numCandidates = getNumCandidateLogSegmentsToTruncate(logSegments);
for (int iterator = 0; iterator < numCandidates; iterator++) {
LogSegmentMetadata l = logSegments.get(iterator);
// When application explicitly truncates segments; timestamp based purge is
// only used to cleanup log segments that have been marked for truncation
if ((l.isTruncated() || !conf.getExplicitTruncationByApplication()) &&
!l.isInProgress() && (l.getCompletionTime() < minTimestampToKeep)) {
purgeList.add(l);
} else {
// stop truncating log segments if we find either an inprogress or a partially
// truncated log segment
break;
}
}
LOG.info("Deleting log segments older than {} for {} : {}",
new Object[] { minTimestampToKeep, getFullyQualifiedName(), purgeList });
return deleteLogSegments(purgeList);
}
});
}
Developer ID: twitter, project: distributedlog, lines of code: 33, source: BKLogWriteHandler.java
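The selection loop in Example 13 keeps only the leading run of purgeable segments and stops at the first segment that must be kept, so an older segment that appears after a kept one is never purged. A minimal JDK-only sketch of that rule follows; PrefixPurgeSketch and leadingRun are hypothetical names, plain completion timestamps stand in for LogSegmentMetadata, and the candidate limit (getNumCandidateLogSegmentsToTruncate) is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class PrefixPurgeSketch {
    /** Returns the longest leading run of items that satisfy the predicate. */
    static <T> List<T> leadingRun(List<T> ordered, Predicate<T> purgeable) {
        List<T> purgeList = new ArrayList<>();
        for (T item : ordered) {
            if (!purgeable.test(item)) {
                break; // never skip over an item that must be kept
            }
            purgeList.add(item);
        }
        return purgeList;
    }

    public static void main(String[] args) {
        long minTimestampToKeep = 30L;
        List<Long> completionTimes = List.of(10L, 20L, 50L, 15L);
        List<Long> purgeList = leadingRun(completionTimes, t -> t < minTimestampToKeep);
        System.out.println(purgeList); // prints [10, 20]
    }
}
```

The value 15 is older than the cutoff but is not selected, because the segment with completion time 50 ends the run first.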
Example 14: deleteLogSegments
import com.twitter.util.Function; // import the required package/class
private Future<List<LogSegmentMetadata>> deleteLogSegments(
final List<LogSegmentMetadata> logs) {
if (LOG.isTraceEnabled()) {
LOG.trace("Purging logs for {} : {}", getFullyQualifiedName(), logs);
}
return FutureUtils.processList(logs,
new Function<LogSegmentMetadata, Future<LogSegmentMetadata>>() {
@Override
public Future<LogSegmentMetadata> apply(LogSegmentMetadata segment) {
return deleteLogSegment(segment);
}
}, scheduler);
}
Developer ID: twitter, project: distributedlog, lines of code: 14, source: BKLogWriteHandler.java
Example 15: ensureReadLockPathExist
import com.twitter.util.Function; // import the required package/class
private Future<Void> ensureReadLockPathExist() {
final Promise<Void> promise = new Promise<Void>();
promise.setInterruptHandler(new com.twitter.util.Function<Throwable, BoxedUnit>() {
@Override
public BoxedUnit apply(Throwable t) {
FutureUtils.setException(promise, new LockCancelledException(readLockPath, "Could not ensure read lock path", t));
return BoxedUnit.UNIT;
}
});
Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath());
Utils.zkAsyncCreateFullPathOptimisticRecursive(zooKeeperClient, readLockPath, parentPathShouldNotCreate,
new byte[0], zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT,
new org.apache.zookeeper.AsyncCallback.StringCallback() {
@Override
public void processResult(final int rc, final String path, Object ctx, String name) {
scheduler.submit(new Runnable() {
@Override
public void run() {
if (KeeperException.Code.NONODE.intValue() == rc) {
FutureUtils.setException(promise, new LogNotFoundException(String.format("Log %s does not exist or has been deleted", getFullyQualifiedName())));
} else if (KeeperException.Code.OK.intValue() == rc) {
FutureUtils.setValue(promise, null);
LOG.trace("Created path {}.", path);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
FutureUtils.setValue(promise, null);
LOG.trace("Path {} is already existed.", path);
} else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
FutureUtils.setException(promise, new ZooKeeperClient.ZooKeeperConnectionException(path));
} else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
FutureUtils.setException(promise, new DLInterruptedException(path));
} else {
FutureUtils.setException(promise, KeeperException.create(KeeperException.Code.get(rc)));
}
}
});
}
}, null);
return promise;
}
Developer ID: twitter, project: distributedlog, lines of code: 40, source: BKLogReadHandler.java
Example 16: ListFutureProcessor
import com.twitter.util.Function; // import the required package/class
ListFutureProcessor(List<T> items,
Function<T, Future<R>> processFunc,
ExecutorService callbackExecutor) {
this.itemsIter = items.iterator();
this.processFunc = processFunc;
this.promise = new Promise<List<R>>();
this.promise.setInterruptHandler(this);
this.results = new ArrayList<R>();
this.callbackExecutor = callbackExecutor;
}
Developer ID: twitter, project: distributedlog, lines of code: 11, source: FutureUtils.java
Example 17: processList
import com.twitter.util.Function; // import the required package/class
/**
* Process the list of items one by one using the process function <i>processFunc</i>.
* Processing stops immediately if any item fails.
*
* @param collection list of items
* @param processFunc process function
* @param callbackExecutor executor to process the item
* @return future representing the list of processed results
*/
public static <T, R> Future<List<R>> processList(List<T> collection,
Function<T, Future<R>> processFunc,
@Nullable ExecutorService callbackExecutor) {
ListFutureProcessor<T, R> processor =
new ListFutureProcessor<T, R>(collection, processFunc, callbackExecutor);
if (null != callbackExecutor) {
callbackExecutor.submit(processor);
} else {
processor.run();
}
return processor.promise;
}
Developer ID: twitter, project: distributedlog, lines of code: 22, source: FutureUtils.java
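The behavior documented for processList (items handled one at a time, stopping at the first failure) can be sketched with the JDK alone. This is not the ListFutureProcessor implementation: CompletableFuture and thenCompose stand in for Twitter's Future and Promise, the optional callback executor is omitted, and SequentialProcessSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class SequentialProcessSketch {
    /** Processes items in order; an item starts only after the previous one succeeded. */
    static <T, R> CompletableFuture<List<R>> processList(
            List<T> items, Function<T, CompletableFuture<R>> processFunc) {
        CompletableFuture<List<R>> chain = CompletableFuture.completedFuture(new ArrayList<>());
        for (T item : items) {
            // thenCompose is skipped once the chain has failed, so later items are never started.
            chain = chain.thenCompose(results ->
                    processFunc.apply(item).thenApply(result -> {
                        results.add(result);
                        return results;
                    }));
        }
        return chain;
    }

    public static void main(String[] args) {
        List<Integer> doubled = SequentialProcessSketch.<Integer, Integer>processList(
                List.of(1, 2, 3),
                x -> CompletableFuture.completedFuture(x * 2)).join();
        System.out.println(doubled); // prints [2, 4, 6]
    }
}
```

Chaining with thenCompose trades the explicit iterator and promise of the original for a shorter form; the ordering and fail-fast behavior are the same in this sketch.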
Example 18: processReaderOperation
import com.twitter.util.Function; // import the required package/class
<T> Future<T> processReaderOperation(final Function<BKLogReadHandler, Future<T>> func) {
initializeFuturePool(false);
return readerFuturePool.apply(new ExceptionalFunction0<BKLogReadHandler>() {
@Override
public BKLogReadHandler applyE() throws Throwable {
return getReadHandlerForListener(true);
}
}).flatMap(new ExceptionalFunction<BKLogReadHandler, Future<T>>() {
@Override
public Future<T> applyE(final BKLogReadHandler readHandler) throws Throwable {
return func.apply(readHandler);
}
});
}
Developer ID: twitter, project: distributedlog, lines of code: 15, source: BKDistributedLogManager.java
Example 19: getLastLogRecordAsyncInternal
import com.twitter.util.Function; // import the required package/class
private Future<LogRecordWithDLSN> getLastLogRecordAsyncInternal(final boolean recover,
final boolean includeEndOfStream) {
return processReaderOperation(new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() {
@Override
public Future<LogRecordWithDLSN> apply(final BKLogReadHandler ledgerHandler) {
return ledgerHandler.getLastLogRecordAsync(recover, includeEndOfStream);
}
});
}
Developer ID: twitter, project: distributedlog, lines of code: 10, source: BKDistributedLogManager.java
Example 20: getFirstRecordAsyncInternal
import com.twitter.util.Function; // import the required package/class
private Future<LogRecordWithDLSN> getFirstRecordAsyncInternal() {
return processReaderOperation(new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() {
@Override
public Future<LogRecordWithDLSN> apply(final BKLogReadHandler ledgerHandler) {
return ledgerHandler.asyncGetFirstLogRecord();
}
});
}
Developer ID: twitter, project: distributedlog, lines of code: 9, source: BKDistributedLogManager.java
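Note: The com.twitter.util.Function examples in this article were compiled from source code and documentation platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects; copyright of the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.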