diff --git a/codex/blockexchange/network/networkpeer.nim b/codex/blockexchange/network/networkpeer.nim index 9f648e37..aee935f9 100644 --- a/codex/blockexchange/network/networkpeer.nim +++ b/codex/blockexchange/network/networkpeer.nim @@ -9,10 +9,10 @@ import pkg/chronos import pkg/chronicles -import pkg/protobuf_serialization import pkg/libp2p import ../protobuf/blockexc +import ../../errors logScope: topics = "codex blockexc networkpeer" @@ -41,7 +41,7 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} = while not conn.atEof or not conn.closed: let data = await conn.readLp(MaxMessageSize) - msg: Message = Protobuf.decode(data, Message) + msg = Message.ProtobufDecode(data).mapFailure().tryGet() trace "Got message for peer", peer = b.id await b.handler(b, msg) except CatchableError as exc: @@ -65,7 +65,7 @@ proc send*(b: NetworkPeer, msg: Message) {.async.} = return trace "Sending message to remote", peer = b.id - await conn.writeLp(Protobuf.encode(msg)) + await conn.writeLp(ProtobufEncode(msg)) proc broadcast*(b: NetworkPeer, msg: Message) = proc sendAwaiter() {.async.} = diff --git a/codex/blockexchange/protobuf/blockexc.nim b/codex/blockexchange/protobuf/blockexc.nim index 326a098a..f52f8042 100644 --- a/codex/blockexchange/protobuf/blockexc.nim +++ b/codex/blockexchange/protobuf/blockexc.nim @@ -9,12 +9,11 @@ import std/hashes import std/sequtils -import pkg/protobuf_serialization import pkg/libp2p -import_proto3 "message.proto" +import message -export Message +export Message, ProtobufEncode, ProtobufDecode export Wantlist, WantType, Entry export Block, BlockPresenceType, BlockPresence export AccountMessage, StateChannelUpdate diff --git a/codex/blockexchange/protobuf/message.nim b/codex/blockexchange/protobuf/message.nim new file mode 100644 index 00000000..a8849aae --- /dev/null +++ b/codex/blockexchange/protobuf/message.nim @@ -0,0 +1,194 @@ +# Protocol of data exchange between Codex nodes +# and Protobuf encoder/decoder for these messages. +# +# Eventually all this code should be auto-generated from message.proto. + +import pkg/libp2p/protobuf/minprotobuf + + +type + WantType* = enum + wantBlock = 0, + wantHave = 1 + + Entry* = object + `block`*: seq[byte] # The block cid + priority*: int32 # The priority (normalized). default to 1 + cancel*: bool # Whether this revokes an entry + wantType*: WantType # Note: defaults to enum 0, ie Block + sendDontHave*: bool # Note: defaults to false + + Wantlist* = object + entries*: seq[Entry] # A list of wantlist entries + full*: bool # Whether this is the full wantlist. default to false + + Block* = object + prefix*: seq[byte] # CID prefix (cid version, multicodec and multihash prefix (type + length) + data*: seq[byte] + + BlockPresenceType* = enum + presenceHave = 0, + presenceDontHave = 1 + + BlockPresence* = object + cid*: seq[byte] # The block cid + `type`*: BlockPresenceType + price*: seq[byte] # Amount of assets to pay for the block (UInt256) + + AccountMessage* = object + address*: seq[byte] # Ethereum address to which payments should be made + + StateChannelUpdate* = object + update*: seq[byte] # Signed Nitro state, serialized as JSON + + Message* = object + wantlist*: Wantlist + payload*: seq[Block] + blockPresences*: seq[BlockPresence] + pendingBytes*: uint + account*: AccountMessage + payment*: StateChannelUpdate + +# +# Encoding Message into seq[byte] in Protobuf format +# + +proc write*(pb: var ProtoBuffer, field: int, value: Entry) = + var ipb = initProtoBuffer() + ipb.write(1, value.`block`) + ipb.write(2, value.priority.uint64) + ipb.write(3, value.cancel.uint) + ipb.write(4, value.wantType.uint) + ipb.write(5, value.sendDontHave.uint) + ipb.finish() + pb.write(field, ipb) + +proc write*(pb: var ProtoBuffer, field: int, value: Wantlist) = + var ipb = initProtoBuffer() + for v in value.entries: + ipb.write(1, v) + ipb.write(2, value.full.uint) + ipb.finish() + pb.write(field, ipb) + +proc write*(pb: var ProtoBuffer, field: int, value: Block) = + var ipb = initProtoBuffer() + ipb.write(1, value.prefix) + ipb.write(2, value.data) + ipb.finish() + pb.write(field, ipb) + +proc write*(pb: var ProtoBuffer, field: int, value: BlockPresence) = + var ipb = initProtoBuffer() + ipb.write(1, value.cid) + ipb.write(2, value.`type`.uint) + ipb.write(3, value.price) + ipb.finish() + pb.write(field, ipb) + +proc write*(pb: var ProtoBuffer, field: int, value: AccountMessage) = + var ipb = initProtoBuffer() + ipb.write(1, value.address) + ipb.finish() + pb.write(field, ipb) + +proc write*(pb: var ProtoBuffer, field: int, value: StateChannelUpdate) = + var ipb = initProtoBuffer() + ipb.write(1, value.update) + ipb.finish() + pb.write(field, ipb) + +proc ProtobufEncode*(value: Message): seq[byte] = + var ipb = initProtoBuffer() + ipb.write(1, value.wantlist) + for v in value.payload: + ipb.write(3, v) + for v in value.blockPresences: + ipb.write(4, v) + ipb.write(5, value.pendingBytes) + ipb.write(6, value.account) + ipb.write(7, value.payment) + ipb.finish() + ipb.buffer + + +# +# Decoding Message from seq[byte] in Protobuf format +# + +proc decode*(_: type Entry, pb: ProtoBuffer): ProtoResult[Entry] = + var + value = Entry() + field: uint64 + discard ? pb.getField(1, value.`block`) + if ? pb.getField(2, field): + value.priority = int32(field) + if ? pb.getField(3, field): + value.cancel = bool(field) + if ? pb.getField(4, field): + value.wantType = WantType(field) + if ? pb.getField(5, field): + value.sendDontHave = bool(field) + ok(value) + +proc decode*(_: type Wantlist, pb: ProtoBuffer): ProtoResult[Wantlist] = + var + value = Wantlist() + field: uint64 + sublist: seq[seq[byte]] + if ? pb.getRepeatedField(1, sublist): + for item in sublist: + value.entries.add(? Entry.decode(initProtoBuffer(item))) + if ? pb.getField(2, field): + value.full = bool(field) + ok(value) + +proc decode*(_: type Block, pb: ProtoBuffer): ProtoResult[Block] = + var + value = Block() + discard ? pb.getField(1, value.prefix) + discard ? pb.getField(2, value.data) + ok(value) + +proc decode*(_: type BlockPresence, pb: ProtoBuffer): ProtoResult[BlockPresence] = + var + value = BlockPresence() + field: uint64 + discard ? pb.getField(1, value.cid) + if ? pb.getField(2, field): + value.`type` = BlockPresenceType(field) + discard ? pb.getField(3, value.price) + ok(value) + +proc decode*(_: type AccountMessage, pb: ProtoBuffer): ProtoResult[AccountMessage] = + var + value = AccountMessage() + discard ? pb.getField(1, value.address) + ok(value) + +proc decode*(_: type StateChannelUpdate, pb: ProtoBuffer): ProtoResult[StateChannelUpdate] = + var + value = StateChannelUpdate() + discard ? pb.getField(1, value.update) + ok(value) + +proc ProtobufDecode*(_: type Message, msg: seq[byte]): ProtoResult[Message] = + var + value = Message() + pb = initProtoBuffer(msg) + ipb: ProtoBuffer + sublist: seq[seq[byte]] + if ? pb.getField(1, ipb): + value.wantlist = ? Wantlist.decode(ipb) + if ? pb.getRepeatedField(3, sublist): + for item in sublist: + value.payload.add(? Block.decode(initProtoBuffer(item))) + if ? pb.getRepeatedField(4, sublist): + for item in sublist: + value.blockPresences.add(? BlockPresence.decode(initProtoBuffer(item))) + discard ? pb.getField(5, value.pendingBytes) + if ? pb.getField(6, ipb): + value.account = ? AccountMessage.decode(ipb) + if ? pb.getField(7, ipb): + value.payment = ? StateChannelUpdate.decode(ipb) + ok(value) diff --git a/codex/blockexchange/protobuf/message.proto b/codex/blockexchange/protobuf/message.proto index d1a15151..65d1f9a9 100644 --- a/codex/blockexchange/protobuf/message.proto +++ b/codex/blockexchange/protobuf/message.proto @@ -1,3 +1,6 @@ +// Protocol of data exchange between Codex nodes. +// Extended version of https://github.com/ipfs/specs/blob/main/BITSWAP.md + syntax = "proto3"; package blockexc.message.pb; diff --git a/tests/codex/blockexchange/testnetwork.nim b/tests/codex/blockexchange/testnetwork.nim index 1b01422d..1df53afb 100644 --- a/tests/codex/blockexchange/testnetwork.nim +++ b/tests/codex/blockexchange/testnetwork.nim @@ -5,7 +5,6 @@ import pkg/asynctest import pkg/chronos import pkg/libp2p import pkg/libp2p/errors -import pkg/protobuf_serialization import pkg/codex/rng import pkg/codex/chunker @@ -72,7 +71,7 @@ suite "Network - Handlers": true, true) let msg = Message(wantlist: wantList) - await buffer.pushData(lenPrefix(Protobuf.encode(msg))) + await buffer.pushData(lenPrefix(ProtobufEncode(msg))) await done.wait(500.millis) @@ -84,7 +83,7 @@ suite "Network - Handlers": network.handlers.onBlocks = blocksHandler let msg = Message(payload: makeBlocks(blocks)) - await buffer.pushData(lenPrefix(Protobuf.encode(msg))) + await buffer.pushData(lenPrefix(ProtobufEncode(msg))) await done.wait(500.millis) @@ -106,7 +105,7 @@ suite "Network - Handlers": cid: it.cid.data.buffer, type: BlockPresenceType.presenceHave ))) - await buffer.pushData(lenPrefix(Protobuf.encode(msg))) + await buffer.pushData(lenPrefix(ProtobufEncode(msg))) await done.wait(500.millis) @@ -120,7 +119,7 @@ suite "Network - Handlers": network.handlers.onAccount = handleAccount let message = Message(account: AccountMessage.init(account)) - await buffer.pushData(lenPrefix(Protobuf.encode(message))) + await buffer.pushData(lenPrefix(ProtobufEncode(message))) await done.wait(100.millis) @@ -134,7 +133,7 @@ suite "Network - Handlers": network.handlers.onPayment = handlePayment let message = Message(payment: StateChannelUpdate.init(payment)) - await buffer.pushData(lenPrefix(Protobuf.encode(message))) + await buffer.pushData(lenPrefix(ProtobufEncode(message))) await done.wait(100.millis)