From 520f3f3bc976d22886db5760db5b17c55e6b0e10 Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Wed, 27 Jan 2021 12:03:54 +0100 Subject: [PATCH] Add Bitswap protocol for libp2p --- ipfs/bitswap/protocol.nim | 33 +++++++++++++++++ ipfs/bitswap/stream.nim | 38 ++++++++++++++++++++ ipfs/p2p/rng.nim | 13 +++++++ ipfs/p2p/switch.nim | 32 +++++++++++++++++ nim.cfg | 2 ++ tests/ipfs/testBitswapProtocol.nim | 57 ++++++++++++++++++++++++++++++ tests/testAll.nim | 1 + 7 files changed, 176 insertions(+) create mode 100644 ipfs/bitswap/protocol.nim create mode 100644 ipfs/bitswap/stream.nim create mode 100644 ipfs/p2p/rng.nim create mode 100644 ipfs/p2p/switch.nim create mode 100644 nim.cfg create mode 100644 tests/ipfs/testBitswapProtocol.nim diff --git a/ipfs/bitswap/protocol.nim b/ipfs/bitswap/protocol.nim new file mode 100644 index 00000000..cf2fc6b1 --- /dev/null +++ b/ipfs/bitswap/protocol.nim @@ -0,0 +1,33 @@ +import pkg/chronos +import pkg/libp2p/switch +import pkg/libp2p/stream/connection +import pkg/libp2p/protocols/protocol +import ./stream + +export stream except readLoop + +const Codec = "/ipfs/bitswap/1.2.0" + +type + BitswapProtocol* = ref object of LPProtocol + connections: AsyncQueue[BitswapStream] + +proc new*(t: type BitswapProtocol): BitswapProtocol = + let connections = newAsyncQueue[BitswapStream](1) + proc handle(connection: Connection, proto: string) {.async.} = + let stream = BitswapStream.new(connection) + await connections.put(stream) + await stream.readLoop() + BitswapProtocol(connections: connections, codecs: @[Codec], handler: handle) + +proc dial*(switch: Switch, + peer: PeerInfo, + t: type BitswapProtocol): + Future[BitswapStream] {.async.} = + let connection = await switch.dial(peer.peerId, peer.addrs, Codec) + let stream = BitswapStream.new(connection) + asyncSpawn stream.readLoop() + result = stream + +proc accept*(bitswap: BitswapProtocol): Future[BitswapStream] {.async.} = + result = await bitswap.connections.get() diff --git a/ipfs/bitswap/stream.nim b/ipfs/bitswap/stream.nim new file mode 100644 index 00000000..dd1bbf88 --- /dev/null +++ b/ipfs/bitswap/stream.nim @@ -0,0 +1,38 @@ +import pkg/chronos +import pkg/protobuf_serialization +import pkg/libp2p/stream/connection +import ./messages + +export messages + +const MaxMessageSize = 8 * 1024 * 1024 + +type + BitswapStream* = ref object + bytestream: LpStream + messages: AsyncQueue[Message] + +proc new*(t: type BitswapStream, bytestream: LpStream): BitswapStream = + BitswapStream(bytestream: bytestream, messages: newAsyncQueue[Message](1)) + +proc readOnce(stream: BitswapStream) {.async.} = + let encoded = await stream.bytestream.readLp(MaxMessageSize) + let message = Protobuf.decode(encoded, Message) + await stream.messages.put(message) + +proc readLoop*(stream: BitswapStream) {.async.} = + while true: + try: + await stream.readOnce() + except LPStreamEOFError: + break + +proc write*(stream: BitswapStream, message: Message) {.async.} = + let encoded = Protobuf.encode(message) + await stream.bytestream.writeLp(encoded) + +proc read*(stream: BitswapStream): Future[Message] {.async.} = + result = await stream.messages.get() + +proc close*(stream: BitswapStream) {.async.} = + await stream.bytestream.close() diff --git a/ipfs/p2p/rng.nim b/ipfs/p2p/rng.nim new file mode 100644 index 00000000..5fb187e3 --- /dev/null +++ b/ipfs/p2p/rng.nim @@ -0,0 +1,13 @@ +import pkg/libp2p/crypto/crypto +import pkg/bearssl + +type + Rng* = RandomNumberGenerator + RandomNumberGenerator = ref BrHmacDrbgContext + +var rng {.threadvar.}: Rng + +proc instance*(t: type Rng): Rng = + if rng.isNil: + rng = newRng() + rng diff --git a/ipfs/p2p/switch.nim b/ipfs/p2p/switch.nim new file mode 100644 index 00000000..8c67b7ae --- /dev/null +++ b/ipfs/p2p/switch.nim @@ -0,0 +1,32 @@ +import std/tables +import pkg/chronos +import pkg/libp2p/switch +import pkg/libp2p/crypto/crypto +import pkg/libp2p/peerinfo +import pkg/libp2p/protocols/identify +import pkg/libp2p/stream/connection +import pkg/libp2p/muxers/muxer +import pkg/libp2p/muxers/mplex/mplex +import pkg/libp2p/transports/transport +import pkg/libp2p/transports/tcptransport +import pkg/libp2p/protocols/secure/secure +import pkg/libp2p/protocols/secure/noise +import pkg/libp2p/protocols/secure/secio +import ./rng + +export switch + +proc create*(t: type Switch): Switch = + + proc createMplex(conn: Connection): Muxer = + Mplex.init(conn) + + let privateKey = PrivateKey.random(Ed25519, Rng.instance[]).get() + let peerInfo = PeerInfo.init(privateKey) + let identify = newIdentify(peerInfo) + let mplexProvider = newMuxerProvider(createMplex, MplexCodec) + let transports = @[Transport(TcpTransport.init({ReuseAddr}))] + let muxers = [(MplexCodec, mplexProvider)].toTable + let secureManagers = [Secure(newNoise(Rng.instance, privateKey))] + + newSwitch(peerInfo, transports, identify, muxers, secureManagers) diff --git a/nim.cfg b/nim.cfg new file mode 100644 index 00000000..2a83095e --- /dev/null +++ b/nim.cfg @@ -0,0 +1,2 @@ +-d:"chronicles_enabled=off" # disable logging by default +-d:"nimWorkaround14447" # libp2p needs this for nim 1.4.2 diff --git a/tests/ipfs/testBitswapProtocol.nim b/tests/ipfs/testBitswapProtocol.nim new file mode 100644 index 00000000..a895b079 --- /dev/null +++ b/tests/ipfs/testBitswapProtocol.nim @@ -0,0 +1,57 @@ +import pkg/chronos +import pkg/asynctest +import pkg/ipfs/p2p/switch +import pkg/ipfs/bitswap/messages +import pkg/ipfs/bitswap/protocol + +suite "bitswap protocol": + + let address = MultiAddress.init("/ip4/127.0.0.1/tcp/45344").get() + let message = Message.send(@[1'u8, 2'u8, 3'u8]) + + var peer1, peer2: Switch + var bitswap: BitswapProtocol + + setup: + peer1 = Switch.create() + peer2 = Switch.create() + bitswap = BitswapProtocol.new() + peer1.peerInfo.addrs.add(address) + peer1.mount(bitswap) + discard await peer1.start() + discard await peer2.start() + + teardown: + await peer1.stop() + await peer2.stop() + + test "opens a stream to another peer": + let stream = await peer2.dial(peer1.peerInfo, BitswapProtocol) + await stream.close() + + test "accepts a stream from another peer": + let outgoing = await peer2.dial(peer1.peerInfo, BitswapProtocol) + let incoming = await bitswap.accept() + await outgoing.close() + await incoming.close() + + test "writes messages to a stream": + let stream = await peer2.dial(peer1.peerInfo, BitswapProtocol) + await stream.write(message) + await stream.close() + + test "reads messages from incoming stream": + let outgoing = await peer2.dial(peer1.peerInfo, BitswapProtocol) + let incoming = await bitswap.accept() + await outgoing.write(message) + check (await incoming.read()) == message + await outgoing.close() + await incoming.close() + + test "reads messages from outgoing stream": + let outgoing = await peer2.dial(peer1.peerInfo, BitswapProtocol) + let incoming = await bitswap.accept() + await incoming.write(message) + check (await outgoing.read()) == message + await outgoing.close() + await incoming.close() diff --git a/tests/testAll.nim b/tests/testAll.nim index 4ec133cc..cb61c65b 100644 --- a/tests/testAll.nim +++ b/tests/testAll.nim @@ -4,6 +4,7 @@ import ./ipfs/testRepo import ./ipfs/testDhtRouting import ./ipfs/testProtobuf import ./ipfs/testBitswapMessages +import ./ipfs/testBitswapProtocol import ./ipfs/testIpfs {.warning[UnusedImport]: off.}