mirror of
https://github.com/logos-storage/swarmsim.git
synced 2026-01-08 00:33:06 +00:00
add basic block exchange protocol; remove useless explicit peer tracking; fix test helpers
This commit is contained in:
parent
cd64de4026
commit
17f7c1cabd
68
swarmsim/codex/blockexchange.nim
Normal file
68
swarmsim/codex/blockexchange.nim
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
import std/intsets
|
||||||
|
import std/tables
|
||||||
|
import options
|
||||||
|
|
||||||
|
import ../lib/withtypeid
|
||||||
|
|
||||||
|
import ../engine
|
||||||
|
|
||||||
|
type
|
||||||
|
BlockStore* = ref object of RootObj
|
||||||
|
store: Table[string, IntSet]
|
||||||
|
|
||||||
|
Manifest* = object of RootObj
|
||||||
|
cid*: string
|
||||||
|
nBlocks*: uint
|
||||||
|
|
||||||
|
withTypeId:
|
||||||
|
type
|
||||||
|
BlockExchangeProtocol* = ref object of Protocol
|
||||||
|
store*: BlockStore
|
||||||
|
|
||||||
|
WantHave* = ref object of Message
|
||||||
|
cid*: string
|
||||||
|
wants*: IntSet
|
||||||
|
|
||||||
|
Have* = ref object of Message
|
||||||
|
cid*: string
|
||||||
|
haves*: IntSet
|
||||||
|
|
||||||
|
proc new*(t: type BlockExchangeProtocol): BlockExchangeProtocol =
|
||||||
|
BlockExchangeProtocol(
|
||||||
|
store: BlockStore(store: initTable[string, IntSet]()),
|
||||||
|
messageTypes: @[WantHave.typeId, Have.typeId]
|
||||||
|
)
|
||||||
|
|
||||||
|
proc queryBlocks*(self: BlockStore, cid: string, wants: IntSet): IntSet =
|
||||||
|
if not self.store.hasKey(cid):
|
||||||
|
return initIntSet()
|
||||||
|
|
||||||
|
return self.store[cid].intersection(wants)
|
||||||
|
|
||||||
|
proc storeBlocks*(self: BlockStore, cid: string, blocks: seq[int]): void =
|
||||||
|
if not self.store.hasKey(cid):
|
||||||
|
self.store[cid] = initIntSet()
|
||||||
|
|
||||||
|
self.store[cid] = self.store[cid].union(toIntSet(blocks))
|
||||||
|
|
||||||
|
proc newFile*(self: BlockStore, manifest: Manifest) =
|
||||||
|
self.store[manifest.cid] = initIntSet()
|
||||||
|
|
||||||
|
proc handleWantHave*(self: BlockExchangeProtocol, message: WantHave): Have =
|
||||||
|
Have(
|
||||||
|
sender: message.receiver.some,
|
||||||
|
receiver: message.sender.get(),
|
||||||
|
cid: message.cid,
|
||||||
|
haves: self.store.queryBlocks(message.cid, message.wants)
|
||||||
|
)
|
||||||
|
|
||||||
|
method deliver*(
|
||||||
|
self: BlockExchangeProtocol,
|
||||||
|
message: Message,
|
||||||
|
engine: EventDrivenEngine,
|
||||||
|
network: Network
|
||||||
|
) =
|
||||||
|
if message of WantHave:
|
||||||
|
discard network.send(self.handleWantHave(WantHave(message)))
|
||||||
|
|
||||||
|
|
||||||
@ -1,12 +1,10 @@
|
|||||||
import std/options
|
import std/options
|
||||||
import std/sets
|
|
||||||
|
|
||||||
import ./types
|
import ./types
|
||||||
import ./peer
|
import ./peer
|
||||||
import ./eventdrivenengine
|
import ./eventdrivenengine
|
||||||
|
|
||||||
export options
|
export options
|
||||||
export sets
|
|
||||||
export peer
|
export peer
|
||||||
export eventdrivenengine
|
export eventdrivenengine
|
||||||
export types
|
export types
|
||||||
@ -26,18 +24,9 @@ proc new*(
|
|||||||
): Network =
|
): Network =
|
||||||
Network(
|
Network(
|
||||||
engine: engine,
|
engine: engine,
|
||||||
defaultLinkDelay: defaultLinkDelay,
|
defaultLinkDelay: defaultLinkDelay
|
||||||
peers: HashSet[Peer]()
|
|
||||||
)
|
)
|
||||||
|
|
||||||
proc add*(self: Network, peer: Peer): void =
|
|
||||||
# TODO: this can be very slow if the array keeps being resized, but for
|
|
||||||
# now I won't care much.
|
|
||||||
self.peers.incl(peer)
|
|
||||||
|
|
||||||
proc remove*(self: Network, peer: Peer) =
|
|
||||||
self.peers.excl(peer)
|
|
||||||
|
|
||||||
proc send*(self: Network, message: Message,
|
proc send*(self: Network, message: Message,
|
||||||
linkDelay: Option[uint64] = none(uint64)): ScheduledEvent =
|
linkDelay: Option[uint64] = none(uint64)): ScheduledEvent =
|
||||||
|
|
||||||
|
|||||||
@ -3,6 +3,7 @@ import engine/tschedulableevent
|
|||||||
import engine/tnetwork
|
import engine/tnetwork
|
||||||
import engine/tpeer
|
import engine/tpeer
|
||||||
import codex/tdhttracker
|
import codex/tdhttracker
|
||||||
|
import codex/tblockexchange
|
||||||
import lib/tmultitable
|
import lib/tmultitable
|
||||||
import lib/twithtypeid
|
import lib/twithtypeid
|
||||||
|
|
||||||
|
|||||||
43
tests/codex/tblockexchange.nim
Normal file
43
tests/codex/tblockexchange.nim
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
import unittest
|
||||||
|
|
||||||
|
import std/intsets
|
||||||
|
|
||||||
|
import swarmsim/engine
|
||||||
|
import swarmsim/codex/blockexchange
|
||||||
|
|
||||||
|
import ../helpers/testpeer
|
||||||
|
|
||||||
|
suite "block exchange":
|
||||||
|
|
||||||
|
test "should respond to want-block message with a list of the blocks it has":
|
||||||
|
let engine = EventDrivenEngine()
|
||||||
|
let network = Network(engine: engine)
|
||||||
|
let sender = TestPeer.new(network)
|
||||||
|
|
||||||
|
let bex = BlockExchangeProtocol.new()
|
||||||
|
|
||||||
|
let file = Manifest(cid: "QmHash", nBlocks: 4)
|
||||||
|
bex.store.newFile(file)
|
||||||
|
bex.store.storeBlocks(cid = "QmHash", blocks = @[0, 1, 2, 4])
|
||||||
|
|
||||||
|
let peer = Peer.new(
|
||||||
|
peerId = 1.some,
|
||||||
|
protocols = @[Protocol bex]
|
||||||
|
)
|
||||||
|
|
||||||
|
let message = WantHave(
|
||||||
|
sender: (Peer sender).some,
|
||||||
|
receiver: peer,
|
||||||
|
cid: file.cid,
|
||||||
|
wants: toIntSet([0, 1, 3, 4])
|
||||||
|
)
|
||||||
|
|
||||||
|
discard sender.send(message)
|
||||||
|
|
||||||
|
engine.run()
|
||||||
|
|
||||||
|
check(len(sender.inbox.messages) == 1)
|
||||||
|
|
||||||
|
let response = Have(sender.inbox.messages[0])
|
||||||
|
|
||||||
|
check(response.haves == toIntSet([0, 1, 4]))
|
||||||
@ -36,7 +36,6 @@ suite "tracker node":
|
|||||||
)
|
)
|
||||||
|
|
||||||
let network = Network.new(engine = engine)
|
let network = Network.new(engine = engine)
|
||||||
network.add(trackerPeer)
|
|
||||||
|
|
||||||
test "should retain published descriptors":
|
test "should retain published descriptors":
|
||||||
announcePeer(network, trackerPeer, 25)
|
announcePeer(network, trackerPeer, 25)
|
||||||
|
|||||||
@ -93,7 +93,17 @@ suite "event driven engine tests":
|
|||||||
|
|
||||||
check(engine.currentTime == 10)
|
check(engine.currentTime == 10)
|
||||||
|
|
||||||
|
test "should run to completion":
|
||||||
|
let times = @[1'u64, 2, 3, 4, 5, 6, 7, 8]
|
||||||
|
|
||||||
|
let engine = EventDrivenEngine()
|
||||||
|
|
||||||
|
let handles = times.map(time =>
|
||||||
|
engine.awaitableSchedule(TestSchedulable(time: time)))
|
||||||
|
|
||||||
|
check(handles.allIt(it.schedulable.completed) == false)
|
||||||
|
|
||||||
|
engine.run()
|
||||||
|
|
||||||
|
check(engine.currentTime == 8)
|
||||||
|
check(handles.allIt(it.schedulable.completed) == true)
|
||||||
|
|||||||
@ -22,9 +22,6 @@ suite "network":
|
|||||||
|
|
||||||
let network = Network.new(engine = engine, defaultLinkDelay = 20)
|
let network = Network.new(engine = engine, defaultLinkDelay = 20)
|
||||||
|
|
||||||
network.add(p1)
|
|
||||||
network.add(p2)
|
|
||||||
|
|
||||||
let m1: Message = FreelyTypedMessage(receiver: p1, messageType: "m")
|
let m1: Message = FreelyTypedMessage(receiver: p1, messageType: "m")
|
||||||
let m2: Message = FreelyTypedMessage(receiver: p2, messageType: "m")
|
let m2: Message = FreelyTypedMessage(receiver: p2, messageType: "m")
|
||||||
|
|
||||||
|
|||||||
@ -2,8 +2,10 @@ import swarmsim/engine/types
|
|||||||
import swarmsim/engine/peer
|
import swarmsim/engine/peer
|
||||||
import swarmsim/engine/protocol
|
import swarmsim/engine/protocol
|
||||||
import swarmsim/engine/network
|
import swarmsim/engine/network
|
||||||
|
import swarmsim/lib/withtypeid
|
||||||
|
|
||||||
type
|
withTypeId:
|
||||||
|
type
|
||||||
Inbox* = ref object of Protocol
|
Inbox* = ref object of Protocol
|
||||||
protocolId*: string
|
protocolId*: string
|
||||||
messages*: seq[Message]
|
messages*: seq[Message]
|
||||||
|
|||||||
@ -1,5 +1,4 @@
|
|||||||
import std/options
|
import std/options
|
||||||
import std/random
|
|
||||||
|
|
||||||
import swarmsim/engine
|
import swarmsim/engine
|
||||||
import swarmsim/engine/peer
|
import swarmsim/engine/peer
|
||||||
@ -15,11 +14,16 @@ proc new*(
|
|||||||
peerId: Option[int] = none(int),
|
peerId: Option[int] = none(int),
|
||||||
): TestPeer =
|
): TestPeer =
|
||||||
let peer: TestPeer = TestPeer(network: network)
|
let peer: TestPeer = TestPeer(network: network)
|
||||||
discard peer.initPeer(protocols = @[Protocol Inbox()])
|
discard peer.initPeer(
|
||||||
|
protocols = @[Protocol Inbox(
|
||||||
|
protocolId: Inbox.typeId,
|
||||||
|
messageTypes: @["*"]
|
||||||
|
)
|
||||||
|
])
|
||||||
peer
|
peer
|
||||||
|
|
||||||
proc inbox*(peer: TestPeer): Inbox =
|
proc inbox*(peer: TestPeer): Inbox =
|
||||||
Inbox peer.getProtocol(Inbox.protocolName).get()
|
Inbox peer.getProtocol(Inbox.typeId).get()
|
||||||
|
|
||||||
proc send*(self: TestPeer, msg: Message): ScheduledEvent =
|
proc send*(self: TestPeer, msg: Message): ScheduledEvent =
|
||||||
msg.sender = Peer(self).some
|
msg.sender = Peer(self).some
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user