GossipSub: IDontWant (#934)
This commit is contained in:
parent
440461b24b
commit
b784167805
|
@ -263,6 +263,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
|
||||||
g.handlePrune(peer, control.prune)
|
g.handlePrune(peer, control.prune)
|
||||||
|
|
||||||
var respControl: ControlMessage
|
var respControl: ControlMessage
|
||||||
|
g.handleIDontWant(peer, control.idontwant)
|
||||||
let iwant = g.handleIHave(peer, control.ihave)
|
let iwant = g.handleIHave(peer, control.ihave)
|
||||||
if iwant.messageIds.len > 0:
|
if iwant.messageIds.len > 0:
|
||||||
respControl.iwant.add(iwant)
|
respControl.iwant.add(iwant)
|
||||||
|
@ -337,6 +338,21 @@ proc validateAndRelay(g: GossipSub,
|
||||||
toSendPeers.excl(peer)
|
toSendPeers.excl(peer)
|
||||||
toSendPeers.excl(seenPeers)
|
toSendPeers.excl(seenPeers)
|
||||||
|
|
||||||
|
# IDontWant is only worth it if the message is substantially
|
||||||
|
# bigger than the messageId
|
||||||
|
if msg.data.len > msgId.len * 10:
|
||||||
|
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
|
||||||
|
idontwant: @[ControlIWant(messageIds: @[msgId])]
|
||||||
|
))))
|
||||||
|
|
||||||
|
for peer in toSendPeers:
|
||||||
|
for heDontWant in peer.heDontWants:
|
||||||
|
if msgId in heDontWant:
|
||||||
|
seenPeers.incl(peer)
|
||||||
|
break
|
||||||
|
toSendPeers.excl(seenPeers)
|
||||||
|
|
||||||
|
|
||||||
# In theory, if topics are the same in all messages, we could batch - we'd
|
# In theory, if topics are the same in all messages, we could batch - we'd
|
||||||
# also have to be careful to only include validated messages
|
# also have to be careful to only include validated messages
|
||||||
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
|
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
|
||||||
|
|
|
@ -262,6 +262,15 @@ proc handleIHave*(g: GossipSub,
|
||||||
g.rng.shuffle(res.messageIds)
|
g.rng.shuffle(res.messageIds)
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
proc handleIDontWant*(g: GossipSub,
|
||||||
|
peer: PubSubPeer,
|
||||||
|
iDontWants: seq[ControlIWant]) =
|
||||||
|
for dontWant in iDontWants:
|
||||||
|
for messageId in dontWant.messageIds:
|
||||||
|
if peer.heDontWants[^1].len > 1000: break
|
||||||
|
if messageId.len > 100: continue
|
||||||
|
peer.heDontWants[^1].incl(messageId)
|
||||||
|
|
||||||
proc handleIWant*(g: GossipSub,
|
proc handleIWant*(g: GossipSub,
|
||||||
peer: PubSubPeer,
|
peer: PubSubPeer,
|
||||||
iwants: seq[ControlIWant]): seq[Message] {.raises: [].} =
|
iwants: seq[ControlIWant]): seq[Message] {.raises: [].} =
|
||||||
|
@ -629,6 +638,9 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
|
||||||
peer.sentIHaves.addFirst(default(HashSet[MessageId]))
|
peer.sentIHaves.addFirst(default(HashSet[MessageId]))
|
||||||
if peer.sentIHaves.len > g.parameters.historyLength:
|
if peer.sentIHaves.len > g.parameters.historyLength:
|
||||||
discard peer.sentIHaves.popLast()
|
discard peer.sentIHaves.popLast()
|
||||||
|
peer.heDontWants.addFirst(default(HashSet[MessageId]))
|
||||||
|
if peer.heDontWants.len > g.parameters.historyLength:
|
||||||
|
discard peer.heDontWants.popLast()
|
||||||
peer.iHaveBudget = IHavePeerBudget
|
peer.iHaveBudget = IHavePeerBudget
|
||||||
peer.pingBudget = PingsPeerBudget
|
peer.pingBudget = PingsPeerBudget
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ import rpc/[messages, message, protobuf],
|
||||||
../../protobuf/minprotobuf,
|
../../protobuf/minprotobuf,
|
||||||
../../utility
|
../../utility
|
||||||
|
|
||||||
export peerid, connection
|
export peerid, connection, deques
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "libp2p pubsubpeer"
|
topics = "libp2p pubsubpeer"
|
||||||
|
@ -60,6 +60,7 @@ type
|
||||||
|
|
||||||
score*: float64
|
score*: float64
|
||||||
sentIHaves*: Deque[HashSet[MessageId]]
|
sentIHaves*: Deque[HashSet[MessageId]]
|
||||||
|
heDontWants*: Deque[HashSet[MessageId]]
|
||||||
iHaveBudget*: int
|
iHaveBudget*: int
|
||||||
pingBudget*: int
|
pingBudget*: int
|
||||||
maxMessageSize: int
|
maxMessageSize: int
|
||||||
|
@ -317,3 +318,4 @@ proc new*(
|
||||||
maxMessageSize: maxMessageSize
|
maxMessageSize: maxMessageSize
|
||||||
)
|
)
|
||||||
result.sentIHaves.addFirst(default(HashSet[MessageId]))
|
result.sentIHaves.addFirst(default(HashSet[MessageId]))
|
||||||
|
result.heDontWants.addFirst(default(HashSet[MessageId]))
|
||||||
|
|
|
@ -42,6 +42,7 @@ type
|
||||||
iwant*: seq[ControlIWant]
|
iwant*: seq[ControlIWant]
|
||||||
graft*: seq[ControlGraft]
|
graft*: seq[ControlGraft]
|
||||||
prune*: seq[ControlPrune]
|
prune*: seq[ControlPrune]
|
||||||
|
idontwant*: seq[ControlIWant]
|
||||||
|
|
||||||
ControlIHave* = object
|
ControlIHave* = object
|
||||||
topicId*: string
|
topicId*: string
|
||||||
|
|
|
@ -87,6 +87,8 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
|
||||||
ipb.write(3, graft)
|
ipb.write(3, graft)
|
||||||
for prune in control.prune:
|
for prune in control.prune:
|
||||||
ipb.write(4, prune)
|
ipb.write(4, prune)
|
||||||
|
for idontwant in control.idontwant:
|
||||||
|
ipb.write(5, idontwant)
|
||||||
if len(ipb.buffer) > 0:
|
if len(ipb.buffer) > 0:
|
||||||
ipb.finish()
|
ipb.finish()
|
||||||
pb.write(field, ipb)
|
pb.write(field, ipb)
|
||||||
|
@ -210,6 +212,7 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.
|
||||||
var iwantpbs: seq[seq[byte]]
|
var iwantpbs: seq[seq[byte]]
|
||||||
var graftpbs: seq[seq[byte]]
|
var graftpbs: seq[seq[byte]]
|
||||||
var prunepbs: seq[seq[byte]]
|
var prunepbs: seq[seq[byte]]
|
||||||
|
var idontwant: seq[seq[byte]]
|
||||||
if ? cpb.getRepeatedField(1, ihavepbs):
|
if ? cpb.getRepeatedField(1, ihavepbs):
|
||||||
for item in ihavepbs:
|
for item in ihavepbs:
|
||||||
control.ihave.add(? decodeIHave(initProtoBuffer(item)))
|
control.ihave.add(? decodeIHave(initProtoBuffer(item)))
|
||||||
|
@ -222,6 +225,9 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.
|
||||||
if ? cpb.getRepeatedField(4, prunepbs):
|
if ? cpb.getRepeatedField(4, prunepbs):
|
||||||
for item in prunepbs:
|
for item in prunepbs:
|
||||||
control.prune.add(? decodePrune(initProtoBuffer(item)))
|
control.prune.add(? decodePrune(initProtoBuffer(item)))
|
||||||
|
if ? cpb.getRepeatedField(5, idontwant):
|
||||||
|
for item in idontwant:
|
||||||
|
control.idontwant.add(? decodeIWant(initProtoBuffer(item)))
|
||||||
trace "decodeControl: message statistics", graft_count = len(control.graft),
|
trace "decodeControl: message statistics", graft_count = len(control.graft),
|
||||||
prune_count = len(control.prune),
|
prune_count = len(control.prune),
|
||||||
ihave_count = len(control.ihave),
|
ihave_count = len(control.ihave),
|
||||||
|
|
|
@ -796,3 +796,63 @@ suite "GossipSub":
|
||||||
)
|
)
|
||||||
|
|
||||||
await allFuturesThrowing(nodesFut.concat())
|
await allFuturesThrowing(nodesFut.concat())
|
||||||
|
|
||||||
|
asyncTest "e2e - iDontWant":
|
||||||
|
# 3 nodes: A <=> B <=> C
|
||||||
|
# (A & C are NOT connected). We pre-emptively send a dontwant from C to B,
|
||||||
|
# and check that B doesn't relay the message to C.
|
||||||
|
# We also check that B sends IDONTWANT to C, but not A
|
||||||
|
func dumbMsgIdProvider(m: Message): Result[MessageId, ValidationResult] =
|
||||||
|
ok(newSeq[byte](10))
|
||||||
|
let
|
||||||
|
nodes = generateNodes(
|
||||||
|
3,
|
||||||
|
gossip = true,
|
||||||
|
msgIdProvider = dumbMsgIdProvider
|
||||||
|
)
|
||||||
|
|
||||||
|
nodesFut = await allFinished(
|
||||||
|
nodes[0].switch.start(),
|
||||||
|
nodes[1].switch.start(),
|
||||||
|
nodes[2].switch.start(),
|
||||||
|
)
|
||||||
|
|
||||||
|
await nodes[0].switch.connect(nodes[1].switch.peerInfo.peerId, nodes[1].switch.peerInfo.addrs)
|
||||||
|
await nodes[1].switch.connect(nodes[2].switch.peerInfo.peerId, nodes[2].switch.peerInfo.addrs)
|
||||||
|
|
||||||
|
let bFinished = newFuture[void]()
|
||||||
|
proc handlerA(topic: string, data: seq[byte]) {.async, gcsafe.} = discard
|
||||||
|
proc handlerB(topic: string, data: seq[byte]) {.async, gcsafe.} = bFinished.complete()
|
||||||
|
proc handlerC(topic: string, data: seq[byte]) {.async, gcsafe.} = doAssert false
|
||||||
|
|
||||||
|
nodes[0].subscribe("foobar", handlerA)
|
||||||
|
nodes[1].subscribe("foobar", handlerB)
|
||||||
|
nodes[2].subscribe("foobar", handlerB)
|
||||||
|
await waitSubGraph(nodes, "foobar")
|
||||||
|
|
||||||
|
var gossip1: GossipSub = GossipSub(nodes[0])
|
||||||
|
var gossip2: GossipSub = GossipSub(nodes[1])
|
||||||
|
var gossip3: GossipSub = GossipSub(nodes[2])
|
||||||
|
|
||||||
|
check: gossip3.mesh.peers("foobar") == 1
|
||||||
|
|
||||||
|
gossip3.broadcast(gossip3.mesh["foobar"], RPCMsg(control: some(ControlMessage(
|
||||||
|
idontwant: @[ControlIWant(messageIds: @[newSeq[byte](10)])]
|
||||||
|
))))
|
||||||
|
checkExpiring: gossip2.mesh.getOrDefault("foobar").anyIt(it.heDontWants[^1].len == 1)
|
||||||
|
|
||||||
|
tryPublish await nodes[0].publish("foobar", newSeq[byte](10000)), 1
|
||||||
|
|
||||||
|
await bFinished
|
||||||
|
|
||||||
|
checkExpiring: toSeq(gossip3.mesh.getOrDefault("foobar")).anyIt(it.heDontWants[^1].len == 1)
|
||||||
|
check: toSeq(gossip1.mesh.getOrDefault("foobar")).anyIt(it.heDontWants[^1].len == 0)
|
||||||
|
|
||||||
|
await allFuturesThrowing(
|
||||||
|
nodes[0].switch.stop(),
|
||||||
|
nodes[1].switch.stop(),
|
||||||
|
nodes[2].switch.stop()
|
||||||
|
)
|
||||||
|
|
||||||
|
await allFuturesThrowing(nodesFut.concat())
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue