Key-Value Store

Every Skein application contains a key-value store running on the application master. This provides a way for containers to share runtime configuration parameters (e.g. service addresses), as well as coordinate state between containers.

Skein takes inspiration from other key-value stores (especially etcd), with simplifications and specializations for YARN as seen fit.

Overview

  • Keys are UTF-8 encoded strings (str in Python), with values as arbitrary raw bytes (bytes in Python).

  • The lifetime of a key or keys can be tied to the lifetime of a YARN container (referred to as the “owner” of the key). When the container completes, all keys owned by it will be deleted.

  • Operations work on single keys, ranges of keys, or keys starting with a specific prefix (e.g. all keys beginning with "foo").

  • Atomic bulk transactions are supported. Several operations can be applied in a single transaction, potentially based on a set of conditions.

  • Clients can subscribe to a range (or ranges) of keys for changes and react accordingly.

Walkthrough

To illustrate all the functionality provided by the key-value store, and how it might be useful, the remainder of this page is a complete runnable example. To start off, we’ll create a test application with a single container that sleeps forever.

>>> import skein
>>> spec = skein.ApplicationSpec.from_yaml("""
... name: example
... services:
...   sleeper:
...     resources:
...       memory: 128 MiB
...       vcores: 1
...     script: |
...       sleep infinity
... """)
>>> client = skein.Client()
>>> app = client.submit_and_connect(spec)

KeyValueStore Basics

The KeyValueStore class implements the standard MutableMapping interface. As mentioned above, all keys must be str, and all values must be bytes. All of these methods happen atomically in a single transaction, meaning there are no race conditions between one client checking state and another client modifying it.

To start off, let’s put some data on the key-value store. We’ll use the KeyValueStore.update() method to put multiple values in a single transaction:

# Originally the key-value store is empty
>>> len(app.kv)
0

# Add some data
>>> app.kv.update({'apples': b'1.22',
                   'apricots': b'8.99',
                   'bananas': b'0.56',
                   'oranges': b'7.96'})

# Now there are 4 items
>>> len(app.kv)
4

As with other mutable mappings, you can get, set, and delete values using the standard interface:

# Get the value for 'apples'
>>> app.kv['apples']
b'1.22'

# Add a new value for 'grapes'
>>> app.kv['grapes'] = b'2.88'

# Check if 'grapes' is present
>>> 'grapes' in app.kv
True

# Update the value for 'grapes'
>>> app.kv['grapes'] = b'2.61'

# Delete 'grapes'
>>> del app.kv['grapes']

Iteration

Since KeyValueStore supports the MutableMapping interface, things like iterating over the keys work as expected. The iteration order is alphabetical based on keys, as the mapping is sorted internally (more on that later).

>>> list(app.kv)
['apples', 'apricots', 'bananas', 'oranges']

However, sometimes using the standard iteration based protocols can unsafe or inefficient. Since the key-value store is a shared mapping with potentially many clients making changes concurrently, you need need to think about race-conditions when passing the key-value store to functions that might make use of iteration in a potentially unsafe way. For example, calling dict on the key-value store object works as expected, but is not the best way to get all key-value pairs for two reasons:

  • Interally, dict iterates over the keys in the provided mapping. It’s implemented something like:

    def dict(mapping):
        out = {}
        for k in mapping:
            out[k] = mapping[k]
        return out
    

    Since iterating over the keys and copying out their values happens over multiple operations, other clients may have changed the key-value store between operations, meaning the output dict may represent an inconsistent state.

  • Additionally, calling dict on the mapping results in multiple operations, reducing efficiency as it makes more calls than required.

For these reasons, it’s better to use methods that operate on Ranges and Prefixes instead:

>>> dict(app.kv)  # unsafe and inefficient
{'apples': b'1.22',
 'apricots': b'8.99',
 'bananas': b'0.56',
 'oranges': b'7.96'}

>>> app.kv.get_range()  # safe and efficient
OrderedDict([('apples', b'1.22'),
             ('apricots', b'8.99'),
             ('bananas', b'0.56'),
             ('oranges', b'7.96')])

Ranges and Prefixes

Key-value pairs are sorted internally, allowing for efficient operations on ranges of keys. Ranges are start inclusive and end exclusive. If start or end are unspecified, the range is unbounded on that side. Many methods have range versions, allowing for bulk operations in a single call. Operations on keys starting with a certain “prefix” are also supported. These are special cases of range operations (since get_prefix("abc") is equivalent to get_range(start="abc", end="abd")).

# Lookup all keys starting with "ap"
>>> app.kv.get_prefix("ap")
OrderedDict([('apples', b'1.22'),
             ('apricots', b'8.99')])

# Lookup all keys in ["apricots", "oranges")
>>> app.kv.get_range(start="apricots", end="oranges")
OrderedDict([('apricots', b'8.99'),
             ('bananas', b'0.56')])

# Pop all keys after "m"
>>> removed = app.kv.pop_range(start="m")
>>> removed
OrderedDict([('oranges', b'7.96')])

>>> len(app.kv)  # the key was removed
3

# Put it back
>>> app.kv.update(removed)
>>> len(app.kv)
4

Additional Methods

Besides the MutableMapping interface, the KeyValueStore class provides a larger set of methods for common operations. Each of these runs as a single atomic transaction, and most have variations for a single key, a range of keys, or a key prefix.

KeyValueStore.count([start, end, prefix])

Count keys in the key-value store.

KeyValueStore.list_keys([start, end, prefix])

Get a list of keys in the key-value store.

KeyValueStore.exists(key)

Check if a key exists in the key-value store.

KeyValueStore.missing(key)

Check if a key is not in the key-value store.

KeyValueStore.get(key[, default, return_owner])

Get the value associated with a single key.

KeyValueStore.get_prefix(prefix[, return_owner])

Get all key-value pairs whose keys start with prefix.

KeyValueStore.get_range([start, end, …])

Get a range of keys.

KeyValueStore.pop(key[, default, return_owner])

Remove a single key and return its corresponding value.

KeyValueStore.pop_prefix(prefix[, return_owner])

Remove all key-value pairs whose keys start with prefix, and return their corresponding values.

KeyValueStore.pop_range([start, end, …])

Remove a range of keys and return their corresponding values.

KeyValueStore.discard(key)

Discard a single key.

KeyValueStore.discard_prefix(prefix[, …])

Discard all key-value pairs whose keys start with prefix.

KeyValueStore.discard_range([start, end, …])

Discard a range of keys.

KeyValueStore.put(key[, value, owner])

Assign a value and/or owner for a single key.

KeyValueStore.swap(key[, value, owner, …])

Assign a new value and/or owner for a single key, and return the previous value.

Ownership

Skein’s key-value store provides a key ownership model. This allows the lifetime of a key or keys to be tied to the lifetime of a YARN container (referred to as the “owner” of the key). When the container finishes (whether after success, failure, or being killed by the user), any keys owned by that container are deleted. This can be useful for tracking container lifetimes, or implementing robust locks that are released when a container exits.

Owners are specified as skein container ids. These are different than their YARN counterparts, and are strings of the form {service-name}_{instance-number} (e.g. myservice_1). Container ids can be obtained in a few ways:

  • From a Container object:

    >>> [c.id for c in app.get_containers()]
    ['sleeper_0']
    
  • When running code on a container, the executing container id is available in the skein.properties object:

    # Example of code executing in a container
    >>> import skein
    >>> skein.properties.container_id
    'myservice_10'
    

To set or change the owner for a key, pass a container id to KeyVaueStore.put() or KeyValueStore.swap() via the owner keyword. Likewise the owner of a key or keys can be retrieved using the normal get methods by providing return_owner=True.

# First scale up the 'sleeper' service to 2 containers
>>> app.scale('sleeper', 2)
[Container<service_name='sleeper', instance=1, state=REQUESTED>]

# Create a new key 'grapes', and set the owner to 'sleeper_0'
>>> app.kv.put('grapes', b'2.88', owner='sleeper_0')

# Get the full value & owner record for 'grapes'
>>> app.kv.get('grapes', return_owner=True)
ValueOwnerPair(value=b'2.88', owner='sleeper_0')

# Change the value, owner stays the same
>>> app.kv.put('grapes', b'2.61')
>>> app.kv.get('grapes', return_owner=True)
ValueOwnerPair(value=b'2.61', owner='sleeper_0')

# Change the owner, value stays the same
>>> app.kv.put('grapes', owner='sleeper_1')
>>> app.kv.get('grapes', return_owner=True)
ValueOwnerPair(value=b'2.61', owner='sleeper_1')

# To clear the owner, set to None
>>> app.kv.put('grapes', owner=None)
>>> app.kv.get('grapes', return_owner=True)
ValueOwnerPair(value=b'2.61', owner=None)

When a container exits, all of its owned keys (if any) are deleted.

# Set the owner of grapes to 'sleeper_1'
>>> app.kv.put('grapes', owner='sleeper_1')

>>> 'grapes' in app.kv
True

# Kill the 'sleeper_1' container
>>> app.kill_container('sleeper_1')

>>> 'grapes' in app.kv
False

Transactions

Skein provides support for applying several operations in a single transaction, potentially based on a set of conditions. This is useful for preventing race conditions.

For example, say you want to get the value of a key if it exists, and if it doesn’t you want to set the value to a default and then return the default (this is the setdefault method from the MutableMapping interface).

A naive implementation would suffer from a couple race conditions:

def naive_setdefault(self, key, default):
    """A naive implementation of MutableMapping.setdefault"""
    if key in self:
        # Race condition 1: key could be deleted by a different client
        # between the contains check and the getitem
        return self[key]
    else:
        # Race condition 2: key could be created by a different client
        # between the contains check and the setitem
        self[key] = default
        return default

The KeyValueStore.transaction() method exists to solve this problem. This takes 3 parameters:

  • A sequence of Condition objects to evaluate together. The conditional expression succeeds if all conditions evaluate to True, and fails otherwise. If no conditions are provided the conditional expression also succeeds.

  • A sequence of Operation objects to apply if all conditions evaluate to True.

  • A sequence of Operation objects to apply if any condition evaluates to False.

Conditions

Condition objects are prechecks on the state of the key-value store. They can check for the existance or absence of a key, as well as comparisons (equality or order) on the value or owner corresponding with a key.

exists(key)

A request to check if a key exists in the key-value store.

missing(key)

A request to check if a key is not in the key-value store.

value(key)

Represents the value for a key, for use in transaction conditions.

owner(key)

Represents the owner for a key, for use in transaction conditions.

comparison(key, field, operator, rhs)

A comparison of the value or owner for a specified key.

A few examples

>>> from skein import kv

# A condition to check if 'grapes' exists
>>> kv.exists('grapes')
exists('grapes')

# A condition to check if the value of 'apples' is b'123'
>>> kv.value('apples') == b'123'
value('apples') == b'123'

# A condition to check if the value of 'apples' is greater than b'000'
>>> kv.value('apples') > b'000'
value('apples') > b'000'

# A condition to check if 'apples' has an owner
>>> kv.owner('apples') != None
owner('apples') != None

Operations

Operation objects are operations to apply to the key-value store. These coincide with the Additional Methods described above - for every method on the KeyValueStore there is an identical operation in the skein.kv namespace.

count([start, end, prefix])

A request to count keys in the key-value store.

list_keys([start, end, prefix])

A request to get a list of keys in the key-value store.

exists(key)

A request to check if a key exists in the key-value store.

missing(key)

A request to check if a key is not in the key-value store.

get(key[, default, return_owner])

A request to get the value associated with a single key.

get_prefix(prefix[, return_owner])

A request to get all key-value pairs whose keys start with prefix.

get_range([start, end, return_owner])

A request to get a range of keys.

pop(key[, default, return_owner])

A request to remove a single key and return its corresponding value.

pop_prefix(prefix[, return_owner])

A request to remove all key-value pairs whose keys start with prefix, and return their corresponding values.

pop_range([start, end, return_owner])

A request to remove a range of keys and return their corresponding values.

discard(key)

A request to discard a single key.

discard_prefix(prefix[, return_keys])

A request to discard all key-value pairs whose keys start with prefix.

discard_range([start, end, return_keys])

A request to discard a range of keys.

put(key[, value, owner])

A request to assign a value and/or owner for a single key.

swap(key[, value, owner, return_owner])

A request to assign a new value and/or owner for a single key, and return the previous value.

A few examples

>>> from skein import kv

# An operation to get the value of 'apples'
>>> kv.get('apples')
get('apples', default=None, return_owner=False)

# An operation to set the value of 'grapes' to b'123'
>>> kv.put('grapes', b'123')
put('grapes', value=b'123', owner=no_change)

Implementing setdefault

Using the above, we can do an atomic implementation of setdefault.

from skein import kv

def setdefault(self, key, default):
    """A safe, atomic implementation of MutableMapping.setdefault"""
    # If the key exists, get the value, otherwise set the value to default
    res = self.transaction(conditions=[kv.exists(key)],
                           on_success=[kv.get(key)],
                           on_failure=[kv.put(key, default)])
    if res.succeeded:
        # Condition succeeded, return result of get
        return res.results[0]
    else:
        # Condition failed, key didn't exist. Return default
        return default

Event Streams

Skein provides a way for clients to subscribe to a range (or ranges) of keys for changes. This can be useful for waiting for certain keys to be set, or creating larger abstractions for coordinating workers like locks or semaphores.

KeyValueStore.event_queue()

Create a new EventQueue subscribed to no events.

KeyValueStore.events([event_filter, key, …])

Shorthand for creating an EventQueue and adding a single filter.

EventQueue(kv)

A queue of events on the key-value store.

EventFilter([key, prefix, start, end, …])

An event filter.

Event(key, result, event_type, event_filter)

An event in the key-value store.

To subscribe to an event stream you can use either KeyValueStore.event_queue() and KeyValueStore.events() (the latter is shorthand for creation and subscription). Both methods return an instance of EventQueue(), an object similar to Queue from the standard library, with a few additional features:

  • EventQueue objects are iterable, allowing for easy looping of events:

    # Iterate through all events for keys starting with foobar
    for event in app.kv.events(prefix='foobar'):
        ...
    
  • EventQueue objects have additional methods EventQueue.subscribe() and EventQueue.unsubscribe(), for adding/removing subscriptions.

    eq = app.kv.event_queue()
    eq.subscribe(prefix='service_1')
    eq.subscribe(prefix='service_2')
    
    for event in eq:
        ...
    

Internally, all EventQueue objects are filled by the same worker thread fed by a single client connection with the server. This helps minimize the cost of creating additional event queues or subscribing to new event ranges. When an EventQueue object is collected, the event stream will be automatically unsubscribed. To manually handle unsubscription you can also use an event queue as a contextmanager, or call EventQueue.unsubscribe_all().

# All event filters will unsubscribe on exit from this block
with app.kv.event_queue() as eq:
    eq.subscribe(prefix='service_1')
    eq.subscribe(prefix='service_2')

    while True:
        event = eq.get()
        ...

eq.subscribe(start='a', end='b')
...
# explicitly unsubscribe from all event filters
eq.unsubscribe_all()

Iterating through the event queue or using methods like EventQueue.get() yield instances of Event - a namedtuple containing

  • The key affected

  • The result of the change. A ValueOwnerPair if a PUT event, or None if a DELETE event.

  • The event_type - either EventType.PUT or EventType.DELETE.

  • The event_filter that generated this event.

Note that if a queue subscribes to any intersecting filters, events that fall in the intersection will be placed in that queue once for every applicable filter.

# Create a new event queue
>>> eq = app.kv.event_queue()

# Create two filters that intersect
>>> starts_with_apple = eq.subscribe(prefix='apple')
>>> between_a_and_b = eq.subscribe(start='a', end='b')

# Create a new key starting with 'apple', triggering an event
>>> app.kv['applesauce'] = b'11.30'

# One event is put in the queue for each filter, since both filters hit the
# key 'applesauce'
>>> eq.get()
Event(key='applesauce',
      result=ValueOwnerPair(value=b'11.30', owner=None),
      event_type=EventType.PUT,
      event_filter=EventFilter(start='apple', end='applf', event_type=EventType.ALL))

>>> eq.get()
Event(key='applesauce',
      result=ValueOwnerPair(value=b'11.30', owner=None),
      event_type=EventType.PUT,
      event_filter=EventFilter(start='a', end='b', event_type=EventType.ALL))

Waiting for a Single Key

One common case is waiting for a single key to be set. This might contain a runtime configuration value that one container sets that others need, or it may signal that a service has started up to other containers that depend on it. The KeyValueStore.wait() method is provided for this common case. It will block until the key is set (usually by a different container).

# In one container, block until the key is set, returning the value
address = app.kv.wait('dask.scheduler.address')

# In another container set the key
app.kv['dask.scheduler.address'] = b'172.18.0.2:8787'

Example - Atomic Counter

Using the primitives above, we can create larger concurrency abstractions. Here we provide a simple recipe for an atomic shared counter.

import uuid
from contextlib import contextmanager

import skein

class Counter(object):
    """An atomic counter class.

    Parameters
    ----------
    kv : KeyValueStore
        The application key-value store.
    key : str, optional
        The key to use to store the counter state. If absent a new counter
        is created with a unique random key.

    Examples
    --------
    Create a new counter

    >>> c = Counter(app.kv)

    Increment the counter temporarily

    >>> with c.incrementing():
    ...     do_work()

    Block until the counter reaches 0
    >>> c.wait_until_zero()
    """
    def __init__(self, kv, key=None):
        key = key or uuid.uuid4().hex
        # Get the current value, or set it to 0 if new.
        value = kv.setdefault(key, int.to_bytes(0, 4, 'little'))

        if len(value) != 4:
            # The key already exists and isn't a valid counter
            raise ValueError("Key isn't a valid counter")

        self.kv = kv
        self.key = key

    def get(self):
        return int.from_bytes(self.kv[self.key], 'little')

    @contextmanager
    def incrementing(self):
        """Increment while inside a block"""
        try:
            self.increment()
            yield
        finally:
            self.decrement()

    def increment(self):
        """Increment and return the new value"""
        while True:
            # Try to increment atomically until success. Inefficient if high
            # increment/decrement rate, but fine for low bandwidth changes
            current = self.kv[self.key]
            new = (int.from_bytes(current, 'little') + 1)
            new_bytes = new.to_bytes(4, 'little')
            # Only set the value to the new value if the value hasn't
            # changed between when we got it above and now
            succeeded, _ = self.kv.transaction(
                conditions=[skein.kv.value(self.key) == current],
                on_success=[skein.kv.put(self.key, new_bytes)])
            if succeeded:
                return new

    def decrement(self):
        """Decrement and return the new value"""
        while True:
            # Try to decrement atomically until success. Inefficient if high
            # increment/decrement rate, but fine for low bandwidth changes
            current = self.kv[self.key]
            new = (int.from_bytes(current, 'little') - 1)
            new_bytes = new.to_bytes(4, 'little')
            # Try to decrement atomically until success. Inefficient if high
            # increment/decrement rate, but fine for low bandwidth changes
            succeeded, _ = self.kv.transaction(
                conditions=[skein.kv.value(self.key) == current],
                on_success=[skein.kv.put(self.key, new_bytes)])
            if succeeded:
                return new

    def wait_for_zero(self):
        """Block until the counter equals 0"""
        # Watch the event stream for changes, waiting for the value to
        # decrement to 0
        for event in self.kv.events(key=self.key, event_type='PUT'):
            if int.from_bytes(event.result.value, 'little') == 0:
                return