https://prefect.io logo
Docs
Join the conversationJoin Slack
Channels
announcements
ask-marvin
best-practices-coordination-plane
data-ecosystem
data-tricks-and-tips
events
find-a-prefect-job
geo-australia
geo-bay-area
geo-berlin
geo-boston
geo-chicago
geo-colorado
geo-dc
geo-israel
geo-japan
geo-london
geo-nyc
geo-seattle
geo-texas
gratitude
introductions
marvin-in-the-wild
prefect-ai
prefect-aws
prefect-azure
prefect-cloud
prefect-community
prefect-contributors
prefect-dbt
prefect-docker
prefect-gcp
prefect-getting-started
prefect-integrations
prefect-kubernetes
prefect-recipes
prefect-server
prefect-ui
random
show-us-what-you-got
Powered by Linen
prefect-community
  • n

    Nikolaus Landgraf

    07/06/2022, 2:12 PM
    Hi, we just witnessed an error when running a prefect task locally on a windows 11 machine. This error is not popping up on mac. Do you know how to fix that?
    sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: json_each
    [SQL: INSERT INTO flow_run_notification_queue (flow_run_notification_policy_id, flow_run_state_id) SELECT flow_run_notification_policy.id, CAST(? AS CHAR(36)) AS anon_1
    FROM flow_run_notification_policy
    WHERE flow_run_notification_policy.is_active IS 1 AND (flow_run_notification_policy.state_names = ? OR EXISTS (SELECT 1
    FROM json_each(flow_run_notification_policy.state_names) AS json_each
    WHERE json_each.value IN (?))) AND (flow_run_notification_policy.tags = ? OR EXISTS (SELECT 1
    FROM json_each(flow_run_notification_policy.tags) AS json_each
    WHERE json_each.value IN (SELECT 1 FROM (SELECT 1) WHERE 1!=1)))]
    [parameters: ('8d01934d-c9c8-4f23-a3af-ec03ee1586a0', '[]', 'Pending', '[]')]
    (Background on this error at: <https://sqlalche.me/e/14/e3q8>)
    m
    m
    k
    • 4
    • 11
  • m

    Muddassir Shaikh

    07/06/2022, 2:15 PM
    Hi i am unable to start my Prefect Server, getting below error:
    graphql_1   | ValueError: Uniqueness violation.
    graphql_1   |
    graphql_1   | The above exception was the direct cause of the following exception:
    graphql_1   |
    graphql_1   | Traceback (most recent call last):
    graphql_1   |   File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 674, in await_completed
    graphql_1   |     return await completed
    graphql_1   |   File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 659, in await_result
    graphql_1   |     return_type, field_nodes, info, path, await result
    graphql_1   |   File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 733, in complete_value
    graphql_1   |     raise result
    graphql_1   |   File "/usr/local/lib/python3.7/site-packages/graphql/execution/execute.py", line 628, in await_result
    graphql_1   |     return await result
    graphql_1   |   File "/prefect-server/src/prefect_server/graphql/extensions.py", line 52, in resolve
    graphql_1   |     result = await result
    graphql_1   |   File "/prefect-server/src/prefect_server/graphql/tenants.py", line 14, in resolve_create_tenant
    graphql_1   |     "id": await api.tenants.create_tenant(name=input["name"], slug=input["slug"])
    graphql_1   |   File "/prefect-server/src/prefect_server/api/tenants.py", line 29, in create_tenant
    graphql_1   |     tenant_id = await models.Tenant(name=name, slug=slug).insert()
    graphql_1   |   File "/prefect-server/src/prefect_server/database/orm.py", line 222, in insert
    graphql_1   |     insert_mutation_name=self.__root_fields__.get("insert"),
    graphql_1   |   File "/prefect-server/src/prefect_server/database/hasura.py", line 237, in insert
    graphql_1   |     result = await self.execute_mutations_in_transaction(mutations=[graphql])
    graphql_1   |   File "/prefect-server/src/prefect_server/database/hasura.py", line 165, in execute_mutations_in_transaction
    graphql_1   |     as_box=as_box,
    graphql_1   |   File "/prefect-server/src/prefect_server/database/hasura.py", line 89, in execute
    graphql_1   |     raise ValueError("Uniqueness violation.")
    graphql_1   | graphql.error.graphql_error.GraphQLError: Uniqueness violation.
    graphql_1   |
    graphql_1   | GraphQL request:2:3
    graphql_1   | 1 | mutation ($input: create_tenant_input!) {
    graphql_1   | 2 |   create_tenant(input: $input) {
    graphql_1   |   |   ^
    graphql_1   | 3 |     id
    k
    • 2
    • 29
  • v

    Vadym Dytyniak

    07/06/2022, 2:23 PM
    Hi. Is it possible to specify disk space for KubernetesRun run config?
    k
    • 2
    • 2
  • i

    Ibrahim Sherif

    07/06/2022, 3:47 PM
    Hello How can I rename a task output ? Is it possible to use with it a task nout=3 ?
    k
    • 2
    • 10
  • k

    kevin

    07/06/2022, 5:34 PM
    Hey guys, I want to pull down a
    .csv
    file from a google cloud bucket. It seems like this task is the correct way to go? An alternative I can see is just implementing a python callable that uses the python library for google cloud storage. I'd like some feedback if I'm approaching this problem correctly?
    k
    • 2
    • 4
  • a

    Amogh Kulkarni

    07/06/2022, 6:21 PM
    Hi Team. In prefect 2.0, is there a way to visualize the flows like how we could do in prefect 1.0 with flow.visualize()?
    ✅ 1
    k
    m
    a
    • 4
    • 9
  • j

    Josh Paulin

    07/06/2022, 6:50 PM
    Hello. I’m trying to figure out why my configured results don’t seem to be respected/used for caching between flow runs. I have a task decorated to use checkpointing. On each run of the flow, I see the task complete with a
    Success
    state, and the results file has been overwritten. If I’m understanding correctly this should be completing with
    Cached
    . Any suggestions?
    k
    • 2
    • 2
  • j

    Jon Ruhnke

    07/06/2022, 7:19 PM
    Is the "sql_server" optional dependency (for writing to MSSQL) available in Prefect 2.0? If not, what are alternatives, do I just stick with 1.0 until things like that get implemented into 2.0, or is there a new way to go about connecting to MSSQL?
    k
    • 2
    • 2
  • k

    Kyle McChesney

    07/06/2022, 8:46 PM
    is it possible to configure the ecs agent so that it sets the flow run container command to include
    -l
    (I want a login shell). Seems like currently its passing the command like so
    overrides
        containerOverrides
            command": [
                "/bin/sh",
                "-c",
                "prefect execute flow-run"
            ]
    Also, possible or not, is this command stable / is it safe to customize this?
    k
    • 2
    • 9
  • r

    Ryan Sattler

    07/07/2022, 4:37 AM
    In Prefect 2, is the default storage option set per-workspace or the same across all workspaces?
    ✅ 3
    a
    t
    • 3
    • 3
  • y

    yu zeng

    07/07/2022, 5:16 AM
    hi, i got a flow success event before the time all task really successed. in the bellow pic, i receive a flow success at 1.09 pm , but still got a task success at 1.13 pm. is this a bug?
    ✅ 1
    a
    • 2
    • 1
  • s

    Shivam Bhatia

    07/07/2022, 6:34 AM
    Hi what is the alternative for upstream_tasks in prefect 2.0
    ✅ 1
    a
    • 2
    • 1
  • a

    Alvaro Durán Tovar

    07/07/2022, 7:33 AM
    Hi! I have a flow that executes a
    RunNamespacedJob
    task. Because of a known bug the way I'm obtaining the logs are via
    ReadNamespacedPodLogs
    . All that working well. Recently I changed the pod spec to run the application as an
    initContainer
    then some docker image to capture results as the actual
    container
    . So I have an image that generates content and logs in the
    ìnitContainer
    and a
    container
    that process the produced content (upload to gcs basically). Problem: the logs from the initContainer are not being captured by prefect, only the logs from the container. Any recommendation? 🙏
    ✅ 1
    a
    • 2
    • 16
  • r

    Rainer Schülke

    07/07/2022, 8:04 AM
    Good morning 🙂 So I have been working on this really extensive ETL and finally uploaded it to the cloud but there it is failing... I have a parameter like this:
    weeks = Parameter("weeks", required=False, default=None)
    Afterwards there is a case block where either the last actual weeks is getting pulled or set to the provided Parameter. Locally it's working like a charm but when I want to execute the flow within the cloud, I get an error for the week task:
    The following error messages were provided by the GraphQL server: INTERNAL_SERVER_ERROR: Variable "$input" got invalid value null at "input.states[0].task_run_id"; Expected non-nullable type UUID! not to be null. The GraphQL query was: mutation($input: set_task_run_states_input!) { set_task_run_states(input: $input) { states { status message id } } } The passed variables were: {"input": {"states": [{"state": {"context": {"tags": []}, "cached_inputs": {}, "message": "Starting task run.", "_result": {"__version__": "0.14.10", "type": "NoResultType"}, "__version__": "0.14.10", "type": "Running"}, "task_run_id": null, "version": null}]}}
    Task 'weeks': Finished task run for task with final state: 'ClientFailed' Do you know what might be the problem here? Why is it stated that I got a null value? It should be None.
    ✅ 2
    a
    • 2
    • 10
  • d

    Dennis Hinnenkamp

    07/07/2022, 9:06 AM
    Hi, I'm trying to list files from a S3 bucket but don't know how to get the result of the task. Can anyone help here? I would have thought to get an array of file names instead of a task object.
    ✅ 1
    a
    • 2
    • 4
  • c

    Christian Vogel

    07/07/2022, 10:59 AM
    Hi Prefect Community, I am currently receiving an error when using a local (on my pc) prefect agent which pulls from a work queue in the the prefect cloud. The error only occurs when I try to use the RayTaskRunner:
    (begin_task_run pid=141324) ImportError: cannot import name 'SubprocessFlowRunner' from partially initialized module 'prefect.flow_runners' (most likely due to a circular import) (/home/christian/Documents/ray_and_prefect/env/lib/python3.9/site-packages/prefect/flow_runners/__init__.py)
    I am using the following dependencies: prefect==2.0b7 prefect-ray==0.1.0 ray==1.13.0 Apparently I am doing something wrong with my dependencies or when I am importing them. Do you have any idea?
    ✅ 1
    a
    m
    • 3
    • 12
  • m

    Marcin Grzybowski

    07/07/2022, 12:10 PM
    Hi, i'm trying to run flow locally from ide (vs code) and i got this: what have I done wrong ?
    ✅ 1
    a
    • 2
    • 9
  • r

    Robert Kowalski

    07/07/2022, 1:06 PM
    Hi, quick question, why
    map
    function fail? and works when
    map
    was replace with
    bind
    ?
    from prefect import task, Flow, unmapped
    
    
    @task
    def first():
        return range(10)
    
    
    @task
    def second():
        return True
    
    
    @task
    def third(numbers):
        print(numbers)
    
    
    with Flow(name='test') as flow:
        numbers = first()
        second_result = second()
    
        # third.bind(numbers=numbers, upstream_tasks=[second_result])
        third.map(numbers=unmapped(numbers), upstream_tasks=[second_result])
    
    flow.run()
    k
    • 2
    • 23
  • k

    Kyle McChesney

    07/07/2022, 2:38 PM
    Is there a docker-compose.yml file out there for running prefect server. We are currently on 0.15 and I want to upgrade to 1.2.3. When I first set up server there was just a single compose file, I can’t seem to find that anymore
    m
    • 2
    • 2
  • k

    Keith Veleba

    07/07/2022, 4:29 PM
    Hello! Running into an issue I don't fully understand. I have a series of flows that have been running for a couple of months without incident. We're running all of our production on 1.2.0-python3.8 for the ECS Agent and for all ECSRun instances. Our flows use S3 storage. Nothing has been purposely or visibly changed with our Prefect installation- no new flow deployments, or changes to our AWS infrastructure. Our flows, which do little more than kick off AWS Batch jobs via the BatchSubmit task are now consistently failing with the following error on the BatchSubmit invocation:
    Unexpected error: AttributeError("'S3Result' object has no attribute 'upload_options'")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 930, in get_task_run_state
        result = self.result.write(value, **formatting_kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/s3_result.py", line 89, in write
        ExtraArgs=self.upload_options,
    AttributeError: 'S3Result' object has no attribute 'upload_options'
    Attached is one of the flows that are failing. Does the task running code record execution state back to the storage bucket? Thanks in advance!
    run_delta_load_batch_job.py
    k
    • 2
    • 25
  • b

    Britt Evans

    07/07/2022, 4:35 PM
    Hi, in 2.0 I was wondering if it’s possible for the scheduler to prevent a single flow from “Running” concurrently? For example, if I use an IntervalSchedule with an interval of 5 minutes, and my first flow run takes 8 minutes, can the scheduler skip or wait until the first flow finishes before starting the new flow run?
    k
    r
    • 3
    • 7
  • r

    Rajvir Jhawar

    07/07/2022, 6:55 PM
    Prefect 2 question here: Are an agent and work queue necessary in order to run a k8 job? For instance if i am using prefect cloud and create a deployment with kubernetes flow runner. Running the flow via the GUI or using "deployment run" from vm/local machine without an agent and queue setup results in no flow being run and a state of late. The prefect docs just say you need: Remote storage and
    kubectl
    configured to connect to a cluster
    k
    • 2
    • 3
  • x

    Xavier Witdouck

    07/07/2022, 7:04 PM
    Hi all, how do you access the labels of the agent running a flow at runtime? I assumed there would be something in the context, but that doesn’t seem to be the case? Is there any agent level information you can access from inside a flow or task?
    k
    • 2
    • 1
  • m

    Matthew Seligson

    07/07/2022, 7:18 PM
    I have a running flow where a number of tasks are in a paused state. I’d like to programmatically approve all of them. What’s the best way to do that? I find that when I programmatically set all of their states to resume, I get varying behavior. Sometimes the tasks stay resumed and never get running. Any ideas why?
    k
    • 2
    • 20
  • m

    Michal Baumgartner

    07/07/2022, 7:29 PM
    Hi, I'm going through v2 docs and was wondering if module storage from v1 (
    prefect.storage.Module
    ) would be available in the future releases. We're running Prefect + Dask setup on k8s from a monorepo (i.e. agents and dask workers deployments run the same docker image & tag), where flows are sourced from Python modules (and registered before agents start up during a new release), therefore I'd prefer to not use any object storage for storing flows or results if possible
    ➕ 1
    ✅ 1
    m
    m
    • 3
    • 7
  • a

    Adam

    07/07/2022, 7:46 PM
    hey folks, I have a hopefully rather simple question also prefect noob. I’m trying to test the cacheing/restart features. For simplicity, I have 5 csv’s I’m downloading within 1 task. If the first csv is successful but then the task fails on the 2nd csv, I would like to restart the task where it left off and not need to re-download the 1st csv. From my understanding I need to leverage the results feature? I understand there’s a parameter for max retries but when retrying, is there any configuration needed to inform the task to start from the 2nd csv? Also if the max retries fails, I’d like the ability to re-start manually (presumably after fixing x bug that stopped the 2nd csv from downloading) again without having to re-download the 1st csv
    k
    a
    • 3
    • 6
  • a

    Alex Tam

    07/07/2022, 7:54 PM
    So I've been using Prefect Cloud 2.0 on my personal account for a few weeks now, and we got our company account setup earlier this week. I can't figure out to login to beta.prefect.io using the company account. I can switch between mine and the companies with normal prefect.io. Does it need to be enabled and/or is it like that because its still in beta?
    ✅ 1
    k
    n
    • 3
    • 2
  • r

    redsquare

    07/07/2022, 7:58 PM
    Is the new 2.0 release coming today?
    ✅ 1
    k
    • 2
    • 1
  • i

    Ifeanyi Okwuchi

    07/07/2022, 8:00 PM
    Hello all, I'm having difficulties figuring out what this error means
    Unexpected error: KeyError(0)
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
        executors.prepare_upstream_states_for_mapping(
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 682, in prepare_upstream_states_for_mapping
        value = upstream_state.result[i]
    KeyError: 0
    k
    • 2
    • 4
  • p

    Paul Stark

    07/07/2022, 10:08 PM
    I’m trying to run the following code snippet
    snowflake_query = SnowflakeQuery(
        max_retries=cfg.retry_max,
        retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
    )
    
    sql_queries = cfg.sql_query.split(';')
    
    with Flow(
        getenv('FLOW_NAME')
    ) as flow:
        run_snowflake_query = snowflake_query(
            account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
            user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
            password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
            warehouse=cfg.snowflake_warehouse,
        ).map(query=sql_queries)
    sql_queries is getting put into a list since I can loop through it….Any thoughts on what I am doing incorrectly?
    k
    • 2
    • 11
Powered by Linen
Title
p

Paul Stark

07/07/2022, 10:08 PM
I’m trying to run the following code snippet
snowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)

sql_queries = cfg.sql_query.split(';')

with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query(
        account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
        user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
        password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
        warehouse=cfg.snowflake_warehouse,
    ).map(query=sql_queries)
sql_queries is getting put into a list since I can loop through it….Any thoughts on what I am doing incorrectly?
k

Kevin Kho

07/07/2022, 10:12 PM
Hey Paul, you are doing 3 calls.
snowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)
is already an init and then
snowflake_query(
        account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
        user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
        password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
        warehouse=cfg.snowflake_warehouse,
    ).map(query=sql_queries)
is 2 more. I know the attempt here is to initialize with secrets but you can’t do that because init is called during registration and secrets are fed in during runtime. Instead, you pass the account, user, and password along with the map by using
unmapped
Also, when you get the chance, it would help if you could move the error to the thread to keep the main channel cleaner
p

Paul Stark

07/07/2022, 10:14 PM
Moving here per request. Error message below :-)
Task 'SnowflakeQuery': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/tasks/snowflake/snowflake.py", line 152, in run
    raise ValueError("A query string must be provided")
ValueError: A query string must be provided
k

Kevin Kho

07/07/2022, 10:15 PM
Thanks! This error is from the call before the map
p

Paul Stark

07/07/2022, 10:16 PM
Thanks….but I have another process that uses the same logic that works correctly just not trying to run multiple queries….And this process works if I just try a single query without the map
k

Kevin Kho

07/07/2022, 10:19 PM
Ah ok I was a bit off.
snowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)

with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query(
        query=query,
        account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
        user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
        password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
        warehouse=cfg.snowflake_warehouse,
    )
will work because it’s just called once after init, but the map is a call also so it should be:
with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query.map(
        query=list_of_queries,
        account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
        user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
        password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
        warehouse=unmapped(cfg.snowflake_warehouse),
    )
Basically you don’t do:
@task
def abc(x):
    return x+1

with Flow(..) as flow:
    abc().map([1,2,3])
The right syntax is:
with Flow(..) as flow:
    abc.map([1,2,3)
p

Paul Stark

07/07/2022, 10:42 PM
@Kevin Kho thanks for the clarification…that helps to clarify things! Using the above I am still getting an error this time about needing an account
snowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
)

sql_queries = cfg.sql_query.split(';')

with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query().map(
          query=sql_queries,
          account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
          user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
          password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
          warehouse=cfg.snowflake_warehouse,
          )
Error
Task 'SnowflakeQuery': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/tasks/snowflake/snowflake.py", line 148, in run
    raise ValueError("An account must be provided")
ValueError: An account must be provided
k

Kevin Kho

07/07/2022, 10:46 PM
Your error is from the first parenthesis:
with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query().map(
          query=sql_queries,
          account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
          user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
          password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
          warehouse=cfg.snowflake_warehouse,
          )
which attempts to call the task. It should just be:
snowflake_query.map()
p

Paul Stark

07/07/2022, 11:02 PM
@Kevin Kho Success!!! Thank you very much for your help! Had to make 1 small change with the warehouse and was able to get a successful run. Posting final snippet for posterity :-)
snowflake_query = SnowflakeQuery(
    max_retries=cfg.retry_max,
    retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
    warehouse=cfg.snowflake_warehouse,
)

sql_queries = cfg.sql_query.split(';')

with Flow(
    getenv('FLOW_NAME')
) as flow:
    run_snowflake_query = snowflake_query.map(
          query=sql_queries,
          account=unmapped(PrefectSecret("SNOWFLAKE_ACCOUNT")),
          user=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}")),
          password=unmapped(PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}")),
          )
k

Kevin Kho

07/07/2022, 11:03 PM
Nice!
:thank-you: 1
View count: 6