class: center, middle

# Dask-Gateway Internals

Jim Crist-Harif

`jcristharif` // `jcrist`

*Coiled*

---
class: center, middle

# Dask Deployments

---

# Cluster Manager

- In charge of deploying dask schedulers/workers on a cluster backend
- Many different implementations
    - [Kubernetes](https://kubernetes.dask.org)
    - [YARN](https://yarn.dask.org)
    - [HPC Job queues](https://jobqueue.dask.org)
    - [Cloud Providers (EC2, ...)](https://cloudprovider.dask.org)
    - ...
- Standard interface:

```python
# Scale to n workers
cluster.scale(n)

# Adaptively scale between min and max workers
cluster.adapt(min, max)
```

---

# Cluster Manager - Example

Example using [`dask-yarn`](http://yarn.dask.org/en/latest/)

```python
# Create a cluster backed by YARN
from dask_yarn import YarnCluster
cluster = YarnCluster(
    environment='environment.tar.gz',
    worker_vcores=2,
    worker_memory="8GiB",
)

# Scale to 2 workers
cluster.scale(2)

# Connect to the cluster
from dask.distributed import Client
client = Client(cluster)

# Application code...
import dask.dataframe as dd
df = dd.read_parquet(...)
...
```

---

# Cluster Manager - Example

Example using [`dask-yarn`](http://yarn.dask.org/en/latest/)

```python
*# Create a cluster backed by YARN
*from dask_yarn import YarnCluster
*cluster = YarnCluster(
*    environment='environment.tar.gz',
*    worker_vcores=2,
*    worker_memory="8GiB",
*)

# Scale to 2 workers
cluster.scale(2)

# Connect to the cluster
from dask.distributed import Client
client = Client(cluster)

# Application code...
import dask.dataframe as dd
df = dd.read_parquet(...)
...
```

---

# Cluster Manager - Example

**Kubernetes**: [`dask-kubernetes`](https://kubernetes.dask.org)

```python
>>> from dask_kubernetes import KubeCluster
>>> cluster = KubeCluster(...)
```

**YARN**: [`dask-yarn`](https://yarn.dask.org)

```python
>>> from dask_yarn import YarnCluster
>>> cluster = YarnCluster(...)
```

**HPC Clusters**: [`dask-jobqueue`](https://jobqueue.dask.org)

```python
>>> from dask_jobqueue import PBSCluster
>>> cluster = PBSCluster(...)
```

---

# Status Quo - Pros

- Deployment is part of the user's code, no need for extra CLI tools.
    - Works from scripts, notebooks, applications, etc...
- No need for extra infrastructure, plays well with deployment backends
    - Just python libraries - easy to install
- Extensible design, anyone can make a backend

---

# Status Quo - Cons

- Requires domain-specific knowledge of the backend
    - Difficult for new users
- Users must have permissions and network access for the cluster backend
    - May require users to have excessive permissions (a security risk)
    - Client usually must be on the same system as the scheduler/workers
- No central management
    - Must rely on the backend for tracking user actions and resource usage
    - Hard to enforce good practices
- Clusters are tied to the lifetime of the client
    - Restart your notebook, restart the cluster

---
class: center, middle

# Dask-Gateway

---

# Dask-Gateway

- REST API for managing clusters
- Proxy for client-to-scheduler traffic (TLS)
- Proxy for dashboards (HTTP)
- Flexible design
    - Configurable backend (Kubernetes, YARN, HPC, ...)
    - Configurable authentication (Kerberos, JupyterHub, ...)
- Most actions done server-side (simple client, more complicated server)

---

# Using Dask-Gateway

```python
# Previous example using dask-yarn...

# Create a cluster backed by YARN
from dask_yarn import YarnCluster
cluster = YarnCluster(
    environment='environment.tar.gz',
    worker_vcores=2,
    worker_memory="8GiB",
)

# Scale to 2 workers
cluster.scale(2)

# Connect to the cluster
from dask.distributed import Client
client = Client(cluster)

# Application code...
import dask.dataframe as dd
df = dd.read_parquet(...)
...
```

---

# Using Dask-Gateway

```python
# Same example using dask-gateway

# Create a cluster backed by dask-gateway
from dask_gateway import GatewayCluster
cluster = GatewayCluster(
    address="https://mygateway.address.com",
    worker_vcores=2,
    worker_memory="8GiB",
)

# Scale to 2 workers
cluster.scale(2)

# Connect to the cluster
from dask.distributed import Client
client = Client(cluster)

# Application code...
import dask.dataframe as dd
df = dd.read_parquet(...)
...
```

---
class: center, middle

# Demo

---

# Additional Features

- User resource limits
- Automatic shutdown of idle clusters
- Strong interop with JupyterHub
- ...

---
class: center, middle

# Internals

---

# Internals - Highlights

- Server written in Python using `aiohttp`
- Proxy written in Go
- Traitlets heavily used for configuration

---

# Traitlets!

- Declarative
- Automatic validation
- Configuration as code
- Same config system as the Jupyter ecosystem

(https://github.com/ipython/traitlets)

---

# Traitlets - Example

```python
from traitlets.config import Configurable
from traitlets import Float

class Rectangle(Configurable):
    width = Float(
        1,
        min=0,
        help="The width of the rectangle in meters",
        config=True,
    )

    height = Float(
        1,
        min=0,
        help="The height of the rectangle in meters",
        config=True,
    )

    def area(self):
        return self.width * self.height
```

---

# Traitlets - Example

**Configuration**

```python
# config.py
c.Rectangle.width = 10
c.Rectangle.height = 25
```

**Usage**

```python
>>> r = Rectangle(config=c)
>>> r.width
10.0
>>> r.height
25.0
>>> r.area()
250.0
```

---

# Traitlets - Validation

**Configuration**

```python
# config.py
c.Rectangle.width = 10
c.Rectangle.height = -1  # invalid value
```

**Usage**

```python
>>> r = Rectangle(config=c)
...
TraitError: The value of the 'height' trait of a Rectangle instance should
not be less than 0, but a value of -1.0 was specified
```

---

# Traitlets - Custom logic

```python
# config.py
import sys

# Support wider rectangles on windows:
if sys.platform == "win32":
    c.Rectangle.width = 20
else:
    c.Rectangle.width = 10

c.Rectangle.height = 10
```

---

# Configurable Backends

- Wanted support for multiple backends
    - Kubernetes
    - YARN
    - HPC Job Queues
    - ...

---

# Configurable Backends

- Wanted support for multiple backends
    - Kubernetes
    - YARN
    - HPC Job Queues
    - ...
- Solution:
    - Abstract backend-specifics out into a base class
    - Implement subclasses for the various backends
    - Make the backend class configurable using [`traitlets.Type`](https://traitlets.readthedocs.io/en/stable/trait_types.html#traitlets.Type) *(sketch on the next slide)*
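
---

# Configurable Backends - Sketch

Not the actual dask-gateway source, just a minimal sketch of the pattern: a `Type` trait holds the backend *class*, config points it at any subclass, and the application instantiates whatever was selected. The `DaskGateway`/`backend_class` names mirror the config keys used later; `LocalBackend` and the rest are illustrative.

```python
from traitlets import Type
from traitlets.config import Application, LoggingConfigurable


class Backend(LoggingConfigurable):
    """Base class that concrete backends subclass."""


class LocalBackend(Backend):
    """A stand-in default implementation."""


class DaskGateway(Application):
    # Admins point this at any Backend subclass in their config file, e.g.
    # c.DaskGateway.backend_class = "mymodule.KubeBackend"; traitlets imports
    # the string and validates that it is a subclass of Backend.
    backend_class = Type(
        default_value=LocalBackend,
        klass=Backend,
        help="The cluster backend class to use",
        config=True,
    )

    def start(self):
        # Instantiate whichever class the config selected
        self.backend = self.backend_class(parent=self)
```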

---

# Backend Base Class

- See [`dask_gateway_server.backends.base`](https://github.com/dask/dask-gateway/blob/master/dask-gateway-server/dask_gateway_server/backends/base.py) for the full definition.

```python
class Backend(LoggingConfigurable):
    async def setup(self, app):
        """Called when the server is starting up."""

    async def cleanup(self):
        """Called when the server is shutting down."""

    async def list_clusters(self, username=None, statuses=None):
        """List known clusters."""

    async def get_cluster(self, cluster_name, wait=False):
        """Get information about a cluster."""

    async def start_cluster(self, user, cluster_options):
        """Submit a new cluster."""

    async def stop_cluster(self, cluster_name, failed=False):
        """Stop a cluster."""

    async def on_cluster_heartbeat(self, cluster_name, msg):
        """Handle a cluster heartbeat."""
```

---

# Database-backed Backend Base Class

- Backend implementation that uses a database to store state
- See [`dask_gateway_server.backends.db_base`](https://github.com/dask/dask-gateway/blob/master/dask-gateway-server/dask_gateway_server/backends/db_base.py) for the full definition.

```python
class DBBackendBase(Backend):
    async def do_start_cluster(self, cluster):
        "Start a cluster"

    async def do_stop_cluster(self, cluster):
        "Stop a cluster"

    async def do_check_clusters(self, clusters):
        "Check the status of multiple clusters"

    async def do_start_worker(self, worker):
        "Start a worker"

    async def do_stop_worker(self, worker):
        "Stop a worker"

    async def do_check_workers(self, workers):
        "Check the status of multiple workers"
```

---

# Backend - Configuration

```python
# Use the Kubernetes backend
c.DaskGateway.backend_class = "dask_gateway_server.backends.kubernetes.KubeBackend"

# Specify which base image to use
c.KubeClusterConfig.image = "myuser/my-dask-gateway-image:latest"

# Specify worker resources
c.KubeClusterConfig.worker_cores = 2
c.KubeClusterConfig.worker_memory = "8 G"

...
```

---

# Scheduler Proxy

- Dask uses raw TCP to communicate
- Most proxies either:
    - Proxy HTTP (layer 7), dispatching on the path in the HTTP request
    - Proxy TCP (layer 4) to a static address
- We need to proxy TLS (encrypted TCP) to *multiple* backends
- Could include routing info in each message, but the proxy would then need to decrypt traffic (expensive)

---

# Solution: Use SNI

- SNI (Server Name Indication)

> "an extension to the Transport Layer Security (TLS) computer networking protocol by which a client indicates which hostname it is attempting to connect to at the start of the handshaking process." ([Wikipedia](https://en.wikipedia.org/wiki/Server_Name_Indication))

- Allows routing TLS *encrypted* traffic based on an *unencrypted* field in the handshake

---

# Scheduler Proxy

- On each connection: *(sketch on the next slide)*
    - Read the handshake message and extract the SNI
    - Check the SNI, rejecting clients with invalid SNIs
    - Connect to the backend for the specified SNI
    - Replay the extracted handshake to the backend
    - Copy bytes between sockets until the connection closes (fast!)

---
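
# Scheduler Proxy - Sketch

The real proxy is the Go binary shipped with `dask-gateway-server`; this is only a simplified Python/asyncio sketch of the per-connection flow above. It assumes the whole ClientHello fits in one TLS record, skips most bounds checking, and the `ROUTES` table (and its example SNI/address) is hypothetical.

```python
import asyncio

# Hypothetical routing table: SNI -> (host, port) of the backing scheduler.
ROUTES = {"daskgateway-abc123": ("10.0.0.12", 8786)}


def extract_sni(hello):
    """Pull the server_name out of a ClientHello body (no validation)."""
    pos = 4 + 2 + 32                                       # msg header, version, random
    pos += 1 + hello[pos]                                  # session_id
    pos += 2 + int.from_bytes(hello[pos:pos + 2], "big")   # cipher_suites
    pos += 1 + hello[pos]                                  # compression_methods
    end = pos + 2 + int.from_bytes(hello[pos:pos + 2], "big")
    pos += 2
    while pos + 4 <= end:                                  # walk the extensions
        ext_type = int.from_bytes(hello[pos:pos + 2], "big")
        ext_len = int.from_bytes(hello[pos + 2:pos + 4], "big")
        pos += 4
        if ext_type == 0:                                  # server_name extension
            name_len = int.from_bytes(hello[pos + 3:pos + 5], "big")
            return hello[pos + 5:pos + 5 + name_len].decode("ascii")
        pos += ext_len
    return None


async def pipe(src, dst):
    # Copy bytes until the source closes, then close the destination.
    try:
        while data := await src.read(2 ** 16):
            dst.write(data)
            await dst.drain()
    finally:
        dst.close()


async def handle(reader, writer):
    try:
        header = await reader.readexactly(5)               # TLS record header
        body = await reader.readexactly(int.from_bytes(header[3:5], "big"))
        route = ROUTES.get(extract_sni(body))
        if route is None:                                  # unknown SNI: reject
            writer.close()
            return
        backend_reader, backend_writer = await asyncio.open_connection(*route)
        backend_writer.write(header + body)                # replay the handshake
        await backend_writer.drain()
        await asyncio.gather(pipe(reader, backend_writer),
                             pipe(backend_reader, writer))
    except (asyncio.IncompleteReadError, ConnectionError, IndexError):
        writer.close()


async def main():
    server = await asyncio.start_server(handle, "0.0.0.0", 8786)
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())
```

---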
# Scheduler Proxy

- Written in [Go](https://golang.org/)
    - Needed some low-level byte handling
    - Good support for handling many clients
    - Stdlib had features that made this easier to write (~1000 lines total)
    - Also wanted an excuse to write some Go
- For the Kubernetes backend we use [Traefik](https://docs.traefik.io) instead

---
# Scheduler Proxy

- Bundled with the `dask-gateway-server` wheel:
    - Wanted an easy install
    - Added as "package-data" ([setuptools docs](https://setuptools.readthedocs.io/en/latest/setuptools.html#including-data-files))

```python
# setup.py
setup(
    ...,
    package_data={"dask_gateway_server": ["proxy/dask-gateway-proxy"]},
    ...
)
```

---
class: center, middle

# Internals (the gory details)

---

# Cluster Startup Process (Client)

- Client POSTs to `/api/v1/clusters`, gets a cluster ID
- Client long-polls `/api/v1/clusters/CLUSTER-ID` in a loop
- Client gets the cluster info, initiates a connection through the proxy

---

# Cluster Startup Process (Server)

- Server receives & validates the cluster request
- Server adds a cluster record in the backend
- Reconciler loop moves the cluster through the startup state machine
- Scheduler eventually starts, heartbeats home
- Cluster is marked as RUNNING, client(s) notified

---

# Cluster Scale Process (Client)

- Client submits a scale request to the API
- That's it

---

# Cluster Scale Process (Server)

- Server receives & validates the scale request
- Server forwards the scale request to the scheduler
- Scheduler heartbeats back info about current & desired state
    - Desired worker count
    - Lists of active, closing, and closed workers
- Cluster & worker state changes stored in the backend
- Reconciler loop notices & responds to changes

---

# Cluster Scale Up Process (Server)

- Reconciler loop notes the cluster needs new workers, creates new worker records
- Reconciler loop moves workers through the worker state machine
- Worker connects to the scheduler
- Worker is part of the active set sent in the scheduler heartbeat
- Worker is marked as RUNNING

---

# Cluster Scale Down Process (Server)

- Scheduler chooses 1 or more workers to close
- Scheduler notes the closing workers in its next heartbeat
- Scheduler requests workers close themselves
- Server marks workers as CLOSING
- If workers time out in CLOSING, server force-kills them
- Server notices workers stopped, cleans up state (if any)

---

# Cluster Adapt Process (Client)

- Client submits an adapt request to the API
- That's it

---

# Cluster Adapt Process (Server)

- Server receives & validates the adapt request
- Server forwards the adapt request to the scheduler
- Scheduler moves to adaptive scaling mode, dynamically adjusting its desired worker count within the configured range
- The remainder is the same as the scale process described above

---

# Cluster Shutdown Process (Client)

- Client submits a shutdown request to the API
- OR the scheduler hits its idle timeout and sends a shutdown request to the API itself

---

# Cluster Shutdown Process (Server)

- Server receives & validates the shutdown request
- Server moves all workers through the state machine to STOPPED
- Server moves the scheduler through the state machine to STOPPED
- *Eventually* the cluster record is removed from the backend

---

# Error Handling

- Backend requests can fail! What to do then?
- Token bucket backoff implemented in the work queue; transitions are retried after backoff *(simplified sketch a few slides ahead)*
- If the count of consecutive worker start failures (per cluster) exceeds a limit, the cluster is marked as FAILED

---

# Kubernetes Backend Differences

- All state stored in etcd as CRD objects
- 1 controller process (the reconciler loop alone)
- 1 or more API servers running
- 1 or more Traefik proxies running

---

# Kubernetes Backend Differences
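
---

# Error Handling - Retry Sketch

Not the actual dask-gateway work queue (which uses a token bucket): just a minimal asyncio sketch of the retry idea, swapping in exponential backoff with jitter. The dict fields (`failures`, `status`) and the `transition` coroutine are illustrative.

```python
import asyncio
import random


async def reconciler(queue: asyncio.Queue, transition, max_failures: int = 5):
    """Apply state transitions from the work queue, retrying after backoff.

    `transition` is a coroutine that advances one cluster/worker record
    through its state machine and raises if the backend call fails.
    """
    while True:
        record = await queue.get()
        try:
            await transition(record)
            record["failures"] = 0
        except Exception:
            record["failures"] = record.get("failures", 0) + 1
            if record["failures"] >= max_failures:
                record["status"] = "FAILED"   # give up on this record
            else:
                # Exponential backoff with jitter, then requeue for retry
                delay = min(2 ** record["failures"], 30) * random.uniform(0.5, 1.5)
                asyncio.get_running_loop().call_later(
                    delay, queue.put_nowait, record
                )
        finally:
            queue.task_done()
```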

---

# Non-k8s Backend Resiliency

- 1 or more proxies running (most users just use one)
- If the API server stops, running schedulers still work fine; you just can't start, stop, scale, or connect to new clusters
- On API server restart:
    - Existing state loaded from the DB
    - Reconciler queue filled with all existing clusters
    - Start handling requests again

---
class: center, middle

# Conclusion

---

# Dask-Gateway

- **Centrally Managed** - Admins do the heavy lifting, users get easy cluster access.
- **Secure by default** - Automatic TLS everywhere, no elevated user permissions.
- **Flexible** - Configurable backends, authentication, etc...
- **Robust to Failure** - Can be restarted or fail over without losing existing clusters.

---
class: center, middle

# Questions?
- Documentation: https://gateway.dask.org/
- GitHub: https://github.com/dask/dask-gateway/