Publishing and subscribing to topics on a broker

Akka Serverless integrates with Google Cloud Pub/Sub to enable asynchronous messaging. Consumers of published messages are guaranteed to receive them at least once. This means that receivers must be able to handle duplicate messages. If you are new to the world of Pub/Sub, many online resources provide tips on handling duplicate messages.

This page describes how to subscribe to published events, but you also need to configure Google Cloud Pub/Sub access for your Akka Serverless project as explained in How to configure a broker.

It is your responsibility to create the topics in Google Cloud Pub/Sub before configuring publishers or subscribers.

Subscribing to a topic’s messages

To receive messages from a Pub/Sub topic, annotate a service method in the Protobuf service definition with the (akkaserverless.method).eventing annotation. Specify the topic name in the in section of the annotation:

syntax = "proto3";

package shopping.cart.actions;

import "akkaserverless/annotations.proto";
import "cart/shopping_cart_domain.proto";
import "google/protobuf/empty.proto";

service ShoppingCartAnalyticsService {
  // get ItemAdded from the topic
  rpc ProcessAdded(shopping.cart.domain.ItemAdded) returns (google.protobuf.Empty) {
    option (akkaserverless.method).eventing.in = { (1)
      topic: "shopping-cart-events" (2)
    };
  }
}
1 annotate the Protobuf rpc method with (akkaserverless.method).eventing
2 use in and topic to subscribe to a topic

There is nothing specific required in the implementation of ProcessAdded. The implementation will in most cases be an Action and forward a converted message to a different component (eg. an Event Sourced Entity).

Receiving JSON messages

Your Akka Serverless service may subscribe to topics that use messages in JSON format. The messages must have the Content-Type attribute stating application/json.

The Protobuf rpc method receiving these JSON messages must be set up to receive Any.

syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";
import "akkaserverless/annotations.proto";

package shopping.cart.actions;

service ShoppingCartTopicService {
    rpc JsonFromTopic(google.protobuf.Any) returns (google.protobuf.Empty) {
        option (akkaserverless.method).eventing.in = {
            topic:  "shopping-cart-json"
        };
    }
}

Receiving CloudEvents

Akka Serverless uses the CloudEvents standard when receiving from and publishing to topics. The CloudEvents specification standardizes message metadata so that systems can integrate more easily.

Describing the structure of the message payload is the CloudEvents feature most important to Akka Serverless.

An example of that is the capability to send serialized Protobuf messages and have Akka Serverless deserialize them accordingly.

To allow proper reading of Protobuf messages from topics, the messages need to specify the message attributes:

  • Content-Type = application/protobuf

  • ce-specversion = 1.0

  • ce-type = fully qualified protobuf message name (eg. shopping.cart.api.TopicOperation)

(The ce- prefixed attributes are part of the CloudEvents specification.)

The Protobuf rpc declaration uses the expected Protobuf message type and specifies the topic to subscribe to. You’ll normally want to share the exact Protobuf message declaration with the sending system.

syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";
import "akkaserverless/annotations.proto";

package shopping.cart.api;

message TopicOperation {
    string operation = 1;
}

service ShoppingCartTopicService {

    rpc ProtobufFromTopic(TopicOperation) returns (google.protobuf.Empty) {
        option (akkaserverless.method).eventing.in = {
            topic:  "shopping-cart-protobuf-cloudevents"
        };
    }
}