nim-codex/dagger/bitswap.nim

188 lines
5.1 KiB
Nim
Raw Normal View History

Poc 2 (#7) * moving protobuf into bitswap * adding block type * reworking bitswap * adding chunker * adding license header * use 1.2.6 * adding fixed size chunker * add blockstore * add iterator to chunker * more bitswap changes * rename ipfs to dagger * rename to dagger * blockstore inherits from BlockProvider * wip - add core block handling logic * smal changes * use proper block store methods * adding asynq heapqueue * wip prepare for bitswap task runner * adding `$` * adding memory store and tests * fixed chunking * extracted desicion engine from bitswap * added helper random funcs * adding testing helpers * only handle seqs * add peer events * cleanup pending blocks on blockstore event * allow nil handlers * move protobuf type helpers * allow initializing block from Cid * testing and fixes * small fixes * expose `<` * spelling * default value * spelling * pending blocks manager * adding stores manager * more tests a wip around bitswap * small changes * merge bitswap and engine for now * for now run only the new poc's tests * add a more complete ci setup * use template in map * remove p2pd * remove go * dont use asyncCheck * few small changes * adding ability to update items * adding multiple task runners * handle cancelation properly * use Result instead of throwing * wip bitswap tests * moving things around * split out engine again * add request and handlers interface * fix tests * wip - engine tests * remove unused imports * fix tests * cleanup block requesting logic * add block request tests * more block requests * add support for max heap * don't use result * use max heap & send block presence in task handler * add task handler tests * rename store to localStore * cleanup & logging * cancel task on stop * don't depend on local store for events * dont use heap queue for wants * add chronicles * fix issue with peer wants * add test for delayed block sends * remove obsolete tests * wip chunker * run all tests * add todo * misc * remove irrelevant files * 
removing more files * adding helpers for bitswap tests * moved bitswap file * misc * make blocks timeout longer * adjust block timeout * speedup test * compile with threads * import missing crypto * misc * disable threads for now * fix 32 bit platforms * re-enable threads support in tests
2021-02-26 00:23:22 +00:00
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import pkg/chronicles
import pkg/chronos
import pkg/libp2p
import pkg/libp2p/errors
import ./bitswap/protobuf/bitswap as pb
import ./blocktype as bt
import ./stores/blockstore
import ./utils/asyncheapqueue
import ./bitswap/network
import ./bitswap/engine
export network, blockstore, asyncheapqueue, engine
# Every log statement in this module is tagged with the topic below.
logScope:
  topics = "dagger bitswap"
const
  DefaultTaskQueueSize = 100
    ## size handed to `newAsyncHeapQueue` when `new` creates the task queue
  DefaultConcurrentTasks = 10
    ## default for the `concurrentTasks` parameter of `new`
  DefaultMaxRetries = 3
    ## default for the `maxRetries` parameter of `new`
type
  Bitswap* = ref object of BlockStore
    ## Block store that wires a `BitswapEngine` to a queue of peer tasks
    ## and the runner futures that drain that queue.
    engine*: BitswapEngine
      ## bitswap decision engine
    taskQueue*: AsyncHeapQueue[BitswapPeerCtx]
      ## peers we're currently processing tasks for
    bitswapTasks: seq[Future[void]]
      ## futures of the task runners, kept so `stop` can cancel them
    bitswapRunning: bool
      ## indicates if the bitswap task runners should keep looping
    concurrentTasks: int
      ## number of concurrent peers we're serving at any given time
    maxRetries: int
      ## max number of tries for a failed block
    taskHandler: TaskHandler
      ## handler provided by the engine, called by the runner
proc bitswapTaskRunner(b: Bitswap) {.async.} =
  ## Runner loop: for as long as `bitswapRunning` is set, wait for the
  ## next peer context popped from `taskQueue` and spawn the task handler
  ## for it without awaiting its completion.
  ##
  ## The queue decides which peer comes next (intended order: least
  ## amount of debt ratio first).
  ##

  while true:
    if not b.bitswapRunning:
      break

    let nextPeer = await b.taskQueue.pop()
    asyncSpawn b.taskHandler(nextPeer)

  trace "Exiting bitswap task runner"
proc start*(b: Bitswap) {.async.} =
  ## Launch `concurrentTasks` task runners and remember their futures.
  ##
  ## Does nothing except log a warning when runner futures are already
  ## registered.
  ##

  trace "bitswap start"

  if b.bitswapTasks.len != 0:
    warn "Starting bitswap twice"
    return

  b.bitswapRunning = true
  for _ in 1..b.concurrentTasks:
    b.bitswapTasks.add(bitswapTaskRunner(b))
proc stop*(b: Bitswap) {.async.} =
  ## Stop the bitswap task runners.
  ##
  ## Clears the running flag, cancels every runner that has not finished
  ## yet and waits for each cancellation to complete. The list of runner
  ## futures is emptied so that a later `start` launches fresh runners
  ## and a repeated `stop` is reported as a warning.
  ##

  trace "Bitswap stop"
  if b.bitswapTasks.len <= 0:
    warn "Stopping bitswap without starting it"
    return

  b.bitswapRunning = false

  # Detach the runner list before awaiting: the seq must not change
  # underneath the loop below, and `start`/`stop` guard on its length.
  let runners = b.bitswapTasks
  b.bitswapTasks = @[]

  for t in runners:
    if not t.finished:
      trace "Awaiting task to stop"
      # `cancel()` alone only requests cancellation; wait until the
      # runner has really finished.
      await t.cancelAndWait()
      trace "Task stopped"

  trace "Bitswap stopped"
method getBlocks*(
  b: Bitswap,
  cid: seq[Cid]): Future[seq[bt.Block]] {.async.} =
  ## Request the blocks identified by `cid` through the engine.
  ##
  ## Waits until every request future returned by `requestBlocks` has
  ## finished and returns only the blocks of requests that completed
  ## successfully; failed or cancelled requests are left out, so the
  ## result may be shorter than `cid`.
  ##

  let requests = await allFinished(b.engine.requestBlocks(cid))

  # A cancelled future is not `failed`, but reading it raises, so both
  # states have to be excluded before `read`.
  return requests.filterIt(
    not (it.failed or it.cancelled)
  ).mapIt(
    it.read
  )
method putBlocks*(b: Bitswap, blocks: seq[bt.Block]) =
  ## Pass `blocks` to the engine's `resolveBlocks`, then run the
  ## `BlockStore` base implementation of `putBlocks`.
  ##

  b.engine.resolveBlocks(blocks)

  # static dispatch to the parent type's method
  procCall putBlocks(BlockStore(b), blocks)
proc new*(
  T: type Bitswap,
  localStore: BlockStore,
  network: BitswapNetwork,
  concurrentTasks = DefaultConcurrentTasks,
  maxRetries = DefaultMaxRetries,
  peersPerRequest = DefaultMaxPeersPerRequest): T =
  ## Create a `Bitswap` instance and wire it to `network`.
  ##
  ## - `localStore` and `peersPerRequest` are forwarded to
  ##   `BitswapEngine.new`, together with `network.request`.
  ## - `concurrentTasks` is the number of task runners `start` launches.
  ## - `maxRetries` is stored on the instance.
  ##
  ## Side effects on `network`: a peer event handler is registered on
  ## `network.switch` for `Joined` and `Left`, and `network.handlers` is
  ## replaced with handlers that delegate to the engine.
  ##

  let engine = BitswapEngine.new(
    localStore = localStore,
    peersPerRequest = peersPerRequest,
    request = network.request,
  )

  let b = Bitswap(
    engine: engine,
    taskQueue: newAsyncHeapQueue[BitswapPeerCtx](DefaultTaskQueueSize),
    concurrentTasks: concurrentTasks,
    maxRetries: maxRetries,
  )

  # attach engine's task handler
  b.taskHandler = proc(task: BitswapPeerCtx):
    Future[void] {.gcsafe.} =
    engine.taskHandler(task)

  # attach task scheduler to engine; reports whether the task could be
  # pushed to (or updated in) the queue without waiting
  engine.scheduleTask = proc(task: BitswapPeerCtx):
    bool {.gcsafe} =
    b.taskQueue.pushOrUpdateNoWait(task).isOk()

  proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} =
    # Only registered for `Joined` and `Left` below, so the `else`
    # branch handles `Left`.
    if event.kind == PeerEventKind.Joined:
      b.engine.setupPeer(peerId)
    else:
      b.engine.dropPeer(peerId)

  network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
  network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)

  # Network message handlers; each one delegates to the engine.
  proc blockWantListHandler(
    peer: PeerID,
    wantList: WantList) {.gcsafe.} =
    engine.wantListHandler(peer, wantList)

  proc blockPresenceHandler(
    peer: PeerID,
    presence: seq[BlockPresence]) {.gcsafe.} =
    engine.blockPresenceHandler(peer, presence)

  proc blocksHandler(
    peer: PeerID,
    blocks: seq[bt.Block]) {.gcsafe.} =
    engine.blocksHandler(peer, blocks)

  proc pricingHandler(peer: PeerId, pricing: Pricing) =
    engine.pricingHandler(peer, pricing)

  network.handlers = BitswapHandlers(
    onWantList: blockWantListHandler,
    onBlocks: blocksHandler,
    onPresence: blockPresenceHandler,
    onPricing: pricingHandler
  )

  return b