From a3c7102a34cc9cfd70047d14339acd8842ed06d5 Mon Sep 17 00:00:00 2001 From: Vitaliy Vlasov Date: Sat, 11 Sep 2021 14:36:54 +0300 Subject: [PATCH] Fix keepAlive --- waku/v2/node/wakunode2.go | 61 ++++++++++++++++++++++++--------------- 1 file changed, 38 insertions(+), 23 deletions(-) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 70d61d2d..f38e55b3 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -4,12 +4,16 @@ import ( "context" "errors" "fmt" + + //"log/syslog" + //"strconv" "sync" "time" proto "github.com/golang/protobuf/proto" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/p2p/protocol/identify" "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/host" @@ -34,6 +38,8 @@ import ( var log = logging.Logger("wakunode") +//var logwriter, _ = syslog.New(syslog.LOG_ERR|syslog.LOG_LOCAL0, "WAKU") + type Message []byte // A map of peer IDs to supported protocols @@ -46,8 +52,9 @@ type ConnStatus struct { } type WakuNode struct { - host host.Host - opts *WakuNodeParameters + host host.Host + idService *identify.IDService + opts *WakuNodeParameters relay *relay.WakuRelay filter *filter.WakuFilter @@ -134,12 +141,7 @@ func (w *WakuNode) handlePeerIdentificationCompleted(ev event.EvtPeerIdentificat peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) log.Debug("identified protocols found for peer: ", ev.Peer, ", protocols: ", peerProtocols) - _, ok := w.peers[ev.Peer] - if ok { - peerProtocols, _ := w.host.Peerstore().GetProtocols(ev.Peer) - w.peers[ev.Peer] = peerProtocols - } - + w.peers[ev.Peer] = peerProtocols } func (w *WakuNode) processHostEvent(e interface{}) { if e == nil { @@ -222,6 +224,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { w := new(WakuNode) w.bcaster = NewBroadcaster(1024) w.host = host + w.idService = identify.NewIDService(host) w.cancel = cancel w.ctx = ctx w.subscriptions = make(map[relay.Topic][]*Subscription) @@ -763,31 +766,35 @@ func (w *WakuNode) Peers() PeerStats { } func (w *WakuNode) startKeepAlive(t time.Duration) { + log.Info("Setting up ping protocol with duration of ", t) w.ping = ping.NewPingService(w.host) ticker := time.NewTicker(t) go func() { + // This map contains peers that we're + // waiting for the ping response from peerMap := make(map[peer.ID]<-chan ping.Result) - em, _ := w.Host().EventBus().Emitter(new(event.EvtPeerConnectednessChanged)) - defer em.Close() var mu sync.Mutex for { select { case <-ticker.C: - for _, peer := range w.host.Network().Peers() { + for _, p := range w.host.Network().Peers() { mu.Lock() - _, ok := peerMap[peer] + _, ok := peerMap[p] mu.Unlock() + + var s = p.Pretty() + s = s[len(s)-4:] if !ok { - log.Debug("###Pinging", peer) - result := w.ping.Ping(w.ctx, peer) + log.Info("###PING ", s) + result := w.ping.Ping(w.ctx, p) mu.Lock() - peerMap[peer] = result + peerMap[p] = result mu.Unlock() - go func() { + go func(peer peer.ID) { peerFound := false for p := range w.peers { if p == peer { @@ -796,7 +803,8 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { } } - log.Debug("###PING before fetching result") + //log.Info("###PING " + s + " before fetching result") + //logwriter.Write([]byte("###PING " + s + " before fetching result")) pingTicker := time.NewTicker(time.Duration(1) * time.Second) isError := false select { @@ -806,29 +814,36 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { isError = true } pingTicker.Stop() - log.Debug("###PING after fetching result") if !peerFound && !isError { - log.Debug("###PING peer added") //EventBus Emitter doesn't seem to work when there's no connection w.pingEventsChan <- event.EvtPeerConnectednessChanged{ Peer: peer, Connectedness: network.Connected, } + peerConns := w.host.Network().ConnsToPeer(peer) + if len(peerConns) > 0 { + // log.Info("###PING " + s + " IdentifyWait") + // logwriter.Write([]byte("###PING " + s + " IdentifyWait")) + //w.idService.IdentifyWait(peerConns[0]) + } else { + w.DialPeerByID(peer) + } } else if peerFound && isError { - log.Debug("###PING peer removed") + // log.Info("###PING " + s + " peer removed") + // logwriter.Write([]byte("###PING " + s + " peer removed")) w.pingEventsChan <- event.EvtPeerConnectednessChanged{ Peer: peer, Connectedness: network.NotConnected, } - log.Debug("###PING wrote to ping chan") } mu.Lock() delete(peerMap, peer) mu.Unlock() - }() + }(p) } else { - log.Debug("###PING already pinged") + log.Info("###PING " + s + " already pinged") + // logwriter.Write([]byte("###PING " + s + " already pinged")) } } case <-w.quit: