tracing scaffolding

This commit is contained in:
vyzo 2019-11-04 19:22:14 +02:00
parent 28a87b3121
commit 67275a6382
4 changed files with 133 additions and 0 deletions

View File

@ -65,6 +65,7 @@ type GossipSubRouter struct {
gossip map[peer.ID][]*pb.ControlIHave // pending gossip
control map[peer.ID]*pb.ControlMessage // pending control messages
mcache *MessageCache
tracer *pubsubTracer
}
func (gs *GossipSubRouter) Protocols() []protocol.ID {
@ -73,16 +74,21 @@ func (gs *GossipSubRouter) Protocols() []protocol.ID {
func (gs *GossipSubRouter) Attach(p *PubSub) {
gs.p = p
if p.tracer != nil {
gs.tracer = p.tracer
}
go gs.heartbeatTimer()
}
func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
log.Debugf("PEERUP: Add new peer %s using %s", p, proto)
gs.tracer.AddPeer(p, proto)
gs.peers[p] = proto
}
func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
log.Debugf("PEERDOWN: Remove disconnected peer %s", p)
gs.tracer.RemovePeer(p)
delete(gs.peers, p)
for _, peers := range gs.mesh {
delete(peers, p)
@ -208,6 +214,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
prune = append(prune, topic)
} else {
log.Debugf("GRAFT: Add mesh link from %s in %s", p, topic)
gs.tracer.Graft(p, topic)
peers[p] = struct{}{}
gs.tagPeer(p, topic)
}
@ -231,6 +238,7 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
peers, ok := gs.mesh[topic]
if ok {
log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.untagPeer(p, topic)
}
@ -294,6 +302,7 @@ func (gs *GossipSubRouter) Join(topic string) {
}
log.Debugf("JOIN %s", topic)
gs.tracer.Join(topic)
gmap, ok = gs.fanout[topic]
if ok {
@ -319,6 +328,7 @@ func (gs *GossipSubRouter) Join(topic string) {
for p := range gmap {
log.Debugf("JOIN: Add mesh link to %s in %s", p, topic)
gs.tracer.Graft(p, topic)
gs.sendGraft(p, topic)
gs.tagPeer(p, topic)
}
@ -331,11 +341,13 @@ func (gs *GossipSubRouter) Leave(topic string) {
}
log.Debugf("LEAVE %s", topic)
gs.tracer.Leave(topic)
delete(gs.mesh, topic)
for p := range gmap {
log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
gs.sendPrune(p, topic)
gs.untagPeer(p, topic)
}
@ -384,8 +396,10 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
select {
case mch <- out:
gs.tracer.SendRPC(out, p)
default:
log.Infof("dropping message to peer %s: queue full", p)
gs.tracer.DropRPC(out, p)
// push control messages that need to be retried
ctl := out.GetControl()
if ctl != nil {
@ -443,6 +457,7 @@ func (gs *GossipSubRouter) heartbeat() {
for _, p := range plst {
log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic)
gs.tracer.Graft(p, topic)
peers[p] = struct{}{}
gs.tagPeer(p, topic)
topics := tograft[p]
@ -458,6 +473,7 @@ func (gs *GossipSubRouter) heartbeat() {
for _, p := range plst[:idontneed] {
log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.untagPeer(p, topic)
topics := toprune[p]

View File

@ -46,6 +46,8 @@ type PubSub struct {
disc *discover
tracer *pubsubTracer
// size of the outbound message channel that we maintain for each peer
peerOutboundQueueSize int
@ -321,6 +323,14 @@ func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option {
}
}
// WithEventTracer provides a tracer for the pubsub system
func WithEventTracer(tracer EventTracer) Option {
return func(p *PubSub) error {
p.tracer = &pubsubTracer{tracer: tracer}
return nil
}
}
// processLoop handles all inputs arriving on the channels
func (p *PubSub) processLoop(ctx context.Context) {
defer func() {
@ -671,6 +681,8 @@ func (p *PubSub) notifyLeave(topic string, pid peer.ID) {
}
func (p *PubSub) handleIncomingRPC(rpc *RPC) {
p.tracer.RecvRPC(rpc)
for _, subopt := range rpc.GetSubscriptions() {
t := subopt.GetTopicid()
if subopt.GetSubscribe() {
@ -724,24 +736,28 @@ func (p *PubSub) pushMsg(msg *Message) {
// reject messages from blacklisted peers
if p.blacklist.Contains(src) {
log.Warningf("dropping message from blacklisted peer %s", src)
p.tracer.RejectMessage(msg, "blacklisted peer")
return
}
// even if they are forwarded by good peers
if p.blacklist.Contains(msg.GetFrom()) {
log.Warningf("dropping message from blacklisted source %s", src)
p.tracer.RejectMessage(msg, "blacklisted source")
return
}
// reject unsigned messages when strict before we even process the id
if p.signStrict && msg.Signature == nil {
log.Debugf("dropping unsigned message from %s", src)
p.tracer.RejectMessage(msg, "missing signature")
return
}
// have we already seen and validated this message?
id := msgID(msg.Message)
if p.seenMessage(id) {
p.tracer.DuplicateMessage(msg)
return
}
@ -755,6 +771,7 @@ func (p *PubSub) pushMsg(msg *Message) {
}
func (p *PubSub) publishMessage(msg *Message) {
p.tracer.DeliverMessage(msg)
p.notifySubs(msg)
p.rt.Publish(msg.ReceivedFrom, msg.Message)
}

87
trace.go Normal file
View File

@ -0,0 +1,87 @@
package pubsub
import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
// Generic event tracer interface
type EventTracer interface {
Trace(evt interface{})
}
type pubsubTracer struct {
tracer EventTracer
}
func (t *pubsubTracer) RejectMessage(msg *Message, reason string) {
if t != nil {
// TODO
}
}
func (t *pubsubTracer) DuplicateMessage(msg *Message) {
if t != nil {
// TODO
}
}
func (t *pubsubTracer) DeliverMessage(msg *Message) {
if t != nil {
// TODO
}
}
func (t *pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) {
if t != nil {
// TODO
}
}
func (t *pubsubTracer) RemovePeer(p peer.ID) {
if t != nil {
// TODO
}
}
func (t *pubsubTracer) RecvRPC(rpc *RPC) {
if t != nil {
// TODO
}
}
func (t *pubsubTracer) SendRPC(rpc *RPC, p peer.ID) {
if t != nil {
// TODO
}
}
func (t *pubsubTracer) DropRPC(rpc *RPC, p peer.ID) {
if t != nil {
// TODO
}
}
func (t *pubsubTracer) Join(topic string) {
if t != nil {
// TODO
}
}
func (t *pubsubTracer) Leave(topic string) {
if t != nil {
// TODO
}
}
func (t *pubsubTracer) Graft(p peer.ID, topic string) {
if t != nil {
// TODO
}
}
func (t *pubsubTracer) Prune(p peer.ID, topic string) {
if t != nil {
// TODO
}
}

View File

@ -31,6 +31,8 @@ type ValidatorOpt func(addVal *addValReq) error
type validation struct {
p *PubSub
tracer *pubsubTracer
// topicVals tracks per topic validators
topicVals map[string]*topicVal
@ -90,6 +92,9 @@ func newValidation() *validation {
// workers
func (v *validation) Start(p *PubSub) {
v.p = p
if p.tracer != nil {
v.tracer = p.tracer
}
for i := 0; i < v.validateWorkers; i++ {
go v.validateWorker()
}
@ -148,6 +153,7 @@ func (v *validation) Push(src peer.ID, msg *Message) bool {
case v.validateQ <- &validateReq{vals, src, msg}:
default:
log.Warningf("message validation throttled; dropping message from %s", src)
v.tracer.RejectMessage(msg, "validation throttled")
}
return false
}
@ -190,6 +196,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
if msg.Signature != nil {
if !v.validateSignature(msg) {
log.Warningf("message signature validation failed; dropping message from %s", src)
v.tracer.RejectMessage(msg, "invalid signature")
return
}
}
@ -198,6 +205,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
// and avoid invoking user validators more than once
id := msgID(msg.Message)
if !v.p.markSeen(id) {
v.tracer.DuplicateMessage(msg)
return
}
@ -214,6 +222,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
for _, val := range inline {
if !val.validateMsg(v.p.ctx, src, msg) {
log.Debugf("message validation failed; dropping message from %s", src)
v.tracer.RejectMessage(msg, "validation failed")
return
}
}
@ -228,6 +237,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
}()
default:
log.Warningf("message validation throttled; dropping message from %s", src)
v.tracer.RejectMessage(msg, "validation throttled")
}
return
}
@ -249,6 +259,7 @@ func (v *validation) validateSignature(msg *Message) bool {
func (v *validation) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message) {
if !v.validateTopic(vals, src, msg) {
log.Warningf("message validation failed; dropping message from %s", src)
v.tracer.RejectMessage(msg, "validation failed")
return
}
@ -286,6 +297,7 @@ loop:
}
if throttle {
v.tracer.RejectMessage(msg, "validation throttled")
return false
}
@ -310,6 +322,7 @@ func (v *validation) validateSingleTopic(val *topicVal, src peer.ID, msg *Messag
default:
log.Debugf("validation throttled for topic %s", val.topic)
v.tracer.RejectMessage(msg, "validation throttled")
return false
}
}