
Laraib Siddiqui

01/10/2023, 7:41 AM
How can I run these tasks in parallel instead of sequentially? I am trying to use the Dask task runner. As you mentioned above, the shell commands are tasks. How can I run them in parallel?
from prefect import flow, task
from prefect_shell import shell_run_command
from prefect_dask.task_runners import DaskTaskRunner

@task
def tele_caller():
    return shell_run_command(
        command="python3 /data-analytics/telecaller_leads.py -r 0 ",
        return_all=True,
    )

@task
def teleconsultation():
    return shell_run_command(
        command="python3 /data-analytics/teleconsultation.py -r 0  ",
        return_all=True,
    )

@task
def user_profiles():
    return shell_run_command(
        command=
        "python3 /data-analytics/user_profiles_created_last_24_hours.py -r 0  ",
        return_all=True,
    )

@task
def error_pages():
    return shell_run_command(
        command="python3 /data-analytics/error_pages_data.py -r 0  ",
        return_all=True,
    )

@task
def non_del():
    return shell_run_command(
        command="python3 /data-analytics/non_delivered_orders.py -r 0  ",
        return_all=True,
    )

@task
def non_ship():
    return shell_run_command(
        command="python3 /data-analytics/non_shipped_orders.py -r 0  ",
        return_all=True)

@task
def gmd_review():
    return shell_run_command(
        command="python3 /data-analytics/good_md_content_data.py -r 0  ",
        return_all=True,
    )

@task
def gmd_content():
    return shell_run_command(
        command="python3 /data-analytics/good_md_review_data.py -r 0  ",
        return_all=True)


@flow
def final_docker_run(task_runner=DaskTaskRunner()):
    user_profiles.submit()
    tele_caller.submit()
    teleconsultation.submit()
    non_del.submit()
    non_ship.submit()
    error_pages.submit()
    gmd_content.submit()
    gmd_review.submit()


if __name__ == '__main__':
    final_docker_run()
# print(final_shell_run())

# print(example_shell_run_command_flow())

Jeff Hale

01/10/2023, 1:10 PM
Hi Laraib. You need to make each of your functions into tasks by adding an @task decorator. So for example:
@task()
def tele_caller():
    return shell_run_command(
        command="python3 /data-analytics/telecaller_leads.py -r 0 ",
        return_all=True,
    )
Feel free to move your code into this thread to keep the channel tidy. 🙂

Laraib Siddiqui

01/10/2023, 1:32 PM
I am getting the following error: ValueError: No global client found and no address provided. I haven't worked with Dask previously. I have updated my code above. Do I have to run dask-worker and dask-scheduler separately?
To give you an idea of my setup: I have set up Prefect inside a Docker container on my server, and I am also running one agent in a screen session inside that container.

Jeff Hale

01/10/2023, 2:07 PM
To reduce complexity as you are getting set up, I suggest running without Dask first. With submit statements and no task runner specified you will be using the concurrent task runner. Maybe either run your agent in Docker with Prefect Cloud or run your Orion server and agent outside Prefect for an easier time getting started, too.

Laraib Siddiqui

01/10/2023, 2:16 PM
Maybe either run your agent in Docker with Prefect Cloud or run your Orion server and agent outside Prefect for an easier time getting started, too.
I am running my agent in Docker with Prefect Cloud only. I removed the Dask runner and encountered the following error: RuntimeError: Tasks cannot be run from within tasks. Did you mean to call this task in a flow? The shell_run_command function is already a task, so when I add a task decorator it becomes a task inside a task. I suppose that's why the error occurs. If I remove the task decorator, the functions don't have a submit attribute.

Jeff Hale

01/10/2023, 2:19 PM
Ok. Yes. You could just put the shell command in the main flow.

Laraib Siddiqui

01/10/2023, 2:29 PM
I tried your approach of calling the shell commands inside the flow, but they still ran sequentially. Is there a more efficient way to implement this?

Jeff Hale

01/10/2023, 2:46 PM
Ok. I just wanted to make sure they were running first. Looking at this more closely, it appears you're running a bunch of Python scripts. You could make those scripts into tasks, import the task functions, and run them concurrently by submitting the tasks to the ConcurrentTaskRunner. Or you could fire up Dask or Ray to submit the tasks in parallel. Here are instructions for running on Dask: https://docs.prefect.io/concepts/task-runners/#running-tasks-on-dask

Laraib Siddiqui

01/10/2023, 2:50 PM
Yeah, they are running fine. I have a daily batch running; I just wanted to move them from running sequentially to running in parallel.
"Make sure you use .submit() to run your task with a task runner. Calling the task directly, without .submit(), from within a flow will run the task sequentially instead of using a specified task runner."
I can't use .submit() on these shell commands, as they are tasks themselves. That's where the problem is.

Jeff Hale

01/10/2023, 2:56 PM
That they are now running sequentially is helpful context. I'm suggesting that instead of running Python scripts via shell commands, you decorate the Python functions in the scripts as Prefect tasks, import them into your main flow file, and submit them to the ConcurrentTaskRunner (or Dask or Ray).

Laraib Siddiqui

01/10/2023, 2:58 PM
Okay, I can do that, but is there no way to run the shell commands concurrently?

Jeff Hale

01/10/2023, 3:00 PM
Maybe you could put the function calls directly in the main flow and use Python’s native async/await for concurrency, but that wouldn’t get you parallelism. I wouldn’t go that direction, personally.
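
For reference, a plain-Python sketch of the async/await route using only the standard library, outside of Prefect. The function names are hypothetical, and the two commands are stand-ins for the /data-analytics scripts in the thread.

```python
import asyncio
import sys


async def run_command(*argv: str) -> tuple[int, str]:
    """Run one command and return its exit code and captured output."""
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    out, _ = await proc.communicate()
    return proc.returncode, out.decode().strip()


async def run_all(commands: list[list[str]]) -> list[tuple[int, str]]:
    # gather() schedules every coroutine before waiting, so the child
    # processes overlap; results come back in the order of `commands`.
    return await asyncio.gather(*(run_command(*cmd) for cmd in commands))


def main() -> list[tuple[int, str]]:
    # Stand-in commands; replace with the real script invocations.
    commands = [
        [sys.executable, "-c", "print('telecaller done')"],
        [sys.executable, "-c", "print('teleconsultation done')"],
    ]
    return asyncio.run(run_all(commands))
```

The coroutines share a single event loop thread, but each command runs in its own operating system process. This approach gives up Prefect's per-task tracking, since the commands are no longer Prefect tasks.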

Laraib Siddiqui

01/10/2023, 3:03 PM
Okay, thanks for the help.