本文整理汇总了Java中com.datatorrent.api.DefaultOutputPort类的典型用法代码示例。如果您正苦于以下问题:Java DefaultOutputPort类的具体用法?Java DefaultOutputPort怎么用?Java DefaultOutputPort使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
DefaultOutputPort类属于com.datatorrent.api包,在下文中一共展示了DefaultOutputPort类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: populateDAG
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Override
public void populateDAG(DAG dag, Configuration conf)
{
RandomKeyValGenerator randGen = dag.addOperator("RandomGenerator", new RandomKeyValGenerator());
UniqueValueCount<Integer> valCount = dag.addOperator("UniqueCounter", new UniqueValueCount<Integer>());
ConsoleOutputOperator consOut = dag.addOperator("Console", new ConsoleOutputOperator());
IntegerUniqueValueCountAppender uniqueUnifier = dag.addOperator("StatefulUniqueCounter", new IntegerUniqueValueCountAppender());
dag.getOperatorMeta("StatefulUniqueCounter").getMeta(uniqueUnifier.input).getAttributes().put(Context.PortContext.STREAM_CODEC, new KeyBasedStreamCodec());
@SuppressWarnings("rawtypes")
DefaultOutputPort valOut = valCount.output;
@SuppressWarnings("rawtypes")
DefaultOutputPort uniqueOut = uniqueUnifier.output;
dag.addStream("Events", randGen.outport, valCount.input);
dag.addStream("Unified", valOut, uniqueUnifier.input);
dag.addStream("Result", uniqueOut, consOut.input);
}
开发者ID:apache,项目名称:apex-malhar,代码行数:20,代码来源:StatefulApplication.java
示例2: populateDAG
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Override
public void populateDAG(DAG dag, Configuration conf)
{
KeyGen keyGen = dag.addOperator("KeyGenerator", new KeyGen());
UniqueValueCount<Integer> valCount = dag.addOperator("ValueCounter", new UniqueValueCount<Integer>());
IntegerUniqueValueCountAppender uniqueUnifier = dag.addOperator("Unique", new IntegerUniqueValueCountAppender());
VerifyTable verifyTable = dag.addOperator("VerifyTable", new VerifyTable());
@SuppressWarnings("rawtypes")
DefaultOutputPort valOut = valCount.output;
@SuppressWarnings("rawtypes")
DefaultOutputPort uniqueOut = uniqueUnifier.output;
dag.addStream("DataIn", keyGen.output, valCount.input);
dag.addStream("UnifyWindows", valOut, uniqueUnifier.input);
dag.addStream("ResultsOut", uniqueOut, verifyTable.input);
}
开发者ID:apache,项目名称:apex-malhar,代码行数:18,代码来源:StatefulUniqueCountTest.java
示例3: attachOutputPortToInputPort
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
/**
* This is a utility method which is used to attach the output port of an {@link EmbeddableQueryInfoProvider} to the input port
* of the encapsulating {@link AppData.Store}.
* @param <T> The type of data emitted by the {@link EmbeddableQueryInfoProvider}'s output port and received by the
* {@link AppData.Store}'s input port.
* @param outputPort The output port of the {@link EmbeddableQueryInfoProvider} which is being used by an {@link AppData.Store}.
* @param inputPort The input port of the {@link AppData.Store} which is using an {@link EmbeddableQueryInfoProvider}.
*/
public static <T> void attachOutputPortToInputPort(DefaultOutputPort<T> outputPort, final DefaultInputPort<T> inputPort)
{
outputPort.setSink(new Sink<Object>()
{
@Override
@SuppressWarnings("unchecked")
public void put(Object tuple)
{
LOG.debug("processing tuple");
inputPort.process((T)tuple);
}
@Override
public int getCount(boolean reset)
{
return 0;
}
});
}
开发者ID:apache,项目名称:apex-malhar,代码行数:29,代码来源:StoreUtils.java
示例4: test
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void test()
{
DimensionQueryResultMergeUnifier unifier = new DimensionQueryResultMergeUnifier();
DefaultOutputPort<String> output = unifier.output;
SimpleSink<Object> simpleSink = new SimpleSink<Object>();
output.setSink(simpleSink);
unifier.beginWindow(1);
for (List<String> tuples : idToTuplesMap.values()) {
for (String tuple : tuples) {
unifier.process(tuple);
}
}
unifier.endWindow();
//verify
assertCollectionSame(simpleSink.data, (Collection)expectedIdToTuple.values());
}
开发者ID:DataTorrent,项目名称:Megh,代码行数:21,代码来源:DimensionQueryResultMergeUnifierTester.java
示例5: output
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
@Override
public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple) {
DefaultOutputPort<ApexStreamTuple<?>> additionalOutputPort =
additionalOutputPortMapping.get(tag);
if (additionalOutputPort != null) {
additionalOutputPort.emit(ApexStreamTuple.DataTuple.of(tuple));
} else {
output.emit(ApexStreamTuple.DataTuple.of(tuple));
}
if (traceTuples) {
LOG.debug("\nemitting {}\n", tuple);
}
}
开发者ID:apache,项目名称:beam,代码行数:14,代码来源:ApexParDoOperator.java
示例6: outputWatermark
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
private void outputWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
if (traceTuples) {
LOG.debug("\nemitting {}\n", mark);
}
output.emit(mark);
if (!additionalOutputPortMapping.isEmpty()) {
for (DefaultOutputPort<ApexStreamTuple<?>> additionalOutput :
additionalOutputPortMapping.values()) {
additionalOutput.emit(mark);
}
}
}
开发者ID:apache,项目名称:beam,代码行数:13,代码来源:ApexParDoOperator.java
示例7: emitCreditCardKeyTuple
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
private void emitCreditCardKeyTuple(MerchantTransaction tuple, DefaultOutputPort<KeyValPair<MerchantKey, CreditCardData>> outputPort)
{
MerchantKey key = getMerchantKey(tuple);
CreditCardData data = new CreditCardData();
data.fullCcNum = tuple.fullCcNum;
data.amount = tuple.amount;
KeyValPair<MerchantKey, CreditCardData> keyValPair = new KeyValPair<MerchantKey, CreditCardData>(key, data);
outputPort.emit(keyValPair);
}
开发者ID:apache,项目名称:apex-malhar,代码行数:12,代码来源:MerchantTransactionBucketOperator.java
示例8: QueryManagerAsynchronous
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
public QueryManagerAsynchronous(DefaultOutputPort<String> resultPort,
QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queueManager,
QueryExecutor<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT, RESULT> queryExecutor,
MessageSerializerFactory messageSerializerFactory,
Thread mainThread)
{
setResultPort(resultPort);
setQueueManager(queueManager);
setQueryExecutor(queryExecutor);
setMessageSerializerFactory(messageSerializerFactory);
setMainThread(mainThread);
}
开发者ID:apache,项目名称:apex-malhar,代码行数:13,代码来源:QueryManagerAsynchronous.java
示例9: populateDAG
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
@Override
public void populateDAG(DAG dag, Configuration configuration)
{
redisServer = configuration.get("dt.application.AppWithDCWithoutDe.redisServer");
DefaultOutputPort<DimensionTuple> upstreamOutput = populateUpstreamDAG(dag, configuration);
//populateHardCodedDimensionsDAG(dag, configuration, generateOperator.outputPort);
populateDimensionsDAG(dag, configuration, upstreamOutput);
}
开发者ID:yahoo,项目名称:streaming-benchmarks,代码行数:11,代码来源:ApplicationWithDCWithoutDeserializer.java
示例10: populateUpstreamDAG
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
public DefaultOutputPort<DimensionTuple> populateUpstreamDAG(DAG dag, Configuration configuration)
{
JsonGenerator eventGenerator = dag.addOperator("eventGenerator", new JsonGenerator());
FilterTuples filterTuples = dag.addOperator("filterTuples", new FilterTuples());
FilterFields filterFields = dag.addOperator("filterFields", new FilterFields());
// Connect the Ports in the Operators
dag.addStream("filterTuples", eventGenerator.out, filterTuples.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
dag.addStream("filterFields", filterTuples.output, filterFields.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
TupleToDimensionTupleConverter converter = dag.addOperator("converter", new TupleToDimensionTupleConverter());
if(includeRedisJoin) {
RedisJoin redisJoin = dag.addOperator("redisJoin", new RedisJoin());
dag.addStream("redisJoin", filterFields.output, redisJoin.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
dag.addStream("converter", redisJoin.output, converter.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
dag.setInputPortAttribute(redisJoin.input, Context.PortContext.PARTITION_PARALLEL, true);
setupRedis(eventGenerator.getCampaigns());
} else {
dag.addStream("convert", filterFields.output, converter.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
}
dag.setInputPortAttribute(filterTuples.input, Context.PortContext.PARTITION_PARALLEL, true);
dag.setInputPortAttribute(filterFields.input, Context.PortContext.PARTITION_PARALLEL, true);
dag.setInputPortAttribute(converter.inputPort, Context.PortContext.PARTITION_PARALLEL, true);
dag.setAttribute(eventGenerator, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<EventGenerator>(PARTITION_NUM));
return converter.outputPort;
}
开发者ID:yahoo,项目名称:streaming-benchmarks,代码行数:34,代码来源:ApplicationWithDCWithoutDeserializer.java
示例11: emitMerchantKeyTuple
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
private void emitMerchantKeyTuple(MerchantTransaction tuple, DefaultOutputPort<KeyValPair<MerchantKey, Long>> outputPort)
{
MerchantKey key = getMerchantKey(tuple);
KeyValPair<MerchantKey, Long> keyValPair = new KeyValPair<MerchantKey, Long>(key, tuple.amount);
outputPort.emit(keyValPair);
}
开发者ID:apache,项目名称:apex-malhar,代码行数:7,代码来源:MerchantTransactionBucketOperator.java
示例12: emitBankIdNumTuple
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
private void emitBankIdNumTuple(MerchantTransaction tuple, DefaultOutputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>> outputPort)
{
MerchantKey key = getMerchantKey(tuple);
KeyValPair<MerchantKey, String> keyValPair = new KeyValPair<MerchantKey, String>(key, tuple.bankIdNum);
outputPort.emit(new KeyValPair<KeyValPair<MerchantKey, String>, Integer>(keyValPair, 1));
}
开发者ID:apache,项目名称:apex-malhar,代码行数:7,代码来源:MerchantTransactionBucketOperator.java
示例13: getOutputPortOfGenerator
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
@Override
protected DefaultOutputPort getOutputPortOfGenerator( StringGeneratorInputOperator generator )
{
return generator.outputPort;
}
开发者ID:apache,项目名称:apex-malhar,代码行数:6,代码来源:KinesisStringOutputOperatorTest.java
示例14: getOutputPortOfGenerator
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
@Override
protected DefaultOutputPort getOutputPortOfGenerator(POJOTupleGenerateOperator generator)
{
return generator.outputPort;
}
开发者ID:apache,项目名称:apex-malhar,代码行数:6,代码来源:KinesisByteArrayOutputOperatorTest.java
示例15: setResultPort
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
private void setResultPort(DefaultOutputPort<String> resultPort)
{
this.resultPort = Preconditions.checkNotNull(resultPort);
}
开发者ID:apache,项目名称:apex-malhar,代码行数:5,代码来源:QueryManagerAsynchronous.java
示例16: getOutputPort
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
@Override
public DefaultOutputPort<String> getOutputPort()
{
return outputPort;
}
开发者ID:apache,项目名称:apex-malhar,代码行数:6,代码来源:PubSubWebSocketAppDataQuery.java
示例17: stressTest
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
@Test
public void stressTest() throws Exception
{
final int totalTuples = 100000;
final int batchSize = 100;
final double waitMillisProb = .01;
AppDataWindowEndQueueManager<MockQuery, Void> queueManager = new AppDataWindowEndQueueManager<MockQuery, Void>();
DefaultOutputPort<String> outputPort = new DefaultOutputPort<String>();
CollectorTestSink<MockResult> sink = new CollectorTestSink<MockResult>();
TestUtils.setSink(outputPort, sink);
MessageSerializerFactory msf = new MessageSerializerFactory(new ResultFormatter());
QueryManagerAsynchronous<MockQuery, Void, MutableLong, MockResult> queryManagerAsynch =
new QueryManagerAsynchronous<>(outputPort, queueManager, new NOPQueryExecutor(waitMillisProb), msf,
Thread.currentThread());
Thread producerThread = new Thread(new ProducerThread(queueManager, totalTuples, batchSize, waitMillisProb));
producerThread.start();
producerThread.setName("Producer Thread");
long startTime = System.currentTimeMillis();
queryManagerAsynch.setup(null);
int numWindows = 0;
for (; sink.collectedTuples.size() < totalTuples && ((System.currentTimeMillis() - startTime) < 60000);
numWindows++) {
queryManagerAsynch.beginWindow(numWindows);
Thread.sleep(100);
queryManagerAsynch.endWindow();
}
producerThread.stop();
queryManagerAsynch.teardown();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//Do Nothing
}
Assert.assertEquals(totalTuples, sink.collectedTuples.size());
}
开发者ID:apache,项目名称:apex-malhar,代码行数:48,代码来源:QueryManagerAsynchronousTest.java
示例18: populateDimensionsDAG
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
public void populateDimensionsDAG(DAG dag, Configuration conf, DefaultOutputPort<DimensionTuple> upstreamPort)
{
final String eventSchema = SchemaUtils.jarResourceFileToString(eventSchemaLocation);
// dimension
DimensionsComputationFlexibleSingleSchemaPOJO dimensions = dag.addOperator("DimensionsComputation",
DimensionsComputationFlexibleSingleSchemaPOJO.class);
// Set operator properties
// key expression
{
Map<String, String> keyToExpression = Maps.newHashMap();
keyToExpression.put("campaignId", DimensionTuple.CAMPAIGNID);
keyToExpression.put("time", DimensionTuple.EVENTTIME);
dimensions.setKeyToExpression(keyToExpression);
}
// aggregate expression
{
Map<String, String> valueToExpression = Maps.newHashMap();
valueToExpression.put("clicks", DimensionTuple.CLICKS);
valueToExpression.put("latency", DimensionTuple.LATENCY);
dimensions.setAggregateToExpression(valueToExpression);
}
// event schema
dimensions.setConfigurationSchemaJSON(eventSchema);
dimensions.setUnifier(new DimensionsComputationUnifierImpl<InputEvent, Aggregate>());
dag.setUnifierAttribute(dimensions.output, OperatorContext.MEMORY_MB, 10240);
dag.setInputPortAttribute(dimensions.input, Context.PortContext.PARTITION_PARALLEL, true);
// store
AppDataSingleSchemaDimensionStoreHDHT store = createStore(dag, conf, eventSchema);
store.setCacheWindowDuration(10000 * 5 / STREAMING_WINDOW_SIZE_MILLIS); //cache for 5 windows
dag.addStream("GenerateStream", upstreamPort, dimensions.input).setLocality(Locality.CONTAINER_LOCAL);
StoreStreamCodec codec = new StoreStreamCodec();
dag.setInputPortAttribute(store.input, PortContext.STREAM_CODEC, codec);
dag.addStream("DimensionalStream", dimensions.output, store.input);
if (includeQuery) {
createQuery(dag, conf, store);
// wsOut
PubSubWebSocketAppDataResult wsOut = createQueryResult(dag, conf, store);
dag.addStream("QueryResult", store.queryResult, wsOut.input);
} else {
DevNull devNull = new DevNull();
dag.addOperator("devNull", devNull);
dag.addStream("QueryResult", store.queryResult, devNull.data);
}
dag.setAttribute(DAGContext.STREAMING_WINDOW_SIZE_MILLIS, STREAMING_WINDOW_SIZE_MILLIS);
}
开发者ID:yahoo,项目名称:streaming-benchmarks,代码行数:60,代码来源:ApplicationDimensionComputation.java
示例19: getOutputPort
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
/**
* Gets the output port for queries.
* @return The output port for queries.
*/
DefaultOutputPort<QUERY_TYPE> getOutputPort();
开发者ID:apache,项目名称:apex-core,代码行数:6,代码来源:AppData.java
示例20: getOutputPortOfGenerator
import com.datatorrent.api.DefaultOutputPort; //导入依赖的package包/类
protected abstract DefaultOutputPort getOutputPortOfGenerator( G generator );
开发者ID:apache,项目名称:apex-malhar,代码行数:2,代码来源:KinesisOutputOperatorTest.java
注:本文中的com.datatorrent.api.DefaultOutputPort类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论