itay livni
05/27/2020, 6:55 PMtask_run_id
as a target is a bug using 0.11.3
from prefect import task, Flow
from prefect.engine.results import LocalResult
lcl_res = LocalResult(dir="~/prefect_guide/results/{flow_name}")
@task(target="{task_name}/{task_run_id}",
)
def return_list():
return [1, 2, 3]
@task(target="{task_name}/{map_index}.prefect")
def mapped_task(x):
return x + 1
with Flow("blah", result=lcl_res) as flow:
mapped_task.map(return_list)
st = flow.run()
flow.visualize(flow_state=st)
itay livni
05/27/2020, 6:57 PM(py37moc) ilivni@ilivni-UX430UAR:~/prefect_guide$ /home/ilivni/miniconda3/envs/py37moc/bin/python /home/ilivni/prefect_guide/tst_map.py
0.11.3
[2020-05-27 18:56:17] INFO - prefect.FlowRunner | Beginning Flow run for 'blah'
[2020-05-27 18:56:17] INFO - prefect.FlowRunner | Starting flow run.
[2020-05-27 18:56:17] INFO - prefect.TaskRunner | Task 'return_list': Starting task run...
[2020-05-27 18:56:17] ERROR - prefect.TaskRunner | Unexpected error: KeyError('task_run_id')
Traceback (most recent call last):
File "/miniconda3/envs/py37moc/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/miniconda3/envs/py37moc/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 660, in check_target
if result.exists(target, **prefect.context):
File "/home/ilivni/miniconda3/envs/py37moc/lib/python3.7/site-packages/prefect/engine/results/local_result.py", line 123, in exists
return os.path.exists(os.path.join(self.dir, location.format(**kwargs)))
KeyError: 'task_run_id'
nicholas
itay livni
05/27/2020, 6:59 PMnicholas
Chris White
task_run_id
in Core-only so that’s the “why”; we should find a way to not have a discrepancy between core / cloud hereitay livni
05/27/2020, 7:13 PMChris White