zz ht
08/14/2025, 7:12 AM
Marvin
08/14/2025, 7:12 AM
Marvin
08/14/2025, 7:13 AM
flow.serve(...), or something else entirely (Azure/Astra/AllegroServe)?
- What “storage” are you referring to: object storage (S3/GCS/Azure Blob), volumes/PVCs, or code storage for deployments?
If you can share the component, version, and a snippet or link, I’ll give you a precise answer instead of guessing in the dark like a particularly bored robot.
zz ht
08/14/2025, 7:24 AM
Marvin
08/14/2025, 7:25 AM
Marvin
08/14/2025, 7:30 AM
aserve runs your deployment on the same machine that created it, so it doesn’t need to package your code anywhere. .deploy() targets workers and must ship your code, so it requires either a container image or remote code storage.
What’s happening:
- aserve(...) starts a local runner and registers your deployment. The runner uses a local storage adapter pointing at your working directory. No remote storage is created; nothing is pushed. It just serves what’s already on disk here.
- Flow.deploy(...) creates a deployment intended to run on workers (K8s, Docker, etc.). Workers can be anywhere, so Prefect requires a way for them to access your code:
  - Provide an image (the code is baked into the container), or
  - Provide remote storage (Git/S3/GCS/etc.) that workers can pull from.
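To make that rule concrete, here is a minimal sketch of the decision. It is not Prefect's actual implementation; `DeploymentSpec` and `code_delivery` are hypothetical names invented for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DeploymentSpec:
    """Hypothetical stand-in for the inputs that matter here."""

    name: str
    image: Optional[str] = None          # container image with the code baked in
    remote_source: Optional[str] = None  # e.g. a Git URL or a bucket path


def code_delivery(spec: DeploymentSpec, served_locally: bool) -> str:
    """Describe how a run would find its code, or raise if it cannot."""
    if served_locally:
        # serve/aserve: the code is already on disk next to the runner.
        return "local working directory"
    if spec.image:
        return f"image {spec.image}"
    if spec.remote_source:
        return f"remote source {spec.remote_source}"
    raise ValueError(
        f"{spec.name!r}: a worker-based deployment needs an image or remote storage"
    )
```

With neither `image` nor `remote_source` set, the worker-based path raises, which mirrors the failure in the first snippet of this thread.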
Why your two snippets behave differently:
- This fails because there’s no image and no remote source bound to the flow:
deployment_id = await bootstrap.deploy(
    name="bootstrap-deployment",
    work_pool_name=self.work_pool_name,
    description="Bootstrap workflow deployment for Mapping tasks",
    tags=["bootstrap"],
    version="1.0.0",
)
- This works because aserve serves the deployment locally; no packaging is needed:
await aserve(
    await bootstrap.to_deployment(
        name="bootstrap-deployment",
        work_pool_name=work_pool_name,
        description="Bootstrap workflow deployment (auto-served)",
        tags=["bootstrap"],
    )
)
How to make .deploy() work:
- Use a container image:
deployment_id = await bootstrap.deploy(
    name="bootstrap-deployment",
    work_pool_name="k8s-pool",
    image="ghcr.io/acme/bootstrap:1.0.0",
    build=True,  # can be skipped (build=False) if the image already exists
    push=True,
)
- Or bind remote storage via Flow.from_source(...) (Git repo example):
from prefect import Flow

# This snippet runs inside async code (note the await on deploy below),
# so from_source is awaited as well.
flow_from_git = await Flow.from_source(
    source="https://github.com/acme/prefect-flows?ref=main",
    entrypoint="flows/bootstrap.py:bootstrap",
)
deployment_id = await flow_from_git.deploy(
    name="bootstrap-deployment",
    work_pool_name="process-pool",
    description="Bootstrap workflow deployment for Mapping tasks",
    tags=["bootstrap"],
    version="1.0.0",
)
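If it helps to picture what the entrypoint string "flows/bootstrap.py:bootstrap" means, here is a rough sketch: a file path relative to the pulled source, a colon, then the object name inside that file. `load_entrypoint` is a hypothetical helper, not a Prefect function, and a plain function stands in for the flow so the sketch runs without Prefect installed.

```python
import importlib.util
import tempfile
from pathlib import Path


def load_entrypoint(root: Path, entrypoint: str):
    """Load the object named by 'relative/path.py:object_name' under root."""
    rel_path, _, obj_name = entrypoint.rpartition(":")
    if not rel_path or not obj_name:
        raise ValueError(f"expected 'path.py:name', got {entrypoint!r}")
    file_path = root / rel_path
    spec = importlib.util.spec_from_file_location(file_path.stem, file_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return getattr(module, obj_name)


# Stand-in for a pulled repository: a temp directory with one flow file.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "flows").mkdir()
    (root / "flows" / "bootstrap.py").write_text(
        "def bootstrap():\n    return 'bootstrapped'\n"
    )
    bootstrap_fn = load_entrypoint(root, "flows/bootstrap.py:bootstrap")
    result = bootstrap_fn()
```

The practical takeaway is that the path before the colon has to resolve relative to wherever the source is checked out, which is why the repository layout matters.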
References:
- Create deployments (Prefect 3.x)
- Blog: schedule your code quickly with flow.serve
If you share which work pool/worker type you’re using, I can suggest the most direct image or source setup.
zz ht
08/14/2025, 7:37 AM
Marvin
08/14/2025, 7:37 AM
Marvin
08/14/2025, 7:41 AM
/mnt/prefect-repo/myproject
├─ flows/
│ ├─ etl.py # defines @flow etl()
│ ├─ train.py # defines @flow train()
│ └─ report.py # defines @flow report()
└─ src/
└─ mylib/ # shared module (best packaged so it is easy to import)
├─ __init__.py
└─ utils.py
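- Key points
  - All workers (local/Docker/K8s) must mount this directory at the same path (for example, every one of them can read `/mnt/prefect-repo/myproject`).
  - Inside flows, import by package name (such as `from mylib.utils import ...`) to avoid relative-path confusion.
  - Prefer run_deployment to decouple cross-flow dependencies; for local "subflow" calls, make sure the code lives in the same storage.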
Deploying multiple flows (sharing one LocalFileSystem storage)
from prefect import Flow
from prefect.filesystems import LocalFileSystem
from prefect.deployments import deploy  # deploys several deployments in one call

# Point at the shared directory
storage = LocalFileSystem(basepath="/mnt/prefect-repo/myproject")

# Bind the storage and an entrypoint to each flow.
# This snippet runs inside async code (note the await on deploy below),
# so from_source and to_deployment are awaited as well.
etl_flow = await Flow.from_source(storage, "flows/etl.py:etl")
train_flow = await Flow.from_source(storage, "flows/train.py:train")
report_flow = await Flow.from_source(storage, "flows/report.py:report")

# Build RunnerDeployments (work pool, tags, version, etc. can be set here)
etl_dep = await etl_flow.to_deployment(
    name="etl",
    work_pool_name="process-pool",
    tags=["projA"],
    version="2025.08.14",
    description="Main ETL flow",
)
train_dep = await train_flow.to_deployment(
    name="train",
    work_pool_name="process-pool",
    tags=["projA"],
)
report_dep = await report_flow.to_deployment(
    name="report",
    work_pool_name="process-pool",
    tags=["projA"],
)

# Deploy them all in one call. deploy() needs a work pool name of its own
# unless a default work pool is configured.
deployment_ids = await deploy(
    etl_dep, train_dep, report_dep, work_pool_name="process-pool"
)
print(deployment_ids)
Two ways to call across flows
- In-process "subflow" call (suited to runs in the same storage/environment; tightly coupled)
# flows/etl.py
from prefect import flow
from flows.train import train  # import the subflow directly

@flow
def etl():
    # ...
    train()  # call it directly
- Cross-deployment trigger (recommended; decoupled and can span infrastructure)
# flows/etl.py
from prefect import flow
from prefect.deployments import run_deployment

@flow
def etl():
    # ...
    # The name format is usually "flow_name/deployment_name"
    run = run_deployment("train/train", parameters={"date": "2025-08-14"})
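As a small illustration of the "flow_name/deployment_name" format used by run_deployment above, here is a sketch with hypothetical helpers (none of these are Prefect functions). One assumption to flag: to my knowledge a flow's default name is its function name with underscores replaced by hyphens, so a function called nightly_etl would be addressed as "nightly-etl/...". Check the flow name shown in the UI if unsure.

```python
from typing import Tuple


def default_flow_name(function_name: str) -> str:
    """Assumed default: the function name with underscores turned into hyphens."""
    return function_name.replace("_", "-")


def deployment_identifier(flow_name: str, deployment_name: str) -> str:
    """Join the two parts into 'flow_name/deployment_name'."""
    return f"{flow_name}/{deployment_name}"


def split_deployment_identifier(identifier: str) -> Tuple[str, str]:
    """Split 'flow_name/deployment_name' back into its two parts."""
    flow_name, sep, deployment_name = identifier.partition("/")
    if not (sep and flow_name and deployment_name):
        raise ValueError(
            f"expected 'flow_name/deployment_name', got {identifier!r}"
        )
    return flow_name, deployment_name


train_id = deployment_identifier(default_flow_name("train"), "train")
```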
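What to watch for so LocalFileSystem is truly "remotely readable"
- Local process worker: just make sure the machine has that path.
- Docker worker: mount the shared directory at the same path inside the container, for example through job_variables when creating/configuring the work pool (example values; adjust for your environment):
job_variables = {
    "volumes": ["/mnt/prefect-repo:/mnt/prefect-repo:ro"],
    "env": {"PYTHONPATH": "/mnt/prefect-repo/myproject/src"},
    # optional: "working_dir": "/mnt/prefect-repo/myproject"
}
- Kubernetes worker: use a PersistentVolume/PersistentVolumeClaim to mount the shared directory at the same path in the Pod; set PYTHONPATH or `workingDir` if needed.
- Shared code package: treat src/mylib as a package and point PYTHONPATH at `.../myproject/src` to avoid import problems.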
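To show why the PYTHONPATH setting matters, here is a small self-contained sketch. It builds a throwaway copy of the src/mylib layout in a temp directory (the `greet` function is made up for the demo) and shows that `mylib.utils` becomes importable once the src directory is on the import path, which is what PYTHONPATH does for the flow process.

```python
import importlib
import sys
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    # Recreate .../myproject/src/mylib with a tiny utils module.
    src = Path(tmp) / "myproject" / "src"
    pkg = src / "mylib"
    pkg.mkdir(parents=True)
    (pkg / "__init__.py").write_text("")
    (pkg / "utils.py").write_text(
        "def greet(name):\n    return f'hello {name}'\n"
    )

    # Equivalent of PYTHONPATH=<...>/myproject/src for this process.
    sys.path.insert(0, str(src))
    try:
        importlib.invalidate_caches()
        utils = importlib.import_module("mylib.utils")
        greeting = utils.greet("etl")
    finally:
        sys.path.remove(str(src))
```

Without the `sys.path` entry the import raises ModuleNotFoundError, which is the usual symptom when the mount exists but PYTHONPATH is missing.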
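Additional suggestions for batch releases
- Use the same LocalFileSystem(basepath=...) as the source for every flow, so a single change applies to all deployments.
- Use version numbers (`version=`) and tags (`tags=`) to manage multiple release batches; you can filter on them in the UI.
- For strong dependency chains, use run_deployment or Automations (triggering downstream work on a "flow run completed" event) to reduce the coupling of cross-file imports.
- If you later need to run across machines or clouds, LocalFileSystem requires a network drive. Git/S3/GCS (`Flow.from_source("https://...")` and so on) is more general and needs no mounted volumes.
Docs
- Create deployments (Prefect 3.x)
If you tell me which worker type you use (local/Docker/K8s) and how the shared directory is mounted, I can write the mount and job_variables details into a template you can reuse directly. Yes, I know, this is slightly more realistic than "everything is handled automatically".
zz ht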
08/15/2025, 2:44 AM
aserve starts the flow in the current process, so it can be debugged. However, with the class MappingWorker(ProcessWorker) approach, a new process is created automatically.
async def run(self, flow_run, configuration, task_status=None):
    """Execute the specific workflow task"""
    logging.info(f"Processing flow run: {flow_run.id}")
    logging.info(f"Flow parameters: {flow_run.parameters}")
    # ProcessWorker automatically handles deployment execution
    # Here we only need to call the parent class's run method
    result = await super().run(flow_run, configuration, task_status)
    logging.info(f"Flow run {flow_run.id} completed successfully")
    return result
Marvin
08/15/2025, 2:49 AM
Marvin
08/15/2025, 2:53 AM
flow.serve() / flow.aserve() run flow runs in the current Python process. Great for stepping through with a debugger.
- ProcessWorker always executes each flow run in a new child process. Subclassing it (e.g., class MappingWorker(ProcessWorker)) and calling await super().run(...) keeps that behavior. Your pre/post logs are in the worker process; the flow itself runs in a different Python process.
There isn’t a supported switch to make ProcessWorker run flows in the same process. That isolation is by design.
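A quick way to see why breakpoints set in the worker never fire inside the flow: the child is a separate interpreter with its own PID. The sketch below uses plain `subprocess` as a stand-in for the worker launching a flow run; it is not ProcessWorker's actual launch code.

```python
import os
import subprocess
import sys

parent_pid = os.getpid()

# Stand-in for the worker launching a flow run: start a fresh interpreter.
completed = subprocess.run(
    [sys.executable, "-c", "import os; print(os.getpid())"],
    capture_output=True,
    text=True,
    check=True,
)
child_pid = int(completed.stdout.strip())
```

`parent_pid` and `child_pid` differ, so a debugger attached to the parent sees only the code before and after the launch.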
Practical debugging options:
- Want true in-process debugging? Use serve/aserve (or just call the flow function directly).
- Need to go through a work pool/worker but still debug? Attach a remote debugger in the flow process:
import debugpy
debugpy.listen(("0.0.0.0", 5678))
print("Waiting for debugger attach...")
debugpy.wait_for_client()
Then attach your IDE to port 5678 before the flow continues.
- Improve visibility: set PREFECT_LOGGING_LEVEL=DEBUG and consider PREFECT_DEBUG_MODE=1 to get richer tracebacks.
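If you go the debugpy route, one option is to make the wait opt-in so normal runs are unaffected. This is only a sketch: `FLOW_DEBUG_PORT` is a made-up environment variable name and `maybe_wait_for_debugger` is a hypothetical helper you would call at the top of your flow.

```python
import os


def maybe_wait_for_debugger(env_var: str = "FLOW_DEBUG_PORT") -> bool:
    """Block until an IDE attaches, but only when env_var is set."""
    port = os.environ.get(env_var)
    if not port:
        return False  # normal run: do nothing

    # Imported lazily so runs without debugging do not need debugpy installed.
    import debugpy

    debugpy.listen(("0.0.0.0", int(port)))
    print(f"Waiting for debugger attach on port {port}...")
    debugpy.wait_for_client()
    return True
```

You could then set the variable only for the runs you want to debug, for example through the `env` entry of the work pool or deployment job variables.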
Docs for local-process execution are here: Run flows in local processes.
If your goal is “worker-like” queue consumption but without subprocesses, say so. There may be a different pattern that fits, but it won’t be ProcessWorker.
zz ht
09/08/2025, 11:22 AM
Marvin
09/08/2025, 11:28 AM
Marvin
09/08/2025, 11:35 AM
{
  "job_configuration": {
    "command": "{{ command }}",
    "env": "{{ env }}",
    "labels": "{{ labels }}",
    "name": "{{ name }}",
    "stream_output": "{{ stream_output }}",
    "working_dir": "{{ working_dir }}"
  },
  "variables": {
    "type": "object",
    "properties": {
      "name": {
        "title": "Name",
        "description": "Name given to infrastructure created by a worker.",
        "default": null,
        "type": [
          "string",
          "null"
        ]
      },
      "env": {
        "title": "Environment Variables",
        "description": "Environment variables to set when starting a flow run.",
        "type": "object",
        "default": {
          "MY_DEFAULT_ENV": "hello",
          "PREFECT_LOGGING_LEVEL": "DEBUG"
        },
        "additionalProperties": {
          "type": [
            "string",
            "null"
          ]
        }
      },
      "labels": {
        "title": "Labels",
        "type": "object",
        "additionalProperties": {
          "type": "string"
        }
      },
      "command": {
        "title": "Command",
        "description": "Usually left empty; ProcessWorker generates the correct Prefect start command automatically",
        "default": null,
        "type": [
          "string",
          "null"
        ]
      },
      "stream_output": {
        "title": "Stream Output",
        "type": "boolean",
        "default": true
      },
      "working_dir": {
        "title": "Working Directory",
        "type": [
          "string",
          "null"
        ],
        "format": "path",
        "default": null
      }
    }
  }
}
- Create a Process-type work pool and apply the template:
prefect work-pool create "my-process-pool" --type process --base-job-template ./base-job-template.json
- Start a worker (it pulls work from that pool and launches processes according to the template):
prefect worker start -p my-process-pool
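For intuition on how a base job template turns into a concrete job, here is a simplified sketch: each "{{ name }}" placeholder in job_configuration is filled from the deployment's job variables if present, otherwise from the default in the variables schema. This is an illustration under that assumption, not Prefect's implementation (which handles more cases, such as nested templates); `resolve_job_configuration` is a hypothetical helper.

```python
import re
from typing import Any, Dict

# Matches a value that is exactly one placeholder, e.g. "{{ env }}".
PLACEHOLDER = re.compile(r"^\{\{\s*(\w+)\s*\}\}$")


def resolve_job_configuration(
    template: Dict[str, Any], overrides: Dict[str, Any]
) -> Dict[str, Any]:
    """Fill each '{{ name }}' from overrides, else from the schema default."""
    properties = template["variables"]["properties"]
    resolved: Dict[str, Any] = {}
    for key, value in template["job_configuration"].items():
        match = PLACEHOLDER.match(value) if isinstance(value, str) else None
        if match is None:
            resolved[key] = value  # literal value, keep as written
            continue
        var_name = match.group(1)
        default = properties.get(var_name, {}).get("default")
        resolved[key] = overrides.get(var_name, default)
    return resolved


template = {
    "job_configuration": {
        "env": "{{ env }}",
        "working_dir": "{{ working_dir }}",
    },
    "variables": {
        "properties": {
            "env": {"default": {"MY_DEFAULT_ENV": "hello"}},
            "working_dir": {"default": None},
        }
    },
}
job = resolve_job_configuration(
    template, {"working_dir": "/mnt/prefect-repo/myproject"}
)
```

Here `env` falls back to the template default while `working_dir` comes from the override, which is the same split you get between pool-level defaults and per-deployment job variables.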
References
- Manage work pools
- Workers concepts
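2) If you really do need to subclass ProcessWorker (to define a new custom worker type)
- Subclassing is only needed when you want to "define a brand-new worker type", for example to use a custom type name, or to bundle the worker into your own service and run it from there.
- For the vast majority of "command/environment variable/working directory" needs, the base job template above is the better fit.
Minimal example (defines a custom type with its own base_job_template)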
# custom_process_worker.py
import anyio
from prefect.workers.process import ProcessWorker

CUSTOM_BASE_JOB_TEMPLATE = {
    "job_configuration": {
        "command": "{{ command }}",
        "env": "{{ env }}",
        "labels": "{{ labels }}",
        "name": "{{ name }}",
        "stream_output": "{{ stream_output }}",
        "working_dir": "{{ working_dir }}",
    },
    "variables": {
        "type": "object",
        "properties": {
            "name": {"type": ["string", "null"], "default": None, "title": "Name"},
            "env": {
                "title": "Environment Variables",
                "type": "object",
                "default": {"MY_DEFAULT_ENV": "hello"},
                "additionalProperties": {"type": ["string", "null"]},
            },
            "labels": {"title": "Labels", "type": "object", "additionalProperties": {"type": "string"}},
            "command": {"title": "Command", "type": ["string", "null"], "default": None},
            "stream_output": {"title": "Stream Output", "type": "boolean", "default": True},
            "working_dir": {"title": "Working Directory", "type": ["string", "null"], "default": None},
        },
    },
}


class MyProcessWorker(ProcessWorker):
    # Name of the new custom worker type (matches the work pool's --type)
    type = "my-process"


# On first start, the pool is created with the custom template if it does not exist
if __name__ == "__main__":
    worker = MyProcessWorker(
        work_pool_name="my-process-pool",
        create_pool_if_not_found=True,
        base_job_template=CUSTOM_BASE_JOB_TEMPLATE,
    )
    # start() is the worker's polling loop. run() executes one flow run
    # and expects a flow_run and configuration, so it is not the entry point.
    anyio.run(worker.start)
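Notes
- The example above uses type = "my-process" to represent a new work pool type. You can also create the pool ahead of time with the CLI:
Marvin
09/08/2025, 11:35 AM
prefect worker start can only launch built-in types; run a custom type from your own script, so that the Prefect server knows the mapping between the type and its implementation).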
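More references
- Developing a custom worker
- prefect.workers.process API
Would you like me to turn the example above into a minimal repository you can run directly, or do you want deeper customization such as custom scripts/hooks before and after startup, or assembling the command yourself? Tell me your specific needs (for example the environment variables to inject, whether the start command should change, whether you need pre/post hooks) and I can give you a "custom ProcessWorker" version that fits your scenario more closely.
zz ht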
09/12/2025, 6:06 AM
Marvin
09/12/2025, 6:06 AM
Marvin
09/12/2025, 6:10 AM
--work-queue, then it only picks up flow runs from the matching queues (the option can be repeated to pass several queues).
Common commands (verified to exist)
- Start a worker (bound to a pool, pulling only from the named queues)
prefect worker start -p my-pool -q high-priority -q etl
- Create a Work Pool (pick the infrastructure type you use, such as process/docker/kubernetes)
prefect work-pool create my-pool --type process
- Create a Work Queue under the pool
prefect work-queue create high-priority --pool my-pool
- Specify the Work Pool / Work Queue when deploying a flow (you can also choose them interactively)
prefect deploy ./path/to/flow.py:my_flow -n my-deploy -p my-pool -q high-priority
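Additional notes
- Tags are not used for routing in 3.x; they are mainly used for concurrency control. Routing relies entirely on the Work Pool / Work Queue.
- A worker can listen to multiple queues. To change which flows a worker can run, you usually change which Pool/Queue the deployment belongs to, or change which queues the worker listens to through its start arguments.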
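The routing rule can be pictured with a small sketch: a worker only sees runs from its own pool and, if it was started with queue names, only from those queues. `PendingRun` and `runs_for_worker` are hypothetical names for illustration; the real selection happens on the Prefect server side.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence


@dataclass
class PendingRun:
    """Hypothetical stand-in for a scheduled flow run."""

    name: str
    work_pool: str
    work_queue: str


def runs_for_worker(
    runs: Sequence[PendingRun],
    pool: str,
    queues: Optional[Sequence[str]] = None,
) -> List[PendingRun]:
    """Keep runs in the worker's pool; if queues are given, filter by them too."""
    selected = []
    for run in runs:
        if run.work_pool != pool:
            continue  # a different pool is never visible to this worker
        if queues and run.work_queue not in queues:
            continue  # the worker was started with -q and this queue is not listed
        selected.append(run)
    return selected


pending = [
    PendingRun("nightly-etl", "my-pool", "etl"),
    PendingRun("urgent-fix", "my-pool", "high-priority"),
    PendingRun("adhoc", "my-pool", "default"),
    PendingRun("other-team", "other-pool", "etl"),
]
picked = runs_for_worker(pending, "my-pool", ["high-priority", "etl"])
```

With the queues from the `prefect worker start -p my-pool -q high-priority -q etl` example, the "adhoc" run in the default queue and the run in the other pool are left for other workers.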
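Docs
- Workers / Work Pools concepts: Workers documentation
- Work Pools & Queues blog overview: Work Pools and Workers overview
If it is convenient, could you add how you want to filter (for example by environment, by team, or by priority) and which execution environment you use (process/docker/k8s)? I can then give a configuration example that fits your scenario more closely.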