Streaming Hash Computations

This article is the latest in a series of posts that discuss evolving functional APIs. The previous post is Optimizing Functional Walks of File Trees.

The fs2-io library provides support for computing cryptographic hashes, e.g. SHA-256, in a functional way. It provides a single API that works on the JVM, Scala.js, and Scala Native, delegating to the Java Cryptography API, Web Crypto, and OpenSSL respectively.

The fs2.hash object provides a pipe for each supported hash function:

package fs2

object hash {
  def md5[F[_]]: Pipe[F, Byte, Byte] = ???
  def sha1[F[_]]: Pipe[F, Byte, Byte] = ???
  def sha256[F[_]]: Pipe[F, Byte, Byte] = ???
  def sha384[F[_]]: Pipe[F, Byte, Byte] = ???
  def sha512[F[_]]: Pipe[F, Byte, Byte] = ???
}

These pipes take a source byte stream and output a new byte stream. Pulling on the output byte stream causes the source to be pulled on and all bytes emitted from the source are added to the hash calculation. Upon termination of the source, the hash is finalized and output as a single chunk.

Computing the hash of a byte stream can be accomplished by sending the source through a hash pipe and collecting the resulting output into a collection.

import fs2.{Chunk, Pure, Stream, hash}
import scodec.bits.ByteVector

val source: Stream[Pure, Byte] = Stream.chunk(Chunk.array("The quick brown fox".getBytes))
// source: Stream[Pure, Byte] = Stream(..)
val hashed = source.through(hash.sha256).to(ByteVector)
// hashed: ByteVector = ByteVector(32 bytes, 0x5cac4f980fedc3d3f1f99b4be3472c9b30d56523e632d151237ec9309048bda9)

This works equally well for effectful streams -- for example, hashing a file:

import cats.effect.IO
import fs2.{Chunk, Pure, Stream, hash}
import fs2.io.file.{Files, Path}
import scodec.bits.ByteVector

val source: Stream[IO, Byte] = Files[IO].readAll(Path("LICENSE"))
// source: Stream[IO, Byte] = Stream(..)
val hashed = source.through(hash.sha256).compile.to(ByteVector)
// hashed: IO[ByteVector] = IO(...)

import cats.effect.unsafe.implicits.global
val result = hashed.unsafeRunSync()
// result: ByteVector = ByteVector(32 bytes, 0x0e8b76ea2b69ae6f3482c7136c3d092ca0c6f5d4480a46b6b01f6bb8ed7d7d9b)

The fs2.hash API is too simple though. Consider a scenario where you want to write a byte stream to a file and write a hash of the same byte stream to another file. Doing each of these transformations in isolation is easy:

def writeToFile[F[_]: Files](source: Stream[F, Byte], path: Path): Stream[F, Nothing] =
  source.through(Files[F].writeAll(path))

def writeHashToFile[F[_]: Files](source: Stream[F, Byte], path: Path): Stream[F, Nothing] =
  source.through(hash.sha256).through(Files[F].writeAll(path))

Perhaps the simplest way to combine these functions is via stream composition:

def writeFileAndHash[F[_]: Files](source: Stream[F, Byte], path: Path): Stream[F, Nothing] =
  writeToFile(source, path) ++ writeHashToFile(source, Path(path.toString + ".sha256"))

This approach has a major issue though: the source stream is processed twice -- once when writing the file and once when computing the hash. For some sources, this is simply inefficient. Imagine a source stream that originates from another file on the file system. This solution would result in opening that file twice. The inefficiency is worse for streams with lots of computations, as those computations would be run twice as well.

There's a bigger issue though. Some streams aren't safe to use multiple times -- a stream of bytes from a network socket, for instance. Using such a stream more than once often produces unexpected results. When streaming from a socket, the bytes received are gone by the time the stream is run a second time. Maybe the second evaluation returns any new bytes received on the socket? Or maybe the socket was closed as an implementation detail of reaching the end of the source stream, and the second evaluation results in an error or a hang.

We need a way to process the source stream once, simultaneously writing to the output file and computing the hash. The broadcastThrough operation on Stream directly supports this use case -- it takes one or more pipes as arguments and sends every element emitted by the source to all of those pipes, collecting their outputs into a single output stream.

import cats.effect.Concurrent

def writeFileAndHashViaBroadcast[F[_]: Files: Concurrent](
  source: Stream[F, Byte], path: Path
): Stream[F, Nothing] =
  source.broadcastThrough(
    s => writeToFile(s, path),
    s => writeHashToFile(s, Path(path.toString + ".sha256"))
  )

Equivalently, since broadcastThrough operates on pipes, we could inline the definitions of writeToFile and writeHashToFile and simplify a bit, keeping each expression as a pipe:

def writeFileAndHashViaBroadcastPipes[F[_]: Files: Concurrent](
  source: Stream[F, Byte], path: Path
): Stream[F, Nothing] =
  source.broadcastThrough(
    Files[F].writeAll(path),
    hash.sha256 andThen Files[F].writeAll(Path(path.toString + ".sha256"))
  )

In either case, we picked up a Concurrent constraint, indicating that broadcastThrough does some concurrency. This technique certainly works, but it feels like overkill. The broadcastThrough operator is an example of a scatter-gather algorithm. The chunks from the source stream are scattered to each pipe and the subsequent outputs of those pipes are gathered back into a single output stream. There's a performance penalty to this coordination, though if the chunk sizes are sufficiently large, performance is unlikely to be an issue in practice. Still, this solution seems to violate the principle of least power. We should be able to compute a hash while processing a byte stream, without introducing complex concurrency constructs.

Building Blocks

FS2 pipes are completely opaque -- they are simply functions from streams to streams (i.e., Pipe[F, A, B] is an alias for Stream[F, A] => Stream[F, B]). While this makes for easy composition via andThen, the opacity leaves us little control. We can only compose pipes into larger pipes or apply a source to a pipe, yielding an output stream. We can't inspect elements that flow through a pipe or interact with it in any other way.
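As a tiny illustration (the pipe names here are invented for this sketch), composing and applying is the entire interface a pipe offers:

import fs2.{Pipe, Pure, Stream}

// Two hypothetical pipes -- each is just a Stream => Stream function.
def nonEmpty[F[_]]: Pipe[F, String, String] = _.filter(_.nonEmpty)
def upperCase[F[_]]: Pipe[F, String, String] = _.map(_.toUpperCase)

// andThen yields another opaque pipe; applying a source yields an output
// stream. Nothing about the intermediate elements is observable from outside.
val combined: Pipe[Pure, String, String] = nonEmpty[Pure] andThen upperCase[Pure]
val out = combined(Stream("", "hello")).toList
// out: List[String] = List("HELLO")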

The implementation of the hashing pipes we've seen so far is based on observing the chunks of a source stream, updating a running hash computation with each observed chunk, and then emitting the final hash value upon completion of the source. We could model the streaming hash computation with a Hasher trait:

trait Hasher:
  def update(bytes: Chunk[Byte]): Unit
  def hash: Chunk[Byte]

To compute a hash of a stream using this API, we would create a new Hasher instance for a desired hash function (more on this in a moment), call update for each chunk in the stream, and emit the result of hash when the source completes. However, the Hasher trait clearly encapsulates some mutable state and hence the methods must have side effects. To make the API referentially transparent, we can parameterize Hasher by an effect type:

import fs2.{Chunk, Stream}
import cats.effect.IO
trait Hasher[F[_]]:
  def update(bytes: Chunk[Byte]): F[Unit]
  def hash: F[Chunk[Byte]]

How do we create instances of Hasher? We'll need to know the desired hash function at the very least. We'll also need to suspend over some mutable state, so we could reach for a Sync constraint. But we want this to be a general purpose library, and we don't want to propagate Sync constraints through call sites. Instead, let's introduce a capability trait that allows creation of a Hasher given a hash algorithm. Then we can provide a constructor for Hashing[F] given a Sync[F]:

import cats.effect.{Resource, Sync, SyncIO}
import fs2.Pipe

enum HashAlgorithm:
  case SHA1
  case SHA256
  case SHA512
  // etc.

trait Hashing[F[_]]:
  def hasher(algorithm: HashAlgorithm): Resource[F, Hasher[F]]

object Hashing:
  def apply[F[_]](using F: Hashing[F]): F.type = F

  def forSync[F[_]: Sync]: Hashing[F] = new Hashing[F]:
    def hasher(algorithm: HashAlgorithm): Resource[F, Hasher[F]] =
      ??? // Implementation isn't important right now but assume this delegates to platform crypto apis

  given forIO: Hashing[IO] = forSync[IO] 
  given forSyncIO: Hashing[SyncIO] = forSync[SyncIO]

The Hashing[F] capability trait is a typeclass that allows creation of Hasher[F] instances. A new hasher is returned as a Resource[F, Hasher[F]], allowing the implementation to manage initialization and finalization of each instance. This may seem like overkill for a pure function -- after all, isn't a hash a simple calculation that digests an arbitrary number of bytes into a fixed-size sequence of bytes? Implementations are free to use operating system resources though -- e.g., delegating to a hardware security module and hence abstracting over a communication channel with a hardware device.
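To make this concrete, here's a minimal JVM-only sketch of what forSync could look like, delegating to java.security.MessageDigest. The algorithm-name mapping is an assumption for illustration; the real implementation also covers Scala.js (Web Crypto) and Scala Native (OpenSSL).

import java.security.MessageDigest

def forSyncSketch[F[_]](using F: Sync[F]): Hashing[F] = new Hashing[F]:
  def hasher(algorithm: HashAlgorithm): Resource[F, Hasher[F]] =
    // Map our algorithm enum to JCA names (assumed mapping)
    val name = algorithm match
      case HashAlgorithm.SHA1   => "SHA-1"
      case HashAlgorithm.SHA256 => "SHA-256"
      case HashAlgorithm.SHA512 => "SHA-512"
    Resource.eval(F.delay(MessageDigest.getInstance(name))).map: digest =>
      new Hasher[F]:
        // Suspend each mutation of the underlying digest in F
        def update(bytes: Chunk[Byte]): F[Unit] =
          F.delay(digest.update(bytes.toArray))
        def hash: F[Chunk[Byte]] =
          F.delay(Chunk.array(digest.digest()))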

Given this new implementation, we can hash a stream in a relatively straightforward fashion:

import cats.effect.MonadCancelThrow

def hashingPipe[F[_]: Hashing: MonadCancelThrow](algorithm: HashAlgorithm): Pipe[F, Byte, Byte] =
  source =>
    Stream.resource(Hashing[F].hasher(algorithm)).flatMap:
      hasher =>
        source.chunks.foreach(c => hasher.update(c)) ++ Stream.evalUnChunk(hasher.hash)

We're back to where we started -- we've implemented a Pipe[F, Byte, Byte] using our new lower level Hashing API. But this API also supports other use cases that our original API did not. Consider this utility function:

def observe[F[_]: Hashing: MonadCancelThrow](
  algorithm: HashAlgorithm,
  sink: Pipe[F, Byte, Nothing]
): Pipe[F, Byte, Byte] =
  source =>
    Stream.resource(Hashing[F].hasher(algorithm)).flatMap:
      hasher =>
        source.chunks.evalTap(c => hasher.update(c)).unchunks.through(sink) ++ Stream.evalUnChunk(hasher.hash)

This function takes a sink that operates on bytes and returns a pipe that accepts bytes and emits a single hash as a chunk upon completion. The implementation sends the source to the sink while observing the chunks that pass between them, emitting the computed hash at completion. This combinator lets us implement our original use case of writing a file and writing a hash in a single pass:

def writeFileAndHash[F[_]: Files: Hashing: MonadCancelThrow](
  source: Stream[F, Byte],
  path: Path
): Stream[F, Nothing] =
  source
    .through(observe(HashAlgorithm.SHA256, Files[F].writeAll(path)))
    .through(Files[F].writeAll(Path(path.toString + ".sha256")))

fs2.hashing

The fs2 3.11 release introduced the fs2.hashing package, which builds upon this general technique. The Hashing[F] capability trait is defined like this:

sealed trait Hashing[F[_]]:
  def hasher(algorithm: HashAlgorithm): Resource[F, Hasher[F]]
  def hmac(algorithm: HashAlgorithm, key: Chunk[Byte]): Resource[F, Hasher[F]]
  def hash(algorithm: HashAlgorithm): Pipe[F, Byte, Hash]
  def hashWith(hasher: Resource[F, Hasher[F]]): Pipe[F, Byte, Hash]

In addition to hasher, the Hashing trait supports HMAC. The hash and hashWith operations provide pipes that compute hashes of byte streams. The output type is Hash instead of Byte. The Hash type is a simple wrapper over a Chunk[Byte] that has a nice toString and a constant time equals implementation (to avoid timing attacks).
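For instance, hmac and hashWith compose directly. Here's a hedged sketch (hmacSha256 is a made-up helper) of a pipe that consumes a byte stream and emits a single HMAC-SHA256 tag:

import fs2.{Chunk, Pipe}
import fs2.hashing.{Hash, HashAlgorithm, Hashing}

// Hypothetical helper: authenticate a byte stream with the given key,
// emitting the resulting tag as a single Hash upon source completion.
def hmacSha256[F[_]: Hashing](key: Chunk[Byte]): Pipe[F, Byte, Hash] =
  Hashing[F].hashWith(Hashing[F].hmac(HashAlgorithm.SHA256, key))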

The Hasher type is a bit more interesting:

trait Hasher[F[_]]:
  def update(bytes: Chunk[Byte]): F[Unit]
  def hash: F[Hash]

  protected def unsafeUpdate(chunk: Chunk[Byte]): Unit
  protected def unsafeHash(): Hash

  def update: Pipe[F, Byte, Byte] =
    _.mapChunks: c =>
      unsafeUpdate(c)
      c

  def observe(sink: Pipe[F, Byte, Nothing]): Pipe[F, Byte, Hash] =
    source => sink(update(source)) ++ Stream.eval(hash)

  def drain: Pipe[F, Byte, Hash] = observe(_.drain)

  def verify(expected: Hash): Pipe[F, Byte, Byte] =
    source =>
      update(source)
        .onComplete(
          Stream
            .eval(hash)
            .flatMap: actual =>
              if actual == expected then Stream.empty
              else Pull.fail(HashVerificationException(expected, actual)).streamNoScope
        )

Besides the fundamental update and hash operations, there are internal unsafeUpdate and unsafeHash operations and a number of pipes. The update, observe, and drain pipes are all things we've seen already: update returns a pipe that updates the hasher as chunks are pulled, observe does the same for chunks pulled by a sink and emits the computed hash, and drain uses observe with a draining sink, resulting in the behavior of our original hash pipe. The verify pipe is new -- it acts as an identity pipe but raises an error if the source doesn't hash to the expected value.
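As a quick illustration of verify, here's a hedged sketch (readVerified is a made-up helper) that emits a file's bytes and raises a HashVerificationException if they don't hash to an expected value:

import cats.effect.MonadCancelThrow
import fs2.Stream
import fs2.hashing.{Hash, HashAlgorithm, Hashing}
import fs2.io.file.{Files, Path}

// Hypothetical helper: an identity stream over the file's bytes that fails
// at completion if the contents don't match the expected hash.
def readVerified[F[_]: Hashing: Files: MonadCancelThrow](
  path: Path, expected: Hash
): Stream[F, Byte] =
  Stream.resource(Hashing[F].hasher(HashAlgorithm.SHA256)).flatMap: h =>
    Files[F].readAll(path).through(h.verify(expected))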

Writing a file and its hash with fs2.hashing looks like this:

import cats.effect.MonadCancelThrow
import fs2.{Stream, Pipe}
import fs2.hashing.{Hashing, HashAlgorithm}
import fs2.io.file.{Files, Path}

def writeFileAndHash[F[_]: Hashing: Files: MonadCancelThrow](path: Path): Pipe[F, Byte, Nothing] =
  source =>
    // Create a hasher
    Stream.resource(Hashing[F].hasher(HashAlgorithm.SHA3_256)).flatMap: h =>
      source
        // Write source to file, updating the hash with observed bytes
        .through(h.observe(Files[F].writeAll(path)))
        // Write digest to separate file
        .map(_.bytes)
        .unchunks
        .through(Files[F].writeAll(Path(path.toString + ".sha256")))

What about hashing pure streams though? The operations in fs2.hash weren't limited to effectful streams, but we can't define a Hashing[Pure] instance because hashing relies on mutable state internally. To replace this functionality, Hashing defines the hashPureStream operation on its companion:

val source: Stream[Pure, Byte] = Stream.chunk(Chunk.array("The quick brown fox".getBytes))
// source: Stream[Pure, Byte] = Stream(..)
val hashed = Hashing.hashPureStream(HashAlgorithm.SHA256, source)
// hashed: Hash = 5cac4f980fedc3d3f1f99b4be3472c9b30d56523e632d151237ec9309048bda9

This is internally implemented using SyncIO and the hash operation:

def hashPureStream(algorithm: HashAlgorithm, source: Stream[Pure, Byte]): Hash =
  source.through(Hashing[SyncIO].hash(algorithm)).compile.lastOrError.unsafeRunSync()

The old fs2.hash implementations played a similar trick. They didn't directly use SyncIO but rather used carefully placed side effects to avoid needing typeclass constraints.
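A rough sketch of that trick (not the actual fs2 source): suspend creation of the mutable digest inside the stream so each run gets a fresh instance, then mutate it as chunks flow by.

import java.security.MessageDigest
import fs2.{Chunk, Pipe, Stream}

// Sketch only. Because the digest is created inside Stream.suspend, every
// run of the stream gets its own MessageDigest, so no constraint on F is
// needed and the pipe works for Pure streams too. The mutation inside map
// is the "carefully placed" side effect.
def digestPipe[F[_]](algorithm: String): Pipe[F, Byte, Byte] =
  source =>
    Stream.suspend:
      val digest = MessageDigest.getInstance(algorithm)
      source.chunks.map(c => digest.update(c.toArray)).drain ++
        Stream.chunk(Chunk.array(digest.digest()))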

Conclusion

The hashing support in fs2.hash is conceptually elegant but ultimately the wrong abstraction. The API is too opaque to do anything besides hashing a stream. In this article, we took a tour of refactoring this API into something with much more utility.

Special thanks to:

  • Antonio Gelameris for inspiring this new API with a question on the fs2 Discord channel
  • i10416 for inspiring addition of SHA3 and HMAC support
  • Arman Bilge for API suggestions and code reviews