Powered by Linen
prefect-community
  • t

    Tom Manterfield

    05/13/2022, 11:31 AM
    Hey, just saw the announcement for
    2.0b4
    , some great stuff in there. One thing that caught my eye was:
    Futures from async tasks in sync flows are now marked as synchronous
    The docs section for async tasks just says ‘Coming soon’. Is this a feature that’s already available and the docs are out of date or am I misunderstanding what is meant here?
    a
    12 replies · 2 participants
  • g

    Guilherme Petris

    05/13/2022, 11:35 AM
    Hey peeps! I’m currently trying out some tests to be run with Prefect Cloud. When I use flow.run() on my machine with a LocalRun and Local storage, everything runs fine, but when I register the flow and run it through the UI I get back ModuleNotFoundError(“No module named ‘/Users/username’”). I’ve already searched for a solution on the Discourse page (https://discourse.prefect.io/t/when-i-run-my-flow-i-see-an-error-failed-to-load-and-exe[…]derror-no-module-named-users-username-what-is-happening/33), but none of the solutions seem to work, even when I specify the module within the agent. The module I’m using is in the same folder structure, and everything works when running directly from my IDE. Can’t figure out what’s happening 🤔
    a
    10 replies · 2 participants
  • a

    Anurag Bajpai

    05/13/2022, 12:53 PM
    We'd like to integrate Sentry with our Prefect flows as described here. However, the approach taken there is to do the Sentry initialisation in a separate task that is called at the beginning of each flow. Am I correct in believing that this will only work for LocalExecutor and not for LocalDaskExecutor or DaskExecutor? If so, what would be the best practice for doing the Sentry initialisation with those executors? Calling the initialisation as a function at the beginning of each task is technically feasible but clunky. Would a Task subclass help here? Could the initialisation be put in a task state handler?
    ✅ 1
    a
    k
    +1
    10 replies · 4 participants
  • n

    Naga Sravika Bodapati

    05/13/2022, 2:00 PM
    Seeing this error: can't pickle CompiledFFI objects. Can someone help with what might be the issue?
    ✅ 1
    a
    k
    23 replies · 3 participants
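For context on the error above: pickle rejects objects that wrap native handles. The sketch below reproduces the same class of failure with a `threading.Lock` standing in for the CompiledFFI object (an assumption, since the object behind the error is not shown in this message). It also shows one common workaround, which is to create the handle inside the function that uses it. `Client`, `try_pickle` and `use_client_locally` are hypothetical names, not Prefect APIs.

```python
import pickle
import threading


class Client:
    """Hypothetical client that holds an unpicklable handle (a lock here)."""

    def __init__(self):
        self.handle = threading.Lock()


def try_pickle(obj):
    """Return None if pickling succeeds, or the error message if it fails."""
    try:
        pickle.dumps(obj)
        return None
    except TypeError as exc:
        return str(exc)


def use_client_locally():
    """Build the client inside the function so it never has to be pickled."""
    client = Client()
    return client.handle.acquire(blocking=False)


error = try_pickle(Client())
print(error)
print(use_client_locally())
```

Whether this pattern applies depends on where the unpicklable object is created, which this message does not say.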
  • j

    Jessica Smith

    05/13/2022, 2:33 PM
    Can GraphQL be used to invite users to your tenant?
    k
    3 replies · 2 participants
  • j

    John Kang

    05/13/2022, 2:44 PM
    Hi all, totally random question, but are there any books written about Prefect? I know some popular Python libraries (like pytest) have books written about them. Not sure if Prefect has one or not.
    k
    a
    7 replies · 3 participants
  • j

    Jason

    05/13/2022, 3:05 PM
    Morning! When I raise a
    RETRY
    , is it possible to queue an upstream task that had already passed? For example:
    drop_glue_database = Parameter("drop_glue_database")
    with case(drop_glue_database, True):
        drop_stuff()
    
    save_to_parquet()
    If save to parquet fails, I'd like the task to drop and rebuild the database, which is usually unnecessary and parameterized.
    k
    2 replies · 2 participants
  • j

    JK

    05/13/2022, 3:09 PM
    Hello, have been seeing this problem intermittently, any ideas? Prefect endpoint returning 500?
    prefect.exceptions.ClientError: [{'path': ['secret_value'], 'message': 'No value found for the requested key ("xxx") in tenant xxx', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    k
    8 replies · 2 participants
  • a

    Andrew Lawlor

    05/13/2022, 3:10 PM
    How do I set/access environment variables with a Dask executor? I had been setting them with my run config, but when I switched to a Dask executor they aren't showing up anymore.
    k
    7 replies · 2 participants
  • t

    Taras Svirskyi

    05/13/2022, 4:05 PM
    Hi, could you please help with configuring DockerFlowRunner to access flows from GCP storage? I’m following the Prefect 2.0 tutorial. I configured GCP storage and successfully used it with the “universal runner” for non-dockerised flows. But when I try to run a flow deployed with a DockerRunner, it fails with the following error because it couldn’t access GCP (the exception is in the thread). I’m using default user authentication for GCP. Setting the
    GOOGLE_APPLICATION_CREDENTIALS
    env var to point to a default location (
    /Users/test/.config/gcloud/application_default_credentials.json
    ) before running an agent didn’t help.
    ✅ 1
    k
    14 replies · 2 participants
  • m

    Muhammad Daniyal

    05/13/2022, 5:29 PM
    Can we run a workflow inside a workflow, like this?
    from prefect import Flow, task

    def workflow1():
        @task
        def abc():
            pass  # some logic here

        with Flow('one') as f:
            abc()

        f.run()

    @task
    def xyz():
        workflow1()

    with Flow('main flow') as f:
        xyz()

    f.run()
    j
    k
    2 replies · 3 participants
  • b

    Benny Warlick

    05/13/2022, 6:21 PM
    Hey all, I've been playing with running Prefect 2 in a docker container on Google Compute Engine. I created a basic "hello world" example that uses Github Actions to build the container and deploy to Compute Engine. Let me know if you have any suggestions and if this is helpful: https://github.com/BennyJW/prefect2-docker-gce
    👏 2
    k
    a
    3 replies · 3 participants
  • j

    Jake

    05/13/2022, 6:27 PM
    When we delete flows using the GraphQL mutation, how do we ensure that all versions of a flow are deleted and not just the most recent / active version?
    k
    4 replies · 2 participants
  • j

    John Kang

    05/13/2022, 6:43 PM
    Hi, I'm looking to integrate with a database and we are exploring putting some data into a CockroachDB database, but I did not see it in the prefect integrations. Do you recommend instead using a PostgreSQL database?
    k
    2 replies · 2 participants
  • m

    Malthe Karbo

    05/13/2022, 6:52 PM
    Hi, posting in case anyone else is having issues in their pytest CI/CD pipelines after updating to the awesome beta4 release of 2.0: it seems that the
    prefect_test_harness
    was moved into a new module (
    prefect.testing
    ), that is not available in 2.0b4 - even though it is available in the orion branch in the repo. I created an issue at GH also: https://github.com/PrefectHQ/prefect/issues/5787
    :upvote: 2
    k
    1 reply · 2 participants
  • a

    Arnas

    05/13/2022, 7:27 PM
    Hello! Hopefully a simple question: I am trying to create a flow with two schedules (let's say A and B). Is there any way to specify an additional flow label for each schedule (e.g., 'a' for A schedule and 'b' for schedule B)? What I ultimately would like to achieve is to enable the same flow to be run on different agents at different times - maybe there is a better way? P.s. I'm using Prefect 1.0
    k
    2 replies · 2 participants
  • a

    Andrew Lawlor

    05/13/2022, 8:46 PM
    seeing errors like
    Error during execution of task: KeyError(<Thread(Dask-Default-Threads-12-578, started daemon 140412823688960)>)
    when retrying tasks run on Dask. Is there special configuration I need for a retry with Dask?
    k
    3 replies · 2 participants
  • f

    Frederick Thomas

    05/13/2022, 9:32 PM
    Hi all, we've just upgraded Python to 3.10 and re-registered all the flows, but we are getting this error. Could someone assist?
    Exception raised while calling state handlers: SystemError('unknown opcode')
    Traceback (most recent call last):
      File "/mnt/data/prefect/venv/lib/python3.8/site-packages/prefect/engine/cloud/flow_runner.py", line 119, in call_runner_target_handlers
        new_state = super().call_runner_target_handlers(
      File "/mnt/data/prefect/venv/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 116, in call_runner_target_handlers
        new_state = handler(self.flow, old_state, new_state) or new_state
      File "/mnt/data/prefect/venv3.10/lib/python3.10/site-packages/prefect/utilities/notifications/notifications.py", line 65, in state_handler
        def state_handler(
    SystemError: unknown opcode
    k
    h
    10 replies · 3 participants
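One thing visible in the traceback above is that its file paths name two interpreter directories, `python3.8` and `python3.10`. The sketch below is a small stdlib helper that surfaces this from a pasted traceback. `python_versions_in_traceback` is a hypothetical name, and whether the mix is the actual cause of the `unknown opcode` error is not confirmed in this message.

```python
import re


def python_versions_in_traceback(traceback_text):
    """Collect the distinct pythonX.Y versions named in traceback file paths."""
    return sorted(set(re.findall(r"/lib/python(\d+\.\d+)/", traceback_text)))


# File paths copied from the traceback in the message above.
TRACEBACK = """
  File "/mnt/data/prefect/venv/lib/python3.8/site-packages/prefect/engine/cloud/flow_runner.py", line 119
  File "/mnt/data/prefect/venv/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 116
  File "/mnt/data/prefect/venv3.10/lib/python3.10/site-packages/prefect/utilities/notifications/notifications.py", line 65
"""

versions = python_versions_in_traceback(TRACEBACK)
print(versions)
if len(versions) > 1:
    print("traceback mixes interpreters:", ", ".join(versions))
```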
  • s

    Steve s

    05/14/2022, 2:48 PM
    Hi all, I'm seeing a new error crop up in a flow I've been running stably for a few months. The flow is a top-level pipeline that runs a series of
    create_flow_run
    (and
    wait_for_flow_run
    ) tasks. One of these steps is followed up with a
    get_task_run_result
    , which has always worked without issue until today. Now it's throwing this error:
    ValueError: The task result cannot be loaded if it is not finished
    . I'm not seeing how this could be, since I can see in the logs that the upstream task did in fact finish successfully. I tried explicitly setting the result of
    wait_for_flow_run
    as an upstream dependency of
    get_task_run_result
    (which I think shouldn't be needed), and I also tried setting the
    poll_time
    to
    30
    , but still no luck. Does anyone have any ideas?
    a
    k
    24 replies · 3 participants
  • r

    Ramzi

    05/15/2022, 2:44 AM
    I am in the process of building a CI/CD pipeline using Prefect 2.0 for Cloud. I am running into an issue where I get the error:
    You have not configured default storage on the server
    or set a storage to use for this deployment but this deployment is using a 
    Kubernetes flow runner which requires remote storage.
    I have already defined the S3 bucket as the storage in prior steps and even made sure to reset it as the default beforehand. I have no problem creating the deployment locally; the issue only appears when running it on GitHub Actions.
    m
    a
    5 replies · 3 participants
  • m

    Mikkel Duif

    05/15/2022, 11:07 AM
    Got a question regarding handling DST correctly. If I specify the anchor_date in winter time, it will be offset by 1 hour. Is there a way to handle this correctly?
    import asyncio
    import pendulum
    from datetime import timedelta
    from prefect.orion.schemas.schedules import IntervalSchedule
    
    winter_schedule = IntervalSchedule(
       interval=timedelta(hours=24),
       anchor_date=pendulum.datetime(2022, 1, 1, 0, 30, 0, tz="Europe/Copenhagen")
    )
    
    summer_schedule = IntervalSchedule(
       interval=timedelta(hours=24),
       anchor_date=pendulum.datetime(2022, 4, 1, 0, 30, 0, tz="Europe/Copenhagen")
    )
    
    
    print(asyncio.run(winter_schedule.get_dates(1))[0])
    print(asyncio.run(summer_schedule.get_dates(1))[0])
    
    >>> "2022-05-16T01:30:00+02:00"
    >>> "2022-05-16T00:30:00+02:00"
    ✅ 1
    a
    m
    10 replies · 3 participants
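The one-hour offset in the message above can be reproduced with the standard library alone. The sketch below assumes a fixed 24-hour interval is stepped in UTC from the anchor. That is an assumption about how such a schedule behaves, not Prefect's implementation. `next_run` is a hypothetical helper, and `now` is a hypothetical moment on the day the message was posted.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

COPENHAGEN = ZoneInfo("Europe/Copenhagen")


def next_run(anchor, interval, after):
    """Return the first anchor + k * interval that falls strictly after `after`.

    The stepping is done in UTC (an assumption about fixed-interval schedules),
    and the result is converted back to Copenhagen local time.
    """
    anchor_utc = anchor.astimezone(timezone.utc)
    after_utc = after.astimezone(timezone.utc)
    steps = (after_utc - anchor_utc) // interval + 1
    return (anchor_utc + steps * interval).astimezone(COPENHAGEN)


day = timedelta(hours=24)
now = datetime(2022, 5, 15, 12, 0, tzinfo=COPENHAGEN)
winter = next_run(datetime(2022, 1, 1, 0, 30, tzinfo=COPENHAGEN), day, now)
summer = next_run(datetime(2022, 4, 1, 0, 30, tzinfo=COPENHAGEN), day, now)

print(winter.isoformat())  # 2022-05-16T01:30:00+02:00
print(summer.isoformat())  # 2022-05-16T00:30:00+02:00
```

The winter anchor is 23:30 UTC (UTC+1) and the summer anchor is 22:30 UTC (UTC+2), so stepping by exactly 24 hours keeps the UTC time fixed and the local time drifts by an hour across the DST change.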
  • f

    Frank Embleton

    05/15/2022, 11:38 AM
    Hello, I am new to Prefect and just reading through the docs. On the Flows page, this is super confusing to me:
    Return a future
    If a flow returns one or more futures, the final state is determined based on the underlying states.
    ```
    from prefect import task, flow

    @task
    def always_fails_task():
        raise ValueError("I am bad task")

    @task
    def always_succeeds_task():
        return "foo"

    @flow
    def always_succeeds_flow():
        x = always_fails_task()
        y = always_succeeds_task()
        return y
    ```
    What does it mean by futures here? My understanding was that futures are to do with threading and async in Python, of which I see neither here? What am I missing? 🤔
    a
    3 replies · 2 participants
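On the general meaning of "future": in Python, a future is a handle to a result that may not exist yet, and it carries either a value or an exception once the work is done. The sketch below mirrors the quoted snippet using the standard library's `concurrent.futures` as an analogy only. It assumes that calling a task inside a Prefect 2.0 flow returns a comparable handle. The Prefect type itself is not used here, and `run_flow` is a hypothetical name.

```python
from concurrent.futures import ThreadPoolExecutor


def always_fails_task():
    raise ValueError("I am bad task")


def always_succeeds_task():
    return "foo"


def run_flow():
    """Submit both functions and hand back both futures for inspection."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        x = pool.submit(always_fails_task)
        y = pool.submit(always_succeeds_task)
    # Leaving the `with` block waits for both submissions to finish.
    return x, y


x, y = run_flow()
print(type(x.exception()).__name__)  # the failure is stored on the future
print(y.result())                    # the success is stored on the other one
```

In this analogy, the failed call does not raise at the call site. Its error stays on `x` until someone asks for it, while `y` resolves independently.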
  • r

    Raviraja Ganta

    05/15/2022, 5:25 PM
    I am using a deep learning model inside a task in the flow. The flow is configured with a Docker agent. It downloads the model from the web every time I run the flow. How can I load the model from the local system where the agent is running?
    a
    s
    2 replies · 3 participants
  • n

    Nash Taylor

    05/15/2022, 11:44 PM
    Hey all, I’ve been looking into Prefect as a potential tool for a streaming data pipeline, and I feel like I might be misunderstanding something about the pricing model, so I’m wondering if someone can help me sort it out. After reading [this blog post](https://www.prefect.io/blog/you-no-longer-need-two-separate-systems-for-batch-processing-and-streaming/), I was pretty excited about trying Prefect for this use case. When I looked at the pricing page, I saw that the unit being used to measure usage was task runs. So I guess I have two questions about that:
    1. Is it fair to say that an implication of paying for task runs is that one would want to minimize the number of tasks in their flow, so as to get the most “bang for their buck”? This seems counter-intuitive to me; I’m on Page 3 of the Prefect Docs, “Thinking Prefectly”, and one thing I’m pretty sure I know by now is that smaller and more discrete tasks are better. Plus, that blog post (rightfully) pointed out that a Flow is better orchestrated by breaking up the logic into multiple tasks as needed.
    2. Is it also fair to say that, given that a streaming data pipeline pulls events from a stream at some chosen interval, and presumably each pull constitutes at least one “task run”, the cost of the Prefect job is proportional to the rate at which data is processed? For example, is running a Flow which is efficiently able to pull and process batches of messages every 2 seconds going to be twice as expensive as a Flow which is pulling and processing new batches every 4 seconds?
    (By the way, when I plug the number of monthly task runs equivalent to continuously pulling batches of messages every 2 seconds into the Pricing calculator, it comes out to… Contact Us 😄)
    a
    11 replies · 2 participants
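The arithmetic behind the second question can be written out as below. The sketch assumes exactly one task run per pull and a 30-day month, both taken from the framing of the question rather than from any pricing documentation. `monthly_task_runs` is a hypothetical helper, and nothing here reflects actual prices.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def monthly_task_runs(poll_interval_seconds, tasks_per_poll=1, days=30):
    """Count task runs in a month of continuous polling at a fixed interval."""
    polls = days * SECONDS_PER_DAY // poll_interval_seconds
    return polls * tasks_per_poll


every_2s = monthly_task_runs(2)
every_4s = monthly_task_runs(4)

print(every_2s)              # 1296000
print(every_4s)              # 648000
print(every_2s / every_4s)   # 2.0
```

Under these assumptions, halving the polling interval doubles the task-run count, and splitting each pull into more tasks multiplies it again through `tasks_per_poll`.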
  • d

    davzucky

    05/15/2022, 11:58 PM
    Do you have a date for when the development of Orion will happen in the open repo on GitHub?
    a
    7 replies · 2 participants
  • r

    Ryan Sattler

    05/16/2022, 4:09 AM
    Hi - is there a way to change the logging format when using Prefect Cloud? Setting
    PREFECT__LOGGING__FORMAT
    in the flow config env doesn’t seem to work.
    a
    4 replies · 2 participants
  • h

    Horatiu Bota

    05/16/2022, 10:35 AM
    Hi community! I'm trying to set up Bitbucket storage (using Bitbucket Cloud) for my flows (working off this doc https://docs.prefect.io/api/latest/storage.html#bitbucket), but I'm getting a 404 when trying to execute the flow
    404 Client Error: Not Found for url
    . I've tried various combinations of repo/project/auth credentials for Bitbucket storage, but all with the same issue - has anyone successfully set up Bitbucket storage? (using Prefect 1.1.0)
    a
    27 replies · 2 participants
  • p

    Pedro Machado

    05/16/2022, 12:05 PM
    Hi there. Could someone help me understand why some flow runs get stuck? This shows on Prefect Cloud as if it has been running since May 4 but the process is no longer running. Thanks https://cloud.prefect.io/flow-run/a1f0f6df-793d-4fa8-af53-a7f586eadabc
    a
    4 replies · 2 participants
  • m

    Mathijs Carlu

    05/16/2022, 12:12 PM
    Hi, I would like to use a custom image from my private image registry to run a flow on my Kubernetes flow runner (Orion). How can I provide credentials for this?
    a
    d
    +1
    9 replies · 4 participants
  • f

    Florian Guily

    05/16/2022, 12:33 PM
    hey, might be a dumb question but where could i find the changelogs for Prefect Orion ?
    a
    k
    7 replies · 3 participants
a

Anna Geller

05/16/2022, 12:34 PM
not a dumb question at all!
• GitHub: https://github.com/PrefectHQ/prefect/blob/orion/RELEASE-NOTES.md
• Discourse: https://discourse.prefect.io/tag/release-notes
f

Florian Guily

05/16/2022, 12:35 PM
Thanks !
Oh the github link leads me to 404
https://github.com/PrefectHQ/prefect/blob/orion/RELEASE-NOTES.md
👍 1
a

Anna Geller

05/16/2022, 12:42 PM
yup, edited my original message
👍 1
k

Kevin Kho

05/16/2022, 1:28 PM
There was a blog for this one
f

Florian Guily

05/16/2022, 1:29 PM
ooh nice thanks !