# ask-community
t
Hi. I'm experiencing a bug/error when running a subflow using StartFlowRun. The subflow runs successfully, but in the parent flow I get the following error:
Task 'SUB FLOW 1': Exception encountered during task execution!
Traceback (most recent call last):
  File "/root/miniconda3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/root/miniconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/root/miniconda3/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/root/miniconda3/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 440, in run
    flow_run_state = client.get_flow_run_info(flow_run_id).state
  File "/root/miniconda3/lib/python3.8/site-packages/prefect/client/client.py", line 1582, in get_flow_run_info
    state=State.deserialize(result.serialized_state),
  File "/root/miniconda3/lib/python3.8/site-packages/prefect/engine/state.py", line 390, in deserialize
    state = StateSchema().load(json_blob)
  File "/root/miniconda3/lib/python3.8/site-packages/marshmallow_oneofschema/one_of_schema.py", line 153, in load
    raise exc
marshmallow.exceptions.ValidationError: {'_schema': 'Invalid data type: None'}
I can't replicate it if I rerun it now. Has anyone seen the same?
a
I haven’t seen this error before. But if this happens using StartFlowRun, you can try switching to create_flow_run. It’s described here.
t
@Anna Geller from what I can tell from https://github.com/PrefectHQ/prefect/blob/master/src/prefect/tasks/prefect/flow_run.py they do pretty much the same thing, just split into two tasks. So it shouldn't be that then?
It only happened to one of my many subflows.
a
I don’t have enough information to know what may have caused that. I would reregister both child and parent flow and check if that fixes the issue. But if you wanna dive deeper into root cause analysis, you would need to provide more info:
1. Do you run it on Cloud or Server?
2. What is your Prefect version? Can you share your “prefect diagnostics”?
3. How do you define your flow?
4. What agent do you use?
5. How did you start this flow? Did it run on schedule or ad hoc when you had the issue?
6. Did it happen only once and only for that flow?
7. Can you share the child flow definition? Do you have some parameters there that are missing default values? (This is actually the first thing I would check based on the error.)
t
@Anna Geller It is not a stable error. Running the parent again didn't result in the error. But I'm still interested in finding the problem. 1: Cloud 2:
{
  "config_overrides": {},
  "env_vars": [],
  "system_information": {
    "platform": "Linux-5.0.0-38-generic-x86_64-with-glibc2.17",
    "prefect_backend": "cloud",
    "prefect_version": "0.15.7",
    "python_version": "3.8.12"
  }
}
3 : Flow definition
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun
from prefect.run_configs import KubernetesRun

child_flow_1 = StartFlowRun(
    "child_flow_1", project_name="project", wait=True
)
child_flow_2 = StartFlowRun(
    "child_flow_2", project_name="project", wait=True
)
child_flow_3 = StartFlowRun(
    "child_flow_3", project_name="project", wait=True
)

run_config = KubernetesRun(
    cpu_limit=2.0, cpu_request=0.25, job_template_path=_job_template_path, env=env
)

with Flow("Hourly Flow", run_config=run_config) as hourly_traffic_light_flow:
    child_flow_1_ran = child_flow_1()
    child_flow_2(upstream_tasks=[child_flow_1_ran])
    child_flow_3(upstream_tasks=[child_flow_1_ran])
4: Kubernetes Agent 5: On a schedule 6: Yes, it has only happened once. 7: The subflow definition is here. It does have a parameter but it has a default:
from prefect import Flow, Parameter

with Flow(
    "child_flow_3",
) as child_flow_3:

    ids = Parameter("ids", default=None)
    func_var = get_variables_for_functions(
        imo_numbers=ids, dependent_on_date=False
    )
a
Thanks for all the answers! Your flow definition looks good. I suspect that perhaps the Prefect version of your Kubernetes agent doesn’t like the default parameter value of None in the child flow run. This would explain the error:
marshmallow.exceptions.ValidationError: {'_schema': 'Invalid data type: None'}
What Prefect version do you use on your agent? I will ask the team if they have other ideas.
You could also try passing the parameter values explicitly in your child flow run. This could potentially mitigate the issue. It could be that the parameter value couldn’t be inferred for one specific child flow run, and this is why the error happened only once. Here is how to set it explicitly:
child_flow_1_ran = child_flow_1(parameters=dict(ids=None))
I asked the team and apparently the state returned from the backend was None, so there’s a deserialization error here, but we are not sure what may have caused that.
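To illustrate the failure mode described above, here is a minimal, library-free sketch of guarding against a missing state payload before it is deserialized. It is not how Prefect handles this internally: `fetch_state_with_retry` and the `fetch_serialized_state` callable are hypothetical names, and the simulated backend responses are made up for the example.

```python
import time
from typing import Callable, Optional


def fetch_state_with_retry(
    fetch_serialized_state: Callable[[], Optional[dict]],
    attempts: int = 3,
    delay_seconds: float = 1.0,
) -> dict:
    """Call `fetch_serialized_state` until it returns a non-None payload.

    `fetch_serialized_state` is a hypothetical stand-in for whatever call
    returns the serialized state from the backend.
    """
    for attempt in range(1, attempts + 1):
        payload = fetch_serialized_state()
        if payload is not None:
            return payload
        # Only sleep if another attempt is still to come.
        if attempt < attempts:
            time.sleep(delay_seconds)
    # Fail with an explicit message instead of a schema validation error.
    raise RuntimeError(f"backend returned no state after {attempts} attempts")


# Simulated backend: the first response is None, the second is a payload.
responses = iter([None, {"type": "Success"}])
state = fetch_state_with_retry(lambda: next(responses), delay_seconds=0)
print(state["type"])
```

A guard like this turns a transient empty response into either a successful retry or an error that names the real problem.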
Using the new flow run tasks (create_flow_run + wait_for_flow_run) might be helpful because they might raise a better error for an edge case like this, so if this happens again I would encourage you to try moving this to the new tasks.
t
@Anna Geller Thank you for the answer. I'm using 0.15.10 on the agent. OK, I will see if it appears again and then switch over if that is the case 🙂