Sean Malone
02/03/2023, 3:07 PM
I'm using prefect-snowflake and was wondering if there is a way to “name” the task that spawns from calling snowflake_query(). Right now it generates a task in a format like snowflake_query-abcd123-00, and I'm wondering if this can be controlled to look like snowflake_query-<my_query_name>?
James Sopkin
02/03/2023, 3:32 PM
You could use @task(name="snowflake-query-query-name") or with_options to create a new task with the specific query name. Something like:
>>> @task(name="My task")
>>> def my_task():
>>>     return 1
>>>
>>> new_task = my_task.with_options(name="My new task")
from prefect import flow, task

@task
def test(task_name='my-test-task'):
    return 1, 2

@flow
def test_flow():
    name = "asdfasd"
    # with_options returns a renamed copy of the task; call the copy, not the original
    new_task = test.with_options(name=f"my-task-{name}")
    a, b = new_task()
    print(f"a is {a}")
    print(f"b is {b}")

if __name__ == '__main__':
    test_flow()
09:36:34.717 | INFO | prefect.engine - Created flow run 'bold-harrier' for flow 'test-flow'
09:36:35.562 | INFO | Flow run 'bold-harrier' - Created task run 'my-task-asdfasd-f7d3c15a-0' for task 'my-task-asdfasd'
09:36:35.563 | INFO | Flow run 'bold-harrier' - Executing 'my-task-asdfasd-f7d3c15a-0' immediately...
09:36:36.446 | INFO | Task run 'my-task-asdfasd-f7d3c15a-0' - Finished in state Completed()
a is 1
b is 2
09:36:36.682 | INFO | Flow run 'bold-harrier' - Finished in state Completed('All states completed.')
So you could use with_options for each query name.
Sean Malone
02/03/2023, 4:07 PM
I'm calling prefect_snowflake.database.snowflake_query(), which is already a task. So I am unable to put it in my own decorated “named” @task function, or I get the error:
RuntimeError: Tasks cannot be run from within tasks. Did you mean to call this task in a flow?
So I am forced to call it within the @flow function.
I'm on prefect-snowflake (v0.2.1), but the release notes don’t indicate any major changes to snowflake_query() that would support my use case. Maybe release 0.2.4 makes my use case possible with .execute() instead of snowflake_query(), but I wanted to check here in the community before making dependency changes.
James Sopkin
02/03/2023, 4:17 PM
Sean Malone
02/03/2023, 4:20 PM
def test(task_name='my-test-task'):
James Sopkin
02/03/2023, 4:20 PM
Sean Malone
02/03/2023, 4:22 PM
11:26:51.961 | INFO | Flow run 'dainty-orangutan' - Executing 'snowflake_query-asdfasdf-12' immediately...
Ideally 'snowflake_query-asdfasdf-12' could be renamed to something like 'snowflake_query-loading-some-data', but I understand this may not be possible given the nature of the library and how it “generates tasks”.
James Sopkin
02/03/2023, 4:34 PM
with_options.
>>> @flow(name="My flow")
>>> def my_flow():
>>>     return 1
>>>
>>> new_flow = my_flow.with_options(name=f"My new flow {query_name}")
Sean Malone
02/03/2023, 4:39 PM
from prefect import flow
from prefect_snowflake.database import SnowflakeConnector, snowflake_query

@flow()
def test_flow():
    snowflake_connector_block = SnowflakeConnector.load("my-snowflake-connector")
    # AttributeError: 'list' object has no attribute 'with_options'
    result = snowflake_query("select 1 as test", snowflake_connector_block).with_options(name="my_query_name")
    # ... Run more snowflake_query() with different names ...
    return result

if __name__ == "__main__":
    test_flow()
Note: this is an improper use of with_options()
James Sopkin
02/03/2023, 4:43 PM
# wrong
new_flow = flow(my_params).with_options
# correct
new_flow = flow.with_options(name='new_name')
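To illustrate why the order matters, here is a minimal sketch in plain Python. StandInTask and _run_query are hypothetical stand-ins invented for this example, not Prefect's actual Task class or the real snowflake_query; the only point is that with_options belongs to the task object, while calling the task returns ordinary data (here a list) that has no such method.

```python
# Hypothetical stand-in for a task object; NOT Prefect's real Task class.
class StandInTask:
    def __init__(self, fn, name=None):
        self.fn = fn
        self.name = name or fn.__name__

    def with_options(self, name=None):
        # Return a renamed copy; the original task object is left untouched.
        return StandInTask(self.fn, name=name or self.name)

    def __call__(self, *args, **kwargs):
        # Calling the task runs the wrapped function and returns its plain result.
        return self.fn(*args, **kwargs)


def _run_query(query):
    # Pretend query result (assumption): a list of rows.
    return [(1,)]


snowflake_query = StandInTask(_run_query, name="snowflake_query")

# Wrong: the call happens first, so with_options is looked up on the returned list.
try:
    snowflake_query("select 1 as test").with_options(name="my_query_name")
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

# Correct: rename the task object first, then call the renamed copy.
named_query = snowflake_query.with_options(name="snowflake_query-loading-some-data")
rows = named_query("select 1 as test")
```

With the real library the same ordering would presumably be snowflake_query.with_options(name="...")(query, snowflake_connector_block) inside the flow, since with_options is a method on Prefect task objects. That has not been checked against prefect-snowflake v0.2.1 here.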
Sean Malone
02/03/2023, 4:53 PM