Ellie Redding
07/21/2022, 3:19 PM
snowflake_query and snowflake_multiquery are their own tasks, so I can’t use them as part of a different task? Which means that my flow looks like this:
do_some_stuff()
for table_name in tables:
    queries = build_queries(table_name)
    snowflake_multiquery(queries)
These tasks are all running sequentially, but there are a lot of tables, so I’d like the snowflake_multiquery tasks for each table to run concurrently. How can I make that happen?

Kevin Kho
07/21/2022, 3:45 PM
[snowflake_multiquery(x) for x in queries]
and that might be parallel.
You can use a task in another task by calling task.fn, but it’s just the Python under the hood, not a task with retries/caching/checkpointing.

Ellie Redding
07/21/2022, 3:53 PM
queries needs to be run in order (they’re, like, “create table x”, “copy into table x”, etc.), but what I’m concerned about is running the queries for each table concurrently: we can load data into table X at the same time as loading into tables Y and Z.
So maybe a list comprehension like this would work?
[snowflake_multiquery(build_queries(t)) for t in tables]
Though I don’t understand how that’s functionally different from the for loop I posted above. Can you explain if/why that list comprehension would have tasks running concurrently but not the for loop?

Kevin Kho
07/21/2022, 3:56 PM
do_some_stuff()
for table_name in tables:
    snowflake_multiquery(build_queries(table_name))
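Neither a for loop nor a list comprehension creates concurrency by itself: both make the calls one after another, so whether the work overlaps depends on what each call does with it. The rough standard-library analogy below (not Prefect code) shows the shape Ellie describes: the statements for one table run in order inside a single job, while the jobs for different tables are handed to a pool and may overlap. The names build_queries, run_query, load_table, and load_all are hypothetical stand-ins, and no Snowflake connection is involved.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def build_queries(table):
    # Hypothetical stand-in: these statements must run in this order for one table.
    return [f"create table {table}", f"copy into {table}"]


log = []
log_lock = threading.Lock()


def run_query(query):
    # Hypothetical stand-in for executing one statement; it only records the query.
    with log_lock:
        log.append(query)


def load_table(table):
    # Within a single table, run the queries one after another, in order.
    for query in build_queries(table):
        run_query(query)
    return table


def load_all(tables, max_workers=4):
    # One job per table; jobs for different tables may run at the same time.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(load_table, t) for t in tables]  # returns immediately
        return [f.result() for f in futures]  # wait only after everything is submitted
```

Calling load_all(["x", "y", "z"]) returns the table names in input order, and in the log each table's create statement always precedes its copy statement, even though statements from different tables may interleave. Submitting from a loop or from a comprehension makes no difference here; what matters is that submit returns without waiting for the job.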
Also, can you make sure you are using the ConcurrentTaskRunner? I think the default may have changed to SequentialTaskRunner.

Ellie Redding
07/21/2022, 3:58 PM

Kevin Kho
07/21/2022, 4:02 PM

Ellie Redding
07/21/2022, 4:04 PM

Kevin Kho
07/21/2022, 4:05 PM

Ellie Redding
07/21/2022, 4:16 PM
build_queries() task to build the queries and then run them with snowflake_multiquery.fn(), that should be fine; I mostly wanted to avoid having to work with the Snowflake Python connector directly.
@task
def copy_table(table):
    queries = ...
    creds = SnowflakeCredentials(...)
    snowflake_multiquery.fn(queries, creds)
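Kevin's earlier point that task.fn is "just the Python under the hood" can be illustrated with a small stand-alone sketch. The toy_task decorator below is hypothetical and is not Prefect's implementation: the decorated object adds one piece of behavior (a retry loop), while .fn exposes the original function, so calling .fn skips that behavior entirely.

```python
import functools


def toy_task(retries=0):
    """Hypothetical stand-in for a task decorator: adds retries, keeps the raw function on .fn."""
    def decorate(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # Try the call up to retries + 1 times before re-raising the last error.
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise
        wrapper.fn = fn  # the undecorated function, with no retry logic
        return wrapper
    return decorate


calls = {"n": 0}


@toy_task(retries=2)
def flaky():
    # Fails on the first two calls, succeeds on the third.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"
```

Calling flaky() succeeds on the third attempt, while calling flaky.fn() on a fresh counter fails immediately, because the retry loop lives only in the wrapper.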
This seems like it’s running tasks concurrently: they all get kicked off and run, and report that they Finished in state Completed(), but they’re not actually doing anything, i.e., the Snowflake queries aren’t being run. I’m also seeing this warning in the logs:
RuntimeWarning: coroutine 'snowflake_multiquery' was never awaited
When I update the task to return snowflake_multiquery.fn(queries, creds), it fails with TypeError: cannot pickle 'coroutine' object
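Both symptoms are standard Python behavior for async functions, which suggests (an assumption, not confirmed in the thread) that snowflake_multiquery is defined as an async task, so .fn(...) returns a coroutine object instead of running the queries. A coroutine only executes when it is awaited, and returning one from a task hands the runner an object that cannot be pickled. The sketch below reproduces both effects with a hypothetical async stand-in, multiquery_stub, and shows the usual fix: make the caller async and await the call.

```python
import asyncio
import pickle


async def multiquery_stub(queries):
    # Hypothetical async stand-in for an async query function; "runs" each query in order.
    results = []
    for q in queries:
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        results.append(f"ran: {q}")
    return results


def broken_copy_table(queries):
    # Calling an async function without await only creates a coroutine; nothing runs.
    return multiquery_stub(queries)


async def fixed_copy_table(queries):
    # Awaiting the coroutine actually executes the queries.
    return await multiquery_stub(queries)


async def copy_all(tables):
    # Each table's queries stay ordered; different tables run concurrently.
    return await asyncio.gather(
        *(fixed_copy_table([f"create table {t}", f"copy into {t}"]) for t in tables)
    )
```

If the assumption holds, the analogous change in the thread's code would be to declare copy_table with async def and write await snowflake_multiquery.fn(queries, creds). Whether the calling flow must then also be async may depend on the Prefect version, so check the documentation before relying on this.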
Am I doing something wrong here, or does this approach not actually work?

Zanie
07/21/2022, 5:55 PM
.submit(…) on your tasks to send them to the task runner now.
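Zanie's reply matches the Prefect 2 behavior of that period (an assumption worth confirming against the release notes for your version): calling a task directly runs it and returns its result, while task.submit(...) hands it to the task runner and returns a future. The standard library has the same distinction, and the sketch below uses concurrent.futures to show why it matters: submitting every job first and collecting results afterwards lets the jobs overlap, whereas waiting on each future right after submitting it serializes them again. The names run_tables and load_table are hypothetical; a threading.Barrier serves only as a deterministic check, since it opens only if all jobs are running at the same time.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_tables(tables, wait_inside_loop):
    # The barrier only opens once every job has reached it, i.e. when all jobs overlap.
    barrier = threading.Barrier(len(tables), timeout=2)

    def load_table(table):
        barrier.wait()  # raises threading.BrokenBarrierError after 2 s if peers never arrive
        return table

    with ThreadPoolExecutor(max_workers=len(tables)) as pool:
        if wait_inside_loop:
            # Anti-pattern: block on each future before submitting the next job.
            return [pool.submit(load_table, t).result() for t in tables]
        # Submit every job first, then wait: the jobs can run concurrently.
        futures = [pool.submit(load_table, t) for t in tables]
        return [f.result() for f in futures]
```

In Prefect terms, the analogous shape would be roughly futures = [copy_table.submit(t) for t in tables] under a ConcurrentTaskRunner, followed by collecting each future's result; treat the exact calls as something to verify in the Prefect documentation.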