Send IDontWant before validation + Use IMReceiving messages to minimize redundant transmission. Experimental PR intended for discussion only!
This commit is contained in:
parent
48a3ac06ff
commit
e7c54bed8d
|
@ -275,6 +275,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
|
|||
|
||||
var respControl: ControlMessage
|
||||
g.handleIDontWant(peer, control.idontwant)
|
||||
g.handleIMReceiving(peer, control.imreceiving)
|
||||
let iwant = g.handleIHave(peer, control.ihave)
|
||||
if iwant.messageIds.len > 0:
|
||||
respControl.iwant.add(iwant)
|
||||
|
@ -320,10 +321,39 @@ proc validateAndRelay(g: GossipSub,
|
|||
msg: Message,
|
||||
msgId, msgIdSalted: MessageId,
|
||||
peer: PubSubPeer) {.async.} =
|
||||
try:
|
||||
let validation = await g.validate(msg)
|
||||
|
||||
var seenPeers: HashSet[PubSubPeer]
|
||||
var seenPeers: HashSet[PubSubPeer]
|
||||
var toSendPeers = HashSet[PubSubPeer]()
|
||||
for t in msg.topicIds: # for every topic in the message
|
||||
if t notin g.topics:
|
||||
continue
|
||||
|
||||
g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
|
||||
g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
|
||||
# add direct peers
|
||||
toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(t))
|
||||
# Don't send it to source peer
|
||||
toSendPeers.excl(peer)
|
||||
|
||||
for peer in toSendPeers:
|
||||
for heDontWant in peer.heDontWants:
|
||||
if msgId in heDontWant:
|
||||
seenPeers.incl(peer)
|
||||
libp2p_gossipsub_idontwant_saved_messages.inc
|
||||
libp2p_gossipsub_saved_bytes.inc(msg.data.len.int64, labelValues = ["idontwant"])
|
||||
break
|
||||
toSendPeers.excl(seenPeers)
|
||||
|
||||
try:
|
||||
# IDontWant is only worth it if the message is substantially
|
||||
# bigger than the messageId
|
||||
if msg.data.len > msgId.len * 10:
|
||||
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
|
||||
idontwant: @[ControlIWant(messageIds: @[msgId])]
|
||||
))), isHighPriority = true)
|
||||
|
||||
seenPeers.clear()
|
||||
let validation = await g.validate(msg)
|
||||
discard g.validationSeen.pop(msgIdSalted, seenPeers)
|
||||
libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64)
|
||||
libp2p_gossipsub_saved_bytes.inc((msg.data.len * seenPeers.len).int64, labelValues = ["validation_duplicate"])
|
||||
|
@ -346,35 +376,15 @@ proc validateAndRelay(g: GossipSub,
|
|||
|
||||
g.rewardDelivered(peer, msg.topicIds, true)
|
||||
|
||||
var toSendPeers = HashSet[PubSubPeer]()
|
||||
for t in msg.topicIds: # for every topic in the message
|
||||
if t notin g.topics:
|
||||
continue
|
||||
|
||||
g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
|
||||
g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
|
||||
|
||||
# add direct peers
|
||||
toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(t))
|
||||
|
||||
# Don't send it to source peer, or peers that
|
||||
# sent it during validation
|
||||
toSendPeers.excl(peer)
|
||||
# Don't send it to peers that sent it during validation
|
||||
toSendPeers.excl(seenPeers)
|
||||
|
||||
# IDontWant is only worth it if the message is substantially
|
||||
# bigger than the messageId
|
||||
if msg.data.len > msgId.len * 10:
|
||||
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
|
||||
idontwant: @[ControlIWant(messageIds: @[msgId])]
|
||||
))), isHighPriority = true)
|
||||
|
||||
#We have received IMReceiving from these peers, We should not exclude them
|
||||
#Ideally we should wait (TxTime + large safety cushion) before sending to these peers
|
||||
for peer in toSendPeers:
|
||||
for heDontWant in peer.heDontWants:
|
||||
if msgId in heDontWant:
|
||||
for heIsReceiving in peer.heIsReceivings:
|
||||
if msgId in heIsReceiving:
|
||||
seenPeers.incl(peer)
|
||||
libp2p_gossipsub_idontwant_saved_messages.inc
|
||||
libp2p_gossipsub_saved_bytes.inc(msg.data.len.int64, labelValues = ["idontwant"])
|
||||
break
|
||||
toSendPeers.excl(seenPeers)
|
||||
|
||||
|
|
|
@ -272,6 +272,37 @@ proc handleIDontWant*(g: GossipSub,
|
|||
if messageId.len > 100: continue
|
||||
peer.heDontWants[^1].incl(messageId)
|
||||
|
||||
var toSendPeers = HashSet[PubSubPeer]()
|
||||
|
||||
#Experimental change for quick performance evaluation only (Ideally for very large messages):
|
||||
#[
|
||||
1) IDontWant is followed by the message. IMReceiving informs peers that we are receiving this message
|
||||
2) Prototype implementation for a single topic ("test"). Need topic ID in IDontWant
|
||||
|
||||
3) More reasonable solution can be to send Message detail before message, That can be used for IMReceiving
|
||||
]#
|
||||
g.floodsub.withValue("test", peers): toSendPeers.incl(peers[])
|
||||
g.mesh.withValue("test", peers): toSendPeers.incl(peers[])
|
||||
|
||||
# add direct peers
|
||||
toSendPeers.incl(g.subscribedDirectPeers.getOrDefault("test"))
|
||||
|
||||
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
|
||||
imreceiving: @[ControlIWant(messageIds: @[messageId])]
|
||||
))), isHighPriority = true)
|
||||
|
||||
|
||||
|
||||
proc handleIMReceiving*(g: GossipSub,
|
||||
peer: PubSubPeer,
|
||||
imreceivings: seq[ControlIWant]) =
|
||||
for imreceiving in imreceivings:
|
||||
for messageId in imreceiving.messageIds:
|
||||
if peer.heIsReceivings[^1].len > 1000: break
|
||||
if messageId.len > 100: continue
|
||||
peer.heIsReceivings[^1].incl(messageId)
|
||||
|
||||
|
||||
proc handleIWant*(g: GossipSub,
|
||||
peer: PubSubPeer,
|
||||
iwants: seq[ControlIWant]): seq[Message] {.raises: [].} =
|
||||
|
|
|
@ -75,6 +75,7 @@ type
|
|||
score*: float64
|
||||
sentIHaves*: Deque[HashSet[MessageId]]
|
||||
heDontWants*: Deque[HashSet[MessageId]]
|
||||
heIsReceivings*:Deque[HashSet[MessageId]]
|
||||
iHaveBudget*: int
|
||||
pingBudget*: int
|
||||
maxMessageSize: int
|
||||
|
@ -480,4 +481,5 @@ proc new*(
|
|||
)
|
||||
result.sentIHaves.addFirst(default(HashSet[MessageId]))
|
||||
result.heDontWants.addFirst(default(HashSet[MessageId]))
|
||||
result.heIsReceivings.addFirst(default(HashSet[MessageId]))
|
||||
result.startSendNonPriorityTask()
|
||||
|
|
|
@ -51,6 +51,7 @@ type
|
|||
graft*: seq[ControlGraft]
|
||||
prune*: seq[ControlPrune]
|
||||
idontwant*: seq[ControlIWant]
|
||||
imreceiving*: seq[ControlIWant]
|
||||
|
||||
ControlIHave* = object
|
||||
topicId*: string
|
||||
|
@ -163,11 +164,12 @@ static: expectedFields(ControlPrune, @["topicId", "peers", "backoff"])
|
|||
proc byteSize(controlPrune: ControlPrune): int =
|
||||
controlPrune.topicId.len + controlPrune.peers.foldl(a + b.byteSize, 0) + 8 # 8 bytes for uint64
|
||||
|
||||
static: expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant"])
|
||||
static: expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant", "imreceiving"])
|
||||
proc byteSize(control: ControlMessage): int =
|
||||
control.ihave.foldl(a + b.byteSize, 0) + control.iwant.foldl(a + b.byteSize, 0) +
|
||||
control.graft.foldl(a + b.byteSize, 0) + control.prune.foldl(a + b.byteSize, 0) +
|
||||
control.idontwant.foldl(a + b.byteSize, 0)
|
||||
control.idontwant.foldl(a + b.byteSize, 0) +
|
||||
control.imreceiving.foldl(a + b.byteSize, 0)
|
||||
|
||||
static: expectedFields(RPCMsg, @["subscriptions", "messages", "control", "ping", "pong"])
|
||||
proc byteSize*(rpc: RPCMsg): int =
|
||||
|
|
|
@ -89,6 +89,8 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
|
|||
ipb.write(4, prune)
|
||||
for idontwant in control.idontwant:
|
||||
ipb.write(5, idontwant)
|
||||
for imreceiving in control.imreceiving:
|
||||
ipb.write(6, imreceiving)
|
||||
if len(ipb.buffer) > 0:
|
||||
ipb.finish()
|
||||
pb.write(field, ipb)
|
||||
|
@ -213,6 +215,7 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.
|
|||
var graftpbs: seq[seq[byte]]
|
||||
var prunepbs: seq[seq[byte]]
|
||||
var idontwant: seq[seq[byte]]
|
||||
var imreceiving: seq[seq[byte]]
|
||||
if ? cpb.getRepeatedField(1, ihavepbs):
|
||||
for item in ihavepbs:
|
||||
control.ihave.add(? decodeIHave(initProtoBuffer(item)))
|
||||
|
@ -228,6 +231,9 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.
|
|||
if ? cpb.getRepeatedField(5, idontwant):
|
||||
for item in idontwant:
|
||||
control.idontwant.add(? decodeIWant(initProtoBuffer(item)))
|
||||
if ? cpb.getRepeatedField(6, imreceiving):
|
||||
for item in imreceiving:
|
||||
control.imreceiving.add(? decodeIWant(initProtoBuffer(item)))
|
||||
trace "decodeControl: message statistics", graft_count = len(control.graft),
|
||||
prune_count = len(control.prune),
|
||||
ihave_count = len(control.ihave),
|
||||
|
|
Loading…
Reference in New Issue