    kumar

    1 year ago
    Hello team, hope you are all doing well. With SnowflakeQuery we can pass only one query per task. Is there any way to create a task that runs a Snowflake SQL file?
    Kevin Kho

    1 year ago
    Maybe message Mahesh. He seems to have a Python script that lets him run Snowflake SQL files. He then used the `ShellTask` to call those files. This can be found here: https://prefect-community.slack.com/archives/CL09KU1K7/p1616754226464500
    Sample script:
    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    # SQL files to run, in order
    sql_list = ["1.sql", "2.sql", "3.sql"]

    shell_task = ShellTask(
        helper_script="",
        shell="bash",
        log_stderr=True,
        return_all=True,
        stream_output=True,
    )

    with Flow(name="Example") as flow:
        # One shell task per SQL file. Note that `echo` only prints the command;
        # drop it to actually invoke the script.
        tasks = [
            shell_task(
                command="echo '/opt/prefect_env/bin/python /path/to/SnowSQL.py {}'".format(
                    statement
                )
            )
            for statement in sql_list
        ]
        # Chain the tasks so the files run sequentially
        for i in range(1, len(tasks)):
            tasks[i].set_upstream(tasks[i - 1])
    kumar

    1 year ago
    Thanks @Kevin Kho. It will work for us.
    @Kevin Kho I would suggest adding this option to run a SQL file to the Prefect SnowflakeQuery task itself.
    Kevin Kho

    1 year ago
    Are the SQL files queries or do they have other operations (creating table, dropping table, etc.)?
    kumar

    1 year ago
    The SQL files contain only queries.
    liren zhang

    1 year ago
    @Kevin Kho you can implement your own Snowflake task. The Prefect SnowflakeQuery task currently executes only one SQL statement at a time. Take a look at the following link: https://docs.snowflake.com/en/user-guide/python-connector-example.html#using-execute-stream-to-execute-sql-scripts
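A minimal sketch of the `execute_stream` approach from that link, assuming `conn` is an already-open Snowflake connector connection. The helper names `run_sql_stream` and `run_sql_file` are made up for illustration and are not part of Prefect or the connector:

```python
import io


def run_sql_stream(conn, stream):
    """Run every statement in a text stream; return how many were executed."""
    executed = 0
    # execute_stream is a generator that yields one cursor per statement,
    # so it has to be iterated for the statements to actually run.
    for _cursor in conn.execute_stream(stream, remove_comments=True):
        executed += 1
    return executed


def run_sql_file(conn, path):
    """Open a .sql file (hypothetical path) and run all of its statements."""
    with open(path, "r", encoding="utf-8") as f:
        return run_sql_stream(conn, f)
```

Something like `run_sql_file(conn, "1.sql")` could then be called from inside a custom task's `run` method, with the connection created and closed around it.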
    Kevin Kho

    1 year ago
    Hey @liren zhang, thanks for the resource. Yeah, it seems people are asking for a task to run `.sql` files directly. I can probably add this functionality to the task library.
    kumar

    1 year ago
    @Kevin Kho can you please open a GitHub issue for this feature?
    Kevin Kho

    1 year ago
    Will bring this up with the team
    @kumar @Mahesh The GitHub issue is here: https://github.com/PrefectHQ/prefect/issues/4348
    Mahesh

    1 year ago
    Thanks @Kevin Kho. Perfect 🙂
    kumar

    1 year ago
    Good job @Kevin Kho
    Mahesh

    1 year ago
    Hi @Kevin Kho, I tried your latest commit for SnowflakeQueryFromFile and am getting the error below.
    [2021-04-07 05:57:30+0000] INFO - prefect.TaskRunner | Task 'SnowflakeQueryFromFile': Starting task run...
    [2021-04-07 05:57:31+0000] ERROR - prefect.TaskRunner | Unexpected error: 000006 (0A000): Multiple SQL statements in a single API call are not supported; use one API call per statement instead.
    Traceback (most recent call last):
      File "/venv/prefect-env/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/venv/prefect-env/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 869, in get_task_run_state
        logger=self.logger,
      File "/venv/prefect-env/lib/python3.6/site-packages/prefect/utilities/executors.py", line 299, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/venv/prefect-env/lib/python3.6/site-packages/prefect/utilities/tasks.py", line 454, in method
        return run_method(self, *args, **kwargs)
      File "/venv/prefect-env/lib/python3.6/site-packages/prefect/tasks/snowflake/snowflake.py", line 227, in run
        raise error
      File "/venv/prefect-env/lib/python3.6/site-packages/prefect/tasks/snowflake/snowflake.py", line 220, in run
        executed = cursor.execute(query).fetchall()
      File "/venv/prefect-env/lib/python3.6/site-packages/snowflake/connector/cursor.py", line 693, in execute
        self.connection, self, ProgrammingError, errvalue
      File "/venv/prefect-env/lib/python3.6/site-packages/snowflake/connector/errors.py", line 258, in errorhandler_wrapper
        cursor.errorhandler(connection, cursor, error_class, error_value)
      File "/venv/prefect-env/lib/python3.6/site-packages/snowflake/connector/errors.py", line 195, in default_errorhandler
        cursor=cursor,
    snowflake.connector.errors.ProgrammingError: 000006 (0A000): Multiple SQL statements in a single API call are not supported; use one API call per statement instead.
    [2021-04-07 05:57:31+0000] INFO - prefect.TaskRunner | Task 'SnowflakeQueryFromFile': Finished task run for task with final state: 'Failed'
    The query file can have multiple SQL statements.
    Kevin Kho

    1 year ago
    Thanks for telling me. I’ll look into it @Mahesh.
    Mahesh

    1 year ago
    @Kevin Kho Did you find anything on the above issue?
    Kevin Kho

    1 year ago
    Haven’t finished yet, but I did find another method to use.
    @Mahesh, I think my issue is that I can use `execute_string` from the Snowflake connector. This can execute multiple commands, but it’s tricky to return anything (data from a query). Or I can leave it as it is, where I use a file but it can only run one query. What are your thoughts?
    Are you expecting something returned when you use multiple commands?
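For context, a rough sketch of what the `execute_string` route could look like, assuming `conn` is an open Snowflake connector connection and nothing needs to come back beyond per-statement row counts. `run_sql_text` is a hypothetical helper name, not the actual Prefect task:

```python
def run_sql_text(conn, sql_text):
    """Execute all statements in sql_text and return one row count per statement."""
    # execute_string splits the text into statements, runs them in order,
    # and returns one cursor per statement.
    cursors = conn.execute_string(sql_text, remove_comments=True)
    # Query results are not collected here; only each cursor's rowcount is kept.
    return [cursor.rowcount for cursor in cursors]
```

Getting query data back would mean fetching from each cursor separately, which is the tricky part mentioned above.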
    Mahesh

    1 year ago
    I am not expecting anything returned from the queries. They just need to be executed.
    Kevin Kho

    1 year ago
    I think I’ll finalize that pull request today
    kumar

    1 year ago
    Thanks @Kevin Kho
    Kevin Kho

    1 year ago
    Updated the pull request to support multiple queries: https://github.com/PrefectHQ/prefect/pull/4363
    Mahesh

    1 year ago
    Hi @Kevin Kho, thanks for the update. I tried the latest committed code and am getting the error below:
    [2021-04-13 05:51:59+0000] ERROR - prefect.FlowRunner | Unexpected error: AttributeError("Can't get attribute 'SnowflakeQueriesFromFile' on <module 'prefect.tasks.snowflake.snowflake' from '/venv/prefect-env/lib/python3.6/site-packages/prefect/tasks/snowflake/snowflake.py'>",)
    Traceback (most recent call last):
      File "/venv/prefect-env/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/venv/prefect-env/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 646, in get_flow_run_state
        for t in final_tasks
      File "/venv/prefect-env/lib/python3.6/site-packages/prefect/executors/dask.py", line 414, in wait
        return self.client.gather(futures)
      File "/venv/prefect-env/lib/python3.6/site-packages/distributed/client.py", line 1969, in gather
        asynchronous=asynchronous,
      File "/venv/prefect-env/lib/python3.6/site-packages/distributed/client.py", line 838, in sync
        self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
      File "/venv/prefect-env/lib/python3.6/site-packages/distributed/utils.py", line 351, in sync
        raise exc.with_traceback(tb)
      File "/venv/prefect-env/lib/python3.6/site-packages/distributed/utils.py", line 334, in f
        result[0] = yield future
      File "/venv/prefect-env/lib/python3.6/site-packages/tornado/gen.py", line 762, in run
        value = future.result()
      File "/venv/prefect-env/lib/python3.6/site-packages/distributed/client.py", line 1828, in _gather
        raise exception.with_traceback(traceback)
      File "/venv/prefect-env/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 75, in loads
        return pickle.loads(x)
    AttributeError: Can't get attribute 'SnowflakeQueriesFromFile' on <module 'prefect.tasks.snowflake.snowflake' from '/venv/prefect-env/lib/python3.6/site-packages/prefect/tasks/snowflake/snowflake.py'>
    [2021-04-13 05:51:59+0000] ERROR - prefect.snow-test-file | Unexpected error occured in FlowRunner: AttributeError("Can't get attribute 'SnowflakeQueriesFromFile' on <module 'prefect.tasks.snowflake.snowflake' from '/venv/prefect-env/lib/python3.6/site-packages/prefect/tasks/snowflake/snowflake.py'>",)
    Kevin Kho

    1 year ago
    Hi @Mahesh! This looks like it’s trying to import from Prefect, but that pull request (and release) hasn’t gone through yet, so it won’t be part of the package. If you modify the Prefect source code locally, you should run `pip install -e .` to install it in editable mode; otherwise changes will not be picked up. I suggest you just copy the task definition and move it into a folder where you keep your Tasks.
    Mahesh

    1 year ago
    Yes @Kevin Kho, I copied the task definition and it is working fine now.
    Thanks @Kevin Kho for the quick fix 🙂
    Kevin Kho

    1 year ago
    Awesome! Thanks for the patience!