
Combining data from a database and a web service with Fetch

In a previous article, I discussed how you can use Fetch to query and combine data from a variety of sources such as databases and HTTP servers. This time, we’re going to examine a full example using the Doobie library for DB access and http4s as the HTTP client.

For the sake of this example, let’s assume we have a DB table with users and we are storing related to-do items in a third-party web service. We’ll query users from the DB and their list of to-do items from an HTTP API.

Querying the Database

We’ll start by creating our user DB table and inserting a few values into it. To perform queries against the DB, we’ll need a Doobie Transactor; you can find details on how to create one in Doobie’s documentation. We’ll create a transactor whose effect type is Task from the fs2 library, and make sure that the user table is created and populated with a few records when the transactor is created:

type UserId = Int
case class User(id: UserId, name: String)

val dropTable = sql"DROP TABLE IF EXISTS user".update.run

val createTable = sql"""
   CREATE TABLE user (
     id INTEGER PRIMARY KEY,
     name VARCHAR(20) NOT NULL UNIQUE
   )
  """.update.run

def addUser(usr: User) =
  sql"INSERT INTO user (id, name) VALUES(${usr.id}, ${usr.name})".update.run

val users: List[User] =
  List("William Shakespeare", "Charles Dickens", "George Orwell").zipWithIndex.map {
    case (name, id) => User(id + 1, name)
  }

val xa: Transactor[Task] = (for {
  xa <- createTransactor
  _  <- (dropTable *> createTable *> users.traverse(addUser)).transact(xa)
} yield xa).unsafeRunSync.toOption.getOrElse(
  throw new Exception("Could not create test database and/or transactor")
)

Now that we have the table created and a few records inserted, we can start using Doobie to query users. We’ll start by writing a couple of functions that run the queries for one user or for multiple users:

import cats.data.NonEmptyList
import doobie.imports.{Query => _, _}

def userById(id: Int): ConnectionIO[Option[User]] =
  sql"SELECT * FROM user WHERE id = $id".query[User].option

def usersByIds(ids: NonEmptyList[Int]): ConnectionIO[List[User]] = {
  implicit val idsParam = Param.many(ids)
  sql"SELECT * FROM user WHERE id IN (${ids: ids.type})".query[User].list
}

Let’s run some queries for individual users:

import doobie.imports._

userById(1).transact(xa).unsafeRun
// => Some(User(1,William Shakespeare))

userById(42).transact(xa).unsafeRun
// => None

We can also query multiple users at once:

import cats.data.NonEmptyList

val ids: NonEmptyList[UserId] = NonEmptyList(1, List(2, 3))

usersByIds(ids).transact(xa).unsafeRun
// => List(User(1,William Shakespeare), User(2,Charles Dickens), User(3,George Orwell))

The only missing piece of the puzzle is the user data source, which should be fairly easy to implement now that we can query the user table.

implicit val userDS = new DataSource[UserId, User] {
  override def name = "UserDoobie"
  override def fetchOne(id: UserId): Query[Option[User]] =
    Query.sync {
      userById(id).transact(xa).unsafeRun
    }
  override def fetchMany(ids: NonEmptyList[UserId]): Query[Map[UserId, User]] =
    Query.sync { 
      usersByIds(ids).map { users =>
        users.map(a => a.id -> a).toMap
      }.transact(xa).unsafeRun
    }
}

def user(id: UserId): Fetch[User] = Fetch(id)
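
To make the shape of fetchMany’s result concrete, here is a minimal, library-free sketch of the indexing step. The sample rows and the indexById helper are illustrative assumptions, not part of Fetch or Doobie; the point is only that ids missing from the database end up with no entry in the resulting map.

```scala
object IndexByIdSketch {
  type UserId = Int
  final case class User(id: UserId, name: String)

  // Hypothetical rows, as they might come back from a `WHERE id IN (...)` query
  // when id 2 does not exist in the table.
  val rows: List[User] =
    List(User(1, "William Shakespeare"), User(3, "George Orwell"))

  // Key every row by its identity, mirroring the body of fetchMany above.
  def indexById(users: List[User]): Map[UserId, User] =
    users.map(u => u.id -> u).toMap

  val requested: List[UserId] = List(1, 2, 3)
  val found: Map[UserId, User] = indexById(rows)

  // Requested ids that have no entry in the map.
  val missing: List[UserId] = requested.filterNot(found.contains)
}
```

With these sample rows, found has entries for ids 1 and 3, and missing is List(2).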

We’ve seen how to create a DB data source using Doobie. Now it’s time to move on to the HTTP data source and see how we can use the two together!

Querying the Web service

As I mentioned before, we are using a third-party web service to store the to-do items related to the users in our database. We’ll use JSONPlaceholder to emulate queries to an API that stores to-do items. Let’s start by deriving the JSON decoders with Circe, which makes this really easy:

type TodoId = Int
case class Todo(id: TodoId, userId: UserId, title: String, completed: Boolean)

import io.circe._
import io.circe.generic.semiauto._

implicit val todoDecoder: Decoder[Todo] = deriveDecoder

That’s it; we can now decode Todo instances from JSON payloads. Our next step is to write a function that fetches a user’s to-do items given their user id, for which we’ll use the http4s HTTP client. One thing to take into account is that both Doobie and http4s use a Task type in some of their result types. Unfortunately, the former’s comes from fs2 and the latter’s from Scalaz, so we alias the Scalaz one as Zask to tell them apart.

import org.http4s.circe._
import org.http4s.client.blaze._
import scalaz.concurrent.{Task => Zask}

val httpClient = PooledHttp1Client()

def todosByUser(id: UserId): Zask[List[Todo]] = {
  val url = s"https://jsonplaceholder.typicode.com/todos?userId=${id}"
  httpClient.expect(url)(jsonOf[List[Todo]])
}

We can now easily query the web service to obtain a user’s list of to-do items:

todosByUser(1).run
// => List(Todo(1,1,delectus aut autem,false), ...)

The only piece missing is the to-do items’ data source. We’ll use the Query#async constructor to run the Scalaz Task returned by the HTTP client asynchronously. On the JVM, you can use Query#sync for blocking calls and Query#async for non-blocking calls; in JS, most of your data sources will use Query#async, since I/O in JavaScript does not block.

implicit val todosDS = new DataSource[UserId, List[Todo]] {
  override def name = "TodoH4s"
  override def fetchOne(id: UserId): Query[Option[List[Todo]]] =
    Query.async { (ok, fail) =>
      todosByUser(id).unsafePerformAsync(_.fold(fail, (x) => ok(Some(x))))
    }

  override def fetchMany(ids: NonEmptyList[UserId]): Query[Map[UserId, List[Todo]]] =
    batchingNotSupported(ids)
}

def todos(id: UserId): Fetch[List[Todo]] = Fetch(id)
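
The two-callback shape that Query.async receives (one function for success, one for failure) may be easier to see in isolation. The sketch below uses only the standard library and is not Fetch’s implementation: the Callback alias, fakeTodosByUser, and toFuture are hypothetical names chosen for illustration. It bridges a callback-style computation into a scala.concurrent.Future.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object AsyncCallbackSketch {
  // A computation that reports its outcome through one of two callbacks.
  type Callback[A] = (A => Unit, Throwable => Unit) => Unit

  // Hypothetical stand-in for a non-blocking client call.
  def fakeTodosByUser(id: Int): Callback[List[String]] = (ok, fail) =>
    if (id > 0) ok(List(s"todo for user $id"))
    else fail(new IllegalArgumentException(s"bad id: $id"))

  // Complete a Promise from whichever callback ends up being invoked.
  def toFuture[A](cb: Callback[A]): Future[A] = {
    val p = Promise[A]()
    cb(a => { p.success(a); () }, e => { p.failure(e); () })
    p.future
  }
}
```

Calling AsyncCallbackSketch.toFuture(AsyncCallbackSketch.fakeTodosByUser(1)) yields a future holding List("todo for user 1"), while an id of 0 yields a failed future.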

Like many HTTP APIs, the to-do items’ endpoint doesn’t support batching, so we implement DataSource#fetchMany by delegating to the default unbatched implementation, DataSource#batchingNotSupported.
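
Conceptually, an unbatched fallback amounts to one single-identity lookup per requested id. The following library-free sketch illustrates that idea under stated assumptions; fetchOne and fetchManyUnbatched here are hypothetical helpers and should not be read as Fetch’s actual code.

```scala
object UnbatchedFallbackSketch {
  // Hypothetical single-identity lookup; only ids 1 to 3 are known.
  def fetchOne(id: Int): Option[List[String]] =
    if (id >= 1 && id <= 3) Some(List(s"todo for user $id")) else None

  // One fetchOne call per identity, keeping only the ids that were found.
  def fetchManyUnbatched(ids: List[Int]): Map[Int, List[String]] =
    ids.flatMap(id => fetchOne(id).map(todos => id -> todos)).toMap
}
```

For example, UnbatchedFallbackSketch.fetchManyUnbatched(List(1, 2, 42)) performs three lookups and returns a map with entries for ids 1 and 2 only.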

Putting it all together

We now have both data sources in place, so let’s combine them and see how Fetch optimizes data access. To query users and to-dos, we’ll create a Fetch for each and combine them using .product. Combining fetches this way implicitly tells Fetch that they can run in parallel:

def fetchUserAndTodos(id: UserId): Fetch[(User, List[Todo])] = user(id).product(todos(id))

Let’s run a fetch returned by the function above to see that both queries (to the database and the web service) run concurrently. We’ll use the debugging facilities recently introduced in Fetch to visualize the execution:

import cats.Id
import fetch.syntax._
import fetch.unsafe.implicits._
import fetch.debug._

describe(
  fetchUserAndTodos(1).runE[Id]
)
// ...
// [Concurrent]
//   [Fetch one] From `UserDoobie` with id 1
//   [Fetch one] From `TodoH4s` with id 1
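
The reason .product allows concurrency is that neither side needs the other’s result. As a rough analogy only (this uses standard-library Futures rather than Fetch, and userName and todoTitles are hypothetical stand-ins), compare an independent combination with a dependent one:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object IndependentVsDependentSketch {
  // Hypothetical stand-ins for the two lookups.
  def userName(id: Int): Future[String] = Future(s"user-$id")
  def todoTitles(id: Int): Future[List[String]] = Future(List(s"todo-$id"))

  // Independent: both futures are created up front, so they can make
  // progress at the same time before their results are paired.
  def independent(id: Int): Future[(String, List[String])] =
    userName(id).zip(todoTitles(id))

  // Dependent: the second lookup only starts once the first has finished.
  def dependent(id: Int): Future[(String, List[String])] =
    userName(id).flatMap(name => todoTitles(id).map(titles => (name, titles)))
}
```

Both functions produce the same pair; they differ only in whether the two lookups are allowed to overlap.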

Let’s take a look at a more involved example: a fetch that has multiple steps, deduplication and caching:

import fetch._

val involvedFetch: Fetch[List[(User, List[Todo])]] = for {
  userAndTodos <- Fetch.traverse(List(1, 2, 1, 2))(fetchUserAndTodos _)
  moreUserAndTodos <- Fetch.traverse(List(1, 2, 3))(fetchUserAndTodos _)
} yield userAndTodos ++ moreUserAndTodos

describe(
  involvedFetch.runE[Id]
)
// ...
// [Concurrent]
//   [Fetch many] From `UserDoobie` with ids List(1, 2)
//   [Fetch many] From `TodoH4s` with ids List(1, 2)
// [Concurrent]
//   [Fetch one] From `UserDoobie` with id 3
//   [Fetch one] From `TodoH4s` with id 3

As you can see in the description of the fetch execution, the fetch was run in two rounds:

  • In the first, both data sources were queried in batch for the ids 1 and 2. Repeated identities were deduplicated.
  • In the second, both data sources were queried for getting the id 3. Note how Fetch didn’t need to ask for ids 1 and 2 since they are cached from the previous round.
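
The effect of deduplication and caching on the two rounds can be reproduced with a small simulation. This is a sketch of the idea using only the standard library, not Fetch’s internals; the plan function is a hypothetical helper that, for each round of requested ids, returns the ids that still need to be fetched.

```scala
object RoundsSketch {
  // For each round, drop repeated ids and ids already cached by earlier rounds.
  def plan(rounds: List[List[Int]]): List[List[Int]] = {
    val (_, fetched) =
      rounds.foldLeft((Set.empty[Int], List.empty[List[Int]])) {
        case ((cache, acc), requested) =>
          val toFetch = requested.distinct.filterNot(cache.contains)
          (cache ++ toFetch, acc :+ toFetch)
      }
    fetched
  }

  // The ids requested by the two traversals in involvedFetch.
  val result: List[List[Int]] = plan(List(List(1, 2, 1, 2), List(1, 2, 3)))
}
```

Here result is List(List(1, 2), List(3)), matching the two rounds in the description above.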

Conclusion

The recently introduced asynchronous query support has made it possible to use Fetch with non-blocking clients, and viable to use it in non-JVM environments with Scala.js. Besides optimizing data access with caching, batching, and parallelism, Fetch lets you treat every data source uniformly and combine data from multiple data sources arbitrarily.

Hopefully, this article has helped you understand how Fetch can be useful. Feel free to drop by the Fetch Gitter channel to ask any questions. Most of the code contained in this article has been extracted from examples that Peter Neyens contributed to the Fetch repository. If you have more examples of Fetch usage, don’t hesitate to open a pull request so more people can benefit from them.
