class: center, middle # Dask-Gateway # Dask clusters as a service
Jim Crist
`jiminy_crist` //
`jcrist`
*Anaconda Inc.* --- class: center, middle # The state of deployments --- # Cluster Manager - In charge of deploying dask schedulers/workers on cluster backend - Many different implementations - [Kubernetes](https://kubernetes.dask.org) - [YARN](https://yarn.dask.org) - [HPC Job queues](https://jobqueue.dask.org) - [Cloud Providers (EC2, ...)](https://cloudprovider.dask.org) - ... - Standard interface: ```python # Scale to n workers cluster.scale(n) # Adaptively scale between min and max workers cluster.adapt(min, max) ``` --- # Cluster Manager - Example Example using [`dask-yarn`](http://yarn.dask.org/en/latest/) ```python # Create a cluster backed by YARN from dask_yarn import YarnCluster cluster = YarnCluster( environment='environment.tar.gz', worker_vcores=2, worker_memory="8GiB", ) # Scale to 2 workers cluster.scale(2) # Connect to the cluster from dask.distributed import Client client = Client(cluster) # Application code... import dask.dataframe as dd df = dd.read_parquet(...) ... ``` --- # Cluster Manager - Example Example using [`dask-yarn`](http://yarn.dask.org/en/latest/) ```python *# Create a cluster backed by YARN *from dask_yarn import YarnCluster *cluster = YarnCluster( * environment='environment.tar.gz', * worker_vcores=2, * worker_memory="8GiB", *) # Scale to 2 workers cluster.scale(2) # Connect to the cluster from dask.distributed import Client client = Client(cluster) # Application code... import dask.dataframe as dd df = dd.read_parquet(...) ... ``` --- # Cluster Manager - Example **Kubernetes**: [`dask-kubernetes`](https://kubernetes.dask.org) ```python >>> from dask_kubernetes import KubeCluster >>> cluster = KubeCluster(...) ``` **YARN**: [`dask-yarn`](https://yarn.dask.org) ```python >>> from dask_yarn import YarnCluster >>> cluster = YarnCluster(...) ``` **HPC Clusters**: [`dask-jobqueue`](https://jobqueue.dask.org) ```python >>> from dask_jobqueue import PBSCluster >>> cluster = PBSCluster(...) ``` --- # Status Quo - Pros - Deployment is part of user's code, no need for extra CLI tools. - Work from scripts, notebooks, applications, etc... - No need for extra infrastructure, plays well with deployment backends - Just python libraries - easy to install - Extensible design, anyone can make a backend --- # Status Quo - Cons - Requires domain-specific knowledge of backend - Difficult for new users - Users must have permissions and network access for cluster backend - May require users have excessive permissions, security risk - Client usually must be on same system as scheduler/workers - No central management - Must rely on backend for tracking user actions and resource usage - Hard to enforce good practices - Clusters are tied to lifetime of client - Restart your notebook, restart the cluster --- class: center, middle
--- # Dask-Gateway
--- # Dask-Gateway - REST api for managing clusters - Proxy for client to scheduler traffic (TLS) - Proxy for dashboards (HTTP) - Flexible design - Configurable backend (Kubernetes, YARN, HPC, ...) - Configurable authentication (Kerberos, JupyterHub, ...) - Most actions done server-side (simple client, more complicated server) --- # Using Dask-Gateway ```python # Previous example using dask-yarn... # Create a cluster backed by YARN from dask_yarn import YarnCluster cluster = YarnCluster( environment='environment.tar.gz', worker_vcores=2, worker_memory="8GiB", ) # Scale to 2 workers cluster.scale(2) # Connect to the cluster from dask.distributed import Client client = Client(cluster) # Application code... import dask.dataframe as dd df = dd.read_parquet(...) ... ``` --- # Using Dask-Gateway ```python # Same example using dask-gateway # Create a cluster backed by dask-gateway from dask_gateway import GatewayCluster cluster = GatewayCluster( address="https://mygateway.address.com", worker_vcores=2, worker_memory="8GiB", ) # Scale to 2 workers cluster.scale(2) # Connect to the cluster from dask.distributed import Client client = Client(cluster) # Application code... import dask.dataframe as dd df = dd.read_parquet(...) ... ``` --- class: center, middle # Demo --- # Additional Features - User resource limits - Automatic shutdown of idle clusters - Strong interop with JupyterHub - ... --- class: center, middle # Internals --- class: center, middle **Currently being rewritten* --- # Internals - Highlights - Server written in Python using `aiohttp` - Proxy written in Go - Traitlets heavily used for configuration --- # Traitlets! - Declarative - Automatic validation - Configuration as code - Same config as Jupyter ecosystem (https://github.com/ipython/traitlets) --- # Traitlets - Example ```python from traitlets.config import Configurable from traitlets import Float class Rectangle(Configurable): width = Float( 1, min=0, help="The width of the rectangle in meters", config=True, ) height = Float( 1, min=0, help="The height of the rectangle in meters", config=True, ) def area(self): return self.width * self.height ``` --- # Traitlets - Example **Configuration** ```python # config.py c.Rectangle.width = 10 c.Rectangle.height = 25 ``` **Usage** ```python >>> r = Rectangle(config=c) >>> r.width 10 >>> r.height 25 >>> r.area() 250 ``` --- # Traitlets - Validation **Configuration** ```python # config.py c.Rectangle.width = 10 c.Rectangle.height = -1 # invalid value ``` **Usage** ```python >>> r = Rectangle(config=c) ... TraitError: The value of the 'height' trait of a Rectangle instance should not be less than 0, but a value of -1.0 was specified ``` --- # Traitlets - Custom logic ```python # config.py import sys # Support wider rectangles on windows: if sys.platform == "win32": c.Rectangle.width = 20 else: c.Rectangle.width = 10 c.Rectangle.height = 10 ``` --- # Configurable Backends - Wanted support for multiple backends - Kubernetes - YARN - HPC Job Queues - ... --- # Configurable Backends - Wanted support for multiple backends - Kubernetes - YARN - HPC Job Queues - ... - Solution: - Abstract backend-specifics out into a base class - Implement class for various backends - Make backend class configurable using [`traitlets.Type`](https://traitlets.readthedocs.io/en/stable/trait_types.html#traitlets.Type). --- # Backend Base Class - See [`dask_gateway_server.backends.base`](https://github.com/dask/dask-gateway/blob/master/dask-gateway-server/dask_gateway_server/backends/base.py) for full definition. ```python class Backend(LoggingConfigurable): async def setup(self, app): """Setup the backend, called on startup""" async def cleanup(self): """Cleanup the backend, called on shutdown""" async def list_clusters(self, username=None, statuses=None): """List clusters, with optional filtering""" async def get_cluster(self, cluster_name, wait=False): """Lookup a cluster""" async def start_cluster(self, user, cluster_options): """Submit a new cluster.""" async def stop_cluster(self, cluster_name, failed=False): """Stop a cluster.""" async def on_cluster_heartbeat(self, cluster_name, msg): """Handle heartbeats from the cluster.""" ``` --- # DataBase-backed Backend Base Class - Backend implementation that uses a database to store state - See [`dask_gateway_server.backends.db_base`](https://github.com/dask/dask-gateway/blob/master/dask-gateway-server/dask_gateway_server/backends/db_base.py) for full definition. ```python class DBBackendBase(Backend): async def do_start_cluster(self, cluster): """Start a cluster.""" async def do_stop_cluster(self, cluster): """Stop a cluster.""" async def do_check_clusters(self, clusters): """Check the status of multiple clusters.""" async def do_start_worker(self, worker): """Start a worker.""" async def do_stop_worker(self, worker): """Stop a worker.""" async def do_check_workers(self, workers): """Check the status of multiple workers.""" ``` --- # Backend - Configuration ```python # Use the Kubernetes backend c.DaskGateway.backend_class = "dask_gateway_server.backends.kubernetes.KubeBackend" # Specify which base image to use c.KubeClusterConfig.image = "myuser/my-dask-gateway-image:latest" # Specify worker resources c.KubeClusterConfig.worker_cores = 2 c.KubeClusterConfig.worker_memory = "8 G" ... ``` --- # Scheduler Proxy - Dask uses raw TCP to communicate - Most proxies either: - Proxy HTTP (level 7), dispatching on the path in the HTTP request - Proxy TCP (level 4) to a static address - We need to proxy TLS (encrypted TCP) to *multiple* backends - Could include routing in each message, but would need to have proxy decrypt traffic (expensive) --- # Solution: Use SNI - SNI (Server Name Indication) > "an extension to the Transport Layer Security (TLS) computer networking protocol by which a client indicates which hostname it is attempting to connect to at the start of the handshaking process." ([Wikipedia](https://en.wikipedia.org/wiki/Server_Name_Indication)) - Allows routing TLS *encrypted* traffic based on an *unencrypted* field in handshake --- # Scheduler Proxy - On each connection: - Extract handshake message, extract SNI - Check SNI, rejecting clients with invalid SNIs - Connect to backend for specified SNI - Replay extracted handshake to backend - Copy bytes between sockets until connection closes (fast!) ---
# Scheduler Proxy - Written in [Go](https://golang.org/) - Needed some low-level bytes handling - Good support for handling many clients - Stdlib had features that made this easier to write (~1000 lines total) - Also wanted an excuse to write some Go - For K8s backend rewrite we're using [Traefik](https://docs.traefik.io) instead ---
# Scheduler Proxy - Bundled with the `dask-gateway-server` wheel: - Wanted an easy install - Added as "package-data" ([setuptools docs](https://setuptools.readthedocs.io/en/latest/setuptools.html#including-data-files)) ```python # setup.py setup( ..., package_data={"dask_gateway_server": ["proxy/dask-gateway-proxy"]}, ... ) ``` --- class: center, middle # Conclusion --- # Dask-Gateway - **Centrally Managed** - Admins do the heavy lifting, users get easy cluster access. - **Secure by default** - Automatic TLS everywhere, no elevated user permissions. - **Flexible** - Configurable backends, authentication, etc... - **Robust to Failure** - Can be restarted or failover without losing existing clusters. --- # Dask-Gateway - Designed with *institutional* users/teams in mind - Previous deployment options still quite useful, and may be preferred for individuals and small teams. --- # Ongoing Work - Finishing up the rewrite - Re-adding Kubernetes backend - Testing - Documentation - Web UI - Authentication - Dashboarding - Admin panel - User driven features (come talk to me/file an issue) --- # Resources - Documentation: https://gateway.dask.org/ - [User Guide](https://gateway.dask.org/usage.html) - [Install on Kubernetes](https://gateway.dask.org/install-kube.html) - [Install on Hadoop/YARN](https://gateway.dask.org/install-hadoop.html) - GitHub: https://github.com/dask/dask-gateway/ - Dask Website: https://dask.org/ --- class: center, middle # Questions?
- Documentation: https://gateway.dask.org/ - GitHub: https://github.com/dask/dask-gateway/