本文整理汇总了Java中org.apache.samza.task.MessageCollector类的典型用法代码示例。如果您正苦于以下问题:Java MessageCollector类的具体用法?Java MessageCollector怎么用?Java MessageCollector使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
MessageCollector类属于org.apache.samza.task包,在下文中一共展示了MessageCollector类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: process
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
try {
Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
parsedJsonObject.put("channel", event.getChannel());
parsedJsonObject.put("source", event.getSource());
parsedJsonObject.put("time", event.getTime());
collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
} catch (Exception e) {
System.err.println("Unable to parse line: " + event);
}
}
开发者ID:yoloanalytics,项目名称:bigdata-swamp,代码行数:19,代码来源:WikipediaParserStreamTask.java
示例2: handleMessage
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
@Override
public Collection<JM> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
try {
KeyValueStore<K, TimestampedValue<M>> thisState = thisPartialJoinFn.getState();
KeyValueStore<K, TimestampedValue<OM>> otherState = otherPartialJoinFn.getState();
K key = thisPartialJoinFn.getKey(message);
thisState.put(key, new TimestampedValue<>(message, clock.currentTimeMillis()));
TimestampedValue<OM> otherMessage = otherState.get(key);
long now = clock.currentTimeMillis();
if (otherMessage != null && otherMessage.getTimestamp() > now - ttlMs) {
JM joinResult = thisPartialJoinFn.apply(message, otherMessage.getValue());
return Collections.singletonList(joinResult);
}
} catch (Exception e) {
throw new SamzaException("Error handling message in PartialJoinOperatorImpl " + getOpImplId(), e);
}
return Collections.emptyList();
}
开发者ID:apache,项目名称:samza,代码行数:21,代码来源:PartialJoinOperatorImpl.java
示例3: handleTimer
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
@Override
public Collection<WindowPane<K, Object>> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
LOG.trace("Processing timer.");
List<WindowPane<K, Object>> results = new ArrayList<>();
List<TriggerKey<K>> keys = triggerScheduler.runPendingCallbacks();
for (TriggerKey<K> key : keys) {
TriggerImplHandler triggerImplHandler = triggers.get(key);
if (triggerImplHandler != null) {
Optional<WindowPane<K, Object>> maybeTriggeredPane = triggerImplHandler.onTimer(key, collector, coordinator);
maybeTriggeredPane.ifPresent(results::add);
}
}
LOG.trace("Triggered panes: " + results.size());
return results;
}
开发者ID:apache,项目名称:samza,代码行数:17,代码来源:WindowOperatorImpl.java
示例4: onMessage
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
public Optional<WindowPane<K, Object>> onMessage(TriggerKey<K> triggerKey, M message,
MessageCollector collector, TaskCoordinator coordinator) {
if (!isCancelled) {
LOG.trace("Forwarding callbacks for {}", message);
impl.onMessage(message, triggerScheduler);
if (impl.shouldFire()) {
// repeating trigger can trigger multiple times, So, clear the state to allow future triggerings.
if (impl instanceof RepeatingTriggerImpl) {
((RepeatingTriggerImpl<M, K>) impl).clear();
}
return onTriggerFired(triggerKey, collector, coordinator);
}
}
return Optional.empty();
}
开发者ID:apache,项目名称:samza,代码行数:17,代码来源:WindowOperatorImpl.java
示例5: aggregateEndOfStream
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
/**
* Aggregate {@link EndOfStreamMessage} from each ssp of the stream.
* Invoke onEndOfStream() if the stream reaches the end.
* @param eos {@link EndOfStreamMessage} object
* @param ssp system stream partition
* @param collector message collector
* @param coordinator task coordinator
*/
public final void aggregateEndOfStream(EndOfStreamMessage eos, SystemStreamPartition ssp, MessageCollector collector,
TaskCoordinator coordinator) {
LOG.info("Received end-of-stream message from task {} in {}", eos.getTaskName(), ssp);
eosStates.update(eos, ssp);
SystemStream stream = ssp.getSystemStream();
if (eosStates.isEndOfStream(stream)) {
LOG.info("Input {} reaches the end for task {}", stream.toString(), taskName.getTaskName());
onEndOfStream(collector, coordinator);
if (eosStates.allEndOfStream()) {
// all inputs have been end-of-stream, shut down the task
LOG.info("All input streams have reached the end for task {}", taskName.getTaskName());
coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
}
}
}
开发者ID:apache,项目名称:samza,代码行数:27,代码来源:OperatorImpl.java
示例6: joinFnInitAndClose
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
@Test
public void joinFnInitAndClose() throws Exception {
TestJoinFunction joinFn = new TestJoinFunction();
TestJoinStreamApplication app = new TestJoinStreamApplication(joinFn);
StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
assertEquals(1, joinFn.getNumInitCalls());
MessageCollector messageCollector = mock(MessageCollector.class);
// push messages to first stream
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
// close should not be called till now
assertEquals(0, joinFn.getNumCloseCalls());
sot.close();
// close should be called from sot.close()
assertEquals(1, joinFn.getNumCloseCalls());
}
开发者ID:apache,项目名称:samza,代码行数:19,代码来源:TestJoinOperator.java
示例7: joinRetainsLatestMessageForKey
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
@Test
public void joinRetainsLatestMessageForKey() throws Exception {
TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction());
StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
// push messages to first stream
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
// push messages to first stream again with same keys but different values
numbers.forEach(n -> sot.process(new FirstStreamIME(n, 2 * n), messageCollector, taskCoordinator));
// push messages to second stream with same key
numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
int outputSum = output.stream().reduce(0, (s, m) -> s + m);
assertEquals(165, outputSum); // should use latest messages in the first stream
}
开发者ID:apache,项目名称:samza,代码行数:18,代码来源:TestJoinOperator.java
示例8: joinRetainsLatestMessageForKeyReverse
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
@Test
public void joinRetainsLatestMessageForKeyReverse() throws Exception {
TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction());
StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
// push messages to second stream
numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
// push messages to second stream again with same keys but different values
numbers.forEach(n -> sot.process(new SecondStreamIME(n, 2 * n), messageCollector, taskCoordinator));
// push messages to first stream with same key
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
int outputSum = output.stream().reduce(0, (s, m) -> s + m);
assertEquals(165, outputSum); // should use latest messages in the second stream
}
开发者ID:apache,项目名称:samza,代码行数:18,代码来源:TestJoinOperator.java
示例9: joinRetainsMatchedMessages
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
@Test
public void joinRetainsMatchedMessages() throws Exception {
TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction());
StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
// push messages to first stream
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
// push messages to second stream with same key
numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
int outputSum = output.stream().reduce(0, (s, m) -> s + m);
assertEquals(110, outputSum);
output.clear();
// push messages to first stream with same keys once again.
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
int newOutputSum = output.stream().reduce(0, (s, m) -> s + m);
assertEquals(110, newOutputSum); // should produce the same output as before
}
开发者ID:apache,项目名称:samza,代码行数:23,代码来源:TestJoinOperator.java
示例10: joinRetainsMatchedMessagesReverse
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
@Test
public void joinRetainsMatchedMessagesReverse() throws Exception {
TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction());
StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
// push messages to first stream
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
// push messages to second stream with same key
numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
int outputSum = output.stream().reduce(0, (s, m) -> s + m);
assertEquals(110, outputSum);
output.clear();
// push messages to second stream with same keys once again.
numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
int newOutputSum = output.stream().reduce(0, (s, m) -> s + m);
assertEquals(110, newOutputSum); // should produce the same output as before
}
开发者ID:apache,项目名称:samza,代码行数:23,代码来源:TestJoinOperator.java
示例11: joinRemovesExpiredMessages
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
@Test
public void joinRemovesExpiredMessages() throws Exception {
TestClock testClock = new TestClock();
TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction());
StreamOperatorTask sot = createStreamOperatorTask(testClock, app);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
// push messages to first stream
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
testClock.advanceTime(JOIN_TTL.plus(Duration.ofMinutes(1))); // 1 minute after ttl
sot.window(messageCollector, taskCoordinator); // should expire first stream messages
// push messages to second stream with same key
numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
assertTrue(output.isEmpty());
}
开发者ID:apache,项目名称:samza,代码行数:20,代码来源:TestJoinOperator.java
示例12: joinRemovesExpiredMessagesReverse
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
@Test
public void joinRemovesExpiredMessagesReverse() throws Exception {
TestClock testClock = new TestClock();
TestJoinStreamApplication app = new TestJoinStreamApplication(new TestJoinFunction());
StreamOperatorTask sot = createStreamOperatorTask(testClock, app);
List<Integer> output = new ArrayList<>();
MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
// push messages to second stream
numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
testClock.advanceTime(JOIN_TTL.plus(Duration.ofMinutes(1))); // 1 minute after ttl
sot.window(messageCollector, taskCoordinator); // should expire second stream messages
// push messages to first stream with same key
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
assertTrue(output.isEmpty());
}
开发者ID:apache,项目名称:samza,代码行数:20,代码来源:TestJoinOperator.java
示例13: testStreamOperator
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
@Test
@SuppressWarnings("unchecked")
public void testStreamOperator() {
StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
when(mockOp.getTransformFn()).thenReturn(txfmFn);
Config mockConfig = mock(Config.class);
TaskContext mockContext = mock(TaskContext.class);
StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl =
new StreamOperatorImpl<>(mockOp, mockConfig, mockContext);
TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
Collection<TestOutputMessageEnvelope> mockOutputs = mock(Collection.class);
when(txfmFn.apply(inMsg)).thenReturn(mockOutputs);
MessageCollector mockCollector = mock(MessageCollector.class);
TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
Collection<TestOutputMessageEnvelope> results = opImpl
.handleMessage(inMsg, mockCollector, mockCoordinator);
verify(txfmFn, times(1)).apply(inMsg);
assertEquals(results, mockOutputs);
}
开发者ID:apache,项目名称:samza,代码行数:21,代码来源:TestStreamOperatorImpl.java
示例14: testSinkOperatorClose
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
@Test
public void testSinkOperatorClose() {
TestOutputMessageEnvelope mockMsg = mock(TestOutputMessageEnvelope.class);
MessageCollector mockCollector = mock(MessageCollector.class);
TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
SinkFunction<TestOutputMessageEnvelope> sinkFn = mock(SinkFunction.class);
SinkOperatorImpl<TestOutputMessageEnvelope> sinkImpl = createSinkOperator(sinkFn);
sinkImpl.handleMessage(mockMsg, mockCollector, mockCoordinator);
verify(sinkFn, times(1)).apply(mockMsg, mockCollector, mockCoordinator);
// ensure that close is not called yet
verify(sinkFn, times(0)).close();
sinkImpl.handleClose();
// ensure that close is called once from handleClose()
verify(sinkFn, times(1)).close();
}
开发者ID:apache,项目名称:samza,代码行数:19,代码来源:TestSinkOperatorImpl.java
示例15: testNonKeyedTumblingWindowsDiscardingMode
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
@Test
public void testNonKeyedTumblingWindowsDiscardingMode() throws Exception {
StreamApplication sgb = new TumblingWindowStreamApplication(AccumulationMode.DISCARDING,
Duration.ofSeconds(1), Triggers.repeat(Triggers.count(1000)));
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
TestClock testClock = new TestClock();
StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
task.init(config, taskContext);
MessageCollector messageCollector =
envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
Assert.assertEquals(windowPanes.size(), 0);
integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
Assert.assertEquals(windowPanes.size(), 0);
testClock.advanceTime(Duration.ofSeconds(1));
Assert.assertEquals(windowPanes.size(), 0);
task.window(messageCollector, taskCoordinator);
Assert.assertEquals(windowPanes.size(), 1);
Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 9);
}
开发者ID:apache,项目名称:samza,代码行数:26,代码来源:TestWindowOperator.java
示例16: testTumblingAggregatingWindowsDiscardingMode
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
@Test
public void testTumblingAggregatingWindowsDiscardingMode() throws Exception {
when(taskContext.getStore("jobName-jobId-window-w1"))
.thenReturn(new TestInMemoryStore<>(new TimeSeriesKeySerde(new IntegerSerde()), new IntegerSerde()));
StreamApplication sgb = new AggregateTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
List<WindowPane<Integer, Integer>> windowPanes = new ArrayList<>();
TestClock testClock = new TestClock();
StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
task.init(config, taskContext);
MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Integer>) envelope.getMessage());
integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
testClock.advanceTime(Duration.ofSeconds(1));
task.window(messageCollector, taskCoordinator);
Assert.assertEquals(windowPanes.size(), 5);
Assert.assertEquals(windowPanes.get(0).getMessage(), new Integer(2));
Assert.assertEquals(windowPanes.get(1).getMessage(), new Integer(2));
Assert.assertEquals(windowPanes.get(2).getMessage(), new Integer(2));
Assert.assertEquals(windowPanes.get(3).getMessage(), new Integer(2));
Assert.assertEquals(windowPanes.get(4).getMessage(), new Integer(1));
}
开发者ID:apache,项目名称:samza,代码行数:26,代码来源:TestWindowOperator.java
示例17: testOnMessageUpdatesMetrics
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
@Test
public void testOnMessageUpdatesMetrics() {
TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
ReadableMetricsRegistry mockMetricsRegistry = mock(ReadableMetricsRegistry.class);
when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry);
Counter mockCounter = mock(Counter.class);
Timer mockTimer = mock(Timer.class);
when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockCounter);
when(mockMetricsRegistry.newTimer(anyString(), anyString())).thenReturn(mockTimer);
Object mockTestOpImplOutput = mock(Object.class);
OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
opImpl.init(mock(Config.class), mockTaskContext);
// send a message to this operator
MessageCollector mockCollector = mock(MessageCollector.class);
TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
opImpl.onMessage(mock(Object.class), mockCollector, mockCoordinator);
// verify that it updates message count and timer metrics
verify(mockCounter, times(1)).inc();
verify(mockTimer, times(1)).update(anyLong());
}
开发者ID:apache,项目名称:samza,代码行数:24,代码来源:TestOperatorImpl.java
示例18: testOnTimerUpdatesMetrics
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
@Test
public void testOnTimerUpdatesMetrics() {
TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
ReadableMetricsRegistry mockMetricsRegistry = mock(ReadableMetricsRegistry.class);
when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry);
Counter mockMessageCounter = mock(Counter.class);
Timer mockTimer = mock(Timer.class);
when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockMessageCounter);
when(mockMetricsRegistry.newTimer(anyString(), anyString())).thenReturn(mockTimer);
Object mockTestOpImplOutput = mock(Object.class);
OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
opImpl.init(mock(Config.class), mockTaskContext);
// send a message to this operator
MessageCollector mockCollector = mock(MessageCollector.class);
TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
opImpl.onTimer(mockCollector, mockCoordinator);
// verify that it updates metrics
verify(mockMessageCounter, times(0)).inc();
verify(mockTimer, times(1)).update(anyLong());
}
开发者ID:apache,项目名称:samza,代码行数:24,代码来源:TestOperatorImpl.java
示例19: process
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
if (envelope.getSystemStreamPartition().getStream().equals("epoch")) {
int newEpoch = Integer.parseInt((String) envelope.getMessage());
logger.info("New epoch in message - " + newEpoch);
Integer epoch = getInt(EPOCH);
if (epoch == null || newEpoch == epoch)
return;
if (newEpoch < epoch)
throw new IllegalArgumentException("Got new epoch " + newEpoch + " which is less than current epoch " + epoch);
// it's a new era, reset current epoch and count
logger.info("Epoch: " + newEpoch);
this.state.put(EPOCH, Integer.toString(newEpoch));
this.state.put(COUNT, "0");
coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER);
}
}
开发者ID:apache,项目名称:samza,代码行数:20,代码来源:Emitter.java
示例20: send
import org.apache.samza.task.MessageCollector; //导入依赖的package包/类
public void send(MessageCollector collector, ContentEvent contentEvent) {
if (actualSystemStream == null)
this.initSystemStream();
switch (this.scheme) {
case SHUFFLE:
this.sendShuffle(collector, contentEvent);
break;
case GROUP_BY_KEY:
this.sendGroupByKey(collector, contentEvent);
break;
case BROADCAST:
this.sendBroadcast(collector, contentEvent);
break;
}
}
开发者ID:apache,项目名称:incubator-samoa,代码行数:17,代码来源:SamzaStream.java
注:本文中的org.apache.samza.task.MessageCollector类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论