diff --git a/cmd/waku/server/rest/admin.go b/cmd/waku/server/rest/admin.go index a757b6f5..22628935 100644 --- a/cmd/waku/server/rest/admin.go +++ b/cmd/waku/server/rest/admin.go @@ -53,9 +53,12 @@ func NewAdminService(node *node.WakuNode, m *chi.Mux, log *zap.Logger) *AdminSer func (a *AdminService) getV1Peers(w http.ResponseWriter, req *http.Request) { peers, err := a.node.Peers() if err != nil { + a.log.Error("failed to fetch peers", zap.Error(err)) writeErrOrResponse(w, err, nil) return } + a.log.Info("fetched peers", zap.Int("count", len(peers))) + response := make([]WakuPeer, 0) for _, peer := range peers { wPeer := WakuPeer{ @@ -68,6 +71,7 @@ func (a *AdminService) getV1Peers(w http.ResponseWriter, req *http.Request) { } for _, proto := range peer.Protocols { if !server.IsWakuProtocol(proto) { + a.log.Debug("skipping protocol as it is a non-waku protocol", logging.HostID("peer", peer.ID), zap.String("protocol", string(proto))) continue } wPeer.Protocols = append(wPeer.Protocols, string(proto)) @@ -85,6 +89,7 @@ func (a *AdminService) postV1Peer(w http.ResponseWriter, req *http.Request) { decoder := json.NewDecoder(req.Body) if err := decoder.Decode(&pInfo); err != nil { + a.log.Error("failed to decode request", zap.Error(err)) w.WriteHeader(http.StatusBadRequest) return } @@ -102,6 +107,10 @@ func (a *AdminService) postV1Peer(w http.ResponseWriter, req *http.Request) { topics = append(topics, topic.String()) } + for _, proto := range pInfo.Protocols { + protos = append(protos, protocol.ID(proto)) + } + id, err := a.node.AddPeer(addr, peerstore.Static, topics, protos...) if err != nil { a.log.Error("failed to add peer", zap.Error(err)) diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index ebcecedf..cf014adc 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -272,17 +272,19 @@ func (d *DiscoveryV5) evaluateNode() func(node *enode.Node) bool { if node == nil { return false } + d.log.Debug("found a peer", logging.ENode("enr", node)) // node filtering based on ENR; we do not filter based on ENR in the first waku discv5 beta stage if !isWakuNode(node) { + d.log.Debug("peer is not waku node", logging.ENode("enr", node)) return false } - + d.log.Debug("peer is a waku node", logging.ENode("enr", node)) _, err := wenr.EnodeToPeerInfo(node) if err != nil { d.metrics.RecordError(peerInfoFailure) - utils.Logger().Named("discv5").Error("obtaining peer info from enode", logging.ENode("enr", node), zap.Error(err)) + d.log.Error("obtaining peer info from enode", logging.ENode("enr", node), zap.Error(err)) return false } @@ -404,21 +406,25 @@ func (d *DiscoveryV5) DefaultPredicate() Predicate { nodeRS, err := wenr.RelaySharding(n.Record()) if err != nil { + d.log.Debug("failed to get relay shards from node record", logging.ENode("node", n), zap.Error(err)) return false } if nodeRS == nil { + d.log.Debug("node has no shards registered", logging.ENode("node", n)) // Node has no shards registered. return false } if nodeRS.ClusterID != localRS.ClusterID { + d.log.Debug("cluster id mismatch from local clusterid", logging.ENode("node", n), zap.Error(err)) return false } // Contains any for _, idx := range localRS.ShardIDs { if nodeRS.Contains(localRS.ClusterID, idx) { + d.log.Debug("shards match for discovered node", logging.ENode("node", n)) return true } } diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index 553313af..ece03539 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -130,6 +130,7 @@ func (c *PeerConnectionStrategy) consumeSubscription(s subscription) { if len(c.host.Network().Peers()) < waku_proto.GossipSubOptimalFullMeshSize { triggerImmediateConnection = true } + c.logger.Debug("adding discovered peer", logging.HostID("peer", p.AddrInfo.ID)) c.pm.AddDiscoveredPeer(p, triggerImmediateConnection) case <-time.After(1 * time.Second): diff --git a/waku/v2/peermanager/peer_discovery.go b/waku/v2/peermanager/peer_discovery.go index fd8b316b..c6439146 100644 --- a/waku/v2/peermanager/peer_discovery.go +++ b/waku/v2/peermanager/peer_discovery.go @@ -92,6 +92,8 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16, select { case <-ctx.Done(): + pm.logger.Error("failed to find peers for shard and services", zap.Uint16("cluster", cluster), + zap.Uint16("shard", shard), zap.String("service", string(wakuProtocol)), zap.Error(ctx.Err())) return nil, ctx.Err() default: } diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 1b150670..e9396149 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -173,7 +173,7 @@ func (pm *PeerManager) GroupPeersByDirection(specificPeers ...peer.ID) (inPeers outPeers = append(outPeers, p) } } else { - pm.logger.Error("Failed to retrieve peer direction", + pm.logger.Error("failed to retrieve peer direction", logging.HostID("peerID", p), zap.Error(err)) } } @@ -188,7 +188,7 @@ func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers pee if err != nil { return } - pm.logger.Debug("Number of peers connected", zap.Int("inPeers", inPeers.Len()), + pm.logger.Debug("number of peers connected", zap.Int("inPeers", inPeers.Len()), zap.Int("outPeers", outPeers.Len())) //Need to filter peers to check if they support relay @@ -211,15 +211,17 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { curPeers := topicInst.topic.ListPeers() curPeerLen := len(curPeers) if curPeerLen < waku_proto.GossipSubOptimalFullMeshSize { - pm.logger.Info("Subscribed topic is unhealthy, initiating more connections to maintain health", + pm.logger.Debug("subscribed topic is unhealthy, initiating more connections to maintain health", zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen), zap.Int("optimumPeers", waku_proto.GossipSubOptimalFullMeshSize)) //Find not connected peers. notConnectedPeers := pm.getNotConnectedPers(topicStr) if notConnectedPeers.Len() == 0 { + pm.logger.Debug("could not find any peers in peerstore to connect to, discovering more", zap.String("pubSubTopic", topicStr)) pm.discoverPeersByPubsubTopics([]string{topicStr}, relay.WakuRelayID_v200, pm.ctx, 2) continue } + pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr)) //Connect to eligible peers. numPeersToConnect := waku_proto.GossipSubOptimalFullMeshSize - curPeerLen @@ -239,7 +241,7 @@ func (pm *PeerManager) connectToRelayPeers() { pm.ensureMinRelayConnsPerTopic() inRelayPeers, outRelayPeers := pm.getRelayPeers() - pm.logger.Info("number of relay peers connected", + pm.logger.Debug("number of relay peers connected", zap.Int("in", inRelayPeers.Len()), zap.Int("out", outRelayPeers.Len())) if inRelayPeers.Len() > 0 && @@ -248,28 +250,10 @@ func (pm *PeerManager) connectToRelayPeers() { } } -// addrInfoToPeerData returns addressinfo for a peer -// If addresses are expired, it removes the peer from host peerStore and returns nil. -func addrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host) *service.PeerData { - addrs := host.Peerstore().Addrs(peerID) - if len(addrs) == 0 { - //Addresses expired, remove peer from peerStore - host.Peerstore().RemovePeer(peerID) - return nil - } - return &service.PeerData{ - Origin: origin, - AddrInfo: peer.AddrInfo{ - ID: peerID, - Addrs: addrs, - }, - } -} - // connectToPeers connects to peers provided in the list if the addresses have not expired. func (pm *PeerManager) connectToPeers(peers peer.IDSlice) { for _, peerID := range peers { - peerData := addrInfoToPeerData(wps.PeerManager, peerID, pm.host) + peerData := AddrInfoToPeerData(wps.PeerManager, peerID, pm.host) if peerData == nil { continue } @@ -306,10 +290,10 @@ func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) { p := inRelayPeers[pruningStartIndex] err := pm.host.Network().ClosePeer(p) if err != nil { - pm.logger.Warn("Failed to disconnect connection towards peer", + pm.logger.Warn("failed to disconnect connection towards peer", logging.HostID("peerID", p)) } - pm.logger.Debug("Successfully disconnected connection towards peer", + pm.logger.Debug("successfully disconnected connection towards peer", logging.HostID("peerID", p)) } } @@ -357,7 +341,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { //Check if the peer is already present, if so skip adding _, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID) if err == nil { - pm.logger.Debug("Found discovered peer already in peerStore", logging.HostID("peer", p.AddrInfo.ID)) + pm.logger.Debug("peer already in peerStore", logging.HostID("peer", p.AddrInfo.ID)) return } supportedProtos := []protocol.ID{} @@ -376,6 +360,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { } } if connectNow { + pm.logger.Debug("connecting now to discovered peer", logging.HostID("peer", p.AddrInfo.ID)) go pm.peerConnector.PushToChan(p) } } @@ -384,6 +369,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { // It also sets additional metadata such as origin, ENR and supported protocols func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error { if pm.maxPeers <= pm.host.Peerstore().Peers().Len() { + pm.logger.Error("could not add peer as peer store capacity is reached", logging.HostID("peer", ID), zap.Int("capacity", pm.maxPeers)) return errors.New("peer store capacity reached") } pm.logger.Info("adding peer to peerstore", logging.HostID("peer", ID)) @@ -403,6 +389,7 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig if len(protocols) > 0 { err = pm.host.Peerstore().AddProtocols(ID, protocols...) if err != nil { + pm.logger.Error("could not set protocols", zap.Error(err), logging.HostID("peer", ID)) return err } } @@ -492,7 +479,7 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) { //For now adding the peer to serviceSlot which means the latest added peer would be given priority. //TODO: Ideally we should sort the peers per service and return best peer based on peer score or RTT etc. - pm.logger.Info("Adding peer to service slots", logging.HostID("peer", peerID), + pm.logger.Info("adding peer to service slots", logging.HostID("peer", peerID), zap.String("service", string(proto))) // getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol pm.serviceSlots.getPeers(proto).add(peerID) diff --git a/waku/v2/peermanager/peer_selection.go b/waku/v2/peermanager/peer_selection.go index 72e1d062..bfaab97a 100644 --- a/waku/v2/peermanager/peer_selection.go +++ b/waku/v2/peermanager/peer_selection.go @@ -25,6 +25,7 @@ func (pm *PeerManager) SelectPeerByContentTopics(proto protocol.ID, contentTopic for _, cTopic := range contentTopics { pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(cTopic) if err != nil { + pm.logger.Debug("selectPeer: failed to get contentTopic from pubsubTopic", zap.String("contentTopic", cTopic)) return "", err } pubsubTopics = append(pubsubTopics, pubsubTopic) @@ -82,7 +83,7 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopics []strin if err == nil { return peerID, nil } else { - pm.logger.Debug("Discovering peers by pubsubTopic", zap.Strings("pubsubTopics", pubsubTopics)) + pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", pubsubTopics)) //Trigger on-demand discovery for this topic and connect to peer immediately. //For now discover atleast 1 peer for the criteria pm.discoverPeersByPubsubTopics(pubsubTopics, proto, ctx, 1)