Actions
Actions are stateless functions that can be triggered in multiple ways. For example, by
-
a gRPC service call
-
an HTTP service call
-
a new item in an Event Sourced Entity’s journal
-
a forwarded call from another component
The Event Sourced Entity journal contains events that capture all state changes. By subscribing to the journal with the Event Sourced Entity name, another component can receive all events of that type emitted by an Event Sourced Entity.
Together with Topic publishing, this may be used to inform other services asynchronously about certain events.
Implementing Actions
The following example shows a .proto
file for an action:
// Copyright 2021 Lightbend Inc.
syntax = "proto3";
package perf.action;
import "perf_domain.proto";
import "perf_api.proto";
import "google/protobuf/empty.proto";
import "google/api/annotations.proto";
import "akkaserverless/annotations.proto";
message PingRequest {}
message PongResponse {}
service PingActionService {
rpc Ping(PingRequest) returns (PongResponse) {
option (google.api.http) = {
get: "/action/ping"
};
}
}
service FromValueActionService {
rpc Consume(domain.ValueState) returns (google.protobuf.Empty) {
option (akkaserverless.method).eventing.in = {
value_entity: "values"
};
}
}
service FromTopicActionService {
rpc Consume(domain.ValueState) returns (google.protobuf.Empty) {
option (akkaserverless.method).eventing.in = {
topic: "values"
};
}
}
message ToTopicRequest {
string id = 1;
int32 payload_size = 2;
}
service ToTopicActionService {
rpc ProduceToValues(ToTopicRequest) returns (domain.ValueState) {
option (akkaserverless.method).eventing.out = {
topic: "values"
};
}
rpc ProduceToIncrements(api.IncrementRequest) returns (api.IncrementRequest) {
option (akkaserverless.method).eventing.out = {
topic: "increments"
};
}
}
service FromValueToTopicActionService {
rpc Produce(domain.ValueState) returns (domain.ValueState) {
option (akkaserverless.method).eventing.in = {
value_entity: "values"
};
option (akkaserverless.method).eventing.out = {
topic: "values"
};
}
}
service FromTopicForwardToEntityActionService {
rpc Forward(api.IncrementRequest) returns (google.protobuf.Empty) {
option (akkaserverless.method).eventing.in = {
topic: "increments"
};
}
}
The following shows the implementation:
/*
* Copyright 2019 Lightbend Inc.
*/
const Action = require("@lightbend/akkaserverless-python-sdk").Action
const tckModel = new Action(
"proto/action.proto",
"akkaserverless.tck.model.action.ActionTckModel"
);
const Response = tckModel.lookupType("akkaserverless.tck.model.action.Response");
tckModel.commandHandlers = {
ProcessUnary: processUnary,
ProcessStreamedIn: processStreamedIn,
ProcessStreamedOut: processStreamedOut,
ProcessStreamed: processStreamed
};
function processUnary(request, context) {
respondWith(singleResponse(createResponses(request)), context);
}
function processStreamedIn(context) {
const responses = [];
context.on("data", request => responses.push(...createResponses(request)));
context.on("end", () => respondWith(singleResponse(responses), context));
}
function processStreamedOut(request, context) {
createResponses(request).forEach(response => respondWith(response, context));
context.end();
}
function processStreamed(context) {
context.on("data", request => createResponses(request).forEach(response => respondWith(response, context)));
context.on("end", () => context.end());
}
function respondWith(response, context) {
// need to accumulate effects, before replying, forwarding, or failing
response.effects.forEach(effect => context.effect(two.service.methods.Call, { id: effect.id }, effect.synchronous));
if (response.fail) context.fail(response.fail);
else if (response.forward) context.forward(two.service.methods.Call, { id: response.forward });
else if (response.reply) context.write(Response.create({ message: response.reply }));
else context.write(); // empty message
}
function createResponses(request) {
return request.groups.map(createResponse);
}
function createResponse(group) {
const response = {
effects: []
};
group.steps.forEach(step => {
if (step.reply) {
response.reply = step.reply.message;
} else if (step.forward) {
response.forward = step.forward.id;
} else if (step.effect) {
response.effects.push({ id: step.effect.id, synchronous: step.effect.synchronous });
} else if (step.fail) {
response.fail = step.fail.message;
}
});
return response;
}
function singleResponse(responses) {
return responses.reduce((response, next) => ({
reply: next.reply || response.reply,
forward: next.forward || response.forward,
fail: next.fail || response.fail,
effects: response.effects.concat(next.effects)
}), { effects: [] });
}
const two = new Action(
"proto/action.proto",
"akkaserverless.tck.model.action.ActionTwo"
);
two.commandHandlers = {
Call: request => Response.create()
};