feat(networking): integrate gossipsub scoring (#1769)

This commit is contained in:
Alvaro Revuelta 2023-06-06 19:28:47 +02:00 committed by GitHub
parent 44bcf0f2c4
commit 34a9263191
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 295 additions and 247 deletions

View File

@ -564,16 +564,11 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
# Subscribe to a topic, if relay is mounted
if conf.relay:
proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} =
proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
trace "Hit subscribe handler", topic
let decoded = WakuMessage.decode(data)
if decoded.isOk():
if decoded.get().contentTopic == chat.contentTopic:
chat.printReceivedMessage(decoded.get())
else:
trace "Invalid encoded WakuMessage", error = decoded.error
if msg.contentTopic == chat.contentTopic:
chat.printReceivedMessage(msg)
let topic = DefaultPubsubTopic
node.subscribe(topic, handler)

View File

@ -194,15 +194,14 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =
# Always mount relay for bridge
# `triggerSelf` is false on a `bridge` to avoid duplicates
await cmb.nodev2.mountRelay(triggerSelf = false)
await cmb.nodev2.mountRelay()
cmb.nodev2.wakuRelay.triggerSelf = false
# Bridging
# Handle messages on Waku v2 and bridge to Matterbridge
proc relayHandler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe, raises: [Defect].} =
let msg = WakuMessage.decode(data)
if msg.isOk():
trace "Bridging message from Chat2 to Matterbridge", msg=msg[]
cmb.toMatterbridge(msg[])
proc relayHandler(pubsubTopic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
trace "Bridging message from Chat2 to Matterbridge", msg=msg
cmb.toMatterbridge(msg)
cmb.nodev2.subscribe(DefaultPubsubTopic, relayHandler)

View File

@ -306,13 +306,8 @@ proc subscribeAndHandleMessages(node: WakuNode,
msgPerContentTopic: ContentTopicMessageTableRef) =
# handle function
proc handler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
let messageRes = WakuMessage.decode(data)
if messageRes.isErr():
warn "could not decode message", data=data, pubsubTopic=pubsubTopic
let message = messageRes.get()
trace "rx message", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
proc handler(pubsubTopic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
trace "rx message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic
# If we reach a table limit size, remove c topics with the least messages.
let tableSize = 100
@ -322,10 +317,10 @@ proc subscribeAndHandleMessages(node: WakuNode,
# TODO: Will overflow at some point
# +1 if content topic existed, init to 1 otherwise
if msgPerContentTopic.hasKey(message.contentTopic):
msgPerContentTopic[message.contentTopic] += 1
if msgPerContentTopic.hasKey(msg.contentTopic):
msgPerContentTopic[msg.contentTopic] += 1
else:
msgPerContentTopic[message.contentTopic] = 1
msgPerContentTopic[msg.contentTopic] = 1
node.subscribe(pubsubTopic, handler)

View File

@ -264,7 +264,8 @@ proc start*(bridge: WakuBridge) {.async.} =
# Always mount relay for bridge.
# `triggerSelf` is false on a `bridge` to avoid duplicates
await bridge.nodev2.mountRelay(triggerSelf = false)
await bridge.nodev2.mountRelay()
bridge.nodev2.wakuRelay.triggerSelf = false
# Bridging
# Handle messages on Waku v1 and bridge to Waku v2
@ -275,12 +276,11 @@ proc start*(bridge: WakuBridge) {.async.} =
bridge.nodev1.registerEnvReceivedHandler(handleEnvReceived)
# Handle messages on Waku v2 and bridge to Waku v1
proc relayHandler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk() and msg.get().isBridgeable():
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
if msg.isBridgeable():
try:
trace "Bridging message from V2 to V1", msg=msg.tryGet()
bridge.toWakuV1(msg.tryGet())
trace "Bridging message from V2 to V1", msg=msg
bridge.toWakuV1(msg)
except ValueError:
trace "Failed to convert message to Waku v1. Check content-topic format.", msg=msg
waku_bridge_dropped.inc(labelValues = ["value_error"])

View File

@ -83,14 +83,13 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
# any content topic can be chosen. make sure it matches the publisher
let contentTopic = ContentTopic("/examples/1/pubsub-example/proto")
proc handler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
let message = WakuMessage.decode(data).value
let payloadStr = string.fromBytes(message.payload)
if message.contentTopic == contentTopic:
proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
let payloadStr = string.fromBytes(msg.payload)
if msg.contentTopic == contentTopic:
notice "message received", payload=payloadStr,
pubsubTopic=pubsubTopic,
contentTopic=message.contentTopic,
timestamp=message.timestamp
contentTopic=msg.contentTopic,
timestamp=msg.timestamp
node.subscribe(pubSubTopic, handler)
when isMainModule:

View File

@ -60,14 +60,11 @@ suite "WakuNode":
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)
node2.subscribe(pubSubTopic, relayHandler)

View File

@ -43,10 +43,9 @@ suite "WakuNode - Lightpush":
let message = fakeWakuMessage()
var completionFutRelay = newFuture[bool]()
proc relayHandler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data).get()
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
pubsubTopic == DefaultPubsubTopic
topic == DefaultPubsubTopic
msg == message
completionFutRelay.complete(true)
destNode.subscribe(DefaultPubsubTopic, relayHandler)

View File

@ -16,14 +16,14 @@ import
../testlib/wakucore
proc noopRawHandler(): PubsubRawHandler =
var handler: PubsubRawHandler
handler = proc(pubsubTopic: PubsubTopic, data: seq[byte]): Future[void] {.gcsafe, noSideEffect.} = discard
proc noopRawHandler(): WakuRelayHandler =
var handler: WakuRelayHandler
handler = proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = discard
handler
proc newTestWakuRelay(switch = newTestSwitch(), self = true): Future[WakuRelay] {.async.} =
let proto = WakuRelay.new(switch, triggerSelf = self).tryGet()
proc newTestWakuRelay(switch = newTestSwitch()): Future[WakuRelay] {.async.} =
let proto = WakuRelay.new(switch).tryGet()
await proto.start()
let protocolMatcher = proc(proto: string): bool {.gcsafe.} =
@ -85,7 +85,7 @@ suite "Waku Relay":
topics.contains(networkC)
## When
nodeA.unsubscribeAll(networkA)
nodeA.unsubscribe(networkA)
## Then
check:

View File

@ -94,14 +94,11 @@ suite "WakuNode - Relay":
)
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)
node3.subscribe(pubSubTopic, relayHandler)
@ -182,19 +179,14 @@ suite "WakuNode - Relay":
node2.wakuRelay.addValidator(pubSubTopic, validator)
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
debug "relayed pubsub topic:", topic
let msg = WakuMessage.decode(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
# check that only messages with contentTopic1 is relayed (but not contentTopic2)
val.contentTopic == contentTopic1
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
# check that only messages with contentTopic1 is relayed (but not contentTopic2)
msg.contentTopic == contentTopic1
# relay handler is called
completionFut.complete(true)
node3.subscribe(pubSubTopic, relayHandler)
await sleepAsync(500.millis)
@ -269,14 +261,11 @@ suite "WakuNode - Relay":
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler)
@ -313,14 +302,11 @@ suite "WakuNode - Relay":
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler)
@ -361,14 +347,11 @@ suite "WakuNode - Relay":
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler)
@ -404,14 +387,11 @@ suite "WakuNode - Relay":
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler)
@ -447,14 +427,11 @@ suite "WakuNode - Relay":
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler)
@ -468,3 +445,54 @@ suite "WakuNode - Relay":
(await completionFut.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
asyncTest "Bad peers with low reputation are disconnected":
# Create 5 nodes
let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
# subscribe all nodes to a topic
let topic = "topic"
for node in nodes: node.wakuRelay.subscribe(topic, nil)
await sleepAsync(500.millis)
# connect nodes in full mesh
for i in 0..<5:
for j in 0..<5:
if i == j:
continue
let connOk = await nodes[i].peerManager.connectRelay(nodes[j].switch.peerInfo.toRemotePeerInfo())
require connOk
# connection triggers different actions, wait for them
await sleepAsync(1.seconds)
# all peers are connected in a mesh, 4 conns each
for i in 0..<5:
check:
nodes[i].peerManager.switch.connManager.getConnections().len == 4
# node[0] publishes wrong messages (random bytes not decoding into WakuMessage)
for j in 0..<50:
discard await nodes[0].wakuRelay.publish(topic, urandom(1*(10^3)))
# long wait, must be higher than the configured decayInterval (how often score is updated)
await sleepAsync(20.seconds)
# all nodes lower the score of nodes[0] (will change if gossipsub params or amount of msg changes)
for i in 1..<5:
check:
nodes[i].wakuRelay.peerStats[nodes[0].switch.peerInfo.peerId].score == -249999.9
# nodes[0] was blacklisted from all other peers, no connections
check:
nodes[0].peerManager.switch.connManager.getConnections().len == 0
# the rest of the nodes now have 1 conn less (kicked nodes[0] out)
for i in 1..<5:
check:
nodes[i].peerManager.switch.connManager.getConnections().len == 3
# Stop all nodes
await allFutures(nodes.mapIt(it.stop()))

View File

@ -83,12 +83,10 @@ procSuite "WakuNode - RLN relay":
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk():
debug "The received topic:", topic
if topic == rlnRelayPubSubTopic:
completionFut.complete(true)
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
debug "The received topic:", topic
if topic == rlnRelayPubSubTopic:
completionFut.complete(true)
# mount the relay handler
node3.subscribe(rlnRelayPubSubTopic, relayHandler)
@ -172,12 +170,10 @@ procSuite "WakuNode - RLN relay":
# define a custom relay handler
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk():
debug "The received topic:", topic
if topic == rlnRelayPubSubTopic:
completionFut.complete(true)
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
debug "The received topic:", topic
if topic == rlnRelayPubSubTopic:
completionFut.complete(true)
# mount the relay handler
node3.subscribe(rlnRelayPubSubTopic, relayHandler)
@ -302,20 +298,17 @@ procSuite "WakuNode - RLN relay":
var completionFut2 = newFuture[bool]()
var completionFut3 = newFuture[bool]()
var completionFut4 = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk():
let wm = msg.value()
debug "The received topic:", topic
if topic == rlnRelayPubSubTopic:
if wm == wm1:
completionFut1.complete(true)
if wm == wm2:
completionFut2.complete(true)
if wm == wm3:
completionFut3.complete(true)
if wm == wm4:
completionFut4.complete(true)
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
debug "The received topic:", topic
if topic == rlnRelayPubSubTopic:
if msg == wm1:
completionFut1.complete(true)
if msg == wm2:
completionFut2.complete(true)
if msg == wm3:
completionFut3.complete(true)
if msg == wm4:
completionFut4.complete(true)
# mount the relay handler for node3

View File

@ -88,21 +88,20 @@ procSuite "WakuBridge":
waitFor bridge.start()
waitFor v2Node.start()
await v2Node.mountRelay(@[DefaultBridgeTopic], triggerSelf = false)
await v2Node.mountRelay(@[DefaultBridgeTopic])
v2Node.wakuRelay.triggerSelf = false
discard waitFor v1Node.rlpxConnect(newNode(bridge.nodev1.toENode()))
waitFor waku_node.connectToNodes(v2Node, @[bridge.nodev2.switch.peerInfo.toRemotePeerInfo()])
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk() and msg.value().version == 1:
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
if msg.version == 1:
check:
# Message fields are as expected
msg.value().contentTopic == contentTopic # Topic translation worked
string.fromBytes(msg.value().payload).contains("from V1")
msg.contentTopic == contentTopic # Topic translation worked
string.fromBytes(msg.payload).contains("from V1")
completionFut.complete(true)

View File

@ -66,7 +66,7 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
# Unsubscribe all handlers from requested topics
for topic in topics:
node.unsubscribeAll(topic)
node.unsubscribe(topic)
cache.unsubscribe(topic)
return true

View File

@ -359,7 +359,7 @@ proc new*(T: type PeerManager,
storage: storage,
initialBackoffInSec: initialBackoffInSec,
backoffFactor: backoffFactor,
outPeersTarget: max(maxConnections div 10, 10),
outPeersTarget: max(maxConnections div 2, 10),
maxFailedAttempts: maxFailedAttempts,
colocationLimit: colocationLimit)

View File

@ -88,7 +88,7 @@ proc installRelayDeleteSubscriptionsV1Handler*(router: var RestRouter, node: Wak
# Unsubscribe all handlers from requested topics
for topic in req:
node.unsubscribeAll(string(topic))
node.unsubscribe(string(topic))
cache.unsubscribe(string(topic))
# Successfully unsubscribed from all requested topics

View File

@ -23,7 +23,7 @@ type TopicCache* = MessageCache[PubSubTopic]
##### Message handler
type TopicCacheMessageHandler* = SubscriptionHandler
type TopicCacheMessageHandler* = WakuRelayHandler
proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler =

View File

@ -269,14 +269,10 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
await node.wakuArchive.handleMessage(topic, msg)
let defaultHandler = proc(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isErr():
return
await traceHandler(topic, msg.value)
await filterHandler(topic, msg.value)
await archiveHandler(topic, msg.value)
let defaultHandler = proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
await traceHandler(topic, msg)
await filterHandler(topic, msg)
await archiveHandler(topic, msg)
node.wakuRelay.subscribe(topic, defaultHandler)
@ -302,27 +298,16 @@ proc subscribe*(node: WakuNode, topic: PubsubTopic, handler: WakuRelayHandler) =
node.registerRelayDefaultHandler(topic)
node.wakuRelay.subscribe(topic, handler)
proc unsubscribe*(node: WakuNode, topic: PubsubTopic, handler: WakuRelayHandler) =
## Unsubscribes a handler from a PubSub topic.
proc unsubscribe*(node: WakuNode, topic: PubsubTopic) =
## Unsubscribes from a specific PubSub topic.
if node.wakuRelay.isNil():
error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
return
debug "unsubscribe", oubsubTopic= topic
info "unsubscribe", pubsubTopic=topic
let wakuRelay = node.wakuRelay
wakuRelay.unsubscribe(@[(topic, handler)])
proc unsubscribeAll*(node: WakuNode, topic: PubsubTopic) =
## Unsubscribes all handlers registered on a specific PubSub topic.
if node.wakuRelay.isNil():
error "Invalid API call to `unsubscribeAll`. WakuRelay not mounted."
return
info "unsubscribeAll", topic=topic
node.wakuRelay.unsubscribeAll(topic)
node.wakuRelay.unsubscribe(topic)
proc publish*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
@ -370,7 +355,6 @@ proc startRelay*(node: WakuNode) {.async.} =
proc mountRelay*(node: WakuNode,
topics: seq[string] = @[],
triggerSelf = true,
peerExchangeHandler = none(RoutingRecordsHandler)) {.async, gcsafe.} =
if not node.wakuRelay.isNil():
error "wakuRelay already mounted, skipping"
@ -379,10 +363,7 @@ proc mountRelay*(node: WakuNode,
## The default relay topics is the union of all configured topics plus default PubsubTopic(s)
info "mounting relay protocol"
let initRes = WakuRelay.new(
node.switch,
triggerSelf = triggerSelf
)
let initRes = WakuRelay.new(node.switch)
if initRes.isErr():
error "failed mounting relay protocol", error=initRes.error
return

View File

@ -16,6 +16,7 @@ import
libp2p/multihash,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/pubsub/rpc/messages,
libp2p/stream/connection,
libp2p/switch
import
@ -29,19 +30,100 @@ logScope:
const
WakuRelayCodec* = "/vac/waku/relay/2.0.0"
# see: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#overview-of-new-parameters
const TopicParameters = TopicParams(
topicWeight: 1,
type WakuRelayResult*[T] = Result[T, string]
type
PubsubRawHandler* = proc(pubsubTopic: PubsubTopic, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].}
SubscriptionHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.gcsafe, raises: [Defect].}
# p1: favours peers already in the mesh
timeInMeshWeight: 0.01,
timeInMeshQuantum: 1.seconds,
timeInMeshCap: 10.0,
# p2: rewards fast peers
firstMessageDeliveriesWeight: 1.0,
firstMessageDeliveriesDecay: 0.5,
firstMessageDeliveriesCap: 10.0,
# p3: penalizes lazy peers. safe low value
meshMessageDeliveriesWeight: 0.0,
meshMessageDeliveriesDecay: 0.0,
meshMessageDeliveriesCap: 0,
meshMessageDeliveriesThreshold: 0,
meshMessageDeliveriesWindow: 0.milliseconds,
meshMessageDeliveriesActivation: 0.seconds,
# p3b: tracks history of prunes
meshFailurePenaltyWeight: 0.0,
meshFailurePenaltyDecay: 0.0,
# p4: penalizes invalid messages. highly penalize
# peers sending wrong messages
invalidMessageDeliveriesWeight: -100.0,
invalidMessageDeliveriesDecay: 0.5
)
# see: https://rfc.vac.dev/spec/29/#gossipsub-v10-parameters
const GossipsubParameters = GossipSubParams(
explicit: true,
pruneBackoff: chronos.minutes(1),
unsubscribeBackoff: chronos.seconds(5),
floodPublish: true,
gossipFactor: 0.25,
d: 6,
dLow: 4,
dHigh: 12,
dScore: 6,
dOut: 3,
dLazy: 6,
heartbeatInterval: chronos.seconds(1),
historyLength: 6,
historyGossip: 3,
fanoutTTL: chronos.minutes(1),
seenTTL: chronos.minutes(2),
# no gossip is sent to peers below this score
gossipThreshold: -100,
# no self-published msgs are sent to peers below this score
publishThreshold: -1000,
# used to trigger disconnections + ignore peer if below this score
graylistThreshold: -10000,
# grafts better peers if the mesh median score drops below this. unset.
opportunisticGraftThreshold: 0,
# how often peer scoring is updated
decayInterval: chronos.seconds(12),
# below this we consider the parameter to be zero
decayToZero: 0.01,
# remember peer score during x after it disconnects
retainScore: chronos.minutes(10),
# p5: application specific, unset
appSpecificWeight: 0.0,
# p6: penalizes peers sharing more than threshold ips
ipColocationFactorWeight: -50.0,
ipColocationFactorThreshold: 5.0,
# p7: penalizes bad behaviour (weight and decay)
behaviourPenaltyWeight: -10.0,
behaviourPenaltyDecay: 0.986,
# triggers disconnections of bad peers aka score <graylistThreshold
disconnectBadPeers: true
)
type
WakuRelayResult*[T] = Result[T, string]
WakuRelayHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.gcsafe, raises: [Defect].}
WakuRelay* = ref object of GossipSub
WakuRelayHandler* = PubsubRawHandler|SubscriptionHandler
proc initProtocolHandler(w: WakuRelay) =
proc handler(conn: Connection, proto: string) {.async.} =
## main protocol handler that gets triggered on every
@ -62,51 +144,28 @@ proc initProtocolHandler(w: WakuRelay) =
w.handler = handler
w.codec = WakuRelayCodec
method initPubSub(w: WakuRelay) {.raises: [InitializationError].} =
## NOTE: This method overrides GossipSub initPubSub method; it called by the
## parent protocol, PubSub.
debug "init waku relay"
proc new*(T: type WakuRelay, switch: Switch): WakuRelayResult[T] =
# After discussions with @sinkingsugar: This is essentially what is needed for
# the libp2p `StrictNoSign` policy
w.anonymize = true
w.verifySignature = false
w.sign = false
procCall GossipSub(w).initPubSub()
w.initProtocolHandler()
proc new*(T: type WakuRelay, switch: Switch, triggerSelf: bool = true): WakuRelayResult[T] =
var wr: WakuRelay
var w: WakuRelay
try:
wr = WakuRelay.init(
w = WakuRelay.init(
switch = switch,
msgIdProvider = defaultMessageIdProvider,
triggerSelf = triggerSelf,
sign = false,
anonymize = true,
verifySignature = false,
maxMessageSize = MaxWakuMessageSize
sign = false,
triggerSelf = true,
msgIdProvider = defaultMessageIdProvider,
maxMessageSize = MaxWakuMessageSize,
parameters = GossipsubParameters
)
procCall GossipSub(w).initPubSub()
w.initProtocolHandler()
except InitializationError:
return err("initialization error: " & getCurrentExceptionMsg())
# TODO: Add a function to validate the WakuMessage integrity
# # Rejects messages that are not WakuMessage
# proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} =
# let msg = WakuMessage.decode(message.data)
# if msg.isOk():
# return ValidationResult.Accept
# return ValidationResult.Reject
# # Add validator to all default pubsub topics
# for pubSubTopic in defaultPubsubTopics:
# wr.addValidator(pubSubTopic, validator)
ok(wr)
return ok(w)
method addValidator*(w: WakuRelay, topic: varargs[string], handler: ValidatorHandler) {.gcsafe.} =
procCall GossipSub(w).addValidator(topic, handler)
@ -120,6 +179,14 @@ method stop*(w: WakuRelay) {.async.} =
debug "stop"
await procCall GossipSub(w).stop()
# rejects messages that are not WakuMessage
proc validator(pubsubTopic: string, message: messages.Message): Future[ValidationResult] {.async.} =
# can be optimized by checking if the message is a WakuMessage without allocating memory
# see nim-libp2p protobuf library
let msg = WakuMessage.decode(message.data)
if msg.isOk():
return ValidationResult.Accept
return ValidationResult.Reject
proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
GossipSub(w).topics.hasKey(topic)
@ -131,38 +198,34 @@ iterator subscribedTopics*(w: WakuRelay): lent PubsubTopic =
proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler) =
debug "subscribe", pubsubTopic=pubsubTopic
var subsHandler: PubsubRawHandler
when handler is SubscriptionHandler:
subsHandler = proc(pubsubTopic: PubsubTopic, data: seq[byte]): Future[void] {.gcsafe.} =
let decodeRes = WakuMessage.decode(data)
if decodeRes.isErr():
debug "message decode failure", pubsubTopic=pubsubTopic, error=decodeRes.error
return
# we need to wrap the handler since gossipsub doesnt understand WakuMessage
let wrappedHandler = proc(pubsubTopic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].} =
let decMsg = WakuMessage.decode(data)
if decMsg.isErr():
# fine if triggerSelf enabled, since validators are bypassed
error "failed to decode WakuMessage, validator passed a wrong message", error = decMsg.error
let fut = newFuture[void]()
fut.complete()
return fut
else:
return handler(pubsubTopic, decMsg.get())
handler(pubsubTopic, decodeRes.value)
else:
subsHandler = handler
# add the default validator to the topic
procCall GossipSub(w).addValidator(pubSubTopic, validator)
procCall GossipSub(w).subscribe(pubsubTopic, subsHandler)
# set this topic parameters for scoring
w.topicParams[pubsubTopic] = TopicParameters
proc unsubscribe*(w: WakuRelay, topics: seq[TopicPair]) =
debug "unsubscribe", pubsubTopic=topics.mapIt(it[0])
# subscribe to the topic with our wrapped handler
procCall GossipSub(w).subscribe(pubsubTopic, wrappedHandler)
procCall GossipSub(w).unsubscribe(topics)
proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) =
debug "unsubscribeAll", pubsubTopic=pubsubTopic
proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) =
debug "unsubscribe", pubsubTopic=pubsubTopic
procCall GossipSub(w).unsubscribeAll(pubsubTopic)
proc publish*(w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage|seq[byte]): Future[int] {.async.} =
proc publish*(w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage): Future[int] {.async.} =
trace "publish", pubsubTopic=pubsubTopic
var data: seq[byte]
when message is WakuMessage:
data = message.encode().buffer
else:
data = message
let data = message.encode().buffer
return await procCall GossipSub(w).publish(pubsubTopic, data)