Use keepAlive for Connectedness events

This commit is contained in:
Vitaliy Vlasov 2021-08-10 17:23:49 +03:00
parent 51f220a130
commit 24c7a8e4c6
4 changed files with 241 additions and 144 deletions

View File

@ -814,6 +814,8 @@ github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210711180556-9afd35dadd3f h1
github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210711180556-9afd35dadd3f/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210711180556-9afd35dadd3f/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE=
github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210721191549-83c6a2c077f4 h1:LaoaRVhtEu5kCzcEVn+l3sprDbTqpzWFsVT2lmAcHcA= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210721191549-83c6a2c077f4 h1:LaoaRVhtEu5kCzcEVn+l3sprDbTqpzWFsVT2lmAcHcA=
github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210721191549-83c6a2c077f4/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210721191549-83c6a2c077f4/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE=
github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a h1:eCna/q/PuZVqtmOMBqytw9yzZwMNKpao4au0OJDvesI=
github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE=
github.com/status-im/status-go/extkeys v1.0.0/go.mod h1:GdqJbrcpkNm5ZsSCpp+PdMxnXx+OcRBdm3PI0rs1FpU= github.com/status-im/status-go/extkeys v1.0.0/go.mod h1:GdqJbrcpkNm5ZsSCpp+PdMxnXx+OcRBdm3PI0rs1FpU=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=

View File

@ -7,6 +7,9 @@ import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"net" "net"
"os"
"os/signal"
"syscall"
"time" "time"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
@ -14,7 +17,8 @@ import (
"github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/status-im/go-waku/waku/v2/protocol/store"
//"github.com/status-im/go-waku/waku/v2/protocol/store"
"github.com/status-im/go-waku/waku/v2/utils" "github.com/status-im/go-waku/waku/v2/utils"
) )
@ -55,107 +59,114 @@ func main() {
ctx := context.Background() ctx := context.Background()
connStatusChan := make(chan node.ConnStatus) //connStatusChan := make(chan node.ConnStatus)
log.Info("### create relayNode1")
relayNode1, err := node.New(ctx, relayNode1, err := node.New(ctx,
node.WithPrivateKey(addrsAndKeys[0].key), node.WithPrivateKey(addrsAndKeys[0].key),
node.WithHostAddress([]net.Addr{addrsAndKeys[0].addr}), node.WithHostAddress([]net.Addr{addrsAndKeys[0].addr}),
node.WithWakuRelay(), node.WithWakuRelay(),
node.WithConnStatusChan(connStatusChan), //node.WithConnStatusChan(connStatusChan),
node.WithWakuStore(true), node.WithWakuStore(true),
node.WithKeepAlive(time.Duration(2)*time.Second),
) )
relayNode2, err := node.New(ctx, // relayNode2, err := node.New(ctx,
node.WithPrivateKey(addrsAndKeys[1].key), // node.WithPrivateKey(addrsAndKeys[1].key),
node.WithHostAddress([]net.Addr{addrsAndKeys[1].addr}), // node.WithHostAddress([]net.Addr{addrsAndKeys[1].addr}),
node.WithWakuRelay(), // node.WithWakuRelay(),
) // )
relayNode2.DialPeer(relayNode1.ListenAddresses()[0]) log.Info("### before DialPeer")
//staticNode := "/ip4/8.210.222.231/tcp/30303/p2p/16Uiu2HAm4v86W3bmT1BiH6oSPzcsSr24iDQpSN5Qa992BCjjwgrD"
staticNode := "/ip4/188.166.135.145/tcp/30303/p2p/16Uiu2HAmL5okWopX7NqZWBUKVqW8iUxCEmd5GMHLVPwCgzYzQv3e"
relayNode1.DialPeer(staticNode)
//relayNode2.DialPeer(relayNode1.ListenAddresses()[0])
//go writeLoop(ctx, relayNode1) //go writeLoop(ctx, relayNode1)
//go readLoop(relayNode1) //go readLoop(relayNode1)
//go readLoop(relayNode2) //go readLoop(relayNode2)
printNodeConns := func(node *node.WakuNode) { log.Info("### Peer dialled")
log.Info(node.Host().ID(), ": ", "peerCount: ", len(node.Host().Peerstore().Peers())) // printNodeConns := func(node *node.WakuNode) {
log.Info("node peers: ") // log.Info(node.Host().ID(), ": ", "peerCount: ", len(node.Host().Peerstore().Peers()))
for k, v := range node.GetPeerStats() { // log.Info("node peers: ")
log.Info(k, " ", v) // for k, v := range node.GetPeerStats() {
} // log.Info(k, " ", v)
log.Info(node.Host().ID(), ": ", "isOnline/hasHistory ", node.IsOnline(), " ", node.HasHistory()) // }
log.Info("end") // log.Info(node.Host().ID(), ": ", "isOnline/hasHistory ", node.IsOnline(), " ", node.HasHistory())
// log.Info("end")
} // }
go func() { // go func() {
for connStatus := range connStatusChan { // for connStatus := range connStatusChan {
log.Info("Conn status update: ", connStatus) // log.Info("Conn status update: ", connStatus)
} // }
}() // }()
go func() { // go func() {
ticker := time.NewTicker(time.Millisecond * 1000) // ticker := time.NewTicker(time.Millisecond * 1000)
defer ticker.Stop() // defer ticker.Stop()
for { // for {
select { // select {
case <-ticker.C: // case <-ticker.C:
printNodeConns(relayNode1) // printNodeConns(relayNode1)
} // }
} // }
}() // }()
time.Sleep(3 * time.Second) // time.Sleep(3 * time.Second)
log.Info("stop relayNode2") // log.Info("stop relayNode2")
//relayNode2.Stop() // relayNode2.Host().Close()
time.Sleep(3 * time.Second) // time.Sleep(3 * time.Second)
log.Info("start relayNode3") // log.Info("start relayNode3")
relayNode3, err := node.New(ctx, // relayNode3, err := node.New(ctx,
node.WithPrivateKey(addrsAndKeys[2].key), // node.WithPrivateKey(addrsAndKeys[2].key),
node.WithHostAddress([]net.Addr{addrsAndKeys[2].addr}), // node.WithHostAddress([]net.Addr{addrsAndKeys[2].addr}),
node.WithWakuRelay(), // node.WithWakuRelay(),
) // )
relayNode3.DialPeer(relayNode1.ListenAddresses()[0]) // relayNode3.DialPeer(relayNode1.ListenAddresses()[0])
time.Sleep(3 * time.Second) // time.Sleep(3 * time.Second)
log.Info("stop relayNode3") // log.Info("stop relayNode3")
//relayNode3.Stop() // //relayNode3.Stop()
log.Info("start storeNode") // log.Info("start storeNode")
// Start a store node // // Start a store node
storeNode, _ := node.New(ctx, // storeNode, _ := node.New(ctx,
node.WithPrivateKey(addrsAndKeys[3].key), // node.WithPrivateKey(addrsAndKeys[3].key),
node.WithHostAddress([]net.Addr{addrsAndKeys[3].addr}), // node.WithHostAddress([]net.Addr{addrsAndKeys[3].addr}),
node.WithWakuRelay(), // node.WithWakuRelay(),
node.WithWakuStore(true), // node.WithWakuStore(true),
) // )
tCtx, _ := context.WithTimeout(ctx, 5*time.Second) // tCtx, _ := context.WithTimeout(ctx, 5*time.Second)
log.Info("#before AddStorePeer") // log.Info("#before AddStorePeer")
storeNodeId, err := relayNode1.AddStorePeer(storeNode.ListenAddresses()[0]) // storeNodeId, err := relayNode1.AddStorePeer(storeNode.ListenAddresses()[0])
time.Sleep(3 * time.Second) // time.Sleep(3 * time.Second)
log.Info("#before Query") // log.Info("#before Query")
_, err = relayNode1.Query(tCtx, []string{contentTopic}, 0, 0, store.WithPeer(*storeNodeId)) // _, err = relayNode1.Query(tCtx, []string{contentTopic}, 0, 0, store.WithPeer(*storeNodeId))
log.Info("storeNode.ListenAddresses(): ", storeNode.ListenAddresses(), storeNodeId) // log.Info("storeNode.ListenAddresses(): ", storeNode.ListenAddresses(), storeNodeId)
if err != nil { // if err != nil {
log.Info("### error adding store peer: ", err) // log.Info("### error adding store peer: ", err)
} // }
time.Sleep(3 * time.Second) // time.Sleep(3 * time.Second)
log.Info("stop storeNode") // log.Info("stop storeNode")
storeNode.Stop() // storeNode.Stop()
time.Sleep(3 * time.Second) // time.Sleep(3 * time.Second)
// // Wait for a SIGINT or SIGTERM signal // // // Wait for a SIGINT or SIGTERM signal
// ch := make(chan os.Signal, 1) ch := make(chan os.Signal, 1)
// signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
// <-ch <-ch
// fmt.Println("\n\n\nReceived signal, shutting down...") fmt.Println("\n\n\nReceived signal, shutting down...")
// shut the nodes down // // shut the nodes down
relayNode1.Stop() // relayNode1.Stop()
} }

View File

@ -77,78 +77,104 @@ type WakuNode struct {
// Channel passed to WakuNode constructor // Channel passed to WakuNode constructor
// receiving connection status notifications // receiving connection status notifications
connStatusChan chan ConnStatus connStatusChan chan ConnStatus
pingEventsChan chan interface{}
} }
func (w *WakuNode) handleConnectednessChanged(ev event.EvtPeerConnectednessChanged) {
log.Info("### EvtPeerConnectednessChanged ", w.Host().ID(), " to ", ev.Peer, " : ", ev.Connectedness)
if ev.Connectedness == network.Connected {
_, ok := w.peers[ev.Peer]
if !ok {
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
log.Info("protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols)
w.peers[ev.Peer] = peerProtocols
} else {
log.Info("### Peer already exists")
}
} else if ev.Connectedness == network.NotConnected {
log.Info("Peer down: ", ev.Peer)
delete(w.peers, ev.Peer)
// for _, pl := range w.peerListeners {
// pl <- &ev
// }
// TODO
// There seems to be no proper way to
// remove a dropped peer from Host's Peerstore
// https://github.com/libp2p/go-libp2p-host/issues/13
//w.Host().Network().ClosePeer(ev.Peer)
}
}
func (w *WakuNode) handleProtocolsUpdated(ev event.EvtPeerProtocolsUpdated) {
log.Info("### EvtPeerProtocolsUpdated ", w.Host().ID(), " to ", ev.Peer, " added: ", ev.Added, ", removed: ", ev.Removed)
_, ok := w.peers[ev.Peer]
if ok {
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
log.Info("updated protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols)
w.peers[ev.Peer] = peerProtocols
}
}
func (w *WakuNode) handlePeerIdentificationCompleted(ev event.EvtPeerIdentificationCompleted) {
log.Info("### EvtPeerIdentificationCompleted ", w.Host().ID(), " to ", ev.Peer)
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
log.Info("identified protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols)
_, ok := w.peers[ev.Peer]
if ok {
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
w.peers[ev.Peer] = peerProtocols
}
}
func (w *WakuNode) processHostEvent(e interface{}) {
if e == nil {
log.Info("processHostEvent nil event")
return
}
isOnline := w.IsOnline()
hasHistory := w.HasHistory()
switch e.(type) {
case event.EvtPeerConnectednessChanged:
w.handleConnectednessChanged(e.(event.EvtPeerConnectednessChanged))
case event.EvtPeerProtocolsUpdated:
w.handleProtocolsUpdated(e.(event.EvtPeerProtocolsUpdated))
case event.EvtPeerIdentificationCompleted:
w.handlePeerIdentificationCompleted(e.(event.EvtPeerIdentificationCompleted))
}
log.Info("###processHostEvent before isOnline()")
newIsOnline := w.IsOnline()
log.Info("###processHostEvent before hasHistory()")
newHasHistory := w.HasHistory()
log.Info("###ConnStatus isOnline: ", isOnline, "/", newIsOnline, " hasHistory: ",
hasHistory, "/", newHasHistory)
if w.connStatusChan != nil &&
(isOnline != newIsOnline || hasHistory != newHasHistory) {
log.Info("New ConnStatus: ", ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory})
w.connStatusChan <- ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory}
}
}
func (w *WakuNode) connectednessListener() { func (w *WakuNode) connectednessListener() {
for { for {
isOnline := w.IsOnline() var e interface{}
hasHistory := w.HasHistory() log.Info("connectednessListener before select")
select { select {
case e := <-w.connectednessEventSub.Out(): case e = <-w.connectednessEventSub.Out():
if e == nil { log.Info("connectednessListener connectednessEvent")
break case e = <-w.protocolEventSub.Out():
} log.Info("connectednessListener protocolEvent")
ev := e.(event.EvtPeerConnectednessChanged) case e = <-w.identificationEventSub.Out():
log.Info("connectednessListener identificationEvent")
log.Info("### EvtPeerConnectednessChanged ", w.Host().ID(), " to ", ev.Peer, " : ", ev.Connectedness) case e = <-w.pingEventsChan:
if ev.Connectedness == network.Connected { log.Info("connectednessListener pingEvent")
_, ok := w.peers[ev.Peer]
if !ok {
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
log.Info("protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols)
w.peers[ev.Peer] = peerProtocols
} else {
log.Info("### Peer already exists")
}
} else if ev.Connectedness == network.NotConnected {
log.Info("Peer down: ", ev.Peer)
delete(w.peers, ev.Peer)
for _, pl := range w.peerListeners {
pl <- &ev
}
// TODO
// There seems to be no proper way to
// remove a dropped peer from Host's Peerstore
// https://github.com/libp2p/go-libp2p-host/issues/13
//w.Host().Network().ClosePeer(ev.Peer)
}
case e := <-w.protocolEventSub.Out():
if e == nil {
break
}
ev := e.(event.EvtPeerProtocolsUpdated)
log.Info("### EvtPeerProtocolsUpdated ", w.Host().ID(), " to ", ev.Peer, " added: ", ev.Added, ", removed: ", ev.Removed)
_, ok := w.peers[ev.Peer]
if ok {
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
log.Info("updated protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols)
w.peers[ev.Peer] = peerProtocols
}
case e := <-w.identificationEventSub.Out():
if e == nil {
break
}
ev := e.(event.EvtPeerIdentificationCompleted)
log.Info("### EvtPeerIdentificationCompleted ", w.Host().ID(), " to ", ev.Peer)
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
log.Info("identified protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols)
_, ok := w.peers[ev.Peer]
if ok {
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
w.peers[ev.Peer] = peerProtocols
}
}
newIsOnline := w.IsOnline()
newHasHistory := w.HasHistory()
if w.connStatusChan != nil &&
(isOnline != newIsOnline || hasHistory != newHasHistory) {
w.connStatusChan <- ConnStatus{IsOnline: newIsOnline, HasHistory: newHasHistory}
} }
log.Info("connectednessListener after select")
w.processHostEvent(e)
log.Info("connectednessListener after processHostEvent")
} }
} }
@ -205,6 +231,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
if params.connStatusChan != nil { if params.connStatusChan != nil {
w.connStatusChan = params.connStatusChan w.connStatusChan = params.connStatusChan
} }
w.pingEventsChan = make(chan interface{})
go w.connectednessListener() go w.connectednessListener()
if params.enableStore { if params.enableStore {
@ -688,12 +715,69 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
w.ping = ping.NewPingService(w.host) w.ping = ping.NewPingService(w.host)
ticker := time.NewTicker(t) ticker := time.NewTicker(t)
go func() { go func() {
peerMap := make(map[peer.ID]<-chan ping.Result)
em, _ := w.Host().EventBus().Emitter(new(event.EvtPeerConnectednessChanged))
defer em.Close()
var mu sync.Mutex
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
for _, peer := range w.host.Network().Peers() { for _, peer := range w.host.Network().Peers() {
log.Info("Pinging", peer) mu.Lock()
w.ping.Ping(w.ctx, peer) _, ok := peerMap[peer]
mu.Unlock()
if !ok {
log.Info("###Pinging", peer)
result := w.ping.Ping(w.ctx, peer)
mu.Lock()
peerMap[peer] = result
mu.Unlock()
go func() {
peerFound := false
for p, _ := range w.peers {
if p == peer {
peerFound = true
break
}
}
log.Info("###PING before fetching result")
pingTicker := time.NewTicker(time.Duration(1) * time.Second)
isError := false
select {
case resVal := <-result:
isError = resVal.Error != nil
case <-pingTicker.C:
isError = true
}
pingTicker.Stop()
log.Info("###PING after fetching result")
if !peerFound && !isError {
log.Info("###PING peer added")
//EventBus Emitter doesn't seem to work when there's no connection
w.pingEventsChan <- event.EvtPeerConnectednessChanged{
Peer: peer,
Connectedness: network.Connected,
}
} else if peerFound && isError {
log.Info("###PING peer removed")
w.pingEventsChan <- event.EvtPeerConnectednessChanged{
Peer: peer,
Connectedness: network.NotConnected,
}
log.Info("###PING wrote to ping chan")
}
mu.Lock()
delete(peerMap, peer)
mu.Unlock()
}()
} else {
log.Info("###PING already pinged")
}
} }
case <-w.quit: case <-w.quit:
ticker.Stop() ticker.Stop()