Preston Marshall

03/25/2020, 2:40 PM
Question about how I'd go about something like this with Prefect. I have a bucket on Google Cloud Storage where files are dropped by third parties. The problem is there's no way to know how many, but they do come in batches. What I'd like to do is collect these files incrementally until one doesn't arrive for say 60 seconds. Is this something that Prefect can help with? It's certainly a difficult problem with Airflow.

Jeremiah

03/25/2020, 2:56 PM
Hi @Preston Marshall, this is a challenging problem, but we have designed a first-class way to handle it, described in PIN 14: https://docs.prefect.io/core/PINs/PIN-14-Listener-Flows-2.html Listener flows would allow you to define the listener logic and then run parameterized flow runs according to that logic. We have not built this yet, but it is on our near-term roadmap (and, interestingly, it has come up 4 times this week, which makes me think we should accelerate it a bit). In the meantime, we might suggest a regularly scheduled but fast-paced flow that checks the timestamp of the last added file and processes any files that arrived between its run and the previous run.
Alternatively, you might implement something like our “listener” in a separate Python process that monitors the GCS bucket (outside Prefect) and kicks off new Prefect runs to handle the actual processing.
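The interim suggestion above (a frequently scheduled flow that picks up whatever arrived since the previous run) comes down to a timestamp window filter. A minimal sketch of that filter follows. The function name `files_in_window` and the shape of the listing (a dict of file name to last-updated time) are assumptions made for illustration. How the listing is read from the bucket and how the previous run time is remembered are left out.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def files_in_window(
    updated_at: Dict[str, datetime],
    previous_run: datetime,
    current_run: datetime,
) -> List[str]:
    """Return the names whose timestamp falls in (previous_run, current_run].

    The window is open on the left and closed on the right, so a file stamped
    exactly at a run boundary is picked up by exactly one run.
    """
    return sorted(
        name for name, ts in updated_at.items() if previous_run < ts <= current_run
    )


# Hypothetical listing; in practice this would be built from the bucket contents.
current_run = datetime(2020, 3, 25, 14, 56, 0)
previous_run = current_run - timedelta(minutes=1)
listing = {
    "a.csv": datetime(2020, 3, 25, 14, 54, 30),  # handled by an earlier run
    "b.csv": datetime(2020, 3, 25, 14, 55, 10),  # arrived since the previous run
    "c.csv": datetime(2020, 3, 25, 14, 55, 59),  # arrived since the previous run
}
new_files = files_in_window(listing, previous_run, current_run)
print(new_files)
```

Each scheduled run would then process only `new_files`. This sketch does not detect the end of a batch by itself; it only avoids reprocessing files across runs.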

Preston Marshall

03/25/2020, 2:57 PM
Yeah, that's how I would do it in Airflow 😉 That's great that y'all are working on it. I will definitely be following.

emre

03/25/2020, 2:57 PM
I think this is a good use case for task looping. My initial idea is as follows:
Check for new files
• If there are new files:
  1. Do whatever you need to with these files
  2. sleep for x seconds
  3. raise LOOP() to search for new files once again.
• If there aren’t any new files, let the task succeed, and go on with your flow.
You could use this to not process any files, but just gather their names in a list in the loop payload. Then you can map over the list for each filename.
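The steps above can be sketched in plain Python, with the loop payload modelled as a dict that is passed from one iteration to the next. This is only an illustration of the control flow, not Prefect code: `loop_step`, `collect`, and the `"seen"` payload key are hypothetical names, and the `while` loop stands in for what `raise LOOP()` would do inside a task. The bucket listing and the sleep function are injected so the sketch runs without GCS.

```python
from typing import Callable, Dict, Iterable, List, Tuple


def loop_step(payload: Dict, current_names: Iterable[str]) -> Tuple[bool, Dict]:
    """One iteration: compare the current listing with the names gathered so far.

    Returns (loop_again, new_payload). loop_again is True when new files showed
    up, which is the point where a Prefect task would raise LOOP with the payload.
    """
    seen = list(payload.get("seen", []))
    new = sorted(set(current_names) - set(seen))
    if new:
        return True, {"seen": seen + new}
    # Nothing new since the last check: the batch is considered complete.
    return False, {"seen": seen}


def collect(
    list_names: Callable[[], Iterable[str]],
    sleep: Callable[[float], None],
    wait_seconds: float = 60.0,
) -> List[str]:
    """Stand-in for the looping task: repeat loop_step until a check finds nothing new."""
    payload: Dict = {}
    while True:
        loop_again, payload = loop_step(payload, list_names())
        if not loop_again:
            return payload["seen"]
        sleep(wait_seconds)  # "sleep for x seconds" before checking again


# Simulated bucket listings for three consecutive checks (hypothetical data).
snapshots = iter([["a.csv"], ["a.csv", "b.csv"], ["a.csv", "b.csv"]])
sleeps: List[float] = []  # records requested sleeps instead of actually waiting
batch = collect(lambda: next(snapshots), sleeps.append, wait_seconds=60.0)
print(batch)
```

With a 60 second wait, the loop ends on the first check that finds nothing new, which matches "until one doesn't arrive for 60 seconds". Note that if the very first check finds no files at all, this sketch returns an empty batch immediately. The returned list of names is what a downstream mapped task would receive.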

Jeremiah

03/25/2020, 2:58 PM
@emre’s suggestion is a good pure-Prefect approach; we’ve seen this used to mimic infinitely running flows.

Preston Marshall

03/25/2020, 3:00 PM
Do y'all have a way to follow progress on it? Like a GitHub issue?
@emre thanks I will ponder that 🙂

Jeremiah

03/25/2020, 3:55 PM
We’re working on better ways to publish our roadmap work - once the work starts, you’ll see issues and PRs tagged with [PIN 14].