    Paul Stark

    2 months ago
    I’m trying to run the following code snippet
    snowflake_query = SnowflakeQuery(
        max_retries=cfg.retry_max,
        retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
    )
    
    sql_queries = cfg.sql_query.split(';')
    
    with Flow(
        getenv('FLOW_NAME')
    ) as flow:
        run_snowflake_query = snowflake_query(
            account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
            user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
            password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
            warehouse=cfg.snowflake_warehouse,
        ).map(query=sql_queries)
    I believe sql_queries ends up as a list, since I can loop through it. Any thoughts on what I am doing incorrectly?
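A side note on the split(';') step, separate from the error discussed in this thread: when the SQL text ends with a semicolon, str.split returns a trailing empty or whitespace-only piece, and each piece becomes one mapped query. Below is a minimal sketch of a splitter that drops blank pieces. split_statements is a hypothetical helper, not part of Prefect, and it is deliberately naive: it does not handle semicolons inside string literals or comments.

```python
def split_statements(sql_text: str) -> list[str]:
    """Split on ';' and drop blank pieces (naive: ignores quoting and comments)."""
    pieces = (piece.strip() for piece in sql_text.split(";"))
    return [piece for piece in pieces if piece]


raw = "SELECT 1;\nSELECT 2;\n"
print(raw.split(";"))         # plain split keeps a trailing whitespace-only piece
print(split_statements(raw))  # ['SELECT 1', 'SELECT 2']
```

Something like sql_queries = split_statements(cfg.sql_query) would then feed only non-empty statements to the mapped task.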
    Kevin Kho

    2 months ago
    Hey Paul, you are doing 3 calls.
    snowflake_query = SnowflakeQuery(
        max_retries=cfg.retry_max,
        retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
    )
    is already an init and then
    snowflake_query(
            account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
            user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
            password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
            warehouse=cfg.snowflake_warehouse,
        ).map(query=sql_queries)
    is 2 more. I know the attempt here is to initialize with secrets, but you can’t do that because init is called during registration and secrets are fed in at runtime. Instead, pass the account, user, and password along with the map, wrapping each of them in unmapped.
    Also, when you get the chance, it would help if you could move the error to the thread to keep the main channel cleaner.
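To make the mapped versus unmapped distinction concrete, here is a toy sketch in plain Python. It is not Prefect's implementation: Unmapped, toy_map, and run_query are hypothetical names that only illustrate the idea that mapped arguments are iterated element by element while unmapped ones are passed whole to every call.

```python
from typing import Any, Callable


class Unmapped:
    """Marks a value that is passed whole to every mapped call (toy stand-in)."""

    def __init__(self, value: Any) -> None:
        self.value = value


def toy_map(fn: Callable[..., Any], **kwargs: Any) -> list[Any]:
    """Call fn once per element of the mapped kwargs, broadcasting Unmapped ones."""
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    constant = {k: v.value for k, v in kwargs.items() if isinstance(v, Unmapped)}
    # In this toy, zip pairs up the mapped arguments position by position.
    rows = zip(*mapped.values())
    return [fn(**dict(zip(mapped.keys(), row)), **constant) for row in rows]


def run_query(query: str, account: str) -> str:
    return f"{account}: {query}"


results = toy_map(
    run_query,
    query=["SELECT 1", "SELECT 2"],
    account=Unmapped("my_account"),  # hypothetical account name
)
print(results)  # ['my_account: SELECT 1', 'my_account: SELECT 2']
```

Without the Unmapped wrapper, the toy would try to iterate over the account string character by character, which is the kind of mistake the wrapper is meant to prevent.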
    Paul Stark

    2 months ago
    Moving here per request. Error message below 😃
    Task 'SnowflakeQuery': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/usr/local/lib/python3.10/dist-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/tasks.py", line 456, in method
        return run_method(self, *args, **kwargs)
      File "/usr/local/lib/python3.10/dist-packages/prefect/tasks/snowflake/snowflake.py", line 152, in run
        raise ValueError("A query string must be provided")
    ValueError: A query string must be provided
    Kevin Kho

    2 months ago
    Thanks! This error is from the call before the map
    Paul Stark

    2 months ago
    Thanks, but I have another process that uses the same logic and works correctly; it just doesn’t run multiple queries. And this process works if I run a single query without the map.
    Kevin Kho

    2 months ago
    Ah ok I was a bit off.
    snowflake_query = SnowflakeQuery(
        max_retries=cfg.retry_max,
        retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
    )
    
    with Flow(
        getenv('FLOW_NAME')
    ) as flow:
        run_snowflake_query = snowflake_query(
            query=query,
            account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
            user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
            password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
            warehouse=cfg.snowflake_warehouse,
        )
    will work because the task is called just once after init. But map is also a call, so it should be:
    with Flow(
        getenv('FLOW_NAME')
    ) as flow:
        run_snowflake_query = snowflake_query.map(
            query=list_of_queries,
            account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
            user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
            password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
            warehouse=unmapped(cfg.snowflake_warehouse),
        )
    Basically you don’t do:
    @task
    def abc(x):
        return x+1
    
    with Flow(..) as flow:
        abc().map([1,2,3])
    The right syntax is:
    with Flow(..) as flow:
        abc.map([1,2,3])
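The difference between the two forms can be sketched with a toy model in plain Python. This is an illustration of the behaviour described above, not Prefect's code: ToyFlow and ToyTask are hypothetical classes in which both calling a task and mapping it register a separate copy on the flow.

```python
class ToyFlow:
    """Collects every task copy that gets bound while building the flow."""

    def __init__(self) -> None:
        self.bound: list[tuple[str, dict]] = []


class ToyTask:
    def __init__(self, flow: ToyFlow) -> None:
        self.flow = flow

    def __call__(self, **kwargs) -> "ToyTask":
        # Calling the task binds one copy with exactly these arguments.
        self.flow.bound.append(("call", kwargs))
        return self

    def map(self, **kwargs) -> "ToyTask":
        # Mapping binds another copy; it does not reuse an earlier call.
        self.flow.bound.append(("map", kwargs))
        return self


wrong = ToyFlow()
ToyTask(wrong)().map(query=["SELECT 1", "SELECT 2"])
print(wrong.bound)  # two entries: an argument-less call, then the map

right = ToyFlow()
ToyTask(right).map(query=["SELECT 1", "SELECT 2"])
print(right.bound)  # one entry: just the map
```

In the wrong form, the extra argument-less copy is the one that would run with no query and no account, which matches the errors reported in this thread.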
    Paul Stark

    2 months ago
    @Kevin Kho thanks, that helps to clarify things! Using the above, I am still getting an error, this time about needing an account:
    snowflake_query = SnowflakeQuery(
        max_retries=cfg.retry_max,
        retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
    )
    
    sql_queries = cfg.sql_query.split(';')
    
    with Flow(
        getenv('FLOW_NAME')
    ) as flow:
        run_snowflake_query = snowflake_query().map(
              query=sql_queries,
              account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
              user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
              password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
              warehouse=cfg.snowflake_warehouse,
              )
    Error
    Task 'SnowflakeQuery': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/usr/local/lib/python3.10/dist-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/tasks.py", line 456, in method
        return run_method(self, *args, **kwargs)
      File "/usr/local/lib/python3.10/dist-packages/prefect/tasks/snowflake/snowflake.py", line 148, in run
        raise ValueError("An account must be provided")
    ValueError: An account must be provided
    Kevin Kho

    2 months ago
    Your error is from the first pair of parentheses:
    with Flow(
        getenv('FLOW_NAME')
    ) as flow:
        run_snowflake_query = snowflake_query().map(
              query=sql_queries,
              account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
              user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
              password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
              warehouse=cfg.snowflake_warehouse,
              )
    which attempts to call the task. It should just be:
    snowflake_query.map()
    Paul Stark

    2 months ago
    @Kevin Kho Success!!! Thank you very much for your help! Had to make 1 small change with the warehouse and was able to get a successful run. Posting final snippet for posterity 😃
    snowflake_query = SnowflakeQuery(
        max_retries=cfg.retry_max,
        retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
        warehouse=cfg.snowflake_warehouse,
    )
    
    sql_queries = cfg.sql_query.split(';')
    
    with Flow(
        getenv('FLOW_NAME')
    ) as flow:
        run_snowflake_query = snowflake_query.map(
              query=sql_queries,
              account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
              user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
              password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
              )
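The final snippet sets the warehouse at init and passes the query and secrets at run time. Here is a toy sketch of that split, assuming values stored at init act as defaults that run-time arguments can override. ToyQueryTask and the warehouse names are hypothetical and only illustrate the pattern; this is not the SnowflakeQuery source.

```python
from typing import Optional


class ToyQueryTask:
    """Toy task: init values are defaults, run() arguments override them."""

    def __init__(self, warehouse: Optional[str] = None) -> None:
        self.warehouse = warehouse

    def run(self, query: Optional[str] = None, warehouse: Optional[str] = None) -> str:
        # Fall back to the value stored at init when run() receives none.
        warehouse = warehouse if warehouse is not None else self.warehouse
        if not query:
            raise ValueError("A query string must be provided")
        return f"[{warehouse}] {query}"


task = ToyQueryTask(warehouse="reporting_wh")  # hypothetical warehouse name
print(task.run(query="SELECT 1"))                        # [reporting_wh] SELECT 1
print(task.run(query="SELECT 1", warehouse="adhoc_wh"))  # [adhoc_wh] SELECT 1
```

Plain configuration values fit at init under this pattern, while secrets stay in the run-time arguments because, as noted earlier in the thread, they are only available at run time.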
    Kevin Kho

    2 months ago