From 034656b2c0e350d3e672cb08090082e55b2be0f3 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 30 Sep 2021 11:59:51 -0400 Subject: [PATCH] refactor: rename protocol variables --- waku/v2/protocol/filter/waku_filter.go | 16 +++++++--------- waku/v2/protocol/lightpush/waku_lightpush.go | 9 ++++----- waku/v2/protocol/store/waku_store.go | 11 +++++------ 3 files changed, 16 insertions(+), 20 deletions(-) diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index ec8c63a6..ad660aae 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -55,9 +55,7 @@ type ( // should be direct payload exchange (a la req-resp), not be coupled with the // relay protocol. -const WakuFilterCodec = "/vac/waku/filter/2.0.0-beta1" - -const WakuFilterProtocolId = libp2pProtocol.ID(WakuFilterCodec) +const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1") func (filters *Filters) Notify(msg *pb.WakuMessage, requestId string) { for key, filter := range *filters { @@ -192,7 +190,7 @@ func NewWakuFilter(ctx context.Context, host host.Host, handler MessagePushHandl wf.pushHandler = handler wf.peerChan = peerChan - wf.h.SetStreamHandlerMatch(WakuFilterProtocolId, protocol.PrefixTextMatch(WakuFilterCodec), wf.onRequest) + wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest) go wf.FilterListener() go wf.peerListener() @@ -222,7 +220,7 @@ func (wf *WakuFilter) FilterListener() { pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: msgArr}} log.Info("pushing a message to light node: ", pushRPC) - conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), WakuFilterProtocolId) + conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), FilterID_v20beta1) if err != nil { // @TODO more sophisticated error handling here @@ -258,9 +256,9 @@ func (wf *WakuFilter) FilterListener() { // select a peer with filter support, dial it, // and submit FilterRequest wrapped in FilterRPC func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) (string, error) { //.async, gcsafe.} { - peer, err := utils.SelectPeer(wf.h, string(WakuFilterProtocolId)) + peer, err := utils.SelectPeer(wf.h, string(FilterID_v20beta1)) if err == nil { - conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId) + conn, err := wf.h.NewStream(ctx, *peer, FilterID_v20beta1) if conn != nil { defer conn.Close() @@ -292,9 +290,9 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) ( func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest) { // @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC. - peer, err := utils.SelectPeer(wf.h, string(WakuFilterProtocolId)) + peer, err := utils.SelectPeer(wf.h, string(FilterID_v20beta1)) if err == nil { - conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId) + conn, err := wf.h.NewStream(ctx, *peer, FilterID_v20beta1) if conn != nil { defer conn.Close() diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index ebbfe4b2..71bca63b 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -20,8 +20,7 @@ import ( var log = logging.Logger("waku_lightpush") -const WakuLightPushCodec = "/vac/waku/lightpush/2.0.0-beta1" -const WakuLightPushProtocolId = libp2pProtocol.ID(WakuLightPushCodec) +const LightPushID_v20beta1 = libp2pProtocol.ID("/vac/waku/lightpush/2.0.0-beta1") var ( ErrNoPeersAvailable = errors.New("no suitable remote peers") @@ -40,7 +39,7 @@ func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay) wakuLP.ctx = ctx wakuLP.h = h - wakuLP.h.SetStreamHandlerMatch(WakuLightPushProtocolId, protocol.PrefixTextMatch(WakuLightPushCodec), wakuLP.onRequest) + wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest) log.Info("Light Push protocol started") return wakuLP @@ -124,7 +123,7 @@ func WithPeer(p peer.ID) LightPushOption { func WithAutomaticPeerSelection() LightPushOption { return func(params *LightPushParameters) { - p, err := utils.SelectPeer(params.lp.h, string(WakuLightPushProtocolId)) + p, err := utils.SelectPeer(params.lp.h, string(LightPushID_v20beta1)) if err == nil { params.selectedPeer = *p } else { @@ -170,7 +169,7 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o return nil, ErrInvalidId } - connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, WakuLightPushProtocolId) + connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1) if err != nil { log.Info("failed to connect to remote peer", err) return nil, err diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index c27e815c..de2619d3 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -30,8 +30,7 @@ import ( var log = logging.Logger("wakustore") -const WakuStoreCodec = "/vac/waku/store/2.0.0-beta3" -const WakuStoreProtocolId = libp2pProtocol.ID(WakuStoreCodec) +const StoreID_v20beta3 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta3") const MaxPageSize = 100 // Maximum number of waku messages in each page var ( @@ -248,7 +247,7 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host, peerChan chan *e return } - store.h.SetStreamHandlerMatch(WakuStoreProtocolId, protocol.PrefixTextMatch(WakuStoreCodec), store.onRequest) + store.h.SetStreamHandlerMatch(StoreID_v20beta3, protocol.PrefixTextMatch(string(StoreID_v20beta3)), store.onRequest) go store.storeIncomingMessages(ctx) @@ -440,7 +439,7 @@ func WithPeer(p peer.ID) HistoryRequestOption { func WithAutomaticPeerSelection() HistoryRequestOption { return func(params *HistoryRequestParameters) { - p, err := utils.SelectPeer(params.s.h, string(WakuStoreProtocolId)) + p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta3)) if err == nil { params.selectedPeer = *p } else { @@ -483,7 +482,7 @@ func DefaultOptions() []HistoryRequestOption { } func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) { - connOpt, err := store.h.NewStream(ctx, selectedPeer, WakuStoreProtocolId) + connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta3) if err != nil { log.Info("failed to connect to remote peer", err) return nil, err @@ -615,7 +614,7 @@ func (store *WakuStore) Resume(pubsubTopic string, peerList []peer.ID) (int, err return -1, ErrFailedToResumeHistory } } else { - p, err := utils.SelectPeer(store.h, string(WakuStoreProtocolId)) + p, err := utils.SelectPeer(store.h, string(StoreID_v20beta3)) if err != nil { log.Info("Error selecting peer: ", err)