From 7ae6e6cf2de66f1e445ec8087157bccad9072547 Mon Sep 17 00:00:00 2001 From: gmega Date: Fri, 1 Sep 2023 19:35:31 -0300 Subject: [PATCH] add block knowledge bootstrap --- swarmsim/codex/blockexchange.nim | 70 +----------------- .../codex/blockexchange/blockexchange.nim | 71 +++++++++++++++++++ .../codex/blockexchange/downloadsession.nim | 42 +++++++++++ swarmsim/codex/blockexchange/types.nim | 4 ++ swarmsim/engine/eventdrivenengine.nim | 35 +++++++-- swarmsim/engine/network.nim | 7 +- swarmsim/engine/types.nim | 12 ++-- swarmsim/lib/tables.nim | 31 ++++++++ tests/codex/blockexchange/tblockexchange.nim | 63 ++++++++++++++++ .../codex/blockexchange/tdownloadsession.nim | 37 ++++++++++ tests/codex/tblockexchange.nim | 50 +------------ tests/engine/teventdrivenengine.nim | 19 +++++ tests/helpers/testpeer.nim | 2 +- tests/lib/ttables.nim | 14 ++++ 14 files changed, 328 insertions(+), 129 deletions(-) create mode 100644 swarmsim/codex/blockexchange/blockexchange.nim create mode 100644 swarmsim/codex/blockexchange/downloadsession.nim create mode 100644 swarmsim/codex/blockexchange/types.nim create mode 100644 swarmsim/lib/tables.nim create mode 100644 tests/codex/blockexchange/tblockexchange.nim create mode 100644 tests/codex/blockexchange/tdownloadsession.nim create mode 100644 tests/lib/ttables.nim diff --git a/swarmsim/codex/blockexchange.nim b/swarmsim/codex/blockexchange.nim index 7075565..9418ff1 100644 --- a/swarmsim/codex/blockexchange.nim +++ b/swarmsim/codex/blockexchange.nim @@ -1,68 +1,4 @@ -import std/intsets -import std/tables -import options - -import ../lib/withtypeid - -import ../engine - -type - BlockStore* = ref object of RootObj - store: Table[string, IntSet] - - Manifest* = object of RootObj - cid*: string - nBlocks*: uint - -withTypeId: - type - BlockExchangeProtocol* = ref object of Protocol - store*: BlockStore - - WantHave* = ref object of Message - cid*: string - wants*: IntSet - - Have* = ref object of Message - cid*: string - haves*: IntSet - -proc new*(t: type BlockExchangeProtocol): BlockExchangeProtocol = - BlockExchangeProtocol( - store: BlockStore(store: initTable[string, IntSet]()), - messageTypes: @[WantHave.typeId, Have.typeId] - ) - -proc queryBlocks*(self: BlockStore, cid: string, wants: IntSet): IntSet = - if not self.store.hasKey(cid): - return initIntSet() - - return self.store[cid].intersection(wants) - -proc storeBlocks*(self: BlockStore, cid: string, blocks: seq[int]): void = - if not self.store.hasKey(cid): - self.store[cid] = initIntSet() - - self.store[cid] = self.store[cid].union(toIntSet(blocks)) - -proc newFile*(self: BlockStore, manifest: Manifest) = - self.store[manifest.cid] = initIntSet() - -proc handleWantHave*(self: BlockExchangeProtocol, message: WantHave): Have = - Have( - sender: message.receiver.some, - receiver: message.sender.get(), - cid: message.cid, - haves: self.store.queryBlocks(message.cid, message.wants) - ) - -method deliver*( - self: BlockExchangeProtocol, - message: Message, - engine: EventDrivenEngine, - network: Network -) = - if message of WantHave: - discard network.send(self.handleWantHave(WantHave(message))) - +import blockexchange/downloadsession +import blockexchange/blockexchange +export downloadsession, blockexchange diff --git a/swarmsim/codex/blockexchange/blockexchange.nim b/swarmsim/codex/blockexchange/blockexchange.nim new file mode 100644 index 0000000..ea8fd59 --- /dev/null +++ b/swarmsim/codex/blockexchange/blockexchange.nim @@ -0,0 +1,71 @@ +import std/intsets +import std/tables +import options + +import ../../lib/withtypeid +import ../../engine + +import ./downloadsession +import ./types + +withTypeId: + type + BlockExchangeProtocol* = ref object of Protocol + sessions: Table[string, DownloadSession] + + WantHave* = ref object of Message + cid*: string + haves*: IntSet + request*: bool + +proc newSession*(self: BlockExchangeProtocol, manifest: Manifest): DownloadSession = + self.sessions.mgetOrPut(manifest.cid, DownloadSession(manifest: manifest)) + +proc session*(self: BlockExchangeProtocol, manifest: Manifest): var DownloadSession = + self.sessions[manifest.cid] + +proc handleWantHave*(self: var DownloadSession, message: WantHave, + network: Network): void = + self.storeBlockPeerMapping(message.sender.get().peerId, message.haves) + + if message.request: + discard network.send(WantHave( + sender: message.receiver.some, + receiver: message.sender.get(), + cid: message.cid, + haves: self.blocks, + request: false + )) + +proc handleWantHave*(self: BlockExchangeProtocol, message: WantHave, + network: Network): void = + self.sessions[message.cid].handleWantHave(message, network) + +proc neighborAdded*( + self: BlockExchangeProtocol, + parent: Peer, + neighbor: Peer, + manifest: Manifest, + network: Network +) = + discard network.send( + WantHave( + request: true, + sender: parent.some, + receiver: neighbor, + cid: manifest.cid, + haves: self.newSession(manifest).blocks + ) + ) + +method deliver*( + self: BlockExchangeProtocol, + message: Message, + engine: EventDrivenEngine, + network: Network +): void = + if message of WantHave: + self.handleWantHave(WantHave(message), network) + +proc new*(t: typedesc[BlockExchangeProtocol]): BlockExchangeProtocol = + BlockExchangeProtocol(messageTypes: @[WantHave.typeId]) diff --git a/swarmsim/codex/blockexchange/downloadsession.nim b/swarmsim/codex/blockexchange/downloadsession.nim new file mode 100644 index 0000000..ae7a998 --- /dev/null +++ b/swarmsim/codex/blockexchange/downloadsession.nim @@ -0,0 +1,42 @@ +import std/intsets +import std/tables + +import options +import sugar + +import ./types +import ../../lib/tables + +type BlockStore* = IntSet +type BlockPeerMap* = Table[int, IntSet] +type Block* = int + +type + DownloadSession* = object of RootObj + manifest*: Manifest + blocks*: BlockStore + blockPeerMap*: Table[int, IntSet] + +export types + +proc storeBlock*(self: var DownloadSession, aBlock: Block) = + self.blocks.incl(aBlock) + +proc storeBlocks*(self: var DownloadSession, blocks: seq[Block]) = + for aBlock in blocks: + self.storeBlock(aBlock) + +proc queryKnownBlocks*(self: DownloadSession, blocks: Option[IntSet]): IntSet = + blocks.map(query => self.blocks.intersection(query)).get(self.blocks) + +proc queryKnownBlocks*(self: DownloadSession, blocks: Option[seq[Block]] = + none(seq[Block])): IntSet = + self.queryKnownBlocks(blocks.map(arr => toIntSet(arr))) + +proc storeBlockPeerMapping*(self: var DownloadSession, peerId: int, + blocks: IntSet) = + for aBlock in blocks: + self.blockPeerMap.getDefault(aBlock, peers): peers[].incl(peerId) + +proc peersForBlock*(self: DownloadSession, aBlock: Block): IntSet = + self.blockPeerMap.getOrDefault(aBlock, IntSet()) diff --git a/swarmsim/codex/blockexchange/types.nim b/swarmsim/codex/blockexchange/types.nim new file mode 100644 index 0000000..088ea24 --- /dev/null +++ b/swarmsim/codex/blockexchange/types.nim @@ -0,0 +1,4 @@ +type + Manifest* = object of RootObj + cid*: string + nBlocks*: uint diff --git a/swarmsim/engine/eventdrivenengine.nim b/swarmsim/engine/eventdrivenengine.nim index 861980a..62538c3 100644 --- a/swarmsim/engine/eventdrivenengine.nim +++ b/swarmsim/engine/eventdrivenengine.nim @@ -1,6 +1,7 @@ import std/options import std/strformat import std/times +import sugar import ./types import ./schedulableevent @@ -14,9 +15,12 @@ type schedulable*: SchedulableEvent engine: EventDrivenEngine + Predicate* = (EventDrivenEngine, SchedulableEvent) -> bool + proc currentTime*(self: EventDrivenEngine): uint64 {.inline.} = self.currentTime proc schedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): void = + ## Schedules a `SchedulableEvent` for execution. if schedulable.time < self.currentTime: raise (ref Defect)( msg: "Cannot schedule an event in the past " & @@ -30,15 +34,16 @@ proc awaitableSchedule*(self: EventDrivenEngine, proc scheduleAll*[T: SchedulableEvent](self: EventDrivenEngine, schedulables: seq[T]): void = - for schedulable in schedulables: - self.schedule(schedulable) + schedulables.apply((s: T) => self.schedule(s)) proc stepUntil(self: EventDrivenEngine, - until: Option[uint64] = none(uint64)): Option[SchedulableEvent] = + timeout: Option[uint64] = none(uint64)): Option[SchedulableEvent] = while len(self.queue) > 0: - if until.isSome and self.queue[0].time > until.get: - self.currentTime = until.get + # This allows us to halt execution even when in-between events if + # a time predicate is satistifed. + if timeout.isSome and self.queue[0].time > timeout.get: + self.currentTime = timeout.get return none(SchedulableEvent) let schedulable = self.queue.pop() @@ -53,12 +58,28 @@ proc stepUntil(self: EventDrivenEngine, return none(SchedulableEvent) proc nextStep*(self: EventDrivenEngine): Option[SchedulableEvent] = + ## Runs the engine until the next event, returning none(SchedulableEvent) + ## if no there are no events left. self.stepUntil() -proc runUntil*(self: EventDrivenEngine, until: uint64): void = - while self.stepUntil(until.some).isSome and self.currentTime <= until: +proc runUntil*(self: EventDrivenEngine, timeout: uint64): void = + ## Runs the engine until the specified simulation time. Can be used to + ## implement awaits with timeouts, and for testing. + while self.stepUntil(timeout.some).isSome and self.currentTime <= timeout: discard +proc runUntil*(self: EventDrivenEngine, predicate: Predicate, + timeout: Option[uint64] = none(uint64)): bool = + ## Runs the engine until a `Predicate` is true, or a specified time is + ## reached -- whichever happens first. + while true: + let schedulable = self.stepUntil(timeout) + if schedulable.isNone: + return false + + if predicate(self, schedulable.get): + return true + proc runUntil*(self: EventDrivenEngine, until: Duration): void = self.runUntil(uint64(until.inSeconds())) diff --git a/swarmsim/engine/network.nim b/swarmsim/engine/network.nim index d957914..265e045 100644 --- a/swarmsim/engine/network.nim +++ b/swarmsim/engine/network.nim @@ -27,8 +27,11 @@ proc new*( defaultLinkDelay: defaultLinkDelay ) -proc send*(self: Network, message: Message, - linkDelay: Option[uint64] = none(uint64)): ScheduledEvent = +proc send*( + self: Network, + message: Message, + linkDelay: Option[uint64] = none(uint64) +): ScheduledEvent = let delay = linkDelay.get(self.defaultLinkDelay) diff --git a/swarmsim/engine/types.nim b/swarmsim/engine/types.nim index 630366e..4e79998 100644 --- a/swarmsim/engine/types.nim +++ b/swarmsim/engine/types.nim @@ -1,6 +1,5 @@ import std/heapqueue import std/tables -import std/sets import std/options import std/random @@ -36,15 +35,20 @@ type ## the same `Peer`. messageTypes*: seq[string] -type LifecycleEventType* = enum started up down Peer* = ref object of RootObj + ## A `Peer` in our `Network` which runs `Protocols`. Together with other + ## `Peer`s, forms a P2P network. peerId*: int up*: bool = false + + # FIXME these are expensive data structures to have per-peer, and can + # significantly affect memory scalability. If this turns out to be the + # memory bottleneck, we can fliweight those by using fixed peer types. protocols*: Table[string, Protocol] dispatch*: MultiTable[string, Protocol] @@ -57,10 +61,8 @@ type type Network* = ref object of RootObj - ## A `Network` is a collection of `Peer`s that can communicate with each - ## other. + ## A `Network` allows `Peer`s to send `Message`s to one another. ## engine*: EventDrivenEngine defaultLinkDelay*: uint64 - peers*: HashSet[Peer] # TODO: use an array diff --git a/swarmsim/lib/tables.nim b/swarmsim/lib/tables.nim new file mode 100644 index 0000000..2215272 --- /dev/null +++ b/swarmsim/lib/tables.nim @@ -0,0 +1,31 @@ +import std/tables + +export tables + +template getDefault*[K, V](self: var Table[K, V], key: K, alias, + body: untyped): void = + ## An optimized template for getting a value from a table with a default fallback + ## that gets inserted in the table as the key is accessed. This is essentially + ## syntactic sugar on top of Table.withValue. + ## + runnableExamples: + var table: Table[string, int] = initTable[string, int]() + + table.getDefault("hello", c): + # like withValue, c is a pointer. + c[] += 1 + + table.getDefault("hello", c): + # like withValue, c is a pointer. + c[] += 1 + + echo table["hello"] # 2 + + self.withValue(key, v): + var alias {.inject.} = v + body + do: + var newVal = V.default() + var alias {.inject.} = addr newVal + body + self[key] = alias[] diff --git a/tests/codex/blockexchange/tblockexchange.nim b/tests/codex/blockexchange/tblockexchange.nim new file mode 100644 index 0000000..175fd39 --- /dev/null +++ b/tests/codex/blockexchange/tblockexchange.nim @@ -0,0 +1,63 @@ +import unittest +import sugar + +import std/intsets +import std/sequtils + +import swarmsim/engine +import swarmsim/codex/blockexchange + +import ../../helpers/testpeer + +proc newBexPeer(manifest: Manifest, peerId: int, has: seq[int] = @[]): Peer = + var bex = BlockExchangeProtocol.new() + + discard bex.newSession(manifest) + bex.session(manifest).storeBlocks(blocks = has) + + Peer.new( + peerId = peerId.some, + protocols = @[Protocol bex] + ) + +suite "block exchange": + + setup: + let engine = EventDrivenEngine() + let network = Network(engine: engine) + + test "should bootstrap block knowledge from newly added neighbors": + + let manifest = Manifest(cid: "QmHash", nBlocks: 10) + + let swarm = @[ + newBexPeer(manifest, 1, has = @[0, 1, 3, 5]), + newBexPeer(manifest, 2, has = @[0, 2, 4, 6]), + newBexPeer(manifest, 3, has = @[0, 1, 3, 5]), + newBexPeer(manifest, 4, has = @[7, 8, 9]), + ] + + let newcomer = newBexPeer(manifest, 5) + + var bex = BlockExchangeProtocol( + newcomer.getProtocol(BlockExchangeProtocol.typeId).get) + + swarm.apply((neighbor: Peer) => + bex.neighborAdded(newcomer, neighbor, manifest, network)) + + check(engine.runUntil( + proc (engine: EventDrivenEngine, schedulable: SchedulableEvent): bool = + len(bex.session(manifest).blockPeerMap) == 10)) + + let blockPeerMap = bex.session(manifest).blockPeerMap + + check(blockPeerMap[0] == toIntSet([1, 2, 3])) + check(blockPeerMap[1] == toIntSet([1, 3])) + check(blockPeerMap[2] == toIntSet([2])) + check(blockPeerMap[3] == toIntSet([1, 3])) + check(blockPeerMap[4] == toIntSet([2])) + check(blockPeerMap[5] == toIntSet([1, 3])) + check(blockPeerMap[6] == toIntSet([2])) + check(blockPeerMap[7] == toIntSet([4])) + check(blockPeerMap[8] == toIntSet([4])) + check(blockPeerMap[9] == toIntSet([4])) diff --git a/tests/codex/blockexchange/tdownloadsession.nim b/tests/codex/blockexchange/tdownloadsession.nim new file mode 100644 index 0000000..d06606f --- /dev/null +++ b/tests/codex/blockexchange/tdownloadsession.nim @@ -0,0 +1,37 @@ +import unittest + +import std/intsets +import options + +import swarmsim/codex/blockexchange/downloadsession + +suite "dowload session": + setup: + let manifest = Manifest(cid: "QmHash", nBlocks: 10) + var session = DownloadSession(manifest: manifest) + + test "should allow known stored blocks to be queried": + session.storeBlocks(@[0, 1, 5, 8]) + check(session.queryKnownBlocks() == toIntSet(@[0, 1, 5, 8])) + + test "should return queries on overlapping known blocks": + session.storeBlocks(@[0, 1, 2, 3, 4, 5]) + check(session.queryKnownBlocks(@[0, 3, 8].some) == toIntSet(@[0, 3])) + check(session.queryKnownBlocks(toIntSet(@[0, 3, 8]).some) == + toIntSet(@[0, 3])) + + test "should allow querying for peers that know about a block": + session.storeBlockPeerMapping(peerId = 1, toIntSet(@[0, 1, 3, 4, 9])) + session.storeBlockPeerMapping(peerId = 2, toIntSet(@[0, 1, 3, 5, 8])) + session.storeBlockPeerMapping(peerId = 3, toIntSet(@[0, 7, 8])) + + check(session.peersForBlock(0) == toIntSet(@[1, 2, 3])) + check(session.peersForBlock(1) == toIntSet(@[1, 2])) + check(session.peersForBlock(2) == toIntSet(@[])) + check(session.peersForBlock(3) == toIntSet(@[1, 2])) + check(session.peersForBlock(4) == toIntSet(@[1])) + check(session.peersForBlock(5) == toIntSet(@[2])) + check(session.peersForBlock(7) == toIntSet(@[3])) + check(session.peersForBlock(8) == toIntSet(@[2, 3])) + check(session.peersForBlock(9) == toIntSet(@[1])) + diff --git a/tests/codex/tblockexchange.nim b/tests/codex/tblockexchange.nim index 8b8fa0f..c6f55e4 100644 --- a/tests/codex/tblockexchange.nim +++ b/tests/codex/tblockexchange.nim @@ -1,48 +1,4 @@ -import unittest - -import std/intsets - -import swarmsim/engine -import swarmsim/codex/blockexchange - -import ../helpers/testpeer - -suite "block exchange": - - setup: - let engine = EventDrivenEngine() - let network = Network(engine: engine) - - test "should respond to want-have message with a list of the blocks it has": - let sender = TestPeer.new(network) - - let bex = BlockExchangeProtocol.new() - - let file = Manifest(cid: "QmHash", nBlocks: 4) - bex.store.newFile(file) - bex.store.storeBlocks(cid = "QmHash", blocks = @[0, 1, 2, 4]) - - let peer = Peer.new( - peerId = 1.some, - protocols = @[Protocol bex] - ) - - let message = WantHave( - sender: (Peer sender).some, - receiver: peer, - cid: file.cid, - wants: toIntSet([0, 1, 3, 4]) - ) - - discard sender.send(message) - - engine.run() - - check(len(sender.inbox.messages) == 1) - - let response = Have(sender.inbox.messages[0]) - - check(response.haves == toIntSet([0, 1, 4])) - - +import ./blockexchange/tblockexchange +import ./blockexchange/tdownloadsession +{.warning[UnusedImport]: off.} diff --git a/tests/engine/teventdrivenengine.nim b/tests/engine/teventdrivenengine.nim index 15d390c..1709b02 100644 --- a/tests/engine/teventdrivenengine.nim +++ b/tests/engine/teventdrivenengine.nim @@ -107,3 +107,22 @@ suite "event driven engine tests": check(engine.currentTime == 8) check(handles.allIt(it.schedulable.completed) == true) + + test "should allow clients to run until a predicate is satistified": + let times = @[50'u64, 100, 150, 200] + + let engine = EventDrivenEngine() + + times.apply((time: uint64) => engine.schedule(TestSchedulable(time: time))) + + const stopIndex = 3 + var index = 0 + + check(engine.runUntil( + proc (engine: EventDrivenEngine, schedulable: SchedulableEvent): bool = + index += 1 + index == stopIndex + )) + + check(index == stopIndex) + check(engine.currentTime == times[stopIndex - 1]) diff --git a/tests/helpers/testpeer.nim b/tests/helpers/testpeer.nim index a126fec..05a3ede 100644 --- a/tests/helpers/testpeer.nim +++ b/tests/helpers/testpeer.nim @@ -25,6 +25,6 @@ proc new*( proc inbox*(peer: TestPeer): Inbox = Inbox peer.getProtocol(Inbox.typeId).get() -proc send*(self: TestPeer, msg: Message): ScheduledEvent = +proc send*(self: TestPeer, msg: var Message): ScheduledEvent = msg.sender = Peer(self).some self.network.send(msg) diff --git a/tests/lib/ttables.nim b/tests/lib/ttables.nim new file mode 100644 index 0000000..1c9d78f --- /dev/null +++ b/tests/lib/ttables.nim @@ -0,0 +1,14 @@ +import unittest + +import swarmsim/lib/tables + +suite "tables": + test "should create a default value and allow modification": + var table: Table[string, int] + + table.getDefault("hello", c): c[] += 1 + + table.getDefault("hello", c): c[] += 1 + + check(table["hello"] == 2) +