t
Hi guys, I'm trying to go from one flow of 6 tasks to 6 flows of 1 task each, with the DaskTaskRunner.
Here is what I have for the moment:
Copy code
import os
import time
from pathlib import Path

from prefect import flow, get_run_logger, task
from prefect.task_runners import DaskTaskRunner
from semaphore_audience.utils import template_runner


@task
def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    time.sleep(10)
    logger.info("Inside task purify")

    workspace_path: Path = Path(
        "/home1/datawork/semexp/workspace/dagit/running/",
        '{0}-{1}'.format(
            command,
            os.getenv('PBS_JOBID')
        )
    )

    logger.info('workspace_path: {0}'.format(workspace_path))

    template_runner.TemplateRunner(
        command=command,
        version="3.4.23",
        command_parameter=command_parameter,
        workspace=workspace_path,
        skip_signal=True,
        config_file=""
    ).run(exit=False)


@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'mpi_1',
        "resource_spec": "select=1:ncpus=28:mem=120gb",
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def purify_flow():
    # semaphore_task(command, command_parameter)
    semaphore_task("purify_raw_logs", "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz")
    semaphore_task("purify_raw_logs", "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz")
    semaphore_task("purify_raw_logs", "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz")
    semaphore_task("purify_raw_logs", "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz")
    semaphore_task("purify_raw_logs", "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz")
    semaphore_task("purify_raw_logs", "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz")


@flow
def main_flow():
    purify_flow()


if __name__ == "__main__":
    main_flow()
The main flow calls another flow named purify_flow, which calls semaphore_task 6 times. This seems to be parallel, BUT the qsub is done only once (for the flow) and all the tasks run inside that single qsub, which is not what I want: I want one qsub per task. I've tried something like this too:
Copy code
import os
import time
from pathlib import Path

from prefect import flow, get_run_logger, task
from prefect.task_runners import DaskTaskRunner
from semaphore_audience.utils import template_runner


@task
def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    time.sleep(10)
    logger.info("Inside task purify")

    workspace_path: Path = Path(
        "/home1/datawork/semexp/workspace/dagit/running/",
        '{0}-{1}'.format(
            command,
            os.getenv('PBS_JOBID')
        )
    )

    logger.info('workspace_path: {0}'.format(workspace_path))

    template_runner.TemplateRunner(
        command=command,
        version="3.4.23",
        command_parameter=command_parameter,
        workspace=workspace_path,
        skip_signal=True,
        config_file=""
    ).run(exit=False)


@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'mpi_1',
        "resource_spec": "select=1:ncpus=28:mem=120gb",
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def purify_flow(command, command_parameter):
    semaphore_task(command, command_parameter)


@flow 
def main_flow():
    purify_flow(command="purify_raw_logs",
                   command_parameter="/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz")
    purify_flow("purify_raw_logs",
                   "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz")
    purify_flow("purify_raw_logs",
                   "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz")
    purify_flow("purify_raw_logs",
                   "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz")
    purify_flow("purify_raw_logs",
                   "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz")
    purify_flow("purify_raw_logs",
                   "/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz")

if __name__ == "__main__":
    main_flow()
A main flow that calls purify_flow 6 times, each subflow calling the task, seems to be what I want, BUT the problem is that it is not parallel, because main_flow doesn't have the task_runner=DaskTaskRunner argument, and I need the qsub to be done for purify_flow, not for main_flow.
a
can you try with a for loop instead within your flow?
Copy code
paths = ["/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz", "...", ...]

for path in paths:
    semaphore_task("purify_raw_logs", path)
Note that the task runner is what runs tasks in parallel.
t
They are done in the same qsub, one after another, so it's sequential, even though they seem to be launched in parallel according to the logs.
This is the code I've launched:
Copy code
import os
import time
from pathlib import Path

from prefect import flow, get_run_logger, task
from prefect.task_runners import DaskTaskRunner
from semaphore_audience.utils import template_runner


@task
def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    time.sleep(10)
    logger.info("Inside task purify")

    workspace_path: Path = Path(
        "/home1/datawork/semexp/workspace/dagit/running/",
        '{0}-{1}'.format(
            command,
            os.getenv('PBS_JOBID')
        )
    )

    logger.info('workspace_path: {0}'.format(workspace_path))

    template_runner.TemplateRunner(
        command=command,
        version="3.4.23",
        command_parameter=command_parameter,
        workspace=workspace_path,
        skip_signal=True,
        config_file=""
    ).run(exit=False)


@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'mpi_1',
        "resource_spec": "select=1:ncpus=28:mem=120gb",
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def purify_flow():
    # semaphore_task(command, command_parameter)
    paths = [
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz'
    ]

    for path in paths:
        semaphore_task("purify_raw_logs", path)


@flow
def main_flow():
    purify_flow()


if __name__ == "__main__":
    main_flow()
And with this code :
Copy code
import os
import time
from pathlib import Path

from prefect import flow, get_run_logger, task
from prefect.task_runners import DaskTaskRunner
from semaphore_audience.utils import template_runner


@task
def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    time.sleep(10)
    logger.info("Inside task purify")

    workspace_path: Path = Path(
        "/home1/datawork/semexp/workspace/dagit/running/",
        '{0}-{1}'.format(
            command,
            os.getenv('PBS_JOBID')
        )
    )

    logger.info('workspace_path: {0}'.format(workspace_path))

    template_runner.TemplateRunner(
        command=command,
        version="3.4.23",
        command_parameter=command_parameter,
        workspace=workspace_path,
        skip_signal=True,
        config_file=""
    ).run(exit=False)


@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'mpi_1',
        "resource_spec": "select=1:ncpus=28:mem=120gb",
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def purify_flow(command, command_parameter):
    semaphore_task(command, command_parameter)


@flow 
def main_flow():
    paths = [
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz'
    ]

    for path in paths:
        purify_flow("purify_raw_logs", path)

if __name__ == "__main__":
    main_flow()
It's sequential. Even if it's better because I now get 6 × 1 qsub, I need this to run in parallel.
a
I'd recommend skipping this part:
Copy code
@flow
def main_flow():
    purify_flow()
It's sequential. Even if it's better because I now get 6 × 1 qsub, I need this to run in parallel.
Since you said "it's better": are you good now? Do you still need help with this? The main issue is that to run tasks in parallel, you must call the tasks in a for loop (mapping support will be added; for now, you need a for loop) and assign a task runner, e.g. with your Dask cluster configuration.
t
OK, but if I stick with the code without this part, I'm currently getting only one qsub, and the 6 tasks launched inside it run sequentially even if I use a for loop. If I keep this part and do:
Copy code
@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'mpi_1',
        "resource_spec": "select=1:ncpus=28:mem=120gb",
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def purify_flow(command, command_parameter):
   semaphore_task(command, command_parameter)
   
@flow
def main_flow():
    paths = [
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz',
        '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz'
    ]

    for path in paths:
        purify_flow("purify_raw_logs", path)
It's different, it's a flow of subflows, and it seems closer to what I want, but it's sequential. I could put a DaskTaskRunner on the main flow to make it parallel, but that would create one parent qsub for the flow and one child qsub per subflow, I don't know if I'm clear... And what I want to run in parallel is not the tasks (if that's possible too then great, but for the moment I only have one thing to do per task, so parallel vs. sequential makes no difference there) but the subflows.
a
We were discussing this before, but did you consider using the ConcurrentTaskRunner instead of HPC and the DaskTaskRunner for this task? Given it's a purely IO operation, it would likely be faster and wouldn't suffer from this qsub complexity. Sorry that I can't help much more with that, but I haven't worked with such an HPC setup myself, so it's hard to give more concrete advice.
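For what it's worth, here is a minimal sketch of that suggestion, reusing the semaphore_task idea and a paths list like the ones above (the names are placeholders, not your actual code):
Copy code
from prefect import flow, get_run_logger, task
from prefect.task_runners import ConcurrentTaskRunner


@task
def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    logger.info("Running {0} on {1}".format(command, command_parameter))


@flow(task_runner=ConcurrentTaskRunner())
def purify_flow(paths):
    # Each task call returns a future right away, so the calls below
    # run concurrently inside this single flow run (no qsub involved).
    for path in paths:
        semaphore_task("purify_raw_logs", path)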
t
It's not a purely IO operation; under the hood I'm reading files, doing transformations on Spark dataframes, then writing files. But that is not the problem: it happens even if I replace all my code with a single log line like this:
Copy code
import os
import time
from pathlib import Path

from prefect import flow, get_run_logger, task
from prefect.task_runners import DaskTaskRunner
from semaphore_audience.utils import template_runner


@task
def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    time.sleep(10)
    logger.info("Inside task purify")


@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '2GB',
        "queue": 'sequentiel',
        "walltime": '00:10:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def purify_flow(command, command_parameter):
    semaphore_task(command, command_parameter)


@flow 
def main_flow():
    paths = [
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz')
    ]

    for p in paths:
        p


if __name__ == "__main__":
   main_flow()
I will try to re-explain where I am, let me restart from the beginning =>
The need is: to run N subflows, each with a DaskTaskRunner and a PBS config, from a flow, in parallel. But when I say "in parallel" here, I think we are not talking about the same thing: parallel means 6 qsubs that each run code in parallel, not 6 pieces of code running inside one qsub.
What I achieved with very simple examples: 1. In this case, n (6 in my example) qsubs are done sequentially, because one starts, then finishes, then the second starts:
Copy code
import os
import time
from pathlib import Path

from prefect import flow, get_run_logger, task
from prefect.task_runners import DaskTaskRunner
from semaphore_audience.utils import template_runner


@task
def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    time.sleep(10)
    logger.info("Inside task purify")


@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '2GB',
        "queue": 'sequentiel',
        "walltime": '00:10:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
def purify_flow(command, command_parameter):
    semaphore_task(command, command_parameter)


@flow 
def main_flow():
    paths = [
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz')
    ]

    for p in paths:
        p


if __name__ == "__main__":
   main_flow()
2. In this case, with an asynchronous approach and only the DaskTaskRunner (no PBS config), I get 6 DaskTaskRunners started really in parallel. So in the next case I will add the PBS config, because if 6 Dask clusters can be started in parallel, why couldn't the qsub be done inside each of them?
Copy code
import os
import time
import asyncio
from pathlib import Path

from prefect import flow, get_run_logger, task
from prefect.task_runners import DaskTaskRunner
from semaphore_audience.utils import template_runner


@task
async def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    await asyncio.sleep(10)
    logger.info("Inside task purify")


@flow(task_runner=DaskTaskRunner())
async def purify_flow(command, command_parameter):
    await semaphore_task(command, command_parameter)


@flow 
async def main_flow():
    paths = [
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz')
    ]

    await asyncio.gather(*paths)

if __name__ == "__main__":
   asyncio.run(main_flow())
(picture 2) 3. Same as the previous case, with the PBS config added:
Copy code
import os
import time
import asyncio
from pathlib import Path

from prefect import flow, get_run_logger, task
from prefect.task_runners import DaskTaskRunner
from semaphore_audience.utils import template_runner


@task
async def semaphore_task(command, command_parameter):
    logger = get_run_logger()
    await asyncio.sleep(10)
    logger.info("Inside task purify")


@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'sequentiel',
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
async def purify_flow(command, command_parameter):
    await semaphore_task(command, command_parameter)


@flow 
async def main_flow():
    paths = [
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp1/2022 eftp1 -f 2022-01-01-eftp1.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/eftp2/2022 eftp2 -f 2022-01-01-eftp2.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublic/2022 vpublic -f 2022-01-01-vpublic.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/http/vpublicnew/2022 vpublicnew -f 2022-01-01-vpublicnew.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp2/2022 ftp2 -f access.log.20220101.gz'),
        purify_flow("purify_raw_logs", '/home/datawork-semaphore-exp/spool/input/ftp-sync-isi/ftp/ftp/2022 ftp -f access.log.20220101.gz')
    ]

    await asyncio.gather(*paths)

if __name__ == "__main__":
   asyncio.run(main_flow())
(picture 1) The Dask clusters are started sequentially; I only added the PBS config, and without it they would be started in parallel. Here is where I'm standing now, with multiple versions of things that "work" but don't fill my need: one time it's sequential, another time with async code it's parallel but without the config, and with the config it's sequential again.
I hope you will understand my problem better 😅
a
actually, you confused me even more 😄 are you saying you are running Spark code within Dask on an HPC cluster?
t
Yes ahah, does it seem strange? 🙈
It's Python code, which uses pandas/Spark inside.
But the problem is not there, because in my example I don't call any of that Python code, so we can forget about it 😛
a
I have two options for you: 1. Are you 100% decided about the technology choices here and want to only get help in figuring out this qsub/parallelism problem? 2. Or would you be open to discussing the problem you try to solve and figuring out the right approach (forgetting about qsub, Dask, Spark and async for a moment)?
I could help you with #2; for #1 I would need to get help from someone on the team who knows HPC and distributed computing better.
t
Unfortunately I need help with the first one, because I have to launch my code on an HPC with parallel qsubs for performance reasons. Maybe I can create a GitHub issue, so it would be easier for someone on your team who knows HPC to help me?
a
GitHub issues are for issues with Prefect and your issue is with Dask - I'll ask the team. You would likely get better support asking about this on Dask Discourse
t
I doubt the problem is in Dask; maybe it's the way Prefect handles multiple parallel subflows with a Dask task runner via the for loop that creates the problem, not Dask itself, but I'm not sure what I'm talking about here. Tell me when you have some news from your team :)
a
To run multiple subflows in parallel, you would currently need to use async as shown here. Then, each of your subflows may have its own DaskTaskRunner configuration, as you need for your PBSCluster.
Just for your understanding: flows block execution, while tasks run in the background. Task run behavior with respect to concurrency and parallelism is controlled by the task runner assigned to the @flow decorator. When you call:
• a flow, it will not return until its final state is reached (blocked)
• a task, it returns a future immediately and the return value can be retrieved later.
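A tiny sketch of that difference (the function names here are made up for illustration; the comments reflect the behavior described above):
Copy code
from prefect import flow, task


@task
def add_one(x):
    return x + 1


@flow
def child_flow(x):
    return add_one(x)


@flow
def parent_flow():
    future = add_one(1)    # task call: returns a future immediately and runs in the background
    state = child_flow(2)  # subflow call: blocks until the subflow reaches its final state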
z
We’ll provide support for parallel subflows in the future, but as Anna has noted you’ll need to use async and gather to do so at this time.
t
I agree that async and gather is the way to go for parallel subflows, but unfortunately this is where I have my issue. Maybe we can go step by step through my example:
1. flow and subflows
2. flow and a generic subflow / 2 bis. flow and subflows with a task
3. flow and a generic subflow that calls a task
4. flow and a generic subflow with DaskTaskRunner and PBS config that calls a task (my goal)
So, 1) is okay:
Copy code
import asyncio
from prefect import flow


@flow
async def subflow_1():
    print("Subflow 1 started!")
    await asyncio.sleep(1)

@flow
async def subflow_2():
    print("Subflow 2 started!")
    await asyncio.sleep(1)

@flow
async def subflow_3():
    print("Subflow 3 started!")
    await asyncio.sleep(1)

@flow
async def subflow_4():
    print("Subflow 4 started!")
    await asyncio.sleep(1)

@flow
async def main_flow():
    parallel_subflows = [subflow_1(), subflow_2(), subflow_3(), subflow_4()]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
The output is perfect:
Copy code
06:26:09.801 | INFO    | prefect.engine - Created flow run 'victorious-longhorn' for flow 'main-flow'
06:26:09.802 | INFO    | Flow run 'victorious-longhorn' - Using task runner 'ConcurrentTaskRunner'
06:26:46.125 | INFO    | Flow run 'victorious-longhorn' - Created subflow run 'uppish-newt' for flow 'subflow-2'
06:26:46.382 | INFO    | Flow run 'victorious-longhorn' - Created subflow run 'golden-orca' for flow 'subflow-1'
06:26:46.694 | INFO    | Flow run 'victorious-longhorn' - Created subflow run 'omniscient-tamarin' for flow 'subflow-4'
Subflow 1 started!
Subflow 4 started!
06:26:47.194 | INFO    | Flow run 'victorious-longhorn' - Created subflow run 'amiable-pillbug' for flow 'subflow-3'
Subflow 3 started!
Subflow 2 started!
06:26:48.955 | INFO    | Flow run 'amiable-pillbug' - Finished in state Completed(None)
06:26:49.093 | INFO    | Flow run 'omniscient-tamarin' - Finished in state Completed(None)
06:26:49.186 | INFO    | Flow run 'uppish-newt' - Finished in state Completed(None)
06:26:49.421 | INFO    | Flow run 'golden-orca' - Finished in state Completed(None)
06:26:50.218 | INFO    | Flow run 'victorious-longhorn' - Finished in state Completed('All states completed.')
Let's go to 2). If I add a generic subflow that accepts a parameter:
Copy code
import asyncio
from prefect import flow, task
   
@flow
async def subflow(number):
    print("Subflow {0} started!".format(number))
    await asyncio.sleep(1)

@flow
async def main_flow():
    parallel_subflows = [subflow(1), subflow(2), subflow(3), subflow(4)]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
The output has errors; only 2 prints appear:
Copy code
06:40:37.149 | INFO    | prefect.engine - Created flow run 'private-harrier' for flow 'main-flow'
06:40:37.150 | INFO    | Flow run 'private-harrier' - Using task runner 'ConcurrentTaskRunner'
06:40:37.942 | INFO    | Flow run 'private-harrier' - Created subflow run 'imported-dodo' for flow 'subflow'
Subflow 3 started!
06:40:38.458 | INFO    | Flow run 'private-harrier' - Created subflow run 'speedy-lionfish' for flow 'subflow'
06:40:38.460 | ERROR   | Flow run 'speedy-lionfish' - Crash detected! Execution was interrupted by an unexpected exception.
06:40:38.523 | ERROR   | Flow run 'private-harrier' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine.py", line 454, in orchestrate_flow_run
    result = await flow_call()
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect_graph/hello_world_simple_purify.py", line 18, in main_flow
    await asyncio.gather(*parallel_subflows)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/client.py", line 82, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/engine.py", line 367, in create_and_begin_subflow_run
    task_runner = await stack.enter_async_context(flow.task_runner.start())
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/contextlib.py", line 575, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/contextlib.py", line 181, in __aenter__
    return await self.gen.__anext__()
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/prefect/task_runners.py", line 148, in start
    raise RuntimeError("The task runner is already started!")
RuntimeError: The task runner is already started!
06:40:39.690 | INFO    | Flow run 'imported-dodo' - Finished in state Completed(None)
06:40:39.914 | ERROR   | Flow run 'private-harrier' - Finished in state Failed('Flow run encountered an exception.')
06:40:40.146 | INFO    | Flow run 'private-harrier' - Created subflow run 'proficient-cassowary' for flow 'subflow'
Subflow 2 started!
06:40:40.545 | INFO    | Flow run 'private-harrier' - Created subflow run 'silver-lemur' for flow 'subflow'
06:40:40.547 | ERROR   | Flow run 'silver-lemur' - Crash detected! Execution was interrupted by an unexpected exception.
06:40:41.841 | INFO    | Flow run 'proficient-cassowary' - Finished in state Completed(None)
Only 2 have started correctly. 2 bis) Flow with subflows and a task:
Copy code
import asyncio
from prefect import flow, task

@task
async def task(number: int):
    print("Subflow {0} started!".format(number))
    await asyncio.sleep(1)
    

@flow
async def subflow_1():
    await task(1)

@flow
async def subflow_2():
    await task(2)

@flow
async def subflow_3():
    await task(3)

@flow
async def subflow_4():
    await task(4)

@flow
async def main_flow():
    parallel_subflows = [subflow_1(), subflow_2(), subflow_3(), subflow_4()]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
Here is the chaotic output, a bit truncated, but you can retry the whole example locally 🙂
Copy code
[SQL: UPDATE task_run SET updated=strftime('%Y-%m-%d %H:%M:%f000', 'now'), state_type=?, run_count=?, start_time=? WHERE task_run.id = ?]
[parameters: ('RUNNING', 1, '2022-05-06 07:42:52.332058', '1c75de30-5752-487f-a8ef-3224fc55db89')]
(Background on this error at: <https://sqlalche.me/e/14/e3q8>)
07:44:00.389 | INFO    | Task run 'task-fc800755-0' - Crash detected! Execution was interrupted by an unexpected exception.
07:44:02.844 | INFO    | Task run 'task-fc800755-1' - Finished in state Completed(None)
07:44:03.360 | INFO    | Flow run 'scrupulous-rooster' - Created task run 'task-fc800755-2' for task 'task'
07:44:46.979 | INFO    | Flow run 'dark-booby' - Created task run 'task-fc800755-3' for task 'task'
Subflow 4 started!
07:45:31.936 | ERROR   | Flow run 'adaptable-leech' - Finished in state Failed('1/1 states failed.')
07:45:31.032 | ERROR   | prefect.orion - Encountered exception in request:
Traceback (most recent call last):
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 100, in execute
    self._adapt_connection._handle_exception(error)
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 229, in _handle_exception
    raise error
    result = function()
sqlite3.OperationalError: database is locked

The above exception was the direct cause of the following exception:

Traceback (most recent call last):flow-4'", '2022-05-06 07:43:18.884574')]
(Background on this error at: <https://sqlalche.me/e/14/e3q8>)
Worker information:
    Approximate queue length: 4
    Pending log batch length: 3
    Pending log batch size: 1080
07:46:10.516 | ERROR   | prefect.orion - Encountered exception in request:
Traceback (most recent call last):
  File "/home/datawork-semaphore-exp/conda/conda_envs/prefect/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1819, in _execute_context
    self.dialect.do_execute(
...
    result = function()
sqlalchemy.exc.OperationalError: (raised as a result of Query-invoked autoflush; consider using a session.no_autoflush block if this flush is occurring prematurely)
(sqlite3.OperationalError) database is locked
[SQL: UPDATE task_run SET updated=strftime('%Y-%m-%d %H:%M:%f000', 'now'), state_type=?, run_count=?, start_time=? WHERE task_run.id = ?]
[parameters: ('RUNNING', 1, '2022-05-06 07:44:47.617121', 'f695b4a2-c515-4d75-93af-b75f6440d07f')]
(Background on this error at: <https://sqlalche.me/e/14/e3q8>)
The log worker will attempt to send these logs again in 2.0s
07:46:10.518 | INFO    | Task run 'task-fc800755-3' - Crash detected! Execution was interrupted by an unexpected exception.
07:46:12.889 | INFO    | Task run 'task-fc800755-2' - Finished in state Completed(None)
07:46:13.304 | INFO    | Flow run 'scrupulous-rooster' - Finished in state Completed('All states completed.')
07:46:13.572 | ERROR   | Flow run 'dark-booby' - Finished in state Failed('1/1 states failed.')
07:46:14.380 | ERROR   | Flow run 'pragmatic-wren' - Finished in state Failed('2/4 states failed.')
I think I need to solve this before going further, but I will paste steps 3 and 4 for later. 3) is going to be:
Copy code
import asyncio
from prefect import flow, task

@task
async def task(number: int):
    print("Subflow {0} started!".format(number))
   
@flow
async def subflow(number):
    await task(number)
    await asyncio.sleep(1)

@flow
async def main_flow():
    parallel_subflows = [subflow(1), subflow(2), subflow(3), subflow(4)]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
4) is going to be:
Copy code
import asyncio
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

@task
async def task(number: int):
    print("Subflow {0} started!".format(number))
   
@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_jobqueue.PBSCluster",
    cluster_kwargs={
        "cores": 1,
        "memory": '24GB',
        "queue": 'mpi_1',
        "resource_spec": "select=1:ncpus=28:mem=120gb",
        "walltime": '00:05:00',
        "interface": 'ib0',
        "n_workers": 1
    }
))
async def subflow(number):
    await task(number)
    await asyncio.sleep(1)

@flow
async def main_flow():
    parallel_subflows = [subflow(1), subflow(2), subflow(3), subflow(4)]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
z
Ah running the same flow concurrently with itself is not supported right now, you’ll run into access errors like that.
If each is a separate flow instance, it would work.
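For example, here is a sketch of one way to read "separate flow instance": apply the flow decorator explicitly once per name, so each subflow is a distinct flow object wrapping the same function (the helper name _subflow_fn is made up for illustration):
Copy code
import asyncio

from prefect import flow


# Plain coroutine function; not decorated directly, so several flow objects can wrap it.
async def _subflow_fn(number):
    print("Subflow {0} started!".format(number))
    await asyncio.sleep(1)


# One distinct flow instance per name.
subflows = [flow(name="subflow-{0}".format(i))(_subflow_fn) for i in range(1, 5)]


@flow
async def main_flow():
    parallel_subflows = [sf(i) for i, sf in enumerate(subflows, start=1)]
    await asyncio.gather(*parallel_subflows)


if __name__ == "__main__":
    asyncio.run(main_flow())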
t
Let me know when concurrent subflows become available 🙂 It will be a great feature for our project, but for the moment my flow will be entirely sequential, even though my need was to parallelize some steps.