add peers to ConnStatus (#45)

* add peers to ConnStatus
* return connected peers and supported protocols
This commit is contained in:
RichΛrd 2021-08-30 12:14:35 -04:00 committed by Vitaliy Vlasov
parent 24c7a8e4c6
commit 53b3d19948
1 changed files with 31 additions and 9 deletions

View File

@ -42,6 +42,7 @@ type PeerStats map[peer.ID][]string
type ConnStatus struct { type ConnStatus struct {
IsOnline bool IsOnline bool
HasHistory bool HasHistory bool
Peers PeerStats
} }
type WakuNode struct { type WakuNode struct {
@ -70,7 +71,9 @@ type WakuNode struct {
quit chan struct{} quit chan struct{}
// Map of peers and their supported protocols // Map of peers and their supported protocols
peers PeerStats peers PeerStats
peersMutex sync.Mutex
// Internal protocol implementations that wish // Internal protocol implementations that wish
// to listen to peer added/removed events (e.g. Filter) // to listen to peer added/removed events (e.g. Filter)
peerListeners []chan *event.EvtPeerConnectednessChanged peerListeners []chan *event.EvtPeerConnectednessChanged
@ -81,8 +84,11 @@ type WakuNode struct {
} }
func (w *WakuNode) handleConnectednessChanged(ev event.EvtPeerConnectednessChanged) { func (w *WakuNode) handleConnectednessChanged(ev event.EvtPeerConnectednessChanged) {
log.Info("### EvtPeerConnectednessChanged ", w.Host().ID(), " to ", ev.Peer, " : ", ev.Connectedness) log.Info("### EvtPeerConnectednessChanged ", w.Host().ID(), " to ", ev.Peer, " : ", ev.Connectedness)
w.peersMutex.Lock()
defer w.peersMutex.Unlock()
if ev.Connectedness == network.Connected { if ev.Connectedness == network.Connected {
_, ok := w.peers[ev.Peer] _, ok := w.peers[ev.Peer]
if !ok { if !ok {
@ -108,6 +114,10 @@ func (w *WakuNode) handleConnectednessChanged(ev event.EvtPeerConnectednessChang
} }
func (w *WakuNode) handleProtocolsUpdated(ev event.EvtPeerProtocolsUpdated) { func (w *WakuNode) handleProtocolsUpdated(ev event.EvtPeerProtocolsUpdated) {
log.Info("### EvtPeerProtocolsUpdated ", w.Host().ID(), " to ", ev.Peer, " added: ", ev.Added, ", removed: ", ev.Removed) log.Info("### EvtPeerProtocolsUpdated ", w.Host().ID(), " to ", ev.Peer, " added: ", ev.Added, ", removed: ", ev.Removed)
w.peersMutex.Lock()
defer w.peersMutex.Unlock()
_, ok := w.peers[ev.Peer] _, ok := w.peers[ev.Peer]
if ok { if ok {
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
@ -117,8 +127,11 @@ func (w *WakuNode) handleProtocolsUpdated(ev event.EvtPeerProtocolsUpdated) {
} }
func (w *WakuNode) handlePeerIdentificationCompleted(ev event.EvtPeerIdentificationCompleted) { func (w *WakuNode) handlePeerIdentificationCompleted(ev event.EvtPeerIdentificationCompleted) {
log.Info("### EvtPeerIdentificationCompleted ", w.Host().ID(), " to ", ev.Peer) log.Info("### EvtPeerIdentificationCompleted ", w.Host().ID(), " to ", ev.Peer)
w.peersMutex.Lock()
defer w.peersMutex.Unlock()
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
log.Info("identified protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols) log.Info("identified protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols)
_, ok := w.peers[ev.Peer] _, ok := w.peers[ev.Peer]
@ -135,13 +148,13 @@ func (w *WakuNode) processHostEvent(e interface{}) {
} }
isOnline := w.IsOnline() isOnline := w.IsOnline()
hasHistory := w.HasHistory() hasHistory := w.HasHistory()
switch e.(type) { switch e := e.(type) {
case event.EvtPeerConnectednessChanged: case event.EvtPeerConnectednessChanged:
w.handleConnectednessChanged(e.(event.EvtPeerConnectednessChanged)) w.handleConnectednessChanged(e)
case event.EvtPeerProtocolsUpdated: case event.EvtPeerProtocolsUpdated:
w.handleProtocolsUpdated(e.(event.EvtPeerProtocolsUpdated)) w.handleProtocolsUpdated(e)
case event.EvtPeerIdentificationCompleted: case event.EvtPeerIdentificationCompleted:
w.handlePeerIdentificationCompleted(e.(event.EvtPeerIdentificationCompleted)) w.handlePeerIdentificationCompleted(e)
} }
log.Info("###processHostEvent before isOnline()") log.Info("###processHostEvent before isOnline()")
@ -152,8 +165,17 @@ func (w *WakuNode) processHostEvent(e interface{}) {
hasHistory, "/", newHasHistory) hasHistory, "/", newHasHistory)
if w.connStatusChan != nil && if w.connStatusChan != nil &&
(isOnline != newIsOnline || hasHistory != newHasHistory) { (isOnline != newIsOnline || hasHistory != newHasHistory) {
log.Info("New ConnStatus: ", ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory})
w.connStatusChan <- ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory} // Creating a copy of the current peers map
w.peersMutex.Lock()
p := make(PeerStats)
for k, v := range w.peers {
p[k] = v
}
w.peersMutex.Unlock()
log.Info("New ConnStatus: ", ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory, Peers: p})
w.connStatusChan <- ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory, Peers: p}
} }
} }