scullxbones/akka-persistence-mongo

High CPU usage on both application and MongoDB with EventsByTag


I've been using EventsByTagQuery to read events from the journal. I've created 9 long-lived streams (one per tag) that listen for events and do some processing. When I start the application, before any event has been processed, CPU usage on both the application and MongoDB is very high.
Once I send some load to the application and the streams start processing events, everything works well; even when the application becomes idle again, CPU usage stays normal.

Also, the queries before the first processed event look like this (and I can see many different connections):

{
                       "host" : "9a929359bdb8:27017",
                       "desc" : "conn15",
                       "connectionId" : 15,
                       "client" : "172.19.0.7:54004",
                       "active" : true,
                       "currentOpTime" : "2020-08-04T09:14:59.659+0000",
                       "opid" : 267843,
                       "secs_running" : NumberLong(0),
                       "microsecs_running" : NumberLong(181),
                       "op" : "query",
                       "ns" : "top.akka_persistence_realtime",
                       "command" : {
                               "find" : "akka_persistence_realtime",
                               "skip" : 0,
                               "tailable" : true,
                               "awaitData" : true,
                               "oplogReplay" : false,
                               "noCursorTimeout" : false,
                               "allowPartialResults" : false,
                               "singleBatch" : false,
                               "returnKey" : false,
                               "showRecordId" : false,
                               "filter" : {
                                       "_tg" : "MY_TAG1"
                               },
                               "readConcern" : {
                                       "level" : "local"
                               },
                               "$readPreference" : {
                                       "mode" : "primary"
                               },
                               "$db" : "top"
                       },
                       "planSummary" : "COLLSCAN",
                       "numYields" : 0,
                       "locks" : {
                               "Global" : "r",
                               "Database" : "r",
                               "Collection" : "r"
                       },
                       "waitingForLock" : false,
                       "lockStats" : {
                               "Global" : {
                                       "acquireCount" : {
                                               "r" : NumberLong(2)
                                       }
                               },
                               "Database" : {
                                       "acquireCount" : {
                                               "r" : NumberLong(1)
                                       }
                               },
                               "Collection" : {
                                       "acquireCount" : {
                                               "r" : NumberLong(1)
                                       }
                               }
                       }
               }

After the first processed event, the queries look like this (and all of them share about 2 connections):

{
                       "host" : "9a752a76da5c:27017",
                       "desc" : "conn10",
                       "connectionId" : 10,
                       "client" : "172.22.0.7:46860",
                       "active" : true,
                       "currentOpTime" : "2020-08-04T08:38:47.191+0000",
                       "opid" : 2308306,
                       "secs_running" : NumberLong(0),
                       "microsecs_running" : NumberLong(970917),
                       "op" : "getmore",
                       "ns" : "top.akka_persistence_realtime",
                       "command" : {
                               "getMore" : NumberLong("64185858736"),
                               "collection" : "akka_persistence_realtime",
                               "batchSize" : 1,
                               "$db" : "top"
                       },
                       "originatingCommand" : {
                               "find" : "akka_persistence_realtime",
                               "skip" : 0,
                               "tailable" : true,
                               "awaitData" : true,
                               "oplogReplay" : false,
                               "noCursorTimeout" : false,
                               "allowPartialResults" : false,
                               "singleBatch" : false,
                               "returnKey" : false,
                               "showRecordId" : false,
                               "filter" : {
                                       "_tg" : "MY_TAG1"
                               },
                               "readConcern" : {
                                       "level" : "local"
                               },
                               "$readPreference" : {
                                       "mode" : "primary"
                               },
                               "$db" : "top"
                       },
                       "planSummary" : "COLLSCAN",
                       "numYields" : 2,
                       "locks" : {
                       },
                       "waitingForLock" : false,
                       "lockStats" : {
                               "Global" : {
                                       "acquireCount" : {
                                               "r" : NumberLong(4)
                                       }
                               },
                               "Database" : {
                                       "acquireCount" : {
                                               "r" : NumberLong(2)
                                       }
                               },
                               "Collection" : {
                                       "acquireCount" : {
                                               "r" : NumberLong(2)
                                       }
                               }
                       }
               }

I switched to CurrentEventsByTagQuery (wrapped in a RestartSource) and that solves the issue, but according to the Akka documentation, EventsByTagQuery should be the preferred way to create long-lived streams.
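
For reference, the workaround can be sketched roughly as follows. This is a minimal sketch and not the exact code from the application: the object name PollingEventsByTag, the backoff values, and the in-memory offset tracking are assumptions made for illustration. It relies only on the generic Akka CurrentEventsByTagQuery trait, so the journal instance is passed in. It also assumes the plugin treats the offset as exclusive; if it does not, the last event would be redelivered on every restart.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration._

import akka.NotUsed
import akka.persistence.query.{EventEnvelope, NoOffset, Offset}
import akka.persistence.query.scaladsl.CurrentEventsByTagQuery
import akka.stream.scaladsl.{RestartSource, Source}

// Hypothetical helper: emulates a live events-by-tag stream by polling
// the finite "current" query and restarting it whenever it completes.
object PollingEventsByTag {
  def apply(journal: CurrentEventsByTagQuery, tag: String): Source[EventEnvelope, NotUsed] = {
    // Last offset emitted so far; held in memory only (assumption),
    // so a process restart begins again from NoOffset.
    val lastOffset = new AtomicReference[Offset](NoOffset)

    // withBackoff restarts the inner source on completion as well as on failure.
    // The backoff values are illustrative, not tuned.
    RestartSource.withBackoff(minBackoff = 1.second, maxBackoff = 10.seconds, randomFactor = 0.2) { () =>
      journal
        .currentEventsByTag(tag, lastOffset.get())
        .map { envelope =>
          // Remember where we are so the next restart resumes from here.
          lastOffset.set(envelope.offset)
          envelope
        }
    }
  }
}
```

Each restart issues a fresh finite query instead of holding a tailable cursor open, so an idle journal costs one query per backoff interval. Note that the offset is recorded when an event is emitted, not when downstream processing finishes, and that newer Akka versions prefer the RestartSettings overload of withBackoff.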

Hi @mradovic95 -

Is it possible you are seeing the starting issue described in this comment? A tailable cursor is still used to support live events, so an empty database could cause a spin.

At some point I need to swap for change streams instead, but that work is not done.

Can you also confirm which driver you are using?