Higher-kinded data in Scala

The other day I came across a nice use case for a concept known as “higher-kinded data” that I thought was worth sharing. Higher-kinded data is a reasonably well-known concept in the Haskell world, but I haven’t seen many people talking about it in Scala. I’ll use a real-world example to explain the concept and show how it can be useful.

Batch job configuration

At my client, we have a batch job implemented in Scala and Spark. It takes a couple of command line arguments, but most of the configuration is done using a file that the job downloads from S3 at startup.

The config file is parsed and decoded to a case class instance using PureConfig. The corresponding case class looks something like this:

final case class JobConfig(
  inputs: InputConfig,
  outputs: OutputConfig,
  filters: List[Filter],
  timeRange: TimeRange,
  tags: List[String]
)

And the meat of the job looks like this:

def doStuff(config: JobConfig): Unit

The part of the configuration we are most interested in is the TimeRange, which specifies the time period of the data the job should process:

final case class TimeRange(
  min: LocalDateTime,
  max: LocalDateTime
)

There is a new requirement to change the way this time period is passed to the job. In some cases, we want to continue specifying it in the config file, but we also want the ability to pass the min and max values via CLI arguments. For example, when using a job scheduler such as Airflow to run a job every day against the previous day’s data, the scheduler needs a way to pass the appropriate datetime values to the job.

The required behavior is as follows:

  1. You can optionally pass --timeRangeMin and --timeRangeMax CLI arguments to the job. These arguments come as a set: if you pass one, you must also pass the other.

  2. You can continue to specify the time range in the config file, as before.

  3. If the time range is specified both via CLI arguments and via the config file, the CLI arguments take precedence.

  4. If you don’t specify a time range, either using CLI arguments or in the config file, the job will throw an error at startup.

Updating the JobConfig class

With these new requirements, the timeRange field in the config file becomes optional, where previously it was required. We therefore need to update the corresponding JobConfig case class so that decoding doesn’t fail when the field is missing from the config file:

final case class JobConfig(
  inputs: InputConfig,
  outputs: OutputConfig,
  filters: List[Filter],
  timeRange: Option[TimeRange], // this becomes an Option
  tags: List[String]
)

The entrypoint to the job would now look something like:

  1. Look for the optional --timeRangeMin and --timeRangeMax CLI arguments.

  2. Decode the config file, using the updated case class definition.

  3. If the time range was passed via CLI args, replace the timeRange field with those values. Otherwise, if the time range was not specified in the config file, fail with a useful error message.

  4. Call doStuff(config)

But hold on, something isn’t quite right here!

If we’ve reached step 4, we know that the time range has been correctly specified, either via the CLI or the config file. But the timeRange field in the JobConfig is an Option[TimeRange]. In other words, we know the value will always be present, but the compiler doesn’t. We haven’t encoded our knowledge properly in the types.

We’ll have to change a load of code inside the doStuff method to handle the Option, and we’ll probably end up writing something like this:

val min = config.timeRange.get.min // this won't blow up, trust me :)

Can we do better than this?

Enter higher-kinded data

The issue is that we want to re-use our JobConfig class for two different purposes: decoding the contents of a config file, and passing a fully validated and populated job configuration to the core of the job.

We could make two different (but very similar) case classes and implement the conversion from one to the other, but that is quite laborious, potentially error-prone, and not very DRY. An alternative approach is to use a trick called “higher-kinded data,” whereby we make JobConfig polymorphic with a higher-kinded type parameter:

final case class JobConfig[F[_]](
  inputs: InputConfig,
  outputs: OutputConfig,
  filters: List[Filter],
  timeRange: F[TimeRange], // this is now polymorphic
  tags: List[String]
)

Introducing this abstraction means that we can now teach the compiler what we already knew: when decoding the config file, the timeRange field may or may not be present, but by the time we call doStuff(config), the field is definitely there.

Decoding the config file

The change to the PureConfig code is very minor, from:

val config: JobConfig = configSource.loadOrThrow[JobConfig]

to:

val config: JobConfig[Option] = configSource.loadOrThrow[JobConfig[Option]]

Here, we set the F[_] type parameter to Option, because the timeRange field might be missing from the config file.

Requiring the time range to be present

In the doStuff method, we expect the time range to be set, and we don’t want to be dealing with Option. So we can update the method signature from:

def doStuff(config: JobConfig): Unit

to:

def doStuff(config: JobConfig[Id]): Unit

Here, we’re using the Id type (short for “Identity”) from Cats. This is nothing more than a type alias, so you could define it yourself if you prefer:

type Id[A] = A

In other words, the type Id[TimeRange] is precisely TimeRange, but it has the shape we need to match our F[_] type parameter. Within the doStuff method, we can refer to the timeRange field directly without any unwrapping, just like we did before:

val min = config.timeRange.min
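
To see the two instantiations side by side, here is a minimal, self-contained sketch. It assumes a cut-down JobConfig with only two fields, defines Id locally rather than importing it from Cats, and uses made-up dates. The wrapper object HkdSketch is purely illustrative and not part of the real job.

```scala
import java.time.LocalDateTime

object HkdSketch {
  // Identity type alias, defined locally for the sketch (same shape as cats.Id)
  type Id[A] = A

  final case class TimeRange(min: LocalDateTime, max: LocalDateTime)

  // Simplified stand-in for the real JobConfig: only two fields are kept
  final case class JobConfig[F[_]](
      tags: List[String],
      timeRange: F[TimeRange]
  )

  // Illustrative values only
  val range: TimeRange = TimeRange(
    LocalDateTime.parse("2021-01-01T00:00:00"),
    LocalDateTime.parse("2021-01-02T00:00:00")
  )

  // As decoded from a file: the time range may be absent
  val decoded: JobConfig[Option] = JobConfig[Option](List("daily"), None)

  // As passed to the core of the job: the field is a plain TimeRange
  val validated: JobConfig[Id] = JobConfig[Id](List("daily"), range)

  // No unwrapping needed, because Id[TimeRange] is just TimeRange
  val min: LocalDateTime = validated.timeRange.min
}
```

Both values share one case class definition; only the type argument differs.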

Ensuring the time range is set

The entrypoint to the job now looks something like this:

val timeRangeFromCLI: Option[TimeRange] = parseCLIArgs()

val configFromFile: JobConfig[Option] = loadConfigFile()

val configWithTimeRange: JobConfig[Id] = setTimeRange(configFromFile, timeRangeFromCLI)

doStuff(configWithTimeRange)

where the setTimeRange method is defined as follows:

def setTimeRange(
    config: JobConfig[Option],
    timeRangeFromCLI: Option[TimeRange]
): JobConfig[Id] = {
  val timeRange = timeRangeFromCLI
    .orElse(config.timeRange)
    .getOrElse(
      throw new IllegalArgumentException(
        "Time range must be specified either as CLI args or in the config file"
      )
    )
  config.copy[Id](timeRange = timeRange)
}
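
If you would rather not throw, the same precedence rule can be expressed with Either. The following is only a sketch of that alternative, not the code we run in the job: setTimeRangeEither is a hypothetical name, and JobConfig is again cut down to two fields so the example stands alone.

```scala
import java.time.LocalDateTime

object SetTimeRangeSketch {
  type Id[A] = A

  final case class TimeRange(min: LocalDateTime, max: LocalDateTime)

  // Simplified stand-in for the real JobConfig
  final case class JobConfig[F[_]](tags: List[String], timeRange: F[TimeRange])

  // Hypothetical variant of setTimeRange: returns Left instead of throwing
  def setTimeRangeEither(
      config: JobConfig[Option],
      timeRangeFromCLI: Option[TimeRange]
  ): Either[String, JobConfig[Id]] =
    timeRangeFromCLI
      .orElse(config.timeRange) // CLI arguments take precedence over the file
      .toRight("Time range must be specified either as CLI args or in the config file")
      .map(tr => config.copy[Id](timeRange = tr))
}
```

The caller then decides how to report the Left case, for example by printing the message and exiting with a non-zero status.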

Parsing the CLI arguments

There was one more requirement we haven’t covered: it shouldn’t be possible to set only one CLI argument and not the other. As a slight aside, let’s look at how to implement that.

We use the Decline library for CLI argument parsing. It makes it really easy to compose the two arguments in the way we need:

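import java.time.LocalDateTime
import cats.implicits._             // provides mapN on tuples of Opts
import com.monovore.decline.Opts
import com.monovore.decline.time._  // Argument[LocalDateTime] instance (location may vary by Decline version)

private val timeRangeMinOpt: Opts[LocalDateTime] =
  Opts.option[LocalDateTime]("timeRangeMin", help = "...")

private val timeRangeMaxOpt: Opts[LocalDateTime] =
  Opts.option[LocalDateTime]("timeRangeMax", help = "...")

// Both options are required together; orNone makes the pair as a whole optional.
// TimeRange.apply (rather than the bare companion) compiles on both Scala 2 and Scala 3.
val timeRangeOpt: Opts[Option[TimeRange]] =
  (timeRangeMinOpt, timeRangeMaxOpt).mapN(TimeRange.apply).orNone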

This will behave as required: you can pass no arguments, or both, but not only one.
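
To spell out which combinations are accepted, here is a small standard-library sketch of the same "both or neither" rule. It only mirrors the behavior described above and says nothing about how Decline implements it. The combine helper and its error message are hypothetical.

```scala
import java.time.LocalDateTime

object TimeRangeArgsSketch {
  final case class TimeRange(min: LocalDateTime, max: LocalDateTime)

  // Hypothetical helper, not part of Decline: enumerates the accepted combinations
  def combine(
      min: Option[LocalDateTime],
      max: Option[LocalDateTime]
  ): Either[String, Option[TimeRange]] =
    (min, max) match {
      case (Some(lo), Some(hi)) => Right(Some(TimeRange(lo, hi))) // both given
      case (None, None)         => Right(None)                    // neither given
      case _ =>
        Left("--timeRangeMin and --timeRangeMax must be passed together")
    }
}
```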

Conclusion

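Higher-kinded polymorphism lets you re-use your case classes in multiple situations, increasing the coherence of your model and reducing code duplication. In this example, a single JobConfig definition serves both as the decoding target for the config file (JobConfig[Option]) and as the fully validated input to the core of the job (JobConfig[Id]), so the compiler knows the time range is present wherever doStuff uses it.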
