Jeffrey Lam
07/21/2023, 9:35 PM@flow
async def subflow(job):
do_some_work(job)
update_state(job)
@flow
async def my_flow():
list_of_jobs = query db for jobs of type x
if list_of_jobs:
futures = [ subflow.with_options(flow_run_name=job["name"](job) for job in list_of_jobs ]
await asyncio.gather(*futures)
This was working fine until we upgraded then we started to experience the following error
Flow run encountered an exception. RuntimeError: The call get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fb6541937f0>, flow_run_context=FlowRunContext(start_time=DateT...) is already done.
Because the subflow doesn't run, the job is left in a state where we have to manually update the state of the job in order for it to get retriggered again in the next run.
The issue doesn't happen all the time, and I've only been able to replicate the issue once locally.
Has any one experience this, or can give some details on this error?Taylor Curran
08/24/2023, 2:00 PMprefect version
Jeffrey Lam
08/24/2023, 2:26 PMTaylor Curran
08/24/2023, 2:33 PMprefect version
?Jeffrey Lam
08/24/2023, 3:33 PMVersion: 2.10.13
API version: 0.8.4
Python version: 3.10.12
Git commit: 179edeac
Built: Thu, Jun 8, 2023 4:10 PM
OS/Arch: linux/x86_64
Profile: default
Server type: ephemeral
Server:
Database: postgresql