basic logging for gossipsub
This commit is contained in:
parent
0757ff4be4
commit
bb5dd40680
26
gossipsub.go
26
gossipsub.go
|
@ -75,10 +75,12 @@ func (gs *GossipSubRouter) Attach(p *PubSub) {
|
|||
}
|
||||
|
||||
func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
|
||||
log.Debugf("PEERUP: Add new peer %s using %s", p, proto)
|
||||
gs.peers[p] = proto
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
|
||||
log.Debugf("PEERDOWN: Remove disconnected peer %s", p)
|
||||
delete(gs.peers, p)
|
||||
for _, peers := range gs.mesh {
|
||||
delete(peers, p)
|
||||
|
@ -96,8 +98,8 @@ func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
|
|||
return
|
||||
}
|
||||
|
||||
iwant := gs.handleIHave(ctl)
|
||||
ihave := gs.handleIWant(ctl)
|
||||
iwant := gs.handleIHave(rpc.from, ctl)
|
||||
ihave := gs.handleIWant(rpc.from, ctl)
|
||||
prune := gs.handleGraft(rpc.from, ctl)
|
||||
gs.handlePrune(rpc.from, ctl)
|
||||
|
||||
|
@ -109,7 +111,7 @@ func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
|
|||
gs.sendRPC(rpc.from, out)
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) handleIHave(ctl *pb.ControlMessage) []*pb.ControlIWant {
|
||||
func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlIWant {
|
||||
iwant := make(map[string]struct{})
|
||||
|
||||
for _, ihave := range ctl.GetIhave() {
|
||||
|
@ -131,6 +133,8 @@ func (gs *GossipSubRouter) handleIHave(ctl *pb.ControlMessage) []*pb.ControlIWan
|
|||
return nil
|
||||
}
|
||||
|
||||
log.Debugf("IHAVE: Asking for %d messages from %s", len(iwant), p)
|
||||
|
||||
iwantlst := make([]string, 0, len(iwant))
|
||||
for mid := range iwant {
|
||||
iwantlst = append(iwantlst, mid)
|
||||
|
@ -139,7 +143,7 @@ func (gs *GossipSubRouter) handleIHave(ctl *pb.ControlMessage) []*pb.ControlIWan
|
|||
return []*pb.ControlIWant{&pb.ControlIWant{MessageIDs: iwantlst}}
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) handleIWant(ctl *pb.ControlMessage) []*pb.Message {
|
||||
func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.Message {
|
||||
ihave := make(map[string]*pb.Message)
|
||||
for _, iwant := range ctl.GetIwant() {
|
||||
for _, mid := range iwant.GetMessageIDs() {
|
||||
|
@ -154,6 +158,8 @@ func (gs *GossipSubRouter) handleIWant(ctl *pb.ControlMessage) []*pb.Message {
|
|||
return nil
|
||||
}
|
||||
|
||||
log.Debugf("IWANT: Sending %d messages to %s", len(ihave), p)
|
||||
|
||||
msgs := make([]*pb.Message, 0, len(ihave))
|
||||
for _, msg := range ihave {
|
||||
msgs = append(msgs, msg)
|
||||
|
@ -170,6 +176,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
|
|||
if !ok {
|
||||
prune = append(prune, topic)
|
||||
} else {
|
||||
log.Debugf("GRAFT: Add mesh link from %s in %s", p, topic)
|
||||
peers[p] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
@ -191,6 +198,7 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
|
|||
topic := prune.GetTopicID()
|
||||
peers, ok := gs.mesh[topic]
|
||||
if ok {
|
||||
log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic)
|
||||
delete(peers, p)
|
||||
}
|
||||
}
|
||||
|
@ -252,6 +260,8 @@ func (gs *GossipSubRouter) Join(topic string) {
|
|||
return
|
||||
}
|
||||
|
||||
log.Debugf("JOIN %s", topic)
|
||||
|
||||
gmap, ok = gs.fanout[topic]
|
||||
if ok {
|
||||
gs.mesh[topic] = gmap
|
||||
|
@ -264,6 +274,7 @@ func (gs *GossipSubRouter) Join(topic string) {
|
|||
}
|
||||
|
||||
for p := range gmap {
|
||||
log.Debugf("JOIN: Add mesh link to %s in %s", p, topic)
|
||||
gs.sendGraft(p, topic)
|
||||
}
|
||||
}
|
||||
|
@ -274,9 +285,12 @@ func (gs *GossipSubRouter) Leave(topic string) {
|
|||
return
|
||||
}
|
||||
|
||||
log.Debugf("LEAVE %s", topic)
|
||||
|
||||
delete(gs.mesh, topic)
|
||||
|
||||
for p := range gmap {
|
||||
log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
|
||||
gs.sendPrune(p, topic)
|
||||
}
|
||||
}
|
||||
|
@ -344,6 +358,8 @@ func (gs *GossipSubRouter) heartbeatTimer() {
|
|||
}
|
||||
|
||||
func (gs *GossipSubRouter) heartbeat() {
|
||||
defer log.EventBegin(gs.p.ctx, "heartbeat").Done()
|
||||
|
||||
// flush pending control message from retries and gossip
|
||||
// that hasn't been piggybacked since the last heartbeat
|
||||
gs.flush()
|
||||
|
@ -364,6 +380,7 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||
})
|
||||
|
||||
for _, p := range plst {
|
||||
log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic)
|
||||
peers[p] = struct{}{}
|
||||
topics := tograft[p]
|
||||
tograft[p] = append(topics, topic)
|
||||
|
@ -377,6 +394,7 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||
shufflePeers(plst)
|
||||
|
||||
for _, p := range plst[:idontneed] {
|
||||
log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic)
|
||||
delete(peers, p)
|
||||
topics := toprune[p]
|
||||
toprune[p] = append(topics, topic)
|
||||
|
|
Loading…
Reference in New Issue