mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-05-20 23:49:31 +00:00
track message delivery time within the validation pipeline
This commit is contained in:
parent
3610b05538
commit
e17e266aa9
23
score.go
23
score.go
@ -378,13 +378,32 @@ func (ps *peerScore) DeliverMessage(msg *Message) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ps *peerScore) ValidateMessage(msg *Message) {
|
||||||
|
ps.Lock()
|
||||||
|
defer ps.Unlock()
|
||||||
|
|
||||||
|
// the pubsub subsystem is beginning validation; create a record to track time in
|
||||||
|
// the validation pipeline with an accurate firstSeen time.
|
||||||
|
_ = ps.deliveries.getRecord(ps.msgID(msg.Message))
|
||||||
|
}
|
||||||
|
|
||||||
func (ps *peerScore) RejectMessage(msg *Message, reason string) {
|
func (ps *peerScore) RejectMessage(msg *Message, reason string) {
|
||||||
ps.Lock()
|
ps.Lock()
|
||||||
defer ps.Unlock()
|
defer ps.Unlock()
|
||||||
|
|
||||||
if reason == "invalid signature" {
|
// TODO: the reasons should become named strings; good enough for now.
|
||||||
|
switch reason {
|
||||||
|
// we don't track those messages, but we penalize the peer as they are clearly invalid
|
||||||
|
case "missing signature":
|
||||||
|
fallthrough
|
||||||
|
case "invalid signature":
|
||||||
ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)
|
ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)
|
||||||
// if we reject with "invalid signature" we don't track this message delivery.
|
return
|
||||||
|
|
||||||
|
// we ignore those messages, so do nothing.
|
||||||
|
case "blacklisted peer":
|
||||||
|
fallthrough
|
||||||
|
case "blacklisted source":
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
11
trace.go
11
trace.go
@ -22,6 +22,7 @@ type scoreTracer interface {
|
|||||||
Leave(topic string)
|
Leave(topic string)
|
||||||
Graft(p peer.ID, topic string)
|
Graft(p peer.ID, topic string)
|
||||||
Prune(p peer.ID, topic string)
|
Prune(p peer.ID, topic string)
|
||||||
|
ValidateMessage(msg *Message)
|
||||||
DeliverMessage(msg *Message)
|
DeliverMessage(msg *Message)
|
||||||
RejectMessage(msg *Message, reason string)
|
RejectMessage(msg *Message, reason string)
|
||||||
DuplicateMessage(msg *Message)
|
DuplicateMessage(msg *Message)
|
||||||
@ -58,6 +59,16 @@ func (t *pubsubTracer) PublishMessage(msg *Message) {
|
|||||||
t.tracer.Trace(evt)
|
t.tracer.Trace(evt)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *pubsubTracer) ValidateMessage(msg *Message) {
|
||||||
|
if t == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if t.score != nil && msg.ReceivedFrom != t.pid {
|
||||||
|
t.score.ValidateMessage(msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (t *pubsubTracer) RejectMessage(msg *Message, reason string) {
|
func (t *pubsubTracer) RejectMessage(msg *Message, reason string) {
|
||||||
if t == nil {
|
if t == nil {
|
||||||
return
|
return
|
||||||
|
|||||||
@ -206,6 +206,8 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
|
|||||||
if !v.p.markSeen(id) {
|
if !v.p.markSeen(id) {
|
||||||
v.tracer.DuplicateMessage(msg)
|
v.tracer.DuplicateMessage(msg)
|
||||||
return
|
return
|
||||||
|
} else {
|
||||||
|
v.tracer.ValidateMessage(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
var inline, async []*topicVal
|
var inline, async []*topicVal
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user