James Ashby
07/17/2024, 7:30 PM
Marvin
07/17/2024, 7:31 PM
```python
from prefect import flow


# Define a sample subflow
@flow
def subflow(index: int):
    if index % 2 == 0:
        return f"Subflow {index} - Succeeded"
    else:
        raise Exception(f"Subflow {index} - Failed")


# Define the parent flow
@flow
def parent_flow(subflow_count: int):
    results = {}
    for i in range(subflow_count):
        try:
            # Run the subflow; a failed subflow re-raises its exception here
            subflow_result = subflow.with_options(name=f"Subflow {i}")(i)
            results[i] = (subflow_result, "Succeeded")
        except Exception as e:
            results[i] = (str(e), "Failed")

    # Print the results
    for i, (result, status) in results.items():
        print(f"Subflow {i}: {status} - {result}")

    # Return a value so the parent's final state is not derived from the
    # failed subflow runs (assumes Prefect 2.x return-value semantics)
    return results


# Run the parent flow with a specified number of subflows
parent_flow(5)
```
Explanation:
- Subflows: Defined using the `@flow` decorator. The example subflow raises an exception for odd-indexed runs to simulate failure.
- Parent Flow: Runs each subflow and captures the result or exception. The results are stored in a dictionary with their status for later evaluation.
You can expand on this example to cover more complex cases or use Prefect’s features to manage state and logging more effectively.
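As one way to do the "later evaluation" mentioned above, here is a minimal plain-Python sketch that summarizes a results dictionary of the same `{index: (result, status)}` shape. The helper name `summarize_results` and the sample data are assumptions made for illustration; they are not part of Prefect.

```python
from collections import Counter


def summarize_results(results):
    """Count outcomes in a {index: (result, status)} dict (hypothetical helper)."""
    counts = Counter(status for _, status in results.values())
    failed = sorted(i for i, (_, status) in results.items() if status == "Failed")
    return {
        "succeeded": counts["Succeeded"],
        "failed": counts["Failed"],
        "failed_indices": failed,
    }


# Sample data in the same shape that parent_flow builds
sample = {
    0: ("Subflow 0 - Succeeded", "Succeeded"),
    1: ("Subflow 1 - Failed", "Failed"),
    2: ("Subflow 2 - Succeeded", "Succeeded"),
}
summary = summarize_results(sample)
print(summary)
```

A summary like this could be logged at the end of the parent flow, or used to decide whether the parent should raise.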
For more details, you can refer to the Prefect documentation:
- Concepts: Flows: https://docs.prefect.io/latest/concepts/flows/
- Composing Flows Example: https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/flows/basic.py
Feel free to modify the code as needed to match your specific use case.
James Ashby
07/17/2024, 7:45 PM
Marvin
07/17/2024, 7:46 PM
You can use the `return_state=True` and `state.result(raise_on_failure=False)` functionalities in Prefect. This will allow you to handle the state of each subflow run directly.
Here's an example of how you can achieve this:
```python
from prefect import flow


# Define a sample subflow
@flow
def subflow(index: int):
    if index % 2 == 0:
        return f"Subflow {index} - Succeeded"
    else:
        raise Exception(f"Subflow {index} - Failed")


# Define the parent flow
@flow
def parent_flow(subflow_count: int):
    results = {}
    for i in range(subflow_count):
        # Call the flow directly; return_state=True returns its final state instead of raising
        subflow_state = subflow.with_options(name=f"Subflow {i}")(i, return_state=True)
        # With raise_on_failure=False, a failed state yields the exception object
        subflow_result = subflow_state.result(raise_on_failure=False)
        if subflow_state.is_failed():
            results[i] = (subflow_result, "Failed")
        else:
            results[i] = (subflow_result, "Succeeded")

    # Print the results
    for i, (result, status) in results.items():
        print(f"Subflow {i}: {status} - {result}")

    # Return a value so the parent's final state is not derived from the
    # failed subflow runs (assumes Prefect 2.x return-value semantics)
    return results


# Run the parent flow with a specified number of subflows
parent_flow(5)
```
Explanation:
- Subflows: Defined using the `@flow` decorator. The example subflow raises an exception for odd-indexed runs to simulate failure.
- Parent Flow: Runs each subflow and captures the state directly using `return_state=True`.
  - Uses `state.result(raise_on_failure=False)` to get the result without raising an exception.
  - Checks the state using `subflow_state.is_failed()` to determine if the subflow failed or succeeded.
  - Stores the results and their status (succeeded or failed) in a dictionary.
This approach makes it simpler to handle the results of subflows without using a try-except block.
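To make the idea behind capturing a state concrete, here is a toy model in plain Python. It is only a sketch: `ToyState`, `run_and_capture`, and `work` are hypothetical names invented for this illustration, and none of it is Prefect's actual implementation. It just shows how wrapping an outcome lets the caller inspect a failure instead of catching it.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class ToyState:
    """Hypothetical stand-in for a run state; NOT Prefect's State class."""
    value: Any
    failed: bool

    def is_failed(self) -> bool:
        return self.failed

    def result(self, raise_on_failure: bool = True) -> Any:
        # Either raise the stored exception or hand it back to the caller
        if self.failed and raise_on_failure:
            raise self.value
        return self.value


def run_and_capture(fn: Callable[..., Any], *args: Any) -> ToyState:
    """Call fn and wrap its outcome instead of letting an exception propagate."""
    try:
        return ToyState(value=fn(*args), failed=False)
    except Exception as exc:
        return ToyState(value=exc, failed=True)


def work(index: int) -> str:
    # Same even/odd behavior as the sample subflow
    if index % 2 == 0:
        return f"Subflow {index} - Succeeded"
    raise Exception(f"Subflow {index} - Failed")


states = {i: run_and_capture(work, i) for i in range(3)}
statuses = {i: ("Failed" if s.is_failed() else "Succeeded") for i, s in states.items()}
print(statuses)
```

In this toy version, calling `result(raise_on_failure=False)` on a failed state returns the exception object, which is why the parent flow above can store it alongside the "Failed" status.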
For more details, you can refer to the Prefect documentation:
- Concepts: Results: https://docs.prefect.io/latest/concepts/results/
Feel free to modify the code as needed to match your specific use case.
James Ashby
07/17/2024, 9:28 PM