itay livni
task_run_id
# Reproduction script: map a task over a list while persisting results through
# templated target locations under a local results directory.
from prefect import task, Flow
from prefect.engine.results import LocalResult

# Flow-level result handler; `{flow_name}` is a template placeholder in the path.
lcl_res = LocalResult(dir="~/prefect_guide/results/{flow_name}")


# NOTE: on a local `flow.run()` with Prefect 0.11.3 this target template failed
# with KeyError('task_run_id') while the target location was being formatted
# from `prefect.context` (raised via `check_target` -> `LocalResult.exists`).
@task(target="{task_name}/{task_run_id}")
def return_list():
    """Return the fixed list that the mapped task iterates over."""
    return [1, 2, 3]


@task(target="{task_name}/{map_index}.prefect")
def mapped_task(x):
    """Return the mapped element incremented by one."""
    return x + 1


with Flow("blah", result=lcl_res) as flow:
    # The upstream task object itself is passed as the mapped argument.
    mapped_task.map(return_list)

# Run locally, then render the flow graph using the final flow state.
st = flow.run()
flow.visualize(flow_state=st)
(py37moc) ilivni@ilivni-UX430UAR:~/prefect_guide$ /home/ilivni/miniconda3/envs/py37moc/bin/python /home/ilivni/prefect_guide/tst_map.py
0.11.3
[2020-05-27 18:56:17] INFO - prefect.FlowRunner | Beginning Flow run for 'blah'
[2020-05-27 18:56:17] INFO - prefect.FlowRunner | Starting flow run.
[2020-05-27 18:56:17] INFO - prefect.TaskRunner | Task 'return_list': Starting task run...
[2020-05-27 18:56:17] ERROR - prefect.TaskRunner | Unexpected error: KeyError('task_run_id')
Traceback (most recent call last):
  File "/miniconda3/envs/py37moc/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/miniconda3/envs/py37moc/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 660, in check_target
    if result.exists(target, **prefect.context):
  File "/home/ilivni/miniconda3/envs/py37moc/lib/python3.7/site-packages/prefect/engine/results/local_result.py", line 123, in exists
    return os.path.exists(os.path.join(self.dir, location.format(**kwargs)))
KeyError: 'task_run_id'
nicholas
Chris White
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.