/reactive-memcached

Akka-Stream based Memached Client for Scala

Primary LanguageScalaMIT LicenseMIT

reactive-mecached

CircleCI Maven Central Scaladoc Codacy Badge License: MIT

Akka-Stream based Memcached Client for Scala

Concept

  • Transport is akka-stream 2.5.x.
  • Response parser is fastparse.
  • monix.eval.Task support.

Support Protocol

https://github.com/memcached/memcached/blob/master/doc/protocol.txt

  • Supported commands
    • "set", "add", "replace", "cas", "delete", "get", "gets", "gat", "gats", "touch", "version"
  • Not supported commands
    • "append", "prepend", "slabs reassign", "slabs automove", "lru", "lru_crawler", "watch", "stats", "STAT items"

Installation

Add the following to your sbt build (Scala 2.11.x, 2.12.x):

Release Version

resolvers += "Sonatype OSS Release Repository" at "https://oss.sonatype.org/content/repositories/releases/"

libraryDependencies += "com.github.j5ik2o" %% "reactive-memcached-core" % "1.0.6"

Snapshot Version

resolvers += "Sonatype OSS Snapshot Repository" at "https://oss.sonatype.org/content/repositories/snapshots/"

libraryDependencies += "com.github.j5ik2o" %% "reactive-memcached-core" % "1.0.7-SNAPSHOT"

Usage

Non connection pooling

import monix.execution.Scheduler.Implicits.global

implicit val system = ActorSystem()

val peerConfig = PeerConfig(remoteAddress = new InetSocketAddress("127.0.0.1", 6379))
val connection = MemcachedConnection(peerConfig)
val client = MemcachedClient()

val result = (for{
  _ <- client.set("foo", "bar")
  r <- client.get("foo")
} yield r).run(connection).runAsync

println(result) // bar

Connection pooling

import monix.execution.Scheduler.Implicits.global

implicit val system = ActorSystem()

val peerConfig = PeerConfig(remoteAddress = new InetSocketAddress("127.0.0.1", 6379))
val pool = MemcachedConnectionPool.ofSingleRoundRobin(sizePerPeer = 5, peerConfig, RedisConnection(_)) // powered by RoundRobinPool
val connection = MemcachedConnection(connectionConfig)
val client = MemcachedClient()

// Fucntion style
val result1 = pool.withConnectionF{ con =>
  (for{
    _ <- client.set("foo", "bar")
    r <- client.get("foo")
  } yield r).run(con) 
}.runAsync

println(result1) // bar

// Monadic style
val result2 = (for {
  _ <- ConnectionAutoClose(pool)(client.set("foo", "bar").run)
  r <- ConnectionAutoClose(pool)(client.get("foo").run)
} yield r).run().runAsync

println(result2) // bar

if you want to use other pooling implementation, please select from the following modules.

  • reactive-memcached-pool-commons (commons-pool2)
  • reactive-memcached-pool-scala (scala-pool)
  • reactive-memcached-pool-fop (fast-object-pool)
  • reactive-memcached-pool-stormpot (stormpot)

Hash ring connection

import monix.execution.Scheduler.Implicits.global

implicit val system = ActorSystem()

val peerConfigs = Seq(
  PeerConfig(remoteAddress = new InetSocketAddress("127.0.0.1", 6380)),
  PeerConfig(remoteAddress = new InetSocketAddress("127.0.0.1", 6381)),
  PeerConfig(remoteAddress = new InetSocketAddress("127.0.0.1", 6382))
)

val connection = HashRingConnection(peerConfigs)

val client = MemcachedClient()

val result = (for{
  _ <- client.set("foo", "bar") // write to master
  r <- client.get("foo")        // read from any slave
} yield r).run(connection).runAsync

println(result) // bar

License

MIT License / Copyright (c) 2018 Junichi Kato