Hi all, hoping to get some light shed on an issue we've been seeing since upgrading our self-hosted Prefect instance from 2.10.3 to 2.10.20.
We have flows that query a db for jobs and then spawn a subflow per job to perform the work. Once the work is completed, the state of the job is updated.
Example pseudocode:

import asyncio
from prefect import flow

@flow
async def subflow(job):
    do_some_work(job)
    update_state(job)

@flow
async def my_flow():
    list_of_jobs = ...  # query db for jobs of type x
    if list_of_jobs:
        # Start one subflow run per job, named after the job
        futures = [subflow.with_options(flow_run_name=job["name"])(job) for job in list_of_jobs]
        await asyncio.gather(*futures)
This was working fine until we upgraded; since then we have been seeing the following error:
Flow run encountered an exception. RuntimeError: The call get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fb6541937f0>, flow_run_context=FlowRunContext(start_time=DateT...) is already done.
Because the subflow doesn't run, the job is left in a state that we have to update manually before it can be retriggered in the next run.
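One possible stopgap for the stuck jobs, sketched with plain asyncio rather than Prefect: gather with return_exceptions=True so that a failed call comes back as a result instead of aborting the parent, then reset the affected jobs. Here run_job and reset_job are hypothetical stand-ins for the subflow call and for whatever puts a job back into a retriggerable state, and the sketch assumes the error actually surfaces as an exception from the awaited call, which may not hold for this Prefect error.

```python
import asyncio


async def run_job(job):
    # Hypothetical stand-in for the subflow call; job "b" fails to simulate the error.
    if job["name"] == "b":
        raise RuntimeError("simulated failure: call is already done")
    return job["name"]


def reset_job(job, reset_log):
    # Hypothetical stand-in for putting the job back into a retriggerable state.
    reset_log.append(job["name"])


async def run_all(jobs):
    reset_log = []
    # return_exceptions=True returns exceptions as results, in input order,
    # instead of raising the first one out of gather().
    results = await asyncio.gather(
        *(run_job(job) for job in jobs), return_exceptions=True
    )
    for job, result in zip(jobs, results):
        if isinstance(result, Exception):
            reset_job(job, reset_log)
    return results, reset_log
```

This does not explain or fix the underlying error; it would only avoid the manual state update while the root cause is tracked down.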
The issue doesn't happen all the time, and I've only been able to replicate the issue once locally.
Has anyone experienced this, or can anyone give some details on this error?