Implemented the request/response future resolving logic

This commit is contained in:
Zahary Karadjov 2018-07-06 20:20:44 +03:00
parent 72016046fb
commit 9057d18abe
1 changed files with 272 additions and 65 deletions

View File

@ -9,8 +9,9 @@
#
import
tables, macros, sets, algorithm, logging, hashes, times, random,
rlp, ranges/[stackarrays, ptr_arith], nimcrypto, asyncdispatch2,
tables, deques, macros, sets, algorithm, hashes, times, random, options,
asyncdispatch2, asyncdispatch2/timer,
rlp, ranges/[stackarrays, ptr_arith], nimcrypto, chronicles,
eth_keys, eth_common,
kademlia, discovery, auth, rlpxcrypt, enode
@ -32,16 +33,21 @@ type
discovery: DiscoveryProtocol
peerPool: PeerPool
OutstandingRequest = object
reqId: int
future: FutureBase
timeoutAt: uint64
Peer* = ref object
transp: StreamTransport
dispatcher: Dispatcher
networkId: int
nextRequestId: int
nextReqId: int
network: NetworkConnection
secretsState: SecretState
connectionState: ConnectionState
protocolStates: seq[RootRef]
remote*: Node
protocolStates: seq[RootRef]
outstandingRequests*: seq[Deque[OutstandingRequest]]
PeerPool* = ref object
keyPair: KeyPair
@ -55,11 +61,15 @@ type
listenPort*: Port
MessageHandler* = proc(x: Peer, data: Rlp): Future[void]
MessageContentPrinter* = proc(msg: pointer): string
MessageFutureResolver* = proc(msg: pointer, future: FutureBase)
MessageInfo* = object
id*: int
name*: string
thunk*: MessageHandler
printer*: MessageContentPrinter
futureResolver*: MessageFutureResolver
CapabilityName* = array[3, char]
@ -85,10 +95,10 @@ type
# (for this particular connection). If the other peer doesn't support the
# particular protocol, the stored offset is -1.
#
# `thunks` holds a mapping from valid message IDs to their handler procs.
# `messages` holds a mapping from valid message IDs to their handler procs.
#
protocolOffsets: seq[int]
thunks: seq[MessageHandler]
messages: seq[ptr MessageInfo]
RlpxMessageKind* = enum
rlpxNotification,
@ -101,8 +111,12 @@ type
MalformedMessageError* = object of Exception
logScope:
topic = "rlpx"
const
baseProtocolVersion = 4
defaultReqTimeout = 10000
var
gProtocols: seq[ProtocolInfo]
@ -119,7 +133,8 @@ template devp2pProtocolInfo: auto = {.gcsafe.}: devp2p
# Dispatcher
#
proc `$`*(p: Peer): string {.inline.} = $p.remote
proc `$`*(p: Peer): string {.inline.} =
$p.remote
proc hash(d: Dispatcher): int =
hash(d.protocolOffsets)
@ -135,7 +150,7 @@ proc describeProtocols(d: Dispatcher): string =
for c in rlpxProtocols[i].name: result.add(c)
proc getDispatcher(otherPeerCapabilities: openarray[Capability]): Dispatcher =
# XXX: sub-optimal solution until progress is made here:
# TODO: sub-optimal solution until progress is made here:
# https://github.com/nim-lang/Nim/issues/7457
# We should be able to find an existing dispatcher without allocating a new one
@ -163,14 +178,14 @@ proc getDispatcher(otherPeerCapabilities: openarray[Capability]): Dispatcher =
else:
template copyTo(src, dest; index: int) =
for i in 0 ..< src.len:
dest[index + i] = src[i].thunk
dest[index + i] = addr src[i]
result.thunks = newSeq[MessageHandler](nextUserMsgId)
devp2pProtocolInfo.messages.copyTo(result.thunks, 0)
result.messages = newSeq[ptr MessageInfo](nextUserMsgId)
devp2pProtocolInfo.messages.copyTo(result.messages, 0)
for i in 0 ..< rlpxProtocols.len:
if result.protocolOffsets[i] != -1:
rlpxProtocols[i].messages.copyTo(result.thunks, result.protocolOffsets[i])
rlpxProtocols[i].messages.copyTo(result.messages, result.protocolOffsets[i])
gDispatchers.incl result
@ -195,12 +210,42 @@ proc cmp*(lhs, rhs: ProtocolInfo): int {.inline.} =
return int16(lhs.name[i]) - int16(rhs.name[i])
return 0
proc messagePrinter[MsgType](msg: pointer): string =
result = $(cast[ptr MsgType](msg)[])
proc messageFutureResolver[MsgType](msg: pointer, future: FutureBase) =
var f = Future[Option[MsgType]](future)
if not f.finished:
if msg != nil:
f.complete some(cast[ptr MsgType](msg)[])
else:
f.complete none(MsgType)
else:
# This future was already resolved, but let's do some sanity checks
# here. The only reasonable explanation is that the request should
# have timed out.
if msg != nil:
if f.read.isSome:
doAssert false, "trying to resolve a request twice"
else:
doAssert false, "trying to resolve a timed out request with a value"
else:
if not f.read.isSome:
doAssert false, "a request timed out twice"
proc registerMsg(protocol: var ProtocolInfo,
id: int, name: string, thunk: MessageHandler) =
protocol.messages.add MessageInfo(id: id, name: name, thunk: thunk)
id: int, name: string,
thunk: MessageHandler,
printer: MessageContentPrinter,
futureResolver: MessageFutureResolver) =
protocol.messages.add MessageInfo(id: id,
name: name,
thunk: thunk,
printer: printer,
futureResolver: futureResolver)
proc registerProtocol(protocol: ProtocolInfo) =
# XXX: This can be done at compile-time in the future
# TODO: This can be done at compile-time in the future
if protocol.version > 0:
if gProtocols.isNil: gProtocols = @[]
if gCapabilities.isNil: gCapabilities = @[]
@ -227,8 +272,8 @@ proc dispatchMsg(peer: Peer, msgId: int, msgData: var Rlp): Future[void] =
"RLPx message with an invalid id " & $msgId &
" on a connection supporting " & peer.dispatcher.describeProtocols)
if msgId >= peer.dispatcher.thunks.len: invalidIdError()
let thunk = peer.dispatcher.thunks[msgId]
if msgId >= peer.dispatcher.messages.len: invalidIdError()
let thunk = peer.dispatcher.messages[msgId].thunk
if thunk == nil: invalidIdError()
return thunk(peer, msgData)
@ -240,8 +285,105 @@ proc sendMsg(p: Peer, data: BytesRange): Future[int] =
var cipherText = encryptMsg(data, p.secretsState)
return p.transp.write(cipherText)
proc sendRequest(p: Peer, data: BytesRange, ResponseType: type): Future[ResponseType] =
discard
proc registerRequest(peer: Peer,
timeout: int,
responseFuture: FutureBase,
responseMsgId: int): int =
result = peer.nextReqId
inc peer.nextReqId
let timeoutAt = fastEpochTime() + uint64(timeout)
let req = OutstandingRequest(reqId: result,
future: responseFuture,
timeoutAt: timeoutAt)
peer.outstandingRequests[responseMsgId].addLast req
# XXX: is this safe?
let futureResolver = peer.dispatcher.messages[responseMsgId].futureResolver
proc timeoutExpired(udata: pointer) = futureResolver(nil, responseFuture)
addTimer(timeoutAt, timeoutExpired, nil)
proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) =
logScope:
msg = peer.dispatcher.messages[msgId].name
msgContents = peer.dispatcher.messages[msgId].printer(msg)
receivedReqId = reqId
remotePeer = peer.remote
template resolve(future) =
peer.dispatcher.messages[msgId].futureResolver(msg, future)
template outstandingReqs: auto =
peer.outstandingRequests[msgId]
if reqId == -1:
# XXX: This is a response from an ETH-like protocol that doesn't feature
# request IDs. Handling the response is quite tricky here because this may
# be a late response to an already timed out request or a valid response
# from a more recent one.
#
# We can increase the robustness by recording enough features of the
# request so we can recognize the matching response, but this is not very
# easy to do because our peers are allowed to send partial responses.
#
# A more generally robust approach is to maintain a set of the wanted
# data items and then to periodically look for items that have been
# requested long time ago, but are still missing. New requests can be
# issues for such items potentially from another random peer.
var expiredRequests = 0
for req in outstandingReqs:
if not req.future.finished: break
inc expiredRequests
outstandingReqs.shrink(fromFront = expiredRequests)
if outstandingReqs.len > 0:
let oldestReq = outstandingReqs.popFirst
assert oldestReq.reqId == -1
resolve oldestReq.future
else:
debug "late or duplicate reply for a RLPx request"
else:
# TODO: This is not completely sound because we are still using a global
# `reqId` sequence (the problem is that we might get a response ID that
# matches a request ID for a different type of request). To make the code
# correct, we can use a separate sequence per response type, but we have
# to first verify that the other Ethereum clients are supporting this
# correctly (because then, we'll be reusing the same reqIds for different
# types of requests). Alternatively, we can assign a separate interval in
# the `reqId` space for each type of response.
if reqId >= peer.nextReqId:
warn "RLPx response without a matching request"
return
var idx = 0
while idx < outstandingReqs.len:
template req: auto = outstandingReqs()[idx]
if req.future.finished:
assert req.timeoutAt < fastEpochTime()
# Here we'll remove the expired request by swapping
# it with the last one in the deque (if necessary):
if idx != outstandingReqs.len - 1:
req = outstandingReqs.popLast
else:
outstandingReqs.shrink(fromEnd = 1)
# This was the last item, so we don't have any
# more work to do:
return
if req.reqId == reqId:
resolve req.future
# Here we'll remove the found request by swapping
# it with the last one in the deque (if necessary):
if idx != outstandingReqs.len - 1:
req = outstandingReqs.popLast
else:
outstandingReqs.shrink(fromEnd = 1)
return
inc idx
debug "late or duplicate reply for a RLPx request"
proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
## This procs awaits the next complete RLPx message in the TCP stream
@ -296,17 +438,12 @@ proc nextMsg*(peer: Peer, MsgType: typedesc): Future[MsgType] {.async.} =
var (nextMsgId, nextMsgData) = await peer.recvMsg()
# echo "got msg(", nextMsgId, "): ", nextMsgData.inspect
if nextMsgData.listLen != 0:
# TODO: this should be `enterList`
nextMsgData = nextMsgData.listElem(0)
await peer.dispatchMsg(nextMsgId, nextMsgData)
if nextMsgId == wantedId:
return nextMsgData.read(MsgType)
proc registerRequest(peer: Peer, responseFuture: FutureBase): uint =
discard
proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqID: uint) =
discard
iterator typedParams(n: NimNode, skip = 0): (NimNode, NimNode) =
for i in (1 + skip) ..< n.params.len:
let paramNodes = n.params[i]
@ -342,6 +479,14 @@ template networkState*(connection: Peer, Protocol: typedesc): untyped =
## particular connection.
cast[ref Protocol.NetworkState](connection.getNetworkState(Protocol.protocolInfo))
proc popTimeoutParam(n: NimNode): NimNode =
var lastParam = n.params[^1]
if eqIdent(lastParam[0], "timeout"):
if lastParam[2].kind == nnkEmpty:
macros.error "You must specify a default value for the `timeout` parameter", lastParam
result = lastParam
n.params.del(n.params.len - 1)
macro rlpxProtocol*(protoIdentifier: untyped,
version: static[int],
body: untyped): untyped =
@ -359,8 +504,11 @@ macro rlpxProtocol*(protoIdentifier: untyped,
finish = bindSym "finish"
append = bindSym "append"
sendMsg = bindSym "sendMsg"
sendRequest = bindSym "sendRequest"
Peer = bindSym "Peer"
Option = bindSym "Option"
# XXX: Binding the int type causes instantiation failure for some reason
# Int = bindSym "int"
Int = newIdentNode "int"
writeMsgId = bindSym "writeMsgId"
resolveResponseFuture = bindSym "resolveResponseFuture"
registerRequest = bindSym "registerRequest"
@ -371,6 +519,8 @@ macro rlpxProtocol*(protoIdentifier: untyped,
stateType: NimNode = nil
networkStateType: NimNode = nil
useRequestIds = true
messagePrinter = bindSym "messagePrinter"
messageFutureResolver = bindSym "messageFutureResolver"
# By convention, all Ethereum protocol names must be abbreviated to 3 letters
assert protoName.len == 3
@ -388,8 +538,10 @@ macro rlpxProtocol*(protoIdentifier: untyped,
# variables used in the sending procs
msgRecipient = genSym(nskParam, "msgRecipient")
reqTimeout: NimNode
rlpWriter = genSym(nskVar, "writer")
appendParams = newNimNode(nnkStmtList)
sentReqId = genSym(nskLet, "reqId")
# variables used in the receiving procs
msgSender = genSym(nskParam, "msgSender")
@ -416,29 +568,41 @@ macro rlpxProtocol*(protoIdentifier: untyped,
case msgKind
of rlpxNotification: discard
of rlpxRequest:
# If the request proc has a default timeout specified, remove it from
# the signature for now so we can generate the `thunk` proc without it.
# The parameter will be added back later only for to the sender proc.
# When the timeout is not specified, we use a default one.
reqTimeout = popTimeoutParam(n)
if reqTimeout == nil:
reqTimeout = newTree(nnkIdentDefs,
genSym(nskParam, "timeout"),
Int, newLit(defaultReqTimeout))
# Each request is registered so we can resolve it when the response
# arrives. There are two types of protocols: LES-like protocols use
# explicit `reqID` sent over the wire, while the ETH wire protocol
# explicit `reqId` sent over the wire, while the ETH wire protocol
# assumes there is one outstanding request at a time (if there are
# multiple requests we'll resolve them in FIFO order).
let registerRequestCall = newCall(registerRequest, msgRecipient,
reqTimeout[0],
resultIdent,
newLit(responseMsgId))
if useRequestIds:
inc paramCount
appendParams.add quote do:
`append`(`rlpWriter`, `registerRequest`(`msgRecipient`,
`resultIdent`,
`responseMsgId`))
new `resultIdent`
let `sentReqId` = `registerRequestCall`
`append`(`rlpWriter`, `sentReqId`)
else:
appendParams.add quote do:
discard `registerRequest`(`msgRecipient`,
`resultIdent`,
`responseMsgId`)
discard `registerRequestCall`
of rlpxResponse:
if useRequestIds:
var reqId = genSym(nskLet, "reqId")
# Messages using request Ids
readParams.add quote do:
let `reqId` = `read`(`receivedRlp`, uint)
let `reqId` = `read`(`receivedRlp`, int)
callResolvedResponseFuture.add quote do:
`resolveResponseFuture`(`msgSender`, `msgId`, addr(`receivedMsg`), `reqId`)
@ -528,10 +692,13 @@ macro rlpxProtocol*(protoIdentifier: untyped,
# TODO: check that the first param has the correct type
msgSendProc.params[1][0] = msgRecipient
# We change the return type of the proc to a Future.
# Add a timeout parameter for all request procs
if msgKind == rlpxRequest: msgSendProc.params.add reqTimeout
# We change the return type of the sending proc to a Future.
# If this is a request proc, the future will return the response record.
let rt = if msgKind == rlpxRequest: responseRecord
else: newIdentNode("int")
let rt = if msgKind != rlpxRequest: Int
else: newTree(nnkBracketExpr, Option, responseRecord)
msgSendProc.params[0] = newTree(nnkBracketExpr, newIdentNode("Future"), rt)
let writeMsgId = if isSubprotocol:
@ -539,11 +706,18 @@ macro rlpxProtocol*(protoIdentifier: untyped,
else:
quote: `append`(`rlpWriter`, `msgId`)
let sendProc = if msgKind == rlpxRequest: sendRequest else: sendMsg
var sendCall = newCall(sendProc, msgRecipient, newCall(finish, rlpWriter))
if msgKind == rlpxRequest:
sendCall.add(responseRecord)
var sendCall = newCall(sendMsg, msgRecipient, newCall(finish, rlpWriter))
let senderEpilogue = if msgKind == rlpxRequest:
# In RLPx requests, the returned future was allocated here and passed
# to `registerRequest`. It's already assigned to the result variable
# of the proc, so we just wait for the sending operation to complete
# and we return in a normal way. (the waiting is done, so we can catch
# any possible errors).
quote: discard waitFor(`sendCall`)
else:
# In normal RLPx messages, we are returning the future returned by the
# `sendMsg` call.
quote: return `sendCall`
# let paramCountNode = newLit(paramCount)
msgSendProc.body = quote do:
@ -551,14 +725,18 @@ macro rlpxProtocol*(protoIdentifier: untyped,
`writeMsgId`
`startList`(`rlpWriter`, `paramCount`)
`appendParams`
return `sendCall`
`senderEpilogue`
finalOutput.add msgSendProc
msgThunksAndRegistrations.add newCall(bindSym("registerMsg"),
msgThunksAndRegistrations.add(
newCall(bindSym("registerMsg"),
protocol,
newIntLitNode(msgId),
newStrLitNode($n.name),
thunkName)
thunkName,
newTree(nnkBracketExpr, messagePrinter, msgRecord),
newTree(nnkBracketExpr, messageFutureResolver, msgRecord)))
result = finalOutput
result.add quote do:
@ -580,12 +758,12 @@ macro rlpxProtocol*(protoIdentifier: untyped,
if n.len == 2 and n[1].kind == nnkIntLit:
nextId = n[1].intVal.int
else:
error("nextID expects a single int value", n)
macros.error("nextID expects a single int value", n)
elif eqIdent(n[0], "requestResponse"):
# `requestResponse` can be given a block of 2 or more procs.
# The last one is considered to be a response message, while
# all preceeding ones are requests triggering the response.
# The system makes sure to automatically insert a hidden `reqID`
# The system makes sure to automatically insert a hidden `reqId`
# parameter used to discriminate the individual messages.
block processReqResp:
if n.len == 2 and n[1].kind == nnkStmtList:
@ -609,15 +787,15 @@ macro rlpxProtocol*(protoIdentifier: untyped,
# we got all the way to here, so everything is fine.
# break the block so it doesn't reach the error call below
break processReqResp
error("requestResponse expects a block with at least two proc definitions")
macros.error("requestResponse expects a block with at least two proc definitions")
else:
error(repr(n) & " is not a recognized call in RLPx protocol definitions", n)
macros.error(repr(n) & " is not a recognized call in RLPx protocol definitions", n)
of nnkAsgn:
if eqIdent(n[0], "useRequestIds"):
useRequestIds = $n[1] == "true"
else:
error(repr(n[0]) & " is not a recognized protocol option")
macros.error(repr(n[0]) & " is not a recognized protocol option")
of nnkTypeSection:
result.add n
@ -637,7 +815,7 @@ macro rlpxProtocol*(protoIdentifier: untyped,
`networkStateType`
else:
error("The only type names allowed within a RLPx protocol definition are 'State' and 'NetworkState'")
macros.error("The only type names allowed within a RLPx protocol definition are 'State' and 'NetworkState'")
of nnkProcDef:
@ -645,7 +823,7 @@ macro rlpxProtocol*(protoIdentifier: untyped,
inc nextId
else:
error("illegal syntax in a RLPx protocol definition", n)
macros.error("illegal syntax in a RLPx protocol definition", n)
result.add(msgThunksAndRegistrations)
result.add newCall(bindSym("registerProtocol"), protocol)
@ -700,10 +878,21 @@ proc check(status: AuthStatus) =
proc connectionEstablished(p: Peer, h: p2p.hello) =
p.dispatcher = getDispatcher(h.capabilities)
# The dispatcher has determined our message ID sequence.
# For each message ID, we allocate a potential slot for
# tracking responses to requests.
# (yes, some of the slots won't be used).
p.outstandingRequests.newSeq(p.dispatcher.messages.len)
for d in mitems(p.outstandingRequests): d = initDeque[OutstandingRequest](0)
p.nextReqId = 1
# p.id = h.nodeId
p.connectionState = Connected
newSeq(p.protocolStates, rlpxProtocols.len)
# XXX: initialize the sub-protocol states
p.connectionState = Connected
# TODO: initialize the sub-protocol states
proc initSecretState(hs: var Handshake, authMsg, ackMsg: openarray[byte],
p: Peer) =
@ -747,7 +936,7 @@ proc rlpxConnect*(remote: Node, myKeys: KeyPair, listenPort: Port,
# if handshake.remoteHPubkey != remote.node.pubKey:
# raise newException(Exception, "Remote pubkey is wrong")
discard result.hello(baseProtocolVersion, clientId, rlpxCapabilities,
asyncCheck result.hello(baseProtocolVersion, clientId, rlpxCapabilities,
uint(listenPort), myKeys.pubkey.getRaw())
var response = await result.waitSingleMsg(p2p.hello)
@ -852,7 +1041,7 @@ proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} =
## Connect to the given remote and return a Peer instance when successful.
## Returns nil if the remote is unreachable, times out or is useless.
if remote in p.connectedNodes:
debug "Skipping ", remote, "; already connected to it"
debug "skipping_connection_to_already_connected_peer", remote
return nil
result = await remote.rlpxConnect(p.keyPair, p.listenPort, p.clientId)
@ -917,7 +1106,7 @@ proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} =
proc maybeConnectToMorePeers(p: PeerPool) {.async.} =
## Connect to more peers if we're not yet connected to at least self.minPeers.
if p.connectedNodes.len >= p.minPeers:
debug "Already connected to enough peers: ", p.connectedNodes, "; sleeping"
debug "pool already connected to enough peers (sleeping)", count = p.connectedNodes
return
if p.lastLookupTime + lookupInterval < epochTime():
@ -1020,8 +1209,17 @@ when isMainModule:
type State = object
peerName: string
requestResponse:
proc aaaReq(p: Peer, n: int) =
echo "got req ", n
proc aaaRes(p: Peer, data: string) =
echo "got response ", data
proc hi(p: Peer, name: string) =
p.state.peerName = name
var r = await p.aaaReq(10)
echo r.get.data
rlpxProtocol bbb, 1:
type State = object
@ -1035,8 +1233,17 @@ when isMainModule:
proc bar(p: Peer, i: int, s: string)
requestResponse:
proc bbbReq(p: Peer, n: int, timeout = 3000) =
echo "got req ", n
proc bbbRes(p: Peer, data: string) =
echo "got response ", data
var p = Peer()
discard p.bar(10, "test")
var resp = waitFor p.bbbReq(10)
echo "B response: ", resp.get.data
when false:
# The assignments below can be used to investigate if the RLPx procs