• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java Function类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中com.twitter.util.Function的典型用法代码示例。如果您正苦于以下问题:Java Function类的具体用法?Java Function怎么用?Java Function使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



Function类属于com.twitter.util包,在下文中一共展示了Function类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: asyncGetLogRecordCount

import com.twitter.util.Function; //导入依赖的package包/类
/**
 * Get a count of records between beginDLSN and the end of the stream.
 *
 * @param beginDLSN dlsn marking the start of the range
 * @return the count of records present in the range
 */
public Future<Long> asyncGetLogRecordCount(final DLSN beginDLSN) {

    return checkLogStreamExistsAsync().flatMap(new Function<Void, Future<Long>>() {
        public Future<Long> apply(Void done) {

            return asyncGetFullLedgerList(true, false).flatMap(new Function<List<LogSegmentMetadata>, Future<Long>>() {
                public Future<Long> apply(List<LogSegmentMetadata> ledgerList) {

                    List<Future<Long>> futureCounts = new ArrayList<Future<Long>>(ledgerList.size());
                    for (LogSegmentMetadata ledger : ledgerList) {
                        if (ledger.getLogSegmentSequenceNumber() >= beginDLSN.getLogSegmentSequenceNo()) {
                            futureCounts.add(asyncGetLogRecordCount(ledger, beginDLSN));
                        }
                    }
                    return Future.collect(futureCounts).map(new Function<List<Long>, Long>() {
                        public Long apply(List<Long> counts) {
                            return sum(counts);
                        }
                    });
                }
            });
        }
    });
}
 
开发者ID:twitter,项目名称:distributedlog,代码行数:31,代码来源:BKLogHandler.java


示例2: addWatch

import com.twitter.util.Function; //导入依赖的package包/类
/**
 * Adds a watch on the specified file. The file must exist, otherwise a FileNotFoundException
 * is returned. If the file is deleted after a watch is established, the watcher will log errors
 * but continue to monitor it, and resume watching if it is recreated.
 *
 * @param filePath path to the file to watch.
 * @param onUpdate function to call when a change is detected to the file. The entire contents
 *                 of the file will be passed in to the function. Note that onUpdate will be
 *                 called once before this call completes, which facilities initial load of data.
 *                 This callback is executed synchronously on the watcher thread - it is
 *                 important that the function be non-blocking.
 */
public synchronized void addWatch(String filePath, Function<byte[], Void> onUpdate)
    throws IOException {
  MorePreconditions.checkNotBlank(filePath);
  Preconditions.checkNotNull(onUpdate);

  // Read the file and make the initial onUpdate call.
  File file = new File(filePath);
  ByteSource byteSource = Files.asByteSource(file);
  onUpdate.apply(byteSource.read());

  // Add the file to our map if it isn't already there, and register the new change watcher.
  ConfigFileInfo configFileInfo = watchedFileMap.get(filePath);
  if (configFileInfo == null) {
    configFileInfo = new ConfigFileInfo(file.lastModified(), byteSource.hash(HASH_FUNCTION));
    watchedFileMap.put(filePath, configFileInfo);
  }
  configFileInfo.changeWatchers.add(onUpdate);
}
 
开发者ID:pinterest-attic,项目名称:pinlater,代码行数:31,代码来源:ConfigFileWatcher.java


示例3: createQueueImpl

import com.twitter.util.Function; //导入依赖的package包/类
@Override
protected void createQueueImpl(final String queueName) throws Exception {
  // Add the queueName to the queueNames sorted set in each shard.
  final double currentTimeSeconds = System.currentTimeMillis() / 1000.0;
  for (final ImmutableMap.Entry<String, RedisPools> shard : shardMap.entrySet()) {
    final String queueNamesRedisKey = RedisBackendUtils.constructQueueNamesRedisKey(
        shard.getKey());
    RedisUtils.executeWithConnection(
        shard.getValue().getGeneralRedisPool(),
        new Function<Jedis, Void>() {
          @Override
          public Void apply(Jedis conn) {
            if (conn.zscore(queueNamesRedisKey, queueName) == null) {
              conn.zadd(queueNamesRedisKey, currentTimeSeconds, queueName);
            }
            return null;
          }
        });
  }
  reloadQueueNames();
}
 
开发者ID:pinterest-attic,项目名称:pinlater,代码行数:22,代码来源:PinLaterRedisBackend.java


示例4: cleanUpAllShards

import com.twitter.util.Function; //导入依赖的package包/类
/**
 * Clean up all the keys in each shard. This method is only for test use.
 */
@VisibleForTesting
public Future<Void> cleanUpAllShards() {
  return futurePool.apply(new ExceptionalFunction0<Void>() {
    @Override
    public Void applyE() throws Throwable {
      for (final ImmutableMap.Entry<String, RedisPools> shard : shardMap.entrySet()) {
        RedisUtils.executeWithConnection(
            shard.getValue().getGeneralRedisPool(),
            new Function<Jedis, Void>() {
              @Override
              public Void apply(Jedis conn) {
                conn.flushAll();
                return null;
              }
            });
      }
      return null;
    }
  });
}
 
开发者ID:pinterest-attic,项目名称:pinlater,代码行数:24,代码来源:PinLaterRedisBackend.java


示例5: reloadQueueNames

import com.twitter.util.Function; //导入依赖的package包/类
/**
 * Reload queue names from redis to local cache.
 */
private synchronized void reloadQueueNames() throws Exception {
  ImmutableSet.Builder<String> builder = new ImmutableSet.Builder<String>();
  if (!shardMap.isEmpty()) {
    final Map.Entry<String, RedisPools> randomShard = getRandomShard(true);
    if (randomShard == null) {
      throw new PinLaterException(ErrorCode.NO_HEALTHY_SHARDS, "Unable to find healthy shard");
    }
    Set<String> newQueueNames = RedisUtils.executeWithConnection(
        randomShard.getValue().getGeneralRedisPool(),
        new Function<Jedis, Set<String>>() {
          @Override
          public Set<String> apply(Jedis conn) {
            return RedisBackendUtils.getQueueNames(conn, randomShard.getKey());
          }
        });
    builder.addAll(newQueueNames);
  }
  queueNames.set(builder.build());
}
 
开发者ID:pinterest-attic,项目名称:pinlater,代码行数:23,代码来源:PinLaterRedisBackend.java


示例6: removeJobHash

import com.twitter.util.Function; //导入依赖的package包/类
/**
 * Remove the job hash from redis. This function is used in test to simulate the case where the
 * job id is still in the queue, while the job hash is evicted by redis LRU.
 */
@VisibleForTesting
public Future<Void> removeJobHash(String jobDescriptor) {
  final PinLaterJobDescriptor jobDesc = new PinLaterJobDescriptor(jobDescriptor);
  return futurePool.apply(new ExceptionalFunction0<Void>() {
    @Override
    public Void applyE() throws Throwable {
      RedisUtils.executeWithConnection(
          shardMap.get(jobDesc.getShardName()).getGeneralRedisPool(),
          new Function<Jedis, Void>() {
            @Override
            public Void apply(Jedis conn) {
              String hashRedisKey = RedisBackendUtils.constructHashRedisKey(
                  jobDesc.getQueueName(), jobDesc.getShardName(), jobDesc.getLocalId());
              conn.del(hashRedisKey);
              return null;
            }
          });
      return null;
    }
  });
}
 
开发者ID:pinterest-attic,项目名称:pinlater,代码行数:26,代码来源:PinLaterRedisBackend.java


示例7: clientPing

import com.twitter.util.Function; //导入依赖的package包/类
public boolean clientPing(JedisPool jedisPool) {
  boolean result = false;
  try {
    result = RedisUtils.executeWithConnection(
        jedisPool,
        new Function<Jedis, String>() {
          @Override
          public String apply(Jedis conn) {
            return conn.ping();
          }
        }
    ).equals("PONG");
  } catch (Exception e) {
    // failed ping
  }
  return result;
}
 
开发者ID:pinterest-attic,项目名称:pinlater,代码行数:18,代码来源:JedisClientHelper.java


示例8: executeWithConnection

import com.twitter.util.Function; //导入依赖的package包/类
/**
 * Gets the connection from the connection pool and adds the wrapper catch/finally block for the
 * given function.
 *
 * This helper method saves the trouble of dealing with redis connection. When we got
 * JedisConnectionException, we will discard this connection. Otherwise, we return the connection
 * to the connection pool.
 *
 * @param jedisPool Jedis connection pool
 * @param redisDBNum Redis DB number (index) (if redisDBNum == -1, don't select a DB )
 * @param func    The function to execute inside the catch/finally block.
 * @return A Resp object, which is the return value of wrapped function.
 */
public static <Resp> Resp executeWithConnection(JedisPool jedisPool,
                                                int redisDBNum,
                                                Function<Jedis, Resp> func) {
  Preconditions.checkNotNull(jedisPool);
  Preconditions.checkNotNull(func);
  Jedis conn = null;
  boolean gotJedisConnException = false;
  try {
    conn = jedisPool.getResource();
    selectRedisDB(conn, redisDBNum);
    return func.apply(conn);
  } catch (JedisConnectionException e) {
    jedisPool.returnBrokenResource(conn);
    gotJedisConnException = true;
    throw e;
  } finally {
    if (conn != null && !gotJedisConnException) {
      jedisPool.returnResource(conn);
    }
  }
}
 
开发者ID:pinterest-attic,项目名称:pinlater,代码行数:35,代码来源:RedisUtils.java


示例9: executePartitioned

import com.twitter.util.Function; //导入依赖的package包/类
/**
 * Executes a batch of requests asynchronously in a partitioned manner,
 * with the specified parallelism.
 *
 * @param requests      List of requests to execute.
 * @param parallelism   Desired parallelism (must be > 0).
 * @param executeBatch  Function to execute each partitioned batch of requests.
 * @param <Req>         Request type.
 * @param <Resp>        Response type.
 * @return List of response futures.
 */
public static <Req, Resp> List<Future<Resp>> executePartitioned(
    List<Req> requests,
    int parallelism,
    Function<List<Req>, Future<Resp>> executeBatch) {
  MorePreconditions.checkNotBlank(requests);
  Preconditions.checkArgument(parallelism > 0);
  Preconditions.checkNotNull(executeBatch);

  int sizePerPartition = Math.max(requests.size() / parallelism, 1);
  List<List<Req>> partitions = Lists.partition(requests, sizePerPartition);
  List<Future<Resp>> futures = Lists.newArrayListWithCapacity(partitions.size());
  for (final List<Req> request : partitions) {
    futures.add(executeBatch.apply(request));
  }
  return futures;
}
 
开发者ID:pinterest-attic,项目名称:pinlater,代码行数:28,代码来源:PinLaterBackendUtils.java


示例10: dequeueJobs

import com.twitter.util.Function; //导入依赖的package包/类
@Override
public Future<PinLaterDequeueResponse> dequeueJobs(
    RequestContext context, final PinLaterDequeueRequest request) {
  if (!queueConfig.allowDequeue(request.getQueueName(), request.getLimit())) {
    Stats.incr(request.getQueueName() + "_dequeue_requests_rate_limited");
    return Future.exception(new PinLaterException(ErrorCode.DEQUEUE_RATE_LIMITED,
        "Dequeue rate limit exceeded for queue: " + request.getQueueName()));
  }

  return Stats.timeFutureMillis(
      "PinLaterService.dequeueJobs",
      backend.dequeueJobs(context.getSource(), request).onSuccess(
          new Function<PinLaterDequeueResponse, BoxedUnit>() {
            @Override
            public BoxedUnit apply(PinLaterDequeueResponse response) {
              Stats.incr(request.getQueueName() + "_dequeue", response.getJobsSize());
              return null;
            }
          }).rescue(new LogAndWrapException<PinLaterDequeueResponse>(
          context, "dequeueJobs", request.toString())));
}
 
开发者ID:pinterest-attic,项目名称:pinlater,代码行数:22,代码来源:PinLaterServiceImpl.java


示例11: ackDequeuedJobs

import com.twitter.util.Function; //导入依赖的package包/类
@Override
public Future<Void> ackDequeuedJobs(RequestContext context, final PinLaterJobAckRequest request) {
  return Stats.timeFutureMillis(
      "PinLaterService.ackDequeuedJobs",
      backend.ackDequeuedJobs(request).onSuccess(
          new Function<Void, BoxedUnit>() {
            @Override
            public BoxedUnit apply(Void aVoid) {
              Stats.incr(request.getQueueName() + "_ack_succeeded",
                  request.getJobsSucceededSize());
              Stats.incr(request.getQueueName() + "_ack_failed", request.getJobsFailedSize());
              return null;
            }
          }).rescue(new LogAndWrapException<Void>(context, "ackDequeuedJobs",
          request.toString())));
}
 
开发者ID:pinterest-attic,项目名称:pinlater,代码行数:17,代码来源:PinLaterServiceImpl.java


示例12: setAcceptNewStream

import com.twitter.util.Function; //导入依赖的package包/类
@Override
public Future<Void> setAcceptNewStream(boolean enabled) {
    Map<SocketAddress, ProxyClient> snapshot = clientManager.getAllClients();
    List<Future<Void>> futures = new ArrayList<Future<Void>>(snapshot.size());
    for (Map.Entry<SocketAddress, ProxyClient> entry : snapshot.entrySet()) {
        futures.add(entry.getValue().getService().setAcceptNewStream(enabled));
    }
    return Future.collect(futures).map(new Function<List<Void>, Void>() {
        @Override
        public Void apply(List<Void> list) {
            return null;
        }
    });
}
 
开发者ID:twitter,项目名称:distributedlog,代码行数:15,代码来源:DistributedLogClientImpl.java


示例13: purgeLogSegmentsOlderThanTimestamp

import com.twitter.util.Function; //导入依赖的package包/类
Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTimestamp(final long minTimestampToKeep) {
    if (minTimestampToKeep >= Utils.nowInMillis()) {
        return Future.exception(new IllegalArgumentException(
                "Invalid timestamp " + minTimestampToKeep + " to purge logs for " + getFullyQualifiedName()));
    }
    return asyncGetFullLedgerList(false, false).flatMap(
            new Function<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
        @Override
        public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
            List<LogSegmentMetadata> purgeList = new ArrayList<LogSegmentMetadata>(logSegments.size());

            int numCandidates = getNumCandidateLogSegmentsToTruncate(logSegments);

            for (int iterator = 0; iterator < numCandidates; iterator++) {
                LogSegmentMetadata l = logSegments.get(iterator);
                // When application explicitly truncates segments; timestamp based purge is
                // only used to cleanup log segments that have been marked for truncation
                if ((l.isTruncated() || !conf.getExplicitTruncationByApplication()) &&
                    !l.isInProgress() && (l.getCompletionTime() < minTimestampToKeep)) {
                    purgeList.add(l);
                } else {
                    // stop truncating log segments if we find either an inprogress or a partially
                    // truncated log segment
                    break;
                }
            }
            LOG.info("Deleting log segments older than {} for {} : {}",
                    new Object[] { minTimestampToKeep, getFullyQualifiedName(), purgeList });
            return deleteLogSegments(purgeList);
        }
    });
}
 
开发者ID:twitter,项目名称:distributedlog,代码行数:33,代码来源:BKLogWriteHandler.java


示例14: deleteLogSegments

import com.twitter.util.Function; //导入依赖的package包/类
private Future<List<LogSegmentMetadata>> deleteLogSegments(
        final List<LogSegmentMetadata> logs) {
    if (LOG.isTraceEnabled()) {
        LOG.trace("Purging logs for {} : {}", getFullyQualifiedName(), logs);
    }
    return FutureUtils.processList(logs,
            new Function<LogSegmentMetadata, Future<LogSegmentMetadata>>() {
        @Override
        public Future<LogSegmentMetadata> apply(LogSegmentMetadata segment) {
            return deleteLogSegment(segment);
        }
    }, scheduler);
}
 
开发者ID:twitter,项目名称:distributedlog,代码行数:14,代码来源:BKLogWriteHandler.java


示例15: ensureReadLockPathExist

import com.twitter.util.Function; //导入依赖的package包/类
private Future<Void> ensureReadLockPathExist() {
    final Promise<Void> promise = new Promise<Void>();
    promise.setInterruptHandler(new com.twitter.util.Function<Throwable, BoxedUnit>() {
        @Override
        public BoxedUnit apply(Throwable t) {
            FutureUtils.setException(promise, new LockCancelledException(readLockPath, "Could not ensure read lock path", t));
            return null;
        }
    });
    Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath());
    Utils.zkAsyncCreateFullPathOptimisticRecursive(zooKeeperClient, readLockPath, parentPathShouldNotCreate,
            new byte[0], zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT,
            new org.apache.zookeeper.AsyncCallback.StringCallback() {
                @Override
                public void processResult(final int rc, final String path, Object ctx, String name) {
                    scheduler.submit(new Runnable() {
                        @Override
                        public void run() {
                            if (KeeperException.Code.NONODE.intValue() == rc) {
                                FutureUtils.setException(promise, new LogNotFoundException(String.format("Log %s does not exist or has been deleted", getFullyQualifiedName())));
                            } else if (KeeperException.Code.OK.intValue() == rc) {
                                FutureUtils.setValue(promise, null);
                                LOG.trace("Created path {}.", path);
                            } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
                                FutureUtils.setValue(promise, null);
                                LOG.trace("Path {} is already existed.", path);
                            } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
                                FutureUtils.setException(promise, new ZooKeeperClient.ZooKeeperConnectionException(path));
                            } else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
                                FutureUtils.setException(promise, new DLInterruptedException(path));
                            } else {
                                FutureUtils.setException(promise, KeeperException.create(KeeperException.Code.get(rc)));
                            }
                        }
                    });
                }
            }, null);
    return promise;
}
 
开发者ID:twitter,项目名称:distributedlog,代码行数:40,代码来源:BKLogReadHandler.java


示例16: ListFutureProcessor

import com.twitter.util.Function; //导入依赖的package包/类
ListFutureProcessor(List<T> items,
                    Function<T, Future<R>> processFunc,
                    ExecutorService callbackExecutor) {
    this.itemsIter = items.iterator();
    this.processFunc = processFunc;
    this.promise = new Promise<List<R>>();
    this.promise.setInterruptHandler(this);
    this.results = new ArrayList<R>();
    this.callbackExecutor = callbackExecutor;
}
 
开发者ID:twitter,项目名称:distributedlog,代码行数:11,代码来源:FutureUtils.java


示例17: processList

import com.twitter.util.Function; //导入依赖的package包/类
/**
 * Process the list of items one by one using the process function <i>processFunc</i>.
 * The process will be stopped immediately if it fails on processing any one.
 *
 * @param collection list of items
 * @param processFunc process function
 * @param callbackExecutor executor to process the item
 * @return future presents the list of processed results
 */
public static <T, R> Future<List<R>> processList(List<T> collection,
                                                 Function<T, Future<R>> processFunc,
                                                 @Nullable ExecutorService callbackExecutor) {
    ListFutureProcessor<T, R> processor =
            new ListFutureProcessor<T, R>(collection, processFunc, callbackExecutor);
    if (null != callbackExecutor) {
        callbackExecutor.submit(processor);
    } else {
        processor.run();
    }
    return processor.promise;
}
 
开发者ID:twitter,项目名称:distributedlog,代码行数:22,代码来源:FutureUtils.java


示例18: processReaderOperation

import com.twitter.util.Function; //导入依赖的package包/类
<T> Future<T> processReaderOperation(final Function<BKLogReadHandler, Future<T>> func) {
    initializeFuturePool(false);
    return readerFuturePool.apply(new ExceptionalFunction0<BKLogReadHandler>() {
        @Override
        public BKLogReadHandler applyE() throws Throwable {
            return getReadHandlerForListener(true);
        }
    }).flatMap(new ExceptionalFunction<BKLogReadHandler, Future<T>>() {
        @Override
        public Future<T> applyE(final BKLogReadHandler readHandler) throws Throwable {
            return func.apply(readHandler);
        }
    });
}
 
开发者ID:twitter,项目名称:distributedlog,代码行数:15,代码来源:BKDistributedLogManager.java


示例19: getLastLogRecordAsyncInternal

import com.twitter.util.Function; //导入依赖的package包/类
private Future<LogRecordWithDLSN> getLastLogRecordAsyncInternal(final boolean recover,
                                                                final boolean includeEndOfStream) {
    return processReaderOperation(new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() {
        @Override
        public Future<LogRecordWithDLSN> apply(final BKLogReadHandler ledgerHandler) {
            return ledgerHandler.getLastLogRecordAsync(recover, includeEndOfStream);
        }
    });
}
 
开发者ID:twitter,项目名称:distributedlog,代码行数:10,代码来源:BKDistributedLogManager.java


示例20: getFirstRecordAsyncInternal

import com.twitter.util.Function; //导入依赖的package包/类
private Future<LogRecordWithDLSN> getFirstRecordAsyncInternal() {
    return processReaderOperation(new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() {
        @Override
        public Future<LogRecordWithDLSN> apply(final BKLogReadHandler ledgerHandler) {
            return ledgerHandler.asyncGetFirstLogRecord();
        }
    });
}
 
开发者ID:twitter,项目名称:distributedlog,代码行数:9,代码来源:BKDistributedLogManager.java



注:本文中的com.twitter.util.Function类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java XContentRestResponse类代码示例发布时间:2022-05-23
下一篇:
Java Table类代码示例发布时间:2022-05-23
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap