Laraib Siddiqui
01/10/2023, 7:41 AMfrom prefect import flow, task
from prefect_shell import shell_run_command
from prefect_dask.task_runners import DaskTaskRunner
@task
def tele_caller():
return shell_run_command(
command="python3 /data-analytics/telecaller_leads.py -r 0 ",
return_all=True,
)
@task
def teleconsultation():
return shell_run_command(
command="python3 /data-analytics/teleconsultation.py -r 0 ",
return_all=True,
)
@task
def user_profiles():
return shell_run_command(
command=
"python3 /data-analytics/user_profiles_created_last_24_hours.py -r 0 ",
return_all=True,
)
@task
def error_pages():
return shell_run_command(
command="python3 /data-analytics/error_pages_data.py -r 0 ",
return_all=True,
)
@task
def non_del():
return shell_run_command(
command="python3 /data-analytics/non_delivered_orders.py -r 0 ",
return_all=True,
)
@task
def non_ship():
return shell_run_command(
command="python3 /data-analytics/non_shipped_orders.py -r 0 ",
return_all=True)
@task
def gmd_review():
return shell_run_command(
command="python3 /data-analytics/good_md_content_data.py -r 0 ",
return_all=True,
)
@task
def gmd_content():
return shell_run_command(
command="python3 /data-analytics/good_md_review_data.py -r 0 ",
return_all=True)
@flow
def final_docker_run(task_runner=DaskTaskRunner()):
user_profiles.submit()
tele_caller.submit()
teleconsultation.submit()
non_del.submit()
non_ship.submit()
error_pages.submit()
gmd_content.submit()
gmd_review.submit()
if __name__ == '__main__':
final_docker_run()
# print(final_shell_run())
# print(example_shell_run_command_flow())
Jeff Hale
01/10/2023, 1:10 PM@task
decorator.
So for example:
@task()
def tele_caller():
return shell_run_command(
command="python3 /data-analytics/telecaller_leads.py -r 0 ",
return_all=True,
)
Laraib Siddiqui
01/10/2023, 1:32 PMJeff Hale
01/10/2023, 2:07 PMLaraib Siddiqui
01/10/2023, 2:16 PMMaybe either run your agent in Docker with Prefect Cloud or run your Orion server and agent outside Prefect for an easier time getting started, too.I am running my agent in docker with prefect cloud only. I removed the dask runner, encountered the following error: RuntimeError: Tasks cannot be run from within tasks. Did you mean to call this task in a flow? The shell_command is already a task, when i put a task decorator it makes it a task inside, i suppose that's why the error. If i remove the task decorator, the functions doesn't have a submit attribute.
Jeff Hale
01/10/2023, 2:19 PMLaraib Siddiqui
01/10/2023, 2:29 PMJeff Hale
01/10/2023, 2:46 PMConcurrentTaskRunner
. Or firing up Dask or Ray to submit the tasks in parallel. Here are instructions for running on Dask: https://docs.prefect.io/concepts/task-runners/#running-tasks-on-daskLaraib Siddiqui
01/10/2023, 2:50 PM.submit()
to run your task with a task runner. Calling the task directly, without .submit()
, from within a flow will run the task sequentially instead of using a specified task runner.
I can't use .submti()
on these shell commands, as they are task themselves.
That's where the problem isJeff Hale
01/10/2023, 2:56 PMLaraib Siddiqui
01/10/2023, 2:58 PMJeff Hale
01/10/2023, 3:00 PMLaraib Siddiqui
01/10/2023, 3:03 PM