From dfff39091bbf9aadbba17b3261dfd2c1d4dd9950 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 10 May 2023 21:31:18 +0200 Subject: [PATCH 1/8] introduce waitResponse wrapper initialize wait for response before sending request. This is needed in cases where the response arrives before moving to the next instruction, such as a directly connected test. Signed-off-by: Csaba Kiraly --- .../private/eth/p2p/discoveryv5/protocol.nim | 51 +++++++++++-------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index c3f0672..4253059 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -452,6 +452,11 @@ proc replaceNode(d: Protocol, n: Node) = # peers in the routing table. debug "Message request to bootstrap node failed", src=d.localNode, dst=n +proc waitResponse*[T: SomeMessage](d: Protocol, node: Node, msg: T): + Future[Option[Message]] = + let reqId = RequestId.init(d.rng[]) + result = d.waitMessage(node, reqId) + sendRequest(d, node, msg, reqId) proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId): Future[Option[Message]] = @@ -464,6 +469,12 @@ proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId): res.complete(none(Message)) d.awaitedMessages[key] = result +proc waitNodeResponses*[T: SomeMessage](d: Protocol, node: Node, msg: T): + Future[DiscResult[seq[SignedPeerRecord]]] = + let reqId = RequestId.init(d.rng[]) + result = d.waitNodes(node, reqId) + sendRequest(d, node, msg, reqId) + proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId): Future[DiscResult[seq[SignedPeerRecord]]] {.async.} = ## Wait for one or more nodes replies. @@ -492,23 +503,20 @@ proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId): discovery_message_requests_outgoing.inc(labelValues = ["no_response"]) return err("Nodes message not received in time") -proc sendRequest*[T: SomeMessage](d: Protocol, toId: NodeId, toAddr: Address, m: T): - RequestId = +proc sendRequest*[T: SomeMessage](d: Protocol, toId: NodeId, toAddr: Address, m: T, + reqId: RequestId) = let - reqId = RequestId.init(d.rng[]) message = encodeMessage(m, reqId) trace "Send message packet", dstId = toId, toAddr, kind = messageKind(T) discovery_message_requests_outgoing.inc() d.transport.sendMessage(toId, toAddr, message) - return reqId -proc sendRequest*[T: SomeMessage](d: Protocol, toNode: Node, m: T): - RequestId = +proc sendRequest*[T: SomeMessage](d: Protocol, toNode: Node, m: T, + reqId: RequestId) = doAssert(toNode.address.isSome()) let - reqId = RequestId.init(d.rng[]) message = encodeMessage(m, reqId) trace "Send message packet", dstId = toNode.id, @@ -516,16 +524,15 @@ proc sendRequest*[T: SomeMessage](d: Protocol, toNode: Node, m: T): discovery_message_requests_outgoing.inc() d.transport.sendMessage(toNode, message) - return reqId proc ping*(d: Protocol, toNode: Node): Future[DiscResult[PongMessage]] {.async.} = ## Send a discovery ping message. ## ## Returns the received pong message or an error. - let reqId = d.sendRequest(toNode, - PingMessage(sprSeq: d.localNode.record.seqNum)) - let resp = await d.waitMessage(toNode, reqId) + let + msg = PingMessage(sprSeq: d.localNode.record.seqNum) + resp = await d.waitResponse(toNode, msg) if resp.isSome(): if resp.get().kind == pong: @@ -546,8 +553,9 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]): ## ## Returns the received nodes or an error. ## Received SPRs are already validated and converted to `Node`. - let reqId = d.sendRequest(toNode, FindNodeMessage(distances: distances)) - let nodes = await d.waitNodes(toNode, reqId) + let + msg = FindNodeMessage(distances: distances) + nodes = await d.waitNodeResponses(toNode, msg) if nodes.isOk: let res = verifyNodesRecords(nodes.get(), toNode, FindNodeResultLimit, distances) @@ -564,8 +572,9 @@ proc findNodeFast*(d: Protocol, toNode: Node, target: NodeId): ## ## Returns the received nodes or an error. ## Received SPRs are already validated and converted to `Node`. - let reqId = d.sendRequest(toNode, FindNodeFastMessage(target: target)) - let nodes = await d.waitNodes(toNode, reqId) + let + msg = FindNodeFastMessage(target: target) + nodes = await d.waitNodeResponses(toNode, msg) if nodes.isOk: let res = verifyNodesRecords(nodes.get(), toNode, FindNodeResultLimit) @@ -581,9 +590,9 @@ proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]): ## Send a discovery talkreq message. ## ## Returns the received talkresp message or an error. - let reqId = d.sendRequest(toNode, - TalkReqMessage(protocol: protocol, request: request)) - let resp = await d.waitMessage(toNode, reqId) + let + msg = TalkReqMessage(protocol: protocol, request: request) + resp = await d.waitResponse(toNode, msg) if resp.isSome(): if resp.get().kind == talkResp: @@ -707,7 +716,8 @@ proc addProvider*( res.add(d.localNode) for toNode in res: if toNode != d.localNode: - discard d.sendRequest(toNode, AddProviderMessage(cId: cId, prov: pr)) + let reqId = RequestId.init(d.rng[]) + d.sendRequest(toNode, AddProviderMessage(cId: cId, prov: pr), reqId) else: asyncSpawn d.addProviderLocal(cId, pr) @@ -720,8 +730,7 @@ proc sendGetProviders(d: Protocol, toNode: Node, trace "sendGetProviders", toNode, msg let - reqId = d.sendRequest(toNode, msg) - resp = await d.waitMessage(toNode, reqId) + resp = await d.waitResponse(toNode, msg) if resp.isSome(): if resp.get().kind == MessageKind.providers: From 6e61e02091c589ca9da3a2f7e5c071b1240e5244 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 10 May 2023 21:38:57 +0200 Subject: [PATCH 2/8] fixup: move sendRequest forward Signed-off-by: Csaba Kiraly --- .../private/eth/p2p/discoveryv5/protocol.nim | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index 4253059..ab6b4b6 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -452,6 +452,28 @@ proc replaceNode(d: Protocol, n: Node) = # peers in the routing table. debug "Message request to bootstrap node failed", src=d.localNode, dst=n +proc sendRequest*[T: SomeMessage](d: Protocol, toId: NodeId, toAddr: Address, m: T, + reqId: RequestId) = + let + message = encodeMessage(m, reqId) + + trace "Send message packet", dstId = toId, toAddr, kind = messageKind(T) + discovery_message_requests_outgoing.inc() + + d.transport.sendMessage(toId, toAddr, message) + +proc sendRequest*[T: SomeMessage](d: Protocol, toNode: Node, m: T, + reqId: RequestId) = + doAssert(toNode.address.isSome()) + let + message = encodeMessage(m, reqId) + + trace "Send message packet", dstId = toNode.id, + address = toNode.address, kind = messageKind(T) + discovery_message_requests_outgoing.inc() + + d.transport.sendMessage(toNode, message) + proc waitResponse*[T: SomeMessage](d: Protocol, node: Node, msg: T): Future[Option[Message]] = let reqId = RequestId.init(d.rng[]) @@ -503,28 +525,6 @@ proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId): discovery_message_requests_outgoing.inc(labelValues = ["no_response"]) return err("Nodes message not received in time") -proc sendRequest*[T: SomeMessage](d: Protocol, toId: NodeId, toAddr: Address, m: T, - reqId: RequestId) = - let - message = encodeMessage(m, reqId) - - trace "Send message packet", dstId = toId, toAddr, kind = messageKind(T) - discovery_message_requests_outgoing.inc() - - d.transport.sendMessage(toId, toAddr, message) - -proc sendRequest*[T: SomeMessage](d: Protocol, toNode: Node, m: T, - reqId: RequestId) = - doAssert(toNode.address.isSome()) - let - message = encodeMessage(m, reqId) - - trace "Send message packet", dstId = toNode.id, - address = toNode.address, kind = messageKind(T) - discovery_message_requests_outgoing.inc() - - d.transport.sendMessage(toNode, message) - proc ping*(d: Protocol, toNode: Node): Future[DiscResult[PongMessage]] {.async.} = ## Send a discovery ping message. From 316464fc71e75c146f09f55e8c739c8a0fc02f51 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 17 May 2023 08:08:12 +0200 Subject: [PATCH 3/8] dht: waitMessage: expose timeout as parameter, keeping default defults to ResponseTimeout as before Signed-off-by: Csaba Kiraly --- codexdht/private/eth/p2p/discoveryv5/protocol.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index ab6b4b6..36e8de4 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -480,12 +480,12 @@ proc waitResponse*[T: SomeMessage](d: Protocol, node: Node, msg: T): result = d.waitMessage(node, reqId) sendRequest(d, node, msg, reqId) -proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId): +proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId, timeout = ResponseTimeout): Future[Option[Message]] = result = newFuture[Option[Message]]("waitMessage") let res = result let key = (fromNode.id, reqId) - sleepAsync(ResponseTimeout).addCallback() do(data: pointer): + sleepAsync(timeout).addCallback() do(data: pointer): d.awaitedMessages.del(key) if not res.finished: res.complete(none(Message)) From f766cb39b1a44a909463e96e018a695e5638cdb8 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 2 Jun 2023 19:03:57 +0200 Subject: [PATCH 4/8] encoding: introducing type cipher=aes128 Introducing the cipher type to ease changing cipher. No functional change Signed-off-by: Csaba Kiraly --- codexdht/private/eth/p2p/discoveryv5/encoding.nim | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/encoding.nim b/codexdht/private/eth/p2p/discoveryv5/encoding.nim index 2520b28..7fd18e7 100644 --- a/codexdht/private/eth/p2p/discoveryv5/encoding.nim +++ b/codexdht/private/eth/p2p/discoveryv5/encoding.nim @@ -40,6 +40,9 @@ declareCounter discovery_session_decrypt_failures, "Session decrypt failures" logScope: topics = "discv5" +type + cipher = aes128 + const version: uint16 = 1 idSignatureText = "discovery v5 identity proof" @@ -162,7 +165,7 @@ proc deriveKeys*(n1, n2: NodeId, priv: PrivateKey, pub: PublicKey, ok secrets proc encryptGCM*(key: AesKey, nonce, pt, authData: openArray[byte]): seq[byte] = - var ectx: GCM[aes128] + var ectx: GCM[cipher] ectx.init(key, nonce, authData) result = newSeq[byte](pt.len + gcmTagSize) ectx.encrypt(pt, result) @@ -175,7 +178,7 @@ proc decryptGCM*(key: AesKey, nonce, ct, authData: openArray[byte]): debug "cipher is missing tag", len = ct.len return - var dctx: GCM[aes128] + var dctx: GCM[cipher] dctx.init(key, nonce, authData) var res = newSeq[byte](ct.len - gcmTagSize) var tag: array[gcmTagSize, byte] @@ -189,7 +192,7 @@ proc decryptGCM*(key: AesKey, nonce, ct, authData: openArray[byte]): return some(res) proc encryptHeader*(id: NodeId, iv, header: openArray[byte]): seq[byte] = - var ectx: CTR[aes128] + var ectx: CTR[cipher] ectx.init(id.toByteArrayBE().toOpenArray(0, 15), iv) result = newSeq[byte](header.len) ectx.encrypt(header, result) @@ -374,7 +377,7 @@ proc decodeHeader*(id: NodeId, iv, maskedHeader: openArray[byte]): DecodeResult[(StaticHeader, seq[byte])] = # No need to check staticHeader size as that is included in minimum packet # size check in decodePacket - var ectx: CTR[aes128] + var ectx: CTR[cipher] ectx.init(id.toByteArrayBE().toOpenArray(0, aesKeySize - 1), iv) # Decrypt static-header part of the header var staticHeader = newSeq[byte](staticHeaderSize) From d8160ff0f73d13026c4c31086562d9745a7be6cc Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 15 Jun 2023 12:09:34 +0200 Subject: [PATCH 5/8] add logging helper for Protocol Signed-off-by: Csaba Kiraly --- codexdht/private/eth/p2p/discoveryv5/protocol.nim | 3 +++ 1 file changed, 3 insertions(+) diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index 36e8de4..06bfec9 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -182,6 +182,9 @@ type DiscResult*[T] = Result[T, cstring] +func `$`*(p: Protocol): string = + $p.localNode.id + const defaultDiscoveryConfig* = DiscoveryConfig( tableIpLimits: DefaultTableIpLimits, From f299c23e2ed780ac1cb477a4bc57574ef1477c12 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 10 Jul 2023 12:42:28 +0200 Subject: [PATCH 6/8] remove lookupWorkerFast duplicate code Signed-off-by: Csaba Kiraly --- .../private/eth/p2p/discoveryv5/protocol.nim | 32 +++++++------------ 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index 06bfec9..a03122c 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -622,25 +622,18 @@ proc lookupDistances*(target, dest: NodeId): seq[uint16] = result.add(td - uint16(i)) inc i -proc lookupWorker(d: Protocol, destNode: Node, target: NodeId): +proc lookupWorker(d: Protocol, destNode: Node, target: NodeId, fast: bool): Future[seq[Node]] {.async.} = - let dists = lookupDistances(target, destNode.id) - # Instead of doing max `LookupRequestLimit` findNode requests, make use - # of the discv5.1 functionality to request nodes for multiple distances. - let r = await d.findNode(destNode, dists) - if r.isOk: - result.add(r[]) + let r = + if fast: + await d.findNodeFast(destNode, target) + else: + # Instead of doing max `LookupRequestLimit` findNode requests, make use + # of the discv5.1 functionality to request nodes for multiple distances. + let dists = lookupDistances(target, destNode.id) + await d.findNode(destNode, dists) - # Attempt to add all nodes discovered - for n in result: - discard d.addNode(n) - -proc lookupWorkerFast(d: Protocol, destNode: Node, target: NodeId): - Future[seq[Node]] {.async.} = - ## use terget NodeId based find_node - - let r = await d.findNodeFast(destNode, target) if r.isOk: result.add(r[]) @@ -671,10 +664,7 @@ proc lookup*(d: Protocol, target: NodeId, fast: bool = false): Future[seq[Node]] while i < closestNodes.len and pendingQueries.len < Alpha: let n = closestNodes[i] if not asked.containsOrIncl(n.id): - if fast: - pendingQueries.add(d.lookupWorkerFast(n, target)) - else: - pendingQueries.add(d.lookupWorker(n, target)) + pendingQueries.add(d.lookupWorker(n, target, fast)) inc i trace "discv5 pending queries", total = pendingQueries.len @@ -835,7 +825,7 @@ proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] while i < min(queryBuffer.len, k) and pendingQueries.len < Alpha: let n = queryBuffer[i] if not asked.containsOrIncl(n.id): - pendingQueries.add(d.lookupWorker(n, target)) + pendingQueries.add(d.lookupWorker(n, target, false)) inc i trace "discv5 pending queries", total = pendingQueries.len From 148b10908dacd28acb700b377a63efe8612a52f1 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Sun, 3 Sep 2023 13:51:46 +0200 Subject: [PATCH 7/8] trace log: do not log binary encoding Even at trace level this feels too much. Signed-off-by: Csaba Kiraly --- codexdht/private/eth/p2p/discoveryv5/messages_encoding.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/messages_encoding.nim b/codexdht/private/eth/p2p/discoveryv5/messages_encoding.nim index 707c325..09f690d 100644 --- a/codexdht/private/eth/p2p/discoveryv5/messages_encoding.nim +++ b/codexdht/private/eth/p2p/discoveryv5/messages_encoding.nim @@ -325,7 +325,7 @@ proc encodeMessage*[T: SomeMessage](p: T, reqId: RequestId): seq[byte] = pb.write(2, encoded) pb.finish() result.add(pb.buffer) - trace "Encoded protobuf message", typ = $T, encoded + trace "Encoded protobuf message", typ = $T proc decodeMessage*(body: openArray[byte]): DecodeResult[Message] = ## Decodes to the specific `Message` type. From 4c9c92232ba30b5c334ca81bf614a1b85cb18b89 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 4 Sep 2023 14:37:53 +0200 Subject: [PATCH 8/8] remove unused sendRequest call Signed-off-by: Csaba Kiraly --- codexdht/private/eth/p2p/discoveryv5/protocol.nim | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index a03122c..ec2f6d0 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -455,16 +455,6 @@ proc replaceNode(d: Protocol, n: Node) = # peers in the routing table. debug "Message request to bootstrap node failed", src=d.localNode, dst=n -proc sendRequest*[T: SomeMessage](d: Protocol, toId: NodeId, toAddr: Address, m: T, - reqId: RequestId) = - let - message = encodeMessage(m, reqId) - - trace "Send message packet", dstId = toId, toAddr, kind = messageKind(T) - discovery_message_requests_outgoing.inc() - - d.transport.sendMessage(toId, toAddr, message) - proc sendRequest*[T: SomeMessage](d: Protocol, toNode: Node, m: T, reqId: RequestId) = doAssert(toNode.address.isSome())