prefect-community
  • b

    Ben Muller

    11/02/2022, 8:06 AM
    Hey - say I have some CI process that deploys a flow, its infra, and its storage on every deployment. Then I change the name of one of the deployments, or the flow name (due to some new requirements). Is there any recommended pattern, besides manually cleaning everything up in the UI, to delete the old resources that will continue to live in the workspace?
    ✅ 1
    j
    • 2
    • 3
  • d

    Denys Volokh

    11/02/2022, 8:17 AM
    Hi, is there a Prefect task that works with Gmail, e.g. to read email and download attachments?
    ✅ 1
    j
    • 2
    • 2
  • w

    wonsun

    11/02/2022, 8:36 AM
    Hi all, I can't find where the error occurred. I'm using Prefect 1.0. While looking for examples of setting dependencies between flows, I found an article written by Anna Geller. (Link) Some time has passed since she wrote this article, so I changed FlowRunTask to StartFlowRun (reference link) among the methods written in that article to create the same situation. In the scripts attached below, two flows (flow_waveforms and flow_pipeline) that already run well have been created, and parent_flow is created to execute those flows sequentially.
    # parent_flow.py
    import prefect
    from prefect.tasks.prefect.flow_run import StartFlowRun
    
    with Flow('mother_flow') as flow:
        first_job = StartFlowRun(flow_name='flow_waveforms',
                                 project_name='InterFlow_Dependencies',
                                 wait=True)
        second_job = StartFlowRun(flow_name='flow_pipeline',
                                  project_name='InterFlow_Dependencies',
                                  wait=True)
        first_job.set_downstream(second_job)
    
    if __name__ == '__main__':
        flow.register("InterFlow_Dependencies")
    # flow_waveforms.py
    import prefect
    from prefect import Flow, task
    
    @task
    def task1():
        something
        return one, two
    @task
    def task2():
        something
    
    with Flow('flow_waveforms') as flow:
        var1, var2 = task1()
        finish = task2()
    
    flow.run_config = UniversalRun(env={'PREFECT__CLOUD__HEARTBEAT_MODE':'thread'})
    flow.register('InterFlow_Dependencies')
    # flow_pipeline.py
    import prefect
    from prefect import Flow, task
    
    @task
    def task3():
        something
    
    with Flow('flow_pipelines') as flow:
        task3()
    
    flow.register('InterFlow_Dependencies')
    If I execute mother_flow.py in this case, Prefect responds with the following error. (image) Any idea where I went wrong? Or could you tell me if there is an easier way to define dependencies between flows other than this? Thx.
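    Two things are visible in the scripts as posted: parent_flow.py requests flow_name='flow_pipeline' while flow_pipeline.py registers Flow('flow_pipelines'), and parent_flow.py uses Flow without importing it. The error itself is only in the image, so whether either of these is the cause is an assumption. The sketch below is plain Python with no Prefect calls; missing_flows is a hypothetical helper that compares the names a parent flow requests against the names the child scripts register.

```python
# Flow names the parent flow passes to StartFlowRun (copied from parent_flow.py above).
requested = {"flow_waveforms", "flow_pipeline"}

# Flow names the child scripts register (copied from their Flow(...) calls above).
registered = {"flow_waveforms", "flow_pipelines"}


def missing_flows(requested, registered):
    """Return the requested flow names that no child script registers."""
    return sorted(set(requested) - set(registered))


print(missing_flows(requested, registered))  # ['flow_pipeline']
```

    A check like this could run before registering the parent flow, so that a renamed or misspelled child flow is caught early.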
    a
    • 2
    • 6
  • p

    pk13055

    11/02/2022, 10:09 AM
    Hi, I have a pretty simple issue that I can't seem to find any documentation about. I set up an orion + agent container setup locally, and everything is working as expected when I try to run the flows manually. I was able to successfully create and apply a deployment as well (from within the cli container). However, when trying to run the deployment I am faced with a File not found error. Here's the traceback:
    Flow could not be retrieved from deployment.
    Traceback (most recent call last):
      File "<frozen importlib._bootstrap_external>", line 879, in exec_module
      File "<frozen importlib._bootstrap_external>", line 1016, in get_code
      File "<frozen importlib._bootstrap_external>", line 1073, in get_data
    FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmp_zx6qpglprefect/flows/fetch_etfs.py'
    My docker-compose is as follows:
    version: "3.9"
    services:
      db:
        image: timescale/timescaledb:latest-pg14
        volumes:
          - $PWD/data/db:/var/lib/postgresql/data
          - $PWD/config/db:/docker-entrypoint-initdb.d/
        healthcheck:
          test: [ "CMD-SHELL", "pg_isready" ]
          interval: 10s
          timeout: 5s
          retries: 5
        environment:
          - POSTGRES_DB=$POSTGRES_DB
          - POSTGRES_USER=$POSTGRES_USER
          - POSTGRES_PASSWORD=$POSTGRES_PASSWORD
        networks:
          - db_network
      orion:
        image: prefecthq/prefect:2.6.3-python3.10
        restart: always
        volumes:
          - $PWD/prefect:/root/.prefect
        entrypoint: [ "prefect", "orion", "start" ]
        environment:
          - PREFECT_ORION_API_HOST=0.0.0.0
          - PREFECT_ORION_DATABASE_CONNECTION_URL=$PREFECT_DB_URL
        ports:
          - 4200:4200
        depends_on:
          - db
        networks:
          - prefect_network
          - db_network
    
      agent:
        build: ./prefect
        restart: always
        entrypoint: [ "prefect", "agent", "start", "-q", "main_queue" ]
        environment:
          - PREFECT_API_URL=http://orion:4200/api
        networks:
          - prefect_network
          - db_network
    
      cli:
        build: ./prefect
        entrypoint: "bash"
        working_dir: "/root/flows"
        volumes:
          - "$PWD/prefect:/root"
        environment:
          - PREFECT_API_URL=http://orion:4200/api
        networks:
          - prefect_network
          - db_network
    
    networks:
      db_network:
      web_network:
      prefect_network:
    To create and apply the deployment(s):
    > docker compose run cli
    $ prefect deployment create ./flows/fetch_etf.py:flow_name --name flow_name
    $ prefect deployment apply deployment.yaml
    ✅ 1
    r
    • 2
    • 2
  • a

    Anna Geller

    11/02/2022, 4:13 PM
    cross-posting
  • j

    John Kang

    11/02/2022, 5:18 PM
    I've been re-deploying some flows this morning and I have been successful in re-deploying flows until about 15 minutes ago when I received this error. I have not changed anything prefect-wise in my code, is this a server error?
    ✅ 1
    a
    b
    • 3
    • 14
  • j

    Jiri Klein

    11/02/2022, 5:52 PM
    Hi, is there a Prefect integration with GitHub that could show the Status of my flow on my PR so that reviewers don’t need to login to Prefect to see that my flow succeeded?
    k
    • 2
    • 1
  • c

    Conor Casey

    11/02/2022, 6:25 PM
    Hello is there a way to send an email or slack alert if a flow fails to run?
    ✅ 1
    a
    • 2
    • 2
  • a

    Amir

    11/02/2022, 6:45 PM
    One of my flows is struggling to process: the agent is on AKS, and the CPU usage (viewed through Lens) shows a value of 1 for about 6 hours before dropping to negligible amounts (0.002, etc.) while RAM usage stays high (30-40 GB). At first I assumed it was stalling, but on closer inspection the flow seems to be using only 1 of 8 cores, at 100%. This explains the continued RAM use. Any suggestions on how to use the full compute power of the AKS VM? I'm assuming DaskTaskRunner is the route to go, and I'm currently experimenting with it, but not having much luck with configurations.
    :kubernetes: 1
  • j

    Jon Young

    11/02/2022, 7:42 PM
    i have a flow that i want to dynamically name, schedule, and pass parameters to, how do i do that?
    1️⃣ 1
    m
    • 2
    • 14
  • j

    Jon Ruhnke

    11/02/2022, 8:46 PM
    I'm testing flows locally during development by simply typing "py .\flow.py" in powershell, and the results are inadvertently getting communicated up to my prefect cloud UI without an agent running. How is this possible?
    ✅ 1
    b
    r
    • 3
    • 10
  • b

    Ben Muller

    11/02/2022, 8:52 PM
    Is there a way to restrict access for an API Key to only one workspace?
    ✅ 1
    k
    r
    • 3
    • 12
  • b

    Ben Muller

    11/02/2022, 9:22 PM
    Starting a new thread for a new topic - Are we able to tag infrastructure blocks? I would like to tag my EcsTask but can not see the arg available for it in the code?
    ✅ 1
    k
    m
    • 3
    • 30
  • b

    Bradley McLaughlin

    11/02/2022, 10:02 PM
    In Prefect 2.0, is there a pattern to replicate the subflow orchestration in the same way as 1.0? Instead of the subflows running in the same process asynchronously using asyncio.gather, I need the flow runs to be created with create_flow_run() (this part works) but then I need to wait for the subflow runs to complete in the parent flow. How can I make the parent flow wait for subflows running in different processes?
    k
    • 2
    • 2
  • d

    David Prince

    11/02/2022, 10:43 PM
    Hey all, I need some urgent assistance dealing with corrupted flow metadata. We deleted a flow yesterday after we had some performance issues. Ever since then, our Prefect agent has been stalling and repeatedly producing this error:
    prefect.exceptions.ClientError: [{'path': ['flow_run', 0, 'id'], 'message': 'Cannot return null for non-nullable field flow_run.id.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]"
    As far as I understand, this is a flow_run record without a flow. I need some assistance with removing this corrupted metadata as our data pipelines are all majorly delayed.
    m
    • 2
    • 5
  • s

    Stephen Lloyd

    11/03/2022, 4:01 AM
    In the prefect 1.0 cloud UI, how can I see a breakdown of where runs are coming from?
    ✅ 1
    b
    • 2
    • 3
  • m

    merlin

    11/03/2022, 4:06 AM
    Deployment Environment Variables
    There's something confusing about executing flows from the command line vs. deployed flow runs in a local execution environment. Deployed flow runs are writing to files in system temp directory, rather than to the local file system relative to the flow invocation.
    • Running prefect orion and flow code from a poetry managed environment, locally
    • CLI invocation of flow code runs in local $PWD, ie the local python project folder
    • Deployment runs flow code in a system temp folder, like tmp_path from pytest
    The confusing part was the flow code was found via relative path, but when writing the output to text file on a relative path the output files landed at an ephemeral system temporary folder. Naturally this wouldn't be an issue writing to s3, but for development I like to work in a local environment and write files relative to my project root. I don't think this is caused by the poetry environment, because the agent is running in the same environment but the distinction in behavior is between deployed flows and flows called from the shell:
    # flow.py
    import
    @task 
    def write_output(dataframe, filepath):
        dataframe.to_csv(filepath)
        logger.debug(f"current working directory: {Path().cwd()}")
    
    @flow
    def write_dataframe():
        write_output(df, 'data/csv/datafile.csv')
    
    # shell invocation
    poetry run python src/flow.py
        # prefect logs:
        | DEBUG   | Task run...  - current working directory: /Users/merlin/prefect-repo
    
    # deployed flow invocation
    poetry shell
    prefect deployment run write_dataframe/depl-write
        # prefect logs:
        | DEBUG   | Task run...  - current working directory: /private/var/folders/gh/...4r3zq.../T/tmpXookj0s08prefect
    My goal here is for my task to write to the same local file path where my project folder is stored. Is there a standard way to specify the working directory for deployed flow code on a local environment? Maybe this is managed by the config defaults (I couldn't find a likely ENV variable). My next attempt will be to specify an absolute path for the write location, but this is not ideal.
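    One way to make the write location independent of the working directory, short of hard-coding an absolute path in the task, is to anchor relative paths to a project root read from the environment. This is a minimal sketch using only the standard library: PROJECT_ROOT is a hypothetical variable name (not a Prefect setting), and resolve_output_path is an illustrative helper, not part of any Prefect API.

```python
import os
from pathlib import Path
from typing import Optional

# Hypothetical environment variable naming the project folder; not a Prefect setting.
PROJECT_ROOT_ENV = "PROJECT_ROOT"


def resolve_output_path(relative_path: str, fallback_root: Optional[Path] = None) -> Path:
    """Anchor a relative output path to a fixed root instead of the current working directory."""
    root = os.environ.get(PROJECT_ROOT_ENV)
    if root:
        base = Path(root)
    elif fallback_root is not None:
        base = fallback_root
    else:
        # Last resort: behave like a plain relative path.
        base = Path.cwd()
    target = (base / relative_path).resolve()
    # Create the parent folders so the caller can write to the path immediately.
    target.parent.mkdir(parents=True, exist_ok=True)
    return target
```

    Inside the task, dataframe.to_csv(resolve_output_path(filepath)) would then write under the same folder for both shell and deployed runs, provided the variable is set in the environment where the flow run executes.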
    c
    j
    +2
    • 5
    • 17
  • i

    iKeepo w

    11/03/2022, 4:55 AM
    Hi, is there any way to set a custom default time interval here?
    a
    • 2
    • 1
  • t

    Tim-Oliver

    11/03/2022, 7:24 AM
    Hi, I would like to save task-results in a custom way. Say a task produces an image and I would like to cache this result. Does anyone know how I could save it as .tif file? I tried to implement a custom Serializer which just passes the image through and then wanted to use a custom WritableFileSystem as storage option that takes care of writing/reading the image as .tif. So far I did not manage and would be grateful for some advice. I am trying this in Prefect 2.x. In Prefect 1.x I managed to do something like this by implementing a custom LocalResult.
    a
    m
    • 3
    • 5
  • j

    José Duarte

    11/03/2022, 10:07 AM
    Hey all, in Prefect 2, is there a way to figure out if I’m running locally or with a remote? From my understanding, in Prefect 1 there was prefect.context.get("running_with_backend").
    a
    • 2
    • 2
  • j

    José Duarte

    11/03/2022, 10:13 AM
    Btw, docs for ContextModel.copy are broken: https://docs.prefect.io/api-ref/prefect/context/?h=context#prefect.context.ContextModel.copy
    ✅ 1
    t
    • 2
    • 1
  • t

    Tim-Oliver

    11/03/2022, 11:40 AM
    I think there is a bug in the cloud UI for the Process infrastructure block. Once an environment variable is set, the field cannot be removed completely. It can be changed, but setting no variable at all is impossible.
    ✅ 1
    :gratitude-thank-you: 1
    r
    • 2
    • 1
  • j

    José Duarte

    11/03/2022, 12:39 PM
    How can I set a state with no data? Prefect throws an error if there’s no data set, but I have nothing to return
    k
    • 2
    • 12
  • d

    David Elliott

    11/03/2022, 2:17 PM
    Hey folks, bit of a niche one - what’s the intention / purpose of job_watch_timeout_seconds in the KubernetesJob infra? I’m finding that the agent creates the job + pod just fine (and the flow + pod run through to completion), but after X seconds (per that timeout parameter) the agent logs "Job 'xxxxx': Job did not complete." (per this) even though the job is mid-way through running. I.e. it doesn’t seem to have any negative effect on the flow, it’s just telling me the job didn’t complete even when the job is very much still running. Feels like something’s not quite right, just wanting to understand what the intention is.
    :kubernetes: 1
    m
    j
    • 3
    • 12
  • t

    Tim-Oliver

    11/03/2022, 3:34 PM
    Hey, I would like to use a string-block value during the flow configuration. How would I do that?
    k
    • 2
    • 8
  • j

    Jai P

    11/03/2022, 4:47 PM
    👋 are there guidelines around long-running tasks (e.g. max timeout, should they be marked async, etc.)?
    m
    • 2
    • 2
  • j

    Javier Ochoa

    11/03/2022, 7:29 PM
    Hello, question. Is there a way to change the backend to "cloud" or "server" directly from the prefect python api?
    k
    • 2
    • 3
  • b

    Brian Phillips

    11/03/2022, 9:08 PM
    Hi, (1.0) we're seeing some strange behavior with child flows. When a new version of a long running parent flow (e.g. a backfiller) is registered with prefect cloud, some child flows kicked off with prefect.tasks.prefect.create_flow_run are being canceled without any additional info in the logs. Is this expected? Has anyone else encountered similar behavior?
    ✅ 1
    b
    • 2
    • 7
  • m

    Madison Schott

    11/03/2022, 9:13 PM
    For the Fivetran connectors in 2.0, what does schedule_type signify? Do you only use this if you want the sync to be scheduled outside of a scheduled flow?
    n
    • 2
    • 2
  • m

    Madison Schott

    11/03/2022, 9:15 PM
    Another question- will my secrets created in 1.0 transfer to the 2.0 UI? Or do I need to recreate them?
    ✅ 1
    m
    • 2
    • 1
m

Michael Adkins

11/03/2022, 9:15 PM
They won’t transfer — we do not use the same backing store for the secrets.
👍 1