From e9673fa20843146c3c7ce88c9134f3ee80b45dcc Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 15 Jul 2020 03:08:09 -0500 Subject: [PATCH] MVDS implementation in Nim. Still requires testing against the reference implementation, and the API needs discussion. --- .gitignore | 1 + LICENSE | 21 ++++++ mvds.nim | 29 ++++++++ mvds.nimble | 15 +++++ mvds/Message.nim | 45 +++++++++++++ mvds/State.nim | 133 +++++++++++++++++++++++++++++++++++++ tests/test_all.nim | 5 ++ tests/test_batch.nim | 99 +++++++++++++++++++++++++++ tests/test_go_vectors.nim | 0 tests/test_interactive.nim | 81 ++++++++++++++++++++++ 10 files changed, 429 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 mvds.nim create mode 100644 mvds.nimble create mode 100644 mvds/Message.nim create mode 100644 mvds/State.nim create mode 100644 tests/test_all.nim create mode 100644 tests/test_batch.nim create mode 100644 tests/test_go_vectors.nim create mode 100644 tests/test_interactive.nim diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..67d9b34 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +nimcache/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..9625288 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2020 Status Research & Development GmbH + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/mvds.nim b/mvds.nim new file mode 100644 index 0000000..267c3a7 --- /dev/null +++ b/mvds.nim @@ -0,0 +1,29 @@ +import protobuf_serialization + +import mvds/Message +export Message + +import mvds/State + +type MVDSNode* = ref object + when defined(MVDS_TESTS): + state*: State + else: + state: State + +proc newMVDSNode*(interactive: bool): MVDSNode {.inline.} = + MVDSNode( + state: newState(interactive) + ) + +proc offer*(node: MVDSNode, msg: Message, epoch: int) {.inline.} = + node.state.offer(msg, epoch) + +proc updateEpoch*(node: MVDSNode, msgID: seq[byte], epoch: int): bool = + node.state.updateEpoch(msgID, epoch) + +proc handle*(node: MVDSNode, msg: seq[byte]): tuple[messages: seq[Message], response: seq[byte]] = + var payload: Payload = Protobuf.decode(msg, Payload) + for msg in payload.messages: + msg.hash() + return (payload.messages, Protobuf.encode(node.state.handle(payload))) diff --git a/mvds.nimble b/mvds.nimble new file mode 100644 index 0000000..4860a8e --- /dev/null +++ b/mvds.nimble @@ -0,0 +1,15 @@ +mode = ScriptMode.Verbose + +version = "0.1.0" +author = "Status Research & Development GmbH" +description = "Implementation of the Minimum Viable Data Sync protocol." +license = "MIT" +skipDirs = @["tests"] + +requires "nim >= 1.2.0", + "stew", + "nimcrypto", + "https://github.com/status-im/nim-protobuf-serialization" + +task test, "Run all tests": + exec "nim c -d:MVDS_TESTS -r tests/test_all" diff --git a/mvds/Message.nim b/mvds/Message.nim new file mode 100644 index 0000000..a5fb72c --- /dev/null +++ b/mvds/Message.nim @@ -0,0 +1,45 @@ +import times + +import stew/endians2 +import nimcrypto +import protobuf_serialization + +type + Message* = ref object + group* {.fieldNumber: 1.}: seq[byte] #Assigned into a group by developer, not protocol. + time* {.pint, fieldNumber: 2.}: int64 + body* {.fieldNumber: 3.}: seq[byte] + id* {.dontSerialize.}: seq[byte] + + Payload* = object + acks* {.fieldNumber: 1.}: seq[seq[byte]] + offers* {.fieldNumber: 2.}: seq[seq[byte]] + requests* {.fieldNumber: 3.}: seq[seq[byte]] + messages* {.fieldNumber: 4.}: seq[Message] + +proc hash*(msg: Message) = + msg.id = @( + sha256.digest( + cast[seq[byte]]("MESSAGE_ID") & + msg.group & + @(uint64(msg.time).toBytesLE()) & + msg.body + ).data + ) + +proc newMessage*(group: seq[byte], body: seq[byte]): Message = + result = Message( + group: group, + time: getTime().toUnix(), + body: body + ) + result.hash() + +when defined MVDS_TESTS: + proc `==`*(lhs: Message, rhs: Message): bool {.inline.} = + ( + (lhs.group == rhs.group) and + (lhs.time == rhs.time) and + (lhs.body == rhs.body) and + (lhs.id == rhs.id) + ) diff --git a/mvds/State.nim b/mvds/State.nim new file mode 100644 index 0000000..06ac1db --- /dev/null +++ b/mvds/State.nim @@ -0,0 +1,133 @@ +import options +import tables + +import Message as MessageFile + +type + RecordKind* = enum + OfferRecord, #An offer we have for the offer party. + RequestRecord, #A request for something we want from the other party. + MessageRecord #A response to a request from the other party. + + Record* = ref object + kind*: RecordKind + count*: int + epoch*: int + #This is an Option for two reasons. + #1) To signify that there may not be a message attached. + #2) So if a message is ever improperly attached/not attached, we cause a fatal error. + message*: Option[Message] + + State* = ref object + interactive: bool + epoch: int + when defined(MVDS_TESTS): + messages*: Table[seq[byte], Record] + else: + messages: Table[seq[byte], Record] + +proc newState*(interactive: bool): State {.inline.} = + State( + interactive: interactive, + epoch: 0, + messages: initTable[seq[byte], Record]() + ) + +#Offer a new message to this peer. +proc offer*(state: State, msg: Message, epoch: int) {.inline.} = + state.messages[msg.id] = Record( + kind: if state.interactive: OfferRecord else: MessageRecord, + count: 0, + epoch: epoch, + message: some(msg) + ) + +#Returns false if the message has already finished. +proc updateEpoch*(state: State, msgID: seq[byte], epoch: int): bool = + if state.messages.hasKey(msgID): + state.messages[msgID].epoch = epoch + return true + +#Handle a new Payload and generate the next one. +proc handle*(state: State, incoming: Payload): Payload = + #Handle acks. + for ack in incoming.acks: + state.messages.del(ack) + + #Handle offers. + #All we need to create matching requests. + for offer in incoming.offers: + state.messages[offer] = Record( + kind: RequestRecord, + count: 0, + epoch: state.epoch + 1 + ) + + #Handle requests. + #We need to update our existing offer, if it exists, to a Message. + for req in incoming.requests: + if not state.messages.hasKey(req): + continue + state.messages[req] = Record( + kind: MessageRecord, + count: 0, + epoch: state.epoch + 1, + message: state.messages[req].message + ) + + #Handle messages. + #We need to remove our matching requests and create the ack. + result.acks = newSeq[seq[byte]](incoming.messages.len) + for m in 0 ..< incoming.messages.len: + result.acks[m] = incoming.messages[m].id + state.messages.del(incoming.messages[m].id) + + #Generate offers, requests, and messages. + var + o: int = 0 + r: int = 0 + m: int = 0 + result.offers.setLen(state.messages.len) + result.requests.setLen(state.messages.len) + result.messages.setLen(state.messages.len) + + for msg in state.messages.keys(): + var record: Record = state.messages[msg] + #Only transmit this message if it's the first time or its time to retransmit it again. + if (record.count != 0) and (record.epoch > state.epoch): + continue + #Increment the count. + inc(record.count) + + case record.kind: + of OfferRecord: + result.offers[o] = msg + inc(o) + of RequestRecord: + result.requests[r] = msg + inc(r) + of MessageRecord: + result.messages[m] = record.message.get() + inc(m) + + result.offers.setLen(o) + result.requests.setLen(r) + result.messages.setLen(m) + + inc(state.epoch) + +when defined MVDS_TESTS: + proc `==`*(lhs: Record, rhs: Record): bool {.inline.} = + ( + (lhs.kind == rhs.kind) and + (lhs.count == rhs.count) and + (lhs.epoch == rhs.epoch) and + ( + (lhs.message.isNone() and rhs.message.isNone()) or + ( + lhs.message.isSome() and + rhs.message.isSome() and + (lhs.message.get() == rhs.message.get()) + ) + ) + ) diff --git a/tests/test_all.nim b/tests/test_all.nim new file mode 100644 index 0000000..a7fc093 --- /dev/null +++ b/tests/test_all.nim @@ -0,0 +1,5 @@ +{.warning[UnusedImport]: off} + +import test_batch +import test_interactive +import test_go_vectors diff --git a/tests/test_batch.nim b/tests/test_batch.nim new file mode 100644 index 0000000..91d4883 --- /dev/null +++ b/tests/test_batch.nim @@ -0,0 +1,99 @@ +import random +import times +import options +import tables +import unittest + +import ../mvds +import../mvds/State + +suite "Batch": + randomize(getTime().toUnix()) + + setup: + var + alice: MVDSNode = newMVDSNode(false) + bob: MVDSNode = newMVDSNode(false) + res: tuple[messages: seq[Message], response: seq[byte]] + var + groupID: seq[byte] = newSeq[byte](rand(100)) + body: seq[byte] = newSeq[byte](rand(500)) + for i in 0 ..< groupID.len: + groupID[i] = byte(rand(255)) + for i in 0 ..< body.len: + body[i] = byte(rand(255)) + var msg: Message = newMessage(groupID, body) + + test "Nothing": + check alice.handle(@[]) == res + + test "Single message": + alice.offer(msg, 0) + res = alice.handle(@[]) + check: + res.messages.len == 0 + alice.state.messages.len == 1 + alice.state.messages.hasKey(msg.id) + + alice.state.messages[msg.id] == Record( + kind: MessageRecord, + count: 1, + epoch: 0, + message: some(msg) + ) + + res = bob.handle(res.response) + check: + res.messages.len == 1 + res.messages[0] == msg + bob.state.messages.len == 0 + + check: + alice.handle(res.response) == (messages: @[], response: @[]) + alice.state.messages.len == 0 + + test "No ack": + alice.offer(msg, 0) + var origRes: tuple[messages: seq[Message], response: seq[byte]] = alice.handle(@[]) + for i in 2 ..< 7: + res = alice.handle(@[]) + check: + res == origRes + res.messages.len == 0 + alice.state.messages.len == 1 + alice.state.messages.hasKey(msg.id) + + alice.state.messages[msg.id] == Record( + kind: MessageRecord, + count: i, + epoch: 0, + message: some(msg) + ) + + test "Epoch manipulation": + alice.offer(msg, 0) + var origRes: tuple[messages: seq[Message], response: seq[byte]] = alice.handle(@[]) + check alice.updateEpoch(msg.id, 7) + for i in 1 ..< 7: + res = alice.handle(@[]) + check: + res.messages.len == 0 + res.response.len == 0 + alice.state.messages.len == 1 + alice.state.messages.hasKey(msg.id) + + alice.state.messages[msg.id] == Record( + kind: MessageRecord, + count: 1, + epoch: 7, + message: some(msg) + ) + + check: + alice.handle(@[]) == origRes + alice.state.messages.len == 1 + + discard alice.handle(bob.handle(origRes.response).response) + check: + alice.state.messages.len == 0 + not alice.updateEpoch(msg.id, 9) diff --git a/tests/test_go_vectors.nim b/tests/test_go_vectors.nim new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_interactive.nim b/tests/test_interactive.nim new file mode 100644 index 0000000..da19287 --- /dev/null +++ b/tests/test_interactive.nim @@ -0,0 +1,81 @@ +import random +import times +import options +import tables +import unittest + +import ../mvds +import../mvds/State + +suite "Interactive": + randomize(getTime().toUnix()) + + setup: + var + alice: MVDSNode = newMVDSNode(true) + bob: MVDSNode = newMVDSNode(true) + res: tuple[messages: seq[Message], response: seq[byte]] + + test "Nothing": + check alice.handle(@[]) == res + + test "Single message": + var + groupID: seq[byte] = newSeq[byte](rand(100)) + body: seq[byte] = newSeq[byte](rand(500)) + for i in 0 ..< groupID.len: + groupID[i] = byte(rand(255)) + for i in 0 ..< body.len: + body[i] = byte(rand(255)) + + var msg: Message = newMessage(groupID, body) + alice.offer(msg, 0) + res = alice.handle(@[]) + check: + res.messages.len == 0 + alice.state.messages.len == 1 + alice.state.messages.hasKey(msg.id) + + alice.state.messages[msg.id] == Record( + kind: OfferRecord, + count: 1, + epoch: 0, + message: some(msg) + ) + + res = bob.handle(res.response) + check: + res.messages.len == 0 + bob.state.messages.len == 1 + bob.state.messages.hasKey(msg.id) + + bob.state.messages[msg.id] == Record( + kind: RequestRecord, + count: 1, + epoch: 1, + message: none(Message) + ) + + res = alice.handle(res.response) + check: + res.messages.len == 0 + alice.state.messages.len == 1 + alice.state.messages.hasKey(msg.id) + + alice.state.messages[msg.id] == Record( + kind: MessageRecord, + count: 1, + epoch: 2, + message: some(msg) + ) + + res = bob.handle(res.response) + check: + res.messages.len == 1 + res.messages[0] == msg + bob.state.messages.len == 0 + + res = alice.handle(res.response) + check: + alice.handle(res.response) == (messages: @[], response: @[]) + alice.state.messages.len == 0