commit cfa97f49959db74ddcb4f54bfad17d6b0ea84d6b Author: Csaba Kiraly Date: Mon Feb 14 01:51:28 2022 +0100 initial commit based on nim-eth@779d767b024175a51cf74c79ec7513301ebe2f46 diff --git a/LICENSE-APACHEv2 b/LICENSE-APACHEv2 new file mode 100644 index 0000000..19ab4c8 --- /dev/null +++ b/LICENSE-APACHEv2 @@ -0,0 +1,204 @@ +Copyright (c) 2018 Status Research & Development GmbH +----------------------------------------------------- + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. 
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright 2018 Status Research & Development GmbH + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..2590e8c --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,24 @@ +Copyright (c) 2018 Status Research & Development GmbH +----------------------------------------------------- + +The MIT License (MIT) + +Copyright (c) 2018 Status Research & Development GmbH + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/eth/p2p/discoveryv5/encoding.nim b/eth/p2p/discoveryv5/encoding.nim new file mode 100644 index 0000000..ef22522 --- /dev/null +++ b/eth/p2p/discoveryv5/encoding.nim @@ -0,0 +1,623 @@ +# nim-eth - Node Discovery Protocol v5 +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. +# +## Discovery v5 packet encoding as specified at +## https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire.md#packet-encoding +## And handshake/sessions as specified at +## https://github.com/ethereum/devp2p/blob/master/discv5/discv5-theory.md#sessions +## + +{.push raises: [Defect].} + +import + std/[tables, options, hashes, net], + nimcrypto, stint, chronicles, bearssl, stew/[results, byteutils], metrics, + ".."/../[rlp, keys], + "."/[messages, node, enr, hkdf, sessions] + +from stew/objects import checkedEnumAssign + +export keys + +declareCounter discovery_session_lru_cache_hits, "Session LRU cache hits" +declareCounter discovery_session_lru_cache_misses, "Session LRU cache misses" +declareCounter discovery_session_decrypt_failures, "Session decrypt failures" + +logScope: + topics = "discv5" + +const + version: uint16 = 1 + idSignatureText = "discovery v5 identity proof" + keyAgreementPrefix = "discovery v5 key agreement" + protocolIdStr = "discv5" + protocolId = toBytes(protocolIdStr) + gcmNonceSize* = 12 + idNonceSize* = 16 + gcmTagSize* = 16 + ivSize* = 16 + staticHeaderSize = protocolId.len + 2 + 2 + 1 + gcmNonceSize + authdataHeadSize = sizeof(NodeId) + 1 + 1 + whoareyouSize = ivSize + staticHeaderSize + idNonceSize + 8 + +type + AESGCMNonce* = array[gcmNonceSize, byte] + IdNonce* = 
array[idNonceSize, byte] + + WhoareyouData* = object + requestNonce*: AESGCMNonce + idNonce*: IdNonce # TODO: This data is also available in challengeData + recordSeq*: uint64 + challengeData*: seq[byte] + + Challenge* = object + whoareyouData*: WhoareyouData + pubkey*: Option[PublicKey] + + StaticHeader* = object + flag: Flag + nonce: AESGCMNonce + authdataSize: uint16 + + HandshakeSecrets* = object + initiatorKey*: AesKey + recipientKey*: AesKey + + Flag* = enum + OrdinaryMessage = 0x00 + Whoareyou = 0x01 + HandshakeMessage = 0x02 + + Packet* = object + case flag*: Flag + of OrdinaryMessage: + messageOpt*: Option[Message] + requestNonce*: AESGCMNonce + srcId*: NodeId + of Whoareyou: + whoareyou*: WhoareyouData + of HandshakeMessage: + message*: Message # In a handshake we expect to always be able to decrypt + # TODO record or node immediately? + node*: Option[Node] + srcIdHs*: NodeId + + HandshakeKey* = object + nodeId*: NodeId + address*: Address + + Codec* = object + localNode*: Node + privKey*: PrivateKey + handshakes*: Table[HandshakeKey, Challenge] + sessions*: Sessions + + DecodeResult*[T] = Result[T, cstring] + +func `==`*(a, b: HandshakeKey): bool = + (a.nodeId == b.nodeId) and (a.address == b.address) + +func hash*(key: HandshakeKey): Hash = + result = key.nodeId.hash !& key.address.hash + result = !$result + +proc idHash(challengeData, ephkey: openArray[byte], nodeId: NodeId): + MDigest[256] = + var ctx: sha256 + ctx.init() + ctx.update(idSignatureText) + ctx.update(challengeData) + ctx.update(ephkey) + ctx.update(nodeId.toByteArrayBE()) + result = ctx.finish() + ctx.clear() + +proc createIdSignature*(privKey: PrivateKey, challengeData, + ephKey: openArray[byte], nodeId: NodeId): SignatureNR = + signNR(privKey, SkMessage(idHash(challengeData, ephKey, nodeId).data)) + +proc verifyIdSignature*(sig: SignatureNR, challengeData, ephKey: openArray[byte], + nodeId: NodeId, pubkey: PublicKey): bool = + let h = idHash(challengeData, ephKey, nodeId) + verify(sig, 
SkMessage(h.data), pubkey) + +proc deriveKeys*(n1, n2: NodeId, priv: PrivateKey, pub: PublicKey, + challengeData: openArray[byte]): HandshakeSecrets = + let eph = ecdhRawFull(priv, pub) + + var info = newSeqOfCap[byte](keyAgreementPrefix.len + 32 * 2) + for i, c in keyAgreementPrefix: info.add(byte(c)) + info.add(n1.toByteArrayBE()) + info.add(n2.toByteArrayBE()) + + var secrets: HandshakeSecrets + static: assert(sizeof(secrets) == aesKeySize * 2) + var res = cast[ptr UncheckedArray[byte]](addr secrets) + + hkdf(sha256, eph.data, challengeData, info, + toOpenArray(res, 0, sizeof(secrets) - 1)) + secrets + +proc encryptGCM*(key: AesKey, nonce, pt, authData: openArray[byte]): seq[byte] = + var ectx: GCM[aes128] + ectx.init(key, nonce, authData) + result = newSeq[byte](pt.len + gcmTagSize) + ectx.encrypt(pt, result) + ectx.getTag(result.toOpenArray(pt.len, result.high)) + ectx.clear() + +proc decryptGCM*(key: AesKey, nonce, ct, authData: openArray[byte]): + Option[seq[byte]] = + if ct.len <= gcmTagSize: + debug "cipher is missing tag", len = ct.len + return + + var dctx: GCM[aes128] + dctx.init(key, nonce, authData) + var res = newSeq[byte](ct.len - gcmTagSize) + var tag: array[gcmTagSize, byte] + dctx.decrypt(ct.toOpenArray(0, ct.high - gcmTagSize), res) + dctx.getTag(tag) + dctx.clear() + + if tag != ct.toOpenArray(ct.len - gcmTagSize, ct.high): + return + + return some(res) + +proc encryptHeader*(id: NodeId, iv, header: openArray[byte]): seq[byte] = + var ectx: CTR[aes128] + ectx.init(id.toByteArrayBE().toOpenArray(0, 15), iv) + result = newSeq[byte](header.len) + ectx.encrypt(header, result) + ectx.clear() + +proc hasHandshake*(c: Codec, key: HandshakeKey): bool = + c.handshakes.hasKey(key) + +proc encodeStaticHeader*(flag: Flag, nonce: AESGCMNonce, authSize: int): + seq[byte] = + result.add(protocolId) + result.add(version.toBytesBE()) + result.add(byte(flag)) + result.add(nonce) + # TODO: assert on authSize of > 2^16? 
+ result.add((uint16(authSize)).toBytesBE()) + +proc encodeMessagePacket*(rng: var BrHmacDrbgContext, c: var Codec, + toId: NodeId, toAddr: Address, message: openArray[byte]): + (seq[byte], AESGCMNonce) = + var nonce: AESGCMNonce + brHmacDrbgGenerate(rng, nonce) # Random AESGCM nonce + var iv: array[ivSize, byte] + brHmacDrbgGenerate(rng, iv) # Random IV + + # static-header + let authdata = c.localNode.id.toByteArrayBE() + let staticHeader = encodeStaticHeader(Flag.OrdinaryMessage, nonce, + authdata.len()) + # header = static-header || authdata + var header: seq[byte] + header.add(staticHeader) + header.add(authdata) + + # message + var messageEncrypted: seq[byte] + var initiatorKey, recipientKey: AesKey + if c.sessions.load(toId, toAddr, recipientKey, initiatorKey): + messageEncrypted = encryptGCM(initiatorKey, nonce, message, @iv & header) + discovery_session_lru_cache_hits.inc() + else: + # We might not have the node's keys if the handshake hasn't been performed + # yet. That's fine, we send a random-packet and we will be responded with + # a WHOAREYOU packet. + # Select 20 bytes of random data, which is the smallest possible ping + # message. 16 bytes for the gcm tag and 4 bytes for ping with requestId of + # 1 byte (e.g "01c20101"). Could increase to 27 for 8 bytes requestId in + # case this must not look like a random packet. 
+ var randomData: array[gcmTagSize + 4, byte] + brHmacDrbgGenerate(rng, randomData) + messageEncrypted.add(randomData) + discovery_session_lru_cache_misses.inc() + + let maskedHeader = encryptHeader(toId, iv, header) + + var packet: seq[byte] + packet.add(iv) + packet.add(maskedHeader) + packet.add(messageEncrypted) + + return (packet, nonce) + +proc encodeWhoareyouPacket*(rng: var BrHmacDrbgContext, c: var Codec, + toId: NodeId, toAddr: Address, requestNonce: AESGCMNonce, recordSeq: uint64, + pubkey: Option[PublicKey]): seq[byte] = + var idNonce: IdNonce + brHmacDrbgGenerate(rng, idNonce) + + # authdata + var authdata: seq[byte] + authdata.add(idNonce) + authdata.add(recordSeq.toBytesBE) + + # static-header + let staticHeader = encodeStaticHeader(Flag.Whoareyou, requestNonce, + authdata.len()) + + # header = static-header || authdata + var header: seq[byte] + header.add(staticHeader) + header.add(authdata) + + var iv: array[ivSize, byte] + brHmacDrbgGenerate(rng, iv) # Random IV + + let maskedHeader = encryptHeader(toId, iv, header) + + var packet: seq[byte] + packet.add(iv) + packet.add(maskedHeader) + + let + whoareyouData = WhoareyouData( + requestNonce: requestNonce, + idNonce: idNonce, + recordSeq: recordSeq, + challengeData: @iv & header) + challenge = Challenge(whoareyouData: whoareyouData, pubkey: pubkey) + key = HandshakeKey(nodeId: toId, address: toAddr) + + c.handshakes[key] = challenge + + return packet + +proc encodeHandshakePacket*(rng: var BrHmacDrbgContext, c: var Codec, + toId: NodeId, toAddr: Address, message: openArray[byte], + whoareyouData: WhoareyouData, pubkey: PublicKey): seq[byte] = + var header: seq[byte] + var nonce: AESGCMNonce + brHmacDrbgGenerate(rng, nonce) + var iv: array[ivSize, byte] + brHmacDrbgGenerate(rng, iv) # Random IV + + var authdata: seq[byte] + var authdataHead: seq[byte] + + authdataHead.add(c.localNode.id.toByteArrayBE()) + authdataHead.add(64'u8) # sig-size: 64 + authdataHead.add(33'u8) # eph-key-size: 33 + 
authdata.add(authdataHead) + + let ephKeys = KeyPair.random(rng) + let signature = createIdSignature(c.privKey, whoareyouData.challengeData, + ephKeys.pubkey.toRawCompressed(), toId) + + authdata.add(signature.toRaw()) + # compressed pub key format (33 bytes) + authdata.add(ephKeys.pubkey.toRawCompressed()) + + # Add ENR of sequence number is newer + if whoareyouData.recordSeq < c.localNode.record.seqNum: + authdata.add(encode(c.localNode.record)) + + let secrets = deriveKeys(c.localNode.id, toId, ephKeys.seckey, pubkey, + whoareyouData.challengeData) + + # Header + let staticHeader = encodeStaticHeader(Flag.HandshakeMessage, nonce, + authdata.len()) + + header.add(staticHeader) + header.add(authdata) + + c.sessions.store(toId, toAddr, secrets.recipientKey, secrets.initiatorKey) + let messageEncrypted = encryptGCM(secrets.initiatorKey, nonce, message, + @iv & header) + + let maskedHeader = encryptHeader(toId, iv, header) + + var packet: seq[byte] + packet.add(iv) + packet.add(maskedHeader) + packet.add(messageEncrypted) + + return packet + +proc decodeHeader*(id: NodeId, iv, maskedHeader: openArray[byte]): + DecodeResult[(StaticHeader, seq[byte])] = + # No need to check staticHeader size as that is included in minimum packet + # size check in decodePacket + var ectx: CTR[aes128] + ectx.init(id.toByteArrayBE().toOpenArray(0, aesKeySize - 1), iv) + # Decrypt static-header part of the header + var staticHeader = newSeq[byte](staticHeaderSize) + ectx.decrypt(maskedHeader.toOpenArray(0, staticHeaderSize - 1), staticHeader) + + # Check fields of the static-header + if staticHeader.toOpenArray(0, protocolId.len - 1) != protocolId: + return err("Invalid protocol id") + + if uint16.fromBytesBE(staticHeader.toOpenArray(6, 7)) != version: + return err("Invalid protocol version") + + var flag: Flag + if not checkedEnumAssign(flag, staticHeader[8]): + return err("Invalid packet flag") + + var nonce: AESGCMNonce + copyMem(addr nonce[0], unsafeAddr staticHeader[9], gcmNonceSize) 
+ + let authdataSize = uint16.fromBytesBE(staticHeader.toOpenArray(21, + staticHeader.high)) + + # Input should have minimum size of staticHeader + provided authdata size + # Can be larger as there can come a message after. + if maskedHeader.len < staticHeaderSize + int(authdataSize): + return err("Authdata is smaller than authdata-size indicates") + + var authdata = newSeq[byte](int(authdataSize)) + ectx.decrypt(maskedHeader.toOpenArray(staticHeaderSize, + staticHeaderSize + int(authdataSize) - 1), authdata) + ectx.clear() + + ok((StaticHeader(authdataSize: authdataSize, flag: flag, nonce: nonce), + staticHeader & authdata)) + +proc decodeMessage*(body: openArray[byte]): DecodeResult[Message] = + ## Decodes to the specific `Message` type. + if body.len < 1: + return err("No message data") + + var kind: MessageKind + if not checkedEnumAssign(kind, body[0]): + return err("Invalid message type") + + var message = Message(kind: kind) + var rlp = rlpFromBytes(body.toOpenArray(1, body.high)) + if rlp.enterList: + try: + message.reqId = rlp.read(RequestId) + except RlpError, ValueError: + return err("Invalid request-id") + + proc decode[T](rlp: var Rlp, v: var T) + {.nimcall, raises:[RlpError, ValueError, Defect].} = + for k, v in v.fieldPairs: + v = rlp.read(typeof(v)) + + try: + case kind + of unused: return err("Invalid message type") + of ping: rlp.decode(message.ping) + of pong: rlp.decode(message.pong) + of findNode: rlp.decode(message.findNode) + of nodes: rlp.decode(message.nodes) + of talkReq: rlp.decode(message.talkReq) + of talkResp: rlp.decode(message.talkResp) + of regTopic, ticket, regConfirmation, topicQuery: + # We just pass the empty type of this message without attempting to + # decode, so that the protocol knows what was received. + # But we ignore the message as per specification as "the content and + # semantics of this message are not final". 
+ discard + except RlpError, ValueError: + return err("Invalid message encoding") + + ok(message) + else: + err("Invalid message encoding: no rlp list") + +proc decodeMessagePacket(c: var Codec, fromAddr: Address, nonce: AESGCMNonce, + iv, header, ct: openArray[byte]): DecodeResult[Packet] = + # We now know the exact size that the header should be + if header.len != staticHeaderSize + sizeof(NodeId): + return err("Invalid header length for ordinary message packet") + + # Need to have at minimum the gcm tag size for the message. + if ct.len < gcmTagSize: + return err("Invalid message length for ordinary message packet") + + let srcId = NodeId.fromBytesBE(header.toOpenArray(staticHeaderSize, + header.high)) + + var initiatorKey, recipientKey: AesKey + if not c.sessions.load(srcId, fromAddr, recipientKey, initiatorKey): + # Don't consider this an error, simply haven't done a handshake yet or + # the session got removed. + trace "Decrypting failed (no keys)" + discovery_session_lru_cache_misses.inc() + return ok(Packet(flag: Flag.OrdinaryMessage, requestNonce: nonce, + srcId: srcId)) + + discovery_session_lru_cache_hits.inc() + + let pt = decryptGCM(recipientKey, nonce, ct, @iv & @header) + if pt.isNone(): + # Don't consider this an error, the session got probably removed at the + # peer's side and a random message is send. + trace "Decrypting failed (invalid keys)" + c.sessions.del(srcId, fromAddr) + discovery_session_decrypt_failures.inc() + return ok(Packet(flag: Flag.OrdinaryMessage, requestNonce: nonce, + srcId: srcId)) + + let message = ? 
decodeMessage(pt.get()) + + return ok(Packet(flag: Flag.OrdinaryMessage, + messageOpt: some(message), requestNonce: nonce, srcId: srcId)) + +proc decodeWhoareyouPacket(c: var Codec, nonce: AESGCMNonce, + iv, header, ct: openArray[byte]): DecodeResult[Packet] = + # TODO improve this + let authdata = header[staticHeaderSize..header.high()] + # We now know the exact size that the authdata should be + if authdata.len != idNonceSize + sizeof(uint64): + return err("Invalid header length for whoareyou packet") + + # The `message` part of WHOAREYOU packets is always empty. + if ct.len != 0: + return err("Invalid message length for whoareyou packet") + + var idNonce: IdNonce + copyMem(addr idNonce[0], unsafeAddr authdata[0], idNonceSize) + let whoareyou = WhoareyouData(requestNonce: nonce, idNonce: idNonce, + recordSeq: uint64.fromBytesBE( + authdata.toOpenArray(idNonceSize, authdata.high)), + challengeData: @iv & @header) + + return ok(Packet(flag: Flag.Whoareyou, whoareyou: whoareyou)) + +proc decodeHandshakePacket(c: var Codec, fromAddr: Address, nonce: AESGCMNonce, + iv, header, ct: openArray[byte]): DecodeResult[Packet] = + # Checking if there is enough data to decode authdata-head + if header.len <= staticHeaderSize + authdataHeadSize: + return err("Invalid header for handshake message packet: no authdata-head") + + # Need to have at minimum the gcm tag size for the message. + # TODO: And actually, as we should be able to decrypt it, it should also be + # a valid message and thus we could increase here to the size of the smallest + # message possible. 
+ if ct.len < gcmTagSize: + return err("Invalid message length for handshake message packet") + + let + authdata = header[staticHeaderSize..header.high()] + srcId = NodeId.fromBytesBE(authdata.toOpenArray(0, 31)) + sigSize = uint8(authdata[32]) + ephKeySize = uint8(authdata[33]) + + # If smaller, as it can be equal and bigger (in case it holds an enr) + if header.len < staticHeaderSize + authdataHeadSize + int(sigSize) + int(ephKeySize): + return err("Invalid header for handshake message packet") + + let key = HandshakeKey(nodeId: srcId, address: fromAddr) + var challenge: Challenge + if not c.handshakes.pop(key, challenge): + return err("No challenge found: timed out or unsolicited packet") + + # This should be the compressed public key. But as we use the provided + # ephKeySize, it should also work with full sized key. However, the idNonce + # signature verification will fail. + let + ephKeyPos = authdataHeadSize + int(sigSize) + ephKeyRaw = authdata[ephKeyPos.. recordPos: + # There is possibly an ENR still + try: + # Signature check of record happens in decode. + record = some(rlp.decode(authdata.toOpenArray(recordPos, authdata.high), + enr.Record)) + except RlpError, ValueError: + return err("Invalid encoded ENR") + + var pubkey: PublicKey + var newNode: Option[Node] + # TODO: Shall we return Node or Record? Record makes more sense, but we do + # need the pubkey and the nodeid + if record.isSome(): + # Node returned might not have an address or not a valid address. + let node = ? newNode(record.get()) + if node.id != srcId: + return err("Invalid node id: does not match node id of ENR") + + # Note: Not checking if the record seqNum is higher than the one we might + # have stored as it comes from this node directly. + pubkey = node.pubkey + newNode = some(node) + else: + # TODO: Hmm, should we still verify node id of the ENR of this node? + if challenge.pubkey.isSome(): + pubkey = challenge.pubkey.get() + else: + # We should have received a Record in this case. 
+ return err("Missing ENR in handshake packet") + + # Verify the id-signature + let sig = ? SignatureNR.fromRaw( + authdata.toOpenArray(authdataHeadSize, authdataHeadSize + int(sigSize) - 1)) + if not verifyIdSignature(sig, challenge.whoareyouData.challengeData, + ephKeyRaw, c.localNode.id, pubkey): + return err("Invalid id-signature") + + # Do the key derivation step only after id-signature is verified as this is + # costly. + var secrets = deriveKeys(srcId, c.localNode.id, c.privKey, + ephKey, challenge.whoareyouData.challengeData) + + swap(secrets.recipientKey, secrets.initiatorKey) + + let pt = decryptGCM(secrets.recipientKey, nonce, ct, @iv & @header) + if pt.isNone(): + c.sessions.del(srcId, fromAddr) + # Differently from an ordinary message, this is seen as an error as the + # secrets just got negotiated in the handshake and thus decryption should + # always work. We do not send a new Whoareyou on these as it probably means + # there is a compatiblity issue and we might loop forever in failed + # handshakes with this peer. + return err("Decryption of message failed in handshake packet") + + let message = ? decodeMessage(pt.get()) + + # Only store the session secrets in case decryption was successful and also + # in case the message can get decoded. + c.sessions.store(srcId, fromAddr, secrets.recipientKey, secrets.initiatorKey) + + return ok(Packet(flag: Flag.HandshakeMessage, message: message, + srcIdHs: srcId, node: newNode)) + +proc decodePacket*(c: var Codec, fromAddr: Address, input: openArray[byte]): + DecodeResult[Packet] = + ## Decode a packet. This can be a regular packet or a packet in response to a + ## WHOAREYOU packet. In case of the latter a `newNode` might be provided. + # Smallest packet is Whoareyou packet so that is the minimum size + if input.len() < whoareyouSize: + return err("Packet size too short") + + # TODO: Just pass in the full input? Makes more sense perhaps. + let (staticHeader, header) = ? 
decodeHeader(c.localNode.id, + input.toOpenArray(0, ivSize - 1), # IV + # Don't know the size yet of the full header, so we pass all. + input.toOpenArray(ivSize, input.high)) + + case staticHeader.flag + of OrdinaryMessage: + return decodeMessagePacket(c, fromAddr, staticHeader.nonce, + input.toOpenArray(0, ivSize - 1), header, + input.toOpenArray(ivSize + header.len, input.high)) + + of Whoareyou: + return decodeWhoareyouPacket(c, staticHeader.nonce, + input.toOpenArray(0, ivSize - 1), header, + input.toOpenArray(ivSize + header.len, input.high)) + + of HandshakeMessage: + return decodeHandshakePacket(c, fromAddr, staticHeader.nonce, + input.toOpenArray(0, ivSize - 1), header, + input.toOpenArray(ivSize + header.len, input.high)) + +proc init*(T: type RequestId, rng: var BrHmacDrbgContext): T = + var reqId = RequestId(id: newSeq[byte](8)) # RequestId must be <= 8 bytes + brHmacDrbgGenerate(rng, reqId.id) + reqId + +proc numFields(T: typedesc): int = + for k, v in fieldPairs(default(T)): inc result + +proc encodeMessage*[T: SomeMessage](p: T, reqId: RequestId): seq[byte] = + result = newSeqOfCap[byte](64) + result.add(messageKind(T).ord) + + const sz = numFields(T) + var writer = initRlpList(sz + 1) + writer.append(reqId) + for k, v in fieldPairs(p): + writer.append(v) + result.add(writer.finish()) diff --git a/eth/p2p/discoveryv5/enr.nim b/eth/p2p/discoveryv5/enr.nim new file mode 100644 index 0000000..acb50fe --- /dev/null +++ b/eth/p2p/discoveryv5/enr.nim @@ -0,0 +1,528 @@ +# nim-eth - Node Discovery Protocol v5 +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. 
+#
+## ENR implementation according to specification in EIP-778:
+## https://github.com/ethereum/EIPs/blob/master/EIPS/eip-778.md
+
+{.push raises: [Defect].}
+
+import
+ std/[strutils, macros, algorithm, options],
+ stew/shims/net, stew/[base64, results], nimcrypto,
+ ".."/../[rlp, keys]
+
+export options, results, keys
+
+const
+ maxEnrSize = 300 ## Maximum size of an encoded node record, in bytes.
+ minRlpListLen = 4 ## Minimum node record RLP list has: signature, seqId,
+ ## "id" key and value.
+
+type
+ FieldPair* = (string, Field)
+
+ Record* = object
+ seqNum*: uint64
+ # signature: seq[byte]
+ raw*: seq[byte] # RLP encoded record
+ pairs: seq[FieldPair] # sorted list of all key/value pairs
+
+ EnrUri* = distinct string
+
+ TypedRecord* = object
+ id*: string
+ secp256k1*: Option[array[33, byte]]
+ ip*: Option[array[4, byte]]
+ ip6*: Option[array[16, byte]]
+ tcp*: Option[int]
+ udp*: Option[int]
+ tcp6*: Option[int]
+ udp6*: Option[int]
+
+ FieldKind = enum
+ kString,
+ kNum,
+ kBytes,
+ kList
+
+ Field = object
+ case kind: FieldKind
+ of kString:
+ str: string
+ of kNum:
+ num: BiggestUInt
+ of kBytes:
+ bytes: seq[byte]
+ of kList:
+ listRaw: seq[byte] ## Differently from the other kinds, this is stored
+ ## as raw (encoded) RLP data, and thus treated as such further on.
+ + EnrResult*[T] = Result[T, cstring] + +template toField[T](v: T): Field = + when T is string: + Field(kind: kString, str: v) + elif T is array: + Field(kind: kBytes, bytes: @v) + elif T is seq[byte]: + Field(kind: kBytes, bytes: v) + elif T is SomeUnsignedInt: + Field(kind: kNum, num: BiggestUInt(v)) + elif T is object|tuple: + Field(kind: kList, listRaw: rlp.encode(v)) + else: + {.error: "Unsupported field type".} + +proc `==`(a, b: Field): bool = + if a.kind == b.kind: + case a.kind + of kString: + return a.str == b.str + of kNum: + return a.num == b.num + of kBytes: + return a.bytes == b.bytes + of kList: + return a.listRaw == b.listRaw + else: + return false + +proc cmp(a, b: FieldPair): int = cmp(a[0], b[0]) + +proc makeEnrRaw(seqNum: uint64, pk: PrivateKey, + pairs: openArray[FieldPair]): EnrResult[seq[byte]] = + proc append(w: var RlpWriter, seqNum: uint64, + pairs: openArray[FieldPair]): seq[byte] = + w.append(seqNum) + for (k, v) in pairs: + w.append(k) + case v.kind + of kString: w.append(v.str) + of kNum: w.append(v.num) + of kBytes: w.append(v.bytes) + of kList: w.appendRawBytes(v.listRaw) # No encoding needs to happen + w.finish() + + let toSign = block: + var w = initRlpList(pairs.len * 2 + 1) + w.append(seqNum, pairs) + + let sig = signNR(pk, toSign) + + var raw = block: + var w = initRlpList(pairs.len * 2 + 2) + w.append(sig.toRaw()) + w.append(seqNum, pairs) + + if raw.len > maxEnrSize: + err("Record exceeds maximum size") + else: + ok(raw) + +proc makeEnrAux(seqNum: uint64, pk: PrivateKey, + pairs: openArray[FieldPair]): EnrResult[Record] = + var record: Record + record.pairs = @pairs + record.seqNum = seqNum + + let pubkey = pk.toPublicKey() + + record.pairs.add(("id", Field(kind: kString, str: "v4"))) + record.pairs.add(("secp256k1", + Field(kind: kBytes, bytes: @(pubkey.toRawCompressed())))) + + # Sort by key + record.pairs.sort(cmp) + # TODO: Should deduplicate on keys here also. Should we error on that or just + # deal with it? 
+ + record.raw = ? makeEnrRaw(seqNum, pk, record.pairs) + ok(record) + +macro initRecord*(seqNum: uint64, pk: PrivateKey, + pairs: untyped{nkTableConstr}): untyped = + ## Initialize a `Record` with given sequence number, private key and k:v + ## pairs. + ## + ## Can fail in case the record exceeds the `maxEnrSize`. + for c in pairs: + c.expectKind(nnkExprColonExpr) + c[1] = newCall(bindSym"toField", c[1]) + + result = quote do: + makeEnrAux(`seqNum`, `pk`, `pairs`) + +template toFieldPair*(key: string, value: auto): FieldPair = + (key, toField(value)) + +proc addAddress(fields: var seq[FieldPair], ip: Option[ValidIpAddress], + tcpPort, udpPort: Option[Port]) = + ## Add address information in new fields. Incomplete address + ## information is allowed (example: Port but not IP) as that information + ## might be already in the ENR or added later. + if ip.isSome(): + let + ipExt = ip.get() + isV6 = ipExt.family == IPv6 + + fields.add(if isV6: ("ip6", ipExt.address_v6.toField) + else: ("ip", ipExt.address_v4.toField)) + if tcpPort.isSome(): + fields.add(((if isV6: "tcp6" else: "tcp"), tcpPort.get().uint16.toField)) + if udpPort.isSome(): + fields.add(((if isV6: "udp6" else: "udp"), udpPort.get().uint16.toField)) + else: + if tcpPort.isSome(): + fields.add(("tcp", tcpPort.get().uint16.toField)) + if udpPort.isSome(): + fields.add(("udp", udpPort.get().uint16.toField)) + +proc init*(T: type Record, seqNum: uint64, + pk: PrivateKey, + ip: Option[ValidIpAddress], + tcpPort, udpPort: Option[Port], + extraFields: openArray[FieldPair] = []): + EnrResult[T] = + ## Initialize a `Record` with given sequence number, private key, optional + ## ip address, tcp port, udp port, and optional custom k:v pairs. + ## + ## Can fail in case the record exceeds the `maxEnrSize`. + var fields = newSeq[FieldPair]() + + # TODO: Allow for initializing ENR with both ip4 and ipv6 address. 
+ fields.addAddress(ip, tcpPort, udpPort) + fields.add extraFields + makeEnrAux(seqNum, pk, fields) + +proc getField(r: Record, name: string, field: var Field): bool = + # It might be more correct to do binary search, + # as the fields are sorted, but it's unlikely to + # make any difference in reality. + for (k, v) in r.pairs: + if k == name: + field = v + return true + +proc requireKind(f: Field, kind: FieldKind): EnrResult[void] = + if f.kind != kind: + err("Wrong field kind") + else: + ok() + +proc get*(r: Record, key: string, T: type): EnrResult[T] = + ## Get the value from the provided key. + var f: Field + if r.getField(key, f): + when T is SomeInteger: + ? requireKind(f, kNum) + ok(T(f.num)) + elif T is seq[byte]: + ? requireKind(f, kBytes) + ok(f.bytes) + elif T is string: + ? requireKind(f, kString) + ok(f.str) + elif T is PublicKey: + ? requireKind(f, kBytes) + let pk = PublicKey.fromRaw(f.bytes) + if pk.isErr: + err("Invalid public key") + else: + ok(pk[]) + elif T is array: + when type(default(T)[low(T)]) is byte: + ? requireKind(f, kBytes) + if f.bytes.len != T.len: + err("Invalid byte blob length") + else: + var res: T + copyMem(addr res[0], addr f.bytes[0], res.len) + ok(res) + else: + {.fatal: "Unsupported output type in enr.get".} + else: + {.fatal: "Unsupported output type in enr.get".} + else: + err("Key not found in ENR") + +proc get*(r: Record, T: type PublicKey): Option[T] = + ## Get the `PublicKey` from provided `Record`. Return `none` when there is + ## no `PublicKey` in the record. + var pubkeyField: Field + if r.getField("secp256k1", pubkeyField) and pubkeyField.kind == kBytes: + let pk = PublicKey.fromRaw(pubkeyField.bytes) + if pk.isOk: + return some pk[] + +proc find(r: Record, key: string): Option[int] = + ## Search for key in record key:value pairs. + ## + ## Returns some(index of key) if key is found in record. Else return none. 
+ for i, (k, v) in r.pairs: + if k == key: + return some(i) + +proc update*(record: var Record, pk: PrivateKey, + fieldPairs: openArray[FieldPair]): EnrResult[void] = + ## Update a `Record` k:v pairs. + ## + ## In case any of the k:v pairs is updated or added (new), the sequence number + ## of the `Record` will be incremented and a new signature will be applied. + ## + ## Can fail in case of wrong `PrivateKey`, if the size of the resulting record + ## exceeds `maxEnrSize` or if maximum sequence number is reached. The `Record` + ## will not be altered in these cases. + var r = record + + let pubkey = r.get(PublicKey) + if pubkey.isNone() or pubkey.get() != pk.toPublicKey(): + return err("Public key does not correspond with given private key") + + var updated = false + for fieldPair in fieldPairs: + let index = r.find(fieldPair[0]) + if(index.isSome()): + if r.pairs[index.get()][1] == fieldPair[1]: + # Exact k:v pair is already in record, nothing to do here. + continue + else: + # Need to update the value. + r.pairs[index.get()] = fieldPair + updated = true + else: + # Add new k:v pair. + r.pairs.insert(fieldPair, lowerBound(r.pairs, fieldPair, cmp)) + updated = true + + if updated: + if r.seqNum == high(r.seqNum): # highly unlikely + return err("Maximum sequence number reached") + r.seqNum.inc() + r.raw = ? makeEnrRaw(r.seqNum, pk, r.pairs) + record = r + + ok() + +proc update*(r: var Record, pk: PrivateKey, + ip: Option[ValidIpAddress], + tcpPort, udpPort: Option[Port] = none[Port](), + extraFields: openArray[FieldPair] = []): + EnrResult[void] = + ## Update a `Record` with given ip address, tcp port, udp port and optional + ## custom k:v pairs. + ## + ## In case any of the k:v pairs is updated or added (new), the sequence number + ## of the `Record` will be incremented and a new signature will be applied. + ## + ## Can fail in case of wrong `PrivateKey`, if the size of the resulting record + ## exceeds `maxEnrSize` or if maximum sequence number is reached. 
The `Record` + ## will not be altered in these cases. + var fields = newSeq[FieldPair]() + + # TODO: Make updating of both ipv4 and ipv6 address in ENR more convenient. + fields.addAddress(ip, tcpPort, udpPort) + fields.add extraFields + r.update(pk, fields) + +proc tryGet*(r: Record, key: string, T: type): Option[T] = + ## Get the value from the provided key. + ## Return `none` if the key does not exist or if the value is invalid + ## according to type `T`. + let val = get(r, key, T) + if val.isOk(): + some(val.get()) + else: + none(T) + +proc toTypedRecord*(r: Record): EnrResult[TypedRecord] = + let id = r.tryGet("id", string) + if id.isSome: + var tr: TypedRecord + tr.id = id.get + + template readField(fieldName: untyped) {.dirty.} = + tr.fieldName = tryGet(r, astToStr(fieldName), type(tr.fieldName.get)) + + readField secp256k1 + readField ip + readField ip6 + readField tcp + readField tcp6 + readField udp + readField udp6 + + ok(tr) + else: + err("Record without id field") + +proc contains*(r: Record, fp: (string, seq[byte])): bool = + # TODO: use FieldPair for this, but that is a bit cumbersome. Perhaps the + # `get` call can be improved to make this easier. 
+ let field = r.tryGet(fp[0], seq[byte]) + if field.isSome(): + if field.get() == fp[1]: + return true + +proc verifySignatureV4(r: Record, sigData: openArray[byte], content: seq[byte]): + bool = + let publicKey = r.get(PublicKey) + if publicKey.isSome: + let sig = SignatureNR.fromRaw(sigData) + if sig.isOk: + var h = keccak256.digest(content) + return verify(sig[], SkMessage(h.data), publicKey.get) + +proc verifySignature(r: Record): bool {.raises: [RlpError, Defect].} = + var rlp = rlpFromBytes(r.raw) + let sz = rlp.listLen + if not rlp.enterList: + return false + let sigData = rlp.read(seq[byte]) + let content = block: + var writer = initRlpList(sz - 1) + var reader = rlp + for i in 1 ..< sz: + writer.appendRawBytes(reader.rawData) + reader.skipElem + writer.finish() + + var id: Field + if r.getField("id", id) and id.kind == kString: + case id.str + of "v4": + result = verifySignatureV4(r, sigData, content) + else: + # Unknown Identity Scheme + discard + +proc fromBytesAux(r: var Record): bool {.raises: [RlpError, Defect].} = + if r.raw.len > maxEnrSize: + return false + + var rlp = rlpFromBytes(r.raw) + if not rlp.isList: + return false + + let sz = rlp.listLen + if sz < minRlpListLen or sz mod 2 != 0: + # Wrong rlp object + return false + + # We already know we are working with a list + doAssert rlp.enterList() + rlp.skipElem() # Skip signature + + r.seqNum = rlp.read(uint64) + + let numPairs = (sz - 2) div 2 + + for i in 0 ..< numPairs: + let k = rlp.read(string) + case k + of "id": + let id = rlp.read(string) + r.pairs.add((k, Field(kind: kString, str: id))) + of "secp256k1": + let pubkeyData = rlp.read(seq[byte]) + r.pairs.add((k, Field(kind: kBytes, bytes: pubkeyData))) + of "tcp", "udp", "tcp6", "udp6": + let v = rlp.read(uint16) + r.pairs.add((k, Field(kind: kNum, num: v))) + else: + # Don't know really what this is supposed to represent so drop it in + # `kBytes` field pair when a single byte or blob. 
+ if rlp.isSingleByte() or rlp.isBlob(): + r.pairs.add((k, Field(kind: kBytes, bytes: rlp.read(seq[byte])))) + elif rlp.isList(): + # Not supporting decoding lists as value (especially unknown ones), + # just drop the raw RLP value in there. + r.pairs.add((k, Field(kind: kList, listRaw: @(rlp.rawData())))) + # Need to skip the element still. + rlp.skipElem() + + verifySignature(r) + +proc fromBytes*(r: var Record, s: openArray[byte]): bool = + ## Loads ENR from rlp-encoded bytes, and validates the signature. + r.raw = @s + try: + result = fromBytesAux(r) + except RlpError: + discard + +proc fromBase64*(r: var Record, s: string): bool = + ## Loads ENR from base64-encoded rlp-encoded bytes, and validates the + ## signature. + try: + r.raw = Base64Url.decode(s) + result = fromBytesAux(r) + except RlpError, Base64Error: + discard + +proc fromURI*(r: var Record, s: string): bool = + ## Loads ENR from its text encoding: base64-encoded rlp-encoded bytes, + ## prefixed with "enr:". Validates the signature. + const prefix = "enr:" + if s.startsWith(prefix): + result = r.fromBase64(s[prefix.len .. ^1]) + +template fromURI*(r: var Record, url: EnrUri): bool = + fromURI(r, string(url)) + +proc toBase64*(r: Record): string = + result = Base64Url.encode(r.raw) + +proc toURI*(r: Record): string = "enr:" & r.toBase64 + +proc `$`(f: Field): string = + case f.kind + of kNum: + $f.num + of kBytes: + "0x" & f.bytes.toHex + of kString: + "\"" & f.str & "\"" + of kList: + "(Raw RLP list) " & "0x" & f.listRaw.toHex + +proc `$`*(r: Record): string = + result = "(" + result &= $r.seqNum + for (k, v) in r.pairs: + result &= ", " + result &= k + result &= ": " + # For IP addresses we print something prettier than the default kinds + # Note: Could disallow for invalid IPs in ENR also. 
+ if k == "ip": + let ip = r.tryGet("ip", array[4, byte]) + if ip.isSome(): + result &= $ipv4(ip.get()) + else: + result &= "(Invalid) " & $v + elif k == "ip6": + let ip = r.tryGet("ip6", array[16, byte]) + if ip.isSome(): + result &= $ipv6(ip.get()) + else: + result &= "(Invalid) " & $v + else: + result &= $v + result &= ')' + +proc `==`*(a, b: Record): bool = a.raw == b.raw + +proc read*(rlp: var Rlp, T: typedesc[Record]): + T {.raises: [RlpError, ValueError, Defect].} = + if not rlp.hasData() or not result.fromBytes(rlp.rawData): + # TODO: This could also just be an invalid signature, would be cleaner to + # split of RLP deserialisation errors from this. + raise newException(ValueError, "Could not deserialize") + rlp.skipElem() + +proc append*(rlpWriter: var RlpWriter, value: Record) = + rlpWriter.appendRawBytes(value.raw) diff --git a/eth/p2p/discoveryv5/hkdf.nim b/eth/p2p/discoveryv5/hkdf.nim new file mode 100644 index 0000000..88ee7c9 --- /dev/null +++ b/eth/p2p/discoveryv5/hkdf.nim @@ -0,0 +1,30 @@ +import nimcrypto + +proc hkdf*(HashType: typedesc, ikm, salt, info: openArray[byte], + output: var openArray[byte]) = + var ctx: HMAC[HashType] + ctx.init(salt) + ctx.update(ikm) + let prk = ctx.finish().data + const hashLen = HashType.bits div 8 + + var t: MDigest[HashType.bits] + + var numIters = output.len div hashLen + if output.len mod hashLen != 0: + inc numIters + + for i in 0 ..< numIters: + ctx.init(prk) + if i != 0: + ctx.update(t.data) + ctx.update(info) + ctx.update([uint8(i + 1)]) + t = ctx.finish() + let iStart = i * hashLen + var sz = hashLen + if iStart + sz >= output.len: + sz = output.len - iStart + copyMem(addr output[iStart], addr t.data, sz) + + ctx.clear() diff --git a/eth/p2p/discoveryv5/ip_vote.nim b/eth/p2p/discoveryv5/ip_vote.nim new file mode 100644 index 0000000..20e8609 --- /dev/null +++ b/eth/p2p/discoveryv5/ip_vote.nim @@ -0,0 +1,76 @@ +# nim-eth - Node Discovery Protocol v5 +# Copyright (c) 2021 Status Research & Development GmbH 
+# Licensed and distributed under either of
+# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
+# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
+# at your option. This file may not be copied, modified, or distributed except according to those terms.
+#
+## IP:port address votes implemented similarly as in
+## https://github.com/sigp/discv5
+##
+## This allows the selection of a node's own public IP based on address
+## information that is received from other nodes.
+## This can be used in conjunction with discovery v5 ping-pong request responses
+## that provide this information.
+## To select the right address, a majority count is done. This is done over a
+## sort of moving window as votes expire after `IpVoteTimeout`.
+
+{.push raises: [Defect].}
+
+import
+ std/[tables, options],
+ chronos,
+ ./node
+
+export options
+
+const IpVoteTimeout = 5.minutes ## Duration until a vote expires
+
+type
+ IpVote* = object
+ votes: Table[NodeId, tuple[address: Address, expiry: chronos.Moment]]
+ threshold: uint ## Minimum threshold to allow for a majority to count
+
+func init*(T: type IpVote, threshold: uint = 10): T =
+ ## Initialize IpVote.
+ ##
+ ## If provided threshold is lower than 2 it will be set to 2.
+ if threshold < 2:
+ IpVote(threshold: 2)
+ else:
+ IpVote(threshold: threshold)
+
+proc insert*(ipvote: var IpVote, key: NodeId, address: Address) =
+ ## Insert a vote for an address coming from a specific `NodeId`. A `NodeId`
+ ## can only hold 1 vote.
+ ipvote.votes[key] = (address, now(chronos.Moment) + IpVoteTimeout)
+
+proc majority*(ipvote: var IpVote): Option[Address] =
+ ## Get the majority of votes on an address. Pruning of votes older than
+ ## `IpVoteTimeout` will be done before the majority count.
+ ## Note: When there is a draw the selected "majority" will depend on whichever
+ ## address comes first in the CountTable.
This seems acceptable as there is no
+ ## other criteria to make a selection.
+ let now = now(chronos.Moment)
+
+ var
+ pruneList: seq[NodeId]
+ ipCount: CountTable[Address]
+ for k, v in ipvote.votes:
+ if now > v.expiry:
+ pruneList.add(k)
+ else:
+ ipCount.inc(v.address)
+
+ for id in pruneList:
+ ipvote.votes.del(id)
+
+ if ipCount.len <= 0:
+ return none(Address)
+
+ let (address, count) = ipCount.largest()
+
+ if uint(count) >= ipvote.threshold:
+ some(address)
+ else:
+ none(Address) diff --git a/eth/p2p/discoveryv5/lru.nim b/eth/p2p/discoveryv5/lru.nim new file mode 100644 index 0000000..d4ffb41 --- /dev/null +++ b/eth/p2p/discoveryv5/lru.nim @@ -0,0 +1,41 @@
+import std/[tables, lists, options]
+
+{.push raises: [Defect].}
+
+type
+ LRUCache*[K, V] = object of RootObj
+ list: DoublyLinkedList[(K, V)] # Head is MRU k:v and tail is LRU k:v
+ table: Table[K, DoublyLinkedNode[(K, V)]] # DoublyLinkedNode is already ref
+ capacity: int
+
+func init*[K, V](T: type LRUCache[K, V], capacity: int): LRUCache[K, V] =
+ LRUCache[K, V](capacity: capacity) # Table and list init is done default
+
+func get*[K, V](lru: var LRUCache[K, V], key: K): Option[V] =
+ let node = lru.table.getOrDefault(key, nil)
+ if node.isNil:
+ return none(V)
+
+ lru.list.remove(node)
+ lru.list.prepend(node)
+ return some(node.value[1])
+
+func put*[K, V](lru: var LRUCache[K, V], key: K, value: V) =
+ let node = lru.table.getOrDefault(key, nil)
+ if not node.isNil:
+ lru.list.remove(node)
+ else:
+ if lru.table.len >= lru.capacity:
+ lru.table.del(lru.list.tail.value[0])
+ lru.list.remove(lru.list.tail)
+
+ lru.list.prepend((key, value))
+ lru.table[key] = lru.list.head
+
+func del*[K, V](lru: var LRUCache[K, V], key: K) =
+ var node: DoublyLinkedNode[(K, V)]
+ if lru.table.pop(key, node):
+ lru.list.remove(node)
+
+func len*[K, V](lru: LRUCache[K, V]): int =
+ lru.table.len diff --git a/eth/p2p/discoveryv5/messages.nim b/eth/p2p/discoveryv5/messages.nim new file mode 100644 index
0000000..8d29bc3 --- /dev/null +++ b/eth/p2p/discoveryv5/messages.nim @@ -0,0 +1,144 @@ +# nim-eth - Node Discovery Protocol v5 +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. +# +## Discovery v5 Protocol Messages as specified at +## https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire.md#protocol-messages +## These messages get RLP encoded. +## + +{.push raises: [Defect].} + +import + std/[hashes, net], + stew/arrayops, + ../../rlp, ./enr + +type + MessageKind* = enum + # TODO This is needed only to make Nim 1.2.6 happy + # Without it, the `MessageKind` type cannot be used as + # a discriminator in case objects. + unused = 0x00 + + ping = 0x01 + pong = 0x02 + findNode = 0x03 + nodes = 0x04 + talkReq = 0x05 + talkResp = 0x06 + regTopic = 0x07 + ticket = 0x08 + regConfirmation = 0x09 + topicQuery = 0x0A + + RequestId* = object + id*: seq[byte] + + PingMessage* = object + enrSeq*: uint64 + + PongMessage* = object + enrSeq*: uint64 + ip*: IpAddress + port*: uint16 + + FindNodeMessage* = object + distances*: seq[uint16] + + NodesMessage* = object + total*: uint32 + enrs*: seq[Record] + + TalkReqMessage* = object + protocol*: seq[byte] + request*: seq[byte] + + TalkRespMessage* = object + response*: seq[byte] + + # Not implemented, specification is not final here. 
+ RegTopicMessage* = object + TicketMessage* = object + RegConfirmationMessage* = object + TopicQueryMessage* = object + + SomeMessage* = PingMessage or PongMessage or FindNodeMessage or NodesMessage or + TalkReqMessage or TalkRespMessage + + Message* = object + reqId*: RequestId + case kind*: MessageKind + of ping: + ping*: PingMessage + of pong: + pong*: PongMessage + of findNode: + findNode*: FindNodeMessage + of nodes: + nodes*: NodesMessage + of talkReq: + talkReq*: TalkReqMessage + of talkResp: + talkResp*: TalkRespMessage + of regTopic: + regtopic*: RegTopicMessage + of ticket: + ticket*: TicketMessage + of regConfirmation: + regConfirmation*: RegConfirmationMessage + of topicQuery: + topicQuery*: TopicQueryMessage + else: + discard + +template messageKind*(T: typedesc[SomeMessage]): MessageKind = + when T is PingMessage: ping + elif T is PongMessage: pong + elif T is FindNodeMessage: findNode + elif T is NodesMessage: nodes + elif T is TalkReqMessage: talkReq + elif T is TalkRespMessage: talkResp + +proc read*(rlp: var Rlp, T: type RequestId): T + {.raises: [ValueError, RlpError, Defect].} = + mixin read + var reqId: RequestId + reqId.id = rlp.toBytes() + if reqId.id.len > 8: + raise newException(ValueError, "RequestId is > 8 bytes") + rlp.skipElem() + + reqId + +proc append*(writer: var RlpWriter, value: RequestId) = + writer.append(value.id) + +proc read*(rlp: var Rlp, T: type IpAddress): T + {.raises: [RlpError, Defect].} = + let ipBytes = rlp.toBytes() + rlp.skipElem() + + if ipBytes.len == 4: + var ip: array[4, byte] + discard copyFrom(ip, ipBytes) + IpAddress(family: IPv4, address_v4: ip) + elif ipBytes.len == 16: + var ip: array[16, byte] + discard copyFrom(ip, ipBytes) + IpAddress(family: IPv6, address_v6: ip) + else: + raise newException(RlpTypeMismatch, + "Amount of bytes for IP address is different from 4 or 16") + +proc append*(writer: var RlpWriter, ip: IpAddress) = + case ip.family: + of IpAddressFamily.IPv4: + writer.append(ip.address_v4) + 
of IpAddressFamily.IPv6: writer.append(ip.address_v6) + +proc hash*(reqId: RequestId): Hash = + hash(reqId.id) diff --git a/eth/p2p/discoveryv5/node.nim b/eth/p2p/discoveryv5/node.nim new file mode 100644 index 0000000..3b311b9 --- /dev/null +++ b/eth/p2p/discoveryv5/node.nim @@ -0,0 +1,141 @@ +# nim-eth - Node Discovery Protocol v5 +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +import + std/hashes, + nimcrypto, stint, chronos, stew/shims/net, chronicles, + ../../keys, ../../net/utils, + ./enr + +export stint + +type + NodeId* = UInt256 + + Address* = object + ip*: ValidIpAddress + port*: Port + + Node* = ref object + id*: NodeId + pubkey*: PublicKey + address*: Option[Address] + record*: Record + seen*: bool ## Indicates if there was at least one successful + ## request-response with this node. + +func toNodeId*(pk: PublicKey): NodeId = + ## Convert public key to a node identifier. + # Keccak256 hash is used as defined in ENR spec for scheme v4: + # https://github.com/ethereum/devp2p/blob/master/enr.md#v4-identity-scheme + readUintBE[256](keccak256.digest(pk.toRaw()).data) + +func newNode*(r: Record): Result[Node, cstring] = + ## Create a new `Node` from a `Record`. + # TODO: Handle IPv6 + + let pk = r.get(PublicKey) + # This check is redundant for a properly created record as the deserialization + # of a record will fail at `verifySignature` if there is no public key. + if pk.isNone(): + return err("Could not recover public key from ENR") + + # Also this can not fail for a properly created record as id is checked upon + # deserialization. + let tr = ? 
r.toTypedRecord() + if tr.ip.isSome() and tr.udp.isSome(): + let a = Address(ip: ipv4(tr.ip.get()), port: Port(tr.udp.get())) + + ok(Node(id: pk.get().toNodeId(), pubkey: pk.get() , record: r, + address: some(a))) + else: + ok(Node(id: pk.get().toNodeId(), pubkey: pk.get(), record: r, + address: none(Address))) + +func update*(n: Node, pk: PrivateKey, ip: Option[ValidIpAddress], + tcpPort, udpPort: Option[Port] = none[Port](), + extraFields: openArray[FieldPair] = []): Result[void, cstring] = + ? n.record.update(pk, ip, tcpPort, udpPort, extraFields) + + if ip.isSome(): + if udpPort.isSome(): + let a = Address(ip: ip.get(), port: udpPort.get()) + n.address = some(a) + elif n.address.isSome(): + let a = Address(ip: ip.get(), port: n.address.get().port) + n.address = some(a) + else: + n.address = none(Address) + else: + n.address = none(Address) + + ok() + +func hash*(n: Node): hashes.Hash = hash(n.pubkey.toRaw) + +func `==`*(a, b: Node): bool = + (a.isNil and b.isNil) or + (not a.isNil and not b.isNil and a.pubkey == b.pubkey) + +func hash*(id: NodeId): Hash = + hash(id.toByteArrayBE) + +proc random*(T: type NodeId, rng: var BrHmacDrbgContext): T = + var id: NodeId + brHmacDrbgGenerate(addr rng, addr id, csize_t(sizeof(id))) + + id + +func `$`*(id: NodeId): string = + id.toHex() + +func shortLog*(id: NodeId): string = + ## Returns compact string representation of ``id``. 
+ var sid = $id + if len(sid) <= 10: + result = sid + else: + result = newStringOfCap(10) + for i in 0..<2: + result.add(sid[i]) + result.add("*") + for i in (len(sid) - 6)..sid.high: + result.add(sid[i]) +chronicles.formatIt(NodeId): shortLog(it) + +func hash*(a: Address): hashes.Hash = + let res = a.ip.hash !& a.port.hash + !$res + +func `$`*(a: Address): string = + result.add($a.ip) + result.add(":" & $a.port) + +func shortLog*(n: Node): string = + if n.isNil: + "uninitialized" + elif n.address.isNone(): + shortLog(n.id) & ":unaddressable" + else: + shortLog(n.id) & ":" & $n.address.get() +chronicles.formatIt(Node): shortLog(it) + +func shortLog*(nodes: seq[Node]): string = + result = "[" + + var first = true + for n in nodes: + if first: + first = false + else: + result.add(", ") + result.add(shortLog(n)) + + result.add("]") +chronicles.formatIt(seq[Node]): shortLog(it) diff --git a/eth/p2p/discoveryv5/nodes_verification.nim b/eth/p2p/discoveryv5/nodes_verification.nim new file mode 100644 index 0000000..45fd89f --- /dev/null +++ b/eth/p2p/discoveryv5/nodes_verification.nim @@ -0,0 +1,86 @@ +{.push raises: [Defect].} + +import + std/[sets, options], + stew/results, stew/shims/net, chronicles, chronos, + "."/[node, enr, routing_table] + +logScope: + topics = "nodes-verification" + +proc validIp(sender, address: IpAddress): bool = + let + s = initTAddress(sender, Port(0)) + a = initTAddress(address, Port(0)) + if a.isAnyLocal(): + return false + if a.isMulticast(): + return false + if a.isLoopback() and not s.isLoopback(): + return false + if a.isSiteLocal() and not s.isSiteLocal(): + return false + # TODO: Also check for special reserved ip addresses: + # https://www.iana.org/assignments/iana-ipv4-special-registry/iana-ipv4-special-registry.xhtml + # https://www.iana.org/assignments/iana-ipv6-special-registry/iana-ipv6-special-registry.xhtml + return true + +proc verifyNodesRecords(enrs: openArray[Record], fromNode: Node, nodesLimit: int, + distances: 
Option[seq[uint16]]): seq[Node] =
+ ## Verify and convert ENRs to a sequence of nodes. Only ENRs that pass
+ ## verification will be added. ENRs are verified for duplicates, invalid
+ ## addresses and invalid distances if those are specified.
+ var seen: HashSet[Node]
+ var count = 0
+ for r in enrs:
+ # Check and allow for processing of maximum `findNodeResultLimit` ENRs
+ # returned. This limitation is required so no huge lists of invalid ENRs
+ # are processed for no reason, and for not overwhelming a routing table
+ # with nodes from a malicious actor.
+ # The discovery v5 specification specifies no limit on the amount of ENRs
+ # that can be returned, but clients usually stick with the bucket size limit
+ # as in original Kademlia. Because of this it is chosen not to fail
+ # immediately, but still process maximum `findNodeResultLimit`.
+ if count >= nodesLimit:
+ debug "Too many ENRs", enrs = enrs.len(),
+ limit = nodesLimit, sender = fromNode.record.toURI
+ break
+
+ count.inc()
+
+ let node = newNode(r)
+ if node.isOk():
+ let n = node.get()
+ # Check for duplicates in the nodes reply. Duplicates are checked based
+ # on node id.
+ if n in seen:
+ trace "Duplicate node ids",
+ record = n.record.toURI, id = n.id, sender = fromNode.record.toURI
+ continue
+ # Check if the node has an address and if the address is public or from
+ # the same local network or lo network as the sender. The latter allows
+ # for local testing.
+ if not n.address.isSome() or not
+ validIp(fromNode.address.get().ip, n.address.get().ip):
+ trace "Invalid ip-address",
+ record = n.record.toURI, node = n, sender = fromNode.record.toURI
+ continue
+ # Check if returned node has one of the requested distances.
+ if distances.isSome(): + # TODO: This is incorrect for custom distances + if (not distances.get().contains(logDistance(n.id, fromNode.id))): + debug "Incorrect distance", + record = n.record.toURI, sender = fromNode.record.toURI + continue + + # No check on UDP port and thus any port is allowed, also the so called + # "well-known" ports. + + seen.incl(n) + result.add(n) + +proc verifyNodesRecords*(enrs: openArray[Record], fromNode: Node, nodesLimit: int): seq[Node] = + verifyNodesRecords(enrs, fromNode, nodesLimit, none[seq[uint16]]()) + +proc verifyNodesRecords*(enrs: openArray[Record], fromNode: Node, nodesLimit: int, distances: seq[uint16]): seq[Node] = + verifyNodesRecords(enrs, fromNode, nodesLimit, some[seq[uint16]](distances)) diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim new file mode 100644 index 0000000..2582727 --- /dev/null +++ b/eth/p2p/discoveryv5/protocol.nim @@ -0,0 +1,1005 @@ +# nim-eth - Node Discovery Protocol v5 +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. 
+ +## Node Discovery Protocol v5 +## +## Node discovery protocol implementation as per specification: +## https://github.com/ethereum/devp2p/blob/master/discv5/discv5.md +## +## This node discovery protocol implementation uses the same underlying +## implementation of routing table as is also used for the discovery v4 +## implementation, which is the same or similar as the one described in the +## original Kademlia paper: +## https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf +## +## This might not be the most optimal implementation for the node discovery +## protocol v5. Why? +## +## The Kademlia paper describes an implementation that starts off from one +## k-bucket, and keeps splitting the bucket as more nodes are discovered and +## added. The bucket splits only on the part of the binary tree where our own +## node its id belongs too (same prefix). Resulting eventually in a k-bucket per +## logarithmic distance (log base2 distance). Well, not really, as nodes with +## ids in the closer distance ranges will never be found. And because of this an +## optimisation is done where buckets will also split sometimes even if the +## nodes own id does not have the same prefix (this is to avoid creating highly +## unbalanced branches which would require longer lookups). +## +## Now, some implementations take a more simplified approach. They just create +## directly a bucket for each possible logarithmic distance (e.g. here 1->256). +## Some implementations also don't create buckets with logarithmic distance +## lower than a certain value (e.g. only 1/15th of the highest buckets), +## because the closer to the node (the lower the distance), the less chance +## there is to still find nodes. +## +## The discovery protocol v4 its `FindNode` call will request the k closest +## nodes. As does original Kademlia. This effectively puts the work at the node +## that gets the request. This node will have to check its buckets and gather +## the closest. 
Some implementations go over all the nodes in all the buckets +## for this (e.g. go-ethereum discovery v4). However, in our bucket splitting +## approach, this search is improved. +## +## In the discovery protocol v5 the `FindNode` call is changed and now the +## logarithmic distance is passed as parameter instead of the NodeId. And only +## nodes that match that logarithmic distance are allowed to be returned. +## This change was made to not put the trust at the requested node for selecting +## the closest nodes. To counter a possible (mistaken) difference in +## implementation, but more importantly for security reasons. See also: +## https://github.com/ethereum/devp2p/blob/master/discv5/discv5-rationale.md#115-guard-against-kademlia-implementation-flaws +## +## The result is that in an implementation which just stores buckets per +## logarithmic distance, it simply needs to return the right bucket. In our +## split-bucket implementation, this cannot be done as such and thus the closest +## neighbours search is still done. And to do this, a reverse calculation of an +## id at given logarithmic distance is needed (which is why there is the +## `idAtDistance` proc). Next, nodes with invalid distances need to be filtered +## out to be compliant to the specification. This can most likely get further +## optimised, but it sounds likely better to switch away from the split-bucket +## approach. I believe that the main benefit it has is improved lookups +## (due to no unbalanced branches), and it looks like this will be negated by +## limiting the returned nodes to only the ones of the requested logarithmic +## distance for the `FindNode` call. + +## This `FindNode` change in discovery v5 will also have an effect on the +## efficiency of the network. Work will be moved from the receiver of +## `FindNodes` to the requester. 
But this also means more network traffic, +## as less nodes will potentially be passed around per `FindNode` call, and thus +## more requests will be needed for a lookup (adding bandwidth and latency). +## This might be a concern for mobile devices. + +{.push raises: [Defect].} + +import + std/[tables, sets, options, math, sequtils, algorithm], + stew/shims/net as stewNet, json_serialization/std/net, + stew/[endians2, results], chronicles, chronos, stint, bearssl, metrics, + ".."/../[rlp, keys, async_utils], + "."/[messages, encoding, node, routing_table, enr, random2, sessions, ip_vote, nodes_verification] + +import nimcrypto except toHex + +export options, results, node, enr + +declareCounter discovery_message_requests_outgoing, + "Discovery protocol outgoing message requests", labels = ["response"] +declareCounter discovery_message_requests_incoming, + "Discovery protocol incoming message requests", labels = ["response"] +declareCounter discovery_unsolicited_messages, + "Discovery protocol unsolicited or timed-out messages" +declareCounter discovery_enr_auto_update, + "Amount of discovery IP:port address ENR auto updates" + +logScope: + topics = "discv5" + +const + alpha = 3 ## Kademlia concurrency factor + lookupRequestLimit = 3 ## Amount of distances requested in a single Findnode + ## message for a lookup or query + findNodeResultLimit = 16 ## Maximum amount of ENRs in the total Nodes messages + ## that will be processed + maxNodesPerMessage = 3 ## Maximum amount of ENRs per individual Nodes message + refreshInterval = 5.minutes ## Interval of launching a random query to + ## refresh the routing table. + revalidateMax = 10000 ## Revalidation of a peer is done between 0 and this + ## value in milliseconds + ipMajorityInterval = 5.minutes ## Interval for checking the latest IP:Port + ## majority and updating this when ENR auto update is set. 
+ initialLookups = 1 ## Amount of lookups done when populating the routing table + handshakeTimeout* = 2.seconds ## timeout for the reply on the + ## whoareyou message + responseTimeout* = 4.seconds ## timeout for the response of a request-response + ## call + +type + DiscoveryConfig* = object + tableIpLimits*: TableIpLimits + bitsPerHop*: int + + Protocol* = ref object + transp: DatagramTransport + localNode*: Node + privateKey: PrivateKey + bindAddress: Address ## UDP binding address + pendingRequests: Table[AESGCMNonce, PendingRequest] + routingTable*: RoutingTable + codec*: Codec + awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]] + refreshLoop: Future[void] + revalidateLoop: Future[void] + ipMajorityLoop: Future[void] + lastLookup: chronos.Moment + bootstrapRecords*: seq[Record] + ipVote: IpVote + enrAutoUpdate: bool + talkProtocols*: Table[seq[byte], TalkProtocol] # TODO: Table is a bit of + # overkill here, use sequence + rng*: ref BrHmacDrbgContext + + PendingRequest = object + node: Node + message: seq[byte] + + TalkProtocolHandler* = proc(p: TalkProtocol, request: seq[byte], fromId: NodeId, fromUdpAddress: Address): seq[byte] + {.gcsafe, raises: [Defect].} + + TalkProtocol* = ref object of RootObj + protocolHandler*: TalkProtocolHandler + + DiscResult*[T] = Result[T, cstring] + +const + defaultDiscoveryConfig* = DiscoveryConfig( + tableIpLimits: DefaultTableIpLimits, + bitsPerHop: DefaultBitsPerHop) + +proc addNode*(d: Protocol, node: Node): bool = + ## Add `Node` to discovery routing table. + ## + ## Returns true only when `Node` was added as a new entry to a bucket in the + ## routing table. + if d.routingTable.addNode(node) == Added: + return true + else: + return false + +proc addNode*(d: Protocol, r: Record): bool = + ## Add `Node` from a `Record` to discovery routing table. + ## + ## Returns false only if no valid `Node` can be created from the `Record` or + ## on the conditions of `addNode` from a `Node`. 
+ let node = newNode(r) + if node.isOk(): + return d.addNode(node[]) + +proc addNode*(d: Protocol, enr: EnrUri): bool = + ## Add `Node` from a ENR URI to discovery routing table. + ## + ## Returns false if no valid ENR URI, or on the conditions of `addNode` from + ## an `Record`. + var r: Record + let res = r.fromURI(enr) + if res: + return d.addNode(r) + +proc getNode*(d: Protocol, id: NodeId): Option[Node] = + ## Get the node with id from the routing table. + d.routingTable.getNode(id) + +proc randomNodes*(d: Protocol, maxAmount: int): seq[Node] = + ## Get a `maxAmount` of random nodes from the local routing table. + d.routingTable.randomNodes(maxAmount) + +proc randomNodes*(d: Protocol, maxAmount: int, + pred: proc(x: Node): bool {.gcsafe, noSideEffect.}): seq[Node] = + ## Get a `maxAmount` of random nodes from the local routing table with the + ## `pred` predicate function applied as filter on the nodes selected. + d.routingTable.randomNodes(maxAmount, pred) + +proc randomNodes*(d: Protocol, maxAmount: int, + enrField: (string, seq[byte])): seq[Node] = + ## Get a `maxAmount` of random nodes from the local routing table. The + ## the nodes selected are filtered by provided `enrField`. + d.randomNodes(maxAmount, proc(x: Node): bool = x.record.contains(enrField)) + +proc neighbours*(d: Protocol, id: NodeId, k: int = BUCKET_SIZE, + seenOnly = false): seq[Node] = + ## Return up to k neighbours (closest node ids) of the given node id. + d.routingTable.neighbours(id, k, seenOnly) + +proc neighboursAtDistances*(d: Protocol, distances: seq[uint16], + k: int = BUCKET_SIZE, seenOnly = false): seq[Node] = + ## Return up to k neighbours (closest node ids) at given distances. + d.routingTable.neighboursAtDistances(distances, k, seenOnly) + +proc nodesDiscovered*(d: Protocol): int = d.routingTable.len + +func privKey*(d: Protocol): lent PrivateKey = + d.privateKey + +func getRecord*(d: Protocol): Record = + ## Get the ENR of the local node. 
+ d.localNode.record + +proc updateRecord*( + d: Protocol, enrFields: openArray[(string, seq[byte])]): DiscResult[void] = + ## Update the ENR of the local node with provided `enrFields` k:v pairs. + let fields = mapIt(enrFields, toFieldPair(it[0], it[1])) + d.localNode.record.update(d.privateKey, fields) + # TODO: Would it make sense to actively ping ("broadcast") to all the peers + # we stored a handshake with in order to get that ENR updated? + +proc send*(d: Protocol, a: Address, data: seq[byte]) = + let ta = initTAddress(a.ip, a.port) + let f = d.transp.sendTo(ta, data) + f.callback = proc(data: pointer) {.gcsafe.} = + if f.failed: + # Could be `TransportUseClosedError` in case the transport is already + # closed, or could be `TransportOsError` in case of a socket error. + # In the latter case this would probably mostly occur if the network + # interface underneath gets disconnected or similar. + # TODO: Should this kind of error be propagated upwards? Probably, but + # it should not stop the process as that would reset the discovery + # progress in case there is even a small window of no connection. + # One case that needs this error available upwards is when revalidating + # nodes. Else the revalidation might end up clearing the routing tabl + # because of ping failures due to own network connection failure. 
+ warn "Discovery send failed", msg = f.readError.msg + +proc send(d: Protocol, n: Node, data: seq[byte]) = + doAssert(n.address.isSome()) + d.send(n.address.get(), data) + +proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address, reqId: RequestId, + nodes: openArray[Node]) = + proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address, + message: NodesMessage, reqId: RequestId) {.nimcall.} = + let (data, _) = encodeMessagePacket(d.rng[], d.codec, toId, toAddr, + encodeMessage(message, reqId)) + + trace "Respond message packet", dstId = toId, address = toAddr, + kind = MessageKind.nodes + d.send(toAddr, data) + + if nodes.len == 0: + # In case of 0 nodes, a reply is still needed + d.sendNodes(toId, toAddr, NodesMessage(total: 1, enrs: @[]), reqId) + return + + var message: NodesMessage + # TODO: Do the total calculation based on the max UDP packet size we want to + # send and the ENR size of all (max 16) nodes. + # Which UDP packet size to take? 1280? 576? + message.total = ceil(nodes.len / maxNodesPerMessage).uint32 + + for i in 0 ..< nodes.len: + message.enrs.add(nodes[i].record) + if message.enrs.len == maxNodesPerMessage: + d.sendNodes(toId, toAddr, message, reqId) + message.enrs.setLen(0) + + if message.enrs.len != 0: + d.sendNodes(toId, toAddr, message, reqId) + +proc handlePing(d: Protocol, fromId: NodeId, fromAddr: Address, + ping: PingMessage, reqId: RequestId) = + let pong = PongMessage(enrSeq: d.localNode.record.seqNum, ip: fromAddr.ip, + port: fromAddr.port.uint16) + + let (data, _) = encodeMessagePacket(d.rng[], d.codec, fromId, fromAddr, + encodeMessage(pong, reqId)) + + trace "Respond message packet", dstId = fromId, address = fromAddr, + kind = MessageKind.pong + d.send(fromAddr, data) + +proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address, + fn: FindNodeMessage, reqId: RequestId) = + if fn.distances.len == 0: + d.sendNodes(fromId, fromAddr, reqId, []) + elif fn.distances.contains(0): + # A request for our own record. 
+ # It would be a weird request if there are more distances next to 0 + # requested, so in this case lets just pass only our own. TODO: OK? + d.sendNodes(fromId, fromAddr, reqId, [d.localNode]) + else: + # TODO: Still deduplicate also? + if fn.distances.all(proc (x: uint16): bool = return x <= 256): + d.sendNodes(fromId, fromAddr, reqId, + d.routingTable.neighboursAtDistances(fn.distances, seenOnly = true)) + else: + # At least one invalid distance, but the polite node we are, still respond + # with empty nodes. + d.sendNodes(fromId, fromAddr, reqId, []) + +proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address, + talkreq: TalkReqMessage, reqId: RequestId) = + let talkProtocol = d.talkProtocols.getOrDefault(talkreq.protocol) + + let talkresp = + if talkProtocol.isNil() or talkProtocol.protocolHandler.isNil(): + # Protocol identifier that is not registered and thus not supported. An + # empty response is send as per specification. + TalkRespMessage(response: @[]) + else: + TalkRespMessage(response: talkProtocol.protocolHandler(talkProtocol, + talkreq.request, fromId, fromAddr)) + let (data, _) = encodeMessagePacket(d.rng[], d.codec, fromId, fromAddr, + encodeMessage(talkresp, reqId)) + + trace "Respond message packet", dstId = fromId, address = fromAddr, + kind = MessageKind.talkresp + d.send(fromAddr, data) + +proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address, + message: Message) = + case message.kind + of ping: + discovery_message_requests_incoming.inc() + d.handlePing(srcId, fromAddr, message.ping, message.reqId) + of findNode: + discovery_message_requests_incoming.inc() + d.handleFindNode(srcId, fromAddr, message.findNode, message.reqId) + of talkReq: + discovery_message_requests_incoming.inc() + d.handleTalkReq(srcId, fromAddr, message.talkReq, message.reqId) + of regTopic, topicQuery: + discovery_message_requests_incoming.inc() + discovery_message_requests_incoming.inc(labelValues = ["no_response"]) + trace "Received unimplemented 
message kind", kind = message.kind, + origin = fromAddr + else: + var waiter: Future[Option[Message]] + if d.awaitedMessages.take((srcId, message.reqId), waiter): + waiter.complete(some(message)) + else: + discovery_unsolicited_messages.inc() + trace "Timed out or unrequested message", kind = message.kind, + origin = fromAddr + +proc registerTalkProtocol*(d: Protocol, protocolId: seq[byte], + protocol: TalkProtocol): DiscResult[void] = + # Currently allow only for one handler per talk protocol. + if d.talkProtocols.hasKeyOrPut(protocolId, protocol): + err("Protocol identifier already registered") + else: + ok() + +proc sendWhoareyou(d: Protocol, toId: NodeId, a: Address, + requestNonce: AESGCMNonce, node: Option[Node]) = + let key = HandshakeKey(nodeId: toId, address: a) + if not d.codec.hasHandshake(key): + let + recordSeq = if node.isSome(): node.get().record.seqNum + else: 0 + pubkey = if node.isSome(): some(node.get().pubkey) + else: none(PublicKey) + + let data = encodeWhoareyouPacket(d.rng[], d.codec, toId, a, requestNonce, + recordSeq, pubkey) + sleepAsync(handshakeTimeout).addCallback() do(data: pointer): + # TODO: should we still provide cancellation in case handshake completes + # correctly? 
+ d.codec.handshakes.del(key) + + trace "Send whoareyou", dstId = toId, address = a + d.send(a, data) + else: + debug "Node with this id already has ongoing handshake, ignoring packet" + +proc receive*(d: Protocol, a: Address, packet: openArray[byte]) = + let decoded = d.codec.decodePacket(a, packet) + if decoded.isOk: + let packet = decoded[] + case packet.flag + of OrdinaryMessage: + if packet.messageOpt.isSome(): + let message = packet.messageOpt.get() + trace "Received message packet", srcId = packet.srcId, address = a, + kind = message.kind + d.handleMessage(packet.srcId, a, message) + else: + trace "Not decryptable message packet received", + srcId = packet.srcId, address = a + d.sendWhoareyou(packet.srcId, a, packet.requestNonce, + d.getNode(packet.srcId)) + + of Flag.Whoareyou: + trace "Received whoareyou packet", address = a + var pr: PendingRequest + if d.pendingRequests.take(packet.whoareyou.requestNonce, pr): + let toNode = pr.node + # This is a node we previously contacted and thus must have an address. + doAssert(toNode.address.isSome()) + let address = toNode.address.get() + let data = encodeHandshakePacket(d.rng[], d.codec, toNode.id, + address, pr.message, packet.whoareyou, toNode.pubkey) + + trace "Send handshake message packet", dstId = toNode.id, address + d.send(toNode, data) + else: + debug "Timed out or unrequested whoareyou packet", address = a + of HandshakeMessage: + trace "Received handshake message packet", srcId = packet.srcIdHs, + address = a, kind = packet.message.kind + d.handleMessage(packet.srcIdHs, a, packet.message) + # For a handshake message it is possible that we received an newer ENR. + # In that case we can add/update it to the routing table. + if packet.node.isSome(): + let node = packet.node.get() + # Lets not add nodes without correct IP in the ENR to the routing table. 
+ # The ENR could contain bogus IPs and although they would get removed + # on the next revalidation, one could spam these as the handshake + # message occurs on (first) incoming messages. + if node.address.isSome() and a == node.address.get(): + if d.addNode(node): + trace "Added new node to routing table after handshake", node + else: + trace "Packet decoding error", error = decoded.error, address = a + +proc processClient(transp: DatagramTransport, raddr: TransportAddress): + Future[void] {.async.} = + let proto = getUserData[Protocol](transp) + + # TODO: should we use `peekMessage()` to avoid allocation? + let buf = try: transp.getMessage() + except TransportOsError as e: + # This is likely to be local network connection issues. + warn "Transport getMessage", exception = e.name, msg = e.msg + return + + let ip = try: raddr.address() + except ValueError as e: + error "Not a valid IpAddress", exception = e.name, msg = e.msg + return + let a = Address(ip: ValidIpAddress.init(ip), port: raddr.port) + + proto.receive(a, buf) + +proc replaceNode(d: Protocol, n: Node) = + if n.record notin d.bootstrapRecords: + d.routingTable.replaceNode(n) + else: + # For now we never remove bootstrap nodes. It might make sense to actually + # do so and to retry them only in case we drop to a really low amount of + # peers in the routing table. 
+ debug "Message request to bootstrap node failed", enr = toURI(n.record) + +# TODO: This could be improved to do the clean-up immediatily in case a non +# whoareyou response does arrive, but we would need to store the AuthTag +# somewhere +proc registerRequest(d: Protocol, n: Node, message: seq[byte], + nonce: AESGCMNonce) = + let request = PendingRequest(node: n, message: message) + if not d.pendingRequests.hasKeyOrPut(nonce, request): + sleepAsync(responseTimeout).addCallback() do(data: pointer): + d.pendingRequests.del(nonce) + +proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId): + Future[Option[Message]] = + result = newFuture[Option[Message]]("waitMessage") + let res = result + let key = (fromNode.id, reqId) + sleepAsync(responseTimeout).addCallback() do(data: pointer): + d.awaitedMessages.del(key) + if not res.finished: + res.complete(none(Message)) + d.awaitedMessages[key] = result + +proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId): + Future[DiscResult[seq[Record]]] {.async.} = + ## Wait for one or more nodes replies. + ## + ## The first reply will hold the total number of replies expected, and based + ## on that, more replies will be awaited. + ## If one reply is lost here (timed out), others are ignored too. + ## Same counts for out of order receival. + var op = await d.waitMessage(fromNode, reqId) + if op.isSome: + if op.get.kind == nodes: + var res = op.get.nodes.enrs + let total = op.get.nodes.total + for i in 1 ..< total: + op = await d.waitMessage(fromNode, reqId) + if op.isSome and op.get.kind == nodes: + res.add(op.get.nodes.enrs) + else: + # No error on this as we received some nodes. 
+ break + return ok(res) + else: + discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"]) + return err("Invalid response to find node message") + else: + discovery_message_requests_outgoing.inc(labelValues = ["no_response"]) + return err("Nodes message not received in time") + +proc sendMessage*[T: SomeMessage](d: Protocol, toNode: Node, m: T): + RequestId = + doAssert(toNode.address.isSome()) + let + address = toNode.address.get() + reqId = RequestId.init(d.rng[]) + message = encodeMessage(m, reqId) + + let (data, nonce) = encodeMessagePacket(d.rng[], d.codec, toNode.id, + address, message) + + d.registerRequest(toNode, message, nonce) + trace "Send message packet", dstId = toNode.id, address, kind = messageKind(T) + d.send(toNode, data) + discovery_message_requests_outgoing.inc() + return reqId + +proc ping*(d: Protocol, toNode: Node): + Future[DiscResult[PongMessage]] {.async.} = + ## Send a discovery ping message. + ## + ## Returns the received pong message or an error. + let reqId = d.sendMessage(toNode, + PingMessage(enrSeq: d.localNode.record.seqNum)) + let resp = await d.waitMessage(toNode, reqId) + + if resp.isSome(): + if resp.get().kind == pong: + d.routingTable.setJustSeen(toNode) + return ok(resp.get().pong) + else: + d.replaceNode(toNode) + discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"]) + return err("Invalid response to ping message") + else: + d.replaceNode(toNode) + discovery_message_requests_outgoing.inc(labelValues = ["no_response"]) + return err("Pong message not received in time") + +proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]): + Future[DiscResult[seq[Node]]] {.async.} = + ## Send a discovery findNode message. + ## + ## Returns the received nodes or an error. + ## Received ENRs are already validated and converted to `Node`. 
+ let reqId = d.sendMessage(toNode, FindNodeMessage(distances: distances)) + let nodes = await d.waitNodes(toNode, reqId) + + if nodes.isOk: + let res = verifyNodesRecords(nodes.get(), toNode, findNodeResultLimit, distances) + d.routingTable.setJustSeen(toNode) + return ok(res) + else: + d.replaceNode(toNode) + return err(nodes.error) + +proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]): + Future[DiscResult[seq[byte]]] {.async.} = + ## Send a discovery talkreq message. + ## + ## Returns the received talkresp message or an error. + let reqId = d.sendMessage(toNode, + TalkReqMessage(protocol: protocol, request: request)) + let resp = await d.waitMessage(toNode, reqId) + + if resp.isSome(): + if resp.get().kind == talkResp: + d.routingTable.setJustSeen(toNode) + return ok(resp.get().talkResp.response) + else: + d.replaceNode(toNode) + discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"]) + return err("Invalid response to talk request message") + else: + d.replaceNode(toNode) + discovery_message_requests_outgoing.inc(labelValues = ["no_response"]) + return err("Talk response message not received in time") + +proc lookupDistances*(target, dest: NodeId): seq[uint16] = + let td = logDistance(target, dest) + let tdAsInt = int(td) + result.add(td) + var i = 1 + while result.len < lookupRequestLimit: + if tdAsInt + i < 256: + result.add(td + uint16(i)) + if tdAsInt - i > 0: + result.add(td - uint16(i)) + inc i + +proc lookupWorker(d: Protocol, destNode: Node, target: NodeId): + Future[seq[Node]] {.async.} = + let dists = lookupDistances(target, destNode.id) + + # Instead of doing max `lookupRequestLimit` findNode requests, make use + # of the discv5.1 functionality to request nodes for multiple distances. 
+ let r = await d.findNode(destNode, dists) + if r.isOk: + result.add(r[]) + + # Attempt to add all nodes discovered + for n in result: + discard d.addNode(n) + +proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] {.async.} = + ## Perform a lookup for the given target, return the closest n nodes to the + ## target. Maximum value for n is `BUCKET_SIZE`. + # `closestNodes` holds the k closest nodes to target found, sorted by distance + # Unvalidated nodes are used for requests as a form of validation. + var closestNodes = d.routingTable.neighbours(target, BUCKET_SIZE, + seenOnly = false) + + var asked, seen = initHashSet[NodeId]() + asked.incl(d.localNode.id) # No need to ask our own node + seen.incl(d.localNode.id) # No need to discover our own node + for node in closestNodes: + seen.incl(node.id) + + var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha) + + while true: + var i = 0 + # Doing `alpha` amount of requests at once as long as closer non queried + # nodes are discovered. + while i < closestNodes.len and pendingQueries.len < alpha: + let n = closestNodes[i] + if not asked.containsOrIncl(n.id): + pendingQueries.add(d.lookupWorker(n, target)) + inc i + + trace "discv5 pending queries", total = pendingQueries.len + + if pendingQueries.len == 0: + break + + let query = await one(pendingQueries) + trace "Got discv5 lookup query response" + + let index = pendingQueries.find(query) + if index != -1: + pendingQueries.del(index) + else: + error "Resulting query should have been in the pending queries" + + let nodes = query.read + # TODO: Remove node on timed-out query? 
+ for n in nodes: + if not seen.containsOrIncl(n.id): + # If it wasn't seen before, insert node while remaining sorted + closestNodes.insert(n, closestNodes.lowerBound(n, + proc(x: Node, n: Node): int = + cmp(distance(x.id, target), distance(n.id, target)) + )) + + if closestNodes.len > BUCKET_SIZE: + closestNodes.del(closestNodes.high()) + + d.lastLookup = now(chronos.Moment) + return closestNodes + +proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] + {.async.} = + ## Query k nodes for the given target, returns all nodes found, including the + ## nodes queried. + ## + ## This will take k nodes from the routing table closest to target and + ## query them for nodes closest to target. If there are less than k nodes in + ## the routing table, nodes returned by the first queries will be used. + var queryBuffer = d.routingTable.neighbours(target, k, seenOnly = false) + + var asked, seen = initHashSet[NodeId]() + asked.incl(d.localNode.id) # No need to ask our own node + seen.incl(d.localNode.id) # No need to discover our own node + for node in queryBuffer: + seen.incl(node.id) + + var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha) + + while true: + var i = 0 + while i < min(queryBuffer.len, k) and pendingQueries.len < alpha: + let n = queryBuffer[i] + if not asked.containsOrIncl(n.id): + pendingQueries.add(d.lookupWorker(n, target)) + inc i + + trace "discv5 pending queries", total = pendingQueries.len + + if pendingQueries.len == 0: + break + + let query = await one(pendingQueries) + trace "Got discv5 lookup query response" + + let index = pendingQueries.find(query) + if index != -1: + pendingQueries.del(index) + else: + error "Resulting query should have been in the pending queries" + + let nodes = query.read + # TODO: Remove node on timed-out query? 
+ for n in nodes: + if not seen.containsOrIncl(n.id): + queryBuffer.add(n) + + d.lastLookup = now(chronos.Moment) + return queryBuffer + +proc queryRandom*(d: Protocol): Future[seq[Node]] = + ## Perform a query for a random target, return all nodes discovered. + d.query(NodeId.random(d.rng[])) + +proc queryRandom*(d: Protocol, enrField: (string, seq[byte])): + Future[seq[Node]] {.async.} = + ## Perform a query for a random target, return all nodes discovered which + ## contain enrField. + let nodes = await d.queryRandom() + var filtered: seq[Node] + for n in nodes: + if n.record.contains(enrField): + filtered.add(n) + + return filtered + +proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]] {.async.} = + ## Resolve a `Node` based on provided `NodeId`. + ## + ## This will first look in the own routing table. If the node is known, it + ## will try to contact if for newer information. If node is not known or it + ## does not reply, a lookup is done to see if it can find a (newer) record of + ## the node on the network. + if id == d.localNode.id: + return some(d.localNode) + + let node = d.getNode(id) + if node.isSome(): + let request = await d.findNode(node.get(), @[0'u16]) + + # TODO: Handle failures better. E.g. stop on different failures than timeout + if request.isOk() and request[].len > 0: + return some(request[][0]) + + let discovered = await d.lookup(id) + for n in discovered: + if n.id == id: + if node.isSome() and node.get().record.seqNum >= n.record.seqNum: + return node + else: + return some(n) + + return node + +proc seedTable*(d: Protocol) = + ## Seed the table with known nodes. 
+ for record in d.bootstrapRecords: + if d.addNode(record): + debug "Added bootstrap node", uri = toURI(record) + else: + debug "Bootstrap node could not be added", uri = toURI(record) + + # TODO: + # Persistent stored nodes could be added to seed from here + # See: https://github.com/status-im/nim-eth/issues/189 + +proc populateTable*(d: Protocol) {.async.} = + ## Do a set of initial lookups to quickly populate the table. + # start with a self target query (neighbour nodes) + let selfQuery = await d.query(d.localNode.id) + trace "Discovered nodes in self target query", nodes = selfQuery.len + + # `initialLookups` random queries + for i in 0.. n.record.seqNum: + # Request new ENR + let nodes = await d.findNode(n, @[0'u16]) + if nodes.isOk() and nodes[].len > 0: + discard d.addNode(nodes[][0]) + + # Get IP and port from pong message and add it to the ip votes + let a = Address(ip: ValidIpAddress.init(res.ip), port: Port(res.port)) + d.ipVote.insert(n.id, a) + +proc revalidateLoop(d: Protocol) {.async.} = + ## Loop which revalidates the nodes in the routing table by sending the ping + ## message. + try: + while true: + await sleepAsync(milliseconds(d.rng[].rand(revalidateMax))) + let n = d.routingTable.nodeToRevalidate() + if not n.isNil: + traceAsyncErrors d.revalidateNode(n) + except CancelledError: + trace "revalidateLoop canceled" + +proc refreshLoop(d: Protocol) {.async.} = + ## Loop that refreshes the routing table by starting a random query in case + ## no queries were done since `refreshInterval` or more. + ## It also refreshes the majority address voted for via pong responses. 
+ try: + await d.populateTable() + + while true: + let currentTime = now(chronos.Moment) + if currentTime > (d.lastLookup + refreshInterval): + let randomQuery = await d.queryRandom() + trace "Discovered nodes in random target query", nodes = randomQuery.len + debug "Total nodes in discv5 routing table", total = d.routingTable.len() + + await sleepAsync(refreshInterval) + except CancelledError: + trace "refreshLoop canceled" + +proc ipMajorityLoop(d: Protocol) {.async.} = + ## When `enrAutoUpdate` is enabled, the IP:port combination returned + ## by the majority will be used to update the local ENR. + ## This should be safe as long as the routing table is not overwhelmed by + ## malicious nodes trying to provide invalid addresses. + ## Why is that? + ## - Only one vote per NodeId is counted, and they are removed over time. + ## - IP:port values are provided through the pong message. The local node + ## initiates this by first sending a ping message. Unsolicited pong messages + ## are ignored. + ## - At interval pings are send to the least recently contacted node (tail of + ## bucket) from a random bucket from the routing table. + ## - Only messages that our node initiates (ping, findnode, talkreq) and that + ## successfully get a response move a node to the head of the bucket. + ## Additionally, findNode requests have typically a randomness to it, as they + ## usually come from a query for random NodeId. + ## - Currently, when a peer fails the respond, it gets replaced. It doesn't + ## remain at the tail of the bucket. + ## - There are IP limits on the buckets and the whole routing table. 
+ try: + while true: + let majority = d.ipVote.majority() + if majority.isSome(): + if d.localNode.address != majority: + let address = majority.get() + let previous = d.localNode.address + if d.enrAutoUpdate: + let res = d.localNode.update(d.privateKey, + ip = some(address.ip), udpPort = some(address.port)) + if res.isErr: + warn "Failed updating ENR with newly discovered external address", + majority, previous, error = res.error + else: + discovery_enr_auto_update.inc() + info "Updated ENR with newly discovered external address", + majority, previous, uri = toURI(d.localNode.record) + else: + warn "Discovered new external address but ENR auto update is off", + majority, previous + else: + debug "Discovered external address matches current address", majority, + current = d.localNode.address + + await sleepAsync(ipMajorityInterval) + except CancelledError: + trace "ipMajorityLoop canceled" + +func init*( + T: type DiscoveryConfig, + tableIpLimit: uint, + bucketIpLimit: uint, + bitsPerHop: int): T = + + DiscoveryConfig( + tableIpLimits: TableIpLimits( + tableIpLimit: tableIpLimit, + bucketIpLimit: bucketIpLimit), + bitsPerHop: bitsPerHop + ) + +proc newProtocol*( + privKey: PrivateKey, + enrIp: Option[ValidIpAddress], + enrTcpPort, enrUdpPort: Option[Port], + localEnrFields: openArray[(string, seq[byte])] = [], + bootstrapRecords: openArray[Record] = [], + previousRecord = none[enr.Record](), + bindPort: Port, + bindIp = IPv4_any(), + enrAutoUpdate = false, + config = defaultDiscoveryConfig, + rng = newRng()): + Protocol = + # TODO: Tried adding bindPort = udpPort as parameter but that gave + # "Error: internal error: environment misses: udpPort" in nim-beacon-chain. + # Anyhow, nim-beacon-chain would also require some changes to support port + # remapping through NAT and this API is also subject to change once we + # introduce support for ipv4 + ipv6 binding/listening. 
+ let extraFields = mapIt(localEnrFields, toFieldPair(it[0], it[1])) + # TODO: + # - Defect as is now or return a result for enr errors? + # - In case incorrect key, allow for new enr based on new key (new node id)? + var record: Record + if previousRecord.isSome(): + record = previousRecord.get() + record.update(privKey, enrIp, enrTcpPort, enrUdpPort, + extraFields).expect("Record within size limits and correct key") + else: + record = enr.Record.init(1, privKey, enrIp, enrTcpPort, enrUdpPort, + extraFields).expect("Record within size limits") + + info "ENR initialized", ip = enrIp, tcp = enrTcpPort, udp = enrUdpPort, + seqNum = record.seqNum, uri = toURI(record) + if enrIp.isNone(): + if enrAutoUpdate: + notice "No external IP provided for the ENR, this node will not be " & + "discoverable until the ENR is updated with the discovered external IP address" + else: + warn "No external IP provided for the ENR, this node will not be discoverable" + + let node = newNode(record).expect("Properly initialized record") + + # TODO Consider whether this should be a Defect + doAssert rng != nil, "RNG initialization failed" + + Protocol( + privateKey: privKey, + localNode: node, + bindAddress: Address(ip: ValidIpAddress.init(bindIp), port: bindPort), + codec: Codec(localNode: node, privKey: privKey, + sessions: Sessions.init(256)), + bootstrapRecords: @bootstrapRecords, + ipVote: IpVote.init(), + enrAutoUpdate: enrAutoUpdate, + routingTable: RoutingTable.init( + node, config.bitsPerHop, config.tableIpLimits, rng), + rng: rng) + +template listeningAddress*(p: Protocol): Address = + p.bindAddress + +proc open*(d: Protocol) {.raises: [Defect, CatchableError].} = + info "Starting discovery node", node = d.localNode, + bindAddress = d.bindAddress + + # TODO allow binding to specific IP / IPv6 / etc + let ta = initTAddress(d.bindAddress.ip, d.bindAddress.port) + d.transp = newDatagramTransport(processClient, udata = d, local = ta) + + d.seedTable() + +proc start*(d: Protocol) = + 
d.refreshLoop = refreshLoop(d) + d.revalidateLoop = revalidateLoop(d) + d.ipMajorityLoop = ipMajorityLoop(d) + +proc close*(d: Protocol) = + doAssert(not d.transp.closed) + + debug "Closing discovery node", node = d.localNode + if not d.revalidateLoop.isNil: + d.revalidateLoop.cancel() + if not d.refreshLoop.isNil: + d.refreshLoop.cancel() + if not d.ipMajorityLoop.isNil: + d.ipMajorityLoop.cancel() + + d.transp.close() + +proc closeWait*(d: Protocol) {.async.} = + doAssert(not d.transp.closed) + + debug "Closing discovery node", node = d.localNode + if not d.revalidateLoop.isNil: + await d.revalidateLoop.cancelAndWait() + if not d.refreshLoop.isNil: + await d.refreshLoop.cancelAndWait() + if not d.ipMajorityLoop.isNil: + await d.ipMajorityLoop.cancelAndWait() + + await d.transp.closeWait() diff --git a/eth/p2p/discoveryv5/random2.nim b/eth/p2p/discoveryv5/random2.nim new file mode 100644 index 0000000..0ec72f0 --- /dev/null +++ b/eth/p2p/discoveryv5/random2.nim @@ -0,0 +1,22 @@ +import bearssl + +## Random helpers: similar as in stdlib, but with BrHmacDrbgContext rng +# TODO: Move these somewhere else? 
+const randMax = 18_446_744_073_709_551_615'u64 + +proc rand*(rng: var BrHmacDrbgContext, max: Natural): int = + if max == 0: return 0 + + var x: uint64 + while true: + brHmacDrbgGenerate(addr rng, addr x, csize_t(sizeof(x))) + if x < randMax - (randMax mod (uint64(max) + 1'u64)): # against modulo bias + return int(x mod (uint64(max) + 1'u64)) + +proc sample*[T](rng: var BrHmacDrbgContext, a: openArray[T]): T = + result = a[rng.rand(a.high)] + +proc shuffle*[T](rng: var BrHmacDrbgContext, a: var openArray[T]) = + for i in countdown(a.high, 1): + let j = rng.rand(i) + swap(a[i], a[j]) diff --git a/eth/p2p/discoveryv5/routing_table.nim b/eth/p2p/discoveryv5/routing_table.nim new file mode 100644 index 0000000..136ad39 --- /dev/null +++ b/eth/p2p/discoveryv5/routing_table.nim @@ -0,0 +1,544 @@ +# nim-eth - Node Discovery Protocol v5 +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. 
+ +{.push raises: [Defect].} + +import + std/[algorithm, times, sequtils, bitops, sets, options], + stint, chronicles, metrics, bearssl, chronos, stew/shims/net as stewNet, + ../../net/utils, + "."/[node, random2, enr] + +export options + +declarePublicGauge routing_table_nodes, + "Discovery routing table nodes", labels = ["state"] + +type + DistanceProc* = proc(a, b: NodeId): NodeId {.raises: [Defect], gcsafe, noSideEffect.} + LogDistanceProc* = proc(a, b: NodeId): uint16 {.raises: [Defect], gcsafe, noSideEffect.} + IdAtDistanceProc* = proc (id: NodeId, dist: uint16): NodeId {.raises: [Defect], gcsafe, noSideEffect.} + + DistanceCalculator* = object + calculateDistance*: DistanceProc + calculateLogDistance*: LogDistanceProc + calculateIdAtDistance*: IdAtDistanceProc + + RoutingTable* = object + localNode*: Node + buckets*: seq[KBucket] + bitsPerHop: int ## This value indicates how many bits (at minimum) you get + ## closer to finding your target per query. Practically, it tells you also + ## how often your "not in range" branch will split off. Setting this to 1 + ## is the basic, non accelerated version, which will never split off the + ## not in range branch and which will result in log base2 n hops per lookup. + ## Setting it higher will increase the amount of splitting on a not in range + ## branch (thus holding more nodes with a better keyspace coverage) and this + ## will result in an improvement of log base(2^b) n hops per lookup. + ipLimits: IpLimits ## IP limits for total routing table: all buckets and + ## replacement caches. + distanceCalculator: DistanceCalculator + rng: ref BrHmacDrbgContext + + KBucket = ref object + istart, iend: NodeId ## Range of NodeIds this KBucket covers. This is not a + ## simple logarithmic distance as buckets can be split over a prefix that + ## does not cover the `localNode` id. + nodes*: seq[Node] ## Node entries of the KBucket. Sorted according to last + ## time seen. 
First entry (head) is considered the most recently seen node + ## and the last entry (tail) is considered the least recently seen node. + ## Here "seen" means a successful request-response. This can also not have + ## occured yet. + replacementCache: seq[Node] ## Nodes that could not be added to the `nodes` + ## seq as it is full and without stale nodes. This is practically a small + ## LRU cache. + ipLimits: IpLimits ## IP limits for bucket: node entries and replacement + ## cache entries combined. + + ## The routing table IP limits are applied on both the total table, and on the + ## individual buckets. In each case, the active node entries, but also the + ## entries waiting in the replacement cache are accounted for. This way, the + ## replacement cache can't get filled with nodes that then can't be added due + ## to the limits that apply. + ## + ## As entries are not verified (=contacted) immediately before or on entry, it + ## is possible that a malicious node could fill (poison) the routing table or + ## a specific bucket with ENRs with IPs it does not control. The effect of + ## this would be that a node that actually owns the IP could have a difficult + ## time getting its ENR distrubuted in the DHT and as a consequence would + ## not be reached from the outside as much (or at all). However, that node can + ## still search and find nodes to connect to. So it would practically be a + ## similar situation as a node that is not reachable behind the NAT because + ## port mapping is not set up properly. + ## There is the possiblity to set the IP limit on verified (=contacted) nodes + ## only, but that would allow for lookups to be done on a higher set of nodes + ## owned by the same identity. This is a worse alternative. + ## Next, doing lookups only on verified nodes would slow down discovery start + ## up. 
+ TableIpLimits* = object + tableIpLimit*: uint + bucketIpLimit*: uint + + NodeStatus* = enum + Added + LocalNode + Existing + IpLimitReached + ReplacementAdded + ReplacementExisting + NoAddress + +# xor distance functions +func distance*(a, b: NodeId): UInt256 = + ## Calculate the distance to a NodeId. + a xor b + +func logDistance*(a, b: NodeId): uint16 = + ## Calculate the logarithmic distance between two `NodeId`s. + ## + ## According the specification, this is the log base 2 of the distance. But it + ## is rather the log base 2 of the distance + 1, as else the 0 value can not + ## be used (e.g. by FindNode call to return peer its own ENR) + ## For NodeId of 256 bits, range is 0-256. + let a = a.toBytesBE + let b = b.toBytesBE + var lz = 0 + for i in 0.. int: + cmp(a.iend, b) + + # Prevent cases where `lowerBound` returns an out of range index e.g. at empty + # openArray, or when the id is out range for all buckets in the openArray. + if bucketPos < buckets.len: + let bucket = buckets[bucketPos] + if bucket.istart <= id and id <= bucket.iend: + result = bucket + +proc computeSharedPrefixBits(nodes: openArray[NodeId]): int = + ## Count the number of prefix bits shared by all nodes. + if nodes.len < 2: + return ID_SIZE + + var mask = zero(UInt256) + let one = one(UInt256) + + for i in 1 .. ID_SIZE: + mask = mask or (one shl (ID_SIZE - i)) + let reference = nodes[0] and mask + for j in 1 .. nodes.high: + if (nodes[j] and mask) != reference: return i - 1 + + for n in nodes: + echo n.toHex() + + # Reaching this would mean that all node ids are equal. + doAssert(false, "Unable to calculate number of shared prefix bits") + +proc init*(T: type RoutingTable, localNode: Node, bitsPerHop = DefaultBitsPerHop, + ipLimits = DefaultTableIpLimits, rng: ref BrHmacDrbgContext, + distanceCalculator = XorDistanceCalculator): T = + ## Initialize the routing table for provided `Node` and bitsPerHop value. 
+ ## `bitsPerHop` is default set to 5 as recommended by original Kademlia paper. + RoutingTable( + localNode: localNode, + buckets: @[KBucket.new(0.u256, high(UInt256), ipLimits.bucketIpLimit)], + bitsPerHop: bitsPerHop, + ipLimits: IpLimits(limit: ipLimits.tableIpLimit), + distanceCalculator: distanceCalculator, + rng: rng) + +proc splitBucket(r: var RoutingTable, index: int) = + let bucket = r.buckets[index] + let (a, b) = bucket.split() + r.buckets[index] = a + r.buckets.insert(b, index + 1) + +proc bucketForNode(r: RoutingTable, id: NodeId): KBucket = + result = binaryGetBucketForNode(r.buckets, id) + doAssert(not result.isNil(), + "Routing table should always cover the full id space") + +proc addReplacement(r: var RoutingTable, k: KBucket, n: Node): NodeStatus = + ## Add the node to the tail of the replacement cache of the KBucket. + ## + ## If the replacement cache is full, the oldest (first entry) node will be + ## removed. If the node is already in the replacement cache, it will be moved + ## to the tail. + ## When the IP of the node has reached the IP limits for the bucket or the + ## total routing table, the node will not be added to the replacement cache. + let nodeIdx = k.replacementCache.find(n) + if nodeIdx != -1: + if k.replacementCache[nodeIdx].record.seqNum <= n.record.seqNum: + # In case the record sequence number is higher or the same, the new node + # gets moved to the tail. + if k.replacementCache[nodeIdx].address.get().ip != n.address.get().ip: + if not ipLimitInc(r, k, n): + return IpLimitReached + ipLimitDec(r, k, k.replacementCache[nodeIdx]) + k.replacementCache.delete(nodeIdx) + k.replacementCache.add(n) + return ReplacementExisting + elif not ipLimitInc(r, k, n): + return IpLimitReached + else: + doAssert(k.replacementCache.len <= REPLACEMENT_CACHE_SIZE) + + if k.replacementCache.len == REPLACEMENT_CACHE_SIZE: + # Remove ip from limits for the to be deleted node. 
+ ipLimitDec(r, k, k.replacementCache[0]) + k.replacementCache.delete(0) + + k.replacementCache.add(n) + return ReplacementAdded + +proc addNode*(r: var RoutingTable, n: Node): NodeStatus = + ## Try to add the node to the routing table. + ## + ## First, an attempt will be done to add the node to the bucket in its range. + ## If this fails, the bucket will be split if it is eligable for splitting. + ## If so, a new attempt will be done to add the node. If not, the node will be + ## added to the replacement cache. + ## + ## In case the node was already in the table, it will be updated if it has a + ## newer record. + ## When the IP of the node has reached the IP limits for the bucket or the + ## total routing table, the node will not be added to the bucket, nor its + ## replacement cache. + + # Don't allow nodes without an address field in the ENR to be added. + # This could also be reworked by having another Node type that always has an + # address. + if n.address.isNone(): + return NoAddress + + if n == r.localNode: + return LocalNode + + let bucket = r.bucketForNode(n.id) + + ## Check if the node is already present. If so, check if the record requires + ## updating. + let nodeIdx = bucket.nodes.find(n) + if nodeIdx != -1: + if bucket.nodes[nodeIdx].record.seqNum < n.record.seqNum: + # In case of a newer record, it gets replaced. + if bucket.nodes[nodeIdx].address.get().ip != n.address.get().ip: + if not ipLimitInc(r, bucket, n): + return IpLimitReached + ipLimitDec(r, bucket, bucket.nodes[nodeIdx]) + # Copy over the seen status, we trust here that after the ENR update the + # node will still be reachable, but it might not be the case. + n.seen = bucket.nodes[nodeIdx].seen + bucket.nodes[nodeIdx] = n + + return Existing + + # If the bucket has fewer than `BUCKET_SIZE` entries, it is inserted as the + # last entry of the bucket (least recently seen node). If the bucket is + # full, it might get split and adding is retried, else it is added as a + # replacement. 
+ # Reasoning here is that adding nodes will happen for a big part from + # lookups, which do not necessarily return nodes that are (still) reachable. + # So, more trust is put in the own ordering by actually contacting peers and + # newly additions are added as least recently seen (in fact they have not been + # seen yet from our node its perspective). + # However, in discovery v5 a node can also be added after a incoming request + # if a handshake is done and an ENR is provided, and considering that this + # handshake needs to be done, it is more likely that this node is reachable. + # However, it is not certain and depending on different NAT mechanisms and + # timers it might still fail. For this reason we currently do not add a way to + # immediately add nodes to the most recently seen spot. + if bucket.len < BUCKET_SIZE: + if not ipLimitInc(r, bucket, n): + return IpLimitReached + + bucket.add(n) + else: + # Bucket must be full, but lets see if it should be split the bucket. + + # Calculate the prefix shared by all nodes in the bucket's range, not the + # ones actually in the bucket. + let depth = computeSharedPrefixBits(@[bucket.istart, bucket.iend]) + # Split if the bucket has the local node in its range or if the depth is not + # congruent to 0 mod `bitsPerHop` + if bucket.inRange(r.localNode) or + (depth mod r.bitsPerHop != 0 and depth != ID_SIZE): + r.splitBucket(r.buckets.find(bucket)) + return r.addNode(n) # retry adding + else: + # When bucket doesn't get split the node is added to the replacement cache + return r.addReplacement(bucket, n) + +proc removeNode*(r: var RoutingTable, n: Node) = + ## Remove the node `n` from the routing table. + let b = r.bucketForNode(n.id) + if b.remove(n): + ipLimitDec(r, b, n) + +proc replaceNode*(r: var RoutingTable, n: Node) = + ## Replace node `n` with last entry in the replacement cache. If there are + ## no entries in the replacement cache, node `n` will simply be removed. 
+ # TODO: Kademlia paper recommends here to not remove nodes if there are no + # replacements. However, that would require a bit more complexity in the + # revalidation as you don't want to try pinging that node all the time. + let b = r.bucketForNode(n.id) + if b.remove(n): + ipLimitDec(r, b, n) + + if b.replacementCache.len > 0: + # Nodes in the replacement cache are already included in the ip limits. + b.add(b.replacementCache[high(b.replacementCache)]) + b.replacementCache.delete(high(b.replacementCache)) + +proc getNode*(r: RoutingTable, id: NodeId): Option[Node] = + ## Get the `Node` with `id` as `NodeId` from the routing table. + ## If no node with provided node id can be found,`none` is returned . + let b = r.bucketForNode(id) + for n in b.nodes: + if n.id == id: + return some(n) + +proc contains*(r: RoutingTable, n: Node): bool = n in r.bucketForNode(n.id) + # Check if the routing table contains node `n`. + +proc bucketsByDistanceTo(r: RoutingTable, id: NodeId): seq[KBucket] = + sortedByIt(r.buckets, r.distance(it.midpoint, id)) + +proc nodesByDistanceTo(r: RoutingTable, k: KBucket, id: NodeId): seq[Node] = + sortedByIt(k.nodes, r.distance(it.id, id)) + +proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE, + seenOnly = false): seq[Node] = + ## Return up to k neighbours of the given node id. + ## When seenOnly is set to true, only nodes that have been contacted + ## previously successfully will be selected. + result = newSeqOfCap[Node](k * 2) + block addNodes: + for bucket in r.bucketsByDistanceTo(id): + for n in r.nodesByDistanceTo(bucket, id): + # Only provide actively seen nodes when `seenOnly` set. + if not seenOnly or n.seen: + result.add(n) + if result.len == k * 2: + break addNodes + + # TODO: is this sort still needed? Can we get nodes closer from the "next" + # bucket? 
+ result = sortedByIt(result, r.distance(it.id, id)) + if result.len > k: + result.setLen(k) + +proc neighboursAtDistance*(r: RoutingTable, distance: uint16, + k: int = BUCKET_SIZE, seenOnly = false): seq[Node] = + ## Return up to k neighbours at given logarithmic distance. + result = r.neighbours(r.idAtDistance(r.localNode.id, distance), k, seenOnly) + # This is a bit silly, first getting closest nodes then to only keep the ones + # that are exactly the requested distance. + keepIf(result, proc(n: Node): bool = r.logDistance(n.id, r.localNode.id) == distance) + +proc neighboursAtDistances*(r: RoutingTable, distances: seq[uint16], + k: int = BUCKET_SIZE, seenOnly = false): seq[Node] = + ## Return up to k neighbours at given logarithmic distances. + # TODO: This will currently return nodes with neighbouring distances on the + # first one prioritize. It might end up not including all the node distances + # requested. Need to rework the logic here and not use the neighbours call. + if distances.len > 0: + result = r.neighbours(r.idAtDistance(r.localNode.id, distances[0]), k, + seenOnly) + # This is a bit silly, first getting closest nodes then to only keep the ones + # that are exactly the requested distances. + keepIf(result, proc(n: Node): bool = + distances.contains(r.logDistance(n.id, r.localNode.id))) + +proc len*(r: RoutingTable): int = + for b in r.buckets: result += b.len + +proc moveRight[T](arr: var openArray[T], a, b: int) = + ## In `arr` move elements in range [a, b] right by 1. + var t: T + shallowCopy(t, arr[b + 1]) + for i in countdown(b, a): + shallowCopy(arr[i + 1], arr[i]) + shallowCopy(arr[a], t) + +proc setJustSeen*(r: RoutingTable, n: Node) = + ## Move `n` to the head (most recently seen) of its bucket. + ## If `n` is not in the routing table, do nothing. 
+ let b = r.bucketForNode(n.id) + let idx = b.nodes.find(n) + if idx >= 0: + if idx != 0: + b.nodes.moveRight(0, idx - 1) + + if not n.seen: + b.nodes[0].seen = true + routing_table_nodes.inc(labelValues = ["seen"]) + +proc nodeToRevalidate*(r: RoutingTable): Node = + ## Return a node to revalidate. The least recently seen node from a random + ## bucket is selected. + var buckets = r.buckets + r.rng[].shuffle(buckets) + # TODO: Should we prioritize less-recently-updated buckets instead? Could + # store a `now` Moment at setJustSeen or at revalidate per bucket. + for b in buckets: + if b.len > 0: + return b.nodes[^1] + +proc randomNodes*(r: RoutingTable, maxAmount: int, + pred: proc(x: Node): bool {.gcsafe, noSideEffect.} = nil): seq[Node] = + ## Get a `maxAmount` of random nodes from the routing table with the `pred` + ## predicate function applied as filter on the nodes selected. + var maxAmount = maxAmount + let sz = r.len + if maxAmount > sz: + debug "Less peers in routing table than maximum requested", + requested = maxAmount, present = sz + maxAmount = sz + + result = newSeqOfCap[Node](maxAmount) + var seen = initHashSet[Node]() + + # This is a rather inefficient way of randomizing nodes from all buckets, but even if we + # iterate over all nodes in the routing table, the time it takes would still be + # insignificant compared to the time it takes for the network roundtrips when connecting + # to nodes. + # However, "time it takes" might not be relevant, as there might be no point + # in providing more `randomNodes` as the routing table might not have anything + # new to provide. And there is no way for the calling code to know this. So + # while it will take less total time compared to e.g. an (async) + # randomLookup, the time might be wasted as all nodes are possibly seen + # already. 
+ while len(seen) < maxAmount: + let bucket = r.rng[].sample(r.buckets) + if bucket.nodes.len != 0: + let node = r.rng[].sample(bucket.nodes) + if node notin seen: + seen.incl(node) + if pred.isNil() or node.pred: + result.add(node) diff --git a/eth/p2p/discoveryv5/sessions.nim b/eth/p2p/discoveryv5/sessions.nim new file mode 100644 index 0000000..5b52d17 --- /dev/null +++ b/eth/p2p/discoveryv5/sessions.nim @@ -0,0 +1,62 @@ +# nim-eth - Node Discovery Protocol v5 +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. +# +## Session cache as mentioned at +## https://github.com/ethereum/devp2p/blob/master/discv5/discv5-theory.md#session-cache +## + +{.push raises: [Defect].} + +import + std/options, + stint, stew/endians2, stew/shims/net, + node, lru + +export lru + +const + aesKeySize* = 128 div 8 + keySize = sizeof(NodeId) + + 16 + # max size of ip address (ipv6) + 2 # Sizeof port + +type + AesKey* = array[aesKeySize, byte] + SessionKey* = array[keySize, byte] + SessionValue* = array[sizeof(AesKey) + sizeof(AesKey), byte] + Sessions* = LRUCache[SessionKey, SessionValue] + +func makeKey(id: NodeId, address: Address): SessionKey = + var pos = 0 + result[pos ..< pos+sizeof(id)] = toBytes(id) + pos.inc(sizeof(id)) + case address.ip.family + of IpAddressFamily.IpV4: + result[pos ..< pos+sizeof(address.ip.address_v4)] = address.ip.address_v4 + of IpAddressFamily.IpV6: + result[pos ..< pos+sizeof(address.ip.address_v6)] = address.ip.address_v6 + pos.inc(sizeof(address.ip.address_v6)) + result[pos ..< pos+sizeof(address.port)] = toBytes(address.port.uint16) + +func store*(s: var Sessions, id: NodeId, address: 
Address, r, w: AesKey) = + var value: array[sizeof(r) + sizeof(w), byte] + value[0 .. 15] = r + value[16 .. ^1] = w + s.put(makeKey(id, address), value) + +func load*(s: var Sessions, id: NodeId, address: Address, r, w: var AesKey): bool = + let res = s.get(makeKey(id, address)) + if res.isSome(): + let val = res.get() + copyMem(addr r[0], unsafeAddr val[0], sizeof(r)) + copyMem(addr w[0], unsafeAddr val[sizeof(r)], sizeof(w)) + return true + else: + return false + +func del*(s: var Sessions, id: NodeId, address: Address) = + s.del(makeKey(id, address)) diff --git a/tests/p2p/discv5_test_helper.nim b/tests/p2p/discv5_test_helper.nim new file mode 100644 index 0000000..397bed5 --- /dev/null +++ b/tests/p2p/discv5_test_helper.nim @@ -0,0 +1,87 @@ +import + stew/shims/net, bearssl, chronos, + ../../eth/keys, + ../../eth/p2p/discoveryv5/[enr, node, routing_table], + ../../eth/p2p/discoveryv5/protocol as discv5_protocol + +export net + +proc localAddress*(port: int): Address = + Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(port)) + +proc initDiscoveryNode*( + rng: ref BrHmacDrbgContext, + privKey: PrivateKey, + address: Address, + bootstrapRecords: openArray[Record] = [], + localEnrFields: openArray[(string, seq[byte])] = [], + previousRecord = none[enr.Record]()): + discv5_protocol.Protocol = + # set bucketIpLimit to allow bucket split + let config = DiscoveryConfig.init(1000, 24, 5) + + let protocol = newProtocol( + privKey, + some(address.ip), + some(address.port), some(address.port), + bindPort = address.port, + bootstrapRecords = bootstrapRecords, + localEnrFields = localEnrFields, + previousRecord = previousRecord, + config = config, + rng = rng) + + protocol.open() + + protocol + +proc nodeIdInNodes*(id: NodeId, nodes: openArray[Node]): bool = + for n in nodes: + if id == n.id: return true + +proc generateNode*(privKey: PrivateKey, port: int = 20302, + ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1"), + localEnrFields: openArray[FieldPair] = []): 
Node = + let port = Port(port) + let enr = enr.Record.init(1, privKey, some(ip), + some(port), some(port), localEnrFields).expect("Properly intialized private key") + result = newNode(enr).expect("Properly initialized node") + +proc generateNRandomNodes*(rng: ref BrHmacDrbgContext, n: int): seq[Node] = + var res = newSeq[Node]() + for i in 1..n: + let node = generateNode(PrivateKey.random(rng[])) + res.add(node) + res + +proc nodeAndPrivKeyAtDistance*(n: Node, rng: var BrHmacDrbgContext, d: uint32, + ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): (Node, PrivateKey) = + while true: + let pk = PrivateKey.random(rng) + let node = generateNode(pk, ip = ip) + if logDistance(n.id, node.id) == d: + return (node, pk) + +proc nodeAtDistance*(n: Node, rng: var BrHmacDrbgContext, d: uint32, + ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): Node = + let (node, _) = n.nodeAndPrivKeyAtDistance(rng, d, ip) + node + +proc nodesAtDistance*( + n: Node, rng: var BrHmacDrbgContext, d: uint32, amount: int, + ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): seq[Node] = + for i in 0.. 
encode + privKeyB = PrivateKey.fromHex(nodeBKey)[] # receive -> decode + + enrRecA = enr.Record.init(1, privKeyA, + some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)), + some(Port(9000))).expect("Properly intialized private key") + nodeA = newNode(enrRecA).expect("Properly initialized record") + + enrRecB = enr.Record.init(1, privKeyB, + some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)), + some(Port(9000))).expect("Properly intialized private key") + nodeB = newNode(enrRecB).expect("Properly initialized record") + + var + codecA {.used.} = Codec(localNode: nodeA, privKey: privKeyA, + sessions: Sessions.init(5)) + codecB = Codec(localNode: nodeB, privKey: privKeyB, + sessions: Sessions.init(5)) + + test "Ping Ordinary Message Packet": + const + readKey = "0x00000000000000000000000000000000" + pingReqId = "0x00000001" + pingEnrSeq = 2'u64 + + encodedPacket = + "00000000000000000000000000000000088b3d4342774649325f313964a39e55" & + "ea96c005ad52be8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d3" & + "4c4f53245d08dab84102ed931f66d1492acb308fa1c6715b9d139b81acbdcc" + + let dummyKey = "0x00000000000000000000000000000001" # of no importance + codecA.sessions.store(nodeB.id, nodeB.address.get(), + hexToByteArray[aesKeySize](dummyKey), hexToByteArray[aesKeySize](readKey)) + codecB.sessions.store(nodeA.id, nodeA.address.get(), + hexToByteArray[aesKeySize](readKey), hexToByteArray[aesKeySize](dummyKey)) + + let decoded = codecB.decodePacket(nodeA.address.get(), + hexToSeqByte(encodedPacket)) + check: + decoded.isOk() + decoded.get().messageOpt.isSome() + decoded.get().messageOpt.get().reqId.id == hexToSeqByte(pingReqId) + decoded.get().messageOpt.get().kind == ping + decoded.get().messageOpt.get().ping.enrSeq == pingEnrSeq + + test "Whoareyou Packet": + const + whoareyouChallengeData = "0x000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000000" + whoareyouRequestNonce = 
"0x0102030405060708090a0b0c" + whoareyouIdNonce = "0x0102030405060708090a0b0c0d0e0f10" + whoareyouEnrSeq = 0 + + encodedPacket = + "00000000000000000000000000000000088b3d434277464933a1ccc59f5967ad" & + "1d6035f15e528627dde75cd68292f9e6c27d6b66c8100a873fcbaed4e16b8d" + + let decoded = codecB.decodePacket(nodeA.address.get(), + hexToSeqByte(encodedPacket)) + + check: + decoded.isOk() + decoded.get().flag == Flag.Whoareyou + decoded.get().whoareyou.requestNonce == hexToByteArray[gcmNonceSize](whoareyouRequestNonce) + decoded.get().whoareyou.idNonce == hexToByteArray[idNonceSize](whoareyouIdNonce) + decoded.get().whoareyou.recordSeq == whoareyouEnrSeq + decoded.get().whoareyou.challengeData == hexToSeqByte(whoareyouChallengeData) + + codecB.decodePacket(nodeA.address.get(), + hexToSeqByte(encodedPacket & "00")).isErr() + + test "Ping Handshake Message Packet": + const + pingReqId = "0x00000001" + pingEnrSeq = 1'u64 + # + # handshake inputs: + # + whoareyouChallengeData = "0x000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000001" + whoareyouRequestNonce = "0x0102030405060708090a0b0c" + whoareyouIdNonce = "0x0102030405060708090a0b0c0d0e0f10" + whoareyouEnrSeq = 1'u64 + + encodedPacket = + "00000000000000000000000000000000088b3d4342774649305f313964a39e55" & + "ea96c005ad521d8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d3" & + "4c4f53245d08da4bb252012b2cba3f4f374a90a75cff91f142fa9be3e0a5f3ef" & + "268ccb9065aeecfd67a999e7fdc137e062b2ec4a0eb92947f0d9a74bfbf44dfb" & + "a776b21301f8b65efd5796706adff216ab862a9186875f9494150c4ae06fa4d1" & + "f0396c93f215fa4ef524f1eadf5f0f4126b79336671cbcf7a885b1f8bd2a5d83" & + "9cf8" + + let + whoareyouData = WhoareyouData( + requestNonce: hexToByteArray[gcmNonceSize](whoareyouRequestNonce), + idNonce: hexToByteArray[idNonceSize](whoareyouIdNonce), + recordSeq: whoareyouEnrSeq, + challengeData: hexToSeqByte(whoareyouChallengeData)) + pubkey = some(privKeyA.toPublicKey()) 
+ challenge = Challenge(whoareyouData: whoareyouData, pubkey: pubkey) + key = HandshakeKey(nodeId: nodeA.id, address: nodeA.address.get()) + + check: not codecB.handshakes.hasKeyOrPut(key, challenge) + + let decoded = codecB.decodePacket(nodeA.address.get(), + hexToSeqByte(encodedPacket)) + + check: + decoded.isOk() + decoded.get().message.reqId.id == hexToSeqByte(pingReqId) + decoded.get().message.kind == ping + decoded.get().message.ping.enrSeq == pingEnrSeq + decoded.get().node.isNone() + + codecB.decodePacket(nodeA.address.get(), + hexToSeqByte(encodedPacket & "00")).isErr() + + test "Ping Handshake Message Packet with ENR": + const + pingReqId = "0x00000001" + pingEnrSeq = 1'u64 + # + # handshake inputs: + # + whoareyouChallengeData = "0x000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000000" + whoareyouRequestNonce = "0x0102030405060708090a0b0c" + whoareyouIdNonce = "0x0102030405060708090a0b0c0d0e0f10" + whoareyouEnrSeq = 0'u64 + + encodedPacket = + "00000000000000000000000000000000088b3d4342774649305f313964a39e55" & + "ea96c005ad539c8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d3" & + "4c4f53245d08da4bb23698868350aaad22e3ab8dd034f548a1c43cd246be9856" & + "2fafa0a1fa86d8e7a3b95ae78cc2b988ded6a5b59eb83ad58097252188b902b2" & + "1481e30e5e285f19735796706adff216ab862a9186875f9494150c4ae06fa4d1" & + "f0396c93f215fa4ef524e0ed04c3c21e39b1868e1ca8105e585ec17315e755e6" & + "cfc4dd6cb7fd8e1a1f55e49b4b5eb024221482105346f3c82b15fdaae36a3bb1" & + "2a494683b4a3c7f2ae41306252fed84785e2bbff3b022812d0882f06978df84a" & + "80d443972213342d04b9048fc3b1d5fcb1df0f822152eced6da4d3f6df27e70e" & + "4539717307a0208cd208d65093ccab5aa596a34d7511401987662d8cf62b1394" & + "71" + + let + whoareyouData = WhoareyouData( + requestNonce: hexToByteArray[gcmNonceSize](whoareyouRequestNonce), + idNonce: hexToByteArray[idNonceSize](whoareyouIdNonce), + recordSeq: whoareyouEnrSeq, + challengeData: 
hexToSeqByte(whoareyouChallengeData)) + pubkey = none(PublicKey) + challenge = Challenge(whoareyouData: whoareyouData, pubkey: pubkey) + key = HandshakeKey(nodeId: nodeA.id, address: nodeA.address.get()) + + check: not codecB.handshakes.hasKeyOrPut(key, challenge) + + let decoded = codecB.decodePacket(nodeA.address.get(), + hexToSeqByte(encodedPacket)) + + check: + decoded.isOk() + decoded.get().message.reqId.id == hexToSeqByte(pingReqId) + decoded.get().message.kind == ping + decoded.get().message.ping.enrSeq == pingEnrSeq + decoded.get().node.isSome() + + codecB.decodePacket(nodeA.address.get(), + hexToSeqByte(encodedPacket & "00")).isErr() + +suite "Discovery v5.1 Additional Encode/Decode": + test "Encryption/Decryption": + let + encryptionKey = hexToByteArray[aesKeySize]("0x9f2d77db7004bf8a1a85107ac686990b") + nonce = hexToByteArray[gcmNonceSize]("0x27b5af763c446acd2749fe8e") + ad = hexToByteArray[32]("0x93a7400fa0d6a694ebc24d5cf570f65d04215b6ac00757875e3f3a5f42107903") + pt = hexToSeqByte("0xa1") + + let + ct = encryptGCM(encryptionKey, nonce, pt, ad) + decrypted = decryptGCM(encryptionKey, nonce, ct, ad) + + check decrypted.get() == pt + + test "Decryption": + let + encryptionKey = hexToByteArray[aesKeySize]("0x9f2d77db7004bf8a1a85107ac686990b") + nonce = hexToByteArray[gcmNonceSize]("0x27b5af763c446acd2749fe8e") + ad = hexToByteArray[32]("0x93a7400fa0d6a694ebc24d5cf570f65d04215b6ac00757875e3f3a5f42107903") + pt = hexToSeqByte("0x01c20101") + ct = hexToSeqByte("0xa5d12a2d94b8ccb3ba55558229867dc13bfa3648") + + # valid case + check decryptGCM(encryptionKey, nonce, ct, ad).get() == pt + + # invalid tag/data sizes + var invalidCipher: seq[byte] = @[] + check decryptGCM(encryptionKey, nonce, invalidCipher, ad).isNone() + + invalidCipher = repeat(byte(4), gcmTagSize) + check decryptGCM(encryptionKey, nonce, invalidCipher, ad).isNone() + + # invalid tag/data itself + invalidCipher = repeat(byte(4), gcmTagSize + 1) + check decryptGCM(encryptionKey, nonce, 
invalidCipher, ad).isNone() + + test "Encrypt / Decrypt header": + var nonce: AESGCMNonce + brHmacDrbgGenerate(rng[], nonce) + let + privKey = PrivateKey.random(rng[]) + nodeId = privKey.toPublicKey().toNodeId() + authdata = newSeq[byte](32) + staticHeader = encodeStaticHeader(Flag.OrdinaryMessage, nonce, + authdata.len()) + header = staticHeader & authdata + + var iv: array[128 div 8, byte] + brHmacDrbgGenerate(rng[], iv) + + let + encrypted = encryptHeader(nodeId, iv, header) + decoded = decodeHeader(nodeId, iv, encrypted) + + check decoded.isOk() + + setup: + let + privKeyA = PrivateKey.random(rng[]) # sender -> encode + privKeyB = PrivateKey.random(rng[]) # receiver -> decode + + enrRecA = enr.Record.init(1, privKeyA, + some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)), + some(Port(9000))).expect("Properly intialized private key") + nodeA = newNode(enrRecA).expect("Properly initialized record") + + enrRecB = enr.Record.init(1, privKeyB, + some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)), + some(Port(9000))).expect("Properly intialized private key") + nodeB = newNode(enrRecB).expect("Properly initialized record") + + var + codecA = Codec(localNode: nodeA, privKey: privKeyA, sessions: Sessions.init(5)) + codecB = Codec(localNode: nodeB, privKey: privKeyB, sessions: Sessions.init(5)) + + test "Encode / Decode Ordinary Random Message Packet": + let + m = PingMessage(enrSeq: 0) + reqId = RequestId.init(rng[]) + message = encodeMessage(m, reqId) + + let (data, nonce) = encodeMessagePacket(rng[], codecA, nodeB.id, + nodeB.address.get(), message) + + let decoded = codecB.decodePacket(nodeA.address.get(), data) + check: + decoded.isOk() + decoded[].flag == OrdinaryMessage + decoded[].messageOpt.isNone() + decoded[].requestNonce == nonce + + test "Encode / Decode Whoareyou Packet": + var requestNonce: AESGCMNonce + brHmacDrbgGenerate(rng[], requestNonce) + let recordSeq = 0'u64 + + let data = encodeWhoareyouPacket(rng[], codecA, nodeB.id, + 
nodeB.address.get(), requestNonce, recordSeq, none(PublicKey)) + + let decoded = codecB.decodePacket(nodeA.address.get(), data) + + let key = HandshakeKey(nodeId: nodeB.id, address: nodeB.address.get()) + var challenge: Challenge + + check: + codecA.handshakes.pop(key, challenge) + decoded.isOk() + decoded[].flag == Flag.Whoareyou + decoded[].whoareyou.requestNonce == requestNonce + decoded[].whoareyou.idNonce == challenge.whoareyouData.idNonce + decoded[].whoareyou.recordSeq == recordSeq + + test "Encode / Decode Handshake Message Packet": + var requestNonce: AESGCMNonce + brHmacDrbgGenerate(rng[], requestNonce) + let + recordSeq = 1'u64 + m = PingMessage(enrSeq: 0) + reqId = RequestId.init(rng[]) + message = encodeMessage(m, reqId) + pubkey = some(privKeyA.toPublicKey()) + + # Encode/decode whoareyou packet to get the handshake stored and the + # whoareyou data returned. It's either that or construct the header for the + # whoareyouData manually. + let + encodedDummy = encodeWhoareyouPacket(rng[], codecB, nodeA.id, + nodeA.address.get(), requestNonce, recordSeq, pubkey) + decodedDummy = codecA.decodePacket(nodeB.address.get(), encodedDummy) + + let data = encodeHandshakePacket(rng[], codecA, nodeB.id, + nodeB.address.get(), message, decodedDummy[].whoareyou, + privKeyB.toPublicKey()) + + let decoded = codecB.decodePacket(nodeA.address.get(), data) + + check: + decoded.isOk() + decoded.get().message.reqId == reqId + decoded.get().message.kind == ping + decoded.get().message.ping.enrSeq == 0 + decoded.get().node.isNone() + + test "Encode / Decode Handshake Message Packet with ENR": + var requestNonce: AESGCMNonce + brHmacDrbgGenerate(rng[], requestNonce) + let + recordSeq = 0'u64 + m = PingMessage(enrSeq: 0) + reqId = RequestId.init(rng[]) + message = encodeMessage(m, reqId) + pubkey = none(PublicKey) + + # Encode/decode whoareyou packet to get the handshake stored and the + # whoareyou data returned. 
It's either that or construct the header for the + # whoareyouData manually. + let + encodedDummy = encodeWhoareyouPacket(rng[], codecB, nodeA.id, + nodeA.address.get(), requestNonce, recordSeq, pubkey) + decodedDummy = codecA.decodePacket(nodeB.address.get(), encodedDummy) + + let encoded = encodeHandshakePacket(rng[], codecA, nodeB.id, + nodeB.address.get(), message, decodedDummy[].whoareyou, + privKeyB.toPublicKey()) + + let decoded = codecB.decodePacket(nodeA.address.get(), encoded) + + check: + decoded.isOk() + decoded.get().message.reqId == reqId + decoded.get().message.kind == ping + decoded.get().message.ping.enrSeq == 0 + decoded.get().node.isSome() + decoded.get().node.get().record.seqNum == 1 + + test "Encode / Decode Ordinary Message Packet": + let + m = PingMessage(enrSeq: 0) + reqId = RequestId.init(rng[]) + message = encodeMessage(m, reqId) + + # Need to manually add the secrets that normally get negotiated in the + # handshake packet. + var secrets: HandshakeSecrets + codecA.sessions.store(nodeB.id, nodeB.address.get(), secrets.recipientKey, + secrets.initiatorKey) + codecB.sessions.store(nodeA.id, nodeA.address.get(), secrets.initiatorKey, + secrets.recipientKey) + + let (data, nonce) = encodeMessagePacket(rng[], codecA, nodeB.id, + nodeB.address.get(), message) + + let decoded = codecB.decodePacket(nodeA.address.get(), data) + check: + decoded.isOk() + decoded.get().flag == OrdinaryMessage + decoded.get().messageOpt.isSome() + decoded.get().messageOpt.get().reqId == reqId + decoded.get().messageOpt.get().kind == ping + decoded.get().messageOpt.get().ping.enrSeq == 0 + decoded[].requestNonce == nonce