本文整理汇总了Java中org.apache.beam.sdk.transforms.View类的典型用法代码示例。如果您正苦于以下问题:Java View类的具体用法?Java View怎么用?Java View使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
View类属于org.apache.beam.sdk.transforms包,在下文中一共展示了View类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: filterAlreadyProcessedUrls
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
/**
 * Passes the read content through {@code FilterProcessedUrls}, giving it the already-processed
 * URLs as a map side input.
 *
 * <p>The processed URLs come from a BigQuery query built by
 * {@code IndexerPipelineUtils.buildBigQueryProcessedUrlsQuery} and are converted into
 * {@code KV<String, Long>} pairs by {@code GetUrlFn}. The actual filtering rule lives in
 * {@code FilterProcessedUrls}.
 *
 * @param readContent content that was read and is to be filtered
 * @param pipeline pipeline on which the BigQuery read is applied
 * @param options options used to build the processed-URLs query
 * @return the output of the {@code FilterProcessedUrls} step
 */
private static PCollection<InputContent> filterAlreadyProcessedUrls(
    PCollection<InputContent> readContent, Pipeline pipeline,
    IndexerPipelineOptions options) {
  // Load the already-processed URLs and expose them as a map-shaped side input.
  final String processedUrlsQuery = IndexerPipelineUtils.buildBigQueryProcessedUrlsQuery(options);
  final PCollection<KV<String, Long>> processedUrls = pipeline
      .apply("Get processed URLs", BigQueryIO.read().fromQuery(processedUrlsQuery))
      .apply(ParDo.of(new GetUrlFn()));
  final PCollectionView<Map<String, Long>> processedUrlsView =
      processedUrls.apply(View.<String, Long>asMap());

  // The filter DoFn needs the view both as a constructor argument and as a declared side input.
  return readContent.apply(
      ParDo.of(new FilterProcessedUrls(processedUrlsView))
          .withSideInputs(processedUrlsView));
}
开发者ID:GoogleCloudPlatform,项目名称:dataflow-opinion-analysis,代码行数:24,代码来源:IndexerPipeline.java
示例2: expand
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
/**
 * Gathers the write results into lists for finalization.
 *
 * <p>Without windowed writes the results are delivered through a list side input reified in the
 * global window; with windowed writes they are reshuffled and gathered per window.
 */
@Override
public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
  if (!getWindowedWrites()) {
    // Pass results via a side input rather than reshuffle, because we need to get an empty
    // iterable to finalize if there are no results.
    return input
        .getPipeline()
        .apply(
            Reify.viewInGlobalWindow(
                input.apply(View.<ResultT>asList()), ListCoder.of(resultCoder)));
  }

  // Windowed writes: reshuffle the results to make them stable against retries.
  // A single void key is used to maximize the size of bundles for finalization.
  return input
      .apply("Add void key", WithKeys.<Void, ResultT>of((Void) null))
      .apply("Reshuffle", Reshuffle.<Void, ResultT>of())
      .apply("Drop key", Values.<ResultT>create())
      .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<ResultT>()))
      .setCoder(ListCoder.of(resultCoder))
      // Reshuffle one more time to stabilize the contents of the bundle lists to finalize.
      .apply(Reshuffle.<List<ResultT>>viaRandomKey());
}
开发者ID:apache,项目名称:beam,代码行数:24,代码来源:WriteFiles.java
示例3: expand
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
/**
 * Turns the input collection into a collection whose element is the iterable side-input view of
 * that input.
 */
@Override
public PCollection<Iterable<T>> expand(PCollection<T> input) {
  final PCollectionView<Iterable<T>> iterableView = input.apply(View.<T>asIterable());

  // Emits the contents of the iterable view for each (void) element it processes.
  final DoFn<Void, Iterable<T>> emitViewContents =
      new DoFn<Void, Iterable<T>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(c.sideInput(iterableView));
        }
      };

  // Drive the DoFn from a collection created from a single null value.
  return input
      .getPipeline()
      .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
      .apply(ParDo.of(emitViewContents).withSideInputs(iterableView));
}
开发者ID:apache,项目名称:beam,代码行数:17,代码来源:ReifyAsIterable.java
示例4: testPassThroughThenCleanup
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
/** Checks that elements come out of {@code PassThroughThenCleanup} unchanged. */
@Test
public void testPassThroughThenCleanup() throws Exception {
  // Cleanup hook that deliberately does nothing.
  PassThroughThenCleanup.CleanupOperation noOpCleanup =
      new PassThroughThenCleanup.CleanupOperation() {
        @Override
        void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
          // no-op
        }
      };

  PCollection<Integer> input = p.apply(Create.of(1, 2, 3));
  PCollection<Integer> output =
      input.apply(
          new PassThroughThenCleanup<Integer>(
              noOpCleanup,
              p.apply("Create1", Create.of("")).apply(View.<String>asSingleton())));

  PAssert.that(output).containsInAnyOrder(1, 2, 3);
  p.run();
}
开发者ID:apache,项目名称:beam,代码行数:20,代码来源:BigQueryIOReadTest.java
示例5: testPassThroughThenCleanupExecuted
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
/**
 * Checks that the cleanup operation is invoked: the hook throws, and the test expects that
 * exception to surface from the pipeline run.
 */
@Test
public void testPassThroughThenCleanupExecuted() throws Exception {
  // Cleanup hook that signals its own execution by throwing.
  PassThroughThenCleanup.CleanupOperation throwingCleanup =
      new PassThroughThenCleanup.CleanupOperation() {
        @Override
        void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
          throw new RuntimeException("cleanup executed");
        }
      };

  p.apply(Create.empty(VarIntCoder.of()))
      .apply(
          new PassThroughThenCleanup<Integer>(
              throwingCleanup,
              p.apply("Create1", Create.of("")).apply(View.<String>asSingleton())));

  // Expectations are registered only after the pipeline has been built, right before running it.
  thrown.expect(RuntimeException.class);
  thrown.expectMessage("cleanup executed");
  p.run();
}
开发者ID:apache,项目名称:beam,代码行数:20,代码来源:BigQueryIOReadTest.java
示例6: expand
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
/**
 * Re-emits the input unchanged and then reshuffles it via a random key.
 *
 * <p>Per the original author's note, this reparallelize step mimics the behavior used in JdbcIO
 * in order to break fusion.
 */
@Override
public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
  // A view over the input with every element filtered out (the predicate is constant false).
  PCollectionView<Iterable<KV<String, String>>> emptyView =
      input
          .apply(
              "Consume",
              Filter.by(SerializableFunctions.<KV<String, String>, Boolean>constant(false)))
          .apply(View.<KV<String, String>>asIterable());

  // Forwards each element untouched; the side input is declared but never read.
  DoFn<KV<String, String>, KV<String, String>> identityFn =
      new DoFn<KV<String, String>, KV<String, String>>() {
        @ProcessElement
        public void processElement(ProcessContext context) {
          context.output(context.element());
        }
      };

  return input
      .apply("Identity", ParDo.of(identityFn).withSideInputs(emptyView))
      .apply(Reshuffle.<KV<String, String>>viaRandomKey());
}
开发者ID:apache,项目名称:beam,代码行数:17,代码来源:RedisIO.java
示例7: multiMultiParDo
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
/**
 * Builds an {@code AppliedPTransform} for a multi-output {@code ParDo} that has a side input,
 * one main output tag and one additional output tag.
 *
 * @param pipeline pipeline the transforms are applied to
 * @return the applied transform named {@code "MultiParDoInAndOut"}
 */
private static AppliedPTransform<?, ?, ?> multiMultiParDo(Pipeline pipeline) {
  PCollectionView<String> sideView =
      pipeline.apply(Create.of("foo")).apply(View.<String>asSingleton());
  PCollection<Long> mainInput = pipeline.apply(GenerateSequence.from(0));

  ParDo.MultiOutput<Long, KV<Long, String>> multiParDo =
      ParDo.of(new TestDoFn())
          .withSideInputs(sideView)
          .withOutputTags(
              new TupleTag<KV<Long, String>>() {},
              TupleTagList.of(new TupleTag<KV<String, Long>>() {}));
  PCollectionTuple outputs = mainInput.apply(multiParDo);

  // Inputs of the applied transform: the ParDo's additional inputs first, then the main input.
  Map<TupleTag<?>, PValue> allInputs = new HashMap<>();
  allInputs.putAll(multiParDo.getAdditionalInputs());
  allInputs.putAll(mainInput.expand());

  return AppliedPTransform
      .<PCollection<Long>, PCollectionTuple, ParDo.MultiOutput<Long, KV<Long, String>>>of(
          "MultiParDoInAndOut", allInputs, outputs.expand(), multiParDo, pipeline);
}
开发者ID:apache,项目名称:beam,代码行数:21,代码来源:PTransformTranslationTest.java
示例8: getDefaultOverrides
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
/**
 * Returns the default transform overrides.
 *
 * @param streaming whether the overrides for streaming execution are requested
 * @return three overrides when {@code streaming} is true, otherwise an empty list
 */
public static List<PTransformOverride> getDefaultOverrides(boolean streaming) {
  if (!streaming) {
    return ImmutableList.of();
  }

  ImmutableList.Builder<PTransformOverride> overrides = ImmutableList.builder();
  overrides.add(
      PTransformOverride.of(
          PTransformMatchers.splittableParDoMulti(),
          new FlinkStreamingPipelineTranslator.SplittableParDoOverrideFactory()));
  overrides.add(
      PTransformOverride.of(
          PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
          new SplittableParDoViaKeyedWorkItems.OverrideFactory()));
  overrides.add(
      PTransformOverride.of(
          PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
          new CreateStreamingFlinkView.Factory()));
  return overrides.build();
}
开发者ID:apache,项目名称:beam,代码行数:21,代码来源:FlinkTransformOverrides.java
示例9: visitValue
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {
LOG.debug("Checking translation of {}", value);
// Primitive transforms are the only ones assigned step names.
if (producer.getTransform() instanceof CreateDataflowView
&& !hasExperiment(options, "beam_fn_api")) {
// CreateDataflowView produces a dummy output (as it must be a primitive transform)
// but in the Dataflow Job graph produces only the view and not the output PCollection.
asOutputReference(
((CreateDataflowView) producer.getTransform()).getView(),
producer.toAppliedPTransform(getPipeline()));
return;
} else if (producer.getTransform() instanceof View.CreatePCollectionView
&& hasExperiment(options, "beam_fn_api")) {
// View.CreatePCollectionView produces a dummy output (as it must be a primitive transform)
// but in the Dataflow Job graph produces only the view and not the output PCollection.
asOutputReference(
((View.CreatePCollectionView) producer.getTransform()).getView(),
producer.toAppliedPTransform(getPipeline()));
return;
}
asOutputReference(value, producer.toAppliedPTransform(getPipeline()));
}
开发者ID:apache,项目名称:beam,代码行数:24,代码来源:DataflowPipelineTranslator.java
示例10: getViewsReturnsViews
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
@Test
public void getViewsReturnsViews() {
PCollectionView<List<String>> listView =
p.apply("listCreate", Create.of("foo", "bar"))
.apply(
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void processElement(DoFn<String, String>.ProcessContext c)
throws Exception {
c.output(Integer.toString(c.element().length()));
}
}))
.apply(View.<String>asList());
PCollectionView<Object> singletonView =
p.apply("singletonCreate", Create.<Object>of(1, 2, 3)).apply(View.<Object>asSingleton());
p.replaceAll(
DirectRunner.fromOptions(TestPipeline.testingPipelineOptions())
.defaultTransformOverrides());
p.traverseTopologically(visitor);
assertThat(
visitor.getGraph().getViews(),
Matchers.<PCollectionView<?>>containsInAnyOrder(listView, singletonView));
}
开发者ID:apache,项目名称:beam,代码行数:25,代码来源:DirectGraphVisitorTest.java
示例11: setup
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
PCollection<Integer> create =
pipeline.apply("forBaseCollection", Create.<Integer>of(1, 2, 3, 4));
mapView =
create.apply("forKeyTypes", WithKeys.<String, Integer>of("foo"))
.apply("asMapView", View.<String, Integer>asMap());
singletonView = create.apply("forCombinedTypes", Mean.<Integer>globally().asSingletonView());
iterableView = create.apply("asIterableView", View.<Integer>asIterable());
container = SideInputContainer.create(
context, ImmutableList.of(iterableView, mapView, singletonView));
}
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:SideInputContainerTest.java
示例12: writeForMultipleIdenticalElementsInSameWindowSucceeds
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
@Test
public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception {
ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
for (Object materializedValue : materializeValuesFor(View.asIterable(), 44, 44)) {
valuesBuilder.add(WindowedValue.of(
materializedValue,
FIRST_WINDOW.maxTimestamp().minus(200L),
FIRST_WINDOW,
PaneInfo.ON_TIME_AND_ONLY_FIRING));
}
container.write(iterableView, valuesBuilder.build());
assertThat(
container
.createReaderForViews(ImmutableList.<PCollectionView<?>>of(iterableView))
.get(iterableView, FIRST_WINDOW),
contains(44, 44));
}
开发者ID:apache,项目名称:beam,代码行数:19,代码来源:SideInputContainerTest.java
示例13: writeForElementInMultipleWindowsSucceeds
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
@Test
public void writeForElementInMultipleWindowsSucceeds() throws Exception {
ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
for (Object materializedValue : materializeValuesFor(View.asSingleton(), 2.875)) {
valuesBuilder.add(WindowedValue.of(
materializedValue,
FIRST_WINDOW.maxTimestamp().minus(200L),
ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
PaneInfo.ON_TIME_AND_ONLY_FIRING));
}
container.write(singletonView, valuesBuilder.build());
assertThat(
container
.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
.get(singletonView, FIRST_WINDOW),
equalTo(2.875));
assertThat(
container
.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
.get(singletonView, SECOND_WINDOW),
equalTo(2.875));
}
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:SideInputContainerTest.java
示例14: expand
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
@Override
public PCollectionView<Integer> expand(PCollection<T> input) {
return input
.getPipeline()
.apply(Create.of(0))
.apply(
"FixedNumShards",
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void outputNumShards(ProcessContext ctxt) {
checkArgument(
numShards.isAccessible(),
"NumShards must be accessible at runtime to use constant sharding");
ctxt.output(numShards.get());
}
}))
.apply(View.<Integer>asSingleton());
}
开发者ID:Talend,项目名称:components,代码行数:20,代码来源:Write.java
示例15: filterAlreadyProcessedDocuments
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
/**
 * Runs the content through {@code EliminateInputContentDupes}, giving it the already-processed
 * document hashes as a map side input, and merges the resulting "not to index" output with the
 * previously skipped content.
 *
 * <p>When {@code options.getWriteTruncate()} is true the side input is built from an empty map;
 * otherwise it is loaded from BigQuery.
 *
 * @param contentToIndexNotSkipped content that is still a candidate for indexing
 * @param contentNotToIndexSkipped content that was already set aside as skipped
 * @param pipeline pipeline on which the side-input source is applied
 * @param options options controlling the side-input source and the BigQuery query
 * @return the content to index paired with the flattened content not to index
 */
private static ContentToIndexOrNot filterAlreadyProcessedDocuments(
    PCollection<InputContent> contentToIndexNotSkipped, PCollection<InputContent> contentNotToIndexSkipped,
    Pipeline pipeline, IndexerPipelineOptions options) {
  final PCollection<KV<String, Long>> processedDocs;
  if (options.getWriteTruncate()) {
    // Nothing is looked up in this mode: build the side input from an empty map.
    Map<String, Long> noDocs = new HashMap<String, Long>();
    processedDocs = pipeline
        .apply("Create empty side input of Docs",
            Create.of(noDocs).withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())));
  } else {
    String processedDocsQuery = IndexerPipelineUtils.buildBigQueryProcessedDocsQuery(options);
    processedDocs = pipeline
        .apply("Get already processed Documents", BigQueryIO.read().fromQuery(processedDocsQuery))
        .apply(ParDo.of(new GetDocumentHashFn()));
  }
  final PCollectionView<Map<String, Long>> processedDocsView =
      processedDocs.apply(View.<String, Long>asMap());

  // Key by document hash, group, and split into "index" (main) and "exact dupes" (additional).
  PCollectionTuple dedupeResult = contentToIndexNotSkipped
      .apply("Extract DocumentHash key", ParDo.of(new GetInputContentDocumentHashFn()))
      .apply("Group by DocumentHash key", GroupByKey.<String, InputContent>create())
      .apply("Eliminate InputContent Dupes",
          ParDo.of(new EliminateInputContentDupes(processedDocsView))
              .withSideInputs(processedDocsView)
              .withOutputTags(
                  PipelineTags.contentToIndexNotExactDupesTag, // main output collection
                  TupleTagList.of(PipelineTags.contentNotToIndexExactDupesTag))); // side output collection

  PCollection<InputContent> toIndex = dedupeResult.get(PipelineTags.contentToIndexNotExactDupesTag);
  PCollection<InputContent> exactDupes = dedupeResult.get(PipelineTags.contentNotToIndexExactDupesTag);

  // Merge the sets of items that are dupes or skipped
  PCollectionList<InputContent> notToIndex =
      PCollectionList.of(exactDupes).and(contentNotToIndexSkipped);
  return new ContentToIndexOrNot(toIndex, notToIndex.apply(Flatten.<InputContent>pCollections()));
}
开发者ID:GoogleCloudPlatform,项目名称:dataflow-opinion-analysis,代码行数:46,代码来源:IndexerPipeline.java
示例16: expand
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
@Override
public PCollection<TableRow> expand(PCollection<TableRow> rows) {
// Extract the mean_temp from each row.
PCollection<Double> meanTemps = rows.apply(
ParDo.of(new ExtractTempFn()));
// Find the global mean, of all the mean_temp readings in the weather data,
// and prepare this singleton PCollectionView for use as a side input.
final PCollectionView<Double> globalMeanTemp =
meanTemps.apply(Mean.<Double>globally())
.apply(View.<Double>asSingleton());
// Rows filtered to remove all but a single month
PCollection<TableRow> monthFilteredRows = rows
.apply(ParDo.of(new FilterSingleMonthDataFn(monthFilter)));
// Then, use the global mean as a side input, to further filter the weather data.
// By using a side input to pass in the filtering criteria, we can use a value
// that is computed earlier in pipeline execution.
// We'll only output readings with temperatures below this mean.
PCollection<TableRow> filteredRows = monthFilteredRows
.apply("ParseAndFilter", ParDo
.of(new DoFn<TableRow, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
Double meanTemp = Double.parseDouble(c.element().get("mean_temp").toString());
Double gTemp = c.sideInput(globalMeanTemp);
if (meanTemp < gTemp) {
c.output(c.element());
}
}
}).withSideInputs(globalMeanTemp));
return filteredRows;
}
开发者ID:apache,项目名称:beam,代码行数:37,代码来源:FilterExamples.java
示例17: sideInputJoinHelper
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
/**
 * Joins the left rows against the right rows by exposing the right side as a multimap side
 * input to {@code BeamJoinTransforms.SideInputJoinDoFn}.
 *
 * @param joinType join type forwarded to the join DoFn
 * @param leftRows keyed rows that form the main input
 * @param rightRows keyed rows that are turned into the multimap side input
 * @param rightNullRow row forwarded to the join DoFn (presumably used when no right-side match
 *     exists; see {@code SideInputJoinDoFn})
 * @param swapped flag forwarded to the join DoFn
 * @return the joined records, with the coder derived from this relation's row type
 */
private PCollection<BeamRecord> sideInputJoinHelper(
    JoinRelType joinType,
    PCollection<KV<BeamRecord, BeamRecord>> leftRows,
    PCollection<KV<BeamRecord, BeamRecord>> rightRows,
    BeamRecord rightNullRow, boolean swapped) {
  final PCollectionView<Map<BeamRecord, Iterable<BeamRecord>>> rightSideView =
      rightRows.apply(View.<BeamRecord, BeamRecord>asMultimap());

  return leftRows
      .apply(
          ParDo.of(
                  new BeamJoinTransforms.SideInputJoinDoFn(
                      joinType, rightNullRow, rightSideView, swapped))
              .withSideInputs(rightSideView))
      .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
}
开发者ID:apache,项目名称:beam,代码行数:16,代码来源:BeamJoinRel.java
示例18: thatMultimap
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
/**
 * Creates a {@link SingletonAssert} over the multimap view of the given {@link PCollection},
 * tagged with the supplied reason.
 *
 * <p>The collection's coder is cast to {@link KvCoder}, so the collection must actually be coded
 * by a {@link KvCoder}; any other coder for the element type is not sufficient.
 */
public static <K, V> SingletonAssert<Map<K, Iterable<V>>> thatMultimap(
    String reason, PCollection<KV<K, V>> actual) {
  @SuppressWarnings("unchecked")
  KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();

  // Coder for the viewed value: a map from key to an iterable of values.
  MapCoder<K, Iterable<V>> multimapCoder =
      MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder()));

  return new PCollectionViewAssert<>(
      actual, View.<K, V>asMultimap(), multimapCoder, PAssertionSite.capture(reason));
}
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:PAssert.java
示例19: thatMap
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
/**
 * Creates a {@link SingletonAssert} over the map view of the given {@link PCollection}, tagged
 * with the supplied reason. The {@link PCollection} must have at most one value per key.
 *
 * <p>The collection's coder is cast to {@link KvCoder}, so the collection must actually be coded
 * by a {@link KvCoder}; any other coder for the element type is not sufficient.
 */
public static <K, V> SingletonAssert<Map<K, V>> thatMap(
    String reason, PCollection<KV<K, V>> actual) {
  @SuppressWarnings("unchecked")
  KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();

  // Coder for the viewed value: a plain key-to-value map.
  MapCoder<K, V> mapCoder = MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder());

  return new PCollectionViewAssert<>(
      actual, View.<K, V>asMap(), mapCoder, PAssertionSite.capture(reason));
}
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:PAssert.java
示例20: expand
import org.apache.beam.sdk.transforms.View; //导入依赖的package包/类
/**
 * Parses every input string as an integer and exposes the largest parsed value as a singleton
 * view.
 */
@Override
public PCollectionView<Integer> expand(PCollection<String> input) {
  // Converts each string element to its Integer value.
  DoFn<String, Integer> parseInteger =
      new DoFn<String, Integer>() {
        @ProcessElement
        public void toInteger(ProcessContext ctxt) {
          ctxt.output(Integer.valueOf(ctxt.element()));
        }
      };

  return input
      .apply(ParDo.of(parseInteger))
      // Top.largest(1) is followed by a flatten of its iterable output before the view is built.
      .apply(Top.<Integer>largest(1))
      .apply(Flatten.<Integer>iterables())
      .apply(View.<Integer>asSingleton());
}
开发者ID:apache,项目名称:beam,代码行数:16,代码来源:WriteFilesTest.java
注:本文中的org.apache.beam.sdk.transforms.View类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论