Programming Model

Kappa aims to run mostly unmodified Python 3 code on serverless platforms. As such, Kappa application code is mostly just ordinary Python 3 code, with a few extra features and restrictions which are described below.

Application Structure

A Kappa application lives in a directory named after the application (e.g., ./factorial). The application directory must contain a file named handler.py, which is the entry point script of the application. This Python file must, in turn, define a function named handler with the following signature:

def handler(event, _):
    pass

Kappa starts an application by calling this function.

The handler function takes two positional arguments. The first argument, event, contains application input specified by the user in JSON form at invocation time. The event argument can be of arbitrary type as long as it is expressible in JSON. The second argument is currently unused.
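For instance, a minimal handler.py for the ./factorial application might read its input from an "n" field of the event. The field name and the factorial logic are assumptions for illustration. Because this handler makes no coordinator calls, it is also ordinary Python and can be exercised locally by calling it the way Kappa starts an application, with None as the second argument.

```python
def handler(event, _):
    """Computes n! for the JSON input {"n": <int>} (hypothetical input schema)."""
    n = event["n"]  # event is already decoded from JSON into Python objects
    result = 1
    for i in range(2, n + 1):
        result *= i
    return result
```

Locally, handler({"n": 5}, None) returns 120.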

A Kappa application may also contain files and directories other than the entry point script; they can be Python packages imported by the application (e.g., numpy), configuration files, etc. Be mindful, however, of these caveats:

Execution

An execution of a Kappa application consists of executing Kappa tasks, each of which is a logical thread of execution.

Each task executes a Python function call f(*args), where both the function f and the arguments args are specified when the task is spawned. At startup time, the Kappa library automatically spawns the very first task, running handler(event, None), where event is the user-supplied application input.

Generally, each task is run on top of lambda functions, which are time-bounded execution contexts provided by the serverless platform. Since each lambda function has a time limit, a Kappa task should take checkpoints periodically so that when a lambda function dies, the task can be resumed from a recent checkpoint on a fresh lambda function.

The programmer is responsible for ensuring that checkpoints are taken often enough. For example, as explained in the Quick-Start Tutorial, you may call checkpoint() once every x iterations of a loop. The checkpoint function belongs to a class of special functions called coordinator calls (see below); making any coordinator call automatically takes a checkpoint.

To take a checkpoint, the Kappa runtime library serializes and saves all live variables at the program point where the checkpoint is taken. Informally, a variable is live at a program point if it can be accessed by subsequent code. For example, in the following code snippet:

from rt import checkpoint

def foo(x, y):
    z = x * y
    checkpoint()
    t = z + x
    return t

the checkpoint will contain values for variables x and z (variable y is not live as it is not accessed in later code).
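To make the idea concrete, the sketch below hand-splits foo at its checkpoint into two plain Python functions: one that stops at the checkpoint and pickles only the live variables, and one that resumes from those saved values. This only illustrates what a checkpoint has to capture; it is not the code the Kappa compiler actually generates, and the names foo_before_checkpoint and foo_after_checkpoint are hypothetical.

```python
import pickle


def foo(x, y):
    """The original function, without the checkpoint, for comparison."""
    z = x * y
    t = z + x
    return t


def foo_before_checkpoint(x, y):
    """Runs foo up to the checkpoint and returns the serialized live variables."""
    z = x * y
    # y is never used again, so only x and z need to be saved.
    return pickle.dumps({"x": x, "z": z})


def foo_after_checkpoint(saved):
    """Resumes foo from saved live variables (e.g., on a fresh lambda function)."""
    live = pickle.loads(saved)
    t = live["z"] + live["x"]
    return t
```

For example, foo_after_checkpoint(foo_before_checkpoint(3, 4)) gives the same result as foo(3, 4), namely 15.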

Kappa serializes live variables using Python’s pickle module, and raises a runtime exception if serialization fails. The programmer is responsible for making sure that any live values at checkpoint locations are picklable, i.e., capable of being serialized using pickle. See pickle documentation for details.
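A quick way to check whether a value would survive a checkpoint is simply to try pickling it. The helper below is a hypothetical convenience, not part of rt; it only reports whether pickle.dumps succeeds.

```python
import math
import pickle
import threading


def is_picklable(value):
    """Returns True if pickle can serialize value, False otherwise."""
    try:
        pickle.dumps(value)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True
```

Plain data such as dicts of numbers and strings, module-level functions and built-ins like math.sqrt pass this check; lambdas, locks such as threading.Lock() and generator objects do not, so they should not be live at a checkpoint.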

While tasks in Kappa are single-threaded, Kappa enables concurrent processing by allowing each task to spawn other tasks which execute in parallel, and by providing inter-task communication mechanisms that allow tasks to communicate and coordinate with each other. The concurrency mechanism is detailed in a subsequent section.

Coordinator Calls

The Kappa library, rt, provides a special set of functions called coordinator calls. These functions implement features such as checkpointing and synchronization between tasks. Here is a list of core coordinator calls:

Coordinator calls differ from regular Python functions in the following ways:

Coordinator calls carry greater overhead than ordinary Python function calls because they need to take and persist a checkpoint, as well as contact the coordinator machine over the network. Reducing the number of coordinator calls made may improve application performance.

Concurrency

To allow parallel computation on the serverless platform, Kappa provides mechanisms by which a running task can spawn additional tasks that run in parallel, and parallel tasks can communicate and synchronize through FIFO queues. We will showcase these mechanisms using two examples.

Example: Parallel Fibonacci

Our first example features a recursive computation of Fibonacci numbers. To compute fib(n), we spawn two sub-tasks to compute fib(n-1) and fib(n-2) in parallel, then spawn a third sub-task to compute their sum.

from rt import spawn

def sum_two(a, b):
    return a + b  # a, b are ints, not futures.

def fib(n):
    if n <= 1:
        return n
    else:
        fut1 = spawn(fib, (n-1,))
        fut2 = spawn(fib, (n-2,))
        fut_sum = spawn(sum_two, (fut1, fut2))
        return fut_sum.wait()

def handler(event, _):
    return fib(event["n"])

The spawn function takes a function f and a sequence of arguments args, spawns a task that runs f(*args), and returns a future object to the result.

Recall that the function f, a coordinator call argument, must be picklable. In practice, this means f can be any function defined at the module level or any built-in function in Python.

There are two ways to use the result of a future fut:

  1. Retrieve the result explicitly through fut.wait(), a coordinator call that blocks until the result is produced (i.e., waits for the spawned task to complete), and returns the result.

    For example, the fib function above calls fut_sum.wait() to retrieve the result of the sum_two task.

  2. Alternatively, pass fut to a spawned sub-task as an argument, in which case the sub-task doesn’t start until the result of fut is ready, and the result is implicitly substituted for the future as argument to the sub-task.

    For example, the sum_two task is spawned with arguments fut1 and fut2, which are automatically turned into their values (i.e., fib(n-1) and fib(n-2)) before being passed to the sum_two function.

    This mechanism simplifies constructing dependency graphs for tasks.
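The two ways of consuming a future can be tried without a serverless deployment using a local, thread-based analogue. In the sketch below, local_spawn and LocalFuture are hypothetical stand-ins for spawn and its future: each task runs on its own Python thread rather than on a lambda function, and nothing is checkpointed, so it mirrors only the data-flow semantics described above (explicit wait() and implicit resolution of future arguments).

```python
import threading


class LocalFuture:
    """Hypothetical stand-in for the future object returned by spawn."""

    def __init__(self):
        self._done = threading.Event()
        self._value = None
        self._error = None

    def _finish(self, value=None, error=None):
        self._value, self._error = value, error
        self._done.set()

    def wait(self):
        """Blocks until the task finishes, then returns its result."""
        self._done.wait()
        if self._error is not None:
            raise self._error
        return self._value


def local_spawn(f, args):
    """Runs f(*args) on a new thread; future arguments are awaited first."""
    fut = LocalFuture()

    def run():
        try:
            # Substitute each future argument with its result, blocking as needed.
            resolved = [a.wait() if isinstance(a, LocalFuture) else a for a in args]
            fut._finish(value=f(*resolved))
        except Exception as exc:
            fut._finish(error=exc)

    threading.Thread(target=run, daemon=True).start()
    return fut


def sum_two(a, b):
    return a + b  # a, b arrive as ints, already resolved from futures


def fib(n):
    if n <= 1:
        return n
    fut1 = local_spawn(fib, (n - 1,))
    fut2 = local_spawn(fib, (n - 2,))
    fut_sum = local_spawn(sum_two, (fut1, fut2))
    return fut_sum.wait()
```

Here fib(10) returns 55. Note that every call blocks a whole thread while waiting, which is fine for a small demonstration but is exactly the kind of cost Kappa avoids by running tasks on separate lambda functions.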

Example: Message Passing

Kappa allows concurrently executing tasks to communicate and synchronize with each other through FIFO queues. Take a look at this example:

from rt import create_queue, on_coordinator, spawn

def count(q):
    """Counts the strings passed into queue, stopping at None."""
    ctr = 0
    while q.dequeue() is not None:
        ctr += 1
    return ctr

def gen(q):
    """Passes two strings into queue, followed by a None sentinel."""
    q.enqueue("a")
    q.enqueue("b")
    q.enqueue(None)

@on_coordinator
def handler(_event, _):
    q = create_queue(max_size=1)
    fut = spawn(count, (q,))
    spawn(gen, (q,))
    assert fut.wait() == 2

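The entry point handler function creates a queue q and passes it to two spawned tasks:

  1. count, which dequeues items from q and counts them until it receives None.

  2. gen, which enqueues the strings "a" and "b", followed by None to mark the end of input.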

In the end, we assert that the count task has retrieved the correct number of strings from the queue.
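Outside Kappa, the same producer/consumer pattern can be reproduced with Python's standard queue.Queue and threads, which block in the same way on empty and full queues. The sketch below is a local analogue only: put and get replace enqueue and dequeue, threads replace spawned tasks, and run_count_and_gen is a hypothetical driver standing in for handler.

```python
import queue
import threading


def count(q):
    """Counts items taken from q, stopping at the None sentinel."""
    ctr = 0
    while q.get() is not None:  # get() blocks while q is empty
        ctr += 1
    return ctr


def gen(q):
    """Puts two strings into q, followed by a None sentinel."""
    q.put("a")  # put() blocks while q is full (maxsize=1)
    q.put("b")
    q.put(None)


def run_count_and_gen():
    q = queue.Queue(maxsize=1)
    result = {}

    def consume():
        result["n"] = count(q)

    consumer = threading.Thread(target=consume)
    producer = threading.Thread(target=gen, args=(q,))
    consumer.start()
    producer.start()
    producer.join()
    consumer.join()
    return result["n"]
```

As in the Kappa version, run_count_and_gen() returns 2.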

Queues can also be used for synchronization: dequeue blocks if the queue is empty, and enqueue blocks if the queue is full. One can implement other synchronization primitives, e.g., semaphores, on top of queues.
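As a sketch of that idea, a counting semaphore with k permits can be built from a queue pre-filled with k tokens: acquiring dequeues a token (blocking when none remain) and releasing enqueues it back. The code runs locally, with LocalQueue as a hypothetical stand-in that mirrors the enqueue/dequeue methods of a queue returned by create_queue; whether a real Kappa queue can be dropped in unchanged is an assumption not verified here.

```python
import queue
import threading
import time


class LocalQueue:
    """Hypothetical local stand-in for a Kappa queue, backed by queue.Queue."""

    def __init__(self, max_size=0):
        self._q = queue.Queue(maxsize=max_size)

    def enqueue(self, item):
        self._q.put(item)  # blocks while the queue is full

    def dequeue(self):
        return self._q.get()  # blocks while the queue is empty


class QueueSemaphore:
    """Counting semaphore that uses only enqueue/dequeue on a FIFO queue."""

    def __init__(self, q, permits):
        self._q = q
        for _ in range(permits):
            self._q.enqueue(True)  # one token per available permit

    def acquire(self):
        self._q.dequeue()  # take a token; blocks while none remain

    def release(self):
        self._q.enqueue(True)  # return the token


def peak_concurrency(permits=2, workers=6):
    """Runs `workers` threads through the semaphore and reports peak overlap."""
    sem = QueueSemaphore(LocalQueue(max_size=permits), permits)
    lock = threading.Lock()
    active = peak = 0

    def work():
        nonlocal active, peak
        sem.acquire()
        try:
            with lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.01)  # simulate work while holding a permit
            with lock:
                active -= 1
        finally:
            sem.release()

    threads = [threading.Thread(target=work) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak
```

No matter how many workers run, peak_concurrency never reports more holders than there are permits.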

Finally, note that the handler function is annotated with @on_coordinator. When a task is spawned running an on_coordinator function, the task is launched as a normal Python process on the coordinator machine instead of on a lambda function. As a result, such tasks can issue coordinator calls faster (no network latency), and do not suffer from lambda function timeouts.

However, these functions take up resources on the coordinator machine. The handler function above is a good candidate for an on_coordinator task because it mostly just spawns and waits on other tasks and does little computation.

Idempotence

Recall that when a Kappa task gets killed, it gets restarted from a previous checkpoint. As such, code may be re-executed when timeouts occur, which can be problematic if the re-executed code is non-idempotent.

Consider, for example, a Kappa task that sends an email. If this task is restarted in the middle of sending the email, the re-executed task may send a duplicate copy.

Kappa, by default, provides at-most-once semantics only for coordinator calls. Care must therefore be taken in all other scenarios to prevent re-execution of non-idempotent code, whether written by yourself or imported from a third-party library.

At-most-once semantics for arbitrary non-idempotent code can be achieved by running such code as an on-coordinator task. For example:

from rt import spawn, on_coordinator

@on_coordinator
def send_email():
    # ... code to send email ...
    pass

def handler(event, _):
    # ...
    fut = spawn(send_email, ())
    fut.wait()
    # ...

Recall that the @on_coordinator annotation causes the send_email task to run as a regular process on the coordinator machine rather than on a lambda function. Since the coordinator machine (e.g., your laptop) is assumed never to fail, the send_email task cannot fail partway through either. Failures in the handler task cause no problems, because spawn and wait are coordinator calls and therefore have at-most-once semantics.

Other Features

Ignores

Code transformed by the Kappa compiler may run slower than the original because the compiler “flattens out” Python expressions. If a Python function is performance-sensitive and does not make coordinator calls, you may instruct the compiler to ignore it by putting kappa:ignore at the end of its docstring:

def compute_much(foo, bar):
    """Does much computation.

    kappa:ignore
    """
    # ...

This feature works the same way for class docstrings.
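For a class, the marker likewise goes at the end of the class docstring. The Vector class below is a hypothetical example; as with ignored functions, none of its methods may make coordinator calls.

```python
class Vector:
    """A small numeric vector with performance-sensitive arithmetic.

    kappa:ignore
    """

    def __init__(self, values):
        self.values = list(values)

    def dot(self, other):
        # Tight numeric loop with no coordinator calls, as ignored code requires.
        return sum(a * b for a, b in zip(self.values, other.values))
```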

If your computation is mostly performed within external modules (e.g., NumPy), you need not worry about the performance degradation since the Kappa compiler does not transform external modules.

It is the programmer’s responsibility to make sure that any ignored functions / classes do not make coordinator calls (including checkpoint).

Custom Serialization

As discussed above, Kappa uses the pickle module for serializing objects in a checkpoint. This allows the programmer to customize serialization strategies at a per-class granularity by, say, implementing the __getstate__ and __setstate__ methods on a user-defined class (see pickle documentation for details).

This mechanism enables serialization of stateful objects in certain scenarios. For example, you may wrap a network connection inside a wrapper class that, upon deserialization, re-establishes the connection.
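As an illustration of that pattern, the hypothetical wrapper below holds a sqlite3 connection, which pickle cannot serialize, and saves only the database path; __setstate__ reopens the connection when the object is unpickled. SQLite stands in for a network connection only so the sketch runs locally. An in-memory database (":memory:") loses its contents on reconnection, so real code would point at durable state.

```python
import pickle
import sqlite3


class ReconnectingDB:
    """Wraps a database connection that is re-opened after unpickling."""

    def __init__(self, path):
        self.path = path
        self.conn = sqlite3.connect(path)

    def __getstate__(self):
        # Save everything except the live connection, which is not picklable.
        state = self.__dict__.copy()
        del state["conn"]
        return state

    def __setstate__(self, state):
        # Restore saved attributes, then re-establish the connection.
        self.__dict__.update(state)
        self.conn = sqlite3.connect(self.path)
```

After pickle.loads(pickle.dumps(db)), the restored object has a fresh, usable connection to the same path.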

Kappa also offers a convenience function, reconstructor, that allows customizing serialization and deserialization at a per-object granularity. As an example, suppose a local variable text contains a large string that is backed by a persistent S3 object. Rather than save it in your checkpoints, you want the string to be reconstructed directly from the backing S3 object at checkpoint load time. Here’s how you can achieve this:

from rt import reconstructor

# Assuming that s3_get(bucket, key) returns the content stored at
# s3://bucket/key.
text = reconstructor(s3_get, "my_bucket", "my_key")

The text object is initialized as text = s3_get("my_bucket", "my_key") and is reconstructed in the same manner when a checkpoint containing it is loaded. Its content is not stored in checkpoints.

The reconstructor function works for objects of any user-defined class and of certain built-in types (including str, bytes, list, dict, and set). Arguments to reconstructor must be picklable. One caveat is that, for objects of built-in types, reconstructor returns a wrapper around the original object; the wrapper inherits from the built-in type and implements __reduce_ex__.
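The wrapper mechanism in the last sentence can be illustrated with a simplified re-creation for str. This is a hypothetical sketch of the idea, not rt's actual implementation: a str subclass remembers the function and arguments that produced it, and its __reduce_ex__ tells pickle to call that function again on load instead of storing the characters. fake_s3_get is a stand-in for the s3_get helper assumed above.

```python
import pickle


def fake_s3_get(bucket, key):
    """Hypothetical stand-in for s3_get: returns a large deterministic string."""
    return f"{bucket}/{key}:" + "x" * 100_000


def _rebuild(fn, args):
    # Called by pickle at load time: re-fetch the content instead of reading it.
    return ReconstructedStr(fn, *args)


class ReconstructedStr(str):
    """str subclass whose pickled form is (fn, args), not its characters."""

    def __new__(cls, fn, *args):
        obj = super().__new__(cls, fn(*args))
        obj._fn = fn
        obj._args = args
        return obj

    def __reduce_ex__(self, protocol):
        return (_rebuild, (self._fn, self._args))
```

Pickling ReconstructedStr(fake_s3_get, "my_bucket", "my_key") produces a payload of well under a kilobyte even though the string itself is over 100,000 characters long, and unpickling calls fake_s3_get again to rebuild an equal string.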

Python Restrictions

Kappa supports a fair subset of Python features that has allowed us to implement interesting applications. That said, below is a partial list of Python features that Kappa currently doesn’t support. Note that these restrictions apply only to the entry point script handler.py, and not to any other Python modules in the application.

If your handler.py uses any of these features, Kappa should display an error message showing the offending unsupported code snippet.

