Brian K
04/15/2024, 7:17 PM
Marvin
04/15/2024, 7:17 PM
[...] `prefect_snowflake` connector within a task can be achieved by using the `snowflake_multiquery` or `snowflake_query` functions provided by the Prefect Snowflake integration. The example below uses the `snowflake_multiquery` function to execute multiple queries. The results are fetched within a loop, though the example does not directly illustrate yielding in batches.
Here's a code example for querying a Snowflake database:
```python
from prefect import flow
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector, snowflake_multiquery


@flow
def snowflake_multiquery_flow():
    # Placeholder credentials; replace with real values or a saved block.
    snowflake_credentials = SnowflakeCredentials(
        account="account",
        user="user",
        password="password",
    )
    snowflake_connector = SnowflakeConnector(
        database="database",
        warehouse="warehouse",
        schema="schema",
        credentials=snowflake_credentials,
    )
    # Named parameters use the pyformat style: %(name)s
    result = snowflake_multiquery(
        ["SELECT * FROM table WHERE id=%(id_param)s LIMIT 8;", "SELECT 1,2"],
        snowflake_connector,
        params={"id_param": 1},
        as_transaction=True,
    )
    return result


snowflake_multiquery_flow()
```
For more details and documentation, you can refer to the following link:
- Prefect Snowflake Connector Documentation: Prefect Snowflake Connector
You would typically loop over the queries and fetch the results of each one to create the notion of batches. However, the current examples do not explicitly show how to yield batches asynchronously, which may require additional handling within the task to manage state and control flow.
Brian K
04/15/2024, 7:19 PM
```python
from prefect import task
from prefect_snowflake.database import SnowflakeConnector


@task()
def snowflake_data_stream(block_name: str, query: str, batch_size: int = 10_000):
    batch_cnt = 0
    row_cnt = 0
    with SnowflakeConnector.load(block_name) as conn:
        while True:
            # Repeated fetch_many calls with the same query return the next rows
            batch = conn.fetch_many(query, size=batch_size)
            if len(batch) == 0:
                break
            row_cnt += len(batch)
            print(f"batch: {batch_cnt}, rows: {row_cnt}")
            batch_cnt += 1
            yield batch
```
@Marvin this does not seem to work as a generator. Any reason why?
Marvin
04/15/2024, 7:20 PM
[...] `None` or an empty list `[]` in later versions. This behavior seems unintentional and might be considered a bug. It might be the reason why your generator pattern is not working as expected.
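One plain-Python detail may also be relevant here, independent of Prefect: calling a generator function only creates a generator object, and the body (including any `with` block) does not run until something iterates over it. A minimal sketch with hypothetical names (`stream`, `calls`); it is not Prefect code and does not reproduce the reported issue itself:

```python
from typing import Iterator, List

calls: List[str] = []


def stream() -> Iterator[int]:
    calls.append("body started")  # runs only once iteration begins
    yield 1
    yield 2


gen = stream()  # creates a generator object; the body has not run yet
started_before_iteration = list(calls)

values = list(gen)  # iterating drives the body to completion
print(started_before_iteration)  # []
print(values, calls)  # [1, 2] ['body started']
```

So any wrapper that calls the function and returns its result without iterating will never execute the fetch loop.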
There is also a feature request for "Yieldable task results in orion flows (generator pattern for tasks and futures)" which suggests that the ability to pass results from a generator to new processes as they complete is not currently available in Prefect 2.x, but this is a desired feature by the community.
Please make sure that your version of Prefect is up to date and check the mentioned GitHub issues for any updates or workarounds by the community. If the functionality is still not working as expected, it may be worthwhile to contribute to the issue discussions or look out for updates in future releases that address these concerns.
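In the meantime, one possible workaround (a sketch under stated assumptions, not an official Prefect pattern) is to keep the batching generator as a plain, undecorated function and hand each batch to a separate step. The names `iter_batches`, `process_batch`, and `FakeConnector` are hypothetical; `FakeConnector` is an in-memory stand-in that only mimics a stateful `fetch_many(query, size=...)` call so the example runs without Snowflake:

```python
from typing import Any, Iterator, List, Protocol, Tuple

Row = Tuple[Any, ...]


class BatchFetcher(Protocol):
    """Anything exposing a stateful fetch_many(query, size=...) call."""

    def fetch_many(self, query: str, size: int) -> List[Row]: ...


def iter_batches(
    conn: BatchFetcher, query: str, batch_size: int = 10_000
) -> Iterator[List[Row]]:
    """Yield successive batches until the connector returns an empty one."""
    while True:
        batch = conn.fetch_many(query, size=batch_size)
        if not batch:
            break
        yield batch


class FakeConnector:
    """Hypothetical in-memory stand-in, used only to make the sketch runnable."""

    def __init__(self, rows: List[Row]) -> None:
        self._rows = rows
        self._pos = 0

    def fetch_many(self, query: str, size: int) -> List[Row]:
        # Return the next slice of rows and remember the position.
        batch = self._rows[self._pos:self._pos + size]
        self._pos += len(batch)
        return batch


def process_batch(batch: List[Row]) -> int:
    """Placeholder for per-batch work (in Prefect this could be a task)."""
    return len(batch)


conn = FakeConnector([(i,) for i in range(25)])
batch_sizes = [
    process_batch(b)
    for b in iter_batches(conn, "SELECT * FROM example_table", batch_size=10)
]
print(batch_sizes)  # [10, 10, 5]
```

In a flow, `conn` would be the loaded `SnowflakeConnector` block and `process_batch` could be a task. That wiring is not shown here and should be checked against your Prefect version.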