best-practices-coordination-plane

    Daniel Lomartra

    06/16/2022, 4:47 PM
    Hi all, I am trying to create a flow that can skip a block of tasks when a parameter is set to False. This is similar to the merge example in the docs on branching (Using conditional logic in a Flow | Prefect Docs) except I have no need for the “case(False)” branch. Currently, I have a dummy task that just signals success so that I have something to merge. This feels pretty janky. Is there a cleaner approach? Task:
    from prefect import context, task
    from prefect.engine import signals

    @task(name="Skipped Task Branch")
    def skipped_task_branch(skip_block_name):
        logger = context.get("logger")
        logger.info(f"Successfully skipped {skip_block_name}")
        raise signals.SUCCESS
    Flow:
    from prefect import Flow, Parameter, case
    from prefect.tasks.control_flow import merge

    with Flow(name="example flow") as flow:
        myCondition = Parameter(name="Run Tasks?", default=True)
        with case(myCondition, True):
            task1 = do_some_stuff()
            task2 = do_some_other_stuff()
        with case(myCondition, False):
            skip = skipped_task_branch("task block name")
        example_merge = merge(task2, skip)
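    One possible cleanup, sketched below on the assumption that the merge only exists so downstream work is not skipped: drop the case(False) branch entirely and mark the downstream task with skip_on_upstream_skip=False. The downstream_work task here is hypothetical, purely to illustrate the pattern.

    from prefect import Flow, Parameter, case, task

    @task
    def do_some_stuff():
        ...

    @task
    def do_some_other_stuff():
        ...

    # Runs even when the case(True) branch is skipped, so no dummy branch or merge is needed
    @task(skip_on_upstream_skip=False)
    def downstream_work():
        ...

    with Flow(name="example flow") as flow:
        run_tasks = Parameter(name="Run Tasks?", default=True)
        with case(run_tasks, True):
            task1 = do_some_stuff()
            task2 = do_some_other_stuff()
        downstream_work(upstream_tasks=[task2])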

    Isaac Kargar

    06/17/2022, 7:16 AM
    Hi folks! I want to host Prefect on GCP for production and have a question regarding the architecture. I'm thinking of three solutions.
    • Hosting it on a VM -> start the VM whenever needed, run the flow, and then stop it.
    • Using Cloud Run and hosting Prefect in a serverless way. This would just be for the UI, metadata, and logging, right? To run the flows and the deep learning work, we would need another machine with a GPU, or GKE, or ...?
    What do you think are the pros and cons of these solutions? Any other recommendations? Any proposed architecture?

    Surawut Jirasaktavee

    06/19/2022, 10:47 AM
    Hi, need help! How can I deploy Prefect together with other Python scripts that I have created and import? For example, I created search_run.py and imported it into prefect_deploy.py, and I'm stuck with the error below.
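    A minimal layout sketch, assuming the failure is an import error on search_run and that both files live in the same directory; search_run.main() is a hypothetical entry point. The idea is just to make the helper module importable wherever the flow code actually runs.

    # prefect_deploy.py
    import sys
    from pathlib import Path

    # Make the directory that contains search_run.py importable at run time as well
    sys.path.append(str(Path(__file__).parent))

    import search_run  # local helper module

    from prefect import flow

    @flow
    def deploy_flow():
        search_run.main()  # hypothetical entry point inside search_run.py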

    Tomas Borrella

    06/20/2022, 12:29 PM
    Hello everyone, I have a series of flows that are executed in a chained way (via upstream_tasks), and at the end of all of them I would like to execute a last flow only if it is a specific day of the week (for example, run the last of the flows only on Mondays). I have tried the following code, but the condition is always false... what is the right way to do this?
    with Flow('prefect_parent', run_config=config.RUN_CONFIG) as prefect_p_flow:
        a = flow_a()
        b = flow_b(upstream_tasks=[a])
        c = flow_c(upstream_tasks=[b])
    
        if date.today().weekday() == 0:  # Monday
            d = flow_d(upstream_tasks=[c])
    
    prefect_p_flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)
    NOTE: I have been looking at Schedules, but I don't want the task to run at a specific point in time (because the previous flows may take more or less time); I would like it to run at the end of the previous task, but only if the day condition is met. Any suggestions are welcome.
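    A minimal sketch (assuming Prefect 1.x): a plain if is evaluated once when the flow is built, so the weekday check has to live in a task and gate flow_d through a case block at run time. flow_a/flow_b/flow_c/flow_d and config.RUN_CONFIG are taken from the snippet above.

    from datetime import date

    from prefect import Flow, case, task

    @task
    def is_monday() -> bool:
        # Evaluated when the flow run executes, not when the flow is registered
        return date.today().weekday() == 0

    with Flow('prefect_parent', run_config=config.RUN_CONFIG) as prefect_p_flow:
        a = flow_a()
        b = flow_b(upstream_tasks=[a])
        c = flow_c(upstream_tasks=[b])

        with case(is_monday(), True):
            d = flow_d(upstream_tasks=[c])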

    Edmondo Porcu

    06/21/2022, 12:00 AM
    After studying for a bit, I am going to ask some possibly annoying questions:
    • What is the best way to separate multiple environments in Prefect Cloud? Multiple accounts?
    • Can the tenant configuration be placed under version control?

    Florian Guily

    06/21/2022, 8:29 AM
    Hey, my question is a bit related to @Edmondo Porcu's: what are the best practices for flow versioning, especially for ETL flows that need to interact with different environments (and a different database depending on the environment)?

    Ibrahim Sherif

    06/23/2022, 11:30 PM
    Hello, I would like to ask a question related to flows. Let's say I have a simple ETL flow divided into three tasks: an extract task, a transform task, and a load task. Can I run only some of the tasks from the flow, say extract only, or extract and transform? Is something like that possible?
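    A minimal sketch of one approach, assuming Prefect 2 where plain Python control flow works inside a flow: a parameter selects how far down the ETL to go, so the same flow can run extract only, extract plus transform, or everything. The task bodies are illustrative stand-ins.

    from prefect import flow, task

    @task
    def extract():
        return [1, 2, 3]

    @task
    def transform(data):
        return [x * 10 for x in data]

    @task
    def load(data):
        print(f"loading {data}")

    @flow
    def etl(last_step: str = "load"):
        data = extract()
        if last_step in ("transform", "load"):
            data = transform(data)
        if last_step == "load":
            load(data)

    # etl(last_step="extract") runs only the extract task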

    Sylvain Hazard

    06/24/2022, 9:07 AM
    Stumbled upon this discussion and the content is just amazing. Figured it would be nice to post it here.

    Travis Leleu

    06/27/2022, 2:47 AM
    What is the recommended way to build a flow-of-flows dependency in the Orion beta? I.e., I want my dbt build flow to kick off only after the upstream ingestion flows complete. I suppose I could try to put everything that has dependencies in the same flow, but that would eventually lead to a massive single flow with scores of tasks. It seems like it'd be better to keep them modular with cross-flow dependencies; what does the community think?
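    A minimal sketch using the Orion beta's subflow support: each ingestion flow stays its own modular flow, and a small parent flow calls them as subflows before the dbt build, so the dependency lives in one orchestrator rather than one massive flow. The flow names here are illustrative.

    from prefect import flow

    @flow
    def ingest_source_a():
        ...

    @flow
    def ingest_source_b():
        ...

    @flow
    def dbt_build():
        ...

    @flow
    def nightly_pipeline():
        # Subflow calls block until the child flow run finishes,
        # so dbt_build starts only after both ingestions complete
        ingest_source_a()
        ingest_source_b()
        dbt_build()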

    Rohit

    06/27/2022, 11:38 AM
    Hello community! I started with Prefect flows yesterday, and I am interested in the scheduling feature. Is it possible to trigger the execution of a flow based on: 1. an update to a data CSV file, 2. new data in the database, etc.?
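    Prefect does not watch files or tables by itself, but an external watcher can call the API to start a run. A minimal polling sketch, assuming Prefect 1.x and a registered flow; data.csv and the flow id are hypothetical.

    import time
    from pathlib import Path

    from prefect import Client

    CSV_PATH = Path("data.csv")   # hypothetical file to watch
    FLOW_ID = "<flow-id>"         # id of the registered flow

    client = Client()
    last_mtime = CSV_PATH.stat().st_mtime

    while True:
        mtime = CSV_PATH.stat().st_mtime
        if mtime != last_mtime:
            # Trigger a new flow run whenever the file changes
            client.create_flow_run(flow_id=FLOW_ID)
            last_mtime = mtime
        time.sleep(60)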

    martin hablak

    06/29/2022, 8:38 AM
    👋 Hello, team! I would like to get a recommendation. I'm attracted to Prefect, not so much for running jobs on an agent, but as a general programming model for creating reusable tasks which are then orchestrated in flows. The problem I have is that I would like to run a flow like this:
    flow_run_reference = flow.run()
    for result in flow_run_reference.task_results:
        yield result.json()
    basically, stream each task's result (via FastAPI, say) as soon as it finishes, rather than waiting until the whole flow finishes
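    A small sketch of what is available today, assuming Prefect 1.x: flow.run() blocks until the flow finishes and returns a state object from which individual task results can be read afterwards; streaming results while the flow is still running would need task state handlers instead.

    from prefect import Flow, task

    @task
    def make_number(x: int) -> int:
        return x * 2

    with Flow("collect-results") as flow:
        doubled = make_number(21)

    state = flow.run()                    # blocks until the whole flow is done
    print(state.result[doubled].result)   # -> 42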

    Tom Matthews

    07/01/2022, 9:42 AM
    Hi! If I had a gRPC API endpoint from which I wanted to trigger a Prefect workflow that runs a bunch of different models and collates and returns some results, what would be the best way to structure that? Something like this?
    @flow
    def pipeline(request):
        # bunch of tasks, some of which run in parallel
        # flow and tasks defined in another script
        return output
    
    ...
    
    def some_grpc_api_endpoint(request):
        return pipeline(request)

    Sergey Goncharov

    07/01/2022, 3:32 PM
    Hi all! I've already posted this question in #prefect-community, but maybe it's better to ask here. I'm new and only just starting on my way with Prefect. I'd like to ask if someone could give some advice or point me to documentation (I haven't found any): I'm using Prefect 2.0/beta, and I'd like to create a deployment for a flow with multiple tasks, each of which should use its own Docker image. For Prefect v1 I used CreateContainer from the prefect.tasks.docker library, but for Prefect v2 I cannot find a replacement. Any advice? Does Prefect 2 have that feature?
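    One possible workaround sketch rather than an official Prefect 2 replacement for CreateContainer: run each step's image directly with the Docker SDK from inside a task. The image names and commands below are illustrative.

    import docker
    from prefect import flow, task

    @task
    def run_in_container(image: str, command: str) -> str:
        client = docker.from_env()
        # Runs the container to completion and returns its stdout
        return client.containers.run(image, command, remove=True).decode()

    @flow
    def multi_image_flow():
        first = run_in_container("python:3.10-slim", "python -c 'print(1)'")
        second = run_in_container("alpine:3.16", "echo step-two")
        return first, second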

    Jordan Charlier

    07/01/2022, 3:54 PM
    Hi everyone! I have a question about flows of flows. I'm trying to run flows dynamically and sequentially because I don't have a huge server at my disposal. I'm trying to do what's in the picture. I don't know how many sources I'll have, and the ETL is not parallel. Is this possible with Prefect, please? Thanks a lot
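    A sequential flow-of-flows sketch, assuming Prefect 1.x and that the list of sources is known when the orchestrator is built; the child flow name, project name, and source values are hypothetical. Each child run is created only after the previous one has finished, so only one ETL runs at a time.

    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    sources = ["source_a", "source_b", "source_c"]

    with Flow("etl-orchestrator") as flow:
        previous = None
        for source in sources:
            run_id = create_flow_run(
                flow_name="etl-child",
                project_name="my-project",
                parameters={"source": source},
                upstream_tasks=[previous] if previous else None,
            )
            # Wait for the child flow to finish before the next source starts
            previous = wait_for_flow_run(run_id)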

    Chu

    07/05/2022, 6:41 PM
    Hi, how can I shut down a Prefect job from the terminal when it is waiting to run?

    Kevin Focke

    07/06/2022, 3:37 PM
    Hello, in Prefect 2.0 does the dependent flows idiom still exist? https://github.com/PrefectHQ/prefect/blob/b9503001f5de642d48d7d5248436d1e8861cffed/docs/core/idioms/flow-to-flow.md My use case is this: I want to run a Flow upon completion of another flow.

    Kevin Focke

    07/06/2022, 3:48 PM
    I also have another question related to Prefect 2.0. I haven't been able to find an answer in the docs; maybe I'm not using the right terminology? Scenario: multiple scheduled flows of the same name are late. The flow is computation-intensive and only needs to run once. How can the late behavior of a flow be defined?

    redsquare

    07/06/2022, 4:00 PM
    Is there any guidance for checkpointing in 2.0?
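    Assuming the question is about the 1.x-style checkpointing of task outputs, the closest 2.0 construct I can point to is result caching on tasks. A minimal sketch:

    from datetime import timedelta

    from prefect import flow, task
    from prefect.tasks import task_input_hash

    @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
    def expensive_step(x: int) -> int:
        return x ** 2

    @flow
    def pipeline():
        # A re-run with the same input inside the expiration window reuses the cached result
        return expensive_step(12)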

    Valeria Romero

    07/07/2022, 2:53 PM
    Hi all, I was wondering what the best practice is to run a flow with S3 Storage and ECS Agent that depends on other scripts or files. Currently, I get "ModuleNotFoundError("No module named 'my-python-script'")" when I run my flow in Prefect Cloud. With Docker storage we managed to solve this by providing env_vars = {"PYTHONPATH":"$PYTHONPATH:/path/in/Docker"} and building the Docker image with all of the dependent modules. How could I achieve this with S3 Storage? We changed from Docker to S3 Storage because we would get "Cannot provide task_definition_arn when using Docker storage". We use a task definition arn because the task definition is created with Terraform. Maybe we're going about this the wrong way? Thanks in advance!

    Stefan

    07/10/2022, 10:42 AM
    Hi all! I'm trying to read up on Parameters and how to pass them to a flow run when it is triggered. In the documentation the flow is usually triggered from within the script and a static variable is passed, but what if I want to trigger a flow/deployment with a certain email, list of dates, etc. to be used in the @flow?
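    A minimal sketch of a parameterised flow, assuming Prefect 2: the flow's arguments become flow-run parameters, so a deployment or an ad-hoc run can supply a concrete email and list of dates instead of values hard-coded in the script. The values below are illustrative.

    from typing import List

    from prefect import flow

    @flow
    def send_report(email: str, dates: List[str]):
        # email and dates arrive as flow-run parameters
        print(f"Sending a report covering {dates} to {email}")

    if __name__ == "__main__":
        # Local test run with explicit values
        send_report("user@example.com", ["2022-07-01", "2022-07-02"])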

    Florian Guily

    07/11/2022, 12:56 PM
    Hey, Prefect 1 here. I'm close to putting ETL flows into production. I'm currently using EKS with a Kubernetes agent on a Fargate node to deploy flows. This agent is only used for dev, but I'm creating staging and prod environments. Should I deploy a new agent for each environment in this cluster, or can I stick with one agent that handles deployed flows with the labels "dev", "staging", and "prod"?
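    A small sketch of the single-agent option, assuming Prefect 1.x on Kubernetes: start the one agent with all three labels, and give each flow's run config only the label of its own environment, since an agent picks up a flow only when the agent's labels are a superset of the flow's. Separate agents per environment also work and give harder isolation; the label route just keeps a single agent to operate.

    from prefect.run_configs import KubernetesRun

    # Registered per environment; the dev/staging/prod value would come from your CI pipeline
    flow.run_config = KubernetesRun(labels=["staging"])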

    Luis Echegaray

    07/12/2022, 4:01 PM
    Morning! QQ, when we start hitting AWS API throttling errors such as:
    An error occurred (ThrottlingException) when calling the RegisterTaskDefinition operation (reached max retries: 2): Rate exceeded
    Is there a Prefect construct that will help us queue up the work, or do people usually handle this programmatically?

    Jon Ruhnke

    07/12/2022, 4:22 PM
    Hey all, I'm trying to figure out a very simple method of deploying my Prefect flows to DEV and PROD environments, and I'm not sure how to automatically point at environment-appropriate config variables based on which environment I'm deploying/running from. I've created very simple DEV and PROD BAT files that pull the latest code from GitLab and update dependencies, then register the flows under an appropriate Prefect project name ("Data Pipelines" for PROD, "Data Pipelines DEV" for DEV). I also need to somehow set a single "ENV" variable to "PROD" or "DEV" so my flows read from the appropriate variables in my configuration file when they run. My only thought so far is passing an "ENV" parameter into each flow, but I'm not sure if that's best practice?
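    A sketch of one common pattern, assuming Prefect 1.x; the names and config values are illustrative. The registering BAT file exports ENV, the run config carries it into the flow-run environment, and a task reads it to pick the matching settings.

    import os

    from prefect import Flow, task
    from prefect.run_configs import UniversalRun

    @task
    def load_settings():
        env = os.environ.get("ENV", "DEV")
        # Illustrative stand-in for reading the matching section of your config file
        return {"DEV": {"db": "dev-db"}, "PROD": {"db": "prod-db"}}[env]

    with Flow("data-pipeline") as flow:
        settings = load_settings()

    # The DEV/PROD BAT file can export ENV before registration so the right value is baked in
    flow.run_config = UniversalRun(env={"ENV": os.environ.get("ENV", "DEV")})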

    Michiel Verburg

    07/14/2022, 2:36 PM
    A question about flow and task versions (Prefect Orion). Hi all, a quick intro to our use case and how we got to the idea of maybe starting to use flow/task versioning (triggered by the latest release, 2.0b8, which now includes this option for tasks as well).
    The root idea stems from the fact that, with CI/CD up and running and automatically deploying new versions to our Prefect server, at some point we want to be able to double-check that the correct version of the code is being run in a flow (because, if flow X has a currently active flow run and we deploy a new version of flow X, the already-running flow will still have the old code while the next flow run might have the new code).
    A colleague noticed https://orion-docs.prefect.io/tutorials/flow-task-config/ mentioning the usage of GIT_COMMIT_SHA as a version. This seemed nifty, so we tried it out. However, we had to do some manual labor to get the git commit into the environment variables; it currently relies on our Makefile and CI/CD, which seemed a bit cumbersome and not ideal. We managed to make it work, but it adds a lot of code to all the flows, so on top of that it would be nice if we could configure a default for this version somewhere, for example via a callback specified in the task/flow source code. Currently the only way we could come up with to set a default is by overriding the flow/task decorators from Prefect, but that does not feel right.
    Wondering if anyone has any other takes that might be a bit cleaner for tracking which version of the code is actually running?
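    One sketch of a default without overriding Prefect's decorators, assuming Prefect 2 / Orion where the flow decorator takes a version argument: resolve the commit in a small helper that prefers the CI-provided variable and falls back to asking git, then pass it explicitly. Note the version is captured when the module is imported, so an already-running flow run keeps the version it started with.

    import os
    import subprocess

    from prefect import flow

    def current_version() -> str:
        # Prefer the value injected by CI/CD; fall back to the local git checkout
        sha = os.environ.get("GIT_COMMIT_SHA")
        if sha:
            return sha
        return subprocess.check_output(
            ["git", "rev-parse", "--short", "HEAD"]
        ).decode().strip()

    @flow(version=current_version())
    def my_flow():
        ...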

    John

    07/18/2022, 2:25 PM
    Hi all-- I am evaluating Prefect (Orion) and had a question. What is the best way to dynamically trigger a scheduled flow run at a specified time? I have a nightly flow that picks up a start time from an API, and I'd like to have it then schedule a different flow to run at that time (only once though). Thanks for any help

    Jess Zhang

    07/19/2022, 1:01 AM
    https://engineering.atspotify.com/2020/08/how-we-use-golden-paths-to-solve-fragmentation-in-our-software-ecosystem/

    Jess Zhang

    07/19/2022, 1:01 AM
    thoughts?

    Chu

    07/19/2022, 7:32 PM
    Hi community, what is the best practice for implementing parallel dbt jobs using Prefect 1.0? Basically, I need to send a different client_id to each dbt job and trigger a dbt run for each client_id (pseudocode follows):
    with Flow as flow:
      for i in id_list:
        dbt_run_function(i)
    (I’m wondering if a simple for loop would achieve parallelism?)
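    A minimal sketch, assuming Prefect 1.x: the loop alone will not run in parallel under the default LocalExecutor; mapping the task and attaching a LocalDaskExecutor is the usual way to fan out per client_id. The dbt_run_function body and client ids below are stand-ins for the real dbt task.

    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor

    @task
    def dbt_run_function(client_id: str):
        # Stand-in: shell out to `dbt run` with client-specific variables here
        print(f"dbt run for {client_id}")

    with Flow("parallel-dbt") as flow:
        dbt_run_function.map(["client_a", "client_b", "client_c"])

    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)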

    hieu le

    07/20/2022, 9:11 AM
    Hi, I have a problem running a schedule every minute. When the schedule runs, the flow's state changes from Scheduled to Late. How do I fix this?

Anna Geller

07/20/2022, 10:47 AM
Most likely your agent is not healthy - try restarting it or check the agent logs to troubleshoot this

hieu le

07/21/2022, 3:12 AM
I'm running it on a server, with the latest Prefect 2.0.

Anna Geller

07/21/2022, 10:49 PM
Is it resolved now? If not, can you provide more info and walk us through your process?