Spark On: Let's Code! (Part 2)

This is the second part of our blog series on Spark. If you missed the first post, read Spark On: Let’s Code! (Part 1) to get some background information. We’ll be using the same example and environment, but this time we’ll be adding some functionality to our Spark Streaming pipeline.

For our purposes now, we’re going to take advantage of our Spark Streaming computation to open a WebSocket that any client can use. Let’s review the details:

New elements in our Stack

As we pointed out, our new challenge is to feed a WebSocket with our filtered word tracks. Each time the Twitter Streaming API pushes out a new tweet with one of our designated track words in the status, we will send a push message to the open WebSocket. There are several ways to do this, some of which may be better suited to the task, but we’re just playing around and introducing some complexity to make our code a little more fun.

Therefore, these are the new technical elements in the project stack:

  • Apache Kafka - used as a publish-subscribe message broker.
  • Reactive Kafka - to consume the topic to which the Kafka producer sends the new tracks found in the Twitter Streaming flow.
  • Akka Streams - to create the HTTP WebSocket.

Additionally, the code developed in the previous blog post has been refactored into a new structure based on an sbt multi-module build. You can see the previous version in this specific feature branch.

The new implementation of the Spark Streaming Receiver, along with the new code structure, deserves special mention.

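// Imports assume Spark 1.x (actor-based receivers) and Twitter4J
import scala.reflect.ClassTag

import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper
import twitter4j._
import twitter4j.auth.Authorization

class TwitterReceiverActorStream[T: ClassTag](
  twitterAuth: Authorization,
  filters: Seq[String]
) extends Actor with ActorHelper {

  val twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)

  // Forward every incoming status to this actor; the other callbacks are no-ops
  val listener = new StatusListener() {

    def onStatus(status: Status) = self ! status
    def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) = {}
    def onTrackLimitationNotice(i: Int) = {}
    def onScrubGeo(l: Long, l1: Long) = {}
    def onStallWarning(stallWarning: StallWarning) = {}
    def onException(e: Exception) = e.printStackTrace()
  }

  // When the actor starts, sample the public stream or track the given filters
  override def preStart(): Unit = {
    twitterStream.addListener(listener)
    filters match {
      case Nil => twitterStream.sample()
      case _ =>
        val query = new FilterQuery
        query.track(filters.toArray)
        twitterStream.filter(query)
    }
  }

  // Hand every received message over to Spark Streaming
  def receive = {
    case data => store(data.asInstanceOf[T])
  }
}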

As you can see, we’re using a custom actor-based receiver, which greatly simplifies our code.

We won’t dive too deep into the sbt dependencies needed to make this code functional because all the details can be found in the GitHub repository.

Pushing New Tracks

Our system ingests the entire Twitter Streaming flow using a service called ingestTweets. We will take advantage of this by using the same DStream to send the new tracks found in the Twitter stream to Kafka.

What we’re doing here is pipelining the same input DStream to perform five operations:

  • Map the new tweets to the proper Cassandra metadata model.
  • Compute filtering by track in the Spark Streaming Window.
  • Write the new word tracks found to Kafka.
  • Store all the tweets in a Cassandra table, grouping them by day.
  • Store all the filtered tracks found in a Cassandra table, grouping them by track.

As you might have noticed, the new additions relate to the third operation: “Write the new word tracks found to Kafka”.

def writeToKafka(dStream: DStream[TweetsByTrack]) =
  dStream.map(_.track).foreachRDD { rdd =>
    rdd foreachPartition { partition =>
      // This block runs on the executors: read the settings exported
      // through spark.executorEnv.* (see below)
      val bs = sys.env.getOrElse("kafkaBootstrapServers", "")
      val ks = sys.env.getOrElse("kafkaProducerKeySerializer", "")
      val vs = sys.env.getOrElse("kafkaProducerValueSerializer", "")

      val kafkaProducerParams = new Properties()
      kafkaProducerParams.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bs)
      kafkaProducerParams.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ks)
      kafkaProducerParams.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, vs)

      // One producer per partition, created on the executor itself
      val producer = new KafkaProducer[String, String](kafkaProducerParams)

      try {
        partition foreach {
          case m: String =>
            val message = new ProducerRecord[String, String](
              kafkaTopicRaw, StaticValues.javaNull, m)
            producer.send(message)
          case _ => logger.warn("Unknown Partition Message!")
        }
      } finally {
        // close() waits for pending sends to complete and releases the connection
        producer.close()
      }
    }
  }

During execution, this code is shipped to the executors running on the Spark Worker nodes, where it will most likely be split into several tasks and run in JVMs other than the driver’s. It’s important to note that the configuration keys available in the application (on the Spark Driver side) won’t be visible on those executors unless you pass them through the Spark configuration object.

A Kafka producer needs some configuration to work properly:

  • bootstrap.servers: A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
  • key.serializer: Serializer class for key that implements the Serializer interface.
  • value.serializer: Serializer class for value that implements the Serializer interface.
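To make the three settings above concrete, here is a minimal sketch that builds the producer configuration with nothing but the standard library. The ProducerSettings object and its build method are names made up for this illustration, and the default serializer is an assumption (Kafka’s built-in String serializer); the property keys are the literal values behind the ProducerConfig constants used earlier.

```scala
import java.util.Properties

// Hypothetical helper, not part of the project's code base
object ProducerSettings {

  // Fully qualified name of Kafka's built-in String serializer (assumed default)
  val StringSerializer = "org.apache.kafka.common.serialization.StringSerializer"

  // Builds the minimal configuration a Kafka producer needs
  def build(
    bootstrapServers: String,
    keySerializer: String = StringSerializer,
    valueSerializer: String = StringSerializer
  ): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers) // host:port pairs
    props.setProperty("key.serializer", keySerializer)
    props.setProperty("value.serializer", valueSerializer)
    props
  }
}
```

For a local broker, ProducerSettings.build("localhost:9092") would return properties that could be handed to a KafkaProducer[String, String]; the address is only a placeholder.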

These three parameters are configurable in our application. So, when the SparkContext is initialized from the SparkConf object, we need to expose their values as environment variables in the executor JVMs; otherwise the executors can’t send messages to the Kafka topic.

Below is the code needed to pass the environment variables to the Spark executors:

val sparkConf = new SparkConf()
  ...
  .set("spark.executorEnv.kafkaBootstrapServers", bootstrapServers)
  .set("spark.executorEnv.kafkaProducerKeySerializer", kafkaProducerKeySerializer)
  .set("spark.executorEnv.kafkaProducerValueSerializer", kafkaProducerValueSerializer)

The convention is spark.executorEnv.[EnvironmentVariableName]: setting this key adds the environment variable specified by EnvironmentVariableName to the executor process.
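If the list of variables grows, the prefix can be applied in one place. The following is only a sketch of that idea: ExecutorEnv and toSparkSettings are hypothetical names, not something the project or Spark provides.

```scala
// Hypothetical helper that applies the spark.executorEnv.* naming convention
object ExecutorEnv {

  private val Prefix = "spark.executorEnv."

  // Turns (environment variable name -> value) pairs into SparkConf key/value pairs
  def toSparkSettings(env: Map[String, String]): Map[String, String] =
    env.map { case (name, value) => (Prefix + name) -> value }
}
```

The resulting pairs could then be applied to the configuration in a single call, for example with SparkConf’s setAll method.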

We can read these environment variables with this piece of code (it runs on the Spark executor side):

val bs = sys.env.getOrElse("kafkaBootstrapServers", "")
val ks = sys.env.getOrElse("kafkaProducerKeySerializer", "")
val vs = sys.env.getOrElse("kafkaProducerValueSerializer", "")
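Falling back to an empty string keeps the snippet short, but it postpones any failure until the producer is built with blank settings. As an alternative, here is a small sketch that reports a missing variable by name. ExecutorSettings and required are hypothetical names introduced for this example, and the env parameter exists only so the function can be tried without touching the real environment.

```scala
// Hypothetical helper for reading mandatory settings on the executor side
object ExecutorSettings {

  // Returns the trimmed value, or a Left naming the variable that is missing or blank
  def required(
    name: String,
    env: Map[String, String] = sys.env
  ): Either[String, String] =
    env.get(name)
      .map(_.trim)
      .filter(_.nonEmpty)
      .toRight(s"Missing environment variable: $name")
}
```

Calling ExecutorSettings.required("kafkaBootstrapServers") on an executor would yield either the configured value or a message that can be logged before giving up.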

And now our Kafka topic is being populated with the word tracks we are filtering for.

WebSocket

Let’s move on to tackling the previously populated Kafka topic. As we mentioned, we’re going to use Reactive Kafka to pass the messages on to the WebSocket.

First, let’s take a look at the HTTP API path and its associated code:

get {
  handleWebsocketMessages(handler = kafkaServiceFlow)
}
...
/* Kafka Service Flow Handler */
def kafkaServiceFlow: Flow[Message, Message, _] = {

  val kafka = new ReactiveKafka()
  val publisher: Publisher[StringKafkaMessage] =
    kafka.consume(
      ConsumerProperties(
        brokerList = bootstrapServers,
        zooKeeperHost = s"$zookeeperHost:$zookeeperPort",
        topic = kafkaTopicRaw,
        groupId = kafkaGroupId,
        decoder = new StringDecoder()
      )
    )

  Flow.wrap(Sink.ignore, Source(publisher) map toMessage)(Keep.none)
}

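We have just created an Akka Streams Flow that acts as the WebSocket handler. Its Source is the Kafka publisher, so every message consumed from the topic is pushed to the client, while anything the client sends is discarded by Sink.ignore because we don’t need it in this example. We won’t go into detail about Akka Streams as it’s out of scope, but the code looks simple enough, doesn’t it?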

Creating the Source becomes much easier with ReactiveKafka, as we only need to set up the Kafka consumer and connect it to the WebSocket.
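Since the WebSocket can be used by any client, here is a minimal sketch of one that relies only on the WebSocket client shipped with JDK 11 and later. Everything in it is an assumption made for illustration: the TrackSocketClient object, its helper classes, and the endpoint you pass in are not part of the project, and we assume the server sends each track as a text message.

```scala
import java.net.URI
import java.net.http.{HttpClient, WebSocket}
import java.util.concurrent.CompletionStage

// Hypothetical client, not part of the project's code base
object TrackSocketClient {

  // Joins the fragments of a text message and yields the full text on the last one
  final class FrameAssembler {
    private val buffer = new java.lang.StringBuilder

    def push(data: CharSequence, last: Boolean): Option[String] = {
      buffer.append(data)
      if (last) {
        val complete = buffer.toString
        buffer.setLength(0)
        Some(complete)
      } else None
    }
  }

  // Invokes onTrack once per complete text message received
  final class TrackListener(onTrack: String => Unit) extends WebSocket.Listener {
    private val assembler = new FrameAssembler

    override def onText(
      webSocket: WebSocket,
      data: CharSequence,
      last: Boolean
    ): CompletionStage[_] = {
      assembler.push(data, last).foreach(onTrack)
      webSocket.request(1) // ask for the next message
      null                 // null signals that the data has been fully processed
    }
  }

  // Opens the connection and blocks until the handshake completes
  def connect(endpoint: String)(onTrack: String => Unit): WebSocket =
    HttpClient.newHttpClient()
      .newWebSocketBuilder()
      .buildAsync(URI.create(endpoint), new TrackListener(onTrack))
      .join()
}
```

Calling TrackSocketClient.connect(endpoint)(println), with endpoint set to the ws:// address where your API is listening, would print each track as it arrives. The calling thread has to stay alive for messages to keep coming.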

Conclusion

Generally speaking, we’ve added some complexity to our example which, in my opinion, together with all the environment and tech considerations, makes Spark more interesting.

In our next post in the Spark series, we will explain how to deploy this environment as a production-ready cluster. All of these components will be dockerized and assembled in a single docker-compose descriptor. Stay tuned!

You can check out the entire code here.
