Rename bitswap (#25)

* use PeerInfo in event handlers

* use CidV1 and raw multicodec as default

* add block stream abstraction

* raises defect

* adding dataset abstraction

* move blockstream into own dir

* reorg files and fix tests

* rename dataset to blockset

* wip

* wip

* adding basic test for treehash algo

* run blockset tests along with with the rest

* remove obsolete contents

* fix chunker tests

* rename bitswap and move to stores

* rename bitwsap to blockexc and move to stores

* moare project structure reorg
This commit is contained in:
Dmitriy Ryajov 2021-08-30 13:25:20 -06:00 committed by GitHub
parent b049e54d9e
commit 2fb39ca4a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 554 additions and 368 deletions

View File

@ -1,49 +0,0 @@
import pkg/chronos
import pkg/libp2p/peerinfo
import pkg/libp2p/multiaddress
import ./ipfs/p2p/switch
import ./ipfs/repo
import ./ipfs/chunking
import ./ipfs/bitswap
export peerinfo except IPFS
export multiaddress except IPFS
type
Ipfs* = ref object
repo: Repo
switch: Switch
bitswap: Bitswap
proc info*(ipfs: Ipfs): PeerInfo =
ipfs.switch.peerInfo
proc start*(_: type Ipfs, addresses: seq[MultiAddress]): Future[Ipfs] {.async.} =
let repo = Repo()
let switch = Switch.create()
let bitswap = Bitswap.start(switch, repo)
switch.peerInfo.addrs.add(addresses)
discard await switch.start()
result = Ipfs(repo: repo, switch: switch, bitswap: bitswap)
proc start*(_: type Ipfs, address: MultiAddress): Future[Ipfs] {.async.} =
result = await Ipfs.start(@[address])
proc start*(_: type Ipfs): Future[Ipfs] {.async.} =
result = await Ipfs.start(@[])
proc connect*(peer: Ipfs, info: PeerInfo) {.async.} =
await peer.bitswap.connect(info)
proc add*(peer: Ipfs, input: File): Future[Cid] {.async.} =
let obj = createObject(input)
peer.repo.store(obj)
result = obj.cid
proc get*(peer: Ipfs, identifier: Cid, output: File) {.async.} =
let obj = await peer.bitswap.retrieve(identifier)
if obj.isSome:
obj.get().writeToFile(output)
proc stop*(peer: Ipfs) {.async.} =
await peer.switch.stop()

View File

@ -4,7 +4,7 @@ description = "The hardrive for Web3"
license = "MIT" license = "MIT"
requires "nim >= 1.2.6", requires "nim >= 1.2.6",
"libp2p >= 0.0.2 & < 0.1.0", "libp2p#unstable",
"nimcrypto >= 0.4.1", "nimcrypto >= 0.4.1",
"bearssl >= 0.1.4", "bearssl >= 0.1.4",
"chronicles >= 0.7.2", "chronicles >= 0.7.2",

11
dagger/blockexchange.nim Normal file
View File

@ -0,0 +1,11 @@
import ./blockexchange/[
network,
engine,
peercontext]
import ./blockexchange/protobuf/[
blockexc,
payments,
presence]
export network, engine, peercontext, blockexc, payments, presence

BIN
dagger/blockexchange.out Executable file

Binary file not shown.

View File

@ -15,38 +15,39 @@ import pkg/chronicles
import pkg/libp2p import pkg/libp2p
import pkg/libp2p/errors import pkg/libp2p/errors
import ./protobuf/bitswap as pb
import ./protobuf/presence
import ../blocktype as bt
import ../stores/blockstore import ../stores/blockstore
import ../blocktype as bt
import ../utils/asyncheapqueue import ../utils/asyncheapqueue
import ./protobuf/blockexc
import ./protobuf/presence
import ./network import ./network
import ./pendingblocks import ./pendingblocks
import ./peercontext import ./peercontext
import ./engine/payments import ./engine/payments
export peercontext export peercontext, payments, pendingblocks
logScope: logScope:
topics = "dagger bitswap engine" topics = "dagger blockexc engine"
const const
DefaultTimeout* = 5.seconds DefaultTimeout* = 5.seconds
DefaultMaxPeersPerRequest* = 10 DefaultMaxPeersPerRequest* = 10
type type
TaskHandler* = proc(task: BitswapPeerCtx): Future[void] {.gcsafe.} TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.}
TaskScheduler* = proc(task: BitswapPeerCtx): bool {.gcsafe.} TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.}
BitswapEngine* = ref object of RootObj BlockExcEngine* = ref object of RootObj
localStore*: BlockStore # where we localStore blocks for this instance localStore*: BlockStore # where we localStore blocks for this instance
peers*: seq[BitswapPeerCtx] # peers we're currently actively exchanging with peers*: seq[BlockExcPeerCtx] # peers we're currently actively exchanging with
wantList*: seq[Cid] # local wants list wantList*: seq[Cid] # local wants list
pendingBlocks*: PendingBlocksManager # blocks we're awaiting to be resolved pendingBlocks*: PendingBlocksManager # blocks we're awaiting to be resolved
peersPerRequest: int # max number of peers to request from peersPerRequest: int # max number of peers to request from
scheduleTask*: TaskScheduler # schedule a new task with the task runner scheduleTask*: TaskScheduler # schedule a new task with the task runner
request*: BitswapRequest # bitswap network requests request*: BlockExcRequest # block exchange network requests
wallet*: WalletRef # nitro wallet for micropayments wallet*: WalletRef # nitro wallet for micropayments
pricing*: ?Pricing # optional bandwidth pricing pricing*: ?Pricing # optional bandwidth pricing
@ -60,7 +61,7 @@ proc contains*(a: AsyncHeapQueue[Entry], b: Cid): bool =
a.anyIt( it.cid == b ) a.anyIt( it.cid == b )
proc getPeerCtx*(b: BitswapEngine, peerId: PeerID): BitswapPeerCtx = proc getPeerCtx*(b: BlockExcEngine, peerId: PeerID): BlockExcPeerCtx =
## Get the peer's context ## Get the peer's context
## ##
@ -69,7 +70,7 @@ proc getPeerCtx*(b: BitswapEngine, peerId: PeerID): BitswapPeerCtx =
return peer[0] return peer[0]
proc requestBlocks*( proc requestBlocks*(
b: BitswapEngine, b: BlockExcEngine,
cids: seq[Cid], cids: seq[Cid],
timeout = DefaultTimeout): seq[Future[bt.Block]] = timeout = DefaultTimeout): seq[Future[bt.Block]] =
## Request a block from remotes ## Request a block from remotes
@ -91,12 +92,11 @@ proc requestBlocks*(
blocks.add( blocks.add(
b.pendingBlocks.addOrAwait(c).wait(timeout)) b.pendingBlocks.addOrAwait(c).wait(timeout))
var peers = b.peers var peers = b.peers
# get the first peer with at least one (any) # get the first peer with at least one (any)
# matching cid # matching cid
var blockPeer: BitswapPeerCtx var blockPeer: BlockExcPeerCtx
for i, p in peers: for i, p in peers:
let has = cids.anyIt( let has = cids.anyIt(
it in p.peerHave it in p.peerHave
@ -125,7 +125,7 @@ proc requestBlocks*(
if peers.len == 0: if peers.len == 0:
return blocks # no peers to send wants to return blocks # no peers to send wants to
template sendWants(ctx: BitswapPeerCtx) = template sendWants(ctx: BlockExcPeerCtx) =
# just send wants # just send wants
b.request.sendWantList( b.request.sendWantList(
ctx.id, ctx.id,
@ -142,7 +142,7 @@ proc requestBlocks*(
return blocks return blocks
proc blockPresenceHandler*( proc blockPresenceHandler*(
b: BitswapEngine, b: BlockExcEngine,
peer: PeerID, peer: PeerID,
blocks: seq[BlockPresence]) = blocks: seq[BlockPresence]) =
## Handle block presence ## Handle block presence
@ -156,7 +156,7 @@ proc blockPresenceHandler*(
if presence =? Presence.init(blk): if presence =? Presence.init(blk):
peerCtx.updatePresence(presence) peerCtx.updatePresence(presence)
proc scheduleTasks(b: BitswapEngine, blocks: seq[bt.Block]) = proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) =
trace "Schedule a task for new blocks" trace "Schedule a task for new blocks"
let cids = blocks.mapIt( it.cid ) let cids = blocks.mapIt( it.cid )
@ -170,7 +170,7 @@ proc scheduleTasks(b: BitswapEngine, blocks: seq[bt.Block]) =
trace "Unable to schedule task for peer", peer = p.id trace "Unable to schedule task for peer", peer = p.id
break # do next peer break # do next peer
proc resolveBlocks*(b: BitswapEngine, blocks: seq[bt.Block]) = proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) =
## Resolve pending blocks from the pending blocks manager ## Resolve pending blocks from the pending blocks manager
## and schedule any new task to be ran ## and schedule any new task to be ran
## ##
@ -179,8 +179,8 @@ proc resolveBlocks*(b: BitswapEngine, blocks: seq[bt.Block]) =
b.pendingBlocks.resolve(blocks) b.pendingBlocks.resolve(blocks)
b.scheduleTasks(blocks) b.scheduleTasks(blocks)
proc payForBlocks(engine: BitswapEngine, proc payForBlocks(engine: BlockExcEngine,
peer: BitswapPeerCtx, peer: BlockExcPeerCtx,
blocks: seq[bt.Block]) = blocks: seq[bt.Block]) =
let sendPayment = engine.request.sendPayment let sendPayment = engine.request.sendPayment
if sendPayment.isNil: if sendPayment.isNil:
@ -191,7 +191,7 @@ proc payForBlocks(engine: BitswapEngine,
sendPayment(peer.id, payment) sendPayment(peer.id, payment)
proc blocksHandler*( proc blocksHandler*(
b: BitswapEngine, b: BlockExcEngine,
peer: PeerID, peer: PeerID,
blocks: seq[bt.Block]) = blocks: seq[bt.Block]) =
## handle incoming blocks ## handle incoming blocks
@ -206,7 +206,7 @@ proc blocksHandler*(
b.payForBlocks(peerCtx, blocks) b.payForBlocks(peerCtx, blocks)
proc wantListHandler*( proc wantListHandler*(
b: BitswapEngine, b: BlockExcEngine,
peer: PeerID, peer: PeerID,
wantList: WantList) = wantList: WantList) =
## Handle incoming want lists ## Handle incoming want lists
@ -250,14 +250,14 @@ proc wantListHandler*(
if not b.scheduleTask(peerCtx): if not b.scheduleTask(peerCtx):
trace "Unable to schedule task for peer", peer trace "Unable to schedule task for peer", peer
proc accountHandler*(engine: BitswapEngine, peer: PeerID, account: Account) = proc accountHandler*(engine: BlockExcEngine, peer: PeerID, account: Account) =
let context = engine.getPeerCtx(peer) let context = engine.getPeerCtx(peer)
if context.isNil: if context.isNil:
return return
context.account = account.some context.account = account.some
proc paymentHandler*(engine: BitswapEngine, peer: PeerId, payment: SignedState) = proc paymentHandler*(engine: BlockExcEngine, peer: PeerId, payment: SignedState) =
without context =? engine.getPeerCtx(peer).option and without context =? engine.getPeerCtx(peer).option and
account =? context.account: account =? context.account:
return return
@ -268,14 +268,14 @@ proc paymentHandler*(engine: BitswapEngine, peer: PeerId, payment: SignedState)
else: else:
context.paymentChannel = engine.wallet.acceptChannel(payment).option context.paymentChannel = engine.wallet.acceptChannel(payment).option
proc setupPeer*(b: BitswapEngine, peer: PeerID) = proc setupPeer*(b: BlockExcEngine, peer: PeerID) =
## Perform initial setup, such as want ## Perform initial setup, such as want
## list exchange ## list exchange
## ##
trace "Setting up new peer", peer trace "Setting up new peer", peer
if peer notin b.peers: if peer notin b.peers:
b.peers.add(BitswapPeerCtx( b.peers.add(BlockExcPeerCtx(
id: peer id: peer
)) ))
@ -286,7 +286,7 @@ proc setupPeer*(b: BitswapEngine, peer: PeerID) =
if address =? b.pricing.?address: if address =? b.pricing.?address:
b.request.sendAccount(peer, Account(address: address)) b.request.sendAccount(peer, Account(address: address))
proc dropPeer*(b: BitswapEngine, peer: PeerID) = proc dropPeer*(b: BlockExcEngine, peer: PeerID) =
## Cleanup disconnected peer ## Cleanup disconnected peer
## ##
@ -295,7 +295,7 @@ proc dropPeer*(b: BitswapEngine, peer: PeerID) =
# drop the peer from the peers table # drop the peer from the peers table
b.peers.keepItIf( it.id != peer ) b.peers.keepItIf( it.id != peer )
proc taskHandler*(b: BitswapEngine, task: BitswapPeerCtx) {.gcsafe, async.} = proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
trace "Handling task for peer", peer = task.id trace "Handling task for peer", peer = task.id
var wantsBlocks = newAsyncHeapQueue[Entry](queueType = QueueType.Max) var wantsBlocks = newAsyncHeapQueue[Entry](queueType = QueueType.Max)
@ -334,19 +334,19 @@ proc taskHandler*(b: BitswapEngine, task: BitswapPeerCtx) {.gcsafe, async.} =
if wants.len > 0: if wants.len > 0:
b.request.sendPresence(task.id, wants) b.request.sendPresence(task.id, wants)
proc new*( func new*(
T: type BitswapEngine, T: type BlockExcEngine,
localStore: BlockStore, localStore: BlockStore,
wallet: WalletRef, wallet: WalletRef,
request: BitswapRequest = BitswapRequest(), request: BlockExcRequest = BlockExcRequest(),
scheduleTask: TaskScheduler = nil, scheduleTask: TaskScheduler = nil,
peersPerRequest = DefaultMaxPeersPerRequest): T = peersPerRequest = DefaultMaxPeersPerRequest): T =
proc taskScheduler(task: BitswapPeerCtx): bool = proc taskScheduler(task: BlockExcPeerCtx): bool =
if not isNil(scheduleTask): if not isNil(scheduleTask):
return scheduleTask(task) return scheduleTask(task)
let b = BitswapEngine( let b = BlockExcEngine(
localStore: localStore, localStore: localStore,
pendingBlocks: PendingBlocksManager.new(), pendingBlocks: PendingBlocksManager.new(),
peersPerRequest: peersPerRequest, peersPerRequest: peersPerRequest,

View File

@ -18,7 +18,7 @@ func openLedgerChannel*(wallet: WalletRef,
asset: EthAddress): ?!ChannelId = asset: EthAddress): ?!ChannelId =
wallet.openLedgerChannel(hub, ChainId, asset, AmountPerChannel) wallet.openLedgerChannel(hub, ChainId, asset, AmountPerChannel)
func getOrOpenChannel(wallet: WalletRef, peer: BitswapPeerCtx): ?!ChannelId = func getOrOpenChannel(wallet: WalletRef, peer: BlockExcPeerCtx): ?!ChannelId =
if channel =? peer.paymentChannel: if channel =? peer.paymentChannel:
success channel success channel
elif account =? peer.account: elif account =? peer.account:
@ -29,7 +29,7 @@ func getOrOpenChannel(wallet: WalletRef, peer: BitswapPeerCtx): ?!ChannelId =
failure "no account set for peer" failure "no account set for peer"
func pay*(wallet: WalletRef, func pay*(wallet: WalletRef,
peer: BitswapPeerCtx, peer: BlockExcPeerCtx,
amount: UInt256): ?!SignedState = amount: UInt256): ?!SignedState =
if account =? peer.account: if account =? peer.account:
let asset = Asset let asset = Asset

View File

@ -13,19 +13,20 @@ import pkg/chronicles
import pkg/chronos import pkg/chronos
import pkg/libp2p import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import ../blocktype as bt import ../blocktype as bt
import ./protobuf/bitswap as pb import ./protobuf/blockexc as pb
import ./protobuf/payments import ./protobuf/payments
import ./networkpeer import ./networkpeer
export pb, networkpeer export networkpeer, payments
export payments
logScope: logScope:
topics = "dagger bitswap network" topics = "dagger blockexc network"
const Codec* = "/ipfs/bitswap/1.2.0" const Codec* = "/dagger/blockexc/1.0.0"
type type
WantListHandler* = proc(peer: PeerID, wantList: WantList) {.gcsafe.} WantListHandler* = proc(peer: PeerID, wantList: WantList) {.gcsafe.}
@ -34,7 +35,7 @@ type
AccountHandler* = proc(peer: PeerID, account: Account) {.gcsafe.} AccountHandler* = proc(peer: PeerID, account: Account) {.gcsafe.}
PaymentHandler* = proc(peer: PeerID, payment: SignedState) {.gcsafe.} PaymentHandler* = proc(peer: PeerID, payment: SignedState) {.gcsafe.}
BitswapHandlers* = object BlockExcHandlers* = object
onWantList*: WantListHandler onWantList*: WantListHandler
onBlocks*: BlocksHandler onBlocks*: BlocksHandler
onPresence*: BlockPresenceHandler onPresence*: BlockPresenceHandler
@ -55,22 +56,22 @@ type
AccountBroadcaster* = proc(peer: PeerID, account: Account) {.gcsafe.} AccountBroadcaster* = proc(peer: PeerID, account: Account) {.gcsafe.}
PaymentBroadcaster* = proc(peer: PeerID, payment: SignedState) {.gcsafe.} PaymentBroadcaster* = proc(peer: PeerID, payment: SignedState) {.gcsafe.}
BitswapRequest* = object BlockExcRequest* = object
sendWantList*: WantListBroadcaster sendWantList*: WantListBroadcaster
sendBlocks*: BlocksBroadcaster sendBlocks*: BlocksBroadcaster
sendPresence*: PresenceBroadcaster sendPresence*: PresenceBroadcaster
sendAccount*: AccountBroadcaster sendAccount*: AccountBroadcaster
sendPayment*: PaymentBroadcaster sendPayment*: PaymentBroadcaster
BitswapNetwork* = ref object of LPProtocol BlockExcNetwork* = ref object of LPProtocol
peers*: Table[PeerID, NetworkPeer] peers*: Table[PeerID, NetworkPeer]
switch*: Switch switch*: Switch
handlers*: BitswapHandlers handlers*: BlockExcHandlers
request*: BitswapRequest request*: BlockExcRequest
getConn: ConnProvider getConn: ConnProvider
proc handleWantList( proc handleWantList(
b: BitswapNetwork, b: BlockExcNetwork,
peer: NetworkPeer, peer: NetworkPeer,
list: WantList) = list: WantList) =
## Handle incoming want list ## Handle incoming want list
@ -102,7 +103,7 @@ proc makeWantList*(
WantList(entries: entries, full: full) WantList(entries: entries, full: full)
proc broadcastWantList*( proc broadcastWantList*(
b: BitswapNetwork, b: BlockExcNetwork,
id: PeerID, id: PeerID,
cids: seq[Cid], cids: seq[Cid],
priority: int32 = 0, priority: int32 = 0,
@ -128,7 +129,7 @@ proc broadcastWantList*(
asyncSpawn b.peers[id].send(Message(wantlist: wantList)) asyncSpawn b.peers[id].send(Message(wantlist: wantList))
proc handleBlocks( proc handleBlocks(
b: BitswapNetwork, b: BlockExcNetwork,
peer: NetworkPeer, peer: NetworkPeer,
blocks: seq[auto]) = blocks: seq[auto]) =
## Handle incoming blocks ## Handle incoming blocks
@ -142,9 +143,11 @@ proc handleBlocks(
var blks: seq[bt.Block] var blks: seq[bt.Block]
for blk in blocks: for blk in blocks:
when blk is pb.Block: when blk is pb.Block:
blks.add(bt.Block.new(Cid.init(blk.prefix).get(), blk.data)) if b =? bt.Block.new(Cid.init(blk.prefix).get(), blk.data):
blks.add(b)
elif blk is seq[byte]: elif blk is seq[byte]:
blks.add(bt.Block.new(Cid.init(blk).get(), blk)) if b =? bt.Block.new(Cid.init(blk).get(), blk):
blks.add(b)
else: else:
error("Invalid block type") error("Invalid block type")
@ -155,7 +158,6 @@ template makeBlocks*(
seq[pb.Block] = seq[pb.Block] =
var blks: seq[pb.Block] var blks: seq[pb.Block]
for blk in blocks: for blk in blocks:
# for now only send bitswap `1.1.0`
blks.add(pb.Block( blks.add(pb.Block(
prefix: blk.cid.data.buffer, prefix: blk.cid.data.buffer,
data: blk.data data: blk.data
@ -164,7 +166,7 @@ template makeBlocks*(
blks blks
proc broadcastBlocks*( proc broadcastBlocks*(
b: BitswapNetwork, b: BlockExcNetwork,
id: PeerID, id: PeerID,
blocks: seq[bt.Block]) = blocks: seq[bt.Block]) =
## Send blocks to remote ## Send blocks to remote
@ -177,7 +179,7 @@ proc broadcastBlocks*(
asyncSpawn b.peers[id].send(pb.Message(payload: makeBlocks(blocks))) asyncSpawn b.peers[id].send(pb.Message(payload: makeBlocks(blocks)))
proc handleBlockPresence( proc handleBlockPresence(
b: BitswapNetwork, b: BlockExcNetwork,
peer: NetworkPeer, peer: NetworkPeer,
presence: seq[BlockPresence]) = presence: seq[BlockPresence]) =
## Handle block presence ## Handle block presence
@ -190,7 +192,7 @@ proc handleBlockPresence(
b.handlers.onPresence(peer.id, presence) b.handlers.onPresence(peer.id, presence)
proc broadcastBlockPresence*( proc broadcastBlockPresence*(
b: BitswapNetwork, b: BlockExcNetwork,
id: PeerID, id: PeerID,
presence: seq[BlockPresence]) = presence: seq[BlockPresence]) =
## Send presence to remote ## Send presence to remote
@ -202,14 +204,14 @@ proc broadcastBlockPresence*(
trace "Sending presence to peer", peer = id trace "Sending presence to peer", peer = id
asyncSpawn b.peers[id].send(Message(blockPresences: presence)) asyncSpawn b.peers[id].send(Message(blockPresences: presence))
proc handleAccount(network: BitswapNetwork, proc handleAccount(network: BlockExcNetwork,
peer: NetworkPeer, peer: NetworkPeer,
account: Account) = account: Account) =
if network.handlers.onAccount.isNil: if network.handlers.onAccount.isNil:
return return
network.handlers.onAccount(peer.id, account) network.handlers.onAccount(peer.id, account)
proc broadcastAccount*(network: BitswapNetwork, proc broadcastAccount*(network: BlockExcNetwork,
id: PeerId, id: PeerId,
account: Account) = account: Account) =
if id notin network.peers: if id notin network.peers:
@ -218,7 +220,7 @@ proc broadcastAccount*(network: BitswapNetwork,
let message = Message(account: AccountMessage.init(account)) let message = Message(account: AccountMessage.init(account))
asyncSpawn network.peers[id].send(message) asyncSpawn network.peers[id].send(message)
proc broadcastPayment*(network: BitswapNetwork, proc broadcastPayment*(network: BlockExcNetwork,
id: PeerId, id: PeerId,
payment: SignedState) = payment: SignedState) =
if id notin network.peers: if id notin network.peers:
@ -227,21 +229,18 @@ proc broadcastPayment*(network: BitswapNetwork,
let message = Message(payment: StateChannelUpdate.init(payment)) let message = Message(payment: StateChannelUpdate.init(payment))
asyncSpawn network.peers[id].send(message) asyncSpawn network.peers[id].send(message)
proc handlePayment(network: BitswapNetwork, proc handlePayment(network: BlockExcNetwork,
peer: NetworkPeer, peer: NetworkPeer,
payment: SignedState) = payment: SignedState) =
if network.handlers.onPayment.isNil: if network.handlers.onPayment.isNil:
return return
network.handlers.onPayment(peer.id, payment) network.handlers.onPayment(peer.id, payment)
proc rpcHandler(b: BitswapNetwork, peer: NetworkPeer, msg: Message) {.async.} = proc rpcHandler(b: BlockExcNetwork, peer: NetworkPeer, msg: Message) {.async.} =
try: try:
if msg.wantlist.entries.len > 0: if msg.wantlist.entries.len > 0:
b.handleWantList(peer, msg.wantlist) b.handleWantList(peer, msg.wantlist)
if msg.blocks.len > 0:
b.handleBlocks(peer, msg.blocks)
if msg.payload.len > 0: if msg.payload.len > 0:
b.handleBlocks(peer, msg.payload) b.handleBlocks(peer, msg.payload)
@ -255,10 +254,10 @@ proc rpcHandler(b: BitswapNetwork, peer: NetworkPeer, msg: Message) {.async.} =
b.handlePayment(peer, payment) b.handlePayment(peer, payment)
except CatchableError as exc: except CatchableError as exc:
trace "Exception in bitswap rpc handler", exc = exc.msg trace "Exception in blockexc rpc handler", exc = exc.msg
proc getOrCreatePeer(b: BitswapNetwork, peer: PeerID): NetworkPeer = proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerID): NetworkPeer =
## Creates or retrieves a BitswapNetwork Peer ## Creates or retrieves a BlockExcNetwork Peer
## ##
if peer in b.peers: if peer in b.peers:
@ -268,7 +267,7 @@ proc getOrCreatePeer(b: BitswapNetwork, peer: PeerID): NetworkPeer =
try: try:
return await b.switch.dial(peer, Codec) return await b.switch.dial(peer, Codec)
except CatchableError as exc: except CatchableError as exc:
trace "unable to connect to bitswap peer", exc = exc.msg trace "unable to connect to blockexc peer", exc = exc.msg
if not isNil(b.getConn): if not isNil(b.getConn):
getConn = b.getConn getConn = b.getConn
@ -277,31 +276,35 @@ proc getOrCreatePeer(b: BitswapNetwork, peer: PeerID): NetworkPeer =
b.rpcHandler(p, msg) b.rpcHandler(p, msg)
# create new pubsub peer # create new pubsub peer
let bitSwapPeer = NetworkPeer.new(peer, getConn, rpcHandler) let blockExcPeer = NetworkPeer.new(peer, getConn, rpcHandler)
debug "created new bitswap peer", peer debug "created new blockexc peer", peer
b.peers[peer] = bitSwapPeer b.peers[peer] = blockExcPeer
return bitSwapPeer return blockExcPeer
proc setupPeer*(b: BitswapNetwork, peer: PeerID) = proc setupPeer*(b: BlockExcNetwork, peer: PeerID) =
## Perform initial setup, such as want ## Perform initial setup, such as want
## list exchange ## list exchange
## ##
discard b.getOrCreatePeer(peer) discard b.getOrCreatePeer(peer)
proc dropPeer*(b: BitswapNetwork, peer: PeerID) = proc dropPeer*(b: BlockExcNetwork, peer: PeerID) =
## Cleanup disconnected peer ## Cleanup disconnected peer
## ##
b.peers.del(peer) b.peers.del(peer)
method init*(b: BitswapNetwork) = method init*(b: BlockExcNetwork) =
## Perform protocol initialization ## Perform protocol initialization
## ##
proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = proc peerEventHandler(peerInfo: PeerInfo, event: PeerEvent) {.async.} =
# TODO: temporary until libp2p moves back to PeerID
let
peerId = peerInfo.peerId
if event.kind == PeerEventKind.Joined: if event.kind == PeerEventKind.Joined:
b.setupPeer(peerId) b.setupPeer(peerId)
else: else:
@ -312,20 +315,20 @@ method init*(b: BitswapNetwork) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let peerId = conn.peerInfo.peerId let peerId = conn.peerInfo.peerId
let bitswapPeer = b.getOrCreatePeer(peerId) let blockexcPeer = b.getOrCreatePeer(peerId)
await bitswapPeer.readLoop(conn) # attach read loop await blockexcPeer.readLoop(conn) # attach read loop
b.handler = handle b.handler = handle
b.codec = Codec b.codec = Codec
proc new*( proc new*(
T: type BitswapNetwork, T: type BlockExcNetwork,
switch: Switch, switch: Switch,
connProvider: ConnProvider = nil): T = connProvider: ConnProvider = nil): T =
## Create a new BitswapNetwork instance ## Create a new BlockExcNetwork instance
## ##
let b = BitswapNetwork( let b = BlockExcNetwork(
switch: switch, switch: switch,
getConn: connProvider) getConn: connProvider)
@ -353,7 +356,7 @@ proc new*(
proc sendPayment(id: PeerID, payment: SignedState) = proc sendPayment(id: PeerID, payment: SignedState) =
b.broadcastPayment(id, payment) b.broadcastPayment(id, payment)
b.request = BitswapRequest( b.request = BlockExcRequest(
sendWantList: sendWantList, sendWantList: sendWantList,
sendBlocks: sendBlocks, sendBlocks: sendBlocks,
sendPresence: sendPresence, sendPresence: sendPresence,

View File

@ -12,10 +12,10 @@ import pkg/chronicles
import pkg/protobuf_serialization import pkg/protobuf_serialization
import pkg/libp2p import pkg/libp2p
import ./protobuf/bitswap import ./protobuf/blockexc
logScope: logScope:
topics = "dagger bitswap networkpeer" topics = "dagger blockexc networkpeer"
const MaxMessageSize = 8 * 1024 * 1024 const MaxMessageSize = 8 * 1024 * 1024
@ -43,7 +43,7 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} =
trace "Got message for peer", peer = b.id, msg trace "Got message for peer", peer = b.id, msg
await b.handler(b, msg) await b.handler(b, msg)
except CatchableError as exc: except CatchableError as exc:
trace "Exception in bitswap read loop", exc = exc.msg trace "Exception in blockexc read loop", exc = exc.msg
finally: finally:
await conn.close() await conn.close()
@ -65,7 +65,7 @@ proc send*(b: NetworkPeer, msg: Message) {.async.} =
trace "Sending message to remote", peer = b.id, msg = $msg trace "Sending message to remote", peer = b.id, msg = $msg
await conn.writeLp(Protobuf.encode(msg)) await conn.writeLp(Protobuf.encode(msg))
proc new*( func new*(
T: type NetworkPeer, T: type NetworkPeer,
peer: PeerId, peer: PeerId,
connProvider: ConnProvider, connProvider: ConnProvider,

View File

@ -4,15 +4,14 @@ import pkg/libp2p
import pkg/chronos import pkg/chronos
import pkg/nitro import pkg/nitro
import pkg/questionable import pkg/questionable
import ./protobuf/bitswap import ./protobuf/blockexc
import ./protobuf/payments import ./protobuf/payments
import ./protobuf/presence import ./protobuf/presence
export payments export payments, nitro
export nitro
type type
BitswapPeerCtx* = ref object of RootObj BlockExcPeerCtx* = ref object of RootObj
id*: PeerID id*: PeerID
peerPrices*: Table[Cid, UInt256] # remote peer have list including price peerPrices*: Table[Cid, UInt256] # remote peer have list including price
peerWants*: seq[Entry] # remote peers want lists peerWants*: seq[Entry] # remote peers want lists
@ -21,16 +20,16 @@ type
account*: ?Account # ethereum account of this peer account*: ?Account # ethereum account of this peer
paymentChannel*: ?ChannelId # payment channel id paymentChannel*: ?ChannelId # payment channel id
proc peerHave*(context: BitswapPeerCtx): seq[Cid] = proc peerHave*(context: BlockExcPeerCtx): seq[Cid] =
toSeq(context.peerPrices.keys) toSeq(context.peerPrices.keys)
proc contains*(a: openArray[BitswapPeerCtx], b: PeerID): bool = proc contains*(a: openArray[BlockExcPeerCtx], b: PeerID): bool =
## Convenience method to check for peer prepense ## Convenience method to check for peer prepense
## ##
a.anyIt( it.id == b ) a.anyIt( it.id == b )
func updatePresence*(context: BitswapPeerCtx, presence: Presence) = func updatePresence*(context: BlockExcPeerCtx, presence: Presence) =
let cid = presence.cid let cid = presence.cid
let price = presence.price let price = presence.price
@ -39,7 +38,7 @@ func updatePresence*(context: BitswapPeerCtx, presence: Presence) =
elif cid in context.peerHave and not presence.have: elif cid in context.peerHave and not presence.have:
context.peerPrices.del(cid) context.peerPrices.del(cid)
func price*(context: BitswapPeerCtx, cids: seq[Cid]): UInt256 = func price*(context: BlockExcPeerCtx, cids: seq[Cid]): UInt256 =
for cid in cids: for cid in cids:
if price =? context.peerPrices.?[cid]: if price =? context.peerPrices.?[cid]:
result += price result += price

View File

@ -16,7 +16,7 @@ import pkg/libp2p
import ../blocktype import ../blocktype
logScope: logScope:
topics = "dagger bitswap pendingblocks" topics = "dagger blockexc pendingblocks"
type type
PendingBlocksManager* = ref object of RootObj PendingBlocksManager* = ref object of RootObj
@ -67,7 +67,7 @@ proc contains*(
p: PendingBlocksManager, p: PendingBlocksManager,
cid: Cid): bool = p.pending(cid) cid: Cid): bool = p.pending(cid)
proc new*(T: type PendingBlocksManager): T = func new*(T: type PendingBlocksManager): T =
T( T(
blocks: initTable[Cid, Future[Block]]() blocks: initTable[Cid, Future[Block]]()
) )

View File

@ -1,6 +1,6 @@
syntax = "proto3"; syntax = "proto3";
package bitswap.message.pb; package blockexc.message.pb;
message Message { message Message {
@ -11,7 +11,7 @@ message Message {
} }
message Entry { message Entry {
bytes block = 1; // the block cid (cidV0 in bitswap 1.0.0, cidV1 in bitswap 1.1.0) bytes block = 1; // the block cid
int32 priority = 2; // the priority (normalized). default to 1 int32 priority = 2; // the priority (normalized). default to 1
bool cancel = 3; // whether this revokes an entry bool cancel = 3; // whether this revokes an entry
WantType wantType = 4; // Note: defaults to enum 0, ie Block WantType wantType = 4; // Note: defaults to enum 0, ie Block
@ -47,10 +47,9 @@ message Message {
} }
Wantlist wantlist = 1; Wantlist wantlist = 1;
repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0 repeated Block payload = 2;
repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0 repeated BlockPresence blockPresences = 3;
repeated BlockPresence blockPresences = 4; int32 pendingBytes = 4;
int32 pendingBytes = 5; AccountMessage account = 5;
AccountMessage account = 6; StateChannelUpdate payment = 6;
StateChannelUpdate payment = 7;
} }

View File

@ -4,7 +4,7 @@ import pkg/stint
import pkg/nitro import pkg/nitro
import pkg/questionable import pkg/questionable
import pkg/upraises import pkg/upraises
import ./bitswap import ./blockexc
export AccountMessage export AccountMessage
export StateChannelUpdate export StateChannelUpdate

View File

@ -3,7 +3,7 @@ import pkg/stint
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/upraises import pkg/upraises
import ./bitswap import ./blockexc
export questionable export questionable
export stint export stint
@ -12,7 +12,7 @@ export BlockPresenceType
upraises.push: {.upraises: [].} upraises.push: {.upraises: [].}
type type
PresenceMessage* = bitswap.BlockPresence PresenceMessage* = blockexc.BlockPresence
Presence* = object Presence* = object
cid*: Cid cid*: Cid
have*: bool have*: bool

62
dagger/blockset.nim Normal file
View File

@ -0,0 +1,62 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import ./blockstream
export blockstream
type
BlockSetRef* = ref object of BlockStreamRef
stream*: BlockStreamRef
hcodec*: MultiCodec
proc hashBytes*(mh: MultiHash): seq[byte] =
mh.data.buffer[mh.dpos..(mh.dpos + mh.size - 1)]
proc hashBytes*(b: Block): seq[byte] =
if mh =? b.cid.mhash:
return mh.hashBytes()
method nextBlock*(d: BlockSetRef): ?!Block =
d.stream.nextBlock()
proc treeHash*(d: BlockSetRef): ?!MultiHash =
var
stack: seq[seq[byte]]
while true:
let (blk1, blk2) = (d.nextBlock().option, d.nextBlock().option)
if blk1.isNone and blk2.isNone and stack.len == 1:
let res = MultiHash.digest($d.hcodec, stack[0])
if mh =? res:
return success mh
return failure($res.error)
if blk1.isSome: stack.add((!blk1).hashBytes())
if blk2.isSome: stack.add((!blk2).hashBytes())
while stack.len > 1:
let (b1, b2) = (stack.pop(), stack.pop())
let res = MultiHash.digest($d.hcodec, b1 & b2)
if mh =? res:
stack.add(mh.hashBytes())
else:
return failure($res.error)
func new*(
T: type BlockSetRef,
stream: BlockStreamRef,
hcodec: MultiCodec = multiCodec("sha2-256")): T =
T(stream: stream, hcodec: hcodec)

12
dagger/blockstream.nim Normal file
View File

@ -0,0 +1,12 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import ./blockstream/[blockstream, chunkedblockstream]
export blockstream, chunkedblockstream

View File

@ -0,0 +1,29 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import pkg/questionable
import pkg/questionable/results
import ../blocktype
export blocktype
type
BlockStreamRef* = ref object of RootObj
method nextBlock*(b: BlockStreamRef): ?!Block {.base.} =
doAssert(false, "Not implemented!")
iterator items*(b: BlockStreamRef): Block =
while true:
without blk =? b.nextBlock():
break
yield blk

View File

@ -0,0 +1,28 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import pkg/questionable
import pkg/questionable/results
import ./blockstream
import ../chunker
type
ChunkedBlockStreamRef* = ref object of BlockStreamRef
chunker*: Chunker
method nextBlock*(c: ChunkedBlockStreamRef): ?!Block =
let data: seq[byte] = c.chunker.getBytes()
if data.len > 0:
return Block.new(data)
func new*(T: type ChunkedBlockStreamRef, chunker: Chunker): T =
T(chunker: chunker)

View File

@ -7,16 +7,14 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import pkg/libp2p/multihash {.push raises: [Defect].}
import pkg/libp2p/multicodec
import pkg/libp2p/cid import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/stew/byteutils import pkg/stew/byteutils
export cid, multihash, multicodec
type type
CidDontMatchError* = object of CatchableError
Block* = object of RootObj Block* = object of RootObj
cid*: Cid cid*: Cid
data*: seq[byte] data*: seq[byte]
@ -25,31 +23,31 @@ proc `$`*(b: Block): string =
result &= "cid: " & $b.cid result &= "cid: " & $b.cid
result &= "\ndata: " & string.fromBytes(b.data) result &= "\ndata: " & string.fromBytes(b.data)
proc new*( func new*(
T: type Block,
data: openArray[byte] = [],
version = CIDv1,
hcodec = multiCodec("sha2-256"),
codec = multiCodec("raw")): ?!T =
let hash = MultiHash.digest($hcodec, data).get()
success Block(
cid: Cid.init(version, codec, hash).get(),
data: @data)
func new*(
T: type Block, T: type Block,
cid: Cid, cid: Cid,
data: openArray[byte] = [], data: openArray[byte] = [],
verify: bool = false): T = verify: bool = false): ?!T =
let b = Block.new( let res = Block.new(
data, data,
cid.cidver, cid.cidver,
cid.mhash.get().mcodec, cid.mhash.get().mcodec,
cid.mcodec cid.mcodec
) )
if verify and cid != b.cid: if b =? res:
raise newException(CidDontMatchError, if verify and cid != b.cid:
"The suplied Cid doesn't match the data!") return failure("The suplied Cid doesn't match the data!")
return b res
proc new*(
T: type Block,
data: openArray[byte] = [],
version = CIDv0,
hcodec = multiCodec("sha2-256"),
codec = multiCodec("dag-pb")): T =
let hash = MultiHash.digest($hcodec, data).get()
Block(
cid: Cid.init(version, codec, hash).get(),
data: @data)

View File

@ -7,11 +7,16 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
# TODO: This is super inneficient and merits a rewrite, but it'll do for now # TODO: This is super inneficient and needs a rewrite, but it'll do for now
{.push raises: [Defect].}
import std/sequtils import std/sequtils
import ./p2p/rng import pkg/questionable
import pkg/questionable/results
import ./rng
import ./blocktype import ./blocktype
export blocktype export blocktype
@ -21,7 +26,9 @@ const
type type
# default reader type # default reader type
Reader* = proc(data: var openArray[byte], offset: Natural = 0): int {.gcsafe, closure.} Reader* =
proc(data: var openArray[byte], offset: Natural = 0): int
{.gcsafe, closure, raises: [Defect].}
ChunkerType* {.pure.} = enum ChunkerType* {.pure.} = enum
SizedChunker SizedChunker
@ -63,7 +70,7 @@ iterator items*(c: Chunker): seq[byte] =
yield chunk yield chunk
proc new*( func new*(
T: type Chunker, T: type Chunker,
kind = ChunkerType.SizedChunker, kind = ChunkerType.SizedChunker,
reader: Reader, reader: Reader,
@ -122,11 +129,15 @@ proc newFileChunker*(
## ##
proc reader(data: var openArray[byte], offset: Natural = 0): int = proc reader(data: var openArray[byte], offset: Natural = 0): int =
return file.readBytes(data, 0, data.len) try:
return file.readBytes(data, 0, data.len)
except IOError as exc:
# TODO: revisit error handling - should this be fatal?
raise newException(Defect, exc.msg)
Chunker.new( Chunker.new(
kind = ChunkerType.SizedChunker, kind = ChunkerType.SizedChunker,
reader = reader, reader = reader,
size = file.getFileSize(), size = try: file.getFileSize() except: 0, # TODO: should do something smarter abou this
pad = pad, pad = pad,
chunkSize = chunkSize) chunkSize = chunkSize)

9
dagger/dagger.nim Normal file
View File

@ -0,0 +1,9 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.

3
dagger/stores.nim Normal file
View File

@ -0,0 +1,3 @@
import ./stores/[memorystore, blockstore, blockexchange]
export memorystore, blockstore, blockexchange

View File

@ -14,18 +14,19 @@ import pkg/chronos
import pkg/libp2p import pkg/libp2p
import pkg/libp2p/errors import pkg/libp2p/errors
import ./bitswap/protobuf/bitswap as pb import ../blocktype as bt
import ./blocktype as bt import ../utils/asyncheapqueue
import ./stores/blockstore
import ./utils/asyncheapqueue
import ./bitswap/network import ./blockstore
import ./bitswap/engine import ../blockexchange/network
import ../blockexchange/engine
import ../blockexchange/peercontext
import ../blockexchange/protobuf/blockexc as pb
export network, blockstore, asyncheapqueue, engine export blockstore, network, engine, asyncheapqueue
logScope: logScope:
topics = "dagger bitswap" topics = "dagger blockexc"
const const
DefaultTaskQueueSize = 100 DefaultTaskQueueSize = 100
@ -33,58 +34,58 @@ const
DefaultMaxRetries = 3 DefaultMaxRetries = 3
type type
Bitswap* = ref object of BlockStore BlockExc* = ref object of BlockStore
engine*: BitswapEngine # bitswap decision engine engine*: BlockExcEngine # blockexc decision engine
taskQueue*: AsyncHeapQueue[BitswapPeerCtx] # peers we're currently processing tasks for taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # peers we're currently processing tasks for
bitswapTasks: seq[Future[void]] # future to control bitswap task blockexcTasks: seq[Future[void]] # future to control blockexc task
bitswapRunning: bool # indicates if the bitswap task is running blockexcRunning: bool # indicates if the blockexc task is running
concurrentTasks: int # number of concurrent peers we're serving at any given time concurrentTasks: int # number of concurrent peers we're serving at any given time
maxRetries: int # max number of tries for a failed block maxRetries: int # max number of tries for a failed block
taskHandler: TaskHandler # handler provided by the engine called by the runner taskHandler: TaskHandler # handler provided by the engine called by the runner
proc bitswapTaskRunner(b: Bitswap) {.async.} = proc blockexcTaskRunner(b: BlockExc) {.async.} =
## process tasks ## process tasks
## ##
while b.bitswapRunning: while b.blockexcRunning:
let peerCtx = await b.taskQueue.pop() let peerCtx = await b.taskQueue.pop()
asyncSpawn b.taskHandler(peerCtx) asyncSpawn b.taskHandler(peerCtx)
trace "Exiting bitswap task runner" trace "Exiting blockexc task runner"
proc start*(b: Bitswap) {.async.} = proc start*(b: BlockExc) {.async.} =
## Start the bitswap task ## Start the blockexc task
## ##
trace "bitswap start" trace "blockexc start"
if b.bitswapTasks.len > 0: if b.blockexcTasks.len > 0:
warn "Starting bitswap twice" warn "Starting blockexc twice"
return return
b.bitswapRunning = true b.blockexcRunning = true
for i in 0..<b.concurrentTasks: for i in 0..<b.concurrentTasks:
b.bitswapTasks.add(b.bitswapTaskRunner) b.blockexcTasks.add(b.blockexcTaskRunner)
proc stop*(b: Bitswap) {.async.} = proc stop*(b: BlockExc) {.async.} =
## Stop the bitswap bitswap ## Stop the blockexc blockexc
## ##
trace "Bitswap stop" trace "BlockExc stop"
if b.bitswapTasks.len <= 0: if b.blockexcTasks.len <= 0:
warn "Stopping bitswap without starting it" warn "Stopping blockexc without starting it"
return return
b.bitswapRunning = false b.blockexcRunning = false
for t in b.bitswapTasks: for t in b.blockexcTasks:
if not t.finished: if not t.finished:
trace "Awaiting task to stop" trace "Awaiting task to stop"
t.cancel() t.cancel()
trace "Task stopped" trace "Task stopped"
trace "Bitswap stopped" trace "BlockExc stopped"
method getBlocks*(b: Bitswap, cid: seq[Cid]): Future[seq[bt.Block]] {.async.} = method getBlocks*(b: BlockExc, cid: seq[Cid]): Future[seq[bt.Block]] {.async.} =
## Get a block from a remote peer ## Get a block from a remote peer
## ##
@ -95,45 +96,47 @@ method getBlocks*(b: Bitswap, cid: seq[Cid]): Future[seq[bt.Block]] {.async.} =
it.read it.read
) )
method putBlocks*(b: Bitswap, blocks: seq[bt.Block]) = method putBlocks*(b: BlockExc, blocks: seq[bt.Block]) =
b.engine.resolveBlocks(blocks) b.engine.resolveBlocks(blocks)
procCall BlockStore(b).putBlocks(blocks) procCall BlockStore(b).putBlocks(blocks)
proc new*( proc new*(
T: type Bitswap, T: type BlockExc,
localStore: BlockStore, localStore: BlockStore,
wallet: WalletRef, wallet: WalletRef,
network: BitswapNetwork, network: BlockExcNetwork,
concurrentTasks = DefaultConcurrentTasks, concurrentTasks = DefaultConcurrentTasks,
maxRetries = DefaultMaxRetries, maxRetries = DefaultMaxRetries,
peersPerRequest = DefaultMaxPeersPerRequest): T = peersPerRequest = DefaultMaxPeersPerRequest): T =
let engine = BitswapEngine.new( let engine = BlockExcEngine.new(
localStore = localStore, localStore = localStore,
wallet = wallet, wallet = wallet,
peersPerRequest = peersPerRequest, peersPerRequest = peersPerRequest,
request = network.request, request = network.request,
) )
let b = Bitswap( let b = BlockExc(
engine: engine, engine: engine,
taskQueue: newAsyncHeapQueue[BitswapPeerCtx](DefaultTaskQueueSize), taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
concurrentTasks: concurrentTasks, concurrentTasks: concurrentTasks,
maxRetries: maxRetries, maxRetries: maxRetries,
) )
# attach engine's task handler # attach engine's task handler
b.taskHandler = proc(task: BitswapPeerCtx): b.taskHandler = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} =
Future[void] {.gcsafe.} =
engine.taskHandler(task) engine.taskHandler(task)
# attach task scheduler to engine # attach task scheduler to engine
engine.scheduleTask = proc(task: BitswapPeerCtx): engine.scheduleTask = proc(task: BlockExcPeerCtx): bool {.gcsafe} =
bool {.gcsafe} =
b.taskQueue.pushOrUpdateNoWait(task).isOk() b.taskQueue.pushOrUpdateNoWait(task).isOk()
proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = proc peerEventHandler(peerInfo: PeerInfo, event: PeerEvent) {.async.} =
# TODO: temporary until libp2p moves back to PeerID
let
peerId = peerInfo.peerId
if event.kind == PeerEventKind.Joined: if event.kind == PeerEventKind.Joined:
b.engine.setupPeer(peerId) b.engine.setupPeer(peerId)
else: else:
@ -163,7 +166,7 @@ proc new*(
proc paymentHandler(peer: PeerId, payment: SignedState) = proc paymentHandler(peer: PeerId, payment: SignedState) =
engine.paymentHandler(peer, payment) engine.paymentHandler(peer, payment)
network.handlers = BitswapHandlers( network.handlers = BlockExcHandlers(
onWantList: blockWantListHandler, onWantList: blockWantListHandler,
onBlocks: blocksHandler, onBlocks: blocksHandler,
onPresence: blockPresenceHandler, onPresence: blockPresenceHandler,

View File

@ -8,10 +8,13 @@
## those terms. ## those terms.
import std/sequtils import std/sequtils
import chronos
import pkg/chronos
import pkg/libp2p
import ../blocktype import ../blocktype
export blocktype export blocktype, libp2p
type type
ChangeType* {.pure.} = enum ChangeType* {.pure.} = enum

View File

@ -54,7 +54,7 @@ method delBlocks*(s: MemoryStore, cids: seq[Cid]) =
procCall BlockStore(s).delBlocks(cids) procCall BlockStore(s).delBlocks(cids)
proc new*(T: type MemoryStore, blocks: openArray[Block] = []): MemoryStore = func new*(T: type MemoryStore, blocks: openArray[Block] = []): MemoryStore =
MemoryStore( MemoryStore(
blocks: @blocks blocks: @blocks
) )

View File

@ -1 +1 @@
-d:"chronicles_enabled=off" # disable logging by default -d:"chronicles_enabled=on" # disable logging by default

View File

@ -1,5 +1,6 @@
import std/unittest import std/unittest
import pkg/dagger/bitswap/engine/payments
import pkg/dagger/stores
import ../../examples import ../../examples
suite "engine payments": suite "engine payments":
@ -8,11 +9,11 @@ suite "engine payments":
let amount = 42.u256 let amount = 42.u256
var wallet: WalletRef var wallet: WalletRef
var peer: BitswapPeerCtx var peer: BlockExcPeerCtx
setup: setup:
wallet = WalletRef.example wallet = WalletRef.example
peer = BitswapPeerCtx.example peer = BlockExcPeerCtx.example
peer.account = Account(address: address).some peer.account = Account(address: address).some
test "pays for received blocks": test "pays for received blocks":

View File

@ -2,7 +2,7 @@ import pkg/asynctest
import pkg/chronos import pkg/chronos
import pkg/stew/byteutils import pkg/stew/byteutils
import ../../examples import ../../examples
import ../../../../dagger/bitswap/protobuf/payments import pkg/dagger/stores
suite "account protobuf messages": suite "account protobuf messages":

View File

@ -2,7 +2,8 @@ import std/sequtils
import pkg/asynctest import pkg/asynctest
import pkg/chronos import pkg/chronos
import pkg/libp2p import pkg/libp2p
import ../../../../dagger/bitswap/protobuf/presence
import pkg/dagger/blockexchange/protobuf/presence
import ../../examples import ../../examples
suite "block presence protobuf messages": suite "block presence protobuf messages":

View File

@ -7,25 +7,23 @@ import pkg/chronos
import pkg/libp2p import pkg/libp2p
import pkg/libp2p/errors import pkg/libp2p/errors
import pkg/dagger/p2p/rng import pkg/dagger/rng
import pkg/dagger/bitswap import pkg/dagger/blockexchange
import pkg/dagger/bitswap/pendingblocks import pkg/dagger/stores
import pkg/dagger/bitswap/engine/payments
import pkg/dagger/bitswap/protobuf/presence
import pkg/dagger/stores/memorystore
import pkg/dagger/chunker import pkg/dagger/chunker
import pkg/dagger/blocktype as bt import pkg/dagger/blocktype as bt
import pkg/dagger/utils/asyncheapqueue
import ../helpers import ../helpers
import ../examples import ../examples
suite "Bitswap engine basic": suite "BlockExc engine basic":
let let
rng = Rng.instance() rng = Rng.instance()
seckey = PrivateKey.random(rng[]).tryGet() seckey = PrivateKey.random(rng[]).tryGet()
peerId = PeerID.init(seckey.getKey().tryGet()).tryGet() peerId = PeerID.init(seckey.getKey().tryGet()).tryGet()
chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks = chunker.mapIt( bt.Block.new(it) ) blocks = chunker.mapIt( !bt.Block.new(it) )
wallet = WalletRef.example wallet = WalletRef.example
var var
@ -47,11 +45,11 @@ suite "Bitswap engine basic":
done.complete() done.complete()
let request = BitswapRequest( let request = BlockExcRequest(
sendWantList: sendWantList, sendWantList: sendWantList,
) )
let engine = BitswapEngine.new(MemoryStore.new(blocks), wallet, request) let engine = BlockExcEngine.new(MemoryStore.new(blocks), wallet, request)
engine.wantList = blocks.mapIt( it.cid ) engine.wantList = blocks.mapIt( it.cid )
engine.setupPeer(peerId) engine.setupPeer(peerId)
@ -64,39 +62,38 @@ suite "Bitswap engine basic":
check account.address == pricing.address check account.address == pricing.address
done.complete() done.complete()
let request = BitswapRequest(sendAccount: sendAccount) let request = BlockExcRequest(sendAccount: sendAccount)
let engine = BitswapEngine.new(MemoryStore.new, wallet, request) let engine = BlockExcEngine.new(MemoryStore.new, wallet, request)
engine.pricing = pricing.some engine.pricing = pricing.some
engine.setupPeer(peerId) engine.setupPeer(peerId)
await done.wait(100.millis) await done.wait(100.millis)
suite "Bitswap engine handlers": suite "BlockExc engine handlers":
let let
rng = Rng.instance() rng = Rng.instance()
seckey = PrivateKey.random(rng[]).tryGet() seckey = PrivateKey.random(rng[]).tryGet()
peerId = PeerID.init(seckey.getKey().tryGet()).tryGet() peerId = PeerID.init(seckey.getKey().tryGet()).tryGet()
chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks = chunker.mapIt( bt.Block.new(it) ) blocks = chunker.mapIt( !bt.Block.new(it) )
wallet = WalletRef.example wallet = WalletRef.example
var var
engine: BitswapEngine engine: BlockExcEngine
peerCtx: BitswapPeerCtx peerCtx: BlockExcPeerCtx
done: Future[void] done: Future[void]
setup: setup:
done = newFuture[void]() done = newFuture[void]()
engine = BitswapEngine.new(MemoryStore.new(), wallet) engine = BlockExcEngine.new(MemoryStore.new(), wallet)
peerCtx = BitswapPeerCtx( peerCtx = BlockExcPeerCtx(
id: peerId id: peerId
) )
engine.peers.add(peerCtx) engine.peers.add(peerCtx)
test "should handle want list": test "should handle want list":
let wantList = makeWantList(blocks.mapIt( it.cid )) let wantList = makeWantList(blocks.mapIt( it.cid ))
proc taskScheduler(ctx: BitswapPeerCtx): bool = proc taskScheduler(ctx: BlockExcPeerCtx): bool =
check ctx.id == peerId check ctx.id == peerId
check ctx.peerWants.mapIt( it.cid ) == blocks.mapIt( it.cid ) check ctx.peerWants.mapIt( it.cid ) == blocks.mapIt( it.cid )
@ -117,7 +114,7 @@ suite "Bitswap engine handlers":
done.complete() done.complete()
engine.request = BitswapRequest( engine.request = BlockExcRequest(
sendPresence: sendPresence sendPresence: sendPresence
) )
@ -135,7 +132,7 @@ suite "Bitswap engine handlers":
done.complete() done.complete()
engine.request = BitswapRequest(sendPresence: sendPresence) engine.request = BlockExcRequest(sendPresence: sendPresence)
engine.localStore.putBlocks(@[blocks[0], blocks[1]]) engine.localStore.putBlocks(@[blocks[0], blocks[1]])
engine.wantListHandler(peerId, wantList) engine.wantListHandler(peerId, wantList)
@ -190,25 +187,25 @@ suite "Task Handler":
let let
rng = Rng.instance() rng = Rng.instance()
chunker = newRandomChunker(Rng.instance(), size = 2048, chunkSize = 256) chunker = newRandomChunker(Rng.instance(), size = 2048, chunkSize = 256)
blocks = chunker.mapIt( bt.Block.new(it) ) blocks = chunker.mapIt( !bt.Block.new(it) )
wallet = WalletRef.example wallet = WalletRef.example
var var
engine: BitswapEngine engine: BlockExcEngine
peersCtx: seq[BitswapPeerCtx] peersCtx: seq[BlockExcPeerCtx]
peers: seq[PeerID] peers: seq[PeerID]
done: Future[void] done: Future[void]
setup: setup:
done = newFuture[void]() done = newFuture[void]()
engine = BitswapEngine.new(MemoryStore.new(), wallet) engine = BlockExcEngine.new(MemoryStore.new(), wallet)
peersCtx = @[] peersCtx = @[]
for i in 0..3: for i in 0..3:
let seckey = PrivateKey.random(rng[]).tryGet() let seckey = PrivateKey.random(rng[]).tryGet()
peers.add(PeerID.init(seckey.getKey().tryGet()).tryGet()) peers.add(PeerID.init(seckey.getKey().tryGet()).tryGet())
peersCtx.add(BitswapPeerCtx( peersCtx.add(BlockExcPeerCtx(
id: peers[i] id: peers[i]
)) ))
@ -251,7 +248,7 @@ suite "Task Handler":
test "Should send presence": test "Should send presence":
let present = blocks let present = blocks
let missing = @[bt.Block.new("missing".toBytes)] let missing = @[!bt.Block.new("missing".toBytes)]
let price = (!engine.pricing).price let price = (!engine.pricing).price
proc sendPresence(id: PeerID, presence: seq[BlockPresence]) = proc sendPresence(id: PeerID, presence: seq[BlockPresence]) =

View File

@ -5,28 +5,28 @@ import pkg/asynctest
import pkg/chronos import pkg/chronos
import pkg/libp2p import pkg/libp2p
import pkg/libp2p/errors import pkg/libp2p/errors
import questionable
import questionable/results
import pkg/protobuf_serialization import pkg/protobuf_serialization
import pkg/dagger/stores/memorystore import pkg/dagger/rng
import pkg/dagger/bitswap/network
import pkg/dagger/bitswap/protobuf/payments
import pkg/dagger/p2p/rng
import pkg/dagger/chunker import pkg/dagger/chunker
import pkg/dagger/blocktype as bt import pkg/dagger/blocktype as bt
import pkg/dagger/blockexchange
import ../helpers import ../helpers
import ../examples import ../examples
suite "Bitswap network": suite "BlockExc network":
let let
rng = Rng.instance() rng = Rng.instance()
seckey = PrivateKey.random(rng[]).tryGet() seckey = PrivateKey.random(rng[]).tryGet()
peerId = PeerID.init(seckey.getKey().tryGet()).tryGet() peerId = PeerID.init(seckey.getKey().tryGet()).tryGet()
chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks = chunker.mapIt( bt.Block.new(it) ) blocks = chunker.mapIt( !bt.Block.new(it) )
var var
network: BitswapNetwork network: BlockExcNetwork
networkPeer: NetworkPeer networkPeer: NetworkPeer
buffer: BufferStream buffer: BufferStream
done: Future[void] done: Future[void]
@ -37,7 +37,7 @@ suite "Bitswap network":
setup: setup:
done = newFuture[void]() done = newFuture[void]()
buffer = newBufferStream() buffer = newBufferStream()
network = BitswapNetwork.new( network = BlockExcNetwork.new(
switch = newStandardSwitch(), switch = newStandardSwitch(),
connProvider = getConn) connProvider = getConn)
network.setupPeer(peerId) network.setupPeer(peerId)
@ -131,14 +131,14 @@ suite "Bitswap network":
await done.wait(100.millis) await done.wait(100.millis)
suite "Bitswap Network - e2e": suite "BlockExc Network - e2e":
let let
chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks = chunker.mapIt( bt.Block.new(it) ) blocks = chunker.mapIt( !bt.Block.new(it) )
var var
switch1, switch2: Switch switch1, switch2: Switch
network1, network2: BitswapNetwork network1, network2: BlockExcNetwork
awaiters: seq[Future[void]] awaiters: seq[Future[void]]
done: Future[void] done: Future[void]
@ -149,11 +149,11 @@ suite "Bitswap Network - e2e":
awaiters.add(await switch1.start()) awaiters.add(await switch1.start())
awaiters.add(await switch2.start()) awaiters.add(await switch2.start())
network1 = BitswapNetwork.new( network1 = BlockExcNetwork.new(
switch = switch1) switch = switch1)
switch1.mount(network1) switch1.mount(network1)
network2 = BitswapNetwork.new( network2 = BlockExcNetwork.new(
switch = switch2) switch = switch2)
switch2.mount(network2) switch2.mount(network2)

View File

@ -2,10 +2,8 @@ import std/random
import std/sequtils import std/sequtils
import pkg/libp2p import pkg/libp2p
import pkg/nitro import pkg/nitro
import pkg/dagger/p2p/rng import pkg/dagger/rng
import pkg/dagger/bitswap/protobuf/payments import pkg/dagger/stores
import pkg/dagger/bitswap/peercontext
import pkg/dagger/bitswap/engine
import pkg/dagger/blocktype import pkg/dagger/blocktype
proc example*(_: type EthAddress): EthAddress = proc example*(_: type EthAddress): EthAddress =
@ -44,14 +42,14 @@ proc example*(_: type Pricing): Pricing =
proc example*(_: type Block): Block = proc example*(_: type Block): Block =
let length = rand(4096) let length = rand(4096)
let bytes = newSeqWith(length, rand(uint8)) let bytes = newSeqWith(length, rand(uint8))
Block.new(bytes) !Block.new(bytes)
proc example*(_: type PeerId): PeerID = proc example*(_: type PeerId): PeerID =
let key = PrivateKey.random(Rng.instance[]).get let key = PrivateKey.random(Rng.instance[]).get
PeerId.init(key.getKey().get).get PeerId.init(key.getKey().get).get
proc example*(_: type BitswapPeerCtx): BitswapPeerCtx = proc example*(_: type BlockExcPeerCtx): BlockExcPeerCtx =
BitswapPeerCtx(id: PeerID.example) BlockExcPeerCtx(id: PeerID.example)
proc example*(_: type Cid): Cid = proc example*(_: type Cid): Cid =
Block.example.cid Block.example.cid

View File

@ -2,9 +2,22 @@ import pkg/libp2p/varint
import pkg/dagger/chunker import pkg/dagger/chunker
import pkg/dagger/blocktype import pkg/dagger/blocktype
import pkg/dagger/blockstream
import pkg/questionable
import pkg/questionable/results
export chunker export chunker
type
TestStreamProc* = proc(): ?!Block {.raises: [Defect].}
TestStream* = ref object of BlockStreamRef
handler*: TestStreamProc
method nextBlock*(b: TestStream): ?!Block =
b.handler()
proc lenPrefix*(msg: openArray[byte]): seq[byte] = proc lenPrefix*(msg: openArray[byte]): seq[byte] =
## Write `msg` with a varint-encoded length prefix ## Write `msg` with a varint-encoded length prefix
## ##

View File

@ -4,38 +4,37 @@ import std/algorithm
import pkg/asynctest import pkg/asynctest
import pkg/chronos import pkg/chronos
import pkg/stew/byteutils import pkg/stew/byteutils
import pkg/libp2p import pkg/libp2p
import pkg/libp2p/errors import pkg/libp2p/errors
import pkg/dagger/p2p/rng import pkg/dagger/rng
import pkg/dagger/bitswap import pkg/dagger/stores
import pkg/dagger/bitswap/engine/payments import pkg/dagger/blockexchange
import pkg/dagger/stores/memorystore
import pkg/dagger/chunker import pkg/dagger/chunker
import pkg/dagger/blocktype as bt import pkg/dagger/blocktype as bt
import pkg/dagger/utils/asyncheapqueue
import ./utils import ./utils
import ../helpers import ../helpers
import ../examples import ../examples
suite "Bitswap engine - 2 nodes": suite "BlockExc engine - 2 nodes":
let let
chunker1 = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) chunker1 = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks1 = chunker1.mapIt( bt.Block.new(it) ) blocks1 = chunker1.mapIt( !bt.Block.new(it) )
chunker2 = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) chunker2 = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks2 = chunker2.mapIt( bt.Block.new(it) ) blocks2 = chunker2.mapIt( !bt.Block.new(it) )
var var
switch1, switch2: Switch switch1, switch2: Switch
wallet1, wallet2: WalletRef wallet1, wallet2: WalletRef
pricing1, pricing2: Pricing pricing1, pricing2: Pricing
network1, network2: BitswapNetwork network1, network2: BlockExcNetwork
bitswap1, bitswap2: Bitswap blockexc1, blockexc2: BlockExc
awaiters: seq[Future[void]] awaiters: seq[Future[void]]
peerId1, peerId2: PeerID peerId1, peerId2: PeerID
peerCtx1, peerCtx2: BitswapPeerCtx peerCtx1, peerCtx2: BlockExcPeerCtx
done: Future[void] done: Future[void]
setup: setup:
@ -53,40 +52,40 @@ suite "Bitswap engine - 2 nodes":
peerId1 = switch1.peerInfo.peerId peerId1 = switch1.peerInfo.peerId
peerId2 = switch2.peerInfo.peerId peerId2 = switch2.peerInfo.peerId
network1 = BitswapNetwork.new(switch = switch1) network1 = BlockExcNetwork.new(switch = switch1)
bitswap1 = Bitswap.new(MemoryStore.new(blocks1), wallet1, network1) blockexc1 = BlockExc.new(MemoryStore.new(blocks1), wallet1, network1)
switch1.mount(network1) switch1.mount(network1)
network2 = BitswapNetwork.new(switch = switch2) network2 = BlockExcNetwork.new(switch = switch2)
bitswap2 = Bitswap.new(MemoryStore.new(blocks2), wallet2, network2) blockexc2 = BlockExc.new(MemoryStore.new(blocks2), wallet2, network2)
switch2.mount(network2) switch2.mount(network2)
await allFuturesThrowing( await allFuturesThrowing(
bitswap1.start(), blockexc1.start(),
bitswap2.start(), blockexc2.start(),
) )
# initialize our want lists # initialize our want lists
bitswap1.engine.wantList = blocks2.mapIt( it.cid ) blockexc1.engine.wantList = blocks2.mapIt( it.cid )
bitswap2.engine.wantList = blocks1.mapIt( it.cid ) blockexc2.engine.wantList = blocks1.mapIt( it.cid )
pricing1.address = wallet1.address pricing1.address = wallet1.address
pricing2.address = wallet2.address pricing2.address = wallet2.address
bitswap1.engine.pricing = pricing1.some blockexc1.engine.pricing = pricing1.some
bitswap2.engine.pricing = pricing2.some blockexc2.engine.pricing = pricing2.some
await switch1.connect( await switch1.connect(
switch2.peerInfo.peerId, switch2.peerInfo.peerId,
switch2.peerInfo.addrs) switch2.peerInfo.addrs)
await sleepAsync(1.seconds) # give some time to exchange lists await sleepAsync(1.seconds) # give some time to exchange lists
peerCtx2 = bitswap1.engine.getPeerCtx(peerId2) peerCtx2 = blockexc1.engine.getPeerCtx(peerId2)
peerCtx1 = bitswap2.engine.getPeerCtx(peerId1) peerCtx1 = blockexc2.engine.getPeerCtx(peerId1)
teardown: teardown:
await allFuturesThrowing( await allFuturesThrowing(
bitswap1.stop(), blockexc1.stop(),
bitswap2.stop(), blockexc2.stop(),
switch1.stop(), switch1.stop(),
switch2.stop()) switch2.stop())
@ -98,18 +97,18 @@ suite "Bitswap engine - 2 nodes":
check: check:
peerCtx1.peerHave.mapIt( $it ).sorted(cmp[string]) == peerCtx1.peerHave.mapIt( $it ).sorted(cmp[string]) ==
bitswap2.engine.wantList.mapIt( $it ).sorted(cmp[string]) blockexc2.engine.wantList.mapIt( $it ).sorted(cmp[string])
peerCtx2.peerHave.mapIt( $it ).sorted(cmp[string]) == peerCtx2.peerHave.mapIt( $it ).sorted(cmp[string]) ==
bitswap1.engine.wantList.mapIt( $it ).sorted(cmp[string]) blockexc1.engine.wantList.mapIt( $it ).sorted(cmp[string])
test "exchanges accounts on connect": test "exchanges accounts on connect":
check peerCtx1.account.?address == pricing1.address.some check peerCtx1.account.?address == pricing1.address.some
check peerCtx2.account.?address == pricing2.address.some check peerCtx2.account.?address == pricing2.address.some
test "should send want-have for block": test "should send want-have for block":
let blk = bt.Block.new("Block 1".toBytes) let blk = !bt.Block.new("Block 1".toBytes)
bitswap2.engine.localStore.putBlocks(@[blk]) blockexc2.engine.localStore.putBlocks(@[blk])
let entry = Entry( let entry = Entry(
`block`: blk.cid.data.buffer, `block`: blk.cid.data.buffer,
@ -119,58 +118,58 @@ suite "Bitswap engine - 2 nodes":
sendDontHave: false) sendDontHave: false)
peerCtx1.peerWants.add(entry) peerCtx1.peerWants.add(entry)
check bitswap2.taskQueue.pushOrUpdateNoWait(peerCtx1).isOk check blockexc2.taskQueue.pushOrUpdateNoWait(peerCtx1).isOk
await sleepAsync(100.millis) await sleepAsync(100.millis)
check bitswap1.engine.localStore.hasBlock(blk.cid) check blockexc1.engine.localStore.hasBlock(blk.cid)
test "should get blocks from remote": test "should get blocks from remote":
let blocks = await bitswap1.getBlocks(blocks2.mapIt( it.cid )) let blocks = await blockexc1.getBlocks(blocks2.mapIt( it.cid ))
check blocks == blocks2 check blocks == blocks2
test "remote should send blocks when available": test "remote should send blocks when available":
let blk = bt.Block.new("Block 1".toBytes) let blk = !bt.Block.new("Block 1".toBytes)
# should fail retrieving block from remote # should fail retrieving block from remote
check not await bitswap1.getBlocks(@[blk.cid]) check not await blockexc1.getBlocks(@[blk.cid])
.withTimeout(100.millis) # should expire .withTimeout(100.millis) # should expire
proc onBlocks(evt: BlockStoreChangeEvt) = proc onBlocks(evt: BlockStoreChangeEvt) =
check evt.cids == @[blk.cid] check evt.cids == @[blk.cid]
done.complete() done.complete()
bitswap1.engine.localStore.addChangeHandler(onBlocks, ChangeType.Added) blockexc1.engine.localStore.addChangeHandler(onBlocks, ChangeType.Added)
# first put the required block in the local store # first put the required block in the local store
bitswap2.engine.localStore.putBlocks(@[blk]) blockexc2.engine.localStore.putBlocks(@[blk])
# second trigger bitswap to resolve any pending requests # second trigger blockexc to resolve any pending requests
# for the block # for the block
bitswap2.putBlocks(@[blk]) blockexc2.putBlocks(@[blk])
await done await done
test "receives payments for blocks that were sent": test "receives payments for blocks that were sent":
let blocks = await bitswap1.getBlocks(blocks2.mapIt(it.cid)) let blocks = await blockexc1.getBlocks(blocks2.mapIt(it.cid))
await sleepAsync(100.millis) await sleepAsync(100.millis)
let channel = !peerCtx1.paymentChannel let channel = !peerCtx1.paymentChannel
check wallet2.balance(channel, Asset) > 0 check wallet2.balance(channel, Asset) > 0
suite "Bitswap - multiple nodes": suite "BlockExc - multiple nodes":
let let
chunker = newRandomChunker(Rng.instance(), size = 4096, chunkSize = 256) chunker = newRandomChunker(Rng.instance(), size = 4096, chunkSize = 256)
blocks = chunker.mapIt( bt.Block.new(it) ) blocks = chunker.mapIt( !bt.Block.new(it) )
var var
switch: seq[Switch] switch: seq[Switch]
bitswap: seq[Bitswap] blockexc: seq[BlockExc]
awaiters: seq[Future[void]] awaiters: seq[Future[void]]
setup: setup:
for e in generateNodes(5): for e in generateNodes(5):
switch.add(e.switch) switch.add(e.switch)
bitswap.add(e.bitswap) blockexc.add(e.blockexc)
await e.bitswap.start() await e.blockexc.start()
awaiters = switch.mapIt( awaiters = switch.mapIt(
(await it.start()) (await it.start())
@ -185,17 +184,17 @@ suite "Bitswap - multiple nodes":
test "should receive haves for own want list": test "should receive haves for own want list":
let let
downloader = bitswap[4] downloader = blockexc[4]
engine = downloader.engine engine = downloader.engine
# Add blocks from 1st peer to want list # Add blocks from 1st peer to want list
engine.wantList &= blocks[0..3].mapIt( it.cid ) engine.wantList &= blocks[0..3].mapIt( it.cid )
engine.wantList &= blocks[12..15].mapIt( it.cid ) engine.wantList &= blocks[12..15].mapIt( it.cid )
bitswap[0].engine.localStore.putBlocks(blocks[0..3]) blockexc[0].engine.localStore.putBlocks(blocks[0..3])
bitswap[1].engine.localStore.putBlocks(blocks[4..7]) blockexc[1].engine.localStore.putBlocks(blocks[4..7])
bitswap[2].engine.localStore.putBlocks(blocks[8..11]) blockexc[2].engine.localStore.putBlocks(blocks[8..11])
bitswap[3].engine.localStore.putBlocks(blocks[12..15]) blockexc[3].engine.localStore.putBlocks(blocks[12..15])
await connectNodes(switch) await connectNodes(switch)
@ -209,17 +208,17 @@ suite "Bitswap - multiple nodes":
test "should exchange blocks with multiple nodes": test "should exchange blocks with multiple nodes":
let let
downloader = bitswap[4] downloader = blockexc[4]
engine = downloader.engine engine = downloader.engine
# Add blocks from 1st peer to want list # Add blocks from 1st peer to want list
engine.wantList &= blocks[0..3].mapIt( it.cid ) engine.wantList &= blocks[0..3].mapIt( it.cid )
engine.wantList &= blocks[12..15].mapIt( it.cid ) engine.wantList &= blocks[12..15].mapIt( it.cid )
bitswap[0].engine.localStore.putBlocks(blocks[0..3]) blockexc[0].engine.localStore.putBlocks(blocks[0..3])
bitswap[1].engine.localStore.putBlocks(blocks[4..7]) blockexc[1].engine.localStore.putBlocks(blocks[4..7])
bitswap[2].engine.localStore.putBlocks(blocks[8..11]) blockexc[2].engine.localStore.putBlocks(blocks[8..11])
bitswap[3].engine.localStore.putBlocks(blocks[12..15]) blockexc[3].engine.localStore.putBlocks(blocks[12..15])
await connectNodes(switch) await connectNodes(switch)
let wantListBlocks = await downloader.getBlocks(blocks[0..3].mapIt( it.cid )) let wantListBlocks = await downloader.getBlocks(blocks[0..3].mapIt( it.cid ))

View File

@ -1,20 +1,22 @@
import std/sequtils import std/sequtils
import pkg/chronos import pkg/chronos
import pkg/asynctest import pkg/asynctest
import pkg/libp2p import pkg/libp2p
import pkg/stew/byteutils import pkg/stew/byteutils
import pkg/questionable
import pkg/dagger/p2p/rng import pkg/questionable/results
import pkg/dagger/rng
import pkg/dagger/stores/memorystore import pkg/dagger/stores/memorystore
import pkg/dagger/chunker import pkg/dagger/chunker
import ./helpers import ../helpers
suite "Memory Store": suite "Memory Store":
var store: MemoryStore var store: MemoryStore
var chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256) var chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
var blocks = chunker.mapIt( Block.new(it) ) var blocks = chunker.mapIt( !Block.new(it) )
setup: setup:
store = MemoryStore.new(blocks) store = MemoryStore.new(blocks)
@ -40,9 +42,9 @@ suite "Memory Store":
test "add blocks change handler": test "add blocks change handler":
let blocks = @[ let blocks = @[
Block.new("Block 1".toBytes), !Block.new("Block 1".toBytes),
Block.new("Block 2".toBytes), !Block.new("Block 2".toBytes),
Block.new("Block 3".toBytes), !Block.new("Block 3".toBytes),
] ]
var triggered = false var triggered = false
@ -59,9 +61,9 @@ suite "Memory Store":
test "add blocks change handler": test "add blocks change handler":
let blocks = @[ let blocks = @[
Block.new("Block 1".toBytes), !Block.new("Block 1".toBytes),
Block.new("Block 2".toBytes), !Block.new("Block 2".toBytes),
Block.new("Block 3".toBytes), !Block.new("Block 3".toBytes),
] ]
var triggered = false var triggered = false

View File

@ -3,8 +3,7 @@ import std/sequtils
import pkg/chronos import pkg/chronos
import pkg/libp2p import pkg/libp2p
import pkg/dagger/bitswap import pkg/dagger/stores
import pkg/dagger/stores/memorystore
import pkg/dagger/blocktype as bt import pkg/dagger/blocktype as bt
import ../examples import ../examples
@ -14,21 +13,20 @@ proc generateNodes*(
blocks: openArray[bt.Block] = [], blocks: openArray[bt.Block] = [],
secureManagers: openarray[SecureProtocol] = [ secureManagers: openarray[SecureProtocol] = [
SecureProtocol.Noise, SecureProtocol.Noise,
]): seq[tuple[switch: Switch, bitswap: Bitswap]] = ]): seq[tuple[switch: Switch, blockexc: BlockExc]] =
for i in 0..<num: for i in 0..<num:
let let
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
wallet = WalletRef.example wallet = WalletRef.example
network = BitswapNetwork.new(switch) network = BlockExcNetwork.new(switch)
bitswap = Bitswap.new(MemoryStore.new(blocks), wallet, network) blockexc = BlockExc.new(MemoryStore.new(blocks), wallet, network)
switch.mount(network) switch.mount(network)
# initialize our want lists # initialize our want lists
bitswap.engine.wantList = blocks.mapIt( it.cid ) blockexc.engine.wantList = blocks.mapIt( it.cid )
switch.mount(network) switch.mount(network)
result.add((switch, bitswap)) result.add((switch, blockexc))
proc connectNodes*(nodes: seq[Switch]) {.async.} = proc connectNodes*(nodes: seq[Switch]) {.async.} =
for dialer in nodes: for dialer in nodes:

View File

@ -3,7 +3,7 @@ import pkg/asynctest
import pkg/stew/results import pkg/stew/results
import pkg/dagger/utils/asyncheapqueue import pkg/dagger/utils/asyncheapqueue
import pkg/dagger/p2p/rng import pkg/dagger/rng
type type
Task* = tuple[name: string, priority: int] Task* = tuple[name: string, priority: int]

View File

@ -0,0 +1,5 @@
import ./blockexc/testengine
import ./blockexc/testnetwork
import ./blockexc/protobuf/testpayments as testprotobufpayments
import ./blockexc/protobuf/testpresence
import ./blockexc/engine/testpayments as testenginepayments

View File

@ -0,0 +1,52 @@
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/asynctest
import pkg/libp2p
import pkg/stew/byteutils as stew
import pkg/dagger/chunker
import pkg/dagger/rng
import pkg/dagger/blocktype as bt
import pkg/dagger/blockstream
import pkg/dagger/blockset
import ./helpers
suite "BlockSet":
test "Should produce valid tree hash checksum":
let
blocks = @[
!Block.new("Block 1".toBytes),
!Block.new("Block 2".toBytes),
!Block.new("Block 3".toBytes),
!Block.new("Block 4".toBytes),
!Block.new("Block 5".toBytes),
!Block.new("Block 6".toBytes),
!Block.new("Block 7".toBytes),
]
checksum = @[byte(43), 2, 105, 202, 45, 227,
178, 211, 83, 246, 56, 250, 210,
160, 210, 98, 123, 87, 139, 157,
188, 221, 252, 255, 17, 11, 79,
85, 220, 161, 238, 108]
var idx = 0
proc nextBlockHandler(): ?!Block =
let blk = if idx < blocks.len: blocks[idx] else: return
idx.inc()
return success blk
let
blockStream = TestStream(handler: nextBlockHandler)
blockSet = BlockSetRef.new(stream = blockStream)
let res = blockSet.treeHash()
check res.isOK
if h =? res:
check h.hashBytes() == checksum
return
check false

View File

@ -4,7 +4,8 @@ import pkg/dagger/chunker
suite "Chunking": suite "Chunking":
test "should return proper size chunks": test "should return proper size chunks":
proc reader(data: var openArray[byte], offset: Natural = 0): int {.gcsafe, closure.} = proc reader(data: var openArray[byte], offset: Natural = 0): int
{.gcsafe, closure, raises: [Defect].} =
let contents = "1234567890".toBytes let contents = "1234567890".toBytes
copyMem(addr data[0], unsafeAddr contents[offset], data.len) copyMem(addr data[0], unsafeAddr contents[offset], data.len)
return data.len return data.len

View File

@ -0,0 +1,2 @@
import ./stores/testblockexc
import ./stores/testblockstore

View File

@ -1,11 +1,7 @@
import ./dagger/bitswap/testbitswap import ./dagger/teststores
import ./dagger/bitswap/testengine import ./dagger/testblockexc
import ./dagger/bitswap/testnetwork
import ./dagger/bitswap/protobuf/testpayments as testprotobufpayments
import ./dagger/bitswap/protobuf/testpresence
import ./dagger/bitswap/engine/testpayments as testenginepayments
import ./dagger/testasyncheapqueue import ./dagger/testasyncheapqueue
import ./dagger/testblockstore
import ./dagger/testchunking import ./dagger/testchunking
import ./dagger/testblockset
{.warning[UnusedImport]: off.} {.warning[UnusedImport]: off.}