# prefect-community
e
👋 Hi, I’m new here, trialing Prefect 2.0, and have a couple of questions. I’m writing a flow to copy data into a bunch of Snowflake tables. I wanted to use the Snowflake connection instead of managing a connection myself. It seems like `snowflake_query` and `snowflake_multiquery` are their own tasks, so I can’t use them as part of a different task? Which means that my flow looks like this:
```python
do_some_stuff()
for table_name in tables:
    queries = build_queries(table_name)
    snowflake_multiquery(queries)
```
These tasks are all running sequentially, but there are a lot of tables, so I’d like the `snowflake_multiquery` tasks for each table to run concurrently. How can I make that happen?
k
I think you can use a list comprehension instead:
```python
[snowflake_multiquery(x) for x in queries]
```
and that might be parallel. You can use a task in another task by calling `task.fn`, but it’s just the Python under the hood, not a task with retries/caching/checkpointing.
e
To clarify: each of the queries in `queries` needs to be run in order (they’re, like, “create table x”, “copy into table x”, etc.), but the query sets for different tables should run concurrently. We can load data into table X concurrently with loading into table Y and table Z. So maybe a list comprehension like this would work?
```python
[snowflake_multiquery(build_queries(t)) for t in tables]
```
Though I don’t understand how that’s functionally different from the for loop I posted above. Can you explain if/why that list comprehension would have tasks running concurrently but not the for loop?
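In plain Python, a list comprehension and a `for` loop both evaluate each call to completion before starting the next one, so switching from one form to the other does not by itself add concurrency. The sketch below times two blocking calls made each way; `slow_load` is a hypothetical stand-in for a per-table load, not a Prefect or prefect-snowflake function.

```python
import time

def slow_load(table_name):
    # Hypothetical stand-in for a blocking per-table load
    time.sleep(0.1)
    return f"loaded {table_name}"

tables = ["x", "y"]

# Form 1: explicit for loop
start = time.perf_counter()
loop_results = []
for table_name in tables:
    loop_results.append(slow_load(table_name))
loop_elapsed = time.perf_counter() - start

# Form 2: list comprehension
start = time.perf_counter()
comp_results = [slow_load(t) for t in tables]
comp_elapsed = time.perf_counter() - start

# Both take roughly 0.2 s: each call blocks until it returns
print(f"loop: {loop_elapsed:.2f}s, comprehension: {comp_elapsed:.2f}s")
```

Any overlap between calls has to come from how the calls are dispatched, not from the loop syntax.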
k
I think the issue might be overwriting `queries` each time. Could you try:
```python
do_some_stuff()
for table_name in tables:
    snowflake_multiquery(build_queries(table_name))
```
Also, can you make sure you are using the `ConcurrentTaskRunner`? I think the default may have changed to `SequentialTaskRunner`.
e
Logs indicate it is using the `ConcurrentTaskRunner`. Trying your suggestion, one sec.
Hm, no dice; they’re still running sequentially.
k
Are you using a subflow somewhere?
e
Nope — it’s a pretty simple flow I think
k
Let me ask a teammate
e
If the solution is to update my `build_queries()` task to build the queries and then run them with `snowflake_multiquery.fn()`, that should be fine. I mostly wanted to avoid having to work with the Snowflake Python connector directly.
Quick update: I tried that approach, like so:
```python
@task
def copy_table(table):
    queries = ...
    creds = SnowflakeCredentials(...)
    snowflake_multiquery.fn(queries, creds)
```
This seems like it’s running tasks concurrently (they all get kicked off and run, and report that they `Finished in state Completed()`), but they’re not actually doing anything, i.e., the Snowflake queries aren’t being run. I’m also seeing this warning in the logs:
```
RuntimeWarning: coroutine 'snowflake_multiquery' was never awaited
```
When I update the task to `return snowflake_multiquery.fn(queries, creds)`, then it fails with `TypeError: cannot pickle 'coroutine' object`.
Am I doing something wrong here, or does this approach not actually work?
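Both symptoms are consistent with `snowflake_multiquery` wrapping an `async def` function (an assumption inferred from the warning text, not checked against the library source): calling an async function without awaiting it only creates a coroutine object and runs none of its body, and coroutine objects cannot be pickled. The stdlib sketch below reproduces both behaviors with a hypothetical `fake_multiquery`, then uses `asyncio.run` to drive the coroutine to completion. Note that `asyncio.run` only works when no event loop is already running in the calling thread.

```python
import asyncio
import pickle

async def fake_multiquery(queries):
    # Hypothetical stand-in for an async task body; runs queries in order
    results = []
    for query in queries:
        await asyncio.sleep(0)  # placeholder for awaiting a database call
        results.append(f"ran: {query}")
    return results

queries = ["create table x", "copy into table x"]

# Calling the async function without awaiting it executes nothing yet
coro = fake_multiquery(queries)
print(type(coro).__name__)  # coroutine

# A coroutine object cannot be pickled, matching the reported TypeError
try:
    pickle.dumps(coro)
    pickle_error = None
except TypeError as exc:
    pickle_error = str(exc)
print(pickle_error)  # cannot pickle 'coroutine' object
coro.close()  # discard the unused coroutine explicitly

# asyncio.run starts an event loop and awaits the coroutine to completion
results = asyncio.run(fake_multiquery(queries))
print(results)
```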
z
We require you to call `.submit(…)` on your tasks to send them to the task runner now. Concurrency is opt-in rather than default behavior.
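Prefect's own `.submit()` is not reproduced here. As a plain-Python analogy of the same submit-then-collect idea, the sketch below uses `concurrent.futures.ThreadPoolExecutor.submit`, which returns a future immediately instead of blocking, so the per-table loads overlap while the queries inside each table still run in order. `build_queries` and `run_table` are hypothetical stand-ins, not Prefect or prefect-snowflake functions.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def build_queries(table_name):
    # Hypothetical: the ordered statements for one table
    return [f"create table {table_name}", f"copy into {table_name}"]

def run_table(table_name):
    # Queries for a single table run sequentially, in order
    executed = []
    for query in build_queries(table_name):
        time.sleep(0.1)  # placeholder for a blocking database call
        executed.append(query)
    return executed

tables = ["x", "y", "z"]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(tables)) as executor:
    # submit() hands each table to the pool and returns a future right away
    futures = {t: executor.submit(run_table, t) for t in tables}
    # result() blocks until that table's queries have all finished
    results = {t: f.result() for t, f in futures.items()}
elapsed = time.perf_counter() - start

# Roughly 0.2 s in total rather than 0.6 s, because the tables load concurrently
print(f"{elapsed:.2f}s", results["x"])
```

In the Prefect flow, the corresponding change would presumably be calling the per-table task with `.submit(...)` rather than calling it directly, keeping the ordered queries inside that one task.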