Chris Goddard

09/06/2022, 11:45 PM
hi there! does anyone have any good examples (beyond what’s in the docs) of building async flows and sub-flows? my use case: I’m pulling data from an API that returns 2000 results at a time, and I can only make 1 request per 30 seconds, so I want to use that waiting time to process the results asynchronously in a sub-flow. right now I’m using `loop.create_task` to try to schedule the sub-flow, but I’m getting an unknown error:
`Crash detected! Execution was interrupted by an unexpected exception`
I feel like I’m missing something obvious (I haven’t done much with asyncio before, so it’s very possible).
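A minimal plain-asyncio sketch of the pattern described above, assuming nothing about Prefect: `fetch_page` and `process_results` are hypothetical stand-ins for the API call and the processing step, and the 30-second interval comes from the post. Each page is handed to `asyncio.create_task` so it gets processed during the wait before the next request. The tasks are kept in a list and awaited at the end, so an exception during processing is raised instead of disappearing silently. This does not by itself make a Prefect sub-flow schedulable with `create_task`.

```python
from __future__ import annotations

import asyncio

# Assumption: the rate limit from the post (1 request per 30 seconds).
REQUEST_INTERVAL = 30.0


async def fetch_page(page: int) -> list[int]:
    """Hypothetical stand-in for one rate-limited API request."""
    await asyncio.sleep(0)  # pretend network I/O
    return [page * 3, page * 3 + 1, page * 3 + 2]


async def process_results(results: list[int]) -> int:
    """Hypothetical stand-in for the processing step (here: sum the page)."""
    await asyncio.sleep(0)  # pretend processing work
    return sum(results)


async def pull_and_process(num_pages: int, interval: float = REQUEST_INTERVAL) -> list[int]:
    """Fetch pages one at a time and process each page while waiting for the next request."""
    tasks: list[asyncio.Task[int]] = []
    for page in range(num_pages):
        results = await fetch_page(page)
        # Keep a reference to every task: the event loop only holds weak
        # references to tasks, and an exception in a task nobody awaits is
        # only reported (if at all) as "Task exception was never retrieved".
        tasks.append(asyncio.create_task(process_results(results)))
        if page < num_pages - 1:
            await asyncio.sleep(interval)  # processing tasks run during this wait
    # Await all processing tasks so any exception is raised here, not lost.
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    # Short interval for a quick local run.
    print(asyncio.run(pull_and_process(num_pages=3, interval=0.01)))
```

With `interval=0.0`, `pull_and_process(3)` returns `[3, 12, 21]`, one result per page, in page order.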

John Girvan

09/07/2022, 12:08 AM
wouldn’t it be easier to use a task in place of your sub-flow and use the map functions to fan out the requests? https://docs-v1.prefect.io/orchestration/getting-started/flow-configs.html#configure-environment-variables

Chris Goddard

09/07/2022, 12:10 AM
this is a 2.0 flow - it’s possible that my use of sub-flows is the problem - just trying to understand how async works in this context

John Girvan

09/07/2022, 12:13 AM
I haven’t done any v2 myself, but I think this shows how to do it in v2 https://docs.prefect.io/tutorials/execution/

Chris Goddard

09/07/2022, 12:14 AM
no worries - unfortunately the async examples aren’t comprehensive yet

Nate

09/07/2022, 2:28 AM
@Chris Goddard depending on your situation, it could make sense to use `OrionClient.create_flow_run_from_deployment` to send your results as parameters to a different deployment's flow runs for processing. The orchestrator-worker pattern (2 deployments) is one that's worked for me while support for async subflows (and subflows with independent infra) is coming along. At some point in the future, when we can call async subflows more natively, I'll just swap the `create_flow_run_from_deployment` call in the parent flow for a direct call to my subflow.
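A rough sketch of the orchestrator side of that pattern, in plain Python. `trigger` is a hypothetical callable standing in for whatever starts one worker run (in the thread, a wrapper around `OrionClient.create_flow_run_from_deployment`, whose exact arguments are not shown here); the `records` parameter name and the default `chunk_size=500` are assumptions. Because the orchestrator only depends on that callable, swapping it for a direct subflow call later would not change `orchestrate` itself.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


def chunk(items: list[Any], size: int) -> list[list[Any]]:
    """Split items into consecutive chunks of at most `size` elements."""
    if size < 1:
        raise ValueError("size must be >= 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


async def orchestrate(
    results: list[Any],
    trigger: Callable[[dict[str, Any]], Awaitable[str]],
    chunk_size: int = 500,
) -> list[str]:
    """Orchestrator: hand each chunk of results to a worker run and return the run ids.

    `trigger` is hypothetical: it starts one worker run with the given
    parameters (e.g. a wrapper around a deployment trigger, or later a
    direct subflow call).
    """
    run_ids = []
    for batch in chunk(results, chunk_size):
        run_ids.append(await trigger({"records": batch}))
    return run_ids


# Fake trigger for local testing: records the parameters it was sent.
sent_parameters: list[dict[str, Any]] = []


async def fake_trigger(parameters: dict[str, Any]) -> str:
    sent_parameters.append(parameters)
    return f"run-{len(sent_parameters)}"


if __name__ == "__main__":
    print(asyncio.run(orchestrate(list(range(5)), fake_trigger, chunk_size=2)))
```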

Chris Goddard

09/08/2022, 4:44 PM
Unfortunately the parent flow is passing a fair amount of data through to the sub-flow, so I'm not sure this would work. I think part of it is figuring out async fundamentals. There were definitely issues with uncaught exceptions and things failing silently. It would be really awesome if you guys could publish some more examples of async flows. I see that there are async utilities in prefect.utilities, but it's not always clear how they can or should be used.

Nate

09/08/2022, 4:50 PM
Thanks for the feedback @Chris Goddard, we'll try to put some more async examples together. As for passing data to subflows, one thing that's solved that problem for me is writing to blob storage from the parent and either passing the blob path to a worker flow for retrieval/processing or, even better, triggering the worker flow when the data lands in blob storage.
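A minimal sketch of the "write to storage, pass the path" handoff, assuming a local directory stands in for blob storage; `write_batch` and `process_batch` are hypothetical names, and a real setup would use the storage client's upload/download calls instead of `write_text`/`read_text`. The point is that only a short path string travels between the parent and the worker, not the data itself.

```python
from __future__ import annotations

import json
import tempfile
import uuid
from pathlib import Path

# Assumption: a local directory stands in for blob storage (S3, GCS, Azure
# Blob, ...). Swap write_text/read_text for your storage client's calls.


def write_batch(storage_root: Path, records: list[dict]) -> str:
    """Parent side: persist one batch and return its path.

    Only this short string is passed on, so the records themselves never
    have to go through flow parameters.
    """
    path = storage_root / f"batch-{uuid.uuid4().hex}.json"
    path.write_text(json.dumps(records))
    return str(path)


def process_batch(batch_path: str) -> int:
    """Worker side: load the batch from the given path and process it."""
    records = json.loads(Path(batch_path).read_text())
    return len(records)  # placeholder processing: count the records


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        batch_path = write_batch(Path(root), [{"id": 1}, {"id": 2}])
        print(process_batch(batch_path))
```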

Chris Goddard

09/08/2022, 5:17 PM
yeah that’s a good point - I might look at that if I’m still running into issues
one note on the async examples: now that there’s first-class support for both sub-flows and async, it can be less intuitive to understand what exactly should be a task vs. a flow, and how prefect modifies the behavior of async functions. for instance, it seems like you can’t run a sub-flow via asyncio.create_task, but I can’t quite understand why. I was able to get it working with anyio task groups as long as I did a copy.deepcopy of the flow object first. anyway, anything you guys could add would be great
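anyio task groups are not part of the standard library, so this sketch approximates the same structured-concurrency idea with `asyncio.wait` (on Python 3.11+, `asyncio.TaskGroup` offers similar semantics). The parent does not move on until every child has finished, and a failing child cancels the others and re-raises instead of failing silently. `run_all` and `child` are hypothetical names, and the sketch does not model the `copy.deepcopy` workaround, whose cause the thread leaves open.

```python
from __future__ import annotations

import asyncio
from typing import Any, Coroutine


async def run_all(*coros: Coroutine[Any, Any, Any]) -> list[Any]:
    """Run coroutines concurrently, task-group style.

    The caller only continues once every child has finished. If a child
    raises, the remaining children are cancelled and the error is re-raised.
    """
    if not coros:
        return []
    tasks = [asyncio.create_task(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for task in pending:
        task.cancel()
    # Let cancelled children finish unwinding before leaving.
    await asyncio.gather(*pending, return_exceptions=True)
    for task in tasks:
        if task in done:
            exc = task.exception()
            if exc is not None:
                raise exc
    return [task.result() for task in tasks]


async def child(name: str, delay: float, fail: bool = False) -> str:
    """Hypothetical child job used for the demo."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"child {name} failed")
    return name


if __name__ == "__main__":
    print(asyncio.run(run_all(child("a", 0.02), child("b", 0.01))))
```

Results come back in argument order, not completion order, because they are read from `tasks` rather than from `done`.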

Nate

09/08/2022, 5:32 PM
This pain point is on our radar. There are a couple of workarounds, one being the copy you mentioned. We're in the process of improving the interface for async subflows, but again, the feedback is very much appreciated, so thank you!