mirror of
https://github.com/status-im/nim-eth-p2p.git
synced 2025-01-14 17:04:19 +00:00
commit
d22c8251dc
@ -9,11 +9,11 @@ skipDirs = @["tests", "Nim"]
|
|||||||
|
|
||||||
requires "nim > 0.18.0",
|
requires "nim > 0.18.0",
|
||||||
"rlp >= 1.0.1",
|
"rlp >= 1.0.1",
|
||||||
"https://github.com/cheatfate/nimcrypto",
|
"nimcrypto",
|
||||||
"secp256k1 >= 0.1.0",
|
"secp256k1 >= 0.1.0",
|
||||||
"eth_keys",
|
"eth_keys",
|
||||||
"ranges",
|
"ranges",
|
||||||
"https://github.com/status-im/nim-stint",
|
"stint",
|
||||||
"https://github.com/status-im/nim-byteutils"
|
"https://github.com/status-im/nim-byteutils"
|
||||||
|
|
||||||
proc runTest(name: string, lang = "c") = exec "nim " & lang & " -r tests/" & name
|
proc runTest(name: string, lang = "c") = exec "nim " & lang & " -r tests/" & name
|
||||||
@ -24,3 +24,4 @@ task test, "Runs the test suite":
|
|||||||
runTest "testcrypt"
|
runTest "testcrypt"
|
||||||
runTest "testenode"
|
runTest "testenode"
|
||||||
runTest "tdiscovery"
|
runTest "tdiscovery"
|
||||||
|
runTest "tserver"
|
||||||
|
@ -23,7 +23,7 @@ type
|
|||||||
IncompleteENode ## Incomplete ENODE object
|
IncompleteENode ## Incomplete ENODE object
|
||||||
|
|
||||||
Address* = object
|
Address* = object
|
||||||
## Network address object
|
## Network address object
|
||||||
ip*: IpAddress ## IPv4/IPv6 address
|
ip*: IpAddress ## IPv4/IPv6 address
|
||||||
udpPort*: Port ## UDP discovery port number
|
udpPort*: Port ## UDP discovery port number
|
||||||
tcpPort*: Port ## TCP port number
|
tcpPort*: Port ## TCP port number
|
||||||
@ -125,23 +125,13 @@ proc initENode*(uri: string): ENode {.inline.} =
|
|||||||
if res != Success:
|
if res != Success:
|
||||||
raiseENodeError(res)
|
raiseENodeError(res)
|
||||||
|
|
||||||
proc initENode*(pubkey: PublicKey, address: Address): ENode =
|
proc initENode*(pubkey: PublicKey, address: Address): ENode {.inline.} =
|
||||||
## Create ENode object from public key ``pubkey`` and ``address``.
|
## Create ENode object from public key ``pubkey`` and ``address``.
|
||||||
result.pubkey = pubkey
|
result.pubkey = pubkey
|
||||||
if address.tcpPort == Port(0):
|
|
||||||
raiseENodeError(IncorrectPort)
|
|
||||||
if address.udpPort == Port(0):
|
|
||||||
raiseENodeError(IncorrectDiscPort)
|
|
||||||
result.address = address
|
result.address = address
|
||||||
|
|
||||||
proc isCorrect*(n: ENode): bool =
|
proc isCorrect*(n: ENode): bool =
|
||||||
## Returns ``true`` if ENode ``n`` is properly filled.
|
## Returns ``true`` if ENode ``n`` is properly filled.
|
||||||
if n.address.ip.family notin {IpAddressFamily.IPv4, IpAddressFamily.IPv6}:
|
|
||||||
return false
|
|
||||||
if n.address.tcpPort == Port(0):
|
|
||||||
return false
|
|
||||||
if n.address.udpPort == Port(0):
|
|
||||||
return false
|
|
||||||
result = false
|
result = false
|
||||||
for i in n.pubkey.data:
|
for i in n.pubkey.data:
|
||||||
if i != 0x00'u8:
|
if i != 0x00'u8:
|
||||||
@ -162,8 +152,9 @@ proc `$`*(n: ENode): string =
|
|||||||
result.add($n.pubkey)
|
result.add($n.pubkey)
|
||||||
result.add("@")
|
result.add("@")
|
||||||
result.add(ipaddr)
|
result.add(ipaddr)
|
||||||
result.add(":")
|
if uint16(n.address.tcpPort) != 0:
|
||||||
result.add($int(n.address.tcpPort))
|
result.add(":")
|
||||||
|
result.add($int(n.address.tcpPort))
|
||||||
if uint16(n.address.udpPort) != uint16(n.address.tcpPort):
|
if uint16(n.address.udpPort) != uint16(n.address.tcpPort):
|
||||||
result.add("?")
|
result.add("?")
|
||||||
result.add("discport=")
|
result.add("discport=")
|
||||||
|
@ -21,6 +21,7 @@ type
|
|||||||
lastLookupTime: float
|
lastLookupTime: float
|
||||||
connectedNodes: Table[Node, Peer]
|
connectedNodes: Table[Node, Peer]
|
||||||
running: bool
|
running: bool
|
||||||
|
listenPort*: Port
|
||||||
|
|
||||||
AsyncChainDb* = ref object # TODO: This should be defined elsewhere
|
AsyncChainDb* = ref object # TODO: This should be defined elsewhere
|
||||||
|
|
||||||
@ -40,6 +41,7 @@ proc newPeerPool*(chainDb: AsyncChainDb, networkId: int, keyPair: KeyPair,
|
|||||||
result.networkId = networkId
|
result.networkId = networkId
|
||||||
result.discovery = discovery
|
result.discovery = discovery
|
||||||
result.connectedNodes = initTable[Node, Peer]()
|
result.connectedNodes = initTable[Node, Peer]()
|
||||||
|
result.listenPort = Port(30303)
|
||||||
|
|
||||||
template ensureFuture(f: untyped) = asyncCheck f
|
template ensureFuture(f: untyped) = asyncCheck f
|
||||||
|
|
||||||
@ -72,7 +74,7 @@ proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} =
|
|||||||
debug "Skipping ", remote, "; already connected to it"
|
debug "Skipping ", remote, "; already connected to it"
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
result = await rlpxConnect(p.keyPair, remote)
|
result = await rlpxConnect(p.keyPair, p.listenPort, remote)
|
||||||
|
|
||||||
# expected_exceptions = (
|
# expected_exceptions = (
|
||||||
# UnreachablePeer, TimeoutError, PeerConnectionLost, HandshakeFailure)
|
# UnreachablePeer, TimeoutError, PeerConnectionLost, HandshakeFailure)
|
||||||
|
120
ethp2p/rlpx.nim
120
ethp2p/rlpx.nim
@ -9,13 +9,11 @@
|
|||||||
#
|
#
|
||||||
|
|
||||||
import
|
import
|
||||||
macros, sets, algorithm, async, asyncnet, asyncfutures,
|
macros, sets, algorithm, async, asyncnet, asyncfutures, net, logging,
|
||||||
hashes, rlp, ranges/[stackarrays, ptr_arith], eth_keys,
|
hashes, rlp, ranges/[stackarrays, ptr_arith], eth_keys,
|
||||||
ethereum_types, kademlia, discovery, auth, rlpxcrypt
|
ethereum_types, kademlia, discovery, auth, rlpxcrypt, nimcrypto, enode
|
||||||
|
|
||||||
type
|
type
|
||||||
P2PNodeId = MDigest[512]
|
|
||||||
|
|
||||||
ConnectionState = enum
|
ConnectionState = enum
|
||||||
None,
|
None,
|
||||||
Connected,
|
Connected,
|
||||||
@ -23,7 +21,6 @@ type
|
|||||||
Disconnected
|
Disconnected
|
||||||
|
|
||||||
Peer* = ref object
|
Peer* = ref object
|
||||||
id: P2PNodeId # XXX: not fillet yed
|
|
||||||
socket: AsyncSocket
|
socket: AsyncSocket
|
||||||
dispatcher: Dispatcher
|
dispatcher: Dispatcher
|
||||||
networkId: int
|
networkId: int
|
||||||
@ -200,7 +197,7 @@ proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer, rlpOut: var RlpWriter)
|
|||||||
let baseMsgId = peer.dispatcher.protocolOffsets[p.index]
|
let baseMsgId = peer.dispatcher.protocolOffsets[p.index]
|
||||||
if baseMsgId == -1:
|
if baseMsgId == -1:
|
||||||
raise newException(UnsupportedProtocol,
|
raise newException(UnsupportedProtocol,
|
||||||
p.nameStr & " is not supported by peer " & $peer.id)
|
p.nameStr & " is not supported by peer " & $peer.remote.id)
|
||||||
rlpOut.append(baseMsgId + msgId)
|
rlpOut.append(baseMsgId + msgId)
|
||||||
|
|
||||||
proc dispatchMsg(peer: Peer, msgId: int, msgData: var Rlp) =
|
proc dispatchMsg(peer: Peer, msgId: int, msgData: var Rlp) =
|
||||||
@ -215,12 +212,9 @@ proc dispatchMsg(peer: Peer, msgId: int, msgData: var Rlp) =
|
|||||||
|
|
||||||
thunk(peer, msgData)
|
thunk(peer, msgData)
|
||||||
|
|
||||||
proc send(p: Peer, data: BytesRange): Future[void] =
|
proc send(p: Peer, data: BytesRange) {.async.} =
|
||||||
var cipherText = encryptMsg(data, p.secretsState)
|
var cipherText = encryptMsg(data, p.secretsState)
|
||||||
result = p.socket.send(addr cipherText[0], cipherText.len)
|
await p.socket.send(addr cipherText[0], cipherText.len)
|
||||||
|
|
||||||
proc getMsgLen(header: RlpxHeader): int =
|
|
||||||
32
|
|
||||||
|
|
||||||
proc fullRecvInto(s: AsyncSocket, buffer: pointer, bufferLen: int) {.async.} =
|
proc fullRecvInto(s: AsyncSocket, buffer: pointer, bufferLen: int) {.async.} =
|
||||||
# XXX: This should be a library function
|
# XXX: This should be a library function
|
||||||
@ -498,8 +492,8 @@ rlpxProtocol p2p, 0:
|
|||||||
clientId: string,
|
clientId: string,
|
||||||
capabilities: openarray[Capability],
|
capabilities: openarray[Capability],
|
||||||
listenPort: uint,
|
listenPort: uint,
|
||||||
nodeId: P2PNodeId) =
|
nodeId: array[RawPublicKeySize, byte]) =
|
||||||
peer.id = nodeId
|
# peer.id = nodeId
|
||||||
peer.dispatcher = getDispatcher(capabilities)
|
peer.dispatcher = getDispatcher(capabilities)
|
||||||
|
|
||||||
proc disconnect(peer: Peer, reason: DisconnectionReason)
|
proc disconnect(peer: Peer, reason: DisconnectionReason)
|
||||||
@ -509,28 +503,43 @@ rlpxProtocol p2p, 0:
|
|||||||
proc pong(peer: Peer) =
|
proc pong(peer: Peer) =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
import typetraits
|
template `^`(arr): auto =
|
||||||
|
# passes a stack array with a matching `arrLen`
|
||||||
|
# variable as an open array
|
||||||
|
arr.toOpenArray(0, `arr Len` - 1)
|
||||||
|
|
||||||
proc rlpxConnect*(myKeys: KeyPair, remote: Node): Future[Peer] {.async.} =
|
proc validatePubKeyInHello(msg: p2p.hello, pubKey: PublicKey): bool =
|
||||||
|
var pk: PublicKey
|
||||||
|
recoverPublicKey(msg.nodeId, pk) == EthKeysStatus.Success and pk == pubKey
|
||||||
|
|
||||||
|
proc check(status: AuthStatus) =
|
||||||
|
if status != AuthStatus.Success:
|
||||||
|
raise newException(Exception, "Error: " & $status)
|
||||||
|
|
||||||
|
proc connectionEstablished(p: Peer, h: p2p.hello) =
|
||||||
|
p.dispatcher = getDispatcher(h.capabilities)
|
||||||
|
# p.id = h.nodeId
|
||||||
|
p.connectionState = Connected
|
||||||
|
newSeq(p.protocolStates, gProtocols.len)
|
||||||
|
# XXX: initialize the sub-protocol states
|
||||||
|
|
||||||
|
proc initSecretState(hs: var Handshake, authMsg, ackMsg: openarray[byte], p: Peer) =
|
||||||
|
var secrets: ConnectionSecret
|
||||||
|
check hs.getSecrets(authMsg, ackMsg, secrets)
|
||||||
|
initSecretState(secrets, p.secretsState)
|
||||||
|
burnMem(secrets)
|
||||||
|
|
||||||
|
proc rlpxConnect*(myKeys: KeyPair, listenPort: Port, remote: Node): Future[Peer] {.async.} =
|
||||||
# TODO: Make sure to close the socket in case of exception
|
# TODO: Make sure to close the socket in case of exception
|
||||||
|
new result
|
||||||
result.socket = newAsyncSocket()
|
result.socket = newAsyncSocket()
|
||||||
|
result.remote = remote
|
||||||
await result.socket.connect($remote.node.address.ip, remote.node.address.tcpPort)
|
await result.socket.connect($remote.node.address.ip, remote.node.address.tcpPort)
|
||||||
|
|
||||||
const encryptionEnabled = true
|
const encryptionEnabled = true
|
||||||
|
|
||||||
template check(body: untyped) =
|
|
||||||
let c = body
|
|
||||||
if c != AuthStatus.Success:
|
|
||||||
raise newException(Exception, "Error: " & $c)
|
|
||||||
|
|
||||||
template `^`(arr): auto =
|
|
||||||
# passes a stack array with a matching `arrLen`
|
|
||||||
# variable as an open array
|
|
||||||
arr.toOpenArray(0, `arr Len` - 1)
|
|
||||||
|
|
||||||
var handshake = newHandshake({Initiator})
|
var handshake = newHandshake({Initiator})
|
||||||
handshake.host.seckey = myKeys.seckey
|
handshake.host = myKeys
|
||||||
handshake.host.pubkey = myKeys.pubKey
|
|
||||||
|
|
||||||
var authMsg: array[AuthMessageMaxEIP8, byte]
|
var authMsg: array[AuthMessageMaxEIP8, byte]
|
||||||
var authMsgLen = 0
|
var authMsgLen = 0
|
||||||
@ -544,25 +553,52 @@ proc rlpxConnect*(myKeys: KeyPair, remote: Node): Future[Peer] {.async.} =
|
|||||||
await result.socket.fullRecvInto(addr ackMsg, ackMsgLen)
|
await result.socket.fullRecvInto(addr ackMsg, ackMsgLen)
|
||||||
|
|
||||||
check handshake.decodeAckMessage(^ackMsg)
|
check handshake.decodeAckMessage(^ackMsg)
|
||||||
var secrets: ConnectionSecret
|
initSecretState(handshake, ^authMsg, ^ackMsg, result)
|
||||||
check handshake.getSecrets(^authMsg, ^ackMsg, secrets)
|
|
||||||
initSecretState(secrets, result.secretsState)
|
|
||||||
|
|
||||||
var
|
if handshake.remoteHPubkey != remote.node.pubKey:
|
||||||
# XXX: TODO: get these from somewhere
|
raise newException(Exception, "Remote pubkey is wrong")
|
||||||
nodeId: P2PNodeId
|
|
||||||
listeningPort = uint 0
|
|
||||||
|
|
||||||
discard hello(result, baseProtocolVersion, clienId,
|
discard result.hello(baseProtocolVersion, clienId,
|
||||||
gCapabilities, listeningPort, nodeId)
|
gCapabilities, uint(listenPort), myKeys.pubkey.getRaw())
|
||||||
|
|
||||||
var response = await result.nextMsg(p2p.hello, discardOthers = true)
|
var response = await result.nextMsg(p2p.hello, discardOthers = true)
|
||||||
result.dispatcher = getDispatcher(response.capabilities)
|
|
||||||
result.id = response.nodeId
|
if not validatePubKeyInHello(response, remote.node.pubKey):
|
||||||
result.connectionState = Connected
|
warn "Remote nodeId is not its public key" # XXX: Do we care?
|
||||||
result.remote = remote
|
|
||||||
newSeq(result.protocolStates, gProtocols.len)
|
connectionEstablished(result, response)
|
||||||
# XXX: initialize the sub-protocol states
|
|
||||||
|
proc rlpxConnectIncoming*(myKeys: KeyPair, listenPort: Port, address: IpAddress, s: AsyncSocket): Future[Peer] {.async.} =
|
||||||
|
new result
|
||||||
|
result.socket = s
|
||||||
|
var handshake = newHandshake({Responder})
|
||||||
|
handshake.host = myKeys
|
||||||
|
|
||||||
|
var authMsg: array[1024, byte]
|
||||||
|
var authMsgLen = AuthMessageV4Length
|
||||||
|
# TODO: Handle both auth methods
|
||||||
|
await s.fullRecvInto(addr authMsg[0], authMsgLen)
|
||||||
|
check handshake.decodeAuthMessage(^authMsg)
|
||||||
|
|
||||||
|
var ackMsg: array[AckMessageMaxEIP8, byte]
|
||||||
|
var ackMsgLen: int
|
||||||
|
check handshake.ackMessage(ackMsg, ackMsgLen)
|
||||||
|
|
||||||
|
await s.send(addr ackMsg[0], ackMsgLen)
|
||||||
|
initSecretState(handshake, ^authMsg, ^ackMsg, result)
|
||||||
|
|
||||||
|
var response = await result.nextMsg(p2p.hello, discardOthers = true)
|
||||||
|
discard result.hello(baseProtocolVersion, clienId,
|
||||||
|
gCapabilities, listenPort.uint, myKeys.pubkey.getRaw())
|
||||||
|
|
||||||
|
if validatePubKeyInHello(response, handshake.remoteHPubkey):
|
||||||
|
warn "Remote nodeId is not its public key" # XXX: Do we care?
|
||||||
|
|
||||||
|
let port = Port(response.listenPort)
|
||||||
|
let address = Address(ip: address, tcpPort: port, udpPort: port)
|
||||||
|
result.remote = newNode(initEnode(handshake.remoteHPubkey, address))
|
||||||
|
|
||||||
|
connectionEstablished(result, response)
|
||||||
|
|
||||||
when isMainModule:
|
when isMainModule:
|
||||||
import rlp
|
import rlp
|
||||||
|
@ -133,10 +133,10 @@ proc encrypt*(c: var SecretState, header: openarray[byte],
|
|||||||
copyMem(addr output[frameMacPos], addr frameMac.data[0], RlpHeaderLength)
|
copyMem(addr output[frameMacPos], addr frameMac.data[0], RlpHeaderLength)
|
||||||
result = Success
|
result = Success
|
||||||
|
|
||||||
template encryptMsg*(msg: BytesRange, secrets: SecretState): auto =
|
proc encryptMsg*(msg: BytesRange, secrets: var SecretState): seq[byte] =
|
||||||
var header: RlpxHeader
|
var header: RlpxHeader
|
||||||
|
|
||||||
if data.len > int(maxUInt24):
|
if uint32(msg.len) > maxUInt24:
|
||||||
raise newException(OverflowError, "RLPx message size exceeds limit")
|
raise newException(OverflowError, "RLPx message size exceeds limit")
|
||||||
|
|
||||||
# write the frame size in the first 3 bytes of the header
|
# write the frame size in the first 3 bytes of the header
|
||||||
@ -146,12 +146,10 @@ template encryptMsg*(msg: BytesRange, secrets: SecretState): auto =
|
|||||||
|
|
||||||
# XXX:
|
# XXX:
|
||||||
# This would be safer if we use a thread-local sequ for the temporary buffer
|
# This would be safer if we use a thread-local sequ for the temporary buffer
|
||||||
var outCipherText = allocStackArray(byte, encryptedLength(msg.len))
|
result = newSeq[byte](encryptedLength(msg.len))
|
||||||
let s = encrypt(secrets, header, msg.toOpenArray, outCipherText.toOpenArray)
|
let s = encrypt(secrets, header, msg.toOpenArray, result)
|
||||||
assert s == Success
|
assert s == Success
|
||||||
|
|
||||||
outCipherText
|
|
||||||
|
|
||||||
proc getBodySize*(a: RlpxHeader): int =
|
proc getBodySize*(a: RlpxHeader): int =
|
||||||
(int(a[0]) shl 16) or (int(a[1]) shl 8) or int(a[2])
|
(int(a[0]) shl 16) or (int(a[1]) shl 8) or int(a[2])
|
||||||
|
|
||||||
@ -207,7 +205,7 @@ proc decryptBody*(c: var SecretState, data: openarray[byte], bodysize: int,
|
|||||||
##
|
##
|
||||||
## `data` must be at least `roundup16(bodysize) + RlpMacLength` length.
|
## `data` must be at least `roundup16(bodysize) + RlpMacLength` length.
|
||||||
## `output` must be at least `roundup16(bodysize)` length.
|
## `output` must be at least `roundup16(bodysize)` length.
|
||||||
##
|
##
|
||||||
## On success completion `outlen` will hold actual size of decrypted body.
|
## On success completion `outlen` will hold actual size of decrypted body.
|
||||||
var
|
var
|
||||||
tmpmac: keccak256
|
tmpmac: keccak256
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
import peer_pool, discovery, async, asyncnet
|
import peer_pool, discovery, enode, async, asyncnet, auth, rlpx, net
|
||||||
import eth_keys
|
import eth_keys
|
||||||
|
|
||||||
type Server* = ref object
|
type Server* = ref object
|
||||||
@ -11,7 +11,7 @@ type Server* = ref object
|
|||||||
peerPool: PeerPool
|
peerPool: PeerPool
|
||||||
|
|
||||||
proc newServer*(keyPair: KeyPair, address: Address, chainDb: AsyncChainDB,
|
proc newServer*(keyPair: KeyPair, address: Address, chainDb: AsyncChainDB,
|
||||||
bootstrapNodes: openarray[string], networkId: int): Server =
|
bootstrapNodes: openarray[ENode], networkId: int): Server =
|
||||||
result.new()
|
result.new()
|
||||||
result.chainDb = chainDb
|
result.chainDb = chainDb
|
||||||
result.keyPair = keyPair
|
result.keyPair = keyPair
|
||||||
@ -24,107 +24,29 @@ proc newServer*(keyPair: KeyPair, address: Address, chainDb: AsyncChainDB,
|
|||||||
proc isRunning(s: Server): bool {.inline.} = not s.socket.isNil
|
proc isRunning(s: Server): bool {.inline.} = not s.socket.isNil
|
||||||
|
|
||||||
proc receiveHandshake(s: Server, address: string, remote: AsyncSocket) {.async.} =
|
proc receiveHandshake(s: Server, address: string, remote: AsyncSocket) {.async.} =
|
||||||
discard # Perform hanshake
|
let p = await rlpxConnectIncoming(s.keyPair, s.address.tcpPort, parseIpAddress(address), remote)
|
||||||
discard # Create Peer
|
if not p.isNil:
|
||||||
discard # Add Peer to PeerPool
|
echo "TODO: Add peer to the pool..."
|
||||||
|
|
||||||
async def receive_handshake(
|
|
||||||
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
|
|
||||||
# Use reader to read the auth_init msg until EOF
|
|
||||||
msg = await reader.read(ENCRYPTED_AUTH_MSG_LEN)
|
|
||||||
|
|
||||||
# Use HandshakeResponder.decode_authentication(auth_init_message) on auth init msg
|
|
||||||
try:
|
|
||||||
ephem_pubkey, initiator_nonce, initiator_pubkey = decode_authentication(
|
|
||||||
msg, self.privkey)
|
|
||||||
# Try to decode as EIP8
|
|
||||||
except DecryptionError:
|
|
||||||
msg_size = big_endian_to_int(msg[:2])
|
|
||||||
remaining_bytes = msg_size - ENCRYPTED_AUTH_MSG_LEN + 2
|
|
||||||
msg += await reader.read(remaining_bytes)
|
|
||||||
ephem_pubkey, initiator_nonce, initiator_pubkey = decode_authentication(
|
|
||||||
msg, self.privkey)
|
|
||||||
|
|
||||||
# Get remote's address: IPv4 or IPv6
|
|
||||||
ip, port, *_ = writer.get_extra_info("peername")
|
|
||||||
remote_address = Address(ip, port)
|
|
||||||
|
|
||||||
# Create `HandshakeResponder(remote: kademlia.Node, privkey: datatypes.PrivateKey)` instance
|
|
||||||
initiator_remote = Node(initiator_pubkey, remote_address)
|
|
||||||
responder = HandshakeResponder(initiator_remote, self.privkey)
|
|
||||||
|
|
||||||
# Call `HandshakeResponder.create_auth_ack_message(nonce: bytes)` to create the reply
|
|
||||||
responder_nonce = secrets.token_bytes(HASH_LEN)
|
|
||||||
auth_ack_msg = responder.create_auth_ack_message(nonce=responder_nonce)
|
|
||||||
auth_ack_ciphertext = responder.encrypt_auth_ack_message(auth_ack_msg)
|
|
||||||
|
|
||||||
# Use the `writer` to send the reply to the remote
|
|
||||||
writer.write(auth_ack_ciphertext)
|
|
||||||
await writer.drain()
|
|
||||||
|
|
||||||
# Call `HandshakeResponder.derive_shared_secrets()` and use return values to create `Peer`
|
|
||||||
aes_secret, mac_secret, egress_mac, ingress_mac = responder.derive_secrets(
|
|
||||||
initiator_nonce=initiator_nonce,
|
|
||||||
responder_nonce=responder_nonce,
|
|
||||||
remote_ephemeral_pubkey=ephem_pubkey,
|
|
||||||
auth_init_ciphertext=msg,
|
|
||||||
auth_ack_ciphertext=auth_ack_ciphertext
|
|
||||||
)
|
|
||||||
|
|
||||||
# Create and register peer in peer_pool
|
|
||||||
eth_peer = ETHPeer(
|
|
||||||
remote=initiator_remote, privkey=self.privkey, reader=reader,
|
|
||||||
writer=writer, aes_secret=aes_secret, mac_secret=mac_secret,
|
|
||||||
egress_mac=egress_mac, ingress_mac=ingress_mac, chaindb=self.chaindb,
|
|
||||||
network_id=self.network_id
|
|
||||||
)
|
|
||||||
self.peer_pool.add_peer(eth_peer)
|
|
||||||
|
|
||||||
|
|
||||||
def decode_authentication(ciphertext: bytes,
|
|
||||||
privkey: datatypes.PrivateKey
|
|
||||||
) -> Tuple[datatypes.PublicKey, bytes, datatypes.PublicKey]:
|
|
||||||
"""
|
|
||||||
Decrypts and decodes the ciphertext msg.
|
|
||||||
Returns the initiator's ephemeral pubkey, nonce, and pubkey.
|
|
||||||
"""
|
|
||||||
if len(ciphertext) < ENCRYPTED_AUTH_MSG_LEN:
|
|
||||||
raise ValueError("Auth msg too short: {}".format(len(ciphertext)))
|
|
||||||
elif len(ciphertext) == ENCRYPTED_AUTH_MSG_LEN:
|
|
||||||
sig, initiator_pubkey, initiator_nonce, _ = decode_auth_plain(
|
|
||||||
ciphertext, privkey)
|
|
||||||
else:
|
else:
|
||||||
sig, initiator_pubkey, initiator_nonce, _ = decode_auth_eip8(
|
echo "Could not establish connection with incoming peer"
|
||||||
ciphertext, privkey)
|
|
||||||
|
|
||||||
# recover initiator ephemeral pubkey from sig
|
|
||||||
# S(ephemeral-privk, ecdh-shared-secret ^ nonce)
|
|
||||||
shared_secret = ecdh_agree(privkey, initiator_pubkey)
|
|
||||||
|
|
||||||
ephem_pubkey = sig.recover_public_key_from_msg_hash(
|
|
||||||
sxor(shared_secret, initiator_nonce))
|
|
||||||
|
|
||||||
return ephem_pubkey, initiator_nonce, initiator_pubkey
|
|
||||||
|
|
||||||
|
|
||||||
proc run(s: Server) {.async.} =
|
proc run(s: Server) {.async.} =
|
||||||
|
# TODO: Add error handling
|
||||||
s.socket = newAsyncSocket()
|
s.socket = newAsyncSocket()
|
||||||
s.socket.setSockOpt(OptReuseAddr, true)
|
s.socket.setSockOpt(OptReuseAddr, true)
|
||||||
s.socket.setSockOpt(OptReusePort, true)
|
|
||||||
s.socket.bindAddr(s.address.tcpPort)
|
s.socket.bindAddr(s.address.tcpPort)
|
||||||
s.socket.listen()
|
s.socket.listen()
|
||||||
|
|
||||||
while s.isRunning:
|
while s.isRunning:
|
||||||
let (address, client) = await s.socket.acceptAddr()
|
let (address, client) = await s.socket.acceptAddr()
|
||||||
asyncCheck s.receiveHandshake(address, client)
|
discard s.receiveHandshake(address, client)
|
||||||
|
|
||||||
proc start*(s: Server) =
|
proc start*(s: Server) =
|
||||||
if s.isRunning:
|
if not s.isRunning:
|
||||||
asyncCheck s.run()
|
discard s.run()
|
||||||
|
|
||||||
proc stop*(s: Server) =
|
proc stop*(s: Server) =
|
||||||
if s.isRunning:
|
if s.isRunning:
|
||||||
s.socket.close()
|
s.socket.close()
|
||||||
s.socket = nil
|
s.socket = nil
|
||||||
# s.peerPool.stop() # XXX
|
# s.peerPool.stop() # XXX
|
||||||
|
|
||||||
|
23
tests/tserver.nim
Normal file
23
tests/tserver.nim
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
import ../ethp2p/[discovery, kademlia, peer_pool, enode, server, rlpx]
|
||||||
|
import eth_keys, net, asyncdispatch, sequtils
|
||||||
|
|
||||||
|
proc localAddress(port: int): Address =
|
||||||
|
let port = Port(port)
|
||||||
|
result = Address(udpPort: port, tcpPort: port, ip: parseIpAddress("127.0.0.1"))
|
||||||
|
|
||||||
|
|
||||||
|
proc test() {.async.} =
|
||||||
|
let kp = newKeyPair()
|
||||||
|
let address = localAddress(20301)
|
||||||
|
|
||||||
|
let s = newServer(kp, address, nil, [], 1)
|
||||||
|
s.start()
|
||||||
|
|
||||||
|
await sleepAsync(500)
|
||||||
|
|
||||||
|
let n = newNode(initENode(kp.pubKey, address))
|
||||||
|
let peer = await rlpxConnect(newKeyPair(), Port(1234), n)
|
||||||
|
|
||||||
|
doAssert(not peer.isNil)
|
||||||
|
|
||||||
|
waitFor test()
|
Loading…
x
Reference in New Issue
Block a user