Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
152 views
in Technique[技术] by (71.8m points)

python - Modify Dask task graph based on previous task results

I have two tasks; one slower and one faster. These tasks are run iteratively on their respective data. I occasionally perform a check that depends on the faster task's output, and under some circumstances, the check tells me to swap data from the two sources.

What I'm trying to do is to get this workflow running without unnecessary blocking while waiting for the slow task. The first way I think of to accomplish this would be for the underlying DAG that Dask is executing to depend on the output of one of the tasks. If the check task says that I don't need to swap, then I can immediately run the next fast_data_task. The idea is illustrated in these DAGs: the black DAG is the blocking version, the red DAG is non-blocking if we assume that check says not to swap data, and the blue DAG is non-blocking if we assume that check says we should swap data. Is there a way in Dask to modify the task dependencies based on the results of previous tasks? (Or is there some other mechanism to achieve the same effect?)

Three DAGs

Another possible approach might be if I could actually send the Future to the worker; i.e., send a reference to the result as opposed to the result itself. In that case, the update tasks could depend on the boolean result of check and the two "future-references." These "future-references" would never be blocking (they're just used to give the correct input/output of the task graph; the actual contents of the result are not used in the update tasks). The Future returned by the update tasks would just be a reference to whichever is correct. However, I don't know if such a thing is possible with Dask.

Below is an example code that works with blocking (the black DAG). I'm hoping it can be modified to give the same results, but not block at the "update" stages while waiting for the slow_data_task to complete. (Also, I'd like to avoid blocking later runs of other_task -- this swapping is just part of a much larger task graph!)

import distributed
import time
import numpy as np

def slow_data_task(input_data):
    time.sleep(5)
    return input_data

def fast_data_task(input_data):
    return input_data

def other_task(input_data):
    return input_data

def check(fast_data):
    do_mixing = np.random.choice([True, False])
    return fast_data, do_mixing

def update_fast_data(check_1_result, slow_data):
    fast_data, needed = check_1_result
    if needed:
        fast_data = slow_data
    return fast_data

def update_slow_data(check_1_result, slow_data):
    fast_data, needed = check_1_result
    if needed:
        slow_data = fast_data
    return slow_data

client = distributed.Client()

fast_data = 'foo'
slow_data = 'bar'
other_data = 'qux'
n_iterations = 2
for iteration in range(n_iterations):
    slow_data = client.submit(slow_data_task, slow_data)
    fast_data = client.submit(fast_data_task, fast_data)
    other_data = client.submit(other_task, other_data)
    check_1_result = client.submit(check, fast_data)
    new_fast_data = client.submit(update_fast_data, check_1_result, slow_data)
    new_slow_data = client.submit(update_slow_data, check_1_result, slow_data)
    fast_data = new_fast_data
    slow_data = new_slow_data

More details

I've come up with one approach that works for this toy system, but breaks down in my real use case. This would be to change the output of the tasks to be (data, task_type), where task_type in ['fast', 'slow'], and to replace the fast_data_task and slow_data_task with data_task, which internally behaves differently depending on the value of task_type. With only 1 swap coupling these, that would work. But in the real system, there may be coupling between many different data sources (fast, slow, faster, slower), and this approach would make it impossible to know which would be the other data in the swap. (The check task depends on the name of the other data source, i.e., 'slow', even though it doesn't depend on the specific data.)

question from:https://stackoverflow.com/questions/65888058/modify-dask-task-graph-based-on-previous-task-results

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)
Waitting for answers

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...