From 9fba8111179c2ace7a2b595d8ed8cb10f0bf9c21 Mon Sep 17 00:00:00 2001 From: Tanguy Date: Wed, 2 Nov 2022 20:29:29 +0100 Subject: [PATCH] Typed pubsub (#3) --- examples/simple_tcp_ping/main.nim | 13 ++++++++++++- testground_sdk.nim | 26 +++++++++++++++++++++++--- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/examples/simple_tcp_ping/main.nim b/examples/simple_tcp_ping/main.nim index a11ff60..d316442 100644 --- a/examples/simple_tcp_ping/main.nim +++ b/examples/simple_tcp_ping/main.nim @@ -1,4 +1,9 @@ -import std/strutils, testground_sdk, chronos, stew/byteutils +import std/strutils, std/random +import testground_sdk, chronos, stew/byteutils + +type + AwesomeStruct = object + rand: int testground(client): let @@ -18,6 +23,12 @@ testground(client): await client.waitForBarrier("network_setup", client.testInstanceCount) + randomize() + await client.publish("rands", AwesomeStruct(rand: rand(100))) + let randomValues = client.subscribe("rands", AwesomeStruct) + for _ in 0 ..< 2: + echo await randomValues.popFirst() + let payload = client.param(string, "payload") count = client.param(int, "count") diff --git a/testground_sdk.nim b/testground_sdk.nim index 0c30b0a..a79c9fd 100644 --- a/testground_sdk.nim +++ b/testground_sdk.nim @@ -1,5 +1,5 @@ import - std/[tables, parseutils, strutils, os, sequtils], + std/[tables, parseutils, strutils, os, sequtils, json], chronos, websock/websock, chronicles, stew/byteutils # workaround https://github.com/status-im/nim-serialization/issues/43 @@ -9,6 +9,7 @@ import json_serialization/std/options export sequtils, strutils, os, tables export chronos, options, chronicles, websock +export json_serialization type Client* = ref object @@ -197,7 +198,22 @@ proc subscribe*(c: Client, topic: string): AsyncQueue[string] = ) ).toJson()) -proc publish*(c: Client, topic, content: string) {.async.} = +proc subscribe*[T](c: Client, topic: string, _: type[T]): AsyncQueue[T] = + var + theQueue = c.subscribe(topic) + resQueue = newAsyncQueue[T](1000) + proc getter {.async.} = + mixin decode + while true: + let elem = await theQueue.popFirst() + + let decoded = json_serialization.decode(Json, unescape(elem), T, allowUnknownFields = true) + resQueue.addLastNoWait(decoded) + + asyncSpawn getter() + resQueue + +proc publish*(c: Client, topic: string, content: string) {.async.} = ## Publish `content` to `topic` let r = await c.request(Request( publish: some PublishRequest( @@ -206,6 +222,10 @@ proc publish*(c: Client, topic, content: string) {.async.} = ) )) +proc publish*[T](c: Client, topic: string, content: T) {.async.} = + mixin toJson + await c.publish(topic, content.toJson()) + proc param*[T](c: Client, _: type[T], name: string): T = let params = getEnv("TEST_INSTANCE_PARAMS").split("|").mapIt(it.split("=", 2)).mapIt((it[0], it[1])).toTable() @@ -250,7 +270,7 @@ proc runner(todo: proc(c: Client): Future[void] {.gcsafe.}) {.async.} = if parsed.id in c.requests: c.requests[parsed.id].complete(parsed) elif parsed.id in c.subbed: - c.subbed[parsed.id].addLastNoWait(parsed.subscribe.get().strip(chars = {'"'})) + c.subbed[parsed.id].addLastNoWait(parsed.subscribe.get()) else: echo "Unknown response id!!!", parsed.id