diff --git a/beacon_chain/libp2p_backend.nim b/beacon_chain/libp2p_backend.nim index 0a394775d..a9b909192 100644 --- a/beacon_chain/libp2p_backend.nim +++ b/beacon_chain/libp2p_backend.nim @@ -6,7 +6,7 @@ import libp2p_json_serialization, ssz export - daemonapi, p2pProtocol, libp2p_json_serialization + daemonapi, p2pProtocol, libp2p_json_serialization, ssz type Eth2Node* = ref object of RootObj @@ -317,12 +317,32 @@ proc sendMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} = if sent != requestBytes.len: raise newException(TransmissionError, "Failed to deliver msg bytes") -proc sendResponseBytes(stream: P2PStream, bytes: Bytes) {.async.} = - var sent = await stream.transp.write(@[byte Success]) - if sent != 1: - raise newException(TransmissionError, "Failed to deliver response code") - await writeSizePrefix(stream.transp, uint64(bytes.len)) - sent = await stream.transp.write(bytes) +proc sendResponseChunkBytes(stream: P2PStream, payload: Bytes) {.async.} = + var s = init OutputStream + s.append byte(Success) + s.appendVarint payload.len + s.append payload + let bytes = s.getOutput + let sent = await stream.transp.write(bytes) + if sent != bytes.len: + raise newException(TransmissionError, "Failed to deliver all bytes") + +proc sendResponseChunkObj(stream: P2PStream, val: auto) {.async.} = + var s = init OutputStream + s.append byte(Success) + s.appendValue SSZ, sizePrefixed(val) + let bytes = s.getOutput + let sent = await stream.transp.write(bytes) + if sent != bytes.len: + raise newException(TransmissionError, "Failed to deliver all bytes") + +proc sendResponseChunks[T](stream: P2PStream, chunks: seq[T]) {.async.} = + var s = init OutputStream + for chunk in chunks: + s.append byte(Success) + s.appendValue SSZ, sizePrefixed(chunk) + let bytes = s.getOutput + let sent = await stream.transp.write(bytes) if sent != bytes.len: raise newException(TransmissionError, "Failed to deliver all bytes") @@ -410,6 +430,25 @@ proc init*[MsgType](T: type Responder[MsgType], peer: Peer, stream: P2PStream): T = T(UntypedResponder(peer: peer, stream: stream)) +import + typetraits + +template write*[M](r: var Responder[M], val: auto): auto = + mixin send + type Msg = M + type MsgRec = RecType(Msg) + when MsgRec is seq|openarray: + type E = ElemType(MsgRec) + when val is E: + sendResponseChunkObj(UntypedResponder(r).stream, val) + elif val is MsgRec: + sendResponseChunks(UntypedResponder(r).stream, val) + else: + static: echo "BAD TYPE ", name(E), " vs ", name(type(val)) + {.fatal: "bad".} + else: + send(r, val) + proc implementSendProcBody(sendProc: SendProc) = let msg = sendProc.msg @@ -430,7 +469,7 @@ proc implementSendProcBody(sendProc: SendProc) = else: quote: sendMsg(`peer`, `msgProto`, `bytes`) else: - quote: sendResponseBytes(`UntypedResponder`(`peer`).stream, `bytes`) + quote: sendResponseChunkBytes(`UntypedResponder`(`peer`).stream, `bytes`) sendProc.useStandardBody(nil, nil, sendCallGenerator) diff --git a/beacon_chain/ssz.nim b/beacon_chain/ssz.nim index a95212990..4f36be105 100644 --- a/beacon_chain/ssz.nim +++ b/beacon_chain/ssz.nim @@ -66,6 +66,10 @@ serializationFormat SSZ, Writer = SszWriter, PreferedOutput = seq[byte] +template sizePrefixed*[TT](x: TT): untyped = + type T = TT + SizePrefixed[T](x) + proc init*(T: type SszReader, stream: ByteStreamVar, maxObjectSize = defaultMaxObjectSize): T = @@ -252,8 +256,15 @@ func writeValue*[T](w: var SszWriter, x: SizePrefixed[T]) = var cursor = w.stream.delayVarSizeWrite(10) let initPos = w.stream.pos w.writeValue T(x) - cursor.appendVarint uint64(w.stream.pos - initPos) - finalize cursor + let length = uint64(w.stream.pos - initPos) + when false: + discard + # TODO varintBytes is sub-optimal at the moment + # cursor.writeAndFinalize length.varintBytes + else: + var buf: VarintBuffer + buf.appendVarint length + cursor.writeAndFinalize buf.writtenBytes template checkEof(n: int) = if not r.stream[].ensureBytes(n): diff --git a/beacon_chain/sync_protocol.nim b/beacon_chain/sync_protocol.nim index 9f16f7d3b..35ea45fcc 100644 --- a/beacon_chain/sync_protocol.nim +++ b/beacon_chain/sync_protocol.nim @@ -214,7 +214,6 @@ p2pProtocol BeaconSync(version = 1, count: uint64, step: uint64) {. libp2pProtocol("beacon_blocks_by_range", 1).} = - var blocks: seq[BeaconBlock] # `step == 0` has no sense, so we will return empty array of blocks. # `count == 0` means that empty array of blocks requested. # @@ -223,38 +222,37 @@ p2pProtocol BeaconSync(version = 1, # which is follows `start_slot + step` sequence. For example for, if # `start_slot` is 2 and `step` is 2 and slots 2, 4, 6 are not available, # then [8, 10, ...] will be returned. + var sentBlocksCount = 0 if step > 0'u64 and count > 0'u64: let pool = peer.networkState.node.blockPool var blck = pool.getRef(headBlockRoot) var slot = start_slot while not(isNil(blck)): if blck.slot == slot: - blocks.add(pool.get(blck).data) + await response.write(pool.get(blck).data) + inc sentBlocksCount slot = slot + step elif blck.slot > slot: if (blck.slot - slot) mod step == 0: - blocks.add(pool.get(blck).data) + await response.write(pool.get(blck).data) + inc sentBlocksCount slot = slot + ((blck.slot - slot) div step + 1) * step - if uint64(len(blocks)) == count: + if uint64(sentBlocksCount) == count: break blck = blck.parent - await response.send(blocks) - proc beaconBlocksByRoot( peer: Peer, blockRoots: openarray[Eth2Digest]) {. libp2pProtocol("beacon_blocks_by_root", 1).} = - let pool = peer.networkState.node.blockPool - let db = peer.networkState.db - var blocks = newSeqOfCap[BeaconBlock](blockRoots.len) + let + pool = peer.networkState.node.blockPool + db = peer.networkState.db for root in blockRoots: let blockRef = pool.getRef(root) - if not(isNil(blockRef)): - blocks.add pool.get(blockRef).data - - await response.send(blocks) + if not isNil(blockRef): + await response.write(pool.get(blockRef).data) proc beaconBlocks( peer: Peer, @@ -276,7 +274,7 @@ p2pProtocol BeaconSync(version = 1, roots.add BlockRootSlot(blockRoot: r, slot: s) if roots.len == maxRoots.int: break s += 1 - await response.send(roots) + await response.write(roots) proc beaconBlockRoots( peer: Peer, @@ -344,10 +342,10 @@ p2pProtocol BeaconSync(version = 1, peer: Peer, needed: openarray[FetchRecord]) {. libp2pProtocol("ancestor_blocks", 1).} = - var resp = newSeqOfCap[BeaconBlock](needed.len) let db = peer.networkState.db var neededRoots = initSet[Eth2Digest]() for rec in needed: neededRoots.incl(rec.root) + var resultsCounter = 0 for rec in needed: if (var blck = db.getBlock(rec.root); blck.isSome()): @@ -355,8 +353,9 @@ p2pProtocol BeaconSync(version = 1, let firstSlot = blck.get().slot - rec.historySlots for i in 0..= MaxAncestorBlocksResponse: + await response.write(blck.get()) + inc resultsCounter + if resultsCounter >= MaxAncestorBlocksResponse: break if blck.get().parent_root in neededRoots: @@ -368,11 +367,9 @@ p2pProtocol BeaconSync(version = 1, blck.isNone() or blck.get().slot < firstSlot): break - if resp.len >= MaxAncestorBlocksResponse: + if resultsCounter >= MaxAncestorBlocksResponse: break - await response.send(resp) - proc ancestorBlocks( peer: Peer, blocks: openarray[BeaconBlock]) @@ -387,10 +384,7 @@ p2pProtocol BeaconSync(version = 1, let db = peer.networkState.db for r in blockRoots: if (let blk = db.getBlock(r); blk.isSome): - bodies.add(blk.get().body) - else: - bodies.setLen(bodies.len + 1) # According to wire spec. Pad with zero body. - await response.send(bodies) + await response.write(blk.get().body) proc beaconBlockBodies( peer: Peer, diff --git a/vendor/nim-eth b/vendor/nim-eth index b4c0629bf..4d14677d8 160000 --- a/vendor/nim-eth +++ b/vendor/nim-eth @@ -1 +1 @@ -Subproject commit b4c0629bf35edd346a54fa6fcf5805395e893a72 +Subproject commit 4d14677d834edbdb3b29f4ca21e4d83eb58329d3 diff --git a/vendor/nim-stew b/vendor/nim-stew index a81d1fac8..5f1dc751c 160000 --- a/vendor/nim-stew +++ b/vendor/nim-stew @@ -1 +1 @@ -Subproject commit a81d1fac850119c2ede9f3997332fa9f0a2ad3d8 +Subproject commit 5f1dc751ca436e599c59df939344a641f935dd4d