mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-05 23:43:07 +00:00
Accounting: End to end with Cheque PoC (#304)
This commit is contained in:
parent
6b88052cd7
commit
1b14436afb
@ -72,6 +72,8 @@ procSuite "Waku SWAP Accounting":
|
|||||||
await sleepAsync(2000.millis)
|
await sleepAsync(2000.millis)
|
||||||
|
|
||||||
node1.wakuStore.setPeer(node2.peerInfo)
|
node1.wakuStore.setPeer(node2.peerInfo)
|
||||||
|
node1.wakuSwap.setPeer(node2.peerInfo)
|
||||||
|
node2.wakuSwap.setPeer(node1.peerInfo)
|
||||||
|
|
||||||
proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} =
|
proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} =
|
||||||
debug "storeHandler hit"
|
debug "storeHandler hit"
|
||||||
@ -88,3 +90,51 @@ procSuite "Waku SWAP Accounting":
|
|||||||
node2.wakuSwap.accounting[node1.peerInfo.peerId] == -1
|
node2.wakuSwap.accounting[node1.peerInfo.peerId] == -1
|
||||||
await node1.stop()
|
await node1.stop()
|
||||||
await node2.stop()
|
await node2.stop()
|
||||||
|
|
||||||
|
# TODO Add cheque here
|
||||||
|
asyncTest "Update accounting state after sending cheque":
|
||||||
|
let
|
||||||
|
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
|
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||||
|
Port(60000))
|
||||||
|
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
|
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||||
|
Port(60001))
|
||||||
|
contentTopic = ContentTopic(1)
|
||||||
|
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||||
|
|
||||||
|
var futures = [newFuture[bool](), newFuture[bool]()]
|
||||||
|
|
||||||
|
# Start nodes and mount protocols
|
||||||
|
await node1.start()
|
||||||
|
node1.mountSwap()
|
||||||
|
node1.mountStore()
|
||||||
|
await node2.start()
|
||||||
|
node2.mountSwap()
|
||||||
|
node2.mountStore()
|
||||||
|
|
||||||
|
await node2.subscriptions.notify("/waku/2/default-waku/proto", message)
|
||||||
|
|
||||||
|
await sleepAsync(2000.millis)
|
||||||
|
|
||||||
|
node1.wakuStore.setPeer(node2.peerInfo)
|
||||||
|
node1.wakuSwap.setPeer(node2.peerInfo)
|
||||||
|
node2.wakuSwap.setPeer(node1.peerInfo)
|
||||||
|
|
||||||
|
proc handler1(response: HistoryResponse) {.gcsafe, closure.} =
|
||||||
|
futures[0].complete(true)
|
||||||
|
proc handler2(response: HistoryResponse) {.gcsafe, closure.} =
|
||||||
|
futures[1].complete(true)
|
||||||
|
|
||||||
|
# TODO Handshakes - for now we assume implicit, e2e still works for PoC
|
||||||
|
await node1.query(HistoryQuery(topics: @[contentTopic]), handler1)
|
||||||
|
await node1.query(HistoryQuery(topics: @[contentTopic]), handler2)
|
||||||
|
|
||||||
|
check:
|
||||||
|
(await allFutures(futures).withTimeout(5.seconds)) == true
|
||||||
|
# Accounting table updated with credit and debit, respectively
|
||||||
|
# After sending a cheque the balance is partially adjusted
|
||||||
|
node1.wakuSwap.accounting[node2.peerInfo.peerId] == 1
|
||||||
|
node2.wakuSwap.accounting[node1.peerInfo.peerId] == -1
|
||||||
|
await node1.stop()
|
||||||
|
await node2.stop()
|
||||||
|
|||||||
@ -426,6 +426,8 @@ when isMainModule:
|
|||||||
if conf.swap:
|
if conf.swap:
|
||||||
mountSwap(node)
|
mountSwap(node)
|
||||||
|
|
||||||
|
# TODO Set swap peer, for now should be same as store peer
|
||||||
|
|
||||||
if conf.store:
|
if conf.store:
|
||||||
var store: MessageStore
|
var store: MessageStore
|
||||||
|
|
||||||
|
|||||||
@ -81,10 +81,44 @@ proc init*(T: type Cheque, buffer: seq[byte]): ProtoResult[T] =
|
|||||||
|
|
||||||
# TODO Test for credit/debit operations in succession
|
# TODO Test for credit/debit operations in succession
|
||||||
|
|
||||||
|
proc sendCheque*(ws: WakuSwap) {.async.} =
|
||||||
|
# TODO Better peer selection, for now using hardcoded peer
|
||||||
|
let peer = ws.peers[0]
|
||||||
|
let conn = await ws.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuSwapCodec)
|
||||||
|
|
||||||
|
info "sendCheque"
|
||||||
|
|
||||||
|
# TODO Add beneficiary, etc
|
||||||
|
# XXX Hardcoded amount for now
|
||||||
|
await conn.writeLP(Cheque(amount: 1).encode().buffer)
|
||||||
|
|
||||||
|
# Set new balance
|
||||||
|
# XXX Assume peerId is first peer
|
||||||
|
let peerId = ws.peers[0].peerInfo.peerId
|
||||||
|
ws.accounting[peerId] -= 1
|
||||||
|
info "New accounting state", accounting = ws.accounting[peerId]
|
||||||
|
|
||||||
|
# TODO Authenticate cheque, check beneficiary etc
|
||||||
|
proc handleCheque*(ws: WakuSwap, cheque: Cheque) =
|
||||||
|
info "handle incoming cheque"
|
||||||
|
# XXX Assume peerId is first peer
|
||||||
|
let peerId = ws.peers[0].peerInfo.peerId
|
||||||
|
ws.accounting[peerId] += int(cheque.amount)
|
||||||
|
info "New accounting state", accounting = ws.accounting[peerId]
|
||||||
|
|
||||||
proc init*(wakuSwap: WakuSwap) =
|
proc init*(wakuSwap: WakuSwap) =
|
||||||
info "wakuSwap init 1"
|
info "wakuSwap init 1"
|
||||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||||
info "NYI swap handle incoming connection"
|
info "swap handle incoming connection"
|
||||||
|
var message = await conn.readLp(64*1024)
|
||||||
|
# XXX This can be handshake, etc
|
||||||
|
var res = Cheque.init(message)
|
||||||
|
if res.isErr:
|
||||||
|
error "failed to decode rpc"
|
||||||
|
return
|
||||||
|
|
||||||
|
info "received cheque", value=res.value
|
||||||
|
wakuSwap.handleCheque(res.value)
|
||||||
|
|
||||||
proc credit(peerId: PeerId, n: int) {.gcsafe, closure.} =
|
proc credit(peerId: PeerId, n: int) {.gcsafe, closure.} =
|
||||||
info "Crediting peer for", peerId, n
|
info "Crediting peer for", peerId, n
|
||||||
@ -94,6 +128,14 @@ proc init*(wakuSwap: WakuSwap) =
|
|||||||
wakuSwap.accounting[peerId] = -n
|
wakuSwap.accounting[peerId] = -n
|
||||||
info "Accounting state", accounting = wakuSwap.accounting[peerId]
|
info "Accounting state", accounting = wakuSwap.accounting[peerId]
|
||||||
|
|
||||||
|
# TODO Isolate to policy function
|
||||||
|
# TODO Tunable disconnect threshhold, hard code for PoC
|
||||||
|
let disconnectThreshhold = 2
|
||||||
|
if wakuSwap.accounting[peerId] >= disconnectThreshhold:
|
||||||
|
info "Disconnect threshhold hit, disconnect peer"
|
||||||
|
else:
|
||||||
|
info "Disconnect threshhold not hit"
|
||||||
|
|
||||||
# TODO Debit and credit here for Karma asset
|
# TODO Debit and credit here for Karma asset
|
||||||
proc debit(peerId: PeerId, n: int) {.gcsafe, closure.} =
|
proc debit(peerId: PeerId, n: int) {.gcsafe, closure.} =
|
||||||
info "Debiting peer for", peerId, n
|
info "Debiting peer for", peerId, n
|
||||||
@ -103,6 +145,15 @@ proc init*(wakuSwap: WakuSwap) =
|
|||||||
wakuSwap.accounting[peerId] = n
|
wakuSwap.accounting[peerId] = n
|
||||||
info "Accounting state", accounting = wakuSwap.accounting[peerId]
|
info "Accounting state", accounting = wakuSwap.accounting[peerId]
|
||||||
|
|
||||||
|
# TODO Isolate to policy function
|
||||||
|
# TODO Tunable payment threshhold, hard code for PoC
|
||||||
|
let paymentThreshhold = 1
|
||||||
|
if wakuSwap.accounting[peerId] >= paymentThreshhold:
|
||||||
|
info "Payment threshhold hit, send cheque"
|
||||||
|
discard wakuSwap.sendCheque()
|
||||||
|
else:
|
||||||
|
info "Payment threshhold not hit"
|
||||||
|
|
||||||
wakuSwap.handler = handle
|
wakuSwap.handler = handle
|
||||||
wakuSwap.codec = WakuSwapCodec
|
wakuSwap.codec = WakuSwapCodec
|
||||||
wakuSwap.credit = credit
|
wakuSwap.credit = credit
|
||||||
@ -118,4 +169,8 @@ proc init*(T: type WakuSwap, switch: Switch, rng: ref BrHmacDrbgContext): T =
|
|||||||
result.text = "test"
|
result.text = "test"
|
||||||
result.init()
|
result.init()
|
||||||
|
|
||||||
|
proc setPeer*(ws: WakuSwap, peer: PeerInfo) =
|
||||||
|
ws.peers.add(SwapPeer(peerInfo: peer))
|
||||||
|
|
||||||
# TODO End to end communication
|
# TODO End to end communication
|
||||||
|
|
||||||
|
|||||||
@ -20,10 +20,13 @@ type
|
|||||||
CreditHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.}
|
CreditHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.}
|
||||||
DebitHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.}
|
DebitHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.}
|
||||||
|
|
||||||
|
SwapPeer* = object
|
||||||
|
peerInfo*: PeerInfo
|
||||||
|
|
||||||
WakuSwap* = ref object of LPProtocol
|
WakuSwap* = ref object of LPProtocol
|
||||||
switch*: Switch
|
switch*: Switch
|
||||||
rng*: ref BrHmacDrbgContext
|
rng*: ref BrHmacDrbgContext
|
||||||
#peers*: seq[PeerInfo]
|
peers*: seq[SwapPeer]
|
||||||
text*: string
|
text*: string
|
||||||
accounting*: Table[PeerId, int]
|
accounting*: Table[PeerId, int]
|
||||||
credit*: CreditHandler
|
credit*: CreditHandler
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user