This article collects and organizes typical usage examples of the Java class org.apache.flink.core.io.InputSplit. If you are asking yourself how the InputSplit class works, how to use it, or what real-world usage looks like, the curated examples below should help.
The InputSplit class belongs to the org.apache.flink.core.io package. Twenty code examples of the class are shown below, sorted by popularity by default.
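Before diving into the examples, a quick orientation: InputSplit is Flink's minimal contract for one unit of parallel input. The interface extends java.io.Serializable and declares a single method, getSplitNumber(), which returns the split's index within the set of parallel splits. The following minimal sketch (the SimpleSplit class name is made up for illustration and is not taken from any of the projects cited below) shows the smallest possible implementation:

import org.apache.flink.core.io.InputSplit;

// Minimal custom split: splits must be serializable so the JobManager can
// ship them to TaskManagers, and must report their index among all splits.
public class SimpleSplit implements InputSplit {

    private static final long serialVersionUID = 1L;

    private final int splitNumber;

    public SimpleSplit(int splitNumber) {
        this.splitNumber = splitNumber;
    }

    @Override
    public int getSplitNumber() {
        return splitNumber;
    }
}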
Example 1: testCassandraBatchFormats
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
@Test
public void testCassandraBatchFormats() throws Exception {
    // write the test data through the Cassandra OutputFormat
    OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
    sink.configure(new Configuration());
    sink.open(0, 1);
    for (Tuple3<String, Integer, Integer> value : collection) {
        sink.writeRecord(value);
    }
    sink.close();

    // read the data back through the Cassandra InputFormat
    InputFormat<Tuple3<String, Integer, Integer>, InputSplit> source = new CassandraInputFormat<>(injectTableName(SELECT_DATA_QUERY), builder);
    source.configure(new Configuration());
    source.open(null);
    List<Tuple3<String, Integer, Integer>> result = new ArrayList<>();
    while (!source.reachedEnd()) {
        result.add(source.nextRecord(new Tuple3<String, Integer, Integer>()));
    }
    source.close();

    Assert.assertEquals(20, result.size());
}
Developer: axbaretto, Project: flink, Lines: 26, Source: CassandraConnectorITCase.java
Example 2: readObject
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    // read the parent fields and the final fields
    in.defaultReadObject();

    // the job conf knows how to deserialize itself
    jobConf = new JobConf();
    jobConf.readFields(in);

    try {
        hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(splitType);
    }
    catch (Exception e) {
        throw new RuntimeException("Unable to instantiate Hadoop InputSplit", e);
    }

    if (hadoopInputSplit instanceof Configurable) {
        ((Configurable) hadoopInputSplit).setConf(this.jobConf);
    }
    else if (hadoopInputSplit instanceof JobConfigurable) {
        ((JobConfigurable) hadoopInputSplit).configure(this.jobConf);
    }
    hadoopInputSplit.readFields(in);
}
Developer: axbaretto, Project: flink, Lines: 24, Source: HadoopInputSplit.java
Example 3: testJDBCInputFormatWithParallelismAndGenericSplitting
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
@Test
public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException {
    Serializable[][] queryParameters = new String[2][1];
    queryParameters[0] = new String[]{TEST_DATA[3].author};
    queryParameters[1] = new String[]{TEST_DATA[0].author};
    ParameterValuesProvider paramProvider = new GenericParameterValuesProvider(queryParameters);
    jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
            .setDrivername(DRIVER_CLASS)
            .setDBUrl(DB_URL)
            .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
            .setRowTypeInfo(ROW_TYPE_INFO)
            .setParametersProvider(paramProvider)
            .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
            .finish();

    jdbcInputFormat.openInputFormat();
    InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
    // this query exploits parallelism (one split for every queryParameters row)
    Assert.assertEquals(queryParameters.length, splits.length);
    verifySplit(splits[0], TEST_DATA[3].id);
    verifySplit(splits[1], TEST_DATA[0].id + TEST_DATA[1].id);
    jdbcInputFormat.closeInputFormat();
}
Developer: axbaretto, Project: flink, Lines: 26, Source: JDBCInputFormatTest.java
Example 4: verifySplit
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
private void verifySplit(InputSplit split, int expectedIDSum) throws IOException {
    int sum = 0;
    Row row = new Row(5);
    jdbcInputFormat.open(split);
    while (!jdbcInputFormat.reachedEnd()) {
        row = jdbcInputFormat.nextRecord(row);
        int id = (int) row.getField(0);
        int testDataIndex = id - 1001;
        assertEquals(TEST_DATA[testDataIndex], row);
        sum += id;
    }
    Assert.assertEquals(expectedIDSum, sum);
}
Developer: axbaretto, Project: flink, Lines: 18, Source: JDBCInputFormatTest.java
Example 5: getNextInputSplit
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
@Override
public InputSplit getNextInputSplit(String host, int taskId) {
    InputSplit next = null;

    // keep the synchronized part short
    synchronized (this.splits) {
        if (this.splits.size() > 0) {
            next = this.splits.remove(this.splits.size() - 1);
        }
    }

    if (LOG.isDebugEnabled()) {
        if (next == null) {
            LOG.debug("No more input splits available");
        } else {
            LOG.debug("Assigning split " + next + " to " + host);
        }
    }
    return next;
}
Developer: axbaretto, Project: flink, Lines: 21, Source: DefaultInputSplitAssigner.java
Example 6: testSerialSplitAssignment
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
@Test
public void testSerialSplitAssignment() {
    try {
        final int NUM_SPLITS = 50;

        Set<InputSplit> splits = new HashSet<InputSplit>();
        for (int i = 0; i < NUM_SPLITS; i++) {
            splits.add(new GenericInputSplit(i, NUM_SPLITS));
        }

        DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits);
        InputSplit is = null;
        while ((is = ia.getNextInputSplit("", 0)) != null) {
            assertTrue(splits.remove(is));
        }

        assertTrue(splits.isEmpty());
        assertNull(ia.getNextInputSplit("", 0));
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
Developer: axbaretto, Project: flink, Lines: 25, Source: DefaultSplitAssignerTest.java
Example 7: getNextInputSplit
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
@Override
public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
    Preconditions.checkNotNull(userCodeClassLoader);

    CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
        jobVertexID,
        executionAttemptID);

    try {
        SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit());

        if (serializedInputSplit.isEmpty()) {
            return null;
        } else {
            return InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), userCodeClassLoader);
        }
    } catch (Exception e) {
        throw new InputSplitProviderException("Requesting the next input split failed.", e);
    }
}
Developer: axbaretto, Project: flink, Lines: 21, Source: RpcInputSplitProvider.java
Example 8: testRequestNextInputSplitWithInvalidExecutionID
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
@Test
public void testRequestNextInputSplitWithInvalidExecutionID() throws InputSplitProviderException {
    final JobID jobID = new JobID();
    final JobVertexID vertexID = new JobVertexID();
    final ExecutionAttemptID executionID = new ExecutionAttemptID();
    final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);

    final ActorGateway gateway = new NullInputSplitGateway();

    final TaskInputSplitProvider provider = new TaskInputSplitProvider(
        gateway,
        jobID,
        vertexID,
        executionID,
        timeout);

    // the JobManager will return a null input split for the unknown execution attempt
    InputSplit nextInputSplit = provider.getNextInputSplit(getClass().getClassLoader());

    assertTrue(nextInputSplit == null);
}
Developer: axbaretto, Project: flink, Lines: 24, Source: TaskInputSplitProviderTest.java
Example 9: testInputFormatVertex
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
@Test
public void testInputFormatVertex() {
    try {
        final TestInputFormat inputFormat = new TestInputFormat();
        final InputFormatVertex vertex = new InputFormatVertex("Name");
        new TaskConfig(vertex.getConfiguration()).setStubWrapper(new UserCodeObjectWrapper<InputFormat<?, ?>>(inputFormat));

        final ClassLoader cl = getClass().getClassLoader();

        vertex.initializeOnMaster(cl);
        InputSplit[] splits = vertex.getInputSplitSource().createInputSplits(77);

        assertNotNull(splits);
        assertEquals(1, splits.length);
        assertEquals(TestSplit.class, splits[0].getClass());
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
Developer: axbaretto, Project: flink, Lines: 22, Source: JobTaskVertexTest.java
Example 10: createInputSplits
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
@Override
public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
    Assert.assertTrue(isConfigured);
    InputSplit[] splits = new InputSplit[minNumSplits];
    for (int i = 0; i < minNumSplits; i++) {
        final int idx = i;
        splits[idx] = new InputSplit() {
            private static final long serialVersionUID = -1480792932361908285L;

            @Override
            public int getSplitNumber() {
                return idx;
            }
        };
    }
    return splits;
}
Developer: axbaretto, Project: flink, Lines: 18, Source: InputFormatSourceFunctionTest.java
Example 11: getInputSplitProvider
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
@Override
public InputSplitProvider getInputSplitProvider() {
    try {
        this.inputSplits = format.createInputSplits(noOfSplits);
        Assert.assertTrue(inputSplits.length == noOfSplits);
    } catch (IOException e) {
        e.printStackTrace();
    }
    return new InputSplitProvider() {
        @Override
        public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) {
            if (nextSplit < inputSplits.length) {
                return inputSplits[nextSplit++];
            }
            return null;
        }
    };
}
Developer: axbaretto, Project: flink, Lines: 20, Source: InputFormatSourceFunctionTest.java
Example 12: read
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
@Override
public void read(DataInputView in) throws IOException {
    this.splitNumber = in.readInt();
    this.hadoopInputSplitTypeName = in.readUTF();
    if (hadoopInputSplit == null) {
        try {
            Class<? extends org.apache.hadoop.io.Writable> inputSplit =
                Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
            this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(inputSplit);
        }
        catch (Exception e) {
            throw new RuntimeException("Unable to create InputSplit", e);
        }
    }
    jobConf = new JobConf();
    jobConf.readFields(in);
    if (this.hadoopInputSplit instanceof Configurable) {
        ((Configurable) this.hadoopInputSplit).setConf(this.jobConf);
    }
    this.hadoopInputSplit.readFields(in);
}
Developer: citlab, Project: vs.msc.ws14, Lines: 23, Source: HadoopInputSplit.java
Example 13: read
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
@Override
public void read(DataInputView in) throws IOException {
    this.splitNumber = in.readInt();
    String className = in.readUTF();

    if (this.mapreduceInputSplit == null) {
        try {
            Class<? extends org.apache.hadoop.io.Writable> inputSplit =
                Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
            this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit);
        } catch (Exception e) {
            throw new RuntimeException("Unable to create InputSplit", e);
        }
    }
    ((Writable) this.mapreduceInputSplit).readFields(in);
}
Developer: citlab, Project: vs.msc.ws14, Lines: 17, Source: HadoopInputSplit.java
Example 14: executeOnCollections
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
protected List<OUT> executeOnCollections(boolean mutableObjectSafe) throws Exception {
    @SuppressWarnings("unchecked")
    InputFormat<OUT, InputSplit> inputFormat = (InputFormat<OUT, InputSplit>) this.formatWrapper.getUserCodeObject();
    inputFormat.configure(this.parameters);

    List<OUT> result = new ArrayList<OUT>();

    // create the splits and read every record of every split
    InputSplit[] splits = inputFormat.createInputSplits(1);
    TypeSerializer<OUT> serializer = getOperatorInfo().getOutputType().createSerializer();

    for (InputSplit split : splits) {
        inputFormat.open(split);

        while (!inputFormat.reachedEnd()) {
            OUT next = inputFormat.nextRecord(serializer.createInstance());
            if (next != null) {
                result.add(mutableObjectSafe ? serializer.copy(next) : next);
            }
        }
        inputFormat.close();
    }
    return result;
}
Developer: citlab, Project: vs.msc.ws14, Lines: 27, Source: GenericDataSourceBase.java
Example 15: getNextInputSplit
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
@Override
public InputSplit getNextInputSplit(String host) {
    InputSplit next = null;

    // keep the synchronized part short
    synchronized (this.splits) {
        if (this.splits.size() > 0) {
            next = this.splits.remove(this.splits.size() - 1);
        }
    }

    if (LOG.isDebugEnabled()) {
        if (next == null) {
            LOG.debug("No more input splits available");
        } else {
            LOG.debug("Assigning split " + next + " to " + host);
        }
    }
    return next;
}
Developer: citlab, Project: vs.msc.ws14, Lines: 21, Source: DefaultInputSplitAssigner.java
Example 16: testSerialSplitAssignment
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
@Test
public void testSerialSplitAssignment() {
    try {
        final int NUM_SPLITS = 50;

        Set<InputSplit> splits = new HashSet<InputSplit>();
        for (int i = 0; i < NUM_SPLITS; i++) {
            splits.add(new GenericInputSplit(i, NUM_SPLITS));
        }

        DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits);
        InputSplit is = null;
        while ((is = ia.getNextInputSplit("")) != null) {
            assertTrue(splits.remove(is));
        }

        assertTrue(splits.isEmpty());
        assertNull(ia.getNextInputSplit(""));
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
Developer: citlab, Project: vs.msc.ws14, Lines: 25, Source: DefaultSplitAssignerTest.java
Example 17: HadoopInputSplit
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
    super(splitNumber, (String) null);

    if (hInputSplit == null) {
        throw new NullPointerException("Hadoop input split must not be null");
    }
    if (jobconf == null) {
        throw new NullPointerException("Hadoop JobConf must not be null");
    }

    this.splitType = hInputSplit.getClass();
    this.jobConf = jobconf;
    this.hadoopInputSplit = hInputSplit;
}
Developer: axbaretto, Project: flink, Lines: 16, Source: HadoopInputSplit.java
Example 18: HadoopInputSplit
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) {
    super(splitNumber, (String) null);

    if (mapreduceInputSplit == null) {
        throw new NullPointerException("Hadoop input split must not be null");
    }
    if (!(mapreduceInputSplit instanceof Writable)) {
        throw new IllegalArgumentException("InputSplit must implement Writable interface.");
    }
    this.splitType = mapreduceInputSplit.getClass();
    this.mapreduceInputSplit = mapreduceInputSplit;
}
Developer: axbaretto, Project: flink, Lines: 13, Source: HadoopInputSplit.java
Example 19: readObject
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    // read the parent fields and the final fields
    in.defaultReadObject();

    try {
        Class<? extends Writable> writableSplit = splitType.asSubclass(Writable.class);
        mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(writableSplit);
    }
    catch (Exception e) {
        throw new RuntimeException("Unable to instantiate the Hadoop InputSplit", e);
    }

    ((Writable) mapreduceInputSplit).readFields(in);
}
Developer: axbaretto, Project: flink, Lines: 16, Source: HadoopInputSplit.java
Example 20: createInputSplits
import org.apache.flink.core.io.InputSplit; // import the dependent package/class
@Override
public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
    if (parameterValues == null) {
        return new GenericInputSplit[]{new GenericInputSplit(0, 1)};
    }
    GenericInputSplit[] ret = new GenericInputSplit[parameterValues.length];
    for (int i = 0; i < ret.length; i++) {
        ret[i] = new GenericInputSplit(i, ret.length);
    }
    return ret;
}
Developer: axbaretto, Project: flink, Lines: 12, Source: JDBCInputFormat.java
Note: the org.apache.flink.core.io.InputSplit examples in this article are collected from GitHub, MSDocs, and other source-code and documentation platforms. The code snippets are drawn from open-source projects contributed by their respective developers, and the copyright belongs to the original authors. Please consult each project's license before distributing or using the code; do not republish without permission.