Mark Reyes
06/13/2024, 6:04 AM

Marvin
06/13/2024, 6:05 AM
```python
from prefect import task, flow


@task
def my_task(x):
    return x * 2


@task
def aggregate(results):
    return sum(results)


@flow
def my_flow():
    # Map my_task over a list of items; this returns one future per input
    mapped_results = my_task.map([1, 2, 3, 4])
    # Passing the futures as an argument makes aggregate wait for all of them
    # and receive their resolved values (a plain list of ints)
    aggregated_result = aggregate(mapped_results)
    # Calling a task directly inside a flow returns its value, not a future
    return aggregated_result


if __name__ == "__main__":
    result = my_flow()
    print(f"Final Result: {result}")
```
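### Key Points:
- Mapping Tasks: Use `.map()` to apply a task to multiple inputs.
- Waiting for Tasks: A task waits for any upstream futures passed to it as arguments; use the `wait_for` keyword to declare dependencies on task completions when no data is passed between the tasks.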
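To see what that dependency buys you, here is a rough analogue using only Python's standard `concurrent.futures` module, with a thread pool standing in for Prefect's task runner. It is a sketch of the pattern under that assumption, not Prefect's API; having `my_flow(items)` take its inputs as a parameter is a hypothetical change for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def my_task(x):
    # Same logic as the Prefect task, written as a plain function
    return x * 2


def aggregate(results):
    return sum(results)


def my_flow(items):
    with ThreadPoolExecutor() as executor:
        # Rough analogue of my_task.map(items): one future per input
        futures = [executor.submit(my_task, x) for x in items]
        # Resolve every future before the downstream step runs
        values = [future.result() for future in futures]
    # aggregate only ever sees finished values, never pending futures
    return aggregate(values)


if __name__ == "__main__":
    print(f"Final Result: {my_flow([1, 2, 3, 4])}")
```

Running it prints `Final Result: 20`, since 2 + 4 + 6 + 8 = 20.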
### References:
- Task Concepts in Prefect
- Task Runners in Prefect
For more complex examples, or if you want to dive deeper into async workflows, you might want to check this code example.

Mark Reyes
06/13/2024, 6:07 AM
`mapped_results`

Marvin
06/13/2024, 6:07 AM
`mapped_results` without aggregating them in the same `wait_for`:
```python
from prefect import task, flow


@task
def my_task(x):
    return x * 2


@flow
def my_flow():
    # Map my_task over a list of items; this returns one future per input
    mapped_results = my_task.map([1, 2, 3, 4])
    # Wait for each mapped task to complete, one at a time
    for future in mapped_results:
        # Blocks until this task run finishes and returns its final State
        final_state = future.wait()
        print(f"Task completed with result: {final_state.result()}")


if __name__ == "__main__":
    my_flow()
```
### Key Points:
- Mapping Tasks: Use `.map()` to apply a task to multiple inputs.
- Waiting for Tasks: Use the `.wait()` method to block until each task completes, then call `.result()` on the returned state to retrieve its value.
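Waiting on each future individually also lets you inspect each outcome separately, for example to report a failed run instead of stopping at the first error. The sketch below mimics that pattern with the standard library's `concurrent.futures` (an analogy, not Prefect code): `future.exception()` plays a role similar to checking whether a task's final state failed. The negative-input failure and the `(status, value)` report format are hypothetical additions.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def my_task(x):
    # Hypothetical failure case, added only to show per-future inspection
    if x < 0:
        raise ValueError(f"negative input: {x}")
    return x * 2


def my_flow(items):
    report = []
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(my_task, x) for x in items]
        # With no timeout, wait() returns only after every future has finished
        wait(futures)
        for future in futures:
            error = future.exception()  # None if the call succeeded
            if error is None:
                report.append(("ok", future.result()))
            else:
                report.append(("failed", str(error)))
    return report
```

For example, `my_flow([1, -2, 3])` returns `[("ok", 2), ("failed", "negative input: -2"), ("ok", 6)]`: the failure in the middle does not hide the other two results.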
### References:
- Task Concepts in Prefect
- Task Runners in Prefect

Mark Reyes
06/13/2024, 6:07 AM

Marvin
06/13/2024, 6:07 AM
```python
from prefect import task, flow


@task
def my_task(x):
    return x * 2


@flow
def my_flow():
    # Map my_task over a list of items
    input_data = [1, 2, 3, 4]
    mapped_results = my_task.map(input_data)
    # List to store results in input order
    results = []
    # .map() returns futures in input order, so iterating over them
    # collects results in that order regardless of which task finishes first
    for future in mapped_results:
        final_state = future.wait()
        results.append(final_state.result())
    return results


if __name__ == "__main__":
    result = my_flow()
    print(f"Final Result: {result}")
```
### Key Points:
- Maintaining Order: Collect results in a list in the same order as the input data.
- Using `.wait()`: Blocks the flow until each task has completed before its result is collected.
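The ordering comes from iterating the futures in the order they were created, not from the order in which the tasks finish. This standard-library sketch (a thread pool as a stand-in for Prefect's task runner, not Prefect code) gives later inputs shorter sleeps so they tend to finish first, then collects results both ways. `as_completed()` appears only for contrast; `run_mapped` and the delay scheme are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def my_task(x, delay):
    time.sleep(delay)  # later inputs get shorter delays, so they tend to finish first
    return x * 2


def run_mapped(input_data):
    n = len(input_data)
    delays = [0.05 * (n - i) for i in range(n)]
    with ThreadPoolExecutor(max_workers=max(1, n)) as executor:
        futures = [executor.submit(my_task, x, d) for x, d in zip(input_data, delays)]
        # as_completed() yields futures as they finish: completion order
        completion_order = [f.result() for f in as_completed(futures)]
        # Iterating the original list of futures keeps input order
        input_order = [f.result() for f in futures]
    return input_order, completion_order
```

`input_order` is always `[2, 4, 6, 8]` for inputs `[1, 2, 3, 4]`; `completion_order` contains the same values but usually reversed, and that order is not guaranteed.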
### References:
- Task Concepts in Prefect
- Results in Prefect
This approach returns the results in the same order as the input data, because the futures returned by `.map()` are iterated in input order.

Mark Reyes
06/13/2024, 6:30 AM

Marvin
06/13/2024, 6:30 AM
`submit` method. To wait for their results, you can use the `wait` method on each task's future object. Here's an example:
```python
from prefect import task, flow


@task
def task_a():
    # Simulated task logic
    return "Result from task_a"


@task
def task_b():
    # Simulated task logic
    return "Result from task_b"


@flow
def my_flow():
    # Submit tasks to run concurrently; .submit() returns a future immediately
    future_a = task_a.submit()
    future_b = task_b.submit()
    # Wait for tasks to complete and retrieve results
    result_a = future_a.wait().result()
    result_b = future_b.wait().result()
    return result_a, result_b


if __name__ == "__main__":
    result = my_flow()
    print(f"Final Results: {result}")
```
### Key Points:
- Concurrent Execution: Use `.submit()` to start task runs concurrently.
- Waiting for Results: Use `.wait()` on the future objects to wait for task completion, and `.result()` to get the actual results.
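Waiting on a future can also be bounded. The sketch below uses Python's standard `concurrent.futures` as an analogy (it does not describe Prefect's own timeout handling): both functions are submitted to a thread pool, and the flow waits for `task_b` for at most `timeout` seconds. The 0.5-second sleep in `task_b` and the `timeout` parameter are hypothetical additions.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeout


def task_a():
    return "Result from task_a"


def task_b():
    time.sleep(0.5)  # hypothetical slow task
    return "Result from task_b"


def my_flow(timeout=None):
    with ThreadPoolExecutor(max_workers=2) as executor:
        # Both calls return futures immediately; the work runs in the background
        future_a = executor.submit(task_a)
        future_b = executor.submit(task_b)
        # result() with no timeout blocks until task_a has finished
        result_a = future_a.result()
        try:
            # Wait at most `timeout` seconds for task_b (forever if None)
            result_b = future_b.result(timeout=timeout)
        except FuturesTimeout:
            result_b = None  # gave up waiting; task_b itself is not cancelled
    # Leaving the with block shuts the pool down, which still waits for task_b
    return result_a, result_b
```

`my_flow()` returns both results, while `my_flow(timeout=0.01)` returns `("Result from task_a", None)` because `task_b` is still sleeping when the wait expires.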
### References:
- Task Runners in Prefect
- Prefect Task Concepts
This method runs `task_a` and `task_b` concurrently, then waits for each task to complete and retrieves their results in the order specified.

Mark Reyes
06/13/2024, 6:32 AM

Marvin
06/13/2024, 6:33 AM
```python
from prefect import task, flow


@task
def task_a(arg):
    # Simulated task logic
    return f"Result from task_a with arg: {arg}"


@task
def task_b(arg):
    # Simulated task logic
    return f"Result from task_b with arg: {arg}"


@flow
def my_flow():
    # Define arguments for each task
    arg_a = 1
    arg_b = 2
    # Submit tasks with arguments to run concurrently
    future_a = task_a.submit(arg_a)
    future_b = task_b.submit(arg_b)
    # Wait for tasks to complete and retrieve results
    result_a = future_a.wait().result()
    result_b = future_b.wait().result()
    return result_a, result_b


if __name__ == "__main__":
    result = my_flow()
    print(f"Final Results: {result}")
```
### Key Points:
- Passing Arguments: Argument values are passed in the `.submit()` call.
- Concurrent Execution: Tasks run concurrently because `.submit()` returns immediately instead of blocking until the task finishes; the flow only blocks when it explicitly waits.
- Wait for Completion: Use `.wait()` on the future object to ensure the task has completed before accessing its result.
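The claim that submitted tasks actually overlap can be checked directly. In this standard-library sketch (a thread pool standing in for Prefect's task runner, not Prefect code), both functions wait on a shared `threading.Barrier` that only opens once two threads have reached it, so the flow can only return if `task_a` and `task_b` are running at the same time. The `barrier` parameter is a hypothetical addition for this demonstration; the other arguments are forwarded by `submit()` just as in the example above.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def task_a(arg, barrier):
    # Blocks until task_b also reaches the barrier (or the 5 s timeout expires)
    barrier.wait()
    return f"Result from task_a with arg: {arg}"


def task_b(arg, barrier):
    barrier.wait()
    return f"Result from task_b with arg: {arg}"


def my_flow(arg_a=1, arg_b=2):
    # Two parties: both tasks must be running at once to get past the barrier
    barrier = threading.Barrier(2, timeout=5)
    with ThreadPoolExecutor(max_workers=2) as executor:
        # Arguments after the function are forwarded to it when it runs
        future_a = executor.submit(task_a, arg_a, barrier)
        future_b = executor.submit(task_b, arg_b, barrier)
        return future_a.result(), future_b.result()
```

With `max_workers=1` the two calls would run one after the other, and the first `barrier.wait()` would raise `threading.BrokenBarrierError` after 5 seconds.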
This approach allows you to run `task_a` and `task_b` concurrently with their respective arguments and then collect their results once both have completed.