I'm having trouble figuring out how to run multiple linear "subflows" in parallel with .map. I would like to:
1. start a batch of worker instances
2. download data on each instance
3. run a job on each instance
obviously, the job for a given instance shouldn't start until the data is downloaded onto that instance. however, I don't want to wait until the data is downloaded onto all of the instances before kicking off any jobs.
Brian Keating
05/05/2021, 12:14 AM
here's a minimal example:
from prefect import Flow, Parameter, unmapped, task, Task
from prefect.triggers import all_finished
@task
def start_workers(num_workers=1):
    return [f'id-{num}' for num in range(num_workers)]

class RunCommandOnWorker(Task):
    def run(self, worker_id, cmd, dummy_data=None):
        pass

with Flow('test') as flow:
    # Parameters
    num_workers = Parameter('Number of workers', default=1)
    worker_ids = start_workers(num_workers)

    # Download data
    download_cmds = ['aws s3 cp s3://data .']
    download_data_to_instances = RunCommandOnWorker(name='download data')
    download_statuses = download_data_to_instances.map(worker_ids, unmapped(download_cmds))

    # Run jobs
    run_job_cmds = ['pip3 install scikit-learn']
    run_jobs_on_instances = RunCommandOnWorker(name='run jobs')
    job_statuses = run_jobs_on_instances.map(worker_ids, unmapped(run_job_cmds), upstream_tasks=download_statuses)
this fails with "TypeError: Task is not iterable"
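A likely fix, sketched under the assumption of Prefect 1.x mapping semantics: upstream_tasks expects a list (hence the TypeError when a bare task result is passed), and a mapped task in that list is paired element-wise with the new map rather than reduced, unless wrapped in unmapped. Reusing the names from the example above:

job_statuses = run_jobs_on_instances.map(
    worker_ids,
    unmapped(run_job_cmds),
    upstream_tasks=[download_statuses],  # a list, so it is iterable; paired per-worker in Prefect 1.x
)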
Brian Keating
05/05/2021, 12:15 AM
if I add a dummy data dependency between the mapped tasks by replacing the last line with

job_statuses = run_jobs_on_instances.map(worker_ids, unmapped(run_job_cmds), dummy_data=download_statuses)
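Threading the mapped result through the otherwise unused dummy_data argument should achieve the same per-worker chaining, since arguments passed to .map are paired element-wise unless wrapped in unmapped; run() simply ignores the value.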