Thomas Mignon
04/06/2022, 8:22 AM
import time
from dask_jobqueue import PBSCluster
from prefect.executors import DaskExecutor
from prefect import Flow, task
@task
def times():
    return [1, 5, 10]
@task
def sleep(x):
    print(f'sleeping for {x}')
    time.sleep(x)
    print(f'done sleeping for {x}')
    return x
with Flow("mapping-test") as flow:
    sleep.map(sleep.map(times))
if __name__ == "__main__":
    flow.run(executor=DaskExecutor(
        cluster_class=PBSCluster,
        cluster_kwargs={
            "cores": 1,
            "memory": '24GB',
            "queue": 'sequentiel',
            "walltime": '00:05:00',
            "interface": 'ib0',
            "n_workers": 1
        }
    ))
Launched with python myhelloworld.py, I got an output file dask_worker.o**** with my prints; the qsub was submitted correctly by Dask on the PBS cluster.
And if I do the same hello world with Prefect 2:
import time
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner
@task
def times():
    return [1, 5, 10]
@task
def sleep(x):
    print(f'sleeping for {x}')
    time.sleep(x)
    print(f'done sleeping for {x}')
    return x
if __name__ == "__main__":
    @flow(task_runner=DaskTaskRunner(
        cluster_class="dask_jobqueue.PBSCluster",
        cluster_kwargs={
            "cores": 1,
            "memory": '24GB',
            "queue": 'sequentiel',
            "walltime": '00:05:00',
            "interface": 'ib0',
            "n_workers": 1
        }
    ))
    def hello_world_flow():
        sleep.map(sleep.map(times))
I have no output file at the location where I launch my Python script. I doubt the qsub was really submitted, or maybe there is an option in Prefect 2 that puts the output somewhere else? I'm a bit lost 🙂
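If it helps to show what I mean: on the dask_jobqueue side there seems to be a log_directory option that pins where the PBS .o/.e worker files land, which might at least tell me whether the qsub happens at all. The option name and the path below are my assumptions from the docs, not something I have tested:
from prefect.task_runners import DaskTaskRunner

# Sketch only: assumes dask_jobqueue's PBSCluster accepts a log_directory kwarg
# so the PBS .o/.e worker log files land in a known place.
runner = DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1,
        "log_directory": "dask-logs",  # illustrative path, relative to the submit directory
    },
)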
Anna Geller
04/06/2022, 11:12 AM
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner
@task
def my_mapped_task(x):
    return x
@task
def my_mapped_task_2(x):
    return x
@task
def list_task():
    return [1, 2, 3, 4, 5]
@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def my_flow():
    for i in range(10):
        my_mapped_task(i)
if __name__ == "__main__":
    my_flow()
Thomas Mignon
04/06/2022, 11:35 AM
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner
@task
def my_mapped_task(x):
    print(x)
if __name__ == "__main__":
    @flow(task_runner=DaskTaskRunner(
        cluster_class="dask_jobqueue.PBSCluster",
        cluster_kwargs={
            "cores": 1,
            "memory": '24GB',
            "queue": 'sequentiel',
            "walltime": '00:05:00',
            "interface": 'ib0',
            "n_workers": 1
        }
    ))
    def hello_world_flow():
        for i in range(10):
            my_mapped_task(i)
But this is not better; I still don't have any output file from Dask.
Anna Geller
04/06/2022, 12:42 PM
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner
@task
def my_mapped_task(x):
    print(x)
@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def hello_world_flow():
    for i in range(10):
        my_mapped_task(i)
if __name__ == "__main__":
    hello_world_flow()
main is a gate to start some process; here, to trigger a flow run from the flow function. Your flow shouldn't be defined inside main, because then you couldn't pass it e.g. to a DeploymentSpec.
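For example (a quick sketch; the my_flows.py and run_flow.py file names are made up for illustration), once the flow lives at module level you can import it and trigger it from a separate entry point:
# run_flow.py -- hypothetical entry point; assumes the hello_world_flow above
# lives at module level in a file called my_flows.py
from my_flows import hello_world_flow

if __name__ == "__main__":
    # main stays a thin gate: it only triggers a run of the already-importable flow
    hello_world_flow()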
What do you mean by an output file from Dask?
Thomas Mignon
04/06/2022, 12:53 PM
Anna Geller
04/06/2022, 1:16 PM
Thomas Mignon
04/06/2022, 1:27 PM
distributed.nanny - INFO - Start Nanny at: '<tcp://10.148.0.181:54336>'
distributed.worker - INFO - Start worker at: <tcp://10.148.0.181:35490>
distributed.worker - INFO - Listening to: <tcp://10.148.0.181:35490>
distributed.worker - INFO - dashboard at: 10.148.0.181:40645
distributed.worker - INFO - Waiting to connect to: <tcp://10.148.1.146:55485>
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 22.35 GiB
distributed.worker - INFO - Local Directory: /dev/shm/pbs.9835088.datarmor0/dask-worker-space/worker-apnm7ik1
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: <tcp://10.148.1.146:55485>
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.core - INFO - Event loop was unresponsive in Worker for 12.74s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client.py:1195: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
warnings.warn(
13:25:49.297 | INFO | Task run 'my_mapped_task-ca1f3dbb-0' - Finished in state Completed(None)
13:25:51.325 | INFO | Task run 'my_mapped_task-ca1f3dbb-1' - Finished in state Completed(None)
13:25:53.381 | INFO | Task run 'my_mapped_task-ca1f3dbb-2' - Finished in state Completed(None)
13:25:55.389 | INFO | Task run 'my_mapped_task-ca1f3dbb-3' - Finished in state Completed(None)
13:25:57.393 | INFO | Task run 'my_mapped_task-ca1f3dbb-4' - Finished in state Completed(None)
13:25:59.413 | INFO | Task run 'my_mapped_task-ca1f3dbb-5' - Finished in state Completed(None)
13:26:01.417 | INFO | Task run 'my_mapped_task-ca1f3dbb-6' - Finished in state Completed(None)
13:26:03.425 | INFO | Task run 'my_mapped_task-ca1f3dbb-7' - Finished in state Completed(None)
13:26:05.441 | INFO | Task run 'my_mapped_task-ca1f3dbb-8' - Finished in state Completed(None)
13:26:07.586 | INFO | Task run 'my_mapped_task-ca1f3dbb-9' - Finished in state Completed(None)
distributed.dask_worker - INFO - Exiting on signal 15
Here is my hello_world.py:
from prefect import flow
from prefect_graph.hello_world_task import hello_world_task_flow
@flow
def hello_world_flow():
    for i in range(10):
        hello_world_task_flow(i)
if __name__ == "__main__":
    hello_world_flow()
and hello_world_task.py, which is:
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner
@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def hello_world_task_flow(x):
    my_mapped_task(x)
@task
def my_mapped_task(x):
    print(x)
I got the same error in the dask-worker.o* files, a timeout error, for each of the 10 flows that are called:
distributed.nanny - INFO - Start Nanny at: '<tcp://10.148.1.114:59860>'
distributed.worker - INFO - Start worker at: <tcp://10.148.1.114:37373>
distributed.worker - INFO - Listening to: <tcp://10.148.1.114:37373>
distributed.worker - INFO - dashboard at: 10.148.1.114:44616
distributed.worker - INFO - Waiting to connect to: <tcp://10.148.1.146:40619>
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 22.35 GiB
distributed.worker - INFO - Local Directory: /dev/shm/pbs.9835069.datarmor0/dask-worker-space/worker-0oganzzi
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: <tcp://10.148.1.146:40619>
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.core - INFO - Event loop was unresponsive in Worker for 12.55s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.worker - WARNING - Compute Failed
Function: begin_task_run
args: ()
kwargs: {'task': <prefect.tasks.Task object at 0x2aaab091ed90>, 'task_run': TaskRun(id=UUID('bd7c31c6-f577-49ee-a438-f7db41ec1069'), name='my_mapped_task-60f685bd-2', flow_run_id=UUID('9b7aee9e-6453-4e36-8518-cedfa012f375'), task_key='60f685bd73a2d907cce1732888a4657e', dynamic_key='2', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0), tags=[], state_id=UUID('63e4ad1a-d7e4-4b33-8631-ac205202d59d'), task_inputs={'x': []}, state_type=StateType.PENDING, run_count=0, expected_start_time=datetime.datetime(2022, 4, 6, 13, 23, 6, 309348, tzinfo=datetime.timezone.utc), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=61193), state=Pending(message=None, type=PENDING, result=None, task_run_id=bd7c31c6-f577-49ee-a438-f7db41ec1069)), 'parameters': {'x': 2}, 'wait_for': None, '
Exception: 'TimeoutError()'
Terminated
distributed.dask_worker - INFO - Exiting on signal 15
distributed.nanny - INFO - Closing Nanny at '<tcp://10.148.1.114:59860>'
distributed.nanny - INFO - Worker process 26133 was killed by signal 15
distributed.dask_worker - INFO - End worker
Anna Geller
04/06/2022, 1:35 PM
import time
from prefect import get_run_logger, task, flow
@task
def compute_something(x):
    logger = get_run_logger()
    logger.info("Computing: %d x 2 = %d", x, x * 2)
    time.sleep(2)
Thomas Mignon
04/06/2022, 1:37 PM
13:55:00.356 | INFO | prefect.engine - Created flow run 'rigorous-hog' for flow 'hello-world-flow'
13:55:00.356 | INFO | Flow run 'rigorous-hog' - Using task runner 'ConcurrentTaskRunner'
13:55:01.420 | INFO | Flow run 'rigorous-hog' - Created subflow run 'spectral-goldfish' for flow 'hello-world-task-flow'
13:55:01.422 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with `dask_jobqueue.pbs.PBSCluster`
13:55:01.989 | INFO | prefect.task_runner.dask - The Dask dashboard is available at <http://10.148.1.145:8787/status>
13:55:02.960 | INFO | Flow run 'spectral-goldfish' - Created task run 'my_mapped_task-60f685bd-0' for task 'my_mapped_task'
/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client.py:1195: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
warnings.warn(
13:55:29.286 | INFO | Flow run 'spectral-goldfish' - Finished in state Completed('All states completed.')
13:55:30.308 | INFO | Flow run 'rigorous-hog' - Created subflow run 'pink-bulldog' for flow 'hello-world-task-flow'
13:55:30.310 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with `dask_jobqueue.pbs.PBSCluster`
13:55:30.753 | INFO | prefect.task_runner.dask - The Dask dashboard is available at <http://10.148.1.145:8787/status>
13:55:31.428 | INFO | Flow run 'pink-bulldog' - Created task run 'my_mapped_task-60f685bd-1' for task 'my_mapped_task'
/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client.py:1195: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
warnings.warn(
13:56:02.864 | INFO | Flow run 'pink-bulldog' - Finished in state Completed('All states completed.')
13:56:04.668 | INFO | Flow run 'rigorous-hog' - Created subflow run 'offbeat-mamba' for flow 'hello-world-task-flow'
13:56:04.670 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with `dask_jobqueue.pbs.PBSCluster`
13:56:04.975 | INFO | prefect.task_runner.dask - The Dask dashboard is available at <http://10.148.1.145:8787/status>
13:56:06.351 | INFO | Flow run 'offbeat-mamba' - Created task run 'my_mapped_task-60f685bd-2' for task 'my_mapped_task'
/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client.py:1195: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
warnings.warn(
13:56:27.540 | ERROR | Flow run 'offbeat-mamba' - Crash detected! Execution was interrupted by an unexpected exception.
13:56:27.965 | ERROR | Flow run 'rigorous-hog' - Encountered exception during execution:
Traceback (most recent call last):
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine.py", line 456, in orchestrate_flow_run
result = await run_sync_in_worker_thread(flow_call)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 52, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(context.run, call, cancellable=True)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/anyio/to_thread.py", line 28, in run_sync
return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
return await future
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 754, in run
result = context.run(func, *args)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect_graph/hello_world.py", line 9, in hello_world_flow
hello_world_task_flow(i)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/flows.py", line 314, in __call__
return enter_flow_run_engine_from_flow_call(self, parameters)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine.py", line 114, in enter_flow_run_engine_from_flow_call
return run_async_from_worker_thread(begin_run)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 63, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/anyio/from_thread.py", line 35, in run
return asynclib.run_async_from_thread(func, *args)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 847, in run_async_from_thread
return f.result()
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/concurrent/futures/_base.py", line 445, in result
return self.__get_result()
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
raise self._exception
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client.py", line 82, in with_injected_client
return await fn(*args, **kwargs)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine.py", line 369, in create_and_begin_subflow_run
terminal_state = await orchestrate_flow_run(
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/contextlib.py", line 670, in __aexit__
raise exc_details[1]
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/contextlib.py", line 199, in __aexit__
await self.gen.athrow(typ, value, traceback)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine.py", line 866, in report_flow_run_crashes
raise exc from None
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine.py", line 849, in report_flow_run_crashes
yield
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/contextlib.py", line 653, in __aexit__
cb_suppress = await cb(*exc_details)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/contextlib.py", line 188, in __aexit__
await self.gen.__anext__()
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/task_runners.py", line 158, in start
self._started = False
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/contextlib.py", line 670, in __aexit__
raise exc_details[1]
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/contextlib.py", line 653, in __aexit__
cb_suppress = await cb(*exc_details)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/distributed/deploy/cluster.py", line 474, in __aexit__
await f
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/distributed/deploy/spec.py", line 414, in _close
assert w.status == Status.closed, w.status
AssertionError: Status.running
/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client.py:1195: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
warnings.warn(
13:56:28.924 | ERROR | Flow run 'rigorous-hog' - Finished in state Failed('Flow run encountered an exception.')
Thus no job control in this shell.
distributed.nanny - INFO - Start Nanny at: '<tcp://10.148.0.171:53913>'
distributed.worker - INFO - Start worker at: <tcp://10.148.0.171:48407>
distributed.worker - INFO - Listening to: <tcp://10.148.0.171:48407>
distributed.worker - INFO - dashboard at: 10.148.0.171:52201
distributed.worker - INFO - Waiting to connect to: <tcp://10.148.1.145:57021>
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 22.35 GiB
distributed.worker - INFO - Local Directory: /dev/shm/pbs.9835312.datarmor0/dask-worker-space/worker-q7ltl636
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: <tcp://10.148.1.145:57021>
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.core - INFO - Event loop was unresponsive in Worker for 9.00s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
13:56:24.283 | INFO | Task run 'my_mapped_task-60f685bd-2' - This is x : 2
/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client.py:1195: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
warnings.warn(
13:56:26.897 | INFO | Task run 'my_mapped_task-60f685bd-2' - Finished in state Completed(None)
distributed.dask_worker - INFO - Exiting on signal 15
distributed.nanny - INFO - Closing Nanny at '<tcp://10.148.0.171:53913>'
Terminated
distributed.nanny - INFO - Worker process 925 was killed by signal 15
distributed.dask_worker - INFO - End worker
Anna Geller
04/06/2022, 2:16 PM
UserWarning: No default storage has been set on the server. Using temporary local storage for results.
you can create and set default storage using:
prefect storage create
Thomas Mignon
04/07/2022, 6:53 AM
07:34:03.307 | INFO | prefect.engine - Created flow run 'armored-whale' for flow 'hello-world-flow'
07:34:03.308 | INFO | Flow run 'armored-whale' - Using task runner 'ConcurrentTaskRunner'
07:34:04.387 | INFO | Flow run 'armored-whale' - Created subflow run 'shrewd-marmot' for flow 'hello-world-task-flow'
07:34:04.389 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with `dask_jobqueue.pbs.PBSCluster`
07:34:04.615 | INFO | prefect.task_runner.dask - The Dask dashboard is available at <http://10.148.1.146:8787/status>
07:34:05.165 | INFO | Flow run 'shrewd-marmot' - Created task run 'my_mapped_task-60f685bd-0' for task 'my_mapped_task'
07:34:17.860 | INFO | Flow run 'shrewd-marmot' - Finished in state Completed('All states completed.')
07:34:18.871 | INFO | Flow run 'armored-whale' - Created subflow run 'nifty-lizard' for flow 'hello-world-task-flow'
07:34:18.873 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with `dask_jobqueue.pbs.PBSCluster`
07:34:18.949 | INFO | prefect.task_runner.dask - The Dask dashboard is available at <http://10.148.1.146:8787/status>
07:34:19.497 | INFO | Flow run 'nifty-lizard' - Created task run 'my_mapped_task-60f685bd-1' for task 'my_mapped_task'
07:34:45.914 | INFO | Flow run 'nifty-lizard' - Finished in state Completed('All states completed.')
07:34:46.799 | INFO | Flow run 'armored-whale' - Created subflow run 'ambitious-sponge' for flow 'hello-world-task-flow'
07:34:46.801 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with `dask_jobqueue.pbs.PBSCluster`
07:34:47.043 | INFO | prefect.task_runner.dask - The Dask dashboard is available at <http://10.148.1.146:8787/status>
07:34:47.589 | INFO | Flow run 'ambitious-sponge' - Created task run 'my_mapped_task-60f685bd-2' for task 'my_mapped_task'
07:35:01.606 | ERROR | Flow run 'ambitious-sponge' - Crash detected! Execution was interrupted by an unexpected exception.
So I'm trying to put a retry mechanism on my task, but it doesn't work. Here is what I do, and nothing changes, no retry is attempted:
import time
from prefect import flow, task, get_run_logger
from prefect.task_runners import DaskTaskRunner
@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'mpi_4',
        "resource_spec": "select=4:ncpus=28:mem=120gb",
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def hello_world_task_flow(x):
    my_mapped_task(x)
@task(retries=10, retry_delay_seconds=5)
def my_mapped_task(x):
    logger = get_run_logger()
    logger.info("This is x : %d", x)
    time.sleep(2)
Maybe I have to put a retry on the flow too (which doesn't seem to be doable in Prefect)? Because maybe it's not the task itself that crashes, but the flow when it does the qsub to PBS, so when that crashes we never reach the task: the crash occurs in the flow.
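Something like this plain-Python retry loop around the subflow call is the kind of thing I have in mind (just a sketch; the helper name, the number of attempts and the delay are made up):
import time
from prefect_graph.hello_world_task import hello_world_task_flow

def call_subflow_with_retries(x, attempts=3, delay_seconds=10):
    # Ordinary Python retry wrapper around the subflow call itself (not a Prefect
    # feature), so a crash while the PBS job is being submitted gets another chance.
    for attempt in range(1, attempts + 1):
        try:
            return hello_world_task_flow(x)
        except Exception as exc:
            if attempt == attempts:
                raise
            print(f"subflow call failed on attempt {attempt}: {exc!r}, retrying...")
            time.sleep(delay_seconds)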
04/07/2022, 11:19 AMmy_mapped_task
. Not much for PBSCluster to do? Maybe you can try this again (which I remember worked):
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner
@task
def my_mapped_task(x):
    print(x)
@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def hello_world_flow():
    for i in range(10):
        my_mapped_task(i)
if __name__ == "__main__":
    hello_world_flow()
Looks like you changed the queue and the resource spec compared to last time, so maybe that could be the issue?
Thomas Mignon
04/07/2022, 1:24 PM
import time
from prefect import flow, task, get_run_logger
from prefect.task_runners import DaskTaskRunner
@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:50:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def hello_world_task_flow(x):
    my_mapped_task(x)
@task(retries=10, retry_delay_seconds=5)
def my_mapped_task(x):
    logger = get_run_logger()
    logger.info("This is x : %d", x)
    time.sleep(2)
and hello_world.py:
from prefect import flow
from prefect_graph.hello_world_task import hello_world_task_flow
@flow
def hello_world_flow():
    for i in range(10):
        hello_world_task_flow(i)
if __name__ == "__main__":
    hello_world_flow()
But it crashes randomly between two tasks, and the last task completed successfully, so I think the problem happens when the next qsub is submitted. The qsub is submitted by the flow, which is why I would try to put a retry on the flow, not on the task.
Anna Geller
04/07/2022, 5:02 PM
def hello_world_task_flow(x):
    for i in range(10):
        my_mapped_task(i)
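i.e. something like this, so only one PBS-backed Dask cluster is created per subflow run instead of one qsub per number (a sketch that just reuses your decorator and task from above):
from prefect import flow, task, get_run_logger
from prefect.task_runners import DaskTaskRunner

@task(retries=10, retry_delay_seconds=5)
def my_mapped_task(x):
    logger = get_run_logger()
    logger.info("This is x : %d", x)

@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:50:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def hello_world_task_flow(x):
    # all ten task runs share the single Dask cluster created for this subflow run
    for i in range(10):
        my_mapped_task(i)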
Thomas Mignon
04/07/2022, 7:22 PM
Anna Geller
04/08/2022, 10:26 AM
Thomas Mignon
04/08/2022, 11:47 AM
from prefect import flow
from prefect_graph.hello_world_task import hello_world_task_flow
@flow
def hello_world_flow():
    hello_world_task_flow(1)
    hello_world_task_flow(2)
    hello_world_task_flow(3)
    hello_world_task_flow(4)
    hello_world_task_flow(5)
    hello_world_task_flow(6)
    hello_world_task_flow(7)
    hello_world_task_flow(8)
    hello_world_task_flow(9)
    hello_world_task_flow(10)
if __name__ == "__main__":
    hello_world_flow()
Anna Geller
04/08/2022, 12:07 PM
Each subflow can use its own task runner, e.g. the DaskTaskRunner or the ConcurrentTaskRunner for parallelism, and you can decide which one to use based on the business logic of each subflow. Then your main flow can just trigger the subflows in the right order. But you don't necessarily need to map over subflows; your use case of passing an incremental number to each subflow seems unlikely in practice, would you agree? 🙂
A more likely scenario would be something like this, with only subflow_2 doing something where the PBS cluster may be helpful in some way:
@flow
def process_ml_workflow():
    # subflow 1 is doing an IO operation, so it may be more efficient to use
    # ConcurrentTaskRunner rather than the PBS DaskCluster
    subflow_1 = extract_raw_data_concurrently_and_load_to_some_destination()
    subflow_2 = read_and_process_data_in_parallel(wait_for=[subflow_1])  # this may use PBS
    subflow_3 = train_some_ml_model_and_pickle_it(wait_for=[subflow_2])
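As a rough sketch of that shape (the subflow names and bodies are placeholders, the Dask kwargs are just your earlier ones, and I left out the wait_for/result passing for brevity since calling the subflows one after another already keeps the order):
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner, DaskTaskRunner

@task
def do_some_work(x):
    return x

# IO-bound subflow: concurrent threads are usually enough here
@flow(task_runner=ConcurrentTaskRunner())
def extract_raw_data_concurrently_and_load_to_some_destination():
    for i in range(5):
        do_some_work(i)

# compute-heavy subflow: this is where a PBS-backed Dask cluster can help
@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def read_and_process_data_in_parallel():
    for i in range(5):
        do_some_work(i)

@flow
def process_ml_workflow():
    extract_raw_data_concurrently_and_load_to_some_destination()
    read_and_process_data_in_parallel()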
Thomas Mignon
04/08/2022, 1:31 PM