# prefect-community
m
Hi all, I have a problem with prefect-snowflake and/or ConcurrentTaskRunner and/or asyncio: I'm trying to run ~10 Snowflake queries simultaneously (example code in thread), but only 4, 5, or 6 are picked up; the rest of the queries run only when a previous query finishes. It looks like the threads do not "go to sleep" as intended while they wait for Snowflake results, and because of that the other tasks are not picked up.
This is a simple reproduction:
for x in range(10):
    snowflake_multiquery.submit(["select count(*) from (select seq4() from TABLE(GENERATOR(ROWCOUNT => 1000000000000)) )"], snowflake_credentials, as_transaction=transaction)
This is the code from prefect_snowflake/database.py that causes the problem:
with connection.cursor(cursor_type) as cursor:
    results = []
    for query in queries:
        response = cursor.execute_async(query, params=params)
        query_id = response["queryId"]
        while connection.is_still_running(
            connection.get_query_status_throw_if_error(query_id)
        ):
            await asyncio.sleep(0.05)
        cursor.get_results_from_sfqid(query_id)
        result = cursor.fetchall()
        results.append(result)
There is an `asyncio.sleep(0.05)` in that loop, and I believe that when the processor is a bit busy, those "mini sleeps" (0.05 s) are not really happening, so other threads cannot start... When I change 0.05 to, for example, 1 s, I am able to run 10 queries concurrently.
I don't know if it's an issue with the Snowflake connector, with asyncio, or with the concurrent task runner... And that 1 s workaround probably isn't perfect. Maybe someone will have some ideas...
I can use DaskTaskRunner, but I'm curious how to run that code with ConcurrentTaskRunner as well.
It's Prefect 2.0.2.
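One possible explanation for the behaviour described above, offered as an assumption and not a confirmed diagnosis: if the status check inside the polling loop is a synchronous (blocking) call, it holds the event loop for its whole duration, and a very short `asyncio.sleep` between checks leaves little time for other coroutines to start. The sketch below does not use Prefect or Snowflake at all. `blocking_status_check`, `poll`, `run_all`, and `compare` are hypothetical names, and `time.sleep` stands in for a blocking network round trip. It compares calling the blocking function inline with offloading it to the default thread pool via `run_in_executor`.

```python
import asyncio
import time


def blocking_status_check(delay: float = 0.02) -> None:
    """Hypothetical stand-in for a synchronous status poll (blocks the caller)."""
    time.sleep(delay)


async def poll(n_polls: int, offload: bool) -> None:
    """Poll n_polls times, with a short async sleep between polls."""
    loop = asyncio.get_running_loop()
    for _ in range(n_polls):
        if offload:
            # Run the blocking call in a worker thread; the event loop stays free.
            await loop.run_in_executor(None, blocking_status_check)
        else:
            # Run the blocking call on the event loop; nothing else can progress.
            blocking_status_check()
        await asyncio.sleep(0.005)


async def run_all(n_tasks: int, n_polls: int, offload: bool) -> float:
    """Run n_tasks pollers concurrently and return the elapsed wall time."""
    start = time.perf_counter()
    await asyncio.gather(*(poll(n_polls, offload) for _ in range(n_tasks)))
    return time.perf_counter() - start


def compare(n_tasks: int = 10, n_polls: int = 5) -> tuple:
    """Return (inline_seconds, offloaded_seconds) for the two strategies."""
    inline = asyncio.run(run_all(n_tasks, n_polls, offload=False))
    offloaded = asyncio.run(run_all(n_tasks, n_polls, offload=True))
    return inline, offloaded


if __name__ == "__main__":
    inline, offloaded = compare()
    print(f"inline: {inline:.2f}s, offloaded: {offloaded:.2f}s")
```

In the inline variant the blocking calls are serialized on the event loop, so the total time grows with the number of tasks multiplied by the number of polls. Whether this matches what happens inside prefect_snowflake depends on how the connector's status call behaves, which this sketch only assumes.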
b
Hi Marcin!
> And only 4 or 5 or 6 are picked, the rest of the queries run when some previous query finishes.
At first glance, this sounds like maybe a concurrency limit issue. Have you set a limit for the flows/tasks? https://docs-v1.prefect.io/orchestration/flow-runs/concurrency-limits.html#flow-run-limits
m
Hi, I haven't set one. I will try this, but as I wrote above, I am able to run 10 tasks concurrently when I change the asyncio.sleep time in the prefect_snowflake module...