Algae is a Scala library providing a diverse group of final tagless algebras.
Algae (/ˈældʒi, ˈælɡi/; singular alga /ˈælɡə/) is an informal term for a large, diverse group of photosynthetic eukaryotic organisms that are not necessarily closely related, and is thus polyphyletic.
Algae is a new project under active development. Feedback and contributions are welcome.
Algae defines final tagless algebras around common capabilities, such as Logging and Counting. The core library makes use of cats, cats-mtl, and cats-effect, while modules use external libraries to implement, complement, and define extra algebras. Algae also defines supporting constructs, such as type classes, immutable data, and pure functions.
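If you're unfamiliar with the final tagless style, the idea is that each capability is a trait parameterized on an abstract effect type F[_], and programs are written against those traits rather than against a concrete effect. Below is a minimal illustrative sketch of the style; the Greeting algebra is hypothetical and not part of Algae.

// A hypothetical final tagless algebra: the capability is a trait
// parameterized on an abstract effect type F[_].
trait Greeting[F[_]] {
  def greet(name: String): F[Unit]
}

// Programs depend only on the algebra, not on a concrete effect,
// so they can later run in IO or any other suitable F.
def program[F[_]](greeting: Greeting[F]): F[Unit] =
  greeting.greet("world")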
To get started with sbt, simply add the following lines to your build.sbt file.
val algaeVersion = "0.1.7"
resolvers += Resolver.bintrayRepo("ovotech", "maven")
libraryDependencies += "com.ovoenergy" %% "algae-core" % algaeVersion
The Config algebra implements basic configuration using environment variables and system properties. An implementation using Ciris is defined as CirisConfig, and there's a KubernetesConfig algebra for Kubernetes secrets support. There is also an AivenKafkaConfig algebra using ciris-aiven-kafka. Following are the relevant modules and the lines you need in build.sbt to include them in your project.
libraryDependencies ++= Seq(
  "com.ovoenergy" %% "algae-ciris" % algaeVersion,
  "com.ovoenergy" %% "algae-ciris-kubernetes" % algaeVersion,
  "com.ovoenergy" %% "algae-ciris-aiven-kafka" % algaeVersion
)
To create an instance of CirisConfig, simply import algae.ciris._ and use createCirisConfig.
import algae.ciris._
import cats.MonadError
import cats.effect.IO
import ciris.loadConfig
import ciris.cats.effect._
final case class Config(appEnv: String, maxRetries: Int)
def loadConfiguration[F[_]](config: CirisConfig[F])(
  implicit F: MonadError[F, Throwable]
): F[Config] = {
  loadConfig(
    config.env[String]("APP_ENV"),
    config.prop[Int]("max.retries")
  )(Config.apply).orRaiseThrowable
}
val cirisConfig = createCirisConfig[IO]
loadConfiguration(cirisConfig)
To create an instance of KubernetesConfig, simply import algae.ciris.kubernetes._ and use either:

- createDefaultKubernetesConfig to use the default configuration options, or
- createKubernetesConfig to customize the API client and authenticators to use.
import algae.ciris.kubernetes._
// Imports shared with the previous example, repeated here so the
// snippet is self-contained.
import cats.MonadError
import cats.effect.{ContextShift, IO}
import ciris.Secret
import ciris.cats.effect._
import ciris.loadConfig

implicit val contextShift: ContextShift[IO] =
  IO.contextShift(scala.concurrent.ExecutionContext.global)
final case class Config(appEnv: String, maxRetries: Int, apiKey: Secret[String])
def loadConfiguration[F[_]](config: KubernetesConfig[F])(
  implicit F: MonadError[F, Throwable]
): F[Config] = {
  loadConfig(
    config.env[String]("APP_ENV"),
    config.prop[Int]("max.retries"),
    config.secret[Secret[String]]("namespace", "api-key")
  )(Config.apply).orRaiseThrowable
}
createDefaultKubernetesConfig[IO].
  flatMap(loadConfiguration[IO])
To create an instance of AivenKafkaConfig, simply import algae.ciris.aiven.kafka._ and use createAivenKafkaConfig.
import algae.ciris.aiven.kafka._
val aivenKafkaConfig: AivenKafkaConfig[IO] =
  createAivenKafkaConfig[IO]
The Counting algebra implements counting and count accumulation. Accumulation is supported via the MonadLog type class, and createMonadLog is available to create a MonadLog using a Ref[F]. An implementation of Counting using Kamon is provided. If count accumulation isn't necessary, CountingNow can be used. Additionally, functions for configuring Kamon modules are available.
libraryDependencies ++= Seq(
  "com.ovoenergy" %% "algae-kamon" % algaeVersion,
  "com.ovoenergy" %% "algae-kamon-influxdb" % algaeVersion,
  "com.ovoenergy" %% "algae-kamon-system-metrics" % algaeVersion
)
Start by defining your counter increments. It's recommended to use a coproduct, like a sealed abstract class. This keeps your counter increments in a single place, and the logic for what is counted is encapsulated in the increments. A CounterIncrement consists of a counter name, a map of tags, and the number of times to increment.
import algae.counting._
object counts {
  sealed abstract class Count(
    val counterName: String,
    val tags: Map[String, String],
    val times: Long
  )

  case object ApplicationStarted
    extends Count("application_started", Map.empty, 1L)

  final case class SaidHello(override val times: Long)
    extends Count("said_hello", Map.empty, times)

  implicit val countCounterIncrement: CounterIncrement[Count] =
    CounterIncrement.from(_.counterName, _.tags, _.times)
}
To create an instance of Counting, we'll need a MonadLog instance. We'll use IO and create an instance from a Ref, using createMonadLog. We need to choose a collection type, and Chain is a good default choice here because it supports constant-time append. We also set up reporting and system metrics collection using the provided functions.
import algae._
import algae.kamon._
import algae.kamon.influxdb._
import algae.kamon.system._
import cats.data.Chain
import counts._
val kamonSetup =
  influxDbRegistration[IO].
    flatMap(_ => systemMetricsCollection[IO])

kamonSetup.use { _ =>
  for {
    counts <- createMonadLog[IO, Chain[Count]]
    counting = createCounting(counts)
    _ <- counting.countNow(ApplicationStarted)
    _ <- counting.count(SaidHello(1L))
    _ <- counting.count(SaidHello(1L))
    _ <- counting.dispatchCounts
  } yield ()
}
The example above immediately counts ApplicationStarted and then counts SaidHello twice when dispatchCounts is invoked. After dispatching counts with dispatchCounts, accumulated counts are cleared. It's worth noting that the counter increments are stored in a separate Ref, so even if part of your program fails, any counts are still available, as the sketch below shows.
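As a sketch of that behavior (reusing the definitions above; the error handling is plain cats-effect, not an Algae-specific API), a failure in one part of the program doesn't lose counts accumulated elsewhere.

for {
  counts <- createMonadLog[IO, Chain[Count]]
  counting = createCounting(counts)
  _ <- counting.count(SaidHello(1L))
  // A failing step, recovered with cats-effect's attempt; the count
  // accumulated above lives in its own Ref and is unaffected.
  _ <- IO.raiseError[Unit](new RuntimeException("boom")).attempt
  // SaidHello is still accumulated and gets dispatched here.
  _ <- counting.dispatchCounts
} yield ()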
The algae-fs2-kafka module provides KafkaConsumer and KafkaProducer algebras and implements them using fs2-kafka.
libraryDependencies += "com.ovoenergy" %% "algae-fs2-kafka" % algaeVersion
To create a KafkaConsumer, you can use these functions:

- createKafkaConsumerStream[F, K, V](settings), or
- createKafkaConsumerStream[F].using(settings),

or one of the following, where a new default ExecutionContext will be created:

- createDefaultKafkaConsumerStream[F, K, V](settings), or
- createDefaultKafkaConsumerStream[F].using(settings).
To create a KafkaProducer, use one of the following functions:

- createKafkaProducerStream[F, K, V](settings), or
- createKafkaProducerStream[F].using(settings).
Following is an example of how to create a KafkaConsumer and KafkaProducer.
import cats.data.NonEmptyList
import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.functor._
import cats.syntax.traverse._
import algae.fs2.kafka._
import _root_.fs2.kafka._
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
object Main extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val consumerSettings = (executionContext: ExecutionContext) =>
      ConsumerSettings(
        keyDeserializer = new StringDeserializer,
        valueDeserializer = new StringDeserializer,
        executionContext = executionContext
      ).withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withBootstrapServers("localhost")
        .withGroupId("group")

    val producerSettings =
      ProducerSettings(
        keySerializer = new StringSerializer,
        valueSerializer = new StringSerializer
      ).withBootstrapServers("localhost")

    val topics =
      NonEmptyList.one("topic")

    def processRecord(record: ConsumerRecord[String, String]): IO[(String, String)] =
      IO.pure(record.key -> record.value)

    val stream =
      for {
        consumer <- createDefaultKafkaConsumerStream[IO].using(consumerSettings)
        producer <- createKafkaProducerStream[IO].using(producerSettings)
        _ <- consumer.subscribe(topics)
        _ <- consumer.stream
          .mapAsync(25) { message =>
            processRecord(message.record).map {
              case (key, value) =>
                val record = new ProducerRecord("topic", key, value)
                ProducerMessage.single(record, message.committableOffset)
            }
          }
          .evalMap(producer.produceBatched)
          .map(_.map(_.passthrough))
          .to(commitBatchWithinF(500, 15.seconds))
      } yield ()

    stream.compile.drain.as(ExitCode.Success)
  }
}
The Logging algebra implements log accumulation and dispatching of log messages, with support for diagnostic contexts. This is done via the MonadLog type class, which is a thin wrapper around MonadState, with additional laws governing log accumulation. If you're working with a Sync[F] context, a Ref[F] can be used to implement MonadLog, and there's a createMonadLog helper function for exactly that. If log accumulation isn't necessary, LoggingNow can be used.
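As an intuition for the shape of the type class, here is a simplified, hypothetical sketch of the operations MonadLog provides; the actual definition in Algae differs in its details, but the accumulate/read/clear cycle is the essence.

// A hypothetical, simplified sketch of the MonadLog shape
// (not Algae's actual definition).
trait MonadLog[F[_], G] {
  def get: F[G]              // read the accumulated entries
  def log(entry: G): F[Unit] // append an entry to the accumulated state
  def clear: F[Unit]         // reset the state, e.g. after dispatching
}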
If you want slf4j logging support, simply add the algae-slf4j module to your dependencies in build.sbt. The algae-logback module adds Logback as a dependency, for convenience when you want to use Logback as your slf4j binding.
libraryDependencies ++= Seq(
  "com.ovoenergy" %% "algae-slf4j" % algaeVersion,
  "com.ovoenergy" %% "algae-logback" % algaeVersion
)
Start by defining your log entries. It's recommended to use a coproduct, like a sealed trait. This keeps your log entries in a single place, and the logic for what is logged is encapsulated in the log entries. A log entry consists of a LogLevel and a String message. We define a LogEntry instance for our coproduct, marking it as a log entry.
import algae._
import algae.logging._
object entries {
  sealed trait Log {
    def level: LogLevel
    def message: String
  }

  case object ApplicationStarted extends Log {
    def level = LogLevel.Info
    def message = "Application started"
  }

  case object HelloWorld extends Log {
    def level = LogLevel.Info
    def message = "Hello, world"
  }

  implicit val logLogEntry: LogEntry[Log] =
    LogEntry.from(_.level, _.message)
}
We do the same for the diagnostic context, by also defining a coproduct as a sealed abstract class. We implement an instance of MdcEntry for our coproduct, to define it as an entry for diagnostic contexts.
object mdc {
  sealed abstract class Mdc(val key: String, val value: String)

  final case class TraceToken(override val value: String)
    extends Mdc("traceToken", value)

  implicit val mdcMdcEntry: MdcEntry[Mdc] =
    MdcEntry.from(_.key, _.value)
}
To create an instance of Logging, we'll need a MonadLog instance. We'll use IO and create an instance from a Ref, using createMonadLog. We need to choose a collection type, and Chain is a good default choice here because it supports constant-time append.
The algae-slf4j module defines a createLogging function which creates a Logging instance given a MonadLog instance, and dispatches log messages using slf4j bindings. We can use log to accumulate log entries, and later dispatch them with dispatchLogs. Using logNow we can immediately dispatch the given log entries as a message.
import algae.slf4j._
import cats.data.Chain
import cats.effect.IO
import entries._
import mdc._
for {
  logs <- createMonadLog[IO, Chain[Log]]
  logging <- createLogging[IO, Chain, Log, Mdc]("App", logs)
  _ <- logging.logNow(ApplicationStarted)
  _ <- logging.log(HelloWorld)
  _ <- logging.log(HelloWorld)
  _ <- logging.dispatchLogs
} yield ()
The example above immediately logs ApplicationStarted and then logs a combined message containing HelloWorld twice. After dispatching logs with dispatchLogs, accumulated logs are cleared. It's worth noting that the log entries are stored in a separate Ref, so even if part of your program fails, any logged messages are still available, as the sketch below shows.
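As a sketch of that behavior (reusing the definitions above; the error handling is plain cats-effect, not an Algae-specific API):

for {
  logs <- createMonadLog[IO, Chain[Log]]
  logging <- createLogging[IO, Chain, Log, Mdc]("App", logs)
  _ <- logging.log(HelloWorld)
  // A failing step, recovered with cats-effect's attempt; the entry
  // accumulated above lives in its own Ref and is unaffected.
  _ <- IO.raiseError[Unit](new RuntimeException("boom")).attempt
  // HelloWorld is still accumulated and gets dispatched here.
  _ <- logging.dispatchLogs
} yield ()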