commit a927e0fa1590776da148fa009614300727357c12 Author: Tanguy Date: Thu Jul 21 17:08:23 2022 +0200 first commit diff --git a/README.md b/README.md new file mode 100644 index 0000000..501ed33 --- /dev/null +++ b/README.md @@ -0,0 +1,21 @@ +

Testground SDK for Nim

+ +Quickstart: +```sh +# Assuming testground is correctly setup +git clone https://github.com/status-im/testground-nim-sdk.git +cd testground-nim-sdk +testground plan import --name demo --from examples/simple_tcp_ping +testground run single --plan=demo --testcase=simple_tcp_ping --runner=local:docker --builder=docker:generic --instances=2 +``` + + +This is not stable in any shape or form. Features: +- [X] Basic communication with testground +- [X] Logging +- [X] Network configuration +- [X] Signal, barrier +- [X] PubSub +- [ ] Run configuration +- [ ] Multiple scenario in a single plan +- [ ] .. diff --git a/examples/simple_tcp_ping/Dockerfile b/examples/simple_tcp_ping/Dockerfile new file mode 100644 index 0000000..0f17936 --- /dev/null +++ b/examples/simple_tcp_ping/Dockerfile @@ -0,0 +1,11 @@ +FROM nimlang/nim:alpine as builder + +# "checkpoint" to avoid installing dependencies every time +# this is optional +RUN nimble install -y "https://github.com/status-im/testground-nim-sdk" +FROM builder + +COPY . . +RUN cd plan && nimble install -d && nim c -d:chronicles_log_level=NOTICE main.nim + +ENTRYPOINT ["plan/main"] diff --git a/examples/simple_tcp_ping/main.nim b/examples/simple_tcp_ping/main.nim new file mode 100644 index 0000000..4e8a609 --- /dev/null +++ b/examples/simple_tcp_ping/main.nim @@ -0,0 +1,37 @@ +import std/strutils, testground_sdk, chronos, stew/byteutils + +testground(client): + let + myId = await client.signalAndWait("setup", client.testInstanceCount) + myIp = client.testSubnet.split('.')[0..1].join(".") & ".1." & $myId + serverIp = client.testSubnet.split('.')[0..1].join(".") & ".1.1" + await client.updateNetworkParameter( + NetworkConf( + network: "default", + ipv4: some myIp & "/24", + enable: true, + callback_state: "network_setup", + callback_target: some client.testInstanceCount, + routing_policy: "accept_all", + ) + ) + + await client.waitForBarrier("network_setup", client.testInstanceCount) + + const payload = "Hello playground!" + if myId == 1: # server + let + server = createStreamServer(initTAddress(myIp & ":5050"), flags = {ReuseAddr}) + connection = await server.accept() + + doAssert (await connection.write(payload.toBytes())) == payload.len + connection.close() + + else: # client + let connection = await connect(initTAddress(serverIp & ":5050")) + var buffer: array[payload.len, byte] + + await connection.readExactly(addr buffer[0], payload.len) + connection.close() + doAssert string.fromBytes(buffer) == payload + client.recordMessage("Hourray " & $myId & "!") diff --git a/examples/simple_tcp_ping/manifest.toml b/examples/simple_tcp_ping/manifest.toml new file mode 100644 index 0000000..f0aff2d --- /dev/null +++ b/examples/simple_tcp_ping/manifest.toml @@ -0,0 +1,14 @@ +name = "simple_tcp_ping" +[defaults] +builder = "docker:generic" +runner = "local:docker" + +[builders."docker:generic"] +enabled = true + +[runners."local:docker"] +enabled = true + +[[testcases]] +name= "simple_tcp_ping" +instances = { min = 2, default = 2, max = 2 } diff --git a/examples/simple_tcp_ping/simple_tcp_ping.nimble b/examples/simple_tcp_ping/simple_tcp_ping.nimble new file mode 100644 index 0000000..dd7e0c0 --- /dev/null +++ b/examples/simple_tcp_ping/simple_tcp_ping.nimble @@ -0,0 +1,14 @@ +# Package + +version = "0.1.0" +author = "The author" +description = "Demo for another package" +license = "MIT" + + +# Dependencies + +requires "nim >= 1.2.2", + "https://github.com/status-im/testground-nim-sdk", + "stew", + "chronos" diff --git a/testground_sdk.nim b/testground_sdk.nim new file mode 100644 index 0000000..892b09f --- /dev/null +++ b/testground_sdk.nim @@ -0,0 +1,259 @@ +import + std/[tables, parseutils, strutils, os], + chronos, websock/websock, chronicles, stew/byteutils + +# workaround https://github.com/status-im/nim-serialization/issues/43 +from serialization import serializedFieldName +from json_serialization import Reader, init, readValue, handleReadException +import json_serialization/std/options + +export chronos, options, chronicles, websock + +type + Client* = ref object + requestId: int + requests: Table[string, Future[Response]] + subbed: Table[string, AsyncQueue[string]] + connection: WSSession + testRun: string + testPlan: string + testCase: string + testInstanceCount: int + testGroupId: string + testHostname: string + testSubnet: string + + LinkShape* = object + latency: int + jitter: int + bandwidth: int + loss: float + corrupt: float + corrupt_corr: float + reorder: float + reorder_corr: float + duplicate: float + duplicate_corr: float + + NetworkConf* = object + network: string + ipv4 {.serializedFieldName: "IPv4".}: Option[string] + ipv6 {.serializedFieldName: "IPv6".}: Option[string] + enable: bool + default: LinkShape + callback_state: string + callback_target: Option[int] + routing_policy: string + + SignalEntryRequest* = object + state: string + + SignalEntryResponse* = object + seq: int + + BarrierRequest* = object + state: string + target: int + + Subscribe* = object + topic: string + + PublishRequest* = object + topic: string + event {.serializedFieldName: "payload".}: Option[Event] + networkConf {.serializedFieldName: "payload".}: Option[NetworkConf] + payload: Option[string] + + Event* = object + #workaround https://github.com/status-im/nim-json-serialization/pull/50 + dummy: string + success_event: Option[SuccessEvent] + message_event: Option[MessageEvent] + + StdoutEvent = object + event: Event + + MessageEvent* = object + message: string + + SuccessEvent* = object + group: string + + Request* = object + id: string + signal_entry: Option[SignalEntryRequest] + publish: Option[PublishRequest] + barrier: Option[BarrierRequest] + subscribe: Option[Subscribe] + + Response* = object + id: string + signal_entry: Option[SignalEntryResponse] + subscribe: Option[string] + error: string + +proc request(c: Client, r: Request): Future[Response] {.async.} = + let retFut = newFuture[Response]("sendRequest") + var r2 = r + + r2.id = $c.requestId + c.requestId.inc + c.requests[r2.id] = retFut + await c.connection.send(r2.toJson()) + + #echo "sending:", r2.toJson() + + return await retFut + +proc getContext(c: Client): string = + "run:" & c.testRun & ":plan:" & c.testPlan & ":case:" & c.testCase +proc getState(c: Client, state: string): string = + c.getContext & ":states:" & state + +proc signal*(c: Client, state: string): Future[int] {.async.} = + ## Signal that we reached `state`, returns how many times + ## `state` was reached + let r = await c.request(Request( + signal_entry: some(SignalEntryRequest( + state: c.getState(state) + )) + )) + + return r.signal_entry.get().seq + +proc waitForBarrier*(c: Client, state: string, target: int) {.async.} = + ## Wait for `state` to be reached `target` times + discard await c.request(Request( + barrier: some(BarrierRequest( + state: c.getState(state), + target: target + )) + )) + +proc signalAndWait*(c: Client, state: string, target: int): Future[int] {.async.} = + ## Signal that we reached `state`, wait for it to be reached `target` times, + ## and returns how much time it was reached before us + result = await c.signal(state) + await c.waitForBarrier(state, target) + +proc success(c: Client): Future[int] {.async.} = + let r = await c.request(Request( + publish: some(PublishRequest( + topic: c.getContext() & ":run_events", + event: some Event( + success_event: some(SuccessEvent( + group: c.testGroupId + )) + ) + )) + )) + + + echo StdoutEvent( + event: Event( + success_event: some(SuccessEvent( + group: c.testGroupId + )) + ) + ).toJson() + +proc recordMessage*(c: Client, s: string) = + ## Record message + let e = StdoutEvent( + event: Event( + message_event: some(MessageEvent(message: s)) + ) + ) + echo e.toJson() + +proc waitNetwork*(c: Client) {.async.} = + ## Wait for network to be setup + if getEnv("TEST_SIDECAR") == "true": + await c.waitForBarrier("network-initialized", c.testInstanceCount) + + # magic value + c.recordMessage("network initialisation successful") + +proc updateNetworkParameter*(c: Client, n: NetworkConf) {.async.} = + let r = await c.request(Request( + publish: some PublishRequest( + topic: c.getContext() & ":topics:network:" & c.testHostname, + networkConf: some n + ) + )) + +proc subscribe*(c: Client, topic: string): AsyncQueue[string] = + ## Subscribe to `topic`. Returns a queue that will be filled with each new entry + let id = c.requestId + + c.requestId.inc + result = newAsyncQueue[string](1000) + c.subbed[$id] = result + asyncSpawn c.connection.send(Request( + id: $id, + subscribe: some Subscribe( + topic: c.getContext() & ":topics:" & topic + ) + ).toJson()) + +proc publish*(c: Client, topic, content: string) {.async.} = + ## Publish `content` to `topic` + let r = await c.request(Request( + publish: some PublishRequest( + topic: c.getContext() & ":topics:" & topic, + payload: some content + ) + )) + +proc runner(todo: proc(c: Client): Future[void]) {.async.} = + let + c = Client( + testRun: getEnv("TEST_RUN"), + testPlan: getEnv("TEST_PLAN"), + testCase: getEnv("TEST_CASE"), + testGroupId: getEnv("TEST_GROUP_ID"), + testHostname: getEnv("HOSTNAME"), + testSubnet: getEnv("TEST_SUBNET") + ) + discard parseInt(getEnv("TEST_INSTANCE_COUNT"), c.testInstanceCount) + let + serviceHost = if existsEnv("SYNC_SERVICE_HOST"): getEnv("SYNC_SERVICE_HOST") else: "testground-sync-service" + servicePort = if existsEnv("SYNC_SERVICE_PORT"): getEnv("SYNC_SERVICE_PORT") else: "5050" + fullWsAddress = serviceHost & ":" & servicePort + + c.connection = await WebSocket.connect(fullWsAddress, "/") + + defer: await c.connection.close() + + proc readLoop {.async.} = + while true: + let + v = string.fromBytes(await c.connection.recvMsg()) + parsed = json_serialization.decode(Json, v, Response, allowUnknownFields = true) + + #echo "got:", v + if parsed.id in c.requests: + c.requests[parsed.id].complete(parsed) + elif parsed.id in c.subbed: + c.subbed[parsed.id].addLastNoWait(parsed.subscribe.get().strip(chars = {'"'})) + else: + echo "Unknown response id!!!", parsed.id + + let + theTest = todo(c) + loop = readLoop() + + await theTest or loop + await theTest.cancelAndWait() + await loop.cancelAndWait() + +template testground*(c: untyped, b: untyped) = + proc todo(c: Client) {.async.} = + await c.waitNetwork() + b + + discard await c.success() + + #for key, val in envPairs(): + # echo key, ":", val + waitFor(runner(todo)) diff --git a/testground_sdk.nimble b/testground_sdk.nimble new file mode 100644 index 0000000..7e33163 --- /dev/null +++ b/testground_sdk.nimble @@ -0,0 +1,15 @@ +# Package + +version = "0.1.0" +author = "Status Research & Development GmbH" +description = "Testground Nim SDK" +license = "MIT" + + +# Dependencies + +requires "nim >= 1.6.0", + "chronos >= 3.0.6", + "websock", + "stew", + "jsonserialization"