Mika
09/15/2025, 12:37 PMMarvin
09/15/2025, 12:37 PMMarvin
09/15/2025, 12:38 PMfrom prefect.flows import Flow
from prefect import get_run_logger
def greet(name: str, repeat: int = 1) -> str:
logger = get_run_logger()
message = f"Hello, {name}!"
for i in range(repeat):
<http://logger.info|logger.info>(f"Greeting {i + 1} of {repeat}: {message}")
return message
# Wrap the function with Prefect's Flow class
greet_flow = Flow(
greet,
name="greet-demo",
log_prints=True, # capture any prints to Prefect logs
retries=2, # example: retry the whole flow if it fails
retry_delay_seconds=5, # delay between retries
)
if __name__ == "__main__":
# This call enters the Prefect engine and runs the flow
result = greet_flow("Marvin", repeat=2)
print("Flow result:", result)
Notes:
- Flow(fn, ...)
turns your function into a Prefect flow; calling it (e.g., greet_flow("Marvin")
) executes via the Prefect engine.
- You can still structure your code with plain helper functions inside the wrapped function—no tasks required.
- Add options like timeout_seconds
, description
, flow_run_name
, or a custom task_runner
if you need concurrency or additional behavior.Mika
09/15/2025, 12:40 PMMarvin
09/15/2025, 12:41 PMMarvin
09/15/2025, 12:42 PMfrom prefect.flows import Flow
from prefect.tasks import Task
from prefect import get_run_logger
# Plain Python functions
def add(x: int, y: int) -> int:
logger = get_run_logger()
total = x + y
<http://logger.info|logger.info>(f"add: {x} + {y} = {total}")
return total
def multiply(x: int, y: int) -> int:
logger = get_run_logger()
product = x * y
<http://logger.info|logger.info>(f"multiply: {x} * {y} = {product}")
return product
# Wrap them as Prefect Tasks (no decorator)
add_task = Task(
add,
name="add-task",
retries=2,
retry_delay_seconds=1,
log_prints=True,
)
multiply_task = Task(
multiply,
name="multiply-task",
log_prints=True,
)
# Orchestrating function that will be wrapped as a Prefect Flow
def pipeline(x: int, y: int) -> int:
logger = get_run_logger()
# Submit tasks for concurrent execution
a = add_task.submit(x, y)
b = multiply_task.submit(x, y)
# Compose futures; Prefect handles dependencies automatically
c = multiply_task.submit(a, b)
result = c.result() # wait for final task to finish
<http://logger.info|logger.info>(f"pipeline result: {result}")
return result
# Wrap the orchestrator with Prefect's Flow class (engine entrypoint)
pipeline_flow = Flow(
pipeline,
name="task-demo-pipeline",
log_prints=True,
)
if __name__ == "__main__":
final = pipeline_flow(2, 3)
print("Flow result:", final)
Notes:
- Tasks must run within a flow context; Task.submit(...)
returns a PrefectFuture, and calling .result()
waits for the task to finish.
- You can also call a task like a function inside a flow (e.g., add_task(x, y)
) for sequential execution; .submit(...)
enables concurrency.Mika
09/16/2025, 11:16 AMMarvin
09/16/2025, 11:16 AMMarvin
09/16/2025, 11:21 AMfrom enum import Enum
from prefect import flow, task
@task
def add(x: int, y: int) -> int:
return x + y
@task
def multiply(x: int, y: int) -> int:
return x * y
class Op(str, Enum):
add = "add"
multiply = "multiply"
OPS = {
Op.add: add,
Op.multiply: multiply,
}
@flow
def pipeline(x: int, y: int, op: Op = Op.add) -> int:
fn = OPS[op]
return fn(x, y)
if __name__ == "__main__":
pipeline(2, 3, op=Op.multiply)
Alternative: pass a dotted path and import dynamically (only if you need full dynamism):
import importlib
def resolve_callable(path: str):
# supports "package.module:function"
mod_path, func_name = path.rsplit(":", 1) if ":" in path else path.rsplit(".", 1)
mod = importlib.import_module(mod_path)
return getattr(mod, func_name)
@flow
def pipeline(x: int, y: int, op_path: str = "my_module:add") -> int:
fn = resolve_callable(op_path)
return fn(x, y)
A couple of tips:
- Keep parameters JSON-serializable (numbers, strings, lists, dicts, Enums-as-strings). Avoid passing functions or complex objects as parameters.
- If you want different defaults per environment, create separate deployments that set the op
parameter accordingly.
Could you share:
- Your Prefect version (2.x vs 3.x)
- How you’re running (local script vs deployment/worker)
- A minimal snippet showing where you pass `add`/`multiply`/`pipeline` as parameters
That will help me confirm the exact cause and tailor the fix.Mika
09/16/2025, 11:23 AMMarvin
09/16/2025, 11:23 AMMarvin
09/16/2025, 11:24 AMfrom enum import Enum
from typing import Dict, Callable, Union
def add(x: int, y: int) -> int:
return x + y
def multiply(x: int, y: int) -> int:
return x * y
class Op(str, Enum):
add = "add"
multiply = "multiply"
OPS: Dict[Op, Callable[[int, int], int]] = {
Op.add: add,
Op.multiply: multiply,
}
def pipeline(x: int, y: int, op: Union[Op, str] = Op.add) -> int:
# normalize strings to the Enum
if isinstance(op, str):
op = Op(op)
fn = OPS[op] # select by a simple, serializable parameter
return fn(x, y)
if __name__ == "__main__":
print(pipeline(2, 3)) # 5 (default add)
print(pipeline(2, 3, op="multiply")) # 6
2) Use a dotted import path string and resolve it at runtime (for dynamic/extensible choices)
import importlib
from typing import Callable
def resolve_callable(path: str) -> Callable:
# Supports "package.module:function" or "package.module.function"
if ":" in path:
mod_path, func_name = path.rsplit(":", 1)
else:
mod_path, func_name = path.rsplit(".", 1)
mod = importlib.import_module(mod_path)
return getattr(mod, func_name)
def pipeline(x: int, y: int, op_path: str = "my_module:add") -> int:
fn = resolve_callable(op_path)
return fn(x, y)
if __name__ == "__main__":
# Assuming my_module.py defines add/multiply
print(pipeline(2, 3, op_path="my_module:add")) # 5
print(pipeline(2, 3, op_path="my_module:multiply")) # 6
Both approaches keep parameters JSON-serializable (strings/Enums), which avoids passing raw functions and eliminates that type-checking/serialization issue. If you later decide to run this under Prefect, you can keep these patterns as-is; just wrap your entry point in a flow later if needed.Mika
09/16/2025, 11:30 AMMarvin
09/16/2025, 11:31 AMMarvin
09/16/2025, 11:31 AMfrom typing import Dict
from prefect.tasks import Task
from prefect.flows import Flow
def add(x: int, y: int) -> int:
return x + y
def multiply(x: int, y: int) -> int:
return x * y
# Wrap functions as Tasks
add_task = Task(add, name="add")
multiply_task = Task(multiply, name="multiply")
# Map a simple, serializable selector to Task objects
OPS: Dict[str, Task] = {
"add": add_task,
"multiply": multiply_task,
}
def pipeline(x: int, y: int, op: str = "add") -> int:
key = op.lower()
if key not in OPS:
raise ValueError(f"Unsupported operation '{op}'. Choose from: {list(OPS.keys())}")
selected_task = OPS[key]
return selected_task(x, y)
# Wrap the pipeline function as a Flow
pipeline_flow = Flow(pipeline, name="pipeline")
if __name__ == "__main__":
print(pipeline_flow(2, 3)) # 5 (default add)
print(pipeline_flow(2, 3, op="multiply")) # 6
2) Dotted import path string resolved inside a Task (for dynamic/extensible choices)
import importlib
from prefect.tasks import Task
from prefect.flows import Flow
def apply_binary_op(x: int, y: int, op_path: str) -> int:
# Supports "package.module:function" or "package.module.function"
if ":" in op_path:
mod_path, func_name = op_path.rsplit(":", 1)
else:
mod_path, func_name = op_path.rsplit(".", 1)
mod = importlib.import_module(mod_path)
fn = getattr(mod, func_name)
return fn(x, y)
# Wrap resolver as a Task
apply_binary_op_task = Task(apply_binary_op, name="apply_binary_op")
def pipeline_dynamic(x: int, y: int, op_path: str = "my_module:add") -> int:
return apply_binary_op_task(x, y, op_path)
# Wrap as a Flow
pipeline_dynamic_flow = Flow(pipeline_dynamic, name="pipeline_dynamic")
if __name__ == "__main__":
# Assuming my_module.py defines add/multiply
print(pipeline_dynamic_flow(2, 3, op_path="my_module:add")) # 5
print(pipeline_dynamic_flow(2, 3, op_path="my_module:multiply")) # 6
These avoid passing raw functions as parameters (which triggers that type-checking/serialization issue) and instead use strings, keeping parameters deployment-friendly.