I have a strange error on a task, any one seen the...
# prefect-community
p
I have a strange error on a task, any one seen these before?
Copy code
Task 'copy_from_s3_to_sftp': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.9/dist-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "flows/k8s/my_flow_name.py", line 46, in copy_from_s3_to_sftp
SystemError: unknown opcode
k
Are you using the DaskExecutor?
p
no, python task running in docker agent
k
I believe this is because you registered a flow with a certain version of a package (boto3 or something) and then during execution it was deserialized on another version and the version mismatch causes this issue. This happens frequently on Dask when there is a mismatch of scheduler package versions and worker package versions
p
ok, thank you, let me check those versions
I am using python 3.8 (while registering) and 3.9 (while executing), for this I need to create a flow of flows, here how do I execute the 2nd flow only if the first flow is successful
this example here is clear - https://docs.prefect.io/core/idioms/flow-to-flow.html however I don't know how I can execute flow B only if flow is successfully completed without any errors
k
Something like this:
Copy code
with Flow(...) as flow:
    a = create_flow_run(..)
    b = wait_for_flow_run(a, raise_final_state=True)
    c = create_flow_run(..., upstream_tasks=[b])
raise_final_state being the important part so failure gets raised and then propagates
👍 1
p
@Kevin Kho in a flow can I mix flows and tasks ? e.g in Flow B I want to call Flow A and then run Task B ?
Copy code
with Flow("priority_report", storage=storage, schedule=schedule) as f:
    flow_a = create_flow_run()
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
    copy_from_s3_to_sftp(upstream_tasks=[wait_for_flow_a])
k
Do you mean you have Flow A which can have
task_one
,
task_two
,
task_three
but you only want
task_two
? Or do you mean a parent Flow that calls one sub flow and one task? Because what you have looks good to me
p
the second one
parent Flow that calls one sub flow and one task
, strangely I am getting this error
Copy code
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/prefect/agent/agent.py", line 391, in _deploy_flow_run
    deployment_info = self.deploy_flow(flow_run)
  File "/usr/local/lib/python3.8/dist-packages/prefect/agent/local/agent.py", line 133, in deploy_flow
    env = self.populate_env_vars(flow_run, run_config=run_config)
  File "/usr/local/lib/python3.8/dist-packages/prefect/agent/local/agent.py", line 190, in populate_env_vars
    else os.getcwd()
FileNotFoundError: [Errno 2] No such file or directory
k
This is on the create_flow_run right? It’s an error on the subflow?
That looks like a storage error on the subflow
Does the subflow work if you run it individually?
p
the subflow runs on its own with out any issues
k
I think I’d need to see the full flow of subflow and mainflow to understand this better. Just remove sensitive info
p
some issue with the agent, after restarting the agent the flow works, thanks again for your help
👍 1