diff --git a/eth/p2p.nim b/eth/p2p.nim index 32647e9..9d4592b 100644 --- a/eth/p2p.nim +++ b/eth/p2p.nim @@ -84,7 +84,7 @@ proc newEthereumNode*( bootstrapNodes: seq[ENode] = @[], bindUdpPort: Port, bindTcpPort: Port, - bindIp = IPv4_any(), + bindIp = IPv6_any(), rng = newRng()): EthereumNode = if rng == nil: # newRng could fail diff --git a/eth/p2p/discovery.nim b/eth/p2p/discovery.nim index e089dfb..aa0e722 100644 --- a/eth/p2p/discovery.nim +++ b/eth/p2p/discovery.nim @@ -9,24 +9,27 @@ import std/[times, net], - chronos, stint, nimcrypto/keccak, chronicles, - stew/objects, results, + chronos, + stint, + nimcrypto/keccak, + chronicles, + stew/objects, + results, ../rlp, ../common/keys, "."/[kademlia, enode] -export - Node, results +export Node, results logScope: topics = "eth p2p discovery" const # UDP packet constants. - MAC_SIZE = 256 div 8 # 32 - SIG_SIZE = 520 div 8 # 65 - HEAD_SIZE = MAC_SIZE + SIG_SIZE # 97 - EXPIRATION = 60 # let messages expire after N seconds + MAC_SIZE = 256 div 8 # 32 + SIG_SIZE = 520 div 8 # 65 + HEAD_SIZE = MAC_SIZE + SIG_SIZE # 97 + EXPIRATION = 60 # let messages expire after N seconds PROTO_VERSION = 4 type @@ -55,9 +58,14 @@ proc append*(w: var RlpWriter, a: IpAddress) = of IpAddressFamily.IPv4: w.append(a.address_v4) -proc append(w: var RlpWriter, p: Port) = w.append(p.uint) -proc append(w: var RlpWriter, pk: PublicKey) = w.append(pk.toRaw()) -proc append(w: var RlpWriter, h: MDigest[256]) = w.append(h.data) +proc append(w: var RlpWriter, p: Port) = + w.append(p.uint) + +proc append(w: var RlpWriter, pk: PublicKey) = + w.append(pk.toRaw()) + +proc append(w: var RlpWriter, h: MDigest[256]) = + w.append(h.data) proc pack(cmdId: CommandId, payload: openArray[byte], pk: PrivateKey): seq[byte] = ## Create and sign a UDP message to be sent to a remote node. @@ -65,11 +73,13 @@ proc pack(cmdId: CommandId, payload: openArray[byte], pk: PrivateKey): seq[byte] ## See https://github.com/ethereum/devp2p/blob/master/rlpx.md#node-discovery for information on ## how UDP packets are structured. - # TODO: There is a lot of unneeded allocations here - let encodedData = @[cmdId.byte] & @payload - let signature = @(pk.sign(encodedData).toRaw()) - let msgHash = keccak256.digest(signature & encodedData) - result = @(msgHash.data) & signature & encodedData + result = newSeq[byte](HEAD_SIZE + 1 + payload.len) + result[HEAD_SIZE] = cmdId.byte + result[HEAD_SIZE + 1 ..< result.len] = payload + result[MAC_SIZE ..< MAC_SIZE + SIG_SIZE] = + pk.sign(result.toOpenArray(HEAD_SIZE, result.high)).toRaw() + result[0 ..< MAC_SIZE] = + keccak256.digest(result.toOpenArray(MAC_SIZE, result.high)).data proc validateMsgHash(msg: openArray[byte]): DiscResult[MDigest[256]] = if msg.len > HEAD_SIZE: @@ -85,20 +95,20 @@ proc validateMsgHash(msg: openArray[byte]): DiscResult[MDigest[256]] = proc recoverMsgPublicKey(msg: openArray[byte]): DiscResult[PublicKey] = if msg.len <= HEAD_SIZE: return err("disc: can't get public key") - let sig = ? Signature.fromRaw(msg.toOpenArray(MAC_SIZE, HEAD_SIZE)) + let sig = ?Signature.fromRaw(msg.toOpenArray(MAC_SIZE, HEAD_SIZE)) recover(sig, msg.toOpenArray(HEAD_SIZE, msg.high)) -proc unpack(msg: openArray[byte]): tuple[cmdId: CommandId, payload: seq[byte]] - {.raises: [DiscProtocolError].} = +proc unpack( + msg: openArray[byte] +): tuple[cmdId: CommandId, payload: seq[byte]] {.raises: [DiscProtocolError].} = # Check against possible RangeDefect - if msg[HEAD_SIZE].int < CommandId.low.ord or - msg[HEAD_SIZE].int > CommandId.high.ord: + if msg[HEAD_SIZE].int < CommandId.low.ord or msg[HEAD_SIZE].int > CommandId.high.ord: raise newException(DiscProtocolError, "Unsupported packet id") - result = (cmdId: msg[HEAD_SIZE].CommandId, payload: msg[HEAD_SIZE + 1 .. ^1]) + (cmdId: msg[HEAD_SIZE].CommandId, payload: msg[HEAD_SIZE + 1 .. ^1]) -proc expiration(): uint32 = - result = uint32(epochTime() + EXPIRATION) +proc expiration(): uint64 = + uint64(getTime().toUnix() + EXPIRATION) # Wire protocol @@ -110,15 +120,16 @@ proc send(d: DiscoveryProtocol, n: Node, data: seq[byte]) = when defined(chronicles_log_level): try: # readError will raise FutureError - debug "Discovery send failed", msg = f.readError.msg + debug "Discovery send failed", + msg = f.readError.msg, address = $n.node.address except FutureError as exc: - error "Failed to get discovery send future error", msg=exc.msg + error "Failed to get discovery send future error", msg = exc.msg f.addCallback cb proc sendPing*(d: DiscoveryProtocol, n: Node): seq[byte] = - let payload = rlp.encode((PROTO_VERSION.uint, d.address, n.node.address, - expiration())) + let payload = + rlp.encode((PROTO_VERSION.uint, d.address, n.node.address, expiration())) let msg = pack(cmdPing, payload, d.privKey) result = msg[0 ..< MAC_SIZE] trace ">>> ping ", n @@ -135,7 +146,7 @@ proc sendFindNode*(d: DiscoveryProtocol, n: Node, targetNodeId: NodeId) = data[32 .. ^1] = targetNodeId.toByteArrayBE() let payload = rlp.encode((data, expiration())) let msg = pack(cmdFindNode, payload, d.privKey) - trace ">>> find_node to ", n#, ": ", msg.toHex() + trace ">>> find_node to ", n #, ": ", msg.toHex() d.send(n, msg) proc sendNeighbours*(d: DiscoveryProtocol, node: Node, neighbours: seq[Node]) = @@ -152,18 +163,23 @@ proc sendNeighbours*(d: DiscoveryProtocol, node: Node, neighbours: seq[Node]) = nodes.setLen(0) for i, n in neighbours: - nodes.add((n.node.address.ip, n.node.address.udpPort, - n.node.address.tcpPort, n.node.pubkey)) + nodes.add( + (n.node.address.ip, n.node.address.udpPort, n.node.address.tcpPort, n.node.pubkey) + ) if nodes.len == MAX_NEIGHBOURS_PER_PACKET: flush() - if nodes.len != 0: flush() + if nodes.len != 0: + flush() proc newDiscoveryProtocol*( - privKey: PrivateKey, address: Address, + privKey: PrivateKey, + address: Address, bootstrapNodes: openArray[ENode], - bindPort: Port, bindIp = IPv4_any(), - rng = newRng()): DiscoveryProtocol = + bindPort: Port, + bindIp = IPv6_any(), + rng = newRng(), +): DiscoveryProtocol = let localNode = newNode(privKey.toPublicKey(), address) discovery = DiscoveryProtocol( @@ -171,27 +187,32 @@ proc newDiscoveryProtocol*( address: address, localNode: localNode, bindIp: bindIp, - bindPort: bindPort) + bindPort: bindPort, + ) kademlia = newKademliaProtocol(localNode, discovery, rng = rng) discovery.kademlia = kademlia - for n in bootstrapNodes: discovery.bootstrapNodes.add(newNode(n)) + for n in bootstrapNodes: + discovery.bootstrapNodes.add(newNode(n)) discovery -proc recvPing(d: DiscoveryProtocol, node: Node, msgHash: MDigest[256]) - {.raises: [ValueError].} = +proc recvPing( + d: DiscoveryProtocol, node: Node, msgHash: MDigest[256] +) {.raises: [ValueError].} = d.kademlia.recvPing(node, msgHash) -proc recvPong(d: DiscoveryProtocol, node: Node, payload: seq[byte]) - {.raises: [RlpError].} = +proc recvPong( + d: DiscoveryProtocol, node: Node, payload: seq[byte] +) {.raises: [RlpError].} = let rlp = rlpFromBytes(payload) let tok = rlp.listElem(1).toBytes() d.kademlia.recvPong(node, tok) -proc recvNeighbours(d: DiscoveryProtocol, node: Node, payload: seq[byte]) - {.raises: [RlpError].} = +proc recvNeighbours( + d: DiscoveryProtocol, node: Node, payload: seq[byte] +) {.raises: [RlpError].} = let rlp = rlpFromBytes(payload) let neighboursList = rlp.listElem(0) let sz = neighboursList.listLen() @@ -203,11 +224,9 @@ proc recvNeighbours(d: DiscoveryProtocol, node: Node, payload: seq[byte]) var ip: IpAddress case ipBlob.len of 4: - ip = IpAddress( - family: IpAddressFamily.IPv4, address_v4: toArray(4, ipBlob)) + ip = IpAddress(family: IpAddressFamily.IPv4, address_v4: toArray(4, ipBlob)) of 16: - ip = IpAddress( - family: IpAddressFamily.IPv6, address_v6: toArray(16, ipBlob)) + ip = IpAddress(family: IpAddressFamily.IPv6, address_v6: toArray(16, ipBlob)) else: error "Wrong ip address length!" continue @@ -222,8 +241,9 @@ proc recvNeighbours(d: DiscoveryProtocol, node: Node, payload: seq[byte]) neighbours.add(newNode(pk[], Address(ip: ip, udpPort: udpPort, tcpPort: tcpPort))) d.kademlia.recvNeighbours(node, neighbours) -proc recvFindNode(d: DiscoveryProtocol, node: Node, payload: openArray[byte]) - {.raises: [RlpError, ValueError].} = +proc recvFindNode( + d: DiscoveryProtocol, node: Node, payload: openArray[byte] +) {.raises: [RlpError, ValueError].} = let rlp = rlpFromBytes(payload) trace "<<< find_node from ", node let rng = rlp.listElem(0).toBytes @@ -234,8 +254,9 @@ proc recvFindNode(d: DiscoveryProtocol, node: Node, payload: openArray[byte]) else: trace "Invalid target public key received" -proc expirationValid(cmdId: CommandId, rlpEncodedPayload: openArray[byte]): - bool {.raises: [DiscProtocolError, RlpError].} = +proc expirationValid( + cmdId: CommandId, rlpEncodedPayload: openArray[byte] +): bool {.raises: [DiscProtocolError, RlpError].} = ## Can only raise `DiscProtocolError` and all of `RlpError` # Check if there is a payload if rlpEncodedPayload.len <= 0: @@ -250,8 +271,9 @@ proc expirationValid(cmdId: CommandId, rlpEncodedPayload: openArray[byte]): else: raise newException(DiscProtocolError, "Invalid RLP list for this packet id") -proc receive*(d: DiscoveryProtocol, a: Address, msg: openArray[byte]) - {.raises: [DiscProtocolError, RlpError, ValueError].} = +proc receive*( + d: DiscoveryProtocol, a: Address, msg: openArray[byte] +) {.raises: [DiscProtocolError, RlpError, ValueError].} = # Note: export only needed for testing let msgHash = validateMsgHash(msg) if msgHash.isOk(): @@ -280,17 +302,20 @@ proc receive*(d: DiscoveryProtocol, a: Address, msg: openArray[byte]) else: notice "Wrong msg mac from ", a -proc processClient(transp: DatagramTransport, raddr: TransportAddress): - Future[void] {.async: (raises: []).} = +proc processClient( + transp: DatagramTransport, raddr: TransportAddress +): Future[void] {.async: (raises: []).} = var proto = getUserData[DiscoveryProtocol](transp) - let buf = try: transp.getMessage() - except TransportOsError as e: - # This is likely to be local network connection issues. - warn "Transport getMessage", exception = e.name, msg = e.msg - return - except TransportError as exc: - debug "getMessage error", msg = exc.msg - return + let buf = + try: + transp.getMessage() + except TransportOsError as e: + # This is likely to be local network connection issues. + warn "Transport getMessage", exception = e.name, msg = e.msg + return + except TransportError as exc: + debug "getMessage error", msg = exc.msg + return try: let a = Address(ip: raddr.address, udpPort: raddr.port, tcpPort: raddr.port) proto.receive(a, buf) @@ -326,26 +351,30 @@ proc randomNodes*(d: DiscoveryProtocol, count: int): seq[Node] = d.kademlia.randomNodes(count) when isMainModule: - import logging, stew/byteutils - - const LOCAL_BOOTNODES = [ - "enode://6456719e7267e061161c88720287a77b80718d2a3a4ff5daeba614d029dc77601b75e32190aed1c9b0b9ccb6fac3bcf000f48e54079fa79e339c25d8e9724226@127.0.0.1:30301" - ] - - addHandler(newConsoleLogger()) + import stew/byteutils, ./bootnodes block: - let m = hexToSeqByte"79664bff52ee17327b4a2d8f97d8fb32c9244d719e5038eb4f6b64da19ca6d271d659c3ad9ad7861a928ca85f8d8debfbe6b7ade26ad778f2ae2ba712567fcbd55bc09eb3e74a893d6b180370b266f6aaf3fe58a0ad95f7435bf3ddf1db940d20102f2cb842edbd4d182944382765da0ab56fb9e64a85a597e6bb27c656b4f1afb7e06b0fd4e41ccde6dba69a3c4a150845aaa4de2" + let m = + hexToSeqByte"79664bff52ee17327b4a2d8f97d8fb32c9244d719e5038eb4f6b64da19ca6d271d659c3ad9ad7861a928ca85f8d8debfbe6b7ade26ad778f2ae2ba712567fcbd55bc09eb3e74a893d6b180370b266f6aaf3fe58a0ad95f7435bf3ddf1db940d20102f2cb842edbd4d182944382765da0ab56fb9e64a85a597e6bb27c656b4f1afb7e06b0fd4e41ccde6dba69a3c4a150845aaa4de2" discard validateMsgHash(m).expect("valid hash") var remotePubkey = recoverMsgPublicKey(m).expect("valid key") let (cmdId, payload) = unpack(m) - doAssert(payload == hexToSeqByte"f2cb842edbd4d182944382765da0ab56fb9e64a85a597e6bb27c656b4f1afb7e06b0fd4e41ccde6dba69a3c4a150845aaa4de2") + doAssert( + payload == + hexToSeqByte"f2cb842edbd4d182944382765da0ab56fb9e64a85a597e6bb27c656b4f1afb7e06b0fd4e41ccde6dba69a3c4a150845aaa4de2" + ) doAssert(cmdId == cmdPong) - doAssert(remotePubkey == PublicKey.fromHex( - "78de8a0916848093c73790ead81d1928bec737d565119932b98c6b100d944b7a95e94f847f689fc723399d2e31129d182f7ef3863f2b4c820abbf3ab2722344d")[]) + doAssert( + remotePubkey == + PublicKey.fromHex( + "78de8a0916848093c73790ead81d1928bec737d565119932b98c6b100d944b7a95e94f847f689fc723399d2e31129d182f7ef3863f2b4c820abbf3ab2722344d" + )[] + ) - let privKey = PrivateKey.fromHex("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617")[] + let privKey = PrivateKey.fromHex( + "a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617" + )[] # echo privKey @@ -355,14 +384,14 @@ when isMainModule: # let (remotePubkey, cmdId, payload) = unpack(m) # doAssert(remotePubkey.raw_key.toHex == privKey.public_key.raw_key.toHex) - var bootnodes = newSeq[ENode]() - for item in LOCAL_BOOTNODES: - bootnodes.add(ENode.fromString(item)[]) + var nodes = newSeq[ENode]() + for item in MainnetBootnodes: + nodes.add(ENode.fromString(item)[]) let listenPort = Port(30310) var address = Address(udpPort: listenPort, tcpPort: listenPort) address.ip.family = IpAddressFamily.IPv4 - let discovery = newDiscoveryProtocol(privkey, address, bootnodes, listenPort) + let discovery = newDiscoveryProtocol(privKey, address, nodes, bindPort = listenPort) echo discovery.localNode.node.pubkey echo "this_node.id: ", discovery.localNode.id.toHex() @@ -372,5 +401,6 @@ when isMainModule: proc test() {.async.} = {.gcsafe.}: await discovery.bootstrap() - + for node in discovery.randomNodes(discovery.kademlia.nodesDiscovered): + echo node waitFor test() diff --git a/eth/p2p/kademlia.nim b/eth/p2p/kademlia.nim index ad77f64..e3d49d7 100644 --- a/eth/p2p/kademlia.nim +++ b/eth/p2p/kademlia.nim @@ -49,7 +49,6 @@ type istart, iend: UInt256 nodes: seq[Node] replacementCache: seq[Node] - lastUpdated: float # epochTime CommandId* = enum cmdPing = 1 @@ -216,7 +215,6 @@ proc add(k: KBucket, n: Node): Node = ## If the bucket is full, we add the node to the bucket's replacement cache and return the ## node at the head of the list (i.e. the least recently seen), which should be evicted if it ## fails to respond to a ping. - k.lastUpdated = epochTime() let nodeIdx = k.nodes.find(n) if nodeIdx != -1: k.nodes.delete(nodeIdx)