This article collects typical usage examples of the Java class org.apache.cassandra.concurrent.StageManager. If you are wondering how StageManager is used in practice, the curated examples below may help.
The StageManager class belongs to the org.apache.cassandra.concurrent package. The 20 code examples below are sorted by popularity by default.
Example 1: deserialize
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
{
final ByteBuffer partitionKey = ByteBufferUtil.readWithLength(in);
final CellName cellName = cfs.metadata.comparator.cellFromByteBuffer(ByteBufferUtil.readWithLength(in));
return StageManager.getStage(Stage.READ).submit(new Callable<Pair<CounterCacheKey, ClockAndCount>>()
{
public Pair<CounterCacheKey, ClockAndCount> call() throws Exception
{
DecoratedKey key = cfs.partitioner.decorateKey(partitionKey);
QueryFilter filter = QueryFilter.getNamesFilter(key,
cfs.metadata.cfName,
FBUtilities.singleton(cellName, cfs.metadata.comparator),
Long.MIN_VALUE);
ColumnFamily cf = cfs.getTopLevelColumns(filter, Integer.MIN_VALUE);
if (cf == null)
return null;
Cell cell = cf.getColumn(cellName);
if (cell == null || !cell.isLive(Long.MIN_VALUE))
return null;
ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, cellName), clockAndCount);
}
});
}
Developer ID: vcostet, Project: cassandra-kmean, Lines of code: 25, Source: CacheService.java
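Example 1 reads the key on the calling thread, hands the actual lookup to the READ stage, and returns the resulting Future straight away. The sketch below shows the same submit-and-return pattern with a plain JDK ExecutorService standing in for StageManager.getStage(Stage.READ). The class name SubmitSketch, the string-based "cache entry", and the empty-key rule are hypothetical and only serve to illustrate the control flow.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: a JDK executor plays the role of a Cassandra stage.
class SubmitSketch
{
    // Decodes the key on the caller's thread, then defers the lookup to the stage.
    static Future<String> deserialize(ExecutorService stage, byte[] rawKey)
    {
        final String key = new String(rawKey, StandardCharsets.UTF_8);
        return stage.submit(new Callable<String>()
        {
            public String call()
            {
                // Returning null means "nothing to cache", as in Example 1.
                return key.isEmpty() ? null : key + "=cached";
            }
        });
    }

    public static void main(String[] args) throws Exception
    {
        ExecutorService stage = Executors.newSingleThreadExecutor();
        try
        {
            Future<String> pending = deserialize(stage, "k1".getBytes(StandardCharsets.UTF_8));
            // The caller decides when (and whether) to block on the result.
            System.out.println(pending.get());
        }
        finally
        {
            stage.shutdown();
        }
    }
}
```

The caller receives the Future before the lookup has run, so it can start many deserializations and collect the results later.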
Example 2: announce
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
private static Future<?> announce(final Collection<Mutation> schema)
{
Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
{
protected void runMayThrow() throws IOException, ConfigurationException
{
DefsTables.mergeSchema(schema);
}
});
for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
{
// only push schema to nodes with known and equal versions
if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
MessagingService.instance().knowsVersion(endpoint) &&
MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version)
pushSchemaMutation(endpoint, schema);
}
return f;
}
Developer ID: vcostet, Project: cassandra-kmean, Lines of code: 22, Source: MigrationManager.java
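Example 2 submits a WrappedRunnable whose runMayThrow() may throw checked exceptions. The sketch below is a simplified stand-in for that helper, assuming its run() delegates to runMayThrow() and rethrows checked exceptions unchecked. It also shows how such a failure reaches the caller through the returned Future. The names WrappedRunnableSketch, Wrapped, and submitFailing are hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class WrappedRunnableSketch
{
    // Simplified stand-in (an assumption about the helper's shape, not the real class).
    static abstract class Wrapped implements Runnable
    {
        public final void run()
        {
            try
            {
                runMayThrow();
            }
            catch (RuntimeException e)
            {
                throw e;
            }
            catch (Exception e)
            {
                // Checked exceptions cannot cross Runnable.run(), so wrap them.
                throw new RuntimeException(e);
            }
        }

        protected abstract void runMayThrow() throws Exception;
    }

    // Submits a task that fails and returns the root-cause message seen by the caller.
    static String submitFailing(ExecutorService stage)
    {
        Future<?> f = stage.submit(new Wrapped()
        {
            protected void runMayThrow() throws IOException
            {
                throw new IOException("schema merge failed");
            }
        });
        try
        {
            f.get();
            return null;
        }
        catch (ExecutionException e)
        {
            // getCause() is the RuntimeException wrapper; its cause is the IOException.
            return e.getCause().getCause().getMessage();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args)
    {
        ExecutorService stage = Executors.newSingleThreadExecutor();
        System.out.println(submitFailing(stage)); // prints "schema merge failed"
        stage.shutdown();
    }
}
```

A caller that never calls get() on the Future never sees the failure, which is one reason announce returns the Future instead of discarding it.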
Example 3: makeRequests
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
private void makeRequests(ReadCommand readCommand, Iterable<InetAddress> endpoints)
{
MessageOut<ReadCommand> message = null;
boolean hasLocalEndpoint = false;
for (InetAddress endpoint : endpoints)
{
if (isLocalRequest(endpoint))
{
hasLocalEndpoint = true;
continue;
}
logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
if (message == null)
message = readCommand.createMessage();
MessagingService.instance().sendRR(message, endpoint, handler);
}
// We delay the local (potentially blocking) read till the end to avoid stalling remote requests.
if (hasLocalEndpoint)
{
logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data");
StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
}
}
Developer ID: vcostet, Project: cassandra-kmean, Lines of code: 27, Source: AbstractReadExecutor.java
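The comment in Example 3 calls the local read "potentially blocking". That is consistent with maybeExecuteImmediately running the task on the calling thread when the stage has spare capacity and queueing it otherwise. The sketch below approximates that behaviour with a Semaphore and a JDK pool. It is an illustration under that assumption, not Cassandra's implementation: the class InlineOrQueueSketch and its boolean return value are hypothetical, and queued tasks here do not consume permits.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

// Hypothetical approximation of "run inline if there is capacity, else queue".
class InlineOrQueueSketch
{
    private final Semaphore permits;
    private final ExecutorService pool;

    InlineOrQueueSketch(int maxInline, ExecutorService pool)
    {
        this.permits = new Semaphore(maxInline);
        this.pool = pool;
    }

    // Returns true if the task ran on the calling thread, false if it was queued.
    boolean maybeExecuteImmediately(Runnable task)
    {
        if (!permits.tryAcquire())
        {
            pool.execute(task);
            return false;
        }
        try
        {
            // Runs on the caller's thread, so the caller blocks until the task is done.
            task.run();
        }
        finally
        {
            permits.release();
        }
        return true;
    }

    public static void main(String[] args)
    {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        InlineOrQueueSketch stage = new InlineOrQueueSketch(1, pool);
        boolean inline = stage.maybeExecuteImmediately(new Runnable()
        {
            public void run()
            {
                System.out.println("running on " + Thread.currentThread().getName());
            }
        });
        System.out.println("ran inline: " + inline);
        pool.shutdown();
    }
}
```

Because the inline path blocks the caller, Example 3 sends all remote requests first and only then starts the local read.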
Example 4: insertLocal
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler responseHandler)
{
StageManager.getStage(Stage.MUTATION).maybeExecuteImmediately(new LocalMutationRunnable()
{
public void runMayThrow()
{
IMutation processed = SinkManager.processWriteRequest(mutation);
if (processed != null)
{
((Mutation) processed).apply();
responseHandler.response(null);
}
}
});
}
Developer ID: vcostet, Project: cassandra-kmean, Lines of code: 17, Source: StorageProxy.java
Example 5: complete
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
/**
* Registers the newly created tree for rendezvous in Stage.ANTI_ENTROPY.
*/
public void complete()
{
completeTree();
StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
if (logger.isDebugEnabled())
{
// log distribution of rows in tree
logger.debug("Validated {} partitions for {}. Partitions per leaf are:", validated, desc.sessionId);
tree.histogramOfRowCountPerLeaf().log(logger);
logger.debug("Validated {} partitions for {}. Partition sizes are:", validated, desc.sessionId);
tree.histogramOfRowSizePerLeaf().log(logger);
}
}
Developer ID: vcostet, Project: cassandra-kmean, Lines of code: 19, Source: Validator.java
Example 6: trace
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
public static void trace(final ByteBuffer sessionIdBytes, final String message, final int elapsed)
{
final String threadName = Thread.currentThread().getName();
StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
{
public void runMayThrow()
{
Mutation mutation = new Mutation(Tracing.TRACE_KS, sessionIdBytes);
ColumnFamily cells = mutation.addOrGet(CFMetaData.TraceEventsCf);
CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.make(UUIDGen.getTimeUUID()), FBUtilities.timestampMicros());
adder.add("activity", message);
adder.add("source", FBUtilities.getBroadcastAddress());
if (elapsed >= 0)
adder.add("source_elapsed", elapsed);
adder.add("thread", threadName);
Tracing.mutateWithCatch(mutation);
}
});
}
Developer ID: vcostet, Project: cassandra-kmean, Lines of code: 23, Source: TraceState.java
Example 7: flushAES
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
void flushAES() throws Exception
{
final ExecutorService stage = StageManager.getStage(Stage.ANTI_ENTROPY);
final Callable<Boolean> noop = new Callable<Boolean>()
{
public Boolean call()
{
return true;
}
};
// send two tasks through the stage: one to follow existing tasks and a second to follow tasks created by
// those existing tasks: tasks won't recursively create more tasks
stage.submit(noop).get(5000, TimeUnit.MILLISECONDS);
stage.submit(noop).get(5000, TimeUnit.MILLISECONDS);
}
Developer ID: vcostet, Project: cassandra-kmean, Lines of code: 17, Source: AntiEntropyServiceTestAbstract.java
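The two-pass flush in Example 7 relies on the stage running tasks in submission order: the first no-op completes after everything already queued, and the second completes after any tasks those earlier tasks enqueued. The sketch below reproduces this with a single-threaded JDK executor, which is an assumption standing in for the ANTI_ENTROPY stage. The names StageFlushSketch, flush, and runDemo are hypothetical.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class StageFlushSketch
{
    static final Callable<Boolean> NOOP = new Callable<Boolean>()
    {
        public Boolean call()
        {
            return true;
        }
    };

    // Waits for tasks already queued, then for tasks those tasks created.
    // Assumes a FIFO, single-threaded stage and that children do not spawn further tasks.
    static void flush(ExecutorService stage) throws Exception
    {
        stage.submit(NOOP).get(5000, TimeUnit.MILLISECONDS);
        stage.submit(NOOP).get(5000, TimeUnit.MILLISECONDS);
    }

    // Queues a parent task that enqueues a child, flushes, and returns what ran.
    static List<String> runDemo() throws Exception
    {
        final ExecutorService stage = Executors.newSingleThreadExecutor();
        final List<String> log = new CopyOnWriteArrayList<String>();
        stage.execute(new Runnable()
        {
            public void run()
            {
                log.add("parent");
                stage.execute(new Runnable()
                {
                    public void run()
                    {
                        log.add("child");
                    }
                });
            }
        });
        try
        {
            flush(stage);
        }
        finally
        {
            stage.shutdown();
        }
        return log;
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println(runDemo()); // prints "[parent, child]"
    }
}
```

With only one no-op, the call could return while the child task is still waiting in the queue.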
Example 8: announce
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
private static Future<?> announce(final Collection<RowMutation> schema)
{
Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
{
protected void runMayThrow() throws IOException, ConfigurationException
{
DefsTables.mergeSchema(schema);
}
});
for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
{
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
continue; // we've dealt with localhost already
// don't send schema to the nodes with the versions older than current major
if (MessagingService.instance().getVersion(endpoint) < MessagingService.current_version)
continue;
pushSchemaMutation(endpoint, schema);
}
return f;
}
Developer ID: pgaref, Project: ACaZoo, Lines of code: 24, Source: MigrationManager.java
Example 9: makeDataRequests
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
protected void makeDataRequests(Iterable<InetAddress> endpoints)
{
for (InetAddress endpoint : endpoints)
{
if (isLocalRequest(endpoint))
{
logger.trace("reading data locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
}
else
{
logger.trace("reading data from {}", endpoint);
MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
}
}
}
Developer ID: pgaref, Project: ACaZoo, Lines of code: 17, Source: AbstractReadExecutor.java
Example 10: makeDigestRequests
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
protected void makeDigestRequests(Iterable<InetAddress> endpoints)
{
ReadCommand digestCommand = command.copy();
digestCommand.setDigestQuery(true);
MessageOut<?> message = digestCommand.createMessage();
for (InetAddress endpoint : endpoints)
{
if (isLocalRequest(endpoint))
{
logger.trace("reading digest locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
}
else
{
logger.trace("reading digest from {}", endpoint);
MessagingService.instance().sendRR(message, endpoint, handler);
}
}
}
Developer ID: pgaref, Project: ACaZoo, Lines of code: 20, Source: AbstractReadExecutor.java
Example 11: complete
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
/**
* Registers the newly created tree for rendezvous in Stage.ANTI_ENTROPY.
*/
public void complete()
{
completeTree();
StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
if (logger.isDebugEnabled())
{
// log distribution of rows in tree
logger.debug("Validated " + validated + " rows into AEService tree for " + desc + " with row count distribution:");
tree.histogramOfRowCountPerLeaf().log(logger);
logger.debug("Validated " + validated + " rows into AEService tree for " + desc + " with row size distribution:");
tree.histogramOfRowSizePerLeaf().log(logger);
}
}
Developer ID: pgaref, Project: ACaZoo, Lines of code: 19, Source: Validator.java
Example 12: RepairJob
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
/**
* Create repair job to run on specific columnfamily
*/
public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential)
{
this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
this.isSequential = isSequential;
this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
{
public void send(InetAddress endpoint)
{
ValidationRequest request = new ValidationRequest(desc, gcBefore);
MessagingService.instance().sendOneWay(request.createMessage(), endpoint);
}
};
this.differencers = new RequestCoordinator<Differencer>(isSequential)
{
public void send(Differencer d)
{
StageManager.getStage(Stage.ANTI_ENTROPY).execute(d);
}
};
}
Developer ID: pgaref, Project: ACaZoo, Lines of code: 24, Source: RepairJob.java
Example 13: trace
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
public static void trace(final ByteBuffer sessionIdBytes, final String message, final int elapsed)
{
final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
final String threadName = Thread.currentThread().getName();
StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
{
public void runMayThrow() throws Exception
{
CFMetaData cfMeta = CFMetaData.TraceEventsCf;
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfMeta);
Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message);
Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source")), FBUtilities.getBroadcastAddress());
if (elapsed >= 0)
Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName);
RowMutation mutation = new RowMutation(Tracing.TRACE_KS, sessionIdBytes, cf);
StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY);
}
});
}
Developer ID: pgaref, Project: ACaZoo, Lines of code: 22, Source: TraceState.java
Example 14: response
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
public void response(MessageIn<ReadResponse> message)
{
resolver.preprocess(message);
int n = waitingFor(message.from)
? recievedUpdater.incrementAndGet(this)
: received;
if (n >= blockfor && resolver.isDataPresent())
{
condition.signalAll();
// kick off a background digest comparison if this is a result that (may have) arrived after
// the original resolve that get() kicks off as soon as the condition is signaled
if (blockfor < endpoints.size() && n == endpoints.size())
{
TraceState traceState = Tracing.instance.get();
if (traceState != null)
traceState.trace("Initiating read-repair");
StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner(traceState));
}
}
}
Developer ID: scylladb, Project: scylla-tools-java, Lines of code: 21, Source: ReadCallback.java
Example 15: deserialize
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputPlus in, final ColumnFamilyStore cfs) throws IOException
{
//Keyspace and CF name are deserialized by AutoSaving cache and used to fetch the CFS provided as a
//parameter so they aren't deserialized here, even though they are serialized by this serializer
final ByteBuffer buffer = ByteBufferUtil.readWithLength(in);
// Check cfs for null before dereferencing it below
if (cfs == null || !cfs.isRowCacheEnabled())
return null;
final int rowsToCache = cfs.metadata.params.caching.rowsPerPartitionToCache();
assert(!cfs.isIndex());//Shouldn't have row cache entries for indexes
return StageManager.getStage(Stage.READ).submit(new Callable<Pair<RowCacheKey, IRowCacheEntry>>()
{
public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
{
DecoratedKey key = cfs.decorateKey(buffer);
int nowInSec = FBUtilities.nowInSeconds();
try (OpOrder.Group op = cfs.readOrdering.start(); UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key).queryMemtableAndDisk(cfs, op))
{
CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec);
return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry)toCache);
}
}
});
}
Developer ID: scylladb, Project: scylla-tools-java, Lines of code: 25, Source: CacheService.java
Example 16: announce
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
private static Future<?> announce(final Collection<Mutation> schema)
{
Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
{
protected void runMayThrow() throws ConfigurationException
{
SchemaKeyspace.mergeSchemaAndAnnounceVersion(schema);
}
});
for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
{
// only push schema to nodes with known and equal versions
if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
MessagingService.instance().knowsVersion(endpoint) &&
MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version)
pushSchemaMutation(endpoint, schema);
}
return f;
}
Developer ID: scylladb, Project: scylla-tools-java, Lines of code: 22, Source: MigrationManager.java
Example 17: makeRequests
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
private void makeRequests(ReadCommand readCommand, Iterable<InetAddress> endpoints)
{
boolean hasLocalEndpoint = false;
for (InetAddress endpoint : endpoints)
{
if (StorageProxy.canDoLocalRequest(endpoint))
{
hasLocalEndpoint = true;
continue;
}
if (traceState != null)
traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
MessageOut<ReadCommand> message = readCommand.createMessage(MessagingService.instance().getVersion(endpoint));
MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
}
// We delay the local (potentially blocking) read till the end to avoid stalling remote requests.
if (hasLocalEndpoint)
{
logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data");
StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
}
}
Developer ID: scylladb, Project: scylla-tools-java, Lines of code: 27, Source: AbstractReadExecutor.java
Example 18: performLocally
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
private static void performLocally(Stage stage, final Runnable runnable)
{
StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable()
{
public void runMayThrow()
{
try
{
runnable.run();
}
catch (Exception ex)
{
logger.error("Failed to apply mutation locally", ex);
}
}
@Override
protected Verb verb()
{
return MessagingService.Verb.MUTATION;
}
});
}
Developer ID: scylladb, Project: scylla-tools-java, Lines of code: 24, Source: StorageProxy.java
Example 19: complete
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
/**
* Registers the newly created tree for rendezvous in Stage.ANTI_ENTROPY.
*/
public void complete()
{
completeTree();
StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
if (logger.isDebugEnabled())
{
// log distribution of rows in tree
logger.debug("Validated {} partitions for {}. Partitions per leaf are:", validated, desc.sessionId);
trees.logRowCountPerLeaf(logger);
logger.debug("Validated {} partitions for {}. Partition sizes are:", validated, desc.sessionId);
trees.logRowSizePerLeaf(logger);
}
}
Developer ID: scylladb, Project: scylla-tools-java, Lines of code: 19, Source: Validator.java
Example 20: waitForPendingEvents
import org.apache.cassandra.concurrent.StageManager; // import the required package/class
/**
* Post a no-op event to the TRACING stage, so that we can be sure that any previous mutations
* have at least been applied to one replica. This works because the tracing executor only
* has one thread in its pool, see {@link StageManager#tracingExecutor()}.
*/
protected void waitForPendingEvents()
{
if (WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS <= 0)
return;
try
{
if (logger.isTraceEnabled())
logger.trace("Waiting for up to {} seconds for trace events to complete",
WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS);
StageManager.getStage(Stage.TRACING).submit(StageManager.NO_OP_TASK)
.get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, TimeUnit.SECONDS);
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
logger.debug("Failed to wait for tracing events to complete", t);
}
}
Developer ID: scylladb, Project: scylla-tools-java, Lines of code: 26, Source: TraceState.java
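Example 20 uses a no-op task as a barrier on a single-threaded stage and gives up quietly once the timeout expires. The sketch below shows both outcomes with a JDK single-thread executor standing in for the TRACING stage. The names TracingBarrierSketch and waitForPending, and the boolean return value, are hypothetical additions for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TracingBarrierSketch
{
    static final Runnable NO_OP = new Runnable()
    {
        public void run()
        {
        }
    };

    // Returns true only if every task queued before the barrier finished in time.
    static boolean waitForPending(ExecutorService stage, long timeout, TimeUnit unit)
    {
        if (timeout <= 0)
            return false; // waiting disabled, mirroring the early return in Example 20
        try
        {
            stage.submit(NO_OP).get(timeout, unit);
            return true;
        }
        catch (TimeoutException e)
        {
            return false; // earlier tasks are still running or queued
        }
        catch (ExecutionException e)
        {
            return false;
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args)
    {
        final CountDownLatch gate = new CountDownLatch(1);
        ExecutorService stage = Executors.newSingleThreadExecutor();
        // Occupy the only thread until the gate opens.
        stage.execute(new Runnable()
        {
            public void run()
            {
                try
                {
                    gate.await();
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                }
            }
        });
        System.out.println("while busy: " + waitForPending(stage, 50, TimeUnit.MILLISECONDS));
        gate.countDown();
        System.out.println("after release: " + waitForPending(stage, 5, TimeUnit.SECONDS));
        stage.shutdown();
    }
}
```

The barrier only works because the stage has a single thread. On a multi-threaded pool the no-op could finish while earlier tasks are still running.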
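Note: The org.apache.cassandra.concurrent.StageManager examples in this article were compiled from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects, and copyright in the source code remains with the original authors. Consult each project's License before distributing or using the code. Do not reproduce this article without permission.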