Sending chunked responses
This commit is contained in:
parent
b120a60493
commit
9dec05f9c9
|
@ -6,7 +6,7 @@ import
|
||||||
libp2p_json_serialization, ssz
|
libp2p_json_serialization, ssz
|
||||||
|
|
||||||
export
|
export
|
||||||
daemonapi, p2pProtocol, libp2p_json_serialization
|
daemonapi, p2pProtocol, libp2p_json_serialization, ssz
|
||||||
|
|
||||||
type
|
type
|
||||||
Eth2Node* = ref object of RootObj
|
Eth2Node* = ref object of RootObj
|
||||||
|
@ -317,12 +317,32 @@ proc sendMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} =
|
||||||
if sent != requestBytes.len:
|
if sent != requestBytes.len:
|
||||||
raise newException(TransmissionError, "Failed to deliver msg bytes")
|
raise newException(TransmissionError, "Failed to deliver msg bytes")
|
||||||
|
|
||||||
proc sendResponseBytes(stream: P2PStream, bytes: Bytes) {.async.} =
|
proc sendResponseChunkBytes(stream: P2PStream, payload: Bytes) {.async.} =
|
||||||
var sent = await stream.transp.write(@[byte Success])
|
var s = init OutputStream
|
||||||
if sent != 1:
|
s.append byte(Success)
|
||||||
raise newException(TransmissionError, "Failed to deliver response code")
|
s.appendVarint payload.len
|
||||||
await writeSizePrefix(stream.transp, uint64(bytes.len))
|
s.append payload
|
||||||
sent = await stream.transp.write(bytes)
|
let bytes = s.getOutput
|
||||||
|
let sent = await stream.transp.write(bytes)
|
||||||
|
if sent != bytes.len:
|
||||||
|
raise newException(TransmissionError, "Failed to deliver all bytes")
|
||||||
|
|
||||||
|
proc sendResponseChunkObj(stream: P2PStream, val: auto) {.async.} =
|
||||||
|
var s = init OutputStream
|
||||||
|
s.append byte(Success)
|
||||||
|
s.appendValue SSZ, sizePrefixed(val)
|
||||||
|
let bytes = s.getOutput
|
||||||
|
let sent = await stream.transp.write(bytes)
|
||||||
|
if sent != bytes.len:
|
||||||
|
raise newException(TransmissionError, "Failed to deliver all bytes")
|
||||||
|
|
||||||
|
proc sendResponseChunks[T](stream: P2PStream, chunks: seq[T]) {.async.} =
|
||||||
|
var s = init OutputStream
|
||||||
|
for chunk in chunks:
|
||||||
|
s.append byte(Success)
|
||||||
|
s.appendValue SSZ, sizePrefixed(chunk)
|
||||||
|
let bytes = s.getOutput
|
||||||
|
let sent = await stream.transp.write(bytes)
|
||||||
if sent != bytes.len:
|
if sent != bytes.len:
|
||||||
raise newException(TransmissionError, "Failed to deliver all bytes")
|
raise newException(TransmissionError, "Failed to deliver all bytes")
|
||||||
|
|
||||||
|
@ -410,6 +430,25 @@ proc init*[MsgType](T: type Responder[MsgType],
|
||||||
peer: Peer, stream: P2PStream): T =
|
peer: Peer, stream: P2PStream): T =
|
||||||
T(UntypedResponder(peer: peer, stream: stream))
|
T(UntypedResponder(peer: peer, stream: stream))
|
||||||
|
|
||||||
|
import
|
||||||
|
typetraits
|
||||||
|
|
||||||
|
template write*[M](r: var Responder[M], val: auto): auto =
|
||||||
|
mixin send
|
||||||
|
type Msg = M
|
||||||
|
type MsgRec = RecType(Msg)
|
||||||
|
when MsgRec is seq|openarray:
|
||||||
|
type E = ElemType(MsgRec)
|
||||||
|
when val is E:
|
||||||
|
sendResponseChunkObj(UntypedResponder(r).stream, val)
|
||||||
|
elif val is MsgRec:
|
||||||
|
sendResponseChunks(UntypedResponder(r).stream, val)
|
||||||
|
else:
|
||||||
|
static: echo "BAD TYPE ", name(E), " vs ", name(type(val))
|
||||||
|
{.fatal: "bad".}
|
||||||
|
else:
|
||||||
|
send(r, val)
|
||||||
|
|
||||||
proc implementSendProcBody(sendProc: SendProc) =
|
proc implementSendProcBody(sendProc: SendProc) =
|
||||||
let
|
let
|
||||||
msg = sendProc.msg
|
msg = sendProc.msg
|
||||||
|
@ -430,7 +469,7 @@ proc implementSendProcBody(sendProc: SendProc) =
|
||||||
else:
|
else:
|
||||||
quote: sendMsg(`peer`, `msgProto`, `bytes`)
|
quote: sendMsg(`peer`, `msgProto`, `bytes`)
|
||||||
else:
|
else:
|
||||||
quote: sendResponseBytes(`UntypedResponder`(`peer`).stream, `bytes`)
|
quote: sendResponseChunkBytes(`UntypedResponder`(`peer`).stream, `bytes`)
|
||||||
|
|
||||||
sendProc.useStandardBody(nil, nil, sendCallGenerator)
|
sendProc.useStandardBody(nil, nil, sendCallGenerator)
|
||||||
|
|
||||||
|
|
|
@ -66,6 +66,10 @@ serializationFormat SSZ,
|
||||||
Writer = SszWriter,
|
Writer = SszWriter,
|
||||||
PreferedOutput = seq[byte]
|
PreferedOutput = seq[byte]
|
||||||
|
|
||||||
|
template sizePrefixed*[TT](x: TT): untyped =
|
||||||
|
type T = TT
|
||||||
|
SizePrefixed[T](x)
|
||||||
|
|
||||||
proc init*(T: type SszReader,
|
proc init*(T: type SszReader,
|
||||||
stream: ByteStreamVar,
|
stream: ByteStreamVar,
|
||||||
maxObjectSize = defaultMaxObjectSize): T =
|
maxObjectSize = defaultMaxObjectSize): T =
|
||||||
|
@ -252,8 +256,15 @@ func writeValue*[T](w: var SszWriter, x: SizePrefixed[T]) =
|
||||||
var cursor = w.stream.delayVarSizeWrite(10)
|
var cursor = w.stream.delayVarSizeWrite(10)
|
||||||
let initPos = w.stream.pos
|
let initPos = w.stream.pos
|
||||||
w.writeValue T(x)
|
w.writeValue T(x)
|
||||||
cursor.appendVarint uint64(w.stream.pos - initPos)
|
let length = uint64(w.stream.pos - initPos)
|
||||||
finalize cursor
|
when false:
|
||||||
|
discard
|
||||||
|
# TODO varintBytes is sub-optimal at the moment
|
||||||
|
# cursor.writeAndFinalize length.varintBytes
|
||||||
|
else:
|
||||||
|
var buf: VarintBuffer
|
||||||
|
buf.appendVarint length
|
||||||
|
cursor.writeAndFinalize buf.writtenBytes
|
||||||
|
|
||||||
template checkEof(n: int) =
|
template checkEof(n: int) =
|
||||||
if not r.stream[].ensureBytes(n):
|
if not r.stream[].ensureBytes(n):
|
||||||
|
|
|
@ -214,7 +214,6 @@ p2pProtocol BeaconSync(version = 1,
|
||||||
count: uint64,
|
count: uint64,
|
||||||
step: uint64) {.
|
step: uint64) {.
|
||||||
libp2pProtocol("beacon_blocks_by_range", 1).} =
|
libp2pProtocol("beacon_blocks_by_range", 1).} =
|
||||||
var blocks: seq[BeaconBlock]
|
|
||||||
# `step == 0` has no sense, so we will return empty array of blocks.
|
# `step == 0` has no sense, so we will return empty array of blocks.
|
||||||
# `count == 0` means that empty array of blocks requested.
|
# `count == 0` means that empty array of blocks requested.
|
||||||
#
|
#
|
||||||
|
@ -223,38 +222,37 @@ p2pProtocol BeaconSync(version = 1,
|
||||||
# which is follows `start_slot + step` sequence. For example for, if
|
# which is follows `start_slot + step` sequence. For example for, if
|
||||||
# `start_slot` is 2 and `step` is 2 and slots 2, 4, 6 are not available,
|
# `start_slot` is 2 and `step` is 2 and slots 2, 4, 6 are not available,
|
||||||
# then [8, 10, ...] will be returned.
|
# then [8, 10, ...] will be returned.
|
||||||
|
var sentBlocksCount = 0
|
||||||
if step > 0'u64 and count > 0'u64:
|
if step > 0'u64 and count > 0'u64:
|
||||||
let pool = peer.networkState.node.blockPool
|
let pool = peer.networkState.node.blockPool
|
||||||
var blck = pool.getRef(headBlockRoot)
|
var blck = pool.getRef(headBlockRoot)
|
||||||
var slot = start_slot
|
var slot = start_slot
|
||||||
while not(isNil(blck)):
|
while not(isNil(blck)):
|
||||||
if blck.slot == slot:
|
if blck.slot == slot:
|
||||||
blocks.add(pool.get(blck).data)
|
await response.write(pool.get(blck).data)
|
||||||
|
inc sentBlocksCount
|
||||||
slot = slot + step
|
slot = slot + step
|
||||||
elif blck.slot > slot:
|
elif blck.slot > slot:
|
||||||
if (blck.slot - slot) mod step == 0:
|
if (blck.slot - slot) mod step == 0:
|
||||||
blocks.add(pool.get(blck).data)
|
await response.write(pool.get(blck).data)
|
||||||
|
inc sentBlocksCount
|
||||||
slot = slot + ((blck.slot - slot) div step + 1) * step
|
slot = slot + ((blck.slot - slot) div step + 1) * step
|
||||||
if uint64(len(blocks)) == count:
|
if uint64(sentBlocksCount) == count:
|
||||||
break
|
break
|
||||||
blck = blck.parent
|
blck = blck.parent
|
||||||
|
|
||||||
await response.send(blocks)
|
|
||||||
|
|
||||||
proc beaconBlocksByRoot(
|
proc beaconBlocksByRoot(
|
||||||
peer: Peer,
|
peer: Peer,
|
||||||
blockRoots: openarray[Eth2Digest]) {.
|
blockRoots: openarray[Eth2Digest]) {.
|
||||||
libp2pProtocol("beacon_blocks_by_root", 1).} =
|
libp2pProtocol("beacon_blocks_by_root", 1).} =
|
||||||
let pool = peer.networkState.node.blockPool
|
let
|
||||||
let db = peer.networkState.db
|
pool = peer.networkState.node.blockPool
|
||||||
var blocks = newSeqOfCap[BeaconBlock](blockRoots.len)
|
db = peer.networkState.db
|
||||||
|
|
||||||
for root in blockRoots:
|
for root in blockRoots:
|
||||||
let blockRef = pool.getRef(root)
|
let blockRef = pool.getRef(root)
|
||||||
if not(isNil(blockRef)):
|
if not isNil(blockRef):
|
||||||
blocks.add pool.get(blockRef).data
|
await response.write(pool.get(blockRef).data)
|
||||||
|
|
||||||
await response.send(blocks)
|
|
||||||
|
|
||||||
proc beaconBlocks(
|
proc beaconBlocks(
|
||||||
peer: Peer,
|
peer: Peer,
|
||||||
|
@ -276,7 +274,7 @@ p2pProtocol BeaconSync(version = 1,
|
||||||
roots.add BlockRootSlot(blockRoot: r, slot: s)
|
roots.add BlockRootSlot(blockRoot: r, slot: s)
|
||||||
if roots.len == maxRoots.int: break
|
if roots.len == maxRoots.int: break
|
||||||
s += 1
|
s += 1
|
||||||
await response.send(roots)
|
await response.write(roots)
|
||||||
|
|
||||||
proc beaconBlockRoots(
|
proc beaconBlockRoots(
|
||||||
peer: Peer,
|
peer: Peer,
|
||||||
|
@ -344,10 +342,10 @@ p2pProtocol BeaconSync(version = 1,
|
||||||
peer: Peer,
|
peer: Peer,
|
||||||
needed: openarray[FetchRecord]) {.
|
needed: openarray[FetchRecord]) {.
|
||||||
libp2pProtocol("ancestor_blocks", 1).} =
|
libp2pProtocol("ancestor_blocks", 1).} =
|
||||||
var resp = newSeqOfCap[BeaconBlock](needed.len)
|
|
||||||
let db = peer.networkState.db
|
let db = peer.networkState.db
|
||||||
var neededRoots = initSet[Eth2Digest]()
|
var neededRoots = initSet[Eth2Digest]()
|
||||||
for rec in needed: neededRoots.incl(rec.root)
|
for rec in needed: neededRoots.incl(rec.root)
|
||||||
|
var resultsCounter = 0
|
||||||
|
|
||||||
for rec in needed:
|
for rec in needed:
|
||||||
if (var blck = db.getBlock(rec.root); blck.isSome()):
|
if (var blck = db.getBlock(rec.root); blck.isSome()):
|
||||||
|
@ -355,8 +353,9 @@ p2pProtocol BeaconSync(version = 1,
|
||||||
let firstSlot = blck.get().slot - rec.historySlots
|
let firstSlot = blck.get().slot - rec.historySlots
|
||||||
|
|
||||||
for i in 0..<rec.historySlots.int:
|
for i in 0..<rec.historySlots.int:
|
||||||
resp.add(blck.get())
|
await response.write(blck.get())
|
||||||
if resp.len >= MaxAncestorBlocksResponse:
|
inc resultsCounter
|
||||||
|
if resultsCounter >= MaxAncestorBlocksResponse:
|
||||||
break
|
break
|
||||||
|
|
||||||
if blck.get().parent_root in neededRoots:
|
if blck.get().parent_root in neededRoots:
|
||||||
|
@ -368,11 +367,9 @@ p2pProtocol BeaconSync(version = 1,
|
||||||
blck.isNone() or blck.get().slot < firstSlot):
|
blck.isNone() or blck.get().slot < firstSlot):
|
||||||
break
|
break
|
||||||
|
|
||||||
if resp.len >= MaxAncestorBlocksResponse:
|
if resultsCounter >= MaxAncestorBlocksResponse:
|
||||||
break
|
break
|
||||||
|
|
||||||
await response.send(resp)
|
|
||||||
|
|
||||||
proc ancestorBlocks(
|
proc ancestorBlocks(
|
||||||
peer: Peer,
|
peer: Peer,
|
||||||
blocks: openarray[BeaconBlock])
|
blocks: openarray[BeaconBlock])
|
||||||
|
@ -387,10 +384,7 @@ p2pProtocol BeaconSync(version = 1,
|
||||||
let db = peer.networkState.db
|
let db = peer.networkState.db
|
||||||
for r in blockRoots:
|
for r in blockRoots:
|
||||||
if (let blk = db.getBlock(r); blk.isSome):
|
if (let blk = db.getBlock(r); blk.isSome):
|
||||||
bodies.add(blk.get().body)
|
await response.write(blk.get().body)
|
||||||
else:
|
|
||||||
bodies.setLen(bodies.len + 1) # According to wire spec. Pad with zero body.
|
|
||||||
await response.send(bodies)
|
|
||||||
|
|
||||||
proc beaconBlockBodies(
|
proc beaconBlockBodies(
|
||||||
peer: Peer,
|
peer: Peer,
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit b4c0629bf35edd346a54fa6fcf5805395e893a72
|
Subproject commit 4d14677d834edbdb3b29f4ca21e4d83eb58329d3
|
|
@ -1 +1 @@
|
||||||
Subproject commit a81d1fac850119c2ede9f3997332fa9f0a2ad3d8
|
Subproject commit 5f1dc751ca436e599c59df939344a641f935dd4d
|
Loading…
Reference in New Issue