
Scala / Akka library for the Kraken exchange API.

Reactive Kraken

Scala client based on Akka to help connect reactively to the Kraken API.

Main features:

  1. Signing functionality
  2. REST API (public and private endpoints), based on Future
  3. Websocket API - based on Akka Stream

Signing functionality only

If you only need to compute the request signature, you can simply use:

val signature = Signer.getSignature(path, nonce, postData, apiSecret)

See the Signer or an example usage below.
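
For context, Kraken's API documentation describes the signature for private endpoints as an HMAC-SHA512 over the URI path followed by the SHA-256 digest of the nonce and POST data, keyed with the base64-decoded API secret and encoded as base64. The sketch below reproduces that scheme with the JDK only, to illustrate what the four arguments feed into. It is not the library's Signer: the object name SignatureSketch, the sign helper, and the parameter types (a Long nonce and a url-encoded String body) are assumptions made for illustration.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.security.MessageDigest
import java.util.Base64
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

// Hypothetical helper for illustration only; not part of reactive-kraken.
object SignatureSketch {

  def sign(path: String, nonce: Long, postData: String, apiSecret: String): String = {
    // Step 1: SHA-256 over the nonce concatenated with the url-encoded POST body
    val digest = MessageDigest
      .getInstance("SHA-256")
      .digest((nonce.toString + postData).getBytes(UTF_8))

    // Step 2: HMAC-SHA512 keyed with the base64-decoded secret,
    // computed over the path bytes followed by the digest from step 1
    val mac = Mac.getInstance("HmacSHA512")
    mac.init(new SecretKeySpec(Base64.getDecoder.decode(apiSecret), "HmacSHA512"))
    val hmac = mac.doFinal(path.getBytes(UTF_8) ++ digest)

    // Step 3: base64-encode the result for use in the request header
    Base64.getEncoder.encodeToString(hmac)
  }
}
```

With a placeholder secret such as "c2VjcmV0" (the base64 encoding of "secret"), SignatureSketch.sign("/0/private/Balance", 1L, "nonce=1", "c2VjcmV0") returns a base64 string. The same inputs always give the same signature, and a different nonce gives a different one.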

REST API usage

Use the HttpPublicApi and the HttpPrivateApi.

Each method fires an HTTP request and returns a Future[T], where T is the type of message returned by each endpoint. All such types are in the model package.

See below for example usages.
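
Because every endpoint returns a Future, calls can be combined without blocking until the very end. The sketch below shows one way to do that with plain Scala futures. fetchTicker and fetchOrderBook are hypothetical stand-ins that return strings. The real methods return the types from the model package, so treat this as an illustration of the composition pattern rather than of the library's API.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.control.NonFatal

object FutureCompositionSketch {

  // Hypothetical stand-ins for two endpoint calls (not library methods).
  def fetchTicker(base: String, quote: String): Future[String] =
    Future.successful(s"ticker($base/$quote)")

  def fetchOrderBook(base: String, quote: String): Future[String] =
    Future.failed(new RuntimeException("request timed out"))

  def run(): (String, String) = {
    // Start both requests before composing them, so they run concurrently.
    val tickerF = fetchTicker("ETH", "EUR")
    val bookF = fetchOrderBook("ETH", "EUR").recover {
      // Turn a failed request into a fallback value instead of failing the whole chain.
      case NonFatal(e) => s"order book unavailable: ${e.getMessage}"
    }

    val combined: Future[(String, String)] = for {
      ticker <- tickerF
      book   <- bookF
    } yield (ticker, book)

    // Block only at the edge of the program, as the full example does with Await.
    Await.result(combined, 5.seconds)
  }
}
```

Calling FutureCompositionSketch.run() yields the ticker string together with the fallback text for the failed order book request.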

Websocket API usage

You can open a websocket connection using the following method in the WebsocketPublicApi object, which is based on Akka Streams' Source and Sink.

def openConnection[Mat](source: Source[KrakenWsMessage, Mat],
                        sink: Sink[KrakenWsMessage, Future[Done]],
                        actorSystem: ActorSystem = ActorSystem("reactive-kraken"))

See the full example below for a complete walkthrough.

Full example

This is an example that you can copy/paste to test the library.

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import io.ticofab.reactivekraken.http.v0.{KrakenPrivateApi, KrakenPublicApi}
import io.ticofab.reactivekraken.websocket.v01.WebsocketPublicApi
import io.ticofab.reactivekraken.websocket.v01.model.Subscription._
import io.ticofab.reactivekraken.websocket.v01.model._

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}

object TestApp extends App {

  // necessary implicits for Akka Http
  implicit val as = ActorSystem("wsTest")
  implicit val ec = as.dispatcher
  implicit val am = ActorMaterializer()

  // section about the HTTP API
  def awaitAndPrint[T](f: Future[T]): Unit = println(Await.result(f, 10.seconds))

  val publicApi = new KrakenPublicApi(as)
  awaitAndPrint(publicApi.getCurrentAssetPair("ETH", "EUR"))
  awaitAndPrint(publicApi.getCurrentTicker("ETH", "EUR"))
  awaitAndPrint(publicApi.getOHLC("ETH", "EUR"))
  awaitAndPrint(publicApi.getOrderBook("ETH", "EUR"))
  awaitAndPrint(publicApi.getRecentTrades("ETH", "EUR"))
  awaitAndPrint(publicApi.getRecentSpread("ETH", "EUR"))

  // your Kraken credentials
  val apiKey     = "YOUR_API_KEY"
  val apiSecret  = "YOUR_API_SECRET"
  val privateApi = new KrakenPrivateApi(apiKey, apiSecret, () => System.currentTimeMillis(), as)


  // this block returns a source which will publish any message sent to the publisher actorRef.
  // every message emitted by this source will be sent up to the websocket API. 
  // when you want to change something in your subscription, send the appropriate message to the publisher actor.
  val (publisher, source) = {
    val (actor, publisher) = Source
      .actorRef[KrakenWsMessage](100, OverflowStrategy.dropBuffer)
      .toMat(Sink.asPublisher(fanout = false))(Keep.both)
      .run() // materialize the graph to obtain the actor ref and the publisher
    val source = Source.fromPublisher(publisher)
    (actor, source)
  }

  // this sink will receive all messages coming from the websocket API.
  // in this example, we simply print messages out.
  val sink                = Sink.foreach[KrakenWsMessage](println)

  // this method returns futures for
  //   . connection establishment
  //   . connection closure
  val (futureConnected, futureClosed) = WebsocketPublicApi.openConnection(source, sink, as)

  // once the connection has been established, we subscribe to a variety of topics
  futureConnected.onComplete {
    case Success(_) =>
      println(s"websocket connected!")
      publisher ! Ping(Some(89))
      publisher ! Subscribe(List(CurrencyPair("ETH", "EUR")), Subscription(TopicOHLC))
      publisher ! Subscribe(List(CurrencyPair("ETH", "EUR")), Subscription(TopicSpread))
      publisher ! Subscribe(List(CurrencyPair("ETH", "EUR")), Subscription(TopicTrade))
      publisher ! Subscribe(List(CurrencyPair("ETH", "EUR")), Subscription(TopicBook))
      publisher ! Subscribe(List(CurrencyPair("ETH", "EUR")), Subscription(TopicTicker))

      // after 10 seconds, unsubscribe from all topics except book
      as.scheduler.scheduleOnce(10.seconds) {
        publisher ! Unsubscribe(List(CurrencyPair("ETH", "EUR")), Some(Subscription(TopicOHLC)))
        publisher ! Unsubscribe(List(CurrencyPair("ETH", "EUR")), Some(Subscription(TopicSpread)))
        publisher ! Unsubscribe(List(CurrencyPair("ETH", "EUR")), Some(Subscription(TopicTrade)))
        publisher ! Unsubscribe(List(CurrencyPair("ETH", "EUR")), Some(Subscription(TopicTicker)))

        // after 5 additional seconds, close connection client side.
        // Status.Success is a case class, so it needs a payload to complete the source.
        as.scheduler.scheduleOnce(5.seconds, publisher, akka.actor.Status.Success(akka.Done))
      }

    // there was some error connecting
    case Failure(error) => println(s"error connecting: ${error.getMessage}")
  }

  // once the connection has been closed, we shut down the entire actor system.
  futureClosed.foreach { _ =>
    println("connection closed, shutting down.")
    as.terminate()
  }
}
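
The example above passes () => System.currentTimeMillis() as the nonce generator for the private API. Kraken expects the nonce to keep increasing for a given API key, and two requests issued within the same millisecond would share a timestamp. A minimal sketch of a strictly increasing generator follows. MonotonicNonce is a hypothetical helper, and it assumes the constructor accepts any () => Long.

```scala
import java.util.concurrent.atomic.AtomicLong
import java.util.function.LongUnaryOperator

// Hypothetical helper for illustration only; not part of reactive-kraken.
object MonotonicNonce {
  private val last = new AtomicLong(0L)

  // Uses the current time when it has advanced, otherwise the previous nonce plus one.
  // An explicit LongUnaryOperator keeps the code compatible with Scala 2.11.
  private val advance = new LongUnaryOperator {
    override def applyAsLong(previous: Long): Long =
      math.max(System.currentTimeMillis(), previous + 1)
  }

  // Thread-safe: updateAndGet retries atomically under contention.
  def next(): Long = last.updateAndGet(advance)
}
```

Under that assumption, it could be passed in place of the lambda as () => MonotonicNonce.next().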

Import via SBT

Available for Scala 2.11 and 2.12. In your build.sbt file, add:

resolvers += Resolver.jcenterRepo // you might not need this line
libraryDependencies += "io.ticofab" %% "reactive-kraken" % "1.0.0"



Contributions are most welcome. Please use the Issues section of this project and fire PRs away!


