Fixed rlpx send, perform handshake in a test. ENode allows zero ports now.

This commit is contained in:
Yuriy Glukhov 2018-05-10 22:02:12 +03:00
parent e43fa613b6
commit 128460099a
6 changed files with 62 additions and 67 deletions

View File

@ -23,7 +23,7 @@ type
IncompleteENode ## Incomplete ENODE object IncompleteENode ## Incomplete ENODE object
Address* = object Address* = object
## Network address object ## Network address object
ip*: IpAddress ## IPv4/IPv6 address ip*: IpAddress ## IPv4/IPv6 address
udpPort*: Port ## UDP discovery port number udpPort*: Port ## UDP discovery port number
tcpPort*: Port ## TCP port number tcpPort*: Port ## TCP port number
@ -125,23 +125,13 @@ proc initENode*(uri: string): ENode {.inline.} =
if res != Success: if res != Success:
raiseENodeError(res) raiseENodeError(res)
proc initENode*(pubkey: PublicKey, address: Address): ENode = proc initENode*(pubkey: PublicKey, address: Address): ENode {.inline.} =
## Create ENode object from public key ``pubkey`` and ``address``. ## Create ENode object from public key ``pubkey`` and ``address``.
result.pubkey = pubkey result.pubkey = pubkey
if address.tcpPort == Port(0):
raiseENodeError(IncorrectPort)
if address.udpPort == Port(0):
raiseENodeError(IncorrectDiscPort)
result.address = address result.address = address
proc isCorrect*(n: ENode): bool = proc isCorrect*(n: ENode): bool =
## Returns ``true`` if ENode ``n`` is properly filled. ## Returns ``true`` if ENode ``n`` is properly filled.
if n.address.ip.family notin {IpAddressFamily.IPv4, IpAddressFamily.IPv6}:
return false
if n.address.tcpPort == Port(0):
return false
if n.address.udpPort == Port(0):
return false
result = false result = false
for i in n.pubkey.data: for i in n.pubkey.data:
if i != 0x00'u8: if i != 0x00'u8:
@ -162,8 +152,9 @@ proc `$`*(n: ENode): string =
result.add($n.pubkey) result.add($n.pubkey)
result.add("@") result.add("@")
result.add(ipaddr) result.add(ipaddr)
result.add(":") if uint16(n.address.tcpPort) != 0:
result.add($int(n.address.tcpPort)) result.add(":")
result.add($int(n.address.tcpPort))
if uint16(n.address.udpPort) != uint16(n.address.tcpPort): if uint16(n.address.udpPort) != uint16(n.address.tcpPort):
result.add("?") result.add("?")
result.add("discport=") result.add("discport=")

View File

@ -21,6 +21,7 @@ type
lastLookupTime: float lastLookupTime: float
connectedNodes: Table[Node, Peer] connectedNodes: Table[Node, Peer]
running: bool running: bool
listenPort*: Port
AsyncChainDb* = ref object # TODO: This should be defined elsewhere AsyncChainDb* = ref object # TODO: This should be defined elsewhere
@ -40,6 +41,7 @@ proc newPeerPool*(chainDb: AsyncChainDb, networkId: int, keyPair: KeyPair,
result.networkId = networkId result.networkId = networkId
result.discovery = discovery result.discovery = discovery
result.connectedNodes = initTable[Node, Peer]() result.connectedNodes = initTable[Node, Peer]()
result.listenPort = Port(30303)
template ensureFuture(f: untyped) = asyncCheck f template ensureFuture(f: untyped) = asyncCheck f
@ -72,7 +74,7 @@ proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} =
debug "Skipping ", remote, "; already connected to it" debug "Skipping ", remote, "; already connected to it"
return nil return nil
result = await rlpxConnect(p.keyPair, remote) result = await rlpxConnect(p.keyPair, p.listenPort, remote)
# expected_exceptions = ( # expected_exceptions = (
# UnreachablePeer, TimeoutError, PeerConnectionLost, HandshakeFailure) # UnreachablePeer, TimeoutError, PeerConnectionLost, HandshakeFailure)

View File

@ -9,13 +9,11 @@
# #
import import
macros, sets, algorithm, async, asyncnet, asyncfutures, net, macros, sets, algorithm, async, asyncnet, asyncfutures, net, logging,
hashes, rlp, ranges/[stackarrays, ptr_arith], eth_keys, hashes, rlp, ranges/[stackarrays, ptr_arith], eth_keys,
ethereum_types, kademlia, discovery, auth, rlpxcrypt ethereum_types, kademlia, discovery, auth, rlpxcrypt, nimcrypto, enode
type type
P2PNodeId = MDigest[512]
ConnectionState = enum ConnectionState = enum
None, None,
Connected, Connected,
@ -23,7 +21,6 @@ type
Disconnected Disconnected
Peer* = ref object Peer* = ref object
id: P2PNodeId # XXX: not fillet yed
socket: AsyncSocket socket: AsyncSocket
dispatcher: Dispatcher dispatcher: Dispatcher
networkId: int networkId: int
@ -200,7 +197,7 @@ proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer, rlpOut: var RlpWriter)
let baseMsgId = peer.dispatcher.protocolOffsets[p.index] let baseMsgId = peer.dispatcher.protocolOffsets[p.index]
if baseMsgId == -1: if baseMsgId == -1:
raise newException(UnsupportedProtocol, raise newException(UnsupportedProtocol,
p.nameStr & " is not supported by peer " & $peer.id) p.nameStr & " is not supported by peer " & $peer.remote.id)
rlpOut.append(baseMsgId + msgId) rlpOut.append(baseMsgId + msgId)
proc dispatchMsg(peer: Peer, msgId: int, msgData: var Rlp) = proc dispatchMsg(peer: Peer, msgId: int, msgData: var Rlp) =
@ -215,9 +212,11 @@ proc dispatchMsg(peer: Peer, msgId: int, msgData: var Rlp) =
thunk(peer, msgData) thunk(peer, msgData)
proc send(p: Peer, data: BytesRange): Future[void] = proc send(p: Peer, data: BytesRange) {.async.} =
var cipherText = encryptMsg(data, p.secretsState) var cipherText = encryptMsg(data, p.secretsState)
GC_ref(cipherText)
result = p.socket.send(addr cipherText[0], cipherText.len) result = p.socket.send(addr cipherText[0], cipherText.len)
GC_unref(cipherText)
proc fullRecvInto(s: AsyncSocket, buffer: pointer, bufferLen: int) {.async.} = proc fullRecvInto(s: AsyncSocket, buffer: pointer, bufferLen: int) {.async.} =
# XXX: This should be a library function # XXX: This should be a library function
@ -495,8 +494,8 @@ rlpxProtocol p2p, 0:
clientId: string, clientId: string,
capabilities: openarray[Capability], capabilities: openarray[Capability],
listenPort: uint, listenPort: uint,
nodeId: P2PNodeId) = nodeId: array[RawPublicKeySize, byte]) =
peer.id = nodeId # peer.id = nodeId
peer.dispatcher = getDispatcher(capabilities) peer.dispatcher = getDispatcher(capabilities)
proc disconnect(peer: Peer, reason: DisconnectionReason) proc disconnect(peer: Peer, reason: DisconnectionReason)
@ -511,26 +510,22 @@ template `^`(arr): auto =
# variable as an open array # variable as an open array
arr.toOpenArray(0, `arr Len` - 1) arr.toOpenArray(0, `arr Len` - 1)
proc sendHelloPacket(peer: Peer) {.async.} = proc validatePubKeyInHello(msg: p2p.hello, pubKey: PublicKey): bool =
var var pk: PublicKey
# XXX: TODO: get these from somewhere recoverPublicKey(msg.nodeId, pk) == EthKeysStatus.Success and pk == pubKey
nodeId: P2PNodeId
listeningPort = uint 0
discard peer.hello(baseProtocolVersion, clienId, proc check(status: AuthStatus) =
gCapabilities, listeningPort, nodeId) if status != AuthStatus.Success:
raise newException(Exception, "Error: " & $status)
echo "wait hello from outgoing" proc connectionEstablished(p: Peer, h: p2p.hello) =
var response = await peer.nextMsg(p2p.hello, discardOthers = true) p.dispatcher = getDispatcher(h.capabilities)
echo "received hello from outgoing" # p.id = h.nodeId
p.connectionState = Connected
peer.dispatcher = getDispatcher(response.capabilities) newSeq(p.protocolStates, gProtocols.len)
peer.id = response.nodeId
peer.connectionState = Connected
newSeq(peer.protocolStates, gProtocols.len)
# XXX: initialize the sub-protocol states # XXX: initialize the sub-protocol states
proc rlpxConnect*(myKeys: KeyPair, remote: Node): Future[Peer] {.async.} = proc rlpxConnect*(myKeys: KeyPair, listenPort: Port, remote: Node): Future[Peer] {.async.} =
# TODO: Make sure to close the socket in case of exception # TODO: Make sure to close the socket in case of exception
new result new result
result.socket = newAsyncSocket() result.socket = newAsyncSocket()
@ -539,11 +534,6 @@ proc rlpxConnect*(myKeys: KeyPair, remote: Node): Future[Peer] {.async.} =
const encryptionEnabled = true const encryptionEnabled = true
template check(body: untyped) =
let c = body
if c != AuthStatus.Success:
raise newException(Exception, "Error: " & $c)
var handshake = newHandshake({Initiator}) var handshake = newHandshake({Initiator})
handshake.host = myKeys handshake.host = myKeys
@ -563,26 +553,30 @@ proc rlpxConnect*(myKeys: KeyPair, remote: Node): Future[Peer] {.async.} =
check handshake.getSecrets(^authMsg, ^ackMsg, secrets) check handshake.getSecrets(^authMsg, ^ackMsg, secrets)
initSecretState(secrets, result.secretsState) initSecretState(secrets, result.secretsState)
await result.sendHelloPacket if handshake.remoteHPubkey != remote.node.pubKey:
return result raise newException(Exception, "Remote pubkey is wrong")
proc rlpxConnectIncoming*(myKeys: KeyPair, s: AsyncSocket): Future[Peer] {.async.} = discard result.hello(baseProtocolVersion, clienId,
gCapabilities, uint(listenPort), myKeys.pubkey.getRaw())
var response = await result.nextMsg(p2p.hello, discardOthers = true)
if not validatePubKeyInHello(response, remote.node.pubKey):
warn "Remote nodeId is not its public key" # XXX: Do we care?
connectionEstablished(result, response)
proc rlpxConnectIncoming*(myKeys: KeyPair, listenPort: Port, address: IpAddress, s: AsyncSocket): Future[Peer] {.async.} =
new result new result
result.socket = s result.socket = s
# XXX: how to get the .remote Node ID?
var handshake = newHandshake({Responder}) var handshake = newHandshake({Responder})
handshake.host = myKeys handshake.host = myKeys
var authMsg: array[1024, byte] var authMsg: array[1024, byte]
var authMsgLen = AuthMessageV4Length var authMsgLen = AuthMessageV4Length
echo "Reading..." # TODO: Handle both auth methods
await s.fullRecvInto(addr authMsg[0], authMsgLen) await s.fullRecvInto(addr authMsg[0], authMsgLen)
echo "Decode: ", handshake.decodeAuthMessage(^authMsg) check handshake.decodeAuthMessage(^authMsg)
template check(body: untyped) =
let c = body
if c != AuthStatus.Success:
raise newException(Exception, "Error: " & $c)
var ackMsg: array[AckMessageMaxEIP8, byte] var ackMsg: array[AckMessageMaxEIP8, byte]
var ackMsgLen: int var ackMsgLen: int
@ -594,8 +588,18 @@ proc rlpxConnectIncoming*(myKeys: KeyPair, s: AsyncSocket): Future[Peer] {.async
check handshake.getSecrets(^authMsg, ^ackMsg, secrets) check handshake.getSecrets(^authMsg, ^ackMsg, secrets)
initSecretState(secrets, result.secretsState) initSecretState(secrets, result.secretsState)
await result.sendHelloPacket var response = await result.nextMsg(p2p.hello, discardOthers = true)
return result discard result.hello(baseProtocolVersion, clienId,
gCapabilities, listenPort.uint, myKeys.pubkey.getRaw())
if validatePubKeyInHello(response, handshake.remoteHPubkey):
warn "Remote nodeId is not its public key" # XXX: Do we care?
let port = Port(response.listenPort)
let address = Address(ip: address, tcpPort: port, udpPort: port)
result.remote = newNode(initEnode(handshake.remoteHPubkey, address))
connectionEstablished(result, response)
when isMainModule: when isMainModule:
import rlp import rlp

View File

@ -133,10 +133,10 @@ proc encrypt*(c: var SecretState, header: openarray[byte],
copyMem(addr output[frameMacPos], addr frameMac.data[0], RlpHeaderLength) copyMem(addr output[frameMacPos], addr frameMac.data[0], RlpHeaderLength)
result = Success result = Success
template encryptMsg*(msg: BytesRange, secrets: SecretState): auto = proc encryptMsg*(msg: BytesRange, secrets: var SecretState): seq[byte] =
var header: RlpxHeader var header: RlpxHeader
if uint32(data.len) > maxUInt24: if uint32(msg.len) > maxUInt24:
raise newException(OverflowError, "RLPx message size exceeds limit") raise newException(OverflowError, "RLPx message size exceeds limit")
# write the frame size in the first 3 bytes of the header # write the frame size in the first 3 bytes of the header
@ -146,12 +146,10 @@ template encryptMsg*(msg: BytesRange, secrets: SecretState): auto =
# XXX: # XXX:
# This would be safer if we use a thread-local sequ for the temporary buffer # This would be safer if we use a thread-local sequ for the temporary buffer
var outCipherText = allocStackArray(byte, encryptedLength(msg.len)) result = newSeq[byte](encryptedLength(msg.len))
let s = encrypt(secrets, header, msg.toOpenArray, outCipherText.toOpenArray) let s = encrypt(secrets, header, msg.toOpenArray, result)
assert s == Success assert s == Success
outCipherText
proc getBodySize*(a: RlpxHeader): int = proc getBodySize*(a: RlpxHeader): int =
(int(a[0]) shl 16) or (int(a[1]) shl 8) or int(a[2]) (int(a[0]) shl 16) or (int(a[1]) shl 8) or int(a[2])

View File

@ -1,4 +1,4 @@
import peer_pool, discovery, enode, async, asyncnet, auth, rlpx import peer_pool, discovery, enode, async, asyncnet, auth, rlpx, net
import eth_keys import eth_keys
type Server* = ref object type Server* = ref object
@ -24,7 +24,7 @@ proc newServer*(keyPair: KeyPair, address: Address, chainDb: AsyncChainDB,
proc isRunning(s: Server): bool {.inline.} = not s.socket.isNil proc isRunning(s: Server): bool {.inline.} = not s.socket.isNil
proc receiveHandshake(s: Server, address: string, remote: AsyncSocket) {.async.} = proc receiveHandshake(s: Server, address: string, remote: AsyncSocket) {.async.} =
let p = await rlpxConnectIncoming(s.keyPair, remote) let p = await rlpxConnectIncoming(s.keyPair, s.address.tcpPort, parseIpAddress(address), remote)
if not p.isNil: if not p.isNil:
echo "TODO: Add peer to the pool..." echo "TODO: Add peer to the pool..."
else: else:

View File

@ -16,7 +16,7 @@ proc test() {.async.} =
await sleepAsync(500) await sleepAsync(500)
let n = newNode(initENode(kp.pubKey, address)) let n = newNode(initENode(kp.pubKey, address))
let peer = await rlpxConnect(newKeyPair(), n) let peer = await rlpxConnect(newKeyPair(), Port(1234), n)
doAssert(not peer.isNil) doAssert(not peer.isNil)