https://prefect.io logo
Title
j

Josh Paulin

09/07/2022, 3:18 PM
Seeing some strange behaviour with an async task I’ve got. Running inside an async flow nand trying to await the task, the logs show it as executing immediately but it’s never actually run. If I drop the
@task
decorator and run it like a normal function, it works as expected. Stepping into the engine code I can see it shows a
final_state
of
NotReady
here.
👀 1
m

Mason Menges

09/07/2022, 3:50 PM
Hey @Josh Paulin Would you be able to share a simplified version the code you're running here?
j

Josh Paulin

09/07/2022, 4:02 PM
Would you like the flow, the task, both?
m

Mason Menges

09/07/2022, 4:05 PM
Both if you don't mind 😄
j

Josh Paulin

09/07/2022, 4:08 PM
Problematic Task
All the task runs that occur before the problem one
await
fine as far as I can tell when stepping through
m

Mason Menges

09/07/2022, 4:44 PM
Hmm running a more simplified version of the flow seems to work fine so I don't think it's an issue with how the flow itself is setup, for the sake of argument if you run a simplified version of the task does the flow run to completion? i.e. maybe just return a dict with default values skipping the other steps in the task, assuming it does you could probably start narrowing down whats occurring by adding the other steps gradually back.
j

Josh Paulin

09/07/2022, 4:59 PM
Ok. So trying to simplify it a bit, I’ve narrowed things down to being that if I’m passing variable from the comprehension to the task it doesn’t run i.e. if I replace line 28-30 with
results["alerted"] += [
            await alerts.send_alert(None, alert_type, env) for run in failed_runs
        ]
The task executes when expected
m

Mason Menges

09/07/2022, 7:40 PM
Passing a variable from an iterable in the comprehension shouldn't be a syntax issue so it's more likely an issue with what it's iterating, So I'm not 100% sure what might be causing it, since it is an async function maybe it's related to the failed_runs variable since that's technically being overwritten in the for loop maybe try running lines 23-30 outside of the for loop with a specific alert type and see if that completes successfully. Asynchronous functions aren't my forte so it's definitely possible I'm way out in left field haha.
j

Josh Paulin

09/07/2022, 8:12 PM
No change. I tried: 1. Removing the alert type loop 2. Rewriting the comprehension as a for loop 3. Directly indexing the
failed_runs
array All yield the same results.
Any additional guidance for this one?
m

Mason Menges

09/08/2022, 4:07 PM
Hey Josh what specifically is this function returning/performing?
failed_runs = await orion_api.get_flow_runs_since(
            date_time=start_time,
            tags=[alert_type.value, env.value],
            states=[StateType.FAILED, StateType.CRASHED],
        )
j

Josh Paulin

09/08/2022, 4:29 PM
It’s just a wrapper around
read_flow_runs
from the orion api