Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


apache flink - Update external Database in RichCoFlatMapFunction

I have a RichCoFlatMapFunction

DataStream<Metadata> metadataKeyedStream =
    env.addSource(metadataStream)
        .keyBy(Metadata::getId);

SingleOutputStreamOperator<Output> outputStream =
    env.addSource(recordStream)
        .assignTimestampsAndWatermarks(new RecordTimeExtractor())
        .keyBy(Record::getId)
        .connect(metadataKeyedStream)
        .flatMap(new CustomCoFlatMap(metadataTable.listAllAsMap()));

public class CustomCoFlatMap extends RichCoFlatMapFunction<Record, Metadata, Output> {

    private transient Map<String, Metadata> datasource;
    private transient ValueState<Metadata> metadataState;

    @Inject
    public void setDataSource(Map<String, Metadata> datasource) {
        this.datasource = datasource;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // register the keyed ValueState that holds the latest metadata for the current key
        // (ValueState and ValueStateDescriptor take a single type parameter: the value type)
        metadataState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("metadataState", Metadata.class));
    }

    @Override
    public void flatMap2(Metadata metadata, Collector<Output> collector) throws Exception {
        // if metadata record is removed from table, removing the same from local state
        if(metadata.getEventName().equals("REMOVE")) {
            metadataState.clear();
            return;
        }
        // update metadata in ValueState
        this.metadataState.update(metadata);
    }

    @Override
    public void flatMap1(Record record, Collector<Output> collector) throws Exception {
        
        Metadata metadata = this.metadataState.value();
        // if metadata is not present in ValueState
        if(metadata == null) {
            // get metadata from datasource
            metadata = datasource.get(record.getId());
            // if metadata found in datasource, add it to ValueState
            if(metadata != null) {
                metadataState.update(metadata);
                // read the fields from the Metadata object; ValueState itself has no such getters
                Output output = new Output(record.getId(), metadata.getName(),
                    metadata.getVersion(), metadata.getType());
                if(metadata.getId() == 123) {
                    // here I want to update metadata into another Database
                    // can I do it here directly ?
                }
                collector.collect(output);
            }
        }
    }
}

Here, in the flatMap1 method, I want to update a database. Can I do that operation directly in flatMap1? I am asking because it involves some wait time: the database has to be queried and then updated.

question from:https://stackoverflow.com/questions/65837859/update-external-database-in-richcoflatmapfunction


1 Answer


While in principle it is possible to do this, it's not a good idea. Doing synchronous I/O in a Flink user function causes two problems:

  1. You are tying up considerable resources that are spending most of their time idle, waiting for a response.
  2. While waiting, that operator is creating backpressure that prevents checkpoint barriers from making progress. This can easily cause occasional checkpoint timeouts and job failures.

It would be better to use a KeyedCoProcessFunction instead, and emit the intended database update as a side output. This can then be handled downstream either by a database sink or by using a RichAsyncFunction.
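
As a rough illustration of that suggestion, here is a minimal sketch of a KeyedCoProcessFunction that emits the pending update as a side output instead of touching the database itself. Several things here are assumptions rather than part of the question: the nested Record, Metadata and Output classes are hypothetical stand-ins with public fields, the tag name "db-updates" and the helper needsDbUpdate are made up for the example, the id is assumed to be a String, and the fallback lookup in the external datasource is left out to keep it short. It also assumes a Flink 1.x API, where open(Configuration) is still available.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class CustomCoProcess extends KeyedCoProcessFunction<
        String, CustomCoProcess.Record, CustomCoProcess.Metadata, CustomCoProcess.Output> {

    // Hypothetical stand-ins for the question's types (public fields keep the sketch short).
    public static class Record {
        public String id;
    }

    public static class Metadata {
        public String id, name, version, type, eventName;
    }

    public static class Output {
        public String id, name, version, type;

        public Output() {}

        public Output(String id, String name, String version, String type) {
            this.id = id;
            this.name = name;
            this.version = version;
            this.type = type;
        }
    }

    // Side-output tag (name is an assumption). The anonymous subclass lets Flink
    // capture the element type despite type erasure.
    public static final OutputTag<Metadata> DB_UPDATES =
        new OutputTag<Metadata>("db-updates") {};

    private transient ValueState<Metadata> metadataState;

    @Override
    public void open(Configuration parameters) throws Exception {
        metadataState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("metadataState", Metadata.class));
    }

    // Hypothetical routing rule, mirroring the id check in the question.
    public static boolean needsDbUpdate(Metadata metadata) {
        return "123".equals(metadata.id);
    }

    @Override
    public void processElement1(Record record, Context ctx, Collector<Output> out)
            throws Exception {
        Metadata metadata = metadataState.value();
        if (metadata == null) {
            return; // no metadata for this key yet (datasource fallback omitted)
        }
        if (needsDbUpdate(metadata)) {
            // Hand the update to a downstream operator; no blocking I/O happens here.
            ctx.output(DB_UPDATES, metadata);
        }
        out.collect(new Output(record.id, metadata.name, metadata.version, metadata.type));
    }

    @Override
    public void processElement2(Metadata metadata, Context ctx, Collector<Output> out)
            throws Exception {
        if ("REMOVE".equals(metadata.eventName)) {
            metadataState.clear();   // metadata was removed from the table
        } else {
            metadataState.update(metadata);
        }
    }
}
```

With the two keyed streams connected as in the question, this would be applied with .process(new CustomCoProcess()) in place of .flatMap(...). The updates can then be read with outputStream.getSideOutput(CustomCoProcess.DB_UPDATES) and either passed to a database sink via addSink(...) or run through AsyncDataStream.unorderedWait(...) with a RichAsyncFunction, so the slow database round trip no longer blocks the operator that holds the keyed state.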

