trying events from repostore
This commit is contained in:
parent
eb8f468880
commit
bf650fc2fd
|
@ -308,7 +308,7 @@ proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] =
|
||||||
cids.incl(bd.address.cid)
|
cids.incl(bd.address.cid)
|
||||||
return cids.toSeq
|
return cids.toSeq
|
||||||
|
|
||||||
proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
|
proc resolveBlocks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
|
||||||
trace "Resolving blocks", blocks = blocksDelivery.len
|
trace "Resolving blocks", blocks = blocksDelivery.len
|
||||||
|
|
||||||
b.pendingBlocks.resolve(blocksDelivery)
|
b.pendingBlocks.resolve(blocksDelivery)
|
||||||
|
@ -318,7 +318,7 @@ proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asy
|
||||||
|
|
||||||
b.discovery.queueProvideBlocksReq(announceCids)
|
b.discovery.queueProvideBlocksReq(announceCids)
|
||||||
|
|
||||||
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} =
|
proc resolveBlocks(b: BlockExcEngine, blocks: seq[Block]) {.async.} =
|
||||||
await b.resolveBlocks(
|
await b.resolveBlocks(
|
||||||
blocks.mapIt(
|
blocks.mapIt(
|
||||||
BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid)
|
BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid)
|
||||||
|
@ -400,7 +400,7 @@ proc blocksDeliveryHandler*(
|
||||||
|
|
||||||
validatedBlocksDelivery.add(bd)
|
validatedBlocksDelivery.add(bd)
|
||||||
|
|
||||||
await b.resolveBlocks(validatedBlocksDelivery)
|
# await b.resolveBlocks(validatedBlocksDelivery)
|
||||||
codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64)
|
codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64)
|
||||||
|
|
||||||
let
|
let
|
||||||
|
@ -679,6 +679,26 @@ proc new*(
|
||||||
proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} =
|
proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} =
|
||||||
engine.paymentHandler(peer, payment)
|
engine.paymentHandler(peer, payment)
|
||||||
|
|
||||||
|
proc onBlockPutHandler(event: BlockPutEvent): Future[?!void] {.async.} =
|
||||||
|
let delivery = BlockDelivery(
|
||||||
|
address: event.address,
|
||||||
|
proof: CodexProof.none
|
||||||
|
)
|
||||||
|
engine.resolveBlocks(@[delivery])
|
||||||
|
success()
|
||||||
|
|
||||||
|
proc onCidAndProofHandler(event: ProofPutEvent): Future[?!void] {.async.} =
|
||||||
|
let delivery = BlockDelivery(
|
||||||
|
address: BlockAddress(leaf: true, cid: event.blockCid),
|
||||||
|
proof: event.proof.some
|
||||||
|
)
|
||||||
|
|
||||||
|
engine.resolveBlocks(@[delivery])
|
||||||
|
success()
|
||||||
|
|
||||||
|
localStore.onBlockPutEvent.subscribe(onBlockPutHandler)
|
||||||
|
localStore.onCidAndProofPutEvent.subscribe(onCidAndProofHandler)
|
||||||
|
|
||||||
network.handlers = BlockExcHandlers(
|
network.handlers = BlockExcHandlers(
|
||||||
onWantList: blockWantListHandler,
|
onWantList: blockWantListHandler,
|
||||||
onBlocksDelivery: blocksDeliveryHandler,
|
onBlocksDelivery: blocksDeliveryHandler,
|
||||||
|
|
|
@ -29,7 +29,19 @@ type
|
||||||
BlockType* {.pure.} = enum
|
BlockType* {.pure.} = enum
|
||||||
Manifest, Block, Both
|
Manifest, Block, Both
|
||||||
|
|
||||||
|
BlockPutEvent* = object
|
||||||
|
blk*: Block
|
||||||
|
address*: BlockAddress
|
||||||
|
|
||||||
|
ProofPutEvent* = object
|
||||||
|
treeCid*: Cid
|
||||||
|
index*: Natural
|
||||||
|
blockCid*: Cid
|
||||||
|
proof*: CodexProof
|
||||||
|
|
||||||
BlockStore* = ref object of RootObj
|
BlockStore* = ref object of RootObj
|
||||||
|
onBlockPutEvent*: AsyncDataEvent[BlockPutEvent]
|
||||||
|
onCidAndProofPutEvent*: AsyncDataEvent[ProofPutEvent]
|
||||||
|
|
||||||
method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} =
|
method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} =
|
||||||
## Get a block from the blockstore
|
## Get a block from the blockstore
|
||||||
|
|
|
@ -76,7 +76,7 @@ method putBlock*(
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
return res
|
return res
|
||||||
|
|
||||||
await self.engine.resolveBlocks(@[blk])
|
# await self.engine.resolveBlocks(@[blk])
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method putCidAndProof*(
|
method putCidAndProof*(
|
||||||
|
|
|
@ -125,6 +125,15 @@ method putCidAndProof*(
|
||||||
|
|
||||||
await self.metaDs.put(key, value)
|
await self.metaDs.put(key, value)
|
||||||
|
|
||||||
|
let event = ProofPutEvent(
|
||||||
|
treeCid: treeCid,
|
||||||
|
index: index,
|
||||||
|
blockCid: blockCid,
|
||||||
|
proof: proof
|
||||||
|
)
|
||||||
|
|
||||||
|
await self.onCidAndProofPutEvent.fire(event)
|
||||||
|
|
||||||
method getCidAndProof*(
|
method getCidAndProof*(
|
||||||
self: RepoStore,
|
self: RepoStore,
|
||||||
treeCid: Cid,
|
treeCid: Cid,
|
||||||
|
@ -360,6 +369,15 @@ method putBlock*(
|
||||||
trace "Unable to update block total metadata"
|
trace "Unable to update block total metadata"
|
||||||
return failure("Unable to update block total metadata")
|
return failure("Unable to update block total metadata")
|
||||||
|
|
||||||
|
let event = BlockPutEvent(
|
||||||
|
blk: blk,
|
||||||
|
address: BlockAddress(leaf: false, cid: blk.cid)
|
||||||
|
)
|
||||||
|
|
||||||
|
? await self.onBlockPutEvent.fire(event)
|
||||||
|
# # todo propagate error message!
|
||||||
|
# return failure("Error during put-block event.")
|
||||||
|
|
||||||
self.updateMetrics()
|
self.updateMetrics()
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue