# ask-community
r
We are having some major roadblocks when using prefect and snowflake. We got the following error when running a flow with ~35000 embarassingly parallel tasks (one for each of ~35000 systems):
Copy code
Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID 3417baee-44a2-4b39-82f4-c6ac6d073d1e: provided a running state but associated flow run 51cc335d-f029-45c6-80b4-8c88a0173dbc is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 128, in call_runner_target_handlers
    cache_for=self.task.cache_for,
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 1321, in set_task_run_state
    version=version,
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 294, in graphql
    raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID 3417baee-44a2-4b39-82f4-c6ac6d073d1e: provided a running state but associated flow run 51cc335d-f029-45c6-80b4-8c88a0173dbc is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
Has anyone already experienced this? How to debug it? 😕
c
Hi Robin - this error isn’t related to Snowflake; if your Flow Run is not in a Running state, your task runs cannot enter a Running state either. Maybe check your logs - something caused your Flow Run to “finish” while you still had outstanding task runs that hadn’t started yet
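(A quick way to confirm this post-mortem, sketched under the assumption that the Prefect Client in use exposes get_flow_run_info; the flow run ID is the one from the error above:)
Copy code
# Sketch: inspect the flow run's final state from Python. Assumes an
# authenticated Prefect Cloud client and the 0.13-era Client API.
from prefect import Client

client = Client()
info = client.get_flow_run_info("51cc335d-f029-45c6-80b4-8c88a0173dbc")

# The state message often explains why later task-run updates were rejected
# with "associated flow run ... is not in a running state".
print(info.state, info.state.message)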
r
OK, we previously had an error that was caused by snowflake-sqlalchemy 1.2.3, which disappeared after upgrading to 1.2.4. So I wanted to hint at the possibility that snowflake dependencies might cause problems. Please find attached the logs of the last successful and first failing tasks. How can I dig deeper? last successful log: https://cloud.prefect.io/accure/flow-run/51cc335d-f029-45c6-80b4-8c88a0173dbc?logId=cd2f7b79-a3ca-4486-9f84-391f8b29d4d9 first failing log: https://cloud.prefect.io/accure/flow-run/51cc335d-f029-45c6-80b4-8c88a0173dbc?logId=cd2f7b79-a3ca-4486-9f84-391f8b29d4d9
Screenshot of last successful and first failing task:
c
FYI you provided the same two log IDs
one thing you could try is turning on DEBUG level logs to see if more information surfaces
👍 1
r
I can only turn them on when rerunning, or can I show them now post-mortem?
c
unfortunately not. Taking a deeper look, it appears your flow failed at "2020-10-08T16:52:40.096577+00:00" and your task attempted to set its final state about one second later at "2020-10-08T16:52:41.709352+00:00", which is really interesting
are you running with Dask?
r
yes:
Copy code
# Imports were not part of the original snippet; added here for completeness
# (0.13-era import paths).
from datetime import datetime
from os import path

from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker

flow.environment = DaskKubernetesEnvironment(
    min_workers=1, max_workers=10, labels=["k8s"]
)

# register flow on AWS ECR
module_dir = path.dirname(path.dirname(path.abspath(__file__)))

flow.storage = Docker(
    python_dependencies=[
        "numpy",
        "pandas",
        "snowflake-connector-python[pandas]==2.3.2",
        "snowflake-sqlalchemy>=1.2.4",
        "tqdm",
    ],
    registry_url="asdasd.dkr.ecr.eu-central-1.amazonaws.com",
    image_name="fetch_flow",
    image_tag="beta_"
    + datetime.now().strftime("%Y%m%d_%H%M%S"),  # Unique tag avoids AWS caching
    files={module_dir: "/modules/accure_analytics"},
    extra_dockerfile_commands=["RUN pip install -e /modules/accure_analytics"],
)
flow.register(project_name="eks_test_01")
# flow.visualize()
task concurrency = 10
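(If that limit is Prefect Cloud's tag-based task concurrency limiting, the relevant tasks would carry the tag the limit is attached to - a hypothetical sketch, with a made-up tag name:)
Copy code
# Hypothetical sketch: Cloud task concurrency limits are attached to tags;
# the limit of 10 itself is configured in Prefect Cloud, not in code.
from prefect import task

@task(tags=["snowflake"])
def fetch_system(system_id):
    ...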
c
I have a guess as to what’s happening:
- your flow runs, and some of your tasks fail (I’d be curious if you can identify the failed tasks and whether the error messages are meaningful to you)
- your flow enters a Failed state
- something happens in your dask cluster at shutdown, causing dask to rerun some of the task futures even though they’ve already finished, causing these noisy error logs but not actually doing any additional work
because to be clear: these error logs mean that Prefect correctly prevented these tasks from running
For example, the error log you posted from that task run came from a task run that actually succeeded already
so Prefect correctly prevented it from unnecessarily rerunning
r
OK, I will check further up the logs whether there had already been some failed tasks.
c
👍 👍 let me know if you learn anything!
r
NB: I actually checked the state of the flow on my smartphone and saw that it failed. Is it possible that, when interacting via smartphone, the UI misinterpreted some actions as cancelling the flow? I really like being able to check via smartphone, but I'm not sure whether it's already intended for that use, as the experience is not quite as smooth as on desktop...
c
Good to know - we definitely try to support mobile browsers! No worries though: the final state of your Flow Run appears to be the state that Prefect Core generated; it would have a different message if it had come from you interacting with the UI
r
OK, thanks for the quick responses! Just scrolled all the way up but did not find any previous errors, so the one mentioned above was indeed the first one. Any other information I can provide to help you dig deeper?
👍 1
I have some experience using dask clusters (on a Slurm cluster); is there any chance to get at the dask cluster logs?
d
Hey Robin
I don’t know about EKS but on GKE my Dask logs are available in the container logs for the Kubernetes Job
(I believe you can flip a switch to get them into cloud)
But if you’re not seeing them in Cloud I would try there first
and yes that’s correct 👍
r
Ah, that's interesting! OK, will 1. set the debug level and rerun the flow and 2. investigate the container logs etc.
Any chance that someone from prefect might already be available for feedback in ~10 hours? We are always looking forward to afternoon European time, when you are starting your day in the Americas 🙂
prefect.config.logging.level = "DEBUG"
Does this work?
Hmm, do I understand this part of the prefect docs correctly that logging is usually already set to DEBUG for Prefect Cloud?
c
Ah good find - that document is out of date! The logging level is configured at import time, so you’ll need to set an environment variable where your agent is running:
Copy code
PREFECT__LOGGING__LEVEL="DEBUG"
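(One way to get that variable into the containers that actually run your tasks is to bake it into the flow's Docker storage - a sketch assuming this Prefect version's Docker storage accepts an env_vars dict:)
Copy code
# Sketch: the flow-run job and dask workers use this image, so the variable
# is present at import time there. `env_vars` is assumed to be supported by
# the Docker storage of the Prefect version in use; `flow` is the flow
# object defined earlier in the registration script.
from prefect.environments.storage import Docker

flow.storage = Docker(
    registry_url="asdasd.dkr.ecr.eu-central-1.amazonaws.com",
    image_name="fetch_flow",
    env_vars={"PREFECT__LOGGING__LEVEL": "DEBUG"},
)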
r
OK, thanks! Any other ideas apart from checking debug level logs?
c
debug level logs + your cluster logs; I’d bet money this is going to be a dask worker eviction issue / noisy teardown of the cluster issue (and just to reiterate: it appears your flow ran correctly, it just produced some noisy and annoying logs)
🙏 1
r
OK, but I would have expected ~8 h execution time as opposed to 45 min đŸ€” Tomorrow will tell 🙂
c
verrry interesting - yea definitely let us know what you find; this is one of those situations where I’d love to figure out how we can both capture / detect and expose this event better in the UI! And of course there’s a chance I’m wrong in which case we can dive in deeper 🙂
r
OK, looking forward to connecting again tomorrow, hopefully with some interesting insights!
Yesterday, I restarted the flow once again without changing the log level and got a task that has been stuck in Retrying (for 8 h now 😼), blocking the flow from continuing or failing đŸ€”
Adding further information: on flow.register() we get the following warnings:
Copy code
UserWarning: No result handler was specified on your Flow. Cloud features such as input caching and resuming 
task runs from failure may not work properly.
  no_url=no_url,
and
Copy code
/opt/prefect/healthcheck.py:149: UserWarning: Task <Task: create_or_update_copy_progress_table> has retry settings but some upstream dependencies do not have result types. See <https://docs.prefect.io/core/concepts/results.html> for more details.
  result_check(flows)
/opt/prefect/healthcheck.py:149: UserWarning: Task <Task: copy_batteryconfiguration> has retry settings but some upstream dependencies do not have result types. See <https://docs.prefect.io/core/concepts/results.html> for more details.
  result_check(flows)
Could that be causing the trouble?
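(Those warnings are about result persistence rather than the crash itself; a minimal sketch of silencing them by attaching a Result to the flow, assuming an S3 bucket - the bucket name here is made up:)
Copy code
# Sketch: give the flow a Result so retries and restarts can re-hydrate
# upstream task outputs. The bucket name is hypothetical.
from prefect import Flow
from prefect.engine.results import S3Result

with Flow("fetch_flow", result=S3Result(bucket="my-prefect-results")) as flow:
    ...  # tasks as before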
r
I had the exact same issue when cancelling a Flow from the Cloud UI (looping or mapping flows in my case) while the flow was doing hard CPU crunching. The Agent seems to have attempted to set the state of the cancelled Flow, and this resulted in an INTERNAL_SERVER_ERROR.
r
OK, thanks for the info, that makes sense! The only difference is that I did not cancel the flow đŸ€” So the question is why the flow was cancelled. But I guess you and Chris have explained how to interpret the secondary errors related to the flow cancellation...
Hey @Chris White, it took some time to get DEBUG level logging running on our AWS EKS kubernetes cluster, but here we are with the log messages of the above described problem:
r
I confirm this was the same with my above-mentioned “Cancelled Tasks”: they were CPU-intensive calculations that took a long time before the Task’s thread was released.
Great finding with the Debug! 🙌
👋 1
r
@Chris White, @Kyle Moon-Wright any thoughts on the problem or information that we can provide additionally to help solve the issue?
c
So there are lots of moving pieces in your description above that aren’t necessarily related: the fundamental issue you were originally having is that some tasks were attempting to run after the flow had already finished. The few tasks we looked at had in fact completed earlier and were trying to run again for some reason, so while reading the logs was a poor user experience, your Flow was executing as you expect. The new screenshot you have posted is related to zombie tasks - it appears you had a task that died for some reason, and our service marked it as failed (because it stopped running). So while it does seem you might have some infrastructure issues that are killing tasks / processes / jobs, I don’t think there’s an issue on our side to solve at the moment
r
Yes, lots of moving pieces indeed. :marvin:
Hmmm, the task that fails in the above-mentioned screenshot had already succeeded and should therefore only be changing its state, and that state change fails. Doesn't this look like a prefect issue? đŸ€·â€â™‚ïž
In none of the above-mentioned examples did the flow finish successfully; instead it always aborted prematurely without finishing or even running all the tasks. I would like to better understand why the whole flow stops instead of just moving on to the next tasks. đŸ€”
I was happy that we were able to get debug level logs today using flow.run_config, however the parallelization with flow.run_config and flow.executor = DaskExecutor() did not work for us. Therefore, we finally switched back to flow.environment = DaskKubernetesEnvironment(...) and are now running the flow in small batches to see whether this reduces the failing tasks and flows, however without debug level logging. This is also a somewhat desperate attempt to at least start running the tasks for a larger number of systems successfully.
So we are also curious to explore the different root causes of the different issues and to contribute to possible solutions. Therefore, we will try to get the dask cluster logs tomorrow via AWS. Any other information that could help? 🙏 Current issues:
1. some tasks attempt to run after the flow has finished / aborted
2. flow runs switch to failed before running all tasks
3. tasks don't parallelize with flow.run_config and flow.executor = DaskExecutor()
4. debug level logging not available for DaskKubernetesEnvironment on AWS EKS
PS: Should I open issues on GitHub for the current issues to facilitate troubleshooting?
c
Definitely open an issue for the run config / dask executor piece and the DaskK8sEnvironment logging issue; for the other two issues could you provide me with a flow run ID?
r
Ok, will do all this tomorrow.
c
sounds good thanks for your patience!
r
2. flow runs switch to failed before running all tasks
‱ https://cloud.prefect.io/accure/flow-run/81060a1b-8e15-4c8d-9672-4da86a64f489?logId=7216403e-8741-4c82-90b7-7103d4908d12
‱ https://cloud.prefect.io/accure/flow-run/9bc734ad-805f-4904-aa95-88b1d81426a4?logId=daa523a9-1e39-4f0f-b412-5d5a225a47aa
‱ https://cloud.prefect.io/accure/flow-run/51cc335d-f029-45c6-80b4-8c88a0173dbc?logId=e013e144-045f-413f-aedb-64804a7e2338
Currently (in the last ~7 days) it is hard to identify those flows, as the schematic view does not work for most flows and the UI is a little unresponsive in general.
4. debug level logging not available for DaskKubernetesEnvironment on AWS EKS
see: https://github.com/PrefectHQ/prefect/issues/3509
3. tasks don't parallelize with flow.run_config and flow.executor = DaskExecutor()
https://github.com/PrefectHQ/prefect/issues/3510
d
Hi @Robin! I believe you need to configure Dask to set up more than one worker to get parallelism
You also need to ensure that you request enough CPU resources for that pod
Copy code
run_config = KubernetesRun(cpu_limit=2, cpu_request=2)

executor = LocalDaskExecutor(num_workers=4)
For example
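(Wired into the flow it would look roughly like this - a sketch using 0.13-era import paths, which may differ in newer Prefect releases:)
Copy code
# Sketch: `flow` is the flow object defined earlier. Request enough CPU for
# the single flow-run pod, then parallelize inside it with a local dask
# "cluster" of 4 workers.
from prefect.engine.executors import LocalDaskExecutor  # prefect.executors in newer releases
from prefect.run_configs import KubernetesRun

flow.run_config = KubernetesRun(cpu_limit=2, cpu_request=2)
flow.executor = LocalDaskExecutor(num_workers=4)
flow.register(project_name="eks_test_01")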
r
Thanks! I tried, but it seems not to have had any impact (see the issue on GitHub)
d
👍
We’ll follow up there
👍 1
c
@Robin some of the flow runs you’ve linked indicate that your dask cluster is crashing. Here’s the sequence of events:
- your flow run job is created, which is responsible for managing a dask cluster
- many of the tasks are submitted to the cluster
- some of the workers on the cluster crash, raising an error in the flow run job and resulting in a failed flow run state
- before the dask cluster is torn down, dask reschedules some of the lost tasks from the crashed worker onto other workers. These workers attempt to rerun the tasks but they cannot enter a Running state because the flow run is in a failed state.
🙌 1
r
Thanks for the analysis! That answers question 1 (Failed to set task state with error: ClientError). So it looks like this issue is the common issue across all those flows that abort prematurely, right?
Concerning question 2, flow runs switch to failed before running all tasks: on the prefect side it boils down to the following question: why is the flow run state set to failed? Some feedback from the dask cluster has to trigger this event, right? đŸ€” Does prefect provide a means to analyze why it sets the state to failed? If not yet, it would be great if prefect forwarded information about the related events that trigger the flow state change. Apart from this, I guess the dask cluster logs will tell us more about what triggers prefect to set the flow state to failed?
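(One way to surface that information today is a flow-level state handler that logs every state transition together with its message - a minimal sketch using the standard state_handlers hook:)
Copy code
import prefect
from prefect import Flow

def log_state_change(flow, old_state, new_state):
    # Log every transition; the message on a Failed state typically carries
    # the underlying cause (e.g. a dask KilledWorker error).
    logger = prefect.context.get("logger") or prefect.utilities.logging.get_logger()
    logger.info("Flow moved %s -> %s: %s", old_state, new_state, new_state.message)
    return new_state

with Flow("fetch_flow", state_handlers=[log_state_change]) as flow:
    ...  # tasks as before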
PS: Although a solution with AWS EKS is important and urgent, we are also open to exploring how coiled avoids or simplifies these issues. Do you already recommend ways to set up prefect agents with coiled?
c
Yea the flows are aborting prematurely; the first flow run you linked to had a clear “KilledWorker” error message in the state message. There are two situations that result in similar outcomes:
- the dask cluster object managed by your flow run errors out; this will definitely be logged by the flow runner and show up in the Failed state message (as it did with your run 81060a1b-8e15-4c8d-9672-4da86a64f489, there’s a clear KilledWorker error)
- the dask cluster still (partially) crashes, but in a way that the scheduler simply releases all work. This won’t necessarily result in an error in the flow runner process but would still result in incomplete task runs.
Either way, this is very much a dask resource issue. You can use coiled directly without new agents or anything: the coiled package has a Cluster object that you can feed directly into the DaskExecutor when configuring your Flow. As long as your coiled credentials are present (possibly in the docker image you run your flows with) then it will “just work” and create a cluster in coiled land and farm your tasks out to it
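(Roughly the pattern from the discussion linked below - a sketch only; the cluster_kwargs values and the coiled software environment name are placeholders, and the import path may differ by Prefect version:)
Copy code
# Sketch: let the DaskExecutor create a coiled-managed cluster at run time;
# `flow` is the flow object defined earlier. Requires coiled credentials in
# the flow's execution environment; the worker count and software
# environment name are placeholders.
import coiled
from prefect.engine.executors import DaskExecutor  # prefect.executors in newer releases

flow.executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={"n_workers": 10, "software": "my-org/fetch-flow-env"},
)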
r
OK, so while trying to solve the root-cause issues with "dask on EKS" via the GitHub issues with your team and maybe the dask team, I will also try to test it with coiled. It's a bit frustrating that we are working with several workarounds at the moment, but we are learning from it. 🙂 Is there a resource on which prefect runs more stably and flawlessly than a kubernetes cluster, and that is also ready to scale to tens of thousands of parallelized tasks? Although we would expect scaling flows with prefect to run seamlessly with different cloud providers and services, we are open to adapting to whatever is currently already stable!
Trying to use a coiled cluster as described here: https://github.com/PrefectHQ/prefect/discussions/3257
c
Stability is entirely based on the resource requirements of your workflow, so there’s no universal or out-of-the-box solution here; wherever your tasks run will need to have the appropriate CPU / memory / networking bandwidth etc. From the symptoms I’m seeing here, it seems your tasks are heavier than your k8s cluster and/or dask cluster are capable of handling
r
OK, thanks for all the feedback!
Highly appreciate it 🙏
c
for sure! I appreciate you raising all of these issues; regardless of whether there ends up being a Prefect bug lurking in here, we need to find better ways of making these sorts of infrastructure events more obvious in the UI so it’s easier to self debug.
💯 1