API Docs


class skein.Client(address=None, security=None, keytab=None, principal=None, log=None, log_level=None, java_options=None)

Connect to and schedule applications on the YARN cluster.

address : str, optional

The address for the driver. By default a new driver process is created. Pass in address explicitly to connect to a different driver. To connect to the global driver, see Client.from_global_driver.

security : Security, optional

The security configuration to use to communicate with the driver. Defaults to the global configuration.

keytab : str, optional

Path to a keytab file to use when starting the driver. If not provided, the driver will log in using the ticket cache instead.

principal : str, optional

The principal to use when starting the driver with a keytab.

log : str, bool, or None, optional

When starting a new driver, sets the logging behavior for the driver. Values may be a path for logs to be written to, None to log to stdout/stderr, or False to turn off logging completely. Default is None.

log_level : str or skein.model.LogLevel, optional

The driver log level. Sets the skein.log.level system property. One of {‘ALL’, ‘TRACE’, ‘DEBUG’, ‘INFO’, ‘WARN’, ‘ERROR’, ‘FATAL’, ‘OFF’} (from most to least verbose). Default is ‘INFO’.

java_options : str or list of str, optional

Additional Java options to forward to the driver. Can also be configured by setting the environment variable SKEIN_DRIVER_JAVA_OPTIONS.


>>> import skein
>>> with skein.Client() as client:
...     app_id = client.submit('spec.yaml')
application_logs(app_id, user='')

Get logs from a completed skein application.

app_id : str

The id of the application.

user : str, optional

The user to get the application logs as. Requires the current user to have permissions to proxy as user. Default is the current user.


A mapping of yarn_container_id to logs for each container.


>>> client.application_logs('application_1526134340424_0012')

application_report(app_id)

Get a report on the status of a skein application.

app_id : str

The id of the application.



>>> client.application_report('application_1526134340424_0012')

close()

Release all resources used by this client. Idempotent.

Note that this also stops the Java driver if it was started by this client.

connect(app_id, wait=True, security=None)

Connect to a running application.

app_id : str

The id of the application.

wait : bool, optional

If True (default), blocks until the application starts. If False, raises an ApplicationNotRunningError immediately if the application isn’t running.

security : Security, optional

The security configuration to use to communicate with the application master. Defaults to the global configuration.


Raises ApplicationNotRunningError if the application isn’t running.

classmethod from_global_driver()

Connect to the global driver.


get_all_queues()

Get information about all queues in the cluster.

queues : list of Queue


>>> client.get_all_queues()
[Queue<name='default', percent_used=0.00>,
 Queue<name='myqueue', percent_used=5.00>,
 Queue<name='child1', percent_used=10.00>,
 Queue<name='child2', percent_used=0.00>]
get_applications(states=None, name=None, user=None, queue=None, started_begin=None, started_end=None, finished_begin=None, finished_end=None)

Get the status of current skein applications.

states : sequence of ApplicationState, optional

If provided, applications will be filtered to these application states. Default is ['SUBMITTED', 'ACCEPTED', 'RUNNING'].

name : str, optional

Only select applications with this name.

user : str, optional

Only select applications with this user.

queue : str, optional

Only select applications in this queue.

started_begin : datetime or str, optional

Only select applications that started after this time (inclusive). Can be either a datetime or a string representation of one. String representations can use any of the following formats:

  • YYYY-M-D H:M:S (e.g. 2019-4-10 14:50:20)

  • YYYY-M-D H:M (e.g. 2019-4-10 14:50)

  • YYYY-M-D (e.g. 2019-4-10)

  • H:M:S (e.g. 14:50:20, today is used for date)

  • H:M (e.g. 14:50, today is used for date)

started_end : datetime or str, optional

Only select applications that started before this time (inclusive). Can be either a datetime or a string representation of one.

finished_begin : datetime or str, optional

Only select applications that finished after this time (inclusive). Can be either a datetime or a string representation of one.

finished_end : datetime or str, optional

Only select applications that finished before this time (inclusive). Can be either a datetime or a string representation of one.

reports : list of ApplicationReport


Get all finished and failed applications:

>>> client.get_applications(states=['FINISHED', 'FAILED'])

Get all applications named ‘demo’ started after 2019-4-10:

>>> client.get_applications(name='demo', started_begin='2019-4-10')
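The string forms accepted by started_begin, started_end, finished_begin and finished_end can be illustrated with a small parser. This is a hypothetical helper (parse_time_filter is not part of Skein) sketched only from the formats listed above, assuming that time-only strings are combined with today's date; Skein's own parsing may differ in details.

```python
from datetime import date, datetime

# Formats listed in the get_applications docs. strptime accepts
# non-zero-padded fields, so '2019-4-10' parses with '%Y-%m-%d'.
_DATETIME_FORMATS = ["%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"]
_TIME_FORMATS = ["%H:%M:%S", "%H:%M"]


def parse_time_filter(value, today=None):
    """Hypothetical helper: turn a datetime or time string into a datetime."""
    if isinstance(value, datetime):
        return value
    for fmt in _DATETIME_FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            pass
    for fmt in _TIME_FORMATS:
        try:
            parsed = datetime.strptime(value, fmt)
        except ValueError:
            continue
        # Time-only strings are assumed to refer to today's date.
        return datetime.combine(today or date.today(), parsed.time())
    raise ValueError("unsupported time format: %r" % value)
```

The optional today argument exists only so the sketch is deterministic to test; it is not a Skein parameter.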

get_child_queues(name)

Get information about all children of a parent queue.

name : str

The parent queue name.

queues : list of Queue


>>> client.get_child_queues('myqueue')
[Queue<name='child1', percent_used=10.00>,
 Queue<name='child2', percent_used=0.00>]

get_nodes(states=None)

Get the status of nodes in the cluster.

states : sequence of NodeState, optional

If provided, nodes will be filtered to these node states. Default is all states.

reports : list of NodeReport


Get all running nodes:

>>> client.get_nodes(states=['RUNNING'])

get_queue(name)

Get information about a queue.

name : str

The queue name.



>>> client.get_queue('myqueue')
Queue<name='myqueue', percent_used=5.00>
kill_application(app_id, user='')

Kill an application.

app_id : str

The id of the application to kill.

user : str, optional

The user to kill the application as. Requires the current user to have permissions to proxy as user. Default is the current user.

move_application(app_id, queue)

Move an application to a different queue.

app_id : str

The id of the application to move.

queue : str

The queue to move the application to.

static start_global_driver(keytab=None, principal=None, log=None, log_level=None, java_options=None)

Start the global driver.

No-op if the global driver is already running.

keytab : str, optional

Path to a keytab file to use when starting the driver. If not provided, the driver will log in using the ticket cache instead.

principal : str, optional

The principal to use when starting the driver with a keytab.

log : str, bool, or None, optional

Sets the logging behavior for the driver. Values may be a path for logs to be written to, None to log to stdout/stderr, or False to turn off logging completely. Default is None.

log_level : str or skein.model.LogLevel, optional

The driver log level. Sets the skein.log.level system property. One of {‘ALL’, ‘TRACE’, ‘DEBUG’, ‘INFO’, ‘WARN’, ‘ERROR’, ‘FATAL’, ‘OFF’} (from most to least verbose). Default is ‘INFO’.

java_options : str or list of str, optional

Additional Java options to forward to the driver. Can also be configured by setting the environment variable SKEIN_DRIVER_JAVA_OPTIONS.


address : str

The address of the driver.

static stop_global_driver(force=False)

Stops the global driver if running.

No-op if no global driver is running.

force : bool, optional

By default skein will check that the process associated with the driver PID is actually a skein driver. Setting force to True will kill the process in all cases.


submit(spec)

Submit a new skein application.

spec : ApplicationSpec, str, or dict

A description of the application to run. Can be an ApplicationSpec object, a path to a yaml/json file, or a dictionary description of an application specification.

app_id : str

The id of the submitted application.


submit_and_connect(spec)

Submit a new skein application, and wait to connect to it.

If an error occurs before the application connects, the application is killed.

spec : ApplicationSpec, str, or dict

A description of the application to run. Can be an ApplicationSpec object, a path to a yaml/json file, or a dictionary description of an application specification.


Application Client

class skein.ApplicationClient(address, app_id, security=None)

A client for the application master.

Used to interact with a running application.

address : str

The address of the application master.

app_id : str

The application id.

security : Security, optional

The security configuration to use to communicate with the application master. Defaults to the global configuration.

add_container(service, env=None)

Add a new container to a service.

Unlike scale, this allows overriding the configuration of the requested container.

service : str

The service to add a container to.

env : dict, optional

A mapping of environment variables to set in the container. These will be applied after any environment variables set in the service description, and can be used as overrides.


container : Container

The new container that was started.


close()

Release all resources used by this client. Idempotent.

Note that this only frees resources used by the client; it doesn’t shut down the application itself. For that, see shutdown.

classmethod from_current()

Create an application client from within a running container.

Useful for connecting to the application master from a running container in an application.

get_containers(services=None, states=None)

Get information on containers in this application.

services : sequence of str, optional

If provided, containers will be filtered to these services. Default is all services.

states : sequence of ContainerState, optional

If provided, containers will be filtered by these container states. Default is ['WAITING', 'REQUESTED', 'RUNNING'].

containers : list of Container

get_specification()

Get the specification for the running application.


kill_container(id)

Kill a container.

id : str

The id of the container to kill.


property kv

The Skein Key-Value store.

Used by applications to coordinate configuration and global state.

This implements the standard MutableMapping interface, along with the ability to “wait” for keys to be set. Keys are strings, with values as bytes.


>>> app_client.kv['foo'] = b'bar'
>>> app_client.kv['foo']
b'bar'
>>> del app_client.kv['foo']
>>> 'foo' in app_client.kv
False

Wait until the key is set, either by another service or by a user client. This is useful for inter-service synchronization.

>>> app_client.kv.wait('mykey')
scale(service, count=None, delta=None, **kwargs)

Scale a service to a requested number of instances.

Adds or removes containers to match the requested number of instances. The number of instances for the service can be specified either as a total count or a delta in that count.

When choosing which containers to remove, containers are removed in order of state (WAITING, REQUESTED, RUNNING) followed by age (oldest to newest).

When specified as a negative delta, if the number of containers to remove is greater than the number of existing containers, all containers are removed rather than raising an error. This means that app.scale(service, delta=-1) will remove a container if one exists, and otherwise do nothing.

service : str

The service to scale.

count : int, optional

The number of instances to scale to.

delta : int, optional

The change in number of instances.

containers : list of Container

A list of containers that were started or stopped.
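The removal rules above can be modeled locally. The sketch below is a hypothetical, in-memory planner (plan_scale and FakeContainer are illustrative names, not Skein API). It assumes only what the text states: containers are removed by state (WAITING, REQUESTED, RUNNING) and then oldest first, and an oversized negative delta removes everything instead of failing. Rejecting a negative count is an extra assumption of the sketch.

```python
from dataclasses import dataclass

# Removal priority by state, as described for scale().
STATE_ORDER = {"WAITING": 0, "REQUESTED": 1, "RUNNING": 2}


@dataclass
class FakeContainer:
    """Hypothetical stand-in for a container record."""
    id: str
    state: str
    start_time: float  # smaller value = older container


def plan_scale(containers, count=None, delta=None):
    """Return (number_to_start, containers_to_remove) for a scale request."""
    if (count is None) == (delta is None):
        raise ValueError("provide exactly one of count or delta")
    if count is not None and count < 0:
        raise ValueError("count must be non-negative")
    current = len(containers)
    target = count if count is not None else current + delta
    # A negative delta larger than the current size removes all containers.
    target = max(target, 0)
    if target >= current:
        return target - current, []
    # Remove by state priority first, then oldest to newest.
    ordered = sorted(containers, key=lambda c: (STATE_ORDER[c.state], c.start_time))
    return 0, ordered[: current - target]
```

With a real application, the corresponding call is app.scale(service, count=...) or app.scale(service, delta=...), which performs the changes and returns the affected containers rather than a plan.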


set_progress(progress)

Update the progress for this application.

For applications processing a fixed set of work, it may be useful for diagnostics to set the progress as the application proceeds.

Progress indicates job progression, and must be a float between 0 and 1. By default the progress is set at 0.1 for the application's duration, which is a good default value for applications that don’t know their progress (e.g. interactive applications).

progress : float

The application progress; must be a value between 0 and 1.
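For a fixed batch of work, progress is naturally the fraction of items completed. A minimal sketch, assuming an application client object that exposes the set_progress(progress) method documented here; process_items and ProgressRecorder are hypothetical names, and the recorder stands in for a real ApplicationClient so the example runs without a cluster.

```python
class ProgressRecorder:
    """Hypothetical stand-in for ApplicationClient that records progress."""

    def __init__(self):
        self.values = []

    def set_progress(self, progress):
        if not 0 <= progress <= 1:
            raise ValueError("progress must be between 0 and 1")
        self.values.append(progress)


def process_items(app_client, items, handle):
    """Call handle(item) on each item, reporting progress after each one."""
    total = len(items)
    results = []
    for done, item in enumerate(items, start=1):
        results.append(handle(item))
        # Fraction of the fixed workload completed so far.
        app_client.set_progress(done / total)
    return results
```

Inside a running container, the same function could be given a real client, e.g. one obtained with ApplicationClient.from_current(), in place of the recorder.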

shutdown(status='SUCCEEDED', diagnostics=None)

Shut down the application.

Stop all running containers and shut down the application.

status : FinalStatus, optional

The final application status. Default is ‘SUCCEEDED’.

diagnostics : str, optional

The application exit message, usually used for diagnosing failures. Can be seen in the YARN Web UI for completed applications under “diagnostics”, as well as in the diagnostics field of ApplicationReport objects. If not provided, a default will be used.


property ui

The Skein Web UI.

Used by applications to register additional web pages, and to get the addresses of these pages.

Runtime Properties

skein.properties = <skein.core.Properties object>

Skein runtime properties.

This class implements an immutable mapping type, exposing properties determined at import time.

application_id : str or None

The current application id. None if not running in a container.

appmaster_address : str or None

The address of the current application’s appmaster. None if not running in a container.


config_dir : str

The path to the configuration directory.

container_id : str or None

The current skein container id (of the form '{service}_{instance}'). None if not running in a container.

container_resources : Resources or None

The resources allocated to the current container. None if not in a container.

container_dir : str or None

The absolute path to the working directory for this container. None if not in a container.

yarn_container_id : str or None

The current YARN container id. None if not running in a container.
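Because container_id has the form '{service}_{instance}', the service name and instance number can be recovered by splitting on the last underscore. A small sketch (parse_container_id is a hypothetical helper, not part of Skein); inside a container you would pass it skein.properties.container_id. It assumes the instance part is an integer, and splits on the last underscore so that a service name containing underscores still parses.

```python
def parse_container_id(container_id):
    """Split a skein container id of the form '{service}_{instance}'."""
    if container_id is None:
        return None  # not running in a container
    service, sep, instance = container_id.rpartition("_")
    if not sep or not instance.isdigit():
        raise ValueError("unexpected container id: %r" % container_id)
    return service, int(instance)
```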

Key Value Store

class skein.kv.KeyValueStore(client)

The Skein Key-Value store.

Used by applications to coordinate configuration and global state.

clear() → None. Remove all items from D.
count(start=None, end=None, prefix=None)

Count keys in the key-value store.

start : str, optional

The lower bound of the key range, inclusive. If not provided, no lower bound will be used.

end : str, optional

The upper bound of the key range, exclusive. If not provided, no upper bound will be used.

prefix : str, optional

If provided, will count the number of keys matching this prefix.
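The key-range conventions used throughout the key-value store (start inclusive, end exclusive, or a prefix) can be modeled on a plain list of keys. This is a local illustration only (select_keys is hypothetical, not a Skein function); it assumes keys compare lexicographically as Python strings, and that prefix is used instead of start/end rather than combined with them.

```python
def select_keys(keys, start=None, end=None, prefix=None):
    """Return sorted keys in [start, end), or those starting with prefix."""
    if prefix is not None and (start is not None or end is not None):
        raise ValueError("use either prefix or start/end, not both")
    selected = []
    for key in sorted(keys):
        if prefix is not None:
            if key.startswith(prefix):
                selected.append(key)
        # start is inclusive, end is exclusive; a missing bound is open.
        elif (start is None or key >= start) and (end is None or key < end):
            selected.append(key)
    return selected
```

Under these semantics, app.kv.count(start='bar', end='baz') on a live store should count the same keys that select_keys(keys, start='bar', end='baz') returns here.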


discard(key)

Discard a single key.

Returns True if the key was present, False otherwise.

key : str

The key to discard.

discard_prefix(prefix, return_keys=False)

Discard all key-value pairs whose keys start with prefix.

Returns either the number of keys discarded or a list of those keys, depending on the value of return_keys.


prefix : str

The key prefix.

return_keys : bool, optional

If True, the discarded keys will be returned instead of their count. Default is False.

int or list of keys
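The return_keys switch changes only the shape of the result, not what is removed. A dict-based sketch (discard_prefix_local is hypothetical and not Skein's implementation; returning the keys in sorted order is an assumption of this model).

```python
def discard_prefix_local(store, prefix, return_keys=False):
    """Remove keys starting with prefix from a dict, mimicking discard_prefix."""
    matched = sorted(k for k in store if k.startswith(prefix))
    for key in matched:
        del store[key]
    # Either the discarded keys themselves, or just how many there were.
    return matched if return_keys else len(matched)
```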
discard_range(start=None, end=None, return_keys=False)

Discard a range of keys.

Returns either the number of keys discarded or a list of those keys, depending on the value of return_keys.

start : str, optional

The lower bound of the key range, inclusive. If not provided, no lower bound will be used.

end : str, optional

The upper bound of the key range, exclusive. If not provided, no upper bound will be used.

return_keys : bool, optional

If True, the discarded keys will be returned instead of their count. Default is False.

int or list of keys

event_queue()

Create a new EventQueue subscribed to no events.


Subscribe to events starting with 'foo' or 'bar':

>>> foo = skein.kv.EventFilter(prefix='foo')
>>> bar = skein.kv.EventFilter(prefix='bar')
>>> queue = app.kv.event_queue()
>>> queue.subscribe(foo)
>>> queue.subscribe(bar)
>>> for event in queue:
...     if event.event_filter == foo:
...         print("foo event")
...     else:
...         print("bar event")
events(event_filter=None, key=None, prefix=None, start=None, end=None, event_type=None)

Shorthand for creating an EventQueue and adding a single filter.

May provide either an explicit event filter, or provide arguments to create a new one and add it to the queue.

If no arguments are provided, creates a queue subscribed to all events.


event_filter : EventFilter, optional

An explicit EventFilter. If provided, no other keyword arguments may be provided.

key : str, optional

If present, only events from this key will be selected.

prefix : str, optional

If present, only events with this key prefix will be selected.

start : str, optional

If present, specifies the lower bound of the key range, inclusive.

end : str, optional

If present, specifies the upper bound of the key range, exclusive.

event_type : EventType, optional

The type of event. Default is 'ALL'.



Subscribe to all events with prefix 'foo':

>>> for event in app.kv.events(prefix='foo'):
...     if event.event_type == 'PUT':
...         print("PUT<key=%r, value=%r>" % (event.key, event.result.value))
...     else:  # DELETE
...         print("DELETE<key=%r>" % event.key)
PUT<key='foo', value=b'bar'>
PUT<key='food', value=b'biz'>
PUT<key='foo', value=b'changed'>

exists(key)

Check if a key exists in the key-value store.

key : str

The key to check the presence of.

get(key, default=None, return_owner=False)

Get the value associated with a single key.


key : str

The key to get.

default : bytes or None, optional

Default value to return if the key is not present.

return_owner : bool, optional

If True, the owner will also be returned along with the value. Default is False.

bytes or ValueOwnerPair
get_prefix(prefix, return_owner=False)

Get all key-value pairs whose keys start with prefix.


prefix : str

The key prefix.

return_owner : bool, optional

If True, the owner will also be returned along with the value. Default is False.

get_range(start=None, end=None, return_owner=False)

Get a range of keys.

start : str, optional

The lower bound of the key range, inclusive. If not provided, no lower bound will be used.

end : str, optional

The upper bound of the key range, exclusive. If not provided, no upper bound will be used.

return_owner : bool, optional

If True, the owner will also be returned along with the value. Default is False.

items() → a set-like object providing a view on D’s items
keys() → a set-like object providing a view on D’s keys
list_keys(start=None, end=None, prefix=None)

Get a list of keys in the key-value store.

start : str, optional

The lower bound of the key range, inclusive. If not provided, no lower bound will be used.

end : str, optional

The upper bound of the key range, exclusive. If not provided, no upper bound will be used.

prefix : str, optional

If provided, will return all keys matching this prefix.

list of keys

missing(key)

Check if a key is not in the key-value store.

This is the inverse of exists.

key : str

The key to check the absence of.

pop(key, default=None, return_owner=False)

Remove a single key and return its corresponding value.


key : str

The key to pop.

default : bytes or None, optional

Default value to return if the key is not present.

return_owner : bool, optional

If True, the owner will also be returned along with the value. Default is False.

bytes or ValueOwnerPair
pop_prefix(prefix, return_owner=False)

Remove all key-value pairs whose keys start with prefix, and return their corresponding values.


prefix : str

The key prefix.

return_owner : bool, optional

If True, the owner will also be returned along with the value. Default is False.

pop_range(start=None, end=None, return_owner=False)

Remove a range of keys and return their corresponding values.

start : str, optional

The lower bound of the key range, inclusive. If not provided, no lower bound will be used.

end : str, optional

The upper bound of the key range, exclusive. If not provided, no upper bound will be used.

return_owner : bool, optional

If True, the owner will also be returned along with the value. Default is False.

popitem() → (k, v), remove and return some (key, value) pair as a 2-tuple; but raise KeyError if D is empty.

put(key, value=no_change, owner=no_change)

Assign a value and/or owner for a single key.


key : str

The key to put.

value : bytes, optional

The value to put. Default is to leave value unchanged; an error will be raised if the key doesn’t exist.

owner : str or None, optional

The container id to claim ownership. Provide None to set to no owner. Default is to leave the owner unchanged.
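The no_change default distinguishes "leave this field alone" from None, which for owner means "no owner". A minimal in-memory model of that sentinel pattern: NO_CHANGE, put_local and the dict of (value, owner) tuples are hypothetical illustrations, not Skein's implementation, and the KeyError raised for a value-less put on a missing key stands in for whatever error Skein actually raises.

```python
NO_CHANGE = object()  # sentinel: "leave this field as it is"


def put_local(store, key, value=NO_CHANGE, owner=NO_CHANGE):
    """Update a dict mapping key -> (value, owner), mimicking put()."""
    if key in store:
        old_value, old_owner = store[key]
    elif value is NO_CHANGE:
        # Without a new value there is nothing to create for a missing key.
        raise KeyError(key)
    else:
        old_value, old_owner = None, None
    new_value = old_value if value is NO_CHANGE else value
    new_owner = old_owner if owner is NO_CHANGE else owner
    store[key] = (new_value, new_owner)
```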

setdefault(key, default)

Get the value associated with key, setting it to default if not present.

This transaction happens atomically on the key-value store.


key : str

The key.

default : bytes

The default value to set if the key isn’t present.
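Atomicity matters when several workers race to initialize the same key: exactly one default wins and every caller sees it. A local, thread-based analogue (LockedStore and race are hypothetical; Skein enforces atomicity in the key-value store itself, not with a client-side lock as this sketch does).

```python
import threading


class LockedStore:
    """Hypothetical in-process store with an atomic setdefault."""

    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def setdefault(self, key, default):
        # Check-and-set under one lock, so concurrent callers cannot both
        # observe the key as missing.
        with self._lock:
            if key not in self._data:
                self._data[key] = default
            return self._data[key]


def race(store, n_threads=8):
    """Have n threads each try to set a different default for 'leader'."""
    seen = []
    seen_lock = threading.Lock()

    def worker(i):
        value = store.setdefault("leader", b"worker_%d" % i)
        with seen_lock:
            seen.append(value)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen
```

Every thread gets back the same value, whichever thread happened to set it first.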

swap(key, value=no_change, owner=no_change, return_owner=False)

Assign a new value and/or owner for a single key, and return the previous value.


key : str

The key to put.

value : bytes, optional

The value to put. Default is to leave value unchanged; an error will be raised if the key doesn’t exist.

owner : str or None, optional

The container id to claim ownership. Provide None to set to no owner. Default is to leave the owner unchanged.

return_owner : bool, optional

If True, the owner will also be returned along with the value. Default is False.

bytes or ValueOwnerPair
transaction(conditions=None, on_success=None, on_failure=None)

An atomic transaction on the key-value store.

conditions : Condition or sequence of Condition, optional

A sequence of conditions to evaluate together. The conditional expression succeeds if all conditions evaluate to True, and fails otherwise. If no conditions are provided the conditional expression also succeeds.

on_success : Operation or sequence of Operation, optional

A sequence of operations to apply if all conditions evaluate to True.

on_failure : Operation or sequence of Operation, optional

A sequence of operations to apply if any condition evaluates to False.


result : TransactionResult

A namedtuple of (succeeded, results), where results is a list of results from either the on_success or on_failure operations, depending on which branch was evaluated.


This implements an atomic compare-and-swap operation, a useful concurrency primitive. It sets key to new only if its current value is prev:

>>> from skein import kv
>>> def compare_and_swap(app, key, new, prev):
...     result = app.kv.transaction(
...         conditions=[kv.value(key) == prev],  # if key == prev
...         on_success=[kv.put(key, new)])       # then set key = new
...     return result.succeeded
>>> app.kv['key'] = b'value'

Since 'key' currently is b'value', the conditional expression succeeds and 'key' is set to b'new_value':

>>> compare_and_swap(app, 'key', b'new_value', b'value')
True

Since 'key' currently is b'new_value' and not b'wrong', the conditional expression fails and 'key' remains unchanged:

>>> compare_and_swap(app, 'key', b'another_value', b'wrong')
False
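The evaluation rule for transactions (on_success runs only if every condition holds, otherwise on_failure runs, and an empty condition list succeeds) can be modeled on a dict. This is a hypothetical local sketch, not skein.kv: conditions and operations are plain callables rather than skein.kv condition and operation objects, and the function is assumed to run atomically, which the real store guarantees on its side.

```python
from collections import namedtuple

# Mirrors the shape of skein.kv.TransactionResult described in these docs.
TransactionResult = namedtuple("TransactionResult", ["succeeded", "results"])


def transaction_local(store, conditions=(), on_success=(), on_failure=()):
    """Evaluate callables cond(store) -> bool, then apply op(store) callables."""
    # all() of an empty sequence is True, matching "no conditions succeeds".
    succeeded = all(cond(store) for cond in conditions)
    ops = on_success if succeeded else on_failure
    return TransactionResult(succeeded, [op(store) for op in ops])


def compare_and_swap_local(store, key, new, prev):
    """Set store[key] = new only if it currently equals prev."""
    result = transaction_local(
        store,
        conditions=[lambda s: s.get(key) == prev],
        on_success=[lambda s: s.__setitem__(key, new)],
    )
    return result.succeeded
```

Run against {'key': b'value'}, this reproduces the True-then-False sequence of the compare_and_swap example above.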
update(*args, **kwargs)

Update the key-value store with multiple key-value pairs atomically.

arg : mapping or iterable, optional

Either a mapping or an iterable of (key, value).


**kwargs

Extra key-value pairs to set. Semantically these are applied after any present in arg, and will thus override any keys present in both.

values() → an object providing a view on D’s values
wait(key, return_owner=False)

Get the value associated with a single key, blocking until the key exists if not present.


key : str

The key to get.

return_owner : bool, optional

If True, the owner will also be returned along with the value. Default is False.

bytes or ValueOwnerPair
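Blocking until a key exists is the classic condition-variable pattern. A local, thread-based sketch of the semantics of wait: WaitableStore is hypothetical, and in Skein the blocking happens against the application's key-value store, with the key typically set from another container. The timeout argument is an addition of this sketch and is not a parameter of KeyValueStore.wait.

```python
import threading


class WaitableStore:
    """Hypothetical in-process store supporting a blocking wait()."""

    def __init__(self):
        self._data = {}
        self._cond = threading.Condition()

    def __setitem__(self, key, value):
        with self._cond:
            self._data[key] = value
            self._cond.notify_all()  # wake any waiters

    def wait(self, key, timeout=None):
        with self._cond:
            # wait_for re-checks the predicate after every notification.
            if not self._cond.wait_for(lambda: key in self._data, timeout):
                raise TimeoutError(key)
            return self._data[key]
```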
class skein.kv.ValueOwnerPair(value, owner)

A (value, owner) pair in the key-value store.


value : bytes

The value.

owner : str or None

The owner container_id, or None for no owner.

class skein.kv.count(start=None, end=None, prefix=None)

A request to count keys in the key-value store.

start : str, optional

The lower bound of the key range, inclusive. If not provided, no lower bound will be used.

end : str, optional

The upper bound of the key range, exclusive. If not provided, no upper bound will be used.

prefix : str, optional

If provided, will count the number of keys matching this prefix.

class skein.kv.list_keys(start=None, end=None, prefix=None)

A request to get a list of keys in the key-value store.

start : str, optional

The lower bound of the key range, inclusive. If not provided, no lower bound will be used.

end : str, optional

The upper bound of the key range, exclusive. If not provided, no upper bound will be used.

prefix : str, optional

If provided, will return all keys matching this prefix.

class skein.kv.exists(key)

A request to check if a key exists in the key-value store.


key : str

The key to check the presence of.

class skein.kv.missing(key)

A request to check if a key is not in the key-value store.

This is the inverse of exists.


key : str

The key to check the absence of.

class skein.kv.get(key, default=None, return_owner=False)

A request to get the value associated with a single key.


key : str

The key to get.

default : bytes or None, optional

Default value to return if the key is not present.

return_owner : bool, optional

If True, the owner will also be returned along with the value. Default is False.

class skein.kv.get_prefix(prefix, return_owner=False)

A request to get all key-value pairs whose keys start with prefix.


prefix : str

The key prefix.

return_owner : bool, optional

If True, the owner will also be returned along with the value. Default is False.

class skein.kv.get_range(start=None, end=None, return_owner=False)

A request to get a range of keys.

start : str, optional

The lower bound of the key range, inclusive. If not provided, no lower bound will be used.

end : str, optional

The upper bound of the key range, exclusive. If not provided, no upper bound will be used.

return_owner : bool, optional

If True, the owner will also be returned along with the value. Default is False.

class skein.kv.pop(key, default=None, return_owner=False)

A request to remove a single key and return its corresponding value.


key : str

The key to pop.

default : bytes or None, optional

Default value to return if the key is not present.

return_owner : bool, optional

If True, the owner will also be returned along with the value. Default is False.

class skein.kv.pop_prefix(prefix, return_owner=False)

A request to remove all key-value pairs whose keys start with prefix, and return their corresponding values.


prefix : str

The key prefix.

return_owner : bool, optional

If True, the owner will also be returned along with the value. Default is False.

class skein.kv.pop_range(start=None, end=None, return_owner=False)

A request to remove a range of keys and return their corresponding values.

start : str, optional

The lower bound of the key range, inclusive. If not provided, no lower bound will be used.

end : str, optional

The upper bound of the key range, exclusive. If not provided, no upper bound will be used.

return_owner : bool, optional

If True, the owner will also be returned along with the value. Default is False.

class skein.kv.discard(key)

A request to discard a single key.

Returns True if the key was present, False otherwise.

key : str

The key to discard.

class skein.kv.discard_prefix(prefix, return_keys=False)

A request to discard all key-value pairs whose keys start with prefix.

Returns either the number of keys discarded or a list of those keys, depending on the value of return_keys.


prefix : str

The key prefix.

return_keys : bool, optional

If True, the discarded keys will be returned instead of their count. Default is False.

class skein.kv.discard_range(start=None, end=None, return_keys=False)

A request to discard a range of keys.

Returns either the number of keys discarded or a list of those keys, depending on the value of return_keys.

start : str, optional

The lower bound of the key range, inclusive. If not provided, no lower bound will be used.

end : str, optional

The upper bound of the key range, exclusive. If not provided, no upper bound will be used.

return_keys : bool, optional

If True, the discarded keys will be returned instead of their count. Default is False.

class skein.kv.put(key, value=no_change, owner=no_change)

A request to assign a value and/or owner for a single key.


key : str

The key to put.

value : bytes, optional

The value to put. Default is to leave value unchanged; an error will be raised if the key doesn’t exist.

owner : str or None, optional

The container id to claim ownership. Provide None to set to no owner. Default is to leave the owner unchanged.

class skein.kv.swap(key, value=no_change, owner=no_change, return_owner=False)

A request to assign a new value and/or owner for a single key, and return the previous value.


key : str

The key to put.

value : bytes, optional

The value to put. Default is to leave value unchanged; an error will be raised if the key doesn’t exist.

owner : str or None, optional

The container id to claim ownership. Provide None to set to no owner. Default is to leave the owner unchanged.

return_owner : bool, optional

If True, the owner will also be returned along with the value. Default is False.

class skein.kv.TransactionResult(succeeded, results)

A result from a key-value store transaction.


succeeded : bool

Whether the transaction conditions evaluated to True.


results : sequence

A sequence of results from applying all operations in the transaction on_success or on_failure parameters, depending on whether the conditions evaluated to True or False.

class skein.kv.value(key)

Represents the value for a key, for use in transaction conditions.


key : str

The key to look up.

class skein.kv.owner(key)

Represents the owner for a key, for use in transaction conditions.


key : str

The key to look up.

class skein.kv.comparison(key, field, operator, rhs)

A comparison of the value or owner for a specified key.


key : str

The corresponding key.

field : {‘value’, ‘owner’}

The field to compare on.

operator : {‘==’, ‘!=’, ‘>’, ‘>=’, ‘<’, ‘<=’}

The comparison operator to use.

rhs : bytes, str, or None

The right-hand side of the condition expression. Must be bytes if field='value', or str or None if field='owner'.
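The type rule for rhs (bytes when comparing the value; str or None when comparing the owner) can be captured in a small evaluator, the local counterpart of expressions like kv.value(key) == prev in the compare-and-swap example. This is a hypothetical sketch using Python's operator module (evaluate_comparison is not Skein API). It assumes the key exists and that the store holds (value, owner) tuples, and it assumes that comparing an owner against None only makes sense with == and !=; ordering comparisons against an unset owner are not modeled.

```python
import operator

_OPS = {"==": operator.eq, "!=": operator.ne, ">": operator.gt,
        ">=": operator.ge, "<": operator.lt, "<=": operator.le}


def evaluate_comparison(store, key, field, op, rhs):
    """Compare the value or owner of store[key] (a (value, owner) tuple)."""
    if op not in _OPS:
        raise ValueError("unknown operator %r" % op)
    if field == "value":
        if not isinstance(rhs, bytes):
            raise TypeError("rhs must be bytes when field='value'")
        lhs = store[key][0]
    elif field == "owner":
        if rhs is not None and not isinstance(rhs, str):
            raise TypeError("rhs must be str or None when field='owner'")
        if rhs is None and op not in ("==", "!="):
            raise TypeError("owner can only be compared to None with == or !=")
        lhs = store[key][1]
    else:
        raise ValueError("field must be 'value' or 'owner'")
    return _OPS[op](lhs, rhs)
```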


skein.kv.is_condition(x)

Return whether x is a valid skein key-value store condition.


skein.kv.is_operation(obj)

Return whether obj is a valid skein key-value store operation.

class skein.kv.EventType(x)

Event types to listen on.


ALL

All events.


PUT

Only PUT events.


DELETE

Only DELETE events.

class skein.kv.Event(key, result, event_type, event_filter)

An event in the key-value store.


key : str

The key affected.

result : ValueOwnerPair or None

The value and owner for the key. None if a 'DELETE' event.


event_type : EventType

The type of event.


event_filter : EventFilter

The event filter that generated the event.

class skein.kv.EventFilter(key=None, prefix=None, start=None, end=None, event_type=None)

An event filter.

Specifies a subset of events to watch for. May specify one of key, prefix, or start/end. If no parameters are provided, selects all events.

key : str, optional

If present, only events from this key will be selected.

prefix : str, optional

If present, only events with this key prefix will be selected.

start : str, optional

If present, specifies the lower bound of the key range, inclusive.

end : str, optional

If present, specifies the upper bound of the key range, exclusive.

event_type : EventType, optional

The type of event. Default is 'ALL'.

class skein.kv.EventQueue(kv)

A queue of events on the key-value store.

Besides the normal Queue interface, also supports iteration.

>>> for event in app.kv.events(prefix='bar'):
...     print(event)

If an event matches multiple subscribed filters, it will be placed in the event queue once for each filter. For example, prefix='bar' and key='bart' would both receive events on key='bart'. If a queue was subscribed to both filters, changes to this key would be placed in the queue twice, once for each filter.

All events are unsubscribed when this object is garbage collected. The queue can also be used as a context manager to unsubscribe from all filters on __exit__, or unsubscribe_all can be called explicitly.
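The duplication rule can be made concrete: an event is delivered once per matching filter. A hypothetical local model (Filter and route_event are illustrative names, not skein.kv classes) that assumes a filter specifies exactly one of key, prefix, or a start/end range, as EventFilter does; event_type filtering is left out for brevity.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Filter:
    """Hypothetical stand-in for skein.kv.EventFilter (key selection only)."""
    key: Optional[str] = None
    prefix: Optional[str] = None
    start: Optional[str] = None
    end: Optional[str] = None

    def matches(self, key):
        if self.key is not None:
            return key == self.key
        if self.prefix is not None:
            return key.startswith(self.prefix)
        # Range: start inclusive, end exclusive; no bounds selects everything.
        return ((self.start is None or key >= self.start)
                and (self.end is None or key < self.end))


def route_event(filters, key):
    """Return one (filter, key) entry per subscribed filter matching key."""
    return [(f, key) for f in filters if f.matches(key)]
```

With filters prefix='bar' and key='bart', a change to 'bart' yields two queue entries, while a change to 'barn' yields only the prefix entry.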

get(block=True, timeout=None)

Remove and return an item from the queue.

If optional args ‘block’ is true and ‘timeout’ is None (the default), block if necessary until an item is available. If ‘timeout’ is a non-negative number, it blocks at most ‘timeout’ seconds and raises the Empty exception if no item was available within that time. Otherwise (‘block’ is false), return an item if one is immediately available, else raise the Empty exception (‘timeout’ is ignored in that case).
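Because EventQueue offers the normal Queue interface, the timeout and non-blocking forms of get can be used to drain pending events without blocking forever. The sketch below uses Python's queue.Queue directly as a stand-in (it does not create a real EventQueue), and drain is a hypothetical helper; with an EventQueue the same get calls should apply.

```python
import queue


def drain(q, timeout=0.01):
    """Collect items until none arrives within timeout seconds."""
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            return items
```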

put(item, block=True, timeout=None)

Put an item into the queue.

If optional args ‘block’ is true and ‘timeout’ is None (the default), block if necessary until a free slot is available. If ‘timeout’ is a non-negative number, it blocks at most ‘timeout’ seconds and raises the Full exception if no free slot was available within that time. Otherwise (‘block’ is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (‘timeout’ is ignored in that case).

subscribe(event_filter=None, key=None, prefix=None, start=None, end=None, event_type=None)

Subscribe to an event filter.

May provide either an explicit event filter, or provide arguments to create a new one and add it to the queue. In either case, the event filter is returned.

If no arguments are provided, subscribes to all events.

event_filterEventFilter, optional

An explicit EventFilter. If provided, no other keyword arguments may be provided.

keystr, optional

If present, only events from this key will be selected.

prefixstr, optional

If present, only events with this key prefix will be selected.

startstr, optional

If present, specifies the lower bound of the key range, inclusive.

endstr, optional

If present, specifies the upper bound of the key range, exclusive.

event_typeEventType, optional

The type of event. Default is 'ALL'.
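How the key, prefix, and start/end arguments select keys can be sketched as a predicate. This is an illustrative model under stated assumptions: key_matches is hypothetical (its match_key parameter stands in for key to avoid shadowing the first argument), keys are compared as ordinary Python strings, and the criteria are treated as alternatives with key taking precedence over prefix, which takes precedence over the range. Skein's own rules for combining criteria may differ.

```python
def key_matches(key, match_key=None, prefix=None, start=None, end=None):
    """Return True if `key` is selected by the given criteria (hypothetical model)."""
    if match_key is not None:
        return key == match_key  # exact key
    if prefix is not None:
        return key.startswith(prefix)  # any key with this prefix
    # Range: start is inclusive, end is exclusive; an omitted bound is unbounded.
    if start is not None and key < start:
        return False
    if end is not None and key >= end:
        return False
    return True  # no criteria at all selects every key
```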

unsubscribe(event_filter=None, key=None, prefix=None, start=None, end=None, event_type=None)

Unsubscribe from an event filter.

May provide either an explicit event filter, or provide arguments describing the filter to remove from the queue.

If no arguments are provided, unsubscribes from a filter of all events.

A ValueError is raised if the specified filter isn’t currently subscribed to.

event_filterEventFilter, optional

An explicit EventFilter. If provided, no other keyword arguments may be provided.

keystr, optional

If present, only events from this key will be selected.

prefixstr, optional

If present, only events with this key prefix will be selected.

startstr, optional

If present, specifies the lower bound of the key range, inclusive.

endstr, optional

If present, specifies the upper bound of the key range, exclusive.

event_typeEventType, optional

The type of event. Default is 'ALL'.

unsubscribe_all()

Unsubscribe from all event filters.

Web UI

class skein.ui.WebUI(client)

The Skein WebUI.

property proxy_prefix

The path between the Web UI address and the proxied pages.

property addresses

All known addresses of the Web UI.

In most cases this will be a list of length 1. If YARN is running in high availability mode, there may be several available addresses. Use the address attribute to choose one at random.

property address

The address of the Web UI.

If YARN is running in high availability mode, a single address is chosen at random on first access.

add_page(route, target, link_name=None)

Add a new proxied page to the Web UI.

routestr

The route for the proxied page. Must be a valid path segment in a URL (e.g. foo in /foo/bar/baz). Routes must be unique across the application.

targetstr

The target address to be proxied to this page. Must be a valid URL.

link_namestr, optional

If provided, will be the link text used in the Web UI. If not provided, the page will still be proxied, but no link will be added to the Web UI. Link names must be unique across the application.
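The uniqueness rules for routes and link names can be illustrated with a small in-memory registry. PageRegistry is hypothetical, and the choice of ValueError (and of rejecting any route containing '/') is an assumption made for this sketch rather than Skein's documented behavior.

```python
class PageRegistry:
    """Hypothetical model of the uniqueness rules for proxied pages."""

    def __init__(self):
        self.pages = {}  # route -> (target, link_name)

    def add_page(self, route, target, link_name=None):
        # A route must be a single, non-empty path segment.
        if not route or "/" in route:
            raise ValueError("route must be a single path segment: %r" % route)
        # Routes are unique across the application.
        if route in self.pages:
            raise ValueError("route already registered: %r" % route)
        # Link names, when given, are unique as well; None means "proxy but don't link".
        if link_name is not None and any(
            name == link_name for _, name in self.pages.values()
        ):
            raise ValueError("link name already in use: %r" % link_name)
        self.pages[route] = (target, link_name)

    def remove_page(self, route):
        # Raises KeyError if the route was never registered.
        del self.pages[route]
```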

remove_page(route)

Remove a proxied page from the Web UI.

routestr

The route for the proxied page. Must be a valid path segment in a URL (e.g. foo in /foo/bar/baz). Routes must be unique across the application.

get_pages()

Get all registered pages.

A dict of route to ProxiedPage for all pages.

class skein.ui.ProxiedPage(route, target, link_name, _ui_address, _proxy_prefix)

A page proxied by the Skein Web UI.

routestring

The route used in the address to the proxied page.

targetstring

The target address of the proxy.

link_namestring or None

The name of the linked page in the Web UI. If None, the page is still proxied but isn’t linked to in the Web UI.

addressstring

The full proxied address to this page.

property address

The full proxied address to this page.

Application Specification

class skein.ApplicationSpec(services=None, master=None, name='skein', queue='default', user='', node_label='', tags=None, file_systems=None, acls=None, max_attempts=1)

A complete description of an application.

servicesdict, optional

A mapping of service-name to services. Applications must either specify at least one service, or a script for the application master to run (see skein.Master for more information).

masterMaster, optional

Additional configuration for the application master service. See skein.Master for more information.

namestr, optional

The name of the application, defaults to ‘skein’.

queuestr, optional

The queue to submit to. Defaults to the default queue.

userstr, optional

The user name to submit the application as. Requires that the submitting user have permission to proxy as this user name. Default is the submitter’s user name.

node_labelstr, optional

The node label expression to use when requesting containers for this application. Services can override this setting by specifying node_label on the service directly. Default is no label.

tagsset, optional

A set of strings to use as tags for this application.

file_systemslist, optional

A list of Hadoop file systems to acquire delegation tokens for. A token is always acquired for the defaultFS.

aclsACLs, optional

Allows restricting users/groups to subsets of application access. See skein.ACLs for more information.

max_attemptsint, optional

The maximum number of submission attempts before marking the application as failed. Note that this only considers failures of the application master during startup. Default is 1.

classmethod from_dict(obj, **kwargs)

Create an instance from a dict.

Keys in the dict should match parameter names

classmethod from_file(path, format='infer')

Create an instance from a json or yaml file.

pathstr

The path to the file to load.

format{‘infer’, ‘json’, ‘yaml’}, optional

The file format. By default the format is inferred from the file extension.
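Format inference from the file extension might look like the following. infer_format is a hypothetical helper, and accepting .yml in addition to .yaml is an assumption, not something the documentation states.

```python
import os


def infer_format(path, format="infer"):
    """Hypothetical helper: resolve 'infer' to 'json' or 'yaml' from the extension."""
    if format != "infer":
        if format not in ("json", "yaml"):
            raise ValueError("format must be 'infer', 'json', or 'yaml'")
        return format  # an explicit format always wins
    ext = os.path.splitext(path)[1].lower()
    if ext == ".json":
        return "json"
    if ext in (".yaml", ".yml"):  # '.yml' support is an assumption
        return "yaml"
    raise ValueError("cannot infer format from %r; pass format explicitly" % path)
```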

classmethod from_json(b)

Create an instance from a json string.

Keys in the json object should match parameter names

classmethod from_protobuf(obj)

Create an instance from a protobuf message.

classmethod from_yaml(b)

Create an instance from a yaml string.


Convert object to a dict

to_file(path, format='infer', skip_nulls=True)

Write object to a file.

pathstr

The path to the file to write.

format{‘infer’, ‘json’, ‘yaml’}, optional

The file format. By default the format is inferred from the file extension.

skip_nullsbool, optional

By default null values are skipped in the output. Set to False to output all fields.
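The effect of skip_nulls can be illustrated with the standard json module: fields whose value is None are dropped before serializing. drop_nulls and the sample dict are hypothetical and only mimic the shape of a specification.

```python
import json


def drop_nulls(obj):
    """Recursively remove dict entries whose value is None (hypothetical helper)."""
    if isinstance(obj, dict):
        return {k: drop_nulls(v) for k, v in obj.items() if v is not None}
    if isinstance(obj, list):
        return [drop_nulls(v) for v in obj]
    return obj


spec = {"name": "skein", "acls": None,
        "master": {"script": "echo hi", "log_config": None}}

compact = json.dumps(drop_nulls(spec), sort_keys=True)  # like skip_nulls=True
full = json.dumps(spec, sort_keys=True)                 # like skip_nulls=False
```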


Convert object to a json string


Convert object to a protobuf message


Convert object to a yaml string

class skein.LogLevel(x)

Enum of log levels.

Corresponds with log4j logging levels.

ALL

Turns on all logging.

TRACE

The finest level of events.

DEBUG

Fine-grained informational events that are most useful to debug an application.

INFO

Informational messages that highlight the progress of the application at a coarse-grained level. The default LogLevel.

WARN

Potentially harmful situations that still allow the application to continue running.

ERROR

Error events that might still allow the application to continue running.

FATAL

Severe error events that will lead the application to abort.

OFF

Turns off all logging.

classmethod values()

The constants of this enum type, in the order they are declared.
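Because the levels are ordered from most to least verbose, choosing a level enables messages at that level and at every less verbose one, following log4j's threshold semantics. The sketch below models that check; is_enabled is a hypothetical helper, not part of Skein.

```python
# Levels from most to least verbose, as listed for skein.LogLevel.
LEVELS = ["ALL", "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL", "OFF"]


def is_enabled(message_level, configured_level):
    """log4j-style check: emit a message if it is at least as severe as the threshold."""
    if message_level in ("ALL", "OFF"):
        raise ValueError("ALL and OFF are thresholds, not message levels")
    return LEVELS.index(message_level) >= LEVELS.index(configured_level)
```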

class skein.Master(resources=None, script='', files=None, env=None, log_level=LogLevel.INFO, log_config=None, security=None)

Configuration for the Application Master.

resourcesResources, optional

Describes the resources needed to run the application master. Default is 512 MiB, 1 virtual core.

scriptstr, optional

An optional bash script to run after starting the application master. If provided, the application will terminate once the script has completed.

filesdict, optional

Describes any additional files needed to run the application master. A mapping of destination relative paths to File or str objects describing the sources for these paths. If a str, the file type is inferred from the extension.

envdict, optional

A mapping of environment variables to set on the application master.

log_levelstr or LogLevel, optional

The application master log level. Sets the skein.log.level system property. One of {‘ALL’, ‘TRACE’, ‘DEBUG’, ‘INFO’, ‘WARN’, ‘ERROR’, ‘FATAL’, ‘OFF’} (from most to least verbose). Default is ‘INFO’.

log_configstr or File, optional

A custom log4j.properties file to use for the application master. If not provided, the default logging configuration will be used.

securitySecurity, optional

The security credentials to use for the application master. If not provided, these will be the same as those used by the submitting client.

classmethod from_dict(obj, **kwargs)

Create an instance from a dict.

Keys in the dict should match parameter names

classmethod from_json(b)

Create an instance from a json string.

Keys in the json object should match parameter names

classmethod from_protobuf(obj)

Create an instance from a protobuf message.

classmethod from_yaml(b)

Create an instance from a yaml string.


Convert object to a dict


Convert object to a json string


Convert object to a protobuf message


Convert object to a yaml string

class skein.Service(resources=required, script=required, instances=1, files=None, env=None, depends=None, max_restarts=0, allow_failures=False, node_label='', nodes=None, racks=None, relax_locality=False)

Description of a Skein service.

resourcesResources

Describes the resources needed to run the service.

scriptstr

A bash script to run the service.

instancesint, optional

The number of instances to create on startup. Default is 1.

filesdict, optional

Describes any files needed to run the service. A mapping of destination relative paths to File or str objects describing the sources for these paths. If a str, the file type is inferred from the extension.

envdict, optional

A mapping of environment variables needed to run the service.

dependsset, optional

A set of service names that this service depends on. The service will only be started after all its dependencies have been started.

max_restartsint, optional

The maximum number of restarts to allow for this service. Containers are only restarted on failure, and the cap is set for all containers in the service, not per container. Set to -1 to allow infinite restarts. Default is 0.

allow_failuresbool, optional

If False (default), the whole application will shut down if the number of failures for this service exceeds max_restarts. Set to True to keep the application running even if this service exceeds its failure limit.

node_labelstr, optional

The node label expression to use when requesting containers for this service. If not set, defaults to the application-level node_label (if set).

nodeslist, optional

A list of node host names to restrict containers for this service to. If not set, defaults to no node restrictions.

rackslist, optional

A list of rack names to restrict containers for this service to. The racks corresponding to any nodes requested will be automatically added to this list. If not set, defaults to no rack restrictions.

relax_localitybool, optional

If True, containers for this request may be assigned on hosts and racks other than the ones explicitly requested. If False, those restrictions are strictly enforced. Default is False.
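The restart and shutdown rules for max_restarts and allow_failures can be summarized in two predicates. Both functions are hypothetical and assume failures is the service-wide count of failed containers, including the one that just failed, as described for max_restarts.

```python
def should_restart(failures, max_restarts):
    """May the container that just failed be restarted? (hypothetical model)"""
    # -1 means unlimited restarts; otherwise the cap is shared by the whole service.
    return max_restarts == -1 or failures <= max_restarts


def application_should_stop(failures, max_restarts, allow_failures):
    """The application shuts down once failures exceed max_restarts, unless allowed."""
    exceeded = max_restarts != -1 and failures > max_restarts
    return exceeded and not allow_failures
```

With the default max_restarts=0, the first failure already exceeds the limit, so the application stops unless allow_failures=True.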

classmethod from_dict(obj, **kwargs)

Create an instance from a dict.

Keys in the dict should match parameter names

classmethod from_json(b)

Create an instance from a json string.

Keys in the json object should match parameter names

classmethod from_protobuf(obj)

Create an instance from a protobuf message.

classmethod from_yaml(b)

Create an instance from a yaml string.


Convert object to a dict


Convert object to a json string


Convert object to a protobuf message


Convert object to a yaml string

class skein.FileType(x)

Enum of possible file types to distribute with the application.

FILE

Regular file.

ARCHIVE

A .zip, .tar.gz, or .tgz file to be automatically unarchived in the containers.

classmethod values()

The constants of this enum type, in the order they are declared.

class skein.FileVisibility(x)

Enum of possible file visibilities.

Determines how the file can be shared between containers.

APPLICATION

Shared only among containers of the same application on the node.

PUBLIC

Shared by all users on the node.

PRIVATE

Shared among all applications of the same user on the node.

classmethod values()

The constants of this enum type, in the order they are declared.

class skein.File(source=required, type='infer', visibility=FileVisibility.APPLICATION, size=0, timestamp=0)

A file/archive to distribute with the service.

sourcestr

The path to the file/archive. If no scheme is specified, the path is assumed to be on the local filesystem (file:// scheme).

typeFileType or str, optional

The type of file to distribute. Archives are automatically extracted by YARN into a directory with the same name as their destination. By default the type is inferred from the file extension.

visibilityFileVisibility or str, optional

The resource visibility. Default is FileVisibility.APPLICATION.

sizeint, optional

The resource size in bytes. If not provided, it will be determined by the file system.

timestampint, optional

The time the resource was last modified. If not provided, it will be determined by the file system.
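The file type and source rules can be sketched as two small helpers. Both are hypothetical: infer_file_type checks only the archive suffixes listed for FileType.ARCHIVE, and with_default_scheme treats any source containing '://' as already qualified, which is a simplification.

```python
import os

ARCHIVE_SUFFIXES = (".zip", ".tar.gz", ".tgz")


def infer_file_type(source):
    """Return 'ARCHIVE' for the documented archive suffixes, else 'FILE' (hypothetical)."""
    return "ARCHIVE" if source.lower().endswith(ARCHIVE_SUFFIXES) else "FILE"


def with_default_scheme(source):
    """Prefix scheme-less local paths with file:// (hypothetical, simplified)."""
    if "://" in source:
        return source  # already has a scheme, e.g. hdfs://
    return "file://" + os.path.abspath(source)
```

Under these rules, an entry such as env: env.tar.gz in a service's files would be typed as an archive and, per the description of File.type, extracted by YARN into a directory named env.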

classmethod from_dict(obj, **kwargs)

Create an instance from a dict.

Keys in the dict should match parameter names

classmethod from_json(b)

Create an instance from a json string.

Keys in the json object should match parameter names

classmethod from_protobuf(obj)

Create an instance from a protobuf message.

classmethod from_yaml(b)

Create an instance from a yaml string.


Convert object to a dict


Convert object to a json string


Convert object to a protobuf message


Convert object to a yaml string

class skein.Resources(memory=required, vcores=required, gpus=0, fpgas=0)

Resource requests per container.

memorystr or int

The amount of memory to request. Can be either a string with units (e.g. "5 GiB"), or numeric. If numeric, specifies the amount of memory in MiB. Note that the units are in mebibytes (MiB) NOT megabytes (MB) - the former being binary based (1024 MiB in a GiB), the latter being decimal based (1000 MB in a GB).

Requests smaller than the minimum allocation will receive the minimum allocation (1024 MiB by default). Requests larger than the maximum allocation will error on application submission.

vcoresint

The number of virtual cores to request. Depending on your system configuration, one virtual core may map to a single physical core or to a fraction of a core. Requests larger than the maximum allocation will error on application submission.

gpusint, optional

The number of GPUs to request. Requires Hadoop >= 3.1, sets resource requirements for yarn.io/gpu. Default is 0.

fpgasint, optional

The number of FPGAs to request. Requires Hadoop >= 3.1, sets resource requirements for yarn.io/fpga. Default is 0.
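The memory rules can be made concrete with a small parser. memory_mib and allocated_mib are hypothetical helpers, and the unit table is an assumption (Skein's accepted unit spellings may differ). The example shows why the MiB/MB distinction matters: "5 GiB" is 5120 MiB, while "5 GB" is only about 4768.4 MiB (rounded up to 4769 here). The sketch applies only the minimum allocation and does not model any other scheduler rounding.

```python
import math
import re

# Assumed unit table, in bytes: binary (KiB, MiB, ...) vs decimal (KB, MB, ...) units.
UNITS = {
    "KiB": 2**10, "MiB": 2**20, "GiB": 2**30, "TiB": 2**40,
    "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12,
}


def memory_mib(value):
    """Convert a memory request to whole MiB, rounding up. Numbers are taken as MiB."""
    if isinstance(value, (int, float)):
        return int(math.ceil(value))
    match = re.fullmatch(r"\s*([0-9.]+)\s*([A-Za-z]+)\s*", value)
    if match is None or match.group(2) not in UNITS:
        raise ValueError("unrecognized memory value: %r" % value)
    n_bytes = float(match.group(1)) * UNITS[match.group(2)]
    return int(math.ceil(n_bytes / 2**20))


def allocated_mib(value, minimum=1024):
    """Requests below the minimum allocation receive the minimum (1024 MiB by default)."""
    return max(memory_mib(value), minimum)
```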

classmethod from_dict(obj)

Create an instance from a dict.

Keys in the dict should match parameter names

classmethod from_json(b)

Create an instance from a json string.

Keys in the json object should match parameter names

classmethod from_protobuf(msg)

Create an instance from a protobuf message.

classmethod from_yaml(b)

Create an instance from a yaml string.


Convert object to a dict


Convert object to a json string


Convert object to a protobuf message


Convert object to a yaml string

class skein.ACLs(enable=False, view_users=None, view_groups=None, modify_users=None, modify_groups=None, ui_users=None)

Skein Access Control Lists.

Maps access types to users/groups to provide that access.

The following access types are supported:

  • VIEW : view application details

  • MODIFY : modify the application via YARN (e.g. killing the application)

  • UI : access the application Web UI

The VIEW and MODIFY access types are handled by YARN directly; permissions for these can be set by users and/or groups. Authorizing UI access is handled by Skein internally, and only user-level access control is supported.

The application owner (the user who submitted the application) will always have permission for all access types.

By default, ACLs are disabled - to enable, set enable=True. If enabled, access is restricted only to the application owner by default - add users/groups to the access types you wish to expand to other users.

enablebool, optional

If True, the ACLs will be enforced. Default is False.

view_users, view_groupslist, optional

Lists of users/groups to give VIEW access to this application. If both are empty, only the application owner has access (default). If either contains "*", all users are given access. See the YARN documentation for more information on what VIEW access entails.

modify_users, modify_groupslist, optional

Lists of users/groups to give MODIFY access to this application. If both are empty, only the application owner has access (default). If either contains "*", all users are given access. See the YARN documentation for more information on what MODIFY access entails.

ui_userslist, optional

A list of users to give access to the application Web UI. If empty, only the application owner has access (default). If it contains "*", all users are given access.


By default ACLs are disabled, and all users have access.

>>> import skein
>>> acls = skein.ACLs()

Enabling ACLs results in only the application owner having access (provided YARN is also configured with ACLs enabled).

>>> acls = skein.ACLs(enable=True)

To give access to other users, add users/groups to the desired access types. Here we enable view access for all users in group engineering, and modify access for user nancy.

>>> acls = skein.ACLs(enable=True,
...                   view_groups=['engineering'],
...                   modify_users=['nancy'])

You can use the wildcard character "*" to enable access for all users. Here we give view access to all users:

>>> acls = skein.ACLs(enable=True,
...                   view_users=['*'])
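The access rules described for ACLs can be summarized as a small predicate. This is a hypothetical model for illustration only: has_access is not part of Skein, and in practice VIEW/MODIFY checks are performed by YARN (which must also have ACLs enabled) while UI checks are performed by Skein. For UI access, only users apply, so groups would be left empty.

```python
def has_access(user, user_groups, owner, enable, users=(), groups=()):
    """Hypothetical ACL check for one access type (VIEW, MODIFY, or UI)."""
    if not enable:
        return True  # ACLs disabled: all users have access
    if user == owner:
        return True  # the application owner always has access
    if "*" in users or "*" in groups:
        return True  # the wildcard grants access to all users
    return user in users or any(g in groups for g in user_groups)
```

The predicate mirrors the examples: with enable=True and view_groups=['engineering'], a member of engineering passes the VIEW check while other non-owners do not.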

classmethod from_dict(obj)

Create an instance from a dict.

Keys in the dict should match parameter names

classmethod from_json(b)

Create an instance from a json string.

Keys in the json object should match parameter names

classmethod from_protobuf(obj)

Create an instance from a protobuf message.

classmethod from_yaml(b)

Create an instance from a yaml string.


Convert object to a dict


Convert object to a json string


Convert object to a protobuf message


Convert object to a yaml string

class skein.Security(cert_file=None, key_file=None, cert_bytes=None, key_bytes=None)

Security configuration.

Secrets may be specified either as file paths or raw bytes, but not both.

cert_filestr, File, optional

The TLS certificate file, in pem format. Either a path or a fully specified File object.

key_filestr, File, optional

The TLS private key file, in pem format. Either a path or a fully specified File object.

cert_bytesbytes, optional

The contents of the TLS certificate file, in pem format.

key_bytesbytes, optional

The contents of the TLS private key file, in pem format.

classmethod from_default()

The default security configuration.

Usually this loads the credentials stored in the configuration directory (~/.skein by default). If these credentials don’t already exist, new ones will be created.

When run in a YARN container started by Skein, this loads the same security credentials as used for the current application.

classmethod from_dict(obj, **kwargs)

Create an instance from a dict.

Keys in the dict should match parameter names

classmethod from_directory(directory)

Create a security object from a directory.

Relies on standard names for each file (skein.crt and skein.pem).

classmethod from_json(b)

Create an instance from a json string.

Keys in the json object should match parameter names

classmethod from_protobuf(obj)

Create an instance from a protobuf message.

classmethod from_yaml(b)

Create an instance from a yaml string.

classmethod new_credentials()

Create a new Security object with a new certificate/key pair.


Convert object to a dict

to_directory(directory, force=False)

Write this security object to a directory.

directorystr

The directory to write the configuration to.

forcebool, optional

If security credentials already exist at this location, an error will be raised by default. Set to True to overwrite existing files.


A new security object backed by the written files.
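The directory layout used by from_directory and the force flag of to_directory can be sketched with the standard library. write_credentials is hypothetical: it only reproduces the documented file names (skein.crt and skein.pem) and the overwrite rule, and the choice of FileExistsError is an assumption.

```python
import os


def write_credentials(directory, cert_bytes, key_bytes, force=False):
    """Hypothetical helper mirroring the documented to_directory file layout."""
    cert_path = os.path.join(directory, "skein.crt")
    key_path = os.path.join(directory, "skein.pem")
    # Refuse to overwrite existing credentials unless force=True.
    if not force and (os.path.exists(cert_path) or os.path.exists(key_path)):
        raise FileExistsError("credentials already exist in %r" % directory)
    os.makedirs(directory, exist_ok=True)
    with open(cert_path, "wb") as f:
        f.write(cert_bytes)
    with open(key_path, "wb") as f:
        f.write(key_bytes)
    return cert_path, key_path
```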


Convert object to a json string


Convert object to a protobuf message


Convert object to a yaml string

Application Responses

class skein.model.ApplicationState(x)

Enum of application states.

NEW

Application was just created.

NEW_SAVING

Application is being saved.

SUBMITTED

Application has been submitted.

ACCEPTED

Application has been accepted by the scheduler.

RUNNING

Application is currently running.

FINISHED

Application finished successfully.

FAILED

Application failed.

KILLED

Application was terminated by a user or admin.

classmethod values()

The constants of this enum type, in the order they are declared.

class skein.model.FinalStatus(x)

Enum of application final statuses.

SUCCEEDED

Application finished successfully.

KILLED

Application was terminated by a user or admin.

FAILED

Application failed.

UNDEFINED

Application has not yet finished.

classmethod values()

The constants of this enum type, in the order they are declared.

class skein.model.ApplicationReport(id, name, user, queue, tags, host, port, tracking_url, state, final_status, progress, usage, diagnostics, start_time, finish_time)

Report of application status.

idstr

The application ID.

namestr

The application name.

userstr

The user that started the application.

queuestr

The application queue.

tagsset of strings

The application tags.

hoststr

The host the application master is running on.

portint

The RPC port for the application master.

tracking_urlstr

The application tracking URL.

stateApplicationState

The application state.

final_statusFinalStatus

The application final status.

progressfloat

The progress of the application, from 0.0 to 1.0.

usageResourceUsageReport

Report on application resource usage.

diagnosticsstr

The diagnostic message in the case of failures.

start_timedatetime

The application start time.

finish_timedatetime

The application finish time.

classmethod from_protobuf(obj)

Create an instance from a protobuf message.

property runtime

The total runtime of the application.
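The runtime properties on ApplicationReport and Container are presumably derived from start_time and finish_time. The sketch below shows one plausible computation, assuming an unfinished entry (finish_time is None) is measured up to the current time and an unstarted one has zero runtime. The runtime helper is hypothetical, not Skein's implementation.

```python
from datetime import datetime, timedelta


def runtime(start_time, finish_time, now=None):
    """Hypothetical: elapsed time from start to finish, or to now if still running."""
    if start_time is None:
        return timedelta(0)  # not started yet
    if finish_time is not None:
        end = finish_time
    else:
        end = now if now is not None else datetime.now()
    return end - start_time
```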


Convert object to a protobuf message

class skein.model.ResourceUsageReport(memory_seconds, vcore_seconds, num_used_containers, needed_resources, reserved_resources, used_resources)

Resource usage report.

memory_secondsint

The total amount of memory (in MB) the application has allocated, multiplied by the number of seconds the application has been running.

vcore_secondsint

The total number of vcores the application has allocated, multiplied by the number of seconds the application has been running.

num_used_containersint

Current number of containers in use.

needed_resourcesResources

The needed resources.

reserved_resourcesResources

The reserved resources.

used_resourcesResources

The used resources.
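As a worked example of the *_seconds fields, using hypothetical allocations: a container holding 2048 MB for one hour contributes 2048 × 3600 = 7,372,800 MB-seconds, and a 4096 MB container held for half an hour contributes the same amount, for 14,745,600 in total. The sketch sums these per container.

```python
# Hypothetical allocations: (memory in MB, vcores, seconds held) per container.
allocations = [
    (2048, 1, 3600),
    (4096, 2, 1800),
]

# Each container contributes its allocation multiplied by the time it was held.
memory_seconds = sum(mem * secs for mem, _, secs in allocations)
vcore_seconds = sum(cores * secs for _, cores, secs in allocations)
```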

classmethod from_protobuf(obj)

Create an instance from a protobuf message.


Convert object to a protobuf message

class skein.model.ContainerState(x)

Enum of container states.

WAITING

Container is waiting on another service to start up before being requested.

REQUESTED

Container has been requested but is not currently running.

RUNNING

Container is currently running.

SUCCEEDED

Container finished successfully.

FAILED

Container failed.

KILLED

Container was terminated by a user or admin.

classmethod values()

The constants of this enum type, in the order they are declared.

class skein.model.Container(service_name, instance, state, yarn_container_id, yarn_node_http_address, start_time, finish_time, exit_message)

Current container state.

service_namestr

The name of the service this container is running.

instanceint

The container instance number.

stateContainerState

The current container state.

yarn_container_idstr

The YARN container id.

yarn_node_http_addressstr

The YARN node HTTP address given as host:port.

start_timedatetime

The start time, None if container has not started.

finish_timedatetime

The finish time, None if container has not finished.

exit_messagestr

The diagnostic exit message for completed containers.

classmethod from_protobuf(obj)

Create an instance from a protobuf message.

property id

The complete service_name & instance identity of this container.

property runtime

The total runtime of the container.


Convert object to a protobuf message

class skein.model.ApplicationLogs(app_id, logs)

A mapping of yarn_container_id to their logs for an application.

dump(file=None)

Write the logs to a file or stdout.

filefile-like, optional

A file-like object to write the logs to. Defaults to sys.stdout.

dumps()

Write the logs to a string.

class skein.model.NodeState(x)

Enum of node states.

DECOMMISSIONED

Node is out of service.

DECOMMISSIONING

Node is currently decommissioning.

LOST

Node has not responded for some time.

NEW

Node has just started.

REBOOTED

Node has just rebooted.

RUNNING

Node is currently running.

SHUTDOWN

Node has been shut down gracefully.

UNHEALTHY

Node is unhealthy.

classmethod values()

The constants of this enum type, in the order they are declared.

class skein.model.NodeReport(id, http_address, rack_name, labels, state, health_report, total_resources, used_resources)

Report of node status.

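idstr

The node id.

http_addressstr

The HTTP address of the node manager.

rack_namestr

The rack name for this node.

labelsset of strings

Node labels for this node.

stateNodeState

The node’s current state.

health_reportstr

The diagnostic health report for this node.

total_resourcesResources

Total resources available on this node.

used_resourcesResources

Resources currently in use on this node.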

classmethod from_protobuf(obj)

Create an instance from a protobuf message.

property host

The node manager host for this node.

property port

The node manager port for this node.
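The host and port properties split a host:port string. Assuming the node id has that form (as yarn_node_http_address does for containers), the split might look like this. split_host_port is a hypothetical helper; rpartition splits on the last colon only.

```python
def split_host_port(address):
    """Split 'host:port' into (host, int(port)) on the last colon (hypothetical helper)."""
    host, sep, port = address.rpartition(":")
    if not sep:
        raise ValueError("expected host:port, got %r" % address)
    return host, int(port)
```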


Convert object to a protobuf message

class skein.model.QueueState(x)

Enum of queue states.

RUNNING

Queue is running, normal operation.

STOPPED

Queue is stopped, no longer taking new requests.

classmethod values()

The constants of this enum type, in the order they are declared.

class skein.model.Queue(name, state, capacity, max_capacity, percent_used, node_labels, default_node_label)

Information about a specific YARN queue.

namestr

The queue’s name.

stateQueueState

The queue’s state.

capacityfloat

The queue’s capacity as a percentage. For the capacity scheduler, the queue is guaranteed access to this percentage of the parent queue’s resources (if sibling queues are running over their limit, there may be a lag accessing resources as those applications scale down). For the fair scheduler, this number is the percentage of the total cluster this queue currently has in its fair share (this will shift dynamically during cluster use).

max_capacityfloat

The queue’s max capacity as a percentage. For the capacity scheduler, this queue may elastically expand to use up to this percentage of its parent’s resources if its siblings aren’t running at their capacity. For the fair scheduler this is always 100%.

percent_usedfloat

The percent of this queue’s capacity that’s currently in use. This may be over 100% if elasticity is in effect.

node_labelsset of strings

A set of all accessible node labels for this queue. If all node labels are accessible this is the set {"*"}.

default_node_labelstr

The default node label for this queue. This will be used if the application doesn’t specify a node label itself.
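For the capacity scheduler, percent_used relates current usage to the queue's guaranteed capacity, which is how it can exceed 100%. In this hypothetical example, a queue guaranteed 20% of its parent that is currently using 30% of the parent (possible through elasticity, up to max_capacity) reports 150%. percent_used here is an illustrative helper, not the Queue attribute itself.

```python
def percent_used(used_percent_of_parent, capacity_percent):
    """Share of a queue's guaranteed capacity currently in use, as a percentage."""
    return 100.0 * used_percent_of_parent / capacity_percent
```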

classmethod from_protobuf(obj)

Create an instance from a protobuf message.


Convert object to a protobuf message

Exceptions

exception skein.SkeinError

Bases: Exception

Base class for Skein specific exceptions

exception skein.ConnectionError

Bases: skein.exceptions.SkeinError, ConnectionError

Failed to connect to the driver or application master

exception skein.DriverNotRunningError

Bases: skein.exceptions.ConnectionError

The driver process is not currently running

exception skein.ApplicationNotRunningError

Bases: skein.exceptions.ConnectionError

The application master is not currently running

exception skein.DriverError

Bases: skein.exceptions.SkeinError

Internal exceptions from the driver

exception skein.ApplicationError

Bases: skein.exceptions.SkeinError

Internal exceptions from the application master
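The hierarchy above means a single except clause can catch related failures. Since skein.ConnectionError also derives from the built-in ConnectionError, an except ConnectionError clause catches DriverNotRunningError and ApplicationNotRunningError too. The sketch uses stand-in classes that mirror the documented bases so that it runs without Skein installed; they are not the real skein classes.

```python
# Stand-ins mirroring the documented hierarchy (not imported from skein).
class SkeinError(Exception):
    pass


class SkeinConnectionError(SkeinError, ConnectionError):  # mirrors skein.ConnectionError
    pass


class DriverNotRunningError(SkeinConnectionError):
    pass


class ApplicationNotRunningError(SkeinConnectionError):
    pass


class DriverError(SkeinError):
    pass


def classify(exc):
    """Show which handler a raised exception reaches."""
    try:
        raise exc
    except ConnectionError:  # built-in: catches every Skein connection error
        return "connection"
    except SkeinError:  # any other Skein-specific failure
        return "skein"
```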

Tornado Utilities

skein.tornado.init_kerberos(keytab=None, service='HTTP', hostname=None)

Initialize Kerberos authentication settings.

Should be called once on process startup, sets global settings.

keytabstr, optional

Path to the keytab file. If not set, will check the KRB5_KTNAME environment variable, and error otherwise.

hostnamestr, optional

The hostname. If not provided, will be inferred from the system.

servicestr, optional

The service name. The default of "HTTP" is usually sufficient.
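The keytab lookup order described for init_kerberos can be sketched as follows. resolve_keytab is hypothetical and the error type is an assumption; it only illustrates the order: the explicit argument, then KRB5_KTNAME, then failure.

```python
import os


def resolve_keytab(keytab=None):
    """Hypothetical: use the explicit keytab, else KRB5_KTNAME, else raise."""
    if keytab is not None:
        return keytab
    env = os.environ.get("KRB5_KTNAME")
    if env:
        return env
    raise ValueError("no keytab given and KRB5_KTNAME is not set")
```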

class skein.tornado.KerberosAuthMixin

A tornado.web.RequestHandler mixin for authenticating with Kerberos.


A simple hello-world application:

from skein.tornado import init_kerberos, KerberosAuthMixin
from tornado import web, ioloop

# Create a handler with the KerberosAuthMixin as a base class
class HelloHandler(KerberosAuthMixin, web.RequestHandler):
    def get(self):
        self.write("Hello %s" % self.current_user)

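if __name__ == "__main__":
    # Initialize kerberos once per application (uses KRB5_KTNAME if no keytab is given)
    init_kerberos()

    # Serve web application on an example port
    app = web.Application([("/", HelloHandler)])
    app.listen(8888)
    ioloop.IOLoop.current().start()

class skein.tornado.SimpleAuthMixin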

A tornado.web.RequestHandler mixin for authenticating using Hadoop’s “simple” protocol.

In Hadoop, “simple” authentication uses a URL query parameter to specify the user, and isn’t secure at all. This mixin class exists for parity with Hadoop, but Kerberos authentication is advised instead.


A simple hello-world application:

from skein.tornado import SimpleAuthMixin
from tornado import web, ioloop

# Create a handler with the SimpleAuthMixin as a base class
class HelloHandler(SimpleAuthMixin, web.RequestHandler):
    def get(self):
        self.write("Hello %s" % self.current_user)

if __name__ == "__main__":
    # Serve web application on an example port
    app = web.Application([("/", HelloHandler)])
    app.listen(8888)
    ioloop.IOLoop.current().start()
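Hadoop's “simple” HTTP authentication passes the user name in the user.name query parameter, which is why it provides no security: any client can claim to be any user. The sketch below extracts that parameter with the standard library. simple_auth_user is hypothetical and is not how SimpleAuthMixin is implemented.

```python
from urllib.parse import parse_qs, urlsplit


def simple_auth_user(url):
    """Return the user named by the 'user.name' query parameter, or None if absent."""
    values = parse_qs(urlsplit(url).query).get("user.name")
    return values[0] if values else None
```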