* moving protobuf into bitswap

* adding block type

* reworking bitswap

* adding chunker

* adding license header

* use 1.2.6

* adding fixed size chunker

* add blockstore

* add iterator to chunker

* more bitswap changes

* rename ipfs to dagger

* rename to dagger

* blockstore inherits from BlockProvider

* wip - add core block handling logic

* smal changes

* use proper block store methods

* adding asynq heapqueue

* wip prepare for bitswap task runner

* adding `$`

* adding memory store and tests

* fixed chunking

* extracted desicion engine from bitswap

* added helper random funcs

* adding testing helpers

* only handle seqs

* add peer events

* cleanup pending blocks on blockstore event

* allow nil handlers

* move protobuf type helpers

* allow initializing block from Cid

* testing and fixes

* small fixes

* expose `<`

* spelling

* default value

* spelling

* pending blocks manager

* adding stores manager

* more tests a wip around bitswap

* small changes

* merge bitswap and engine for now

* for now run only the new poc's tests

* add a more complete ci setup

* use template in map

* remove p2pd

* remove go

* dont use asyncCheck

* few small changes

* adding ability to update items

* adding multiple task runners

* handle cancelation properly

* use Result instead of throwing

* wip bitswap tests

* moving things around

* split out engine again

* add request and handlers interface

* fix tests

* wip - engine tests

* remove unused imports

* fix tests

* cleanup block requesting logic

* add block request tests

* more block requests

* add support for max heap

* don't use result

* use max heap & send block presence in task handler

* add task handler tests

* rename store to localStore

* cleanup & logging

* cancel task on stop

* don't depend on local store for events

* dont use heap queue for wants

* add chronicles

* fix issue with peer wants

* add test for delayed block sends

* remove obsolete tests

* wip chunker

* run all tests

* add todo

* misc

* remove irrelevant files

* removing more files

* adding helpers for bitswap tests

* moved bitswap file

* misc

* make blocks timeout longer

* adjust block timeout

* speedup test

* compile with threads

* import missing crypto

* misc

* disable threads for now

* fix 32 bit platforms

* re-enable threads support in tests
This commit is contained in:
Dmitriy Ryajov 2021-02-25 18:23:22 -06:00 committed by GitHub
parent 8e76ecfa9f
commit 6c92b3dc25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
56 changed files with 3166 additions and 739 deletions

173
.github/workflows/ci.yml vendored Normal file
View File

@ -0,0 +1,173 @@
name: nim-dagger CI
on: [push, pull_request]
jobs:
build:
strategy:
fail-fast: false
max-parallel: 20
matrix:
branch: [v1.2.6]
target:
# Unit tests
- os: linux
cpu: amd64
TEST_KIND: unit-tests
- os: linux
cpu: i386
TEST_KIND: unit-tests
- os: macos
cpu: amd64
TEST_KIND: unit-tests
- os: windows
cpu: i386
TEST_KIND: unit-tests
- os: windows
cpu: amd64
TEST_KIND: unit-tests
include:
- target:
os: linux
builder: ubuntu-18.04
- target:
os: macos
builder: macos-10.15
- target:
os: windows
builder: windows-2019
name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (${{ matrix.branch }})'
runs-on: ${{ matrix.builder }}
steps:
- name: Checkout nim-dagger
uses: actions/checkout@v2
with:
path: nim-dagger
submodules: true
- name: Install build dependencies (Linux i386)
if: runner.os == 'Linux' && matrix.target.cpu == 'i386'
run: |
sudo dpkg --add-architecture i386
sudo apt-fast update -qq
sudo DEBIAN_FRONTEND='noninteractive' apt-fast install \
--no-install-recommends -yq gcc-multilib g++-multilib \
libssl-dev:i386
mkdir -p external/bin
cat << EOF > external/bin/gcc
#!/bin/bash
exec $(which gcc) -m32 "\$@"
EOF
cat << EOF > external/bin/g++
#!/bin/bash
exec $(which g++) -m32 "\$@"
EOF
chmod 755 external/bin/gcc external/bin/g++
echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH
- name: Install build dependencies (Windows)
if: runner.os == 'Windows'
shell: bash
run: |
mkdir external
if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
arch=64
else
arch=32
fi
curl -L "https://nim-lang.org/download/mingw$arch-6.3.0.7z" -o "external/mingw$arch.7z"
curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip
7z x "external/mingw$arch.7z" -oexternal/
7z x external/windeps.zip -oexternal/dlls
echo '${{ github.workspace }}'"/external/mingw$arch/bin" >> $GITHUB_PATH
echo '${{ github.workspace }}'"/external/dlls" >> $GITHUB_PATH
- name: Setup environment
shell: bash
run: echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH
- name: Get latest Nim commit hash
id: versions
shell: bash
run: |
getHash() {
git ls-remote "https://github.com/$1" "${2:-HEAD}" | cut -f 1
}
nimHash=$(getHash nim-lang/Nim '${{ matrix.branch }}')
csourcesHash=$(getHash nim-lang/csources)
echo "::set-output name=nim::$nimHash"
echo "::set-output name=csources::$csourcesHash"
- name: Restore prebuilt Nim from cache
id: nim-cache
uses: actions/cache@v1
with:
path: nim
key: "nim-${{ matrix.target.os }}-${{ matrix.target.cpu }}-${{ steps.versions.outputs.nim }}"
- name: Restore prebuilt csources from cache
if: steps.nim-cache.outputs.cache-hit != 'true'
id: csources-cache
uses: actions/cache@v1
with:
path: csources/bin
key: "csources-${{ matrix.target.os }}-${{ matrix.target.cpu }}-${{ steps.versions.outputs.csources }}"
- name: Checkout Nim csources
if: >
steps.csources-cache.outputs.cache-hit != 'true' &&
steps.nim-cache.outputs.cache-hit != 'true'
uses: actions/checkout@v2
with:
repository: nim-lang/csources
path: csources
ref: ${{ steps.versions.outputs.csources }}
- name: Checkout Nim
if: steps.nim-cache.outputs.cache-hit != 'true'
uses: actions/checkout@v2
with:
repository: nim-lang/Nim
path: nim
ref: ${{ steps.versions.outputs.nim }}
- name: Build Nim and associated tools
if: steps.nim-cache.outputs.cache-hit != 'true'
shell: bash
run: |
ncpu=
ext=
case '${{ runner.os }}' in
'Linux')
ncpu=$(nproc)
;;
'macOS')
ncpu=$(sysctl -n hw.ncpu)
;;
'Windows')
ncpu=$NUMBER_OF_PROCESSORS
ext=.exe
;;
esac
[[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
if [[ ! -e csources/bin/nim$ext ]]; then
make -C csources -j $ncpu CC=gcc ucpu='${{ matrix.target.cpu }}'
else
echo 'Using prebuilt csources'
fi
cp -v csources/bin/nim$ext nim/bin
cd nim
nim c koch
./koch boot -d:release
./koch tools -d:release
# clean up to save cache space
rm koch
rm -rf nimcache
rm -rf dist
rm -rf .git
- name: Run nim-dagger tests
shell: bash
run: |
export UCPU="$cpu"
cd nim-dagger
nimble install -y --depsOnly
nimble test

View File

@ -1,15 +0,0 @@
name: CI
on: [push, pull_request]
jobs:
test:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macOS-latest, windows-latest]
steps:
- uses: actions/checkout@v2
- uses: iffy/install-nim@v3
- name: Test
run: nimble test -y

View File

@ -1 +1 @@
nim 1.4.2 nim 1.2.6

17
dagger.nimble Normal file
View File

@ -0,0 +1,17 @@
version = "0.1.0"
author = "Dagger Team"
description = "The hardrive for Web3"
license = "MIT"
requires "nim >= 1.2.6",
"libp2p >= 0.0.2 & < 0.1.0",
"nimcrypto >= 0.4.1",
"bearssl >= 0.1.4",
"chronicles >= 0.7.2",
"chronos >= 2.5.2",
"metrics",
"secp256k1",
"stew#head",
"protobufserialization >= 0.2.0 & < 0.3.0",
"asynctest >= 0.2.1 & < 0.3.0",
"stew"

183
dagger/bitswap.nim Normal file
View File

@ -0,0 +1,183 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import pkg/chronicles
import pkg/chronos
import pkg/libp2p
import pkg/libp2p/errors
import ./bitswap/protobuf/bitswap as pb
import ./blocktype as bt
import ./stores/blockstore
import ./utils/asyncheapqueue
import ./bitswap/network
import ./bitswap/engine
export network, blockstore, asyncheapqueue, engine
logScope:
topics = "dagger bitswap"
const
DefaultTaskQueueSize = 100
DefaultConcurrentTasks = 10
DefaultMaxRetries = 3
type
Bitswap* = ref object of BlockStore
engine*: BitswapEngine # bitswap decision engine
taskQueue*: AsyncHeapQueue[BitswapPeerCtx] # peers we're currently processing tasks for
bitswapTasks: seq[Future[void]] # future to control bitswap task
bitswapRunning: bool # indicates if the bitswap task is running
concurrentTasks: int # number of concurrent peers we're serving at any given time
maxRetries: int # max number of tries for a failed block
taskHandler: TaskHandler # handler provided by the engine called by the runner
proc bitswapTaskRunner(b: Bitswap) {.async.} =
## process tasks in order of least amount of
## debt ratio
##
while b.bitswapRunning:
let peerCtx = await b.taskQueue.pop()
asyncSpawn b.taskHandler(peerCtx)
trace "Exiting bitswap task runner"
proc start*(b: Bitswap) {.async.} =
## Start the bitswap task
##
trace "bitswap start"
if b.bitswapTasks.len > 0:
warn "Starting bitswap twice"
return
b.bitswapRunning = true
for i in 0..<b.concurrentTasks:
b.bitswapTasks.add(b.bitswapTaskRunner)
proc stop*(b: Bitswap) {.async.} =
## Stop the bitswap bitswap
##
trace "Bitswap stop"
if b.bitswapTasks.len <= 0:
warn "Stopping bitswap without starting it"
return
b.bitswapRunning = false
for t in b.bitswapTasks:
if not t.finished:
trace "Awaiting task to stop"
t.cancel()
trace "Task stopped"
trace "Bitswap stopped"
method getBlocks*(b: Bitswap, cid: seq[Cid]): Future[seq[bt.Block]] {.async.} =
## Get a block from a remote peer
##
let blocks = await allFinished(b.engine.requestBlocks(cid))
return blocks.filterIt(
not it.failed
).mapIt(
it.read
)
method putBlocks*(b: Bitswap, blocks: seq[bt.Block]) =
b.engine.resolveBlocks(blocks)
procCall BlockStore(b).putBlocks(blocks)
proc new*(
T: type Bitswap,
localStore: BlockStore,
network: BitswapNetwork,
concurrentTasks = DefaultConcurrentTasks,
maxRetries = DefaultMaxRetries,
peersPerRequest = DefaultMaxPeersPerRequest): T =
proc sendWantList(
id: PeerID,
cids: seq[Cid],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.wantHave,
full: bool = false,
sendDontHave: bool = false) {.gcsafe.} =
network.broadcastWantList(
id, cids, priority, cancel,
wantType, full, sendDontHave)
proc sendBlocks(id: PeerID, blocks: seq[bt.Block]) {.gcsafe.} =
network.broadcastBlocks(id, blocks)
proc sendPresence(id: PeerID, presence: seq[BlockPresence]) {.gcsafe.} =
network.broadcastBlockPresence(id, presence)
let engine = BitswapEngine.new(
localStore = localStore,
peersPerRequest = peersPerRequest,
request = network.request,
)
let b = Bitswap(
engine: engine,
taskQueue: newAsyncHeapQueue[BitswapPeerCtx](DefaultTaskQueueSize),
concurrentTasks: concurrentTasks,
maxRetries: maxRetries,
)
# attach engine's task handler
b.taskHandler = proc(task: BitswapPeerCtx):
Future[void] {.gcsafe.} =
engine.taskHandler(task)
# attach task scheduler to engine
engine.scheduleTask = proc(task: BitswapPeerCtx):
bool {.gcsafe} =
b.taskQueue.pushOrUpdateNoWait(task).isOk()
proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} =
if event.kind == PeerEventKind.Joined:
b.engine.setupPeer(peerId)
else:
b.engine.dropPeer(peerId)
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
proc blockWantListHandler(
peer: PeerID,
wantList: WantList) {.gcsafe.} =
engine.wantListHandler(peer, wantList)
proc blockPresenceHandler(
peer: PeerID,
presence: seq[BlockPresence]) {.gcsafe.} =
engine.blockPresenceHandler(peer, presence)
proc blocksHandler(
peer: PeerID,
blocks: seq[bt.Block]) {.gcsafe.} =
engine.blocksHandler(peer, blocks)
network.handlers = BitswapHandlers(
onWantList: blockWantListHandler,
onBlocks: blocksHandler,
onPresence: blockPresenceHandler,
)
return b

348
dagger/bitswap/engine.nim Normal file
View File

@ -0,0 +1,348 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import std/algorithm
import pkg/chronos
import pkg/chronicles
import pkg/libp2p
import pkg/libp2p/errors
import ./protobuf/bitswap as pb
import ../blocktype as bt
import ../stores/blockstore
import ../utils/asyncheapqueue
import ./network
import ./pendingblocks
logScope:
topics = "dagger bitswap engine"
const
DefaultTimeout* = 5.seconds
DefaultMaxPeersPerRequest* = 10
type
TaskHandler* = proc(task: BitswapPeerCtx): Future[void] {.gcsafe.}
TaskScheduler* = proc(task: BitswapPeerCtx): bool {.gcsafe.}
BitswapPeerCtx* = ref object of RootObj
id*: PeerID
peerHave*: seq[Cid] # remote peers have lists
peerWants*: seq[Entry] # remote peers want lists
bytesSent*: int # bytes sent to remote
bytesRecv*: int # bytes received from remote
exchanged*: int # times peer has exchanged with us
lastExchange*: Moment # last time peer has exchanged with us
BitswapEngine* = ref object of RootObj
localStore*: BlockStore # where we localStore blocks for this instance
peers*: seq[BitswapPeerCtx] # peers we're currently actively exchanging with
wantList*: seq[Cid] # local wants list
pendingBlocks*: PendingBlocksManager # blocks we're awaiting to be resolved
peersPerRequest: int # max number of peers to request from
scheduleTask*: TaskScheduler # schedule a new task with the task runner
request*: BitswapRequest # bitswap network requests
proc contains*(a: AsyncHeapQueue[Entry], b: Cid): bool =
## Convenience method to check for entry prepense
##
a.anyIt( it.cid == b )
proc contains*(a: openArray[BitswapPeerCtx], b: PeerID): bool =
## Convenience method to check for peer prepense
##
a.anyIt( it.id == b )
proc debtRatio*(b: BitswapPeerCtx): float =
b.bytesSent / (b.bytesRecv + 1)
proc `<`*(a, b: BitswapPeerCtx): bool =
a.debtRatio < b.debtRatio
proc getPeerCtx*(b: BitswapEngine, peerId: PeerID): BitswapPeerCtx =
## Get the peer's context
##
let peer = b.peers.filterIt( it.id == peerId )
if peer.len > 0:
return peer[0]
proc requestBlocks*(
b: BitswapEngine,
cids: seq[Cid],
timeout = DefaultTimeout): seq[Future[bt.Block]] =
## Request a block from remotes
##
# no Cids to request
if cids.len == 0:
return
if b.peers.len <= 0:
warn "No peers to request blocks from"
# TODO: run discovery here to get peers for the block
return
var blocks: seq[Future[bt.Block]]
for c in cids:
if c notin b.pendingBlocks:
# install events to await blocks incoming from different sources
blocks.add(
b.pendingBlocks.addOrAwait(c).wait(timeout))
proc cmp(a, b: BitswapPeerCtx): int =
if a.debtRatio == b.debtRatio:
0
elif a.debtRatio > b.debtRatio:
1
else:
-1
# sort the peers so that we request
# the blocks from a peer with the lowest
# debt ratio
var sortedPeers = b.peers.sorted(
cmp
)
# get the first peer with at least one (any)
# matching cid
var blockPeer: BitswapPeerCtx
for i, p in sortedPeers:
let has = cids.anyIt(
it in p.peerHave
)
if has:
blockPeer = p
break
# didn't find any peer with matching cids
# use the first one in the sorted array
if isNil(blockPeer):
blockPeer = sortedPeers[0]
sortedPeers.keepItIf(
it != blockPeer
)
trace "Requesting blocks from peer", peer = blockPeer.id, len = cids.len
# request block
b.request.sendWantList(
blockPeer.id,
cids,
wantType = WantType.wantBlock) # we want this remote to send us a block
if sortedPeers.len == 0:
return blocks # no peers to send wants to
template sendWants(ctx: BitswapPeerCtx) =
# just send wants
b.request.sendWantList(
ctx.id,
cids.filterIt( it notin ctx.peerHave ), # filter out those that we already know about
wantType = WantType.wantHave) # we only want to know if the peer has the block
# filter out the peer we've already requested from
var stop = sortedPeers.high
if stop > b.peersPerRequest: stop = b.peersPerRequest
trace "Sending want list requests to remaining peers", count = stop + 1
for p in sortedPeers[0..stop]:
sendWants(p)
return blocks
proc blockPresenceHandler*(
b: BitswapEngine,
peer: PeerID,
presence: seq[BlockPresence]) =
## Handle block presence
##
let peerCtx = b.getPeerCtx(peer)
if isNil(peerCtx):
return
for blk in presence:
let cid = Cid.init(blk.cid).get()
if cid notin peerCtx.peerHave:
if blk.type == BlockPresenceType.presenceHave:
peerCtx.peerHave.add(cid)
proc scheduleTasks(b: BitswapEngine, blocks: seq[bt.Block]) =
trace "Schedule a task for new blocks"
let cids = blocks.mapIt( it.cid )
# schedule any new peers to provide blocks to
for p in b.peers:
for c in cids: # for each cid
# schedule a peer if it wants at least one
# cid and we have it in our local store
if c in p.peerWants and c in b.localStore:
if not b.scheduleTask(p):
trace "Unable to schedule task for peer", peer = p.id
break # do next peer
proc resolveBlocks*(b: BitswapEngine, blocks: seq[bt.Block]) =
## Resolve pending blocks from the pending blocks manager
## and schedule any new task to be ran
##
trace "Resolving blocks"
b.pendingBlocks.resolve(blocks)
b.scheduleTasks(blocks)
proc blocksHandler*(
b: BitswapEngine,
peer: PeerID,
blocks: seq[bt.Block]) =
## handle incoming blocks
##
trace "Got blocks from peer", peer, len = blocks.len
b.localStore.putBlocks(blocks)
b.resolveBlocks(blocks)
proc wantListHandler*(
b: BitswapEngine,
peer: PeerID,
wantList: WantList) =
## Handle incoming want lists
##
trace "Got want list for peer", peer
let peerCtx = b.getPeerCtx(peer)
if isNil(peerCtx):
return
var dontHaves: seq[Cid]
let entries = wantList.entries
for e in entries:
let idx = peerCtx.peerWants.find(e)
if idx > -1:
# peer doesn't want this block anymore
if e.cancel:
peerCtx.peerWants.del(idx)
continue
peerCtx.peerWants[idx] = e # update entry
else:
peerCtx.peerWants.add(e)
trace "Added entry to peer's want list", peer = peerCtx.id, cid = $e.cid
# peer might want to ask for the same cid with
# different want params
if e.sendDontHave and not(b.localStore.hasBlock(e.cid)):
dontHaves.add(e.cid)
# send don't have's to remote
if dontHaves.len > 0:
b.request.sendPresence(
peer,
dontHaves.mapIt(
BlockPresence(
cid: it.data.buffer,
`type`: BlockPresenceType.presenceDontHave)))
if not b.scheduleTask(peerCtx):
trace "Unable to schedule task for peer", peer
proc setupPeer*(b: BitswapEngine, peer: PeerID) =
## Perform initial setup, such as want
## list exchange
##
trace "Setting up new peer", peer
if peer notin b.peers:
b.peers.add(BitswapPeerCtx(
id: peer
))
# broadcast our want list, the other peer will do the same
if b.wantList.len > 0:
b.request.sendWantList(peer, b.wantList, full = true)
proc dropPeer*(b: BitswapEngine, peer: PeerID) =
## Cleanup disconnected peer
##
trace "Dropping peer", peer
# drop the peer from the peers table
b.peers.keepItIf( it.id != peer )
proc taskHandler*(b: BitswapEngine, task: BitswapPeerCtx) {.gcsafe, async.} =
trace "Handling task for peer", peer = task.id
var wantsBlocks = newAsyncHeapQueue[Entry](queueType = QueueType.Max)
# get blocks and wants to send to the remote
for e in task.peerWants:
if e.wantType == WantType.wantBlock:
await wantsBlocks.push(e)
# TODO: There should be all sorts of accounting of
# bytes sent/received here
if wantsBlocks.len > 0:
let blocks = await b.localStore.getBlocks(
wantsBlocks.mapIt(
it.cid
))
if blocks.len > 0:
b.request.sendBlocks(task.id, blocks)
# Remove successfully sent blocks
task.peerWants.keepIf(
proc(e: Entry): bool =
not blocks.anyIt( it.cid == e.cid )
)
var wants: seq[BlockPresence]
# do not remove wants from the queue unless
# we send the block or get a cancel
for e in task.peerWants:
if e.wantType == WantType.wantHave:
wants.add(
BlockPresence(
cid: e.`block`,
`type`: if b.localStore.hasBlock(e.cid):
BlockPresenceType.presenceHave
else:
BlockPresenceType.presenceDontHave
))
if wants.len > 0:
b.request.sendPresence(task.id, wants)
proc new*(
T: type BitswapEngine,
localStore: BlockStore,
request: BitswapRequest = BitswapRequest(),
scheduleTask: TaskScheduler = nil,
peersPerRequest = DefaultMaxPeersPerRequest): T =
proc taskScheduler(task: BitswapPeerCtx): bool =
if not isNil(scheduleTask):
return scheduleTask(task)
let b = BitswapEngine(
localStore: localStore,
pendingBlocks: PendingBlocksManager.new(),
peersPerRequest: peersPerRequest,
scheduleTask: taskScheduler,
request: request,
)
return b

309
dagger/bitswap/network.nim Normal file
View File

@ -0,0 +1,309 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/tables
import pkg/chronicles
import pkg/chronos
import pkg/libp2p
import ../blocktype as bt
import ./protobuf/bitswap as pb
import ./networkpeer
export pb, networkpeer
logScope:
topics = "dagger bitswap network"
const Codec* = "/ipfs/bitswap/1.2.0"
type
WantListHandler* = proc(peer: PeerID, wantList: WantList) {.gcsafe.}
BlocksHandler* = proc(peer: PeerID, blocks: seq[bt.Block]) {.gcsafe.}
BlockPresenceHandler* = proc(peer: PeerID, precense: seq[BlockPresence]) {.gcsafe.}
BitswapHandlers* = object
onWantList*: WantListHandler
onBlocks*: BlocksHandler
onPresence*: BlockPresenceHandler
WantListBroadcaster* = proc(
id: PeerID,
cids: seq[Cid],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.wantHave,
full: bool = false,
sendDontHave: bool = false) {.gcsafe.}
BlocksBroadcaster* = proc(peer: PeerID, presence: seq[bt.Block]) {.gcsafe.}
PresenceBroadcaster* = proc(peer: PeerID, presence: seq[BlockPresence]) {.gcsafe.}
BitswapRequest* = object
sendWantList*: WantListBroadcaster
sendBlocks*: BlocksBroadcaster
sendPresence*: PresenceBroadcaster
BitswapNetwork* = ref object of LPProtocol
peers*: Table[PeerID, NetworkPeer]
switch*: Switch
handlers*: BitswapHandlers
request*: BitswapRequest
getConn: ConnProvider
proc handleWantList(
b: BitswapNetwork,
peer: NetworkPeer,
list: WantList) =
## Handle incoming want list
##
if isNil(b.handlers.onWantList):
return
trace "Handling want list for peer", peer = peer.id
b.handlers.onWantList(peer.id, list)
# TODO: make into a template
proc makeWantList*(
cids: seq[Cid],
priority: int = 0,
cancel: bool = false,
wantType: WantType = WantType.wantHave,
full: bool = false,
sendDontHave: bool = false): WantList =
var entries: seq[Entry]
for cid in cids:
entries.add(Entry(
`block`: cid.data.buffer,
priority: priority.int32,
cancel: cancel,
wantType: wantType,
sendDontHave: sendDontHave))
WantList(entries: entries, full: full)
proc broadcastWantList*(
b: BitswapNetwork,
id: PeerID,
cids: seq[Cid],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.wantHave,
full: bool = false,
sendDontHave: bool = false) =
## send a want message to peer
##
if id notin b.peers:
return
trace "Sending want list to peer", peer = id, `type` = $wantType, len = cids.len
let wantList = makeWantList(
cids,
priority,
cancel,
wantType,
full,
sendDontHave)
asyncSpawn b.peers[id].send(Message(wantlist: wantList))
proc handleBlocks(
b: BitswapNetwork,
peer: NetworkPeer,
blocks: seq[auto]) =
## Handle incoming blocks
##
if isNil(b.handlers.onBlocks):
return
trace "Handling blocks for peer", peer = peer.id
var blks: seq[bt.Block]
for blk in blocks:
when blk is pb.Block:
blks.add(bt.Block.new(Cid.init(blk.prefix).get(), blk.data))
elif blk is seq[byte]:
blks.add(bt.Block.new(Cid.init(blk).get(), blk))
else:
error("Invalid block type")
b.handlers.onBlocks(peer.id, blks)
template makeBlocks*(
blocks: seq[bt.Block]):
seq[pb.Block] =
var blks: seq[pb.Block]
for blk in blocks:
# for now only send bitswap `1.1.0`
blks.add(pb.Block(
prefix: blk.cid.data.buffer,
data: blk.data
))
blks
proc broadcastBlocks*(
b: BitswapNetwork,
id: PeerID,
blocks: seq[bt.Block]) =
## Send blocks to remote
##
if id notin b.peers:
return
trace "Sending blocks to peer", peer = id, len = blocks.len
asyncSpawn b.peers[id].send(pb.Message(payload: makeBlocks(blocks)))
proc handleBlockPresence(
b: BitswapNetwork,
peer: NetworkPeer,
presence: seq[BlockPresence]) =
## Handle block presence
##
if isNil(b.handlers.onPresence):
return
trace "Handling block presence for peer", peer = peer.id
b.handlers.onPresence(peer.id, presence)
proc broadcastBlockPresence*(
b: BitswapNetwork,
id: PeerID,
presence: seq[BlockPresence]) =
## Send presence to remote
##
if id notin b.peers:
return
trace "Sending presence to peer", peer = id
asyncSpawn b.peers[id].send(Message(blockPresences: presence))
proc rpcHandler(b: BitswapNetwork, peer: NetworkPeer, msg: Message) {.async.} =
try:
if msg.wantlist.entries.len > 0:
b.handleWantList(peer, msg.wantlist)
if msg.blocks.len > 0:
b.handleBlocks(peer, msg.blocks)
if msg.payload.len > 0:
b.handleBlocks(peer, msg.payload)
if msg.blockPresences.len > 0:
b.handleBlockPresence(peer, msg.blockPresences)
except CatchableError as exc:
trace "Exception in bitswap rpc handler", exc = exc.msg
proc getOrCreatePeer(b: BitswapNetwork, peer: PeerID): NetworkPeer =
## Creates or retrieves a BitswapNetwork Peer
##
if peer in b.peers:
return b.peers[peer]
var getConn = proc(): Future[Connection] {.async.} =
try:
return await b.switch.dial(peer, Codec)
except CatchableError as exc:
trace "unable to connect to bitswap peer", exc = exc.msg
if not isNil(b.getConn):
getConn = b.getConn
let rpcHandler = proc (p: NetworkPeer, msg: Message): Future[void] =
b.rpcHandler(p, msg)
# create new pubsub peer
let bitSwapPeer = NetworkPeer.new(peer, getConn, rpcHandler)
debug "created new bitswap peer", peer
b.peers[peer] = bitSwapPeer
return bitSwapPeer
proc setupPeer*(b: BitswapNetwork, peer: PeerID) =
## Perform initial setup, such as want
## list exchange
##
discard b.getOrCreatePeer(peer)
proc dropPeer*(b: BitswapNetwork, peer: PeerID) =
## Cleanup disconnected peer
##
b.peers.del(peer)
method init*(b: BitswapNetwork) =
## Perform protocol initialization
##
proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} =
if event.kind == PeerEventKind.Joined:
b.setupPeer(peerId)
else:
b.dropPeer(peerId)
b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let peerId = conn.peerInfo.peerId
let bitswapPeer = b.getOrCreatePeer(peerId)
await bitswapPeer.readLoop(conn) # attach read loop
b.handler = handle
b.codec = Codec
proc new*(
T: type BitswapNetwork,
switch: Switch,
connProvider: ConnProvider = nil): T =
## Create a new BitswapNetwork instance
##
let b = BitswapNetwork(
switch: switch,
getConn: connProvider)
proc sendWantList(
id: PeerID,
cids: seq[Cid],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.wantHave,
full: bool = false,
sendDontHave: bool = false) {.gcsafe.} =
b.broadcastWantList(
id, cids, priority, cancel,
wantType, full, sendDontHave)
proc sendBlocks(id: PeerID, blocks: seq[bt.Block]) {.gcsafe.} =
b.broadcastBlocks(id, blocks)
proc sendPresence(id: PeerID, presence: seq[BlockPresence]) {.gcsafe.} =
b.broadcastBlockPresence(id, presence)
b.request = BitswapRequest(
sendWantList: sendWantList,
sendBlocks: sendBlocks,
sendPresence: sendPresence,
)
b.init()
return b

View File

@ -0,0 +1,80 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/chronicles
import pkg/protobuf_serialization
import pkg/libp2p
import ./protobuf/bitswap
logScope:
topics = "dagger bitswap networkpeer"
const MaxMessageSize = 8 * 1024 * 1024
type
RPCHandler* = proc(peer: NetworkPeer, msg: Message): Future[void] {.gcsafe.}
NetworkPeer* = ref object of RootObj
id*: PeerId
handler*: RPCHandler
sendConn: Connection
getConn: ConnProvider
proc connected*(b: NetworkPeer): bool =
not(isNil(b.sendConn)) and
not(b.sendConn.closed or b.sendConn.atEof)
proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} =
if isNil(conn):
return
try:
while not conn.atEof:
let data = await conn.readLp(MaxMessageSize)
let msg: Message = Protobuf.decode(data, Message)
trace "Got message for peer", peer = b.id, msg
await b.handler(b, msg)
except CatchableError as exc:
trace "Exception in bitswap read loop", exc = exc.msg
finally:
await conn.close()
proc connect*(b: NetworkPeer): Future[Connection] {.async.} =
if b.connected:
return b.sendConn
b.sendConn = await b.getConn()
asyncSpawn b.readLoop(b.sendConn)
return b.sendConn
proc send*(b: NetworkPeer, msg: Message) {.async.} =
let conn = await b.connect()
if isNil(conn):
trace "Unable to get send connection for peer message not sent", peer = b.id
return
trace "Sending message to remote", peer = b.id, msg = $msg
await conn.writeLp(Protobuf.encode(msg))
proc new*(
T: type NetworkPeer,
peer: PeerId,
connProvider: ConnProvider,
rpcHandler: RPCHandler): T =
doAssert(not isNil(connProvider),
"should supply connection provider")
NetworkPeer(
id: peer,
getConn: connProvider,
handler: rpcHandler)

View File

@ -0,0 +1,73 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/tables
import pkg/chronicles
import pkg/chronos
import pkg/libp2p
import ../blocktype
logScope:
topics = "dagger bitswap pendingblocks"
type
PendingBlocksManager* = ref object of RootObj
blocks*: Table[Cid, Future[Block]] # pending Block requests
proc addOrAwait*(
p: PendingBlocksManager,
cid: Cid):
Future[Block] {.async.} =
## Add an event for a block
##
if cid notin p.blocks:
p.blocks[cid] = newFuture[Block]()
trace "Adding pending future for block", cid
let blk = p.blocks[cid]
try:
return await blk
except CancelledError as exc:
trace "Blocks cancelled", exc = exc.msg, cid
raise exc
except CatchableError as exc:
trace "Pending WANT failed or expired", exc = exc.msg
finally:
p.blocks.del(cid)
proc resolve*(
p: PendingBlocksManager,
blocks: seq[Block]) =
## Resolve pending blocks
##
for blk in blocks:
# resolve any pending blocks
if blk.cid in p.blocks:
let pending = p.blocks[blk.cid]
if not pending.finished:
trace "Resolving block", cid = $blk.cid
pending.complete(blk)
p.blocks.del(blk.cid)
proc pending*(
p: PendingBlocksManager,
cid: Cid): bool = cid in p.blocks
proc contains*(
p: PendingBlocksManager,
cid: Cid): bool = p.pending(cid)
proc new*(T: type PendingBlocksManager): T =
T(
blocks: initTable[Cid, Future[Block]]()
)

View File

@ -0,0 +1,55 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/hashes
import std/sequtils
import pkg/protobuf_serialization
import pkg/libp2p
import_proto3 "message.proto"
export Message
export Wantlist, WantType, Entry
export Block, BlockPresenceType, BlockPresence
proc hash*(e: Entry): Hash =
hash(e.`block`)
proc cid*(e: Entry): Cid =
## Helper to convert raw bytes to Cid
##
Cid.init(e.`block`).get()
proc contains*(a: openArray[Entry], b: Cid): bool =
## Convenience method to check for peer precense
##
a.filterIt( it.cid == b ).len > 0
proc `==`*(a: Entry, cid: Cid): bool =
return a.cid == cid
proc `<`*(a, b: Entry): bool =
a.priority < b.priority
proc cid*(e: BlockPresence): Cid =
## Helper to convert raw bytes to Cid
##
Cid.init(e.cid).get()
proc `==`*(a: BlockPresence, cid: Cid): bool =
return cid(a) == cid
proc contains*(a: openArray[BlockPresence], b: Cid): bool =
## Convenience method to check for peer precense
##
a.filterIt( cid(it) == b ).len > 0

View File

@ -0,0 +1,45 @@
syntax = "proto3";
package bitswap.message.pb;
message Message {
message Wantlist {
enum WantType {
wantBlock = 0;
wantHave = 1;
}
message Entry {
bytes block = 1; // the block cid (cidV0 in bitswap 1.0.0, cidV1 in bitswap 1.1.0)
int32 priority = 2; // the priority (normalized). default to 1
bool cancel = 3; // whether this revokes an entry
WantType wantType = 4; // Note: defaults to enum 0, ie Block
bool sendDontHave = 5; // Note: defaults to false
}
repeated Entry entries = 1; // a list of wantlist entries
bool full = 2; // whether this is the full wantlist. default to false
}
message Block {
bytes prefix = 1; // CID prefix (cid version, multicodec and multihash prefix (type + length)
bytes data = 2;
}
enum BlockPresenceType {
presenceHave = 0;
presenceDontHave = 1;
}
message BlockPresence {
bytes cid = 1;
BlockPresenceType type = 2;
}
Wantlist wantlist = 1;
repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0
repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0
repeated BlockPresence blockPresences = 4;
int32 pendingBytes = 5;
}

55
dagger/blocktype.nim Normal file
View File

@ -0,0 +1,55 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/libp2p/multihash
import pkg/libp2p/multicodec
import pkg/libp2p/cid
import pkg/stew/byteutils
export cid, multihash, multicodec
type
CidDontMatchError* = object of CatchableError
Block* = object of RootObj
cid*: Cid
data*: seq[byte]
proc `$`*(b: Block): string =
result &= "cid: " & $b.cid
result &= "\ndata: " & string.fromBytes(b.data)
proc new*(
T: type Block,
cid: Cid,
data: openArray[byte] = [],
verify: bool = false): T =
let b = Block.new(
data,
cid.cidver,
cid.mhash.get().mcodec,
cid.mcodec
)
if verify and cid != b.cid:
raise newException(CidDontMatchError,
"The suplied Cid doesn't match the data!")
return b
proc new*(
T: type Block,
data: openArray[byte] = [],
version = CIDv0,
hcodec = multiCodec("sha2-256"),
codec = multiCodec("dag-pb")): T =
let hash = MultiHash.digest($hcodec, data).get()
Block(
cid: Cid.init(version, codec, hash).get(),
data: @data)

132
dagger/chunker.nim Normal file
View File

@ -0,0 +1,132 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
# TODO: This is super inneficient and merits a rewrite, but it'll do for now
import std/sequtils
import ./p2p/rng
import ./blocktype
export blocktype
const
DefaultChunkSize*: int64 = 1024 * 256
type
# default reader type
Reader* = proc(data: var openArray[byte], offset: Natural = 0): int {.gcsafe, closure.}
ChunkerType* {.pure.} = enum
SizedChunker
RabinChunker
Chunker* = ref object of RootObj
reader*: Reader
size*: Natural
pos*: Natural
case kind*: ChunkerType:
of SizedChunker:
chunkSize*: Natural
pad*: bool # pad last block if less than size
of RabinChunker:
discard
proc getBytes*(c: Chunker): seq[byte] =
## returns a chunk of bytes from
## the instantiated chunker
##
if c.pos >= c.size:
return
var bytes = newSeq[byte](c.chunkSize)
let read = c.reader(bytes, c.pos)
c.pos += read
if not c.pad and bytes.len != read:
bytes.setLen(read)
return bytes
iterator items*(c: Chunker): seq[byte] =
while true:
let chunk = c.getBytes()
if chunk.len <= 0:
break
yield chunk
proc new*(
T: type Chunker,
kind = ChunkerType.SizedChunker,
reader: Reader,
size: Natural,
chunkSize = DefaultChunkSize,
pad = false): T =
var chunker = Chunker(
kind: kind,
reader: reader,
size: size)
if kind == ChunkerType.SizedChunker:
chunker.pad = pad
chunker.chunkSize = chunkSize
return chunker
proc newRandomChunker*(
rng: Rng,
size: int64,
kind = ChunkerType.SizedChunker,
chunkSize = DefaultChunkSize,
pad = false): Chunker =
## create a chunker that produces
## random data
##
proc reader(data: var openArray[byte], offset: Natural = 0): int =
var alpha = toSeq(byte('A')..byte('z'))
var read = 0
while read <= data.high:
rng.shuffle(alpha)
for a in alpha:
if read > data.high:
break
data[read] = a
read.inc
return read
Chunker.new(
kind = ChunkerType.SizedChunker,
reader = reader,
size = size,
pad = pad,
chunkSize = chunkSize)
proc newFileChunker*(
file: File,
kind = ChunkerType.SizedChunker,
chunkSize = DefaultChunkSize,
pad = false): Chunker =
## create the default File chunker
##
proc reader(data: var openArray[byte], offset: Natural = 0): int =
return file.readBytes(data, 0, data.len)
Chunker.new(
kind = ChunkerType.SizedChunker,
reader = reader,
size = file.getFileSize(),
pad = pad,
chunkSize = chunkSize)

42
dagger/p2p/rng.nim Normal file
View File

@ -0,0 +1,42 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/libp2p/crypto/crypto
import pkg/bearssl
type
Rng* = ref BrHmacDrbgContext
var rng {.threadvar.}: Rng
proc instance*(t: type Rng): Rng =
if rng.isNil:
rng = newRng()
rng
# Random helpers: similar as in stdlib, but with BrHmacDrbgContext rng
# TODO: Move these somewhere else?
const randMax = 18_446_744_073_709_551_615'u64
proc rand*(rng: Rng, max: Natural): int =
if max == 0: return 0
var x: uint64
while true:
brHmacDrbgGenerate(addr rng[], addr x, csize_t(sizeof(x)))
if x < randMax - (randMax mod (uint64(max) + 1'u64)): # against modulo bias
return int(x mod (uint64(max) + 1'u64))
proc sample*[T](rng: Rng, a: openArray[T]): T =
result = a[rng.rand(a.high)]
proc shuffle*[T](rng: Rng, a: var openArray[T]) =
for i in countdown(a.high, 1):
let j = rng.rand(i)
swap(a[i], a[j])

View File

@ -0,0 +1,79 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sets
import std/sequtils
import chronos
import ../blocktype
export blocktype
type
ChangeType* {.pure.} = enum
Added, Removed
BlockStoreChangeEvt* = object
cids*: seq[Cid]
kind*: ChangeType
BlocksChangeHandler* = proc(evt: BlockStoreChangeEvt) {.gcsafe, closure.}
BlockStore* = ref object of RootObj
changeHandlers: array[ChangeType, seq[BlocksChangeHandler]]
proc addChangeHandler*(
s: BlockStore,
handler: BlocksChangeHandler,
changeType: ChangeType) =
s.changeHandlers[changeType].add(handler)
proc removeChangeHandler*(
s: BlockStore,
handler: BlocksChangeHandler,
changeType: ChangeType) =
s.changeHandlers[changeType].keepItIf( it != handler )
proc triggerChange(
s: BlockStore,
changeType: ChangeType,
cids: seq[Cid]) =
let evt = BlockStoreChangeEvt(
kind: changeType,
cids: cids,
)
for handler in s.changeHandlers[changeType]:
handler(evt)
method getBlocks*(b: BlockStore, cid: seq[Cid]): Future[seq[Block]] {.base.} =
## Get a block from the stores
##
doAssert(false, "Not implemented!")
method hasBlock*(s: BlockStore, cid: Cid): bool {.base.} =
## Check if the block exists in the blockstore
##
return false
method putBlocks*(s: BlockStore, blocks: seq[Block]) {.base.} =
## Put a block to the blockstore
##
s.triggerChange(ChangeType.Added, blocks.mapIt( it.cid ))
method delBlocks*(s: BlockStore, blocks: seq[Cid]) {.base.} =
## Delete a block/s from the block store
##
s.triggerChange(ChangeType.Removed, blocks)
proc contains*(s: BlockStore, blk: Cid): bool =
s.hasBlock(blk)

45
dagger/stores/manager.nim Normal file
View File

@ -0,0 +1,45 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import pkg/chronos
import ./blockstore
type
BlockStoreManager* = ref object of BlockStore
stores: seq[BlockStore]
proc addProvider*(b: BlockStoreManager, provider: BlockStore) =
b.stores.add(provider)
proc removeProvider*(b: BlockStoreManager, provider: BlockStore) =
b.stores.keepItIf( it != provider )
method addChangeHandler*(
s: BlockStoreManager,
handler: BlocksChangeHandler,
changeType: ChangeType) =
## Add change handler to all registered
## block stores
##
for p in s.stores:
p.addChangeHandler(handler, changeType)
method removeChangeHandler*(
s: BlockStoreManager,
handler: BlocksChangeHandler,
changeType: ChangeType) =
## Remove change handler from all registered
## block stores
##
for p in s.stores:
p.removeChangeHandler(handler, changeType)

View File

@ -0,0 +1,60 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import pkg/chronos
import pkg/libp2p
import ../stores/blockstore
import ../blocktype
export blockstore
type
MemoryStore* = ref object of BlockStore
blocks: seq[Block] # TODO: Should be an LRU cache
method getBlocks*(
s: MemoryStore,
cids: seq[Cid]): Future[seq[Block]] {.async.} =
## Get a block from the stores
##
var res: seq[Block]
for c in cids:
res.add(s.blocks.filterIt( it.cid == c ))
return res
method hasBlock*(s: MemoryStore, cid: Cid): bool =
## check if the block exists
##
s.blocks.filterIt( it.cid == cid ).len > 0
method putBlocks*(s: MemoryStore, blocks: seq[Block]) =
## Put a block to the blockstore
##
s.blocks.add(blocks)
procCall BlockStore(s).putBlocks(blocks)
method delBlocks*(s: MemoryStore, cids: seq[Cid]) =
## delete a block/s from the block store
##
for c in cids:
s.blocks.keepItIf( it.cid != c )
procCall BlockStore(s).delBlocks(cids)
proc new*(T: type MemoryStore, blocks: openArray[Block] = []): MemoryStore =
MemoryStore(
blocks: @blocks
)

View File

@ -0,0 +1,326 @@
## Nim-Dagger
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import pkg/chronos
import pkg/stew/results
# Based on chronos AsyncHeapQueue and std/heapqueue
type
QueueType* {.pure.} = enum
Min, Max
AsyncHeapQueue*[T] = ref object of RootRef
## A priority queue
##
## If ``maxsize`` is less than or equal to zero, the queue size is
## infinite. If it is an integer greater than ``0``, then "await put()"
## will block when the queue reaches ``maxsize``, until an item is
## removed by "await get()".
queueType: QueueType
getters: seq[Future[void]]
putters: seq[Future[void]]
queue: seq[T]
maxsize: int
AsyncHQErrors* {.pure.} = enum
Empty, Full
proc newAsyncHeapQueue*[T](
maxsize: int = 0,
queueType: QueueType = QueueType.Min): AsyncHeapQueue[T] =
## Creates a new asynchronous queue ``AsyncHeapQueue``.
##
AsyncHeapQueue[T](
getters: newSeq[Future[void]](),
putters: newSeq[Future[void]](),
queue: newSeqOfCap[T](maxsize),
maxsize: maxsize,
queueType: queueType,
)
proc wakeupNext(waiters: var seq[Future[void]]) {.inline.} =
var i = 0
while i < len(waiters):
var waiter = waiters[i]
inc(i)
if not(waiter.finished()):
waiter.complete()
break
if i > 0:
waiters.delete(0, i - 1)
proc heapCmp[T](x, y: T, max: bool = false): bool {.inline.} =
if max:
return (y < x)
else:
return (x < y)
proc siftdown[T](heap: AsyncHeapQueue[T], startpos, p: int) =
## 'heap' is a heap at all indices >= startpos, except
## possibly for pos. pos is the index of a leaf with a
## possibly out-of-order value. Restore the heap invariant.
##
var pos = p
var newitem = heap[pos]
# Follow the path to the root, moving parents down until
# finding a place newitem fits.
while pos > startpos:
let parentpos = (pos - 1) shr 1
let parent = heap[parentpos]
if heapCmp(newitem, parent, heap.queueType == QueueType.Max):
heap.queue[pos] = parent
pos = parentpos
else:
break
heap.queue[pos] = newitem
proc siftup[T](heap: AsyncHeapQueue[T], p: int) =
let endpos = len(heap)
var pos = p
let startpos = pos
let newitem = heap[pos]
# Bubble up the smaller child until hitting a leaf.
var childpos = 2*pos + 1 # leftmost child position
while childpos < endpos:
# Set childpos to index of smaller child.
let rightpos = childpos + 1
if rightpos < endpos and
not heapCmp(heap[childpos], heap[rightpos], heap.queueType == QueueType.Max):
childpos = rightpos
# Move the smaller child up.
heap.queue[pos] = heap[childpos]
pos = childpos
childpos = 2*pos + 1
# The leaf at pos is empty now. Put newitem there, and bubble it up
# to its final resting place (by sifting its parents down).
heap.queue[pos] = newitem
siftdown(heap, startpos, pos)
proc full*[T](heap: AsyncHeapQueue[T]): bool {.inline.} =
## Return ``true`` if there are ``maxsize`` items in the queue.
##
## Note: If the ``heap`` was initialized with ``maxsize = 0`` (default),
## then ``full()`` is never ``true``.
if heap.maxsize <= 0:
false
else:
(len(heap.queue) >= heap.maxsize)
proc empty*[T](heap: AsyncHeapQueue[T]): bool {.inline.} =
## Return ``true`` if the queue is empty, ``false`` otherwise.
(len(heap.queue) == 0)
proc pushNoWait*[T](heap: AsyncHeapQueue[T], item: T): Result[void, AsyncHQErrors] =
## Push `item` onto heap, maintaining the heap invariant.
##
if heap.full():
return err(AsyncHQErrors.Full)
heap.queue.add(item)
siftdown(heap, 0, len(heap)-1)
heap.getters.wakeupNext()
return ok()
proc push*[T](heap: AsyncHeapQueue[T], item: T) {.async, gcsafe.} =
## Push item into the queue, awaiting for an available slot
## when it's full
##
while heap.full():
var putter = newFuture[void]("AsyncHeapQueue.push")
heap.putters.add(putter)
try:
await putter
except CatchableError as exc:
if not(heap.full()) and not(putter.cancelled()):
heap.putters.wakeupNext()
raise exc
heap.pushNoWait(item).tryGet()
proc popNoWait*[T](heap: AsyncHeapQueue[T]): Result[T, AsyncHQErrors] =
## Pop and return the smallest item from `heap`,
## maintaining the heap invariant.
##
if heap.empty():
return err(AsyncHQErrors.Empty)
let lastelt = heap.queue.pop()
if heap.len > 0:
result = ok(heap[0])
heap.queue[0] = lastelt
siftup(heap, 0)
else:
result = ok(lastelt)
heap.putters.wakeupNext()
proc pop*[T](heap: AsyncHeapQueue[T]): Future[T] {.async.} =
## Remove and return an ``item`` from the beginning of the queue ``heap``.
## If the queue is empty, wait until an item is available.
while heap.empty():
var getter = newFuture[void]("AsyncHeapQueue.pop")
heap.getters.add(getter)
try:
await getter
except CatchableError as exc:
if not(heap.empty()) and not(getter.cancelled()):
heap.getters.wakeupNext()
raise exc
return heap.popNoWait().tryGet()
proc del*[T](heap: AsyncHeapQueue[T], index: Natural) =
## Removes the element at `index` from `heap`,
## maintaining the heap invariant.
##
if heap.empty():
return
swap(heap.queue[^1], heap.queue[index])
let newLen = heap.len - 1
heap.queue.setLen(newLen)
if index < newLen:
heap.siftup(index)
heap.putters.wakeupNext()
proc delete*[T](heap: AsyncHeapQueue[T], item: T) =
## Find and delete an `item` from the `heap`
##
let index = heap.find(item)
if index > -1:
heap.del(index)
proc update*[T](heap: AsyncHeapQueue[T], item: T): bool =
## Update an entry in the heap by reshufling its
## possition, maintaining the heap invariant.
##
let index = heap.find(item)
if index > -1:
# replace item with new one in case it's a copy
heap.queue[index] = item
# re-establish heap order
# TODO: don't start at 0 to avoid reshuffling
# entire heap
heap.siftup(0)
return true
proc pushOrUpdateNoWait*[T](heap: AsyncHeapQueue[T], item: T): Result[void, AsyncHQErrors] =
## Update an item if it exists or push a new one
##
if heap.update(item):
return ok()
return heap.pushNoWait(item)
proc pushOrUpdate*[T](heap: AsyncHeapQueue[T], item: T) {.async.} =
## Update an item if it exists or push a new one
## awaiting until a slot becomes available
##
if not heap.update(item):
await heap.push(item)
proc replace*[T](heap: AsyncHeapQueue[T], item: T): Result[T, AsyncHQErrors] =
## Pop and return the current smallest value, and add the new item.
## This is more efficient than pop() followed by push(), and can be
## more appropriate when using a fixed-size heap. Note that the value
## returned may be larger than item! That constrains reasonable uses of
## this routine unless written as part of a conditional replacement:
##
## .. code-block:: nim
## if item > heap[0]:
## item = replace(heap, item)
##
if heap.empty():
error(AsyncHQErrors.Empty)
result = heap[0]
heap.queue[0] = item
siftup(heap, 0)
proc pushPopNoWait*[T](heap: AsyncHeapQueue[T], item: T): Result[T, AsyncHQErrors] =
## Fast version of a push followed by a pop.
##
if heap.empty():
err(AsyncHQErrors.Empty)
if heap.len > 0 and heapCmp(heap[0], item, heap.queueType == QueueType.Max):
swap(item, heap[0])
siftup(heap, 0)
return item
proc clear*[T](heap: AsyncHeapQueue[T]) {.inline.} =
## Clears all elements of queue ``heap``.
heap.queue.setLen(0)
proc len*[T](heap: AsyncHeapQueue[T]): int {.inline.} =
## Return the number of elements in ``heap``.
len(heap.queue)
proc size*[T](heap: AsyncHeapQueue[T]): int {.inline.} =
## Return the maximum number of elements in ``heap``.
len(heap.maxsize)
proc `[]`*[T](heap: AsyncHeapQueue[T], i: Natural) : T {.inline.} =
## Access the i-th element of ``heap`` by order from first to last.
## ``heap[0]`` is the first element, ``heap[^1]`` is the last element.
heap.queue[i]
proc `[]`*[T](heap: AsyncHeapQueue[T], i: BackwardsIndex) : T {.inline.} =
## Access the i-th element of ``heap`` by order from first to last.
## ``heap[0]`` is the first element, ``heap[^1]`` is the last element.
heap.queue[len(heap.queue) - int(i)]
iterator items*[T](heap: AsyncHeapQueue[T]): T {.inline.} =
## Yield every element of ``heap``.
for item in heap.queue.items():
yield item
iterator mitems*[T](heap: AsyncHeapQueue[T]): var T {.inline.} =
## Yield every element of ``heap``.
for mitem in heap.queue.mitems():
yield mitem
iterator pairs*[T](heap: AsyncHeapQueue[T]): tuple[key: int, val: T] {.inline.} =
## Yield every (position, value) of ``heap``.
for pair in heap.queue.pairs():
yield pair
proc contains*[T](heap: AsyncHeapQueue[T], item: T): bool {.inline.} =
## Return true if ``item`` is in ``heap`` or false if not found. Usually used
## via the ``in`` operator.
for e in heap.queue.items():
if e == item: return true
return false
proc `$`*[T](heap: AsyncHeapQueue[T]): string =
## Turn an async queue ``heap`` into its string representation.
var res = "["
for item in heap.queue.items():
if len(res) > 1: res.add(", ")
res.addQuoted(item)
res.add("]")
res

View File

@ -1,10 +0,0 @@
version = "0.1.0"
author = "Dagger Team"
description = "Decentralized storage in Nim"
license = "MIT"
requires "nim >= 1.4.2 & < 2.0.0"
requires "libp2p >= 0.0.2 & < 0.1.0"
requires "chronos >= 2.5.2 & < 3.0.0"
requires "protobufserialization >= 0.2.0 & < 0.3.0"
requires "asynctest >= 0.2.1 & < 0.3.0"

View File

@ -1,50 +0,0 @@
import std/options
import pkg/chronos
import pkg/libp2p/cid
import ./ipfsobject
import ./repo
import ./p2p/switch
import ./bitswap/protocol
import ./bitswap/exchange
export options
export Cid
export Switch
type
Bitswap* = ref object
repo: Repo
switch: Switch
exchanges: seq[Exchange] # TODO: never cleaned
proc startExchange(bitswap: Bitswap, stream: BitswapStream) =
let exchange = Exchange.start(bitswap.repo, stream)
bitswap.exchanges.add(exchange)
proc start*(_: type Bitswap, switch: Switch, repo = Repo()): Bitswap =
let bitswap = Bitswap(repo: repo, switch: switch)
let protocol = BitswapProtocol.new()
proc acceptLoop {.async.} =
while true:
let stream = await protocol.accept()
bitswap.startExchange(stream)
asyncSpawn acceptLoop()
switch.mount(protocol)
bitswap
proc connect*(bitswap: Bitswap, peer: PeerInfo) {.async.} =
let stream = await bitswap.switch.dial(peer, BitswapProtocol)
bitswap.startExchange(stream)
proc store*(bitswap: Bitswap, obj: IpfsObject) =
bitswap.repo.store(obj)
proc retrieve*(bitswap: Bitswap,
cid: Cid,
timeout = 30.seconds): Future[Option[IpfsObject]] {.async.} =
result = bitswap.repo.retrieve(cid)
if result.isNone:
for exchange in bitswap.exchanges:
await exchange.want(cid)
await bitswap.repo.wait(cid, timeout)
result = bitswap.repo.retrieve(cid)

View File

@ -1,38 +0,0 @@
import pkg/chronos
import pkg/libp2p/cid
import ../repo
import ./stream
import ./messages
type Exchange* = object
repo: Repo
stream: BitswapStream
proc want*(exchange: Exchange, cid: Cid) {.async.} =
await exchange.stream.write(Message.want(cid))
proc send*(exchange: Exchange, obj: IpfsObject) {.async.} =
await exchange.stream.write(Message.send(obj.data))
proc handlePayload(exchange: Exchange, message: Message) {.async.} =
for bloc in message.payload:
let obj = IpfsObject(data: bloc.data)
exchange.repo.store(obj)
proc handleWants(exchange: Exchange, message: Message) {.async.} =
for want in message.wantlist.entries:
let cid = Cid.init(want.`block`).get()
let obj = exchange.repo.retrieve(cid)
if obj.isSome:
await exchange.send(obj.get())
proc readLoop(exchange: Exchange) {.async.} =
while true:
let message = await exchange.stream.read()
await exchange.handlePayload(message)
await exchange.handleWants(message)
proc start*(_: type Exchange, repo: Repo, stream: BitswapStream): Exchange =
let exchange = Exchange(repo: repo, stream: stream)
asyncSpawn exchange.readLoop()
exchange

View File

@ -1,15 +0,0 @@
import pkg/libp2p
import ../protobuf/bitswap
export Cid
export Message
proc want*(t: type Message, cids: varargs[Cid]): Message =
for cid in cids:
let entry = Entry(`block`: cid.data.buffer)
result.wantlist.entries.add(entry)
proc send*(t: type Message, blocks: varargs[seq[byte]]): Message =
for data in blocks:
let bloc = Block(data: data)
result.payload.add(bloc)

View File

@ -1,33 +0,0 @@
import pkg/chronos
import pkg/libp2p/switch
import pkg/libp2p/stream/connection
import pkg/libp2p/protocols/protocol
import ./stream
export stream except readLoop
const Codec = "/ipfs/bitswap/1.2.0"
type
BitswapProtocol* = ref object of LPProtocol
connections: AsyncQueue[BitswapStream]
proc new*(t: type BitswapProtocol): BitswapProtocol =
let connections = newAsyncQueue[BitswapStream](1)
proc handle(connection: Connection, proto: string) {.async.} =
let stream = BitswapStream.new(connection)
await connections.put(stream)
await stream.readLoop()
BitswapProtocol(connections: connections, codecs: @[Codec], handler: handle)
proc dial*(switch: Switch,
peer: PeerInfo,
t: type BitswapProtocol):
Future[BitswapStream] {.async.} =
let connection = await switch.dial(peer.peerId, peer.addrs, Codec)
let stream = BitswapStream.new(connection)
asyncSpawn stream.readLoop()
result = stream
proc accept*(bitswap: BitswapProtocol): Future[BitswapStream] {.async.} =
result = await bitswap.connections.get()

View File

@ -1,38 +0,0 @@
import pkg/chronos
import pkg/protobuf_serialization
import pkg/libp2p/stream/connection
import ./messages
export messages
const MaxMessageSize = 8 * 1024 * 1024
type
BitswapStream* = ref object
bytestream: LpStream
messages: AsyncQueue[Message]
proc new*(t: type BitswapStream, bytestream: LpStream): BitswapStream =
BitswapStream(bytestream: bytestream, messages: newAsyncQueue[Message](1))
proc readOnce(stream: BitswapStream) {.async.} =
let encoded = await stream.bytestream.readLp(MaxMessageSize)
let message = Protobuf.decode(encoded, Message)
await stream.messages.put(message)
proc readLoop*(stream: BitswapStream) {.async.} =
while true:
try:
await stream.readOnce()
except LPStreamEOFError:
break
proc write*(stream: BitswapStream, message: Message) {.async.} =
let encoded = Protobuf.encode(message)
await stream.bytestream.writeLp(encoded)
proc read*(stream: BitswapStream): Future[Message] {.async.} =
result = await stream.messages.get()
proc close*(stream: BitswapStream) {.async.} =
await stream.bytestream.close()

View File

@ -1,10 +0,0 @@
import ./ipfsobject
export ipfsobject
proc createObject*(file: File): IpfsObject =
let contents = file.readAll()
IpfsObject(data: cast[seq[byte]](contents))
proc writeToFile*(obj: IpfsObject, output: File) =
output.write(cast[string](obj.data))

View File

@ -1,15 +0,0 @@
import pkg/libp2p/peerinfo
import pkg/libp2p/cid
export peerinfo
export cid
type
RoutingTable* = object
peers: seq[PeerInfo]
proc add*(table: var RoutingTable, peer: PeerInfo) =
table.peers.add(peer)
proc closest*(table: RoutingTable, id: Cid): seq[PeerInfo] =
table.peers

View File

@ -1,14 +0,0 @@
import pkg/libp2p/multihash
import pkg/libp2p/multicodec
import pkg/libp2p/cid
export cid
type
IpfsObject* = object
data*: seq[byte]
proc cid*(obj: IpfsObject): Cid =
let codec = multiCodec("dag-pb")
let hash = MultiHash.digest("sha2-256", obj.data).get()
Cid.init(CIDv0, codec, hash).get()

View File

@ -1,13 +0,0 @@
import pkg/libp2p/crypto/crypto
import pkg/bearssl
type
Rng* = RandomNumberGenerator
RandomNumberGenerator = ref BrHmacDrbgContext
var rng {.threadvar.}: Rng
proc instance*(t: type Rng): Rng =
if rng.isNil:
rng = newRng()
rng

View File

@ -1,32 +0,0 @@
import std/tables
import pkg/chronos
import pkg/libp2p/switch
import pkg/libp2p/crypto/crypto
import pkg/libp2p/peerinfo
import pkg/libp2p/protocols/identify
import pkg/libp2p/stream/connection
import pkg/libp2p/muxers/muxer
import pkg/libp2p/muxers/mplex/mplex
import pkg/libp2p/transports/transport
import pkg/libp2p/transports/tcptransport
import pkg/libp2p/protocols/secure/secure
import pkg/libp2p/protocols/secure/noise
import pkg/libp2p/protocols/secure/secio
import ./rng
export switch
proc create*(t: type Switch): Switch =
proc createMplex(conn: Connection): Muxer =
Mplex.init(conn)
let privateKey = PrivateKey.random(Ed25519, Rng.instance[]).get()
let peerInfo = PeerInfo.init(privateKey)
let identify = newIdentify(peerInfo)
let mplexProvider = newMuxerProvider(createMplex, MplexCodec)
let transports = @[Transport(TcpTransport.init({ReuseAddr}))]
let muxers = [(MplexCodec, mplexProvider)].toTable
let secureManagers = [Secure(newNoise(Rng.instance, privateKey))]
newSwitch(peerInfo, transports, identify, muxers, secureManagers)

View File

@ -1,7 +0,0 @@
import pkg/protobuf_serialization
import_proto3 "message.proto"
export Message
export Wantlist, WantType, Entry
export Block, BlockPresenceType, BlockPresence

View File

@ -1,44 +0,0 @@
syntax = "proto3";
package bitswap.message.pb;
message Message {
message Wantlist {
enum WantType {
wantBlock = 0;
wantHave = 1;
}
message Entry {
bytes block = 1; // the block cid (cidV0 in bitswap 1.0.0, cidV1 in bitswap 1.1.0)
int32 priority = 2; // the priority (normalized). default to 1
bool cancel = 3; // whether this revokes an entry
WantType wantType = 4; // Note: defaults to enum 0, ie Block
bool sendDontHave = 5; // Note: defaults to false
}
repeated Entry entries = 1; // a list of wantlist entries
bool full = 2; // whether this is the full wantlist. default to false
}
message Block {
bytes prefix = 1; // CID prefix (cid version, multicodec and multihash prefix (type + length)
bytes data = 2;
}
enum BlockPresenceType {
presenceHave = 0;
presenceDontHave = 1;
}
message BlockPresence {
bytes cid = 1;
BlockPresenceType type = 2;
}
Wantlist wantlist = 1;
repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0
repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0
repeated BlockPresence blockPresences = 4;
int32 pendingBytes = 5;
}

View File

@ -1,41 +0,0 @@
import std/options
import std/tables
import std/hashes
import pkg/chronos
import pkg/libp2p
import ./ipfsobject
import ./repo/waitinglist
export options
export ipfsobject
type
Repo* = ref object
storage: Table[Cid, IpfsObject]
waiting: WaitingList[Cid]
proc hash(id: Cid): Hash =
hash($id)
proc store*(repo: Repo, obj: IpfsObject) =
let id = obj.cid
repo.storage[id] = obj
repo.waiting.deliver(id)
proc contains*(repo: Repo, id: Cid): bool =
repo.storage.hasKey(id)
proc retrieve*(repo: Repo, id: Cid): Option[IpfsObject] =
if repo.contains(id):
repo.storage[id].some
else:
IpfsObject.none
proc wait*(repo: Repo, id: Cid, timeout: Duration): Future[void] =
var future: Future[void]
if repo.contains(id):
future = newFuture[void]()
future.complete()
else:
future = repo.waiting.wait(id, timeout)
future

View File

@ -1,20 +0,0 @@
import std/tables
import pkg/chronos
type WaitingList*[T] = object
futures: Table[T, seq[Future[void]]]
proc wait*[T](list: var WaitingList, item: T, timeout: Duration): Future[void] =
let future = newFuture[void]("waitinglist.wait")
proc onTimeout(_: pointer) =
if not future.finished:
future.complete()
discard setTimer(Moment.fromNow(timeout), onTimeout, nil)
list.futures.mgetOrPut(item, @[]).add(future)
future
proc deliver*[T](list: var WaitingList, item: T) =
if list.futures.hasKey(item):
for future in list.futures[item]:
future.complete()
list.futures.del(item)

View File

@ -0,0 +1,201 @@
import std/sets
import std/sequtils
import std/algorithm
import pkg/asynctest
import pkg/chronos
import pkg/stew/byteutils
import pkg/libp2p
import pkg/libp2p/errors
import pkg/dagger/p2p/rng
import pkg/dagger/bitswap
import pkg/dagger/stores/memorystore
import pkg/dagger/chunker
import pkg/dagger/blocktype as bt
import pkg/dagger/utils/asyncheapqueue
import ./utils
import ../helpers
suite "Bitswap engine - 2 nodes":
let
chunker1 = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks1 = chunker1.mapIt( bt.Block.new(it) )
chunker2 = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks2 = chunker2.mapIt( bt.Block.new(it) )
var
switch1, switch2: Switch
network1, network2: BitswapNetwork
bitswap1, bitswap2: Bitswap
awaiters: seq[Future[void]]
peerId1, peerId2: PeerID
peerCtx1, peerCtx2: BitswapPeerCtx
done: Future[void]
setup:
done = newFuture[void]()
switch1 = newStandardSwitch()
switch2 = newStandardSwitch()
awaiters.add(await switch1.start())
awaiters.add(await switch2.start())
peerId1 = switch1.peerInfo.peerId
peerId2 = switch2.peerInfo.peerId
network1 = BitswapNetwork.new(switch = switch1)
bitswap1 = Bitswap.new(MemoryStore.new(blocks1), network1)
switch1.mount(network1)
network2 = BitswapNetwork.new(switch = switch2)
bitswap2 = Bitswap.new(MemoryStore.new(blocks2), network2)
switch2.mount(network2)
await allFuturesThrowing(
bitswap1.start(),
bitswap2.start(),
)
# initialize our want lists
bitswap1.engine.wantList = blocks2.mapIt( it.cid )
bitswap2.engine.wantList = blocks1.mapIt( it.cid )
await switch1.connect(
switch2.peerInfo.peerId,
switch2.peerInfo.addrs)
await sleepAsync(1.seconds) # give some time to exchange lists
peerCtx2 = bitswap1.engine.getPeerCtx(peerId2)
peerCtx1 = bitswap2.engine.getPeerCtx(peerId1)
teardown:
await allFuturesThrowing(
bitswap1.stop(),
bitswap2.stop(),
switch1.stop(),
switch2.stop())
await allFuturesThrowing(awaiters)
test "should exchange want lists on connect":
check not isNil(peerCtx1)
check not isNil(peerCtx2)
check:
peerCtx1.peerHave.mapIt( $it ).sorted(cmp[string]) ==
bitswap2.engine.wantList.mapIt( $it ).sorted(cmp[string])
peerCtx2.peerHave.mapIt( $it ).sorted(cmp[string]) ==
bitswap1.engine.wantList.mapIt( $it ).sorted(cmp[string])
test "should send want-have for block":
let blk = bt.Block.new("Block 1".toBytes)
bitswap2.engine.localStore.putBlocks(@[blk])
let entry = Entry(
`block`: blk.cid.data.buffer,
priority: 1,
cancel: false,
wantType: WantType.wantBlock,
sendDontHave: false)
peerCtx1.peerWants.add(entry)
check bitswap2.taskQueue.pushOrUpdateNoWait(peerCtx1).isOk
await sleepAsync(100.millis)
check bitswap1.engine.localStore.hasBlock(blk.cid)
test "should get blocks from remote":
let blocks = await bitswap1.getBlocks(blocks2.mapIt( it.cid ))
check blocks == blocks2
test "remote should send blocks when available":
let blk = bt.Block.new("Block 1".toBytes)
# should fail retrieving block from remote
check not await bitswap1.getBlocks(@[blk.cid])
.withTimeout(100.millis) # should expire
proc onBlocks(evt: BlockStoreChangeEvt) =
check evt.cids == @[blk.cid]
done.complete()
bitswap1.engine.localStore.addChangeHandler(onBlocks, ChangeType.Added)
# first put the required block in the local store
bitswap2.engine.localStore.putBlocks(@[blk])
# second trigger bitswap to resolve any pending requests
# for the block
bitswap2.putBlocks(@[blk])
await done
suite "Bitswap - multiple nodes":
let
chunker = newRandomChunker(Rng.instance(), size = 4096, chunkSize = 256)
blocks = chunker.mapIt( bt.Block.new(it) )
var
switch: seq[Switch]
bitswap: seq[Bitswap]
awaiters: seq[Future[void]]
setup:
for e in generateNodes(5):
switch.add(e.switch)
bitswap.add(e.bitswap)
await e.bitswap.start()
awaiters = switch.mapIt(
(await it.start())
).concat()
teardown:
await allFuturesThrowing(
switch.mapIt( it.stop() )
)
await allFuturesThrowing(awaiters)
test "should receive haves for own want list":
let
downloader = bitswap[4]
engine = downloader.engine
# Add blocks from 1st peer to want list
engine.wantList &= blocks[0..3].mapIt( it.cid )
engine.wantList &= blocks[12..15].mapIt( it.cid )
bitswap[0].engine.localStore.putBlocks(blocks[0..3])
bitswap[1].engine.localStore.putBlocks(blocks[4..7])
bitswap[2].engine.localStore.putBlocks(blocks[8..11])
bitswap[3].engine.localStore.putBlocks(blocks[12..15])
await connectNodes(switch)
await sleepAsync(1.seconds)
check engine.peers[0].peerHave == blocks[0..3].mapIt( it.cid )
check engine.peers[3].peerHave == blocks[12..15].mapIt( it.cid )
test "should exchange blocks with multiple nodes":
let
downloader = bitswap[4]
engine = downloader.engine
# Add blocks from 1st peer to want list
engine.wantList &= blocks[0..3].mapIt( it.cid )
engine.wantList &= blocks[12..15].mapIt( it.cid )
bitswap[0].engine.localStore.putBlocks(blocks[0..3])
bitswap[1].engine.localStore.putBlocks(blocks[4..7])
bitswap[2].engine.localStore.putBlocks(blocks[8..11])
bitswap[3].engine.localStore.putBlocks(blocks[12..15])
await connectNodes(switch)
let wantListBlocks = await downloader.getBlocks(blocks[0..3].mapIt( it.cid ))
check wantListBlocks == blocks[0..3]

View File

@ -0,0 +1,346 @@
import std/sequtils
import pkg/stew/byteutils
import pkg/asynctest
import pkg/chronos
import pkg/libp2p
import pkg/libp2p/errors
import pkg/dagger/p2p/rng
import pkg/dagger/bitswap
import pkg/dagger/bitswap/pendingblocks
import pkg/dagger/stores/memorystore
import pkg/dagger/chunker
import pkg/dagger/blocktype as bt
import ../helpers
suite "Bitswap engine basic":
let
rng = Rng.instance()
seckey = PrivateKey.random(rng[]).tryGet()
peerId = PeerID.init(seckey.getKey().tryGet()).tryGet()
chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks = chunker.mapIt( bt.Block.new(it) )
var
done: Future[void]
setup:
done = newFuture[void]()
test "should send want list to new peers":
proc sendWantList(
id: PeerID,
cids: seq[Cid],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.wantHave,
full: bool = false,
sendDontHave: bool = false) {.gcsafe.} =
check cids == blocks.mapIt( it.cid )
done.complete()
let request = BitswapRequest(
sendWantList: sendWantList,
)
let engine = BitswapEngine.new(MemoryStore.new(blocks), request)
engine.wantList = blocks.mapIt( it.cid )
engine.setupPeer(peerId)
await done
suite "Bitswap engine handlers":
let
rng = Rng.instance()
seckey = PrivateKey.random(rng[]).tryGet()
peerId = PeerID.init(seckey.getKey().tryGet()).tryGet()
chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks = chunker.mapIt( bt.Block.new(it) )
var
engine: BitswapEngine
peerCtx: BitswapPeerCtx
done: Future[void]
setup:
done = newFuture[void]()
engine = BitswapEngine.new(MemoryStore.new())
peerCtx = BitswapPeerCtx(
id: peerId
)
engine.peers.add(peerCtx)
test "should handle want list":
let wantList = makeWantList(blocks.mapIt( it.cid ))
proc taskScheduler(ctx: BitswapPeerCtx): bool =
check ctx.id == peerId
check ctx.peerWants.mapIt( it.cid ) == blocks.mapIt( it.cid )
done.complete()
engine.scheduleTask = taskScheduler
engine.wantListHandler(peerId, wantList)
await done
test "should handle want list - `dont-have`":
let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true)
proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) =
check presence.mapIt( it.cid ) == wantList.entries.mapIt( it.`block` )
for p in presence:
check:
p.`type` == BlockPresenceType.presenceDontHave
done.complete()
engine.request = BitswapRequest(
sendPresence: sendPresence
)
engine.wantListHandler(peerId, wantList)
await done
test "should handle want list - `dont-have` some blocks":
let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true)
proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) =
check presence.mapIt( it.cid ) == blocks[2..blocks.high].mapIt( it.cid.data.buffer )
for p in presence:
check:
p.`type` == BlockPresenceType.presenceDontHave
done.complete()
engine.request = BitswapRequest(sendPresence: sendPresence)
engine.localStore.putBlocks(@[blocks[0], blocks[1]])
engine.wantListHandler(peerId, wantList)
await done
test "should handle blocks":
let pending = blocks.mapIt(
engine.pendingBlocks.addOrAwait( it.cid )
)
engine.blocksHandler(peerId, blocks)
let resolved = await allFinished(pending)
check resolved.mapIt( it.read ) == blocks
for b in blocks:
check engine.localStore.hasBlock(b.cid)
test "should handle block presence":
engine.blockPresenceHandler(
peerId,
blocks.mapIt(
BlockPresence(
cid: it.cid.data.buffer,
`type`: BlockPresenceType.presenceHave
)))
check peerCtx.peerHave == blocks.mapIt( it.cid )
suite "Bitswap engine blocks":
let
rng = Rng.instance()
chunker = newRandomChunker(Rng.instance(), size = 2048, chunkSize = 256)
blocks = chunker.mapIt( bt.Block.new(it) )
var
engine: BitswapEngine
peersCtx: seq[BitswapPeerCtx]
peers: seq[PeerID]
done: Future[void]
setup:
done = newFuture[void]()
engine = BitswapEngine.new(MemoryStore.new())
peersCtx = @[]
for i in 0..3:
let seckey = PrivateKey.random(rng[]).tryGet()
peers.add(PeerID.init(seckey.getKey().tryGet()).tryGet())
peersCtx.add(BitswapPeerCtx(
id: peers[i]
))
# set debt ratios
# ratio > 1
peersCtx[0].bytesSent = 1000
peersCtx[0].bytesRecv = 100
# ratio < 1
peersCtx[1].bytesSent = 100
peersCtx[1].bytesRecv = 1000
# ratio > 1
peersCtx[2].bytesSent = 100
peersCtx[2].bytesRecv = 99
# ratio == 0
peersCtx[3].bytesSent = 100
peersCtx[3].bytesRecv = 100
engine.peers = peersCtx
test "should select peer with least debt ratio":
proc sendWantList(
id: PeerID,
cids: seq[Cid],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.wantHave,
full: bool = false,
sendDontHave: bool = false) {.gcsafe.} =
check cids == blocks.mapIt( it.cid )
if peersCtx[1].id == id: # second peer has the least debt ratio
check wantType == WantType.wantBlock
engine.resolveBlocks(blocks)
else:
check wantType == WantType.wantHave
engine.request.sendWantList = sendWantList
let pending = engine.requestBlocks(blocks.mapIt( it.cid ))
let resolved = await allFinished(pending)
check resolved.mapIt( it.read ) == blocks
test "should select peer with least debt ratio and have CIDs":
proc sendWantList(
id: PeerID,
cids: seq[Cid],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.wantHave,
full: bool = false,
sendDontHave: bool = false) {.gcsafe.} =
check cids == blocks.mapIt( it.cid )
if peersCtx[3].id == id: # 4th peer has the least debt ratio and has cids
check wantType == WantType.wantBlock
engine.resolveBlocks(blocks)
else:
check wantType == WantType.wantHave
engine.request.sendWantList = sendWantList
peersCtx[3].peerHave = blocks.mapIt( it.cid )
let pending = engine.requestBlocks(blocks.mapIt( it.cid ))
let resolved = await allFinished(pending)
check resolved.mapIt( it.read ) == blocks
suite "Task Handler":
let
rng = Rng.instance()
chunker = newRandomChunker(Rng.instance(), size = 2048, chunkSize = 256)
blocks = chunker.mapIt( bt.Block.new(it) )
var
engine: BitswapEngine
peersCtx: seq[BitswapPeerCtx]
peers: seq[PeerID]
done: Future[void]
setup:
done = newFuture[void]()
engine = BitswapEngine.new(MemoryStore.new())
peersCtx = @[]
for i in 0..3:
let seckey = PrivateKey.random(rng[]).tryGet()
peers.add(PeerID.init(seckey.getKey().tryGet()).tryGet())
peersCtx.add(BitswapPeerCtx(
id: peers[i]
))
engine.peers = peersCtx
test "Should send want-blocks in priority order":
proc sendBlocks(
id: PeerID,
blks: seq[bt.Block]) {.gcsafe.} =
check blks.len == 2
check:
blks[1].cid == blocks[0].cid
blks[0].cid == blocks[1].cid
engine.localStore.putBlocks(blocks)
engine.request.sendBlocks = sendBlocks
# second block to send by priority
peersCtx[0].peerWants.add(
Entry(
`block`: blocks[0].cid.data.buffer,
priority: 49,
cancel: false,
wantType: WantType.wantBlock,
sendDontHave: false)
)
# first block to send by priority
peersCtx[0].peerWants.add(
Entry(
`block`: blocks[1].cid.data.buffer,
priority: 50,
cancel: false,
wantType: WantType.wantBlock,
sendDontHave: false)
)
await engine.taskHandler(peersCtx[0])
test "Should send presence":
proc sendPresence(
id: PeerID,
presence: seq[BlockPresence]) {.gcsafe.} =
check presence.len == 3
check:
presence[0].cid == blocks[0].cid.data.buffer
presence[0].`type` == BlockPresenceType.presenceHave
presence[1].cid == blocks[1].cid.data.buffer
presence[1].`type` == BlockPresenceType.presenceHave
presence[2].`type` == BlockPresenceType.presenceDontHave
engine.localStore.putBlocks(blocks)
engine.request.sendPresence = sendPresence
# have block
peersCtx[0].peerWants.add(
Entry(
`block`: blocks[0].cid.data.buffer,
priority: 1,
cancel: false,
wantType: WantType.wantHave,
sendDontHave: false)
)
# have block
peersCtx[0].peerWants.add(
Entry(
`block`: blocks[1].cid.data.buffer,
priority: 1,
cancel: false,
wantType: WantType.wantHave,
sendDontHave: false)
)
# don't have block
peersCtx[0].peerWants.add(
Entry(
`block`: bt.Block.new("Block 1".toBytes).cid.data.buffer,
priority: 1,
cancel: false,
wantType: WantType.wantHave,
sendDontHave: false)
)
await engine.taskHandler(peersCtx[0])

View File

@ -0,0 +1,196 @@
import std/sequtils
import std/tables
import pkg/asynctest
import pkg/chronos
import pkg/libp2p
import pkg/libp2p/standard_setup
import pkg/libp2p/errors
import pkg/protobuf_serialization
import pkg/dagger/stores/memorystore
import pkg/dagger/bitswap/network
import pkg/dagger/p2p/rng
import pkg/dagger/chunker
import pkg/dagger/blocktype as bt
import ../helpers
suite "Bitswap network":
let
rng = Rng.instance()
seckey = PrivateKey.random(rng[]).tryGet()
peerId = PeerID.init(seckey.getKey().tryGet()).tryGet()
chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks = chunker.mapIt( bt.Block.new(it) )
var
network: BitswapNetwork
networkPeer: NetworkPeer
buffer: BufferStream
done: Future[void]
proc getConn(): Future[Connection] {.async.} =
return Connection(buffer)
setup:
done = newFuture[void]()
buffer = newBufferStream()
network = BitswapNetwork.new(
switch = newStandardSwitch(),
connProvider = getConn)
network.setupPeer(peerId)
networkPeer = network.peers[peerId]
discard await networkPeer.connect()
test "Want List handler":
proc wantListHandler(peer: PeerID, wantList: WantList) {.gcsafe.} =
# check that we got the correct amount of entries
check wantList.entries.len == 4
for b in blocks:
check b.cid in wantList.entries
let entry = wantList.entries[wantList.entries.find(b.cid)]
check entry.wantType == WantType.wantHave
check entry.priority == 1
check entry.cancel == true
check entry.sendDontHave == true
done.complete()
network.handlers.onWantList = wantListHandler
let wantList = makeWantList(
blocks.mapIt( it.cid ),
1, true, WantType.wantHave,
true, true)
let msg = Message(wantlist: wantList)
await buffer.pushData(lenPrefix(Protobuf.encode(msg)))
await done.wait(500.millis)
test "Blocks Handler":
proc blocksHandler(peer: PeerID, blks: seq[bt.Block]) {.gcsafe.} =
check blks == blocks
done.complete()
network.handlers.onBlocks = blocksHandler
let msg = Message(payload: makeBlocks(blocks))
await buffer.pushData(lenPrefix(Protobuf.encode(msg)))
await done.wait(500.millis)
test "Precense Handler":
proc presenceHandler(peer: PeerID, precense: seq[BlockPresence]) {.gcsafe.} =
for b in blocks:
check:
b.cid in precense
done.complete()
network.handlers.onPresence = presenceHandler
let msg = Message(
blockPresences: blocks.mapIt(
BlockPresence(
cid: it.cid.data.buffer,
type: BlockPresenceType.presenceHave
)))
await buffer.pushData(lenPrefix(Protobuf.encode(msg)))
await done.wait(500.millis)
suite "Bitswap Network - e2e":
let
chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks = chunker.mapIt( bt.Block.new(it) )
var
switch1, switch2: Switch
network1, network2: BitswapNetwork
awaiters: seq[Future[void]]
done: Future[void]
setup:
done = newFuture[void]()
switch1 = newStandardSwitch()
switch2 = newStandardSwitch()
awaiters.add(await switch1.start())
awaiters.add(await switch2.start())
network1 = BitswapNetwork.new(
switch = switch1)
switch1.mount(network1)
network2 = BitswapNetwork.new(
switch = switch2)
switch2.mount(network2)
await switch1.connect(
switch2.peerInfo.peerId,
switch2.peerInfo.addrs)
teardown:
await allFuturesThrowing(
switch1.stop(),
switch2.stop())
await allFuturesThrowing(awaiters)
test "broadcast want list":
proc wantListHandler(peer: PeerID, wantList: WantList) {.gcsafe.} =
# check that we got the correct amount of entries
check wantList.entries.len == 4
for b in blocks:
check b.cid in wantList.entries
let entry = wantList.entries[wantList.entries.find(b.cid)]
check entry.wantType == WantType.wantHave
check entry.priority == 1
check entry.cancel == true
check entry.sendDontHave == true
done.complete()
network2.handlers.onWantList = wantListHandler
network1.broadcastWantList(
switch2.peerInfo.peerId,
blocks.mapIt( it.cid ),
1, true, WantType.wantHave,
true, true)
await done.wait(500.millis)
test "broadcast blocks":
proc blocksHandler(peer: PeerID, blks: seq[bt.Block]) {.gcsafe.} =
check blks == blocks
done.complete()
network2.handlers.onBlocks = blocksHandler
network1.broadcastBlocks(
switch2.peerInfo.peerId,
blocks)
await done.wait(500.millis)
test "broadcast precense":
proc presenceHandler(peer: PeerID, precense: seq[BlockPresence]) {.gcsafe.} =
for b in blocks:
check:
b.cid in precense
done.complete()
network2.handlers.onPresence = presenceHandler
network1.broadcastBlockPresence(
switch2.peerInfo.peerId,
blocks.mapIt(
BlockPresence(
cid: it.cid.data.buffer,
type: BlockPresenceType.presenceHave
)))
await done.wait(500.millis)

View File

@ -0,0 +1,34 @@
import std/sequtils
import pkg/chronos
import pkg/libp2p
import pkg/dagger/bitswap
import pkg/dagger/stores/memorystore
import pkg/dagger/blocktype as bt
proc generateNodes*(
num: Natural,
blocks: openArray[bt.Block] = [],
secureManagers: openarray[SecureProtocol] = [
SecureProtocol.Noise,
]): seq[tuple[switch: Switch, bitswap: Bitswap]] =
for i in 0..<num:
let
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
network = BitswapNetwork.new(switch = switch)
bitswap = Bitswap.new(MemoryStore.new(blocks), network)
switch.mount(network)
# initialize our want lists
bitswap.engine.wantList = blocks.mapIt( it.cid )
switch.mount(network)
result.add((switch, bitswap))
proc connectNodes*(nodes: seq[Switch]) {.async.} =
for dialer in nodes:
for node in nodes:
if dialer.peerInfo.peerId != node.peerInfo.peerId:
await dialer.connect(node.peerInfo.peerId, node.peerInfo.addrs)

17
tests/dagger/helpers.nim Normal file
View File

@ -0,0 +1,17 @@
import pkg/libp2p/varint
import pkg/dagger/chunker
import pkg/dagger/blocktype
export chunker
proc lenPrefix*(msg: openArray[byte]): seq[byte] =
## Write `msg` with a varint-encoded length prefix
##
let vbytes = PB.toBytes(msg.len().uint64)
var buf = newSeqUninitialized[byte](msg.len() + vbytes.len)
buf[0..<vbytes.len] = vbytes.toOpenArray()
buf[vbytes.len..<buf.len] = msg
return buf

View File

@ -0,0 +1,221 @@
import pkg/chronos
import pkg/asynctest
import pkg/stew/results
import pkg/dagger/utils/asyncheapqueue
import pkg/dagger/p2p/rng
type
Task* = tuple[name: string, priority: int]
proc `<`*(a, b: Task): bool =
a.priority < b.priority
proc `==`*(a, b: Task): bool =
a.name == b.name
proc toSortedSeq[T](h: AsyncHeapQueue[T], queueType = QueueType.Min): seq[T] =
var tmp = newAsyncHeapQueue[T](queueType = queueType)
for d in h:
check tmp.pushNoWait(d).isOk
while tmp.len > 0:
result.add(popNoWait(tmp).get())
suite "synchronous tests":
test "test pushNoWait - Min":
var heap = newAsyncHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
check heap[0] == 0
check heap.toSortedSeq == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
test "test pushNoWait - Max":
var heap = newAsyncHeapQueue[int](queueType = QueueType.Max)
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
check heap[0] == 9
check heap.toSortedSeq(QueueType.Max) == @[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
test "test popNoWait":
var heap = newAsyncHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
var res: seq[int]
while heap.len > 0:
let r = heap.popNoWait()
if r.isOk:
res.add(r.get)
check res == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
test "test popNoWait - Max":
var heap = newAsyncHeapQueue[int](queueType = QueueType.Max)
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
var res: seq[int]
while heap.len > 0:
let r = heap.popNoWait()
if r.isOk:
res.add(r.get)
check res == @[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
test "test del": # Test del
var heap = newAsyncHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
heap.del(0)
doAssert(heap[0] == 1)
heap.del(heap.find(7))
check heap.toSortedSeq == @[1, 2, 3, 4, 5, 6, 8, 9]
heap.del(heap.find(5))
check heap.toSortedSeq == @[1, 2, 3, 4, 6, 8, 9]
heap.del(heap.find(6))
check heap.toSortedSeq == @[1, 2, 3, 4, 8, 9]
heap.del(heap.find(2))
check heap.toSortedSeq == @[1, 3, 4, 8, 9]
test "del last": # Test del last
var heap = newAsyncHeapQueue[int]()
let data = [1, 2, 3]
for item in data:
check heap.pushNoWait(item).isOk
heap.del(2)
check heap.toSortedSeq == @[1, 2]
heap.del(1)
check heap.toSortedSeq == @[1]
heap.del(0)
check heap.toSortedSeq == newSeq[int]() # empty seq has no type
test "should throw popping from an empty queue":
var heap = newAsyncHeapQueue[int]()
let err = heap.popNoWait()
check err.isErr
check err.error == AsyncHQErrors.Empty
test "should throw pushing to an full queue":
var heap = newAsyncHeapQueue[int](1)
check heap.pushNoWait(1).isOk
let err = heap.pushNoWait(2)
check err.isErr
check err.error == AsyncHQErrors.Full
test "test clear":
var heap = newAsyncHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
check heap.len == 10
heap.clear()
check heap.len == 0
suite "asynchronous tests":
test "test push":
var heap = newAsyncHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
await push(heap, item)
check heap[0] == 0
check heap.toSortedSeq == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
test "test push and pop with maxSize":
var heap = newAsyncHeapQueue[int](5)
let data = [1, 9, 5, 3, 7, 4, 2]
proc pushTask() {.async.} =
for item in data:
await push(heap, item)
asyncCheck pushTask()
check heap.len == 5
check heap[0] == 1 # because we haven't pushed 0 yet
check (await heap.pop) == 1
check (await heap.pop) == 3
check (await heap.pop) == 5
check (await heap.pop) == 7
check (await heap.pop) == 9
await sleepAsync(1.milliseconds) # allow poll to run once more
check (await heap.pop) == 2
check (await heap.pop) == 4
test "test update":
var heap = newAsyncHeapQueue[Task](5)
for item in [("a", 4), ("b", 3), ("c", 2)]:
check heap.pushNoWait(item).isOk
check heap[0] == (name: "c", priority: 2)
check heap.update((name: "a", priority: 1))
check heap[0] == (name: "a", priority: 1)
test "test pushOrUpdate - update":
var heap = newAsyncHeapQueue[Task](3)
for item in [("a", 4), ("b", 3), ("c", 2)]:
check heap.pushNoWait(item).isOk
check heap[0] == (name: "c", priority: 2)
await heap.pushOrUpdate((name: "a", priority: 1))
check heap[0] == (name: "a", priority: 1)
test "test pushOrUpdate - push":
var heap = newAsyncHeapQueue[Task](2)
for item in [("a", 4), ("b", 3)]:
check heap.pushNoWait(item).isOk
check heap[0] == ("b", 3) # sanity check for order
let fut = heap.pushOrUpdate(("c", 2)) # attempt to push a non existen item but block
check heap.popNoWait().get() == ("b", 3) # pop one off
await fut # wait for push to complete
check heap[0] == (name: "c", priority: 2) # check order again
test "test pop":
var heap = newAsyncHeapQueue[int]()
let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0]
for item in data:
check heap.pushNoWait(item).isOk
var res: seq[int]
while heap.len > 0:
res.add((await heap.pop()))
check res == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
test "test delete":
var heap = newAsyncHeapQueue[Task]()
let data = ["d", "b", "c", "a", "h", "e", "f", "g"]
for item in data:
check heap.pushNoWait((
name: item,
priority: Rng.instance().rand(data.len)
)).isOk
let del = heap[3]
heap.delete(del)
check heap.find(del) < 0

View File

@ -0,0 +1,82 @@
import std/sequtils
import pkg/chronos
import pkg/asynctest
import pkg/libp2p
import pkg/stew/byteutils
import pkg/dagger/p2p/rng
import pkg/dagger/stores/memorystore
import pkg/dagger/chunker
import ./helpers
suite "Memory Store":
var store: MemoryStore
var chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
var blocks = chunker.mapIt( Block.new(it) )
setup:
store = MemoryStore.new(blocks)
test "getBlocks single":
let blk = await store.getBlocks(@[blocks[0].cid])
check blk[0] == blocks[0]
test "getBlocks multiple":
let blk = await store.getBlocks(blocks[0..2].mapIt( it.cid ))
check blk == blocks[0..2]
test "hasBlock":
check store.hasBlock(blocks[0].cid)
test "delBlocks single":
let blks = blocks[1..3].mapIt( it.cid )
store.delBlocks(blks)
check not store.hasBlock(blks[0])
check not store.hasBlock(blks[1])
check not store.hasBlock(blks[2])
test "add blocks change handler":
let blocks = @[
Block.new("Block 1".toBytes),
Block.new("Block 2".toBytes),
Block.new("Block 3".toBytes),
]
var triggered = false
store.addChangeHandler(
proc(evt: BlockStoreChangeEvt) =
check evt.kind == ChangeType.Added
check evt.cids == blocks.mapIt( it.cid )
triggered = true
, ChangeType.Added
)
store.putBlocks(blocks)
check triggered
test "add blocks change handler":
let blocks = @[
Block.new("Block 1".toBytes),
Block.new("Block 2".toBytes),
Block.new("Block 3".toBytes),
]
var triggered = false
store.addChangeHandler(
proc(evt: BlockStoreChangeEvt) =
check evt.kind == ChangeType.Removed
check evt.cids == blocks.mapIt( it.cid )
triggered = true
, ChangeType.Removed
)
store.putBlocks(blocks)
check store.hasBlock(blocks[0].cid)
check store.hasBlock(blocks[1].cid)
check store.hasBlock(blocks[2].cid)
store.delBlocks(blocks.mapIt( it.cid ))
check triggered

View File

@ -0,0 +1,39 @@
import std/unittest
import pkg/stew/byteutils
import pkg/dagger/chunker
suite "Chunking":
test "should return proper size chunks":
proc reader(data: var openArray[byte], offset: Natural = 0): int {.gcsafe, closure.} =
let contents = "1234567890".toBytes
copyMem(addr data[0], unsafeAddr contents[offset], data.len)
return data.len
let chunker = Chunker.new(
reader = reader,
size = 10,
chunkSize = 2)
check chunker.getBytes() == "12".toBytes
check chunker.getBytes() == "34".toBytes
check chunker.getBytes() == "56".toBytes
check chunker.getBytes() == "78".toBytes
check chunker.getBytes() == "90".toBytes
check chunker.getBytes() == "".toBytes
test "should chunk file":
let (fileName, _, _) = instantiationInfo() # get this file's name
let path = "tests/dagger/" & filename
let file = open(path)
let fileChunker = newFileChunker(file = file)
var data: seq[byte]
while true:
let buff = fileChunker.getBytes()
if buff.len <= 0:
break
check buff.len <= fileChunker.chunkSize
data.add(buff)
check string.fromBytes(data) == readFile(path)

View File

@ -1,13 +0,0 @@
import std/sequtils
import std/random
import pkg/libp2p
import pkg/ipfs/ipfsobject
proc example*(t: type seq[byte]): seq[byte] =
newSeqWith(10, rand(byte))
proc example*(t: type IpfsObject): IpfsObject =
IpfsObject(data: seq[byte].example)
proc example*(t: type Cid): Cid =
IpfsObject.example.cid

View File

@ -1,41 +0,0 @@
import pkg/chronos
import pkg/asynctest
import pkg/ipfs/ipfsobject
import pkg/ipfs/p2p/switch
import pkg/ipfs/bitswap
suite "bitswap":
let address = MultiAddress.init("/ip4/127.0.0.1/tcp/40981").get()
let obj = IpfsObject(data: @[1'u8, 2'u8, 3'u8])
var bitswap1, bitswap2: Bitswap
var peer1, peer2: Switch
setup:
peer1 = Switch.create()
peer2 = Switch.create()
peer1.peerInfo.addrs.add(address)
discard await peer1.start()
discard await peer2.start()
bitswap1 = Bitswap.start(peer1)
bitswap2 = Bitswap.start(peer2)
teardown:
await peer1.stop()
await peer2.stop()
test "stores ipfs objects":
bitswap1.store(obj)
test "retrieves local objects":
bitswap1.store(obj)
check (await bitswap1.retrieve(obj.cid)).get() == obj
test "signals retrieval failure":
check (await bitswap1.retrieve(obj.cid, 100.milliseconds)).isNone
test "retrieves objects from network":
bitswap1.store(obj)
await bitswap2.connect(peer1.peerInfo)
check (await bitswap2.retrieve(obj.cid)).get() == obj

View File

@ -1,23 +0,0 @@
import std/unittest
import pkg/libp2p
import pkg/ipfs/protobuf/bitswap
import pkg/ipfs/bitswap/messages
import ../helpers/examples
suite "bitswap messages":
test "creates message with want list":
let cid1, cid2 = Cid.example
let message = Message.want(cid1, cid2)
check message == Message(wantlist: WantList(entries: @[
Entry(`block`: cid1.data.buffer),
Entry(`block`: cid2.data.buffer)
]))
test "creates message that sends blocks":
let block1, block2 = seq[byte].example
let message = Message.send(block1, block2)
check message == Message(payload: @[
Block(data: block1),
Block(data: block2)
])

View File

@ -1,57 +0,0 @@
import pkg/chronos
import pkg/asynctest
import pkg/ipfs/p2p/switch
import pkg/ipfs/bitswap/messages
import pkg/ipfs/bitswap/protocol
suite "bitswap protocol":
let address = MultiAddress.init("/ip4/127.0.0.1/tcp/45344").get()
let message = Message.send(@[1'u8, 2'u8, 3'u8])
var peer1, peer2: Switch
var bitswap: BitswapProtocol
setup:
peer1 = Switch.create()
peer2 = Switch.create()
bitswap = BitswapProtocol.new()
peer1.peerInfo.addrs.add(address)
peer1.mount(bitswap)
discard await peer1.start()
discard await peer2.start()
teardown:
await peer1.stop()
await peer2.stop()
test "opens a stream to another peer":
let stream = await peer2.dial(peer1.peerInfo, BitswapProtocol)
await stream.close()
test "accepts a stream from another peer":
let outgoing = await peer2.dial(peer1.peerInfo, BitswapProtocol)
let incoming = await bitswap.accept()
await outgoing.close()
await incoming.close()
test "writes messages to a stream":
let stream = await peer2.dial(peer1.peerInfo, BitswapProtocol)
await stream.write(message)
await stream.close()
test "reads messages from incoming stream":
let outgoing = await peer2.dial(peer1.peerInfo, BitswapProtocol)
let incoming = await bitswap.accept()
await outgoing.write(message)
check (await incoming.read()) == message
await outgoing.close()
await incoming.close()
test "reads messages from outgoing stream":
let outgoing = await peer2.dial(peer1.peerInfo, BitswapProtocol)
let incoming = await bitswap.accept()
await incoming.write(message)
check (await outgoing.read()) == message
await outgoing.close()
await incoming.close()

View File

@ -1,31 +0,0 @@
import std/unittest
import std/os
import pkg/ipfs/chunking
suite "chunking":
var input, output: File
setup:
input = open("tests/input.txt", fmReadWrite)
output = open("tests/output.txt", fmReadWrite)
input.write("foo")
input.setFilePos(0)
teardown:
input.close()
output.close()
removeFile("tests/input.txt")
removeFile("tests/output.txt")
test "creates an IPFS object from a file":
check createObject(input) != IpfsObject.default
test "writes an IPFS object to a file":
let obj = createObject(input)
writeToFile(obj, output)
input.setFilePos(0)
output.setFilePos(0)
check output.readAll() == input.readAll()

View File

@ -1,16 +0,0 @@
import std/unittest
import pkg/ipfs/ipfsobject
import pkg/ipfs/dht/routing
suite "DHT routing table":
test "finds peers closest to some content":
let peer1 = PeerInfo(peerId: PeerId(data: @[1'u8]))
let peer2 = PeerInfo(peerId: PeerId(data: @[2'u8]))
let contentId = IpfsObject(data: @[]).cid
var table = RoutingTable()
table.add(peer1)
table.add(peer2)
check table.closest(contentId) == @[peer1, peer2]

View File

@ -1,48 +0,0 @@
import std/os
import pkg/asynctest
import pkg/chronos
import pkg/ipfs
suite "integration":
let address = MultiAddress.init("/ip4/127.0.0.1/tcp/48952").get()
var peer1, peer2: Ipfs
var input, output: File
proc setupPeers {.async.} =
peer1 = await Ipfs.start(address)
peer2 = await Ipfs.start()
await peer2.connect(peer1.info)
proc setupFiles =
input = open("tests/input.txt", fmReadWrite)
output = open("tests/output.txt", fmReadWrite)
input.write("foo")
input.setFilePos(0)
proc teardownPeers {.async.} =
await peer1.stop()
await peer2.stop()
proc teardownFiles =
input.close()
output.close()
removeFile("tests/input.txt")
removeFile("tests/output.txt")
setup:
await setupPeers()
setupFiles()
teardown:
await teardownPeers()
teardownFiles()
test "file can be transferred from one peer to another":
let identifier = await peer1.add(input)
await peer2.get(identifier, output)
input.setFilePos(0)
output.setFilePos(0)
check output.readAll() == input.readAll()

View File

@ -1,12 +0,0 @@
import std/unittest
import pkg/libp2p
import pkg/ipfs/ipfsobject
suite "IPFS Object":
test "has a content id":
let dag1 = IpfsObject(data: @[1'u8, 2'u8, 3'u8])
let dag2 = IpfsObject(data: @[4'u8, 5'u8, 6'u8])
let dag3 = IpfsObject(data: @[4'u8, 5'u8, 6'u8])
check dag1.cid != dag2.cid
check dag2.cid == dag3.cid

View File

@ -1,25 +0,0 @@
import std/unittest
import pkg/libp2p
import pkg/protobuf_serialization
import pkg/ipfs/protobuf/bitswap
import ../helpers/examples
suite "protobuf messages":
test "serializes bitswap want lists":
let cid = Cid.example
let entry = Entry(`block`: cid.data.buffer)
let wantlist = WantList(entries: @[entry])
let message = Message(wantlist: wantlist)
let encoded = Protobuf.encode(message)
check Protobuf.decode(encoded, Message) == message
test "serializes bitswap blocks":
let bloc = Block(data: seq[byte].example)
let message = Message(payload: @[bloc])
let encoded = Protobuf.encode(message)
check Protobuf.decode(encoded, Message) == message

View File

@ -1,36 +0,0 @@
import pkg/asynctest
import pkg/chronos
import pkg/ipfs/repo
suite "repo":
let obj = IpfsObject(data: @[1'u8, 2'u8, 3'u8])
var repo: Repo
setup:
repo = Repo()
test "stores IPFS objects":
repo.store(obj)
test "retrieves IPFS objects by their content id":
repo.store(obj)
check repo.retrieve(obj.cid).get() == obj
test "signals retrieval failure":
check repo.retrieve(obj.cid).isNone
test "knows which content ids are stored":
check repo.contains(obj.cid) == false
repo.store(obj)
check repo.contains(obj.cid) == true
test "waits for IPFS object to arrive":
let waiting = repo.wait(obj.cid, 1.minutes)
check not waiting.finished
repo.store(obj)
check waiting.finished
test "does not wait when IPFS object is already stored":
repo.store(obj)
check repo.wait(obj.cid, 1.minutes).finished

View File

@ -1,31 +0,0 @@
import pkg/asynctest
import pkg/chronos
import ipfs/repo/waitinglist
suite "waiting list":
var list: WaitingList[string]
setup:
list = WaitingList[string]()
test "waits for item to be delivered":
let waiting = list.wait("apple", 1.minutes)
check not waiting.finished
list.deliver("orange")
check not waiting.finished
list.deliver("apple")
check waiting.finished
test "notifies everyone who is waiting":
let wait1 = list.wait("apple", 1.minutes)
let wait2 = list.wait("apple", 1.minutes)
list.deliver("apple")
check wait1.finished
check wait2.finished
test "stops waiting after timeout":
let wait = list.wait("apple", 100.milliseconds)
check not wait.finished
await sleepAsync(100.milliseconds)
check wait.finished

View File

@ -1 +1,3 @@
--path:".." --path:".."
--threads:on
--tlsEmulation:off

View File

@ -1,12 +1,7 @@
import ./ipfs/testObject import ./dagger/bitswap/testbitswap
import ./ipfs/testChunking import ./dagger/bitswap/testnetwork
import ./ipfs/testWaitingList import ./dagger/testasyncheapqueue
import ./ipfs/testRepo import ./dagger/testblockstore
import ./ipfs/testDhtRouting import ./dagger/testchunking
import ./ipfs/testProtobuf
import ./ipfs/testBitswapMessages
import ./ipfs/testBitswapProtocol
import ./ipfs/testBitswap
import ./ipfs/testIpfs
{.warning[UnusedImport]: off.} {.warning[UnusedImport]: off.}