diff --git a/logging/logging.go b/logging/logging.go index 7e872eef..19732d55 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -8,6 +8,7 @@ package logging import ( "encoding/hex" + "fmt" "net" "time" @@ -147,3 +148,8 @@ func TCPAddr(key string, ip net.IP, port int) zap.Field { func UDPAddr(key string, ip net.IP, port int) zap.Field { return zap.Stringer(key, &net.UDPAddr{IP: ip, Port: port}) } + +func Uint64(key string, value uint64) zap.Field { + valueStr := fmt.Sprintf("%v", value) + return zap.String(key, valueStr) +} diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 3d2aa1a5..4b3508a4 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -554,9 +554,9 @@ func (w *WakuNode) watchENRChanges(ctx context.Context) { currNodeVal := w.localNode.Node().String() if prevNodeVal != currNodeVal { if prevNodeVal == "" { - w.log.Info("enr record", logging.ENode("enr", w.localNode.Node())) + w.log.Info("local node enr record", logging.ENode("enr", w.localNode.Node())) } else { - w.log.Info("new enr record", logging.ENode("enr", w.localNode.Node())) + w.log.Info("local node new enr record", logging.ENode("enr", w.localNode.Node())) } prevNodeVal = currNodeVal } diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index f081c564..e53d9c09 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -129,7 +129,6 @@ func (c *PeerConnectionStrategy) consumeSubscription(s subscription) { if len(c.host.Network().Peers()) < c.pm.OutPeersTarget { triggerImmediateConnection = true } - c.logger.Debug("adding discovered peer", logging.HostID("peerID", p.AddrInfo.ID)) c.pm.AddDiscoveredPeer(p, triggerImmediateConnection) case <-time.After(1 * time.Second): diff --git a/waku/v2/peermanager/peer_discovery.go b/waku/v2/peermanager/peer_discovery.go index d1cedac2..6d05c69b 100644 --- a/waku/v2/peermanager/peer_discovery.go +++ b/waku/v2/peermanager/peer_discovery.go @@ -55,7 +55,7 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16, wakuProtoInfo, ok := pm.wakuprotoToENRFieldMap[wakuProtocol] if !ok { - pm.logger.Error("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol))) + pm.logger.Info("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol))) return nil, errors.New("cannot do on demand discovery for non-waku protocol") } iterator, err := pm.discoveryService.PeerIterator( diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index c3b1cf74..173663c0 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -128,7 +128,7 @@ func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int { pThreshold, err := pm.host.Peerstore().(wps.WakuPeerstore).Score(p) if err == nil { if pThreshold < relay.PeerPublishThreshold { - pm.logger.Debug("peer score below publish threshold", logging.HostID("peer", p), zap.Float64("score", pThreshold)) + pm.logger.Debug("peer score below publish threshold", zap.Stringer("peer", p), zap.Float64("score", pThreshold)) } else { healthyPeerCount++ } @@ -136,9 +136,9 @@ func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int { if errors.Is(err, peerstore.ErrNotFound) { // For now considering peer as healthy if we can't fetch score. healthyPeerCount++ - pm.logger.Debug("peer score is not available yet", logging.HostID("peer", p)) + pm.logger.Debug("peer score is not available yet", zap.Stringer("peer", p)) } else { - pm.logger.Warn("failed to fetch peer score ", zap.Error(err), logging.HostID("peer", p)) + pm.logger.Warn("failed to fetch peer score ", zap.Error(err), zap.Stringer("peer", p)) } } } @@ -271,7 +271,7 @@ func (pm *PeerManager) GroupPeersByDirection(specificPeers ...peer.ID) (inPeers } } else { pm.logger.Error("failed to retrieve peer direction", - logging.HostID("peerID", p), zap.Error(err)) + zap.Stringer("peerID", p), zap.Error(err)) } } return inPeers, outPeers, nil @@ -399,10 +399,10 @@ func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) { err := pm.host.Network().ClosePeer(p) if err != nil { pm.logger.Warn("failed to disconnect connection towards peer", - logging.HostID("peerID", p)) + zap.Stringer("peerID", p)) } pm.logger.Debug("successfully disconnected connection towards peer", - logging.HostID("peerID", p)) + zap.Stringer("peerID", p)) } } @@ -410,7 +410,7 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID { shards, err := wenr.RelaySharding(p.ENR.Record()) if err != nil { pm.logger.Error("could not derive relayShards from ENR", zap.Error(err), - logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) + zap.Stringer("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) } else { if shards != nil { p.PubsubTopics = make([]string, 0) @@ -420,7 +420,7 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID { p.PubsubTopics = append(p.PubsubTopics, topicStr) } } else { - pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID)) + pm.logger.Debug("ENR doesn't have relay shards", zap.Stringer("peer", p.AddrInfo.ID)) } } supportedProtos := []protocol.ID{} @@ -446,6 +446,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { if pm.maxPeers <= pm.host.Peerstore().Peers().Len() { return } + //Check if the peer is already present, if so skip adding _, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID) if err == nil { @@ -463,16 +464,17 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { } //Peer is already in peer-store but stored ENR is older than discovered one. pm.logger.Info("peer already found in peerstore, but re-adding it as ENR sequence is higher than locally stored", - logging.HostID("peer", p.AddrInfo.ID), zap.Uint64("newENRSeq", p.ENR.Seq()), zap.Uint64("storedENRSeq", enr.Record().Seq())) + zap.Stringer("peer", p.AddrInfo.ID), logging.Uint64("newENRSeq", p.ENR.Record().Seq()), logging.Uint64("storedENRSeq", enr.Record().Seq())) } else { - pm.logger.Info("peer already found in peerstore, but no new ENR", logging.HostID("peer", p.AddrInfo.ID)) + pm.logger.Info("peer already found in peerstore, but no new ENR", zap.Stringer("peer", p.AddrInfo.ID)) } } else { //Peer is in peer-store but it doesn't have an enr pm.logger.Info("peer already found in peerstore, but doesn't have an ENR record, re-adding", - logging.HostID("peer", p.AddrInfo.ID)) + zap.Stringer("peer", p.AddrInfo.ID)) } } + pm.logger.Debug("adding discovered peer", zap.Stringer("peerID", p.AddrInfo.ID)) supportedProtos := []protocol.ID{} if len(p.PubsubTopics) == 0 && p.ENR != nil { @@ -483,14 +485,15 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { _ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubsubTopics, supportedProtos...) if p.ENR != nil { + pm.logger.Debug("setting ENR for peer", zap.Stringer("peerID", p.AddrInfo.ID), zap.Stringer("enr", p.ENR)) err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR) if err != nil { pm.logger.Error("could not store enr", zap.Error(err), - logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) + zap.Stringer("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) } } if connectNow { - pm.logger.Debug("connecting now to discovered peer", logging.HostID("peer", p.AddrInfo.ID)) + pm.logger.Debug("connecting now to discovered peer", zap.Stringer("peer", p.AddrInfo.ID)) go pm.peerConnector.PushToChan(p) } } @@ -499,10 +502,10 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { // It also sets additional metadata such as origin and supported protocols func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error { if pm.maxPeers <= pm.host.Peerstore().Peers().Len() { - pm.logger.Error("could not add peer as peer store capacity is reached", logging.HostID("peer", ID), zap.Int("capacity", pm.maxPeers)) + pm.logger.Error("could not add peer as peer store capacity is reached", zap.Stringer("peer", ID), zap.Int("capacity", pm.maxPeers)) return errors.New("peer store capacity reached") } - pm.logger.Info("adding peer to peerstore", logging.HostID("peer", ID)) + pm.logger.Info("adding peer to peerstore", zap.Stringer("peer", ID)) if origin == wps.Static { pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.PermanentAddrTTL) } else { @@ -512,14 +515,14 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig } err := pm.host.Peerstore().(wps.WakuPeerstore).SetOrigin(ID, origin) if err != nil { - pm.logger.Error("could not set origin", zap.Error(err), logging.HostID("peer", ID)) + pm.logger.Error("could not set origin", zap.Error(err), zap.Stringer("peer", ID)) return err } if len(protocols) > 0 { err = pm.host.Peerstore().AddProtocols(ID, protocols...) if err != nil { - pm.logger.Error("could not set protocols", zap.Error(err), logging.HostID("peer", ID)) + pm.logger.Error("could not set protocols", zap.Error(err), zap.Stringer("peer", ID)) return err } } @@ -531,7 +534,7 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig err = pm.host.Peerstore().(wps.WakuPeerstore).SetPubSubTopics(ID, pubSubTopics) if err != nil { pm.logger.Error("could not store pubSubTopic", zap.Error(err), - logging.HostID("peer", ID), zap.Strings("topics", pubSubTopics)) + zap.Stringer("peer", ID), zap.Strings("topics", pubSubTopics)) } return nil } @@ -609,7 +612,7 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) { //For now adding the peer to serviceSlot which means the latest added peer would be given priority. //TODO: Ideally we should sort the peers per service and return best peer based on peer score or RTT etc. - pm.logger.Info("adding peer to service slots", logging.HostID("peer", peerID), + pm.logger.Info("adding peer to service slots", zap.Stringer("peer", peerID), zap.String("service", string(proto))) // getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol pm.serviceSlots.getPeers(proto).add(peerID)