zz ht
08/14/2025, 7:12 AM
Marvin
08/14/2025, 7:13 AM
- Which “serve” are you referring to: flow.serve(...), or something else entirely (Azure/Astra/AllegroServe)?
- What “storage” are you referring to—object storage (S3/GCS/Azure Blob), volumes/PVCs, or code storage for deployments?
If you can share the component, version, and a snippet or link, I’ll give you a precise answer instead of guessing in the dark like a particularly bored robot.
zz ht
08/14/2025, 7:24 AM
Marvin
08/14/2025, 7:30 AM
aserve runs your deployment on the same machine that created it, so it doesn’t need to package your code anywhere. .deploy() targets workers and must ship your code, so it requires either a container image or remote code storage.
What’s happening:
- aserve(...) starts a local runner and registers your deployment. The runner uses a local storage adapter pointing at your working directory. No remote storage is created; nothing is pushed. It just serves what’s already on disk here.
- Flow.deploy(...) creates a deployment intended to run on workers (K8s, Docker, etc.). Workers can be anywhere, so Prefect requires a way to access your code:
  - Provide an image (the code is baked into the container), or
  - Provide remote storage (Git/S3/GCS/etc.) that workers can pull from.
Why your two snippets behave differently:
- This fails because there’s no image and no remote source bound to the flow:
deployment_id = await bootstrap.deploy(
    name="bootstrap-deployment",
    work_pool_name=self.work_pool_name,
    description="Bootstrap workflow deployment for Mapping tasks",
    tags=["bootstrap"],
    version="1.0.0"
)
- This works because aserve serves the deployment locally; no packaging is needed:
await aserve(
    await bootstrap.to_deployment(
        name="bootstrap-deployment",
        work_pool_name=work_pool_name,
        description="Bootstrap workflow deployment (auto-served)",
        tags=["bootstrap"]
    )
)
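Once aserve is running, you can still trigger that deployment from another process; the serving process picks up the scheduled run. A minimal sketch, assuming the flow function above is named `bootstrap` so the full deployment name is "bootstrap/bootstrap-deployment":
from prefect.deployments import run_deployment

# Assumption: the served flow function is named `bootstrap`,
# so the deployment's full name is "bootstrap/bootstrap-deployment".
flow_run = run_deployment(
    "bootstrap/bootstrap-deployment",  # "flow_name/deployment_name"
    parameters={},                     # pass real parameters as needed
)
print(flow_run.state)  # final state once the serving process has executed the run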
How to make .deploy() work:
- Use a container image:
deployment_id = await bootstrap.deploy(
    name="bootstrap-deployment",
    work_pool_name="k8s-pool",
    image="ghcr.io/acme/bootstrap:1.0.0",
    build=True,  # optional if image already exists
    push=True
)
- Or bind remote storage via Flow.from_source(...) (Git repo example):
from prefect import Flow

flow_from_git = Flow.from_source(
    "https://github.com/acme/prefect-flows?ref=main",
    "flows/bootstrap.py:bootstrap",
)
deployment_id = await flow_from_git.deploy(
    name="bootstrap-deployment",
    work_pool_name="process-pool",
    description="Bootstrap workflow deployment for Mapping tasks",
    tags=["bootstrap"],
    version="1.0.0",
)
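If the code lives in object storage instead of Git, from_source can also take a remote URL. A hedged sketch, assuming an s3://... bucket that workers can read, the s3fs package installed, and placeholder bucket/path names (same async context as the Git example above):
from prefect import Flow

flow_from_s3 = Flow.from_source(
    "s3://acme-prefect-code/bootstrap",  # placeholder bucket/prefix
    "bootstrap.py:bootstrap",
)
deployment_id = await flow_from_s3.deploy(
    name="bootstrap-deployment",
    work_pool_name="process-pool",
)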
References:
- Create deployments (Prefect 3.x)
- Blog: schedule your code quickly with flow.serve
If you share which work pool/worker type you’re using, I can suggest the most direct image or source setup.
zz ht
08/14/2025, 7:37 AM
Marvin
08/14/2025, 7:41 AM
/mnt/prefect-repo/myproject
├─ flows/
│  ├─ etl.py      # defines @flow etl()
│  ├─ train.py    # defines @flow train()
│  └─ report.py   # defines @flow report()
└─ src/
   └─ mylib/      # shared modules (recommended: package them for clean imports)
      ├─ __init__.py
      └─ utils.py
- Key points
  - All workers (local/Docker/K8s) must mount this directory at the same path (e.g., every worker can read `/mnt/prefect-repo/myproject`).
  - Inside flows, import by package name (e.g., `from mylib.utils import ...`) to avoid relative-path confusion; see the short sketch after this list.
  - For cross-flow dependencies, prefer run_deployment to keep them decoupled; if you call a local “subflow” directly, make sure its code lives in the same storage.
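A quick illustration of the package-name import pattern above. The helper clean_rows is hypothetical and only stands in for whatever lives in mylib/utils.py; PYTHONPATH is assumed to include .../myproject/src:
# flows/etl.py (sketch)
from prefect import flow
from mylib.utils import clean_rows  # hypothetical helper from the shared package

@flow
def etl():
    rows = [{"id": 1}, {"id": 2}]
    return clean_rows(rows)  # works on any worker that mounts the shared path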
Deploy multiple flows (sharing one LocalFileSystem storage)
from prefect import Flow
from prefect.filesystems import LocalFileSystem
from prefect.deployments import deploy  # used to deploy several at once

# Point at the shared directory
storage = LocalFileSystem(basepath="/mnt/prefect-repo/myproject")

# Bind the storage and an entrypoint to each flow
etl_flow = Flow.from_source(storage, "flows/etl.py:etl")
train_flow = Flow.from_source(storage, "flows/train.py:train")
report_flow = Flow.from_source(storage, "flows/report.py:report")

# Build RunnerDeployments (you can set work_pool, tags, version, etc.)
etl_dep = etl_flow.to_deployment(
    name="etl",
    work_pool_name="process-pool",
    tags=["projA"],
    version="2025.08.14",
    description="Main ETL flow",
)
train_dep = train_flow.to_deployment(
    name="train",
    work_pool_name="process-pool",
    tags=["projA"],
)
report_dep = report_flow.to_deployment(
    name="report",
    work_pool_name="process-pool",
    tags=["projA"],
)

# Deploy them all in one call
deployment_ids = await deploy(etl_dep, train_dep, report_dep)
print(deployment_ids)
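Since deploy(...) is awaited above, one way to drive it from a plain script is to wrap the call in an async entrypoint. A minimal sketch reusing the deploy import and the etl_dep/train_dep/report_dep objects from the block above:
import asyncio

async def main():
    deployment_ids = await deploy(etl_dep, train_dep, report_dep)
    print(deployment_ids)

if __name__ == "__main__":
    asyncio.run(main())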
Two ways to call across flows
- In-process “subflow” call (fits when everything runs from the same storage/environment; tightly coupled)
# flows/etl.py
from prefect import flow
from flows.train import train  # import the subflow directly

@flow
def etl():
    # ...
    train()  # call it directly
- Cross-deployment trigger (recommended: decoupled, works across infrastructure)
# flows/etl.py
from prefect import flow
from prefect.deployments import run_deployment

@flow
def etl():
    # ...
    run = run_deployment("train/train", parameters={"date": "2025-08-14"})
    # the name format is usually "flow_name/deployment_name"
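By default run_deployment waits for the triggered run to finish and returns the resulting FlowRun. If you only want to submit the downstream run and move on, passing timeout=0 returns immediately; a small sketch reusing the same deployment name:
from prefect.deployments import run_deployment

# Fire-and-forget: submit the downstream run without waiting for it.
submitted = run_deployment(
    "train/train",
    parameters={"date": "2025-08-14"},
    timeout=0,
)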
Notes on making LocalFileSystem truly readable “remotely”
- Local process worker: just make sure the machine has that path.
- Docker worker: mount the shared directory into the container at the same path, e.g. via job_variables when creating/configuring the work pool (example values; adjust for your environment; a sketch of attaching these to a deployment follows this list):
job_variables = {
    "volumes": ["/mnt/prefect-repo:/mnt/prefect-repo:ro"],
    "env": {"PYTHONPATH": "/mnt/prefect-repo/myproject/src"},
    # optional: "working_dir": "/mnt/prefect-repo/myproject"
}
- Kubernetes worker: use a PersistentVolume/PersistentVolumeClaim to mount the shared directory into the Pod at the same path; set PYTHONPATH or `workingDir` if needed.
- Shared code package: treat src/mylib as a package and point PYTHONPATH at `.../myproject/src` to avoid import problems.
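For the Docker case, the job_variables dict can also be attached per deployment rather than on the work pool. A hedged sketch, assuming a Docker work pool named "docker-pool" whose job template exposes volumes/env, and reusing the etl_flow from the earlier example:
etl_dep = etl_flow.to_deployment(
    name="etl",
    work_pool_name="docker-pool",  # placeholder Docker work pool
    job_variables={
        "volumes": ["/mnt/prefect-repo:/mnt/prefect-repo:ro"],
        "env": {"PYTHONPATH": "/mnt/prefect-repo/myproject/src"},
    },
)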
Extra tips for batch releases
- Use the same LocalFileSystem(basepath=...) as the source for every flow; one change in the shared directory then applies to all deployments consistently.
- Use version numbers (`version=`) and tags (`tags=`) to manage release batches; you can filter on them in the UI.
- For hard dependency chains, prefer run_deployment or Automations (trigger downstream runs on “flow run completed” events) to reduce coupling via cross-file imports.
- If you later need to run across machines/clouds, LocalFileSystem requires a network share. Git/S3/GCS (`Flow.from_source("https://...")`, etc.) is more general and removes the need to mount volumes.
Docs
- Create deployments (Prefect 3.x)
If you tell me which worker type you’re using (local/Docker/K8s) and how the shared directory is mounted, I can turn the mount and job_variables details into a template you can reuse directly. Yes, I know, that’s slightly more realistic than “everything just works automatically.”
zz ht
08/15/2025, 2:44 AM
aserve starts the flow in the current process, so it can be debugged. However, when using the class MappingWorker(ProcessWorker) approach, a new process is automatically created.
async def run(self, flow_run, configuration, task_status=None):
    """Execute the specific workflow task"""
    logging.info(f"Processing flow run: {flow_run.id}")
    logging.info(f"Flow parameters: {flow_run.parameters}")
    # ProcessWorker automatically handles deployment execution
    # Here we only need to call the parent class's run method
    result = await super().run(flow_run, configuration, task_status)
    logging.info(f"Flow run {flow_run.id} completed successfully")
    return result
Marvin
08/15/2025, 2:53 AM
- flow.serve() / flow.aserve() run flow runs in the current Python process. Great for stepping through with a debugger.
- ProcessWorker always executes each flow run in a new child process. Subclassing it (e.g., class MappingWorker(ProcessWorker)) and calling await super().run(...) keeps that behavior. Your pre/post logs are in the worker process; the flow itself runs in a different Python process.
There isn’t a supported switch to make ProcessWorker run flows in the same process. That isolation is by design.
Practical debugging options:
- Want true in-process debugging? Use serve/aserve (or just call the flow function directly).
- Need to go through a work pool/worker but still debug? Attach a remote debugger in the flow process (see the sketch after this list):
import debugpy
debugpy.listen(("0.0.0.0", 5678))
print("Waiting for debugger attach...")
debugpy.wait_for_client()
Then attach your IDE to port 5678 before the flow continues.
- Improve visibility: set PREFECT_LOGGING_LEVEL=DEBUG and consider PREFECT_DEBUG_MODE=1 to get richer tracebacks.
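To be explicit about where those debugpy lines go, here is a sketch of embedding them at the top of a flow so the wait happens inside the worker-spawned flow process (the flow name and body are placeholders):
from prefect import flow

@flow
def mapping_flow():  # placeholder flow name
    import debugpy
    debugpy.listen(("0.0.0.0", 5678))
    print("Waiting for debugger attach...")
    debugpy.wait_for_client()  # blocks until your IDE attaches to port 5678
    # ... the actual flow logic runs under the debugger from here on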
Docs for local-process execution are here: Run flows in local processes.
If your goal is “worker-like” queue consumption but without subprocesses, say so—there may be a different pattern that fits, but it won’t be ProcessWorker.