Hey guys, I am trying to create_flow_runs programa...
# prefect-community
k
Hey guys, I am trying to create_flow_runs programatically using
Copy code
from prefect.client import Client
import pandas as pd

if __name__ == "__main__":
    dates = pd.bdate_range(pd.Timestamp("20200601"), pd.Timestamp("20200630"))

    c = Client()
    for d in reversed(dates):
        c.create_flow_run(
            "e04181f3-86b3-4f6f-87cf-ed7c24cb95ec",
            parameters=dict(date=d.strftime("%Y-%m-%d")),
            context=dict(name="John Doe"),
            run_name=f"test_backfill_{d.strftime('%Y%m%d')}")
But some of the flow runs are stuck with
Queued due to concurrency limits. The local process will attempt to run the flow for the next 10 minutes, after which time it will be made available to other agents.
I am on the dev version of the cloud backend and understand that there are limits. I have the following questions: 1. Am I hitting this error because of the dev cloud version? 2. Although, the message says the flow will be rescheduled within the next 10 minutes, it takes around 30 mins for the rest of the tasks to get scheduled and run. 3. Is there a better way to create a backfill? I am avoiding
FlowRunTask
because context is currently not passed to the triggered flow runs
đź‘€ 2
s
The
FlowRunTask
is internally using
Client()
. Unfortunately the
FlowRunTask
is broken in v0.12.4 when using server-mode (non-cloud). In general using the
FlowRunTask
is the preferred way to trigger (registered) flow runs. Please ignore this advice when you're using the cloud-mode flavor of Prefect.
a
As Sven said, can you confirm that you’re using the Cloud product? AFAIK (but maybe I missed it), there isn’t a dev version of that available. Specifically looking at the rescheduling message, what’s happening under the hood is that your flow runs are bound by a concurrency limit, let’s say X. When you get this message about concurrency limiting, what’s happening is that in the local process, the flow run will check with orchestration (cloud / server) every ~30 seconds to see if the number of running flows is < X. If so, the flow run will proceed, otherwise it’ll go back to waiting. When it says it’ll be made available to other agents after 10 minutes, it means that the flow run will attempt to run in a different process (helpful in case the original process dies for whatever reason), and this new process will also require doing the concurrency check with orchestration. The experience you’re getting with having ~30 minutes for the tasks to get scheduled and run is likely due to how long your processes are taking overall. The flow concurrency checks will see if there are any flows in the
Running
state, which means that tasks can run. If your tasks are taking a long time, or are retrying, the flow run will still be
Running
while that is happening, thus taking a concurrency slot.
k
@Alex Cano I can confirm that I am using the cloud product. I was making a distinction between the developer, team and enterprise versions as mentioned here https://www.prefect.io/get-prefect
a
@karteekaddanki Gotcha! Thought you had meant a development version, not the developer version! To answer your questions: 1. Yes, you are receiving that message because of the developer cloud account tiering works. 2. Since you’re on the developer tier, each flow run needs to complete running prior to the next starting, but there shouldn’t be anything causing a 30 minute delay between a flow run starting to run and the tasks getting generated and scheduled to run. This will likely require someone from the Prefect team to help debug 🙂 3. On best practices for doing backfills, I’d think doing it the way you’re doing it would work well (using the client & creating several flow runs). You could use
FlowRunTask
, but that’d mean making a new flow just to do the backfill, which is likely more overhead than most people would want.
k
@Alex Cano Thanks! I don't mind the overhead of
FlowRunTask
if there was a way to pass the context to the flows. All my backfills require date to be modified in context (and not as a parameter because of target caching) and I cannot change that bit right now.
a
Gotcha! Actually quick question @karteekaddanki, do you have heartbeating enabled for this flow?
k
@Alex Cano Not entirely sure. I haven't disabled it explicitly. I have a feeling I might be doing something wrong in how I am running my code. The code in my original message is in a python script and execute it from the command line passing flow_id and dates from the command line.
a
@karteekaddanki Are you able to share the main code itself? We may be able to help more if you can!
k
@Alex Cano I've edited my original post removing the
argparse
bits of the code for simplicity. Essentially, this is the script I am running. The Flow id corresponds to a flow with one trivial task and shouldn't take more than a second to run.
a
@karteekaddanki Gotcha! In this case, your script looks fine in terms of creating the flow runs. The next question is confirming your Agent does find the flow runs successfully (albeit slowly, correct?), and that whatever you’re using for execution is also executing the code correctly? My next thought on where things are getting held up is the execution layer, so maybe take a peek at those logs to see if you’re finding anything interesting
k
@Alex Cano Yup. Everything seems to be working correctly but after a delay of 30mins (almost to the dot). The tasks finish cleanly. I'll try to spend more time on a reproducible
foobar
example.
a
@karteekaddanki That sounds good! It also will likely be a good idea to loop in someone from the Prefect team as well… since this seems very odd, especially with the almost exactly 30 minute delay.
l
Hi! Just FYI I’m poking around a bit on this, this may be a bug on the cloud side and if so I will keep you updated 🙂
Unfortunately we couldn’t reproduce the 30-minute delay issue yesterday, if its still a problem if you don’t mind opening a Github issue with a reproducible example we can get more eyes on it! You may also be interested that a PR with updates to
FlowRunTask
is incoming (https://github.com/PrefectHQ/prefect/pull/3005) which should also address your original need for
context
being exposed with that task library task soon 🙂
❤️ 1
k
@Laura Lorenz (she/her) Thanks for the PR! It'll make my workflow a bit more simple. Regarding the concurrency bug, I think it maybe a race condition that is triggered by flows that take a very short time to finish. I haven't had any issues when I replaced my toy-flow examples with my real flows. I'll have another stab at it once I finish porting my flows to Prefect.
đź‘Ť 1
@Laura Lorenz (she/her) I quickly scanned the PR. It might also be useful to be able to pass in a flow run name https://github.com/PrefectHQ/prefect/pull/3005/files#diff-7836972d3e8bafd3361be68b33c62fb3R38 to the
FlowRunTask.run
function. It'd be cleaner to give backfills a human-readable name (with a time based suffix like
BUG-123_20200720
).