Middleware to rewrite request URIs using Consul for service discovery

http4s Consul Middleware

Middleware for http4s to allow users to discover the host and port for an HTTP request using Consul service discovery.

The middleware rewrites URIs of the form consul://service-name/path to an HTTP URI, using Consul to discover an available instance of the service service-name. For example, if Consul knew about a service named httpd available at 127.0.0.1:80, a request made to consul://httpd/example/path would be rewritten and requested from http://127.0.0.1:80/example/path.
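As a rough illustration of the rewrite rule described above, the following sketch uses only java.net.URI rather than the middleware's own types. ServiceInstance, rewrite, and the lookup function are hypothetical names standing in for a Consul lookup. Leaving the URI unchanged when no instance is found is an assumption of this sketch, not necessarily what the middleware does.

```scala
import java.net.URI

object RewriteSketch {
  // Hypothetical stand-in for a service instance discovered via Consul
  final case class ServiceInstance(host: String, port: Int)

  // Rewrites consul://<service>/<path> to http://<host>:<port>/<path>.
  // URIs with any other scheme, or with no known instance, are returned unchanged.
  def rewrite(uri: URI, lookup: String => Option[ServiceInstance]): URI =
    if (uri.getScheme != "consul") uri
    else
      Option(uri.getHost).flatMap(lookup) match {
        case Some(ServiceInstance(host, port)) =>
          // keep the path, query, and fragment; swap scheme, host, and port
          new URI("http", uri.getUserInfo, host, port, uri.getPath, uri.getQuery, uri.getFragment)
        case None => uri
      }
}
```

With a lookup that knows httpd lives at 127.0.0.1:80, calling RewriteSketch.rewrite on consul://httpd/example/path yields http://127.0.0.1:80/example/path, matching the example in the text.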

Usage

http4s middleware wraps an underlying Client[F], so you need to have such a client available. Consul's API uses "blocking queries", implemented via long polling, to notify listeners as soon as its internal state changes. Because those requests stay open for a long time by design, we recommend constructing a second Client[F] with a much longer timeout for talking to Consul.

import cats.effect._, cats.syntax.all._, cats.effect.std._
import fs2.Stream
import org.http4s._, org.http4s.Method.GET, org.http4s.client.Client, org.http4s.ember.client.EmberClientBuilder, org.http4s.syntax.all._, org.http4s.client.dsl.io._
import org.typelevel.log4cats.Logger, org.typelevel.log4cats.slf4j.Slf4jLogger
import scala.concurrent.duration._
import com.dwolla.http4s.consul._
import cats.effect.unsafe.implicits.global

val exampleConsulUri = uri"consul://httpd/"

def clientWithTimeout[F[_] : Async](timeout: FiniteDuration): Resource[F, Client[F]] =
  EmberClientBuilder.default[F].withTimeout(timeout).build

def longPollClient[F[_] : Async]: Resource[F, Client[F]] = clientWithTimeout(10.minutes)

def typicalClient[F[_] : Async]: Resource[F, Client[F]] = clientWithTimeout(20.seconds)

def consulServiceDiscoveryAlg[F[_] : Async : Logger : Random]: Resource[F, ConsulServiceDiscoveryAlg[F]] =
  longPollClient[F].map(ConsulServiceDiscoveryAlg(uri"http://localhost:8500", 1.minute, _))

def consulAwareClient[F[_] : Async : Logger : Random]: Resource[F, Client[F]] =
  (consulServiceDiscoveryAlg[F], typicalClient[F])
    .parMapN(ConsulMiddleware(_)(_))
    .flatten

Random.scalaUtilRandom[IO].flatMap { implicit random =>
  Slf4jLogger.create[IO].flatMap { implicit logger =>
    // in a real app you'd probably want a real Trace implementation
    import natchez.Trace.Implicits.noop

    Stream.resource(consulAwareClient[IO])
      .flatMap { client =>
        // make a GET call to consul://httpd/, every 2 seconds, until shut down
        //
        // if you change the state of the Consul cluster by registering
        // or de-registering services named "httpd", you should see the
        // requests going to different endpoints over time.
        
        Stream
          .repeatEval {
            client.successful(GET(exampleConsulUri)).flatMap {
              case true  => Logger[IO].info("🔮 success")
              case false => Logger[IO].info("🔮 failure")
            }
          }
          .metered(2.seconds)
      }
      .compile
      .drain
  }
}
  .unsafeRunSync()
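
To make the need for a long-timeout client more concrete, here is a small sketch of how a Consul blocking query is addressed. Consul's HTTP API accepts index and wait query parameters and reports the current index in the X-Consul-Index response header. The choice of the /v1/health/service endpoint and the blockingQueryUri helper are assumptions made for illustration; the middleware may build its requests differently.

```scala
import java.net.URI
import scala.concurrent.duration._

object BlockingQuerySketch {
  // Builds the URI for a blocking query against a Consul agent.
  // lastIndex is the X-Consul-Index value from the previous response, if any;
  // without it, Consul answers immediately instead of blocking.
  def blockingQueryUri(
      consulBase: URI,
      service: String,
      lastIndex: Option[Long],
      wait: FiniteDuration
  ): URI = {
    val indexParam = lastIndex.fold("")(i => s"&index=$i")
    // Consul expects durations such as "60s" or "5m"; seconds are used here
    consulBase.resolve(s"/v1/health/service/$service?wait=${wait.toSeconds}s$indexParam")
  }
}
```

A request to such a URI can stay open for roughly the full wait duration before Consul responds, which is why the client used for service discovery needs a timeout comfortably longer than the wait value.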