prefect-community
  • m

    Michael Shoemaker

    10/05/2022, 2:58 PM
    Noob question: I thought I'd ask this here rather than on GitHub Issues because I'm fairly certain this is user error on my part. I'm trying to set up the dev environment, but when running pytest I get the following errors. Any thoughts on what I'm doing wrong?
    ImportError while loading conftest '/home/gary/prefect/tests/conftest.py'.
    tests/conftest.py:32: in <module>
        import prefect
    src/prefect/__init__.py:23: in <module>
        from prefect.flows import flow, Flow
    src/prefect/flows.py:36: in <module>
        from prefect.context import PrefectObjectRegistry, registry_from_script
    src/prefect/context.py:34: in <module>
        from prefect.client import OrionClient
    src/prefect/client.py:55: in <module>
        from prefect.orion.api.server import ORION_API_VERSION, create_app
    src/prefect/orion/api/__init__.py:1: in <module>
        from . import (
    src/prefect/orion/api/block_documents.py:19: in <module>
        from prefect.orion import models, schemas
    src/prefect/orion/models/__init__.py:1: in <module>
        from . import (
    src/prefect/orion/models/block_schemas.py:12: in <module>
        from prefect.blocks.core import Block
    src/prefect/blocks/__init__.py:3: in <module>
        import prefect.blocks.notifications
    src/prefect/blocks/notifications.py:26: in <module>
        class SlackWebhook(NotificationBlock):
    pydantic/main.py:283: in pydantic.main.ModelMetaclass.__new__
        ???
    ../anaconda3/lib/python3.9/abc.py:106: in __new__
        cls = super().__new__(mcls, name, bases, namespace, **kwargs)
    src/prefect/utilities/dispatch.py:99: in _register_subclass_of_base_type
        register_type(cls)
    src/prefect/utilities/dispatch.py:154: in register_type
        key = get_dispatch_key(cls)
    src/prefect/utilities/dispatch.py:76: in get_dispatch_key
        dispatch_key = dispatch_key()
    src/prefect/blocks/core.py:127: in __dispatch_key__
        return block_schema_to_key(cls._to_block_schema())
    src/prefect/blocks/core.py:263: in _to_block_schema
        return BlockSchema(
    pydantic/main.py:340: in pydantic.main.BaseModel.__init__
        ???
    pydantic/main.py:1076: in pydantic.main.validate_model
        ???
    pydantic/fields.py:884: in pydantic.fields.ModelField.validate
        ???
    pydantic/fields.py:1101: in pydantic.fields.ModelField._validate_singleton
        ???
    pydantic/fields.py:1148: in pydantic.fields.ModelField._apply_validators
        ???
    pydantic/class_validators.py:318: in pydantic.class_validators._generic_validator_basic.lambda13
        ???
    pydantic/main.py:690: in pydantic.main.BaseModel.validate
        ???
    E   DeprecationWarning: `copy_on_model_validation` should be a string: 'deep', 'shallow' or 'none'
  • k

    Kunal Tyagi

    10/05/2022, 3:21 PM
    Is there a way to monitor for lack of logs in a prefect task (or better, in a flow)?
  • j

    Jason Bertman

    10/05/2022, 4:54 PM
    Hey all, just recently set up Orion in our EKS cluster to pilot switching over to RayTaskRunner. We were experiencing some pretty untenable memory bloat with high numbers of tasks with Dask (25K+). My question about Ray: does the local cluster capability feature the same worker adaption that Dask does? In my testing it seems to function basically the same as a concurrent executor (single pod, local scheduling). Am I missing something here? Or is this a case where we need to deploy a local Ray cluster to get that use case?
  • k

    Kyle D

    10/05/2022, 5:15 PM
    Hi all! Are there any code examples of using the Python client to create a flow run using `create_flow_run()` in Prefect 2? I’m trying to see how the `flow` parameter should be set up.
  • n

    Nathan R

    10/05/2022, 6:42 PM
    Hi all, what's the best way to get information such as the flow ID and task ID programmatically during execution?
    ✅ 1
  • n

    Nick McGoye

    10/05/2022, 6:57 PM
    Hi everyone! I'm sure this has been answered, but I’m struggling to find it in the documentation. I want to schedule a flow, but I only want that flow to run once another flow in my Prefect Cloud instance has completed successfully. Can someone point me in the right direction? I’m using Prefect 1. Thanks!
    ✅ 1
  • n

    Nace Plesko

    10/05/2022, 7:37 PM
    Hi everyone! I'm using Prefect V1 and I'm having some issues with the timeout configuration. Our task kicks off a `ShellTask` that runs a script written in TypeScript. As part of the TypeScript script, we call a Python script. The Prefect timeout works if it's reached while the TypeScript code is executing, but it doesn't work if the process is stuck inside the Python script. Has anyone run into the same issue before? Any advice is appreciated, thank you in advance!
    ✅ 1
  • n

    Norman

    10/05/2022, 8:12 PM
    Is there an example of using a private docker container registry on Prefect V2?
    ✅ 1
  • j

    Jarvis Stubblefield

    10/05/2022, 8:51 PM
    Hello everyone, I’m having trouble getting the `prefect agent` to execute within my Django environment (using Django ORM in the flow). I finally got the deployment created by adding my project to the sys path before running the Django setup. That works for running the flow manually, but now I’m getting another error when the agent tries to run the flow. Full error in the thread message. So my idea at this point is to create a Django management command (which executes in the Django environment) to run the agent through Python code. I have an example of doing this in Prefect v1, but I’m using Prefect v2 and haven’t seen where this is clearly documented. `LocalAgent` no longer seems to exist in Prefect v2 to run an agent through code. Any ideas of other ways to run this or how to run the agent from within Python would be super helpful and wholly appreciated!
  • a

    Adam

    10/05/2022, 9:51 PM
    Hey all - I have a Prefect Orion server running in Docker. I’m trying to run hello-world flows outside of the Orion Docker container, just a local flow, but I want the flow to be connected to the Orion server. From what I found, I have to set the Prefect config variable with prefect config set PREFECT_API_URL = “”, but I can’t figure out what the URL is supposed to be.
    ✅ 1
  • e

    Emon Li

    10/06/2022, 12:22 AM
    Hey guys, what’s the best workaround for accessing the code in a private Git repo?
  • p

    Paco Ibañez

    10/06/2022, 2:45 AM
    Hello! Is there a way to install only the prefect python client? I am writing some tooling that calls the prefect api and I prefer using the client instead of manually making http calls to the api. Thanks!
    ✅ 1
  • v

    Vadym Dytyniak

    10/06/2022, 7:50 AM
    Hi. In Prefect V1 we used the following variable: `prefect.context.scheduled_start_time`. Is there any alternative in Prefect 2? Thanks.
    ✅ 1
  • a

    Andreas Tsangarides

    10/06/2022, 8:35 AM
    hi all! Any way to configure the slack notifier to give a notification only when the FLOW fails? The tasks in the flow have retry attempts, so would like only a notification when the whole flow fails (if all retry attempts have been exhausted for example or something else failed)
    slack.py
    ✅ 1
  • p

    Pekka

    10/06/2022, 10:37 AM
    Hello. I'm getting
    ImportError: dbt-core needs to be installed to use this task; run `pip install "prefect-dbt[cli]"`
    when I definitely have `prefect-dbt[cli]` installed both for the system and for the project. The error happens when running `prefect_dbt.cli.commands.trigger_dbt_cli_command("dbt debug")`. Does this have to do with missing `trigger_kwargs` rather than the package being missing? SOLVED: INSTALL PREFECT-DBT[CLI] AS ROOT (the subprocesses don't have the same PATH variable, so the `which` command doesn't find them)
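The PATH explanation in the SOLVED note can be illustrated with a minimal sketch. It assumes a POSIX system and uses a throwaway fake `dbt` file, not a real dbt install; the helper name `visible_on_path` is hypothetical. It shows only that the same executable name resolves or fails to resolve depending on which search path the lookup is given.

```python
import os
import shutil
import stat
import tempfile


def visible_on_path(name, path):
    """Return True if an executable called `name` is found on the search path `path`."""
    return shutil.which(name, path=path) is not None


# Two throwaway directories: one holds a fake `dbt` executable, the other is empty.
with tempfile.TemporaryDirectory() as user_bin, tempfile.TemporaryDirectory() as other_bin:
    fake_dbt = os.path.join(user_bin, "dbt")
    with open(fake_dbt, "w") as handle:
        handle.write("#!/bin/sh\nexit 0\n")
    # Mark the file as executable so the lookup accepts it.
    os.chmod(fake_dbt, os.stat(fake_dbt).st_mode | stat.S_IXUSR)

    # Same name, different search paths, different results.
    found_with_user_path = visible_on_path("dbt", user_bin)
    found_with_other_path = visible_on_path("dbt", other_bin)

print(found_with_user_path, found_with_other_path)
```

Running `shutil.which("dbt")` from inside the failing process is a quick way to check which PATH that process actually sees.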
    👍 1
    :thank-you: 2
    ✅ 1
  • z

    Zach Schuster

    10/06/2022, 2:11 PM
    Hi folks! Would like to pick your brains regarding the CI/CD side of Prefect 2. We are currently running flows via ECS with S3 storage. We have multiple different flows that don't share the same dependencies. It seems inefficient and error-prone to build one large image for all of these flows to run in. So my questions are:
    • Is anyone using a monorepo for Prefect 2 with multiple flows, each having its own image and requirements file, deployed via GitHub Actions or any other automated deployment tool?
    • If so, any insight into how you are building images with different requirements files in an automated fashion would be really helpful.
    Thank you!
    ✅ 1
  • r

    rectalogic

    10/06/2022, 3:42 PM
    does the v1 Git storage support submodules? docs https://docs-v1.prefect.io/orchestration/flow_config/storage.html#git
  • d

    David Elliott

    10/06/2022, 4:29 PM
    Hey, I’m finding `logger` doesn’t seem to output logs either to the terminal or to the Cloud UI when using the `DaskTaskRunner()`. I’m calling `get_run_logger()` within the task; code attached in 🧵. Any ideas / am I missing something? (Works fine with sequential / concurrent task runners, just not the Dask one.)
  • n

    Nick DeCraene

    10/06/2022, 4:47 PM
    I'm looking to get mypy working with 2.0 and it looks like every task's return value is changed to `None` instead of what is hinted at in the method. Does anyone know of a workaround besides ignoring the errors?
  • y

    YZ

    10/06/2022, 5:38 PM
    Hi folks, for a `state_handler(task, old_state, new_state)`, is there a way to access the original input parameters for the `task`? For example, my original task is as below, and I would like to access `config` in the `state_handler`:
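    from typing import Any

    from prefect import task


    def state_handler(task, old_state, new_state):
        # Question: how can I access the `config` variable I originally passed into `task`?
        return new_state  # Prefect 1 state handlers return the state to use


    @task(state_handlers=[state_handler])
    def my_task(config: Any):
        if config.is_prod:
            pass  # do something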
  • d

    Daniel Burkhardt

    10/06/2022, 5:59 PM
    Is there a way to provide a dropdown in the Prefect Cloud UI for parameter values? We would specify the list of parameters in the flow code and then users would have a dropdown for them in the UI.
    ✅ 1
  • j

    Jason Bertman

    10/06/2022, 6:40 PM
    Does anyone know if it is possible to figure out in which context a flow is executing, local or remote, and modify the task runner accordingly? I have a Ray cluster in k8s that our remote deployments will use, but have a requirement to allow local exec too. Problem is, users won't have access to our Ray cluster, which wouldn't be a problem, but the flow def has the address hardcoded. Looking to do something like this (local exec has a temp Ray cluster, k8s exec has the remote cluster):
    @flow(
        task_runner=RayTaskRunner(
            address="ray://ray-cluster-kuberay-head-svc:10001"
        ),
    )
    def main(...):
        ...
    
    if __name__ == "__main__": # If this flow is called directly, don't use the address
        main(..., task_runner=RayTaskRunner)
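One way to avoid hardcoding the address, sketched under stated assumptions: read it from an environment variable that only the k8s deployment sets. The variable name `FLOW_RAY_ADDRESS` and the helper `ray_runner_kwargs` are hypothetical, and the Prefect wiring is shown only as a comment because it needs `prefect` and `prefect-ray` installed. This is not a confirmed Prefect recipe.

```python
import os

# Hypothetical variable name: set it in the k8s deployment, leave it unset locally.
RAY_ADDRESS_ENV = "FLOW_RAY_ADDRESS"


def ray_runner_kwargs(environ=None):
    """Return keyword arguments for the Ray task runner.

    If the variable is set, the runner is pointed at that cluster address.
    If it is unset or empty, no address is passed, which corresponds to the
    local, temporary-cluster case described in the message above.
    """
    environ = os.environ if environ is None else environ
    address = environ.get(RAY_ADDRESS_ENV)
    return {"address": address} if address else {}


# Intended use (not executed here):
#     @flow(task_runner=RayTaskRunner(**ray_runner_kwargs()))
#     def main(...):
#         ...
```

With this shape the flow definition stays identical in both contexts and only the environment differs.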
    ✅ 1
  • v

    V

    10/06/2022, 8:01 PM
    Hello everyone, I’m trying to process tables in an execution order defined in a dependency list. I’ve made some progress; however, I’m not sure how I can refer to an existing task without creating a new one each time. I’m using Prefect V1; sample code in the 🧵
  • z

    Zackary Wixom

    10/06/2022, 9:49 PM
    Not sure if this is the right place, but I am running a Prefect flow in a VS Code Python file and it will not stop running. Is there any sort of functionality like a flow.cancel() to stop this from running forever?
    ✅ 1
  • z

    Zac Hooper

    10/07/2022, 4:47 AM
    Has anybody got a working example of using EXDATE with the rrule schedule? Basically I want a daily schedule that excludes public holidays. Here's an example rule I have that's meant to run daily and exclude Christmas Day and Boxing Day.
    RRuleSchedule(
        rrule="DTSTART:20221007T120000Z\nRRULE:FREQ=WEEKLY;UNTIL=20240330T120000Z\nEXDATE:20221225T120000Z\nEXDATE:20221226T120000Z"
    )
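For comparison, here is a plain-Python sketch of the schedule the message describes: one run per day, with two exact instants excluded. It is independent of Prefect and of any rrule parser, and the helper name `daily_occurrences` is hypothetical. Note that the rule as posted says `FREQ=WEEKLY`, while the message describes a daily schedule. Under RFC 5545, an EXDATE value removes an occurrence only when it matches that occurrence's start exactly, which is why the excluded datetimes below use the same 12:00 UTC time as the start.

```python
from datetime import datetime, timedelta, timezone


def daily_occurrences(start, until, excluded):
    """Yield one datetime per day from start to until (inclusive), skipping excluded instants."""
    skip = set(excluded)
    current = start
    while current <= until:
        if current not in skip:
            yield current
        current += timedelta(days=1)


utc = timezone.utc
holidays = [
    datetime(2022, 12, 25, 12, tzinfo=utc),  # Christmas Day
    datetime(2022, 12, 26, 12, tzinfo=utc),  # Boxing Day
]
runs = list(
    daily_occurrences(
        start=datetime(2022, 12, 23, 12, tzinfo=utc),
        until=datetime(2022, 12, 28, 12, tzinfo=utc),
        excluded=holidays,
    )
)
print([run.strftime("%Y-%m-%d") for run in runs])
# → ['2022-12-23', '2022-12-24', '2022-12-27', '2022-12-28']
```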
  • t

    Thomas Opsomer

    10/07/2022, 8:12 AM
    Hello Prefect community 🙂 We're getting a strange error when running flows:
    Failed to load and execute flow run: FlowStorageError('An error occurred while unpickling the flow:\n  JSONDecodeError("Expecting \',\' delimiter: line 1141 column 82 (char 62124)")')
    (We're using GCS for storage)
    ✅ 1
  • t

    Thomas Fredriksen

    10/07/2022, 8:37 AM
    Hi there. Is there any way to have Redis act as my cache-store, instead of a filesystem? Assuming I were to create a custom block for this, how do I have the cache use Redis as a backend?
    ✅ 1
  • c

    Clovis

    10/07/2022, 8:58 AM
    Hi guys! Quick question: I see that the new Prefect 2.5 release is available with new blocks like `GCloudRun`. Great 🎉! However, after connecting to my Prefect Cloud environment, no new blocks are available, and I could not find a way to see the current Cloud UI version. So my question is: how can I retrieve the version from the cloud, and is there a way to update it or roll it back manually?
    ✅ 1
  • k

    Kelvin DeCosta

    10/07/2022, 9:02 AM
    Hey everyone! I just started using Prefect earlier this week and it has been amazing so far! I want to clarify a doubt I had regarding work queues. When information is sent to a work queue (eg: when I manually trigger a flow from the UI) where does it go? Is the content of the work queue stored on Prefect Cloud?
    ✅ 1
    :marvin: 1
  • a

    Anna Geller

    10/07/2022, 11:46 AM
    Cross-posting for visibility
    :dbt: 3