Mahesh
03/26/2021, 10:23 AMMahesh
03/26/2021, 10:24 AM
import re
import sys
from datetime import datetime
import subprocess
import os
import prefect
from prefect import Task
from prefect import task, Flow, Parameter
from datetime import timedelta
class Snowsql(Task):
    """Prefect task that runs one SQL file through the SnowSQL.py helper script.

    The task is named after the SQL file and is configured with up to three
    retries, 30 seconds apart.

    Args:
        filename: SQL file passed as the single argument to SnowSQL.py.
    """

    def __init__(self, filename):
        self.filename = filename
        super().__init__(
            name=filename, max_retries=3, retry_delay=timedelta(seconds=30)
        )

    def run(self):
        """Execute SnowSQL.py for ``self.filename`` and fail on a non-zero exit.

        Raises:
            subprocess.CalledProcessError: if the helper script exits with a
                non-zero status.
        """
        # SnowSQL.py script takes the sql file as its argument.
        file_path = "/path/to/my/SnowSQL.py"
        # Announce the work before starting it, so the message is emitted even
        # when the script fails.
        print("Executing Snowflake sql procedure {}".format(self.filename))
        # check=True turns a non-zero exit status into an exception. Without
        # it the exit status was never inspected, so a failed script looked
        # like a success and the retries configured in __init__ could never
        # be triggered. stdout is still captured (and discarded) as before.
        subprocess.run(
            ["/prefect_env/bin/python", file_path, self.filename],
            stdout=subprocess.PIPE,
            check=True,
        )
# SQL files to execute, in order; each one becomes a Snowsql task.
sql_list = ['1.sql', '2.sql', '3.sql', '4.sql']
flow = Flow(name='snowflake-sql')

# ORDER holds the most recently added task so the next one can be chained
# after it; None means no task has been added yet.
ORDER = None
for statement in sql_list:
    # Named sql_task (not `task`) so the `task` decorator imported from
    # prefect is not rebound by this loop.
    sql_task = Snowsql(statement)
    # Compare against None explicitly rather than relying on the truthiness
    # of a Task object.
    if ORDER is None:
        flow.add_task(sql_task)
    else:
        flow.set_dependencies(task=sql_task, upstream_tasks=[ORDER])
    ORDER = sql_task

flow.register(project_name="example")
flow.run()
Kevin Kho
Kevin Kho
Mahesh
03/26/2021, 1:42 PMMahesh
03/26/2021, 1:45 PMKevin Kho
Kevin Kho
Mahesh
03/26/2021, 1:49 PMKevin Kho
Kevin Kho
Kevin Kho
subprocess.Popen
call? https://docs.prefect.io/api/latest/tasks/shell.html#shelltask
Kevin Kho
Mahesh
03/26/2021, 2:11 PMMahesh
03/26/2021, 4:10 PM
from datetime import datetime
import prefect
from prefect import Task
from prefect import task, Flow, Parameter
from datetime import timedelta
from prefect.tasks.shell import ShellTask
class SnowShell(Task):
    """Prefect task that runs one SQL file through SnowSQL.py using a ShellTask.

    The task is named after the SQL file and is configured with up to three
    retries, 30 seconds apart.

    Args:
        filename: SQL file passed as the single argument to SnowSQL.py.
    """

    def __init__(self, filename):
        self.filename = filename
        super().__init__(
            name=filename, max_retries=3, retry_delay=timedelta(seconds=30)
        )

    def run(self):
        """Run SnowSQL.py for ``self.filename`` in a bash shell.

        Returns:
            Whatever ``ShellTask.run`` returns for this command (the task is
            built with ``return_all=True``).
        """
        exec_task = ShellTask(
            helper_script="cd /opt/prefect_env/sql_list",
            shell="bash",
            log_stderr=True,
            return_all=True,
            stream_output=True,
        )
        # Invoke .run() directly. Calling the task instance itself
        # (exec_task(...)) is the flow-building API and requires an active
        # Flow context, which does not exist inside another task's run();
        # that is what produced
        # "ValueError: Could not infer an active Flow context."
        # Any exception simply propagates, so no try/except wrapper is needed.
        return exec_task.run(
            command='/opt/prefect_env/bin/python /path/to//SnowSQL.py {}'.format(
                self.filename
            )
        )
flow = Flow(name='snowflake-shell')
# SQL files to execute, in order; each one becomes a SnowShell task.
sql_list = ['1.sql', '2.sql', '3.sql']

# ORDER holds the most recently added task so the next one can be chained
# after it; None means no task has been added yet.
ORDER = None
for filename in sql_list:
    # Named shell_task (not `task`) so the `task` decorator imported from
    # prefect is not rebound by this loop.
    shell_task = SnowShell(filename)
    # Compare against None explicitly rather than relying on the truthiness
    # of a Task object.
    if ORDER is None:
        flow.add_task(shell_task)
    else:
        flow.set_dependencies(task=shell_task, upstream_tasks=[ORDER])
    ORDER = shell_task

flow.register(project_name="Example")
flow.run()
Mahesh
03/26/2021, 4:11 PM
raise ValueError("Could not infer an active Flow context.")
Mahesh
03/26/2021, 4:12 PMKevin Kho
Kevin Kho
from prefect import Flow
from prefect.tasks.shell import ShellTask
# SQL files to run, in execution order.
sql_list = ["1.sql", "2.sql", "3.sql"]

# One ShellTask template, shared by every SQL file; each call inside the Flow
# context below produces a separate task in the flow.
shell_task = ShellTask(
    helper_script="",
    shell="bash",
    log_stderr=True,
    return_all=True,
    stream_output=True,
)

# Command line for the helper script; the placeholder takes the SQL file name.
command_template = "/opt/prefect_env/bin/python /path/to//SnowSQL.py {}"

with Flow(name="Example") as flow:
    # Create one shell task per SQL file.
    tasks = [
        shell_task(command=command_template.format(sql_file))
        for sql_file in sql_list
    ]
    # Chain the tasks: each one runs only after the one before it.
    for previous, current in zip(tasks, tasks[1:]):
        current.set_upstream(previous)

flow.run()
Kevin Kho
Mahesh
03/29/2021, 5:03 AMKevin Kho