
Robin

10/08/2020, 9:27 PM
We are having some major roadblocks when using Prefect and Snowflake. We got the following error when running a flow with ~35000 embarrassingly parallel tasks (one for each of ~35000 systems):
Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID 3417baee-44a2-4b39-82f4-c6ac6d073d1e: provided a running state but associated flow run 51cc335d-f029-45c6-80b4-8c88a0173dbc is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 128, in call_runner_target_handlers
    cache_for=self.task.cache_for,
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 1321, in set_task_run_state
    version=version,
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 294, in graphql
    raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID 3417baee-44a2-4b39-82f4-c6ac6d073d1e: provided a running state but associated flow run 51cc335d-f029-45c6-80b4-8c88a0173dbc is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
Has anyone already experienced this? How to debug it? 😕

Chris White

10/08/2020, 9:30 PM
Hi Robin - this error isn’t related to Snowflake; if your Flow Run is not in a Running state, your task runs cannot enter a Running state either. Maybe check your logs - something caused your Flow Run to “finish” while you still had outstanding task runs that hadn’t started yet

Robin

10/08/2020, 9:34 PM
OK, we previously had an error that was caused by snowflake-sqlalchemy 1.2.3, which disappeared when upgrading to 1.2.4. So I wanted to hint at the possibility that Snowflake dependencies might cause problems. Please find attached the logs of the last successful and first failing tasks. How can I dig deeper? last successful log: https://cloud.prefect.io/accure/flow-run/51cc335d-f029-45c6-80b4-8c88a0173dbc?logId=cd2f7b79-a3ca-4486-9f84-391f8b29d4d9 first failing log: https://cloud.prefect.io/accure/flow-run/51cc335d-f029-45c6-80b4-8c88a0173dbc?logId=cd2f7b79-a3ca-4486-9f84-391f8b29d4d9
Screenshot of last successful and first failing task:

Chris White

10/08/2020, 9:36 PM
FYI you provided the same two log IDs
one thing you could try is turning on DEBUG level logs to see if more information surfaces
👍 1
Robin

Can I only turn them on when rerunning, or can I show them now post-mortem?

Chris White

10/08/2020, 9:42 PM
unfortunately not. Taking a deeper look, it appears your flow failed at "2020-10-08T16:52:40.096577+00:00" and your task attempted to set its final state one second later at "2020-10-08T16:52:41.709352+00:00", which is really interesting
are you running with Dask?

Robin

10/08/2020, 9:43 PM
yes:
from datetime import datetime
from os import path

# imports added for completeness; paths as of Prefect 0.13-era releases
from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker

flow.environment = DaskKubernetesEnvironment(
    min_workers=1, max_workers=10, labels=["k8s"]
)

# register flow on AWS ECR
module_dir = path.dirname(path.dirname(path.abspath(__file__)))

flow.storage = Docker(
    python_dependencies=[
        "numpy",
        "pandas",
        "snowflake-connector-python[pandas]==2.3.2",
        "snowflake-sqlalchemy>=1.2.4",
        "tqdm",
    ],
    registry_url="asdasd.dkr.ecr.eu-central-1.amazonaws.com",
    image_name="fetch_flow",
    image_tag="beta_"
    + datetime.now().strftime("%Y%m%d_%H%M%S"),  # Unique tag avoids AWS caching
    files={module_dir: "/modules/accure_analytics"},
    extra_dockerfile_commands=["RUN pip install -e /modules/accure_analytics"],
)
flow.register(project_name="eks_test_01")
# flow.visualize()
task concurrency = 10

Chris White

10/08/2020, 9:46 PM
I have a guess as to what’s happening:
- your flow runs, and some of your tasks fail (I’d be curious if you can identify the failed tasks and whether the error messages are meaningful to you)
- your flow enters a Failed state
- something happens in your dask cluster at shutdown, causing dask to rerun some of the task futures even though they’ve already finished, causing these noisy error logs but not actually doing any additional work
because to be clear: these error logs mean that Prefect correctly prevented these tasks from running
For example, the error log you posted from that task run came from a task run that actually succeeded already
so Prefect correctly prevented it from unnecessarily rerunning

Robin

10/08/2020, 9:49 PM
OK, I will check further up the logs whether there had already been some failed tasks.

Chris White

10/08/2020, 9:51 PM
👍 👍 let me know if you learn anything!

Robin

10/08/2020, 9:53 PM
NB: I actually checked with my smartphone the state of the flow and saw that it failed. Is it possible that somehow when interacting with the smartphone the UI misinterpreted some actions to cancel the flow? I really like the possibility to check via smartphone, but not sure whether it's already intended for that use, as the experience is not quite as smooth as on desktop...

Chris White

10/08/2020, 9:55 PM
Good to know - we definitely try to support mobile browsers! No worries though: the final state of your Flow Run appears to be the state that Prefect Core generated; it would have a different message if it had come from you interacting with the UI

Robin

10/08/2020, 9:56 PM
OK, thanks for the quick responses! Just scrolled all the way up but did not find any previous errors, so the one mentioned above was indeed the first one. Any other information I can provide to you to dig deeper?
👍 1
I have some experience using dask cluster (on slurm cluster), is there any chance to get to the dask cluster logs?

Dylan

10/08/2020, 10:01 PM
Hey Robin
I don’t know about EKS but on GKE my Dask logs are available in the container logs for the Kubernetes Job
(I believe you can flip a switch to get them into cloud)
But if you’re not seeing them in Cloud I would try there first
and yes that’s correct 👍

Robin

10/08/2020, 10:03 PM
Ah, that's interesting! OK, will 1. set the debug level and rerun the flow and 2. investigate the container logs etc.
Any chance that someone from prefect might be already available for feedback in ~10 hours? We are always looking forward to afternoon European time when you are starting your day in the americas 🙂
prefect.config.logging.level = "DEBUG"
Does this work?
Hmm, do I understand this part of the prefect docs correctly, that usually logging is already set to DEBUG for prefect cloud?

Chris White

10/08/2020, 10:22 PM
Ah good find - that document is out of date! The logging level is configured at import time, so you’ll need to set an environment variable where your agent is running:
PREFECT__LOGGING__LEVEL="DEBUG"
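One place you could set this is in the flow’s Docker image itself, so the flow-run and dask-worker containers inherit it; a minimal sketch, assuming the Docker storage used above and its env_vars argument (import path as of Prefect 0.13-era releases):

from prefect.environments.storage import Docker  # assumed import path for Prefect ~0.13

# rebuild the storage with the logging level baked into the image;
# registry and image names reuse the (redacted) values from the snippet above
flow.storage = Docker(
    registry_url="asdasd.dkr.ecr.eu-central-1.amazonaws.com",
    image_name="fetch_flow",
    env_vars={"PREFECT__LOGGING__LEVEL": "DEBUG"},
)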

Robin

10/08/2020, 10:34 PM
OK, thanks! Any other ideas apart from checking debug level logs?

Chris White

10/08/2020, 10:36 PM
debug level logs + your cluster logs; I’d bet money this is going to be a dask worker eviction issue / noisy teardown of the cluster issue (and just to reiterate: it appears your flow ran correctly, it just produced some noisy and annoying logs)
🙏 1

Robin

10/08/2020, 10:43 PM
OK, I would have expected 8h execution time as opposed to 45 min 🤔 Tomorrow will tell 🙂

Chris White

10/08/2020, 10:44 PM
verrry interesting - yea definitely let us know what you find; this is one of those situations where I’d love to figure out how we can both capture / detect and expose this event better in the UI! And of course there’s a chance I’m wrong in which case we can dive in deeper 🙂

Robin

10/08/2020, 10:48 PM
OK, looking forward to connecting again tomorrow, hopefully with some interesting insights!
Yesterday, I restarted the flow once again without changing the log level and got a task that has been stuck (for 8h now 😮) in Retrying, blocking the flow from continuing or failing 🤔
Adding further information: on flow.register() we get the following warnings:
UserWarning: No result handler was specified on your Flow. Cloud features such as input caching and resuming 
task runs from failure may not work properly.
  no_url=no_url,
and
/opt/prefect/healthcheck.py:149: UserWarning: Task <Task: create_or_update_copy_progress_table> has retry settings but some upstream dependencies do not have result types. See <https://docs.prefect.io/core/concepts/results.html> for more details.
  result_check(flows)
/opt/prefect/healthcheck.py:149: UserWarning: Task <Task: copy_batteryconfiguration> has retry settings but some upstream dependencies do not have result types. See <https://docs.prefect.io/core/concepts/results.html> for more details.
  result_check(flows)
Could that cause the trouble?
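For reference, a minimal sketch of what attaching a result to the flow could look like, which should address the first warning (the S3Result class and flow-level result keyword exist in Prefect 0.13-era releases; the bucket name is a placeholder):

from prefect import Flow
from prefect.engine.results import S3Result  # assumed import path for Prefect ~0.13

# a flow-level result lets Cloud persist task outputs so retries and restarts
# can reload upstream data; "my-results-bucket" is a hypothetical bucket name
with Flow("fetch_flow", result=S3Result(bucket="my-results-bucket")) as flow:
    ...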

Raphaël Riel

10/09/2020, 12:27 PM
I had the exact same issue when cancelling a Flow from the Cloud UI (Looping or Mapping flows in my case) while the flow was doing hard cpu crunching. The Agent seems to have attempted to set the State of the Cancelled Flow and this resulted in an INTERNAL_SERVER_ERROR.

Robin

10/09/2020, 1:16 PM
OK, thanks for the info, makes sense! The only difference is that I did not cancel the flow 🤔 So the question is why the flow was cancelled. But I guess you and Chris explained how to interpret the secondary errors related to the flow cancellation...
Hey @Chris White, it took some time to get DEBUG level logging running on our AWS EKS kubernetes cluster, but here we are with the log messages of the above-described problem:

Raphaël Riel

10/14/2020, 7:58 PM
I confirm this was the same with my above-mentioned “Cancelled Tasks”: they were CPU-intensive calculations that took a long time before the Task’s thread was released.
Great finding with the Debug! 🙌
👋 1

Robin

10/14/2020, 9:52 PM
@Chris White, @Kyle Moon-Wright any thoughts on the problem or information that we can provide additionally to help solve the issue?

Chris White

10/14/2020, 11:05 PM
So there are lots of moving pieces in your description above that aren’t necessarily related: the fundamental issue you were originally having is that some tasks were attempting to run after the flow had already finished. The few tasks we looked at had in fact completed earlier and were trying to run again for some reason, so while reading the logs was a poor user experience, your Flow was executing as you expect. The new screenshot you have posted is related to zombie tasks - it appears you had a task that died for some reason, and our service marked it as failed (because it stopped running). So while it does seem you might have some infrastructure issues that are killing tasks / processes / jobs, I don’t think there’s an issue on our side to solve at the moment

Robin

10/14/2020, 11:35 PM
Yes, lots of moving pieces indeed.
Hmmm, the task that fails in the above-mentioned screenshot had already succeeded and should therefore only be changing its state, which fails. Doesn't this look like a prefect issue? 🤷‍♂️
In none of the above-mentioned examples did the flow finish successfully; instead it always aborted prematurely without finishing or even running all the tasks. I would like to better understand why the whole flow stops instead of just moving on to the next tasks. 🤔
I was happy when we were able to get debug level logs today using flow.run_config; however, the parallelization with flow.run_config and flow.executor = DaskExecutor() did not work for us. Therefore, we finally switched back to flow.environment = DaskKubernetesEnvironment(...) and are now running the flow in small batches to see whether this reduces the failing tasks and flows, however without debug level logging. This is also a somewhat desperate attempt to at least start running the tasks successfully for a larger number of systems.
So we are also curious to explore the different root causes of the different issues and to contribute to possible solutions. Therefore, we will try to get the dask cluster logs tomorrow via AWS. Any other information that could help? 🙏 Current issues:
1. some tasks attempt to run after the flow has finished / aborted
2. flow runs switch to failed before running all tasks
3. tasks don't parallelize with flow.run_config and flow.executor = DaskExecutor()
4. debug level logging not available for DaskKubernetesEnvironment on AWS EKS
PS: Should I open issues on GitHub for the current issues to facilitate troubleshooting?

Chris White

10/14/2020, 11:42 PM
Definitely open an issue for the run config / dask executor piece and the DaskK8sEnvironment logging issue; for the other two issues could you provide me with a flow run ID?

Robin

10/14/2020, 11:52 PM
Ok, will do all this tomorrow.

Chris White

10/14/2020, 11:52 PM
sounds good thanks for your patience!
Robin

4. debug level logging not available for DaskKubernetesEnvironment on AWS EKS, see: https://github.com/PrefectHQ/prefect/issues/3509
3. tasks don't parallelize with flow.run_config and flow.executor = DaskExecutor(), see: https://github.com/PrefectHQ/prefect/issues/3510

Dylan

10/15/2020, 3:56 PM
Hi @Robin! I believe you need to configure Dask to set up more than one worker to get parallelism
You also need to ensure that you request enough CPU resources for that pod
from prefect.run_configs import KubernetesRun
from prefect.engine.executors import LocalDaskExecutor  # import paths as of Prefect ~0.13

run_config = KubernetesRun(cpu_limit=2, cpu_request=2)
executor = LocalDaskExecutor(num_workers=4)
For example
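For instance, wiring these onto the flow before registering might look roughly like this (a sketch; assumes the flow object and project name from earlier in the thread):

flow.run_config = KubernetesRun(cpu_limit=2, cpu_request=2)  # ask k8s for enough CPU for the flow-run pod
flow.executor = LocalDaskExecutor(num_workers=4)             # run 4 local dask workers inside that pod
flow.register(project_name="eks_test_01")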

Robin

10/15/2020, 4:45 PM
Thanks! I tried, but it seems to not have had any impact (see the issue on GitHub)

Dylan

10/15/2020, 4:47 PM
👍
We’ll follow up there
👍 1

Chris White

10/15/2020, 4:57 PM
@Robin some of the flow runs you’ve linked indicate that your dask cluster is crashing. Here’s the sequence of events:
- your flow run job is created, which is responsible for managing a dask cluster
- many of the tasks are submitted to the cluster
- some of the workers on the cluster crash, raising an error in the flow run job and resulting in a failed flow run state
- before the dask cluster is torn down, dask reschedules some of the lost tasks from the crashed worker onto other workers. These workers attempt to rerun the tasks but they cannot enter a Running state because the flow run is in a failed state.
🙌 1

Robin

10/15/2020, 5:50 PM
Thanks for the analysis! That answers question 1 (Failed to set task state with error: ClientError).
So it looks like this issue is the common issue across all those flows that abort prematurely, right?
Concerning question 2 (flow runs switch to failed before running all tasks): on the prefect side it boils down to the following question: why is the flow run state set to failed? Some feedback from the dask cluster has to trigger this event, right? 🤔 Does prefect provide a means to analyze why it sets the state to failed? If not yet, it would be great if prefect forwarded information about the related events that trigger the flow state change. Apart from this, I guess the dask cluster logs will tell us more about what triggers prefect to set the flow state to failed?
PS: Although a solution with AWS EKS is important and urgent, we are also open to exploring how coiled avoids or simplifies these issues. Do you already recommend ways to set up prefect agents with coiled?

Chris White

10/15/2020, 6:54 PM
Yea the flows are aborting prematurely; the first flow run you linked to had a clear “KilledWorker” error message in the state message. There are two situations that result in similar outcomes:
- the dask cluster object managed by your flow run errors out; this will definitely be logged by the flow runner and show up in the Failed state message (as it did with your run 81060a1b-8e15-4c8d-9672-4da86a64f489, there’s a clear KilledWorker error)
- the dask cluster still (partially) crashes, but in a way that the scheduler simply releases all work. This won’t necessarily result in an error in the flow runner process but would still result in incomplete task runs.
Either way, this is very much a dask resource issue. You can use coiled directly without new agents or anything: the coiled package has a Cluster object that you can feed directly into the DaskExecutor when configuring your Flow. As long as your coiled credentials are present (possibly in the docker image you run your flows with) then it will “just work” and create a cluster in coiled land and farm your tasks out to it
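A minimal sketch of that coiled route (the cluster size and software environment name are placeholders; import path assumes Prefect 0.13-era releases):

import coiled
from prefect.engine.executors import DaskExecutor  # assumed import path for Prefect ~0.13

# the flow run creates a Coiled cluster on demand and farms tasks out to it;
# "accure/fetch-flow" is a hypothetical Coiled software environment name
flow.executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={"n_workers": 10, "software": "accure/fetch-flow"},
)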

Robin

10/15/2020, 7:24 PM
OK, so while trying to solve the root-cause issues with "dask on EKS" via the GitHub issues with your team and maybe the dask team, I will also try to test it with coiled. It's a bit frustrating that we are working with several workarounds at the moment, but we learn from it. 🙂 Is there a resource on which prefect runs more stably and flawlessly than a kubernetes cluster, and that is also ready to scale to 10s of thousands of parallelized tasks? Although we would expect that scaling flows with prefect runs seamlessly with different cloud providers and services, we are open to adapting to whatever is currently already stable!
Trying to use a coiled cluster as described here: https://github.com/PrefectHQ/prefect/discussions/3257

Chris White

10/15/2020, 9:02 PM
Stability is entirely based on the resource requirements of your workflow, so there’s no universal or out-of-the-box solution here; wherever your tasks run will need to have the appropriate CPU / memory / networking bandwidth etc. It seems from the symptoms I’m seeing here that your tasks are heavier than your k8s cluster and / or dask cluster are capable of handling

Robin

10/15/2020, 9:27 PM
OK, thanks for all the feedback!
Highly appreciate it 🙏

Chris White

10/15/2020, 9:31 PM
for sure! I appreciate you raising all of these issues; regardless of whether there ends up being a Prefect bug lurking in here, we need to find better ways of making these sorts of infrastructure events more obvious in the UI so it’s easier to self debug.
💯 1