Fix keepAlive

This commit is contained in:
Vitaliy Vlasov 2021-09-11 14:36:54 +03:00
parent 1cf3de3a3f
commit a3c7102a34
1 changed files with 38 additions and 23 deletions

View File

@ -4,12 +4,16 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
//"log/syslog"
//"strconv"
"sync" "sync"
"time" "time"
proto "github.com/golang/protobuf/proto" proto "github.com/golang/protobuf/proto"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
@ -34,6 +38,8 @@ import (
var log = logging.Logger("wakunode") var log = logging.Logger("wakunode")
//var logwriter, _ = syslog.New(syslog.LOG_ERR|syslog.LOG_LOCAL0, "WAKU")
type Message []byte type Message []byte
// A map of peer IDs to supported protocols // A map of peer IDs to supported protocols
@ -47,6 +53,7 @@ type ConnStatus struct {
type WakuNode struct { type WakuNode struct {
host host.Host host host.Host
idService *identify.IDService
opts *WakuNodeParameters opts *WakuNodeParameters
relay *relay.WakuRelay relay *relay.WakuRelay
@ -134,13 +141,8 @@ func (w *WakuNode) handlePeerIdentificationCompleted(ev event.EvtPeerIdentificat
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
log.Debug("identified protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols) log.Debug("identified protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols)
_, ok := w.peers[ev.Peer]
if ok {
peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer)
w.peers[ev.Peer] = peerProtocols w.peers[ev.Peer] = peerProtocols
} }
}
func (w *WakuNode) processHostEvent(e interface{}) { func (w *WakuNode) processHostEvent(e interface{}) {
if e == nil { if e == nil {
log.Debug("processHostEvent nil event") log.Debug("processHostEvent nil event")
@ -222,6 +224,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
w := new(WakuNode) w := new(WakuNode)
w.bcaster = NewBroadcaster(1024) w.bcaster = NewBroadcaster(1024)
w.host = host w.host = host
w.idService = identify.NewIDService(host)
w.cancel = cancel w.cancel = cancel
w.ctx = ctx w.ctx = ctx
w.subscriptions = make(map[relay.Topic][]*Subscription) w.subscriptions = make(map[relay.Topic][]*Subscription)
@ -763,31 +766,35 @@ func (w *WakuNode) Peers() PeerStats {
} }
func (w *WakuNode) startKeepAlive(t time.Duration) { func (w *WakuNode) startKeepAlive(t time.Duration) {
log.Info("Setting up ping protocol with duration of ", t) log.Info("Setting up ping protocol with duration of ", t)
w.ping = ping.NewPingService(w.host) w.ping = ping.NewPingService(w.host)
ticker := time.NewTicker(t) ticker := time.NewTicker(t)
go func() { go func() {
// This map contains peers that we're
// waiting for the ping response from
peerMap := make(map[peer.ID]<-chan ping.Result) peerMap := make(map[peer.ID]<-chan ping.Result)
em, _ := w.Host().EventBus().Emitter(new(event.EvtPeerConnectednessChanged))
defer em.Close()
var mu sync.Mutex var mu sync.Mutex
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
for _, peer := range w.host.Network().Peers() { for _, p := range w.host.Network().Peers() {
mu.Lock() mu.Lock()
_, ok := peerMap[peer] _, ok := peerMap[p]
mu.Unlock()
if !ok {
log.Debug("###Pinging", peer)
result := w.ping.Ping(w.ctx, peer)
mu.Lock()
peerMap[peer] = result
mu.Unlock() mu.Unlock()
go func() { var s = p.Pretty()
s = s[len(s)-4:]
if !ok {
log.Info("###PING ", s)
result := w.ping.Ping(w.ctx, p)
mu.Lock()
peerMap[p] = result
mu.Unlock()
go func(peer peer.ID) {
peerFound := false peerFound := false
for p := range w.peers { for p := range w.peers {
if p == peer { if p == peer {
@ -796,7 +803,8 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
} }
} }
log.Debug("###PING before fetching result") //log.Info("###PING " + s + " before fetching result")
//logwriter.Write([]byte("###PING " + s + " before fetching result"))
pingTicker := time.NewTicker(time.Duration(1) * time.Second) pingTicker := time.NewTicker(time.Duration(1) * time.Second)
isError := false isError := false
select { select {
@ -806,29 +814,36 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
isError = true isError = true
} }
pingTicker.Stop() pingTicker.Stop()
log.Debug("###PING after fetching result")
if !peerFound && !isError { if !peerFound && !isError {
log.Debug("###PING peer added")
//EventBus Emitter doesn't seem to work when there's no connection //EventBus Emitter doesn't seem to work when there's no connection
w.pingEventsChan <- event.EvtPeerConnectednessChanged{ w.pingEventsChan <- event.EvtPeerConnectednessChanged{
Peer: peer, Peer: peer,
Connectedness: network.Connected, Connectedness: network.Connected,
} }
peerConns := w.host.Network().ConnsToPeer(peer)
if len(peerConns) > 0 {
// log.Info("###PING " + s + " IdentifyWait")
// logwriter.Write([]byte("###PING " + s + " IdentifyWait"))
//w.idService.IdentifyWait(peerConns[0])
} else {
w.DialPeerByID(peer)
}
} else if peerFound && isError { } else if peerFound && isError {
log.Debug("###PING peer removed") // log.Info("###PING " + s + " peer removed")
// logwriter.Write([]byte("###PING " + s + " peer removed"))
w.pingEventsChan <- event.EvtPeerConnectednessChanged{ w.pingEventsChan <- event.EvtPeerConnectednessChanged{
Peer: peer, Peer: peer,
Connectedness: network.NotConnected, Connectedness: network.NotConnected,
} }
log.Debug("###PING wrote to ping chan")
} }
mu.Lock() mu.Lock()
delete(peerMap, peer) delete(peerMap, peer)
mu.Unlock() mu.Unlock()
}() }(p)
} else { } else {
log.Debug("###PING already pinged") log.Info("###PING " + s + " already pinged")
// logwriter.Write([]byte("###PING " + s + " already pinged"))
} }
} }
case <-w.quit: case <-w.quit: