package pubsub import ( "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" pb "github.com/libp2p/go-libp2p-pubsub/pb" ) // EventTracer is a generic event tracer interface. // This is a high level tracing interface which delivers tracing events, as defined by the protobuf // schema in pb/trace.proto. type EventTracer interface { Trace(evt *pb.TraceEvent) } // RawTracer is a low level tracing interface that allows an application to trace the internal // operation of the pubsub subsystem. // // Note that the tracers are invoked synchronously, which means that application tracers must // take care to not block or modify arguments. // // Warning: this interface is not fixed, we may be adding new methods as necessitated by the system // in the future. type RawTracer interface { // AddPeer is invoked when a new peer is added. AddPeer(p peer.ID, proto protocol.ID) // RemovePeer is invoked when a peer is removed. RemovePeer(p peer.ID) // Join is invoked when a new topic is joined Join(topic string) // Leave is invoked when a topic is abandoned Leave(topic string) // Graft is invoked when a new peer is grafted on the mesh (gossipsub) Graft(p peer.ID, topic string) // Prune is invoked when a peer is pruned from the message (gossipsub) Prune(p peer.ID, topic string) // ValidateMessage is invoked when a message first enters the validation pipeline. ValidateMessage(msg *Message) // DeliverMessage is invoked when a message is delivered DeliverMessage(msg *Message) // RejectMessage is invoked when a message is Rejected or Ignored. // The reason argument can be one of the named strings Reject*. RejectMessage(msg *Message, reason string) // DuplicateMessage is invoked when a duplicate message is dropped. DuplicateMessage(msg *Message) // ThrottlePeer is invoked when a peer is throttled by the peer gater. ThrottlePeer(p peer.ID) // RecvRPC is invoked when an incoming RPC is received. RecvRPC(rpc *RPC) // SendRPC is invoked when a RPC is sent. SendRPC(rpc *RPC, p peer.ID) // DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full. DropRPC(rpc *RPC, p peer.ID) // UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and // the pressure release mechanism trigger, dropping messages. UndeliverableMessage(msg *Message) } // pubsub tracer details type pubsubTracer struct { tracer EventTracer raw []RawTracer pid peer.ID idGen *msgIDGenerator } func (t *pubsubTracer) PublishMessage(msg *Message) { if t == nil { return } if t.tracer == nil { return } now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_PUBLISH_MESSAGE.Enum(), PeerID: []byte(t.pid), Timestamp: &now, PublishMessage: &pb.TraceEvent_PublishMessage{ MessageID: []byte(t.idGen.ID(msg)), Topic: msg.Message.Topic, }, } t.tracer.Trace(evt) } func (t *pubsubTracer) ValidateMessage(msg *Message) { if t == nil { return } if msg.ReceivedFrom != t.pid { for _, tr := range t.raw { tr.ValidateMessage(msg) } } } func (t *pubsubTracer) RejectMessage(msg *Message, reason string) { if t == nil { return } if msg.ReceivedFrom != t.pid { for _, tr := range t.raw { tr.RejectMessage(msg, reason) } } if t.tracer == nil { return } now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_REJECT_MESSAGE.Enum(), PeerID: []byte(t.pid), Timestamp: &now, RejectMessage: &pb.TraceEvent_RejectMessage{ MessageID: []byte(t.idGen.ID(msg)), ReceivedFrom: []byte(msg.ReceivedFrom), Reason: &reason, Topic: msg.Topic, }, } t.tracer.Trace(evt) } func (t *pubsubTracer) DuplicateMessage(msg *Message) { if t == nil { return } if msg.ReceivedFrom != t.pid { for _, tr := range t.raw { tr.DuplicateMessage(msg) } } if t.tracer == nil { return } now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_DUPLICATE_MESSAGE.Enum(), PeerID: []byte(t.pid), Timestamp: &now, DuplicateMessage: &pb.TraceEvent_DuplicateMessage{ MessageID: []byte(t.idGen.ID(msg)), ReceivedFrom: []byte(msg.ReceivedFrom), Topic: msg.Topic, }, } t.tracer.Trace(evt) } func (t *pubsubTracer) DeliverMessage(msg *Message) { if t == nil { return } if msg.ReceivedFrom != t.pid { for _, tr := range t.raw { tr.DeliverMessage(msg) } } if t.tracer == nil { return } now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_DELIVER_MESSAGE.Enum(), PeerID: []byte(t.pid), Timestamp: &now, DeliverMessage: &pb.TraceEvent_DeliverMessage{ MessageID: []byte(t.idGen.ID(msg)), Topic: msg.Topic, ReceivedFrom: []byte(msg.ReceivedFrom), }, } t.tracer.Trace(evt) } func (t *pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) { if t == nil { return } for _, tr := range t.raw { tr.AddPeer(p, proto) } if t.tracer == nil { return } protoStr := string(proto) now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_ADD_PEER.Enum(), PeerID: []byte(t.pid), Timestamp: &now, AddPeer: &pb.TraceEvent_AddPeer{ PeerID: []byte(p), Proto: &protoStr, }, } t.tracer.Trace(evt) } func (t *pubsubTracer) RemovePeer(p peer.ID) { if t == nil { return } for _, tr := range t.raw { tr.RemovePeer(p) } if t.tracer == nil { return } now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_REMOVE_PEER.Enum(), PeerID: []byte(t.pid), Timestamp: &now, RemovePeer: &pb.TraceEvent_RemovePeer{ PeerID: []byte(p), }, } t.tracer.Trace(evt) } func (t *pubsubTracer) RecvRPC(rpc *RPC) { if t == nil { return } for _, tr := range t.raw { tr.RecvRPC(rpc) } if t.tracer == nil { return } now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_RECV_RPC.Enum(), PeerID: []byte(t.pid), Timestamp: &now, RecvRPC: &pb.TraceEvent_RecvRPC{ ReceivedFrom: []byte(rpc.from), Meta: t.traceRPCMeta(rpc), }, } t.tracer.Trace(evt) } func (t *pubsubTracer) SendRPC(rpc *RPC, p peer.ID) { if t == nil { return } for _, tr := range t.raw { tr.SendRPC(rpc, p) } if t.tracer == nil { return } now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_SEND_RPC.Enum(), PeerID: []byte(t.pid), Timestamp: &now, SendRPC: &pb.TraceEvent_SendRPC{ SendTo: []byte(p), Meta: t.traceRPCMeta(rpc), }, } t.tracer.Trace(evt) } func (t *pubsubTracer) DropRPC(rpc *RPC, p peer.ID) { if t == nil { return } for _, tr := range t.raw { tr.DropRPC(rpc, p) } if t.tracer == nil { return } now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_DROP_RPC.Enum(), PeerID: []byte(t.pid), Timestamp: &now, DropRPC: &pb.TraceEvent_DropRPC{ SendTo: []byte(p), Meta: t.traceRPCMeta(rpc), }, } t.tracer.Trace(evt) } func (t *pubsubTracer) UndeliverableMessage(msg *Message) { if t == nil { return } for _, tr := range t.raw { tr.UndeliverableMessage(msg) } } func (t *pubsubTracer) traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta { rpcMeta := new(pb.TraceEvent_RPCMeta) var msgs []*pb.TraceEvent_MessageMeta for _, m := range rpc.Publish { msgs = append(msgs, &pb.TraceEvent_MessageMeta{ MessageID: []byte(t.idGen.RawID(m)), Topic: m.Topic, }) } rpcMeta.Messages = msgs var subs []*pb.TraceEvent_SubMeta for _, sub := range rpc.Subscriptions { subs = append(subs, &pb.TraceEvent_SubMeta{ Subscribe: sub.Subscribe, Topic: sub.Topicid, }) } rpcMeta.Subscription = subs if rpc.Control != nil { var ihave []*pb.TraceEvent_ControlIHaveMeta for _, ctl := range rpc.Control.Ihave { var mids [][]byte for _, mid := range ctl.MessageIDs { mids = append(mids, []byte(mid)) } ihave = append(ihave, &pb.TraceEvent_ControlIHaveMeta{ Topic: ctl.TopicID, MessageIDs: mids, }) } var iwant []*pb.TraceEvent_ControlIWantMeta for _, ctl := range rpc.Control.Iwant { var mids [][]byte for _, mid := range ctl.MessageIDs { mids = append(mids, []byte(mid)) } iwant = append(iwant, &pb.TraceEvent_ControlIWantMeta{ MessageIDs: mids, }) } var graft []*pb.TraceEvent_ControlGraftMeta for _, ctl := range rpc.Control.Graft { graft = append(graft, &pb.TraceEvent_ControlGraftMeta{ Topic: ctl.TopicID, }) } var prune []*pb.TraceEvent_ControlPruneMeta for _, ctl := range rpc.Control.Prune { peers := make([][]byte, 0, len(ctl.Peers)) for _, pi := range ctl.Peers { peers = append(peers, pi.PeerID) } prune = append(prune, &pb.TraceEvent_ControlPruneMeta{ Topic: ctl.TopicID, Peers: peers, }) } rpcMeta.Control = &pb.TraceEvent_ControlMeta{ Ihave: ihave, Iwant: iwant, Graft: graft, Prune: prune, } } return rpcMeta } func (t *pubsubTracer) Join(topic string) { if t == nil { return } for _, tr := range t.raw { tr.Join(topic) } if t.tracer == nil { return } now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_JOIN.Enum(), PeerID: []byte(t.pid), Timestamp: &now, Join: &pb.TraceEvent_Join{ Topic: &topic, }, } t.tracer.Trace(evt) } func (t *pubsubTracer) Leave(topic string) { if t == nil { return } for _, tr := range t.raw { tr.Leave(topic) } if t.tracer == nil { return } now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_LEAVE.Enum(), PeerID: []byte(t.pid), Timestamp: &now, Leave: &pb.TraceEvent_Leave{ Topic: &topic, }, } t.tracer.Trace(evt) } func (t *pubsubTracer) Graft(p peer.ID, topic string) { if t == nil { return } for _, tr := range t.raw { tr.Graft(p, topic) } if t.tracer == nil { return } now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_GRAFT.Enum(), PeerID: []byte(t.pid), Timestamp: &now, Graft: &pb.TraceEvent_Graft{ PeerID: []byte(p), Topic: &topic, }, } t.tracer.Trace(evt) } func (t *pubsubTracer) Prune(p peer.ID, topic string) { if t == nil { return } for _, tr := range t.raw { tr.Prune(p, topic) } if t.tracer == nil { return } now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_PRUNE.Enum(), PeerID: []byte(t.pid), Timestamp: &now, Prune: &pb.TraceEvent_Prune{ PeerID: []byte(p), Topic: &topic, }, } t.tracer.Trace(evt) } func (t *pubsubTracer) ThrottlePeer(p peer.ID) { if t == nil { return } for _, tr := range t.raw { tr.ThrottlePeer(p) } }