Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
166 views
in Technique[技术] by (71.8m points)

python - GCP Dataflow + Apache Beam - caching question

I am new-ish to GCP, Dataflow, Apache Beam, Python, and OOP in general. I come from the land of functional javascript, for context.

Right now I have a streaming pipeline built with the Apache Beam python sdk, and I deploy it to GCP's Dataflow. The pipeline's source is a pubsub subscription, and the sink is a datastore.

The pipeline picks up a message from a pubsub subscription, makes a decision based on a configuration object + the contents of the message, and then puts it in the appropriate spot in the datastore depending on what decision it makes. This is all working presently.

Now I am in a situation where the configuration object, which is currently hardcoded, needs to be more dynamic. By that I mean: instead of just hardcoding the configuration object, we are now instead going to make an API call that will return the configuration. That way, we can update the configuration without having to redeploy the pipeline. This also works presently.

But! We are anticipating heavy traffic, so it is NOT ideal to fetch the configuration for every single message that comes in. So we are moving the fetch to the beginning, right before the actual pipeline starts. But this means we immediately lose the value in having it come from an API call, because the API call only happens one time when the pipeline starts up.

Here is what we have so far (stripped out irrelevant parts for clarity):

def run(argv=None):

    options = PipelineOptions(
        streaming=True,
        save_main_session=True
    )

    configuration = get_configuration() # api call to fetch config

    with beam.Pipeline(options=options) as pipeline:

        # read incoming messages from pubsub
        incoming_messages = (
            pipeline
            | "Read Messages From PubSub"
            >> beam.io.ReadFromPubSub(subscription=f"our subscription here", with_attributes=True))

         # make a decision based off of the message + the config
         decision_messages = (
                incoming_messages
                | "Create Decision Messages" >> beam.FlatMap(create_decision_message, configuration)
        )

create_decision_message takes in the incoming message from the stream + the configuration file and then, you guessed it, makes a decision. It is pretty simple logic. Think "if the message is apples, and the configuration says we only care about oranges, then do nothing with the message". We need to be able to update it on the fly to say "nevermind, we care about apples too now suddenly".

I need to figure out a way to let the pipeline know it needs to re-fetch that configuration file every 15 minutes. I'm not totally sure what is the best way to do that with the tools I'm using. If it were javascript, I would do something like:

(please forgive the pseudo-code, not sure if this would actually run but you get the idea)


let fetch_time = Date.now()  // initialized when app starts
let expiration = 900 // 900 seconds = 15 mins 
let config = getConfigFromApi() // fetch config right when app starts

function fetchConfig(now){
    if (fetch_time + expiration < now) { 
      // if fetch_time + expiration is less than the current time, we need to re-fetch the config
      config = getConfigFromApi() // assign new value to config var
      fetch_time = now // assign new value to fetch_time var
  } 
    return config 
}

...

const someLaterTime = Date.now() // later in the code, within the pipeline, I need to use the config object
const validConfig = fetchConfig(someLaterTime) // i pass in the current time and get back either the memory-cached config, or a just-recently-fetched config

I'm not really sure how to translate this concept to python, and I'm not really sure if I should. Is this a reasonable thing to try to pull off? Or is this type of behavior not congruent with the stack I'm using? I'm in a position where I'm the only one on my team working on this, and it is a greenfield project, so there are no examples anywhere of how it's been done in the past. I am not sure if I should try to figure this out, or if I should say "sorry bossman, we need another solution".

Any help is appreciated, no matter how small... thank you!

question from:https://stackoverflow.com/questions/65892803/gcp-dataflow-apache-beam-caching-question

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

I think there are multiple ways to implement what you want to achieve, the most straight-forward way is probably through stateful processing in which you record your config through state in a Stateful DoFn and set a looping timer to refresh the record.

You can read more about stateful processing here https://beam.apache.org/blog/timely-processing/

more from the beam programming guide about the state and timer: https://beam.apache.org/documentation/programming-guide/#types-of-state.

I would image that you can define your processing logic which requires the config in a ParDo like:

class MakeDecision(beam.DoFn):
  CONFIG = ReadModifyWriteState('config', coders.StrUtf8Coder())
  REFRESH_TIMER = TimerSpec('output', TimeDomain.REAL_TIME)

  def process(self,
              element,
              config=DoFn.StateParam(CONFIG),
              timer=DoFn.TimerParam(REFRESH_TIMER)):
    valid_config={}
    if config.read():
      valid_config=json.loads(config.read())
    else: # config is None and hasn't been fetched before.
      valid_config=fetch_config() # your own fetch function.
      config.write(json.dumps(valid_config))
      timer.set(Timestamp.now() + Duration(seconds=900))
    # Do what ever you need with the config.
    ...
  
  @on_timer(REFRESH_TIMER)
  def refresh_config(self,
                     config=DoFn.StateParam(CONFIG),
                     timer=DoFn.TimerParam(REFRESH_TIMER)):
      valid_config=fetch_config()
      config.write(json.dumps(valid_config))
      timer.set(Timestamp.now() + Duration(seconds=900))

And then you can now process your messages with the Stateful DoFn.

with beam.Pipeline(options=options) as pipeline:
    pipeline
            | "Read Messages From PubSub"
            >> beam.io.ReadFromPubSub(subscription=f"our subscription here", with_attributes=True))
            | "Make decision" >> beam.ParDo(MakeDecision())


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...