message delivery tracking

This commit is contained in:
vyzo 2020-03-08 16:48:45 +02:00
parent dd3ce7760c
commit 16eeacd798
1 changed files with 200 additions and 16 deletions

216
score.go
View File

@ -58,7 +58,7 @@ type TopicScoreParams struct {
FirstMessageDeliveriesCap float64
// P3: mesh message deliveries
// This is the number of message deliveries in the mesh, within the MeshMessageDeliveriesLatency of
// This is the number of message deliveries in the mesh, within the MeshMessageDeliveriesWindow of
// the first message delivery.
// The parameter has an associated counter, decaying with MessageMessageDeliveriesDecay.
// If the counter exceeds the threshold, its value is 0.
@ -66,9 +66,9 @@ type TopicScoreParams struct {
// the deficit, ie (MessageDeliveriesThreshold - counter)^2
// The penalty is only activated after MeshMessageDeliveriesActivation time in the mesh.
// The weight of the parameter MUST be negative.
MeshMessageDeliveriesWeight, MeshMessageDeliveriesDecay float64
MeshMessageDeliveriesThreshold float64
MeshMessageDeliveriesLatency, MeshMessageDeliveriesActivation time.Duration
MeshMessageDeliveriesWeight, MeshMessageDeliveriesDecay float64
MeshMessageDeliveriesCap, MeshMessageDeliveriesThreshold float64
MeshMessageDeliveriesWindow, MeshMessageDeliveriesActivation time.Duration
// P4: invalid messages
// This is the number of invalid messages in the topic.
@ -126,15 +126,39 @@ type peerScore struct {
// IP colocation tracking
peerIPs map[string]map[peer.ID]struct{}
// message delivery tracking
deliveries *messageDeliveries
msgID MsgIdFunction
}
type messageDeliveries struct {
// TODO
}
type deliveryRecord struct {
firstSeen time.Time
status int
peers map[peer.ID]time.Time
}
// delivery record status
const (
delivery_unknown = iota // we don't know (yet) if the message is valid
delivery_valid
delivery_invalid
delivery_throttled
)
func newPeerScore(params *PeerScoreParams) *peerScore {
// TODO
return nil
}
// router interface
func (ps *peerScore) Start(gs *GossipSubRouter) {
// TODO
}
func (ps *peerScore) Score(p peer.ID) float64 {
@ -266,41 +290,201 @@ func (ps *peerScore) refreshScores() {
func (ps *peerScore) resfreshIPs() {
// peer IPs may change, so we periodically refresh them
// TODO
}
// tracer interface
func (ps *peerScore) AddPeer(p peer.ID, proto protocol.ID) {
// TODO
}
func (ps *peerScore) RemovePeer(p peer.ID) {
// TODO
}
func (ps *peerScore) Join(topic string) {
}
func (ps *peerScore) Leave(topic string) {
}
func (ps *peerScore) Join(topic string) {}
func (ps *peerScore) Leave(topic string) {}
func (ps *peerScore) Graft(p peer.ID, topic string) {
// TODO
}
func (ps *peerScore) Prune(p peer.ID, topic string) {
// TODO
}
func (ps *peerScore) DeliverMessage(msg *Message) {
ps.Lock()
defer ps.Unlock()
ps.markFirstMessageDelivery(msg.ReceivedFrom, msg)
drec := ps.deliveries.getRecord(ps.msgID(msg.Message))
// mark the message as valid and reward mesh peers that have forwarded it in time
drec.status = delivery_valid
for p, t := range drec.peers {
ps.markDuplicateMessageDelivery(p, msg, drec.firstSeen, t)
}
}
func (ps *peerScore) RejectMessage(msg *Message, reason string) {
ps.Lock()
defer ps.Unlock()
if reason == "invalid signature" {
ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)
// if we reject with "invalid signature" we don't track this message delivery.
return
}
drec := ps.deliveries.getRecord(ps.msgID(msg.Message))
if reason == "validation throttled" {
// if we reject with "validation throttled" we don't penalize the peer(s) that forward it
// because we don't know if it was valid.
drec.status = delivery_throttled
// release the delivery time tracking map to free some memory early
drec.peers = nil
return
}
// mark the message as invalid and penalize peers that have already forwarded it.
drec.status = delivery_invalid
for p := range drec.peers {
ps.markInvalidMessageDelivery(p, msg)
}
// release the delivery time tracking map to free some memory early
drec.peers = nil
}
func (ps *peerScore) DuplicateMessage(msg *Message) {
ps.Lock()
defer ps.Unlock()
drec := ps.deliveries.getRecord(ps.msgID(msg.Message))
_, ok := drec.peers[msg.ReceivedFrom]
if ok {
// we have already seen this duplicate!
return
}
switch drec.status {
case delivery_unknown:
// the message is being validated; track the peer delivery time and wait for
// the Deliver/Reject notification.
now := time.Now()
drec.peers[msg.ReceivedFrom] = now
case delivery_valid:
// mark the peer delivery time to only count a duplicate delivery once.
now := time.Now()
drec.peers[msg.ReceivedFrom] = now
ps.markDuplicateMessageDelivery(msg.ReceivedFrom, msg, drec.firstSeen, now)
case delivery_invalid:
// we no longer track delivery time
ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)
case delivery_throttled:
// the message ws throttled; do nothing (we don't know if it was valid)
}
}
// message delivery records
func (d *messageDeliveries) getRecord(id string) *deliveryRecord {
// TODO
return nil
}
// utilities
func (pstats *peerStats) getTopicStats(topic string, params *PeerScoreParams) (*topicStats, bool) {
tstats, ok := pstats.topics[topic]
if ok {
return tstats, true
}
_, scoredTopic := params.Topics[topic]
if !scoredTopic {
return nil, false
}
tstats = &topicStats{}
pstats.topics[topic] = tstats
return tstats, true
}
func (ps *peerScore) markInvalidMessageDelivery(p peer.ID, msg *Message) {
pstats, ok := ps.peerStats[p]
if !ok {
return
}
for _, topic := range msg.GetTopicIDs() {
tstats, ok := pstats.getTopicStats(topic, ps.params)
if !ok {
continue
}
tstats.invalidMessageDeliveries += 1
}
}
func (ps *peerScore) markFirstMessageDelivery(p peer.ID, msg *Message) {
pstats, ok := ps.peerStats[p]
if !ok {
return
}
for _, topic := range msg.GetTopicIDs() {
tstats, ok := pstats.getTopicStats(topic, ps.params)
if !ok {
continue
}
tstats.firstMessageDeliveries += 1
if tstats.firstMessageDeliveries > ps.params.Topics[topic].FirstMessageDeliveriesCap {
tstats.firstMessageDeliveries = ps.params.Topics[topic].FirstMessageDeliveriesCap
}
if !tstats.inMesh {
continue
}
tstats.meshMessageDeliveries += 1
if tstats.meshMessageDeliveries > ps.params.Topics[topic].MeshMessageDeliveriesCap {
tstats.meshMessageDeliveries = ps.params.Topics[topic].MeshMessageDeliveriesCap
}
}
}
func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, firstSeen, deliveryTime time.Time) {
pstats, ok := ps.peerStats[p]
if !ok {
return
}
for _, topic := range msg.GetTopicIDs() {
tstats, ok := pstats.getTopicStats(topic, ps.params)
if !ok {
continue
}
if !tstats.inMesh {
continue
}
// check against the mesh delivery window
if deliveryTime.After(firstSeen.Add(ps.params.Topics[topic].MeshMessageDeliveriesWindow)) {
continue
}
tstats.meshMessageDeliveries += 1
if tstats.meshMessageDeliveries > ps.params.Topics[topic].MeshMessageDeliveriesCap {
tstats.meshMessageDeliveries = ps.params.Topics[topic].MeshMessageDeliveriesCap
}
}
}