Accounting credit receiving node (#292)

* Accounting WIP: Swap access through Store

* Fix order bug and comment scenario + typo

* WIP

* Accounting: Account for receiving store node

- Turn accountFor function into credit and debit
- Misc formatting

* Accounting: Fix bugs related to mount and test

* Accounting: Simplify query signature

We already have a ref to wakuSwap through wakuStore now.

* Resolve rebase issues
This commit is contained in:
Oskar Thorén 2020-11-24 12:53:42 +08:00 committed by GitHub
parent 44e9d4d86b
commit 63b0e8af76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 74 additions and 36 deletions

View File

@ -203,6 +203,9 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId
echo &"Listening on\n {listenStr}"
if conf.swap:
node.mountSwap()
if conf.storenode != "":
node.mountStore()
@ -214,9 +217,6 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
echo &"{payload}"
info "Hit store handler"
if conf.swap:
node.mountSwap()
await node.query(HistoryQuery(topics: @[DefaultContentTopic]), storeHandler)
if conf.filternode != "":

View File

@ -61,11 +61,11 @@ procSuite "Waku SWAP Accounting":
# Start nodes and mount protocols
await node1.start()
node1.mountSwap()
node1.mountStore()
node1.mountSwap()
await node2.start()
node2.mountSwap()
node2.mountStore()
node1.mountSwap()
await node2.subscriptions.notify("/waku/2/default-waku/proto", message)
@ -81,13 +81,10 @@ procSuite "Waku SWAP Accounting":
await node1.query(HistoryQuery(topics: @[contentTopic]), storeHandler)
# TODO Other node accounting field not set
# info "node2", msgs = node2.wakuSwap.accounting # crashes
# node2.wakuSwap.accounting[node1.peerInfo.peerId] = -1
check:
(await completionFut.withTimeout(5.seconds)) == true
# Accounting table updated with one message credit
# Accounting table updated with credit and debit, respectively
node1.wakuSwap.accounting[node2.peerInfo.peerId] == 1
node2.wakuSwap.accounting[node1.peerInfo.peerId] == -1
await node1.stop()
await node2.stop()

View File

@ -238,12 +238,14 @@ proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.as
##
## Status: Implemented.
# TODO Once waku swap is less experimental, this can simplified
if node.wakuSwap.isNil:
debug "Using default query"
await node.wakuStore.query(query, handler)
else:
debug "Using SWAPAccounting query"
await node.wakuStore.queryWithAccounting(query, handler, node.wakuSwap)
# TODO wakuSwap now part of wakuStore object
await node.wakuStore.queryWithAccounting(query, handler)
# TODO Extend with more relevant info: topics, peers, memory usage, online time, etc
proc info*(node: WakuNode): WakuInfo =
@ -269,12 +271,8 @@ proc mountFilter*(node: WakuNode) =
node.switch.mount(node.wakuFilter)
node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription())
proc mountStore*(node: WakuNode, store: MessageStore = nil) =
info "mounting store"
node.wakuStore = WakuStore.init(node.switch, node.rng, store)
node.switch.mount(node.wakuStore)
node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription())
# NOTE: If using the swap protocol, it must be mounted before store. This is
# because store is using a reference to the swap protocol.
proc mountSwap*(node: WakuNode) =
info "mounting swap"
node.wakuSwap = WakuSwap.init(node.switch, node.rng)
@ -282,6 +280,19 @@ proc mountSwap*(node: WakuNode) =
# NYI - Do we need this?
#node.subscriptions.subscribe(WakuSwapCodec, node.wakuSwap.subscription())
proc mountStore*(node: WakuNode, store: MessageStore = nil) =
info "mounting store"
if node.wakuSwap.isNil:
debug "mounting store without swap"
node.wakuStore = WakuStore.init(node.switch, node.rng, store)
else:
debug "mounting store with swap"
node.wakuStore = WakuStore.init(node.switch, node.rng, store, node.wakuSwap)
node.switch.mount(node.wakuStore)
node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription())
proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string]()) {.async, gcsafe.} =
let wakuRelay = WakuRelay.init(
switch = node.switch,
@ -430,7 +441,7 @@ when isMainModule:
store = res.value
mountStore(node, store)
if conf.filter:
mountFilter(node)

View File

@ -309,6 +309,18 @@ method init*(ws: WakuStore) =
let value = res.value
let response = ws.findMessages(res.value.query)
# TODO Do accounting here, response is HistoryResponse
# How do we get node or swap context?
if not ws.wakuSwap.isNil:
info "handle store swap test", text=ws.wakuSwap.text
# NOTE Perform accounting operation
let peerId = conn.peerInfo.peerId
let messages = response.messages
ws.wakuSwap.credit(peerId, messages.len)
else:
info "handle store swap is nil"
await conn.writeLp(HistoryRPC(requestId: value.requestId,
response: response).encode().buffer)
@ -325,11 +337,13 @@ method init*(ws: WakuStore) =
if res.isErr:
warn "failed to load messages from store", err = res.error
proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext, store: MessageStore = nil): T =
proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext,
store: MessageStore = nil, wakuSwap: WakuSwap = nil): T =
new result
result.rng = rng
result.switch = switch
result.store = store
result.wakuSwap = wakuSwap
result.init()
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
@ -377,8 +391,7 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
handler(response.value.response)
# NOTE: Experimental, maybe incorporate as part of query call
proc queryWithAccounting*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc,
wakuSwap: WakuSwap) {.async, gcsafe.} =
proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
# @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service.
# Ideally depending on the query and our set of peers we take a subset of ideal peers.
# This will require us to check for various factors such as:
@ -386,10 +399,10 @@ proc queryWithAccounting*(w: WakuStore, query: HistoryQuery, handler: QueryHandl
# - latency?
# - default store peer?
let peer = w.peers[0]
let conn = await w.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec)
let peer = ws.peers[0]
let conn = await ws.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuStoreCodec)
await conn.writeLP(HistoryRPC(requestId: generateRequestId(w.rng),
await conn.writeLP(HistoryRPC(requestId: generateRequestId(ws.rng),
query: query).encode().buffer)
var message = await conn.readLp(64*1024)
@ -400,9 +413,9 @@ proc queryWithAccounting*(w: WakuStore, query: HistoryQuery, handler: QueryHandl
return
# NOTE Perform accounting operation
# if SWAPAccountingEnabled:
# Assumes wakuSwap protocol is mounted
let peerId = peer.peerInfo.peerId
let messages = response.value.response.messages
wakuSwap.accountFor(peerId, messages.len)
ws.wakuSwap.debit(peerId, messages.len)
handler(response.value.response)

View File

@ -4,7 +4,8 @@ import
bearssl, stew/[byteutils, endians2],
libp2p/[switch, peerinfo],
libp2p/protocols/protocol,
../../waku_types
../../waku_types,
../waku_swap/waku_swap_types
type
QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
@ -47,3 +48,4 @@ type
peers*: seq[HistoryPeer]
messages*: seq[IndexedWakuMessage]
store*: MessageStore
wakuSwap*: WakuSwap

View File

@ -40,6 +40,8 @@ logScope:
const WakuSwapCodec* = "/vac/waku/swap/2.0.0-alpha1"
# Serialization
# -------------------------------------------------------------------------------
proc encode*(handshake: Handshake): ProtoBuffer =
result = initProtoBuffer()
result.write(1, handshake.beneficiary)
@ -72,19 +74,29 @@ proc init*(T: type Cheque, buffer: seq[byte]): ProtoResult[T] =
ok(cheque)
# Accounting
# -------------------------------------------------------------------------------
#
# We credit and debits peers based on what for now is a form of Karma asset.
# TODO Test for credit/debit operations in succession
proc init*(wakuSwap: WakuSwap) =
info "wakuSwap init 1"
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
info "NYI swap handle incoming connection"
proc accountFor(peerId: PeerId, n: int) {.gcsafe, closure.} =
info "Accounting for", peerId, n
info "Accounting test", text = wakuSwap.text
# Nicer way to write this?
proc credit(peerId: PeerId, n: int) {.gcsafe, closure.} =
info "Crediting peer for", peerId, n
if wakuSwap.accounting.hasKey(peerId):
wakuSwap.accounting[peerId] -= n
else:
wakuSwap.accounting[peerId] = -n
info "Accounting state", accounting = wakuSwap.accounting[peerId]
# TODO Debit and credit here for Karma asset
proc debit(peerId: PeerId, n: int) {.gcsafe, closure.} =
info "Debiting peer for", peerId, n
if wakuSwap.accounting.hasKey(peerId):
wakuSwap.accounting[peerId] += n
else:
@ -93,9 +105,10 @@ proc init*(wakuSwap: WakuSwap) =
wakuSwap.handler = handle
wakuSwap.codec = WakuSwapCodec
wakuSwap.accountFor = accountFor
wakuSwap.credit = credit
wakuSwap.debit = debit
# TODO Expression return?
proc init*(T: type WakuSwap, switch: Switch, rng: ref BrHmacDrbgContext): T =
info "wakuSwap init 2"
new result

View File

@ -17,7 +17,8 @@ type
date*: uint32
amount*: uint32
AccountHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.}
CreditHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.}
DebitHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.}
WakuSwap* = ref object of LPProtocol
switch*: Switch
@ -25,4 +26,5 @@ type
#peers*: seq[PeerInfo]
text*: string
accounting*: Table[PeerId, int]
accountFor*: AccountHandler
credit*: CreditHandler
debit*: DebitHandler