Hey All,
Apologies if this is the wrong channel for this. We've run into a bit of an issue with creating a dynamic flow in Prefect 1.0, see below:
with Flow("tax_assessor_inc") as flow:
    raw_s3_keys = Parameter("raw_s3_keys")
    flow_tasks = []
    for s3_key in raw_s3_keys:
        _clean = clean([s3_key])
        _load_data = load_data(_clean)
        _test_raw_data = test_raw_data()
        _transform_task = transform_task()
        flow_tasks.append([_clean, _load_data, _test_raw_data, _transform_task])
    reduced_flow_tasks = sum(flow_tasks, [])
    for i in range(len(reduced_flow_tasks) - 1):
        reduced_flow_tasks[i].set_downstream(reduced_flow_tasks[i + 1])
The issue arises when trying to iterate over the raw_s3_keys parameter, as it's not iterable. It works when hardcoding the S3 paths, but obviously that's not what we want, as it would be great to define that list dynamically. Also, annoyingly, each task has to run synchronously (one after the other), otherwise we'd use something like Prefect mapping.
Firstly, is it possible to iterate over the output of a Parameter object? If not, is there a better way of solving this problem?
Sorry for the long-winded message, thank you 🙂
Anna Geller
05/24/2022, 12:02 PM
You can't use a for loop over a Parameter in Prefect 1.0, because its value is only known at run time, not when the flow is built. You would need to leverage mapping.
Anna Geller
05/24/2022, 12:03 PM
You could do that in Prefect 2.0 though
Anna Geller
05/24/2022, 12:04 PM
And thanks for the great write-up
Ed Burroughes
05/24/2022, 12:09 PM
Ahh damn, yeah, I saw Prefect 2.0 has what we want; unfortunately we're not quite ready to upgrade. Can you use mapping synchronously? My experience with it is that it creates parallel tasks.
Ed Burroughes
05/24/2022, 12:10 PM
thanks for the fast response
Anna Geller
05/24/2022, 12:11 PM
what do you mean by synchronously - one task after the other rather than in parallel?
Anna Geller
05/24/2022, 12:12 PM
if so, this is actually the default behavior in Prefect 1.0 with mapping, because the default executor is the LocalExecutor, which runs tasks one at a time
Anna Geller
05/24/2022, 12:13 PM
only if you attach a LocalDaskExecutor or DaskExecutor will it run things in parallel
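To make the suggestion above concrete, here is a minimal sketch, assuming Prefect 1.x is installed. It is not the original poster's code: process_one_key and build_flow are hypothetical names, and the step bodies are placeholders. Each mapped child handles one key and runs the four steps in order inside a single task, which is one way to keep the per-key steps sequential. The Prefect import sits inside build_flow only so the plain function can be exercised without Prefect.

```python
from typing import List


def process_one_key(s3_key: str) -> List[str]:
    """Run the four steps for a single key, strictly in order.

    The step bodies are placeholders (hypothetical); swap in the real
    clean / load_data / test_raw_data / transform_task logic.
    """
    steps_run = []
    for step in ("clean", "load_data", "test_raw_data", "transform_task"):
        # Placeholder for the real work of each step.
        steps_run.append(f"{step}:{s3_key}")
    return steps_run


def build_flow():
    """Build a Prefect 1.x flow that maps process_one_key over a Parameter."""
    # Imported lazily so process_one_key stays usable without Prefect.
    from prefect import Flow, Parameter, task
    from prefect.executors import LocalExecutor

    # Wrap the plain function as a Prefect task.
    process_task = task(process_one_key)

    with Flow("tax_assessor_inc") as flow:
        raw_s3_keys = Parameter("raw_s3_keys")
        # .map creates one child task per list element at run time,
        # so the list does not need to be known when the flow is built.
        process_task.map(raw_s3_keys)

    # Stated explicitly here; per the thread, LocalExecutor is the default.
    flow.executor = LocalExecutor()
    return flow
```

Calling build_flow().run(parameters={"raw_s3_keys": ["raw/a.csv", "raw/b.csv"]}) (the keys here are made up) should then process the keys one after another. Swapping in a LocalDaskExecutor or DaskExecutor is what would let the mapped children run in parallel.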
Ed Burroughes
05/24/2022, 12:14 PM
Exactly. Ahh, so it needs to be the local executor, gotcha. Thank you 🙂