Real-time Analysis with Google Cloud PubSub and Integration Tests with Docker
by Juan Méndez Rey
April 03, 2018
Tags: google cloud pubsub, apache cassandra, stream processing, integration tests, docker

Recently, I’ve become interested in developing scalable applications, specifically in leveraging technologies that help balance high loads.
This article showcases a project that uses several technologies designed specifically for this purpose: Google Cloud Pub/Sub for asynchronous messaging, and Cassandra as a highly scalable and available database. As our data source, we will read a selection of messages from the well-known social media service Twitter.
Before moving software to production, we, as software engineers, write integration tests to verify that the components interact correctly. To write these tests, we will use the docker-it-scala library, which allows us to run our tests against Docker instances running the same software versions we will use in production.
Inspiration
Scalability, robustness, and high availability are qualities we work to build into our projects at 47 Degrees every day.
For example, imagine a company with a fleet of trucks, each equipped with sensors that continually send measurements to a central application. Each time a sensor sends a piece of information to the central application, we call that an event. Now imagine that we are sending a lot of events: the application must be prepared to handle massive amounts of incoming information, potentially at a very high rate.
I attended Scala eXchange 2017 in London, and one talk in particular drew my attention: a presentation on how to write integration tests in Scala with Docker instances. In this talk, Emanuele Blanco explained the reasoning behind the approach: by using the same software versions in our tests as in production, we can be confident that the program won’t run into compatibility or configuration issues once deployed. So instead of using a mock object to simulate a database connection, the tests can create a Docker instance on the fly, running a real PostgreSQL database.
I’m also very interested in Google Cloud technologies, and I like the Real-Time Data Analysis with Kubernetes, Cloud Pub/Sub, and BigQuery solution, which features a Python example of how to get a stream of data from Twitter and run analytical queries on the captured data, all deployed on Google Kubernetes Engine.
Implementation
While Google used Python, PostgreSQL, and BigQuery for their example, we’re going to switch things up and use Scala and Apache Cassandra for ours. However, we’ll continue to use Google Cloud Pub/Sub for queuing messages.
We’re calling this project Twitter Sentiment because it can be extended to run queries over the messages persisted in the database and analyze trends.
First steps
Clone the project repository
git clone https://github.com/47deg/twitter-sentiment.git
Creating a Google Cloud account
For testing, we used the GCP Free Trial. Make sure to read its conditions before accepting the trial. At the time of writing, the free trial starts when you create a billing account in the Google Cloud Console and ends when your usage fees exceed $300 or 12 months after the start date, whichever comes first.
Creating a Google Cloud Pub/Sub topic
Follow the instructions for creating a Cloud Pub/Sub topic described in Real-Time Data Analysis with Kubernetes, Cloud Pub/Sub, and BigQuery, and name the topic new_tweets.
Modify the application.conf file by replacing cp100-178510 with the ID of your own project, which is created when you sign up for your Google Cloud account.
google {
  project = "cp100-178510"
  pubsub {
    topicname = "new_tweets"
  }
}
Create access tokens for Twitter
Go to apps.twitter.com and create a new app, filling in the name, description, and other required fields. Once the application is created, open its Keys and Access Tokens tab and create the access tokens.
You will see something like this:
Copy the values of the Consumer Key and Consumer Secret, and of the Access Token and Access Token Secret, and use them in the twitter section of application.conf.
Example:
twitter {
  consumer {
    key = ""
    key = ${?TW_CONSUMER_KEY}
    secret = ""
    secret = ${?TW_CONSUMER_SECRET}
  }
  access {
    key = ""
    key = ${?TW_ACCESS_KEY}
    secret = ""
    secret = ${?TW_ACCESS_SECRET}
  }
}
google {
  project = "cp100-178510"
  pubsub {
    topicname = "new_tweets"
  }
}
Alternatively, you can set the corresponding environment variables (TW_CONSUMER_KEY, TW_CONSUMER_SECRET, TW_ACCESS_KEY, and TW_ACCESS_SECRET) before running the program, which has the advantage of keeping the keys out of any file.
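The paired assignments above rely on HOCON’s optional substitution: key = ${?TW_CONSUMER_KEY} overrides the empty default only when that environment variable is defined, and is ignored otherwise. The plain-Scala sketch below mimics that rule so it is easy to see and test; resolve, TwitterCredentials, and credentialsFrom are hypothetical names, and the project itself reads application.conf through its configuration library rather than through code like this.

```scala
// Hypothetical model of HOCON's optional substitution:
//   key = ""
//   key = ${?TW_CONSUMER_KEY}
// The later assignment wins only if the variable is defined.
def resolve(default: String, envVar: String, env: Map[String, String]): String =
  env.getOrElse(envVar, default)

final case class TwitterCredentials(
    consumerKey: String,
    consumerSecret: String,
    accessKey: String,
    accessSecret: String
)

// Builds the credentials from an environment map; pass sys.env in real use.
def credentialsFrom(env: Map[String, String]): TwitterCredentials =
  TwitterCredentials(
    consumerKey = resolve("", "TW_CONSUMER_KEY", env),
    consumerSecret = resolve("", "TW_CONSUMER_SECRET", env),
    accessKey = resolve("", "TW_ACCESS_KEY", env),
    accessSecret = resolve("", "TW_ACCESS_SECRET", env)
  )
```

In a real run you would call credentialsFrom(sys.env); any variable left unset simply keeps its empty default.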
Installing the Google Cloud SDK
Follow the instructions for installing the Google Cloud SDK, so you can run gcloud commands from the command line, and set up your account credentials. The quickstart guides explain how to do this on several operating systems, such as Debian, Ubuntu, Red Hat, CentOS, Windows, and macOS.
The SDK lets us manually verify that the messages read from Twitter are being pushed into the Pub/Sub topic. To do so, create a pull subscription to the topic called tweets-new_tweets and pull messages from it, e.g.:
gcloud pubsub subscriptions create tweets-new_tweets --topic=new_tweets
gcloud pubsub subscriptions pull tweets-new_tweets
Setting up Apache Cassandra
Download Apache Cassandra and follow the instructions for installation.
Now, we can create the keyspace that the app will use:
cqlsh < twitter-sentiment/storer/src/main/resources/twittersentiment.cql
Running the modules
The project is composed of two modules, Reader and Storer, which can be deployed and run independently:
- Reader takes care of reading a stream of messages from Twitter and submitting them to Pub/Sub.
- Storer is responsible for reading from Pub/Sub and persisting the messages in Cassandra.
Decoupling the functionality into two modules lets the Storer read from Pub/Sub and write to the database at its own pace, which gives the consuming side the scalability under high load described earlier.
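To make the decoupling concrete, here is a small in-process analogy, assuming a LinkedBlockingQueue as a stand-in for the Pub/Sub topic: one thread plays the Reader and publishes without waiting, while another plays the Storer and drains the buffer at its own, slower pace. Unlike Pub/Sub, this queue lives in memory, preserves order, and offers no durability; runPipeline and the end-of-stream marker are hypothetical and exist only to stop the demo.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

// Hypothetical end-of-stream marker used only to stop this demo.
val EndOfStream = "__EOF__"

def runPipeline(tweets: Seq[String]): List[String] = {
  // In-memory stand-in for the Pub/Sub topic: an unbounded, thread-safe FIFO buffer.
  val topic = new LinkedBlockingQueue[String]()
  val stored = ListBuffer.empty[String]

  // "Reader": publishes every message immediately, never waiting for the consumer.
  val reader = new Thread(() => {
    tweets.foreach(t => topic.put(t))
    topic.put(EndOfStream)
  })

  // "Storer": takes messages from the buffer at its own, slower pace.
  val storer = new Thread(() => {
    var msg = topic.take()
    while (msg != EndOfStream) {
      Thread.sleep(1) // simulate a slow database write
      stored += msg
      msg = topic.take()
    }
  })

  reader.start(); storer.start()
  reader.join(); storer.join() // join makes the storer's writes visible here
  stored.toList
}
```

The Reader finishes publishing long before the Storer catches up, yet nothing is lost: the buffer absorbs the difference in speed, which is the role Pub/Sub plays between the two modules.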
In the Reader module, reading from Twitter is implemented with Daniela Sfregola’s Twitter4s client. First, we prepare a filter with a list of keywords to track in the stream:
// Keywords and phrases to track in the Twitter stream.
val trackList = Seq("scala", "47deg", "scalacats", "scalax", "googlecloud", "#scalax", "pubsub",
  "lightbend", "typesafe", "gcp", "47 degrees", "microservices", "kafka", "cassandra",
  "scalaexchange", "tagless", "frees.io", "monads", "categorytheory")
// Fully qualified Pub/Sub topic name, built from application.conf.
val topicname = TopicName.of(googleconfig.project, googleconfig.pubsub.topicname)
val streamingClient = TwitterStreamingClient()
// Every streaming message matching the filter is passed to publishTweetText.
streamingClient.filterStatuses(stall_warnings = true, tracks = trackList)(publishTweetText)
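The tracks parameter is evaluated by Twitter’s servers, not by the client. As a rough mental model only, the sketch below treats each entry as a phrase that matches when all of its space-separated terms occur in the text, ignoring case; matchesTrack is a hypothetical helper, and its substring checks are looser than Twitter’s actual tokenization rules.

```scala
// Simplified, client-side approximation of the "track" filter; the real matching
// happens on Twitter's servers and follows more detailed tokenization rules.
// A phrase matches when all of its space-separated terms occur in the text, ignoring case.
def matchesTrack(text: String, trackList: Seq[String]): Boolean = {
  val lower = text.toLowerCase
  trackList.exists { phrase =>
    phrase.toLowerCase.split("\\s+").filter(_.nonEmpty).forall(term => lower.contains(term))
  }
}

val sampleTrackList = Seq("scala", "47 degrees", "pubsub", "cassandra")
```

Short, generic keywords such as scala or cassandra also match unrelated tweets, as the sample output later in this post shows.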
publishTweetText is a partial function that sends each tweet’s text to Pub/Sub. At its core, it wraps the text in a PubsubMessage and publishes it:
val pubsubmessage: PubsubMessage = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(tweet.text)).build()
publisher.publish(pubsubmessage)
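The streaming callback is a PartialFunction, so it only needs cases for the messages it cares about. The sketch below shows that idea with a hypothetical StreamEvent hierarchy (not twitter4s’s own types) and an in-memory buffer in place of the Pub/Sub publisher; dispatch uses applyOrElse so that events outside the function’s domain are skipped instead of throwing a MatchError.

```scala
// Hypothetical, simplified message hierarchy; twitter4s has its own streaming types.
sealed trait StreamEvent
final case class TweetEvent(text: String) extends StreamEvent
final case class StallWarning(message: String) extends StreamEvent

// Collects "published" payloads in memory instead of calling a real Pub/Sub publisher.
val published = scala.collection.mutable.ListBuffer.empty[String]

// Only tweets are handled; other events fall outside the function's domain.
val publishTweetText: PartialFunction[StreamEvent, Unit] = {
  case TweetEvent(text) =>
    published += text
    ()
}

// Applies the handler only when it is defined for the event; otherwise does nothing.
def dispatch(event: StreamEvent): Unit =
  publishTweetText.applyOrElse(event, (_: StreamEvent) => ())
```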
In the Storer module, reading from Pub/Sub is done by creating a Subscriber:
val subscriptionName: SubscriptionName = SubscriptionName.of(googleconfig.project, googleconfig.pubsub.subscriptionname)
val response = Either.catchNonFatal(Subscriber.newBuilder(subscriptionName, receiver).build())
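Either.catchNonFatal from Cats turns a builder that may throw into an Either value. The standard library offers an equivalent: Try only catches non-fatal exceptions, so Try(...).toEither yields Left(error) or Right(value) in the same way. In this sketch, FakeSubscriber and buildSubscriber are hypothetical stand-ins, not the Pub/Sub client.

```scala
import scala.util.Try

// Hypothetical stand-in for the object returned by Subscriber.newBuilder(...).build().
final case class FakeSubscriber(subscription: String)

// Hypothetical builder that throws on invalid input, like many Java client builders.
def buildSubscriber(subscription: String): FakeSubscriber =
  if (subscription.isEmpty) throw new IllegalArgumentException("empty subscription name")
  else FakeSubscriber(subscription)

// Try only catches non-fatal exceptions, so Try(...).toEither behaves like
// Cats' Either.catchNonFatal: Left(throwable) on failure, Right(value) on success.
def safeBuild(subscription: String): Either[Throwable, FakeSubscriber] =
  Try(buildSubscriber(subscription)).toEither
```

The caller can then pattern match on Left and Right to log the failure or go on to start the subscriber.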
We then start the subscriber, which begins pulling messages from the subscription:
subscriber.startAsync().awaitRunning()
and, for each message received, we build a TweetMessage and pass it to the persister to be stored in the database:
val tweetmsg = TweetMessage(pubsubmsg.getMessageId(), msg_text)
persister.persist(tweetmsg)
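Handing each message to a persister, rather than calling Cassandra directly, keeps the Storer easy to test. A minimal sketch, assuming a simplified Repository trait shaped like the Repository[TweetMessage] returned by CassandraRepository, plus a hypothetical in-memory implementation. Rows are keyed by messageId, so if Pub/Sub redelivers a message (its delivery is at-least-once), the second write overwrites the first instead of adding a duplicate.

```scala
import scala.collection.mutable

// Same shape as the post's TweetMessage(messageId, data).
final case class TweetMessage(messageId: String, data: String)

// Minimal version of the Repository abstraction used by the Storer.
trait Repository[A] {
  def persist(a: A): Unit
}

// Hypothetical in-memory repository, handy for unit tests that don't need Cassandra.
final class InMemoryRepository extends Repository[TweetMessage] {
  val rows: mutable.LinkedHashMap[String, TweetMessage] = mutable.LinkedHashMap.empty
  // Keyed by messageId, so persisting the same message twice keeps a single row.
  override def persist(t: TweetMessage): Unit = rows.update(t.messageId, t)
}

// Hypothetical handler: turns a received (messageId, text) pair into a TweetMessage.
def handle(persister: Repository[TweetMessage])(messageId: String, text: String): Unit =
  persister.persist(TweetMessage(messageId, text))
```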
Finally, to insert the message in Cassandra, we prepare the insert statement with Quill:
object CassandraRepository {
  def cassandraPersist(ctx: CassandraAsyncContext[SnakeCase]): Repository[TweetMessage] = new Repository[TweetMessage] {
    import ctx._
    override def persist(twmsg: TweetMessage): Unit = {
      val insertStatement = quote {
        query[TweetMessage].insert(
          _.messageId -> lift(twmsg.messageId),
          _.data -> lift(twmsg.data)
        )
      }
      ctx.run(insertStatement)
    }
  }
}
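The SnakeCase naming strategy is why query[TweetMessage] targets the tweet_message table and its message_id column, as the cqlsh session later in this post confirms. The function below is a simplified illustration of that mapping, not Quill’s implementation: it inserts an underscore before each upper-case letter that is not the first character and lower-cases everything, so acronyms such as HTTPServer would not be handled gracefully.

```scala
// Simplified illustration of a snake_case naming strategy (not Quill's source code):
// insert "_" before each upper-case letter that follows another character, then lower-case.
def toSnakeCase(name: String): String =
  name.zipWithIndex.map {
    case (c, i) if c.isUpper && i > 0 => s"_${c.toLower}"
    case (c, _)                       => c.toLower.toString
  }.mkString
```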
Running the reader module:
sbt "project reader" "run"
The console logs will start showing the tweets being read:
Output example:
Sending to projects/cp100-178510/topics/new_tweets :Google preemptible VMs reduce #cloud costs -- with a catch https://t.co/hQctBuIuQv @googlecloud
Sending to projects/cp100-178510/topics/new_tweets :RT @apachekafka: Capturing database changes and creating derived views using @debezium and Kafka's Streams API https://t.co/PuqKNKcpow
Sending to projects/cp100-178510/topics/new_tweets :RT @amnestyfrance: L'histoire de Cassandra ou quand tweeter une plaisanterie devient un crime en Espagne.
Running the storer module:
sbt "project storer" "run"
Similarly, the messages being stored in the database are shown in the console logs.
Saving in the Database: Why Go With Microservices? https://t.co/cxLyR4hqLw via @DZone
Saving in the Database: Pietro Gandetto fa il punto sull' "Orphée et Euridice" in scena al Teatro alla Scala con la direzione di Michele... https://t.co/3Gt7rjbGi2
Saving in the Database: How To Migrate Mainframe Batch To #Cloud #Microservices With Blu Age And #Aws https://t.co/SlUIsRn5RW
We can also check the database to see that the messages are stored:
cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh> desc keyspaces;
system_auth twittersentiment system
system_distributed
cqlsh> use twittersentiment;
cqlsh:twittersentiment> desc tables;
tweet_message
cqlsh:twittersentiment> select * from tweet_message;
message_id | data
----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
50994850521524 | 【https://t.co/vNAkNcZjxy】6 ways Apache Cassandra prepares you for a multi-cloud future https://t.co/Z1ejCuOJOK
Running the integration tests
One of the cool features we wanted to achieve here was to use the same version of the software used in production while running the integration tests.
The storer module implements its tests this way thanks to the docker-it-scala library.
Have a look at storer.DockerCassandraService to see how we can pull a Docker image containing the desired Apache Cassandra version to run the tests against:
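import com.whisk.docker.{DockerContainer, DockerKit, DockerReadyChecker}

trait DockerCassandraService extends DockerKit {
  // Container port 9042 (CQL) is published on host port 9043.
  lazy val cassandraContainer: DockerContainer =
    DockerContainer(image = "cassandra:3.11.2")
      .withPorts(9042 -> Some(9043))
      // Treat the container as ready once Cassandra starts accepting CQL clients.
      .withReadyChecker(DockerReadyChecker.LogLineContains("Starting listening for CQL clients on"))
  abstract override def dockerContainers: List[DockerContainer] =
    cassandraContainer :: super.dockerContainers
}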
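The ready checker matters because the container starts well before Cassandra is ready to accept CQL connections, so the tests wait for a known log line before connecting. The sketch below models that polling loop in plain Scala, assuming a readLogLines function that returns the container’s output so far; waitUntilReady and the fake log lines in the test are hypothetical, and docker-it-scala’s own checker is implemented differently.

```scala
import scala.concurrent.duration._

// Hypothetical log-based readiness check: poll the logs until a line contains
// the marker, or give up once the timeout has elapsed.
def waitUntilReady(
    readLogLines: () => Seq[String],
    marker: String,
    timeout: FiniteDuration,
    pollEvery: FiniteDuration
): Boolean = {
  val deadline = timeout.fromNow
  var ready = readLogLines().exists(_.contains(marker))
  while (!ready && deadline.hasTimeLeft()) {
    Thread.sleep(pollEvery.toMillis)
    ready = readLogLines().exists(_.contains(marker))
  }
  ready
}
```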
The tests connect to this instance through host port 9043, create the keyspace with CassandraUnit, and run their queries through a Quill context:
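import com.datastax.driver.core.Cluster
import io.getquill.{CassandraAsyncContext, SnakeCase}
import org.cassandraunit.CQLDataLoader
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet

object InitializeTests {
  // Connects to the dockerized Cassandra through host port 9043.
  lazy val clusterWithoutSSL =
    Cluster.builder()
      .withPort(9043)
      .addContactPoint("localhost")
      .withCredentials("cassandra", "cassandra")
      .build()

  // Quill context shared by the persister under test and the assertions.
  lazy val ctx = new CassandraAsyncContext[SnakeCase](
    naming = SnakeCase,
    cluster = clusterWithoutSSL,
    keyspace = "twittersentiment",
    preparedStatementCacheSize = 100
  )

  // Loads twittersentiment.cql from the classpath; the true flag asks CassandraUnit
  // to create the twittersentiment keyspace.
  def initializeKeyspaces(): Unit = {
    val dataLoader = new CQLDataLoader(clusterWithoutSSL.connect())
    dataLoader.load(new ClassPathCQLDataSet("twittersentiment.cql", true, "twittersentiment"))
  }
}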
The tests in CassandraPersisterIntegrationSpec.scala mix in the DockerCassandraService trait and use the Cassandra context from InitializeTests, which lets us verify that our persist method really works against the dockerized Cassandra instance.
"saving a tweet" should {
  "persist it into the database" in {
    cassandraPersist(ctx).persist(testTweet)
    val q = quote {
      query[TweetMessage].filter(_.messageId == lift(testTweet.messageId))
    }
    val returnedTweets = ctx.run(q)
    returnedTweets.futureValue.headOption should contain(testTweet)
  }
}
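CassandraAsyncContext runs statements asynchronously and returns Futures, while the persist method shown earlier returns Unit, so in principle the test’s read could start before the insert has finished. Here is a sketch of the same round trip with the read explicitly sequenced after the write, assuming a hypothetical AsyncTweetStore backed by a ConcurrentHashMap in place of Cassandra; Await.result plays the role of ScalaTest’s futureValue.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

final case class TweetMessage(messageId: String, data: String)

// Hypothetical async store standing in for the Cassandra context; every call returns a Future.
final class AsyncTweetStore(implicit ec: ExecutionContext) {
  private val table = new ConcurrentHashMap[String, TweetMessage]()
  def persist(t: TweetMessage): Future[Unit] = Future { table.put(t.messageId, t); () }
  def findById(id: String): Future[List[TweetMessage]] =
    Future(Option(table.get(id)).toList)
}

// Write, then read back by id only after the write has completed, and wait for the result.
def roundTrip(store: AsyncTweetStore, tweet: TweetMessage)(implicit ec: ExecutionContext): Option[TweetMessage] = {
  val result = for {
    _    <- store.persist(tweet)
    rows <- store.findById(tweet.messageId)
  } yield rows.headOption
  Await.result(result, 5.seconds)
}
```

Chaining with flatMap (the for comprehension) is what guarantees the ordering; a persist that returned the Future instead of Unit would let the real test do the same.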
Finally, to see all this in action, we can run the tests with:
sbt "project storer" "it:test"
Full summary of libraries and components
- Twitter.
- Google Cloud Pub/Sub.
- Apache Cassandra.
- Twitter4s: An asynchronous non-blocking Scala Twitter Client, implemented using Akka-Http and json4s.
- Case Classy: Library to make it easy to decode untyped structured data in our configuration files into case class hierarchies.
- Typelevel Cats: Lightweight, modular, and extensible library for functional programming.
- Quill: Quoted Domain Specific Language to express queries in Scala. We use it to create queries for Cassandra.
- ScalaTest: Testing tool in the Scala ecosystem.
- Docker Testkit: Set of utility classes to make integration testing with dockerized services in Scala easy. It can use Spotify’s docker-client library underneath.
- CassandraUnit: Java utility test tool for Cassandra. We use it to create the keyspaces in our dockerized Cassandra.
Summary
The two modules are very small, yet they show how we can separate concerns into microservices so that each part of the application can scale independently.
Using Google Pub/Sub as the messaging system is a good example of preparing an application to deal with high levels of incoming traffic on its input stream: Pub/Sub retains the messages, allowing subscriber applications to consume them at their own pace.
We also illustrated how you could use Apache Cassandra, running in a Docker container, for integration tests.
Another approach worth mentioning is to run integration tests with an sbt plugin for embedded Cassandra: https://github.com/47deg/sbt-embedded-cassandra
Have comments or questions? Join in on the conversation on Twitter @47deg, contribute to the code in the repository, or get in touch.