nbd-wtf/go-nostr

problem: every second event is being dropped

Closed this issue · 8 comments

When subscribing to a list of IDs, every second event on the list is dropped.

	ids := []string{"d6bee3593951e1d81283250f6bb4241bcf6da036b3dae1ad7aff9107bbeee2f0", "363b07a681e38063e7572e3fefe90936e0d2f743a4e9f4d582e63c33ce276b9e", "4873e66c79f31ed992d43acf0d33d52628a274379cf8e976b42e83293cfc57e8", .........}
	_, events, unsub := pool.Sub(nostr.Filters{
		{
			IDs: ids,
		},
	})

The events are being sent from the relay and are received properly by the websocket connection, but are being erroneously dropped somewhere within this library.

I'm digging into this now and will post updates as I find things.

Working on a branch here to try to fix it, if anyone is interested: https://github.com/stanstacks/go-nostr/tree/every2ndIDbeingDropped

Nice find. I have no idea why this could be happening.

This is incredibly difficult to nail down.

It's only when subscription IDs are involved that this happens.

If I hard code the eventStream channel (i.e. Events chan EventMessage in the RelayPool struct) and use that instead of the map, then everything works properly. It has nothing to do with the map library, though; I tried the standard library's map and it doesn't change anything. It's got to be something to do with the subscription ID somehow.
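
Roughly, that experiment looks like this (a sketch only, not the actual library code; the field and helper names here are assumed):

// Sketch of the "hard-coded channel" experiment: instead of creating a channel
// per subscription and storing it in the eventStreams map, Sub hands out one
// shared channel that lives on the pool itself.
type RelayPool struct {
	Events chan EventMessage // shared stream used for the experiment
	// ... other fields elided ...
}

func (r *RelayPool) Sub(filters Filters) (subID string, events chan EventMessage, unsubscribe func()) {
	id := newSubscriptionID() // hypothetical helper standing in for the random ID generation
	// instead of:
	//   eventStream := make(chan EventMessage)
	//   r.eventStreams.Store(id, eventStream)
	// hand back the shared channel:
	return id, r.Events, func() { /* close the subscription on every relay */ }
}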

If I use a goroutine to listen on the eventStream channel in Sub, it prints the missing events (and only the missing events).
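
Something like this, inside Sub right after eventStream is created (a rough sketch for debugging only; this extra reader races the Unique goroutine for messages on the channel):

go func() {
	for msg := range eventStream {
		fmt.Printf("debug: got event %s from %s\n", msg.Event.ID, msg.Relay)
	}
}()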

I can resolve it by simply sending the event twice here

eventStream <- EventMessage{Relay: relay.URL, Event: evt}

I have no idea what's going on, and it's a nasty way of solving the problem, but it does solve it.

Moving uniqueEvents outside the Unique function in relaypool.go fixes it, but I'm not sure that's a good design solution.

+var uniqueEvents = make(chan Event)
+
 func Unique(all chan EventMessage) chan Event {
-       uniqueEvents := make(chan Event)
        emittedAlready := s.MapOf[string, struct{}]{}

I can also open a pull request with this approach.
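
With that change, the whole function ends up looking roughly like this (the rest of Unique left as it is today; note that emittedAlready still lives inside the function, so deduplication stays per-call and only the output channel is shared):

var uniqueEvents = make(chan Event)

func Unique(all chan EventMessage) chan Event {
	emittedAlready := s.MapOf[string, struct{}]{}
	go func() {
		for eventMessage := range all {
			if _, ok := emittedAlready.LoadOrStore(eventMessage.Event.ID, struct{}{}); !ok {
				uniqueEvents <- eventMessage.Event
			}
		}
	}()
	return uniqueEvents
}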

@stanstacks I believe the problem happens here

master...stanstacks:go-nostr:every2ndIDbeingDropped#diff-e0476d823596031d7eb9d711f89395ac525a465fe338f6d807fa1e9def447117R39-R54

I added some debug prints

diff --git a/example/issue_23_every_second_event_dropped.go b/example/issue_23_every_second_event_dropped.go
index 3aded47..df7b855 100644
--- a/example/issue_23_every_second_event_dropped.go
+++ b/example/issue_23_every_second_event_dropped.go
@@ -12,10 +12,10 @@ func main() {
        ids := []string{
                "d6bee3593951e1d81283250f6bb4241bcf6da036b3dae1ad7aff9107bbeee2f0",
                "363b07a681e38063e7572e3fefe90936e0d2f743a4e9f4d582e63c33ce276b9e",
-               // "4873e66c79f31ed992d43acf0d33d52628a274379cf8e976b42e83293cfc57e8",
-               // "d04f33d5cb7c39f8cbc851b10380b11c958790b67b856628e88769aa00980c8e",
-               // "19c30d25ec25e674e839c2762d814095bfdfdacb68a2db1e7e2b50ef695bc5a6",
-               // "15458787661b4d4d15c2ed0fa685d4b1e47834ba10bd2dc9f4bb198090153a30",
+               "4873e66c79f31ed992d43acf0d33d52628a274379cf8e976b42e83293cfc57e8",
+               "d04f33d5cb7c39f8cbc851b10380b11c958790b67b856628e88769aa00980c8e",
+               "19c30d25ec25e674e839c2762d814095bfdfdacb68a2db1e7e2b50ef695bc5a6",
+               "15458787661b4d4d15c2ed0fa685d4b1e47834ba10bd2dc9f4bb198090153a30",
                // "f069b38d1561acbfc5cdf95798edb0985dd8a29786f60f5f23b72f0d17efb8ad",
                // "6842710df729feee231effe99e40897436df1ff26482df3f0378400ad75da5c2",
                // "95bc76af0841ec18906c87143146a05a991c3806e64c3a3ab5a83c6f19a7a005",
@@ -47,14 +47,19 @@ func main() {
        })
        allevents := make(map[string]nostr.Event)
        gotResult := false
+       count := 0
        var tries int64 = 0
 L:
        for tries < 1 {
+               count++
+               fmt.Printf("\ncalls to nostr.Unique: %d", count)
                select {
                case e := <-nostr.Unique(events):
                        allevents[e.ID] = e
                        gotResult = true
+                       time.Sleep(time.Second * 10)
                case <-time.After(time.Second * 5):
+                       fmt.Printf("\n\n!!timeout!!\n\n")
                        if gotResult {
                                break L
                        }
@@ -65,7 +70,7 @@ L:
 
        for _, id := range ids {
                if _, ok := allevents[id]; !ok {
-                       fmt.Println(id)
+                       fmt.Printf("\n\nNo response: \n\tid: %s, \n\tevent: %s", id, allevents[id])
                }
        }
        //_, events, unsub = pool.Sub(nostr.Filters{
diff --git a/relaypool.go b/relaypool.go
index 5455d54..655d790 100644
--- a/relaypool.go
+++ b/relaypool.go
@@ -124,9 +124,9 @@ func (r *RelayPool) Remove(url string) {
        }
 }
 
-//Sub subscribes to events matching the passed filters and returns the subscription ID,
-//a channel which you should pass into Unique to get unique events, and a function which
-//you should call to clean up and close your subscription so that the relay doesn't block you.
+// Sub subscribes to events matching the passed filters and returns the subscription ID,
+// a channel which you should pass into Unique to get unique events, and a function which
+// you should call to clean up and close your subscription so that the relay doesn't block you.
 func (r *RelayPool) Sub(filters Filters) (subID string, events chan EventMessage, unsubscribe func()) {
        random := make([]byte, 7)
        rand.Read(random)
@@ -136,7 +136,6 @@ func (r *RelayPool) Sub(filters Filters) (subID string, events chan EventMessage
        eventStream := make(chan EventMessage)
        r.eventStreams.Store(id, eventStream)
        unsub := make(chan struct{})
-
        r.Relays.Range(func(_ string, relay *Relay) bool {
                sub := relay.prepareSubscription(id)
                sub.Sub(filters)
@@ -175,7 +174,9 @@ func Unique(all chan EventMessage) chan Event {
        go func() {
                for eventMessage := range all {
                        if _, ok := emittedAlready.LoadOrStore(eventMessage.Event.ID, struct{}{}); !ok {
+                               fmt.Printf("\nrelaypool.go:177: uniqueEvents <- eventMessage.Event")
                                uniqueEvents <- eventMessage.Event
+                               fmt.Printf("\nrelaypool.go:179: Success!")
                        }
                }
        }()

the output, as expected, follows the issue description (I added the sleep to make it more pronounced)

% go run example/issue_23_every_second_event_dropped.go

calls to nostr.Unique: 1
relaypool.go:177: uniqueEvents <- eventMessage.Event
relaypool.go:179: Success!
relaypool.go:177: uniqueEvents <- eventMessage.Event
calls to nostr.Unique: 2
relaypool.go:177: uniqueEvents <- eventMessage.Event
relaypool.go:179: Success!
relaypool.go:177: uniqueEvents <- eventMessage.Event
calls to nostr.Unique: 3
relaypool.go:177: uniqueEvents <- eventMessage.Event
relaypool.go:179: Success!
relaypool.go:177: uniqueEvents <- eventMessage.Event
calls to nostr.Unique: 4

!!timeout!!



No response: 
	id: 363b07a681e38063e7572e3fefe90936e0d2f743a4e9f4d582e63c33ce276b9e, 
	event: {  0001-01-01 00:00:00 +0000 UTC %!s(int=0) []   map[]}

No response: 
	id: d04f33d5cb7c39f8cbc851b10380b11c958790b67b856628e88769aa00980c8e, 
	event: {  0001-01-01 00:00:00 +0000 UTC %!s(int=0) []   map[]}

No response: 
	id: 15458787661b4d4d15c2ed0fa685d4b1e47834ba10bd2dc9f4bb198090153a30, 
	event: {  0001-01-01 00:00:00 +0000 UTC %!s(int=0) []   map[]}% 

From the output you can see that every time nostr.Unique is called, a goroutine starts attempting to read all the events in the channel passed to it. It starts iterating through the channel, and before it can finish, another call to nostr.Unique is made; the previous channel, with its dangling event, is never read again. If you change the example to

        gotResult := false
+       unique := nostr.Unique(events)
+       count := 0
        var tries int64 = 0
 L:
        for tries < 1 {
+               count++
+               fmt.Printf("\ncalls to nostr.Unique: %d", count)
                select {
-               case e := <-nostr.Unique(events):
+               case e := <-unique:
                        allevents[e.ID] = e
                        gotResult = true
+                       time.Sleep(time.Second * 10)
                case <-time.After(time.Second * 5):
+                       fmt.Printf("\n\n!!timeout!!\n\n")
                        if gotResult {
                                break L
                        }
@@ -65,7 +71,7 @@ L:
 
        for _, id := range ids {
                if _, ok := allevents[id]; !ok {
-                       fmt.Println(id)
+                       fmt.Printf("\n\nNo response: \n\tid: %s, \n\tevent: %s", id, allevents[id])
                }
        }
...

all events make it home

% go run example/issue_23_every_second_event_dropped.go

calls to nostr.Unique: 1
relaypool.go:177: uniqueEvents <- eventMessage.Event
relaypool.go:179: Success!
relaypool.go:177: uniqueEvents <- eventMessage.Event
calls to nostr.Unique: 2
relaypool.go:179: Success!
relaypool.go:177: uniqueEvents <- eventMessage.Event
calls to nostr.Unique: 3
relaypool.go:179: Success!
relaypool.go:177: uniqueEvents <- eventMessage.Event
calls to nostr.Unique: 4
relaypool.go:179: Success!
relaypool.go:177: uniqueEvents <- eventMessage.Event
calls to nostr.Unique: 5
relaypool.go:179: Success!
relaypool.go:177: uniqueEvents <- eventMessage.Event
calls to nostr.Unique: 6
relaypool.go:179: Success!
calls to nostr.Unique: 7

!!timeout!!
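
The failure mode can be reproduced outside go-nostr with nothing but channels. In this sketch, fanOut plays the role of nostr.Unique: every call spawns a goroutine and returns a brand-new unbuffered channel, so the goroutine from the previous iteration is left blocked on a send nobody will ever receive, and that value is lost:

package main

import (
	"fmt"
	"time"
)

// fanOut mimics the shape of nostr.Unique: a fresh goroutine and a fresh
// unbuffered output channel on every call.
func fanOut(in chan int) chan int {
	out := make(chan int)
	go func() {
		for v := range in {
			out <- v // blocks forever once the caller has moved on to a newer channel
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		for i := 1; i <= 6; i++ {
			in <- i
		}
	}()

	got := 0
L:
	for {
		select {
		case v := <-fanOut(in): // BUG: new channel + goroutine on every iteration
			fmt.Println("received", v)
			got++
		case <-time.After(time.Second):
			break L
		}
	}
	fmt.Println("received", got, "of 6 values") // roughly half arrive; the rest are stranded
}

Calling fanOut once before the loop and reusing the returned channel, exactly like the unique variable in the patched example above, is what makes all the values arrive.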

@fiatjaf (and folks who want to participate) out of curiosity, is this still an issue? If so, what is the desired behavior? I would be happy to put together a PR or continue the discussion if the current behavior is not what was intended. If this is the expected behavior, it sounds like we can close the issue.

Thank you all for what you do!

It is not an issue anymore since I nuked the entire relaypool implementation from this library, but you seem to have gotten to the bottom of it. It is good that you did, since the same approach will still have to be taken in higher-level libraries, and hopefully we won't repeat the same mistake.

Great, thanks for the quick response @fiatjaf. Do you think we should close this in that case? Also, will there be a new relaypool implemented somewhere else, and if so, where will/does it live? I also wanted to ask if you need a hand with anything. This may not be the best place to ask, but if you need eyes on an issue/PR, hands on a feature, or thoughts on a NIP, just say the word and I will take a look. Thanks again, mate!

All the best

I'll close this, yes, thank you.

I do need a hand with this, in case you're interested: https://gist.github.com/fiatjaf/ea7d21e81359e1eb8abcb8805306adaa