prefect-server

    Lucian Rosu

    11/25/2021, 11:33 AM
    Hi All, I've been trying since yesterday to create a Prefect Cloud account. I registered for an account and got the confirmation email, but the confirmation URL gives a 404 (page not found), and trying to log in gives me 'Unable to sign in'. Has anyone else had problems signing in? Is there a problem with the website?

    Marko Jamedzija

    11/25/2021, 4:47 PM
    Hi, can someone tell me what the release schedule for Prefect Server is 🙂? This fix resolved many issues we had due to AKS (k8s jobs for tasks or flows getting stuck), but some of them (e.g. double runs due to connection dropouts) we are still experiencing, as we can't activate the fix on the agent because the corresponding change in the Helm chart hasn't been released yet. So I'd like to know when we can expect this release, and how its timing is decided in general. Thanks!

    Anh Nguyen

    11/26/2021, 7:38 AM
    Hi all, I have an issue when running a flow on the server. Could you help me?

    M. Siddiqui

    11/26/2021, 3:20 PM
    Hello guys! I am currently doing some ETL operations via dbt. As our use cases start to grow, we decided to opt for an orchestration tool other than Lambdas on AWS. I had experience working with Airflow in the past, but within dbt's docs I stumbled upon Prefect, and it seemed to check ALL the right boxes regarding the pain points I struggled with in Airflow. Hence, as a newbie I'm a little confused about some concepts:
    • Since agents are responsible for actually executing the tasks, are we responsible for handling their environment's Python dependencies? For example, I'm trying to create a Dockerfile which would be run in an AWS ECS container. I found a Prefect Docker image on Docker Hub which would serve as a base image for adding dbt dependencies so that the agent can execute the code. Is this assumption right? I can find Prefect CLI commands to deploy agents on ECS, Docker, Kubernetes, etc., but I don't see anywhere where the dependencies get resolved...
    • I'm also a little confused as to the purpose of Storage. Is it a compiled version of the flows? What benefit do I get by adding a GitHub or S3 storage layer? Thanks!
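    For reference, the base-image assumption above is the usual pattern: the flow's dependencies live in the image that executes it. A minimal sketch of such a Dockerfile, assuming dbt is installed via pip (the dbt adapter package and project path are illustrative):
    FROM prefecthq/prefect:latest
    # Illustrative: install the dbt adapter for your warehouse
    RUN pip install dbt-snowflake
    # Bake the dbt project into the image so tasks can invoke it
    COPY dbt_project/ /opt/dbt_project/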

    Christopher Chong Tau Teng

    11/29/2021, 8:09 AM
    Hi there. I am trying to set up DockerRun but am faced with an issue importing my flow task from another module: when I run my flow registration script below, it complains that the src module is not found. I think I am doing something wrong because I am confused about how DockerRun works. I have my Python task defined as task_test_flow, which resides in a different Python module. I then built a Docker image of that Python module as gcr.io/some-bucket/prefect-flows:v1, which I pass to DockerRun(image=...) here. Then I created the following registration script, where I believe I need to import the task (as done in https://docs.prefect.io/orchestration/flow_config/docker.html#dependency-requirements). But here's the problem: task_test_flow is defined in the Docker image gcr.io/some-bucket/prefect-flows:v1, and it's not located in the same directory as this registration script below... I guess my question is: as I want to define my task separately from my flow registration script, how can I import my task (that is already built into a Docker image) into my flow registration script? Do I even need to do that in the first place?
    from datetime import timedelta
    from prefect import Flow
    from prefect.schedules import IntervalSchedule
    from prefect.storage import GCS
    from prefect.run_configs import DockerRun
    from src.test_flow import task_test_flow
    
    
    schedule = IntervalSchedule(interval=timedelta(minutes=1))
    
    with Flow("test-flow", schedule) as flow:
        task_test_flow()
    
    flow.storage = GCS(bucket="some-bucket")
    flow.run_config = DockerRun(image="gcr.io/some-bucket/prefect-flows:v1")
    flow.register()
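    One pattern that typically resolves this (a sketch, not the only way): with GCS script storage plus DockerRun, the flow file is downloaded and executed inside the container at runtime, so src must be importable inside the image rather than imported "out of" it. Paths below are illustrative:
    FROM prefecthq/prefect:latest
    # Bake the task code into the image that DockerRun points at...
    COPY src/ /app/src/
    # ...and make it importable at runtime
    ENV PYTHONPATH=/app
    Registration then only needs the same import to resolve locally, e.g. by running the registration script from the repo root.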

    Christopher Chong Tau Teng

    11/29/2021, 9:02 AM
    Hi guys, another question here: my company has a strict policy of only using containers in cloud VMs, prohibiting the installation of any libraries directly on the machine. I would like to know if running the Prefect client in containers is possible. Specifically, I would like to run a DockerAgent in a container, and the registration of my flows should also be done in a container. I've seen previous Slack discussion of how running a DockerAgent in a container will cause issues with memory (as flow containers are created within the agent container). May I know if there is any fix for that? Are there any other downsides to running the Prefect client in a container?

    William Clark

    11/29/2021, 5:28 PM
    Hello, I was wondering what the correct pattern is to run multiple flows that depend on the same tasks with different parameters. I know that you can parameterize your flows, but I need to create new runs for a list of passed-in parameters in a more automatic way than registering the parameters in the flows themselves.
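    One way this is commonly handled (a sketch, assuming the flow is already registered and accepts a Parameter named "dataset"; the flow id and parameter values are illustrative):
    from prefect import Client

    client = Client()
    flow_id = "your-flow-id"  # illustrative; look it up in the UI or via GraphQL

    # One run per parameter set, created programmatically instead of
    # baking the parameter values into the registered flow itself
    for dataset in ["users", "orders", "events"]:
        client.create_flow_run(flow_id=flow_id, parameters={"dataset": dataset})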

    Omar Alvarado

    11/30/2021, 1:08 AM
    Hi everyone... I am just trying to configure Prefect Server with our current infrastructure. We have blocked all external IPs from accessing our MariaDB RDS database on AWS. We want to whitelist the Prefect Server IP addresses to allow access. Does anyone know where to find that information?

    Cormac Long

    11/30/2021, 8:30 AM
    Hello. This is related to the questions asked by @Christopher Chong Tau Teng. I am trying to understand if Prefect Server can be used in our environment to manage multiple workflow streams. We want to manage workflows in the following manner:
    1. Workflows are containerised.
    2. You can update the container image backing one workflow without affecting any other workflows.
    3. Some workflows depend on other workflows having completed correctly.
    4. Prefect itself is deployed in containers, not on a bare-metal host.
    Call a container image which is to be used by one or more workflows a workflow engine. My current understanding is that I should separate the flow scripts from the workflow engines, but I am a bit confused as to how I should actually implement this. As a template example, consider a setup with:
    • Prefect Server running as a set of Docker containers
    • Two engine container images, tagged engine:1 and engine:2 respectively
    • Two workflows WF1 and WF2 with associated scripts workflow_1.py and workflow_2.py
    Then the scenario plays out as follows:
    1. Both workflow_1.py and workflow_2.py initially use engine:1.
    2. An "issue" is identified with WF1 using engine:1; it has to be updated to use engine:2.
    3. workflow_1.py is updated to use engine:2 and re-registered with Prefect.
    4. WF1 now runs using engine:2 while WF2 keeps running using engine:1.
    How might this setup be configured in Prefect?
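    For what it's worth, this maps naturally onto per-flow run configs, since each flow pins its own image. A minimal sketch with illustrative names:
    from prefect import Flow, task
    from prefect.run_configs import DockerRun

    @task
    def do_work():
        pass  # stand-in for the real workflow logic

    # Re-registering WF1 with a new image leaves WF2's run config untouched
    with Flow("WF1") as wf1:
        do_work()
    wf1.run_config = DockerRun(image="engine:2")  # moved to the new engine

    with Flow("WF2") as wf2:
        do_work()
    wf2.run_config = DockerRun(image="engine:1")  # still on the old engine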

    Sylvain Hazard

    11/30/2021, 8:36 AM
    Hi! I'm having a weird issue with flow registration where it looks like Prefect tries to parse .sql files as if they were .py scripts. Explanation and relevant code below.

    Gagan Singh Saluja

    11/30/2021, 1:16 PM
    Hi all, I'm trying to run an ECS agent on EC2 against Prefect Server. I can see Fargate tasks getting spawned on ECS, but they stop immediately, and my Prefect flow is stuck in a Scheduled state.

    Gagan Singh Saluja

    11/30/2021, 1:20 PM
    Exit code 1. Command: ["/bin/sh", "-c", "prefect execute flow-run"]

    Payam Vaezi

    11/30/2021, 7:10 PM
    We have a flow task run that finished with the status RUNNING instead of SUCCESS or FAILURE, which resulted in PENDING status on all downstream tasks. Can you please advise why this behavior happened and what we need to do on our side to address it?

    Bruno Kuasney

    12/01/2021, 8:59 AM
    Hi guys! I'm trying to set up Prometheus in Prefect (inside the flow, so each time the flow runs I push something to a Prometheus endpoint). I tried the example provided in pushgateway.py (in the docstring of the run function). The app is not breaking (meaning it's probably working), but apparently it's also not doing anything: I cannot hit the endpoint given by the pushgateway_url parameter. How is that example supposed to work, given that I'm not able to hit the endpoint? I also sort of forced an endpoint to be created by using start_http_server(8000) from prometheus_client. Like I said, the code isn't breaking, so maybe it is working and I'm just not able to see it somehow.
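    For reference, the Pushgateway pattern doesn't expose an endpoint from the flow itself: it pushes metrics to a Pushgateway service that must already be running, and start_http_server() belongs to the pull model, so it isn't needed here. A sketch inside a task (the gateway address and metric name are illustrative):
    from prefect import task
    from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

    @task
    def report_metric(value: float):
        registry = CollectorRegistry()
        gauge = Gauge("flow_rows_processed", "Rows processed by the flow",
                      registry=registry)
        gauge.set(value)
        # Fails unless a Pushgateway is reachable at this address
        push_to_gateway("pushgateway.example.com:9091", job="my_flow",
                        registry=registry)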

    William Clark

    12/01/2021, 12:29 PM
    Hello! I am running into an issue with Docker storage and a flow that is stored as a script. I am using the dockerfile parameter to build a custom image where the flow is stored. The issue is that the healthchecks fail because they look for the Dockerfile that was used to build the image, which I didn't copy into the image because it wasn't relevant to the flow itself. Has anyone else run into this? I can disable the healthchecks, but that doesn't seem like the best path.

    Gagan Singh Saluja

    12/01/2021, 3:18 PM
    Hi, I have four files: one main file and three helper files, and main imports the helpers. I am using S3 storage. When I register the main file and run it, I get a ModuleNotFoundError. Please help!

    Lana Dann

    12/01/2021, 7:37 PM
    Is there a way to skip a flow registration using Python logic? I have one flows directory that registers the same flows in different runtime environments, but some flows I only want to run in staging and not in production, for example. I register all staging flows using:
    prefect register -p lib/flows/ --project {project} --label staging
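    One approach (a sketch, relying on prefect register importing each file and registering the module-level Flow objects it finds; the env var name is made up): only define the flow when the target environment matches.
    import os
    from prefect import Flow, task

    @task
    def say_hi():
        print("hi")

    def make_flow():
        with Flow("staging-only-flow") as f:
            say_hi()
        return f

    # `prefect register -p lib/flows/` only sees this flow when
    # DEPLOY_ENV=staging, so production registration skips it
    if os.environ.get("DEPLOY_ENV") == "staging":
        flow = make_flow()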

    Christopher Chong Tau Teng

    12/02/2021, 11:42 AM
    Hi @Anna Geller @Kevin Kho, I have some questions about the code structure described here for using the Docker agent. Assume the following is my directory:
    -flows/
    ---flow_1.py
    ---flow_2.py
    -src/
    ---task_flow_1.py
    ---task_flow_2.py
    -Dockerfile
    and samples from each file:
    flow_1.py
    from datetime import timedelta
    from prefect import Flow
    from prefect.schedules import IntervalSchedule
    from prefect.storage import GCS
    from prefect.run_configs import DockerRun
    
    import sys
    sys.path.append('.../src')
    from task_flow_1 import task_test_flow
    
    schedule = IntervalSchedule(interval=timedelta(minutes=1))
    
    with Flow("test-flow-1", schedule) as flow:
        task_test_flow()
    
    flow.storage = GCS(bucket="xxx")
    flow.run_config = DockerRun(image="xxx/prefect:v1")
    flow.register(project_name='docker-runner-01')
    task_flow_1.py
    import prefect
    from prefect import task
    import numpy as np
    
    @task
    def task_test_flow():
        logger = prefect.context.get("logger")
        test_arr = np.array([1, 2, 3])
        logger.info(f"{test_arr}")
    Dockerfile
    FROM prefecthq/prefect:latest
    WORKDIR /app
    ADD . .
    RUN prefect backend server
    Now assume I have registered both flows with the server and they are running as expected. One day, flow_1 breaks and I need to change task_flow_1.py to fix the bug. I then update the image to v2 in flow_1.py:
    flow.run_config = DockerRun(image="xxx/prefect:v2")
    I then build a new Docker image and push it to xxx/prefect:v2. Here's my question: before I re-register these two flows with the server, do I also need to update the image in flow_2.py to use xxx/prefect:v2, or can it continue to use xxx/prefect:v1?

    Vaibhav Ariyur

    12/02/2021, 8:10 PM
    Is it possible to pass parameters to the callable given to the on_failure param of a flow? Should the function called by on_failure be able to read your flow's results?
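    For context, in Prefect 1.x the on_failure callable is invoked with the flow and its failed state, and the state's .result should map tasks to their states, so results are reachable from there. Extra arguments can be bound ahead of time; a sketch using functools.partial (the channel argument is illustrative):
    from functools import partial
    from prefect import Flow, task

    def alert(flow, state, channel="#alerts"):
        # state.result is a dict of task -> task state for the run
        print(f"{flow.name} failed; notifying {channel}: {state}")

    @task
    def boom():
        raise ValueError("nope")

    with Flow("example", on_failure=partial(alert, channel="#data-eng")) as flow:
        boom()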

    Lana Dann

    12/02/2021, 9:20 PM
    Hi, how can I set PYTHONPATH for storage? For context, I'm using GitLab storage for my flow and my flows are in lib/flows, but when the flow tries to run, it errors with
    ModuleNotFoundError: No module named 'lib'
    when it tries to access the storage.
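    One hedged option: run configs accept an env dict, so PYTHONPATH can be set on the flow's runtime environment. A sketch (the path is illustrative and depends on where the code lands at runtime):
    from prefect import Flow
    from prefect.run_configs import UniversalRun

    with Flow("example") as flow:
        pass  # tasks elided

    # Make the repo root importable at runtime so `import lib` resolves
    flow.run_config = UniversalRun(env={"PYTHONPATH": "/opt/prefect"})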

    Gagan Singh Saluja

    12/03/2021, 7:23 AM
    Hi, if the server goes down for some reason, how do I ensure that the data is persistent? Every time I close and restart the server, I lose track of the previous runs and metrics in the UI.
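    If it helps: Prefect Server keeps run history in its bundled Postgres database, and the 1.x server CLI can persist that database to a Docker volume; something along these lines (verify the flag against your CLI version):
    prefect server start --use-volume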

    jack

    12/03/2021, 4:31 PM
    How to track reruns? We've been calling client.create_flow_run() to create several flow runs (using ECSRun), and then polling each with client.get_flow_run_state to know when all the flows have completed. When one of the flows fails (and Prefect starts a new flow run to take its place), how can we check when the rerun is complete (and whether it succeeded)?

    jack

    12/03/2021, 6:20 PM
    How to detect state == FAILED as soon as the heartbeat fails? We have a flow set up to run on ECS that purposefully consumes all the available memory on the container. (We want to make sure we can handle such edge cases.) We monitor the Prefect logs, and this message comes through:
    No heartbeat detected from the remote task; marking the run as failed.
    For 20+ minutes following that log message, fetching the flow run state from Prefect Cloud still shows
    <Running: "Running flow.">
    Ideally, as soon as the flow run is marked as failed, the state from Prefect Cloud would say Failed. Suggestions?

    Lukas N.

    12/03/2021, 6:51 PM
    Does CloudFlowRunner support graceful shutdown? We're running flows as Kubernetes jobs on EC2 spot instances, which get terminated from time to time. Let's say the job starts a pod a on a node that terminates. Kubernetes will quickly reschedule it and spawn a pod b, but b does nothing because the state of the tasks is Running, even though they are not (a is dead). We need to wait for the heartbeat to time out for Prefect to reschedule them, which takes a long time. Instead, setting something like Pending or even Failed on tasks that are Running when a SIGTERM is received would be nice.

    Benson Mwangi

    12/05/2021, 1:10 AM
    Hi, I'm new to Prefect and I was wondering if it's possible to trigger a flow through the GraphQL endpoint. I am currently self-hosting a Prefect server and would like to trigger a flow from a separate system running off a UI. I would appreciate any insights or materials that could lead me to a solution. I've successfully installed Prefect and am able to execute some flows (via a Linux terminal), including scheduled flows, but what I'm looking for is the ability to trigger a flow externally via the GraphQL API. Sorry for the beginner question 🙂
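    For the record, Prefect Server exposes a create_flow_run mutation on the same GraphQL endpoint the UI talks to. A sketch using Python's requests (the host/port and flow id are illustrative):
    import requests

    # Apollo endpoint of a self-hosted Prefect Server
    url = "http://localhost:4200/graphql"

    # The flow id can be looked up in the UI or via a `flow` query
    mutation = """
    mutation {
      create_flow_run(input: { flow_id: "f0e0a0b0-0000-0000-0000-000000000000" }) {
        id
      }
    }
    """

    resp = requests.post(url, json={"query": mutation})
    print(resp.json())  # contains the new flow run's id on success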

    Sylvain Hazard

    12/06/2021, 7:55 AM
    Hey everyone! Hope you had a great weekend! I'm having trouble with my flows' memory footprint. I have a bunch of flows that look like this:
    ids = get_ids()
    step_1 = do_step_1.map(ids)
    step_2 = do_step_2.map(step_1)
    step_3 = do_step_3.map(step_2)
    The number of ids retrieved can vary by a few orders of magnitude and I cannot predict it. The issue I see is that while the flow runs, the memory footprint keeps increasing, which sometimes results in an OOM kill of the pod running the flow. Is there any way to keep the memory footprint near constant with regard to the number of executed mapped tasks in the flow? I understand that the initial mapping requires a bunch of memory and that there is no way around that. I am running on K8s, using a LocalDaskExecutor (threaded), and had hoped that depth-first execution would mean some amount of garbage collection of fully executed branches. Would setting up a Result in the mapped tasks help in any way? I tried manually releasing memory inside the task code (with del and gc mostly) but saw no improvement. Another solution I see would be to have steps 1-3 executed in their own separate flow, but that means we would spin up a bunch of ephemeral pods and lengthen the flow overall, I suppose? Thanks!

    Robert Hales

    12/06/2021, 10:03 AM
    Hi there, I had a few tasks finish with the state Pending, which I think led to double runs. From what I understand, Pending shouldn't be a terminal state?

    Guilherme Petris

    12/06/2021, 10:13 AM
    Hey! I'm trying to figure out why my flow runs complete in less than a second when triggered through Prefect, while running the script separately takes a couple of minutes.
    from analytics_toolbox.analytics_toolbox import *
    from analytics_toolbox.analytics_toolbox import PendoUtils as pendo_utils
    from analytics_toolbox.analytics_toolbox import SnowflakeUtils as snowflake_utils
    from prefect import task, Flow
    from prefect.utilities.notifications import slack_notifier
    
    @task()
    def main():
    ......
    with Flow("pendo_featureEvents") as flow:
        main()
    
    flow.register(project_name="pendo_api")

    Faizan Qazi

    12/06/2021, 10:51 AM
    Hi guys. I am running Prefect Server locally and trying to access it using the IP address, which I also configured in the config.toml file. But when I access it at http://ip.add.re.ss:8080, the page displays and then redirects to the getting-started page. Basically, I want to be able to access my Prefect instance from any other machine.
    [server]
      [server.ui]
      apollo_url="http://ip.add.re.ss:4200/graphql"

    Lukas Brower

    12/06/2021, 3:06 PM
    Hey everyone, my team is seeing issues with executing long-running flows, and we aren't sure if the issue lies in Prefect or the underlying Dask scheduler. We have some flow runs that take upwards of 12 hours to complete at times. These runs reliably fail somewhere between the 11.5 and 12 hour mark with a top-level
    concurrent.futures._base.CancelledError
    We are trying to determine if there is something in Prefect or Dask which kills the Dask scheduler when execution nears the 12-hour mark. I don't see any major resource constraints on the workers at the time of the flow cancellations, so I figured I would start here and ask if there is anything special about the 12-hour mark in Prefect, or if this is likely a Dask-specific issue.

Kevin Kho

12/06/2021, 3:10 PM
Hi @Lukas Brower, Prefect has no default timeouts built in, and this error really looks like a Dask one. Are you mapping a large number of tasks? That can kill the scheduler, because there is some memory bloat with the DaskExecutor that we are working on.
By a large number I mean maybe 50k tasks and above. If you just have long-running tasks, then I'm not seeing anything on the Prefect side.

Lukas Brower

12/06/2021, 3:14 PM
Got it, thanks for the info Kevin. We only have ~10 tasks max running at a time usually, and I have seen this fail even when just a single worker is executing a long running task and all others are idle. I will focus on looking into the dask side of things.

Anna Geller

12/06/2021, 3:16 PM
@Lukas Brower not sure if this can help, but Dask performance reports have been recently added to Prefect - sharing in case it might help with debugging https://docs.prefect.io/orchestration/flow_config/executors.html#performance-reports

Lukas Brower

12/06/2021, 3:46 PM
Awesome I’ll try that, thanks Anna