diff --git a/trace.go b/trace.go index 99b4fa5..c04c734 100644 --- a/trace.go +++ b/trace.go @@ -1,8 +1,12 @@ package pubsub import ( + "time" + "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" + + pb "github.com/libp2p/go-libp2p-pubsub/pb" ) // Generic event tracer interface @@ -16,45 +20,135 @@ type pubsubTracer struct { } func (t *pubsubTracer) PublishMessage(msg *Message) { - if t != nil { - // TODO + if t == nil { + return } + + now := time.Now().UnixNano() + evt := &pb.TraceEvent{ + Type: pb.TraceEvent_PUBLISH_MESSAGE.Enum(), + PeerID: []byte(t.pid), + Timestamp: &now, + PublishMessage: &pb.TraceEvent_PublishMessage{ + MessageID: []byte(msgID(msg.Message)), + }, + } + + t.tracer.Trace(evt) } func (t *pubsubTracer) RejectMessage(msg *Message, reason string) { - if t != nil { - // TODO + if t == nil { + return } + + now := time.Now().UnixNano() + evt := &pb.TraceEvent{ + Type: pb.TraceEvent_REJECT_MESSAGE.Enum(), + PeerID: []byte(t.pid), + Timestamp: &now, + RejectMessage: &pb.TraceEvent_RejectMessage{ + MessageID: []byte(msgID(msg.Message)), + ReceivedFrom: []byte(msg.ReceivedFrom), + Reason: &reason, + }, + } + + t.tracer.Trace(evt) } func (t *pubsubTracer) DuplicateMessage(msg *Message) { - if t != nil { - // TODO + if t == nil { + return } + + now := time.Now().UnixNano() + evt := &pb.TraceEvent{ + Type: pb.TraceEvent_DUPLICATE_MESSAGE.Enum(), + PeerID: []byte(t.pid), + Timestamp: &now, + DuplicateMessage: &pb.TraceEvent_DuplicateMessage{ + MessageID: []byte(msgID(msg.Message)), + ReceivedFrom: []byte(msg.ReceivedFrom), + }, + } + + t.tracer.Trace(evt) } func (t *pubsubTracer) DeliverMessage(msg *Message) { - if t != nil { - // TODO + if t == nil { + return } + + now := time.Now().UnixNano() + evt := &pb.TraceEvent{ + Type: pb.TraceEvent_DELIVER_MESSAGE.Enum(), + PeerID: []byte(t.pid), + Timestamp: &now, + DeliverMessage: &pb.TraceEvent_DeliverMessage{ + MessageID: []byte(msgID(msg.Message)), + }, + } + + t.tracer.Trace(evt) } func (t *pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) { - if t != nil { - // TODO + if t == nil { + return } + + protoStr := string(proto) + now := time.Now().UnixNano() + evt := &pb.TraceEvent{ + Type: pb.TraceEvent_ADD_PEER.Enum(), + PeerID: []byte(t.pid), + Timestamp: &now, + AddPeer: &pb.TraceEvent_AddPeer{ + PeerID: []byte(p), + Proto: &protoStr, + }, + } + + t.tracer.Trace(evt) } func (t *pubsubTracer) RemovePeer(p peer.ID) { - if t != nil { - // TODO + if t == nil { + return } + + now := time.Now().UnixNano() + evt := &pb.TraceEvent{ + Type: pb.TraceEvent_REMOVE_PEER.Enum(), + PeerID: []byte(t.pid), + Timestamp: &now, + RemovePeer: &pb.TraceEvent_RemovePeer{ + PeerID: []byte(p), + }, + } + + t.tracer.Trace(evt) } func (t *pubsubTracer) RecvRPC(rpc *RPC) { - if t != nil { - // TODO + if t == nil { + return } + + now := time.Now().UnixNano() + evt := &pb.TraceEvent{ + Type: pb.TraceEvent_RECV_RPC.Enum(), + PeerID: []byte(t.pid), + Timestamp: &now, + RecvRPC: &pb.TraceEvent_RecvRPC{ + ReceivedFrom: []byte(rpc.from), + Meta: traceRPCMeta(rpc), + }, + } + + t.tracer.Trace(evt) } func (t *pubsubTracer) SendRPC(rpc *RPC, p peer.ID) { @@ -69,26 +163,143 @@ func (t *pubsubTracer) DropRPC(rpc *RPC, p peer.ID) { } } -func (t *pubsubTracer) Join(topic string) { - if t != nil { - // TODO +func traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta { + rpcMeta := new(pb.TraceEvent_RPCMeta) + + var mids [][]byte + for _, m := range rpc.Publish { + mids = append(mids, []byte(msgID(m))) } + rpcMeta.MessageIDs = mids + + var subs []*pb.TraceEvent_SubMeta + for _, sub := range rpc.Subscriptions { + subs = append(subs, &pb.TraceEvent_SubMeta{ + Subscribe: sub.Subscribe, + Topic: sub.Topicid, + }) + } + rpcMeta.Subscription = subs + + if rpc.Control != nil { + var ihave []*pb.TraceEvent_ControlIHaveMeta + for _, ctl := range rpc.Control.Ihave { + var mids [][]byte + for _, mid := range ctl.MessageIDs { + mids = append(mids, []byte(mid)) + } + ihave = append(ihave, &pb.TraceEvent_ControlIHaveMeta{ + Topic: ctl.TopicID, + MessageIDs: mids, + }) + } + + var iwant []*pb.TraceEvent_ControlIWantMeta + for _, ctl := range rpc.Control.Iwant { + var mids [][]byte + for _, mid := range ctl.MessageIDs { + mids = append(mids, []byte(mid)) + } + iwant = append(iwant, &pb.TraceEvent_ControlIWantMeta{ + MessageIDs: mids, + }) + } + + var graft []*pb.TraceEvent_ControlGraftMeta + for _, ctl := range rpc.Control.Graft { + graft = append(graft, &pb.TraceEvent_ControlGraftMeta{ + Topic: ctl.TopicID, + }) + } + + var prune []*pb.TraceEvent_ControlPruneMeta + for _, ctl := range rpc.Control.Prune { + prune = append(prune, &pb.TraceEvent_ControlPruneMeta{ + Topic: ctl.TopicID, + }) + } + + rpcMeta.Control = &pb.TraceEvent_ControlMeta{ + Ihave: ihave, + Iwant: iwant, + Graft: graft, + Prune: prune, + } + } + + return rpcMeta +} + +func (t *pubsubTracer) Join(topic string) { + if t == nil { + return + } + + now := time.Now().UnixNano() + evt := &pb.TraceEvent{ + Type: pb.TraceEvent_JOIN.Enum(), + PeerID: []byte(t.pid), + Timestamp: &now, + Join: &pb.TraceEvent_Join{ + Topic: &topic, + }, + } + + t.tracer.Trace(evt) } func (t *pubsubTracer) Leave(topic string) { - if t != nil { - // TODO + if t == nil { + return } + + now := time.Now().UnixNano() + evt := &pb.TraceEvent{ + Type: pb.TraceEvent_LEAVE.Enum(), + PeerID: []byte(t.pid), + Timestamp: &now, + Leave: &pb.TraceEvent_Leave{ + Topic: &topic, + }, + } + + t.tracer.Trace(evt) } func (t *pubsubTracer) Graft(p peer.ID, topic string) { - if t != nil { - // TODO + if t == nil { + return } + + now := time.Now().UnixNano() + evt := &pb.TraceEvent{ + Type: pb.TraceEvent_GRAFT.Enum(), + PeerID: []byte(t.pid), + Timestamp: &now, + Graft: &pb.TraceEvent_Graft{ + PeerID: []byte(p), + Topic: &topic, + }, + } + + t.tracer.Trace(evt) } func (t *pubsubTracer) Prune(p peer.ID, topic string) { - if t != nil { - // TODO + if t == nil { + return } + + now := time.Now().UnixNano() + evt := &pb.TraceEvent{ + Type: pb.TraceEvent_PRUNE.Enum(), + PeerID: []byte(t.pid), + Timestamp: &now, + Prune: &pb.TraceEvent_Prune{ + PeerID: []byte(p), + Topic: &topic, + }, + } + + t.tracer.Trace(evt) }