Sumant Agnihotri
05/12/2022, 9:29 PM
@task
def slicer():
    time.sleep(2)
    op = []
    for each in range(0, random.randint(1, 10)):
        op.append(each)
    return op

@task
def model_slide():
    logger = prefect.context.get("logger")
    time.sleep(2)
    logger.info(f'model c: {datetime.now().strftime("%H:%M:%S")}')

if __name__ == "__main__":
    with Flow("processing-flow", executor=LocalDaskExecutor()) as flow:
        list_of_slices = slicer()
        for each in list_of_slices:
            create_flow_run(flow_name="other-processing-flow", project_name="poc")
            model_slide(upstream_task=[create_flow_run])
I'm getting the following error:
for each in list_of_slices:
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
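What am I doing wrong?
Anna Geller
05/13/2022, 2:00 AM
The problem is this line:
    for each in list_of_slices:
The Flow block is only for calling tasks and defining the structure. While the flow is being built, slicer() returns a Task object rather than a list, so a plain Python loop can't iterate over it.
To run multiple child flow runs in parallel, try create_flow_run.map() instead.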
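A minimal sketch of the create_flow_run.map() suggestion, assuming Prefect 1.x and that the child flow "other-processing-flow" is already registered in project "poc". The helper names make_slices, to_parameters and build_flow, and the child flow parameter "slice_id", are hypothetical and not from the original code. Mapping needs at least one argument that is not wrapped in unmapped(), so the sketch maps over a list of parameter dicts, one per slice.

```python
import random


def make_slices():
    """Return 1 to 10 slice indices, like slicer() in the question."""
    return list(range(random.randint(1, 10)))


def to_parameters(slices):
    """Build one parameters dict per child flow run.

    "slice_id" is a hypothetical name; the child flow would need a
    matching Parameter for it.
    """
    return [{"slice_id": s} for s in slices]


def build_flow():
    # Prefect 1.x imports are kept local so the helpers above can be
    # used without Prefect installed.
    import prefect
    from prefect import Flow, task, unmapped
    from prefect.executors import LocalDaskExecutor
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    slicer = task(make_slices, name="slicer")
    parameters = task(to_parameters, name="to_parameters")

    @task
    def model_slide():
        logger = prefect.context.get("logger")
        logger.info("all child flow runs have finished")

    with Flow("processing-flow", executor=LocalDaskExecutor()) as flow:
        slices = slicer()
        params = parameters(slices)
        # One child flow run per element of params; constant arguments
        # are wrapped in unmapped() so they are not iterated over.
        run_ids = create_flow_run.map(
            parameters=params,
            flow_name=unmapped("other-processing-flow"),
            project_name=unmapped("poc"),
        )
        # Block until each child run reaches a final state.
        finished = wait_for_flow_run.map(run_ids)
        # Note the keyword is upstream_tasks (plural), and it takes the
        # task results from this flow, not the imported task itself.
        model_slide(upstream_tasks=[finished])
    return flow
```

Calling build_flow() and then run() on the returned flow would need a Prefect backend (Cloud or Server) to be reachable, since create_flow_run talks to the API. Because model_slide is not mapped, it should run once after all mapped children complete.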