本文整理汇总了Java中org.apache.beam.sdk.values.TupleTag类的典型用法代码示例。如果您正苦于以下问题:Java TupleTag类的具体用法?Java TupleTag怎么用?Java TupleTag使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
TupleTag类属于org.apache.beam.sdk.values包,在下文中一共展示了TupleTag类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: FlinkStatefulDoFnFunction
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Creates a Flink function that executes the given stateful {@link DoFn} on keyed input.
 *
 * @param dofn the user {@link DoFn} to run
 * @param stepName name of the pipeline step this function belongs to
 * @param windowingStrategy windowing strategy of the input
 * @param sideInputs side input views mapped to their windowing strategies
 * @param pipelineOptions pipeline options; stored wrapped in {@link SerializablePipelineOptions}
 * @param outputMap map from each output tag to its associated integer
 * @param mainOutputTag tag of the main output
 */
public FlinkStatefulDoFnFunction(
    DoFn<KV<K, V>, OutputT> dofn,
    String stepName,
    WindowingStrategy<?, ?> windowingStrategy,
    Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
    PipelineOptions pipelineOptions,
    Map<TupleTag<?>, Integer> outputMap,
    TupleTag<OutputT> mainOutputTag) {
  // The options are kept only in their serializable wrapper form, never as the raw object.
  this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);

  this.dofn = dofn;
  this.stepName = stepName;
  this.windowingStrategy = windowingStrategy;
  this.sideInputs = sideInputs;

  this.mainOutputTag = mainOutputTag;
  this.outputMap = outputMap;
}
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:FlinkStatefulDoFnFunction.java
示例2: testCoGroupByKeyGetOnly
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Verifies {@link CoGbkResult#getOnly} for keys that have exactly one value per tag after a
 * co-group of two collections.
 */
@Test
@Category(ValidatesRunner.class)
public void testCoGroupByKeyGetOnly() {
  final TupleTag<String> firstTag = new TupleTag<>();
  final TupleTag<String> secondTag = new TupleTag<>();

  PCollection<KV<Integer, CoGbkResult>> grouped = buildGetOnlyGbk(p, firstTag, secondTag);

  // Each assertion reads the single value grouped under one key for one tag.
  PAssert.thatMap(grouped)
      .satisfies(
          (Map<Integer, CoGbkResult> byKey) -> {
            assertEquals("collection1-1", byKey.get(1).getOnly(firstTag));
            assertEquals("collection1-2", byKey.get(2).getOnly(firstTag));
            assertEquals("collection2-2", byKey.get(2).getOnly(secondTag));
            assertEquals("collection2-3", byKey.get(3).getOnly(secondTag));
            return null;
          });

  p.run();
}
开发者ID:apache,项目名称:beam,代码行数:24,代码来源:CoGroupByKeyTest.java
示例3: ProcessKeyedElements
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Constructs a {@code ProcessKeyedElements} for the given splittable {@link DoFn}.
 *
 * @param fn the splittable {@link DoFn}.
 * @param elementCoder the {@link Coder} for the input elements.
 * @param restrictionCoder the {@link Coder} for the restrictions of {@code fn}.
 * @param windowingStrategy the {@link WindowingStrategy} of the input collection.
 * @param sideInputs list of side inputs that should be available to the {@link DoFn}.
 * @param mainOutputTag {@link TupleTag Tag} of the {@link DoFn DoFn's} main output.
 * @param additionalOutputTags {@link TupleTagList Tags} of the {@link DoFn DoFn's} additional
 *     outputs.
 * @param outputTagsToCoders A map from output tag to the coder for that output, which should
 *     provide mappings for the main and all additional tags.
 */
public ProcessKeyedElements(
    DoFn<InputT, OutputT> fn,
    Coder<InputT> elementCoder,
    Coder<RestrictionT> restrictionCoder,
    WindowingStrategy<InputT, ?> windowingStrategy,
    List<PCollectionView<?>> sideInputs,
    TupleTag<OutputT> mainOutputTag,
    TupleTagList additionalOutputTags,
    Map<TupleTag<?>, Coder<?>> outputTagsToCoders) {
  this.fn = fn;
  this.elementCoder = elementCoder;
  this.restrictionCoder = restrictionCoder;
  this.windowingStrategy = windowingStrategy;
  this.sideInputs = sideInputs;
  this.mainOutputTag = mainOutputTag;
  this.additionalOutputTags = additionalOutputTags;
  this.outputTagsToCoders = outputTagsToCoders;
}
开发者ID:apache,项目名称:beam,代码行数:28,代码来源:SplittableParDo.java
示例4: testUdf
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Tests a scalar UDF registered by class through the single-input query API and one registered
 * as an instance through the multi-input query API.
 */
@Test
public void testUdf() throws Exception {
  BeamRecordSqlType expectedType =
      BeamRecordSqlType.create(
          Arrays.asList("f_int", "cubicvalue"), Arrays.asList(Types.INTEGER, Types.INTEGER));
  // Both queries filter on f_int = 2, so both expect the row (2, 2^3 = 8).
  BeamRecord expectedRow = new BeamRecord(expectedType, 2, 8);

  // Variant 1: UDF registered by class on a single-input query.
  String classUdfSql = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
  PCollection<BeamRecord> classUdfResult =
      boundedInput1.apply(
          "testUdf1", BeamSql.query(classUdfSql).withUdf("cubic1", CubicInteger.class));
  PAssert.that(classUdfResult).containsInAnyOrder(expectedRow);

  // Variant 2: UDF registered as an instance on a multi-input query over a tagged tuple.
  String fnUdfSql = "SELECT f_int, cubic2(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
  PCollectionTuple taggedInput =
      PCollectionTuple.of(new TupleTag<BeamRecord>("PCOLLECTION"), boundedInput1);
  PCollection<BeamRecord> fnUdfResult =
      taggedInput.apply(
          "testUdf2", BeamSql.queryMulti(fnUdfSql).withUdf("cubic2", new CubicIntegerFn()));
  PAssert.that(fnUdfResult).containsInAnyOrder(expectedRow);

  pipeline.run().waitUntilFinish();
}
开发者ID:apache,项目名称:beam,代码行数:26,代码来源:BeamSqlDslUdfUdafTest.java
示例5: runPartialFields
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Runs a projection of two columns over {@code input} (registered as TABLE_A) and checks the
 * result against the first record of {@code recordsInTableA}.
 */
private void runPartialFields(PCollection<BeamRecord> input) throws Exception {
  // Expected output: the f_int and f_long values of the first record in recordsInTableA.
  BeamRecordSqlType projectedType =
      BeamRecordSqlType.create(
          Arrays.asList("f_int", "f_long"), Arrays.asList(Types.INTEGER, Types.BIGINT));
  BeamRecord expectedRow =
      new BeamRecord(
          projectedType,
          recordsInTableA.get(0).getFieldValue(0),
          recordsInTableA.get(0).getFieldValue(1));

  String sql = "SELECT f_int, f_long FROM TABLE_A";
  PCollection<BeamRecord> projected =
      PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
          .apply("testPartialFields", BeamSql.queryMulti(sql));

  PAssert.that(projected).containsInAnyOrder(expectedRow);
  pipeline.run().waitUntilFinish();
}
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:BeamSqlDslProjectTest.java
示例6: buildGetOnlyGbk
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the result
 * of a {@link CoGroupByKey} over 2 {@code PCollection<KV<Integer, String>>},
 * where each {@link PCollection} has no duplicate keys and the key sets of
 * each {@link PCollection} are intersecting but neither is a subset of the other.
 */
private PCollection<KV<Integer, CoGbkResult>> buildGetOnlyGbk(
    Pipeline p, TupleTag<String> tag1, TupleTag<String> tag2) {
  // First collection holds keys {1, 2}; second holds keys {2, 3}; only key 2 is shared.
  List<KV<Integer, String>> firstEntries =
      Arrays.asList(KV.of(1, "collection1-1"), KV.of(2, "collection1-2"));
  List<KV<Integer, String>> secondEntries =
      Arrays.asList(KV.of(2, "collection2-2"), KV.of(3, "collection2-3"));

  PCollection<KV<Integer, String>> first = createInput("CreateList1", p, firstEntries);
  PCollection<KV<Integer, String>> second = createInput("CreateList2", p, secondEntries);

  return KeyedPCollectionTuple.of(tag1, first)
      .and(tag2, second)
      .apply(CoGroupByKey.<Integer>create());
}
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:CoGroupByKeyTest.java
示例7: getDynamicDestinationSideInputs
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Rebuilds the side-input {@link PCollectionView views} of a WriteFiles application from its
 * translated proto form.
 *
 * <p>Each side input listed in the WriteFiles payload is matched with the transform input that
 * has the same tag, then converted back into a view via
 * {@link PCollectionViewTranslation#viewFromProto}.
 *
 * @param transform the applied WriteFiles transform
 * @return one view per side input in the payload
 * @throws IOException propagated from the payload/translation helpers
 * @throws NullPointerException if a side input has no transform input with the same tag
 */
public static <UserT, DestinationT> List<PCollectionView<?>> getDynamicDestinationSideInputs(
    AppliedPTransform<
            PCollection<UserT>, WriteFilesResult<DestinationT>,
            ? extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>>
        transform)
    throws IOException {
  SdkComponents sdkComponents = SdkComponents.create();
  RunnerApi.PTransform transformProto = PTransformTranslation.toProto(transform, sdkComponents);
  List<PCollectionView<?>> views = Lists.newArrayList();
  Map<String, SideInput> sideInputs = getWriteFilesPayload(transform).getSideInputsMap();
  // Nothing below mutates sdkComponents, so its components proto is built once instead of once
  // per side input. Rehydration still happens per view, exactly as before.
  RunnerApi.Components components = sdkComponents.toComponents();
  for (Map.Entry<String, SideInput> entry : sideInputs.entrySet()) {
    PCollection<?> originalPCollection =
        checkNotNull(
            (PCollection<?>) transform.getInputs().get(new TupleTag<>(entry.getKey())),
            "no input with tag %s",
            entry.getKey());
    views.add(
        PCollectionViewTranslation.viewFromProto(
            entry.getValue(),
            entry.getKey(),
            originalPCollection,
            transformProto,
            RehydratedComponents.forComponents(components)));
  }
  return views;
}
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:WriteFilesTranslation.java
示例8: getTupleTagCoders
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Utility to get mapping between TupleTag and a coder.
 *
 * <p>For every output, the coder is a {@link WindowedValue} full coder built from that output
 * {@link PCollection}'s element coder and the window coder of its window function.
 *
 * @param outputs a map of tuple tags to pcollections; every value is cast to
 *     {@link PCollection}, so any other {@link PValue} causes a {@link ClassCastException}
 * @return mapping between each TupleTag and the windowed-value coder of its output
 */
public static Map<TupleTag<?>, Coder<WindowedValue<?>>> getTupleTagCoders(
    Map<TupleTag<?>, PValue> outputs) {
  Map<TupleTag<?>, Coder<WindowedValue<?>>> coderMap = new HashMap<>(outputs.size());
  for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
    // Every output is handled independently, using its own element coder and its own window coder.
    PCollection<?> pCollection = (PCollection<?>) output.getValue();
    Coder<?> coder = pCollection.getCoder();
    Coder<? extends BoundedWindow> wCoder =
        pCollection.getWindowingStrategy().getWindowFn().windowCoder();
    @SuppressWarnings("unchecked")
    Coder<WindowedValue<?>> windowedValueCoder =
        (Coder<WindowedValue<?>>) (Coder<?>) WindowedValue.getFullCoder(coder, wCoder);
    coderMap.put(output.getKey(), windowedValueCoder);
  }
  return coderMap;
}
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:TranslationUtils.java
示例9: and
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Returns a new {@code KeyedPCollectionTuple<K>} that is the same as this,
 * appended with the given PCollection.
 *
 * @throws IllegalArgumentException if {@code pc} belongs to a different pipeline than this tuple
 */
public <V> KeyedPCollectionTuple<K> and(TupleTag<V> tag, PCollection<KV<K, V>> pc) {
  if (pc.getPipeline() != getPipeline()) {
    throw new IllegalArgumentException("PCollections come from different Pipelines");
  }

  TaggedKeyedPCollection<K, ?> tagged = new TaggedKeyedPCollection<>(tag, pc);
  // Keep the key coder this tuple already has; derive one from pc only when none is set yet.
  Coder<K> resolvedKeyCoder = (keyCoder != null) ? keyCoder : getKeyCoder(pc);
  List<TaggedKeyedPCollection<K, ?>> extended = copyAddLast(keyedCollections, tagged);

  return new KeyedPCollectionTuple<>(
      getPipeline(), extended, schema.getTupleTagList().and(tag), resolvedKeyCoder);
}
开发者ID:apache,项目名称:beam,代码行数:26,代码来源:KeyedPCollectionTuple.java
示例10: getAdditionalInputs
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Returns the side inputs of this ParDo, keyed by a {@link TupleTag} whose id is the side input
 * name, with each {@link PCollection} rehydrated from the proto components.
 *
 * @throws IllegalStateException if a side input's {@link PCollection} cannot be rehydrated; the
 *     underlying {@link IOException} is attached as the cause
 */
@Override
public Map<TupleTag<?>, PValue> getAdditionalInputs() {
  Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
  for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) {
    String sideInputName = sideInputEntry.getKey();
    try {
      additionalInputs.put(
          new TupleTag<>(sideInputName),
          rehydratedComponents.getPCollection(protoTransform.getInputsOrThrow(sideInputName)));
    } catch (IOException exc) {
      // Preserve the original failure as the cause instead of discarding it.
      throw new IllegalStateException(
          String.format(
              "Could not find input with name %s for %s transform",
              sideInputName, ParDo.class.getSimpleName()),
          exc);
    }
  }
  return additionalInputs;
}
开发者ID:apache,项目名称:beam,代码行数:19,代码来源:ParDoTranslation.java
示例11: expand
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Applies the partitioning {@link DoFn} and returns one {@link PCollection} per partition tag, in
 * the order of {@code partitionDoFn.getOutputTags()}, each using the input's coder.
 */
@Override
public PCollectionList<T> expand(PCollection<T> in) {
  final TupleTagList partitionTags = partitionDoFn.getOutputTags();
  // The main output gets a fresh Void tag that is never read here; the partitions are read back
  // from the additional output tags.
  PCollectionTuple tagged =
      in.apply(ParDo.of(partitionDoFn).withOutputTags(new TupleTag<Void>() {}, partitionTags));

  PCollectionList<T> partitions = PCollectionList.empty(in.getPipeline());
  Coder<T> elementCoder = in.getCoder();
  for (TupleTag<?> rawTag : partitionTags.getAll()) {
    // All the tuple tags are actually TupleTag<T>, and all the collections are PCollection<T>.
    @SuppressWarnings("unchecked")
    TupleTag<T> partitionTag = (TupleTag<T>) rawTag;
    PCollection<T> partition = tagged.get(partitionTag).setCoder(elementCoder);
    partitions = partitions.and(partition);
  }
  return partitions;
}
开发者ID:apache,项目名称:beam,代码行数:22,代码来源:Partition.java
示例12: newProcessFnRunner
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Creates a {@link ProcessFnRunner} for a splittable-DoFn {@link ProcessFn}.
 *
 * <p>The runner wraps a {@code simpleRunner} built from {@code fn} and the remaining arguments.
 * {@code views} is passed only to the {@link ProcessFnRunner} wrapper, not to the inner simple
 * runner. {@code sideInputReader} is given to both.
 *
 * @param fn the process function to run
 * @param options pipeline options for the inner runner
 * @param views side input views handed to the wrapping {@link ProcessFnRunner}
 * @param sideInputReader reader used by both the inner runner and the wrapper
 * @param outputManager receives the produced outputs
 * @param mainOutputTag tag of the main output
 * @param additionalOutputTags tags of the additional outputs
 * @param stepContext context of the executing step
 * @param windowingStrategy windowing strategy passed to the inner runner
 * @return the wrapping {@link ProcessFnRunner}
 */
public static <InputT, OutputT, RestrictionT>
ProcessFnRunner<InputT, OutputT, RestrictionT>
newProcessFnRunner(
ProcessFn<InputT, OutputT, RestrictionT, ?> fn,
PipelineOptions options,
Collection<PCollectionView<?>> views,
ReadyCheckingSideInputReader sideInputReader,
OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
StepContext stepContext,
WindowingStrategy<?, ?> windowingStrategy) {
// Note: simpleRunner takes options before fn, unlike this method's parameter order.
return new ProcessFnRunner<>(
simpleRunner(
options,
fn,
sideInputReader,
outputManager,
mainOutputTag,
additionalOutputTags,
stepContext,
windowingStrategy),
views,
sideInputReader);
}
开发者ID:apache,项目名称:beam,代码行数:26,代码来源:DoFnRunners.java
示例13: testWithDuplicatedCollections
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Translates a Flatten whose input map contains the same {@link PCollection} under two different
 * tags, and checks the map/merge calls made on that collection's stream.
 */
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testWithDuplicatedCollections() {
  String transformName = "transform";
  when(transform.getName()).thenReturn(transformName);

  JavaStream stream = mock(JavaStream.class);
  TranslationContext context = mock(TranslationContext.class);

  // The same PCollection is registered under two distinct tags.
  TupleTag firstTag = mock(TupleTag.class);
  PCollection sharedCollection = mock(PCollection.class);
  TupleTag secondTag = mock(TupleTag.class);
  Map<TupleTag<?>, PValue> inputs = new HashMap<>();
  inputs.put(firstTag, sharedCollection);
  inputs.put(secondTag, sharedCollection);

  when(context.getInputs()).thenReturn(inputs);
  when(context.getInputStream(sharedCollection)).thenReturn(stream);
  when(context.getPipelineOptions())
      .thenReturn(PipelineOptionsFactory.as(GearpumpPipelineOptions.class));

  translator.translate(transform, context);

  verify(stream).map(any(MapFunction.class), eq("dummy"));
  verify(stream).merge(any(JavaStream.class), eq(1), eq(transformName));
}
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:FlattenPCollectionsTranslatorTest.java
示例14: ComposedCombineFnWithContext
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Stores the parallel lists of input extractors, combine functions and output tags.
 *
 * <p>The wildcard-typed lists are reinterpreted through raw casts as lists over {@code Object};
 * the lists themselves are stored as given, not copied.
 */
private ComposedCombineFnWithContext(
    ImmutableList<SerializableFunction<DataT, ?>> extractInputFns,
    ImmutableList<CombineFnWithContext<?, ?, ?>> combineFnWithContexts,
    ImmutableList<TupleTag<?>> outputTags) {
  @SuppressWarnings({"unchecked", "rawtypes"})
  List<SerializableFunction<DataT, Object>> inputFnsAsObject = (List) extractInputFns;
  @SuppressWarnings({"rawtypes", "unchecked"})
  List<CombineFnWithContext<Object, Object, Object>> combineFnsAsObject =
      (List) combineFnWithContexts;

  this.outputTags = outputTags;
  this.extractInputFns = inputFnsAsObject;
  this.combineFnWithContexts = combineFnsAsObject;
  // Same list object as the field just assigned, so this equals this.combineFnWithContexts.size().
  this.combineFnCount = combineFnsAsObject.size();
}
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:CombineFns.java
示例15: with
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Returns a {@link ComposedCombineFnWithContext} with an additional {@link GlobalCombineFn}.
 *
 * <p>This instance is left unchanged. The new instance holds this instance's extractors,
 * combine functions and tags, each followed by the newly supplied one.
 */
public <InputT, OutputT> ComposedCombineFnWithContext<DataT> with(
    SimpleFunction<DataT, InputT> extractInputFn,
    GlobalCombineFn<InputT, ?, OutputT> globalCombineFn,
    TupleTag<OutputT> outputTag) {
  checkUniqueness(outputTags, outputTag);

  ImmutableList<SerializableFunction<DataT, ?>> extendedInputFns =
      ImmutableList.<SerializableFunction<DataT, ?>>builder()
          .addAll(extractInputFns)
          .add(extractInputFn)
          .build();
  // The global fn is adapted to the with-context flavour before being appended.
  ImmutableList<CombineFnWithContext<?, ?, ?>> extendedCombineFns =
      ImmutableList.<CombineFnWithContext<?, ?, ?>>builder()
          .addAll(combineFnWithContexts)
          .add(CombineFnUtil.toFnWithContext(globalCombineFn))
          .build();
  ImmutableList<TupleTag<?>> extendedOutputTags =
      ImmutableList.<TupleTag<?>>builder().addAll(outputTags).add(outputTag).build();

  return new ComposedCombineFnWithContext<>(
      extendedInputFns, extendedCombineFns, extendedOutputTags);
}
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:CombineFns.java
示例16: testTranslateWithOneCollection
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/** Translates a Flatten with a single input and checks which stream becomes the output. */
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testTranslateWithOneCollection() {
  JavaStream stream = mock(JavaStream.class);
  TranslationContext context = mock(TranslationContext.class);

  TupleTag onlyTag = mock(TupleTag.class);
  PCollection onlyInput = mock(PCollection.class);
  Map<TupleTag<?>, PValue> inputs = new HashMap<>();
  inputs.put(onlyTag, onlyInput);

  when(context.getInputs()).thenReturn(inputs);
  when(context.getInputStream(onlyInput)).thenReturn(stream);
  PValue expectedOutput = mock(PValue.class);
  when(context.getOutput()).thenReturn(expectedOutput);

  translator.translate(transform, context);

  // The input's own stream is expected to be published as the output, exactly once.
  verify(context, times(1)).setOutputStream(expectedOutput, stream);
}
开发者ID:apache,项目名称:beam,代码行数:21,代码来源:FlattenPCollectionsTranslatorTest.java
示例17: getSingletonMainInput
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Returns the only input whose tag is not in {@code ignoredTags}.
 *
 * @throws IllegalArgumentException if there is more than one such input, if it is not a
 *     {@link PCollection}, or if there is none
 */
private static <T> PCollection<T> getSingletonMainInput(
    Map<TupleTag<?>, PValue> inputs, Set<TupleTag<?>> ignoredTags) {
  PCollection<T> mainInput = null;
  for (Map.Entry<TupleTag<?>, PValue> entry : inputs.entrySet()) {
    if (ignoredTags.contains(entry.getKey())) {
      // Additional (e.g. side) inputs are not candidates for the main input.
      continue;
    }
    PValue candidate = entry.getValue();
    checkArgument(
        mainInput == null,
        "Got multiple inputs that are not additional inputs for a "
            + "singleton main input: %s and %s",
        mainInput,
        candidate);
    checkArgument(
        candidate instanceof PCollection, "Unexpected input type %s", candidate.getClass());
    @SuppressWarnings("unchecked")
    PCollection<T> typedCandidate = (PCollection<T>) candidate;
    mainInput = typedCandidate;
  }
  checkArgument(
      mainInput != null,
      "No main input found in inputs: Inputs %s, Side Input tags %s",
      inputs,
      ignoredTags);
  return mainInput;
}
开发者ID:apache,项目名称:beam,代码行数:26,代码来源:PTransformReplacements.java
示例18: DoFnRunnerFactory
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Captures everything needed to build a DoFn runner later.
 *
 * <p>The pipeline options are stored only in their {@link SerializablePipelineOptions} wrapper
 * form. All other arguments are stored as given.
 */
public DoFnRunnerFactory(
    GearpumpPipelineOptions pipelineOptions,
    DoFn<InputT, OutputT> doFn,
    Collection<PCollectionView<?>> sideInputs,
    DoFnRunners.OutputManager outputManager,
    TupleTag<OutputT> mainOutputTag,
    List<TupleTag<?>> sideOutputTags,
    StepContext stepContext,
    WindowingStrategy<?, ?> windowingStrategy) {
  this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
  this.fn = doFn;

  this.mainOutputTag = mainOutputTag;
  this.sideOutputTags = sideOutputTags;
  this.outputManager = outputManager;

  this.sideInputs = sideInputs;
  this.stepContext = stepContext;
  this.windowingStrategy = windowingStrategy;
}
开发者ID:apache,项目名称:beam,代码行数:19,代码来源:DoFnRunnerFactory.java
示例19: testOverride
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Applies the factory's replacement for an input-less Flatten to an empty list and checks that
 * the result is empty.
 */
@Test
@Category(NeedsRunner.class)
public void testOverride() {
  PCollectionList<Long> noInputs = PCollectionList.empty(pipeline);
  AppliedPTransform<PCollectionList<Long>, PCollection<Long>, Flatten.PCollections<Long>>
      application =
          AppliedPTransform
              .<PCollectionList<Long>, PCollection<Long>, Flatten.PCollections<Long>>of(
                  "nonEmptyInput",
                  Collections.<TupleTag<?>, PValue>emptyMap(),
                  Collections.<TupleTag<?>, PValue>emptyMap(),
                  Flatten.<Long>pCollections(),
                  pipeline);

  PCollection<Long> flattened =
      noInputs.apply(factory.getReplacementTransform(application).getTransform());

  PAssert.that(flattened).empty();
  pipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:20,代码来源:EmptyFlattenAsCreateFactoryTest.java
示例20: testParDoReadingFromUnknownSideInput
import org.apache.beam.sdk.values.TupleTag; //导入依赖的package包/类
/**
 * Checks that running the pipeline fails when the DoFn is given a side input view that the ParDo
 * never registers.
 */
@Test
@Category(NeedsRunner.class)
public void testParDoReadingFromUnknownSideInput() {
  List<Integer> mainElements = Arrays.asList(3, -42, 666);

  PCollectionView<Integer> singletonView =
      pipeline.apply("Create3", Create.of(3)).apply(View.<Integer>asSingleton());

  // The DoFn is handed singletonView, but ParDo.of(...) is applied without withSideInputs.
  pipeline
      .apply("CreateMain", Create.of(mainElements))
      .apply(
          ParDo.of(
              new TestDoFn(
                  Arrays.<PCollectionView<Integer>>asList(singletonView),
                  Arrays.<TupleTag<String>>asList())));

  thrown.expect(RuntimeException.class);
  thrown.expectMessage("calling sideInput() with unknown view");
  pipeline.run();
}
开发者ID:apache,项目名称:beam,代码行数:20,代码来源:ParDoTest.java
注:本文中的org.apache.beam.sdk.values.TupleTag类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论