prefect-community
  • n

    Nick Hart

    02/21/2022, 4:52 PM
    I'm looking to run create_flow_runs using threading for a custom module I'm writing and for some reason I'm getting an attribute error. Below is my test code and the error. Would you know how to fix this? Also, I'm assuming this is not a prefect problem and more of a threading problem, but I was hoping someone would be able to help! Thanks in advance
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
    import threading
    
    def thread_flows(flowname):
        print("Running thread for: ",flowname)
        flow_id = create_flow_run.run(flow_name=flowname)
        flow_run = wait_for_flow_run.run(flow_id, stream_logs=True)
    
    if __name__ == "__main__":
        flow_list = ["FlowA", "FlowB", "FlowC"]
    
        threads = []
        for flowname in flow_list:
            x = threading.Thread(target = thread_flows, args=(flowname,))
            threads.append(x)
            x.start()
    
        for thread in threads:
            thread.join()
    File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    File "/home/test/Documents/create-flow1.py", line 6, in thread_flows
        flow_id = create_flow_run.run(flow_name=flowname)
    File "/home/test/.pyenv/versions/3.8.6/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 123, in create_flow_run
        logger = prefect.context.logger
    AttributeError: 'Context' object has no attribute 'logger'
    m
    a
    8 replies · 3 participants
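One possible explanation for the error above, offered as an assumption rather than a confirmed diagnosis: if `prefect.context` is a thread-local object and its `logger` attribute is only set in the main thread, each worker thread would see a context without it. The sketch below reproduces that behaviour with the standard library only. `Context`, `context`, `worker`, and `results` are hypothetical stand-ins, not Prefect's API.

```python
import logging
import threading


class Context(threading.local):
    """Hypothetical stand-in for a thread-local context object."""


context = Context()
# Set in the main thread only, for example at import time.
context.logger = logging.getLogger("main")

results = {}


def worker(name, set_logger):
    if set_logger:
        # Each thread has to populate its own copy of the context.
        context.logger = logging.getLogger(name)
    # Record whether this thread can see a logger on the context.
    results[name] = hasattr(context, "logger")


threads = [
    threading.Thread(target=worker, args=("without-logger", False)),
    threading.Thread(target=worker, args=("with-logger", True)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)
```

If the assumption holds, one thing to try is populating a logger in the context inside `thread_flows` itself, before calling `create_flow_run.run`, so that each thread has its own value.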
  • a

    Aaron Rnd

    02/21/2022, 4:56 PM
    Hi guys, I have a question. Is it possible to start another flow once a task in another flow has completed? For illustration:
    Flow A: Task 1 > Task 2 > Task 3
    Flow B (run once Task 2 in Flow A has completed successfully): Task 1
    m
    6 replies · 2 participants
  • r

    Raimundo Pereira De Souza Neto

    02/21/2022, 7:58 PM
    Hello guys, I'm using
    prefect=2.0a12
    , and I would like to schedule my flow like a cron job. Is it possible with the decorator?
    from prefect import flow, task
    
    @task()
    def s1(message):
        print(message)
    
    @flow() # where I put schedule params?
    def update_flow() -> None:
        s1("hello prefect")
    a
    m
    2 replies · 3 participants
  • a

    Anna Geller

    02/21/2022, 8:13 PM
    set the channel topic: Welcome to the Prefect Community! Please use threads if possible so we can archive helpful conversations to: - GitHub: https://github.com/PrefectHQ/prefect/issues?q=label%3A“Prefect+Slack+Community”+ - Discourse: https://discourse.prefect.io/docs
  • p

    Patrick Tan

    02/21/2022, 8:51 PM
    Hi guys, I have a flow deployment issue: the agent does not pick up a flow registered from a different host.
    Host A
    1. I was doing my development on my local MacBook.
    2. I created a flow.
    3. I switched the backend to Prefect Cloud.
    4. I connected to Prefect Cloud with an API key.
    5. I registered the flow to Prefect Cloud.
    6. I started a local agent.
    7. I can see both the flow and the agent in the Prefect Cloud UI.
    8. I ran the flow; the agent picked it up and executed it, and it worked fine.
    9. I shut down the agent.
    Host B
    1. I created an AWS EC2 instance and deployed my code.
    2. I connected to Prefect Cloud with the same API key.
    3. I switched the backend to Prefect Cloud and started a local agent.
    4. I can see the agent in the UI.
    5. However, when I run the flow I registered earlier, the agent does not pick it up.
    Please advise.
    k
    m
    +1
    9 replies · 4 participants
  • d

    Daniel Komisar

    02/21/2022, 10:40 PM
    Hello everyone, is it possible to add/remove labels from agents through the API with
    update_agent_config
    ? Thanks!
    a
    k
    5 replies · 3 participants
  • s

    shijas km

    02/22/2022, 4:39 AM
    Hi, a quick question. We have created Prefect flows in our dev env and are using Prefect Cloud to schedule and run them. Now we need to implement CI/CD to automate deployment to the prod env. How can we achieve that? We are not using Docker for Prefect. What is the easiest method for CI/CD with Prefect?
    m
    a
    2 replies · 3 participants
  • n

    Noam polak

    02/22/2022, 7:01 AM
    Hey dear community, we recently had an issue when triggering 3 instances of the same parent flow, which triggers child flows (it's not the common use case, but we wanted to test this case too). We get errors from prefect-server that go like this (I will write them in the thread). What can possibly be the cause? Thanks
    a
    k
    11 replies · 3 participants
  • k

    Konstantin

    02/22/2022, 9:48 AM
    Hello everyone, I'm trying to launch a task through Prefect that calls ClickHouse, but I did not find one in Prefect's task library. Could anyone who has done something similar please share their solution? Thanks
    a
    6 replies · 2 participants
  • h

    Hedgar

    02/22/2022, 9:49 AM
    Hey guys a quick question I intend to create a task that would use
    awswrangler
    to save my dataframe to s3. my environment is already
    awscli
    compliant, do I need to go through prefect Storage? Secondly if I'm doing all this with aws ec2 remote instance how do I activate my flow. Can I do on the command line:
    python myfirstprefectflow.py
    ? Assuming I have already indicated a schedule in my flow code ? Or do I use crontab to schedule again 🤔. I would be glad if I could be pointed in the right direction on using aws ec2 to run prefect.
    a
    1 reply · 2 participants
  • s

    Steph Clacksman

    02/22/2022, 10:31 AM
    Hello, beginner's question here! I see a lot of examples that are like this:
    with Flow("my-flow") as flow:
        # do some things here
    # EITHER flow.register("project_name")
    # OR flow.run(executor=DaskExecutor())
    I want to run my flow through Prefect Cloud, so I understand that I need to register my flow, but when I want to run it, it looks like I need to edit the code, which seems a bit weird when it's dockerised and running on a remote server. Is it normal to register the flow like above before building the docker image, then switch it to the run command before building? Or is there something I'm missing?
    a
    11 replies · 2 participants
  • d

    Daniel Nilsen

    02/22/2022, 10:44 AM
    Hi! I am getting an error:
    ModuleNotFoundError("No module named 'parameters'")
    root
      - src
        - flows
          - flow_1
            - flow_1.py
            - parameters.py
    I am registering the flow with this
    bin/prefect register --project «myProject» --path ./src/flows/flow_1/flow_1.py
    When I run the flow locally with 
    flow.run()
     it works fine 🤔
    a
    2 replies · 2 participants
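A likely cause, stated here as an assumption: `flow.run()` works locally because Python puts the script's own directory on `sys.path`, while the process that later loads the registered flow may start from somewhere else and never see `parameters.py`. The sketch below shows that mechanism with the standard library only. The module name `parameters_demo` and the constant `BATCH_SIZE` are hypothetical.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical module name, chosen to avoid clashing with anything installed.
MODULE_NAME = "parameters_demo"

with tempfile.TemporaryDirectory() as flow_dir:
    # Stand-in for src/flows/flow_1/parameters.py.
    Path(flow_dir, f"{MODULE_NAME}.py").write_text("BATCH_SIZE = 100\n")

    # The directory is not on sys.path yet, so the import fails.
    try:
        importlib.import_module(MODULE_NAME)
        found_before = True
    except ModuleNotFoundError:
        found_before = False

    # Adding the flow's directory to sys.path makes the sibling importable.
    sys.path.insert(0, flow_dir)
    importlib.invalidate_caches()
    module = importlib.import_module(MODULE_NAME)
    found_after = True
    batch_size = module.BATCH_SIZE
    sys.path.remove(flow_dir)

print(found_before, found_after, batch_size)
```

Under that assumption, the options are to make the flow's directory importable wherever the flow executes (for example by installing the code as a package) or to keep the shared values inside the flow file itself.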
  • f

    Frederick Thomas

    02/22/2022, 1:02 PM
    Hello all I am attempting to get information from the current flow that I'm in using GraphQL, however, I'm only getting an empty response from the API. I'm almost dead certain that the query is correct as I've used it in the sandbox first. We're using Server for our flows. Any help would be appreciated. Thanks
    👋 2
    a
    s
    +1
    19 replies · 4 participants
  • a

    Anna Geller

    02/22/2022, 2:29 PM
    FYI Slack has some issue - if you can't post your message here, you can use Discourse or submit a GitHub issue
    :upvote: 2
  • f

    Frederick Thomas

    02/22/2022, 4:13 PM
    Is there anybody out there??😞
    👋 4
    c
    k
    6 replies · 3 participants
  • d

    Donnchadh McAuliffe

    02/22/2022, 4:37 PM
    Hi guys, I want my flow to not move onto the next task if one of my tasks is retrying.
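    import time

    from prefect import flow, task
    # Assumed import path for this Orion alpha; it may differ in other releases.
    from prefect.task_runners import DaskTaskRunner


    @task(retries=2, retry_delay_seconds=10)
    def task_1(time_to_sleep: int):
        # Always fails, to exercise the retry path.
        raise Exception()
        time.sleep(time_to_sleep)


    @task(retries=3, retry_delay_seconds=10)
    def task_2(time_to_sleep: int):
        time.sleep(time_to_sleep)


    # {dask_address} is a placeholder for the Dask scheduler address.
    @flow(name="steps_flow_dask", task_runner=DaskTaskRunner(address={dask_address}))
    def flow_to_run_tasks():
        task_1(3)
        task_2(2)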
    Currently when
    task_1
    throws the exception
    task_2
    immediately executes. The behaviour I want is that
    task_2
    doesn't execute unless
    task_1
    is successful. Also, if
    task_1
    hits its maximum number of retries then
    task_2
    should not execute. Any ideas? This is on Orion.
    k
    m
    12 replies · 3 participants
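The behaviour asked for above can be written down in plain Python to make the intended semantics concrete. This is only a sketch of the desired ordering, not Orion's API: `run_with_retries` and `calls` are hypothetical helpers, the delays are shortened, and how to declare the same dependency in Orion is not shown in the post.

```python
import time


def run_with_retries(fn, retries, retry_delay_seconds):
    """Call fn up to 1 + retries times; return (succeeded, result)."""
    for attempt in range(retries + 1):
        try:
            return True, fn()
        except Exception:
            if attempt < retries:
                time.sleep(retry_delay_seconds)
    return False, None


calls = []


def task_1():
    calls.append("task_1")
    raise Exception("task_1 always fails in this demo")


def task_2():
    calls.append("task_2")
    return "done"


def flow_to_run_tasks():
    ok, _ = run_with_retries(task_1, retries=2, retry_delay_seconds=0.01)
    if not ok:
        # Upstream exhausted its retries, so the downstream task is skipped.
        return None
    _, result = run_with_retries(task_2, retries=3, retry_delay_seconds=0.01)
    return result


outcome = flow_to_run_tasks()
print(calls, outcome)
```

Here `task_1` is attempted three times (one run plus two retries) and `task_2` never starts, which is the outcome described in the question.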
  • d

    David Elliott

    02/22/2022, 4:53 PM
    Hey all, we’re seeing some unexpected / buggy behaviour with Prefect Cloud, hoping you could advise? We have a flow with a daily cron schedule which gets redeployed once a day (as code changes are made). Previously, when the new version of the flow was registered (part of our deployment), the old version of the flow became archived and the schedules for that old version were cancelled, but over the last week we’ve been seeing double-scheduled flows remaining: one scheduled flow_run for the current flow, and one for the previous (now archived) flow. Wondering if you’ve seen this issue elsewhere? In addition we’ve seen some (likely) related oddities with our Automation which triggers a PagerDuty incident if the flow doesn’t start within the first 5 minutes. It looks as though these phantom archived-but-still-scheduled flows are trying to run, hitting our concurrency limit of 1 and stalling, triggering PagerDuty, but then when we click through on the flow_run link, the flow run literally doesn’t exist (either in the UI or in GraphQL). It’s super weird. Hope someone can advise! We’re the
    Deliveroo
    prefect tenant, happy to send over some specifics / URLs if helpful
    k
    a
    25 replies · 3 participants
  • g

    Gonzalo Stillo

    02/22/2022, 5:04 PM
    Hi all, are there any special flags or procedures for deploying agents on servers that connect to the internet through a proxy? I have this weird behavior that I have been fighting with since yesterday. I start the agent normally from the CLI (Docker Agent) and it registers successfully (it even shows up in the Cloud UI). When I try to quick-run a flow from the Cloud, the flow is not picked up by the agent even though the labels are exactly the same. In fact, a message about “Late submittable runs” starts to appear in the Agents section. Maybe some kind of caching in the proxy? Thanks in advance.
    k
    8 replies · 2 participants
  • b

    brian

    02/22/2022, 5:21 PM
    Hi all, I’m trying to use a secret in a state handler but it doesn’t seem to be working
    k
    16 replies · 2 participants
  • m

    Mehdi Nazari

    02/22/2022, 5:28 PM
    Hey all, I’m having an issue with a flow that is registered with the Hybrid model approach, where I have a Docker image and the flow with a DockerConfig to run it from the cloud. This was working in the past, but as of a few days ago the flow run gets triggered and no run is performed. This leaves the run in a Submitted state with a few attempts to re-run, and the Lazarus service finally kills it. I have other flows registered/installed and working within the same environment, so I am a little puzzled as to why this flow does not run. Any debugging suggestions would be appreciated.
    k
    3 replies · 2 participants
  • m

    Matthew Seligson

    02/22/2022, 5:56 PM
    If a task run can be in a “Running” state multiple times in its lifetime, should there be an identifier that uniquely identifies a particular “run” of a task run? We could take task run ID + task_run_count to come up with an identifier for this, but I’m wondering if Prefect has thought about this or plans to support this in the future?
    k
    1 reply · 2 participants
  • s

    Scott Aefsky

    02/22/2022, 6:57 PM
    Hi all. I have a couple of ETL flows that are going to run on a large quantity of data the first time they run, but much smaller datasets every subsequent run. What I want to do is run with a DaskExecutor on the first run, but a default executor on subsequent runs. I was wondering if it's possible to choose an executor at runtime based on a parameter, or if there is another way to achieve what I'm looking for without having multiple flows. Thanks!
    k
    8 replies · 2 participants
  • j

    Jacqueline Riley Garrahan

    02/22/2022, 8:05 PM
    Is there an alternative/direct way to get all flow runs via flow id besides:
    runs = FlowRunView._query_for_flow_run(where={"flow_id": {"_eq": id}})
    k
    1 reply · 2 participants
  • c

    Constantino Schillebeeckx

    02/22/2022, 8:59 PM
    I'm running flows on ECS using fargate; is it normal to see
    RUNNING
    tasks like those shown below which have been running for days? Are those truly still running? Am I being billed for idle compute here?
    a
    k
    18 replies · 3 participants
  • c

    Chris Martinez

    02/22/2022, 9:00 PM
    Hi all, I am running into an issue when running a flow where the repository is no longer accessible when upgrading to
    0.15.12
    from
    0.14.21
    Failed to load and execute Flow's environment: NotGitRepository('No git repository was found ...
    I am using the git storage interface and have no issues when using version
    0.14.21
    a
    1 reply · 2 participants
  • m

    Matthias

    02/22/2022, 9:04 PM
    Stupid question, is there an equivalent of
    prefect run
    in Orion? Or do you have to run flows with
    python flow.py
    ?
    a
    k
    2 replies · 3 participants
  • l

    Lee Cullen

    02/22/2022, 9:11 PM
    Hey all, I'm getting the following error when I try to run a flow via cloud using Bitbucket storage config:
    Failed to load and execute Flow's environment: ValueError('No flows found in file.')
    . The flow context manager is defined within the body of a function that returns a flow, I'm wondering if that is the reason why cloud can't find the flow?
    a
    k
    12 replies · 3 participants
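One plausible explanation, assuming the storage loader executes the file and then looks for flow objects among its top-level names: a flow that only exists inside a function body is never created unless the function is called at module level. The sketch below imitates that lookup with the standard library. `Flow`, `find_flows`, and `build_flow` are hypothetical stand-ins, not Prefect's implementation.

```python
class Flow:
    """Hypothetical stand-in for a flow object."""

    def __init__(self, name):
        self.name = name


def find_flows(source):
    """Execute source and return names of Flow objects left at module level."""
    namespace = {"Flow": Flow}
    exec(source, namespace)
    return sorted(v.name for v in namespace.values() if isinstance(v, Flow))


only_in_function = '''
def build_flow():
    return Flow("my-flow")
'''

assigned_at_module_level = only_in_function + '''
flow = build_flow()
'''

print(find_flows(only_in_function))
print(find_flows(assigned_at_module_level))
```

If the assumption holds, calling the builder function at the top level of the file and assigning the result to a name would let the loader find the flow.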
  • l

    Leon Kozlowski

    02/22/2022, 9:28 PM
    Does the DbtShellTask work with the open-lineage dbt? (
    dbt-ol run
    )
    🙌 1
    a
    a
    5 replies · 3 participants
  • w

    Wesley Jin

    02/22/2022, 10:08 PM
    Hey Prefect community! I’m using ECS task runs with Github storage for my flows but running into the following error when running the task:
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'constants'")
    I’m importing constants which will be shared across many flows: the project name, ECS cluster name, Github Token secret name from a
    constants.py
    file in the same directory. Running the flow locally with
    flow.run()
    works just fine as expected, but when running it in ECS
    constants
    is not found. How do I register other module dependencies with Github storage, if possible? Or do I have to add these constant values to each of my flow files? Thank you! Example below:
    . (repo root)
    |____flows
    | |____hello.py
    | |____constants.py
    | |______init__.py
    a
    k
    4 replies · 3 participants
  • j

    Jack Chang

    02/22/2022, 10:17 PM
    Is there any documentation on having a main flow and sub flows that can branch off from the main one?
    a
    2 replies · 2 participants
a

Anna Geller

02/22/2022, 10:19 PM
Yes, we already have quite a lot of content regarding the flow-of-flows orchestration pattern on Discourse including a 3-part blog series about that - check this out here and if you have any specific question about this, feel free to ask
j

Jack Chang

02/22/2022, 10:21 PM
Perfect! (or should I say prefect) i will start there! Thanks!
:prefect: 1
👍 1