
Sean Malone

02/03/2023, 3:07 PM
Hello! I am using prefect-snowflake and was wondering if there is a way to “name” the task that spawns from calling snowflake_query(). Right now it generates a task in a format like:
snowflake_query-abcd123-00
and I'm wondering if this can be controlled to look like snowflake_query-<my_query_name>?

James Sopkin

02/03/2023, 3:32 PM
You could just pass in a name for the task decorator:
@task(name="snowflake-query-query-name")
Now, if you wanted to template names, one option would be to use with_options to create a new task with the specific query name. Something like:
>>> @task(name="My task")
>>> def my_task():
>>>     return 1
>>>
>>> new_task = my_task.with_options(name="My new task")
I made a quick pseudo-code example:
from prefect import flow, task

@task
def test(task_name='my-test-task'):
    return 1, 2

@flow
def test_flow():
    name = "asdfasd"
    # with_options returns a copy of the task registered under the new name
    new_task = test.with_options(name=f"my-task-{name}")
    a, b = new_task()
    print(f"a is {a}")
    print(f"b is {b}")

if __name__ == '__main__':
    test_flow()
09:36:34.717 | INFO    | prefect.engine - Created flow run 'bold-harrier' for flow 'test-flow'
09:36:35.562 | INFO    | Flow run 'bold-harrier' - Created task run 'my-task-asdfasd-f7d3c15a-0' for task 'my-task-asdfasd'
09:36:35.563 | INFO    | Flow run 'bold-harrier' - Executing 'my-task-asdfasd-f7d3c15a-0' immediately...
09:36:36.446 | INFO    | Task run 'my-task-asdfasd-f7d3c15a-0' - Finished in state Completed()
a is 1
b is 2
09:36:36.682 | INFO    | Flow run 'bold-harrier' - Finished in state Completed('All states completed.')
If, for example, you had a list of query names, you could iterate with a for loop and use with_options to create a new task for each query name.
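The loop over query names could look roughly like the sketch below. It is only an illustration, not Prefect's own API: `build_named_tasks` and `StandInTask` are hypothetical names. `StandInTask` imitates just the two behaviours used here (`with_options(name=...)` returning a renamed copy, and being callable) so the snippet runs without Prefect or a Snowflake connection; in a real flow you would pass an actual Prefect task instead.

```python
from dataclasses import dataclass, replace
from typing import Callable, Dict, Iterable


@dataclass(frozen=True)
class StandInTask:
    """Hypothetical stand-in for a Prefect task (NOT part of Prefect)."""

    fn: Callable
    name: str

    def with_options(self, *, name: str) -> "StandInTask":
        # Return a renamed copy and leave the original object untouched.
        return replace(self, name=name)

    def __call__(self, *args, **kwargs):
        return self.fn(*args, **kwargs)


def build_named_tasks(base_task, query_names: Iterable[str]) -> Dict[str, object]:
    """Create one renamed copy of base_task per query name."""
    return {
        query_name: base_task.with_options(name=f"{base_task.name}-{query_name}")
        for query_name in query_names
    }


# Assumption: the base task is called with a single SQL string.
base = StandInTask(fn=lambda sql: f"ran: {sql}", name="snowflake_query")
named = build_named_tasks(base, ["load-orders", "load-customers"])

for query_name, named_task in named.items():
    print(named_task.name, "->", named_task("select 1"))
```

The helper only relies on `with_options(name=...)`, so the same loop should apply to any task object that exposes that method; the query names used here are made up for the example.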

Sean Malone

02/03/2023, 4:07 PM
Thanks for the response. I am using prefect_snowflake.database.snowflake_query(), which is already a task. So I am unable to put it in my own decorated, “named” @task function, or I get the error:
RuntimeError: Tasks cannot be run from within tasks. Did you mean to call this task in a flow?
So I am forced to call it within the @flow function.
I am using an older version of prefect-snowflake (v0.2.1), but the release notes don’t indicate any major changes to snowflake_query() that would support my use case. Maybe release 0.2.4 makes my use case possible with .execute() instead of snowflake_query(), but I wanted to check here in the community before making dependency changes.

James Sopkin

02/03/2023, 4:17 PM
yes, tasks need to be called within a flow
Is there a reason why you would need to run a snowflake query in a task rather than a flow?

Sean Malone

02/03/2023, 4:20 PM
The only reason would be to take advantage of “re-naming” the task with def test(task_name='my-test-task'):

James Sopkin

02/03/2023, 4:20 PM
ahh I see. You could also use subflows and rename those as well

Sean Malone

02/03/2023, 4:22 PM
Okay cool I’ll explore using that approach 👍 ty
I think the subflow approach still suffers from the output:
11:26:51.961 | INFO    | Flow run 'dainty-orangutan' - Executing 'snowflake_query-asdfasdf-12' immediately...
Ideally 'snowflake_query-asdfasdf-12' can be renamed to something like 'snowflake_query-loading-some-data', but I understand this may not be possible given the nature of the library and how it “generates tasks”.

James Sopkin

02/03/2023, 4:34 PM
Hey Sean, you can generate a templated flow name by using with_options. It would be the same idea as the pseudocode outlined above, except with a flow:
>>> from prefect import flow
>>>
>>> @flow(name="My flow")
>>> def my_flow():
>>>     return 1
>>>
>>> query_name = "loading-some-data"  # example value
>>> new_flow = my_flow.with_options(name=f"My new flow {query_name}")

Sean Malone

02/03/2023, 4:39 PM
For my use case I am looking to do something like this:
from prefect import flow
from prefect_snowflake.database import SnowflakeConnector, snowflake_query

@flow()
def test_flow():
    snowflake_connector_block = SnowflakeConnector.load("my-snowflake-connector")

    # AttributeError: 'list' object has no attribute 'with_options'
    result = snowflake_query("select 1 as test", snowflake_connector_block).with_options(name="my_query_name")

    # ... Run more snowflake_query() with different names ...

    return result

    
if __name__ == "__main__":
    test_flow()
Note: this is an improper use of with_options()

James Sopkin

02/03/2023, 4:43 PM
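Ahh yes. So rather than applying options to a flow result, you would have to create a new flow with options. I'll clarify:
# wrong: with_options is called on the result of running the flow
new_flow = my_flow(my_params).with_options(name='new_name')
# correct: with_options is called on the flow object itself
new_flow = my_flow.with_options(name='new_name')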
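The same distinction presumably applies to a task such as snowflake_query: call `with_options` on the task object first, then call the renamed task with the query. The sketch below contrasts the two orderings. `StandInQueryTask` and `fake_snowflake_query` are hypothetical stand-ins (not part of prefect-snowflake) that only mimic a callable returning a list of rows, so the snippet runs without Prefect or a Snowflake connection.

```python
class StandInQueryTask:
    """Hypothetical stand-in for a query task (NOT the real snowflake_query)."""

    def __init__(self, name="snowflake_query"):
        self.name = name

    def with_options(self, *, name):
        # Return a renamed copy; the original task keeps its name.
        return StandInQueryTask(name=name)

    def __call__(self, query, connector=None):
        # Assumption: the query task returns rows as a list of tuples.
        return [(1,)]


fake_snowflake_query = StandInQueryTask()

# Wrong: with_options() is looked up on the returned list, not on the task.
try:
    fake_snowflake_query("select 1 as test").with_options(name="my_query_name")
except AttributeError as exc:
    error_message = str(exc)

# Correct: rename the task first, then call the renamed task.
named_query = fake_snowflake_query.with_options(
    name="snowflake_query-loading-some-data"
)
result = named_query("select 1 as test")

print(error_message)
print(named_query.name, result)
```

Inside a real flow, the equivalent call would be along the lines of snowflake_query.with_options(name="snowflake_query-loading-some-data")("select 1 as test", snowflake_connector_block), assuming the installed prefect-snowflake version exposes snowflake_query as a regular Prefect task.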

Sean Malone

02/03/2023, 4:53 PM
Ooh! I see the flaw in my approach now, my issue is resolved. Appreciate the assistance @James Sopkin 🙌