# ask-community
t
I've got a case where I start a local flow with num_workers in a LocalDaskExecutor set to 6, but the task seems to start many more tasks than that. For example, that flow is currently showing 15 tasks running simultaneously. Is there something else needed to limit the number of simultaneous tasks? It overloads the machine as is.
k
That is quite confusing. Could you show me how you define your executor?
You can also add debug level logs to your Flow to see what executor gets attached.
t
flow.run_config = LocalRun(
    env={
        'PREFECT__LOGGING__FORMAT': '[%(asctime)s-%(levelname)s - %(name)s]-[%(filename)s:%(funcName)s]-Line %(lineno)d: %(message)s',
        'PREFECT__LOGGING__LEVEL': 'DEBUG',
    },
    working_dir=file_path,
    labels=["high-mem"],
)

flow.executor = LocalDaskExecutor(scheduler='threads', num_workers=6)
flow.register(project_name=PROJECT_NAME, idempotency_key=PROJECT_NAME)
I've also begun having a weird issue where, when I register a new version of a flow, the version doesn't increment in the UI. I've taken to deleting and recreating flows to work around it. I even upgraded to the Prefect version released last week and deleted and recreated all my Docker images on the (local) server, but the problem persists. Mentioning it in case it's related.
Also, the log level was already set to debug:
14:02:44 Using executor type LocalDaskExecutor
Ach, I just checked, and it looks like one of my check processes started a second run of the same flow without my realizing it. I'm cancelling that one, but I wonder whether the second run caused issues with the first when it started? Maybe this is simply because that occurred. I'm trying to have a check process that periodically re-runs flows if the destination data isn't current; I can't seem to find a reliable way to do that without occasionally having the same flow running twice.
k
Ah ok I was hoping debug logs would show the number of workers. This looks all fine though.
I think if you use two LocalDaskExecutors on the same machine and they end up competing for resources, maybe you could see some bottlenecks?
t
Hmm, that still shouldn't have led to a process having MORE live tasks than the worker limit though, right?
k
Is the second one a new flow run? Or also running the same Flow?
t
New flow run of the same flow
I've set the idempotency key to be the same for all flow runs though to try and avoid that, so if the second started it may have registered as the same flow...
k
If I had to guess, it sounds like there are two copies of the same Flow running, but I’m not sure. That is the only way I can see the number of Running tasks exceeding num_workers.
How are you doing that check process? Maybe we can do it in a more stable way?
Your registration issue also seems idempotency-key related. Are you providing one when you register?
t
Ach, damn, posting here I just realized I'd had a copy/paste error in my idempotency key; that might have been causing a lot of these issues. Corrected and retrying now; version incrementing seems to be working again!
The check process queries the destination DB for each table and, wherever the data isn't from today, re-triggers the flow that's supposed to update it.
The difficulty is handling the case where that flow is still running from the last check.
I want it to trigger if the flow isn't still running, but wait if it is. Will the correct idempotency key give me that behavior?
k
I don’t think running with the idempotency key will wait for it (hence the double trigger). Maybe you can use the wait_for_flow_run task? If you’re doing this outside of tasks, then you can just do wait_for_flow_run.run(flow_run_id=…). You can also see the code for that here. You can make the FlowRunView, get_latest(), and then check the state with flow_run.state. What do you think of this?
Actually, how are you starting the Flow run? With the create_flow_run task?
Actually, you should be right. I think the idempotency key should work.
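(A minimal sketch of the FlowRunView approach described above, assuming Prefect 1.x; the flow run id, flow name, and project name are placeholders:)

from prefect.backend import FlowRunView
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

# Look up an existing run and refresh its state from the backend
flow_run = FlowRunView.from_flow_run_id("<flow-run-id>")  # placeholder id
flow_run = flow_run.get_latest()

if flow_run.state.is_running():
    # Block until the in-flight run reaches a finished state
    wait_for_flow_run.run(flow_run_id=flow_run.flow_run_id)
else:
    # Nothing running, so trigger a fresh run
    create_flow_run.run(flow_name="my-flow", project_name="my-project")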
t
In the check process, yes, create_flow_run. In that case I have this:
create_flow_run(flow_name=flow_name,
                project_name=PROJECT_NAME,
                task_args={"name": f'Reinitiated {flow_name} attempt.'},
                idempotency_key=flow_name)
In that case I figure, as long as the flow name is the idempotency key in every case, it ought to give me this behavior, right? And I've only not been seeing that behavior because my copy/paste error set the keys wrong.
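(For illustration, a hedged sketch of that check process; data_is_current(), TABLE_TO_FLOW, and the project name are hypothetical stand-ins, assuming Prefect 1.x's create_flow_run task and its idempotency_key argument:)

from prefect.tasks.prefect import create_flow_run

# Hypothetical mapping of destination tables to the flows that refresh them
TABLE_TO_FLOW = {"orders": "update-orders", "customers": "update-customers"}
PROJECT_NAME = "my-project"  # placeholder

def data_is_current(table: str) -> bool:
    # Hypothetical stand-in for the destination-DB freshness query
    ...

for table, flow_name in TABLE_TO_FLOW.items():
    if not data_is_current(table):
        # The idempotency key is the flow name, so triggering again while a run
        # with that key already exists should not start a duplicate.
        create_flow_run.run(
            flow_name=flow_name,
            project_name=PROJECT_NAME,
            idempotency_key=flow_name,
        )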
k
Yes I think so
t
Awesome, I'll give that a try then. Thanks for the consult; pasting the code in here is what made me see my error. I appreciate your patience in helping me find my own bonehead mistakes!
And also for sharing your knowledge; very good to know I'm at least on the right track with the behavior above.
Much appreciated.
k
Anytime!
t
@Kevin Kho, the issue with more tasks running at once than the worker limit continued even after I'd corrected everything else, but then it struck me that this is probably a byproduct of using threads instead of processes. I switched to processes and the limit does indeed seem to be holding now. Sharing just for your info.
Since these are often I/O-limited tasks, I think a bunch of those threads were being spawned and then just waiting for a return, so the executor would start the next thread while waiting.
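(The switch described above, as a sketch against the earlier config; flow is the same Flow object and num_workers the same value used earlier:)

from prefect.executors import LocalDaskExecutor

# Process-based workers: at most num_workers tasks hold a worker slot at once,
# whereas threads blocked on I/O could let additional tasks get scheduled.
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=6)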
k
Ahh it did cross my mind, but I didn’t think it would have an effect. Thanks for the info!