Reform the networking layer in order to handle the new stricter SSZ API
This commit is contained in:
parent
a8003e7e38
commit
833f19e942
|
@ -2,19 +2,20 @@ FixtureSSZGeneric-mainnet
|
|||
===
|
||||
## Official - SSZ generic types
|
||||
```diff
|
||||
+ **Skipping** bitlist inputs - valid - skipped altogether OK
|
||||
Testing basic_vector inputs - invalid - skipping Vector[uint128, N] and Vector[uint256, N] Skip
|
||||
+ Testing basic_vector inputs - valid - skipping Vector[uint128, N] and Vector[uint256, N] OK
|
||||
+ Testing bitlist inputs - invalid OK
|
||||
+ Testing bitlist inputs - valid OK
|
||||
Testing bitvector inputs - invalid Skip
|
||||
+ Testing bitvector inputs - valid OK
|
||||
+ Testing boolean inputs - invalid OK
|
||||
+ Testing boolean inputs - valid OK
|
||||
+ Testing containers inputs - invalid - skipping VarTestStruct, ComplexTestStruct, BitsStr OK
|
||||
+ Testing containers inputs - valid - skipping VarTestStruct, ComplexTestStruct, BitsStruc OK
|
||||
+ Testing containers inputs - invalid - skipping BitsStruct OK
|
||||
+ Testing containers inputs - valid - skipping BitsStruct OK
|
||||
+ Testing uints inputs - invalid - skipping uint128 and uint256 OK
|
||||
+ Testing uints inputs - valid - skipping uint128 and uint256 OK
|
||||
```
|
||||
OK: 9/11 Fail: 0/11 Skip: 2/11
|
||||
OK: 10/12 Fail: 0/12 Skip: 2/12
|
||||
|
||||
---TOTAL---
|
||||
OK: 9/11 Fail: 0/11 Skip: 2/11
|
||||
OK: 10/12 Fail: 0/12 Skip: 2/12
|
||||
|
|
|
@ -1139,3 +1139,4 @@ programMain:
|
|||
let navigator = DynamicSszNavigator.init(bytes, BeaconState)
|
||||
|
||||
echo navigator.navigatePath(pathFragments[1 .. ^1]).toJson
|
||||
|
||||
|
|
|
@ -85,12 +85,20 @@ type
|
|||
Disconnecting,
|
||||
Disconnected
|
||||
|
||||
UntypedResponder = object
|
||||
UntypedResponse = ref object
|
||||
peer*: Peer
|
||||
stream*: Connection
|
||||
noSnappy*: bool
|
||||
writtenChunks*: int
|
||||
|
||||
Responder*[MsgType] = distinct UntypedResponder
|
||||
SingleChunkResponse*[MsgType] = distinct UntypedResponse
|
||||
## Protocol requests using this type will produce request-making
|
||||
## client-side procs that return `NetRes[MsgType]`
|
||||
|
||||
MultipleChunksResponse*[MsgType] = distinct UntypedResponse
|
||||
## Protocol requests using this type will produce request-making
|
||||
## client-side procs that return `NetRes[seq[MsgType]]`.
|
||||
## In the future, such procs will return an `InputStream[NetRes[MsgType]]`.
|
||||
|
||||
MessageInfo* = object
|
||||
name*: string
|
||||
|
@ -354,16 +362,24 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
|
|||
finally:
|
||||
await safeClose(stream)
|
||||
|
||||
proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async.} =
|
||||
await responder.stream.writeChunk(some Success, payload, responder.noSnappy)
|
||||
proc sendResponseChunkBytes(response: UntypedResponse, payload: Bytes) {.async.} =
|
||||
inc response.writtenChunks
|
||||
await response.stream.writeChunk(some Success, payload, response.noSnappy)
|
||||
|
||||
proc sendResponseChunkObj(responder: UntypedResponder, val: auto) {.async.} =
|
||||
await responder.stream.writeChunk(some Success, SSZ.encode(val),
|
||||
responder.noSnappy)
|
||||
proc sendResponseChunkObj(response: UntypedResponse, val: auto) {.async.} =
|
||||
inc response.writtenChunks
|
||||
await response.stream.writeChunk(some Success, SSZ.encode(val), response.noSnappy)
|
||||
|
||||
proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.} =
|
||||
for chunk in chunks:
|
||||
await sendResponseChunkObj(responder, chunk)
|
||||
template sendUserHandlerResultAsChunkImpl*(stream: Connection,
|
||||
noSnappy: bool,
|
||||
handlerResultFut: Future): untyped =
|
||||
let handlerRes = await handlerResultFut
|
||||
writeChunk(stream, some Success, SSZ.encode(handlerRes), noSnappy)
|
||||
|
||||
template sendUserHandlerResultAsChunkImpl*(stream: Connection,
|
||||
noSnappy: bool,
|
||||
handlerResult: auto): untyped =
|
||||
writeChunk(stream, some Success, SSZ.encode(handlerResult), noSnappy)
|
||||
|
||||
when useNativeSnappy:
|
||||
include faststreams_backend
|
||||
|
@ -403,24 +419,20 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
|
|||
finally:
|
||||
await safeClose(stream)
|
||||
|
||||
proc init*[MsgType](T: type Responder[MsgType],
|
||||
proc init*[MsgType](T: type MultipleChunksResponse[MsgType],
|
||||
peer: Peer, conn: Connection, noSnappy: bool): T =
|
||||
T(UntypedResponder(peer: peer, stream: conn, noSnappy: noSnappy))
|
||||
T(UntypedResponse(peer: peer, stream: conn, noSnappy: noSnappy))
|
||||
|
||||
template write*[M](r: var Responder[M], val: auto): auto =
|
||||
mixin send
|
||||
type Msg = M
|
||||
type MsgRec = RecType(Msg)
|
||||
when MsgRec is seq|openarray:
|
||||
type E = ElemType(MsgRec)
|
||||
when val is E:
|
||||
sendResponseChunkObj(UntypedResponder(r), val)
|
||||
elif val is MsgRec:
|
||||
sendResponseChunks(UntypedResponder(r), val)
|
||||
else:
|
||||
{.fatal: "Unepected message type".}
|
||||
else:
|
||||
send(r, val)
|
||||
proc init*[MsgType](T: type SingleChunkResponse[MsgType],
|
||||
peer: Peer, conn: Connection, noSnappy: bool): T =
|
||||
T(UntypedResponse(peer: peer, stream: conn, noSnappy: noSnappy))
|
||||
|
||||
template write*[M](r: MultipleChunksResponse[M], val: M): untyped =
|
||||
sendResponseChunkObj(UntypedResponse(r), val)
|
||||
|
||||
template send*[M](r: SingleChunkResponse[M], val: auto): untyped =
|
||||
doAssert UntypedResponse(r).writtenChunks == 0
|
||||
sendResponseChunkObj(UntypedResponse(r), val)
|
||||
|
||||
proc performProtocolHandshakes*(peer: Peer) {.async.} =
|
||||
var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len)
|
||||
|
@ -457,23 +469,21 @@ proc setEventHandlers(p: ProtocolInfo,
|
|||
proc implementSendProcBody(sendProc: SendProc) =
|
||||
let
|
||||
msg = sendProc.msg
|
||||
UntypedResponder = bindSym "UntypedResponder"
|
||||
UntypedResponse = bindSym "UntypedResponse"
|
||||
|
||||
proc sendCallGenerator(peer, bytes: NimNode): NimNode =
|
||||
if msg.kind != msgResponse:
|
||||
let msgProto = getRequestProtoName(msg.procDef)
|
||||
case msg.kind
|
||||
of msgRequest:
|
||||
let
|
||||
timeout = msg.timeoutParam[0]
|
||||
ResponseRecord = msg.response.recName
|
||||
let ResponseRecord = msg.response.recName
|
||||
quote:
|
||||
makeEth2Request(`peer`, `msgProto`, `bytes`,
|
||||
`ResponseRecord`, `timeout`)
|
||||
`ResponseRecord`, `timeoutVar`)
|
||||
else:
|
||||
quote: sendNotificationMsg(`peer`, `msgProto`, `bytes`)
|
||||
else:
|
||||
quote: sendResponseChunkBytes(`UntypedResponder`(`peer`), `bytes`)
|
||||
quote: sendResponseChunkBytes(`UntypedResponse`(`peer`), `bytes`)
|
||||
|
||||
sendProc.useStandardBody(nil, nil, sendCallGenerator)
|
||||
|
||||
|
@ -562,7 +572,8 @@ proc handleIncomingStream(network: Eth2Node,
|
|||
|
||||
try:
|
||||
logReceivedMsg(peer, MsgType(msg.get))
|
||||
await callUserHandler(peer, conn, noSnappy, msg.get)
|
||||
let userHandlerFut = callUserHandler(MsgType, peer, conn, noSnappy, msg.get)
|
||||
await userHandlerFut
|
||||
except CatchableError as err:
|
||||
await sendErrorResponse(peer, conn, noSnappy, ServerError,
|
||||
ErrorMsg err.msg.toBytes)
|
||||
|
@ -761,7 +772,6 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
|||
var
|
||||
Format = ident "SSZ"
|
||||
Bool = bindSym "bool"
|
||||
Responder = bindSym "Responder"
|
||||
Connection = bindSym "Connection"
|
||||
Peer = bindSym "Peer"
|
||||
Eth2Node = bindSym "Eth2Node"
|
||||
|
@ -771,6 +781,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
|||
networkVar = ident "network"
|
||||
callUserHandler = ident "callUserHandler"
|
||||
noSnappyVar = ident "noSnappy"
|
||||
MSG = ident "MSG"
|
||||
|
||||
p.useRequestIds = false
|
||||
p.useSingleRecordInlining = true
|
||||
|
@ -783,12 +794,14 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
|||
result.setEventHandlers = bindSym "setEventHandlers"
|
||||
result.SerializationFormat = Format
|
||||
result.RequestResultsWrapper = ident "NetRes"
|
||||
result.ResponderType = Responder
|
||||
|
||||
result.afterProtocolInit = proc (p: P2PProtocol) =
|
||||
p.onPeerConnected.params.add newIdentDefs(streamVar, Connection)
|
||||
|
||||
result.implementMsg = proc (msg: p2p_protocol_dsl.Message) =
|
||||
if msg.kind == msgResponse:
|
||||
return
|
||||
|
||||
let
|
||||
protocol = msg.protocol
|
||||
msgName = $msg.ident
|
||||
|
@ -797,13 +810,6 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
|||
MsgStrongRecName = msg.strongRecName
|
||||
codecNameLit = getRequestProtoName(msg.procDef)
|
||||
|
||||
if msg.procDef.body.kind != nnkEmpty and msg.kind == msgRequest:
|
||||
# Request procs need an extra param - the stream where the response
|
||||
# should be written:
|
||||
msg.userHandler.params.insert(2, newIdentDefs(streamVar, Connection))
|
||||
msg.userHandler.params.insert(3, newIdentdefs(noSnappyVar, Bool))
|
||||
msg.initResponderCall.add [streamVar, noSnappyVar]
|
||||
|
||||
##
|
||||
## Implement the Thunk:
|
||||
##
|
||||
|
@ -817,15 +823,38 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
|||
## initialize the network object by creating handlers bound to the
|
||||
## specific network.
|
||||
##
|
||||
let
|
||||
protocolMounterName = ident(msgName & "_mounter")
|
||||
userHandlerCall = msg.genUserHandlerCall(
|
||||
msgVar, [peerVar, streamVar, noSnappyVar])
|
||||
|
||||
var mounter: NimNode
|
||||
if msg.userHandler != nil:
|
||||
var
|
||||
protocolMounterName = ident(msgName & "_mounter")
|
||||
userHandlerCall: NimNode
|
||||
OutputParamType = msg.outputParamType
|
||||
|
||||
if OutputParamType == nil:
|
||||
userHandlerCall = newCall(ident"sendUserHandlerResultAsChunkImpl",
|
||||
streamVar,
|
||||
noSnappyVar,
|
||||
msg.genUserHandlerCall(msgVar, [peerVar]))
|
||||
else:
|
||||
if OutputParamType.kind == nnkVarTy:
|
||||
OutputParamType = OutputParamType[0]
|
||||
|
||||
let isChunkStream = eqIdent(OutputParamType[0], "MultipleChunksResponse")
|
||||
msg.response.recName = if isChunkStream:
|
||||
newTree(nnkBracketExpr, ident"seq", OutputParamType[1])
|
||||
else:
|
||||
OutputParamType[1]
|
||||
|
||||
let responseVar = ident("response")
|
||||
userHandlerCall = newStmtList(
|
||||
newVarStmt(responseVar,
|
||||
newCall(ident"init", OutputParamType,
|
||||
peerVar, streamVar, noSnappyVar)),
|
||||
msg.genUserHandlerCall(msgVar, [peerVar], outputParam = responseVar))
|
||||
|
||||
protocol.outRecvProcs.add quote do:
|
||||
template `callUserHandler`(`peerVar`: `Peer`,
|
||||
template `callUserHandler`(`MSG`: type `MsgStrongRecName`,
|
||||
`peerVar`: `Peer`,
|
||||
`streamVar`: `Connection`,
|
||||
`noSnappyVar`: bool,
|
||||
`msgVar`: `MsgRecName`): untyped =
|
||||
|
@ -833,7 +862,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
|||
|
||||
proc `protocolMounterName`(`networkVar`: `Eth2Node`) =
|
||||
proc sszThunk(`streamVar`: `Connection`,
|
||||
proto: string): Future[void] {.gcsafe.} =
|
||||
`protocolVar`: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(`networkVar`, `streamVar`, true,
|
||||
`MsgStrongRecName`)
|
||||
|
||||
|
@ -842,7 +871,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
|||
handler: sszThunk)
|
||||
|
||||
proc snappyThunk(`streamVar`: `Connection`,
|
||||
proto: string): Future[void] {.gcsafe.} =
|
||||
`protocolVar`: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(`networkVar`, `streamVar`, false,
|
||||
`MsgStrongRecName`)
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ proc fetchAncestorBlocksFromPeer(
|
|||
# block to be stored in the FetchRecord, so we can ask for a range of
|
||||
# blocks starting N positions before this slot number.
|
||||
try:
|
||||
let blocks = await peer.beaconBlocksByRoot([rec.root])
|
||||
let blocks = await peer.beaconBlocksByRoot(BlockRootsList @[rec.root])
|
||||
if blocks.isOk:
|
||||
for b in blocks.get:
|
||||
responseHandler(b)
|
||||
|
@ -40,7 +40,7 @@ proc fetchAncestorBlocksFromNetwork(
|
|||
var peer: Peer
|
||||
try:
|
||||
peer = await network.peerPool.acquire()
|
||||
let blocks = await peer.beaconBlocksByRoot([rec.root])
|
||||
let blocks = await peer.beaconBlocksByRoot(BlockRootsList @[rec.root])
|
||||
if blocks.isOk:
|
||||
for b in blocks.get:
|
||||
responseHandler(b)
|
||||
|
|
|
@ -7,6 +7,12 @@ import
|
|||
logScope:
|
||||
topics = "sync"
|
||||
|
||||
const
|
||||
MAX_REQUESTED_BLOCKS = SLOTS_PER_EPOCH * 4
|
||||
# A boundary on the number of blocks we'll allow in any single block
|
||||
# request - typically clients will ask for an epoch or so at a time, but we
|
||||
# allow a little bit more in case they want to stream blocks faster
|
||||
|
||||
type
|
||||
StatusMsg* = object
|
||||
forkDigest*: ForkDigest
|
||||
|
@ -41,11 +47,7 @@ type
|
|||
blockRoot: Eth2Digest
|
||||
slot: Slot
|
||||
|
||||
const
|
||||
MAX_REQUESTED_BLOCKS = SLOTS_PER_EPOCH * 4
|
||||
# A boundary on the number of blocks we'll allow in any single block
|
||||
# request - typically clients will ask for an epoch or so at a time, but we
|
||||
# allow a little bit more in case they want to stream blocks faster
|
||||
BlockRootsList* = List[Eth2Digest, MAX_REQUESTED_BLOCKS]
|
||||
|
||||
proc shortLog*(s: StatusMsg): auto =
|
||||
(
|
||||
|
@ -83,11 +85,10 @@ proc handleStatus(peer: Peer,
|
|||
proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) {.gcsafe.}
|
||||
|
||||
p2pProtocol BeaconSync(version = 1,
|
||||
rlpxName = "bcs",
|
||||
networkState = BeaconSyncNetworkState,
|
||||
peerState = BeaconSyncPeerState):
|
||||
|
||||
onPeerConnected do (peer: Peer):
|
||||
onPeerConnected do (peer: Peer) {.async.}:
|
||||
if peer.wasDialed:
|
||||
let
|
||||
ourStatus = peer.networkState.getCurrentStatus()
|
||||
|
@ -101,80 +102,71 @@ p2pProtocol BeaconSync(version = 1,
|
|||
else:
|
||||
warn "Status response not received in time", peer = peer
|
||||
|
||||
requestResponse:
|
||||
proc status(peer: Peer, theirStatus: StatusMsg) {.libp2pProtocol("status", 1).} =
|
||||
let ourStatus = peer.networkState.getCurrentStatus()
|
||||
trace "Sending status message", peer = peer, status = ourStatus
|
||||
await response.send(ourStatus)
|
||||
await peer.handleStatus(peer.networkState, ourStatus, theirStatus)
|
||||
proc status(peer: Peer,
|
||||
theirStatus: StatusMsg,
|
||||
response: SingleChunkResponse[StatusMsg])
|
||||
{.async, libp2pProtocol("status", 1).} =
|
||||
let ourStatus = peer.networkState.getCurrentStatus()
|
||||
trace "Sending status message", peer = peer, status = ourStatus
|
||||
await response.send(ourStatus)
|
||||
await peer.handleStatus(peer.networkState, ourStatus, theirStatus)
|
||||
|
||||
proc statusResp(peer: Peer, msg: StatusMsg)
|
||||
proc ping(peer: Peer, value: uint64): uint64
|
||||
{.libp2pProtocol("ping", 1).} =
|
||||
return peer.network.metadata.seq_number
|
||||
|
||||
proc goodbye(peer: Peer, reason: DisconnectionReason) {.libp2pProtocol("goodbye", 1).}
|
||||
proc getMetadata(peer: Peer): Eth2Metadata
|
||||
{.libp2pProtocol("metadata", 1).} =
|
||||
return peer.network.metadata
|
||||
|
||||
requestResponse:
|
||||
proc ping(peer: Peer, value: uint64) {.libp2pProtocol("ping", 1).} =
|
||||
await response.write(peer.network.metadata.seq_number)
|
||||
proc beaconBlocksByRange(peer: Peer,
|
||||
startSlot: Slot,
|
||||
count: uint64,
|
||||
step: uint64,
|
||||
response: MultipleChunksResponse[SignedBeaconBlock])
|
||||
{.async, libp2pProtocol("beacon_blocks_by_range", 1).} =
|
||||
trace "got range request", peer, startSlot, count, step
|
||||
|
||||
proc pingResp(peer: Peer, value: uint64)
|
||||
|
||||
requestResponse:
|
||||
proc getMetadata(peer: Peer) {.libp2pProtocol("metadata", 1).} =
|
||||
await response.write(peer.network.metadata)
|
||||
|
||||
proc metadataReps(peer: Peer, metadata: Eth2Metadata)
|
||||
|
||||
requestResponse:
|
||||
proc beaconBlocksByRange(
|
||||
peer: Peer,
|
||||
startSlot: Slot,
|
||||
count: uint64,
|
||||
step: uint64) {.
|
||||
libp2pProtocol("beacon_blocks_by_range", 1).} =
|
||||
trace "got range request", peer, startSlot, count, step
|
||||
|
||||
if count > 0'u64:
|
||||
var blocks: array[MAX_REQUESTED_BLOCKS, BlockRef]
|
||||
let
|
||||
pool = peer.networkState.blockPool
|
||||
# Limit number of blocks in response
|
||||
count = min(count.Natural, blocks.len)
|
||||
|
||||
let
|
||||
endIndex = count - 1
|
||||
startIndex =
|
||||
pool.getBlockRange(startSlot, step, blocks.toOpenArray(0, endIndex))
|
||||
|
||||
for b in blocks[startIndex..endIndex]:
|
||||
doAssert not b.isNil, "getBlockRange should return non-nil blocks only"
|
||||
trace "wrote response block", slot = b.slot, roor = shortLog(b.root)
|
||||
await response.write(pool.get(b).data)
|
||||
|
||||
debug "Block range request done",
|
||||
peer, startSlot, count, step, found = count - startIndex
|
||||
|
||||
proc beaconBlocksByRoot(
|
||||
peer: Peer,
|
||||
blockRoots: openarray[Eth2Digest]) {.
|
||||
libp2pProtocol("beacon_blocks_by_root", 1).} =
|
||||
if count > 0'u64:
|
||||
var blocks: array[MAX_REQUESTED_BLOCKS, BlockRef]
|
||||
let
|
||||
pool = peer.networkState.blockPool
|
||||
count = min(blockRoots.len, MAX_REQUESTED_BLOCKS)
|
||||
# Limit number of blocks in response
|
||||
count = min(count.Natural, blocks.len)
|
||||
|
||||
var found = 0
|
||||
let
|
||||
endIndex = count - 1
|
||||
startIndex =
|
||||
pool.getBlockRange(startSlot, step, blocks.toOpenArray(0, endIndex))
|
||||
|
||||
for root in blockRoots[0..<count]:
|
||||
let blockRef = pool.getRef(root)
|
||||
if not isNil(blockRef):
|
||||
await response.write(pool.get(blockRef).data)
|
||||
inc found
|
||||
for b in blocks[startIndex..endIndex]:
|
||||
doAssert not b.isNil, "getBlockRange should return non-nil blocks only"
|
||||
trace "wrote response block", slot = b.slot, roor = shortLog(b.root)
|
||||
await response.write(pool.get(b).data)
|
||||
|
||||
debug "Block root request done",
|
||||
peer, roots = blockRoots.len, count, found
|
||||
debug "Block range request done",
|
||||
peer, startSlot, count, step, found = count - startIndex
|
||||
|
||||
proc beaconBlocks(
|
||||
peer: Peer,
|
||||
blocks: openarray[SignedBeaconBlock])
|
||||
proc beaconBlocksByRoot(peer: Peer,
|
||||
blockRoots: BlockRootsList,
|
||||
response: MultipleChunksResponse[SignedBeaconBlock])
|
||||
{.async, libp2pProtocol("beacon_blocks_by_root", 1).} =
|
||||
let
|
||||
pool = peer.networkState.blockPool
|
||||
count = blockRoots.len
|
||||
|
||||
var found = 0
|
||||
|
||||
for root in blockRoots[0..<count]:
|
||||
let blockRef = pool.getRef(root)
|
||||
if not isNil(blockRef):
|
||||
await response.write(pool.get(blockRef).data)
|
||||
inc found
|
||||
|
||||
debug "Block root request done",
|
||||
peer, roots = blockRoots.len, count, found
|
||||
|
||||
proc goodbye(peer: Peer, reason: DisconnectionReason) {.libp2pProtocol("goodbye", 1).}
|
||||
|
||||
proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) =
|
||||
debug "Peer status", peer, statusMsg
|
||||
|
|
|
@ -0,0 +1,359 @@
|
|||
|
||||
## Generated at line 87
|
||||
type
|
||||
BeaconSync* = object
|
||||
template State*(PROTO: type BeaconSync): type =
|
||||
ref[BeaconSyncPeerState:ObjectType]
|
||||
|
||||
template NetworkState*(PROTO: type BeaconSync): type =
|
||||
ref[BeaconSyncNetworkState:ObjectType]
|
||||
|
||||
type
|
||||
statusObj* = distinct StatusMsg
|
||||
template status*(PROTO: type BeaconSync): type =
|
||||
StatusMsg
|
||||
|
||||
template msgProtocol*(MSG: type statusObj): type =
|
||||
BeaconSync
|
||||
|
||||
template RecType*(MSG: type statusObj): untyped =
|
||||
StatusMsg
|
||||
|
||||
type
|
||||
pingObj* = distinct uint64
|
||||
template ping*(PROTO: type BeaconSync): type =
|
||||
uint64
|
||||
|
||||
template msgProtocol*(MSG: type pingObj): type =
|
||||
BeaconSync
|
||||
|
||||
template RecType*(MSG: type pingObj): untyped =
|
||||
uint64
|
||||
|
||||
type
|
||||
getMetadataObj* = object
|
||||
|
||||
template getMetadata*(PROTO: type BeaconSync): type =
|
||||
getMetadataObj
|
||||
|
||||
template msgProtocol*(MSG: type getMetadataObj): type =
|
||||
BeaconSync
|
||||
|
||||
template RecType*(MSG: type getMetadataObj): untyped =
|
||||
getMetadataObj
|
||||
|
||||
type
|
||||
beaconBlocksByRangeObj* = object
|
||||
startSlot*: Slot
|
||||
count*: uint64
|
||||
step*: uint64
|
||||
|
||||
template beaconBlocksByRange*(PROTO: type BeaconSync): type =
|
||||
beaconBlocksByRangeObj
|
||||
|
||||
template msgProtocol*(MSG: type beaconBlocksByRangeObj): type =
|
||||
BeaconSync
|
||||
|
||||
template RecType*(MSG: type beaconBlocksByRangeObj): untyped =
|
||||
beaconBlocksByRangeObj
|
||||
|
||||
type
|
||||
beaconBlocksByRootObj* = distinct BlockRootsList
|
||||
template beaconBlocksByRoot*(PROTO: type BeaconSync): type =
|
||||
BlockRootsList
|
||||
|
||||
template msgProtocol*(MSG: type beaconBlocksByRootObj): type =
|
||||
BeaconSync
|
||||
|
||||
template RecType*(MSG: type beaconBlocksByRootObj): untyped =
|
||||
BlockRootsList
|
||||
|
||||
type
|
||||
goodbyeObj* = distinct DisconnectionReason
|
||||
template goodbye*(PROTO: type BeaconSync): type =
|
||||
DisconnectionReason
|
||||
|
||||
template msgProtocol*(MSG: type goodbyeObj): type =
|
||||
BeaconSync
|
||||
|
||||
template RecType*(MSG: type goodbyeObj): untyped =
|
||||
DisconnectionReason
|
||||
|
||||
var BeaconSyncProtocolObj = initProtocol("BeaconSync", createPeerState[Peer,
|
||||
ref[BeaconSyncPeerState:ObjectType]], createNetworkState[Eth2Node,
|
||||
ref[BeaconSyncNetworkState:ObjectType]])
|
||||
var BeaconSyncProtocol = addr BeaconSyncProtocolObj
|
||||
template protocolInfo*(P`gensym187610300: type BeaconSync): auto =
|
||||
BeaconSyncProtocol
|
||||
|
||||
proc status*(peer: Peer; theirStatus: StatusMsg;
|
||||
timeout: Duration = milliseconds(10000'i64)): Future[NetRes[StatusMsg]] {.
|
||||
gcsafe, libp2pProtocol("status", 1).} =
|
||||
var outputStream = memoryOutput()
|
||||
var writer = init(WriterType(SSZ), outputStream)
|
||||
writeValue(writer, theirStatus)
|
||||
let msgBytes = getOutput(outputStream)
|
||||
makeEth2Request(peer, "/eth2/beacon_chain/req/status/1/", msgBytes, StatusMsg,
|
||||
timeout)
|
||||
|
||||
proc ping*(peer: Peer; value: uint64; timeout: Duration = milliseconds(10000'i64)): Future[
|
||||
NetRes[uint64]] {.gcsafe, libp2pProtocol("ping", 1).} =
|
||||
var outputStream = memoryOutput()
|
||||
var writer = init(WriterType(SSZ), outputStream)
|
||||
writeValue(writer, value)
|
||||
let msgBytes = getOutput(outputStream)
|
||||
makeEth2Request(peer, "/eth2/beacon_chain/req/ping/1/", msgBytes, uint64, timeout)
|
||||
|
||||
proc getMetadata*(peer: Peer; timeout: Duration = milliseconds(10000'i64)): Future[
|
||||
NetRes[Eth2Metadata]] {.gcsafe, libp2pProtocol("metadata", 1).} =
|
||||
var msgBytes: seq[byte]
|
||||
makeEth2Request(peer, "/eth2/beacon_chain/req/metadata/1/", msgBytes,
|
||||
Eth2Metadata, timeout)
|
||||
|
||||
proc beaconBlocksByRange*(peer: Peer; startSlot: Slot; count: uint64; step: uint64;
|
||||
timeout: Duration = milliseconds(10000'i64)): Future[
|
||||
NetRes[seq[SignedBeaconBlock]]] {.gcsafe, libp2pProtocol(
|
||||
"beacon_blocks_by_range", 1).} =
|
||||
var outputStream = memoryOutput()
|
||||
var writer = init(WriterType(SSZ), outputStream)
|
||||
var recordWriterCtx = beginRecord(writer, beaconBlocksByRangeObj)
|
||||
writeField(writer, recordWriterCtx, "startSlot", startSlot)
|
||||
writeField(writer, recordWriterCtx, "count", count)
|
||||
writeField(writer, recordWriterCtx, "step", step)
|
||||
endRecord(writer, recordWriterCtx)
|
||||
let msgBytes = getOutput(outputStream)
|
||||
makeEth2Request(peer, "/eth2/beacon_chain/req/beacon_blocks_by_range/1/",
|
||||
msgBytes, seq[SignedBeaconBlock], timeout)
|
||||
|
||||
proc beaconBlocksByRoot*(peer: Peer; blockRoots: BlockRootsList;
|
||||
timeout: Duration = milliseconds(10000'i64)): Future[
|
||||
NetRes[seq[SignedBeaconBlock]]] {.gcsafe,
|
||||
libp2pProtocol("beacon_blocks_by_root", 1).} =
|
||||
var outputStream = memoryOutput()
|
||||
var writer = init(WriterType(SSZ), outputStream)
|
||||
writeValue(writer, blockRoots)
|
||||
let msgBytes = getOutput(outputStream)
|
||||
makeEth2Request(peer, "/eth2/beacon_chain/req/beacon_blocks_by_root/1/",
|
||||
msgBytes, seq[SignedBeaconBlock], timeout)
|
||||
|
||||
proc goodbye*(peer: Peer; reason: DisconnectionReason): Future[void] {.gcsafe,
|
||||
libp2pProtocol("goodbye", 1).} =
|
||||
var outputStream = memoryOutput()
|
||||
var writer = init(WriterType(SSZ), outputStream)
|
||||
writeValue(writer, reason)
|
||||
let msgBytes = getOutput(outputStream)
|
||||
sendNotificationMsg(peer, "/eth2/beacon_chain/req/goodbye/1/", msgBytes)
|
||||
|
||||
proc statusUserHandler(peer: Peer; theirStatus: StatusMsg;
|
||||
response: SingleChunkResponse[StatusMsg]) {.async,
|
||||
libp2pProtocol("status", 1), gcsafe.} =
|
||||
type
|
||||
CurrentProtocol = BeaconSync
|
||||
template state(peer: Peer): ref[BeaconSyncPeerState:ObjectType] =
|
||||
cast[ref[BeaconSyncPeerState:ObjectType]](getState(peer, BeaconSyncProtocol))
|
||||
|
||||
template networkState(peer: Peer): ref[BeaconSyncNetworkState:ObjectType] =
|
||||
cast[ref[BeaconSyncNetworkState:ObjectType]](getNetworkState(peer.network,
|
||||
BeaconSyncProtocol))
|
||||
|
||||
let ourStatus = peer.networkState.getCurrentStatus()
|
||||
trace "Sending status message", peer = peer, status = ourStatus
|
||||
await response.send(ourStatus)
|
||||
await peer.handleStatus(peer.networkState, ourStatus, theirStatus)
|
||||
|
||||
proc pingUserHandler(peer: Peer; value: uint64): uint64 {.libp2pProtocol("ping", 1),
|
||||
gcsafe.} =
|
||||
type
|
||||
CurrentProtocol = BeaconSync
|
||||
template state(peer: Peer): ref[BeaconSyncPeerState:ObjectType] =
|
||||
cast[ref[BeaconSyncPeerState:ObjectType]](getState(peer, BeaconSyncProtocol))
|
||||
|
||||
template networkState(peer: Peer): ref[BeaconSyncNetworkState:ObjectType] =
|
||||
cast[ref[BeaconSyncNetworkState:ObjectType]](getNetworkState(peer.network,
|
||||
BeaconSyncProtocol))
|
||||
|
||||
return peer.network.metadata.seq_number
|
||||
|
||||
proc getMetadataUserHandler(peer: Peer): Eth2Metadata {.
|
||||
libp2pProtocol("metadata", 1), gcsafe.} =
|
||||
type
|
||||
CurrentProtocol = BeaconSync
|
||||
template state(peer: Peer): ref[BeaconSyncPeerState:ObjectType] =
|
||||
cast[ref[BeaconSyncPeerState:ObjectType]](getState(peer, BeaconSyncProtocol))
|
||||
|
||||
template networkState(peer: Peer): ref[BeaconSyncNetworkState:ObjectType] =
|
||||
cast[ref[BeaconSyncNetworkState:ObjectType]](getNetworkState(peer.network,
|
||||
BeaconSyncProtocol))
|
||||
|
||||
return peer.network.metadata
|
||||
|
||||
proc beaconBlocksByRangeUserHandler(peer: Peer; startSlot: Slot; count: uint64;
|
||||
step: uint64; response: MultipleChunksResponse[
|
||||
SignedBeaconBlock]) {.async, libp2pProtocol("beacon_blocks_by_range", 1), gcsafe.} =
|
||||
type
|
||||
CurrentProtocol = BeaconSync
|
||||
template state(peer: Peer): ref[BeaconSyncPeerState:ObjectType] =
|
||||
cast[ref[BeaconSyncPeerState:ObjectType]](getState(peer, BeaconSyncProtocol))
|
||||
|
||||
template networkState(peer: Peer): ref[BeaconSyncNetworkState:ObjectType] =
|
||||
cast[ref[BeaconSyncNetworkState:ObjectType]](getNetworkState(peer.network,
|
||||
BeaconSyncProtocol))
|
||||
|
||||
trace "got range request", peer, startSlot, count, step
|
||||
if count > 0'u64:
|
||||
var blocks: array[MAX_REQUESTED_BLOCKS, BlockRef]
|
||||
let
|
||||
pool = peer.networkState.blockPool
|
||||
count = min(count.Natural, blocks.len)
|
||||
let
|
||||
endIndex = count - 1
|
||||
startIndex = pool.getBlockRange(startSlot, step,
|
||||
blocks.toOpenArray(0, endIndex))
|
||||
for b in blocks[startIndex .. endIndex]:
|
||||
doAssert not b.isNil, "getBlockRange should return non-nil blocks only"
|
||||
trace "wrote response block", slot = b.slot, roor = shortLog(b.root)
|
||||
await response.write(pool.get(b).data)
|
||||
debug "Block range request done", peer, startSlot, count, step,
|
||||
found = count - startIndex
|
||||
|
||||
proc beaconBlocksByRootUserHandler(peer: Peer; blockRoots: BlockRootsList; response: MultipleChunksResponse[
|
||||
SignedBeaconBlock]) {.async, libp2pProtocol("beacon_blocks_by_root", 1), gcsafe.} =
|
||||
type
|
||||
CurrentProtocol = BeaconSync
|
||||
template state(peer: Peer): ref[BeaconSyncPeerState:ObjectType] =
|
||||
cast[ref[BeaconSyncPeerState:ObjectType]](getState(peer, BeaconSyncProtocol))
|
||||
|
||||
template networkState(peer: Peer): ref[BeaconSyncNetworkState:ObjectType] =
|
||||
cast[ref[BeaconSyncNetworkState:ObjectType]](getNetworkState(peer.network,
|
||||
BeaconSyncProtocol))
|
||||
|
||||
let
|
||||
pool = peer.networkState.blockPool
|
||||
count = blockRoots.len
|
||||
var found = 0
|
||||
for root in blockRoots[0 ..< count]:
|
||||
let blockRef = pool.getRef(root)
|
||||
if not isNil(blockRef):
|
||||
await response.write(pool.get(blockRef).data)
|
||||
inc found
|
||||
debug "Block root request done", peer, roots = blockRoots.len, count, found
|
||||
|
||||
template callUserHandler(MSG: type statusObj; peer: Peer; stream: Connection;
|
||||
noSnappy: bool; msg: StatusMsg): untyped =
|
||||
var response = init(SingleChunkResponse[StatusMsg], peer, stream, noSnappy)
|
||||
statusUserHandler(peer, msg, response)
|
||||
|
||||
proc status_mounter(network: Eth2Node) =
|
||||
proc sszThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(network, stream, true, statusObj)
|
||||
|
||||
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/status/1/" &
|
||||
"ssz", handler: sszThunk)
|
||||
proc snappyThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(network, stream, false, statusObj)
|
||||
|
||||
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/status/1/" &
|
||||
"ssz_snappy", handler: snappyThunk)
|
||||
|
||||
template callUserHandler(MSG: type pingObj; peer: Peer; stream: Connection;
|
||||
noSnappy: bool; msg: uint64): untyped =
|
||||
sendUserHandlerResultAsChunkImpl(stream, noSnappy, pingUserHandler(peer, msg))
|
||||
|
||||
proc ping_mounter(network: Eth2Node) =
|
||||
proc sszThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(network, stream, true, pingObj)
|
||||
|
||||
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/ping/1/" & "ssz",
|
||||
handler: sszThunk)
|
||||
proc snappyThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(network, stream, false, pingObj)
|
||||
|
||||
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/ping/1/" &
|
||||
"ssz_snappy", handler: snappyThunk)
|
||||
|
||||
template callUserHandler(MSG: type getMetadataObj; peer: Peer; stream: Connection;
|
||||
noSnappy: bool; msg: getMetadataObj): untyped =
|
||||
sendUserHandlerResultAsChunkImpl(stream, noSnappy, getMetadataUserHandler(peer))
|
||||
|
||||
proc getMetadata_mounter(network: Eth2Node) =
|
||||
proc sszThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(network, stream, true, getMetadataObj)
|
||||
|
||||
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/metadata/1/" &
|
||||
"ssz", handler: sszThunk)
|
||||
proc snappyThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(network, stream, false, getMetadataObj)
|
||||
|
||||
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/metadata/1/" &
|
||||
"ssz_snappy", handler: snappyThunk)
|
||||
|
||||
template callUserHandler(MSG: type beaconBlocksByRangeObj; peer: Peer;
|
||||
stream: Connection; noSnappy: bool;
|
||||
msg: beaconBlocksByRangeObj): untyped =
|
||||
var response = init(MultipleChunksResponse[SignedBeaconBlock], peer, stream,
|
||||
noSnappy)
|
||||
beaconBlocksByRangeUserHandler(peer, msg.startSlot, msg.count, msg.step, response)
|
||||
|
||||
proc beaconBlocksByRange_mounter(network: Eth2Node) =
|
||||
proc sszThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(network, stream, true, beaconBlocksByRangeObj)
|
||||
|
||||
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/beacon_blocks_by_range/1/" &
|
||||
"ssz", handler: sszThunk)
|
||||
proc snappyThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(network, stream, false, beaconBlocksByRangeObj)
|
||||
|
||||
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/beacon_blocks_by_range/1/" &
|
||||
"ssz_snappy", handler: snappyThunk)
|
||||
|
||||
template callUserHandler(MSG: type beaconBlocksByRootObj; peer: Peer;
|
||||
stream: Connection; noSnappy: bool; msg: BlockRootsList): untyped =
|
||||
var response = init(MultipleChunksResponse[SignedBeaconBlock], peer, stream,
|
||||
noSnappy)
|
||||
beaconBlocksByRootUserHandler(peer, msg, response)
|
||||
|
||||
proc beaconBlocksByRoot_mounter(network: Eth2Node) =
|
||||
proc sszThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(network, stream, true, beaconBlocksByRootObj)
|
||||
|
||||
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/beacon_blocks_by_root/1/" &
|
||||
"ssz", handler: sszThunk)
|
||||
proc snappyThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(network, stream, false, beaconBlocksByRootObj)
|
||||
|
||||
mount network.switch, LPProtocol(codec: "/eth2/beacon_chain/req/beacon_blocks_by_root/1/" &
|
||||
"ssz_snappy", handler: snappyThunk)
|
||||
|
||||
registerMsg(BeaconSyncProtocol, "status", status_mounter,
|
||||
"/eth2/beacon_chain/req/status/1/")
|
||||
registerMsg(BeaconSyncProtocol, "ping", ping_mounter,
|
||||
"/eth2/beacon_chain/req/ping/1/")
|
||||
registerMsg(BeaconSyncProtocol, "getMetadata", getMetadata_mounter,
|
||||
"/eth2/beacon_chain/req/metadata/1/")
|
||||
registerMsg(BeaconSyncProtocol, "beaconBlocksByRange",
|
||||
beaconBlocksByRange_mounter,
|
||||
"/eth2/beacon_chain/req/beacon_blocks_by_range/1/")
|
||||
registerMsg(BeaconSyncProtocol, "beaconBlocksByRoot", beaconBlocksByRoot_mounter,
|
||||
"/eth2/beacon_chain/req/beacon_blocks_by_root/1/")
|
||||
registerMsg(BeaconSyncProtocol, "goodbye", nil, "/eth2/beacon_chain/req/goodbye/1/")
|
||||
proc BeaconSyncPeerConnected(peer: Peer; stream: Connection) {.async, gcsafe.} =
|
||||
type
|
||||
CurrentProtocol = BeaconSync
|
||||
template state(peer: Peer): ref[BeaconSyncPeerState:ObjectType] =
|
||||
cast[ref[BeaconSyncPeerState:ObjectType]](getState(peer, BeaconSyncProtocol))
|
||||
|
||||
template networkState(peer: Peer): ref[BeaconSyncNetworkState:ObjectType] =
|
||||
cast[ref[BeaconSyncNetworkState:ObjectType]](getNetworkState(peer.network,
|
||||
BeaconSyncProtocol))
|
||||
|
||||
if peer.wasDialed:
|
||||
let
|
||||
ourStatus = peer.networkState.getCurrentStatus()
|
||||
theirStatus = await peer.status(ourStatus, timeout = 60.seconds)
|
||||
if theirStatus.isOk:
|
||||
await peer.handleStatus(peer.networkState, ourStatus, theirStatus.get())
|
||||
else:
|
||||
warn "Status response not received in time", peer = peer
|
||||
|
||||
setEventHandlers(BeaconSyncProtocol, BeaconSyncPeerConnected, nil)
|
||||
registerProtocol(BeaconSyncProtocol)
|
|
@ -1 +1 @@
|
|||
Subproject commit 53166fd1ff28693d12044a7143d0508805c7ae8a
|
||||
Subproject commit 218192aa0900518ebc65a1d53efb3be0667628c5
|
|
@ -1 +1 @@
|
|||
Subproject commit a695d9e7bdd0cb0663ea05ef80952822a50ec928
|
||||
Subproject commit a333eb080f89a035415a17d583a651d40b309b12
|
Loading…
Reference in New Issue