
Mu-RPC: defining messages and services

This is the second article in an ongoing series about Mu, an open source library for building purely functional microservices. Previously, Juan Pedro Moreno provided an overview of Mu and illustrated the different types of communication between services and how they are defined with Protocol Buffers, Avro, OpenAPI, and Mu:

Mu - Types of Communication

In this post, we’ll cover how to define messages and services and their usage.

Service Protocol Definition

To illustrate what Mu-RPC can do, let’s consider the four kinds of communication between servers and clients (see gRPC concepts):

  • Unary RPC: the client sends a single request and receives a single response from the server.
  • Server streaming RPC: similar to the unary service, but in this case, the server will send back a stream of responses for a client request.
  • Client streaming RPC: in this case, it’s the client that sends a stream of requests. The server will respond with a single response.
  • Bidirectional streaming RPC: a mix of server and client streaming as both sides will be sending a stream of data.
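The four shapes above differ only in whether the request, the response, or both are streams. As a rough sketch in plain Scala (not Mu code), with `Iterator` standing in for a real stream type and all names below being hypothetical:

```scala
// Illustrative only: Iterator is a stand-in for a real stream type
// such as monix.reactive.Observable or fs2.Stream.
object RpcShapes {
  type StreamOf[A] = Iterator[A]

  // Unary: one request in, one response out.
  def unary(request: Int): Int = request + 1

  // Server streaming: one request in, a stream of responses out.
  def serverStreaming(request: Int): StreamOf[Int] = Iterator.range(0, request)

  // Client streaming: a stream of requests in, one response out.
  def clientStreaming(requests: StreamOf[Int]): Int = requests.sum

  // Bidirectional streaming: a stream of requests in, a stream of responses out.
  def bidirectional(requests: StreamOf[Int]): StreamOf[Int] = requests.map(_ * 2)
}
```

The bodies are arbitrary; only the signatures matter here.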

We will cover both unary and streaming services. Please note that the Avro format doesn’t support streaming communications, so we will focus on the Protobuf format.

Mu offers the following annotations for defining messages and services:

@service

  • Scope: Trait
  • Arguments: SerializationType, Compression

Tags the trait as an RPC service in order to derive server and client code (macro expansion). For the SerializationType parameter value, Protobuf and Avro are the currently supported serialization methods. For the Compression parameter value, only Gzip is supported.

@message

  • Scope: Case Class
  • Arguments: -

Tags the case class as an RPC message.

You can start with a simple service definition like this:

object service {
  import higherkindness.mu.rpc.protocol._
  import monix.reactive.Observable

  @message
  case class SumRequest(a: Int, b: Int)

  @message
  case class SumResponse(sum: Int)

  @message
  case class FibonacciResponse(a: Int)

  @message
  case class FibonacciRequest(n: Int)

  @message
  case class Rate(r: Int)

  @service(Protobuf)
  trait MathService[F[_]] {
    def sum(sumRequest: SumRequest): F[SumResponse]

    def fibonacci(fibonacciRequest: FibonacciRequest): Observable[FibonacciResponse]

    def squares(rates: Observable[Rate]): Observable[Rate]
  }
}

First and foremost, you need to import higherkindness.mu.rpc.protocol._ to enable the annotations. The given definition has five messages represented as case classes annotated with @message and one service. As you can see from the @service annotation parameter, MathService uses Protobuf to serialize messages.

Mu allows you to write the RPC services using Tagless Final encoding. That way, you only need to concentrate on the API that you want to expose, without worrying about how it will be implemented.

The first method definition is our unary service:

  def sum(sumRequest: SumRequest): F[SumResponse]

It has a single request and a single response wrapped in F, the effect type parameter of our Tagless Final algebra.
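To make the role of F more concrete, here is a minimal sketch in plain Scala, without Mu or cats. It assumes a hypothetical `Pure` type class as a cut-down stand-in for `cats.Applicative`, and redefines the messages without the `@message` annotation so that the block is self-contained. The same handler is then run with two different effect types:

```scala
import scala.util.{Success, Try}

object TaglessSketch {
  final case class SumRequest(a: Int, b: Int)
  final case class SumResponse(sum: Int)

  // The algebra: it states what the service does, not which effect runs it.
  trait MathService[F[_]] {
    def sum(sumRequest: SumRequest): F[SumResponse]
  }

  // Hypothetical, minimal stand-in for cats.Applicative: only `pure` is needed here.
  trait Pure[F[_]] {
    def pure[A](a: A): F[A]
  }

  // Identity effect: the value is returned as-is.
  type Id[A] = A
  implicit val idPure: Pure[Id] = new Pure[Id] {
    def pure[A](a: A): Id[A] = a
  }

  // Try effect: the value is wrapped in Success.
  implicit val tryPure: Pure[Try] = new Pure[Try] {
    def pure[A](a: A): Try[A] = Success(a)
  }

  // One implementation, reusable with any F that has a Pure instance.
  class MathServiceHandler[F[_]](implicit P: Pure[F]) extends MathService[F] {
    def sum(sumRequest: SumRequest): F[SumResponse] =
      P.pure(SumResponse(sumRequest.a + sumRequest.b))
  }
}
```

Choosing `Id`, `Try`, or, in a real application, `IO` or `Task` is deferred to the point where the handler is instantiated.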

The second and third methods define streams. In Mu, the streaming features are built on two data types, monix.reactive.Observable and fs2.Stream. You can choose whichever data type best fits your application’s needs. For more information on these two data types, you can review the Monix and FS2 documentation.

Based on your selected data type for defining streams, please add the corresponding dependency: mu-rpc-monix or mu-rpc-fs2.
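As a sketch, the sbt setting might look like the fragment below. The module names come from the text above; the `io.higherkindness` organization and the version placeholder are assumptions to check against the Mu release you use:

```scala
// build.sbt (sketch): pick the module that matches your stream type.
// "x.y.z" is a placeholder, not a real version number.
val muVersion = "x.y.z"

libraryDependencies ++= Seq(
  "io.higherkindness" %% "mu-rpc-monix" % muVersion // for monix.reactive.Observable
  // "io.higherkindness" %% "mu-rpc-fs2" % muVersion // alternative, for fs2.Stream
)
```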

The following example uses monix.reactive.Observable:

  def fibonacci(fibonacciRequest: FibonacciRequest): Observable[FibonacciResponse]

  def squares(rates: Observable[Rate]): Observable[Rate]

The same methods using fs2.Stream:

  def fibonacci(fibonacciRequest: FibonacciRequest): Stream[F, FibonacciResponse]

  def squares(rates: Stream[F, Rate]): Stream[F, Rate]

Server

Let’s start by implementing our MathService:

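import cats.Applicative
import cats.syntax.applicative._
import monix.reactive.Observable
import service._

class MathServiceHandler[F[_]](implicit M: Applicative[F]) extends MathService[F] {
  // Unary: lift the computed sum into the effect F
  override def sum(sumRequest: SumRequest): F[SumResponse] =
    SumResponse(sumRequest.a + sumRequest.b).pure[F]

  // Server streaming
  override def fibonacci(request: FibonacciRequest): Observable[FibonacciResponse] = {
    ??? // implementation omitted for brevity, as it's not important for our purposes
  }

  // Bidirectional streaming: square each incoming rate.
  // Note: `^` is bitwise XOR in Scala, not exponentiation, so we multiply instead.
  override def squares(rates: Observable[Rate]): Observable[Rate] =
    rates.map(r => Rate(r.r * r.r))
}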
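The fibonacci implementation is left out above. As one possible sketch of the underlying logic, here is a plain Scala 2.13 version that uses `LazyList` instead of `Observable`, so it runs without Monix. The object and helper names are hypothetical, and the message is redefined without `@message` to keep the block self-contained:

```scala
object FibonacciSketch {
  final case class FibonacciResponse(a: Int)

  // Lazily generated Fibonacci numbers: 0, 1, 1, 2, 3, 5, ...
  // Int overflows beyond the 47th element, so keep n small or switch to BigInt.
  def fibonacciNumbers: LazyList[Int] = {
    def loop(current: Int, next: Int): LazyList[Int] =
      current #:: loop(next, current + next)
    loop(0, 1)
  }

  // The first n numbers, wrapped in the response message type.
  def responses(n: Int): List[FibonacciResponse] =
    fibonacciNumbers.take(n).map(FibonacciResponse(_)).toList
}
```

In a Monix-based handler, a finite list like this could then be turned into a stream with `Observable.fromIterable`.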

In addition to the implementation of MathService, you might need to add a different set of implicits to your runtime depending on which effect you want to use. In the case of IO, you need a ContextShift[IO] for running IO instances and a Timer[IO] for scheduling:

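sealed trait ServerImplicits {
  // EC is an ExecutionContext that must be in scope (for example, ExecutionContext.Implicits.global)
  implicit val timer: Timer[IO] = IO.timer(EC)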
  implicit val cs: ContextShift[IO] = IO.contextShift(EC)

  implicit val mathServiceHandler: MathService[IO] = new MathServiceHandler[IO]()
}

Now we can use an enriched version of MathService to build a server application:

object ServerApp {
  def main(args: Array[String]): Unit = {
    val grpcConfig: IO[List[GrpcConfig]] = List(MathService.bindService[IO].map(AddService)).sequence

    val runServer = for {
      config <- grpcConfig
      server <- BuildServerFromConfig[IO]("rpc.server.port", config)
      _ <- GrpcServer.server[IO](server)
    } yield ()

    runServer.unsafeRunSync()
  }
}

For the server bootstrapping, remember to add the mu-rpc-server dependency to your build.

The MathService.bindService method, generated by Mu, builds the definition of the service to be exposed by a server. AddService is a gRPC configuration entry that adds a service implementation to the handler registry. BuildServerFromConfig, also provided by Mu, takes the config as a parameter to build a gRPC server. Finally, GrpcServer.server manages the server life cycle: it starts the server (lazily) and eventually shuts it down. Once our IO value is assembled, we run the server using unsafeRunSync.

Now we’re ready to work on the client’s side.

Client

Different clients can share common runtime variables that are needed for executing our effects from the cats-effect library:

trait CommonRuntime {
  implicit val channelFor: ChannelFor =
    ConfigForAddress[IO]("rpc.client.host", "rpc.client.port").unsafeRunSync()

  implicit val EC: ExecutionContext = ExecutionContext.Implicits.global
}

We are going to rely on the default behavior of passing the ChannelFor directly to the client builder. The host and port are read from the application configuration file. Note that you need to add the mu-config dependency to your build if you want to use the ConfigForAddress helper.

Unary Client

The simplest unary client can look like this:

trait SumClient[F[_]] {
  def sum(sumRequest: SumRequest): F[Int]
}

In the following example, we will use a runtime that’s similar to the one we used for the server. We are going to use the automatically derived client that Mu provides through the MathService.client method. This is especially useful because the client is derived from the protocol/service definitions and can be distributed along with them. If you change something in your protocol definition, you get a new client for free without having to write anything.

Note that you need to add one of the following client dependencies to your build: mu-rpc-client-netty or mu-rpc-client-okhttp.

trait SumClientImplicits extends CommonRuntime {
  implicit val timer: Timer[IO] = IO.timer(EC)
  implicit val cs: ContextShift[IO] = IO.contextShift(EC)

  val sumClient: Resource[IO, MathService[IO]] =
    MathService.client[IO](channelFor)

  implicit val mathClientHandler: SumClientHandler[IO] =
    new SumClientHandler[IO](sumClient)
}
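SumClientHandler itself is not shown in this article. One hypothetical shape for it is an adapter that delegates to MathService and unwraps the SumResponse. The sketch below uses plain Scala with a hand-rolled `Mappable` type class as a stand-in for `cats.Functor`, and takes a MathService directly. The real handler receives a `Resource[IO, MathService[IO]]`, so it would presumably call `use` on the resource first:

```scala
object SumClientSketch {
  final case class SumRequest(a: Int, b: Int)
  final case class SumResponse(sum: Int)

  trait MathService[F[_]] {
    def sum(sumRequest: SumRequest): F[SumResponse]
  }

  trait SumClient[F[_]] {
    def sum(sumRequest: SumRequest): F[Int]
  }

  // Hypothetical, minimal stand-in for cats.Functor.
  trait Mappable[F[_]] {
    def map[A, B](fa: F[A])(f: A => B): F[B]
  }

  implicit val optionMappable: Mappable[Option] = new Mappable[Option] {
    def map[A, B](fa: Option[A])(f: A => B): Option[B] = fa.map(f)
  }

  // Adapter: delegates to the service and unwraps the response message.
  class SumClientHandler[F[_]](service: MathService[F])(implicit F: Mappable[F])
      extends SumClient[F] {
    def sum(sumRequest: SumRequest): F[Int] =
      F.map(service.sum(sumRequest))(_.sum)
  }

  // In-memory stand-in for the derived RPC client, for demonstration only.
  val localService: MathService[Option] = new MathService[Option] {
    def sum(sumRequest: SumRequest): Option[SumResponse] =
      Some(SumResponse(sumRequest.a + sumRequest.b))
  }
}
```

Keeping SumClient separate from MathService lets application code depend on a narrow interface that returns plain values rather than protocol messages.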

Next, we add the SumApp implementation:

object SumApp {
  def main(args: Array[String]): Unit = {
    sum[IO].unsafeRunSync()
  }

  def sum[M[_] : Monad](implicit client: SumClient[M]): M[Int] =
    client.sum(SumRequest(1, 2))
}

Streaming Client using Monix

You can simply use monix.eval.Task as the effect in your runtime:

trait FibonacciClientImplicits extends CommonRuntime {
  implicit val S: Scheduler = Scheduler.Implicits.global

  implicit val fibonacciClient: Resource[Task, MathService[Task]] =
    MathService.client[Task](channelFor)

  implicit val fibonacciClientHandler: FibonacciClientHandler[Task] =
    new FibonacciClientHandler[Task]
}

Then, in the application, you can wait for the result like this:

object FibonacciClientApp {
  def main(args: Array[String]): Unit = {
    Await.result(fibonacci[Task].runToFuture, Duration.Inf)
  }

  def fibonacci[M[_] : Monad](implicit client: FibonacciClient[M]): M[Unit] =
    client.fibonacci(10)
}

Bi-directional streaming can be done in a very similar fashion.

Summary

You should now have a basic understanding of how to build RPC server and client applications with Mu using popular FP patterns such as Tagless Final encoding, where all effects are pushed to the edges of both the server and client applications. Later in this series, we will cover more of Mu’s capabilities, such as authentication and securing communications, metrics reporting, and even deploying Mu to the cloud.

If you want to dive deeper into Mu, we have a few complete examples available. One of them is based on the Route Guide Demo originally shared by the gRPC Java Project.