本文整理汇总了Java中org.apache.samza.system.SystemStream类的典型用法代码示例。如果您正苦于以下问题:Java SystemStream类的具体用法?Java SystemStream怎么用?Java SystemStream使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
SystemStream类属于org.apache.samza.system包,在下文中一共展示了SystemStream类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: process
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Wraps the incoming message in a {@code WikipediaFeedEvent}, parses its raw event text and
 * sends the parsed fields, plus the event's channel, source and time, to the
 * "wikipedia-edits" stream of the "kafka" system.
 *
 * <p>Any exception raised while parsing or sending is reported on stderr and the event is dropped.
 */
@SuppressWarnings("unchecked")
@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
  WikipediaFeedEvent event = new WikipediaFeedEvent((Map<String, Object>) envelope.getMessage());

  try {
    Map<String, Object> parsed = parse(event.getRawEvent());

    // Enrich the parsed payload with the envelope-level attributes of the event.
    parsed.put("channel", event.getChannel());
    parsed.put("source", event.getSource());
    parsed.put("time", event.getTime());

    SystemStream output = new SystemStream("kafka", "wikipedia-edits");
    collector.send(new OutgoingMessageEnvelope(output, parsed));
  } catch (Exception e) {
    System.err.println("Unable to parse line: " + event);
  }
}
开发者ID:yoloanalytics,项目名称:bigdata-swamp,代码行数:19,代码来源:WikipediaParserStreamTask.java
示例2: StreamPartitionCountMonitor
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Default constructor.
 *
 * <p>Captures the initial metadata of the monitored streams and registers one
 * partition-count gauge per stream (initial value 0) in the "job-coordinator" group.
 *
 * @param streamsToMonitor  a set of SystemStreams to monitor.
 * @param metadataCache     the metadata cache which will be used to fetch metadata for partition counts.
 * @param metrics           the metrics registry to which the metrics should be added.
 * @param monitorPeriodMs   the period at which the monitor will run in milliseconds.
 * @param monitorCallback   the callback method to be invoked when partition count changes are detected
 */
public StreamPartitionCountMonitor(Set<SystemStream> streamsToMonitor, StreamMetadataCache metadataCache,
    MetricsRegistry metrics, int monitorPeriodMs, Callback monitorCallback) {
  this.streamsToMonitor = streamsToMonitor;
  this.metadataCache = metadataCache;
  this.monitorPeriodMs = monitorPeriodMs;
  this.initialMetadata = getMetadata(streamsToMonitor, metadataCache);
  this.callbackMethod = monitorCallback;

  // Pre-populate the gauges
  Map<SystemStream, Gauge<Integer>> mutableGauges = new HashMap<>();
  for (SystemStream systemStream : initialMetadata.keySet()) {
    // Typed gauge (not a raw Gauge) so the put below needs no unchecked conversion.
    Gauge<Integer> gauge = metrics.newGauge("job-coordinator",
        String.format("%s-%s-partitionCount", systemStream.getSystem(), systemStream.getStream()), 0);
    mutableGauges.put(systemStream, gauge);
  }
  gauges = Collections.unmodifiableMap(mutableGauges);
}
开发者ID:apache,项目名称:samza,代码行数:28,代码来源:StreamPartitionCountMonitor.java
示例3: getPartitionCountMonitor
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Creates the monitor that watches every input stream of the job for partition count changes.
 *
 * <p>When a change is reported, a {@code PartitionChangeException} is recorded as the
 * coordinator exception; jobs with durable stores are additionally marked FAILED.
 *
 * @param config the job configuration
 * @return a monitor over all configured input streams
 * @throws SamzaException if the configuration declares no input streams
 */
private StreamPartitionCountMonitor getPartitionCountMonitor(Config config) {
  Map<String, SystemAdmin> admins = new JavaSystemConfig(config).getSystemAdmins();
  StreamMetadataCache metadataCache =
      new StreamMetadataCache(Util.javaMapAsScalaMap(admins), 0, SystemClock.instance());

  Set<SystemStream> inputStreams = new TaskConfigJava(config).getAllInputStreams();
  if (inputStreams.isEmpty()) {
    throw new SamzaException("Input streams to a job can not be empty.");
  }

  return new StreamPartitionCountMonitor(
      inputStreams,
      metadataCache,
      metrics,
      new JobConfig(config).getMonitorPartitionChangeFrequency(),
      changedStreams -> {
        // Fail the jobs with durable state store. Otherwise, application state.status remains
        // UNDEFINED s.t. YARN job will be restarted
        if (hasDurableStores) {
          log.error("Input topic partition count changed in a job with durable state. Failing the job.");
          state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
        }
        coordinatorException = new PartitionChangeException("Input topic partition count changes detected.");
      });
}
开发者ID:apache,项目名称:samza,代码行数:23,代码来源:ClusterBasedJobCoordinator.java
示例4: aggregateEndOfStream
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Records an {@link EndOfStreamMessage} for the given partition and, once the partition's
 * whole stream has ended, invokes onEndOfStream(). When every input has ended, the current
 * task is committed and then shut down.
 *
 * @param eos {@link EndOfStreamMessage} object
 * @param ssp system stream partition the message arrived on
 * @param collector message collector
 * @param coordinator task coordinator
 */
public final void aggregateEndOfStream(EndOfStreamMessage eos, SystemStreamPartition ssp, MessageCollector collector,
    TaskCoordinator coordinator) {
  LOG.info("Received end-of-stream message from task {} in {}", eos.getTaskName(), ssp);
  eosStates.update(eos, ssp);

  SystemStream endedStream = ssp.getSystemStream();
  if (!eosStates.isEndOfStream(endedStream)) {
    // Other partitions/producers of this stream are still active; nothing more to do yet.
    return;
  }

  LOG.info("Input {} reaches the end for task {}", endedStream.toString(), taskName.getTaskName());
  onEndOfStream(collector, coordinator);

  if (!eosStates.allEndOfStream()) {
    return;
  }

  // all inputs have been end-of-stream, shut down the task
  LOG.info("All input streams have reached the end for task {}", taskName.getTaskName());
  coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
  coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
}
开发者ID:apache,项目名称:samza,代码行数:27,代码来源:OperatorImpl.java
示例5: process
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Routes an incoming message envelope to the {@link InputOperatorImpl} registered for the
 * envelope's {@link SystemStream}, dispatching on the kind of message it carries.
 * <p>
 * Envelopes whose stream has no registered input operator are ignored. From the input operator
 * on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates its transformed
 * output to its chained {@link org.apache.samza.operators.impl.OperatorImpl}s itself.
 *
 * @param ime incoming message envelope to process
 * @param collector the collector to send messages with
 * @param coordinator the coordinator to request commits or shutdown
 */
@Override
public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
  SystemStream source = ime.getSystemStreamPartition().getSystemStream();
  InputOperatorImpl inputOperator = operatorImplGraph.getInputOperator(source);
  if (inputOperator == null) {
    return;
  }

  Object message = ime.getMessage();
  switch (MessageType.of(message)) {
    case USER_MESSAGE:
      inputOperator.onMessage(KV.of(ime.getKey(), message), collector, coordinator);
      break;

    case END_OF_STREAM:
      inputOperator.aggregateEndOfStream(
          (EndOfStreamMessage) message, ime.getSystemStreamPartition(), collector, coordinator);
      break;

    case WATERMARK:
      inputOperator.aggregateWatermark(
          (WatermarkMessage) message, ime.getSystemStreamPartition(), collector, coordinator);
      break;

    default:
      // Other message types are not handled here.
      break;
  }
}
开发者ID:apache,项目名称:samza,代码行数:34,代码来源:StreamOperatorTask.java
示例6: testCoordinatorStreamSystemConsumer
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Walks the coordinator stream consumer through register, start, bootstrap and stop, checking
 * that each step is forwarded to the underlying mock consumer and that the config is only
 * readable after bootstrapping.
 */
@Test
public void testCoordinatorStreamSystemConsumer() {
  SystemStream coordinatorStream = new SystemStream("system", "stream");
  MockSystemConsumer mockConsumer =
      new MockSystemConsumer(new SystemStreamPartition(coordinatorStream, new Partition(0)));
  CoordinatorStreamSystemConsumer coordinatorConsumer = new CoordinatorStreamSystemConsumer(
      coordinatorStream, mockConsumer, new SinglePartitionWithoutOffsetsSystemAdmin());

  // register
  assertEquals(0, mockConsumer.getRegisterCount());
  coordinatorConsumer.register();
  assertEquals(1, mockConsumer.getRegisterCount());

  // start
  assertFalse(mockConsumer.isStarted());
  coordinatorConsumer.start();
  assertTrue(mockConsumer.isStarted());

  // config must not be readable before bootstrap
  try {
    coordinatorConsumer.getConfig();
    fail("Should have failed when retrieving config before bootstrapping.");
  } catch (SamzaException e) {
    // Expected.
  }

  // bootstrap, then compare against the expected config
  coordinatorConsumer.bootstrap();
  Map<String, String> expectedConfig = new LinkedHashMap<>();
  expectedConfig.put("job.id", "1234");
  assertEquals(expectedConfig, coordinatorConsumer.getConfig());

  // stop
  assertFalse(mockConsumer.isStopped());
  coordinatorConsumer.stop();
  assertTrue(mockConsumer.isStopped());
}
开发者ID:apache,项目名称:samza,代码行数:26,代码来源:TestCoordinatorStreamSystemConsumer.java
示例7: testCoordinatorStreamSystemConsumerRegisterOnceOnly
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Verifies that calling register() a second time, after the consumer has been started, does not
 * register the underlying system consumer again.
 */
@Test
public void testCoordinatorStreamSystemConsumerRegisterOnceOnly() throws Exception {
  SystemStream systemStream = new SystemStream("system", "stream");
  MockSystemConsumer systemConsumer = new MockSystemConsumer(new SystemStreamPartition(systemStream, new Partition(0)));
  CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(systemStream, systemConsumer, new SinglePartitionWithoutOffsetsSystemAdmin());

  assertEquals(0, systemConsumer.getRegisterCount());
  consumer.register();
  assertEquals(1, systemConsumer.getRegisterCount());

  assertFalse(systemConsumer.isStarted());
  consumer.start();
  assertTrue(systemConsumer.isStarted());

  // A repeated register() must be a no-op for the wrapped consumer.
  consumer.register();
  assertEquals(1, systemConsumer.getRegisterCount());
}
开发者ID:apache,项目名称:samza,代码行数:17,代码来源:TestCoordinatorStreamSystemConsumer.java
示例8: testBroadcastChain
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Attaches a filter and a map directly to the same input stream and checks that the input
 * operator ends up with exactly those two operators registered.
 */
@Test
public void testBroadcastChain() {
  ApplicationRunner runner = mock(ApplicationRunner.class);
  StreamSpec inputSpec = new StreamSpec("input", "input-stream", "input-system");
  when(runner.getStreamSpec(eq("input"))).thenReturn(inputSpec);
  StreamGraphImpl graph = new StreamGraphImpl(runner, mock(Config.class));

  // Two operators hang off the same input stream (broadcast).
  MessageStream<Object> input = graph.getInputStream("input");
  input.filter(mock(FilterFunction.class));
  input.map(mock(MapFunction.class));

  TaskContextImpl taskContext = mock(TaskContextImpl.class);
  when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
  OperatorImplGraph implGraph =
      new OperatorImplGraph(graph, mock(Config.class), taskContext, mock(Clock.class));

  InputOperatorImpl inputOperator =
      implGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
  assertEquals(2, inputOperator.registeredOperators.size());

  boolean hasFilter = inputOperator.registeredOperators.stream()
      .anyMatch(op -> ((OperatorImpl) op).getOperatorSpec().getOpCode() == OpCode.FILTER);
  assertTrue(hasFilter);

  boolean hasMap = inputOperator.registeredOperators.stream()
      .anyMatch(op -> ((OperatorImpl) op).getOperatorSpec().getOpCode() == OpCode.MAP);
  assertTrue(hasMap);
}
开发者ID:apache,项目名称:samza,代码行数:23,代码来源:TestOperatorImplGraph.java
示例9: SqlSystemStreamConfig
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
public SqlSystemStreamConfig(String systemName, String streamName, List<String> sourceParts,
Config systemConfig) {
HashMap<String, String> streamConfigs = new HashMap<>(systemConfig);
this.systemName = systemName;
this.streamName = streamName;
this.source = getSourceFromSourceParts(sourceParts);
this.sourceParts = sourceParts;
this.systemStream = new SystemStream(systemName, streamName);
samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
Validate.notEmpty(samzaRelConverterName,
String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
relSchemaProviderName = streamConfigs.get(CFG_REL_SCHEMA_PROVIDER);
// Removing the Samza SQL specific configs to get the remaining Samza configs.
streamConfigs.remove(CFG_SAMZA_REL_CONVERTER);
streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER);
config = new MapConfig(streamConfigs);
}
开发者ID:apache,项目名称:samza,代码行数:24,代码来源:SqlSystemStreamConfig.java
示例10: TestAvroRelConversion
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Sets up schema providers and Avro rel converters for the "complexRecord" and "simpleRecord"
 * test streams, with both schemas supplied through config properties.
 */
public TestAvroRelConversion() {
  SystemStream complexStream = new SystemStream("test", "complexRecord");
  SystemStream simpleStream = new SystemStream("test", "simpleRecord");

  // One schema property per stream, keyed by the provider factory's source-schema pattern.
  String complexSchemaKey = String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
      complexStream.getSystem(), complexStream.getStream());
  String simpleSchemaKey = String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
      simpleStream.getSystem(), simpleStream.getStream());

  Map<String, String> schemaProps = new HashMap<>();
  schemaProps.put(complexSchemaKey, ComplexRecord.SCHEMA$.toString());
  schemaProps.put(simpleSchemaKey, SimpleRecord.SCHEMA$.toString());

  ConfigBasedAvroRelSchemaProviderFactory providerFactory = new ConfigBasedAvroRelSchemaProviderFactory();
  complexRecordSchemProvider =
      (AvroRelSchemaProvider) providerFactory.create(complexStream, new MapConfig(schemaProps));
  simpleRecordSchemaProvider =
      (AvroRelSchemaProvider) providerFactory.create(simpleStream, new MapConfig(schemaProps));

  complexRecordAvroRelConverter =
      new AvroRelConverter(complexStream, complexRecordSchemProvider, new MapConfig());
  simpleRecordAvroRelConverter =
      new AvroRelConverter(simpleStream, simpleRecordSchemaProvider, new MapConfig());
}
开发者ID:apache,项目名称:samza,代码行数:19,代码来源:TestAvroRelConversion.java
示例11: start
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Reads the error-handling settings from config and prepares the optional dropped-message
 * stream and its producer.
 *
 * @throws ConfigException if the max drop ratio is not strictly between 0.0 and 1.0, or if
 *     dropped-message logging is enabled without a configured stream name
 */
public void start() {
  serde = new JsonSerdeFactory().getSerde(SYSTEM_PRODUCER_SOURCE, config);
  dropOnError = config.getBoolean(ConfigConst.DROP_ON_ERROR, false);

  dropMaxRatio = config.getDouble(ConfigConst.DROP_MAX_RATIO, 0.5);
  if (dropMaxRatio <= 0.0 || dropMaxRatio >= 1.0) {
    String rangeMessage = String.format("%s must be between 0.0 and 1.0", ConfigConst.DROP_MAX_RATIO);
    throw new ConfigException(rangeMessage);
  }

  boolean logDroppedMsgs = config.getBoolean(ConfigConst.ENABLE_DROPPED_MESSAGE_LOG, false);

  // The stream is only present when a stream name has been configured.
  droppedMsgStream = Optional.ofNullable(config.get(ConfigConst.DROPPED_MESSAGE_STREAM_NAME))
      .map(name -> new SystemStream(ConfigConst.DEFAULT_SYSTEM_NAME, name));

  if (logDroppedMsgs && !droppedMsgStream.isPresent()) {
    String missingStreamMessage = String.format(
        "No stream configured for dropped messages. Either set %s=false or %s",
        ConfigConst.ENABLE_DROPPED_MESSAGE_LOG,
        ConfigConst.DROPPED_MESSAGE_STREAM_NAME);
    throw new ConfigException(missingStreamMessage);
  }

  // A producer is only created when there is a stream to write dropped messages to.
  systemProducer = droppedMsgStream.map(ignoredStream -> getSystemProducer(config));
  logDroppedMsgConfig();
}
开发者ID:quantiply,项目名称:rico,代码行数:21,代码来源:ErrorHandler.java
示例12: window
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Scans the whole store for timestamp entries whose cart has been abandoned, emits an
 * {@code AbandonedCartEvent} for each to the "derived-events-ch04" stream of the "kafka"
 * system, and resets the corresponding shopper.
 *
 * <p>The store iterator is always closed, even if processing an entry throws.
 *
 * @param collector   collector used to send the derived events
 * @param coordinator task coordinator (unused here)
 */
@Override
public void window(MessageCollector collector,
    TaskCoordinator coordinator) {
  KeyValueIterator<String, String> entries = store.all();
  try {
    // Walk every entry currently in the store.
    while (entries.hasNext()) {
      Entry<String, String> entry = entries.next();
      String key = entry.getKey();
      String value = entry.getValue();

      // Only timestamp entries can indicate an abandoned cart.
      if (isTimestampKey(key) && Cart.isAbandoned(value)) {
        String shopper = extractShopper(key);
        String cart = store.get(asCartKey(shopper));
        AbandonedCartEvent event =
            new AbandonedCartEvent(shopper, cart);
        // Publish the derived event downstream.
        collector.send(new OutgoingMessageEnvelope(
            new SystemStream("kafka", "derived-events-ch04"), event));
        resetShopper(shopper);
      }
    }
  } finally {
    // Store iterators hold underlying resources until closed.
    entries.close();
  }
}
开发者ID:alexanderdean,项目名称:Unified-Log-Processing,代码行数:23,代码来源:AbandonedCartStreamTask.java
示例13: window
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Publishes the statistics accumulated during the current window to the "wikipedia-stats"
 * stream of the "kafka" system, then starts a fresh window with zeroed counters.
 *
 * @param collector   collector used to send the stats message
 * @param coordinator task coordinator (unused here)
 */
@Override
public void window(MessageCollector collector, TaskCoordinator coordinator) {
  // Snapshot the per-window numbers plus the all-time edit count kept in the store.
  counts.put("edits", edits);
  counts.put("bytes-added", byteDiff);
  counts.put("unique-titles", titles.size());
  counts.put("edits-all-time", store.get("count-edits-all-time"));

  SystemStream statsStream = new SystemStream("kafka", "wikipedia-stats");
  collector.send(new OutgoingMessageEnvelope(statsStream, counts));

  // Reset counts after windowing.
  counts = new HashMap<>();
  titles = new HashSet<>();
  byteDiff = 0;
  edits = 0;
}
开发者ID:yoloanalytics,项目名称:bigdata-swamp,代码行数:16,代码来源:WikipediaStatsStreamTask.java
示例14: getInputStreamPartitions
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Resolves every input stream named in the task config to its partitions, using the stream
 * metadata cache.
 *
 * @return the set of all SystemStreamPartitions across the configured input streams
 */
private Set<SystemStreamPartition> getInputStreamPartitions() {
  scala.collection.immutable.Set<SystemStream> configuredInputs = new TaskConfig(config).getInputStreams();

  // Get the set of partitions for each SystemStream from the stream metadata
  return JavaConverters
      .mapAsJavaMapConverter(streamMetadataCache.getStreamMetadata(configuredInputs, true))
      .asJava()
      .entrySet()
      .stream()
      .flatMap(metadataEntry -> mapSSMToSSP(metadataEntry))
      .collect(Collectors.toSet());
}
开发者ID:apache,项目名称:samza,代码行数:19,代码来源:AzureJobCoordinator.java
示例15: mapSSMToSSP
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Expands one (stream, metadata) entry into a SystemStreamPartition for every partition listed
 * in the metadata.
 *
 * @param ssMs entry pairing a system stream with its metadata
 * @return a stream of the partitions of that system stream
 */
private Stream<SystemStreamPartition> mapSSMToSSP(Map.Entry<SystemStream, SystemStreamMetadata> ssMs) {
  SystemStream systemStream = ssMs.getKey();
  SystemStreamMetadata metadata = ssMs.getValue();
  return metadata.getSystemStreamPartitionMetadata().keySet().stream()
      .map(p -> new SystemStreamPartition(systemStream, p));
}
开发者ID:apache,项目名称:samza,代码行数:8,代码来源:AzureJobCoordinator.java
示例16: CoordinatorStreamSystemConsumer
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Creates a consumer bound to partition 0 of the given coordinator stream. The consumer starts
 * with an empty config map and is not bootstrapped.
 *
 * @param coordinatorSystemStream the coordinator stream to read from
 * @param systemConsumer          the underlying system consumer
 * @param systemAdmin             the admin for the coordinator stream's system
 * @param keySerde                serde for message keys
 * @param messageSerde            serde for message payloads
 */
public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) {
  this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
  this.systemConsumer = systemConsumer;
  this.systemAdmin = systemAdmin;
  // Diamond instead of the raw HashMap type, so the assignment is type-checked.
  this.configMap = new HashMap<>();
  this.isBootstrapped = false;
  this.keySerde = keySerde;
  this.messageSerde = messageSerde;
}
开发者ID:apache,项目名称:samza,代码行数:10,代码来源:CoordinatorStreamSystemConsumer.java
示例17: CoordinatorStreamSystemProducer
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Creates a producer for the given coordinator stream; all arguments are stored as-is.
 *
 * @param systemStream   the coordinator stream to write to
 * @param systemProducer the underlying system producer
 * @param systemAdmin    the admin for the coordinator stream's system
 * @param keySerde       serde for message keys
 * @param messageSerde   serde for message payloads
 */
public CoordinatorStreamSystemProducer(
    SystemStream systemStream,
    SystemProducer systemProducer,
    SystemAdmin systemAdmin,
    Serde<List<?>> keySerde,
    Serde<Map<String, Object>> messageSerde) {
  // Serdes
  this.keySerde = keySerde;
  this.messageSerde = messageSerde;

  // Stream and the components used to access it
  this.systemStream = systemStream;
  this.systemAdmin = systemAdmin;
  this.systemProducer = systemProducer;
}
开发者ID:apache,项目名称:samza,代码行数:8,代码来源:CoordinatorStreamSystemProducer.java
示例18: getMetadata
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Looks up the metadata of the given system streams in the metadata cache, converting the
 * Java set to a Scala set for the call and the Scala result map back to a Java map.
 *
 * @param streamsToMonitor the set of system streams for which the metadata is needed.
 * @param metadataCache    the metadata cache which will be used to fetch metadata.
 * @return a map from each system stream to its metadata.
 */
private static Map<SystemStream, SystemStreamMetadata> getMetadata(Set<SystemStream> streamsToMonitor,
    StreamMetadataCache metadataCache) {
  // Java -> Scala (immutable) set, as expected by the cache.
  scala.collection.immutable.Set<SystemStream> scalaStreams =
      JavaConverters.asScalaSetConverter(streamsToMonitor).asScala().toSet();

  // Scala -> Java map for the caller.
  return JavaConverters.mapAsJavaMapConverter(metadataCache.getStreamMetadata(scalaStreams, true)).asJava();
}
开发者ID:apache,项目名称:samza,代码行数:19,代码来源:StreamPartitionCountMonitor.java
示例19: updateExistingPartitions
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Fetches the partition counts of the job graph's source and sink streams, one lookup per
 * system, and stores each count on the matching StreamEdge.
 *
 * @param jobGraph {@link JobGraph} whose sources and sinks are updated
 * @param streamManager the {@link StreamManager} to interface with the streams.
 */
/* package private */ static void updateExistingPartitions(JobGraph jobGraph, StreamManager streamManager) {
  Set<StreamEdge> existingStreams = new HashSet<>();
  existingStreams.addAll(jobGraph.getSources());
  existingStreams.addAll(jobGraph.getSinks());

  // group the StreamEdge(s) based on the system name
  Multimap<String, StreamEdge> edgesBySystem = HashMultimap.create();
  for (StreamEdge edge : existingStreams) {
    edgesBySystem.put(edge.getSystemStream().getSystem(), edge);
  }

  for (Map.Entry<String, Collection<StreamEdge>> systemEntry : edgesBySystem.asMap().entrySet()) {
    String systemName = systemEntry.getKey();

    // create the stream name to StreamEdge mapping for this system
    Map<String, StreamEdge> edgeByStreamName = new HashMap<>();
    for (StreamEdge edge : systemEntry.getValue()) {
      edgeByStreamName.put(edge.getSystemStream().getStream(), edge);
    }

    // retrieve the partition counts for the streams in this system
    Map<String, Integer> partitionCounts =
        streamManager.getStreamPartitionCounts(systemName, edgeByStreamName.keySet());

    // set the partitions of a stream to its StreamEdge
    for (Map.Entry<String, Integer> countEntry : partitionCounts.entrySet()) {
      String stream = countEntry.getKey();
      Integer partitionCount = countEntry.getValue();
      edgeByStreamName.get(stream).setPartitionCount(partitionCount);
      log.debug("Partition count is {} for stream {}", partitionCount, stream);
    }
  }
}
开发者ID:apache,项目名称:samza,代码行数:32,代码来源:ExecutionPlanner.java
示例20: WatermarkStates
import org.apache.samza.system.SystemStream; //导入依赖的package包/类
/**
 * Creates one watermark state per partition, sized by the number of producer tasks of the
 * partition's stream (0 when the stream has no entry), and freezes the result in an
 * unmodifiable map.
 *
 * @param ssps               the partitions to track
 * @param producerTaskCounts number of producer tasks per system stream
 */
WatermarkStates(Set<SystemStreamPartition> ssps, Map<SystemStream, Integer> producerTaskCounts) {
  Map<SystemStreamPartition, WatermarkState> statesByPartition = new HashMap<>();
  for (SystemStreamPartition partition : ssps) {
    statesByPartition.put(
        partition,
        new WatermarkState(producerTaskCounts.getOrDefault(partition.getSystemStream(), 0)));
  }
  this.watermarkStates = Collections.unmodifiableMap(statesByPartition);
}
开发者ID:apache,项目名称:samza,代码行数:8,代码来源:WatermarkStates.java
注:本文中的org.apache.samza.system.SystemStream类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论