Mirror of https://github.com/status-im/nim-libp2p.git (synced 2025-02-17 23:36:34 +00:00)
GossipSub: cancel inflight msgs when receiving duplicate
Commit d306c448fa (parent 351bda2b56)
@@ -43,6 +43,7 @@ logScope:
 declareCounter(libp2p_gossipsub_failed_publish, "number of failed publish")
 declareCounter(libp2p_gossipsub_invalid_topic_subscription, "number of invalid topic subscriptions that happened")
 declareCounter(libp2p_gossipsub_duplicate_during_validation, "number of duplicates received during message validation")
+declareCounter(libp2p_gossipsub_duplicate_during_broadcast, "number of duplicates received during message broadcast")
 declareCounter(libp2p_gossipsub_duplicate, "number of duplicates received")
 declareCounter(libp2p_gossipsub_received, "number of messages received (deduplicated)")
 
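
The new libp2p_gossipsub_duplicate_during_broadcast counter records how many in-flight sends were cancelled because a duplicate arrived first. A minimal, standalone illustration of the same nim-metrics declare/inc pattern; the counter name below is made up, only the calls mirror the diff:

import metrics

declareCounter(example_duplicate_during_broadcast,
  "number of duplicates received during message broadcast")

when isMainModule:
  # Pretend three of our in-flight sends were cancelled by duplicates.
  example_duplicate_during_broadcast.inc(3)
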
@@ -291,10 +292,28 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
         libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
 
     trace "sending control message", msg = shortLog(respControl), peer
-    g.send(
+    asyncSpawn g.send(
       peer,
       RPCMsg(control: some(respControl), messages: messages))
 
+proc lazyBroadcast(
+  g: GossipSub,
+  peers: seq[PubSubPeer],
+  msgIdSalted: MessageId,
+  msg: RPCMsg) {.async.} =
+  let
+    futsTable = g.cancellableBroadcast(peers, msg)
+    futs = toSeq(futsTable.values)
+  g.sendingFutures[msgIdSalted] = futsTable
+  await allFutures(futs)
+  g.sendingFutures.del(msgIdSalted)
+
+  # This isn't equal to the amount of saved bandwidth,
+  # because lower layer may send the data even after
+  # cancellation
+  let cancelledCount = futs.countIt(it.cancelled)
+  libp2p_gossipsub_duplicate_during_broadcast.inc(cancelledCount.int64)
+
 proc validateAndRelay(g: GossipSub,
                       msg: Message,
                       msgId, msgIdSalted: MessageId,
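
lazyBroadcast is the glue between sending and duplicate handling: it records one send future per peer under the salted message id, waits for all of them, then counts how many ended up cancelled. A self-contained sketch of that lifecycle using chronos as in the library at the time of this change; fakeSend, the string ids and the peer names are stand-ins, not library code:

import std/[tables, sequtils]
import chronos

# One entry per in-flight message, keyed by salted message id, then by peer.
var sendingFutures: Table[string, Table[string, Future[void]]]

proc fakeSend(peer: string): Future[void] {.async.} =
  await sleepAsync(50.milliseconds)        # stands in for the real network write

proc lazyBroadcastSketch(msgIdSalted: string, peers: seq[string]) {.async.} =
  var futsTable: Table[string, Future[void]]
  for p in peers:
    futsTable[p] = fakeSend(p)
  sendingFutures[msgIdSalted] = futsTable  # visible to the duplicate handler
  await allFutures(toSeq(futsTable.values))
  sendingFutures.del(msgIdSalted)          # done, nothing left to cancel
  # As the diff's comment notes, this counts cancellations, not bytes saved.
  echo "cancelled sends: ", toSeq(futsTable.values).countIt(it.cancelled)

when isMainModule:
  let broadcasting = lazyBroadcastSketch("salted-id", @["peer-a", "peer-b"])
  # Simulate a duplicate from peer-b arriving while its send is still in flight.
  sendingFutures.withValue("salted-id", futs):
    futs[].withValue("peer-b", fut):
      fut[].cancel()
  waitFor broadcasting                     # reports one cancelled send
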
@@ -339,7 +358,7 @@ proc validateAndRelay(g: GossipSub,
 
     # In theory, if topics are the same in all messages, we could batch - we'd
     # also have to be careful to only include validated messages
-    g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
+    asyncSpawn g.lazyBroadcast(toSeq(toSendPeers), msgIdSalted, RPCMsg(messages: @[msg]))
     trace "forwared message to peers", peers = toSendPeers.len, msgId, peer
     for topic in msg.topicIds:
       if topic notin g.topics: continue
@@ -397,6 +416,10 @@ method rpcHandler*(g: GossipSub,
         let delay = Moment.now() - g.firstSeen(msgId)
         g.rewardDelivered(peer, msg.topicIds, false, delay)
 
+      g.sendingFutures.withValue(msgIdSalted, futs):
+        futs[].withValue(peer.peerId, fut):
+          fut[].cancel()
+
       libp2p_gossipsub_duplicate.inc()
 
       # onto the next message
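
The nested withValue lookups make the duplicate path a no-op whenever there is nothing to cancel: if the salted message id is no longer being sent, or this particular peer has no in-flight send, the inner body simply never runs. A small sketch of that behaviour with plain std/tables and chronos futures; the names are illustrative:

import std/tables
import chronos

var sendingFutures: Table[string, Table[string, Future[void]]]

proc onDuplicate(msgIdSalted, peerId: string) =
  # Runs only if both lookups succeed; otherwise the duplicate is just counted.
  sendingFutures.withValue(msgIdSalted, futs):
    futs[].withValue(peerId, fut):
      fut[].cancel()

when isMainModule:
  var perPeer: Table[string, Future[void]]
  perPeer["peer-a"] = newFuture[void]("inflight send")
  sendingFutures["salted-id"] = perPeer

  onDuplicate("salted-id", "peer-a")   # cancels the in-flight send
  onDuplicate("salted-id", "peer-b")   # nothing in flight for peer-b: no-op
  onDuplicate("other-id", "peer-a")    # message already fully sent: no-op
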
@@ -677,7 +677,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [Defect].} =
           libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicId])
         else:
           libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
-      g.send(peer, RPCMsg(control: some(control)))
+      asyncSpawn g.send(peer, RPCMsg(control: some(control)))
 
   g.mcache.shift() # shift the cache
 
@@ -145,6 +145,7 @@ type
 
   BackoffTable* = Table[string, Table[PeerId, Moment]]
   ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]
+  SendingFuturesTable* = Table[MessageId, Table[PeerId, Future[void]]]
 
   RoutingRecordsPair* = tuple[id: PeerId, record: Option[PeerRecord]]
   RoutingRecordsHandler* =
@@ -164,6 +165,7 @@ type
     control*: Table[string, ControlMessage] # pending control messages
     mcache*: MCache # messages cache
    validationSeen*: ValidationSeenTable # peers who sent us message in validation
+    sendingFutures*: SendingFuturesTable # messages which are currently being sent
     heartbeatFut*: Future[void] # cancellation future for heartbeat interval
     scoringHeartbeatFut*: Future[void] # cancellation future for scoring heartbeat interval
     heartbeatRunning*: bool
@@ -139,20 +139,17 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =
 
   libp2p_pubsub_peers.set(p.peers.len.int64)
 
-proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [Defect].} =
+proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg): Future[void] {.raises: [Defect].} =
   ## Attempt to send `msg` to remote peer
   ##
 
   trace "sending pubsub message to peer", peer, msg = shortLog(msg)
   peer.send(msg, p.anonymize)
 
-proc broadcast*(
+proc updateBroadcastMetrics(
   p: PubSub,
-  sendPeers: auto, # Iteratble[PubSubPeer]
-  msg: RPCMsg) {.raises: [Defect].} =
-  ## Attempt to send `msg` to the given peers
-
-  let npeers = sendPeers.len.int64
+  msg: RPCMsg,
+  npeers: int64) =
   for sub in msg.subscriptions:
     if sub.subscribe:
       if p.knownTopics.contains(sub.topic):
@@ -192,24 +189,50 @@ proc broadcast*(
     else:
       libp2p_pubsub_broadcast_prune.inc(npeers, labelValues = ["generic"])
 
+proc broadcast*(
+  p: PubSub,
+  sendPeers: auto, # Iteratble[PubSubPeer]
+  msg: RPCMsg) {.raises: [Defect].} =
+  ## Attempt to send `msg` to the given peers
+
+  p.updateBroadcastMetrics(msg, sendPeers.len.int64)
   trace "broadcasting messages to peers",
     peers = sendPeers.len, msg = shortLog(msg)
 
   if anyIt(sendPeers, it.hasObservers):
     for peer in sendPeers:
-      p.send(peer, msg)
+      asyncSpawn p.send(peer, msg)
   else:
     # Fast path that only encodes message once
     let encoded = encodeRpcMsg(msg, p.anonymize)
     for peer in sendPeers:
       asyncSpawn peer.sendEncoded(encoded)
 
+proc cancellableBroadcast*(
+  p: PubSub,
+  sendPeers: auto, # Iteratble[PubSubPeer]
+  msg: RPCMsg): Table[PeerId, Future[void]] {.raises: [Defect].} =
+  ## Attempt to send `msg` to the given peers
+
+  p.updateBroadcastMetrics(msg, sendPeers.len)
+  trace "broadcasting messages to peers",
+    peers = sendPeers.len, msg = shortLog(msg)
+
+  if anyIt(sendPeers, it.hasObservers):
+    for peer in sendPeers:
+      result[peer.peerId] = p.send(peer, msg)
+  else:
+    # Fast path that only encodes message once
+    let encoded = encodeRpcMsg(msg, p.anonymize)
+    for peer in sendPeers:
+      result[peer.peerId] = peer.sendEncoded(encoded)
+
 proc sendSubs*(p: PubSub,
                peer: PubSubPeer,
                topics: openArray[string],
                subscribe: bool) =
   ## send subscriptions to remote peer
-  p.send(peer, RPCMsg.withSubs(topics, subscribe))
+  asyncSpawn p.send(peer, RPCMsg.withSubs(topics, subscribe))
 
   for topic in topics:
     if subscribe:
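
cancellableBroadcast mirrors broadcast's two paths: if any peer has observers attached, each message goes through the per-peer send so observers can inspect it, otherwise the message is encoded once and the same bytes are queued to every peer. Either way the caller gets one future per peer id back. A compact sketch of that shape; FakePeer, hasObservers and the fake send procs are placeholders, not the library API:

import std/[tables, sequtils]
import chronos

type FakePeer = object
  id: string
  hasObservers: bool

proc sendPerPeer(p: FakePeer): Future[void] =
  sleepAsync(5.milliseconds)       # slow path: message handled per peer

proc sendEncoded(p: FakePeer, bytes: seq[byte]): Future[void] =
  sleepAsync(5.milliseconds)       # fast path: same pre-encoded bytes for everyone

proc cancellableBroadcastSketch(peers: seq[FakePeer],
                                encoded: seq[byte]): Table[string, Future[void]] =
  if peers.anyIt(it.hasObservers):
    for p in peers:
      result[p.id] = sendPerPeer(p)
  else:
    for p in peers:
      result[p.id] = p.sendEncoded(encoded)

when isMainModule:
  let peers = @[FakePeer(id: "peer-a"), FakePeer(id: "peer-b")]
  let futs = cancellableBroadcastSketch(peers, @[1'u8, 2, 3])
  waitFor allFutures(toSeq(futs.values))   # the caller decides what to track or cancel
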
@@ -248,7 +248,9 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect], async.} =
   try:
     await conn.writeLp(msg)
     trace "sent pubsub message to remote", conn
-  except CatchableError as exc: # never cancelled
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
     # Because we detach the send call from the currently executing task using
     # asyncSpawn, no exceptions may leak out of it
     trace "Unable to send to remote", conn, msg = exc.msg
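
The exception handling in sendEncoded changes because the old comment ("never cancelled") no longer holds: once a duplicate can cancel a send from the outside, CancelledError has to be re-raised so the send future actually finishes in the cancelled state rather than being swallowed by the catch-all that only logs. A minimal chronos illustration of why the split matters; sendSketch and the sleep stand in for the real write:

import chronos

proc sendSketch(write: Future[void]) {.async.} =
  try:
    await write
  except CancelledError as exc:
    raise exc                                    # let the caller see the cancellation
  except CatchableError as exc:
    echo "Unable to send to remote: ", exc.msg   # other failures are only logged

when isMainModule:
  let inflight = sendSketch(sleepAsync(10.milliseconds))
  inflight.cancel()                        # e.g. a duplicate arrived
  waitFor sleepAsync(20.milliseconds)      # let the event loop settle
  echo "send cancelled: ", inflight.cancelled   # true thanks to the re-raise
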
@@ -257,7 +259,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect], async.} =
 
     await conn.close() # This will clean up the send connection
 
-proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} =
+proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool): Future[void] {.raises: [Defect].} =
   trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
 
   # When sending messages, we take care to re-encode them with the right
|
|||||||
sendMetrics(msg)
|
sendMetrics(msg)
|
||||||
encodeRpcMsg(msg, anonymize)
|
encodeRpcMsg(msg, anonymize)
|
||||||
|
|
||||||
asyncSpawn p.sendEncoded(encoded)
|
return p.sendEncoded(encoded)
|
||||||
|
|
||||||
proc new*(
|
proc new*(
|
||||||
T: typedesc[PubSubPeer],
|
T: typedesc[PubSubPeer],
|
||||||
|
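
This last hunk completes the plumbing: PubSubPeer.send no longer detaches the write itself but hands the future back, so each call site chooses between the old fire-and-forget behaviour (asyncSpawn) and keeping the future around so a later duplicate can cancel it. A toy version of that call-site choice; send here is just a stand-in that returns a sleep:

import chronos

proc send(): Future[void] =
  sleepAsync(10.milliseconds)     # stands in for p.sendEncoded(encoded)

when isMainModule:
  asyncSpawn send()               # fire-and-forget, as the old code did internally
  let tracked = send()            # kept, so a duplicate can cancel it later
  tracked.cancel()
  waitFor sleepAsync(20.milliseconds)
  echo "tracked send cancelled: ", tracked.cancelled
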