
Ellie Redding

07/21/2022, 3:19 PM
👋 Hi, I’m new here, trialing Prefect 2.0, and have a couple of questions. I’m writing a flow to copy data into a bunch of Snowflake tables. I wanted to use the Snowflake connection instead of managing a connection myself. It seems like `snowflake_query` and `snowflake_multiquery` are their own tasks, so I can’t use them as part of a different task? Which means that my flow looks like this:
do_some_stuff()
for table_name in tables:
    queries = build_queries(table_name)
    snowflake_multiquery(queries)
These tasks are all running sequentially, but there are a lot of tables, so I’d like the `snowflake_multiquery` tasks for each table to run concurrently. How can I make that happen?

Kevin Kho

07/21/2022, 3:45 PM
I think you can use a list comprehension instead:
[snowflake_multiquery(x) for x in queries]
and that might run in parallel. You can use a task inside another task by calling `task.fn`, but then it’s just the plain Python under the hood, not a task with retries/caching/checkpointing.

Ellie Redding

07/21/2022, 3:53 PM
To clarify — each of the queries in `queries` needs to run in order (they’re, like, “create table x”, “copy into table x”, etc.), but the queries for the different tables should run concurrently — we can load data into table X concurrently with loading into tables Y and Z. So maybe a list comprehension like this would work?
[snowflake_multiquery(build_queries(t)) for t in tables]
Though I don’t understand how that’s functionally different from the for loop I posted above. Can you explain if/why that list comprehension would have tasks running concurrently but not the for loop?

Kevin Kho

07/21/2022, 3:56 PM
I think the issue might be that `queries` is overwritten on each iteration. Could you try:
do_some_stuff()
for table_name in tables:
    snowflake_multiquery(build_queries(table_name))
Also, can you make sure you are using the ConcurrentTaskRunner? I think the default may have changed to SequentialTaskRunner

Ellie Redding

07/21/2022, 3:58 PM
logs indicate it is using the ConcurrentTaskRunner. Trying your suggestion, one sec
hm, no dice, they’re still running sequentially

Kevin Kho

07/21/2022, 4:02 PM
Are you using a subflow somewhere?

Ellie Redding

07/21/2022, 4:04 PM
Nope — it’s a pretty simple flow I think

Kevin Kho

07/21/2022, 4:05 PM
Let me ask a teammate

Ellie Redding

07/21/2022, 4:16 PM
If the solution is to update my `build_queries()` task to build the queries and then run them with `snowflake_multiquery.fn()`, that should be fine — I mostly wanted to avoid having to work with the Snowflake Python connector directly
quick update: I tried that approach, like so:
@task
def copy_table(table):
  queries = ...
  creds = SnowflakeCredentials(...)
  snowflake_multiquery.fn(queries, creds)
This seems like it’s running tasks concurrently — they all get kicked off and run, and report that they `Finished in state Completed()` — but they’re not actually doing anything, i.e., the Snowflake queries aren’t being run. I’m also seeing this warning in the logs:
RuntimeWarning: coroutine 'snowflake_multiquery' was never awaited
When I update the task to `return snowflake_multiquery.fn(queries, creds)`, it fails with
TypeError: cannot pickle 'coroutine' object
Am I doing something wrong here, or does this approach not actually work?
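The “coroutine was never awaited” warning suggests `snowflake_multiquery` is an async function, so calling its `.fn()` only creates a coroutine object; nothing executes until it is awaited. A minimal stdlib sketch of the same failure mode, using a hypothetical `multiquery` stand-in rather than the real prefect-snowflake task:

```python
import asyncio

async def multiquery(queries):
    # stand-in for an async task's .fn: pretends to run each query
    return [f"ran: {q}" for q in queries]

def broken(queries):
    # creates a coroutine and drops it: nothing runs, and Python
    # emits "RuntimeWarning: coroutine 'multiquery' was never awaited"
    multiquery(queries)

def fixed(queries):
    # awaiting the coroutine (here via asyncio.run) actually executes it
    return asyncio.run(multiquery(queries))

print(fixed(["create table x", "copy into table x"]))
```

Returning the un-awaited coroutine from a task instead produces the pickling error above, since a coroutine object can’t be serialized as a task result.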

Zanie

07/21/2022, 5:55 PM
We now require you to call `.submit(…)` on your tasks to send them to the task runner. Concurrency is opt-in rather than default behavior