
Dask-Gateway

Dask clusters as a service

Jim Crist

jiminy_crist // jcrist


Anaconda Inc.

1 / 39

The state of deployments

2 / 39

Cluster Manager

  • In charge of deploying Dask schedulers/workers on a cluster backend

  • Many different implementations

  • Standard interface:

    # Scale to n workers
    cluster.scale(n)
    # Adaptively scale between min and max workers
    cluster.adapt(min, max)
3 / 39

Cluster Manager - Example

Example using dask-yarn

# Create a cluster backed by YARN
from dask_yarn import YarnCluster
cluster = YarnCluster(
    environment='environment.tar.gz',
    worker_vcores=2,
    worker_memory="8GiB",
)

# Scale to 2 workers
cluster.scale(2)

# Connect to the cluster
from dask.distributed import Client
client = Client(cluster)

# Application code...
import dask.dataframe as dd
df = dd.read_parquet(...)
...
4 / 39

Cluster Manager - Example

Kubernetes: dask-kubernetes

>>> from dask_kubernetes import KubeCluster
>>> cluster = KubeCluster(...)

YARN: dask-yarn

>>> from dask_yarn import YarnCluster
>>> cluster = YarnCluster(...)

HPC Clusters: dask-jobqueue

>>> from dask_jobqueue import PBSCluster
>>> cluster = PBSCluster(...)
6 / 39

Status Quo - Pros

  • Deployment is part of the user's code; no need for extra CLI tools.

    • Work from scripts, notebooks, applications, etc...
  • No need for extra infrastructure, plays well with deployment backends

  • Just Python libraries - easy to install

  • Extensible design, anyone can make a backend

7 / 39

Status Quo - Cons

  • Requires domain-specific knowledge of backend

    • Difficult for new users
  • Users must have permissions and network access to the cluster backend

    • May require users to have excessive permissions (a security risk)
    • Client usually must be on the same system as the scheduler/workers
  • No central management

    • Must rely on backend for tracking user actions and resource usage
    • Hard to enforce good practices
  • Clusters are tied to the lifetime of the client

    • Restart your notebook, restart the cluster
8 / 39

9 / 39

Dask-Gateway

10 / 39

Dask-Gateway

  • REST API for managing clusters (client sketch below)

  • Proxy for client-to-scheduler traffic (TLS)

  • Proxy for dashboards (HTTP)

  • Flexible design

    • Configurable backend (Kubernetes, YARN, HPC, ...)
    • Configurable authentication (Kerberos, JupyterHub, ...)
  • Most actions done server-side (simple client, more complicated server)
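For a sense of the API surface, a minimal client-side sketch (the address below is a placeholder; the Gateway client is essentially a thin wrapper over the REST API):

from dask_gateway import Gateway

# Connect to the gateway server
gateway = Gateway("https://mygateway.address.com")

# List your running clusters (a GET against the REST API)
gateway.list_clusters()

# Start a new cluster (a POST), then scale it and shut it down
cluster = gateway.new_cluster()
cluster.scale(2)
cluster.shutdown()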

11 / 39

Using Dask-Gateway

# Previous example using dask-yarn...
# Create a cluster backed by YARN
from dask_yarn import YarnCluster
cluster = YarnCluster(
    environment='environment.tar.gz',
    worker_vcores=2,
    worker_memory="8GiB",
)

# Scale to 2 workers
cluster.scale(2)

# Connect to the cluster
from dask.distributed import Client
client = Client(cluster)

# Application code...
import dask.dataframe as dd
df = dd.read_parquet(...)
...
12 / 39

Using Dask-Gateway

# Same example using dask-gateway
# Create a cluster backed by dask-gateway
from dask_gateway import GatewayCluster
cluster = GatewayCluster(
    address="https://mygateway.address.com",
    worker_vcores=2,
    worker_memory="8GiB",
)

# Scale to 2 workers
cluster.scale(2)

# Connect to the cluster
from dask.distributed import Client
client = Client(cluster)

# Application code...
import dask.dataframe as dd
df = dd.read_parquet(...)
...
13 / 39

Demo

14 / 39

Additional Features

  • User resource limits (config sketch below)

  • Automatic shutdown of idle clusters

  • Strong interop with JupyterHub

  • ...
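These features are all driven by server-side configuration. A hedged sketch of what that might look like (option names are from memory and may differ between versions; check the docs):

# dask_gateway_config.py
# Per-user resource limits
c.UserLimits.max_clusters = 2
c.UserLimits.max_cores = 40
c.UserLimits.max_memory = "160 G"

# Shut down clusters idle for more than 30 minutes
c.ClusterConfig.idle_timeout = 1800

# Authenticate users against a JupyterHub instance
c.DaskGateway.authenticator_class = (
    "dask_gateway_server.auth.JupyterHubAuthenticator"
)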

15 / 39

Internals

16 / 39

*Currently being rewritten

17 / 39

Internals - Highlights

  • Server written in Python using aiohttp

  • Proxy written in Go

  • Traitlets heavily used for configuration

18 / 39

Traitlets!

19 / 39

Traitlets - Example

from traitlets.config import Configurable
from traitlets import Float

class Rectangle(Configurable):
    width = Float(
        1,
        min=0,
        help="The width of the rectangle in meters",
        config=True,
    )
    height = Float(
        1,
        min=0,
        help="The height of the rectangle in meters",
        config=True,
    )

    def area(self):
        return self.width * self.height
20 / 39

Traitlets - Example

Configuration

# config.py
c.Rectangle.width = 10
c.Rectangle.height = 25

Usage

>>> r = Rectangle(config=c)
>>> r.width
10.0
>>> r.height
25.0
>>> r.area()
250.0
21 / 39

Traitlets - Validation

Configuration

# config.py
c.Rectangle.width = 10
c.Rectangle.height = -1 # invalid value

Usage

>>> r = Rectangle(config=c)
...
TraitError: The value of the 'height' trait of a Rectangle instance
should not be less than 0, but a value of -1.0 was specified
22 / 39

Traitlets - Custom logic

# config.py
import sys

# Support wider rectangles on windows:
if sys.platform == "win32":
    c.Rectangle.width = 20
else:
    c.Rectangle.width = 10
c.Rectangle.height = 10
23 / 39

Configurable Backends

  • Wanted support for multiple backends

    • Kubernetes
    • YARN
    • HPC Job Queues
    • ...
24 / 39

Configurable Backends

  • Wanted support for multiple backends

    • Kubernetes
    • YARN
    • HPC Job Queues
    • ...
  • Solution:

    • Abstract backend-specifics out into a base class
    • Implement class for various backends
    • Make the backend class configurable using traitlets.Type (sketch below).
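A minimal, self-contained sketch of that traitlets.Type pattern (the class names here are illustrative stand-ins, not the actual dask-gateway-server classes):

from traitlets import Type
from traitlets.config import Config, Configurable

class Backend:                 # stand-in for the abstract base class
    pass

class KubeBackend(Backend):    # stand-in for one concrete implementation
    pass

class DaskGateway(Configurable):
    backend_class = Type(
        default_value=Backend,
        klass=Backend,
        help="The cluster backend class to use",
        config=True,
    )

c = Config()
c.DaskGateway.backend_class = KubeBackend  # or an import string

gateway = DaskGateway(config=c)
backend = gateway.backend_class()          # instantiates the configured class

As on the configuration slide later, the real server takes the backend class as an import string, which traitlets resolves to a class at startup.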
25 / 39

Backend Base Class

class Backend(LoggingConfigurable):
    async def setup(self, app):
        """Setup the backend, called on startup"""

    async def cleanup(self):
        """Cleanup the backend, called on shutdown"""

    async def list_clusters(self, username=None, statuses=None):
        """List clusters, with optional filtering"""

    async def get_cluster(self, cluster_name, wait=False):
        """Lookup a cluster"""

    async def start_cluster(self, user, cluster_options):
        """Submit a new cluster."""

    async def stop_cluster(self, cluster_name, failed=False):
        """Stop a cluster."""

    async def on_cluster_heartbeat(self, cluster_name, msg):
        """Handle heartbeats from the cluster."""
26 / 39

Database-backed Backend Base Class

class DBBackendBase(Backend):
    async def do_start_cluster(self, cluster):
        """Start a cluster."""

    async def do_stop_cluster(self, cluster):
        """Stop a cluster."""

    async def do_check_clusters(self, clusters):
        """Check the status of multiple clusters."""

    async def do_start_worker(self, worker):
        """Start a worker."""

    async def do_stop_worker(self, worker):
        """Stop a worker."""

    async def do_check_workers(self, workers):
        """Check the status of multiple workers."""
27 / 39

Backend - Configuration

# Use the Kubernetes backend
c.DaskGateway.backend_class = "dask_gateway_server.backends.kubernetes.KubeBackend"
# Specify which base image to use
c.KubeClusterConfig.image = "myuser/my-dask-gateway-image:latest"
# Specify worker resources
c.KubeClusterConfig.worker_cores = 2
c.KubeClusterConfig.worker_memory = "8 G"
...
28 / 39

Scheduler Proxy

  • Dask uses raw TCP to communicate

  • Most proxies either:

    • Proxy HTTP (layer 7), dispatching on the path in the HTTP request
    • Proxy TCP (layer 4) to a static address
  • We need to proxy TLS (encrypted TCP) to multiple backends

    • Could include routing in each message, but the proxy would need to decrypt traffic (expensive)
29 / 39

Solution: Use SNI

  • SNI (Server Name Indication)

    "an extension to the Transport Layer Security (TLS) computer networking protocol by which a client indicates which hostname it is attempting to connect to at the start of the handshaking process." (Wikipedia)

  • Allows routing TLS-encrypted traffic based on an unencrypted field in the handshake

30 / 39

Scheduler Proxy

  • On each connection:

    • Read the TLS handshake message off the socket and extract the SNI

    • Check SNI, rejecting clients with invalid SNIs

    • Connect to backend for specified SNI

    • Replay extracted handshake to backend

    • Copy bytes between sockets until connection closes (fast!)

31 / 39

Scheduler Proxy

  • Written in Go

    • Needed some low-level byte handling

    • Good support for handling many clients

    • Stdlib had features that made this easier to write (~1000 lines total)

    • Also wanted an excuse to write some Go

  • For the K8s backend rewrite we're using Traefik instead

32 / 39

Scheduler Proxy

  • Bundled with the dask-gateway-server wheel:

# setup.py
setup(
    ...,
    package_data={"dask_gateway_server": ["proxy/dask-gateway-proxy"]},
    ...
)
33 / 39

Conclusion

34 / 39

Dask-Gateway

  • Centrally Managed

    • Admins do the heavy lifting, users get easy cluster access.
  • Secure by default

    • Automatic TLS everywhere, no elevated user permissions.
  • Flexible

    • Configurable backends, authentication, etc...
  • Robust to Failure

    • Can be restarted or fail over without losing existing clusters.
35 / 39

Dask-Gateway

  • Designed with institutional users/teams in mind

  • Previous deployment options are still quite useful and may be preferred for individuals and small teams.

36 / 39

Ongoing Work

  • Finishing up the rewrite

    • Re-adding Kubernetes backend
    • Testing
    • Documentation
  • Web UI

    • Authentication
    • Dashboarding
    • Admin panel
  • User driven features (come talk to me/file an issue)

37 / 39

Questions?


39 / 39
