Real-time Analysis with Google Cloud PubSub and Integration Tests with Docker

Recently, I’ve become interested in developing scalable applications, specifically in using technologies designed to handle high loads.

This article showcases a project that uses several technologies designed specifically for this purpose: Google Cloud Pub/Sub for asynchronous messaging, and Cassandra as a highly scalable and available database. As our data source, we will read a selection of messages from Twitter.

Before moving software to production, we, as software engineers, add integration tests to verify that the components interact correctly. To write these tests, we will use the docker-it-scala library, which allows us to run our tests against Docker instances running the same software versions we will use in production.

Inspiration

Scalability, robustness, and high availability are qualities we work to build into all of our projects at 47 Degrees on a daily basis.

For example, imagine a company with a fleet of trucks, each one equipped with sensors that continually send measurements to a central application. Each time a sensor sends a piece of information to the central application, we call that an event. Now imagine that we are sending a lot of events: the application must be prepared to handle massive amounts of information arriving, potentially, at a high rate.

I attended Scala eXchange 2017 in London, and one talk in particular caught my attention: a presentation on how to write integration tests in Scala with Docker instances. In it, Emanuele Blanco explained the reasoning behind this approach: by using the same software versions in our tests as in production, we can be confident that the program won’t run into compatibility or configuration issues when it is deployed. So instead of using a mock object to simulate a database connection, the tests can start a Docker instance on the fly, running a real PostgreSQL database.

I’m also very interested in Google Cloud technologies, and I like Google’s Real-Time Data Analysis with Kubernetes, Cloud Pub/Sub, and BigQuery solution, which features a Python example of how to stream data from Twitter and run analytical queries on the captured data, all deployed on Google Kubernetes Engine.

Implementation

While Google used Python, PostgreSQL, and BigQuery for their example, we’re going to switch things up and use Scala and Apache Cassandra for ours. However, we’ll continue to use Google Cloud Pub/Sub for queuing messages.

We’re calling this project Twitter Sentiment because it could be extended to run queries over the messages persisted in the database and analyze trends.

First steps

Clone the project repository

git clone https://github.com/47deg/twitter-sentiment.git

Creating a Google Cloud account

For testing, we used the GCP Free Trial. Make sure to read its conditions before accepting the trial. At the time of writing, the free period starts when the customer creates a billing account in the Google Cloud Console and ends when the customer’s usage fees exceed $300 or 12 months after the start date, whichever comes first.

Creating a Google Cloud Pub/Sub topic

Follow the instructions in Creating a Cloud Pub/Sub topic, as described in Real-Time Data Analysis with Kubernetes, Cloud Pub/Sub, and BigQuery. Name the topic new_tweets.

Modify the application.conf file by replacing cp100-178510 with your own project ID, the one created when you sign up for your Google Cloud account:

google {
  project = "cp100-178510"
  pubsub {
    topicname = "new_tweets"
  }
}
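The project ID and the topic name combine into a fully qualified topic name of the form projects/<project>/topics/<topic>, which is the path that appears in the Reader's logs later on. A minimal sketch, assuming hypothetical GoogleConfig and PubSubConfig case classes that mirror the google block above (the project itself decodes its configuration with Case Classy, which is not shown here):

```scala
// Hypothetical case classes mirroring the `google` block of application.conf.
final case class PubSubConfig(topicname: String)
final case class GoogleConfig(project: String, pubsub: PubSubConfig)

object TopicNames {
  // Builds the fully qualified Pub/Sub topic name:
  // projects/<project>/topics/<topic>
  def topicPath(config: GoogleConfig): String =
    s"projects/${config.project}/topics/${config.pubsub.topicname}"
}

val exampleConfig = GoogleConfig("cp100-178510", PubSubConfig("new_tweets"))
println(TopicNames.topicPath(exampleConfig)) // projects/cp100-178510/topics/new_tweets
```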

Create access tokens for Twitter

Go to apps.twitter.com and create a new app. Start by filling in the name, description, and required fields. Once the application is created, continue by creating Keys and Access Tokens on its corresponding tab.

You will see something like this:

[Figure: Twitter app configuration example]

Copy the values of the Consumer Key and Secret and the Access Token and Access Token Secret, and use them in the twitter section of application.conf.

Example:

twitter {
  consumer {
    key = ""
    key = ${?TW_CONSUMER_KEY}
    secret = ""
    secret = ${?TW_CONSUMER_SECRET}
  }
  access {
    key = ""
    key = ${?TW_ACCESS_KEY}
    secret = ""
    secret = ${?TW_ACCESS_SECRET}
  }

}

google {
  project = "cp100-178510"
  pubsub {
    topicname = "new_tweets"
  }
}

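Alternatively, you can leave the values empty and set the corresponding environment variables (TW_CONSUMER_KEY, TW_CONSUMER_SECRET, TW_ACCESS_KEY, and TW_ACCESS_SECRET) before running the program, which has the advantage of not having to write the keys to any file.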
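Each key is assigned twice because of HOCON's optional substitution: ${?VAR} replaces the earlier value only when the variable is defined (HOCON looks it up in the environment when no configuration path of that name exists), and is silently ignored otherwise. The sketch below reproduces that resolution rule in plain Scala; OptionalOverride is a hypothetical helper for illustration, not how the Typesafe Config library implements it:

```scala
object OptionalOverride {
  // Mimics `key = ""` followed by `key = ${?ENV_VAR}`:
  // the environment variable wins when it is set, otherwise the default stays.
  // `env` is passed in explicitly so the behaviour is easy to test.
  def resolve(default: String, envVar: String, env: Map[String, String]): String =
    env.getOrElse(envVar, default)
}

// In a real program the map comes from the process environment.
val consumerKey = OptionalOverride.resolve("", "TW_CONSUMER_KEY", sys.env)
```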

Installing the Google Cloud SDK

Follow the instructions for installing the Google Cloud SDK to run gcloud commands from the command line and set up the account credentials. Check the quickstart guide for instructions on how to do this on several operating systems, such as Debian, Ubuntu, Red Hat, CentOS, Windows, or macOS.

This will allow us to check manually whether the messages read from Twitter are being successfully pushed into the Pub/Sub topic. To do so, we can create a pull subscription to our topic called tweets-new_tweets, and then pull messages from it:

gcloud pubsub subscriptions create tweets-new_tweets --topic=new_tweets
gcloud pubsub subscriptions pull tweets-new_tweets

Setting up Apache Cassandra

Download Apache Cassandra and follow the instructions for installation.

Now, we can create the keyspace that the app will use:

cqlsh < twitter-sentiment/storer/src/main/resources/twittersentiment.cql

Running the modules

The project is composed of two modules, Reader and Storer, which can be deployed and run independently.

Reader takes care of reading a stream of messages from Twitter and submitting the messages to Pub/Sub.

Storer is responsible for reading from Pub/Sub and persisting the messages in Cassandra.

Splitting the functionality into two modules lets the Storer module read from Pub/Sub and write to the database at its own pace, which provides the scalability for high loads on the consuming side mentioned earlier.
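The effect of this decoupling can be illustrated without any Google Cloud dependency. In this sketch, a java.util.concurrent.LinkedBlockingQueue stands in for the Pub/Sub topic (an assumption made only to keep the example self-contained; Pub/Sub additionally stores messages durably and delivers them across processes). The producer thread plays the Reader and publishes immediately, while the consumer thread plays the Storer and drains messages with an artificial delay, so the producer never waits on the consumer's speed:

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ListBuffer

object DecouplingSketch {
  def run(messages: Seq[String], consumerDelayMs: Long): List[String] = {
    // In-memory stand-in for the topic: an unbounded buffer between both sides.
    val topic = new LinkedBlockingQueue[String]()
    // Hypothetical "database" written by the consumer.
    val stored = ListBuffer.empty[String]

    // Producer (the Reader's role): publishes as fast as messages arrive.
    val producer = new Thread(() => messages.foreach(m => topic.put(m)))

    // Consumer (the Storer's role): takes messages at its own, slower pace.
    val consumer = new Thread(() => {
      for (_ <- messages.indices) {
        val msg = topic.poll(5, TimeUnit.SECONDS)
        if (msg != null) {
          Thread.sleep(consumerDelayMs) // simulate a slow database write
          stored.synchronized { stored += msg }
        }
      }
    })

    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    stored.synchronized { stored.toList }
  }
}

val persisted = DecouplingSketch.run((1 to 5).map(i => s"tweet $i"), consumerDelayMs = 10)
println(persisted.size) // 5
```

With a single producer and a single consumer the queue is FIFO, so messages are persisted in the order they were published.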

In the Reader module, reading from Twitter is implemented with Daniela Sfregola’s Twitter4s client. First, we prepare a filter with a list of keywords to track in the stream:

val trackList = Seq("scala", "47deg", "scalacats", "scalax", "googlecloud", "#scalax", "pubsub",
  "lightbend", "typesafe", "gcp", "47 degrees", "microservices", "kafka", "cassandra",
  "scalaexchange", "tagless", "frees.io", "monads", "categorytheory")

val topicname = TopicName.of(googleconfig.project, googleconfig.pubsub.topicname)

val streamingClient = TwitterStreamingClient()
streamingClient.filterStatuses(stall_warnings = true, tracks = trackList)(publishTweetText)
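The filtering itself happens on Twitter's side. As a rough local illustration of how the keyword list is interpreted, the sketch below applies the rule described in Twitter's streaming API documentation, where a phrase such as "47 degrees" matches when all of its terms appear in the tweet, in any order and ignoring case. TrackFilter is a hypothetical object, and it tokenizes naively on whitespace, so punctuation, hashtags, and URLs are not handled the way Twitter handles them:

```scala
object TrackFilter {
  // A phrase matches when every space-separated term appears in the tweet,
  // regardless of order and case. Whitespace tokenization is a simplification.
  def matches(text: String, tracks: Seq[String]): Boolean = {
    val tokens = text.toLowerCase.split("\\s+").toSet
    tracks.exists { phrase =>
      phrase.toLowerCase.split("\\s+").forall(term => tokens.contains(term))
    }
  }
}

val sampleTracks = Seq("scala", "47deg", "cassandra", "47 degrees")
TrackFilter.matches("Learning Scala today", sampleTracks)           // true
TrackFilter.matches("Degrees of freedom at 47 knots", sampleTracks) // true: both terms of "47 degrees"
TrackFilter.matches("Nothing relevant here", sampleTracks)          // false
```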

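publishTweetText is a partial function that handles each tweet in the stream and publishes its text to Pub/Sub:

def publishTweetText: PartialFunction[StreamingMessage, Unit] = { case tweet: Tweet =>
  val pubsubmessage: PubsubMessage = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(tweet.text)).build()
  publisher.publish(pubsubmessage)
}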
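A partial function lets the handler react only to the kinds of streaming messages it cares about. The sketch below shows that pattern without Twitter4s or Pub/Sub: TweetMsg, StallWarningMsg, and the in-memory published buffer are hypothetical stand-ins, and the tweet text is encoded as UTF-8 bytes, as ByteString.copyFromUtf8 does for the real Pub/Sub payload:

```scala
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ListBuffer

object PartialFunctionSketch {
  // Hypothetical stand-ins for the messages a streaming client can deliver.
  sealed trait StreamMessage
  final case class TweetMsg(text: String) extends StreamMessage
  final case class StallWarningMsg(message: String) extends StreamMessage

  // Hypothetical in-memory "publisher" collecting UTF-8 payloads.
  val published: ListBuffer[Array[Byte]] = ListBuffer.empty

  // Only tweets are handled; other messages fall outside the function's domain.
  val publishTweetText: PartialFunction[StreamMessage, Unit] = {
    case TweetMsg(text) =>
      published += text.getBytes(StandardCharsets.UTF_8)
      ()
  }

  // Applies the handler when it is defined and ignores everything else.
  def dispatch(msg: StreamMessage): Unit =
    publishTweetText.applyOrElse(msg, (_: StreamMessage) => ())
}

PartialFunctionSketch.dispatch(PartialFunctionSketch.TweetMsg("¡Hola, Scala!"))
PartialFunctionSketch.dispatch(PartialFunctionSketch.StallWarningMsg("falling behind"))
println(PartialFunctionSketch.published.size) // 1
```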

In the Storer module, reading from Pub/Sub is done by creating a Subscriber:

val subscriptionName: SubscriptionName = SubscriptionName.of(googleconfig.project, googleconfig.pubsub.subscriptionname)
val response = Either.catchNonFatal(Subscriber.newBuilder(subscriptionName, receiver).build())

Then we start the subscriber, which begins receiving the messages published to the topic:

subscriber.startAsync().awaitRunning()

and send each message we receive to be persisted in the database:

val tweetmsg = TweetMessage(pubsubmsg.getMessageId(), msg_text)
persister.persist(tweetmsg)
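The receiver passed to Subscriber.newBuilder is where receiving and persisting meet. In the Pub/Sub Java client it implements MessageReceiver and must acknowledge each message through the AckReplyConsumer it is given; messages that are not acknowledged are redelivered. The sketch below shows one reasonable ordering (persist, then ack; nack on failure) using hypothetical stand-in types so that it runs without the Pub/Sub client; the project's own receiver may differ:

```scala
import scala.util.{Failure, Success, Try}

object ReceiverSketch {
  // Hypothetical stand-ins for PubsubMessage and AckReplyConsumer.
  final case class IncomingMessage(id: String, data: String)
  trait AckReply { def ack(): Unit; def nack(): Unit }

  final case class TweetMessage(messageId: String, data: String)

  // Persist first, then acknowledge; on failure, nack so the message is redelivered.
  def receive(msg: IncomingMessage, reply: AckReply)(persist: TweetMessage => Unit): Unit =
    Try(persist(TweetMessage(msg.id, msg.data))) match {
      case Success(_) => reply.ack()
      case Failure(_) => reply.nack()
    }
}

// Records which callback was invoked, for demonstration purposes.
final class RecordingReply extends ReceiverSketch.AckReply {
  var outcome: String = "pending"
  def ack(): Unit = outcome = "acked"
  def nack(): Unit = outcome = "nacked"
}

val okReply = new RecordingReply
ReceiverSketch.receive(ReceiverSketch.IncomingMessage("1", "hello"), okReply)(_ => ())

val failedReply = new RecordingReply
ReceiverSketch.receive(ReceiverSketch.IncomingMessage("2", "boom"), failedReply)(_ => throw new RuntimeException("db down"))
```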

Finally, to insert the message into Cassandra, we build the insert statement with Quill:

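import io.getquill.{CassandraAsyncContext, SnakeCase}

// TweetMessage and Repository are defined elsewhere in the project.
object CassandraRepository {

  def cassandraPersist(ctx: CassandraAsyncContext[SnakeCase]): Repository[TweetMessage] = new Repository[TweetMessage] {
    import ctx._
    override def persist(twmsg: TweetMessage): Unit = {

      // Roughly: INSERT INTO tweet_message (message_id, data) VALUES (?, ?)
      val insertStatement = quote {
        query[TweetMessage].insert(
          _.messageId -> lift(twmsg.messageId),
          _.data -> lift(twmsg.data)
        )
      }
      // ctx.run returns a Future; it is not awaited here.
      ctx.run(insertStatement)
    }
  }
}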
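Quill's SnakeCase naming strategy is what maps the TweetMessage case class and its messageId field to the tweet_message table and message_id column. A simplified re-implementation of that camelCase-to-snake_case rule, written here for illustration only and not taken from Quill's source:

```scala
object SnakeCaseSketch {
  // Inserts an underscore before every uppercase letter except the first,
  // then lowercases the result. Acronyms are not treated specially.
  def toSnakeCase(name: String): String =
    name.headOption match {
      case None => name
      case Some(first) =>
        val rest = name.tail.map(c => if (c.isUpper) s"_$c" else c.toString).mkString
        (first.toString + rest).toLowerCase
    }
}

SnakeCaseSketch.toSnakeCase("TweetMessage") // "tweet_message"
SnakeCaseSketch.toSnakeCase("messageId")    // "message_id"
```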

Running the reader module:

sbt "project reader" "run"

We will start seeing the tweets being read in the console logs:

Output example:

Sending to projects/cp100-178510/topics/new_tweets :Google preemptible VMs reduce #cloud costs -- with a catch https://t.co/hQctBuIuQv @googlecloud
Sending to projects/cp100-178510/topics/new_tweets :RT @apachekafka: Capturing database changes and creating derived views using @debezium and Kafka's Streams API https://t.co/PuqKNKcpow
Sending to projects/cp100-178510/topics/new_tweets :RT @amnestyfrance: L'histoire de Cassandra ou quand tweeter une plaisanterie devient un crime en Espagne.

Running the storer module:

sbt "project storer" "run"

Similarly, the messages being stored in the database are shown in the console logs.

Saving in the Database: Why Go With Microservices? https://t.co/cxLyR4hqLw via @DZone
Saving in the Database: Pietro Gandetto fa il punto sull' "Orphée et Euridice" in scena al Teatro alla Scala con la direzione di Michele... https://t.co/3Gt7rjbGi2
Saving in the Database: How To Migrate Mainframe Batch To #Cloud #Microservices With Blu Age And #Aws https://t.co/SlUIsRn5RW

We can also check the database to see that the messages are stored:

cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh> desc keyspaces;

system_auth    twittersentiment   system
system_distributed

cqlsh> use twittersentiment;
cqlsh:twittersentiment> desc tables;

tweet_message

cqlsh:twittersentiment> select * from tweet_message;

 message_id     | data
----------------+-----------------------------------------------------------------------------------------------------------------
 50994850521524 | 【https://t.co/vNAkNcZjxy】6 ways Apache Cassandra prepares you for a multi-cloud future https://t.co/Z1ejCuOJOK

Running the integration tests

One of the cool features we wanted to achieve here was to use the same version of the software used in production while running the integration tests.

The storer module implements tests in this fashion thanks to the docker-it-scala library.

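Have a look at storer.DockerCassandraService to see how we can pull a Docker image containing the desired Apache Cassandra version to run the tests against. The container's CQL port 9042 is published on host port 9043, which keeps it from clashing with a local Cassandra listening on the default port: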

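import com.whisk.docker.{DockerContainer, DockerKit, DockerReadyChecker}

trait DockerCassandraService extends DockerKit {
  // Same Cassandra version as production; CQL port 9042 is published on host port 9043.
  lazy val cassandraContainer: DockerContainer =
    DockerContainer(image = "cassandra:3.11.2")
      .withPorts(9042 -> Some(9043))
      .withReadyChecker(DockerReadyChecker.LogLineContains("Starting listening for CQL clients on"))

  abstract override def dockerContainers: List[DockerContainer] =
    cassandraContainer :: super.dockerContainers
}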
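Conceptually, a log-line ready checker keeps reading the container's output until the given text appears, and only then lets the tests start. The sketch below reproduces that idea with plain Scala and a simulated log source; ReadyCheckSketch and awaitLogLine are hypothetical names, and this is not docker-it-scala's implementation:

```scala
object ReadyCheckSketch {
  // Polls a log source until a line containing `marker` shows up, or gives up
  // after `maxAttempts` polls. `readLogs` returns all log lines seen so far.
  @annotation.tailrec
  def awaitLogLine(readLogs: () => Seq[String], marker: String,
                   maxAttempts: Int, intervalMs: Long): Boolean =
    if (readLogs().exists(_.contains(marker))) true
    else if (maxAttempts <= 1) false
    else {
      Thread.sleep(intervalMs)
      awaitLogLine(readLogs, marker, maxAttempts - 1, intervalMs)
    }
}

// Simulated container output: one more line becomes visible on each poll.
val simulatedLog = Vector(
  "simulated boot line 1",
  "simulated boot line 2",
  "Starting listening for CQL clients on <address>"
)
var polls = 0
val readSimulatedLog = () => { polls += 1; simulatedLog.take(polls) }

val ready = ReadyCheckSketch.awaitLogLine(
  readSimulatedLog, "Starting listening for CQL clients on", maxAttempts = 5, intervalMs = 10)
println(ready) // true
```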

The keyspace is created in this Cassandra instance with CassandraUnit, while Quill provides the context the tests use to query it:

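import com.datastax.driver.core.Cluster
import io.getquill.{CassandraAsyncContext, SnakeCase}
import org.cassandraunit.CQLDataLoader
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet

object InitializeTests {

  // Connects to the dockerized Cassandra through host port 9043.
  lazy val clusterWithoutSSL =
    Cluster.builder()
      .withPort(9043)
      .addContactPoint("localhost")
      .withCredentials("cassandra", "cassandra").build()

  lazy val ctx = new CassandraAsyncContext[SnakeCase](
    naming = SnakeCase,
    cluster = clusterWithoutSSL,
    keyspace = "twittersentiment",
    preparedStatementCacheSize = 100
  )

  // Loads the twittersentiment.cql script from the classpath into the twittersentiment keyspace.
  def initializeKeyspaces() = {
    val dataLoader = new CQLDataLoader(clusterWithoutSSL.connect())
    dataLoader.load(new ClassPathCQLDataSet("twittersentiment.cql", true, "twittersentiment"))
  }
}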

The tests implemented in CassandraPersisterIntegrationSpec.scala mix in DockerCassandraService and use the Cassandra context from InitializeTests, which lets us verify that our persist method really works against this dockerized Cassandra instance:

    "saving a tweet" should {
      "persist it into the database" in {
        cassandraPersist(ctx).persist(testTweet)

        val q = quote {
          query[TweetMessage].filter(_.messageId == lift(testTweet.messageId))
        }
        val returnedTweets = ctx.run(q)

        returnedTweets.futureValue.headOption should contain(testTweet)
      }
    }

Finally, to see all this in action, we can run the tests with:

sbt "project storer" "it:test"

Full summary of libraries and components

  • Twitter.
  • Google Cloud Pub/Sub.
  • Apache Cassandra.
  • Twitter4s: An asynchronous non-blocking Scala Twitter Client, implemented using Akka-Http and json4s.
  • Case Classy: Library to make it easy to decode untyped structured data in our configuration files into case class hierarchies.
  • Typelevel Cats: Lightweight, modular, and extensible library for functional programming.
  • Quill: Quoted Domain Specific Language to express queries in Scala. We use it to create queries for Cassandra.
  • Scalatest: Testing tool in the Scala ecosystem.
  • Docker Testkit: Set of utility classes that make integration testing with dockerized services in Scala easy. It can use Spotify’s docker-client library underneath.
  • CassandraUnit: Java utility test tool for Cassandra. We use it to create the keyspaces in our dockerized Cassandra.

Summary

The two modules are very small, yet they showcase how we can separate concerns into microservices so that each part of the application can scale independently.

Using Google Pub/Sub as a messaging system illustrates how to prepare our application for high levels of incoming traffic on the input stream. Pub/Sub retains the messages, allowing subscriber applications to read and consume them at their own pace.

We also illustrated how you could use Apache Cassandra, running in a Docker container, for integration tests.

Another approach worth mentioning is running integration tests with an sbt plugin for embedded Cassandra: https://github.com/47deg/sbt-embedded-cassandra

Have comments or questions? Join in on the conversation on Twitter @47deg, contribute to the code in the repository, or get in touch.
