mirror of
https://github.com/logos-storage/swarmsim.git
synced 2026-01-08 00:33:06 +00:00
add multiple message support and message naming macros
This commit is contained in:
parent
cc2f854bb1
commit
de6bf48b57
@ -5,6 +5,7 @@ import std/tables
|
||||
import std/sequtils
|
||||
|
||||
import ../engine
|
||||
import ../engine/message
|
||||
|
||||
export protocol
|
||||
export options
|
||||
@ -24,21 +25,22 @@ type
|
||||
peers: OrderedTable[int, PeerDescriptor]
|
||||
shuffler: ArrayShuffler
|
||||
|
||||
PeerAnnouncement* = ref object of Message
|
||||
peerId*: int
|
||||
|
||||
SampleSwarm* = ref object of Message
|
||||
numPeers: uint
|
||||
|
||||
ExpirationTimer* = ref object of SchedulableEvent
|
||||
peerId*: int
|
||||
tracker: DHTTracker
|
||||
|
||||
typedMessage:
|
||||
type
|
||||
PeerAnnouncement* = ref object of Message
|
||||
peerId*: int
|
||||
|
||||
SampleSwarm* = ref object of Message
|
||||
numPeers: uint
|
||||
|
||||
let RandomShuffler = proc (arr: var seq[PeerDescriptor]) =
|
||||
discard arr.nextPermutation()
|
||||
|
||||
proc protocolId*(T: type DHTTracker): string = "DHTTracker"
|
||||
|
||||
proc defaultExpiry*(T: type DHTTracker): Duration = 15.dminutes
|
||||
|
||||
proc new*(
|
||||
@ -49,11 +51,12 @@ proc new*(
|
||||
): DHTTracker =
|
||||
DHTTracker(
|
||||
# This should in general be safe as those are always positive.
|
||||
id: "DHTTracker",
|
||||
peerExpiration: peerExpiration,
|
||||
maxPeers: maxPeers,
|
||||
shuffler: shuffler,
|
||||
peers: initOrderedTable[int, PeerDescriptor](),
|
||||
protocolId: DHTTracker.protocolId
|
||||
messageTypes: @[PeerAnnouncement.messageType, SampleSwarm.messageType]
|
||||
)
|
||||
|
||||
proc peers*(self: DHTTracker): seq[PeerDescriptor] = self.peers.values.toSeq()
|
||||
|
||||
52
swarmsim/engine/message.nim
Normal file
52
swarmsim/engine/message.nim
Normal file
@ -0,0 +1,52 @@
|
||||
import options
|
||||
import macros
|
||||
|
||||
import ./types
|
||||
|
||||
method `messageType`*(self: Message): string {.base.} =
|
||||
raise newException(CatchableError, "Method without implementation override")
|
||||
|
||||
method `messageType`*(self: FreelyTypedMessage): string = self.messageType
|
||||
|
||||
func typeName(typeDef: NimNode): Option[NimNode] =
|
||||
expectKind typeDef, nnkTypeDef
|
||||
|
||||
return if typeDef[0].kind == nnkIdent:
|
||||
typeDef[0].some
|
||||
elif typeDef[0].kind == nnkPostfix:
|
||||
typeDef[0][1].some
|
||||
else:
|
||||
none(NimNode)
|
||||
|
||||
macro typedMessage*(body: untyped): untyped =
|
||||
expectKind body, nnkStmtList
|
||||
expectKind body[0], nnkTypeSection
|
||||
|
||||
for statement in body[0]:
|
||||
if statement.kind != nnkTypeDef:
|
||||
continue
|
||||
|
||||
let maybeTypename = typeName(statement)
|
||||
if maybeTypename.isNone:
|
||||
error("unable to get type name from AST. Sorry.")
|
||||
|
||||
let typeIdent = maybeTypename.get
|
||||
let typeName = newLit(typeIdent.strVal)
|
||||
|
||||
let typeProc = quote do:
|
||||
proc messageType*(self: type `typeIdent`): string = `typeName`
|
||||
|
||||
let instanceProc = quote do:
|
||||
method messageType*(self: `typeIdent`): string = `typeIdent`.messageType
|
||||
|
||||
# We replace the proc name with a quoted symbol so it turns into a
|
||||
# getter.
|
||||
typeProc[0][1] = newTree(nnkAccQuoted, typeProc[0][1])
|
||||
instanceProc[0][1] = newTree(nnkAccQuoted, instanceProc[0][1])
|
||||
|
||||
body.add(typeProc)
|
||||
body.add(instanceProc)
|
||||
|
||||
return body
|
||||
|
||||
export Message, FreelyTypedMessage
|
||||
@ -1,33 +1,42 @@
|
||||
import std/tables
|
||||
import std/options
|
||||
import std/random
|
||||
import std/hashes
|
||||
import sequtils
|
||||
|
||||
import ./types
|
||||
import ./message
|
||||
import ./protocol
|
||||
import ./eventdrivenengine
|
||||
import ../lib/multitable
|
||||
|
||||
export options
|
||||
export tables
|
||||
export multitable
|
||||
export protocol
|
||||
export eventdrivenengine
|
||||
export Peer
|
||||
|
||||
proc getProtocol*(self: Peer, protocolId: string): Option[Protocol] =
|
||||
if self.protocols.hasKey(protocolId):
|
||||
return self.protocols[protocolId].some
|
||||
proc getProtocol*(self: Peer, id: string): Option[Protocol] =
|
||||
if self.protocols.hasKey(id):
|
||||
return self.protocols[id].some
|
||||
|
||||
none(Protocol)
|
||||
|
||||
proc addProtocol*(self: Peer, protocol: Protocol): void =
|
||||
self.protocols[protocol.id] = protocol
|
||||
|
||||
proc deliver*(self: Peer, message: Message, engine: EventDrivenEngine,
|
||||
network: Network): void =
|
||||
self.getProtocol(message.protocolId).map(
|
||||
self.dispatch.getOrDefault(message.messageType, @[]).apply(
|
||||
proc (p: Protocol): void = p.deliver(message, engine, network))
|
||||
|
||||
proc initPeer(self: Peer, protocols: seq[Protocol]): Peer =
|
||||
# XXX integer indexes or an enum would be better, but this is easier
|
||||
for protocol in protocols:
|
||||
self.protocols[protocol.protocolId] = protocol
|
||||
let protocol = protocol # https://github.com/nim-lang/Nim/issues/16740
|
||||
|
||||
self.protocols[protocol.id] = protocol
|
||||
protocol.messageTypes.apply(proc (m: string): void =
|
||||
self.dispatch.add(m, protocol))
|
||||
|
||||
self
|
||||
|
||||
@ -41,4 +50,8 @@ proc new*(
|
||||
# XXX I can't have put this in the init proc as that would mean allowing public
|
||||
# write access to every field in Peer. Not sure how to solve this in nim.
|
||||
let peerId = peerId.get(rand(high(int)))
|
||||
initPeer(Peer(protocols: initTable[string, Protocol](), peerId: peerId), protocols)
|
||||
initPeer(Peer(
|
||||
protocols: initTable[string, Protocol](),
|
||||
peerId: peerId,
|
||||
dispatch: MultiTable[string, Protocol].new()
|
||||
), protocols)
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
import typetraits
|
||||
|
||||
import ./types
|
||||
import ./eventdrivenengine
|
||||
|
||||
@ -15,5 +17,6 @@ method uncheckedDeliver(
|
||||
|
||||
proc deliver*(self: Protocol, message: Message, engine: EventDrivenEngine,
|
||||
network: Network): void =
|
||||
assert(self.protocolId == message.protocolId)
|
||||
self.uncheckedDeliver(message, engine, network)
|
||||
|
||||
proc protocolName*[T: Protocol](self: type T): string = name(T)
|
||||
|
||||
@ -4,6 +4,8 @@ import std/sets
|
||||
import std/options
|
||||
import std/random
|
||||
|
||||
import ../lib/multitable
|
||||
|
||||
export heapqueue
|
||||
export option
|
||||
export random
|
||||
@ -30,21 +32,27 @@ type
|
||||
## A `Protocol` defines a P2P protocol. It handles messages meant for it,
|
||||
## keeps internal state, and may expose services to other `Protocol`s within
|
||||
## the same `Peer`.
|
||||
protocolId*: string
|
||||
id*: string
|
||||
messageTypes*: seq[string]
|
||||
|
||||
type
|
||||
Peer* = ref object of RootObj
|
||||
peerId*: int
|
||||
protocols*: Table[string, Protocol]
|
||||
dispatch*: MultiTable[string, Protocol]
|
||||
|
||||
type
|
||||
Message* = ref object of RootObj
|
||||
## A `Message` is a piece of data that is sent over the network. Its meaning
|
||||
## is typically protocol-specific.
|
||||
protocolId*: string
|
||||
sender*: Option[Peer] = none(Peer)
|
||||
receiver*: Peer
|
||||
|
||||
FreelyTypedMessage* = ref object of Message
|
||||
## A `FreelyTypedMessage` is a `Message` that can be of any type.
|
||||
##
|
||||
messageType*: string
|
||||
|
||||
type
|
||||
Network* = ref object of RootObj
|
||||
## A `Network` is a collection of `Peer`s that can communicate with each
|
||||
|
||||
28
swarmsim/lib/multitable.nim
Normal file
28
swarmsim/lib/multitable.nim
Normal file
@ -0,0 +1,28 @@
|
||||
import std/tables
|
||||
|
||||
type MultiTable[K, V] = Table[K, seq[V]]
|
||||
|
||||
proc new*[K, V](T: type MultiTable[K, V]): MultiTable[K, V] =
|
||||
initTable[K, seq[V]]()
|
||||
|
||||
proc add*[K, V](self: var MultiTable[K, V], key: K, value: V) =
|
||||
discard self.hasKeyOrPut(key, @[])
|
||||
self[key].add(value)
|
||||
|
||||
proc remove*[K, V](self: var MultiTable[K, V], key: K, value: V) =
|
||||
if not self.hasKey(key):
|
||||
return
|
||||
|
||||
var values = self[key]
|
||||
let index = values.find(value)
|
||||
if index <= 0:
|
||||
return
|
||||
|
||||
values.delete(index)
|
||||
if values.len == 0:
|
||||
self.del(key)
|
||||
else:
|
||||
self[key] = values
|
||||
|
||||
export MultiTable
|
||||
export tables
|
||||
@ -2,7 +2,9 @@ import engine/teventdrivenengine
|
||||
import engine/tschedulableevent
|
||||
import engine/tnetwork
|
||||
import engine/tpeer
|
||||
import engine/tmessage
|
||||
import codex/tdhttracker
|
||||
import lib/tmultitable
|
||||
|
||||
{.warning[UnusedImport]: off.}
|
||||
|
||||
|
||||
@ -13,7 +13,7 @@ import pkg/swarmsim/timeutils
|
||||
|
||||
proc getPeerArray(tracker: Peer): seq[PeerDescriptor] =
|
||||
DHTTracker(
|
||||
tracker.getProtocol(DHTTracker.protocolId).get()).peers
|
||||
tracker.getProtocol(DHTTracker.protocolName).get()).peers
|
||||
|
||||
proc getPeerIdArray(tracker: Peer): seq[int] =
|
||||
getPeerArray(tracker).map(p => p.peerId)
|
||||
@ -21,8 +21,7 @@ proc getPeerIdArray(tracker: Peer): seq[int] =
|
||||
proc announcePeer(network: Network, tracker: Peer, peerId: int,
|
||||
delay: uint64 = 0) =
|
||||
network.send(
|
||||
PeerAnnouncement(receiver: tracker,
|
||||
protocolId: DHTTracker.protocolId, peerId: peerId),
|
||||
PeerAnnouncement(receiver: tracker, peerId: peerId),
|
||||
delay.some).doAwait()
|
||||
|
||||
suite "tracker node":
|
||||
|
||||
20
tests/engine/tmessage.nim
Normal file
20
tests/engine/tmessage.nim
Normal file
@ -0,0 +1,20 @@
|
||||
import unittest
|
||||
|
||||
import swarmsim/engine/message
|
||||
|
||||
typedMessage:
|
||||
type
|
||||
PeerAnnouncement* = object of Message
|
||||
peerId*: int
|
||||
|
||||
PrivateMessage = object of Message
|
||||
|
||||
suite "message":
|
||||
test "should automatically generate a type string for typedMessage types":
|
||||
check(PeerAnnouncement.messageType == "PeerAnnouncement")
|
||||
check(PrivateMessage.messageType == "PrivateMessage")
|
||||
|
||||
test "should automatically generate a type string for typedMessage instances":
|
||||
check(PeerAnnouncement(peerId: 1).messageType == "PeerAnnouncement")
|
||||
check(PrivateMessage().messageType == "PrivateMessage")
|
||||
|
||||
@ -1,53 +1,46 @@
|
||||
import unittest
|
||||
|
||||
import swarmsim/engine/message
|
||||
import swarmsim/engine/eventdrivenengine
|
||||
import swarmsim/engine/network
|
||||
import swarmsim/engine/peer
|
||||
import swarmsim/engine/protocol
|
||||
|
||||
type
|
||||
FakeProtocol = ref object of Protocol
|
||||
received: bool
|
||||
|
||||
method uncheckedDeliver(self: FakeProtocol, message: Message,
|
||||
engine: EventDrivenEngine, network: Network) =
|
||||
self.received = true
|
||||
|
||||
proc getFakeProtocol(peer: Peer, protocolId: string): FakeProtocol =
|
||||
let protocol = peer.getProtocol(protocolId)
|
||||
check(protocol.isSome)
|
||||
return FakeProtocol(protocol.get())
|
||||
import ../helpers/inbox
|
||||
|
||||
suite "network":
|
||||
test "should dispatch message to the correct protocol within a peer":
|
||||
test "should dispatch message to the correct peer":
|
||||
|
||||
let engine = EventDrivenEngine()
|
||||
|
||||
let peer = Peer.new(
|
||||
protocols = @[
|
||||
Protocol FakeProtocol(messageType: "protocol1", received: false),
|
||||
FakeProtocol(messageType: "protocol2", received: false)
|
||||
]
|
||||
)
|
||||
let i1 = Inbox(id: "inbox", messageTypes: @["m"])
|
||||
let i2 = Inbox(id: "inbox", messageTypes: @["m"])
|
||||
|
||||
let p1 = Peer.new(protocols = @[Protocol i1])
|
||||
let p2 = Peer.new(protocols = @[Protocol i2])
|
||||
|
||||
let network = Network.new(engine = engine, defaultLinkDelay = 20)
|
||||
|
||||
network.add(peer)
|
||||
network.add(p1)
|
||||
network.add(p2)
|
||||
|
||||
let m1 = Message(receiver: peer, messageType: "protocol1")
|
||||
let m2 = Message(receiver: peer, messageType: "protocol2")
|
||||
let m1: Message = FreelyTypedMessage(receiver: p1, messageType: "m")
|
||||
let m2: Message = FreelyTypedMessage(receiver: p2, messageType: "m")
|
||||
|
||||
let message2handle = network.send(m2, linkDelay = uint64(10).some)
|
||||
let message1handle = network.send(m1, linkDelay = uint64(5).some)
|
||||
|
||||
check(not peer.getFakeProtocol("protocol1").received)
|
||||
check(not peer.getFakeProtocol("protocol2").received)
|
||||
let noMessages: seq[Message] = @[]
|
||||
|
||||
check(i1.messages == noMessages)
|
||||
check(i2.messages == noMessages)
|
||||
|
||||
message1Handle.doAwait()
|
||||
|
||||
check(peer.getFakeProtocol("protocol1").received)
|
||||
check(not peer.getFakeProtocol("protocol2").received)
|
||||
check(i1.messages == @[m1])
|
||||
check(i2.messages == noMessages)
|
||||
|
||||
message2Handle.doAwait()
|
||||
|
||||
check(peer.getFakeProtocol("protocol1").received)
|
||||
check(peer.getFakeProtocol("protocol2").received)
|
||||
|
||||
check(i1.messages == @[m1])
|
||||
check(i2.messages == @[m2])
|
||||
|
||||
@ -1,9 +1,20 @@
|
||||
import std/unittest
|
||||
import unittest
|
||||
import std/sets
|
||||
|
||||
import swarmsim/engine/eventdrivenengine
|
||||
import swarmsim/engine/network
|
||||
import swarmsim/engine/peer
|
||||
|
||||
import ../helpers/inbox
|
||||
|
||||
# We need this here as otherwise for some reason the nim compiler trips.
|
||||
proc `$`*(m: Message): string = repr m
|
||||
|
||||
suite "peer":
|
||||
setup:
|
||||
let engine = EventDrivenEngine()
|
||||
let network = Network.new(engine = engine, defaultLinkDelay = 20)
|
||||
|
||||
test "should allow inclusion and membership tests on a HashSet":
|
||||
var peerSet = HashSet[Peer]()
|
||||
|
||||
@ -18,3 +29,51 @@ suite "peer":
|
||||
peerSet.excl(p1)
|
||||
|
||||
check(not peerSet.contains(p1))
|
||||
|
||||
test "should dispatch message to correct protocol":
|
||||
let i1 = Inbox(id: "protocol1", messageTypes: @["m1"])
|
||||
let i2 = Inbox(id: "protocol2", messageTypes: @["m2"])
|
||||
|
||||
let peer = Peer.new(protocols = @[Protocol i1, i2])
|
||||
|
||||
let m1: Message = FreelyTypedMessage(receiver: peer, messageType: "m1")
|
||||
let m2: Message = FreelyTypedMessage(receiver: peer, messageType: "m2")
|
||||
|
||||
peer.deliver(m1, engine, network)
|
||||
|
||||
check(i1.messages == @[m1])
|
||||
check(i2.messages == seq[Message] @[])
|
||||
|
||||
peer.deliver(m2, engine, network)
|
||||
|
||||
check(i1.messages == @[m1])
|
||||
check(i2.messages == @[m2])
|
||||
|
||||
test "should dispatch a message to multiple protocols if they are listening on the same message type":
|
||||
let i1 = Inbox(id: "protocol1", messageTypes: @["m1"])
|
||||
let i2 = Inbox(id: "protocol2", messageTypes: @["m1"])
|
||||
|
||||
let peer = Peer.new(protocols = @[Protocol i1, i2])
|
||||
|
||||
let m1: Message = FreelyTypedMessage(receiver: peer, messageType: "m1")
|
||||
|
||||
peer.deliver(m1, engine, network)
|
||||
|
||||
check(i1.messages == @[m1])
|
||||
check(i2.messages == @[m1])
|
||||
|
||||
test "should allow protocol to listen on multiple message types":
|
||||
let i1 = Inbox(id: "protocol1", messageTypes: @["m1", "m2"])
|
||||
|
||||
let peer = Peer.new(protocols = @[Protocol i1])
|
||||
|
||||
let m1: Message = FreelyTypedMessage(receiver: peer, messageType: "m1")
|
||||
let m2: Message = FreelyTypedMessage(receiver: peer, messageType: "m2")
|
||||
let m3: Message = FreelyTypedMessage(receiver: peer, messageType: "m3")
|
||||
|
||||
peer.deliver(m1, engine, network)
|
||||
peer.deliver(m2, engine, network)
|
||||
peer.deliver(m3, engine, network)
|
||||
|
||||
check(i1.messages == @[m1, m2])
|
||||
|
||||
|
||||
21
tests/helpers/inbox.nim
Normal file
21
tests/helpers/inbox.nim
Normal file
@ -0,0 +1,21 @@
|
||||
import swarmsim/engine/types
|
||||
import swarmsim/engine/peer
|
||||
import swarmsim/engine/protocol
|
||||
import swarmsim/engine/network
|
||||
|
||||
type
|
||||
Inbox* = ref object of Protocol
|
||||
messages*: seq[Message]
|
||||
|
||||
method uncheckedDeliver*(
|
||||
self: Inbox,
|
||||
message: Message,
|
||||
engine: EventDrivenEngine,
|
||||
network: Network
|
||||
) =
|
||||
self.messages.add(message)
|
||||
|
||||
export Message
|
||||
export peer
|
||||
export protocol
|
||||
export network
|
||||
24
tests/lib/tmultitable.nim
Normal file
24
tests/lib/tmultitable.nim
Normal file
@ -0,0 +1,24 @@
|
||||
import std/unittest
|
||||
|
||||
import pkg/swarmsim/lib/multitable
|
||||
|
||||
suite "multitable":
|
||||
test "should allow adding multiple values per key":
|
||||
var table = MultiTable[string, int].new()
|
||||
|
||||
table.add("key", 1)
|
||||
table.add("key", 2)
|
||||
table.add("key", 3)
|
||||
|
||||
check(table["key"] == [1, 2, 3])
|
||||
|
||||
test "should allow removal of values bound to a key":
|
||||
var table = MultiTable[string, int].new()
|
||||
|
||||
table.add("key", 1)
|
||||
table.add("key", 2)
|
||||
table.add("key", 3)
|
||||
|
||||
table.remove("key", 2)
|
||||
|
||||
check(table["key"] == [1, 3])
|
||||
Loading…
x
Reference in New Issue
Block a user