prefect-community
  • h

    Huw Ringer

    10/23/2021, 11:01 PM
    Hi, I’m running the same task multiple times in a flow using some Parameters that make different stuff happen each time, using the
    map
    method:
    process_tasks.map(flow_description, start_task_id)
    My problem is that if one of the mapped tasks fails, the other ones still run regardless, and I’d like the whole flow to be cancelled programmatically/automatically when that happens. I think the CancelFlowRun API method should be able to do this if I invoke it just before the failed task enters the failed state (via a raised exception). I’m struggling to figure out how to call this thing though, and was hoping someone might be able to help, please. If I try the following:
    prefect.tasks.prefect.flow_run_cancel.CancelFlowRun.run(context.get("flow_run_id"))
    My python engine says it can’t find the
    flow_run_cancel
    method in any imported Prefect module, and I’ve already imported
    task, Flow, Parameter, context, Task, tasks
    (for good measure). I’m clearly doing something very wrong and don’t know enough to figure out what it is. Would be very grateful for any help or suggestions to put me out of my misery. Thanks in advance!
    k
    • 2
    • 14
  • h

    haf

    10/24/2021, 7:37 AM
    Hi, what's the recommended process for transitioning off Prefect Cloud to something self-hosted that can run flows? E.g. is it recommended to set up the server software yourself? It's very important that we can run our flows.
    a
    • 2
    • 1
  • g

    Gabi Pi

    10/24/2021, 12:17 PM
    Hey everyone, I am trying to run a simple flow with a
    RunNamespacedJob
    task over Kubernetes. I am using the following code:
    storage = S3(bucket="gavriel-test", stored_as_script=True)
    kubernetes_run_conf = KubernetesRun(
        env={
            "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
            "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY
        },
        labels=["prefect-poc-k8s-agent"]
    )
    body = {
        'apiVersion': 'batch/v1',
        'kind': 'Job',
        'metadata': {'name': 'echo'},
        'spec':
            {
                'template':
                    {
                        'spec': {
                            'containers': [
                                {
                                    'name': 'echo',
                                    'image': 'alpine:3.7',
                                    'command': ['sh', '-c', 'echo Hello!!!; sleep 10']
                                }
                            ]
                        }
                    },
                'backoffLimit': 4
            }
    }
    with Flow("kubernetes-CreateNamespacedJob", run_config=kubernetes_run_conf, storage=storage) as flow:
        job = RunNamespacedJob(body=body, namespace="prefect", delete_job_after_completion=False)
    But I keep getting the following error:
    Error during execution of task: MaxRetryError("HTTPConnectionPool(host='localhost', port=80): Max retries exceeded with url: /apis/batch/v1/namespaces/prefect/jobs (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f8c8adeb650>: Failed to establish a new connection: [Errno 111] Connection refused'))")
    Do you have any idea how to resolve this?
    a
    k
    • 3
    • 7
  • r

    Royzac

    10/24/2021, 11:08 PM
    This issue, graphviz.backend.ExecutableNotFound: failed to execute 'dot', make sure the Graphviz executables are on your systems' PATH, is occurring in my venv. It would seem to me that the executable would come prepackaged with prefect['viz'] or prefect['dev']. I'm using Debian/bash. Has anyone else come across this issue?
    a
    a
    • 3
    • 4
  • t

    Tony Yun

    10/25/2021, 3:09 AM
    hi, how can I start two tasks simultaneously? Right now I see it’s running on a single thread: even when two tasks have no dependencies, they cannot run at the same time. Is this even possible?
    k
    s
    • 3
    • 4
  • l

    Lukáš Polák

    10/25/2021, 7:32 AM
    Hi everybody, has anybody using the local agent noticed any interesting spikes in memory usage? We use a local agent and the Dask executor. Our load is not that intensive at the moment: we run ~30 flows during a spike. Regularly, we run into a situation where the local agent's memory consumption spikes from ~150MB to 1.5-2.1GB. Has anybody had a similar experience? At the moment, we're debugging and profiling the issue, so I'll post our findings here if they turn out to be relevant.
    a
    m
    m
    • 4
    • 14
  • g

    Giovanni Giacco

    10/25/2021, 10:15 AM
    Maybe it’s a stupid question but… How can I define the task name with the “_*create_flow_run*_” function?
    a
    • 2
    • 2
  • r

    Richard Hughes

    10/25/2021, 2:07 PM
    Good Morning - I am experiencing an outage on my end w/ self hosted agents something is not allowing my agents to pickup flows - is anyone able to help me - are ports 8080 and 4200 the firewall rules I should check - not sure where to begin
    k
    a
    • 3
    • 52
  • p

    Philip MacMenamin

    10/25/2021, 2:50 PM
    Hi, at the top of my flow I'm doing sanity checking type stuff, if something isn't correct I
    raise signals.FAIL("Didn't work")
    I do not want the flow to proceed. However, other downstream tasks occur despite the exception being raised. Is this expected behavior?
    k
    • 2
    • 34
  • j

    jcozar

    10/25/2021, 4:07 PM
    Hi everybody! I have a flow that is scheduled to run once every day. However, I would like the weekend executions to be moved to the next weekday (Monday), which can be done with the adjustment function adjustments.next_weekday. However, I need the “real” date value (Saturday or Sunday) to run each flow correctly. Is there a way to use the original scheduled time from before the adjustment? I don’t see any context variable for that. Thank you all in advance!
    k
    • 2
    • 10
  • k

    Kevin

    10/25/2021, 4:20 PM
    Hi - I have a flow that retrieves a list of keys from S3. Each key represents a .zip file which is an archive that when unzipped contains 5 different csv files. I am trying to map over those keys and write each file within the archive to Azure Storage. I am having issues understanding how to handle the zipped object - which at times should be unmapped. The working code I have right now only writes out the data associated with the last file in the list
    k
    • 2
    • 41
  • i

    Ihor Bondartcov

    10/25/2021, 5:26 PM
    Hi! I need a little advice on how to update parameters using the view. My default param is [{"1":"a"}, {}]. I want to set the value [{"1":"a"}, {"2":"b"}] using the view. I chose List as the parameter type and pasted in my JSON, but Prefect shows me the value ["{\"1\":\"a\"}, {\"2\":\"b\"}"]. How do I create a list of objects using the view?
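The difference between the two values in this message can be made concrete with plain Python. This is only an illustrative sketch using the standard `json` module; it shows what the displayed value contains, not how the Prefect UI produces it, and the variable names are hypothetical.

```python
import json

# The value shown by the UI: a JSON list holding ONE string element.
stored = json.loads(r'["{\"1\":\"a\"}, {\"2\":\"b\"}"]')

# The value that was intended: a JSON list holding TWO objects.
intended = json.loads('[{"1": "a"}, {"2": "b"}]')

print(len(stored), type(stored[0]).__name__)
print(len(intended), type(intended[0]).__name__)

# The single string is itself the body of a JSON list, so wrapping it in
# brackets and parsing again recovers the intended structure.
recovered = json.loads("[" + stored[0] + "]")
print(recovered == intended)
```

In other words, the whole pasted text was stored as a single string item rather than being parsed as two objects.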
    k
    • 2
    • 9
  • t

    Tony Yun

    10/25/2021, 6:56 PM
    hi, is it possible to add trigger in Prefect provided tasks? for example, I want to add a
    always_run/all_finished
    trigger to this task, but I can’t simply just add it to the end of parameters. Because this is not a
    @task
    way of definition, don’t know how to:
    deletePod = kpod.DeleteNamespacedPod(kubernetes_api_key_secret=None)
    k
    • 2
    • 3
  • s

    Steve s

    10/25/2021, 7:56 PM
    hi all! I'm trying to use
    create_flow_run.map(...)
    to run a single flow multiple times with different parameters. I have that working, but unfortunately I need the created flows to run in a specific order. is there a way to force that behavior?
    k
    • 2
    • 2
  • k

    KhTan

    10/25/2021, 11:37 PM
    Hi! New to prefect and have a quick question regarding multiple flows. If i have a daily task and a weekly task that is the summary of the daily, should i start 2 local agents and register 2 flows? If the weekly runs every Monday for the previous sun-sat, should i set upstream even though it doesn’t require the same day’s daily task to finish running? thanks in advance.
    n
    k
    a
    • 4
    • 7
  • d

    davzucky

    10/26/2021, 12:40 AM
    Orion specific question, Why when I start orion agent
    orion agent start
    and I set the following env variable to connect to the orion server
    PREFECT_ORION_API_HOST: orion_server
          PREFECT_ORION_API_PORT: 4200
    the agent still tries to create a database, in this case using SQLite, on the agent?
    c
    • 2
    • 9
  • d

    davzucky

    10/26/2021, 12:44 AM
    How can we see the agents connected to an orion server from the UI ?
    m
    • 2
    • 2
  • t

    Tara

    10/26/2021, 4:30 AM
    Hi! I’m wondering if there’s a way in Prefect to run a flow every x minutes after completion of the previous flow run (as opposed to having a fixed scheduled interval)?
    k
    • 2
    • 3
  • g

    Gabi Pi

    10/26/2021, 6:53 AM
    Hi! I have 2 questions about flows versioning - 1. Is it possible to run archived version of a flow? 2. The documentation states:
    You could override the automatic version promotion to unarchive and enable old versions (for example, for A/B testing)
    How can I do that?
    a
    k
    • 3
    • 12
  • j

    Jai Deo

    10/26/2021, 8:02 AM
    Is there a good example of having classes defined in a separate file and using that class in a task (on Kubernetes and Azure) - I keep getting the message module not found
    a
    k
    • 3
    • 9
  • t

    Thomas Furmston

    10/26/2021, 8:46 AM
    Hi, probably a noob question, but is there an equivalent to Airflow sensors in Prefect? In particular, suppose there is another team running an ETL that updates a core table in a data warehouse, and I want my flow to start once that table has been updated. In Airflow I would use a sensor on that table. Is there something equivalent in Prefect?
    s
    a
    k
    • 4
    • 14
  • e

    Eric Feldman

    10/26/2021, 8:56 AM
    hi, is there a way to wait for a flow run to end other than polling
    get_flow_run_info
    and until the state is finished?
    a
    k
    • 3
    • 85
  • w

    Will List

    10/26/2021, 10:13 AM
    I'm looking for the way to get a flow to run with other classes, which are defined in a separate file. The idea being that we can encapsulate business logic in a class that multiple flows can refer to. We're not looking to create an installed package, just host the class file alongside the flow file. Is that able to be done? Just importing the class gives the flow error: "Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'doMath\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')"
    a
    • 2
    • 6
  • m

    Milly gupta

    10/26/2021, 11:53 AM
    Hi All, Can we label Prefect agent after starting the agent? For e.g I have multiple test environments and I want to add another label to agent and use the same agent across.
    a
    • 2
    • 1
  • t

    Toprak Nihat Deniz Öztürk

    10/26/2021, 12:15 PM
    Hi. I am refactoring. I divided my flows into 3 categories:
    • pipelines: Take input from an upstream pipe and feed a downstream pipe. Pipes are tied together with flow.update(pipe_file_reader), flow.update(pipe_transform) and so on.
    • python parametered flows: Take input such as a pandas dataframe through a 'Parameter' task. These are for parallelism with create_flow_run.map(flow_name)
    • production flows: End product flows
    These will be the subfolders of pipelines in my project. Any ideas are appreciated!
    a
    k
    • 3
    • 4
  • e

    Eric Feldman

    10/26/2021, 3:04 PM
    hey, is it possible to pass data between flows using dependent flows?
    def fetch_result(data):
        # data ?
        raise 
    
    class GetData(Task):
        def run(self):
            return {'data': 7}
    
    recipe_flow = StartFlowRun(flow_name="batch", project_name="proj", wait=True)                
    with Flow(name='schedule') as sched:
        recipe_flow.set_upstream(GetData(), key='parameters')
        FunctionTask(fetch_result)(data=recipe_flow)
    but the only thing I get as 
    data
     is 
    prefect.engine.signals.SUCCESS
    object calling 
    data.state
    isn’t really helpful, and from
    data.state.result
    I get the
    prefect.engine.signals.SUCCESS
    all over again. Is there any way to get the real tasks/flow run ID/data of the inner flow?
    k
    m
    • 3
    • 11
  • j

    Josh

    10/26/2021, 3:51 PM
    Is it possible to run a docker image with different environment variables in the container as specified by a Flow parameter? Locally, I can do this via
    docker run --env FOO=BAR <my_docker_image>
    I would like to be able to execute a flow on this image with a Docker Agent by calling the flow with the parameters
    {
      "FOO": "BAR"
    }
    Is this possible with Prefect? Or is there a Prefect idiom for such a concept? Use cases for changing environment variables on container create/flow execution would be 1. defining which customer code path we want to trigger (database, configs, special methods and permissions) 2. Setting how we want to persist data (which database type to use, how to cache)
    m
    k
    • 3
    • 17
  • h

    haf

    10/26/2021, 5:39 PM
    Do you know if there exists a pattern for varying the location of a result and hence the result type? My use-case is that on my laptop I don't want to re-run every task unless either the task definition has changed or the inputs have changed; instead I just want to use a cached result. When running in prod in prefect-cloud, when it's working again, I want to save to GCS but locally on the laptop, I'm fine with local storage. It would seem I can't set
    @task(result=...)
    because I then have to specify the result type to be either local or GCS. How do you handle this?
    a
    • 2
    • 2
  • d

    Dominic Pham

    10/26/2021, 6:57 PM
    Hi all! I have a task that calls a SQL query and returns a list, and I want to iterate over that list and pass even chunks of the list to a scrapy spider that is called in another task and wait between each chunk. However, I'm running into an issue with LOOP where it is only passing the last chunk of the list to the scrapy spider.
    @task
    def query_that_will_return_a_list() -> list:
        ...
    
    @task
    def scrapy_api_call_chunks(title_list):
        loop_payload = prefect.context.get("task_loop_count", 0)
        title_list_grouper = list(grouper(title_list, 10))
        if loop_payload <= len(title_list_grouper):
            # Each loop will be an iteration of 10 titles. # of loops * 10 will result in the total number of titles looped over so far
            raise LOOP(message = 'Running the next 10 items in job titles list')
        scraper_class = Scraper()
        scraper_class.instantiate_web_scraper(title_list_grouper[loop_payload - 2])
    I feel like I don't fully understand how to utilize LOOP in the context of passing information to another function inside the task.
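The `grouper` helper used in this snippet is not shown in the message. A minimal sketch, assuming it is meant to split the list into consecutive chunks of at most `n` items (the implementation and the sample `titles` list are hypothetical):

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def grouper(items: Sequence[T], n: int) -> Iterator[List[T]]:
    """Yield consecutive chunks of at most n items; the last chunk may be shorter."""
    if n < 1:
        raise ValueError("chunk size must be at least 1")
    for start in range(0, len(items), n):
        yield list(items[start:start + n])


# Hypothetical data standing in for the SQL query result.
titles = [f"title-{i}" for i in range(25)]
chunks = list(grouper(titles, 10))

# Each chunk can then be selected by index, one chunk per iteration.
for index, chunk in enumerate(chunks):
    print(index, len(chunk))
```

With the chunks materialised up front, each pass only needs an index that picks the chunk to send to the scraper, which makes it easier to check that every chunk is visited exactly once.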
    k
    • 2
    • 8
  • s

    Samuel Hinton

    10/26/2021, 10:43 PM
    Hey all! Was wondering if someone could help me out in configuring a prefect agent to make it a bit more robust. After we had an outage this evening, a massive backlog of flows occurred. This would be fine, except that the agent we have (a local agent run as a service in the docker compose) immediately tries to launch all the delayed jobs, which makes it run out of memory. We get the Lazarus errors, the agent restarts, and the same thing happens. You can see in the screenshot of service memory the constant agents restarting, using all available memory, and eventually crashing. Are there any settings/env variables I can set to try and a) limit the number of concurrent jobs the agent sends out and b) stop whatever is using memory? What exactly would cause an almost instant multiple GB memory usage in the agent?
    k
    a
    k
    • 4
    • 43
k

Kevin Kho

10/26/2021, 10:46 PM
Hey, this is one of the reasons Automations was introduced so you can cancel flows that have not kicked off after X number of minutes. For flow level concurrency, you can check these docs. These are both Cloud only features
s

Samuel Hinton

10/26/2021, 10:49 PM
Hey Kevin! We are not using prefect cloud, and one of the big product points when we set it up a while ago were the statements that server and cloud would share the same functionality. Are there no solutions for this for prefect server users?
Ideally, if we have a flow that is scheduled every 5 minutes, I would want to say “Look, I don't care if you've missed the last 20 of them, don't try and run all 20 of them, just run the last one”
k

Kevin Kho

10/26/2021, 10:52 PM
Kind of, but it's a bit more complicated. You could try a flow-level state handler that hits the GraphQL API to check how many runs of that flow are running, and then end the flow run as success
s

Samuel Hinton

10/26/2021, 10:54 PM
I am open to any and all solutions as this just took down my prod env data source and I need to ensure it cant happen again if at all possible. Is there any chance youd be able to provide a scaffolding or a few pointers so that I can begin looking at this in the morning?
Also, would you have any updates on https://github.com/PrefectHQ/server/issues/213? I notice the PR was closed due to difficulty, but maybe there's someone else giving it a shot?
I raised this quite a while ago in a similar issue, but would be great to see if its still on the roadmap?
k

Kevin Kho

10/26/2021, 11:00 PM
I think Michael detailed it was hard to make performant here
I don’t think there’s much left on the roadmap because as we move to Orion, the roadmap of the current Prefect core is more about stability and performance
s

Samuel Hinton

10/26/2021, 11:02 PM
That's fair enough, I do look forward to watching the update videos once they're out and about. Regarding a flow handler with GraphQL mixed in, are there any examples in the current doco I can use whilst I pray for a speedy delivery of Orion?
Also, will orion be offered as server and cloud like prefect? Data security contracts and networking issues prevent us from using a cloud based solution
k

Kevin Kho

10/26/2021, 11:12 PM
Not quite but I can detail this out. You can use a flow level state handler that uses the
Client().graphql()
to hit the API. The state handler would be applied on the scheduled -> running transition. In this state handler, use the Client to query for the number of flows running.
query {
  flow (where: 
    {name: {_eq: "..."},
     project: {name: {_eq: "bristech"}},
     archived: {_eq: false}}) {
    name
    project {
      name
    }
    archived
    flow_runs(where: {state: {_eq: "Running"}}){
      id
      state
    }
  }
}
And then use Python to count the number. If the number of flows is at the concurrency you want for that flow, then just return SUCCESS
Orion is a server+agent in one. Cloud form will take a different form, but with Orion, Server and Cloud features will be very aligned
Because everyone will be on Orion
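The counting step described above can be sketched in plain Python. This assumes the response has the same shape as the query, i.e. `data.flow` is a list of flows, each with a `flow_runs` list; the helper names, the `limit` parameter and the sample response are hypothetical, and no Prefect call is made here.

```python
from typing import Any, Dict


def count_running(result: Dict[str, Any]) -> int:
    """Count flow runs across all flows returned by the query.

    The query already filters flow_runs to the "Running" state,
    so every run in the response counts as running.
    """
    flows = result.get("data", {}).get("flow", [])
    return sum(len(flow.get("flow_runs", [])) for flow in flows)


def at_capacity(result: Dict[str, Any], limit: int) -> bool:
    """Return True when the number of running flow runs has reached the limit."""
    return count_running(result) >= limit


# Hypothetical response, shaped like the query above.
sample = {
    "data": {
        "flow": [
            {
                "name": "my-flow",
                "archived": False,
                "flow_runs": [
                    {"id": "run-1", "state": "Running"},
                    {"id": "run-2", "state": "Running"},
                ],
            }
        ]
    }
}

print(count_running(sample), at_capacity(sample, limit=2))
```

Whether the run that is currently entering `Running` already appears in the response depends on when the handler fires, so the limit may need to be adjusted by one after testing.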
s

Samuel Hinton

10/26/2021, 11:14 PM
Awesome, I'll see if I can get something working. Ah, to be more explicit, if we have an existing swarm/etc. will we be able to integrate Orion into our swarm like we can with Prefect server?
k

Kevin Kho

10/26/2021, 11:15 PM
I’d like to say less because server will all fit in one container now. It’s not a matter of spinning up a ton of containers as it’s backed by SQLite
s

Samuel Hinton

10/26/2021, 11:15 PM
Ah, sounds promising. Thanks for the info, and Ill let you know how I get on with the flow handler 🙂
k

Kevin Kho

10/26/2021, 11:16 PM
I have recommended this a couple of times and no one has come back to me about it, so I assume it works
🙏 1
s

Samuel Hinton

10/26/2021, 11:37 PM
As a potentially secondary solution, is there also a way to signify the maximum time Delta for rescheduling. Where I could say "if you're more than an hour late, just fail and get out of the way"?
k

Kevin Kho

10/27/2021, 1:12 AM
That...is the Automation in Prefect Cloud 😅. But it requires services running in our backend that oversee all of the flows not available in Server
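For Server users, the "more than an hour late" check could in principle live in the same kind of state handler discussed in this thread. The sketch below covers only the time comparison, assuming the run's scheduled start time is available to the handler (for example from the `scheduled_start_time` field returned by the GraphQL API); the function name and default threshold are hypothetical, and this is not a Prefect feature.

```python
from datetime import datetime, timedelta, timezone


def is_too_late(
    scheduled_start: datetime,
    now: datetime,
    max_delay: timedelta = timedelta(hours=1),
) -> bool:
    """Return True when a run would start more than max_delay after its scheduled time."""
    return now - scheduled_start > max_delay


# Hypothetical timestamps for illustration.
scheduled = datetime(2021, 10, 26, 22, 0, tzinfo=timezone.utc)
on_time = datetime(2021, 10, 26, 22, 30, tzinfo=timezone.utc)
late = datetime(2021, 10, 26, 23, 30, tzinfo=timezone.utc)

print(is_too_late(scheduled, on_time), is_too_late(scheduled, late))
```

A handler could return a cancelled or failed state when this check is true, instead of letting the stale run proceed.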
a

Anna Geller

10/27/2021, 9:29 AM
btw @Samuel Hinton you mentioned:
I would want to say “Look, I don't care if you've missed the last 20 of them, don't try and run all 20 of them, just run the last one”
There is also a UI feature to clear late runs with one click.
s

Samuel Hinton

10/27/2021, 9:54 AM
Yeah thats what I ended up using to get the environment back to runnable conditions, just trying to ensure that I dont have PagerDuty calling me again in the early morning to do it manually haha
💯 1
k

Kevin Mullins

10/27/2021, 5:50 PM
I’m also interested in this. I have a flow that under ideal circumstances will run very quickly, but occasionally might have to “re-snapshot” a source and run long. I believe this would fall into the same category where I would like to setup scheduling to only allow a single concurrent flow and only run a single instance again on the next run if there were missed events.
k

Kevin Kho

10/27/2021, 5:52 PM
I don’t think the agent concurrency will be revisited anytime soon as it was attempted but not performant. The workaround though is outlined above. Let’s see if Samuel reports back?
👍 1
k

Kevin Mullins

10/27/2021, 5:53 PM
Sounds good. I’ll end up trying the same thing if I get to it first 🙂
s

Samuel Hinton

10/27/2021, 5:55 PM
It's at the top of our backlog for next sprint so hopefully I'll check back in early next week with a full query and python snippet
:thank-you: 1
Hey @Kevin Kho, would you know how to, in a state handler, get the flow run id? Im trying to figure out how exactly to check to see if the current flow run for the handler is the latest one scheduled (so I dont kill it, but can kill everything else). But figuring out the flow run ID (or its randomised name) inside the handler has stumped me, so I’m probably overlooking something simple. This is the code so far:
def state_handler(flow: Flow, old_state: State, new_state: State) -> Optional[State]:
    if old_state.is_pending() and new_state.is_running():

        client = Client()
        now = dt.now(tz=pytz.UTC)
        result = client.graphql(
            """{
    flow(where: {
        archived: {_eq: false},
        name: {_eq: "%s"}
    }) {
        name
        archived
        flow_runs (where: {
        state: {_in: ["Scheduled", "Retrying", "Running"]}, 
        scheduled_start_time:{_lte: "%s"}
        }) {
        scheduled_start_time
        start_time
        name
        state
        id
        }
    }
}"""
            % (flow.name, now.isoformat())
        )
        # These flow runs will be everything thats scheduled to start in the past and
        # might have built up.
        flow_runs = result["data"]["flow"][0]["flow_runs"]

        # I dont want to run another task if:
        # 1. Theres already a flow in the running state
        # 2. If there are multiple scheduled, only the latest one should be run
        any_running = any([f["state"] == "Running" for f in flow_runs])
        if any_running:
            return Cancelled("Existing tasks are already running")
        
        scheduled = [f for f in flow_runs if f["state"] in ("Pending", "Scheduled")]
        if len(scheduled) > 1:
            last_scheduled_time = max([dt.fromisoformat(f["scheduled_start_time"]) for f in scheduled])
            this_flow_run = None 
            # How do I get the flow run id? It doesnt seem to be in the Flow, and not in the State either

    pass
k

Kevin Kho

10/29/2021, 2:09 PM
Just do
prefect.context.get("flow_run_id")
inside it
s

Samuel Hinton

10/29/2021, 2:10 PM
Ahhh theres a funky little context box, perfect, thanks mate
Ah would
flow.context.get(…)
work too, or is
prefect.context
going to do some scope matching or similar to know whats going on?
k

Kevin Kho

10/29/2021, 2:19 PM
I don’t think that will work because context is not attached to it. Is it?
s

Samuel Hinton

10/29/2021, 2:23 PM
Youre right, I was confusing it with
flow_run_context
inside Flow itself, but its never persisted as an attribute, apologies
k

Kevin Kho

10/29/2021, 2:42 PM
No worries!
s

Samuel Hinton

10/29/2021, 2:55 PM
Hmm, trying to manually set a schedule to enable testing of this, but its hitting me with the old “Interval can not be less than one minute when deploying to Prefect Cloud.” I am not deploying to prefect cloud, but to prefect server on my own hardware. Is there a way to turn off this validation that you know of?
k

Kevin Kho

10/29/2021, 2:57 PM
Will check
Looks hardcoded so I don’t think it’s configurable
s

Samuel Hinton

10/29/2021, 2:59 PM
Ah well, if its client side then I guess I can just edit the code haha
Theres a server side check too, damn. Okay, Ill figure out some other way of testing
@Kevin Mullins @Kevin Kho reporting back with a working state handler.
import datetime
from datetime import timedelta, datetime as dt
import json
import os
import gc
from typing import Optional

import pytz
import requests
import pandas as pd
import prefect
from prefect import task, Flow
from prefect.engine.state import Failed
from prefect.utilities.notifications import slack_notifier
from prefect.engine.signals import SKIP
from prefect.engine.state import Cancelled, State
from prefect.client import Client


def concurrent_handler(flow: Flow, old_state: State, new_state: State) -> Optional[State]:
    if old_state.is_pending() and new_state.is_running():

        client = Client()
        now = dt.now(tz=pytz.UTC).replace(microsecond=0) + timedelta(seconds=1)
        # Replacing microseconds because graphql api cant always handle the number of decimals
        result = client.graphql(
            """{
    flow(where: {
        archived: {_eq: false},
        name: {_eq: "%s"}
    }) {
        name
        archived
        flow_runs (where: {
        state: {_in: ["Submitted", "Queued", "Scheduled", "Retrying", "Running"]}, 
        scheduled_start_time:{_lte: "%s"}
        }) {
        scheduled_start_time
        start_time
        name
        state
        id
        }
    }
}"""
            % (flow.name, now.isoformat())  # Sorry for % operator, but those {} make it a pain
        )
        # These flow runs will be everything thats scheduled to start in the past and
        # might have built up.
        logger = prefect.context.get("logger")
        # This might fail if the GraphQL cant find anything, but havent seen this in practise
        flow_runs = result["data"]["flow"][0]["flow_runs"]

        # I dont want to run another task if theres already more than one flow running
        # For me, Im happy to have two running at once, as API issues means we can get timeouts and
        # hangs that dont terminate easily. For other use cases, Id generally say to cancel if theres
        # any running
        num_running = sum([1 if f["state"] in ("Running", "Retrying") else 0 for f in flow_runs])
        if num_running > 1:
            msg = "Existing tasks are already running"
            logger.info(msg)
            return Cancelled(msg)

        # And if there are multiple scheduled, only the latest one should be run
        scheduled = [
            f for f in flow_runs if f["state"] in ("Pending", "Scheduled", "Queued", "Submitted")
        ]
        if len(scheduled) > 1:
            last_scheduled_time = max(
                [dt.fromisoformat(f["scheduled_start_time"]) for f in scheduled]
            )
            this_flow_run_id = prefect.context.get("flow_run_id")
            matching_runs = [f for f in scheduled if f["id"] == this_flow_run_id]
            if not matching_runs:
                logger.info(f"Current id is {this_flow_run_id}")
                logger.info(f"Flow runs are: {scheduled}")
                return Cancelled("Nope")
            this_run = matching_runs[0]
            this_run_time = dt.fromisoformat(this_run["scheduled_start_time"])
            if this_run_time != last_scheduled_time:
                msg = "Multiple scheduled tasks, this is not the last one"
                logger.info(msg)
                return Cancelled(msg)

    return new_state
Will allow a max of two concurrent running jobs (can easily change this to one), and if multiple jobs are scheduled (ie your agent was down for a while and is now back up), only the last one will execute and the others will log and cancel. Not all the imports are necessary, havent filtered out the imports from the other handlers and common functions in the file.
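The decision logic in the handler above can also be pulled out into a plain function that takes the list of flow runs and the current run ID, which makes it testable without a Prefect backend. This is a sketch of that refactoring, not part of the original handler; the function name, the `max_running` parameter and the sample data are hypothetical.

```python
from datetime import datetime
from typing import Dict, List, Optional

RUNNING_STATES = ("Running", "Retrying")
WAITING_STATES = ("Pending", "Scheduled", "Queued", "Submitted")


def cancel_reason(
    flow_runs: List[Dict[str, str]], this_run_id: str, max_running: int = 1
) -> Optional[str]:
    """Return a reason to cancel the current run, or None if it should proceed."""
    num_running = sum(1 for f in flow_runs if f["state"] in RUNNING_STATES)
    if num_running > max_running:
        return "Existing tasks are already running"

    scheduled = [f for f in flow_runs if f["state"] in WAITING_STATES]
    if len(scheduled) > 1:
        mine = [f for f in scheduled if f["id"] == this_run_id]
        if not mine:
            return "Current run not found among scheduled runs"
        latest = max(datetime.fromisoformat(f["scheduled_start_time"]) for f in scheduled)
        if datetime.fromisoformat(mine[0]["scheduled_start_time"]) != latest:
            return "Multiple scheduled tasks, this is not the last one"
    return None


# Hypothetical backlog: three late runs of the same flow.
runs = [
    {"id": "a", "state": "Scheduled", "scheduled_start_time": "2021-10-29T14:00:00+00:00"},
    {"id": "b", "state": "Scheduled", "scheduled_start_time": "2021-10-29T14:05:00+00:00"},
    {"id": "c", "state": "Submitted", "scheduled_start_time": "2021-10-29T14:10:00+00:00"},
]

print(cancel_reason(runs, "a"))
print(cancel_reason(runs, "c"))
```

Inside the handler, a non-`None` reason would map to `return Cancelled(reason)`. A handler like this is typically attached when the flow is defined, e.g. `Flow("my-flow", state_handlers=[concurrent_handler])`, where the flow name is a placeholder.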
:thank-you: 3
k

Kevin Mullins

11/02/2021, 2:35 PM
Awesome! Thank you @Samuel Hinton very much for this.
k

Kevin Kho

11/02/2021, 2:36 PM
Nice. This is worth archiving in the server repo. Want me to do that or do you want to post it?
s

Samuel Hinton

11/02/2021, 3:01 PM
Feel free to do it however you’d like @Kevin Kho 🙂
k

Kevin Kho

11/02/2021, 3:04 PM
https://github.com/PrefectHQ/server/issues/307
👍 1