diff --git a/tests/v2/test_waku_swap.nim b/tests/v2/test_waku_swap.nim index 0c57e6e6c..612545bcc 100644 --- a/tests/v2/test_waku_swap.nim +++ b/tests/v2/test_waku_swap.nim @@ -72,6 +72,8 @@ procSuite "Waku SWAP Accounting": await sleepAsync(2000.millis) node1.wakuStore.setPeer(node2.peerInfo) + node1.wakuSwap.setPeer(node2.peerInfo) + node2.wakuSwap.setPeer(node1.peerInfo) proc storeHandler(response: HistoryResponse) {.gcsafe, closure.} = debug "storeHandler hit" @@ -88,3 +90,51 @@ procSuite "Waku SWAP Accounting": node2.wakuSwap.accounting[node1.peerInfo.peerId] == -1 await node1.stop() await node2.stop() + + # TODO Add cheque here + asyncTest "Update accounting state after sending cheque": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), + Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), + Port(60001)) + contentTopic = ContentTopic(1) + message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) + + var futures = [newFuture[bool](), newFuture[bool]()] + + # Start nodes and mount protocols + await node1.start() + node1.mountSwap() + node1.mountStore() + await node2.start() + node2.mountSwap() + node2.mountStore() + + await node2.subscriptions.notify("/waku/2/default-waku/proto", message) + + await sleepAsync(2000.millis) + + node1.wakuStore.setPeer(node2.peerInfo) + node1.wakuSwap.setPeer(node2.peerInfo) + node2.wakuSwap.setPeer(node1.peerInfo) + + proc handler1(response: HistoryResponse) {.gcsafe, closure.} = + futures[0].complete(true) + proc handler2(response: HistoryResponse) {.gcsafe, closure.} = + futures[1].complete(true) + + # TODO Handshakes - for now we assume implicit, e2e still works for PoC + await node1.query(HistoryQuery(topics: @[contentTopic]), handler1) + await node1.query(HistoryQuery(topics: @[contentTopic]), handler2) + + check: + (await allFutures(futures).withTimeout(5.seconds)) == true + # Accounting table updated with credit and debit, respectively + # After sending a cheque the balance is partially adjusted + node1.wakuSwap.accounting[node2.peerInfo.peerId] == 1 + node2.wakuSwap.accounting[node1.peerInfo.peerId] == -1 + await node1.stop() + await node2.stop() diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index b1032fff4..75e23f4a1 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -426,6 +426,8 @@ when isMainModule: if conf.swap: mountSwap(node) + # TODO Set swap peer, for now should be same as store peer + if conf.store: var store: MessageStore diff --git a/waku/v2/protocol/waku_swap/waku_swap.nim b/waku/v2/protocol/waku_swap/waku_swap.nim index 78a31a47e..41cf9c0b1 100644 --- a/waku/v2/protocol/waku_swap/waku_swap.nim +++ b/waku/v2/protocol/waku_swap/waku_swap.nim @@ -81,10 +81,44 @@ proc init*(T: type Cheque, buffer: seq[byte]): ProtoResult[T] = # TODO Test for credit/debit operations in succession +proc sendCheque*(ws: WakuSwap) {.async.} = + # TODO Better peer selection, for now using hardcoded peer + let peer = ws.peers[0] + let conn = await ws.switch.dial(peer.peerInfo.peerId, peer.peerInfo.addrs, WakuSwapCodec) + + info "sendCheque" + + # TODO Add beneficiary, etc + # XXX Hardcoded amount for now + await conn.writeLP(Cheque(amount: 1).encode().buffer) + + # Set new balance + # XXX Assume peerId is first peer + let peerId = ws.peers[0].peerInfo.peerId + ws.accounting[peerId] -= 1 + info "New accounting state", accounting = ws.accounting[peerId] + +# TODO Authenticate cheque, check beneficiary etc +proc handleCheque*(ws: WakuSwap, cheque: Cheque) = + info "handle incoming cheque" + # XXX Assume peerId is first peer + let peerId = ws.peers[0].peerInfo.peerId + ws.accounting[peerId] += int(cheque.amount) + info "New accounting state", accounting = ws.accounting[peerId] + proc init*(wakuSwap: WakuSwap) = info "wakuSwap init 1" proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = - info "NYI swap handle incoming connection" + info "swap handle incoming connection" + var message = await conn.readLp(64*1024) + # XXX This can be handshake, etc + var res = Cheque.init(message) + if res.isErr: + error "failed to decode rpc" + return + + info "received cheque", value=res.value + wakuSwap.handleCheque(res.value) proc credit(peerId: PeerId, n: int) {.gcsafe, closure.} = info "Crediting peer for", peerId, n @@ -94,6 +128,14 @@ proc init*(wakuSwap: WakuSwap) = wakuSwap.accounting[peerId] = -n info "Accounting state", accounting = wakuSwap.accounting[peerId] + # TODO Isolate to policy function + # TODO Tunable disconnect threshhold, hard code for PoC + let disconnectThreshhold = 2 + if wakuSwap.accounting[peerId] >= disconnectThreshhold: + info "Disconnect threshhold hit, disconnect peer" + else: + info "Disconnect threshhold not hit" + # TODO Debit and credit here for Karma asset proc debit(peerId: PeerId, n: int) {.gcsafe, closure.} = info "Debiting peer for", peerId, n @@ -103,6 +145,15 @@ proc init*(wakuSwap: WakuSwap) = wakuSwap.accounting[peerId] = n info "Accounting state", accounting = wakuSwap.accounting[peerId] + # TODO Isolate to policy function + # TODO Tunable payment threshhold, hard code for PoC + let paymentThreshhold = 1 + if wakuSwap.accounting[peerId] >= paymentThreshhold: + info "Payment threshhold hit, send cheque" + discard wakuSwap.sendCheque() + else: + info "Payment threshhold not hit" + wakuSwap.handler = handle wakuSwap.codec = WakuSwapCodec wakuSwap.credit = credit @@ -118,4 +169,8 @@ proc init*(T: type WakuSwap, switch: Switch, rng: ref BrHmacDrbgContext): T = result.text = "test" result.init() +proc setPeer*(ws: WakuSwap, peer: PeerInfo) = + ws.peers.add(SwapPeer(peerInfo: peer)) + # TODO End to end communication + diff --git a/waku/v2/protocol/waku_swap/waku_swap_types.nim b/waku/v2/protocol/waku_swap/waku_swap_types.nim index 7548d8778..e25b255a5 100644 --- a/waku/v2/protocol/waku_swap/waku_swap_types.nim +++ b/waku/v2/protocol/waku_swap/waku_swap_types.nim @@ -20,10 +20,13 @@ type CreditHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.} DebitHandler* = proc (peerId: PeerId, amount: int) {.gcsafe, closure.} + SwapPeer* = object + peerInfo*: PeerInfo + WakuSwap* = ref object of LPProtocol switch*: Switch rng*: ref BrHmacDrbgContext - #peers*: seq[PeerInfo] + peers*: seq[SwapPeer] text*: string accounting*: Table[PeerId, int] credit*: CreditHandler