diff --git a/waku/v2/protocol/relay/broadcast.go b/waku/v2/protocol/relay/broadcast.go index 38fe27e8..a2911c5c 100644 --- a/waku/v2/protocol/relay/broadcast.go +++ b/waku/v2/protocol/relay/broadcast.go @@ -3,6 +3,7 @@ package relay import ( "context" "errors" + "fmt" "sync" "sync/atomic" @@ -53,7 +54,10 @@ func (s *chStore) getNewCh(topic string, chLen int) Subscription { func (s *chStore) broadcast(m *protocol.Envelope) { s.mu.RLock() defer s.mu.RUnlock() + fmt.Println(m) + fmt.Println(s.topicToChans, "msg") for _, ch := range s.topicToChans[m.PubsubTopic()] { + fmt.Println(m.PubsubTopic()) ch <- m } // send to all registered subscribers @@ -118,8 +122,10 @@ func (b *broadcaster) run(ctx context.Context) { select { case <-ctx.Done(): return - case msg := <-b.input: - b.chStore.broadcast(msg) + case msg, ok := <-b.input: + if ok { + b.chStore.broadcast(msg) + } } } } diff --git a/waku/v2/protocol/relay/broadcast_test.go b/waku/v2/protocol/relay/broadcast_test.go index d662c473..07b62efc 100644 --- a/waku/v2/protocol/relay/broadcast_test.go +++ b/waku/v2/protocol/relay/broadcast_test.go @@ -88,5 +88,5 @@ func TestBroadcastUnregisterSub(t *testing.T) { // msg on subForAll require.Equal(t, env, <-subForAll.Ch) b.Stop() // it automatically unregister/unsubscribe all - require.Equal(t, nil, <-specificSub.Ch) + require.Nil(t, <-specificSub.Ch) } diff --git a/waku/v2/protocol/relay/subscription.go b/waku/v2/protocol/relay/subscription.go index cac07a0c..f256703c 100644 --- a/waku/v2/protocol/relay/subscription.go +++ b/waku/v2/protocol/relay/subscription.go @@ -18,6 +18,9 @@ func NoopSubscription() Subscription { func ArraySubscription(msgs []*protocol.Envelope) Subscription { ch := make(chan *protocol.Envelope, len(msgs)) + for _, msg := range msgs { + ch <- msg + } close(ch) return Subscription{ Unsubscribe: func() {}, diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 93ba61a1..e6cb1483 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -44,10 +44,6 @@ type WakuRelay struct { wakuRelayTopics map[string]*pubsub.Topic relaySubs map[string]*pubsub.Subscription - // TODO: convert to concurrent maps - subscriptions map[string][]*Subscription - subscriptionsMutex sync.Mutex - ctx context.Context cancel context.CancelFunc wg sync.WaitGroup @@ -63,7 +59,6 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou w.timesource = timesource w.wakuRelayTopics = make(map[string]*pubsub.Topic) w.relaySubs = make(map[string]*pubsub.Subscription) - w.subscriptions = make(map[string][]*Subscription) w.bcaster = bcaster w.minPeersToPublish = minPeersToPublish w.wg = sync.WaitGroup{} @@ -251,16 +246,6 @@ func (w *WakuRelay) Stop() { w.cancel() w.wg.Wait() - - w.subscriptionsMutex.Lock() - defer w.subscriptionsMutex.Unlock() - - for _, topic := range w.Topics() { - for _, sub := range w.subscriptions[topic] { - sub.Unsubscribe() - } - } - w.subscriptions = nil } // EnoughPeersToPublish returns whether there are enough peers connected in the default waku pubsub topic @@ -305,10 +290,6 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error { } w.log.Info("unsubscribing from topic", zap.String("topic", sub.Topic())) - for _, sub := range w.subscriptions[topic] { - sub.Unsubscribe() - } - w.relaySubs[topic].Cancel() delete(w.relaySubs, topic) @@ -332,9 +313,6 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) < w.log.Error("getting message from subscription", zap.Error(err)) } sub.Cancel() - for _, subscription := range w.subscriptions[sub.Topic()] { - subscription.Unsubscribe() - } return } msgChannel <- msg diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index cc03eca3..dd5fd0d1 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -35,7 +35,6 @@ func TestWakuStoreProtocolQuery(t *testing.T) { Version: 0, Timestamp: utils.GetUnixEpoch(), } - require.NoError(t, err) // Simulate a message has been received via relay protocol sub := relay.ArraySubscription([]*protocol.Envelope{protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)}) @@ -45,6 +44,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) s2.SetHost(host2) err = s2.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) diff --git a/waku/v2/rest/runner.go b/waku/v2/rest/runner.go index e369f3a6..7681dd2d 100644 --- a/waku/v2/rest/runner.go +++ b/waku/v2/rest/runner.go @@ -32,7 +32,7 @@ func (r *runnerService) Start(ctx context.Context) { case <-ctx.Done(): return case envelope, ok := <-r.sub.Ch: - if !ok { + if ok { r.adder(envelope) } } diff --git a/waku/v2/rpc/private_test.go b/waku/v2/rpc/private_test.go index 5410d305..29d40e69 100644 --- a/waku/v2/rpc/private_test.go +++ b/waku/v2/rpc/private_test.go @@ -2,6 +2,7 @@ package rpc import ( "context" + "fmt" "testing" "time" @@ -95,7 +96,7 @@ func TestGetV1SymmetricMessages(t *testing.T) { // Subscribing topic to test getter _, err := d.node.Relay().SubscribeToTopic(context.TODO(), "test") require.NoError(t, err) - + fmt.Println("here") var reply SuccessReply err = d.PostV1SymmetricMessage( makeRequest(t), diff --git a/waku/v2/rpc/runner.go b/waku/v2/rpc/runner.go index d41340ce..a0b24c9a 100644 --- a/waku/v2/rpc/runner.go +++ b/waku/v2/rpc/runner.go @@ -21,7 +21,7 @@ func newRunnerService(broadcaster relay.Broadcaster, adder Adder) *runnerService } func (r *runnerService) Start() { - r.broadcaster.RegisterForAll(1024) + r.sub = r.broadcaster.RegisterForAll(1024) for envelope := range r.sub.Ch { r.adder(envelope) }