Powered by Linen
prefect-community
  • b

    Ben Welsh

    03/31/2022, 8:28 PM
    Are there any examples of passing environment variables into the Docker storage image with the flow script?
  • b

    Ben Welsh

    03/31/2022, 8:28 PM
    In my imagination, I'd like to have os.getenv calls at the very top of my flow.py that can pull in private vars in production for credentials and such.
    k
    • 2
    • 5
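The pattern described above (reading credentials with os.getenv near the top of flow.py) can be sketched with the standard library alone. The helper name `require_env` and the variable name `MY_SERVICE_API_KEY` are hypothetical, and the sketch does not show how the variable gets into the Docker image or the run environment. Whether module-level code runs at build time or at run time depends on how the flow is stored, so reading the variable inside a function may be the safer choice.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, failing loudly if unset."""
    value = os.getenv(name)
    if value is None:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# Hypothetical variable name, set here only so the example runs standalone.
os.environ.setdefault("MY_SERVICE_API_KEY", "dummy-value-for-local-testing")

# In a real flow.py this value would come from the production environment.
API_KEY = require_env("MY_SERVICE_API_KEY")
```

Failing with an explicit error when the variable is missing tends to be easier to debug than a credential that is silently None.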
  • b

    Ben Welsh

    03/31/2022, 8:28 PM
    Working in Google Cloud and scratching my head a bit
  • e

    Erik Schomburg

    03/31/2022, 9:07 PM
    Hello! I’m running a pretty extensive pipeline on a large EC2 machine (we’re still trying to get set up to port it to Prefect Cloud), and have some bottleneck tasks in the middle that collect the results of a bunch of previous mapped tasks and do some calculations across the whole dataset. The bottleneck tasks don’t manage to succeed, but they don’t explicitly “fail” either: earlier tasks just re-run (collecting the checkpointed results) and then the bottleneck task starts again, but again does not finish. I can’t figure out what the problem is. I can run through the task outside of Prefect if I load all the data, so I’m not sure if it’s a silent memory issue on the worker running the large task, a time-out issue with the scheduler, or something else I’m not thinking of. Has anyone else run into similar behavior and have a suggestion for how to diagnose and/or fix the problem?
    k
    • 2
    • 8
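One way to test the "silent memory issue" hypothesis is to measure peak memory while running the task body outside Prefect. The sketch below uses the standard library's tracemalloc, which only sees allocations made through Python's memory allocator. `collect_and_summarize` is a hypothetical stand-in for the bottleneck task, not code from the pipeline above.

```python
import tracemalloc


def run_with_peak_memory(func, *args, **kwargs):
    """Run func and return (result, peak_bytes) for Python-level allocations."""
    tracemalloc.start()
    try:
        result = func(*args, **kwargs)
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, peak


def collect_and_summarize(n):
    # Hypothetical stand-in for the bottleneck task: materialize data, reduce it.
    data = list(range(n))
    return sum(data)


total, peak_bytes = run_with_peak_memory(collect_and_summarize, 100_000)
print(f"result={total}, peak={peak_bytes / 1e6:.1f} MB")
```

Comparing the reported peak with the memory available to the worker gives a rough indication of whether the process could be getting killed for exceeding it.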
  • c

    Chu Lục Ninh

    04/01/2022, 12:51 AM
    Would it be crazy to think I can use Prefect as a CI/CD tool 😁? Prefect has most of what a CI/CD pipeline needs: graph dependencies, notifications, artifacts, task libraries.
    👀 1
    k
    • 2
    • 2
  • a

    Atul Anand

    04/01/2022, 1:12 AM
    Hi, inside a task I am using a third-party library (in other words, another Python module) and have bound the volume to the agent. I still get a "Module not found" error. Is there any way to solve this, or do tasks have some restriction that prevents them from calling libraries outside their scope? Put simply, how can we use external modules?
    c
    k
    • 3
    • 22
  • k

    Ken Nguyen

    04/01/2022, 3:27 AM
    Hi, I would like to get flow_run_id from a flow created via create_flow_run(), how would I be able to do that? For context, I’m trying to create a FlowRunView of a child flow created by create_flow_run().
    k
    • 2
    • 13
  • a

    Andrei Aldescu

    04/01/2022, 7:47 AM
    Hi, I am running the tutorial to get my first hello world in Prefect, but I get this error message on the import statement:
    >>> from perfect import task, Flow
    Traceback (most recent call last):
      File "<pyshell#2>", line 1, in <module>
        from perfect import task, Flow
    ImportError: cannot import name 'task' from '<unknown module name>' (unknown location)
    a
    • 2
    • 2
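The import in the traceback above reads `from perfect import ...`, whereas the library's package name is `prefect`. A small standard-library sketch for checking which module names actually resolve in the current environment; the helper name `describe_module` is hypothetical.

```python
import importlib.util


def describe_module(name: str) -> str:
    """Report whether a top-level module name resolves, and from where."""
    spec = importlib.util.find_spec(name)
    if spec is None:
        return f"{name}: not installed in this environment"
    return f"{name}: found at {spec.origin}"


# Compare the intended package name with the one typed in the traceback.
for candidate in ("prefect", "perfect"):
    print(describe_module(candidate))
```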
  • i

    Iván Sánchez

    04/01/2022, 7:49 AM
    Hello! I want to build a flow in which several components need to load a heavy model before they do their work. This model takes more than two minutes to load, so I'd like some sort of stateful component that keeps the model loaded in memory and, when data comes to it, just has to call .predict(). I've seen that “stateful components” do not exist, and that I shouldn't rely on a component's state. Is there any guideline or best practice on how to achieve this? Maybe Prefect is not intended for this and the components can't be so heavy? Thank you
    m
    a
    k
    • 4
    • 15
  • d

    Domenico Di Gangi

    04/01/2022, 8:49 AM
    Hi all, I am trying to run the orion server on a machine and open the ui from a different machine on the same network. When I type in my browser http://{ip_server_machine}:4200 the first page of orion-ui loads fine but it does not fetch any info on deployments or current runs. I am not a javascript expert but it seems that it tries to fetch data from http://127.0.0.1:4200/api while it should probably use http://{ip_server_machine}:4200 . Is there a way to see the ui from a different machine, i.e. not from the same machine where the orion server is running?
    a
    v
    • 3
    • 9
  • a

    Andrei Aldescu

    04/01/2022, 11:14 AM
    How do I access the Prefect CLI? Is this something that I download on a Windows machine?
    a
    • 2
    • 2
  • p

    Patrick Tan

    04/01/2022, 12:00 PM
    Hi, Flow A repeatedly runs Flow B in a loop. I want the next instance of Flow B to run only after the current one has completed. I ran the code below, and two instances of the same flow ran at the same time with the same flow ID. See screenshot.
    with Flow("parent-flow") as flow:
    
        for i in range(2):
            flow_a = create_flow_run(flow_name="livelots flow", project_name="LiveLots-ETL")
            wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
            print(i)
    a
    k
    • 3
    • 23
  • m

    Matthew Seligson

    04/01/2022, 12:14 PM
    What’s the best practice for running a flow of flows locally?
    a
    • 2
    • 4
  • m

    Maria

    04/01/2022, 3:09 PM
    Hello, is there a way to see in the UI which code is being run like in Airflow?
    k
    • 2
    • 3
  • w

    Wei Mei

    04/01/2022, 3:30 PM
    Hi! In my flow I call my tasks like this. I am experiencing a problem where the snowflake_load is running before the upload_to_s3 task.
    today, dir = get_data(endpoints=api_endpoints)
    upload_to_s3(today, dir, endpoints=api_endpoints)
    snowflake_load(today, schema="statistics", endpoints=api_endpoints)
    k
    • 2
    • 22
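In the snippet above, upload_to_s3 and snowflake_load both consume the output of get_data, but nothing states that one must wait for the other, so a scheduler is free to run them in either order. The sketch below illustrates that idea with the standard library's graphlib rather than Prefect's own API. In Prefect 1.x, the explicit dependency is typically expressed with the upstream_tasks keyword that appears elsewhere in this log.

```python
from graphlib import TopologicalSorter

# Each key maps a task to the set of tasks it must wait for.
# Task names are taken from the question above.
data_only = {
    "upload_to_s3": {"get_data"},
    "snowflake_load": {"get_data"},
}
order = list(TopologicalSorter(data_only).static_order())
# "get_data" always comes first; the relative order of the other two
# is unconstrained, which matches the behavior described in the question.

with_state_dependency = {
    "upload_to_s3": {"get_data"},
    "snowflake_load": {"get_data", "upload_to_s3"},
}
ordered = list(TopologicalSorter(with_state_dependency).static_order())
print(ordered)
```

With the extra edge there is only one valid ordering, so the load step can no longer start before the upload.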
  • d

    Donnchadh McAuliffe

    04/01/2022, 3:51 PM
    Hey, I've signed up to Orion Cloud to continue my investigation of Prefect for my engineering team. I deployed a simple flow and am just trying to run the flow via a simple API request:
    POST https://api-beta.prefect.io/api/accounts/123/workspaces/456/deployments/789/create_flow_run
    
    PAYLOAD
    {
        "state": {
            "type": "SCHEDULED",
            "message": "Quick run through UI"
        }
    }
    
    Headers {Authorization: Bearer api_key}
    Getting an SSL Error: Certificate has expired. Any ideas?
    k
    • 2
    • 3
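For reference, the request described above can be assembled with the standard library as follows. The URL, IDs, and key are the placeholders from the message. The Content-Type header is an assumption not stated in the original, and the sketch does not address the certificate error itself.

```python
import json
import urllib.request

# Placeholder IDs and key copied from the message above; substitute real values.
URL = "https://api-beta.prefect.io/api/accounts/123/workspaces/456/deployments/789/create_flow_run"
API_KEY = "api_key"

payload = {"state": {"type": "SCHEDULED", "message": "Quick run through UI"}}

request = urllib.request.Request(
    URL,
    data=json.dumps(payload).encode("utf-8"),
    headers={
        "Authorization": f"Bearer {API_KEY}",
        # Assumption: the API expects a JSON body declared as such.
        "Content-Type": "application/json",
    },
    method="POST",
)

# Sending is left commented out so the sketch runs without network access:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read())
print(request.get_method(), request.full_url)
```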
  • j

    Joshua Greenhalgh

    04/01/2022, 4:00 PM
    Hi, I wonder if someone could help me. I am trying to combine module storage with a Kubernetes job run config. My process is to build a Python module containing all my flows inside a Docker image, push this image to a registry, and then specify it as the image that the Kubernetes job runs. When I register these flows with Cloud after the code in the module has changed, I would expect to see a version bump. Instead, I just get the following message on subsequent registrations:
    Collecting flows...
    Processing 'flows':
      Building `Module` storage...
      Building 'first_flow'... Done
    Writing output to 'flows.json'
    ========================== 1 built ==========================
    Collecting flows...
    Processing 'flows.json':
      Registering 'first_flow'... Skipped (metadata unchanged)
    ================== 0 registered, 1 skipped ==================
    k
    • 2
    • 7
  • a

    Amogh Kulkarni

    04/01/2022, 9:49 PM
    Hi Prefect team. Is there any documentation on the fundamental difference between Flow ID, Flow Group ID and Version Group ID? I could spot a few differences but haven’t quite understood the need for having three IDs.
    k
    • 2
    • 1
  • k

    Ken Nguyen

    04/02/2022, 9:05 AM
    I currently have a flow that is set up like so:
    with Flow("data-quality-tracking-model-run-duration-flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    
      dbt_run_flow_run_id = create_flow_run(
          flow_name="test-dbt-run-flow",
          project_name="data_quality_tracking"
      )
    
    
      flow_run = wait_for_flow_run(
          dbt_run_flow_run_id, raise_final_state=True, stream_logs=True
      )
    
    
      flow_logs = get_logs(
          dbt_run_flow_run_id,
          task_args={"name": "Getting logs", "trigger": all_finished},
          upstream_tasks=[flow_run],)
    get_logs is a function that takes in a flow_run_id, creates a FlowRunView, then gets the logs of that FlowRunView. My issue is that despite having both upstream_tasks defined AND a FlowRunView.get_latest() call in the get_logs task, I’m still getting logs that are incomplete. Do you have any suggestions for why my get_logs function is prematurely retrieving logs of the child flow/retrieving incomplete logs?
    a
    e
    • 3
    • 12
  • r

    R Zo

    04/02/2022, 11:42 AM
    Hi prefect team,
    
    I am trying to spin up a flow of flows, so I started a Prefect server and agent with label "test". Below is a snippet of code that should run child_flow_1, which is a flow with multiple tasks. However, child_flow_1 does not run, while Prefect quickly returns a few success logs and no error is reported. What am I missing? I have run child_flow_1 without the flow of flows and it works fine. Following is a snippet of the log using Dask, but I have tried LocalDaskExecutor as well.
    
    test_flows created
    Flow URL: http://localhost:8080/default/flow/cd6dc2bb-8f54-4f5a-b046-d90e5c853dbe
     └── ID: e405d852-5c75-4001-815e-619687e592d3
     └── Project: test_flows
     └── Labels: ['mymachinexxx', 'test']
    [2022-04-02 22:30:01+1100] INFO - prefect.FlowRunner | Beginning Flow run for 'test_flows'
    [2022-04-02 22:30:01+1100] INFO - prefect.DaskExecutor | Connecting to an existing Dask cluster at tcp://192.168.20.9:8786
    [2022-04-02 22:30:02+1100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    
    
    test_flows = make_three_flows()
    for tfl in test_flows:
            tfl.register("test_flows")
            # tfl.executor = LocalDaskExecutor(scheduler="threads",num_workers=10)
    
    with Flow("parent_flow", run_config=LocalRun(labels=["test"])) as parent_flow_complete:
            working_dir = Parameter("working_dir")  #
            create_flow_run(
                flow_name="child_flow_1",parameters={"working_dir": working_dir})
    
    parent_flow_complete.executor = DaskExecutor(address="tcp://localhost:8786")
    # parent_flow_complete.register("test_flows")
    parent_flow_complete.run(parameters={"working_dir": working_dir},)
    a
    • 2
    • 5
  • j

    Jai P

    04/02/2022, 5:06 PM
    👋 hello folks! I suspect the answer to this will be "use prefect 2.0" but wanted to ask: my company is investigating prefect for the use case of storing "DAG-y" transformations elsewhere (perhaps, behind some service interface), and dynamically executing them at runtime. I think prefect 1.0 won't solve this since the flow needs to be defined up front, but correct me if I'm wrong. I'll try to add a few more details in the thread, and thanks in advance for your help!
    k
    a
    • 3
    • 12
  • s

    Shiyu Gan

    04/03/2022, 2:36 AM
    Can someone verify they can open the case study docs on this page? https://www.prefect.io/why-prefect/case-studies/ I can only open the Jellyfish case study.
  • s

    Shiyu Gan

    04/03/2022, 2:36 AM
    The ones below simply do not do anything when I click to open them.
    a
    k
    a
    • 4
    • 3
  • a

    Atul Anand

    04/03/2022, 1:13 PM
    Just want to know which API I can use to create dependent tasks. Say A, B, C, and D are my tasks: D needs to be executed when A and B fail, and C should be executed with the results of A and B as input.
    a
    k
    • 3
    • 5
  • j

    Jai P

    04/03/2022, 6:44 PM
    Hi there! Can you visualize a flow in prefect 2.0, similar to flow.visualize() in prefect 1.0 (not using radar)? If not, are there plans to add that functionality?
    a
    • 2
    • 14
  • b

    Bruno Nunes

    04/03/2022, 6:54 PM
    Hello, I went through the steps in this tutorial to deploy prefect orion 2.0b2 in k8s (if you provide a namespace other than default the service and deployment are created but the role and the rolebinding are only applied if namespace = default). When I apply the manifest the service and deployment seem to be done correctly but it doesn't create a work queue as described here. Furthermore, the logs show an unusual IP used for the api URL:
    Configure Prefect to communicate with the server with:
    prefect config set PREFECT_API_URL=http://0.0.0.0:4200/api
    Check out the dashboard at http://0.0.0.0:4200
    12:51:20.774 | WARNING | prefect.agent - No work queue found named 'kubernetes'
    I've edited the service to be of type LoadBalancer and used the external IP to set the PREFECT_API_URL:
    prefect config set PREFECT_API_URL=http://xx.xx.xx.xx:4200/api
    I've updated sqlalchemy as suggested here and created a new work queue called kubernetes. I've created a new storage pointing to my Azure blob storage, created the new deployment, and run it. Everything finishes without errors, but I don't see any activity in my cluster and nothing is created in the UI. Can you give me some guidance on what I might be missing?
    a
    • 2
    • 3
  • a

    Atul Anand

    04/04/2022, 2:46 AM
    Inside a flow, can we use for loops, if/else, and other programming constructs?
    a
    • 2
    • 2
  • s

    Shiyu Gan

    04/04/2022, 5:56 AM
    Is this answer about Dask Scheduler fault tolerance still up to date?
    ✅ 1
    a
    k
    • 3
    • 2
  • j

    Jeff Kehler

    04/04/2022, 6:18 AM
    I've created a flow that triggers other flows using the create_flow_run function. I would like to use a Parameter to configure the top level flow but it seems parameters are only really meant to be passed into tasks. Is it possible to use the value from a Parameter within a Flow?
    a
    k
    • 3
    • 17
  • k

    Konstantin

    04/04/2022, 6:56 AM
    Hi Prefect team, right now I can't get anything done because Prefect keeps hanging. I need help. First, I need to stop all running tasks in the scheduler (screenshot attached below). Second, I need to find the cause, fix it, and prevent it from recurring. Prefect runs on local servers, in Docker, not in a cloud service. Tasks are organized in separate projects. Four agents have been launched that interact with GitLab. The first problem is that the agent freezes while performing a task, and monitoring shows it running out of RAM. In the task manager, in the docker "top", there are many python *.py launches, where parent ID =-1, kernel, docker container
    a
    k
    • 3
    • 8
a

Anna Geller

04/04/2022, 9:12 AM
it looks like a label mismatch issue - what labels did you assign to your agent and your flows? this page provides a more detailed explanation https://discourse.prefect.io/t/why-is-my-flow-stuck-in-a-scheduled-state/73
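Label matching in Prefect 1.x is commonly described as a subset check: an agent only picks up a flow run when every label on the run is also present on the agent. A minimal sketch of that rule follows. The function name and the label values are hypothetical, and this is not Prefect's own implementation.

```python
def agent_can_pick_up(flow_labels, agent_labels):
    """True when every label on the flow run is also present on the agent."""
    return set(flow_labels) <= set(agent_labels)


# A flow labeled only "PROD" matches an agent that carries "PROD".
print(agent_can_pick_up(["PROD"], ["PROD", "gitlab"]))

# A flow with an extra label the agent lacks is never picked up,
# which would leave its runs sitting in a Scheduled state.
print(agent_can_pick_up(["PROD", "gpu"], ["PROD"]))
```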
k

Konstantin

04/04/2022, 3:38 PM
I checked, each Flow has the label "PROD", as well as the five agents running. According to DevOps, Prefect's task scheduler tries to launch the next task of the same Flow after some time. If you run all agents, all these jobs will be simultaneously launched in this queue for all available agents of the given label. As a result, the Agents will stop working when the resources run out, since the release on DWH is blocked by the subsequent launch of the flow
k

Kevin Kho

04/04/2022, 5:42 PM
Just confirming, you are saying running concurrent flows is causing the issue? I think concurrent flows would still be picked up by the agent though, which they aren’t in the picture. Or are the flows causing the agent to die?
k

Konstantin

04/05/2022, 6:45 PM
Flows die when they run on the same agent at the same time. When an agent is short on resources, the work is not switched to another agent.
One idea: give three agents different names and configure the flow so that it can run on any of them. I think that would make it possible to launch two flows on different agents.
k

Kevin Kho

04/05/2022, 8:27 PM
Am re-reading this. Just so you know, there is a cancel button for all the currently scheduled flows in the main dashboard. There is something in Server to prevent agents from picking up the same flow, but this mechanism is more robust in Prefect Cloud. Are you using Cloud or Server? The Lazarus process should also reschedule them if they were not submitted. But yes, Prefect is not designed to be aware of the resources available on an agent. If there are two agents that can pick up a flow, there is no built-in load balancing.
k

Konstantin

04/06/2022, 8:14 AM
I use the server. I did not find a button to cancel all scheduled tasks. Can you be a little more specific about where this button is located?
k

Kevin Kho

04/06/2022, 2:07 PM
This was added in 0.15.something; you can click it.