This article collects typical usage examples of the Java class org.apache.beam.sdk.io.Read. If you want to know how the Read class is used, or are looking for examples of it, the code examples collected below may help.
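The Read class belongs to the org.apache.beam.sdk.io package. The 20 code examples below are sorted by popularity by default. Note that Examples 1 to 3 use the Kafka-specific KafkaIO.Read transform (org.apache.beam.sdk.io.kafka.KafkaIO) rather than org.apache.beam.sdk.io.Read itself.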
Example 1: mkKafkaReadTransform
import org.apache.beam.sdk.io.Read; // import the required package/class
/**
* Creates a consumer with two topics, with 10 partitions each.
* numElements are assigned round-robin to all 20 partitions.
*/
private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
int numElements,
int maxNumRecords,
@Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
List<String> topics = ImmutableList.of("topic_a", "topic_b");
KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
.withBootstrapServers("myServer1:9092,myServer2:9092")
.withTopics(topics)
.withConsumerFactoryFn(new ConsumerFactoryFn(
topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
.withKeyDeserializer(IntegerDeserializer.class)
.withValueDeserializer(LongDeserializer.class)
.withMaxNumRecords(maxNumRecords);
if (timestampFn != null) {
return reader.withTimestampFn(timestampFn);
} else {
return reader;
}
}
Developer ID: apache, Project: beam, Lines: 27, Source file: KafkaIOTest.java
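The Javadoc above says numElements are spread round-robin over 2 topics × 10 partitions = 20 partitions. The plain-Java sketch below models only that arithmetic. It assumes element i lands on partition i % 20. The real assignment lives in the ConsumerFactoryFn test helper, which is not shown here, and the class name RoundRobinPartitions is hypothetical.

```java
// Hypothetical model of the round-robin assignment described in the Javadoc of
// mkKafkaReadTransform. Assumption: element i goes to partition (i % totalPartitions);
// the actual ConsumerFactoryFn helper is not shown in this article.
class RoundRobinPartitions {

  /** Returns how many elements each partition receives. */
  static int[] countPerPartition(int numElements, int numTopics, int partitionsPerTopic) {
    int totalPartitions = numTopics * partitionsPerTopic; // 2 topics x 10 = 20 in Example 1
    int[] counts = new int[totalPartitions];
    for (int i = 0; i < numElements; i++) {
      counts[i % totalPartitions]++; // round-robin: cycle through the partitions
    }
    return counts;
  }

  public static void main(String[] args) {
    int[] counts = countPerPartition(1000, 2, 10);
    System.out.println("partitions = " + counts.length + ", per partition = " + counts[0]);
  }
}
```

With 1000 elements each of the 20 partitions receives 50; with 1001, partition 0 receives one extra element.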
Example 2: testUnboundedSourceWithSingleTopic
import org.apache.beam.sdk.io.Read; // import the required package/class
@Test
public void testUnboundedSourceWithSingleTopic() {
// same as testUnboundedSource, but with single topic
int numElements = 1000;
String topic = "my_topic";
KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
.withBootstrapServers("none")
.withTopic("my_topic")
.withConsumerFactoryFn(new ConsumerFactoryFn(
ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST))
.withMaxNumRecords(numElements)
.withKeyDeserializer(IntegerDeserializer.class)
.withValueDeserializer(LongDeserializer.class);
PCollection<Long> input = p
.apply(reader.withoutMetadata())
.apply(Values.<Long>create());
addCountingAsserts(input, numElements);
p.run();
}
Developer ID: apache, Project: beam, Lines: 24, Source file: KafkaIOTest.java
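Kafka is an unbounded source, so Example 2 calls withMaxNumRecords(numElements) to turn the read into a bounded one that stops after numElements records and lets p.run() finish. The sketch below is only an analogy in standard Java and assumes nothing about how KafkaIO implements the bound: an infinite counter plays the unbounded source and Stream.limit plays the record cap. MaxNumRecordsSketch is a hypothetical name.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical analogy: an endless counter stands in for an unbounded source, and
// limit(maxNumRecords) stands in for withMaxNumRecords(...), which caps the read.
class MaxNumRecordsSketch {

  static List<Long> readAtMost(long maxNumRecords) {
    return Stream.iterate(0L, n -> n + 1) // infinite sequence 0, 1, 2, ...
        .limit(maxNumRecords)             // bound the otherwise endless read
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Long> records = readAtMost(1000);
    System.out.println(records.size() + " records, last = " + records.get(records.size() - 1));
  }
}
```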
Example 3: testSourceWithExplicitPartitionsDisplayData
import org.apache.beam.sdk.io.Read; // import the required package/class
@Test
public void testSourceWithExplicitPartitionsDisplayData() {
KafkaIO.Read<byte[], Long> read = KafkaIO.<byte[], Long>read()
.withBootstrapServers("myServer1:9092,myServer2:9092")
.withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5),
new TopicPartition("test", 6)))
.withConsumerFactoryFn(new ConsumerFactoryFn(
Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST)) // 10 partitions
.withKeyDeserializer(ByteArrayDeserializer.class)
.withValueDeserializer(LongDeserializer.class);
DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("topicPartitions", "test-5,test-6"));
assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
}
Developer ID: apache, Project: beam, Lines: 20, Source file: KafkaIOTest.java
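The expected value "test-5,test-6" in Example 3 is consistent with each Kafka TopicPartition rendering as topic-partition (the format TopicPartition.toString() produces) and the list being joined with commas. The sketch below reproduces that formatting with a hypothetical TopicPart class instead of the Kafka type, so it runs without Kafka on the classpath. It is not KafkaIO's own display-data code.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical re-creation of the "topicPartitions" display string asserted in Example 3.
class TopicPartitionsDisplay {

  /** Stand-in for Kafka's TopicPartition: renders as "topic-partition", e.g. "test-5". */
  static final class TopicPart {
    final String topic;
    final int partition;

    TopicPart(String topic, int partition) {
      this.topic = topic;
      this.partition = partition;
    }

    @Override
    public String toString() {
      return topic + "-" + partition;
    }
  }

  /** Joins the partitions with commas. */
  static String format(List<TopicPart> partitions) {
    return partitions.stream().map(TopicPart::toString).collect(Collectors.joining(","));
  }

  public static void main(String[] args) {
    System.out.println(format(List.of(new TopicPart("test", 5), new TopicPart("test", 6))));
  }
}
```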
Example 4: ensureAllReadsConsumed
import org.apache.beam.sdk.io.Read; // import the required package/class
public static void ensureAllReadsConsumed(Pipeline pipeline) {
final Set<PCollection<?>> unconsumed = new HashSet<>();
pipeline.traverseTopologically(
new PipelineVisitor.Defaults() {
@Override
public void visitPrimitiveTransform(Node node) {
unconsumed.removeAll(node.getInputs().values());
}
@Override
public void visitValue(PValue value, Node producer) {
if (producer.getTransform() instanceof Read.Bounded
|| producer.getTransform() instanceof Read.Unbounded) {
unconsumed.add((PCollection<?>) value);
}
}
});
int i = 0;
for (PCollection<?> unconsumedPCollection : unconsumed) {
consume(unconsumedPCollection, i);
i++;
}
}
Developer ID: apache, Project: beam, Lines: 24, Source file: UnconsumedReads.java
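ensureAllReadsConsumed relies on topological order. A read's output is added to unconsumed when it is produced (visitValue), and it is removed as soon as any primitive transform takes it as an input (visitPrimitiveTransform). The stdlib-only sketch below replays that bookkeeping over a hand-built graph. The Node class and the string value names are hypothetical stand-ins for Beam's graph nodes and PValues. The sketch stops at finding the unconsumed outputs and does not model the consume(...) step from the example.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, stdlib-only model of the visitor in ensureAllReadsConsumed.
// Nodes must be supplied in topological order, as traverseTopologically guarantees.
class UnconsumedReadsSketch {

  /** A transform with named inputs and outputs; isRead marks Read.Bounded/Read.Unbounded. */
  static final class Node {
    final boolean isRead;
    final List<String> inputs;
    final List<String> outputs;

    Node(boolean isRead, List<String> inputs, List<String> outputs) {
      this.isRead = isRead;
      this.inputs = inputs;
      this.outputs = outputs;
    }
  }

  static Set<String> findUnconsumed(List<Node> topologicalOrder) {
    Set<String> unconsumed = new LinkedHashSet<>();
    for (Node node : topologicalOrder) {
      // Like visitPrimitiveTransform: any value used as an input counts as consumed.
      unconsumed.removeAll(node.inputs);
      // Like visitValue: remember every output produced by a read.
      if (node.isRead) {
        unconsumed.addAll(node.outputs);
      }
    }
    return unconsumed;
  }

  public static void main(String[] args) {
    List<Node> graph = List.of(
        new Node(true, List.of(), List.of("readA")),
        new Node(true, List.of(), List.of("readB")),
        new Node(false, List.of("readA"), List.of("flattened")));
    System.out.println(findUnconsumed(graph)); // [readB]
  }
}
```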
Example 5: doesNotConsumeAlreadyConsumedRead
import org.apache.beam.sdk.io.Read; // import the required package/class
@Test
public void doesNotConsumeAlreadyConsumedRead() {
Unbounded<Long> transform = Read.from(CountingSource.unbounded());
final PCollection<Long> output = pipeline.apply(transform);
final Flatten.PCollections<Long> consumer = Flatten.<Long>pCollections();
PCollectionList.of(output).apply(consumer);
UnconsumedReads.ensureAllReadsConsumed(pipeline);
pipeline.traverseTopologically(
new PipelineVisitor.Defaults() {
@Override
public void visitPrimitiveTransform(Node node) {
// The output should only be consumed by a single consumer
if (node.getInputs().values().contains(output)) {
assertThat(node.getTransform(), Matchers.<PTransform<?, ?>>is(consumer));
}
}
});
}
Developer ID: apache, Project: beam, Lines: 19, Source file: UnconsumedReadsTest.java
Example 6: validateConsumed
import org.apache.beam.sdk.io.Read; // import the required package/class
private void validateConsumed() {
final Set<PValue> consumedOutputs = new HashSet<PValue>();
final Set<PValue> allReadOutputs = new HashSet<PValue>();
pipeline.traverseTopologically(
new PipelineVisitor.Defaults() {
@Override
public void visitPrimitiveTransform(Node node) {
consumedOutputs.addAll(node.getInputs().values());
}
@Override
public void visitValue(PValue value, Node producer) {
if (producer.getTransform() instanceof Read.Bounded
|| producer.getTransform() instanceof Read.Unbounded) {
allReadOutputs.add(value);
}
}
});
assertThat(consumedOutputs, Matchers.hasItems(allReadOutputs.toArray(new PValue[0])));
}
Developer ID: apache, Project: beam, Lines: 21, Source file: UnconsumedReadsTest.java
Example 7: readBounded
import org.apache.beam.sdk.io.Read; // import the required package/class
private static <T> TransformEvaluator<Read.Bounded<T>> readBounded() {
return new TransformEvaluator<Read.Bounded<T>>() {
@Override
public void evaluate(Read.Bounded<T> transform, EvaluationContext context) {
String stepName = context.getCurrentTransform().getFullName();
final JavaSparkContext jsc = context.getSparkContext();
// create an RDD from a BoundedSource.
JavaRDD<WindowedValue<T>> input =
new SourceRDD.Bounded<>(
jsc.sc(), transform.getSource(), context.getSerializableOptions(), stepName)
.toJavaRDD();
// cache to avoid re-evaluation of the source by Spark's lazy DAG evaluation.
context.putDataset(transform, new BoundedDataset<>(input), true);
}
@Override
public String toNativeString() {
return "sparkContext.<readFrom(<source>)>()";
}
};
}
Developer ID: apache, Project: beam, Lines: 22, Source file: TransformTranslator.java
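The comment in Example 7 explains why the RDD is cached. Spark evaluates lazily, so every action that reuses an uncached dataset re-runs the source read. The sketch below illustrates the effect with plain java.util.function.Supplier memoization. It is an analogy, not Spark's caching API, and CacheSketch is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration of why readBounded caches its RDD: a lazily evaluated
// dataset is recomputed on every use unless its result is memoized.
class CacheSketch {

  /** Wraps a supplier so the underlying computation runs at most once. */
  static <T> Supplier<T> memoize(Supplier<T> delegate) {
    return new Supplier<T>() {
      private T value;
      private boolean computed;

      @Override
      public synchronized T get() {
        if (!computed) {
          value = delegate.get();
          computed = true;
        }
        return value;
      }
    };
  }

  /** Uses the dataset twice, as two downstream actions would, and returns how often the source was read. */
  static int sourceReads(boolean cache) {
    AtomicInteger reads = new AtomicInteger();
    Supplier<String> dataset = () -> {
      reads.incrementAndGet(); // each evaluation re-reads the source
      return "data";
    };
    Supplier<String> used = cache ? memoize(dataset) : dataset;
    used.get();
    used.get();
    return reads.get();
  }

  public static void main(String[] args) {
    System.out.println("without cache: " + sourceReads(false) + ", with cache: " + sourceReads(true));
  }
}
```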
Example 8: readUnbounded
import org.apache.beam.sdk.io.Read; // import the required package/class
private static <T> TransformEvaluator<Read.Unbounded<T>> readUnbounded() {
return new TransformEvaluator<Read.Unbounded<T>>() {
@Override
public void evaluate(Read.Unbounded<T> transform, EvaluationContext context) {
final String stepName = context.getCurrentTransform().getFullName();
context.putDataset(
transform,
SparkUnboundedSource.read(
context.getStreamingContext(),
context.getSerializableOptions(),
transform.getSource(),
stepName));
}
@Override
public String toNativeString() {
return "streamingContext.<readFrom(<source>)>()";
}
};
}
Developer ID: apache, Project: beam, Lines: 21, Source file: StreamingTransformTranslator.java
Example 9: test
import org.apache.beam.sdk.io.Read; // import the required package/class
@Test
public void test() throws IOException {
TestSparkPipelineOptions options =
PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
options.setRunner(TestSparkRunner.class);
options.setForceStreaming(true);
// pipeline with a bounded read.
Pipeline pipeline = Pipeline.create(options);
// apply the BoundedReadFromUnboundedSource.
BoundedReadFromUnboundedSource<?> boundedRead =
Read.from(CountingSource.unbounded()).withMaxNumRecords(-1);
pipeline.apply(boundedRead);
// adapt reads
TestSparkRunner runner = TestSparkRunner.fromOptions(options);
runner.adaptBoundedReads(pipeline);
UnboundedReadDetector unboundedReadDetector = new UnboundedReadDetector();
pipeline.traverseTopologically(unboundedReadDetector);
// assert that the applied BoundedReadFromUnboundedSource
// is being treated as an unbounded read.
assertThat("Expected to have an unbounded read.", unboundedReadDetector.isUnbounded);
}
Developer ID: apache, Project: beam, Lines: 27, Source file: ForceStreamingTest.java
Example 10: testTranslate
import org.apache.beam.sdk.io.Read; // import the required package/class
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testTranslate() {
ReadBoundedTranslator translator = new ReadBoundedTranslator();
GearpumpPipelineOptions options =
PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class);
Read.Bounded transform = mock(Read.Bounded.class);
BoundedSource source = mock(BoundedSource.class);
when(transform.getSource()).thenReturn(source);
TranslationContext translationContext = mock(TranslationContext.class);
when(translationContext.getPipelineOptions()).thenReturn(options);
JavaStream stream = mock(JavaStream.class);
PValue mockOutput = mock(PValue.class);
when(translationContext.getOutput()).thenReturn(mockOutput);
when(translationContext.getSourceStream(any(DataSource.class))).thenReturn(stream);
translator.translate(transform, translationContext);
verify(translationContext).getSourceStream(argThat(new BoundedSourceWrapperMatcher()));
verify(translationContext).setOutputStream(mockOutput, stream);
}
Developer ID: apache, Project: beam, Lines: 23, Source file: ReadBoundedTranslatorTest.java
Example 11: testTranslate
import org.apache.beam.sdk.io.Read; // import the required package/class
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testTranslate() {
ReadUnboundedTranslator translator = new ReadUnboundedTranslator();
GearpumpPipelineOptions options =
PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class);
Read.Unbounded transform = mock(Read.Unbounded.class);
UnboundedSource source = mock(UnboundedSource.class);
when(transform.getSource()).thenReturn(source);
TranslationContext translationContext = mock(TranslationContext.class);
when(translationContext.getPipelineOptions()).thenReturn(options);
JavaStream stream = mock(JavaStream.class);
PValue mockOutput = mock(PValue.class);
when(translationContext.getOutput()).thenReturn(mockOutput);
when(translationContext.getSourceStream(any(DataSource.class))).thenReturn(stream);
translator.translate(transform, translationContext);
verify(translationContext).getSourceStream(argThat(new UnboundedSourceWrapperMatcher()));
verify(translationContext).setOutputStream(mockOutput, stream);
}
Developer ID: apache, Project: beam, Lines: 23, Source file: ReadUnboundedTranslatorTest.java
Example 12: testValueSource
import org.apache.beam.sdk.io.Read; // import the required package/class
@Test
public void testValueSource() {
GearpumpPipelineOptions options =
PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class);
Config config = ClusterConfig.master(null);
config =
config.withValue(Constants.APPLICATION_TOTAL_RETRIES(), ConfigValueFactory.fromAnyRef(0));
EmbeddedCluster cluster = new EmbeddedCluster(config);
cluster.start();
options.setEmbeddedCluster(cluster);
options.setRunner(GearpumpRunner.class);
options.setParallelism(1);
Pipeline p = Pipeline.create(options);
List<String> values = Lists.newArrayList("1", "2", "3", "4", "5");
ValuesSource<String> source = new ValuesSource<>(values, StringUtf8Coder.of());
p.apply(Read.from(source)).apply(ParDo.of(new ResultCollector()));
p.run().waitUntilFinish();
cluster.stop();
Assert.assertEquals(Sets.newHashSet(values), ResultCollector.RESULTS);
}
Developer ID: apache, Project: beam, Lines: 24, Source file: ValueSoureTest.java
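ResultCollector is not shown in Example 12. This sketch assumes a plausible shape: a DoFn that adds each element to a static RESULTS set. Because a runner may call it from several worker threads, that set needs to be thread-safe. The test compares sets because element order is not guaranteed. The sketch models this with an ExecutorService and ConcurrentHashMap.newKeySet(). ResultCollectorSketch is a hypothetical name.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ResultCollector: workers add elements concurrently,
// so the shared static set must be thread-safe.
class ResultCollectorSketch {
  static final Set<String> RESULTS = ConcurrentHashMap.newKeySet();

  static Set<String> collectInParallel(List<String> values, int threads) {
    RESULTS.clear();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (String v : values) {
      pool.execute(() -> RESULTS.add(v)); // like a DoFn emitting into the static set
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return Set.copyOf(RESULTS); // order-independent snapshot, compared as a set
  }

  public static void main(String[] args) {
    System.out.println(collectInParallel(List.of("1", "2", "3", "4", "5"), 4));
  }
}
```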
Example 13: getRootTransformsContainsRootTransforms
import org.apache.beam.sdk.io.Read; // import the required package/class
@Test
public void getRootTransformsContainsRootTransforms() {
PCollection<String> created = p.apply(Create.of("foo", "bar"));
PCollection<Long> counted = p.apply(Read.from(CountingSource.upTo(1234L)));
PCollection<Long> unCounted = p.apply(GenerateSequence.from(0));
p.traverseTopologically(visitor);
DirectGraph graph = visitor.getGraph();
assertThat(graph.getRootTransforms(), hasSize(3));
assertThat(
graph.getRootTransforms(),
Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder(
graph.getProducer(created), graph.getProducer(counted), graph.getProducer(unCounted)));
for (AppliedPTransform<?, ?, ?> root : graph.getRootTransforms()) {
// Root transforms will have no inputs
assertThat(root.getInputs().entrySet(), emptyIterable());
assertThat(
Iterables.getOnlyElement(root.getOutputs().values()),
Matchers.<POutput>isOneOf(created, counted, unCounted));
}
}
Developer ID: apache, Project: beam, Lines: 21, Source file: DirectGraphVisitorTest.java
Example 14: boundedSourceEvaluatorClosesReader
import org.apache.beam.sdk.io.Read; // import the required package/class
@Test
public void boundedSourceEvaluatorClosesReader() throws Exception {
TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
PCollection<Long> pcollection = p.apply(Read.from(source));
AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getProducer(pcollection);
UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
when(context.createBundle(pcollection)).thenReturn(output);
TransformEvaluator<BoundedSourceShard<Long>> evaluator =
factory.forApplication(
sourceTransform, bundleFactory.createRootBundle().commit(Instant.now()));
evaluator.processElement(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)));
evaluator.finishBundle();
CommittedBundle<Long> committed = output.commit(Instant.now());
assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L), gw(1L)));
assertThat(TestSource.readerClosed, is(true));
}
Developer ID: apache, Project: beam, Lines: 19, Source file: BoundedReadEvaluatorFactoryTest.java
Example 15: boundedSourceEvaluatorNoElementsClosesReader
import org.apache.beam.sdk.io.Read; // import the required package/class
@Test
public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of());
PCollection<Long> pcollection = p.apply(Read.from(source));
AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getProducer(pcollection);
UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
when(context.createBundle(pcollection)).thenReturn(output);
TransformEvaluator<BoundedSourceShard<Long>> evaluator =
factory.forApplication(
sourceTransform, bundleFactory.createRootBundle().commit(Instant.now()));
evaluator.processElement(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)));
evaluator.finishBundle();
CommittedBundle<Long> committed = output.commit(Instant.now());
assertThat(committed.getElements(), emptyIterable());
assertThat(TestSource.readerClosed, is(true));
}
Developer ID: apache, Project: beam, Lines: 20, Source file: BoundedReadEvaluatorFactoryTest.java
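Examples 14 and 15 both assert TestSource.readerClosed, whether the source yields elements or none. The stdlib-only sketch below shows a pattern that makes this hold. It drains the reader with a start()/advance()/getCurrent() loop, modeled on the contract of Beam's Source.Reader, inside try-with-resources so that close() always runs. ListReader and readAll are hypothetical names, and this is not the direct runner's actual evaluator code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader showing why the reader ends up closed in both Example 14 and Example 15.
class ClosingReaderSketch {

  /** Reader with start()/advance()/getCurrent(), modeled on Beam's Source.Reader contract. */
  static final class ListReader implements AutoCloseable {
    private final List<Long> elements;
    private int index = -1;
    boolean closed = false;

    ListReader(List<Long> elements) {
      this.elements = elements;
    }

    boolean start() { // true if a first element is available
      index = 0;
      return index < elements.size();
    }

    boolean advance() { // true if another element is available
      index++;
      return index < elements.size();
    }

    Long getCurrent() {
      return elements.get(index);
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  /** Drains the reader; try-with-resources closes it even when there are no elements. */
  static List<Long> readAll(ListReader reader) {
    List<Long> out = new ArrayList<>();
    try (ListReader r = reader) {
      for (boolean more = r.start(); more; more = r.advance()) {
        out.add(r.getCurrent());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    ListReader reader = new ListReader(List.of(1L, 2L, 3L));
    System.out.println(readAll(reader) + ", closed = " + reader.closed);
  }
}
```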
Example 16: testSource
import org.apache.beam.sdk.io.Read; // import the required package/class
@Test
public void testSource() {
Pipeline pipeline = TestPipeline.create();
FixedFlowProperties fixedFlowProperties = new FixedFlowProperties("fixedFlowProperties");
fixedFlowProperties.init();
fixedFlowProperties.data.setValue("a;b;c");
fixedFlowProperties.rowDelimited.setValue(";");
FixedFlowSource fixedFlowSource = new FixedFlowSource();
fixedFlowSource.initialize(null, fixedFlowProperties);
TCompBoundedSourceAdapter source = new TCompBoundedSourceAdapter(fixedFlowSource);
PCollection<String> result = pipeline.apply(Read.from(source)).apply(ParDo.of(new DoFn<IndexedRecord, String>() {
@DoFn.ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(c.element().get(0).toString());
}
}));
PAssert.that(result).containsInAnyOrder(Arrays.asList("a", "b", "c"));
pipeline.run();
}
Developer ID: Talend, Project: components, Lines: 27, Source file: TCompBoundedSourceSinkAdapterTest.java
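Example 16 gives the fixed-flow source the data "a;b;c" with row delimiter ";" and expects the rows a, b and c. How FixedFlowSource splits rows is not shown. The hypothetical helper below illustrates one safe way to do it in plain Java. String.split takes a regular expression, so the delimiter is passed through Pattern.quote to keep characters such as | or . literal.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical row splitter; FixedFlowSource's own logic is not shown in this article.
class RowSplitSketch {

  static List<String> splitRows(String data, String rowDelimiter) {
    // Pattern.quote makes the delimiter literal, so "|" does not act as regex alternation.
    return Arrays.asList(data.split(Pattern.quote(rowDelimiter)));
  }

  public static void main(String[] args) {
    System.out.println(splitRows("a;b;c", ";")); // [a, b, c]
  }
}
```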
Example 17: read
import org.apache.beam.sdk.io.Read; // import the required package/class
@Override
public PCollection<IndexedRecord> read(PBegin in) {
PCollection<?> pc2;
if (path.startsWith("gs://")) {
pc2 = in.apply(TextIO.read().from(path));
} else {
CsvHdfsFileSource source = CsvHdfsFileSource.of(doAs, path, recordDelimiter);
source.getExtraHadoopConfiguration().addFrom(getExtraHadoopConfiguration());
source.setLimit(limit);
PCollection<KV<org.apache.hadoop.io.LongWritable, Text>> pc1 = in.apply(Read.from(source));
pc2 = pc1.apply(Values.<Text> create());
}
PCollection<IndexedRecord> pc3 = pc2.apply(ParDo.of(new ExtractCsvRecord<>(fieldDelimiter.charAt(0))));
return pc3;
}
Developer ID: Talend, Project: components, Lines: 21, Source file: SimpleRecordFormatCsvIO.java
Example 18: visitPrimitiveTransform
import org.apache.beam.sdk.io.Read; // import the required package/class
@Override
public void visitPrimitiveTransform(TransformTreeNode node) {
Class<? extends PTransform> transformClass = node.getTransform().getClass();
if (transformClass == Read.Unbounded.class) {
streaming = true;
}
}
Developer ID: FreshetDMS, Project: Freshet-Deprecated, Lines: 8, Source file: PipelineModeDetector.java
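Example 18 compares transformClass == Read.Unbounded.class, whereas Examples 4 and 6 use instanceof. The two checks only differ if a subclass of the transform class appears in the graph: the exact comparison misses it, but instanceof does not. The sketch below demonstrates the difference with hypothetical classes rather than Beam types.

```java
// Hypothetical demonstration of exact class comparison versus instanceof.
class TransformMatchSketch {

  /** Stand-in for Read.Unbounded. */
  static class ReadUnboundedLike {}

  /** Stand-in for a hypothetical subclass of it. */
  static class CustomUnboundedRead extends ReadUnboundedLike {}

  /** Mirrors Example 18: exact class comparison. */
  static boolean matchesExactly(Object transform) {
    return transform.getClass() == ReadUnboundedLike.class;
  }

  /** Mirrors Examples 4 and 6: instanceof also accepts subclasses. */
  static boolean matchesInstanceOf(Object transform) {
    return transform instanceof ReadUnboundedLike;
  }

  public static void main(String[] args) {
    Object sub = new CustomUnboundedRead();
    System.out.println(matchesExactly(sub) + " " + matchesInstanceOf(sub)); // false true
  }
}
```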
Example 19: streamEventsSource
import org.apache.beam.sdk.io.Read; // import the required package/class
/**
* Return a transform which yields a finite number of synthesized events generated
* on-the-fly in real time.
*/
public static PTransform<PBegin, PCollection<Event>> streamEventsSource(
NexmarkConfiguration configuration) {
return Read.from(new UnboundedEventSource(NexmarkUtils.standardGeneratorConfig(configuration),
configuration.numEventGenerators,
configuration.watermarkHoldbackSec,
configuration.isRateLimited));
}
Developer ID: apache, Project: beam, Lines: 12, Source file: NexmarkUtils.java
Example 20: expand
import org.apache.beam.sdk.io.Read; // import the required package/class
@Override
public PCollection<PubsubMessage> expand(PBegin input) {
return input.getPipeline().begin()
.apply(Read.from(new PubsubSource(this)))
.apply("PubsubUnboundedSource.Stats",
ParDo.of(new StatsFn(
pubsubFactory, subscription, topic, timestampAttribute, idAttribute)));
}
Developer ID: apache, Project: beam, Lines: 9, Source file: PubsubUnboundedSource.java
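Note: The org.apache.beam.sdk.io.Read examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by various developers. Copyright in the source code belongs to the original authors; for distribution and use, refer to each project's license. Do not reproduce without permission.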