Bruno Grande
08/05/2022, 6:07 PM
…I've been using .map() extensively. The latter seems a bit harder to maintain, but I'm open to hearing evidence pointing to the contrary. Is .map() no longer applicable/necessary in Prefect 2.0?

Keith
08/05/2022, 7:33 PM
…in Prefect 2.0 there's no need for the map() function b/c you can do a normal Python loop. From my brief experience with 2.0 so far, I refactored all my 1.0 map() code out with for ... in ...: loops.
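
A minimal sketch of what that refactor can look like, assuming a Prefect 2.x release where task .submit() is available (the same API used later in this thread); the task and flow names here are hypothetical:

from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def process(item):
    # Stand-in for real per-item work
    return item * 2

@flow(task_runner=ConcurrentTaskRunner())
def loop_instead_of_map(items):
    # Each .submit() returns a future right away, so the loop fans out
    # the work much like .map() did in Prefect 1.0
    futures = [process.submit(item) for item in items]
    return [future.result() for future in futures]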
As to your first question, can you retrieve the last modified time from the operating system for each file? If you can, then you should be able to track the last time you collected each file and, at execution time, compare that to the current modified time; if it has changed, then you do work.
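
A rough sketch of the bookkeeping Keith describes, using plain Python and a JSON state file; the file name and helper function are hypothetical:

import json
import os

STATE_FILE = "last_collected.json"  # hypothetical cache of per-file timestamps

def files_with_new_changes(paths):
    state = {}
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as f:
            state = json.load(f)
    changed = []
    for path in paths:
        mtime = os.path.getmtime(path)  # last modified time from the OS
        if state.get(path, 0.0) < mtime:
            changed.append(path)  # modified since last collection, so do work
            state[path] = mtime
    with open(STATE_FILE, "w") as f:
        json.dump(state, f)
    return changed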
Bruno Grande
08/05/2022, 8:46 PM
Good to know that you refactored your map() calls out in favor of Python-native loops. That encourages me to rethink my approach.
Here’s the general flow of data for each manifest file (CSV):
1. Download manifest from data repository using authenticated request
2. Split manifest into logical groups (to enable parallel processing downstream)
3. For each manifest chunk:
a. Stage the files on the target compute platform
b. Kick off a remote processing job on the target compute platform
4. Collate the output files for each processing job
5. Transfer these output files back into the original data repository
Given that Step 3 would result in several jobs being initiated, am I correct to think that I can use the .submit()
method on tasks to kick off a bunch of parallel “mini-DAGs” (assuming that I use a concurrent/parallel task runner)? In terms of pseudo-code, I’m thinking of something like this (you can assume that all of the functions are Prefect tasks):
@flow
def data_pipeline(manifest_id):
    manifest = download_manifest(manifest_id)
    manifest_chunks = split_manifest(manifest)
    jobids = list()
    for chunk in manifest_chunks:
        inputs = stage_files.submit(chunk)
        jobid = submit_processing_job.submit(chunk, inputs)
        jobids.append(jobid)
    # Wait for all jobs to finish processing
    all_outputs = list()
    for jobid_future in jobids:
        jobid = jobid_future.result()
        outputs = collate_outputs(jobid)
        all_outputs.append(outputs)
    upload_to_repo(all_outputs)
Keith
08/05/2022, 9:47 PM
When you call result(), I think that is actually where you will "wait for all jobs to finish processing" b/c you won't be able to get the result for that specific job until submit_processing_job is complete.
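
To make that barrier concrete, here is a toy version of the flow above, under the same assumed Prefect 2.x API; the two tasks are stubs standing in for the real ones:

from prefect import flow, task

@task
def stage_files(chunk):
    # Stub standing in for the real staging task
    return f"staged-{chunk}"

@task
def submit_processing_job(chunk, inputs):
    # Stub standing in for the real job submission; returns a job ID
    return f"job-{chunk}"

@flow
def data_pipeline_demo(chunks):
    # Kick everything off first; futures passed as arguments become
    # upstream dependencies that Prefect resolves automatically
    jobid_futures = [
        submit_processing_job.submit(chunk, stage_files.submit(chunk))
        for chunk in chunks
    ]
    # Each .result() blocks until that task run completes, so this
    # comprehension is the implicit "wait for all jobs" barrier
    return [future.result() for future in jobid_futures]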
Bruno Grande
08/06/2022, 6:45 AM
…I confirmed this with a quick test: the flow finished in ~11 seconds, well under the 25+ seconds it would take if sleep(5) were run 5 times sequentially.
$ time pipenv run python test.py
[...]
23:40:57.117 | INFO | Flow run 'mahogany-coyote' - Finished in state Completed()
11,12,13,14,15
________________________________________________________
Executed in   11.45 secs    fish           external
   usr time    6.79 secs  138.00 micros    6.79 secs
   sys time    1.27 secs  540.00 micros    1.27 secs
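
The test script itself isn't shown, but given the printed 11,12,13,14,15 and the ~11-second wall time, it was presumably something along these lines (a hypothetical reconstruction, not the actual test.py):

import time
from prefect import flow, task

@task
def slow_add(x):
    time.sleep(5)  # run sequentially, five of these would take 25+ seconds
    return x + 10

@flow
def test_flow():
    futures = [slow_add.submit(x) for x in range(1, 6)]
    # Prints "11,12,13,14,15" once every task has finished
    print(",".join(str(future.result()) for future in futures))

if __name__ == "__main__":
    test_flow()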