Actions

Actions are stateless functions that can be triggered in multiple ways, for example by:

  • a gRPC service call

  • an HTTP service call

  • a new item in an Event Sourced Entity’s journal

  • a forwarded call from another component

Use case: request conversion

You can use Actions to convert incoming data into a different format before forwarding a call to a different component.

A service might need to offer a request data format that does not correspond directly with the commands of an Event Sourced Entity. By exposing a service implemented by an Action, the Action implementation can adapt the data format to the command (or commands) expected by the Event Sourced Entity. The incoming requests get forwarded to the target component.

Use case: listening to a journal

To listen to an Event Sourced Entity’s journal, an Action can be set up for Eventing.

The Event Sourced Entity journal contains events that capture all state changes. By subscribing to the journal with the Event Sourced Entity name, another component can receive all events of that type emitted by an Event Sourced Entity.

Together with Topic publishing, this may be used to inform other services asynchronously about certain events.

Implementing Actions

The following example shows a .proto file for an action:

// Copyright 2021 Lightbend Inc.

syntax = "proto3";

package perf.action;

import "perf_domain.proto";
import "perf_api.proto";
import "google/protobuf/empty.proto";
import "google/api/annotations.proto";
import "akkaserverless/annotations.proto";

// Request message for PingActionService.Ping; it declares no fields.
message PingRequest {}

// Response message for PingActionService.Ping; it declares no fields.
message PongResponse {}

// Action service with a single unary Ping method.
service PingActionService {
  // Takes an empty PingRequest and returns an empty PongResponse.
  // The google.api.http option maps the method to HTTP GET /action/ping.
  rpc Ping(PingRequest) returns (PongResponse) {
    option (google.api.http) = {
      get: "/action/ping"
    };
  }
}

// Action service whose Consume method is annotated as an eventing input
// from the value entity named "values".
service FromValueActionService {
  // Receives a domain.ValueState and returns google.protobuf.Empty.
  // NOTE(review): presumably invoked for state changes of the "values"
  // value entity - confirm against the eventing documentation.
  rpc Consume(domain.ValueState) returns (google.protobuf.Empty) {
    option (akkaserverless.method).eventing.in = {
      value_entity: "values"
    };
  }
}

// Action service whose Consume method is annotated as an eventing input
// from the topic named "values".
service FromTopicActionService {
  // Receives a domain.ValueState and returns google.protobuf.Empty.
  // NOTE(review): presumably invoked for each message published to the
  // "values" topic - confirm against the eventing documentation.
  rpc Consume(domain.ValueState) returns (google.protobuf.Empty) {
    option (akkaserverless.method).eventing.in = {
      topic: "values"
    };
  }
}

// Request message for ToTopicActionService.ProduceToValues.
message ToTopicRequest {
  // Identifier carried by the request.
  string id = 1;
  // NOTE(review): presumably the size of the payload to generate for the
  // produced ValueState - units and semantics are not shown here; confirm.
  int32 payload_size = 2;
}

// Action service whose methods are annotated as eventing outputs to topics.
service ToTopicActionService {
  // Takes a ToTopicRequest and returns a domain.ValueState; the eventing.out
  // option names the "values" topic as the destination.
  rpc ProduceToValues(ToTopicRequest) returns (domain.ValueState) {
    option (akkaserverless.method).eventing.out = {
      topic: "values"
    };
  }

  // Takes and returns an api.IncrementRequest; the eventing.out option names
  // the "increments" topic as the destination.
  rpc ProduceToIncrements(api.IncrementRequest) returns (api.IncrementRequest) {
    option (akkaserverless.method).eventing.out = {
      topic: "increments"
    };
  }
}

// Action service that combines an eventing input and an eventing output on
// a single method.
service FromValueToTopicActionService {
  // Takes and returns a domain.ValueState. The input is annotated with the
  // value entity "values" and the output with the topic "values".
  rpc Produce(domain.ValueState) returns (domain.ValueState) {
    option (akkaserverless.method).eventing.in = {
      value_entity: "values"
    };
    option (akkaserverless.method).eventing.out = {
      topic: "values"
    };
  }
}

// Action service whose Forward method is annotated as an eventing input
// from the topic named "increments".
service FromTopicForwardToEntityActionService {
  // Receives an api.IncrementRequest and returns google.protobuf.Empty.
  // NOTE(review): the service name suggests the implementation forwards the
  // request to an entity; that is not visible in this file - confirm.
  rpc Forward(api.IncrementRequest) returns (google.protobuf.Empty) {
    option (akkaserverless.method).eventing.in = {
      topic: "increments"
    };
  }
}

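The following shows an Action implementation in JavaScript. Note that this example implements the `akkaserverless.tck.model.action.ActionTckModel` service rather than the services defined in the .proto file above: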

/*
 * Copyright 2019 Lightbend Inc.
 */

// This is a JavaScript service, so the Action class must come from the
// JavaScript SDK package (not the Python SDK).
const { Action } = require("@lightbend/akkaserverless-javascript-sdk");

// Action component bound to the ActionTckModel service declared in
// proto/action.proto.
const tckModel = new Action(
  "proto/action.proto",
  "akkaserverless.tck.model.action.ActionTckModel"
);

// Protobuf message type used to build replies written back to the caller.
const Response = tckModel.lookupType("akkaserverless.tck.model.action.Response");

// Maps each service method name to the function that handles it. The handlers
// are function declarations further down in this file, so they are hoisted and
// already available here.
tckModel.commandHandlers = {
  ProcessUnary: processUnary,
  ProcessStreamedIn: processStreamedIn,
  ProcessStreamedOut: processStreamedOut,
  ProcessStreamed: processStreamed
};

/**
 * Handles a unary call: builds one response per group in the request, merges
 * them into a single response and sends that response on the context.
 *
 * @param {object} request - incoming request; must expose a `groups` array.
 * @param {object} context - action context used to reply, forward or fail.
 */
function processUnary(request, context) {
  const perGroup = createResponses(request);
  const merged = singleResponse(perGroup);
  respondWith(merged, context);
}

/**
 * Handles a streamed-in call: collects the per-group responses of every
 * request received on the "data" event and, once "end" fires, merges them
 * into one response and sends it.
 *
 * @param {object} context - streamed action context (emits "data" and "end").
 */
function processStreamedIn(context) {
  const collected = [];

  context.on("data", request => {
    for (const response of createResponses(request)) {
      collected.push(response);
    }
  });

  context.on("end", () => {
    respondWith(singleResponse(collected), context);
  });
}

/**
 * Handles a streamed-out call: sends one response per group of the request,
 * in order, and then ends the stream.
 *
 * @param {object} request - incoming request; must expose a `groups` array.
 * @param {object} context - streamed action context.
 */
function processStreamedOut(request, context) {
  for (const response of createResponses(request)) {
    respondWith(response, context);
  }
  context.end();
}

/**
 * Handles a fully streamed call: for every request received on "data" it
 * sends one response per group, and it ends the outgoing stream when the
 * incoming stream emits "end".
 *
 * @param {object} context - streamed action context (emits "data" and "end").
 */
function processStreamed(context) {
  const respondToRequest = request => {
    for (const response of createResponses(request)) {
      respondWith(response, context);
    }
  };

  context.on("data", respondToRequest);
  context.on("end", () => {
    context.end();
  });
}

/**
 * Sends a prepared response on the given context.
 *
 * All side effects are registered first; then exactly one outcome is emitted,
 * chosen in this priority order: fail, forward, reply, empty message.
 *
 * @param {object} response - `{ effects, fail?, forward?, reply? }` as built
 *   by createResponse / singleResponse.
 * @param {object} context - action context exposing effect, fail, forward
 *   and write.
 */
function respondWith(response, context) {
  // Effects have to be accumulated before replying, forwarding, or failing.
  for (const effect of response.effects) {
    context.effect(two.service.methods.Call, { id: effect.id }, effect.synchronous);
  }

  if (response.fail) {
    context.fail(response.fail);
    return;
  }
  if (response.forward) {
    context.forward(two.service.methods.Call, { id: response.forward });
    return;
  }
  if (response.reply) {
    context.write(Response.create({ message: response.reply }));
    return;
  }
  context.write(); // empty message
}

/**
 * Builds one response object for each group in the request.
 *
 * @param {object} request - incoming request; must expose a `groups` array.
 * @returns {object[]} responses in the same order as `request.groups`.
 */
function createResponses(request) {
  const { groups } = request;
  return groups.map(group => createResponse(group));
}

/**
 * Folds the steps of one group into a single response object.
 *
 * Each step contributes at most one thing, checked in the order reply,
 * forward, effect, fail. Effects are appended; for reply / forward / fail a
 * later step overwrites the value set by an earlier one.
 *
 * @param {object} group - group exposing a `steps` array.
 * @returns {object} `{ effects, reply?, forward?, fail? }`.
 */
function createResponse(group) {
  const result = { effects: [] };

  for (const step of group.steps) {
    if (step.reply) {
      result.reply = step.reply.message;
      continue;
    }
    if (step.forward) {
      result.forward = step.forward.id;
      continue;
    }
    if (step.effect) {
      const { id, synchronous } = step.effect;
      result.effects.push({ id, synchronous });
      continue;
    }
    if (step.fail) {
      result.fail = step.fail.message;
    }
  }

  return result;
}

/**
 * Merges a list of responses into one.
 *
 * Effects of all responses are concatenated in order. For reply, forward and
 * fail, the value of the latest response that has a truthy value is kept.
 *
 * @param {object[]} responses - responses as built by createResponse.
 * @returns {object} merged response; `{ effects: [] }` when the list is empty.
 */
function singleResponse(responses) {
  let merged = { effects: [] };

  for (const current of responses) {
    merged = {
      reply: current.reply || merged.reply,
      forward: current.forward || merged.forward,
      fail: current.fail || merged.fail,
      effects: merged.effects.concat(current.effects)
    };
  }

  return merged;
}

// Second Action component, bound to the ActionTwo service from the same proto
// file. respondWith uses its Call method as the target for forwards and
// side effects.
const two = new Action("proto/action.proto", "akkaserverless.tck.model.action.ActionTwo");

two.commandHandlers = {
  // Ignores the incoming request and returns an empty Response message.
  Call(request) {
    return Response.create();
  }
};