本文整理汇总了Java中org.apache.beam.sdk.values.PCollectionView类的典型用法代码示例。如果您正苦于以下问题：Java PCollectionView类的具体用法？Java PCollectionView怎么用？Java PCollectionView使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
PCollectionView类属于org.apache.beam.sdk.values包,在下文中一共展示了PCollectionView类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: filterAlreadyProcessedUrls
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Passes {@code readContent} through {@code FilterProcessedUrls}, using the set of URLs
 * already recorded in BigQuery as a map side input.
 *
 * <p>The processed URLs are read with the query returned by
 * {@code IndexerPipelineUtils.buildBigQueryProcessedUrlsQuery(options)} and converted to
 * key/value pairs by {@code GetUrlFn}.
 *
 * @param readContent content to be filtered
 * @param pipeline pipeline on which the BigQuery read is applied
 * @param options options handed to the query builder
 * @return the output of the {@code FilterProcessedUrls} step (presumably only content whose
 *     URL has not been processed yet — confirm against {@code FilterProcessedUrls})
 */
private static PCollection<InputContent> filterAlreadyProcessedUrls(
    PCollection<InputContent> readContent, Pipeline pipeline,
    IndexerPipelineOptions options) {
  String processedUrlsQuery = IndexerPipelineUtils.buildBigQueryProcessedUrlsQuery(options);

  PCollection<KV<String, Long>> processedUrls =
      pipeline
          .apply("Get processed URLs", BigQueryIO.read().fromQuery(processedUrlsQuery))
          .apply(ParDo.of(new GetUrlFn()));

  // Expose the processed URLs as a map so the filter can look each URL up.
  final PCollectionView<Map<String, Long>> processedUrlsView =
      processedUrls.apply(View.<String, Long>asMap());

  return readContent.apply(
      ParDo.of(new FilterProcessedUrls(processedUrlsView))
          .withSideInputs(processedUrlsView));
}
开发者ID:GoogleCloudPlatform,项目名称:dataflow-opinion-analysis,代码行数:24,代码来源:IndexerPipeline.java
示例2: testFlatMapBasicWithSideInput
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Checks {@link FlatMapElements} driven by a {@link Fn} that reads a singleton side input.
 *
 * <p>The side input holds 40; every main-input element {@code x} from {0, 1, 2} is expanded
 * into {@code 40 - x} and {@code 40 + x}.
 */
@Test
@Category(NeedsRunner.class)
public void testFlatMapBasicWithSideInput() throws Exception {
  final PCollectionView<Integer> baseView =
      pipeline.apply("Create base", Create.of(40)).apply(View.<Integer>asSingleton());

  // Emits the side-input value minus the element and plus the element.
  Fn<Integer, Iterable<Integer>> aroundBase =
      new Fn<Integer, Iterable<Integer>>() {
        @Override
        public List<Integer> apply(Integer input, Context c) {
          return ImmutableList.of(
              c.sideInput(baseView) - input, c.sideInput(baseView) + input);
        }
      };

  PCollection<Integer> expanded =
      pipeline
          .apply(Create.of(0, 1, 2))
          .apply(
              FlatMapElements.into(integers())
                  .via(fn(aroundBase, requiresSideInputs(baseView))));

  PAssert.that(expanded).containsInAnyOrder(38, 39, 40, 40, 41, 42);
  pipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:26,代码来源:FlatMapElementsTest.java
示例3: testMultimapSideInput
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Verifies a multimap side input: each word is paired with every value stored under its
 * first letter.
 */
@Test
@Category(ValidatesRunner.class)
public void testMultimapSideInput() {
  final PCollectionView<Map<String, Iterable<Integer>>> byLetter =
      pipeline
          .apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)))
          .apply(View.<String, Integer>asMultimap());

  // Looks the element's first character up in the multimap and emits one pair per value.
  DoFn<String, KV<String, Integer>> emitPerValue =
      new DoFn<String, KV<String, Integer>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          Iterable<Integer> values = c.sideInput(byLetter).get(c.element().substring(0, 1));
          for (Integer value : values) {
            c.output(of(c.element(), value));
          }
        }
      };

  PCollection<KV<String, Integer>> pairs =
      pipeline
          .apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
          .apply("OutputSideInputs", ParDo.of(emitPerValue).withSideInputs(byLetter));

  PAssert.that(pairs)
      .containsInAnyOrder(
          KV.of("apple", 1), KV.of("apple", 2), KV.of("banana", 3), KV.of("blackberry", 3));
  pipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:ViewTest.java
示例4: getPCollectionView
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Returns the {@code SideInputBroadcast} helper registered for {@code view}, creating one via
 * {@code createBroadcastHelper} when the map has no entry for it.
 *
 * <p>Both the helper map and the per-view helper are created lazily: each is first checked
 * without a lock and then checked again while holding the {@code SparkPCollectionView.class}
 * monitor.
 *
 * <p>NOTE(review): the unlocked reads of {@code broadcastHelperMap} look like they rely on the
 * field's declaration (not visible here) for safe publication — confirm. This method never
 * stores the new helper in the map itself; presumably {@code createBroadcastHelper} does —
 * verify.
 *
 * @param view the view whose broadcast helper is requested
 * @param context Spark context forwarded to {@code createBroadcastHelper}
 * @return the existing helper, or the one just created
 */
SideInputBroadcast getPCollectionView(
PCollectionView<?> view,
JavaSparkContext context) {
// Create the helper map on first use; the null check is repeated under the class lock.
if (broadcastHelperMap == null) {
synchronized (SparkPCollectionView.class) {
if (broadcastHelperMap == null) {
broadcastHelperMap = new LinkedHashMap<>();
}
}
}
// Views are broadcast lazily: look up first, and only create a helper on a miss.
SideInputBroadcast helper = broadcastHelperMap.get(view);
if (helper == null) {
synchronized (SparkPCollectionView.class) {
// Look up again under the lock before creating a helper.
helper = broadcastHelperMap.get(view);
if (helper == null) {
helper = createBroadcastHelper(view, context);
}
}
}
return helper;
}
开发者ID:apache,项目名称:beam,代码行数:25,代码来源:SparkPCollectionView.java
示例5: createFromComponents
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Builds a {@code CombineWithContext.Context} backed by the given {@code PipelineOptions},
 * {@code SideInputReader} and main input window.
 *
 * <p>Side inputs are resolved by mapping {@code mainInputWindow} through the view's window
 * mapping function and reading the view in the resulting window. Asking for a view the reader
 * does not contain raises {@link IllegalArgumentException}.
 */
private static CombineWithContext.Context createFromComponents(
    final PipelineOptions options,
    final SideInputReader sideInputReader,
    final BoundedWindow mainInputWindow) {
  return new CombineWithContext.Context() {
    @Override
    public PipelineOptions getPipelineOptions() {
      return options;
    }

    @Override
    public <T> T sideInput(PCollectionView<T> view) {
      if (sideInputReader.contains(view)) {
        // Translate the main input window into the window used by the side input.
        BoundedWindow mappedWindow =
            view.getWindowMappingFn().getSideInputWindow(mainInputWindow);
        return sideInputReader.get(view, mappedWindow);
      }
      throw new IllegalArgumentException("calling sideInput() with unknown view");
    }
  };
}
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:GlobalCombineFnRunners.java
示例6: withInputCoder
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Creates an {@code AppliedCombineFn} from a clone of {@code fn}, deriving the accumulator
 * coder from the value coder of {@code kvCoder}.
 *
 * @throws IllegalStateException if the accumulator coder cannot be provided
 */
public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT>
    withInputCoder(
        GlobalCombineFn<? super InputT, AccumT, OutputT> fn,
        CoderRegistry registry,
        KvCoder<K, InputT> kvCoder,
        Iterable<PCollectionView<?>> sideInputViews,
        WindowingStrategy<?, ?> windowingStrategy) {
  // Narrowing K and InputT is safe here: both are used purely in input position.
  @SuppressWarnings("unchecked")
  GlobalCombineFn<InputT, AccumT, OutputT> fnCopy =
      (GlobalCombineFn<InputT, AccumT, OutputT>) SerializableUtils.clone(fn);
  Coder<InputT> inputCoder = kvCoder.getValueCoder();
  try {
    Coder<AccumT> accumCoder = fnCopy.getAccumulatorCoder(registry, inputCoder);
    return create(fnCopy, accumCoder, sideInputViews, kvCoder, windowingStrategy);
  } catch (CannotProvideCoderException e) {
    throw new IllegalStateException("Could not determine coder for accumulator", e);
  }
}
开发者ID:apache,项目名称:beam,代码行数:17,代码来源:AppliedCombineFn.java
示例7: testNonSingletonSideInput
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Expects the pipeline to fail when a three-element collection is viewed as a singleton and
 * then read as a side input.
 */
@Test
@Category(NeedsRunner.class)
public void testNonSingletonSideInput() throws Exception {
  PCollection<Integer> numbers = pipeline.apply(Create.<Integer>of(1, 2, 3));
  final PCollectionView<Integer> singletonView = numbers.apply(View.<Integer>asSingleton());

  // Simply forwards whatever the side input yields.
  DoFn<Integer, Integer> emitSideInput =
      new DoFn<Integer, Integer>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(c.sideInput(singletonView));
        }
      };
  numbers.apply("OutputSideInputs", ParDo.of(emitSideInput).withSideInputs(singletonView));

  // The expectations must be registered before the pipeline is run.
  thrown.expect(PipelineExecutionException.class);
  thrown.expectCause(isA(IllegalArgumentException.class));
  thrown.expectMessage("PCollection");
  thrown.expectMessage("more than one");
  thrown.expectMessage("singleton");

  pipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:24,代码来源:ViewTest.java
示例8: parDoBaseClassPipelineOptionsNullTest
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Expects constructing a {@code DoFnOperator} with {@code null} for its last two constructor
 * arguments to throw.
 *
 * <p>NOTE(review): judging by the test name, one of the two {@code null} arguments is
 * presumably the pipeline options — confirm against the {@code DoFnOperator} constructor.
 */
@Test(expected = Exception.class)
public void parDoBaseClassPipelineOptionsNullTest() {
  TupleTag<String> outputTag = new TupleTag<>("main-output");
  Coder<WindowedValue<String>> valueCoder =
      WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());

  // Only the constructor call matters; the instance itself is never used.
  DoFnOperator<String, String> operator =
      new DoFnOperator<>(
          new TestDoFn(),
          "stepName",
          valueCoder,
          outputTag,
          Collections.<TupleTag<?>>emptyList(),
          new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, valueCoder),
          WindowingStrategy.globalDefault(),
          new HashMap<Integer, PCollectionView<?>>(),
          Collections.<PCollectionView<?>>emptyList(),
          null,
          null);
}
开发者ID:apache,项目名称:beam,代码行数:19,代码来源:PipelineOptionsTest.java
示例9: testMapSideInput
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Verifies a map side input: each word is paired with the value stored under its first letter.
 */
@Test
@Category(ValidatesRunner.class)
public void testMapSideInput() {
  final PCollectionView<Map<String, Integer>> byLetter =
      pipeline
          .apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 3)))
          .apply(View.<String, Integer>asMap());

  // Looks the element's first character up in the map and emits (element, value).
  DoFn<String, KV<String, Integer>> emitLookup =
      new DoFn<String, KV<String, Integer>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(of(c.element(), c.sideInput(byLetter).get(c.element().substring(0, 1))));
        }
      };

  PCollection<KV<String, Integer>> pairs =
      pipeline
          .apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
          .apply("OutputSideInputs", ParDo.of(emitLookup).withSideInputs(byLetter));

  PAssert.that(pairs)
      .containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3));
  pipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:26,代码来源:ViewTest.java
示例10: getDynamicDestinationSideInputs
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Rebuilds the {@code PCollectionView}s listed as side inputs in the WriteFiles payload of
 * {@code transform}.
 *
 * <p>Each side-input tag from the payload is matched to the input {@code PCollection} that
 * {@code transform} holds under the same tag, and the view is reconstructed from its proto.
 *
 * @param transform the applied WriteFiles transform
 * @return one view per side-input entry, in the iteration order of the payload's map
 * @throws IOException declared by this method; the visible code does not show which call
 *     raises it
 * @throws NullPointerException from {@code checkNotNull} when no input exists for a tag
 */
public static <UserT, DestinationT> List<PCollectionView<?>> getDynamicDestinationSideInputs(
    AppliedPTransform<
            PCollection<UserT>, WriteFilesResult<DestinationT>,
            ? extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>>
        transform)
    throws IOException {
  SdkComponents components = SdkComponents.create();
  RunnerApi.PTransform proto = PTransformTranslation.toProto(transform, components);
  List<PCollectionView<?>> result = Lists.newArrayList();
  Map<String, SideInput> sideInputsByTag = getWriteFilesPayload(transform).getSideInputsMap();

  for (Map.Entry<String, SideInput> sideInputEntry : sideInputsByTag.entrySet()) {
    String tag = sideInputEntry.getKey();
    SideInput sideInput = sideInputEntry.getValue();

    PCollection<?> source =
        checkNotNull(
            (PCollection<?>) transform.getInputs().get(new TupleTag<>(tag)),
            "no input with tag %s",
            tag);

    PCollectionView<?> rebuilt =
        PCollectionViewTranslation.viewFromProto(
            sideInput,
            tag,
            source,
            proto,
            RehydratedComponents.forComponents(components.toComponents()));
    result.add(rebuilt);
  }
  return result;
}
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:WriteFilesTranslation.java
示例11: filterAlreadyProcessedDocuments
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Splits {@code contentToIndexNotSkipped} into content to index and exact duplicates, then
 * merges the duplicates with {@code contentNotToIndexSkipped}.
 *
 * <p>The already-processed document hashes come from BigQuery unless
 * {@code options.getWriteTruncate()} is set, in which case an empty collection is used. They
 * are passed as a map side input to {@code EliminateInputContentDupes}.
 *
 * @param contentToIndexNotSkipped content that is a candidate for indexing
 * @param contentNotToIndexSkipped content already excluded from indexing
 * @param pipeline pipeline on which the side-input source is applied
 * @param options options consulted for write-truncate mode and for building the query
 * @return the content to index together with the flattened set of content not to index
 */
private static ContentToIndexOrNot filterAlreadyProcessedDocuments(
    PCollection<InputContent> contentToIndexNotSkipped,
    PCollection<InputContent> contentNotToIndexSkipped,
    Pipeline pipeline,
    IndexerPipelineOptions options) {
  PCollection<KV<String, Long>> processedDocs;
  if (options.getWriteTruncate()) {
    // Write-truncate mode: feed an empty collection into the side input.
    Map<String, Long> noDocs = new HashMap<String, Long>();
    processedDocs =
        pipeline.apply(
            "Create empty side input of Docs",
            Create.of(noDocs).withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())));
  } else {
    String docsQuery = IndexerPipelineUtils.buildBigQueryProcessedDocsQuery(options);
    processedDocs =
        pipeline
            .apply("Get already processed Documents", BigQueryIO.read().fromQuery(docsQuery))
            .apply(ParDo.of(new GetDocumentHashFn()));
  }

  final PCollectionView<Map<String, Long>> processedDocsView =
      processedDocs.apply(View.<String, Long>asMap());

  PCollectionTuple dedupeResult =
      contentToIndexNotSkipped
          .apply("Extract DocumentHash key", ParDo.of(new GetInputContentDocumentHashFn()))
          .apply("Group by DocumentHash key", GroupByKey.<String, InputContent>create())
          .apply(
              "Eliminate InputContent Dupes",
              ParDo.of(new EliminateInputContentDupes(processedDocsView))
                  .withSideInputs(processedDocsView)
                  .withOutputTags(
                      // main output collection
                      PipelineTags.contentToIndexNotExactDupesTag,
                      // side output collection
                      TupleTagList.of(PipelineTags.contentNotToIndexExactDupesTag)));

  PCollection<InputContent> toIndex =
      dedupeResult.get(PipelineTags.contentToIndexNotExactDupesTag);
  PCollection<InputContent> exactDupes =
      dedupeResult.get(PipelineTags.contentNotToIndexExactDupesTag);

  // Everything that must not be indexed: exact duplicates plus previously skipped items.
  PCollection<InputContent> notToIndex =
      PCollectionList.of(exactDupes)
          .and(contentNotToIndexSkipped)
          .apply(Flatten.<InputContent>pCollections());

  return new ContentToIndexOrNot(toIndex, notToIndex);
}
开发者ID:GoogleCloudPlatform,项目名称:dataflow-opinion-analysis,代码行数:46,代码来源:IndexerPipeline.java
示例12: ConvertSummaryToJsonDoFn
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Stores the three master-data views, the two output tags and the store limit in the
 * corresponding fields.
 *
 * @param productMasterView side-input view assigned to {@code productMasterView}
 * @param storeMasterView side-input view assigned to {@code storeMasterView}
 * @param categoryMasterView side-input view assigned to {@code categoryMasterView}
 * @param tagOutputTrend tag assigned to {@code tagOutputTrend}
 * @param tagOutputHeatmap tag assigned to {@code tagOutputHeatmap}
 * @param limitStoreNum value assigned to {@code limitStoreNum}
 */
public ConvertSummaryToJsonDoFn(
    PCollectionView<Map<String, String>> productMasterView,
    PCollectionView<Map<String, String>> storeMasterView,
    PCollectionView<Map<String, String>> categoryMasterView,
    TupleTag<String> tagOutputTrend,
    TupleTag<String> tagOutputHeatmap,
    int limitStoreNum) {
  // Side-input views.
  this.productMasterView = productMasterView;
  this.storeMasterView = storeMasterView;
  this.categoryMasterView = categoryMasterView;

  // Output tags.
  this.tagOutputTrend = tagOutputTrend;
  this.tagOutputHeatmap = tagOutputHeatmap;

  this.limitStoreNum = limitStoreNum;
}
开发者ID:topgate,项目名称:retail-demo,代码行数:14,代码来源:ConvertSummaryToJsonDoFn.java
示例13: expand
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
@Override
public PCollection<T> expand(PCollection<T> input) {
  // Background: https://issues.apache.org/jira/browse/BEAM-2803
  // Fusion is broken here in two stages
  // (see https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion):
  //   1) the data is forced to materialize by feeding it as a side input to an identity fn;
  //   2) the result is reshuffled with a random key.
  // The initial materialization gives some parallelism and lets the data to be shuffled be
  // produced in parallel; the reshuffle then gives perfect parallelism.
  // A plain reshuffle is enough for most "fusion break" needs. The combined approach exists
  // only for the JdbcIO case where one query may return many gigabytes of results.

  // A view over a filter that rejects every element.
  PCollectionView<Iterable<T>> emptyView =
      input
          .apply("Consume", Filter.by(SerializableFunctions.<T, Boolean>constant(false)))
          .apply(View.<T>asIterable());

  // Passes every element through unchanged.
  DoFn<T, T> identity =
      new DoFn<T, T>() {
        @ProcessElement
        public void process(ProcessContext c) {
          c.output(c.element());
        }
      };

  PCollection<T> passedThrough =
      input.apply("Identity", ParDo.of(identity).withSideInputs(emptyView));

  return passedThrough.apply(Reshuffle.<T>viaRandomKey());
}
开发者ID:apache,项目名称:beam,代码行数:30,代码来源:JdbcIO.java
示例14: translateNode
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
@Override
public void translateNode(
    CreateStreamingFlinkView.CreateFlinkPCollectionView<ElemT, ViewT> transform,
    FlinkStreamingTranslationContext context) {
  // Nothing to translate: the transform's input stream is registered as the view's output.
  PCollectionView<ViewT> targetView = transform.getView();
  DataStream<WindowedValue<List<ElemT>>> upstream =
      context.getInputDataStream(context.getInput(transform));
  context.setOutputDataStream(targetView, upstream);
}
开发者ID:apache,项目名称:beam,代码行数:13,代码来源:FlinkStreamingTransformTranslators.java
示例15: testFixedWindowsCombineWithContext
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Exercises per-key and global combining with a context-aware combine fn under 2 ms fixed
 * windows, using the per-window global sum as a singleton side input.
 */
@Test
@Category(ValidatesRunner.class)
public void testFixedWindowsCombineWithContext() {
  KvCoder<String, Integer> inputCoder =
      KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of());

  PCollection<KV<String, Integer>> keyed =
      pipeline
          .apply(
              Create.timestamped(
                      TimestampedValue.of(KV.of("a", 1), new Instant(0L)),
                      TimestampedValue.of(KV.of("a", 1), new Instant(1L)),
                      TimestampedValue.of(KV.of("a", 4), new Instant(6L)),
                      TimestampedValue.of(KV.of("b", 1), new Instant(7L)),
                      TimestampedValue.of(KV.of("b", 13), new Instant(8L)))
                  .withCoder(inputCoder))
          .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(2))));

  PCollection<Integer> valuesOnly = keyed.apply(Values.<Integer>create());

  // Per-window sum, also used as the side input of both context-aware combines below.
  PCollection<Integer> windowSums =
      valuesOnly.apply("Sum", Combine.globally(new SumInts()).withoutDefaults());
  PCollectionView<Integer> windowSumView = windowSums.apply(View.<Integer>asSingleton());

  PCollection<KV<String, String>> perKeyResult =
      keyed.apply(
          Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(windowSumView))
              .withSideInputs(windowSumView));

  PCollection<String> globalResult =
      valuesOnly.apply(
          Combine.globally(new TestCombineFnWithContext(windowSumView))
              .withoutDefaults()
              .withSideInputs(windowSumView));

  PAssert.that(windowSums).containsInAnyOrder(2, 5, 13);
  PAssert.that(perKeyResult)
      .containsInAnyOrder(
          Arrays.asList(
              KV.of("a", "2:11"), KV.of("a", "5:4"), KV.of("b", "5:1"), KV.of("b", "13:13")));
  PAssert.that(globalResult).containsInAnyOrder("2:11", "5:14", "13:13");

  pipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:41,代码来源:CombineTest.java
示例16: FlinkMergingNonShuffleReduceFunction
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Stores the combine function, windowing strategy and side inputs, and wraps the pipeline
 * options in a {@code SerializablePipelineOptions}.
 *
 * @param combineFn combine function assigned to {@code combineFn}
 * @param windowingStrategy windowing strategy assigned to {@code windowingStrategy}
 * @param sideInputs map from side-input view to windowing strategy, assigned to
 *     {@code sideInputs}
 * @param pipelineOptions options wrapped and assigned to {@code serializedOptions}
 */
public FlinkMergingNonShuffleReduceFunction(
    CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn,
    WindowingStrategy<Object, W> windowingStrategy,
    Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
    PipelineOptions pipelineOptions) {
  // The options are kept in their serializable wrapper rather than as a raw reference.
  this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);

  this.sideInputs = sideInputs;
  this.windowingStrategy = windowingStrategy;
  this.combineFn = combineFn;
}
开发者ID:apache,项目名称:beam,代码行数:15,代码来源:FlinkMergingNonShuffleReduceFunction.java
示例17: get
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Returns the fixed value 5 when asked for the global window; for any other window the test
 * is failed.
 */
@Override
@Nullable
public <T> T get(PCollectionView<T> view, BoundedWindow window) {
  if (!window.equals(GlobalWindow.INSTANCE)) {
    fail("Should only call get in the Global Window, others are not ready");
    // Satisfies the compiler, which cannot see that the call above does not return normally.
    throw new AssertionError("Unreachable");
  }
  return (T) (Integer) 5;
}
开发者ID:apache,项目名称:beam,代码行数:10,代码来源:ParDoEvaluatorTest.java
示例18: GbkThenStatefulParDo
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Stores the {@code DoFn}, its output tags and its side inputs in the corresponding fields.
 *
 * @param doFn function assigned to {@code doFn}
 * @param mainOutputTag tag assigned to {@code mainOutputTag}
 * @param additionalOutputTags tags assigned to {@code additionalOutputTags}
 * @param sideInputs views assigned to {@code sideInputs}
 */
public GbkThenStatefulParDo(
    DoFn<KV<K, InputT>, OutputT> doFn,
    TupleTag<OutputT> mainOutputTag,
    TupleTagList additionalOutputTags,
    List<PCollectionView<?>> sideInputs) {
  this.doFn = doFn;
  this.mainOutputTag = mainOutputTag;
  this.additionalOutputTags = additionalOutputTags;
  this.sideInputs = sideInputs;
}
开发者ID:apache,项目名称:beam,代码行数:11,代码来源:ParDoMultiOverrideFactory.java
示例19: defaultRunnerFactory
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Returns a factory whose runners are created by {@code DoFnRunners.simpleRunner} and then
 * wrapped with {@code SimplePushbackSideInputDoFnRunner.create}.
 */
public static <InputT, OutputT> DoFnRunnerFactory<InputT, OutputT> defaultRunnerFactory() {
  return new DoFnRunnerFactory<InputT, OutputT>() {
    @Override
    public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
        PipelineOptions options,
        DoFn<InputT, OutputT> fn,
        List<PCollectionView<?>> sideInputs,
        ReadyCheckingSideInputReader sideInputReader,
        OutputManager outputManager,
        TupleTag<OutputT> mainOutputTag,
        List<TupleTag<?>> additionalOutputTags,
        DirectStepContext stepContext,
        WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy) {
      // Build the plain runner first ...
      DoFnRunner<InputT, OutputT> simpleRunner =
          DoFnRunners.simpleRunner(
              options,
              fn,
              sideInputReader,
              outputManager,
              mainOutputTag,
              additionalOutputTags,
              stepContext,
              windowingStrategy);
      // ... then wrap it together with the side inputs and their reader.
      return SimplePushbackSideInputDoFnRunner.create(simpleRunner, sideInputs, sideInputReader);
    }
  };
}
开发者ID:apache,项目名称:beam,代码行数:28,代码来源:ParDoEvaluator.java
示例20: testWindowedSideInputFixedToFixed
import org.apache.beam.sdk.values.PCollectionView; //导入依赖的package包/类
/**
 * Verifies that a side input in 10 ms fixed windows is read per matching main-input window.
 *
 * <p>The side input sums to 1 for the element at timestamp 1 and to 5 for the elements at
 * timestamps 11 and 13, so "A" (4) and "C" (7) see 1 while "B" (15) sees 5.
 */
@Test
@Category(ValidatesRunner.class)
public void testWindowedSideInputFixedToFixed() {
  final PCollectionView<Integer> windowSumView =
      pipeline
          .apply(
              "CreateSideInput",
              Create.timestamped(
                  TimestampedValue.of(1, new Instant(1)),
                  TimestampedValue.of(2, new Instant(11)),
                  TimestampedValue.of(3, new Instant(13))))
          .apply("WindowSideInput", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
          .apply(Sum.integersGlobally().withoutDefaults())
          .apply(View.<Integer>asSingleton());

  // Appends the side-input value to the element.
  DoFn<String, String> appendSideInput =
      new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(c.element() + c.sideInput(windowSumView));
        }
      };

  PCollection<String> labelled =
      pipeline
          .apply(
              "CreateMainInput",
              Create.timestamped(
                  TimestampedValue.of("A", new Instant(4)),
                  TimestampedValue.of("B", new Instant(15)),
                  TimestampedValue.of("C", new Instant(7))))
          .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10))))
          .apply(
              "OutputMainAndSideInputs",
              ParDo.of(appendSideInput).withSideInputs(windowSumView));

  PAssert.that(labelled).containsInAnyOrder("A1", "B5", "C1");
  pipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:32,代码来源:ViewTest.java
注：本文中的org.apache.beam.sdk.values.PCollectionView类示例整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论