Rohan Chutke
06/22/2023, 1:23 PM
@task
def extract():
    logger = get_run_logger()
    result = sf_operator.set_stage(url='S3://bucket/folder/working/',
                                   stage_location='EXTERNAL_TABLES.BACKUP_TABLE')
    for log_message in result:
        logger.info(log_message)
    result = sf_operator.set_external_table(external_table='EXTERNAL_TABLES.BACKUP',
                                            stage_location='EXTERNAL_TABLES.BACKUP_TABLE')
    for log_message in result:
        logger.info(log_message)
Inside sf_operator.set_stage, I use snowflake_query (from prefect_snowflake.database import SnowflakeConnector, snowflake_query)
to run a query that creates a stage in Snowflake.
But I am getting:
RuntimeError: Tasks cannot be run from within tasks. Did you mean to call this task in a flow?
prefect.flow_runs
Finished in state Failed('Flow run encountered an exception. RuntimeError: Tasks cannot be run from within tasks. Did you mean to call this task in a flow?')
Can anyone please explain where I am using a task inside a task, and suggest a possible workaround? This worked fine in Prefect 1.
Any help will be appreciated.
Deceivious
06/22/2023, 1:37 PM
sf_operator.set_external_table / sf_operator.set_stage
Deceivious
06/22/2023, 1:37 PM
Rohan Chutke
06/22/2023, 1:57 PM
from prefect_snowflake.database import snowflake_query

def set_stage(url, stage_location, integration_name='INT', file_format='json', file_format_opts=[]):
    str_query = """ create or replace stage {stage_location}
        storage_integration = {int_name}
        url = {url}
        file_format = (TYPE='{file_format}'{format_opts});
    """
    snowflake_query(str_query, snowflake_connector)
This is my code in that function. sf_operator.set_external_table is similar.
Deceivious
06/22/2023, 1:58 PM
Deceivious
06/22/2023, 2:00 PM
snowflake_query is a task
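Since snowflake_query is itself a Prefect task, one possible workaround is to keep the SQL building in a plain function and call snowflake_query from the flow body instead of from inside extract. The sketch below is only an illustration of that idea, not the code from this thread: build_stage_query, make_stage_flow and the block name passed to SnowflakeConnector.load are hypothetical, and quoting the url value is an assumption about what the stage statement needs.

```python
def build_stage_query(stage_location, url, integration_name="INT",
                      file_format="json", format_opts=""):
    """Plain helper (not a task): render the CREATE STAGE statement."""
    return (
        f"create or replace stage {stage_location}\n"
        f"  storage_integration = {integration_name}\n"
        f"  url = '{url}'\n"  # quoting the URL is an assumption
        f"  file_format = (TYPE='{file_format}'{format_opts});"
    )


def make_stage_flow(connector_block_name):
    """Build a flow that runs the stage query (hypothetical helper)."""
    # Imported here so the helper above can be used without Prefect installed.
    from prefect import flow
    from prefect_snowflake.database import SnowflakeConnector, snowflake_query

    @flow
    def set_stage_flow(url, stage_location):
        # Load a previously saved SnowflakeConnector block by name.
        connector = SnowflakeConnector.load(connector_block_name)
        query = build_stage_query(stage_location, url)
        # snowflake_query is a task, so it is called from the flow body,
        # not from inside another @task function.
        return snowflake_query(query, connector)

    return set_stage_flow
```

With this layout, extract would no longer call sf_operator.set_stage from inside a task; the flow calls the query task directly, for example make_stage_flow("my-snowflake-connector")(url, stage_location) with a block name of your own.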