mirror of
https://github.com/codex-storage/nim-codex-dht.git
synced 2025-01-11 04:16:18 +00:00
splitting protocol in lower and upper half
Protocol was actually made of two sub-protocols. * a lower-half handling authentication, encryption, key exchange, and request/response. This is now called Transport. * an upper-half handling DHT messages. This is still called Protocol. Separation of these two reduces dependencies and simplifies modifications to the protocol. Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
parent
a33da803a5
commit
fd350b4fe5
@ -78,7 +78,7 @@ import
|
|||||||
stew/shims/net as stewNet, json_serialization/std/net,
|
stew/shims/net as stewNet, json_serialization/std/net,
|
||||||
stew/[endians2, results], chronicles, chronos, stint, bearssl, metrics,
|
stew/[endians2, results], chronicles, chronos, stint, bearssl, metrics,
|
||||||
".."/../[rlp, keys, async_utils],
|
".."/../[rlp, keys, async_utils],
|
||||||
"."/[messages, messages_encoding, encoding, node, routing_table, enr, random2, sessions, ip_vote, nodes_verification]
|
"."/[transport, messages, messages_encoding, encoding, node, routing_table, enr, random2, ip_vote, nodes_verification]
|
||||||
|
|
||||||
import nimcrypto except toHex
|
import nimcrypto except toHex
|
||||||
|
|
||||||
@ -110,8 +110,6 @@ const
|
|||||||
ipMajorityInterval = 5.minutes ## Interval for checking the latest IP:Port
|
ipMajorityInterval = 5.minutes ## Interval for checking the latest IP:Port
|
||||||
## majority and updating this when ENR auto update is set.
|
## majority and updating this when ENR auto update is set.
|
||||||
initialLookups = 1 ## Amount of lookups done when populating the routing table
|
initialLookups = 1 ## Amount of lookups done when populating the routing table
|
||||||
handshakeTimeout* = 2.seconds ## timeout for the reply on the
|
|
||||||
## whoareyou message
|
|
||||||
responseTimeout* = 4.seconds ## timeout for the response of a request-response
|
responseTimeout* = 4.seconds ## timeout for the response of a request-response
|
||||||
## call
|
## call
|
||||||
|
|
||||||
@ -121,13 +119,10 @@ type
|
|||||||
bitsPerHop*: int
|
bitsPerHop*: int
|
||||||
|
|
||||||
Protocol* = ref object
|
Protocol* = ref object
|
||||||
transp: DatagramTransport
|
|
||||||
localNode*: Node
|
localNode*: Node
|
||||||
privateKey: PrivateKey
|
privateKey: PrivateKey
|
||||||
bindAddress: Address ## UDP binding address
|
transport*: Transport[Protocol] # exported for tests
|
||||||
pendingRequests: Table[AESGCMNonce, PendingRequest]
|
|
||||||
routingTable*: RoutingTable
|
routingTable*: RoutingTable
|
||||||
codec*: Codec
|
|
||||||
awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]]
|
awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]]
|
||||||
refreshLoop: Future[void]
|
refreshLoop: Future[void]
|
||||||
revalidateLoop: Future[void]
|
revalidateLoop: Future[void]
|
||||||
@ -140,10 +135,6 @@ type
|
|||||||
# overkill here, use sequence
|
# overkill here, use sequence
|
||||||
rng*: ref BrHmacDrbgContext
|
rng*: ref BrHmacDrbgContext
|
||||||
|
|
||||||
PendingRequest = object
|
|
||||||
node: Node
|
|
||||||
message: seq[byte]
|
|
||||||
|
|
||||||
TalkProtocolHandler* = proc(p: TalkProtocol, request: seq[byte], fromId: NodeId, fromUdpAddress: Address): seq[byte]
|
TalkProtocolHandler* = proc(p: TalkProtocol, request: seq[byte], fromId: NodeId, fromUdpAddress: Address): seq[byte]
|
||||||
{.gcsafe, raises: [Defect].}
|
{.gcsafe, raises: [Defect].}
|
||||||
|
|
||||||
@ -233,37 +224,13 @@ proc updateRecord*(
|
|||||||
# TODO: Would it make sense to actively ping ("broadcast") to all the peers
|
# TODO: Would it make sense to actively ping ("broadcast") to all the peers
|
||||||
# we stored a handshake with in order to get that ENR updated?
|
# we stored a handshake with in order to get that ENR updated?
|
||||||
|
|
||||||
proc send*(d: Protocol, a: Address, data: seq[byte]) =
|
|
||||||
let ta = initTAddress(a.ip, a.port)
|
|
||||||
let f = d.transp.sendTo(ta, data)
|
|
||||||
f.callback = proc(data: pointer) {.gcsafe.} =
|
|
||||||
if f.failed:
|
|
||||||
# Could be `TransportUseClosedError` in case the transport is already
|
|
||||||
# closed, or could be `TransportOsError` in case of a socket error.
|
|
||||||
# In the latter case this would probably mostly occur if the network
|
|
||||||
# interface underneath gets disconnected or similar.
|
|
||||||
# TODO: Should this kind of error be propagated upwards? Probably, but
|
|
||||||
# it should not stop the process as that would reset the discovery
|
|
||||||
# progress in case there is even a small window of no connection.
|
|
||||||
# One case that needs this error available upwards is when revalidating
|
|
||||||
# nodes. Else the revalidation might end up clearing the routing tabl
|
|
||||||
# because of ping failures due to own network connection failure.
|
|
||||||
warn "Discovery send failed", msg = f.readError.msg
|
|
||||||
|
|
||||||
proc send(d: Protocol, n: Node, data: seq[byte]) =
|
|
||||||
doAssert(n.address.isSome())
|
|
||||||
d.send(n.address.get(), data)
|
|
||||||
|
|
||||||
proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address, reqId: RequestId,
|
proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address, reqId: RequestId,
|
||||||
nodes: openArray[Node]) =
|
nodes: openArray[Node]) =
|
||||||
proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address,
|
proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address,
|
||||||
message: NodesMessage, reqId: RequestId) {.nimcall.} =
|
message: NodesMessage, reqId: RequestId) {.nimcall.} =
|
||||||
let (data, _) = encodeMessagePacket(d.rng[], d.codec, toId, toAddr,
|
|
||||||
encodeMessage(message, reqId))
|
|
||||||
|
|
||||||
trace "Respond message packet", dstId = toId, address = toAddr,
|
trace "Respond message packet", dstId = toId, address = toAddr,
|
||||||
kind = MessageKind.nodes
|
kind = MessageKind.nodes
|
||||||
d.send(toAddr, data)
|
d.transport.send(toId, toAddr, encodeMessage(message, reqId))
|
||||||
|
|
||||||
if nodes.len == 0:
|
if nodes.len == 0:
|
||||||
# In case of 0 nodes, a reply is still needed
|
# In case of 0 nodes, a reply is still needed
|
||||||
@ -289,13 +256,9 @@ proc handlePing(d: Protocol, fromId: NodeId, fromAddr: Address,
|
|||||||
ping: PingMessage, reqId: RequestId) =
|
ping: PingMessage, reqId: RequestId) =
|
||||||
let pong = PongMessage(enrSeq: d.localNode.record.seqNum, ip: fromAddr.ip,
|
let pong = PongMessage(enrSeq: d.localNode.record.seqNum, ip: fromAddr.ip,
|
||||||
port: fromAddr.port.uint16)
|
port: fromAddr.port.uint16)
|
||||||
|
|
||||||
let (data, _) = encodeMessagePacket(d.rng[], d.codec, fromId, fromAddr,
|
|
||||||
encodeMessage(pong, reqId))
|
|
||||||
|
|
||||||
trace "Respond message packet", dstId = fromId, address = fromAddr,
|
trace "Respond message packet", dstId = fromId, address = fromAddr,
|
||||||
kind = MessageKind.pong
|
kind = MessageKind.pong
|
||||||
d.send(fromAddr, data)
|
d.transport.send(fromId, fromAddr, encodeMessage(pong, reqId))
|
||||||
|
|
||||||
proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address,
|
proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address,
|
||||||
fn: FindNodeMessage, reqId: RequestId) =
|
fn: FindNodeMessage, reqId: RequestId) =
|
||||||
@ -328,12 +291,10 @@ proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address,
|
|||||||
else:
|
else:
|
||||||
TalkRespMessage(response: talkProtocol.protocolHandler(talkProtocol,
|
TalkRespMessage(response: talkProtocol.protocolHandler(talkProtocol,
|
||||||
talkreq.request, fromId, fromAddr))
|
talkreq.request, fromId, fromAddr))
|
||||||
let (data, _) = encodeMessagePacket(d.rng[], d.codec, fromId, fromAddr,
|
|
||||||
encodeMessage(talkresp, reqId))
|
|
||||||
|
|
||||||
trace "Respond message packet", dstId = fromId, address = fromAddr,
|
trace "Respond message packet", dstId = fromId, address = fromAddr,
|
||||||
kind = MessageKind.talkresp
|
kind = MessageKind.talkresp
|
||||||
d.send(fromAddr, data)
|
d.transport.send(fromId, fromAddr, encodeMessage(talkresp, reqId))
|
||||||
|
|
||||||
proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
|
proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
|
||||||
message: Message) =
|
message: Message) =
|
||||||
@ -369,97 +330,6 @@ proc registerTalkProtocol*(d: Protocol, protocolId: seq[byte],
|
|||||||
else:
|
else:
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
proc sendWhoareyou(d: Protocol, toId: NodeId, a: Address,
|
|
||||||
requestNonce: AESGCMNonce, node: Option[Node]) =
|
|
||||||
let key = HandshakeKey(nodeId: toId, address: a)
|
|
||||||
if not d.codec.hasHandshake(key):
|
|
||||||
let
|
|
||||||
recordSeq = if node.isSome(): node.get().record.seqNum
|
|
||||||
else: 0
|
|
||||||
pubkey = if node.isSome(): some(node.get().pubkey)
|
|
||||||
else: none(PublicKey)
|
|
||||||
|
|
||||||
let data = encodeWhoareyouPacket(d.rng[], d.codec, toId, a, requestNonce,
|
|
||||||
recordSeq, pubkey)
|
|
||||||
sleepAsync(handshakeTimeout).addCallback() do(data: pointer):
|
|
||||||
# TODO: should we still provide cancellation in case handshake completes
|
|
||||||
# correctly?
|
|
||||||
d.codec.handshakes.del(key)
|
|
||||||
|
|
||||||
trace "Send whoareyou", dstId = toId, address = a
|
|
||||||
d.send(a, data)
|
|
||||||
else:
|
|
||||||
debug "Node with this id already has ongoing handshake, ignoring packet"
|
|
||||||
|
|
||||||
proc receive*(d: Protocol, a: Address, packet: openArray[byte]) =
|
|
||||||
let decoded = d.codec.decodePacket(a, packet)
|
|
||||||
if decoded.isOk:
|
|
||||||
let packet = decoded[]
|
|
||||||
case packet.flag
|
|
||||||
of OrdinaryMessage:
|
|
||||||
if packet.messageOpt.isSome():
|
|
||||||
let message = packet.messageOpt.get()
|
|
||||||
trace "Received message packet", srcId = packet.srcId, address = a,
|
|
||||||
kind = message.kind
|
|
||||||
d.handleMessage(packet.srcId, a, message)
|
|
||||||
else:
|
|
||||||
trace "Not decryptable message packet received",
|
|
||||||
srcId = packet.srcId, address = a
|
|
||||||
d.sendWhoareyou(packet.srcId, a, packet.requestNonce,
|
|
||||||
d.getNode(packet.srcId))
|
|
||||||
|
|
||||||
of Flag.Whoareyou:
|
|
||||||
trace "Received whoareyou packet", address = a
|
|
||||||
var pr: PendingRequest
|
|
||||||
if d.pendingRequests.take(packet.whoareyou.requestNonce, pr):
|
|
||||||
let toNode = pr.node
|
|
||||||
# This is a node we previously contacted and thus must have an address.
|
|
||||||
doAssert(toNode.address.isSome())
|
|
||||||
let address = toNode.address.get()
|
|
||||||
let data = encodeHandshakePacket(d.rng[], d.codec, toNode.id,
|
|
||||||
address, pr.message, packet.whoareyou, toNode.pubkey)
|
|
||||||
|
|
||||||
trace "Send handshake message packet", dstId = toNode.id, address
|
|
||||||
d.send(toNode, data)
|
|
||||||
else:
|
|
||||||
debug "Timed out or unrequested whoareyou packet", address = a
|
|
||||||
of HandshakeMessage:
|
|
||||||
trace "Received handshake message packet", srcId = packet.srcIdHs,
|
|
||||||
address = a, kind = packet.message.kind
|
|
||||||
d.handleMessage(packet.srcIdHs, a, packet.message)
|
|
||||||
# For a handshake message it is possible that we received an newer ENR.
|
|
||||||
# In that case we can add/update it to the routing table.
|
|
||||||
if packet.node.isSome():
|
|
||||||
let node = packet.node.get()
|
|
||||||
# Lets not add nodes without correct IP in the ENR to the routing table.
|
|
||||||
# The ENR could contain bogus IPs and although they would get removed
|
|
||||||
# on the next revalidation, one could spam these as the handshake
|
|
||||||
# message occurs on (first) incoming messages.
|
|
||||||
if node.address.isSome() and a == node.address.get():
|
|
||||||
if d.addNode(node):
|
|
||||||
trace "Added new node to routing table after handshake", node
|
|
||||||
else:
|
|
||||||
trace "Packet decoding error", error = decoded.error, address = a
|
|
||||||
|
|
||||||
proc processClient(transp: DatagramTransport, raddr: TransportAddress):
|
|
||||||
Future[void] {.async.} =
|
|
||||||
let proto = getUserData[Protocol](transp)
|
|
||||||
|
|
||||||
# TODO: should we use `peekMessage()` to avoid allocation?
|
|
||||||
let buf = try: transp.getMessage()
|
|
||||||
except TransportOsError as e:
|
|
||||||
# This is likely to be local network connection issues.
|
|
||||||
warn "Transport getMessage", exception = e.name, msg = e.msg
|
|
||||||
return
|
|
||||||
|
|
||||||
let ip = try: raddr.address()
|
|
||||||
except ValueError as e:
|
|
||||||
error "Not a valid IpAddress", exception = e.name, msg = e.msg
|
|
||||||
return
|
|
||||||
let a = Address(ip: ValidIpAddress.init(ip), port: raddr.port)
|
|
||||||
|
|
||||||
proto.receive(a, buf)
|
|
||||||
|
|
||||||
proc replaceNode(d: Protocol, n: Node) =
|
proc replaceNode(d: Protocol, n: Node) =
|
||||||
if n.record notin d.bootstrapRecords:
|
if n.record notin d.bootstrapRecords:
|
||||||
d.routingTable.replaceNode(n)
|
d.routingTable.replaceNode(n)
|
||||||
@ -469,15 +339,6 @@ proc replaceNode(d: Protocol, n: Node) =
|
|||||||
# peers in the routing table.
|
# peers in the routing table.
|
||||||
debug "Message request to bootstrap node failed", enr = toURI(n.record)
|
debug "Message request to bootstrap node failed", enr = toURI(n.record)
|
||||||
|
|
||||||
# TODO: This could be improved to do the clean-up immediatily in case a non
|
|
||||||
# whoareyou response does arrive, but we would need to store the AuthTag
|
|
||||||
# somewhere
|
|
||||||
proc registerRequest(d: Protocol, n: Node, message: seq[byte],
|
|
||||||
nonce: AESGCMNonce) =
|
|
||||||
let request = PendingRequest(node: n, message: message)
|
|
||||||
if not d.pendingRequests.hasKeyOrPut(nonce, request):
|
|
||||||
sleepAsync(responseTimeout).addCallback() do(data: pointer):
|
|
||||||
d.pendingRequests.del(nonce)
|
|
||||||
|
|
||||||
proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId):
|
proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId):
|
||||||
Future[Option[Message]] =
|
Future[Option[Message]] =
|
||||||
@ -526,13 +387,10 @@ proc sendMessage*[T: SomeMessage](d: Protocol, toNode: Node, m: T):
|
|||||||
reqId = RequestId.init(d.rng[])
|
reqId = RequestId.init(d.rng[])
|
||||||
message = encodeMessage(m, reqId)
|
message = encodeMessage(m, reqId)
|
||||||
|
|
||||||
let (data, nonce) = encodeMessagePacket(d.rng[], d.codec, toNode.id,
|
|
||||||
address, message)
|
|
||||||
|
|
||||||
d.registerRequest(toNode, message, nonce)
|
|
||||||
trace "Send message packet", dstId = toNode.id, address, kind = messageKind(T)
|
trace "Send message packet", dstId = toNode.id, address, kind = messageKind(T)
|
||||||
d.send(toNode, data)
|
|
||||||
discovery_message_requests_outgoing.inc()
|
discovery_message_requests_outgoing.inc()
|
||||||
|
|
||||||
|
d.transport.sendMessage(toNode, address, message)
|
||||||
return reqId
|
return reqId
|
||||||
|
|
||||||
proc ping*(d: Protocol, toNode: Node):
|
proc ping*(d: Protocol, toNode: Node):
|
||||||
@ -947,12 +805,9 @@ proc newProtocol*(
|
|||||||
# TODO Consider whether this should be a Defect
|
# TODO Consider whether this should be a Defect
|
||||||
doAssert rng != nil, "RNG initialization failed"
|
doAssert rng != nil, "RNG initialization failed"
|
||||||
|
|
||||||
Protocol(
|
result = Protocol(
|
||||||
privateKey: privKey,
|
privateKey: privKey,
|
||||||
localNode: node,
|
localNode: node,
|
||||||
bindAddress: Address(ip: ValidIpAddress.init(bindIp), port: bindPort),
|
|
||||||
codec: Codec(localNode: node, privKey: privKey,
|
|
||||||
sessions: Sessions.init(256)),
|
|
||||||
bootstrapRecords: @bootstrapRecords,
|
bootstrapRecords: @bootstrapRecords,
|
||||||
ipVote: IpVote.init(),
|
ipVote: IpVote.init(),
|
||||||
enrAutoUpdate: enrAutoUpdate,
|
enrAutoUpdate: enrAutoUpdate,
|
||||||
@ -960,16 +815,13 @@ proc newProtocol*(
|
|||||||
node, config.bitsPerHop, config.tableIpLimits, rng),
|
node, config.bitsPerHop, config.tableIpLimits, rng),
|
||||||
rng: rng)
|
rng: rng)
|
||||||
|
|
||||||
template listeningAddress*(p: Protocol): Address =
|
result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng)
|
||||||
p.bindAddress
|
|
||||||
|
|
||||||
proc open*(d: Protocol) {.raises: [Defect, CatchableError].} =
|
proc open*(d: Protocol) {.raises: [Defect, CatchableError].} =
|
||||||
info "Starting discovery node", node = d.localNode,
|
info "Starting discovery node", node = d.localNode
|
||||||
bindAddress = d.bindAddress
|
|
||||||
|
|
||||||
# TODO allow binding to specific IP / IPv6 / etc
|
d.transport.open()
|
||||||
let ta = initTAddress(d.bindAddress.ip, d.bindAddress.port)
|
|
||||||
d.transp = newDatagramTransport(processClient, udata = d, local = ta)
|
|
||||||
|
|
||||||
d.seedTable()
|
d.seedTable()
|
||||||
|
|
||||||
@ -979,7 +831,7 @@ proc start*(d: Protocol) =
|
|||||||
d.ipMajorityLoop = ipMajorityLoop(d)
|
d.ipMajorityLoop = ipMajorityLoop(d)
|
||||||
|
|
||||||
proc close*(d: Protocol) =
|
proc close*(d: Protocol) =
|
||||||
doAssert(not d.transp.closed)
|
doAssert(not d.transport.closed)
|
||||||
|
|
||||||
debug "Closing discovery node", node = d.localNode
|
debug "Closing discovery node", node = d.localNode
|
||||||
if not d.revalidateLoop.isNil:
|
if not d.revalidateLoop.isNil:
|
||||||
@ -989,10 +841,10 @@ proc close*(d: Protocol) =
|
|||||||
if not d.ipMajorityLoop.isNil:
|
if not d.ipMajorityLoop.isNil:
|
||||||
d.ipMajorityLoop.cancel()
|
d.ipMajorityLoop.cancel()
|
||||||
|
|
||||||
d.transp.close()
|
d.transport.close()
|
||||||
|
|
||||||
proc closeWait*(d: Protocol) {.async.} =
|
proc closeWait*(d: Protocol) {.async.} =
|
||||||
doAssert(not d.transp.closed)
|
doAssert(not d.transport.closed)
|
||||||
|
|
||||||
debug "Closing discovery node", node = d.localNode
|
debug "Closing discovery node", node = d.localNode
|
||||||
if not d.revalidateLoop.isNil:
|
if not d.revalidateLoop.isNil:
|
||||||
@ -1002,4 +854,4 @@ proc closeWait*(d: Protocol) {.async.} =
|
|||||||
if not d.ipMajorityLoop.isNil:
|
if not d.ipMajorityLoop.isNil:
|
||||||
await d.ipMajorityLoop.cancelAndWait()
|
await d.ipMajorityLoop.cancelAndWait()
|
||||||
|
|
||||||
await d.transp.closeWait()
|
await d.transport.closeWait()
|
||||||
|
202
eth/p2p/discoveryv5/transport.nim
Normal file
202
eth/p2p/discoveryv5/transport.nim
Normal file
@ -0,0 +1,202 @@
|
|||||||
|
# Copyright (c) 2020-2021 Status Research & Development GmbH
|
||||||
|
# Licensed and distributed under either of
|
||||||
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
|
# Everything below the handling of ordinary messages
|
||||||
|
import
|
||||||
|
std/[tables, options],
|
||||||
|
chronos,
|
||||||
|
chronicles,
|
||||||
|
stew/shims/net,
|
||||||
|
"."/[node, encoding, sessions]
|
||||||
|
|
||||||
|
const
|
||||||
|
handshakeTimeout* = 2.seconds ## timeout for the reply on the
|
||||||
|
## whoareyou message
|
||||||
|
responseTimeout* = 4.seconds ## timeout for the response of a request-response
|
||||||
|
## call
|
||||||
|
|
||||||
|
type
|
||||||
|
Transport* [Client] = ref object
|
||||||
|
client: Client
|
||||||
|
bindAddress: Address ## UDP binding address
|
||||||
|
transp: DatagramTransport
|
||||||
|
pendingRequests: Table[AESGCMNonce, PendingRequest]
|
||||||
|
codec*: Codec
|
||||||
|
rng: ref BrHmacDrbgContext
|
||||||
|
|
||||||
|
PendingRequest = object
|
||||||
|
node: Node
|
||||||
|
message: seq[byte]
|
||||||
|
|
||||||
|
proc send*(t: Transport, a: Address, data: seq[byte]) =
|
||||||
|
let ta = initTAddress(a.ip, a.port)
|
||||||
|
let f = t.transp.sendTo(ta, data)
|
||||||
|
f.callback = proc(data: pointer) {.gcsafe.} =
|
||||||
|
if f.failed:
|
||||||
|
# Could be `TransportUseClosedError` in case the transport is already
|
||||||
|
# closed, or could be `TransportOsError` in case of a socket error.
|
||||||
|
# In the latter case this would probably mostly occur if the network
|
||||||
|
# interface underneath gets disconnected or similar.
|
||||||
|
# TODO: Should this kind of error be propagated upwards? Probably, but
|
||||||
|
# it should not stop the process as that would reset the discovery
|
||||||
|
# progress in case there is even a small window of no connection.
|
||||||
|
# One case that needs this error available upwards is when revalidating
|
||||||
|
# nodes. Else the revalidation might end up clearing the routing tabl
|
||||||
|
# because of ping failures due to own network connection failure.
|
||||||
|
warn "Discovery send failed", msg = f.readError.msg
|
||||||
|
|
||||||
|
proc send(t: Transport, n: Node, data: seq[byte]) =
|
||||||
|
doAssert(n.address.isSome())
|
||||||
|
t.send(n.address.get(), data)
|
||||||
|
|
||||||
|
proc send*(t: Transport, toId: NodeId, toAddr: Address, message: seq[byte]) =
|
||||||
|
let (data, _) = encodeMessagePacket(t.rng[], t.codec, toId, toAddr,
|
||||||
|
message)
|
||||||
|
t.send(toAddr, data)
|
||||||
|
|
||||||
|
# TODO: This could be improved to do the clean-up immediatily in case a non
|
||||||
|
# whoareyou response does arrive, but we would need to store the AuthTag
|
||||||
|
# somewhere
|
||||||
|
proc registerRequest(t: Transport, n: Node, message: seq[byte],
|
||||||
|
nonce: AESGCMNonce) =
|
||||||
|
let request = PendingRequest(node: n, message: message)
|
||||||
|
if not t.pendingRequests.hasKeyOrPut(nonce, request):
|
||||||
|
sleepAsync(responseTimeout).addCallback() do(data: pointer):
|
||||||
|
t.pendingRequests.del(nonce)
|
||||||
|
|
||||||
|
##Todo: remove dependence on message. This should be higher
|
||||||
|
proc sendMessage*(t: Transport, toNode: Node, address: Address, message: seq[byte]) =
|
||||||
|
let (data, nonce) = encodeMessagePacket(t.rng[], t.codec, toNode.id,
|
||||||
|
address, message)
|
||||||
|
|
||||||
|
t.registerRequest(toNode, message, nonce)
|
||||||
|
t.send(toNode, data)
|
||||||
|
|
||||||
|
proc sendWhoareyou(t: Transport, toId: NodeId, a: Address,
|
||||||
|
requestNonce: AESGCMNonce, node: Option[Node]) =
|
||||||
|
let key = HandshakeKey(nodeId: toId, address: a)
|
||||||
|
if not t.codec.hasHandshake(key):
|
||||||
|
let
|
||||||
|
recordSeq = if node.isSome(): node.get().record.seqNum
|
||||||
|
else: 0
|
||||||
|
pubkey = if node.isSome(): some(node.get().pubkey)
|
||||||
|
else: none(PublicKey)
|
||||||
|
|
||||||
|
let data = encodeWhoareyouPacket(t.rng[], t.codec, toId, a, requestNonce,
|
||||||
|
recordSeq, pubkey)
|
||||||
|
sleepAsync(handshakeTimeout).addCallback() do(data: pointer):
|
||||||
|
# TODO: should we still provide cancellation in case handshake completes
|
||||||
|
# correctly?
|
||||||
|
t.codec.handshakes.del(key)
|
||||||
|
|
||||||
|
trace "Send whoareyou", dstId = toId, address = a
|
||||||
|
t.send(a, data)
|
||||||
|
else:
|
||||||
|
debug "Node with this id already has ongoing handshake, ignoring packet"
|
||||||
|
|
||||||
|
proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
|
||||||
|
let decoded = t.codec.decodePacket(a, packet)
|
||||||
|
if decoded.isOk:
|
||||||
|
let packet = decoded[]
|
||||||
|
case packet.flag
|
||||||
|
of OrdinaryMessage:
|
||||||
|
if packet.messageOpt.isSome():
|
||||||
|
let message = packet.messageOpt.get()
|
||||||
|
trace "Received message packet", srcId = packet.srcId, address = a,
|
||||||
|
kind = message.kind
|
||||||
|
t.client.handleMessage(packet.srcId, a, message)
|
||||||
|
else:
|
||||||
|
trace "Not decryptable message packet received",
|
||||||
|
srcId = packet.srcId, address = a
|
||||||
|
t.sendWhoareyou(packet.srcId, a, packet.requestNonce,
|
||||||
|
t.client.getNode(packet.srcId))
|
||||||
|
|
||||||
|
of Flag.Whoareyou:
|
||||||
|
trace "Received whoareyou packet", address = a
|
||||||
|
var pr: PendingRequest
|
||||||
|
if t.pendingRequests.take(packet.whoareyou.requestNonce, pr):
|
||||||
|
let toNode = pr.node
|
||||||
|
# This is a node we previously contacted and thus must have an address.
|
||||||
|
doAssert(toNode.address.isSome())
|
||||||
|
let address = toNode.address.get()
|
||||||
|
let data = encodeHandshakePacket(t.rng[], t.codec, toNode.id,
|
||||||
|
address, pr.message, packet.whoareyou, toNode.pubkey)
|
||||||
|
|
||||||
|
trace "Send handshake message packet", dstId = toNode.id, address
|
||||||
|
t.send(toNode, data)
|
||||||
|
else:
|
||||||
|
debug "Timed out or unrequested whoareyou packet", address = a
|
||||||
|
of HandshakeMessage:
|
||||||
|
trace "Received handshake message packet", srcId = packet.srcIdHs,
|
||||||
|
address = a, kind = packet.message.kind
|
||||||
|
t.client.handleMessage(packet.srcIdHs, a, packet.message)
|
||||||
|
# For a handshake message it is possible that we received an newer ENR.
|
||||||
|
# In that case we can add/update it to the routing table.
|
||||||
|
if packet.node.isSome():
|
||||||
|
let node = packet.node.get()
|
||||||
|
# Lets not add nodes without correct IP in the ENR to the routing table.
|
||||||
|
# The ENR could contain bogus IPs and although they would get removed
|
||||||
|
# on the next revalidation, one could spam these as the handshake
|
||||||
|
# message occurs on (first) incoming messages.
|
||||||
|
if node.address.isSome() and a == node.address.get():
|
||||||
|
if t.client.addNode(node):
|
||||||
|
trace "Added new node to routing table after handshake", node
|
||||||
|
else:
|
||||||
|
trace "Packet decoding error", error = decoded.error, address = a
|
||||||
|
|
||||||
|
proc processClient[T](transp: DatagramTransport, raddr: TransportAddress):
|
||||||
|
Future[void] {.async.} =
|
||||||
|
let t = getUserData[Transport[T]](transp)
|
||||||
|
|
||||||
|
# TODO: should we use `peekMessage()` to avoid allocation?
|
||||||
|
let buf = try: transp.getMessage()
|
||||||
|
except TransportOsError as e:
|
||||||
|
# This is likely to be local network connection issues.
|
||||||
|
warn "Transport getMessage", exception = e.name, msg = e.msg
|
||||||
|
return
|
||||||
|
|
||||||
|
let ip = try: raddr.address()
|
||||||
|
except ValueError as e:
|
||||||
|
error "Not a valid IpAddress", exception = e.name, msg = e.msg
|
||||||
|
return
|
||||||
|
let a = Address(ip: ValidIpAddress.init(ip), port: raddr.port)
|
||||||
|
|
||||||
|
t.receive(a, buf)
|
||||||
|
|
||||||
|
proc open*[T](t: Transport[T]) {.raises: [Defect, CatchableError].} =
|
||||||
|
info "Starting transport", bindAddress = t.bindAddress
|
||||||
|
|
||||||
|
# TODO allow binding to specific IP / IPv6 / etc
|
||||||
|
let ta = initTAddress(t.bindAddress.ip, t.bindAddress.port)
|
||||||
|
t.transp = newDatagramTransport(processClient[T], udata = t, local = ta)
|
||||||
|
|
||||||
|
proc close*(t: Transport) =
|
||||||
|
t.transp.close
|
||||||
|
|
||||||
|
proc closed*(t: Transport) : bool =
|
||||||
|
t.transp.closed
|
||||||
|
|
||||||
|
proc closeWait*(t: Transport) {.async.} =
|
||||||
|
await t.transp.closeWait
|
||||||
|
|
||||||
|
proc newTransport*[T](
|
||||||
|
client: T,
|
||||||
|
privKey: PrivateKey,
|
||||||
|
localNode: Node,
|
||||||
|
bindPort: Port,
|
||||||
|
bindIp = IPv4_any(),
|
||||||
|
rng = newRng()):
|
||||||
|
Transport[T]=
|
||||||
|
|
||||||
|
# TODO Consider whether this should be a Defect
|
||||||
|
doAssert rng != nil, "RNG initialization failed"
|
||||||
|
|
||||||
|
Transport[T](
|
||||||
|
client: client,
|
||||||
|
bindAddress: Address(ip: ValidIpAddress.init(bindIp), port: bindPort),
|
||||||
|
codec: Codec(localNode: localNode, privKey: privKey,
|
||||||
|
sessions: Sessions.init(256)),
|
||||||
|
rng: rng)
|
@ -5,7 +5,7 @@ import
|
|||||||
chronos, chronicles, stint, testutils/unittests, stew/shims/net,
|
chronos, chronicles, stint, testutils/unittests, stew/shims/net,
|
||||||
stew/byteutils, bearssl,
|
stew/byteutils, bearssl,
|
||||||
../../eth/keys,
|
../../eth/keys,
|
||||||
../../eth/p2p/discoveryv5/[enr, node, routing_table, encoding, sessions, messages, nodes_verification],
|
../../eth/p2p/discoveryv5/[transport, enr, node, routing_table, encoding, sessions, messages, nodes_verification],
|
||||||
../../eth/p2p/discoveryv5/protocol as discv5_protocol,
|
../../eth/p2p/discoveryv5/protocol as discv5_protocol,
|
||||||
./discv5_test_helper
|
./discv5_test_helper
|
||||||
|
|
||||||
@ -607,15 +607,15 @@ suite "Discovery v5 Tests":
|
|||||||
|
|
||||||
let (packet, _) = encodeMessagePacket(rng[], codec,
|
let (packet, _) = encodeMessagePacket(rng[], codec,
|
||||||
receiveNode.localNode.id, receiveNode.localNode.address.get(), @[])
|
receiveNode.localNode.id, receiveNode.localNode.address.get(), @[])
|
||||||
receiveNode.receive(a, packet)
|
receiveNode.transport.receive(a, packet)
|
||||||
|
|
||||||
# Checking different nodeIds but same address
|
# Checking different nodeIds but same address
|
||||||
check receiveNode.codec.handshakes.len == 5
|
check receiveNode.transport.codec.handshakes.len == 5
|
||||||
# TODO: Could get rid of the sleep by storing the timeout future of the
|
# TODO: Could get rid of the sleep by storing the timeout future of the
|
||||||
# handshake
|
# handshake
|
||||||
await sleepAsync(handshakeTimeout)
|
await sleepAsync(handshakeTimeout)
|
||||||
# Checking handshake cleanup
|
# Checking handshake cleanup
|
||||||
check receiveNode.codec.handshakes.len == 0
|
check receiveNode.transport.codec.handshakes.len == 0
|
||||||
|
|
||||||
await receiveNode.closeWait()
|
await receiveNode.closeWait()
|
||||||
|
|
||||||
@ -637,15 +637,15 @@ suite "Discovery v5 Tests":
|
|||||||
let a = localAddress(20303 + i)
|
let a = localAddress(20303 + i)
|
||||||
let (packet, _) = encodeMessagePacket(rng[], codec,
|
let (packet, _) = encodeMessagePacket(rng[], codec,
|
||||||
receiveNode.localNode.id, receiveNode.localNode.address.get(), @[])
|
receiveNode.localNode.id, receiveNode.localNode.address.get(), @[])
|
||||||
receiveNode.receive(a, packet)
|
receiveNode.transport.receive(a, packet)
|
||||||
|
|
||||||
# Checking different nodeIds but same address
|
# Checking different nodeIds but same address
|
||||||
check receiveNode.codec.handshakes.len == 5
|
check receiveNode.transport.codec.handshakes.len == 5
|
||||||
# TODO: Could get rid of the sleep by storing the timeout future of the
|
# TODO: Could get rid of the sleep by storing the timeout future of the
|
||||||
# handshake
|
# handshake
|
||||||
await sleepAsync(handshakeTimeout)
|
await sleepAsync(handshakeTimeout)
|
||||||
# Checking handshake cleanup
|
# Checking handshake cleanup
|
||||||
check receiveNode.codec.handshakes.len == 0
|
check receiveNode.transport.codec.handshakes.len == 0
|
||||||
|
|
||||||
await receiveNode.closeWait()
|
await receiveNode.closeWait()
|
||||||
|
|
||||||
@ -669,15 +669,15 @@ suite "Discovery v5 Tests":
|
|||||||
for i in 0 ..< 5:
|
for i in 0 ..< 5:
|
||||||
let (packet, requestNonce) = encodeMessagePacket(rng[], codec,
|
let (packet, requestNonce) = encodeMessagePacket(rng[], codec,
|
||||||
receiveNode.localNode.id, receiveNode.localNode.address.get(), @[])
|
receiveNode.localNode.id, receiveNode.localNode.address.get(), @[])
|
||||||
receiveNode.receive(a, packet)
|
receiveNode.transport.receive(a, packet)
|
||||||
if i == 0:
|
if i == 0:
|
||||||
firstRequestNonce = requestNonce
|
firstRequestNonce = requestNonce
|
||||||
|
|
||||||
# Check handshake duplicates
|
# Check handshake duplicates
|
||||||
check receiveNode.codec.handshakes.len == 1
|
check receiveNode.transport.codec.handshakes.len == 1
|
||||||
# Check if it is for the first packet that a handshake is stored
|
# Check if it is for the first packet that a handshake is stored
|
||||||
let key = HandshakeKey(nodeId: sendNode.id, address: a)
|
let key = HandshakeKey(nodeId: sendNode.id, address: a)
|
||||||
check receiveNode.codec.handshakes[key].whoareyouData.requestNonce ==
|
check receiveNode.transport.codec.handshakes[key].whoareyouData.requestNonce ==
|
||||||
firstRequestNonce
|
firstRequestNonce
|
||||||
|
|
||||||
await receiveNode.closeWait()
|
await receiveNode.closeWait()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user