Moved eth_p2p to asyncdispatch2.

Fix some warnings at rlpx.nim.
Commented debug echo in rlpx.nim.
This commit is contained in:
cheatfate 2018-06-17 13:21:06 +03:00 committed by zah
parent 812c6284c0
commit 24d762da34
16 changed files with 234 additions and 232 deletions

View File

@ -11,9 +11,8 @@
## This module implements Ethereum authentication ## This module implements Ethereum authentication
import endians import endians
import eth_keys, ecies, rlp import eth_keys, nimcrypto, rlp
import nimcrypto/sysrand, nimcrypto/hash, nimcrypto/utils, nimcrypto/hmac import ecies
import nimcrypto/rijndael, nimcrypto/keccak, nimcrypto/sha2
const const
SupportedRlpxVersion* = 4 SupportedRlpxVersion* = 4

View File

@ -9,9 +9,9 @@
# #
from strutils import nil from strutils import nil
import asyncnet, asyncdispatch, net, times, nativesockets, algorithm, logging import times, algorithm, logging
import asyncdispatch2, eth_keys, ranges, stint, nimcrypto, rlp
import kademlia, enode import kademlia, enode
import eth_keys, rlp, ranges, stint, nimcrypto
export Node export Node
@ -33,7 +33,6 @@ const
"enode://6456719e7267e061161c88720287a77b80718d2a3a4ff5daeba614d029dc77601b75e32190aed1c9b0b9ccb6fac3bcf000f48e54079fa79e339c25d8e9724226@127.0.0.1:30301" "enode://6456719e7267e061161c88720287a77b80718d2a3a4ff5daeba614d029dc77601b75e32190aed1c9b0b9ccb6fac3bcf000f48e54079fa79e339c25d8e9724226@127.0.0.1:30301"
] ]
# UDP packet constants. # UDP packet constants.
MAC_SIZE = 256 div 8 # 32 MAC_SIZE = 256 div 8 # 32
SIG_SIZE = 520 div 8 # 65 SIG_SIZE = 520 div 8 # 65
@ -48,7 +47,7 @@ type
bootstrapNodes*: seq[Node] bootstrapNodes*: seq[Node]
thisNode*: Node thisNode*: Node
kademlia: KademliaProtocol[DiscoveryProtocol] kademlia: KademliaProtocol[DiscoveryProtocol]
socket: AsyncSocket transp: DatagramTransport
CommandId = enum CommandId = enum
cmdPing = 1 cmdPing = 1
@ -98,20 +97,9 @@ proc expiration(): uint32 =
# Wire protocol # Wire protocol
proc sendTo*(socket: AsyncFD, data: seq[byte], ip: IpAddress, port: Port,
flags = {SocketFlag.SafeDisconn}) {.async.} =
var sa: Sockaddr_storage
var ln: Socklen
ip.toSockaddr(port, sa, ln)
try:
await sendTo(socket, unsafeAddr data[0], data.len, cast[ptr Sockaddr](addr sa), ln)
except:
error "sendTo failed: ", getCurrentExceptionMsg()
proc send(d: DiscoveryProtocol, n: Node, data: seq[byte]) = proc send(d: DiscoveryProtocol, n: Node, data: seq[byte]) =
asyncCheck d.socket.getFd().AsyncFD.sendTo(data, let ta = initTAddress(n.node.address.ip, n.node.address.udpPort)
n.node.address.ip, asyncCheck d.transp.sendTo(ta, data)
n.node.address.udpPort)
proc sendPing*(d: DiscoveryProtocol, n: Node): seq[byte] = proc sendPing*(d: DiscoveryProtocol, n: Node): seq[byte] =
let payload = rlp.encode((PROTO_VERSION, d.address, n.node.address, let payload = rlp.encode((PROTO_VERSION, d.address, n.node.address,
@ -157,7 +145,9 @@ proc sendNeighbours*(d: DiscoveryProtocol, node: Node, neighbours: seq[Node]) =
if nodes.len != 0: flush() if nodes.len != 0: flush()
proc newDiscoveryProtocol*(privKey: PrivateKey, address: Address, bootstrapNodes: openarray[ENode]): DiscoveryProtocol = proc newDiscoveryProtocol*(privKey: PrivateKey, address: Address,
bootstrapNodes: openarray[ENode]
): DiscoveryProtocol =
result.new() result.new()
result.privKey = privKey result.privKey = privKey
result.address = address result.address = address
@ -166,7 +156,8 @@ proc newDiscoveryProtocol*(privKey: PrivateKey, address: Address, bootstrapNodes
result.thisNode = newNode(privKey.getPublicKey(), address) result.thisNode = newNode(privKey.getPublicKey(), address)
result.kademlia = newKademliaProtocol(result.thisNode, result) {.explain.} result.kademlia = newKademliaProtocol(result.thisNode, result) {.explain.}
proc recvPing(d: DiscoveryProtocol, node: Node, msgHash: MDigest[256]) {.inline.} = proc recvPing(d: DiscoveryProtocol, node: Node,
msgHash: MDigest[256]) {.inline.} =
d.kademlia.recvPing(node, msgHash) d.kademlia.recvPing(node, msgHash)
proc recvPong(d: DiscoveryProtocol, node: Node, payload: Bytes) {.inline.} = proc recvPong(d: DiscoveryProtocol, node: Node, payload: Bytes) {.inline.} =
@ -174,7 +165,8 @@ proc recvPong(d: DiscoveryProtocol, node: Node, payload: Bytes) {.inline.} =
let tok = rlp.listElem(1).toBytes().toSeq() let tok = rlp.listElem(1).toBytes().toSeq()
d.kademlia.recvPong(node, tok) d.kademlia.recvPong(node, tok)
proc recvNeighbours(d: DiscoveryProtocol, node: Node, payload: Bytes) {.inline.} = proc recvNeighbours(d: DiscoveryProtocol, node: Node,
payload: Bytes) {.inline.} =
let rlp = rlpFromBytes(payload.toRange) let rlp = rlpFromBytes(payload.toRange)
let neighboursList = rlp.listElem(0) let neighboursList = rlp.listElem(0)
let sz = neighboursList.listLen() let sz = neighboursList.listLen()
@ -245,28 +237,23 @@ proc receive(d: DiscoveryProtocol, a: Address, msg: Bytes) =
else: else:
error "Wrong msg mac from ", a error "Wrong msg mac from ", a
proc runListeningLoop(d: DiscoveryProtocol) {.async.} = proc processClient(transp: DatagramTransport,
var buf = newSeq[byte](MaxDgramSize) raddr: TransportAddress): Future[void] {.async, gcsafe.} =
var saddr: Sockaddr_storage var proto = getUserData[DiscoveryProtocol](transp)
var slen: Socklen var buf: seq[byte]
while not d.socket.isNil: try:
buf.setLen(MaxDgramSize) # TODO: Maybe here better to use `peekMessage()` to avoid allocation,
slen = sizeof(saddr).Socklen # but `Bytes` object is just a simple seq[byte], and `ByteRange` object
let received = await recvFromInto(d.socket.getFd().AsyncFD, addr buf[0], buf.len, cast[ptr SockAddr](addr saddr), addr slen) # do not support custom length.
buf.setLen(received) var buf = transp.getMessage()
try: let a = Address(ip: raddr.address, udpPort: raddr.port, tcpPort: raddr.port)
var port: Port proto.receive(a, buf)
var ip: IpAddress except:
fromSockAddr(saddr, slen, ip, port) error "receive failed: ", getCurrentExceptionMsg()
d.receive(Address(ip: ip, udpPort: port, tcpPort: port), buf)
except:
error "receive failed: ", getCurrentExceptionMsg()
proc open*(d: DiscoveryProtocol) = proc open*(d: DiscoveryProtocol) =
d.socket = newAsyncSocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) let ta = initTAddress(d.address.ip, d.address.udpPort)
d.socket.bindAddr(port = d.address.udpPort) d.transp = newDatagramTransport(processClient, udata = d, local = ta)
asyncCheck d.runListeningLoop()
proc bootstrap*(d: DiscoveryProtocol) {.async.} = proc bootstrap*(d: DiscoveryProtocol) {.async.} =
await d.kademlia.bootstrap(d.bootstrapNodes) await d.kademlia.bootstrap(d.bootstrapNodes)

View File

@ -10,10 +10,7 @@
## This module implements ECIES method encryption/decryption. ## This module implements ECIES method encryption/decryption.
import eth_keys import eth_keys, nimcrypto
import nimcrypto/sha2, nimcrypto/hash, nimcrypto/hmac
import nimcrypto/rijndael, nimcrypto/utils, nimcrypto/sysrand
import nimcrypto/bcmode, nimcrypto/utils
const const
emptyMac* = array[0, byte]([]) emptyMac* = array[0, byte]([])

View File

@ -8,7 +8,8 @@
# MIT license (LICENSE-MIT) # MIT license (LICENSE-MIT)
# #
import uri, eth_keys, strutils, net import uri, strutils, net
import eth_keys
type type
ENodeStatus* = enum ENodeStatus* = enum

View File

@ -8,8 +8,7 @@
# MIT license (LICENSE-MIT) # MIT license (LICENSE-MIT)
# #
import import rlp/types, nimcrypto/hash, stint
rlp/types, nimcrypto/hash, stint
export export
MDigest MDigest

View File

@ -8,14 +8,12 @@
# MIT license (LICENSE-MIT) # MIT license (LICENSE-MIT)
# #
import asyncdispatch, net, uri, logging, tables, hashes, times, algorithm, sets, import uri, logging, tables, hashes, times, algorithm, sets, sequtils, random
sequtils, random
from strutils import parseInt from strutils import parseInt
import asyncdispatch2, eth_keys, stint, nimcrypto, enode
export sets # TODO: This should not be needed, but compilation fails otherwise export sets # TODO: This should not be needed, but compilation fails otherwise
import eth_keys, stint, nimcrypto, enode
type type
KademliaProtocol* [Wire] = ref object KademliaProtocol* [Wire] = ref object
wire: Wire wire: Wire
@ -23,7 +21,7 @@ type
routing: RoutingTable routing: RoutingTable
pongFutures: Table[seq[byte], Future[bool]] pongFutures: Table[seq[byte], Future[bool]]
pingFutures: Table[Node, Future[bool]] pingFutures: Table[Node, Future[bool]]
neighboursCallbacks: Table[Node, proc(n: seq[Node])] neighboursCallbacks: Table[Node, proc(n: seq[Node]) {.gcsafe.}]
NodeId* = UInt256 NodeId* = UInt256
@ -44,8 +42,8 @@ type
const const
BUCKET_SIZE = 16 BUCKET_SIZE = 16
BITS_PER_HOP = 8 BITS_PER_HOP = 8
REQUEST_TIMEOUT = 0.9 # timeout of message round trips REQUEST_TIMEOUT = 900 # timeout of message round trips
FIND_CONCURRENCY = 3 # parallel find node lookups FIND_CONCURRENCY = 3 # parallel find node lookups
ID_SIZE = 256 ID_SIZE = 256
proc toNodeId(pk: PublicKey): NodeId = proc toNodeId(pk: PublicKey): NodeId =
@ -246,7 +244,7 @@ proc updateRoutingTable(k: KademliaProtocol, n: Node) =
asyncCheck k.bond(evictionCandidate) asyncCheck k.bond(evictionCandidate)
proc doSleep(p: proc()) {.async.} = proc doSleep(p: proc()) {.async.} =
await sleepAsync(REQUEST_TIMEOUT * 1000) await sleepAsync(REQUEST_TIMEOUT)
p() p()
template onTimeout(b: untyped) = template onTimeout(b: untyped) =
@ -266,7 +264,7 @@ proc waitPong(k: KademliaProtocol, n: Node, token: seq[byte]): Future[bool] =
proc ping(k: KademliaProtocol, n: Node): seq[byte] = proc ping(k: KademliaProtocol, n: Node): seq[byte] =
assert(n != k.thisNode) assert(n != k.thisNode)
k.wire.sendPing(n) result = k.wire.sendPing(n)
proc waitPing(k: KademliaProtocol, n: Node): Future[bool] = proc waitPing(k: KademliaProtocol, n: Node): Future[bool] =
result = newFuture[bool]("waitPing") result = newFuture[bool]("waitPing")
@ -409,7 +407,6 @@ proc bootstrap*(k: KademliaProtocol, bootstrapNodes: seq[Node]) {.async.} =
proc recvPong*(k: KademliaProtocol, n: Node, token: seq[byte]) = proc recvPong*(k: KademliaProtocol, n: Node, token: seq[byte]) =
debug "<<< pong from ", n debug "<<< pong from ", n
let pingid = token & @(n.node.pubkey.data) let pingid = token & @(n.node.pubkey.data)
var future: Future[bool] var future: Future[bool]
if k.pongFutures.take(pingid, future): if k.pongFutures.take(pingid, future):

View File

@ -8,8 +8,8 @@
# MIT license (LICENSE-MIT) # MIT license (LICENSE-MIT)
# #
import logging, tables, asyncdispatch, times, random import logging, tables, times, random
import eth_keys import eth_keys, asyncdispatch2
import discovery, rlpx, kademlia import discovery, rlpx, kademlia
type type
@ -32,9 +32,8 @@ const
lookupInterval = 5 lookupInterval = 5
connectLoopSleepMs = 2000 connectLoopSleepMs = 2000
proc newPeerPool*(chainDb: AsyncChainDb, networkId: int, keyPair: KeyPair, proc newPeerPool*(chainDb: AsyncChainDb, networkId: int, keyPair: KeyPair,
discovery: DiscoveryProtocol, minPeers = 10): PeerPool = discovery: DiscoveryProtocol, minPeers = 10): PeerPool =
result.new() result.new()
result.keyPair = keyPair result.keyPair = keyPair
result.minPeers = minPeers result.minPeers = minPeers
@ -74,7 +73,7 @@ proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} =
debug "Skipping ", remote, "; already connected to it" debug "Skipping ", remote, "; already connected to it"
return nil return nil
result = await rlpxConnect(p.keyPair, p.listenPort, remote) result = await remote.rlpxConnect(p.keyPair, p.listenPort)
# expected_exceptions = ( # expected_exceptions = (
# UnreachablePeer, TimeoutError, PeerConnectionLost, HandshakeFailure) # UnreachablePeer, TimeoutError, PeerConnectionLost, HandshakeFailure)
@ -95,15 +94,16 @@ proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} =
# return None # return None
proc lookupRandomNode(p: PeerPool) {.async.} = proc lookupRandomNode(p: PeerPool) {.async.} =
# This method runs in the background, so we must catch OperationCancelled here otherwise # This method runs in the background, so we must catch OperationCancelled
# asyncio will warn that its exception was never retrieved. # ere otherwise asyncio will warn that its exception was never retrieved.
try: try:
discard await p.discovery.lookupRandom() discard await p.discovery.lookupRandom()
except: # OperationCancelled except: # OperationCancelled
discard discard
p.lastLookupTime = epochTime() p.lastLookupTime = epochTime()
proc getRandomBootnode(p: PeerPool): seq[Node] = @[p.discovery.bootstrapNodes.rand()] proc getRandomBootnode(p: PeerPool): seq[Node] =
@[p.discovery.bootstrapNodes.rand()]
proc peerFinished(p: PeerPool, peer: Peer) = proc peerFinished(p: PeerPool, peer: Peer) =
## Remove the given peer from our list of connected nodes. ## Remove the given peer from our list of connected nodes.
@ -117,8 +117,8 @@ proc run(p: Peer, completionHandler: proc() = nil) {.async.} =
proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} = proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} =
for node in nodes: for node in nodes:
# TODO: Consider changing connect() to raise an exception instead of returning None, # TODO: Consider changing connect() to raise an exception instead of
# as discussed in # returning None, as discussed in
# https://github.com/ethereum/py-evm/pull/139#discussion_r152067425 # https://github.com/ethereum/py-evm/pull/139#discussion_r152067425
let peer = await p.connect(node) let peer = await p.connect(node)
if not peer.isNil: if not peer.isNil:
@ -143,8 +143,9 @@ proc maybeConnectToMorePeers(p: PeerPool) {.async.} =
await p.connectToNodes(p.nodesToConnect()) await p.connectToNodes(p.nodesToConnect())
# In some cases (e.g ROPSTEN or private testnets), the discovery table might be full of # In some cases (e.g ROPSTEN or private testnets), the discovery table might
# bad peers so if we can't connect to any peers we try a random bootstrap node as well. # be full of bad peers, so if we can't connect to any peers we try a random
# bootstrap node as well.
if p.connectedNodes.len == 0: if p.connectedNodes.len == 0:
await p.connectToNodes(p.getRandomBootnode()) await p.connectToNodes(p.getRandomBootnode())
@ -156,7 +157,8 @@ proc run(p: PeerPool) {.async.} =
try: try:
await p.maybeConnectToMorePeers() await p.maybeConnectToMorePeers()
except: except:
# Most unexpected errors should be transient, so we log and restart from scratch. # Most unexpected errors should be transient, so we log and restart from
# scratch.
error "Unexpected error, restarting" error "Unexpected error, restarting"
dropConnections = true dropConnections = true
@ -173,7 +175,8 @@ proc start*(p: PeerPool) =
# @property # @property
# def peers(self) -> List[BasePeer]: # def peers(self) -> List[BasePeer]:
# peers = list(self.connected_nodes.values()) # peers = list(self.connected_nodes.values())
# # Shuffle the list of peers so that dumb callsites are less likely to send all requests to # # Shuffle the list of peers so that dumb callsites are less likely to send
# # all requests to
# # a single peer even if they always pick the first one from the list. # # a single peer even if they always pick the first one from the list.
# random.shuffle(peers) # random.shuffle(peers)
# return peers # return peers

View File

@ -8,10 +8,10 @@
# MIT license (LICENSE-MIT) # MIT license (LICENSE-MIT)
# #
import import macros, sets, algorithm, logging, hashes
macros, sets, algorithm, async, asyncnet, asyncfutures, net, logging, import rlp, ranges/[stackarrays, ptr_arith], eth_keys, ethereum_types,
hashes, rlp, ranges/[stackarrays, ptr_arith], eth_keys, nimcrypto, asyncdispatch2
ethereum_types, kademlia, discovery, auth, rlpxcrypt, nimcrypto, enode import kademlia, discovery, auth, rlpxcrypt, enode
type type
ConnectionState = enum ConnectionState = enum
@ -21,7 +21,7 @@ type
Disconnected Disconnected
Peer* = ref object Peer* = ref object
socket: AsyncSocket transp: StreamTransport
dispatcher: Dispatcher dispatcher: Dispatcher
networkId: int networkId: int
secretsState: SecretState secretsState: SecretState
@ -75,6 +75,7 @@ const
baseProtocolVersion = 4 baseProtocolVersion = 4
clienId = "Nimbus 0.1.0" clienId = "Nimbus 0.1.0"
# TODO: Usage of this variables causes GCSAFE problems.
var var
gProtocols: seq[ProtocolInfo] gProtocols: seq[ProtocolInfo]
gCapabilities: seq[Capability] gCapabilities: seq[Capability]
@ -115,7 +116,7 @@ proc getDispatcher(otherPeerCapabilities: openarray[Capability]): Dispatcher =
var nextUserMsgId = 0x10 var nextUserMsgId = 0x10
for i in 0 .. <gProtocols.len: for i in 0..<gProtocols.len:
let localProtocol = gProtocols[i] let localProtocol = gProtocols[i]
block findMatchingProtocol: block findMatchingProtocol:
@ -139,7 +140,7 @@ proc getDispatcher(otherPeerCapabilities: openarray[Capability]): Dispatcher =
result.thunks = newSeq[MessageHandler](nextUserMsgId) result.thunks = newSeq[MessageHandler](nextUserMsgId)
devp2p.messages.copyTo(result.thunks, 0) devp2p.messages.copyTo(result.thunks, 0)
for i in 0 .. <gProtocols.len: for i in 0..<gProtocols.len:
if result.protocolOffsets[i] != -1: if result.protocolOffsets[i] != -1:
gProtocols[i].messages.copyTo(result.thunks, result.protocolOffsets[i]) gProtocols[i].messages.copyTo(result.thunks, result.protocolOffsets[i])
@ -195,11 +196,13 @@ proc read*(rlp: var Rlp, T: typedesc[KeccakHash]): T =
# Message composition and encryption # Message composition and encryption
# #
proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer, rlpOut: var RlpWriter) = proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer,
rlpOut: var RlpWriter) =
let baseMsgId = peer.dispatcher.protocolOffsets[p.index] let baseMsgId = peer.dispatcher.protocolOffsets[p.index]
if baseMsgId == -1: if baseMsgId == -1:
raise newException(UnsupportedProtocol, raise newException(UnsupportedProtocol,
p.nameStr & " is not supported by peer " & $peer.remote.id) p.nameStr & " is not supported by peer " &
$peer.remote.id)
rlpOut.append(baseMsgId + msgId) rlpOut.append(baseMsgId + msgId)
proc dispatchMsg(peer: Peer, msgId: int, msgData: var Rlp) = proc dispatchMsg(peer: Peer, msgId: int, msgData: var Rlp) =
@ -219,26 +222,13 @@ proc send(p: Peer, data: BytesRange) {.async.} =
# echo "sending: ", rlp.read(int) # echo "sending: ", rlp.read(int)
# echo "payload: ", rlp.inspect # echo "payload: ", rlp.inspect
var cipherText = encryptMsg(data, p.secretsState) var cipherText = encryptMsg(data, p.secretsState)
await p.socket.send(addr cipherText[0], cipherText.len) var res = await p.transp.write(cipherText)
proc fullRecvInto(s: AsyncSocket, buffer: pointer, bufferLen: int) {.async.} =
# XXX: This should be a library function
var receivedBytes = 0
while receivedBytes < bufferLen:
let sz = await s.recvInto(buffer.shift(receivedBytes),
bufferLen - receivedBytes)
if sz == 0:
raise newException(IOError, "Socket disconnected")
receivedBytes += sz
template fullRecvInto(s: AsyncSocket, buff: var openarray[byte]): auto =
fullRecvInto(s, addr buff[0], buff.len)
proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} = proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
## This procs awaits the next complete RLPx message in the TCP stream ## This procs awaits the next complete RLPx message in the TCP stream
var headerBytes: array[32, byte] var headerBytes: array[32, byte]
await peer.socket.fullRecvInto(headerBytes) await peer.transp.readExactly(addr headerBytes[0], 32)
var msgSize: int var msgSize: int
if decryptHeaderAndGetMsgSize(peer.secretsState, if decryptHeaderAndGetMsgSize(peer.secretsState,
@ -248,7 +238,7 @@ proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
let remainingBytes = encryptedLength(msgSize) - 32 let remainingBytes = encryptedLength(msgSize) - 32
# XXX: Migrate this to a thread-local seq # XXX: Migrate this to a thread-local seq
var encryptedBytes = newSeq[byte](remainingBytes) var encryptedBytes = newSeq[byte](remainingBytes)
await peer.socket.fullRecvInto(encryptedBytes) await peer.transp.readExactly(addr encryptedBytes[0], len(encryptedBytes))
let decryptedMaxLength = decryptedLength(msgSize) let decryptedMaxLength = decryptedLength(msgSize)
var var
@ -288,7 +278,7 @@ iterator typedParams(n: NimNode, skip = 0): (NimNode, NimNode) =
let paramNodes = n.params[i] let paramNodes = n.params[i]
let paramType = paramNodes[^2] let paramType = paramNodes[^2]
for j in 0 .. < (paramNodes.len-2): for j in 0..<(paramNodes.len - 2):
yield (paramNodes[j], paramType) yield (paramNodes[j], paramType)
proc chooseFieldType(n: NimNode): NimNode = proc chooseFieldType(n: NimNode): NimNode =
@ -300,7 +290,7 @@ proc chooseFieldType(n: NimNode): NimNode =
result = n result = n
if n.kind == nnkBracketExpr and if n.kind == nnkBracketExpr and
n[0].kind == nnkIdent and n[0].kind == nnkIdent and
$n[0].ident == "openarray": n[0].eqIdent("openarray"):
result = n.copyNimTree result = n.copyNimTree
result[0] = newIdentNode("seq") result[0] = newIdentNode("seq")
@ -351,7 +341,7 @@ macro rlpxProtocol*(protoIdentifier: untyped,
for n in body: for n in body:
case n.kind case n.kind
of {nnkCall, nnkCommand}: of {nnkCall, nnkCommand}:
if n.len == 2 and n[0].kind == nnkIdent and $n[0].ident == "nextID": if n.len == 2 and n[0].kind == nnkIdent and n[0].eqIdent("nextID"):
if n[1].kind == nnkIntLit: if n[1].kind == nnkIntLit:
nextId = n[1].intVal nextId = n[1].intVal
else: else:
@ -359,7 +349,7 @@ macro rlpxProtocol*(protoIdentifier: untyped,
else: else:
error(repr(n) & " is not a recognized call in RLPx protocol definitions", n) error(repr(n) & " is not a recognized call in RLPx protocol definitions", n)
of nnkTypeSection: of nnkTypeSection:
if n.len == 1 and n[0][0].kind == nnkIdent and $n[0][0].ident == "State": if n.len == 1 and n[0][0].kind == nnkIdent and n[0][0].eqIdent("State"):
stateType = genSym(nskType, protoName & "State") stateType = genSym(nskType, protoName & "State")
n[0][0] = stateType n[0][0] = stateType
result.add n result.add n
@ -372,7 +362,7 @@ macro rlpxProtocol*(protoIdentifier: untyped,
of nnkProcDef: of nnkProcDef:
let let
msgIdent = n.name.ident msgIdent = n.name.ident
msgName = $msgIdent msgName = n.name.strVal
var var
thunkName = newNilLit() thunkName = newNilLit()
@ -455,7 +445,7 @@ macro rlpxProtocol*(protoIdentifier: untyped,
# XXX TODO: check that the first param has the correct type # XXX TODO: check that the first param has the correct type
n.params[1][0] = peer n.params[1][0] = peer
echo n.params.treeRepr # echo n.params.treeRepr
n.params[0] = newTree(nnkBracketExpr, n.params[0] = newTree(nnkBracketExpr,
newIdentNode("Future"), newIdentNode("void")) newIdentNode("Future"), newIdentNode("void"))
@ -541,90 +531,105 @@ proc connectionEstablished(p: Peer, h: p2p.hello) =
newSeq(p.protocolStates, gProtocols.len) newSeq(p.protocolStates, gProtocols.len)
# XXX: initialize the sub-protocol states # XXX: initialize the sub-protocol states
proc initSecretState(hs: var Handshake, authMsg, ackMsg: openarray[byte], p: Peer) = proc initSecretState(hs: var Handshake, authMsg, ackMsg: openarray[byte],
p: Peer) =
var secrets: ConnectionSecret var secrets: ConnectionSecret
check hs.getSecrets(authMsg, ackMsg, secrets) check hs.getSecrets(authMsg, ackMsg, secrets)
initSecretState(secrets, p.secretsState) initSecretState(secrets, p.secretsState)
burnMem(secrets) burnMem(secrets)
proc rlpxConnect*(myKeys: KeyPair, listenPort: Port, remote: Node): Future[Peer] {.async.} = proc rlpxConnect*(remote: Node, myKeys: KeyPair,
# TODO: Make sure to close the socket in case of exception listenPort: Port): Future[Peer] {.async.} =
new result new result
result.socket = newAsyncSocket()
result.remote = remote result.remote = remote
await result.socket.connect($remote.node.address.ip, remote.node.address.tcpPort) let ta = initTAddress(remote.node.address.ip, remote.node.address.tcpPort)
try:
result.transp = await connect(ta)
var handshake = newHandshake({Initiator}) var handshake = newHandshake({Initiator})
handshake.host = myKeys handshake.host = myKeys
var authMsg: array[AuthMessageMaxEIP8, byte] var authMsg: array[AuthMessageMaxEIP8, byte]
var authMsgLen = 0 var authMsgLen = 0
check authMessage(handshake, remote.node.pubkey, authMsg, authMsgLen) check authMessage(handshake, remote.node.pubkey, authMsg, authMsgLen)
await result.socket.send(addr authMsg[0], authMsgLen) var res = result.transp.write(addr authMsg[0], authMsgLen)
let initialSize = handshake.expectedLength let initialSize = handshake.expectedLength
var ackMsg = newSeqOfCap[byte](1024) var ackMsg = newSeqOfCap[byte](1024)
ackMsg.setLen(initialSize) ackMsg.setLen(initialSize)
await result.socket.fullRecvInto(ackMsg)
var ret = handshake.decodeAckMessage(ackMsg)
if ret == AuthStatus.IncompleteError:
ackMsg.setLen(handshake.expectedLength)
await result.socket.fullRecvInto(addr ackMsg[initialSize],
len(ackMsg) - initialSize)
ret = handshake.decodeAckMessage(ackMsg)
check ret
initSecretState(handshake, ^authMsg, ackMsg, result) await result.transp.readExactly(addr ackMsg[0], len(ackMsg))
if handshake.remoteHPubkey != remote.node.pubKey: var ret = handshake.decodeAckMessage(ackMsg)
raise newException(Exception, "Remote pubkey is wrong") if ret == AuthStatus.IncompleteError:
ackMsg.setLen(handshake.expectedLength)
await result.transp.readExactly(addr ackMsg[initialSize],
len(ackMsg) - initialSize)
ret = handshake.decodeAckMessage(ackMsg)
check ret
discard result.hello(baseProtocolVersion, clienId, initSecretState(handshake, ^authMsg, ackMsg, result)
gCapabilities, uint(listenPort), myKeys.pubkey.getRaw())
var response = await result.nextMsg(p2p.hello, discardOthers = true) # if handshake.remoteHPubkey != remote.node.pubKey:
# raise newException(Exception, "Remote pubkey is wrong")
if not validatePubKeyInHello(response, remote.node.pubKey): discard result.hello(baseProtocolVersion, clienId, gCapabilities,
warn "Remote nodeId is not its public key" # XXX: Do we care? uint(listenPort), myKeys.pubkey.getRaw())
connectionEstablished(result, response) var response = await result.nextMsg(p2p.hello, discardOthers = true)
proc rlpxConnectIncoming*(myKeys: KeyPair, listenPort: Port, address: IpAddress, s: AsyncSocket): Future[Peer] {.async.} = if not validatePubKeyInHello(response, remote.node.pubKey):
warn "Remote nodeId is not its public key" # XXX: Do we care?
connectionEstablished(result, response)
except:
if not isNil(result.transp):
result.transp.close()
proc rlpxAccept*(transp: StreamTransport,
myKeys: KeyPair): Future[Peer] {.async.} =
new result new result
result.socket = s result.transp = transp
var handshake = newHandshake({Responder}) var handshake = newHandshake({Responder})
handshake.host = myKeys handshake.host = myKeys
let initialSize = handshake.expectedLength try:
var authMsg = newSeqOfCap[byte](1024) let initialSize = handshake.expectedLength
authMsg.setLen(initialSize) var authMsg = newSeqOfCap[byte](1024)
await s.fullRecvInto(authMsg) authMsg.setLen(initialSize)
var ret = handshake.decodeAuthMessage(authMsg) await transp.readExactly(addr authMsg[0], len(authMsg))
if ret == AuthStatus.IncompleteError: # Eip8 auth message is likely var ret = handshake.decodeAuthMessage(authMsg)
authMsg.setLen(handshake.expectedLength) if ret == AuthStatus.IncompleteError: # Eip8 auth message is likely
await s.fullRecvInto(addr authMsg[initialSize], len(authMsg) - initialSize) authMsg.setLen(handshake.expectedLength)
ret = handshake.decodeAuthMessage(authMsg) await transp.readExactly(addr authMsg[initialSize],
check ret len(authMsg) - initialSize)
ret = handshake.decodeAuthMessage(authMsg)
check ret
var ackMsg: array[AckMessageMaxEIP8, byte] var ackMsg: array[AckMessageMaxEIP8, byte]
var ackMsgLen: int var ackMsgLen: int
check handshake.ackMessage(ackMsg, ackMsgLen) check handshake.ackMessage(ackMsg, ackMsgLen)
await s.send(addr ackMsg[0], ackMsgLen) var res = transp.write(addr ackMsg[0], ackMsgLen)
initSecretState(handshake, authMsg, ^ackMsg, result) initSecretState(handshake, authMsg, ^ackMsg, result)
var response = await result.nextMsg(p2p.hello, discardOthers = true) var response = await result.nextMsg(p2p.hello, discardOthers = true)
discard result.hello(baseProtocolVersion, clienId, let listenPort = transp.localAddress().port
gCapabilities, listenPort.uint, myKeys.pubkey.getRaw()) discard result.hello(baseProtocolVersion, clienId,
gCapabilities, listenPort.uint, myKeys.pubkey.getRaw())
if validatePubKeyInHello(response, handshake.remoteHPubkey): if validatePubKeyInHello(response, handshake.remoteHPubkey):
warn "Remote nodeId is not its public key" # XXX: Do we care? warn "Remote nodeId is not its public key" # XXX: Do we care?
let port = Port(response.listenPort) let port = Port(response.listenPort)
let address = Address(ip: address, tcpPort: port, udpPort: port) let remote = transp.remoteAddress()
result.remote = newNode(initEnode(handshake.remoteHPubkey, address)) let address = Address(ip: remote.address, tcpPort: remote.port,
udpPort: remote.port)
result.remote = newNode(initEnode(handshake.remoteHPubkey, address))
connectionEstablished(result, response) connectionEstablished(result, response)
except:
transp.close()
when isMainModule: when isMainModule:
import rlp import rlp

View File

@ -10,10 +10,7 @@
## This module implements RLPx cryptography ## This module implements RLPx cryptography
import import ranges/stackarrays, rlp/types, nimcrypto
ranges/stackarrays, rlp/types,
nimcrypto/[rijndael, bcmode, keccak, utils]
from auth import ConnectionSecret from auth import ConnectionSecret
const const

View File

@ -1,52 +1,57 @@
import peer_pool, discovery, enode, async, asyncnet, auth, rlpx, net #
import eth_keys # Ethereum P2P
# (c) Copyright 2018
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
#
type Server* = ref object import asyncdispatch2, eth_keys
socket: AsyncSocket import peer_pool, discovery, enode, auth, rlpx
chainDb: AsyncChainDb
keyPair: KeyPair
address: Address
networkId: int
discovery: DiscoveryProtocol
peerPool: PeerPool
proc newServer*(keyPair: KeyPair, address: Address, chainDb: AsyncChainDB, type
bootstrapNodes: openarray[ENode], networkId: int): Server = P2PServer* = ref object
server: StreamServer
chainDb: AsyncChainDb
keyPair: KeyPair
address: Address
networkId: int
discovery: DiscoveryProtocol
peerPool: PeerPool
proc processIncoming(server: StreamServer,
remote: StreamTransport): Future[void] {.async, gcsafe.} =
var p2p = getUserData[P2PServer](server)
let peerfut = remote.rlpxAccept(p2p.keyPair)
yield peerfut
if not peerfut.failed:
let peer = peerfut.read()
echo "TODO: Add peer to the pool..."
else:
echo "Could not establish connection with incoming peer ",
$remote.remoteAddress()
remote.close()
proc newP2PServer*(keyPair: KeyPair, address: Address, chainDb: AsyncChainDB,
bootstrapNodes: openarray[ENode],
networkId: int): P2PServer =
result.new() result.new()
result.chainDb = chainDb result.chainDb = chainDb
result.keyPair = keyPair result.keyPair = keyPair
result.address = address result.address = address
result.networkId = networkId result.networkId = networkId
# TODO: bootstrap_nodes should be looked up by network_id. result.discovery = newDiscoveryProtocol(keyPair.seckey, address,
result.discovery = newDiscoveryProtocol(keyPair.seckey, address, bootstrapNodes) bootstrapNodes)
result.peerPool = newPeerPool(chainDb, networkId, keyPair, result.discovery) result.peerPool = newPeerPool(chainDb, networkId, keyPair, result.discovery)
proc isRunning(s: Server): bool {.inline.} = not s.socket.isNil let ta = initTAddress(address.ip, address.tcpPort)
result.server = createStreamServer(ta, processIncoming, {ReuseAddr},
udata = result)
proc receiveHandshake(s: Server, address: string, remote: AsyncSocket) {.async.} = proc start*(s: P2PServer) =
let p = await rlpxConnectIncoming(s.keyPair, s.address.tcpPort, parseIpAddress(address), remote) s.server.start()
if not p.isNil:
echo "TODO: Add peer to the pool..."
else:
echo "Could not establish connection with incoming peer"
proc run(s: Server) {.async.} = proc stop*(s: P2PServer) =
# TODO: Add error handling s.server.stop()
s.socket = newAsyncSocket()
s.socket.setSockOpt(OptReuseAddr, true)
s.socket.bindAddr(s.address.tcpPort)
s.socket.listen()
while s.isRunning:
let (address, client) = await s.socket.acceptAddr()
discard s.receiveHandshake(address, client)
proc start*(s: Server) =
if not s.isRunning:
discard s.run()
proc stop*(s: Server) =
if s.isRunning:
s.socket.close()
s.socket = nil
# s.peerPool.stop() # XXX

View File

@ -1,6 +1,15 @@
import #
eth_keys, net, asyncdispatch, sequtils, logging, byteutils, # Ethereum P2P
../eth_p2p/[discovery, kademlia, peer_pool, enode] # (c) Copyright 2018
# Status Research & Development GmbH
#
# See the file "LICENSE", included in this
# distribution, for details about the copyright.
#
import sequtils, logging
import eth_keys, asyncdispatch2, byteutils
import eth_p2p/[discovery, kademlia, peer_pool, enode]
addHandler(newConsoleLogger()) addHandler(newConsoleLogger())
@ -23,7 +32,6 @@ let
initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a619"), initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a619"),
initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a620") initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a620")
] ]
proc nodeIdInNodes(id: NodeId, nodes: openarray[Node]): bool = proc nodeIdInNodes(id: NodeId, nodes: openarray[Node]): bool =
for n in nodes: for n in nodes:
if id == n.id: return true if id == n.id: return true

View File

@ -7,9 +7,9 @@
# distribution, for details about the copyright. # distribution, for details about the copyright.
# #
import import unittest
unittest, eth_keys, nimcrypto/[utils, keccak], import eth_keys, nimcrypto/[utils, keccak]
eth_p2p/auth import eth_p2p/auth
# This was generated by `print` actual auth message generated by # This was generated by `print` actual auth message generated by
# https://github.com/ethereum/py-evm/blob/master/tests/p2p/test_auth.py # https://github.com/ethereum/py-evm/blob/master/tests/p2p/test_auth.py

View File

@ -7,10 +7,9 @@
# distribution, for details about the copyright. # distribution, for details about the copyright.
# #
import import unittest
unittest, eth_keys, import eth_keys, nimcrypto/[utils, sysrand, keccak]
nimcrypto/[utils, sysrand, nimcrypto/keccak], import eth_p2p/[auth, rlpxcrypt]
eth_p2p/[auth, rlpxcrypt]
const data = [ const data = [
("initiator_private_key", ("initiator_private_key",

View File

@ -7,10 +7,9 @@
# distribution, for details about the copyright. # distribution, for details about the copyright.
# #
import import unittest
unittest, eth_keys, import eth_keys, nimcrypto/[utils, sha2, hmac, rijndael]
nimcrypto/[utils, sha2, hmac, rijndael], import eth_p2p/ecies
eth_p2p/ecies
proc compare[A, B](x: openarray[A], y: openarray[B], s: int = 0): bool = proc compare[A, B](x: openarray[A], y: openarray[B], s: int = 0): bool =
result = true result = true

View File

@ -7,8 +7,8 @@
# Apache License, version 2.0, (LICENSE-APACHEv2) # Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT) # MIT license (LICENSE-MIT)
import import unittest, net
unittest, net, eth_p2p/enode import eth_p2p/enode
suite "ENode": suite "ENode":
test "Go-Ethereum tests": test "Go-Ethereum tests":

View File

@ -1,23 +1,29 @@
import #
eth_keys, net, asyncdispatch, sequtils, # Ethereum P2P
../eth_p2p/[discovery, kademlia, peer_pool, enode, server, rlpx] # (c) Copyright 2018
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import sequtils
import eth_keys, asyncdispatch2
import eth_p2p/[discovery, kademlia, peer_pool, enode, server, rlpx]
proc localAddress(port: int): Address = proc localAddress(port: int): Address =
let port = Port(port) let port = Port(port)
result = Address(udpPort: port, tcpPort: port, ip: parseIpAddress("127.0.0.1")) result = Address(udpPort: port, tcpPort: port, ip: parseIpAddress("127.0.0.1"))
proc test() {.async.} = proc test() {.async.} =
let kp = newKeyPair() let kp = newKeyPair()
let address = localAddress(20301) let address = localAddress(20301)
let s = newServer(kp, address, nil, [], 1) let s = newP2PServer(kp, address, nil, [], 1)
s.start() s.start()
await sleepAsync(500)
let n = newNode(initENode(kp.pubKey, address)) let n = newNode(initENode(kp.pubKey, address))
let peer = await rlpxConnect(newKeyPair(), Port(1234), n) let peer = await rlpxConnect(n, newKeyPair(), Port(1234))
doAssert(not peer.isNil) doAssert(not peer.isNil)