finishup with score and ihave budget

This commit is contained in:
Giovanni Petrantoni 2020-08-17 01:20:50 +09:00
parent 1661344e17
commit afaa7f2a84
4 changed files with 85 additions and 22 deletions

View File

@ -11,7 +11,7 @@
{.push raises: [Defect].}
import nativesockets
import nativesockets, hashes
import tables, strutils, stew/shims/net
import chronos
import multicodec, multihash, multibase, transcoder, vbuffer, peerid,
@ -56,6 +56,13 @@ const
IPPROTO_TCP = Protocol.IPPROTO_TCP
IPPROTO_UDP = Protocol.IPPROTO_UDP
proc hash*(a: MultiAddress): Hash =
var h: Hash = 0
h = h !& hash(a.data.buffer)
h = h !& hash(a.data.offset)
h = h !& hash(a.data.length)
!$h
proc ip4StB(s: string, vb: var VBuffer): bool =
## IPv4 stringToBuffer() implementation.
try:

View File

@ -56,6 +56,7 @@ const
const
BackoffSlackTime = 2 # seconds
IWantPeerBudget = 25 # 25 messages per second ( reset every heartbeat )
IHavePeerBudget = 10
type
TopicInfo* = object
@ -146,6 +147,7 @@ type
parameters*: GossipSubParams
topicParams*: Table[string, TopicParams]
directPeersLoop: Future[void]
peersInIP: Table[MultiAddress, HashSet[PubSubPeer]]
heartbeatEvents*: seq[AsyncEvent]
@ -282,6 +284,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) =
# new peer
g.peerStats[peer] = PeerStats()
peer.iWantBudget = IWantPeerBudget
peer.iHaveBudget = IHavePeerBudget
return
else:
# we knew this peer
@ -618,6 +621,24 @@ func `/`(a, b: Duration): float64 =
fb = float64(b.nanoseconds) / 1000000000
fa / fb
proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
if peer.sendConn == nil:
0.0
else:
let
address = peer.sendConn.observedAddr
ipPeers = g.peersInIP.getOrDefault(address)
len = ipPeers.len.float64
if len > g.parameters.ipColocationFactorThreshold:
let over = len - g.parameters.ipColocationFactorThreshold
over * over
else:
# lazy update peersInIP
if address notin g.peersInIP:
g.peersInIP[address] = initHashSet[PubSubPeer]()
g.peersInIP[address].incl(peer)
0.0
proc updateScores(g: GossipSub) = # avoid async
trace "updating scores", peers = g.peers.len
@ -694,7 +715,18 @@ proc updateScores(g: GossipSub) = # avoid async
# Wrap up
# commit our changes, mgetOrPut does NOT work as wanted with value types (lent?)
stats.topicInfos[topic] = info
peer.score += peer.appScore * g.parameters.appSpecificWeight
peer.score += peer.behaviourPenalty * peer.behaviourPenalty * g.parameters.behaviourPenaltyWeight
peer.score += g.colocationFactor(peer) * g.parameters.ipColocationFactorWeight
# decay behaviourPenalty
peer.behaviourPenalty *= g.parameters.behaviourPenaltyDecay
if peer.behaviourPenalty < g.parameters.decayToZero:
peer.behaviourPenalty = 0
trace "updated peer's score", peer, score = peer.score
for peer in evicting:
@ -715,9 +747,11 @@ proc heartbeat(g: GossipSub) {.async.} =
g.backingOff.del(peer)
# reset IWANT budget
# reset IHAVE cap
block:
for peer in g.peers.values:
peer.iWantBudget = IWantPeerBudget
peer.iHaveBudget = IHavePeerBudget
g.updateScores()
@ -779,6 +813,11 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
if pubSubPeer.isNil:
return
# remove from peer IPs collection too
if pubSubPeer.sendConn != nil:
g.peersInIP.withValue(pubSubPeer.sendConn.observedAddr, s) do:
s[].excl(pubSubPeer)
for t in toSeq(g.gossipsub.keys):
g.gossipsub.removePeer(t, pubSubPeer)
# also try to remove from explicit table here
@ -947,29 +986,38 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
proc handleIHave(g: GossipSub,
peer: PubSubPeer,
ihaves: seq[ControlIHave]): ControlIWant =
for ihave in ihaves:
trace "peer sent ihave",
peer = peer.id, topic = ihave.topicID, msgs = ihave.messageIDs
if peer.score < g.parameters.gossipThreshold:
trace "ihave: ignoring low score peer", peer = $peer, score = peer.score
elif peer.iHaveBudget == 0:
trace "ihave: ignoring out of budget peer", peer = $peer, score = peer.score
else:
dec peer.iHaveBudget
for ihave in ihaves:
trace "peer sent ihave",
peer = peer.id, topic = ihave.topicID, msgs = ihave.messageIDs
if ihave.topicID in g.mesh:
for m in ihave.messageIDs:
if m notin g.seen:
result.messageIDs.add(m)
if ihave.topicID in g.mesh:
for m in ihave.messageIDs:
if m notin g.seen:
result.messageIDs.add(m)
proc handleIWant(g: GossipSub,
peer: PubSubPeer,
iwants: seq[ControlIWant]): seq[Message] =
for iwant in iwants:
for mid in iwant.messageIDs:
trace "peer sent iwant", peer = peer.id, messageID = mid
let msg = g.mcache.get(mid)
if msg.isSome:
# avoid spam
if peer.iWantBudget > 0:
result.add(msg.get())
dec peer.iWantBudget
else:
return
if peer.score < g.parameters.gossipThreshold:
trace "iwant: ignoring low score peer", peer = $peer, score = peer.score
else:
for iwant in iwants:
for mid in iwant.messageIDs:
trace "peer sent iwant", peer = peer.id, messageID = mid
let msg = g.mcache.get(mid)
if msg.isSome:
# avoid spam
if peer.iWantBudget > 0:
result.add(msg.get())
dec peer.iWantBudget
else:
return
proc punishPeer(g: GossipSub, peer: PubSubPeer, msg: Message) =
for t in msg.topicIDs:

View File

@ -39,7 +39,7 @@ type
PubSubPeer* = ref object of RootObj
switch*: Switch # switch instance to dial peers
codec*: string # the protocol that this peer joined from
sendConn: Connection
sendConn*: Connection
peerId*: PeerID
handler*: RPCHandler
sentRpcCache: TimedCache[string] # cache for already sent messages
@ -50,7 +50,10 @@ type
score*: float64
iWantBudget*: int
outbound*: bool
iHaveBudget*: int
outbound*: bool # if this is an outbound connection
appScore*: float64 # application specific score
behaviourPenalty*: float64 # the eventual penalty score
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}

View File

@ -251,6 +251,11 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
for muxer in s.muxers.values:
ms.addHandler(muxer.codecs, muxer)
# add the mounted protocols
# notice this should be kept in sync ...
for handler in s.ms.handlers:
ms.handlers &= handler
# handle subsequent secure requests
await ms.handle(sconn)