Ed Burroughes
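05/24/2022, 11:59 AM
```python
from prefect import Flow, Parameter

# clean, load_data, test_raw_data and transform_task are @task functions defined elsewhere
with Flow("tax_assessor_inc") as flow:
    raw_s3_keys = Parameter("raw_s3_keys")
    flow_tasks = []
    # This raises an error: a Parameter's value is only known at run time,
    # so it cannot be iterated while the flow is being built
    for s3_key in raw_s3_keys:
        _clean = clean([s3_key])
        _load_data = load_data(_clean)
        _test_raw_data = test_raw_data()
        _transform_task = transform_task()
        flow_tasks.append([_clean, _load_data, _test_raw_data, _transform_task])

    # Flatten the per-key lists and chain every task to the next one,
    # so all four steps for one key finish before the next key starts
    reduced_flow_tasks = sum(flow_tasks, [])
    for i in range(len(reduced_flow_tasks) - 1):
        reduced_flow_tasks[i].set_downstream(reduced_flow_tasks[i + 1])
```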
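The dependency chain that the flatten-and-link loop builds can be traced in plain Python. This is a minimal sketch in which string labels are hypothetical stand-ins for the Prefect task objects; it only shows the resulting ordering, not any Prefect behaviour. The four steps for each key are linked in order, and the last step for one key feeds the first step for the next. It also shows why the loop needs a concrete list of keys when the flow is built.

```python
from itertools import chain
from typing import List, Sequence, Tuple

Label = Tuple[str, str]  # (step name, S3 key) stand-in for a task object


def chain_edges(keys: Sequence[str], steps: Sequence[str]) -> List[Tuple[Label, Label]]:
    """Return (upstream, downstream) pairs like the set_downstream loop creates."""
    # One list of labels per key, mirroring flow_tasks.append([...])
    per_key = [[(step, key) for step in steps] for key in keys]
    # Flatten the nested lists; same result as sum(per_key, []) but linear time
    flat = list(chain.from_iterable(per_key))
    # Link every task to its successor
    return list(zip(flat, flat[1:]))


steps = ["clean", "load_data", "test_raw_data", "transform_task"]
edges = chain_edges(["a.csv", "b.csv"], steps)
for up, down in edges:
    print(f"{up[0]}({up[1]}) -> {down[0]}({down[1]})")
```

With two keys and four steps there are eight tasks and seven edges; the fourth edge, `transform_task(a.csv) -> clean(b.csv)`, is the one that makes the keys run one after another.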
The issue arises when trying to iterate over the `raw_s3_keys` parameter, as it's not iterable. It works when hardcoding the S3 paths, but obviously that's not what we want: it would be great to define that list dynamically. Also, annoyingly, each task has to run in sequence, otherwise we'd use something like Prefect's `map`.
Firstly, is it possible to iterate over the output of a Parameter object? If not, is there a better way of solving this problem?
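One possible workaround, sketched under assumptions rather than offered as a confirmed Prefect recipe: in Prefect 1.x a Parameter's value becomes an ordinary Python object once it is passed into a task at run time. If the per-key work is moved inside a single task that receives `raw_s3_keys`, a plain nested loop can run the four steps strictly in order for each key. The function `process_keys_sequentially` and the step functions below are hypothetical plain-Python stand-ins, not Prefect tasks, and the `@task` wrapper is left out so the sketch runs on its own.

```python
from typing import Callable, List, Sequence


def process_keys_sequentially(
    raw_s3_keys: Sequence[str],
    steps: Sequence[Callable[[str], None]],
) -> None:
    # Outer loop over keys, inner loop over steps: every step finishes for one
    # key before the next key starts, which gives the strict ordering wanted.
    # raw_s3_keys is assumed to be the Parameter's resolved run-time value.
    for s3_key in raw_s3_keys:
        for step in steps:
            step(s3_key)


# Hypothetical stand-ins for clean, load_data, test_raw_data, transform_task;
# each one only records that it ran for a given key.
log: List[str] = []


def make_step(name: str) -> Callable[[str], None]:
    def step(s3_key: str) -> None:
        log.append(f"{name}:{s3_key}")
    return step


steps = [make_step(n) for n in ("clean", "load_data", "test_raw_data", "transform_task")]
process_keys_sequentially(["key_a", "key_b"], steps)
print(log)
```

The trade-off is that Prefect would then see one task instead of four per key, so per-step retries and state tracking would be lost.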
Sorry for the long-winded message, thank you 🙂
Anna Geller
Ed Burroughes
05/24/2022, 12:09 PM
Anna Geller
Ed Burroughes
05/24/2022, 12:14 PM