Java RichMapPartitionFunction Class Code Examples


This article collects typical usage examples of org.apache.flink.api.common.functions.RichMapPartitionFunction in Java. If you are wondering how the RichMapPartitionFunction class is used in practice, or are looking for working examples of it, the selected code examples below may help.



The RichMapPartitionFunction class belongs to the org.apache.flink.api.common.functions package. Eight code examples of the class are shown below, sorted by popularity by default.

Example 1: testUserSpecificParallelism

import org.apache.flink.api.common.functions.RichMapPartitionFunction; // import the required package/class
/**
 * Ensure that the program parallelism can be set even if the configuration is supplied.
 */
@Test
public void testUserSpecificParallelism() throws Exception {
	Configuration config = new Configuration();
	config.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);

	final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
			cluster.getHostname(),
			cluster.getPort(),
			config
	);
	env.setParallelism(USER_DOP);
	env.getConfig().disableSysoutLogging();

	DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
			.rebalance()
			.mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
				@Override
				public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
					out.collect(getRuntimeContext().getIndexOfThisSubtask());
				}
			});
	List<Integer> resultCollection = result.collect();
	assertEquals(USER_DOP, resultCollection.size());
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 28, Source: RemoteEnvironmentITCase.java


Example 2: testLocalEnvironmentWithConfig

import org.apache.flink.api.common.functions.RichMapPartitionFunction; // import the required package/class
/**
 * Ensure that the user can pass a custom configuration object to the LocalEnvironment.
 */
@Test
public void testLocalEnvironmentWithConfig() throws Exception {
	Configuration conf = new Configuration();
	conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
	
	final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
	env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
	env.getConfig().disableSysoutLogging();

	DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
			.rebalance()
			.mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
				@Override
				public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
					out.collect(getRuntimeContext().getIndexOfThisSubtask());
				}
			});
	List<Integer> resultCollection = result.collect();
	assertEquals(PARALLELISM, resultCollection.size());
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 24, Source: ExecutionEnvironmentITCase.java


Example 3: testLocalEnvironmentWithConfig

import org.apache.flink.api.common.functions.RichMapPartitionFunction; // import the required package/class
/**
 * Ensure that the user can pass a custom configuration object to the LocalEnvironment.
 */
@Test
public void testLocalEnvironmentWithConfig() throws Exception {
	Configuration conf = new Configuration();
	conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);

	final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
	env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
	env.getConfig().disableSysoutLogging();

	DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
			.rebalance()
			.mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
				@Override
				public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
					out.collect(getRuntimeContext().getIndexOfThisSubtask());
				}
			});
	List<Integer> resultCollection = result.collect();
	assertEquals(PARALLELISM, resultCollection.size());
}
 
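Developer ID: axbaretto, Project: flink, Lines of code: 24, Source: ExecutionEnvironmentITCase.java


Example 4: countElementsPerPartition

import org.apache.flink.api.common.functions.RichMapPartitionFunction; // import the required package/class
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;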
/**
 * Method that goes over all the elements in each partition in order to retrieve
 * the total number of elements.
 *
 * @param input the DataSet received as input
 * @return a data set containing tuples of subtask index, number of elements mappings.
 */
public static <T> DataSet<Tuple2<Integer, Long>> countElementsPerPartition(DataSet<T> input) {
	return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer, Long>>() {
		@Override
		public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
			long counter = 0;
			for (T value : values) {
				counter++;
			}
			out.collect(new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), counter));
		}
	});
}
 
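Developer ID: axbaretto, Project: flink, Lines of code: 20, Source: DataSetUtils.java


Example 5: zipWithUniqueId

import org.apache.flink.api.common.functions.RichMapPartitionFunction; // import the required package/class
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;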
/**
 * Method that assigns a unique {@link Long} value to all elements in the input data set as described below.
 * <ul>
 *  <li> a map function is applied to the input data set
 *  <li> each map task holds a counter c which is increased for each record
 *  <li> c is shifted by n bits where n = log2(number of parallel tasks)
 *  <li> to create a unique ID among all tasks, the task id is added to the counter
 *  <li> for each record, the resulting counter is collected
 * </ul>
 *
 * @param input the input data set
 * @return a data set of tuple 2 consisting of ids and initial values.
 */
public static <T> DataSet<Tuple2<Long, T>> zipWithUniqueId(DataSet<T> input) {

	return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Long, T>>() {

		long maxBitSize = getBitSize(Long.MAX_VALUE);
		long shifter = 0;
		long start = 0;
		long taskId = 0;
		long label = 0;

		@Override
		public void open(Configuration parameters) throws Exception {
			super.open(parameters);
			shifter = getBitSize(getRuntimeContext().getNumberOfParallelSubtasks() - 1);
			taskId = getRuntimeContext().getIndexOfThisSubtask();
		}

		@Override
		public void mapPartition(Iterable<T> values, Collector<Tuple2<Long, T>> out) throws Exception {
			for (T value : values) {
				label = (start << shifter) + taskId;

				if (getBitSize(start) + shifter < maxBitSize) {
					out.collect(new Tuple2<>(label, value));
					start++;
				} else {
					throw new Exception("Exceeded Long value range while generating labels");
				}
			}
		}
	});
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 46, Source: DataSetUtils.java
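
The ID scheme described in the zipWithUniqueId Javadoc above can be tried out in plain Java, without a Flink cluster. The following is only a sketch under stated assumptions, not the DataSetUtils code: the class UniqueIdSketch and its methods are hypothetical names, and bitSize is an assumed stand-in for the getBitSize helper, which the snippet calls but does not show. It is assumed here to return the number of bits needed to represent a non-negative value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of Flink.
final class UniqueIdSketch {

    // Assumed stand-in for getBitSize: number of bits needed for a non-negative value.
    static int bitSize(long value) {
        return 64 - Long.numberOfLeadingZeros(value);
    }

    // Mirrors "label = (start << shifter) + taskId" from the snippet above.
    static long label(long counter, long taskId, int parallelism) {
        int shifter = bitSize(parallelism - 1);
        return (counter << shifter) + taskId;
    }

    // IDs that one subtask would emit for its first `records` elements.
    static List<Long> labelsForTask(long taskId, int parallelism, int records) {
        List<Long> labels = new ArrayList<>();
        for (long counter = 0; counter < records; counter++) {
            labels.add(label(counter, taskId, parallelism));
        }
        return labels;
    }

    public static void main(String[] args) {
        int parallelism = 3; // needs 2 bits for the task id, so the counter is shifted by 2
        for (int task = 0; task < parallelism; task++) {
            System.out.println("task " + task + " -> " + labelsForTask(task, parallelism, 3));
        }
        // task 0 -> [0, 4, 8]
        // task 1 -> [1, 5, 9]
        // task 2 -> [2, 6, 10]
    }
}
```

Because the low bits hold the task id and the high bits hold the per-task counter, no two subtasks can produce the same value. The IDs are unique but not consecutive: with three subtasks, values such as 3 and 7 are never assigned.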


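Example 6: zipWithIndex

import org.apache.flink.api.common.functions.RichMapPartitionFunction; // import the required package/class
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;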
/**
 * Method that assigns a unique {@link Long} value to all elements in the input data set. The generated values are
 * consecutive.
 *
 * @param input the input data set
 * @return a data set of tuple 2 consisting of consecutive ids and initial values.
 */
public static <T> DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input) {

	DataSet<Tuple2<Integer, Long>> elementCount = countElementsPerPartition(input);

	return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Long, T>>() {

		long start = 0;

		@Override
		public void open(Configuration parameters) throws Exception {
			super.open(parameters);

			List<Tuple2<Integer, Long>> offsets = getRuntimeContext().getBroadcastVariableWithInitializer(
					"counts",
					new BroadcastVariableInitializer<Tuple2<Integer, Long>, List<Tuple2<Integer, Long>>>() {
						@Override
						public List<Tuple2<Integer, Long>> initializeBroadcastVariable(Iterable<Tuple2<Integer, Long>> data) {
							// sort the list by task id to calculate the correct offset
							List<Tuple2<Integer, Long>> sortedData = new ArrayList<>();
							for (Tuple2<Integer, Long> datum : data) {
								sortedData.add(datum);
							}
							Collections.sort(sortedData, new Comparator<Tuple2<Integer, Long>>() {
								@Override
								public int compare(Tuple2<Integer, Long> o1, Tuple2<Integer, Long> o2) {
									return o1.f0.compareTo(o2.f0);
								}
							});
							return sortedData;
						}
					});

			// compute the offset for each partition
			for (int i = 0; i < getRuntimeContext().getIndexOfThisSubtask(); i++) {
				start += offsets.get(i).f1;
			}
		}

		@Override
		public void mapPartition(Iterable<T> values, Collector<Tuple2<Long, T>> out) throws Exception {
			for (T value: values) {
				out.collect(new Tuple2<>(start++, value));
			}
		}
	}).withBroadcastSet(elementCount, "counts");
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 54, Source: DataSetUtils.java
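
The offset calculation in the open() method of zipWithIndex above can also be followed with plain Java. The sketch below assumes nothing about Flink: IndexOffsetSketch and its nested Count class are hypothetical names, with Count standing in for the Tuple2<Integer, Long> pairs of subtask index and element count that the snippet receives through the "counts" broadcast set.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration class; not part of Flink.
final class IndexOffsetSketch {

    // Stand-in for Tuple2<Integer, Long>: (subtask index, number of elements).
    static final class Count {
        final int subtask;
        final long elements;

        Count(int subtask, long elements) {
            this.subtask = subtask;
            this.elements = elements;
        }
    }

    // Sorts the counts by subtask index, then sums the counts of all earlier subtasks.
    static long startOffset(List<Count> counts, int subtask) {
        List<Count> sorted = new ArrayList<>(counts);
        sorted.sort(Comparator.comparingInt(c -> c.subtask));
        long start = 0;
        for (int i = 0; i < subtask; i++) {
            start += sorted.get(i).elements;
        }
        return start;
    }

    public static void main(String[] args) {
        // Counts arrive in arbitrary order, which is why they are sorted first.
        List<Count> counts = Arrays.asList(new Count(2, 4), new Count(0, 3), new Count(1, 5));
        for (int subtask = 0; subtask < counts.size(); subtask++) {
            System.out.println("subtask " + subtask + " starts at " + startOffset(counts, subtask));
        }
        // subtask 0 starts at 0
        // subtask 1 starts at 3
        // subtask 2 starts at 8
    }
}
```

Each subtask then numbers its own elements from its start offset upward, which is what makes the resulting indices consecutive across partitions. The price is the extra counting pass over the input that countElementsPerPartition performs.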


Example 7: testMapPartitionWithRuntimeContext

import org.apache.flink.api.common.functions.RichMapPartitionFunction; // import the required package/class
@Test
public void testMapPartitionWithRuntimeContext() {
	try {
		final String taskName = "Test Task";
		final AtomicBoolean opened = new AtomicBoolean();
		final AtomicBoolean closed = new AtomicBoolean();
		
		final MapPartitionFunction<String, Integer> parser = new RichMapPartitionFunction<String, Integer>() {
			
			@Override
			public void open(Configuration parameters) throws Exception {
				opened.set(true);
				RuntimeContext ctx = getRuntimeContext();
				assertEquals(0, ctx.getIndexOfThisSubtask());
				assertEquals(1, ctx.getNumberOfParallelSubtasks());
				assertEquals(taskName, ctx.getTaskName());
			}
			
			@Override
			public void mapPartition(Iterable<String> values, Collector<Integer> out) {
				for (String s : values) {
					out.collect(Integer.parseInt(s));
				}
			}
			
			@Override
			public void close() throws Exception {
				closed.set(true);
			}
		};
		
		MapPartitionOperatorBase<String, Integer, MapPartitionFunction<String, Integer>> op = 
				new MapPartitionOperatorBase<String, Integer, MapPartitionFunction<String,Integer>>(
				parser, new UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName);
		
		List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6"));

		final TaskInfo taskInfo = new TaskInfo(taskName, 1, 0, 1, 0);

		ExecutionConfig executionConfig = new ExecutionConfig();
		executionConfig.disableObjectReuse();
		
		List<Integer> resultMutableSafe = op.executeOnCollections(input,
				new RuntimeUDFContext(taskInfo, null, executionConfig,
						new HashMap<String, Future<Path>>(),
						new HashMap<String, Accumulator<?, ?>>(),
						new UnregisteredMetricsGroup()),
				executionConfig);
		
		executionConfig.enableObjectReuse();
		List<Integer> resultRegular = op.executeOnCollections(input,
				new RuntimeUDFContext(taskInfo, null, executionConfig,
						new HashMap<String, Future<Path>>(),
						new HashMap<String, Accumulator<?, ?>>(),
						new UnregisteredMetricsGroup()),
				executionConfig);
		
		assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
		assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);
		
		assertTrue(opened.get());
		assertTrue(closed.get());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 69, Source: PartitionMapOperatorTest.java


Example 8: testMapPartitionWithRuntimeContext

import org.apache.flink.api.common.functions.RichMapPartitionFunction; // import the required package/class
@Test
public void testMapPartitionWithRuntimeContext() {
	try {
		final String taskName = "Test Task";
		final AtomicBoolean opened = new AtomicBoolean();
		final AtomicBoolean closed = new AtomicBoolean();
		
		final MapPartitionFunction<String, Integer> parser = new RichMapPartitionFunction<String, Integer>() {
			
			@Override
			public void open(Configuration parameters) throws Exception {
				opened.set(true);
				RuntimeContext ctx = getRuntimeContext();
				assertEquals(0, ctx.getIndexOfThisSubtask());
				assertEquals(1, ctx.getNumberOfParallelSubtasks());
				assertEquals(taskName, ctx.getTaskName());
			}
			
			@Override
			public void mapPartition(Iterable<String> values, Collector<Integer> out) {
				for (String s : values) {
					out.collect(Integer.parseInt(s));
				}
			}
			
			@Override
			public void close() throws Exception {
				closed.set(true);
			}
		};
		
		MapPartitionOperatorBase<String, Integer, MapPartitionFunction<String, Integer>> op = 
				new MapPartitionOperatorBase<String, Integer, MapPartitionFunction<String,Integer>>(
				parser, new UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName);
		
		List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6"));
		
		List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), true);
		List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), false);
		
		assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
		assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);
		
		assertTrue(opened.get());
		assertTrue(closed.get());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
Developer ID: citlab, Project: vs.msc.ws14, Lines of code: 52, Source: PartitionMapOperatorTest.java



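Note: The org.apache.flink.api.common.functions.RichMapPartitionFunction examples in this article were compiled from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.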

