From 16eeacd798eed1f151a9916e934fb3a25711ba2f Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 8 Mar 2020 16:48:45 +0200 Subject: [PATCH] message delivery tracking --- score.go | 216 ++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 200 insertions(+), 16 deletions(-) diff --git a/score.go b/score.go index a1015e0..33823b7 100644 --- a/score.go +++ b/score.go @@ -58,7 +58,7 @@ type TopicScoreParams struct { FirstMessageDeliveriesCap float64 // P3: mesh message deliveries - // This is the number of message deliveries in the mesh, within the MeshMessageDeliveriesLatency of + // This is the number of message deliveries in the mesh, within the MeshMessageDeliveriesWindow of // the first message delivery. // The parameter has an associated counter, decaying with MessageMessageDeliveriesDecay. // If the counter exceeds the threshold, its value is 0. @@ -66,9 +66,9 @@ type TopicScoreParams struct { // the deficit, ie (MessageDeliveriesThreshold - counter)^2 // The penalty is only activated after MeshMessageDeliveriesActivation time in the mesh. // The weight of the parameter MUST be negative. - MeshMessageDeliveriesWeight, MeshMessageDeliveriesDecay float64 - MeshMessageDeliveriesThreshold float64 - MeshMessageDeliveriesLatency, MeshMessageDeliveriesActivation time.Duration + MeshMessageDeliveriesWeight, MeshMessageDeliveriesDecay float64 + MeshMessageDeliveriesCap, MeshMessageDeliveriesThreshold float64 + MeshMessageDeliveriesWindow, MeshMessageDeliveriesActivation time.Duration // P4: invalid messages // This is the number of invalid messages in the topic. @@ -126,15 +126,39 @@ type peerScore struct { // IP colocation tracking peerIPs map[string]map[peer.ID]struct{} + + // message delivery tracking + deliveries *messageDeliveries + + msgID MsgIdFunction } +type messageDeliveries struct { + // TODO +} + +type deliveryRecord struct { + firstSeen time.Time + status int + peers map[peer.ID]time.Time +} + +// delivery record status +const ( + delivery_unknown = iota // we don't know (yet) if the message is valid + delivery_valid + delivery_invalid + delivery_throttled +) + func newPeerScore(params *PeerScoreParams) *peerScore { + // TODO return nil } // router interface func (ps *peerScore) Start(gs *GossipSubRouter) { - + // TODO } func (ps *peerScore) Score(p peer.ID) float64 { @@ -266,41 +290,201 @@ func (ps *peerScore) refreshScores() { func (ps *peerScore) resfreshIPs() { // peer IPs may change, so we periodically refresh them + // TODO } // tracer interface func (ps *peerScore) AddPeer(p peer.ID, proto protocol.ID) { - + // TODO } func (ps *peerScore) RemovePeer(p peer.ID) { - + // TODO } -func (ps *peerScore) Join(topic string) { - -} - -func (ps *peerScore) Leave(topic string) { - -} +func (ps *peerScore) Join(topic string) {} +func (ps *peerScore) Leave(topic string) {} func (ps *peerScore) Graft(p peer.ID, topic string) { - + // TODO } func (ps *peerScore) Prune(p peer.ID, topic string) { - + // TODO } func (ps *peerScore) DeliverMessage(msg *Message) { + ps.Lock() + defer ps.Unlock() + ps.markFirstMessageDelivery(msg.ReceivedFrom, msg) + + drec := ps.deliveries.getRecord(ps.msgID(msg.Message)) + + // mark the message as valid and reward mesh peers that have forwarded it in time + drec.status = delivery_valid + for p, t := range drec.peers { + ps.markDuplicateMessageDelivery(p, msg, drec.firstSeen, t) + } } func (ps *peerScore) RejectMessage(msg *Message, reason string) { + ps.Lock() + defer ps.Unlock() + if reason == "invalid signature" { + ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg) + // if we reject with "invalid signature" we don't track this message delivery. + return + } + + drec := ps.deliveries.getRecord(ps.msgID(msg.Message)) + + if reason == "validation throttled" { + // if we reject with "validation throttled" we don't penalize the peer(s) that forward it + // because we don't know if it was valid. + drec.status = delivery_throttled + // release the delivery time tracking map to free some memory early + drec.peers = nil + return + } + + // mark the message as invalid and penalize peers that have already forwarded it. + drec.status = delivery_invalid + for p := range drec.peers { + ps.markInvalidMessageDelivery(p, msg) + } + + // release the delivery time tracking map to free some memory early + drec.peers = nil } func (ps *peerScore) DuplicateMessage(msg *Message) { + ps.Lock() + defer ps.Unlock() + drec := ps.deliveries.getRecord(ps.msgID(msg.Message)) + + _, ok := drec.peers[msg.ReceivedFrom] + if ok { + // we have already seen this duplicate! + return + } + + switch drec.status { + case delivery_unknown: + // the message is being validated; track the peer delivery time and wait for + // the Deliver/Reject notification. + now := time.Now() + drec.peers[msg.ReceivedFrom] = now + + case delivery_valid: + // mark the peer delivery time to only count a duplicate delivery once. + now := time.Now() + drec.peers[msg.ReceivedFrom] = now + ps.markDuplicateMessageDelivery(msg.ReceivedFrom, msg, drec.firstSeen, now) + + case delivery_invalid: + // we no longer track delivery time + ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg) + + case delivery_throttled: + // the message ws throttled; do nothing (we don't know if it was valid) + } +} + +// message delivery records +func (d *messageDeliveries) getRecord(id string) *deliveryRecord { + // TODO + return nil +} + +// utilities +func (pstats *peerStats) getTopicStats(topic string, params *PeerScoreParams) (*topicStats, bool) { + tstats, ok := pstats.topics[topic] + if ok { + return tstats, true + } + + _, scoredTopic := params.Topics[topic] + if !scoredTopic { + return nil, false + } + + tstats = &topicStats{} + pstats.topics[topic] = tstats + + return tstats, true +} + +func (ps *peerScore) markInvalidMessageDelivery(p peer.ID, msg *Message) { + pstats, ok := ps.peerStats[p] + if !ok { + return + } + + for _, topic := range msg.GetTopicIDs() { + tstats, ok := pstats.getTopicStats(topic, ps.params) + if !ok { + continue + } + + tstats.invalidMessageDeliveries += 1 + } +} + +func (ps *peerScore) markFirstMessageDelivery(p peer.ID, msg *Message) { + pstats, ok := ps.peerStats[p] + if !ok { + return + } + + for _, topic := range msg.GetTopicIDs() { + tstats, ok := pstats.getTopicStats(topic, ps.params) + if !ok { + continue + } + + tstats.firstMessageDeliveries += 1 + if tstats.firstMessageDeliveries > ps.params.Topics[topic].FirstMessageDeliveriesCap { + tstats.firstMessageDeliveries = ps.params.Topics[topic].FirstMessageDeliveriesCap + } + + if !tstats.inMesh { + continue + } + + tstats.meshMessageDeliveries += 1 + if tstats.meshMessageDeliveries > ps.params.Topics[topic].MeshMessageDeliveriesCap { + tstats.meshMessageDeliveries = ps.params.Topics[topic].MeshMessageDeliveriesCap + } + } +} + +func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, firstSeen, deliveryTime time.Time) { + pstats, ok := ps.peerStats[p] + if !ok { + return + } + + for _, topic := range msg.GetTopicIDs() { + tstats, ok := pstats.getTopicStats(topic, ps.params) + if !ok { + continue + } + + if !tstats.inMesh { + continue + } + + // check against the mesh delivery window + if deliveryTime.After(firstSeen.Add(ps.params.Topics[topic].MeshMessageDeliveriesWindow)) { + continue + } + + tstats.meshMessageDeliveries += 1 + if tstats.meshMessageDeliveries > ps.params.Topics[topic].MeshMessageDeliveriesCap { + tstats.meshMessageDeliveries = ps.params.Topics[topic].MeshMessageDeliveriesCap + } + } }