ActorRef serialization: transportInformation should be set
gbrd opened this issue · 10 comments
Please see akka/akka#24321 for details.
I will try to submit a PR.
Hi @gbrd,
Ok so this never worked? That's surprising. I guess the issue wouldn't show up if the sender was local?
From what I have been able to tell, we're currently delegating to the ActorRef serializer, which apparently doesn't store all the information needed for full fidelity? That's unexpected.
Can you provide more information? I'm still struggling with this issue, sorry. I'm also pretty concerned about adding a dependency on an undocumented API (other than the big INTERNAL API warning comment).
Hi @scullxbones
Yes, it only shows up under very specific conditions: serializing an EmptyLocalActorRef, i.e. an actorRef that was created from a local lookup that did not find the actor.
To be honest the API sounds very strange to me too, but I don't fully understand this code. As I understand it, it's a kind of optimization/trick.
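For context, a minimal sketch of what that trick looks like, assuming Akka 2.5.x internals (Serialization.currentTransportInformation and Serialization.Information are the INTERNAL API pieces mentioned above):

import akka.actor.{ ActorRef, ExtendedActorSystem }
import akka.serialization.Serialization

// Binding the transport information while serializing lets serializedActorPath
// fully qualify local refs (including an EmptyLocalActorRef) with host and port.
def serializeWithAddress(system: ExtendedActorSystem, ref: ActorRef): String = {
  val info = Serialization.Information(system.provider.getDefaultAddress, system)
  Serialization.currentTransportInformation.withValue(info) {
    Serialization.serializedActorPath(ref)
  }
}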
Steps to reproduce :
- launch a single cluster node on port 2551 that deploys an actor using akka-cluster-sharding and
akka.cluster.sharding.state-store-mode = persistence
- it will persist a ShardRegionRegistered(actorRef on port 2551)
- kill -9
- run it again; it will deserialize the ShardRegionRegistered(actorRef on port 2551) as an EmptyLocalActorRef because it is local (same port), but the lookup fails
- it will then persist ShardRegionTerminated(actorRef without address/port) (here is the bug)
- run it again but change the configuration file to set port = 2552; deserialization will show errors/inconsistencies due to the missing host/port
Main:
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
object AkkaQuickstart extends App {
// Create the 'helloAkka' actor system
val system: ActorSystem = ActorSystem("ClusterSystem")
implicit val timeout: Timeout = 10.seconds
implicit val ec = system.dispatcher
val extractEntityId: ShardRegion.ExtractEntityId = {
case EntityEnvelope(id, payload) ⇒ (id.toString, payload)
case msg@Get(id) ⇒ (id.toString, msg)
}
val numberOfShards = 100
val extractShardId: ShardRegion.ExtractShardId = {
case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString
case Get(id) ⇒ (id % numberOfShards).toString
case ShardRegion.StartEntity(id) ⇒
// StartEntity is used by the remember-entities feature
(id.toLong % numberOfShards).toString
}
val counterRegion: ActorRef = ClusterSharding(system).start(
typeName = "Counter",
entityProps = Props[Counter],
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId)
counterRegion ! EntityEnvelope(0, Increment)
(counterRegion ? Get(0)).mapTo[Int].onComplete {
  case c => println("counter = " + c)
}
} // closes object AkkaQuickstart
Counter actor:
import akka.actor.ReceiveTimeout
import akka.persistence.PersistentActor
import akka.cluster.sharding.ShardRegion
import scala.concurrent.duration._
case object Increment
case object Decrement
final case class Get(counterId: Long)
final case class EntityEnvelope(id: Long, payload: Any)
case object Stop
final case class CounterChanged(delta: Int)
object Counter {
}
class Counter extends PersistentActor {
import ShardRegion.Passivate
context.setReceiveTimeout(120.seconds)
// self.path.name is the entity identifier (utf-8 URL-encoded)
override def persistenceId: String = "Counter-" + self.path.name
var count = 0
def updateState(event: CounterChanged): Unit = {
count += event.delta
}
override def receiveRecover: Receive = {
case evt: CounterChanged ⇒ updateState(evt)
}
override def receiveCommand: Receive = {
case Increment ⇒ persist(CounterChanged(+1))(updateState)
case Decrement ⇒ persist(CounterChanged(-1))(updateState)
case Get(_) ⇒ sender() ! count
case ReceiveTimeout ⇒ context.parent ! Passivate(stopMessage = Stop)
case Stop ⇒ context.stop(self)
}
}
Config file:
akka {
loglevel = "DEBUG"
actor {
provider = "cluster"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551"]
}
}
akka.cluster.sharding.state-store-mode = persistence
//akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
//akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.journal.plugin = "akka-contrib-mongodb-persistence-journal"
akka.persistence.snapshot-store.plugin = "akka-contrib-mongodb-persistence-snapshot"
akka.contrib.persistence.mongodb.mongo.database = "test2-db"
//akka.persistence.journal.plugin = "cassandra-journal"
//akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"
build.sbt:
name := "akka-quickstart-scala"
version := "1.0"
scalaVersion := "2.12.3"
lazy val akkaVersion = "2.5.9"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
"org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8",
"com.typesafe.akka" %% "akka-persistence-cassandra" % "0.80",
"com.github.scullxbones" %% "akka-persistence-mongo-casbah" % "2.0.4",
"org.mongodb" %% "casbah" % "3.1.1"
)
My patch does not work for Casbah because of the Future {} blocks, which make us leave the scope of the currentTransportInformation DynamicVariable.
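A minimal sketch of that scoping problem (illustrative only, not the journal code): the value bound by withValue is only visible in the dynamic scope of the calling thread, so a Future body scheduled on an executor thread typically does not see it.

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.DynamicVariable

val current = new DynamicVariable[String]("unset")

def demo()(implicit ec: ExecutionContext): Future[String] =
  current.withValue("set") {
    Future {
      // Runs on an executor thread, outside withValue's dynamic scope,
      // so this typically reads the default value "unset".
      current.value
    }
  }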
Do you think we should duplicate the code in CasbahPersistenceJournaller and RxMongoJournaller? Or can it be done in MongoDataModel, in Payload.apply maybe?
I feel a lot better about this in general with the new public API being added to Serialization:
def withTransportInformation(system: ExtendedActorSystem)(f: () => Unit): Unit
Or something to that effect.
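If it lands roughly as quoted, a hedged sketch of how the journal side might consume it (the signature above is a proposal, not a confirmed Akka API, and the helper name below is made up):

import akka.actor.ExtendedActorSystem
import akka.serialization.{ Serialization, SerializationExtension }

// Run the serializer inside the transport-information scope so ActorRef paths
// are written with the system's default address; the var is only needed
// because the proposed callback returns Unit.
def serializeWithTransport(system: ExtendedActorSystem, payload: AnyRef): Array[Byte] = {
  var bytes: Array[Byte] = Array.emptyByteArray
  Serialization.withTransportInformation(system) { () =>
    bytes = SerializationExtension(system).serialize(payload).get
  }
  bytes
}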
Also, is there a way to add a test for this? It seems like it would be difficult, but I thought I'd at least ask.
Yes, it would be much better in Payload.apply. I did not put it there initially because I did not see that we had access to the ActorSystem (which is needed) through the implicit Serialization parameter.
Maybe we should wait for the new API?
About the test: I did not look at how the existing tests are written; it would probably require remoting and multiple actor systems (or stopping/starting the same one with a different config).
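A hedged sketch of the kind of check such a test might make (illustrative only, not the project's actual test suite; it assumes Akka 2.5.x internals): resolve a path that does not exist so an EmptyLocalActorRef comes back, serialize it inside the transport-information scope, and assert the host and port survive.

import akka.actor.{ ActorSystem, ExtendedActorSystem }
import akka.serialization.Serialization
import com.typesafe.config.ConfigFactory

val system = ActorSystem("ClusterSystem", ConfigFactory.parseString(
  """
  akka.actor.provider = cluster
  akka.remote.netty.tcp.hostname = "127.0.0.1"
  akka.remote.netty.tcp.port = 2551
  """)).asInstanceOf[ExtendedActorSystem]

// Resolving a non-existent local path yields an EmptyLocalActorRef.
val missing = system.provider.resolveActorRef(
  "akka.tcp://ClusterSystem@127.0.0.1:2551/user/does-not-exist")

val info = Serialization.Information(system.provider.getDefaultAddress, system)
val serialized = Serialization.currentTransportInformation.withValue(info) {
  Serialization.serializedActorPath(missing)
}
assert(serialized.contains("127.0.0.1:2551")) // fails without the withValue scope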
Maybe we should wait for the new API?
I'd be okay with that - it's not clear though when that would be coming. Can implement against the old internal API I guess for now. Is this issue blocking you currently?
Yes and no; I "overloaded" the class in my project for now.
I can do a PR with the modification in Payload.apply if it helps.
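For illustration, a hypothetical sketch of the shape such a change could take, using the old internal API for now; Payload and its apply signature below are simplified stand-ins, not the project's actual data model:

import akka.serialization.Serialization

object Payload {
  def apply(payload: Any)(implicit serialization: Serialization): Array[Byte] = {
    // The ExtendedActorSystem is reachable from the implicit Serialization instance.
    val system = serialization.system
    val info   = Serialization.Information(system.provider.getDefaultAddress, system)
    // Serialize inside the transport-information scope so any ActorRefs in the
    // payload are written with a full host:port address.
    Serialization.currentTransportInformation.withValue(info) {
      serialization.serialize(payload.asInstanceOf[AnyRef]).get
    }
  }
}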
I updated the fix with a cleaner solution and added one test.
This is in the 2.0.5 release. Thanks again for the PR!