# ask-community
m
Hello Team, I am trying to execute the flow below. It works fine with flow.run(), but when I trigger the same flow with Quick Run in the Prefect UI using a local agent, it succeeds within a few seconds and the queries are never submitted to the Snowflake account.
import re
import sys
from datetime import datetime
import subprocess
import os
import prefect
from prefect import Task
from prefect import task, Flow, Parameter
from datetime import timedelta
class Snowsql(Task):
    def __init__(self, filename):
        self.filename = filename
        super().__init__(
            name=filename, max_retries=3, retry_delay=timedelta(seconds=30)
        )


    def run(self):
        # The SnowSQL.py script takes a SQL file as its argument.
        file_path="/path/to/my/SnowSQL.py"
        data = subprocess.Popen(['/prefect_env/bin/python', file_path, self.filename], stdout = subprocess.PIPE)
        data.communicate()
        print ("Executing Snowflake sql procedure {}".format(self.filename))

sql_list=['1.sql', '2.sql', '3.sql', '4.sql']

flow = Flow(name='snowflake-sql')
ORDER = None
for statement in sql_list:
    task = Snowsql(statement)
    if not ORDER:
        flow.add_task(task)
    else:
        flow.set_dependencies(task=task, upstream_tasks=[ORDER])
    ORDER = task
flow.register(project_name="example")
flow.run()
k
Hey Mahesh, is SnowSQL.py the file that contains the credentials?
Also, the SnowflakeQuery task has been updated, by the way. It's not released yet, but maybe you can copy it? https://github.com/PrefectHQ/prefect/pull/4293
m
Yes @Kevin Kho, SnowSQL.py contains the credentials; we are using AWS Secrets Manager to store them.
I copied the latest Snowflake fix, but with SnowflakeQuery we can only pass one query at a time. Our requirement is to call a Snowflake procedure SQL file.
k
I see. So you have other stuff going on besides querying?
I’ll look into this
m
Thanks Kevin. Do you think the Snowflake credentials are causing the problem? If so, is there any way to fix this for now?
k
To check if your credentials are the problem, you can try the updated task and one query first and see if that works.
But the task should fail. Does it work with flow.run()? Would you be able to show the content of SnowSQL.py or make a similar script so I get a better idea for the content?
Have you tried the ShellTask instead of the subprocess.Popen call? https://docs.prefect.io/api/latest/tasks/shell.html#shelltask
It seems the issue here is that these subprocess.Popen calls will be labelled as SUCCESS even if they didn’t work. I suggest using the ShellTask instead because it will handle the state for you. The current code will be a SUCCESS even if it fails.
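To illustrate the point about silent failures, here is a minimal sketch using only the standard library. `Popen(...).communicate()` does not raise when the child process exits with a non-zero code, so the caller has to check `returncode` itself. The helper name `run_script` is made up for this example, and the two inline Python commands stand in for the real SnowSQL.py call.

```python
import subprocess
import sys


def run_script(args):
    """Run a command and raise if it exits with a non-zero code."""
    proc = subprocess.Popen(
        args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
    )
    output, _ = proc.communicate()
    text = output.decode(errors="replace")
    # communicate() only waits for the process; it never raises on failure,
    # so the exit code has to be checked explicitly.
    if proc.returncode != 0:
        raise RuntimeError(
            "command {} exited with code {}: {}".format(
                args, proc.returncode, text
            )
        )
    return text


# Stand-in commands (assumptions for illustration only).
ok_output = run_script([sys.executable, "-c", "print('ok')"])

try:
    run_script([sys.executable, "-c", "import sys; sys.exit(3)"])
    failure_detected = False
except RuntimeError:
    failure_detected = True
```

Inside a Prefect task's `run()`, raising like this would mark the task run as failed (and trigger retries) instead of reporting SUCCESS.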
m
Thanks Kevin! Will try with ShellTask and let you know soon.
Hi @Kevin Kho,
from datetime import datetime
import prefect
from prefect import Task
from prefect import task, Flow, Parameter
from datetime import timedelta
from prefect.tasks.shell import ShellTask

class SnowShell(Task):
    def __init__(self, filename):
        self.filename = filename
        super().__init__(
            name=filename, max_retries=3, retry_delay=timedelta(seconds=30)
        )


    def run(self):
        exec_task = ShellTask(helper_script="cd /opt/prefect_env/sql_list", shell="bash", log_stderr=True, return_all=True, stream_output=True)
        try:
            exec_task(command='/opt/prefect_env/bin/python /path/to//SnowSQL.py {}'.format(self.filename))
        except Exception as some_error:
            raise some_error



flow = Flow(name='snowflake-shell')
sql_list=['1.sql', '2.sql', '3.sql']
ORDER = None
for filename in sql_list:
    task = SnowShell(filename)
    if not ORDER:
        flow.add_task(task)
    else:
        flow.set_dependencies(task=task, upstream_tasks=[ORDER])
    ORDER = task

flow.register(project_name="Example")
flow.run()
Now I am getting:
raise ValueError("Could not infer an active Flow context.")
Am I doing anything wrong here, @Kevin Kho?
k
let me try to get a working example for you
@Mahesh
from prefect import Flow
from prefect.tasks.shell import ShellTask

sql_list = ["1.sql", "2.sql", "3.sql"]
shell_task = ShellTask(
    helper_script="",
    shell="bash",
    log_stderr=True,
    return_all=True,
    stream_output=True,
)
with Flow(name="Example") as flow:
    tasks = [
        shell_task(
            command="/opt/prefect_env/bin/python /path/to//SnowSQL.py {}".format(
                statement
            )
        )
        for statement in sql_list
    ]
    for i in range(1, len(tasks)):
        tasks[i].set_upstream(tasks[i - 1])
flow.run()
One of the current problems with your code is the Task within a Task. You need to call task.run() to get the output if you’re gonna do that.
m
Thank you @Kevin Kho, it works!! 🙂 But there is one problem: in the schematic in the UI, every task name shows as ShellTask. It should show the SQL filename I passed to the shell task. Is there any way to name the tasks in the example above?
k
hey @Mahesh, I think this is what you’re looking for: https://docs.prefect.io/core/idioms/task-run-names.html