本文整理汇总了Java中org.apache.flink.api.common.io.OutputFormat类的典型用法代码示例。如果您正苦于以下问题：Java OutputFormat类的具体用法是什么？Java OutputFormat怎么用？有哪些Java OutputFormat的使用示例？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
OutputFormat类属于org.apache.flink.api.common.io包，下文共展示了OutputFormat类的20个代码示例，这些示例默认按受欢迎程度排序。您可以为喜欢或觉得有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: testCassandraBatchFormats
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Round-trip test for the Cassandra batch formats.
 *
 * <p>Writes every tuple of {@code collection} through a {@link CassandraOutputFormat}, reads the
 * table back through a {@link CassandraInputFormat}, and checks that 20 records come back.
 *
 * <p>Both formats are closed in {@code finally} blocks. A failure while writing or reading
 * therefore still closes the format that was opened, instead of leaving it open for the rest of
 * the test run.
 */
@Test
public void testCassandraBatchFormats() throws Exception {
    OutputFormat<Tuple3<String, Integer, Integer>> sink =
        new CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
    sink.configure(new Configuration());
    sink.open(0, 1);
    try {
        for (Tuple3<String, Integer, Integer> value : collection) {
            sink.writeRecord(value);
        }
    } finally {
        sink.close();
    }

    InputFormat<Tuple3<String, Integer, Integer>, InputSplit> source =
        new CassandraInputFormat<>(injectTableName(SELECT_DATA_QUERY), builder);
    source.configure(new Configuration());
    source.open(null);
    List<Tuple3<String, Integer, Integer>> result = new ArrayList<>();
    try {
        while (!source.reachedEnd()) {
            result.add(source.nextRecord(new Tuple3<String, Integer, Integer>()));
        }
    } finally {
        source.close();
    }

    Assert.assertEquals(20, result.size());
}
开发者ID:axbaretto,项目名称:flink,代码行数:26,代码来源:CassandraConnectorITCase.java
示例2: writeAsCsv
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Writes a DataStream of tuples as CSV to the file specified by the path parameter.
 *
 * <p>For every field of an element of the DataStream the result of {@link Object#toString()}
 * is written. This method can only be used on data streams of tuples.
 *
 * @param path
 *            the path pointing to the location the text file is written to
 * @param writeMode
 *            Controls the behavior for existing files. Options are
 *            NO_OVERWRITE and OVERWRITE. If {@code null}, no write mode is set
 *            and the output format keeps its own default.
 * @param rowDelimiter
 *            the delimiter for two rows
 * @param fieldDelimiter
 *            the delimiter for two fields
 * @param <X>
 *            the tuple type of the stream's elements
 *
 * @return the sink that writes this DataStream
 * @throws IllegalArgumentException
 *            if the element type of this stream is not a tuple type
 *            (raised by {@code Preconditions.checkArgument})
 */
@SuppressWarnings("unchecked")
@PublicEvolving
public <X extends Tuple> DataStreamSink<T> writeAsCsv(
        String path,
        WriteMode writeMode,
        String rowDelimiter,
        String fieldDelimiter) {
    Preconditions.checkArgument(
        getType().isTupleType(),
        "The writeAsCsv() method can only be used on data streams of tuples.");

    CsvOutputFormat<X> of = new CsvOutputFormat<>(
        new Path(path),
        rowDelimiter,
        fieldDelimiter);

    // Only override the format's write mode when the caller explicitly chose one.
    if (writeMode != null) {
        of.setWriteMode(writeMode);
    }

    // The tuple check above guarantees the elements are tuples, which makes this cast safe.
    return writeUsingOutputFormat((OutputFormat<T>) of);
}
开发者ID:axbaretto,项目名称:flink,代码行数:42,代码来源:DataStream.java
示例3: readObject
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Custom Java deserialization hook.
 *
 * <p>Reads, in order: the Hadoop output format's class name (UTF string) and the serialized
 * {@link JobConf}. It then re-instantiates the Hadoop format and injects the configuration.
 * Finally it reads the type converter and the file output committer wrapper as objects.
 *
 * @param in the stream this object is being deserialized from
 * @throws IOException if reading from the stream fails
 * @throws ClassNotFoundException if a serialized object's class cannot be resolved
 * @throws RuntimeException if the Hadoop output format cannot be instantiated
 */
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    hadoopOutputFormatName = in.readUTF();
    // Reuse an existing JobConf if one is present; otherwise start from a fresh one.
    if (jobConf == null) {
        jobConf = new JobConf();
    }
    jobConf.readFields(in);
    try {
        // Resolve the format through the thread's context class loader, as the other Hadoop
        // wrappers do, so classes shipped in user-code jars can be found. Class.forName(String)
        // would only consult the loader that defined this class. Fall back to that loader if no
        // context loader is set.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = getClass().getClassLoader();
        }
        this.hadoopOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K,V>)
            Class.forName(this.hadoopOutputFormatName, true, loader).newInstance();
    } catch (Exception e) {
        throw new RuntimeException("Unable to instantiate the hadoop output format", e);
    }
    ReflectionUtils.setConf(hadoopOutputFormat, jobConf);
    converter = (FlinkTypeConverter<K,V>) in.readObject();
    fileOutputCommitterWrapper = (HadoopFileOutputCommitter) in.readObject();
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:17,代码来源:HadoopRecordOutputFormat.java
示例4: readObject
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Custom Java deserialization hook for the mapreduce output format wrapper.
 *
 * <p>Reads the wrapped format's class name (UTF string), then a serialized Hadoop
 * {@code Configuration}. The configuration just read is installed only when this object does
 * not already hold one. The format class is then resolved through the thread's context class
 * loader and instantiated reflectively.
 *
 * @param in the stream this object is being deserialized from
 * @throws IOException if reading from the stream fails
 * @throws ClassNotFoundException declared by the serialization contract
 * @throws RuntimeException if the output format cannot be instantiated
 */
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    final String formatClassName = in.readUTF();

    final org.apache.hadoop.conf.Configuration restoredConf = new org.apache.hadoop.conf.Configuration();
    restoredConf.readFields(in);
    // An already present configuration wins over the one read from the stream.
    this.configuration = (this.configuration != null) ? this.configuration : restoredConf;

    try {
        final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        final Class<?> formatClass = Class.forName(formatClassName, true, contextLoader);
        this.mapreduceOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat<K,V>) formatClass.newInstance();
    } catch (Exception e) {
        throw new RuntimeException("Unable to instantiate the hadoop output format", e);
    }
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:18,代码来源:HadoopOutputFormat.java
示例5: DataSink
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
public DataSink(DataSet<T> data, OutputFormat<T> format, TypeInformation<T> type) {
if (format == null) {
throw new IllegalArgumentException("The output format must not be null.");
}
if (type == null) {
throw new IllegalArgumentException("The input type information must not be null.");
}
if (data == null) {
throw new IllegalArgumentException("The data set must not be null.");
}
this.format = format;
this.data = data;
this.type = type;
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:17,代码来源:DataSink.java
示例6: initializeOnMaster
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Master-side initialization hook for this vertex.
 *
 * <p>If no output format has been set yet, it is loaded from this vertex's task configuration
 * with {@code loader} and configured with the stub parameters. If the format implements
 * {@link InitializeOnMaster}, its global initialization is then run with this vertex's
 * parallelism.
 *
 * @param loader the class loader used to load the user-code output format
 * @throws Exception if the task configuration carries no output format, or if configuration or
 *         global initialization fails
 */
@Override
public void initializeOnMaster(ClassLoader loader) throws Exception {
    if (this.outputFormat == null) {
        final TaskConfig taskConfig = new TaskConfig(getConfiguration());
        final UserCodeWrapper<OutputFormat<?>> formatWrapper = taskConfig.<OutputFormat<?>>getStubWrapper(loader);
        if (formatWrapper == null) {
            throw new Exception("No output format present in OutputFormatVertex's task configuration.");
        }
        this.outputFormat = formatWrapper.getUserCodeObject(OutputFormat.class, loader);
        this.outputFormat.configure(taskConfig.getStubParameters());
    }

    if (!(this.outputFormat instanceof InitializeOnMaster)) {
        return;
    }
    final InitializeOnMaster masterInitializer = (InitializeOnMaster) this.outputFormat;
    masterInitializer.initializeGlobal(getParallelism());
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:19,代码来源:OutputFormatVertex.java
示例7: DataSink
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Creates a data sink that consumes {@code data} and writes it with {@code format}.
 *
 * @param data the data set consumed by this sink; must not be null
 * @param format the output format that writes the records; must not be null
 * @param type the type information of the consumed records; must not be null
 * @throws IllegalArgumentException if an argument is null (checked in the order format, type, data)
 */
public DataSink(DataSet<T> data, OutputFormat<T> format, TypeInformation<T> type) {
    // The check order decides which message is reported when several arguments are null.
    rejectNullArgument(format, "The output format must not be null.");
    rejectNullArgument(type, "The input type information must not be null.");
    rejectNullArgument(data, "The data set must not be null.");

    this.type = type;
    this.data = data;
    this.format = format;
}

/** Throws an {@link IllegalArgumentException} carrying {@code message} when {@code value} is null. */
private static void rejectNullArgument(Object value, String message) {
    if (value == null) {
        throw new IllegalArgumentException(message);
    }
}
开发者ID:axbaretto,项目名称:flink,代码行数:16,代码来源:DataSink.java
示例8: internalWriteAsCsv
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Shared implementation of the {@code writeAsCsv} variants: writes this tuple data set as CSV.
 *
 * @param filePath target file path
 * @param rowDelimiter delimiter written between rows
 * @param fieldDelimiter delimiter written between fields
 * @param wm write mode to apply; if {@code null}, the format's own default is kept
 * @param <X> the tuple type of the data set's elements
 * @return the registered data sink
 * @throws IllegalArgumentException if the element type is not a tuple type
 */
@SuppressWarnings("unchecked")
private <X extends Tuple> DataSink<T> internalWriteAsCsv(Path filePath, String rowDelimiter, String fieldDelimiter, WriteMode wm) {
    Preconditions.checkArgument(getType().isTupleType(), "The writeAsCsv() method can only be used on data sets of tuples.");

    final CsvOutputFormat<X> csvFormat = new CsvOutputFormat<>(filePath, rowDelimiter, fieldDelimiter);
    if (wm != null) {
        csvFormat.setWriteMode(wm);
    }

    // The tuple check above makes it safe to treat the CSV format as a format for T.
    final OutputFormat<T> sinkFormat = (OutputFormat<T>) csvFormat;
    return output(sinkFormat);
}
开发者ID:axbaretto,项目名称:flink,代码行数:10,代码来源:DataSet.java
示例9: output
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Emits a DataSet using an {@link OutputFormat}. This method adds a data sink to the program.
 * Programs may have multiple data sinks. A DataSet may also have multiple consumers (data sinks
 * or transformations) at the same time.
 *
 * <p>If the format implements {@link InputTypeConfigurable}, it is first given this data set's
 * type and the execution config.
 *
 * @param outputFormat The OutputFormat to process the DataSet; must not be null.
 * @return The DataSink that processes the DataSet.
 *
 * @see OutputFormat
 * @see DataSink
 */
public DataSink<T> output(OutputFormat<T> outputFormat) {
    Preconditions.checkNotNull(outputFormat);

    // Formats that need their input type receive it before the sink is created.
    if (outputFormat instanceof InputTypeConfigurable) {
        final InputTypeConfigurable typeConfigurable = (InputTypeConfigurable) outputFormat;
        typeConfigurable.setInputType(getType(), context.getConfig());
    }

    final DataSink<T> dataSink = new DataSink<>(this, outputFormat, getType());
    this.context.registerDataSink(dataSink);
    return dataSink;
}
开发者ID:axbaretto,项目名称:flink,代码行数:24,代码来源:DataSet.java
示例10: initOutputFormat
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
* Initializes the OutputFormat implementation and configuration.
*
* <p>Loads the output format from this task's configuration with the user-code class loader.
* It then calls the format's {@code configure()} with the stub parameters while that class
* loader is installed as the thread's context class loader. The previous context loader is
* restored afterwards.
*
* @throws RuntimeException
* Throws if instance of OutputFormat implementation can not be
* obtained, or if the user-defined {@code configure()} method throws.
*/
private void initOutputFormat() {
ClassLoader userCodeClassLoader = getUserCodeClassLoader();
// obtain task configuration (including stub parameters)
Configuration taskConf = getTaskConfiguration();
this.config = new TaskConfig(taskConf);
try {
// NOTE(review): if getStubWrapper(...) can return null (OutputFormatVertex.initializeOnMaster
// checks for that), this line would fail with an uncaught NullPointerException rather than a
// descriptive RuntimeException — confirm whether null is possible here.
this.format = config.<OutputFormat<IT>>getStubWrapper(userCodeClassLoader).getUserCodeObject(OutputFormat.class, userCodeClassLoader);
// check if the class is a subclass, if the check is required
// NOTE(review): this.format is declared as OutputFormat<IT>, so a non-OutputFormat object should
// already fail the assignment above with the ClassCastException caught below; this check looks
// redundant — confirm before removing.
if (!OutputFormat.class.isAssignableFrom(this.format.getClass())) {
throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" +
OutputFormat.class.getName() + "' as is required.");
}
}
catch (ClassCastException ccex) {
throw new RuntimeException("The stub class is not a proper subclass of " + OutputFormat.class.getName(), ccex);
}
// Remember the caller's context class loader so it can be restored after user code has run.
Thread thread = Thread.currentThread();
ClassLoader original = thread.getContextClassLoader();
// configure the stub. catch exceptions here extra, to report them as originating from the user code
try {
// User code in configure() runs with the user-code class loader as the context loader.
thread.setContextClassLoader(userCodeClassLoader);
this.format.configure(this.config.getStubParameters());
}
catch (Throwable t) {
throw new RuntimeException("The user defined 'configure()' method in the Output Format caused an error: "
+ t.getMessage(), t);
}
finally {
// Always restore the original context class loader, even if configure() failed.
thread.setContextClassLoader(original);
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:42,代码来源:DataSinkTask.java
示例11: setRuntimeContext
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Tests setRuntimeContext on OutputFormatSinkFunction.
 *
 * <p>Checks that the call is forwarded exactly once to a wrapped {@link RichOutputFormat}, and
 * that the call does not throw when the wrapped format is a plain {@link OutputFormat}.
 */
@Test
public void setRuntimeContext() throws Exception {
    final RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class);

    // A rich format must receive the runtime context exactly once.
    final RichOutputFormat<?> richFormat = Mockito.mock(RichOutputFormat.class);
    final OutputFormatSinkFunction<?> richSink = new OutputFormatSinkFunction<>(richFormat);
    richSink.setRuntimeContext(runtimeContext);
    Mockito.verify(richFormat, Mockito.times(1)).setRuntimeContext(Mockito.eq(runtimeContext));

    // A plain (non-rich) format must not make setRuntimeContext fail.
    final OutputFormat<?> plainFormat = Mockito.mock(OutputFormat.class);
    final OutputFormatSinkFunction<?> plainSink = new OutputFormatSinkFunction<>(plainFormat);
    plainSink.setRuntimeContext(runtimeContext);
}
开发者ID:axbaretto,项目名称:flink,代码行数:14,代码来源:OutputFormatSinkFunctionTest.java
示例12: readObject
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Custom Java deserialization hook for the mapred output format wrapper.
 *
 * <p>Reads the wrapped format's class name (UTF string) and then the serialized
 * {@link JobConf}, reusing an existing JobConf instance if there is one. It then instantiates
 * the format through the thread's context class loader and injects the JobConf into it.
 *
 * @param in the stream this object is being deserialized from
 * @throws IOException if reading from the stream fails
 * @throws ClassNotFoundException declared by the serialization contract
 * @throws RuntimeException if the output format cannot be instantiated
 */
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    final String formatClassName = in.readUTF();

    if (jobConf == null) {
        jobConf = new JobConf();
    }
    jobConf.readFields(in);

    try {
        final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        final Class<?> formatClass = Class.forName(formatClassName, true, contextLoader);
        this.mapredOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K,V>) formatClass.newInstance();
    } catch (Exception e) {
        throw new RuntimeException("Unable to instantiate the hadoop output format", e);
    }

    ReflectionUtils.setConf(mapredOutputFormat, jobConf);
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:15,代码来源:HadoopOutputFormat.java
示例13: HadoopRecordOutputFormat
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Wraps a Hadoop mapred {@code OutputFormat}.
 *
 * <p>Stores the format and its class name, the record converter, and a new
 * {@link HadoopFileOutputCommitter}. The given {@link JobConf} is merged with the Hadoop
 * configuration via {@code HadoopUtils.mergeHadoopConf} before it is stored.
 *
 * @param hadoopFormat the Hadoop output format to wrap; must not be null (its class is queried)
 * @param job the Hadoop job configuration; modified in place by the merge
 * @param conv the converter between Flink records and Hadoop key/value pairs
 */
public HadoopRecordOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> hadoopFormat, JobConf job, FlinkTypeConverter<K,V> conv) {
    super();
    // Keep the class name next to the instance. It is presumably what lets the format be
    // re-created after deserialization — confirm against writeObject.
    this.hadoopOutputFormat = hadoopFormat;
    this.hadoopOutputFormatName = hadoopFormat.getClass().getName();
    this.converter = conv;
    this.fileOutputCommitterWrapper = new HadoopFileOutputCommitter();

    HadoopUtils.mergeHadoopConf(job);
    this.jobConf = job;
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:10,代码来源:HadoopRecordOutputFormat.java
示例14: internalWriteAsCsv
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Shared implementation of the {@code writeAsCsv} variants: writes this tuple data set as CSV.
 *
 * @param filePath target file path
 * @param rowDelimiter delimiter written between rows
 * @param fieldDelimiter delimiter written between fields
 * @param wm write mode to apply; if {@code null}, the format's own default is kept
 * @param <X> the tuple type of the data set's elements
 * @return the registered data sink
 * @throws IllegalArgumentException if the element type is not a tuple type (via {@code Validate.isTrue})
 */
@SuppressWarnings("unchecked")
private <X extends Tuple> DataSink<T> internalWriteAsCsv(Path filePath, String rowDelimiter, String fieldDelimiter, WriteMode wm) {
    final boolean tupleTyped = this.type.isTupleType();
    Validate.isTrue(tupleTyped, "The writeAsCsv() method can only be used on data sets of tuples.");

    final CsvOutputFormat<X> csvFormat = new CsvOutputFormat<X>(filePath, rowDelimiter, fieldDelimiter);
    if (wm != null) {
        csvFormat.setWriteMode(wm);
    }

    final OutputFormat<T> sinkFormat = (OutputFormat<T>) csvFormat;
    return output(sinkFormat);
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:10,代码来源:DataSet.java
示例15: output
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Emits a DataSet using an {@link OutputFormat}. This method adds a data sink to the program.
 * Programs may have multiple data sinks. A DataSet may also have multiple consumers (data sinks
 * or transformations) at the same time.
 *
 * <p>If the format implements {@link InputTypeConfigurable}, it is first given this data set's
 * type.
 *
 * @param outputFormat The OutputFormat to process the DataSet; must not be null.
 * @return The DataSink that processes the DataSet.
 *
 * @see OutputFormat
 * @see DataSink
 */
public DataSink<T> output(OutputFormat<T> outputFormat) {
    Validate.notNull(outputFormat);

    final boolean wantsInputType = outputFormat instanceof InputTypeConfigurable;
    if (wantsInputType) {
        ((InputTypeConfigurable) outputFormat).setInputType(this.type);
    }

    final DataSink<T> dataSink = new DataSink<T>(this, outputFormat, this.type);
    this.context.registerDataSink(dataSink);
    return dataSink;
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:24,代码来源:DataSet.java
示例16: executeOnCollections
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Executes this sink on an in-memory list of records.
 *
 * <p>The user output format is configured with this operator's parameters and opened as task 0
 * of 1. Each element of {@code inputData} is then written in list order, and the format is
 * closed.
 *
 * @param inputData the records to write
 * @throws Exception if any output format lifecycle call fails
 */
protected void executeOnCollections(List<IN> inputData) throws Exception {
    final OutputFormat<IN> outputFormat = this.formatWrapper.getUserCodeObject();

    outputFormat.configure(this.parameters);
    outputFormat.open(0, 1);

    for (final IN record : inputData) {
        outputFormat.writeRecord(record);
    }

    outputFormat.close();
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:11,代码来源:GenericDataSinkBase.java
示例17: cancel
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Cancels this data sink task.
 *
 * <p>Marks the task as canceled and makes a best-effort attempt to close the output format, if
 * one has been created. Errors from closing are ignored.
 *
 * @throws Exception declared by the overridden method; this implementation does not throw from
 *         closing the format
 */
@Override
public void cancel() throws Exception {
    this.taskCanceled = true;
    // Read the field once, so the null check and the close() call act on the same instance.
    // The previous code re-read this.format after the check.
    OutputFormat<IT> format = this.format;
    if (format != null) {
        try {
            format.close();
        } catch (Throwable ignored) {
            // Best effort: a failure while closing a canceled format is intentionally ignored.
        }
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug(getLogString("Cancelling data sink operator"));
    }
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:15,代码来源:DataSinkTask.java
示例18: initOutputFormat
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Initializes the OutputFormat implementation and configuration.
 *
 * <p>Loads the output format from this task's configuration with the user-code class loader.
 * It then calls the format's {@code configure()} with the stub parameters while the user-code
 * class loader is installed as the thread's context class loader. The previous context loader
 * is always restored afterwards.
 *
 * @throws RuntimeException
 *             Throws if instance of OutputFormat implementation can not be
 *             obtained, or if the user-defined {@code configure()} method throws.
 */
private void initOutputFormat() {
    ClassLoader userCodeClassLoader = getUserCodeClassLoader();
    // obtain task configuration (including stub parameters)
    Configuration taskConf = getTaskConfiguration();
    this.config = new TaskConfig(taskConf);
    try {
        this.format = config.<OutputFormat<IT>>getStubWrapper(userCodeClassLoader).getUserCodeObject(OutputFormat.class, userCodeClassLoader);
        // check if the class is a subclass, if the check is required
        if (!OutputFormat.class.isAssignableFrom(this.format.getClass())) {
            throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" +
                OutputFormat.class.getName() + "' as is required.");
        }
    }
    catch (ClassCastException ccex) {
        throw new RuntimeException("The stub class is not a proper subclass of " + OutputFormat.class.getName(), ccex);
    }

    // configure() is user code. Run it with the user-code class loader as the context loader so
    // any classes it loads reflectively resolve from the user jar, then restore the caller's loader.
    Thread thread = Thread.currentThread();
    ClassLoader original = thread.getContextClassLoader();
    // configure the stub. catch exceptions here extra, to report them as originating from the user code
    try {
        thread.setContextClassLoader(userCodeClassLoader);
        this.format.configure(this.config.getStubParameters());
    }
    catch (Throwable t) {
        throw new RuntimeException("The user defined 'configure()' method in the Output Format caused an error: "
            + t.getMessage(), t);
    }
    finally {
        thread.setContextClassLoader(original);
    }
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:36,代码来源:DataSinkTask.java
示例19: getFormat
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Returns the output format held by this sink.
 *
 * @return this sink's output format
 */
@Internal
public OutputFormat<T> getFormat() {
    return this.format;
}
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:DataSink.java
示例20: executeOnCollections
import org.apache.flink.api.common.io.OutputFormat; //导入依赖的package包/类
/**
 * Executes this data sink on an in-memory list of records.
 *
 * <p>If a local ordering is set, {@code inputData} is first sorted in place. The output format
 * then runs its lifecycle as task 0 of 1, in this order:
 * <ol>
 *   <li>{@link InitializeOnMaster} global initialization, if implemented</li>
 *   <li>{@code configure}</li>
 *   <li>runtime-context injection, for {@link RichOutputFormat}s</li>
 *   <li>{@code open}</li>
 *   <li>one {@code writeRecord} call per element</li>
 *   <li>{@code close}</li>
 *   <li>{@link FinalizeOnMaster} global finalization, if implemented</li>
 * </ol>
 *
 * @param inputData records to write; sorted in place when a local ordering is defined
 * @param ctx runtime context handed to rich output formats
 * @param executionConfig execution config used to create the sort comparator
 * @throws UnsupportedOperationException if a local ordering is set and the input type is
 *         neither a {@link CompositeType} nor an {@link AtomicType}
 * @throws Exception if any output format lifecycle call fails
 */
@SuppressWarnings("unchecked")
protected void executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
    final OutputFormat<IN> outputFormat = this.formatWrapper.getUserCodeObject();
    final TypeInformation<IN> recordType = getInput().getOperatorInfo().getOutputType();

    if (this.localOrdering != null) {
        final int[] keyPositions = this.localOrdering.getFieldPositions();
        final boolean[] ascendingFlags = this.localOrdering.getFieldSortDirections();
        final TypeComparator<IN> recordComparator;
        if (recordType instanceof CompositeType) {
            recordComparator = ((CompositeType<IN>) recordType).createComparator(keyPositions, ascendingFlags, 0, executionConfig);
        } else if (recordType instanceof AtomicType) {
            // Only the first sort direction is used for atomic types.
            recordComparator = ((AtomicType<IN>) recordType).createComparator(ascendingFlags[0], executionConfig);
        } else {
            throw new UnsupportedOperationException("Local output sorting does not support type "+recordType+" yet.");
        }
        Collections.sort(inputData, recordComparator::compare);
    }

    if (outputFormat instanceof InitializeOnMaster) {
        final InitializeOnMaster masterInit = (InitializeOnMaster) outputFormat;
        masterInit.initializeGlobal(1);
    }

    outputFormat.configure(this.parameters);

    if (outputFormat instanceof RichOutputFormat) {
        final RichOutputFormat<?> richFormat = (RichOutputFormat<?>) outputFormat;
        richFormat.setRuntimeContext(ctx);
    }

    outputFormat.open(0, 1);
    for (final IN record : inputData) {
        outputFormat.writeRecord(record);
    }
    outputFormat.close();

    if (outputFormat instanceof FinalizeOnMaster) {
        final FinalizeOnMaster masterFinalize = (FinalizeOnMaster) outputFormat;
        masterFinalize.finalizeGlobal(1);
    }
}
开发者ID:axbaretto,项目名称:flink,代码行数:46,代码来源:GenericDataSinkBase.java
注：本文中的org.apache.flink.api.common.io.OutputFormat类示例整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论