diff --git a/ethp2p/enode.nim b/ethp2p/enode.nim index 4672d97..0085e70 100644 --- a/ethp2p/enode.nim +++ b/ethp2p/enode.nim @@ -23,7 +23,7 @@ type IncompleteENode ## Incomplete ENODE object Address* = object - ## Network address object + ## Network address object ip*: IpAddress ## IPv4/IPv6 address udpPort*: Port ## UDP discovery port number tcpPort*: Port ## TCP port number @@ -125,23 +125,13 @@ proc initENode*(uri: string): ENode {.inline.} = if res != Success: raiseENodeError(res) -proc initENode*(pubkey: PublicKey, address: Address): ENode = +proc initENode*(pubkey: PublicKey, address: Address): ENode {.inline.} = ## Create ENode object from public key ``pubkey`` and ``address``. result.pubkey = pubkey - if address.tcpPort == Port(0): - raiseENodeError(IncorrectPort) - if address.udpPort == Port(0): - raiseENodeError(IncorrectDiscPort) result.address = address proc isCorrect*(n: ENode): bool = ## Returns ``true`` if ENode ``n`` is properly filled. - if n.address.ip.family notin {IpAddressFamily.IPv4, IpAddressFamily.IPv6}: - return false - if n.address.tcpPort == Port(0): - return false - if n.address.udpPort == Port(0): - return false result = false for i in n.pubkey.data: if i != 0x00'u8: @@ -162,8 +152,9 @@ proc `$`*(n: ENode): string = result.add($n.pubkey) result.add("@") result.add(ipaddr) - result.add(":") - result.add($int(n.address.tcpPort)) + if uint16(n.address.tcpPort) != 0: + result.add(":") + result.add($int(n.address.tcpPort)) if uint16(n.address.udpPort) != uint16(n.address.tcpPort): result.add("?") result.add("discport=") diff --git a/ethp2p/peer_pool.nim b/ethp2p/peer_pool.nim index 772d16f..9d2559a 100644 --- a/ethp2p/peer_pool.nim +++ b/ethp2p/peer_pool.nim @@ -21,6 +21,7 @@ type lastLookupTime: float connectedNodes: Table[Node, Peer] running: bool + listenPort*: Port AsyncChainDb* = ref object # TODO: This should be defined elsewhere @@ -40,6 +41,7 @@ proc newPeerPool*(chainDb: AsyncChainDb, networkId: int, keyPair: KeyPair, result.networkId = networkId result.discovery = discovery result.connectedNodes = initTable[Node, Peer]() + result.listenPort = Port(30303) template ensureFuture(f: untyped) = asyncCheck f @@ -72,7 +74,7 @@ proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} = debug "Skipping ", remote, "; already connected to it" return nil - result = await rlpxConnect(p.keyPair, remote) + result = await rlpxConnect(p.keyPair, p.listenPort, remote) # expected_exceptions = ( # UnreachablePeer, TimeoutError, PeerConnectionLost, HandshakeFailure) diff --git a/ethp2p/rlpx.nim b/ethp2p/rlpx.nim index 133e9d5..0af3c1c 100644 --- a/ethp2p/rlpx.nim +++ b/ethp2p/rlpx.nim @@ -9,13 +9,11 @@ # import - macros, sets, algorithm, async, asyncnet, asyncfutures, net, + macros, sets, algorithm, async, asyncnet, asyncfutures, net, logging, hashes, rlp, ranges/[stackarrays, ptr_arith], eth_keys, - ethereum_types, kademlia, discovery, auth, rlpxcrypt + ethereum_types, kademlia, discovery, auth, rlpxcrypt, nimcrypto, enode type - P2PNodeId = MDigest[512] - ConnectionState = enum None, Connected, @@ -23,7 +21,6 @@ type Disconnected Peer* = ref object - id: P2PNodeId # XXX: not fillet yed socket: AsyncSocket dispatcher: Dispatcher networkId: int @@ -200,7 +197,7 @@ proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer, rlpOut: var RlpWriter) let baseMsgId = peer.dispatcher.protocolOffsets[p.index] if baseMsgId == -1: raise newException(UnsupportedProtocol, - p.nameStr & " is not supported by peer " & $peer.id) + p.nameStr & " is not supported by peer " & $peer.remote.id) rlpOut.append(baseMsgId + msgId) proc dispatchMsg(peer: Peer, msgId: int, msgData: var Rlp) = @@ -215,9 +212,11 @@ proc dispatchMsg(peer: Peer, msgId: int, msgData: var Rlp) = thunk(peer, msgData) -proc send(p: Peer, data: BytesRange): Future[void] = +proc send(p: Peer, data: BytesRange) {.async.} = var cipherText = encryptMsg(data, p.secretsState) + GC_ref(cipherText) result = p.socket.send(addr cipherText[0], cipherText.len) + GC_unref(cipherText) proc fullRecvInto(s: AsyncSocket, buffer: pointer, bufferLen: int) {.async.} = # XXX: This should be a library function @@ -495,8 +494,8 @@ rlpxProtocol p2p, 0: clientId: string, capabilities: openarray[Capability], listenPort: uint, - nodeId: P2PNodeId) = - peer.id = nodeId + nodeId: array[RawPublicKeySize, byte]) = + # peer.id = nodeId peer.dispatcher = getDispatcher(capabilities) proc disconnect(peer: Peer, reason: DisconnectionReason) @@ -511,26 +510,22 @@ template `^`(arr): auto = # variable as an open array arr.toOpenArray(0, `arr Len` - 1) -proc sendHelloPacket(peer: Peer) {.async.} = - var - # XXX: TODO: get these from somewhere - nodeId: P2PNodeId - listeningPort = uint 0 +proc validatePubKeyInHello(msg: p2p.hello, pubKey: PublicKey): bool = + var pk: PublicKey + recoverPublicKey(msg.nodeId, pk) == EthKeysStatus.Success and pk == pubKey - discard peer.hello(baseProtocolVersion, clienId, - gCapabilities, listeningPort, nodeId) +proc check(status: AuthStatus) = + if status != AuthStatus.Success: + raise newException(Exception, "Error: " & $status) - echo "wait hello from outgoing" - var response = await peer.nextMsg(p2p.hello, discardOthers = true) - echo "received hello from outgoing" - - peer.dispatcher = getDispatcher(response.capabilities) - peer.id = response.nodeId - peer.connectionState = Connected - newSeq(peer.protocolStates, gProtocols.len) +proc connectionEstablished(p: Peer, h: p2p.hello) = + p.dispatcher = getDispatcher(h.capabilities) + # p.id = h.nodeId + p.connectionState = Connected + newSeq(p.protocolStates, gProtocols.len) # XXX: initialize the sub-protocol states -proc rlpxConnect*(myKeys: KeyPair, remote: Node): Future[Peer] {.async.} = +proc rlpxConnect*(myKeys: KeyPair, listenPort: Port, remote: Node): Future[Peer] {.async.} = # TODO: Make sure to close the socket in case of exception new result result.socket = newAsyncSocket() @@ -539,11 +534,6 @@ proc rlpxConnect*(myKeys: KeyPair, remote: Node): Future[Peer] {.async.} = const encryptionEnabled = true - template check(body: untyped) = - let c = body - if c != AuthStatus.Success: - raise newException(Exception, "Error: " & $c) - var handshake = newHandshake({Initiator}) handshake.host = myKeys @@ -563,26 +553,30 @@ proc rlpxConnect*(myKeys: KeyPair, remote: Node): Future[Peer] {.async.} = check handshake.getSecrets(^authMsg, ^ackMsg, secrets) initSecretState(secrets, result.secretsState) - await result.sendHelloPacket - return result + if handshake.remoteHPubkey != remote.node.pubKey: + raise newException(Exception, "Remote pubkey is wrong") -proc rlpxConnectIncoming*(myKeys: KeyPair, s: AsyncSocket): Future[Peer] {.async.} = + discard result.hello(baseProtocolVersion, clienId, + gCapabilities, uint(listenPort), myKeys.pubkey.getRaw()) + + var response = await result.nextMsg(p2p.hello, discardOthers = true) + + if not validatePubKeyInHello(response, remote.node.pubKey): + warn "Remote nodeId is not its public key" # XXX: Do we care? + + connectionEstablished(result, response) + +proc rlpxConnectIncoming*(myKeys: KeyPair, listenPort: Port, address: IpAddress, s: AsyncSocket): Future[Peer] {.async.} = new result result.socket = s - # XXX: how to get the .remote Node ID? var handshake = newHandshake({Responder}) handshake.host = myKeys var authMsg: array[1024, byte] var authMsgLen = AuthMessageV4Length - echo "Reading..." + # TODO: Handle both auth methods await s.fullRecvInto(addr authMsg[0], authMsgLen) - echo "Decode: ", handshake.decodeAuthMessage(^authMsg) - - template check(body: untyped) = - let c = body - if c != AuthStatus.Success: - raise newException(Exception, "Error: " & $c) + check handshake.decodeAuthMessage(^authMsg) var ackMsg: array[AckMessageMaxEIP8, byte] var ackMsgLen: int @@ -594,8 +588,18 @@ proc rlpxConnectIncoming*(myKeys: KeyPair, s: AsyncSocket): Future[Peer] {.async check handshake.getSecrets(^authMsg, ^ackMsg, secrets) initSecretState(secrets, result.secretsState) - await result.sendHelloPacket - return result + var response = await result.nextMsg(p2p.hello, discardOthers = true) + discard result.hello(baseProtocolVersion, clienId, + gCapabilities, listenPort.uint, myKeys.pubkey.getRaw()) + + if validatePubKeyInHello(response, handshake.remoteHPubkey): + warn "Remote nodeId is not its public key" # XXX: Do we care? + + let port = Port(response.listenPort) + let address = Address(ip: address, tcpPort: port, udpPort: port) + result.remote = newNode(initEnode(handshake.remoteHPubkey, address)) + + connectionEstablished(result, response) when isMainModule: import rlp diff --git a/ethp2p/rlpxcrypt.nim b/ethp2p/rlpxcrypt.nim index ae60bcd..72a2cca 100644 --- a/ethp2p/rlpxcrypt.nim +++ b/ethp2p/rlpxcrypt.nim @@ -133,10 +133,10 @@ proc encrypt*(c: var SecretState, header: openarray[byte], copyMem(addr output[frameMacPos], addr frameMac.data[0], RlpHeaderLength) result = Success -template encryptMsg*(msg: BytesRange, secrets: SecretState): auto = +proc encryptMsg*(msg: BytesRange, secrets: var SecretState): seq[byte] = var header: RlpxHeader - if uint32(data.len) > maxUInt24: + if uint32(msg.len) > maxUInt24: raise newException(OverflowError, "RLPx message size exceeds limit") # write the frame size in the first 3 bytes of the header @@ -146,12 +146,10 @@ template encryptMsg*(msg: BytesRange, secrets: SecretState): auto = # XXX: # This would be safer if we use a thread-local sequ for the temporary buffer - var outCipherText = allocStackArray(byte, encryptedLength(msg.len)) - let s = encrypt(secrets, header, msg.toOpenArray, outCipherText.toOpenArray) + result = newSeq[byte](encryptedLength(msg.len)) + let s = encrypt(secrets, header, msg.toOpenArray, result) assert s == Success - outCipherText - proc getBodySize*(a: RlpxHeader): int = (int(a[0]) shl 16) or (int(a[1]) shl 8) or int(a[2]) diff --git a/ethp2p/server.nim b/ethp2p/server.nim index 26748c9..5cfb6fe 100644 --- a/ethp2p/server.nim +++ b/ethp2p/server.nim @@ -1,4 +1,4 @@ -import peer_pool, discovery, enode, async, asyncnet, auth, rlpx +import peer_pool, discovery, enode, async, asyncnet, auth, rlpx, net import eth_keys type Server* = ref object @@ -24,7 +24,7 @@ proc newServer*(keyPair: KeyPair, address: Address, chainDb: AsyncChainDB, proc isRunning(s: Server): bool {.inline.} = not s.socket.isNil proc receiveHandshake(s: Server, address: string, remote: AsyncSocket) {.async.} = - let p = await rlpxConnectIncoming(s.keyPair, remote) + let p = await rlpxConnectIncoming(s.keyPair, s.address.tcpPort, parseIpAddress(address), remote) if not p.isNil: echo "TODO: Add peer to the pool..." else: diff --git a/tests/tserver.nim b/tests/tserver.nim index bd081bb..cb82e40 100644 --- a/tests/tserver.nim +++ b/tests/tserver.nim @@ -16,7 +16,7 @@ proc test() {.async.} = await sleepAsync(500) let n = newNode(initENode(kp.pubKey, address)) - let peer = await rlpxConnect(newKeyPair(), n) + let peer = await rlpxConnect(newKeyPair(), Port(1234), n) doAssert(not peer.isNil)