Santiago Gutierrez
10/10/2025, 5:10 AM
DaskTaskRunner():
@flow(
    task_runner=DaskTaskRunner(
        cluster_class=PBSCluster,
        cluster_kwargs=pbs_cluster_kwargs,
        adapt_kwargs={"maximum": 20}
    ),
    persist_result=True
)
def my_flow() -> list[Path]:
    # dask cluster routines
    download_futures = download_from_api.map(items)
    # rollback hook would apply only to this Prefect task
    with transaction() as txn:
        txn.set("filename", <download_future>)
    saved_zipfile_futures = save_zip.map(zip_path=download_futures)
Should I use an asynchronous context manager for this Dask distributed flow/tasks?
And what type of lock manager should I use to handle race conditions?
Marvin
10/10/2025, 5:10 AM
Marvin
10/10/2025, 5:25 AM
from pathlib import Path
import shutil

from dask_jobqueue import PBSCluster
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner
from prefect.transactions import transaction


@task
def download_from_api(item: str) -> Path:
    # ... produce a local zip Path for this item ...
    ...


@task
def save_zip(zip_path: Path, dest_dir: Path) -> Path:
    # Open a transaction inside the task so it applies on the Dask worker
    with transaction(key=f"save-zip:{zip_path.name}") as txn:
        dest = dest_dir / zip_path.name
        # Register compensation for this unit of work
        # Use a closure to capture the concrete dest path
        txn.add_rollback_hook(lambda t, p=dest: p.exists() and p.unlink())
        # Do the side-effect
        shutil.copy2(zip_path, dest)
        # Optionally also register a commit action (e.g., tag/mark)
        # txn.add_commit_hook(lambda t, p=dest: ...)
        return dest


@flow(
    task_runner=DaskTaskRunner(
        cluster_class=PBSCluster,
        cluster_kwargs=...,  # your PBS config
        adapt_kwargs={"maximum": 20}
    ),
    persist_result=True,
)
def my_flow(items: list[str], dest_dir: Path) -> list[Path]:
    download_futures = download_from_api.map(items)
    saved_zipfile_futures = save_zip.map(zip_path=download_futures, dest_dir=dest_dir)
    return saved_zipfile_futures
Notes and options
- Why inside the task? Dask executes mapped tasks in separate processes; opening with transaction(): in the flow won’t be visible to those workers. Opening it in the task ensures the rollback hooks run if that unit fails or if its transaction is rolled back.
- Task-level hooks: You can also attach hooks at task definition time using @task(on_rollback=[...], on_commit=[...]). These run in the context of the active transaction for that task run. They’re great for fixed behavior, but registering hooks inside the task body (as above) is more flexible because you can capture per-item details like a specific file path.
- State hooks vs transactions: on_failure state hooks (docs) run when a task/flow enters a failed state. Use these for “cleanup only if this task fails.” Transactions (docs) add “commit or roll back” semantics and can run compensation even when a later error occurs elsewhere in the transaction scope. (A minimal contrast sketch follows this list.)
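To make that contrast concrete, here is a minimal, hedged sketch of both styles side by side; the hook and task names (notify_failure, remove_partial_output, write_item) are illustrative, not from this thread, and it assumes the @task(on_failure=..., on_rollback=...) options described above:
from prefect import task
from prefect.transactions import transaction, Transaction


def notify_failure(task, task_run, state):
    # State hook: fires only when this task run enters a Failed state
    print(f"{task_run.name} failed with state {state.type}")


def remove_partial_output(txn: Transaction):
    # Transaction hook: fires when the enclosing transaction rolls back
    print(f"rolling back transaction {txn.key}")


@task(on_failure=[notify_failure], on_rollback=[remove_partial_output])
def write_item(item: str) -> None:
    with transaction(key=f"write:{item}"):
        ...  # side effect goes here; an exception inside this block triggers a rollback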
If you want “all-or-nothing” rollback across the entire map
- Per-item transactions (above) only clean up the failing item. If you want to roll back all saved files when any item fails, you can:
- Create a “parent” transaction in the flow, pass a transaction key into each task, and have the task open/join a child transaction using that key; then if any failure occurs and the parent rolls back, all child hooks run.
- This pattern requires a shared ResultStore and propagating a transaction key to workers. It’s a bit more advanced; see Transactions docs for details and test in your environment. (A simplified flow-level compensation alternative is sketched below.)
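If the parent/child pattern proves hard to coordinate across Dask workers, a plainer swap-in is manual compensation at the flow level: resolve the mapped futures, and if any item failed, delete the files that were written. This is a hedged sketch, not the parent-transaction mechanism itself; it reuses download_from_api and save_zip from the example above, omits the task runner for brevity, and assumes the output paths are on storage visible to the flow process and that PrefectFuture.wait()/.state/.result() behave as in current Prefect 3.x:
from pathlib import Path

from prefect import flow


@flow
def my_flow_all_or_nothing(items: list[str], dest_dir: Path) -> list[Path]:
    download_futures = download_from_api.map(items)
    save_futures = save_zip.map(zip_path=download_futures, dest_dir=dest_dir)

    saved: list[Path] = []
    any_failed = False
    for fut in save_futures:
        fut.wait()  # block until this mapped task run finishes
        if fut.state.is_completed():
            saved.append(fut.result())
        else:
            any_failed = True

    if any_failed:
        # Compensate: remove every file this run managed to write
        for path in saved:
            Path(path).unlink(missing_ok=True)
        raise RuntimeError("save_zip failed for at least one item; removed partial outputs")
    return saved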
Quick sanity check
- Dask runner import: in Prefect 3.x, use from prefect_dask.task_runners import DaskTaskRunner.
- PBSCluster: ensure you’ve configured dask-jobqueue and PBS resources correctly.
Could you confirm which behavior you need?
- A) Roll back only the side-effects for the item that failed
- B) Roll back all previously saved items if any mapped task fails
I can tailor the code to your exact rollback semantics once I know which you’re aiming for.
Santiago Gutierrez
10/10/2025, 5:40 AM
Marvin
10/10/2025, 5:40 AM
Marvin
10/10/2025, 5:43 AM
from pathlib import Path
import shutil

from dask_jobqueue import PBSCluster
from prefect import flow, task, get_run_logger
from prefect.transactions import transaction, Transaction
from prefect_dask.task_runners import DaskTaskRunner


# Hook signatures: each receives a single Transaction
def cleanup_saved_file(txn: Transaction) -> None:
    # Read values captured during the task run
    dest_str = txn.get("dest_path", None)
    if not dest_str:
        return
    dest = Path(dest_str)
    try:
        if dest.exists():
            dest.unlink()
    except Exception as exc:
        # Avoid raising from hooks; just log if needed
        logger = get_run_logger()
        logger.warning(f"Rollback hook could not remove {dest}: {exc}")


def log_commit(txn: Transaction) -> None:
    logger = get_run_logger()
    logger.info(f"Committed file: {txn.get('dest_path', '<unknown>')}")


@task(
    on_rollback=[cleanup_saved_file],  # runs if the transaction rolls back
    on_commit=[log_commit],            # runs if the transaction commits
)
def save_zip(zip_path: Path, dest_dir: Path) -> Path:
    # Open a transaction INSIDE the task so it applies on the Dask worker
    with transaction(key=f"save-zip:{zip_path.name}") as txn:
        dest = dest_dir / zip_path.name
        # Make data available to hooks
        txn.set("dest_path", str(dest))
        # Side-effect
        shutil.copy2(zip_path, dest)
        return dest


@task
def download_from_api(item: str) -> Path:
    # ... produce a local zip Path for this item ...
    ...


@flow(
    task_runner=DaskTaskRunner(
        cluster_class=PBSCluster,
        cluster_kwargs=...,  # your PBS config
        adapt_kwargs={"maximum": 20},
    ),
    persist_result=True,
)
def my_flow(items: list[str], dest_dir: Path) -> list[Path]:
    download_futures = download_from_api.map(items)
    saved_zipfile_futures = save_zip.map(zip_path=download_futures, dest_dir=dest_dir)
    return saved_zipfile_futures
Alternative hook registration style
- You can also attach hooks after defining a task:
@save_zip.on_rollback
def cleanup(txn: Transaction):
    ...


@save_zip.on_commit
def committed(txn: Transaction):
    ...
Notes and gotchas
- Transactions are not opened automatically just because hooks are present; you must call with transaction() as txn: inside the task for the hooks to run.
- To make hooks item-aware, put per-item values into the transaction with txn.set("key", value) in the task, then read them in the hook via txn.get("key").
- Hooks run in the Dask worker where the task executes, so get_run_logger() works there.
- If you ever see inconsistencies accessing data from on_commit hooks in nested/parent transactions, see this GitHub report for context: issue #17864.
Docs
- Transactions: Advanced transactions
- State hooks overview: State-change hooks
Santiago Gutierrez
10/16/2025, 12:19 AM
persist_result=True. I know where these results get stored on my local NFS where the Prefect worker operates. My question is: if I want to add isolation_level=IsolationLevel.SERIALIZABLE to the in-task transaction to avoid any possible race conditions between Dask workers, is it okay to use the FileSystemLockManager lock manager as prescribed in the documentation:
with transaction(
    key=transaction_key,
    isolation_level=IsolationLevel.SERIALIZABLE,
    store=ResultStore(
        lock_manager=FileSystemLockManager(
            lock_files_directory=PREFECT_HOME.value() / "locks"
        )
    ),
) as txn:
    if txn.is_committed():
        print("Data file has already been written. Exiting early.")
        return
Marvin
10/16/2025, 12:19 AM
Marvin
10/16/2025, 12:22 AM
PREFECT_HOME.value().
- Make sure the transaction key is stable and uniquely identifies the resource you’re protecting (e.g., the target file path or dataset/date partition).
- persist_result=True on tasks is independent of the transaction; the transaction’s commit record is written via the ResultStore you pass to transaction(...).
Example (Prefect 3.x):
from prefect.transactions import transaction, IsolationLevel
from prefect.results import ResultStore
from prefect.locking.filesystem import FileSystemLockManager
from prefect.settings import get_current_settings

lock_dir = get_current_settings().home / "locks"

with transaction(
    key=transaction_key,  # e.g., f"write:{target_path}"
    isolation_level=IsolationLevel.SERIALIZABLE,
    store=ResultStore(
        lock_manager=FileSystemLockManager(lock_files_directory=lock_dir)
    ),
) as txn:
    if txn.is_committed():
        # Someone already committed this work
        return
    # Do the work here (ideally write to a tmp path)
    # ...
    # Let the context commit at exit; keep the critical section minimal
Caveats and tips:
- NFS locking reliability and performance vary by implementation and mount options. If you see contention or flaky behavior, verify NFS lock daemon/opts and consider a smaller critical section or a single-writer pattern. Prefect’s built-in lock managers are memory (single-process) and filesystem; there isn’t a built-in Redis/ZK lock manager in 3.x.
- SERIALIZABLE gives the strongest guarantees but can increase contention. Keep the code inside the with transaction(...) block as short as possible—just the check and the finalization step.
- If your Dask workers run on nodes that do not share the same filesystem, file-based locks won’t coordinate across hosts.
Docs and references:
- Results and result storage (Prefect 3)
- FileSystemLockManager API
- Blog: Transactional pipelines with Prefect 3.0
If you can share how your Dask workers are deployed (same host vs multiple hosts, NFS mount options), I can sanity-check the locking setup further.
Santiago Gutierrez
10/16/2025, 1:13 AM
save_zip task logic. Or does it need to be at the top of the task?
Marvin
10/16/2025, 1:14 AM
Marvin
10/16/2025, 1:14 AM
transaction(...) just scopes the critical section you want to protect; it does not need to be at the top of the task.
How to choose placement:
- If your goal is to skip work when it’s already done, put a quick “preflight” check near the top, then do heavy work outside the lock, and re-enter a transaction only for the final write. This minimizes lock time while still avoiding duplicates.
- If your goal is to protect only the final write/rename, open the transaction right around that write. This is usually best for Dask to reduce contention.
- If you wrap the whole task in a transaction, you’ll hold the lock longer and increase contention—only do this if truly necessary.
Two recommended patterns
1) Preflight check + lock only for commit (minimal lock time)
from pathlib import Path
import os

from prefect.transactions import transaction, IsolationLevel
from prefect.results import ResultStore
from prefect.locking.filesystem import FileSystemLockManager
from prefect.settings import get_current_settings

lock_dir = get_current_settings().home / "locks"
txn_key = f"save_zip:{dataset}:{partition}"  # make this stable/unique
final_path = Path("/nfs/path/to/output.zip")

# Preflight read (no lock held long-term)
with transaction(
    key=txn_key,
    isolation_level=IsolationLevel.SERIALIZABLE,
    store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=lock_dir)),
) as txn:
    if txn.is_committed():
        return

# Do heavy work outside the transaction
tmp_path = final_path.with_suffix(".zip.tmp")
# ... compute and write to tmp_path ...

# Commit phase (short critical section + double-check)
with transaction(
    key=txn_key,
    isolation_level=IsolationLevel.SERIALIZABLE,
    store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=lock_dir)),
) as txn:
    if txn.is_committed():
        # Another worker already committed while we were computing
        # Optionally clean up tmp
        if tmp_path.exists():
            tmp_path.unlink(missing_ok=True)
        return
    os.replace(tmp_path, final_path)  # atomic on POSIX
    # exiting the context records the commit
2) Protect only the final write
with transaction(
    key=txn_key,
    isolation_level=IsolationLevel.SERIALIZABLE,
    store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=lock_dir)),
) as txn:
    if txn.is_committed():
        return
    os.replace(tmp_path, final_path)
Notes:
- Keep the code inside the with transaction(...) block as short as possible—ideally just the finalization (e.g., atomic rename). This reduces contention between Dask workers.
- Your persist_result=True task setting is independent of transactions. The transaction’s commit record lives in the ResultStore you pass to transaction(...); make sure that store and the lock directory are shared across workers (e.g., same NFS mount).
- On NFS, ensure locking is reliable (NFSv4 recommended; avoid nolock). If workers don’t share the same filesystem, file locks won’t coordinate.
References:
- Results and result storage
- FileSystemLockManager API
- Transactional pipelines with Prefect 3.0
If you share how your workers are mounted to NFS (v3/v4, mount options), I can help validate the locking approach.
Santiago Gutierrez
10/16/2025, 1:53 AM
txn_key that is then used for re-entering a new transaction.
Marvin
10/16/2025, 1:53 AM
Marvin
10/16/2025, 1:59 AM
from pathlib import Path
import os

from prefect.transactions import transaction, IsolationLevel
from prefect.results import ResultStore
from prefect.locking.filesystem import FileSystemLockManager
from prefect.settings import get_current_settings

lock_dir = get_current_settings().home / "locks"
txn_key = f"save_zip:{dataset}:{partition}"
final_path = Path("/nfs/path/output.zip")
tmp_path = final_path.with_suffix(".zip.tmp")

# Heavy work outside transaction
# ... compute/write to tmp_path ...

with transaction(
    key=txn_key,
    isolation_level=IsolationLevel.SERIALIZABLE,
    store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=lock_dir)),
) as txn:
    if txn.is_committed():
        # Another worker finalized already
        if tmp_path.exists():
            tmp_path.unlink(missing_ok=True)
        return
    os.replace(tmp_path, final_path)  # atomic on POSIX
    # clean exit writes the commit record
B) If you really want a preflight, make it read‑only
- Use write_on_commit=False for the preflight so it won’t create a commit record
- Re-check inside a second transaction before finalizing
# Preflight: read-only, do NOT write a commit record
with transaction(
    key=txn_key,
    isolation_level=IsolationLevel.SERIALIZABLE,
    store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=lock_dir)),
    write_on_commit=False,
) as txn:
    if txn.is_committed():
        return

# Heavy work outside transaction
# ... compute/write to tmp_path ...

# Finalization with commit record
with transaction(
    key=txn_key,
    isolation_level=IsolationLevel.SERIALIZABLE,
    store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=lock_dir)),
) as txn:
    if txn.is_committed():
        if tmp_path.exists():
            tmp_path.unlink(missing_ok=True)
        return
    os.replace(tmp_path, final_path)
Notes:
- Keep the locked section as short as possible (ideally just the final atomic rename).
- Using a preflight transaction still acquires the lock; if you want less contention, skip the preflight and use pattern A.
- Ensure your Dask workers all see the same lock directory and that NFS locking is reliable.
Refs:
- prefect.transactions API
- FileSystemLockManager API
- Results and result storage
Santiago Gutierrez
10/16/2025, 6:09 AM
DaskTaskRunner and pickle, it fails to deserialize my flow script. I went with the following registration style for my transaction hooks:
@task(tags=["some-tag"], log_prints=True, timeout_seconds=4500)
def save_zip(zip_path: Path):
    txn_lock_dir = <NFS path>
    with transaction(
        key=f"esa-id:{zip_path.stem}",
        isolation_level=IsolationLevel.SERIALIZABLE,
        write_on_commit=False,
        store=ResultStore(
            lock_manager=FileSystemLockManager(lock_files_directory=Path(txn_lock_dir))
        )
    ) as txn:
        if txn.is_committed():
            logger.warning(
                "Another Dask worker committed before"
                f" task-run id {context.task_run.id} ran the apptainer cmd."
            )
            return
        # do some zipfile processing here


@save_zip.on_rollback
def cleanup(txn: Transaction):
    ...


@save_zip.on_commit
def committed(txn: Transaction):
    ...
I am getting this error:
distributed.protocol.pickle - INFO - Failed to deserialize <flow_script.py> module
Marvin
10/16/2025, 6:09 AM
Marvin
10/16/2025, 6:11 AM
flow.from_source("https://...", entrypoint="pkg_or_path:flow_fn") and deploy(...) so all workers pull the same code.
- Docs: Store flow code
2) Running as a script (main) without guarding imports
- If your flow is launched as a script, Dask may struggle to re-import functions defined in main.
- Fix: move task/flow definitions into an importable module and run with python -m your_package.flow_script, or guard your entrypoint with if __name__ == "__main__": (a minimal layout sketch follows this list).
- Keep all task and hook function definitions at top-level of the module (no nesting/closures).
3) Non-picklable objects captured at import time
- Don’t create loggers/contexts/globals that aren’t picklable at import. In Prefect 3.x:
- Call get_run_logger() and get_run_context() inside the task body, not at module import time.
- Ensure your @save_zip.on_commit / @save_zip.on_rollback callbacks are top-level functions that don’t close over non-picklable state.
4) Dependency/version mismatches on workers
- Make sure workers have the same Python, Prefect, prefect-dask, and cloudpickle versions as the scheduler/flow runner.
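For item 2, a minimal layout sketch of the guarded entrypoint; the package and module names (my_package, flows.py) are placeholders, not taken from this thread:
# my_package/flows.py (hypothetical importable module)
from prefect import flow, task


@task
def save_zip(item: str) -> str:
    # ... real work here ...
    return item


@flow
def my_flow(items: list[str]) -> list[str]:
    futures = save_zip.map(items)
    return [f.result() for f in futures]


# Guarding the entrypoint keeps Dask workers from re-running the flow
# when they import this module to unpickle tasks and hooks.
if __name__ == "__main__":
    my_flow(["a.zip", "b.zip"])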
A minimal pattern that’s friendly to Dask serialization
from pathlib import Path
import os

from prefect import flow, task, get_run_logger
from prefect_dask import DaskTaskRunner
from prefect.transactions import transaction, IsolationLevel
from prefect.results import ResultStore
from prefect.locking.filesystem import FileSystemLockManager
from prefect.settings import get_current_settings


@task(tags=["some-tag"], log_prints=True, timeout_seconds=4500, persist_result=True)
def save_zip(zip_path: Path, txn_lock_dir: str):
    logger = get_run_logger()
    # Heavy work outside any transaction if possible...
    # ...
    # Finalize with a single transaction
    lock_dir = Path(txn_lock_dir)
    txn_key = f"esa-id:{zip_path.stem}"
    final_path = Path("/nfs/path/to/output.zip")
    tmp_path = final_path.with_suffix(".zip.tmp")
    with transaction(
        key=txn_key,
        isolation_level=IsolationLevel.SERIALIZABLE,
        store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=lock_dir)),
    ) as txn:
        if txn.is_committed():
            logger.warning("Another worker already committed for %s", txn_key)
            if tmp_path.exists():
                tmp_path.unlink(missing_ok=True)
            return
        os.replace(tmp_path, final_path)


@flow(task_runner=DaskTaskRunner())
def my_flow(zip_paths: list[Path], txn_lock_dir: str):
    for p in zip_paths:
        save_zip.submit(p, txn_lock_dir)
If you need preflight without committing, use write_on_commit=False only for the preflight, then open a second transaction for the finalization (so the preflight doesn’t write a commit record).
Questions to help pinpoint the issue
- How are you running the flow (local script vs deployment)? If a script, what’s your working directory and how do you start it?
- Are workers on separate machines? Do they all have the repo installed or mounted on the same NFS path and in PYTHONPATH?
- Can you share a slightly longer traceback around the “Failed to deserialize <flow_script.py> module” line?
References
- Prefect + Dask integration
Marvin
10/16/2025, 6:11 AM
Santiago Gutierrez
10/16/2025, 6:18 AM
@save_zip.on_commit / @save_zip.on_rollback hooks.
Marvin
10/16/2025, 6:18 AM
Marvin
10/16/2025, 6:21 AM
@task_fn.on_commit and @task_fn.on_rollback register hook functions that receive a Transaction.
- Keep hook functions at module top-level (not nested) so Dask can import/pickle them cleanly.
- Your current single transaction uses write_on_commit=False, which means it will never record a commit. That will release the lock without recording completion, so another worker can redo the work. If you want a “preflight” check, use a read-only transaction for the check and then a second transaction for the final commit; or more simply, use one transaction only for the finalization with a commit record.
Example A: single transaction (minimal contention, hooks fire on commit)
from pathlib import Path
import os

from prefect import flow, task, get_run_logger
from prefect.transactions import transaction, IsolationLevel, Transaction
from prefect.results import ResultStore
from prefect.locking.filesystem import FileSystemLockManager
from prefect_dask import DaskTaskRunner

LOCK_DIR = Path("/nfs/locks")


@task(tags=["some-tag"], log_prints=True, timeout_seconds=4500)
def save_zip(zip_path: Path):
    logger = get_run_logger()
    txn_key = f"esa-id:{zip_path.stem}"
    final_path = Path("/nfs/out") / f"{zip_path.stem}.zip"
    tmp_path = final_path.with_suffix(".zip.tmp")
    # Heavy work outside the transaction; produce tmp_path
    # ... compute & write to tmp_path ...
    with transaction(
        key=txn_key,
        isolation_level=IsolationLevel.SERIALIZABLE,
        store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=LOCK_DIR)),
    ) as txn:
        if txn.is_committed():
            logger.warning("Already committed for %s", txn_key)
            if tmp_path.exists():
                tmp_path.unlink(missing_ok=True)
            return
        os.replace(tmp_path, final_path)  # atomic rename on POSIX


@save_zip.on_commit
def committed(txn: Transaction):
    # e.g. emit a log/event or clean up staging artifacts
    print(f"Committed transaction: {txn.key}")


@save_zip.on_rollback
def cleanup(txn: Transaction):
    # e.g. clean up temp files you can derive from txn.key
    print(f"Rolled back transaction: {txn.key}")


@flow(task_runner=DaskTaskRunner())
def my_flow(paths: list[Path]):
    for p in paths:
        save_zip.submit(p)
Example B: two-phase (read-only preflight + final commit)
@task(tags=["some-tag"], log_prints=True, timeout_seconds=4500)
def save_zip(zip_path: Path):
    logger = get_run_logger()
    txn_key = f"esa-id:{zip_path.stem}"
    final_path = Path("/nfs/out") / f"{zip_path.stem}.zip"
    tmp_path = final_path.with_suffix(".zip.tmp")
    # Preflight (no commit record)
    with transaction(
        key=txn_key,
        isolation_level=IsolationLevel.SERIALIZABLE,
        write_on_commit=False,  # read-only check
        store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=LOCK_DIR)),
    ) as txn:
        if txn.is_committed():
            logger.warning("Already committed for %s", txn_key)
            return
    # Heavy work outside any lock
    # ... compute & write to tmp_path ...
    # Finalize with commit record (hooks will run here)
    with transaction(
        key=txn_key,
        isolation_level=IsolationLevel.SERIALIZABLE,
        store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=LOCK_DIR)),
    ) as txn:
        if txn.is_committed():
            logger.warning("Already committed for %s", txn_key)
            if tmp_path.exists():
                tmp_path.unlink(missing_ok=True)
            return
        os.replace(tmp_path, final_path)


@save_zip.on_commit
def committed(txn: Transaction):
    print(f"Committed transaction: {txn.key}")


@save_zip.on_rollback
def cleanup(txn: Transaction):
Marvin
10/16/2025, 6:21 AM
flow.from_source(...).deploy(...) and run via a work pool.
- Keep all defs at top-level (flow, tasks, hooks). No closures around non-picklable state.
- Use get_run_logger() and get_run_context() inside functions, not at import time.
- Match Python/Prefect/prefect-dask versions across scheduler and workers.
- If you run as a script, guard with if __name__ == "__main__": or run as a module.
Docs:
- Prefect + Dask
- Transactions API
- Store/distribute flow code
If you can share how workers see your code (installed package vs shared mount vs deployment), I can tailor the import/distribution fix.
Marvin
10/16/2025, 8:04 AM
Marvin
10/16/2025, 8:05 AM
@task can change how it’s serialized and, importantly, ensures it’s a top-level, importable symbol with a stable module path.
- The root issue is usually importability/pickling of the hook function (e.g., nested/closure, or module not on workers’ PYTHONPATH). Making it a task incidentally made it picklable for your environment.
A couple of important implications and safer patterns
1) Prefer plain, top-level functions for hooks if you need them to run synchronously
- Transaction hooks are called synchronously during commit/rollback. If the hook is a plain function, you’re guaranteed it completes before the transaction finishes.
- Example:
from prefect.transactions import Transaction


def _cleanup_impl(txn: Transaction):
    # Do cleanup deterministically here
    ...


@save_zip.on_rollback
def cleanup(txn: Transaction):
    _cleanup_impl(txn)


@save_zip.on_commit
def committed(txn: Transaction):
    # e.g., emit a metric/log
    ...
2) If you keep @task on the hook, consider explicitly waiting
- A @task hook will schedule a sub-task and typically return a future. Unless you explicitly wait, the transaction may finish before the sub-task completes.
- If you want deterministic cleanup, wrap and wait:
from prefect import task


@task
def cleanup_task(txn: Transaction):
    _cleanup_impl(txn)


@save_zip.on_rollback
def cleanup(txn: Transaction):
    fut = cleanup_task.submit(txn)
    fut.result()  # ensure it completes before rollback finishes
- If you’re okay with cleanup running asynchronously after rollback, you can skip fut.result().
General Dask-friendly tips
- Keep hooks and tasks at module top-level (no closures), and ensure workers can import your module (package install, shared mount on PYTHONPATH, or deploy with flow.from_source(...).deploy(...)).
- Avoid non-picklable globals at import time; get loggers/contexts inside functions.
If you’d like, share a small snippet of your hook and how workers get your code (package vs shared mount vs from_source). I can confirm the most robust approach for your setup.
References:
- Prefect + Dask
- Transactions API
- Store/distribute flow code
Santiago Gutierrez
10/16/2025, 8:14 AM
@save_zip.on_rollback:
@task(log_prints=True, description='Need it as a task for deserialisation to work with Dask runner')
@save_zip.on_rollback
def cleanup(txn: Transaction) -> None:
    # rollback routines here
However, this seems to disable the callback itself. I do see the transaction inside the parent save_zip task being created and the lock generated, but nothing is triggered when the task fails.
Marvin
10/16/2025, 8:15 AM
Marvin
10/16/2025, 8:16 AM
from prefect.transactions import Transaction


@save_zip.on_rollback
def cleanup(txn: Transaction) -> None:
    # rollback routines here
    ...


@save_zip.on_commit
def committed(txn: Transaction) -> None:
    # commit routines here
    ...
If you need to run “heavier” cleanup in the hook
- Keep the hook itself synchronous and minimal. If you truly need to fan out work, call a plain helper function or trigger an external side-effect (e.g., delete tmp files). Avoid trying to submit new Prefect tasks from a hook.
- If you must use a Prefect task, be explicit that you’re okay with it running outside the transactional timing guarantees. For example, have the hook enqueue a message to a durable queue your system consumes, rather than trying to schedule a Prefect subtask inline.
Also double-check these points
- Ensure the failure occurs inside the with transaction(...) block; rollback hooks only fire when the transaction rolls back. If the task fails after the context exits, the hook won’t run.
- If you set write_on_commit=False, commit hooks will never run by design. That does not affect rollback hooks, but worth noting.
How to fix Dask deserialization without @task on the hook
- Make your code importable on workers:
- Package and pip-install your project in the worker environment, or
- Put the repo on a shared mount and add it to PYTHONPATH on all workers, or
- Use a Prefect 3 deployment with source distribution so workers pull the code:
- flow.from_source(...).deploy(...)
- Store/distribute flow code
- Keep all defs (flow, tasks, hooks) at module top-level; no closures.
If you share how your workers access the code (installed package vs shared mount vs deployment), I can suggest the cleanest import/distribution setup for your cluster.
Santiago Gutierrez
10/16/2025, 8:41 AM
prefect.yaml specification. Particularly, we ask the worker to clone a GitLab repository to the work pool's working directory:
pull:
  - prefect.deployments.steps.git_clone:
      repository: https://gitlab.com/my_package.git
      branch: my_feature_branch
What is the best practice for adding the path of the worker's local clone to the PYTHONPATH, so that the flow code can be imported as a module, thereby overcoming the Dask deserialization issue and letting me declare the transaction hooks in the correct way:
from prefect.transactions import Transaction


@save_zip.on_rollback
def cleanup(txn: Transaction) -> None:
    # rollback routines here
    ...


@save_zip.on_commit
def committed(txn: Transaction) -> None:
    # commit routines here
    ...
Note that in this case it is a dynamic relative path to the work pool working directory, as I run tests for a given feature branch.
Marvin
10/16/2025, 8:41 AM
Marvin
10/16/2025, 8:43 AM
deployments:
  - name: my-deployment
    entrypoint: my_package/flows.py:my_flow
    pull:
      - prefect.deployments.steps.git_clone:
          id: repo
          repository: https://gitlab.com/my_package.git
          branch: my_feature_branch
      - prefect.deployments.steps.set_working_directory:
          directory: "${{ repo.directory }}"
      - prefect.deployments.steps.utility.pip_install_requirements:
          directory: "${{ repo.directory }}"
          requirements_file: requirements.txt
      - prefect.deployments.steps.utility.run_shell_script:
          script: "pip install -e ."
    work_pool:
      name: my-pool
- After this, workers can import your code (e.g., from my_package.tasks import save_zip) and Dask will be able to import modules by name.
- Your transaction hooks should be plain, top-level functions:
from prefect.transactions import Transaction


@save_zip.on_rollback
def cleanup(txn: Transaction) -> None:
    ...


@save_zip.on_commit
def committed(txn: Transaction) -> None:
    ...
Alternative: add the clone to PYTHONPATH via job variables
- If you can’t install the package, set PYTHONPATH to include the clone path emitted by git_clone.
- Use step outputs with an id and reference ${{ repo.directory }}.
Example:
deployments:
  - name: my-deployment
    entrypoint: my_package/flows.py:my_flow
    pull:
      - prefect.deployments.steps.git_clone:
          id: repo
          repository: https://gitlab.com/my_package.git
          branch: my_feature_branch
      - prefect.deployments.steps.set_working_directory:
          directory: "${{ repo.directory }}"
    work_pool:
      name: my-pool
      job_variables:
        working_dir: "${{ repo.directory }}"
        env:
          PYTHONPATH: "${{ repo.directory }}:${{ env.PYTHONPATH }}"
- Note: use “;” instead of “:” on Windows.
Important caveats
- If your Dask cluster is external (address=...), those remote Dask workers do not inherit env from the Prefect worker. You must also ensure the repo is installed or available on those Dask workers (install the package there or mount the same path and set PYTHONPATH on the Dask worker side).
- Keep all flows, tasks, and hooks at module top-level; avoid closures/non-picklable globals at import time.
- If you’re using write_on_commit=False anywhere, note that commit hooks won’t fire for those transactions by design; rollback hooks still require the failure to occur inside the with transaction(...) block.
Docs
- Customize job variables: Customize job variables
- Steps you can use: git_clone, set_working_directory, pip_install_requirements, run_shell_script
If you can share whether your Dask cluster is local-in-worker vs external, I can suggest the exact setup (install vs PYTHONPATH) for that topology.
Santiago Gutierrez
10/16/2025, 8:49 AM
Marvin
10/16/2025, 8:49 AM
Marvin
10/16/2025, 8:50 AM
deployments:
  - name: my-deployment
    entrypoint: my_package/flows.py:my_flow
    pull:
      - prefect.deployments.steps.git_clone:
          id: repo
          repository: https://gitlab.com/my_package.git
          branch: my_feature_branch
      - prefect.deployments.steps.set_working_directory:
          directory: "${{ repo.directory }}"
      - prefect.deployments.steps.utility.pip_install_requirements:
          directory: "${{ repo.directory }}"
          requirements_file: requirements.txt
      - prefect.deployments.steps.utility.run_shell_script:
          # editable install so the installed package points to this branch/clone
          script: "pip install -e ."
    work_pool:
      name: my-pool
      job_variables:
        # optional but helpful for your code to know where the repo lives
        working_dir: "${{ repo.directory }}"
        env:
          REPO_DIR: "${{ repo.directory }}"
Then in code, just import your package normally and keep hooks as plain functions:
from prefect.transactions import Transaction


@save_zip.on_rollback
def cleanup(txn: Transaction) -> None:
    ...


@save_zip.on_commit
def committed(txn: Transaction) -> None:
    ...
Notes
- Editable install creates a .pth pointing to the clone. If your Python environment is on a shared filesystem (common on HPC), the Dask workers will pick it up. Ensure workers use the same environment (modules/conda) as the Prefect process. A quick import check is sketched below.
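As a hedged sanity check (not from the thread), a small task can confirm on a Dask worker that the package resolves to the expected clone; my_package is a placeholder name:
import importlib.util
import socket

from prefect import task, get_run_logger


@task
def check_import(package_name: str = "my_package") -> str:
    # Runs on whichever Dask worker picks up the task and logs where the
    # package would be imported from (or that it is missing).
    logger = get_run_logger()
    spec = importlib.util.find_spec(package_name)
    location = spec.origin if spec else "NOT FOUND"
    logger.info("host=%s package=%s resolved to %s", socket.gethostname(), package_name, location)
    return location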
Option B: set PYTHONPATH for PBS-launched Dask workers
- Use repo directory from the git_clone step and export it in the Dask job script via dask-jobqueue’s env_extra.
- Keep editable install if you also want entry points/console scripts; otherwise PYTHONPATH alone is enough for imports.
prefect.yaml
deployments:
  - name: my-deployment
    entrypoint: my_package/flows.py:my_flow
    pull:
      - prefect.deployments.steps.git_clone:
          id: repo
          repository: https://gitlab.com/my_package.git
          branch: my_feature_branch
      - prefect.deployments.steps.set_working_directory:
          directory: "${{ repo.directory }}"
    work_pool:
      name: my-pool
      job_variables:
        working_dir: "${{ repo.directory }}"
        env:
          # expose the dynamic repo path to your flow code
          REPO_DIR: "${{ repo.directory }}"
          # also make it available to the Prefect process; helpful for LocalCluster cases
          PYTHONPATH: "${{ repo.directory }}:${{ env.PYTHONPATH }}"
Flow code: tell the PBSCluster to export PYTHONPATH on worker jobs
import os

from prefect import flow
from prefect_dask import DaskTaskRunner

REPO_DIR = os.environ["REPO_DIR"]

task_runner = DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        # ... your PBS settings: cores, memory, walltime, queue, etc.
        "env_extra": [f"export PYTHONPATH={REPO_DIR}:$PYTHONPATH"],
        # optionally:
        # "job_script_prologue": ["module load python/3.10", "source /path/to/venv/bin/activate"],
    },
)


@flow(task_runner=task_runner)
def my_flow(...):
    ...
Which should you choose?
- If your Python environment is truly identical across nodes and on shared storage, Option A (pip install -e .) is simplest.
- If you want to avoid touching site-packages or your environment isn’t shared, Option B (PYTHONPATH via env_extra) is safe and explicit.
- You can also combine them: install requirements and your package, and still export PYTHONPATH for belt-and-suspenders.
Final reminders
Marvin
10/16/2025, 8:50 AM
Santiago Gutierrez
10/16/2025, 8:26 PM
repo.directory. Using Option B, can I add any extra paths when declaring my job_variables?
Marvin
10/16/2025, 8:26 PM
Marvin
10/16/2025, 8:35 PM
env field in job_variables. You can set PYTHONPATH to include multiple paths by joining them with your OS path separator (colon on Linux/macOS, semicolon on Windows). There isn’t a built-in env_extra field in the default worker templates—use env.
Given your code lives under repo.directory, you can add that (and any other folders) explicitly. Examples:
- prefect.yaml
deployments:
  - name: my-deployment
    entrypoint: repo/directory/flows.py:my_flow
    work_pool:
      name: my-docker-or-k8s-pool
      job_variables:
        env:
          PYTHONPATH: "./repo/directory:./another/path"
- Python API
flow.from_source(
    "https://github.com/org/repo",
    entrypoint="repo/directory/flows.py:my_flow",
).deploy(
    name="my-deployment",
    work_pool_name="my-docker-or-k8s-pool",
    job_variables={
        "env": {
            "PYTHONPATH": "./repo/directory:./another/path"
        }
    },
)
Notes:
- Relative paths are resolved from the flow’s working directory inside the run environment. If you clone your repo at runtime, ensure the working directory is the repo root (or set entrypoint relative to wherever you land).
- If you’d like to inspect the default worker template to see the env key, you can:
prefect work-pool get-default-base-job-template --type docker
prefect work-pool get-default-base-job-template --type kubernetes
Docs:
- Customize job variables
- Docker workers
- Kubernetes workers
If you were relying on an env_extra pattern from an older example or a custom base job template, you can still use it—just make sure your base job template merges env_extra into env. By default, Prefect’s templates only expose env.
Santiago Gutierrez
10/16/2025, 8:44 PM
prefect.yaml specification. Particularly, we ask the Prefect worker to clone a GitLab repository to the work pool's working directory:
pull:
  - prefect.deployments.steps.git_clone:
      repository: https://gitlab.com/my_package.git
      branch: my_feature_branch
And I am using a Dask cluster, local-in-workers, that connects to a local PBS scheduler with access to NFS mounts across all compute nodes.
Marvin
10/16/2025, 8:44 PM
Marvin
10/16/2025, 8:50 PM
job_variables.env.PYTHONPATH. Join multiple paths with : on Linux/macOS (or ; on Windows).
- Since you’re cloning at runtime with a pull step, you cannot reference the clone path in job_variables directly. Instead, set the working directory after the clone, then use relative paths like . in PYTHONPATH.
Example prefect.yaml pattern:
pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: https://gitlab.com/my_package.git
      branch: my_feature_branch
  - prefect.deployments.steps.set_working_directory:
      # point to the subdir that contains your flow code
      directory: "{{ clone.directory }}/repo/directory"
deployments:
  - name: my-dask-deployment
    entrypoint: repo/directory/flows.py:my_flow
    work_pool:
      name: my-pool
      job_variables:
        env:
          # include the current working dir plus any other shared libs
          PYTHONPATH: ".:/nfs/shared/other_libs"
A few important notes for your Dask + PBS setup:
- The env you set in Prefect job_variables affects the flow-run process. It does not automatically propagate to Dask workers that PBS launches on other nodes.
- Since you have NFS across nodes, make sure the repo clone happens into a shared path if you want workers to import from it. For a Process work pool, you can set a shared working_dir so the clone lands on NFS:
work_pool:
  name: my-process-pool
  job_variables:
    working_dir: "/nfs/shared/prefect-workdir"
    env:
      PYTHONPATH: ".:/nfs/shared/other_libs"
- Also configure your Dask cluster to export the same PYTHONPATH to workers. With dask-jobqueue’s PBSCluster, use `env_extra`:
from prefect_dask.task_runners import DaskTaskRunner

task_runner = DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        # ... your PBS options ...
        "env_extra": [
            "export PYTHONPATH=/nfs/shared/prefect-workdir/repo/directory:/nfs/shared/other_libs:$PYTHONPATH"
        ],
    },
)
Related docs:
- Prefect: Customize job variables
- Dask Jobqueue: dask-jobqueue docs
Quick clarifier so I can tailor this precisely:
- Which work pool type are you using (Process, Docker, or Kubernetes)?
- Do you have a specific NFS path you want to clone into? If so, we can set that as the working_dir (Process) or rely on set_working_directory (Docker/K8s) and adjust PYTHONPATH accordingly.
Santiago Gutierrez
10/17/2025, 2:13 AM
pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: https://gitlab.com/myrepo.git
      branch: <my_branch>
deployments:
  - name: my_deployment
    work_pool:
      name: my_pool
      job_variables:
        env:
          REPO_PATH: "{{ clone.directory }}"
But I am getting this error when deploying via the CLI:
prefect deploy --prefect-file deployments/my_deployment.yaml -n s2import_prod
| WARNING | prefect.utilities.templating - Value for placeholder '{{ clone.directory }}' not found in provided values. Please ensure that the placeholder is spelled correctly and that the corresponding value is provided.
How can I use "{{ clone.directory }}" correctly in the deployments section?
Marvin
10/17/2025, 2:13 AM
Marvin
10/17/2025, 2:13 AM
{{ clone.directory }} is a pull-step output that only exists at flow run time on the worker. The deployments section is rendered at deploy time, so it can’t see outputs from pull steps.
What works vs. not:
- Works: Referencing {{ clone.directory }} in subsequent pull steps (e.g., set_working_directory) because those run after git_clone at runtime.
- Doesn’t work: Using {{ clone.directory }} in the deployments: section (including job_variables) because that’s evaluated during prefect deploy.
Practical patterns you can use:
Option 1: Set working directory to the cloned repo, then use relative paths in env
pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: https://gitlab.com/myrepo.git
      branch: my_branch
  - prefect.deployments.steps.set_working_directory:
      directory: "{{ clone.directory }}"
deployments:
  - name: my_deployment
    work_pool:
      name: my_pool
      job_variables:
        env:
          # now "." resolves to the cloned repo directory at runtime
          REPO_PATH: "."
          PYTHONPATH: ".:/nfs/shared/other_libs"
Option 2: Clone into a known NFS path, then reference that static path in env
- If you can control the clone destination (either via a directory argument on the clone step, or by an additional step to move it), choose a stable NFS path that all nodes can see.
pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: https://gitlab.com/myrepo.git
      branch: my_branch
      # if supported, specify a target directory everyone can reach
      directory: "/nfs/shared/repos/myrepo"
  - prefect.deployments.steps.set_working_directory:
      directory: "/nfs/shared/repos/myrepo"
deployments:
  - name: my_deployment
    work_pool:
      name: my_pool
      job_variables:
        env:
          REPO_PATH: "/nfs/shared/repos/myrepo"
If directory isn’t supported in your setup, you can add a shell step to move the clone into your target path, then set_working_directory there.
Dask + PBS note:
- The env in job_variables affects the Prefect flow-run process only. It will not automatically propagate to the Dask workers PBS launches. If your tasks import from REPO_PATH, export the same value in your Dask/PBS worker environment (e.g., via env_extra in your PBSCluster config) and make sure it points to a shared NFS location.
References:
- prefect.yaml and templating stages
- Pull steps (git_clone, set_working_directory)
- Customize job variables
If you share your work pool type (Process/Docker/Kubernetes), I can suggest the most robust way to make the clone path and env consistent across your Dask workers.
Santiago Gutierrez
10/17/2025, 2:21 AM
{{ clone.directory }} at flow run-time programmatically in my flow script?
Marvin
10/17/2025, 2:21 AM
Marvin
10/17/2025, 2:24 AM
{{ clone.directory }} directly from your flow code. Pull step outputs only exist within the prefect.yaml templating context and only for other pull steps.
Workable patterns to achieve the same outcome:
1) Set the working directory to the clone, then use Path.cwd() in your flow
- In prefect.yaml:
pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: https://gitlab.com/myrepo.git
      branch: my_branch
  - prefect.deployments.steps.set_working_directory:
      directory: "{{ clone.directory }}"
- In your flow:
from pathlib import Path
from prefect import flow


@flow
def my_flow():
    repo_dir = Path.cwd()
    print(f"Repo cloned at: {repo_dir}")
2) Persist the clone path to a file during pull, then read it in your flow
- In prefect.yaml:
pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: https://gitlab.com/myrepo.git
      branch: my_branch
  - prefect.deployments.steps.utility.run_shell_script:
      id: write-clone-dir
      script: |
        mkdir -p .prefect_meta
        echo "{{ clone.directory }}" > .prefect_meta/clone_dir.txt
- In your flow:
from pathlib import Path
from prefect import flow


def get_clone_dir():
    p = Path(".prefect_meta/clone_dir.txt")
    return p.read_text().strip() if p.exists() else str(Path.cwd())


@flow
def my_flow():
    clone_dir = get_clone_dir()
    print(f"Repo cloned at: {clone_dir}")
3) Clone to a known shared path and just use that in your code
- If you need all workers to see the same path (e.g., NFS), move the clone into a stable directory in a pull step, then either set_working_directory there or reference that path directly in your flow.
Why this is necessary:
- Pull steps run on the worker before your flow starts. Their outputs (like clone.directory) are only available for templating within the pull section, not exposed via prefect.runtime or environment variables for the flow.
References:
- prefect.yaml and templating
- Pull steps (git_clone, set_working_directory)
- Runtime context
Santiago Gutierrez
10/17/2025, 3:15 AM
pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: https://gitlab.com/my_repo.git
      branch: my_branch
  - prefect.deployments.steps.set_working_directory:
      directory: "{{ clone.directory }}"
Upon launching a deployment run, the worker is able to clone the remote repo but then errors out with:
FileNotFoundError: [Errno 2] No such file or directory: <{{ clone.directory }}>
No issues with permissions, as the created directory for the clone has drwxrwxr-x.
What am I doing wrong?
Marvin
10/17/2025, 3:15 AM
Marvin
10/17/2025, 3:18 AM
id: clone and you reference {{ clone.directory }} — that’s correct.
- Quote the template string:
- Use directory: "{{ clone.directory }}" (double quotes). Avoid wrapping in angle brackets; if your YAML has <{{ clone.directory }}>, remove the angle brackets.
- Verify you’re on a recent Prefect 3.x:
- There have been fixes in the step templating flow; upgrading to the latest 3.x is recommended.
- Keep steps in order:
- Steps run sequentially. If you want to be extra explicit, you can add requires: [clone] to the second step.
A minimal working example
pull:
  - prefect.deployments.steps.git_clone:
      id: clone
      repository: https://gitlab.com/jrsrp/sys/workflows/overnight_prefect.git
      branch: "25-s2import-implement-rollback-hook-to-avoid-incomplete-qvf-imports"
  - prefect.deployments.steps.set_working_directory:
      # This must be quoted; do not wrap with angle brackets
      directory: "{{ clone.directory }}"
      # Optional but explicit:
      requires: [clone]
If you want to confirm the value being rendered, add a quick debug step between them:
  - prefect.deployments.steps.utility.run_shell_script:
      id: debug
      script: |
        echo "clone dir is: {{ clone.directory }}"
      requires: [clone]
If the debug echo shows the placeholder instead of a concrete path, templating isn’t happening. In that case:
- Double-check your YAML file for stray angle brackets or escaping.
- Ensure there’s no duplicate top-level pull: vs per-deployment pull: overriding each other.
- Share your Prefect version (prefect version) — I can advise if an upgrade is needed.
Docs for reference:
- Prefect YAML and templating: https://docs-3.prefect.io/v3/how-to-guides/deployments/prefect-yaml
- Pull steps (git_clone, set_working_directory): https://docs-3.prefect.io/v3/api-ref/python/prefect-deployments-steps-pull
Santiago Gutierrez
10/17/2025, 4:53 AM
PYTHONPATH allows Dask to import the flow code as a module, thereby overcoming the Dask deserialization issue and letting me declare the transaction hooks in the correct way, without needing to re-wrap the txn hooks as an additional @task:
LOCK_DIR = Path("/nfs/locks")


@task(tags=["some-tag"], log_prints=True, timeout_seconds=4500)
def save_zip(zip_path: Path):
    logger = get_run_logger()
    txn_key = f"esa-id:{zip_path.stem}"
    final_path = Path("/nfs/out") / f"{zip_path.stem}.zip"
    tmp_path = final_path.with_suffix(".zip.tmp")
    # Heavy work outside the transaction; produce tmp_path
    # ... compute & write to tmp_path ...
    with transaction(
        key=txn_key,
        isolation_level=IsolationLevel.SERIALIZABLE,
        store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=LOCK_DIR)),
    ) as txn:
        if txn.is_committed():
            logger.warning("Already committed for %s", txn_key)
            if tmp_path.exists():
                tmp_path.unlink(missing_ok=True)
            return
        os.replace(tmp_path, final_path)  # atomic rename on POSIX


@save_zip.on_commit
def committed(txn: Transaction):
    # e.g. emit a log/event or clean up staging artifacts
    print(f"Committed transaction: {txn.key}")


@save_zip.on_rollback
def cleanup(txn: Transaction):
    # e.g. clean up temp files you can derive from txn.key
    print(f"Rolled back transaction: {txn.key}")


@flow(task_runner=DaskTaskRunner())
def my_flow(paths: list[Path]):
    for p in paths:
        save_zip.submit(p)
But the same null effect still holds: the cleanup and committed hooks do not run, i.e. nothing happens, because it seems they need to be scheduled as tasks when working behind a DaskTaskRunner cluster... How can I make them work with a Dask cluster?
Marvin
10/17/2025, 4:53 AM
Marvin
10/17/2025, 4:57 AM
If txn.is_committed() returns True, you return early. In that path no new commit happens and no hooks run. Test with a fresh txn_key or force a rollback (raise an exception inside the with transaction(...) block) to verify hooks.
- Use Prefect’s logger inside hooks rather than print
- Your prints run on the Dask worker and may not show up in Prefect logs. Use get_run_logger() so they land in the task logs.
- Keep hook functions at module-level (importable by workers)
- You’ve already addressed importability via PYTHONPATH; just make sure hooks aren’t nested/closures.
Example update using Prefect logger:
from pathlib import Path
import os

from prefect import task, flow, get_run_logger
from prefect.transactions import transaction, IsolationLevel, Transaction
from prefect.results import ResultStore
from prefect.locking.filesystem import FileSystemLockManager

LOCK_DIR = Path("/nfs/locks")


@task(tags=["some-tag"], log_prints=True, timeout_seconds=4500)
def save_zip(zip_path: Path):
    logger = get_run_logger()
    txn_key = f"esa-id:{zip_path.stem}"
    final_path = Path("/nfs/out") / f"{zip_path.stem}.zip"
    tmp_path = final_path.with_suffix(".zip.tmp")
    # ... produce tmp_path ...
    with transaction(
        key=txn_key,
        isolation_level=IsolationLevel.SERIALIZABLE,
        store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=LOCK_DIR)),
    ) as txn:
        if txn.is_committed():
            logger.warning("Already committed for %s", txn_key)
            if tmp_path.exists():
                tmp_path.unlink(missing_ok=True)
            return
        os.replace(tmp_path, final_path)  # atomic rename on POSIX
        logger.info("Prepared final file for %s; leaving transaction block to commit", txn_key)


@save_zip.on_commit
def committed(txn: Transaction):
    logger = get_run_logger()
    logger.info(f"Committed transaction: {txn.key}")


@save_zip.on_rollback
def cleanup(txn: Transaction):
    logger = get_run_logger()
    logger.warning(f"Rolled back transaction: {txn.key}")
Tips if it still looks quiet:
- Try a minimal, fresh-key test to prove hooks run (a forced-rollback counterpart is sketched after these tips):
@task(log_prints=True)
def t():
    with transaction(key="hook-test-123", store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=LOCK_DIR))):
        pass


@t.on_commit
def _c(txn: Transaction):
    get_run_logger().info(f"Commit hook fired for {txn.key}")
- Make sure the Dask workers have the same Prefect version and can import your module (you’ve set PYTHONPATH — good). If you’re creating workers via PBS, ensure the PBS job passes that env (e.g., jobqueue env_extra).
- Alternative registration: you can also embed hooks via options so they’re serialized with the task object:
def _commit(txn: Transaction):
    get_run_logger().info(f"Commit hook fired for {txn.key}")


def _rollback(txn: Transaction):
    get_run_logger().warning(f"Rollback hook fired for {txn.key}")


save_zip = save_zip.with_options(on_commit=[_commit], on_rollback=[_rollback])
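To exercise the rollback side as well, a minimal forced-failure sketch (hedged; it assumes the same imports and LOCK_DIR as the snippets above, plus a throwaway key):
@task(log_prints=True)
def t_fail():
    # Raising inside the transaction block forces a rollback,
    # which should fire the on_rollback hook below.
    with transaction(
        key="hook-test-rollback-123",
        store=ResultStore(lock_manager=FileSystemLockManager(lock_files_directory=LOCK_DIR)),
    ):
        raise RuntimeError("forced failure to exercise the rollback hook")


@t_fail.on_rollback
def _r(txn: Transaction):
    get_run_logger().warning(f"Rollback hook fired for {txn.key}")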
Docs for deeper reference:
- Transactions (commit/rollback hooks)
- Runtime context and logging
- Using DaskTaskRunner
If you try the fresh txn_key and logger-based hooks and still don’t see logs, share a snippet of the task run logs (and your Prefect/Dask versions) and I’ll help dig in.
Santiago Gutierrez
10/17/2025, 5:21 AM
def _commit(txn: Transaction):
    get_run_logger().info(f"Commit hook fired for {txn.key}")


def _rollback(txn: Transaction):
    get_run_logger().warning(f"Rollback hook fired for {txn.key}")


save_zip = save_zip.with_options(on_commit=[_commit], on_rollback=[_rollback])
What version of Prefect were they introduced in?
Marvin
10/17/2025, 5:21 AM
Marvin
10/17/2025, 5:23 AM
with_options(on_commit=..., on_rollback=...) is not supported in Prefect 3.x. There isn’t a version where those transaction hooks were added to `with_options`; only state hooks like on_completion and on_failure are supported there.
Supported ways to register transaction hooks in 3.4.1 (and current 3.x):
- Use the task’s decorator methods
@save_zip.on_commit
def _commit(txn: Transaction):
    get_run_logger().info(f"Commit hook fired for {txn.key}")


@save_zip.on_rollback
def _rollback(txn: Transaction):
    get_run_logger().warning(f"Rollback hook fired for {txn.key}")
- Keep these hook functions at module scope so Dask workers can import them. (A short contrast sketch follows the references below.)
References:
- Transactions and hooks: Transactions (commit/rollback hooks)
- Task options and state hooks supported by with_options: State change hooks
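To make that distinction concrete, a short hedged sketch; it assumes the existing save_zip task from earlier in the thread, and save_zip_with_alerts and notify_failure are illustrative names:
from prefect import get_run_logger
from prefect.transactions import Transaction


def notify_failure(task, task_run, state):
    # State hook signature: (task, task_run, state); no Transaction involved
    print(f"{task_run.name} ended in state {state.type}")


# State hooks can be attached via with_options, per the answer above...
save_zip_with_alerts = save_zip.with_options(on_failure=[notify_failure])


# ...while transaction hooks are registered with the decorator methods.
@save_zip_with_alerts.on_rollback
def _rollback(txn: Transaction):
    get_run_logger().warning(f"Rollback hook fired for {txn.key}")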