Merge branch 'p2p-research' into staggered_send_with_late_elimination

diegomrsantos 2024-06-19 16:54:36 +02:00 committed by GitHub
commit a478b8f4d8
8 changed files with 80 additions and 41 deletions

View File

@@ -134,7 +134,8 @@ The libp2p implementation in Nim is a work in progress. We welcome contributors
 - Go through the modules and **check out existing issues**. This would be especially useful for modules in active development. Some knowledge of IPFS/libp2p may be required, as well as the infrastructure behind it.
 - **Perform code reviews**. Feel free to let us know if you found anything that can a) speed up the project development b) ensure better quality and c) reduce possible future bugs.
 - **Add tests**. Help nim-libp2p to be more robust by adding more tests to the [tests folder](tests/).
 - **Small PRs**. Try to keep PRs atomic and digestible. This makes the review process and pinpointing bugs easier.
+- **Code format**. Please format code using [nph](https://github.com/arnetheduck/nph).

 The code follows the [Status Nim Style Guide](https://status-im.github.io/nim-style-guide/).

 ### Contributors

View File

@@ -164,7 +164,6 @@ type
     closedRemotely: Future[void].Raising([])
     closedLocally: bool
     receivedData: AsyncEvent
-    returnedEof: bool

 proc `$`(channel: YamuxChannel): string =
   result = if channel.conn.dir == Out: "=> " else: "<= "
@@ -204,8 +203,8 @@ proc remoteClosed(channel: YamuxChannel) {.async: (raises: []).} =
 method closeImpl*(channel: YamuxChannel) {.async: (raises: []).} =
   if not channel.closedLocally:
+    trace "Closing yamux channel locally", streamId = channel.id, conn = channel.conn
     channel.closedLocally = true
-    channel.isEof = true

     if not channel.isReset and channel.sendQueue.len == 0:
       try: await channel.conn.write(YamuxHeader.data(channel.id, 0, {Fin}))
@@ -273,7 +272,7 @@ method readOnce*(
         newLPStreamClosedError()
       else:
         newLPStreamConnDownError()
-  if channel.returnedEof:
+  if channel.isEof:
     raise newLPStreamRemoteClosedError()
   if channel.recvQueue.len == 0:
     channel.receivedData.clear()
@@ -281,9 +280,8 @@ method readOnce*(
       discard await race(channel.closedRemotely, channel.receivedData.wait())
     except ValueError: raiseAssert("Futures list is not empty")
   if channel.closedRemotely.completed() and channel.recvQueue.len == 0:
-    channel.returnedEof = true
     channel.isEof = true
-    return 0
+    return 0 # we return 0 to indicate that the channel is closed for reading from now on

   let toRead = min(channel.recvQueue.len, nbytes)

View File

@@ -56,7 +56,7 @@ method init*(p: Ping) =
       trace "handling ping", conn
       var buf: array[PingSize, byte]
       await conn.readExactly(addr buf[0], PingSize)
-      trace "echoing ping", conn
+      trace "echoing ping", conn, pingData = @buf
       await conn.write(@buf)
       if not isNil(p.pingHandler):
         await p.pingHandler(conn.peerId)

View File

@@ -363,6 +363,30 @@ proc validateAndRelay(g: GossipSub,
                       msgId: MessageId, saltedId: SaltedId,
                       peer: PubSubPeer) {.async.} =
   try:
+    template topic: string = msg.topic
+
+    proc addToSendPeers(toSendPeers: var HashSet[PubSubPeer]) =
+      g.floodsub.withValue(topic, peers): toSendPeers.incl(peers[])
+      g.mesh.withValue(topic, peers): toSendPeers.incl(peers[])
+      g.subscribedDirectPeers.withValue(topic, peers): toSendPeers.incl(peers[])
+      toSendPeers.excl(peer)
+
+    if msg.data.len > max(512, msgId.len * 10):
+      # If the message is "large enough", let the mesh know that we do not want
+      # any more copies of it, regardless if it is valid or not.
+      #
+      # In the case that it is not valid, this leads to some redundancy
+      # (since the other peer should not send us an invalid message regardless),
+      # but the expectation is that this is rare (due to such peers getting
+      # descored) and that the savings from honest peers are greater than the
+      # cost a dishonest peer can incur in short time (since the IDONTWANT is
+      # small).
+      var toSendPeers = HashSet[PubSubPeer]()
+      addToSendPeers(toSendPeers)
+      g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
+        idontwant: @[ControlIWant(messageIDs: @[msgId])]
+      ))), isHighPriority = true)
+
     let validation = await g.validate(msg)

     var seenPeers: HashSet[PubSubPeer]
@@ -383,40 +407,30 @@ proc validateAndRelay(g: GossipSub,
     of ValidationResult.Accept:
       discard

+    if topic notin g.topics:
+      return # Topic was unsubscribed while validating
+
     # store in cache only after validation
     g.mcache.put(msgId, msg)

-    let topic = msg.topic
     g.rewardDelivered(peer, topic, true)

+    # The send list typically matches the idontwant list from above, but
+    # might differ if validation takes time
     var toSendPeers = HashSet[PubSubPeer]()
-    if topic notin g.topics:
-      return
-
-    g.floodsub.withValue(topic, peers): toSendPeers.incl(peers[])
-    g.mesh.withValue(topic, peers): toSendPeers.incl(peers[])
-    g.subscribedDirectPeers.withValue(topic, peers): toSendPeers.incl(peers[])
-
-    # Don't send it to source peer, or peers that
-    # sent it during validation
-    toSendPeers.excl(peer)
+    addToSendPeers(toSendPeers)
+    # Don't send it to peers that sent it during validation
     toSendPeers.excl(seenPeers)

-    # IDontWant is only worth it if the message is substantially
-    # bigger than the messageId
-    if msg.data.len > msgId.len * 10:
-      g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
-        idontwant: @[ControlIWant(messageIDs: @[msgId])]
-      ))), isHighPriority = true)
-
+    var peersWhoSentIdontwant = HashSet[PubSubPeer]()
     for peer in toSendPeers:
       for heDontWant in peer.heDontWants:
         if saltedId in heDontWant:
-          seenPeers.incl(peer)
+          peersWhoSentIdontwant.incl(peer)
           libp2p_gossipsub_idontwant_saved_messages.inc
           libp2p_gossipsub_saved_bytes.inc(msg.data.len.int64, labelValues = ["idontwant"])
           break
-    toSendPeers.excl(seenPeers)
+    toSendPeers.excl(peersWhoSentIdontwant) # avoids "len(s) == length: the length of the HashSet changed while iterating over it" [AssertionDefect]

     #We first send to the outbound peers to avoid peers sending same message to each other
     var outboundPeers: seq[PubSubPeer]
@@ -510,6 +524,8 @@ method rpcHandler*(g: GossipSub,
   for i in 0..<rpcMsg.messages.len(): # for every message
     template msg: untyped = rpcMsg.messages[i]
+    template topic: string = msg.topic
+
     let msgIdResult = g.msgIdProvider(msg)

     if msgIdResult.isErr:
@ -521,7 +537,6 @@ method rpcHandler*(g: GossipSub,
let let
msgId = msgIdResult.get msgId = msgIdResult.get
msgIdSalted = g.salt(msgId) msgIdSalted = g.salt(msgId)
topic = msg.topic
if g.addSeen(msgIdSalted): if g.addSeen(msgIdSalted):
trace "Dropping already-seen message", msgId = shortLog(msgId), peer trace "Dropping already-seen message", msgId = shortLog(msgId), peer

View File

@@ -121,7 +121,7 @@ proc updateScores*(g: GossipSub) = # avoid async
     var
       n_topics = 0
       is_grafted = 0
-      score = 0.0
+      scoreAcc = 0.0 # accumulates the peer score

     # Per topic
     for topic, topicParams in g.topicParams:
@@ -161,11 +161,12 @@ proc updateScores*(g: GossipSub) = # avoid async
           trace "p3b", peer, p3b = info.meshFailurePenalty, topic, topicScore

         topicScore += info.invalidMessageDeliveries * info.invalidMessageDeliveries * topicParams.invalidMessageDeliveriesWeight
-        trace "p4", p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries, topic, topicScore
-        trace "updated peer topic's scores", peer, topic, info, topicScore
-        score += topicScore * topicParams.topicWeight
+        trace "p4", peer, p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries, topic, topicScore
+        scoreAcc += topicScore * topicParams.topicWeight
+        trace "updated peer topic's scores", peer, scoreAcc, topic, info, topicScore,
+          topicWeight = topicParams.topicWeight

       # Score metrics
       let agent = peer.getAgent()
@ -195,15 +196,19 @@ proc updateScores*(g: GossipSub) = # avoid async
# commit our changes, mgetOrPut does NOT work as wanted with value types (lent?) # commit our changes, mgetOrPut does NOT work as wanted with value types (lent?)
stats.topicInfos[topic] = info stats.topicInfos[topic] = info
score += peer.appScore * g.parameters.appSpecificWeight scoreAcc += peer.appScore * g.parameters.appSpecificWeight
trace "appScore", peer, scoreAcc, appScore = peer.appScore,
appSpecificWeight = g.parameters.appSpecificWeight
# The value of the parameter is the square of the counter and is mixed with a negative weight. # The value of the parameter is the square of the counter and is mixed with a negative weight.
score += peer.behaviourPenalty * peer.behaviourPenalty * g.parameters.behaviourPenaltyWeight scoreAcc += peer.behaviourPenalty * peer.behaviourPenalty * g.parameters.behaviourPenaltyWeight
trace "behaviourPenalty", peer, scoreAcc, behaviourPenalty = peer.behaviourPenalty,
behaviourPenaltyWeight = g.parameters.behaviourPenaltyWeight
let colocationFactor = g.colocationFactor(peer) let colocationFactor = g.colocationFactor(peer)
score += colocationFactor * g.parameters.ipColocationFactorWeight scoreAcc += colocationFactor * g.parameters.ipColocationFactorWeight
trace "colocationFactor", peer, scoreAcc, colocationFactor,
ipColocationFactorWeight = g.parameters.ipColocationFactorWeight
# Score metrics # Score metrics
let agent = peer.getAgent() let agent = peer.getAgent()
libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = [agent]) libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = [agent])
@@ -215,7 +220,7 @@ proc updateScores*(g: GossipSub) = # avoid async
     if peer.behaviourPenalty < g.parameters.decayToZero:
       peer.behaviourPenalty = 0

-    peer.score = score
+    peer.score = scoreAcc

     # copy into stats the score to keep until expired
     stats.score = peer.score
@@ -223,7 +228,7 @@ proc updateScores*(g: GossipSub) = # avoid async
     stats.behaviourPenalty = peer.behaviourPenalty
     stats.expire = now + g.parameters.retainScore # refresh expiration

-    trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted
+    trace "updated (accumulated) peer's score", peer, peerScore = peer.score, n_topics, is_grafted

     g.disconnectIfBadScorePeer(peer, stats.score)
     libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent])

View File

@@ -315,7 +315,6 @@ suite "Circuit Relay V2":
     await sleepAsync(chronos.timer.seconds(ttl + 1))

     expect(DialFailedError):
-      check: conn.atEof()
       await conn.close()
       await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
       conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec)

View File

@@ -377,3 +377,24 @@ suite "Yamux":
     expect LPStreamClosedError: discard await streamA.readLp(100)
     blocker.complete()
     await streamA.close()
+
+  asyncTest "Peer must be able to read from stream after closing it for writing":
+    mSetup()
+
+    yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
+      try:
+        check (await conn.readLp(100)) == fromHex("1234")
+      except CancelledError, LPStreamError:
+        return
+
+      try:
+        await conn.writeLp(fromHex("5678"))
+      except CancelledError, LPStreamError:
+        return
+
+      await conn.close()
+
+    let streamA = await yamuxa.newStream()
+    check streamA == yamuxa.getStreams()[0]
+    await streamA.writeLp(fromHex("1234"))
+    await streamA.close()
+    check (await streamA.readLp(100)) == fromHex("5678")

View File

@@ -11,6 +11,6 @@ COPY . nim-libp2p/

 RUN \
     cd nim-libp2p && \
-    nim c --skipProjCfg --skipParentCfg --NimblePath:./nimbledeps/pkgs -p:nim-libp2p -d:chronicles_log_level=WARN --threads:off ./tests/transport-interop/main.nim
+    nim c --skipProjCfg --skipParentCfg --NimblePath:./nimbledeps/pkgs -p:nim-libp2p -d:chronicles_log_level=WARN -d:chronicles_default_output_device=stderr --threads:off ./tests/transport-interop/main.nim

 ENTRYPOINT ["/app/nim-libp2p/tests/transport-interop/main"]