# prefect-community
a

alex

05/10/2022, 7:00 PM
Hello, I use a cron schedule for my prefect flows with the following format "m h * * *". Today a subset of my flows did not run at all, and there were no changes/new registrations made recently. Has anyone else encountered this issue? The previous and scheduled flows look good to me
1
k

Kevin Kho

05/10/2022, 7:46 PM
How often is your flow running? The scheduler keeps a maximum of 10 runs scheduled at a time and runs on a 90 or 120 second loop. So it could be that you have more than 10 runs before the scheduler schedules the new runs?
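A rough way to check whether that limit could matter for a given schedule is sketched below. The function name and the simplified model are illustrative assumptions, not Prefect's actual scheduler logic; the only inputs taken from the message above are the cap of 10 scheduled runs and the loop of roughly 120 seconds.

```python
def may_outrun_scheduler(
    interval_seconds: float,
    max_scheduled: int = 10,      # cap mentioned in the thread
    loop_seconds: float = 120.0,  # upper bound of the loop mentioned in the thread
) -> bool:
    """Return True if more runs come due within one scheduler loop
    than the scheduler keeps pre-scheduled (simplified model)."""
    runs_due_per_loop = loop_seconds / interval_seconds
    return runs_due_per_loop > max_scheduled


# A daily flow uses up a tiny fraction of one run per loop.
print(may_outrun_scheduler(24 * 60 * 60))
# A flow scheduled every 5 seconds would need 24 runs per 120 second loop.
print(may_outrun_scheduler(5))
```

Under this model a once-a-day schedule is nowhere near the cap, so the limit would only be a plausible cause for very frequent schedules.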
a

alex

05/10/2022, 7:52 PM
once a day
k

Kevin Kho

05/10/2022, 7:53 PM
Oh I see. Does "did not run" mean late? I think your agent may have died?
a

alex

05/10/2022, 7:57 PM
It just wasn't scheduled at all. I realized I didn't capture the banner from my previous screenshots. If the agent died, I believe the flow would just have been stuck in a stuck state.
Could it be an issue with how I set up my flow? I'm using Prefect Cloud.
k

Kevin Kho

05/11/2022, 4:41 AM
This really seems like a Flow stuck in a scheduled state which means there is no agent to pick it up
upvote 1
a

alex

05/11/2022, 2:13 PM
In that case wouldn't it show up as a solid yellow line?
k

Kevin Kho

05/11/2022, 3:17 PM
Ahh I see what you are saying. Will ask someone
a

Anna Geller

05/11/2022, 3:33 PM
I don't have enough information as of now to determine the root cause. Could you share more details?
1. Output of prefect diagnostics
2. The flow code of the flow that fails
3. What agent type do you use and how did you start it?
4. Did you set the schedule via code or UI?
it's most likely a label mismatch
a

alex

05/11/2022, 4:59 PM
Yeah sure
k

Kevin Kho

05/11/2022, 5:36 PM
i took a look and scheduled is normally not a concrete bar
a

alex

05/11/2022, 7:24 PM
Isn't that the case where the time for the flow hasn't arrived yet? If the flow was scheduled to run at some previous time and is unable to do so (i.e. concurrency limits or no agent), the bars turn solid yellow.
a

Anna Geller

05/12/2022, 9:38 AM
in the image those are late runs which are scheduled runs that are queued up but are not getting picked up by an agent, most commonly due to a label mismatch - could you answer my questions?
a

alex

05/12/2022, 2:39 PM
Yup, in my case I did not see the solid yellow bars, which I would expect to happen if the flows were queued but had a label mismatch/no agent. This is what I actually saw: https://prefect-community.slack.com/archives/CL09KU1K7/p1652212636550839?thread_ts=1652209237.209749&cid=CL09KU1K7 As for your questions: 1.
{
  "config_overrides": {
    "cloud": {
      "agent": {
        "auth_token": true
      }
    },
    "context": {
      "secrets": false
    }
  },
  "env_vars": [
    "PREFECT__LOGGING__EXTRA_LOGGERS"
  ],
  "system_information": {
    "platform": "Linux-5.4.0-1030-aws-x86_64-with-Ubuntu-20.04-focal",
    "prefect_backend": "cloud",
    "prefect_version": "0.15.4",
    "python_version": "3.6.12"
  }
}
2. Some examples below 3. local agent.
prefect agent local start -l invenloado
running in a tmux session (I know this isn't recommended) 4. Via code
# Method of the MetaCronSchedule class (rest of the class not shown)
def to_prefect_schedule(self):
    start_date = pendulum.today(tz=self.timezone)
    return CronSchedule(self.cron, start_date=start_date)



def time_to_schedule(time_str):
    """
    converts time in HH:MM format to a daily prefect schedule

    :param time_str: time in HH:MM, where HH is in 24h format
    """
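    hour, minute = time_str.split(":")  # avoid shadowing the built-in min()
    cron_str = f"{minute} {hour} * * *"  # cron field order: minute hour day month weekday
    sch = MetaCronSchedule(cron_str=cron_str, timezone="Canada/Eastern")
    return sch.to_prefect_schedule()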


def get_flow(
    general_config: Conf,
    coll_name: str,
    shard_base: int,
    shard_id: int,
):
    client_name = general_config.prefect["project_name"]

    es_host = general_config.host
    index_name = general_config.index_name

    with prefect.Flow(f"invenloado-{client_name}") as f:
        store_id = prefect.Parameter("store_id", default=None)
        shard_base = prefect.Parameter("shard_base", default=shard_base)
        shard_id = prefect.Parameter("shard_id", default=shard_id)

        InjectInventory(
            client_name=client_name,
            es_host=es_host,
            es_index=index_name,
            coll_name=coll_name,
            shard_base=shard_base,
            shard_id=shard_id,
            store_id=store_id,
            task_args=dict(tags=["inv-augmentation"]),
        )

    return f




def deploy(ctx, shard_base, shard_id, project, time):
    if shard_id >= shard_base:
        raise click.BadOptionUsage("shard-id", "shard-id must be < shard-base")
    prefect_schedule = time_to_schedule(time)

    general_config: Conf = ctx.obj["general_config"]

    flow = get_flow(
        general_config,
        coll_name=coll_name,
        shard_base=shard_base,
        shard_id=shard_id,
    )
    flow.schedule = prefect_schedule
    flow.register(project_name=project, labels=["invenloado"])
I have a bash script that iterates over ~10 projects and deploys a flow for each. The flow structure does not change with the client, just some configs. I have a concurrency limit of 1 on the invenloado label, so only one runs at a time. I've been using this workflow for a month without any issues, and I didn't re-register any flows around the time I saw this issue (this Tuesday). Only 2/10 of my flows got queued that day. All flows queued and ran as expected yesterday and today, with no changes having been made on our end.
Thank you for looking into this 🙂
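The HH:MM to cron conversion in time_to_schedule above can be checked in isolation from Prefect. Below is a minimal sketch with input validation; hhmm_to_cron is a hypothetical helper name, not part of the code alex shared, and it normalizes "07" to "7" where the original keeps the leading zero.

```python
def hhmm_to_cron(time_str: str) -> str:
    """Convert "HH:MM" (24h) into a daily cron expression "m h * * *".

    Hypothetical helper for illustration only.
    """
    hour_str, minute_str = time_str.split(":")  # ValueError if not exactly two parts
    hour, minute = int(hour_str), int(minute_str)
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError(f"time out of range: {time_str!r}")
    return f"{minute} {hour} * * *"


print(hhmm_to_cron("07:30"))
```

Validating the range up front turns a malformed time into an immediate error at deploy time instead of a cron string that may never fire.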
a

Anna Geller

05/17/2022, 1:58 PM
Thank you so much for this detailed description! I don't see any errors in your setup directly, but there are some things that could make it easier to troubleshoot and, later, to migrate to Prefect 2.0. For example, if you use the prefect register CLI instead of this custom class, it would automatically register all flows in a given directory without you having to do any looping. Similarly, I see no storage definition. If you don't define it explicitly, flow.register() will pickle your flow object, which can be hard to unpickle and can cause issues when registering from a different place than the actual flow. Again, moving to the register CLI would make things easier, as this would ensure you are using script storage rather than pickle storage: https://docs.prefect.io/orchestration/flow_config/storage.html#pickle-vs-script-based-storage @alex let us know if this is resolved already
a

alex

05/18/2022, 2:37 PM
Thanks for your response @Anna Geller. Would I be able to use the cli command with my flow structure? I have a
def get_flow(
    general_config: Conf,
    coll_name: str,
    shard_base: int,
    shard_id: int,
):
function that I call multiple times with different general_configs.
I've run into the same issue today and a few times in the previous week. Today, only 3/10 flows in the project actually ran. They're scheduled to run very close together, but we've set the concurrency limit to 1, so some of them enter a late state before they run. Today only the first 3 flows in order of schedule were queued and ran successfully. There's no trace of the other 7. I'm concerned that they are just not being queued after being scheduled. We have 1,000+ flows running every day (with different structures and in different environments), so could it be a load issue?
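To make the expected behavior concrete, here is a small simulation of runs scheduled close together under a concurrency limit of 1. It is a simplified model under stated assumptions (fixed run duration, strict schedule order) and is not how Prefect Cloud implements concurrency; simulate_serial_runs is a hypothetical name.

```python
from datetime import datetime, timedelta


def simulate_serial_runs(scheduled, duration):
    """Return (scheduled_time, actual_start) pairs when only one run may
    execute at a time. Later runs wait ("late") but are never dropped."""
    results = []
    free_at = None  # time at which the single concurrency slot frees up
    for sched in sorted(scheduled):
        start = sched if free_at is None or sched >= free_at else free_at
        results.append((sched, start))
        free_at = start + duration
    return results


base = datetime(2022, 5, 18, 6, 0)
schedule = [base + timedelta(minutes=i) for i in range(3)]
for sched, start in simulate_serial_runs(schedule, timedelta(minutes=10)):
    print(sched.time(), "->", start.time(), "late by", start - sched)
```

In this model every run eventually starts, only later than scheduled, which is why runs that vanish entirely point to something other than the concurrency limit.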
k

Kevin Kho

05/18/2022, 2:40 PM
Are you using .map? Could you show the map call?
k

Kevin Kho

05/18/2022, 2:42 PM
Do you see all 10 registered?
a

alex

05/18/2022, 2:44 PM
Yup, they're all registered. I registered them all around 2 weeks ago and haven't made any changes since. They all run fine some days, like yesterday, but on other days, like today, some or all are just not queued up. In these cases, I manually run them using the UI or the GQL API and they all run successfully.
k

Kevin Kho

05/18/2022, 3:14 PM
So for the ones that don’t run, when you navigate to the Flow page, do you see anything scheduled? Or none at all? When you navigate to the Schedule under Settings, is it what you expect to see?
a

alex

05/18/2022, 3:18 PM
Yup, this is what I see: https://prefect-community.slack.com/archives/CL09KU1K7/p1652212636550839?thread_ts=1652209237.209749&cid=CL09KU1K7 All the scheduled flows look right and match my assigned schedule
k

Kevin Kho

05/18/2022, 3:20 PM
And on the timeline view, do you see the missed runs or are they just gone?
a

alex

05/18/2022, 3:23 PM
They are just gone
k

Kevin Kho

05/18/2022, 3:24 PM
Do you see the future scheduled ones though?
Yesterday's ran, tomorrow's and onwards are scheduled but today's has just disappeared
k

Kevin Kho

05/18/2022, 3:28 PM
Sorry my bad. Was at the onsite last week and just recovered from COVID so I haven’t paid full attention. I understand everything now, and I am super confused. I would try checking agent logs to see if it picked up anything from today?
a

alex

05/18/2022, 3:29 PM
No worries, hope you feel better!
k

Kevin Kho

05/18/2022, 3:31 PM
I am good now thank you 🙂
🙏 2
❤️ 1
a

alex

05/18/2022, 3:36 PM
Yup 3/10 flows for the projects ran so the agent did pick them up. The agent logs look good to me too, I see flows being picked up.
k

Kevin Kho

05/18/2022, 3:36 PM
The “missing” flows are picked up too? or the agent just got the 3/10?
a

alex

05/18/2022, 3:47 PM
It's hard to say because I only see the flow run id in the agent logs which I can't get because I can't find the flow run in the UI. I can try going through the logs and comparing the number of flow runs in the agent logs vs actual flow runs.
k

Kevin Kho

05/18/2022, 3:48 PM
I think that would help if possible so we can see if the missing runs actually even hit the agent.
And then if we find the agent isn't even getting them, I propose you pick some of the Flows that are missing runs and list all of their scheduled flow run ids. Then when something doesn't run, we can look up that individual flow run id specifically since we'll have it, and then we can try to figure out what happened
a

alex

05/18/2022, 3:52 PM
Sure I'll look into that
Yup, the number of flow runs in the agent logs match exactly the number of flows that actually ran. So we're missing log entries for 7 of the flows. I'll collect some flow run ids, that's a good idea.
k

Kevin Kho

05/18/2022, 3:59 PM
Just making sure you know you can use the GraphQL API to get the ids so you don’t handwrite them
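A sketch of how that snapshot could be collected and compared from Python follows. The field names state and scheduled_start_time, the helper names, and the use of prefect.Client().graphql with a variables argument are assumptions about the Prefect 1.x API and schema, so check them in the Interactive API before relying on this. The prefect import is kept inside the function so that the comparison helper works without a Prefect installation.

```python
# Assumed Hasura-style query in the same shape as the flow_run queries used in this thread.
SCHEDULED_RUNS_QUERY = """
query ScheduledRuns($flow_name: String!) {
  flow_run(
    where: {flow: {name: {_eq: $flow_name}}, state: {_eq: "Scheduled"}}
    order_by: {scheduled_start_time: asc}
  ) {
    id
    name
    scheduled_start_time
  }
}
"""


def fetch_scheduled_runs(flow_name):
    """Snapshot the currently scheduled runs of one flow (needs Prefect 1.x and Cloud auth)."""
    import prefect  # imported lazily; only this function depends on Prefect

    result = prefect.Client().graphql(
        SCHEDULED_RUNS_QUERY, variables={"flow_name": flow_name}
    )
    return [dict(run) for run in result.data.flow_run]


def missing_run_ids(snapshot, picked_up_ids):
    """Ids from an earlier snapshot that never showed up (e.g. in the agent logs)."""
    picked_up = set(picked_up_ids)
    return [run["id"] for run in snapshot if run["id"] not in picked_up]


# Offline example with made-up ids:
snapshot = [{"id": "run-a"}, {"id": "run-b"}, {"id": "run-c"}]
print(missing_run_ids(snapshot, ["run-b"]))
```

Since the same flow names exist in more than one project here, the filter would likely also need a project condition to keep the snapshots separate.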
a

alex

05/18/2022, 4:00 PM
One more thing: I have another project using the same agent and flow structure, just pointing at a different database. The flow names are the same between both projects. In theory it shouldn't matter, but I remember having an issue when I first started using Prefect while running flows with the same name in different projects, so I thought I would bring that up as well. The flows in the other project run into the same issue too: sometimes they all run (like today) and sometimes they don't. The flows in that project are scheduled to run before the current project; all 10 of those ran, and then 3 of the ones in the current project.
Yup, I'll use the GQL api
k

Kevin Kho

05/18/2022, 4:02 PM
Not 100% confident, but I don’t think it should matter because the flow run creation api endpoint takes in an id. This sounds like a dev-staging-prod setup which people do use
a

alex

05/18/2022, 4:07 PM
Yup that's what it is.
I'm seeing this issue with my GraphQL queries 🤔
query flowruninfo {
  flow_run(where: {name: {_eq: "wise-flounder"}}) {
    name
    flow {
      name
    }
    logs {
      message
      timestamp
    }
  }
}
{
  "errors": [
    {
      "message": "Unknown argument \"limit\" on field \"flow_concurrency\" of type \"Query\".",
      "extensions": {
        "code": "GRAPHQL_VALIDATION_FAILED"
      }
    }
  ]
}
k

Kevin Kho

05/18/2022, 4:12 PM
I can actually run that query on the Interactive API. Are you doing it in the Interactive API? or through Python?
a

alex

05/18/2022, 4:13 PM
Interactive API
Nvm, it seems it was running another query I had while I was trying to run this one. All good now sorry
k

Kevin Kho

05/18/2022, 4:14 PM
Ah ok was about to ask that. No worries!
a

alex

05/18/2022, 4:14 PM
Not really relevant but this is the failing one
query Test {
  flow_concurrency {
    label
  }
}
k

Kevin Kho

05/18/2022, 4:16 PM
Yeah will ask the team about that
a

Anna Geller

05/18/2022, 4:19 PM
The thread is too long, didn't read it 😄 why do you run this query? concurrency can be checked from the concurrency page in the UI
a

alex

05/18/2022, 4:20 PM
I was just testing something 😛 This one's not a concern for me
👍 1
a

Anna Geller

05/18/2022, 4:20 PM
ahh gotcha - for this one the team says it's a known issue so you are not doing anything wrong
k

Kevin Kho

05/18/2022, 4:20 PM
I think he doesn’t really need it for now. The idea is just to collect the scheduled flow run ids, and then when one of them doesn’t run on schedule, we can look up that flow run id and see if we can find anything about it
👍 1
a

Anna Geller

05/18/2022, 4:22 PM
thx, what is the underlying problem behind it?
@alex could you try to summarize the problem you are facing and which steps you have taken so far to find the root cause?
a

alex

05/18/2022, 4:57 PM
I have deployed 10 flows that have the same structure, just different configs (database URL etc.), to a project. The flows are scheduled to run daily at a certain time and have a concurrency limit of 1. Some days, the flows run successfully as expected, but some days the flow runs for some or all of them just disappear. They do not show up as failed, queued, or cancelled. This is a screenshot of what I see. This is my code + prefect diagnostic info. I have:
• deployed the flows 2 weeks ago, so no changes were made that could cause sudden failures
• been able to run the flows with missing runs successfully using the UI and GQL API
• checked the agent logs to confirm that the missing flow runs have not been picked up by the agent
• confirmed that the agent is functioning and there is no label mismatch
• used the GraphQL API to confirm that there are no flow_runs created for the missing ones
• captured a list of scheduled flow runs and their ids, which I will share with @Kevin Kho if this issue happens again
a

Anna Geller

05/20/2022, 5:07 AM
Thanks for explaining your use case more and sorry for getting back to you late. Could you try deploying a simple hello-world flow such as this one to see whether this works as expected? this way we can isolate whether this is an issue with the flow code vs. with the storage/run_config/execution layer
from prefect import task, Flow


@task(log_stdout=True)
def hello_world():
    print("hello world")


with Flow("hello") as flow:
    hw = hello_world()
generally speaking with situations such as this, it's always good to approach it step by step - start with a simple flow, then add storage, then add run config, then add a schedule, then add concurrency limits, then test with multiple runs, then extend the workflow to reflect your custom code, etc. this makes it easier to identify the root cause
a

alex

05/26/2022, 2:29 PM
Thanks for the response, I will deploy a simple flow under the same project and monitor it for the next few days