prefect-community
  • r

    Rik

    07/22/2022, 2:10 PM
    Hey, we are trying to automate sending Prefect invites to users and assigning them a default role. I was just wondering if there's documentation or a query already available that I can test with. We are using Prefect Cloud 1.0.
    ✅ 1
    k
    • 2
    • 1
  • h

    haris khan

    07/22/2022, 3:52 PM
    Prefect 2.0: storage is not being created.
    ✅ 1
    k
    s
    • 3
    • 8
  • s

    Steph Clacksman

    07/22/2022, 4:30 PM
    Are there any limits on what types can be parameters for a flow (or subflow)? I am trying to pass a pandas dataframe from one flow to another, and I get a pydantic error
    RuntimeError: no validator found for <class 'pandas.core.frame.DataFrame'>
    , so I tried to use
    validate_parameters=False
    in the flow annotation, but it made no difference. What am I doing wrong?
    ✅ 1
    k
    a
    • 3
    • 4
  • j

    JR Carneiro

    07/22/2022, 4:48 PM
    Hi, I have been experiencing some issues with Prefect Docker Agents in our deployment:
    • we set up agents manually with
    prefect docker agent
    on a single server;
    • then we run several flows;
    • we make updates to flows, and each update creates a new version of the Docker image; with each run the Docker images are pulled onto the machine.
    After some point we run out of disk space.
    ✅ 1
    a
    • 2
    • 1
  • j

    JR Carneiro

    07/22/2022, 4:49 PM
    Has anyone encountered and solved a similar issue before?
    ✅ 1
    s
    k
    • 3
    • 5
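One common way to keep a Docker agent host from filling up is to prune unused images periodically. The sketch below is an assumption, not something proposed in this thread: it builds a `docker image prune` command that removes unused images older than a hypothetical threshold (`max_age_hours`), and only runs it when `prune_old_images` is called explicitly.

```python
import subprocess
from typing import List


def build_prune_command(max_age_hours: int = 168) -> List[str]:
    """Build a `docker image prune` command for unused images older than the threshold."""
    if max_age_hours <= 0:
        raise ValueError("max_age_hours must be positive")
    return [
        "docker", "image", "prune",
        "--all",    # remove all unused images, not only dangling ones
        "--force",  # skip the interactive confirmation prompt
        "--filter", f"until={max_age_hours}h",
    ]


def prune_old_images(max_age_hours: int = 168) -> int:
    """Run the prune command and return its exit code (requires the Docker CLI on the host)."""
    completed = subprocess.run(build_prune_command(max_age_hours), check=False)
    return completed.returncode
```

Running `prune_old_images()` from a cron job on the agent host would be one way to apply it; the one-week default is arbitrary and should be tuned to how often flows are redeployed.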
  • a

    Andrew Pruchinski

    07/22/2022, 5:13 PM
    Hello, I've noticed tasks start up, run and succeed. Then start up again a few minutes later. Is this expected behavior? Is there a way to ensure this doesn't happen?
    ✅ 1
    k
    • 2
    • 7
  • d

    Divya

    07/22/2022, 5:33 PM
    Hello, I am trying to schedule metadata ingestion flows for Amundsen using Prefect. The Amundsen code that I am trying to schedule has a run() function, and running it as part of a task fails. Do you have any suggestions for scheduling this type of flow? We are currently using Prefect 1.0. Thank you, Divya
    ✅ 1
    k
    a
    • 3
    • 4
  • c

    Chris Reuter

    07/22/2022, 7:48 PM
    Thanks so much to @Jeshua Irving and @Mathijs Miermans for chatting about their experience with Prefect today. The recording of our fireside chat is now available on our YouTube channel.
    :thank-you: 3
    🙌 4
  • m

    Mike Vanbuskirk

    07/22/2022, 10:14 PM
    do you absolutely have to explicitly specify an ECS cluster name to an
    ECSRun
    config or can it not infer the correct cluster from providing agent labels?
    ✅ 1
    m
    a
    • 3
    • 12
  • c

    Cole Murray

    07/23/2022, 12:45 AM
    Hi All, I’m running into an issue with the 2.0b12 version.
    root@c0713100a1dd:/opt/app# python
    Python 3.8.13 (default, Jul 13 2022, 05:54:24)
    [GCC 10.2.1 20210110] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import prefect
    >>> import prefect.flow_runners
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ModuleNotFoundError: No module named 'prefect.flow_runners'
    >>> prefect.__version__
    '2.0b12'
    >>>
    It looks like we’ve removed the flow_runners module, but that doesn’t appear to be reflected in the documentation. This version was published earlier today.
    ✅ 2
    n
    a
    s
    • 4
    • 15
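While code is being migrated across beta releases, it can help to check whether a module exists before importing it. The sketch below uses only the standard library; `module_available` is a hypothetical helper name, and nothing here describes where Prefect moved the removed functionality.

```python
import importlib.util


def module_available(name: str) -> bool:
    """Return True if `name` can be located, without importing the module itself."""
    try:
        return importlib.util.find_spec(name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package (for example `prefect`) is missing
        # or is not a package at all.
        return False


if __name__ == "__main__":
    # On 2.0b12 this is expected to print False, matching the traceback above.
    print(module_available("prefect.flow_runners"))
```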
  • b

    Benoit Chabord

    07/23/2022, 4:46 AM
    Hi all, I have a 2.0 flow/task architecture question. My use case is to check for one or more files in an S3 bucket and, if there is something, execute a few tasks. This check is going to run every 5 minutes but will probably process files only once an hour on average. I separated my code/flows/tasks this way:
    • A flow named "Check for files on S3" with one task downloading the files and returning the list of downloaded files
    • If the list is not empty, a subflow is invoked with the list of files that were just downloaded
      ◦ A few tasks are executed doing transformations and calling some APIs
      ◦ My last task is to delete from S3 the files I just processed if all of the previous tasks succeeded
    The reason I did a subflow is so that I can easily filter for the flow that is actually processing files instead of having a successful flow every 5 min that may not contain any transformation. Is this an antipattern? Also I have to add a
    # type: ignore
    on every task call because the task returns the type of the wrapped function so I cannot do .wait() on it without raising an issue with mypy. Not sure how we can do better though
    ✅ 1
    a
    c
    • 3
    • 4
  • a

    Andreas Nigg

    07/23/2022, 6:46 AM
    Hey, I've encountered a problem with prefect 2.0b12 (I'm running on prefect cloud 2.0). I changed my flow_runner configurations in my deployments to infrastructure. I'm using yaml-configuration for deployments. When trying to create the deployment with prefect deployment create <my-path>, I get the following error:
    Found 1 invalid deployments:
        1 validation error for deployment with name 'sync_goodread_score_to_volat_bigquery'
        infrastructure
          Can't instantiate abstract class Infrastructure with abstract methods preview, run (type=type_error)
    Please find my deployment in the thread
    ✅ 1
    a
    m
    • 3
    • 8
  • o

    Oliver Mannion

    07/23/2022, 7:56 AM
    The latest version of Orion introduces
    .submit()
    , eg:
    @flow
    def greetings(names: List[str]) -> None:
        for name in names:
            say_hello.submit(name)
            say_goodbye.submit(name)
    but the flow still works without using
    .submit()
    . What are the implications of not using
    .submit()
    ?
    ✅ 1
    v
    a
    +2
    • 5
    • 14
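A rough analogy using only the Python standard library (this is not Prefect code, and `run_direct` and `run_submitted` are hypothetical names): a direct call blocks until its result is ready, while a submitted call hands the work to an executor and returns a future immediately. Assuming `.submit()` follows the same future-based pattern, skipping it would mean the calls run one after another rather than concurrently.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def say_hello(name: str) -> str:
    time.sleep(0.05)  # stand-in for real work
    return f"hello {name}"


def run_direct(names: List[str]) -> List[str]:
    """Direct calls: each call blocks until its result is ready."""
    return [say_hello(name) for name in names]


def run_submitted(names: List[str]) -> List[str]:
    """Submitted calls: each submit returns a future at once; results are collected afterwards."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(say_hello, name) for name in names]
        return [future.result() for future in futures]
```

Both functions return the same values in the same order; the difference is only in when the work happens.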
  • y

    yu zeng

    07/23/2022, 10:16 AM
    Hello. Can I specify an argument to restart the Dask workers every hour and scatter each worker's in-memory data to the cluster before the restart? My Dask workers run out of memory (OOM) after a long execution time. I can't find the cause of the memory leak, so I have to restart the workers periodically, but a restarted worker loses its in-memory data and has to recompute it, which wastes a lot of time and causes many errors.
    ✅ 1
    a
    • 2
    • 3
  • p

    Prasanth Kothuri

    07/23/2022, 2:22 PM
    I have a prefect flow which consumes messages from rabbitmq, I want this task to run continuously, what would the schedule be? I am thinking of starting the flow and then include max_retries and retry_delay for the task which consumes from rabbitmq, is there a better way? thank you
    ✅ 1
    a
    • 2
    • 6
  • n

    Noam Cohen

    07/23/2022, 7:06 PM
    Hi, for some reason my deployments disappeared together with my blocks. I tried to reconfigure the GCP storage (which was already configured) but now I get
    (finance-lGLYToYy-py3.9) ➜  finance git:(master) ✗ prefect storage create
    No storage types are available. 
    (finance-lGLYToYy-py3.9) ➜  finance git:(master) ✗ prefect version
    Version:             2.0b10
    API version:         0.7.0
    Python version:      3.9.10
    Git commit:          e71ef86c
    Any idea what changed?
    ✅ 1
    a
    • 2
    • 8
  • s

    Sander

    07/23/2022, 8:38 PM
    Hi, For prefect 2 I understand that you can set notifications to slack. And to email in the cloud version. Will email be available in the non-cloud version as well? And will you be able to create custom events based on state changes of flow runs? Say create Jira ticket when a flow run fails?
    ✅ 1
    a
    • 2
    • 5
  • c

    Chu

    07/23/2022, 8:38 PM
    Hi Community, I’m using a flow-of-flows framework and use KubernetesRun to run the flow. My rough design is like the screenshot:
    1. A, B, C, D, E, F are all flows
    2. C, D and E, F need to run in parallel (where I will go with LocalDaskExecutor); they basically run some dbt jobs to do data transformation
    3. A, B do not need to run in parallel
    4. Currently I have decided to use a parent flow (as an orchestrator) to run these 6 flows and submit to K8s
    My questions are:
    1. Where can I pass parameters? (I know I can pass a dictionary of params to flow.run() when running locally, but I have no idea how with
    flow.run_config = KubernetesRun()
    )
    2. Where can I attach LocalDaskExecutor? Only to the parent flow? (Maybe for flows A and B, I just set their parameters using
    unmapped()
    function?)
    3. Is there a way for me to test my project locally (to make sure the design works as intended) without registering every flow to the Cloud?
    4. (Optional) For my use case, is there a better design to achieve my goals? I’m quite new to Prefect but really astonished by its cool functionality, so maybe there are places where I can make great improvements for our company.
    Thanks! It’s a long question, but I really appreciate any help!
    ✅ 1
    a
    • 2
    • 4
  • v

    Vipul

    07/23/2022, 8:40 PM
    Hi Prefect Team, I asked this question in the past and am asking again: is there any plan to support caching based on the file target in Prefect Orion, as was supported in Prefect 1.0?
    ✅ 1
    a
    • 2
    • 1
  • t

    Tom Klein

    07/24/2022, 4:39 PM
    howdy 🙋 I have a couple of questions regarding best practices with Prefect.
    • the first one is regarding a “validation” step (e.g. if the current state in the world already == desired state, then just SKIP the flow entirely). I know i can SKIP a task (by raising skip), but if i wanna skip the entire flow, should i just make sure that the downstream flows depend on it and that’s it?
    • the second Q is more general about orchestration: let’s say i have an external resource that i want to change to state “GREEN” at some time in the day (every day) and then set back to “BLUE” at some later time in the day (similarly, every day). In the beginning i thought of doing a single flow (that would run, for example, once an hour) and would contain an internal representation of the desired schedule, and apply it within the run itself to decide if the state needs to be changed, and if so, to what. Then i figured it might be simpler to just run the flow with two different parameters, twice a day (once to make things “Green”, and then again to change them back to “Blue”, with a flow-run parameter. Is that even possible in prefect, to have two schedules with different parameters?). i’m not sure which one is deemed more “correct”?
    ✅ 1
    a
    • 2
    • 13
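The first option described above (a single flow that carries the desired schedule internally) can be sketched in plain Python. The 09:00 to 17:00 window and the names `desired_state` and `needs_change` are hypothetical; the same check also covers the "skip when the current state already equals the desired state" case from the first question.

```python
from datetime import time

# Hypothetical schedule: GREEN from 09:00 (inclusive) to 17:00 (exclusive), BLUE otherwise.
GREEN_START = time(9, 0)
GREEN_END = time(17, 0)


def desired_state(now: time) -> str:
    """Return the state the external resource should be in at the given time of day."""
    return "GREEN" if GREEN_START <= now < GREEN_END else "BLUE"


def needs_change(current_state: str, now: time) -> bool:
    """Return True only when the resource is not already in the desired state."""
    return current_state != desired_state(now)
```

An hourly run would call `needs_change` first and do nothing when it returns False, which makes each run idempotent; the two-schedule alternative trades that for simpler flow code.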
  • c

    Chu

    07/24/2022, 7:39 PM
    hi Community, I have 2 flows: A and B, their dependency is
    A -> B
    , both A and B will take a list of same org_ids as parameters (need to use map function for parallel running) when I use a parent flow to schedule flow A and B, can I use map function to pass parameters like this way? or is there a better way to do that? code in the thread
    ✅ 1
    a
    • 2
    • 8
  • d

    Divya

    07/24/2022, 10:51 PM
    Hello, I updated to Prefect 2.0. I am trying to run the commands below to start the Prefect server, but I get an error when running them:
    prefect backend server
    prefect server start
    ✅ 1
    a
    • 2
    • 6
  • j

    James Constable

    07/25/2022, 3:41 AM
    Hi, I think I must be missing something. I am working my way through the getting started for prefect 2.0 and I got to this page https://orion-docs.prefect.io/tutorials/deployments/#configuring-storage . It seems that the storage command does not exist. Or is this just a windows issue?
    ✅ 1
    s
    • 2
    • 4
  • a

    Andreas

    07/25/2022, 9:55 AM
    In latest Prefect 2.0 beta 2.0b12 deployments registered to backend are not showing for me in the UI. They are showing however when I run
    prefect deployment ls
    . I tried a reset on orion's database and running the deployment create script again with no luck. This happened after upgrading from b11 to b12
    ✅ 1
    r
    j
    f
    • 4
    • 8
  • j

    Justin Trautmann

    07/25/2022, 10:17 AM
    hello prefect team, i'm experiencing errors when trying to execute async flow deployments locally in prefect 2.0. how should i write my async flows so that local execution works properly? running the flow script locally or running the deployment via an agent works perfectly fine but
    prefect deployment execute ...
    fails with
    RuntimeWarning: coroutine 'create_then_begin_flow_run' was never awaited
    . see thread for my flow code. thanks a lot for your support.
    ✅ 1
    j
    • 2
    • 3
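For context, the warning quoted above is Python's general signal that a coroutine object was created but never awaited or handed to an event loop. The sketch below shows only that general behaviour with the standard library; `my_flow` and `run_sync_entrypoint` are hypothetical names, and the thread does not establish that this is the cause inside `prefect deployment execute`.

```python
import asyncio


async def my_flow() -> str:
    """Stand-in for an async flow function."""
    await asyncio.sleep(0)
    return "done"


def run_sync_entrypoint() -> str:
    """Drive the coroutine from synchronous code.

    Calling my_flow() on its own only creates a coroutine object; if that object
    is discarded, Python emits "coroutine ... was never awaited". Passing it to
    asyncio.run() actually executes it.
    """
    return asyncio.run(my_flow())
```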
  • t

    Tom Thurstan

    07/25/2022, 10:23 AM
    Hi Prefect team! Since upgrading to Prefect-2.0b12 the module flow_runners no longer exists. Where are we supposed to import e.g. SubprocessFlowRunner from? Have tried to find it in the package but no luck. Thanks!
    ✅ 1
    r
    f
    • 3
    • 8
  • f

    Florian Guily

    07/25/2022, 10:26 AM
    Hello, I was wondering if we can use Prefect 2 with object-oriented programming in Python, e.g. having class methods be tasks, and flows using objects and those class methods.
    👀 1
    ✅ 1
    j
    • 2
    • 2
  • r

    Riccardo Tesselli

    07/25/2022, 11:08 AM
    hello guys, just wanted to give you some feedback on Prefect 2.0. I’m using Pycharm for development, and I’ve noticed that with a lot of Prefect’s classes (like
    Deployment
    ) Pycharm can not provide autocompletion suggestions for the class arguments (like
    name
    ,
    tags
    …). I think this is because of the decorators applied to the classes which “mask” the arguments (like
    @PrefectObjectRegistry.register_instances
    ). To improve the development experience it could be nice to find a way to expose the arguments so Pycharm and IDEs can provide autosuggestions
    ➕ 5
    ✅ 1
    b
    j
    • 3
    • 2
  • j

    Joshua Greenhalgh

    07/25/2022, 11:51 AM
    Think there may be a bug in the GQL API;
    👀 1
    m
    • 2
    • 3
query Task_run($where: task_run_bool_exp) {
  task_run(where: $where) {
    id
    flow_run_id
    state
    task {
      name
    }
  }
}
With variables;
{
  "where": {
    "start_time": {
      "_gte": "2022-07-20",
      "_lt": "2022-07-21"
    }
  }
}
Always times out;
{
  "errors": [
    {
      "path": [
        "task_run"
      ],
      "message": "Operation timed out",
      "extensions": {
        "code": "API_ERROR"
      }
    }
  ],
  "data": null
}
m

Mason Menges

07/25/2022, 10:11 PM
Hey @Joshua Greenhalgh, Do you know if this query runs in the Interactive Api when you're signed in to cloud.prefect.io?
j

Joshua Greenhalgh

07/29/2022, 6:20 PM
@Mason Menges I don't use that no idea how to do variables in it...
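On passing variables: GraphQL requests sent over HTTP conventionally carry a JSON body with a `query` string and a `variables` object. The sketch below builds such bodies for the query above and splits the one-day range into hourly windows. The splitting is an assumption about what might avoid the timeout, not a confirmed fix, and `hourly_windows` and `build_payload` are hypothetical helper names; no endpoint or authentication is shown.

```python
import json
from datetime import datetime, timedelta
from typing import Iterator, Tuple

TASK_RUN_QUERY = """
query Task_run($where: task_run_bool_exp) {
  task_run(where: $where) {
    id
    flow_run_id
    state
    task {
      name
    }
  }
}
"""


def hourly_windows(start: datetime, end: datetime) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive [lower, upper) windows of at most one hour covering start..end."""
    cursor = start
    while cursor < end:
        upper = min(cursor + timedelta(hours=1), end)
        yield cursor, upper
        cursor = upper


def build_payload(start: datetime, end: datetime) -> str:
    """Serialize the query and its variables as a JSON request body."""
    variables = {
        "where": {"start_time": {"_gte": start.isoformat(), "_lt": end.isoformat()}}
    }
    return json.dumps({"query": TASK_RUN_QUERY, "variables": variables})
```

Each payload would then be posted to the GraphQL endpoint in turn and the `task_run` lists concatenated on the client side.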