Robert Denham
06/28/2025, 12:21 AM
```python
from prefect import flow, task, get_run_logger
import random
from prefect.futures import wait
from dask_jobqueue import PBSCluster
from prefect_dask import DaskTaskRunner


@task(retries=3)
def task1_logic(url):
    pval = random.random()
    if pval < 0.5:
        raise ValueError("some failure in step 1")
    else:
        return pval


@task(retries=4)
def task2_logic(pval):
    pval2 = random.random()
    if pval2 < 0.5:
        raise ValueError("some failure in step 2")
    else:
        return "ok"


@flow(task_runner=DaskTaskRunner(),
      persist_result=True)
def process_urls():
    urls = list(range(10))
    successful_results = []
    failed_tasks = []
    logger = get_run_logger()
    # first submit the stage 1
    task1_futs = task1_logic.map(urls)
    task2_futs = task2_logic.map(task1_futs)
    done, not_done = wait(task2_futs)
    successes = 0
    failures = 0
    for donfut in done:
        if donfut.state.is_completed():
            print(f"state of task is complete")
            print(f"result is {donfut.result()}")
            successes += 1
        else:
            print(f"task is incomplete {donfut.task_run_id}")
            print(f"finished in {donfut.state}")
            failures += 1
    print(f"Done: {len(done)}")
    print(f"Not Done: {len(not_done)}")
    print(f"Failed {failures}")
    print(f"Successes {successes}")


# Usage
if __name__ == "__main__":
    process_urls()
```
does this look ok?
Marvin
06/28/2025, 12:21 AM
1. Using .map() for concurrent execution (see the sketch after this list)
2. Proper use of wait() to collect futures
3. Good state checking with is_completed()
4. Retries on both tasks
5. Using DaskTaskRunner for distributed execution
6. Proper result tracking with success/failure counts
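Editor's note: a minimal sketch of that map/wait/state-checking pattern on its own, assuming only Prefect's default task runner; the `flaky` task, its failure rate, and the `range(5)` inputs are placeholders and are not taken from the thread:
```python
from prefect import flow, task
from prefect.futures import wait
import random


@task(retries=2)
def flaky(x):
    # placeholder task body: fail roughly half the time to exercise retries
    if random.random() < 0.5:
        raise ValueError(f"failure for input {x}")
    return x * 2


@flow
def demo():
    futures = flaky.map(range(5))     # submit all tasks concurrently
    done, not_done = wait(futures)    # block until every future resolves
    for fut in done:
        if fut.state.is_completed():
            print(f"ok: {fut.result()}")
        else:
            print(f"failed: {fut.task_run_id} -> {fut.state}")


if __name__ == "__main__":
    demo()
```
Note that `result()` is only called on futures whose state is completed, so a failed task does not re-raise while the summary is printed.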
A few small suggestions to make it even better:
```python
from prefect import flow, task, get_run_logger
import random
from prefect.futures import wait
from dask_jobqueue import PBSCluster
from prefect_dask import DaskTaskRunner
from typing import Any, List, Optional


@task(retries=3)
def task1_logic(url: Any) -> float:
    pval = random.random()
    if pval < 0.5:  # Note: using < instead of the HTML entity &lt;
        raise ValueError("some failure in step 1")
    return pval


@task(retries=4)
def task2_logic(pval: float) -> str:
    pval2 = random.random()
    if pval2 < 0.5:
        raise ValueError("some failure in step 2")
    return "ok"


@flow(task_runner=DaskTaskRunner(),
      persist_result=True)
def process_urls(urls: Optional[List[Any]] = None) -> dict:
    if urls is None:
        urls = list(range(10))
    logger = get_run_logger()

    # Submit all tasks
    task1_futs = task1_logic.map(urls)
    task2_futs = task2_logic.map(task1_futs)

    # Wait for completion
    done, not_done = wait(task2_futs)

    # Track results
    successes = 0
    failures = 0
    failed_tasks = []
    for fut in done:
        if fut.state.is_completed():
            logger.info(f"Task completed successfully: {fut.result()}")
            successes += 1
        else:
            logger.error(f"Task failed: {fut.task_run_id} - State: {fut.state}")
            failed_tasks.append({
                "task_run_id": fut.task_run_id,
                "state": str(fut.state),
                "error": fut.state.message,  # state.message holds the failure summary
            })
            failures += 1

    return {
        "total_tasks": len(done) + len(not_done),
        "completed": len(done),
        "not_done": len(not_done),
        "successes": successes,
        "failures": failures,
        "failed_tasks": failed_tasks
    }


if __name__ == "__main__":
    results = process_urls()
    print("Execution Summary:")
    print(f"Total Tasks: {results['total_tasks']}")
    print(f"Completed: {results['completed']}")
    print(f"Not Done: {results['not_done']}")
    print(f"Successes: {results['successes']}")
    print(f"Failures: {results['failures']}")
    if results['failed_tasks']:
        print("\nFailed Tasks:")
        for task_info in results['failed_tasks']:  # renamed to avoid shadowing the imported `task`
            print(f"- Task {task_info['task_run_id']}: {task_info['error']}")
```
Key improvements:
1. Added type hints for better code clarity
2. Made the URLs parameter configurable
3. Enhanced error tracking to capture specific failure information
4. Using logger instead of print statements
5. Structured return value with complete execution summary
6. Better error reporting in the main execution block
7. Fixed the HTML entity (`&lt;` to `<`) in the comparison
The code will now give you more detailed information about failures while maintaining all the concurrent execution benefits of your original implementation. The structured return value makes it easier to process the results programmatically if needed.
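To make that last point concrete, here is a small, hypothetical consumer of the returned dict; the `summary.json` filename and the failure threshold are purely illustrative and not part of the thread:
```python
import json

# Run the flow defined above and capture its structured summary
results = process_urls()

# Persist the run summary for later inspection (illustrative path)
with open("summary.json", "w") as fh:
    json.dump(results, fh, indent=2, default=str)

# Simple programmatic decision based on the structured return value
if results["failures"] > results["successes"]:
    print("More failures than successes - inspect failed_tasks")
    for info in results["failed_tasks"]:
        print(f"  {info['task_run_id']}: {info['error']}")
```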
Also, since you're using DaskTaskRunner, make sure your PBS cluster configuration matches your needs. You might want to add cluster configuration if you haven't already:
```python
@flow(task_runner=DaskTaskRunner(
    cluster_
```
Robert Denham
06/28/2025, 12:25 AM
10:19:07.444 | ERROR | Task run 'task1_logic-2f1' - Finished in state Failed('Task run encountered an exception ValueError: some failure in step 1')
10:19:07.423 | INFO | Task run 'task1_logic-78f' - Finished in state Completed()
10:19:07.433 | INFO | Task run 'task1_logic-463' - Finished in state Completed()
but fails if I use
```python
@flow(task_runner=DaskTaskRunner(cluster_class=PBSCluster,
                                 cluster_kwargs=pbs_cluster_kwargs,
                                 adapt_kwargs={"maximum": 20}),
      persist_result=True)
```
All my tasks fail with this:
task is incomplete 92d8c3ac-a843-4746-9b12-80a2aec19dc0
finished in Pending()
task is incomplete 5e1f7ea5-f454-4bc6-bc32-3ebe7e23bb64
finished in Pending()
task is incomplete e202b5f8-8731-41e3-8599-bc824ff8018c
finished in Pending()
task is incomplete 26fe00c0-6024-4c4b-9699-320afad5ca5f
finished in Pending()
task is incomplete 5c499667-a6fe-4e76-a714-b1b219782aa0
finished in Pending()
task is incomplete 275f697b-2115-430c-b5c9-ab3b5ccdf91c
finished in Pending()
task is incomplete 6431ee14-88f9-4b6f-9429-4a9c503e812a
finished in Pending()
task is incomplete 668cca99-e7a3-4ac3-8168-19880c18385c
finished in Pending()
task is incomplete 63e42e53-22da-4422-9695-38c866860ce6
finished in Pending()
task is incomplete 07f8f069-f1fc-4940-a369-031a0de469d1
how do I fix this?
Marvin
06/28/2025, 12:26 AM
```python
from prefect import flow, task, get_run_logger
import random
from prefect.futures import wait
from dask_jobqueue import PBSCluster
from prefect_dask import DaskTaskRunner
from distributed import Client
import os

# Define PBS cluster configuration
pbs_cluster_kwargs = {
    "name": "dask-worker",
    "queue": "your_queue_name",  # Replace with your PBS queue name
    "cores": 1,
    "memory": "4GB",
    "processes": 1,
    "walltime": "01:00:00",
    "resource_spec": "select=1:ncpus=1:mem=4gb",
    # Add any necessary PBS directives
    "job_extra": [
        "-l select=1:ncpus=1:mem=4gb",
        "-l walltime=01:00:00",
    ],
    "local_directory": os.getenv("TMPDIR", "/tmp"),  # Use TMPDIR if available
}

# Adaptation settings
adapt_kwargs = {
    "minimum": 1,
    "maximum": 20,
    "interval": "30s"
}


@task(retries=3)
def task1_logic(url):
    pval = random.random()
    if pval < 0.5:
        raise ValueError("some failure in step 1")
    return pval


@task(retries=4)
def task2_logic(pval):
    pval2 = random.random()
    if pval2 < 0.5:
        raise ValueError("some failure in step 2")
    return "ok"


@flow(
    task_runner=DaskTaskRunner(
        cluster_class=PBSCluster,
        cluster_kwargs=pbs_cluster_kwargs,
        adapt_kwargs=adapt_kwargs,
    ),
    persist_result=True,
)
def process_urls():
    logger = get_run_logger()

    # Log cluster information
    client = Client.current()
    logger.info(f"Dask dashboard: {client.dashboard_link}")
    logger.info(f"Cluster status: {client.status}")

    urls = list(range(10))

    # Submit tasks
    task1_futs = task1_logic.map(urls)

    # Wait a bit for workers to come online if needed
    client.wait_for_workers(1)

    task2_futs = task2_logic.map(task1_futs)
    done, not_done = wait(task2_futs)

    successes = 0
    failures = 0
    for fut in done:
        if fut.state.is_completed():
            logger.info(f"Task completed: {fut.result()}")
            successes += 1
        else:
            logger.error(f"Task failed: {fut.task_run_id} - State: {fut.state}")
            failures += 1

    logger.info(f"Done: {len(done)}")
    logger.info(f"Not Done: {len(not_done)}")
    logger.info(f"Failed: {failures}")
    logger.info(f"Successes: {successes}")

    # Log final cluster status
    logger.info(f"Final cluster status: {client.status}")


if __name__ == "__main__":
    process_urls()
```
Key changes to fix the Pending state issue:
1. Proper PBS Configuration: Make sure to adjust these values according to your cluster:
- `queue`: Your PBS queue name
- `resource_spec`: Match your cluster's resource specification format
- `job_extra`: Include any necessary PBS directives
- Adjust memory and cores based on your needs
2. Adaptation Settings: More detailed settings for worker scaling:
```python
adapt_kwargs = {
"minimum": 1, # At least one worker
"maximum": 20, # Maximum number of workers
"interval": "30s" # How often to check for scaling