@flow(task_runner=DaskTaskRunner())
def myflow():
download_data = download_task() # return data in batches
adapted_data = adapt_task.map(download_data) # adapt each batch data
create_json = create_json_task.map(adapted_data) # create json from each batch
merged_data = merge_task(create_json) # merge all batches into one file
all tasks are working fine but when it get to merge task it gives me this error:
prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available.
and when I debug it it doesn’t enter the merge task
have anyone encountered something similar or have any suggestions?
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.