hook the tracer for peer scoring

This commit is contained in:
vyzo 2020-02-28 14:32:14 +02:00
parent c50a739fb5
commit 455a836d7d
1 changed files with 102 additions and 0 deletions

102
trace.go
View File

@ -14,9 +14,23 @@ type EventTracer interface {
Trace(evt *pb.TraceEvent)
}
// internal interface for score tracing
type scoreTracer interface {
AddPeer(p peer.ID, proto protocol.ID)
RemovePeer(p peer.ID)
Join(topic string)
Leave(topic string)
Graft(p peer.ID, topic string)
Prune(p peer.ID, topic string)
DeliverMessage(msg *Message)
RejectMessage(msg *Message, reason string)
DuplicateMessage(msg *Message)
}
// pubsub tracer details
type pubsubTracer struct {
tracer EventTracer
score scoreTracer
pid peer.ID
msgID MsgIdFunction
}
@ -26,6 +40,10 @@ func (t *pubsubTracer) PublishMessage(msg *Message) {
return
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_PUBLISH_MESSAGE.Enum(),
@ -45,6 +63,14 @@ func (t *pubsubTracer) RejectMessage(msg *Message, reason string) {
return
}
if t.score != nil && msg.ReceivedFrom != t.pid {
t.score.RejectMessage(msg, reason)
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_REJECT_MESSAGE.Enum(),
@ -65,6 +91,14 @@ func (t *pubsubTracer) DuplicateMessage(msg *Message) {
return
}
if t.score != nil && msg.ReceivedFrom != t.pid {
t.score.DuplicateMessage(msg)
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_DUPLICATE_MESSAGE.Enum(),
@ -84,6 +118,14 @@ func (t *pubsubTracer) DeliverMessage(msg *Message) {
return
}
if t.score != nil && msg.ReceivedFrom != t.pid {
t.score.DeliverMessage(msg)
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_DELIVER_MESSAGE.Enum(),
@ -102,6 +144,14 @@ func (t *pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) {
return
}
if t.score != nil {
t.score.AddPeer(p, proto)
}
if t.tracer == nil {
return
}
protoStr := string(proto)
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
@ -122,6 +172,14 @@ func (t *pubsubTracer) RemovePeer(p peer.ID) {
return
}
if t.score != nil {
t.score.RemovePeer(p)
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_REMOVE_PEER.Enum(),
@ -140,6 +198,10 @@ func (t *pubsubTracer) RecvRPC(rpc *RPC) {
return
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_RECV_RPC.Enum(),
@ -159,6 +221,10 @@ func (t *pubsubTracer) SendRPC(rpc *RPC, p peer.ID) {
return
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_SEND_RPC.Enum(),
@ -178,6 +244,10 @@ func (t *pubsubTracer) DropRPC(rpc *RPC, p peer.ID) {
return
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_DROP_RPC.Enum(),
@ -272,6 +342,14 @@ func (t *pubsubTracer) Join(topic string) {
return
}
if t.score != nil {
t.score.Join(topic)
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_JOIN.Enum(),
@ -290,6 +368,14 @@ func (t *pubsubTracer) Leave(topic string) {
return
}
if t.score != nil {
t.score.Leave(topic)
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_LEAVE.Enum(),
@ -308,6 +394,14 @@ func (t *pubsubTracer) Graft(p peer.ID, topic string) {
return
}
if t.score != nil {
t.score.Graft(p, topic)
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_GRAFT.Enum(),
@ -327,6 +421,14 @@ func (t *pubsubTracer) Prune(p peer.ID, topic string) {
return
}
if t.score != nil {
t.score.Prune(p, topic)
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_PRUNE.Enum(),