diff --git a/examples/noise/go.mod b/examples/noise/go.mod index d207515e..f9bc1c95 100644 --- a/examples/noise/go.mod +++ b/examples/noise/go.mod @@ -109,10 +109,10 @@ require ( github.com/tklauser/numcpus v0.2.2 // indirect github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 // indirect github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7 // indirect - github.com/waku-org/go-zerokit-rln v0.1.14-0.20230905214645-ca686a02e816 // indirect - github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230905213302-1d6d18a03e7c // indirect - github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230905183322-05f4cda61468 // indirect - github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230905182930-2b11e72ef866 // indirect + github.com/waku-org/go-zerokit-rln v0.1.14-0.20230914234036-e0ebce7c29eb // indirect + github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230914230757-87caaeb9e6c9 // indirect + github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230914230828-f14923ed4dac // indirect + github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230914230901-6057b9728a32 // indirect github.com/wk8/go-ordered-map v1.0.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/dig v1.17.0 // indirect diff --git a/examples/noise/go.sum b/examples/noise/go.sum index 0a70eb46..f6a16946 100644 --- a/examples/noise/go.sum +++ b/examples/noise/go.sum @@ -646,14 +646,14 @@ github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7 h1:0 github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230628220917-7b4e5ae4c0e7/go.mod h1:pFvOZ9YTFsW0o5zJW7a0B5tr1owAijRWJctXJ2toL04= github.com/waku-org/go-noise v0.0.4 h1:ZfQDcCw8pazm89EBl5SXY7GGAnzDQb9AHFXlw3Ktbvk= github.com/waku-org/go-noise v0.0.4/go.mod h1:+PWRfs2eSOVwKrPcQlfhwDngSh3faL/1QoxvoqggEKc= -github.com/waku-org/go-zerokit-rln v0.1.14-0.20230905214645-ca686a02e816 h1:M5skPFmapY5i5a9jSiGWft9PZMiQr2nCi8uzJc2IfBI= -github.com/waku-org/go-zerokit-rln v0.1.14-0.20230905214645-ca686a02e816/go.mod h1:zc3FBSLP6vy2sOjAnqIju3yKLRq1WkcxsS1Lh9w0CuA= -github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230905213302-1d6d18a03e7c h1:aDn17iEMrdXeQ6dp+Cv3ywJYStkomkvKWv8I00iy79c= -github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230905213302-1d6d18a03e7c/go.mod h1:KYykqtdApHVYZ3G0spwMnoxc5jH5eI3jyO9SwsSfi48= -github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230905183322-05f4cda61468 h1:yNRDUyWJu/wHEPLps5D/Zce24mu/5ax2u1pXsMwRPbg= -github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230905183322-05f4cda61468/go.mod h1:7cSGUoGVIla1IpnChrLbkVjkYgdOcr7rcifEfh4ReR4= -github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230905182930-2b11e72ef866 h1:dURzhyGtPrpmBJcnY4hpY83dW81cZimkZ8U+S89ANd0= -github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230905182930-2b11e72ef866/go.mod h1:+LeEYoW5/uBUTVjtBGLEVCUe9mOYAlu5ZPkIxLOSr5Y= +github.com/waku-org/go-zerokit-rln v0.1.14-0.20230914234036-e0ebce7c29eb h1:jeoCt8oJCEW6qvCFZ+re2WQn/pdIt5Vd63P24bfC1QM= +github.com/waku-org/go-zerokit-rln v0.1.14-0.20230914234036-e0ebce7c29eb/go.mod h1:9QlOve5GEl53rWZQlOahA4kh+sugwYWIQKuexiS2WxI= +github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230914230757-87caaeb9e6c9 h1:bYnGf+NdSDOoxpGmGSiJsaiiDCPcZPMFYwcxMfONUE0= +github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230914230757-87caaeb9e6c9/go.mod h1:KYykqtdApHVYZ3G0spwMnoxc5jH5eI3jyO9SwsSfi48= +github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230914230828-f14923ed4dac h1:50Us7F8/2V0qIpEenexZ7gNRHIPakyYG6GSlpb+MqkY= +github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230914230828-f14923ed4dac/go.mod h1:7cSGUoGVIla1IpnChrLbkVjkYgdOcr7rcifEfh4ReR4= +github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230914230901-6057b9728a32 h1:JAQUiPcH26rgkNAPEhLPtRxvPaYiPoLbtbZnmz5ALeI= +github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230914230901-6057b9728a32/go.mod h1:+LeEYoW5/uBUTVjtBGLEVCUe9mOYAlu5ZPkIxLOSr5Y= github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/wk8/go-ordered-map v1.0.0 h1:BV7z+2PaK8LTSd/mWgY12HyMAo5CEgkHqbkVq2thqr8= github.com/wk8/go-ordered-map v1.0.0/go.mod h1:9ZIbRunKbuvfPKyBP1SIKLcXNlv74YCOZ3t3VTS6gRk= diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 672732ac..6ca51c84 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -54,7 +54,7 @@ type WakuRelay struct { defaultTopicValidators []validatorFn // TODO: convert to concurrent maps - topicsMutex sync.Mutex + topicsMutex sync.RWMutex wakuRelayTopics map[string]*pubsub.Topic relaySubs map[string]*pubsub.Subscription @@ -240,8 +240,8 @@ func (w *WakuRelay) PubSub() *pubsub.PubSub { // Topics returns a list of all the pubsub topics currently subscribed to func (w *WakuRelay) Topics() []string { - defer w.topicsMutex.Unlock() - w.topicsMutex.Lock() + defer w.topicsMutex.RUnlock() + w.topicsMutex.RLock() var result []string for topic := range w.relaySubs { @@ -252,8 +252,8 @@ func (w *WakuRelay) Topics() []string { // IsSubscribed indicates whether the node is subscribed to a pubsub topic or not func (w *WakuRelay) IsSubscribed(topic string) bool { - defer w.topicsMutex.Unlock() - w.topicsMutex.Lock() + w.topicsMutex.RLock() + defer w.topicsMutex.RUnlock() _, ok := w.relaySubs[topic] return ok } @@ -408,6 +408,9 @@ func (w *WakuRelay) Subscribe(ctx context.Context) (*Subscription, error) { // Unsubscribe closes a subscription to a pubsub topic func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error { + w.topicsMutex.Lock() + defer w.topicsMutex.Unlock() + sub, ok := w.relaySubs[topic] if !ok { return fmt.Errorf("not subscribed to topic") diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index c7b335d8..38b5d42f 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -36,21 +36,24 @@ func TestWakuRelay(t *testing.T) { defer sub.Cancel() require.NoError(t, err) + require.Equal(t, relay.IsSubscribed(testTopic), true) + topics := relay.Topics() require.Equal(t, 1, len(topics)) require.Equal(t, testTopic, topics[0]) ctx, cancel := context.WithCancel(context.Background()) - + bytesToSend := []byte{1} go func() { defer cancel() _, err := sub.Next(ctx) require.NoError(t, err) + }() msg := &pb.WakuMessage{ - Payload: []byte{1}, + Payload: bytesToSend, Version: 0, ContentTopic: "test", Timestamp: 0, @@ -58,6 +61,11 @@ func TestWakuRelay(t *testing.T) { _, err = relay.PublishToTopic(context.Background(), msg, testTopic) require.NoError(t, err) + time.Sleep(2 * time.Second) + + err = relay.Unsubscribe(ctx, testTopic) + require.NoError(t, err) + <-ctx.Done() }