Code instrumentation in Mu-RPC

Metrics are an essential part of a backend application but are often overlooked. This is especially true for architectures involving a mass of microservices distributed in the cloud, where inherently, many points of failure can be found. This is where metrics can offer invaluable insight into what goes on inside distributed cloud services, especially when they run under extreme situations.

Mu offers integrations with two of the most important metrics libraries: DropWizard Metrics and Prometheus. This article shows how easy it is to integrate these libraries to measure crucial information in a GRPC-based service and how to plug in your own implementation for sending that data to other metrics backends.

A simple example service

Let’s imagine that we have a system similar to the historic Traf-O-Data application, the first project that Paul Allen, Bill Gates, and Paul Gilbert created together. This application creates reports of the level of traffic on multiple roadways. Sensors on the roads gather data on the number of cars passing over them and send that information at different rates to our server, which is responsible for storing the data in a data warehouse.

We’re going to have two small applications: a GRPC client that will represent the sensors sending the data to the central server, and a GRPC server that will pick up those requests and send a correct response. We’ll omit other functions (e.g., storing the data in Cassandra or another data warehouse system) as they’re out of the scope of this article. We don’t need to worry about instrumenting our application yet, as we’ll handle the metrics as side-effects configured at the very end of our implementation. For now, we can focus on the details of the app:

Data model

Both of our apps have a common simple data model:

import org.joda.time._

final case class TrafficReportRequest(highwayCode: Int, startDate: LocalDateTime, endDate: LocalDateTime, amountGoing: Long, amountComing: Long)
final case class TrafficReportResponse(status: Int)

The requests will include an identifier code for the highway the sensor is measuring, the start and end times of the measurement that’s being sent, and how many cars were detected in both directions. The response contains a status code we can use to track different behaviors or even error handling.
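As an illustration of how the status code could carry error information, here is a minimal sketch of request validation. The validate helper and the status values 200 and 400 are assumptions made for this example; they are not part of Mu or of the service implemented later in this article. The sketch also uses java.time in place of Joda-Time so that it runs without extra dependencies.

```scala
import java.time.LocalDateTime

// Mirrors the article's data model, with java.time standing in for Joda-Time (assumption).
final case class TrafficReportRequest(
    highwayCode: Int,
    startDate: LocalDateTime,
    endDate: LocalDateTime,
    amountGoing: Long,
    amountComing: Long)
final case class TrafficReportResponse(status: Int)

object TrafficValidation {
  // Hypothetical status codes, borrowed from HTTP conventions.
  val Ok = 200
  val BadRequest = 400

  // Rejects measurements with negative counts or an inverted time window.
  def validate(request: TrafficReportRequest): TrafficReportResponse = {
    val countsValid = request.amountGoing >= 0 && request.amountComing >= 0
    val windowValid = !request.endDate.isBefore(request.startDate)
    TrafficReportResponse(if (countsValid && windowValid) Ok else BadRequest)
  }
}
```

A server could run a check like this before persisting the event and return the resulting response to the sensor.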

The Service Protocol

We’ll use Protocol Buffers to represent our service. Specifying the service with Mu through code is pretty straightforward:

import higherkindness.mu.rpc.protocol.{Protobuf, service}
import higherkindness.mu.rpc.marshallers.jodaTimeEncoders.pbd._

@service(Protobuf) trait TrafficReportService[F[_]] {
  def reportTraffic(request: TrafficReportRequest): F[TrafficReportResponse]
}

For the sake of simplicity, we’ve specified the service by writing it directly in our Scala implementation, but our recommendation is to use IDL protocol definitions (e.g., .proto files in this case).

The client

Our client will send repeated requests to the server with different delays, to pretend we’re getting data from various sources and paces. First, we need to define a client from the service we’ve created previously. Achieving that is easy thanks to the Mu @service annotation, which allows us to auto-derive one by using the client function.

def client[F[_]: ConcurrentEffect]
  (implicit cs: ContextShift[F], timer: Timer[F]): Resource[F, TrafficReportService[F]] =
    // Use the abstract effect F (not IO) so the result matches the declared return type
    TrafficReportService.client[F](
      ChannelForAddress("localhost", 8080),
      List(UsePlaintext()))

Notice that we’re specifying UsePlaintext() in our parameters list to simplify things by not including SSL. For more information about securing your Mu-GRPC applications, you can take a look at this article: Mu-RPC: securing communications. Please consider securing your endpoints in a production environment.

We’ll target a GRPC server using the same service and answering requests on port 8080 of localhost. Following this, we can add the functions that perform the repeated requests. These take a function to generate new requests with random data, a value for the maximum number of milliseconds each request should wait, and the number of calls to perform. We also need to provide the client created above, to be able to send the requests through it:

def callRepeatedly[F[_]: Logger](
  client: TrafficReportService[F],
  maxDelay: Int,
  numberOfCalls: Int,
  request: () => TrafficReportRequest
  )(implicit T: Timer[F], ev: Sync[F]): Stream[F, TrafficReportResponse] = {
    val requestOp = for {
      requestGen  <- ev.delay(request())
      response    <- client.reportTraffic(requestGen)
    } yield response

    Stream.eval(requestOp)
      .evalTap(_ => T.sleep(Random.nextInt(maxDelay).millis))
      .repeatN(numberOfCalls)
}

def callRepeatedlyStream(
  request: () => TrafficReportRequest,
  maxDelay: Int,
  numberOfCalls: Int): Stream[IO, TrafficReportResponse] =
    Stream.resource(client[IO])
      .flatMap(c => callRepeatedly(c, maxDelay, numberOfCalls, request))

Then it’s just a question of providing the function that will generate the random requests before we start sending them. In the end, we’ll print the results we obtained from the server:

def generateRequest() = TrafficReportRequest(
    highwayCode = Random.nextInt(100),
    startDate = LocalDateTime.now().minusHours(1),
    endDate = LocalDateTime.now(),
    amountGoing = Random.nextInt(200),
    amountComing = Random.nextInt(200))

callRepeatedlyStream(
    generateRequest,
    maxDelay = 2000,
    numberOfCalls = 100)
    .compile
    .toList
    .unsafeRunSync()

The server

Our basic server has a similar structure to the client, with some caveats. The main difference, in this case, is that we need to provide an actual implementation to the service we defined above. As you can see, we print each request we receive and each response we return, but in a real case, we’d deserialize, validate, and persist the incoming events:

import cats.effect._
import cats.implicits._
import higherkindness.mu.protocols._
import io.chrisdavenport.log4cats.Logger

class TrafficReportServiceImpl[F[_]: ConcurrentEffect](
  implicit logger: Logger[F]) extends TrafficReportService[F] {
    def reportTraffic(request: TrafficReportRequest): F[TrafficReportResponse] =
      for {
        _ <- logger.info(s"Received request: $request")
        response = TrafficReportResponse(200)
        _ <- logger.info(s"Generating response: $response")
      } yield response
}

Once we specify how our service is going to behave when it’s called by the client, we can run it! To do this, we need to provide implicit instances of Logger and TrafficReportService, so we can then bind our service to IO and start it on port 8080. Notice the List[GrpcConfig] we pass when creating grpcServer: so far, it only contains the registration of the service itself (AddService(service)), but we’ll add our metrics to this part of the code later:

val app = for {
  implicit0(logger: Logger[IO])   <- Slf4jLogger.create[IO]
  implicit0(service: TrafficReportService[IO]) = new TrafficReportServiceImpl[IO]
  service     <- TrafficReportService.bindService[IO]
  grpcServer  <- GrpcServer.default[IO](8080, List(AddService(service)))
  _           <- GrpcServer.server[IO](grpcServer)
} yield ()

app.unsafeRunSync()

(Note: we’re using the better-monadic-for plugin, which allows us to create implicit instances in our for comprehensions).

Running this will start the server based on our service, listening for requests on port 8080.

Implementing metrics in our server

Provided metrics

These are the metrics Mu exposes when associated with a GRPC server:

  • Active calls: currently running connections.
  • Messages sent: number of messages that the server sent (distributed by service name and method name).
  • Messages received: number of messages that the server got (distributed by service name and method name).
  • Timer for header calls, total calls, and also distributed by method types (e.g., unary, streaming…) and statuses (e.g., ok, canceled…).

Mu uses the MetricsServerInterceptor class as a bridge between our RPC service and the two supported frameworks to relay these metrics in a timely fashion:

Mu - MetricsServerInterceptor
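To make the metrics listed above more concrete, the following is a conceptual sketch of the bookkeeping behind them. It is not Mu’s implementation: InMemoryCallMetrics and its method and key names are hypothetical, and it keeps everything in plain in-memory counters rather than in a DropWizard or Prometheus registry.

```scala
import java.util.concurrent.atomic.{AtomicLong, LongAdder}
import scala.collection.concurrent.TrieMap

// Hypothetical, simplified recorder; all names are illustrative only.
final class InMemoryCallMetrics {
  private val active   = new AtomicLong(0L)
  private val counters = TrieMap.empty[String, LongAdder]

  private def add(key: String, amount: Long): Unit =
    counters.getOrElseUpdate(key, new LongAdder).add(amount)

  // Number of calls currently in flight.
  def activeCalls: Long = active.get()

  // Current value of a counter, or 0 if it was never touched.
  def count(key: String): Long = counters.get(key).map(_.sum()).getOrElse(0L)

  def messageReceived(service: String, method: String): Unit =
    add(s"$service.$method.messages.received", 1L)

  def messageSent(service: String, method: String): Unit =
    add(s"$service.$method.messages.sent", 1L)

  // Wraps a call: counts it as active while it runs, then records
  // its status and elapsed time under the service and method name.
  def timed[A](service: String, method: String)(call: => A): A = {
    active.incrementAndGet()
    val start  = System.nanoTime()
    var status = "ok"
    try call
    catch { case e: Throwable => status = "error"; throw e }
    finally {
      active.decrementAndGet()
      add(s"$service.$method.calls.$status", 1L)
      add(s"$service.$method.calls.nanos", System.nanoTime() - start)
    }
  }
}
```

An interceptor plays the role of the timed wrapper here: it sits between the transport and the service implementation, so the service code itself never has to know that it is being measured.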

DropWizard

The first step in making our server send metrics through DropWizard is to create a DropWizardMetrics instance based on a MetricRegistry:

val registry: MetricRegistry = new MetricRegistry()
val dwMetrics: DropWizardMetrics[IO] = DropWizardMetrics[IO](registry, "traffic-report")

Once we have that, we need to set the interceptor that will capture all of the events associated with the server and relay those to the DropWizard metrics registry. In order to instantiate it, we need to provide the DropWizardMetrics instance that we just created, and an optional string that will work as a classifier to group metrics under the same service, if required.

import interceptors.implicits._

val app = for {
  implicit0(logger: Logger[IO])   <- Slf4jLogger.create[IO]
  implicit0(service: TrafficReportService[IO]) = new TrafficReportServiceImpl[IO]
  service     <- TrafficReportService.bindService[IO]
  dwMetrics     = DropWizardMetrics[IO](registry, "traffic-report")
  grpcConfigDw  = AddService(
      service.interceptWith(
          MetricsServerInterceptor(dwMetrics, Some("traffic-metrics"))))
  grpcServer  <- GrpcServer.default[IO](8080, List(AddService(service), grpcConfigDw))
  _           <- GrpcServer.server[IO](grpcServer)
} yield ()

app.unsafeRunSync()

With these additions, our GRPC server will provide metrics from the very first request it receives.

Checking metrics through JMX

To check the metrics from our server, DropWizard provides a reporter that exposes the metrics through JMX. Setting this up is really easy:

val jmxReporter = JmxReporter.forRegistry(registry)
jmxReporter.build().start()

Once the reporter is associated with the metrics registry, all of our metrics are exposed through JMX. For instance, the following capture is from JConsole, but we could use any JMX viewer:

Mu - DropWizardJMXReporting
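The same MBeans can also be listed programmatically, which is handy for a quick check without a GUI. This sketch only uses the JDK’s management API; listMBeans is a hypothetical helper, and it has to run inside the same JVM as the server. Querying the "metrics" domain is an assumption based on the default domain that the DropWizard JmxReporter uses unless another one is configured.

```scala
import java.lang.management.ManagementFactory
import javax.management.ObjectName

object JmxListing {
  // Returns the sorted names of all MBeans registered under the given JMX domain.
  def listMBeans(domain: String): List[String] = {
    val server = ManagementFactory.getPlatformMBeanServer
    val names  = server.queryNames(new ObjectName(s"$domain:*"), null).iterator()
    val result = List.newBuilder[String]
    while (names.hasNext) result += names.next().getCanonicalName
    result.result().sorted
  }
}
```

Calling JmxListing.listMBeans("metrics").foreach(println) after the reporter has started should print one line per exposed metric, assuming the default domain is in use.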

Prometheus

Integrating Prometheus with Mu is similar to the process we use with DropWizard Metrics, with a few small differences. First, we need to create a CollectorRegistry, which is the Prometheus equivalent to MetricRegistry:

val cr = new CollectorRegistry()

Then, integrating this registry with our server is nearly the same process:

import interceptors.implicits._

val app = for {
  implicit0(logger: Logger[IO])   <- Slf4jLogger.create[IO]
  implicit0(service: TrafficReportService[IO]) = new TrafficReportServiceImpl[IO]
  service      <- TrafficReportService.bindService[IO]
  promMetrics  <- PrometheusMetrics.build[IO](cr, "traffic_report")
  grpcConfigProm  = AddService(service.interceptWith(MetricsServerInterceptor(promMetrics, Some("traffic_metrics"))))
  grpcServer  <- GrpcServer.default[IO](8080, List(AddService(service), grpcConfigProm))
  _           <- GrpcServer.server[IO](grpcServer)
} yield ()

app.unsafeRunSync()

Notice that building the PrometheusMetrics instance returns an IO. This is because Prometheus requires the set of metrics to be created before any values are recorded, as opposed to DropWizard, which creates them on the fly. Also, be aware that some characters are not allowed in Prometheus metric names and labels; for instance, we used underscores instead of dashes.
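Prometheus metric names have to match the pattern [a-zA-Z_:][a-zA-Z0-9_:]*, which is why a prefix such as traffic-report is not valid. As a minimal sketch, a helper like the one below could normalize a prefix before passing it on. PrometheusNames.sanitize is hypothetical and not part of Mu or the Prometheus client; it also replaces colons, on the assumption that they are better left to Prometheus recording rules.

```scala
object PrometheusNames {
  // Replaces every character outside [a-zA-Z0-9_] with an underscore and
  // prefixes an underscore when the name would otherwise start with a digit.
  def sanitize(name: String): String = {
    val replaced = name.replaceAll("[^a-zA-Z0-9_]", "_")
    if (replaced.nonEmpty && replaced.head.isDigit) "_" + replaced else replaced
  }
}
```

For example, PrometheusNames.sanitize("traffic-report") yields "traffic_report", the prefix used in the code above.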

Checking metrics through Prometheus server

So now we should be able to check our metrics too, right? I’m afraid we still need to do some additional work. Prometheus works with a server that provides great dashboards and the ability to run queries against our metrics data, but to achieve that, we need to make the metrics available to it. The easiest way to do this is to create an HTTP endpoint (/metrics) that the Prometheus server will poll periodically to obtain the data it needs. We’ll use http4s, which provides a simple-to-use integration with Prometheus:

def exportMetrics(cr: CollectorRegistry, port: Int, host: String) =
    BlazeServerBuilder[IO].bindHttp(port, host)
      .withHttpApp(PrometheusExportService[IO](cr).routes.orNotFound)
      .serve

This function creates a Blaze HTTP server with a single route (/metrics) that returns the current data held in the CollectorRegistry instance we provide to it. Running this new HTTP server alongside our GRPC server is straightforward using an fs2 Stream:

val grpcService = Stream.eval(
    for {
      implicit0(logger: Logger[IO])   <- Slf4jLogger.create[IO]
      implicit0(service: TrafficReportService[IO]) = new TrafficReportServiceImpl[IO]
      service      <- TrafficReportService.bindService[IO]
      promMetrics  <- PrometheusMetrics.build[IO](cr, "traffic_report")
      grpcConfigProm  = AddService(service.interceptWith(MetricsServerInterceptor(promMetrics, Some("traffic_metrics"))))
      grpcServer  <- GrpcServer.default[IO](8080, List(AddService(service), grpcConfigProm))
      _           <- GrpcServer.server[IO](grpcServer)
    } yield ()
  )

Stream(grpcService, exportMetrics(cr, 9001, "localhost"))
  .parJoinUnbounded
  .compile
  .drain
  .unsafeRunSync()

By wrapping the GRPC server creation under Stream.eval, we can combine it with the creation of the metrics report server using parJoinUnbounded. Both servers will run at the same time without issues.
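For reference, the /metrics endpoint answers with plain text in the Prometheus exposition format. The sketch below renders a single counter sample in that format by hand, only to show what the Prometheus server reads when it polls; in our application the export service does this for us. ExpositionFormat.renderCounter and the metric name example_calls_total are hypothetical, and escaping of special characters in label values is left out for brevity.

```scala
object ExpositionFormat {
  // Renders one counter with its HELP and TYPE header lines.
  // Labels are sorted by name so that the output is deterministic.
  def renderCounter(
      name: String,
      help: String,
      labels: Map[String, String],
      value: Double): String = {
    val renderedLabels =
      if (labels.isEmpty) ""
      else
        labels.toList
          .sortBy(_._1)
          .map { case (k, v) => k + "=\"" + v + "\"" }
          .mkString("{", ",", "}")
    s"# HELP $name $help\n# TYPE $name counter\n$name$renderedLabels $value\n"
  }
}

// Example usage with hypothetical values
print(ExpositionFormat.renderCounter(
  "example_calls_total", "Total number of calls", Map("method" -> "reportTraffic"), 3.0))
```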

Now, we need to configure our Prometheus server to call our fancy HTTP report server. Inside prometheus.yml we can add a new job that will tell Prometheus to poll /metrics under localhost:9001:

scrape_configs:
  - job_name: 'traffic-report'

    static_configs:
    - targets: ['localhost:9001']
      labels:
        group: 'traffic-report'

Let’s run the Prometheus server and perform a small query to check that everything is working. Go to the Prometheus dashboard (by default at http://localhost:9090) and set up a new graph with the query traffic_report_calls_total_count (any of our new metrics will work). By activating the Graph tab and reducing the time range to a sensible value (e.g., 5 minutes, unless you want to keep the client sending messages for an hour!), you’ll be able to see the increasing number of calls the server has received:

Mu - PrometheusReporting

Prometheus queries support combining multiple metrics to provide all sorts of insights. Combining your own metrics with those Mu offers can provide you with crucial information for tasks ranging from debugging issues to detecting load stress patterns.

Client-side metrics

Our GRPC clients can also generate metrics in a similar way to what we just did with our server. Let’s take a brief look at how we can add metrics to our current client by adding support for DropWizard metrics:

def client[F[_]: ConcurrentEffect](
  implicit cs: ContextShift[F], timer: Timer[F]): Resource[F, TrafficReportService[F]] = {
    val registry: MetricRegistry = new MetricRegistry()
    val jmxReporter = JmxReporter.forRegistry(registry)
    jmxReporter.build().start()

    // Build the metrics for the abstract effect F so they match the client's type
    val dwMetrics = DropWizardMetrics[F](registry, "traffic-report")

    TrafficReportService.client[F](
      ChannelForAddress("localhost", 8080),
      List(
        UsePlaintext(),
        AddInterceptor(MetricsChannelInterceptor(dwMetrics, Some("traffic-report-client")))))
}

We only had to modify the client generation function, first creating the same metrics instances we used for our server (MetricRegistry, JmxReporter, and DropWizardMetrics) and then adding those to our client with AddInterceptor and a MetricsChannelInterceptor instance. Notice that it’s mostly the same code as for our server; the only difference is that we use MetricsChannelInterceptor instead of MetricsServerInterceptor.

Running the client now will output the same metrics through JMX as it did with the server before.

Summary

Integrating metrics into your GRPC services with Mu is a straightforward process. Combining your business logic metrics with the metrics Mu provides about your services can offer you valuable insights that can assist you while fixing issues and help your applications grow.

Have questions or comments? Join us in the Mu Gitter channel. Mu is possible thanks to an awesome group of contributors. As with all of the open source projects under the 47 Degrees umbrella, we’re an inclusive environment and always looking for new contributors, and we’re happy to provide guidance and mentorship on contributing to, or using the library.
