Fina Silva-Santisteban
11/11/2021, 10:36 PM
How do I use the .map() function when using the imperative Prefect API? (The docs only show examples with the functional API.)
I’ve tried using the mapped argument but that doesn’t seem to be right 🤔:
flow.set_dependencies(
    upstream_tasks=task_that_returns_a_dictionary,
    task=task_that_should_do_something_for_a_single_element,
    mapped=True,
    keyword_tasks=dict(single_element=task_that_returns_a_dictionary)
)
This is the error message:
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
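For reference, the functional-API mapping pattern the docs describe looks roughly like this (a minimal sketch assuming Prefect 1.x; the task names are made up):

from prefect import task, Flow

@task
def get_numbers():
    return [1, 2, 3, 4]

@task
def plus_one(x):
    return x + 1

with Flow("functional-map-sketch") as flow:
    numbers = get_numbers()
    plus_one.map(numbers)  # one mapped child run per element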
Kevin Kho
from prefect import Task, Flow
import prefect

class RunMeFirst(Task):
    def run(self):
        return [1, 2, 3, 4]

class PlusOneTask(Task):
    def run(self, x):
        prefect.context.logger.info(x)
        return x + 1

flow = Flow('My Imperative Flow')

run = RunMeFirst()
plus_one = PlusOneTask()

flow.set_dependencies(
    plus_one,
    upstream_tasks=[run],
    keyword_tasks=dict(x=run),
    mapped=True
)
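Running the imperative example above locally fans plus_one out over the four elements returned by RunMeFirst; a minimal sketch (the executor choice here is an assumption, not part of the thread):

from prefect.executors import LocalDaskExecutor

# execute the flow defined above; each mapped child logs one element of [1, 2, 3, 4]
state = flow.run(executor=LocalDaskExecutor(scheduler="threads"))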
Fina Silva-Santisteban
11/12/2021, 2:40 PM
Ah, you put the upstream_tasks inside an array, the way it’s always done when you set_dependencies!! I tried again and got a different error:
(...) in prepare_upstream_states_for_mapping
    value = upstream_state.result[i]
KeyError: 0
So I compared examples and realized yours uses a list and mine uses a dictionary. Switching to a list finally made it work! I had the impression the map-reduce concept works with dictionaries as well? I’m still new to most of this so any insights you have would be greatly appreciated! 🙏
Kevin Kho
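As the traceback shows, mapping indexes the upstream result by position (upstream_state.result[i]), which is why a dictionary raises KeyError: 0. One way to fan out over a dictionary is to turn it into a list of items first; a minimal sketch assuming Prefect 1.x, with made-up task names:

from prefect import task, Flow

@task
def get_mapping():
    return {"a": 1, "b": 2}

@task
def to_items(d):
    # convert to a list so mapped children can be indexed by position
    return list(d.items())

@task
def handle(pair):
    key, value = pair
    return f"{key}={value}"

with Flow("map-over-dict-sketch") as flow:
    handle.map(to_items(get_mapping()))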
Fina Silva-Santisteban
11/12/2021, 4:47 PM
We’re using the LocalDaskExecutor with threads=4, and we’re using the docker agent and docker storage, so the flow is running on our AWS infra. I was expecting the UI to show that it’s running 4 runs at the same time, but (from limited anecdotal experience gathered an hour ago) I’ve only seen 2 runs running at the same time. Can you provide more info around that? (Even if the map-reduce process is taking up one thread, shouldn’t there be 3 available threads to work with?) 🤔
Kevin Kho
LocalDaskExecutor(num_workers=4, scheduler="threads"). Not sure if the threads kwarg will work, but I’m sure num_workers does.
Fina Silva-Santisteban
11/12/2021, 5:08 PM
Fina Silva-Santisteban
11/12/2021, 5:13 PM
I checked the DaskExecutor class and it seems like when num_workers isn’t set explicitly it takes the number of available CPUs, which is what I was expecting it to do (and it does do that in general, I can see how it works on 4 prefect tasks at the same time in the UI, just the mapping doesn’t seem to run in separate threads). (I’m running the updated version, I’ll let you know how that goes!)
Kevin Kho
num_workers should help you, though it is weird that you already see 4 tasks running at the same time in the UI.
Fina Silva-Santisteban
11/12/2021, 6:33 PM
Setup without Map/Reduce:
• flow.executor = LocalDaskExecutor(scheduler="threads")
• Workers used: ECS default value (2 workers)
• No Map/Reduce, just a regular for loop inside a task.
• Execution time: 55 minutes
Setup with Map/Reduce:
• AWS Fargate
• cpu: 2048
• memory: 4096
• flow.executor = LocalDaskExecutor(scheduler="threads")
• Workers used: ECS default value (2 workers)
• Using Map/Reduce.
• Execution time: 55 minutes
Setup with Map/Reduce and increased `num_workers`:
• AWS Fargate
• cpu: 2048
• memory: 4096
• flow.executor = LocalDaskExecutor(num_workers=4, scheduler="threads")
• Using Map/Reduce.
• Workers used: 4 workers.
• Execution time: 25 minutes
Setup with Map/Reduce, increased num_workers again and increased cpu and memory:
• AWS Fargate
• cpu: 4 vCPU
• memory: 8192
• flow.executor = LocalDaskExecutor(num_workers=8, scheduler="threads")
• Using Map/Reduce.
• Workers used: 8 workers.
• Execution time: 14 minutes.
Isn’t this incredible?! I hope this will be helpful for anyone trying to optimize their flows. 🙂
Kevin Kho
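For reference, the last setup roughly translates to the following configuration (a sketch; the ECSRun run config shown here is an assumption based on the Fargate/cpu/memory bullets above):

from prefect.executors import LocalDaskExecutor
from prefect.run_configs import ECSRun

# 8 worker threads inside the single Fargate task
flow.executor = LocalDaskExecutor(num_workers=8, scheduler="threads")

# 4 vCPU / 8192 MB, matching the fastest benchmark above (ECSRun assumed)
flow.run_config = ECSRun(cpu="4 vcpu", memory="8192")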
Fina Silva-Santisteban
11/12/2021, 7:09 PM
num_workers=8 😅
Kevin Kho