This article collects typical usage examples of the Java class org.apache.flink.api.java.io.TextInputFormat. If you are wondering what TextInputFormat does, how to use it, or what real-world code that uses it looks like, the curated class examples below should help.
TextInputFormat belongs to the org.apache.flink.api.java.io package. Twenty code examples of the class are shown below, sorted by popularity by default. You can upvote any example you like or find useful; your ratings help the system recommend better Java examples.
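Before the examples, here is a minimal sketch of the pattern most of them share: construct a TextInputFormat over a Path, optionally configure the charset and the record delimiter, and hand the format to the environment's readFile(). This is an illustrative sketch, not taken from any of the projects below; the input path is a placeholder you would replace with your own.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;

public class TextInputFormatSketch {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Point the format at a file or directory (placeholder path).
        TextInputFormat format = new TextInputFormat(new Path("/tmp/input"));
        format.setCharsetName("UTF-8"); // decode the bytes as UTF-8
        format.setDelimiter("\n");      // split records on newlines (the default)

        // readFile() takes the configured format plus the path to read.
        DataSet<String> lines = env.readFile(format, "/tmp/input");
        lines.print(); // print() triggers execution of the batch job
    }
}

As Examples 5 through 10 show, setDelimiter() is the interesting knob: setting it to a closing tag such as "</page>" turns a large XML dump into one record per element instead of one record per line.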
Example 1: main
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
public static void main(String... args) throws Exception {
    File txtFile = new File("/tmp/test/file.txt");
    File csvFile = new File("/tmp/test/file.csv");
    File binFile = new File("/tmp/test/file.bin");

    writeToFile(txtFile, "txt");
    writeToFile(csvFile, "csv");
    writeToFile(binFile, "bin");

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    final TextInputFormat format = new TextInputFormat(new Path("/tmp/test"));

    // Include everything, but exclude any file matching **/file.bin.
    GlobFilePathFilter filesFilter = new GlobFilePathFilter(
        Collections.singletonList("**"),
        Arrays.asList("**/file.bin")
    );
    System.out.println(Arrays.toString(GlobFilePathFilter.class.getDeclaredFields()));
    format.setFilesFilter(filesFilter);

    // Note: the path passed to readFile() overrides the one given to the constructor.
    DataSet<String> result = env.readFile(format, "/tmp");
    result.writeAsText("/tmp/out");
    env.execute("GlobFilePathFilter-Test");
}
Author: mushketyk; Project: flink-examples; Lines: 25; Source: GlobExample.java
Example 2: testInvalidPathSpecification
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
@Test
public void testInvalidPathSpecification() throws Exception {
    String invalidPath = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() + "/invalid/";
    TextInputFormat format = new TextInputFormat(new Path(invalidPath));

    ContinuousFileMonitoringFunction<String> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(format,
            FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);

    try {
        monitoringFunction.run(new DummySourceContext() {
            @Override
            public void collect(TimestampedFileInputSplit element) {
                // we should never arrive here with an invalid path
                Assert.fail("Test passed with an invalid path.");
            }
        });

        // we should never arrive here with an invalid path
        Assert.fail("Test passed with an invalid path.");
    } catch (FileNotFoundException e) {
        Assert.assertEquals("The provided file path " + format.getFilePath() + " does not exist.", e.getMessage());
    }
}
Author: axbaretto; Project: flink; Lines: 26; Source: ContinuousFileProcessingTest.java
Example 3: testProgram
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
@Override
public void testProgram(StreamExecutionEnvironment env) {

    // set the restart strategy.
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(NO_OF_RETRIES, 0));
    env.enableCheckpointing(10);

    // create and start the file creating thread.
    fc = new FileCreator();
    fc.start();

    // create the monitoring source along with the necessary readers.
    TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
    format.setFilesFilter(FilePathFilter.createDefaultFilter());

    DataStream<String> inputStream = env.readFile(format, localFsURI,
        FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL);

    TestingSinkFunction sink = new TestingSinkFunction();

    inputStream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            out.collect(value);
        }
    }).addSink(sink).setParallelism(1);
}
Author: axbaretto; Project: flink; Lines: 28; Source: ContinuousFileProcessingCheckpointITCase.java
Example 4: testSortingOnModTime
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
@Test
public void testSortingOnModTime() throws Exception {
    String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";

    final long[] modTimes = new long[NO_OF_FILES];
    final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];

    for (int i = 0; i < NO_OF_FILES; i++) {
        Tuple2<org.apache.hadoop.fs.Path, String> file =
            createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
        Thread.sleep(400);

        filesCreated[i] = file.f0;
        modTimes[i] = hdfs.getFileStatus(file.f0).getModificationTime();
    }

    TextInputFormat format = new TextInputFormat(new Path(testBasePath));
    format.setFilesFilter(FilePathFilter.createDefaultFilter());

    // this is just to verify that all splits have been forwarded later.
    FileInputSplit[] splits = format.createInputSplits(1);

    ContinuousFileMonitoringFunction<String> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(format,
            FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);

    ModTimeVerifyingSourceContext context = new ModTimeVerifyingSourceContext(modTimes);

    monitoringFunction.open(new Configuration());
    monitoringFunction.run(context);
    Assert.assertEquals(splits.length, context.getCounter());

    // delete the created files.
    for (int i = 0; i < NO_OF_FILES; i++) {
        hdfs.delete(filesCreated[i], false);
    }
}
Author: axbaretto; Project: flink; Lines: 38; Source: ContinuousFileProcessingTest.java
Example 5: readWikiDump
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
public static DataSource<String> readWikiDump(FlinkMlpCommandConfig config, ExecutionEnvironment env) {
    Path filePath = new Path(config.getDataset());
    TextInputFormat inp = new TextInputFormat(filePath);
    inp.setCharsetName("UTF-8");
    inp.setDelimiter("</page>"); // one record per wiki page, split at the closing tag
    return env.readFile(inp, config.getDataset());
}
Author: ag-gipp; Project: mathosphere; Lines: 8; Source: FlinkMlpRelationFinder.java
Example 6: main
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
public static void main(String[] args) throws Exception {
    // FlinkPdCommandConfig config = FlinkPdCommandConfig.from(args);
    // run(config);
    // final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // final String filename = URLDecoder.decode("file:/C:/git/flink/readFileTest/target/classes/ex1.html", "UTF-8");
    // Path filePath = new Path(filename);
    // TextInputFormat inp = new TextInputFormat(filePath);
    // inp.setCharsetName("UTF-8");
    // inp.setDelimiter("</ARXIVFILESPLIT>");
    // final DataSource<String> source = env.readFile(inp, filename);
    // source.writeAsText("test", org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);
    // env.execute();
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // get input data
    ClassLoader classLoader = WordCount.class.getClassLoader();
    URL resource = classLoader.getResource("com/formulasearchengine/mathosphere/mathpd/ex1.html");
    final String filename = URLDecoder.decode(resource.getFile(), "UTF-8");
    //final String filename = URLDecoder.decode("file:/C:/git/flink/readFileTest/target/classes/ex1.html", "UTF-8");
    Path filePath = new Path(filename);
    TextInputFormat inp = new TextInputFormat(filePath);
    inp.setCharsetName("UTF-8");
    inp.setDelimiter("</ARXIVFILESPLIT>");
    final DataSource<String> source = env.readFile(inp, filename);

    // DataSet<Tuple2<String, Integer>> counts =
    //     // split up the lines in pairs (2-tuples) containing: (word,1)
    //     source.flatMap(new LineSplitter())
    //     // group by the tuple field "0" and sum up tuple field "1"
    //     .groupBy(0)
    //     .sum(1);

    // execute and print result
    //counts.print();
    source.writeAsText("test", FileSystem.WriteMode.OVERWRITE);
    env.execute();
}
Author: ag-gipp; Project: mathosphere; Lines: 37; Source: FlinkPd.java
Example 7: readWikiDump
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
public static DataSource<String> readWikiDump(FlinkPdCommandConfig config, ExecutionEnvironment env) {
    Path filePath = new Path(config.getDataset());
    TextInputFormat inp = new TextInputFormat(filePath);
    inp.setCharsetName("UTF-8");
    inp.setDelimiter("</ARXIVFILESPLIT>");
    return env.readFile(inp, config.getDataset());
}
Author: ag-gipp; Project: mathosphere; Lines: 8; Source: FlinkPd.java
Example 8: readRefs
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
public static DataSource<String> readRefs(FlinkPdCommandConfig config, ExecutionEnvironment env) {
    Path filePath = new Path(config.getRef());
    TextInputFormat inp = new TextInputFormat(filePath);
    inp.setCharsetName("UTF-8");
    inp.setDelimiter("</ARXIVFILESPLIT>");
    return env.readFile(inp, config.getRef());
}
Author: ag-gipp; Project: mathosphere; Lines: 8; Source: FlinkPd.java
Example 9: readPreprocessedFile
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
public static DataSource<String> readPreprocessedFile(String pathname, ExecutionEnvironment env) {
    Path filePath = new Path(pathname);
    TextInputFormat inp = new TextInputFormat(filePath);
    inp.setCharsetName("UTF-8"); // no delimiter set, so records are split on newlines
    // env.read
    return env.readFile(inp, pathname);
}
Author: ag-gipp; Project: mathosphere; Lines: 8; Source: FlinkPd.java
Example 10: main
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
public static void main(String[] args) throws Exception {

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // get input data
    //ClassLoader classLoader = WordCount.class.getClassLoader();
    //URL resource = classLoader.getResource("ex1.html");
    //final String filename = URLDecoder.decode(resource.getFile(), "UTF-8");
    final String filename = URLDecoder.decode("file:/C:/git/flink/readFileTest/target/classes/ex1.html", "UTF-8");
    Path filePath = new Path(filename);
    TextInputFormat inp = new TextInputFormat(filePath);
    inp.setCharsetName("UTF-8");
    inp.setDelimiter("</ARXIVFILESPLIT>");
    final DataSource<String> source = env.readFile(inp, filename);

    // DataSet<Tuple2<String, Integer>> counts =
    //     // split up the lines in pairs (2-tuples) containing: (word,1)
    //     source.flatMap(new LineSplitter())
    //     // group by the tuple field "0" and sum up tuple field "1"
    //     .groupBy(0)
    //     .sum(1);

    // execute and print result
    //counts.print();
    source.writeAsText("test", FileSystem.WriteMode.OVERWRITE);
    env.execute();
}
Author: ag-gipp; Project: mathosphere; Lines: 29; Source: WordCount.java
Example 11: testProgram
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
@Override
protected void testProgram() throws Exception {

    /*
     * This test checks the interplay between the monitor and the reader
     * and also the failExternally() functionality. To test the latter we
     * set the parallelism to 1 so that we have the chaining between the sink,
     * which throws the SuccessException to signal the end of the test, and the
     * reader.
     */

    FileCreator fileCreator = new FileCreator(INTERVAL);
    Thread t = new Thread(fileCreator);
    t.start();

    TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
    format.setFilePath(hdfsURI);

    try {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        format.setFilesFilter(FilePathFilter.createDefaultFilter());
        ContinuousFileMonitoringFunction<String> monitoringFunction =
            new ContinuousFileMonitoringFunction<>(format, hdfsURI,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                env.getParallelism(), INTERVAL);

        TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
        ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
        TestingSinkFunction sink = new TestingSinkFunction();

        DataStream<FileInputSplit> splits = env.addSource(monitoringFunction);
        splits.transform("FileSplitReader", typeInfo, reader).addSink(sink).setParallelism(1);
        env.execute();

    } catch (Exception e) {
        Throwable th = e;
        int depth = 0;

        // unwrap the cause chain looking for the SuccessException thrown by the sink
        for (; depth < 20; depth++) {
            if (th instanceof SuccessException) {
                try {
                    postSubmit();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
                return;
            } else if (th.getCause() != null) {
                th = th.getCause();
            } else {
                break;
            }
        }
        e.printStackTrace();
        Assert.fail(e.getMessage());
    }
}
Author: axbaretto; Project: flink; Lines: 59; Source: ContinuousFileMonitoringFunctionITCase.java
Example 12: writeMonitoringSourceSnapshot
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
/**
 * Manually run this to write binary snapshot data. Remove @Ignore to run.
 */
@Ignore
@Test
public void writeMonitoringSourceSnapshot() throws Exception {

    File testFolder = tempFolder.newFolder();

    long fileModTime = Long.MIN_VALUE;
    for (int i = 0; i < 1; i++) {
        Tuple2<File, String> file = createFileAndFillWithData(testFolder, "file", i, "This is test line.");
        fileModTime = file.f0.lastModified();
    }

    TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath()));

    final ContinuousFileMonitoringFunction<String> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);

    StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
        new StreamSource<>(monitoringFunction);

    final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
        new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);

    testHarness.open();

    final Throwable[] error = new Throwable[1];
    final OneShotLatch latch = new OneShotLatch();

    // run the source asynchronously
    Thread runner = new Thread() {
        @Override
        public void run() {
            try {
                monitoringFunction.run(new DummySourceContext() {
                    @Override
                    public void collect(TimestampedFileInputSplit element) {
                        latch.trigger();
                    }

                    @Override
                    public void markAsTemporarilyIdle() {
                    }
                });
            }
            catch (Throwable t) {
                t.printStackTrace();
                error[0] = t;
            }
        }
    };
    runner.start();

    if (!latch.isTriggered()) {
        latch.await();
    }

    final OperatorStateHandles snapshot;
    synchronized (testHarness.getCheckpointLock()) {
        snapshot = testHarness.snapshot(0L, 0L);
    }

    OperatorSnapshotUtil.writeStateHandle(
        snapshot,
        "src/test/resources/monitoring-function-migration-test-" + fileModTime + "-flink" + flinkGenerateSavepointVersion + "-snapshot");

    monitoringFunction.cancel();
    runner.join();

    testHarness.close();
}
Author: axbaretto; Project: flink; Lines: 76; Source: ContinuousFileProcessingMigrationTest.java
Example 13: testFunctionRestore
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
@Test
public void testFunctionRestore() throws Exception {

    /*
    org.apache.hadoop.fs.Path path = null;
    long fileModTime = Long.MIN_VALUE;
    for (int i = 0; i < 1; i++) {
        Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
        path = file.f0;
        fileModTime = hdfs.getFileStatus(file.f0).getModificationTime();
    }

    TextInputFormat format = new TextInputFormat(new Path(hdfsURI));

    final ContinuousFileMonitoringFunction<String> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(format, format.getFilePath().toString(), new PathFilter(), FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);

    StreamSource<FileInputSplit, ContinuousFileMonitoringFunction<String>> src =
        new StreamSource<>(monitoringFunction);

    final OneInputStreamOperatorTestHarness<Void, FileInputSplit> testHarness =
        new OneInputStreamOperatorTestHarness<>(src);
    testHarness.open();

    final Throwable[] error = new Throwable[1];
    final OneShotLatch latch = new OneShotLatch();

    // run the source asynchronously
    Thread runner = new Thread() {
        @Override
        public void run() {
            try {
                monitoringFunction.run(new DummySourceContext() {
                    @Override
                    public void collect(FileInputSplit element) {
                        latch.trigger();
                    }
                });
            }
            catch (Throwable t) {
                t.printStackTrace();
                error[0] = t;
            }
        }
    };
    runner.start();

    if (!latch.isTriggered()) {
        latch.await();
    }

    StreamTaskState snapshot = testHarness.snapshot(0, 0);
    testHarness.snaphotToFile(snapshot, "src/test/resources/monitoring-function-migration-test-" + fileModTime + "-flink1.1-snapshot");
    monitoringFunction.cancel();
    runner.join();

    testHarness.close();
    */

    Long expectedModTime = Long.parseLong("1482144479339");
    TextInputFormat format = new TextInputFormat(new Path(hdfsURI));

    final ContinuousFileMonitoringFunction<String> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);

    StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
        new StreamSource<>(monitoringFunction);

    final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
        new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
    testHarness.setup();
    testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename("monitoring-function-migration-test-1482144479339-flink1.1-snapshot"));
    testHarness.open();

    Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime());
}
Author: axbaretto; Project: flink; Lines: 79; Source: ContinuousFileProcessingFrom11MigrationTest.java
Example 14: writeMonitoringSourceSnapshot
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
/**
 * Manually run this to write binary snapshot data. Remove @Ignore to run.
 */
@Ignore
@Test
public void writeMonitoringSourceSnapshot() throws Exception {

    File testFolder = tempFolder.newFolder();

    long fileModTime = Long.MIN_VALUE;
    for (int i = 0; i < 1; i++) {
        Tuple2<File, String> file = createFileAndFillWithData(testFolder, "file", i, "This is test line.");
        fileModTime = file.f0.lastModified();
    }

    TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath()));

    final ContinuousFileMonitoringFunction<String> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);

    StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
        new StreamSource<>(monitoringFunction);

    final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
        new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);

    testHarness.open();

    final Throwable[] error = new Throwable[1];
    final OneShotLatch latch = new OneShotLatch();

    // run the source asynchronously
    Thread runner = new Thread() {
        @Override
        public void run() {
            try {
                monitoringFunction.run(new DummySourceContext() {
                    @Override
                    public void collect(TimestampedFileInputSplit element) {
                        latch.trigger();
                    }

                    @Override
                    public void markAsTemporarilyIdle() {
                    }
                });
            }
            catch (Throwable t) {
                t.printStackTrace();
                error[0] = t;
            }
        }
    };
    runner.start();

    if (!latch.isTriggered()) {
        latch.await();
    }

    final OperatorStateHandles snapshot;
    synchronized (testHarness.getCheckpointLock()) {
        snapshot = testHarness.snapshot(0L, 0L);
    }

    OperatorSnapshotUtil.writeStateHandle(
        snapshot,
        "src/test/resources/monitoring-function-migration-test-" + fileModTime + "-flink1.2-snapshot");

    monitoringFunction.cancel();
    runner.join();

    testHarness.close();
}
Author: axbaretto; Project: flink; Lines: 76; Source: ContinuousFileProcessingFrom12MigrationTest.java
Example 15: testProcessOnce
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
@Test
public void testProcessOnce() throws Exception {
    String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";

    final OneShotLatch latch = new OneShotLatch();

    // create a single file in the directory
    Tuple2<org.apache.hadoop.fs.Path, String> bootstrap =
        createFileAndFillWithData(testBasePath, "file", NO_OF_FILES + 1, "This is test line.");
    Assert.assertTrue(hdfs.exists(bootstrap.f0));

    // the source is supposed to read only this file.
    final Set<String> filesToBeRead = new TreeSet<>();
    filesToBeRead.add(bootstrap.f0.getName());

    TextInputFormat format = new TextInputFormat(new Path(testBasePath));
    format.setFilesFilter(FilePathFilter.createDefaultFilter());

    final ContinuousFileMonitoringFunction<String> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(format,
            FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);

    final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch, monitoringFunction);

    final Thread t = new Thread() {
        @Override
        public void run() {
            try {
                monitoringFunction.open(new Configuration());
                monitoringFunction.run(context);

                // we would never arrive here if we were in
                // PROCESS_CONTINUOUSLY mode.

                // this will trigger the latch
                context.close();

            } catch (Exception e) {
                Assert.fail(e.getMessage());
            }
        }
    };
    t.start();

    if (!latch.isTriggered()) {
        latch.await();
    }

    // create some additional files that should be processed in the case of PROCESS_CONTINUOUSLY
    final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];
    for (int i = 0; i < NO_OF_FILES; i++) {
        Tuple2<org.apache.hadoop.fs.Path, String> ignoredFile =
            createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
        filesCreated[i] = ignoredFile.f0;
    }

    // wait until the monitoring thread exits
    t.join();

    Assert.assertArrayEquals(filesToBeRead.toArray(), context.getSeenFiles().toArray());

    // finally delete the files created for the test.
    hdfs.delete(bootstrap.f0, false);
    for (org.apache.hadoop.fs.Path path: filesCreated) {
        hdfs.delete(path, false);
    }
}
Author: axbaretto; Project: flink; Lines: 68; Source: ContinuousFileProcessingTest.java
Example 16: testFunctionRestore
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
@Test
public void testFunctionRestore() throws Exception {
    String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";

    org.apache.hadoop.fs.Path path = null;
    long fileModTime = Long.MIN_VALUE;
    for (int i = 0; i < 1; i++) {
        Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
        path = file.f0;
        fileModTime = hdfs.getFileStatus(file.f0).getModificationTime();
    }

    TextInputFormat format = new TextInputFormat(new Path(testBasePath));

    final ContinuousFileMonitoringFunction<String> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);

    StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
        new StreamSource<>(monitoringFunction);

    final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
        new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
    testHarness.open();

    final Throwable[] error = new Throwable[1];
    final OneShotLatch latch = new OneShotLatch();

    final DummySourceContext sourceContext = new DummySourceContext() {
        @Override
        public void collect(TimestampedFileInputSplit element) {
            latch.trigger();
        }
    };

    // run the source asynchronously
    Thread runner = new Thread() {
        @Override
        public void run() {
            try {
                monitoringFunction.run(sourceContext);
            }
            catch (Throwable t) {
                t.printStackTrace();
                error[0] = t;
            }
        }
    };
    runner.start();

    // first condition for the source to have updated its state: emit at least one element
    if (!latch.isTriggered()) {
        latch.await();
    }

    // second condition for the source to have updated its state: it's not on the lock anymore,
    // this means it has processed all the splits and updated its state.
    synchronized (sourceContext.getCheckpointLock()) {}

    OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
    monitoringFunction.cancel();
    runner.join();

    testHarness.close();

    final ContinuousFileMonitoringFunction<String> monitoringFunctionCopy =
        new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);

    StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> srcCopy =
        new StreamSource<>(monitoringFunctionCopy);

    AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarnessCopy =
        new AbstractStreamOperatorTestHarness<>(srcCopy, 1, 1, 0);
    testHarnessCopy.initializeState(snapshot);
    testHarnessCopy.open();

    Assert.assertNull(error[0]);
    Assert.assertEquals(fileModTime, monitoringFunctionCopy.getGlobalModificationTime());

    hdfs.delete(path, false);
}
Author: axbaretto; Project: flink; Lines: 82; Source: ContinuousFileProcessingTest.java
Example 17: testProcessContinuously
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
@Test
public void testProcessContinuously() throws Exception {
    String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";

    final OneShotLatch latch = new OneShotLatch();

    // create a single file in the directory
    Tuple2<org.apache.hadoop.fs.Path, String> bootstrap =
        createFileAndFillWithData(testBasePath, "file", NO_OF_FILES + 1, "This is test line.");
    Assert.assertTrue(hdfs.exists(bootstrap.f0));

    final Set<String> filesToBeRead = new TreeSet<>();
    filesToBeRead.add(bootstrap.f0.getName());

    TextInputFormat format = new TextInputFormat(new Path(testBasePath));
    format.setFilesFilter(FilePathFilter.createDefaultFilter());

    final ContinuousFileMonitoringFunction<String> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(format,
            FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);

    final int totalNoOfFilesToBeRead = NO_OF_FILES + 1; // 1 for the bootstrap + NO_OF_FILES
    final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch,
        monitoringFunction, 1, totalNoOfFilesToBeRead);

    final Thread t = new Thread() {
        @Override
        public void run() {
            try {
                monitoringFunction.open(new Configuration());
                monitoringFunction.run(context);
            } catch (Exception e) {
                Assert.fail(e.getMessage());
            }
        }
    };
    t.start();

    if (!latch.isTriggered()) {
        latch.await();
    }

    // create some additional files that will be processed in the case of PROCESS_CONTINUOUSLY
    final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];
    for (int i = 0; i < NO_OF_FILES; i++) {
        Tuple2<org.apache.hadoop.fs.Path, String> file =
            createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
        filesCreated[i] = file.f0;
        filesToBeRead.add(file.f0.getName());
    }

    // wait until the monitoring thread exits
    t.join();

    Assert.assertArrayEquals(filesToBeRead.toArray(), context.getSeenFiles().toArray());

    // finally delete the files created for the test.
    hdfs.delete(bootstrap.f0, false);
    for (org.apache.hadoop.fs.Path path: filesCreated) {
        hdfs.delete(path, false);
    }
}
Author: axbaretto; Project: flink; Lines: 64; Source: ContinuousFileProcessingTest.java
Example 18: getSourceNode
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
private static DataSourceNode getSourceNode() {
    return new DataSourceNode(new GenericDataSourceBase<String, TextInputFormat>(
        new TextInputFormat(new Path("/")),
        new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO)));
}
Author: axbaretto; Project: flink; Lines: 4; Source: FeedbackPropertiesMatchTest.java
Example 19: getSourceNode
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
private static final DataSourceNode getSourceNode() {
    return new DataSourceNode(new GenericDataSourceBase<String, TextInputFormat>(
        new TextInputFormat(new Path("/ignored")),
        new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO),
        "source"));
}
Author: axbaretto; Project: flink; Lines: 7; Source: ChannelTest.java
Example 20: getSourceNode
import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
private static final DataSourceNode getSourceNode() {
    return new DataSourceNode(new GenericDataSourceBase<String, TextInputFormat>(
        new TextInputFormat(new Path("/")),
        new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO)));
}
Author: citlab; Project: vs.msc.ws14; Lines: 4; Source: FeedbackPropertiesMatchTest.java
Note: the org.apache.flink.api.java.io.TextInputFormat examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms, with snippets selected from open-source projects contributed by various developers. Copyright of the source code remains with the original authors; consult each project's license before using or redistributing it, and do not repost without permission.