Eric Mastro 4a70312ee9
feat: introduce blockstore manager
Implement blockstore manager which executes block storage operations on its block stores, in the order to which they were added to the manager, typically in the order of most local (fastest, eg cache) to least local (slowest, eg filesystem or perhaps a network filesystem). As an example, given a `BlockStoreManager` instantiated with a `@[MemoryStore, FSStore]`, retrieving a block would first attempt to get from the `MemoryStore`, and if not found, attempt to get from the `FSStore`.

Remove all dependencies on `BlockStores` (typically in the shape of `localstore`) and instead depend on `BlockStoreManager` via the `BlockExcEngine`.

Modify the role of the `BlockExcEngine` to make a “local vs remote” decision on block access/storage. For all operations other than retrieving blocks, this means simply going to the `BlockStoreManager`. For retrieving blocks, however, this means going first to the `BlockStoreManager`, and then if not found, going to the Dagger network (via pending block and want/have lists).

Remove `NetworkStore` as its two purposes were to defer block retrieval from a local store first, then go to the block exchange to requeest a block from the Dagger network. `BlockStoreManager` takes care of going to local storage first, and the block exchange engine handles going to Dagger network if retrieval from the store manager fails.

### Notes
1. Future work may want to consider breaking up `BlockExcEngine` further in to three modules:
  - `BlockExcEngine` (depends on `WantHave`, `DHT`)
  - `WantHave`
  - `DHT` (work is in progress)

Co-authored-by: Michael Bradley <michaelsbradleyjr@gmail.com>
2022-02-08 13:20:09 +11:00

478 lines
13 KiB
Nim

## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import pkg/chronos
import pkg/chronicles
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import ../stores/blockstore
import ../stores/manager
import ../blocktype as bt
import ../utils/asyncheapqueue
import ./protobuf/blockexc
import ./protobuf/presence
import ./network
import ./pendingblocks
import ./peercontext
import ./engine/payments
export asyncheapqueue, peercontext, payments, pendingblocks
logScope:
topics = "dagger blockexc engine"
const
DefaultBlockTimeout* = 5.minutes
DefaultMaxPeersPerRequest* = 10
DefaultTaskQueueSize = 100
DefaultConcurrentTasks = 10
DefaultMaxRetries = 3
type
TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.}
TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.}
BlockExcEngine* = ref object of RootObj
blockStoreMgr: BlockStoreManager # manages storing/accessing blocks for this instance
network*: BlockExcNetwork # network interface
peers*: seq[BlockExcPeerCtx] # peers we're currently actively exchanging with
wantList*: seq[Cid] # local wants list
taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # peers we're currently processing tasks for
concurrentTasks: int # number of concurrent peers we're serving at any given time
maxRetries: int # max number of tries for a failed block
blockexcTasks: seq[Future[void]] # future to control blockexc task
blockexcRunning: bool # indicates if the blockexc task is running
pendingBlocks*: PendingBlocksManager # blocks we're awaiting to be resolved
peersPerRequest: int # max number of peers to request from
wallet*: WalletRef # nitro wallet for micropayments
pricing*: ?Pricing # optional bandwidth pricing
Pricing* = object
address*: EthAddress
price*: UInt256
proc contains*(a: AsyncHeapQueue[Entry], b: Cid): bool =
## Convenience method to check for entry prepense
##
a.anyIt( it.cid == b )
proc getPeerCtx*(b: BlockExcEngine, peerId: PeerID): BlockExcPeerCtx =
## Get the peer's context
##
let peer = b.peers.filterIt( it.id == peerId )
if peer.len > 0:
return peer[0]
# attach task scheduler to engine
proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} =
b.taskQueue.pushOrUpdateNoWait(task).isOk()
proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.}
proc start*(b: BlockExcEngine) {.async.} =
## Start the blockexc task
##
trace "blockexc start"
if b.blockexcRunning:
warn "Starting blockexc twice"
return
b.blockexcRunning = true
for i in 0..<b.concurrentTasks:
b.blockexcTasks.add(blockexcTaskRunner(b))
proc stop*(b: BlockExcEngine) {.async.} =
## Stop the blockexc blockexc
##
trace "Block Exchange Engine stop"
if not b.blockexcRunning:
warn "Stopping blockexc without starting it"
return
b.blockexcRunning = false
for t in b.blockexcTasks:
if not t.finished:
trace "Awaiting task to stop"
await t.cancelAndWait()
trace "Task stopped"
trace "Block Exchange Engine stopped"
proc requestBlock*(
b: BlockExcEngine,
cid: Cid,
timeout = DefaultBlockTimeout): Future[bt.Block] =
## Request a block from remotes
##
let
blk = b.pendingBlocks.addOrAwait(cid).wait(timeout)
if b.peers.len <= 0:
warn "No peers to request blocks from"
# TODO: run discovery here to get peers for the block
return blk
var peers = b.peers
# get the first peer with at least one (any)
# matching cid
var blockPeer: BlockExcPeerCtx
for i, p in peers:
if cid in p.peerHave:
blockPeer = p
break
# didn't find any peer with matching cids
# use the first one in the sorted array
if isNil(blockPeer):
blockPeer = peers[0]
peers.keepItIf(
it != blockPeer
)
trace "Requesting block from peer", peer = blockPeer.id, cid
# request block
b.network.request.sendWantList(
blockPeer.id,
@[cid],
wantType = WantType.wantBlock) # we want this remote to send us a block
if peers.len == 0:
return blk # no peers to send wants to
# filter out the peer we've already requested from
let stop = min(peers.high, b.peersPerRequest)
trace "Sending want list requests to remaining peers", count = stop + 1
for p in peers[0..stop]:
if cid notin p.peerHave:
# just send wants
b.network.request.sendWantList(
p.id,
@[cid],
wantType = WantType.wantHave) # we only want to know if the peer has the block
return blk
proc retrieve*(
b: BlockExcEngine,
cid: Cid): Future[?!bt.Block] {.async.} =
## Attempt to retrieve block from local stores, and if not retrieved,
## request from Dagger network
##
var
blk: bt.Block
blkResult = await b.blockStoreMgr.getBlock(cid)
if blkResult.isOk:
blk = blkResult.get
else:
blk = try:
await b.requestBlock(cid)
except AsyncTimeoutError:
trace "Block request timed out", cid
return failure("Block request timed out")
except CatchableError as exc:
trace "Exception requesting block", cid, exc = exc.msg
return failure(exc.msg)
return success(blk)
proc store*(
self: BlockExcEngine,
blk: bt.Block): Future[bool] {.async.} =
## Store the block *locally* using the block store manager
return await self.blockStoreMgr.putBlock(blk)
proc exists*(
self: BlockExcEngine,
cid: Cid): bool =
## Check if the block exists *locally* using the block store manager
return self.blockStoreMgr.hasBlock(cid)
proc blockPresenceHandler*(
b: BlockExcEngine,
peer: PeerID,
blocks: seq[BlockPresence]) {.async.} =
## Handle block presence
##
let peerCtx = b.getPeerCtx(peer)
if isNil(peerCtx):
return
for blk in blocks:
if presence =? Presence.init(blk):
peerCtx.updatePresence(presence)
proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) =
trace "Schedule a task for new blocks"
let cids = blocks.mapIt( it.cid )
# schedule any new peers to provide blocks to
for p in b.peers:
for c in cids: # for each cid
# schedule a peer if it wants at least one
# cid and we have it in our local store
if c in p.peerWants and c in b.blockStoreMgr:
if not b.scheduleTask(p):
trace "Unable to schedule task for peer", peer = p.id
break # do next peer
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) =
## Resolve pending blocks from the pending blocks manager
## and schedule any new task to be ran
##
trace "Resolving blocks"
b.pendingBlocks.resolve(blocks)
b.scheduleTasks(blocks)
proc payForBlocks(engine: BlockExcEngine,
peer: BlockExcPeerCtx,
blocks: seq[bt.Block]) =
let sendPayment = engine.network.request.sendPayment
if sendPayment.isNil:
return
let cids = blocks.mapIt(it.cid)
if payment =? engine.wallet.pay(peer, peer.price(cids)):
sendPayment(peer.id, payment)
proc blocksHandler*(
b: BlockExcEngine,
peer: PeerID,
blocks: seq[bt.Block]) {.async.} =
## handle incoming blocks
##
trace "Got blocks from peer", peer, len = blocks.len
if not (await b.blockStoreMgr.putBlocks(blocks)):
trace "Unable to store blocks"
b.resolveBlocks(blocks)
let peerCtx = b.getPeerCtx(peer)
if peerCtx != nil:
b.payForBlocks(peerCtx, blocks)
proc wantListHandler*(
b: BlockExcEngine,
peer: PeerID,
wantList: WantList) {.async.} =
## Handle incoming want lists
##
trace "Got want list for peer", peer
let peerCtx = b.getPeerCtx(peer)
if isNil(peerCtx):
return
var dontHaves: seq[Cid]
let entries = wantList.entries
for e in entries:
let idx = peerCtx.peerWants.find(e)
if idx > -1:
# peer doesn't want this block anymore
if e.cancel:
peerCtx.peerWants.del(idx)
continue
peerCtx.peerWants[idx] = e # update entry
else:
peerCtx.peerWants.add(e)
trace "Added entry to peer's want list", peer = peerCtx.id, cid = $e.cid
# peer might want to ask for the same cid with
# different want params
if e.sendDontHave and e.cid notin b.blockStoreMgr:
dontHaves.add(e.cid)
# send don't have's to remote
if dontHaves.len > 0:
b.network.request.sendPresence(
peer,
dontHaves.mapIt(
BlockPresence(
cid: it.data.buffer,
`type`: BlockPresenceType.presenceDontHave)))
if not b.scheduleTask(peerCtx):
trace "Unable to schedule task for peer", peer
proc accountHandler*(engine: BlockExcEngine, peer: PeerID, account: Account) {.async.} =
let context = engine.getPeerCtx(peer)
if context.isNil:
return
context.account = account.some
proc paymentHandler*(engine: BlockExcEngine, peer: PeerId, payment: SignedState) {.async.} =
without context =? engine.getPeerCtx(peer).option and
account =? context.account:
return
if channel =? context.paymentChannel:
let sender = account.address
discard engine.wallet.acceptPayment(channel, Asset, sender, payment)
else:
context.paymentChannel = engine.wallet.acceptChannel(payment).option
proc setupPeer*(b: BlockExcEngine, peer: PeerID) =
## Perform initial setup, such as want
## list exchange
##
trace "Setting up new peer", peer
if peer notin b.peers:
b.peers.add(BlockExcPeerCtx(
id: peer
))
# broadcast our want list, the other peer will do the same
if b.wantList.len > 0:
b.network.request.sendWantList(peer, b.wantList, full = true)
if address =? b.pricing.?address:
b.network.request.sendAccount(peer, Account(address: address))
proc dropPeer*(b: BlockExcEngine, peer: PeerID) =
## Cleanup disconnected peer
##
trace "Dropping peer", peer
# drop the peer from the peers table
b.peers.keepItIf( it.id != peer )
proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
trace "Handling task for peer", peer = task.id
var wantsBlocks = newAsyncHeapQueue[Entry](queueType = QueueType.Max)
# get blocks and wants to send to the remote
for e in task.peerWants:
if e.wantType == WantType.wantBlock:
await wantsBlocks.push(e)
# TODO: There should be all sorts of accounting of
# bytes sent/received here
if wantsBlocks.len > 0:
let
cids = wantsBlocks.mapIt(it.cid)
blocks = await b.blockStoreMgr.getBlocks(cids)
if blocks.len > 0:
b.network.request.sendBlocks(
task.id,
blocks)
# Remove successfully sent blocks
task.peerWants.keepIf(
proc(e: Entry): bool =
not blocks.anyIt( it.cid == e.cid )
)
var wants: seq[BlockPresence]
# do not remove wants from the queue unless
# we send the block or get a cancel
for e in task.peerWants:
if e.wantType == WantType.wantHave:
var presence = Presence(cid: e.cid)
presence.have = b.blockStoreMgr.hasblock(presence.cid)
if presence.have and price =? b.pricing.?price:
presence.price = price
wants.add(BlockPresence.init(presence))
if wants.len > 0:
b.network.request.sendPresence(task.id, wants)
proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
## process tasks
##
while b.blockexcRunning:
let peerCtx = await b.taskQueue.pop()
asyncSpawn b.taskHandler(peerCtx)
trace "Exiting blockexc task runner"
proc new*(
T: type BlockExcEngine,
wallet: WalletRef,
network: BlockExcNetwork,
storeMgr: BlockStoreManager,
concurrentTasks = DefaultConcurrentTasks,
maxRetries = DefaultMaxRetries,
peersPerRequest = DefaultMaxPeersPerRequest): T =
let engine = BlockExcEngine(
pendingBlocks: PendingBlocksManager.new(),
peersPerRequest: peersPerRequest,
network: network,
blockStoreMgr: storeMgr,
wallet: wallet,
concurrentTasks: concurrentTasks,
maxRetries: maxRetries,
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize))
proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} =
if event.kind == PeerEventKind.Joined:
engine.setupPeer(peerId)
else:
engine.dropPeer(peerId)
if not isNil(network.switch):
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
proc blockWantListHandler(
peer: PeerID,
wantList: WantList): Future[void] {.gcsafe.} =
engine.wantListHandler(peer, wantList)
proc blockPresenceHandler(
peer: PeerID,
presence: seq[BlockPresence]): Future[void] {.gcsafe.} =
engine.blockPresenceHandler(peer, presence)
proc blocksHandler(
peer: PeerID,
blocks: seq[bt.Block]): Future[void] {.gcsafe.} =
engine.blocksHandler(peer, blocks)
proc accountHandler(peer: PeerId, account: Account): Future[void] {.gcsafe.} =
engine.accountHandler(peer, account)
proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} =
engine.paymentHandler(peer, payment)
network.handlers = BlockExcHandlers(
onWantList: blockWantListHandler,
onBlocks: blocksHandler,
onPresence: blockPresenceHandler,
onAccount: accountHandler,
onPayment: paymentHandler
)
return engine