Ruslan

02/02/2022, 6:34 AM
Hi! Is it possible to send a notification when a flow goes into the "Cancelling" state? The problem is that in real production projects (not small toy flows), flows don't end up "Cancelled" - they go into "Cancelling" and stay there indefinitely. I have been raising this problem for more than half a year and nothing has changed; the cancellation functionality doesn't work at all. And on top of that, I can't even monitor these cases, because there is no way to alert on a flow run entering the "Cancelling" state.

Anna Geller

02/02/2022, 11:38 AM
Hi @Ruslan, this is possible. Here is a Gist with a state handler that can send you e.g. a Slack notification upon flow run cancellation: https://gist.github.com/anna-geller/69a38677b8ab56c2da2d80eeea9ea1c4 But if you mean gracefully shutting down the compute (e.g. the flow run pod on Kubernetes), this is indeed still an open issue.
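For reference, a minimal sketch along the lines of that Gist (not the exact code) - it assumes you have a Slack incoming-webhook URL and can be adapted to other channels:
import requests
import prefect
from prefect import Flow
from prefect.engine.state import Cancelled

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/..."  # placeholder - your own webhook URL

def notify_on_cancellation(flow, old_state, new_state):
    # fires only once the flow run actually reaches the Cancelled state
    if isinstance(new_state, Cancelled):
        flow_run_id = prefect.context.get("flow_run_id", "unknown")
        requests.post(
            SLACK_WEBHOOK_URL,
            json={"text": f"Flow run {flow_run_id} was cancelled."},
        )
    return new_state

with Flow("example", state_handlers=[notify_on_cancellation]) as flow:
    ...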

Ruslan

02/02/2022, 11:42 AM
Hi Anna, I am asking about the "Cancelling" status, not "Cancelled" - those are two different states. Prefect leaves flows in the "Cancelling" status indefinitely; it never manages to completely cancel the flow.

Anna Geller

02/02/2022, 1:00 PM
This is true and I totally understand what you mean. Whether you can use this logic to react to a flow run cancellation depends on what you want to do as a cleanup step:
• If you have e.g. an EMR cluster on AWS and you want to make sure it gets shut down at the end, reacting to a Cancelled state works, because the cluster lives separately from the flow run's execution resources.
• If you want to get notified of a flow run cancellation, reacting to the Cancelled state also solves the problem - you will get a message informing you that the flow run has been cancelled.
• But if you want to stop the pod in which the flow run is running, you can't do that from within that pod - i.e. the pod cannot kill itself.
So in short: reacting to a Cancelled state lets you send notifications on a flow run cancellation or shut down remote compute that lives separately from the agent, but it doesn't let you clean up the agent's / flow run's own compute resources.
E.g. this thread shows how a user leverages a similar state handler reacting to a Cancelled state to shut down a Databricks cluster, and it works perfectly. So it depends on what type of action you want to take and what problem needs to be solved on flow run cancellation.
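To illustrate the cleanup variant: here is a rough sketch of a handler that terminates an EMR cluster on cancellation. It assumes boto3 credentials are available on the agent machine and that you track the cluster ID yourself - the ID below is just a placeholder:
import boto3
from prefect.engine.state import Cancelled

def shut_down_emr_on_cancellation(flow, old_state, new_state):
    # the cluster lives outside the flow run's own compute, so the handler can safely terminate it
    if isinstance(new_state, Cancelled):
        emr = boto3.client("emr")
        emr.terminate_job_flows(JobFlowIds=["j-XXXXXXXXXXXXX"])  # placeholder cluster ID
    return new_state
You would attach it the same way as the notification handler, via state_handlers=[shut_down_emr_on_cancellation] on your Flow.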

Ruslan

02/03/2022, 11:10 AM
• If you want to get notified of a flow run cancellation, reacting to the Cancelled state also solves the problem - you will get a message informing you that the flow run has been cancelled.
I guess you didn't understand me. One more time: my flow freezes (I don't know why - probably because of some Prefect bug). An Automation cancels this flow after some time, but the flow goes into the "Cancelling" status and NEVER fully cancels - Prefect tries to cancel it indefinitely, with no luck. Your example catches prefect.engine.state.Cancelled; yes, that works in your case, because your simple flow actually gets cancelled, it doesn't freeze. That handler fires only when the flow is completely cancelled, the same way the notifications from the UI work. That is not my case: my flow never reaches the "Cancelled" status, so the handler is never triggered.

Anna Geller

02/03/2022, 11:23 AM
Gotcha, thanks for explaining the issue a bit more. If you'd like, we could try to investigate the root cause to find out why the flow freezes:
1. Does it run on Kubernetes?
2. What executor does this flow use? Do you run your tasks on Dask?
3. How long does the flow run when it doesn't freeze? Is it a larger, long-running job?
4. What is the memory allocation for this run - do you have some mechanism to check how much memory the flow run consumes?
5. What do your flow run and agent logs say about this run - can you see any indication of a lost flow heartbeat?
Sorry for so many questions, but the issue seems complex, so we would need to investigate it step by step to find out why you see this behavior.
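In the meantime, as a stopgap for the monitoring gap you described, you could run a small scheduled script that queries the GraphQL API for flow runs sitting in a Cancelling state and alert on whatever it returns. A rough, untested sketch - the field names assume the Hasura-style schema exposed by Cloud, so please double-check them in the Interactive API tab:
from prefect.client import Client

query = """
query {
  flow_run(where: {state: {_eq: "Cancelling"}}) {
    id
    name
    state
    start_time
  }
}
"""

client = Client()
stuck_runs = client.graphql(query).data.flow_run
for run in stuck_runs:
    # plug in your own alerting here (Slack webhook, email, etc.)
    print(f"Flow run {run.name} ({run.id}) has been Cancelling since {run.start_time}")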

Ruslan

02/03/2022, 2:58 PM
Yes Anna, the best solution would be to fix this bug with the stuck flows. I have tried to get help many times, but with no luck; the problem is that I can't reproduce it. My feeling is that the agent loses its connection with the UI, but in fact the agent keeps working perfectly.
1. It is a simple local agent on an Azure VM.
2. LocalDaskExecutor with processes. I need it to limit the number of tasks running in parallel (simplified example below).
3. Sometimes it gets stuck at the beginning, sometimes in the middle. The process itself keeps working, but the UI doesn't receive any information about it. The flow runs a large module and I can see the logs from that module - it works, but Prefect loses track of the task.
4. I have a feeling the problem is related to memory, but I have no proof; there is plenty of memory on the VM.
5. The logs are not indicative - there is no important information there, I don't see anything.
No problem, I can provide any information you need.
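A simplified illustration of how the executor is configured (the real flow is much bigger and uses external modules):
from prefect import Flow
from prefect.executors import LocalDaskExecutor

with Flow("my-flow") as flow:
    ...

# cap parallelism at a fixed number of worker processes
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=2)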

Anna Geller

02/03/2022, 3:47 PM
1. How do you sync the flow code with the local agent on Azure? What storage do you use? Can you share your storage and run configuration?
2. Are you on Prefect Cloud?
3. Any chance you could share your flow with me? It could be e.g. that there is some unclosed database connection in your flow that causes it to get stuck.
4. What labels did you assign to your flow and agent?
5. How did you set up the agent? Do you know that we have agents on the Azure marketplace?
6. How did you set the concurrency limit? Is it on a flow or task level?
7. What Prefect version do you use?
8. Did you try adding the environment variable that changes the flow's heartbeat mode from processes to threads? The code:
from prefect.run_configs import UniversalRun
flow.run_config = UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})
Since you say you have been struggling with this issue for a while, I can try to reproduce it in the next few days (more likely next week) if you give me as much information as you can 🙂

Ruslan

02/03/2022, 5:03 PM
1. We use Git, but in fact the files are picked up locally (Git is another topic - there are a lot of restrictions around importing external modules from Git storage).
2. Yes, Cloud.
3. Yes, I can share it, but it will be very hard to run - maybe impossible. It uses a lot of external services. I can share my screen or the source code, but I don't think you will be able to launch it.
4. Labels - just simple labels, assigned, yes. What could be the issue there?
5. It is a long story. We were living on Docker and got very tired of the problems there, so I moved to a clean VM and a simple local agent, because I thought the problem was in Docker. But no, the problem is in Prefect…
6. Concurrency is configured on the flow level only.
7. 0.15.10
8. LocalDaskExecutor(scheduler="processes", num_workers=2) - is that the same thing? I started with threads, then support told me processes are more stable, so I moved to them. Both have the same problem, processes maybe a little less often. Or do you suggest trying something else? That would be great, but even I can't reproduce it (( We have about 500 flow runs per night and consistently 1-3 of them get stuck, different ones every day.

Anna Geller

02/03/2022, 5:19 PM
Thanks a lot for sharing more about your use case. It's indeed a bit strange and hard to replicate.
I was asking about #4 because labels are important for setting flow-level concurrency limits.
Regarding #5, I understand, and it actually makes a lot of sense, since a flow run should be executed the same way regardless of which agent picks it up and whether it runs in a local process or a container.
Regarding #6: I wonder if it could be that some flow runs hang because they crossed the concurrency limits. You mentioned you use Automations to cancel a flow run after a specific time - is that because of a time-based SLA (e.g. if a flow run takes longer than 5 minutes, then cancel it)? I would be curious to see a screenshot of both this automation and the concurrency limits for this flow.
Regarding #8, I would suggest adding this environment variable
PREFECT__CLOUD__HEARTBEAT_MODE
to the run configuration. Since you are using a local agent, you can use either LocalRun or UniversalRun and attach it to your flow:
from prefect import Flow
from prefect.run_configs import UniversalRun

with Flow("name", run_config=UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})) as flow:
    ...
This helps a lot with issues like the ones you are seeing, because the flow's heartbeat then runs in a thread rather than a separate process, which is more stable when you use many external services or long-running jobs. Again, I totally understand the frustration, especially when you don't know why it's happening. I wonder whether you could perhaps try running a similar flow with Orion and see how it goes? Some features such as concurrency limits are not there yet, but maybe Orion can handle such flows better - it would be great to test.
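For comparison, a minimal Orion flow looks roughly like this (Orion is still in beta, so treat this as a sketch - the API may still change):
from prefect import flow, task

@task
def say_hello(name):
    print(f"Hello, {name}!")

@flow
def hello_flow():
    say_hello("world")

if __name__ == "__main__":
    hello_flow()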

Ruslan

02/03/2022, 6:38 PM
#4 I don't see any pattern related to concurrency. The task is not waiting on a concurrency limit - it is running, then it gets cancelled by the automation, and that's all: stuck. It happens both with flows that have concurrency limits and with flows without any, but yes, all of them have labels, just for standardisation.
#6 Yes, we know the flow should run no longer than 5 minutes, but it gets stuck, which is why we configured the automation for 10 minutes. Without the automation it gets stuck in the "Running" status.
OK, I will try PREFECT__CLOUD__HEARTBEAT_MODE this week. I'm not familiar with Orion, I will read about it, thanks.

Anna Geller

02/03/2022, 8:30 PM
I see. Yeah, it would be great if you could try adding PREFECT__CLOUD__HEARTBEAT_MODE and keep us posted 👍