# prefect-community
e
Hey All, Apologies if this is the wrong channel for this. We've come into a bit of an issue with creating a dynamic flow in Prefect 1.0, see below:
with Flow("tax_assessor_inc") as flow:
    
    raw_s3_keys = Parameter("raw_s3_keys")
    
    flow_tasks = []
    for s3_key in raw_s3_keys:
        _clean = clean([s3_key])
        _load_data = load_data(_clean)
        _test_raw_data = test_raw_data()
        _transform_task = transform_task()
        flow_tasks.append([_clean, _load_data, _test_raw_data, _transform_task])

    reduced_flow_tasks = sum(flow_tasks, [])
    for i in range(len(reduced_flow_tasks) - 1):
        reduced_flow_tasks[i].set_downstream(reduced_flow_tasks[i + 1])
The issue arises when trying to iterate over the `raw_s3_keys` parameter, as it's not iterable. It works when hardcoding the S3 paths, but obviously that's not what we want, as it would be great to define that list dynamically. Also, annoyingly, each task has to run synchronously; otherwise we'd use something like Prefect's map. Firstly, is it possible to iterate over the output of a Parameter object? If not, is there a better way of solving this problem? Sorry for the long-winded message. Thank you 🙂
a
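You can't use a for loop over a Parameter in Prefect 1.0, because its value is only known when the flow runs, not when the flow is built. You would need to leverage mapping.
You could do that in Prefect 2.0, though.
And thanks for the great write-up.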
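A minimal sketch of what the mapped version could look like in Prefect 1.0. The bodies of `clean` and `load_data` are placeholders (the real ones aren't shown in this thread), `build_flow` is a hypothetical helper name, and only the first two steps are wired up. The Prefect import sits inside `build_flow` so the plain functions can still be exercised on their own.

```python
def clean(s3_key: str) -> str:
    # Placeholder body: the real cleaning logic is not shown in the thread.
    return s3_key.strip()


def load_data(cleaned_key: str) -> str:
    # Placeholder body: pretend to load the cleaned file and return a handle.
    return f"loaded:{cleaned_key}"


def build_flow():
    # Prefect 1.0 imports; kept local so the functions above work without Prefect.
    from prefect import Flow, Parameter, task

    clean_task = task(clean)
    load_task = task(load_data)

    with Flow("tax_assessor_inc") as flow:
        raw_s3_keys = Parameter("raw_s3_keys")
        # .map defers the iteration to run time, when the parameter has a value,
        # and creates one child task run per element of the list.
        cleaned = clean_task.map(raw_s3_keys)
        load_task.map(cleaned)
    return flow
```

Running it would then be something like `build_flow().run(parameters={"raw_s3_keys": ["a.csv", "b.csv"]})`. If all steps for one key have to finish before the next key starts, one option is to wrap the per-key steps in a single task and map that instead.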
e
Ahh damn, yeah, I saw that Prefect 2.0 has what we want; unfortunately we're not quite ready to upgrade. Can you use mapping synchronously? My experience with it is that it creates parallel tasks.
Thanks for the fast response.
a
What do you mean by synchronously: one task after the other rather than in parallel?
If so, this is actually the default behavior for mapping in Prefect 1.0, because it uses the LocalExecutor by default.
It only runs things in parallel if you attach a LocalDaskExecutor or DaskExecutor.
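To make the executor choice explicit, something along these lines should work in Prefect 1.0. `run_flow` and its `parallel` flag are hypothetical names for illustration, and `flow` is assumed to be a flow with a `raw_s3_keys` parameter. Passing `LocalExecutor()` only spells out the default.

```python
def run_flow(flow, raw_s3_keys, parallel=False):
    # Prefect 1.0 import path for executors; kept local to this helper.
    from prefect.executors import LocalDaskExecutor, LocalExecutor

    # LocalExecutor runs mapped children one after another in the same process;
    # LocalDaskExecutor runs them in parallel on a local Dask scheduler.
    executor = LocalDaskExecutor() if parallel else LocalExecutor()
    return flow.run(executor=executor, parameters={"raw_s3_keys": raw_s3_keys})
```

Leaving `parallel` at its default keeps the one-after-the-other behavior described above.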
e
Exactly. Ahh, so it needs to be the local executor, gotcha. Thank you 🙂