scullxbones/akka-persistence-mongo

ActorRef serialization: transportInformation should be set

gbrd opened this issue · 10 comments

gbrd commented

Please see akka/akka#24321 for details.
I will try to submit a PR

Hi @gbrd ,

Ok so this never worked? That's surprising. I guess the issue wouldn't show up if the sender was local?

From what I have been able to tell, we're currently delegating to the ActorRef serializer which apparently doesn't store all the information needed to be full fidelity? That's unexpected.

Can you provide more information? I'm just struggling with this issue, sorry. I'm also pretty concerned about adding a dependency on an undocumented (other than the big warning comment INTERNAL API) API.

gbrd commented

Hi @scullxbones
Yes, it only shows up under very specific conditions (serializing an EmptyLocalActorRef, i.e. an actorRef that was created from a local lookup that found nothing).

To be honest, the API looks very strange to me too, but I don't fully understand this code. As far as I understand, it's a kind of optimization/trick.

Steps to reproduce:

  • launch a single cluster node on port 2551 that deploys an actor using akka-cluster-sharding with akka.cluster.sharding.state-store-mode = persistence
  • it will persist a ShardRegionRegistered(actorRef on port 2551)
  • kill -9
  • run it again; it will deserialize the ShardRegionRegistered(actorRef on port 2551) as an EmptyLocalActorRef, because the address is local (same port) but the lookup fails
  • it will therefore persist ShardRegionTerminated(actorRef without address/port) (this is the bug)
  • run it again after changing the configuration file to set port = 2552; deserialization will show errors/inconsistencies due to the missing host/address
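The steps above can be illustrated with a deliberately simplified model of how a path ends up with or without its host and port. This is only a sketch and not Akka's implementation: `NodeAddress`, `serialize`, `resolve` and the path `/system/sharding/Counter` are hypothetical names chosen for illustration, and the only assumption taken from the thread is that a reference serialized without transport information loses its address.

```scala
object SerializedPathModel {
  // Hypothetical, simplified address type; this is not Akka's Address class.
  final case class NodeAddress(system: String, host: String, port: Int)

  // With transport information the host and port are written out;
  // without it only the actor system name and the local path remain.
  def serialize(system: String, localPath: String, transport: Option[NodeAddress]): String =
    transport match {
      case Some(a) => s"akka.tcp://${a.system}@${a.host}:${a.port}$localPath"
      case None    => s"akka://$system$localPath"
    }

  // A path without host and port can only be interpreted relative to
  // whichever node happens to read it back.
  def resolve(serialized: String, reader: NodeAddress): String =
    if (serialized.contains("@")) serialized
    else serialize(reader.system, serialized.stripPrefix(s"akka://${reader.system}"), Some(reader))
}
```

In this model, a path written on the node at port 2551 without an address is read back on the node at port 2552 as if it belonged to 2552, whereas a path written with its address still points at 2551.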

Main:

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._


object AkkaQuickstart extends App {


  // Create the 'ClusterSystem' actor system
  val system: ActorSystem = ActorSystem("ClusterSystem")

  implicit val timeout: Timeout = 10.seconds
  implicit val ec = system.dispatcher


  val extractEntityId: ShardRegion.ExtractEntityId = {
    case EntityEnvelope(id, payload) => (id.toString, payload)
    case msg@Get(id) => (id.toString, msg)
  }

  val numberOfShards = 100

  val extractShardId: ShardRegion.ExtractShardId = {
    case EntityEnvelope(id, _) => (id % numberOfShards).toString
    case Get(id) => (id % numberOfShards).toString
    case ShardRegion.StartEntity(id) =>
      // StartEntity is used by remembering entities feature
      (id.toLong % numberOfShards).toString
  }

  val counterRegion: ActorRef = ClusterSharding(system).start(
    typeName = "Counter",
    entityProps = Props[Counter],
    settings = ClusterShardingSettings(system),
    extractEntityId = extractEntityId,
    extractShardId = extractShardId)

  counterRegion ! EntityEnvelope(0,Increment)

  (counterRegion ? Get(0)).mapTo[Int].onComplete {
    case c => println("counter = " + c)
  }
}

Counter actor:

import akka.actor.ReceiveTimeout
import akka.persistence.PersistentActor
import akka.cluster.sharding.ShardRegion

import scala.concurrent.duration._

case object Increment
case object Decrement
final case class Get(counterId: Long)
final case class EntityEnvelope(id: Long, payload: Any)

case object Stop
final case class CounterChanged(delta: Int)


object  Counter {


}


class Counter extends PersistentActor {
  import ShardRegion.Passivate

  context.setReceiveTimeout(120.seconds)

  // self.path.name is the entity identifier (utf-8 URL-encoded)
  override def persistenceId: String = "Counter-" + self.path.name

  var count = 0

  def updateState(event: CounterChanged): Unit = {
    count += event.delta
  }

  override def receiveRecover: Receive = {
    case evt: CounterChanged => updateState(evt)
  }

  override def receiveCommand: Receive = {
    case Increment      => persist(CounterChanged(+1))(updateState)
    case Decrement      => persist(CounterChanged(-1))(updateState)
    case Get(_)         => sender() ! count
    case ReceiveTimeout => context.parent ! Passivate(stopMessage = Stop)
    case Stop           => context.stop(self)
  }

}

Config file:

akka {
  loglevel = "DEBUG"
  actor {
    provider = "cluster"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551"]

  }
}
akka.cluster.sharding.state-store-mode = persistence


//akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
//akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

akka.persistence.journal.plugin = "akka-contrib-mongodb-persistence-journal"
akka.persistence.snapshot-store.plugin = "akka-contrib-mongodb-persistence-snapshot"
akka.contrib.persistence.mongodb.mongo.database = "test2-db"


//akka.persistence.journal.plugin = "cassandra-journal"
//akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"

build.sbt

name := "akka-quickstart-scala"

version := "1.0"

scalaVersion := "2.12.3"

lazy val akkaVersion = "2.5.9"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-testkit" % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
  "org.fusesource.leveldbjni"   % "leveldbjni-all"   % "1.8",


  "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.80",

  "com.github.scullxbones" %% "akka-persistence-mongo-casbah" % "2.0.4",
  "org.mongodb" %% "casbah" % "3.1.1"
)

gbrd commented

My patch does not work for casbah because of the Future{} block, which makes us leave the scope of the currentTransportInformation dynamic variable.

Do you think we should duplicate the code in CasbahPersistenceJournaller and RxMongoJournaller?
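The scoping problem described here can be reproduced with the standard library alone. The following is a minimal sketch, assuming a hypothetical `transportAddress` variable as a stand-in for the dynamic variable mentioned above; it is not the plugin's code. It uses a single worker thread that is started before any value is bound, so the `Future` body cannot see the binding unless it is captured and re-bound inside the `Future`.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.DynamicVariable

object DynamicVariableScopeDemo {
  // Hypothetical stand-in for the transport information dynamic variable.
  val transportAddress = new DynamicVariable[Option[String]](None)

  // Returns (value on the calling thread, value seen inside a Future,
  // value seen inside a Future after re-binding the captured value).
  def run(): (Option[String], Option[String], Option[String]) = {
    val executor = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
    try {
      // Start the worker thread before anything is bound, so it cannot inherit a value.
      Await.result(Future(()), 5.seconds)

      transportAddress.withValue(Some("akka.tcp://ClusterSystem@127.0.0.1:2551")) {
        val onCaller = transportAddress.value
        // The Future body runs on the worker thread, outside the binding.
        val inFuture = Await.result(Future(transportAddress.value), 5.seconds)
        // Capture the value on the calling thread and bind it again inside the Future.
        val captured = transportAddress.value
        val rebound = Await.result(
          Future(transportAddress.withValue(captured)(transportAddress.value)),
          5.seconds)
        (onCaller, inFuture, rebound)
      }
    } finally executor.shutdown()
  }
}
```

Capturing the value before entering the `Future` and re-binding it inside is one possible way around the problem; whether it fits the journal implementations discussed here is left open.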

Can it be done in MongoDataModel? In Payload.apply maybe?

I feel a lot better about this in general with the new public API being added to Serialization:

def withTransportInformation(system: ExtendedActorSystem)(f: () => Unit): Unit

Or something to that effect.
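For readers unfamiliar with this style of API, a helper of roughly that shape can be sketched with `scala.util.DynamicVariable`. This is an illustration only: `TransportScope`, `TransportInfo` and `currentInfo` are hypothetical names, the helper takes the information directly instead of an `ExtendedActorSystem`, and it returns the result of `f` rather than `Unit`. It does not claim to match what Akka ships.

```scala
import scala.util.DynamicVariable

object TransportScope {
  // Hypothetical stand-in for the transport information; not Akka's type.
  final case class TransportInfo(address: String)

  private val current = new DynamicVariable[Option[TransportInfo]](None)

  // What serialization code running inside the scope would consult.
  def currentInfo: Option[TransportInfo] = current.value

  // Binds `info` while `f` runs. DynamicVariable.withValue restores the
  // previous binding afterwards, even when `f` throws.
  def withTransportInformation[T](info: TransportInfo)(f: () => T): T =
    current.withValue(Some(info))(f())
}
```

A caller would wrap the serialization call in the helper, so that the binding is visible only for the duration of that call and never leaks to unrelated code on the same thread.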

Also, is there a way to add a test for this? It seems like it would be difficult, but I thought I'd at least ask.

gbrd commented

Yes, it would be much better in Payload.apply. I did not put it there initially because I had not seen that we have access to the actor system (which is needed) through the implicit Serialization parameter.
Maybe we should wait for the new API?
About the test: I have not looked at what the existing tests look like; it would probably need remoting and multiple actor systems (or stopping and restarting the same one with a different config).

Maybe we should wait for the new API?

I'd be okay with that, though it's not clear when that would be coming. We can implement against the old internal API for now, I guess. Is this issue blocking you currently?

gbrd commented

Yes and no; I "overloaded" the class in my project.
I can do a PR with the modification in Payload.apply if it helps.

gbrd commented

I modified the fix with a cleaner solution.
I added one test.

This is in the 2.0.5 release. Thanks again for the PR!