Tanguy 2023-05-02 11:14:16 +02:00
parent 1629eccde0
commit 47fe560b07
7 changed files with 105 additions and 59 deletions

View File

@@ -13,4 +13,5 @@ stew;https://github.com/status-im/nim-stew@#407a59883691d362db2fe8eab7f7c3b1f751
testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34
unittest2;https://github.com/status-im/nim-unittest2@#da8398c45cafd5bd7772da1fc96e3924a18d3823
websock;https://github.com/status-im/nim-websock@#fea05cde8b123b38d1a0a8524b77efbc84daa848
zlib;https://github.com/status-im/nim-zlib@#826e2fc013f55b4478802d4f2e39f187c50d520a
zlib;https://github.com/status-im/nim-zlib@#826e2fc013f55b4478802d4f2e39f187c50d520a
testground_sdk;https://github.com/status-im/testground-nim-sdk@#872753fe083c470ecce8e3a90091cf8dee7840ac

View File

@@ -31,7 +31,7 @@ when defined(libp2p_mplex_metrics):
declareHistogram libp2p_mplex_qtime, "message queuing time"
when defined(libp2p_network_protocols_metrics):
declareCounter libp2p_protocols_bytes, "total sent or received bytes", ["protocol", "direction"]
declarePublicCounter libp2p_protocols_bytes, "total sent or received bytes", ["protocol", "direction"]
## Channel half-closed states
##
@@ -236,7 +236,7 @@ proc completeWrite(
else:
await fut
when defined(libp2p_network_protocol_metrics):
when defined(libp2p_network_protocols_metrics):
if s.protocol.len > 0:
libp2p_protocols_bytes.inc(msgLen.int64, labelValues=[s.protocol, "out"])
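
Note: the two mplex hunks above promote libp2p_protocols_bytes to declarePublicCounter and fix the misspelled flag in the second `when defined(...)` guard (libp2p_network_protocol_metrics vs libp2p_network_protocols_metrics), so the byte accounting actually compiles in when the flag is passed. A minimal sketch of the pattern, assuming the status-im nim-metrics package; demo_protocols_bytes and recordSent are hypothetical names:

import metrics

when defined(libp2p_network_protocols_metrics):
  # declarePublicCounter exports the symbol, so other modules (tests,
  # exporters) can reference it directly
  declarePublicCounter(demo_protocols_bytes,
    "total sent or received bytes", ["protocol", "direction"])

proc recordSent(protocol: string, msgLen: int) =
  when defined(libp2p_network_protocols_metrics):
    if protocol.len > 0:
      demo_protocols_bytes.inc(msgLen.int64, labelValues = [protocol, "out"])
  else:
    discard   # compiled away unless -d:libp2p_network_protocols_metrics is set

when isMainModule:
  recordSent("/meshsub/1.1.0", 42)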

View File

@@ -44,8 +44,8 @@ logScope:
declareCounter(libp2p_gossipsub_failed_publish, "number of failed publish")
declareCounter(libp2p_gossipsub_invalid_topic_subscription, "number of invalid topic subscriptions that happened")
declareCounter(libp2p_gossipsub_duplicate_during_validation, "number of duplicates received during message validation")
declareCounter(libp2p_gossipsub_duplicate, "number of duplicates received")
declareCounter(libp2p_gossipsub_received, "number of messages received (deduplicated)")
declarePublicCounter(libp2p_gossipsub_duplicate, "number of duplicates received")
declarePublicCounter(libp2p_gossipsub_received, "number of messages received (deduplicated)")
proc init*(_: type[GossipSubParams]): GossipSubParams =
GossipSubParams(
@@ -274,6 +274,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant)
g.handleDontSend(peer, control.dontSend)
#g.handleSending(peer, control.sending)
if
respControl.prune.len > 0 or
@@ -338,12 +339,11 @@ proc validateAndRelay(g: GossipSub,
g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
if msg.data.len >= g.parameters.lazyPushThreshold:
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(dontSend: @[ControlIHave(messageIds: @[msgId])]))))
# Don't send it to source peer, or peers that
# sent it during validation
toSendPeers.excl(peer)
if msg.data.len >= g.parameters.lazyPushThreshold:
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(dontSend: @[ControlIHave(messageIds: @[msgId])]))))
toSendPeers.excl(seenPeers)
if msg.data.len < g.parameters.lazyPushThreshold:
@@ -351,7 +351,7 @@ proc validateAndRelay(g: GossipSub,
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
else:
let sem = newAsyncSemaphore(1)
let sem = newAsyncSemaphore(2)
var peers = toSeq(toSendPeers)
g.rng.shuffle(peers)
@@ -361,10 +361,17 @@ proc validateAndRelay(g: GossipSub,
let fut = p.gotMsgs.mgetOrPut(msgId, newFuture[void]())
if fut.completed: return
#g.broadcast(@[p], RPCMsg(control: some(ControlMessage(sending: @[ControlIHave(messageIds: @[msgId], topicId: msg.topicIds[0])]))))
g.broadcast(@[p], RPCMsg(messages: @[msg]))
await fut or sleepAsync(200.milliseconds)
if not fut.completed:
echo g.switch.peerInfo.peerId, ": timeout from ", p.peerId
let estimation =
p.bufferSizeAtMoment(Moment.now()) div p.bandwidth# + p.minRtt
let start = Moment.now()
#await fut or sleepAsync(2.seconds)
await sleepAsync(milliseconds(estimation))
#if not fut.completed:
# echo g.switch.peerInfo.peerId, ": timeout from ", p.peerId
#echo g.switch.peerInfo.peerId, " sent in ", $(Moment.now() - start), " estimated ", milliseconds(estimation)
for p in peers:
asyncSpawn sendToOne(p)
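
Note: for messages at or above lazyPushThreshold, validateAndRelay now first broadcasts a dontSend notice for the message id and then relays the full payload through sendToOne tasks that are throttled by a two-slot semaphore and paced by the estimated time each peer's send buffer needs to drain. A condensed sketch of that pattern with chronos; the libp2p/utils/semaphore import path is an assumption, and DemoPeer/estimatedDrainMs stand in for PubSubPeer and bufferSizeAtMoment:

import chronos
import libp2p/utils/semaphore   # assumed location of AsyncSemaphore

type DemoPeer = ref object
  bufferedBytes: int   # bytes currently queued towards this peer
  bandwidth: int       # estimated throughput in bytes per millisecond

proc estimatedDrainMs(p: DemoPeer): int =
  # same shape as p.bufferSizeAtMoment(Moment.now()) div p.bandwidth above
  p.bufferedBytes div max(p.bandwidth, 1)

proc sendToOne(sem: AsyncSemaphore, p: DemoPeer) {.async.} =
  await sem.acquire()          # at most two sends in flight at any time
  defer: sem.release()
  # ... the real code broadcasts RPCMsg(messages: @[msg]) to p here ...
  # hold the slot for roughly the time the peer needs to drain its buffer,
  # so the next peer is started only once this one has likely received it
  await sleepAsync(milliseconds(p.estimatedDrainMs()))

proc relayAll(peers: seq[DemoPeer]) =
  let sem = newAsyncSemaphore(2)
  for p in peers:
    asyncSpawn sendToOne(sem, p)

when isMainModule:
  relayAll(@[DemoPeer(bufferedBytes: 10_000, bandwidth: 625),
             DemoPeer(bufferedBytes: 0, bandwidth: 625)])
  waitFor sleepAsync(100.milliseconds)   # let the spawned sends finish
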
@@ -410,6 +417,8 @@ method rpcHandler*(g: GossipSub,
msgId = msgIdResult.get
msgIdSalted = msgId & g.seenSalt
g.broadcast(@[peer], RPCMsg(control: some(ControlMessage(dontSend: @[ControlIHave(messageIds: @[msgId])]))))
# addSeen adds salt to msgId to avoid
# remote attacking the hash function
if g.addSeen(msgId):
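
Note: the rpcHandler hunk answers the sender with an immediate dontSend for the computed msgId before the duplicate check. A simplified sketch of the salted seen-set mentioned in the comment above (the real seen-set is time-bounded; Dedup and its procs are hypothetical):

import std/[sets, hashes, sysrand]

type Dedup = object
  salt: seq[byte]            # random, generated once per node
  seen: HashSet[seq[byte]]

proc initDedup(): Dedup =
  # a node-local salt means a remote peer cannot predict which message ids
  # collide in our seen-set, even though it controls the ids themselves
  Dedup(salt: urandom(32), seen: initHashSet[seq[byte]]())

proc addSeen(d: var Dedup, msgId: seq[byte]): bool =
  # returns true if the message was already seen, mirroring g.addSeen
  let salted = msgId & d.salt
  result = salted in d.seen
  d.seen.incl(salted)

when isMainModule:
  var d = initDedup()
  doAssert not d.addSeen(@[1'u8, 2, 3])
  doAssert d.addSeen(@[1'u8, 2, 3])
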
@@ -599,17 +608,26 @@ method publish*(g: GossipSub,
var peersSeq = toSeq(peers)
g.rng.shuffle(peersSeq)
let sem = newAsyncSemaphore(1)
let sem = newAsyncSemaphore(2)
proc sendToOne(p: PubSubPeer) {.async.} =
await sem.acquire()
defer: sem.release()
let fut = p.gotMsgs.mgetOrPut(msgId, newFuture[void]())
if fut.completed: return
if fut.completed:
echo "peer already got"
return
g.broadcast(@[p], RPCMsg(control: some(ControlMessage(sending: @[ControlIHave(messageIds: @[msgId], topicId: topic)]))))
g.broadcast(@[p], RPCMsg(messages: @[msg]))
await fut or sleepAsync(200.milliseconds)
if not fut.completed:
echo g.switch.peerInfo.peerId, ": timeout from ", p.peerId
let estimation =
p.bufferSizeAtMoment(Moment.now()) div p.bandwidth# + p.minRtt
let start = Moment.now()
#await fut or sleepAsync(2.seconds)
await sleepAsync(milliseconds(estimation))
#if not fut.completed:
# echo g.switch.peerInfo.peerId, ": timeout from ", p.peerId
#echo g.switch.peerInfo.peerId, " sent in ", $(Moment.now() - start)
#echo g.switch.peerInfo.peerId, " sent in ", $(Moment.now() - start), " estimated ", milliseconds(estimation)
for p in peersSeq:
asyncSpawn sendToOne(p)
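
Note: both validateAndRelay and publish key a Future[void] per message id in the peer's gotMsgs table; the sender checks it before pushing the full payload, and handleDontSend in the next file completes it when the peer reports it already has the message. A minimal sketch of that bookkeeping with chronos futures (DemoPeer, markDontSend and alreadyGot are hypothetical names):

import chronos
import std/[tables, hashes]

type
  MsgId = seq[byte]
  DemoPeer = ref object
    gotMsgs: Table[MsgId, Future[void]]

proc markDontSend(p: DemoPeer, msgId: MsgId) =
  # called when a dontSend control entry for msgId arrives from this peer
  let fut = p.gotMsgs.mgetOrPut(msgId, newFuture[void]())
  if not fut.completed:
    fut.complete()

proc alreadyGot(p: DemoPeer, msgId: MsgId): bool =
  # checked by the sender before broadcasting the full message
  p.gotMsgs.mgetOrPut(msgId, newFuture[void]()).completed

when isMainModule:
  let p = DemoPeer()
  doAssert not p.alreadyGot(@[1'u8, 2])
  p.markDontSend(@[1'u8, 2])
  doAssert p.alreadyGot(@[1'u8, 2])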

View File

@@ -164,7 +164,7 @@ proc handleGraft*(g: GossipSub,
# If they send us a graft before they send us a subscribe, what should
# we do? For now, we add them to mesh but don't add them to gossipsub.
if topic in g.topics:
if g.mesh.peers(topic) < g.parameters.dHigh or peer.outbound:
if g.mesh.peers(topic) < g.parameters.dHigh:# or peer.outbound:
# In the spec, there's no mention of DHi here, but implicitly, a
# peer will be removed from the mesh on next rebalance, so we don't want
# this peer to push someone else out
@@ -254,6 +254,15 @@ proc handleDontSend*(g: GossipSub,
if not fut.completed:
fut.complete()
proc handleSending*(g: GossipSub,
peer: PubSubPeer,
sending: seq[ControlIHave]) {.raises: [Defect].} =
for ds in sending:
var toSendPeers: HashSet[PubSubPeer]
g.mesh.withValue(ds.topicId, peer): toSendPeers.incl(peer[])
toSendPeers.excl(peer)
g.broadcast(toSeq(toSendPeers), RPCMsg(control: some(ControlMessage(dontSend: @[ControlIHave(messageIds: ds.messageIds)]))))
proc handleIHave*(g: GossipSub,
peer: PubSubPeer,
ihaves: seq[ControlIHave]): ControlIWant {.raises: [Defect].} =
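
Note: handleSending is the receive side of the new sending announcement (emitted in publish above; its call site in handleControl is still commented out): when one mesh peer declares it is about to push a message, every other mesh peer for that topic is told not to send it. A sketch of the recipient selection, using plain peer-id strings instead of PubSubPeer:

import std/[sets, sequtils]

proc dontSendTargets(meshForTopic: HashSet[string], announcer: string): seq[string] =
  # every mesh peer for the topic except the one that announced the send
  var targets = meshForTopic
  targets.excl(announcer)
  toSeq(targets)

when isMainModule:
  let mesh = toHashSet(["A", "B", "C"])
  doAssert dontSendTargets(mesh, "B").len == 2
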
@@ -380,41 +389,44 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
else:
trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic)
var
candidates: seq[PubSubPeer]
currentMesh = addr defaultMesh
var currentMesh = addr defaultMesh
g.mesh.withValue(topic, v): currentMesh = v
g.gossipsub.withValue(topic, peerList):
for it in peerList[]:
if
it.connected and
# get only outbound ones
it.outbound and
it notin currentMesh[] and
# avoid negative score peers
it.score >= 0.0 and
# don't pick explicit peers
it.peerId notin g.parameters.directPeers and
# and avoid peers we are backing off
it.peerId notin backingOff:
candidates.add(it)
# shuffle anyway, score might be not used
g.rng.shuffle(candidates)
let outboundPeers = currentMesh[].countIt(it.outbound)
# sort peers by score, high score first, we are grafting
candidates.sort(byScore, SortOrder.Descending)
if outboundPeers < g.parameters.dOut:
var candidates: seq[PubSubPeer]
g.gossipsub.withValue(topic, peerList):
for it in peerList[]:
if
it.connected and
# get only outbound ones
it.outbound and
it notin currentMesh[] and
# avoid negative score peers
it.score >= 0.0 and
# don't pick explicit peers
it.peerId notin g.parameters.directPeers and
# and avoid peers we are backing off
it.peerId notin backingOff:
candidates.add(it)
# Graft peers so we reach a count of D
candidates.setLen(min(candidates.len, g.parameters.dOut))
# shuffle anyway, score might be not used
g.rng.shuffle(candidates)
trace "grafting outbound peers", topic, peers = candidates.len
# sort peers by score, high score first, we are grafting
candidates.sort(byScore, SortOrder.Descending)
for peer in candidates:
if g.mesh.addPeer(topic, peer):
g.grafted(peer, topic)
g.fanout.removePeer(topic, peer)
grafts &= peer
# Graft peers so we reach a count of D
candidates.setLen(min(candidates.len, g.parameters.dOut - outboundPeers))
trace "grafting outbound peers", topic, peers = candidates.len
for peer in candidates:
if g.mesh.addPeer(topic, peer):
g.grafted(peer, topic)
g.fanout.removePeer(topic, peer)
grafts &= peer
# get again npeers after possible grafts
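
Note: the reworked rebalanceMesh only searches for outbound candidates when the mesh holds fewer than dOut outbound peers, and caps the grafts at the remaining quota (dOut - outboundPeers) rather than a flat dOut. The arithmetic as a small sketch (outboundGraftQuota is a hypothetical helper):

proc outboundGraftQuota(currentOutbound, dOut, candidateCount: int): int =
  # how many outbound candidates this rebalance round may graft
  if currentOutbound >= dOut:
    0
  else:
    min(candidateCount, dOut - currentOutbound)

when isMainModule:
  doAssert outboundGraftQuota(1, 2, 5) == 1   # one outbound slot left
  doAssert outboundGraftQuota(2, 2, 5) == 0   # quota already satisfied
  doAssert outboundGraftQuota(0, 2, 1) == 1   # limited by available candidates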

View File

@@ -63,6 +63,7 @@ type
gotMsgs*: Table[MessageId, Future[void]]
sentData*: seq[(Moment, int)]
pings*: Table[seq[byte], Moment]
lastPing*: Moment
rtts*: seq[int]
bandwidth*: int # in bytes per ms
@@ -114,14 +115,15 @@ proc bufferSizeAtMoment*(p: PubSubPeer, m: Moment): int =
if sent > m:
break
let timeGap = (sent - lastSent).milliseconds
result -= timeGap * p.bandwidth
result -= int(timeGap * p.bandwidth)
if result < 0: result = 0
result += size
lastSent = sent
let timeGap = (m - lastSent).milliseconds
result -= timeGap * p.bandwidth
result -= int(timeGap * p.bandwidth)
return max(0, result)
proc minRtt*(p: PubSubPeer): int =
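
Note: bufferSizeAtMoment models the peer's send queue as a leaky bucket over the sentData log: every recorded send adds its size and the queue drains at bandwidth bytes per millisecond in between. The same model with plain millisecond timestamps instead of chronos Moments (bufferSizeAt is a hypothetical stand-in):

proc bufferSizeAt(sends: seq[(int, int)], bandwidth, at: int): int =
  # sends: (timestampMs, bytes) pairs in chronological order
  var last = 0
  for (ts, size) in sends:
    if ts > at:
      break
    result -= (ts - last) * bandwidth    # drain during the gap
    if result < 0: result = 0
    result += size                       # the send itself is queued
    last = ts
  result -= (at - last) * bandwidth      # drain since the last send
  result = max(0, result)

when isMainModule:
  # two 10 kB sends 10 ms apart at 625 bytes/ms: after 20 ms the queue has
  # drained 2 * 10 ms * 625 B = 12.5 kB of the 20 kB that was enqueued
  doAssert bufferSizeAt(@[(0, 10_000), (10, 10_000)], 625, 20) == 7_500
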
@@ -130,27 +132,29 @@ proc minRtt*(p: PubSubPeer): int =
else: 100
proc handlePong*(p: PubSubPeer, pong: seq[byte]) =
if pong notin p.pings: return
if pong notin p.pings:
return
let
pingMoment = p.pings.getOrDefault(pong)
delay = (Moment.now() - pingMoment).milliseconds
delay = int((Moment.now() - pingMoment).milliseconds)
minRtt = p.minRtt
p.rtts.add(delay)
if delay < minRtt:
if delay <= minRtt:
# can't make bandwidth estimate in this situation
return
let
bufferSizeWhenSendingPing = p.bufferSizeAtMoment(pingMoment)
estimatedBandwidth =
bufferSizeWhenSendingPing / delay
bufferSizeWhenSendingPing / (delay - minRtt)
if bufferSizeWhenSendingPing / p.bandwidth < minRtt / 5:
# can't make bandwidth estimate in this situation
if bufferSizeWhenSendingPing / p.bandwidth < minRtt / 6:
return
p.bandwidth = (p.bandwidth * 9 + int(estimatedBandwidth)) div 10
#echo minRtt, ", ", delay, " : ", bufferSizeWhenSendingPing, " ___ ", bufferSizeWhenSendingPing / p.bandwidth, " xx ", minRtt / 10
#echo "estimated ", p.bandwidth, " (", estimatedBandwidth, ")"
proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
# trigger hooks
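
Note: handlePong now subtracts the minimum RTT from the measured delay before dividing, so only the queueing part of the round trip feeds the bytes-per-millisecond estimate, and it skips samples whose queued data would drain in under a sixth of the minimum RTT. The update rule (a 9:1 moving average) as a sketch, with updateBandwidth as a hypothetical stand-alone helper:

proc updateBandwidth(current, bufferedBytesAtPing, delayMs, minRttMs: int): int =
  # current: previous estimate in bytes/ms; returns the smoothed new estimate
  if delayMs <= minRttMs:
    return current                   # pong came back too fast to measure anything
  if bufferedBytesAtPing div max(current, 1) < minRttMs div 6:
    return current                   # queue too small relative to the RTT floor
  let estimated = bufferedBytesAtPing div (delayMs - minRttMs)
  (current * 9 + estimated) div 10   # 9:1 moving average

when isMainModule:
  # 100 kB queued, pong after 300 ms against a 100 ms floor: raw estimate
  # 500 bytes/ms, smoothed from the previous 625 bytes/ms down to 612
  doAssert updateBandwidth(625, 100_000, 300, 100) == 612
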
@@ -302,14 +306,18 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect], async.} =
trace "sending encoded msgs to peer", conn, encoded = shortLog(msg)
let startOfSend = Moment.now()
let
startOfSend = Moment.now()
lastPing = startOfSend - p.lastPing
p.sentData.add((startOfSend, msg.len))
try:
if p.bufferSizeAtMoment(startOfSend) / p.bandwidth < p.minRtt / 5:
if (p.bufferSizeAtMoment(startOfSend) / p.bandwidth > p.minRtt / 5) or# and lastPing.milliseconds > p.minRtt div 10) or
lastPing.milliseconds > p.minRtt * 2:
# ping it
let pingKey = p.rng[].generateBytes(32)
p.pings[pingKey] = startOfSend
await conn.writeLp(msg & encodeRpcMsg(RPCMsg(control: some(ControlMessage(ping: pingKey))), true))
p.lastPing = startOfSend
await conn.writeLp(msg) and conn.writeLp(encodeRpcMsg(RPCMsg(control: some(ControlMessage(ping: pingKey))), true))
else:
await conn.writeLp(msg)
trace "sent pubsub message to remote", conn
@@ -359,7 +367,7 @@ proc new*(
onEvent: onEvent,
codec: codec,
peerId: peerId,
bandwidth: 6250,
bandwidth: 625,
connectedFut: newFuture[void](),
maxMessageSize: maxMessageSize,
rng: rng

View File

@@ -43,6 +43,7 @@ type
ControlMessage* = object
ihave*: seq[ControlIHave]
dontSend*: seq[ControlIHave]
sending*: seq[ControlIHave]
iwant*: seq[ControlIWant]
graft*: seq[ControlGraft]
prune*: seq[ControlPrune]

View File

@@ -96,6 +96,8 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
ipb.write(10, control.ping)
if control.pong.len > 0:
ipb.write(11, control.pong)
for ihave in control.sending:
ipb.write(6, ihave)
if len(ipb.buffer) > 0:
ipb.finish()
pb.write(field, ipb)
@@ -217,6 +219,7 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.
var cpb = initProtoBuffer(buffer)
var ihavepbs: seq[seq[byte]]
var dontsendpbs: seq[seq[byte]]
var sendingpbs: seq[seq[byte]]
var iwantpbs: seq[seq[byte]]
var graftpbs: seq[seq[byte]]
var prunepbs: seq[seq[byte]]
@@ -237,6 +240,9 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.
control.dontSend.add(? decodeIHave(initProtoBuffer(item)))
discard ? cpb.getField(10, control.ping)
discard ? cpb.getField(11, control.pong)
if ? cpb.getRepeatedField(6, sendingpbs):
for item in sendingpbs:
control.sending.add(? decodeIHave(initProtoBuffer(item)))
trace "decodeControl: message statistics", graft_count = len(control.graft),
prune_count = len(control.prune),
ihave_count = len(control.ihave),
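
Note: the codec hunks carry the new sending list the same way as the other repeated control fields: each entry is written under one field number (6, per the hunks above) and read back with getRepeatedField. A round-trip sketch reusing the minprotobuf helpers; it writes raw byte strings rather than full ControlIHave sub-messages, and roundTripSending plus the stew/results import are assumptions:

import libp2p/protobuf/minprotobuf
import stew/results

proc roundTripSending(ids: seq[seq[byte]]): ProtoResult[seq[seq[byte]]] =
  var ipb = initProtoBuffer()
  for id in ids:
    ipb.write(6, id)               # one length-delimited entry per message id
  ipb.finish()
  var cpb = initProtoBuffer(ipb.buffer)
  var decoded: seq[seq[byte]]
  discard ? cpb.getRepeatedField(6, decoded)
  ok(decoded)

when isMainModule:
  let r = roundTripSending(@[@[1'u8, 2], @[3'u8]])
  doAssert r.isOk and r.get().len == 2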