diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 1ebc98d0..8134a2a6 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -42,6 +42,7 @@ type PeerStats map[peer.ID][]string type ConnStatus struct { IsOnline bool HasHistory bool + Peers PeerStats } type WakuNode struct { @@ -70,7 +71,9 @@ type WakuNode struct { quit chan struct{} // Map of peers and their supported protocols - peers PeerStats + peers PeerStats + peersMutex sync.Mutex + // Internal protocol implementations that wish // to listen to peer added/removed events (e.g. Filter) peerListeners []chan *event.EvtPeerConnectednessChanged @@ -81,8 +84,11 @@ type WakuNode struct { } func (w *WakuNode) handleConnectednessChanged(ev event.EvtPeerConnectednessChanged) { - log.Info("### EvtPeerConnectednessChanged ", w.Host().ID(), " to ", ev.Peer, " : ", ev.Connectedness) + + w.peersMutex.Lock() + defer w.peersMutex.Unlock() + if ev.Connectedness == network.Connected { _, ok := w.peers[ev.Peer] if !ok { @@ -108,6 +114,10 @@ func (w *WakuNode) handleConnectednessChanged(ev event.EvtPeerConnectednessChang } func (w *WakuNode) handleProtocolsUpdated(ev event.EvtPeerProtocolsUpdated) { log.Info("### EvtPeerProtocolsUpdated ", w.Host().ID(), " to ", ev.Peer, " added: ", ev.Added, ", removed: ", ev.Removed) + + w.peersMutex.Lock() + defer w.peersMutex.Unlock() + _, ok := w.peers[ev.Peer] if ok { peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) @@ -117,8 +127,11 @@ func (w *WakuNode) handleProtocolsUpdated(ev event.EvtPeerProtocolsUpdated) { } func (w *WakuNode) handlePeerIdentificationCompleted(ev event.EvtPeerIdentificationCompleted) { - log.Info("### EvtPeerIdentificationCompleted ", w.Host().ID(), " to ", ev.Peer) + + w.peersMutex.Lock() + defer w.peersMutex.Unlock() + peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) log.Info("identified protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols) _, ok := w.peers[ev.Peer] @@ -135,13 +148,13 @@ func (w *WakuNode) processHostEvent(e interface{}) { } isOnline := w.IsOnline() hasHistory := w.HasHistory() - switch e.(type) { + switch e := e.(type) { case event.EvtPeerConnectednessChanged: - w.handleConnectednessChanged(e.(event.EvtPeerConnectednessChanged)) + w.handleConnectednessChanged(e) case event.EvtPeerProtocolsUpdated: - w.handleProtocolsUpdated(e.(event.EvtPeerProtocolsUpdated)) + w.handleProtocolsUpdated(e) case event.EvtPeerIdentificationCompleted: - w.handlePeerIdentificationCompleted(e.(event.EvtPeerIdentificationCompleted)) + w.handlePeerIdentificationCompleted(e) } log.Info("###processHostEvent before isOnline()") @@ -152,8 +165,17 @@ func (w *WakuNode) processHostEvent(e interface{}) { hasHistory, "/", newHasHistory) if w.connStatusChan != nil && (isOnline != newIsOnline || hasHistory != newHasHistory) { - log.Info("New ConnStatus: ", ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory}) - w.connStatusChan <- ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory} + + // Creating a copy of the current peers map + w.peersMutex.Lock() + p := make(PeerStats) + for k, v := range w.peers { + p[k] = v + } + w.peersMutex.Unlock() + + log.Info("New ConnStatus: ", ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory, Peers: p}) + w.connStatusChan <- ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory, Peers: p} } }