idontwant with d=dlow-1 and iwant to pull messages!
This commit is contained in:
parent
60f953629d
commit
87f950a562
|
@ -46,6 +46,9 @@ declareCounter(libp2p_gossipsub_saved_bytes, "bytes saved by gossipsub optimizat
|
||||||
declareCounter(libp2p_gossipsub_duplicate, "number of duplicates received")
|
declareCounter(libp2p_gossipsub_duplicate, "number of duplicates received")
|
||||||
declareCounter(libp2p_gossipsub_received, "number of messages received (deduplicated)")
|
declareCounter(libp2p_gossipsub_received, "number of messages received (deduplicated)")
|
||||||
|
|
||||||
|
export libp2p_gossipsub_duplicate_during_validation, libp2p_gossipsub_idontwant_saved_messages,
|
||||||
|
libp2p_gossipsub_saved_bytes, libp2p_gossipsub_duplicate, libp2p_gossipsub_received
|
||||||
|
|
||||||
proc init*(_: type[GossipSubParams]): GossipSubParams =
|
proc init*(_: type[GossipSubParams]): GossipSubParams =
|
||||||
GossipSubParams(
|
GossipSubParams(
|
||||||
explicit: true,
|
explicit: true,
|
||||||
|
@ -270,7 +273,14 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
|
||||||
g.handlePrune(peer, control.prune)
|
g.handlePrune(peer, control.prune)
|
||||||
|
|
||||||
var respControl: ControlMessage
|
var respControl: ControlMessage
|
||||||
g.handleIDontWant(peer, control.idontwant)
|
var respCtrl: ControlMessage #for idontwant messages
|
||||||
|
|
||||||
|
let ineed = g.handleIDontWant(peer, control.idontwant)
|
||||||
|
if ineed.messageIds.len > 0:
|
||||||
|
respCtrl.iwant.add(ineed)
|
||||||
|
var mess: seq[Message] #g.handleIWant(peer, control.idontwant)
|
||||||
|
g.send(peer, RPCMsg(control: some(respCtrl), messages: mess))
|
||||||
|
|
||||||
let iwant = g.handleIHave(peer, control.ihave)
|
let iwant = g.handleIHave(peer, control.ihave)
|
||||||
if iwant.messageIds.len > 0:
|
if iwant.messageIds.len > 0:
|
||||||
respControl.iwant.add(iwant)
|
respControl.iwant.add(iwant)
|
||||||
|
@ -349,13 +359,7 @@ proc validateAndRelay(g: GossipSub,
|
||||||
toSendPeers.excl(peer)
|
toSendPeers.excl(peer)
|
||||||
toSendPeers.excl(seenPeers)
|
toSendPeers.excl(seenPeers)
|
||||||
|
|
||||||
# IDontWant is only worth it if the message is substantially
|
#we dont send idont want to peers from which we already received idontwant
|
||||||
# bigger than the messageId
|
|
||||||
if msg.data.len > msgId.len * 10:
|
|
||||||
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
|
|
||||||
idontwant: @[ControlIWant(messageIds: @[msgId])]
|
|
||||||
))))
|
|
||||||
|
|
||||||
for peer in toSendPeers:
|
for peer in toSendPeers:
|
||||||
for heDontWant in peer.heDontWants:
|
for heDontWant in peer.heDontWants:
|
||||||
if msgId in heDontWant:
|
if msgId in heDontWant:
|
||||||
|
@ -366,6 +370,21 @@ proc validateAndRelay(g: GossipSub,
|
||||||
toSendPeers.excl(seenPeers)
|
toSendPeers.excl(seenPeers)
|
||||||
|
|
||||||
|
|
||||||
|
# IDontWant is only worth it if the message is substantially bigger than the messageId
|
||||||
|
# We randomly send large message to a maximum of dLow peers, others get idontwant
|
||||||
|
if msg.data.len > msgId.len * 10:
|
||||||
|
var dontwantPeers: HashSet[PubSubPeer]
|
||||||
|
var lrgMsgPeers = toSendPeers.toSeq()
|
||||||
|
g.rng.shuffle(lrgMsgPeers)
|
||||||
|
for dpeer in lrgMsgPeers:
|
||||||
|
if (lrgMsgPeers.len - dontwantPeers.len) < g.parameters.dLow: break
|
||||||
|
dontwantPeers.incl(dpeer)
|
||||||
|
|
||||||
|
g.broadcast(dontwantPeers, RPCMsg(control: some(ControlMessage(
|
||||||
|
idontwant: @[ControlIWant(messageIds: @[msgId])]
|
||||||
|
))))
|
||||||
|
toSendPeers.excl(dontwantPeers)
|
||||||
|
|
||||||
# In theory, if topics are the same in all messages, we could batch - we'd
|
# In theory, if topics are the same in all messages, we could batch - we'd
|
||||||
# also have to be careful to only include validated messages
|
# also have to be careful to only include validated messages
|
||||||
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
|
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
|
||||||
|
|
|
@ -266,12 +266,19 @@ proc handleIHave*(g: GossipSub,
|
||||||
|
|
||||||
proc handleIDontWant*(g: GossipSub,
|
proc handleIDontWant*(g: GossipSub,
|
||||||
peer: PubSubPeer,
|
peer: PubSubPeer,
|
||||||
iDontWants: seq[ControlIWant]) =
|
iDontWants: seq[ControlIWant]): ControlIWant {.raises: [].}=
|
||||||
|
var res: ControlIWant
|
||||||
for dontWant in iDontWants:
|
for dontWant in iDontWants:
|
||||||
for messageId in dontWant.messageIds:
|
for messageId in dontWant.messageIds:
|
||||||
if peer.heDontWants[^1].len > 1000: break
|
if peer.heDontWants[^1].len > 1000: break
|
||||||
if messageId.len > 100: continue
|
if messageId.len > 100: continue
|
||||||
peer.heDontWants[^1].incl(messageId)
|
peer.heDontWants[^1].incl(messageId)
|
||||||
|
#we can ask for unreceived message, learnt through idontwantmessage
|
||||||
|
if (not g.hasSeen(messageId)) and messageId notin g.outstandingIWANTs:
|
||||||
|
if peer.score < g.parameters.gossipThreshold: continue #dont request from low score peer
|
||||||
|
g.outstandingIWANTs[messageId] = IWANTRequest(messageId: messageId, peer: peer, timestamp: Moment.now())
|
||||||
|
res.messageIds.add(messageId)
|
||||||
|
return res
|
||||||
|
|
||||||
proc handleIWant*(g: GossipSub,
|
proc handleIWant*(g: GossipSub,
|
||||||
peer: PubSubPeer,
|
peer: PubSubPeer,
|
||||||
|
|
Loading…
Reference in New Issue