From 47fe560b07e01dcefba178069b55ad78c64a9da0 Mon Sep 17 00:00:00 2001 From: Tanguy Date: Tue, 2 May 2023 11:14:16 +0200 Subject: [PATCH] sims --- .pinned | 3 +- libp2p/muxers/mplex/lpchannel.nim | 4 +- libp2p/protocols/pubsub/gossipsub.nim | 46 ++++++++---- .../protocols/pubsub/gossipsub/behavior.nim | 72 +++++++++++-------- libp2p/protocols/pubsub/pubsubpeer.nim | 32 +++++---- libp2p/protocols/pubsub/rpc/messages.nim | 1 + libp2p/protocols/pubsub/rpc/protobuf.nim | 6 ++ 7 files changed, 105 insertions(+), 59 deletions(-) diff --git a/.pinned b/.pinned index a9b5e63f5..9bf6877bd 100644 --- a/.pinned +++ b/.pinned @@ -13,4 +13,5 @@ stew;https://github.com/status-im/nim-stew@#407a59883691d362db2fe8eab7f7c3b1f751 testutils;https://github.com/status-im/nim-testutils@#dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34 unittest2;https://github.com/status-im/nim-unittest2@#da8398c45cafd5bd7772da1fc96e3924a18d3823 websock;https://github.com/status-im/nim-websock@#fea05cde8b123b38d1a0a8524b77efbc84daa848 -zlib;https://github.com/status-im/nim-zlib@#826e2fc013f55b4478802d4f2e39f187c50d520a \ No newline at end of file +zlib;https://github.com/status-im/nim-zlib@#826e2fc013f55b4478802d4f2e39f187c50d520a +testground_sdk;https://github.com/status-im/testground-nim-sdk@#872753fe083c470ecce8e3a90091cf8dee7840ac diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index e85999ea5..3e49d708c 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -31,7 +31,7 @@ when defined(libp2p_mplex_metrics): declareHistogram libp2p_mplex_qtime, "message queuing time" when defined(libp2p_network_protocols_metrics): - declareCounter libp2p_protocols_bytes, "total sent or received bytes", ["protocol", "direction"] + declarePublicCounter libp2p_protocols_bytes, "total sent or received bytes", ["protocol", "direction"] ## Channel half-closed states ## @@ -236,7 +236,7 @@ proc completeWrite( else: await fut - when defined(libp2p_network_protocol_metrics): + when defined(libp2p_network_protocols_metrics): if s.protocol.len > 0: libp2p_protocols_bytes.inc(msgLen.int64, labelValues=[s.protocol, "out"]) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index f85630a54..61dbc099c 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -44,8 +44,8 @@ logScope: declareCounter(libp2p_gossipsub_failed_publish, "number of failed publish") declareCounter(libp2p_gossipsub_invalid_topic_subscription, "number of invalid topic subscriptions that happened") declareCounter(libp2p_gossipsub_duplicate_during_validation, "number of duplicates received during message validation") -declareCounter(libp2p_gossipsub_duplicate, "number of duplicates received") -declareCounter(libp2p_gossipsub_received, "number of messages received (deduplicated)") +declarePublicCounter(libp2p_gossipsub_duplicate, "number of duplicates received") +declarePublicCounter(libp2p_gossipsub_received, "number of messages received (deduplicated)") proc init*(_: type[GossipSubParams]): GossipSubParams = GossipSubParams( @@ -274,6 +274,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) = respControl.prune.add(g.handleGraft(peer, control.graft)) let messages = g.handleIWant(peer, control.iwant) g.handleDontSend(peer, control.dontSend) + #g.handleSending(peer, control.sending) if respControl.prune.len > 0 or @@ -338,12 +339,11 @@ proc validateAndRelay(g: GossipSub, g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) - if msg.data.len >= g.parameters.lazyPushThreshold: - g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(dontSend: @[ControlIHave(messageIds: @[msgId])])))) - # Don't send it to source peer, or peers that # sent it during validation toSendPeers.excl(peer) + if msg.data.len >= g.parameters.lazyPushThreshold: + g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(dontSend: @[ControlIHave(messageIds: @[msgId])])))) toSendPeers.excl(seenPeers) if msg.data.len < g.parameters.lazyPushThreshold: @@ -351,7 +351,7 @@ proc validateAndRelay(g: GossipSub, # also have to be careful to only include validated messages g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) else: - let sem = newAsyncSemaphore(1) + let sem = newAsyncSemaphore(2) var peers = toSeq(toSendPeers) g.rng.shuffle(peers) @@ -361,10 +361,17 @@ proc validateAndRelay(g: GossipSub, let fut = p.gotMsgs.mgetOrPut(msgId, newFuture[void]()) if fut.completed: return + + #g.broadcast(@[p], RPCMsg(control: some(ControlMessage(sending: @[ControlIHave(messageIds: @[msgId], topicId: msg.topicIds[0])])))) g.broadcast(@[p], RPCMsg(messages: @[msg])) - await fut or sleepAsync(200.milliseconds) - if not fut.completed: - echo g.switch.peerInfo.peerId, ": timeout from ", p.peerId + let estimation = + p.bufferSizeAtMoment(Moment.now()) div p.bandwidth# + p.minRtt + let start = Moment.now() + #await fut or sleepAsync(2.seconds) + await sleepAsync(milliseconds(estimation)) + #if not fut.completed: + # echo g.switch.peerInfo.peerId, ": timeout from ", p.peerId + #echo g.switch.peerInfo.peerId, " sent in ", $(Moment.now() - start), " estimated ", milliseconds(estimation) for p in peers: asyncSpawn sendToOne(p) @@ -410,6 +417,8 @@ method rpcHandler*(g: GossipSub, msgId = msgIdResult.get msgIdSalted = msgId & g.seenSalt + g.broadcast(@[peer], RPCMsg(control: some(ControlMessage(dontSend: @[ControlIHave(messageIds: @[msgId])])))) + # addSeen adds salt to msgId to avoid # remote attacking the hash function if g.addSeen(msgId): @@ -599,17 +608,26 @@ method publish*(g: GossipSub, var peersSeq = toSeq(peers) g.rng.shuffle(peersSeq) - let sem = newAsyncSemaphore(1) + let sem = newAsyncSemaphore(2) proc sendToOne(p: PubSubPeer) {.async.} = await sem.acquire() defer: sem.release() let fut = p.gotMsgs.mgetOrPut(msgId, newFuture[void]()) - if fut.completed: return + if fut.completed: + echo "peer already got" + return + g.broadcast(@[p], RPCMsg(control: some(ControlMessage(sending: @[ControlIHave(messageIds: @[msgId], topicId: topic)])))) g.broadcast(@[p], RPCMsg(messages: @[msg])) - await fut or sleepAsync(200.milliseconds) - if not fut.completed: - echo g.switch.peerInfo.peerId, ": timeout from ", p.peerId + let estimation = + p.bufferSizeAtMoment(Moment.now()) div p.bandwidth# + p.minRtt + let start = Moment.now() + #await fut or sleepAsync(2.seconds) + await sleepAsync(milliseconds(estimation)) + #if not fut.completed: + # echo g.switch.peerInfo.peerId, ": timeout from ", p.peerId + #echo g.switch.peerInfo.peerId, " sent in ", $(Moment.now() - start) + #echo g.switch.peerInfo.peerId, " sent in ", $(Moment.now() - start), " estimated ", milliseconds(estimation) for p in peersSeq: asyncSpawn sendToOne(p) diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index e6c8506c5..d664246b5 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -164,7 +164,7 @@ proc handleGraft*(g: GossipSub, # If they send us a graft before they send us a subscribe, what should # we do? For now, we add them to mesh but don't add them to gossipsub. if topic in g.topics: - if g.mesh.peers(topic) < g.parameters.dHigh or peer.outbound: + if g.mesh.peers(topic) < g.parameters.dHigh:# or peer.outbound: # In the spec, there's no mention of DHi here, but implicitly, a # peer will be removed from the mesh on next rebalance, so we don't want # this peer to push someone else out @@ -254,6 +254,15 @@ proc handleDontSend*(g: GossipSub, if not fut.completed: fut.complete() +proc handleSending*(g: GossipSub, + peer: PubSubPeer, + sending: seq[ControlIHave]) {.raises: [Defect].} = + for ds in sending: + var toSendPeers: HashSet[PubSubPeer] + g.mesh.withValue(ds.topicId, peer): toSendPeers.incl(peer[]) + toSendPeers.excl(peer) + g.broadcast(toSeq(toSendPeers), RPCMsg(control: some(ControlMessage(dontSend: @[ControlIHave(messageIds: ds.messageIds)])))) + proc handleIHave*(g: GossipSub, peer: PubSubPeer, ihaves: seq[ControlIHave]): ControlIWant {.raises: [Defect].} = @@ -380,41 +389,44 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) else: trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic) - var - candidates: seq[PubSubPeer] - currentMesh = addr defaultMesh + var currentMesh = addr defaultMesh g.mesh.withValue(topic, v): currentMesh = v - g.gossipsub.withValue(topic, peerList): - for it in peerList[]: - if - it.connected and - # get only outbound ones - it.outbound and - it notin currentMesh[] and - # avoid negative score peers - it.score >= 0.0 and - # don't pick explicit peers - it.peerId notin g.parameters.directPeers and - # and avoid peers we are backing off - it.peerId notin backingOff: - candidates.add(it) - # shuffle anyway, score might be not used - g.rng.shuffle(candidates) + let outboundPeers = currentMesh[].countIt(it.outbound) - # sort peers by score, high score first, we are grafting - candidates.sort(byScore, SortOrder.Descending) + if outboundPeers < g.parameters.dOut: + var candidates: seq[PubSubPeer] + g.gossipsub.withValue(topic, peerList): + for it in peerList[]: + if + it.connected and + # get only outbound ones + it.outbound and + it notin currentMesh[] and + # avoid negative score peers + it.score >= 0.0 and + # don't pick explicit peers + it.peerId notin g.parameters.directPeers and + # and avoid peers we are backing off + it.peerId notin backingOff: + candidates.add(it) - # Graft peers so we reach a count of D - candidates.setLen(min(candidates.len, g.parameters.dOut)) + # shuffle anyway, score might be not used + g.rng.shuffle(candidates) - trace "grafting outbound peers", topic, peers = candidates.len + # sort peers by score, high score first, we are grafting + candidates.sort(byScore, SortOrder.Descending) - for peer in candidates: - if g.mesh.addPeer(topic, peer): - g.grafted(peer, topic) - g.fanout.removePeer(topic, peer) - grafts &= peer + # Graft peers so we reach a count of D + candidates.setLen(min(candidates.len, g.parameters.dOut - outboundPeers)) + + trace "grafting outbound peers", topic, peers = candidates.len + + for peer in candidates: + if g.mesh.addPeer(topic, peer): + g.grafted(peer, topic) + g.fanout.removePeer(topic, peer) + grafts &= peer # get again npeers after possible grafts diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index a10a57a9e..f466645b5 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -63,6 +63,7 @@ type gotMsgs*: Table[MessageId, Future[void]] sentData*: seq[(Moment, int)] pings*: Table[seq[byte], Moment] + lastPing*: Moment rtts*: seq[int] bandwidth*: int # in bytes per ms @@ -114,14 +115,15 @@ proc bufferSizeAtMoment*(p: PubSubPeer, m: Moment): int = if sent > m: break let timeGap = (sent - lastSent).milliseconds - result -= timeGap * p.bandwidth + result -= int(timeGap * p.bandwidth) if result < 0: result = 0 result += size lastSent = sent let timeGap = (m - lastSent).milliseconds - result -= timeGap * p.bandwidth + result -= int(timeGap * p.bandwidth) + return max(0, result) proc minRtt*(p: PubSubPeer): int = @@ -130,27 +132,29 @@ proc minRtt*(p: PubSubPeer): int = else: 100 proc handlePong*(p: PubSubPeer, pong: seq[byte]) = - if pong notin p.pings: return + if pong notin p.pings: + return let pingMoment = p.pings.getOrDefault(pong) - delay = (Moment.now() - pingMoment).milliseconds + delay = int((Moment.now() - pingMoment).milliseconds) minRtt = p.minRtt p.rtts.add(delay) - if delay < minRtt: + if delay <= minRtt: # can't make bandwidth estimate in this situation return let bufferSizeWhenSendingPing = p.bufferSizeAtMoment(pingMoment) estimatedBandwidth = - bufferSizeWhenSendingPing / delay + bufferSizeWhenSendingPing / (delay - minRtt) - if bufferSizeWhenSendingPing / p.bandwidth < minRtt / 5: - # can't make bandwidth estimate in this situation + if bufferSizeWhenSendingPing / p.bandwidth < minRtt / 6: return p.bandwidth = (p.bandwidth * 9 + int(estimatedBandwidth)) div 10 + #echo minRtt, ", ", delay, " : ", bufferSizeWhenSendingPing, " ___ ", bufferSizeWhenSendingPing / p.bandwidth, " xx ", minRtt / 10 + #echo "estimated ", p.bandwidth, " (", estimatedBandwidth, ")" proc recvObservers(p: PubSubPeer, msg: var RPCMsg) = # trigger hooks @@ -302,14 +306,18 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [Defect], async.} = trace "sending encoded msgs to peer", conn, encoded = shortLog(msg) - let startOfSend = Moment.now() + let + startOfSend = Moment.now() + lastPing = startOfSend - p.lastPing p.sentData.add((startOfSend, msg.len)) try: - if p.bufferSizeAtMoment(startOfSend) / p.bandwidth < p.minRtt / 5: + if (p.bufferSizeAtMoment(startOfSend) / p.bandwidth > p.minRtt / 5) or# and lastPing.milliseconds > p.minRtt div 10) or + lastPing.milliseconds > p.minRtt * 2: # ping it let pingKey = p.rng[].generateBytes(32) p.pings[pingKey] = startOfSend - await conn.writeLp(msg & encodeRpcMsg(RPCMsg(control: some(ControlMessage(ping: pingKey))), true)) + p.lastPing = startOfSend + await conn.writeLp(msg) and conn.writeLp(encodeRpcMsg(RPCMsg(control: some(ControlMessage(ping: pingKey))), true)) else: await conn.writeLp(msg) trace "sent pubsub message to remote", conn @@ -359,7 +367,7 @@ proc new*( onEvent: onEvent, codec: codec, peerId: peerId, - bandwidth: 6250, + bandwidth: 625, connectedFut: newFuture[void](), maxMessageSize: maxMessageSize, rng: rng diff --git a/libp2p/protocols/pubsub/rpc/messages.nim b/libp2p/protocols/pubsub/rpc/messages.nim index ce00ff58b..912d6b84c 100644 --- a/libp2p/protocols/pubsub/rpc/messages.nim +++ b/libp2p/protocols/pubsub/rpc/messages.nim @@ -43,6 +43,7 @@ type ControlMessage* = object ihave*: seq[ControlIHave] dontSend*: seq[ControlIHave] + sending*: seq[ControlIHave] iwant*: seq[ControlIWant] graft*: seq[ControlGraft] prune*: seq[ControlPrune] diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index 1779d2867..be1d25a90 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -96,6 +96,8 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) = ipb.write(10, control.ping) if control.pong.len > 0: ipb.write(11, control.pong) + for ihave in control.sending: + ipb.write(6, ihave) if len(ipb.buffer) > 0: ipb.finish() pb.write(field, ipb) @@ -217,6 +219,7 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {. var cpb = initProtoBuffer(buffer) var ihavepbs: seq[seq[byte]] var dontsendpbs: seq[seq[byte]] + var sendingpbs: seq[seq[byte]] var iwantpbs: seq[seq[byte]] var graftpbs: seq[seq[byte]] var prunepbs: seq[seq[byte]] @@ -237,6 +240,9 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {. control.dontSend.add(? decodeIHave(initProtoBuffer(item))) discard ? cpb.getField(10, control.ping) discard ? cpb.getField(11, control.pong) + if ? cpb.getRepeatedField(6, sendingpbs): + for item in sendingpbs: + control.sending.add(? decodeIHave(initProtoBuffer(item))) trace "decodeControl: message statistics", graft_count = len(control.graft), prune_count = len(control.prune), ihave_count = len(control.ihave),