<@ULVA73B9P> How can I run multiple Snowflake exec...
# ask-marvin
b
@Marvin How can I run multiple Snowflake execute statements in parallel to each other using the prefect-snowflake package?
m
To run multiple Snowflake execute statements in parallel, you can use the snowflake_multiquery function from the Prefect Snowflake integration. This function allows you to execute several queries asynchronously. Here is an example of how you can define a flow to run multiple statements in parallel:
python
from prefect import flow
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector, snowflake_multiquery

@flow
def snowflake_multiquery_flow():
    snowflake_credentials = SnowflakeCredentials(
        account="account",
        user="user",
        password="password",
    )
    snowflake_connector = SnowflakeConnector(
        database="database",
        warehouse="warehouse",
        schema="schema",
        credentials=snowflake_credentials
    )
    result = snowflake_multiquery(
        # Named placeholders use pyformat style: %(name)s
        ["SELECT * FROM table WHERE id=%(id_param)s LIMIT 8;", "SELECT 1,2"],
        snowflake_connector,
        params={"id_param": 1},
        as_transaction=True
    )
    return result

snowflake_multiquery_flow()
You can read more details about this feature in the Prefect Snowflake documentation. Keep in mind that using as_transaction=True will run the queries as a single transaction. The queries are still submitted asynchronously, so the flow does not block while it waits on Snowflake, but that alone does not guarantee that the queries run in parallel with each other. If you do not want the queries wrapped in a single transaction, set as_transaction to False.
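As a side note on the params argument, here is a minimal sketch of how a named placeholder lines up with the params dictionary. It assumes the connector's default pyformat parameter style, in which a placeholder is written %(name)s. Plain Python % formatting is used only as a stand-in (the real driver also escapes and quotes values), and the table name my_table is hypothetical.

```python
# Stand-in for pyformat-style binding: %(id_param)s is looked up by key
# in the params dictionary. The real driver also escapes and quotes values.
query = "SELECT * FROM my_table WHERE id=%(id_param)s LIMIT 8;"  # my_table is hypothetical
params = {"id_param": 1}

rendered = query % params
print(rendered)
```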
b
@Marvin does this work even if the statements are DDL and not DML?
m
The snowflake_multiquery function is capable of executing multiple queries, which could include Data Definition Language (DDL) statements such as creating or altering tables. This is inferred from the examples in the Prefect Snowflake documentation, where DDL statements such as CREATE TABLE are executed using the connector provided by the prefect-snowflake package. Here is an excerpt from an example in the Prefect Snowflake documentation:
python
from prefect_snowflake.database import SnowflakeConnector
with SnowflakeConnector.load("BLOCK_NAME") as conn:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
While the documentation primarily provides examples of DML operations such as insert statements (conn.execute_many), the ability to run a CREATE TABLE suggests that DDL statements should work as well. However, there is no explicit statement about limitations or differences when running DDL versus DML with the snowflake_multiquery function. It would be reasonable to expect that the snowflake_multiquery function can handle DDL statements too, but for a definitive answer or specific behavioral nuances, one may consider testing with actual DDL queries or reaching out to Prefect support for confirmation.
b
@Marvin I'm running two "COPY INTO" statements using the snowflake_multiquery function with the as_transaction parameter set to True, and it appears they are running sequentially. How can I allow both statements to run in parallel?
m
It appears from the documentation that when you set the as_transaction parameter to True in the snowflake_multiquery function, the task creates a transaction in Snowflake and ensures that all provided queries are executed as part of this transaction. Statements inside a single transaction run one after another on the same session, so the queries are executed sequentially within the transaction block. Each query waits for the previous one to complete before starting, which lets the whole block be rolled back if any single query fails.
To enable parallel execution outside the constraints of a transaction, you may need to set as_transaction to False. Each query may then run independently of the others, which can allow them to execute in parallel. However, since they won't be part of a single transaction, you won't have the transactional rollback guarantee if one of the queries fails.
If you have a strict requirement that they run concurrently within the same transaction, Prefect alone may not facilitate this, as it would require Snowflake support for parallelism within a transaction block. For more advanced control, you might need to look into Snowflake's capabilities or consider reaching out to Snowflake support. For further information on the behavior of snowflake_multiquery, you can consider browsing more in the Prefect Snowflake documentation.
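The difference between asynchronous and parallel can be sketched in plain asyncio, without Snowflake or Prefect. This is only an illustration, and fake_query is a hypothetical stand-in for submitting a query and waiting on it. Awaiting each call in turn keeps the work sequential even though the code is async, while asyncio.gather lets the waits overlap.

```python
import asyncio


async def fake_query(name: str, seconds: float, log: list) -> str:
    """Hypothetical stand-in for submitting a query and waiting for it."""
    log.append(f"start {name}")
    await asyncio.sleep(seconds)
    log.append(f"end {name}")
    return name


async def run_sequentially(log: list) -> list:
    # Each await completes before the next query is started.
    return [await fake_query("a", 0.05, log), await fake_query("b", 0.01, log)]


async def run_concurrently(log: list) -> list:
    # gather starts both coroutines before waiting on either of them.
    return await asyncio.gather(
        fake_query("a", 0.05, log), fake_query("b", 0.01, log)
    )


sequential_log: list = []
asyncio.run(run_sequentially(sequential_log))

concurrent_log: list = []
asyncio.run(run_concurrently(concurrent_log))

print(sequential_log)
print(concurrent_log)
```

In the sequential run, "b" does not start until "a" has ended. In the concurrent run, both start before either ends.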
b
@Marvin I do not need to run the two "COPY INTO" statements in a transaction, but I do need to run them in parallel. When I run the snowflake_multiquery function with as_transaction set to False, the statements are still run sequentially. How can I run them in parallel?
m
It seems that even when the as_transaction parameter is set to False, the snowflake_multiquery function may not be executing the queries in parallel. The documentation does not explicitly state how to enforce parallel execution if it's not happening by default. One possible workaround is to use Prefect's task concurrency features: run each "COPY INTO" statement as its own task and start the tasks concurrently, for example with a task's .submit() method, so that neither one waits on the other. Here is an example of how you might structure your flow to achieve this:
python
from prefect import flow
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector, snowflake_query

@flow
def snowflake_parallel_copy_flow():
    snowflake_credentials = SnowflakeCredentials(
        account="account",
        user="user",
        password="password",
    )
    snowflake_connector = SnowflakeConnector(
        database="database",
        warehouse="warehouse",
        schema="schema",
        credentials=snowflake_credentials
    )

    # snowflake_query is already a Prefect task, so submit it directly.
    # .submit() hands the task to the flow's task runner and returns a
    # future right away instead of blocking until the query finishes.
    copy_into_task1 = snowflake_query.submit(
        query="COPY INTO ...",  # replace with your actual query
        snowflake_connector=snowflake_connector
    )
    copy_into_task2 = snowflake_query.submit(
        query="COPY INTO ...",  # replace with another actual query
        snowflake_connector=snowflake_connector
    )

    # Wait for both submitted tasks and collect their results
    result1 = copy_into_task1.result()
    result2 = copy_into_task2.result()

    return result1, result2

snowflake_parallel_copy_flow()
In the above example, because copy_into_task1 and copy_into_task2 are started without waiting for each other, the two statements can execute concurrently rather than one after the other. If this approach is not working for you, or further clarification is needed, please consult the Prefect Snowflake documentation to explore more options or reach out to the Prefect community for support.
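The same submit-then-wait pattern can be sketched with the standard library alone, which may help when checking whether two statements really overlap. This is an illustration under stated assumptions, not the Prefect or Snowflake API: run_statement is a hypothetical stand-in for executing one statement on its own connection, the table names are placeholders, and the barrier exists only to show that both calls are in flight at the same time.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Both workers must reach this barrier together. If the statements ran
# one after the other, the first would time out here instead of passing.
overlap = threading.Barrier(2, timeout=5)


def run_statement(sql: str) -> str:
    """Hypothetical stand-in for running one statement on its own connection."""
    overlap.wait()  # passes only if the other statement is also in flight
    return f"done: {sql}"


def run_in_parallel(statements: list) -> list:
    # Submit every statement first, then wait for all of the results.
    with ThreadPoolExecutor(max_workers=len(statements)) as pool:
        futures = [pool.submit(run_statement, sql) for sql in statements]
        return [future.result() for future in futures]


# table_a and table_b are hypothetical placeholders.
results = run_in_parallel(["COPY INTO table_a ...", "COPY INTO table_b ..."])
print(results)
```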