Implementing Value Entities in Python

Value Entities persist their state on every change, and Akka Serverless needs to serialize their data to send it to the underlying data store. The most straightforward way to persist the state is to use protobuf types. Akka Serverless will automatically detect if an updated state is a protobuf message and serialize it as protobuf. For other serialization options, including JSON, see Serialization.

While protocol buffers are the recommended format for persisting state, we recommend that you do not persist your service’s public protobuf messages directly. Keeping separate message types for storage may introduce some overhead to convert from one type to the other, but it allows the service’s public interface to evolve independently of the data storage format, which should be private.
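
The separation between public and stored message types can be sketched as follows. This is a minimal illustration, not SDK code: the two dataclasses are stand-ins for the generated protobuf classes (in a real service they would come from the generated counter_api_pb2 and counter_domain_pb2 modules), and to_api is a hypothetical helper name.

```python
from dataclasses import dataclass


# Stand-ins for the generated protobuf classes (assumption for this sketch).
@dataclass
class CounterState:
    """Domain message: the private storage format."""
    value: int = 0


@dataclass
class CurrentCounter:
    """API message: the public response type."""
    value: int = 0


def to_api(state: CounterState) -> CurrentCounter:
    """Convert the stored state into the public response message."""
    return CurrentCounter(value=state.value)


api_message = to_api(CounterState(value=5))
```

Because only to_api knows about both types, a field could later be added to the stored state without changing the message returned to clients.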

For Value Entities, you modify the state and then trigger a save of that state. In the Python SDK you do this by calling update_state(new_state) on the command context; the Java SDK has an equivalent context.updateState(newState) method. If you change the state but do not call update_state, that state change will be lost.
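
To make the "lost change" behavior concrete, here is a small simulation. It does not use the SDK: FakeContext, handle, and the in-memory store are hypothetical stand-ins that only mimic the rule described above, namely that a handler's changes survive only if it calls update_state.

```python
import copy
from dataclasses import dataclass


@dataclass
class CounterState:
    value: int = 0


class FakeContext:
    """Hypothetical stand-in for the SDK's command context."""

    def __init__(self, store, entity_id):
        self._store = store
        self._entity_id = entity_id

    def update_state(self, new_state):
        # Persist a copy of the state under the entity's ID.
        self._store[self._entity_id] = copy.deepcopy(new_state)


def handle(store, entity_id, handler):
    # Each command starts from a copy of the last persisted state.
    state = copy.deepcopy(store.get(entity_id, CounterState()))
    handler(state, FakeContext(store, entity_id))


def increase_without_save(state, context):
    state.value += 1  # local change only, never persisted


def increase_with_save(state, context):
    state.value += 1
    context.update_state(state)


store = {}
handle(store, "c1", increase_without_save)
lost = store.get("c1", CounterState()).value   # change was dropped
handle(store, "c1", increase_with_save)
saved = store["c1"].value                      # change was persisted
```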

The steps necessary to implement a Value Entity include:

  1. Defining the API and domain messages in .proto files.

  2. Implementing behavior in command handlers.

  3. Creating and initializing the Entity.

The sections on this page describe a Counter Value Entity as an example. Follow the steps on the Kickstart a Python module page to create a Counter entity using the code generation tools.

Defining the proto files

The following counter_domain.proto file defines a "Counter" Value Entity. The entity stores an integer value as defined in the CounterState message. A typical entity stores more data than what is required for a counter.

syntax = "proto3";

package com.example.domain; (1)

import "akkaserverless/annotations.proto"; (2)

option (akkaserverless.file).value_entity = { (3)
    name: "Counter" (4)
    entity_type: "counter" (5)
    state: "CounterState" (6)
};

message CounterState { (7)
    int32 value = 1;
}
1 Any classes generated from this protobuf file will be nested in the package hierarchy of: com.example.domain.
2 Import the Akka Serverless protobuf annotations, or options.
3 The protobuf option (akkaserverless.file).value_entity is specific to code-generation as provided by the Akka Serverless plugin.
4 name denotes the base name for the Value Entity. The code generation will create initial sources CounterImpl, CounterTest and CounterIntegrationTest. Once these files exist, they are not overwritten, so you can freely add logic to them.
5 entity_type is a unique identifier of the "state storage". The entity name may be changed even after data has been created, but the entity_type can’t. This value shows in the @ValueEntity annotation of your entity implementation.
6 state points to the protobuf message representing the Value Entity’s state, which is kept by Akka Serverless.
7 The CounterState protobuf message is what Akka Serverless stores for this entity.

The counter_api.proto file defines the commands we can send to the Counter service to manipulate or access the Counter’s state. They make up the service API:

// This is the public API offered by your entity.
syntax = "proto3";

import "google/protobuf/empty.proto";
import "akkaserverless/annotations.proto"; (1)
import "google/api/annotations.proto";

package com.example; (2)

message IncreaseValue { (3)
    string counter_id = 1 [(akkaserverless.field).entity_key = true]; (4)
    int32 value = 2;
}

message DecreaseValue {
    string counter_id = 1 [(akkaserverless.field).entity_key = true];
    int32 value = 2;
}

message ResetValue {
    string counter_id = 1 [(akkaserverless.field).entity_key = true];
}

message GetCounter {
    string counter_id = 1 [(akkaserverless.field).entity_key = true];
}

message CurrentCounter { (5)
    int32 value = 1;
}

service CounterService { (6)
    option (akkaserverless.service) = { (7)
        type : SERVICE_TYPE_ENTITY
        component : ".domain.Counter"
    };

    rpc Increase(IncreaseValue) returns (google.protobuf.Empty);
    rpc Decrease(DecreaseValue) returns (google.protobuf.Empty);
    rpc Reset(ResetValue) returns (google.protobuf.Empty);
    rpc GetCurrentCounter(GetCounter) returns (CurrentCounter);
}
1 Import the Akka Serverless protobuf annotations, or options.
2 Any classes generated from this protobuf file will be nested in the package hierarchy: com.example.
3 Protobuf messages describe the Commands that the service handles. They may contain other messages to represent structured data.
4 Every Command must contain a string field that contains the entity ID and is marked with the (akkaserverless.field).entity_key option.
5 Messages describe the return value for the API. For methods that don’t have return values, you should use google.protobuf.Empty.
6 The service descriptor shows the API of the entity. It lists the methods a client can use to issue Commands to the entity.
7 The protobuf option (akkaserverless.service) is specific to code-generation as provided by Akka Serverless and points to the protobuf definition Counter shown above (in com.example.domain).
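
The role of the entity key can be illustrated with a short sketch. It is not how Akka Serverless routes commands internally; it only shows the consequence of marking counter_id as the entity key, which is that commands carrying the same ID address the same counter. The IncreaseValue dataclass is a stand-in for the generated message and apply_all is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class IncreaseValue:
    """Stand-in for the generated IncreaseValue command message."""
    counter_id: str
    value: int


def apply_all(commands: List[IncreaseValue]) -> Dict[str, int]:
    """Apply each command to the counter selected by its entity key."""
    counters: Dict[str, int] = {}
    for command in commands:
        current = counters.get(command.counter_id, 0)
        counters[command.counter_id] = current + command.value
    return counters


totals = apply_all([
    IncreaseValue("a", 1),
    IncreaseValue("b", 5),
    IncreaseValue("a", 2),
])
```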

Creating an Entity

The following code creates the Value Entity with the ValueEntity class. It passes in:

  • The protobuf files, counter_api.proto and counter_domain.proto, which define the service and the domain protocol. Akka Serverless uses the service protobuf file to load and read the service. It uses the definitions in counter_domain.proto to serialize state it receives from the proxy.

  • The fully qualified name of the service the Value Entity implements, com.example.CounterService.

  • The entity type, which is used to namespace the stored state.

# imports from the Akka Serverless SDK
from akkaserverless.value_context import ValueEntityCommandContext
from akkaserverless.value_entity import ValueEntity

from google.protobuf.empty_pb2 import Empty
from counter_api_pb2 import (IncreaseValue, DecreaseValue, ResetValue, GetCounter, CurrentCounter, _COUNTERSERVICE, DESCRIPTOR as API_DESCRIPTOR)
from counter_domain_pb2 import (CounterState)

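# `init` is the initial-state callback shown under "Initial state"; it must be defined before this line runs
entity = ValueEntity(_COUNTERSERVICE, [API_DESCRIPTOR], 'counters', init)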

Initial state

An Entity must have an initial state when it is created and no state has been persisted for it yet. Value Entities are not explicitly created; they are implicitly created when a command arrives for them. Nothing is persisted on creation. So, if user "X" opens their counter for the first time, an entity will be created, but it will have no state stored yet and will simply be in the initial state.

To create the initial state, set the initial callback. This takes the ID of the entity being created, and returns a new empty state, in this case, an empty counter state:

def init(entity_id: str) -> CounterState:
    return CounterState()
Calling CounterState() creates an empty protobuf message of the CounterState message type. Because value is a proto3 int32 field, it defaults to 0.
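
The implicit creation described above can be sketched as a lookup that falls back to the initial-state callback. This is a simplified model under stated assumptions: load_state and the dictionary store are hypothetical, and CounterState is a dataclass stand-in for the generated protobuf class.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class CounterState:
    """Stand-in for the generated CounterState message."""
    value: int = 0


def init(entity_id: str) -> CounterState:
    return CounterState()


def load_state(store: Dict[str, CounterState],
               entity_id: str,
               init_fn: Callable[[str], CounterState]) -> CounterState:
    """Return the persisted state, or the initial state if none exists.

    The initial state is returned without being written to the store,
    mirroring the rule that nothing is persisted on creation.
    """
    if entity_id in store:
        return store[entity_id]
    return init_fn(entity_id)


store: Dict[str, CounterState] = {}
first = load_state(store, "user-x", init)
```

After the call, first is an empty counter state and the store is still empty; something is stored only once a handler saves the state.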

Behavior

Value Entity behavior is defined in command handlers. A command handler is a function that takes the current state, a command, and a ValueEntityCommandContext, in that order in the Python examples on this page. The function executes a service call on the Entity gRPC interface.

The command is the input message type for the gRPC service call. For example, the GetCurrentCounter service call has an input type of GetCounter, while the Increase service call has an input type of IncreaseValue. The command will be an object that matches the structure of these protobuf types.

The command handler must return a message of the same type as the output type of the gRPC service call. For the GetCurrentCounter command, this must be a CurrentCounter message. Because Akka Serverless is already aware of the output type of the gRPC service call, the handler only needs to return a Python object that matches the structure of the protobuf type, such as the CurrentCounter(value=...) instance constructed in the handler below.

Retrieving state

The following example shows the implementation of a GetCurrentCounter command handler. This command handler is read-only: it doesn’t update the state, it just returns it.

@entity.command_handler("GetCurrentCounter")
def get_counter(state: CounterState, command: GetCounter, context: ValueEntityCommandContext):
    return CurrentCounter(value=state.value)

Updating state

When updating the state, a command handler MUST persist that change by calling the update_state method on the ValueEntityCommandContext. If it does not, any change to the state will be lost when the next command arrives.

The following command handler updates the state. A handler can also validate the command, for example by ensuring that the value added is greater than zero, and fail the command by invoking fail. This method throws, so there’s no need to explicitly throw an exception. The handler shown here does not include validation.

@entity.command_handler("Increase")
def increase_value(state: CounterState, command: IncreaseValue, context: ValueEntityCommandContext):
    state.value += command.value
    context.update_state(state)
    return Empty()
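
The validation step mentioned above could look roughly like the following. This is a sketch, not the SDK's API: FakeContext is a hypothetical stand-in whose fail method raises, as the text describes, and whose exact name and signature in the Python SDK are assumptions here. The message classes are dataclass stand-ins for the generated protobuf types.

```python
from dataclasses import dataclass


@dataclass
class CounterState:
    value: int = 0


@dataclass
class IncreaseValue:
    counter_id: str = ""
    value: int = 0


class CommandFailed(Exception):
    """Raised by the stand-in context when a command is rejected."""


class FakeContext:
    """Hypothetical stand-in for ValueEntityCommandContext."""

    def __init__(self):
        self.saved = None

    def update_state(self, new_state):
        self.saved = new_state

    def fail(self, message):
        # Assumed behavior: failing a command raises, so the handler stops here.
        raise CommandFailed(message)


def increase_value(state, command, context):
    if command.value <= 0:
        context.fail("Increase requires a positive value, got %d" % command.value)
    state.value += command.value
    context.update_state(state)
    return None  # the real handler returns Empty()


ok_context = FakeContext()
increase_value(CounterState(value=1), IncreaseValue("c1", 2), ok_context)

bad_context = FakeContext()
try:
    increase_value(CounterState(value=1), IncreaseValue("c1", -1), bad_context)
    rejected = False
except CommandFailed:
    rejected = True
```

Validating before mutating the state means a rejected command leaves nothing to save, so the stored counter is unchanged.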

Starting the entity

A helper method for listing the components declared in the current project is provided by the code generator. It also creates the relevant code for starting all of the components:

from akkaserverless.akkaserverless_service import AkkaServerlessService
from api_impl import entity as myapi
import logging

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)

    # create service and add components
    service = AkkaServerlessService()
    service.add_component(myapi)
    service.start()