Merge branch 'master' into review

This commit is contained in:
cheatfate 2018-03-30 23:12:05 +03:00
commit b89d42b633
10 changed files with 1009 additions and 28 deletions

11
.gitignore vendored Normal file
View File

@ -0,0 +1,11 @@
nimcache/
# Executables shall be put in an ignored build/ directory
# Ignore dynamic, static libs and libtool archive files
build/
*.so
*.dylib
*.a
*.la
*.exe
*.dll

View File

@ -1,20 +1,8 @@
sudo: false
language: c
os:
- linux
# - osx
dist: trusty
before_script:
- git clone https://github.com/nim-lang/Nim.git
- cd Nim
- git clone --depth 1 https://github.com/nim-lang/csources
- cd csources && sh build.sh
- cd ..
- bin/nim c koch
- ./koch boot -d:release
- ./koch nimble
- export PATH=$(pwd)/bin:$PATH
- cd ..
sudo: required
services:
- docker
before_install:
- docker pull statusteam/nim-base
script:
- nimble install -y
- nimble tests
- docker run statusteam/nim-base nim --version
- docker run -v "$(pwd):/project" -w /project statusteam/nim-base sh -c "nimble install -dy && nimble test"

205
LICENSE-APACHEv2 Normal file
View File

@ -0,0 +1,205 @@
nim-eth-p2p is licensed under the Apache License version 2
Copyright (c) 2018 Status Research & Development GmbH
-----------------------------------------------------
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2018 Status Research & Development GmbH
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -1,6 +1,10 @@
MIT License
nim-eth-p2p is licensed under the MIT License
Copyright (c) 2018 Status Research & Development GmbH
-----------------------------------------------------
Copyright (c) 2018 Status
The MIT License (MIT)
Copyright (c) 2018 Status Research & Development GmbH
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@ -1,2 +1,10 @@
# nim-eth-p2p [![Build Status](https://travis-ci.org/status-im/nim-eth-p2p.svg?branch=master)](https://travis-ci.org/status-im/nim-eth-p2p) [![Build Status](https://ci.appveyor.com/api/projects/status/github/status-im/nim-eth-p2p?branch=master&svg=true)](https://ci.appveyor.com/project/status-im/nim-eth-p2p)
# nim-eth-p2p [![Build Status](https://travis-ci.org/status-im/nim-eth-p2p.svg?branch=master)](https://travis-ci.org/status-im/nim-eth-p2p) [![Build status](https://ci.appveyor.com/api/projects/status/i4txsa2pdyaahmn0/branch/master?svg=true)](https://ci.appveyor.com/project/cheatfate/nim-eth-p2p/branch/master)
Nim Ethereum P2P protocol implementation
## License
Licensed and distributed under either of
* MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
* Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
at your option. This file may not be copied, modified, or distributed except according to those terms.

View File

@ -39,6 +39,6 @@ build_script:
- cd C:\projects\nim-eth-p2p
- nimble install -y
test_script:
- nimble tests
- nimble test
deploy: off

View File

@ -7,9 +7,19 @@ description = "Ethereum P2P library"
license = "MIT"
skipDirs = @["tests", "Nim"]
requires "nim > 0.18.0", "https://github.com/status-im/nim-rlp >= 1.0.1", "https://github.com/cheatfate/nimcrypto >= 0.1.0", "https://github.com/status-im/nim-secp256k1 >= 0.1.0"
requires "nim > 0.18.0",
"rlp >= 1.0.1",
"nimcrypto >= 0.1.0",
"secp256k1 >= 0.1.0",
"eth_keys",
"ranges",
"ttmath"
task tests, "Runs the test suite":
exec "nim c -r tests/testecc"
exec "nim c -r tests/testecies"
exec "nim c -r tests/testauth"
proc runTest(name: string, lang = "c") = exec "nim " & lang & " -r tests/" & name
task test, "Runs the test suite":
runTest "testecc"
runTest "testecies"
runTest "testauth"
runTest("tdiscovery", "cpp")

294
ethp2p/discovery.nim Normal file
View File

@ -0,0 +1,294 @@
from strutils import nil
import asyncnet, asyncdispatch, net, times, nativesockets, algorithm, logging
import kademlia
import eth_keys, rlp, ranges, ttmath, nimcrypto
export Address, Node
const
  # Well-known bootstrap nodes for the Ethereum main network, in enode URI
  # format: "enode://<64-byte-hex-pubkey>@<ip>:<udp-port>".
  MAINNET_BOOTNODES* = [
    "enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303", # noqa: E501
    "enode://aa36fdf33dd030378a0168efe6ed7d5cc587fafa3cdd375854fe735a2e11ea3650ba29644e2db48368c46e1f60e716300ba49396cd63778bf8a818c09bded46f@13.93.211.84:30303", # noqa: E501
    "enode://78de8a0916848093c73790ead81d1928bec737d565119932b98c6b100d944b7a95e94f847f689fc723399d2e31129d182f7ef3863f2b4c820abbf3ab2722344d@191.235.84.50:30303", # noqa: E501
    "enode://158f8aab45f6d19c6cbf4a089c2670541a8da11978a2f90dbf6a502a4a3bab80d288afdbeb7ec0ef6d92de563767f3b1ea9e8e334ca711e9f8e2df5a0385e8e6@13.75.154.138:30303", # noqa: E501
    "enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303", # noqa: E501
  ]
  # Bootstrap nodes for the Ropsten test network.
  ROPSTEN_BOOTNODES* = [
    "enode://30b7ab30a01c124a6cceca36863ece12c4f5fa68e3ba9b0b51407ccc002eeed3b3102d20a88f1c1d3c3154e2449317b8ef95090e77b312d5cc39354f86d5d606@52.176.7.10:30303", # noqa: E501
    "enode://865a63255b3bb68023b6bffd5095118fcc13e79dcf014fe4e47e065c350c7cc72af2e53eff895f11ba1bbb6a2b33271c1116ee870f266618eadfc2e78aa7349c@52.176.100.77:30303", # noqa: E501
    "enode://6332792c4a00e3e4ee0926ed89e0d27ef985424d97b6a45bf0f23e51f0dcb5e66b875777506458aea7af6f9e4ffb69f43f3778ee73c81ed9d34c51c4b16b0b0f@52.232.243.152:30303", # noqa: E501
    "enode://94c15d1b9e2fe7ce56e458b9a3b672ef11894ddedd0c6f247e0f1d3487f52b66208fb4aeb8179fce6e3a749ea93ed147c37976d67af557508d199d9594c35f09@192.81.208.223:30303", # noqa: E501
  ]
  # Single bootstrap node for a locally-run test network (not exported).
  LOCAL_BOOTNODES = [
    "enode://6456719e7267e061161c88720287a77b80718d2a3a4ff5daeba614d029dc77601b75e32190aed1c9b0b9ccb6fac3bcf000f48e54079fa79e339c25d8e9724226@127.0.0.1:30301"
  ]

  # UDP packet constants (discovery v4 wire format: hash || signature || data).
  MAC_SIZE = 256 div 8 # 32 — leading keccak256 packet hash
  SIG_SIZE = 520 div 8 # 65 — recoverable secp256k1 signature
  HEAD_SIZE = MAC_SIZE + SIG_SIZE # 97 — total header preceding the payload
  EXPIRATION = 60 # let messages expire after N seconds
  PROTO_VERSION = 4

type
  DiscoveryProtocol* = ref object
    privKey: PrivateKey                           # local node's secp256k1 key, used to sign packets
    address: Address                              # our advertised UDP/TCP endpoint
    bootstrapNodes: seq[Node]
    thisNode: Node
    kademlia: KademliaProtocol[DiscoveryProtocol]
    socket: AsyncSocket                           # UDP socket; nil means not opened / closed

  # Discovery v4 packet type ids — the single byte following the header.
  CommandId = enum
    cmdPing = 1
    cmdPong = 2
    cmdFindNode = 3
    cmdNeighbours = 4

# Receive-buffer size. NOTE(review): 1280 matches the devp2p discovery
# maximum packet size — confirm against the spec version targeted.
const MaxDgramSize = 1280
# RLP serialization helpers for the wire types that appear in discovery
# packets. Each `append` overload defines how its type is encoded.

proc append*(w: var RlpWriter, a: IpAddress) =
  ## Encode an IP address as its raw 4- or 16-byte representation
  ## (no family tag — length distinguishes IPv4 from IPv6 on decode).
  case a.family
  of IpAddressFamily.IPv6:
    w.append(a.address_v6.toMemRange)
  of IpAddressFamily.IPv4:
    w.append(a.address_v4.toMemRange)

proc append*(w: var RlpWriter, p: Port) {.inline.} = w.append(p.int)

proc append*(w: var RlpWriter, pk: PublicKey) {.inline.} =
  ## Encode a public key as its 64-byte serialized form.
  var bytes: array[64, byte]
  pk.serialize(bytes)
  w.append(toMemRange(bytes))

proc append*(w: var RlpWriter, h: MDigest[256]) {.inline.} =
  ## Encode a 256-bit digest as its 32 raw bytes.
  w.append(toMemRange(h.data))

proc toBytes(s: Signature): Bytes =
  ## Flat byte copy of a recoverable signature (sizeof(Signature) bytes).
  result = newSeq[byte](sizeof(s))
  s.serialize(result)
proc pack(cmdId: CommandId, payload: BytesRange, pk: PrivateKey): Bytes =
  ## Create and sign a UDP message to be sent to a remote node.
  ##
  ## Layout: keccak256(sig || type || payload) || sig || type || payload.
  ## See https://github.com/ethereum/devp2p/blob/master/rlpx.md#node-discovery for information on
  ## how UDP packets are structured.
  let encodedData = @[cmdId.byte] & payload.toSeq()
  let signature = toBytes(pk.sign_msg(keccak256.digest(encodedData)))
  let msgHash = keccak256.digest(signature & encodedData)
  result = @(msgHash.data) & signature & encodedData

proc validateMsgHash(msg: Bytes, msgHash: var MDigest[256]): bool =
  ## Check that the leading MAC_SIZE bytes of `msg` equal the keccak256 of the
  ## rest of the packet; on return `msgHash` holds the received hash.
  msgHash.data[0 .. ^1] = msg.toOpenArray(0, msgHash.data.high)
  result = msgHash == keccak256.digest(msg.toOpenArray(MAC_SIZE, msg.high))

proc unpack(msg: Bytes): tuple[remotePubkey: PublicKey, cmdId: CommandId, payload: Bytes] =
  ## Split a validated packet into sender key (recovered from the signature),
  ## command id and RLP payload. Assumes the hash was already checked.
  result.cmdId = msg[HEAD_SIZE].CommandId
  let signature = parseSignature(msg, MAC_SIZE)
  result.remotePubkey = recover_pubkey_from_msg(keccak256.digest(msg.toOpenArray(HEAD_SIZE, msg.high)), signature)
  result.payload = msg[HEAD_SIZE + 1 .. ^1]

proc expiration(): uint32 =
  ## Expiry timestamp for an outgoing packet: now + EXPIRATION seconds.
  result = uint32(epochTime() + EXPIRATION)
# Wire protocol

proc sendTo*(socket: AsyncFD, data: seq[byte], ip: IpAddress, port: Port,
             flags = {SocketFlag.SafeDisconn}) {.async.} =
  ## Async UDP sendto for a raw fd. GC_ref/GC_unref keep `data` alive across
  ## the await, since the low-level sendTo only holds a raw pointer into it.
  var sa: Sockaddr_storage
  var ln: Socklen
  ip.toSockaddr(port, sa, ln)
  GC_ref(data)
  await sendTo(socket, unsafeAddr data[0], data.len, cast[ptr Sockaddr](addr sa), ln)
  GC_unref(data)

proc send(d: DiscoveryProtocol, n: Node, data: seq[byte]) =
  ## Fire-and-forget send of `data` to node `n`'s UDP endpoint.
  asyncCheck d.socket.getFd().AsyncFD.sendTo(data, n.address.ip, n.address.udpPort)
proc sendPing*(d: DiscoveryProtocol, n: Node): seq[byte] =
  ## Send a ping to `n`; returns the packet hash (the ping id the
  ## corresponding pong must echo).
  let payload = rlp.encode((PROTO_VERSION, d.address, n.address, expiration()))
  let msg = pack(cmdPing, payload, d.privKey)
  result = msg[0 ..< MAC_SIZE]
  debug ">>> ping ", n
  d.send(n, msg)

proc sendPong*(d: DiscoveryProtocol, n: Node, token: MDigest[256]) =
  ## Answer a ping: echo back `token` (the ping packet's hash).
  let payload = rlp.encode((n.address, token, expiration()))
  let msg = pack(cmdPong, payload, d.privKey)
  debug ">>> pong ", n
  d.send(n, msg)

proc sendFindNode*(d: DiscoveryProtocol, n: Node, targetNodeId: NodeId) =
  ## Ask `n` for nodes close to `targetNodeId`. The 32 zero bytes pad the
  ## target to the 64-byte public-key width the wire format expects.
  var data = newSeq[byte](32) & @(targetNodeId.toByteArrayBE())
  let payload = rlp.encode((data, expiration()))
  let msg = pack(cmdFindNode, payload, d.privKey)
  debug ">>> find_node to ", n#, ": ", msg.toHex()
  d.send(n, msg)

proc sendNeighbours*(d: DiscoveryProtocol, node: Node, neighbours: seq[Node]) =
  ## Send `neighbours` to `node`, split into packets of at most
  ## MAX_NEIGHBOURS_PER_PACKET entries each.
  const MAX_NEIGHBOURS_PER_PACKET = 12 # TODO: Implement a smarter way to compute it

  type Neighbour = tuple[ip: IpAddress, udpPort, tcpPort: Port, pk: PublicKey]
  var nodes = newSeqOfCap[Neighbour](MAX_NEIGHBOURS_PER_PACKET)
  shallow(nodes)

  template flush() =
    # Encode and send the accumulated batch, then reset it.
    block:
      let payload = rlp.encode((nodes, expiration()))
      let msg = pack(cmdNeighbours, payload, d.privkey)
      debug ">>> neighbours to ", node, ": ", nodes
      d.send(node, msg)
      nodes.setLen(0)

  for i, n in neighbours:
    nodes.add((n.address.ip, n.address.udpPort, n.address.tcpPort, n.pubkey))
    if nodes.len == MAX_NEIGHBOURS_PER_PACKET:
      flush()

  if nodes.len != 0: flush()
proc newDiscoveryProtocol*(privKey: PrivateKey, address: Address,
                           bootstrapNodes: openarray[string]): DiscoveryProtocol =
  ## Create a discovery protocol instance for the node identified by
  ## `privKey`, advertising `address`, seeded with `bootstrapNodes`
  ## given as enode URI strings. Does not open the socket — call `open`.
  result.new()
  result.privKey = privKey
  result.address = address
  result.bootstrapNodes = newSeqOfCap[Node](bootstrapNodes.len)
  for n in bootstrapNodes: result.bootstrapNodes.add(newNode(n))
  result.thisNode = newNode(privKey.public_key, address)
  # Removed stray `{.explain.}` pragma: it is a concept-diagnostics debugging
  # aid and has no place on an ordinary assignment statement.
  result.kademlia = newKademliaProtocol(result.thisNode, result)
proc recvPing(d: DiscoveryProtocol, node: Node, msgHash: MDigest[256]) {.inline.} =
  ## Forward an incoming ping (with its packet hash, used as pong token).
  d.kademlia.recvPing(node, msgHash)

proc recvPong(d: DiscoveryProtocol, node: Node, payload: Bytes) {.inline.} =
  ## Extract the echoed token (list element 1) from a pong and forward it.
  let rlp = rlpFromBytes(payload.toRange)
  let tok = rlp.listElem(1).toBytes().toSeq()
  d.kademlia.recvPong(node, tok)

proc recvNeighbours(d: DiscoveryProtocol, node: Node, payload: Bytes) {.inline.} =
  ## Decode a neighbours packet: a list of (ip, udpPort, tcpPort, pubkey)
  ## entries. Entries with an IP that is neither 4 nor 16 bytes are skipped.
  let rlp = rlpFromBytes(payload.toRange)
  let neighboursList = rlp.listElem(0)
  let sz = neighboursList.listLen()

  var neighbours = newSeqOfCap[Node](16)
  for i in 0 ..< sz:
    let n = neighboursList.listElem(i)
    let ipBlob = n.listElem(0).toBytes
    var ip: IpAddress
    case ipBlob.len
    of 4:
      ip = IpAddress(family: IpAddressFamily.IPv4)
      copyMem(addr ip.address_v4[0], baseAddr ipBlob, 4)
    of 16:
      ip = IpAddress(family: IpAddressFamily.IPv6)
      copyMem(addr ip.address_v6[0], baseAddr ipBlob, 16)
    else:
      error "Wrong ip address length!"
      continue

    let udpPort = n.listElem(1).toInt(uint16).Port
    let tcpPort = n.listElem(2).toInt(uint16).Port
    let pk = parsePublicKey(n.listElem(3).toBytes.toOpenArray())
    neighbours.add(newNode(pk, Address(ip: ip, udpPort: udpPort, tcpPort: tcpPort)))
  d.kademlia.recvNeighbours(node, neighbours)
proc recvFindNode(d: DiscoveryProtocol, node: Node, payload: Bytes) {.inline.} =
  ## Decode the 256-bit target id from a find_node request and forward it.
  let rlp = rlpFromBytes(payload.toRange)
  debug "<<< find_node from ", node
  let rng = rlp.listElem(0).toBytes
  let nodeId = readUIntBE[256](rng.toOpenArray())
  d.kademlia.recvFindNode(node, nodeId)

proc expirationValid(rlpEncodedPayload: seq[byte]): bool {.inline.} =
  ## True if the packet has not expired. By convention the expiry timestamp
  ## is the last element of the RLP payload list.
  let rlp = rlpFromBytes(rlpEncodedPayload.toRange)
  let expiration = rlp.listElem(rlp.listLen - 1).toInt(uint32)
  result = epochTime() <= expiration.float
proc receive(d: DiscoveryProtocol, a: Address, msg: Bytes) =
  ## Entry point for every received datagram: verify the packet hash,
  ## recover the sender, drop expired packets, then dispatch by command id.
  var msgHash: MDigest[256]
  if validateMsgHash(msg, msgHash):
    let (remotePubkey, cmdId, payload) = unpack(msg)
    # echo "received cmd: ", cmdId, ", from: ", a
    # echo "pubkey: ", remotePubkey.raw_key.toHex()
    if expirationValid(payload):
      let node = newNode(remotePubkey, a)
      case cmdId
      of cmdPing:
        d.recvPing(node, msgHash)
      of cmdPong:
        d.recvPong(node, payload)
      of cmdNeighbours:
        d.recvNeighbours(node, payload)
      of cmdFindNode:
        d.recvFindNode(node, payload)
      else:
        # Defensive: cmdId comes from a raw byte cast and may be out of range.
        echo "Unknown command: ", cmdId
    else:
      debug "Received msg ", cmdId, " from ", a, " already expired"
  else:
    error "Wrong msg mac from ", a
proc runListeningLoop(d: DiscoveryProtocol) {.async.} =
  ## Receive datagrams until the socket is closed (set to nil) and hand each
  ## one to `receive` together with the sender's address.
  var buf = newSeq[byte](MaxDgramSize)
  var saddr: Sockaddr_storage
  var slen: Socklen
  while not d.socket.isNil:
    buf.setLen(MaxDgramSize)
    slen = sizeof(saddr).Socklen
    let received = await recvFromInto(d.socket.getFd().AsyncFD, addr buf[0], buf.len, cast[ptr SockAddr](addr saddr), addr slen)
    buf.setLen(received)
    var port: Port
    var ip: IpAddress
    fromSockAddr(saddr, slen, ip, port)
    # NOTE(review): tcpPort is assumed equal to the sender's udpPort here.
    d.receive(Address(ip: ip, udpPort: port, tcpPort: port), buf)

proc open*(d: DiscoveryProtocol) =
  ## Bind the UDP socket to our advertised port and start the listener.
  d.socket = newAsyncSocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
  d.socket.bindAddr(port = d.address.udpPort)
  asyncCheck d.runListeningLoop()

proc bootstrap*(d: DiscoveryProtocol) {.async.} =
  ## Populate the routing table starting from the configured bootstrap nodes.
  await d.kademlia.bootstrap(d.bootstrapNodes)

proc resolve*(d: DiscoveryProtocol, n: NodeId): Future[Node] =
  ## Look up the node with id `n` via the kademlia network.
  d.kademlia.resolve(n)
when isMainModule:
  # Manual smoke test: verify packet unpacking against a known-good pong
  # packet, then run discovery against a local bootstrap node.
  import logging
  from private.conversion_bytes import hexToSeqByteBE # from eth_keys

  addHandler(newConsoleLogger())

  block:
    # A captured discovery v4 pong packet (hash || signature || type || payload).
    let m = hexToSeqByteBE"79664bff52ee17327b4a2d8f97d8fb32c9244d719e5038eb4f6b64da19ca6d271d659c3ad9ad7861a928ca85f8d8debfbe6b7ade26ad778f2ae2ba712567fcbd55bc09eb3e74a893d6b180370b266f6aaf3fe58a0ad95f7435bf3ddf1db940d20102f2cb842edbd4d182944382765da0ab56fb9e64a85a597e6bb27c656b4f1afb7e06b0fd4e41ccde6dba69a3c4a150845aaa4de2"
    var msgHash: MDigest[256]
    doAssert(validateMsgHash(m, msgHash))
    let (remotePubkey, cmdId, payload) = unpack(m)
    assert(payload == hexToSeqByteBE"f2cb842edbd4d182944382765da0ab56fb9e64a85a597e6bb27c656b4f1afb7e06b0fd4e41ccde6dba69a3c4a150845aaa4de2")
    assert(cmdId == cmdPong)
    assert(remotePubkey == initPublicKey("78de8a0916848093c73790ead81d1928bec737d565119932b98c6b100d944b7a95e94f847f689fc723399d2e31129d182f7ef3863f2b4c820abbf3ab2722344d"))

  # Fixed test key so the node id is reproducible across runs.
  let privKey = initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617")

  # echo privKey

  # block:
  #   var b = @[1.byte, 2, 3]
  #   let m = pack(cmdPing, b.initBytesRange, privKey)
  #   let (remotePubkey, cmdId, payload) = unpack(m)
  #   assert(remotePubkey.raw_key.toHex == privKey.public_key.raw_key.toHex)

  let listenPort = Port(30310)
  var address = Address(udpPort: listenPort, tcpPort: listenPort)
  address.ip.family = IpAddressFamily.IPv4
  let discovery = newDiscoveryProtocol(privkey, address, LOCAL_BOOTNODES)

  echo discovery.thisNode.pubkey
  echo "this_node.id: ", discovery.thisNode.id.toHex()

  discovery.open()

  proc test() {.async.} =
    await discovery.bootstrap()

  waitFor test()

458
ethp2p/kademlia.nim Normal file
View File

@ -0,0 +1,458 @@
import asyncdispatch, net, uri, logging, tables, hashes, times, algorithm, sets,
sequtils
from strutils import parseInt
export sets # TODO: This should not be needed, but compilation fails otherwise
import eth_keys, ttmath, nimcrypto
type
  KademliaProtocol* [Wire] = ref object
    wire: Wire                                   # transport used to send packets (e.g. DiscoveryProtocol)
    thisNode: Node
    routing: RoutingTable
    pongFutures: Table[seq[byte], Future[bool]]  # keyed by ping-id (token & pubkey)
    pingFutures: Table[Node, Future[bool]]
    neighboursCallbacks: Table[Node, proc(n: seq[Node])]

  NodeId* = UInt256 # This is probably too small...

  Node* = ref object
    pubkey*: PublicKey
    address*: Address
    id*: NodeId      # keccak256 of the serialized pubkey, as a big-endian uint

  Address* = object
    ip*: IpAddress
    udpPort*: Port
    tcpPort*: Port

  # XOR-metric routing table: ordered, non-overlapping buckets covering the
  # full id space.
  RoutingTable = object
    thisNode: Node
    buckets: seq[KBucket]

  KBucket = ref object
    istart, iend: UInt256     # inclusive id range covered by this bucket
    nodes: seq[Node]          # least-recently seen first
    replacementCache: seq[Node]
    lastUpdated: float # epochTime

const
  BUCKET_SIZE = 16
  BITS_PER_HOP = 8
  REQUEST_TIMEOUT = 0.9 # timeout of message round trips
  FIND_CONCURRENCY = 3 # parallel find node lookups
  ID_SIZE = 256
proc toNodeId(pk: PublicKey): NodeId =
  ## Node id = keccak256 of the 64-byte serialized public key.
  var k: array[64, byte]
  pk.serialize(k)
  result = readUintBE[256](keccak256.digest(k).data)

proc newNode*(pk: PublicKey, address: Address): Node =
  result.new()
  result.pubkey = pk
  result.address = address
  result.id = pk.toNodeId()

proc newNode*(uriString: string): Node =
  ## Parse an enode URI ("enode://<pubkey-hex>@<ip>:<port>"); the single
  ## port is used for both UDP and TCP.
  let u = parseUri(uriString)
  let k = initPublicKey(u.username)
  let port = parseInt(u.port).Port
  newNode(k, Address(ip: parseIpAddress(u.hostname), udpPort: port, tcpPort: port))

proc distanceTo(n: Node, id: NodeId): UInt256 = n.id xor id

proc `$`*(n: Node): string =
  "Node[" & $n.address.ip & ":" & $n.address.udpPort & "]"

# Identity is defined by the public key only, not the network address.
proc hash*(n: Node): hashes.Hash = hash(n.pubkey.raw_key)
proc `==`*(a, b: Node): bool = a.pubkey == b.pubkey
proc newKBucket(istart, iend: NodeId): KBucket =
  ## A bucket covering the inclusive id range [istart, iend].
  result.new()
  result.istart = istart
  result.iend = iend
  result.nodes = @[]
  result.replacementCache = @[]

proc midpoint(k: KBucket): NodeId =
  k.istart + (k.iend - k.istart) div 2.u256

proc distanceTo(k: KBucket, id: NodeId): UInt256 = k.midpoint xor id
proc nodesByDistanceTo(k: KBucket, id: NodeId): seq[Node] =
  sortedByIt(k.nodes, it.distanceTo(id))

proc len(k: KBucket): int {.inline.} = k.nodes.len
proc head(k: KBucket): Node {.inline.} = k.nodes[0]  # least recently seen
proc add(k: KBucket, n: Node): Node =
  ## Try to add the given node to this bucket.
  ## If the node is already present, it is moved to the tail of the list, and nil is
  ## returned.
  ## If the node is not already present and the bucket has fewer than k entries, it is
  ## inserted at the tail of the list, and nil is returned.
  ## If the bucket is full, we add the node to the bucket's replacement cache and return the
  ## node at the head of the list (i.e. the least recently seen), which should be evicted if it
  ## fails to respond to a ping.
  k.lastUpdated = epochTime()
  let nodeIdx = k.nodes.find(n)
  if nodeIdx != -1:
    k.nodes.delete(nodeIdx)
    k.nodes.add(n)
  elif k.len < BUCKET_SIZE:
    k.nodes.add(n)
  else:
    k.replacementCache.add(n)
    return k.head
  return nil
proc removeNode(k: KBucket, n: Node) =
  let i = k.nodes.find(n)
  if i != -1: k.nodes.delete(i)

proc split(k: KBucket): tuple[lower, upper: KBucket] =
  ## Split at the median id; nodes and replacement-cache entries are
  ## redistributed by which half their id falls into.
  let splitid = k.midpoint
  result.lower = newKBucket(k.istart, splitid)
  result.upper = newKBucket(splitid + 1.u256, k.iend)
  for node in k.nodes:
    let bucket = if node.id <= splitid: result.lower else: result.upper
    discard bucket.add(node)
  for node in k.replacementCache:
    let bucket = if node.id <= splitid: result.lower else: result.upper
    bucket.replacementCache.add(node)

proc inRange(k: KBucket, n: Node): bool {.inline.} =
  k.istart <= n.id and n.id <= k.iend

proc isFull(k: KBucket): bool = k.len == BUCKET_SIZE
proc contains(k: KBucket, n: Node): bool = n in k.nodes
proc binaryGetBucketForNode(buckets: openarray[KBucket], node: Node): KBucket {.inline.} =
  ## Given a list of ordered buckets, returns the bucket for a given node.
  ## Raises ValueError if no bucket's range contains the node's id.
  let bucketPos = lowerBound(buckets, node.id) do(a: KBucket, b: NodeId) -> int:
    cmp(a.iend, b)
  # Prevents edge cases where bisect_left returns an out of range index
  if bucketPos < buckets.len:
    let bucket = buckets[bucketPos]
    if bucket.istart <= node.id and node.id <= bucket.iend:
      result = bucket

  if result.isNil:
    raise newException(ValueError, "No bucket found for node with id " & $node.id)
proc computeSharedPrefixBits(nodes: openarray[Node]): int =
  ## Count the number of prefix bits shared by all nodes.
  ## Returns ID_SIZE for fewer than two nodes (every bit trivially shared).
  if nodes.len < 2:
    return ID_SIZE

  var mask, one: UInt256
  mask.setZero()
  one.setOne()

  # Grow the prefix mask one bit at a time (MSB first); the first width at
  # which some node disagrees with nodes[0] bounds the shared prefix.
  for i in 1 .. ID_SIZE:
    mask |= one shl uint64(ID_SIZE - i)
    let reference = nodes[0].id and mask
    for j in 1 .. nodes.high:
      if (nodes[j].id and mask) != reference: return i - 1

  # Unreachable for distinct ids. Was `assert`, which is stripped when
  # assertions are off and would silently return 0 (a bogus depth) —
  # doAssert fires in all build modes.
  doAssert(false, "Unable to calculate number of shared prefix bits")
proc init(r: var RoutingTable, thisNode: Node) {.inline.} =
  ## Start with a single bucket spanning the whole id space.
  r.thisNode = thisNode
  var maxId: NodeId
  maxId.setMax()
  r.buckets = @[newKBucket(0.u256, maxId)]

proc splitBucket(r: var RoutingTable, index: int) =
  let bucket = r.buckets[index]
  let (a, b) = bucket.split()
  r.buckets[index] = a
  r.buckets.insert(b, index + 1)

proc bucketForNode(r: RoutingTable, n: Node): KBucket =
  binaryGetBucketForNode(r.buckets, n)

proc removeNode(r: var RoutingTable, n: Node) =
  r.bucketForNode(n).removeNode(n)

proc addNode(r: var RoutingTable, n: Node): Node =
  ## Add `n`; returns nil on success, or the bucket's least recently seen
  ## node as an eviction candidate when the bucket is full and unsplittable.
  assert(n != r.thisNode)
  let bucket = r.bucketForNode(n)
  let evictionCandidate = bucket.add(n)
  if not evictionCandidate.isNil:
    # Split if the bucket has the local node in its range or if the depth is not congruent
    # to 0 mod BITS_PER_HOP
    let depth = computeSharedPrefixBits(bucket.nodes)
    if bucket.inRange(r.thisNode) or (depth mod BITS_PER_HOP != 0 and depth != ID_SIZE):
      r.splitBucket(r.buckets.find(bucket))
      return r.addNode(n) # retry
    # Nothing added, ping evictionCandidate
    return evictionCandidate

proc contains(r: RoutingTable, n: Node): bool = n in r.bucketForNode(n)

proc bucketsByDistanceTo(r: RoutingTable, id: NodeId): seq[KBucket] =
  sortedByIt(r.buckets, it.distanceTo(id))

proc notFullBuckets(r: RoutingTable): seq[KBucket] =
  r.buckets.filterIt(not it.isFull)
proc neighbours(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE): seq[Node] =
  ## Return up to k neighbours of the given node.
  ## Collects up to 2k candidates from the closest buckets, then keeps the
  ## k closest by XOR distance.
  result = newSeqOfCap[Node](k * 2)
  for bucket in r.bucketsByDistanceTo(id):
    for n in bucket.nodesByDistanceTo(id):
      if n.id != id:
        result.add(n)
        if result.len == k * 2:
          break
  result = sortedByIt(result, it.distanceTo(id))
  if result.len > k:
    result.setLen(k)
proc newKademliaProtocol*[Wire](thisNode: Node, wire: Wire): KademliaProtocol[Wire] =
  ## Create a kademlia instance for `thisNode`, sending packets via `wire`.
  result.new()
  result.thisNode = thisNode
  result.wire = wire
  result.pongFutures = initTable[seq[byte], Future[bool]]()
  result.pingFutures = initTable[Node, Future[bool]]()
  result.neighboursCallbacks = initTable[Node, proc(n: seq[Node])]()
  result.routing.init(thisNode)
proc bond(k: KademliaProtocol, n: Node): Future[bool] {.async.}
proc updateRoutingTable(k: KademliaProtocol, n: Node) =
  ## Update the routing table entry for the given node.
  ##
  ## When the node's bucket is full, addNode hands back the least recently
  ## seen node of that bucket; schedule a background bond() with it. If
  ## that bond fails the node is evicted and a replacement is promoted
  ## from the bucket's replacement cache.
  let lruNode = k.routing.addNode(n)
  if not lruNode.isNil:
    asyncCheck k.bond(lruNode)
proc doSleep(p: proc()) {.async.} =
  ## Invoke `p` once REQUEST_TIMEOUT seconds have elapsed.
  let timeoutMs = REQUEST_TIMEOUT * 1000
  await sleepAsync(timeoutMs)
  p()
template onTimeout(b: untyped) =
  # Run `b` after REQUEST_TIMEOUT seconds; the detached future is guarded
  # with asyncCheck so a failure inside `b` surfaces instead of vanishing.
  asyncCheck doSleep() do():
    b
proc waitPong(k: KademliaProtocol, node: Node, token: seq[byte]): Future[bool] =
  ## Wait for a pong from `node` matching `token`.
  ## Completes with false if no pong arrives before REQUEST_TIMEOUT.
  let pingId = token & @(node.pubkey.raw_key)
  assert(pingId notin k.pongFutures, "Already waiting for pong from " & $node)
  let pongFut = newFuture[bool]("waitPong")
  result = pongFut
  k.pongFutures[pingId] = pongFut
  onTimeout:
    if not pongFut.finished:
      # Timed out: forget the waiter and report failure.
      k.pongFutures.del(pingId)
      pongFut.complete(false)
proc ping(k: KademliaProtocol, n: Node): seq[byte] =
  ## Send a ping to `n`; returns the token that identifies the expected pong.
  assert(n != k.thisNode)
  result = k.wire.sendPing(n)
proc waitPing(k: KademliaProtocol, n: Node): Future[bool] =
  ## Wait for `n` to ping us.
  ## Completes with false if no ping arrives before REQUEST_TIMEOUT.
  assert(n notin k.pingFutures)
  let pingFut = newFuture[bool]("waitPing")
  result = pingFut
  k.pingFutures[n] = pingFut
  onTimeout:
    if not pingFut.finished:
      # Timed out: forget the waiter and report failure.
      k.pingFutures.del(n)
      pingFut.complete(false)
proc waitNeighbours(k: KademliaProtocol, remote: Node): Future[seq[Node]] =
  ## Gather neighbours packets from `remote` until BUCKET_SIZE nodes have
  ## been received or REQUEST_TIMEOUT elapses, then complete the future
  ## with whatever was collected.
  assert(remote notin k.neighboursCallbacks)
  let neighboursFut = newFuture[seq[Node]]("waitNeighbours")
  result = neighboursFut
  var gathered = newSeqOfCap[Node](BUCKET_SIZE)
  k.neighboursCallbacks[remote] = proc(n: seq[Node]) =
    # Nodes usually split their neighbours reply over several packets, so
    # this callback fires multiple times; only complete the future once
    # enough neighbours have accumulated.
    for node in n:
      if node != k.thisNode:
        gathered.add(node)
        if gathered.len == BUCKET_SIZE:
          k.neighboursCallbacks.del(remote)
          assert(not neighboursFut.finished)
          neighboursFut.complete(gathered)
  onTimeout:
    if not neighboursFut.finished:
      # Timed out: deliver the partial result collected so far.
      k.neighboursCallbacks.del(remote)
      neighboursFut.complete(gathered)
proc populateNotFullBuckets(k: KademliaProtocol) =
  ## Go through all buckets that are not full and try to fill them.
  ##
  ## A bond is attempted with every node in the replacement cache of each
  ## non-full bucket; a successful bond adds the node to its bucket.
  for b in k.routing.notFullBuckets:
    for cached in b.replacementCache:
      asyncCheck k.bond(cached)
proc bond(k: KademliaProtocol, n: Node): Future[bool] {.async.} =
  ## Bond with the given node: ping it, wait for its pong, and give it a
  ## chance to ping us back. This must happen at least once before we send
  ## findNode requests to a node. Returns true when the node answered.
  if n in k.routing:
    # Already bonded earlier.
    return true
  let pingToken = k.ping(n)
  let pongReceived = await k.waitPong(n, pingToken)
  if not pongReceived:
    debug "bonding failed, didn't receive pong from ", n
    # Drop the unresponsive node and schedule populateNotFullBuckets() to
    # try and fill the spot it leaves behind.
    k.routing.removeNode(n)
    k.populateNotFullBuckets()
    return false
  # Let the remote ping us before findNode requests start flowing. A
  # waitPing() timeout is fine here: it just means the remote already
  # remembers us.
  discard await k.waitPing(n)
  debug "bonding completed successfully with ", n
  k.updateRoutingTable(n)
  return true
proc sortByDistance(nodes: var seq[Node], nodeId: NodeId, maxResults = 0) =
  ## Sort `nodes` in place by their distance to `nodeId`; when
  ## `maxResults` is non-zero, keep only that many closest entries.
  nodes.sort(proc(a, b: Node): int = cmp(a.distanceTo(nodeId), b.distanceTo(nodeId)))
  if maxResults != 0 and nodes.len > maxResults:
    nodes.setLen(maxResults)
proc lookup*(k: KademliaProtocol, nodeId: NodeId): Future[seq[Node]] {.async.} =
  ## Lookup performs a network search for nodes close to the given target.
  ## It approaches the target by querying nodes that are closer to it on each iteration. The
  ## given target does not need to be an actual node identifier.
  ## Returns the closest known nodes, sorted by distance to `nodeId`.
  var nodesAsked = initSet[Node]()
  var nodesSeen = initSet[Node]()
  proc findNode(nodeId: NodeId, remote: Node): Future[seq[Node]] {.async.} =
    # Ask `remote` for nodes close to `nodeId`, bond with the previously
    # unseen candidates it returns, and keep only those that bonded.
    k.wire.sendFindNode(remote, nodeId)
    var candidates = await k.waitNeighbours(remote)
    if candidates.len == 0:
      debug "got no candidates from ", remote, ", returning"
      result = candidates
    else:
      # The following line:
      # 1. Adds new candidates to nodesSeen so that we don't attempt to bond
      #    with failing ones in the future
      # 2. Removes all previously seen nodes from candidates
      # 3. Deduplicates candidates
      candidates.keepItIf(not nodesSeen.containsOrIncl(it))
      debug "got ", candidates.len, " new candidates"
      let bonded = await all(candidates.mapIt(k.bond(it)))
      # Nil out the candidates that failed to bond, then filter them away.
      for i in 0 ..< bonded.len:
        if not bonded[i]: candidates[i] = nil
      candidates.keepItIf(not it.isNil)
      debug "bonded with ", candidates.len, " candidates"
      result = candidates
  proc excludeIfAsked(nodes: seq[Node]): seq[Node] =
    # Keep only nodes not yet queried, the FIND_CONCURRENCY closest first.
    result = toSeq(items(nodes.toSet() - nodesAsked))
    sortByDistance(result, nodeId, FIND_CONCURRENCY)
  var closest = k.routing.neighbours(nodeId)
  debug "starting lookup; initial neighbours: ", closest
  var nodesToAsk = excludeIfAsked(closest)
  while nodesToAsk.len != 0:
    debug "node lookup; querying ", nodesToAsk
    nodesAsked.incl(nodesToAsk.toSet())
    # Query the current batch concurrently, fold every reply into `closest`
    # and keep only the BUCKET_SIZE closest candidates for the next round.
    let results = await all(nodesToAsk.mapIt(findNode(nodeId, it)))
    for candidates in results:
      closest.add(candidates)
    sortByDistance(closest, nodeId, BUCKET_SIZE)
    nodesToAsk = excludeIfAsked(closest)
  info "lookup finished for ", nodeId.toHex(), ": ", closest
  result = closest
proc lookupRandom*(k: KademliaProtocol): Future[seq[Node]] =
  ## Run a lookup towards a freshly generated random node ID.
  var target: NodeId
  # Fill the raw limbs of the 256-bit ID with random bytes.
  discard randomBytes(addr target.table[0], sizeof(target.table))
  result = k.lookup(target)
proc resolve*(k: KademliaProtocol, id: NodeId): Future[Node] {.async.} =
  ## Search the network for the node with exactly `id`; returns nil when
  ## the lookup does not encounter it.
  let found = await k.lookup(id)
  for candidate in found:
    if candidate.id == id:
      return candidate
proc bootstrap*(k: KademliaProtocol, bootstrapNodes: seq[Node]) {.async.} =
  ## Bond with the given bootstrap nodes and, provided at least one bond
  ## succeeded, seed the routing table with a random lookup.
  let outcomes = await all(bootstrapNodes.mapIt(k.bond(it)))
  if not outcomes.anyIt(it):
    info "Failed to bond with bootstrap nodes "
    return
  discard await k.lookupRandom()
proc recvPong*(k: KademliaProtocol, node: Node, token: seq[byte]) =
  ## Handle an incoming pong: complete the matching waitPong future, if any.
  debug "<<< pong from ", node
  let pingId = token & @(node.pubkey.raw_key)
  var waiter: Future[bool]
  if k.pongFutures.take(pingId, waiter):
    waiter.complete(true)
proc recvPing*(k: KademliaProtocol, node: Node, msgHash: any) =
  ## Handle an incoming ping: refresh the routing table, answer with a
  ## pong, and complete any pending waitPing future for this node.
  debug "<<< ping from ", node
  k.updateRoutingTable(node)
  k.wire.sendPong(node, msgHash)
  var waiter: Future[bool]
  if k.pingFutures.take(node, waiter):
    waiter.complete(true)
proc recvNeighbours*(k: KademliaProtocol, remote: Node, neighbours: seq[Node]) =
  ## Process a neighbours response.
  ##
  ## These packets are only expected as replies to a find_node sent during
  ## node lookup, so the actual accounting is done by the per-remote
  ## callback that waitNeighbours() registers (and removes when done or
  ## timed out).
  debug "<<< neighbours from ", remote, ": ", neighbours
  let callback = k.neighboursCallbacks.getOrDefault(remote)
  if callback.isNil:
    debug "unexpected neighbours from ", remote, ", probably came too late"
  else:
    callback(neighbours)
proc recvFindNode*(k: KademliaProtocol, remote: Node, nodeId: NodeId) =
  ## Answer a find_node request with our closest known neighbours of
  ## `nodeId`, sorted by node ID.
  if remote notin k.routing:
    # FIXME: This is not correct; a node we've bonded before may have become unavailable
    # and thus removed from self.routing, but once it's back online we should accept
    # find_nodes from them.
    debug "Ignoring find_node request from unknown node ", remote
    return
  k.updateRoutingTable(remote)
  var closest = k.routing.neighbours(nodeId)
  closest.sort(proc(x, y: Node): int = cmp(x.id, y.id))
  k.wire.sendNeighbours(remote, closest)
when isMainModule:
  # Smoke test for computeSharedPrefixBits using hand-picked node IDs.
  proc randomNode(): Node =
    # Fixed enode fixture (not actually random); only the id field is
    # overwritten by the assertions below.
    newNode("enode://aa36fdf33dd030378a0168efe6ed7d5cc587fafa3cdd375854fe735a2e11ea3650ba29644e2db48368c46e1f60e716300ba49396cd63778bf8a818c09bded46f@13.93.211.84:30303")

  var testNodes = @[randomNode()]
  # A single node shares its entire prefix with itself.
  doAssert(computeSharedPrefixBits(testNodes) == ID_SIZE)
  testNodes.add(randomNode())
  # IDs 1 and 0 differ only in the lowest bit.
  testNodes[0].id = 0b1.u256
  testNodes[1].id = 0b0.u256
  doAssert(computeSharedPrefixBits(testNodes) == ID_SIZE - 1)
  # IDs 0b010 and 0b110 first differ at the third-lowest bit.
  testNodes[0].id = 0b010.u256
  testNodes[1].id = 0b110.u256
  doAssert(computeSharedPrefixBits(testNodes) == ID_SIZE - 3)

3
tests/tdiscovery.nim Normal file
View File

@ -0,0 +1,3 @@
import ../ethp2p/discovery
# TODO: