Replace protobuf serialization for Block exchange with minprotobuf (#271)

* minprotobuf serialization for Block exchange

* Handle decoding errors by stopping peer connection
This commit is contained in:
Bulat-Ziganshin 2022-10-14 02:58:57 +03:00 committed by GitHub
parent 6e6f40016c
commit 9939d85b74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 207 additions and 12 deletions

View File

@ -9,10 +9,10 @@
import pkg/chronos import pkg/chronos
import pkg/chronicles import pkg/chronicles
import pkg/protobuf_serialization
import pkg/libp2p import pkg/libp2p
import ../protobuf/blockexc import ../protobuf/blockexc
import ../../errors
logScope: logScope:
topics = "codex blockexc networkpeer" topics = "codex blockexc networkpeer"
@ -41,7 +41,7 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} =
while not conn.atEof or not conn.closed: while not conn.atEof or not conn.closed:
let let
data = await conn.readLp(MaxMessageSize) data = await conn.readLp(MaxMessageSize)
msg: Message = Protobuf.decode(data, Message) msg = Message.ProtobufDecode(data).mapFailure().tryGet()
trace "Got message for peer", peer = b.id trace "Got message for peer", peer = b.id
await b.handler(b, msg) await b.handler(b, msg)
except CatchableError as exc: except CatchableError as exc:
@ -65,7 +65,7 @@ proc send*(b: NetworkPeer, msg: Message) {.async.} =
return return
trace "Sending message to remote", peer = b.id trace "Sending message to remote", peer = b.id
await conn.writeLp(Protobuf.encode(msg)) await conn.writeLp(ProtobufEncode(msg))
proc broadcast*(b: NetworkPeer, msg: Message) = proc broadcast*(b: NetworkPeer, msg: Message) =
proc sendAwaiter() {.async.} = proc sendAwaiter() {.async.} =

View File

@ -9,12 +9,11 @@
import std/hashes import std/hashes
import std/sequtils import std/sequtils
import pkg/protobuf_serialization
import pkg/libp2p import pkg/libp2p
import_proto3 "message.proto" import message
export Message export Message, ProtobufEncode, ProtobufDecode
export Wantlist, WantType, Entry export Wantlist, WantType, Entry
export Block, BlockPresenceType, BlockPresence export Block, BlockPresenceType, BlockPresence
export AccountMessage, StateChannelUpdate export AccountMessage, StateChannelUpdate

View File

@ -0,0 +1,194 @@
# Protocol of data exchange between Codex nodes
# and Protobuf encoder/decoder for these messages.
#
# Eventually all this code should be auto-generated from message.proto.
import pkg/libp2p/protobuf/minprotobuf
type
WantType* = enum
wantBlock = 0,
wantHave = 1
Entry* = object
`block`*: seq[byte] # The block cid
priority*: int32 # The priority (normalized). default to 1
cancel*: bool # Whether this revokes an entry
wantType*: WantType # Note: defaults to enum 0, ie Block
sendDontHave*: bool # Note: defaults to false
Wantlist* = object
entries*: seq[Entry] # A list of wantlist entries
full*: bool # Whether this is the full wantlist. default to false
Block* = object
prefix*: seq[byte] # CID prefix (cid version, multicodec and multihash prefix (type + length)
data*: seq[byte]
BlockPresenceType* = enum
presenceHave = 0,
presenceDontHave = 1
BlockPresence* = object
cid*: seq[byte] # The block cid
`type`*: BlockPresenceType
price*: seq[byte] # Amount of assets to pay for the block (UInt256)
AccountMessage* = object
address*: seq[byte] # Ethereum address to which payments should be made
StateChannelUpdate* = object
update*: seq[byte] # Signed Nitro state, serialized as JSON
Message* = object
wantlist*: Wantlist
payload*: seq[Block]
blockPresences*: seq[BlockPresence]
pendingBytes*: uint
account*: AccountMessage
payment*: StateChannelUpdate
#
# Encoding Message into seq[byte] in Protobuf format
#
proc write*(pb: var ProtoBuffer, field: int, value: Entry) =
var ipb = initProtoBuffer()
ipb.write(1, value.`block`)
ipb.write(2, value.priority.uint64)
ipb.write(3, value.cancel.uint)
ipb.write(4, value.wantType.uint)
ipb.write(5, value.sendDontHave.uint)
ipb.finish()
pb.write(field, ipb)
proc write*(pb: var ProtoBuffer, field: int, value: Wantlist) =
var ipb = initProtoBuffer()
for v in value.entries:
ipb.write(1, v)
ipb.write(2, value.full.uint)
ipb.finish()
pb.write(field, ipb)
proc write*(pb: var ProtoBuffer, field: int, value: Block) =
var ipb = initProtoBuffer()
ipb.write(1, value.prefix)
ipb.write(2, value.data)
ipb.finish()
pb.write(field, ipb)
proc write*(pb: var ProtoBuffer, field: int, value: BlockPresence) =
var ipb = initProtoBuffer()
ipb.write(1, value.cid)
ipb.write(2, value.`type`.uint)
ipb.write(3, value.price)
ipb.finish()
pb.write(field, ipb)
proc write*(pb: var ProtoBuffer, field: int, value: AccountMessage) =
var ipb = initProtoBuffer()
ipb.write(1, value.address)
ipb.finish()
pb.write(field, ipb)
proc write*(pb: var ProtoBuffer, field: int, value: StateChannelUpdate) =
var ipb = initProtoBuffer()
ipb.write(1, value.update)
ipb.finish()
pb.write(field, ipb)
proc ProtobufEncode*(value: Message): seq[byte] =
var ipb = initProtoBuffer()
ipb.write(1, value.wantlist)
for v in value.payload:
ipb.write(3, v)
for v in value.blockPresences:
ipb.write(4, v)
ipb.write(5, value.pendingBytes)
ipb.write(6, value.account)
ipb.write(7, value.payment)
ipb.finish()
ipb.buffer
#
# Decoding Message from seq[byte] in Protobuf format
#
proc decode*(_: type Entry, pb: ProtoBuffer): ProtoResult[Entry] =
var
value = Entry()
field: uint64
discard ? pb.getField(1, value.`block`)
if ? pb.getField(2, field):
value.priority = int32(field)
if ? pb.getField(3, field):
value.cancel = bool(field)
if ? pb.getField(4, field):
value.wantType = WantType(field)
if ? pb.getField(5, field):
value.sendDontHave = bool(field)
ok(value)
proc decode*(_: type Wantlist, pb: ProtoBuffer): ProtoResult[Wantlist] =
var
value = Wantlist()
field: uint64
sublist: seq[seq[byte]]
if ? pb.getRepeatedField(1, sublist):
for item in sublist:
value.entries.add(? Entry.decode(initProtoBuffer(item)))
if ? pb.getField(2, field):
value.full = bool(field)
ok(value)
proc decode*(_: type Block, pb: ProtoBuffer): ProtoResult[Block] =
var
value = Block()
discard ? pb.getField(1, value.prefix)
discard ? pb.getField(2, value.data)
ok(value)
proc decode*(_: type BlockPresence, pb: ProtoBuffer): ProtoResult[BlockPresence] =
var
value = BlockPresence()
field: uint64
discard ? pb.getField(1, value.cid)
if ? pb.getField(2, field):
value.`type` = BlockPresenceType(field)
discard ? pb.getField(3, value.price)
ok(value)
proc decode*(_: type AccountMessage, pb: ProtoBuffer): ProtoResult[AccountMessage] =
var
value = AccountMessage()
discard ? pb.getField(1, value.address)
ok(value)
proc decode*(_: type StateChannelUpdate, pb: ProtoBuffer): ProtoResult[StateChannelUpdate] =
var
value = StateChannelUpdate()
discard ? pb.getField(1, value.update)
ok(value)
proc ProtobufDecode*(_: type Message, msg: seq[byte]): ProtoResult[Message] =
var
value = Message()
pb = initProtoBuffer(msg)
ipb: ProtoBuffer
sublist: seq[seq[byte]]
if ? pb.getField(1, ipb):
value.wantlist = ? Wantlist.decode(ipb)
if ? pb.getRepeatedField(3, sublist):
for item in sublist:
value.payload.add(? Block.decode(initProtoBuffer(item)))
if ? pb.getRepeatedField(4, sublist):
for item in sublist:
value.blockPresences.add(? BlockPresence.decode(initProtoBuffer(item)))
discard ? pb.getField(5, value.pendingBytes)
if ? pb.getField(6, ipb):
value.account = ? AccountMessage.decode(ipb)
if ? pb.getField(7, ipb):
value.payment = ? StateChannelUpdate.decode(ipb)
ok(value)

View File

@ -1,3 +1,6 @@
// Protocol of data exchange between Codex nodes.
// Extended version of https://github.com/ipfs/specs/blob/main/BITSWAP.md
syntax = "proto3"; syntax = "proto3";
package blockexc.message.pb; package blockexc.message.pb;

View File

@ -5,7 +5,6 @@ import pkg/asynctest
import pkg/chronos import pkg/chronos
import pkg/libp2p import pkg/libp2p
import pkg/libp2p/errors import pkg/libp2p/errors
import pkg/protobuf_serialization
import pkg/codex/rng import pkg/codex/rng
import pkg/codex/chunker import pkg/codex/chunker
@ -72,7 +71,7 @@ suite "Network - Handlers":
true, true) true, true)
let msg = Message(wantlist: wantList) let msg = Message(wantlist: wantList)
await buffer.pushData(lenPrefix(Protobuf.encode(msg))) await buffer.pushData(lenPrefix(ProtobufEncode(msg)))
await done.wait(500.millis) await done.wait(500.millis)
@ -84,7 +83,7 @@ suite "Network - Handlers":
network.handlers.onBlocks = blocksHandler network.handlers.onBlocks = blocksHandler
let msg = Message(payload: makeBlocks(blocks)) let msg = Message(payload: makeBlocks(blocks))
await buffer.pushData(lenPrefix(Protobuf.encode(msg))) await buffer.pushData(lenPrefix(ProtobufEncode(msg)))
await done.wait(500.millis) await done.wait(500.millis)
@ -106,7 +105,7 @@ suite "Network - Handlers":
cid: it.cid.data.buffer, cid: it.cid.data.buffer,
type: BlockPresenceType.presenceHave type: BlockPresenceType.presenceHave
))) )))
await buffer.pushData(lenPrefix(Protobuf.encode(msg))) await buffer.pushData(lenPrefix(ProtobufEncode(msg)))
await done.wait(500.millis) await done.wait(500.millis)
@ -120,7 +119,7 @@ suite "Network - Handlers":
network.handlers.onAccount = handleAccount network.handlers.onAccount = handleAccount
let message = Message(account: AccountMessage.init(account)) let message = Message(account: AccountMessage.init(account))
await buffer.pushData(lenPrefix(Protobuf.encode(message))) await buffer.pushData(lenPrefix(ProtobufEncode(message)))
await done.wait(100.millis) await done.wait(100.millis)
@ -134,7 +133,7 @@ suite "Network - Handlers":
network.handlers.onPayment = handlePayment network.handlers.onPayment = handlePayment
let message = Message(payment: StateChannelUpdate.init(payment)) let message = Message(payment: StateChannelUpdate.init(payment))
await buffer.pushData(lenPrefix(Protobuf.encode(message))) await buffer.pushData(lenPrefix(ProtobufEncode(message)))
await done.wait(100.millis) await done.wait(100.millis)