# ask-community
b
Hey everyone, I'm having a weird issue that looks like a bug to me with child flows and custom reference tasks. More in thread.
I am using create_flow_run to run a child flow which has its reference_tasks set via set_reference_tasks. When this task is run, it produces the following output:
Task 'create_flow_run[0]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 859, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/opt/hostedtoolcache/Python/3.8.12/x64/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 118, in create_flow_run
  File "/usr/local/lib/python3.8/site-packages/prefect/backend/flow.py", line 204, in from_flow_name
    return cls._from_flow_data(flow)
  File "/usr/local/lib/python3.8/site-packages/prefect/backend/flow.py", line 71, in _from_flow_data
    deserialized_flow = FlowSchema().load(data=flow_data["serialized_flow"])
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/serialization.py", line 141, in load
    return super().load(data, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/marshmallow/schema.py", line 719, in load
    return self._do_load(
  File "/usr/local/lib/python3.8/site-packages/marshmallow/schema.py", line 889, in _do_load
    result = self._invoke_load_processors(
  File "/usr/local/lib/python3.8/site-packages/marshmallow/schema.py", line 1087, in _invoke_load_processors
    data = self._invoke_processors(
  File "/usr/local/lib/python3.8/site-packages/marshmallow/schema.py", line 1219, in _invoke_processors
    data = processor(data, many=many, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/serialization/flow.py", line 114, in create_object
    flow = super().create_object(data)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/serialization.py", line 164, in create_object
    return object_class(**init_data)
  File "/usr/local/lib/python3.8/site-packages/prefect/core/flow.py", line 203, in __init__
    self.set_reference_tasks(reference_tasks or [])
  File "/usr/local/lib/python3.8/site-packages/prefect/core/flow.py", line 485, in set_reference_tasks
    raise ValueError("reference tasks must be part of the flow.")
ValueError: reference tasks must be part of the flow.
I suspect it might be connected to this issue: https://github.com/PrefectHQ/prefect/issues/4957.
a
Could you share your Flow code? I was trying to reproduce the bug, but it seemed to work for me using flow.set_reference_tasks([task]).
b
I have a minimal example below that fails for me. However, it only fails when I execute it on Prefect Cloud with a Docker agent, using Azure Blob Storage to store the flows. When I run it with a local Prefect backend and a local agent, it works fine. I use 0.15.5 as the core version.
from prefect import Flow
from prefect.tasks.prefect import create_flow_run

with Flow("parent_in_minimal_ex") as flow:
    children_run_ids = create_flow_run(
        flow_name="minimal_ex_for_slack",
    )
and child:
import prefect
from prefect import Flow
from prefect.utilities.tasks import task


@task
def wat():
    logger = prefect.context["logger"]
    logger.info("wat")


with Flow("minimal_ex_for_slack") as flow:
    foo = wat()

flow.set_reference_tasks([foo])
a
I would definitely check that both the child and the parent flow are registered to Cloud. Then, to be more explicit, you can pass the project name to create_flow_run() as follows:
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


with Flow("parent_in_minimal_ex") as flow:
    # Start a run of the registered child flow
    child_flow_run_id = create_flow_run(flow_name="minimal_ex_for_slack", project_name="YOUR_PROJECT")
    # Block until the child flow run finishes
    child_flow_run_id_wait = wait_for_flow_run(child_flow_run_id)
    flow.set_reference_tasks([child_flow_run_id])
Based on the logs you provided, it seems that the reference task that you set is defined in a different flow, which won't work.
b
Thank you. I am pretty confident that both flows are correctly registered to the same project in Cloud. The issue first appeared in our real flows, which worked before we added the custom reference task (and are working again after removing it). If the reference task were defined in a different flow, the example I posted above should fail both on Cloud with a Docker agent and on a local server with a local agent. However, it works on the local server with a local agent. Looking at the stack trace, the error seems to occur in code related to serialization. An issue there could potentially explain the different behaviour between Cloud and local.
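To illustrate the kind of failure I have in mind, here is a toy sketch in plain Python. It is not Prefect's actual code: ToyTask, ToyFlow, serialize, naive_load and careful_load are all made-up names, and I am only assuming that the membership check behaves roughly like this. It shows how a "reference tasks must be part of the flow" guard can pass on the live flow object but fail after a serialize/load round trip if the reference tasks are rebuilt as separate objects.

```python
import json


class ToyTask:
    """Hypothetical stand-in for a task; compared by object identity."""

    def __init__(self, name):
        self.name = name


class ToyFlow:
    """Hypothetical stand-in for a flow with a reference-task guard."""

    def __init__(self, tasks, reference_tasks=None):
        self.tasks = set(tasks)
        self.set_reference_tasks(reference_tasks or [])

    def set_reference_tasks(self, tasks):
        # Same style of guard as the ValueError in the traceback.
        if not set(tasks) <= self.tasks:
            raise ValueError("reference tasks must be part of the flow.")
        self.reference_tasks = set(tasks)


def serialize(flow):
    # Only names survive serialization, not object identity.
    return json.dumps({
        "tasks": [t.name for t in flow.tasks],
        "reference_tasks": [t.name for t in flow.reference_tasks],
    })


def naive_load(payload):
    data = json.loads(payload)
    # Tasks and reference tasks are rebuilt independently,
    # so the reference tasks are new objects unknown to the flow.
    tasks = [ToyTask(n) for n in data["tasks"]]
    refs = [ToyTask(n) for n in data["reference_tasks"]]
    return ToyFlow(tasks, refs)


def careful_load(payload):
    data = json.loads(payload)
    # Rebuild each task once and look reference tasks up by name.
    by_name = {n: ToyTask(n) for n in data["tasks"]}
    refs = [by_name[n] for n in data["reference_tasks"]]
    return ToyFlow(by_name.values(), refs)


if __name__ == "__main__":
    wat = ToyTask("wat")
    payload = serialize(ToyFlow([wat], [wat]))  # fine on the live object
    try:
        naive_load(payload)
    except ValueError as exc:
        print("naive_load failed:", exc)
    print("careful_load ok:", [t.name for t in careful_load(payload).reference_tasks])
```

Whether something like naive_load is what actually happens in the Cloud code path is only a guess on my side. The sketch just shows why the same flow definition could behave differently depending on how it gets loaded.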
k
I’ll give this a shot in a bit
I can replicate. Will look a bit more but will likely open a bug report
I replicated without even using create_flow_run. The issue is here
upvote 2
b
Btw: Thank you both for the great support. When my team was recently debating whether to choose Prefect or Airflow, this Slack channel was a huge plus for Prefect. We really appreciate it.
🎉 2
k
Thank you 🙂