Implementing Event Sourced Entities in Python

Event Sourced Entities persist their state with ACID semantics, scale horizontally, and isolate failures. They use the Event Sourcing Model: rather than persisting the current state, they persist all of the events that led to the current state. Akka Serverless stores these events in a journal.

An Event Sourced Entity must not update its in-memory state directly as a result of a command. If handling a command requires a change to state, the command handler should emit events. The entity's event handlers then receive these events, and only at that point should the in-memory state be changed.
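
The command-then-event flow described above can be sketched in plain Python, independent of the SDK. This is only an illustration: the Counter state, the Incremented event, and the handle_command and apply_event functions are hypothetical names, not part of the Akka Serverless API.

```python
from dataclasses import dataclass, replace
from typing import List


@dataclass(frozen=True)
class Incremented:
    """Hypothetical event: the counter was increased by `amount`."""
    amount: int


@dataclass(frozen=True)
class Counter:
    """Hypothetical entity state."""
    value: int = 0


def handle_command(state: Counter, amount: int) -> List[Incremented]:
    """Command handler: validates and returns events; never changes state."""
    if amount <= 0:
        raise ValueError("amount must be positive")
    return [Incremented(amount)]


def apply_event(state: Counter, event: Incremented) -> Counter:
    """Event handler: the only place where a new state is produced."""
    return replace(state, value=state.value + event.amount)


journal: List[Incremented] = []
state = Counter()
for event in handle_command(state, 5):
    journal.append(event)              # record the event first
    state = apply_event(state, event)  # then update the in-memory state
```

Because the state is frozen in this sketch, the command handler cannot change it even by accident; every change has to go through an event.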

When you need to read state in your service, ask yourself, "What events should I be listening to?" When you need to write state, ask yourself, "What events should I be emitting?"

To load an Entity, Akka Serverless reads the journal and replays events to compute the Entity’s current state. As an optimization, by default, Event Sourced Entities persist state snapshots periodically. This allows Akka Serverless to recreate an Entity from the most recent snapshot plus any events saved after the snapshot.
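
As a rough sketch of why snapshots shorten recovery, the following plain-Python example rebuilds a running total either from the whole journal or from a snapshot plus the events saved after it. The journal layout (sequence number, amount) and the recover function are assumptions made for illustration, not the format Akka Serverless uses.

```python
from typing import List, Optional, Tuple

# Hypothetical journal: each entry is (sequence_number, amount_added).
journal: List[Tuple[int, int]] = [(1, 10), (2, 5), (3, 7), (4, 1)]

# Hypothetical snapshot: (sequence_number, state) as of event 2.
snapshot: Optional[Tuple[int, int]] = (2, 15)


def recover(journal: List[Tuple[int, int]],
            snapshot: Optional[Tuple[int, int]]) -> Tuple[int, int]:
    """Rebuild state from the snapshot (if any) plus later events.

    Returns the recovered total and the number of events replayed.
    """
    seq, total = snapshot if snapshot is not None else (0, 0)
    replayed = 0
    for event_seq, amount in journal:
        if event_seq > seq:  # skip events the snapshot already covers
            total += amount
            replayed += 1
    return total, replayed


full_total, full_replayed = recover(journal, None)
fast_total, fast_replayed = recover(journal, snapshot)
```

Both calls arrive at the same state; the second one replays only the events recorded after the snapshot.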

In contrast with typical create, read, update, delete (CRUD) systems, event sourcing allows the state of the Entity to be reliably replicated to other services. Event Sourced Entities use offset tracking in the journal to record which portions of the system have replicated which events.
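
Offset tracking can be pictured as one position per consumer in the journal. The sketch below is a simplified model under that assumption; the consumer names and the poll function are hypothetical and do not reflect how Akka Serverless stores offsets internally.

```python
from typing import Dict, List

journal: List[str] = ["ItemAdded", "ItemAdded", "ItemRemoved", "CheckedOut"]

# Hypothetical per-consumer offsets: how many events each has replicated.
offsets: Dict[str, int] = {"billing": 0, "analytics": 3}


def poll(consumer: str, journal: List[str], offsets: Dict[str, int]) -> List[str]:
    """Return the events a consumer has not seen yet and advance its offset."""
    start = offsets.get(consumer, 0)
    pending = journal[start:]
    offsets[consumer] = len(journal)
    return pending


billing_events = poll("billing", journal, offsets)
analytics_events = poll("analytics", journal, offsets)
```

Each consumer receives only the events past its own offset, so a consumer that falls behind can catch up without affecting the others.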

Event Sourced Entities offer strong consistency guarantees. Akka Serverless distributes Entities across every node in a stateful service deployment. At any given time, each Entity lives on exactly one node. If a command for an Entity arrives at a node that does not host that Entity, the proxy forwards the command to the node that does. This forwarding is transparent; your code does not need to know about it. Because each Entity lives on exactly one node, that node can handle messages for each Entity sequentially. Hence, there are no concurrency concerns with Event Sourced Entities: each Entity handles one message at a time.

To learn more about event sourcing, check out the free Lightbend Academy course, Reactive Architecture: CQRS & Event Sourcing.

Persistence types and serialization

Event Sourced Entities persist events and snapshots in the journal, so Akka Serverless must serialize them. Akka Serverless automatically detects whether an emitted event or snapshot is a protobuf type and, if so, serializes it as protobuf. However, as a Python developer, you may find it more natural to use JSON. Serialization options for Python services describes how to do that. See the protobuf documentation for more information on protobuf.

While protobuf is the recommended format for persisting events, we recommend that you do not persist your service's own protobuf messages. Instead, create separate messages for persistence, even if they are identical. Converting from one type to the other may introduce some overhead, but it allows the service's public interface to evolve independently from its data storage format, which should be private.
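
To make the conversion step concrete, here is a small sketch that uses dataclasses as stand-ins for generated protobuf classes. The AddLineItem fields, the DomainLineItem name, and the to_domain helper are hypothetical and chosen only for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AddLineItem:
    """Stand-in for a public API command message (hypothetical fields)."""
    user_id: str
    product_id: str
    name: str
    quantity: int


@dataclass(frozen=True)
class DomainLineItem:
    """Stand-in for the private, persisted LineItem message."""
    productId: str
    name: str
    quantity: int


def to_domain(command: AddLineItem) -> DomainLineItem:
    """Copy only the fields that belong in storage into the domain type."""
    return DomainLineItem(
        productId=command.product_id,
        name=command.name,
        quantity=command.quantity,
    )


item = to_domain(AddLineItem("user-1", "sku-42", "Kettle", 2))
```

With the conversion isolated in one function, a renamed or added field in the public API touches only to_domain, not the stored events.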

The following shows an example shopping cart definition in a domain.proto file:

syntax = "proto3";

package example.shoppingcart.domain;

message LineItem {
  string productId = 1;
  string name = 2;
  int32 quantity = 3;
}

message ItemAdded {
  LineItem item = 1;
}

message ItemRemoved {
  string productId = 1;
}

message CheckedOut {}

message Cart {
  repeated LineItem items = 1;
  bool checkedout = 2;
}

In this file, the Cart message represents the state snapshot, while ItemAdded, ItemRemoved, and CheckedOut are events. Note that the event names are in past tense: events are facts, indisputable things that happened in the past. A fact never becomes false. After an item has been added to a shopping cart, it never becomes untrue that the item was added to the cart. It can be removed, but that doesn't change the fact that it was added; it only changes the current state of the cart. The names of events should always be in past tense to reflect the indisputable fact that they represent.
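
The difference between facts and current state can be shown with a short plain-Python sketch. The tuple-based journal and the current_items function are hypothetical simplifications, not SDK types.

```python
from typing import List, Set, Tuple

# Hypothetical journal entries: (event name, product id).
Event = Tuple[str, str]


def current_items(journal: List[Event]) -> Set[str]:
    """Fold the journal into the set of products currently in the cart."""
    items: Set[str] = set()
    for name, product_id in journal:
        if name == "ItemAdded":
            items.add(product_id)
        elif name == "ItemRemoved":
            items.discard(product_id)
    return items


journal: List[Event] = [("ItemAdded", "sku-42")]
journal.append(("ItemRemoved", "sku-42"))  # a new fact; the old one is kept

cart = current_items(journal)
```

The cart ends up empty, yet the journal still records that the item was added at one point.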

Creating an entity

Create an Event Sourced Entity with the EventSourcedEntity class.

# imports from the Akka Serverless SDK
from akkaserverless.event_sourced_context import EventSourcedCommandContext
from akkaserverless.event_sourced_entity import EventSourcedEntity

# _SHOPPINGCARTSERVICE, API_DESCRIPTOR, and init are defined in the sections below
entity = EventSourcedEntity(_SHOPPINGCARTSERVICE, [API_DESCRIPTOR], 'carts', init)

Example details:
  • The example passes in the service descriptor and file descriptor from the Python code generated from the protobuf files that contain the service and the domain protocols: shoppingcart.proto and domain.proto. The domain event and snapshot definitions in domain.proto allow Akka Serverless to deserialize these messages when it receives them.

  • The example defines the fully qualified name of the service the Event Sourced Entity implements, example.shoppingcart.ShoppingCartService, and specifies options:

    • Akka Serverless uses the persistenceId to namespace events in the journal.

    • The snapshotEvery parameter controls how often Akka Serverless takes snapshots, so that the entity doesn't need to be recovered from the whole journal each time it's loaded. If left unset, it defaults to every 100 events. Setting it to a negative number disables snapshots. Typically, the default works well. We only recommend changing it if you have specific data from performance tests to justify a change.
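
The snapshot interval rule can be sketched as a simple predicate. This is an assumption-laden illustration: should_snapshot is a hypothetical helper, and taking snapshots at exact multiples of the interval (and treating zero as disabled) is a guess made for the sketch rather than documented behavior.

```python
def should_snapshot(sequence_number: int, snapshot_every: int = 100) -> bool:
    """Decide whether to snapshot after the event with this sequence number.

    A negative value disables snapshots, as described above. Treating
    zero as disabled too is an assumption of this sketch.
    """
    if snapshot_every <= 0:
        return False
    return sequence_number % snapshot_every == 0


# Sequence numbers 1..300 with the default interval and with snapshots disabled.
snapshot_points = [n for n in range(1, 301) if should_snapshot(n)]
disabled_points = [n for n in range(1, 301) if should_snapshot(n, -1)]
```

With the default interval, recovery never needs to replay more than 100 events on top of a snapshot in this model.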

Using protobuf types

When you pass an event or snapshot to persist, Akka Serverless needs to know how to serialize it. Simply passing a regular object does not provide enough information to know how protobuf should serialize it. To emit an event or snapshot, you must create it from its protobuf message type.

In Python, the protobuf message types are classes in the modules generated from the protobuf files. Before implementing the entity, import the types to use later:

# import from generated GRPC file(s)
from shoppingcart_api_pb2 import (GetShoppingCart, Cart, AddLineItem, _SHOPPINGCARTSERVICE, DESCRIPTOR as API_DESCRIPTOR)
from shoppingcart_domain_pb2 import (ItemAdded, LineItem)

Initial state

When there are no snapshots persisted for an entity (such as when the entity is first created), the entity needs to have an initial state. Note that Event Sourced Entities are not explicitly created; they are implicitly created when a command arrives for them. Nothing is persisted until an event is created for that Entity. So, if user "X" opens their shopping cart for the first time, an Entity will be created for them, with no events in the log. It will just be in the initial state.

To create the initial state, we define the initial callback (the init function passed to EventSourcedEntity above). It takes the id of the entity being created and returns a new empty state, in this case an empty shopping cart:

def init(entity_id: str) -> Cart:
    return Cart()

Note the use of Cart(), which creates an empty protobuf message from the Cart message type that we imported earlier.
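
Implicit creation can be modeled as a lazy lookup that falls back to the initial state. The following plain-Python sketch is hypothetical: EntityRegistry and its state_for method are illustrative names, and a list stands in for the Cart message.

```python
from typing import Callable, Dict, List


class EntityRegistry:
    """Hypothetical in-memory model of how entities come into existence."""

    def __init__(self, init: Callable[[str], List[str]]):
        self._init = init
        self._states: Dict[str, List[str]] = {}
        self.journal: Dict[str, List[str]] = {}  # events per entity id

    def state_for(self, entity_id: str) -> List[str]:
        """Return the entity's state, creating it from init on first use."""
        if entity_id not in self._states:
            self._states[entity_id] = self._init(entity_id)
        return self._states[entity_id]


def init(entity_id: str) -> List[str]:
    return []  # an empty cart


registry = EntityRegistry(init)
cart = registry.state_for("X")  # first command for user "X"
```

Reading the cart creates the in-memory entity, but the journal stays empty until a command emits an event.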

Behavior

Now we need to define the behavior for our entity. The behavior consists of two parts: command handlers and event handlers.

Command handlers

A command handler is a function that takes the current state, a command, and, optionally, an EventSourcedCommandContext. It implements a service call on the Entity's gRPC interface.

The command is the input message type for the gRPC service call. For example, the GetCart service call has an input type of GetShoppingCart, while the AddItem service call has an input type of AddLineItem. The command will be an object that matches the structure of these protobuf types.

The command handler must return a message of the same type as the output type of the gRPC service call. In the case of our GetCart command, this must be a Cart message. Note that unlike for the state and events, this message does not need to be created using a looked up protobuf message type, because Akka Serverless already knows the output type of the gRPC service call and can infer it. It only has to be a plain Python object that matches the structure of the protobuf type.

The following shows the implementation of the GetCart command handler. This command handler is read-only: it doesn't emit any events, it just returns some state:

@entity.command_handler("GetCart")
def get(state: Cart, command: GetShoppingCart):
    return state

Emitting events from commands

Commands that modify the state MUST do so by emitting events.

The only way a command handler may modify its state is by emitting an event. Any modifications made directly to the state from the command handler will not be persisted, and will be lost as soon as the command handler finishes executing.
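
The following plain-Python sketch models why direct mutations are lost. It is not the SDK's implementation: run_command is a hypothetical stand-in for the runtime, and handing the handler a copy of the state is simply how this sketch reproduces the effect.

```python
import copy
from typing import Callable, List


def run_command(state: dict, handler: Callable[[dict, List[dict]], None]) -> dict:
    """Hypothetical runtime: only emitted events reach the real state."""
    emitted: List[dict] = []
    handler(copy.deepcopy(state), emitted)  # handler sees a throwaway copy
    for event in emitted:                   # apply events after the handler
        state = {**state, "items": state["items"] + [event["item"]]}
    return state


def bad_handler(state: dict, emitted: List[dict]) -> None:
    state["items"].append("kettle")         # direct mutation: discarded


def good_handler(state: dict, emitted: List[dict]) -> None:
    emitted.append({"type": "ItemAdded", "item": "kettle"})


after_bad = run_command({"items": []}, bad_handler)
after_good = run_command({"items": []}, good_handler)
```

Only the handler that emits an event leaves a lasting change; the direct mutation disappears with the copy.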

A command handler may emit an event by using the emit method on the EventSourcedCommandContext.

The following command handler example emits an event:

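from google.protobuf.empty_pb2 import Empty

@entity.command_handler("AddItem")
def add(state: Cart, command: AddLineItem, context: EventSourcedCommandContext):
    if command.quantity <= 0:
        context.fail("Quantity must be greater than zero.")  # rejects the command
    context.emit(ItemAdded(item=LineItem(productId=command.product_id, name=command.name, quantity=command.quantity)))
    return Empty()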

This command handler also validates the command by ensuring that the quantity of items added is greater than zero. Invoking fail fails the command. This method throws, so there's no need to explicitly throw an exception.

Event handlers

An event handler is invoked at two points:

  • When restoring Entities from the journal, before any commands are handled

  • Each time a new event is emitted

An event handler's responsibility is to update the state of the entity according to the event. Event handlers are the only place where it's safe to mutate the state of an Entity.

An event handler must be declared for each type of event that gets emitted. The type is defined by the protobuf message type in the case of protobuf events, or the type property on a JSON object in the case of JSON events. The mapping of these type names to functions will be discussed later; for now we'll just look at the functions.

Event handlers take the event they are handling, and the state, and must return the new state. The handler may update the existing state passed in, but it still has to return that state as its return value. The command handler accumulates the emitted events and applies the events to the managed state after command processing.

Here’s an example event handler for the ItemAdded event:

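# Assumes Cart, LineItem, and ItemAdded are the domain.proto messages shown earlier.
@entity.event_handler(ItemAdded)
def added(state: Cart, event: ItemAdded):
    # Append the added line item to the cart, then return the updated state.
    state.items.append(event.item)
    return state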

Setting the behavior

Once you have command handler and event handler functions implemented, you can set your behavior. The behavior callback takes the current state of the entity and returns an object with two properties: commandHandlers and eventHandlers. The callback may return different sets of handlers according to the current state. This will be explored more later; for now we'll just implement an entity with one set of handlers.

The behavior callback can be set by setting the behavior property on the entity:

@entity.command_handler("AddItem")

The command handlers are a mapping of the gRPC service call names to the command handler functions we implemented. Note that the names, following the gRPC convention for service call names, start with an uppercase letter.

The event handlers are a mapping of event names to the event handler functions that we implemented. The event names must match the type of the events that are being persisted. In the case of protobuf messages, this is either the fully qualified name of the protobuf message, or the unqualified name of the protobuf message. For JSON messages, this is the value of the type property in the message.
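
Matching an event to its handler by either the fully qualified or the unqualified name could look roughly like the sketch below. The find_handler function and the lookup order (qualified name first) are assumptions made for illustration, not the SDK's actual resolution logic.

```python
from typing import Callable, Dict, Optional

Handler = Callable[[dict, dict], dict]


def find_handler(type_name: str, handlers: Dict[str, Handler]) -> Optional[Handler]:
    """Look up a handler by fully qualified name, then by unqualified name."""
    if type_name in handlers:
        return handlers[type_name]
    short_name = type_name.rsplit(".", 1)[-1]
    return handlers.get(short_name)


def item_added(state: dict, event: dict) -> dict:
    """Hypothetical event handler that returns a new state."""
    return {**state, "items": state["items"] + [event["item"]]}


handlers: Dict[str, Handler] = {"ItemAdded": item_added}
resolved = find_handler("example.shoppingcart.domain.ItemAdded", handlers)
missing = find_handler("example.shoppingcart.domain.CheckedOut", handlers)
```

An event type without a registered handler resolves to None here, which mirrors the requirement that every emitted event type needs a handler.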

Starting the entity

You can let Akka Serverless start entities by adding the entities to an Akka Serverless server instance:

# create service and add components
service = AkkaServerlessService()
service.add_component(entity)  # the Event Sourced Entity created above