# prefect-community
d
Hey guys, has anyone come across this when executing multiple concurrent Fargate Tasks [triggering 10 instances of the same flow]?
An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family.
Appears to be Amazon complaining about the task definition family name, but I don't know if there's a way around it in the agent config.
z
Hi @Darragh, taking a look now! Will be back to you with an answer ASAP.
d
Hey @Zachary Hughes Thanks!
z
Okay, back with a few suggestions:
• If you aren't already, using `enable_task_revisions` might help alleviate some of this. It'll really depend on your use case.
• Since this is essentially a rate-limiting issue, you could try limiting the number of connections by altering the `botocore` config passed in (link below).
While this is an AWS issue, there's definitely room for us to handle this more gracefully. If you don't mind creating an issue, we can definitely look into how to improve the error handling here!
https://docs.prefect.io/orchestration/agents/fargate.html#configuration
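(For illustration, a minimal sketch of the connection-limiting idea: botocore's `Config` exposes `max_pool_connections` (default 10) and retry settings. Exactly how the Fargate agent consumes this config is covered in the docs linked above, so the wiring below is an assumption, not Prefect's API.)
```python
import boto3
from botocore.config import Config

# Illustrative only: cap concurrent connections to the ECS API and let botocore
# back off on throttling errors. How this Config is handed to the Fargate agent
# is described in the linked configuration docs.
boto_config = Config(
    max_pool_connections=5,                            # default is 10
    retries={"max_attempts": 10, "mode": "adaptive"},  # retry/back off on throttling
)
ecs_client = boto3.client("ecs", config=boto_config)
```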
d
Thanks Zachary - the task_revisions doesn’t help unfortunately, as each instance of the Flow is on the same revision [I’m triggering X instances of 1 flow]. In terms of the botocore config, what am I looking for, the max_pool_connections?
z
Okay, good to know! And yep. My thought is that if this is a rate limiting issue, reducing the max connections from the default of 10 may help alleviate this. Full disclosure that I'm not a heavy consumer of Fargate, but this seems like it might get some results.
b
I wrote the `enable_task_revisions` option and can try to help as best I can
curious why 10 instances of the flow are registering 10 times
in a perfect world you would have one version of the task definition created and then use it in RunTask
d
Hey @Braun Reyes, sorry for the delay - it's not that I'm trying to re-register the definition, it's that it's creating 1, or X, running instances of it - that's where I get the error. But from the log it does seem like `RegisterTaskDefinition` is what's happening…
Is there any other diagnostic info I can give on it that would help?
I’m going to turn off the revisions flag as a test
b
so you are running the Fargate Agent with `enable_task_revisions` on, correct?
what version of prefect are you on?
d
Yes, and master
b
oh ok
d
Direct from github
b
got it
are you using server or cloud for scheduling?
d
server, running on AWS
b
ok... so whether or not a new task def revision gets created is based on the True/False return of a validation method, which evaluates the Prefect flow ID and flow version of the run being executed
can you show me a screenshot of the tags for your most recent task definition revision?
are you seeing new revisions get added to the list in the AWS console regardless of whether you change the flow or not?
d
No I only see new revisions to the task if I register a new version of the flow
Tags: PrefectFlowVersion - 2 PrefectFlowId - b0ef09cd
b
can you send me the debug logs from the agent after you submit the 10 concurrent flow runs?
d
Sure, gimme a couple minutes to reset it and gather them..
b
what task revision number is that?
ok cool
d
task revision 4
b
ok good
obviously if there were a bunch of revisions every time you submitted the flow for execution... you would see more
d
yeah exactly
And that would be worrying 😄
Quick one - when setting the DEBUG log level for the agent, what am I passing it? I create the agent as a Python object rather than starting it from the command line
j
Probably best to set it via the Prefect user config.toml:
```toml
[cloud.agent]
level = "DEBUG"
```
or as an env var:
```bash
export PREFECT__CLOUD__AGENT__LEVEL="DEBUG"
```
d
So far the debug output shows no problems, which is really annoying 🙂 I’ll bump the number, see if I can make it fail
b
is it possible this is happening on the first attempt at running a new flow version?
that would make sense
d
Interestingly the one I’m doing right now is a new revision, and is the first time I haven’t seen the problem
b
since the flow run hits the agent which triggers the validation check
d
Ok, found an odd problem. There’s nothing showing it in the logs, BUT when I go to the UI to check the state of the sub flows I triggered, I can see that one has failed with the RegisterTaskDefinition exception…
b
how are you triggering the multiple flows like that?
what is the mechanism for triggering 10 flows at once?
d
map over a set of input partitions, and for each partition call FlowRunTask
b
oh ok so you are running the flows programmatically
d
We need to be able to do sub flows, and this is the closest workaround until it's available in Prefect
Yep
b
ah ok
the Prefect Agent does support concurrency via threadpool now, so on the first run of a new version using the method you described...I see how this could happen
mmmm
so you have a master flow that calls out to the Prefect API to kick off a map of another flow?
d
Exactly right [uses prefect.Client to kick off the map], then those sub flow IDs are passed to another mapped flow to poll for state
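(A rough sketch of this pattern, assuming Prefect 0.x's `FlowRunTask`; the flow/project names and the `partition` parameter below are placeholders, not values from this thread.)
```python
from prefect import Flow, task
from prefect.tasks.prefect import FlowRunTask

# Placeholder names: "child-flow" / "my-project" stand in for the real flow and project.
start_child = FlowRunTask(flow_name="child-flow", project_name="my-project", wait=False)

@task
def build_params(partitions):
    # one parameters dict per child flow run
    return [{"partition": p} for p in partitions]

with Flow("parent-flow") as parent:
    params = build_params(["p1", "p2", "p3"])
    # kick off one child flow run per partition; each mapped call returns a flow run id
    run_ids = start_child.map(parameters=params)
```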
b
interesting
d
We're adding a bunch of handling around retrying a failed flow, but still need to be able to kick the thing off 🙂
b
@josh is it possible to limit the threadpool on the agent to submit runs to Fargate synchronously?
d
Zachary had mentioned way back up about altering the max_pool_connections param - if I understand what that param is for, it would allow me to do what I'm doing right now, kick off X number of flows in a map, but the agent would only submit them in batches of N to be executed, is that right?
Even if that's true though, the docs say that `max_pool_connections` defaults to 10, so I thought the current test case would be fine…
👍 1
b
Seems like the best 2 options are:
• register the task definition immediately after flow registration (rough sketch below) - would require you to do this manually, or we/I could add a register component for Fargate/ECS to the flow object
• set up the agent to not send flow runs out concurrently
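(A rough sketch of option 1 in plain boto3: registering the task definition yourself, e.g. from CI/CD, so the agent never races to create a revision. The family, image, role ARN and sizes are placeholders.)
```python
import boto3

ecs = boto3.client("ecs")

# Register the revision once, up front; placeholder values throughout.
response = ecs.register_task_definition(
    family="my-prefect-flow",
    requiresCompatibilities=["FARGATE"],
    networkMode="awsvpc",
    cpu="256",
    memory="512",
    executionRoleArn="arn:aws:iam::123456789012:role/ecsTaskExecutionRole",
    containerDefinitions=[
        {"name": "flow", "image": "prefecthq/prefect:latest", "essential": True},
    ],
)
task_definition_arn = response["taskDefinition"]["taskDefinitionArn"]
```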
d
Option 2, is that something under my control? max_connections = 1?
upvote 1
b
yeah... my theory is that this is happening on the initial run of a new flow version... so with the agent evaluating whether a new task revision is needed or not, sending many at once could definitely result in duplicate RegisterTaskDefinition calls, when you really only need one
yeah I would try that
option 2
upvote 1
d
What’s the format for getting that through the agent?
Not 100% sure it will work for our full case, but let’s test
j
@Braun Reyes I actually don’t think there currently is an option to limit the thread pool! It grabs max workers by:
```python
max_workers = min(32, (os.cpu_count() or 1) + 4)
```
There’s an open issue here to change this behavior entirely https://github.com/PrefectHQ/prefect/issues/2468 if you’re interested in voicing your thoughts
b
ugh
yeah I like Jim's idea on that
d
I ran the 10 subflow test with existing task definitions for both the parent flow and the subflow, and they all succeeded
So it can be done, I’m not completely mad 🙂
b
no... it must be the first run of a new version
that's the only thing that makes sense
d
It seems like it. Since my use case isn’t going away, and the new register problem seems to be confirmed, what would you suggest as a workaround?
The only obvious thing that comes to mind is to start the flow and, as part of the same task, wait for say 2 minutes (because Fargate is slow) and then check status?
All other suggestions welcome
@Zachary Hughes @Braun Reyes if I'm trying my daft suggestion above, what states should I be checking for to see if the flow run has been accepted/started?
b
so technically you do not have to wait until the initial run is complete once the agent submits the flow...the revision is registered
Submitted is the state you want - it tells you the agent has completed its register True/False validation
z
@Darragh Definitely not daft! Anything `Submitted` or beyond is an indication the work's been accepted.
b
@josh perhaps we should have an option to register the task definition along with prefect flow registration....seems like option to couple registering of things could be a good idea
j
Worth opening an issue for!
d
Great, thanks for that guys... Raises another interesting question actually: when you say Submitted or beyond, is it like an enum I can check? Submitted = 2, so state > 2?
z
I don't think we expose this as an enum at the moment. But really, as long as you reach Submitted you'll be good to go. So a `state == Submitted` check would do the trick.
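(A sketch of that check against Prefect 0.x's GraphQL client; `wait_until_submitted` and the timeout values are made up for illustration.)
```python
import time

from prefect import Client
from prefect.engine.state import Submitted

def wait_until_submitted(flow_run_id: str, timeout: int = 300, interval: int = 10) -> bool:
    """Poll a flow run until the agent has picked it up (Submitted or beyond)."""
    client = Client()
    deadline = time.time() + timeout
    while time.time() < deadline:
        state = client.get_flow_run_info(flow_run_id).state
        # Submitted, Running, or any finished state means the work has been accepted
        if isinstance(state, Submitted) or state.is_running() or state.is_finished():
            return True
        time.sleep(interval)
    return False
```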
d
Great, thanks guys, I'll trial that and let you know how it goes
I do have a follow-on if you don't mind... We recently had our EC2 instance fall over (it was a small one, to be fair) due to being flooded with logs. I've seen some of the docs around deployment logging, but it's not clear to me whether I can force all logs to CloudWatch or similar to save disk space, or whether there's a rollover/archive process for the logs?
z
When you mention falling over due to logging, do you mean agent logging or that the log table in the persistence layer was too large?
d
It was too large and completely filled the EC2 instance it was running on
Sorry - filled the persistence layer on the EC2 instance, to be more accurate
z
Gotcha. Depending on your setup, you might want to look into defining a handler to manage your logs (link below). You could also set up a service that loops every so often and clears/archives your logs. https://docs.prefect.io/core/advanced_tutorials/custom-logs.html#an-example
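(Following the custom-logs docs linked above, a sketch of one possible handler - here a size-capped `RotatingFileHandler` rather than CloudWatch; the path and limits are placeholders, and this addresses local disk usage rather than the server's log table.)
```python
import logging
from logging.handlers import RotatingFileHandler

from prefect.utilities.logging import get_logger

# Attach a rotating handler to Prefect's logger so local log files can't grow unbounded.
logger = get_logger()
handler = RotatingFileHandler(
    "/var/log/prefect/agent.log",   # placeholder path
    maxBytes=50 * 1024 * 1024,      # rotate at ~50 MB
    backupCount=5,                  # keep at most 5 rotated files
)
handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
logger.addHandler(handler)
```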
d
Thanks for that - I read that earlier. I was hoping for something pluggable, similar to the way the Fargate agent and task can plug into CloudWatch. Might that be in a future release? Thanks for all the help guys!
Hey @Braun Reyes Circling back to you on the whole separation of register task and run task - I know you had mentioned making a change for it, just wondering if you’d have a rough timescale? We’re still getting issues where, on first register/run of a flow to run on Fargate, sometimes that Flow just sits at Submitted and never progresses, runs, stops, etc..
b
to be candid, I have not started working on it yet... it may be at least another 2 weeks for me to get to it.
d
Ok, thanks for being honest - 2 weeks isn't the end of the world! 🙂
l
Hey @Darragh @Braun Reyes, sorry for replying in this "old" thread. I just ran into the same issue with the `Too many concurrent attempts to create a new revision of the specified family.` error. Basically the same context as Darragh: I'm mapping over a list and creating multiple FlowRunTasks. The first time I run a new version of the Fargate task I get this error. Did you guys find any solution for this issue? 🙂
b
I had intentions of working on a potential change that would enable this... but I was not able to make time for it with pandemic and work stuff.
I contribute to prefect when I can but have not been able to lately
not sure @Darragh was able to work around this at all
d
Sorry for the delay guys - yeah, we never really found a way to work around it cleanly. We do some stuff around staged API requests to slow it down, but nothing that's a proper solution
b
hey there... we may have some time to work on something this week. Would it be helpful to allow passing in an existing task definition group, which would trigger an override so the agent does not try to register a new task definition? This would also give people the ability to define a task definition as part of a CI/CD process instead of via Prefect agent flow submission
@Mark McDonald ☝️
r
Hi, I’m facing the same issue. Did anybody come up with a workaround until this is resolved?
If we set `enable_task_revisions` to False, would that actually solve the problem? (but create a "messy" task definitions table in AWS 😂) Another workaround I'm thinking of: when registering the flow, run a dummy instance of it that will force a new task revision to be registered on the spot.
Also I'd like to point out that changing the agent's behaviour to run synchronously would not solve the problem, since it is possible to run multiple agents that might try to register the same task definition. The long-term solution IMO is a simple retry mechanism in the agent, with an exponential backoff - i.e. retry after 5, 10, 20… seconds, etc.
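(An illustration of the retry-with-backoff idea as it might look in plain boto3 - this is not the agent's actual code, and the helper name and delays are made up.)
```python
import random
import time

import boto3
from botocore.exceptions import ClientError

def register_with_backoff(ecs_client, max_attempts=5, **task_def_kwargs):
    """Retry RegisterTaskDefinition when ECS rejects concurrent revision attempts."""
    for attempt in range(max_attempts):
        try:
            return ecs_client.register_task_definition(**task_def_kwargs)
        except ClientError as exc:
            message = exc.response["Error"].get("Message", "")
            if "Too many concurrent attempts" not in message or attempt == max_attempts - 1:
                raise
            # exponential backoff with jitter: ~5s, 10s, 20s, 40s...
            time.sleep(5 * (2 ** attempt) + random.uniform(0, 1))

ecs = boto3.client("ecs")
# register_with_backoff(ecs, family="my-prefect-flow", containerDefinitions=[...])
```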
b
it's been a long time since I posted on these threads, where I was going to make some changes to make it possible to define a task definition outside of flow registration so that the agent does not always attempt to register one. There was also a change we were going to make to allow for sidecars as well. Good news is that the new ECSRun and ECSAgent features handle this for you. Here is a snippet from our deployment process.
```python
ECSRun(
    task_definition_arn=task_definition_arn,
    run_task_kwargs=dex_run_task_kwargs,
)
```
the task definition can have the sidecar defined in it