Koen
03/17/2025, 1:38 PMMarvin
03/17/2025, 1:39 PM@flow
and @task
decorators. Here's a concise example that aligns with your requirements:
python
from prefect import flow, task
from prefect.futures import PrefectFuture
@task
def initial_task(task_id: int) -> str:
return f"Result from task {task_id}"
@task
def follow_up_task(result: str) -> None:
print(f"Processing {result}")
@flow
def concurrent_flow():
# Start initial tasks concurrently
initial_results = [initial_task.submit(i) for i in range(5)]
# Start follow-up tasks as soon as their corresponding initial task completes
follow_up_results = [follow_up_task.submit(result) for result in initial_results]
# Execute the flow
if __name__ == "__main__":
concurrent_flow()
### Explanation:
1. Task Execution: The initial_task
produces a result, which is then immediately consumed by follow_up_task
.
2. Concurrency: The .submit()
method is used to execute tasks concurrently.
3. Dependencies: Each follow_up_task
waits for its corresponding initial_task
to complete, ensuring that tasks are executed in pairs.
For more details on task concurrency and dependencies in Prefect 3.x, refer to the Prefect documentation.Koen
03/17/2025, 1:44 PMMarvin
03/17/2025, 1:44 PMpython
from prefect import flow, task
from prefect.exceptions import PrefectException
class CustomTaskException(PrefectException):
def __init__(self, message: str, data: str):
super().__init__(message)
self.data = data
@task
def initial_task(task_id: int) -> str:
if task_id % 2 == 0: # Simulate failure for demonstration
raise CustomTaskException("Task failed", data=f"Data from task {task_id}")
return f"Result from task {task_id}"
@task
def follow_up_task(result_or_exception_data: str) -> None:
print(f"Processing {result_or_exception_data}")
@flow
def concurrent_flow():
initial_results = []
for i in range(5):
try:
result = initial_task.submit(i).result()
except CustomTaskException as e:
result = e.data
initial_results.append(result)
follow_up_results = [follow_up_task.submit(result) for result in initial_results]
# Execute the flow
if __name__ == "__main__":
concurrent_flow()
### Key Points:
- Custom Exception: CustomTaskException
includes a data
attribute to store additional information.
- Error Handling: The try-except
block in the flow handles exceptions, allowing the flow to continue and use the data
from the exception.
- Task Execution: The follow-up task uses either the result of a successful task or the exception's data.
This approach ensures that the follow-up tasks are executed regardless of the initial task's success, using the exception's data if necessary.Koen
03/17/2025, 1:47 PMMarvin
03/17/2025, 1:47 PMpython
from prefect import flow, task
from prefect.exceptions import PrefectException
from prefect.futures import PrefectFuture
class CustomTaskException(PrefectException):
def __init__(self, message: str, data: str):
super().__init__(message)
self.data = data
@task
def initial_task(task_id: int) -> str:
if task_id % 2 == 0: # Simulate failure for demonstration
raise CustomTaskException("Task failed", data=f"Data from task {task_id}")
return f"Result from task {task_id}"
@task
def follow_up_task(result_or_exception_data: str) -> None:
print(f"Processing {result_or_exception_data}")
@flow
def concurrent_flow():
# Submit all initial tasks concurrently
initial_futures = [initial_task.submit(i) for i in range(5)]
# Process results or exceptions
for future in initial_futures:
try:
result = future.result()
except CustomTaskException as e:
result = e.data
# Submit follow-up task
follow_up_task.submit(result)
# Execute the flow
if __name__ == "__main__":
concurrent_flow()
### Explanation:
- Concurrent Execution: All initial tasks are submitted first using .submit()
, ensuring they run concurrently.
- Result Handling: After submission, results or exceptions are handled in a loop.
- Follow-up Task Submission: Follow-up tasks are submitted immediately after handling the result or exception of each initial task.
This ensures that the initial tasks execute concurrently, and follow-up tasks are initiated as soon as their corresponding initial task completes.Koen
03/17/2025, 1:57 PMMarvin
03/17/2025, 1:58 PMwait_for
parameter in Prefect, you can pass the PrefectFuture
of the initial task to the follow-up task. Here's how you can modify the code:
python
from prefect import flow, task
from prefect.exceptions import PrefectException
from prefect.futures import PrefectFuture
class CustomTaskException(PrefectException):
def __init__(self, message: str, data: str):
super().__init__(message)
self.data = data
@task
def initial_task(task_id: int) -> str:
if task_id % 2 == 0: # Simulate failure for demonstration
raise CustomTaskException("Task failed", data=f"Data from task {task_id}")
return f"Result from task {task_id}"
@task
def follow_up_task(result_or_exception_data: str) -> None:
print(f"Processing {result_or_exception_data}")
@flow
def concurrent_flow():
# Submit all initial tasks concurrently
initial_futures = [initial_task.submit(i) for i in range(5)]
# Process results or exceptions and create dependencies
for future in initial_futures:
try:
result = future.result()
follow_up_task.submit(result)
except CustomTaskException as e:
# Pass the future to wait_for to ensure dependency even on failure
follow_up_task.submit(e.data, wait_for=[future])
# Execute the flow
if __name__ == "__main__":
concurrent_flow()
### Key Changes:
- Dependency Management: The follow_up_task
is submitted with the wait_for
parameter, which includes the future
of the initial task. This establishes a dependency line, ensuring the follow-up task waits for the initial task to complete, even if it fails.
- Exception Handling: The exception data is used, and the dependency is maintained via wait_for
.
This setup ensures that the follow-up tasks are dependent on their respective initial tasks, maintaining the desired execution order and dependencies.Koen
03/17/2025, 2:01 PMMarvin
03/17/2025, 2:01 PMwait_for
, you can handle the exception and still submit the follow-up task. Here's how you can achieve this:
python
from prefect import flow, task
from prefect.exceptions import PrefectException
from prefect.futures import PrefectFuture
class CustomTaskException(PrefectException):
def __init__(self, message: str, data: str):
super().__init__(message)
self.data = data
@task
def initial_task(task_id: int) -> str:
if task_id % 2 == 0: # Simulate failure for demonstration
raise CustomTaskException("Task failed", data=f"Data from task {task_id}")
return f"Result from task {task_id}"
@task
def follow_up_task(result_or_exception_data: str) -> None:
print(f"Processing {result_or_exception_data}")
@flow
def concurrent_flow():
# Submit all initial tasks concurrently
initial_futures = [initial_task.submit(i) for i in range(5)]
# Process results or exceptions and create dependencies
for future in initial_futures:
result = None
try:
result = future.result()
except CustomTaskException as e:
result = e.data
finally:
# Ensure follow-up task is submitted with a dependency on the initial task
follow_up_task.submit(result, wait_for=[future])
# Execute the flow
if __name__ == "__main__":
concurrent_flow()
### Explanation:
- Exception Handling: The try-except-finally
structure ensures that the follow-up task is always submitted, even if the initial task fails.
- Dependency Management: The wait_for
parameter is used to create a dependency, ensuring that the follow-up task waits for the initial task to complete, regardless of its success or failure.
- Result Handling: The follow-up task uses either the result of the initial task or the exception data.
This approach guarantees that the follow-up task is always executed and maintains the dependency on the initial task.