parent
bf650fc2fd
commit
f112b6b2ac
|
@ -308,7 +308,7 @@ proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] =
|
||||||
cids.incl(bd.address.cid)
|
cids.incl(bd.address.cid)
|
||||||
return cids.toSeq
|
return cids.toSeq
|
||||||
|
|
||||||
proc resolveBlocks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
|
proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
|
||||||
trace "Resolving blocks", blocks = blocksDelivery.len
|
trace "Resolving blocks", blocks = blocksDelivery.len
|
||||||
|
|
||||||
b.pendingBlocks.resolve(blocksDelivery)
|
b.pendingBlocks.resolve(blocksDelivery)
|
||||||
|
@ -318,7 +318,7 @@ proc resolveBlocks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asyn
|
||||||
|
|
||||||
b.discovery.queueProvideBlocksReq(announceCids)
|
b.discovery.queueProvideBlocksReq(announceCids)
|
||||||
|
|
||||||
proc resolveBlocks(b: BlockExcEngine, blocks: seq[Block]) {.async.} =
|
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} =
|
||||||
await b.resolveBlocks(
|
await b.resolveBlocks(
|
||||||
blocks.mapIt(
|
blocks.mapIt(
|
||||||
BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid)
|
BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid)
|
||||||
|
@ -400,7 +400,7 @@ proc blocksDeliveryHandler*(
|
||||||
|
|
||||||
validatedBlocksDelivery.add(bd)
|
validatedBlocksDelivery.add(bd)
|
||||||
|
|
||||||
# await b.resolveBlocks(validatedBlocksDelivery)
|
await b.resolveBlocks(validatedBlocksDelivery)
|
||||||
codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64)
|
codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64)
|
||||||
|
|
||||||
let
|
let
|
||||||
|
@ -679,26 +679,6 @@ proc new*(
|
||||||
proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} =
|
proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} =
|
||||||
engine.paymentHandler(peer, payment)
|
engine.paymentHandler(peer, payment)
|
||||||
|
|
||||||
proc onBlockPutHandler(event: BlockPutEvent): Future[?!void] {.async.} =
|
|
||||||
let delivery = BlockDelivery(
|
|
||||||
address: event.address,
|
|
||||||
proof: CodexProof.none
|
|
||||||
)
|
|
||||||
engine.resolveBlocks(@[delivery])
|
|
||||||
success()
|
|
||||||
|
|
||||||
proc onCidAndProofHandler(event: ProofPutEvent): Future[?!void] {.async.} =
|
|
||||||
let delivery = BlockDelivery(
|
|
||||||
address: BlockAddress(leaf: true, cid: event.blockCid),
|
|
||||||
proof: event.proof.some
|
|
||||||
)
|
|
||||||
|
|
||||||
engine.resolveBlocks(@[delivery])
|
|
||||||
success()
|
|
||||||
|
|
||||||
localStore.onBlockPutEvent.subscribe(onBlockPutHandler)
|
|
||||||
localStore.onCidAndProofPutEvent.subscribe(onCidAndProofHandler)
|
|
||||||
|
|
||||||
network.handlers = BlockExcHandlers(
|
network.handlers = BlockExcHandlers(
|
||||||
onWantList: blockWantListHandler,
|
onWantList: blockWantListHandler,
|
||||||
onBlocksDelivery: blocksDeliveryHandler,
|
onBlocksDelivery: blocksDeliveryHandler,
|
||||||
|
|
|
@ -29,19 +29,7 @@ type
|
||||||
BlockType* {.pure.} = enum
|
BlockType* {.pure.} = enum
|
||||||
Manifest, Block, Both
|
Manifest, Block, Both
|
||||||
|
|
||||||
BlockPutEvent* = object
|
|
||||||
blk*: Block
|
|
||||||
address*: BlockAddress
|
|
||||||
|
|
||||||
ProofPutEvent* = object
|
|
||||||
treeCid*: Cid
|
|
||||||
index*: Natural
|
|
||||||
blockCid*: Cid
|
|
||||||
proof*: CodexProof
|
|
||||||
|
|
||||||
BlockStore* = ref object of RootObj
|
BlockStore* = ref object of RootObj
|
||||||
onBlockPutEvent*: AsyncDataEvent[BlockPutEvent]
|
|
||||||
onCidAndProofPutEvent*: AsyncDataEvent[ProofPutEvent]
|
|
||||||
|
|
||||||
method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} =
|
method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} =
|
||||||
## Get a block from the blockstore
|
## Get a block from the blockstore
|
||||||
|
|
|
@ -76,7 +76,7 @@ method putBlock*(
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
return res
|
return res
|
||||||
|
|
||||||
# await self.engine.resolveBlocks(@[blk])
|
await self.engine.resolveBlocks(@[blk])
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method putCidAndProof*(
|
method putCidAndProof*(
|
||||||
|
|
|
@ -125,15 +125,6 @@ method putCidAndProof*(
|
||||||
|
|
||||||
await self.metaDs.put(key, value)
|
await self.metaDs.put(key, value)
|
||||||
|
|
||||||
let event = ProofPutEvent(
|
|
||||||
treeCid: treeCid,
|
|
||||||
index: index,
|
|
||||||
blockCid: blockCid,
|
|
||||||
proof: proof
|
|
||||||
)
|
|
||||||
|
|
||||||
await self.onCidAndProofPutEvent.fire(event)
|
|
||||||
|
|
||||||
method getCidAndProof*(
|
method getCidAndProof*(
|
||||||
self: RepoStore,
|
self: RepoStore,
|
||||||
treeCid: Cid,
|
treeCid: Cid,
|
||||||
|
@ -369,15 +360,6 @@ method putBlock*(
|
||||||
trace "Unable to update block total metadata"
|
trace "Unable to update block total metadata"
|
||||||
return failure("Unable to update block total metadata")
|
return failure("Unable to update block total metadata")
|
||||||
|
|
||||||
let event = BlockPutEvent(
|
|
||||||
blk: blk,
|
|
||||||
address: BlockAddress(leaf: false, cid: blk.cid)
|
|
||||||
)
|
|
||||||
|
|
||||||
? await self.onBlockPutEvent.fire(event)
|
|
||||||
# # todo propagate error message!
|
|
||||||
# return failure("Error during put-block event.")
|
|
||||||
|
|
||||||
self.updateMetrics()
|
self.updateMetrics()
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue