# ask-marvin
s
@Marvin how to design transaction rollback with the following chained tasks running within a DaskTaskRunner():
Copy code
@flow(
    task_runner=DaskTaskRunner(
        cluster_class=PBSCluster, 
        cluster_kwargs=pbs_cluster_kwargs,
        adapt_kwargs={"maximum": 20}
    ),
    persist_result=True
)
def my_flow() -> list[Path]:
    # dask cluster routines 
    download_futures = download_from_api.map(items)

    # rollback hook would apply only to this Prefect task
    with transaction() as txn:
        txn.set("filename", <download_future>)
        saved_zipfile_futures = save_zip.map(zip_path=download_futures)
Should I use an asynchronous context manager for this Dask distributed flow/tasks? And what type of lock manager should I use to handle race conditions?
m
thought for 877.0 seconds
Great question. Short version: with Dask each mapped task runs in its own worker process, so a transaction you open in the flow won’t automatically apply inside those workers. To get per-task rollback, open the transaction inside the task itself and register the compensating action there. A concrete pattern you can use (Prefect 3.x):
Copy code
from pathlib import Path
import shutil
from dask_jobqueue import PBSCluster
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner
from prefect.transactions import transaction

@task
def download_from_api(item: str) -> Path:
    # ... produce a local zip Path for this item ...
    ...

@task
def save_zip(zip_path: Path, dest_dir: Path) -> Path:
    # Open a transaction inside the task so it applies on the Dask worker
    with transaction(key=f"save-zip:{zip_path.name}") as txn:
        dest = dest_dir / zip_path.name

        # Register compensation for this unit of work
        # Use a closure to capture the concrete dest path
        txn.add_rollback_hook(lambda t, p=dest: p.exists() and p.unlink())

        # Do the side-effect
        shutil.copy2(zip_path, dest)

        # Optionally also register a commit action (e.g., tag/mark)
        # txn.add_commit_hook(lambda t, p=dest: ...)

        return dest

@flow(
    task_runner=DaskTaskRunner(
        cluster_class=PBSCluster,
        cluster_kwargs=...,         # your PBS config
        adapt_kwargs={"maximum": 20}
    ),
    persist_result=True,
)
def my_flow(items: list[str], dest_dir: Path) -> list[Path]:
    download_futures = download_from_api.map(items)
    saved_zipfile_futures = save_zip.map(zip_path=download_futures, dest_dir=dest_dir)
    return saved_zipfile_futures
Notes and options
- Why inside the task? Dask executes mapped tasks in separate processes; opening `with transaction():` in the flow won't be visible to those workers. Opening it in the task ensures the rollback hooks run if that unit fails or if its transaction is rolled back.
- Task-level hooks: you can also attach hooks at task definition time using `@task(on_rollback=[...], on_commit=[...])`. These run in the context of the active transaction for that task run. They're great for fixed behavior, but registering hooks inside the task body (as above) is more flexible because you can capture per-item details like a specific file path.
- State hooks vs transactions: `on_failure` state hooks (docs) run when a task/flow enters a failed state. Use these for "cleanup only if this task fails." Transactions (docs) add "commit or roll back" semantics and can run compensation even when a later error occurs elsewhere in the transaction scope.

If you want "all-or-nothing" rollback across the entire map:
- Per-item transactions (above) only clean up the failing item.
- If you want to roll back all saved files when any item fails, you can create a "parent" transaction in the flow, pass a transaction key into each task, and have the task open/join a child transaction using that key; then if any failure occurs and the parent rolls back, all child hooks run.
- This pattern requires a shared ResultStore and propagating a transaction key to workers. It's a bit more advanced; see the Transactions docs for details and test in your environment.
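For intuition, a minimal single-process sketch of that grouped rollback (illustrative task/flow names; this runs tasks sequentially in one process and does not by itself propagate the transaction to Dask-mapped tasks):
Copy code
from prefect import flow, task
from prefect.transactions import Transaction, transaction

@task
def step(n: int) -> None:
    print(f"doing step {n}")

@step.on_rollback
def undo(txn: Transaction) -> None:
    # Runs for each completed task run when the enclosing transaction rolls back
    print(f"undoing step work (txn key: {txn.key})")

@flow
def all_or_nothing() -> None:
    # Sequential task calls grouped into one atomic unit: if anything inside
    # raises, rollback hooks run for the steps that already completed.
    with transaction(key="batch-2024-01-01"):
        step(1)
        step(2)
        raise RuntimeError("boom")  # forces rollback of steps 1 and 2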
Quick sanity check
- Dask runner import: in Prefect 3.x, use `from prefect_dask.task_runners import DaskTaskRunner`.
- PBSCluster: ensure you've configured `dask-jobqueue` and PBS resources correctly.

Could you confirm which behavior you need?
- A) Roll back only the side-effects for the item that failed
- B) Roll back all previously saved items if any mapped task fails

I can tailor the code to your exact rollback semantics once I know which you're aiming for.
s
@Marvin I confirm I want: A) Roll back only the side-effects for the item that failed. Please provide an example using task-level hooks, including what the callable signature should look like.
m
thought for 158.0 seconds
Perfect — per-item rollback with task-level hooks is a good fit. Key points for Prefect 3.x:
- Task-level transaction hooks receive exactly one argument: a Transaction object.
- Hooks run only if the task executes inside a transaction; make sure to open one in the task body.
- To get item-specific cleanup, stash any values the hook needs into the transaction via `txn.set(...)`, then read them in the hook with `txn.get(...)`.

Example using task-level hooks for per-item cleanup on failure (works with Dask):
Copy code
from pathlib import Path
import shutil

from dask_jobqueue import PBSCluster
from prefect import flow, task, get_run_logger
from prefect.transactions import transaction, Transaction
from prefect_dask.task_runners import DaskTaskRunner


# Hook signatures: each receives a single Transaction
def cleanup_saved_file(txn: Transaction) -> None:
    # Read values captured during the task run
    dest_str = txn.get("dest_path", None)
    if not dest_str:
        return
    dest = Path(dest_str)
    try:
        if dest.exists():
            dest.unlink()
    except Exception as exc:
        # Avoid raising from hooks; just log if needed
        logger = get_run_logger()
        logger.warning(f"Rollback hook could not remove {dest}: {exc}")


def log_commit(txn: Transaction) -> None:
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Committed file: {txn.get('dest_path', '<unknown>')}")


@task(
    on_rollback=[cleanup_saved_file],  # runs if the transaction rolls back
    on_commit=[log_commit],            # runs if the transaction commits
)
def save_zip(zip_path: Path, dest_dir: Path) -> Path:
    # Open a transaction INSIDE the task so it applies on the Dask worker
    with transaction(key=f"save-zip:{zip_path.name}") as txn:
        dest = dest_dir / zip_path.name

        # Make data available to hooks
        txn.set("dest_path", str(dest))

        # Side-effect
        shutil.copy2(zip_path, dest)

        return dest


@task
def download_from_api(item: str) -> Path:
    # ... produce a local zip Path for this item ...
    ...


@flow(
    task_runner=DaskTaskRunner(
        cluster_class=PBSCluster,
        cluster_kwargs=...,            # your PBS config
        adapt_kwargs={"maximum": 20},
    ),
    persist_result=True,
)
def my_flow(items: list[str], dest_dir: Path) -> list[Path]:
    download_futures = download_from_api.map(items)
    saved_zipfile_futures = save_zip.map(zip_path=download_futures, dest_dir=dest_dir)
    return saved_zipfile_futures
Alternative hook registration style: you can also attach hooks after defining a task:
Copy code
@save_zip.on_rollback
def cleanup(txn: Transaction):
    ...

@save_zip.on_commit
def committed(txn: Transaction):
    ...
Notes and gotchas
- Transactions are not opened automatically just because hooks are present; you must call `with transaction() as txn:` inside the task for the hooks to run.
- To make hooks item-aware, put per-item values into the transaction with `txn.set("key", value)` in the task, then read them in the hook via `txn.get("key")`.
- Hooks run in the Dask worker where the task executes, so `get_run_logger()` works there.
- If you ever see inconsistencies accessing data from on_commit hooks in nested/parent transactions, see this GitHub report for context: issue #17864.

Docs
- Transactions: Advanced transactions
- State hooks overview: State-change hooks
s
@Marvin results of my flow are persisted with `persist_result=True`. I know where these results get stored on my local NFS where the Prefect worker operates. My question is: if I want to add `isolation_level=IsolationLevel.SERIALIZABLE` to the in-task transaction to avoid any possible race conditions between Dask workers, is it okay to use the `FileSystemLockManager` lock_manager as prescribed in the documentation:
Copy code
with transaction(
        key=transaction_key,
        isolation_level=IsolationLevel.SERIALIZABLE,
        store=ResultStore(
            lock_manager=FileSystemLockManager(
                lock_files_directory=PREFECT_HOME.value() / "locks"
            )
        ),
    ) as txn:
        if txn.is_committed():
            print("Data file has already been written. Exiting early.")
            return
m
thought for 155.4 seconds
Short answer: yes—that pattern is correct in Prefect 3.x, and it's a good way to guard a critical section across Dask workers when they share an NFS volume. A few notes and a corrected example with 3.x imports:
- Use a shared lock directory that every Dask worker can access. On NFS, make sure file locking is actually enabled and reliable (e.g., no `nolock` mount option; NFSv4 generally handles this better).
- Prefer getting your Prefect home path via settings in 3.x rather than `PREFECT_HOME.value()`.
- Make sure the transaction key is stable and uniquely identifies the resource you're protecting (e.g., the target file path or dataset/date partition).
- `persist_result=True` on tasks is independent of the transaction; the transaction's commit record is written via the `ResultStore` you pass to `transaction(...)`.

Example (Prefect 3.x):
Copy code
from prefect.transactions import transaction, IsolationLevel
from prefect.results import ResultStore
from prefect.locking.filesystem import FileSystemLockManager
from prefect.settings import get_current_settings

lock_dir = get_current_settings().home / "locks"

with transaction(
    key=transaction_key,  # e.g., f"write:{target_path}"
    isolation_level=IsolationLevel.SERIALIZABLE,
    store=ResultStore(
        lock_manager=FileSystemLockManager(lock_files_directory=lock_dir)
    ),
) as txn:
    if txn.is_committed():
        # Someone already committed this work
        return

    # Do the work here (ideally write to a tmp path)
    # ...

    # Let the context commit at exit; keep the critical section minimal
Caveats and tips:
- NFS locking reliability and performance vary by implementation and mount options. If you see contention or flaky behavior, verify NFS lock daemon/opts and consider a smaller critical section or a single-writer pattern. Prefect's built-in lock managers are memory (single-process) and filesystem; there isn't a built-in Redis/ZK lock manager in 3.x.
- SERIALIZABLE gives the strongest guarantees but can increase contention. Keep the code inside the `with transaction(...)` block as short as possible—just the check and the finalization step.
- If your Dask workers run on nodes that do not share the same filesystem, file-based locks won't coordinate across hosts.

Docs and references:
- Results and result storage (Prefect 3)
- FileSystemLockManager API
- Blog: Transactional pipelines with Prefect 3.0

If you can share how your Dask workers are deployed (same host vs multiple hosts, NFS mount options), I can sanity-check the locking setup further.
s
@Marvin for my within-task example, can I open/create the transaction context manager anywhere within my `save_zip` task logic? Or does it need to be at the top of the task?
m
thought for 33.0 seconds
Yes—you can open the transaction context anywhere inside your task. The `transaction(...)` context just scopes the critical section you want to protect; it does not need to be at the top of the task.

How to choose placement:
- If your goal is to skip work when it's already done, put a quick "preflight" check near the top, then do heavy work outside the lock, and re-enter a transaction only for the final write. This minimizes lock time while still avoiding duplicates.
- If your goal is to protect only the final write/rename, open the transaction right around that write. This is usually best for Dask to reduce contention.
- If you wrap the whole task in a transaction, you'll hold the lock longer and increase contention—only do this if truly necessary.

Two recommended patterns

1) Preflight check + lock only for commit (minimal lock time)
Copy code
from pathlib import Path
import os
from prefect.transactions import transaction, IsolationLevel
from prefect.results import ResultStore
from prefect.locking.filesystem import FileSystemLockManager
from prefect.settings import get_current_settings

lock_dir = get_current_settings().home / "locks"
txn_key = f"save_zip:{dataset}:{partition}"  # make this stable/unique
final_path = Path("/nfs/path/to/output.zip")

# Preflight read (no lock held long-term)
with transaction(
    key=txn_key,
    isolation_level=IsolationLevel.SERIALIZABLE,
    store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=lock_dir)),
) as txn:
    if txn.is_committed():
        return

# Do heavy work outside the transaction
tmp_path = final_path.with_suffix(".zip.tmp")
# ... compute and write to tmp_path ...

# Commit phase (short critical section + double-check)
with transaction(
    key=txn_key,
    isolation_level=IsolationLevel.SERIALIZABLE,
    store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=lock_dir)),
) as txn:
    if txn.is_committed():
        # Another worker already committed while we were computing
        # Optionally clean up tmp
        if tmp_path.exists():
            tmp_path.unlink(missing_ok=True)
        return

    os.replace(tmp_path, final_path)  # atomic on POSIX
    # exiting the context records the commit
2) Protect only the final write
Copy code
with transaction(
    key=txn_key,
    isolation_level=IsolationLevel.SERIALIZABLE,
    store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=lock_dir)),
) as txn:
    if txn.is_committed():
        return
    os.replace(tmp_path, final_path)
Notes:
- Keep the code inside the `with transaction(...)` block as short as possible—ideally just the finalization (e.g., atomic rename). This reduces contention between Dask workers.
- Your `persist_result=True` task setting is independent of transactions. The transaction's commit record lives in the `ResultStore` you pass to `transaction(...)`; make sure that store and the lock directory are shared across workers (e.g., same NFS mount).
- On NFS, ensure locking is reliable (NFSv4 recommended; avoid `nolock`). If workers don't share the same filesystem, file locks won't coordinate.

References:
- Results and result storage
- FileSystemLockManager API
- Transactional pipelines with Prefect 3.0

If you share how your workers are mounted to NFS (v3/v4, mount options), I can help validate the locking approach.
s
@Marvin but it seems to me that the "preflight" check would (successfully) commit the transaction with the same `txn_key` that is then used for re-entering a new transaction.
m
thought for 339.2 seconds
Great catch — by default a transaction will write a commit record on clean exit. So if you run a "preflight" inside a transaction with the same key and then exit without error, that first context would commit, and the second context would always see it as committed. Two safe patterns:

A) One transaction only (preferred)
- Do the heavy work first, outside any transaction
- Open a single transaction only for the finalization step, check once, then finalize and let the context commit
Copy code
from pathlib import Path
import os
from prefect.transactions import transaction, IsolationLevel
from prefect.results import ResultStore
from prefect.locking.filesystem import FileSystemLockManager
from prefect.settings import get_current_settings

lock_dir = get_current_settings().home / "locks"
txn_key = f"save_zip:{dataset}:{partition}"
final_path = Path("/nfs/path/output.zip")
tmp_path = final_path.with_suffix(".zip.tmp")

# Heavy work outside transaction
# ... compute/write to tmp_path ...

with transaction(
    key=txn_key,
    isolation_level=IsolationLevel.SERIALIZABLE,
    store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=lock_dir)),
) as txn:
    if txn.is_committed():
        # Another worker finalized already
        if tmp_path.exists():
            tmp_path.unlink(missing_ok=True)
        return

    os.replace(tmp_path, final_path)  # atomic on POSIX
    # clean exit writes the commit record
B) If you really want a preflight, make it read-only
- Use `write_on_commit=False` for the preflight so it won't create a commit record
- Re-check inside a second transaction before finalizing
Copy code
# Preflight: read-only, do NOT write a commit record
with transaction(
    key=txn_key,
    isolation_level=IsolationLevel.SERIALIZABLE,
    store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=lock_dir)),
    write_on_commit=False,
) as txn:
    if txn.is_committed():
        return

# Heavy work outside transaction
# ... compute/write to tmp_path ...

# Finalization with commit record
with transaction(
    key=txn_key,
    isolation_level=IsolationLevel.SERIALIZABLE,
    store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=lock_dir)),
) as txn:
    if txn.is_committed():
        if tmp_path.exists():
            tmp_path.unlink(missing_ok=True)
        return
    os.replace(tmp_path, final_path)
Notes:
- Keep the locked section as short as possible (ideally just the final atomic rename).
- Using a preflight transaction still acquires the lock; if you want less contention, skip the preflight and use pattern A.
- Ensure your Dask workers all see the same lock directory and that NFS locking is reliable.

Refs:
- prefect.transactions API
- FileSystemLockManager API
- Results and result storage
s
@Marvin I am having issues with `DaskTaskRunner` and pickle: it fails to deserialize my flow script. I went with the following registration style for my transaction hooks:
Copy code
@task(tags=["some-tag"], log_prints=True, timeout_seconds=4500)
def save_zip(zip_path: Path):
    txn_lock_dir = <NFS path>
    with transaction(
        key=f"esa-id:{zip_path.stem}",
        isolation_level=IsolationLevel.SERIALIZABLE,
        write_on_commit=False,
        store=ResultStore(
            lock_manager=FileSystemLockManager(lock_files_directory=Path(txn_lock_dir))
        )
    ) as txn:
        if txn.is_committed():
            logger.warning(
                "Another Dask worker committed before"
                f" task-run id {context.task_run.id} ran the apptainer cmd."
            )
            return

        # do some zipfile processing here
Copy code
@save_zip.on_rollback
def cleanup(txn: Transaction):
    ...

@save_zip.on_commit
def committed(txn: Transaction):
    ...
I am getting this error:
Copy code
distributed.protocol.pickle - INFO - Failed to deserialize <flow_script.py> module
m
thought for 114.7 seconds
That Dask message usually means "the worker could not import or cloudpickle your module," not that there's anything inherently wrong with your transaction code. Common causes and fixes:

1) The module isn't importable on workers
- Symptom: distributed workers try to import your functions by module name, but that file isn't on their sys.path or filesystem.
- Fix options:
  - Package your project and install it in the worker environment.
  - Or, put your code on shared storage and ensure PYTHONPATH points to it on all workers.
  - Or, use Prefect deployments with source code distribution: in Prefect 3.x, use `flow.from_source("https://...", entrypoint="pkg_or_path:flow_fn")` and `deploy(...)` so all workers pull the same code. Docs: Store flow code

2) Running as a script (main) without guarding imports
- If your flow is launched as a script, Dask may struggle to re-import functions defined in main.
- Fix: move task/flow definitions into an importable module and run with `python -m your_package.flow_script`, or guard your entrypoint with `if __name__ == "__main__":`.
- Keep all task and hook function definitions at the top level of the module (no nesting/closures).
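As a concrete illustration of point 2, a minimal sketch of an importable module layout (package and module names are hypothetical):
Copy code
# my_package/flow_script.py: keep all defs at module top level
from prefect import flow, task

@task
def work(x: int) -> int:
    return x + 1

@flow
def my_flow() -> None:
    work.submit(1)

# Guard the entrypoint so workers can re-import this module safely
if __name__ == "__main__":
    my_flow()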
3) Non-picklable objects captured at import time
- Don't create loggers/contexts/globals that aren't picklable at import. In Prefect 3.x, call `get_run_logger()` and `get_run_context()` inside the task body, not at module import time.
- Ensure your `@save_zip.on_commit` / `@save_zip.on_rollback` callbacks are top-level functions that don't close over non-picklable state.

4) Dependency/version mismatches on workers
- Make sure workers have the same Python, Prefect, prefect-dask, and cloudpickle versions as the scheduler/flow runner.
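To check point 4 quickly, a minimal sketch using `distributed.Client.run` to compare versions on every worker (the scheduler address is a placeholder for however you reach your cluster):
Copy code
import sys

import cloudpickle
import prefect
from distributed import Client

def env_report() -> dict:
    # Versions that must match between the flow-run process and the workers
    return {
        "python": sys.version.split()[0],
        "prefect": prefect.__version__,
        "cloudpickle": cloudpickle.__version__,
    }

client = Client("tcp://scheduler-host:8786")  # placeholder address
print("local:", env_report())
print("workers:", client.run(env_report))  # runs env_report on each worker
A minimal pattern that's friendly to Dask serialization: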
Copy code
from pathlib import Path
import os

from prefect import flow, task, get_run_logger
from prefect_dask import DaskTaskRunner

from prefect.transactions import transaction, IsolationLevel
from prefect.results import ResultStore
from prefect.locking.filesystem import FileSystemLockManager
from prefect.settings import get_current_settings

@task(tags=["some-tag"], log_prints=True, timeout_seconds=4500, persist_result=True)
def save_zip(zip_path: Path, txn_lock_dir: str):
    logger = get_run_logger()

    # Heavy work outside any transaction if possible...
    # ...

    # Finalize with a single transaction
    lock_dir = Path(txn_lock_dir)
    txn_key = f"esa-id:{zip_path.stem}"
    final_path = Path("/nfs/path/to/output.zip")
    tmp_path = final_path.with_suffix(".zip.tmp")

    with transaction(
        key=txn_key,
        isolation_level=IsolationLevel.SERIALIZABLE,
        store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=lock_dir)),
    ) as txn:
        if txn.is_committed():
            logger.warning("Another worker already committed for %s", txn_key)
            if tmp_path.exists():
                tmp_path.unlink(missing_ok=True)
            return
        os.replace(tmp_path, final_path)

@flow(task_runner=DaskTaskRunner())
def my_flow(zip_paths: list[Path], txn_lock_dir: str):
    for p in zip_paths:
        save_zip.submit(p, txn_lock_dir)
If you need a preflight without committing, use `write_on_commit=False` only for the preflight, then open a second transaction for the finalization (so the preflight doesn't write a commit record).

Questions to help pinpoint the issue
- How are you running the flow (local script vs deployment)? If a script, what's your working directory and how do you start it?
- Are workers on separate machines? Do they all have the repo installed or mounted on the same NFS path and in PYTHONPATH?
- Can you share a slightly longer traceback around the "Failed to deserialize <flow_script.py> module" line?

References
- Prefect + Dask integration
s
@Marvin Your example does not register any of the `@save_zip.on_commit` / `@save_zip.on_rollback` hooks.
m
thought for 171.6 seconds
You're right to call that out—here's how to register and use the commit/rollback hooks with your task, plus a correction to your transaction pattern.

Key points:
- The decorator style you used is supported in Prefect 3.x: `@task_fn.on_commit` and `@task_fn.on_rollback` register hook functions that receive a `Transaction`.
- Keep hook functions at module top level (not nested) so Dask can import/pickle them cleanly.
- Your current single transaction uses `write_on_commit=False`, which means it will never record a commit. That will release the lock without recording completion, so another worker can redo the work. If you want a "preflight" check, use a read-only transaction for the check and then a second transaction for the final commit; or, more simply, use one transaction only for the finalization with a commit record.

Example A: single transaction (minimal contention, hooks fire on commit)
Copy code
from pathlib import Path
import os

from prefect import flow, task, get_run_logger
from prefect.transactions import transaction, IsolationLevel, Transaction
from prefect.results import ResultStore
from prefect.locking.filesystem import FileSystemLockManager
from prefect_dask import DaskTaskRunner

LOCK_DIR = Path("/nfs/locks")

@task(tags=["some-tag"], log_prints=True, timeout_seconds=4500)
def save_zip(zip_path: Path):
    logger = get_run_logger()

    txn_key = f"esa-id:{zip_path.stem}"
    final_path = Path("/nfs/out") / f"{zip_path.stem}.zip"
    tmp_path = final_path.with_suffix(".zip.tmp")

    # Heavy work outside the transaction; produce tmp_path
    # ... compute & write to tmp_path ...

    with transaction(
        key=txn_key,
        isolation_level=IsolationLevel.SERIALIZABLE,
        store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=LOCK_DIR)),
    ) as txn:
        if txn.is_committed():
            logger.warning("Already committed for %s", txn_key)
            if tmp_path.exists():
                tmp_path.unlink(missing_ok=True)
            return

        os.replace(tmp_path, final_path)  # atomic rename on POSIX

@save_zip.on_commit
def committed(txn: Transaction):
    # e.g. emit a log/event or clean up staging artifacts
    print(f"Committed transaction: {txn.key}")

@save_zip.on_rollback
def cleanup(txn: Transaction):
    # e.g. clean up temp files you can derive from txn.key
    print(f"Rolled back transaction: {txn.key}")

@flow(task_runner=DaskTaskRunner())
def my_flow(paths: list[Path]):
    for p in paths:
        save_zip.submit(p)
Example B: two-phase (read-only preflight + final commit)
Copy code
@task(tags=["some-tag"], log_prints=True, timeout_seconds=4500)
def save_zip(zip_path: Path):
    logger = get_run_logger()

    txn_key = f"esa-id:{zip_path.stem}"
    final_path = Path("/nfs/out") / f"{zip_path.stem}.zip"
    tmp_path = final_path.with_suffix(".zip.tmp")

    # Preflight (no commit record)
    with transaction(
        key=txn_key,
        isolation_level=IsolationLevel.SERIALIZABLE,
        write_on_commit=False,  # read-only check
        store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=LOCK_DIR)),
    ) as txn:
        if txn.is_committed():
            logger.warning("Already committed for %s", txn_key)
            return

    # Heavy work outside any lock
    # ... compute & write to tmp_path ...

    # Finalize with commit record (hooks will run here)
    with transaction(
        key=txn_key,
        isolation_level=IsolationLevel.SERIALIZABLE,
        store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=LOCK_DIR)),
    ) as txn:
        if txn.is_committed():
            logger.warning("Already committed for %s", txn_key)
            if tmp_path.exists():
                tmp_path.unlink(missing_ok=True)
            return
        os.replace(tmp_path, final_path)

@save_zip.on_commit
def committed(txn: Transaction):
    print(f"Committed transaction: {txn.key}")

@save_zip.on_rollback
def cleanup(txn: Transaction):
    print(f"Rolled back transaction: {txn.key}")

About the Dask "Failed to deserialize <flow_script.py> module" error:
- Ensure the module is importable on workers:
  - Install your project as a package on each worker; or
  - Use a shared code location on the workers' PYTHONPATH; or
  - Use a deployment with source code distribution so workers pull the code: in Prefect 3.x, use `flow.from_source(...).deploy(...)` and run via a work pool.
- Keep all defs at module top level (flow, tasks, hooks). No closures around non-picklable state.
- Use `get_run_logger()` and `get_run_context()` inside functions, not at import time.
- Match Python/Prefect/prefect-dask versions across scheduler and workers.
- If you run as a script, guard with `if __name__ == "__main__":` or run as a module.

Docs:
- Prefect + Dask
- Transactions API
- Store/distribute flow code

If you can share how workers see your code (installed package vs shared mount vs deployment), I can tailor the import/distribution fix.
thought for 41.3 seconds
Got it — thanks for sharing the workaround.

Why that likely "fixed" deserialization
- Dask needs to import/pickle your hook function on workers. Decorating it with `@task` can change how it's serialized and, importantly, ensures it's a top-level, importable symbol with a stable module path.
- The root issue is usually importability/pickling of the hook function (e.g., nested/closure, or module not on workers' PYTHONPATH). Making it a task incidentally made it picklable for your environment.

A couple of important implications and safer patterns

1) Prefer plain, top-level functions for hooks if you need them to run synchronously
- Transaction hooks are called synchronously during commit/rollback. If the hook is a plain function, you're guaranteed it completes before the transaction finishes.
- Example:
Copy code
from prefect.transactions import Transaction

def _cleanup_impl(txn: Transaction):
    # Do cleanup deterministically here
    ...

@save_zip.on_rollback
def cleanup(txn: Transaction):
    _cleanup_impl(txn)

@save_zip.on_commit
def committed(txn: Transaction):
    # e.g., emit a metric/log
    ...
2) If you keep `@task` on the hook, consider explicitly waiting
- A `@task` hook will schedule a sub-task and typically return a future. Unless you explicitly wait, the transaction may finish before the sub-task completes.
- If you want deterministic cleanup, wrap and wait:
Copy code
from prefect import task

@task
def cleanup_task(txn: Transaction):
    _cleanup_impl(txn)

@save_zip.on_rollback
def cleanup(txn: Transaction):
    fut = cleanup_task.submit(txn)
    fut.result()  # ensure it completes before rollback finishes
- If you're okay with cleanup running asynchronously after rollback, you can skip `fut.result()`.

General Dask-friendly tips
- Keep hooks and tasks at module top level (no closures), and ensure workers can import your module (package install, shared mount on PYTHONPATH, or deploy with `flow.from_source(...).deploy(...)`).
- Avoid non-picklable globals at import time; get loggers/contexts inside functions.

If you'd like, share a small snippet of your hook and how workers get your code (package vs shared mount vs from_source). I can confirm the most robust approach for your setup.

References:
- Prefect + Dask
- Transactions API
- Store/distribute flow code
s
@Marvin To solve the deserialization issue, I had to apply a task decorator on top of the `@save_zip.on_rollback`:
Copy code
@task(log_prints=True, description='Need it as a task for deserialisation to work with Dask runner')
@save_zip.on_rollback
def cleanup(txn: Transaction) -> None:
    # rollback routines here
    ...
However, this seems to disable the callback itself. I do see the transaction inside the parent `save_zip` task being created and the lock generated, but nothing is triggered when the task fails.
m
thought for 46.0 seconds
You've uncovered the key gotcha: decorating the hook with @task turns it into a Prefect Task, but transaction hooks expect a plain callable that runs synchronously during rollback/commit. When you register a Task object as a hook, nothing runs because:
- The transaction tries to call a regular function, not schedule a task
- There's no flow-level scheduling happening inside the transaction rollback path
- Result: your hook appears "disabled"

What to do instead
- Keep hooks as plain, top-level functions (Dask-importable), and register them with the task using the decorator or method style.
- Fix Dask deserialization by making your module importable on workers (package install, shared PYTHONPATH/NFS, or a deployment with source code) rather than wrapping hooks in @task.

Correct pattern
Copy code
from prefect.transactions import Transaction

@save_zip.on_rollback
def cleanup(txn: Transaction) -> None:
    # rollback routines here
    ...

@save_zip.on_commit
def committed(txn: Transaction) -> None:
    # commit routines here
    ...
If you need to run "heavier" cleanup in the hook
- Keep the hook itself synchronous and minimal. If you truly need to fan out work, call a plain helper function or trigger an external side-effect (e.g., delete tmp files). Avoid trying to submit new Prefect tasks from a hook.
- If you must use a Prefect task, be explicit that you're okay with it running outside the transactional timing guarantees. For example, have the hook enqueue a message to a durable queue your system consumes (see the sketch below), rather than trying to schedule a Prefect subtask inline.

Also double-check these points
- Ensure the failure occurs inside the `with transaction(...)` block; rollback hooks only fire when the transaction rolls back. If the task fails after the context exits, the hook won't run.
- If you set `write_on_commit=False`, commit hooks will never run by design. That does not affect rollback hooks, but it's worth noting.

How to fix Dask deserialization without @task on the hook
- Make your code importable on workers:
  - Package and pip-install your project in the worker environment, or
  - Put the repo on a shared mount and add it to PYTHONPATH on all workers, or
  - Use a Prefect 3 deployment with source distribution so workers pull the code: `flow.from_source(...).deploy(...)` (Store/distribute flow code)
- Keep all defs (flow, tasks, hooks) at module top level; no closures.

If you share how your workers access the code (installed package vs shared mount vs deployment), I can suggest the cleanest import/distribution setup for your cluster.
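A minimal sketch of that "enqueue for later" idea, registered on your save_zip task and assuming a shared spool directory that some external process drains (the `/nfs/cleanup-queue` path and `enqueue_cleanup` name are hypothetical):
Copy code
import json
import time
import uuid
from pathlib import Path

from prefect.transactions import Transaction

SPOOL_DIR = Path("/nfs/cleanup-queue")  # hypothetical shared location

@save_zip.on_rollback
def enqueue_cleanup(txn: Transaction) -> None:
    # Keep the hook fast and synchronous: just record what needs cleaning,
    # and let a separate consumer do the heavy cleanup later.
    SPOOL_DIR.mkdir(parents=True, exist_ok=True)
    message = {"txn_key": txn.key, "requested_at": time.time()}
    (SPOOL_DIR / f"{uuid.uuid4()}.json").write_text(json.dumps(message))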
s
@Marvin Currently, the worker is accessing the location where the flow code lives via deployment within a `prefect.yaml` specification. Particularly, we ask the worker to clone a Gitlab repository to the work pool's working directory:
Copy code
pull:
    - prefect.deployments.steps.git_clone:
        repository: https://gitlab.com/my_package.git
        branch: my_feature_branch
What is the best practice for adding the path of the worker's local clone to the `PYTHONPATH`, so that the flow code can be imported as a module, thereby overcoming the Dask deserialization issue and allowing the transaction hooks to be declared in the correct way:
Copy code
from prefect.transactions import Transaction

@save_zip.on_rollback
def cleanup(txn: Transaction) -> None:
    # rollback routines here
    ...

@save_zip.on_commit
def committed(txn: Transaction) -> None:
    # commit routines here
    ...
Note that in this case it is a dynamic relative path to the work pool's working directory, as I run tests for a given feature branch.
m
thought for 94.3 seconds
Great question — there are two reliable ways to make your cloned repo importable on the worker so Dask can deserialize your module and your plain function hooks will run.

Recommended approach: install the repo as a package after cloning
- This makes imports work without messing with PYTHONPATH and is the most robust across Dask workers.
- Use deployment pull steps to:
  1) clone the repo
  2) set the working directory to the clone
  3) pip install your project (editable or regular) and its requirements

Example prefect.yaml snippet:
Copy code
deployments:
  - name: my-deployment
    entrypoint: my_package/flows.py:my_flow
    pull:
      - prefect.deployments.steps.git_clone:
          id: repo
          repository: https://gitlab.com/my_package.git
          branch: my_feature_branch
      - prefect.deployments.steps.set_working_directory:
          directory: "${{ repo.directory }}"
      - prefect.deployments.steps.utility.pip_install_requirements:
          directory: "${{ repo.directory }}"
          requirements_file: requirements.txt
      - prefect.deployments.steps.utility.run_shell_script:
          script: "pip install -e ."
    work_pool:
      name: my-pool
- After this, workers can import your code (e.g., `from my_package.tasks import save_zip`) and Dask will be able to import modules by name.
- Your transaction hooks should be plain, top-level functions:
Copy code
from prefect.transactions import Transaction

@save_zip.on_rollback
def cleanup(txn: Transaction) -> None:
    ...

@save_zip.on_commit
def committed(txn: Transaction) -> None:
    ...
Alternative: add the clone to PYTHONPATH via job variables
- If you can't install the package, set PYTHONPATH to include the clone path emitted by `git_clone`.
- Use step outputs with an id and reference `${{ repo.directory }}`.

Example:
Copy code
deployments:
  - name: my-deployment
    entrypoint: my_package/flows.py:my_flow
    pull:
      - prefect.deployments.steps.git_clone:
          id: repo
          repository: https://gitlab.com/my_package.git
          branch: my_feature_branch
      - prefect.deployments.steps.set_working_directory:
          directory: "${{ repo.directory }}"
    work_pool:
      name: my-pool
      job_variables:
        working_dir: "${{ repo.directory }}"
        env:
          PYTHONPATH: "${{ repo.directory }}:${{ env.PYTHONPATH }}"
- Note: use ";" instead of ":" on Windows.

Important caveats
- If your Dask cluster is external (address=...), those remote Dask workers do not inherit env from the Prefect worker. You must also ensure the repo is installed or available on those Dask workers (install the package there or mount the same path and set PYTHONPATH on the Dask worker side).
- Keep all flows, tasks, and hooks at module top level; avoid closures/non-picklable globals at import time.
- If you're using `write_on_commit=False` anywhere, note that commit hooks won't fire for those transactions by design; rollback hooks still require the failure to occur inside the `with transaction(...)` block.

Docs
- Customize job variables: Customize job variables
- Steps you can use: `git_clone`, `set_working_directory`, `pip_install_requirements`, `run_shell_script`

If you can share whether your Dask cluster is local-in-worker vs external, I can suggest the exact setup (install vs PYTHONPATH) for that topology.
s
@Marvin Dask cluster is local-in-worker and connects to a local PBS scheduler with access to NFS mounts across all compute nodes.
m
thought for 51.1 seconds
Thanks — since your Dask cluster is launched "inside" the Prefect worker and submits PBS jobs to nodes that all share NFS, you have two good options. The goal is to ensure the PBS-launched Dask workers can import your repo by module name so your plain function hooks run.

Option A (recommended): install the repo after cloning
- Works without fiddling with PYTHONPATH at runtime
- Most robust if the Python environment is the same across scheduler and workers

prefect.yaml
Copy code
deployments:
  - name: my-deployment
    entrypoint: my_package/flows.py:my_flow
    pull:
      - prefect.deployments.steps.git_clone:
          id: repo
          repository: https://gitlab.com/my_package.git
          branch: my_feature_branch
      - prefect.deployments.steps.set_working_directory:
          directory: "${{ repo.directory }}"
      - prefect.deployments.steps.utility.pip_install_requirements:
          directory: "${{ repo.directory }}"
          requirements_file: requirements.txt
      - prefect.deployments.steps.utility.run_shell_script:
          # editable install so the installed package points to this branch/clone
          script: "pip install -e ."
    work_pool:
      name: my-pool
      job_variables:
        # optional but helpful for your code to know where the repo lives
        working_dir: "${{ repo.directory }}"
        env:
          REPO_DIR: "${{ repo.directory }}"
Then in code, just import your package normally and keep hooks as plain functions:
Copy code
from prefect.transactions import Transaction

@save_zip.on_rollback
def cleanup(txn: Transaction) -> None:
    ...

@save_zip.on_commit
def committed(txn: Transaction) -> None:
    ...
Notes
- Editable install creates a .pth pointing to the clone. If your Python environment is on a shared filesystem (common on HPC), the Dask workers will pick it up. Ensure workers use the same environment (modules/conda) as the Prefect process.

Option B: set PYTHONPATH for PBS-launched Dask workers
- Use the repo directory from the git_clone step and export it in the Dask job script via dask-jobqueue's env_extra.
- Keep the editable install if you also want entry points/console scripts; otherwise PYTHONPATH alone is enough for imports.

prefect.yaml
Copy code
deployments:
  - name: my-deployment
    entrypoint: my_package/flows.py:my_flow
    pull:
      - prefect.deployments.steps.git_clone:
          id: repo
          repository: https://gitlab.com/my_package.git
          branch: my_feature_branch
      - prefect.deployments.steps.set_working_directory:
          directory: "${{ repo.directory }}"
    work_pool:
      name: my-pool
      job_variables:
        working_dir: "${{ repo.directory }}"
        env:
          # expose the dynamic repo path to your flow code
          REPO_DIR: "${{ repo.directory }}"
          # also make it available to the Prefect process; helpful for LocalCluster cases
          PYTHONPATH: "${{ repo.directory }}:${{ env.PYTHONPATH }}"
Flow code: tell the PBSCluster to export PYTHONPATH on worker jobs
Copy code
import os
from prefect import flow
from prefect_dask import DaskTaskRunner

REPO_DIR = os.environ["REPO_DIR"]

task_runner = DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        # ... your PBS settings: cores, memory, walltime, queue, etc.
        "env_extra": [f"export PYTHONPATH={REPO_DIR}:$PYTHONPATH"],
        # optionally:
        # "job_script_prologue": ["module load python/3.10", "source /path/to/venv/bin/activate"],
    },
)

@flow(task_runner=task_runner)
def my_flow(...):
    ...
Which should you choose?
- If your Python environment is truly identical across nodes and on shared storage, Option A (pip install -e .) is simplest.
- If you want to avoid touching site-packages or your environment isn't shared, Option B (PYTHONPATH via env_extra) is safe and explicit.
- You can also combine them: install requirements and your package, and still export PYTHONPATH for belt-and-suspenders.

Final reminders
- Keep flow, tasks, and hook functions at module top level; do not decorate hooks with @task.
- Rollback hooks only fire if the failure occurs inside the `with transaction(...)` block.
- On NFS, prefer atomic renames (os.replace) and keep the lock-held section minimal.

Helpful docs:
- Customize job variables (env, working_dir)
- Prefect + Dask integration
- Store/distribute flow code
s
@Marvin I like "Option B (PYTHONPATH via env_extra) is safe and explicit". In my case, my flow code/scripts are in a subdirectory of `repo.directory`. Using Option B, can I add any extra paths when declaring my `job_variables`?
m
thought for 508.2 seconds
Short answer: yes. In Prefect 3.x, both Docker and Kubernetes workers accept environment variables via the `env` field in `job_variables`. You can set `PYTHONPATH` to include multiple paths by joining them with your OS path separator (colon on Linux/macOS, semicolon on Windows). There isn't a built-in `env_extra` field in the default worker templates—use `env`.

Given your code lives under `repo.directory`, you can add that (and any other folders) explicitly. Examples:

- prefect.yaml
Copy code
deployments:
  - name: my-deployment
    entrypoint: repo/directory/flows.py:my_flow
    work_pool:
      name: my-docker-or-k8s-pool
      job_variables:
        env:
          PYTHONPATH: "./repo/directory:./another/path"
- Python API
Copy code
flow.from_source("<https://github.com/org/repo>", entrypoint="repo/directory/flows.py:my_flow").deploy(
    name="my-deployment",
    work_pool_name="my-docker-or-k8s-pool",
    job_variables={
        "env": {
            "PYTHONPATH": "./repo/directory:./another/path"
        }
    },
)
Notes:
- Relative paths are resolved from the flow's working directory inside the run environment. If you clone your repo at runtime, ensure the working directory is the repo root (or set `entrypoint` relative to wherever you land).
- If you'd like to inspect the default worker template to see the `env` key, you can:
Copy code
prefect work-pool get-default-base-job-template --type docker
prefect work-pool get-default-base-job-template --type kubernetes
Docs:
- Customize job variables
- Docker workers
- Kubernetes workers

If you were relying on an `env_extra` pattern from an older example or a custom base job template, you can still use it—just make sure your base job template merges `env_extra` into `env`. By default, Prefect's templates only expose `env`.
s
@Marvin Thanks, but I am not using that setup. Let me refresh my overall setup: currently, the worker is accessing the location where the flow code lives via deployment within a `prefect.yaml` specification. Particularly, we ask the Prefect worker to clone a Gitlab repository to the work pool's working directory:
Copy code
pull:
    - prefect.deployments.steps.git_clone:
        repository: https://gitlab.com/my_package.git
        branch: my_feature_branch
And I am using a Dask cluster, local in the workers, that connects to a local PBS scheduler with access to NFS mounts across all compute nodes.
m
thought for 337.6 seconds
Got it — thanks for the context.
- Yes, you can add extra paths via `job_variables.env.PYTHONPATH`. Join multiple paths with `:` on Linux/macOS (or `;` on Windows).
- Since you're cloning at runtime with a `pull` step, you cannot reference the clone path in `job_variables` directly. Instead, set the working directory after the clone, then use relative paths like `.` in `PYTHONPATH`.

Example prefect.yaml pattern:
Copy code
pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: <https://gitlab.com/my_package.git>
      branch: my_feature_branch
  - prefect.deployments.steps.set_working_directory:
      # point to the subdir that contains your flow code
      directory: "{{ clone.directory }}/repo/directory"

deployments:
  - name: my-dask-deployment
    entrypoint: repo/directory/flows.py:my_flow
    work_pool:
      name: my-pool
      job_variables:
        env:
          # include the current working dir plus any other shared libs
          PYTHONPATH: ".:/nfs/shared/other_libs"
A few important notes for your Dask + PBS setup:
- The `env` you set in Prefect `job_variables` affects the flow-run process. It does not automatically propagate to Dask workers that PBS launches on other nodes.
- Since you have NFS across nodes, make sure the repo clone happens into a shared path if you want workers to import from it. For a Process work pool, you can set a shared `working_dir` so the clone lands on NFS:
Copy code
work_pool:
  name: my-process-pool
  job_variables:
    working_dir: "/nfs/shared/prefect-workdir"
    env:
      PYTHONPATH: ".:/nfs/shared/other_libs"
- Also configure your Dask cluster to export the same `PYTHONPATH` to workers. With dask-jobqueue's PBSCluster, use `env_extra`:
Copy code
from prefect_dask.task_runners import DaskTaskRunner

task_runner = DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        # ... your PBS options ...
        "env_extra": [
            "export PYTHONPATH=/nfs/shared/prefect-workdir/repo/directory:/nfs/shared/other_libs:$PYTHONPATH"
        ],
    },
)
Related docs:
- Prefect: Customize job variables
- Dask Jobqueue: dask-jobqueue docs

Quick clarifier so I can tailor this precisely:
- Which work pool type are you using (Process, Docker, or Kubernetes)?
- Do you have a specific NFS path you want to clone into? If so, we can set that as the `working_dir` (Process) or rely on `set_working_directory` (Docker/K8s) and adjust `PYTHONPATH` accordingly.
s
@Marvin I have the following deployment definition yaml file:
Copy code
pull:
    - prefect.deployments.steps.git_clone:
        id: clone
        repository: https://gitlab.com/myrepo.git
        branch: <my_branch>

deployments:
- name: my_deployment
  work_pool:
    name: my_pool
    job_variables:
      env:
         REPO_PATH: "{{ clone.directory }}"
But I am getting this error when deploying via CLI:
Copy code
prefect deploy --prefect-file deployments/my_deployment.yaml -n s2import_prod
| WARNING | prefect.utilities.templating - Value for placeholder '{{ clone.directory }}' not found in provided values. Please ensure that the placeholder is spelled correctly and that the corresponding value is provided.
How can I use `"{{ clone.directory }}"` correctly in the deployments section?
m
thought for 50.5 seconds
You're hitting that warning because `{{ clone.directory }}` is a pull-step output that only exists at flow run time on the worker. The `deployments` section is rendered at deploy time, so it can't see outputs from `pull` steps.

What works vs. not:
- Works: referencing `{{ clone.directory }}` in subsequent pull steps (e.g., `set_working_directory`) because those run after `git_clone` at runtime.
- Doesn't work: using `{{ clone.directory }}` in the `deployments:` section (including `job_variables`) because that's evaluated during `prefect deploy`.

Practical patterns you can use:

Option 1: Set working directory to the cloned repo, then use relative paths in env
Copy code
pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: https://gitlab.com/myrepo.git
      branch: my_branch
  - prefect.deployments.steps.set_working_directory:
      directory: "{{ clone.directory }}"

deployments:
  - name: my_deployment
    work_pool:
      name: my_pool
      job_variables:
        env:
          # now "." resolves to the cloned repo directory at runtime
          REPO_PATH: "."
          PYTHONPATH: ".:/nfs/shared/other_libs"
Option 2: Clone into a known NFS path, then reference that static path in env
- If you can control the clone destination (either via a `directory` argument on the clone step, or by an additional step to move it), choose a stable NFS path that all nodes can see.
Copy code
pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: https://gitlab.com/myrepo.git
      branch: my_branch
      # if supported, specify a target directory everyone can reach
      directory: "/nfs/shared/repos/myrepo"
  - prefect.deployments.steps.set_working_directory:
      directory: "/nfs/shared/repos/myrepo"

deployments:
  - name: my_deployment
    work_pool:
      name: my_pool
      job_variables:
        env:
          REPO_PATH: "/nfs/shared/repos/myrepo"
If `directory` isn't supported in your setup, you can add a shell step to move the clone into your target path, then `set_working_directory` there.

Dask + PBS note:
- The `env` in `job_variables` affects the Prefect flow-run process only. It will not automatically propagate to the Dask workers PBS launches. If your tasks import from `REPO_PATH`, export the same value in your Dask/PBS worker environment (e.g., via `env_extra` in your PBSCluster config) and make sure it points to a shared NFS location.

References:
- prefect.yaml and templating stages
- Pull steps (git_clone, set_working_directory)
- Customize job variables

If you share your work pool type (Process/Docker/Kubernetes), I can suggest the most robust way to make the clone path and env consistent across your Dask workers.
s
@Marvin How can I access `{{ clone.directory }}` at flow run time programmatically in my flow script?
m
thought for 178.8 seconds
Short answer: you can't access `{{ clone.directory }}` directly from your flow code. Pull step outputs only exist within the prefect.yaml templating context, and only for other pull steps.

Workable patterns to achieve the same outcome:

1) Set the working directory to the clone, then use Path.cwd() in your flow
- In prefect.yaml:
Copy code
pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: https://gitlab.com/myrepo.git
      branch: my_branch
  - prefect.deployments.steps.set_working_directory:
      directory: "{{ clone.directory }}"
- In your flow:
Copy code
from pathlib import Path
from prefect import flow

@flow
def my_flow():
    repo_dir = Path.cwd()
    print(f"Repo cloned at: {repo_dir}")
2) Persist the clone path to a file during pull, then read it in your flow - In prefect.yaml:
Copy code
pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: <https://gitlab.com/myrepo.git>
      branch: my_branch
  - prefect.deployments.steps.utility.run_shell_script:
      id: write-clone-dir
      script: |
        mkdir -p .prefect_meta
        echo "{{ clone.directory }}" > .prefect_meta/clone_dir.txt
- In your flow:
Copy code
from pathlib import Path
from prefect import flow

def get_clone_dir():
    p = Path(".prefect_meta/clone_dir.txt")
    return p.read_text().strip() if p.exists() else str(Path.cwd())

@flow
def my_flow():
    clone_dir = get_clone_dir()
    print(f"Repo cloned at: {clone_dir}")
3) Clone to a known shared path and just use that in your code
- If you need all workers to see the same path (e.g., NFS), move the clone into a stable directory in a pull step, then either `set_working_directory` there or reference that path directly in your flow.

Why this is necessary:
- Pull steps run on the worker before your flow starts. Their outputs (like `clone.directory`) are only available for templating within the pull section; they are not exposed via `prefect.runtime` or environment variables for the flow.

References:
- prefect.yaml and templating
- Pull steps (git_clone, set_working_directory)
- Runtime context
s
@Marvin I am having issues with this simple setup:
Copy code
pull:
    - prefect.deployments.steps.git_clone:
        id: clone
        repository: https://gitlab.com/my_repo.git
        branch: my_branch
    - prefect.deployments.steps.set_working_directory:
        directory: "{{ clone.directory }}"
Upon launching a deployment run, the worker is able to clone the remote repo but then errors out with:
Copy code
FileNotFoundError: [Errno 2] No such file or directory: <{{ clone.directory }}>
No issues with permissions, as the created directory for the clone has `drwxrwxr-x`. What am I doing wrong?
m
thought for 215.5 seconds
That FileNotFoundError means the placeholder wasn't rendered — your second step received the literal string "{{ clone.directory }}" instead of the actual path. A few things to check and a minimal working pattern:

What to check
- Make sure the step id matches exactly: you have `id: clone` and you reference `{{ clone.directory }}` — that's correct.
- Quote the template string: use `directory: "{{ clone.directory }}"` (double quotes). Avoid wrapping in angle brackets; if your YAML has `<{{ clone.directory }}>`, remove the angle brackets.
- Verify you're on a recent Prefect 3.x: there have been fixes in the step templating flow; upgrading to the latest 3.x is recommended.
- Keep steps in order: steps run sequentially. If you want to be extra explicit, you can add `requires: [clone]` to the second step.

A minimal working example
Copy code
pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: https://gitlab.com/jrsrp/sys/workflows/overnight_prefect.git
      branch: "25-s2import-implement-rollback-hook-to-avoid-incomplete-qvf-imports"

  - prefect.deployments.steps.set_working_directory:
      # This must be quoted; do not wrap with angle brackets
      directory: "{{ clone.directory }}"
      # Optional but explicit:
      requires: [clone]
If you want to confirm the value being rendered, add a quick debug step between them:
Copy code
- prefect.deployments.steps.utility.run_shell_script:
      id: debug
      script: |
        echo "clone dir is: {{ clone.directory }}"
      requires: [clone]
If the debug echo shows the placeholder instead of a concrete path, templating isn't happening. In that case:
- Double-check your YAML file for stray angle brackets or escaping.
- Ensure there's no duplicate top-level `pull:` vs per-deployment `pull:` overriding each other.
- Share your Prefect version (`prefect version`) — I can advise if an upgrade is needed.

Docs for reference:
- Prefect YAML and templating: https://docs-3.prefect.io/v3/how-to-guides/deployments/prefect-yaml
- Pull steps (git_clone, set_working_directory): https://docs-3.prefect.io/v3/api-ref/python/prefect-deployments-steps-pull
s
@Marvin Okay, that worked. Adding the path of the worker's local clone to the `PYTHONPATH` allows Dask to import the flow code as a module, thereby overcoming the Dask deserialization issue and allowing the transaction hooks to be declared in the correct way, without needing to re-wrap the txn hooks as an additional `@task`:
Copy code
LOCK_DIR = Path("/nfs/locks")

@task(tags=["some-tag"], log_prints=True, timeout_seconds=4500)
def save_zip(zip_path: Path):
    logger = get_run_logger()

    txn_key = f"esa-id:{zip_path.stem}"
    final_path = Path("/nfs/out") / f"{zip_path.stem}.zip"
    tmp_path = final_path.with_suffix(".zip.tmp")

    # Heavy work outside the transaction; produce tmp_path
    # ... compute & write to tmp_path ...

    with transaction(
        key=txn_key,
        isolation_level=IsolationLevel.SERIALIZABLE,
        store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=LOCK_DIR)),
    ) as txn:
        if txn.is_committed():
            logger.warning("Already committed for %s", txn_key)
            if tmp_path.exists():
                tmp_path.unlink(missing_ok=True)
            return

        os.replace(tmp_path, final_path)  # atomic rename on POSIX

@save_zip.on_commit
def committed(txn: Transaction):
    # e.g. emit a log/event or clean up staging artifacts
    print(f"Committed transaction: {txn.key}")

@save_zip.on_rollback
def cleanup(txn: Transaction):
    # e.g. clean up temp files you can derive from txn.key
    print(f"Rolled back transaction: {txn.key}")

@flow(task_runner=DaskTaskRunner())
def my_flow(paths: list[Path]):
    for p in paths:
        save_zip.submit(p)
But the same null effect still holds: the `cleanup` and `committed` hooks do not run, i.e. nothing happens, because it seems they need to be scheduled as tasks when working behind a `DaskTaskRunner` cluster... How can I make them work with a Dask cluster?
m
thought for 247.3 seconds
Great progress! The hooks should work with Dask — they execute on the Dask worker in the same process as the task — but there are a few common gotchas that can make them look like they're "not firing".

Checklist to make them work and visible:
- Ensure a real commit/rollback occurs: in your code, if `txn.is_committed()` returns True, you return early. In that path no new commit happens and no hooks run. Test with a fresh `txn_key`, or force a rollback (raise an exception inside the `with transaction(...)` block) to verify hooks.
- Use Prefect's logger inside hooks rather than print: your prints run on the Dask worker and may not show up in Prefect logs. Use `get_run_logger()` so they land in the task logs.
- Keep hook functions at module level (importable by workers): you've already addressed importability via PYTHONPATH; just make sure hooks aren't nested/closures.

Example update using Prefect logger:
Copy code
from pathlib import Path
import os

from prefect import task, flow, get_run_logger
from prefect.transactions import transaction, IsolationLevel, Transaction
from prefect.results import ResultStore
from prefect.locking.filesystem import FileSystemLockManager

LOCK_DIR = Path("/nfs/locks")

@task(tags=["some-tag"], log_prints=True, timeout_seconds=4500)
def save_zip(zip_path: Path):
    logger = get_run_logger()

    txn_key = f"esa-id:{zip_path.stem}"
    final_path = Path("/nfs/out") / f"{zip_path.stem}.zip"
    tmp_path = final_path.with_suffix(".zip.tmp")

    # ... produce tmp_path ...

    with transaction(
        key=txn_key,
        isolation_level=IsolationLevel.SERIALIZABLE,
        store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=LOCK_DIR)),
    ) as txn:
        if txn.is_committed():
            logger.warning("Already committed for %s", txn_key)
            if tmp_path.exists():
                tmp_path.unlink(missing_ok=True)
            return

        os.replace(tmp_path, final_path)  # atomic rename on POSIX
        <http://logger.info|logger.info>("Prepared final file for %s; leaving transaction block to commit", txn_key)

@save_zip.on_commit
def committed(txn: Transaction):
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Committed transaction: {txn.key}")

@save_zip.on_rollback
def cleanup(txn: Transaction):
    logger = get_run_logger()
    logger.warning(f"Rolled back transaction: {txn.key}")
Tips if it still looks quiet:
- Try a minimal, fresh-key test to prove hooks run:
Copy code
@task(log_prints=True)
def t():
    with transaction(key="hook-test-123", store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=LOCK_DIR))):
        pass

@t.on_commit
def _c(txn: Transaction):
    get_run_logger().info(f"Commit hook fired for {txn.key}")
- Make sure the Dask workers have the same Prefect version and can import your module (you've set PYTHONPATH — good). If you're creating workers via PBS, ensure the PBS job passes that env (e.g., jobqueue `env_extra`).
- Alternative registration: you can also embed hooks via options so they're serialized with the task object:
Copy code
def _commit(txn: Transaction):
    get_run_logger().info(f"Commit hook fired for {txn.key}")

def _rollback(txn: Transaction):
    get_run_logger().warning(f"Rollback hook fired for {txn.key}")

save_zip = save_zip.with_options(on_commit=[_commit], on_rollback=[_rollback])
Docs for deeper reference:
- Transactions (commit/rollback hooks)
- Runtime context and logging
- Using DaskTaskRunner

If you try the fresh `txn_key` and logger-based hooks and still don't see logs, share a snippet of the task run logs (and your Prefect/Dask versions) and I'll help dig in.
s
@Marvin I am using Prefect v3.4.1 and can't use the alternative hook registration:
Copy code
def _commit(txn: Transaction):
    get_run_logger().info(f"Commit hook fired for {txn.key}")

def _rollback(txn: Transaction):
    get_run_logger().warning(f"Rollback hook fired for {txn.key}")

save_zip = save_zip.with_options(on_commit=[_commit], on_rollback=[_rollback])
What version of Prefect were they introduced in?
m
thought for 124.6 seconds
You're right to question that — `with_options(on_commit=..., on_rollback=...)` is not supported in Prefect 3.x. There isn't a version where those transaction hooks were added to `with_options`; only state hooks like `on_completion` and `on_failure` are supported there.

Supported ways to register transaction hooks in 3.4.1 (and current 3.x):
- Use the task's decorator methods
Copy code
@save_zip.on_commit
def _commit(txn: Transaction):
    get_run_logger().info(f"Commit hook fired for {txn.key}")

@save_zip.on_rollback
def _rollback(txn: Transaction):
    get_run_logger().warning(f"Rollback hook fired for {txn.key}")
- Keep these hook functions at module scope so Dask workers can import them.

References:
- Transactions and hooks: Transactions (commit/rollback hooks)
- Task options and state hooks supported by with_options: State change hooks
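To verify the decorator-registered hooks fire in your environment, a minimal sketch that forces a rollback (task/flow names are illustrative; `return_state=True` keeps the flow from raising on the task failure):
Copy code
from prefect import flow, task, get_run_logger
from prefect.transactions import Transaction, transaction

@task
def failing_task() -> None:
    # Raising inside the transaction context triggers a rollback
    with transaction(key="hook-verify-demo"):
        raise RuntimeError("force a rollback so the hook fires")

@failing_task.on_rollback
def rolled_back(txn: Transaction) -> None:
    get_run_logger().warning(f"Rollback hook fired for {txn.key}")

@flow
def verify_hooks() -> None:
    failing_task(return_state=True)  # swallow the failure state

if __name__ == "__main__":
    verify_hooks()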