Paul Stark
07/07/2022, 10:08 PMsnowflake_query = SnowflakeQuery(
max_retries=cfg.retry_max,
retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)
sql_queries = cfg.sql_query.split(';')
with Flow(
getenv('FLOW_NAME')
) as flow:
run_snowflake_query = snowflake_query(
account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
warehouse=cfg.snowflake_warehouse,
).map(query=sql_queries)
sql_queries is getting put into a list, since I can loop through it… Any thoughts on what I am doing incorrectly?
Kevin Kho
07/07/2022, 10:12 PMsnowflake_query = SnowflakeQuery(
max_retries=cfg.retry_max,
retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)
is already an init and then
snowflake_query(
account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
warehouse=cfg.snowflake_warehouse,
).map(query=sql_queries)
is 2 more.
I know the attempt here is to initialize with secrets, but you can’t do that because init is called during registration and secrets are fed in at runtime. Instead, pass the account, user, and password in the map call itself, wrapping each one in unmapped.
Paul Stark
07/07/2022, 10:14 PMTask 'SnowflakeQuery': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/tasks.py", line 456, in method
return run_method(self, *args, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/prefect/tasks/snowflake/snowflake.py", line 152, in run
raise ValueError("A query string must be provided")
ValueError: A query string must be provided
Kevin Kho
07/07/2022, 10:15 PMPaul Stark
07/07/2022, 10:16 PMKevin Kho
07/07/2022, 10:19 PMsnowflake_query = SnowflakeQuery(
max_retries=cfg.retry_max,
retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)
with Flow(
getenv('FLOW_NAME')
) as flow:
run_snowflake_query = snowflake_query(
query=query,
account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
warehouse=cfg.snowflake_warehouse,
)
will work because it’s just called once after init, but the map is a call also so it should be:
with Flow(
getenv('FLOW_NAME')
) as flow:
run_snowflake_query = snowflake_query.map(
query=list_of_queries,
account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
warehouse=unmapped(cfg.snowflake_warehouse),
)
@task
def abc(x):
return x+1
with Flow(..) as flow:
abc().map([1,2,3])
The right syntax is:
with Flow(..) as flow:
abc.map([1,2,3])
Paul Stark
07/07/2022, 10:42 PMsnowflake_query = SnowflakeQuery(
max_retries=cfg.retry_max,
retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)
sql_queries = cfg.sql_query.split(';')
with Flow(
getenv('FLOW_NAME')
) as flow:
run_snowflake_query = snowflake_query().map(
query=sql_queries,
account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
warehouse=cfg.snowflake_warehouse,
)
Error
Task 'SnowflakeQuery': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/tasks.py", line 456, in method
return run_method(self, *args, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/prefect/tasks/snowflake/snowflake.py", line 148, in run
raise ValueError("An account must be provided")
ValueError: An account must be provided
Kevin Kho
07/07/2022, 10:46 PMwith Flow(
getenv('FLOW_NAME')
) as flow:
run_snowflake_query = snowflake_query().map(
query=sql_queries,
account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
warehouse=cfg.snowflake_warehouse,
)
which attempts to call the task. It should just be:
snowflake_query.map()
Paul Stark
07/07/2022, 11:02 PMsnowflake_query = SnowflakeQuery(
max_retries=cfg.retry_max,
retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
warehouse=cfg.snowflake_warehouse,
)
sql_queries = cfg.sql_query.split(';')
with Flow(
getenv('FLOW_NAME')
) as flow:
run_snowflake_query = snowflake_query.map(
query=sql_queries,
account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
)
Kevin Kho
07/07/2022, 11:03 PM