Implementing Event Sourced Entities in Python
Event Sourced Entities persist their state with ACID semantics, scale horizontally, and isolate failures. They use the Event Sourcing Model: rather than persisting the current state, they persist all of the events that led to the current state. Akka Serverless stores these events in a journal.
An Event Sourced Entity must not update its in-memory state directly as a result of a command. Instead, a command handler that needs to change the state emits events. When those events are received, the in-memory state can, and should, be changed in response.
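As a minimal, SDK-independent sketch of this rule, the plain Python below uses hypothetical dataclasses (Cart, ItemAdded, ItemRemoved) and functions (handle_add_item, apply) as stand-ins for the protobuf messages and handlers described later. The command handler only returns events; the state changes only when those events are applied.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Union


@dataclass(frozen=True)
class ItemAdded:
    product_id: str
    quantity: int


@dataclass(frozen=True)
class ItemRemoved:
    product_id: str


Event = Union[ItemAdded, ItemRemoved]


@dataclass
class Cart:
    items: Dict[str, int] = field(default_factory=dict)  # product id -> quantity


def handle_add_item(state: Cart, product_id: str, quantity: int) -> List[Event]:
    # Command handler: validates and decides what happened, but never mutates state
    if quantity <= 0:
        raise ValueError("quantity must be greater than zero")
    return [ItemAdded(product_id, quantity)]


def apply(state: Cart, event: Event) -> Cart:
    # Event handler: the only place where the state is changed
    if isinstance(event, ItemAdded):
        state.items[event.product_id] = state.items.get(event.product_id, 0) + event.quantity
    elif isinstance(event, ItemRemoved):
        state.items.pop(event.product_id, None)
    return state


journal: List[Event] = []
cart = Cart()
events = handle_add_item(cart, "apple", 2)
journal.extend(events)  # record the facts...
for event in events:
    cart = apply(cart, event)  # ...then derive the new state from them
```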
When you need to read state in your service, ask yourself, "What events should I be listening to?" When you need to write state, ask yourself, "What events should I be emitting?"
To load an Entity, Akka Serverless reads the journal and replays events to compute the Entity’s current state. As an optimization, by default, Event Sourced Entities persist state snapshots periodically. This allows Akka Serverless to recreate an Entity from the most recent snapshot plus any events saved after the snapshot.
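The recovery step can be sketched in plain Python as follows. This illustrates the idea only and is not how Akka Serverless implements it: the snapshot is assumed to be a hypothetical (number of events covered, state) pair, and events are replayed only from that position onward.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class ItemAdded:
    product_id: str
    quantity: int


@dataclass
class Cart:
    items: Dict[str, int] = field(default_factory=dict)


def apply(state: Cart, event: ItemAdded) -> Cart:
    state.items[event.product_id] = state.items.get(event.product_id, 0) + event.quantity
    return state


def recover(snapshot: Optional[Tuple[int, Cart]], journal: List[ItemAdded]) -> Cart:
    """Rebuild an entity's state from its latest snapshot plus later events.

    `snapshot` is (number of journal events it already covers, state), or None.
    """
    if snapshot is None:
        covered, state = 0, Cart()  # no snapshot yet: replay the whole journal
    else:
        covered, saved = snapshot
        state = Cart(items=dict(saved.items))  # copy, so the stored snapshot is not mutated
    for event in journal[covered:]:  # only the events saved after the snapshot
        state = apply(state, event)
    return state


journal = [ItemAdded("apple", 1), ItemAdded("pear", 2), ItemAdded("apple", 3)]
snapshot = (2, Cart(items={"apple": 1, "pear": 2}))  # taken after the first two events
```

Recovering with or without the snapshot yields the same state, which is what makes snapshotting a pure optimization.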
In contrast with typical create, read, update, and delete (CRUD) systems, event sourcing allows the state of the Entity to be reliably replicated to other services. Event Sourced Entities use offset tracking in the journal to record which portions of the system have replicated which events.
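A hypothetical sketch of offset tracking: each consumer remembers how far into the journal it has processed, so it can resume from that point. The Journal class and its read and commit methods are illustrative only and are not part of the Akka Serverless API.

```python
from typing import Dict, List


class Journal:
    """Hypothetical append-only event log that tracks one offset per consumer."""

    def __init__(self) -> None:
        self.events: List[str] = []
        self.offsets: Dict[str, int] = {}  # consumer name -> count of events processed

    def append(self, event: str) -> None:
        self.events.append(event)

    def read(self, consumer: str) -> List[str]:
        # Everything after the consumer's last committed offset
        return self.events[self.offsets.get(consumer, 0):]

    def commit(self, consumer: str, count: int) -> None:
        # Advance the offset only after the consumer has processed `count` events
        self.offsets[consumer] = self.offsets.get(consumer, 0) + count


journal = Journal()
journal.append("ItemAdded(apple)")
journal.append("ItemAdded(pear)")

batch = journal.read("analytics")
journal.commit("analytics", len(batch))
journal.append("ItemRemoved(apple)")
```

Committing after processing means a crash between the two steps leads to events being delivered again rather than lost, so consumers in this style should handle duplicates idempotently.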
To learn more about event sourcing, check out the free Lightbend Academy course, Reactive Architecture: CQRS & Event Sourcing.
Persistence types and serialization
Event Sourced Entities persist events and snapshots in the journal, so Akka Serverless must serialize them. Akka Serverless automatically detects whether an emitted event or snapshot is a protobuf type and, if so, serializes it as protobuf. However, as a Python developer, you may find it more natural to use JSON. Serialization options for Python services describes how to do that. See the protobuf documentation for more information on using protobuf from Python.
While protobuf is the recommended format for persisting events, we recommend that you do not persist your service’s protobuf API messages. Instead, create new messages for persistence, even if they are identical. This may introduce some overhead in converting from one type to the other, but it allows the service’s public interface to evolve independently from its data storage format, which should be private.
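A minimal sketch of this separation, using hypothetical dataclasses in place of generated protobuf classes: ApiLineItem stands for the public API message and DomainLineItem for the private, persisted one, with explicit conversion functions between them.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ApiLineItem:  # hypothetical stand-in for the public API message
    product_id: str
    name: str
    quantity: int


@dataclass(frozen=True)
class DomainLineItem:  # hypothetical stand-in for the private, persisted message
    product_id: str
    name: str
    quantity: int


def to_domain(item: ApiLineItem) -> DomainLineItem:
    # Convert at the boundary before anything is persisted
    return DomainLineItem(item.product_id, item.name, item.quantity)


def to_api(item: DomainLineItem) -> ApiLineItem:
    # Convert back when returning state to callers
    return ApiLineItem(item.product_id, item.name, item.quantity)


api_item = ApiLineItem("apple", "Apple", 2)
stored = to_domain(api_item)
```

If the public API later renames or restructures a field, only to_domain and to_api need to change; events already in the journal keep their stored format.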
The following shows an example shopping cart definition in a domain.proto file:
syntax = "proto3";
package example.shoppingcart.domain;
message LineItem {
  string productId = 1;
  string name = 2;
  int32 quantity = 3;
}
message ItemAdded {
  LineItem item = 1;
}
message ItemRemoved {
  string productId = 1;
}
message CheckedOut {}
message Cart {
  repeated LineItem items = 1;
  bool checkedout = 2;
}
In this file, the Cart message represents the state snapshot, while ItemAdded, ItemRemoved, and CheckedOut are events. Note that the event names are in the past tense: events are facts, indisputable things that happened in the past. A fact never becomes false. After an item has been added to a shopping cart, it never becomes untrue that the item was added to the cart. It can be removed, but that doesn’t change the fact that it was added; it only changes the current state of the cart. Always name events in the past tense to reflect the indisputable facts they represent.
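To make the "facts never become false" point concrete, the hypothetical sketch below replays a log in which an item is added and then removed. The removal is recorded as a new fact, so the cart ends up empty while the ItemAdded fact remains in the log. The dataclasses stand in for the ItemAdded and ItemRemoved messages above, simplified to a product id and quantity instead of a LineItem.

```python
from dataclasses import dataclass
from typing import Dict, List, Union


@dataclass(frozen=True)
class ItemAdded:
    product_id: str
    quantity: int


@dataclass(frozen=True)
class ItemRemoved:
    product_id: str


def current_items(log: List[Union[ItemAdded, ItemRemoved]]) -> Dict[str, int]:
    # The current state is derived by replaying every fact in order
    items: Dict[str, int] = {}
    for event in log:
        if isinstance(event, ItemAdded):
            items[event.product_id] = items.get(event.product_id, 0) + event.quantity
        else:
            items.pop(event.product_id, None)
    return items


# Removing the item appends a new fact; the earlier ItemAdded is never deleted
log = [ItemAdded("apple", 2), ItemRemoved("apple")]
```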
Creating an entity
Create an Event Sourced Entity with the EventSourcedEntity class.
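# imports from Akka Serverless SDK
from akkaserverless.event_sourced_context import EventSourcedCommandContext
from akkaserverless.event_sourced_entity import EventSourcedEntity
# _SHOPPINGCARTSERVICE and API_DESCRIPTOR come from the generated gRPC module and init is the
# initial-state function (both shown below; define them before this line); 'carts' names the entity type
entity = EventSourcedEntity(_SHOPPINGCARTSERVICE, [API_DESCRIPTOR], 'carts', init)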
Using protobuf types
When you pass an event or snapshot to persist, Akka Serverless needs to know how to serialize it. Passing a plain object does not provide enough information for protobuf to serialize it. In Python, the protobuf types come from the modules that the protobuf compiler generates from your .proto files: to emit an event or snapshot, import the generated message class and instantiate it.
Before implementing the handlers, import the generated types that you will use later:
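# import from generated gRPC file(s)
from shoppingcart_api_pb2 import (GetShoppingCart, Cart, AddLineItem, _SHOPPINGCARTSERVICE, DESCRIPTOR as API_DESCRIPTOR)
from shoppingcart_domain_pb2 import (ItemAdded, LineItem)
# Empty is the standard protobuf message returned by the AddItem command handler below
from google.protobuf.empty_pb2 import Empty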
Initial state
When there are no snapshots persisted for an entity (such as when the entity is first created), the entity needs to have an initial state. Event Sourced Entities are not explicitly created; they are implicitly created when a command arrives for them. Nothing is persisted until an event is emitted for that Entity. So, if user "X" opens their shopping cart for the first time, an Entity is created for them with no events in the log. It is simply in the initial state.
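The following hypothetical in-memory sketch (EntityStore is not part of any SDK) illustrates implicit creation: loading an unknown id simply yields the initial state, and a journal for that id only comes into existence when the first event is emitted.

```python
from typing import Callable, Dict, List


class EntityStore:
    """Hypothetical in-memory store illustrating implicit entity creation."""

    def __init__(self, init: Callable[[str], dict]) -> None:
        self.init = init
        self.journals: Dict[str, List[dict]] = {}  # entity id -> persisted events

    def load(self, entity_id: str) -> dict:
        state = self.init(entity_id)  # every entity starts from its initial state
        for event in self.journals.get(entity_id, []):
            state["items"].append(event["item"])  # replay persisted events, if any
        return state

    def emit(self, entity_id: str, event: dict) -> None:
        # An entity's journal only comes into existence with its first event
        self.journals.setdefault(entity_id, []).append(event)


store = EntityStore(lambda entity_id: {"items": []})
first_visit = store.load("X")  # user X opens their cart: nothing stored yet
persisted_before = "X" in store.journals
store.emit("X", {"item": "apple"})
after_first_event = store.load("X")
```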
To create the initial state, provide an initial-state function, passed as init to the EventSourcedEntity constructor. It takes the id of the entity being created and returns a new, empty state, in this case an empty shopping cart:
def init(entity_id: str) -> Cart:
    # every new entity starts with an empty cart
    return Cart()
Note the use of Cart(): this creates a protobuf message by instantiating the generated Cart message class imported earlier.
Behavior
Now we need to define the behavior for our entity. The behavior consists of two parts: command handlers and event handlers.
Command handlers
A command handler is a function that takes the current state, a command, and, when it needs one, an EventSourcedCommandContext. It implements a service call on the Entity’s gRPC interface.
The command is the input message type for the gRPC service call. For example, the GetCart service call has an input type of GetShoppingCart, while the AddItem service call has an input type of AddLineItem. The command is an instance of the corresponding generated protobuf message class.
The command handler must return a message of the same type as the output type of the gRPC service call. In the case of our GetCart command, this must be a Cart message. Note that, unlike the state and events, this message does not need to be created from a specific protobuf message type: Akka Serverless already knows the output type of the gRPC service call and so can infer it. It only has to be a plain Python object that matches the structure of the protobuf type.
The following shows the implementation of the GetCart command handler. This is a read-only command handler: it doesn’t emit any events, it just returns the current state:
@entity.command_handler("GetCart")
def get(state: Cart, command: GetShoppingCart):
    # read-only: return the current state without emitting events
    return state
Emitting events from commands
Commands that modify the state MUST do so by emitting events.
The only way a command handler may modify its state is by emitting an event. Any modifications made directly to the state from the command handler will not be persisted and will be lost as soon as the command handler finishes executing.
A command handler may emit an event by using the emit method on the EventSourcedCommandContext.
The following command handler example emits an event:
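@entity.command_handler("AddItem")
def add(state: Cart, command: AddLineItem, context: EventSourcedCommandContext):
    # Validate the command before emitting anything
    if command.quantity <= 0:
        context.fail("Cannot add {} of item {}".format(command.quantity, command.product_id))
    else:
        # Protobuf message constructors accept keyword arguments only
        context.emit(ItemAdded(item=LineItem(productId=command.product_id, name=command.name, quantity=command.quantity)))
    return Empty()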
This command handler also validates the command by ensuring that the quantity of items added is greater than zero. Invoking fail fails the command; this method raises an exception, so there’s no need to raise one explicitly.
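Because the validation logic only talks to the context through emit and fail, it can be exercised in a unit test without a running Akka Serverless proxy. The sketch below assumes a hypothetical FakeContext (with a hypothetical CommandFailed exception) standing in for EventSourcedCommandContext, and dataclasses standing in for the generated AddLineItem and ItemAdded messages; its fail mirrors the raising behavior described above.

```python
from dataclasses import dataclass, field
from typing import List, NoReturn


class CommandFailed(Exception):
    """Hypothetical exception raised by FakeContext.fail."""


@dataclass
class FakeContext:
    """Hypothetical stand-in for EventSourcedCommandContext, for unit tests only."""
    events: List[object] = field(default_factory=list)

    def emit(self, event: object) -> None:
        self.events.append(event)

    def fail(self, message: str) -> NoReturn:
        raise CommandFailed(message)  # mirrors fail() raising, as described above


@dataclass(frozen=True)
class AddLineItem:  # stand-in for the generated command message
    product_id: str
    name: str
    quantity: int


@dataclass(frozen=True)
class ItemAdded:  # simplified stand-in for the generated event message
    product_id: str
    name: str
    quantity: int


def add(state: object, command: AddLineItem, context: FakeContext) -> None:
    if command.quantity <= 0:
        context.fail(f"Cannot add {command.quantity} of item {command.product_id}")
    context.emit(ItemAdded(command.product_id, command.name, command.quantity))


accepted = FakeContext()
add(None, AddLineItem("apple", "Apple", 2), accepted)

rejected = FakeContext()
try:
    add(None, AddLineItem("apple", "Apple", 0), rejected)
    was_rejected = False
except CommandFailed:
    was_rejected = True
```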
Event handlers
An event handler is invoked at two points:
- When restoring Entities from the journal, before any commands are handled
- Each time a new event is emitted
An event handler’s responsibility is to update the state of the entity according to the event. Event handlers are the only place where it’s safe to mutate the state of an Entity.
An event handler must be declared for each type of event that gets emitted. The type is defined by the protobuf message type in the case of protobuf events, or by the type property on a JSON object in the case of JSON events. The mapping of these type names to functions is discussed later; for now, we’ll just look at the functions.
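Conceptually, the mapping from type names to functions is a lookup table. The hypothetical sketch below (not the SDK’s internals) keys handlers by name, taking the name from the class of a message object or from the type property of a JSON event, and rejects events that have no registered handler. For simplicity it only uses unqualified names.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass(frozen=True)
class ItemAdded:  # stand-in for a protobuf event message
    product_id: str


handlers: Dict[str, Callable[[list, Any], list]] = {}


def event_type_name(event: Any) -> str:
    # JSON events carry their type in a "type" property; message objects use their class name
    if isinstance(event, dict):
        return event["type"]
    return type(event).__name__


def on_item_added(state: list, event: Any) -> list:
    product_id = event["productId"] if isinstance(event, dict) else event.product_id
    state.append(product_id)
    return state


handlers["ItemAdded"] = on_item_added


def dispatch(state: list, event: Any) -> list:
    name = event_type_name(event)
    if name not in handlers:
        raise KeyError(f"No event handler registered for {name}")
    return handlers[name](state, event)


state = dispatch([], ItemAdded("apple"))
state = dispatch(state, {"type": "ItemAdded", "productId": "pear"})
try:
    dispatch(state, {"type": "CheckedOut"})  # no handler registered for this type
    unhandled_rejected = False
except KeyError:
    unhandled_rejected = True
```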
Event handlers take the current state and the event they are handling, and must return the new state. The handler may update the existing state passed in, but it still has to return that state as its return value. Akka Serverless accumulates the events emitted by a command handler and applies them to the managed state after the command has been processed.
Here’s an example event handler for the ItemAdded event:
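@entity.event_handler(ItemAdded)
def added(state: Cart, event: ItemAdded):
    # Assumes the state has the domain Cart fields from domain.proto
    for item in state.items:
        if item.productId == event.item.productId:
            item.quantity += event.item.quantity  # already in cart: add quantity
            return state
    # Not in the cart yet: append a copy of the event's line item
    state.items.add().CopyFrom(event.item)
    return state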
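An ItemRemoved handler follows the same pattern. The sketch below uses hypothetical dataclasses with the domain.proto field names (productId, items) instead of generated protobuf classes so that it runs on its own; in a real service the function would be registered with @entity.event_handler(ItemRemoved), like the ItemAdded handler.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class LineItem:  # field names mirror domain.proto
    productId: str
    name: str
    quantity: int


@dataclass
class Cart:
    items: List[LineItem] = field(default_factory=list)
    checkedout: bool = False


@dataclass(frozen=True)
class ItemRemoved:
    productId: str


def removed(state: Cart, event: ItemRemoved) -> Cart:
    # Keep every line item except the one for the removed product
    state.items = [item for item in state.items if item.productId != event.productId]
    return state


cart = Cart(items=[LineItem("apple", "Apple", 2), LineItem("pear", "Pear", 1)])
cart = removed(cart, ItemRemoved("apple"))
```

With the real generated classes, a repeated field cannot be assigned a new list; you would instead delete the matching entry in place, for example with del state.items[index].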
Setting the behavior
Once you have implemented the command handler and event handler functions, they need to be registered with the entity so that Akka Serverless knows which function handles each command and each event. In Python, this is done with decorators on the entity object, as in the examples above: @entity.event_handler takes the event type, and @entity.command_handler takes the name of the gRPC service call. For example, the following registers the decorated function as the handler for the AddItem command:
@entity.command_handler("AddItem")
The command handlers map gRPC service call names to the command handler functions we implemented. Note that, following the gRPC convention for service call names, the names start with an uppercase letter (for example, GetCart and AddItem).
The event handlers map event names to the event handler functions that we implemented. The event names must match the type of the events that are being persisted. In the case of protobuf messages, this is either the fully qualified name of the protobuf message or its unqualified name. For JSON messages, this is the value of the type property in the message.
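To show what the decorators accomplish, here is a hypothetical, much-simplified registry. MiniEntity is not the SDK’s EventSourcedEntity; it only demonstrates the idea of decorators filling one table keyed by command name and another keyed by event type, while returning the decorated function unchanged.

```python
from typing import Any, Callable, Dict


class MiniEntity:
    """Hypothetical, simplified illustration of decorator-based handler registration."""

    def __init__(self) -> None:
        self.command_handlers: Dict[str, Callable[..., Any]] = {}
        self.event_handlers: Dict[type, Callable[..., Any]] = {}

    def command_handler(self, name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        def register(fn: Callable[..., Any]) -> Callable[..., Any]:
            self.command_handlers[name] = fn  # keyed by gRPC service call name
            return fn
        return register

    def event_handler(self, event_type: type) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        def register(fn: Callable[..., Any]) -> Callable[..., Any]:
            self.event_handlers[event_type] = fn  # keyed by event message type
            return fn
        return register


class ItemAdded:
    def __init__(self, product_id: str) -> None:
        self.product_id = product_id


entity = MiniEntity()


@entity.command_handler("GetCart")
def get(state, command):
    return state


@entity.event_handler(ItemAdded)
def added(state, event):
    return state + [event.product_id]
```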