Jim Crist
jiminy_crist
// jcrist
Anaconda Inc.
In charge of deploying dask schedulers/workers on a cluster backend
Many different implementations
Standard interface:
# Scale to n workers
cluster.scale(n)

# Adaptively scale between min and max workers
cluster.adapt(min, max)
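The scale/adapt pair is just a convention shared by all the cluster-manager implementations. A toy class implementing the same interface might look like this (illustrative only; real implementations such as YarnCluster or KubeCluster start actual scheduler/worker processes on a backend):

```python
class ToyCluster:
    """A toy cluster manager implementing the standard interface.

    Illustrative only: real implementations talk to a real cluster
    backend (YARN, Kubernetes, an HPC job queue, ...).
    """
    def __init__(self):
        self.workers = []

    def scale(self, n):
        """Scale to exactly n workers."""
        while len(self.workers) < n:
            self.workers.append(f"worker-{len(self.workers)}")
        del self.workers[n:]

    def adapt(self, minimum, maximum):
        """Record adaptive bounds; a real implementation would add or
        remove workers within [minimum, maximum] based on load."""
        self.minimum, self.maximum = minimum, maximum
```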
Example using dask-yarn
# Create a cluster backed by YARN
from dask_yarn import YarnCluster
cluster = YarnCluster(
    environment='environment.tar.gz',
    worker_vcores=2,
    worker_memory="8GiB",
)

# Scale to 2 workers
cluster.scale(2)

# Connect to the cluster
from dask.distributed import Client
client = Client(cluster)

# Application code...
import dask.dataframe as dd
df = dd.read_parquet(...)
...
Kubernetes: dask-kubernetes
>>> from dask_kubernetes import KubeCluster
>>> cluster = KubeCluster(...)
YARN: dask-yarn
>>> from dask_yarn import YarnCluster
>>> cluster = YarnCluster(...)
HPC Clusters: dask-jobqueue
>>> from dask_jobqueue import PBSCluster
>>> cluster = PBSCluster(...)
Deployment is part of the user's code, no need for extra CLI tools
No need for extra infrastructure, plays well with deployment backends
Just python libraries - easy to install
Extensible design, anyone can make a backend
Requires domain-specific knowledge of the backend
Users must have permissions and network access for cluster backend
No central management
Clusters are tied to the lifetime of the client
REST API for managing clusters
Proxy for client to scheduler traffic (TLS)
Proxy for dashboards (HTTP)
Flexible design
Most actions done server-side (simple client, more complicated server)
# Previous example using dask-yarn...

# Create a cluster backed by YARN
from dask_yarn import YarnCluster
cluster = YarnCluster(
    environment='environment.tar.gz',
    worker_vcores=2,
    worker_memory="8GiB",
)

# Scale to 2 workers
cluster.scale(2)

# Connect to the cluster
from dask.distributed import Client
client = Client(cluster)

# Application code...
import dask.dataframe as dd
df = dd.read_parquet(...)
...
# Same example using dask-gateway

# Create a cluster backed by dask-gateway
from dask_gateway import GatewayCluster
cluster = GatewayCluster(
    address="https://mygateway.address.com",
    worker_vcores=2,
    worker_memory="8GiB",
)

# Scale to 2 workers
cluster.scale(2)

# Connect to the cluster
from dask.distributed import Client
client = Client(cluster)

# Application code...
import dask.dataframe as dd
df = dd.read_parquet(...)
...
User resource limits
Automatic shutdown of idle clusters
Strong interop with JupyterHub
...
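One of these features, automatic shutdown of idle clusters, can be sketched as a periodic cull based on last-heartbeat times (a toy sketch, not dask-gateway's actual implementation):

```python
import time

def cull_idle(last_activity, timeout, now=None):
    """Return names of clusters idle for longer than ``timeout`` seconds.

    Toy sketch of idle culling: ``last_activity`` maps cluster name to
    the timestamp of its last heartbeat. A real server would run this
    periodically and stop each returned cluster.
    """
    if now is None:
        now = time.monotonic()
    return [name for name, last in last_activity.items()
            if now - last > timeout]
```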
*Currently being rewritten
Server written in Python using aiohttp
Proxy written in Go
Traitlets heavily used for configuration
Declarative
Automatic validation
Configuration as code
Same config as Jupyter ecosystem (https://github.com/ipython/traitlets)
from traitlets.config import Configurable
from traitlets import Float

class Rectangle(Configurable):
    width = Float(
        1,
        min=0,
        help="The width of the rectangle in meters",
        config=True,
    )
    height = Float(
        1,
        min=0,
        help="The height of the rectangle in meters",
        config=True,
    )

    def area(self):
        return self.width * self.height
Configuration
# config.py
c.Rectangle.width = 10
c.Rectangle.height = 25
Usage
>>> r = Rectangle(config=c)
>>> r.width
10.0
>>> r.height
25.0
>>> r.area()
250.0
Configuration
# config.py
c.Rectangle.width = 10
c.Rectangle.height = -1  # invalid value
Usage
>>> r = Rectangle(config=c)
...
TraitError: The value of the 'height' trait of a Rectangle instance
should not be less than 0, but a value of -1.0 was specified
# config.py
import sys

# Support wider rectangles on windows:
if sys.platform == "win32":
    c.Rectangle.width = 20
else:
    c.Rectangle.width = 10
c.Rectangle.height = 10
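The declarative validation above comes from traits being descriptors. A minimal stdlib-only sketch of the idea (no config loading, help text, or defaults machinery, unlike real traitlets):

```python
class BoundedFloat:
    """A minimal descriptor mimicking traitlets' Float(min=...) check.

    A sketch of the idea only -- real traitlets also handles config
    files, help text, observers, and cross-validation.
    """
    def __init__(self, default, min=None):
        self.default, self.min = float(default), min

    def __set_name__(self, owner, name):
        self.name = name

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        return obj.__dict__.get(self.name, self.default)

    def __set__(self, obj, value):
        value = float(value)
        if self.min is not None and value < self.min:
            raise ValueError(
                f"{self.name!r} should not be less than {self.min}, "
                f"but a value of {value} was specified")
        obj.__dict__[self.name] = value

class Rectangle:
    width = BoundedFloat(1, min=0)
    height = BoundedFloat(1, min=0)

    def area(self):
        return self.width * self.height
```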
Wanted support for multiple backends
Solution:
traitlets.Type
See dask_gateway_server.backends.base for the full definition.

class Backend(LoggingConfigurable):
    async def setup(self, app):
        """Setup the backend, called on startup"""

    async def cleanup(self):
        """Cleanup the backend, called on shutdown"""

    async def list_clusters(self, username=None, statuses=None):
        """List clusters, with optional filtering"""

    async def get_cluster(self, cluster_name, wait=False):
        """Lookup a cluster"""

    async def start_cluster(self, user, cluster_options):
        """Submit a new cluster."""

    async def stop_cluster(self, cluster_name, failed=False):
        """Stop a cluster."""

    async def on_cluster_heartbeat(self, cluster_name, msg):
        """Handle heartbeats from the cluster."""
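To make the contract concrete, here is a toy in-memory class mirroring a few of these hooks. It deliberately does not import dask_gateway_server; a real backend would launch schedulers and workers on YARN, Kubernetes, etc.:

```python
import asyncio

class InMemoryBackend:
    """Toy backend mirroring part of the Backend interface, in memory.

    Illustrative only: it just tracks cluster records in a dict,
    with no real scheduler/worker processes behind them.
    """
    def __init__(self):
        self.clusters = {}
        self.counter = 0

    async def start_cluster(self, user, cluster_options):
        self.counter += 1
        name = f"{user}-{self.counter}"
        self.clusters[name] = {
            "user": user, "status": "running", "options": cluster_options,
        }
        return name

    async def stop_cluster(self, cluster_name, failed=False):
        self.clusters[cluster_name]["status"] = (
            "failed" if failed else "stopped"
        )

    async def list_clusters(self, username=None, statuses=None):
        return [
            name for name, c in self.clusters.items()
            if (username is None or c["user"] == username)
            and (statuses is None or c["status"] in statuses)
        ]

async def demo():
    backend = InMemoryBackend()
    name = await backend.start_cluster("alice", {})
    running = await backend.list_clusters(statuses={"running"})
    await backend.stop_cluster(name)
    return name, running
```

Running `asyncio.run(demo())` walks a cluster through its lifecycle: start, list, stop.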
See dask_gateway_server.backends.db_base for the full definition.

class DBBackendBase(Backend):
    async def do_start_cluster(self, cluster):
        """Start a cluster."""

    async def do_stop_cluster(self, cluster):
        """Stop a cluster."""

    async def do_check_clusters(self, clusters):
        """Check the status of multiple clusters."""

    async def do_start_worker(self, worker):
        """Start a worker."""

    async def do_stop_worker(self, worker):
        """Stop a worker."""

    async def do_check_workers(self, workers):
        """Check the status of multiple workers."""
# Use the Kubernetes backend
c.DaskGateway.backend_class = (
    "dask_gateway_server.backends.kubernetes.KubeBackend"
)

# Specify which base image to use
c.KubeClusterConfig.image = "myuser/my-dask-gateway-image:latest"

# Specify worker resources
c.KubeClusterConfig.worker_cores = 2
c.KubeClusterConfig.worker_memory = "8 G"
...
Dask uses raw TCP to communicate
Most proxies either route HTTP traffic (where the request can be inspected to pick a backend), or forward raw TCP to a single fixed backend
We need to proxy TLS (encrypted TCP) to multiple backends
SNI (Server Name Indication)
"an extension to the Transport Layer Security (TLS) computer networking protocol by which a client indicates which hostname it is attempting to connect to at the start of the handshaking process." (Wikipedia)
Allows routing TLS encrypted traffic based on an unencrypted field in handshake
On each connection:
Extract handshake message, extract SNI
Check SNI, rejecting clients with invalid SNIs
Connect to backend for specified SNI
Replay extracted handshake to backend
Copy bytes between sockets until connection closes (fast!)
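The "extract SNI" step can be illustrated in Python (the real proxy is written in Go; this simplified parser assumes the whole ClientHello arrives in one TLS record and skips the error handling a production proxy needs):

```python
import struct

def extract_sni(record):
    """Pull the SNI hostname out of a raw TLS ClientHello record.

    Simplified sketch: assumes one complete, unfragmented record, and
    returns None if no server_name extension is found.
    """
    if record[0] != 0x16:                 # 0x16 = TLS handshake record
        return None
    pos = 5                               # skip record header
    if record[pos] != 0x01:               # 0x01 = ClientHello
        return None
    pos += 4                              # handshake type + 3-byte length
    pos += 2 + 32                         # client version + random
    pos += 1 + record[pos]                # session id
    pos += 2 + struct.unpack_from(">H", record, pos)[0]  # cipher suites
    pos += 1 + record[pos]                # compression methods
    end = pos + 2 + struct.unpack_from(">H", record, pos)[0]
    pos += 2                              # extensions length
    while pos < end:
        ext_type, ext_len = struct.unpack_from(">HH", record, pos)
        pos += 4
        if ext_type == 0:                 # 0 = server_name extension
            # skip list length (2) and name type (1), read name length (2)
            name_len = struct.unpack_from(">H", record, pos + 3)[0]
            return record[pos + 5:pos + 5 + name_len].decode("ascii")
        pos += ext_len
    return None
```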
Bundled with the dask-gateway-server wheel:
Wanted an easy install
Added as "package-data" (setuptools docs)
# setup.py
setup(
    ...,
    package_data={
        "dask_gateway_server": ["proxy/dask-gateway-proxy"],
    },
    ...,
)
Centrally Managed
Secure by default
Flexible
Robust to Failure
Designed with institutional users/teams in mind
Previous deployment options still quite useful, and may be preferred by individuals and small teams
Finishing up the rewrite
Web UI
User driven features (come talk to me/file an issue)