Java TextInputFormat Class Code Examples


This article collects typical usage examples of the Java class org.apache.flink.api.java.io.TextInputFormat. If you have been wondering what TextInputFormat is for, or how to use it in practice, the curated code examples below should help.



The TextInputFormat class belongs to the org.apache.flink.api.java.io package. Twenty code examples of the class are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your ratings help the system recommend better Java code examples.
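
Before the collected snippets, here is a minimal, self-contained sketch of the pattern nearly all of them share: construct a TextInputFormat over a Path, then hand it to ExecutionEnvironment.readFile. This is a sketch rather than one of the collected examples; the input path "/tmp/input.txt" and the class name are placeholders, and the code assumes a Flink 1.x DataSet API setup.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;

public class TextInputFormatSketch {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // "/tmp/input.txt" is a placeholder; point it at any existing text file or directory.
        TextInputFormat format = new TextInputFormat(new Path("/tmp/input.txt"));
        format.setCharsetName("UTF-8"); // UTF-8 is the default; set explicitly here for clarity

        // readFile binds the format to the given path and yields one String record per line.
        DataSet<String> lines = env.readFile(format, "/tmp/input.txt");

        // print() is an action: it triggers job execution, so no separate env.execute() is needed.
        lines.print();
    }
}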

Example 1: main

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
public static void main(String... args) throws Exception {
    File txtFile = new File("/tmp/test/file.txt");
    File csvFile = new File("/tmp/test/file.csv");
    File binFile = new File("/tmp/test/file.bin");

    writeToFile(txtFile, "txt");
    writeToFile(csvFile, "csv");
    writeToFile(binFile, "bin");

    final ExecutionEnvironment env =
            ExecutionEnvironment.getExecutionEnvironment();
    final TextInputFormat format = new TextInputFormat(new Path("/tmp/test"));

    GlobFilePathFilter filesFilter = new GlobFilePathFilter(
            Collections.singletonList("**"),
            Arrays.asList("**/file.bin")
    );
    // debug output: print GlobFilePathFilter's declared fields via reflection
    System.out.println(Arrays.toString(GlobFilePathFilter.class.getDeclaredFields()));
    format.setFilesFilter(filesFilter);

    DataSet<String> result = env.readFile(format, "/tmp/test");
    result.writeAsText("/tmp/out");
    env.execute("GlobFilePathFilter-Test");
}
 
Developer: mushketyk, project: flink-examples, lines: 25, source file: GlobExample.java


Example 2: testInvalidPathSpecification

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
@Test
public void testInvalidPathSpecification() throws Exception {

	String invalidPath = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() + "/invalid/";
	TextInputFormat format = new TextInputFormat(new Path(invalidPath));

	ContinuousFileMonitoringFunction<String> monitoringFunction =
		new ContinuousFileMonitoringFunction<>(format,
			FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
	try {
		monitoringFunction.run(new DummySourceContext() {
			@Override
			public void collect(TimestampedFileInputSplit element) {
				// we should never arrive here with an invalid path
				Assert.fail("Test passes with an invalid path.");
			}
		});

		// we should never arrive here with an invalid path
		Assert.fail("Test passed with an invalid path.");

	} catch (FileNotFoundException e) {
		Assert.assertEquals("The provided file path " + format.getFilePath() + " does not exist.", e.getMessage());
	}
}
 
Developer: axbaretto, project: flink, lines: 26, source file: ContinuousFileProcessingTest.java


Example 3: testProgram

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
@Override
public void testProgram(StreamExecutionEnvironment env) {

	// set the restart strategy.
	env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(NO_OF_RETRIES, 0));
	env.enableCheckpointing(10);

	// create and start the file creating thread.
	fc = new FileCreator();
	fc.start();

	// create the monitoring source along with the necessary readers.
	TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
	format.setFilesFilter(FilePathFilter.createDefaultFilter());

	DataStream<String> inputStream = env.readFile(format, localFsURI,
		FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL);

	TestingSinkFunction sink = new TestingSinkFunction();

	inputStream.flatMap(new FlatMapFunction<String, String>() {
		@Override
		public void flatMap(String value, Collector<String> out) throws Exception {
			out.collect(value);
		}
	}).addSink(sink).setParallelism(1);
}
 
Developer: axbaretto, project: flink, lines: 28, source file: ContinuousFileProcessingCheckpointITCase.java


Example 4: testSortingOnModTime

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
@Test
public void testSortingOnModTime() throws Exception {
	String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";

	final long[] modTimes = new long[NO_OF_FILES];
	final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];

	for (int i = 0; i < NO_OF_FILES; i++) {
		Tuple2<org.apache.hadoop.fs.Path, String> file =
			createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
		Thread.sleep(400);

		filesCreated[i] = file.f0;
		modTimes[i] = hdfs.getFileStatus(file.f0).getModificationTime();
	}

	TextInputFormat format = new TextInputFormat(new Path(testBasePath));
	format.setFilesFilter(FilePathFilter.createDefaultFilter());

	// this is just to verify that all splits have been forwarded later.
	FileInputSplit[] splits = format.createInputSplits(1);

	ContinuousFileMonitoringFunction<String> monitoringFunction =
		new ContinuousFileMonitoringFunction<>(format,
			FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);

	ModTimeVerifyingSourceContext context = new ModTimeVerifyingSourceContext(modTimes);

	monitoringFunction.open(new Configuration());
	monitoringFunction.run(context);
	Assert.assertEquals(splits.length, context.getCounter());

	// delete the created files.
	for (int i = 0; i < NO_OF_FILES; i++) {
		hdfs.delete(filesCreated[i], false);
	}
}
 
Developer: axbaretto, project: flink, lines: 38, source file: ContinuousFileProcessingTest.java


Example 5: readWikiDump

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
public static DataSource<String> readWikiDump(FlinkMlpCommandConfig config, ExecutionEnvironment env) {
    Path filePath = new Path(config.getDataset());
    TextInputFormat inp = new TextInputFormat(filePath);
    inp.setCharsetName("UTF-8");
    inp.setDelimiter("</page>");
    return env.readFile(inp, config.getDataset());
}
 
Developer: ag-gipp, project: mathosphere, lines: 8, source file: FlinkMlpRelationFinder.java


Example 6: main

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
public static void main(String[] args) throws Exception {
//        FlinkPdCommandConfig config = FlinkPdCommandConfig.from(args);
//        run(config);
//        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        final String filename = URLDecoder.decode("file:/C:/git/flink/readFileTest/target/classes/ex1.html", "UTF-8");
//        Path filePath = new Path(filename);
//        TextInputFormat inp = new TextInputFormat(filePath);
//        inp.setCharsetName("UTF-8");
//        inp.setDelimiter("</ARXIVFILESPLIT>");
//        final DataSource<String> source = env.readFile(inp, filename);
//        source.writeAsText("test", org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);
//        env.execute();
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // get input data
        ClassLoader classLoader = WordCount.class.getClassLoader();
        URL resource = classLoader.getResource("com/formulasearchengine/mathosphere/mathpd/ex1.html");
        final String filename = URLDecoder.decode(resource.getFile(), "UTF-8");
        //final String filename = URLDecoder.decode("file:/C:/git/flink/readFileTest/target/classes/ex1.html", "UTF-8");
        Path filePath = new Path(filename);
        TextInputFormat inp = new TextInputFormat(filePath);
        inp.setCharsetName("UTF-8");
        inp.setDelimiter("</ARXIVFILESPLIT>");
        final DataSource<String> source = env.readFile(inp, filename);
//		DataSet<Tuple2<String, Integer>> counts =
//				// split up the lines in pairs (2-tuples) containing: (word,1)
//				source.flatMap(new LineSplitter())
//				// group by the tuple field "0" and sum up tuple field "1"
//				.groupBy(0)
//				.sum(1);

        // execute and print result
        //counts.print();
        source.writeAsText("test", FileSystem.WriteMode.OVERWRITE);
        env.execute();
    }
 
Developer: ag-gipp, project: mathosphere, lines: 37, source file: FlinkPd.java


Example 7: readWikiDump

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
public static DataSource<String> readWikiDump(FlinkPdCommandConfig config, ExecutionEnvironment env) {
    Path filePath = new Path(config.getDataset());
    TextInputFormat inp = new TextInputFormat(filePath);
    inp.setCharsetName("UTF-8");
    inp.setDelimiter("</ARXIVFILESPLIT>");
    return env.readFile(inp, config.getDataset());
}
 
Developer: ag-gipp, project: mathosphere, lines: 8, source file: FlinkPd.java


Example 8: readRefs

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
public static DataSource<String> readRefs(FlinkPdCommandConfig config, ExecutionEnvironment env) {
    Path filePath = new Path(config.getRef());
    TextInputFormat inp = new TextInputFormat(filePath);
    inp.setCharsetName("UTF-8");
    inp.setDelimiter("</ARXIVFILESPLIT>");
    return env.readFile(inp, config.getRef());
}
 
Developer: ag-gipp, project: mathosphere, lines: 8, source file: FlinkPd.java


Example 9: readPreprocessedFile

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
public static DataSource<String> readPreprocessedFile(String pathname, ExecutionEnvironment env) {
    Path filePath = new Path(pathname);
    TextInputFormat inp = new TextInputFormat(filePath);
    inp.setCharsetName("UTF-8");
    // env.read
    return env.readFile(inp, pathname);
}
 
Developer: ag-gipp, project: mathosphere, lines: 8, source file: FlinkPd.java


Example 10: main

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
public static void main(String[] args) throws Exception {

		// set up the execution environment
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// get input data
		//ClassLoader classLoader = WordCount.class.getClassLoader();
		//URL resource = classLoader.getResource("ex1.html");
		//final String filename = URLDecoder.decode(resource.getFile(), "UTF-8");
		final String filename = URLDecoder.decode("file:/C:/git/flink/readFileTest/target/classes/ex1.html", "UTF-8");
		Path filePath = new Path(filename);
		TextInputFormat inp = new TextInputFormat(filePath);
		inp.setCharsetName("UTF-8");
		inp.setDelimiter("</ARXIVFILESPLIT>");
		final DataSource<String> source = env.readFile(inp, filename);
//		DataSet<Tuple2<String, Integer>> counts =
//				// split up the lines in pairs (2-tuples) containing: (word,1)
//				source.flatMap(new LineSplitter())
//				// group by the tuple field "0" and sum up tuple field "1"
//				.groupBy(0)
//				.sum(1);

		// execute and print result
		//counts.print();
		source.writeAsText("test", FileSystem.WriteMode.OVERWRITE);
		env.execute();

	}
 
Developer: ag-gipp, project: mathosphere, lines: 29, source file: WordCount.java


Example 11: testProgram

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
@Override
protected void testProgram() throws Exception {

	/*
	* This test checks the interplay between the monitor and the reader
	* and also the failExternally() functionality. To test the latter we
	* set the parallelism to 1 so that we have the chaining between the sink,
	* which throws the SuccessException to signal the end of the test, and the
	* reader.
	* */

	FileCreator fileCreator = new FileCreator(INTERVAL);
	Thread t = new Thread(fileCreator);
	t.start();

	TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
	format.setFilePath(hdfsURI);

	try {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(4);

		format.setFilesFilter(FilePathFilter.createDefaultFilter());
		ContinuousFileMonitoringFunction<String> monitoringFunction =
			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
				FileProcessingMode.PROCESS_CONTINUOUSLY,
				env.getParallelism(), INTERVAL);

		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
		TestingSinkFunction sink = new TestingSinkFunction();

		DataStream<FileInputSplit> splits = env.addSource(monitoringFunction);
		splits.transform("FileSplitReader", typeInfo, reader).addSink(sink).setParallelism(1);
		env.execute();

	} catch (Exception e) {
		Throwable th = e;
		int depth = 0;

		for (; depth < 20; depth++) {
			if (th instanceof SuccessException) {
				try {
					postSubmit();
				} catch (Exception e1) {
					e1.printStackTrace();
				}
				return;
			} else if (th.getCause() != null) {
				th = th.getCause();
			} else {
				break;
			}
		}
		e.printStackTrace();
		Assert.fail(e.getMessage());
	}
}
 
Developer: axbaretto, project: flink, lines: 59, source file: ContinuousFileMonitoringFunctionITCase.java


Example 12: writeMonitoringSourceSnapshot

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
/**
 * Manually run this to write binary snapshot data. Remove @Ignore to run.
 */
@Ignore
@Test
public void writeMonitoringSourceSnapshot() throws Exception {

	File testFolder = tempFolder.newFolder();

	long fileModTime = Long.MIN_VALUE;
	for (int i = 0; i < 1; i++) {
		Tuple2<File, String> file = createFileAndFillWithData(testFolder, "file", i, "This is test line.");
		fileModTime = file.f0.lastModified();
	}

	TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath()));

	final ContinuousFileMonitoringFunction<String> monitoringFunction =
		new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);

	StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
		new StreamSource<>(monitoringFunction);

	final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
			new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);

	testHarness.open();

	final Throwable[] error = new Throwable[1];

	final OneShotLatch latch = new OneShotLatch();

	// run the source asynchronously
	Thread runner = new Thread() {
		@Override
		public void run() {
			try {
				monitoringFunction.run(new DummySourceContext() {
					@Override
					public void collect(TimestampedFileInputSplit element) {
						latch.trigger();
					}

					@Override
					public void markAsTemporarilyIdle() {

					}
				});
			}
			catch (Throwable t) {
				t.printStackTrace();
				error[0] = t;
			}
		}
	};
	runner.start();

	if (!latch.isTriggered()) {
		latch.await();
	}

	final OperatorStateHandles snapshot;
	synchronized (testHarness.getCheckpointLock()) {
		snapshot = testHarness.snapshot(0L, 0L);
	}

	OperatorSnapshotUtil.writeStateHandle(
			snapshot,
			"src/test/resources/monitoring-function-migration-test-" + fileModTime + "-flink" + flinkGenerateSavepointVersion + "-snapshot");

	monitoringFunction.cancel();
	runner.join();

	testHarness.close();
}
 
Developer: axbaretto, project: flink, lines: 76, source file: ContinuousFileProcessingMigrationTest.java


Example 13: testFunctionRestore

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
@Test
public void testFunctionRestore() throws Exception {

	/*
	org.apache.hadoop.fs.Path path = null;
	long fileModTime = Long.MIN_VALUE;
	for (int i = 0; i < 1; i++) {
		Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
		path = file.f0;
		fileModTime = hdfs.getFileStatus(file.f0).getModificationTime();
	}

	TextInputFormat format = new TextInputFormat(new Path(hdfsURI));

	final ContinuousFileMonitoringFunction<String> monitoringFunction =
		new ContinuousFileMonitoringFunction<>(format, format.getFilePath().toString(), new PathFilter(), FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);

	StreamSource<FileInputSplit, ContinuousFileMonitoringFunction<String>> src =
		new StreamSource<>(monitoringFunction);

	final OneInputStreamOperatorTestHarness<Void, FileInputSplit> testHarness =
		new OneInputStreamOperatorTestHarness<>(src);
	testHarness.open();

	final Throwable[] error = new Throwable[1];

	final OneShotLatch latch = new OneShotLatch();

	// run the source asynchronously
	Thread runner = new Thread() {
		@Override
		public void run() {
			try {
				monitoringFunction.run(new DummySourceContext() {
					@Override
					public void collect(FileInputSplit element) {
						latch.trigger();
					}
				});
			}
			catch (Throwable t) {
				t.printStackTrace();
				error[0] = t;
			}
		}
	};
	runner.start();

	if (!latch.isTriggered()) {
		latch.await();
	}

	StreamTaskState snapshot = testHarness.snapshot(0, 0);
	testHarness.snaphotToFile(snapshot, "src/test/resources/monitoring-function-migration-test-" + fileModTime +"-flink1.1-snapshot");
	monitoringFunction.cancel();
	runner.join();

	testHarness.close();
	*/

	Long expectedModTime = Long.parseLong("1482144479339");
	TextInputFormat format = new TextInputFormat(new Path(hdfsURI));

	final ContinuousFileMonitoringFunction<String> monitoringFunction =
		new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);

	StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
		new StreamSource<>(monitoringFunction);

	final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
		new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
	testHarness.setup();
	testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename("monitoring-function-migration-test-1482144479339-flink1.1-snapshot"));
	testHarness.open();

	Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime());

}
 
Developer: axbaretto, project: flink, lines: 79, source file: ContinuousFileProcessingFrom11MigrationTest.java


Example 14: writeMonitoringSourceSnapshot

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
/**
 * Manually run this to write binary snapshot data. Remove @Ignore to run.
 */
@Ignore
@Test
public void writeMonitoringSourceSnapshot() throws Exception {

	File testFolder = tempFolder.newFolder();

	long fileModTime = Long.MIN_VALUE;
	for (int i = 0; i < 1; i++) {
		Tuple2<File, String> file = createFileAndFillWithData(testFolder, "file", i, "This is test line.");
		fileModTime = file.f0.lastModified();
	}

	TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath()));

	final ContinuousFileMonitoringFunction<String> monitoringFunction =
		new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);

	StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
		new StreamSource<>(monitoringFunction);

	final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
			new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);

	testHarness.open();

	final Throwable[] error = new Throwable[1];

	final OneShotLatch latch = new OneShotLatch();

	// run the source asynchronously
	Thread runner = new Thread() {
		@Override
		public void run() {
			try {
				monitoringFunction.run(new DummySourceContext() {
					@Override
					public void collect(TimestampedFileInputSplit element) {
						latch.trigger();
					}

					@Override
					public void markAsTemporarilyIdle() {

					}
				});
			}
			catch (Throwable t) {
				t.printStackTrace();
				error[0] = t;
			}
		}
	};
	runner.start();

	if (!latch.isTriggered()) {
		latch.await();
	}

	final OperatorStateHandles snapshot;
	synchronized (testHarness.getCheckpointLock()) {
		snapshot = testHarness.snapshot(0L, 0L);
	}

	OperatorSnapshotUtil.writeStateHandle(
			snapshot,
			"src/test/resources/monitoring-function-migration-test-" + fileModTime + "-flink1.2-snapshot");

	monitoringFunction.cancel();
	runner.join();

	testHarness.close();
}
 
Developer: axbaretto, project: flink, lines: 76, source file: ContinuousFileProcessingFrom12MigrationTest.java


Example 15: testProcessOnce

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
@Test
public void testProcessOnce() throws Exception {
	String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";

	final OneShotLatch latch = new OneShotLatch();

	// create a single file in the directory
	Tuple2<org.apache.hadoop.fs.Path, String> bootstrap =
		createFileAndFillWithData(testBasePath, "file", NO_OF_FILES + 1, "This is test line.");
	Assert.assertTrue(hdfs.exists(bootstrap.f0));

	// the source is supposed to read only this file.
	final Set<String> filesToBeRead = new TreeSet<>();
	filesToBeRead.add(bootstrap.f0.getName());

	TextInputFormat format = new TextInputFormat(new Path(testBasePath));
	format.setFilesFilter(FilePathFilter.createDefaultFilter());

	final ContinuousFileMonitoringFunction<String> monitoringFunction =
		new ContinuousFileMonitoringFunction<>(format,
			FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);

	final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch, monitoringFunction);

	final Thread t = new Thread() {
		@Override
		public void run() {
			try {
				monitoringFunction.open(new Configuration());
				monitoringFunction.run(context);

				// we would never arrive here if we were in
				// PROCESS_CONTINUOUSLY mode.

				// this will trigger the latch
				context.close();

			} catch (Exception e) {
				Assert.fail(e.getMessage());
			}
		}
	};
	t.start();

	if (!latch.isTriggered()) {
		latch.await();
	}

	// create some additional files that should be processed in the case of PROCESS_CONTINUOUSLY
	final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];
	for (int i = 0; i < NO_OF_FILES; i++) {
		Tuple2<org.apache.hadoop.fs.Path, String> ignoredFile =
			createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
		filesCreated[i] = ignoredFile.f0;
	}

	// wait until the monitoring thread exits
	t.join();

	Assert.assertArrayEquals(filesToBeRead.toArray(), context.getSeenFiles().toArray());

	// finally delete the files created for the test.
	hdfs.delete(bootstrap.f0, false);
	for (org.apache.hadoop.fs.Path path: filesCreated) {
		hdfs.delete(path, false);
	}
}
 
Developer: axbaretto, project: flink, lines: 68, source file: ContinuousFileProcessingTest.java


Example 16: testFunctionRestore

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
@Test
public void testFunctionRestore() throws Exception {
	String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";

	org.apache.hadoop.fs.Path path = null;
	long fileModTime = Long.MIN_VALUE;
	for (int i = 0; i < 1; i++) {
		Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
		path = file.f0;
		fileModTime = hdfs.getFileStatus(file.f0).getModificationTime();
	}

	TextInputFormat format = new TextInputFormat(new Path(testBasePath));

	final ContinuousFileMonitoringFunction<String> monitoringFunction =
		new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);

	StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
		new StreamSource<>(monitoringFunction);

	final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
		new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
	testHarness.open();

	final Throwable[] error = new Throwable[1];

	final OneShotLatch latch = new OneShotLatch();

	final DummySourceContext sourceContext = new DummySourceContext() {
		@Override
		public void collect(TimestampedFileInputSplit element) {
			latch.trigger();
		}
	};

	// run the source asynchronously
	Thread runner = new Thread() {
		@Override
		public void run() {
			try {
				monitoringFunction.run(sourceContext);
			}
			catch (Throwable t) {
				t.printStackTrace();
				error[0] = t;
			}
		}
	};
	runner.start();

	// first condition for the source to have updated its state: emit at least one element
	if (!latch.isTriggered()) {
		latch.await();
	}

	// second condition for the source to have updated its state: it's not on the lock anymore,
	// this means it has processed all the splits and updated its state.
	synchronized (sourceContext.getCheckpointLock()) {}

	OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
	monitoringFunction.cancel();
	runner.join();

	testHarness.close();

	final ContinuousFileMonitoringFunction<String> monitoringFunctionCopy =
		new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);

	StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> srcCopy =
		new StreamSource<>(monitoringFunctionCopy);

	AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarnessCopy =
		new AbstractStreamOperatorTestHarness<>(srcCopy, 1, 1, 0);
	testHarnessCopy.initializeState(snapshot);
	testHarnessCopy.open();

	Assert.assertNull(error[0]);
	Assert.assertEquals(fileModTime, monitoringFunctionCopy.getGlobalModificationTime());

	hdfs.delete(path, false);
}
 
Developer: axbaretto, project: flink, lines: 82, source file: ContinuousFileProcessingTest.java


Example 17: testProcessContinuously

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
@Test
public void testProcessContinuously() throws Exception {
	String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";

	final OneShotLatch latch = new OneShotLatch();

	// create a single file in the directory
	Tuple2<org.apache.hadoop.fs.Path, String> bootstrap =
		createFileAndFillWithData(testBasePath, "file", NO_OF_FILES + 1, "This is test line.");
	Assert.assertTrue(hdfs.exists(bootstrap.f0));

	final Set<String> filesToBeRead = new TreeSet<>();
	filesToBeRead.add(bootstrap.f0.getName());

	TextInputFormat format = new TextInputFormat(new Path(testBasePath));
	format.setFilesFilter(FilePathFilter.createDefaultFilter());

	final ContinuousFileMonitoringFunction<String> monitoringFunction =
		new ContinuousFileMonitoringFunction<>(format,
			FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);

	final int totalNoOfFilesToBeRead = NO_OF_FILES + 1; // 1 for the bootstrap + NO_OF_FILES
	final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch,
		monitoringFunction, 1, totalNoOfFilesToBeRead);

	final Thread t = new Thread() {

		@Override
		public void run() {
			try {
				monitoringFunction.open(new Configuration());
				monitoringFunction.run(context);
			} catch (Exception e) {
				Assert.fail(e.getMessage());
			}
		}
	};
	t.start();

	if (!latch.isTriggered()) {
		latch.await();
	}

	// create some additional files that will be processed in the case of PROCESS_CONTINUOUSLY
	final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];
	for (int i = 0; i < NO_OF_FILES; i++) {
		Tuple2<org.apache.hadoop.fs.Path, String> file =
			createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
		filesCreated[i] = file.f0;
		filesToBeRead.add(file.f0.getName());
	}

	// wait until the monitoring thread exits
	t.join();

	Assert.assertArrayEquals(filesToBeRead.toArray(), context.getSeenFiles().toArray());

	// finally delete the files created for the test.
	hdfs.delete(bootstrap.f0, false);
	for (org.apache.hadoop.fs.Path path: filesCreated) {
		hdfs.delete(path, false);
	}
}
 
Developer: axbaretto, project: flink, lines: 64, source file: ContinuousFileProcessingTest.java


Example 18: getSourceNode

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
private static DataSourceNode getSourceNode() {
	return new DataSourceNode(new GenericDataSourceBase<String, TextInputFormat>(new TextInputFormat(new Path("/")), new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO)));
}
 
Developer: axbaretto, project: flink, lines: 4, source file: FeedbackPropertiesMatchTest.java


Example 19: getSourceNode

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
private static final DataSourceNode getSourceNode() {
	return new DataSourceNode(new GenericDataSourceBase<String, TextInputFormat>(
			new TextInputFormat(new Path("/ignored")), 
			new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO),
			"source"));
}
 
Developer: axbaretto, project: flink, lines: 7, source file: ChannelTest.java


Example 20: getSourceNode

import org.apache.flink.api.java.io.TextInputFormat; // import the required package/class
private static final DataSourceNode getSourceNode() {
	return new DataSourceNode(new GenericDataSourceBase<String, TextInputFormat>(new TextInputFormat(new Path("/")), new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO)));
}
 
Developer: citlab, project: vs.msc.ws14, lines: 4, source file: FeedbackPropertiesMatchTest.java



Note: The org.apache.flink.api.java.io.TextInputFormat examples in this article were compiled from source code and documentation platforms such as GitHub and MSDocs; the snippets were selected from open-source projects contributed by many developers. Copyright in the source code remains with the original authors; consult the corresponding project's License before distributing or using it. Please do not reproduce this article without permission.

