# Run a fs2-kafka stream with scala-cli and Scala 3

by Miguel García Puyol • October 28, 2021 • 6 minutes to read

Tags: scala, scala3, kubernetes, kafka, fs2, fs2-kafka, fp, functional programming, functional

In this blog post, I’m sharing a real use case that I’ve experienced recently in one of the projects I’m working on.
Specifically, I had to build a Kafka processing pipeline to mock a new service under development.
In this case, the pipeline does the following:
- Consume from an `input-topic`.
- Get the keys from that `input-topic`.
- Generate fixed data into an `output-topic`, using the key from the previous topic.

My team had developed a service that consumes this test data from the `output-topic` and joins it with `another-input-topic`. Our pipeline mocks a service that another party is still actively developing.
## Build the Kafka stream
Assuming you have a Kafka cluster accessible on `localhost:9092`, you can write an fs2 stream as a scala-cli script, based on the fs2-kafka quickstart example.

If you are not familiar with scala-cli or Ammonite scripts: you can add a dependency by declaring it at the start of your script:

```scala
import $ivy.`com.github.fd4s::fs2-kafka:2.2.0`
```
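As an aside, newer scala-cli versions also accept their own `using` directive for dependencies; the exact directive syntax has evolved across releases, so treat this as a sketch:

```scala
//> using dep com.github.fd4s::fs2-kafka:2.2.0
```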
Here is the complete example:
```scala
import $ivy.`com.github.fd4s::fs2-kafka:2.2.0`

import cats.effect._
import cats.effect.unsafe.implicits.global
import fs2.kafka._
import scala.concurrent.duration._

def log(a: Any) = IO(println(a))

// Extract the key from each consumed record
def processRecord(record: ConsumerRecord[Long, String]): IO[Long] =
  log(s"Processing: ${record.key}") *> IO.pure(record.key)

val consumerSettings =
  ConsumerSettings[IO, Long, String]
    .withAutoOffsetReset(AutoOffsetReset.Latest)
    .withBootstrapServers("localhost:9092")
    .withGroupId("test-metadata-producer")

val producerSettings =
  ProducerSettings[IO, Long, String]
    .withBootstrapServers("localhost:9092")

val stream =
  KafkaConsumer
    .stream(consumerSettings)
    .subscribeTo("events")
    .records
    .mapAsync(2) { committable =>
      processRecord(committable.record).map { key =>
        val value  = s"My custom message for the output topic $key"
        val record = ProducerRecord("updates", key, value)
        // Carry the consumer offset through as the passthrough value,
        // so it can be committed once the record has been produced
        ProducerRecords.one(record, committable.offset)
      }
    }
    .through(KafkaProducer.pipe(producerSettings))
    .map(_.passthrough)
    // Commit offsets in batches of 500, or at least every 15 seconds
    .through(commitBatchWithin(500, 15.seconds))

(log("Starting the processor") *> stream.compile.drain.as(ExitCode.Success)).unsafeRunSync()
```
Essentially, we've created an ETL (extract, transform, and load) pipeline: a script that starts an fs2 stream consuming the topic's records, extracting the keys in `processRecord`, building the new record for the `output-topic`, and piping it to a Kafka producer.

Save this as `fs2-kafka-stream.sc`. You can run it using `scala-cli run fs2-kafka-stream.sc`.
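To check that records actually land on the `updates` topic, a second throwaway script that just prints them can help. This is a minimal sketch reusing the same fs2-kafka API as above; the `verify-output` group id and file name are my own choices:

```scala
// verify-output.sc — print every record that arrives on the output topic
import $ivy.`com.github.fd4s::fs2-kafka:2.2.0`

import cats.effect._
import cats.effect.unsafe.implicits.global
import fs2.kafka._

val settings =
  ConsumerSettings[IO, Long, String]
    .withAutoOffsetReset(AutoOffsetReset.Earliest) // read the topic from the beginning
    .withBootstrapServers("localhost:9092")
    .withGroupId("verify-output")

KafkaConsumer
  .stream(settings)
  .subscribeTo("updates")
  .records
  .evalMap(c => IO(println(s"${c.record.key} -> ${c.record.value}")))
  .compile
  .drain
  .unsafeRunSync()
```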
## Kubernetes Deployment
Now you have to test this on a Kubernetes test cluster.
Please do not do this in production! In production, you should deploy using a CI/CD tool from version-controlled manifests.
You could do the following:

- Define a Kubernetes ConfigMap to host your script.
- Replace the references to `localhost:9092` with the Kafka bootstrap URL of the cluster. In our example, `kafka-bootstrap:9092`.
- Create a Deployment that uses the ConfigMap as a volume.
- Use the `scala-cli` Docker image to run the script.
- Deploy it to the cluster with `kubectl apply -f my-deployment.yaml`.
And that’s all!
Here are the complete Kubernetes YAML manifests, including the `ConfigMap` and the `Deployment`.
```yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: scala-cli-scripts
data:
  fs2-kafka-stream.sc: |
    import $ivy.`com.github.fd4s::fs2-kafka:2.2.0`

    import cats.effect._
    import cats.effect.unsafe.implicits.global
    import fs2.kafka._
    import scala.concurrent.duration._

    def log(a: Any) = IO(println(a))

    def processRecord(record: ConsumerRecord[Long, String]): IO[Long] =
      log(s"Processing: ${record.key}") *> IO.pure(record.key)

    val consumerSettings =
      ConsumerSettings[IO, Long, String]
        .withAutoOffsetReset(AutoOffsetReset.Latest)
        .withBootstrapServers("kafka-bootstrap:9092") // in-cluster bootstrap URL
        .withGroupId("test-metadata-producer")

    val producerSettings =
      ProducerSettings[IO, Long, String]
        .withBootstrapServers("kafka-bootstrap:9092")

    val stream =
      KafkaConsumer
        .stream(consumerSettings)
        .subscribeTo("events")
        .records
        .mapAsync(2) { committable =>
          processRecord(committable.record).map { key =>
            val value  = s"My custom message for the output topic $key"
            val record = ProducerRecord("updates", key, value)
            ProducerRecords.one(record, committable.offset)
          }
        }
        .through(KafkaProducer.pipe(producerSettings))
        .map(_.passthrough)
        .through(commitBatchWithin(500, 15.seconds))

    (log("Starting the processor") *> stream.compile.drain.as(ExitCode.Success)).unsafeRunSync()
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    service: scala-cli-fs2-kafka-stream
  name: scala-cli-fs2-kafka-stream
spec:
  replicas: 1
  selector:
    matchLabels:
      service: scala-cli-fs2-kafka-stream
  strategy: {}
  template:
    metadata:
      labels:
        service: scala-cli-fs2-kafka-stream
    spec:
      volumes:
        - name: scala-cli-scripts # (1)
          configMap:
            name: scala-cli-scripts
      containers:
        - image: virtuslab/scala-cli
          name: scala-cli
          volumeMounts:
            - name: scala-cli-scripts # (2)
              mountPath: /usr/local/scala-cli-scripts
          args:
            - run
            - /usr/local/scala-cli-scripts/fs2-kafka-stream.sc # (3)
          resources:
            limits:
              cpu: 2
              memory: 1Gi
            requests:
              cpu: 2
              memory: 1Gi
      restartPolicy: Always
```
Please note how:

- the `ConfigMap` is declared as a volume in `(1)`
- the volume is mounted at `/usr/local/scala-cli-scripts` in `(2)`
- the script is run from `/usr/local/scala-cli-scripts/fs2-kafka-stream.sc` in `(3)`
If you inspect the deployment logs (for example, with `kubectl logs deployment/scala-cli-fs2-kafka-stream -f`), you should see something like this:

```
Processing: key1
Processing: key2
Processing: key3
```
As a side note, to interact with a Kubernetes cluster, I strongly recommend using k9s.
Once you are done, you can delete the deployment using `kubectl delete -f my-deployment.yaml`.
## Other use cases
You could use these `scala-cli` Kubernetes manifests to experiment with anything you like on a test cluster, such as:

- Query a Postgres DB and extract some data using Skunk (see the sketch after this list).
- Automate the insertion of any other type of test data.
- Send HTTP requests.
- Generate quick reports.

The limit is your imagination.
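For instance, here is a minimal sketch of the first idea, querying Postgres with Skunk. It assumes a Postgres instance reachable on `localhost:5432` with default `postgres`/`postgres` credentials and uses Skunk 0.2.2; adjust the connection parameters for your cluster:

```scala
// query-postgres.sc — run a single query against Postgres with Skunk
import $ivy.`org.tpolecat::skunk-core:0.2.2`

import cats.effect._
import cats.effect.unsafe.implicits.global
import natchez.Trace.Implicits.noop // Skunk requires a Trace instance; a no-op one is fine here
import skunk._
import skunk.codec.all._
import skunk.implicits._

// Connection parameters are assumptions: point them at your own database
val session: Resource[IO, Session[IO]] =
  Session.single(
    host     = "localhost",
    port     = 5432,
    user     = "postgres",
    database = "postgres",
    password = Some("postgres")
  )

// Open a session, run one query, and close the session when done
val answer = session.use(_.unique(sql"SELECT 42".query(int4))).unsafeRunSync()
println(s"The answer is $answer")
```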
## Useful Resources

- [GitHub - derailed/k9s: 🐶 Kubernetes CLI To Manage Your Clusters In Style!](https://github.com/derailed/k9s)