Bastian Röhrig
10/05/2021, 8:03 AMBastian Röhrig
10/05/2021, 8:03 AMcreate_flow_run
to run a child flow which has its reference_tasks set via set_reference_tasks
. When this task is run, it produces the following output:
Task 'create_flow_run[0]': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 859, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/opt/hostedtoolcache/Python/3.8.12/x64/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 118, in create_flow_run
File "/usr/local/lib/python3.8/site-packages/prefect/backend/flow.py", line 204, in from_flow_name
return cls._from_flow_data(flow)
File "/usr/local/lib/python3.8/site-packages/prefect/backend/flow.py", line 71, in _from_flow_data
deserialized_flow = FlowSchema().load(data=flow_data["serialized_flow"])
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/serialization.py", line 141, in load
return super().load(data, **kwargs)
File "/usr/local/lib/python3.8/site-packages/marshmallow/schema.py", line 719, in load
return self._do_load(
File "/usr/local/lib/python3.8/site-packages/marshmallow/schema.py", line 889, in _do_load
result = self._invoke_load_processors(
File "/usr/local/lib/python3.8/site-packages/marshmallow/schema.py", line 1087, in _invoke_load_processors
data = self._invoke_processors(
File "/usr/local/lib/python3.8/site-packages/marshmallow/schema.py", line 1219, in _invoke_processors
data = processor(data, many=many, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/serialization/flow.py", line 114, in create_object
flow = super().create_object(data)
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/serialization.py", line 164, in create_object
return object_class(**init_data)
File "/usr/local/lib/python3.8/site-packages/prefect/core/flow.py", line 203, in __init__
self.set_reference_tasks(reference_tasks or [])
File "/usr/local/lib/python3.8/site-packages/prefect/core/flow.py", line 485, in set_reference_tasks
raise ValueError("reference tasks must be part of the flow.")
ValueError: reference tasks must be part of the flow.
I suspect it might be connected to this issue: https://github.com/PrefectHQ/prefect/issues/4957.Anna Geller
flow.set_reference_tasks([task])
Bastian Röhrig
10/05/2021, 12:37 PMfrom prefect import Flow
from prefect.tasks.prefect import create_flow_run
with Flow("parent_in_minimal_ex") as flow:
children_run_ids = create_flow_run(
flow_name="minimal_ex_for_slack",
)
and child:
import prefect
from prefect import Flow
from prefect.utilities.tasks import task
@task
def wat():
logger = prefect.context["logger"]
<http://logger.info|logger.info>("wat")
with Flow("minimal_ex_for_slack") as flow:
foo = wat()
flow.set_reference_tasks([foo])
Anna Geller
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
with Flow("minimal_ex_for_slack") as flow:
child_flow_run_id = create_flow_run(flow_name="minimal_ex_for_slack", project_name="YOUR_PROJECT")
child_flow_run_id_wait = wait_for_flow_run
flow.set_reference_tasks([child_flow_run_id])
Anna Geller
Bastian Röhrig
10/05/2021, 2:25 PMKevin Kho
Kevin Kho
Kevin Kho
create_flow_run
. The issue is hereBastian Röhrig
10/07/2021, 6:48 AMKevin Kho