Dom
10/20/2025, 6:54 AMMarvin
10/20/2025, 6:54 AMMarvin
10/20/2025, 6:57 AM@flow@taskget_run_logger()@flowPREFECT_API_URLpython
import os
# Ensure local-only (no API calls)
os.environ["PREFECT_API_URL"] = ""
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
# Imagine this comes from your private package
from my_private_pkg.etl import fetch_raw, transform, load
@task(retries=2, retry_delay_seconds=1, log_prints=True, persist_result=True)
def fetch(url: str):
    return fetch_raw(url)
@task(persist_result=True)
def transform_records(data):
    return transform(data)
@task
def load_records(rows):
    load(rows)
@flow(task_runner=ThreadPoolTaskRunner(max_workers=4), log_prints=True)
def etl_pipeline(urls: list[str]):
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Starting ETL for {len(urls)} urls")
    raw_futs = [fetch.submit(u) for u in urls]
    xformed = [transform_records.submit(f) for f in raw_futs]
    # Resolve futures before load
    rows = [x.result() for x in xformed]
    load_records(rows)
    return len(rows)
if __name__ == "__main__":
    count = etl_pipeline(["<https://api.example.com/a>", "<https://api.example.com/b>"]).result()
    print(f"Processed {count} batches")python
from prefect import task
@task(retries=3, retry_delay_seconds=2)
def call_service(url: str):
    import httpx
    with httpx.Client(timeout=30) as client:
        r = client.get(url)
        r.raise_for_status()
        return r.json()bash
# Disable Cloud/Server communication
export PREFECT_API_URL=""
# Persist results locally (defaults to ~/.prefect/storage)
export PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
export PREFECT_TASKS_DEFAULT_PERSIST_RESULT=true
# Optional custom path
# export PREFECT_LOCAL_STORAGE_PATH=/tmp/prefect-storageProcessPoolTaskRunnerMarvin
10/20/2025, 6:57 AMDom
10/20/2025, 7:45 AMDom
10/20/2025, 7:45 AMMarvin
10/20/2025, 7:45 AMMarvin
10/20/2025, 7:47 AMpython
from prefect import task
@task(
    retries=0,                   # no retries
    retry_delay_seconds=None,    # no retry schedule
    timeout_seconds=None,        # no timeout
    cache_policy=None,           # disable cross-call caching
    cache_key_fn=None,           # ensure no custom cache key
    cache_expiration=None,       # no cache expiration since caching is off
    cache_result_in_memory=False,# no in-memory result cache
    persist_result=False,        # do not write results to storage
    result_storage=None          # ensure no storage is configured
)
def my_task(x):
    return xpython
from prefect import flow
@flow(
    retries=0,                   # no retries
    retry_delay_seconds=None,
    timeout_seconds=None,        # no timeout
    cache_policy=None,           # disable flow-level caching
    cache_key_fn=None,
    cache_expiration=None,
    persist_result=False,        # do not persist flow result
    result_storage=None
)
def my_flow(x):
    return my_task(x)retries=0Nonecache_policy=Nonecache_result_in_memory=Falsepersist_result=Falseresult_storage=Nonebash
# Make sure these are unset or not set to "true"
unset PREFECT_RESULTS_PERSIST_BY_DEFAULT
unset PREFECT_TASKS_DEFAULT_PERSIST_RESULTDom
10/20/2025, 7:49 AMMarvin
10/20/2025, 7:49 AMMarvin
10/20/2025, 7:50 AM@task(retries=..., retry_delay_seconds=...)@flow(retries=..., retry_delay_seconds=...)python
from prefect import flow, task
@task(retries=0)  # task will not retry
def flaky():
    raise RuntimeError("boom")
@flow(retries=2, retry_delay_seconds=3)  # flow will retry the entire run up to 2 times
def my_flow():
    flaky()
if __name__ == "__main__":
    my_flow()Dom
10/20/2025, 7:50 AMretries=0,                   # no retries
    retry_delay_seconds=None,    # no retry schedule
    timeout_seconds=None,        # no timeout
    cache_policy=None,           # disable cross-call caching
    cache_key_fn=None,           # ensure no custom cache key
    cache_expiration=None,       # no cache expiration since caching is off
    cache_result_in_memory=False,# no in-memory result cache
    persist_result=False,        # do not write results to storage
    result_storage=None          # ensure no storage is configuredMarvin
10/20/2025, 7:50 AMMarvin
10/20/2025, 7:54 AMDom
10/20/2025, 7:57 AMMarvin
10/20/2025, 7:57 AMMarvin
10/20/2025, 7:58 AMMarvin
10/20/2025, 7:58 AMcache_result_in_memory=Falsepersist_result=Falsecache_policy=Nonecache_result_in_memory=Falsepersist_result=False@flow(task_runner=ThreadPoolTaskRunner(max_workers=N))log_prints=Falsedelpython
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
@task(
    retries=0,
    timeout_seconds=None,
    cache_policy=None,
    cache_result_in_memory=False,
    persist_result=False,
)
def preprocess_and_write(chunk) -> str:
    # process then write big data to disk/object store
    path = f"/tmp/out/{chunk['id']}.parquet"
    # ... write the data ...
    return path  # small handle
@task
def consume(path: str) -> int:
    # read small metadata or pass path downstream
    # ... small work ...
    return 1
@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def pipeline(chunks):
    for ch in chunks:
        p = preprocess_and_write.submit(ch)
        c = consume.submit(p)
        # do not accumulate in lists; let references go per-iteration
        _ = c.result()Dom
10/23/2025, 3:18 AMDom
10/23/2025, 3:18 AMMarvin
10/23/2025, 3:19 AMMarvin
10/23/2025, 3:19 AMDom
10/23/2025, 3:20 AM03:16:29.957 | INFO    | prefect - Starting temporary server on <http://127.0.0.1:8954>
See <https://docs.prefect.io/v3/concepts/server#how-to-guides> for more information on running a dedicated Prefect server.
03:16:36.338 | INFO    | Flow run 'purple-bear' - Beginning flow run 'purple-bear' for flow 'msme_credit_score_flow'
03:16:36.388 | INFO    | Task run '_load_config-5b3' - Finished in state Completed()Marvin
10/23/2025, 3:20 AMMarvin
10/23/2025, 3:24 AMPREFECT_API_URLprefect server startPREFECT_API_URLPREFECT_API_URLPREFECT_API_KEYDom
10/23/2025, 3:29 AMMarvin
10/23/2025, 3:29 AMMarvin
10/23/2025, 3:29 AMPREFECT_LOGGING_TO_API_ENABLED=falsePREFECT_LOGGING_LEVEL=WARNINGprefect server startPREFECT_API_URLPREFECT_SERVER_DATABASE_CONNECTION_URLDom
10/23/2025, 3:41 AMMarvin
10/23/2025, 3:41 AMMarvin
10/23/2025, 3:43 AMprefect server start --host 0.0.0.0 --port 4200curl -s http://<your-host>:4200/api/helloPREFECT_API_URLprefect config set PREFECT_API_URL=http://<your-host>:4200/apiprefect config view | grep PREFECT_API_URL
  echo $PREFECT_API_URLprefect work-pool create my-pool --type process
  prefect work-pool lsmy-poolprefect worker start -p my-poolPREFECT_API_URLfrom prefect import flow
  @flow
  def hello():
      print("hi")
  if __name__ == "__main__":
      hello()prefect version
  prefect config view | grep PREFECT_API_URLprefect work-pool create my-pool --type processDom
10/23/2025, 7:19 AMMarvin
10/23/2025, 7:19 AMMarvin
10/23/2025, 7:20 AMprefect server start --host 0.0.0.0 --port 4200curl -s http://<your-host>:4200/api/helloprefect config set PREFECT_API_URL=http://<your-host>:4200/apiprefect config view | grep PREFECT_API_URLprefect work-pool create my-pool --type process
  prefect worker start -p my-poolfrom prefect import flow
  @flow
  def hello():
      print("hi")
  if __name__ == "__main__":
      hello()prefect config set PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS=60prefect config set PREFECT_SERVER_EPHEMERAL_ENABLED=falseprefect version
  prefect config view | grep -E "PREFECT_API_URL|PREFECT_SERVER_EPHEMERAL"
  curl -s http://<your-host>:4200/api/hello