
Bruno Grande

08/05/2022, 6:07 PM
👋 Hello, everyone! I’m new to Prefect 2.0 and I’m trying to figure out the best way to tackle my (likely unconventional) data pipeline. Briefly, I have a pipeline for processing the files in a manifest, and the number of manifests will grow over time. Each manifest should be processed only once unless it’s updated. I’m wondering how to best handle the dynamic nature of my inputs (i.e. the file manifests) and limit the processing of each manifest to once per update. 🧵 I can elaborate a bit more in the thread.
I’m aware of caching, but I wonder if there are other elegant ways of achieving this.
I’m also wondering about the tradeoffs between having a flow that handles a single manifest (perhaps triggered by some other scheduled flow) versus a single flow that uses `.map()` extensively. The latter seems a bit harder to maintain, but I’m open to hearing evidence to the contrary.
Or is my idea of using `.map()` no longer applicable/necessary in Prefect 2.0?
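On the caching angle, here is a minimal sketch of one approach (assuming Prefect 2.x’s `cache_key_fn` hook, which receives the task run context and the call’s parameters): key the cache on the manifest’s content hash, so an unchanged manifest is a cache hit and an updated one is reprocessed. The function and parameter names are illustrative, not from this thread.

```python
import hashlib

def manifest_cache_key(context, parameters):
    # Hash the manifest file's bytes; identical content -> identical key,
    # so the previous (cached) task run can be reused.
    with open(parameters["path"], "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()

# Hedged usage sketch (assumes the Prefect 2.x task decorator):
# from prefect import task
#
# @task(cache_key_fn=manifest_cache_key)
# def process_manifest(path):
#     ...
```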

Keith

08/05/2022, 7:33 PM
In Prefect 2.0 you don't really need the `map()` function b/c you can do a normal Python loop. From my brief experience with 2.0 so far, I refactored all my 1.0 `map()` code out with `for ... in ...:` loops. As to your first question, can you retrieve the last-modified time from the operating system for each file? If so, you can record each file's modified time when you last processed it, compare the stored value to the current one at execution time, and only do the work if it has changed.
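To make that comparison concrete, here is a minimal sketch of the check in plain Python (the `seen` mapping is illustrative; in a real flow you'd persist it between runs, e.g. in a database or a Prefect block):

```python
import os

def needs_processing(path, seen):
    """Return True if `path` is new or modified since the mtime recorded in
    `seen` (a path -> mtime mapping), updating the record as a side effect."""
    mtime = os.path.getmtime(path)
    if seen.get(path) == mtime:
        return False  # unchanged since the last run; skip the work
    seen[path] = mtime
    return True
```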

Bruno Grande

08/05/2022, 8:46 PM
🙏 Thanks for helping out, @Keith! Especially since you don’t appear to be a Prefect employee. It’s good to hear that you’ve been able to move away from calling the `map()` function in favor of Python-native loops. That encourages me to rethink my approach. Here’s the general flow of data for each manifest file (CSV):
1. Download the manifest from the data repository using an authenticated request
2. Split the manifest into logical groups (to enable parallel processing downstream)
3. For each manifest chunk:
   a. Stage the files on the target compute platform
   b. Kick off a remote processing job on the target compute platform
4. Collate the output files for each processing job
5. Transfer these output files back into the original data repository
Given that Step 3 would result in several jobs being initiated, am I correct to think that I can use the `.submit()` method on tasks to kick off a bunch of parallel “mini-DAGs” (assuming that I use a concurrent/parallel task runner)? In terms of pseudo-code, I’m thinking of something like this (you can assume that all of the functions are Prefect tasks):
@flow
def data_pipeline(manifest_id):
    manifest = download_manifest(manifest_id)
    manifest_chunks = split_manifest(manifest)
    jobid_futures = list()
    for chunk in manifest_chunks:
        # .submit() returns a future immediately, so the chunks run concurrently
        inputs = stage_files.submit(chunk)
        jobid_futures.append(submit_processing_job.submit(chunk, inputs))
    # Wait for all jobs to finish processing
    all_outputs = list()
    for jobid_future in jobid_futures:
        jobid = jobid_future.result()  # blocks until that chunk's job task completes
        outputs = collate_outputs(jobid)
        all_outputs.append(outputs)
    upload_to_repo(all_outputs)

Keith

08/05/2022, 9:47 PM
I haven't used the task runners in 2.0 yet, but your pseudo-code makes sense based on the documentation here: https://docs.prefect.io/concepts/task-runners/#using-a-task-runner Based on the documentation for `result()`, I think that is actually where you will "wait for all jobs to finish processing", b/c you won't be able to get the result for a specific job until its `submit_processing_job` task is complete.

Bruno Grande

08/06/2022, 6:45 AM
Confirming that my pseudo-code (with tweaks to actually function) works as expected (see attached snippet). The separate “mini-DAGs” submitted during the for-loop were run concurrently: the following command takes 6-7 seconds to run instead of the 25+ seconds expected if the `sleep(5)` were run 5 times sequentially.
$ time pipenv run python test.py
[...]
23:40:57.117 | INFO    | Flow run 'mahogany-coyote' - Finished in state Completed()
11,12,13,14,15

________________________________________________________
Executed in   11.45 secs    fish           external
   usr time    6.79 secs  138.00 micros    6.79 secs
   sys time    1.27 secs  540.00 micros    1.27 secs