• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java InputSplit类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.flink.core.io.InputSplit的典型用法代码示例。如果您正苦于以下问题:Java InputSplit类的具体用法?Java InputSplit怎么用?Java InputSplit使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



InputSplit类属于org.apache.flink.core.io包,在下文中一共展示了InputSplit类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: testCassandraBatchFormats

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
@Test
public void testCassandraBatchFormats() throws Exception {
	OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
	sink.configure(new Configuration());
	sink.open(0, 1);

	for (Tuple3<String, Integer, Integer> value : collection) {
		sink.writeRecord(value);
	}

	sink.close();

	InputFormat<Tuple3<String, Integer, Integer>, InputSplit> source = new CassandraInputFormat<>(injectTableName(SELECT_DATA_QUERY), builder);
	source.configure(new Configuration());
	source.open(null);

	List<Tuple3<String, Integer, Integer>> result = new ArrayList<>();

	while (!source.reachedEnd()) {
		result.add(source.nextRecord(new Tuple3<String, Integer, Integer>()));
	}

	source.close();
	Assert.assertEquals(20, result.size());
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:26,代码来源:CassandraConnectorITCase.java


示例2: readObject

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
	// read the parent fields and the final fields
	in.defaultReadObject();

	// the job conf knows how to deserialize itself
	jobConf = new JobConf();
	jobConf.readFields(in);

	try {
		hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(splitType);
	}
	catch (Exception e) {
		throw new RuntimeException("Unable to instantiate Hadoop InputSplit", e);
	}

	if (hadoopInputSplit instanceof Configurable) {
		((Configurable) hadoopInputSplit).setConf(this.jobConf);
	}
	else if (hadoopInputSplit instanceof JobConfigurable) {
		((JobConfigurable) hadoopInputSplit).configure(this.jobConf);
	}
	hadoopInputSplit.readFields(in);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:24,代码来源:HadoopInputSplit.java


示例3: testJDBCInputFormatWithParallelismAndGenericSplitting

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
@Test
public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException {
	Serializable[][] queryParameters = new String[2][1];
	queryParameters[0] = new String[]{TEST_DATA[3].author};
	queryParameters[1] = new String[]{TEST_DATA[0].author};
	ParameterValuesProvider paramProvider = new GenericParameterValuesProvider(queryParameters);
	jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
			.setDrivername(DRIVER_CLASS)
			.setDBUrl(DB_URL)
			.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
			.setRowTypeInfo(ROW_TYPE_INFO)
			.setParametersProvider(paramProvider)
			.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
			.finish();

	jdbcInputFormat.openInputFormat();
	InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
	//this query exploit parallelism (1 split for every queryParameters row)
	Assert.assertEquals(queryParameters.length, splits.length);

	verifySplit(splits[0], TEST_DATA[3].id);
	verifySplit(splits[1], TEST_DATA[0].id + TEST_DATA[1].id);

	jdbcInputFormat.closeInputFormat();
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:26,代码来源:JDBCInputFormatTest.java


示例4: verifySplit

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
private void verifySplit(InputSplit split, int expectedIDSum) throws IOException {
	int sum = 0;

	Row row =  new Row(5);
	jdbcInputFormat.open(split);
	while (!jdbcInputFormat.reachedEnd()) {
		row = jdbcInputFormat.nextRecord(row);

		int id = ((int) row.getField(0));
		int testDataIndex = id - 1001;

		assertEquals(TEST_DATA[testDataIndex], row);
		sum += id;
	}

	Assert.assertEquals(expectedIDSum, sum);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:18,代码来源:JDBCInputFormatTest.java


示例5: getNextInputSplit

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
@Override
public InputSplit getNextInputSplit(String host, int taskId) {
	InputSplit next = null;
	
	// keep the synchronized part short
	synchronized (this.splits) {
		if (this.splits.size() > 0) {
			next = this.splits.remove(this.splits.size() - 1);
		}
	}
	
	if (LOG.isDebugEnabled()) {
		if (next == null) {
			LOG.debug("No more input splits available");
		} else {
			LOG.debug("Assigning split " + next + " to " + host);
		}
	}
	return next;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:21,代码来源:DefaultInputSplitAssigner.java


示例6: testSerialSplitAssignment

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
@Test
public void testSerialSplitAssignment() {
	try {
		final int NUM_SPLITS = 50;
		
		Set<InputSplit> splits = new HashSet<InputSplit>();
		for (int i = 0; i < NUM_SPLITS; i++) {
			splits.add(new GenericInputSplit(i, NUM_SPLITS));
		}
		
		DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits);
		InputSplit is = null;
		while ((is = ia.getNextInputSplit("", 0)) != null) {
			assertTrue(splits.remove(is));
		}
		
		assertTrue(splits.isEmpty());
		assertNull(ia.getNextInputSplit("", 0));
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:25,代码来源:DefaultSplitAssignerTest.java


示例7: getNextInputSplit

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
@Override
public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
	Preconditions.checkNotNull(userCodeClassLoader);

	CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
		jobVertexID,
		executionAttemptID);

	try {
		SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit());

		if (serializedInputSplit.isEmpty()) {
			return null;
		} else {
			return InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), userCodeClassLoader);
		}
	} catch (Exception e) {
		throw new InputSplitProviderException("Requesting the next input split failed.", e);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:21,代码来源:RpcInputSplitProvider.java


示例8: testRequestNextInputSplitWithInvalidExecutionID

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
@Test
public void testRequestNextInputSplitWithInvalidExecutionID() throws InputSplitProviderException {

	final JobID jobID = new JobID();
	final JobVertexID vertexID = new JobVertexID();
	final ExecutionAttemptID executionID = new ExecutionAttemptID();
	final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);

	final ActorGateway gateway = new NullInputSplitGateway();


	final TaskInputSplitProvider provider = new TaskInputSplitProvider(
		gateway,
		jobID,
		vertexID,
		executionID,
		timeout);

	// The jobManager will return a
	InputSplit nextInputSplit = provider.getNextInputSplit(getClass().getClassLoader());

	assertTrue(nextInputSplit == null);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:24,代码来源:TaskInputSplitProviderTest.java


示例9: testInputFormatVertex

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
@Test
public void testInputFormatVertex() {
	try {
		final TestInputFormat inputFormat = new TestInputFormat();
		final InputFormatVertex vertex = new InputFormatVertex("Name");
		new TaskConfig(vertex.getConfiguration()).setStubWrapper(new UserCodeObjectWrapper<InputFormat<?, ?>>(inputFormat));
		
		final ClassLoader cl = getClass().getClassLoader();
		
		vertex.initializeOnMaster(cl);
		InputSplit[] splits = vertex.getInputSplitSource().createInputSplits(77);
		
		assertNotNull(splits);
		assertEquals(1, splits.length);
		assertEquals(TestSplit.class, splits[0].getClass());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:22,代码来源:JobTaskVertexTest.java


示例10: createInputSplits

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
@Override
public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
	Assert.assertTrue(isConfigured);
	InputSplit[] splits = new InputSplit[minNumSplits];
	for (int i = 0; i < minNumSplits; i++) {
		final int idx = i;
		splits[idx] = new InputSplit() {
			private static final long serialVersionUID = -1480792932361908285L;

			@Override
			public int getSplitNumber() {
				return idx;
			}
		};
	}
	return splits;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:18,代码来源:InputFormatSourceFunctionTest.java


示例11: getInputSplitProvider

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
@Override
public InputSplitProvider getInputSplitProvider() {
	try {
		this.inputSplits = format.createInputSplits(noOfSplits);
		Assert.assertTrue(inputSplits.length == noOfSplits);
	} catch (IOException e) {
		e.printStackTrace();
	}

	return new InputSplitProvider() {
		@Override
		public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) {
			if (nextSplit < inputSplits.length) {
				return inputSplits[nextSplit++];
			}
			return null;
		}
	};
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:20,代码来源:InputFormatSourceFunctionTest.java


示例12: read

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
@Override
public void read(DataInputView in) throws IOException {
	this.splitNumber=in.readInt();
	this.hadoopInputSplitTypeName = in.readUTF();
	if(hadoopInputSplit == null) {
		try {
			Class<? extends org.apache.hadoop.io.Writable> inputSplit =
					Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
			this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit );
		}
		catch (Exception e) {
			throw new RuntimeException("Unable to create InputSplit", e);
		}
	}
	jobConf = new JobConf();
	jobConf.readFields(in);
	if (this.hadoopInputSplit instanceof Configurable) {
		((Configurable) this.hadoopInputSplit).setConf(this.jobConf);
	}
	this.hadoopInputSplit.readFields(in);

}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:23,代码来源:HadoopInputSplit.java


示例13: read

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
@Override
public void read(DataInputView in) throws IOException {
	this.splitNumber=in.readInt();
	String className = in.readUTF();
	
	if(this.mapreduceInputSplit == null) {
		try {
			Class<? extends org.apache.hadoop.io.Writable> inputSplit = 
					Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
			this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit);
		} catch (Exception e) {
			throw new RuntimeException("Unable to create InputSplit", e);
		}
	}
	((Writable)this.mapreduceInputSplit).readFields(in);
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:17,代码来源:HadoopInputSplit.java


示例14: executeOnCollections

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
protected List<OUT> executeOnCollections(boolean mutableObjectSafe) throws Exception {
	@SuppressWarnings("unchecked")
	InputFormat<OUT, InputSplit> inputFormat = (InputFormat<OUT, InputSplit>) this.formatWrapper.getUserCodeObject();
	inputFormat.configure(this.parameters);
	
	List<OUT> result = new ArrayList<OUT>();
	
	// splits
	InputSplit[] splits = inputFormat.createInputSplits(1);
	TypeSerializer<OUT> serializer = getOperatorInfo().getOutputType().createSerializer();
	
	for (InputSplit split : splits) {
		inputFormat.open(split);
		
		while (!inputFormat.reachedEnd()) {
			OUT next = inputFormat.nextRecord(serializer.createInstance());
			if (next != null) {
				result.add(mutableObjectSafe ? serializer.copy(next) : next);
			}
		}
		
		inputFormat.close();
	}
	
	return result;
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:27,代码来源:GenericDataSourceBase.java


示例15: getNextInputSplit

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
@Override
public InputSplit getNextInputSplit(String host) {
	InputSplit next = null;
	
	// keep the synchronized part short
	synchronized (this.splits) {
		if (this.splits.size() > 0) {
			next = this.splits.remove(this.splits.size() - 1);
		}
	}
	
	if (LOG.isDebugEnabled()) {
		if (next == null) {
			LOG.debug("No more input splits available");
		} else {
			LOG.debug("Assigning split " + next + " to " + host);
		}
	}
	return next;
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:21,代码来源:DefaultInputSplitAssigner.java


示例16: testSerialSplitAssignment

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
@Test
public void testSerialSplitAssignment() {
	try {
		final int NUM_SPLITS = 50;
		
		Set<InputSplit> splits = new HashSet<InputSplit>();
		for (int i = 0; i < NUM_SPLITS; i++) {
			splits.add(new GenericInputSplit(i, NUM_SPLITS));
		}
		
		DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits);
		InputSplit is = null;
		while ((is = ia.getNextInputSplit("")) != null) {
			assertTrue(splits.remove(is));
		}
		
		assertTrue(splits.isEmpty());
		assertNull(ia.getNextInputSplit(""));
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:25,代码来源:DefaultSplitAssignerTest.java


示例17: HadoopInputSplit

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
	super(splitNumber, (String) null);

	if (hInputSplit == null) {
		throw new NullPointerException("Hadoop input split must not be null");
	}
	if (jobconf == null) {
		throw new NullPointerException("Hadoop JobConf must not be null");
	}

	this.splitType = hInputSplit.getClass();

	this.jobConf = jobconf;
	this.hadoopInputSplit = hInputSplit;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:16,代码来源:HadoopInputSplit.java


示例18: HadoopInputSplit

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) {
	super(splitNumber, (String) null);

	if (mapreduceInputSplit == null) {
		throw new NullPointerException("Hadoop input split must not be null");
	}
	if (!(mapreduceInputSplit instanceof Writable)) {
		throw new IllegalArgumentException("InputSplit must implement Writable interface.");
	}
	this.splitType = mapreduceInputSplit.getClass();
	this.mapreduceInputSplit = mapreduceInputSplit;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:13,代码来源:HadoopInputSplit.java


示例19: readObject

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
	// read the parent fields and the final fields
	in.defaultReadObject();

	try {
		Class<? extends Writable> writableSplit = splitType.asSubclass(Writable.class);
		mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(writableSplit);
	}

	catch (Exception e) {
		throw new RuntimeException("Unable to instantiate the Hadoop InputSplit", e);
	}

	((Writable) mapreduceInputSplit).readFields(in);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:16,代码来源:HadoopInputSplit.java


示例20: createInputSplits

import org.apache.flink.core.io.InputSplit; //导入依赖的package包/类
@Override
public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
	if (parameterValues == null) {
		return new GenericInputSplit[]{new GenericInputSplit(0, 1)};
	}
	GenericInputSplit[] ret = new GenericInputSplit[parameterValues.length];
	for (int i = 0; i < ret.length; i++) {
		ret[i] = new GenericInputSplit(i, ret.length);
	}
	return ret;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:12,代码来源:JDBCInputFormat.java



注:本文中的org.apache.flink.core.io.InputSplit类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java BmobNotifyManager类代码示例发布时间:2022-05-23
下一篇:
Java YamlRepresenter类代码示例发布时间:2022-05-23
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap