refactor: rename protocol variables

This commit is contained in:
Richard Ramos 2021-09-30 11:59:51 -04:00
parent 954f2a0c56
commit 034656b2c0
3 changed files with 16 additions and 20 deletions

View File

@ -55,9 +55,7 @@ type (
// should be direct payload exchange (a la req-resp), not be coupled with the
// relay protocol.
const WakuFilterCodec = "/vac/waku/filter/2.0.0-beta1"
const WakuFilterProtocolId = libp2pProtocol.ID(WakuFilterCodec)
const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1")
func (filters *Filters) Notify(msg *pb.WakuMessage, requestId string) {
for key, filter := range *filters {
@ -192,7 +190,7 @@ func NewWakuFilter(ctx context.Context, host host.Host, handler MessagePushHandl
wf.pushHandler = handler
wf.peerChan = peerChan
wf.h.SetStreamHandlerMatch(WakuFilterProtocolId, protocol.PrefixTextMatch(WakuFilterCodec), wf.onRequest)
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
go wf.FilterListener()
go wf.peerListener()
@ -222,7 +220,7 @@ func (wf *WakuFilter) FilterListener() {
pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: msgArr}}
log.Info("pushing a message to light node: ", pushRPC)
conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), WakuFilterProtocolId)
conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), FilterID_v20beta1)
if err != nil {
// @TODO more sophisticated error handling here
@ -258,9 +256,9 @@ func (wf *WakuFilter) FilterListener() {
// select a peer with filter support, dial it,
// and submit FilterRequest wrapped in FilterRPC
func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) (string, error) { //.async, gcsafe.} {
peer, err := utils.SelectPeer(wf.h, string(WakuFilterProtocolId))
peer, err := utils.SelectPeer(wf.h, string(FilterID_v20beta1))
if err == nil {
conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId)
conn, err := wf.h.NewStream(ctx, *peer, FilterID_v20beta1)
if conn != nil {
defer conn.Close()
@ -292,9 +290,9 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) (
func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest) {
// @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC.
peer, err := utils.SelectPeer(wf.h, string(WakuFilterProtocolId))
peer, err := utils.SelectPeer(wf.h, string(FilterID_v20beta1))
if err == nil {
conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId)
conn, err := wf.h.NewStream(ctx, *peer, FilterID_v20beta1)
if conn != nil {
defer conn.Close()

View File

@ -20,8 +20,7 @@ import (
var log = logging.Logger("waku_lightpush")
const WakuLightPushCodec = "/vac/waku/lightpush/2.0.0-beta1"
const WakuLightPushProtocolId = libp2pProtocol.ID(WakuLightPushCodec)
const LightPushID_v20beta1 = libp2pProtocol.ID("/vac/waku/lightpush/2.0.0-beta1")
var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
@ -40,7 +39,7 @@ func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay)
wakuLP.ctx = ctx
wakuLP.h = h
wakuLP.h.SetStreamHandlerMatch(WakuLightPushProtocolId, protocol.PrefixTextMatch(WakuLightPushCodec), wakuLP.onRequest)
wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest)
log.Info("Light Push protocol started")
return wakuLP
@ -124,7 +123,7 @@ func WithPeer(p peer.ID) LightPushOption {
func WithAutomaticPeerSelection() LightPushOption {
return func(params *LightPushParameters) {
p, err := utils.SelectPeer(params.lp.h, string(WakuLightPushProtocolId))
p, err := utils.SelectPeer(params.lp.h, string(LightPushID_v20beta1))
if err == nil {
params.selectedPeer = *p
} else {
@ -170,7 +169,7 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o
return nil, ErrInvalidId
}
connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, WakuLightPushProtocolId)
connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1)
if err != nil {
log.Info("failed to connect to remote peer", err)
return nil, err

View File

@ -30,8 +30,7 @@ import (
var log = logging.Logger("wakustore")
const WakuStoreCodec = "/vac/waku/store/2.0.0-beta3"
const WakuStoreProtocolId = libp2pProtocol.ID(WakuStoreCodec)
const StoreID_v20beta3 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta3")
const MaxPageSize = 100 // Maximum number of waku messages in each page
var (
@ -248,7 +247,7 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host, peerChan chan *e
return
}
store.h.SetStreamHandlerMatch(WakuStoreProtocolId, protocol.PrefixTextMatch(WakuStoreCodec), store.onRequest)
store.h.SetStreamHandlerMatch(StoreID_v20beta3, protocol.PrefixTextMatch(string(StoreID_v20beta3)), store.onRequest)
go store.storeIncomingMessages(ctx)
@ -440,7 +439,7 @@ func WithPeer(p peer.ID) HistoryRequestOption {
func WithAutomaticPeerSelection() HistoryRequestOption {
return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeer(params.s.h, string(WakuStoreProtocolId))
p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta3))
if err == nil {
params.selectedPeer = *p
} else {
@ -483,7 +482,7 @@ func DefaultOptions() []HistoryRequestOption {
}
func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) {
connOpt, err := store.h.NewStream(ctx, selectedPeer, WakuStoreProtocolId)
connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta3)
if err != nil {
log.Info("failed to connect to remote peer", err)
return nil, err
@ -615,7 +614,7 @@ func (store *WakuStore) Resume(pubsubTopic string, peerList []peer.ID) (int, err
return -1, ErrFailedToResumeHistory
}
} else {
p, err := utils.SelectPeer(store.h, string(WakuStoreProtocolId))
p, err := utils.SelectPeer(store.h, string(StoreID_v20beta3))
if err != nil {
log.Info("Error selecting peer: ", err)