Mu-RPC: defining messages and services
by Oli Makhasoeva
- •
- February 21, 2019
- •
- mu• scala• rpc• http• avro• protobuff• protocol• buffers• json• openapi• microservices
- |
- 12 minutes to read.

This is the second article of an ongoing series about the Mu open source library for building purely functional microservices. Previously, Juan Pedro Moreno provided an overview of Mu and illustrated the different types of communication between services and how they are defined with Protocol Buffers, Avro, Open API, and Mu:
In this post, we’ll cover how to define messages and services and their usage.
Service Protocol Definition
To best illustrate Mu-rpc abilities, let’s consider examples of communication between servers and clients (see gRPC concepts):
- Unary RPC: the client sends a single request and receives a single response from the server.
- Server streaming RPC: similar to the unary service, but in this case, the server will send back a stream of responses for a client request.
- Client streaming RPC: in this case, it’s the client that sends a stream of requests. The server will respond with a single response.
- Bidirectional streaming RPC: a mix of server and client streaming as both sides will be sending a stream of data.
We will also talk about the unary and streaming services.
Please note that the Avro
format doesn’t support streaming communications therefore we will focus on then Protobuf
format.
Mu offers the following annotations for defining messages and services:
@service
- Scope:
Trait
- Arguments:
SerializationType
,Compression
Tags the trait as an RPC service in order to derive server and client code (macro expansion). For the SerializationType
parameter value, Protobuf
and Avro
are the current supported serialization methods. For the Compression
parameter value, only Gzip
is supported.
@message
- Scope:
Case Class
- Arguments: -
Tags the case class as an RPC message.
You can start with a simple service definition like this:
object service {
import higherkindness.mu.rpc.protocol._
import monix.reactive.Observable
@message
case class SumRequest(a: Int, b: Int)
@message
case class SumResponse(sum: Int)
@message
case class FibonacciResponse(a: Int)
@message
case class FibonacciRequest(n: Int)
@message
case class Rate(r: Int)
@service(Protobuf)
trait MathService[F[_]] {
def sum(sumRequest: SumRequest): F[SumResponse]
def fibonacci(fibonacciRequest: FibonacciRequest): Observable[FibonacciResponse]
def squares(rates: Observable[Rate]): Observable[Rate]
}
}
First and foremost, you need to import higherkindness.mu.rpc.protocol._
to enable the annotations.
The given definition has five messages represented as case classes annotated with @message
and one service.
As you can see from the @service
annotation parameter, MathService
uses Protobuf
to serialize messages.
Mu allows you to write the RPC services using Tagless Final encoding, that way you only need to concentrate on the API that you want to expose, without worrying about how it will be implemented.
The first method definition is our unary service:
def sum(sumRequest: SumRequest): F[SumResponse]
It has a single request and a single response baked inside F
, our Tagless Final algebra parameter.
The second and the third methods define streams.
In mu, the streaming features have been implemented based on two data types, monix.reactive.Observable
and fs2.Stream
. You can choose whichever data type best fits your application’s needs. For more information on these two data types, you can review the Monix and FS2 documentation.
Based on your selected data type for defining streams, please add the corresponding dependency: mu-rpc-monix
or mu-rpc-fs2
.
The following example uses monix.reactive.Observable
:
def fibonacci(fibonacciRequest: FibonacciRequest): Observable[FibonacciResponse]
def squares(rates: Observable[Rate]): Observable[Rate]
The same methods using fs2.Stream
def fibonacci(fibonacciRequest: FibonacciRequest): Stream[F, FibonacciResponse]
def squares(rates: Stream[F, Rate]): Stream[F, Rate]
Server
Let’s start by implementing our MathService
:
import cats.syntax.applicative._
class MathServiceHandler[F[_]](implicit M: Applicative[F]) extends MathService[F] {
override def sum(sumRequest: SumRequest): F[SumResponse] =
SumResponse(sumRequest.a + sumRequest.b).pure[F]
override def fibonacci(request: FibonacciRequest): Observable[service.FibonacciResponse] = {
??? // skip implementation for simplification as it's not important for our purposes
}
override def squares(rates: Observable[Rate]): Observable[Rate] = {
rates.map(r => Rate(r.r ^ 2))
}
}
In addition to the implementation of MathService
, you might need to add a different set of implicits to your runtime depending on which effect you want to use.
In the case of IO
, you need a ContextShift[IO] for running IO instances and a Timer[IO] for scheduling:
sealed trait ServerImplicits {
implicit val timer: Timer[IO] = IO.timer(EC)
implicit val cs: ContextShift[IO] = IO.contextShift(EC)
implicit val mathServiceHandler: MathService[IO] = new MathServiceHandler[IO]()
}
Now we can use an enriched version of MathService
to build a server application:
object ServerApp {
def main(args: Array[String]): Unit = {
val grpcConfig: IO[List[GrpcConfig]] = List(MathService.bindService[IO].map(AddService)).sequence
val runServer = for {
config <- grpcConfig
server <- BuildServerFromConfig[IO]("rpc.server.port", config)
_ <- GrpcServer.server[IO](server)
} yield ()
runServer.unsafeRunSync
}
}
For the server bootstrapping, remember to add the mu-rpc-server
dependency to your build.
As it turns out, the MathService.bindService
method generated by Mu constructs a definition of a service to be exposed via a Server.
AddService
is a gRPC config which adds a service implementation to the handler registry.
BuildServerFromConfig
, which is also provided by Mu, takes the config as a parameter to build a gRPC server.
Finally, GrpcServer.server
takes care of the server life cycle methods, i.e., starting it (lazily) and eventually shutting it down.
After our IO
is ready we can run the server using unsafeRunSync
.
Now we’re ready to work on the client’s side.
Client
Different clients can share common runtime variables that are needed for executing our effects from the cats-effect library:
trait CommonRuntime {
implicit val channelFor: ChannelFor =
ConfigForAddress[IO]("rpc.client.host", "rpc.client.port").unsafeRunSync()
implicit val EC: ExecutionContext = ExecutionContext.Implicits.global
}
We are going to rely on the default behavior of passing the ChannelFor
directly to the client builder. host
and port
will be derived from an application configuration file.
Note that you need to add the mu-config
dependency to your build if you want to take advantage of using the ConfigForAddress
helper.
Unary Client
The simplest unary client can look like this:
trait SumClient[F[_]] {
def sum(sumRequest: SumRequest): F[Int]
}
In the following example, we will use a runtime that’s similar to what we used for the server.
We are going to use an automatically derived client which is provided by mu, by calling the method MathService.client
.
This is especially useful because you can distribute it depending on the protocol/service definitions.
If you change something in your protocol definition, you will get a new client for free without having to write anything.
Note that you need to add one of the following client dependencies to your build: mu-rpc-client-netty
or mu-rpc-client-okhttp
.
trait SumClientImplicits extends CommonRuntime {
implicit val timer: Timer[IO] = IO.timer(EC)
implicit val cs: ContextShift[IO] = IO.contextShift(EC)
val sumClient: Resource[IO, MathService[IO]] =
MathService.client[IO](channelFor)
implicit val mathClientHandler: SumClientHandler[IO] =
new SumClientHandler[IO](sumClient)
}
Next, we add the SumApp
implementation:
object SumApp {
def main(args: Array[String]): Unit = {
sum[IO].unsafeRunSync()
}
def sum[M[_] : Monad](implicit client: SumClient[M]): M[Int] =
client.sum(SumRequest(1, 2))
}
Streaming Client using Monix
You can simply use Monix.Task
as an effect in your runtime:
trait FibonacciClientImplicits extends CommonRuntime {
implicit val S: Scheduler = Scheduler.Implicits.global
implicit val fibonacciClient: Resource[Task, MathService[Task]] =
MathService.client[Task](channelFor)
implicit val fibonacciClientHandler: FibonacciClientHandler[Task] =
new FibonacciClientHandler[Task]
}
Then, in the application, you can await for a result like this:
object FibonacciClientApp {
def main(args: Array[String]): Unit = {
Await.result(fibonacci[Task].runToFuture, Duration.Inf)
}
def fibonacci[M[_] : Monad](implicit client: FibonacciClient[M]): M[Unit] =
client.fibonacci(10)
}
Bi-directional streaming can be done in a very similar fashion.
Summary
You should now have a basic understanding of how to build RPC server and client applications with Mu, using popular FP patterns including Tagless Final encoding, where all the effects are shifted on the edge of both server and client applications. Later in this series, we will tackle some of Mu’s capabilities like Authentication and Securing the communications, metrics reporting, and even deploying Mu to the cloud.
If you want to dive deeper into Mu, we have a few complete examples available, one of these is based on the Route Guide Demo originally shared by the gRPC Java Project.