Brian Keating
05/05/2021, 12:13 AM
I have a question about .map. I would like to:
1. start a batch of worker instances
2. download data on each instance
3. run a job on each instance
Obviously, the job for a given instance shouldn't start until the data is downloaded onto that instance. However, I don't want to wait until the data is downloaded onto all of the instances before kicking off jobs.

Brian Keating
05/05/2021, 12:14 AM
from prefect import Flow, Parameter, unmapped, task, Task
from prefect.triggers import all_finished

@task
def start_workers(num_workers=1):
    return [f'id-{num}' for num in range(num_workers)]

class RunCommandOnWorker(Task):
    def run(self, worker_id, cmd, dummy_data=None):
        pass

with Flow('test') as flow:
    # Parameters
    num_workers = Parameter('Number of workers', default=1)
    worker_ids = start_workers(num_workers)

    # Download data
    download_cmds = ['aws s3 cp s3://data .']
    download_data_to_instances = RunCommandOnWorker(name='download data')
    download_statuses = download_data_to_instances.map(worker_ids, unmapped(download_cmds))

    # Run jobs
    run_job_cmds = ['pip3 install scikit-learn']
    run_jobs_on_instances = RunCommandOnWorker(name='run jobs')
    job_statuses = run_jobs_on_instances.map(worker_ids, unmapped(run_job_cmds), upstream_tasks=download_statuses)
This fails with "TypeError: Task is not iterable".
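(A minimal repro sketch of the error above, not taken from the thread: it assumes the Prefect 1.x / Core API, where calling or mapping a task returns a single Task object and Task's __iter__ deliberately raises this TypeError. The task names below are hypothetical.)

from prefect import Flow, task

@task
def upstream():
    return 1

@task
def downstream():
    return 2

with Flow('repro') as flow:
    up = upstream()
    # Works: upstream_tasks expects an iterable (a list) of Task objects.
    ok = downstream(upstream_tasks=[up])
    # Fails while the flow is being built: `up` is a single Task, and iterating
    # a Task raises "TypeError: Task is not iterable".
    # bad = downstream(upstream_tasks=up)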
Brian Keating
05/05/2021, 12:15 AM
If I instead do

job_statuses = run_jobs_on_instances.map(worker_ids, unmapped(run_job_cmds), dummy_data=download_statuses)

then it's working as I would like. Is there a way to do the same thing without introducing dummy data?

Kevin Kho
job_statuses = run_jobs_on_instances.map(worker_ids, unmapped(run_job_cmds), upstream_tasks=[download_statuses])
Kevin Kho
You just need to wrap download_statuses in a list.
Brian Keating
05/05/2021, 12:59 AM
If download_statuses is mappable as a data input, why isn't it iterable as upstream_tasks?

Kevin Kho
upstream_tasks takes a List[Task], even if it's just one task. The mapped task is not a Python iterable; it's still of type Task.
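(A self-contained sketch of the suggested fix, not from the thread: the names are simplified stand-ins for the flow above, and it assumes Prefect 1.x, whose Task.map docstring describes upstream_tasks as dependencies to map over, so each job child should wait only on its own download child rather than on all downloads.)

from prefect import Flow, Parameter, task, unmapped

@task
def start_workers(num_workers=1):
    return [f'id-{num}' for num in range(num_workers)]

@task
def run_command(worker_id, cmd):
    # simplified stand-in for RunCommandOnWorker
    pass

with Flow('per-worker dependencies') as flow:
    worker_ids = start_workers(Parameter('Number of workers', default=2))
    downloads = run_command.map(worker_ids, unmapped('aws s3 cp s3://data .'))
    # upstream_tasks must be a list, even with a single Task inside. Because it
    # is passed to .map, the dependency is mapped as well: job i waits for
    # download i, not for every download to finish.
    jobs = run_command.map(
        worker_ids,
        unmapped('pip3 install scikit-learn'),
        upstream_tasks=[downloads],
    )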
Kevin Kho
Brian Keating
05/05/2021, 1:06 AM
@Kevin Kho