fix: check for messages first on filter in case a non null request is being sent (#50)

* fix: check for messages first on filter in case a non null request is being sent
* fix: clean up logs
* fix: peer lock
This commit is contained in:
RichΛrd 2021-09-27 08:47:18 -04:00 committed by GitHub
parent 67ac969a65
commit 6c4a74fb9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 37 additions and 30 deletions

View File

@ -307,11 +307,10 @@ func (w *WakuNode) ID() string {
return w.host.ID().Pretty()
}
func (w *WakuNode) GetPeerStats() PeerStats {
return w.peers
}
func (w *WakuNode) IsOnline() bool {
w.peersMutex.Lock()
defer w.peersMutex.Unlock()
hasRelay := false
hasLightPush := false
hasStore := false
@ -340,6 +339,9 @@ func (w *WakuNode) IsOnline() bool {
}
func (w *WakuNode) HasHistory() bool {
w.peersMutex.Lock()
defer w.peersMutex.Unlock()
for _, v := range w.peers {
for _, protocol := range v {
if protocol == string(store.WakuStoreProtocolId) {
@ -751,6 +753,8 @@ func (w *WakuNode) ClosePeerById(id peer.ID) error {
}
func (w *WakuNode) PeerCount() int {
w.peersMutex.Lock()
defer w.peersMutex.Unlock()
return len(w.peers)
}
@ -800,15 +804,16 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
go func(peer peer.ID) {
peerFound := false
w.peersMutex.Lock()
for p := range w.peers {
if p == peer {
peerFound = true
break
}
}
defer w.peersMutex.Unlock()
log.Debug("###PING before fetching result")
//log.Info("###PING " + s + " before fetching result")
//logwriter.Write([]byte("###PING " + s + " before fetching result"))
pingTicker := time.NewTicker(time.Duration(1) * time.Second)
isError := false
select {

View File

@ -96,12 +96,10 @@ func WithWakuRelay(opts ...wakurelay.Option) WakuNodeOption {
}
}
// WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption
// accepts a list of WakuFilter gossipsub options to setup the protocol
func WithWakuFilter(opts ...wakurelay.Option) WakuNodeOption {
// WithWakuFilter enables the Waku V2 Filter protocol.
func WithWakuFilter() WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableFilter = true
params.wOpts = opts
return nil
}
}

View File

@ -95,22 +95,28 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
return
}
log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer()))
log.Info(fmt.Sprintf("%s: received request from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer()))
stats.Record(wf.ctx, metrics.Messages.M(1))
if filterRPCRequest.Request != nil {
if filterRPCRequest.Push != nil && len(filterRPCRequest.Push.Messages) > 0 {
// We're on a light node.
// This is a message push coming from a full node.
log.Info("filter light node, received a message push. ", len(filterRPCRequest.Push.Messages), " messages")
wf.pushHandler(filterRPCRequest.RequestId, *filterRPCRequest.Push)
} else if filterRPCRequest.Request != nil {
// We're on a full node.
// This is a filter request coming from a light node.
if filterRPCRequest.Request.Subscribe {
subscriber := Subscriber{peer: string(s.Conn().RemotePeer()), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request}
wf.subscribers = append(wf.subscribers, subscriber)
log.Info("Full node, add a filter subscriber ", subscriber)
log.Info("filter full node, add a filter subscriber: ", subscriber.peer)
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers))))
} else {
peerId := string(s.Conn().RemotePeer())
log.Info("Full node, remove a filter subscriber ", peerId)
log.Info("filter full node, remove a filter subscriber: ", peerId)
contentFilters := filterRPCRequest.Request.ContentFilters
var peerIdsToRemove []string
for _, subscriber := range wf.subscribers {
@ -151,20 +157,13 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers))))
}
} else if filterRPCRequest.Push != nil {
// We're on a light node.
// This is a message push coming from a full node.
log.Info("Light node, received a message push ", *filterRPCRequest.Push)
wf.pushHandler(filterRPCRequest.RequestId, *filterRPCRequest.Push)
}
}
func (wf *WakuFilter) peerListener() {
for e := range wf.peerChan {
if e.Connectedness == network.NotConnected {
log.Info("Filter Notification received ", e.Peer)
log.Info("filter Notification received ", e.Peer)
i := 0
// Delete subscribers matching deleted peer
for _, s := range wf.subscribers {
@ -174,7 +173,7 @@ func (wf *WakuFilter) peerListener() {
}
}
log.Info("Filter, deleted subscribers: ", len(wf.subscribers)-i)
log.Info("filter, deleted subscribers: ", len(wf.subscribers)-i)
wf.subscribers = wf.subscribers[:i]
}
}
@ -217,20 +216,22 @@ func (wf *WakuFilter) FilterListener() {
for _, filter := range subscriber.filter.ContentFilters {
if msg.ContentTopic == filter.ContentTopic {
log.Info("Found matching contentTopic ", filter, msg)
log.Info("found matching contentTopic ", filter, msg)
msgArr := []*pb.WakuMessage{msg}
// Do a message push to light node
pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: msgArr}}
log.Info("Pushing a message to light node: ", pushRPC)
log.Info("pushing a message to light node: ", pushRPC)
conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), WakuFilterProtocolId)
if err != nil {
// @TODO more sophisticated error handling here
log.Error("Failed to open peer stream")
log.Error("failed to open peer stream")
//waku_filter_errors.inc(labelValues = [dialFailure])
return err
}
defer conn.Close()
writer := protoio.NewDelimitedWriter(conn)
err = writer.WriteMsg(pushRPC)
if err != nil {
@ -262,12 +263,14 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) (
conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId)
if conn != nil {
defer conn.Close()
// This is the only successful path to subscription
id := protocol.GenerateRequestId()
writer := protoio.NewDelimitedWriter(conn)
filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request}
log.Info("Sending filterRPC: ", filterRPC)
log.Info("sending filterRPC: ", filterRPC)
err = writer.WriteMsg(filterRPC)
if err != nil {
log.Error("failed to write message", err)
@ -281,7 +284,7 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) (
return "", err
}
} else {
log.Info("Error selecting peer: ", err)
log.Info("error selecting peer: ", err)
}
return "", nil
@ -294,6 +297,8 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest)
conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId)
if conn != nil {
defer conn.Close()
// This is the only successful path to subscription
id := protocol.GenerateRequestId()
@ -303,7 +308,6 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest)
if err != nil {
log.Error("failed to write message", err)
}
//return some(id)
} else {
// @TODO more sophisticated error handling here
log.Error("failed to connect to remote peer", err)