DeleteMessage Sink shuts down after one command
Bathtor opened this issue · 8 comments
This is more a request for help than a bug (I think... you tell me).
I'm trying to also delete the original command message when it has been processed successfully and a response has been generated.
To that end I adapted the code from the core example in the following way:
class SayAsCmd(requests: RequestHelper) extends Actor with ActorLogging {
  import SayAsCmd._
  implicit val askTimeout = Timeout(5.seconds)
  implicit val system: ActorSystem = context.system;
  import requests.mat;
  import context.dispatcher;

  private val messageGraph = Sink.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._;
    val unzip = builder.add(Unzip[DeleteMessage[NotUsed], List[CreateMessage[NotUsed]]]);
    val msgSink = requests.sinkIgnore[RawMessage, NotUsed];
    val deleteSink = requests.sinkIgnore[NotUsed, NotUsed];
    unzip.out0.log("DELETE MSG", dm => dm.messageId) ~> deleteSink;
    unzip.out1.mapConcat(identity).log("REPLY MSG", cm => cm.bodyForLogging) ~> msgSink;
    SinkShape.of(unzip.in)
  });

  private val msgQueue = Source.queue[(ActorRef, SayAsMessage, ChannelId, MessageId)](32, OverflowStrategy.backpressure)
    .mapAsyncUnordered(parallelism = 1000)((t: (ActorRef, SayAsMessage, ChannelId, MessageId)) =>
      (t._1 ? t._2).mapTo[List[CreateMessageData]].map(l => (DeleteMessage(t._3, t._4) -> l.map(CreateMessage(t._3, _)))))
    .to(messageGraph).run();

  private val serverHandlers = mutable.Map.empty[GuildId, ActorRef];
  private val fallbackHandler = context.actorOf(Props(new SayAsActor(None)));

  override def receive: Receive = {
    case InitAck => {
      log.debug("Got Init");
      sendAck(sender);
    }
    case ParsedCmd(msg, cmd @ CharacterArgs.Register(name, avatar), _, _) => {
      log.debug(s"Received register command: $cmd");
      routeToHandler(msg, RegisterCharacter(name, avatar));
    }
    case ParsedCmd(msg, cmd @ CharacterArgs.Unregister(name), _, _) => {
      log.debug(s"Received unregister command: $cmd");
      routeToHandler(msg, UnregisterCharacter(name));
    }
    case ParsedCmd(msg, cmd @ CharacterArgs.ListAll, _, _) => {
      log.debug(s"Received list command: $cmd");
      routeToHandler(msg, ListCharacters);
    }
    case ParsedCmd(msg, cmd @ SayArgs.Say(author, content), _, _) => {
      log.debug(s"Received sayas command: $cmd");
      routeToHandler(msg, SayAs(author, content));
    }
    case x => {
      log.warning(s"Received unexpected message: $x");
      sendAck(sender);
    }
  }

  def routeToHandler(orig: Message, msg: SayAsMessage): Unit = {
    // TODO routing
    val msgSender = sender();
    msgQueue.offer((fallbackHandler, msg, orig.channelId, orig.id)).onComplete(_ => sendAck(msgSender))
  }

  def sendAck(sender: ActorRef): Unit = sender ! SayAsCmd.Ack;
}
This approach works for the first command after I start the bot; afterwards the delete path seems to shut down and only the reply path works. (I added the log elements to the flow graph, and the last message from the delete flow is [DELETE MSG] Downstream finished.)
a) Is this intended behaviour?
b) How do I work around it?
c) Is there a better way to sink both CreateMessage and DeleteMessage events into the requests queue, without splitting the stream like this?
Can you show me your command object? Most likely the issue lies in there somewhere. I've been dealing with this myself recently for a retry flow. From my testing, the cause is normally a Source.single together with a flatMapConcat somewhere. Why it happens I can't say, though; most likely it is something in Akka. I've been able to work around it for my stuff by avoiding Source.single together with flatMapConcat.
As for whether there is a better way to do this, have you looked at the alsoTo method on Flow? You could also just be lazy and do one operation after the other (take advantage of the request context and the withContext method on Request to simplify this).
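A minimal sketch of the alsoTo idea, using plain Akka Streams rather than the library's request types. Req, Delete and Create are hypothetical stand-ins, and the ActorMaterializer setup assumes Akka 2.5 (on 2.6+ an implicit ActorSystem should be enough). Each sink picks out its own element type with collect:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object AlsoToSketch {
  // Hypothetical stand-ins for the two request kinds; not the library's real types.
  sealed trait Req
  final case class Delete(id: Long) extends Req
  final case class Create(text: String) extends Req

  def run(input: List[Req]): (Seq[Delete], Seq[Create]) = {
    implicit val system: ActorSystem = ActorSystem("also-to-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Side sink: keeps only Delete elements and collects them.
      val deleteSink: Sink[Req, Future[Seq[Delete]]] =
        Flow[Req].collect { case d: Delete => d }.toMat(Sink.seq[Delete])(Keep.right)
      // Main sink: keeps only Create elements and collects them.
      val createSink: Sink[Req, Future[Seq[Create]]] =
        Flow[Req].collect { case c: Create => c }.toMat(Sink.seq[Create])(Keep.right)

      // Every element goes to both sinks; each sink filters for its own type.
      val (deletedF, createdF) = Source(input)
        .alsoToMat(deleteSink)(Keep.right)
        .toMat(createSink)(Keep.both)
        .run()

      (Await.result(deletedF, 5.seconds), Await.result(createdF, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```

One thing to keep in mind: as far as I understand the Akka documentation, alsoTo cancels the whole stream when either the main downstream or the attached sink cancels, so a side sink that finishes early would still stop everything.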
Going to leave this open until I find a solution to this problem and know the root cause of why it happens and what can be done to avoid it.
I updated the original post with the full actor. The relevant command factories are:
def charCmdFactory(sayAsActor: ActorRef): ParsedCmdFactory[F, SayAsCmd.CharacterArgs, NotUsed] =
  ParsedCmdFactory[F, SayAsCmd.CharacterArgs, NotUsed](
    refiner = CmdInfo[F](
      prefix = Categories.adminCommands,
      aliases = Seq("char"),
      filters = Seq(ByUser(UserId(Main.adminId)))),
    sink = _ => Sink.actorRefWithAck(
      ref = sayAsActor,
      onInitMessage = SayAsCmd.InitAck,
      ackMessage = SayAsCmd.Ack,
      onCompleteMessage = PoisonPill),
    description = Some(CmdDescription(
      name = "Character Registry",
      description = "Add/Remove/List Characters",
      usage = "register <name> [<avatar url>]|unregister <name>|list")));

def sayasCmdFactory(sayAsActor: ActorRef): ParsedCmdFactory[F, SayAsCmd.SayArgs, NotUsed] =
  ParsedCmdFactory[F, SayAsCmd.SayArgs, NotUsed](
    refiner = CmdInfo[F](
      prefix = Categories.generalCommands,
      aliases = Seq("sayas")),
    sink = _ => Sink.actorRefWithAck(
      ref = sayAsActor,
      onInitMessage = SayAsCmd.InitAck,
      ackMessage = SayAsCmd.Ack,
      onCompleteMessage = PoisonPill),
    description = Some(CmdDescription(
      name = "SayAs",
      description = "Speak as a registered character",
      usage = "<name> <text>")));
I'm not using Source.single anywhere, I think, unless there is somehow one hidden in mapAsyncUnordered?
I was originally planning to use alsoTo with filter, but then I found Unzip, which fit my use case much better. Doing them lazily is not so easy, I think. I only want the delete to occur if the reply is happening as well (so that a semantically invalid command will not get deleted), and I also need to keep the required state around to create the correct delete object (i.e. messageId and channelId).
I also find it curious that it's always the delete flow that fails, even if I switch the positions in the Unzip. I mean, the two branches are essentially identical except for the mapConcat. The sinks have different serialisation types, but apart from that they are the same implementation, aren't they? Why would one misbehave and the other work fine?
Edit: I tried unzip.out1.mapConcat(dm => List(dm)).log("DELETE MSG", dm => dm.messageId) ~> deleteSink; just to make sure that the mapConcat wasn't the difference, and indeed the behaviour persists.
My only guess would be that mapConcat has some of the same logic and so causes the same error. As for what I said above, when I said being lazy I meant more not worrying about doing things properly. As for keeping the required state around, that's what withContext is for.
As for the types being different: yep, that's the only difference. On master it's just the same flow/sink cast to different types.
Is there a common supertype I could use for a sink that would make it possible to use a single flow for both CreateMessage and DeleteMessage?
Request[_, _], BaseRESTRequest[_, _, _] and RESTRequest[_, _, _, _] would all fit.
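To illustrate the common-supertype idea without depending on the library, here is a small plain-Scala sketch. Request, DeleteMessage and CreateMessage below are simplified, hypothetical stand-ins (the real types take type parameters and different fields); the point is only that a list typed on the supertype can carry both kinds of request in order:

```scala
// Hypothetical, simplified stand-ins; not the library's real request types.
sealed trait Request
final case class DeleteMessage(channelId: Long, messageId: Long) extends Request
final case class CreateMessage(channelId: Long, content: String) extends Request

object CommonSupertypeSketch {
  // One delete for the original command, followed by one create per reply.
  def toRequests(channelId: Long, messageId: Long, replies: List[String]): List[Request] =
    DeleteMessage(channelId, messageId) :: replies.map(CreateMessage(channelId, _))

  // Anything typed on the supertype can consume both kinds of request.
  def describe(r: Request): String = r match {
    case DeleteMessage(c, m) => s"delete message $m in channel $c"
    case CreateMessage(c, t) => s"create '$t' in channel $c"
  }
}
```

A single sink whose input type is the supertype could then accept the flattened list, for example after a mapConcat(identity).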
Ok, I made a single flow now for both message types like this:
private val msgQueue = Source.queue[(ActorRef, SayAsMessage, ChannelId, MessageId)](32, OverflowStrategy.backpressure)
  .mapAsyncUnordered(parallelism = 1000)((t: (ActorRef, SayAsMessage, ChannelId, MessageId)) =>
    (t._1 ? t._2).mapTo[List[CreateMessageData]].map(l => (DeleteMessage(t._3, t._4) :: l.map(CreateMessage(t._3, _)))))
  .mapConcat(identity)
  .to(requests.sinkIgnore[Any, NotUsed]).run();
But once again only the first exchange works, and then the flow simply stops executing any requests (i.e. neither DeleteMessage nor CreateMessage is executed after the first time, when both are successfully executed). I don't know why, but DeleteMessage somehow breaks the sink, without throwing any errors that I can see (I'll attach my logging output; maybe you see something I've overlooked). I don't see this behaviour if the flow consists only of CreateMessage requests. In that case it keeps working correctly forever.
output.log
Edit: I'm using a local build of master, btw, not 0.10.0.
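For a stream that stops without any visible error, one generic way to get more information is to attach watchTermination just before the sink and log how the stream ends. This is only a sketch under assumptions: it uses a plain Sink.foreach in place of requests.sinkIgnore, String elements in place of real requests, and Akka 2.5-style ActorMaterializer setup. It does not reproduce the problem; it just shows where such a hook could go.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object TerminationSketch {
  def run(): Done = {
    implicit val system: ActorSystem = ActorSystem("termination-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher
    try {
      // Keep both the queue and a Future that completes when the stream ends.
      val (queue, done) = Source.queue[String](32, OverflowStrategy.backpressure)
        .watchTermination()(Keep.both)
        .to(Sink.foreach[String](s => println(s"sent: $s"))) // stand-in for the real sink
        .run()

      // Log the outcome: success for completion or cancellation, failure for an error.
      done.onComplete {
        case Success(_)  => println("stream finished (upstream completed or downstream cancelled)")
        case Failure(ex) => println(s"stream failed: $ex")
      }

      Await.result(queue.offer("first"), 5.seconds)
      queue.complete() // finish the stream deliberately so the sketch terminates
      Await.result(done, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

In the real actor the queue would never be completed on purpose, so any log line from the onComplete callback would indicate that something else ended the stream.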
The retry request fixes are now on master. Not sure how much they were ever related to this bug, but that should be the last "Stream stopped for some reason" kind of bug.