# ask-marvin
m
@Nate, hi. I have a problem and I'd like to know if you can help me. I need a task to run as soon as its upstream task completes. To do this, I'm using `return_state` and a for loop that checks `is_completed()` on each state. But this loop makes the downstream task (the `load` task in the example) run only after all of the upstream tasks (`transform` in the example) have finished. In other words, the tasks run serially rather than in parallel. Also, as the image shows, one of the loads is not connected correctly in the Prefect UI: `load-with-test` is linked to `transf-with-ball`. I looked at the documentation (https://docs.prefect.io/latest/concepts/states/?h=#create-and-use-hooks), but I couldn't get `on_completion` to call a function that is itself a task. Can you help me?
```python
from prefect import task, flow, serve, unmapped
from prefect.runtime import task_run
import time

def generate_task_name():
    task_name = task_run.task_name
    parameters = task_run.parameters
    name = parameters["name"]

    return f"{task_name}-with-{name}"

@task(name='ext', log_prints=True,
    task_run_name=generate_task_name)
def extract_data(region, name):
    return ([1, 2, 3, 4], region)

@task(name='transf', log_prints=True,
    task_run_name=generate_task_name)
def transform_data(data, name):
    region = data[1]
    output = [i * 2 for i in data[0]]
    time.sleep(3)
    return (output, region)

@task(name='load', log_prints=True,
    task_run_name=generate_task_name)
def load_data(data, name):
    region = data
    print(region)

@flow
def map_test():
    region = ['sandbox']
    extract_test = extract_data.map(region, unmapped('test'))
    extract_ball = extract_data.map(region, unmapped('ball'))
    transform_test = transform_data.map(extract_test, unmapped('test'), return_state=unmapped(True))
    transform_ball = transform_data.map(extract_ball, unmapped('ball'), return_state=unmapped(True))
    for t in transform_test:
        if t.is_completed():
            load_data(t.result()[1], 'test')
    for t in transform_ball:
        if t.is_completed():
            load_data(t.result()[1], 'ball')

if __name__ == "__main__":
    map_deploy = map_test.to_deployment(name="map_test")
    serve(map_deploy)
```
n
sorry on mobile so will be more terse than normal
do you know about `wait_for`?
`wait_for` is an extra kwarg that `map` / `submit` accept: a list of task results or futures that creates state dependencies between the upstream and downstream tasks, so no polling is necessary