Server updates
This commit is contained in:
parent
b3e0df26b6
commit
5ac3163920
|
@ -9,11 +9,11 @@ skipDirs = @["tests", "Nim"]
|
||||||
|
|
||||||
requires "nim > 0.18.0",
|
requires "nim > 0.18.0",
|
||||||
"rlp >= 1.0.1",
|
"rlp >= 1.0.1",
|
||||||
"https://github.com/cheatfate/nimcrypto",
|
"nimcrypto",
|
||||||
"secp256k1 >= 0.1.0",
|
"secp256k1 >= 0.1.0",
|
||||||
"eth_keys",
|
"eth_keys",
|
||||||
"ranges",
|
"ranges",
|
||||||
"https://github.com/status-im/nim-stint",
|
"stint",
|
||||||
"https://github.com/status-im/nim-byteutils"
|
"https://github.com/status-im/nim-byteutils"
|
||||||
|
|
||||||
proc runTest(name: string, lang = "c") = exec "nim " & lang & " -r tests/" & name
|
proc runTest(name: string, lang = "c") = exec "nim " & lang & " -r tests/" & name
|
||||||
|
@ -24,3 +24,4 @@ task test, "Runs the test suite":
|
||||||
runTest "testcrypt"
|
runTest "testcrypt"
|
||||||
runTest "testenode"
|
runTest "testenode"
|
||||||
runTest "tdiscovery"
|
runTest "tdiscovery"
|
||||||
|
runTest "tserver"
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
#
|
#
|
||||||
|
|
||||||
import
|
import
|
||||||
macros, sets, algorithm, async, asyncnet, asyncfutures,
|
macros, sets, algorithm, async, asyncnet, asyncfutures, net,
|
||||||
hashes, rlp, ranges/[stackarrays, ptr_arith], eth_keys,
|
hashes, rlp, ranges/[stackarrays, ptr_arith], eth_keys,
|
||||||
ethereum_types, kademlia, discovery, auth, rlpxcrypt
|
ethereum_types, kademlia, discovery, auth, rlpxcrypt
|
||||||
|
|
||||||
|
@ -513,6 +513,7 @@ import typetraits
|
||||||
|
|
||||||
proc rlpxConnect*(myKeys: KeyPair, remote: Node): Future[Peer] {.async.} =
|
proc rlpxConnect*(myKeys: KeyPair, remote: Node): Future[Peer] {.async.} =
|
||||||
# TODO: Make sure to close the socket in case of exception
|
# TODO: Make sure to close the socket in case of exception
|
||||||
|
result.new()
|
||||||
result.socket = newAsyncSocket()
|
result.socket = newAsyncSocket()
|
||||||
await result.socket.connect($remote.node.address.ip, remote.node.address.tcpPort)
|
await result.socket.connect($remote.node.address.ip, remote.node.address.tcpPort)
|
||||||
|
|
||||||
|
@ -529,8 +530,7 @@ proc rlpxConnect*(myKeys: KeyPair, remote: Node): Future[Peer] {.async.} =
|
||||||
arr.toOpenArray(0, `arr Len` - 1)
|
arr.toOpenArray(0, `arr Len` - 1)
|
||||||
|
|
||||||
var handshake = newHandshake({Initiator})
|
var handshake = newHandshake({Initiator})
|
||||||
handshake.host.seckey = myKeys.seckey
|
handshake.host = myKeys
|
||||||
handshake.host.pubkey = myKeys.pubKey
|
|
||||||
|
|
||||||
var authMsg: array[AuthMessageMaxEIP8, byte]
|
var authMsg: array[AuthMessageMaxEIP8, byte]
|
||||||
var authMsgLen = 0
|
var authMsgLen = 0
|
||||||
|
@ -556,7 +556,10 @@ proc rlpxConnect*(myKeys: KeyPair, remote: Node): Future[Peer] {.async.} =
|
||||||
discard hello(result, baseProtocolVersion, clienId,
|
discard hello(result, baseProtocolVersion, clienId,
|
||||||
gCapabilities, listeningPort, nodeId)
|
gCapabilities, listeningPort, nodeId)
|
||||||
|
|
||||||
|
echo "wait hello from outgoing"
|
||||||
var response = await result.nextMsg(p2p.hello, discardOthers = true)
|
var response = await result.nextMsg(p2p.hello, discardOthers = true)
|
||||||
|
echo "received hello from outgoing"
|
||||||
|
|
||||||
result.dispatcher = getDispatcher(response.capabilities)
|
result.dispatcher = getDispatcher(response.capabilities)
|
||||||
result.id = response.nodeId
|
result.id = response.nodeId
|
||||||
result.connectionState = Connected
|
result.connectionState = Connected
|
||||||
|
@ -564,6 +567,59 @@ proc rlpxConnect*(myKeys: KeyPair, remote: Node): Future[Peer] {.async.} =
|
||||||
newSeq(result.protocolStates, gProtocols.len)
|
newSeq(result.protocolStates, gProtocols.len)
|
||||||
# XXX: initialize the sub-protocol states
|
# XXX: initialize the sub-protocol states
|
||||||
|
|
||||||
|
proc rlpxConnectIncoming*(myKeys: KeyPair, s: AsyncSocket): Future[Peer] {.async.} =
|
||||||
|
result.new()
|
||||||
|
result.socket = s
|
||||||
|
var handshake = newHandshake({Responder})
|
||||||
|
handshake.host = myKeys
|
||||||
|
|
||||||
|
var authMsg: array[1024, byte]
|
||||||
|
echo "Reading..."
|
||||||
|
let authMsgLen = await s.recvInto(addr authMsg[0], 307)
|
||||||
|
echo "Read: ", authMsgLen
|
||||||
|
echo "Decode: ", handshake.decodeAuthMessage(authMsg.toOpenArray(0, authMsgLen - 1))
|
||||||
|
|
||||||
|
template check(body: untyped) =
|
||||||
|
let c = body
|
||||||
|
if c != AuthStatus.Success:
|
||||||
|
raise newException(Exception, "Error: " & $c)
|
||||||
|
|
||||||
|
var ackMsg: array[AckMessageMaxEIP8, byte]
|
||||||
|
var ackMsgLen: int
|
||||||
|
check handshake.ackMessage(ackMsg, ackMsgLen)
|
||||||
|
|
||||||
|
await s.send(addr ackMsg[0], ackMsgLen)
|
||||||
|
|
||||||
|
template `^`(arr): auto =
|
||||||
|
# passes a stack array with a matching `arrLen`
|
||||||
|
# variable as an open array
|
||||||
|
arr.toOpenArray(0, `arr Len` - 1)
|
||||||
|
|
||||||
|
var secrets: ConnectionSecret
|
||||||
|
check handshake.getSecrets(^authMsg, ^ackMsg, secrets)
|
||||||
|
initSecretState(secrets, result.secretsState)
|
||||||
|
|
||||||
|
var
|
||||||
|
# XXX: TODO: get these from somewhere
|
||||||
|
nodeId: P2PNodeId
|
||||||
|
listeningPort = uint 0
|
||||||
|
|
||||||
|
discard hello(result, baseProtocolVersion, clienId,
|
||||||
|
gCapabilities, listeningPort, nodeId)
|
||||||
|
|
||||||
|
echo "wait hello from incoming"
|
||||||
|
var response = await result.nextMsg(p2p.hello, discardOthers = true)
|
||||||
|
echo "received hello from incoming"
|
||||||
|
|
||||||
|
result.dispatcher = getDispatcher(response.capabilities)
|
||||||
|
result.id = response.nodeId
|
||||||
|
|
||||||
|
echo response.nodeId
|
||||||
|
|
||||||
|
result.connectionState = Connected
|
||||||
|
result.remote = nil
|
||||||
|
newSeq(result.protocolStates, gProtocols.len)
|
||||||
|
|
||||||
when isMainModule:
|
when isMainModule:
|
||||||
import rlp
|
import rlp
|
||||||
|
|
||||||
|
|
|
@ -136,7 +136,7 @@ proc encrypt*(c: var SecretState, header: openarray[byte],
|
||||||
template encryptMsg*(msg: BytesRange, secrets: SecretState): auto =
|
template encryptMsg*(msg: BytesRange, secrets: SecretState): auto =
|
||||||
var header: RlpxHeader
|
var header: RlpxHeader
|
||||||
|
|
||||||
if data.len > int(maxUInt24):
|
if uint32(data.len) > maxUInt24:
|
||||||
raise newException(OverflowError, "RLPx message size exceeds limit")
|
raise newException(OverflowError, "RLPx message size exceeds limit")
|
||||||
|
|
||||||
# write the frame size in the first 3 bytes of the header
|
# write the frame size in the first 3 bytes of the header
|
||||||
|
@ -207,7 +207,7 @@ proc decryptBody*(c: var SecretState, data: openarray[byte], bodysize: int,
|
||||||
##
|
##
|
||||||
## `data` must be at least `roundup16(bodysize) + RlpMacLength` length.
|
## `data` must be at least `roundup16(bodysize) + RlpMacLength` length.
|
||||||
## `output` must be at least `roundup16(bodysize)` length.
|
## `output` must be at least `roundup16(bodysize)` length.
|
||||||
##
|
##
|
||||||
## On success completion `outlen` will hold actual size of decrypted body.
|
## On success completion `outlen` will hold actual size of decrypted body.
|
||||||
var
|
var
|
||||||
tmpmac: keccak256
|
tmpmac: keccak256
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
import peer_pool, discovery, async, asyncnet
|
import peer_pool, discovery, enode, async, asyncnet, auth, rlpx
|
||||||
import eth_keys
|
import eth_keys
|
||||||
|
|
||||||
type Server* = ref object
|
type Server* = ref object
|
||||||
|
@ -11,7 +11,7 @@ type Server* = ref object
|
||||||
peerPool: PeerPool
|
peerPool: PeerPool
|
||||||
|
|
||||||
proc newServer*(keyPair: KeyPair, address: Address, chainDb: AsyncChainDB,
|
proc newServer*(keyPair: KeyPair, address: Address, chainDb: AsyncChainDB,
|
||||||
bootstrapNodes: openarray[string], networkId: int): Server =
|
bootstrapNodes: openarray[ENode], networkId: int): Server =
|
||||||
result.new()
|
result.new()
|
||||||
result.chainDb = chainDb
|
result.chainDb = chainDb
|
||||||
result.keyPair = keyPair
|
result.keyPair = keyPair
|
||||||
|
@ -24,93 +24,15 @@ proc newServer*(keyPair: KeyPair, address: Address, chainDb: AsyncChainDB,
|
||||||
proc isRunning(s: Server): bool {.inline.} = not s.socket.isNil
|
proc isRunning(s: Server): bool {.inline.} = not s.socket.isNil
|
||||||
|
|
||||||
proc receiveHandshake(s: Server, address: string, remote: AsyncSocket) {.async.} =
|
proc receiveHandshake(s: Server, address: string, remote: AsyncSocket) {.async.} =
|
||||||
discard # Perform hanshake
|
let p = await rlpxConnectIncoming(s.keyPair, remote)
|
||||||
discard # Create Peer
|
if not p.isNil:
|
||||||
discard # Add Peer to PeerPool
|
echo "TODO: Add peer to the pool..."
|
||||||
|
|
||||||
async def receive_handshake(
|
|
||||||
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
|
|
||||||
# Use reader to read the auth_init msg until EOF
|
|
||||||
msg = await reader.read(ENCRYPTED_AUTH_MSG_LEN)
|
|
||||||
|
|
||||||
# Use HandshakeResponder.decode_authentication(auth_init_message) on auth init msg
|
|
||||||
try:
|
|
||||||
ephem_pubkey, initiator_nonce, initiator_pubkey = decode_authentication(
|
|
||||||
msg, self.privkey)
|
|
||||||
# Try to decode as EIP8
|
|
||||||
except DecryptionError:
|
|
||||||
msg_size = big_endian_to_int(msg[:2])
|
|
||||||
remaining_bytes = msg_size - ENCRYPTED_AUTH_MSG_LEN + 2
|
|
||||||
msg += await reader.read(remaining_bytes)
|
|
||||||
ephem_pubkey, initiator_nonce, initiator_pubkey = decode_authentication(
|
|
||||||
msg, self.privkey)
|
|
||||||
|
|
||||||
# Get remote's address: IPv4 or IPv6
|
|
||||||
ip, port, *_ = writer.get_extra_info("peername")
|
|
||||||
remote_address = Address(ip, port)
|
|
||||||
|
|
||||||
# Create `HandshakeResponder(remote: kademlia.Node, privkey: datatypes.PrivateKey)` instance
|
|
||||||
initiator_remote = Node(initiator_pubkey, remote_address)
|
|
||||||
responder = HandshakeResponder(initiator_remote, self.privkey)
|
|
||||||
|
|
||||||
# Call `HandshakeResponder.create_auth_ack_message(nonce: bytes)` to create the reply
|
|
||||||
responder_nonce = secrets.token_bytes(HASH_LEN)
|
|
||||||
auth_ack_msg = responder.create_auth_ack_message(nonce=responder_nonce)
|
|
||||||
auth_ack_ciphertext = responder.encrypt_auth_ack_message(auth_ack_msg)
|
|
||||||
|
|
||||||
# Use the `writer` to send the reply to the remote
|
|
||||||
writer.write(auth_ack_ciphertext)
|
|
||||||
await writer.drain()
|
|
||||||
|
|
||||||
# Call `HandshakeResponder.derive_shared_secrets()` and use return values to create `Peer`
|
|
||||||
aes_secret, mac_secret, egress_mac, ingress_mac = responder.derive_secrets(
|
|
||||||
initiator_nonce=initiator_nonce,
|
|
||||||
responder_nonce=responder_nonce,
|
|
||||||
remote_ephemeral_pubkey=ephem_pubkey,
|
|
||||||
auth_init_ciphertext=msg,
|
|
||||||
auth_ack_ciphertext=auth_ack_ciphertext
|
|
||||||
)
|
|
||||||
|
|
||||||
# Create and register peer in peer_pool
|
|
||||||
eth_peer = ETHPeer(
|
|
||||||
remote=initiator_remote, privkey=self.privkey, reader=reader,
|
|
||||||
writer=writer, aes_secret=aes_secret, mac_secret=mac_secret,
|
|
||||||
egress_mac=egress_mac, ingress_mac=ingress_mac, chaindb=self.chaindb,
|
|
||||||
network_id=self.network_id
|
|
||||||
)
|
|
||||||
self.peer_pool.add_peer(eth_peer)
|
|
||||||
|
|
||||||
|
|
||||||
def decode_authentication(ciphertext: bytes,
|
|
||||||
privkey: datatypes.PrivateKey
|
|
||||||
) -> Tuple[datatypes.PublicKey, bytes, datatypes.PublicKey]:
|
|
||||||
"""
|
|
||||||
Decrypts and decodes the ciphertext msg.
|
|
||||||
Returns the initiator's ephemeral pubkey, nonce, and pubkey.
|
|
||||||
"""
|
|
||||||
if len(ciphertext) < ENCRYPTED_AUTH_MSG_LEN:
|
|
||||||
raise ValueError("Auth msg too short: {}".format(len(ciphertext)))
|
|
||||||
elif len(ciphertext) == ENCRYPTED_AUTH_MSG_LEN:
|
|
||||||
sig, initiator_pubkey, initiator_nonce, _ = decode_auth_plain(
|
|
||||||
ciphertext, privkey)
|
|
||||||
else:
|
else:
|
||||||
sig, initiator_pubkey, initiator_nonce, _ = decode_auth_eip8(
|
echo "Could not establish connection with incoming peer"
|
||||||
ciphertext, privkey)
|
|
||||||
|
|
||||||
# recover initiator ephemeral pubkey from sig
|
|
||||||
# S(ephemeral-privk, ecdh-shared-secret ^ nonce)
|
|
||||||
shared_secret = ecdh_agree(privkey, initiator_pubkey)
|
|
||||||
|
|
||||||
ephem_pubkey = sig.recover_public_key_from_msg_hash(
|
|
||||||
sxor(shared_secret, initiator_nonce))
|
|
||||||
|
|
||||||
return ephem_pubkey, initiator_nonce, initiator_pubkey
|
|
||||||
|
|
||||||
|
|
||||||
proc run(s: Server) {.async.} =
|
proc run(s: Server) {.async.} =
|
||||||
s.socket = newAsyncSocket()
|
s.socket = newAsyncSocket()
|
||||||
s.socket.setSockOpt(OptReuseAddr, true)
|
s.socket.setSockOpt(OptReuseAddr, true)
|
||||||
s.socket.setSockOpt(OptReusePort, true)
|
|
||||||
s.socket.bindAddr(s.address.tcpPort)
|
s.socket.bindAddr(s.address.tcpPort)
|
||||||
s.socket.listen()
|
s.socket.listen()
|
||||||
|
|
||||||
|
@ -119,7 +41,7 @@ proc run(s: Server) {.async.} =
|
||||||
asyncCheck s.receiveHandshake(address, client)
|
asyncCheck s.receiveHandshake(address, client)
|
||||||
|
|
||||||
proc start*(s: Server) =
|
proc start*(s: Server) =
|
||||||
if s.isRunning:
|
if not s.isRunning:
|
||||||
asyncCheck s.run()
|
asyncCheck s.run()
|
||||||
|
|
||||||
proc stop*(s: Server) =
|
proc stop*(s: Server) =
|
||||||
|
@ -127,4 +49,3 @@ proc stop*(s: Server) =
|
||||||
s.socket.close()
|
s.socket.close()
|
||||||
s.socket = nil
|
s.socket = nil
|
||||||
# s.peerPool.stop() # XXX
|
# s.peerPool.stop() # XXX
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
import ../ethp2p/[discovery, kademlia, peer_pool, enode, server, rlpx]
|
||||||
|
import eth_keys, net, asyncdispatch, sequtils
|
||||||
|
|
||||||
|
proc localAddress(port: int): Address =
|
||||||
|
let port = Port(port)
|
||||||
|
result = Address(udpPort: port, tcpPort: port, ip: parseIpAddress("127.0.0.1"))
|
||||||
|
|
||||||
|
|
||||||
|
proc test() {.async.} =
|
||||||
|
let kp = newKeyPair()
|
||||||
|
let address = localAddress(20301)
|
||||||
|
|
||||||
|
let s = newServer(kp, address, nil, [], 1)
|
||||||
|
s.start()
|
||||||
|
|
||||||
|
await sleepAsync(500)
|
||||||
|
|
||||||
|
let n = newNode(initENode(kp.pubKey, address))
|
||||||
|
let peer = await rlpxConnect(newKeyPair(), n)
|
||||||
|
|
||||||
|
doAssert(not peer.isNil)
|
||||||
|
|
||||||
|
waitFor test()
|
Loading…
Reference in New Issue