本文整理汇总了Java中org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction类的典型用法代码示例。如果您正苦于以下问题:Java ElasticsearchSinkFunction类的具体用法?Java ElasticsearchSinkFunction怎么用?Java ElasticsearchSinkFunction使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
ElasticsearchSinkFunction类属于org.apache.flink.streaming.connectors.elasticsearch包,在下文中一共展示了ElasticsearchSinkFunction类的6个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: main
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return "message #" + value;
}
});
Map<String, String> userConfig = new HashMap<>();
userConfig.put("cluster.name", "elasticsearch");
// This instructs the sink to emit after every element, otherwise they would be buffered
userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
List<TransportAddress> transports = new ArrayList<>();
transports.add(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}));
env.execute("Elasticsearch Sink Example");
}
开发者ID:axbaretto,项目名称:flink,代码行数:29,代码来源:ElasticsearchSinkExample.java
示例2: createElasticsearchSinkForEmbeddedNode
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; //导入依赖的package包/类
@Override
protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
List<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
return new ElasticsearchSink<>(userConfig, transports, elasticsearchSinkFunction);
}
开发者ID:axbaretto,项目名称:flink,代码行数:10,代码来源:ElasticsearchSinkITCase.java
示例3: ElasticsearchSink
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; //导入依赖的package包/类
/**
* Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
*
* @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
* @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
* @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
*/
public ElasticsearchSink(
Map<String, String> userConfig,
List<InetSocketAddress> transportAddresses,
ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler());
}
开发者ID:axbaretto,项目名称:flink,代码行数:15,代码来源:ElasticsearchSink.java
示例4: main
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return "message #" + value;
}
});
Map<String, String> userConfig = new HashMap<>();
userConfig.put("cluster.name", "elasticsearch");
// This instructs the sink to emit after every element, otherwise they would be buffered
userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
List<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}));
env.execute("Elasticsearch Sink Example");
}
开发者ID:axbaretto,项目名称:flink,代码行数:29,代码来源:ElasticsearchSinkExample.java
示例5: ESSink
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; //导入依赖的package包/类
public ESSink(ESProperties config, ElasticsearchSinkFunction<T> function) {
super(config, config.getTransportAddresses(), function);
}
开发者ID:gmarciani,项目名称:flink-scaffolding,代码行数:4,代码来源:ESSink.java
示例6: createElasticsearchSink
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; //导入依赖的package包/类
@Override
protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
List<InetSocketAddress> transportAddresses,
ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
return new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction);
}
开发者ID:axbaretto,项目名称:flink,代码行数:7,代码来源:ElasticsearchSinkITCase.java
注:本文中的org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论