Add Bitswap protocol for libp2p
This commit is contained in:
parent
e6d84b9c0d
commit
520f3f3bc9
|
@ -0,0 +1,33 @@
|
||||||
|
import pkg/chronos
|
||||||
|
import pkg/libp2p/switch
|
||||||
|
import pkg/libp2p/stream/connection
|
||||||
|
import pkg/libp2p/protocols/protocol
|
||||||
|
import ./stream
|
||||||
|
|
||||||
|
export stream except readLoop
|
||||||
|
|
||||||
|
const Codec = "/ipfs/bitswap/1.2.0"
|
||||||
|
|
||||||
|
type
|
||||||
|
BitswapProtocol* = ref object of LPProtocol
|
||||||
|
connections: AsyncQueue[BitswapStream]
|
||||||
|
|
||||||
|
proc new*(t: type BitswapProtocol): BitswapProtocol =
|
||||||
|
let connections = newAsyncQueue[BitswapStream](1)
|
||||||
|
proc handle(connection: Connection, proto: string) {.async.} =
|
||||||
|
let stream = BitswapStream.new(connection)
|
||||||
|
await connections.put(stream)
|
||||||
|
await stream.readLoop()
|
||||||
|
BitswapProtocol(connections: connections, codecs: @[Codec], handler: handle)
|
||||||
|
|
||||||
|
proc dial*(switch: Switch,
|
||||||
|
peer: PeerInfo,
|
||||||
|
t: type BitswapProtocol):
|
||||||
|
Future[BitswapStream] {.async.} =
|
||||||
|
let connection = await switch.dial(peer.peerId, peer.addrs, Codec)
|
||||||
|
let stream = BitswapStream.new(connection)
|
||||||
|
asyncSpawn stream.readLoop()
|
||||||
|
result = stream
|
||||||
|
|
||||||
|
proc accept*(bitswap: BitswapProtocol): Future[BitswapStream] {.async.} =
|
||||||
|
result = await bitswap.connections.get()
|
|
@ -0,0 +1,38 @@
|
||||||
|
import pkg/chronos
|
||||||
|
import pkg/protobuf_serialization
|
||||||
|
import pkg/libp2p/stream/connection
|
||||||
|
import ./messages
|
||||||
|
|
||||||
|
export messages
|
||||||
|
|
||||||
|
const MaxMessageSize = 8 * 1024 * 1024
|
||||||
|
|
||||||
|
type
|
||||||
|
BitswapStream* = ref object
|
||||||
|
bytestream: LpStream
|
||||||
|
messages: AsyncQueue[Message]
|
||||||
|
|
||||||
|
proc new*(t: type BitswapStream, bytestream: LpStream): BitswapStream =
|
||||||
|
BitswapStream(bytestream: bytestream, messages: newAsyncQueue[Message](1))
|
||||||
|
|
||||||
|
proc readOnce(stream: BitswapStream) {.async.} =
|
||||||
|
let encoded = await stream.bytestream.readLp(MaxMessageSize)
|
||||||
|
let message = Protobuf.decode(encoded, Message)
|
||||||
|
await stream.messages.put(message)
|
||||||
|
|
||||||
|
proc readLoop*(stream: BitswapStream) {.async.} =
|
||||||
|
while true:
|
||||||
|
try:
|
||||||
|
await stream.readOnce()
|
||||||
|
except LPStreamEOFError:
|
||||||
|
break
|
||||||
|
|
||||||
|
proc write*(stream: BitswapStream, message: Message) {.async.} =
|
||||||
|
let encoded = Protobuf.encode(message)
|
||||||
|
await stream.bytestream.writeLp(encoded)
|
||||||
|
|
||||||
|
proc read*(stream: BitswapStream): Future[Message] {.async.} =
|
||||||
|
result = await stream.messages.get()
|
||||||
|
|
||||||
|
proc close*(stream: BitswapStream) {.async.} =
|
||||||
|
await stream.bytestream.close()
|
|
@ -0,0 +1,13 @@
|
||||||
|
import pkg/libp2p/crypto/crypto
|
||||||
|
import pkg/bearssl
|
||||||
|
|
||||||
|
type
|
||||||
|
Rng* = RandomNumberGenerator
|
||||||
|
RandomNumberGenerator = ref BrHmacDrbgContext
|
||||||
|
|
||||||
|
var rng {.threadvar.}: Rng
|
||||||
|
|
||||||
|
proc instance*(t: type Rng): Rng =
|
||||||
|
if rng.isNil:
|
||||||
|
rng = newRng()
|
||||||
|
rng
|
|
@ -0,0 +1,32 @@
|
||||||
|
import std/tables
|
||||||
|
import pkg/chronos
|
||||||
|
import pkg/libp2p/switch
|
||||||
|
import pkg/libp2p/crypto/crypto
|
||||||
|
import pkg/libp2p/peerinfo
|
||||||
|
import pkg/libp2p/protocols/identify
|
||||||
|
import pkg/libp2p/stream/connection
|
||||||
|
import pkg/libp2p/muxers/muxer
|
||||||
|
import pkg/libp2p/muxers/mplex/mplex
|
||||||
|
import pkg/libp2p/transports/transport
|
||||||
|
import pkg/libp2p/transports/tcptransport
|
||||||
|
import pkg/libp2p/protocols/secure/secure
|
||||||
|
import pkg/libp2p/protocols/secure/noise
|
||||||
|
import pkg/libp2p/protocols/secure/secio
|
||||||
|
import ./rng
|
||||||
|
|
||||||
|
export switch
|
||||||
|
|
||||||
|
proc create*(t: type Switch): Switch =
|
||||||
|
|
||||||
|
proc createMplex(conn: Connection): Muxer =
|
||||||
|
Mplex.init(conn)
|
||||||
|
|
||||||
|
let privateKey = PrivateKey.random(Ed25519, Rng.instance[]).get()
|
||||||
|
let peerInfo = PeerInfo.init(privateKey)
|
||||||
|
let identify = newIdentify(peerInfo)
|
||||||
|
let mplexProvider = newMuxerProvider(createMplex, MplexCodec)
|
||||||
|
let transports = @[Transport(TcpTransport.init({ReuseAddr}))]
|
||||||
|
let muxers = [(MplexCodec, mplexProvider)].toTable
|
||||||
|
let secureManagers = [Secure(newNoise(Rng.instance, privateKey))]
|
||||||
|
|
||||||
|
newSwitch(peerInfo, transports, identify, muxers, secureManagers)
|
|
@ -0,0 +1,2 @@
|
||||||
|
-d:"chronicles_enabled=off" # disable logging by default
|
||||||
|
-d:"nimWorkaround14447" # libp2p needs this for nim 1.4.2
|
|
@ -0,0 +1,57 @@
|
||||||
|
import pkg/chronos
|
||||||
|
import pkg/asynctest
|
||||||
|
import pkg/ipfs/p2p/switch
|
||||||
|
import pkg/ipfs/bitswap/messages
|
||||||
|
import pkg/ipfs/bitswap/protocol
|
||||||
|
|
||||||
|
suite "bitswap protocol":
|
||||||
|
|
||||||
|
let address = MultiAddress.init("/ip4/127.0.0.1/tcp/45344").get()
|
||||||
|
let message = Message.send(@[1'u8, 2'u8, 3'u8])
|
||||||
|
|
||||||
|
var peer1, peer2: Switch
|
||||||
|
var bitswap: BitswapProtocol
|
||||||
|
|
||||||
|
setup:
|
||||||
|
peer1 = Switch.create()
|
||||||
|
peer2 = Switch.create()
|
||||||
|
bitswap = BitswapProtocol.new()
|
||||||
|
peer1.peerInfo.addrs.add(address)
|
||||||
|
peer1.mount(bitswap)
|
||||||
|
discard await peer1.start()
|
||||||
|
discard await peer2.start()
|
||||||
|
|
||||||
|
teardown:
|
||||||
|
await peer1.stop()
|
||||||
|
await peer2.stop()
|
||||||
|
|
||||||
|
test "opens a stream to another peer":
|
||||||
|
let stream = await peer2.dial(peer1.peerInfo, BitswapProtocol)
|
||||||
|
await stream.close()
|
||||||
|
|
||||||
|
test "accepts a stream from another peer":
|
||||||
|
let outgoing = await peer2.dial(peer1.peerInfo, BitswapProtocol)
|
||||||
|
let incoming = await bitswap.accept()
|
||||||
|
await outgoing.close()
|
||||||
|
await incoming.close()
|
||||||
|
|
||||||
|
test "writes messages to a stream":
|
||||||
|
let stream = await peer2.dial(peer1.peerInfo, BitswapProtocol)
|
||||||
|
await stream.write(message)
|
||||||
|
await stream.close()
|
||||||
|
|
||||||
|
test "reads messages from incoming stream":
|
||||||
|
let outgoing = await peer2.dial(peer1.peerInfo, BitswapProtocol)
|
||||||
|
let incoming = await bitswap.accept()
|
||||||
|
await outgoing.write(message)
|
||||||
|
check (await incoming.read()) == message
|
||||||
|
await outgoing.close()
|
||||||
|
await incoming.close()
|
||||||
|
|
||||||
|
test "reads messages from outgoing stream":
|
||||||
|
let outgoing = await peer2.dial(peer1.peerInfo, BitswapProtocol)
|
||||||
|
let incoming = await bitswap.accept()
|
||||||
|
await incoming.write(message)
|
||||||
|
check (await outgoing.read()) == message
|
||||||
|
await outgoing.close()
|
||||||
|
await incoming.close()
|
|
@ -4,6 +4,7 @@ import ./ipfs/testRepo
|
||||||
import ./ipfs/testDhtRouting
|
import ./ipfs/testDhtRouting
|
||||||
import ./ipfs/testProtobuf
|
import ./ipfs/testProtobuf
|
||||||
import ./ipfs/testBitswapMessages
|
import ./ipfs/testBitswapMessages
|
||||||
|
import ./ipfs/testBitswapProtocol
|
||||||
import ./ipfs/testIpfs
|
import ./ipfs/testIpfs
|
||||||
|
|
||||||
{.warning[UnusedImport]: off.}
|
{.warning[UnusedImport]: off.}
|
||||||
|
|
Loading…
Reference in New Issue