Laraib Siddiqui
12/22/2022, 5:50 PMfrom prefect.tasks.shell import ShellTask
it throws out and error
ModuleNotFoundError: No module named 'prefect.tasks.shell'; 'prefect.tasks' is not a package
I know i can use prefect-shell
to overcome this and run python scripts but i couldn't find any resolution on the above errorRyan Peden
12/22/2022, 5:51 PMShellTask
is a Prefect v1 feature. It looks like you are using 2.x, which is why you are seeing that error.Laraib Siddiqui
12/22/2022, 5:52 PMRyan Peden
12/22/2022, 5:52 PMLaraib Siddiqui
12/22/2022, 5:53 PMRyan Peden
12/22/2022, 5:56 PMprefect-shell
, feel free to reply in this thread.Laraib Siddiqui
12/23/2022, 7:24 AMRyan Peden
12/23/2022, 1:16 PMshell_run_command
from prefect-shell
is a task, so you can call it several times inside a single flow.Laraib Siddiqui
12/23/2022, 1:19 PMRyan Peden
12/23/2022, 1:25 PMLaraib Siddiqui
12/23/2022, 1:26 PMfrom prefect import flow, task
from prefect_shell import shell_run_command
@flow
def first_shell_command():
return shell_run_command(command="python2 /telecaller_leads.py ", return_all=True)
@flow
def second_shell_command():
return shell_run_command(command="python2 /teleconsultation.py", return_all=True)
@flow
def third_shell_command():
return shell_run_command(command="python2 /user_profiles_created_last_24_hours.py", return_all=True)
@flow
def third_shell_command():
return shell_run_command(command="python2 /user_profiles_created_last_24_hours.py", return_all=True)
@flow
def fourth_shell_command():
return shell_run_command(command="python2 /non_delivered_orders.py", return_all=True)
@flow
def final_shell_run():
first_shell_command()
second_shell_command()
third_shell_command()
fourth_shell_command()
print(final_shell_run())
# print(example_shell_run_command_flow())
flow.run()
Ryan Peden
12/23/2022, 2:16 PM@flow
decorators from the four shell command functions, they will run as tasks since shell_run_command
is a task.
This should work:
def first_shell_command():
return shell_run_command(command="python2 /telecaller_leads.py ", return_all=True)
def second_shell_command():
return shell_run_command(command="python2 /teleconsultation.py", return_all=True)
def third_shell_command():
return shell_run_command(command="python2 /user_profiles_created_last_24_hours.py", return_all=True)
def fourth_shell_command():
return shell_run_command(command="python2 /non_delivered_orders.py", return_all=True)
@flow
def final_shell_run():
first_shell_command()
second_shell_command()
third_shell_command()
fourth_shell_command()
if __name__ == "__main__":
final_shell_run()
The downside here is that the tasks won't have custom names; they would show up as shell_run_command-(random id)
.
If you want the tasks to be named, you can use `with_options`:
def first_shell_command():
run = shell_run_command.with_options(name="Command 1")
return run(command="python2 /telecaller_leads.py ", return_all=True)
def second_shell_command():
run = shell_run_command.with_options(name="Command 2")
return run(command="python2 /teleconsultation.py", return_all=True)
def third_shell_command():
run = shell_run_command.with_options(name="Command 3")
return run(command="python2 /user_profiles_created_last_24_hours.py", return_all=True)
def fourth_shell_command():
run = shell_run_command.with_options(name="Command 4")
return run(command="python2 /non_delivered_orders.py", return_all=True)
It's worth giving these a try to see if they do what you are looking for.
If you want to run lots of named shell command tasks, I can probably suggest some patterns that would prevent the need to use two lines of code every time you run a command.Laraib Siddiqui
12/23/2022, 5:08 PMfrom prefect import flow, task
from prefect_shell import shell_run_command
from prefect_dask.task_runners import DaskTaskRunner
def tele_caller():
return shell_run_command(
command="python3 /data-analytics/telecaller_leads.py -r 0 ",
return_all=True,
)
def teleconsultation():
return shell_run_command(
command="python3 /data-analytics/teleconsultation.py -r 0 ",
return_all=True,
)
def user_profiles():
return shell_run_command(
command=
"python3 /data-analytics/user_profiles_created_last_24_hours.py -r 0 ",
return_all=True,
)
def error_pages():
return shell_run_command(
command="python3 /data-analytics/error_pages_data.py -r 0 ",
return_all=True,
)
def non_del():
return shell_run_command(
command="python3 /data-analytics/non_delivered_orders.py -r 0 ",
return_all=True,
)
def non_ship():
return shell_run_command(
command="python3 /data-analytics/non_shipped_orders.py -r 0 ",
return_all=True)
def gmd_review():
return shell_run_command(
command="python3 /data-analytics/good_md_content_data.py -r 0 ",
return_all=True,
)
def gmd_content():
return shell_run_command(
command="python3 /data-analytics/good_md_review_data.py -r 0 ",
return_all=True)
@flow
def final_docker_run(task_runner=DaskTaskRunner()):
user_profiles.submit()
tele_caller.submit()
teleconsultation.submit()
non_del.submit()
non_ship.submit()
error_pages.submit()
gmd_content.submit()
gmd_review.submit()
if __name__ == '__main__':
final_docker_run()
# print(final_shell_run())
# print(example_shell_run_command_flow())
from prefect import flow, task
from prefect_shell import shell_run_command
from prefect.orion.schemas.schedules import CronSchedule
import pendulum
daily_schedule = CronSchedule("0 17 * * *")
@flow
def first_shell_command(schedule=daily_schedule):
return shell_run_command(command="python3 /data-analytics/telecaller_leads.py -r 0 ", return_all=True)
daily_schedule_2 = CronSchedule("0 17 * * *")
@flow
def second_shell_command(schedule=daily_schedule_2):
return shell_run_command(command="python3 /data-analytics/teleconsultation.py -r 0", return_all=True)
@flow
def final_shell_run():
first_shell_command()
second_shell_command()
if __name__ == '__main__':
final_shell_run()