feat: state machine (#2656)

This commit is contained in:
Simon-Pierre Vivier 2024-05-06 09:28:50 -04:00 committed by SionoiS
parent 769f89d75e
commit d1c9453551
No known key found for this signature in database
GPG Key ID: C9458A8CB1852951
7 changed files with 361 additions and 183 deletions

2
.gitmodules vendored
View File

@ -168,4 +168,4 @@
ignore = untracked
path = vendor/negentropy
url = https://github.com/waku-org/negentropy.git
branch = master
branch = master

View File

@ -11,7 +11,7 @@ proc newTestWakuSync*(
const DefaultFrameSize = 153600
let
peerManager = PeerManager.new(switch)
proto = WakuSync.new(peerManager, DefaultFrameSize, 0.seconds, some(handler))
proto = WakuSync.new(peerManager, DefaultFrameSize, 0.seconds, 0, some(handler))
assert proto != nil
proto.start()

View File

@ -429,10 +429,10 @@ suite "Waku Sync":
check:
s2.insert(msg2.timestamp, msgHash2).isOk()
let subrange1Res = SubRange.new(s1, 0, int64.high)
let subrange1Res = SubRange.new(s1, 0, uint64.high)
assert subrange1Res.isOk(), $subrange1Res.error
let subrange1 = subrange1Res.value
let subrange2Res = SubRange.new(s2, 0, int64.high)
let subrange2Res = SubRange.new(s2, 0, uint64.high)
assert subrange2Res.isOk(), $subrange2Res.error
let subrange2 = subrange2Res.value

View File

@ -59,6 +59,7 @@ proc decode*(T: type SyncPayload, buffer: seq[byte]): ProtobufResult[T] =
else:
req.hashes = newSeqOfCap[WakuMessageHash](buffer.len)
for buf in buffer:
req.messageHashes.add(WakuMessageHash.fromBytes(buf))
let msg: WakuMessageHash = fromBytes(buf)
req.hashes.add(msg)
return ok(req)

View File

@ -3,10 +3,16 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import std/[options]
import std/[options], chronos
import ../waku_core
const DefaultSyncInterval*: timer.Duration = Hour
const WakuSyncCodec* = "/vac/waku/sync/1.0.0"
const DefaultFrameSize* = 153600
type WakuSyncCallback* = proc(hashes: seq[WakuMessageHash], syncPeer: RemotePeerInfo) {.
async: (raises: []), closure
.}
type SyncPayload* = object
rangeStart*: Option[uint64]
@ -14,6 +20,6 @@ type SyncPayload* = object
frameSize*: Option[uint64]
negentropy*: seq[byte]
negentropy*: seq[byte] # negentropy protocol payload
hashes*: seq[WakuMessageHash]

View File

@ -27,21 +27,14 @@ import
logScope:
topics = "waku sync"
const DefaultSyncInterval: timer.Duration = Hour
const DefaultFrameSize = 153600
type
WakuSyncCallback* = proc(hashes: seq[WakuMessageHash], syncPeer: RemotePeerInfo) {.
async: (raises: []), closure, gcsafe
.}
WakuSync* = ref object of LPProtocol
storage: Storage
peerManager: PeerManager
maxFrameSize: int # Not sure if this should be protocol defined or not...
syncInterval: timer.Duration
callback: Option[WakuSyncCallback]
periodicSyncFut: Future[void]
type WakuSync* = ref object of LPProtocol
storage: Storage # Negentropy protocol storage
peerManager: PeerManager
maxFrameSize: int # Not sure if this should be protocol defined or not...
syncInterval: timer.Duration # Time between each syncronisation attempt
relayJitter: int64 # Time delay until all messages are mostly received network wide
callback: Option[WakuSyncCallback] # Callback with the result of the syncronisation
periodicSyncFut: Future[void]
proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) =
if msg.ephemeral:
@ -50,24 +43,69 @@ proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage)
# because what if messages is received via gossip and sync as well?
# Might 2 entries to be inserted into storage which is inefficient.
let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg)
info "inserting message into storage ", hash = msgHash, timestamp = msg.timestamp
trace "inserting message into storage ",
hash = msgHash.toHex(), timestamp = msg.timestamp
if self.storage.insert(msg.timestamp, msgHash).isErr():
debug "failed to insert message ", hash = msgHash.toHex()
proc calculateRange(relayJitter: int64): (int64, int64) =
var now = getNowInNanosecondTime()
# Because of message jitter inherent to GossipSub
now -= relayJitter
let range = getNanosecondTime(3600) # 1 hour
let start = now - range
let `end` = now
return (start, `end`)
proc request(
self: WakuSync, conn: Connection
): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} =
let syncSession = SyncSession(
sessType: SyncSessionType.CLIENT,
curState: SyncSessionState.INIT,
frameSize: DefaultFrameSize,
rangeStart: 0, #TODO: Pass start of this hour??
rangeEnd: times.getTime().toUnix(),
)
let hashes = (await syncSession.HandleClientSession(conn, self.storage)).valueOr:
return err(error)
return ok(hashes)
let (start, `end`) = calculateRange(self.relayJitter)
let frameSize = DefaultFrameSize
let initialized = ?clientInitialize(self.storage, conn, frameSize, start, `end`)
debug "sync session initialized",
client = self.peerManager.switch.peerInfo.peerId,
server = conn.peerId,
frameSize = frameSize,
timeStart = start,
timeEnd = `end`
var hashes: seq[WakuMessageHash]
var reconciled = initialized
while true:
let sent = ?await reconciled.send()
trace "sync payload sent",
client = self.peerManager.switch.peerInfo.peerId,
server = conn.peerId,
payload = reconciled.payload
let received = ?await sent.listenBack()
trace "sync payload received",
client = self.peerManager.switch.peerInfo.peerId,
server = conn.peerId,
payload = received.payload
reconciled = (?received.clientReconcile(hashes)).valueOr:
let completed = error # Result[Reconciled, Completed]
?await completed.clientTerminate()
debug "sync session ended gracefully",
client = self.peerManager.switch.peerInfo.peerId, server = conn.peerId
return ok(hashes)
proc sync*(
self: WakuSync
@ -79,6 +117,10 @@ proc sync*(
return err("Cannot establish sync connection")
let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr:
debug "sync session ended",
server = self.peerManager.switch.peerInfo.peerId,
client = conn.peerId,
error = $error
return err("Sync request error: " & error)
return ok((hashes, peer))
@ -91,24 +133,66 @@ proc sync*(
return err("Cannot establish sync connection")
let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr:
debug "sync session ended",
server = self.peerManager.switch.peerInfo.peerId,
client = conn.peerId,
error = $error
return err("Sync request error: " & error)
return ok(hashes)
proc handleLoop(
self: WakuSync, conn: Connection
): Future[Result[seq[WakuMessageHash], string]] {.async.} =
let (start, `end`) = calculateRange(self.relayJitter)
let frameSize = DefaultFrameSize
let initialized = ?serverInitialize(self.storage, conn, frameSize, start, `end`)
var sent = initialized
while true:
let received = ?await sent.listenBack()
trace "sync payload received",
server = self.peerManager.switch.peerInfo.peerId,
client = conn.peerId,
payload = received.payload
let reconciled = (?received.serverReconcile()).valueOr:
let completed = error # Result[Reconciled, Completed]
let hashes = await completed.serverTerminate()
return ok(hashes)
sent = ?await reconciled.send()
trace "sync payload sent",
server = self.peerManager.switch.peerInfo.peerId,
client = conn.peerId,
payload = reconciled.payload
proc initProtocolHandler(self: WakuSync) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let syncSession = SyncSession(
sessType: SyncSessionType.SERVER,
curState: SyncSessionState.INIT,
frameSize: DefaultFrameSize,
rangeStart: 0, #TODO: Pass start of this hour??
rangeEnd: 0,
)
debug "Server sync session requested", remotePeer = $conn.peerId
debug "sync session requested",
server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId
await syncSession.HandleServerSession(conn, self.storage)
let hashes = (await self.handleLoop(conn)).valueOr:
debug "sync session ended",
server = self.peerManager.switch.peerInfo.peerId,
client = conn.peerId,
error = $error
debug "Server sync session ended"
#TODO send error code and desc to client
return
#TODO handle the hashes that the server need from the client
debug "sync session ended gracefully",
server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId
self.handler = handle
self.codec = WakuSyncCodec
@ -118,10 +202,11 @@ proc new*(
peerManager: PeerManager,
maxFrameSize: int = DefaultFrameSize,
syncInterval: timer.Duration = DefaultSyncInterval,
relayJitter: int64 = 20, #Default gossipsub jitter in network.
callback: Option[WakuSyncCallback] = none(WakuSyncCallback),
): T =
let storage = Storage.new().valueOr:
error "storage creation failed"
debug "storage creation failed"
return
let sync = WakuSync(
@ -130,11 +215,12 @@ proc new*(
maxFrameSize: maxFrameSize,
syncInterval: syncInterval,
callback: callback,
relayJitter: relayJitter,
)
sync.initProtocolHandler()
info "Created WakuSync protocol"
info "WakuSync protocol initialized"
return sync
@ -143,7 +229,7 @@ proc periodicSync(self: WakuSync) {.async.} =
await sleepAsync(self.syncInterval)
let (hashes, peer) = (await self.sync()).valueOr:
error "periodic sync error", error = error
debug "periodic sync error", error = error
continue
let callback = self.callback.valueOr:
@ -157,9 +243,9 @@ proc start*(self: WakuSync) =
# start periodic-sync only if interval is set.
self.periodicSyncFut = self.periodicSync()
info "WakuSync protocol started"
proc stopWait*(self: WakuSync) {.async.} =
await self.periodicSyncFut.cancelAndWait()
#[ TODO:Fetch from storageManager??
proc storageSize*(self: WakuSync): int =
return self.storage.size() ]#
info "WakuSync protocol stopped"

View File

@ -3,149 +3,234 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import std/options, stew/results, chronicles, chronos, libp2p/stream/connection
import std/options, stew/results, chronos, libp2p/stream/connection
import ../common/nimchronos, ../waku_core, ./raw_bindings, ./storage_manager
import
../common/nimchronos,
../common/protobuf,
../waku_core,
./raw_bindings,
./common,
./codec
logScope:
topics = "waku sync"
#TODO add states for protocol negotiation
type SyncSessionType* = enum
CLIENT = 1
SERVER = 2
### Type State ###
type SyncSessionState* = enum
INIT = 1
NEGENTROPY_SYNC = 2
COMPLETE = 3
type ClientSync* = object
haveHashes: seq[WakuMessageHash]
type SyncSession* = ref object
sessType*: SyncSessionType
curState*: SyncSessionState
frameSize*: int
rangeStart*: int64
rangeEnd*: int64
negentropy*: NegentropySubRange
type ServerSync* = object
#[
Session State Machine
1. negotiate sync params
2. start negentropy sync
3. find out local needhashes
4. If client, share peer's needhashes to peer
]#
type Reconciled*[T] = object
sync: T
negentropy: NegentropySubRange
connection: Connection
frameSize: int
payload*: SyncPayload
proc initializeNegentropy(
self: SyncSession, storage: Storage, syncStartTime: int64, syncEndTime: int64
): Result[void, string] =
#TODO Create a subrange
let subrange = SubRange.new(storage, uint64(syncStartTime), uint64(syncEndTime)).valueOr:
return err(error)
let negentropy = NegentropySubrange.new(subrange, self.frameSize).valueOr:
return err(error)
type Sent*[T] = object
sync: T
negentropy: NegentropySubRange
connection: Connection
frameSize: int
self.negentropy = negentropy
type Received*[T] = object
sync: T
negentropy: NegentropySubRange
connection: Connection
frameSize: int
payload*: SyncPayload
type Completed*[T] = object
sync: T
negentropy: NegentropySubRange
connection: Connection
haveHashes: seq[WakuMessageHash]
### State Transition ###
proc clientInitialize*(
store: Storage,
conn: Connection,
frameSize = DefaultFrameSize,
start = int64.low,
`end` = int64.high,
): Result[Reconciled[ClientSync], string] =
let subrange = ?SubRange.new(store, uint64(start), uint64(`end`))
let negentropy = ?NegentropySubrange.new(subrange, frameSize)
let negentropyPayload = ?negentropy.initiate()
let payload = SyncPayload(negentropy: seq[byte](negentropyPayload))
let sync = ClientSync()
return ok(
Reconciled[ClientSync](
sync: sync,
negentropy: negentropy,
connection: conn,
frameSize: frameSize,
payload: payload,
)
)
proc serverInitialize*(
store: Storage,
conn: Connection,
frameSize = DefaultFrameSize,
start = int64.low,
`end` = int64.high,
): Result[Sent[ServerSync], string] =
let subrange = ?SubRange.new(store, uint64(start), uint64(`end`))
let negentropy = ?NegentropySubrange.new(subrange, frameSize)
let sync = ServerSync()
return ok(
Sent[ServerSync](
sync: sync, negentropy: negentropy, connection: conn, frameSize: frameSize
)
)
proc send*[T](self: Reconciled[T]): Future[Result[Sent[T], string]] {.async.} =
let writeRes = catch:
await self.connection.writeLP(self.payload.encode().buffer)
if writeRes.isErr():
return err("send connection write error: " & writeRes.error.msg)
return ok(
Sent[T](
sync: self.sync,
negentropy: self.negentropy,
connection: self.connection,
frameSize: self.frameSize,
)
)
proc listenBack*[T](self: Sent[T]): Future[Result[Received[T], string]] {.async.} =
let readRes = catch:
await self.connection.readLp(-1)
let buffer: seq[byte] =
if readRes.isOk():
readRes.get()
else:
return err("listenBack connection read error: " & readRes.error.msg)
# can't otherwise the compiler complains
#let payload = SyncPayload.decode(buffer).valueOr:
#return err($error)
let decodeRes = SyncPayload.decode(buffer)
let payload =
if decodeRes.isOk():
decodeRes.get()
else:
let decodeError: ProtobufError = decodeRes.error
let errMsg = $decodeError
return err("listenBack decoding error: " & errMsg)
return ok(
Received[T](
sync: self.sync,
negentropy: self.negentropy,
connection: self.connection,
frameSize: self.frameSize,
payload: payload,
)
)
proc clientReconcile*(
self: Received[ClientSync], needHashes: var seq[WakuMessageHash]
): Result[Result[Reconciled[ClientSync], Completed[ClientSync]], string] =
var haves = self.sync.haveHashes
let responseOpt =
?self.negentropy.clientReconcile(
NegentropyPayload(self.payload.negentropy), haves, needHashes
)
let sync = ClientSync(haveHashes: haves)
let response = responseOpt.valueOr:
let res = Result[Reconciled[ClientSync], Completed[ClientSync]].err(
Completed[ClientSync](
sync: sync, negentropy: self.negentropy, connection: self.connection
)
)
return ok(res)
let payload = SyncPayload(negentropy: seq[byte](response), hashes: haves)
let res = Result[Reconciled[ClientSync], Completed[ClientSync]].ok(
Reconciled[ClientSync](
sync: sync,
negentropy: self.negentropy,
connection: self.connection,
frameSize: self.frameSize,
payload: payload,
)
)
return ok(res)
proc serverReconcile*(
self: Received[ServerSync]
): Result[Result[Reconciled[ServerSync], Completed[ServerSync]], string] =
if self.payload.negentropy.len == 0:
let res = Result[Reconciled[ServerSync], Completed[ServerSync]].err(
Completed[ServerSync](
sync: self.sync,
negentropy: self.negentropy,
connection: self.connection,
haveHashes: self.payload.hashes,
)
)
return ok(res)
let response =
?self.negentropy.serverReconcile(NegentropyPayload(self.payload.negentropy))
let payload = SyncPayload(negentropy: seq[byte](response))
let res = Result[Reconciled[ServerSync], Completed[ServerSync]].ok(
Reconciled[ServerSync](
sync: self.sync,
negentropy: self.negentropy,
connection: self.connection,
frameSize: self.frameSize,
payload: payload,
)
)
return ok(res)
proc clientTerminate*(
self: Completed[ClientSync]
): Future[Result[void, string]] {.async.} =
let payload = SyncPayload(hashes: self.sync.haveHashes)
let writeRes = catch:
await self.connection.writeLp(payload.encode().buffer)
if writeRes.isErr():
return err("clientTerminate connection write error: " & writeRes.error.msg)
self.negentropy.delete()
return ok()
proc HandleClientSession*(
self: SyncSession, conn: Connection, storage: Storage
): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} =
if self
.initializeNegentropy(
storage,
timestampInSeconds(getNowInNanosecondTime()),
# now , TODO: this needs to be tuned maybe consider 20 seconds jitter in network.
int64.high, #timestampInSeconds(getNowInNanosecondTime()) - 60 * 60, # 1 hour
)
.isErr():
return
defer:
self.negentropy.delete()
proc serverTerminate*(
self: Completed[ServerSync]
): Future[seq[WakuMessageHash]] {.async.} =
self.negentropy.delete()
let payload = self.negentropy.initiate().valueOr:
return err(error)
debug "Client sync session initialized", remotePeer = conn.peerId
let writeRes = catch:
await conn.writeLP(seq[byte](payload))
trace "request sent to server", payload = toHex(seq[byte](payload))
if writeRes.isErr():
return err(writeRes.error.msg)
var
haveHashes: seq[WakuMessageHash] # Send it across to Server at the end of sync
needHashes: seq[WakuMessageHash]
while true:
let readRes = catch:
await conn.readLp(self.frameSize)
let buffer: seq[byte] = readRes.valueOr:
return err(error.msg)
trace "Received Sync request from peer", payload = toHex(buffer)
let request = NegentropyPayload(buffer)
let responseOpt = self.negentropy.clientReconcile(request, haveHashes, needHashes).valueOr:
return err(error)
let response = responseOpt.valueOr:
debug "Closing connection, client sync session is done"
await conn.close()
break
trace "Sending Sync response to peer", payload = toHex(seq[byte](response))
let writeRes = catch:
await conn.writeLP(seq[byte](response))
if writeRes.isErr():
return err(writeRes.error.msg)
return ok(needHashes)
proc HandleServerSession*(
self: SyncSession, conn: Connection, storage: Storage
) {.async, gcsafe.} =
#TODO: Pass sync time based on data in request??
#TODO: Return error rather than closing stream abruptly?
if self
.initializeNegentropy(
storage,
timestampInSeconds(getNowInNanosecondTime()),
int64.high, #timestampInSeconds(getNowInNanosecondTime()) - 60 * 60,
)
.isErr():
return
defer:
self.negentropy.delete()
while not conn.isClosed:
let requestRes = catch:
await conn.readLp(self.frameSize)
let buffer = requestRes.valueOr:
if error.name != $LPStreamRemoteClosedError or error.name != $LPStreamClosedError:
debug "Connection reading error", error = error.msg
break
#TODO: Once we receive needHashes or endOfSync, we should close this stream.
let request = NegentropyPayload(buffer)
let response = self.negentropy.serverReconcile(request).valueOr:
error "Reconciliation error", error = error
break
let writeRes = catch:
await conn.writeLP(seq[byte](response))
if writeRes.isErr():
error "Connection write error", error = writeRes.error.msg
break
return
return self.haveHashes