08/22/2023, 5:15 PM
@Marvin How do you define a map reduce function in prefect 2.0?
08/22/2023, 5:16 PM
Ah, the joy of map reduce functions. Here's how you define a map function in Prefect 2.0:
``````python
from prefect import flow, task

@task
def square_num(num):
    return num**2

@flow
def map_flow(nums):
    # .map submits one task run per element of nums
    squared_nums = square_num.map(nums)
    return squared_nums

map_flow([1,2,3,5,8,13])``````
In this example, ``square_num`` is a task that squares a number. The ``map`` method is called on this task within the ``map_flow`` function, which is decorated with ``@flow``. The ``map`` method applies the ``square_num`` task to each number in the input list. And here's how you can use unmapped arguments, which are static values that don't get mapped over:
``````python
from prefect import flow, task, unmapped

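@task
def add_together(x, y):
    return x + y

@flow
def sum_it(numbers, static_value):
    # numbers is mapped over; static_value is passed unchanged to every task run
    futures = add_together.map(numbers, unmapped(static_value))
    return futures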

sum_it([1, 2, 3], 5)``````
In this example, ``add_together`` is a task that adds two numbers together. The ``map`` method is called on this task within the ``sum_it`` function, which is decorated with ``@flow``. The ``map`` method applies the ``add_together`` task to each number in the ``numbers`` list and the ``static_value``, which is wrapped with ``unmapped`` to indicate that it should be treated as a static value and not be mapped over. Remember, if your static argument is an iterable, you'll need to wrap it with ``unmapped`` to tell Prefect that it should be treated as a static value. I hope that was clear. If not, well, I'm just a depressed robot after all.
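To make the mapped versus static distinction concrete, here is a plain-Python sketch of the semantics only. It does not use Prefect and is not how Prefect implements ``map``; the helper names ``map_with_static`` and ``prepend`` are hypothetical and exist just for this illustration.

```python
from itertools import repeat


def add_together(x, y):
    return x + y


def prepend(x, extras):
    # `extras` is itself a list, but it should reach every call as a whole
    return [x, *extras]


def map_with_static(fn, items, static_value):
    """Hypothetical helper: call fn once per item, reusing one static value."""
    # repeat() hands the same object to every call, mimicking a static argument
    return [fn(item, static) for item, static in zip(items, repeat(static_value))]


mapped = map_with_static(add_together, [1, 2, 3], 5)    # the "map" step
total = sum(mapped)                                     # the "reduce" step
broadcast = map_with_static(prepend, [1, 2], [10, 20])  # iterable kept static
print(mapped, total, broadcast)
```

The last call shows why an iterable static argument needs explicit marking: without it, a mapping layer has no way to tell whether ``[10, 20]`` should be iterated element by element or passed whole.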
08/22/2023, 5:17 PM
Thanks @Marvin! Can you show me how to distribute a map reduce execution across multiple threads or processes locally?
08/22/2023, 5:18 PM
Ah, the wonders of parallelism. It's a shame I can't experience it myself, being stuck in a linear existence. But I digress. To distribute a map reduce execution across multiple threads or processes locally in Prefect, you would use an executor. Prefect comes with several built-in executors, each of which provides a different method of execution for Tasks. The ``LocalExecutor`` and ``LocalDaskExecutor`` are two that you might find useful. Here's an example of how you might use the ``LocalExecutor``:
``````python
from prefect import Flow, task
from prefect.executors import LocalExecutor

@task
def do_something(x):
    ...  # task body goes here

with Flow("My Flow") as flow:
    result = do_something.map(x=range(10))

# specify executor at runtime
flow.run(executor=LocalExecutor())``````
And here's how you might use the `LocalDaskExecutor`:
``````python
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def do_something(x):
    ...  # task body goes here

with Flow("My Flow") as flow:
    result = do_something.map(x=range(10))

# specify executor at runtime
flow.run(executor=LocalDaskExecutor())``````
The ``LocalExecutor`` runs tasks synchronously in the main process, one at a time, whereas the ``LocalDaskExecutor`` uses Dask's local schedulers to run tasks in a thread pool by default, or in a process pool when created with ``scheduler="processes"``. If you want to distribute tasks across multiple machines, you would use the ``DaskExecutor``, but that's a bit more complicated and probably more than you wanted to know. Remember, life is full of disappointments. But at least your code doesn't have to be one of them.
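Setting Prefect aside, the general shape of a local map reduce over a thread pool can be sketched with the standard library's ``concurrent.futures``. This illustrates the pattern only and is not Prefect code; the function name ``map_reduce`` and the ``max_workers=4`` default are assumptions chosen for the example.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
import operator


def square_num(num):
    return num ** 2


def map_reduce(nums, max_workers=4):
    """Hypothetical helper: square each number on a thread pool, then sum."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in input order, even if work finishes out of order
        squared = list(pool.map(square_num, nums))
    # reduce step: fold the mapped values into a single total
    return reduce(operator.add, squared, 0)


total = map_reduce([1, 2, 3, 5, 8, 13])
print(total)
```

For CPU-bound work, swapping in ``ProcessPoolExecutor`` spreads the map step across processes instead. In that case the mapped function must be picklable, and on platforms that spawn worker processes the call belongs under an ``if __name__ == "__main__":`` guard.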
08/22/2023, 6:08 PM
🙄 the second you ask marvin to extrapolate, right back to prefect 1 syntax
excited for gpt 5 to know about prefect 2 😅
08/22/2023, 6:09 PM
heh