<@ULVA73B9P> Give me a demo flow that makes use of...
# ask-marvin
m
@Marvin Give me a demo flow that makes use of the Python Prefect Flow class, to wraps a function with an entrypoint to the Prefect engine. Without making use of the @task decorator.
1
m
thought for 64.1 seconds
Here’s a minimal demo that uses the Prefect 3.x Flow class to wrap a plain Python function and run it with the Prefect engine—no @task decorators involved.
Copy code
from prefect.flows import Flow
from prefect import get_run_logger

def greet(name: str, repeat: int = 1) -> str:
    logger = get_run_logger()
    message = f"Hello, {name}!"
    for i in range(repeat):
        <http://logger.info|logger.info>(f"Greeting {i + 1} of {repeat}: {message}")
    return message

# Wrap the function with Prefect's Flow class
greet_flow = Flow(
    greet,
    name="greet-demo",
    log_prints=True,          # capture any prints to Prefect logs
    retries=2,                # example: retry the whole flow if it fails
    retry_delay_seconds=5,    # delay between retries
)

if __name__ == "__main__":
    # This call enters the Prefect engine and runs the flow
    result = greet_flow("Marvin", repeat=2)
    print("Flow result:", result)
Notes: -
Flow(fn, ...)
turns your function into a Prefect flow; calling it (e.g.,
greet_flow("Marvin")
) executes via the Prefect engine. - You can still structure your code with plain helper functions inside the wrapped function—no tasks required. - Add options like
timeout_seconds
,
description
,
flow_run_name
, or a custom
task_runner
if you need concurrency or additional behavior.
m
@Marvin Please also give me an example that makes use of the Python Prefect Task class to wraps a function with an entrypoint to the Prefect engine. Without making use of the @task decorator.
m
thought for 90.8 seconds
Great idea. Here’s a minimal Prefect 3.x example that uses the Task class directly (no @task decorator) and runs via a Flow entrypoint.
Copy code
from prefect.flows import Flow
from prefect.tasks import Task
from prefect import get_run_logger

# Plain Python functions
def add(x: int, y: int) -> int:
    logger = get_run_logger()
    total = x + y
    <http://logger.info|logger.info>(f"add: {x} + {y} = {total}")
    return total

def multiply(x: int, y: int) -> int:
    logger = get_run_logger()
    product = x * y
    <http://logger.info|logger.info>(f"multiply: {x} * {y} = {product}")
    return product

# Wrap them as Prefect Tasks (no decorator)
add_task = Task(
    add,
    name="add-task",
    retries=2,
    retry_delay_seconds=1,
    log_prints=True,
)

multiply_task = Task(
    multiply,
    name="multiply-task",
    log_prints=True,
)

# Orchestrating function that will be wrapped as a Prefect Flow
def pipeline(x: int, y: int) -> int:
    logger = get_run_logger()
    # Submit tasks for concurrent execution
    a = add_task.submit(x, y)
    b = multiply_task.submit(x, y)
    # Compose futures; Prefect handles dependencies automatically
    c = multiply_task.submit(a, b)
    result = c.result()  # wait for final task to finish
    <http://logger.info|logger.info>(f"pipeline result: {result}")
    return result

# Wrap the orchestrator with Prefect's Flow class (engine entrypoint)
pipeline_flow = Flow(
    pipeline,
    name="task-demo-pipeline",
    log_prints=True,
)

if __name__ == "__main__":
    final = pipeline_flow(2, 3)
    print("Flow result:", final)
Notes: - Tasks must run within a flow context;
Task.submit(...)
returns a PrefectFuture, and calling
.result()
waits for the task to finish. - You can also call a task like a function inside a flow (e.g.,
add_task(x, y)
) for sequential execution;
.submit(...)
enables concurrency.
m
@Marvin Add, multiply and pipeline parameters give the following error: "Expected type 'Union[classmethod, staticmethod]' (matched generic type 'Union[classmethod[Any, Any, R], staticmethod[Any, R]]'), got '(x: int, y: int) -> int' instead"
m
thought for 262.5 seconds
Thanks for the details. That error typically appears when a callable (like your `add`/`multiply` functions) is being passed where the system expects something serializable or a special method type — and in Prefect 3.x, flow/run parameters must be JSON-serializable. Passing raw functions as parameters will either trip a type checker or fail at runtime when Prefect tries to serialize parameters for the API/UI. Recommended patterns to choose behavior at runtime without passing functions directly: - Pass a simple selector (string/Enum) and map it to functions inside your code - Use branching logic in the flow based on a simple parameter - If you truly need dynamic callables, pass a dotted import path string and resolve it at runtime (ensuring it’s importable in your worker environment) Example using an Enum selector (safe and deploy-friendly):
Copy code
from enum import Enum
from prefect import flow, task

@task
def add(x: int, y: int) -> int:
    return x + y

@task
def multiply(x: int, y: int) -> int:
    return x * y

class Op(str, Enum):
    add = "add"
    multiply = "multiply"

OPS = {
    Op.add: add,
    Op.multiply: multiply,
}

@flow
def pipeline(x: int, y: int, op: Op = Op.add) -> int:
    fn = OPS[op]
    return fn(x, y)

if __name__ == "__main__":
    pipeline(2, 3, op=Op.multiply)
Alternative: pass a dotted path and import dynamically (only if you need full dynamism):
Copy code
import importlib

def resolve_callable(path: str):
    # supports "package.module:function"
    mod_path, func_name = path.rsplit(":", 1) if ":" in path else path.rsplit(".", 1)
    mod = importlib.import_module(mod_path)
    return getattr(mod, func_name)

@flow
def pipeline(x: int, y: int, op_path: str = "my_module:add") -> int:
    fn = resolve_callable(op_path)
    return fn(x, y)
A couple of tips: - Keep parameters JSON-serializable (numbers, strings, lists, dicts, Enums-as-strings). Avoid passing functions or complex objects as parameters. - If you want different defaults per environment, create separate deployments that set the
op
parameter accordingly. Could you share: - Your Prefect version (2.x vs 3.x) - How you’re running (local script vs deployment/worker) - A minimal snippet showing where you pass `add`/`multiply`/`pipeline` as parameters That will help me confirm the exact cause and tailor the fix.
m
@Marvin could you change the code in your last response to not make use of the decorators? While keeping a fix for the "Expected type 'Union[classmethod, staticmethod]' (matched generic type 'Union[classmethod[Any, Any, R], staticmethod[Any, R]]'), got '(x: int, y: int) -> int' instead" error.
m
thought for 28.9 seconds
Absolutely — here are two versions without any decorators, while avoiding passing callables as parameters (so you won’t hit that “Expected type 'Union[classmethod, staticmethod]' … got '(x: int, y: int) -> int'” type-checking error). 1) Use an Enum/string selector mapped to functions (simple and safe)
Copy code
from enum import Enum
from typing import Dict, Callable, Union

def add(x: int, y: int) -> int:
    return x + y

def multiply(x: int, y: int) -> int:
    return x * y

class Op(str, Enum):
    add = "add"
    multiply = "multiply"

OPS: Dict[Op, Callable[[int, int], int]] = {
    Op.add: add,
    Op.multiply: multiply,
}

def pipeline(x: int, y: int, op: Union[Op, str] = Op.add) -> int:
    # normalize strings to the Enum
    if isinstance(op, str):
        op = Op(op)

    fn = OPS[op]  # select by a simple, serializable parameter
    return fn(x, y)

if __name__ == "__main__":
    print(pipeline(2, 3))                 # 5 (default add)
    print(pipeline(2, 3, op="multiply"))  # 6
2) Use a dotted import path string and resolve it at runtime (for dynamic/extensible choices)
Copy code
import importlib
from typing import Callable

def resolve_callable(path: str) -> Callable:
    # Supports "package.module:function" or "package.module.function"
    if ":" in path:
        mod_path, func_name = path.rsplit(":", 1)
    else:
        mod_path, func_name = path.rsplit(".", 1)
    mod = importlib.import_module(mod_path)
    return getattr(mod, func_name)

def pipeline(x: int, y: int, op_path: str = "my_module:add") -> int:
    fn = resolve_callable(op_path)
    return fn(x, y)

if __name__ == "__main__":
    # Assuming my_module.py defines add/multiply
    print(pipeline(2, 3, op_path="my_module:add"))       # 5
    print(pipeline(2, 3, op_path="my_module:multiply"))  # 6
Both approaches keep parameters JSON-serializable (strings/Enums), which avoids passing raw functions and eliminates that type-checking/serialization issue. If you later decide to run this under Prefect, you can keep these patterns as-is; just wrap your entry point in a flow later if needed.
m
@Marvin could you change the code in your last response to make use of the Flow and Task classes? While still not making use of the decorators? While keeping a fix for the "Expected type 'Union[classmethod, staticmethod]' (matched generic type 'Union[classmethod[Any, Any, R], staticmethod[Any, R]]'), got '(x: int, y: int) -> int' instead" error.
m
thought for 51.5 seconds
Absolutely — here are two versions using the Flow and Task classes (no decorators), while keeping parameters JSON-serializable and avoiding passing callables directly. 1) Selector string mapped to Task objects
Copy code
from typing import Dict
from prefect.tasks import Task
from prefect.flows import Flow

def add(x: int, y: int) -> int:
    return x + y

def multiply(x: int, y: int) -> int:
    return x * y

# Wrap functions as Tasks
add_task = Task(add, name="add")
multiply_task = Task(multiply, name="multiply")

# Map a simple, serializable selector to Task objects
OPS: Dict[str, Task] = {
    "add": add_task,
    "multiply": multiply_task,
}

def pipeline(x: int, y: int, op: str = "add") -> int:
    key = op.lower()
    if key not in OPS:
        raise ValueError(f"Unsupported operation '{op}'. Choose from: {list(OPS.keys())}")
    selected_task = OPS[key]
    return selected_task(x, y)

# Wrap the pipeline function as a Flow
pipeline_flow = Flow(pipeline, name="pipeline")

if __name__ == "__main__":
    print(pipeline_flow(2, 3))                 # 5 (default add)
    print(pipeline_flow(2, 3, op="multiply"))  # 6
2) Dotted import path string resolved inside a Task (for dynamic/extensible choices)
Copy code
import importlib
from prefect.tasks import Task
from prefect.flows import Flow

def apply_binary_op(x: int, y: int, op_path: str) -> int:
    # Supports "package.module:function" or "package.module.function"
    if ":" in op_path:
        mod_path, func_name = op_path.rsplit(":", 1)
    else:
        mod_path, func_name = op_path.rsplit(".", 1)
    mod = importlib.import_module(mod_path)
    fn = getattr(mod, func_name)
    return fn(x, y)

# Wrap resolver as a Task
apply_binary_op_task = Task(apply_binary_op, name="apply_binary_op")

def pipeline_dynamic(x: int, y: int, op_path: str = "my_module:add") -> int:
    return apply_binary_op_task(x, y, op_path)

# Wrap as a Flow
pipeline_dynamic_flow = Flow(pipeline_dynamic, name="pipeline_dynamic")

if __name__ == "__main__":
    # Assuming my_module.py defines add/multiply
    print(pipeline_dynamic_flow(2, 3, op_path="my_module:add"))       # 5
    print(pipeline_dynamic_flow(2, 3, op_path="my_module:multiply"))  # 6
These avoid passing raw functions as parameters (which triggers that type-checking/serialization issue) and instead use strings, keeping parameters deployment-friendly.