/strava-summary

Primary LanguageScalaBSD 3-Clause "New" or "Revised" LicenseBSD-3-Clause

Small Data Science with Typelevel Libraries

I like to run and bike. Like many other athletes, I use Strava for tracking my workouts. When I run outside, I almost always start a Strava run on my watch and a Peloton class on my phone (Strava for metrics & GPS, Peloton for instruction and company), resulting in two activites getting logged. In the past, only the Strava run would count towards the annual distance metric but recently, I noticed this changed, with both activities reporting distance. Thankfully, Strava has a very well documented API which allows full access to recorded activities. In this article, we'll look at how to use the Strava API to de-duplicate these runs as well as compute some additional metrics. We'll use a number of Typelevel libraries to implement a console application, including http4s, fs2, and Circe. In particular, we'll use the latest previews of these libraries, built on cats-effect 3.

Overview

Let's start by thinking about de-duplication. The general idea is to look for 2 or more time-wise overlapping activities, taking the activity with the longest distance and discarding the others. This approach assumes there's a single "longest" encompassing activity (e.g., a Strava run) with other shorter activities (e.g., a Peloton class or two). To implement de-duplication, we'll need a list of activities in a time period, with each activity providing at minimum, a period and a distance.

A quick persual of the API docs shows that a simple GET /athlete/activities request returns a list of activities, with each activity providing the data necessary for de-duplication. The /athlete/activities resource supports filtering activities by start date via before and after query params. Futher, it supports pagination, with a maximum of 200 activities per page, and with an end of result signaled by an empty response.

Hence, all we have to do is make a few calls to /athlete/activities, implement our de-duplication logic, and summarize the results!

Application Architecture

We'll implement two console applications -- one that fetches activities for a specified year and writes them to local storage and another that reads the stored activities, de-duplicates them, and summarizes them. We could do this as a single application but I have found it useful to fetch the data once and then iterate on the processing logic without authentication and without waiting for activity retrieval. This also allows REPL-based experimentation with the fetched data.

Authentication

One complication is that Strava uses OAuth2 for authentication. The linked authentication documentation gets in to all the details, but in summary:

  • we register our application with Strava, which gives us a client id and client secret
  • we implement a bearer token retrieval scheme which:
    • launches a web browser to the Strava website, where the user logs in to Strava and grants our application access to read their activities
    • as a result of the user granting access, a single-use authorization code is provided to our application
    • using the authorization code, the application fetches an access token
    • the access token is then used in all Strava API requests

For those with OAuth 2 experience, there's no need to implement token renewal as Strava's access tokens are valid for 6 hours -- we're building a command line app which will terminate in a few seconds.

After the user grants access to our application, the Strava website redirects the user's browser to a URL of our choosing, providing the needed authorization code. Hence, we'll need to start an HTTP server in order for the redirect to have a target. After starting the HTTP server, we'll open a browser and ask the user to grant access. Once Strava redirects back to our application, we can extract the authorization code and tear down the HTTP server. Here's how we can implement this:

def getAuthorizationCode(clientId: ClientId): IO[AuthorizationCode] = {
  Deferred[IO, AuthorizationCode].flatMap { deferredAuthCode =>
    Dispatcher[IO].use { dispatcher =>
      BlazeServerBuilder[IO](global, dispatcher)
        .bindHttp(port = 0)
        .withHttpApp { 
          object CodeParam extends QueryParamDecoderMatcher[String]("code")
          HttpRoutes.of[IO] {
            case GET -> Root / "exchange_token" :? CodeParam(code) =>
              deferredAuthCode.complete(AuthorizationCode(code)).as(Response(Status.Ok))
          }.orNotFound
        }
        .stream.flatMap { server =>
          val port = server.address.getPort
          Stream.eval(requestAuthCode(clientId, port) *> deferredAuthCode.get)
        }.compile.lastOrError
    }
  }
}

We first create a Deferred[IO, AuthorizationCode], which will eventually be completed with the code provided by Strava. We then start an http4s server with a route that handles the /exchange_token?code=${authorizationCode} redirect by completing the Deferred[IO, AuthorizationCode]. To avoid a port conflict, we bind the server to port 0, which causes the operating system to pick an unused ephemeral port number. We query for the selected port number and use it in the subsequent redirect URI generation. Finally, we open a browser to Strava and then wait for the deferred authorization code to be completed. This is all expressed as an fs2.Stream which is compiled to a value of IO[AuthorizationCode] via .compile.lastOrError. As a result, when the AuthorizationCode is returned, the web server is shut down as part of stream finalization.

Opening a browser is accomplished via the open utility:

def requestAuthCode(clientId: ClientId, localPort: Int): IO[Unit] = {
  IO.blocking {
    import scala.sys.process._
    s"open http://www.strava.com/oauth/authorize?client_id=${clientId}&response_type=code&redirect_uri=http://localhost:${localPort}/exchange_token&approval_prompt=force&scope=read,activity:read".!
  }
}

Here we've used scala.sys.process to shell out to open but for more complicated interaction with processes, check out the prox library. Because scala.sys.process blocks for the process to complete, we wrap its execution with IO.blocking to ensure the blocking does not occur on our main compute pool.

Alright, now that we have an authorization code, we can fetch a bearer token:

def fetchBearerToken(client: Client[IO], clientId: ClientId, clientSecret: ClientSecret, authorizationCode: AuthorizationCode): IO[BearerToken] = {
  val request = Method.POST(
    UrlForm(
      "client_id" -> clientId.value,
      "client_secret" -> clientSecret.value,
      "code" -> authorizationCode.value,
      "grant_type" -> "authorization_code"),
    Uri.uri("https://www.strava.com/oauth/token"))
  client.expect(request)(jsonOf[IO, BearerToken])
}

This is straightforward usage of the http4s client API, though it relies on the client DSL (Http4sClientDsl[IO]) being mixed in to the containing type. The Client[IO] parameter is a value created at startup and used to make all client HTTP requests. The clientId and clientSecret parameters are values we obtained as a result of registering our application with Strava. We pass these as parameters to avoid putting any secrets in the code -- for this app, we get them from command line arguments but in a production grade application, we'd fetch these from Vault or some other secret storage system.

Putting these pieces together gives us the overall authentication workflow:

def getBearerToken(client: Client[IO], clientId: ClientId, clientSecret: ClientSecret): IO[BearerToken] =
  getAuthorizationCode(clientId).flatMap(fetchBearerToken(client, clientId, clientSecret, _))

Fetching Activities

Now that we have a bearer token, we can fetch activities via GET /athlete/activities. The only complication we need to handle is pagination. Each request takes a page=${n} query parameter, starting with 1. The API docs instruct us to increment the page number until we receive a response with no activities.

This is a common pattern when considering how streams of elements are constructed -- there's an initial state, the page number, and an effectful action which generates both the stream elements (activities) and the next state (page number + 1). The fs2.Stream object expresses this pattern via the unfoldLoopEval constructor:

def unfoldLoopEval[F[_], S, O](s: S)(f: S => F[(O, Option[S])]): Stream[F, O]

We'll use this with F = IO, S = Int (page number), and O = Vector[Json] (page of activities). Each invocation of f produces another page of elements and then either the next page number (wrapped in Some) or None, indicating there are no more pages to fetch.

def fetchActivitiesJson(client: Client[IO], bearerToken: BearerToken, after: ZonedDateTime, before: ZonedDateTime, page: Int = 1): Stream[IO, Json] = {
  Stream.unfoldLoopEval(1) { page =>
    val request = Method.GET(
      Uri.unsafeFromString(s"https://www.strava.com/api/v3/athlete/activities?after=${after.toEpochSecond}&before=${before.toEpochSecond}&page=${page}&per_page=200"),
        Accept(MediaType.application.json),
        Authorization(Credentials.Token(AuthScheme.Bearer, bearerToken.accessToken))
    )
    client.expect(request)(jsonOf[IO, Vector[Json]]).map { activities =>
      val nextPage = if (activities.nonEmpty) Some(page + 1) else None
      (activities, nextPage)
    }
  }.flatMap(activities => Stream.chunk(Chunk.vector(activities)))
}

We decoded each page with Circe to a Vector[Json], which may seem odd -- why not fully decode the activities as a Vector[Activity] or do no decoding and return a single Json? Our goal is to just write these values out to storage so decoding to an Activity just to later re-encode to Json is unnecessary. If we did no decoding and returned a single Json value for the entire page, we would not be able to emit individual activities and hide pagination from the caller.

Note unfoldLoopEval gives us a Stream[IO, Vector[Json]] and we want a Stream[IO, Json] -- we accomplish that by flat mapping the result of unfoldLoopEval and turning each page of activities in to a stream of individual activities.

Activity Storage

This application needs simple local storage for the activities list. Since we can fit all activities in to memory and don't need any query abilities, we'll just store the full list of activities as json to a local file using the fs2.io.file package.

private val path = Paths.get("activities.json")

def writeActivitiesJson(activities: Vector[Json]): IO[Unit] = {
  val asString = Json.fromValues(activities).spaces2
  Stream.emit(asString)
    .through(text.utf8Encode)
    .through(Files[IO].writeAll(path, flags = List(
      StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING
    ))).compile.drain
}

Wiring everything together gives us this application:

object FetchActivities extends IOApp with Http4sClientDsl[IO] {
  def run(args: List[String]): IO[ExitCode] = {
    if (args.size != 3) {
      IO(Console.err.println("Syntax: FetchActivities <client id> <client secret> <year to fetch>")).as(ExitCode.Error)
    } else {
      val clientId = ClientId(args.head)
      val clientSecret = ClientSecret(args.tail.head)
      val yearToFetch = Year.of(args.tail.tail.head.toInt)
      val client = JavaNetClientBuilder[IO].create
      getBearerToken(client, clientId, clientSecret).flatMap { bearerToken =>
        fetchActivitiesForYearJson(client, bearerToken, yearToFetch)
          .compile.toVector
          .flatTap(activities => IO(println(s"Fetched ${activities.size} activities")))
          .flatMap(ActivityStorage.writeActivitiesJson)
      }.as(ExitCode.Success)
    }
  }
}

This application uses the fetchActivitiesForYearJson method, which is implemented via the fetchActivitiesJson method we wrote above. Given the relatively small number of activities, we chose to accumulate all activities in to a single Vector[Json] via .compile.toVector before writing them to storage. We could have handled this in a streaming fashion instead by using circe-fs2 and a streaming JSON file format.

There's no validation or error handling of the command line arguments -- for a production grade application, we could use the Decline library.

We're using JavaNetClientBuilder to create a Client[IO] because as of the pubilcation date of this article, that's the only Client[F] implementation available for cats-effect 3. Additional client implementations will be available shortly (e.g. blaze, ember, async-http-client, netty).

Processing Activities

The processing application is much simpler. We need to read & decode the stored activities and then run our various computations on the result.

Let's start with reading & decoding:

def readActivities: IO[Vector[Activity]] =
  Files[IO].readAll(path, 4096).through(text.utf8Decode)
    .compile.string
    .flatMap { str =>
      parse(str).flatMap(_.as[Vector[Activity]]) match {
        case Left(err) => IO.raiseError(err)
        case Right(activities) => IO.pure(activities)
      }
  }

We use fs2.io.file again, this time reading the full contents of the file, decoding as UTF8. We accumulate the decoded results in to a single value of IO[String] via .compile.string. Then we parse that string to a Json value and then decode that value as a Vector[Activity].

Next, we need to compute various metrics on the Vector[Activity]. These metrics are all straightforward usage of the Scala collection API. For example, computing the total mileage of a Vector[Activity] and rendering as a friendly string:

def totalMiles(activities: Vector[Activity]): String =
  f"${metersToMiles(activities.foldLeft(0.0d)(_ + _.distance))}%.2f mi"

def metersToMiles(meters: Double): Double = meters * 0.000621371

Each activity has a type field indicating whether it is a ride, run, etc. We can use this field and some others to compute interesting breakouts:

val runs = activities.filter(_.tpe == "Run")
val zwiftRides = activities.filter(_.tpe == "VirtualRide")
val rides = activities.filter(_.tpe == "Ride")
val pelotonRides = rides.filter(_.trainer)
val outdoorRides = rides.filterNot(_.trainer)

The de-duplication logic is a bit more complicated:

def dedupe(activities: Vector[Activity]): Vector[Activity] = {
  val remaining = collection.mutable.SortedSet(activities: _*)(
    Ordering.by((a: Activity) => (-a.distance, a.startDate.toEpochMilli, a.name))
  )
  val bldr = Vector.newBuilder[Activity]
  while (remaining.nonEmpty) {
    val head = remaining.head
    val dupes = remaining.filter(_.overlaps(head))
    bldr += head
    remaining --= dupes
  }
  bldr.result()
}

We sort the activities so the longest activity is first and the shortest activity is last. We then take the longest activity, discard any other activities which overlap with it, and repeat. We continue the iteration until there are no remaining activities.

Hence, the full processing application is:

object ProcessActivities extends IOApp.Simple {
  def run: IO[Unit] = ActivityStorage.readActivities.flatMap(summarizeActivities)

  def totalMiles(activities: Vector[Activity]): String =
    f"${metersToMiles(activities.foldLeft(0.0d)(_ + _.distance))}%.2f mi"

  def metersToMiles(meters: Double): Double = meters * 0.000621371

  def dedupe(activities: Vector[Activity]): Vector[Activity] = {
    val remaining = collection.mutable.SortedSet(activities: _*)(
      Ordering.by((a: Activity) => (-a.distance, a.startDate.toEpochMilli, a.name))
    )
    val bldr = Vector.newBuilder[Activity]
    while (remaining.nonEmpty) {
      val head = remaining.head
      val dupes = remaining.filter(_.overlaps(head))
      bldr += head
      remaining --= dupes
    }
    bldr.result()
  }

  def summarizeActivities(activities: Vector[Activity]): IO[Unit] = {
    val runs = activities.filter(_.tpe == "Run")
    val dedupedRuns = dedupe(runs)
    val zwiftRides = activities.filter(_.tpe == "VirtualRide")
    val rides = activities.filter(_.tpe == "Ride")
    val pelotonRides = rides.filter(_.trainer)
    val outdoorRides = rides.filterNot(_.trainer)
    IO(println(
      s"""|Loaded ${activities.size} activities
          |
          |Run mileage (${runs.size}): ${totalMiles(runs)}
          | - Deduped run mileage (${dedupedRuns.size}): ${totalMiles(dedupedRuns)}
          |
          |Total ride mileage (${zwiftRides.size + rides.size}): ${totalMiles(zwiftRides ++ rides)}
          | - Zwift mileage (${zwiftRides.size}): ${totalMiles(zwiftRides)}
          | - Peloton mileage (${pelotonRides.size}): ${totalMiles(pelotonRides)}
          | - Outdoor ride mileage (${outdoorRides.size}): ${totalMiles(outdoorRides)}
          """.stripMargin))
  }
}

Summary

Success! We accomplished our goal with a small amount of code.

Most of the complexity in this project was in the OAuth exchange. I'd love to see an OAuth2 http4s library that simplifies this kind of stuff.

By structuring the project as two applications, we were able to experiment with the data set in the REPL and iterate quickly on our processing logic. I find this style of "small data science" very effective and very applicable. No need for Spark or worksheets when you have small data sets and powerful libraries.