rename bitwsap to blockexc and move to stores

This commit is contained in:
Dmitriy Ryajov 2021-08-27 16:49:12 -06:00
parent 163fcefb31
commit 231276d6fe
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
13 changed files with 112 additions and 112 deletions

View File

@ -14,13 +14,13 @@ import pkg/chronos
import pkg/libp2p
import pkg/libp2p/errors
import ./blockexc/protobuf/blockexc as pb
import ./stores/network/protobuf/blockexc as pb
import ./blocktype as bt
import ./stores/blockstore
import ./utils/asyncheapqueue
import ./blockexc/network
import ./blockexc/engine
import ./stores/network/network
import ./stores/network/engine
export network, blockstore, asyncheapqueue, engine
@ -100,7 +100,7 @@ method putBlocks*(b: BlockExc, blocks: seq[bt.Block]) =
procCall BlockStore(b).putBlocks(blocks)
func new*(
proc new*(
T: type BlockExc,
localStore: BlockStore,
wallet: WalletRef,

View File

@ -17,7 +17,7 @@ import pkg/libp2p/errors
import ./protobuf/blockexc
import ./protobuf/presence
import ../blocktype as bt
import ../../blocktype as bt
import ../blockstore
import ../../utils/asyncheapqueue

View File

@ -16,12 +16,12 @@ import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import ../blocktype as bt
import ./protobuf/blockexc
import ../../blocktype as bt
import ./protobuf/blockexc as pb
import ./protobuf/payments
import ./networkpeer
export pb, networkpeer
export networkpeer
export payments
logScope:
@ -242,9 +242,6 @@ proc rpcHandler(b: BlockExcNetwork, peer: NetworkPeer, msg: Message) {.async.} =
if msg.wantlist.entries.len > 0:
b.handleWantList(peer, msg.wantlist)
if msg.blocks.len > 0:
b.handleBlocks(peer, msg.blocks)
if msg.payload.len > 0:
b.handleBlocks(peer, msg.payload)
@ -325,7 +322,7 @@ method init*(b: BlockExcNetwork) =
b.handler = handle
b.codec = Codec
func new*(
proc new*(
T: type BlockExcNetwork,
switch: Switch,
connProvider: ConnProvider = nil): T =

View File

@ -13,7 +13,7 @@ import pkg/chronicles
import pkg/chronos
import pkg/libp2p
import ../blocktype
import ../../blocktype
logScope:
topics = "dagger blockexc pendingblocks"

View File

@ -1,5 +1,5 @@
import std/unittest
import pkg/dagger/bitswap/engine/payments
import pkg/dagger/stores/network/engine/payments
import ../../examples
suite "engine payments":
@ -8,11 +8,11 @@ suite "engine payments":
let amount = 42.u256
var wallet: WalletRef
var peer: BitswapPeerCtx
var peer: BlockExcPeerCtx
setup:
wallet = WalletRef.example
peer = BitswapPeerCtx.example
peer = BlockExcPeerCtx.example
peer.account = Account(address: address).some
test "pays for received blocks":

View File

@ -2,7 +2,7 @@ import pkg/asynctest
import pkg/chronos
import pkg/stew/byteutils
import ../../examples
import ../../../../dagger/bitswap/protobuf/payments
import pkg/dagger/stores/network/protobuf/payments
suite "account protobuf messages":

View File

@ -2,7 +2,7 @@ import std/sequtils
import pkg/asynctest
import pkg/chronos
import pkg/libp2p
import ../../../../dagger/bitswap/protobuf/presence
import pkg/dagger/stores/network/protobuf/presence
import ../../examples
suite "block presence protobuf messages":

View File

@ -8,8 +8,9 @@ import pkg/libp2p
import pkg/libp2p/errors
import pkg/dagger/rng
import pkg/dagger/bitswap
import pkg/dagger/bitswap/engine/payments
import pkg/dagger/blockexc
import pkg/dagger/stores/network/protobuf/blockexc as pb
import pkg/dagger/stores/network/engine/payments
import pkg/dagger/stores/memorystore
import pkg/dagger/chunker
import pkg/dagger/blocktype as bt
@ -19,7 +20,7 @@ import ./utils
import ../helpers
import ../examples
suite "Bitswap engine - 2 nodes":
suite "BlockExc engine - 2 nodes":
let
chunker1 = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
@ -31,11 +32,11 @@ suite "Bitswap engine - 2 nodes":
switch1, switch2: Switch
wallet1, wallet2: WalletRef
pricing1, pricing2: Pricing
network1, network2: BitswapNetwork
bitswap1, bitswap2: Bitswap
network1, network2: BlockExcNetwork
blockexc1, blockexc2: BlockExc
awaiters: seq[Future[void]]
peerId1, peerId2: PeerID
peerCtx1, peerCtx2: BitswapPeerCtx
peerCtx1, peerCtx2: BlockExcPeerCtx
done: Future[void]
setup:
@ -53,40 +54,40 @@ suite "Bitswap engine - 2 nodes":
peerId1 = switch1.peerInfo.peerId
peerId2 = switch2.peerInfo.peerId
network1 = BitswapNetwork.new(switch = switch1)
bitswap1 = Bitswap.new(MemoryStore.new(blocks1), wallet1, network1)
network1 = BlockExcNetwork.new(switch = switch1)
blockexc1 = BlockExc.new(MemoryStore.new(blocks1), wallet1, network1)
switch1.mount(network1)
network2 = BitswapNetwork.new(switch = switch2)
bitswap2 = Bitswap.new(MemoryStore.new(blocks2), wallet2, network2)
network2 = BlockExcNetwork.new(switch = switch2)
blockexc2 = BlockExc.new(MemoryStore.new(blocks2), wallet2, network2)
switch2.mount(network2)
await allFuturesThrowing(
bitswap1.start(),
bitswap2.start(),
blockexc1.start(),
blockexc2.start(),
)
# initialize our want lists
bitswap1.engine.wantList = blocks2.mapIt( it.cid )
bitswap2.engine.wantList = blocks1.mapIt( it.cid )
blockexc1.engine.wantList = blocks2.mapIt( it.cid )
blockexc2.engine.wantList = blocks1.mapIt( it.cid )
pricing1.address = wallet1.address
pricing2.address = wallet2.address
bitswap1.engine.pricing = pricing1.some
bitswap2.engine.pricing = pricing2.some
blockexc1.engine.pricing = pricing1.some
blockexc2.engine.pricing = pricing2.some
await switch1.connect(
switch2.peerInfo.peerId,
switch2.peerInfo.addrs)
await sleepAsync(1.seconds) # give some time to exchange lists
peerCtx2 = bitswap1.engine.getPeerCtx(peerId2)
peerCtx1 = bitswap2.engine.getPeerCtx(peerId1)
peerCtx2 = blockexc1.engine.getPeerCtx(peerId2)
peerCtx1 = blockexc2.engine.getPeerCtx(peerId1)
teardown:
await allFuturesThrowing(
bitswap1.stop(),
bitswap2.stop(),
blockexc1.stop(),
blockexc2.stop(),
switch1.stop(),
switch2.stop())
@ -98,10 +99,10 @@ suite "Bitswap engine - 2 nodes":
check:
peerCtx1.peerHave.mapIt( $it ).sorted(cmp[string]) ==
bitswap2.engine.wantList.mapIt( $it ).sorted(cmp[string])
blockexc2.engine.wantList.mapIt( $it ).sorted(cmp[string])
peerCtx2.peerHave.mapIt( $it ).sorted(cmp[string]) ==
bitswap1.engine.wantList.mapIt( $it ).sorted(cmp[string])
blockexc1.engine.wantList.mapIt( $it ).sorted(cmp[string])
test "exchanges accounts on connect":
check peerCtx1.account.?address == pricing1.address.some
@ -109,7 +110,7 @@ suite "Bitswap engine - 2 nodes":
test "should send want-have for block":
let blk = !bt.Block.new("Block 1".toBytes)
bitswap2.engine.localStore.putBlocks(@[blk])
blockexc2.engine.localStore.putBlocks(@[blk])
let entry = Entry(
`block`: blk.cid.data.buffer,
@ -119,58 +120,58 @@ suite "Bitswap engine - 2 nodes":
sendDontHave: false)
peerCtx1.peerWants.add(entry)
check bitswap2.taskQueue.pushOrUpdateNoWait(peerCtx1).isOk
check blockexc2.taskQueue.pushOrUpdateNoWait(peerCtx1).isOk
await sleepAsync(100.millis)
check bitswap1.engine.localStore.hasBlock(blk.cid)
check blockexc1.engine.localStore.hasBlock(blk.cid)
test "should get blocks from remote":
let blocks = await bitswap1.getBlocks(blocks2.mapIt( it.cid ))
let blocks = await blockexc1.getBlocks(blocks2.mapIt( it.cid ))
check blocks == blocks2
test "remote should send blocks when available":
let blk = !bt.Block.new("Block 1".toBytes)
# should fail retrieving block from remote
check not await bitswap1.getBlocks(@[blk.cid])
check not await blockexc1.getBlocks(@[blk.cid])
.withTimeout(100.millis) # should expire
proc onBlocks(evt: BlockStoreChangeEvt) =
check evt.cids == @[blk.cid]
done.complete()
bitswap1.engine.localStore.addChangeHandler(onBlocks, ChangeType.Added)
blockexc1.engine.localStore.addChangeHandler(onBlocks, ChangeType.Added)
# first put the required block in the local store
bitswap2.engine.localStore.putBlocks(@[blk])
blockexc2.engine.localStore.putBlocks(@[blk])
# second trigger bitswap to resolve any pending requests
# second trigger blockexc to resolve any pending requests
# for the block
bitswap2.putBlocks(@[blk])
blockexc2.putBlocks(@[blk])
await done
test "receives payments for blocks that were sent":
let blocks = await bitswap1.getBlocks(blocks2.mapIt(it.cid))
let blocks = await blockexc1.getBlocks(blocks2.mapIt(it.cid))
await sleepAsync(100.millis)
let channel = !peerCtx1.paymentChannel
check wallet2.balance(channel, Asset) > 0
suite "Bitswap - multiple nodes":
suite "BlockExc - multiple nodes":
let
chunker = newRandomChunker(Rng.instance(), size = 4096, chunkSize = 256)
blocks = chunker.mapIt( !bt.Block.new(it) )
var
switch: seq[Switch]
bitswap: seq[Bitswap]
blockexc: seq[BlockExc]
awaiters: seq[Future[void]]
setup:
for e in generateNodes(5):
switch.add(e.switch)
bitswap.add(e.bitswap)
await e.bitswap.start()
blockexc.add(e.blockexc)
await e.blockexc.start()
awaiters = switch.mapIt(
(await it.start())
@ -185,17 +186,17 @@ suite "Bitswap - multiple nodes":
test "should receive haves for own want list":
let
downloader = bitswap[4]
downloader = blockexc[4]
engine = downloader.engine
# Add blocks from 1st peer to want list
engine.wantList &= blocks[0..3].mapIt( it.cid )
engine.wantList &= blocks[12..15].mapIt( it.cid )
bitswap[0].engine.localStore.putBlocks(blocks[0..3])
bitswap[1].engine.localStore.putBlocks(blocks[4..7])
bitswap[2].engine.localStore.putBlocks(blocks[8..11])
bitswap[3].engine.localStore.putBlocks(blocks[12..15])
blockexc[0].engine.localStore.putBlocks(blocks[0..3])
blockexc[1].engine.localStore.putBlocks(blocks[4..7])
blockexc[2].engine.localStore.putBlocks(blocks[8..11])
blockexc[3].engine.localStore.putBlocks(blocks[12..15])
await connectNodes(switch)
@ -209,17 +210,17 @@ suite "Bitswap - multiple nodes":
test "should exchange blocks with multiple nodes":
let
downloader = bitswap[4]
downloader = blockexc[4]
engine = downloader.engine
# Add blocks from 1st peer to want list
engine.wantList &= blocks[0..3].mapIt( it.cid )
engine.wantList &= blocks[12..15].mapIt( it.cid )
bitswap[0].engine.localStore.putBlocks(blocks[0..3])
bitswap[1].engine.localStore.putBlocks(blocks[4..7])
bitswap[2].engine.localStore.putBlocks(blocks[8..11])
bitswap[3].engine.localStore.putBlocks(blocks[12..15])
blockexc[0].engine.localStore.putBlocks(blocks[0..3])
blockexc[1].engine.localStore.putBlocks(blocks[4..7])
blockexc[2].engine.localStore.putBlocks(blocks[8..11])
blockexc[3].engine.localStore.putBlocks(blocks[12..15])
await connectNodes(switch)
let wantListBlocks = await downloader.getBlocks(blocks[0..3].mapIt( it.cid ))

View File

@ -8,10 +8,11 @@ import pkg/libp2p
import pkg/libp2p/errors
import pkg/dagger/rng
import pkg/dagger/bitswap
import pkg/dagger/bitswap/pendingblocks
import pkg/dagger/bitswap/engine/payments
import pkg/dagger/bitswap/protobuf/presence
import pkg/dagger/blockexc
import pkg/dagger/stores/network/protobuf/blockexc as pb
import pkg/dagger/stores/network/pendingblocks
import pkg/dagger/stores/network/engine/payments
import pkg/dagger/stores/network/protobuf/presence
import pkg/dagger/stores/memorystore
import pkg/dagger/chunker
import pkg/dagger/blocktype as bt
@ -19,7 +20,7 @@ import pkg/dagger/blocktype as bt
import ../helpers
import ../examples
suite "Bitswap engine basic":
suite "BlockExc engine basic":
let
rng = Rng.instance()
seckey = PrivateKey.random(rng[]).tryGet()
@ -47,11 +48,11 @@ suite "Bitswap engine basic":
done.complete()
let request = BitswapRequest(
let request = BlockExcRequest(
sendWantList: sendWantList,
)
let engine = BitswapEngine.new(MemoryStore.new(blocks), wallet, request)
let engine = BlockExcEngine.new(MemoryStore.new(blocks), wallet, request)
engine.wantList = blocks.mapIt( it.cid )
engine.setupPeer(peerId)
@ -64,14 +65,14 @@ suite "Bitswap engine basic":
check account.address == pricing.address
done.complete()
let request = BitswapRequest(sendAccount: sendAccount)
let engine = BitswapEngine.new(MemoryStore.new, wallet, request)
let request = BlockExcRequest(sendAccount: sendAccount)
let engine = BlockExcEngine.new(MemoryStore.new, wallet, request)
engine.pricing = pricing.some
engine.setupPeer(peerId)
await done.wait(100.millis)
suite "Bitswap engine handlers":
suite "BlockExc engine handlers":
let
rng = Rng.instance()
seckey = PrivateKey.random(rng[]).tryGet()
@ -81,21 +82,21 @@ suite "Bitswap engine handlers":
wallet = WalletRef.example
var
engine: BitswapEngine
peerCtx: BitswapPeerCtx
engine: BlockExcEngine
peerCtx: BlockExcPeerCtx
done: Future[void]
setup:
done = newFuture[void]()
engine = BitswapEngine.new(MemoryStore.new(), wallet)
peerCtx = BitswapPeerCtx(
engine = BlockExcEngine.new(MemoryStore.new(), wallet)
peerCtx = BlockExcPeerCtx(
id: peerId
)
engine.peers.add(peerCtx)
test "should handle want list":
let wantList = makeWantList(blocks.mapIt( it.cid ))
proc taskScheduler(ctx: BitswapPeerCtx): bool =
proc taskScheduler(ctx: BlockExcPeerCtx): bool =
check ctx.id == peerId
check ctx.peerWants.mapIt( it.cid ) == blocks.mapIt( it.cid )
@ -116,7 +117,7 @@ suite "Bitswap engine handlers":
done.complete()
engine.request = BitswapRequest(
engine.request = BlockExcRequest(
sendPresence: sendPresence
)
@ -134,7 +135,7 @@ suite "Bitswap engine handlers":
done.complete()
engine.request = BitswapRequest(sendPresence: sendPresence)
engine.request = BlockExcRequest(sendPresence: sendPresence)
engine.localStore.putBlocks(@[blocks[0], blocks[1]])
engine.wantListHandler(peerId, wantList)
@ -193,21 +194,21 @@ suite "Task Handler":
wallet = WalletRef.example
var
engine: BitswapEngine
peersCtx: seq[BitswapPeerCtx]
engine: BlockExcEngine
peersCtx: seq[BlockExcPeerCtx]
peers: seq[PeerID]
done: Future[void]
setup:
done = newFuture[void]()
engine = BitswapEngine.new(MemoryStore.new(), wallet)
engine = BlockExcEngine.new(MemoryStore.new(), wallet)
peersCtx = @[]
for i in 0..3:
let seckey = PrivateKey.random(rng[]).tryGet()
peers.add(PeerID.init(seckey.getKey().tryGet()).tryGet())
peersCtx.add(BitswapPeerCtx(
peersCtx.add(BlockExcPeerCtx(
id: peers[i]
))

View File

@ -7,17 +7,18 @@ import pkg/libp2p
import pkg/libp2p/errors
import pkg/protobuf_serialization
import pkg/dagger/stores/memorystore
import pkg/dagger/bitswap/network
import pkg/dagger/bitswap/protobuf/payments
import pkg/dagger/rng
import pkg/dagger/chunker
import pkg/dagger/blocktype as bt
import pkg/dagger/stores/memorystore
import pkg/dagger/stores/network/network
import pkg/dagger/stores/network/protobuf/blockexc
import pkg/dagger/stores/network/protobuf/payments
import ../helpers
import ../examples
suite "Bitswap network":
suite "BlockExc network":
let
rng = Rng.instance()
seckey = PrivateKey.random(rng[]).tryGet()
@ -26,7 +27,7 @@ suite "Bitswap network":
blocks = chunker.mapIt( !bt.Block.new(it) )
var
network: BitswapNetwork
network: BlockExcNetwork
networkPeer: NetworkPeer
buffer: BufferStream
done: Future[void]
@ -37,7 +38,7 @@ suite "Bitswap network":
setup:
done = newFuture[void]()
buffer = newBufferStream()
network = BitswapNetwork.new(
network = BlockExcNetwork.new(
switch = newStandardSwitch(),
connProvider = getConn)
network.setupPeer(peerId)
@ -131,14 +132,14 @@ suite "Bitswap network":
await done.wait(100.millis)
suite "Bitswap Network - e2e":
suite "BlockExc Network - e2e":
let
chunker = newRandomChunker(Rng.instance(), size = 1024, chunkSize = 256)
blocks = chunker.mapIt( !bt.Block.new(it) )
var
switch1, switch2: Switch
network1, network2: BitswapNetwork
network1, network2: BlockExcNetwork
awaiters: seq[Future[void]]
done: Future[void]
@ -149,11 +150,11 @@ suite "Bitswap Network - e2e":
awaiters.add(await switch1.start())
awaiters.add(await switch2.start())
network1 = BitswapNetwork.new(
network1 = BlockExcNetwork.new(
switch = switch1)
switch1.mount(network1)
network2 = BitswapNetwork.new(
network2 = BlockExcNetwork.new(
switch = switch2)
switch2.mount(network2)

View File

@ -3,7 +3,7 @@ import std/sequtils
import pkg/chronos
import pkg/libp2p
import pkg/dagger/bitswap
import pkg/dagger/blockexc
import pkg/dagger/stores/memorystore
import pkg/dagger/blocktype as bt
@ -14,21 +14,21 @@ proc generateNodes*(
blocks: openArray[bt.Block] = [],
secureManagers: openarray[SecureProtocol] = [
SecureProtocol.Noise,
]): seq[tuple[switch: Switch, bitswap: Bitswap]] =
]): seq[tuple[switch: Switch, blockexc: BlockExc]] =
for i in 0..<num:
let
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
wallet = WalletRef.example
network = BitswapNetwork.new(switch)
bitswap = Bitswap.new(MemoryStore.new(blocks), wallet, network)
network = BlockExcNetwork.new(switch)
blockexc = BlockExc.new(MemoryStore.new(blocks), wallet, network)
switch.mount(network)
# initialize our want lists
bitswap.engine.wantList = blocks.mapIt( it.cid )
blockexc.engine.wantList = blocks.mapIt( it.cid )
switch.mount(network)
result.add((switch, bitswap))
result.add((switch, blockexc))
proc connectNodes*(nodes: seq[Switch]) {.async.} =
for dialer in nodes:

View File

@ -3,9 +3,9 @@ import std/sequtils
import pkg/libp2p
import pkg/nitro
import pkg/dagger/rng
import pkg/dagger/bitswap/protobuf/payments
import pkg/dagger/bitswap/peercontext
import pkg/dagger/bitswap/engine
import pkg/dagger/stores/network/protobuf/payments
import pkg/dagger/stores/network/peercontext
import pkg/dagger/stores/network/engine
import pkg/dagger/blocktype
proc example*(_: type EthAddress): EthAddress =
@ -50,8 +50,8 @@ proc example*(_: type PeerId): PeerID =
let key = PrivateKey.random(Rng.instance[]).get
PeerId.init(key.getKey().get).get
proc example*(_: type BitswapPeerCtx): BitswapPeerCtx =
BitswapPeerCtx(id: PeerID.example)
proc example*(_: type BlockExcPeerCtx): BlockExcPeerCtx =
BlockExcPeerCtx(id: PeerID.example)
proc example*(_: type Cid): Cid =
Block.example.cid

View File

@ -1,9 +1,9 @@
import ./dagger/bitswap/testbitswap
import ./dagger/bitswap/testengine
import ./dagger/bitswap/testnetwork
import ./dagger/bitswap/protobuf/testpayments as testprotobufpayments
import ./dagger/bitswap/protobuf/testpresence
import ./dagger/bitswap/engine/testpayments as testenginepayments
import ./dagger/blockexc/testblockexc
import ./dagger/blockexc/testengine
import ./dagger/blockexc/testnetwork
import ./dagger/blockexc/protobuf/testpayments as testprotobufpayments
import ./dagger/blockexc/protobuf/testpresence
import ./dagger/blockexc/engine/testpayments as testenginepayments
import ./dagger/testasyncheapqueue
import ./dagger/testblockstore
import ./dagger/testchunking