aviv
03/18/2024, 9:38 AM
@flow(task_runner=DaskTaskRunner())
def myflow():
    download_data = download_task()  # return data in batches
    adapted_data = adapt_task.map(download_data)  # adapt each batch data
    create_json = create_json_task.map(adapted_data)  # create json from each batch
    merged_data = merge_task(create_json)  # merge all batches into one file
All tasks work fine, but when it gets to the merge task it gives me this error:
prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available.
and when I debug it, it doesn’t enter the merge task.
Has anyone encountered something similar, or does anyone have any suggestions?
Richard Sundqvist
03/18/2024, 10:24 AM
create_json_task.map returns futures, same as submit. You probably need result() on the futures.
aviv
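(A minimal sketch of the idea in the reply above: resolve futures to plain values before handing them to the final merge step. It uses Python's standard concurrent.futures as a stand-in rather than Prefect itself, an assumption made so the snippet runs without a Prefect or Dask setup. The functions adapt, create_json, merge and run_pipeline are hypothetical placeholders, not the tasks from the original flow.)

```python
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-ins for the adapt / create-json / merge steps.
def adapt(batch):
    return [x * 2 for x in batch]


def create_json(batch):
    return {"items": batch}


def merge(json_batches):
    merged = []
    for doc in json_batches:
        merged.extend(doc["items"])
    return merged


def run_pipeline(batches):
    with ThreadPoolExecutor(max_workers=2) as pool:
        # submit() returns Future objects, not the computed values.
        adapt_futures = [pool.submit(adapt, b) for b in batches]
        # Resolve each future with result() before passing its value on.
        json_futures = [pool.submit(create_json, f.result()) for f in adapt_futures]
        # The merge step receives plain values, not futures.
        json_batches = [f.result() for f in json_futures]
    return merge(json_batches)
```

In the flow from the question, the analogous change would presumably be to pass [f.result() for f in create_json] to merge_task instead of the list of futures. That is untested here and may depend on the Prefect version.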
03/18/2024, 11:45 AM