Java DoFnTester Class Code Examples


This article collects typical usage examples of org.apache.beam.sdk.transforms.DoFnTester in Java. If you are wondering how the Java DoFnTester class is used in practice, or are looking for working examples, the selected code samples below may help.



The DoFnTester class belongs to the org.apache.beam.sdk.transforms package. The 20 code examples of the class shown below are sorted by popularity by default.
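Most of the examples below follow the same pattern: wrap a function in a tester, pass one or more input elements to processBundle, and assert on the returned list of outputs. The following is a minimal sketch of that pattern using only the Java standard library. MiniBundleTester is a hypothetical stand-in written for illustration; it is not part of Apache Beam and does not reproduce DoFnTester's lifecycle handling, cloning behavior, or side-input support.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical stand-in for a DoFnTester-like harness; NOT part of Apache Beam. */
public class MiniBundleTester<InT, OutT> {
  // Per-element logic: receives one input and a callback used to emit outputs.
  private final BiConsumer<InT, Consumer<OutT>> processElement;

  public MiniBundleTester(BiConsumer<InT, Consumer<OutT>> processElement) {
    this.processElement = processElement;
  }

  /** Feeds every input to the function and returns all emitted outputs in order. */
  @SafeVarargs
  public final List<OutT> processBundle(InT... inputs) {
    List<OutT> outputs = new ArrayList<>();
    for (InT input : inputs) {
      processElement.accept(input, outputs::add);
    }
    return outputs;
  }

  public static void main(String[] args) {
    // A function that emits the upper-cased input and drops empty strings.
    MiniBundleTester<String, String> tester =
        new MiniBundleTester<String, String>((in, out) -> {
          if (!in.isEmpty()) {
            out.accept(in.toUpperCase());
          }
        });

    List<String> outputs = tester.processBundle("a", "", "b");
    System.out.println(outputs); // prints [A, B]
  }
}
```

As in the real examples, the function may emit zero, one, or several outputs per input, so assertions usually check both the size of the returned list and the individual elements.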

Example 1: testDatastoreToGcs_EntityToJson_noTransform

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
@Test
public void testDatastoreToGcs_EntityToJson_noTransform() throws Exception {
  DoFnTester<Entity, String> fnTester = DoFnTester.of(EntityToJson.newBuilder()
      .setJsTransformPath(StaticValueProvider.of(null))
      .setJsTransformFunctionName(StaticValueProvider.of(null))
      .build());

  Builder entityBuilder = Entity.newBuilder();
  JsonFormat.parser().usingTypeRegistry(
      TypeRegistry.newBuilder()
          .add(Entity.getDescriptor())
          .build())
      .merge(mEntityJson, entityBuilder);

  Entity entity = entityBuilder.build();
  List<String> entityJsonOutputs = fnTester.processBundle(entity);
  Assert.assertEquals(mEntityJson, entityJsonOutputs.get(0));
}
 
Developer: cobookman, Project: teleport, Lines of code: 19, Source file: DatastoreToGcsTest.java


Example 2: testDatastoreToGcs_EntityToJson_withTransform

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
@Test
public void testDatastoreToGcs_EntityToJson_withTransform() throws Exception {
  DoFnTester<Entity, String> fnTester = DoFnTester.of(EntityToJson.newBuilder()
      .setJsTransformPath(StaticValueProvider.of(jsTransformPath))
      .setJsTransformFunctionName(StaticValueProvider.of("transform"))
      .build());

  Builder entityBuilder = Entity.newBuilder();
  JsonFormat.parser().usingTypeRegistry(
      TypeRegistry.newBuilder()
          .add(Entity.getDescriptor())
          .build())
      .merge(mEntityJson, entityBuilder);

  Entity entity = entityBuilder.build();
  List<String> entityJsonOutputs = fnTester.processBundle(entity);
  Assert.assertEquals(mTransformedEntityJson, entityJsonOutputs.get(0));
}
 
Developer: cobookman, Project: teleport, Lines of code: 19, Source file: DatastoreToGcsTest.java


Example 3: testGcsToDatastore_EntityToJson_noTransform

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
@Test
public void testGcsToDatastore_EntityToJson_noTransform() throws Exception {
  DoFnTester<String, Entity> fnTester = DoFnTester.of(JsonToEntity.newBuilder()
      .setJsTransformPath(StaticValueProvider.of(null))
      .setJsTransformFunctionName(StaticValueProvider.of(null))
      .build());
  List<Entity> output = fnTester.processBundle(mEntityJson);
  Entity outputEntity = output.get(0);

  Printer printer = JsonFormat.printer()
      .omittingInsignificantWhitespace()
      .usingTypeRegistry(
          TypeRegistry.newBuilder()
              .add(Entity.getDescriptor())
              .build());
  Assert.assertEquals(mEntityJson, printer.print(outputEntity));
}
 
Developer: cobookman, Project: teleport, Lines of code: 18, Source file: GcsToDatastoreTest.java


Example 4: testDatastoreToBq_EntityToTableRow_notransform

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
@Test
public void testDatastoreToBq_EntityToTableRow_notransform() throws Exception {
  DoFnTester<Entity, TableRow> fnTester = DoFnTester.of(EntityToTableRow.newBuilder()
      .setStrictCast(StaticValueProvider.of(true))
      .setTableSchemaJson(StaticValueProvider.of(mTableSchemaJson))
      .setJsTransformFunctionName(StaticValueProvider.of(null))
      .setJsTransformPath(StaticValueProvider.of(null))
      .build());

  Builder entityBuilder = Entity.newBuilder();
  JsonFormat.parser().usingTypeRegistry(
      TypeRegistry.newBuilder()
          .add(Entity.getDescriptor())
          .build())
      .merge(mEntityJson, entityBuilder);

  Entity entity = entityBuilder.build();
  List<TableRow> tableRows = fnTester.processBundle(entity);
  TableRow tr = tableRows.get(0);

  Assert.assertEquals(1, tableRows.size());
  Assert.assertEquals("key(Drawing, '31ce830e-91d0-405e-855a-abe416cadc1f')", tr.get("__key__"));
  Assert.assertEquals("79a1d9d9-e255-427a-9b09-f45157e97790", tr.get("canvasId"));
}
 
Developer: cobookman, Project: teleport, Lines of code: 25, Source file: DatastoreToBqTest.java


Example 5: testApply

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
@Test
public void testApply() throws Exception {
  ObjectMapper mapperObj = new ObjectMapper();
  Map<String, Object> map = new HashMap<>();
  map.put("foo", 1);
  map.put("bar", Arrays.asList(1, 2, 3));
  Map<String, Object> submap = new HashMap<>();
  submap.put("moge", 1);
  map.put("hoge", submap);
  String jsonStr = new Gson().toJson(map);

  PubsubMessage2TableRowFn fn = new PubsubMessage2TableRowFn();
  DoFnTester<String, TableRow> fnTester = DoFnTester.of(fn);

  // Let exceptions propagate so that a failure in processBundle fails the test
  // instead of being swallowed by a catch block.
  List<TableRow> result = fnTester.processBundle(jsonStr);
  String msg = result.get(0).get(PubsubMessage2TableRowFn.getMessageColumnName()).toString();
  assertEquals(jsonStr, msg);
}
 
Developer: yu-iskw, Project: google-log-aggregation-example, Lines of code: 23, Source file: PubsubMessage2TableRowFnTest.java


Example 6: testUserAgentAnalysisDoFn

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
@Test
public void testUserAgentAnalysisDoFn() throws Exception {
    DoFn<TestRecord, TestRecord> fn = new TestDoFn();

    DoFnTester<TestRecord, TestRecord> fnTester = DoFnTester.of(fn);

    // Process a bundle containing a single input element:
    TestRecord testInput = new TestRecord("Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) " +
        "Chrome/48.0.2564.82 Safari/537.36");
    List<TestRecord> testOutputs = fnTester.processBundle(testInput);

    assertEquals(1, testOutputs.size());
    TestRecord record = testOutputs.get(0);
    assertEquals("Desktop", record.deviceClass);
    assertEquals("Chrome 48.0.2564.82", record.agentNameVersion);
    assertNull(record.shouldRemainNull);
}
 
Developer: nielsbasjes, Project: yauaa, Lines of code: 18, Source file: TestUserAgentAnalysisDoFnRaw.java


Example 7: testFormatCounts

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testFormatCounts() throws Exception {
  DoFnTester<KV<Integer, Long>, TableRow> formatCountsFn =
      DoFnTester.of(new FormatCountsFn());
  KV[] empty = {};
  List<TableRow> results = formatCountsFn.processBundle(empty);
  Assert.assertEquals(0, results.size());
  KV[] input = { KV.of(3, 0L),
                 KV.of(4, Long.MAX_VALUE),
                 KV.of(5, Long.MIN_VALUE) };
  results = formatCountsFn.processBundle(input);
  // JUnit's assertEquals takes the expected value first, then the actual value.
  Assert.assertEquals(3, results.size());
  Assert.assertEquals(3, results.get(0).get("month"));
  Assert.assertEquals(0L, results.get(0).get("tornado_count"));
  Assert.assertEquals(4, results.get(1).get("month"));
  Assert.assertEquals(Long.MAX_VALUE, results.get(1).get("tornado_count"));
  Assert.assertEquals(5, results.get(2).get("month"));
  Assert.assertEquals(Long.MIN_VALUE, results.get(2).get("tornado_count"));
}
 
Developer: apache, Project: beam, Lines of code: 21, Source file: BigQueryTornadoesTest.java


Example 8: datastoreWriterFnTest

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
private void datastoreWriterFnTest(int numMutations) throws Exception {
  // Create the requested number of mutations.
  List<Mutation> mutations = new ArrayList<>(numMutations);
  for (int i = 0; i < numMutations; ++i) {
    mutations.add(
        makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
  }

  DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
      null, mockDatastoreFactory, new FakeWriteBatcher());
  DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
  doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
  doFnTester.processBundle(mutations);

  int start = 0;
  while (start < numMutations) {
    int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_ENTITIES_START);
    CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
    commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
    commitRequest.addAllMutations(mutations.subList(start, end));
    // Verify all the batch requests were made with the expected mutations.
    verify(mockDatastore, times(1)).commit(commitRequest.build());
    start = end;
  }
}
 
Developer: apache, Project: beam, Lines of code: 26, Source file: DatastoreV1Test.java


Example 9: testDatatoreWriterFnRetriesErrors

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
/** Tests {@link DatastoreWriterFn} with a failed request which is retried. */
@Test
public void testDatatoreWriterFnRetriesErrors() throws Exception {
  List<Mutation> mutations = new ArrayList<>();
  int numRpcs = 2;
  for (int i = 0; i < DATASTORE_BATCH_UPDATE_ENTITIES_START * numRpcs; ++i) {
    mutations.add(
        makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
  }

  CommitResponse successfulCommit = CommitResponse.getDefaultInstance();
  when(mockDatastore.commit(any(CommitRequest.class))).thenReturn(successfulCommit)
    .thenThrow(
        new DatastoreException("commit", Code.DEADLINE_EXCEEDED, "", null))
    .thenReturn(successfulCommit);

  DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
      null, mockDatastoreFactory, new FakeWriteBatcher());
  DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
  doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
  doFnTester.processBundle(mutations);
}
 
Developer: apache, Project: beam, Lines of code: 23, Source file: DatastoreV1Test.java


Example 10: testSplitQueryFnWithQueryLimit

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
/**
 * Tests {@link DatastoreV1.Read.SplitQueryFn} when the query has a user specified limit.
 */
@Test
public void testSplitQueryFnWithQueryLimit() throws Exception {
  Query queryWithLimit = QUERY.toBuilder()
      .setLimit(Int32Value.newBuilder().setValue(1))
      .build();

  SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, 10, mockDatastoreFactory);
  DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
  doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
  List<Query> queries = doFnTester.processBundle(queryWithLimit);

  assertEquals(1, queries.size());
  verifyNoMoreInteractions(mockDatastore);
  verifyNoMoreInteractions(mockQuerySplitter);
}
 
Developer: apache, Project: beam, Lines of code: 19, Source file: DatastoreV1Test.java


Example 11: testReadFnRetriesErrors

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
/** Tests that {@link ReadFn} retries after an error. */
@Test
public void testReadFnRetriesErrors() throws Exception {
  // An empty query to read entities.
  Query query = Query.newBuilder().setLimit(
      Int32Value.newBuilder().setValue(1)).build();

  // Use mockResponseForQuery to generate results.
  when(mockDatastore.runQuery(any(RunQueryRequest.class)))
      .thenThrow(
          new DatastoreException("RunQuery", Code.DEADLINE_EXCEEDED, "", null))
      .thenAnswer(new Answer<RunQueryResponse>() {
        @Override
        public RunQueryResponse answer(InvocationOnMock invocationOnMock) throws Throwable {
          Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery();
          return mockResponseForQuery(q);
        }
      });

  ReadFn readFn = new ReadFn(V_1_OPTIONS, mockDatastoreFactory);
  DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn);
  doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
  List<Entity> entities = doFnTester.processBundle(query);
}
 
Developer: apache, Project: beam, Lines of code: 25, Source file: DatastoreV1Test.java


Example 12: runQuery

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
@Test
public void runQuery() throws Exception {
  SpannerIO.Read read =
      SpannerIO.read()
          .withProjectId("test")
          .withInstanceId("123")
          .withDatabaseId("aaa")
          .withQuery("SELECT * FROM users")
          .withServiceFactory(serviceFactory);

  NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read.getSpannerConfig());
  DoFnTester<ReadOperation, Struct> fnTester = DoFnTester.of(readFn);

  when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
      .thenReturn(mockTx);
  when(mockTx.executeQuery(any(Statement.class)))
      .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS));

  List<Struct> result = fnTester.processBundle(read.getReadOperation());
  assertThat(result, Matchers.containsInAnyOrder(FAKE_ROWS.toArray()));

  verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound
      .strong());
  verify(mockTx).executeQuery(Statement.of("SELECT * FROM users"));
}
 
Developer: apache, Project: beam, Lines of code: 26, Source file: SpannerIOReadTest.java


Example 13: runRead

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
@Test
public void runRead() throws Exception {
  SpannerIO.Read read =
      SpannerIO.read()
          .withProjectId("test")
          .withInstanceId("123")
          .withDatabaseId("aaa")
          .withTable("users")
          .withColumns("id", "name")
          .withServiceFactory(serviceFactory);

  NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read.getSpannerConfig());
  DoFnTester<ReadOperation, Struct> fnTester = DoFnTester.of(readFn);

  when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
      .thenReturn(mockTx);
  when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name")))
      .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS));

  List<Struct> result = fnTester.processBundle(read.getReadOperation());
  assertThat(result, Matchers.containsInAnyOrder(FAKE_ROWS.toArray()));

  verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound.strong());
  verify(mockTx).read("users", KeySet.all(), Arrays.asList("id", "name"));
}
 
Developer: apache, Project: beam, Lines of code: 26, Source file: SpannerIOReadTest.java


Example 14: testBatchViewAsSingletonToIsmRecord

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
@Test
public void testBatchViewAsSingletonToIsmRecord() throws Exception {
  DoFnTester<
          KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>,
          IsmRecord<WindowedValue<String>>>
      doFnTester =
          DoFnTester.of(
              new BatchViewOverrides.BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn<
                  String, GlobalWindow>(GlobalWindow.Coder.INSTANCE));

  assertThat(
      doFnTester.processBundle(
          ImmutableList.of(
              KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of(
                  0, ImmutableList.of(KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")))))),
      contains(IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE), valueInGlobalWindow("a"))));
}
 
Developer: apache, Project: beam, Lines of code: 18, Source file: BatchViewOverridesTest.java


Example 15: testBatchViewAsSingletonToIsmRecordWithMultipleValuesThrowsException

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
@Test
public void testBatchViewAsSingletonToIsmRecordWithMultipleValuesThrowsException()
    throws Exception {
  DoFnTester<
          KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>,
          IsmRecord<WindowedValue<String>>>
      doFnTester =
          DoFnTester.of(
              new BatchViewOverrides.BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn<
                  String, GlobalWindow>(GlobalWindow.Coder.INSTANCE));

  thrown.expect(IllegalStateException.class);
  thrown.expectMessage("found for singleton within window");
  doFnTester.processBundle(
      ImmutableList.of(
          KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of(
              0,
              ImmutableList.of(
                  KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")),
                  KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("b"))))));
}
 
Developer: apache, Project: beam, Lines of code: 22, Source file: BatchViewOverridesTest.java


Example 16: testNormalize_nothing

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
/**
 * Invalid (null) path to normalize: the test expects no output records
 */
@Test
public void testNormalize_nothing() throws Exception {
    NormalizeProperties properties = new NormalizeProperties("test");
    properties.init();
    properties.schemaListener.afterSchema();
    properties.isList.setValue(false);
    properties.trim.setValue(true);
    properties.discardTrailingEmptyStr.setValue(true);

    // No column to normalize: the path is null
    properties.columnToNormalize.setValue(null);

    NormalizeDoFn function = new NormalizeDoFn().withProperties(properties);
    DoFnTester<IndexedRecord, IndexedRecord> fnTester = DoFnTester.of(function);
    List<IndexedRecord> outputs = fnTester.processBundle(inputParentRecord);
    Assert.assertEquals(0, outputs.size());
}
 
Developer: Talend, Project: components, Lines of code: 21, Source file: NormalizeDoFnTest.java


Example 17: testNormalizeSimpleFields_a

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
/**
 * Input parent record: {@link NormalizeDoFnTest#inputParentRecord}
 *
 * Normalize simple field: `a`
 *
 * Expected normalized results of the field `a`:
 *
 * {"a": "aaa", "b": {"x": "x1;x2", "y": {"d": {"j": [{"l": "l1"}, {"l": "l2"}], "k": "k1;k2"}, "e": "e"}}, "c":
 * {"f": "f", "g": [{"h": "h1", "i": "i2"}, {"h": "h2", "i": "i1"}]}, "m": ["m1", "m2", "m3"]}
 *
 * @throws Exception
 */
@Test
public void testNormalizeSimpleFields_a() throws Exception {
    NormalizeProperties properties = new NormalizeProperties("test");
    properties.init();
    properties.schemaListener.afterSchema();
    properties.isList.setValue(false);
    properties.trim.setValue(true);
    properties.discardTrailingEmptyStr.setValue(true);

    // Normalize `a` simple field
    properties.columnToNormalize.setValue("a");

    NormalizeDoFn function = new NormalizeDoFn().withProperties(properties);
    DoFnTester<IndexedRecord, IndexedRecord> fnTester = DoFnTester.of(function);
    List<IndexedRecord> outputs = fnTester.processBundle(inputParentRecord);
    Assert.assertEquals(1, outputs.size());
    GenericRecord outputRecord = (GenericRecord) outputs.get(0);
    Assert.assertEquals(inputParentRecord.toString(), outputRecord.toString());
    Assert.assertEquals(inputParentRecord.getSchema().toString(), outputRecord.getSchema().toString());
}
 
Developer: Talend, Project: components, Lines of code: 33, Source file: NormalizeDoFnTest.java


Example 18: testNormalizeSimpleFields_a_withDot

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
/**
 * Input parent record: {@link NormalizeDoFnTest#inputParentRecord}
 *
 * Normalize simple field: `.a`
 *
 * Expected normalized results of the field `a`:
 *
 * {"a": "aaa", "b": {"x": "x1;x2", "y": {"d": {"j": [{"l": "l1"}, {"l": "l2"}], "k": "k1;k2"}, "e": "e"}}, "c":
 * {"f": "f", "g": [{"h": "h1", "i": "i2"}, {"h": "h2", "i": "i1"}]}, "m": ["m1", "m2", "m3"]}
 *
 * @throws Exception
 */
@Test
public void testNormalizeSimpleFields_a_withDot() throws Exception {
    NormalizeProperties properties = new NormalizeProperties("test");
    properties.init();
    properties.schemaListener.afterSchema();
    properties.isList.setValue(false);
    properties.trim.setValue(true);
    properties.discardTrailingEmptyStr.setValue(true);

    // Normalize `.a` simple field (path with a leading dot)
    properties.columnToNormalize.setValue(".a");

    NormalizeDoFn function = new NormalizeDoFn().withProperties(properties);
    DoFnTester<IndexedRecord, IndexedRecord> fnTester = DoFnTester.of(function);
    List<IndexedRecord> outputs = fnTester.processBundle(inputParentRecord);
    Assert.assertEquals(1, outputs.size());
    GenericRecord outputRecord = (GenericRecord) outputs.get(0);
    Assert.assertEquals(inputParentRecord.toString(), outputRecord.toString());
    Assert.assertEquals(inputParentRecord.getSchema().toString(), outputRecord.getSchema().toString());
}
 
Developer: Talend, Project: components, Lines of code: 33, Source file: NormalizeDoFnTest.java


Example 19: testNormalizeComplexFields_b

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
/**
 * Input parent record: {@link NormalizeDoFnTest#inputParentRecord}
 *
 * Normalize complex field: `b`
 *
 * Expected: no change
 *
 * @throws Exception
 */
@Test
public void testNormalizeComplexFields_b() throws Exception {
    NormalizeProperties properties = new NormalizeProperties("test");
    properties.init();
    properties.schemaListener.afterSchema();

    // Normalize `b` complex field
    properties.columnToNormalize.setValue("b");

    NormalizeDoFn function = new NormalizeDoFn().withProperties(properties);
    DoFnTester<IndexedRecord, IndexedRecord> fnTester = DoFnTester.of(function);
    List<IndexedRecord> outputs = fnTester.processBundle(inputParentRecord);
    Assert.assertEquals(1, outputs.size());
    GenericRecord outputRecord = (GenericRecord) outputs.get(0);
    Assert.assertEquals(inputParentRecord.toString(), outputRecord.toString());
    Assert.assertEquals(inputParentRecord.getSchema().toString(), outputRecord.getSchema().toString());
}
 
Developer: Talend, Project: components, Lines of code: 27, Source file: NormalizeDoFnTest.java


Example 20: testNormalizeComplexFields_by

import org.apache.beam.sdk.transforms.DoFnTester; // import the required package/class
/**
 * Input parent record: {@link NormalizeDoFnTest#inputParentRecord}
 *
 * Normalize complex field: `b.y`
 *
 * Expected: no change
 *
 * @throws Exception
 */
@Test
public void testNormalizeComplexFields_by() throws Exception {
    NormalizeProperties properties = new NormalizeProperties("test");
    properties.init();
    properties.schemaListener.afterSchema();

    // Normalize `b.y` complex field
    properties.columnToNormalize.setValue("b.y");

    NormalizeDoFn function = new NormalizeDoFn().withProperties(properties);
    DoFnTester<IndexedRecord, IndexedRecord> fnTester = DoFnTester.of(function);
    List<IndexedRecord> outputs = fnTester.processBundle(inputParentRecord);
    Assert.assertEquals(1, outputs.size());
    GenericRecord outputRecord = (GenericRecord) outputs.get(0);
    Assert.assertEquals(inputParentRecord.toString(), outputRecord.toString());
    Assert.assertEquals(inputParentRecord.getSchema().toString(), outputRecord.getSchema().toString());

}
 
Developer: Talend, Project: components, Lines of code: 28, Source file: NormalizeDoFnTest.java



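Note: The org.apache.beam.sdk.transforms.DoFnTester examples in this article were collected from GitHub, MSDocs, and other source code and documentation hosting platforms. The code snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to each project's License. Do not reproduce without permission.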

