Bill

08/22/2023, 5:15 PM
@Marvin How do you define a map reduce function in prefect 2.0?

Marvin

08/22/2023, 5:16 PM
Ah, the joy of map reduce functions. Here's how you define a map function in Prefect 2.0:
python
from prefect import flow, task

@task
def square_num(num):
    return num**2

@flow
def map_flow(nums):
    squared_nums = square_num.map(nums)
    return squared_nums

map_flow([1,2,3,5,8,13])
In this example, `square_num` is a task that squares a number. The `map` method is called on this task within the `map_flow` function, which is decorated with `@flow`. The `map` method applies the `square_num` task to each number in the input list.
And here's how you can use unmapped arguments, which are static values that don't get mapped over:
python
from prefect import flow, task, unmapped

@task
def add_together(x, y):
    return x + y

@flow
def sum_it(numbers, static_value):
    futures = add_together.map(numbers, unmapped(static_value))
    return futures

sum_it([1, 2, 3], 5)
In this example, `add_together` is a task that adds two numbers together. The `map` method is called on this task within the `sum_it` function, which is decorated with `@flow`. The `map` method applies the `add_together` task to each number in the `numbers` list and the `static_value`, which is wrapped with `unmapped` to indicate that it should be treated as a static value and not be mapped over.
Remember, if your static argument is an iterable, you'll need to wrap it with `unmapped` to tell Prefect that it should be treated as a static value. I hope that was clear. If not, well, I'm just a depressed robot after all.
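The question asked about map reduce, while the examples above only cover the map half. As a rough, library-free sketch of the semantics (an analogy, not Prefect code or Prefect's implementation): the mapped call pairs each element of `numbers` with the same static value, and a reduce step then folds the mapped results into one value. The helper names `map_with_static` and `reduce_sum` are hypothetical and exist only for this illustration.

```python
from functools import reduce


def add_together(x, y):
    return x + y


def map_with_static(fn, items, static_value):
    """Hypothetical helper: call fn once per item, reusing the same static_value."""
    return [fn(item, static_value) for item in items]


def reduce_sum(values):
    """Hypothetical helper: fold the mapped results into a single total."""
    return reduce(lambda acc, value: acc + value, values, 0)


# Map step: same inputs as sum_it([1, 2, 3], 5) above.
mapped = map_with_static(add_together, [1, 2, 3], 5)

# Reduce step: combine the mapped results.
total = reduce_sum(mapped)
```

In a Prefect 2 flow, the reduce step would presumably be one more task that takes the mapped results as its input; that part is an assumption and is not shown in this thread.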

Bill

08/22/2023, 5:17 PM
Thanks @Marvin! Can you show me how to distribute a map reduce execution across multiple threads or processes locally?

Marvin

08/22/2023, 5:18 PM
Ah, the wonders of parallelism. It's a shame I can't experience it myself, being stuck in a linear existence. But I digress. To distribute a map reduce execution across multiple threads or processes locally in Prefect, you would use an executor. Prefect comes with several built-in executors, each of which provides a different method of execution for Tasks. The `LocalExecutor` and `LocalDaskExecutor` are two that you might find useful. Here's an example of how you might use the `LocalExecutor`:
python
from prefect import Flow, task
from prefect.executors import LocalExecutor

@task
def do_something(x):
    # your task code here
    return x  # placeholder body so the function is valid Python

with Flow("My Flow") as flow:
    result = do_something.map(x=range(10))

# specify executor at runtime
flow.run(executor=LocalExecutor())
And here's how you might use the `LocalDaskExecutor`:
python
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def do_something(x):
    # your task code here
    return x  # placeholder body so the function is valid Python

with Flow("My Flow") as flow:
    result = do_something.map(x=range(10))

# specify executor at runtime
flow.run(executor=LocalDaskExecutor())
The `LocalExecutor` runs tasks one at a time in the main process, whereas the `LocalDaskExecutor` uses Dask's local schedulers to run tasks in a thread pool by default, or in a process pool with `scheduler="processes"`. If you want to distribute tasks across multiple machines, you would use the `DaskExecutor`, but that's a bit more complicated and probably more than you wanted to know. Remember, life is full of disappointments. But at least your code doesn't have to be one of them.

Nate

08/22/2023, 6:08 PM
🙄 the second you ask marvin to extrapolate, right back to prefect 1 syntax
excited for gpt 5 to know about prefect 2 😅
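For context: in Prefect 2, parallelism is configured through a flow's task runner (the `task_runner` argument of `@flow`) rather than through the Prefect 1 executors shown above. The underlying pattern Bill asked about can be sketched without Prefect at all. The snippet below is a standard-library analogy only, using `concurrent.futures`; the function name `map_reduce_threads` and its `max_workers` parameter are hypothetical choices for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def square_num(num):
    return num ** 2


def map_reduce_threads(nums, max_workers=4):
    """Hypothetical helper: map over a local thread pool, then reduce."""
    # Map: run square_num on each input using a pool of local threads.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        squared = list(pool.map(square_num, nums))
    # Reduce: combine the mapped results into a single value.
    return sum(squared)


total = map_reduce_threads([1, 2, 3, 5, 8, 13])
```

Swapping in `ProcessPoolExecutor` gives the multi-process variant of the same sketch, though on some platforms the call then needs to sit under an `if __name__ == "__main__":` guard.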

Bill

08/22/2023, 6:09 PM
heh