# ask-community
f:
@Kevin Kho @Anna Geller how do I use Prefect’s `.map()` function when using the imperative Prefect API? (The docs only show examples with the functional API.) I’ve tried using the `mapped` argument, but that doesn’t seem to be right 🤔:
```python
flow.set_dependencies(
    upstream_tasks=task_that_returns_a_dictionary,
    task=task_that_should_do_something_for_a_single_element,
    mapped=True,
    keyword_tasks=dict(single_element=task_that_returns_a_dictionary)
)
```
This is the error message:
```
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
```
Kevin Kho:
Been trying this for a bit. I think it’s like this:
```python
from prefect import Task, Flow
import prefect

class RunMeFirst(Task):
    def run(self):
        return [1,2,3,4]

class PlusOneTask(Task):
    def run(self, x):
        prefect.context.logger.info(x)
        return x + 1

flow = Flow('My Imperative Flow')

run = RunMeFirst()
plus_one = PlusOneTask()

flow.set_dependencies(
    plus_one,
    upstream_tasks=[run],
    keyword_tasks=dict(x=run),
    mapped=True
)
```
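For anyone trying that example locally, a quick sanity check might look like the following (a minimal sketch assuming Prefect 1.x; running with a local threaded executor is just one option):

```python
from prefect.executors import LocalDaskExecutor

# Run the flow in-process; the state entry for the mapped task holds one result per child run.
state = flow.run(executor=LocalDaskExecutor(scheduler="threads"))
print(state.result[plus_one].result)  # expected: [2, 3, 4, 5]
```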
f:
@Kevin Kho oh I see, I forgot to put the `upstream_tasks` inside an array, the way it’s always done when you `set_dependencies`!! I tried again and got a different error:
```
(...) in prepare_upstream_states_for_mapping
    value = upstream_state.result[i]
KeyError: 0
```
So I compared examples and realized yours uses a list and mine uses a dictionary. Switching to a list finally made it work! I had the impression the map-reduce concept works with dictionaries as well? I’m still new to most of this so any insights you have would be greatly appreciated! 🙏
Kevin Kho:
I don’t believe it will work for a dict. I think mapping needs to be done over a list, so you need an intermediate task to pull out the keys or values.
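That intermediate task might look something like this (a minimal sketch against the Prefect 1.x imperative API; the class and flow names are made up for illustration):

```python
from prefect import Task, Flow

class ReturnsDict(Task):
    def run(self):
        return {"a": 1, "b": 2, "c": 3}

class DictValues(Task):
    # Intermediate step: turn the dict into a list so a downstream task can map over it.
    def run(self, d):
        return list(d.values())

class PlusOne(Task):
    def run(self, x):
        return x + 1

flow = Flow("map-over-dict-values")
make_dict, to_values, plus_one = ReturnsDict(), DictValues(), PlusOne()

# dict -> list of values (not mapped), then map over the resulting list.
flow.set_dependencies(to_values, upstream_tasks=[make_dict], keyword_tasks=dict(d=make_dict))
flow.set_dependencies(plus_one, upstream_tasks=[to_values], keyword_tasks=dict(x=to_values), mapped=True)
```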
f:
@Kevin Kho thank you for the info! Yes, I did exactly that, and the mapping works! We’re using the `LocalDaskExecutor` with `threads=4`, and we’re using the Docker agent and Docker storage, so the flow is running on our AWS infra. I was expecting the UI to show that it’s running 4 runs at the same time, but (from limited anecdotal experience gathered an hour ago) I’ve only seen 2 runs running at the same time. Can you provide more info around that? (Even if the map-reduce process is taking up one thread, shouldn’t there be 3 available threads to work with?) 🤔
Kevin Kho:
Try `LocalDaskExecutor(num_workers=4, scheduler="threads")`. Not sure if the `threads` kwarg will work, but I’m sure `num_workers` does.
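Attached to a flow, that would look roughly like this (assuming Prefect 1.x, where the executor lives under `prefect.executors`):

```python
from prefect.executors import LocalDaskExecutor

# num_workers caps how many mapped task runs execute concurrently on the threaded scheduler.
flow.executor = LocalDaskExecutor(num_workers=4, scheduler="threads")
```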
f:
Thanks @Kevin Kho! I’ve updated my code and I’m giving that a try. I’ve compared the execution time for my flow with and without map-reduce and the time is about the same, which really makes me think something’s up with the executor setup (I very much hope so!!). I’ll let you know how the updated version goes!
@Kevin Kho I’ve just checked Prefect’s `DaskExecutor` class, and it seems like when `num_workers` isn’t set explicitly it takes the number of available CPUs, which is what I was expecting it to do (and it does do that in general; I can see in the UI how it works on 4 Prefect tasks at the same time, it’s just the mapping that doesn’t seem to run in separate threads). (I’m running the updated version, I’ll let you know how that goes!)
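One quick way to confirm the ceiling the executor would pick up by default (a standard-library sketch, not something from the thread; you’d log this from inside a task on the actual infrastructure):

```python
import multiprocessing

# When num_workers isn't set, the threaded scheduler defaults to the CPU count,
# so this shows what the container actually exposes (e.g. 2 on a Fargate task with cpu=2048).
print(multiprocessing.cpu_count())
```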
Kevin Kho:
I normally see this with ECS, where it uses 2 threads by default. I think `num_workers` should help you, though it is weird that you already see 4 tasks running at the same time in the UI.
f:
@Kevin Kho That’s very helpful!! I think you’re right about the threads! I’ve checked out a few of our other flows and flow runs, and I think the ‘dots’ look like they’re worked on in batches of four, but it’s really just two at a time! (wishful thinking maybe? 😅) I’ve done some task execution time benchmarking with different setups that I’d like to share with you:

‘Original’ setup:
• AWS Fargate
• cpu: 2048
• memory: 4096
• `flow.executor = LocalDaskExecutor(scheduler="threads")`
• Workers used: ECS default value (2 workers)
• No map/reduce, just a regular for loop inside a task
• Execution time: 55 minutes

Setup with map/reduce:
• AWS Fargate
• cpu: 2048
• memory: 4096
• `flow.executor = LocalDaskExecutor(scheduler="threads")`
• Workers used: ECS default value (2 workers)
• Using map/reduce
• Execution time: 55 minutes

Setup with map/reduce and increased `num_workers`:
• AWS Fargate
• cpu: 2048
• memory: 4096
• `flow.executor = LocalDaskExecutor(num_workers=4, scheduler="threads")`
• Workers used: 4 workers
• Using map/reduce
• Execution time: 25 minutes

Setup with map/reduce, increased `num_workers` again, and increased cpu and `memory`:
• AWS Fargate
• cpu: 4 vCPU
• memory: 8192
• `flow.executor = LocalDaskExecutor(num_workers=8, scheduler="threads")`
• Workers used: 8 workers
• Using map/reduce
• Execution time: 14 minutes

Isn’t this incredible?! I hope this will be helpful for anyone trying to optimize their flows. 🙂
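For reproducibility, the fastest setup above might be wired up like this (a sketch; it assumes Prefect 1.x with an `ECSRun` run config, which the thread itself doesn’t show since the flow uses Docker storage and a Docker agent):

```python
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import ECSRun

# Fargate task size from the last benchmark: 4 vCPU / 8 GB memory.
flow.run_config = ECSRun(cpu="4 vcpu", memory="8192")
# Eight threads so mapped task runs can actually use the extra CPUs.
flow.executor = LocalDaskExecutor(num_workers=8, scheduler="threads")
```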
Kevin Kho:
How did the last one get to 8 workers? 😅
f:
@Kevin Kho ah sorry!! Fixed the code snippet to say `num_workers=8` 😅
Kevin Kho:
Ah ok, that makes sense 👍