Jim Crist
Continuum Analytics
OSBD Workshop, 12/05/16
Scientific Python goes back to 1995
| Scale Up | Scale Out |
|---|---|
| C, C++, Fortran, Cython, ... | MPI |
| CUDA-C, OpenCL | Spark, MapReduce, ... |
A parallel computing framework
Written in pure Python
That leverages the excellent Python ecosystem
Using blocked algorithms and task scheduling
Blocked Mean
import h5py
import numpy as np

x = h5py.File('data.hdf5', 'r')['x']   # Trillion element array on disk
sums = []
counts = []
N = 1000000
for i in range(N):                     # One million times
    chunk = x[N*i: N*(i+1)]            # Pull out chunk
    sums.append(np.sum(chunk))         # Sum chunk
    counts.append(len(chunk))          # Count chunk
result = sum(sums) / sum(counts)       # Aggregate results
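The loop above can be checked at small scale without HDF5 or NumPy. This is a minimal sketch, assuming an in-memory list stands in for the on-disk array; `blocked_mean` and `chunk_size` are illustrative names and not part of any library.

```python
def blocked_mean(data, chunk_size):
    """Mean of `data`, computed one chunk at a time."""
    sums = []
    counts = []
    for start in range(0, len(data), chunk_size):
        chunk = data[start:start + chunk_size]  # pull out one chunk
        sums.append(sum(chunk))                 # partial sum
        counts.append(len(chunk))               # partial count
    return sum(sums) / sum(counts)              # aggregate the partials


data = list(range(10))          # stand-in for the array on disk
result = blocked_mean(data, 3)  # chunks of 3, 3, 3, and 1 elements
```

Only one chunk plus two short lists of partial results are live at any time, which is what lets the same approach work when the full array does not fit in memory.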
x.mean()
x + x.T
x.dot(x.T + 1)
x.dot(x.T + 1) - x.mean()
(x.dot(x.T + 1) - x.mean()).std()
U, S, V = da.linalg.svd(x + x.T - x.mean())
# Reductions
df.passenger_count.mean()
# Groupby operations
df.groupby(df.passenger_count).trip_distance.mean()
# Timeseries operations
df.trip_distance.resample('h').mean()
# many many more...
# Wrap functions to make them lazy
delayed(function)(*args, **kwargs) -> Delayed
# Wrap data to make attribute access lazy
delayed(data) -> Delayed
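To make the idea of a lazy wrapper concrete, here is a toy sketch in plain Python. It is not how `dask.delayed` is implemented; `Lazy` and `lazy` are hypothetical names, and the sketch assumes positional arguments only and re-evaluates shared inputs rather than caching them.

```python
class Lazy:
    """Toy stand-in for a lazy value: records a call instead of running it."""

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def compute(self):
        # Evaluate any lazy arguments first, then call the function.
        args = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*args)


def lazy(func):
    """Wrap `func` so that calling it returns a Lazy instead of a result."""
    def wrapper(*args):
        return Lazy(func, args)
    return wrapper


def inc(x):
    return x + 1


def add(x, y):
    return x + y


total = lazy(add)(lazy(inc)(1), lazy(inc)(2))  # nothing has run yet
value = total.compute()                        # runs inc, inc, then add
```

Because each wrapped call only records its function and inputs, the whole computation is available as a graph before any work starts, which is what gives a scheduler room to run independent pieces in parallel.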
results = {}
for a in A:
    for b in B:
        for c in C:
            r1 = fit1(x, a)
            r2 = fit2(r1, b)
            r3 = fit3(r2, c)
            results[a, b, c] = r3
best = score(results)
from dask import delayed, compute
results = {}
for a in A:
    for b in B:
        for c in C:
            r1 = delayed(fit1)(x, a)
            r2 = delayed(fit2)(r1, b)
            r3 = delayed(fit3)(r2, c)
            results[a, b, c] = r3
best = delayed(score)(results)
result = best.compute()
Parallel CPU: Uses multiple threads or processes
Minimizes RAM: Choose tasks to remove intermediates
Low overhead: ~100 µs per task
Concise: ~600 LOC, stable for ~12 months
Distributed: One scheduler coordinates many workers
Data local: Tries to move computation to the "best" worker
Asynchronous: Continuous non-blocking conversation
Multi-user: Several users can share the same system
HDFS Aware: Works well with HDFS, S3, YARN, etc.
Less Concise: ~3000 LOC Tornado TCP application
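Both schedulers described above execute a task graph. As a rough illustration of the idea, here is a toy scheduler built on the standard library's thread pool. It is a sketch under stated assumptions, not Dask's scheduler: the graph is a dict whose values are either literals or `(function, arg, ...)` tuples, string arguments that match a key are treated as dependencies, and the helper names `is_task` and `get` are chosen for this example.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def inc(x):
    return x + 1


def add(x, y):
    return x + y


# key -> literal value, or (function, arg, ...) where args may name other keys
graph = {
    "x": 1,
    "y": (inc, "x"),
    "z": (inc, "x"),
    "total": (add, "y", "z"),
}


def is_task(value):
    return isinstance(value, tuple) and len(value) > 0 and callable(value[0])


def get(graph, key, max_workers=4):
    """Run every task in `graph` on a thread pool and return the value of `key`."""
    def is_dep(arg):
        return isinstance(arg, str) and arg in graph

    results = {k: v for k, v in graph.items() if not is_task(v)}
    pending = {k: v for k, v in graph.items() if is_task(v)}
    running = {}  # future -> key
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while pending or running:
            # Submit every task whose dependencies have all been computed.
            for k in list(pending):
                func, *args = pending[k]
                if all(a in results for a in args if is_dep(a)):
                    del pending[k]
                    resolved = [results[a] if is_dep(a) else a for a in args]
                    running[pool.submit(func, *resolved)] = k
            if not running:
                raise ValueError("graph has a cycle")
            # Block until at least one running task finishes.
            done, _ = wait(list(running), return_when=FIRST_COMPLETED)
            for future in done:
                results[running.pop(future)] = future.result()
    return results[key]


answer = get(graph, "total")
```

Here `y` and `z` depend only on `x`, so they can run at the same time, while `total` waits for both. A production scheduler also has to decide which ready task to run first and when to release intermediates, which this sketch ignores.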
A type-specializing JIT for (numeric) Python
Translate Python syntax to machine code
Code generation done with:
conda/pip installable
$ conda install dask distributed -c conda-forge
$ pip install dask[complete] distributed --upgrade
$ conda install numba
>>> from dask.distributed import Client
>>> e = Client() # sets up local cluster
$ dask-scheduler
$ dask-worker scheduler-hostname:8786
$ dask-worker scheduler-hostname:8786