How to save time and money by testing Spark locally

How to save time and money by testing Spark locally

Table of Contents

Why should you think once again about testing Spark?

Sometimes it’s good to look back at how things were supposed to be done, what went wrong along the way, and how to make amends. Data Engineers were tempted by the pressure of the moment to give up on testing all together. Here are the reasons:

  • Launching the Spark job on premise was very cheap.
  • There was no need for generating your own data; just take a percentage of production data.
  • And no need for unit testing, since the app was tested all at once by running it in the cluster, and the data could be validated running some basic SQL commands.

This way of doing it was not perfect, but works most of the time. Nevertheless, Data Engineers started to suffer from repetitive work, having to wait for the data to be loaded, or for the output to be validated. In many cases, these tasks ended up on the shoulders of the Data Engineers themselves.

Cloud is not free

During the last three years, what was once a problem for some has become a general issues for most of the industries that use data. The price of cloud computing has risen due to the combination of a growing demand and more expensive hardware:

  • Disruption of the supply chain: shortages of chips (GPU, CPU), back orders, broken stocks.
  • Covid-19: remote work was implemented by many companies by relying on cloud services.
  • Data-driven: companies that are redesigning their internal processes frequently use cloud services.

On top of those market-related reasons, most of the companies admit to wasting resources on unnecessary processes or suboptimal processing. They find themselves unable to control and reduce cloud costs. Related material on this point:

Treat your Data Engineers well

Several studies have shown that Data Engineers are pushing toward a perfect storm for burnout. The main reasons are:

  1. Unreasonable requests.
  2. Manual processes: operational execution.
  3. Finding and fixing bugs takes time away from designing systems.
  4. Shouldering blame: if something goes wrong, blame your Data Engineer!
  5. Overly restrictive governance.

Testing the Spark applications locally will help Data Engineering teams with points 2, 3, and 4. Thus, preventing the burnout. You can find more information by checking out 5 reasons data engineers are burnt out and 3 Ways To Prevent Data Engineer Burnout.

What to test on a Spark Application

Unit Test

The easier library to help with testing Spark Applications is the spark-testing-base library by Holden Karau. It provides a SparkSession for free, reducing boilerplate and JVM overhead. It contains assertion on the DataFrame / DataSet / RDD level, removing the need to trigger an action to validate the result.

Integration Test

Some of the first solutions were the spark-integration-tests library or the previously mentioned spark-testing-base that can be used to simulate integration. Still, most of the integration testing is done by launching your Spark Job on a cluster. That cluster used to be a sandbox or pre-production environment on premise. Nowadays, the cluster is a cloud instance: a sandbox, a notebook, or any other cloud environment.

Another way for testing the integration of our Spark applications relies on containers. Containerised integration tests for Spark jobs allows local integration testing. Basically:

  1. Bake the Spark job’s jar into a Spark image.
  2. Inject test data and job configuration files into the container by mounting a volume.
  3. Use compose to orchestrate the Spark job and its dependencies.
  4. Run compose.
  5. Run assertions on the content of the target storage after the job has run.

Performance

Spark tuning is a mastery of its own. Most of the work is handmade by the senior engineers trying to optimize DAGs, skew data, counting errors, etc. Have a look at Holden Karau’s Spark Autotuning presentation from Strata New York 2018.

Job Validation

Even when the Spark job seems to work just fine, it might cause some real problems in production. Holden Karau has done some work on validating Spark jobs. Here you can check out her Validating Big Data Jobs talk from Big Data Spain 2018, and the slides from SparkAISummit SF 2019.

Testing the small - Unit Test

Ted Malaska gave a talk about testing the small. For him, testing the small means:

The end goal is […] to increase velocity of development, increase stability and production quality.

Let’s come up with some data that requires transformations.

firstName,surName,department,manager,enrollmentDate
harry,johnson,Back End, ,05012021
sylvia,roth,Data Science, ,08012017
Greg,Thomas,Sales,Mr. Ceo,02282001
Pamela,Griffin,Management, ,12062010

Requirments

  • Firstname and surname should be capitalized.
  • Manager should be assigned to employees based on their department:
    • If the employee works in Back End, its manager must be Pierre Graz.
    • If the employee works in Data Science, its manager must be Luisa Garcia.
    • If the employee works for any other department, its manager must be Mr. CEO.
  • Enrollment date has to be transformed from ‘MMddyyyy’ to ISO 8601 format ‘yyyy-MM-dd’.

This would be the expected exit:

firstName,surName,department,manager,enrollmentDate
Harry,Johnson,Back End,Pierre Graz,2021-05-01
Sylvia,Roth,Data Science,Luisa Garcia,2017-08-01
Greg,Thomas,Sales,Mr. CEO,2001-02-28
Pamela,Griffin,Management,Mr. CEO,2010-12-06

At this point, let’s model the scenario by creating a basic case class for the employee.

case class Employee (
  firstName: String,
  surName: String,
  department: String,
  manager: String,
  enrollmentDate: String
)

One way of implementing the transformations would be to write each of them as a function of type DataFrame => DataFrame. Some columns are assumed to have certain values. Thus, not all the cases are covered. For example, the enrollmentDate has to be either a valid string or an empty string in the source data.

object Transformations {

  lazy val capitalize: String => Column = columnName => initcap(lower(col(columnName)))

  lazy val assignManager: DataFrame => DataFrame = df =>
    df.withColumn("manager",
      when(col("department") === lit("Back End"), lit("Pierre Graz"))
        .otherwise(
          when(col("department") === lit("Data Science"), lit("Luisa Garcia"))
            .otherwise(
              lit("Mr. CEO")
            )
        )
    )
  lazy val to_ISO8601_Date: DataFrame => DataFrame = df =>
    df.withColumn("enrollmentDate",
      when(col("enrollmentDate") === lit(""), lit("")
      ).otherwise(to_date(col("enrollmentDate"), "MMddyyyy").cast(StringType)))

   lazy val capitalizeNames: DataFrame => DataFrame = df =>
    df.withColumn("firstName", capitalize("firstName"))
      .withColumn("surName", capitalize("surName"))

}

The early stages of Spark testing

Spark supports direct testing with some workarounds. Those workarounds add some boilerplate to our code. The following code (or something similar) has to be added to all the testing classes.

class TestingSparkWithScalaTest extends FunSuite with BeforeAndAfterAll {

  var sparkSession: SparkSession = _

  override protected def beforeAll(): Unit =
    sparkSession = SparkSession.builder()
      .appName("testing spark")
      .master("local[1]")
      .getOrCreate()

  override protected def afterAll(): Unit = sparkSession.stop()

Then, the transformations are tested. But we have to define some dummy data first.

lazy val entryData: Seq[Employee] = Vector(
  Employee("Ryan", "Wilson", "Back End", "", "01312019"),
  Employee("lexy", "Smith", "Data Science", "", "02282018"),
  Employee("marlow", "perez", "Sales", "", "05132020"),
  Employee("Angela", "costa", "Management", "", "12062021")
)

lazy val expectedExit: Seq[Employee] = Vector(
  Employee("Ryan", "Wilson", "Back End", "Pierre Graz", "2019-01-31"),
  Employee("Lexy", "Smith", "Data Science", "Luisa Garcia", "2018-02-28"),
  Employee("Marlow", "Perez", "Sales", "Mr. CEO", "2020-05-13"),
  Employee("Angela", "Costa", "Management", "Mr. CEO", "2021-12-06")
)

As an example, let’s look at the assign manager test.

test("Testing Assign Manager") {
  val dataframe: DataFrame = sparkSession.createDataFrame(EmployeeSample.entryData)
  val updatedManager: DataFrame = Transformations.assignManager(dataframe)
  assert(updatedManager.where(col("manager") === lit("Mr. CEO")).count() === 2)
  assert(updatedManager.where(col("manager") === lit("Pierre Graz")).count() === 1)
  assert(updatedManager.where(col("manager") === lit("Luisa Garcia")).count() === 1)
}

Steps

  1. Get yourself a SparkSession for each testing class.
  2. Generate some dummy data: It can be a Sequence, a csv file on your laptop, or other things.
  3. Run the test and compare input and output (outside the DataFrame type) only for the scenarios that you have thought of.

Problems

  1. Having to instantiate a SparkSession for each class adds boilerplate and overhead to your JVM.
  2. Generating the dummy data is a manual task that is time-consuming and prone to errors.
  3. Tests are done only in the given input and not in all the possible inputs for the given type. Most times, an action has to be done on the DataFrame (collect, take, count . . .).

How to make testing easier and safer

Using existing libraries that are well-supported is usually helpful. While testing Spark, we use several libraries together. In this introduction, we will show how to change the way of thinking about testing. For now, we will use only the spark-testing-base library.

Chapter 1: Spark-testing-base and ScalaCheck

Comparing DataFrames

One of the best advantages of the library is the ability to compare two given DataFrames. In the following test, all the transformations (all the requirements) are applied to the entry dataframe and the result is compared with what is expected.

test("Testing Employee Logic") {
  val entryDataset: DataFrame = spark.createDataFrame(EmployeeSample.entryData)
  val transformedFDF: DataFrame = Employee.applyTransformations(entryDataset)
  val exitDataset: DataFrame = spark.createDataFrame(EmployeeSample.expectedExit)
  assertDataFrameDataEquals(exitDataset, transformedFDF)
}

A brief introduction to the ScalaCheck benefits

Spark-testing-base includes the ScalaCheck library under the hood. ScalaCheck is known for its property-based testing capabilities. Benefits:

  • It automatically generates data for you based on the given type or using a custom generator.
  • Runs 100 tests for each property.
  • Tells you which value made the test crash (if any).

For example:

test("Summing Integers") {
  val nPlus1GreaterThanN: Prop = Prop.forAll((i: Int) => (i + 1) > i)
  check(nPlus1GreaterThanN)
}
GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
  Falsified after 3 successful property evaluations.
  Occurred when passed generated values ( arg0 = 2147483647 )

That value is exactly the Int.MaxValue!

scala> Int.MaxValue
val res0: Int = 2147483647
scala> res0 + 1
val res1: Int = -2147483648

Having that in mind, the focus can be changed from comparing input and output to checking if the properties of our DataFrame => DataFrame functions hold.

Arbitrary DataFrames with custom fields

In order to use this tool to our advantage, custom generators must be created. For example:

  • Firstname and surname can share the same generator
    def anyNameGen: Gen[String] = for {
      length <- Gen.chooseNum(5,10)
      charList <- Gen.listOfN(length, Gen.alphaChar)
    } yield charList.mkString
    
  • Department
    def departmentGen: Gen[String] = Gen.oneOf(
      Vector(
    "Back End", 
    "Data Science", 
    "Sales", 
    "Management"
      )
    )
    

Testing function properties, not whole DataFrames

Often, the amount of columns that are needed to perform the transformation are just a few. The remaining columns of the DataFrame do not matter. DataFrames can get really huge; up to hundreds of columns. In those cases, testing the whole DataFrame in your local machine becomes really hard. Thus, instead of testing the entire DataFrame, it’s better to focus on the columns that are really involved in the function that we want to check.

Let’s recall the logic to assign a manager to the employee based on the department:

  • If the employee works in Back End, its manager must be Pierre Graz.
  • If the employee works in Data Science, its manager must be Luisa Garcia.
  • If the employee works for any other department, its manager must be Mr. CEO.

What are the properties of this function? One that is quite direct is that the number of employees who work under Back End has to be equal to the number of employees who have Pierre Graz as a manager. This holds the same for the count of Data Science and the count of any other departments that are not the prior.

test("Testing assigning manager") {
  val assigningManager: Prop = Prop.forAll(employeeDfGenerator) { entryDataFrame =>
    Transformations.assignManager(entryDataFrame).where(
      col("manager").isin("Pierre Graz", "Luisa Garcia")
    ).count() === entryDataFrame.where(
      col("department").isin("Back End", "Data Science")
    ).count()
  }
  check(assigningManager)
}

For the function that capitalizes both firstname and surname, the property is direct. Let’s check if they follow a regular expression.

test("Testing capitalized names") {
  val capitalizedNames: Prop = Prop.forAll(employeeDfGenerator) { entryDataFrame =>
    Transformations.capitalizeNames(entryDataFrame).where(
      col("firstName").rlike("[A-Z]{1}[a-z]*") && col("surName").rlike("[A-Z]{1}[a-z]*")
    ).count() == entryDataFrame.count()
  }
  check(capitalizedNames)
}

Using this approach, the entire pipeline of transformation could be checked on a given table all at once. For the specific case of Employee, this is one of the many ways of doing it:

test("Testing Employee Logic") {
  val employeeLogic: Prop = Prop.forAll(employeeDfGenerator) { entryDataFrame =>
    Employee.applyTransformations(entryDataFrame).where(
      col("manager").isin("Pierre Graz", "Luisa Garcia") && col("enrollmentDate").rlike("[0-9]{4}-[0-9]{2}-[0-9]{2}") && col("firstName").rlike("[A-Z]{1}[a-z]*") && col("surName").rlike("[A-Z]{1}[a-z]*")
    ).count() === entryDataFrame.where(
      col("department").isin("Back End", "Data Science")
    ).count()
  }
  check(employeeLogic)
}

Conclusion

For Data Engineers

Once you start testing your Spark apps, some benefits will emerge:

  • Do not worry that much about finding all the edge cases.
  • Stop spending time generating dummy data, gathering data from production, or lunching the app in some instance to check if it works.
  • Reduce the time spent on debugging and avoid executing the app on the cluster as much as possible.
  • Splitting gigantic singletons into something that better resembles the business domain will come naturally.
  • Fearless refactoring when needed.

For Managers

Giving your team sufficient time to test their code will prevent:

  • Unexpected rising cloud costs.
  • Data Engineer burnouts.

Stay tuned for Chapter 2!

Code available at 47 Degrees Open Source.

Ensure the success of your project

47 Degrees can work with you to help manage the risks of technology evolution, develop a team of top-tier engaged developers, improve productivity, lower maintenance cost, increase hardware utilization, and improve product quality; all while using the best technologies.