I'm having trouble figuring out how to run multipl...
# ask-community
b
I'm having trouble figuring out how to run multiple linear "subflows" in parallel with .map. I would like to:
1. start a batch of worker instances
2. download data on each instance
3. run a job on each instance
Obviously, the job for a given instance shouldn't start until the data is downloaded onto that instance. However, I don't want to wait until the data is downloaded onto all of the instances before kicking off jobs.
here's a minimal example:
from prefect import Flow, Parameter, unmapped, task, Task
from prefect.triggers import all_finished


@task
def start_workers(num_workers=1):
    return [f'id-{num}' for num in range(num_workers)]


class RunCommandOnWorker(Task):
    def run(self, worker_id, cmd, dummy_data=None):
        pass


with Flow('test') as flow:
    
    # Parameters
    num_workers = Parameter('Number of workers', default=1)
    worker_ids = start_workers(num_workers)

    # Download data
    download_cmds = ['aws cp s3://data .']
    download_data_to_instances = RunCommandOnWorker(name='download data')
    download_statuses = download_data_to_instances.map(worker_ids, unmapped(download_cmds))

    # Run jobs
    run_job_cmds = ['pip3 install scikit-learn']
    run_jobs_on_instances = RunCommandOnWorker(name='run jobs')
    job_statuses = run_jobs_on_instances.map(worker_ids, unmapped(run_job_cmds), upstream_tasks=download_statuses)
this fails with "TypeError: Task is not iterable"
If I add a dummy data dependency between the mapped tasks by replacing the last line with
job_statuses = run_jobs_on_instances.map(worker_ids, unmapped(run_job_cmds), dummy_data=download_statuses)
then it works as I would like. Is there a way to do the same thing without introducing dummy data?
k
Hi @Brian Keating, can you try
job_statuses = run_jobs_on_instances.map(worker_ids, unmapped(run_job_cmds), upstream_tasks=[download_statuses])
Note the list around download_statuses.
b
I think that did the trick, thanks @Kevin Kho. I can't say that I understand it, though: if download_statuses is mappable as a data input, why isn't it iterable as upstream_tasks?
k
upstream_tasks takes a List[Task], even if there is just one task. The mapped task is not a Python iterable; it is still of type Task.
download_statuses is not a List. It is still a Task because Task is the class that allows for deferred execution inside the Flow.
You can trace what Task.map() returns in the code and you'll notice it's not a List.
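To make the distinction concrete, here is a rough toy model in plain Python. It is only a sketch: ToyTask and collect_upstream are hypothetical names made up for illustration, not Prefect's actual classes or internals. It assumes the error comes from code looping over the upstream_tasks argument, and shows why wrapping the mapped task in a list avoids it.

```python
class ToyTask:
    """Hypothetical stand-in for a deferred task; NOT Prefect's real Task class."""

    def __init__(self, name):
        self.name = name

    def __iter__(self):
        # A deferred task has no results while the flow is being built,
        # so this toy refuses iteration outright.
        raise TypeError("Task is not iterable")

    def map(self, *args):
        # Mapping returns ONE new task object, not a list of per-item results.
        return ToyTask(f"{self.name} (mapped)")


def collect_upstream(upstream_tasks):
    # Hypothetical helper: loops over its argument, the way a
    # list-typed parameter is typically consumed.
    return [t.name for t in upstream_tasks]


download_statuses = ToyTask("download data").map(["id-0", "id-1"])

try:
    # Bare task: the loop tries to iterate the task itself and fails.
    collect_upstream(download_statuses)
except TypeError as exc:
    error_message = str(exc)

# Wrapped in a list: the loop iterates the list and sees a single task.
names = collect_upstream([download_statuses])
```

In this toy model the mapped result is a single object at build time, so it can be handed to another mapped call as a data input, but anything expecting a list of tasks needs it wrapped in one.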
b
thanks for the insight!
k
Anytime. Hope that helps.