This article collects typical usage examples of the Java class org.apache.flink.configuration.ConfigConstants. If you are wondering how ConfigConstants is used in practice, the selected examples below may help.
The ConfigConstants class belongs to the org.apache.flink.configuration package. The 20 code examples shown below are sorted by popularity by default.
Example 1: testGetPos
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
@Test
public void testGetPos() throws Exception {
    FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
        new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 31, 17);
    // Write single bytes and check that the position advances by one each time.
    for (int i = 0; i < 64; ++i) {
        Assert.assertEquals(i, stream.getPos());
        stream.write(0x42);
    }
    stream.closeAndGetHandle();
    // ----------------------------------------------------
    stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 31, 17);
    byte[] data = "testme!".getBytes(ConfigConstants.DEFAULT_CHARSET);
    // Each iteration writes one byte plus the encoded string.
    for (int i = 0; i < 7; ++i) {
        Assert.assertEquals(i * (1 + data.length), stream.getPos());
        stream.write(0x42);
        stream.write(data);
    }
    stream.closeAndGetHandle();
}
Developer: axbaretto, Project: flink, Lines of code: 27, Source: FsCheckpointStateOutputStreamTest.java
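The second half of Example 1 rests on simple arithmetic: every iteration writes one marker byte followed by the encoded string, so the position before iteration i is i * (1 + data.length). Below is a minimal sketch of that arithmetic. It uses a plain ByteArrayOutputStream instead of Flink's checkpoint stream and assumes that ConfigConstants.DEFAULT_CHARSET is UTF-8, with StandardCharsets.UTF_8 as a stand-in. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class StreamPositionSketch {

    // Writes `rounds` repetitions of one marker byte followed by the encoded text
    // and returns the total number of bytes written (the analogue of getPos()).
    static int writeRounds(String text, int rounds) {
        // Assumption: UTF-8 stands in for ConfigConstants.DEFAULT_CHARSET.
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < rounds; i++) {
            // The position before each round must match the closed-form expression.
            if (out.size() != i * (1 + data.length)) {
                throw new IllegalStateException("unexpected position " + out.size());
            }
            out.write(0x42);
            out.write(data, 0, data.length);
        }
        return out.size();
    }

    public static void main(String[] args) {
        System.out.println(writeRounds("testme!", 7));
    }
}
```

Because "testme!" encodes to 7 bytes, each round adds 8 bytes to the position.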
Example 2: createFileAndFillWithData
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
/**
 * Create a file with pre-determined String format of the form:
 * {@code fileIdx +": "+ sampleLine +" "+ lineNo}.
 */
private static Tuple2<org.apache.hadoop.fs.Path, String> createFileAndFillWithData(
        String base, String fileName, int fileIdx, String sampleLine) throws IOException {
    assert (hdfs != null);
    final String fileRandSuffix = UUID.randomUUID().toString();
    org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileRandSuffix);
    Assert.assertFalse(hdfs.exists(file));
    // Write to a hidden temporary file first, then rename it to the final name.
    org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileRandSuffix);
    FSDataOutputStream stream = hdfs.create(tmp);
    StringBuilder str = new StringBuilder();
    for (int i = 0; i < LINES_PER_FILE; i++) {
        String line = fileIdx + ": " + sampleLine + " " + i + "\n";
        str.append(line);
        stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
    }
    stream.close();
    hdfs.rename(tmp, file);
    Assert.assertTrue("No result file present", hdfs.exists(file));
    return new Tuple2<>(file, str.toString());
}
Developer: axbaretto, Project: flink, Lines of code: 30, Source: ContinuousFileProcessingTest.java
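Example 2 writes its lines to a hidden temporary file and only then renames it to the final name, presumably so that a directory monitor never picks up a half-written file. The same write-then-rename pattern can be sketched on the local filesystem with java.nio.file. This is an illustration only: it does not use HDFS, it substitutes StandardCharsets.UTF_8 for ConfigConstants.DEFAULT_CHARSET, and all class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class TempThenRenameSketch {

    // Builds lines of the form: fileIdx + ": " + sampleLine + " " + lineNo + "\n".
    static String buildContent(int fileIdx, String sampleLine, int lines) {
        StringBuilder content = new StringBuilder();
        for (int i = 0; i < lines; i++) {
            content.append(fileIdx).append(": ").append(sampleLine).append(' ').append(i).append('\n');
        }
        return content.toString();
    }

    // Writes the content to a hidden temporary file, then moves it to its final name.
    static Path writeThenRename(Path dir, String fileName, String content) {
        try {
            Path tmp = dir.resolve("." + fileName);
            Path target = dir.resolve(fileName);
            Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
            return Files.move(tmp, target);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Round trip: write two lines, read them back from the final file, clean up.
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("rename-sketch");
            Path file = writeThenRename(dir, "file0", buildContent(0, "sample", 2));
            String readBack = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            Files.delete(file);
            Files.delete(dir);
            return readBack;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```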
Example 3: readObject
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    byte codecByte = in.readByte();
    if (codecByte >= 0) {
        setCodec(Codec.forCodecByte(codecByte));
    }
    // The schema is stored as a length-prefixed JSON string; a length of 0 means no schema.
    int length = in.readInt();
    if (length != 0) {
        byte[] json = new byte[length];
        in.readFully(json);
        Schema schema = new Schema.Parser().parse(new String(json, ConfigConstants.DEFAULT_CHARSET));
        setSchema(schema);
    }
}
Developer: axbaretto, Project: flink, Lines of code: 18, Source: AvroOutputFormat.java
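The reader in Example 3 expects an int length followed by that many bytes of encoded JSON, with a length of 0 meaning "no schema". The sketch below shows one way a writer could mirror that reader, together with the matching read side. The writer is an assumption, since the actual writeObject method of AvroOutputFormat is not shown in this article. StandardCharsets.UTF_8 stands in for ConfigConstants.DEFAULT_CHARSET, and the names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class LengthPrefixedStringSketch {

    // Writes the byte length first, then the encoded bytes; null is written as length 0.
    static byte[] write(String json) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            if (json == null) {
                out.writeInt(0);
            } else {
                byte[] encoded = json.getBytes(StandardCharsets.UTF_8);
                out.writeInt(encoded.length);
                out.write(encoded);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the length, then exactly that many bytes, and decodes them with the same charset.
    static String read(byte[] serialized) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
            int length = in.readInt();
            if (length == 0) {
                return null;
            }
            byte[] encoded = new byte[length];
            in.readFully(encoded);
            return new String(encoded, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(read(write("{\"type\":\"string\"}")));
    }
}
```

The prefix is the byte length, not the character count, so writer and reader must agree on the charset.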
Example 4: main
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
        .url("http://localhost:8080/nifi")
        .portName("Data from Flink")
        .buildConfig();
    DataStreamSink<String> dataStream = env.fromElements("one", "two", "three", "four", "five", "q")
        .addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<String>() {
            @Override
            public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) {
                return new StandardNiFiDataPacket(s.getBytes(ConfigConstants.DEFAULT_CHARSET),
                    new HashMap<String, String>());
            }
        }));
    env.execute();
}
Developer: axbaretto, Project: flink, Lines of code: 20, Source: NiFiSinkTopologyExample.java
Example 5: run
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
@Override
public void run() {
    while (running) {
        try {
            byte[] buffer = new byte[1024];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            socket.receive(packet);
            // Decode only the bytes that were actually received, not the whole buffer.
            String line = new String(packet.getData(), 0, packet.getLength(), ConfigConstants.DEFAULT_CHARSET);
            lines.put(line, obj);
            synchronized (lines) {
                lines.notifyAll();
            }
        } catch (IOException ex) {
            // ignore the exceptions
        }
    }
}
Developer: axbaretto, Project: flink, Lines of code: 23, Source: StatsDReporterTest.java
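Example 5 decodes packet.getData() with an explicit offset and packet.getLength(), because the backing buffer is 1024 bytes long while the received payload is usually much shorter. The sketch below illustrates the difference without opening a socket: it copies a payload into the buffer and sets the packet length by hand, which is roughly what socket.receive(...) does. StandardCharsets.UTF_8 stands in for ConfigConstants.DEFAULT_CHARSET, and the payload and names are hypothetical.

```java
import java.net.DatagramPacket;
import java.nio.charset.StandardCharsets;

class DatagramDecodeSketch {

    // Decodes only the received part of the packet's buffer.
    static String decode(DatagramPacket packet) {
        return new String(packet.getData(), packet.getOffset(), packet.getLength(), StandardCharsets.UTF_8);
    }

    // Simulates a received packet: a short payload inside a 1024-byte buffer.
    static DatagramPacket simulatedPacket(String payload) {
        byte[] buffer = new byte[1024];
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        System.arraycopy(bytes, 0, buffer, 0, bytes.length);
        DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
        packet.setLength(bytes.length); // stand-in for what receive(...) would set
        return packet;
    }

    public static void main(String[] args) {
        DatagramPacket packet = simulatedPacket("metric:1|g");
        System.out.println(decode(packet));
        // Decoding the whole buffer would append the unused zero bytes to the string.
        System.out.println(new String(packet.getData(), StandardCharsets.UTF_8).length());
    }
}
```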
Example 6: testSavepointBackendFileSystemButCheckpointBackendJobManager
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
@Test
public void testSavepointBackendFileSystemButCheckpointBackendJobManager() throws Exception {
    Configuration config = new Configuration();
    String rootPath = System.getProperty("java.io.tmpdir");
    // This combination does not make sense, because the checkpoints will be
    // lost after the job manager shuts down.
    config.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
    config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
    config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, rootPath);
    SavepointStore store = SavepointStoreFactory.createFromConfig(config);
    Assert.assertTrue(store instanceof FsSavepointStore);
    FsSavepointStore stateStore = (FsSavepointStore) store;
    Assert.assertEquals(new Path(rootPath), stateStore.getRootPath());
}
Developer: axbaretto, Project: flink, Lines of code: 17, Source: SavepointStoreFactoryTest.java
Example 7: createFactory
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
/**
 * Creates a FixedDelayRestartStrategy from the given Configuration.
 *
 * @param configuration Configuration containing the parameter values for the restart strategy
 * @return Initialized instance of FixedDelayRestartStrategy
 * @throws Exception if the configured delay is not a valid duration
 */
public static FixedDelayRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
    int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
    String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY);
    long delay;
    try {
        delay = Duration.apply(delayString).toMillis();
    } catch (NumberFormatException nfe) {
        throw new Exception("Invalid config value for " +
            ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
            ". Value must be a valid duration (such as '100 milli' or '10 s')");
    }
    return new FixedDelayRestartStrategyFactory(maxAttempts, delay);
}
Developer: axbaretto, Project: flink, Lines of code: 25, Source: FixedDelayRestartStrategy.java
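The error message in Example 7 says the delay must be a valid duration such as '100 milli' or '10 s'. To make that input format concrete, here is a deliberately small parser that converts such strings to milliseconds. It is not the duration parser the example calls: that is Duration.apply, which judging by the call syntax is probably Scala's, although the snippet's imports are not shown. The sketch accepts only a handful of unit spellings chosen for illustration, and the class and method names are hypothetical.

```java
import java.util.Locale;

class DelayParserSketch {

    // Parses strings such as "10 s" or "100 milli" into milliseconds.
    // Assumption: only the units listed in the switch below are supported.
    static long parseDelayMillis(String value) {
        String trimmed = value.trim().toLowerCase(Locale.ROOT);
        int split = 0;
        while (split < trimmed.length() && Character.isDigit(trimmed.charAt(split))) {
            split++;
        }
        if (split == 0) {
            throw new IllegalArgumentException("Missing number in duration: '" + value + "'");
        }
        long amount = Long.parseLong(trimmed.substring(0, split));
        String unit = trimmed.substring(split).trim();
        switch (unit) {
            case "ms":
            case "milli":
            case "millis":
                return amount;
            case "s":
            case "sec":
            case "second":
            case "seconds":
                return amount * 1000L;
            case "min":
            case "minute":
            case "minutes":
                return amount * 60_000L;
            default:
                throw new IllegalArgumentException("Unknown unit in duration: '" + value + "'");
        }
    }

    public static void main(String[] args) {
        System.out.println(parseDelayMillis("10 s"));
        System.out.println(parseDelayMillis("100 milli"));
    }
}
```

Rejecting unknown units with an exception, rather than guessing a default, matches the example's approach of failing fast on an invalid configuration value.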
Example 8: setup
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
@BeforeClass
public static void setup() {
    try {
        Configuration config = new Configuration();
        config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
        // Two task managers, each with half of the slots.
        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);
        cluster = new LocalFlinkMiniCluster(config);
        cluster.start();
        final JobVertex jobVertex = new JobVertex("Working job vertex.");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        workingJobGraph = new JobGraph("Working testing job", jobVertex);
    }
    catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
Developer: axbaretto, Project: flink, Lines of code: 22, Source: JobSubmissionFailsITCase.java
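Example 8 starts two task managers with NUM_SLOTS / 2 slots each, so the cluster offers the number of task managers times the slots per task manager. The small sketch below spells out that arithmetic and the effect of integer division when the slot count is odd. The value of NUM_SLOTS is not shown in this article, so the numbers used here are hypothetical, as are the class and method names.

```java
class SlotCountSketch {

    // Total slots offered by the cluster: task managers times slots per task manager.
    static int totalSlots(int numTaskManagers, int slotsPerTaskManager) {
        return numTaskManagers * slotsPerTaskManager;
    }

    public static void main(String[] args) {
        int evenSlots = 8; // hypothetical stand-in for NUM_SLOTS
        System.out.println(totalSlots(2, evenSlots / 2));

        int oddSlots = 7; // integer division drops one slot in this case
        System.out.println(totalSlots(2, oddSlots / 2));
    }
}
```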
Example 9: testLocalEnvironmentWithConfig
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
/**
 * Ensure that the user can pass a custom configuration object to the LocalEnvironment.
 */
@Test
public void testLocalEnvironmentWithConfig() throws Exception {
    Configuration conf = new Configuration();
    conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
    final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
    env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
    env.getConfig().disableSysoutLogging();
    DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
        .rebalance()
        .mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
            @Override
            public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
                out.collect(getRuntimeContext().getIndexOfThisSubtask());
            }
        });
    List<Integer> resultCollection = result.collect();
    assertEquals(PARALLELISM, resultCollection.size());
}
Developer: axbaretto, Project: flink, Lines of code: 24, Source: ExecutionEnvironmentITCase.java
Example 10: initialize
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
@BeforeClass
public static void initialize() throws Exception {
    Configuration config = new Configuration();
    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS);
    config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    // Turn a unique temp file name into a temp directory for the log files.
    File logDir = File.createTempFile("TestBaseUtils-logdir", null);
    assertTrue("Unable to delete temp file", logDir.delete());
    assertTrue("Unable to create temp directory", logDir.mkdir());
    File logFile = new File(logDir, "jobmanager.log");
    File outFile = new File(logDir, "jobmanager.out");
    Files.createFile(logFile.toPath());
    Files.createFile(outFile.toPath());
    config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.getAbsolutePath());
    config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
    cluster = new LocalFlinkMiniCluster(config, false);
    cluster.start();
    port = cluster.webMonitor().get().getServerPort();
}
Developer: axbaretto, Project: flink, Lines of code: 26, Source: WebFrontendITCase.java
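Example 10 obtains a temporary directory by creating a temp file, deleting it, and calling mkdir on the same path. Since Java 7 the same result can be achieved in one step with Files.createTempDirectory, which avoids the short window between delete and mkdir. The sketch below is an alternative written for illustration, not the code used by WebFrontendITCase. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

class TempLogDirSketch {

    // Creates a temp directory with two empty log files, checks them, and cleans up.
    static boolean createAndVerify() {
        try {
            Path logDir = Files.createTempDirectory("TestBaseUtils-logdir");
            Path logFile = Files.createFile(logDir.resolve("jobmanager.log"));
            Path outFile = Files.createFile(logDir.resolve("jobmanager.out"));
            boolean ok = Files.isDirectory(logDir)
                && Files.isRegularFile(logFile)
                && Files.isRegularFile(outFile);
            // Remove the files again so the sketch leaves nothing behind.
            Files.delete(logFile);
            Files.delete(outFile);
            Files.delete(logDir);
            return ok;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(createAndVerify());
    }
}
```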
Example 11: setup
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
@BeforeClass
public static void setup() {
    try {
        Configuration config = new Configuration();
        config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
        config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
        config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
        config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
        cluster = new TestingCluster(config, false);
        cluster.start(true);
        testActorSystem = AkkaUtils.createDefaultActorSystem();
        // verify that we are not in HA mode
        Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.NONE);
    } catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
Developer: axbaretto, Project: flink, Lines of code: 25, Source: NonHAAbstractQueryableStateITCase.java
Example 12: setup
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
@BeforeClass
public static void setup() {
    try {
        Configuration config = new Configuration();
        config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
        config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
        config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
        config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
        cluster = new TestingCluster(config, false);
        cluster.start(true);
    } catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
Developer: axbaretto, Project: flink, Lines of code: 19, Source: QueryableStateITCase.java
Example 13: before
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
@Before
public void before() throws Exception {
    system = AkkaUtils.createLocalActorSystem(new Configuration());
    Configuration config = new Configuration();
    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
    config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
    TestingCluster testingCluster = new TestingCluster(config, false, true);
    testingCluster.start();
    jobManagerGateway = testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
    taskManager = testingCluster.getTaskManagersAsJava().get(0);
    // generate test data
    for (int i = 0; i < NUM_ITERATIONS; i++) {
        inputData.add(i, String.valueOf(i + 1));
    }
    NotifyingMapper.finished = false;
}
Developer: axbaretto, Project: flink, Lines of code: 22, Source: AccumulatorLiveITCase.java
Example 14: setup
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
@Before
public void setup() throws Exception {
    // detect parameter change
    if (currentBackend != backend) {
        shutDownExistingCluster();
        currentBackend = backend;
        Configuration config = new Configuration();
        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager);
        final File checkpointDir = temporaryFolder.newFolder();
        final File savepointDir = temporaryFolder.newFolder();
        config.setString(CheckpointingOptions.STATE_BACKEND, currentBackend);
        config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
        cluster = new TestingCluster(config);
        cluster.start();
    }
}
Developer: axbaretto, Project: flink, Lines of code: 24, Source: RescalingITCase.java
Example 15: fillWithData
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
/** Create a file and fill it with content. */
private Tuple2<org.apache.hadoop.fs.Path, String> fillWithData(
        String base, String fileName, int fileIdx, String sampleLine) throws IOException, InterruptedException {
    assert (hdfs != null);
    org.apache.hadoop.fs.Path tmp =
        new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
    FSDataOutputStream stream = hdfs.create(tmp);
    StringBuilder str = new StringBuilder();
    for (int i = 0; i < LINES_PER_FILE; i++) {
        String line = fileIdx + ": " + sampleLine + " " + i + "\n";
        str.append(line);
        stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
    }
    stream.close();
    return new Tuple2<>(tmp, str.toString());
}
Developer: axbaretto, Project: flink, Lines of code: 20, Source: ContinuousFileProcessingITCase.java
Example 16: setup
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
@Before
public void setup() throws Exception {
    // Flink configuration
    final Configuration config = new Configuration();
    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
    final File checkpointDir = tempFolder.newFolder("checkpoints").getAbsoluteFile();
    final File savepointDir = tempFolder.newFolder("savepoints").getAbsoluteFile();
    if (!checkpointDir.exists() || !savepointDir.exists()) {
        throw new Exception("Test setup failed: failed to create (temporary) directories.");
    }
    LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
    LOG.info("Created savepoint directory: " + savepointDir + ".");
    config.setString(CheckpointingOptions.STATE_BACKEND, "memory");
    config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
    config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
    config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
    cluster = TestBaseUtils.startCluster(config, false);
}
Developer: axbaretto, Project: flink, Lines of code: 27, Source: SavepointMigrationTestBase.java
Example 17: readRecord
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
@Override
public Record readRecord(Record target, byte[] record, int offset, int numBytes) {
    try {
        Thread.sleep(100);
    }
    catch (InterruptedException e) {
        return null;
    }
    String line = new String(record, offset, numBytes, ConfigConstants.DEFAULT_CHARSET);
    // Lines have the form "<key>_<value>"; malformed lines are skipped by returning null.
    try {
        this.key.setValue(Integer.parseInt(line.substring(0,line.indexOf("_"))));
        this.value.setValue(Integer.parseInt(line.substring(line.indexOf("_")+1,line.length())));
    }
    catch(RuntimeException re) {
        return null;
    }
    target.setField(0, this.key);
    target.setField(1, this.value);
    return target;
}
Developer: axbaretto, Project: flink, Lines of code: 24, Source: DataSourceTaskTest.java
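Example 17 splits each line at the first underscore and parses both halves as integers, treating any RuntimeException as a malformed record. The sketch below performs the same split with explicit checks instead of a broad catch, which makes the two failure cases (missing separator, non-numeric part) visible. It is a standalone illustration with hypothetical names and does not use Flink's Record type.

```java
class KeyValueLineSketch {

    // Parses "<key>_<value>" into {key, value}; returns null for malformed lines.
    static int[] parse(String line) {
        int separator = line.indexOf('_');
        if (separator < 0) {
            return null; // no separator present
        }
        try {
            int key = Integer.parseInt(line.substring(0, separator));
            int value = Integer.parseInt(line.substring(separator + 1));
            return new int[] {key, value};
        } catch (NumberFormatException e) {
            return null; // one of the two parts is not an integer
        }
    }

    public static void main(String[] args) {
        int[] parsed = parse("3_14");
        System.out.println(parsed[0] + " " + parsed[1]);
        System.out.println(parse("no separator") == null);
    }
}
```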
Example 18: main
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
public static void main(String[] args) {
    try {
        int jobManagerPort = Integer.parseInt(args[0]);
        Configuration cfg = new Configuration();
        cfg.setString(JobManagerOptions.ADDRESS, "localhost");
        cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
        cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
        cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
        cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
        cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
        TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg,
            ResourceID.generate(), TaskManager.class);
        // wait forever
        Object lock = new Object();
        synchronized (lock) {
            lock.wait();
        }
    }
    catch (Throwable t) {
        LOG.error("Failed to start TaskManager process", t);
        System.exit(1);
    }
}
Developer: axbaretto, Project: flink, Lines of code: 27, Source: AbstractTaskManagerProcessFailureRecoveryTest.java
Example 19: testConfigure
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
@Test
public void testConfigure() throws Exception {
    File confDir = tempFolder.newFolder();
    initConfDir(confDir);
    HadoopConfOverlay overlay = new HadoopConfOverlay(confDir);
    ContainerSpecification spec = new ContainerSpecification();
    overlay.configure(spec);
    assertEquals(TARGET_CONF_DIR.getPath(), spec.getEnvironmentVariables().get("HADOOP_CONF_DIR"));
    assertEquals(TARGET_CONF_DIR.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.PATH_HADOOP_CONFIG, null));
    checkArtifact(spec, new Path(TARGET_CONF_DIR, "core-site.xml"));
    checkArtifact(spec, new Path(TARGET_CONF_DIR, "hdfs-site.xml"));
}
Developer: axbaretto, Project: flink, Lines of code: 18, Source: HadoopConfOverlayTest.java
Example 20: testParseStringErrors
import org.apache.flink.configuration.ConfigConstants; // import the required package/class
@Test
public void testParseStringErrors() throws Exception {
    StringParser stringParser = new StringParser();
    stringParser.enableQuotedStringParsing((byte) '"');
    // Each entry pairs a malformed input with the error state the parser should report.
    Object[][] failures = {
        {"\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING},
        {"\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING}
    };
    for (Object[] failure : failures) {
        String input = (String) failure[0];
        int result = stringParser.parseField(input.getBytes(ConfigConstants.DEFAULT_CHARSET), 0,
            input.length(), new byte[]{'|'}, null);
        assertThat(result, is(-1));
        assertThat(stringParser.getErrorState(), is(failure[1]));
    }
}
Developer: axbaretto, Project: flink, Lines of code: 22, Source: CsvInputFormatTest.java
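Example 20 passes input.length() alongside the encoded byte array. Assuming that argument is a limit into the byte array, this works because both test inputs are pure ASCII, where one character encodes to one byte. For other text the two counts differ, as the sketch below shows. It assumes ConfigConstants.DEFAULT_CHARSET is UTF-8 and uses StandardCharsets.UTF_8 as a stand-in. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

class ByteLengthSketch {

    // Number of UTF-16 code units in the string, as reported by String.length().
    static int charLength(String s) {
        return s.length();
    }

    // Number of bytes the string occupies once encoded as UTF-8.
    static int byteLength(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        String ascii = "\"string\" trailing";
        System.out.println(charLength(ascii) + " " + byteLength(ascii));

        String accented = "caf\u00e9"; // the last character needs two bytes in UTF-8
        System.out.println(charLength(accented) + " " + byteLength(accented));
    }
}
```

When the input may contain non-ASCII characters, the length of the encoded array is the safer value to pass as a byte limit.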
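Note: The org.apache.flink.configuration.ConfigConstants examples in this article were compiled from source-code and documentation hosting platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.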