
Jacques

04/17/2020, 2:34 PM
I was wondering if there was a good pattern to trigger a second flow (using the next_page token as a parameter) from a task?

Jeremiah

04/17/2020, 2:36 PM
Hey @Jacques - for paged APIs we have a LOOP operator. You could use it to collect all your data in one task, then return the list of results, and use a map operator to process them in parallel.
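For readers without Prefect at hand, here is a minimal plain-Python analogue of that pattern (not Prefect's API): one loop walks a paged API until it runs out of pages, and only then are the collected pages processed in parallel. `fetch_page`, the fake `PAGES` data, and `transform` are hypothetical stand-ins; in Prefect the collection loop is roughly what LOOP replaces, and the parallel step is what map replaces.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, Tuple

# Hypothetical paged API: token -> (items on this page, next_page token or None).
PAGES = {
    None: ([1, 2], "p2"),
    "p2": ([3, 4], "p3"),
    "p3": ([5], None),
}


def fetch_page(token: Optional[str]) -> Tuple[List[int], Optional[str]]:
    """Stand-in for one API call; returns the page's items and the next_page token."""
    return PAGES[token]


def collect_all_pages() -> List[List[int]]:
    """Walk every page first (roughly the role LOOP plays), keeping all of them in memory."""
    pages, token = [], None
    while True:
        items, token = fetch_page(token)
        pages.append(items)
        if token is None:
            return pages


def transform(page: List[int]) -> int:
    """Hypothetical per-page transformation."""
    return sum(page)


def run() -> List[int]:
    pages = collect_all_pages()  # nothing is transformed until every page is fetched
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(transform, pages))  # roughly the role map plays


print(run())  # [3, 7, 5]
```

The comment on the first line of `run()` is exactly the limitation Jacques raises next: transformation cannot start until the last page is in.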

Jacques

04/17/2020, 2:36 PM
I've implemented that, but as it turns out, that means we can't start transforming until we've fetched all the pages.
Another thing I've noticed is that if each page is 10M and we have 100 pages, it causes some pretty spiky memory and CPU usage, as you go from 100% I/O to 100% CPU.
Also, our load mechanism doesn't like loading a ton of data, but I think I could fix that by just mapping the results into the load function.
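Mapping results into the load function amounts to calling the loader once per small batch instead of once with everything. A minimal sketch, where `load_batch` is a hypothetical stand-in for the real load mechanism and the batch size of 3 is arbitrary:

```python
from itertools import islice
from typing import Iterable, Iterator, List


def batched(records: Iterable[int], batch_size: int) -> Iterator[List[int]]:
    """Yield consecutive lists of at most batch_size records."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(records)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


loaded_batches: List[List[int]] = []


def load_batch(batch: List[int]) -> None:
    """Hypothetical loader; a real one would write the batch to the target store."""
    loaded_batches.append(batch)


for batch in batched(range(7), batch_size=3):
    load_batch(batch)

print(loaded_batches)  # [[0, 1, 2], [3, 4, 5], [6]]
```

In a Prefect flow, each batch (or each page) would become one mapped child of the load task, so the loader never receives the whole dataset in a single call.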

Jeremiah

04/17/2020, 2:40 PM
That’s true - if it's possible to get a stable API URL for each page, then you could have a single task generate all of the URLs (or URL parameters) and return that list. Then you could have a task that loads the data from each URL in a map, followed by another task that processes the item retrieved by the previous task. At the moment, Prefect maps are breadth-first, but we will be releasing a depth-first enhancement very soon. That would look something like:
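```python
from prefect import Flow

# retrieve_all_urls, load_data and transform_data are @task functions defined elsewhere
with Flow("test") as flow:
    all_urls = retrieve_all_urls()
    load_result = load_data.map(all_urls)
    transform_data.map(load_result)
```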
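To make the breadth-first versus depth-first distinction concrete, here is a plain-Python sketch using a thread pool (not Prefect's executor). The three functions are hypothetical stand-ins for the tasks in the flow above, and the URL format is made up. Both orders produce the same results; the difference is only when transforms are allowed to start.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def retrieve_all_urls() -> List[str]:
    # Made-up stable URL per page.
    return [f"https://api.example.com/items?page={n}" for n in range(1, 4)]


def load_data(url: str) -> str:
    # Stand-in for an HTTP GET; returns a fake payload.
    return f"payload from {url}"


def transform_data(payload: str) -> int:
    # Stand-in transformation: the payload's length.
    return len(payload)


def run_breadth_first(pool: ThreadPoolExecutor) -> List[int]:
    # Every load must finish before any transform starts.
    loaded = list(pool.map(load_data, retrieve_all_urls()))
    return list(pool.map(transform_data, loaded))


def run_depth_first(pool: ThreadPoolExecutor) -> List[int]:
    # Each URL's branch runs load then transform without waiting for the other URLs.
    return list(pool.map(lambda url: transform_data(load_data(url)), retrieve_all_urls()))


with ThreadPoolExecutor(max_workers=4) as pool:
    breadth = run_breadth_first(pool)
    depth = run_depth_first(pool)

print(breadth == depth)  # True
```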
If the API isn’t stable (for example, you must paginate within a session or pass a “next” cursor), then I think you may have to do at least some of the collection in a single task.
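When each request needs the previous page's cursor, fetching is inherently sequential, but transforming does not have to wait for the last page. A plain-Python sketch, assuming a hypothetical `fetch_page(cursor)` that returns the page's items plus the next cursor: a generator follows the cursor chain, and each page is submitted to a thread pool as soon as it arrives, so page N can be transformed while page N+1 is being fetched.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict, Iterator, List, Optional, Tuple

# Hypothetical cursor-paginated API: cursor -> (items, next cursor or None).
FAKE_API: Dict[Optional[str], Tuple[List[int], Optional[str]]] = {
    None: ([1, 2, 3], "c1"),
    "c1": ([4, 5], "c2"),
    "c2": ([6], None),
}


def fetch_page(cursor: Optional[str]) -> Tuple[List[int], Optional[str]]:
    """Stand-in for one request; only the previous response tells us the next cursor."""
    return FAKE_API[cursor]


def iter_pages() -> Iterator[List[int]]:
    """Follow the cursor chain one request at a time, yielding each page as it arrives."""
    cursor: Optional[str] = None
    while True:
        items, cursor = fetch_page(cursor)
        yield items
        if cursor is None:
            return


def transform(page: List[int]) -> List[int]:
    """Hypothetical per-page transformation."""
    return [x * 10 for x in page]


def run_pipelined() -> List[List[int]]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Each submit happens as soon as the generator yields, before the next fetch.
        futures: List[Future] = [pool.submit(transform, page) for page in iter_pages()]
        return [f.result() for f in futures]


print(run_pipelined())  # [[10, 20, 30], [40, 50], [60]]
```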

Jacques

04/17/2020, 2:42 PM
Unfortunately the API is unstable.
I was thinking about doing something like implementing a message queue to trigger a new flow from a task, but it's going to be a bit dirty I think...

Jeremiah

04/17/2020, 2:43 PM
Ah ok. In that case, back to your original question: if you are using Prefect Cloud or the local server, you could register another flow to process the data and then kick it off with the GraphQL API or the prefect.client.Client.create_flow_run() method, passing your payload to the flow as a parameter. We don’t usually encourage this as a first cut because you lose a little visibility when your work is spread across multiple flows, but it may solve the issue, and sometimes that's more important 🙂
Here are some docs on various ways to kick off flow runs, if it helps! https://docs.prefect.io/orchestration/concepts/flow_runs.html#creating-a-flow-run
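A minimal sketch of that hand-off, assuming the Prefect 0.x signature `Client.create_flow_run(flow_id=..., parameters=...)`: the task that reads a page asks the client to start the second flow with the `next_page` token as a parameter. The function name `trigger_next_flow`, the `next_page` parameter name, and the flow ID are hypothetical. The client is passed in so the logic can be exercised here with a fake instead of a running server.

```python
from typing import Any, Dict, List, Optional


def trigger_next_flow(client: Any, flow_id: str, next_page: Optional[str]) -> Optional[str]:
    """Start one run of the (hypothetical) second flow for the given next_page token.

    Returns the new flow run ID, or None when there are no more pages.
    """
    if next_page is None:
        return None  # last page reached: nothing left to trigger
    # Assumed Prefect 0.x call; against a real server, client would be prefect.client.Client().
    return client.create_flow_run(flow_id=flow_id, parameters={"next_page": next_page})


class FakeClient:
    """Records calls instead of talking to Prefect Cloud or the local server."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    def create_flow_run(self, flow_id: str, parameters: Dict[str, Any]) -> str:
        self.calls.append({"flow_id": flow_id, "parameters": parameters})
        return f"run-{len(self.calls)}"


fake = FakeClient()
print(trigger_next_flow(fake, "hypothetical-flow-id", "token-2"))  # run-1
print(trigger_next_flow(fake, "hypothetical-flow-id", None))  # None
```

The early return on `None` is what stops the chain: the last page triggers nothing, so the series of flow runs ends on its own.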

Jacques

04/17/2020, 2:46 PM
Currently we are just using Core. The second option there makes sense: just trigger the second flow directly from a task.
Thanks!

Jeremiah

04/17/2020, 2:46 PM
Yes! Give it a shot in the local server or with the free tier of Cloud and let us know!

Jacques

04/17/2020, 2:46 PM
Just thinking about this, it's almost like we want something like a "meta flow": something that can link flows together.
Cool, will do.
Btw, we are using a combination of Fargate and Lambda (for small jobs) at the moment.

Jeremiah

04/17/2020, 2:47 PM
Yeah, this is something we grapple with a lot - the use cases are clear, but the complexity it introduces can be enormous, so we are working to find an appropriate API.
Excellent, glad the Lambda kick-off model is working!

Jacques

04/17/2020, 2:48 PM
Nice, hope you figure it out. If you can do it as cleanly as the rest, it would be amazing.

Jeremiah

04/17/2020, 2:48 PM
Ha, thanks!

Jacques

04/17/2020, 2:49 PM
Yeah, I've been experimenting with running the Dask local executor in Lambda, keeping it sort of self-contained so we can easily move bits from Lambda to Fargate as they grow. Do you have any comments about using Dask local in prod?

Jeremiah

04/17/2020, 2:50 PM
Only that we think Dask is awesome, and if you run into any issues at all we’d love to hear about it so we can improve the experience.
j

Jacques

04/17/2020, 2:50 PM
Great, if there are no obvious problems lurking, I'll crack on and let you know if it explodes!
Thanks so much, this was super useful!

Jeremiah

04/17/2020, 2:51 PM
You’re welcome!

Jacques

04/20/2020, 2:11 PM
Hi, still doing some experiments and just wondered if there is a way to run() a flow without blocking?

Kyle Moon-Wright

04/20/2020, 5:58 PM
Hmm, I’m no expert, but after asking around it seems that if you want flow.run to be non-blocking, you should use concurrent.futures or a thread pool to run the flow. https://docs.python.org/3/library/concurrent.futures.html
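A minimal sketch of that suggestion: submit the blocking call to a `ThreadPoolExecutor` and keep the returned `Future`, which can be polled with `done()` or waited on with `result()`. `SlowFlow` is a hypothetical stand-in with a blocking `run()` method so the example runs without Prefect; with a real flow you would presumably submit `flow.run` the same way.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SlowFlow:
    """Hypothetical stand-in for a flow whose run() blocks until the work is done."""

    def __init__(self) -> None:
        self.release = threading.Event()

    def run(self, page: int = 1) -> str:
        self.release.wait(timeout=5)  # simulate long-running work
        return f"processed page {page}"


flow = SlowFlow()
executor = ThreadPoolExecutor(max_workers=2)

future = executor.submit(flow.run, page=2)  # returns immediately with a Future
print(future.done())  # False: the run is still blocked in the background thread

flow.release.set()  # let the simulated work finish
print(future.result(timeout=5))  # processed page 2
executor.shutdown()
```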

Jacques

04/20/2020, 6:08 PM
Thanks, will give that a go.