<@ULVA73B9P>: Can I use only prefect flow, pipelin...
# ask-marvin
d
@Marvin: Can I use only Prefect's flow, pipeline, and task logic, without Prefect Cloud or a locally self-hosted Prefect server? I just want the logic behind those functions. I have several Python functions and private packages that I want to use as tasks. I would create flows and pipelines and expose them through a REST API. Is that best practice, and will it work smoothly? Would I encounter any problems using Prefect this way?
m
thought for 210.8 seconds
Short answer: yes. In Prefect 3.x you can use just the Python library (flows, tasks, subflows) with no Prefect Cloud and no self‑hosted server. Many users do this for “embedded orchestration” inside their apps.

What works fine with no backend
- Define and run `@flow` and `@task` functions locally like normal Python
- Concurrency via task runners (threads by default, or processes)
- Retries, timeouts, caching, and result persistence to local storage
- Logging via `get_run_logger()` (to your console)
- Composing “pipelines” by calling tasks from flows and using subflows

What you won’t get without a backend
- No UI/dashboard, run history, or central log storage
- No scheduled runs, workers, or deployments
- No global concurrency limits, automations, or event processing
- Retries only occur in the same process/session (not across restarts)

A note on “pipeline”
- In Prefect 3 there’s no separate “pipeline” object. You model pipelines with a `@flow` (optionally calling other flows as subflows) composed of `@task`s.

About REST APIs
- If you aren’t running Prefect Cloud/Server, Prefect’s REST API is not available/needed.
- To integrate with your own REST services, just call them from tasks using `httpx`/`requests`. If you want to expose your flow as an HTTP endpoint, wrap it with a small FastAPI/Flask app that calls the flow.

Basic local-only setup
- Make sure no API is configured by leaving `PREFECT_API_URL` unset or setting it to an empty string. (With no API URL configured, Prefect 3 may still start a temporary local API server for the run; that comes up later in this thread.)
- Optionally enable local result persistence and tweak logging.

Example using your own functions/private packages as tasks:
```python
import os
# No Cloud or self-hosted API configured for this process
os.environ["PREFECT_API_URL"] = ""

from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner

# Imagine this comes from your private package
from my_private_pkg.etl import fetch_raw, transform, load

@task(retries=2, retry_delay_seconds=1, log_prints=True, persist_result=True)
def fetch(url: str):
    return fetch_raw(url)

@task(persist_result=True)
def transform_records(data):
    return transform(data)

@task
def load_records(rows):
    load(rows)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=4), log_prints=True)
def etl_pipeline(urls: list[str]):
    logger = get_run_logger()
    logger.info(f"Starting ETL for {len(urls)} urls")

    raw_futs = [fetch.submit(u) for u in urls]
    xformed = [transform_records.submit(f) for f in raw_futs]
    # Resolve futures before load
    rows = [x.result() for x in xformed]
    load_records(rows)
    return len(rows)

if __name__ == "__main__":
    # Calling a flow directly returns its return value, so no .result() is needed
    count = etl_pipeline(["https://api.example.com/a", "https://api.example.com/b"])
    print(f"Processed {count} batches")
```
Calling an external REST API from a task:
```python
import httpx
from prefect import task

@task(retries=3, retry_delay_seconds=2)
def call_service(url: str):
    with httpx.Client(timeout=30) as client:
        r = client.get(url)
        r.raise_for_status()
        return r.json()
```
Optional environment tweaks for local mode:
```bash
# Disable Cloud/Server communication
export PREFECT_API_URL=""

# Persist results locally (defaults to ~/.prefect/storage)
export PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
export PREFECT_TASKS_DEFAULT_PERSIST_RESULT=true
# Optional custom path
# export PREFECT_LOCAL_STORAGE_PATH=/tmp/prefect-storage
```
Common gotchas
- Scheduling/deployments/workers require a server/Cloud; purely local runs are “you run the script when you want it to run.”
- With `ProcessPoolTaskRunner`, your tasks must be importable/pickleable.
- Without a backend, logs and states are ephemeral to the current process; if you need history, persist results to files/object storage yourself or adopt Prefect Cloud/Server later.

Docs to dig deeper
- Quickstart: Quickstart
- Logging: Add logging
- Caching: Caching
- Results and persistence: Results
- Task runners: Task runners
- Settings: Settings reference

A couple of clarifying questions to make sure we point you the right way:
- When you said “implement with REST API,” do you want your flows to call your own external services, or expose your flows as HTTP endpoints?
- Do you need scheduling, a UI, or centralized run history? If yes, you’ll want Prefect Cloud or a self-hosted Server. If not, the local-only approach above will work smoothly.
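If "expose your flows as HTTP endpoints" is what you meant, here is a minimal sketch of the wrapping idea. To stay dependency-free it uses a plain WSGI callable from the standard library instead of FastAPI/Flask, and `run_pipeline`, the `/runs` path, and the `call_app` helper are all hypothetical names for illustration. In a real app `run_pipeline` would call your `@flow` function.

```python
import io
import json


def run_pipeline(urls):
    """Hypothetical stand-in: a real app would call the @flow function here."""
    return len(urls)


def _json_response(start_response, status, obj):
    """Serialize obj as JSON and start the WSGI response."""
    body = json.dumps(obj).encode()
    headers = [("Content-Type", "application/json"),
               ("Content-Length", str(len(body)))]
    start_response(status, headers)
    return [body]


def app(environ, start_response):
    """Minimal WSGI app: POST /runs with {"urls": [...]} runs the pipeline."""
    if environ.get("REQUEST_METHOD") != "POST" or environ.get("PATH_INFO") != "/runs":
        return _json_response(start_response, "404 Not Found", {"error": "not found"})
    length = int(environ.get("CONTENT_LENGTH") or 0)
    try:
        payload = json.loads(environ["wsgi.input"].read(length) or b"{}")
    except json.JSONDecodeError:
        return _json_response(start_response, "400 Bad Request", {"error": "invalid JSON"})
    count = run_pipeline(payload.get("urls", []))
    return _json_response(start_response, "200 OK", {"processed": count})


def call_app(method, path, payload=None):
    """Invoke the app in-process (no sockets); return (status, parsed JSON body)."""
    raw = json.dumps(payload or {}).encode()
    environ = {
        "REQUEST_METHOD": method,
        "PATH_INFO": path,
        "CONTENT_LENGTH": str(len(raw)),
        "wsgi.input": io.BytesIO(raw),
    }
    captured = {}

    def start_response(status, headers):
        captured["status"] = status

    chunks = app(environ, start_response)
    return captured["status"], json.loads(b"".join(chunks))
```

To serve it for a quick local check, `wsgiref.simple_server.make_server("127.0.0.1", 8000, app).serve_forever()` should work. The flow runs inside the request, so long-running flows will block the response unless you hand them off to a background worker of your own.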
d
@Marvin: How do I disable retries, timeouts, caching, and result persistence to local storage?
m
thought for 68.6 seconds
Great question. In Prefect 3.x you can explicitly turn all of those off at the decorator level. Here’s the “disable everything” pattern for both tasks and flows.

Per-task
```python
from prefect import task

@task(
    retries=0,                     # no retries
    retry_delay_seconds=None,      # no retry schedule
    timeout_seconds=None,          # no timeout
    cache_policy=None,             # disable cross-call caching
    cache_key_fn=None,             # ensure no custom cache key
    cache_expiration=None,         # no cache expiration since caching is off
    cache_result_in_memory=False,  # no in-memory result cache
    persist_result=False,          # do not write results to storage
    result_storage=None,           # ensure no storage is configured
)
def my_task(x):
    return x
```
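Per-flow (caching options such as `cache_policy`, `cache_key_fn`, and `cache_expiration` are task parameters; the `@flow` decorator does not accept them)
```python
from prefect import flow

@flow(
    retries=0,                     # no retries
    retry_delay_seconds=None,      # no retry schedule
    timeout_seconds=None,          # no timeout
    cache_result_in_memory=False,  # do not keep the flow result cached in memory
    persist_result=False,          # do not persist flow result
    result_storage=None,           # ensure no storage is configured
)
def my_flow(x):
    # my_task is the task defined in the per-task example
    return my_task(x)
```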
Notes and gotchas
- Retries: default is effectively “no retries.” Setting `retries=0` makes that explicit.
- Timeouts: default is `None` (disabled). Only set this if you’ve previously enabled a timeout or want to be explicit.
- Caching:
  - Disable cross-call caching with `cache_policy=None`.
  - Also set `cache_result_in_memory=False` on tasks to avoid in-memory result caching during a run.
- Result persistence: set `persist_result=False` and leave `result_storage=None` to prevent writing to local storage.
- If you’ve enabled any global defaults via settings or a profile, make sure those are not forcing persistence or caching. For example:
```bash
# Make sure these are unset or not set to "true"
unset PREFECT_RESULTS_PERSIST_BY_DEFAULT
unset PREFECT_TASKS_DEFAULT_PERSIST_RESULT
```
Docs for reference
- Caching: Caching
- Results/persistence: Results
- Settings: Settings reference

If you share a small snippet of your current decorators, I can suggest the minimal changes needed to disable each behavior.
d
@Marvin: Does setting retries per flow also affect per-task retries, or not?
m
thought for 32.2 seconds
Short answer: no. Flow-level retries do not affect task-level retries in Prefect 3.x.

How it works
- Task retries are set per task with `@task(retries=..., retry_delay_seconds=...)`.
- Flow retries are set on the flow with `@flow(retries=..., retry_delay_seconds=...)` and only control retrying the entire flow run.
- If a task ultimately fails (after its own retries, if any), the flow fails. If the flow has retries, Prefect re-runs the whole flow from the beginning.
- Subflows behave the same: each subflow’s retries are independent. Re-running the parent flow will call the subflow again.

Caching/persistence note
- On a flow retry, tasks will re-run unless you’ve enabled a cache policy and result persistence. With caching + persisted results, successful work can be skipped on the next attempt. Without it, everything re-executes.

Tiny example
```python
from prefect import flow, task

@task(retries=0)  # task will not retry
def flaky():
    raise RuntimeError("boom")

@flow(retries=2, retry_delay_seconds=3)  # flow will retry the entire run up to 2 times
def my_flow():
    flaky()

if __name__ == "__main__":
    my_flow()
```
Docs:
- Flows and tasks overview: Quickstart
- Caching: Caching
- Results/persistence: Results

If you want to fully disable retries/timeouts/caching/persistence, set them explicitly on each decorator as in my prior message.
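To make the "independent retries" point concrete, here is a small plain-Python model of the behavior described above. It is not Prefect's engine. `run_with_retries` and `count_task_executions` are hypothetical helpers, and the model assumes no caching, so every flow attempt re-runs the task from scratch.

```python
def run_with_retries(fn, retries):
    """Call fn up to retries + 1 times; re-raise the error from the last attempt."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise


def count_task_executions(task_retries, flow_retries):
    """Count how often an always-failing task body runs under both retry levels."""
    calls = 0

    def task_body():
        nonlocal calls
        calls += 1
        raise RuntimeError("boom")

    def flow_body():
        # Each flow attempt starts the task again with a fresh retry budget.
        return run_with_retries(task_body, task_retries)

    try:
        run_with_retries(flow_body, flow_retries)
    except RuntimeError:
        pass  # the flow still fails in the end; only the call count matters here
    return calls
```

Under this model the worst case is `(task_retries + 1) * (flow_retries + 1)` executions of the task body, which is worth keeping in mind if the task has side effects or calls a rate-limited service.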
d
@Marvin: What are the default values for these per-task and per-flow options?
```python
retries=0,                     # no retries
retry_delay_seconds=None,      # no retry schedule
timeout_seconds=None,          # no timeout
cache_policy=None,             # disable cross-call caching
cache_key_fn=None,             # ensure no custom cache key
cache_expiration=None,         # no cache expiration since caching is off
cache_result_in_memory=False,  # no in-memory result cache
persist_result=False,          # do not write results to storage
result_storage=None            # ensure no storage is configured
```
m
thought for 245.2 seconds
Here are the Prefect 3.x defaults for those options. Note that the caching options exist only on tasks (not flows).

Flows (`@flow`)
- retries: None (effectively 0; no retries)
- retry_delay_seconds: None (not used unless you set retries > 0; no delay by default)
- timeout_seconds: None (no timeout)
- cache_policy, cache_key_fn, cache_expiration: not flow parameters; caching is configured on tasks
- cache_result_in_memory: True
- persist_result: None (falls back to settings; defaults to False)
- result_storage: None (only used if you turn on persistence)

Tasks (`@task`)
- retries: None (effectively 0; no retries)
- retry_delay_seconds: None (not used unless retries > 0; no delay by default)
- timeout_seconds: None (no timeout)
- cache_policy: NotSet (resolves to the DEFAULT policy = INPUTS + TASK_SOURCE + RUN_ID)
- cache_key_fn: None
- cache_expiration: None
- cache_result_in_memory: True (in-run, in-memory cache is on by default)
- persist_result: None (falls back to settings; defaults to False)
- result_storage: None (only used if you turn on persistence)

Important clarifications
- “No retries” is the default for both flows and tasks.
- Flows have no cache policy of their own; the `@flow` decorator does not take `cache_policy`, `cache_key_fn`, or `cache_expiration`.
- Task-level default cache policy is DEFAULT (INPUTS + TASK_SOURCE + RUN_ID), but since persist_result defaults to False, you only get in-run in-memory caching by default; cross-run caching requires persistence and storage.
- cache_result_in_memory is available on both tasks and flows and defaults to True on both.

Settings that can override persistence defaults
- PREFECT_TASKS_DEFAULT_PERSIST_RESULT
- PREFECT_RESULTS_PERSIST_BY_DEFAULT

If unset, persistence is False by default.

Docs
- Caching: Caching
- Results/persistence: Results
- Settings reference: Settings reference
d
@Marvin: So if I disable all caching and result storage, my application's memory usage won't explode?
m
thought for 33.4 seconds
Sorry, I encountered an error while processing your request. Please try again.
Short answer: if you disable caching and result persistence, Prefect won’t keep extra copies of your data. The main memory usage will come from whatever your tasks return and whatever you keep referenced in your flow.

What Prefect still holds in memory
- The task’s return value is kept long enough for downstream tasks to consume it. That’s necessary for the DAG to work.
- If you set `cache_result_in_memory=False` and `persist_result=False`, Prefect will not keep an additional in-memory cache or write results to disk. There’s no extra duplication beyond your returned objects and any references you keep.

How to avoid “memory explosion”
- Don’t return large objects from tasks. Write big intermediates to disk/object storage/database and return a small handle (e.g., path/URL/ID).
- Stream instead of accumulating. Avoid building big lists of futures or results in the flow.
  - Process items in a for-loop and let references go as soon as possible.
  - Don’t store all results in a list unless they’re small.
- Turn off in-run caching for tasks:
  - `cache_policy=None`
  - `cache_result_in_memory=False`
  - `persist_result=False`
- Cap concurrency to match your memory budget:
  - `@flow(task_runner=ThreadPoolTaskRunner(max_workers=N))`
- Avoid logging huge payloads and keep `log_prints=False` for large data paths.
- Optionally `del` large Python objects once you’re done with them to help GC.

Minimal example pattern
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task(
    retries=0,
    timeout_seconds=None,
    cache_policy=None,
    cache_result_in_memory=False,
    persist_result=False,
)
def preprocess_and_write(chunk) -> str:
    # process then write big data to disk/object store
    path = f"/tmp/out/{chunk['id']}.parquet"
    # ... write the data ...
    return path  # small handle

@task
def consume(path: str) -> int:
    # read small metadata or pass path downstream
    # ... small work ...
    return 1

@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def pipeline(chunks):
    for ch in chunks:
        p = preprocess_and_write.submit(ch)
        c = consume.submit(p)
        # do not accumulate in lists; let references go per-iteration
        _ = c.result()
```
If memory pressure is still tight, the most effective technique is to keep large data out of return values and pass handles instead. If you later want to push even more memory off the process, enabling result persistence to local disk for just the large steps (and returning references) is also a safe option, even without any server.

Docs:
- Caching: Caching
- Results/persistence: Results
- Task runners and concurrency: Task runners
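Here is the "return a small handle, stream instead of accumulating" idea as a plain-Python sketch, without Prefect, so the memory behavior is easy to see. The function names and the JSON-on-disk format are assumptions for illustration. In your flow the two helpers would be the task bodies.

```python
import json
import os
import tempfile
from typing import Iterable, Iterator


def make_chunks(n: int, rows_per_chunk: int) -> Iterator[dict]:
    """Yield chunks lazily so only one chunk is alive at a time."""
    for i in range(n):
        yield {"id": i, "rows": list(range(rows_per_chunk))}


def write_chunk(chunk: dict, out_dir: str) -> str:
    """Write the large payload to disk and return only its path (the handle)."""
    path = os.path.join(out_dir, f"{chunk['id']}.json")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(chunk["rows"], fh)
    return path


def count_rows(path: str) -> int:
    """Downstream step: reopen the data from the handle and return a small result."""
    with open(path, encoding="utf-8") as fh:
        return len(json.load(fh))


def process(chunks: Iterable[dict], out_dir: str) -> int:
    """Handle one chunk per iteration; keep a running total, not a list of results."""
    total = 0
    for chunk in chunks:
        path = write_chunk(chunk, out_dir)
        total += count_rows(path)
    return total


def demo() -> int:
    """Run the sketch against a temporary directory."""
    with tempfile.TemporaryDirectory() as out_dir:
        return process(make_chunks(3, 4), out_dir)
```

The design choice is that only paths and integers cross step boundaries, so peak memory is roughly one chunk regardless of how many chunks the run handles.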
d
hello
@Marvin: are you alive? /
m
thought for 4.8 seconds
Very much alive! How can I help with your Prefect workflows today?
d
@Marvin:
```
03:16:29.957 | INFO    | prefect - Starting temporary server on http://127.0.0.1:8954
See https://docs.prefect.io/v3/concepts/server#how-to-guides for more information on running a dedicated Prefect server.
03:16:36.338 | INFO    | Flow run 'purple-bear' - Beginning flow run 'purple-bear' for flow 'msme_credit_score_flow'
03:16:36.388 | INFO    | Task run '_load_config-5b3' - Finished in state Completed()
```
I have not set a Prefect API URL or API key for my service. Will this consume a lot of server resources?
m
thought for 275.2 seconds
Short answer: No. That “temporary server” is lightweight and won’t consume massive resources.

What you’re seeing is Prefect 3’s ephemeral server. When no `PREFECT_API_URL` (and thus no API key) is configured, Prefect starts a local API on a random localhost port to track your flow/task state.

Key points:
- It runs as a single local API subprocess with a small SQLite database in your Prefect home directory; CPU/memory usage is minimal and it shuts down when your flow/process ends.
- It’s intended for local dev and ad-hoc runs, not production scale.
- There’s no network exposure beyond localhost unless you explicitly configure it.

If you want to change this behavior:
- Use a persistent local server: run `prefect server start` and point your runs at it by setting `PREFECT_API_URL` to the URL the command prints.
- Use Prefect Cloud: set `PREFECT_API_URL` and `PREFECT_API_KEY` for your workspace.
- If you really don’t want Prefect to auto-start an ephemeral server, you can disable it via settings and run purely in “unconfigured” mode (you’ll lose orchestration features).

Docs:
- Overview and options: Prefect Server concepts
- Running a dedicated local server: Server CLI guide
d
@Marvin: What if I run significant requests? Does that matter?
m
thought for 41.3 seconds
Great question. Short version: “Significant requests” inside your flow don’t stress the Prefect ephemeral server much. The heavy work runs in your Python process; the ephemeral server mostly records states/logs.

When it could matter:
- Lots of concurrent flows/tasks or very chatty logging (thousands of log lines/state transitions per minute) can push the ephemeral server’s SQLite backend and you may see higher CPU/IO or occasional “database is locked” behavior.
- For that kind of volume, use a dedicated server (or Cloud) instead of the ephemeral one.

Ways to mitigate if you stay local:
- Reduce API logging: set `PREFECT_LOGGING_TO_API_ENABLED=false`
- Lower log volume: `PREFECT_LOGGING_LEVEL=WARNING` (or prune logging in tasks)

For heavier usage:
- Start a local persistent server: `prefect server start`, then set `PREFECT_API_URL` to the URL it prints.
- For high concurrency/throughput, point the server at PostgreSQL via `PREFECT_SERVER_DATABASE_CONNECTION_URL` for better write concurrency than SQLite.

Docs:
- Server overview: Prefect Server concepts
- Running a dedicated server: Server CLI guide

If you can share what “significant” means (lots of parallel tasks? very verbose logging? many scheduled runs?), I can recommend the best setup.
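For context on the “database is locked” point, here is a small standard-library sketch of SQLite's single-writer behavior in general. It does not touch Prefect's database. The table name and the helper `demo_write_lock` are made up for the demo, and `timeout=0` is set on purpose so the second writer fails immediately instead of waiting.

```python
import os
import sqlite3
import tempfile


def demo_write_lock() -> str:
    """Open two connections to one SQLite file and make both try to write."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        # isolation_level=None lets us issue BEGIN/ROLLBACK explicitly.
        first = sqlite3.connect(path, timeout=0, isolation_level=None)
        second = sqlite3.connect(path, timeout=0, isolation_level=None)
        try:
            first.execute("CREATE TABLE states (id INTEGER PRIMARY KEY, name TEXT)")
            first.execute("BEGIN IMMEDIATE")  # first connection takes the write lock
            first.execute("INSERT INTO states (name) VALUES ('Running')")
            try:
                second.execute("BEGIN IMMEDIATE")  # second writer cannot get the lock
            except sqlite3.OperationalError as exc:
                return str(exc)
            second.execute("ROLLBACK")
            return "no contention"
        finally:
            first.close()
            second.close()
```

With a non-zero timeout the second writer would wait and usually succeed, which is why this only tends to show up under sustained write pressure. A database like PostgreSQL handles many concurrent writers without this file-level lock.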
d
@Marvin: Okay, I have self-hosted Prefect on my server, authenticated, and run a flow, but no flow shows up in the UI. Even adding a work pool is not working.
m
thought for 135.5 seconds
Got it. When flows don’t show in the UI and work pool creation seems to “not work,” it’s almost always an API URL mismatch. Your flow/CLI are likely talking to an ephemeral/local API while your browser is looking at your self-hosted UI. Here’s a quick checklist to get everything pointed at the same server:

1) Confirm your server is up and reachable
- If you started it locally: `prefect server start --host 0.0.0.0 --port 4200`
- From the machine where you run flows and CLI, test the API directly:
```bash
curl -s http://<your-host>:4200/api/hello
```
You should see a friendly hello response.

2) Point your CLI and flows to that API URL
- Set `PREFECT_API_URL` in the same environment where you run CLI and flows:
```bash
prefect config set PREFECT_API_URL=http://<your-host>:4200/api
```
- Verify:
```bash
prefect config view | grep PREFECT_API_URL
echo $PREFECT_API_URL
```
- When you run a flow, the logs should show “Connecting to Prefect server at http://<your-host>:4200/api”. If you see “Starting temporary server on http://127.0.0.1:…”, it’s still connecting to an ephemeral server.

3) Create a work pool on that server and verify it shows in the UI
- Example:
```bash
prefect work-pool create my-pool --type process
prefect work-pool ls
```
You should see `my-pool` in both the CLI and the UI. If you created it while pointed at an ephemeral server, it won’t persist or appear in your UI.

4) Start a worker for that pool
```bash
prefect worker start -p my-pool
```

5) Minimal flow run test
- In the same shell (so it uses the same `PREFECT_API_URL`):
```python
from prefect import flow

@flow
def hello():
    print("hi")

if __name__ == "__main__":
    hello()
```
- You should see the flow run appear in the UI immediately.

If that still doesn’t work, could you share:
- The URL you use for the UI (e.g., http://<your-host>:4200)
- The output of:
```bash
prefect version
prefect config view | grep PREFECT_API_URL
```
- Any error you see when you run:
```bash
prefect work-pool create my-pool --type process
```
This will let me pinpoint whether it’s an API URL/profile mismatch or a server/reverse-proxy connectivity issue.

Docs:
- Server concepts: Self-hosted Prefect Server
- Work pools/workers: Work pools and workers
- Server CLI: Server CLI guide
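Since the usual culprit is a slightly wrong URL (UI address instead of the API address, or a missing `/api` suffix), here is a small sketch for sanity-checking the value before you set it. `build_api_url` and `looks_like_self_hosted_api_url` are hypothetical helpers, not part of Prefect, and the check only targets the self-hosted `http://<your-host>:4200/api` shape from this checklist (Prefect Cloud URLs look different).

```python
from urllib.parse import urlsplit


def build_api_url(host: str, port: int = 4200, scheme: str = "http") -> str:
    """Build the API URL for a self-hosted server from its host and port."""
    return f"{scheme}://{host}:{port}/api"


def looks_like_self_hosted_api_url(url: str) -> bool:
    """Rough shape check: http(s) scheme, a host, and a path ending in /api."""
    parts = urlsplit(url)
    has_scheme = parts.scheme in ("http", "https")
    has_host = bool(parts.netloc)
    ends_with_api = parts.path.rstrip("/").endswith("/api")
    return has_scheme and has_host and ends_with_api
```

For example, the UI address `http://10.0.0.5:4200` fails the check while `http://10.0.0.5:4200/api` passes. A passing value only means the shape is right, so the `curl` test against `/api/hello` is still the real reachability check.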
d
@Marvin: Timed out while attempting to connect to ephemeral Prefect API server.
m
thought for 57.3 seconds
That timeout means your code tried to auto-start the ephemeral (temporary) API but it didn’t come up within the default 20s window. Easiest fix: avoid the ephemeral server altogether and point your CLI and flows at your self-hosted server.

Do this:

1) Make sure your self-hosted server is running and reachable
- Start it (if not already):
```bash
prefect server start --host 0.0.0.0 --port 4200
```
- From the machine where you run flows/CLI:
```bash
curl -s http://<your-host>:4200/api/hello
```
You should get a hello response.

2) Point Prefect at your server (so it won’t try ephemeral)
- In the same shell where you run flows and CLI:
```bash
prefect config set PREFECT_API_URL=http://<your-host>:4200/api
```
- Verify:
```bash
prefect config view | grep PREFECT_API_URL
```
- Now create a work pool and start a worker:
```bash
prefect work-pool create my-pool --type process
prefect worker start -p my-pool
```
- Run a simple flow from the same shell and confirm it appears in the UI:
```python
from prefect import flow

@flow
def hello():
    print("hi")

if __name__ == "__main__":
    hello()
```

If you really want to use the ephemeral server, try:
- Increase the startup timeout:
```bash
prefect config set PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS=60
```
- Or disable ephemeral entirely (runs will be “unconfigured” and not show in a UI):
```bash
prefect config set PREFECT_SERVER_EPHEMERAL_ENABLED=false
```

Common causes of ephemeral timeout:
- Constrained environments (Docker/CI) where the subprocess can’t bind to 127.0.0.1 quickly
- Very slow startup or high CPU pressure
- Security policies blocking local sockets

If it still fails, please share:
- The UI URL you’re using
- Output of:
```bash
prefect version
prefect config view | grep -E "PREFECT_API_URL|PREFECT_SERVER_EPHEMERAL"
curl -s http://<your-host>:4200/api/hello
```
- How you’re running flows (local, Docker, remote server)

Docs:
- Server overview: Self-hosted Prefect Server
- Server CLI: Server CLI guide
- Ephemeral server settings: Server how-to guides
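If your flows start in a container or CI job right after the server does, it can help to wait for the API before running anything. Below is a minimal standard-library sketch of that polling idea. `http_probe` and `wait_until_ready` are hypothetical helpers, not Prefect functions, and the 20-second default just mirrors the window mentioned above.

```python
import time
from typing import Callable
from urllib.request import urlopen


def http_probe(url: str, timeout: float = 2.0) -> bool:
    """Return True if a GET to url answers with HTTP 200, False on any network error."""
    try:
        with urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except OSError:  # covers URLError, HTTPError, refused connections, timeouts
        return False


def wait_until_ready(
    probe: Callable[[], bool],
    timeout_seconds: float = 20.0,
    interval_seconds: float = 0.5,
) -> bool:
    """Call probe repeatedly until it succeeds or the deadline passes."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        if probe():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_seconds)
```

A typical call would be `wait_until_ready(lambda: http_probe("http://<your-host>:4200/api/hello"))` before invoking the flow. Passing the probe in as a callable keeps the loop easy to test without a running server.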