Mark Reyes
06/13/2024, 6:04 AM
Marvin
06/13/2024, 6:05 AM
```python
from prefect import task, flow


@task
def my_task(x):
    return x * 2


@task
def aggregate(results):
    return sum(results)


@flow
def my_flow():
    # Map my_task over a list of items
    mapped_results = my_task.map([1, 2, 3, 4])

    # Pass the mapped futures as the argument so aggregate receives their
    # resolved values; wait_for additionally makes the dependency explicit
    aggregated_result = aggregate(mapped_results, wait_for=mapped_results)

    # Calling a task directly inside a flow returns its result, not a future
    return aggregated_result


if __name__ == "__main__":
    result = my_flow()
    print(f"Final Result: {result}")
```
### Key Points:
- Mapping Tasks: Use `.map` to apply a task to multiple inputs.
- Waiting for Tasks: Use the `wait_for` keyword to specify dependencies on task completions.
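As a Prefect-free illustration of the map-then-aggregate pattern, the sketch below uses Python's standard `concurrent.futures` module. It is only an analogy and not Prefect's API: `executor.submit` and `concurrent.futures.wait` stand in for `.map` and `wait_for`, and the names `double` and `run_pipeline` are made up for this example.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def double(x):
    # Stand-in for my_task
    return x * 2


def run_pipeline(items):
    with ThreadPoolExecutor(max_workers=4) as executor:
        # "Map": submit one call per input item
        futures = [executor.submit(double, item) for item in items]
        # Explicit dependency: block until every upstream future is done
        wait(futures)
        # Aggregate only after all upstream work has finished
        return sum(f.result() for f in futures)


if __name__ == "__main__":
    print(f"Final Result: {run_pipeline([1, 2, 3, 4])}")
```

The explicit `wait` call is redundant here because `f.result()` already blocks, but it mirrors the idea of declaring a dependency separately from passing data.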
### References:
- Task Concepts in Prefect
- Task Runners in Prefect
For more complex examples or if you want to dive deeper into async workflows, you might want to check this code example.
Mark Reyes
06/13/2024, 6:07 AM
`mapped_results`
Marvin
06/13/2024, 6:07 AM
`mapped_results` without aggregating them in the same `wait_for`:
```python
from prefect import task, flow


@task
def my_task(x):
    return x * 2


@flow
def my_flow():
    # Map my_task over a list of items
    mapped_results = my_task.map([1, 2, 3, 4])

    # Wait for each mapped task to complete
    for result in mapped_results:
        # wait() returns the task run's final state (Prefect 2.x behavior)
        final_state = result.wait()
        print(f"Task completed with result: {final_state.result()}")


if __name__ == "__main__":
    my_flow()
```
### Key Points:
- Mapping Tasks: Use `.map` to apply a task to multiple inputs.
- Waiting for Tasks: Use the `.wait()` method to wait for each task's completion, then call `.result()` on the returned state to retrieve its result.
### References:
- Task Concepts in Prefect
- Task Runners in Prefect
Mark Reyes
06/13/2024, 6:07 AM
Marvin
06/13/2024, 6:07 AM
```python
from prefect import task, flow


@task
def my_task(x):
    return x * 2


@flow
def my_flow():
    # Map my_task over a list of items
    input_data = [1, 2, 3, 4]
    mapped_results = my_task.map(input_data)

    # List to store results in input order
    results = []

    # The futures are iterated in the order .map returned them, so the
    # collected results line up with input_data
    for result in mapped_results:
        # wait() returns the task run's final state (Prefect 2.x behavior)
        final_state = result.wait()
        results.append(final_state.result())

    return results


if __name__ == "__main__":
    result = my_flow()
    print(f"Final Result: {result}")
```
### Key Points:
- Maintaining Order: Collect results in a list in the same order as the input data.
- Using `.wait()`: Ensures each task has completed before its result is collected.
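To make the ordering point concrete, here is a minimal sketch using the standard library's `concurrent.futures` rather than Prefect, so it is an analogy and not Prefect's API. The helper names `slow_double` and `collect_in_input_order` are hypothetical. The sleeps are arranged so that later inputs tend to finish first, yet iterating over the futures list still yields results in input order.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_double(x):
    # Larger inputs sleep less, so they tend to finish before smaller ones
    time.sleep(0.05 * max(0, 5 - x))
    return x * 2


def collect_in_input_order(items):
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(slow_double, item) for item in items]
        # Iterate the futures in submission order, not completion order;
        # each result() call blocks until that particular future is done
        return [f.result() for f in futures]


if __name__ == "__main__":
    print(collect_in_input_order([1, 2, 3, 4]))
```

Collecting by completion order instead (for example with `concurrent.futures.as_completed`) would not give this guarantee.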
### References:
- Task Concepts in Prefect
- Results in Prefect
This approach returns the results in the same order as the input data, because the loop iterates over the futures in the order `.map` returned them rather than in the order the tasks finish.
Mark Reyes
06/13/2024, 6:30 AM
Marvin
06/13/2024, 6:30 AM
`submit` method. To wait for their results, you can use the `wait` method on each task's future object. Here's an example:
```python
from prefect import task, flow


@task
def task_a():
    # Simulated task logic
    return "Result from task_a"


@task
def task_b():
    # Simulated task logic
    return "Result from task_b"


@flow
def my_flow():
    # Submit tasks to run concurrently
    future_a = task_a.submit()
    future_b = task_b.submit()

    # Wait for tasks to complete and retrieve results;
    # wait() returns the final state (Prefect 2.x behavior)
    result_a = future_a.wait().result()
    result_b = future_b.wait().result()

    return result_a, result_b


if __name__ == "__main__":
    result = my_flow()
    print(f"Final Results: {result}")
```
### Key Points:
- Concurrent Execution: Use `.submit()` to initiate concurrent task runs.
- Waiting for Results: Use `.wait()` on the future objects to wait for task completion and `.result()` to get the actual results.
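The submit-then-wait pattern can also be sketched without Prefect, using the standard library's `concurrent.futures` as a stand-in (an analogy, not Prefect's API). The function name `run_both_concurrently` and the use of `threading.Barrier` are assumptions added for illustration: the barrier only releases when both tasks are running at the same time, so the sketch would fail with a timeout if the tasks ran one after the other.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_both_concurrently():
    # Both tasks must reach the barrier together to proceed; if they ran
    # sequentially, barrier.wait() would time out and raise an error
    barrier = threading.Barrier(2, timeout=5)

    def task_a():
        barrier.wait()
        return "Result from task_a"

    def task_b():
        barrier.wait()
        return "Result from task_b"

    with ThreadPoolExecutor(max_workers=2) as executor:
        # Submit both before waiting on either, mirroring .submit()
        future_a = executor.submit(task_a)
        future_b = executor.submit(task_b)
        # Block on each future in turn, mirroring waiting on each future
        return future_a.result(), future_b.result()


if __name__ == "__main__":
    print(f"Final Results: {run_both_concurrently()}")
```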
### References:
- Task Runners in Prefect
- Prefect Task Concepts
This method runs `task_a` and `task_b` concurrently, then waits for each task to complete, retrieving their results in the order specified.
Mark Reyes
06/13/2024, 6:32 AM
Marvin
06/13/2024, 6:33 AM
```python
from prefect import task, flow


@task
def task_a(arg):
    # Simulated task logic
    return f"Result from task_a with arg: {arg}"


@task
def task_b(arg):
    # Simulated task logic
    return f"Result from task_b with arg: {arg}"


@flow
def my_flow():
    # Define arguments for each task
    arg_a = 1
    arg_b = 2

    # Submit tasks with arguments to run concurrently
    future_a = task_a.submit(arg_a)
    future_b = task_b.submit(arg_b)

    # Wait for tasks to complete and retrieve results;
    # wait() returns the final state (Prefect 2.x behavior)
    result_a = future_a.wait().result()
    result_b = future_b.wait().result()

    return result_a, result_b


if __name__ == "__main__":
    result = my_flow()
    print(f"Final Results: {result}")
```
### Key Points:
- Passing Arguments: Argument values are passed during the `.submit()` call.
- Concurrent Execution: Tasks run concurrently because `.submit()` does not block; the flow blocks only when a future is explicitly waited on.
- Wait for Completion: Use `.wait()` on the future object to ensure the task has completed before accessing its result.
This approach allows you to run `task_a` and `task_b` concurrently with their respective arguments and then wait for each result once both have been submitted.