Share more code between the libp2p backends

This commit is contained in:
Zahary Karadjov 2019-06-03 20:07:50 +03:00
parent 3b166be166
commit 87601a5eae
No known key found for this signature in database
GPG Key ID: C8936F8A3073D609
5 changed files with 139 additions and 146 deletions

View File

@ -18,6 +18,7 @@ when networkBackend == rlpxBackend:
const
netBackendName* = "rlpx"
IrrelevantNetwork* = UselessPeer
type
Eth2Node* = EthereumNode
@ -116,22 +117,30 @@ when networkBackend == rlpxBackend:
node.peerPool.len
else:
import libp2p/daemon/daemonapi
import
random,
libp2p/daemon/daemonapi, eth/async_utils,
ssz
when networkBackend == libp2pSpecBackend:
import libp2p_spec_backend
export libp2p_spec_backend
const
BreachOfProtocol* = FaultOrError
netBackendName* = "libp2p_spec"
else:
import libp2p_backend
export libp2p_backend
const
netBackendName* = "libp2p_native"
type
BootstrapAddr* = PeerInfo
Eth2NodeIdentity* = PeerInfo
const
netBackendName* = "libp2p"
proc writeValue*(writer: var JsonWriter, value: PeerID) {.inline.} =
writer.writeValue value.pretty
@ -193,3 +202,28 @@ else:
func peersCount*(node: Eth2Node): int =
node.peers.len
proc makeMessageHandler[MsgType](msgHandler: proc(msg: MsgType)): P2PPubSubCallback =
result = proc(api: DaemonAPI,
ticket: PubsubTicket,
msg: PubSubMessage): Future[bool] {.async.} =
msgHandler SSZ.decode(msg.data, MsgType)
return true
proc subscribe*[MsgType](node: Eth2Node,
topic: string,
msgHandler: proc(msg: MsgType)) {.async.} =
discard await node.daemon.pubsubSubscribe(topic, makeMessageHandler(msgHandler))
proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
traceAsyncErrors node.daemon.pubsubPublish(topic, SSZ.encode(msg))
# TODO:
# At the moment, this is just a compatiblity shim for the existing RLPx functionality.
# The filtering is not implemented properly yet.
iterator randomPeers*(node: Eth2Node, maxPeers: int, Protocol: type): Peer =
var peers = newSeq[Peer]()
for _, peer in pairs(node.peers): peers.add peer
shuffle peers
if peers.len > maxPeers: peers.setLen(maxPeers)
for p in peers: yield p

View File

@ -1,8 +1,8 @@
import
options, macros, algorithm, random, tables,
options, macros, algorithm, tables,
std_shims/[macros_shim, tables_shims], chronos, chronicles,
libp2p/daemon/daemonapi, faststreams/output_stream, serialization,
eth/async_utils, eth/p2p/p2p_protocol_dsl,
eth/p2p/p2p_protocol_dsl,
ssz
export
@ -64,11 +64,11 @@ type
Disconnecting,
Disconnected
UntypedResponse = object
UntypedResponder = object
peer*: Peer
stream*: P2PStream
Responder*[MsgType] = distinct UntypedResponse
Responder*[MsgType] = distinct UntypedResponder
Bytes = seq[byte]
@ -274,7 +274,7 @@ template getRecipient(stream: P2PStream): P2PStream =
stream
template getRecipient(response: Responder): Peer =
UntypedResponse(response).peer
UntypedResponder(response).peer
proc initProtocol(name: string,
peerInit: PeerStateInitializer,
@ -308,28 +308,30 @@ proc registerProtocol(protocol: ProtocolInfo) =
gProtocols[i].index = i
proc getRequestProtoName(fn: NimNode): NimNode =
# `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes
# (TODO: file as an issue)
when true:
return newLit("rpc/" & $fn.name)
else:
# `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes
# (TODO: file as an issue)
let pragmas = fn.pragma
if pragmas.kind == nnkPragma and pragmas.len > 0:
for pragma in pragmas:
if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
return pragma[1]
let pragmas = fn.pragma
if pragmas.kind == nnkPragma and pragmas.len > 0:
for pragma in pragmas:
if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
return pragma[1]
error "All stream opening procs must have the 'libp2pProtocol' pragma specified.", fn
error "All stream opening procs must have the 'libp2pProtocol' pragma specified.", fn
template libp2pProtocol*(name, version: string) {.pragma.}
proc init*[MsgType](T: type Responder[MsgType],
peer: Peer, stream: P2PStream): T =
T(UntypedResponder(peer: peer, stream: stream))
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
var
response = ident "response"
name_openStream = newTree(nnkPostfix, ident("*"), ident"openStream")
outputStream = ident "outputStream"
currentProtocolSym = ident "CurrentProtocol"
Format = ident "SSZ"
Option = bindSym "Option"
UntypedResponse = bindSym "UntypedResponse"
UntypedResponder = bindSym "UntypedResponder"
Responder = bindSym "Responder"
DaemonAPI = bindSym "DaemonAPI"
P2PStream = ident "P2PStream"
@ -345,6 +347,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
getRecipient = bindSym "getRecipient"
peerFromStream = bindSym "peerFromStream"
makeEth2Request = bindSym "makeEth2Request"
handshakeImpl = bindSym "handshakeImpl"
sendMsg = bindSym "sendMsg"
sendBytes = bindSym "sendBytes"
resolveNextMsgFutures = bindSym "resolveNextMsgFutures"
@ -358,9 +361,10 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
recordStartMemo = ident"recordStartMemo"
receivedMsg = ident "msg"
daemon = ident "daemon"
stream = ident "stream"
streamVar = ident "stream"
await = ident "await"
peerIdent = ident "peer"
p.useRequestIds = false
new result
@ -386,79 +390,42 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
ResponseRecord = if msg.response != nil: msg.response.recIdent else: nil
userPragmas = n.pragma
if n.body.kind != nnkEmpty and msg.kind == msgRequest:
# Request procs need an extra param - the stream where the response
# should be written:
msg.userHandler.params.insert(1, newIdentDefs(streamVar, P2PStream))
msg.initResponderCall.add streamVar
let awaitUserHandler = msg.genAwaitUserHandler(receivedMsg, [streamVar, peerVar])
let tracing = when tracingEnabled:
quote do: logReceivedMsg(`streamVar`.peer, `receivedMsg`.get)
else:
newStmtList()
let requestDataTimeout = newCall(milliseconds, newLit(defaultIncomingReqTimeout))
let thunkName = ident(msgName & "_thunk")
var thunkProc = quote do:
proc `thunkName`(`daemon`: `DaemonAPI`, `streamVar`: `P2PStream`) {.async, gcsafe.} =
var `receivedMsg` = `await` readMsg(`streamVar`, `msgRecName`, `requestDataTimeout`)
if `receivedMsg`.isNone:
# TODO: This peer is misbehaving, perhaps we should penalize him somehow
return
let `peerVar` = `peerFromStream`(`daemon`, `streamVar`)
`tracing`
`awaitUserHandler`
`resolveNextMsgFutures`(`peerVar`, get(`receivedMsg`))
protocol.outRecvProcs.add thunkProc
var
# variables used in the sending procs
appendParams = newNimNode(nnkStmtList)
paramsToWrite = newSeq[NimNode](0)
# variables used in the receiving procs
tracing = newNimNode(nnkStmtList)
# nodes to store the user-supplied message handling proc if present
userHandlerProc: NimNode = nil
userHandlerCall: NimNode = nil
awaitUserHandler = newStmtList()
if n.body.kind != nnkEmpty:
# This is the call to the user supplied handler.
# Here we add only the initial params, the rest will be added later.
userHandlerCall = newCall(msg.userHandler.name)
# When there is a user handler, it must be awaited in the thunk proc.
# Above, by default `awaitUserHandler` is set to a no-op statement list.
awaitUserHandler = newCall(await, userHandlerCall)
var extraDefs: NimNode
if msgKind == msgRequest:
# Request procs need an extra param - the stream where the response
# should be written:
msg.userHandler.params.insert(1, newIdentDefs(stream, P2PStream))
userHandlerCall.add stream
let peer = msg.userHandler.params[2][0]
extraDefs = quote do:
# Jump through some hoops to work aroung
# https://github.com/nim-lang/Nim/issues/6248
let `response` = `Responder`[`ResponseRecord`](
`UntypedResponse`(peer: `peer`, stream: `stream`))
# Resolve the Eth2Peer from the LibP2P data received in the thunk
userHandlerCall.add peerIdent
msg.userHandler.addPreludeDefs extraDefs
protocol.outRecvProcs.add msg.userHandler
elif msgName == "status":
#awaitUserHandler = quote do:
# `await` `handshake`(`peerIdent`, `stream`)
discard
# TODO: revisit this
for param, paramType in n.typedParams(skip = 1):
paramsToWrite.add param
# If there is user message handler, we'll place a call to it by
# unpacking the fields of the received message:
if userHandlerCall != nil:
userHandlerCall.add quote do: get(`receivedMsg`).`param` # newDotExpr(newCall("get", receivedMsg), param)
when tracingEnabled:
tracing = quote do:
logReceivedMsg(`stream`.peer, `receivedMsg`.get)
let requestDataTimeout = newCall(milliseconds, newLit(defaultIncomingReqTimeout))
let thunkName = ident(msgName & "_thunk")
var thunkProc = quote do:
proc `thunkName`(`daemon`: `DaemonAPI`, `stream`: `P2PStream`) {.async, gcsafe.} =
var `receivedMsg` = `await` readMsg(`stream`, `msgRecName`, `requestDataTimeout`)
if `receivedMsg`.isNone:
# TODO: This peer is misbehaving, perhaps we should penalize him somehow
return
let `peerIdent` = `peerFromStream`(`daemon`, `stream`)
`tracing`
`awaitUserHandler`
`resolveNextMsgFutures`(`peerIdent`, get(`receivedMsg`))
protocol.outRecvProcs.add thunkProc
var msgSendProc = n
let msgSendProcName = n.name
protocol.outSendProcs.add msgSendProc
@ -498,7 +465,6 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
paramsArray = newTree(nnkBracket).appendAllParams(handshakeExchanger.def)
bindSym = ident "bindSym"
getAst = ident "getAst"
handshakeImpl = ident "handshakeImpl"
# TODO: macros.body triggers an assertion error when the proc type is nnkMacroDef
handshakeExchanger.def[6] = quote do:
@ -558,7 +524,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
else:
quote: `sendMsg`(`msgRecipient`, `msgProto`, `msgBytes`)
else:
quote: `sendBytes`(`UntypedResponse`(`sendTo`).stream, `msgBytes`)
quote: `sendBytes`(`UntypedResponder`(`sendTo`).stream, `msgBytes`)
msgSendProc.body = quote do:
let `msgRecipient` = `getRecipient`(`sendTo`)
@ -578,26 +544,3 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
result.implementProtocolInit = proc (p: P2PProtocol): NimNode =
return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit)
proc makeMessageHandler[MsgType](msgHandler: proc(msg: MsgType)): P2PPubSubCallback =
result = proc(api: DaemonAPI, ticket: PubsubTicket, msg: PubSubMessage): Future[bool] {.async.} =
msgHandler SSZ.decode(msg.data, MsgType)
return true
proc subscribe*[MsgType](node: EthereumNode,
topic: string,
msgHandler: proc(msg: MsgType)) {.async.} =
discard await node.daemon.pubsubSubscribe(topic, makeMessageHandler(msgHandler))
proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
traceAsyncErrors node.daemon.pubsubPublish(topic, SSZ.encode(msg))
# TODO:
# At the moment, this is just a compatiblity shim for the existing RLPx functionality.
# The filtering is not implemented properly yet.
iterator randomPeers*(node: EthereumNode, maxPeers: int, Protocol: type): Peer =
var peers = newSeq[Peer]()
for _, peer in pairs(node.peers): peers.add peer
shuffle peers
if peers.len > maxPeers: peers.setLen(maxPeers)
for p in peers: yield p

View File

@ -5,7 +5,7 @@ import
ssz
export
daemonapi, p2pProtocol, ssz
daemonapi, p2pProtocol, serialization, ssz
const
# Compression nibble
@ -37,11 +37,11 @@ type
FaultOrError
CompressedMsgId = tuple
protocolIndex, msgId: int
protocolIdx, methodId: int
ResponderWithId*[MsgType] = object
peer*: Peer
id*: int
reqId*: uint64
Response*[MsgType] = distinct Peer
@ -125,7 +125,7 @@ type
msgData: ByteStreamVar): Future[void] {.gcsafe.}
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}
NextMsgResolver* = proc(msgData: SszReader, future: FutureBase) {.gcsafe.}
NextMsgResolver* = proc(msg: pointer, future: FutureBase) {.gcsafe.}
RequestResolver* = proc(msg: pointer, future: FutureBase) {.gcsafe.}
Bytes = seq[byte]
@ -149,7 +149,7 @@ proc `$`*(peer: Peer): string = $peer.id
proc readPackedObject(stream: P2PStream, T: type): Future[T] {.async.} =
await stream.transp.readExactly(addr result, sizeof result)
proc appendPackedObject*(stream: OutputStreamVar, value: auto) =
proc appendPackedObject(stream: OutputStreamVar, value: auto) =
let valueAsBytes = cast[ptr byte](unsafeAddr(value))
stream.append makeOpenArray(valueAsBytes, sizeof(value))
@ -189,6 +189,11 @@ proc init*(node: Eth2Node) {.async.} =
await node.daemon.addHandler(@[beaconChainProtocol], handleConnectingBeaconChainPeer)
proc getCompressedMsgId*(MsgType: type): CompressedMsgId =
mixin msgId, msgProtocol, protocolInfo
(protocolIdx: MsgType.msgProtocol.protocolInfo.index,
methodId: MsgType.msgId)
proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer =
new result
result.id = id
@ -201,6 +206,10 @@ proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer =
if proto.peerStateInitializer != nil:
result.protocolStates[i] = proto.peerStateInitializer(result)
proc init*[MsgName](T: type ResponderWithId[MsgName],
peer: Peer, reqId: uint64): T =
T(peer: peer, reqId: reqId)
proc performProtocolHandshakes*(peer: Peer) {.async.} =
var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len)
for protocol in allProtocols:
@ -301,11 +310,11 @@ proc sendMsg*[T](responder: ResponderWithId[T], data: Bytes): Future[void] =
return sendMsg(responder.peer, data)
proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] =
## This procs awaits a specific RLPx message.
## This procs awaits a specific P2P message.
## Any messages received while waiting will be dispatched to their
## respective handlers. The designated message handler will also run
## to completion before the future returned by `nextMsg` is resolved.
let wantedId = peer.perPeerMsgId(MsgType)
let wantedId = MsgType.getCompressedMsgId
let f = peer.awaitedMessages[wantedId]
if not f.isNil:
return Future[MsgType](f)
@ -328,10 +337,6 @@ proc dispatchMessages*(peer: Peer, protocol: ProtocolInfo, stream: P2PStream):
if status == Success: continue
return status
proc nextMsgResolver[MsgType](msgData: ByteStreamVar, future: FutureBase) {.gcsafe.} =
var reader = msgData
Future[MsgType](future).complete reader.readRecordType(MsgType, MsgType.rlpFieldsCount > 1)
proc registerRequest(peer: Peer,
protocol: ProtocolInfo,
timeout: Duration,
@ -351,14 +356,13 @@ proc registerRequest(peer: Peer,
addTimer(timeoutAt, timeoutExpired, nil)
proc resolveResponseFuture(peer: Peer, protocol: ProtocolInfo,
proc resolvePendingFutures(peer: Peer, protocol: ProtocolInfo,
methodId: int, msg: pointer, reqId: uint64) =
when false:
logScope:
msg = peer.dispatcher.messages[methodId].name
msgContents = peer.dispatcher.messages[methodId].printer(msg)
receivedReqId = reqId
remotePeer = peer.remote
logScope:
msg = protocol.dispatcher.messages[methodId].name
msgContents = protocol.dispatcher.messages[methodId].printer(msg)
receivedReqId = reqId
remotePeer = peer.id
template resolve(future) =
(protocol.dispatcher.messages[methodId].requestResolver)(msg, future)
@ -366,6 +370,12 @@ proc resolveResponseFuture(peer: Peer, protocol: ProtocolInfo,
template outstandingReqs: auto =
peer.outstandingRequests[methodId]
let msgId = (protocolIdx: protocol.index, methodId: methodId)
if peer.awaitedMessages[msgId] != nil:
let msgInfo = protocol.dispatcher.messages[methodId]
msgInfo.nextMsgResolver(msg, peer.awaitedMessages[msgId])
peer.awaitedMessages[msgId] = nil
# TODO: This is not completely sound because we are still using a global
# `reqId` sequence (the problem is that we might get a response ID that
# matches a request ID for a different type of request). To make the code
@ -407,7 +417,7 @@ proc resolveResponseFuture(peer: Peer, protocol: ProtocolInfo,
inc idx
debug "late or duplicate reply for a RLPx request"
debug "late or duplicate reply for a network request"
proc initProtocol(name: string, version: int,
peerInit: PeerStateInitializer,
@ -529,9 +539,9 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
mount = bindSym "mount"
messagePrinter = bindSym "messagePrinter"
nextMsgResolver = bindSym "nextMsgResolver"
resolveFuture = bindSym "resolveFuture"
requestResolver = bindSym "requestResolver"
resolveResponseFuture = bindSym "resolveResponseFuture"
resolvePendingFutures = bindSym "resolvePendingFutures"
nextMsg = bindSym "nextMsg"
initProtocol = bindSym "initProtocol"
registerMsg = bindSym "registerMsg"
@ -558,6 +568,9 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
p.useRequestIds = true
result.ResponderType = ResponderWithId
result.afterProtocolInit = proc (p: P2PProtocol) =
p.onPeerConnected.params.add newIdentDefs(ident"handshakeStream", P2PStream)
result.implementMsg = proc (msg: Message) =
var
msgIdLit = newLit(msg.id)
@ -574,8 +587,10 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
else:
newStmtList()
let callResolvedResponseFuture = if msg.kind == msgResponse:
newCall(resolveResponseFuture, peerVar, msgIdLit, newCall("addr", receivedMsg), reqIdVar)
let callResolvePendingFutures = if msg.kind == msgResponse:
newCall(resolvePendingFutures,
peerVar, protocol.protocolInfoVar,
msgIdLit, newCall("addr", receivedMsg), reqIdVar)
else:
newStmtList()
@ -595,7 +610,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
var `receivedMsg` = `mount`(`Format`, `msgContents`, `msgRecName`)
`traceMsg`
`awaitUserHandler`
`callResolvedResponseFuture`
`callResolvePendingFutures`
protocol.outRecvProcs.add thunkProc
@ -618,7 +633,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
thunkName,
newTree(nnkBracketExpr, messagePrinter, msgRecName),
newTree(nnkBracketExpr, requestResolver, msgRecName),
newTree(nnkBracketExpr, nextMsgResolver, msgRecName)))
newTree(nnkBracketExpr, resolveFuture, msgRecName)))
result.implementProtocolInit = proc (protocol: P2PProtocol): NimNode =
return newCall(initProtocol,

View File

@ -42,7 +42,8 @@ proc init*(T: type SszReader, stream: ByteStreamVar): T =
proc mount*(F: type SSZ, stream: ByteStreamVar, T: type): T =
mixin readValue
init(SszReader, stream).readValue(T)
var reader = init(SszReader, stream)
reader.readValue(T)
func toSSZType(x: Slot|Epoch): auto = x.uint64
func toSSZType(x: auto): auto = x

View File

@ -92,7 +92,7 @@ p2pProtocol BeaconSync(version = 1,
timeout = 10.seconds)
if m.networkId != networkId:
await peer.disconnect(UselessPeer)
await peer.disconnect(IrrelevantNetwork)
return
# TODO: onPeerConnected runs unconditionally for every connected peer, but we
@ -140,7 +140,7 @@ p2pProtocol BeaconSync(version = 1,
latestFinalizedRoot: Eth2Digest,
latestFinalizedEpoch: Epoch,
bestRoot: Eth2Digest,
bestSlot: Slot) {.libp2pProtocol("hello", "1.0.0").}
bestSlot: Slot)
proc sendGoodbye(peer: Peer, reason: DisconnectionReason)
@ -163,7 +163,7 @@ p2pProtocol BeaconSync(version = 1,
proc getBeaconBlockRoots(
peer: Peer,
fromSlot: Slot,
maxRoots: int) {.libp2pProtocol("rpc/beacon_block_roots", "1.0.0").} =
maxRoots: int) =
let maxRoots = min(MaxRootsToRequest, maxRoots)
var s = fromSlot
var roots = newSeqOfCap[(Eth2Digest, Slot)](maxRoots)
@ -185,7 +185,7 @@ p2pProtocol BeaconSync(version = 1,
slot: Slot,
maxHeaders: int,
skipSlots: int,
backward: uint8) {.libp2pProtocol("rpc/beacon_block_headers", "1.0.0").} =
backward: uint8) =
let maxHeaders = min(MaxHeadersToRequest, maxHeaders)
var headers: seq[BeaconBlockHeader]
let db = peer.networkState.db
@ -235,7 +235,7 @@ p2pProtocol BeaconSync(version = 1,
requestResponse:
proc getAncestorBlocks(
peer: Peer,
needed: openarray[FetchRecord]) {.libp2pProtocol("rpc/ancestor_blocks", "1.0.0").} =
needed: openarray[FetchRecord]) =
var resp = newSeqOfCap[BeaconBlock](needed.len)
let db = peer.networkState.db
var neededRoots = initSet[Eth2Digest]()
@ -270,7 +270,7 @@ p2pProtocol BeaconSync(version = 1,
requestResponse:
proc getBeaconBlockBodies(
peer: Peer,
blockRoots: openarray[Eth2Digest]) {.libp2pProtocol("rpc/beacon_block_bodies", "1.0.0").} =
blockRoots: openarray[Eth2Digest]) =
# TODO: Validate blockRoots.len
var bodies = newSeqOfCap[BeaconBlockBody](blockRoots.len)
let db = peer.networkState.db