Laraib Siddiqui

12/22/2022, 5:50 PM
When using `from prefect.tasks.shell import ShellTask`, it throws an error:
`ModuleNotFoundError: No module named 'prefect.tasks.shell'; 'prefect.tasks' is not a package`
I know I can use `prefect-shell` to overcome this and run Python scripts, but I couldn't find any resolution for the above error.

Ryan Peden

12/22/2022, 5:51 PM
`ShellTask` is a Prefect v1 feature. It looks like you are using 2.x, which is why you are seeing that error.

Laraib Siddiqui

12/22/2022, 5:52 PM
Okay, understood. So for now, to run Python scripts from bash, `prefect-shell` is the way to go, right?

Ryan Peden

12/22/2022, 5:52 PM
That is correct 🙂

Laraib Siddiqui

12/22/2022, 5:53 PM
Thank you @Ryan Peden

Ryan Peden

12/22/2022, 5:56 PM
You're very welcome! If you have any questions about using `prefect-shell`, feel free to reply in this thread.

Laraib Siddiqui

12/23/2022, 7:24 AM
Instead of running subflows for shell commands, can we run these shell commands as tasks inside a main flow?

Ryan Peden

12/23/2022, 1:16 PM
Yes, you should be able to. `shell_run_command` from `prefect-shell` is a task, so you can call it several times inside a single flow.

Laraib Siddiqui

12/23/2022, 1:19 PM
@Ryan Peden As you can see above, a different subflow is being created for each shell execution. Is it possible to have a flow-and-task setup where I can run the main flow and see all the shell commands as tasks in that flow?

Ryan Peden

12/23/2022, 1:25 PM
Yes, that is possible. Do you have an example of the code that is causing those multiple subflow runs? That would give me a better understanding of the results you are seeing so I can provide suggestions.

Laraib Siddiqui

12/23/2022, 1:26 PM
from prefect import flow, task
from prefect_shell import shell_run_command


@flow
def first_shell_command():
    return shell_run_command(command="python2 /telecaller_leads.py ", return_all=True)


@flow
def second_shell_command():
    return shell_run_command(command="python2 /teleconsultation.py", return_all=True)

@flow
def third_shell_command():
    return shell_run_command(command="python2 /user_profiles_created_last_24_hours.py", return_all=True)

@flow
def fourth_shell_command():
    return shell_run_command(command="python2 /non_delivered_orders.py", return_all=True)


@flow
def final_shell_run():
     first_shell_command()
     second_shell_command()   
     third_shell_command()
     fourth_shell_command()

if __name__ == "__main__":
    # Prefect 2.x flows are run by calling them; flow.run() is a Prefect v1 idiom.
    print(final_shell_run())

Ryan Peden

12/23/2022, 2:16 PM
If you remove the `@flow` decorators from the four shell command functions, they will run as tasks since `shell_run_command` is a task. This should work:
from prefect import flow
from prefect_shell import shell_run_command


def first_shell_command():
    return shell_run_command(command="python2 /telecaller_leads.py ", return_all=True)


def second_shell_command():
    return shell_run_command(command="python2 /teleconsultation.py", return_all=True)


def third_shell_command():
    return shell_run_command(command="python2 /user_profiles_created_last_24_hours.py", return_all=True)


def fourth_shell_command():
    return shell_run_command(command="python2 /non_delivered_orders.py", return_all=True)


@flow
def final_shell_run():
    first_shell_command()
    second_shell_command()   
    third_shell_command()
    fourth_shell_command()

if __name__ == "__main__":
    final_shell_run()
The downside here is that the tasks won't have custom names; they would show up as `shell_run_command-(random id)`. If you want the tasks to be named, you can use `with_options`:
def first_shell_command():
    run = shell_run_command.with_options(name="Command 1")
    return run(command="python2 /telecaller_leads.py ", return_all=True)


def second_shell_command():
    run = shell_run_command.with_options(name="Command 2")
    return run(command="python2 /teleconsultation.py", return_all=True)


def third_shell_command():
    run = shell_run_command.with_options(name="Command 3")
    return run(command="python2 /user_profiles_created_last_24_hours.py", return_all=True)


def fourth_shell_command():
    run = shell_run_command.with_options(name="Command 4")
    return run(command="python2 /non_delivered_orders.py", return_all=True)
It's worth giving these a try to see if they do what you are looking for. If you want to run lots of named shell command tasks, I can probably suggest some patterns that would prevent the need to use two lines of code every time you run a command.
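One possible pattern for running many named shell tasks without two lines per command is to keep the names and commands in a table and loop over it inside the flow. This is only a sketch, assuming Prefect 2.x with `prefect-shell` installed. `COMMANDS`, `run_named`, and `build_flow` are hypothetical names, and the script paths are the ones from this thread. The Prefect imports sit inside the functions so the table can be loaded without Prefect installed; in a real project they would normally go at the top of the file.

```python
# Display name -> shell command. The script paths are the ones from this thread.
COMMANDS = {
    "Command 1": "python2 /telecaller_leads.py",
    "Command 2": "python2 /teleconsultation.py",
    "Command 3": "python2 /user_profiles_created_last_24_hours.py",
    "Command 4": "python2 /non_delivered_orders.py",
}


def run_named(name, command):
    """Run `command` as a prefect-shell task that is labelled `name`."""
    from prefect_shell import shell_run_command  # assumes prefect-shell is installed

    named_task = shell_run_command.with_options(name=name)
    return named_task(command=command, return_all=True)


def build_flow():
    """Return a flow that runs every entry of COMMANDS as its own named task."""
    from prefect import flow  # assumes Prefect 2.x

    @flow
    def final_shell_run():
        # The tasks are called one after another, so they run sequentially.
        return {name: run_named(name, command) for name, command in COMMANDS.items()}

    return final_shell_run
```

With this layout, `build_flow()()` would run the flow, and adding a command only means adding one entry to `COMMANDS`.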

Laraib Siddiqui

12/23/2022, 5:08 PM
Thanks for the help.
How can I run these tasks in parallel instead of sequentially? As you mentioned above, the shell commands are tasks, so I am trying to use the Dask task runner.
from prefect import flow, task
from prefect_shell import shell_run_command
from prefect_dask.task_runners import DaskTaskRunner


def tele_caller():
    return shell_run_command(
        command="python3 /data-analytics/telecaller_leads.py -r 0 ",
        return_all=True,
    )


def teleconsultation():
    return shell_run_command(
        command="python3 /data-analytics/teleconsultation.py -r 0  ",
        return_all=True,
    )


def user_profiles():
    return shell_run_command(
        command=
        "python3 /data-analytics/user_profiles_created_last_24_hours.py -r 0  ",
        return_all=True,
    )


def error_pages():
    return shell_run_command(
        command="python3 /data-analytics/error_pages_data.py -r 0  ",
        return_all=True,
    )


def non_del():
    return shell_run_command(
        command="python3 /data-analytics/non_delivered_orders.py -r 0  ",
        return_all=True,
    )


def non_ship():
    return shell_run_command(
        command="python3 /data-analytics/non_shipped_orders.py -r 0  ",
        return_all=True)


def gmd_review():
    return shell_run_command(
        command="python3 /data-analytics/good_md_content_data.py -r 0  ",
        return_all=True,
    )


def gmd_content():
    return shell_run_command(
        command="python3 /data-analytics/good_md_review_data.py -r 0  ",
        return_all=True)


@flow
def final_docker_run(task_runner=DaskTaskRunner()):
    user_profiles.submit()
    tele_caller.submit()
    teleconsultation.submit()
    non_del.submit()
    non_ship.submit()
    error_pages.submit()
    gmd_content.submit()
    gmd_review.submit()


if __name__ == '__main__':
    final_docker_run()
# print(final_shell_run())

# print(example_shell_run_command_flow())
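In the snippet above, `.submit()` is called on plain Python functions, which do not have that method, and `task_runner` is declared as a flow parameter, so Prefect never receives it. A minimal sketch of one way to get concurrent runs, assuming Prefect 2.x with `prefect-shell` and `prefect-dask` installed, is to pass the runner to the `@flow` decorator and call `.submit()` on the `shell_run_command` task itself. `SCRIPTS`, `script_command`, and `build_parallel_flow` are hypothetical names, and the imports are deferred only so the script table can be loaded without Prefect installed.

```python
# Script names taken from the snippet above, in the order they were submitted.
SCRIPTS = [
    "user_profiles_created_last_24_hours.py",
    "telecaller_leads.py",
    "teleconsultation.py",
    "non_delivered_orders.py",
    "non_shipped_orders.py",
    "error_pages_data.py",
    "good_md_review_data.py",
    "good_md_content_data.py",
]


def script_command(script):
    """Build the shell command for one script, mirroring the commands above."""
    return f"python3 /data-analytics/{script} -r 0"


def build_parallel_flow():
    """Return a flow that submits every script to a Dask task runner."""
    # Assumes Prefect 2.x with prefect-shell and prefect-dask installed.
    from prefect import flow
    from prefect_dask.task_runners import DaskTaskRunner
    from prefect_shell import shell_run_command

    # The task runner is an argument of the decorator, not of the flow function.
    @flow(task_runner=DaskTaskRunner())
    def final_docker_run():
        # .submit() hands each task to the runner and returns a future right away.
        futures = [
            shell_run_command.submit(command=script_command(script), return_all=True)
            for script in SCRIPTS
        ]
        # Collecting the results waits until every command has finished.
        return [future.result() for future in futures]

    return final_docker_run
```

Calling `build_parallel_flow()()` would start the run. Because the Dask runner starts worker processes, that call is best kept under an `if __name__ == "__main__":` guard.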
If I take the subflow approach instead of running the commands as tasks, can I still use the DaskTaskRunner? And how do I put different subflows on different schedules? Should I group the tasks that run at the same time into the same flow, or is it possible to have one flow whose tasks run on different schedules?
from prefect import flow, task
from prefect_shell import shell_run_command
from prefect.orion.schemas.schedules import CronSchedule
import pendulum

daily_schedule = CronSchedule(cron="0 17 * * *")  # the cron string must be passed by keyword

@flow
def first_shell_command(schedule=daily_schedule):
    return shell_run_command(command="python3 /data-analytics/telecaller_leads.py -r 0 ", return_all=True)

daily_schedule_2 = CronSchedule(cron="0 17 * * *")  # the cron string must be passed by keyword

@flow
def second_shell_command(schedule=daily_schedule_2):
    return shell_run_command(command="python3 /data-analytics/teleconsultation.py -r 0", return_all=True)


@flow
def final_shell_run():
     first_shell_command()
     second_shell_command()   

if __name__ == '__main__':
    final_shell_run()
@Ryan Peden