https://prefect.io logo
Join the conversationJoin Slack
Channels
announcements
ask-marvin
best-practices-coordination-plane
data-ecosystem
data-tricks-and-tips
events
find-a-prefect-job
geo-australia
geo-bay-area
geo-berlin
geo-boston
geo-chicago
geo-colorado
geo-dc
geo-israel
geo-japan
geo-london
geo-nyc
geo-seattle
geo-texas
gratitude
introductions
marvin-in-the-wild
prefect-ai
prefect-aws
prefect-azure
prefect-cloud
prefect-community
prefect-contributors
prefect-dbt
prefect-docker
prefect-gcp
prefect-getting-started
prefect-integrations
prefect-kubernetes
prefect-recipes
prefect-server
prefect-ui
random
show-us-what-you-got
Powered by Linen
prefect-community
  • f

    Florian Kühnlenz

    11/08/2022, 2:19 PM
    hi, is there an good way to have exponential delay in a task retry (v1)?
    m
    • 2
    • 2
  • g

    Gordon Silvera

    11/08/2022, 2:51 PM
    Hi, I'm hitting a recurring error when trying to run DbtShellTask:
    AttributeError: module 'prefect.tasks' has no attribute 'dbt'
    . I've tried to run this locally and within the jupyter-scipy Docker image. Any thoughts why Prefect cannot find the module?
    # dbt_shell_task.py
    
    import sys
    import prefect
    from prefect import task, flow, get_run_logger
    
    
    @flow
    def dbt_flow(cmd='dbt run'):
    
        # Execute specified command
        task = prefect.tasks.dbt.dbt.DbtShellTask(
            command=cmd,
            profile_name='default',
            environment='Development',
            dbt_kwargs={'type': 'bigquery'},
            overwrite_profiles=False,
            profiles_dir='/home/jovyan/.dbt/profiles.yml'
        )
        logger = get_run_logger()
        <http://logger.info|logger.info>("Command Run: %s!", name)
        return task
    
    if __name__ == "__main__":
        cmd = sys.argv[1]
        dbt_flow(cmd)
    # packages
    prefect==2.6.6
    prefect-dbt==0.2.4
    prefect-shell==0.1.3
    dbt-bigquery==1.3.0
    dbt-core==1.3.0
    dbt-extractor==0.4.1
    ✅ 1
    j
    • 2
    • 2
  • j

    Jared Robbins

    11/08/2022, 3:45 PM
    Is there a best practice for removing data in the orion db older than x amount of time?
    ✅ 1
    ➕ 2
    j
    m
    • 3
    • 2
  • a

    Axel Sundberg

    11/08/2022, 3:55 PM
    Hi! I'm on Prefect 1.2 and use function tasks with
    @task
    annotation. I have defined two tasks in a flow like this:
    @task(name="Backup product catalogue", log_stdout=True)
    def backup_task():
        backup_product_catalogue_to_gcs()
    
    
    @task(name="Update Product Catalogue", log_stdout=True, trigger=triggers.all_finished)
    def upsert_pack_products_task():
        upsert_pack_products()
    
    
    with Flow(
        name="Product catalogue",
        run_config=get_run_config(),
        storage=get_storage("product_catalogue/flow.py"),
    ) as product_catalogue_flow:
        backup_task()
        upsert_pack_products_task(upstream_tasks=[backup_task])
    I renamed the function name
    upsert_pack_products_task
    and re-deployed. Now I can see two
    Backup product catalogue
    in my flow on Prefect and they both run (see image) when the flow is triggered. How can I remove that extra task? And how can I debug what is going on?
    ✅ 2
    • 1
    • 1
  • s

    Sunjay

    11/08/2022, 4:13 PM
    Can we set a task to success state, ie. based on a condition execute a task else set it to success and trigger the downstream?
    ✅ 1
    t
    j
    • 3
    • 4
  • j

    James Brady

    11/08/2022, 5:19 PM
    Hello all, I'm running a Prefect flow in a Kubernetes cluster via DaskTaskRunner. My task is erroring out with
    Crash detected! Execution was interrupted by an unexpected exception.
    (full logs in thread). I'm trying to figure out the best way to debug this further. In general – and to help diagnose this particular case – I'd like to shuttle logs to our DataDog instance: I found this discussion on the topic, but is there something more recent / more complete for Prefect v2? Aside from getting logs out, any other ideas for diagnosing this terse error?
    m
    • 2
    • 3
  • m

    Madison Schott

    11/08/2022, 5:23 PM
    Getting this error with a fivetran task, even though I defined it exactly as in the docs (https://github.com/fivetran/prefect-fivetran). Any ideas?
    sync = await fivetran_sync_flow(
                    ^
    SyntaxError: 'await' outside async function
    ✅ 1
    r
    • 2
    • 5
  • b

    Blake Stefansen

    11/08/2022, 7:27 PM
    Hey everyone, I seem to be getting an
    Orion logging error
    with 2.6.6 for our k8 jobs Will post logs in the thread
    m
    • 2
    • 3
  • d

    David Beck

    11/08/2022, 8:08 PM
    Hi all! I was curious if there is anything in the works for creating add-ons to the prefect_azure collection for interacting/accessing Azure Key Vaults. I know I can go ahead and create it myself, though I didn't want to reinvent the wheel if it was forthcoming. 🙂
    ✅ 1
    r
    j
    • 3
    • 15
  • p

    Paco Ibañez

    11/08/2022, 10:43 PM
    Hello! I have added a slack notification for my flow but when the flow times out I don't get the notification, what state does timed out map to? I have selected all states from the notification menu
    a
    • 2
    • 2
  • r

    Rohith

    11/09/2022, 8:40 AM
    who can I talk to with a query regarding prefect-sqlalchemy
    ✅ 1
    j
    • 2
    • 1
  • p

    Pierre Monico

    11/09/2022, 9:41 AM
    Side question: is this still the official Slack for Prefect after moving to 2.0? I was here before but not sure where to ask those questions after we migrate 🙂
    ✅ 1
    o
    j
    • 3
    • 6
  • t

    Thomas Pedersen

    11/09/2022, 10:05 AM
    When flows crash (according to their logs), Orion UI keeps showing the flows as running, and their run time keeps increasing. If the flow isn't running on the agent, shouldn't it show as "Failed" and not running? Still looking into what actually causes the crash - this is more about how Prefect handles them. We do have flows that runs successful as well, this is just a single flow or two crashing every time.
    t
    m
    • 3
    • 4
  • d

    Dan Wise

    11/09/2022, 11:48 AM
    Hi does anyone know in Prefect 2 why there would be a 30 second delay between the agent picking up a flow run and starting to execute it? Many thanks.
    ✅ 1
    t
    t
    +2
    • 5
    • 11
  • t

    Toshali Narula

    11/09/2022, 2:20 PM
    Hi, is there any way to add description of a task in schematic (flow visualisation)
    b
    k
    • 3
    • 4
  • r

    Revanth

    11/09/2022, 2:33 PM
    Hi , is there a way to deploy multiple data pipelines(one for daily and other pipeline for weekly etc) , would appreciate the help , thanks in advance
    ✅ 1
    p
    • 2
    • 14
  • v

    Vadym Dytyniak

    11/09/2022, 3:09 PM
    Hi. What I should use instead of wait_for_flow_run(Prefect 1) task in Prefect 2?
    ✅ 1
    o
    n
    • 3
    • 25
  • k

    Khuyen Tran

    11/09/2022, 4:59 PM
    In Prefect Live at 3PM Eastern today (Nov 9th), we will host @ben from Club 42. We will talk about how he uses Prefect to coordinate workflows in his college and his experience transitioning from Prefect 1.0 to 2.0. Come join us live on Twitch.
    :marvin: 3
    :youtube: 2
    🙌 3
    👍 3
    :party-parrot: 3
    :blob-attention-gif: 4
    • 1
    • 1
  • a

    Alexandru Anghel

    11/09/2022, 5:33 PM
    Hi guys, Is there a way to import a function form another file as a task? I was trying something like this:
    from tasks.date import get_start_date
    @task(name='Generate start date', retries=2, retry_delay_seconds=10)(get_start_date)
    It fails with `RuntimeError: Tasks cannot be run outside of a flow. To call the underlying task function outside of a flow use
    task.fn()
    .` Thanks!
    ✅ 1
    r
    • 2
    • 2
  • k

    Kingsley

    11/09/2022, 7:22 PM
    There seems to be a problem with Prefect v1 where new schedules are not being added to flows. Is there an outage on our side?
    m
    • 2
    • 3
  • t

    Tim Enders

    11/09/2022, 7:40 PM
    Can anyone point me towards documentation on setting up a profile to attach to the cloud API from your local machine? Thank you
    ✅ 1
    m
    n
    • 3
    • 3
  • m

    Madison Schott

    11/09/2022, 10:07 PM
    What's the best way to define dependencies in 2.0?
    ✅ 1
    m
    • 2
    • 1
  • s

    Scott McCallen

    11/09/2022, 10:25 PM
    Hello! I am using Prefect 1.x and store my flows using the Git storage option. I’m trying to “load additional files” from the repo using the steps from the storage docs under Loading Additional Files with Git Storage. However, the file I am looking for isn’t on the file system and I get a “FileNotFoundError: [Errno 2] No such file or directory” error. Looking at the source code for Prefect 1.4.0, the Git storage implementation uses a ‘TemporaryGitRepo’ object to get the flow definition - see line 148 here https://github.com/PrefectHQ/prefect/blob/1.4.0/src/prefect/storage/git.py#L148) which seems like it deletes the repo contents from the file system after reading the flow definition file. I think I’m following the docs correctly, but I might be missing something. I’d appreciate any help and/or suggestions on what to try. Thanks in advance!
    ✅ 1
    r
    b
    • 3
    • 2
  • t

    Tim Ricablanca

    11/09/2022, 11:55 PM
    Hi! What’s the best way to see error logs or otherwise debug an MS Teams notification? Running Prefect 2.6.4 and I see notification webhooks coming across for flow
    Running
    states but not for
    Failed
    — details in thread:
    ✅ 1
    m
    • 2
    • 23
  • k

    Kishan

    11/10/2022, 2:44 AM
    I was able to setup a prefect agent in kubernetes (GKE), and now I'm a little confused on next steps. If you are to create a flow deployment, does that code deployment need to be in a storage block? I'm assuming it will have to be. I tried a flow run without the storage block and got an error saying it couldn't find the flow (and it referenced my local computer's path).
    ✅ 1
    m
    r
    • 3
    • 2
  • k

    Kishan

    11/10/2022, 2:47 AM
    Also, is it possible to keep the agent separate from all the code dependencies by running flows in Docker containers and building all the dependencies within the Docker container? If so, is there a tutorial fo rthis?
    ✅ 1
    r
    • 2
    • 1
  • m

    merlin

    11/10/2022, 4:46 AM
    Returning Manual State from a Task I want to run a scheduled flow that deals with an intermittent service, and I want to handle the exceptions gracefully. The common idiom seems to be raising
    ValueError
    to trigger a failure, and thats how to manage the state of subsequent tasks or flows. Here's the thing: I don't want to see a stack trace unless it is some new failure that I need to troubleshoot. For the expected exceptions I want to post an error the log and proceed with my flow according to logic I design. I'm having trouble returning a manual state of Cancelled, concrete example and logs posted in thread. Can anyone guide me toward causing the entire flow to be cancelled without raising an exception?
    ✅ 1
    k
    m
    m
    • 4
    • 13
  • r

    Rohith

    11/10/2022, 9:56 AM
    is there a way to execute long sql queries from a .sql file with sqlalchemy_execute ?
    👀 1
    k
    • 2
    • 1
  • r

    redsquare

    11/10/2022, 10:01 AM
    Can we get a /health + /stats endpoint on an agent so we can monitor and cycle if needed
    c
    • 2
    • 15
  • d

    Dan Wise

    11/10/2022, 11:18 AM
    Hi, am struggling to find a standard way to handle flow state changes in Prefect 2. the state handler approach in v1 is deprecated. I need to define a decorator I assume which applies the flow and then applies some kind of state handler on top of that. Does anyone have some code samples on how to do that please?
    ✅ 1
    c
    k
    • 3
    • 19
Powered by Linen
Title
d

Dan Wise

11/10/2022, 11:18 AM
Hi, am struggling to find a standard way to handle flow state changes in Prefect 2. the state handler approach in v1 is deprecated. I need to define a decorator I assume which applies the flow and then applies some kind of state handler on top of that. Does anyone have some code samples on how to do that please?
✅ 1
c

Christopher Boyd

11/10/2022, 3:03 PM
Hi Dan, I think notifications are what you are looking for? https://docs.prefect.io/ui/notifications/
k

Khuyen Tran

11/10/2022, 4:04 PM
This might be something you are looking for: https://khuyentran1401.github.io/prefect-alert/
Right now this collection only supports alerting on failure, but we plan to support handling any states in the future
d

Dan Wise

11/10/2022, 4:34 PM
Thanks. I tried prefect-alert and it works fine when running as per the example directly from a script. However when I deploy then run with an agent the decorator is not triggered.
Also SlackWebhook uses the old incoming webhook alerts. We use messages posted via an app using the slack client so was wondering it a notification type will be created for firing messages to Slack that way
Also, in the alert_on_failure decorator, you define the block_type to be an object instance of a AppriseNotificationBlock but I believe you are expecting a class instance
And thanks @Christopher Boyd I have seen Notifications. But I was looking for a decorator much like @Khuyen Tran was referring to.
k

Khuyen Tran

11/10/2022, 4:49 PM
When I deploy then run with an agent the decorator is not triggered
Which version of prefect-alert are you in?
We use messages posted via an app using the slack client so was wondering it a notification type will be created for firing messages to Slack that way
You mean creating notification using something that is not a notification block?
In the alert_on_failure decorator, you define the block_type to be an object instance of a AppriseNotificationBlock but I believe you are expecting a class instance
Could you be more specific about that you mean by object instance and class instance?
d

Dan Wise

11/10/2022, 4:55 PM
I'm on 0.1.3 of prefect-alert
def alert_on_failure(block_type: AppriseNotificationBlock, block_name: str):
This indicates block_type should be e.g. an instance of a SlackWebhook or similar derived class
According to your type hint
However it actually want you to pass a class instance so SlackWebhook not SlackWebhook("my-hook-name")
So the type hint is telling you one thing but the actuality is you want a different type passed to the hint
This is highlighted nicely in PyCharm for instance
Of course doesn't stop the code from working
To fix the type hint, use:def alert_on_failure(block_type:Type[AppriseNotificationBlock], block_name: str):
k

Khuyen Tran

11/10/2022, 7:41 PM
Thanks for the feedback. I’ll make some changes to prefect-alert and keep you updated
:gratitude-thank-you: 1
View count: 3