Thomas Hoeck
01/06/2022, 8:23 AMStartFlowRun
. The subflow runs succesfully but in the parent flow I get the following error:Thomas Hoeck
01/06/2022, 8:24 AMTask 'SUB FLOW 1': Exception encountered during task execution!
Traceback (most recent call last):
File "/root/miniconda3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/root/miniconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/root/miniconda3/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 456, in method
return run_method(self, *args, **kwargs)
File "/root/miniconda3/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 440, in run
flow_run_state = client.get_flow_run_info(flow_run_id).state
File "/root/miniconda3/lib/python3.8/site-packages/prefect/client/client.py", line 1582, in get_flow_run_info
state=State.deserialize(result.serialized_state),
File "/root/miniconda3/lib/python3.8/site-packages/prefect/engine/state.py", line 390, in deserialize
state = StateSchema().load(json_blob)
File "/root/miniconda3/lib/python3.8/site-packages/marshmallow_oneofschema/one_of_schema.py", line 153, in load
raise exc
marshmallow.exceptions.ValidationError: {'_schema': 'Invalid data type: None'}
Thomas Hoeck
01/06/2022, 8:26 AMAnna Geller
create_flow_run
. It’s described here.Thomas Hoeck
01/06/2022, 12:57 PMThomas Hoeck
01/06/2022, 12:58 PMAnna Geller
Thomas Hoeck
01/06/2022, 1:40 PM{
"config_overrides": {},
"env_vars": [],
"system_information": {
"platform": "Linux-5.0.0-38-generic-x86_64-with-glibc2.17",
"prefect_backend": "cloud",
"prefect_version": "0.15.7",
"python_version": "3.8.12"
}
}
3 : Flow definition
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun
from prefect.run_configs import KubernetesRun
child_flow_1 = StartFlowRun(
"child_flow_1", project_name="project", wait=True
)
child_flow_2 = StartFlowRun(
"child_flow_2", project_name="project", wait=True
)
child_flow_3 = StartFlowRun(
"child_flow_3", project_name="project", wait=True
)
run_config = KubernetesRun(
cpu_limit=2.0, cpu_request=0.25, job_template_path=_job_template_path, env=env
)
with Flow(
"Hourly Flow", run_config = run_config
) as hourly_traffic_light_flow:
child_flow_1_ran = child_flow_1()
child_flow_2(upstream_tasks=[child_flow_1_ran])
child_flow_3(upstream_tasks=[child_flow_1_ran])
4: Kubernetes Agent
5: On a schedule
6: Yes, it has only happend once.
7: The subflow defintion is here. It does have a parameter but it has a default:
with Flow(
"child_flow_3",
) as child_flow_3:
ids = Parameter("ids", default=None)
func_var = get_variables_for_functions(
imo_numbers=ids, dependent_on_date=False
)
Anna Geller
marshmallow.exceptions.ValidationError: {'_schema': 'Invalid data type: None'}
What Prefect version do you use on your agent?
I will ask the team if they have other ideas.Anna Geller
child_flow_1_ran = child_flow_1(parameters=dict(ids=None))
Anna Geller
None
so there’s a deserialization error here, but we are not sure what may have caused thatAnna Geller
create_flow_run
+ wair_for_flow_run
) might be helpful because they might raise a better error for an edge case like this, so if this happens again I would encourage you to try moving this to the new tasksThomas Hoeck
01/06/2022, 2:48 PM