# ask-community
v
Hi everyone, we are building a flow where we are submitting steps to an EMR cluster. There are a lot of modules to be run, and each module is a step. We submit a step and then check its status; if the step completes without errors, we proceed to submit the next step. All of the modules/steps are in a huge list:
[module_name1, module_name2,....]
We are using a for loop to iterate over the list, submit the steps, and check their status. However, Prefect does not submit these steps in the sequence/order of the list. How do I make sure that the order is maintained? Below is a screenshot of the code:
a
Can you share the entire flow definition? You can't use a for loop in the Flow block; you could rewrite it a bit to use mapping instead. Similarly, "with case" is only needed in the Flow block; in a task you can use normal Python conditional flow with if/else statements.
To ensure a specific order, you can specify dependencies between tasks as shown here.
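For example, something along these lines (a minimal sketch with made-up task names, just to show set_upstream):
Copy code
from prefect import Flow, task

@task
def submit_step(module):
    print(f"submitting {module}")

with Flow("ordered_steps") as flow:
    step_a = submit_step("module_a")
    step_b = submit_step("module_b")
    # step_b only starts after step_a finishes successfully
    step_b.set_upstream(step_a)

flow.run()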
v
Here's the full flow. What we want to achieve is to iterate over the list, submit each step and check its status, then proceed to submit the next step in the list.
a
Thanks. The problem is the for loop. You can't do that in the Flow block. This will be possible in Orion but not in the current Prefect. For now, you can do:
Copy code
from prefect import task, Flow
from prefect.storage import S3

@task
def get_spe_module_list():
    return ["module_a", "module_b"]

@task
def process_spe_module(module):
    # spin_emr, get_cluster_id, get_logfile_path and the env/version/... variables
    # come from your existing flow code
    emr_cluster = spin_emr(env=env, version=version, no_workers=no_of_workers, source='SPE',
                           bucket_name=company_bucket)
    emr_cluster_id = get_cluster_id(emr_cluster)
    log_file_path = get_logfile_path(emr_cluster)


with Flow("SPE_flow", state_handlers=[flow_status_to_slack],
          storage=S3(bucket="humanyze-prefect-flows", key=f"{env}/{version}/spe_flow")) as flow:
    spe_module_list = get_spe_module_list()
    process_spe_module.map(spe_module_list)
Other parameters such as version and emr_cluster_id can be passed to the map function as unmapped arguments:
Copy code
from prefect import unmapped
...
process_spe_module.map(spe_module_list, unmapped(emr_cluster_id))
v
So our main intention is to run each step and then check its status. We will only spin up the cluster once, submit the steps in the list one by one, and check for their completion. If any step fails, we will fail the flow and shut down the cluster.
a
Sure, as long as you move this logic to a task and you don't use a for loop in the Flow block, you can do that in Prefect.
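Something like this could work as the task (a rough sketch; the boto3 calls and the step definition here are illustrative assumptions, not your actual spin_emr/status helpers):
Copy code
import time
import boto3
from prefect import task

@task
def process_spe_module(module, cluster_id):
    """Submit one EMR step and poll until it finishes; raise if it fails."""
    emr = boto3.client("emr")
    # the Steps payload below is a placeholder - use your real step config
    response = emr.add_job_flow_steps(
        JobFlowId=cluster_id,
        Steps=[{"Name": module,
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {"Jar": "command-runner.jar",
                                  "Args": ["spark-submit", module]}}],
    )
    step_id = response["StepIds"][0]
    while True:
        state = emr.describe_step(ClusterId=cluster_id, StepId=step_id)["Step"]["Status"]["State"]
        if state == "COMPLETED":
            return step_id
        if state in ("FAILED", "CANCELLED", "INTERRUPTED"):
            raise RuntimeError(f"EMR step {module} ended in state {state}")
        time.sleep(30)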
👍 1
v
I will try to use this approach and add the status check to process_spe_module(); will let you know how it goes. Thanks a lot.
🙌 1
Hi @Anna Geller I tried using this approach. Using map does work, but the list is still not traversed sequentially. Any clue how to make the function access it sequentially?
OK, I figured it out: turning off parallelism helped.
🙌 1
Hi @Anna Geller just wondering, is there a specific reason why we should not be using a for loop within the flow and should use map instead? Also, when we enable parallelism, is it possible to submit/run the mapped tasks in sequence/order? Right now, if we enable parallelism, we see that the tasks are being submitted randomly.
a
Yes. Generally, the Flow block in the current Prefect should primarily be used to construct a DAG. By definition, a DAG should be directed and acyclic, therefore loops are generally problematic. Technically, you can use a for loop if you iterate over a known and finite list of inputs which are defined explicitly in your flow (they don’t come as an input from other tasks):
Copy code
from prefect import Flow, task


@task
def add_one(x):
    res = x + 1
    print(res)
    return res


@task
def add_two(x):
    res = x + 2
    print(res)
    return res


with Flow("forloop") as flow:
    inputs = [1, 2, 3, 4, 5]
    tasks = []
    for _in in inputs:
        a = add_one(_in)
        b = add_two(a)
        tasks.append(a)
        tasks.append(b)

    # set dependencies
    for i in range(1, len(tasks)):
        tasks[i].set_upstream(tasks[i - 1])

flow.run()
Mapped tasks will run sequentially, in order, if you use the default sequential LocalExecutor. Only if you assign one of the Dask executors will your mapped tasks run in parallel.
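For reference, a minimal self-contained sketch of assigning an executor (executor names as in Prefect 1.x; the flow and task here are just examples):
Copy code
from prefect import Flow, task
from prefect.executors import LocalExecutor, LocalDaskExecutor

@task
def say(x):
    print(x)

with Flow("executor_demo") as flow:
    say.map([1, 2, 3, 4, 5])

# default behavior: LocalExecutor runs mapped children one at a time, in order
flow.executor = LocalExecutor()

# with a Dask executor, mapped children run in parallel and order is not guaranteed
# flow.executor = LocalDaskExecutor()

flow.run()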
v
Thanks. When we try to use the for loop with the Dask executor, we notice that for a list like [1,2,3,4,5], 4 processes are created, but they do not submit in order (i.e., process 1 taking 1 from the list, process 2 taking 2 from the list, and so on).
So if we set the upstream dependency for the tasks, will it work?
a
I wouldn't use a for loop with the Dask executor. If you use Dask, mapping is the recommended approach.
v
But using map with Dask still submits the tasks randomly and not in order. Is there a way we can define the dependency / specify that the list should be traversed in order?
a
Yes, not using Dask 😄 Dask is for parallel execution. If you want things to run sequentially, then assign the LocalExecutor.
Perhaps @Kevin Kho knows some Dask tricks to force it to run things sequentially, but to the best of my knowledge the DaskExecutor will by definition try to run mapped tasks in parallel.
v
So what we want is that, from the list [1,2,3,4,5], process p1 takes 1, p2 takes 2, p3 takes 3, and so on, rather than p1 processing 1, p2 processing 3, and so on.
We want parallelism but still want to maintain the sequence of the list.
k
With the way things are set up right now, there is no mechanism to do this. But this is very doable in Orion by submitting things with a for loop (rough sketch below).
Why do you need it sequentially if they are independent?
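Regarding the Orion point, something like this (a sketch based on the Orion/Prefect 2-style API, which may still change; module names are made up):
Copy code
from prefect import flow, task

@task
def process_spe_module(module):
    print(f"processing {module}")

@flow
def spe_flow():
    # in Orion the flow body is plain Python, so a for loop naturally
    # runs the tasks one after another, in list order
    for module in ["module_a", "module_b", "module_c"]:
        process_spe_module(module)

if __name__ == "__main__":
    spe_flow()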
v
So apart from this task, there may be other tasks that we want to run in parallel. We also want to submit these modules to our EMR cluster in parallel with that other work, but the order of the module tasks matters. If we switch everything to sequential execution, we will lose parallelism for the flow overall.
k
Ah OK, I see. Yeah, you probably need to use task looping if you need to enforce sequential execution (see the task-looping sketch after the snippet below). If your list is predefined and not dynamic, you can add tasks sequentially with a for loop inside the Flow block:
Copy code
from prefect import Flow, task


@task
def add_one(x):
    res = x + 1
    print(res)
    return res


@task
def add_two(x):
    res = x + 2
    print(res)
    return res


with Flow("forloop") as flow:
    inputs = [1, 2, 3, 4, 5]
    tasks = []
    for _in in inputs:
        a = add_one(_in)
        b = add_two(a)
        tasks.append(a)
        tasks.append(b)

    # set dependencies
    for i in range(1, len(tasks)):
        tasks[i].set_upstream(tasks[i - 1])

flow.run()
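For reference, task looping in Prefect 1.x looks roughly like this (a minimal sketch; the module names are made up and the print stands in for the EMR submit-and-poll logic):
Copy code
import prefect
from prefect import Flow, task
from prefect.engine.signals import LOOP

@task
def process_modules_one_by_one(modules):
    # the previous iteration's payload is available via context
    payload = prefect.context.get("task_loop_result", {"index": 0})
    index = payload["index"]
    module = modules[index]
    print(f"processing {module}")  # submit the EMR step and poll its status here
    if index + 1 < len(modules):
        # re-run this task for the next module, strictly in list order
        raise LOOP(message=f"done with {module}", result={"index": index + 1})

with Flow("looped_modules") as flow:
    process_modules_one_by_one(["module_a", "module_b", "module_c"])

flow.run()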
👍 1
🙏 1
a
Not 100% sure if this is the right approach, but if there is only one specific type of task that needs to run sequentially, perhaps you could leverage task concurrency limits to force Prefect to never run more than one instance of that specific task. But note that this is a Cloud-only feature.
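Concurrency limits are keyed off task tags, so in code it would look something like this (the tag name is just an example; the limit of 1 for that tag is set in Prefect Cloud, not in code):
Copy code
from prefect import task

# tag the task; then, in Prefect Cloud, set the concurrency limit for the
# "emr-step" tag to 1 so only one instance of this task runs at a time
@task(tags=["emr-step"])
def process_spe_module(module):
    print(f"processing {module}")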
👍 1
v
Thank you @Kevin Kho and @Anna Geller, the above flow worked for us without having to give up parallelism.
👍 1