parameters wip

This commit is contained in:
Giovanni Petrantoni 2020-07-16 13:23:11 +09:00
parent 66e5f41c38
commit 0e2ff309df
2 changed files with 83 additions and 20 deletions

View File

@ -66,6 +66,15 @@ type
graylistThreshold*: float graylistThreshold*: float
acceptPXThreshold*: float acceptPXThreshold*: float
opportunisticGraftThreshold*: float opportunisticGraftThreshold*: float
decayInterval*: Duration
decayToZero*: float
retainScore*: Duration
appSpecificWeight*: float
ipColocationFactorWeight*: float
ipColocationFactorThreshold*: float
behaviourPenaltyWeight*: float
behaviourPenaltyDecay*: float
GossipSub* = ref object of FloodSub GossipSub* = ref object of FloodSub
parameters*: GossipSubParams parameters*: GossipSubParams
@ -104,9 +113,46 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
gossipThreshold: -10, gossipThreshold: -10,
publishThreshold: -100, publishThreshold: -100,
graylistThreshold: -10000, graylistThreshold: -10000,
opportunisticGraftThreshold: 1 opportunisticGraftThreshold: 1,
decayInterval: 1.seconds,
decayToZero: 0.01,
retainScore: 10.seconds,
appSpecificWeight: 1.0,
ipColocationFactorWeight: 0.0,
ipColocationFactorThreshold: 1.0,
behaviourPenaltyWeight: -1.0,
behaviourPenaltyDecay: 0.999,
) )
proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
if (parameters.dOut >= GossipSubDlo) or
(parameters.dOut > (GossipSubD div 2)):
err("gossipsub: dOut parameter error, Number of outbound connections to keep in the mesh. Must be less than D_lo and at most D/2")
elif parameters.gossipThreshold >= 0:
err("gossipsub: gossipThreshold parameter error, Must be < 0")
elif parameters.publishThreshold >= parameters.gossipThreshold:
err("gossipsub: publishThreshold parameter error, Must be < gossipThreshold")
elif parameters.graylistThreshold >= parameters.publishThreshold:
err("gossipsub: graylistThreshold parameter error, Must be < publishThreshold")
elif parameters.acceptPXThreshold < 0:
err("gossipsub: acceptPXThreshold parameter error, Must be >= 0")
elif parameters.opportunisticGraftThreshold < 0:
err("gossipsub: opportunisticGraftThreshold parameter error, Must be >= 0")
elif parameters.decayToZero > 0.5 or parameters.decayToZero <= 0.0:
err("gossipsub: decayToZero parameter error, Should be close to 0.0")
elif parameters.appSpecificWeight < 0:
err("gossipsub: appSpecificWeight parameter error, Must be positive")
elif parameters.ipColocationFactorWeight > 0:
err("gossipsub: ipColocationFactorWeight parameter error, Must be negative or 0")
elif parameters.ipColocationFactorThreshold < 1.0:
err("gossipsub: ipColocationFactorThreshold parameter error, Must be at least 1")
elif parameters.behaviourPenaltyWeight >= 0:
err("gossipsub: behaviourPenaltyWeight parameter error, Must be negative")
elif parameters.behaviourPenaltyDecay < 0 or parameters.behaviourPenaltyDecay >= 1:
err("gossipsub: behaviourPenaltyDecay parameter error, Must be between 0 and 1")
else:
ok()
method init*(g: GossipSub) = method init*(g: GossipSub) =
proc handler(conn: Connection, proto: string) {.async.} = proc handler(conn: Connection, proto: string) {.async.} =
## main protocol handler that gets triggered on every ## main protocol handler that gets triggered on every
@ -618,27 +664,10 @@ method stop*(g: GossipSub) {.async.} =
trace "awaiting last heartbeat" trace "awaiting last heartbeat"
await g.heartbeatFut await g.heartbeatFut
proc validateParameters(g: GossipSub): Result[void, cstring] =
if (g.parameters.dOut >= GossipSubDlo) or
(g.parameters.dOut > (GossipSubD div 2)):
err("gossipsub: dOut parameter error, Number of outbound connections to keep in the mesh. Must be less than D_lo and at most D/2")
elif g.parameters.gossipThreshold >= 0:
err("gossipsub: gossipThreshold parameter error, Must be < 0")
elif g.parameters.publishThreshold >= g.parameters.gossipThreshold:
err("gossipsub: publishThreshold parameter error, Must be < gossipThreshold")
elif g.parameters.graylistThreshold >= g.parameters.publishThreshold:
err("gossipsub: graylistThreshold parameter error, Must be < publishThreshold")
elif g.parameters.acceptPXThreshold < 0:
err("gossipsub: acceptPXThreshold parameter error, Must be >= 0")
elif g.parameters.opportunisticGraftThreshold < 0:
err("gossipsub: opportunisticGraftThreshold parameter error, Must be >= 0")
else:
ok()
method initPubSub*(g: GossipSub) = method initPubSub*(g: GossipSub) =
procCall FloodSub(g).initPubSub() procCall FloodSub(g).initPubSub()
g.validateParameters().tryGet() g.parameters.validateParameters().tryGet()
randomize() randomize()
g.mcache = newMCache(GossipSubHistoryGossip, GossipSubHistoryLength) g.mcache = newMCache(GossipSubHistoryGossip, GossipSubHistoryLength)

View File

@ -16,6 +16,8 @@ import pubsubpeer,
../../peerid, ../../peerid,
../../peerinfo ../../peerinfo
import metrics import metrics
import stew/results
export results
export PubSubPeer export PubSubPeer
export PubSubObserver export PubSubObserver
@ -44,9 +46,24 @@ type
MsgIdProvider* = MsgIdProvider* =
proc(m: Message): string {.noSideEffect, raises: [Defect], nimcall, gcsafe.} proc(m: Message): string {.noSideEffect, raises: [Defect], nimcall, gcsafe.}
TopicParams* = object
topicWeight*: float
# p1
timeInMeshWeight*: float
timeinMeshQuantum*: Duration
timeInMeshCap*: float
# p2
firstMessageDeliveriesWeight*: float
firstMessageDeliveriesDecay*: float
firstMessageDeliveriesCap*: float
Topic* = object Topic* = object
# make this a variant type if one day we have different Params structs
name*: string name*: string
handler*: seq[TopicHandler] handler*: seq[TopicHandler]
parameters*: TopicParams
PubSub* = ref object of LPProtocol PubSub* = ref object of LPProtocol
peerInfo*: PeerInfo # this peer's info peerInfo*: PeerInfo # this peer's info
@ -61,6 +78,23 @@ type
msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) msgIdProvider*: MsgIdProvider # Turn message into message id (not nil)
msgSeqno*: uint64 msgSeqno*: uint64
proc init*(_: type[TopicParams]): TopicParams =
TopicParams(
topicWeight: 1.0,
timeInMeshWeight: 0.01,
timeinMeshQuantum: 1.seconds,
timeInMeshCap: 10.0,
firstMessageDeliveriesWeight: 1.0,
firstMessageDeliveriesDecay: 0.5,
firstMessageDeliveriesCap: 10.0,
)
proc validateParameters*(parameters: TopicParams): Result[void, cstring] =
if parameters.timeInMeshWeight <= 0.0 or parameters.timeInMeshWeight > 1.0:
err("gossipsub: timeInMeshWeight parameter error, Must be a small positive value")
else:
ok()
method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} = method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
## handle peer disconnects ## handle peer disconnects
## ##
@ -244,7 +278,7 @@ method subscribe*(p: PubSub,
## ##
if topic notin p.topics: if topic notin p.topics:
trace "subscribing to topic", name = topic trace "subscribing to topic", name = topic
p.topics[topic] = Topic(name: topic) p.topics[topic] = Topic(name: topic, parameters: TopicParams.init())
p.topics[topic].handler.add(handler) p.topics[topic].handler.add(handler)