Pascal Jahan Elahi (Pawsey)
12/13/2024, 7:59 AM
Marvin
12/13/2024, 8:00 AM
Pascal Jahan Elahi (Pawsey)
12/13/2024, 8:00 AM
Marvin
12/13/2024, 8:00 AM
SequentialTaskRunner
python
from prefect import flow, task, get_run_logger
from prefect.task_runners import SequentialTaskRunner
@task
def example_task():
    """Log a start and a completion message through the Prefect run logger.

    The body is a placeholder: it obtains the run logger and emits two
    info-level messages, with no other work in between.
    """
    logger = get_run_logger()
    # The original lines contained chat auto-link markup
    # ("<http://logger.info|logger.info>"), which is not valid Python.
    logger.info("Task is running on SequentialTaskRunner")
    # Simulated task logic
    logger.info("Completing task...")
@flow(task_runner=SequentialTaskRunner())
def my_flow():
    """Call ``example_task`` once from a flow configured with a SequentialTaskRunner."""
    # NOTE(review): SequentialTaskRunner appears to be unavailable in
    # Prefect 3.x — confirm the installed Prefect version before relying on it.
    example_task()
my_flow()
SequentialTaskRunner
Pascal Jahan Elahi (Pawsey)
12/13/2024, 8:02 AM
subflow.with_options(task_runner = …)
Marvin
12/13/2024, 8:02 AM
subflow.with_options
python
from prefect import flow, task, get_run_logger
from prefect.task_runners import DaskTaskRunner, SequentialTaskRunner
# Define a subflow with dynamic task runner
@flow
def subflow():
    """Log which task runner the current flow run uses, then call ``my_task``.

    The runner is read from the active run context rather than from the
    module-level ``subflow`` object. ``subflow.with_options(...)`` returns a
    new flow object, so ``subflow.task_runner`` would keep reporting the
    runner of the undecorated original instead of the one assigned by the
    caller.
    """
    # Local import keeps this block self-contained; prefect is already a
    # dependency of this file.
    from prefect.context import get_run_context

    logger = get_run_logger()
    # The original lines contained chat auto-link markup
    # ("<http://logger.info|logger.info>"), which is not valid Python.
    logger.info("Running subflow")
    # Fetch the task runner type of the flow run that is actually executing.
    task_runner_type = type(get_run_context().task_runner).__name__
    logger.info(f"Using task runner: {task_runner_type}")
    # NOTE(review): on Prefect 3.x a direct task call may bypass the task
    # runner (``my_task.submit()`` is the runner path) — confirm for your version.
    my_task()
@task
def my_task():
    """Print a fixed status message to stdout."""
    print("Task is running...")
@flow
def parent_flow():
    """Run ``subflow`` twice, each time with a different task runner.

    The first invocation uses a ``DaskTaskRunner`` and the second a
    ``SequentialTaskRunner``; the runs happen one after the other.
    """
    # First pass: a copy of the subflow bound to a Dask task runner.
    dask_variant = subflow.with_options(task_runner=DaskTaskRunner())
    dask_variant()

    # Second pass: a copy of the subflow bound to a sequential task runner.
    sequential_variant = subflow.with_options(task_runner=SequentialTaskRunner())
    sequential_variant()
parent_flow()
subflow.task_runner