本文整理汇总了Java中org.apache.samza.config.Config类的典型用法代码示例。如果您正苦于以下问题：Java Config类的具体用法？Java Config怎么用？Java Config使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
Config类属于org.apache.samza.config包,在下文中一共展示了Config类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: initTest
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * One-time fixture: loads {@code src/test/resources/streaming.properties} and
 * stubs a mocked {@link Config} so that it answers the store list, per-store
 * key list and per-store overwrite flag found in that file, then builds the
 * {@code StoreManager} under test from the mock.
 *
 * @throws IOException if the properties file cannot be opened or read
 */
@BeforeClass
public static void initTest() throws IOException {
    // try-with-resources: the stream is closed even when load() throws.
    try (InputStream inputStream = new FileInputStream("src/test/resources/streaming.properties")) {
        properties.load(inputStream);
    }

    config = mock(Config.class);
    // 'stores' is returned by reference, so the names appended in the loop below
    // are visible through the stubbed call.
    when(config.getList("redborder.stores", Collections.<String>emptyList())).thenReturn(stores);

    String storesListAsString = properties.getProperty("redborder.stores");
    for (String store : storesListAsString.split(",")) {
        List<String> keys = Arrays.asList(properties.getProperty("redborder.stores." + store + ".keys").split(","));
        String storeOverwriteStr = properties.getProperty("redborder.stores." + store + ".overwrite");
        // A missing overwrite property is treated as true.
        boolean storeOverwrite = (storeOverwriteStr == null || storeOverwriteStr.equals("true"));

        when(config.getList("redborder.stores." + store + ".keys", Collections.singletonList("key"))).thenReturn(keys);
        when(config.getBoolean("redborder.stores." + store + ".overwrite", true)).thenReturn(storeOverwrite);
        stores.add(store);
    }

    context = new MockTaskContext();
    storeManager = new StoreManager(config, context);
}
开发者ID:redBorder,项目名称:rb-samza-streaming,代码行数:23,代码来源:EnrichManagerTest.java
示例2: initTest
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * One-time fixture: reads {@code src/test/resources/streaming.properties},
 * programs a mocked {@link Config} with the store names, store keys and
 * overwrite flags declared there, and creates the {@code StoreManager} from it.
 *
 * @throws IOException if the properties file cannot be opened or read
 */
@BeforeClass
public static void initTest() throws IOException {
    // The stream is scoped to the load so it is always released.
    try (InputStream inputStream = new FileInputStream("src/test/resources/streaming.properties")) {
        properties.load(inputStream);
    }

    config = mock(Config.class);
    // The same 'stores' list instance is filled in below, after being stubbed here.
    when(config.getList("redborder.stores", Collections.<String>emptyList())).thenReturn(stores);

    String storesListAsString = properties.getProperty("redborder.stores");
    for (String store : storesListAsString.split(",")) {
        List<String> keys = Arrays.asList(properties.getProperty("redborder.stores." + store + ".keys").split(","));
        String storeOverwriteStr = properties.getProperty("redborder.stores." + store + ".overwrite");
        // Absent property means overwrite is enabled.
        boolean storeOverwrite = (storeOverwriteStr == null || storeOverwriteStr.equals("true"));

        when(config.getList("redborder.stores." + store + ".keys", Collections.singletonList("key"))).thenReturn(keys);
        when(config.getBoolean("redborder.stores." + store + ".overwrite", true)).thenReturn(storeOverwrite);
        stores.add(store);
    }

    context = new MockTaskContext();
    storeManager = new StoreManager(config, context);
}
开发者ID:redBorder,项目名称:rb-samza-streaming,代码行数:24,代码来源:EnrichTest.java
示例3: testBroadcastChain
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * Attaches a filter and a map directly to the same input stream and checks
 * that the resulting input operator has exactly two registered operators,
 * one with op code FILTER and one with op code MAP.
 */
@Test
public void testBroadcastChain() {
    StreamSpec inputSpec = new StreamSpec("input", "input-stream", "input-system");
    ApplicationRunner runner = mock(ApplicationRunner.class);
    when(runner.getStreamSpec(eq("input"))).thenReturn(inputSpec);

    StreamGraphImpl graph = new StreamGraphImpl(runner, mock(Config.class));
    MessageStream<Object> source = graph.getInputStream("input");
    // Both operators hang off the same source; neither is chained after the other.
    source.filter(mock(FilterFunction.class));
    source.map(mock(MapFunction.class));

    TaskContextImpl taskContext = mock(TaskContextImpl.class);
    when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());

    OperatorImplGraph implGraph =
        new OperatorImplGraph(graph, mock(Config.class), taskContext, mock(Clock.class));
    InputOperatorImpl inputImpl = implGraph.getInputOperator(new SystemStream("input-system", "input-stream"));

    assertEquals(2, inputImpl.registeredOperators.size());
    assertTrue(inputImpl.registeredOperators.stream()
        .anyMatch(registered -> ((OperatorImpl) registered).getOperatorSpec().getOpCode() == OpCode.FILTER));
    assertTrue(inputImpl.registeredOperators.stream()
        .anyMatch(registered -> ((OperatorImpl) registered).getOperatorSpec().getOpCode() == OpCode.MAP));
}
开发者ID:apache,项目名称:samza,代码行数:23,代码来源:TestOperatorImplGraph.java
示例4: convertConfigToCoordinatorMessage
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * Serializes every entry of {@code config} into a key/message byte pair and
 * enqueues it on {@code systemStreamPartition}, then marks that partition as
 * being at head.
 *
 * Entries whose key starts with {@code CHANGELOGPREFIX} are split on ':' and
 * turned into a {@code SetChangelogMapping} (the value is parsed as an int);
 * every other entry becomes a {@code SetConfig} with source "source".
 *
 * @param config entries to convert
 * @throws SamzaException wrapping any exception raised during conversion
 */
private void convertConfigToCoordinatorMessage(Config config) {
    try {
        for (Map.Entry<String, String> entry : config.entrySet()) {
            final String configKey = entry.getKey();
            final String configValue = entry.getValue();
            final byte[] keyBytes;
            final byte[] messageBytes;

            if (!configKey.startsWith(CHANGELOGPREFIX)) {
                SetConfig setConfig = new SetConfig("source", configKey, configValue);
                keyBytes = MAPPER.writeValueAsString(setConfig.getKeyArray()).getBytes("UTF-8");
                messageBytes = MAPPER.writeValueAsString(setConfig.getMessageMap()).getBytes("UTF-8");
            } else {
                String[] keyParts = configKey.split(":");
                SetChangelogMapping mapping =
                    new SetChangelogMapping(keyParts[1], keyParts[2], Integer.parseInt(configValue));
                keyBytes = MAPPER.writeValueAsString(mapping.getKeyArray()).getBytes("UTF-8");
                messageBytes = MAPPER.writeValueAsString(mapping.getMessageMap()).getBytes("UTF-8");
            }

            // The ssp here is the coordinator ssp (which is always fixed) and not the task ssp.
            IncomingMessageEnvelope envelope =
                new IncomingMessageEnvelope(systemStreamPartition, "", keyBytes, messageBytes);
            put(systemStreamPartition, envelope);
        }
        setIsAtHead(systemStreamPartition, true);
    } catch (Exception e) {
        throw new SamzaException(e);
    }
}
开发者ID:apache,项目名称:samza,代码行数:25,代码来源:MockCoordinatorStreamWrappedConsumer.java
示例5: getConsumer
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * Returns a consumer that sends all configs to the coordinator stream.
 *
 * When {@code useCachedConsumer} is set and a consumer was created earlier,
 * that instance is returned; otherwise a new one is created and remembered.
 *
 * @param systemName system name used for the coordinator stream partition
 * @param config Along with the configs, you can pass checkpoints and changelog stream messages into the stream.
 *               The expected pattern is cp:source:taskname -> ssp,offset for checkpoint (Use sspToString util)
 *                                       ch:source:taskname -> changelogPartition for changelog
 *               Everything else is processed as normal config
 * @param registry not read by this method
 * @return the mock coordinator stream consumer
 * @throws ConfigException if {@code job.name} is not defined in {@code config}
 */
public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
    final boolean reuseExisting = useCachedConsumer && mockConsumer != null;
    if (reuseExisting) {
        return mockConsumer;
    }

    final String jobName = config.get("job.name");
    final String configuredJobId = config.get("job.id");
    if (jobName == null) {
        throw new ConfigException("Must define job.name.");
    }
    // job.id is optional and falls back to "1".
    final String jobId = configuredJobId != null ? configuredJobId : "1";

    SystemStreamPartition coordinatorSsp = new SystemStreamPartition(
        systemName, Util.getCoordinatorStreamName(jobName, jobId), new Partition(0));
    mockConsumer = new MockCoordinatorStreamWrappedConsumer(coordinatorSsp, config);
    return mockConsumer;
}
开发者ID:apache,项目名称:samza,代码行数:28,代码来源:MockCoordinatorStreamSystemFactory.java
示例6: testResourceMapWithInvalidTypeFailure
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * Expects constructing a {@code LocalizerResourceMapper} to fail with an
 * {@link IllegalArgumentException} when a resource's {@code local.type} is
 * set to "invalidType".
 */
@Test
public void testResourceMapWithInvalidTypeFailure() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage("No enum constant org.apache.hadoop.yarn.api.records.LocalResourceType.INVALIDTYPE");

    YarnConfiguration yarnConf = new YarnConfiguration();
    yarnConf.set("fs.http.impl", HttpFileSystem.class.getName());
    yarnConf.set("fs.https.impl", HttpFileSystem.class.getName());

    Map<String, String> resourceSettings = new HashMap<>();
    resourceSettings.put("yarn.resources.myResource1.path", "http://host1.com/readme");
    resourceSettings.put("yarn.resources.myResource1.local.name", "readme");
    resourceSettings.put("yarn.resources.myResource1.local.type", "invalidType");
    resourceSettings.put("yarn.resources.myResource1.local.visibility", "public");
    Config samzaConf = new MapConfig(resourceSettings);

    // Only the construction matters here; the instance itself is never used.
    LocalizerResourceMapper unused = new LocalizerResourceMapper(new LocalizerResourceConfig(samzaConf), yarnConf);
}
开发者ID:apache,项目名称:samza,代码行数:18,代码来源:TestLocalizerResourceMapper.java
示例7: init
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * Builds the page-view counting pipeline on {@code graph}.
 *
 * Events from "pageViewEventStream" are grouped by {@code memberId} into
 * 10-second keyed tumbling windows (operator id "tumblingWindow") with an
 * early trigger of {@code Triggers.repeat(Triggers.count(5))} and DISCARDING
 * accumulation. Each window pane is mapped to a key/{@code PageViewCount}
 * pair and sent to "pageViewEventPerMemberStream".
 *
 * @param graph  stream graph to define the pipeline on
 * @param config not read by this method
 */
@Override
public void init(StreamGraph graph, Config config) {
    // Fold state: starts at 0 and grows by one for every folded event.
    // Kept as typed locals so the generic window factory can infer its type arguments.
    Supplier<Integer> zeroCount = () -> 0;
    FoldLeftFunction<PageViewEvent, Integer> addOne = (event, count) -> count + 1;

    MessageStream<PageViewEvent> views =
        graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
    OutputStream<KV<String, PageViewCount>> countsPerMember =
        graph.getOutputStream("pageViewEventPerMemberStream",
            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));

    views
        .window(Windows.keyedTumblingWindow(event -> event.memberId, Duration.ofSeconds(10), zeroCount, addOne, null, null)
            .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
            .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow")
        .map(pane -> KV.of(pane.getKey().getKey(), new PageViewCount(pane)))
        .sendTo(countsPerMember);
}
开发者ID:apache,项目名称:samza,代码行数:17,代码来源:PageViewCounterExample.java
示例8: testLocalityMapWithoutHostAffinity
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * With {@code job.host-affinity.enabled} set to "false", a host mapping stored
 * for container "0" must not surface in the job model: the container locality
 * map is expected to hold a null value for "0".
 */
@Test
public void testLocalityMapWithoutHostAffinity() {
    // Plain HashMaps instead of double-brace initialisation, which creates an
    // anonymous subclass per call site that captures the enclosing test instance.
    HashMap<String, String> configMap = new HashMap<>();
    configMap.put("cluster-manager.container.count", "1");
    configMap.put("cluster-manager.container.memory.mb", "512");
    configMap.put("cluster-manager.container.retry.count", "1");
    configMap.put("cluster-manager.container.retry.window.ms", "1999999999");
    configMap.put("cluster-manager.allocator.sleep.ms", "10");
    configMap.put("yarn.package.path", "/foo");
    configMap.put("task.inputs", "test-system.test-stream");
    configMap.put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
    configMap.put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
    configMap.put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
    configMap.put("job.host-affinity.enabled", "false");
    Config config = new MapConfig(configMap);

    HashMap<String, String> containerZeroMapping = new HashMap<>();
    containerZeroMapping.put(SetContainerHostMapping.HOST_KEY, "abc-affinity");
    this.localityMappings.put("0", containerZeroMapping);

    this.jobModelManager = JobModelManagerTestUtil.getJobModelManagerUsingReadModel(config, 1, mockStreamMetadataCache, mockLocalityManager, server);

    HashMap<String, String> expectedLocality = new HashMap<>();
    expectedLocality.put("0", null);
    // Expected value first, so a failure message labels the two sides correctly.
    assertEquals(expectedLocality, jobModelManager.jobModel().getAllContainerLocality());
}
开发者ID:apache,项目名称:samza,代码行数:26,代码来源:TestJobModelManager.java
示例9: HdfsSystemConsumer
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * Creates the consumer for {@code systemName}.
 *
 * Reader type, staging directory, buffer capacity and max retry count are
 * read from an {@code HdfsConfig} built over {@code config}. A loading cache
 * is also set up that maps a stream name to its partition descriptor map via
 * {@code HdfsSystemAdmin.obtainPartitionDescriptorMap}.
 *
 * @param systemName      system whose consumer settings are looked up
 * @param config          job configuration
 * @param consumerMetrics metrics holder; its registry is passed to the superclass
 */
public HdfsSystemConsumer(String systemName, Config config, HdfsSystemConsumerMetrics consumerMetrics) {
    super(consumerMetrics.getMetricsRegistry());
    hdfsConfig = new HdfsConfig(config);
    readerType = HdfsReaderFactory.getType(hdfsConfig.getFileReaderType(systemName));
    stagingDirectory = hdfsConfig.getStagingDirectory();
    bufferCapacity = hdfsConfig.getConsumerBufferCapacity(systemName);
    numMaxRetires = hdfsConfig.getConsumerNumMaxRetries(systemName);

    readers = new ConcurrentHashMap<>();
    readerRunnableStatus = new ConcurrentHashMap<>();
    isShutdown = false;
    this.consumerMetrics = consumerMetrics;

    CacheLoader<String, Map<Partition, List<String>>> descriptorLoader =
        new CacheLoader<String, Map<Partition, List<String>>>() {
            @Override
            public Map<Partition, List<String>> load(String streamName) throws Exception {
                Validate.notEmpty(streamName);
                // The staging directory is only checked when a descriptor map is first requested.
                if (StringUtils.isBlank(stagingDirectory)) {
                    String reason = "Staging directory can't be empty. "
                        + "Is this not a yarn job (currently hdfs system consumer only works in "
                        + "the same yarn environment on which hdfs is running)? " + "Is STAGING_DIRECTORY ("
                        + HdfsConfig.STAGING_DIRECTORY() + ") not set (see HdfsConfig.scala)?";
                    throw new SamzaException(reason);
                }
                return HdfsSystemAdmin.obtainPartitionDescriptorMap(stagingDirectory, streamName);
            }
        };
    cachedPartitionDescriptorMap = CacheBuilder.newBuilder().build(descriptorLoader);
}
开发者ID:apache,项目名称:samza,代码行数:27,代码来源:HdfsSystemConsumer.java
示例10: fromConfig
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * Static method to load the {@link ApplicationRunner}
 *
 * The runner class name is read from {@code RUNNER_CONFIG}, falling back to
 * {@code DEFAULT_RUNNER_CLASS}. The class must be assignable to
 * {@link ApplicationRunner} and expose a public constructor taking a single
 * {@link Config}.
 *
 * @param config configuration passed in to initialize the Samza processes
 * @return the configure-driven {@link ApplicationRunner} to run the user-defined stream applications
 * @throws ConfigException if the class cannot be loaded or instantiated, or if it is not an
 *                         {@link ApplicationRunner}
 */
public static ApplicationRunner fromConfig(Config config) {
    // Resolve the name once, with the default applied, so that error messages name the class
    // that was actually attempted instead of "null" when RUNNER_CONFIG is unset.
    final String runnerClassName = config.get(RUNNER_CONFIG, DEFAULT_RUNNER_CLASS);
    try {
        Class<?> runnerClass = Class.forName(runnerClassName);
        if (ApplicationRunner.class.isAssignableFrom(runnerClass)) {
            Constructor<?> constructor = runnerClass.getConstructor(Config.class);
            return (ApplicationRunner) constructor.newInstance(config);
        }
    } catch (Exception e) {
        throw new ConfigException(
            String.format("Problem in loading ApplicationRunner class %s", runnerClassName), e);
    }
    throw new ConfigException(String.format(
        "Class %s does not extend ApplicationRunner properly",
        runnerClassName));
}
开发者ID:apache,项目名称:samza,代码行数:22,代码来源:ApplicationRunner.java
示例11: setup
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * Builds the {@code jobModel} fixture: a single-entry config ("a" -> "b") and
 * one container "1" holding one task "test" that owns partition 1 of
 * system "foo" / stream "bar" and is created with partition 2 as its third argument.
 */
@Before
public void setup() throws IOException {
    Map<String, String> rawConfig = new HashMap<>();
    rawConfig.put("a", "b");
    Config config = new MapConfig(rawConfig);

    Set<SystemStreamPartition> inputPartitions = new HashSet<>();
    inputPartitions.add(new SystemStreamPartition("foo", "bar", new Partition(1)));

    TaskName taskName = new TaskName("test");
    Map<TaskName, TaskModel> tasksByName = new HashMap<>();
    tasksByName.put(taskName, new TaskModel(taskName, inputPartitions, new Partition(2)));

    Map<String, ContainerModel> containersById = new HashMap<>();
    containersById.put("1", new ContainerModel("1", 1, tasksByName));

    jobModel = new JobModel(config, containersById);
}
开发者ID:apache,项目名称:samza,代码行数:17,代码来源:TestSamzaObjectMapper.java
示例12: testGetIntermediateStreamWithDefaultDefaultSerde
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * Requests an intermediate stream with a null serde and no default serde set
 * on the graph, then checks that the stream is registered as both input and
 * output under its spec and that all key/value serdes are {@code NoOpSerde}.
 */
@Test
public void testGetIntermediateStreamWithDefaultDefaultSerde() {
    final String streamId = "mockStreamName";
    ApplicationRunner runner = mock(ApplicationRunner.class);
    Config config = mock(Config.class);
    StreamSpec streamSpec = mock(StreamSpec.class);
    when(runner.getStreamSpec(streamId)).thenReturn(streamSpec);

    StreamGraphImpl graph = new StreamGraphImpl(runner, config);
    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediate =
        graph.getIntermediateStream(streamId, null);

    // Registered on both sides of the graph under the same spec.
    assertEquals(graph.getInputOperators().get(streamSpec), intermediate.getOperatorSpec());
    assertEquals(graph.getOutputStreams().get(streamSpec), intermediate.getOutputStream());
    assertEquals(streamSpec, intermediate.getStreamSpec());

    // Output side serdes.
    assertTrue(intermediate.getOutputStream().getKeySerde() instanceof NoOpSerde);
    assertTrue(intermediate.getOutputStream().getValueSerde() instanceof NoOpSerde);

    // Input side serdes.
    InputOperatorSpec inputSpec = (InputOperatorSpec) intermediate.getOperatorSpec();
    assertTrue(inputSpec.getKeySerde() instanceof NoOpSerde);
    assertTrue(inputSpec.getValueSerde() instanceof NoOpSerde);
}
开发者ID:apache,项目名称:samza,代码行数:21,代码来源:TestStreamGraphImpl.java
示例13: TableManager
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * Construct a table manager instance
 *
 * For every table id listed by {@code JavaTableConfig}, a {@code TableSpec} is
 * built from the configured provider factory, a {@code KVSerde} assembled from
 * the serdes registered under the table's key/value serde names, and the
 * config subset under the table's prefix; the spec is then added via
 * {@code addTable}.
 *
 * @param config the job configuration
 * @param serdes Serde instances for tables
 */
public TableManager(Config config, Map<String, Serde<Object>> serdes) {
    // One JavaTableConfig serves both the id enumeration and the per-table serde lookups;
    // it was previously rebuilt from the same config for every table.
    final JavaTableConfig tableConfig = new JavaTableConfig(config);

    tableConfig.getTableIds().forEach(tableId -> {
        // Construct the table provider
        String tableProviderFactory = config.get(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableId));

        // Construct the KVSerde
        KVSerde serde = KVSerde.of(
            serdes.get(tableConfig.getKeySerde(tableId)),
            serdes.get(tableConfig.getValueSerde(tableId)));

        TableSpec tableSpec = new TableSpec(tableId, serde, tableProviderFactory,
            config.subset(String.format(JavaTableConfig.TABLE_ID_PREFIX, tableId) + "."));

        addTable(tableSpec);

        logger.info("Added table " + tableSpec.getId());
    });
}
开发者ID:apache,项目名称:samza,代码行数:27,代码来源:TableManager.java
示例14: testStreamOperatorClose
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * Checks that the transform function's {@code close()} is not invoked when a
 * {@code StreamOperatorImpl} is constructed, and is invoked exactly once by
 * {@code handleClose()}.
 */
@Test
public void testStreamOperatorClose() {
    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> transformFn = mock(FlatMapFunction.class);
    StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> opSpec = mock(StreamOperatorSpec.class);
    when(opSpec.getTransformFn()).thenReturn(transformFn);

    StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> operator =
        new StreamOperatorImpl<>(opSpec, mock(Config.class), mock(TaskContext.class));

    // Construction alone must not close the function.
    verify(transformFn, times(0)).close();

    operator.handleClose();

    // handleClose() closes it exactly once.
    verify(transformFn, times(1)).close();
}
开发者ID:apache,项目名称:samza,代码行数:18,代码来源:TestStreamOperatorImpl.java
示例15: testKinesisConfigs
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * Checks that {@code KinesisConfig} resolves the region from the system-level
 * prefix, the access key from the stream-level prefix, and the secret key
 * from the stream-level prefix stored under the "sensitive." namespace.
 */
@Test
public void testKinesisConfigs() {
    final String system = "kinesis";
    final String stream = "kinesis-stream";
    final String systemPrefix = "systems." + system + ".";
    final String streamPrefix = systemPrefix + "streams." + stream + ".";

    Map<String, String> settings = new HashMap<>();
    settings.put("sensitive." + streamPrefix + "aws.secretKey", "secretKey");
    settings.put(systemPrefix + "aws.region", "us-east-1");
    settings.put(streamPrefix + "aws.accessKey", "accessKey");

    KinesisConfig kinesisConfig = new KinesisConfig(new MapConfig(settings));

    assertEquals("us-east-1", kinesisConfig.getRegion(system, stream).getName());
    assertEquals("accessKey", kinesisConfig.getStreamAccessKey(system, stream));
    assertEquals("secretKey", kinesisConfig.getStreamSecretKey(system, stream));
}
开发者ID:apache,项目名称:samza,代码行数:20,代码来源:TestKinesisConfig.java
示例16: testGetInputStreamWithDefaultKeyValueSerde
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * Sets a default {@code KVSerde} on the graph and checks that an input stream
 * requested without an explicit serde gets an INPUT operator spec carrying
 * that default's key and value serdes.
 */
@Test
public void testGetInputStreamWithDefaultKeyValueSerde() {
    StreamSpec streamSpec = mock(StreamSpec.class);
    ApplicationRunner runner = mock(ApplicationRunner.class);
    when(runner.getStreamSpec("test-stream-1")).thenReturn(streamSpec);
    StreamGraphImpl graph = new StreamGraphImpl(runner, mock(Config.class));

    // Default serde whose key/value parts are distinguishable mocks.
    KVSerde defaultSerde = mock(KVSerde.class);
    Serde keySerde = mock(Serde.class);
    Serde valueSerde = mock(Serde.class);
    doReturn(keySerde).when(defaultSerde).getKeySerde();
    doReturn(valueSerde).when(defaultSerde).getValueSerde();
    graph.setDefaultSerde(defaultSerde);

    MessageStream<TestMessageEnvelope> input = graph.getInputStream("test-stream-1");
    InputOperatorSpec<String, TestMessageEnvelope> inputSpec =
        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) input).getOperatorSpec();

    assertEquals(OpCode.INPUT, inputSpec.getOpCode());
    assertEquals(graph.getInputOperators().get(streamSpec), inputSpec);
    assertEquals(streamSpec, inputSpec.getStreamSpec());
    assertEquals(keySerde, inputSpec.getKeySerde());
    assertEquals(valueSerde, inputSpec.getValueSerde());
}
开发者ID:apache,项目名称:samza,代码行数:24,代码来源:TestStreamGraphImpl.java
示例17: testReadFailsOnSerdeExceptions
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * Wires a {@code KafkaCheckpointManager} with an
 * {@code ExceptionThrowingCheckpointSerde} and a consumer that yields one
 * checkpoint envelope; the test passes only if a {@code SamzaException}
 * is thrown.
 */
@Test(expected = SamzaException.class)
public void testReadFailsOnSerdeExceptions() throws Exception {
    Config config = mock(Config.class);
    when(config.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);

    KafkaStreamSpec checkpointStream =
        new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC, CHECKPOINT_SYSTEM, 1);

    // mock out a consumer that returns a single checkpoint IME
    SystemStreamPartition inputSsp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
    List<List<IncomingMessageEnvelope>> envelopeBatches = ImmutableList.of(
        ImmutableList.of(newCheckpointEnvelope(TASK1, inputSsp, "0")));
    SystemConsumer consumer = newConsumer(envelopeBatches);
    SystemAdmin admin = newAdmin("0", "1");
    SystemFactory systemFactory = newFactory(mock(SystemProducer.class), consumer, admin);

    // wire up an exception throwing serde with the checkpointmanager
    KafkaCheckpointManager manager = new KafkaCheckpointManager(checkpointStream, systemFactory,
        true, config, mock(MetricsRegistry.class), new ExceptionThrowingCheckpointSerde(), new KafkaCheckpointLogKeySerde());
    manager.register(TASK1);
    manager.start();

    // expect an exception from ExceptionThrowingSerde
    manager.readLastCheckpoint(TASK1);
}
开发者ID:apache,项目名称:samza,代码行数:26,代码来源:TestKafkaCheckpointManagerJava.java
示例18: testGetInputStreamPreservesInsertionOrder
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * Requests three input streams in a fixed order and checks that
 * {@code getInputOperators().values()} lists their operator specs in that
 * same order.
 */
@Test
public void testGetInputStreamPreservesInsertionOrder() {
    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
    Config mockConfig = mock(Config.class);
    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);

    StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
    StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
    when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
    StreamSpec testStreamSpec3 = new StreamSpec("test-stream-3", "physical-stream-3", "test-system");
    when(mockRunner.getStreamSpec("test-stream-3")).thenReturn(testStreamSpec3);

    graph.getInputStream("test-stream-1");
    graph.getInputStream("test-stream-2");
    graph.getInputStream("test-stream-3");

    List<InputOperatorSpec> inputSpecs = new ArrayList<>(graph.getInputOperators().values());
    // Expected value comes first so failure messages report expected/actual the right way round.
    Assert.assertEquals(3, inputSpecs.size());
    Assert.assertEquals(testStreamSpec1, inputSpecs.get(0).getStreamSpec());
    Assert.assertEquals(testStreamSpec2, inputSpecs.get(1).getStreamSpec());
    Assert.assertEquals(testStreamSpec3, inputSpecs.get(2).getStreamSpec());
}
开发者ID:apache,项目名称:samza,代码行数:27,代码来源:TestStreamGraphImpl.java
示例19: init
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * Builds the order/shipment join pipeline on {@code graph}.
 *
 * The "orders" and "shipments" input streams are joined with
 * {@code MyJoinFunction} using a one-minute duration (operator id "join");
 * each joined record is keyed by its {@code orderId} and sent to
 * "fulfilledOrders".
 *
 * @param graph  stream graph to define the pipeline on
 * @param config not read by this method
 */
@Override
public void init(StreamGraph graph, Config config) {
    MessageStream<OrderRecord> orderStream =
        graph.getInputStream("orders", new JsonSerdeV2<>(OrderRecord.class));
    MessageStream<ShipmentRecord> shipmentStream =
        graph.getInputStream("shipments", new JsonSerdeV2<>(ShipmentRecord.class));
    OutputStream<KV<String, FulfilledOrderRecord>> fulfilledOut =
        graph.getOutputStream("fulfilledOrders",
            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class)));

    orderStream
        .join(shipmentStream, new MyJoinFunction(),
            new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class),
            Duration.ofMinutes(1), "join")
        .map(fulfilled -> KV.of(fulfilled.orderId, fulfilled))
        .sendTo(fulfilledOut);
}
开发者ID:apache,项目名称:samza,代码行数:18,代码来源:OrderShipmentJoinExample.java
示例20: getEventHubConfig
import org.apache.samza.config.Config; //导入依赖的package包/类
/**
 * Builds a {@link Config} for the mock EventHub system with two streams.
 *
 * Sets the producer partitioning method and the stream list, then for each of
 * the two streams sets namespace, entity path, SAS key name and SAS token.
 * Both streams share namespace, key name and token; only the entity differs.
 *
 * @param partitioningMethod partitioning method, stored via its {@code toString()}
 * @return a {@code MapConfig} holding the entries above
 */
public static Config getEventHubConfig(EventHubSystemProducer.PartitioningMethod partitioningMethod) {
    HashMap<String, String> settings = new HashMap<>();
    settings.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, SYSTEM_NAME), partitioningMethod.toString());
    settings.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, SYSTEM_NAME), STREAM_NAME1 + "," + STREAM_NAME2);

    // Parallel arrays: stream i is backed by entity i.
    String[] streamNames = {STREAM_NAME1, STREAM_NAME2};
    String[] entityPaths = {EVENTHUB_ENTITY1, EVENTHUB_ENTITY2};
    for (int i = 0; i < streamNames.length; i++) {
        String streamName = streamNames[i];
        settings.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, SYSTEM_NAME, streamName), EVENTHUB_NAMESPACE);
        settings.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, SYSTEM_NAME, streamName), entityPaths[i]);
        settings.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, SYSTEM_NAME, streamName), EVENTHUB_KEY_NAME);
        settings.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, SYSTEM_NAME, streamName), EVENTHUB_KEY);
    }
    return new MapConfig(settings);
}
开发者ID:apache,项目名称:samza,代码行数:18,代码来源:MockEventHubConfigFactory.java
注：本文中的org.apache.samza.config.Config类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论