diff --git a/trace.go b/trace.go index 530c614..1924b4e 100644 --- a/trace.go +++ b/trace.go @@ -14,9 +14,23 @@ type EventTracer interface { Trace(evt *pb.TraceEvent) } +// internal interface for score tracing +type scoreTracer interface { + AddPeer(p peer.ID, proto protocol.ID) + RemovePeer(p peer.ID) + Join(topic string) + Leave(topic string) + Graft(p peer.ID, topic string) + Prune(p peer.ID, topic string) + DeliverMessage(msg *Message) + RejectMessage(msg *Message, reason string) + DuplicateMessage(msg *Message) +} + // pubsub tracer details type pubsubTracer struct { tracer EventTracer + score scoreTracer pid peer.ID msgID MsgIdFunction } @@ -26,6 +40,10 @@ func (t *pubsubTracer) PublishMessage(msg *Message) { return } + if t.tracer == nil { + return + } + now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_PUBLISH_MESSAGE.Enum(), @@ -45,6 +63,14 @@ func (t *pubsubTracer) RejectMessage(msg *Message, reason string) { return } + if t.score != nil && msg.ReceivedFrom != t.pid { + t.score.RejectMessage(msg, reason) + } + + if t.tracer == nil { + return + } + now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_REJECT_MESSAGE.Enum(), @@ -65,6 +91,14 @@ func (t *pubsubTracer) DuplicateMessage(msg *Message) { return } + if t.score != nil && msg.ReceivedFrom != t.pid { + t.score.DuplicateMessage(msg) + } + + if t.tracer == nil { + return + } + now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_DUPLICATE_MESSAGE.Enum(), @@ -84,6 +118,14 @@ func (t *pubsubTracer) DeliverMessage(msg *Message) { return } + if t.score != nil && msg.ReceivedFrom != t.pid { + t.score.DeliverMessage(msg) + } + + if t.tracer == nil { + return + } + now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_DELIVER_MESSAGE.Enum(), @@ -102,6 +144,14 @@ func (t *pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) { return } + if t.score != nil { + t.score.AddPeer(p, proto) + } + + if t.tracer == nil { + return + } + protoStr := string(proto) now := time.Now().UnixNano() evt := &pb.TraceEvent{ @@ -122,6 +172,14 @@ func (t *pubsubTracer) RemovePeer(p peer.ID) { return } + if t.score != nil { + t.score.RemovePeer(p) + } + + if t.tracer == nil { + return + } + now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_REMOVE_PEER.Enum(), @@ -140,6 +198,10 @@ func (t *pubsubTracer) RecvRPC(rpc *RPC) { return } + if t.tracer == nil { + return + } + now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_RECV_RPC.Enum(), @@ -159,6 +221,10 @@ func (t *pubsubTracer) SendRPC(rpc *RPC, p peer.ID) { return } + if t.tracer == nil { + return + } + now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_SEND_RPC.Enum(), @@ -178,6 +244,10 @@ func (t *pubsubTracer) DropRPC(rpc *RPC, p peer.ID) { return } + if t.tracer == nil { + return + } + now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_DROP_RPC.Enum(), @@ -272,6 +342,14 @@ func (t *pubsubTracer) Join(topic string) { return } + if t.score != nil { + t.score.Join(topic) + } + + if t.tracer == nil { + return + } + now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_JOIN.Enum(), @@ -290,6 +368,14 @@ func (t *pubsubTracer) Leave(topic string) { return } + if t.score != nil { + t.score.Leave(topic) + } + + if t.tracer == nil { + return + } + now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_LEAVE.Enum(), @@ -308,6 +394,14 @@ func (t *pubsubTracer) Graft(p peer.ID, topic string) { return } + if t.score != nil { + t.score.Graft(p, topic) + } + + if t.tracer == nil { + return + } + now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_GRAFT.Enum(), @@ -327,6 +421,14 @@ func (t *pubsubTracer) Prune(p peer.ID, topic string) { return } + if t.score != nil { + t.score.Prune(p, topic) + } + + if t.tracer == nil { + return + } + now := time.Now().UnixNano() evt := &pb.TraceEvent{ Type: pb.TraceEvent_PRUNE.Enum(),