first commit
This commit is contained in:
commit
a927e0fa15
|
@ -0,0 +1,21 @@
|
|||
<h3 align="center"><a href="https://github.com/testground/testground/">Testground</a> SDK for <a href="https://nim-lang.org/">Nim</a></h3>
|
||||
|
||||
Quickstart:
|
||||
```sh
|
||||
# Assuming testground is correctly setup
|
||||
git clone https://github.com/status-im/testground-nim-sdk.git
|
||||
cd testground-nim-sdk
|
||||
testground plan import --name demo --from examples/simple_tcp_ping
|
||||
testground run single --plan=demo --testcase=simple_tcp_ping --runner=local:docker --builder=docker:generic --instances=2
|
||||
```
|
||||
|
||||
|
||||
This is not stable in any shape or form. Features:
|
||||
- [X] Basic communication with testground
|
||||
- [X] Logging
|
||||
- [X] Network configuration
|
||||
- [X] Signal, barrier
|
||||
- [X] PubSub
|
||||
- [ ] Run configuration
|
||||
- [ ] Multiple scenario in a single plan
|
||||
- [ ] ..
|
|
@ -0,0 +1,11 @@
|
|||
FROM nimlang/nim:alpine as builder
|
||||
|
||||
# "checkpoint" to avoid installing dependencies every time
|
||||
# this is optional
|
||||
RUN nimble install -y "https://github.com/status-im/testground-nim-sdk"
|
||||
FROM builder
|
||||
|
||||
COPY . .
|
||||
RUN cd plan && nimble install -d && nim c -d:chronicles_log_level=NOTICE main.nim
|
||||
|
||||
ENTRYPOINT ["plan/main"]
|
|
@ -0,0 +1,37 @@
|
|||
import std/strutils, testground_sdk, chronos, stew/byteutils
|
||||
|
||||
testground(client):
|
||||
let
|
||||
myId = await client.signalAndWait("setup", client.testInstanceCount)
|
||||
myIp = client.testSubnet.split('.')[0..1].join(".") & ".1." & $myId
|
||||
serverIp = client.testSubnet.split('.')[0..1].join(".") & ".1.1"
|
||||
await client.updateNetworkParameter(
|
||||
NetworkConf(
|
||||
network: "default",
|
||||
ipv4: some myIp & "/24",
|
||||
enable: true,
|
||||
callback_state: "network_setup",
|
||||
callback_target: some client.testInstanceCount,
|
||||
routing_policy: "accept_all",
|
||||
)
|
||||
)
|
||||
|
||||
await client.waitForBarrier("network_setup", client.testInstanceCount)
|
||||
|
||||
const payload = "Hello playground!"
|
||||
if myId == 1: # server
|
||||
let
|
||||
server = createStreamServer(initTAddress(myIp & ":5050"), flags = {ReuseAddr})
|
||||
connection = await server.accept()
|
||||
|
||||
doAssert (await connection.write(payload.toBytes())) == payload.len
|
||||
connection.close()
|
||||
|
||||
else: # client
|
||||
let connection = await connect(initTAddress(serverIp & ":5050"))
|
||||
var buffer: array[payload.len, byte]
|
||||
|
||||
await connection.readExactly(addr buffer[0], payload.len)
|
||||
connection.close()
|
||||
doAssert string.fromBytes(buffer) == payload
|
||||
client.recordMessage("Hourray " & $myId & "!")
|
|
@ -0,0 +1,14 @@
|
|||
name = "simple_tcp_ping"
|
||||
[defaults]
|
||||
builder = "docker:generic"
|
||||
runner = "local:docker"
|
||||
|
||||
[builders."docker:generic"]
|
||||
enabled = true
|
||||
|
||||
[runners."local:docker"]
|
||||
enabled = true
|
||||
|
||||
[[testcases]]
|
||||
name= "simple_tcp_ping"
|
||||
instances = { min = 2, default = 2, max = 2 }
|
|
@ -0,0 +1,14 @@
|
|||
# Package
|
||||
|
||||
version = "0.1.0"
|
||||
author = "The author"
|
||||
description = "Demo for another package"
|
||||
license = "MIT"
|
||||
|
||||
|
||||
# Dependencies
|
||||
|
||||
requires "nim >= 1.2.2",
|
||||
"https://github.com/status-im/testground-nim-sdk",
|
||||
"stew",
|
||||
"chronos"
|
|
@ -0,0 +1,259 @@
|
|||
import
|
||||
std/[tables, parseutils, strutils, os],
|
||||
chronos, websock/websock, chronicles, stew/byteutils
|
||||
|
||||
# workaround https://github.com/status-im/nim-serialization/issues/43
|
||||
from serialization import serializedFieldName
|
||||
from json_serialization import Reader, init, readValue, handleReadException
|
||||
import json_serialization/std/options
|
||||
|
||||
export chronos, options, chronicles, websock
|
||||
|
||||
type
|
||||
Client* = ref object
|
||||
requestId: int
|
||||
requests: Table[string, Future[Response]]
|
||||
subbed: Table[string, AsyncQueue[string]]
|
||||
connection: WSSession
|
||||
testRun: string
|
||||
testPlan: string
|
||||
testCase: string
|
||||
testInstanceCount: int
|
||||
testGroupId: string
|
||||
testHostname: string
|
||||
testSubnet: string
|
||||
|
||||
LinkShape* = object
|
||||
latency: int
|
||||
jitter: int
|
||||
bandwidth: int
|
||||
loss: float
|
||||
corrupt: float
|
||||
corrupt_corr: float
|
||||
reorder: float
|
||||
reorder_corr: float
|
||||
duplicate: float
|
||||
duplicate_corr: float
|
||||
|
||||
NetworkConf* = object
|
||||
network: string
|
||||
ipv4 {.serializedFieldName: "IPv4".}: Option[string]
|
||||
ipv6 {.serializedFieldName: "IPv6".}: Option[string]
|
||||
enable: bool
|
||||
default: LinkShape
|
||||
callback_state: string
|
||||
callback_target: Option[int]
|
||||
routing_policy: string
|
||||
|
||||
SignalEntryRequest* = object
|
||||
state: string
|
||||
|
||||
SignalEntryResponse* = object
|
||||
seq: int
|
||||
|
||||
BarrierRequest* = object
|
||||
state: string
|
||||
target: int
|
||||
|
||||
Subscribe* = object
|
||||
topic: string
|
||||
|
||||
PublishRequest* = object
|
||||
topic: string
|
||||
event {.serializedFieldName: "payload".}: Option[Event]
|
||||
networkConf {.serializedFieldName: "payload".}: Option[NetworkConf]
|
||||
payload: Option[string]
|
||||
|
||||
Event* = object
|
||||
#workaround https://github.com/status-im/nim-json-serialization/pull/50
|
||||
dummy: string
|
||||
success_event: Option[SuccessEvent]
|
||||
message_event: Option[MessageEvent]
|
||||
|
||||
StdoutEvent = object
|
||||
event: Event
|
||||
|
||||
MessageEvent* = object
|
||||
message: string
|
||||
|
||||
SuccessEvent* = object
|
||||
group: string
|
||||
|
||||
Request* = object
|
||||
id: string
|
||||
signal_entry: Option[SignalEntryRequest]
|
||||
publish: Option[PublishRequest]
|
||||
barrier: Option[BarrierRequest]
|
||||
subscribe: Option[Subscribe]
|
||||
|
||||
Response* = object
|
||||
id: string
|
||||
signal_entry: Option[SignalEntryResponse]
|
||||
subscribe: Option[string]
|
||||
error: string
|
||||
|
||||
proc request(c: Client, r: Request): Future[Response] {.async.} =
|
||||
let retFut = newFuture[Response]("sendRequest")
|
||||
var r2 = r
|
||||
|
||||
r2.id = $c.requestId
|
||||
c.requestId.inc
|
||||
c.requests[r2.id] = retFut
|
||||
await c.connection.send(r2.toJson())
|
||||
|
||||
#echo "sending:", r2.toJson()
|
||||
|
||||
return await retFut
|
||||
|
||||
proc getContext(c: Client): string =
|
||||
"run:" & c.testRun & ":plan:" & c.testPlan & ":case:" & c.testCase
|
||||
proc getState(c: Client, state: string): string =
|
||||
c.getContext & ":states:" & state
|
||||
|
||||
proc signal*(c: Client, state: string): Future[int] {.async.} =
|
||||
## Signal that we reached `state`, returns how many times
|
||||
## `state` was reached
|
||||
let r = await c.request(Request(
|
||||
signal_entry: some(SignalEntryRequest(
|
||||
state: c.getState(state)
|
||||
))
|
||||
))
|
||||
|
||||
return r.signal_entry.get().seq
|
||||
|
||||
proc waitForBarrier*(c: Client, state: string, target: int) {.async.} =
|
||||
## Wait for `state` to be reached `target` times
|
||||
discard await c.request(Request(
|
||||
barrier: some(BarrierRequest(
|
||||
state: c.getState(state),
|
||||
target: target
|
||||
))
|
||||
))
|
||||
|
||||
proc signalAndWait*(c: Client, state: string, target: int): Future[int] {.async.} =
|
||||
## Signal that we reached `state`, wait for it to be reached `target` times,
|
||||
## and returns how much time it was reached before us
|
||||
result = await c.signal(state)
|
||||
await c.waitForBarrier(state, target)
|
||||
|
||||
proc success(c: Client): Future[int] {.async.} =
|
||||
let r = await c.request(Request(
|
||||
publish: some(PublishRequest(
|
||||
topic: c.getContext() & ":run_events",
|
||||
event: some Event(
|
||||
success_event: some(SuccessEvent(
|
||||
group: c.testGroupId
|
||||
))
|
||||
)
|
||||
))
|
||||
))
|
||||
|
||||
|
||||
echo StdoutEvent(
|
||||
event: Event(
|
||||
success_event: some(SuccessEvent(
|
||||
group: c.testGroupId
|
||||
))
|
||||
)
|
||||
).toJson()
|
||||
|
||||
proc recordMessage*(c: Client, s: string) =
|
||||
## Record message
|
||||
let e = StdoutEvent(
|
||||
event: Event(
|
||||
message_event: some(MessageEvent(message: s))
|
||||
)
|
||||
)
|
||||
echo e.toJson()
|
||||
|
||||
proc waitNetwork*(c: Client) {.async.} =
|
||||
## Wait for network to be setup
|
||||
if getEnv("TEST_SIDECAR") == "true":
|
||||
await c.waitForBarrier("network-initialized", c.testInstanceCount)
|
||||
|
||||
# magic value
|
||||
c.recordMessage("network initialisation successful")
|
||||
|
||||
proc updateNetworkParameter*(c: Client, n: NetworkConf) {.async.} =
|
||||
let r = await c.request(Request(
|
||||
publish: some PublishRequest(
|
||||
topic: c.getContext() & ":topics:network:" & c.testHostname,
|
||||
networkConf: some n
|
||||
)
|
||||
))
|
||||
|
||||
proc subscribe*(c: Client, topic: string): AsyncQueue[string] =
|
||||
## Subscribe to `topic`. Returns a queue that will be filled with each new entry
|
||||
let id = c.requestId
|
||||
|
||||
c.requestId.inc
|
||||
result = newAsyncQueue[string](1000)
|
||||
c.subbed[$id] = result
|
||||
asyncSpawn c.connection.send(Request(
|
||||
id: $id,
|
||||
subscribe: some Subscribe(
|
||||
topic: c.getContext() & ":topics:" & topic
|
||||
)
|
||||
).toJson())
|
||||
|
||||
proc publish*(c: Client, topic, content: string) {.async.} =
|
||||
## Publish `content` to `topic`
|
||||
let r = await c.request(Request(
|
||||
publish: some PublishRequest(
|
||||
topic: c.getContext() & ":topics:" & topic,
|
||||
payload: some content
|
||||
)
|
||||
))
|
||||
|
||||
proc runner(todo: proc(c: Client): Future[void]) {.async.} =
|
||||
let
|
||||
c = Client(
|
||||
testRun: getEnv("TEST_RUN"),
|
||||
testPlan: getEnv("TEST_PLAN"),
|
||||
testCase: getEnv("TEST_CASE"),
|
||||
testGroupId: getEnv("TEST_GROUP_ID"),
|
||||
testHostname: getEnv("HOSTNAME"),
|
||||
testSubnet: getEnv("TEST_SUBNET")
|
||||
)
|
||||
discard parseInt(getEnv("TEST_INSTANCE_COUNT"), c.testInstanceCount)
|
||||
let
|
||||
serviceHost = if existsEnv("SYNC_SERVICE_HOST"): getEnv("SYNC_SERVICE_HOST") else: "testground-sync-service"
|
||||
servicePort = if existsEnv("SYNC_SERVICE_PORT"): getEnv("SYNC_SERVICE_PORT") else: "5050"
|
||||
fullWsAddress = serviceHost & ":" & servicePort
|
||||
|
||||
c.connection = await WebSocket.connect(fullWsAddress, "/")
|
||||
|
||||
defer: await c.connection.close()
|
||||
|
||||
proc readLoop {.async.} =
|
||||
while true:
|
||||
let
|
||||
v = string.fromBytes(await c.connection.recvMsg())
|
||||
parsed = json_serialization.decode(Json, v, Response, allowUnknownFields = true)
|
||||
|
||||
#echo "got:", v
|
||||
if parsed.id in c.requests:
|
||||
c.requests[parsed.id].complete(parsed)
|
||||
elif parsed.id in c.subbed:
|
||||
c.subbed[parsed.id].addLastNoWait(parsed.subscribe.get().strip(chars = {'"'}))
|
||||
else:
|
||||
echo "Unknown response id!!!", parsed.id
|
||||
|
||||
let
|
||||
theTest = todo(c)
|
||||
loop = readLoop()
|
||||
|
||||
await theTest or loop
|
||||
await theTest.cancelAndWait()
|
||||
await loop.cancelAndWait()
|
||||
|
||||
template testground*(c: untyped, b: untyped) =
|
||||
proc todo(c: Client) {.async.} =
|
||||
await c.waitNetwork()
|
||||
b
|
||||
|
||||
discard await c.success()
|
||||
|
||||
#for key, val in envPairs():
|
||||
# echo key, ":", val
|
||||
waitFor(runner(todo))
|
|
@ -0,0 +1,15 @@
|
|||
# Package
|
||||
|
||||
version = "0.1.0"
|
||||
author = "Status Research & Development GmbH"
|
||||
description = "Testground Nim SDK"
|
||||
license = "MIT"
|
||||
|
||||
|
||||
# Dependencies
|
||||
|
||||
requires "nim >= 1.6.0",
|
||||
"chronos >= 3.0.6",
|
||||
"websock",
|
||||
"stew",
|
||||
"jsonserialization"
|
Loading…
Reference in New Issue