MVDS implementation in Nim.
Still requires testing against the reference implementation, and the API needs discussion.
This commit is contained in:
commit
e9673fa208
|
@ -0,0 +1 @@
|
|||
nimcache/
|
|
@ -0,0 +1,21 @@
|
|||
MIT License
|
||||
|
||||
Copyright (c) 2020 Status Research & Development GmbH
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
|
@ -0,0 +1,29 @@
|
|||
import protobuf_serialization
|
||||
|
||||
import mvds/Message
|
||||
export Message
|
||||
|
||||
import mvds/State
|
||||
|
||||
type MVDSNode* = ref object
|
||||
when defined(MVDS_TESTS):
|
||||
state*: State
|
||||
else:
|
||||
state: State
|
||||
|
||||
proc newMVDSNode*(interactive: bool): MVDSNode {.inline.} =
|
||||
MVDSNode(
|
||||
state: newState(interactive)
|
||||
)
|
||||
|
||||
proc offer*(node: MVDSNode, msg: Message, epoch: int) {.inline.} =
|
||||
node.state.offer(msg, epoch)
|
||||
|
||||
proc updateEpoch*(node: MVDSNode, msgID: seq[byte], epoch: int): bool =
|
||||
node.state.updateEpoch(msgID, epoch)
|
||||
|
||||
proc handle*(node: MVDSNode, msg: seq[byte]): tuple[messages: seq[Message], response: seq[byte]] =
|
||||
var payload: Payload = Protobuf.decode(msg, Payload)
|
||||
for msg in payload.messages:
|
||||
msg.hash()
|
||||
return (payload.messages, Protobuf.encode(node.state.handle(payload)))
|
|
@ -0,0 +1,15 @@
|
|||
mode = ScriptMode.Verbose
|
||||
|
||||
version = "0.1.0"
|
||||
author = "Status Research & Development GmbH"
|
||||
description = "Implementation of the Minimum Viable Data Sync protocol."
|
||||
license = "MIT"
|
||||
skipDirs = @["tests"]
|
||||
|
||||
requires "nim >= 1.2.0",
|
||||
"stew",
|
||||
"nimcrypto",
|
||||
"https://github.com/status-im/nim-protobuf-serialization"
|
||||
|
||||
task test, "Run all tests":
|
||||
exec "nim c -d:MVDS_TESTS -r tests/test_all"
|
|
@ -0,0 +1,45 @@
|
|||
import times
|
||||
|
||||
import stew/endians2
|
||||
import nimcrypto
|
||||
import protobuf_serialization
|
||||
|
||||
type
|
||||
Message* = ref object
|
||||
group* {.fieldNumber: 1.}: seq[byte] #Assigned into a group by developer, not protocol.
|
||||
time* {.pint, fieldNumber: 2.}: int64
|
||||
body* {.fieldNumber: 3.}: seq[byte]
|
||||
id* {.dontSerialize.}: seq[byte]
|
||||
|
||||
Payload* = object
|
||||
acks* {.fieldNumber: 1.}: seq[seq[byte]]
|
||||
offers* {.fieldNumber: 2.}: seq[seq[byte]]
|
||||
requests* {.fieldNumber: 3.}: seq[seq[byte]]
|
||||
messages* {.fieldNumber: 4.}: seq[Message]
|
||||
|
||||
proc hash*(msg: Message) =
|
||||
msg.id = @(
|
||||
sha256.digest(
|
||||
cast[seq[byte]]("MESSAGE_ID") &
|
||||
msg.group &
|
||||
@(uint64(msg.time).toBytesLE()) &
|
||||
msg.body
|
||||
).data
|
||||
)
|
||||
|
||||
proc newMessage*(group: seq[byte], body: seq[byte]): Message =
|
||||
result = Message(
|
||||
group: group,
|
||||
time: getTime().toUnix(),
|
||||
body: body
|
||||
)
|
||||
result.hash()
|
||||
|
||||
when defined MVDS_TESTS:
|
||||
proc `==`*(lhs: Message, rhs: Message): bool {.inline.} =
|
||||
(
|
||||
(lhs.group == rhs.group) and
|
||||
(lhs.time == rhs.time) and
|
||||
(lhs.body == rhs.body) and
|
||||
(lhs.id == rhs.id)
|
||||
)
|
|
@ -0,0 +1,133 @@
|
|||
import options
|
||||
import tables
|
||||
|
||||
import Message as MessageFile
|
||||
|
||||
type
|
||||
RecordKind* = enum
|
||||
OfferRecord, #An offer we have for the offer party.
|
||||
RequestRecord, #A request for something we want from the other party.
|
||||
MessageRecord #A response to a request from the other party.
|
||||
|
||||
Record* = ref object
|
||||
kind*: RecordKind
|
||||
count*: int
|
||||
epoch*: int
|
||||
#This is an Option for two reasons.
|
||||
#1) To signify that there may not be a message attached.
|
||||
#2) So if a message is ever improperly attached/not attached, we cause a fatal error.
|
||||
message*: Option[Message]
|
||||
|
||||
State* = ref object
|
||||
interactive: bool
|
||||
epoch: int
|
||||
when defined(MVDS_TESTS):
|
||||
messages*: Table[seq[byte], Record]
|
||||
else:
|
||||
messages: Table[seq[byte], Record]
|
||||
|
||||
proc newState*(interactive: bool): State {.inline.} =
|
||||
State(
|
||||
interactive: interactive,
|
||||
epoch: 0,
|
||||
messages: initTable[seq[byte], Record]()
|
||||
)
|
||||
|
||||
#Offer a new message to this peer.
|
||||
proc offer*(state: State, msg: Message, epoch: int) {.inline.} =
|
||||
state.messages[msg.id] = Record(
|
||||
kind: if state.interactive: OfferRecord else: MessageRecord,
|
||||
count: 0,
|
||||
epoch: epoch,
|
||||
message: some(msg)
|
||||
)
|
||||
|
||||
#Returns false if the message has already finished.
|
||||
proc updateEpoch*(state: State, msgID: seq[byte], epoch: int): bool =
|
||||
if state.messages.hasKey(msgID):
|
||||
state.messages[msgID].epoch = epoch
|
||||
return true
|
||||
|
||||
#Handle a new Payload and generate the next one.
|
||||
proc handle*(state: State, incoming: Payload): Payload =
|
||||
#Handle acks.
|
||||
for ack in incoming.acks:
|
||||
state.messages.del(ack)
|
||||
|
||||
#Handle offers.
|
||||
#All we need to create matching requests.
|
||||
for offer in incoming.offers:
|
||||
state.messages[offer] = Record(
|
||||
kind: RequestRecord,
|
||||
count: 0,
|
||||
epoch: state.epoch + 1
|
||||
)
|
||||
|
||||
#Handle requests.
|
||||
#We need to update our existing offer, if it exists, to a Message.
|
||||
for req in incoming.requests:
|
||||
if not state.messages.hasKey(req):
|
||||
continue
|
||||
state.messages[req] = Record(
|
||||
kind: MessageRecord,
|
||||
count: 0,
|
||||
epoch: state.epoch + 1,
|
||||
message: state.messages[req].message
|
||||
)
|
||||
|
||||
#Handle messages.
|
||||
#We need to remove our matching requests and create the ack.
|
||||
result.acks = newSeq[seq[byte]](incoming.messages.len)
|
||||
for m in 0 ..< incoming.messages.len:
|
||||
result.acks[m] = incoming.messages[m].id
|
||||
state.messages.del(incoming.messages[m].id)
|
||||
|
||||
#Generate offers, requests, and messages.
|
||||
var
|
||||
o: int = 0
|
||||
r: int = 0
|
||||
m: int = 0
|
||||
result.offers.setLen(state.messages.len)
|
||||
result.requests.setLen(state.messages.len)
|
||||
result.messages.setLen(state.messages.len)
|
||||
|
||||
for msg in state.messages.keys():
|
||||
var record: Record = state.messages[msg]
|
||||
#Only transmit this message if it's the first time or its time to retransmit it again.
|
||||
if (record.count != 0) and (record.epoch > state.epoch):
|
||||
continue
|
||||
#Increment the count.
|
||||
inc(record.count)
|
||||
|
||||
case record.kind:
|
||||
of OfferRecord:
|
||||
result.offers[o] = msg
|
||||
inc(o)
|
||||
of RequestRecord:
|
||||
result.requests[r] = msg
|
||||
inc(r)
|
||||
of MessageRecord:
|
||||
result.messages[m] = record.message.get()
|
||||
inc(m)
|
||||
|
||||
result.offers.setLen(o)
|
||||
result.requests.setLen(r)
|
||||
result.messages.setLen(m)
|
||||
|
||||
inc(state.epoch)
|
||||
|
||||
when defined MVDS_TESTS:
|
||||
proc `==`*(lhs: Record, rhs: Record): bool {.inline.} =
|
||||
(
|
||||
(lhs.kind == rhs.kind) and
|
||||
(lhs.count == rhs.count) and
|
||||
(lhs.epoch == rhs.epoch) and
|
||||
(
|
||||
(lhs.message.isNone() and rhs.message.isNone()) or
|
||||
(
|
||||
lhs.message.isSome() and
|
||||
rhs.message.isSome() and
|
||||
(lhs.message.get() == rhs.message.get())
|
||||
)
|
||||
)
|
||||
)
|
|
@ -0,0 +1,5 @@
|
|||
{.warning[UnusedImport]: off}
|
||||
|
||||
import test_batch
|
||||
import test_interactive
|
||||
import test_go_vectors
|
|
@ -0,0 +1,99 @@
|
|||
import random
|
||||
import times
|
||||
import options
|
||||
import tables
|
||||
import unittest
|
||||
|
||||
import ../mvds
|
||||
import../mvds/State
|
||||
|
||||
suite "Batch":
|
||||
randomize(getTime().toUnix())
|
||||
|
||||
setup:
|
||||
var
|
||||
alice: MVDSNode = newMVDSNode(false)
|
||||
bob: MVDSNode = newMVDSNode(false)
|
||||
res: tuple[messages: seq[Message], response: seq[byte]]
|
||||
var
|
||||
groupID: seq[byte] = newSeq[byte](rand(100))
|
||||
body: seq[byte] = newSeq[byte](rand(500))
|
||||
for i in 0 ..< groupID.len:
|
||||
groupID[i] = byte(rand(255))
|
||||
for i in 0 ..< body.len:
|
||||
body[i] = byte(rand(255))
|
||||
var msg: Message = newMessage(groupID, body)
|
||||
|
||||
test "Nothing":
|
||||
check alice.handle(@[]) == res
|
||||
|
||||
test "Single message":
|
||||
alice.offer(msg, 0)
|
||||
res = alice.handle(@[])
|
||||
check:
|
||||
res.messages.len == 0
|
||||
alice.state.messages.len == 1
|
||||
alice.state.messages.hasKey(msg.id)
|
||||
|
||||
alice.state.messages[msg.id] == Record(
|
||||
kind: MessageRecord,
|
||||
count: 1,
|
||||
epoch: 0,
|
||||
message: some(msg)
|
||||
)
|
||||
|
||||
res = bob.handle(res.response)
|
||||
check:
|
||||
res.messages.len == 1
|
||||
res.messages[0] == msg
|
||||
bob.state.messages.len == 0
|
||||
|
||||
check:
|
||||
alice.handle(res.response) == (messages: @[], response: @[])
|
||||
alice.state.messages.len == 0
|
||||
|
||||
test "No ack":
|
||||
alice.offer(msg, 0)
|
||||
var origRes: tuple[messages: seq[Message], response: seq[byte]] = alice.handle(@[])
|
||||
for i in 2 ..< 7:
|
||||
res = alice.handle(@[])
|
||||
check:
|
||||
res == origRes
|
||||
res.messages.len == 0
|
||||
alice.state.messages.len == 1
|
||||
alice.state.messages.hasKey(msg.id)
|
||||
|
||||
alice.state.messages[msg.id] == Record(
|
||||
kind: MessageRecord,
|
||||
count: i,
|
||||
epoch: 0,
|
||||
message: some(msg)
|
||||
)
|
||||
|
||||
test "Epoch manipulation":
|
||||
alice.offer(msg, 0)
|
||||
var origRes: tuple[messages: seq[Message], response: seq[byte]] = alice.handle(@[])
|
||||
check alice.updateEpoch(msg.id, 7)
|
||||
for i in 1 ..< 7:
|
||||
res = alice.handle(@[])
|
||||
check:
|
||||
res.messages.len == 0
|
||||
res.response.len == 0
|
||||
alice.state.messages.len == 1
|
||||
alice.state.messages.hasKey(msg.id)
|
||||
|
||||
alice.state.messages[msg.id] == Record(
|
||||
kind: MessageRecord,
|
||||
count: 1,
|
||||
epoch: 7,
|
||||
message: some(msg)
|
||||
)
|
||||
|
||||
check:
|
||||
alice.handle(@[]) == origRes
|
||||
alice.state.messages.len == 1
|
||||
|
||||
discard alice.handle(bob.handle(origRes.response).response)
|
||||
check:
|
||||
alice.state.messages.len == 0
|
||||
not alice.updateEpoch(msg.id, 9)
|
|
@ -0,0 +1,81 @@
|
|||
import random
|
||||
import times
|
||||
import options
|
||||
import tables
|
||||
import unittest
|
||||
|
||||
import ../mvds
|
||||
import../mvds/State
|
||||
|
||||
suite "Interactive":
|
||||
randomize(getTime().toUnix())
|
||||
|
||||
setup:
|
||||
var
|
||||
alice: MVDSNode = newMVDSNode(true)
|
||||
bob: MVDSNode = newMVDSNode(true)
|
||||
res: tuple[messages: seq[Message], response: seq[byte]]
|
||||
|
||||
test "Nothing":
|
||||
check alice.handle(@[]) == res
|
||||
|
||||
test "Single message":
|
||||
var
|
||||
groupID: seq[byte] = newSeq[byte](rand(100))
|
||||
body: seq[byte] = newSeq[byte](rand(500))
|
||||
for i in 0 ..< groupID.len:
|
||||
groupID[i] = byte(rand(255))
|
||||
for i in 0 ..< body.len:
|
||||
body[i] = byte(rand(255))
|
||||
|
||||
var msg: Message = newMessage(groupID, body)
|
||||
alice.offer(msg, 0)
|
||||
res = alice.handle(@[])
|
||||
check:
|
||||
res.messages.len == 0
|
||||
alice.state.messages.len == 1
|
||||
alice.state.messages.hasKey(msg.id)
|
||||
|
||||
alice.state.messages[msg.id] == Record(
|
||||
kind: OfferRecord,
|
||||
count: 1,
|
||||
epoch: 0,
|
||||
message: some(msg)
|
||||
)
|
||||
|
||||
res = bob.handle(res.response)
|
||||
check:
|
||||
res.messages.len == 0
|
||||
bob.state.messages.len == 1
|
||||
bob.state.messages.hasKey(msg.id)
|
||||
|
||||
bob.state.messages[msg.id] == Record(
|
||||
kind: RequestRecord,
|
||||
count: 1,
|
||||
epoch: 1,
|
||||
message: none(Message)
|
||||
)
|
||||
|
||||
res = alice.handle(res.response)
|
||||
check:
|
||||
res.messages.len == 0
|
||||
alice.state.messages.len == 1
|
||||
alice.state.messages.hasKey(msg.id)
|
||||
|
||||
alice.state.messages[msg.id] == Record(
|
||||
kind: MessageRecord,
|
||||
count: 1,
|
||||
epoch: 2,
|
||||
message: some(msg)
|
||||
)
|
||||
|
||||
res = bob.handle(res.response)
|
||||
check:
|
||||
res.messages.len == 1
|
||||
res.messages[0] == msg
|
||||
bob.state.messages.len == 0
|
||||
|
||||
res = alice.handle(res.response)
|
||||
check:
|
||||
alice.handle(res.response) == (messages: @[], response: @[])
|
||||
alice.state.messages.len == 0
|
Loading…
Reference in New Issue