Thomas Mignon
05/03/2022, 7:29 AM
Thomas Mignon
05/03/2022, 7:32 AM
@task
def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    time.sleep(10)
    logger.info("Inside task purify")
    workspace_path: Path = Path(
        "/home1/datawork/semexp/workspace/dagit/running/",
        '{0}-{1}'.format(
            command,
            os.getenv('PBS_JOBID')
        )
    )
    logger.info('workspace_path: {0}'.format(workspace_path))
    template_runner.TemplateRunner(
        command=command,
        version="3.4.23",
        command_parameter=command_parameter,
        workspace=workspace_path,
        skip_signal=True,
        config_file=""
    ).run(exit=False)

@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'mpi_1',
        "resource_spec": "select=1:ncpus=28:mem=120gb",
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def purify_flow():
    # semaphore_task(command, command_parameter)
    semaphore_task("purify_raw_logs", "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz")
    semaphore_task("purify_raw_logs", "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz")
    semaphore_task("purify_raw_logs", "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz")
    semaphore_task("purify_raw_logs", "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz")
    semaphore_task("purify_raw_logs", "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz")
    semaphore_task("purify_raw_logs", "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz")

@flow
def main_flow():
    purify_flow()

if __name__ == "__main__":
    main_flow()
The main flow calls another flow named purify_flow, which calls semaphore_task 6 times. This seems to run in parallel, BUT the qsub is done only once (on the flow) and all the tasks run inside that single qsub, which is not what I want: I want one qsub for each task. I've also tried something like this:
import os
import time
from pathlib import Path
from prefect import flow, get_run_logger, task
from prefect.task_runners import DaskTaskRunner
from semaphore_audience.utils import template_runner

@task
def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    time.sleep(10)
    logger.info("Inside task purify")
    workspace_path: Path = Path(
        "/home1/datawork/semexp/workspace/dagit/running/",
        '{0}-{1}'.format(
            command,
            os.getenv('PBS_JOBID')
        )
    )
    logger.info('workspace_path: {0}'.format(workspace_path))
    template_runner.TemplateRunner(
        command=command,
        version="3.4.23",
        command_parameter=command_parameter,
        workspace=workspace_path,
        skip_signal=True,
        config_file=""
    ).run(exit=False)

@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'mpi_1',
        "resource_spec": "select=1:ncpus=28:mem=120gb",
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def purify_flow(command, command_parameter):
    semaphore_task(command, command_parameter)

@flow
def main_flow():
    purify_flow(command="purify_raw_logs",
                command_parameter="/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz")
    purify_flow("purify_raw_logs",
                "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz")
    purify_flow("purify_raw_logs",
                "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz")
    purify_flow("purify_raw_logs",
                "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz")
    purify_flow("purify_raw_logs",
                "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz")
    purify_flow("purify_raw_logs",
                "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz")

if __name__ == "__main__":
    main_flow()
The main flow calls purify_flow 6 times, and each of those flows calls the task; this seems to be what I want, BUT the problem is that it is not parallel, because main_flow doesn't have the task_runner=DaskTaskRunner annotation, and I need the qsub to be done on purify_flow, not on main_flow.
Anna Geller
paths = ["/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz", "...", ...]
for path in paths:
    semaphore_task("purify_raw_logs", path)
Note that the task runner is what runs tasks in parallel.
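For reference, here is a minimal sketch of that suggestion put together end to end. It is only an illustration assembled from the snippets in this thread, not a tested solution; in particular, n_workers=6 is an assumption (one PBS worker job per file), not something recommended above. The idea is that each task call inside the flow returns a future immediately, so all six task runs are handed to the Dask/PBS task runner and can execute in parallel:

# Sketch only: the for-loop suggestion combined with the PBSCluster
# configuration used earlier in this thread. Paths are shortened here.
from prefect import flow, get_run_logger, task
from prefect.task_runners import DaskTaskRunner

@task
def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    logger.info('{0}: {1}'.format(command, command_parameter))

@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'mpi_1',
        "resource_spec": "select=1:ncpus=28:mem=120gb",
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 6  # assumption: one PBS worker job (one qsub) per file
    }
))
def purify_flow():
    paths = [
        '...eftp1...',  # shortened; use the full paths from the messages above
        '...eftp2...',
    ]
    for path in paths:
        # Each call returns a future right away; the task runs are spread
        # across the Dask workers, i.e. across the PBS jobs.
        semaphore_task("purify_raw_logs", path)

if __name__ == "__main__":
    purify_flow()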
Thomas Mignon
05/03/2022, 12:17 PM
Thomas Mignon
05/03/2022, 12:18 PM
Thomas Mignon
05/03/2022, 12:18 PM
Thomas Mignon
05/03/2022, 12:19 PM
import os
import time
from pathlib import Path
from prefect import flow, get_run_logger, task
from prefect.task_runners import DaskTaskRunner
from semaphore_audience.utils import template_runner

@task
def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    time.sleep(10)
    logger.info("Inside task purify")
    workspace_path: Path = Path(
        "/home1/datawork/semexp/workspace/dagit/running/",
        '{0}-{1}'.format(
            command,
            os.getenv('PBS_JOBID')
        )
    )
    logger.info('workspace_path: {0}'.format(workspace_path))
    template_runner.TemplateRunner(
        command=command,
        version="3.4.23",
        command_parameter=command_parameter,
        workspace=workspace_path,
        skip_signal=True,
        config_file=""
    ).run(exit=False)

@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'mpi_1',
        "resource_spec": "select=1:ncpus=28:mem=120gb",
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def purify_flow():
    # semaphore_task(command, command_parameter)
    paths = [
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz'
    ]
    for path in paths:
        semaphore_task("purify_raw_logs", path)

@flow
def main_flow():
    purify_flow()

if __name__ == "__main__":
    main_flow()
Thomas Mignon
05/03/2022, 12:24 PM
import os
import time
from pathlib import Path
from prefect import flow, get_run_logger, task
from prefect.task_runners import DaskTaskRunner
from semaphore_audience.utils import template_runner

@task
def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    time.sleep(10)
    logger.info("Inside task purify")
    workspace_path: Path = Path(
        "/home1/datawork/semexp/workspace/dagit/running/",
        '{0}-{1}'.format(
            command,
            os.getenv('PBS_JOBID')
        )
    )
    logger.info('workspace_path: {0}'.format(workspace_path))
    template_runner.TemplateRunner(
        command=command,
        version="3.4.23",
        command_parameter=command_parameter,
        workspace=workspace_path,
        skip_signal=True,
        config_file=""
    ).run(exit=False)

@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'mpi_1',
        "resource_spec": "select=1:ncpus=28:mem=120gb",
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def purify_flow(command, command_parameter):
    semaphore_task(command, command_parameter)

@flow
def main_flow():
    paths = [
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz'
    ]
    for path in paths:
        purify_flow("purify_raw_logs", path)

if __name__ == "__main__":
    main_flow()
It's sequential. Even if it's better, because I now get 6 * 1 qsub, I need this to run in parallel.
Anna Geller
@flow
def main_flow():
    purify_flow()
Anna Geller
"It's sequential even if it's better because i have 6 * 1 qsub, but i need to do this as parallel"
Since you said "it's better": are you good now? Do you still need help with this? The main issue is that to run tasks in parallel, you must call tasks in a for loop (mapping support will be added; for now, you need a for loop) and assign a task runner, e.g. with your Dask cluster configuration.
Thomas Mignon
05/03/2022, 12:50 PM
@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'mpi_1',
        "resource_spec": "select=1:ncpus=28:mem=120gb",
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def purify_flow(command, command_parameter):
    semaphore_task(command, command_parameter)

@flow
def main_flow():
    paths = [
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz'
    ]
    for path in paths:
        purify_flow("purify_raw_logs", path)
It's different, it's a flow of subflows, and it seems closer to what I want, but it's sequential. I could put a DaskTaskRunner on the main flow to make it parallel, but that would create one parent qsub for the flow and one child qsub for each subflow, I don't know if I'm clear...
And what I want to run in parallel is not the tasks (if that's possible too then great, but for the moment I have only one thing to do per task, so parallel or sequential is the same there) but the subflows.
Anna Geller
Thomas Mignon
05/05/2022, 7:52 AM
import os
import time
from pathlib import Path
from prefect import flow, get_run_logger, task
from prefect.task_runners import DaskTaskRunner
from semaphore_audience.utils import template_runner

@task
def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    time.sleep(10)
    logger.info("Inside task purify")

@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '2GB',
        "queue": 'sequentiel',
        "walltime": '00:10:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def purify_flow(command, command_parameter):
    semaphore_task(command, command_parameter)

@flow
def main_flow():
    paths = [
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz')
    ]
    for p in paths:
        p

if __name__ == "__main__":
    main_flow()
I will try to re-explain where I am; let me restart from the beginning =>
Thomas Mignon
05/05/2022, 7:53 AM
Thomas Mignon
05/05/2022, 8:12 AM
import os
import time
from pathlib import Path
from prefect import flow, get_run_logger, task
from prefect.task_runners import DaskTaskRunner
from semaphore_audience.utils import template_runner

@task
def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    time.sleep(10)
    logger.info("Inside task purify")

@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '2GB',
        "queue": 'sequentiel',
        "walltime": '00:10:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def purify_flow(command, command_parameter):
    semaphore_task(command, command_parameter)

@flow
def main_flow():
    paths = [
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz')
    ]
    for p in paths:
        p

if __name__ == "__main__":
    main_flow()
2. In this case, with an asynchronous approach and only the default DaskTaskRunner, I'm getting 6 DaskTaskRunners that really start in parallel. So in the next case I will try to add the PBS config, because if 6 Dask clusters can be started in parallel, why couldn't the qsub be done in each of them afterwards?
import os
import time
import asyncio
from pathlib import Path
from prefect import flow, get_run_logger, task
from prefect.task_runners import DaskTaskRunner
from semaphore_audience.utils import template_runner

@task
async def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    asyncio.sleep(10)
    logger.info("Inside task purify")

@flow(task_runner=DaskTaskRunner())
async def purify_flow(command, command_parameter):
    await semaphore_task(command, command_parameter)

@flow
async def main_flow():
    paths = [
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz')
    ]
    await asyncio.gather(*paths)

if __name__ == "__main__":
    asyncio.run(main_flow())
picture 2
3. Same as the previous case, with the PBS config in addition:
import os
import time
import asyncio
from pathlib import Path
from prefect import flow, get_run_logger, task
from prefect.task_runners import DaskTaskRunner
from semaphore_audience.utils import template_runner

@task
async def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    asyncio.sleep(10)
    logger.info("Inside task purify")

@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
async def purify_flow(command, command_parameter):
    await semaphore_task(command, command_parameter)

@flow
async def main_flow():
    paths = [
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz')
    ]
    await asyncio.gather(*paths)

if __name__ == "__main__":
    asyncio.run(main_flow())
picture 1
The Dask clusters are started sequentially; I only added the PBS config, and without it they would be started in parallel.
Here is where I'm standing now, with multiple versions of things that "work" but don't fill my need: one version is sequential, another (the async code) is parallel but without the PBS config, and with the config it's sequential again.
I hope
Thomas Mignon
05/05/2022, 8:13 AM
Anna Geller
Thomas Mignon
05/05/2022, 12:28 PM
Thomas Mignon
05/05/2022, 12:28 PM
Thomas Mignon
05/05/2022, 12:29 PM
Anna Geller
Anna Geller
Thomas Mignon
05/05/2022, 12:38 PM
Anna Geller
Thomas Mignon
05/05/2022, 12:59 PM
Anna Geller
Anna Geller
@flow decorator.
When you call:
• a flow, it will not return until its final state is reached (blocked)
• a task, it returns a future immediately and the return value can be retrieved later.
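To make those two bullets concrete, here is a tiny hypothetical example (the function names are made up, not from this thread):

from prefect import flow, task

@task
def add_one(x):
    return x + 1

@flow
def child_flow(x):
    return x + 1

@flow
def parent_flow():
    future = add_one(1)     # task call: returns a future immediately,
                            # the task runs on the flow's task runner
    result = child_flow(2)  # subflow call: blocks until the subflow run
                            # reaches its final state

if __name__ == "__main__":
    parent_flow()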
Zanie
You'd need to use asyncio.gather to do so at this time.
Thomas Mignon
05/06/2022, 6:42 AM
import asyncio
from prefect import flow

@flow
async def subflow_1():
    print("Subflow 1 started!")
    await asyncio.sleep(1)

@flow
async def subflow_2():
    print("Subflow 2 started!")
    await asyncio.sleep(1)

@flow
async def subflow_3():
    print("Subflow 3 started!")
    await asyncio.sleep(1)

@flow
async def subflow_4():
    print("Subflow 4 started!")
    await asyncio.sleep(1)

@flow
async def main_flow():
    parallel_subflows = [subflow_1(), subflow_2(), subflow_3(), subflow_4()]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
Output is perfect
06:26:09.801 | INFO | prefect.engine - Created flow run 'victorious-longhorn' for flow 'main-flow'
06:26:09.802 | INFO | Flow run 'victorious-longhorn' - Using task runner 'ConcurrentTaskRunner'
06:26:46.125 | INFO | Flow run 'victorious-longhorn' - Created subflow run 'uppish-newt' for flow 'subflow-2'
06:26:46.382 | INFO | Flow run 'victorious-longhorn' - Created subflow run 'golden-orca' for flow 'subflow-1'
06:26:46.694 | INFO | Flow run 'victorious-longhorn' - Created subflow run 'omniscient-tamarin' for flow 'subflow-4'
Subflow 1 started!
Subflow 4 started!
06:26:47.194 | INFO | Flow run 'victorious-longhorn' - Created subflow run 'amiable-pillbug' for flow 'subflow-3'
Subflow 3 started!
Subflow 2 started!
06:26:48.955 | INFO | Flow run 'amiable-pillbug' - Finished in state Completed(None)
06:26:49.093 | INFO | Flow run 'omniscient-tamarin' - Finished in state Completed(None)
06:26:49.186 | INFO | Flow run 'uppish-newt' - Finished in state Completed(None)
06:26:49.421 | INFO | Flow run 'golden-orca' - Finished in state Completed(None)
06:26:50.218 | INFO | Flow run 'victorious-longhorn' - Finished in state Completed('All states completed.')
Let's go to 2).
If I add a generic subflow method that accepts a parameter:
import asyncio
from prefect import flow, task

@flow
async def subflow(number):
    print("Subflow {0} started!".format(number))
    await asyncio.sleep(1)

@flow
async def main_flow():
    parallel_subflows = [subflow(1), subflow(2), subflow(3), subflow(4)]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
The output has errors; only 2 prints are there:
06:40:37.149 | INFO | prefect.engine - Created flow run 'private-harrier' for flow 'main-flow'
06:40:37.150 | INFO | Flow run 'private-harrier' - Using task runner 'ConcurrentTaskRunner'
06:40:37.942 | INFO | Flow run 'private-harrier' - Created subflow run 'imported-dodo' for flow 'subflow'
Subflow 3 started!
06:40:38.458 | INFO | Flow run 'private-harrier' - Created subflow run 'speedy-lionfish' for flow 'subflow'
06:40:38.460 | ERROR | Flow run 'speedy-lionfish' - Crash detected! Execution was interrupted by an unexpected exception.
06:40:38.523 | ERROR | Flow run 'private-harrier' - Encountered exception during execution:
Traceback (most recent call last):
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine.py", line 454, in orchestrate_flow_run
result = await flow_call()
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect_graph/hello_world_simple_purify.py", line 18, in main_flow
await asyncio.gather(*parallel_subflows)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client.py", line 82, in with_injected_client
return await fn(*args, **kwargs)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine.py", line 367, in create_and_begin_subflow_run
task_runner = await stack.enter_async_context(flow.task_runner.start())
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/contextlib.py", line 575, in enter_async_context
result = await _cm_type.__aenter__(cm)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/contextlib.py", line 181, in __aenter__
return await self.gen.__anext__()
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/task_runners.py", line 148, in start
raise RuntimeError("The task runner is already started!")
RuntimeError: The task runner is already started!
06:40:39.690 | INFO | Flow run 'imported-dodo' - Finished in state Completed(None)
06:40:39.914 | ERROR | Flow run 'private-harrier' - Finished in state Failed('Flow run encountered an exception.')
06:40:40.146 | INFO | Flow run 'private-harrier' - Created subflow run 'proficient-cassowary' for flow 'subflow'
Subflow 2 started!
06:40:40.545 | INFO | Flow run 'private-harrier' - Created subflow run 'silver-lemur' for flow 'subflow'
06:40:40.547 | ERROR | Flow run 'silver-lemur' - Crash detected! Execution was interrupted by an unexpected exception.
06:40:41.841 | INFO | Flow run 'proficient-cassowary' - Finished in state Completed(None)
Only 2 have correctly started
2 Bis) Flow with subflows and task
import asyncio
from prefect import flow, task

@task
async def task(number: int):
    print("Subflow {0} started!".format(number))
    await asyncio.sleep(1)

@flow
async def subflow_1():
    await task(1)

@flow
async def subflow_2():
    await task(2)

@flow
async def subflow_3():
    await task(3)

@flow
async def subflow_4():
    await task(4)

@flow
async def main_flow():
    parallel_subflows = [subflow_1(), subflow_2(), subflow_3(), subflow_4()]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
Here is the chaotic output, a bit truncated, but you can retry all the examples locally 🙂
[SQL: UPDATE task_run SET updated=strftime('%Y-%m-%d %H:%M:%f000', 'now'), state_type=?, run_count=?, start_time=? WHERE task_run.id = ?]
[parameters: ('RUNNING', 1, '2022-05-06 07:42:52.332058', '1c75de30-5752-487f-a8ef-3224fc55db89')]
(Background on this error at: <https://sqlalche.me/e/14/e3q8>)
07:44:00.389 | INFO | Task run 'task-fc800755-0' - Crash detected! Execution was interrupted by an unexpected exception.
07:44:02.844 | INFO | Task run 'task-fc800755-1' - Finished in state Completed(None)
07:44:03.360 | INFO | Flow run 'scrupulous-rooster' - Created task run 'task-fc800755-2' for task 'task'
07:44:46.979 | INFO | Flow run 'dark-booby' - Created task run 'task-fc800755-3' for task 'task'
Subflow 4 started!
07:45:31.936 | ERROR | Flow run 'adaptable-leech' - Finished in state Failed('1/1 states failed.')
07:45:31.032 | ERROR | prefect.orion - Encountered exception in request:
Traceback (most recent call last):
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
cursor.execute(statement, parameters)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 100, in execute
self._adapt_connection._handle_exception(error)
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 229, in _handle_exception
raise error
result = function()
sqlite3.OperationalError: database is locked
The above exception was the direct cause of the following exception:
Traceback (most recent call last):flow-4'", '2022-05-06 07:43:18.884574')]
(Background on this error at: <https://sqlalche.me/e/14/e3q8>)
Worker information:
Approximate queue length: 4
Pending log batch length: 3
Pending log batch size: 1080
07:46:10.516 | ERROR | prefect.orion - Encountered exception in request:
Traceback (most recent call last):
File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1819, in _execute_context
self.dialect.do_execute(
...
result = function()
sqlalchemy.exc.OperationalError: (raised as a result of Query-invoked autoflush; consider using a session.no_autoflush block if this flush is occurring prematurely)
(sqlite3.OperationalError) database is locked
[SQL: UPDATE task_run SET updated=strftime('%Y-%m-%d %H:%M:%f000', 'now'), state_type=?, run_count=?, start_time=? WHERE task_run.id = ?]
[parameters: ('RUNNING', 1, '2022-05-06 07:44:47.617121', 'f695b4a2-c515-4d75-93af-b75f6440d07f')]
(Background on this error at: <https://sqlalche.me/e/14/e3q8>)
The log worker will attempt to send these logs again in 2.0s
07:46:10.518 | INFO | Task run 'task-fc800755-3' - Crash detected! Execution was interrupted by an unexpected exception.
07:46:12.889 | INFO | Task run 'task-fc800755-2' - Finished in state Completed(None)
07:46:13.304 | INFO | Flow run 'scrupulous-rooster' - Finished in state Completed('All states completed.')
07:46:13.572 | ERROR | Flow run 'dark-booby' - Finished in state Failed('1/1 states failed.')
07:46:14.380 | ERROR | Flow run 'pragmatic-wren' - Finished in state Failed('2/4 states failed.')
I think I need to solve this before going further, but I will paste steps 3 and 4 for later.
3) gonna be:
import asyncio
from prefect import flow, task

@task
async def task(number: int):
    print("Subflow {0} started!".format(number))

@flow
async def subflow(number):
    task(number)
    await asyncio.sleep(1)

@flow
async def main_flow():
    parallel_subflows = [subflow(1), subflow(2), subflow(3), subflow(4)]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
4) gonna be:
import asyncio
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

@task
async def task(number: int):
    print("Subflow {0} started!".format(number))

@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'mpi_1',
        "resource_spec": "select=1:ncpus=28:mem=120gb",
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
async def subflow(number):
    task(number)
    await asyncio.sleep(1)

@flow
async def main_flow():
    parallel_subflows = [subflow(1), subflow(2), subflow(3), subflow(4)]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
Zanie
Zanie
Thomas Mignon
05/09/2022, 9:13 AM