<@ULVA73B9P> generate a demo flow where a 5 tasks ...
# ask-marvin
k
@Marvin generate a demo flow where 5 tasks are called concurrently. Then 5 other tasks are called with the results of the first tasks. Once a task has started, its follow-up task must also be started.
m
To create a flow in Prefect 3.x where tasks are executed concurrently and follow-up tasks depend on their predecessors, you can use the `@flow` and `@task` decorators. Here's a concise example that aligns with your requirements:
```python
from prefect import flow, task

@task
def initial_task(task_id: int) -> str:
    return f"Result from task {task_id}"

@task
def follow_up_task(result: str) -> None:
    print(f"Processing {result}")

@flow
def concurrent_flow():
    # Submit the initial tasks so they run concurrently
    initial_futures = [initial_task.submit(i) for i in range(5)]

    # Passing each future as an argument makes its follow-up task wait
    # only for that one initial task
    follow_up_futures = [follow_up_task.submit(future) for future in initial_futures]

    # Block until every follow-up task has finished before the flow returns
    for future in follow_up_futures:
        future.wait()

# Execute the flow
if __name__ == "__main__":
    concurrent_flow()
```
### Explanation:
1. Task Execution: `initial_task` produces a result, which is then consumed by its `follow_up_task`.
2. Concurrency: The `.submit()` method is used to execute tasks concurrently.
3. Dependencies: Each `follow_up_task` waits for its corresponding `initial_task` to complete, ensuring that tasks are executed in pairs.

For more details on task concurrency and dependencies in Prefect 3.x, refer to the Prefect documentation.
k
@Marvin alter the code so that the follow-up task still launches even if the initial task failed. In the initial task, a custom exception can be thrown that has an added data component. The follow-up task uses either the result of a completed initial task or the data of the exception in case the initial task failed.
m
To ensure that the follow-up task executes even if the initial task fails, you can define a custom exception with additional data and handle it appropriately. Here's how you can modify your flow:
```python
from prefect import flow, task
from prefect.exceptions import PrefectException

class CustomTaskException(PrefectException):
    def __init__(self, message: str, data: str):
        super().__init__(message)
        self.data = data

@task
def initial_task(task_id: int) -> str:
    if task_id % 2 == 0:  # Simulate failure for demonstration
        raise CustomTaskException("Task failed", data=f"Data from task {task_id}")
    return f"Result from task {task_id}"

@task
def follow_up_task(result_or_exception_data: str) -> None:
    print(f"Processing {result_or_exception_data}")

@flow
def concurrent_flow():
    initial_results = []
    for i in range(5):
        try:
            result = initial_task.submit(i).result()
        except CustomTaskException as e:
            result = e.data
        initial_results.append(result)

    follow_up_results = [follow_up_task.submit(result) for result in initial_results]

# Execute the flow
if __name__ == "__main__":
    concurrent_flow()
```
### Key Points:
- Custom Exception: `CustomTaskException` includes a `data` attribute to store additional information.
- Error Handling: The `try-except` block in the flow handles exceptions, allowing the flow to continue and use the `data` from the exception.
- Task Execution: The follow-up task uses either the result of a successful task or the exception's data.

This approach ensures that the follow-up tasks are executed regardless of the initial task's success, using the exception's data if necessary.
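To see the "result or exception data" idea on its own, here is a minimal sketch in plain Python with no Prefect involved. The names `TaskDataError`, `initial_step`, and `result_or_data` are hypothetical stand-ins chosen for this illustration, and the exception subclasses the built-in `Exception` rather than a Prefect class.

```python
class TaskDataError(Exception):
    """Hypothetical exception that carries a payload next to its message."""

    def __init__(self, message: str, data: str) -> None:
        super().__init__(message)
        self.data = data


def initial_step(task_id: int) -> str:
    """Plain-function stand-in for initial_task: even ids fail."""
    if task_id % 2 == 0:
        raise TaskDataError("Task failed", data=f"Data from task {task_id}")
    return f"Result from task {task_id}"


def result_or_data(task_id: int) -> str:
    """Return the normal result, or the exception's payload on failure."""
    try:
        return initial_step(task_id)
    except TaskDataError as exc:
        return exc.data


outcomes = [result_or_data(i) for i in range(3)]
print(outcomes)
# ['Data from task 0', 'Result from task 1', 'Data from task 2']
```

Either way the caller ends up with a plain string, so the follow-up step does not need to know whether its input came from a success or a failure.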
k
@Marvin when running your latest code snippet, the initial tasks are no longer starting concurrently
m
To ensure that the initial tasks start concurrently, you can submit all tasks first and then handle the results or exceptions. Here's the corrected approach:
```python
from prefect import flow, task
from prefect.exceptions import PrefectException

class CustomTaskException(PrefectException):
    def __init__(self, message: str, data: str):
        super().__init__(message)
        self.data = data

@task
def initial_task(task_id: int) -> str:
    if task_id % 2 == 0:  # Simulate failure for demonstration
        raise CustomTaskException("Task failed", data=f"Data from task {task_id}")
    return f"Result from task {task_id}"

@task
def follow_up_task(result_or_exception_data: str) -> None:
    print(f"Processing {result_or_exception_data}")

@flow
def concurrent_flow():
    # Submit all initial tasks concurrently
    initial_futures = [initial_task.submit(i) for i in range(5)]

    # Process results or exceptions
    follow_up_futures = []
    for future in initial_futures:
        try:
            result = future.result()
        except CustomTaskException as e:
            result = e.data

        # Submit follow-up task
        follow_up_futures.append(follow_up_task.submit(result))

    # Block until every follow-up task has finished before the flow returns
    for future in follow_up_futures:
        future.wait()

# Execute the flow
if __name__ == "__main__":
    concurrent_flow()
```
### Explanation:
- Concurrent Execution: All initial tasks are submitted first using `.submit()`, ensuring they run concurrently.
- Result Handling: After submission, results or exceptions are handled in a loop.
- Follow-up Task Submission: Each follow-up task is submitted right after the result or exception of its initial task has been handled.

This ensures that the initial tasks execute concurrently. Because the loop calls `.result()` on the futures in submission order, a follow-up task starts once its own initial task and all earlier ones in the list have completed.
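Because the loop walks the futures in submission order, a slow first task holds back the follow-ups of faster later tasks. As a rough illustration of handling outcomes in completion order instead, here is a sketch that uses only the standard library's `concurrent.futures`, not Prefect's API. `TaskDataError`, `initial_step`, `follow_up_step`, and `run_pairs` are hypothetical names, and the sleep durations are made up so that later ids finish first.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


class TaskDataError(Exception):
    """Hypothetical exception that carries a payload next to its message."""

    def __init__(self, message: str, data: str) -> None:
        super().__init__(message)
        self.data = data


def initial_step(task_id: int) -> str:
    # Made-up delays: higher ids sleep less, so they tend to finish first
    time.sleep(0.01 * (5 - task_id))
    if task_id % 2 == 0:
        raise TaskDataError("Task failed", data=f"Data from task {task_id}")
    return f"Result from task {task_id}"


def follow_up_step(payload: str) -> str:
    return f"Processing {payload}"


def run_pairs(n: int = 5) -> list[str]:
    with ThreadPoolExecutor(max_workers=n) as pool:
        # Submit every initial step before collecting anything
        initial = [pool.submit(initial_step, i) for i in range(n)]
        follow_ups = []
        # as_completed yields each future as soon as it finishes,
        # regardless of the order in which it was submitted
        for future in as_completed(initial):
            try:
                payload = future.result()
            except TaskDataError as exc:
                payload = exc.data
            follow_ups.append(pool.submit(follow_up_step, payload))
        return [f.result() for f in follow_ups]


if __name__ == "__main__":
    for line in run_pairs():
        print(line)
```

The order of the printed lines depends on thread timing, so only the set of lines is predictable, not their sequence.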
k
@Marvin can you alter the try/except so that when the initial task raises the exception, the task result is passed to the wait_for of the follow-up task to create a dependency line?
m
To create a dependency using the `wait_for` parameter in Prefect, you can pass the `PrefectFuture` of the initial task to the follow-up task. Here's how you can modify the code:
```python
from prefect import flow, task
from prefect.exceptions import PrefectException

class CustomTaskException(PrefectException):
    def __init__(self, message: str, data: str):
        super().__init__(message)
        self.data = data

@task
def initial_task(task_id: int) -> str:
    if task_id % 2 == 0:  # Simulate failure for demonstration
        raise CustomTaskException("Task failed", data=f"Data from task {task_id}")
    return f"Result from task {task_id}"

@task
def follow_up_task(result_or_exception_data: str) -> None:
    print(f"Processing {result_or_exception_data}")

@flow
def concurrent_flow():
    # Submit all initial tasks concurrently
    initial_futures = [initial_task.submit(i) for i in range(5)]

    # Process results or exceptions and create dependencies
    for future in initial_futures:
        try:
            result = future.result()
            follow_up_task.submit(result)
        except CustomTaskException as e:
            # Pass the future to wait_for to ensure dependency even on failure
            follow_up_task.submit(e.data, wait_for=[future])

# Execute the flow
if __name__ == "__main__":
    concurrent_flow()
```
### Key Changes:
- Dependency Management: The `follow_up_task` is submitted with the `wait_for` parameter, which includes the `future` of the initial task. This establishes a dependency line, ensuring the follow-up task waits for the initial task to complete, even if it fails.
- Exception Handling: The exception data is used, and the dependency is maintained via `wait_for`.

This setup ensures that the follow-up tasks are dependent on their respective initial tasks, maintaining the desired execution order and dependencies.
k
@Marvin in your current code the follow_up task is no longer started if the initial task fails
m
To ensure that the follow-up task is executed even when the initial task fails, while maintaining a dependency using `wait_for`, you can handle the exception and still submit the follow-up task. Here's how you can achieve this:
```python
from prefect import allow_failure, flow, task
from prefect.exceptions import PrefectException

class CustomTaskException(PrefectException):
    def __init__(self, message: str, data: str):
        super().__init__(message)
        self.data = data

@task
def initial_task(task_id: int) -> str:
    if task_id % 2 == 0:  # Simulate failure for demonstration
        raise CustomTaskException("Task failed", data=f"Data from task {task_id}")
    return f"Result from task {task_id}"

@task
def follow_up_task(result_or_exception_data: str) -> None:
    print(f"Processing {result_or_exception_data}")

@flow
def concurrent_flow():
    # Submit all initial tasks concurrently
    initial_futures = [initial_task.submit(i) for i in range(5)]

    # Process results or exceptions and create dependencies
    follow_up_futures = []
    for future in initial_futures:
        result = None
        try:
            result = future.result()
        except CustomTaskException as e:
            result = e.data
        finally:
            # allow_failure lets the follow-up task run even though the task
            # it depends on has failed; a bare future in wait_for would not
            follow_up_futures.append(
                follow_up_task.submit(result, wait_for=[allow_failure(future)])
            )

    # Block until every follow-up task has finished before the flow returns
    for future in follow_up_futures:
        future.wait()

# Execute the flow
if __name__ == "__main__":
    concurrent_flow()
```
### Explanation:
- Exception Handling: The `try-except-finally` structure ensures that the follow-up task is always submitted, even if the initial task fails.
- Dependency Management: The `wait_for` parameter creates the dependency, and wrapping the future in `allow_failure` lets the follow-up task start even when the initial task it waits for has failed. With a bare future in `wait_for`, Prefect does not run the follow-up task when that initial task failed.
- Result Handling: The follow-up task uses either the result of the initial task or the exception data.

This approach submits a follow-up task for every initial task and maintains the dependency on the initial task.
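One subtlety of the `try-except-finally` shape is worth checking: `finally` also runs when an exception other than the custom one is propagating, and in that case `result` is still `None`. The plain-Python sketch below (no Prefect; `TaskDataError`, `collect`, and the three outcome functions are hypothetical names) records what would be handed to the follow-up step in each case.

```python
class TaskDataError(Exception):
    """Hypothetical exception that carries a payload next to its message."""

    def __init__(self, message: str, data: str) -> None:
        super().__init__(message)
        self.data = data


submitted = []  # stands in for "what the follow-up step was given"


def collect(outcome_fn) -> None:
    result = None
    try:
        result = outcome_fn()
    except TaskDataError as exc:
        result = exc.data
    finally:
        # Runs on success, on TaskDataError, and while any other
        # exception is still propagating
        submitted.append(result)


def ok() -> str:
    return "Result"


def known_failure() -> str:
    raise TaskDataError("Task failed", data="Data")


def unexpected_failure() -> str:
    raise ValueError("boom")


collect(ok)
collect(known_failure)
try:
    collect(unexpected_failure)
except ValueError:
    pass  # the unexpected error still escapes collect()

print(submitted)
# ['Result', 'Data', None]
```

If a follow-up step should never receive `None`, moving the submission out of `finally` and below the `try-except` block is one option, since an unexpected error would then skip it.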