Bruno Grande 08/05/2022, 6:07 PM
extensively. The latter seems a bit harder to maintain, but I'm open to hearing evidence to the contrary. Is that pattern no longer applicable/necessary in Prefect 2.0?
Keith 08/05/2022, 7:33 PM
function b/c you can do a normal Python loop. From my brief experience with 2.0 so far, I refactored all my 1.0 code out with `for ... in ...:` loops. As to your first question, can you retrieve the `last modified time` from the operating system for each file? If you can, then you should be able to track the last time you collected the file and, at execution time, compare that to the current modified time; if it has changed, then you do work.
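(Not from the thread, just to make the idea concrete: a minimal stdlib sketch of the mtime check Keith describes. The `needs_work` helper and the in-memory `last_seen` dict are hypothetical names; a real flow would persist the recorded mtimes between runs.)

```python
import os
import tempfile

def needs_work(path: str, last_seen: dict) -> bool:
    """Compare the file's current mtime to the one recorded at the
    last collection; only do work when the file has changed."""
    mtime = os.path.getmtime(path)
    if last_seen.get(path) == mtime:
        return False          # unchanged since last run: skip
    last_seen[path] = mtime   # record for the next run
    return True

# Demo with a throwaway file standing in for a manifest.
last_seen = {}
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as f:
    f.write("a,b\n")
    path = f.name

print(needs_work(path, last_seen))  # True  (first sighting)
print(needs_work(path, last_seen))  # False (unchanged since)
os.unlink(path)
```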
Bruno Grande 08/05/2022, 8:46 PM
function in favor of Python-native loops. That encourages me to rethink my approach. Here's the general flow of data for each manifest file (CSV):

1. Download the manifest from the data repository using an authenticated request
2. Split the manifest into logical groups (to enable parallel processing downstream)
3. For each manifest chunk:
   a. Stage the files on the target compute platform
   b. Kick off a remote processing job on the target compute platform
4. Collate the output files for each processing job
5. Transfer these output files back into the original data repository

Given that Step 3 would result in several jobs being initiated, am I correct to think that I can use the `submit` method on tasks to kick off a bunch of parallel "mini-DAGs" (assuming that I use a concurrent/parallel task runner)? In terms of pseudo-code, I'm thinking of something like this (you can assume that all of the functions are Prefect tasks):
```python
@flow
def data_pipeline(manifest_id):
    manifest = download_manifest(manifest_id)
    manifest_chunks = split_manifest(manifest)

    jobids = list()
    for chunk in manifest_chunks:
        inputs = stage_files.submit(chunk)
        jobid = submit_processing_job.submit(chunk, inputs)
        jobids.append(jobid)

    # Wait for all jobs to finish processing
    all_outputs = list()
    for jobid_future in jobids:
        jobid = jobid_future.result()
        outputs = collate_outputs(jobid)
        all_outputs.append(outputs)

    upload_to_repo(all_outputs)
```
Keith 08/05/2022, 9:47 PM
I think that is actually where you will "wait for all jobs to finish processing" b/c you won't be able to get the result for that specific job until the future for that job resolves.
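(Editor's illustration, not from the thread: the blocking behavior Keith describes is the same as with standard-library futures, which this sketch uses as a stand-in for Prefect's task futures. `processing_job` is a made-up placeholder task.)

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Stand-in for a Prefect task: simulate a remote processing job.
def processing_job(job_number: int) -> int:
    time.sleep(0.1)  # pretend this takes a while
    return job_number

with ThreadPoolExecutor(max_workers=5) as pool:
    # Fan out: all five "jobs" start running concurrently.
    futures = [pool.submit(processing_job, n) for n in range(5)]

    # Fan in: each .result() call blocks until THAT job has
    # finished, so this loop is where the waiting happens.
    results = [f.result() for f in futures]

print(results)  # [0, 1, 2, 3, 4]
```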
Bruno Grande 08/06/2022, 6:45 AM
were run 5 times sequentially.
```
$ time pipenv run python test.py
[...]
23:40:57.117 | INFO    | Flow run 'mahogany-coyote' - Finished in state Completed()
11,12,13,14,15

________________________________________________________
Executed in   11.45 secs      fish           external
   usr time    6.79 secs    138.00 micros    6.79 secs
   sys time    1.27 secs    540.00 micros    1.27 secs
```