• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java ConfigConstants类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.flink.configuration.ConfigConstants的典型用法代码示例。如果您正苦于以下问题:Java ConfigConstants类的具体用法?Java ConfigConstants怎么用?Java ConfigConstants使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



ConfigConstants类属于org.apache.flink.configuration包,在下文中一共展示了ConfigConstants类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: testGetPos

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
@Test
public void testGetPos() throws Exception {
	FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
			new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 31, 17);

	for (int i = 0; i < 64; ++i) {
		Assert.assertEquals(i, stream.getPos());
		stream.write(0x42);
	}

	stream.closeAndGetHandle();

	// ----------------------------------------------------

	stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 31, 17);

	byte[] data = "testme!".getBytes(ConfigConstants.DEFAULT_CHARSET);

	for (int i = 0; i < 7; ++i) {
		Assert.assertEquals(i * (1 + data.length), stream.getPos());
		stream.write(0x42);
		stream.write(data);
	}

	stream.closeAndGetHandle();
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:27,代码来源:FsCheckpointStateOutputStreamTest.java


示例2: createFileAndFillWithData

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
/**
 * Create a file with pre-determined String format of the form:
 * {@code fileIdx +": "+ sampleLine +" "+ lineNo}.
 * */
private static Tuple2<org.apache.hadoop.fs.Path, String> createFileAndFillWithData(
			String base, String fileName, int fileIdx, String sampleLine) throws IOException {

	assert (hdfs != null);

	final String fileRandSuffix = UUID.randomUUID().toString();

	org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileRandSuffix);
	Assert.assertFalse(hdfs.exists(file));

	org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileRandSuffix);
	FSDataOutputStream stream = hdfs.create(tmp);
	StringBuilder str = new StringBuilder();
	for (int i = 0; i < LINES_PER_FILE; i++) {
		String line = fileIdx + ": " + sampleLine + " " + i + "\n";
		str.append(line);
		stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
	}
	stream.close();

	hdfs.rename(tmp, file);

	Assert.assertTrue("No result file present", hdfs.exists(file));
	return new Tuple2<>(file, str.toString());
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:30,代码来源:ContinuousFileProcessingTest.java


示例3: readObject

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
	in.defaultReadObject();

	byte codecByte = in.readByte();
	if (codecByte >= 0) {
		setCodec(Codec.forCodecByte(codecByte));
	}

	int length = in.readInt();
	if (length != 0) {
		byte[] json = new byte[length];
		in.readFully(json);

		Schema schema = new Schema.Parser().parse(new String(json, ConfigConstants.DEFAULT_CHARSET));
		setSchema(schema);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:18,代码来源:AvroOutputFormat.java


示例4: main

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
			.url("http://localhost:8080/nifi")
			.portName("Data from Flink")
			.buildConfig();

	DataStreamSink<String> dataStream = env.fromElements("one", "two", "three", "four", "five", "q")
			.addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<String>() {
				@Override
				public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) {
					return new StandardNiFiDataPacket(s.getBytes(ConfigConstants.DEFAULT_CHARSET),
						new HashMap<String, String>());
				}
			}));

	env.execute();
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:20,代码来源:NiFiSinkTopologyExample.java


示例5: run

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
@Override
public void run() {
	while (running) {
		try {
			byte[] buffer = new byte[1024];

			DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

			socket.receive(packet);

			String line = new String(packet.getData(), 0, packet.getLength(), ConfigConstants.DEFAULT_CHARSET);

			lines.put(line, obj);

			synchronized (lines) {
				lines.notifyAll();
			}
		} catch (IOException ex) {
			// ignore the exceptions
		}
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:23,代码来源:StatsDReporterTest.java


示例6: testSavepointBackendFileSystemButCheckpointBackendJobManager

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
@Test
public void testSavepointBackendFileSystemButCheckpointBackendJobManager() throws Exception {
	Configuration config = new Configuration();
	String rootPath = System.getProperty("java.io.tmpdir");
	// This combination does not make sense, because the checkpoints will be
	// lost after the job manager shuts down.
	config.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
	config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
	config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, rootPath);

	SavepointStore store = SavepointStoreFactory.createFromConfig(config);
	Assert.assertTrue(store instanceof FsSavepointStore);

	FsSavepointStore stateStore = (FsSavepointStore) store;
	Assert.assertEquals(new Path(rootPath), stateStore.getRootPath());
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:17,代码来源:SavepointStoreFactoryTest.java


示例7: createFactory

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
/**
 * Creates a FixedDelayRestartStrategy from the given Configuration.
 *
 * @param configuration Configuration containing the parameter values for the restart strategy
 * @return Initialized instance of FixedDelayRestartStrategy
 * @throws Exception
 */
public static FixedDelayRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
	int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);

	String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY);

	long delay;

	try {
		delay = Duration.apply(delayString).toMillis();
	} catch (NumberFormatException nfe) {
		throw new Exception("Invalid config value for " +
				ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
				". Value must be a valid duration (such as '100 milli' or '10 s')");
	}

	return new FixedDelayRestartStrategyFactory(maxAttempts, delay);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:25,代码来源:FixedDelayRestartStrategy.java


示例8: setup

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
@BeforeClass
public static void setup() {
	try {
		Configuration config = new Configuration();
		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);

		cluster = new LocalFlinkMiniCluster(config);

		cluster.start();

		final JobVertex jobVertex = new JobVertex("Working job vertex.");
		jobVertex.setInvokableClass(NoOpInvokable.class);
		workingJobGraph = new JobGraph("Working testing job", jobVertex);
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:22,代码来源:JobSubmissionFailsITCase.java


示例9: testLocalEnvironmentWithConfig

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
/**
 * Ensure that the user can pass a custom configuration object to the LocalEnvironment
 */
@Test
public void testLocalEnvironmentWithConfig() throws Exception {
	Configuration conf = new Configuration();
	conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
	
	final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
	env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
	env.getConfig().disableSysoutLogging();

	DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
			.rebalance()
			.mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
				@Override
				public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
					out.collect(getRuntimeContext().getIndexOfThisSubtask());
				}
			});
	List<Integer> resultCollection = result.collect();
	assertEquals(PARALLELISM, resultCollection.size());
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:24,代码来源:ExecutionEnvironmentITCase.java


示例10: initialize

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
@BeforeClass
public static void initialize() throws Exception {
	Configuration config = new Configuration();
	config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
	config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS);
	config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
	config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

	File logDir = File.createTempFile("TestBaseUtils-logdir", null);
	assertTrue("Unable to delete temp file", logDir.delete());
	assertTrue("Unable to create temp directory", logDir.mkdir());
	File logFile = new File(logDir, "jobmanager.log");
	File outFile = new File(logDir, "jobmanager.out");
	
	Files.createFile(logFile.toPath());
	Files.createFile(outFile.toPath());
	
	config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.getAbsolutePath());
	config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());

	cluster = new LocalFlinkMiniCluster(config, false);
	cluster.start();
	
	port = cluster.webMonitor().get().getServerPort();
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:26,代码来源:WebFrontendITCase.java


示例11: setup

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
@BeforeClass
public static void setup() {
	try {
		Configuration config = new Configuration();
		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
		config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
		config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
		config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);

		cluster = new TestingCluster(config, false);
		cluster.start(true);

		testActorSystem = AkkaUtils.createDefaultActorSystem();

		// verify that we are not in HA mode
		Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.NONE);

	} catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:25,代码来源:NonHAAbstractQueryableStateITCase.java


示例12: setup

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
@BeforeClass
public static void setup() {
	try {
		Configuration config = new Configuration();
		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
		config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
		config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
		config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);

		cluster = new TestingCluster(config, false);
		cluster.start(true);
	} catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:19,代码来源:QueryableStateITCase.java


示例13: before

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
@Before
public void before() throws Exception {
	system = AkkaUtils.createLocalActorSystem(new Configuration());

	Configuration config = new Configuration();
	config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
	config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
	config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
	TestingCluster testingCluster = new TestingCluster(config, false, true);
	testingCluster.start();

	jobManagerGateway = testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
	taskManager = testingCluster.getTaskManagersAsJava().get(0);

	// generate test data
	for (int i = 0; i < NUM_ITERATIONS; i++) {
		inputData.add(i, String.valueOf(i + 1));
	}

	NotifyingMapper.finished = false;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:22,代码来源:AccumulatorLiveITCase.java


示例14: setup

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
@Before
public void setup() throws Exception {
	// detect parameter change
	if (currentBackend != backend) {
		shutDownExistingCluster();

		currentBackend = backend;

		Configuration config = new Configuration();
		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager);

		final File checkpointDir = temporaryFolder.newFolder();
		final File savepointDir = temporaryFolder.newFolder();

		config.setString(CheckpointingOptions.STATE_BACKEND, currentBackend);
		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());

		cluster = new TestingCluster(config);
		cluster.start();
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:24,代码来源:RescalingITCase.java


示例15: fillWithData

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
/** Create a file and fill it with content. */
private Tuple2<org.apache.hadoop.fs.Path, String> fillWithData(
	String base, String fileName, int fileIdx, String sampleLine) throws IOException, InterruptedException {

	assert (hdfs != null);

	org.apache.hadoop.fs.Path tmp =
		new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);

	FSDataOutputStream stream = hdfs.create(tmp);
	StringBuilder str = new StringBuilder();
	for (int i = 0; i < LINES_PER_FILE; i++) {
		String line = fileIdx + ": " + sampleLine + " " + i + "\n";
		str.append(line);
		stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
	}
	stream.close();
	return new Tuple2<>(tmp, str.toString());
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:20,代码来源:ContinuousFileProcessingITCase.java


示例16: setup

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
@Before
public void setup() throws Exception {

	// Flink configuration
	final Configuration config = new Configuration();

	config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
	config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);

	final File checkpointDir = tempFolder.newFolder("checkpoints").getAbsoluteFile();
	final File savepointDir = tempFolder.newFolder("savepoints").getAbsoluteFile();

	if (!checkpointDir.exists() || !savepointDir.exists()) {
		throw new Exception("Test setup failed: failed to create (temporary) directories.");
	}

	LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
	LOG.info("Created savepoint directory: " + savepointDir + ".");

	config.setString(CheckpointingOptions.STATE_BACKEND, "memory");
	config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
	config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
	config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());

	cluster = TestBaseUtils.startCluster(config, false);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:27,代码来源:SavepointMigrationTestBase.java


示例17: readRecord

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
@Override
public Record readRecord(Record target, byte[] record, int offset, int numBytes) {
	try {
		Thread.sleep(100);
	}
	catch (InterruptedException e) {
		return null;
	}
	
	String line = new String(record, offset, numBytes, ConfigConstants.DEFAULT_CHARSET);
	
	try {
		this.key.setValue(Integer.parseInt(line.substring(0,line.indexOf("_"))));
		this.value.setValue(Integer.parseInt(line.substring(line.indexOf("_")+1,line.length())));
	}
	catch(RuntimeException re) {
		return null;
	}
	
	target.setField(0, this.key);
	target.setField(1, this.value);
	return target;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:24,代码来源:DataSourceTaskTest.java


示例18: main

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
public static void main(String[] args) {
	try {
		int jobManagerPort = Integer.parseInt(args[0]);

		Configuration cfg = new Configuration();
		cfg.setString(JobManagerOptions.ADDRESS, "localhost");
		cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
		cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
		cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
		cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
		cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");

		TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg,
			ResourceID.generate(), TaskManager.class);

		// wait forever
		Object lock = new Object();
		synchronized (lock) {
			lock.wait();
		}
	}
	catch (Throwable t) {
		LOG.error("Failed to start TaskManager process", t);
		System.exit(1);
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:27,代码来源:AbstractTaskManagerProcessFailureRecoveryTest.java


示例19: testConfigure

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
@Test
public void testConfigure() throws Exception {

	File confDir = tempFolder.newFolder();
	initConfDir(confDir);

	HadoopConfOverlay overlay = new HadoopConfOverlay(confDir);

	ContainerSpecification spec = new ContainerSpecification();
	overlay.configure(spec);

	assertEquals(TARGET_CONF_DIR.getPath(), spec.getEnvironmentVariables().get("HADOOP_CONF_DIR"));
	assertEquals(TARGET_CONF_DIR.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.PATH_HADOOP_CONFIG, null));

	checkArtifact(spec, new Path(TARGET_CONF_DIR, "core-site.xml"));
	checkArtifact(spec, new Path(TARGET_CONF_DIR, "hdfs-site.xml"));
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:18,代码来源:HadoopConfOverlayTest.java


示例20: testParseStringErrors

import org.apache.flink.configuration.ConfigConstants; //导入依赖的package包/类
@Test
public void testParseStringErrors() throws Exception {
	StringParser stringParser = new StringParser();
	stringParser.enableQuotedStringParsing((byte) '"');

	Object[][] failures = {
			{"\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING},
			{"\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING}
	};

	for (Object[] failure : failures) {
		String input = (String) failure[0];

		int result = stringParser.parseField(input.getBytes(ConfigConstants.DEFAULT_CHARSET), 0,
			input.length(), new byte[]{'|'}, null);

		assertThat(result, is(-1));
		assertThat(stringParser.getErrorState(), is(failure[1]));
	}

}
 
开发者ID:axbaretto,项目名称:flink,代码行数:22,代码来源:CsvInputFormatTest.java



注:本文中的org.apache.flink.configuration.ConfigConstants类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java ResourceDependency类代码示例发布时间:2022-05-22
下一篇:
Java Avros类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap