47 Degrees joins forces with Xebia read more

Run a fs2-kafka stream with scala-cli and Scala 3

Run a fs2-kafka stream with scala-cli and Scala 3

In this blog post, I’m sharing a real use case that I’ve experienced recently in one of the projects I’m working on.

Specifically, I had to build a Kafka processing pipeline to mock a new service under development.

In this case, the pipeline does the following:

  • Consume from an input-topic
  • Get the keys from that input-topic
  • Generate fixed data into an output-topic using the key from the previous topic.

Diagram

My team had developed a service that consumes this test data from the output-topic and joins it with another-input-topic.

Our pipeline is mocking a service developed by another party that is under active development.

Complete Diagram

Build the Kafka stream

Assuming you have a Kafka cluster accessible on localhost:9092, you could use a fs2 stream scala-cli script based on thefs2-kafka quickstart example.

If you do not know scala-cli or ammonite scripts, you can add a dependency by declaring it at the start of your script:

import $ivy.com.github.fd4s::fs2-kafka:2.2.0`

Here is the complete example:

import $ivy.`com.github.fd4s::fs2-kafka:2.2.0`

import cats.effect._
import cats.effect.unsafe.implicits.global
import fs2.kafka._
import scala.concurrent.duration._

def log(a: Any) = IO(println(a))

def processRecord(
    record: ConsumerRecord[Long, String]
): IO[Long] =
  log(s"Processing: ${record.key}") *> IO.pure(record.key)

val consumerSettings =
  ConsumerSettings[IO, Long, String]
    .withAutoOffsetReset(AutoOffsetReset.Latest)
    .withBootstrapServers("localhost:9092")
    .withGroupId("test-metadata-producer")

val producerSettings =
  ProducerSettings[IO, Long, String]
    .withBootstrapServers("localhost:9092")

val stream =
  KafkaConsumer
    .stream(consumerSettings)
    .subscribeTo("events")
    .records    
    .mapAsync(2) { committable =>
      processRecord(committable.record)
        .map { key =>
          val value: String = s"My custom message for the output topic $key"            
          val record = ProducerRecord("updates", key, value)
          ProducerRecords.one(record, committable.offset)
        }
    }
    .through(KafkaProducer.pipe(producerSettings))
    .map(_.passthrough)
    .through(commitBatchWithin(500, 15.seconds))


(log("Starting the processor") *> stream.compile.drain.as(ExitCode.Success)).unsafeRunSync()

Essentially, we’ve created an ETL Pipeline (ETL, which stands for extract, transform, and load) script able to start fs2 stream consuming the topic records, extracting the keys in processRecord, building the new record for the output-topic, and piping it to a Kafka producer.

Save this as fs2-kafka-stream.sc.

You could run it using scala-cli run fs2-kafka-stream.sc.

Kubernetes Deployment

Now you have to test this on a Kubernetes test cluster.

Please do not do this in production! In production, you should deploy using a CI/CD tool from a version control manifests.

You could do the following:

  • Define a Kubernetes ConfigMap to host your script.
  • Replace the references to localhost:9092 with a the Kafka Bootstrap URL of the cluster. In our example, kafka-bootstrap:9092.
  • Create a Deployment that uses the config map as as volume.
  • Use scala-cli Docker Image to run the script.
  • Deploy it to the cluster kubectl apply -f my-deployment.yaml.

And that’s all!

Here you have the complete Kubernetes YAML manifests that includes the ConfigMap and the Deployment.

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: scala-cli-scripts
data:
  fs2-kafka-stream.sc: |
    import $ivy.`com.github.fd4s::fs2-kafka:2.2.0`
    import cats.effect._
    import cats.effect.unsafe.implicits.global
    import fs2.kafka._
    import scala.concurrent.duration._

    def log(a: Any) = IO(println(a))

    def processRecord(
        record: ConsumerRecord[Long, String]
    ): IO[Long] =
      log(s"Processing: ${record.key}") *> IO.pure(record.key)

    val consumerSettings =
      ConsumerSettings[IO, Long, String]
        .withAutoOffsetReset(AutoOffsetReset.Latest)
        .withBootstrapServers("localhost:9092")
        .withGroupId("test-metadata-producer")

    val producerSettings =
      ProducerSettings[IO, Long, String]
        .withBootstrapServers("localhost:9092")

    val stream =
      KafkaConsumer
        .stream(consumerSettings)
        .subscribeTo("events")
        .records    
        .mapAsync(2) { committable =>
          processRecord(committable.record)
            .map { key =>
              val value: String = s"My custom message for the output topic $key"            
              val record = ProducerRecord("updates", key, value)
              ProducerRecords.one(record, committable.offset)
            }
        }
        .through(KafkaProducer.pipe(producerSettings))
        .map(_.passthrough)
        .through(commitBatchWithin(500, 15.seconds))

    (log("Starting the processor") *> stream.compile.drain.as(ExitCode.Success)).unsafeRunSync()

---    
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    service: scala-cli-fs2-kafka-stream
  name: scala-cli-fs2-kafka-stream

spec:
  replicas: 1
  selector:
    matchLabels:
      service: scala-cli-fs2-kafka-stream
  strategy: {}
  template:
    metadata:
      labels:
        service: scala-cli-fs2-kafka-stream
    spec:
      volumes:
      - name: scala-cli-scripts # (1)
        configMap:
          name: scala-cli-scripts
      containers:
        - image: virtuslab/scala-cli
          name: scala-cli
          volumeMounts:
            - name: scala-cli-scripts # (2)
              mountPath: /usr/local/scala-cli-scripts
          args:
            - run
            - /usr/local/scala-cli-scripts/fs2-kafka-stream.sc # (3)
          resources:
            limits:
              cpu: 2
              memory: 1Gi
            requests:
              cpu: 2
              memory: 1Gi
      restartPolicy: Always

Please note how:

  • the ConfigMap is declared as a volume in 1
  • the volume is mounted at /usr/local/scala-cli-scripts in 2
  • the script is run from /usr/local/scala-cli-scripts/fs2-kafka-stream.sc in 3

If you inspect the deployment logs, you should see something like this:

Processing: key1
Processing: key2
Processing: key3

As a side note, to interact with a Kubernetes cluster, I strongly recommend using k9s

Once you are done, you can delete the deployment using kubectl delete -f my-deployment.yaml!

Other use cases

You could use these scala-cli Kubernetes manifests to experiment with anything you like on a test cluster, such as:

  • Query a Postgres DB and extract some data using Skunk.
  • Automate the insertion of any other type of test data.
  • Send HTTP requests.
  • Generate quick reports.

The limit is your imagination.

Useful Resources

Quick Example · FS2 Kafka

Run - Scala CLI

GitHub - derailed/k9s: 🐶 Kubernetes CLI To Manage Your Clusters In Style!

Ensure the success of your project

47 Degrees can work with you to help manage the risks of technology evolution, develop a team of top-tier engaged developers, improve productivity, lower maintenance cost, increase hardware utilization, and improve product quality; all while using the best technologies.