This commit is contained in:
Giovanni Petrantoni 2020-07-19 12:37:45 +09:00
parent 6af7909378
commit d9fa9e2e84
4 changed files with 136 additions and 107 deletions

View File

@ -61,6 +61,35 @@ type
inMesh*: bool
meshMessageDeliveriesActive*: bool
TopicParams* = object
topicWeight*: float64
# p1
timeInMeshWeight*: float64
timeInMeshQuantum*: Duration
timeInMeshCap*: float64
# p2
firstMessageDeliveriesWeight*: float64
firstMessageDeliveriesDecay*: float64
firstMessageDeliveriesCap*: float64
# p3
meshMessageDeliveriesWeight*: float64
meshMessageDeliveriesDecay*: float64
meshMessageDeliveriesThreshold*: float64
meshMessageDeliveriesCap*: float64
meshMessageDeliveriesActivation*: Duration
meshMessageDeliveriesWindow*: Duration
# p3b
meshFailurePenaltyWeight*: float64
meshFailurePenaltyDecay*: float64
# p4
invalidMessageDeliveriesWeight*: float64
invalidMessageDeliveriesDecay*: float64
PeerStats* = object
topicInfos*: Table[string, TopicInfo]
expire*: Moment # updated on disconnect, to retain scores until expire
@ -68,27 +97,26 @@ type
GossipSubParams* = object
pruneBackoff*: Duration
floodPublish*: bool
gossipFactor*: float
gossipFactor*: float64
dScore*: int
dOut*: int
gossipThreshold*: float
publishThreshold*: float
graylistThreshold*: float
acceptPXThreshold*: float
opportunisticGraftThreshold*: float
gossipThreshold*: float64
publishThreshold*: float64
graylistThreshold*: float64
acceptPXThreshold*: float64
opportunisticGraftThreshold*: float64
decayInterval*: Duration
decayToZero*: float
decayToZero*: float64
retainScore*: Duration
appSpecificWeight*: float
ipColocationFactorWeight*: float
ipColocationFactorThreshold*: float
behaviourPenaltyWeight*: float
behaviourPenaltyDecay*: float
appSpecificWeight*: float64
ipColocationFactorWeight*: float64
ipColocationFactorThreshold*: float64
behaviourPenaltyWeight*: float64
behaviourPenaltyDecay*: float64
GossipSub* = ref object of FloodSub
parameters*: GossipSubParams
mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic
fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic
gossipsub*: PeerTable # peers that are subscribed to a topic
@ -101,7 +129,10 @@ type
heartbeatFut: Future[void] # cancellation future for heartbeat interval
heartbeatRunning: bool
heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats
peerStats: Table[PubSubPeer, PeerStats]
parameters*: GossipSubParams
topicParams*: Table[string, TopicParams]
when not defined(release):
prunedPeers: HashSet[PubSubPeer]
@ -168,6 +199,49 @@ proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
else:
ok()
proc init*(_: type[TopicParams]): TopicParams =
TopicParams(
topicWeight: 1.0,
timeInMeshWeight: 0.01,
timeInMeshQuantum: 1.seconds,
timeInMeshCap: 10.0,
firstMessageDeliveriesWeight: 1.0,
firstMessageDeliveriesDecay: 0.5,
firstMessageDeliveriesCap: 10.0,
meshMessageDeliveriesWeight: -1.0,
meshMessageDeliveriesDecay: 0.5,
meshMessageDeliveriesCap: 10,
meshMessageDeliveriesThreshold: 5,
meshMessageDeliveriesWindow: 5.milliseconds,
meshMessageDeliveriesActivation: 1.seconds,
meshFailurePenaltyWeight: -1.0,
meshFailurePenaltyDecay: 0.5,
invalidMessageDeliveriesWeight: -1.0,
invalidMessageDeliveriesDecay: 0.5
)
proc validateParameters*(parameters: TopicParams): Result[void, cstring] =
if parameters.timeInMeshWeight <= 0.0 or parameters.timeInMeshWeight > 1.0:
err("gossipsub: timeInMeshWeight parameter error, Must be a small positive value")
elif parameters.timeInMeshCap <= 0.0:
err("gossipsub: timeInMeshCap parameter error, Should be a positive value")
elif parameters.firstMessageDeliveriesWeight <= 0.0:
err("gossipsub: firstMessageDeliveriesWeight parameter error, Should be a positive value")
elif parameters.meshMessageDeliveriesWeight >= 0.0:
err("gossipsub: meshMessageDeliveriesWeight parameter error, Should be a negative value")
elif parameters.meshMessageDeliveriesThreshold <= 0.0:
err("gossipsub: meshMessageDeliveriesThreshold parameter error, Should be a positive value")
elif parameters.meshMessageDeliveriesCap < parameters.meshMessageDeliveriesThreshold:
err("gossipsub: meshMessageDeliveriesCap parameter error, Should be >= meshMessageDeliveriesThreshold")
elif parameters.meshMessageDeliveriesWindow > 100.milliseconds:
err("gossipsub: meshMessageDeliveriesWindow parameter error, Should be small, 1-5ms")
elif parameters.meshFailurePenaltyWeight >= 0.0:
err("gossipsub: meshFailurePenaltyWeight parameter error, Should be a negative value")
elif parameters.invalidMessageDeliveriesWeight >= 0.0:
err("gossipsub: invalidMessageDeliveriesWeight parameter error, Should be a negative value")
else:
ok()
method init*(g: GossipSub) =
proc handler(conn: Connection, proto: string) {.async.} =
## main protocol handler that gets triggered on every
@ -198,23 +272,31 @@ proc grafted(g: GossipSub, p: PubSubPeer, topic: string) =
var info = stats.topicInfos.mgetOrPut(topic, TopicInfo())
info.graftTime = Moment.now()
info.meshTime = 0.seconds
do:
raise newException(CatchableError, "TopicInfo key not found for " & $p)
info.inMesh = true
proc pruned(g: GossipSub, p: PubSubPeer, topic: string) {.gcsafe.} =
assert(g.peerStats[p].topicInfos[topic].inMesh == true)
debug "grafted", p
do:
doAssert(false, "grafted: TopicInfo key not found for " & $p)
proc pruned(g: GossipSub, p: PubSubPeer, topic: string) =
g.peerStats.withValue(p, stats) do:
when not defined(release):
g.prunedPeers.incl(p)
var _ = stats.topicInfos[topic]
var info = stats.topicInfos[topic]
info.inMesh = false
debug "pruned", p
do:
when not defined(release):
if p in g.prunedPeers:
raise newException(CatchableError, "Dupe prune " & $p)
doAssert(false, "pruned: Dupe prune " & $p)
else:
raise newException(CatchableError, "TopicInfo key not found for " & $p)
doAssert(false, "pruned: TopicInfo key not found for " & $p)
else:
raise newException(CatchableError, "TopicInfo key not found for " & $p)
doAssert(false, "pruned: TopicInfo key not found for " & $p)
proc replenishFanout(g: GossipSub, topic: string) =
## get fanout peers for a topic
@ -261,8 +343,8 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
# sort peers by score
grafts.sort(proc (x, y: PubSubPeer): int =
let
peerx = x.score()
peery = y.score()
peerx = x.score
peery = y.score
if peerx < peery: -1
elif peerx == peery: 0
else: 1)
@ -358,12 +440,18 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
result[peer.id].ihave.add(ihave)
func `/`(a, b: Duration): float64 =
let
fa = float64(a.nanoseconds) / 1000000000
fb = float64(b.nanoseconds) / 1000000000
fa / fb
proc updateScores(g: GossipSub) = # avoid async
debug "updating scores", peers = g.peers.len
let now = Moment.now()
for peer, stats in g.peerStats:
for peer, stats in g.peerStats.mpairs:
debug "updating peer score", peer, gossipTopics = peer.topics.len
# TODO
@ -375,16 +463,27 @@ proc updateScores(g: GossipSub) = # avoid async
for topic in peer.topics:
debug "updating peer topic's scores", peer, topic
# Defect on purpose, no magic here please, this should not fail!
let topicParams = g.topics[topic].parameters
var info = stats.topicInfos[topic]
var topicParams = g.topicParams.mgetOrPut(topic, TopicParams.init())
var info = stats.topicInfos.mgetOrPut(topic, TopicInfo())
var topicScore = 0'f64
if info.inMesh:
info.meshTime = now - info.graftTime
if info.meshTime > topicParams.meshMessageDeliveriesActivation:
info.meshMessageDeliveriesActive = true
var p1 = info.meshTime / topicParams.timeInMeshQuantum
if p1 > topicParams.timeInMeshCap:
p1 = topicParams.timeInMeshCap
topicScore = p1 * topicParams.timeInMeshWeight
peer.score += topicScore * topicParams.topicWeight
# debug assert to check nim compiler is doing what we are asking...
assert(stats.topicInfos[topic].meshTime == info.meshTime)
debug "updated peer's score", peer, score = peer.score
proc heartbeat(g: GossipSub) {.async.} =
while g.heartbeatRunning:
try:
@ -412,7 +511,8 @@ proc heartbeat(g: GossipSub) {.async.} =
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception ocurred in gossipsub heartbeat", exc = exc.msg
warn "exception ocurred in gossipsub heartbeat", exc = exc.msg, trace = exc.getStackTrace()
assert(false, "exception ocurred in gossipsub heartbeat")
await sleepAsync(GossipSubHeartbeatInterval)
@ -445,7 +545,7 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =
g.explicitPeers.excl(peer.id)
# don't retain bad score peers
if peer.score() > 0:
if peer.score > 0:
g.peerStats.del(peer)
return
@ -684,7 +784,7 @@ method publish*(g: GossipSub,
if g.parameters.floodPublish:
for id, peer in g.peers:
if topic in peer.topics and
peer.score() >= g.parameters.publishThreshold:
peer.score >= g.parameters.publishThreshold:
debug "publish: including flood/high score peer", peer = id
peers.incl(peer)

View File

@ -48,40 +48,10 @@ type
MsgIdProvider* =
proc(m: Message): string {.noSideEffect, raises: [Defect], nimcall, gcsafe.}
TopicParams* = object
topicWeight*: float
# p1
timeInMeshWeight*: float
timeinMeshQuantum*: Duration
timeInMeshCap*: float
# p2
firstMessageDeliveriesWeight*: float
firstMessageDeliveriesDecay*: float
firstMessageDeliveriesCap*: float
# p3
meshMessageDeliveriesWeight*: float
meshMessageDeliveriesDecay*: float
meshMessageDeliveriesThreshold*: float
meshMessageDeliveriesCap*: float
meshMessageDeliveriesActivation*: Duration
meshMessageDeliveriesWindow*: Duration
# p3b
meshFailurePenaltyWeight*: float
meshFailurePenaltyDecay*: float
# p4
invalidMessageDeliveriesWeight*: float
invalidMessageDeliveriesDecay*: float
Topic* = object
# make this a variant type if one day we have different Params structs
name*: string
handler*: seq[TopicHandler]
parameters*: TopicParams
PubSub* = ref object of LPProtocol
peerInfo*: PeerInfo # this peer's info
@ -97,49 +67,6 @@ type
msgIdProvider*: MsgIdProvider # Turn message into message id (not nil)
msgSeqno*: uint64
proc init*(_: type[TopicParams]): TopicParams =
TopicParams(
topicWeight: 1.0,
timeInMeshWeight: 0.01,
timeinMeshQuantum: 1.seconds,
timeInMeshCap: 10.0,
firstMessageDeliveriesWeight: 1.0,
firstMessageDeliveriesDecay: 0.5,
firstMessageDeliveriesCap: 10.0,
meshMessageDeliveriesWeight: -1.0,
meshMessageDeliveriesDecay: 0.5,
meshMessageDeliveriesCap: 10,
meshMessageDeliveriesThreshold: 5,
meshMessageDeliveriesWindow: 5.milliseconds,
meshMessageDeliveriesActivation: 1.seconds,
meshFailurePenaltyWeight: -1.0,
meshFailurePenaltyDecay: 0.5,
invalidMessageDeliveriesWeight: -1.0,
invalidMessageDeliveriesDecay: 0.5
)
proc validateParameters*(parameters: TopicParams): Result[void, cstring] =
if parameters.timeInMeshWeight <= 0.0 or parameters.timeInMeshWeight > 1.0:
err("gossipsub: timeInMeshWeight parameter error, Must be a small positive value")
elif parameters.timeInMeshCap <= 0.0:
err("gossipsub: timeInMeshCap parameter error, Should be a positive value")
elif parameters.firstMessageDeliveriesWeight <= 0.0:
err("gossipsub: firstMessageDeliveriesWeight parameter error, Should be a positive value")
elif parameters.meshMessageDeliveriesWeight >= 0.0:
err("gossipsub: meshMessageDeliveriesWeight parameter error, Should be a negative value")
elif parameters.meshMessageDeliveriesThreshold <= 0.0:
err("gossipsub: meshMessageDeliveriesThreshold parameter error, Should be a positive value")
elif parameters.meshMessageDeliveriesCap < parameters.meshMessageDeliveriesThreshold:
err("gossipsub: meshMessageDeliveriesCap parameter error, Should be >= meshMessageDeliveriesThreshold")
elif parameters.meshMessageDeliveriesWindow > 100.milliseconds:
err("gossipsub: meshMessageDeliveriesWindow parameter error, Should be small, 1-5ms")
elif parameters.meshFailurePenaltyWeight >= 0.0:
err("gossipsub: meshFailurePenaltyWeight parameter error, Should be a negative value")
elif parameters.invalidMessageDeliveriesWeight >= 0.0:
err("gossipsub: invalidMessageDeliveriesWeight parameter error, Should be a negative value")
else:
ok()
method handleConnect*(p: PubSub, peer: PubSubPeer) {.base.} =
discard
@ -352,7 +279,7 @@ method subscribe*(p: PubSub,
##
if topic notin p.topics:
trace "subscribing to topic", name = topic
p.topics[topic] = Topic(name: topic, parameters: TopicParams.init())
p.topics[topic] = Topic(name: topic)
p.topics[topic].handler.add(handler)

View File

@ -42,14 +42,12 @@ type
onConnect*: AsyncEvent
observers*: ref seq[PubSubObserver] # ref as in smart_ptr
score*: float64
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
chronicles.formatIt(PubSubPeer): it.peerInfo.id
func score*(p: PubSubPeer): float64 =
# TODO
0.0
func hash*(p: PubSubPeer): Hash =
# int is either 32/64, so intptr basically, pubsubpeer is a ref
cast[pointer](p).hash

View File

@ -308,6 +308,10 @@ suite "GossipSub":
passed.complete(true)
var nodes = generateNodes(2, true)
var gossipSub1: GossipSub = GossipSub(nodes[0].pubSub.get())
gossipSub1.parameters.floodPublish = false
var gossipSub2: GossipSub = GossipSub(nodes[1].pubSub.get())
gossipSub2.parameters.floodPublish = false
var wait: seq[Future[void]]
wait.add(await nodes[0].start())
wait.add(await nodes[1].start())