Lukas N.
11/05/2020, 2:25 PMB --state--> C
edge everything works fine. I'm running the example on local prefect server (prefect server start
) and LocalAgent.
A --data--> B --state--v
\-------data---------> C
Here is the code:
import os
from typing import List
import prefect
from prefect import Flow, task
from prefect.engine.results import LocalResult
@task(name="A")
def task_a() -> List[int]:
return [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
@task(name="B")
def task_b(foo: List) -> None:
<http://prefect.context.logger.info|prefect.context.logger.info>(f"Task B {foo}")
@task(name="C")
def task_c(foo: List[int]) -> None:
<http://prefect.context.logger.info|prefect.context.logger.info>(f"Task C {foo}")
result_directory = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "results")
)
result = LocalResult(
dir=result_directory,
location="{flow_name}/"
"{scheduled_start_time:%d-%m_%H-%M-%S}/"
"{task_full_name}-{task_run_id}.prefect_result",
)
with Flow(name="test_flow", result=result) as flow:
a = task_a()
b = task_b(a)
c = task_c(a)
# Commenting out this line removes the state dependency and the flow works
# just fine
flow.set_dependencies(task=c, upstream_tasks=[b])
if __name__ == "__main__":
flow.register(project_name="default")
and here is the error:
5 November 2020,03:16:33 prefect.LocalResult DEBUG Starting to upload result to test_flow/05-11_14-16-28/A-6007ec12-1de1-42c0-a9db-7cc500e017e7.prefect_result...
5 November 2020,03:16:33 prefect.LocalResult DEBUG Finished uploading result to /home/some_path/results/test_flow/05-11_14-16-28/A-6007ec12-1de1-42c0-a9db-7cc500e017e7.prefect_result...
...
5 November 2020,03:16:33 prefect.CloudTaskRunner INFO Task 'C': Starting task run...
5 November 2020,03:16:33 prefect.LocalResult DEBUG Starting to read result from {flow_name}/{scheduled_start_time:%d-%m_%H-%M-%S}/{task_full_name}-{task_run_id}.prefect_result...
For some reason the location string doesn't get formatted 🤔 .nicholas
11/05/2020, 2:37 PMLukas N.
11/05/2020, 2:38 PMnicholas
11/05/2020, 10:28 PM