Mirror of https://github.com/logos-messaging/logos-messaging-go.git
Synced 2026-01-04 06:53:06 +00:00

chore: logging improvements

Parent: e2b87eee7b
Commit: daa34a5caa
@@ -8,6 +8,7 @@ package logging
 import (
 	"encoding/hex"
+	"fmt"
 	"net"
 	"time"

@@ -147,3 +148,8 @@ func TCPAddr(key string, ip net.IP, port int) zap.Field {
 func UDPAddr(key string, ip net.IP, port int) zap.Field {
 	return zap.Stringer(key, &net.UDPAddr{IP: ip, Port: port})
 }
+
+func Uint64(key string, value uint64) zap.Field {
+	valueStr := fmt.Sprintf("%v", value)
+	return zap.String(key, valueStr)
+}
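Note: the new Uint64 helper deliberately emits the value as a string field rather than a numeric one. Below is a minimal sketch of it in use, assuming go.uber.org/zap as the logging backend; the rationale suggested here (JSON consumers often decode numbers as float64 and lose precision above 2^53, which matters for ENR sequence numbers) is an inference, not stated in the commit.

    package main

    import (
        "fmt"

        "go.uber.org/zap"
    )

    // Uint64 mirrors the helper added above: the value is formatted as a
    // string, so a JSON consumer cannot round it the way it might a number.
    func Uint64(key string, value uint64) zap.Field {
        return zap.String(key, fmt.Sprintf("%v", value))
    }

    func main() {
        log, _ := zap.NewDevelopment()
        defer log.Sync()

        seq := uint64(1) << 60 // too large for a float64 to represent exactly
        log.Info("enr updated",
            Uint64("seqAsString", seq), // "1152921504606846976"
            zap.Uint64("seqAsNumber", seq),
        )
    }

Later in this commit the helper replaces zap.Uint64 for the newENRSeq/storedENRSeq fields.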
@@ -554,9 +554,9 @@ func (w *WakuNode) watchENRChanges(ctx context.Context) {
 			currNodeVal := w.localNode.Node().String()
 			if prevNodeVal != currNodeVal {
 				if prevNodeVal == "" {
-					w.log.Info("enr record", logging.ENode("enr", w.localNode.Node()))
+					w.log.Info("local node enr record", logging.ENode("enr", w.localNode.Node()))
 				} else {
-					w.log.Info("new enr record", logging.ENode("enr", w.localNode.Node()))
+					w.log.Info("local node new enr record", logging.ENode("enr", w.localNode.Node()))
 				}
 				prevNodeVal = currNodeVal
 			}
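Note: this hunk only rewords the log messages, but for context the surrounding watcher compares the serialized ENR against the last value it saw and logs only on change, distinguishing the first record from later updates. A stripped-down sketch of that pattern, with currentENR as a hypothetical stand-in for w.localNode.Node().String():

    package main

    import (
        "fmt"
        "time"
    )

    // currentENR stands in for w.localNode.Node().String(); it changes once
    // here purely to drive the example.
    func currentENR(tick int) string {
        if tick < 2 {
            return "enr:-initial"
        }
        return "enr:-updated"
    }

    func main() {
        prevNodeVal := ""
        for tick := 0; tick < 4; tick++ {
            currNodeVal := currentENR(tick)
            if prevNodeVal != currNodeVal {
                if prevNodeVal == "" {
                    fmt.Println("local node enr record:", currNodeVal)
                } else {
                    fmt.Println("local node new enr record:", currNodeVal)
                }
                prevNodeVal = currNodeVal
            }
            time.Sleep(10 * time.Millisecond)
        }
    }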
@@ -129,7 +129,6 @@ func (c *PeerConnectionStrategy) consumeSubscription(s subscription) {
 			if len(c.host.Network().Peers()) < c.pm.OutPeersTarget {
 				triggerImmediateConnection = true
 			}
-			c.logger.Debug("adding discovered peer", logging.HostID("peerID", p.AddrInfo.ID))
 			c.pm.AddDiscoveredPeer(p, triggerImmediateConnection)

 		case <-time.After(1 * time.Second):
@@ -55,7 +55,7 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16,

 	wakuProtoInfo, ok := pm.wakuprotoToENRFieldMap[wakuProtocol]
 	if !ok {
-		pm.logger.Error("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol)))
+		pm.logger.Info("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol)))
 		return nil, errors.New("cannot do on demand discovery for non-waku protocol")
 	}
 	iterator, err := pm.discoveryService.PeerIterator(
@@ -128,7 +128,7 @@ func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int {
 		pThreshold, err := pm.host.Peerstore().(wps.WakuPeerstore).Score(p)
 		if err == nil {
 			if pThreshold < relay.PeerPublishThreshold {
-				pm.logger.Debug("peer score below publish threshold", logging.HostID("peer", p), zap.Float64("score", pThreshold))
+				pm.logger.Debug("peer score below publish threshold", zap.Stringer("peer", p), zap.Float64("score", pThreshold))
 			} else {
 				healthyPeerCount++
 			}
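Note: the recurring substitution in this commit is logging.HostID(key, id) → zap.Stringer(key, id). Anything with a String() method satisfies the interface zap.Stringer expects, and the method is only invoked when the entry is actually encoded, so disabled debug logs stay cheap. A minimal sketch of the pattern, using a stand-in type rather than libp2p's real peer.ID:

    package main

    import "go.uber.org/zap"

    // peerID is a stand-in for libp2p's peer.ID; all zap.Stringer needs is a
    // String() method, which it calls lazily when the field is encoded.
    type peerID string

    func (p peerID) String() string { return string(p) }

    func main() {
        log, _ := zap.NewDevelopment()
        defer log.Sync()

        p := peerID("16Uiu2HAm...")
        log.Debug("peer score below publish threshold",
            zap.Stringer("peer", p),
            zap.Float64("score", -42.5),
        )
    }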
@@ -136,9 +136,9 @@ func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int {
 			if errors.Is(err, peerstore.ErrNotFound) {
 				// For now considering peer as healthy if we can't fetch score.
 				healthyPeerCount++
-				pm.logger.Debug("peer score is not available yet", logging.HostID("peer", p))
+				pm.logger.Debug("peer score is not available yet", zap.Stringer("peer", p))
 			} else {
-				pm.logger.Warn("failed to fetch peer score ", zap.Error(err), logging.HostID("peer", p))
+				pm.logger.Warn("failed to fetch peer score ", zap.Error(err), zap.Stringer("peer", p))
 			}
 		}
 	}
 }
@@ -271,7 +271,7 @@ func (pm *PeerManager) GroupPeersByDirection(specificPeers ...peer.ID) (inPeers
 			}
 		} else {
 			pm.logger.Error("failed to retrieve peer direction",
-				logging.HostID("peerID", p), zap.Error(err))
+				zap.Stringer("peerID", p), zap.Error(err))
 		}
 	}
 	return inPeers, outPeers, nil
@@ -399,10 +399,10 @@ func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) {
 		err := pm.host.Network().ClosePeer(p)
 		if err != nil {
 			pm.logger.Warn("failed to disconnect connection towards peer",
-				logging.HostID("peerID", p))
+				zap.Stringer("peerID", p))
 		}
 		pm.logger.Debug("successfully disconnected connection towards peer",
-			logging.HostID("peerID", p))
+			zap.Stringer("peerID", p))
 	}
 }

@@ -410,7 +410,7 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID {
 	shards, err := wenr.RelaySharding(p.ENR.Record())
 	if err != nil {
 		pm.logger.Error("could not derive relayShards from ENR", zap.Error(err),
-			logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
+			zap.Stringer("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
 	} else {
 		if shards != nil {
 			p.PubsubTopics = make([]string, 0)
@@ -420,7 +420,7 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID {
 				p.PubsubTopics = append(p.PubsubTopics, topicStr)
 			}
 		} else {
-			pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID))
+			pm.logger.Debug("ENR doesn't have relay shards", zap.Stringer("peer", p.AddrInfo.ID))
 		}
 	}
 	supportedProtos := []protocol.ID{}
@@ -446,6 +446,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
 	if pm.maxPeers <= pm.host.Peerstore().Peers().Len() {
 		return
 	}
+
 	//Check if the peer is already present, if so skip adding
 	_, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID)
 	if err == nil {
@@ -463,16 +464,17 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
 				}
 				//Peer is already in peer-store but stored ENR is older than discovered one.
 				pm.logger.Info("peer already found in peerstore, but re-adding it as ENR sequence is higher than locally stored",
-					logging.HostID("peer", p.AddrInfo.ID), zap.Uint64("newENRSeq", p.ENR.Seq()), zap.Uint64("storedENRSeq", enr.Record().Seq()))
+					zap.Stringer("peer", p.AddrInfo.ID), logging.Uint64("newENRSeq", p.ENR.Record().Seq()), logging.Uint64("storedENRSeq", enr.Record().Seq()))
 			} else {
-				pm.logger.Info("peer already found in peerstore, but no new ENR", logging.HostID("peer", p.AddrInfo.ID))
+				pm.logger.Info("peer already found in peerstore, but no new ENR", zap.Stringer("peer", p.AddrInfo.ID))
 			}
 		} else {
 			//Peer is in peer-store but it doesn't have an enr
 			pm.logger.Info("peer already found in peerstore, but doesn't have an ENR record, re-adding",
-				logging.HostID("peer", p.AddrInfo.ID))
+				zap.Stringer("peer", p.AddrInfo.ID))
 		}
 	}
+	pm.logger.Debug("adding discovered peer", zap.Stringer("peerID", p.AddrInfo.ID))

 	supportedProtos := []protocol.ID{}
 	if len(p.PubsubTopics) == 0 && p.ENR != nil {
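Note: besides the field swaps, this hunk changes p.ENR.Seq() to p.ENR.Record().Seq(), switches the sequence fields to the new string-valued logging.Uint64 helper, and adds the "adding discovered peer" debug line that the earlier hunk removed from the peer connector. A hedged sketch of the sequence-number comparison the log message describes, with hard-coded values standing in for the real ENR records:

    package main

    import (
        "fmt"

        "go.uber.org/zap"
    )

    // Uint64 is the string-valued field helper added earlier in this commit.
    func Uint64(key string, value uint64) zap.Field {
        return zap.String(key, fmt.Sprintf("%v", value))
    }

    func main() {
        logger, _ := zap.NewDevelopment()
        defer logger.Sync()

        storedSeq := uint64(12)     // sequence of the ENR already in the peerstore
        discoveredSeq := uint64(15) // sequence carried by the freshly discovered ENR

        if discoveredSeq > storedSeq {
            // The discovered record is newer, so the peer is re-added and the
            // fresher addresses/shard info replace the stale entry.
            logger.Info("peer already found in peerstore, but re-adding it as ENR sequence is higher than locally stored",
                Uint64("newENRSeq", discoveredSeq),
                Uint64("storedENRSeq", storedSeq))
        }
    }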
@@ -483,14 +485,15 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
 	_ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubsubTopics, supportedProtos...)

 	if p.ENR != nil {
+		pm.logger.Debug("setting ENR for peer", zap.Stringer("peerID", p.AddrInfo.ID), zap.Stringer("enr", p.ENR))
 		err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR)
 		if err != nil {
 			pm.logger.Error("could not store enr", zap.Error(err),
-				logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
+				zap.Stringer("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
 		}
 	}
 	if connectNow {
-		pm.logger.Debug("connecting now to discovered peer", logging.HostID("peer", p.AddrInfo.ID))
+		pm.logger.Debug("connecting now to discovered peer", zap.Stringer("peer", p.AddrInfo.ID))
 		go pm.peerConnector.PushToChan(p)
 	}
 }
@@ -499,10 +502,10 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
 // It also sets additional metadata such as origin and supported protocols
 func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error {
 	if pm.maxPeers <= pm.host.Peerstore().Peers().Len() {
-		pm.logger.Error("could not add peer as peer store capacity is reached", logging.HostID("peer", ID), zap.Int("capacity", pm.maxPeers))
+		pm.logger.Error("could not add peer as peer store capacity is reached", zap.Stringer("peer", ID), zap.Int("capacity", pm.maxPeers))
 		return errors.New("peer store capacity reached")
 	}
-	pm.logger.Info("adding peer to peerstore", logging.HostID("peer", ID))
+	pm.logger.Info("adding peer to peerstore", zap.Stringer("peer", ID))
 	if origin == wps.Static {
 		pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.PermanentAddrTTL)
 	} else {
@@ -512,14 +515,14 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig
 	}
 	err := pm.host.Peerstore().(wps.WakuPeerstore).SetOrigin(ID, origin)
 	if err != nil {
-		pm.logger.Error("could not set origin", zap.Error(err), logging.HostID("peer", ID))
+		pm.logger.Error("could not set origin", zap.Error(err), zap.Stringer("peer", ID))
 		return err
 	}

 	if len(protocols) > 0 {
 		err = pm.host.Peerstore().AddProtocols(ID, protocols...)
 		if err != nil {
-			pm.logger.Error("could not set protocols", zap.Error(err), logging.HostID("peer", ID))
+			pm.logger.Error("could not set protocols", zap.Error(err), zap.Stringer("peer", ID))
 			return err
 		}
 	}
@@ -531,7 +534,7 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig
 	err = pm.host.Peerstore().(wps.WakuPeerstore).SetPubSubTopics(ID, pubSubTopics)
 	if err != nil {
 		pm.logger.Error("could not store pubSubTopic", zap.Error(err),
-			logging.HostID("peer", ID), zap.Strings("topics", pubSubTopics))
+			zap.Stringer("peer", ID), zap.Strings("topics", pubSubTopics))
 	}
 	return nil
 }
@@ -609,7 +612,7 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {

 	//For now adding the peer to serviceSlot which means the latest added peer would be given priority.
 	//TODO: Ideally we should sort the peers per service and return best peer based on peer score or RTT etc.
-	pm.logger.Info("adding peer to service slots", logging.HostID("peer", peerID),
+	pm.logger.Info("adding peer to service slots", zap.Stringer("peer", peerID),
 		zap.String("service", string(proto)))
 	// getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol
 	pm.serviceSlots.getPeers(proto).add(peerID)
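Note: the comments in the hunk above describe the service-slot policy (the latest added peer wins; sorting by score or RTT is left as a TODO). A hedged sketch of that idea with illustrative names and a made-up protocol string; the real implementation lives in the peer-manager package and differs in detail:

    package main

    import "fmt"

    type protocolID string
    type peerID string

    // peerSet keeps insertion order so the most recently added peer can be
    // preferred, matching the "latest added peer is given priority" comment.
    type peerSet struct{ peers []peerID }

    func (s *peerSet) add(p peerID) { s.peers = append(s.peers, p) }

    func (s *peerSet) latest() (peerID, bool) {
        if len(s.peers) == 0 {
            return "", false
        }
        return s.peers[len(s.peers)-1], true
    }

    type serviceSlots map[protocolID]*peerSet

    // getPeers lazily allocates the slot for a protocol, mirroring the
    // pm.serviceSlots.getPeers(proto).add(peerID) call site above.
    func (ss serviceSlots) getPeers(proto protocolID) *peerSet {
        if ss[proto] == nil {
            ss[proto] = &peerSet{}
        }
        return ss[proto]
    }

    func main() {
        slots := serviceSlots{}
        slots.getPeers("/example/store/1.0.0").add("peerA")
        slots.getPeers("/example/store/1.0.0").add("peerB")

        if p, ok := slots.getPeers("/example/store/1.0.0").latest(); ok {
            fmt.Println("selected store peer:", p) // peerB: latest added wins
        }
    }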