Luis Condados
09/24/2025, 2:14 PM
from prefect import flow, task
from prefect.logging import get_logger
import time

@task
def dummy_taskA():
    return 42

@flow(name="example_flow", retries=0)
def example_flow() -> None:
    logger = get_logger()
    logger.info("Example flow started.")
    result = dummy_taskA()
    logger.info(f"Result from Task A: {result}")
    # Simulate some work
    for i in range(5):
        logger.info(f"Working... step {i+1}/5")
        time.sleep(1)
    logger.info("Example flow completed.")

if __name__ == "__main__":
    example_flow.serve(
        name="example-deployment",
        pause_on_shutdown=False,
        cron="* * * * *",  # run every day at 00:00 (midnight)
    )
Marvin
09/24/2025, 2:34 PM
1) Create a process work pool
prefect work-pool create "process-pool" --type process
2) Start a process worker for that pool (keep this running)
prefect worker start -p "process-pool"
3) Deploy your flow to that pool
from prefect import flow

@flow
def example_flow():
    return "hello"

# Send to the process worker via its pool
example_flow.deploy(
    name="example-on-process",
    work_pool_name="process-pool",
    work_queue_name="default",  # optional; defaults to 'default'
    job_variables={
        # These map to prefect.workers.process.ProcessVariables
        "working_dir": "/path/to/project",  # path must exist on the worker machine
        "env": {"PREFECT_LOGGING_LEVEL": "INFO"},
        "stream_output": True,
        # "command": "...",  # optional custom command
        # "labels": {"team": "data"},  # optional
    },
    # Not building/pushing images for process runs
    build=False,
    push=False,
)
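Once deployed, you can kick off a test run without waiting for a schedule; the deployment name below is taken from the deploy call above:
prefect deployment run 'example_flow/example-on-process'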
Notes:
- For remote workers, ensure the code exists at the same working_dir on the worker machine. If not, deploy from a source repo so the worker can pull it:
example_flow.from_source(
    "https://github.com/your-org/your-repo",
    entrypoint="flows/example.py:example_flow",
).deploy(
    name="example-on-process",
    work_pool_name="process-pool",
    job_variables={"stream_output": True},
)
- Valid job_variables keys for a process worker are: env, labels, command, stream_output, working_dir.
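To double-check what your pool accepts (and its current defaults), you can inspect it; the pool name here assumes the one created above:
prefect work-pool inspect "process-pool"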
Does your worker run on the same machine as your code, or remotely? I can tailor the job_variables (e.g., working_dir) and source setup accordingly.
Marvin
09/24/2025, 2:40 PM
Step 0 — Point your CLI at your self-hosted server
prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api"
Docs: Self-hosted config
Step 1 — Create a Process work pool
prefect work-pool create "process-pool" --type process
Docs: Work pool CLI
Step 2 — Start a worker for that pool (in the background)
- Quick/temporary (shell background):
nohup prefect worker start --pool process-pool > worker.log 2>&1 &
- Recommended (as a service via systemd on Linux):
Create `/etc/systemd/system/prefect-worker.service`:
[Unit]
Description=Prefect Process Worker
After=network.target
[Service]
# Point to your server and project
Environment="PREFECT_API_URL=<http://127.0.0.1:4200/api>"
WorkingDirectory=/path/to/your/project
# Ensure this uses the right Python env (venv) that has your deps installed
ExecStart=/usr/bin/env bash -lc 'prefect worker start --pool process-pool'
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
Then:
sudo systemctl daemon-reload
sudo systemctl enable --now prefect-worker
systemctl status prefect-worker
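To follow the worker's logs once the service is up (standard systemd tooling, assuming the unit name above):
journalctl -u prefect-worker -f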
Step 3 — Create the deployment (CLI or Python)
Option A: CLI (easiest)
Run from your project root:
prefect deploy ./flow.py:example_flow \
--name example-deployment \
--work-pool process-pool \
--cron "* * * * *"
Notes:
- This does not use Docker; the process worker runs your code in the current environment.
- Make sure the worker’s environment has all your Python deps installed.
Docs: Create deployments
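A quick sanity check before relying on the schedule; run these in the same environment the worker starts from (your_module is a placeholder for your project's actual imports):
python -c "import prefect; print(prefect.__version__)"
python -c "import your_module"  # placeholder; substitute your own dependencies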
Option B: Programmatic
If you prefer doing it in Python instead of the CLI:
from prefect import flow, task, get_run_logger
import time

@task
def dummy_taskA():
    return 42

@flow(name="example_flow", retries=0)
def example_flow():
    logger = get_run_logger()
    logger.info("Example flow started.")
    result = dummy_taskA()
    logger.info(f"Result from Task A: {result}")
    for i in range(5):
        logger.info(f"Working... step {i+1}/5")
        time.sleep(1)
    logger.info("Example flow completed.")

if __name__ == "__main__":
    # Register a deployment pointing to this project
    flow.from_source(
        source=".",  # current project directory
        entrypoint="flow.py:example_flow",  # adjust to your file name
    ).deploy(
        name="example-deployment",
        work_pool_name="process-pool",
        cron="* * * * *",
    )
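Assuming the file is saved as flow.py (matching the entrypoint above), running it once registers or updates the deployment; the worker then handles the scheduled runs:
python flow.py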
Notes:
- For a process worker, you don’t need to specify an image/build/push.
- Remove your previous `serve(...)`; serve is only for local scheduling without a worker.
How to verify everything
- In the UI, you should see:
- Your “process-pool” work pool with a healthy worker
- The “example-deployment” deployment with the cron schedule
- A run should be created on the next cron tick. Logs will show up in the UI and in your worker’s stdout.
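The same checks work from the CLI if you prefer the terminal:
prefect work-pool ls
prefect deployment ls
prefect flow-run ls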
Why “deploy” instead of “serve”
- serve keeps a local scheduler running inside your script.
- deploy registers the flow with your server and defers execution to workers. This is what you want for background, durable execution on the same machine (without Docker).
A couple of small tips
- Your cron of * * * * * runs every minute. If you really want midnight daily, use 0 0 * * *.
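For example, re-running the CLI deployment from Step 3 with the corrected schedule:
prefect deploy ./flow.py:example_flow \
  --name example-deployment \
  --work-pool process-pool \
  --cron "0 0 * * *"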
Marvin
09/24/2025, 2:40 PM
One more check: make sure the worker's environment can actually import your dependencies (e.g., python -c 'import your_deps').
Would you prefer using the CLI or programmatic deployment? Also, what OS are you on? If it’s not Linux/systemd, I can share a background/run-as-service snippet for your platform.