Exchange objects between peers using bitswap

This commit is contained in:
Mark Spanbroek 2021-01-28 17:40:16 +01:00 committed by markspanbroek
parent 28d1ddf7e3
commit a63000f602
4 changed files with 128 additions and 0 deletions

48
ipfs/bitswap.nim Normal file
View File

@ -0,0 +1,48 @@
import std/options
import pkg/chronos
import pkg/libp2p/cid
import ./ipfsobject
import ./repo
import ./p2p/switch
import ./bitswap/protocol
import ./bitswap/exchange
export options
export Cid
export Switch
type
Bitswap* = ref object
repo: Repo
switch: Switch
exchanges: seq[Exchange] # TODO: never cleaned
proc startExchange(bitswap: Bitswap, stream: BitswapStream) =
let exchange = Exchange.start(bitswap.repo, stream)
bitswap.exchanges.add(exchange)
proc start*(_: type Bitswap, switch: Switch, repo = Repo()): Bitswap =
let bitswap = Bitswap(repo: repo, switch: switch)
let protocol = BitswapProtocol.new()
proc acceptLoop {.async.} =
while true:
let stream = await protocol.accept()
bitswap.startExchange(stream)
asyncSpawn acceptLoop()
switch.mount(protocol)
bitswap
proc connect*(bitswap: Bitswap, peer: PeerInfo) {.async.} =
let stream = await bitswap.switch.dial(peer, BitswapProtocol)
bitswap.startExchange(stream)
proc store*(bitswap: Bitswap, obj: IpfsObject) =
bitswap.repo.store(obj)
proc retrieve*(bitswap: Bitswap, cid: Cid): Future[Option[IpfsObject]] {.async.} =
result = bitswap.repo.retrieve(cid)
if result.isNone:
for exchange in bitswap.exchanges:
await exchange.want(cid)
await sleepAsync(1.seconds) # TODO
result = bitswap.repo.retrieve(cid)

38
ipfs/bitswap/exchange.nim Normal file
View File

@ -0,0 +1,38 @@
import pkg/chronos
import pkg/libp2p/cid
import ../repo
import ./stream
import ./messages
type Exchange* = object
repo: Repo
stream: BitswapStream
proc want*(exchange: Exchange, cid: Cid) {.async.} =
await exchange.stream.write(Message.want(cid))
proc send*(exchange: Exchange, obj: IpfsObject) {.async.} =
await exchange.stream.write(Message.send(obj.data))
proc handlePayload(exchange: Exchange, message: Message) {.async.} =
for bloc in message.payload:
let obj = IpfsObject(data: bloc.data)
exchange.repo.store(obj)
proc handleWants(exchange: Exchange, message: Message) {.async.} =
for want in message.wantlist.entries:
let cid = Cid.init(want.`block`).get()
let obj = exchange.repo.retrieve(cid)
if obj.isSome:
await exchange.send(obj.get())
proc readLoop(exchange: Exchange) {.async.} =
while true:
let message = await exchange.stream.read()
await exchange.handlePayload(message)
await exchange.handleWants(message)
proc start*(_: type Exchange, repo: Repo, stream: BitswapStream): Exchange =
let exchange = Exchange(repo: repo, stream: stream)
asyncSpawn exchange.readLoop()
exchange

View File

@ -0,0 +1,41 @@
import pkg/chronos
import pkg/asynctest
import pkg/ipfs/ipfsobject
import pkg/ipfs/p2p/switch
import pkg/ipfs/bitswap
suite "bitswap":
let address = MultiAddress.init("/ip4/127.0.0.1/tcp/40981").get()
let obj = IpfsObject(data: @[1'u8, 2'u8, 3'u8])
var bitswap1, bitswap2: Bitswap
var peer1, peer2: Switch
setup:
peer1 = Switch.create()
peer2 = Switch.create()
peer1.peerInfo.addrs.add(address)
discard await peer1.start()
discard await peer2.start()
bitswap1 = Bitswap.start(peer1)
bitswap2 = Bitswap.start(peer2)
teardown:
await peer1.stop()
await peer2.stop()
test "stores ipfs objects":
bitswap1.store(obj)
test "retrieves local objects":
bitswap1.store(obj)
check (await bitswap1.retrieve(obj.cid)).get() == obj
test "signals retrieval failure":
check (await bitswap1.retrieve(obj.cid)).isNone
test "retrieves objects from network":
bitswap1.store(obj)
await bitswap2.connect(peer1.peerInfo)
check (await bitswap2.retrieve(obj.cid)).get() == obj

View File

@ -5,6 +5,7 @@ import ./ipfs/testDhtRouting
import ./ipfs/testProtobuf
import ./ipfs/testBitswapMessages
import ./ipfs/testBitswapProtocol
import ./ipfs/testBitswap
import ./ipfs/testIpfs
{.warning[UnusedImport]: off.}