# prefect-community
p
I’m trying to run the following code snippet
snowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)

sql_queries = cfg.sql_query.split(';')

with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query(
        account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
        user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
        password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
        warehouse=cfg.snowflake_warehouse,
    ).map(query=sql_queries)
sql_queries does end up as a list, since I can loop through it. Any thoughts on what I am doing incorrectly?
k
Hey Paul, you are doing 3 calls.
snowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)
is already an init and then
snowflake_query(
        account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
        user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
        password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
        warehouse=cfg.snowflake_warehouse,
    ).map(query=sql_queries)
is 2 more. I know the attempt here is to initialize with secrets, but you can’t do that because init is called during registration and secrets are fed in at runtime. Instead, you pass the account, user, and password along with the map by wrapping them in unmapped.
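To make the mapped-versus-unmapped distinction concrete, here is a minimal pure-Python sketch of the idea. It is a toy model and not Prefect's implementation: `Unmapped`, `toy_map`, and `run_query` are hypothetical names made up for illustration, and the account value is a placeholder. Arguments wrapped in `Unmapped` are passed whole to every call, while the remaining arguments are iterated element by element.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class Unmapped:
    """Hypothetical marker: pass this value unchanged to every mapped call."""
    value: Any


def toy_map(fn: Callable[..., Any], **kwargs: Any) -> List[Any]:
    """Call fn once per element of the iterable kwargs, reusing Unmapped ones."""
    constant = {k: v.value for k, v in kwargs.items() if isinstance(v, Unmapped)}
    mapped = {k: list(v) for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    if not mapped:
        raise ValueError("at least one argument must be mapped over")
    lengths = {len(v) for v in mapped.values()}
    if len(lengths) != 1:
        raise ValueError("all mapped arguments must have the same length")
    n = lengths.pop()
    return [
        fn(**{k: v[i] for k, v in mapped.items()}, **constant)
        for i in range(n)
    ]


def run_query(query: str, account: str) -> str:
    # Stand-in for real work; it only reports what it was called with.
    return f"{account}: {query}"


results = toy_map(
    run_query,
    query=["select 1", "select 2"],   # iterated: one call per query
    account=Unmapped("my_account"),   # placeholder value, reused in every call
)
```

In this sketch each query gets its own call while the account is shared, which is the shape the thread is aiming for with the Snowflake credentials.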
Also, when you get the chance, it would help if you could move the error to the thread to keep the main channel cleaner
p
Moving here per request. Error message below :-)
Task 'SnowflakeQuery': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/tasks/snowflake/snowflake.py", line 152, in run
    raise ValueError("A query string must be provided")
ValueError: A query string must be provided
k
Thanks! This error is from the call before the map
p
Thanks, but I have another process that uses the same logic and works correctly; it just doesn’t run multiple queries. And this process works if I run a single query without the map.
k
Ah ok I was a bit off.
snowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)

with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query(
        query=query,
        account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
        user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
        password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
        warehouse=cfg.snowflake_warehouse,
    )
will work because the task is called just once after init. But map is also a call, so it should be:
with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query.map(
        query=list_of_queries,
        account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
        user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
        password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
        warehouse=unmapped(cfg.snowflake_warehouse),
    )
Basically you don’t do:
@task
def abc(x):
    return x+1

with Flow(..) as flow:
    abc().map([1,2,3])
The right syntax is:
with Flow(..) as flow:
    abc.map([1,2,3])
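A toy illustration of why the extra parentheses matter, written as plain Python so it runs without Prefect. This is a simplified model under stated assumptions, not Prefect's internals: `ToyTask` and `run_all` are hypothetical names, and the error string simply mirrors the one from the traceback. The point is that calling the task schedules one run of its own, so `task().map(...)` ends up with an extra run that has no arguments.

```python
class ToyTask:
    """Toy stand-in for a task object; NOT Prefect's implementation."""

    def __init__(self, name):
        self.name = name
        self.scheduled = []  # one kwargs dict per scheduled run

    def __call__(self, **kwargs):
        # Calling the task schedules one run with exactly these kwargs.
        self.scheduled.append(kwargs)
        return self

    def map(self, **kwargs):
        # Mapping schedules one run per element of the iterable kwargs.
        keys = list(kwargs)
        for values in zip(*(kwargs[k] for k in keys)):
            self.scheduled.append(dict(zip(keys, values)))
        return self


def run_all(task):
    """Pretend to execute every scheduled run and report the outcome."""
    outcomes = []
    for kwargs in task.scheduled:
        if not kwargs.get("query"):
            outcomes.append("ValueError: A query string must be provided")
        else:
            outcomes.append(f"ran {kwargs['query']}")
    return outcomes


queries = ["select 1", "select 2"]

wrong = ToyTask("wrong")
wrong().map(query=queries)   # the bare call schedules an argument-less run

right = ToyTask("right")
right.map(query=queries)     # only the mapped runs are scheduled

wrong_outcomes = run_all(wrong)
right_outcomes = run_all(right)
```

In this model `wrong_outcomes` contains three entries, the first being the missing-query error, while `right_outcomes` contains only the two mapped runs.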
p
@Kevin Kho thanks, that helps to clarify things! Using the above, I am still getting an error, this time about needing an account.
snowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)

sql_queries = cfg.sql_query.split(';')

with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query().map(
          query=sql_queries,
          account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
          user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
          password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
          warehouse=cfg.snowflake_warehouse,
          )
Error
Task 'SnowflakeQuery': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/tasks/snowflake/snowflake.py", line 148, in run
    raise ValueError("An account must be provided")
ValueError: An account must be provided
k
Your error is from the first parenthesis:
with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query().map(
          query=sql_queries,
          account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
          user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
          password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
          warehouse=cfg.snowflake_warehouse,
          )
which attempts to call the task. It should just be:
snowflake_query.map()
p
@Kevin Kho Success!!! Thank you very much for your help! Had to make 1 small change with the warehouse and was able to get a successful run. Posting final snippet for posterity :-)
snowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
    warehouse=cfg.snowflake_warehouse,
)

sql_queries = cfg.sql_query.split(';')

with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query.map(
          query=sql_queries,
          account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
          user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
          password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
          )
k
Nice!
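One related detail worth checking, offered as an assumption because the thread never shows the value of cfg.sql_query: if that string ends with a semicolon, split(';') produces a trailing empty string, and mapping over it would likely raise the same "A query string must be provided" error for that one element. A minimal sketch that drops blank fragments follows. `split_sql` is a hypothetical helper, and it is deliberately naive: it does not handle semicolons inside string literals or comments.

```python
from typing import List


def split_sql(script: str) -> List[str]:
    """Split a SQL script on ';' and drop empty or whitespace-only pieces.

    Hypothetical helper for illustration; it does not parse SQL, so a ';'
    inside a quoted string or a comment will still split the statement.
    """
    statements = []
    for fragment in script.split(";"):
        cleaned = fragment.strip()
        if cleaned:
            statements.append(cleaned)
    return statements


raw = "select 1; select 2;\n"
naive = raw.split(";")      # keeps a trailing whitespace-only fragment
cleaned = split_sql(raw)    # keeps only the two real statements
```

The cleaned list could then be passed as the query argument to the mapped task in place of the raw split result.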