# prefect-community
t
Hi 🙂 I'm facing an error on an HPC cluster with my hello_world, but only with Prefect 2.
Here is my functional hello_world with Prefect 1:
Copy code
import time

from dask_jobqueue import PBSCluster
from prefect.executors import DaskExecutor

from prefect import Flow, task

@task
def times():
  return [1, 5, 10]

@task
def sleep(x):
  print(f'sleeping for {x}')
  time.sleep(x)
  print(f'done sleeping for {x}')
  return x


with Flow("mapping-test") as flow:
  sleep.map(sleep.map(times))


if __name__ == "__main__":
  flow.run(executor=DaskExecutor(
    cluster_class=PBSCluster,
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
      }
  ))
launched with
python myhelloworld.py
, I got an output file dask_worker.o**** with my prints, and the qsub was submitted correctly by Dask on the PBS cluster. But if I do the same hello_world with Prefect 2:
Copy code
import time

from prefect import flow, task
from prefect.task_runners import DaskTaskRunner


@task
def times():
    return [1, 5, 10]


@task
def sleep(x):
    print(f'sleeping for {x}')
    time.sleep(x)
    print(f'done sleeping for {x}')
    return x

if __name__ == "__main__":

    @flow(task_runner=DaskTaskRunner(
        cluster_class="dask_jobqueue.PBSCluster",
        cluster_kwargs={
            "cores": 1,
            "memory": '24GB',
            "queue": 'sequentiel',
            "walltime": '00:05:00',
            "interface": 'ib0',
            "n_workers": 1
    
        }
    ))
    def hello_world_flow():
        sleep.map(sleep.map(times))
I have no output file at the location where I launch my Python script. I doubt the qsub was really submitted, or maybe there is an option in Prefect 2 that sends the output to another location? I'm a bit lost 🙂
a
The map operator doesn't exist yet in Prefect 2.0. You could do:
Copy code
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

@task
def my_mapped_task(x):
    return x

@task
def my_mapped_task_2(x):
    return x

@task
def list_task():
    return [1, 2, 3, 4, 5]

@flow(task_runner=DaskTaskRunner(
        cluster_class="dask_jobqueue.PBSCluster",
        cluster_kwargs={
            "cores": 1,
            "memory": '24GB',
            "queue": 'sequentiel',
            "walltime": '00:05:00',
            "interface": 'ib0',
            "n_workers": 1
    
        }
    ))
def my_flow():
    for i in range(10):
        my_mapped_task(i)
and then it's best to call the flow within main:
Copy code
if __name__ == "__main__":
    my_flow()
t
My code is now:
Copy code
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner


@task
def my_mapped_task(x):
    print(x)

if __name__ == "__main__":

    @flow(task_runner=DaskTaskRunner(
        cluster_class="dask_jobqueue.PBSCluster",
        cluster_kwargs={
            "cores": 1,
            "memory": '24GB',
            "queue": 'sequentiel',
            "walltime": '00:05:00',
            "interface": 'ib0',
            "n_workers": 1

        }
    ))
    def hello_world_flow():
        for i in range(10):
            my_mapped_task(i)
But this is no better; I don't have any output file from Dask.
a
why do you keep defining your flow in main? 😄 can you try:
Copy code
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner


@task
def my_mapped_task(x):
    print(x)

@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1

    }
))
def hello_world_flow():
    for i in range(10):
        my_mapped_task(i)

if __name__ == "__main__":
    hello_world_flow()
main is a gate to start some process - here, to trigger a flow run from the flow function. Your flow shouldn't be defined inside main - that way you couldn't pass it e.g. to a DeploymentSpec. What do you mean by an output file from Dask?
by default, when you run this, you should see a folder "dask-worker-space" - is this what you're looking for?
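If you want that worker space to land somewhere explicit rather than the default location, you can usually point it there via dask_jobqueue's local_directory option - just a sketch, assuming your dask_jobqueue version exposes it and forwards it to the worker:
Copy code
from prefect.task_runners import DaskTaskRunner

# assumption: local_directory is passed through to the Dask worker
# as its scratch / dask-worker-space location
task_runner = DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1,
        "local_directory": '/some/writable/path',  # hypothetical path
    },
)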
t
the hello world worked with your code 🙂
a
Nice! 🙌
t
But where am I supposed to see the prints?
In my file dask-worker.o***, I have:
Copy code
distributed.nanny - INFO -         Start Nanny at: '<tcp://10.148.0.181:54336>'
distributed.worker - INFO -       Start worker at:   <tcp://10.148.0.181:35490>
distributed.worker - INFO -          Listening to:   <tcp://10.148.0.181:35490>
distributed.worker - INFO -          dashboard at:         10.148.0.181:40645
distributed.worker - INFO - Waiting to connect to:   <tcp://10.148.1.146:55485>
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                  22.35 GiB
distributed.worker - INFO -       Local Directory: /dev/shm/pbs.9835088.datarmor0/dask-worker-space/worker-apnm7ik1
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:   <tcp://10.148.1.146:55485>
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.core - INFO - Event loop was unresponsive in Worker for 12.74s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client.py:1195: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
  warnings.warn(
13:25:49.297 | INFO    | Task run 'my_mapped_task-ca1f3dbb-0' - Finished in state Completed(None)
13:25:51.325 | INFO    | Task run 'my_mapped_task-ca1f3dbb-1' - Finished in state Completed(None)
13:25:53.381 | INFO    | Task run 'my_mapped_task-ca1f3dbb-2' - Finished in state Completed(None)
13:25:55.389 | INFO    | Task run 'my_mapped_task-ca1f3dbb-3' - Finished in state Completed(None)
13:25:57.393 | INFO    | Task run 'my_mapped_task-ca1f3dbb-4' - Finished in state Completed(None)
13:25:59.413 | INFO    | Task run 'my_mapped_task-ca1f3dbb-5' - Finished in state Completed(None)
13:26:01.417 | INFO    | Task run 'my_mapped_task-ca1f3dbb-6' - Finished in state Completed(None)
13:26:03.425 | INFO    | Task run 'my_mapped_task-ca1f3dbb-7' - Finished in state Completed(None)
13:26:05.441 | INFO    | Task run 'my_mapped_task-ca1f3dbb-8' - Finished in state Completed(None)
13:26:07.586 | INFO    | Task run 'my_mapped_task-ca1f3dbb-9' - Finished in state Completed(None)
distributed.dask_worker - INFO - Exiting on signal 15
But there is no sign of the prints.
And if I try to split my hello world code into 2 files, in order to have a flow calling a flow calling a task (which looks like what I'm trying to achieve with my real code, which is broken right now, but it's better to try it with a hello world first), I have hello_world.py, which is:
Copy code
from prefect import flow

from prefect_graph.hello_world_task import hello_world_task_flow


@flow
def hello_world_flow():
    for i in range(10):
        hello_world_task_flow(i)


if __name__ == "__main__":
    hello_world_flow()
and hello_world_task.py, which is:
Copy code
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner


@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1

    }
))
def hello_world_task_flow(x):
    my_mapped_task(x)


@task
def my_mapped_task(x):
    print(x)
I got the same error in dask-worker.o*, a timeout error, for the 10 flows that are called:
Copy code
distributed.nanny - INFO -         Start Nanny at: '<tcp://10.148.1.114:59860>'
distributed.worker - INFO -       Start worker at:   <tcp://10.148.1.114:37373>
distributed.worker - INFO -          Listening to:   <tcp://10.148.1.114:37373>
distributed.worker - INFO -          dashboard at:         10.148.1.114:44616
distributed.worker - INFO - Waiting to connect to:   <tcp://10.148.1.146:40619>
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                  22.35 GiB
distributed.worker - INFO -       Local Directory: /dev/shm/pbs.9835069.datarmor0/dask-worker-space/worker-0oganzzi
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:   <tcp://10.148.1.146:40619>
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.core - INFO - Event loop was unresponsive in Worker for 12.55s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.worker - WARNING - Compute Failed
Function:  begin_task_run
args:      ()
kwargs:    {'task': <prefect.tasks.Task object at 0x2aaab091ed90>, 'task_run': TaskRun(id=UUID('bd7c31c6-f577-49ee-a438-f7db41ec1069'), name='my_mapped_task-60f685bd-2', flow_run_id=UUID('9b7aee9e-6453-4e36-8518-cedfa012f375'), task_key='60f685bd73a2d907cce1732888a4657e', dynamic_key='2', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0), tags=[], state_id=UUID('63e4ad1a-d7e4-4b33-8631-ac205202d59d'), task_inputs={'x': []}, state_type=StateType.PENDING, run_count=0, expected_start_time=datetime.datetime(2022, 4, 6, 13, 23, 6, 309348, tzinfo=datetime.timezone.utc), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=61193), state=Pending(message=None, type=PENDING, result=None, task_run_id=bd7c31c6-f577-49ee-a438-f7db41ec1069)), 'parameters': {'x': 2}, 'wait_for': None, '
Exception: 'TimeoutError()'

Terminated
distributed.dask_worker - INFO - Exiting on signal 15
distributed.nanny - INFO - Closing Nanny at '<tcp://10.148.1.114:59860>'
distributed.nanny - INFO - Worker process 26133 was killed by signal 15
distributed.dask_worker - INFO - End worker
a
instead of prints, you need to use logs - you can use `get_run_logger`:
Copy code
import time

from prefect import get_run_logger, task, flow

@task
def compute_something(x):
    logger = get_run_logger()
    logger.info("Computing: %d x 2 = %d", x, x * 2)
    time.sleep(2)
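The same helper also works inside a flow body if you want flow-level log lines - a small generic sketch (the flow name here is just for illustration):
Copy code
from prefect import flow, get_run_logger

@flow
def my_logging_flow():
    logger = get_run_logger()
    logger.info("Hello from the flow run")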
t
Ok, I will try with my 2 versions of the hello world to see what's going on 🙂
👍 1
The version with one file is OK, I see the logs in my dask-worker output file.
But for the other version, it's partly OK too: I see the logs when the tasks of the flow-of-flows are launched, but it crashes at the third/fourth/fifth one sometimes, and I don't know why.
Here is what I see as an error; this time it's at the second one:
Copy code
13:55:00.356 | INFO    | prefect.engine - Created flow run 'rigorous-hog' for flow 'hello-world-flow'
13:55:00.356 | INFO    | Flow run 'rigorous-hog' - Using task runner 'ConcurrentTaskRunner'
13:55:01.420 | INFO    | Flow run 'rigorous-hog' - Created subflow run 'spectral-goldfish' for flow 'hello-world-task-flow'
13:55:01.422 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `dask_jobqueue.pbs.PBSCluster`
13:55:01.989 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at <http://10.148.1.145:8787/status>
13:55:02.960 | INFO    | Flow run 'spectral-goldfish' - Created task run 'my_mapped_task-60f685bd-0' for task 'my_mapped_task'
/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client.py:1195: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
  warnings.warn(
13:55:29.286 | INFO    | Flow run 'spectral-goldfish' - Finished in state Completed('All states completed.')
13:55:30.308 | INFO    | Flow run 'rigorous-hog' - Created subflow run 'pink-bulldog' for flow 'hello-world-task-flow'
13:55:30.310 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `dask_jobqueue.pbs.PBSCluster`
13:55:30.753 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at <http://10.148.1.145:8787/status>
13:55:31.428 | INFO    | Flow run 'pink-bulldog' - Created task run 'my_mapped_task-60f685bd-1' for task 'my_mapped_task'
/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client.py:1195: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
  warnings.warn(
13:56:02.864 | INFO    | Flow run 'pink-bulldog' - Finished in state Completed('All states completed.')
13:56:04.668 | INFO    | Flow run 'rigorous-hog' - Created subflow run 'offbeat-mamba' for flow 'hello-world-task-flow'
13:56:04.670 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `dask_jobqueue.pbs.PBSCluster`
13:56:04.975 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at <http://10.148.1.145:8787/status>
13:56:06.351 | INFO    | Flow run 'offbeat-mamba' - Created task run 'my_mapped_task-60f685bd-2' for task 'my_mapped_task'
/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client.py:1195: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
  warnings.warn(
13:56:27.540 | ERROR   | Flow run 'offbeat-mamba' - Crash detected! Execution was interrupted by an unexpected exception.
13:56:27.965 | ERROR   | Flow run 'rigorous-hog' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine.py", line 456, in orchestrate_flow_run
    result = await run_sync_in_worker_thread(flow_call)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 52, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(context.run, call, cancellable=True)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/anyio/to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
    return await future
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 754, in run
    result = context.run(func, *args)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect_graph/hello_world.py", line 9, in hello_world_flow
    hello_world_task_flow(i)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/flows.py", line 314, in __call__
    return enter_flow_run_engine_from_flow_call(self, parameters)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine.py", line 114, in enter_flow_run_engine_from_flow_call
    return run_async_from_worker_thread(begin_run)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 63, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/anyio/from_thread.py", line 35, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 847, in run_async_from_thread
    return f.result()
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/concurrent/futures/_base.py", line 445, in result
    return self.__get_result()
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client.py", line 82, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine.py", line 369, in create_and_begin_subflow_run
    terminal_state = await orchestrate_flow_run(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/contextlib.py", line 670, in __aexit__
    raise exc_details[1]
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/contextlib.py", line 199, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine.py", line 866, in report_flow_run_crashes
    raise exc from None
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine.py", line 849, in report_flow_run_crashes
    yield
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/contextlib.py", line 653, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/contextlib.py", line 188, in __aexit__
    await self.gen.__anext__()
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/task_runners.py", line 158, in start
    self._started = False
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/contextlib.py", line 670, in __aexit__
    raise exc_details[1]
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/contextlib.py", line 653, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/distributed/deploy/cluster.py", line 474, in __aexit__
    await f
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/distributed/deploy/spec.py", line 414, in _close
    assert w.status == Status.closed, w.status
AssertionError: Status.running
/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client.py:1195: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
  warnings.warn(
13:56:28.924 | ERROR   | Flow run 'rigorous-hog' - Finished in state Failed('Flow run encountered an exception.')
And if I look inside the qsub log of this task, it hasn't actually crashed:
Copy code
Thus no job control in this shell.
distributed.nanny - INFO -         Start Nanny at: '<tcp://10.148.0.171:53913>'
distributed.worker - INFO -       Start worker at:   <tcp://10.148.0.171:48407>
distributed.worker - INFO -          Listening to:   <tcp://10.148.0.171:48407>
distributed.worker - INFO -          dashboard at:         10.148.0.171:52201
distributed.worker - INFO - Waiting to connect to:   <tcp://10.148.1.145:57021>
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                  22.35 GiB
distributed.worker - INFO -       Local Directory: /dev/shm/pbs.9835312.datarmor0/dask-worker-space/worker-q7ltl636
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:   <tcp://10.148.1.145:57021>
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.core - INFO - Event loop was unresponsive in Worker for 9.00s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
13:56:24.283 | INFO    | Task run 'my_mapped_task-60f685bd-2' - This is x : 2
/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client.py:1195: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
  warnings.warn(
13:56:26.897 | INFO    | Task run 'my_mapped_task-60f685bd-2' - Finished in state Completed(None)
distributed.dask_worker - INFO - Exiting on signal 15
distributed.nanny - INFO - Closing Nanny at '<tcp://10.148.0.171:53913>'
Terminated
distributed.nanny - INFO - Worker process 925 was killed by signal 15
distributed.dask_worker - INFO - End worker
Seems strange
a
re this error:
Copy code
UserWarning: No default storage has been set on the server. Using temporary local storage for results.
you can create and set default storage using:
Copy code
prefect storage create
t
Thank you for all the help and these tips, the warning is gone 🙂 I'm now trying to find out why my flow of tasks crashes randomly on a task with a timeout while the other ones are OK. Maybe it comes from the queue (sequentiel) I'm using on the PBS cluster, or from the config, or something else.
I've tried a lot of configs; one seems to be better, but only when I'm doing a flow of 10 tasks (so 1 qsub to the PBS cluster). This config doesn't work when I'm doing a flow of 10 flows that each call a task (so 10 qsubs to my PBS cluster behind the scenes), cf. this trace:
Copy code
07:34:03.307 | INFO    | prefect.engine - Created flow run 'armored-whale' for flow 'hello-world-flow'
07:34:03.308 | INFO    | Flow run 'armored-whale' - Using task runner 'ConcurrentTaskRunner'
07:34:04.387 | INFO    | Flow run 'armored-whale' - Created subflow run 'shrewd-marmot' for flow 'hello-world-task-flow'
07:34:04.389 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `dask_jobqueue.pbs.PBSCluster`
07:34:04.615 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at <http://10.148.1.146:8787/status>
07:34:05.165 | INFO    | Flow run 'shrewd-marmot' - Created task run 'my_mapped_task-60f685bd-0' for task 'my_mapped_task'
07:34:17.860 | INFO    | Flow run 'shrewd-marmot' - Finished in state Completed('All states completed.')
07:34:18.871 | INFO    | Flow run 'armored-whale' - Created subflow run 'nifty-lizard' for flow 'hello-world-task-flow'
07:34:18.873 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `dask_jobqueue.pbs.PBSCluster`
07:34:18.949 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at <http://10.148.1.146:8787/status>
07:34:19.497 | INFO    | Flow run 'nifty-lizard' - Created task run 'my_mapped_task-60f685bd-1' for task 'my_mapped_task'
07:34:45.914 | INFO    | Flow run 'nifty-lizard' - Finished in state Completed('All states completed.')
07:34:46.799 | INFO    | Flow run 'armored-whale' - Created subflow run 'ambitious-sponge' for flow 'hello-world-task-flow'
07:34:46.801 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `dask_jobqueue.pbs.PBSCluster`
07:34:47.043 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at <http://10.148.1.146:8787/status>
07:34:47.589 | INFO    | Flow run 'ambitious-sponge' - Created task run 'my_mapped_task-60f685bd-2' for task 'my_mapped_task'
07:35:01.606 | ERROR   | Flow run 'ambitious-sponge' - Crash detected! Execution was interrupted by an unexpected exception.
So I'm trying to put a retry mechanism on my task, but it doesn't work. Here is what I do, and nothing changes, no retry is attempted:
Copy code
@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'mpi_4',
        "resource_spec": "select=4:ncpus=28:mem=120gb",
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1

    }
))
def hello_world_task_flow(x):
    my_mapped_task(x)


@task(retries=10, retry_delay_seconds=5)
def my_mapped_task(x):
    logger = get_run_logger()
    logger.info("This is x : %d", x)
    time.sleep(2)
Maybe I have to put a retry on the flow too (which doesn't seem to be something doable in Prefect)? Because maybe it's not the task itself that crashes, but the flow when it does the qsub to PBS, so when it crashes we never reach the task, because the crash occurs in the flow.
a
I'm no HPC expert and I didn't understand your issue. Can you explain in simple terms what problem you see here? So far I see no mapping / no parallelizable operation within
my_mapped_task
. Not much for PBSCluster to do? Maybe you can try this again (which I remember worked):
Copy code
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner


@task
def my_mapped_task(x):
    print(x)

@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1

    }
))
def hello_world_flow():
    for i in range(10):
        my_mapped_task(i)

if __name__ == "__main__":
    hello_world_flow()
Looks like you changed the queue and resource spec compared to last time, so maybe that could be the issue?
t
The example you pasted works, but it does one qsub to the HPC cluster, then starts a Dask worker inside the qsub, and then launches the 10 tasks inside that Dask worker. What I want to test now is doing 10 qsubs in order to launch 1 task in each on the HPC - I don't know if I'm clear.
In order to do that, I've split my code into two files with two flows (you can forget the queue and resource spec, that's not the problem 🙂):
Copy code
@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:50:00',
        "interface": 'ib0',
        "n_workers": 1

    }
))
def hello_world_task_flow(x):
    my_mapped_task(x)


@task(retries=10, retry_delay_seconds=5)
def my_mapped_task(x):
    logger = get_run_logger()
    logger.info("This is x : %d", x)
    time.sleep(2)
and
Copy code
from prefect import flow

from prefect_graph.hello_world_task import hello_world_task_flow


@flow
def hello_world_flow():
    for i in range(10):
        hello_world_task_flow(i)


if __name__ == "__main__":
    hello_world_flow()
But it crashes randomly, between two tasks, and the last task had completed successfully. So I'm thinking the problem happens when the new qsub is done, and the qsub is submitted by the flow; that's why I would try to put a retry on the flow, not on the task.
I don't know if my explanation is better 😅
a
We don't have flow-level retries as of now, but this is already in the backlog and may be added in the future. Re your issue, I'm not sure whether it makes sense to submit 1-task flows to your PBS cluster this way. Are you just doing it to explore various options, or is there some concrete problem you are trying to solve this way? Moving the loop into the PBS-DaskTaskRunner flow makes more sense:
Copy code
# keep the same @flow(task_runner=DaskTaskRunner(...)) decorator as before
def hello_world_task_flow(x):
    for i in range(10):
        my_mapped_task(i)
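If you need a stopgap in the meantime, one rough option is a plain retry loop around the subflow call in the parent flow - just a sketch, assuming the crashed subflow surfaces as an exception in the parent (as it does in your traceback):
Copy code
import time

from prefect import flow

from prefect_graph.hello_world_task import hello_world_task_flow


@flow
def hello_world_flow():
    for i in range(10):
        # hypothetical manual retry: re-call the subflow a couple of times
        # if the call raises, instead of relying on flow-level retries
        for attempt in range(3):
            try:
                hello_world_task_flow(i)
                break
            except Exception:
                if attempt == 2:
                    raise
                time.sleep(5)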
t
I'm doing it this way because the graph I need to produce for my project is going to be a flow of subflows with many tasks in each subflow (more than 1, but I simplified it for the hello world).
I think flow/subflow-level retries could be a good solution, but if I can find a temporary solution to my problem that would be great 🙂
a
I see, but do you believe you need to map over flows? you could map/parallelize your data flow operations in one flow and then use some parent flow just to call those subflows, not necessarily via mapping, right? It all depends on your use case
t
I don't understand the question - is the for loop a map for you? I don't need this for/map in my real use case, but I will need one parent flow that has subflows with tasks, so I can rewrite my hello_world with subflows like this in order to be closer to my real use case. Is this what you mean when you say parallelize?:
Copy code
from prefect import flow

from prefect_graph.hello_world_task import hello_world_task_flow


@flow
def hello_world_flow():
        hello_world_task_flow(1)
        hello_world_task_flow(2)
        hello_world_task_flow(3)
        hello_world_task_flow(4)
        hello_world_task_flow(5)
        hello_world_task_flow(6)
        hello_world_task_flow(7)
        hello_world_task_flow(8)
        hello_world_task_flow(9)
        hello_world_task_flow(10)


if __name__ == "__main__":
    hello_world_flow()
a
Yup, exactly - a for loop is the same as mapping = looping over some function with your business logic (it may be a task or a subflow). By parallelized I meant running tasks in parallel.
My main point is: each of your child flows that needs parallelism may use either Dask or Ray or just a
ConcurrentTaskRunner
for parallelism, and you can decide which one to use based on the business logic of each subflow. Then your main flow can just trigger the subflows in the right order, but you don't necessarily need to map over subflows - your use case of passing an incremental number to each subflow seems unlikely in practice, would you agree? 🙂 A more likely scenario would be something like this - with only subflow_2 doing something where the PBS cluster may be helpful:
Copy code
@flow
def process_ml_workflow():
    # subflow 1 is doing IO operation so it may be more efficient to use ConcurrentTaskRunner rather than PBS DaskCluster
    subflow_1 = extract_raw_data_concurrently_and_load_to_some_destination()
    subflow_2 = read_and_process_data_in_parallel(wait_for=[subflow_1]) # this may use PBS
    subflow_3 = train_some_ml_model_and_pickle_it(wait_for=[subflow_2])
t
Yes, I agree 🙂 I will take your version and adapt it to my real use case. Thank you a lot for all your help over these past days, you were very helpful and you explained things in a very simple way :)
🙌 1