• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java DoFn类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.beam.sdk.transforms.DoFn的典型用法代码示例。如果您正苦于以下问题:Java DoFn类的具体用法?Java DoFn怎么用?Java DoFn使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



DoFn类属于org.apache.beam.sdk.transforms包,在下文中一共展示了DoFn类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: main

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
    PipelineOptionsFactory.register(TemplateOptions.class);
    TemplateOptions options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(TemplateOptions.class);
    options.setAutoscalingAlgorithm(THROUGHPUT_BASED);
    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(BigQueryIO.read().from(options.getBigQueryTableName()))
            .apply(ParDo.of(new DoFn<TableRow, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    String commaSep = c.element().values()
                            .stream()
                            .map(cell -> cell.toString().trim())
                            .collect(Collectors.joining("\",\""));
                    c.output(commaSep);
                }
            }))
            .apply(TextIO.write().to(options.getOutputFile())
                    .withoutSharding()
                    .withWritableByteChannelFactory(GZIP)
            );
    pipeline.run();
}
 
开发者ID:shinesolutions,项目名称:bigquery-table-to-one-file,代码行数:26,代码来源:BigQueryTableToOneFile.java


示例2: MultiDoFnFunction

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
/**
 * @param metricsAccum       The Spark {@link Accumulator} that backs the Beam metrics.
 * @param doFn              The {@link DoFn} to be wrapped.
 * @param options    The {@link SerializablePipelineOptions}.
 * @param mainOutputTag     The main output {@link TupleTag}.
 * @param additionalOutputTags Additional {@link TupleTag output tags}.
 * @param sideInputs        Side inputs used in this {@link DoFn}.
 * @param windowingStrategy Input {@link WindowingStrategy}.
 * @param stateful          Stateful {@link DoFn}.
 */
public MultiDoFnFunction(
    Accumulator<MetricsContainerStepMap> metricsAccum,
    String stepName,
    DoFn<InputT, OutputT> doFn,
    SerializablePipelineOptions options,
    TupleTag<OutputT> mainOutputTag,
    List<TupleTag<?>> additionalOutputTags,
    Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
    WindowingStrategy<?, ?> windowingStrategy,
    boolean stateful) {
  this.metricsAccum = metricsAccum;
  this.stepName = stepName;
  this.doFn = doFn;
  this.options = options;
  this.mainOutputTag = mainOutputTag;
  this.additionalOutputTags = additionalOutputTags;
  this.sideInputs = sideInputs;
  this.windowingStrategy = windowingStrategy;
  this.stateful = stateful;
}
 
开发者ID:apache,项目名称:beam,代码行数:31,代码来源:MultiDoFnFunction.java


示例3: ProcessKeyedElements

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
/**
 * @param fn the splittable {@link DoFn}.
 * @param windowingStrategy the {@link WindowingStrategy} of the input collection.
 * @param sideInputs list of side inputs that should be available to the {@link DoFn}.
 * @param mainOutputTag {@link TupleTag Tag} of the {@link DoFn DoFn's} main output.
 * @param additionalOutputTags {@link TupleTagList Tags} of the {@link DoFn DoFn's} additional
 * @param outputTagsToCoders A map from output tag to the coder for that output, which should
 *     provide mappings for the main and all additional tags.
 */
public ProcessKeyedElements(
    DoFn<InputT, OutputT> fn,
    Coder<InputT> elementCoder,
    Coder<RestrictionT> restrictionCoder,
    WindowingStrategy<InputT, ?> windowingStrategy,
    List<PCollectionView<?>> sideInputs,
    TupleTag<OutputT> mainOutputTag,
    TupleTagList additionalOutputTags,
    Map<TupleTag<?>, Coder<?>> outputTagsToCoders) {
  this.fn = fn;
  this.elementCoder = elementCoder;
  this.restrictionCoder = restrictionCoder;
  this.windowingStrategy = windowingStrategy;
  this.sideInputs = sideInputs;
  this.mainOutputTag = mainOutputTag;
  this.additionalOutputTags = additionalOutputTags;
  this.outputTagsToCoders = outputTagsToCoders;
}
 
开发者ID:apache,项目名称:beam,代码行数:28,代码来源:SplittableParDo.java


示例4: getReplacementTransformGetFn

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
@Test
public void getReplacementTransformGetFn() {
  DoFn<Integer, Long> originalFn = new ToLongFn();
  ParDo.SingleOutput<Integer, Long> originalTransform = ParDo.of(originalFn);
  PCollection<? extends Integer> input = pipeline.apply(Create.of(1, 2, 3));
  AppliedPTransform<
          PCollection<? extends Integer>, PCollection<Long>, ParDo.SingleOutput<Integer, Long>>
      application =
          AppliedPTransform.of(
              "original",
              input.expand(),
              input.apply(originalTransform).expand(),
              originalTransform,
              pipeline);

  PTransformReplacement<PCollection<? extends Integer>, PCollection<Long>> replacementTransform =
      factory.getReplacementTransform(application);
  ParDoSingle<Integer, Long> parDoSingle =
      (ParDoSingle<Integer, Long>) replacementTransform.getTransform();

  assertThat(parDoSingle.getFn(), equalTo(originalTransform.getFn()));
  assertThat(parDoSingle.getFn(), equalTo(originalFn));
}
 
开发者ID:apache,项目名称:beam,代码行数:24,代码来源:PrimitiveParDoSingleFactoryTest.java


示例5: testMutatingOutputCoderDoFnError

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
/**
 * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails
 * in the {@link DirectRunner}.
 */
@Test
public void testMutatingOutputCoderDoFnError() throws Exception {
  Pipeline pipeline = getPipeline();

  pipeline
      .apply(Create.of(42))
      .apply(ParDo.of(new DoFn<Integer, byte[]>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          byte[] outputArray = new byte[]{0x1, 0x2, 0x3};
          c.output(outputArray);
          outputArray[0] = 0xa;
          c.output(outputArray);
        }
      }));

  thrown.expect(IllegalMutationException.class);
  thrown.expectMessage("output");
  thrown.expectMessage("must not be mutated");
  pipeline.run();
}
 
开发者ID:apache,项目名称:beam,代码行数:26,代码来源:DirectRunnerTest.java


示例6: testSimpleTimerWithContext

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
@Test
public void testSimpleTimerWithContext() throws Exception {
  DoFnSignature sig =
      DoFnSignatures.getSignature(
          new DoFn<KV<String, Integer>, Long>() {
            @TimerId("foo")
            private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME);

            @ProcessElement
            public void foo(ProcessContext context) {}

            @OnTimer("foo")
            public void onFoo(OnTimerContext c) {}
          }.getClass());

  assertThat(sig.timerDeclarations().size(), equalTo(1));
  DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("foo");

  assertThat(decl.id(), equalTo("foo"));
  assertThat(decl.field().getName(), equalTo("bizzle"));

  assertThat(
      sig.onTimerMethods().get("foo").extraParameters().get(0),
      equalTo((Parameter) Parameter.onTimerContext()));
}
 
开发者ID:apache,项目名称:beam,代码行数:26,代码来源:DoFnSignaturesTest.java


示例7: testMultipleProcessElement

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
@Test
public void testMultipleProcessElement() throws Exception {
  thrown.expect(IllegalArgumentException.class);
  thrown.expectMessage("Found multiple methods annotated with @ProcessElement");
  thrown.expectMessage("foo()");
  thrown.expectMessage("bar()");
  thrown.expectMessage(getClass().getName() + "$");
  DoFnSignatures.getSignature(
      new DoFn<String, String>() {
        @ProcessElement
        public void foo() {}

        @ProcessElement
        public void bar() {}
      }.getClass());
}
 
开发者ID:apache,项目名称:beam,代码行数:17,代码来源:DoFnSignaturesProcessElementTest.java


示例8: testSimpleTimerIdNamedDoFn

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
@Test
public void testSimpleTimerIdNamedDoFn() throws Exception {
  class DoFnForTestSimpleTimerIdNamedDoFn extends DoFn<KV<String, Integer>, Long> {
    @TimerId("foo")
    private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void foo(ProcessContext context) {}

    @OnTimer("foo")
    public void onFoo() {}
  }

  // Test classes at the bottom of the file
  DoFnSignature sig =
      DoFnSignatures.signatureForDoFn(new DoFnForTestSimpleTimerIdNamedDoFn());

  assertThat(sig.timerDeclarations().size(), equalTo(1));
  DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("foo");

  assertThat(decl.id(), equalTo("foo"));
  assertThat(
      decl.field(), equalTo(DoFnForTestSimpleTimerIdNamedDoFn.class.getDeclaredField("bizzle")));
}
 
开发者ID:apache,项目名称:beam,代码行数:25,代码来源:DoFnSignaturesTest.java


示例9: applyForSingleton

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
static <T, FinalT, ViewT, W extends BoundedWindow> PCollection<?>
applyForSingleton(
    DataflowRunner runner,
    PCollection<T> input,
    DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
        IsmRecord<WindowedValue<FinalT>>> doFn,
    Coder<FinalT> defaultValueCoder,
    PCollectionView<ViewT> view) {

  @SuppressWarnings("unchecked")
  Coder<W> windowCoder = (Coder<W>)
      input.getWindowingStrategy().getWindowFn().windowCoder();

  IsmRecordCoder<WindowedValue<FinalT>> ismCoder =
      coderForSingleton(windowCoder, defaultValueCoder);

  PCollection<IsmRecord<WindowedValue<FinalT>>> reifiedPerWindowAndSorted = input
      .apply(new GroupByWindowHashAsKeyAndWindowAsSortKey<T, W>(ismCoder))
      .apply(ParDo.of(doFn));
  reifiedPerWindowAndSorted.setCoder(ismCoder);

  runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
  reifiedPerWindowAndSorted.apply(
      CreateDataflowView.<IsmRecord<WindowedValue<FinalT>>, ViewT>forBatch(view));
  return reifiedPerWindowAndSorted;
}
 
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:BatchViewOverrides.java


示例10: wrapContextAsStartBundle

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
private DoFn<InputT, OutputT>.StartBundleContext wrapContextAsStartBundle(
    final StartBundleContext baseContext) {
  return fn.new StartBundleContext() {
    @Override
    public PipelineOptions getPipelineOptions() {
      return baseContext.getPipelineOptions();
    }

    private void throwUnsupportedOutput() {
      throw new UnsupportedOperationException(
          String.format(
              "Splittable DoFn can only output from @%s",
              ProcessElement.class.getSimpleName()));
    }
  };
}
 
开发者ID:apache,项目名称:beam,代码行数:17,代码来源:SplittableParDoViaKeyedWorkItems.java


示例11: testTimerParameterDuplicate

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
@Test
public void testTimerParameterDuplicate() throws Exception {
  thrown.expect(IllegalArgumentException.class);
  thrown.expectMessage("duplicate");
  thrown.expectMessage("my-id");
  thrown.expectMessage("myProcessElement");
  thrown.expectMessage("index 2");
  thrown.expectMessage(not(mentionsState()));
  DoFnSignature sig =
      DoFnSignatures.getSignature(
          new DoFn<KV<String, Integer>, Long>() {
            @TimerId("my-id")
            private final TimerSpec myfield = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

            @ProcessElement
            public void myProcessElement(
                ProcessContext context,
                @TimerId("my-id") Timer one,
                @TimerId("my-id") Timer two) {}

            @OnTimer("my-id")
            public void onWhatever() {}
          }.getClass());
}
 
开发者ID:apache,项目名称:beam,代码行数:25,代码来源:DoFnSignaturesTest.java


示例12: applyTyped

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
private PCollection<Event> applyTyped(PCollection<Event> events) {
  final Coder<Event> coder = events.getCoder();
  return events
      // Force round trip through coder.
      .apply(name + ".Serialize",
          ParDo.of(new DoFn<Event, Event>() {
                private final Counter bytesMetric =
                  Metrics.counter(name , "bytes");

                @ProcessElement
                public void processElement(ProcessContext c) throws CoderException, IOException {
                  ByteArrayOutputStream outStream = new ByteArrayOutputStream();
                  coder.encode(c.element(), outStream, Coder.Context.OUTER);
                  byte[] byteArray = outStream.toByteArray();
                  bytesMetric.inc((long) byteArray.length);
                  ByteArrayInputStream inStream = new ByteArrayInputStream(byteArray);
                  Event event = coder.decode(inStream, Coder.Context.OUTER);
                  c.output(event);
                }
              }));
}
 
开发者ID:apache,项目名称:beam,代码行数:22,代码来源:Query0.java


示例13: logBytesMetric

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
private PTransform<? super PCollection<BeamRecord>, PCollection<BeamRecord>> logBytesMetric(
    final BeamRecordCoder coder) {

  return ParDo.of(new DoFn<BeamRecord, BeamRecord>() {
    private final Counter bytesMetric = Metrics.counter(name , "bytes");

    @ProcessElement
    public void processElement(ProcessContext c) throws CoderException, IOException {
      ByteArrayOutputStream outStream = new ByteArrayOutputStream();
      coder.encode(c.element(), outStream, Coder.Context.OUTER);
      byte[] byteArray = outStream.toByteArray();
      bytesMetric.inc((long) byteArray.length);
      ByteArrayInputStream inStream = new ByteArrayInputStream(byteArray);
      BeamRecord record = coder.decode(inStream, Coder.Context.OUTER);
      c.output(record);
    }
  });
}
 
开发者ID:apache,项目名称:beam,代码行数:19,代码来源:SqlQuery0.java


示例14: DoFnRunnerFactory

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
public DoFnRunnerFactory(
    GearpumpPipelineOptions pipelineOptions,
    DoFn<InputT, OutputT> doFn,
    Collection<PCollectionView<?>> sideInputs,
    DoFnRunners.OutputManager outputManager,
    TupleTag<OutputT> mainOutputTag,
    List<TupleTag<?>> sideOutputTags,
    StepContext stepContext,
    WindowingStrategy<?, ?> windowingStrategy) {
  this.fn = doFn;
  this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
  this.sideInputs = sideInputs;
  this.outputManager = outputManager;
  this.mainOutputTag = mainOutputTag;
  this.sideOutputTags = sideOutputTags;
  this.stepContext = stepContext;
  this.windowingStrategy = windowingStrategy;
}
 
开发者ID:apache,项目名称:beam,代码行数:19,代码来源:DoFnRunnerFactory.java


示例15: stateOrTimerParDoSingle

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
/**
 * A {@link PTransformMatcher} that matches a {@link ParDo.SingleOutput} containing a {@link DoFn}
 * that uses state or timers, as specified by {@link DoFnSignature#usesState()} and {@link
 * DoFnSignature#usesTimers()}.
 */
public static PTransformMatcher stateOrTimerParDoSingle() {
  return new PTransformMatcher() {
    @Override
    public boolean matches(AppliedPTransform<?, ?, ?> application) {
      PTransform<?, ?> transform = application.getTransform();
      if (transform instanceof ParDo.SingleOutput) {
        DoFn<?, ?> fn = ((ParDo.SingleOutput<?, ?>) transform).getFn();
        DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn);
        return signature.usesState() || signature.usesTimers();
      }
      return false;
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper("StateOrTimerParDoSingleMatcher").toString();
    }
  };
}
 
开发者ID:apache,项目名称:beam,代码行数:25,代码来源:PTransformMatchers.java


示例16: testProcessElementException

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
@Test
public void testProcessElementException() throws Exception {
  DoFnInvoker<Integer, Integer> invoker =
      DoFnInvokers.invokerFor(
          new DoFn<Integer, Integer>() {
            @ProcessElement
            public void processElement(@SuppressWarnings("unused") ProcessContext c) {
              throw new IllegalArgumentException("bogus");
            }
          });
  thrown.expect(UserCodeException.class);
  thrown.expectMessage("bogus");
  invoker.invokeProcessElement(new FakeArgumentProvider<Integer, Integer>() {
    @Override
    public DoFn<Integer, Integer>.ProcessContext processContext(DoFn<Integer, Integer> fn) {
      return null;
    }
  });
}
 
开发者ID:apache,项目名称:beam,代码行数:20,代码来源:DoFnInvokersTest.java


示例17: SplittableDoFnOperator

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
public SplittableDoFnOperator(
    DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> doFn,
    String stepName,
    Coder<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>> inputCoder,
    TupleTag<OutputT> mainOutputTag,
    List<TupleTag<?>> additionalOutputTags,
    OutputManagerFactory<OutputT> outputManagerFactory,
    WindowingStrategy<?, ?> windowingStrategy,
    Map<Integer, PCollectionView<?>> sideInputTagMapping,
    Collection<PCollectionView<?>> sideInputs,
    PipelineOptions options,
    Coder<?> keyCoder) {
  super(
      doFn,
      stepName,
      inputCoder,
      mainOutputTag,
      additionalOutputTags,
      outputManagerFactory,
      windowingStrategy,
      sideInputTagMapping,
      sideInputs,
      options,
      keyCoder);
}
 
开发者ID:apache,项目名称:beam,代码行数:26,代码来源:SplittableDoFnOperator.java


示例18: removesOnExceptionInOnTimer

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
@Test
public void removesOnExceptionInOnTimer() throws Exception {
  ParDoEvaluator<Object> underlying = mock(ParDoEvaluator.class);
  doThrow(Exception.class)
      .when(underlying)
      .onTimer(any(TimerData.class), any(BoundedWindow.class));

  DoFn<?, ?> original = lifecycleManager.get();
  assertThat(original, not(nullValue()));
  DoFnLifecycleManagerRemovingTransformEvaluator<Object> evaluator =
      DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(underlying, lifecycleManager);

  try {
    evaluator.onTimer(
        TimerData.of("foo", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME),
        GlobalWindow.INSTANCE);
  } catch (Exception e) {
    assertThat(lifecycleManager.get(), not(Matchers.<DoFn<?, ?>>theInstance(original)));
    return;
  }
  fail("Expected underlying evaluator to throw on method call");
}
 
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:DoFnLifecycleManagerRemovingTransformEvaluatorTest.java


示例19: newStaticAnonymousDoFn

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
public static DoFn<String, String> newStaticAnonymousDoFn() {
  return new DoFn<String, String>() {
    private DoFn<String, String>.ProcessContext invokedContext;

    @ProcessElement
    public void process(ProcessContext c) {
      assertNull("Should have been invoked just once", invokedContext);
      invokedContext = c;
    }

    @SuppressWarnings("unused")
    public void verify(DoFn<String, String>.ProcessContext context) {
      assertEquals(context, invokedContext);
    }
  };
}
 
开发者ID:apache,项目名称:beam,代码行数:17,代码来源:DoFnInvokersTestHelper.java


示例20: splittableParDoMulti

import org.apache.beam.sdk.transforms.DoFn; //导入依赖的package包/类
/**
 * A {@link PTransformMatcher} that matches a {@link ParDo.MultiOutput} containing a {@link DoFn}
 * that is splittable, as signified by {@link ProcessElementMethod#isSplittable()}.
 */
public static PTransformMatcher splittableParDoMulti() {
  return new PTransformMatcher() {
    @Override
    public boolean matches(AppliedPTransform<?, ?, ?> application) {
      PTransform<?, ?> transform = application.getTransform();
      if (transform instanceof ParDo.MultiOutput) {
        DoFn<?, ?> fn = ((ParDo.MultiOutput<?, ?>) transform).getFn();
        DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn);
        return signature.processElement().isSplittable();
      }
      return false;
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper("SplittableParDoMultiMatcher").toString();
    }
  };
}
 
开发者ID:apache,项目名称:beam,代码行数:24,代码来源:PTransformMatchers.java



注:本文中的org.apache.beam.sdk.transforms.DoFn类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java PackedSwitchPayload类代码示例发布时间:2022-05-21
下一篇:
Java Transport类代码示例发布时间:2022-05-21
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap