Paul Stark

07/07/2022, 10:08 PM
I’m trying to run the following code snippet
snowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)

sql_queries = cfg.sql_query.split(';')

with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query(
        account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
        user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
        password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
        warehouse=cfg.snowflake_warehouse,
    ).map(query=sql_queries)
sql_queries ends up as a list (I can loop through it). Any thoughts on what I am doing incorrectly?

Kevin Kho

07/07/2022, 10:12 PM
Hey Paul, you are doing 3 calls.
snowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)
is already an init and then
snowflake_query(
        account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
        user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
        password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
        warehouse=cfg.snowflake_warehouse,
    ).map(query=sql_queries)
is 2 more. I know the attempt here is to initialize with secrets, but you can’t do that because init is called during registration and secrets are fed in at runtime. Instead, pass the account, user, and password in the map call, wrapping each one in unmapped.
Also, when you get the chance, it would help if you could move the error to the thread to keep the main channel cleaner

Paul Stark

07/07/2022, 10:14 PM
Moving here per request. Error message below :-)
Task 'SnowflakeQuery': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/tasks/snowflake/snowflake.py", line 152, in run
    raise ValueError("A query string must be provided")
ValueError: A query string must be provided

Kevin Kho

07/07/2022, 10:15 PM
Thanks! This error is from the call before the map

Paul Stark

07/07/2022, 10:16 PM
Thanks, but I have another process that uses the same logic and works correctly; it just doesn't try to run multiple queries. This process also works if I try a single query without the map.

Kevin Kho

07/07/2022, 10:19 PM
Ah ok I was a bit off.
snowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)

with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query(
        query=query,
        account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
        user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
        password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
        warehouse=cfg.snowflake_warehouse,
    )
will work because the task is called just once after init. But map is also a call, so it should be:
with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query.map(
        query=list_of_queries,
        account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
        user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
        password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
        warehouse=unmapped(cfg.snowflake_warehouse),
    )
Basically you don’t do:
@task
def abc(x):
    return x+1

with Flow(..) as flow:
    abc().map([1,2,3])
The right syntax is:
with Flow(..) as flow:
    abc.map([1,2,3])
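To make the difference concrete, here is a toy sketch in plain Python. It is not Prefect's implementation: ToyTask, its scheduled list, and run_query are hypothetical stand-ins, and the two error messages are copied from the tracebacks in this thread. The assumption being modelled is that calling a task object schedules one run with exactly the arguments given, and that .map() schedules one more run per mapped element.

```python
from typing import Any, Callable, Dict, List


class Unmapped:
    """Wrapper marking a value to be passed whole to every mapped call."""

    def __init__(self, value: Any) -> None:
        self.value = value


def unmapped(value: Any) -> Unmapped:
    return Unmapped(value)


class ToyTask:
    """Hypothetical stand-in for a task object; NOT Prefect's Task class."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self.fn = fn
        self.scheduled: List[Dict[str, Any]] = []  # one kwargs dict per run

    def __call__(self, **kwargs: Any) -> "ToyTask":
        # A plain call schedules ONE run with exactly these arguments.
        self.scheduled.append(kwargs)
        return self

    def map(self, **kwargs: Any) -> "ToyTask":
        # Mapped arguments are iterated; unmapped ones are repeated as-is.
        lengths = {len(v) for v in kwargs.values() if not isinstance(v, Unmapped)}
        if len(lengths) != 1:
            raise ValueError("mapped arguments must share one length")
        for i in range(lengths.pop()):
            self.scheduled.append(
                {k: v.value if isinstance(v, Unmapped) else v[i] for k, v in kwargs.items()}
            )
        return self

    def execute(self) -> List[str]:
        # Run everything that was scheduled, recording errors as strings.
        results = []
        for kwargs in self.scheduled:
            try:
                results.append(self.fn(**kwargs))
            except ValueError as exc:
                results.append(f"error: {exc}")
        return results


def run_query(query: Any = None, account: Any = None) -> str:
    # Same checks, in the same order, as the tracebacks in this thread.
    if account is None:
        raise ValueError("An account must be provided")
    if not query:
        raise ValueError("A query string must be provided")
    return f"{account}: {query}"


queries = ["SELECT 1", "SELECT 2"]

wrong = ToyTask(run_query)
wrong().map(query=queries, account=unmapped("acct"))  # extra empty call first
wrong_results = wrong.execute()

right = ToyTask(run_query)
right.map(query=queries, account=unmapped("acct"))  # map only
right_results = right.execute()

print(wrong_results)
print(right_results)
```

In this sketch the empty call in the wrong version is what raises the error, before any mapped run is reached; the mapped runs themselves succeed in both versions.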

Paul Stark

07/07/2022, 10:42 PM
@Kevin Kho thanks, that helps to clarify things! Using the above I am still getting an error, this time about needing an account.
snowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)

sql_queries = cfg.sql_query.split(';')

with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query().map(
          query=sql_queries,
          account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
          user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
          password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
          warehouse=cfg.snowflake_warehouse,
          )
Error
Task 'SnowflakeQuery': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/tasks/snowflake/snowflake.py", line 148, in run
    raise ValueError("An account must be provided")
ValueError: An account must be provided

Kevin Kho

07/07/2022, 10:46 PM
Your error comes from the first pair of parentheses:
with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query().map(
          query=sql_queries,
          account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
          user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
          password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
          warehouse=cfg.snowflake_warehouse,
          )
which attempts to call the task. It should just be:
snowflake_query.map()

Paul Stark

07/07/2022, 11:02 PM
@Kevin Kho Success!!! Thank you very much for your help! I had to make one small change, moving the warehouse into the init, and was able to get a successful run. Posting final snippet for posterity :-)
snowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
    warehouse=cfg.snowflake_warehouse,
)

sql_queries = cfg.sql_query.split(';')

with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query.map(
          query=sql_queries,
          account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
          user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
          password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
          )
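One thing that may be worth checking in a snippet like this: str.split(';') returns an empty last element when the SQL text ends with a semicolon, and an empty query would presumably trip the same "A query string must be provided" check in that mapped run. A minimal sketch of a guard, assuming the statements contain no semicolons inside string literals or comments (split_statements is a hypothetical helper name, not part of Prefect):

```python
from typing import List


def split_statements(sql: str) -> List[str]:
    """Split on ';' and drop empty or whitespace-only pieces.

    Naive on purpose: a ';' inside a string literal or comment
    would still be treated as a statement separator.
    """
    return [part.strip() for part in sql.split(";") if part.strip()]


raw = "SELECT 1; SELECT 2;"

naive = raw.split(";")
cleaned = split_statements(raw)

print(naive)    # ['SELECT 1', ' SELECT 2', '']
print(cleaned)  # ['SELECT 1', 'SELECT 2']
```

With this helper, sql_queries = split_statements(cfg.sql_query) would feed only non-empty statements into the map.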

Kevin Kho

07/07/2022, 11:03 PM
Nice!