diff --git a/go.mod b/go.mod index 40723d83a..38c587fb3 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a github.com/status-im/doubleratchet v3.0.0+incompatible - github.com/status-im/go-waku v0.0.0-20210729163508-c9d3334f2d0e + github.com/status-im/go-waku v0.0.0-20210918141919-49f216d40c4a github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a github.com/status-im/markdown v0.0.0-20201022101546-c0cbdd5763bf github.com/status-im/migrate/v4 v4.6.2-status.2 diff --git a/go.sum b/go.sum index 83a06242e..678b8db7d 100644 --- a/go.sum +++ b/go.sum @@ -1102,8 +1102,8 @@ github.com/status-im/go-ethereum v1.10.4-status.2 h1:uvcD2U7skYqPQviARFb4w3wZyFS github.com/status-im/go-ethereum v1.10.4-status.2/go.mod h1:GvIhpdCOgMHI6i5xVPEZOrv/qSMeOFHbZh77AoyZUoE= github.com/status-im/go-multiaddr-ethv4 v1.2.0 h1:OT84UsUzTCwguqCpJqkrCMiL4VZ1SvUtH9a5MsZupBk= github.com/status-im/go-multiaddr-ethv4 v1.2.0/go.mod h1:2VQ3C+9zEurcceasz12gPAtmEzCeyLUGPeKLSXYQKHo= -github.com/status-im/go-waku v0.0.0-20210729163508-c9d3334f2d0e h1:WlCBk7mwXa8tVIOQedx0whIXk9xT4lIKv3UWWABGWpI= -github.com/status-im/go-waku v0.0.0-20210729163508-c9d3334f2d0e/go.mod h1:plJBn9iDLcklnq2KKf/bBeuRNsKck8QMdr3yoJnh3hM= +github.com/status-im/go-waku v0.0.0-20210918141919-49f216d40c4a h1:PQ/S9OaV3I+peTU0YC7q8/AImudIKfuLNDfMzf+w8DY= +github.com/status-im/go-waku v0.0.0-20210918141919-49f216d40c4a/go.mod h1:XK6wGIMnxhpx9SQLDV9Qw0zfXTjd8jjw6DXGC0mKvA8= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a h1:eCna/q/PuZVqtmOMBqytw9yzZwMNKpao4au0OJDvesI= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= diff --git a/params/config.go b/params/config.go index 0eafd5d37..d2399f870 100644 --- a/params/config.go +++ b/params/config.go @@ -222,6 +222,9 @@ type WakuV2Config struct { // EnableConfirmations when true, instructs that confirmation should be sent for received messages EnableConfirmations bool + + // A name->libp2p_addr map for Wakuv2 custom nodes + CustomNodes map[string]string } // ---------- diff --git a/signal/events_messenger.go b/signal/events_messenger.go index 72dffa5a9..804bb5428 100644 --- a/signal/events_messenger.go +++ b/signal/events_messenger.go @@ -1,7 +1,5 @@ package signal -import "github.com/status-im/status-go/protocol/communities" - const ( // EventMesssageDelivered triggered when we got acknowledge from datasync level, that means peer got message @@ -32,6 +30,6 @@ func SendMessageDelivered(chatID string, messageID string) { } // SendMessageDelivered notifies about delivered message -func SendCommunityInfoFound(community *communities.Community) { +func SendCommunityInfoFound(community interface{}) { send(EventCommunityInfoFound, community) } diff --git a/signal/events_wakuv2.go b/signal/events_wakuv2.go new file mode 100644 index 000000000..dfbdcfd8c --- /dev/null +++ b/signal/events_wakuv2.go @@ -0,0 +1,12 @@ +package signal + +const ( + // EventPeerStats is sent when peer is added or removed. + // it will be a map with capability=peer count k/v's. + EventPeerStats = "wakuv2.peerstats" +) + +// SendPeerStats sends discovery.summary signal. +func SendPeerStats(peerStats interface{}) { + send(EventPeerStats, peerStats) +} diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go index 3572cbe7f..9618c4813 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/wakunode2.go @@ -4,17 +4,23 @@ import ( "context" "errors" "fmt" + + //"log/syslog" + //"strconv" "sync" "time" proto "github.com/golang/protobuf/proto" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" + "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - peerstore "github.com/libp2p/go-libp2p-peerstore" + "github.com/libp2p/go-libp2p-core/peerstore" + p2pproto "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p/p2p/protocol/ping" ma "github.com/multiformats/go-multiaddr" "go.opencensus.io/stats" @@ -32,6 +38,8 @@ import ( var log = logging.Logger("wakunode") +//var logwriter, _ = syslog.New(syslog.LOG_ERR|syslog.LOG_LOCAL0, "WAKU") + type Message []byte // A map of peer IDs to supported protocols @@ -40,11 +48,13 @@ type PeerStats map[peer.ID][]string type ConnStatus struct { IsOnline bool HasHistory bool + Peers PeerStats } type WakuNode struct { - host host.Host - opts *WakuNodeParameters + host host.Host + idService *identify.IDService + opts *WakuNodeParameters relay *relay.WakuRelay filter *filter.WakuFilter @@ -68,85 +78,118 @@ type WakuNode struct { quit chan struct{} // Map of peers and their supported protocols - peers PeerStats + peers PeerStats + peersMutex sync.Mutex + // Internal protocol implementations that wish // to listen to peer added/removed events (e.g. Filter) peerListeners []chan *event.EvtPeerConnectednessChanged // Channel passed to WakuNode constructor // receiving connection status notifications connStatusChan chan ConnStatus + pingEventsChan chan interface{} } +func (w *WakuNode) handleConnectednessChanged(ev event.EvtPeerConnectednessChanged) { + log.Debug("### EvtPeerConnectednessChanged ", w.Host().ID(), " to ", ev.Peer, " : ", ev.Connectedness) + + w.peersMutex.Lock() + defer w.peersMutex.Unlock() + + if ev.Connectedness == network.Connected { + _, ok := w.peers[ev.Peer] + if !ok { + peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) + log.Debug("protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols) + w.peers[ev.Peer] = peerProtocols + } else { + log.Debug("### Peer already exists") + } + } else if ev.Connectedness == network.NotConnected { + log.Debug("Peer down: ", ev.Peer) + delete(w.peers, ev.Peer) + // for _, pl := range w.peerListeners { + // pl <- &ev + // } + // TODO + // There seems to be no proper way to + // remove a dropped peer from Host's Peerstore + // https://github.com/libp2p/go-libp2p-host/issues/13 + //w.Host().Network().ClosePeer(ev.Peer) + } + +} +func (w *WakuNode) handleProtocolsUpdated(ev event.EvtPeerProtocolsUpdated) { + log.Debug("### EvtPeerProtocolsUpdated ", w.Host().ID(), " to ", ev.Peer, " added: ", ev.Added, ", removed: ", ev.Removed) + + w.peersMutex.Lock() + defer w.peersMutex.Unlock() + + _, ok := w.peers[ev.Peer] + if ok { + peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) + log.Debug("updated protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols) + w.peers[ev.Peer] = peerProtocols + } + +} +func (w *WakuNode) handlePeerIdentificationCompleted(ev event.EvtPeerIdentificationCompleted) { + log.Debug("### EvtPeerIdentificationCompleted ", w.Host().ID(), " to ", ev.Peer) + + w.peersMutex.Lock() + defer w.peersMutex.Unlock() + + peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) + log.Debug("identified protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols) + w.peers[ev.Peer] = peerProtocols +} +func (w *WakuNode) processHostEvent(e interface{}) { + if e == nil { + log.Debug("processHostEvent nil event") + return + } + isOnline := w.IsOnline() + hasHistory := w.HasHistory() + switch e := e.(type) { + case event.EvtPeerConnectednessChanged: + w.handleConnectednessChanged(e) + case event.EvtPeerProtocolsUpdated: + w.handleProtocolsUpdated(e) + case event.EvtPeerIdentificationCompleted: + w.handlePeerIdentificationCompleted(e) + } + + log.Debug("###processHostEvent before isOnline()") + newIsOnline := w.IsOnline() + log.Debug("###processHostEvent before hasHistory()") + newHasHistory := w.HasHistory() + log.Debug("###ConnStatus isOnline: ", isOnline, "/", newIsOnline, " hasHistory: ", + hasHistory, "/", newHasHistory) + if w.connStatusChan != nil { + connStatus := ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory, Peers: w.Peers()} + log.Debug("New ConnStatus: ", connStatus) + w.connStatusChan <- connStatus + } + +} func (w *WakuNode) connectednessListener() { for { - isOnline := w.IsOnline() - hasHistory := w.HasHistory() - + var e interface{} + log.Debug("connectednessListener before select") select { - case e := <-w.connectednessEventSub.Out(): - if e == nil { - break - } - ev := e.(event.EvtPeerConnectednessChanged) - - log.Info("### EvtPeerConnectednessChanged ", w.Host().ID(), " to ", ev.Peer, " : ", ev.Connectedness) - if ev.Connectedness == network.Connected { - _, ok := w.peers[ev.Peer] - if !ok { - peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) - log.Info("protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols) - w.peers[ev.Peer] = peerProtocols - } else { - log.Info("### Peer already exists") - } - } else if ev.Connectedness == network.NotConnected { - log.Info("Peer down: ", ev.Peer) - delete(w.peers, ev.Peer) - for _, pl := range w.peerListeners { - pl <- &ev - } - // TODO - // There seems to be no proper way to - // remove a dropped peer from Host's Peerstore - // https://github.com/libp2p/go-libp2p-host/issues/13 - //w.Host().Network().ClosePeer(ev.Peer) - } - case e := <-w.protocolEventSub.Out(): - if e == nil { - break - } - ev := e.(event.EvtPeerProtocolsUpdated) - - log.Info("### EvtPeerProtocolsUpdated ", w.Host().ID(), " to ", ev.Peer, " added: ", ev.Added, ", removed: ", ev.Removed) - _, ok := w.peers[ev.Peer] - if ok { - peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) - log.Info("updated protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols) - w.peers[ev.Peer] = peerProtocols - } - - case e := <-w.identificationEventSub.Out(): - if e == nil { - break - } - ev := e.(event.EvtPeerIdentificationCompleted) - - log.Info("### EvtPeerIdentificationCompleted ", w.Host().ID(), " to ", ev.Peer) - peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) - log.Info("identified protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols) - _, ok := w.peers[ev.Peer] - if ok { - peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) - w.peers[ev.Peer] = peerProtocols - } - - } - newIsOnline := w.IsOnline() - newHasHistory := w.HasHistory() - if w.connStatusChan != nil && - (isOnline != newIsOnline || hasHistory != newHasHistory) { - w.connStatusChan <- ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory} + case e = <-w.connectednessEventSub.Out(): + log.Debug("connectednessListener connectednessEvent") + case e = <-w.protocolEventSub.Out(): + log.Debug("connectednessListener protocolEvent") + case e = <-w.identificationEventSub.Out(): + log.Debug("connectednessListener identificationEvent") + case e = <-w.pingEventsChan: + log.Debug("connectednessListener pingEvent") } + log.Debug("connectednessListener after select") + + w.processHostEvent(e) + log.Debug("connectednessListener after processHostEvent") } } @@ -181,6 +224,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { w := new(WakuNode) w.bcaster = NewBroadcaster(1024) w.host = host + w.idService = identify.NewIDService(host) w.cancel = cancel w.ctx = ctx w.subscriptions = make(map[relay.Topic][]*Subscription) @@ -189,8 +233,6 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { w.peers = make(PeerStats) // Subscribe to Connectedness events - log.Info("### host.ID(): ", host.ID()) - connectednessEventSub, _ := host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) w.connectednessEventSub = connectednessEventSub @@ -203,6 +245,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { if params.connStatusChan != nil { w.connStatusChan = params.connStatusChan } + w.pingEventsChan = make(chan interface{}) go w.connectednessListener() if params.enableStore { @@ -357,85 +400,49 @@ func (w *WakuNode) mountLightPush() { w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay) } -func (w *WakuNode) AddPeer(p peer.ID, addrs []ma.Multiaddr, protocolId string) error { - log.Info("AddPeer: ", protocolId) +func (w *WakuNode) AddPeer(info *peer.AddrInfo, protocolId string) error { + log.Info(fmt.Sprintf("adding peer %s with protocol %s", info.ID.Pretty(), protocolId)) - for _, addr := range addrs { - w.host.Peerstore().AddAddr(p, addr, peerstore.PermanentAddrTTL) - } - err := w.host.Peerstore().AddProtocols(p, protocolId) + w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) - if err != nil { - return err - } - - return nil + return w.host.Peerstore().AddProtocols(info.ID, protocolId) } func (w *WakuNode) startStore() { peerChan := make(chan *event.EvtPeerConnectednessChanged) w.opts.store.Start(w.ctx, w.host, peerChan) w.peerListeners = append(w.peerListeners, peerChan) - w.opts.store.Resume(string(relay.GetTopic(nil)), nil) + + if w.opts.shouldResume { + if _, err := w.opts.store.Resume(string(relay.GetTopic(nil)), nil); err != nil { + log.Error("failed to resume", err) + } + } +} + +func (w *WakuNode) addPeerWithProtocol(address string, proto p2pproto.ID) (*peer.ID, error) { + info, err := addrInfoFromMultiaddrString(address) + if err != nil { + return nil, err + } + + return &info.ID, w.AddPeer(info, string(proto)) } func (w *WakuNode) AddStorePeer(address string) (*peer.ID, error) { - if w.opts.store == nil { - return nil, errors.New("WakuStore is not set") - } - - storePeer, err := ma.NewMultiaddr(address) - if err != nil { - return nil, err - } - - // Extract the peer ID from the multiaddr. - info, err := peer.AddrInfoFromP2pAddr(storePeer) - if err != nil { - return nil, err - } - - return &info.ID, w.AddPeer(info.ID, info.Addrs, string(store.WakuStoreProtocolId)) + return w.addPeerWithProtocol(address, store.WakuStoreProtocolId) +} + +func (w *WakuNode) AddRelayPeer(address string) (*peer.ID, error) { + return w.addPeerWithProtocol(address, wakurelay.WakuRelayID_v200) } -// TODO Remove code duplication func (w *WakuNode) AddFilterPeer(address string) (*peer.ID, error) { - if w.filter == nil { - return nil, errors.New("WakuFilter is not set") - } - - filterPeer, err := ma.NewMultiaddr(address) - if err != nil { - return nil, err - } - - // Extract the peer ID from the multiaddr. - info, err := peer.AddrInfoFromP2pAddr(filterPeer) - if err != nil { - return nil, err - } - - return &info.ID, w.AddPeer(info.ID, info.Addrs, string(filter.WakuFilterProtocolId)) + return w.addPeerWithProtocol(address, filter.WakuFilterProtocolId) } -// TODO Remove code duplication func (w *WakuNode) AddLightPushPeer(address string) (*peer.ID, error) { - if w.filter == nil { - return nil, errors.New("WakuFilter is not set") - } - - lightPushPeer, err := ma.NewMultiaddr(address) - if err != nil { - return nil, err - } - - // Extract the peer ID from the multiaddr. - info, err := peer.AddrInfoFromP2pAddr(lightPushPeer) - if err != nil { - return nil, err - } - - return &info.ID, w.AddPeer(info.ID, info.Addrs, string(lightpush.WakuLightPushProtocolId)) + return w.addPeerWithProtocol(address, lightpush.WakuLightPushProtocolId) } func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime float64, endTime float64, opts ...store.HistoryRequestOption) (*pb.HistoryResponse, error) { @@ -687,13 +694,31 @@ func (w *WakuNode) DialPeer(address string) error { return err } - // Extract the peer ID from the multiaddr. info, err := peer.AddrInfoFromP2pAddr(p) if err != nil { return err } - return w.host.Connect(w.ctx, *info) + return w.connect(*info) +} + +func (w *WakuNode) connect(info peer.AddrInfo) error { + err := w.host.Connect(w.ctx, info) + if err != nil { + return err + } + + w.processHostEvent(event.EvtPeerConnectednessChanged{ + Peer: info.ID, + Connectedness: network.Connected, + }) + + return nil +} + +func (w *WakuNode) DialPeerByID(peerID peer.ID) error { + info := w.host.Peerstore().PeerInfo(peerID) + return w.connect(info) } func (w *WakuNode) ClosePeerByAddress(address string) error { @@ -712,25 +737,118 @@ func (w *WakuNode) ClosePeerByAddress(address string) error { } func (w *WakuNode) ClosePeerById(id peer.ID) error { - return w.host.Network().ClosePeer(id) + err := w.host.Network().ClosePeer(id) + if err != nil { + return err + } + + w.processHostEvent(event.EvtPeerConnectednessChanged{ + Peer: id, + Connectedness: network.NotConnected, + }) + + return nil } func (w *WakuNode) PeerCount() int { - return len(w.host.Network().Peers()) + return len(w.peers) +} + +func (w *WakuNode) Peers() PeerStats { + w.peersMutex.Lock() + defer w.peersMutex.Unlock() + p := make(PeerStats) + for k, v := range w.peers { + p[k] = v + } + + return p } func (w *WakuNode) startKeepAlive(t time.Duration) { - log.Info("Setting up ping protocol with duration of", t) + + log.Info("Setting up ping protocol with duration of ", t) w.ping = ping.NewPingService(w.host) ticker := time.NewTicker(t) go func() { + + // This map contains peers that we're + // waiting for the ping response from + peerMap := make(map[peer.ID]<-chan ping.Result) + var mu sync.Mutex for { select { case <-ticker.C: - for _, peer := range w.host.Network().Peers() { - log.Info("Pinging", peer) - w.ping.Ping(w.ctx, peer) + for _, p := range w.host.Peerstore().Peers() { + if p == w.host.ID() { + log.Info("###PING skip ", p) + continue + } + mu.Lock() + _, ok := peerMap[p] + mu.Unlock() + + var s = p.Pretty() + s = s[len(s)-4:] + if !ok { + log.Info("###PING ", s) + result := w.ping.Ping(w.ctx, p) + mu.Lock() + peerMap[p] = result + mu.Unlock() + + go func(peer peer.ID) { + peerFound := false + for p := range w.peers { + if p == peer { + peerFound = true + break + } + } + + //log.Info("###PING " + s + " before fetching result") + //logwriter.Write([]byte("###PING " + s + " before fetching result")) + pingTicker := time.NewTicker(time.Duration(1) * time.Second) + isError := false + select { + case resVal := <-result: + isError = resVal.Error != nil + case <-pingTicker.C: + isError = true + } + pingTicker.Stop() + if !peerFound && !isError { + //EventBus Emitter doesn't seem to work when there's no connection + w.pingEventsChan <- event.EvtPeerConnectednessChanged{ + Peer: peer, + Connectedness: network.Connected, + } + peerConns := w.host.Network().ConnsToPeer(peer) + if len(peerConns) > 0 { + // log.Info("###PING " + s + " IdentifyWait") + // logwriter.Write([]byte("###PING " + s + " IdentifyWait")) + //w.idService.IdentifyWait(peerConns[0]) + } else { + w.DialPeerByID(peer) + } + } else if peerFound && isError { + // log.Info("###PING " + s + " peer removed") + // logwriter.Write([]byte("###PING " + s + " peer removed")) + w.pingEventsChan <- event.EvtPeerConnectednessChanged{ + Peer: peer, + Connectedness: network.NotConnected, + } + } + + mu.Lock() + delete(peerMap, peer) + mu.Unlock() + }(p) + } else { + log.Info("###PING " + s + " already pinged") + // logwriter.Write([]byte("###PING " + s + " already pinged")) + } } case <-w.quit: ticker.Stop() @@ -738,5 +856,13 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { } } }() - +} + +func addrInfoFromMultiaddrString(address string) (*peer.AddrInfo, error) { + ma, err := ma.NewMultiaddr(address) + if err != nil { + return nil, err + } + + return peer.AddrInfoFromP2pAddr(ma) } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go b/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go index 2ff140211..3dbd9c894 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/node/wakuoptions.go @@ -26,9 +26,10 @@ type WakuNodeParameters struct { enableFilter bool wOpts []wakurelay.Option - enableStore bool - storeMsgs bool - store *store.WakuStore + enableStore bool + shouldResume bool + storeMsgs bool + store *store.WakuStore // filter *filter.WakuFilter keepAliveInterval time.Duration @@ -107,11 +108,12 @@ func WithWakuFilter(opts ...wakurelay.Option) WakuNodeOption { // WithWakuStore enables the Waku V2 Store protocol and if the messages should // be stored or not in a message provider -func WithWakuStore(shouldStoreMessages bool) WakuNodeOption { +func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableStore = true params.storeMsgs = shouldStoreMessages params.store = store.NewWakuStore(shouldStoreMessages, nil) + params.shouldResume = shouldResume return nil } } diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go index 95c6481a8..c9be11b62 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/filter/waku_filter.go @@ -59,12 +59,6 @@ const WakuFilterCodec = "/vac/waku/filter/2.0.0-beta1" const WakuFilterProtocolId = libp2pProtocol.ID(WakuFilterCodec) -// Error types (metric label values) -const ( - dialFailure = "dial_failure" - decodeRpcFailure = "decode_rpc_failure" -) - func (filters *Filters) Notify(msg *pb.WakuMessage, requestId string) { for key, filter := range *filters { envelope := protocol.NewEnvelope(msg, filter.Topic) @@ -252,7 +246,9 @@ func (wf *WakuFilter) FilterListener() { } for m := range wf.MsgC { - handle(m) + if err := handle(m); err != nil { + log.Error("failed to handle message", err) + } } } @@ -273,6 +269,10 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) ( filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request} log.Info("Sending filterRPC: ", filterRPC) err = writer.WriteMsg(filterRPC) + if err != nil { + log.Error("failed to write message", err) + return "", err + } return string(id), nil } else { // @TODO more sophisticated error handling here @@ -300,6 +300,9 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest) writer := protoio.NewDelimitedWriter(conn) filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request} err = writer.WriteMsg(filterRPC) + if err != nil { + log.Error("failed to write message", err) + } //return some(id) } else { // @TODO more sophisticated error handling here diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go index 83d1719a5..ebbfe4b20 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go @@ -177,7 +177,12 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o } defer connOpt.Close() - defer connOpt.Reset() + defer func() { + err := connOpt.Reset() + if err != nil { + log.Error("failed to reset connection", err) + } + }() pushRequestRPC := &pb.PushRPC{RequestId: hex.EncodeToString(params.requestId), Query: req} diff --git a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go index bf60ae1db..c27e815c6 100644 --- a/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go +++ b/vendor/github.com/status-im/go-waku/waku/v2/protocol/store/waku_store.go @@ -260,7 +260,10 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host, peerChan chan *e storedMessages, err := store.msgProvider.GetAll() if err != nil { log.Error("could not load DBProvider messages", err) - stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_load_failure")}, metrics.Errors.M(1)) + err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_load_failure")}, metrics.Errors.M(1)) + if err != nil { + log.Error("failed to record with tags") + } return } @@ -272,7 +275,9 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host, peerChan chan *e store.storeMessageWithIndex(storedMessage.PubsubTopic, idx, storedMessage.Message) - stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages)))) + if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil { + log.Error("failed to record with tags") + } } go store.peerListener() @@ -312,11 +317,16 @@ func (store *WakuStore) storeMessage(pubSubTopic string, msg *pb.WakuMessage) { if err != nil { log.Error("could not store message", err) - stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_failure")}, metrics.Errors.M(1)) + if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "store_failure")}, metrics.Errors.M(1)); err != nil { + log.Error("failed to record with tags", err) + } return } - stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages)))) + if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "stored")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil { + log.Error("failed to record with tags", err) + } + } func (store *WakuStore) storeIncomingMessages(ctx context.Context) { @@ -336,7 +346,9 @@ func (store *WakuStore) onRequest(s network.Stream) { err := reader.ReadMsg(historyRPCRequest) if err != nil { log.Error("error reading request", err) - stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1)) + if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1)); err != nil { + log.Error("failed to record with tags", err) + } return } @@ -497,11 +509,15 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec err = reader.ReadMsg(historyResponseRPC) if err != nil { log.Error("could not read response", err) - stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1)) + if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyStoreErrorType, "decodeRPCFailure")}, metrics.Errors.M(1)); err != nil { + log.Error("failed to record with tags") + } return nil, err } - stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "retrieved")}, metrics.StoreMessages.M(int64(len(store.messages)))) + if err := stats.RecordWithTags(store.ctx, []tag.Mutator{tag.Insert(metrics.KeyType, "retrieved")}, metrics.StoreMessages.M(int64(len(store.messages)))); err != nil { + log.Error("failed to record with tags", err) + } return historyResponseRPC.Response, nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index e35f5b246..b4139c397 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -426,7 +426,7 @@ github.com/spacemonkeygo/spacelog github.com/status-im/doubleratchet # github.com/status-im/go-multiaddr-ethv4 v1.2.0 github.com/status-im/go-multiaddr-ethv4 -# github.com/status-im/go-waku v0.0.0-20210729163508-c9d3334f2d0e +# github.com/status-im/go-waku v0.0.0-20210918141919-49f216d40c4a github.com/status-im/go-waku/waku/v2/metrics github.com/status-im/go-waku/waku/v2/node github.com/status-im/go-waku/waku/v2/protocol diff --git a/wakuv2/waku.go b/wakuv2/waku.go index b844caad7..13f70e500 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -50,6 +50,7 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/signal" "github.com/status-im/status-go/wakuv2/common" node "github.com/status-im/go-waku/waku/v2/node" @@ -145,6 +146,9 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger) (*Waku, error) { return nil, fmt.Errorf("failed to setup the network interface: %v", err) } + connStatusChan := make(chan node.ConnStatus) + + keepAliveInt := 1 waku.node, err = node.New(context.Background(), node.WithLibP2POptions( libp2p.BandwidthReporter(waku.bandwidthCounter), @@ -152,9 +156,21 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger) (*Waku, error) { node.WithPrivateKey(privateKey), node.WithHostAddress([]net.Addr{hostAddr}), node.WithWakuRelay(wakurelay.WithMaxMessageSize(int(waku.settings.MaxMsgSize))), - node.WithWakuStore(false), // Mounts the store protocol (without storing the messages) + node.WithWakuStore(false, false), // Mounts the store protocol (without storing the messages) + node.WithConnStatusChan(connStatusChan), + node.WithKeepAlive(time.Duration(keepAliveInt)*time.Second), ) + go func() { + for { + select { + case <-waku.quit: + return + case c := <-connStatusChan: + signal.SendPeerStats(c) + } + } + }() if err != nil { fmt.Println(err) return nil, fmt.Errorf("failed to start the go-waku node: %v", err)