# ask-community
k
Hello Team, hope you are all doing well. With SnowflakeQuery we can pass only one query per task. Is there any way to create a task that runs a Snowflake SQL file?
k
Maybe message Mahesh. He seems to have a Python script that lets him run Snowflake SQL files. He then used the `ShellTask` to call those files. This can be found here: https://prefect-community.slack.com/archives/CL09KU1K7/p1616754226464500
Sample script
from prefect import Flow
from prefect.tasks.shell import ShellTask
sql_list = ["1.sql", "2.sql", "3.sql"]
shell_task = ShellTask(
    helper_script="",
    shell="bash",
    log_stderr=True,
    return_all=True,
    stream_output=True,
)
with Flow(name="Example") as flow:
    tasks = [
        shell_task(
            command="echo '/opt/prefect_env/bin/python /path/to//SnowSQL.py {}'".format(
                statement
            )
        )
        for statement in sql_list
    ]
    for i in range(1, len(tasks)):
        tasks[i].set_upstream(tasks[i - 1])
k
Thanks @Kevin Kho. It will work for us.
👍 1
@Kevin Kho I would suggest adding this SQL file run option to the Prefect SnowflakeQuery task itself.
k
Are the SQL files queries or do they have other operations (creating table, dropping table, etc.)?
k
The SQL files contain only queries.
l
@Kevin Kho you can implement your own Snowflake task. The Prefect Snowflake query task currently executes only one SQL statement at a time. Take a look at the following link: https://docs.snowflake.com/en/user-guide/python-connector-example.html#using-execute-stream-to-execute-sql-scripts
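For reference, the approach described at that link can be sketched roughly as follows. This is a minimal sketch and not Prefect's implementation: the function name `run_sql_file` is hypothetical, and `conn` is assumed to be an already-open `snowflake.connector` connection. The connector's `execute_stream` yields one cursor per statement in the file.

```python
from typing import Any, List


def run_sql_file(conn: Any, path: str) -> List[list]:
    """Run every statement in a .sql file and collect the rows of each.

    Assumption: `conn` is an open snowflake.connector connection, created
    elsewhere with snowflake.connector.connect(...).
    """
    results = []
    with open(path, "r", encoding="utf-8") as sql_file:
        # execute_stream splits the script and yields one cursor per statement.
        for cursor in conn.execute_stream(sql_file):
            # Iterating a cursor yields the rows returned by that statement.
            results.append(list(cursor))
    return results
```

Because each statement is sent separately, this should avoid sending the whole file as a single API call.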
k
Hey @liren zhang, thanks for the resource. Yeah, it seems people are asking for a task to run `.sql` files directly. I can probably add this functionality to the task library.
k
@Kevin Kho could you please open a GitHub issue for this feature?
k
Will bring this up with the team
👍 1
@kumar @Mahesh The GitHub issue is here: https://github.com/PrefectHQ/prefect/issues/4348
m
Thanks @Kevin Kho. Perfect 🙂
k
Good job @Kevin Kho
m
Hi @Kevin Kho, I tried your latest commit of SnowflakeQueryFromFile and I am getting the issue below.
[2021-04-07 05:57:30+0000] INFO - prefect.TaskRunner | Task 'SnowflakeQueryFromFile': Starting task run...
[2021-04-07 05:57:31+0000] ERROR - prefect.TaskRunner | Unexpected error: 000006 (0A000): Multiple SQL statements in a single API call are not supported; use one API call per statement instead.
Traceback (most recent call last):
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 869, in get_task_run_state
    logger=self.logger,
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/utilities/executors.py", line 299, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/utilities/tasks.py", line 454, in method
    return run_method(self, *args, **kwargs)
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/tasks/snowflake/snowflake.py", line 227, in run
    raise error
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/tasks/snowflake/snowflake.py", line 220, in run
    executed = cursor.execute(query).fetchall()
  File "/venv/prefect-env/lib/python3.6/site-packages/snowflake/connector/cursor.py", line 693, in execute
    self.connection, self, ProgrammingError, errvalue
  File "/venv/prefect-env/lib/python3.6/site-packages/snowflake/connector/errors.py", line 258, in errorhandler_wrapper
    cursor.errorhandler(connection, cursor, error_class, error_value)
  File "/venv/prefect-env/lib/python3.6/site-packages/snowflake/connector/errors.py", line 195, in default_errorhandler
    cursor=cursor,
snowflake.connector.errors.ProgrammingError: 000006 (0A000): Multiple SQL statements in a single API call are not supported; use one API call per statement instead.
[2021-04-07 05:57:31+0000] INFO - prefect.TaskRunner | Task 'SnowflakeQueryFromFile': Finished task run for task with final state: 'Failed'
The query file can contain multiple SQL statements.
k
Thanks for telling me. I’ll look into it @Mahesh.
👍 1
m
@Kevin Kho Did you find anything on the above issue?
k
Haven’t finished yet but I did find another method to use
@Mahesh, I think my issue is this: I can use `execute_string` from the Snowflake connector, which can execute multiple commands, but it's tricky to return anything (data from a query). Or I can leave the task as it is, reading from a file, but then it can only run one query. What are your thoughts?
Are you expecting something returned when you use multiple commands?
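The `execute_string` option mentioned above could look roughly like this. It is a minimal sketch under stated assumptions, not the code from the pull request: `run_sql_script` is a hypothetical helper, and `conn` is assumed to be an open `snowflake.connector` connection. Query results are deliberately ignored; only the number of executed statements is returned.

```python
from typing import Any


def run_sql_script(conn: Any, path: str) -> int:
    """Execute all statements in a .sql file, discarding any query results.

    Assumption: `conn` is an open snowflake.connector connection.
    Returns the number of statements that were executed.
    """
    with open(path, "r", encoding="utf-8") as sql_file:
        sql_text = sql_file.read()
    # execute_string runs the statements in order and returns their cursors.
    cursors = conn.execute_string(sql_text)
    return len(list(cursors))
```

This fits the case where the statements only need to run and nothing has to be passed downstream.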
m
I am not expecting anything to be returned from the queries. They just need to be executed.
k
I think I’ll finalize that pull request today
k
Thanks @Kevin Kho
k
Updated the pull request to support multiple queries: https://github.com/PrefectHQ/prefect/pull/4363
m
Hi @Kevin Kho, thanks for the update. I tried the latest committed code and I am getting the issue below.
[2021-04-13 05:51:59+0000] ERROR - prefect.FlowRunner | Unexpected error: AttributeError("Can't get attribute 'SnowflakeQueriesFromFile' on <module 'prefect.tasks.snowflake.snowflake' from '/venv/prefect-env/lib/python3.6/site-packages/prefect/tasks/snowflake/snowflake.py'>",)
Traceback (most recent call last):
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 646, in get_flow_run_state
    for t in final_tasks
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/executors/dask.py", line 414, in wait
    return self.client.gather(futures)
  File "/venv/prefect-env/lib/python3.6/site-packages/distributed/client.py", line 1969, in gather
    asynchronous=asynchronous,
  File "/venv/prefect-env/lib/python3.6/site-packages/distributed/client.py", line 838, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/venv/prefect-env/lib/python3.6/site-packages/distributed/utils.py", line 351, in sync
    raise exc.with_traceback(tb)
  File "/venv/prefect-env/lib/python3.6/site-packages/distributed/utils.py", line 334, in f
    result[0] = yield future
  File "/venv/prefect-env/lib/python3.6/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/venv/prefect-env/lib/python3.6/site-packages/distributed/client.py", line 1828, in _gather
    raise exception.with_traceback(traceback)
  File "/venv/prefect-env/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
AttributeError: Can't get attribute 'SnowflakeQueriesFromFile' on <module 'prefect.tasks.snowflake.snowflake' from '/venv/prefect-env/lib/python3.6/site-packages/prefect/tasks/snowflake/snowflake.py'>
[2021-04-13 05:51:59+0000] ERROR - prefect.snow-test-file | Unexpected error occured in FlowRunner: AttributeError("Can't get attribute 'SnowflakeQueriesFromFile' on <module 'prefect.tasks.snowflake.snowflake' from '/venv/prefect-env/lib/python3.6/site-packages/prefect/tasks/snowflake/snowflake.py'>",)
k
Hi @Mahesh! This looks like it's trying to import the task from the installed Prefect package, but that pull request (and the release) hasn't gone through yet, so the task is not part of the package. If you modify the Prefect source code locally, you should run `pip install -e .` to install it in editable mode; otherwise your changes will not be picked up. I suggest you just copy the task definition and move it into a folder where you keep your tasks.
m
Yes @Kevin Kho, I copied the task definition and it is working fine now.
thanks @Kevin Kho for the quick fix 🙂
k
Awesome! Thanks for the patience!
👍 1