diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index c55e26e4..9acd3944 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -344,7 +344,7 @@ func (c *Chat) publish(ctx context.Context, message string) error { } if c.options.LightPush.Enable { - var lightOpt lightpush.LightPushOption + var lightOpt lightpush.Option var peerID peer.ID peerID, err = options.LightPush.NodePeerID() if err != nil { @@ -426,7 +426,7 @@ func (c *Chat) retrieveHistory(connectionWg *sync.WaitGroup) { } response, err := c.node.Store().Query(tCtx, q, - store.WithAutomaticRequestId(), + store.WithAutomaticRequestID(), storeOpt, store.WithPaging(false, 100)) diff --git a/mobile/api_lightpush.go b/mobile/api_lightpush.go index 21dedb92..13cc1eef 100644 --- a/mobile/api_lightpush.go +++ b/mobile/api_lightpush.go @@ -26,7 +26,7 @@ func lightpushPublish(msg *pb.WakuMessage, pubsubTopic string, peerID string, ms ctx = context.Background() } - var lpOptions []lightpush.LightPushOption + var lpOptions []lightpush.Option if peerID != "" { p, err := peer.Decode(peerID) if err != nil { diff --git a/mobile/api_store.go b/mobile/api_store.go index 8823399f..b44ce3b7 100644 --- a/mobile/api_store.go +++ b/mobile/api_store.go @@ -80,7 +80,7 @@ func StoreQuery(queryJSON string, peerID string, ms int) string { } options := []store.HistoryRequestOption{ - store.WithAutomaticRequestId(), + store.WithAutomaticRequestID(), store.WithPaging(args.PagingOptions.Forward, args.PagingOptions.PageSize), store.WithCursor(args.PagingOptions.Cursor), } @@ -120,7 +120,7 @@ func StoreLocalQuery(queryJSON string) string { } options := []store.HistoryRequestOption{ - store.WithAutomaticRequestId(), + store.WithAutomaticRequestID(), store.WithPaging(args.PagingOptions.Forward, args.PagingOptions.PageSize), store.WithCursor(args.PagingOptions.Cursor), store.WithLocalQuery(), diff --git a/waku/v2/connection_gater.go b/waku/v2/connection_gater.go index 66a0391f..766d0141 100644 --- a/waku/v2/connection_gater.go +++ b/waku/v2/connection_gater.go @@ -12,6 +12,8 @@ import ( "go.uber.org/zap" ) +// ConnectionGater is the implementation of the connection gater used to limit +// the number of connections per IP address type ConnectionGater struct { sync.Mutex logger *zap.Logger @@ -22,6 +24,7 @@ type ConnectionGater struct { const maxConnsPerIP = 10 +// NewConnectionGater creates a new instance of ConnectionGater func NewConnectionGater(logger *zap.Logger) *ConnectionGater { c := &ConnectionGater{ logger: logger.Named("connection-gater"), diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index c185f622..81ac8733 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -165,7 +165,7 @@ func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscr reader := pbio.NewDelimitedReader(conn, math.MaxInt32) request := &pb.FilterSubscribeRequest{ - RequestId: hex.EncodeToString(params.requestId), + RequestId: hex.EncodeToString(params.requestID), FilterSubscribeType: reqType, PubsubTopic: contentFilter.Topic, ContentTopics: contentFilter.ContentTopics, @@ -188,7 +188,7 @@ func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscr } if filterSubscribeResponse.RequestId != request.RequestId { - wf.log.Error("requestId mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId)) + wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId)) metrics.RecordFilterError(ctx, "request_id_mismatch") err := NewFilterError(300, "request_id_mismatch") return &err diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index 285c56ff..f3c65c90 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -15,14 +15,14 @@ type ( FilterSubscribeParameters struct { host host.Host selectedPeer peer.ID - requestId []byte + requestID []byte log *zap.Logger } FilterUnsubscribeParameters struct { unsubscribeAll bool selectedPeer peer.ID - requestId []byte + requestID []byte log *zap.Logger } @@ -78,22 +78,26 @@ func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Fi } } -func WithRequestId(requestId []byte) FilterSubscribeOption { +// WithRequestID is an option to set a specific request ID to be used when +// creating a filter subscription +func WithRequestID(requestID []byte) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - params.requestId = requestId + params.requestID = requestID } } -func WithAutomaticRequestId() FilterSubscribeOption { +// WithAutomaticRequestID is an option to automatically generate a request ID +// when creating a filter subscription +func WithAutomaticRequestID() FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - params.requestId = protocol.GenerateRequestId() + params.requestID = protocol.GenerateRequestId() } } func DefaultSubscriptionOptions() []FilterSubscribeOption { return []FilterSubscribeOption{ WithAutomaticPeerSelection(), - WithAutomaticRequestId(), + WithAutomaticRequestID(), } } @@ -109,15 +113,17 @@ func Peer(p peer.ID) FilterUnsubscribeOption { } } -func RequestID(requestId []byte) FilterUnsubscribeOption { +// RequestID is an option to set a specific request ID to be used when +// removing a subscription from a filter node +func RequestID(requestID []byte) FilterUnsubscribeOption { return func(params *FilterUnsubscribeParameters) { - params.requestId = requestId + params.requestID = requestID } } func AutomaticRequestId() FilterUnsubscribeOption { return func(params *FilterUnsubscribeParameters) { - params.requestId = protocol.GenerateRequestId() + params.requestID = protocol.GenerateRequestId() } } diff --git a/waku/v2/protocol/legacy_filter/filter_map.go b/waku/v2/protocol/legacy_filter/filter_map.go index 019be8e2..f04d7de8 100644 --- a/waku/v2/protocol/legacy_filter/filter_map.go +++ b/waku/v2/protocol/legacy_filter/filter_map.go @@ -85,15 +85,17 @@ func (fm *FilterMap) Items() <-chan FilterMapItem { return c } -func (fm *FilterMap) Notify(msg *pb.WakuMessage, requestId string) { +// Notify is used to push a received message from a filter subscription to +// any content filter registered on this node and to the broadcast subscribers +func (fm *FilterMap) Notify(msg *pb.WakuMessage, requestID string) { fm.RLock() defer fm.RUnlock() - filter, ok := fm.items[requestId] + filter, ok := fm.items[requestID] if !ok { - // We do this because the key for the filter is set to the requestId received from the filter protocol. + // We do this because the key for the filter is set to the requestID received from the filter protocol. // This means we do not need to check the content filter explicitly as all MessagePushs already contain - // the requestId of the coresponding filter. + // the requestID of the coresponding filter. return } diff --git a/waku/v2/protocol/legacy_filter/filter_subscribers.go b/waku/v2/protocol/legacy_filter/filter_subscribers.go index a047ff9c..872126cb 100644 --- a/waku/v2/protocol/legacy_filter/filter_subscribers.go +++ b/waku/v2/protocol/legacy_filter/filter_subscribers.go @@ -10,7 +10,7 @@ import ( type Subscriber struct { peer peer.ID - requestId string + requestID string filter *pb.FilterRequest // @TODO MAKE THIS A SEQUENCE AGAIN? } @@ -122,14 +122,15 @@ func (sub *Subscribers) FlagAsFailure(peerID peer.ID) { } } -func (sub *Subscribers) RemoveContentFilters(peerID peer.ID, requestId string, contentFilters []*pb.FilterRequest_ContentFilter) { +// RemoveContentFilters removes a set of content filters registered for an specific peer +func (sub *Subscribers) RemoveContentFilters(peerID peer.ID, requestID string, contentFilters []*pb.FilterRequest_ContentFilter) { sub.Lock() defer sub.Unlock() var peerIdsToRemove []peer.ID for subIndex, subscriber := range sub.subscribers { - if subscriber.peer != peerID || subscriber.requestId != requestId { + if subscriber.peer != peerID || subscriber.requestID != requestID { continue } @@ -156,7 +157,7 @@ func (sub *Subscribers) RemoveContentFilters(peerID peer.ID, requestId string, c // if no more content filters left for _, peerId := range peerIdsToRemove { for i, s := range sub.subscribers { - if s.peer == peerId && s.requestId == requestId { + if s.peer == peerId && s.requestID == requestID { l := len(sub.subscribers) - 1 sub.subscribers[i] = sub.subscribers[l] sub.subscribers = sub.subscribers[:l] diff --git a/waku/v2/protocol/legacy_filter/filter_subscribers_test.go b/waku/v2/protocol/legacy_filter/filter_subscribers_test.go index debb3110..c026e9d2 100644 --- a/waku/v2/protocol/legacy_filter/filter_subscribers_test.go +++ b/waku/v2/protocol/legacy_filter/filter_subscribers_test.go @@ -28,14 +28,14 @@ func firstSubscriber(subs *Subscribers, contentTopic string) *Subscriber { func TestAppend(t *testing.T) { subs := NewSubscribers(10 * time.Second) peerId := createPeerId(t) - requestId := "request_1" + requestID := "request_1" contentTopic := "topic1" request := &pb.FilterRequest{ Subscribe: true, Topic: TOPIC, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}}, } - subs.Append(Subscriber{peerId, requestId, request}) + subs.Append(Subscriber{peerId, requestID, request}) sub := firstSubscriber(subs, contentTopic) assert.NotNil(t, sub) @@ -44,15 +44,15 @@ func TestAppend(t *testing.T) { func TestRemove(t *testing.T) { subs := NewSubscribers(10 * time.Second) peerId := createPeerId(t) - requestId := "request_1" + requestID := "request_1" contentTopic := "topic1" request := &pb.FilterRequest{ Subscribe: true, Topic: TOPIC, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}}, } - subs.Append(Subscriber{peerId, requestId, request}) - subs.RemoveContentFilters(peerId, requestId, request.ContentFilters) + subs.Append(Subscriber{peerId, requestID, request}) + subs.RemoveContentFilters(peerId, requestID, request.ContentFilters) sub := firstSubscriber(subs, contentTopic) assert.Nil(t, sub) @@ -61,7 +61,7 @@ func TestRemove(t *testing.T) { func TestRemovePartial(t *testing.T) { subs := NewSubscribers(10 * time.Second) peerId := createPeerId(t) - requestId := "request_1" + requestID := "request_1" topic1 := "topic1" topic2 := "topic2" request := &pb.FilterRequest{ @@ -69,8 +69,8 @@ func TestRemovePartial(t *testing.T) { Topic: TOPIC, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: topic1}, {ContentTopic: topic2}}, } - subs.Append(Subscriber{peerId, requestId, request}) - subs.RemoveContentFilters(peerId, requestId, []*pb.FilterRequest_ContentFilter{{ContentTopic: topic1}}) + subs.Append(Subscriber{peerId, requestID, request}) + subs.RemoveContentFilters(peerId, requestID, []*pb.FilterRequest_ContentFilter{{ContentTopic: topic1}}) sub := firstSubscriber(subs, topic2) assert.NotNil(t, sub) @@ -81,8 +81,8 @@ func TestRemoveDuplicateSubscriptions(t *testing.T) { subs := NewSubscribers(10 * time.Second) peerId := createPeerId(t) topic := "topic" - requestId1 := "request_1" - requestId2 := "request_2" + requestID1 := "request_1" + requestID2 := "request_2" request1 := &pb.FilterRequest{ Subscribe: true, Topic: TOPIC, @@ -93,10 +93,10 @@ func TestRemoveDuplicateSubscriptions(t *testing.T) { Topic: TOPIC, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}}, } - subs.Append(Subscriber{peerId, requestId1, request1}) - subs.Append(Subscriber{peerId, requestId2, request2}) - subs.RemoveContentFilters(peerId, requestId2, []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}}) - subs.RemoveContentFilters(peerId, requestId1, []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}}) + subs.Append(Subscriber{peerId, requestID1, request1}) + subs.Append(Subscriber{peerId, requestID2, request2}) + subs.RemoveContentFilters(peerId, requestID2, []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}}) + subs.RemoveContentFilters(peerId, requestID1, []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}}) sub := firstSubscriber(subs, topic) assert.Nil(t, sub) @@ -106,8 +106,8 @@ func TestRemoveDuplicateSubscriptionsPartial(t *testing.T) { subs := NewSubscribers(10 * time.Second) peerId := createPeerId(t) topic := "topic" - requestId1 := "request_1" - requestId2 := "request_2" + requestID1 := "request_1" + requestID2 := "request_2" request1 := &pb.FilterRequest{ Subscribe: true, Topic: TOPIC, @@ -118,27 +118,27 @@ func TestRemoveDuplicateSubscriptionsPartial(t *testing.T) { Topic: TOPIC, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}}, } - subs.Append(Subscriber{peerId, requestId1, request1}) - subs.Append(Subscriber{peerId, requestId2, request2}) - subs.RemoveContentFilters(peerId, requestId1, []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}}) + subs.Append(Subscriber{peerId, requestID1, request1}) + subs.Append(Subscriber{peerId, requestID2, request2}) + subs.RemoveContentFilters(peerId, requestID1, []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}}) sub := firstSubscriber(subs, topic) assert.NotNil(t, sub) - assert.Equal(t, sub.requestId, requestId2) + assert.Equal(t, sub.requestID, requestID2) } func TestRemoveBogus(t *testing.T) { subs := NewSubscribers(10 * time.Second) peerId := createPeerId(t) - requestId := "request_1" + requestID := "request_1" contentTopic := "topic1" request := &pb.FilterRequest{ Subscribe: true, Topic: TOPIC, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}}, } - subs.Append(Subscriber{peerId, requestId, request}) - subs.RemoveContentFilters(peerId, requestId, []*pb.FilterRequest_ContentFilter{{ContentTopic: "does not exist"}, {ContentTopic: contentTopic}}) + subs.Append(Subscriber{peerId, requestID, request}) + subs.RemoveContentFilters(peerId, requestID, []*pb.FilterRequest_ContentFilter{{ContentTopic: "does not exist"}, {ContentTopic: contentTopic}}) sub := firstSubscriber(subs, contentTopic) assert.Nil(t, sub) diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go index 85f9c8c2..f9f3a01d 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -144,7 +144,7 @@ func (wf *WakuFilter) onRequest(ctx context.Context) func(s network.Stream) { // We're on a full node. // This is a filter request coming from a light node. if filterRPCRequest.Request.Subscribe { - subscriber := Subscriber{peer: s.Conn().RemotePeer(), requestId: filterRPCRequest.RequestId, filter: filterRPCRequest.Request} + subscriber := Subscriber{peer: s.Conn().RemotePeer(), requestID: filterRPCRequest.RequestId, filter: filterRPCRequest.Request} if subscriber.filter.Topic == "" { // @TODO: review if empty topic is possible subscriber.filter.Topic = relay.DefaultWakuTopic } @@ -168,7 +168,7 @@ func (wf *WakuFilter) onRequest(ctx context.Context) func(s network.Stream) { } func (wf *WakuFilter) pushMessage(ctx context.Context, subscriber Subscriber, msg *wpb.WakuMessage) error { - pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*wpb.WakuMessage{msg}}} + pushRPC := &pb.FilterRPC{RequestId: subscriber.requestID, Push: &pb.MessagePush{Messages: []*wpb.WakuMessage{msg}}} logger := wf.log.With(logging.HostID("peer", subscriber.peer)) conn, err := wf.h.NewStream(ctx, subscriber.peer, FilterID_v20beta1) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 85af5d40..8df889da 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -19,7 +19,7 @@ import ( "go.uber.org/zap" ) -// LightPushID_v20beta1 is the current Waku Lightpush protocol identifier +// LightPushID_v20beta1 is the current Waku LightPush protocol identifier const LightPushID_v20beta1 = libp2pProtocol.ID("/vac/waku/lightpush/2.0.0-beta1") var ( @@ -27,6 +27,7 @@ var ( ErrInvalidId = errors.New("invalid request id") ) +// WakuLightPush is the implementation of the Waku LightPush protocol type WakuLightPush struct { h host.Host relay *relay.WakuRelay @@ -137,8 +138,8 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Strea } } -func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) { - params := new(LightPushParameters) +func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...Option) (*pb.PushResponse, error) { + params := new(lightPushParameters) params.host = wakuLP.h params.log = wakuLP.log @@ -152,7 +153,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o return nil, ErrNoPeersAvailable } - if len(params.requestId) == 0 { + if len(params.requestID) == 0 { return nil, ErrInvalidId } @@ -174,7 +175,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o } }() - pushRequestRPC := &pb.PushRPC{RequestId: hex.EncodeToString(params.requestId), Query: req} + pushRequestRPC := &pb.PushRPC{RequestId: hex.EncodeToString(params.requestID), Query: req} writer := pbio.NewDelimitedWriter(connOpt) reader := pbio.NewDelimitedReader(connOpt, math.MaxInt32) @@ -208,7 +209,7 @@ func (wakuLP *WakuLightPush) Stop() { } // PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol -func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, topic string, opts ...LightPushOption) ([]byte, error) { +func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, topic string, opts ...Option) ([]byte, error) { if message == nil { return nil, errors.New("message can't be null") } @@ -232,6 +233,6 @@ func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.Wa } // Publish is used to broadcast a WakuMessage to the default waku pubsub topic via lightpush protocol -func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...LightPushOption) ([]byte, error) { +func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { return wakuLP.PublishToTopic(ctx, message, relay.DefaultWakuTopic, opts...) } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index f1fda91e..61f80752 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -10,18 +10,19 @@ import ( "go.uber.org/zap" ) -type LightPushParameters struct { +type lightPushParameters struct { host host.Host selectedPeer peer.ID - requestId []byte + requestID []byte log *zap.Logger } -type LightPushOption func(*LightPushParameters) +// Option is the type of options accepted when performing LightPush protocol requests +type Option func(*lightPushParameters) // WithPeer is an option used to specify the peerID to push a waku message to -func WithPeer(p peer.ID) LightPushOption { - return func(params *LightPushParameters) { +func WithPeer(p peer.ID) Option { + return func(params *lightPushParameters) { params.selectedPeer = p } } @@ -30,8 +31,8 @@ func WithPeer(p peer.ID) LightPushOption { // to push a waku message to. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore -func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) LightPushOption { - return func(params *LightPushParameters) { +func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option { + return func(params *lightPushParameters) { p, err := utils.SelectPeer(params.host, LightPushID_v20beta1, fromThesePeers, params.log) if err == nil { params.selectedPeer = p @@ -45,8 +46,8 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) LightPushOption { // with the lowest ping. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore -func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) LightPushOption { - return func(params *LightPushParameters) { +func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Option { + return func(params *lightPushParameters) { p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, LightPushID_v20beta1, fromThesePeers, params.log) if err == nil { params.selectedPeer = p @@ -56,26 +57,26 @@ func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Li } } -// WithRequestId is an option to set a specific request ID to be used when +// WithRequestID is an option to set a specific request ID to be used when // publishing a message -func WithRequestId(requestId []byte) LightPushOption { - return func(params *LightPushParameters) { - params.requestId = requestId +func WithRequestID(requestID []byte) Option { + return func(params *lightPushParameters) { + params.requestID = requestID } } -// WithAutomaticRequestId is an option to automatically generate a request ID +// WithAutomaticRequestID is an option to automatically generate a request ID // when publishing a message -func WithAutomaticRequestId() LightPushOption { - return func(params *LightPushParameters) { - params.requestId = protocol.GenerateRequestId() +func WithAutomaticRequestID() Option { + return func(params *lightPushParameters) { + params.requestID = protocol.GenerateRequestId() } } // DefaultOptions are the default options to be used when using the lightpush protocol -func DefaultOptions(host host.Host) []LightPushOption { - return []LightPushOption{ - WithAutomaticRequestId(), +func DefaultOptions(host host.Host) []Option { + return []Option{ + WithAutomaticRequestID(), WithAutomaticPeerSelection(), } } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go index 6f408c5b..6fa21677 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go @@ -17,15 +17,15 @@ func TestLightPushOption(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - options := []LightPushOption{ + options := []Option{ WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), WithAutomaticPeerSelection(), WithFastestPeerSelection(context.Background()), - WithRequestId([]byte("requestId")), - WithAutomaticRequestId(), + WithRequestID([]byte("requestID")), + WithAutomaticRequestID(), } - params := new(LightPushParameters) + params := new(lightPushParameters) params.host = host params.log = utils.Logger() @@ -35,5 +35,5 @@ func TestLightPushOption(t *testing.T) { require.Equal(t, host, params.host) require.NotNil(t, params.selectedPeer) - require.NotNil(t, params.requestId) + require.NotNil(t, params.requestID) } diff --git a/waku/v2/protocol/peer_options.go b/waku/v2/protocol/peer_options.go deleted file mode 100644 index 2d0eaff9..00000000 --- a/waku/v2/protocol/peer_options.go +++ /dev/null @@ -1 +0,0 @@ -package protocol diff --git a/waku/v2/protocol/relay/broadcast.go b/waku/v2/protocol/relay/broadcast.go index f79eec5b..2b3d570a 100644 --- a/waku/v2/protocol/relay/broadcast.go +++ b/waku/v2/protocol/relay/broadcast.go @@ -86,6 +86,7 @@ func (b *chStore) close() { b.topicToChans = nil } +// Broadcaster is used to create a fanout for an envelope that will be received by any subscriber interested in the topic of the message type Broadcaster interface { Start(ctx context.Context) error Stop() @@ -109,12 +110,14 @@ type broadcaster struct { running atomic.Bool } +// NewBroadcaster creates a new instance of a broadcaster func NewBroadcaster(bufLen int) *broadcaster { return &broadcaster{ bufLen: bufLen, } } +// Start initiates the execution of the broadcaster func (b *broadcaster) Start(ctx context.Context) error { if !b.running.CompareAndSwap(false, true) { // if not running then start return errors.New("already started") @@ -140,6 +143,7 @@ func (b *broadcaster) run(ctx context.Context) { } } +// Stop stops the execution of the broadcaster and closes all subscriptions func (b *broadcaster) Stop() { if !b.running.CompareAndSwap(true, false) { // if running then stop return @@ -150,12 +154,12 @@ func (b *broadcaster) Stop() { close(b.input) // close input channel } -// returned subscription is all speicfied topic +// Register returns a subscription for an specific topic func (b *broadcaster) Register(topic string, chLen ...int) Subscription { return b.chStore.getNewCh(topic, getChLen(chLen)) } -// return subscription is for all topic +// RegisterForAll returns a subscription for all topics func (b *broadcaster) RegisterForAll(chLen ...int) Subscription { return b.chStore.getNewCh("", getChLen(chLen)) @@ -169,7 +173,7 @@ func getChLen(chLen []int) int { return l } -// only accepts value when running. +// Submit is used to broadcast messages to subscribers. It only accepts value when running. func (b *broadcaster) Submit(m *protocol.Envelope) { if b.running.Load() { b.input <- m diff --git a/waku/v2/protocol/relay/subscription.go b/waku/v2/protocol/relay/subscription.go index f256703c..3f7cb34d 100644 --- a/waku/v2/protocol/relay/subscription.go +++ b/waku/v2/protocol/relay/subscription.go @@ -2,11 +2,13 @@ package relay import "github.com/waku-org/go-waku/waku/v2/protocol" +// Subscription handles the details of a particular Topic subscription. There may be many subscriptions for a given topic. type Subscription struct { Unsubscribe func() Ch <-chan *protocol.Envelope } +// NoopSubscription creates a noop subscription that will not receive any envelope func NoopSubscription() Subscription { ch := make(chan *protocol.Envelope) close(ch) @@ -16,6 +18,7 @@ func NoopSubscription() Subscription { } } +// ArraySubscription creates a subscription for a list of envelopes func ArraySubscription(msgs []*protocol.Envelope) Subscription { ch := make(chan *protocol.Envelope, len(msgs)) for _, msg := range msgs { diff --git a/waku/v2/protocol/relay/validators.go b/waku/v2/protocol/relay/validators.go index a3dd1c55..14490870 100644 --- a/waku/v2/protocol/relay/validators.go +++ b/waku/v2/protocol/relay/validators.go @@ -20,8 +20,7 @@ import ( proto "google.golang.org/protobuf/proto" ) -// Application level message hash -func MsgHash(pubSubTopic string, msg *pb.WakuMessage) []byte { +func msgHash(pubSubTopic string, msg *pb.WakuMessage) []byte { timestampBytes := make([]byte, 8) binary.LittleEndian.PutUint64(timestampBytes, uint64(msg.Timestamp)) @@ -39,7 +38,7 @@ func MsgHash(pubSubTopic string, msg *pb.WakuMessage) []byte { ) } -const MessageWindowDuration = time.Minute * 5 +const messageWindowDuration = time.Minute * 5 func withinTimeWindow(t timesource.Timesource, msg *pb.WakuMessage) bool { if msg.Timestamp == 0 { @@ -49,7 +48,7 @@ func withinTimeWindow(t timesource.Timesource, msg *pb.WakuMessage) bool { now := t.Now() msgTime := time.Unix(0, msg.Timestamp) - return now.Sub(msgTime).Abs() <= MessageWindowDuration + return now.Sub(msgTime).Abs() <= messageWindowDuration } type validatorFn = func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool @@ -67,13 +66,14 @@ func validatorFnBuilder(t timesource.Timesource, topic string, publicKey *ecdsa. return false } - msgHash := MsgHash(topic, msg) + msgHash := msgHash(topic, msg) signature := msg.Meta return secp256k1.VerifySignature(publicKeyBytes, msgHash, signature) }, nil } +// AddSignedTopicValidator registers a gossipsub validator for a topic which will check that messages Meta field contains a valid ECDSA signature for the specified pubsub topic. This is used as a DoS prevention mechanism func (w *WakuRelay) AddSignedTopicValidator(topic string, publicKey *ecdsa.PublicKey) error { w.log.Info("adding validator to signed topic", zap.String("topic", topic), zap.String("publicKey", hex.EncodeToString(elliptic.Marshal(publicKey.Curve, publicKey.X, publicKey.Y)))) @@ -94,8 +94,9 @@ func (w *WakuRelay) AddSignedTopicValidator(topic string, publicKey *ecdsa.Publi return nil } +// SignMessage adds an ECDSA signature to a WakuMessage as an opt-in mechanism for DoS prevention func SignMessage(privKey *ecdsa.PrivateKey, msg *pb.WakuMessage, pubsubTopic string) error { - msgHash := MsgHash(pubsubTopic, msg) + msgHash := msgHash(pubsubTopic, msg) sign, err := secp256k1.Sign(msgHash, crypto.FromECDSA(privKey)) if err != nil { return err diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 83eaa5ac..5377e06d 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -27,10 +27,13 @@ import ( "github.com/waku-org/go-waku/waku/v2/timesource" ) +// WakuRelayID_v200 is the current protocol ID used for WakuRelay const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0") +// DefaultWakuTopic is the default pubsub topic used across all Waku protocols var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String() +// WakuRelay is the implementation of the Waku Relay protocol type WakuRelay struct { host host.Host opts []pubsub.Option @@ -202,11 +205,12 @@ func (w *WakuRelay) peerScoreInspector(peerScoresSnapshots map[peer.ID]*pubsub.P } } -// Sets the host to be able to mount or consume a protocol +// SetHost sets the host to be able to mount or consume a protocol func (w *WakuRelay) SetHost(h host.Host) { w.host = h } +// Start initiates the WakuRelay protocol func (w *WakuRelay) Start(ctx context.Context) error { w.wg.Wait() ctx, cancel := context.WithCancel(ctx) @@ -249,6 +253,7 @@ func (w *WakuRelay) Topics() []string { return result } +// IsSubscribed indicates whether the node is subscribed to a pubsub topic or not func (w *WakuRelay) IsSubscribed(topic string) bool { defer w.topicsMutex.Unlock() w.topicsMutex.Lock() @@ -487,6 +492,7 @@ func (w *WakuRelay) subscribeToTopic(pubsubTopic string, sub *pubsub.Subscriptio } +// Params returns the gossipsub configuration parameters used by WakuRelay func (w *WakuRelay) Params() pubsub.GossipSubParams { return w.params } diff --git a/waku/v2/protocol/rln/common.go b/waku/v2/protocol/rln/common.go index 79c6be06..69412d4c 100644 --- a/waku/v2/protocol/rln/common.go +++ b/waku/v2/protocol/rln/common.go @@ -6,23 +6,23 @@ import ( "github.com/waku-org/go-zerokit-rln/rln" ) -type MessageValidationResult int +type messageValidationResult int const ( - MessageValidationResult_Unknown MessageValidationResult = iota - MessageValidationResult_Valid - MessageValidationResult_Invalid - MessageValidationResult_Spam + validationError messageValidationResult = iota + validMessage + invalidMessage + spamMessage ) // the maximum clock difference between peers in seconds -const MAX_CLOCK_GAP_SECONDS = 20 +const maxClockGapSeconds = 20 // maximum allowed gap between the epochs of messages' RateLimitProofs -const MAX_EPOCH_GAP = int64(MAX_CLOCK_GAP_SECONDS / rln.EPOCH_UNIT_SECONDS) +const maxEpochGap = int64(maxClockGapSeconds / rln.EPOCH_UNIT_SECONDS) -// Acceptable roots for merkle root validation of incoming messages -const AcceptableRootWindowSize = 5 +// acceptable roots for merkle root validation of incoming messages +const acceptableRootWindowSize = 5 type RegistrationHandler = func(tx *types.Transaction) diff --git a/waku/v2/protocol/rln/rln_relay_test.go b/waku/v2/protocol/rln/rln_relay_test.go index f92970ba..f353254f 100644 --- a/waku/v2/protocol/rln/rln_relay_test.go +++ b/waku/v2/protocol/rln/rln_relay_test.go @@ -79,7 +79,7 @@ func (s *WakuRLNRelaySuite) TestUpdateLogAndHasDuplicate() { rlnInstance, err := r.NewRLN() s.Require().NoError(err) - rootTracker, err := group_manager.NewMerkleRootTracker(AcceptableRootWindowSize, rlnInstance) + rootTracker, err := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance) s.Require().NoError(err) rlnRelay := &WakuRLNRelay{ @@ -170,7 +170,7 @@ func (s *WakuRLNRelaySuite) TestValidateMessage() { groupManager, err := static.NewStaticGroupManager(groupIDCommitments, idCredential, index, utils.Logger()) s.Require().NoError(err) - rootTracker, err := group_manager.NewMerkleRootTracker(AcceptableRootWindowSize, rlnInstance) + rootTracker, err := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance) s.Require().NoError(err) rlnRelay := &WakuRLNRelay{ @@ -221,8 +221,8 @@ func (s *WakuRLNRelaySuite) TestValidateMessage() { msgValidate4, err := rlnRelay.ValidateMessage(wm4, &now) s.Require().NoError(err) - s.Require().Equal(MessageValidationResult_Valid, msgValidate1) - s.Require().Equal(MessageValidationResult_Spam, msgValidate2) - s.Require().Equal(MessageValidationResult_Valid, msgValidate3) - s.Require().Equal(MessageValidationResult_Invalid, msgValidate4) + s.Require().Equal(validMessage, msgValidate1) + s.Require().Equal(spamMessage, msgValidate2) + s.Require().Equal(validMessage, msgValidate3) + s.Require().Equal(invalidMessage, msgValidate4) } diff --git a/waku/v2/protocol/rln/waku_rln_relay.go b/waku/v2/protocol/rln/waku_rln_relay.go index f87b7c27..fa2eb9be 100644 --- a/waku/v2/protocol/rln/waku_rln_relay.go +++ b/waku/v2/protocol/rln/waku_rln_relay.go @@ -62,7 +62,7 @@ func New( return nil, err } - rootTracker, err := group_manager.NewMerkleRootTracker(AcceptableRootWindowSize, rlnInstance) + rootTracker, err := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance) if err != nil { return nil, err } @@ -166,15 +166,15 @@ func (rlnRelay *WakuRLNRelay) updateLog(proofMD rln.ProofMetadata) (bool, error) return true, nil } -func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime *time.Time) (MessageValidationResult, error) { - // validate the supplied `msg` based on the waku-rln-relay routing protocol i.e., - // the `msg`'s epoch is within MAX_EPOCH_GAP of the current epoch - // the `msg` has valid rate limit proof - // the `msg` does not violate the rate limit - // `timeOption` indicates Unix epoch time (fractional part holds sub-seconds) - // if `timeOption` is supplied, then the current epoch is calculated based on that +// ValidateMessage validates the supplied message based on the waku-rln-relay routing protocol i.e., +// the message's epoch is within `maxEpochGap` of the current epoch +// the message's has valid rate limit proof +// the message's does not violate the rate limit +// if `optionalTime` is supplied, then the current epoch is calculated based on that, otherwise the current time will be used +func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime *time.Time) (messageValidationResult, error) { + // if msg == nil { - return MessageValidationResult_Unknown, errors.New("nil message") + return validationError, errors.New("nil message") } // checks if the `msg`'s epoch is far from the current epoch @@ -191,46 +191,46 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime if msgProof == nil { // message does not contain a proof rlnRelay.log.Debug("invalid message: message does not contain a proof") - return MessageValidationResult_Invalid, nil + return invalidMessage, nil } proofMD, err := rlnRelay.RLN.ExtractMetadata(*msgProof) if err != nil { rlnRelay.log.Debug("could not extract metadata", zap.Error(err)) - return MessageValidationResult_Invalid, nil + return invalidMessage, nil } // calculate the gaps and validate the epoch gap := rln.Diff(epoch, msgProof.Epoch) - if int64(math.Abs(float64(gap))) > MAX_EPOCH_GAP { + if int64(math.Abs(float64(gap))) > maxEpochGap { // message's epoch is too old or too ahead // accept messages whose epoch is within +-MAX_EPOCH_GAP from the current epoch rlnRelay.log.Debug("invalid message: epoch gap exceeds a threshold", zap.Int64("gap", gap)) - return MessageValidationResult_Invalid, nil + return invalidMessage, nil } valid, err := rlnRelay.verifyProof(msg, msgProof) if err != nil { rlnRelay.log.Debug("could not verify proof", zap.Error(err)) - return MessageValidationResult_Invalid, nil + return invalidMessage, nil } if !valid { // invalid proof rlnRelay.log.Debug("Invalid proof") - return MessageValidationResult_Invalid, nil + return invalidMessage, nil } // check if double messaging has happened hasDup, err := rlnRelay.HasDuplicate(proofMD) if err != nil { rlnRelay.log.Debug("validation error", zap.Error(err)) - return MessageValidationResult_Unknown, err + return validationError, err } if hasDup { rlnRelay.log.Debug("spam received") - return MessageValidationResult_Spam, nil + return spamMessage, nil } // insert the message to the log @@ -238,11 +238,11 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime // it will never error out _, err = rlnRelay.updateLog(proofMD) if err != nil { - return MessageValidationResult_Unknown, err + return validationError, err } rlnRelay.log.Debug("message is valid") - return MessageValidationResult_Valid, nil + return validMessage, nil } func (rlnRelay *WakuRLNRelay) verifyProof(msg *pb.WakuMessage, proof *rln.RateLimitProof) (bool, error) { @@ -303,19 +303,19 @@ func (rlnRelay *WakuRLNRelay) addValidator( } switch validationRes { - case MessageValidationResult_Valid: + case validMessage: rlnRelay.log.Debug("message verified", zap.String("pubsubTopic", pubsubTopic), zap.String("id", hex.EncodeToString(wakuMessage.Hash(pubsubTopic))), ) return true - case MessageValidationResult_Invalid: + case invalidMessage: rlnRelay.log.Debug("message could not be verified", zap.String("pubsubTopic", pubsubTopic), zap.String("id", hex.EncodeToString(wakuMessage.Hash(pubsubTopic))), ) return false - case MessageValidationResult_Spam: + case spamMessage: rlnRelay.log.Debug("spam message found", zap.String("pubsubTopic", pubsubTopic), zap.String("id", hex.EncodeToString(wakuMessage.Hash(pubsubTopic))), diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index d2c957b4..b3a8df09 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -84,7 +84,7 @@ type criteriaFN = func(msg *wpb.WakuMessage) (bool, error) type HistoryRequestParameters struct { selectedPeer peer.ID localQuery bool - requestId []byte + requestID []byte cursor *pb.Index pageSize uint64 asc bool @@ -131,15 +131,19 @@ func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Hi } } -func WithRequestId(requestId []byte) HistoryRequestOption { +// WithRequestID is an option to set a specific request ID to be used when +// creating a store request +func WithRequestID(requestID []byte) HistoryRequestOption { return func(params *HistoryRequestParameters) { - params.requestId = requestId + params.requestID = requestID } } -func WithAutomaticRequestId() HistoryRequestOption { +// WithAutomaticRequestID is an option to automatically generate a request ID +// when creating a store request +func WithAutomaticRequestID() HistoryRequestOption { return func(params *HistoryRequestParameters) { - params.requestId = protocol.GenerateRequestId() + params.requestID = protocol.GenerateRequestId() } } @@ -166,13 +170,13 @@ func WithLocalQuery() HistoryRequestOption { // Default options to be used when querying a store node for results func DefaultOptions() []HistoryRequestOption { return []HistoryRequestOption{ - WithAutomaticRequestId(), + WithAutomaticRequestID(), WithAutomaticPeerSelection(), WithPaging(true, MaxPageSize), } } -func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) { +func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestID []byte) (*pb.HistoryResponse, error) { logger := store.log.With(logging.HostID("peer", selectedPeer)) logger.Info("querying message history") @@ -188,7 +192,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec _ = connOpt.Reset() }() - historyRequest := &pb.HistoryRPC{Query: q, RequestId: hex.EncodeToString(requestId)} + historyRequest := &pb.HistoryRPC{Query: q, RequestId: hex.EncodeToString(requestID)} writer := pbio.NewDelimitedWriter(connOpt) reader := pbio.NewDelimitedReader(connOpt, math.MaxInt32) @@ -218,7 +222,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec return historyResponseRPC.Response, nil } -func (store *WakuStore) localQuery(query *pb.HistoryQuery, requestId []byte) (*pb.HistoryResponse, error) { +func (store *WakuStore) localQuery(query *pb.HistoryQuery, requestID []byte) (*pb.HistoryResponse, error) { logger := store.log logger.Info("querying local message history") @@ -227,7 +231,7 @@ func (store *WakuStore) localQuery(query *pb.HistoryQuery, requestId []byte) (*p } historyResponseRPC := &pb.HistoryRPC{ - RequestId: hex.EncodeToString(requestId), + RequestId: hex.EncodeToString(requestID), Response: store.FindMessages(query), } @@ -272,7 +276,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR return nil, ErrNoPeersAvailable } - if len(params.requestId) == 0 { + if len(params.requestID) == 0 { return nil, ErrInvalidId } @@ -296,9 +300,9 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR var err error if params.localQuery { - response, err = store.localQuery(q, params.requestId) + response, err = store.localQuery(q, params.requestID) } else { - response, err = store.queryFrom(ctx, q, params.selectedPeer, params.requestId) + response, err = store.queryFrom(ctx, q, params.selectedPeer, params.requestID) } if err != nil { return nil, err diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index dd5fd0d1..53176c64 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -154,7 +154,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { ContentTopics: []string{topic1}, } - response, err := s2.Query(ctx, q, WithAutomaticPeerSelection(), WithAutomaticRequestId(), WithPaging(true, 2)) + response, err := s2.Query(ctx, q, WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithPaging(true, 2)) require.NoError(t, err) require.Len(t, response.Messages, 2) require.Equal(t, response.Messages[0].Timestamp, msg1.Timestamp) @@ -229,7 +229,7 @@ func TestWakuStoreResult(t *testing.T) { ContentTopics: []string{topic1}, } - result, err := s2.Query(ctx, q, WithAutomaticPeerSelection(), WithAutomaticRequestId(), WithPaging(true, 2)) + result, err := s2.Query(ctx, q, WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithPaging(true, 2)) require.NoError(t, err) require.False(t, result.started) require.Len(t, result.GetMessages(), 0) @@ -331,7 +331,7 @@ func TestWakuStoreProtocolFind(t *testing.T) { return msg.ContentTopic == "hello", nil } - foundMsg, err := s2.Find(ctx, q, fn, WithAutomaticPeerSelection(), WithAutomaticRequestId(), WithPaging(true, 2)) + foundMsg, err := s2.Find(ctx, q, fn, WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithPaging(true, 2)) require.NoError(t, err) require.NotNil(t, foundMsg) require.Equal(t, "hello", foundMsg.ContentTopic) @@ -340,7 +340,7 @@ func TestWakuStoreProtocolFind(t *testing.T) { return msg.ContentTopic == "bye", nil } - foundMsg, err = s2.Find(ctx, q, fn2, WithAutomaticPeerSelection(), WithAutomaticRequestId(), WithPaging(true, 2)) + foundMsg, err = s2.Find(ctx, q, fn2, WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithPaging(true, 2)) require.NoError(t, err) require.Nil(t, foundMsg) } diff --git a/waku/v2/rpc/store.go b/waku/v2/rpc/store.go index f3b08ecc..1a29399d 100644 --- a/waku/v2/rpc/store.go +++ b/waku/v2/rpc/store.go @@ -40,7 +40,7 @@ type StoreMessagesReply struct { func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs, reply *StoreMessagesReply) error { options := []store.HistoryRequestOption{ - store.WithAutomaticRequestId(), + store.WithAutomaticRequestID(), store.WithAutomaticPeerSelection(), store.WithPaging(args.PagingOptions.Forward, args.PagingOptions.PageSize), store.WithCursor(args.PagingOptions.Cursor),