I am new-ish to GCP, Dataflow, Apache Beam, Python, and OOP in general. I come from the land of functional JavaScript, for context.
Right now I have a streaming pipeline built with the Apache Beam python sdk, and I deploy it to GCP's Dataflow. The pipeline's source is a pubsub subscription, and the sink is a datastore.
The pipeline picks up a message from a pubsub subscription, makes a decision based on a configuration object + the contents of the message, and then puts it in the appropriate spot in the datastore depending on what decision it makes. This is all working presently.
Now I am in a situation where the configuration object, which is currently hardcoded, needs to be more dynamic. By that I mean: instead of just hardcoding the configuration object, we are now instead going to make an API call that will return the configuration. That way, we can update the configuration without having to redeploy the pipeline. This also works presently.
But! We are anticipating heavy traffic, so it is NOT ideal to fetch the configuration for every single message that comes in. So we are moving the fetch to the beginning, right before the actual pipeline starts. But this means we immediately lose the value in having it come from an API call, because the API call only happens one time when the pipeline starts up.
Here is what we have so far (stripped out irrelevant parts for clarity):
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    options = PipelineOptions(
        streaming=True,
        save_main_session=True
    )

    configuration = get_configuration()  # api call to fetch config

    with beam.Pipeline(options=options) as pipeline:
        # read incoming messages from pubsub
        incoming_messages = (
            pipeline
            | "Read Messages From PubSub"
            >> beam.io.ReadFromPubSub(subscription="our subscription here", with_attributes=True)
        )

        # make a decision based off of the message + the config
        decision_messages = (
            incoming_messages
            | "Create Decision Messages" >> beam.FlatMap(create_decision_message, configuration)
        )
create_decision_message takes in the incoming message from the stream + the configuration and then, you guessed it, makes a decision. It is pretty simple logic. Think "if the message is apples, and the configuration says we only care about oranges, then do nothing with the message". We need to be able to update it on the fly to say "nevermind, suddenly we care about apples too".
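For reference, here is a simplified sketch of what create_decision_message does (the fruit logic, the "kind" attribute, and the config key are made-up stand-ins; the real version is more involved):

def create_decision_message(message, configuration):
    # decide what, if anything, to emit for this pubsub message
    kind = message.attributes.get("kind")  # e.g. "apples" or "oranges"
    if kind in configuration["kinds_we_care_about"]:
        yield {"kind": kind, "data": message.data}
    # if the config says we don't care about this kind, emit nothing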
I need to figure out a way to let the pipeline know it needs to re-fetch that configuration every 15 minutes. I'm not totally sure what the best way to do that is with the tools I'm using. If it were JavaScript, I would do something like:
(please forgive the pseudo-code, not sure if this would actually run but you get the idea)
let fetch_time = Date.now() // initialized when app starts
const expiration = 900 * 1000 // 15 mins in milliseconds, since Date.now() is in ms
let config = getConfigFromApi() // fetch config right when app starts

function fetchConfig(now) {
    if (fetch_time + expiration < now) {
        // the cached config has expired, so we need to re-fetch it
        config = getConfigFromApi() // assign new value to config var
        fetch_time = now // assign new value to fetch_time var
    }
    return config
}

...

const someLaterTime = Date.now() // later in the code, within the pipeline, I need to use the config object
const validConfig = fetchConfig(someLaterTime) // I pass in the current time and get back either the memory-cached config, or a just-recently-fetched one
I'm not really sure how to translate this concept to Python (my rough attempt at a translation is below), and I'm not really sure if I should. Is this a reasonable thing to try to pull off? Or is this type of behavior not congruent with the stack I'm using? I'm in a position where I'm the only one on my team working on this, and it is a greenfield project, so there are no examples anywhere of how it's been done in the past. I am not sure if I should try to figure this out, or if I should say "sorry bossman, we need another solution".
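For what it's worth, here is the most literal Python translation I can come up with, reusing the get_configuration() call from the pipeline code above. It is untested, and I have no idea whether module-level state like this behaves sanely once Dataflow pickles the pipeline and fans it out across workers:

import time

_fetch_time = time.monotonic()  # initialized when the process starts
_expiration = 900  # 900 seconds = 15 mins
_config = get_configuration()  # same api call as in run(), fetched right at start

def fetch_config():
    # return the memory-cached config, re-fetching it if it has expired
    global _fetch_time, _config
    now = time.monotonic()
    if _fetch_time + _expiration < now:
        _config = get_configuration()  # refresh the cached config
        _fetch_time = now  # reset the expiration clock
    return _config

The idea would be to call fetch_config() inside create_decision_message, instead of baking the config in at pipeline-construction time like I do now, but I don't know whether that is safe or idiomatic on Dataflow workers.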
Any help is appreciated, no matter how small... thank you!