Benoit Chabord

07/23/2022, 4:46 AM
Hi all, I have a 2.0 flow/task architecture question. My use case is to check for one or more files in an S3 bucket and, if there is something, execute a few tasks. This check is going to run every 5 minutes but will probably process files only once an hour on average. I separated my code/flows/tasks this way:
• A flow named "Check for files on S3" with one task downloading the files and returning the list of downloaded files
• If the list is not empty, a subflow is invoked with the list of files that were just downloaded
◦ A few tasks are executed, doing transformations and calling some APIs
◦ My last task deletes from S3 the files I just processed, if all of the previous tasks succeeded
The reason I used a subflow is so that I can easily filter for the flow runs that actually process files, instead of having a successful flow run every 5 minutes that may not contain any transformation. Is this an antipattern?
Also, I have to add a # type: ignore on every task call, because the task returns the type of the wrapped function, so I cannot call .wait() on it without mypy raising an issue. Not sure how we can do better, though.
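The control flow described above can be sketched in plain Python. This is only an illustration under stated assumptions, not the poster's code: the names `check_for_files`, `process_files`, `download`, `steps`, and `delete` are hypothetical, and no Prefect imports are used so the sketch runs anywhere. In Prefect 2.0 the two functions would presumably be decorated as flows and the injected callables as tasks.

```python
from typing import Callable, Iterable, List


def process_files(
    files: List[str],
    steps: Iterable[Callable[[List[str]], None]],
    delete: Callable[[List[str]], None],
) -> None:
    """Stand-in for the subflow: run every step, then delete the inputs."""
    for step in steps:
        # Any exception raised here propagates, so the delete below is skipped.
        step(files)
    delete(files)


def check_for_files(
    download: Callable[[], List[str]],
    steps: Iterable[Callable[[List[str]], None]],
    delete: Callable[[List[str]], None],
) -> bool:
    """Stand-in for the parent flow that is scheduled every 5 minutes."""
    files = download()
    if not files:
        # Nothing was downloaded: the processing function is never invoked,
        # so only polls that found files produce a processing run.
        return False
    process_files(files, steps, delete)
    return True
```

Because the processing function is only called for a non-empty list, filtering on its runs shows the polls that actually did work, which is the stated reason for the subflow.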

Anna Geller

07/23/2022, 1:33 PM
It looks like you're asking several questions here:
1. There is no antipattern in using a subflow instead of tasks; it depends on what you prefer. The main tradeoff is how complex your end solution may become. Given that you are implementing a polling pattern, this lends itself well to an event-driven or real-time use case, for which we will build more recipes very soon.
2. Is this solution working for you, or do you see some errors or issues?
3. I can't tell about mypy. Do you need to return a wrapped function? It seems complex for this use case, but it's for sure your decision.
Some resources:
• https://discourse.prefect.io/t/how-to-add-retries-when-processing-files-and-know-which-files-failed-to-get-processed/1201#solution-3
• This is for 1.0, but the patterns are still valid: https://discourse.prefect.io/t/is-there-an-equivalent-to-sensors-in-prefect-how-do-i-trigger-event-driven-workflows/76
Follow updates to this topic for best-practice recommendations for such use cases: https://discourse.prefect.io/t/event-driven-workflows-in-prefect-2-0-serverless-workflo[…]ed-based-on-some-action-rather-than-based-on-schedule/1054 (it's just a scaffold for now, though).

Cole Murray

07/23/2022, 6:34 PM
@Benoit Chabord Some general thoughts about the described workflow, not specific to Prefect. The architecture you described is somewhat inefficient, as you’re required to continually poll the server. To overcome this, you could switch to event-based processing, where your workflow is only triggered once you know the required conditions have been met (in this case, that there are files to download). Depending on the specifics of how the files are generated and have to be processed, you could make some of the following changes:
• If the files are not required to all be processed together, use S3 event notifications with a Lambda. This Lambda can then invoke your Prefect job with parameters based on the event.
• If all the files are required to be processed together, you need a way of determining when all the files are present. This can take a few forms:
◦ Publish a “Done” or .SUCCESS file, which is what triggers the event notification. This is done within the Hadoop ecosystem as a way of signifying completion of a multi-part write.
◦ (Inefficient) Have your Lambda get triggered on every file upload, and use logic to determine when the run condition has been met. This is better than polling because, if there are large gaps between your uploads, you aren’t wasting resources checking every 5 minutes.
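A minimal sketch of the Lambda side of this suggestion, assuming the standard S3 event notification payload (`Records[].s3.bucket.name` and `Records[].s3.object.key`). The function names, the `trigger_run` callable, and the `marker_name` parameter are hypothetical; how the Prefect run is actually started is left to whatever `trigger_run` does and is not shown here.

```python
from typing import Any, Callable, Dict, List, Optional
from urllib.parse import unquote_plus


def uploaded_objects(event: Dict[str, Any]) -> List[Dict[str, str]]:
    """Extract the bucket and key from each record of an S3 event notification."""
    objects = []
    for record in event.get("Records", []):
        s3 = record["s3"]
        objects.append({
            "bucket": s3["bucket"]["name"],
            # Object keys arrive URL-encoded in S3 notifications.
            "key": unquote_plus(s3["object"]["key"]),
        })
    return objects


def handle_event(
    event: Dict[str, Any],
    trigger_run: Callable[[Dict[str, str]], None],
    marker_name: Optional[str] = None,
) -> int:
    """Call trigger_run once per relevant upload and return how many fired.

    With marker_name=None every uploaded object triggers a run (files are
    processed independently). With a marker name, only an upload whose file
    name equals the marker triggers a run (files are processed together).
    """
    objects = uploaded_objects(event)
    if marker_name is not None:
        objects = [o for o in objects if o["key"].rsplit("/", 1)[-1] == marker_name]
    for obj in objects:
        trigger_run(obj)
    return len(objects)
```

Passing the marker file's name (for example `_SUCCESS`, the name Hadoop output committers write) as `marker_name` corresponds to the "Done file" variant; leaving it as `None` corresponds to the per-file variant.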

Benoit Chabord

07/24/2022, 4:02 AM
@Cole Murray Thanks for the detailed answer. I usually use an event-driven architecture, but in this case the S3 bucket is owned by a client, not me, and I have no way to get notified (this could also be an SFTP server, for example). I am trying to find the best way to implement a polling pattern in those cases: I want to be notified of network issues during polling, but to focus on processing success rather than on successful polls that find nothing to process.

Anna Geller

07/26/2022, 12:53 PM
Why do you have no way to get notified? It's a matter of having the right IAM role with S3 permissions to grant Lambda access to read from that bucket. But for SFTP, yes, polling in a sensor style seems the right way to approach it, as I shared on Discourse.