I have come across an issue where the Dask scheduler gets killed with a memory error (though the workers keep running) if a large number of tasks are submitted in a short period of time.
If it were possible to get the current number of tasks on the cluster, it would be easy to throttle how many concurrent tasks are submitted to it.
NOTE: Tasks are being submitted to the same scheduler from multiple clients.
You can run arbitrary Python functions on the scheduler with the client.run_on_scheduler method.
Using this you can look at any of the scheduler state you like.
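For example, here is a minimal sketch of counting the tasks the scheduler currently knows about. The scheduler address is an assumption; the function receives the scheduler instance through the dask_scheduler keyword argument.

```python
from dask.distributed import Client

client = Client("tcp://127.0.0.1:8786")  # scheduler address is an assumption

def count_tasks(dask_scheduler=None):
    # run_on_scheduler passes the Scheduler instance as the
    # dask_scheduler keyword; its .tasks dict maps task keys to task states.
    return len(dask_scheduler.tasks)

n_tasks = client.run_on_scheduler(count_tasks)
print("tasks currently on the scheduler:", n_tasks)
```

Each client could call something like this before submitting a new batch and wait until the count drops below whatever threshold you choose.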
Note that the scheduler can handle millions of tasks. If you're getting anywhere close to this then you should probably rethink how you're using Dask. For optimal performance you should choose tasks that take hundreds of milliseconds or more.
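If your individual tasks are tiny, one common remedy is to batch many items into a single task so that each task does enough work. This is only a sketch under assumptions: process_item stands in for your real per-item work, and the batch size should be tuned so each task runs for hundreds of milliseconds or more.

```python
from dask.distributed import Client

client = Client()  # connect to your existing scheduler in practice

def process_item(x):
    # placeholder for the real per-item work (assumption)
    return x * 2

def process_batch(items):
    # one task processes a whole slice of items
    return [process_item(x) for x in items]

items = list(range(100_000))
batch_size = 1_000  # assumption: tune so each task takes long enough

# One task per batch instead of one task per item keeps the task count
# (and the scheduler's memory footprint) roughly 1000x smaller here.
futures = [client.submit(process_batch, items[i:i + batch_size])
           for i in range(0, len(items), batch_size)]
results = client.gather(futures)
```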