kumar
03/31/2021, 10:55 AM
Kevin Kho
ShellTask
to call those files. This can be found here: https://prefect-community.slack.com/archives/CL09KU1K7/p1616754226464500
Kevin Kho
from prefect import Flow
from prefect.tasks.shell import ShellTask

sql_list = ["1.sql", "2.sql", "3.sql"]

shell_task = ShellTask(
    helper_script="",
    shell="bash",
    log_stderr=True,
    return_all=True,
    stream_output=True,
)

with Flow(name="Example") as flow:
    # One shell task per SQL file. Note that echo only prints the command;
    # drop it to actually run the script.
    tasks = [
        shell_task(
            command="echo '/opt/prefect_env/bin/python /path/to//SnowSQL.py {}'".format(
                statement
            )
        )
        for statement in sql_list
    ]
    # Chain the tasks so each one runs after the previous one.
    for i in range(1, len(tasks)):
        tasks[i].set_upstream(tasks[i - 1])
kumar
03/31/2021, 2:49 PM
kumar
03/31/2021, 3:22 PM
Kevin Kho
kumar
03/31/2021, 4:24 PM
liren zhang
03/31/2021, 4:31 PM
Kevin Kho
.sql
files directly. I can probably add this functionality to the task library.
kumar
04/01/2021, 5:01 AM
Kevin Kho
Kevin Kho
Mahesh
04/05/2021, 5:10 PM
kumar
04/05/2021, 5:27 PM
Mahesh
04/07/2021, 6:01 AM
[2021-04-07 05:57:30+0000] INFO - prefect.TaskRunner | Task 'SnowflakeQueryFromFile': Starting task run...
[2021-04-07 05:57:31+0000] ERROR - prefect.TaskRunner | Unexpected error: 000006 (0A000): Multiple SQL statements in a single API call are not supported; use one API call per statement instead.
Traceback (most recent call last):
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 869, in get_task_run_state
    logger=self.logger,
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/utilities/executors.py", line 299, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/utilities/tasks.py", line 454, in method
    return run_method(self, *args, **kwargs)
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/tasks/snowflake/snowflake.py", line 227, in run
    raise error
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/tasks/snowflake/snowflake.py", line 220, in run
    executed = cursor.execute(query).fetchall()
  File "/venv/prefect-env/lib/python3.6/site-packages/snowflake/connector/cursor.py", line 693, in execute
    self.connection, self, ProgrammingError, errvalue
  File "/venv/prefect-env/lib/python3.6/site-packages/snowflake/connector/errors.py", line 258, in errorhandler_wrapper
    cursor.errorhandler(connection, cursor, error_class, error_value)
  File "/venv/prefect-env/lib/python3.6/site-packages/snowflake/connector/errors.py", line 195, in default_errorhandler
    cursor=cursor,
snowflake.connector.errors.ProgrammingError: 000006 (0A000): Multiple SQL statements in a single API call are not supported; use one API call per statement instead.
[2021-04-07 05:57:31+0000] INFO - prefect.TaskRunner | Task 'SnowflakeQueryFromFile': Finished task run for task with final state: 'Failed'
Query file can have multiple SQL statements.
Kevin Kho
Mahesh
04/12/2021, 12:35 PM
Kevin Kho
Kevin Kho
execute_string
from the Snowflake connector. This can execute multiple commands, but it's tricky to return anything (such as data from a query). Alternatively, I can leave it as it is, reading from a file, but then it can only run one query. What are your thoughts?
Kevin Kho
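As a rough sketch of the `execute_string` option: the Snowflake connector's `execute_string` runs each statement in the given text and returns one cursor per statement, so rows can be collected per statement. The helper name `run_sql_file` is hypothetical and not part of Prefect or the connector, and `connection` is assumed to be an object returned by `snowflake.connector.connect(...)`.

```python
from pathlib import Path
from typing import Any, List


def run_sql_file(connection: Any, path: str) -> List[list]:
    """Run every statement in a .sql file; return one list of rows per statement.

    Assumption: `connection` is a Snowflake connector connection, i.e. an
    object whose execute_string(sql_text) returns one cursor per statement.
    """
    sql_text = Path(path).read_text()
    # One call handles the whole file, unlike cursor.execute(),
    # which accepts a single statement.
    cursors = connection.execute_string(sql_text)
    # Collect the rows of each statement separately, in execution order.
    return [cursor.fetchall() for cursor in cursors]
```

With a real connection this might be called as `run_sql_file(conn, "1.sql")`. Which of the per-statement results are worth returning from a task is a design choice this sketch does not settle.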
Mahesh
04/12/2021, 2:46 PM
Kevin Kho
kumar
04/12/2021, 3:59 PM
Kevin Kho
Mahesh
04/13/2021, 6:07 AM
[2021-04-13 05:51:59+0000] ERROR - prefect.FlowRunner | Unexpected error: AttributeError("Can't get attribute 'SnowflakeQueriesFromFile' on <module 'prefect.tasks.snowflake.snowflake' from '/venv/prefect-env/lib/python3.6/site-packages/prefect/tasks/snowflake/snowflake.py'>",)
Traceback (most recent call last):
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 646, in get_flow_run_state
    for t in final_tasks
  File "/venv/prefect-env/lib/python3.6/site-packages/prefect/executors/dask.py", line 414, in wait
    return self.client.gather(futures)
  File "/venv/prefect-env/lib/python3.6/site-packages/distributed/client.py", line 1969, in gather
    asynchronous=asynchronous,
  File "/venv/prefect-env/lib/python3.6/site-packages/distributed/client.py", line 838, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/venv/prefect-env/lib/python3.6/site-packages/distributed/utils.py", line 351, in sync
    raise exc.with_traceback(tb)
  File "/venv/prefect-env/lib/python3.6/site-packages/distributed/utils.py", line 334, in f
    result[0] = yield future
  File "/venv/prefect-env/lib/python3.6/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/venv/prefect-env/lib/python3.6/site-packages/distributed/client.py", line 1828, in _gather
    raise exception.with_traceback(traceback)
  File "/venv/prefect-env/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
AttributeError: Can't get attribute 'SnowflakeQueriesFromFile' on <module 'prefect.tasks.snowflake.snowflake' from '/venv/prefect-env/lib/python3.6/site-packages/prefect/tasks/snowflake/snowflake.py'>
[2021-04-13 05:51:59+0000] ERROR - prefect.snow-test-file | Unexpected error occured in FlowRunner: AttributeError("Can't get attribute 'SnowflakeQueriesFromFile' on <module 'prefect.tasks.snowflake.snowflake' from '/venv/prefect-env/lib/python3.6/site-packages/prefect/tasks/snowflake/snowflake.py'>",)
Kevin Kho
pip install -e .
to have it in editable mode; otherwise changes will not be picked up. I suggest you just copy the task definition and move it into a folder where you keep your Tasks.
Mahesh
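The `AttributeError` in the traceback above follows from how pickle works: it stores a class by module path and name rather than by its code, so the Dask worker must be able to import the same class from the same module. A minimal stdlib-only illustration, where the module name `fake_tasks` is a hypothetical stand-in for an installed package that lacks the new task on the worker side:

```python
import pickle
import sys
import types

# Hypothetical stand-in for a package module (not a real Prefect module).
module = types.ModuleType("fake_tasks")
sys.modules["fake_tasks"] = module

# Create a class that claims to live in that module, as a library task would.
task_cls = type("SnowflakeQueriesFromFile", (), {"__module__": "fake_tasks"})
module.SnowflakeQueriesFromFile = task_cls

# The pickled bytes reference the class by module and name only.
payload = pickle.dumps(task_cls())

# Simulate a worker whose installed copy of the module lacks the class.
del module.SnowflakeQueriesFromFile

try:
    pickle.loads(payload)
    error_message = ""
except AttributeError as exc:
    # Unpickling fails because the class can no longer be looked up.
    error_message = str(exc)

print(error_message)
```

Keeping the task definition in a module of your own that the workers can import, or installing the modified checkout wherever the flow runs, should avoid this mismatch.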
04/13/2021, 2:41 PM
Mahesh
04/13/2021, 2:41 PM
Kevin Kho