Typed pubsub (#3)

This commit is contained in:
Tanguy 2022-11-02 20:29:29 +01:00 committed by GitHub
parent 1c69daf469
commit 9fba811117
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 4 deletions

View File

@ -1,4 +1,9 @@
import std/strutils, testground_sdk, chronos, stew/byteutils
import std/strutils, std/random
import testground_sdk, chronos, stew/byteutils
type
AwesomeStruct = object
rand: int
testground(client):
let
@ -18,6 +23,12 @@ testground(client):
await client.waitForBarrier("network_setup", client.testInstanceCount)
randomize()
await client.publish("rands", AwesomeStruct(rand: rand(100)))
let randomValues = client.subscribe("rands", AwesomeStruct)
for _ in 0 ..< 2:
echo await randomValues.popFirst()
let
payload = client.param(string, "payload")
count = client.param(int, "count")

View File

@ -1,5 +1,5 @@
import
std/[tables, parseutils, strutils, os, sequtils],
std/[tables, parseutils, strutils, os, sequtils, json],
chronos, websock/websock, chronicles, stew/byteutils
# workaround https://github.com/status-im/nim-serialization/issues/43
@ -9,6 +9,7 @@ import json_serialization/std/options
export sequtils, strutils, os, tables
export chronos, options, chronicles, websock
export json_serialization
type
Client* = ref object
@ -197,7 +198,22 @@ proc subscribe*(c: Client, topic: string): AsyncQueue[string] =
)
).toJson())
proc publish*(c: Client, topic, content: string) {.async.} =
proc subscribe*[T](c: Client, topic: string, _: type[T]): AsyncQueue[T] =
var
theQueue = c.subscribe(topic)
resQueue = newAsyncQueue[T](1000)
proc getter {.async.} =
mixin decode
while true:
let elem = await theQueue.popFirst()
let decoded = json_serialization.decode(Json, unescape(elem), T, allowUnknownFields = true)
resQueue.addLastNoWait(decoded)
asyncSpawn getter()
resQueue
proc publish*(c: Client, topic: string, content: string) {.async.} =
## Publish `content` to `topic`
let r = await c.request(Request(
publish: some PublishRequest(
@ -206,6 +222,10 @@ proc publish*(c: Client, topic, content: string) {.async.} =
)
))
proc publish*[T](c: Client, topic: string, content: T) {.async.} =
mixin toJson
await c.publish(topic, content.toJson())
proc param*[T](c: Client, _: type[T], name: string): T =
let params = getEnv("TEST_INSTANCE_PARAMS").split("|").mapIt(it.split("=", 2)).mapIt((it[0], it[1])).toTable()
@ -250,7 +270,7 @@ proc runner(todo: proc(c: Client): Future[void] {.gcsafe.}) {.async.} =
if parsed.id in c.requests:
c.requests[parsed.id].complete(parsed)
elif parsed.id in c.subbed:
c.subbed[parsed.id].addLastNoWait(parsed.subscribe.get().strip(chars = {'"'}))
c.subbed[parsed.id].addLastNoWait(parsed.subscribe.get())
else:
echo "Unknown response id!!!", parsed.id