KG
07/09/2024, 5:58 PM
Marvin
07/09/2024, 5:58 PM
```python
from collections import namedtuple

from prefect import flow, task

# Define a namedtuple called 'Item'
Item = namedtuple('Item', ['id', 'name'])


@task
def process_item(item):
    return f"Processed item: {item.name}"


@flow
def my_flow():
    # Create a list of namedtuples
    items = [Item(id=1, name='Item 1'), Item(id=2, name='Item 2'), Item(id=3, name='Item 3')]
    # Submit tasks for each item and store the futures
    results = [process_item.submit(item) for item in items]
    # Gather results after all tasks are completed
    completed_results = [result.result() for result in results]
    print("All tasks completed successfully")
    # Handle the results
    for result in completed_results:
        print(result)


if __name__ == "__main__":
    my_flow()
```
This flow creates a list of namedtuples, submits a `process_item` task for each one, waits for all tasks to complete, and then prints the result for each processed item.
For reference, see the following example code sources:
- Prefect task submission example
- Prefect basic flow example
KG
07/09/2024, 6:37 PM
Marvin
07/09/2024, 6:37 PM
```python
from collections import namedtuple

from prefect import flow, task

# Define a namedtuple called 'Item'
Item = namedtuple('Item', ['id', 'name'])


@task
def process_item(item):
    return f"Processed item: {item.name}"


@task
def process_result(result):
    return f"Further processed result: {result}"


@flow
def my_flow():
    # Create a list of namedtuples
    items = [Item(id=1, name='Item 1'), Item(id=2, name='Item 2'), Item(id=3, name='Item 3')]
    # Submit tasks for each item and store the futures
    processed_results = [process_item.submit(item) for item in items]
    # Submit taskb (process_result) for each processed result and store the futures;
    # result.result() blocks until the corresponding process_item run has finished
    further_processed_results = [process_result.submit(result.result()) for result in processed_results]
    # Gather final results after all tasks are completed
    completed_results = [result.result() for result in further_processed_results]
    print("All tasks completed successfully")
    # Handle the results
    for result in completed_results:
        print(result)


if __name__ == "__main__":
    my_flow()
```
### Reference
- Prefect chaining tasks example
This setup ensures that taskb (here named `process_result`) runs on the output of each `process_item` task. Tasks are chained within the `my_flow` function using the `submit` method, and their results are handled in sequence.