# ask-community
r
Hello, I have this task:
@task
def extract():
    logger = get_run_logger()

    result = sf_operator.set_stage(url='S3://bucket/folder/working/',
                                   stage_location='EXTERNAL_TABLES.BACKUP_TABLE')
    [logger.info(log_message) for log_message in result]

    result = sf_operator.set_external_table(external_table='EXTERNAL_TABLES.BACKUP',
                                            stage_location='EXTERNAL_TABLES.BACKUP_TABLE')

    [logger.info(log_message) for log_message in result]
Here in sf_operator.set_stage, I am using
from prefect_snowflake.database import SnowflakeConnector, snowflake_query
to run a query that creates a stage in Snowflake. But I am getting:
RuntimeError: Tasks cannot be run from within tasks. Did you mean to call this task in a flow?
prefect.flow_runs
Finished in state Failed('Flow run encountered an exception. RuntimeError: Tasks cannot be run from within tasks. Did you mean to call this task in a flow?')
Can anyone please explain where I am using a task inside a task, and suggest a possible workaround? This worked fine in Prefect 1. Any help will be appreciated.
d
sf_operator.set_external_table
/
sf_operator.set_stage
one of these maybe?
r
@Deceivious
from prefect_snowflake.database import snowflake_query

def set_stage(url, stage_location, integration_name='INT', file_format='json', file_format_opts=[]):

    str_query = """ create or replace stage {stage_location}
                    storage_integration = {int_name}
                    url = {url}
                    file_format = (TYPE='{file_format}'{format_opts});
                """

    snowflake_query(str_query, snowflake_connector)
This is my code in that function. Similarly for
sf_operator.set_external_table
d
Try commenting out the list comprehensions that call the logger.
snowflake_query
is a task
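Since snowflake_query is itself a task, one possible restructuring is to keep the SQL construction in a plain function and call snowflake_query only from a flow. The sketch below shows just the plain-function half. `build_stage_query` is a hypothetical name that is not in the original code, and the quoting of the URL and the space-separated format options are assumptions about the intended statement.

```python
from typing import Optional, Sequence


def build_stage_query(
    url: str,
    stage_location: str,
    integration_name: str = "INT",
    file_format: str = "json",
    file_format_opts: Optional[Sequence[str]] = None,
) -> str:
    """Return the CREATE STAGE statement as a string; no query is run here.

    Hypothetical helper: it is a plain function, not a Prefect task, so it
    can be called from inside a task or a flow. Inputs are assumed trusted,
    since they are interpolated directly into the SQL text.
    """
    # Join extra format options, each preceded by a space (assumed layout).
    # None is used as the default to avoid a shared mutable default list.
    format_opts = "".join(f" {opt}" for opt in (file_format_opts or []))
    return (
        f"create or replace stage {stage_location} "
        f"storage_integration = {integration_name} "
        f"url = '{url}' "  # assumption: the URL is passed as a quoted literal
        f"file_format = (TYPE='{file_format}'{format_opts});"
    )


# In a flow (not inside another task), the string could then be passed on
# the same way the original code does:
#     snowflake_query(build_stage_query(...), snowflake_connector)
```

With this split, `extract` would no longer need to be a task that calls another task; the flow would call `snowflake_query` directly with the built string.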