Jim Crist-Harif

Dask and Scikit-Learn -- Putting it all together

Posted on July 26, 2016

Note: This post is old, and discusses an experimental library that no longer exists. Please see this post on dask-searchcv, and the corresponding documentation for the current state of things.

This is part 3 of a series of posts discussing recent work with dask and scikit-learn.

In this post we'll combine these concepts to do distributed learning and grid search on a real dataset: the airline dataset. This contains information on every flight in the USA between 1987 and 2008. At roughly 121 million rows it could still be worked with on a single machine; we use it here for illustration purposes only.

I'm running this on a 4 node cluster of m3.2xlarge instances (8 cores, 30 GB RAM each), with one worker sharing a node with the scheduler.

from distributed import Executor, progress
exc = Executor('172.31.11.71:8786', set_as_default=True)
exc

Loading the data

I've put the CSV files up on S3 for faster access on the AWS cluster. We can read them into a dask dataframe using read_csv. The usecols keyword specifies the subset of columns we want to use, and blocksize tells dask to partition the data into larger partitions than the default.

import dask
import dask.dataframe as dd

# Subset of the columns to use
cols = ['Year', 'Month', 'DayOfWeek', 'DepDelay',
        'CRSDepTime', 'UniqueCarrier', 'Origin', 'Dest']

# Create the dataframe
df = dd.read_csv('s3://dask-data/airline-data/*.csv',
                 usecols=cols,
                 blocksize=int(128e6),
                 storage_options=dict(anon=True))
df
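The blocksize choice determines how many partitions we end up with; with these files it works out to 104 (you can also see this in the dataframe repr below). Checking is a plain attribute lookup, so it doesn't trigger any computation:

df.npartitions
104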

Note that this hasn't done any work yet; we've just built up a graph specifying where to load the dataframe from. Before we actually compute, let's do a few more preprocessing steps:

- Convert the scheduled departure time CRSDepTime (an integer like 1430 for 2:30 PM) into an integer hour column. Times of 2400 are clipped to 2399 so they fall into hour 23.
- Create an integer delayed column indicating whether the flight departed more than 15 minutes late. Missing delays are filled with 16, counting them as delayed.
- Drop the DepDelay and CRSDepTime columns, which are no longer needed.

These can all be done using normal pandas operations:

df = (df.drop(['DepDelay', 'CRSDepTime'], axis=1)
        .assign(hour=df.CRSDepTime.clip(upper=2399)//100,
                delayed=(df.DepDelay.fillna(16) > 15).astype('i8')))
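To see what these transformations do, here's a quick check of the same operations on plain pandas values (an illustration of mine, not part of the original analysis):

import pandas as pd

s = pd.Series([630, 1430, 2400])        # scheduled departure times as HHMM
s.clip(upper=2399) // 100               # -> 6, 14, 23 (2400 wraps into hour 23)

delay = pd.Series([5.0, None, 45.0])    # departure delays in minutes
(delay.fillna(16) > 15).astype('i8')    # -> 0, 1, 1 (missing counts as delayed)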

After setting up all the preprocessing, we can call Executor.persist to fully compute the dataframe and cache it in memory across the cluster. We do this because the data can easily fit in distributed memory, and having it there will speed up all of the remaining steps (I/O can be expensive).

df = exc.persist(df)

progress(df, notebook=False)
[########################################] | 100% Completed | 1min 17.7s
df
dd.DataFrame<_assign..., npartitions=104>
len(df) / 1e6   # Number of rows in millions
121.232833
df.head()
   Year  Month  DayOfWeek UniqueCarrier Origin Dest  delayed  hour
0  1987     10          3            PS    SAN  SFO        0     7
1  1987     10          4            PS    SAN  SFO        0     7
2  1987     10          6            PS    SAN  SFO        0     7
3  1987     10          7            PS    SAN  SFO        0     7
4  1987     10          1            PS    SAN  SFO        1     7

So we have roughly 121 million rows, each with 8 columns. Looking at the columns, you can see that each variable is categorical in nature, which will be important later on.

Exploratory Plotting

Now that we have the data loaded into memory on the cluster, most dataframe computations will run much faster. Let's do some exploratory plotting to see the relationship between certain features and our target variable.

# Define some aggregations to plot
aggregations = (df.groupby('Year').delayed.mean(),
                df.groupby('Month').delayed.mean(),
                df.groupby('hour').delayed.mean(),
                df.groupby('UniqueCarrier').delayed.mean().nlargest(15))

# Compute them all in a single pass over the data
(delayed_by_year,
 delayed_by_month,
 delayed_by_hour,
 delayed_by_carrier) = dask.compute(*aggregations)

Plotting these series with bokeh gives:

[Figure: fraction of delayed flights grouped by Year, Month, hour, and UniqueCarrier]
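The plotting code isn't shown here; a minimal sketch of how one of these panels might be drawn with bokeh (the sizing and labels are my own):

from bokeh.plotting import figure, show

p = figure(title='Fraction of flights delayed by hour',
           plot_width=600, plot_height=300)
p.line(delayed_by_hour.index, delayed_by_hour.values, line_width=2)
p.circle(delayed_by_hour.index, delayed_by_hour.values, size=6)
p.xaxis.axis_label = 'Scheduled departure hour'
p.yaxis.axis_label = 'Fraction delayed'
show(p)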

The hour one is especially interesting, as it shows an increase in frequency of delays over the course of each day. This is possibly due to a cascading effect, as earlier delayed flights cause problems for later flights waiting on the plane/gate.

Extract Features

As we saw above, all of our columns are categorical in nature. To make the best use of these, we need to "one-hot encode" the feature columns. Scikit-learn provides a built-in class for doing this (OneHotEncoder), but it doesn't play as well with dataframe inputs. Instead of making a new class, we can do the same operation using some simple functions and the dask delayed interface.

Here we define a function one_hot_encode to transform a given pandas.DataFrame into a sparse matrix, given a mapping of categories for each column. We've decorated the function with dask.delayed, which makes the function return a lazy object instead of computing immediately.

import pandas as pd
import numpy as np
from dask import delayed
from scipy import sparse

def one_hot_encode_series(s, categories, dtype='f8'):
    """Transform a pandas.Series into a sparse matrix by one-hot-encoding
    for the given `categories`"""
    cat = pd.Categorical(s, np.asarray(categories))
    codes = cat.codes
    n_features = len(cat.categories)
    n_samples = codes.size
    mask = codes != -1
    if np.any(~mask):
        raise ValueError("unknown categorical features present %s "
                        "during transform." % np.unique(s[~mask]))
    row_indices = np.arange(n_samples, dtype=np.int32)
    col_indices = codes
    data = np.ones(row_indices.size)
    return sparse.coo_matrix((data, (row_indices, col_indices)),
                             shape=(n_samples, n_features),
                             dtype=dtype).tocsr()


@delayed(pure=True)
def one_hot_encode(df, categories, dtype='f8'):
    """One-hot-encode a pandas.DataFrame.

    Parameters
    ----------
    df : pandas.DataFrame
    categories : dict
        A mapping of column name to a sequence of categories for the column.
    dtype : str, optional
        The dtype of the output array. Default is 'float64'.
    """
    arrs = [one_hot_encode_series(df[col], cats, dtype=dtype)
            for col, cats in sorted(categories.items())]
    return sparse.hstack(arrs).tocsr()
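As a quick sanity check, here's what one_hot_encode produces on a toy frame (illustrative only). Since the function is wrapped in delayed, we call compute to get the concrete result:

toy = pd.DataFrame({'color': ['red', 'blue', 'red']})
mat = one_hot_encode(toy, {'color': ['blue', 'red']}).compute()
mat.toarray()
array([[ 0.,  1.],
       [ 1.,  0.],
       [ 0.,  1.]])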

To use this function, we need to get a dict mapping column names to their categorical values. This isn't ideal, as it requires a complete pass over the data before transformation. Since the data is already in memory though, this is fairly quick to do:

# Extract categories for each feature
categories = dict(Year=np.arange(1987, 2009),
                  Month=np.arange(1, 13),
                  DayOfWeek=np.arange(1, 8),
                  hour=np.arange(24),
                  UniqueCarrier=df.UniqueCarrier.unique(),
                  Origin=df.Origin.unique(),
                  Dest=df.Dest.unique())

# Compute all the categories in one pass
categories = delayed(categories).compute()

Finally, we can build up our feature and target matrices, as two dklearn.matrix.Matrix objects.

import dklearn.matrix as dm

# Convert the series `delayed` into a `Matrix`
y = dm.from_series(df.delayed)

# `to_delayed` returns a list of `dask.Delayed` objects, each representing
# one partition in the total `dask.dataframe`
chunks = df.to_delayed()

# Apply `one_hot_encode` to each chunk, and then convert all the
# chunks into a `Matrix`
X = dm.from_delayed([one_hot_encode(x, categories) for x in chunks],
                    dtype='f8')
X, y
(dklearn.matrix, dklearn.matrix)

Extract train-test splits

Now that we have our feature and target matrices, we're almost ready to start training an estimator. The last thing we need to do is hold back some of the data for testing later on. To do that, we can use the train_test_split function, which mirrors the scikit-learn function of the same name. We'll hold back 20% of the rows:

from dklearn.cross_validation import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

Create an estimator

As discussed in the previous post, dask-learn contains several parallelization patterns for wrapping scikit-learn estimators. These work with any estimator that matches the standard scikit-learn interface. Here we'll wrap a SGDClassifier with the Averaged wrapper. This will fit the classifier on each partition of the data, and then average the resulting coefficients to produce a final estimator.

from dklearn import Averaged
from sklearn.linear_model import SGDClassifier

est = Averaged(SGDClassifier())
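To make the averaging pattern concrete, here's a rough local sketch of the idea (my own illustration, not dklearn's actual implementation, which runs the per-partition fits in parallel across the cluster):

import numpy as np
from copy import deepcopy
from sklearn.base import clone

def averaged_fit(est, partitions):
    """Fit a clone of `est` on each (X, y) partition, then average the
    fitted coefficients into a single final estimator."""
    fitted = [clone(est).fit(X, y) for X, y in partitions]
    final = deepcopy(fitted[0])
    final.coef_ = np.mean([f.coef_ for f in fitted], axis=0)
    final.intercept_ = np.mean([f.intercept_ for f in fitted], axis=0)
    return final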

Distributed grid search

The SGDClassifier also takes several hyperparameters; we'll do a grid search across a few of them to try to pick the best combination. Note that the dask-learn version of GridSearchCV mirrors the interface of the scikit-learn equivalent. We'll also pass in an instance of RandomSplit to use for cross-validation instead of the default KFold. We do this because the samples are ordered approximately chronologically, and KFold would result in training data with potentially missing years.

from dklearn.cross_validation import RandomSplit
from dklearn.grid_search import GridSearchCV

grid = {'alpha': [0.0001, 0.001],
        'loss': ['log', 'hinge']}

search = GridSearchCV(est, grid,
                      cv=RandomSplit(n_iter=3, test_size=0.2))
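With two values for each of two hyperparameters and three cross-validation splits, the search fits 12 models in total before the final refit. A quick way to count the combinations (ParameterGrid lives in sklearn.grid_search in the scikit-learn of this era, and in sklearn.model_selection in modern versions):

from sklearn.model_selection import ParameterGrid

len(list(ParameterGrid(grid)))   # 4 combinations x 3 splits = 12 fits
4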

Finally, we can call fit and perform the grid search. As discussed in part 1, GridSearchCV isn't lazy due to some complications with the scikit-learn API, so this call to fit runs immediately.

search.fit(X_train, y_train)

What's happening here is:

- For each of the 4 parameter combinations and each of the 3 random cross-validation splits, an Averaged SGDClassifier is fit on the training portion of the split and scored on the held-out portion.
- The parameter combination with the best average score across the splits is selected.
- A final estimator is refit with those parameters on all of the training data.

Note that this is all run in parallel, distributed across the cluster. Thanks to the slick web interface, you can watch it compute. This takes a couple of minutes; here's a gif of just the start:

[Animation: the distributed web UI during the grid search]

Once all the computation is done, we can see the best score and parameters by checking the best_score_ and best_params_ attributes (continuing to mirror the scikit-learn interface):

search.best_score_
0.83144082308054479
search.best_params_
{'alpha': 0.0001, 'loss': 'hinge'}

So we got ~83% accuracy on predicting if a flight was delayed based on these features. The best estimator (after refitting on all of the training data) is stored in the best_estimator_ attribute:

search.best_estimator_
SGDClassifier(alpha=0.0001, average=False, class_weight=None, epsilon=0.1,
       eta0=0.0, fit_intercept=True, l1_ratio=0.15, learning_rate='optimal',
       loss='hinge', n_iter=5, n_jobs=1, penalty='l2', power_t=0.5,
       random_state=None, shuffle=True, verbose=0, warm_start=False)

Scoring the model

To evaluate our model, we can check its performance on the testing data we held out originally. This illustrates an issue with the current design: both predict and many of the scoring functions are parallelizable, but since we don't know which metric a given estimator uses in its score method, we can't easily dispatch to a parallel implementation. For now, we can call predict (which we can parallelize) and then compute the score locally using the accuracy_score function (the default metric for classifiers).

from sklearn.metrics import accuracy_score

# Call `predict` in parallel on the testing data
y_pred = search.predict(X_test)

# Compute the actual and predicted targets
actual, predicted = dask.compute(y_test, y_pred)

# Score locally
accuracy_score(actual, predicted)
0.83146828311187182

So the overall accuracy score is ~83%, roughly the same as the best score from GridSearchCV above. This may sound good, but it's actually equivalent to the strategy of predicting that no flights are delayed (roughly 83% of all flights leave on time).

# Compute the fraction of flights that aren't delayed.
s = df.delayed.value_counts().compute()
s[0].astype('f8') / s.sum()
0.83151615151172298
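Equivalently (my own addition, assuming actual is the computed array of 0/1 labels from above), we can score the trivial all-on-time strategy directly:

import numpy as np

# Predict "not delayed" for every flight in the test set
accuracy_score(actual, np.zeros_like(actual))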

I'm sure there is a better way to fit this data (I'm not a machine learning expert). However, the point of this blog post was to show how to use dask to create these workflows, and I hope I was successful in that at least.

What worked well

What could be better

Help

I am not a machine learning expert. Is any of this useful? Do you have suggestions for improvements (or better yet PRs for improvements :))? Please feel free to reach out in the comments below, or on github.

This work is supported by Continuum Analytics and the XDATA program as part of the Blaze Project.