Powered by Linen
prefect-community

    DL

    03/10/2022, 1:16 PM
    Hey everyone! Quick question (sorry if this has already been asked, but I couldn't find any answers out there). Is it possible to use the new Prefect Orion together with Airbyte for streaming data pipelines, or does it only work with the standard Prefect product?

    Brian Phillips

    03/10/2022, 2:35 PM
    Has anyone encountered the following error when using the ECS agent? Is there an easy way to increase the number of retries or backoff interval?
    An error occurred (ThrottlingException) when calling the RegisterTaskDefinition operation (reached max retries: 2): Rate exceeded
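If the agent's boto3 client relies on botocore's retry defaults, botocore reads `AWS_MAX_ATTEMPTS` and `AWS_RETRY_MODE` (`legacy`, `standard`, or `adaptive`) from the environment, so setting them where the agent runs may raise the retry limit shown in this error. How the Prefect ECS agent configures its client has not been verified here. The underlying technique is exponential backoff with jitter; a minimal stdlib sketch, where `ThrottlingError` is a hypothetical stand-in for the AWS exception:

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class ThrottlingError(Exception):
    """Hypothetical stand-in for AWS's 'Rate exceeded' ThrottlingException."""


def call_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 8,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(); on ThrottlingError, wait with capped exponential backoff plus full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except ThrottlingError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last throttling error
            # Cap grows 0.5 s, 1 s, 2 s, ... up to max_delay; jitter spreads retries out.
            cap = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, cap))
    raise AssertionError("unreachable")
```

Passing `sleep=delays.append` in a test records the chosen delays instead of actually waiting.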

    Constantino Schillebeeckx

    03/10/2022, 3:29 PM
    How can I run two tasks (one that is mapped, the other that is not) in parallel?
    import time
    
    from prefect import task
    
    
    @task
    def sleep_5():
        print('sleeping 5')
        time.sleep(5)
        print('done sleeping 5')
    
    
    @task
    def sleep_x(x):
        print(f'sleeping {x}')
        time.sleep(x)
        print(f'done sleeping {x}')
    
    
    # DHFlow is the poster's own Flow subclass (definition not shown)
    with DHFlow("foo") as flow:
        sleep_5()
        sleep_x.map([7, 8])
    When I execute the above, `sleep_5` is run first; only after it finishes will `sleep_x` run.
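In Prefect 1.x the default `LocalExecutor` runs tasks one at a time, so independent tasks still execute sequentially; a parallel executor such as `LocalDaskExecutor` (from `prefect.executors`, set via `flow.executor = LocalDaskExecutor()`) lets `sleep_5` and the mapped `sleep_x` children overlap. The effect can be illustrated without Prefect using a thread pool; this is a stdlib analogy, not Prefect's executor:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def sleep_for(seconds: float) -> float:
    """Stand-in for the sleep_5 / sleep_x tasks: block, then report the duration."""
    time.sleep(seconds)
    return seconds


def run_sequential(durations):
    """Run each 'task' after the previous one finishes (like LocalExecutor)."""
    start = time.perf_counter()
    results = [sleep_for(d) for d in durations]
    return results, time.perf_counter() - start


def run_parallel(durations):
    """Run all 'tasks' at once in a thread pool (analogous to a parallel executor)."""
    start = time.perf_counter()
    # One worker per task so nothing waits for a free slot.
    with ThreadPoolExecutor(max_workers=len(durations)) as pool:
        results = list(pool.map(sleep_for, durations))
    return results, time.perf_counter() - start
```

With a scaled-down `[0.5, 0.7, 0.8]` standing in for `sleep_5` plus `sleep_x.map([7, 8])`, `run_sequential` takes roughly the sum of the durations, while `run_parallel` takes roughly the longest one.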

    Pedro Machado

    03/10/2022, 4:26 PM
    Hi there. I have a flow that is orchestrated by Prefect Cloud. It has 1k+ mapped tasks and it failed after processing 250. There are 20 tasks in a Retrying state showing this message:
    No heartbeat detected from the remote task; retrying the run.This will be retry 1 of 2.
    However, the last message was written 12 hours ago. It does not look like the flow got retried at all. Could someone help me figure out what may have happened? I am using ECS to run the flows. The flow run ID is `fed29ba9-66a6-4f0b-aa69-cf9db908bd58`.
    I went ahead and canceled it, but it does not offer me the option to restart it. How can I restart it and preserve the same flow run ID so that the tasks that succeeded are not executed again? Ideally, I'd rely on the task state, but if they need to run again, I am hoping that caching will help avoid doing the work again. The cache key is based on the `flow_run_id`. Thanks!
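One way to make a rerun skip work that already finished is target-style checkpointing: each mapped item writes its output to a location derived from a stable key, and a rerun skips items whose output already exists. Prefect 1.x tasks support this idea through the `target` argument together with a `result`; the stdlib sketch below only illustrates the mechanism, and `run_mapped`, `target_path`, and the file layout are hypothetical. A key built from `flow_run_id` only helps if the rerun keeps the same flow run ID, whereas a key derived from the input itself also survives a brand-new run.

```python
import hashlib
import json
from pathlib import Path
from typing import Any, Callable, Dict


def target_path(output_dir: Path, item: Dict[str, Any]) -> Path:
    """Derive a stable output location from the item itself (not from a run ID)."""
    key = hashlib.sha256(json.dumps(item, sort_keys=True).encode()).hexdigest()[:16]
    return output_dir / f"{key}.json"


def run_mapped(items, process_item: Callable[[Dict[str, Any]], Any], output_dir: Path):
    """Process each item once; items whose target file exists are skipped on rerun."""
    output_dir.mkdir(parents=True, exist_ok=True)
    computed, skipped = 0, 0
    for item in items:
        path = target_path(output_dir, item)
        if path.exists():
            skipped += 1
            continue
        result = process_item(item)
        # Write to a temp file first so a crash mid-write never leaves a "done" marker.
        tmp = path.with_suffix(".tmp")
        tmp.write_text(json.dumps(result))
        tmp.replace(path)
        computed += 1
    return computed, skipped
```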

    Olivér Atanaszov

    03/10/2022, 4:45 PM
    Hi, I'm running `n_workers` agents as Kubernetes jobs, similar to this:
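    from prefect import Flow, Parameter, task
    from prefect.tasks.prefect import create_flow_run
    from prefect.tasks.shell import ShellTask
    
    # Agent flow (agent_command is defined elsewhere)
    with Flow("run-agent") as flow_agent:
        ShellTask(command=agent_command)()  # launch agent computing tasks
    
    @task
    def run_agent(worker_id):
        run_agent_flow_id = create_flow_run.run(flow_name="run-agent",
                                                run_name=f"run-agent_#{worker_id}")
        return run_agent_flow_id
    
    # Main flow (get_worker_ids is defined elsewhere)
    with Flow("main") as flow:
        n_workers = Parameter("n_workers", default=2)
        worker_ids = get_worker_ids(n_workers)  # [0, 1]
        run_agent.map(worker_id=worker_ids)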
    When the agents finish their tasks, for some reason the Kubernetes jobs are not terminating. Inside the job's container the agent's process apparently terminates, but I see `prefect execute flow-run` and `prefect heartbeat flow-run -i ...` stuck.

    Ling Chen

    03/10/2022, 5:18 PM
    Hi Community! Are there guidelines for freeing up previous tasks' memory after they are finished? We are running memory-intensive pipelines and running into out-of-memory errors. The output of each task is only needed by the next task, not by any later ones. Is there room for memory optimization?
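In Prefect 1.x each task's return value is attached to its task-run state, and the flow runner generally keeps those states until the run finishes, so large outputs can stay in memory after downstream tasks are done. A common workaround is for each step to persist its output and return only a small reference such as a file path. A minimal stdlib sketch of that pattern, with hypothetical `extract`/`transform`/`summarize` steps and pickle files standing in for whatever storage you use:

```python
import pickle
from pathlib import Path


def save(obj, directory: Path, name: str) -> Path:
    """Persist an intermediate result to disk and return only its (small) path."""
    path = directory / f"{name}.pkl"
    with path.open("wb") as f:
        pickle.dump(obj, f)
    return path


def load(path: Path):
    """Read back an intermediate result written by save()."""
    with path.open("rb") as f:
        return pickle.load(f)


def extract(directory: Path) -> Path:
    data = list(range(100_000))  # large intermediate that only this step holds
    return save(data, directory, "extracted")


def transform(src: Path, directory: Path) -> Path:
    data = load(src)
    result = [x * 2 for x in data]
    path = save(result, directory, "transformed")
    src.unlink()  # later steps never need the extracted data, so drop the file too
    return path


def summarize(src: Path) -> int:
    return sum(load(src))
```

In a flow, each task would pass the returned `Path` to the next, so the states kept in memory hold paths rather than large objects.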

    Sarah Floris

    03/10/2022, 6:49 PM
    I am pushing my code to GitHub, and then I want to build it into a Docker image and finally run the Prefect pipeline as part of the Kubernetes orchestration we already have set up. When I push my code, I need to build the image and push it to Azure Container Registry. Then, in Kubernetes, do I need to run a Prefect flow like this one: https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows/docker_pickle_kubernetes_run_custom_ecr_image.py?

    David Yang

    03/10/2022, 7:02 PM
    I use the CLI command "prefect register --project ${{ parameters.project }} --label $(dockeragent_label) -p flows/" to register all Prefect flow files in the "flows" folder. This command runs in an Azure DevOps pipeline, but it seems to register every flow as a new version even though most of them have no metadata changes. The pipeline run always installs the latest Prefect version. Any idea why that happens? I thought the latest Prefect version wouldn't create a new version if the metadata hasn't changed.
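In Prefect 1.x, `flow.register()` accepts an `idempotency_key`, and `flow.serialized_hash()` is the usual value for it: when the key matches the previous registration, no new version is created. As far as I know the `prefect register` CLI relies on the same kind of hash to skip unchanged flows, so a new version on every run usually means something in the serialized flow differs between runs, for example a parameter default computed from the current time. The stdlib sketch below shows the idea with hypothetical metadata dicts: a deterministic hash stays stable until a non-deterministic value sneaks in.

```python
import hashlib
import json
from datetime import datetime, timezone


def metadata_hash(metadata: dict) -> str:
    """Deterministic hash: the same content (in any key order) gives the same digest."""
    canonical = json.dumps(metadata, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def stable_metadata() -> dict:
    # Hypothetical flow metadata containing only fixed values.
    return {"name": "my-flow", "labels": ["docker"], "parameters": {"limit": 10}}


def unstable_metadata() -> dict:
    # Same flow, but one parameter default is computed from the clock each time
    # the flow definition is built, so every registration sees a different value.
    meta = stable_metadata()
    meta["parameters"]["run_date"] = datetime.now(timezone.utc).isoformat()
    return meta
```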

    Gabriel Milan

    03/10/2022, 8:13 PM
    Has anyone faced this before? The schedule for this flow is defined as:
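    from datetime import datetime, timedelta
    
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import IntervalClock
    
    bot_schedule = Schedule(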
        clocks=[
            IntervalClock(
                interval=timedelta(hours=1),
                start_date=datetime(2021, 1, 1),
                labels=[
                    ...,
                ],
                parameter_defaults={
                    ...,
                },
            ),
        ],
    )
    At first, we thought it was a UI issue, but then we confirmed that the flow wasn't being scheduled at all for this time window. Any ideas?
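An `IntervalClock` emits run times at `start_date + k * interval`, so an hourly clock anchored at 2021-01-01 00:00 should produce on-the-hour runs in every window. When checking whether runs are really missing, it can help to list the run times a window should contain. A minimal stdlib sketch of that arithmetic; `expected_runs` is a hypothetical helper, and it assumes all datetimes are naive and in the same timezone:

```python
from datetime import datetime, timedelta
from typing import List


def expected_runs(start_date: datetime, interval: timedelta,
                  window_start: datetime, window_end: datetime) -> List[datetime]:
    """Run times start_date + k * interval (k >= 0) that fall in [window_start, window_end)."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    if window_end <= start_date:
        return []
    # Index of the first run at or after window_start (never before start_date).
    offset = max(window_start - start_date, timedelta(0))
    k = -(-offset // interval)  # ceiling division on timedeltas
    runs = []
    t = start_date + k * interval
    while t < window_end:
        runs.append(t)
        t += interval
    return runs
```

For example, `expected_runs(datetime(2021, 1, 1), timedelta(hours=1), datetime(2022, 3, 10, 12, 30), datetime(2022, 3, 10, 15))` returns the 13:00 and 14:00 runs on 2022-03-10.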

    Stephen Herron

    03/10/2022, 9:38 PM
    Hi, if I wanted a task to loop or retry by raising a signal, how would I delay the next retry? I’m basically creating a task that polls something, and I might want to check it every 10 minutes or so. Previously, I had a for loop with a sleep in the task, but I figured signaling would be a better way?
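In Prefect 1.x, delayed retries can be declared on the task with `@task(max_retries=..., retry_delay=timedelta(minutes=10))`; a task that raises while retries remain is rescheduled after `retry_delay`. The in-task polling this replaces is a bounded loop with a fixed interval. A minimal stdlib version for comparison, where `poll` and its `check` callable are hypothetical and `sleep` is injectable so it can be tested without waiting:

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def poll(check: Callable[[], Optional[T]], interval_s: float = 600.0,
         max_attempts: int = 6, sleep: Callable[[float], None] = time.sleep) -> T:
    """Call check() until it returns a non-None value, waiting interval_s between tries."""
    for attempt in range(1, max_attempts + 1):
        result = check()
        if result is not None:
            return result
        if attempt < max_attempts:
            sleep(interval_s)  # e.g. 600 s = 10 minutes between checks
    raise TimeoutError(f"condition not met after {max_attempts} attempts")
```

The retry-based version keeps the worker free between checks instead of holding it in `time.sleep`, which is the main reason to prefer it for long intervals.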

    Rio McMahon

    03/10/2022, 10:33 PM
    I have a flow that appears to time out during a lengthy SQL query with the error message "No heartbeat detected from the flow run; marking the run as failed." The SQL query takes ~45 seconds to run on my local machine, so I am curious what interval the zombie killer process polls at, and whether you have any suggestions for debugging this interruption. The flow and its associated scripts run fine on my local machine. Thanks.

    Brett Naul

    03/11/2022, 1:02 AM
    noticed that I can no longer select States on the Cloud Hooks page (this is a Slack hook specifically), I know it worked semi-recently....any idea what might be up? cc @nicholas

    kevin

    03/11/2022, 4:08 AM
    I have a flow that runs a `select * from table` query against Snowflake using a SnowflakeQuery task. It appears that this task uses `fetchall()` to load the results into memory: https://github.com/PrefectHQ/prefect/blob/5d2732a30563591410cb11fe0f7e7dfe65cc5669/src/prefect/tasks/snowflake/snowflake.py#L186 I expect that this causes performance issues with extremely large queries, so I am wondering what Prefect Cloud’s tolerance is for this. Ideally, I think it would be preferable to lazy-load query results and/or allow query pagination? Perhaps there’s an architectural limitation I’m overlooking? I’d appreciate any insight 🙂
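Lazy loading with a DB-API 2.0 cursor usually means reading in batches with `cursor.fetchmany(size)` instead of `fetchall()`. The Snowflake Python connector follows DB-API, so the same pattern should apply there, though that is untested here. A minimal sketch using `sqlite3` as a stand-in database; `iter_batches`, the table, and the batch size are illustrative:

```python
import sqlite3
from typing import Iterator, List, Sequence, Tuple


def iter_batches(conn, query: str, batch_size: int = 10_000,
                 params: Sequence = ()) -> Iterator[List[Tuple]]:
    """Yield query results in lists of at most batch_size rows instead of all at once."""
    cursor = conn.cursor()
    try:
        cursor.execute(query, params)
        while True:
            rows = cursor.fetchmany(batch_size)
            if not rows:
                break
            yield rows
    finally:
        cursor.close()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t (id INTEGER)")
    conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(25)])
    for batch in iter_batches(conn, "SELECT id FROM t ORDER BY id", batch_size=10):
        print(len(batch))  # 10, then 10, then 5
```

Because a Prefect task's return value is held in memory as its result, batches like these would typically be processed or written out inside the task rather than returned from it.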

    satyapal reddy

    03/11/2022, 6:00 AM
    Hello all, I'm Satyapal from India.

    satyapal reddy

    03/11/2022, 6:01 AM
    I intend to use Prefect to create workflows in my application.

    satyapal reddy

    03/11/2022, 6:01 AM
    I would like to know whether anyone has tried Prefect in this direction.

    Liezl Puzon

    03/11/2022, 6:20 AM
    What is the max timeout for a Prefect task? I have a task that will run for 2-5 minutes. I couldn’t find anything on the pricing page, Google, or Discord when I searched for “max timeout.” Is there a better term to search for?

    J. Martins

    03/11/2022, 9:39 AM
    I have registered a flow in Prefect Cloud and I am trying to run it in Kubernetes (AKS). I can see that the Kubernetes agent submits the flow for execution, and the first task in the flow seems to execute fine until it errors with the message in the attached Untitled.txt. If I run the flow locally, all is fine. Has anyone faced the same problem?

    ale

    03/11/2022, 10:36 AM
    Hey folks 👋 We’ve been using Prefect Server for two years now and are really happy with its flexibility! However, keeping Prefect Server up to date is becoming quite expensive (1 Data Engineer + 1 SRE need to spend 1 week every time we want to upgrade). So, we are evaluating switching to Prefect Cloud. Given that pricing is based on successful task runs, I have a few questions:
    • what’s the best way to get daily task run stats out of Prefect Server?
    • how does task/flow mapping fit into Prefect Cloud pricing?
    Regarding the first point, I put together a query but I’m not sure it is correct. Any help would be much appreciated! 🙌
    with raw_data as
    (
    	select "start_time"::date as task_run_date,
    	       state,
    	       count(id) as task_run_per_date_and_state
    	from public.task_run
    	group by 1,2
    )
    select state,
           avg(task_run_per_date_and_state)::int as avg_task_runs_per_day
    from raw_data
    where state = 'Success'
    group by 1
    order by 1
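One property of the query above worth knowing: the average is taken only over days that had at least one successful run, so idle days don't pull it down. To sanity-check the per-day counts, here is the same aggregation run against an in-memory SQLite table with a few made-up rows. SQLite's `date()` replaces Postgres's `::date`, the table is a simplified stand-in for `public.task_run`, and mapped child runs are assumed to appear as separate rows:

```python
import sqlite3

# Simplified stand-in for Prefect Server's public.task_run table (assumed columns).
SCHEMA = "CREATE TABLE task_run (id TEXT PRIMARY KEY, start_time TEXT, state TEXT)"

DAILY_SUCCESS = """
SELECT date(start_time) AS task_run_date, count(id) AS successful_runs
FROM task_run
WHERE state = 'Success'
GROUP BY task_run_date
ORDER BY task_run_date
"""

AVG_SUCCESS_PER_DAY = """
SELECT avg(successful_runs) FROM (
    SELECT date(start_time) AS task_run_date, count(id) AS successful_runs
    FROM task_run
    WHERE state = 'Success'
    GROUP BY task_run_date
)
"""


def build_demo_db() -> sqlite3.Connection:
    """Create an in-memory database with a handful of made-up task runs."""
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    rows = [
        ("r1", "2022-03-09 10:00:00", "Success"),
        ("r2", "2022-03-09 11:00:00", "Success"),
        ("r3", "2022-03-09 12:00:00", "Failed"),
        # r4-r6 could be the children of one mapped task
        ("r4", "2022-03-10 09:00:00", "Success"),
        ("r5", "2022-03-10 09:00:01", "Success"),
        ("r6", "2022-03-10 09:00:02", "Success"),
    ]
    conn.executemany("INSERT INTO task_run VALUES (?, ?, ?)", rows)
    return conn
```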

    Vipul

    03/11/2022, 11:27 AM
    Hello Prefect Community, I have an issue installing the latest version of Orion, i.e. "Prefect 2.0a13". I see the error while installing ray[default]>=1.9. I am on Python 3.7.6 on Linux. Does anyone have a similar issue? Thanks

    Lalit Pagaria

    03/11/2022, 12:09 PM
    Hi Community. I have a query regarding the cloud offering. I see that the first 20000 tasks are free and later ones are charged based on usage, but I am not able to find other details like system limits (memory, CPU, network, storage and runtime limits).

    Jean-Baptiste Six

    03/11/2022, 12:42 PM
    Hi 🙂 I misclicked in the UI and canceled a scheduled flow. Is it possible to "uncancel" it?

    Adi Gandra

    03/11/2022, 3:32 PM
    Hey, I have a failed flow run on EKS. I’m trying to restart it, but for some reason the new pod it spins up requests a different amount of CPU than the original run config set. From the run config on the flow run:
    "cpu_request": "4",
    On describe pod that is spun up:
    Requests:
          cpu:     6
          memory:  16Gi
    Why is this happening on the flow run I’m trying to restart?

    Constantino Schillebeeckx

    03/11/2022, 3:45 PM
    I'm working on writing a state handler that executes some logic at the very end of a flow, after all tasks have run. In that logic I end up calling `StartFlowRun(...).run()`; when this executes in the cloud I'm seeing: 🧵

    Chris Reuter

    03/11/2022, 4:11 PM
    👋 Hi everyone! Just so thankful for you all.
    Let us tell you, we’re super excited
    And you should be, too, even very delighted!
    Until the appearance of ol’ Orion :orion:
    Never before had we felt like we were flyin’
    Ceremony and excitement for what’s next
    How can we contain what we think is the best?
    We are pumped for the future
    Endless fun & positivity await
    Eternally grateful for what we’re all creating together 🚀
    Keep up the good vibes community, we’ll see you on Monday!

    Andreas Nord

    03/11/2022, 4:34 PM
    Hi! I am trying to register a flow to Prefect Cloud with a Docker run config:
    flow.run_configs = DockerRun(image="myrepo/image")
    flow.register(project_name)
    But it shows up incorrectly as UniversalRun in the Cloud UI. If I add the run config when I define the flow, it works perfectly:
    with Flow("myflow", run_config=DockerRun(image="myrepo/image")) as flow:
    Any suggestion as to what I am doing wrong in the first approach would be appreciated.
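The likely culprit in the first approach is the attribute name: the run config lives on `flow.run_config` (singular), so `flow.run_configs = ...` just creates an unrelated attribute that registration never reads, and the flow falls back to the default `UniversalRun`. Python accepts the typo silently, as this stdlib illustration with a hypothetical `FakeFlow` stand-in class shows:

```python
class FakeFlow:
    """Hypothetical stand-in for prefect.Flow: run_config defaults to None."""

    def __init__(self, name: str, run_config=None):
        self.name = name
        self.run_config = run_config


flow = FakeFlow("myflow")

# Typo: Python silently creates a new attribute; run_config stays None.
flow.run_configs = {"type": "DockerRun", "image": "myrepo/image"}
print(flow.run_config)  # None

# Correct attribute name: this is the value registration would read.
flow.run_config = {"type": "DockerRun", "image": "myrepo/image"}
print(flow.run_config["type"])  # DockerRun
```

With Prefect itself, the equivalent fix is `flow.run_config = DockerRun(image="myrepo/image")` before calling `flow.register(...)`.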

    Tim Enders

    03/11/2022, 8:54 PM
    Is there an upper limit on what version of Prefect is compatible with Prefect Cloud? (obviously not Orion/2.0, looking at jumping to 1.1 right now)

    Bradley Hurley

    03/11/2022, 9:25 PM
    Hi Folks - I have a question about using the return value from a task as input in another task. I have searched around and am not sure if I am missing something. 🧵

    David Beck

    03/11/2022, 9:27 PM
    Hi, I'm working on implementing OpenTelemetry traces & metrics for Python in my Prefect code. Are there any examples or references for how to structure flows and tasks with them? I was hoping to use state_handlers to reference the current trace, but that doesn't seem to work at all.

    kensuke matsuura

    03/12/2022, 10:41 AM
    Excuse me, this is my first question. I want to access a container that is already running in my local environment and run a job in it. (I don’t want to use DockerRun to create a container for each job; I want to leave the container running.) I would like to know if there is any way to achieve this with Prefect.

Kevin Kho

03/12/2022, 4:07 PM
How would you do this without Prefect?
Actually, why don’t you just make a Local Agent in the container, and then it can pick up the job from prefect and run it as a local job