datamongus
08/20/2024, 10:25 PM
Marvin
08/20/2024, 10:26 PM
Use the `.map` and `.submit` methods in Prefect to pass arguments to tasks. Here’s how you can do it:
Using `.map`:
The `.map` method automatically creates a task run for each element of its inputs. Here are some code examples:
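from prefect import flow, task

@task
def print_nums(nums):
    for n in nums:
        print(n)

@task
def square_num(num):
    return num**2

@flow
def map_flow(nums):
    print_nums(nums)
    # One square_num task run per element; .map returns a list of futures.
    squared_nums = square_num.map(nums)
    # Futures passed to a task are resolved to their results before it runs.
    print_nums(squared_nums)

map_flow([1, 2, 3, 5, 8, 13])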
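To make "one task run per element" concrete, here is a rough analogy that uses only Python's standard library `concurrent.futures`, not Prefect itself. The helper names `map_like` and `run_demo` are hypothetical and exist only for illustration; Prefect's real implementation does considerably more than this.

```python
from concurrent.futures import ThreadPoolExecutor


def square_num(num):
    return num ** 2


def map_like(executor, fn, items):
    """Hypothetical helper: submit fn once per element and return the futures."""
    return [executor.submit(fn, item) for item in items]


def run_demo(nums):
    with ThreadPoolExecutor() as pool:
        futures = map_like(pool, square_num, nums)
        # Resolving the futures in list order yields results in input order.
        return [future.result() for future in futures]


print(run_demo([1, 2, 3, 5, 8, 13]))
```

With Prefect futures, the analogous step is calling `.result()` on each future returned by `.map`.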
You can also pass static values that don't get mapped:
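from prefect import flow, task

@task
def add_together(x, y):
    return x + y

@flow
def sum_it(numbers, static_value):
    # numbers is mapped element by element; the non-iterable static_value
    # is reused unchanged for every task run.
    futures = add_together.map(numbers, static_value)
    return futures

sum_it([1, 2, 3], 5)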
Using `.submit`:
The `.submit` method hands a task run to the flow's task runner and returns a future immediately, so the flow is not blocked while the task runs. Here’s how you can do it:
from prefect import flow, task
import time

@task
def sleep(t: int):
    print(f"Going to sleep for {t} seconds...")
    time.sleep(t)
    print("...and I'm awake!")

@flow(log_prints=True)
def my_flow():
    sleep(3)  # block the flow for 3 seconds (not concurrently)
    future = sleep.submit(2)  # submit a task run to the task runner, doesn't block flow
    future.wait()  # block the flow until the submitted task run is complete
    sleep.map([5] * 10)  # submit 10 at once, each sleeps for 5 seconds, don't block

if __name__ == "__main__":
    my_flow()
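The difference between submitting and waiting can be seen with a small timing sketch. It uses the standard library's `ThreadPoolExecutor` as a stand-in for a task runner, so it is an analogy rather than Prefect code; `nap`, `submit_then_wait`, and `run_batch` are hypothetical names, and the sleep durations are shortened for convenience.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def nap(seconds):
    time.sleep(seconds)
    return seconds


def submit_then_wait(seconds=0.2):
    """Return (time spent submitting, time until the result was available)."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        start = time.perf_counter()
        future = pool.submit(nap, seconds)  # returns a future right away
        after_submit = time.perf_counter() - start
        future.result()  # blocks until nap has finished
        after_result = time.perf_counter() - start
    return after_submit, after_result


def run_batch(count=10, seconds=0.1):
    """Submit `count` naps at once and return the total wall-clock time."""
    with ThreadPoolExecutor(max_workers=count) as pool:
        start = time.perf_counter()
        futures = [pool.submit(nap, seconds) for _ in range(count)]
        for future in futures:
            future.result()
        return time.perf_counter() - start


after_submit, after_result = submit_then_wait()
print(f"submit returned after {after_submit:.3f}s, result after {after_result:.3f}s")
print(f"10 concurrent naps took {run_batch():.3f}s")
```

Submitting costs almost nothing; the time is spent only where the code asks for the result. Ten naps submitted together finish in roughly the time of one, because they run concurrently.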
For more details:
- Documentation on Map:
https://docs.prefect.io/latest/concepts/tasks/
- Additional Examples:
https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/tasks/submitting.py
https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/tasks/basic.py
datamongus
08/20/2024, 10:28 PM
Marvin
08/20/2024, 10:28 PM
To use `.map` to call tasks that accept multiple arguments, you can pass those arguments as lists. You can also wrap static values in `unmapped`. Here’s how you can do it:
Example with multiple dynamic arguments:
from prefect import flow, task

@task
def process_values(x, y, z):
    return x + y + z

@flow
def main_flow(x_vals, y_vals, z_vals):
    result = process_values.map(x=x_vals, y=y_vals, z=z_vals)
    return result

main_flow([1, 2, 3], [4, 5, 6], [7, 8, 9])
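The mapped lists are paired up by position (first elements together, then second elements, and so on), not combined as a Cartesian product. A plain-Python sketch of that pairing, using `zip` instead of Prefect; `pairwise_results` is a hypothetical name, and the `ValueError` on mismatched lengths is this sketch's own choice:

```python
def process_values(x, y, z):
    return x + y + z


def pairwise_results(x_vals, y_vals, z_vals):
    """Combine the i-th element of each list, mirroring element-wise mapping."""
    if not (len(x_vals) == len(y_vals) == len(z_vals)):
        raise ValueError("mapped inputs must all have the same length")
    return [process_values(x, y, z) for x, y, z in zip(x_vals, y_vals, z_vals)]


print(pairwise_results([1, 2, 3], [4, 5, 6], [7, 8, 9]))
```

Three inputs of length three therefore produce three results, one per position.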
Example with static and dynamic arguments using `unmapped`:
from prefect import flow, task, unmapped

@task
def process_values(x, y, z):
    return x + y + z

@flow
def main_flow(x_vals, y_vals, static_z):
    result = process_values.map(x=x_vals, y=y_vals, z=unmapped(static_z))
    return result

main_flow([1, 2, 3], [4, 5, 6], 10)
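One way to picture what a wrapper like `unmapped` does is as a marker that tells the mapping step "reuse this value for every call, even if it is a list". The following is a simplified model in plain Python, not Prefect's implementation: `Static` and `expand_calls` are hypothetical names, and the rule "lists and tuples are mapped, everything else is static" is an assumption made for this sketch.

```python
class Static:
    """Hypothetical stand-in for a wrapper like `unmapped`."""

    def __init__(self, value):
        self.value = value


def expand_calls(**kwargs):
    """Build one dict of keyword arguments per mapped element."""
    # Assumption for this sketch: only lists and tuples are mapped over.
    mapped = {k: v for k, v in kwargs.items() if isinstance(v, (list, tuple))}
    static = {
        k: (v.value if isinstance(v, Static) else v)
        for k, v in kwargs.items()
        if k not in mapped
    }
    lengths = {len(v) for v in mapped.values()}
    if len(lengths) != 1:
        raise ValueError("need at least one mapped argument, all of equal length")
    (n,) = lengths
    return [{**{k: v[i] for k, v in mapped.items()}, **static} for i in range(n)]


def process_values(x, y, z):
    return x + y + z


calls = expand_calls(x=[1, 2, 3], y=[4, 5, 6], z=Static(10))
results = [process_values(**call) for call in calls]
print(results)
```

The wrapper matters most when the static value is itself a list: in this model, `expand_calls(x=[1, 2], y=Static([9, 9]))` passes the whole list `[9, 9]` to every call instead of splitting it across calls.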
These examples show how to pass dynamic lists of values directly and static values by wrapping them in `unmapped`.
For further details, you can check the Prefect documentation on Tasks.