<@ULVA73B9P> this is my modified example `from ...`
# ask-marvin
r
@Marvin this is my modified example
```python
from prefect import flow, task, get_run_logger
import random 
from prefect.futures import wait
from dask_jobqueue import PBSCluster
from prefect_dask import DaskTaskRunner

@task(retries=3)
def task1_logic(url):
    pval = random.random() 
    if pval < 0.5:
        raise ValueError("some failure in step 1")
    else:
        return pval 
    
@task(retries=4)
def task2_logic(pval):
    pval2 = random.random()
    if pval2 < 0.5:
        raise ValueError("some failure in step 2")
    else:
        return "ok"
@flow(task_runner=DaskTaskRunner(),
      persist_result=True)
def process_urls():
    urls = list(range(10))
    successful_results = []
    failed_tasks = []
    logger = get_run_logger()
    
    # first submit the stage 1 tasks, then stage 2 on their results
    task1_futs = task1_logic.map(urls)
    task2_futs = task2_logic.map(task1_futs)
    done, not_done = wait(task2_futs)
    successes = 0
    failures = 0
    for donfut in done:
        if donfut.state.is_completed():
            print(f"state of task is complete")
            print(f"result is {donfut.result()}")
            successes += 1
        else:
            print(f"task is incomplete {donfut.task_run_id}")
            print(f"finished in {donfut.state}")
            failures += 1
    print(f"Done: {len(done)}")
    print(f"Not Done: {len(not_done)}")
    print(f"Failed {failures}")
    print(f"Successes {successes}")


# Usage
if __name__ == "__main__":
    process_urls()
```
does this look ok
m
thought for 15.8 seconds
Yes, this looks like a good implementation! You're using several good practices here, including:
1. Using `.map()` for concurrent execution
2. Proper use of `wait()` to collect futures
3. Good state checking with `is_completed()`
4. Retries on both tasks
5. Using DaskTaskRunner for distributed execution
6. Proper result tracking with success/failure counts

A few small suggestions to make it even better:
```python
from prefect import flow, task, get_run_logger
import random 
from prefect.futures import wait
from dask_jobqueue import PBSCluster
from prefect_dask import DaskTaskRunner
from typing import Any, List, Optional

@task(retries=3)
def task1_logic(url: Any) -> float:
    pval = random.random() 
    if pval < 0.5:  # Note: using < instead of the HTML entity &lt;
        raise ValueError("some failure in step 1")
    return pval 
    
@task(retries=4)
def task2_logic(pval: float) -> str:
    pval2 = random.random()
    if pval2 < 0.5:
        raise ValueError("some failure in step 2")
    return "ok"

@flow(task_runner=DaskTaskRunner(),
      persist_result=True)
def process_urls(urls: Optional[List[Any]] = None) -> dict:
    if urls is None:
        urls = list(range(10))
        
    logger = get_run_logger()
    
    # Submit all tasks
    task1_futs = task1_logic.map(urls)
    task2_futs = task2_logic.map(task1_futs)
    
    # Wait for completion
    done, not_done = wait(task2_futs)
    
    # Track results
    successes = 0
    failures = 0
    failed_tasks = []
    
    for fut in done:
        if fut.state.is_completed():
            logger.info(f"Task completed successfully: {fut.result()}")
            successes += 1
        else:
            logger.error(f"Task failed: {fut.task_run_id} - State: {fut.state}")
            failed_tasks.append({
                "task_run_id": fut.task_run_id,
                "state": str(fut.state),
                "error": str(fut.state.result()) if fut.state.result() else None
            })
            failures += 1

    return {
        "total_tasks": len(done) + len(not_done),
        "completed": len(done),
        "not_done": len(not_done),
        "successes": successes,
        "failures": failures,
        "failed_tasks": failed_tasks
    }

if __name__ == "__main__":
    results = process_urls()
    print(f"Execution Summary:")
    print(f"Total Tasks: {results['total_tasks']}")
    print(f"Completed: {results['completed']}")
    print(f"Not Done: {results['not_done']}")
    print(f"Successes: {results['successes']}")
    print(f"Failures: {results['failures']}")
    if results['failed_tasks']:
        print("\nFailed Tasks:")
        for task in results['failed_tasks']:
            print(f"- Task {task['task_run_id']}: {task['error']}")
```
Key improvements:
1. Added type hints for better code clarity
2. Made the URLs parameter configurable
3. Enhanced error tracking to capture specific failure information
4. Using the logger instead of print statements
5. Structured return value with a complete execution summary
6. Better error reporting in the main execution block
7. Fixed the HTML entity (`&lt;` to `<`)

The code will now give you more detailed information about failures while maintaining all the concurrent execution benefits of your original implementation. The structured return value makes it easier to process the results programmatically if needed.

Also, since you're using DaskTaskRunner, make sure your PBS cluster configuration matches your needs. You might want to add cluster configuration if you haven't already: `@flow(task_runner=DaskTaskRunner( cluster_`
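A minimal sketch of that kind of configuration, assuming a PBS-backed cluster; the queue name, core/memory sizes, walltime, and adapt limits below are placeholders rather than values from this thread:

```python
# Minimal sketch: DaskTaskRunner backed by a PBSCluster.
# All resource values below are placeholders; adjust for your site.
from dask_jobqueue import PBSCluster
from prefect import flow
from prefect_dask import DaskTaskRunner

pbs_cluster_kwargs = {
    "queue": "your_queue_name",   # placeholder PBS queue
    "cores": 1,
    "memory": "4GB",
    "walltime": "01:00:00",
}

@flow(task_runner=DaskTaskRunner(
          cluster_class=PBSCluster,
          cluster_kwargs=pbs_cluster_kwargs,
          adapt_kwargs={"minimum": 1, "maximum": 20},
      ),
      persist_result=True)
def process_urls():
    ...
```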
r
@Marvin this works ok if I use DaskTaskRunner(), I get output like
```
10:19:07.444 | ERROR   | Task run 'task1_logic-2f1' - Finished in state Failed('Task run encountered an exception ValueError: some failure in step 1')
10:19:07.423 | INFO    | Task run 'task1_logic-78f' - Finished in state Completed()
10:19:07.433 | INFO    | Task run 'task1_logic-463' - Finished in state Completed()
```
but fails if I use
```python
@flow(task_runner=DaskTaskRunner(cluster_class=PBSCluster, 
                                 cluster_kwargs=pbs_cluster_kwargs,
                                 adapt_kwargs={"maximum": 20}),
      persist_result=True)
```
All my tasks fail with this:
```
task is incomplete 92d8c3ac-a843-4746-9b12-80a2aec19dc0
finished in Pending()
task is incomplete 5e1f7ea5-f454-4bc6-bc32-3ebe7e23bb64
finished in Pending()
task is incomplete e202b5f8-8731-41e3-8599-bc824ff8018c
finished in Pending()
task is incomplete 26fe00c0-6024-4c4b-9699-320afad5ca5f
finished in Pending()
task is incomplete 5c499667-a6fe-4e76-a714-b1b219782aa0
finished in Pending()
task is incomplete 275f697b-2115-430c-b5c9-ab3b5ccdf91c
finished in Pending()
task is incomplete 6431ee14-88f9-4b6f-9429-4a9c503e812a
finished in Pending()
task is incomplete 668cca99-e7a3-4ac3-8168-19880c18385c
finished in Pending()
task is incomplete 63e42e53-22da-4422-9695-38c866860ce6
finished in Pending()
task is incomplete 07f8f069-f1fc-4940-a369-031a0de469d1
```
how do I fix this?
m
thought for 16.3 seconds
This issue typically occurs when the PBS cluster isn't properly initialized or can't schedule the jobs. Let's fix the PBS cluster configuration:
```python
from prefect import flow, task, get_run_logger
import random 
from prefect.futures import wait
from dask_jobqueue import PBSCluster
from prefect_dask import DaskTaskRunner
from distributed import Client
import os

# Define PBS cluster configuration
pbs_cluster_kwargs = {
    "name": "dask-worker",
    "queue": "your_queue_name",  # Replace with your PBS queue name
    "cores": 1,
    "memory": "4GB",
    "processes": 1,
    "walltime": "01:00:00",
    "resource_spec": "select=1:ncpus=1:mem=4gb",
    # Add any necessary PBS directives
    "job_extra": [
        "-l select=1:ncpus=1:mem=4gb",
        "-l walltime=01:00:00",
    ],
    "local_directory": os.getenv("TMPDIR", "/tmp"),  # Use TMPDIR if available
}

# Adaptation settings
adapt_kwargs = {
    "minimum": 1,
    "maximum": 20,
    "interval": "30s"
}

@task(retries=3)
def task1_logic(url):
    pval = random.random() 
    if pval < 0.5:
        raise ValueError("some failure in step 1")
    return pval 
    
@task(retries=4)
def task2_logic(pval):
    pval2 = random.random()
    if pval2 < 0.5:
        raise ValueError("some failure in step 2")
    return "ok"

@flow(task_runner=DaskTaskRunner(
        cluster_class=PBSCluster,
        cluster_kwargs=pbs_cluster_kwargs,
        adapt_kwargs=adapt_kwargs
      ),
      persist_result=True)
def process_urls():
    logger = get_run_logger()
    
    # Log cluster information
    client = Client.current()
    logger.info(f"Dask dashboard: {client.dashboard_link}")
    logger.info(f"Cluster status: {client.status}")
    
    urls = list(range(10))
    
    # Submit tasks
    task1_futs = task1_logic.map(urls)
    
    # Wait a bit for workers to come online if needed
    client.wait_for_workers(1)
    
    task2_futs = task2_logic.map(task1_futs)
    done, not_done = wait(task2_futs)
    
    successes = 0
    failures = 0
    for fut in done:
        if fut.state.is_completed():
            logger.info(f"Task completed: {fut.result()}")
            successes += 1
        else:
            logger.error(f"Task failed: {fut.task_run_id} - State: {fut.state}")
            failures += 1
    
    logger.info(f"Done: {len(done)}")
    logger.info(f"Not Done: {len(not_done)}")
    logger.info(f"Failed: {failures}")
    logger.info(f"Successes: {successes}")
    
    # Log final cluster status
    logger.info(f"Final cluster status: {client.status}")

if __name__ == "__main__":
    process_urls()
```
Key changes to fix the Pending state issue:
1. Proper PBS configuration. Make sure to adjust these values according to your cluster:
   - `queue`: your PBS queue name
   - `resource_spec`: match your cluster's resource specification format
   - `job_extra`: include any necessary PBS directives
   - adjust memory and cores based on your needs
2. Adaptation settings. More detailed settings for worker scaling:
```python
adapt_kwargs = {
    "minimum": 1,       # At least one worker
    "maximum": 20,      # Maximum number of workers
    "interval": "30s"   # How often to check for scaling
}
```
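One extra debugging step, not from the thread itself but often useful when every task stays in Pending: build the `PBSCluster` by hand with the same kwargs and confirm that workers can actually start before handing the configuration to `DaskTaskRunner`. `job_script()`, `scale()`, and `wait_for_workers()` are standard dask_jobqueue/distributed calls; the queue and resource values below are placeholders.

```python
# Debugging sketch: verify the PBS cluster can start workers at all.
# Queue and resource values are placeholders; adjust for your site.
from dask_jobqueue import PBSCluster
from distributed import Client

cluster = PBSCluster(
    queue="your_queue_name",   # placeholder PBS queue
    cores=1,
    memory="4GB",
    walltime="01:00:00",
)
print(cluster.job_script())    # inspect the job script dask will submit via qsub

cluster.scale(2)               # ask PBS for two worker jobs
client = Client(cluster)
client.wait_for_workers(1, timeout=300)     # raises TimeoutError if no worker joins
print(client.scheduler_info()["workers"])   # confirm workers registered with the scheduler
```

If `wait_for_workers` times out, the problem is usually on the PBS side (queue limits, a rejected `resource_spec`, or the worker environment in the generated job script) rather than in Prefect, which matches the symptom of task runs never leaving Pending.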