add block knowledge bootstrap

This commit is contained in:
gmega 2023-09-01 19:35:31 -03:00
parent 44541444e4
commit 7ae6e6cf2d
14 changed files with 328 additions and 129 deletions

View File

@ -1,68 +1,4 @@
import std/intsets
import std/tables
import options
import ../lib/withtypeid
import ../engine
type
BlockStore* = ref object of RootObj
store: Table[string, IntSet]
Manifest* = object of RootObj
cid*: string
nBlocks*: uint
withTypeId:
type
BlockExchangeProtocol* = ref object of Protocol
store*: BlockStore
WantHave* = ref object of Message
cid*: string
wants*: IntSet
Have* = ref object of Message
cid*: string
haves*: IntSet
proc new*(t: type BlockExchangeProtocol): BlockExchangeProtocol =
BlockExchangeProtocol(
store: BlockStore(store: initTable[string, IntSet]()),
messageTypes: @[WantHave.typeId, Have.typeId]
)
proc queryBlocks*(self: BlockStore, cid: string, wants: IntSet): IntSet =
if not self.store.hasKey(cid):
return initIntSet()
return self.store[cid].intersection(wants)
proc storeBlocks*(self: BlockStore, cid: string, blocks: seq[int]): void =
if not self.store.hasKey(cid):
self.store[cid] = initIntSet()
self.store[cid] = self.store[cid].union(toIntSet(blocks))
proc newFile*(self: BlockStore, manifest: Manifest) =
self.store[manifest.cid] = initIntSet()
proc handleWantHave*(self: BlockExchangeProtocol, message: WantHave): Have =
Have(
sender: message.receiver.some,
receiver: message.sender.get(),
cid: message.cid,
haves: self.store.queryBlocks(message.cid, message.wants)
)
method deliver*(
self: BlockExchangeProtocol,
message: Message,
engine: EventDrivenEngine,
network: Network
) =
if message of WantHave:
discard network.send(self.handleWantHave(WantHave(message)))
import blockexchange/downloadsession
import blockexchange/blockexchange
export downloadsession, blockexchange

View File

@ -0,0 +1,71 @@
import std/intsets
import std/tables
import options
import ../../lib/withtypeid
import ../../engine
import ./downloadsession
import ./types
withTypeId:
type
BlockExchangeProtocol* = ref object of Protocol
sessions: Table[string, DownloadSession]
WantHave* = ref object of Message
cid*: string
haves*: IntSet
request*: bool
proc newSession*(self: BlockExchangeProtocol, manifest: Manifest): DownloadSession =
self.sessions.mgetOrPut(manifest.cid, DownloadSession(manifest: manifest))
proc session*(self: BlockExchangeProtocol, manifest: Manifest): var DownloadSession =
self.sessions[manifest.cid]
proc handleWantHave*(self: var DownloadSession, message: WantHave,
network: Network): void =
self.storeBlockPeerMapping(message.sender.get().peerId, message.haves)
if message.request:
discard network.send(WantHave(
sender: message.receiver.some,
receiver: message.sender.get(),
cid: message.cid,
haves: self.blocks,
request: false
))
proc handleWantHave*(self: BlockExchangeProtocol, message: WantHave,
network: Network): void =
self.sessions[message.cid].handleWantHave(message, network)
proc neighborAdded*(
self: BlockExchangeProtocol,
parent: Peer,
neighbor: Peer,
manifest: Manifest,
network: Network
) =
discard network.send(
WantHave(
request: true,
sender: parent.some,
receiver: neighbor,
cid: manifest.cid,
haves: self.newSession(manifest).blocks
)
)
method deliver*(
self: BlockExchangeProtocol,
message: Message,
engine: EventDrivenEngine,
network: Network
): void =
if message of WantHave:
self.handleWantHave(WantHave(message), network)
proc new*(t: typedesc[BlockExchangeProtocol]): BlockExchangeProtocol =
BlockExchangeProtocol(messageTypes: @[WantHave.typeId])

View File

@ -0,0 +1,42 @@
import std/intsets
import std/tables
import options
import sugar
import ./types
import ../../lib/tables
type BlockStore* = IntSet
type BlockPeerMap* = Table[int, IntSet]
type Block* = int
type
DownloadSession* = object of RootObj
manifest*: Manifest
blocks*: BlockStore
blockPeerMap*: Table[int, IntSet]
export types
proc storeBlock*(self: var DownloadSession, aBlock: Block) =
self.blocks.incl(aBlock)
proc storeBlocks*(self: var DownloadSession, blocks: seq[Block]) =
for aBlock in blocks:
self.storeBlock(aBlock)
proc queryKnownBlocks*(self: DownloadSession, blocks: Option[IntSet]): IntSet =
blocks.map(query => self.blocks.intersection(query)).get(self.blocks)
proc queryKnownBlocks*(self: DownloadSession, blocks: Option[seq[Block]] =
none(seq[Block])): IntSet =
self.queryKnownBlocks(blocks.map(arr => toIntSet(arr)))
proc storeBlockPeerMapping*(self: var DownloadSession, peerId: int,
blocks: IntSet) =
for aBlock in blocks:
self.blockPeerMap.getDefault(aBlock, peers): peers[].incl(peerId)
proc peersForBlock*(self: DownloadSession, aBlock: Block): IntSet =
self.blockPeerMap.getOrDefault(aBlock, IntSet())

View File

@ -0,0 +1,4 @@
type
Manifest* = object of RootObj
cid*: string
nBlocks*: uint

View File

@ -1,6 +1,7 @@
import std/options
import std/strformat
import std/times
import sugar
import ./types
import ./schedulableevent
@ -14,9 +15,12 @@ type
schedulable*: SchedulableEvent
engine: EventDrivenEngine
Predicate* = (EventDrivenEngine, SchedulableEvent) -> bool
proc currentTime*(self: EventDrivenEngine): uint64 {.inline.} = self.currentTime
proc schedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): void =
## Schedules a `SchedulableEvent` for execution.
if schedulable.time < self.currentTime:
raise (ref Defect)(
msg: "Cannot schedule an event in the past " &
@ -30,15 +34,16 @@ proc awaitableSchedule*(self: EventDrivenEngine,
proc scheduleAll*[T: SchedulableEvent](self: EventDrivenEngine,
schedulables: seq[T]): void =
for schedulable in schedulables:
self.schedule(schedulable)
schedulables.apply((s: T) => self.schedule(s))
proc stepUntil(self: EventDrivenEngine,
until: Option[uint64] = none(uint64)): Option[SchedulableEvent] =
timeout: Option[uint64] = none(uint64)): Option[SchedulableEvent] =
while len(self.queue) > 0:
if until.isSome and self.queue[0].time > until.get:
self.currentTime = until.get
# This allows us to halt execution even when in-between events if
# a time predicate is satistifed.
if timeout.isSome and self.queue[0].time > timeout.get:
self.currentTime = timeout.get
return none(SchedulableEvent)
let schedulable = self.queue.pop()
@ -53,12 +58,28 @@ proc stepUntil(self: EventDrivenEngine,
return none(SchedulableEvent)
proc nextStep*(self: EventDrivenEngine): Option[SchedulableEvent] =
## Runs the engine until the next event, returning none(SchedulableEvent)
## if no there are no events left.
self.stepUntil()
proc runUntil*(self: EventDrivenEngine, until: uint64): void =
while self.stepUntil(until.some).isSome and self.currentTime <= until:
proc runUntil*(self: EventDrivenEngine, timeout: uint64): void =
## Runs the engine until the specified simulation time. Can be used to
## implement awaits with timeouts, and for testing.
while self.stepUntil(timeout.some).isSome and self.currentTime <= timeout:
discard
proc runUntil*(self: EventDrivenEngine, predicate: Predicate,
timeout: Option[uint64] = none(uint64)): bool =
## Runs the engine until a `Predicate` is true, or a specified time is
## reached -- whichever happens first.
while true:
let schedulable = self.stepUntil(timeout)
if schedulable.isNone:
return false
if predicate(self, schedulable.get):
return true
proc runUntil*(self: EventDrivenEngine, until: Duration): void =
self.runUntil(uint64(until.inSeconds()))

View File

@ -27,8 +27,11 @@ proc new*(
defaultLinkDelay: defaultLinkDelay
)
proc send*(self: Network, message: Message,
linkDelay: Option[uint64] = none(uint64)): ScheduledEvent =
proc send*(
self: Network,
message: Message,
linkDelay: Option[uint64] = none(uint64)
): ScheduledEvent =
let delay = linkDelay.get(self.defaultLinkDelay)

View File

@ -1,6 +1,5 @@
import std/heapqueue
import std/tables
import std/sets
import std/options
import std/random
@ -36,15 +35,20 @@ type
## the same `Peer`.
messageTypes*: seq[string]
type
LifecycleEventType* = enum
started
up
down
Peer* = ref object of RootObj
## A `Peer` in our `Network` which runs `Protocols`. Together with other
## `Peer`s, forms a P2P network.
peerId*: int
up*: bool = false
# FIXME these are expensive data structures to have per-peer, and can
# significantly affect memory scalability. If this turns out to be the
# memory bottleneck, we can fliweight those by using fixed peer types.
protocols*: Table[string, Protocol]
dispatch*: MultiTable[string, Protocol]
@ -57,10 +61,8 @@ type
type
Network* = ref object of RootObj
## A `Network` is a collection of `Peer`s that can communicate with each
## other.
## A `Network` allows `Peer`s to send `Message`s to one another.
##
engine*: EventDrivenEngine
defaultLinkDelay*: uint64
peers*: HashSet[Peer] # TODO: use an array

31
swarmsim/lib/tables.nim Normal file
View File

@ -0,0 +1,31 @@
import std/tables
export tables
template getDefault*[K, V](self: var Table[K, V], key: K, alias,
body: untyped): void =
## An optimized template for getting a value from a table with a default fallback
## that gets inserted in the table as the key is accessed. This is essentially
## syntactic sugar on top of Table.withValue.
##
runnableExamples:
var table: Table[string, int] = initTable[string, int]()
table.getDefault("hello", c):
# like withValue, c is a pointer.
c[] += 1
table.getDefault("hello", c):
# like withValue, c is a pointer.
c[] += 1
echo table["hello"] # 2
self.withValue(key, v):
var alias {.inject.} = v
body
do:
var newVal = V.default()
var alias {.inject.} = addr newVal
body
self[key] = alias[]

View File

@ -0,0 +1,63 @@
import unittest
import sugar
import std/intsets
import std/sequtils
import swarmsim/engine
import swarmsim/codex/blockexchange
import ../../helpers/testpeer
proc newBexPeer(manifest: Manifest, peerId: int, has: seq[int] = @[]): Peer =
var bex = BlockExchangeProtocol.new()
discard bex.newSession(manifest)
bex.session(manifest).storeBlocks(blocks = has)
Peer.new(
peerId = peerId.some,
protocols = @[Protocol bex]
)
suite "block exchange":
setup:
let engine = EventDrivenEngine()
let network = Network(engine: engine)
test "should bootstrap block knowledge from newly added neighbors":
let manifest = Manifest(cid: "QmHash", nBlocks: 10)
let swarm = @[
newBexPeer(manifest, 1, has = @[0, 1, 3, 5]),
newBexPeer(manifest, 2, has = @[0, 2, 4, 6]),
newBexPeer(manifest, 3, has = @[0, 1, 3, 5]),
newBexPeer(manifest, 4, has = @[7, 8, 9]),
]
let newcomer = newBexPeer(manifest, 5)
var bex = BlockExchangeProtocol(
newcomer.getProtocol(BlockExchangeProtocol.typeId).get)
swarm.apply((neighbor: Peer) =>
bex.neighborAdded(newcomer, neighbor, manifest, network))
check(engine.runUntil(
proc (engine: EventDrivenEngine, schedulable: SchedulableEvent): bool =
len(bex.session(manifest).blockPeerMap) == 10))
let blockPeerMap = bex.session(manifest).blockPeerMap
check(blockPeerMap[0] == toIntSet([1, 2, 3]))
check(blockPeerMap[1] == toIntSet([1, 3]))
check(blockPeerMap[2] == toIntSet([2]))
check(blockPeerMap[3] == toIntSet([1, 3]))
check(blockPeerMap[4] == toIntSet([2]))
check(blockPeerMap[5] == toIntSet([1, 3]))
check(blockPeerMap[6] == toIntSet([2]))
check(blockPeerMap[7] == toIntSet([4]))
check(blockPeerMap[8] == toIntSet([4]))
check(blockPeerMap[9] == toIntSet([4]))

View File

@ -0,0 +1,37 @@
import unittest
import std/intsets
import options
import swarmsim/codex/blockexchange/downloadsession
suite "dowload session":
setup:
let manifest = Manifest(cid: "QmHash", nBlocks: 10)
var session = DownloadSession(manifest: manifest)
test "should allow known stored blocks to be queried":
session.storeBlocks(@[0, 1, 5, 8])
check(session.queryKnownBlocks() == toIntSet(@[0, 1, 5, 8]))
test "should return queries on overlapping known blocks":
session.storeBlocks(@[0, 1, 2, 3, 4, 5])
check(session.queryKnownBlocks(@[0, 3, 8].some) == toIntSet(@[0, 3]))
check(session.queryKnownBlocks(toIntSet(@[0, 3, 8]).some) ==
toIntSet(@[0, 3]))
test "should allow querying for peers that know about a block":
session.storeBlockPeerMapping(peerId = 1, toIntSet(@[0, 1, 3, 4, 9]))
session.storeBlockPeerMapping(peerId = 2, toIntSet(@[0, 1, 3, 5, 8]))
session.storeBlockPeerMapping(peerId = 3, toIntSet(@[0, 7, 8]))
check(session.peersForBlock(0) == toIntSet(@[1, 2, 3]))
check(session.peersForBlock(1) == toIntSet(@[1, 2]))
check(session.peersForBlock(2) == toIntSet(@[]))
check(session.peersForBlock(3) == toIntSet(@[1, 2]))
check(session.peersForBlock(4) == toIntSet(@[1]))
check(session.peersForBlock(5) == toIntSet(@[2]))
check(session.peersForBlock(7) == toIntSet(@[3]))
check(session.peersForBlock(8) == toIntSet(@[2, 3]))
check(session.peersForBlock(9) == toIntSet(@[1]))

View File

@ -1,48 +1,4 @@
import unittest
import std/intsets
import swarmsim/engine
import swarmsim/codex/blockexchange
import ../helpers/testpeer
suite "block exchange":
setup:
let engine = EventDrivenEngine()
let network = Network(engine: engine)
test "should respond to want-have message with a list of the blocks it has":
let sender = TestPeer.new(network)
let bex = BlockExchangeProtocol.new()
let file = Manifest(cid: "QmHash", nBlocks: 4)
bex.store.newFile(file)
bex.store.storeBlocks(cid = "QmHash", blocks = @[0, 1, 2, 4])
let peer = Peer.new(
peerId = 1.some,
protocols = @[Protocol bex]
)
let message = WantHave(
sender: (Peer sender).some,
receiver: peer,
cid: file.cid,
wants: toIntSet([0, 1, 3, 4])
)
discard sender.send(message)
engine.run()
check(len(sender.inbox.messages) == 1)
let response = Have(sender.inbox.messages[0])
check(response.haves == toIntSet([0, 1, 4]))
import ./blockexchange/tblockexchange
import ./blockexchange/tdownloadsession
{.warning[UnusedImport]: off.}

View File

@ -107,3 +107,22 @@ suite "event driven engine tests":
check(engine.currentTime == 8)
check(handles.allIt(it.schedulable.completed) == true)
test "should allow clients to run until a predicate is satistified":
let times = @[50'u64, 100, 150, 200]
let engine = EventDrivenEngine()
times.apply((time: uint64) => engine.schedule(TestSchedulable(time: time)))
const stopIndex = 3
var index = 0
check(engine.runUntil(
proc (engine: EventDrivenEngine, schedulable: SchedulableEvent): bool =
index += 1
index == stopIndex
))
check(index == stopIndex)
check(engine.currentTime == times[stopIndex - 1])

View File

@ -25,6 +25,6 @@ proc new*(
proc inbox*(peer: TestPeer): Inbox =
Inbox peer.getProtocol(Inbox.typeId).get()
proc send*(self: TestPeer, msg: Message): ScheduledEvent =
proc send*(self: TestPeer, msg: var Message): ScheduledEvent =
msg.sender = Peer(self).some
self.network.send(msg)

14
tests/lib/ttables.nim Normal file
View File

@ -0,0 +1,14 @@
import unittest
import swarmsim/lib/tables
suite "tables":
test "should create a default value and allow modification":
var table: Table[string, int]
table.getDefault("hello", c): c[] += 1
table.getDefault("hello", c): c[] += 1
check(table["hello"] == 2)