mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-07-02 22:29:42 +00:00
test(e2e): native + CBOR concurrency stress harness
Hammers one shared context from several threads, alternating the two ABIs of the same library: the native path (EchoRequest struct in, typed EchoResponse* back) and the CBOR path (encoded request in, CBOR map back), each verifying the echoed message round-trips. Exercises the POD deep-copy/free on the way in, the respPod deliver/free on the way out, and the request channel under contention. Run plain or with SAN=address / SAN=thread. Clean at 6 threads x 1500 iters (9000 calls per ABI) under both ASAN and TSAN — no leaks, use-after-free, or data races. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
2ec514cf9f
commit
c6c7600c6b
4
tests/e2e/c/.gitignore
vendored
Normal file
4
tests/e2e/c/.gitignore
vendored
Normal file
@ -0,0 +1,4 @@
|
||||
/stress
|
||||
/libmy_timer.dylib
|
||||
/libmy_timer.so
|
||||
/build/
|
||||
48
tests/e2e/c/Makefile
Normal file
48
tests/e2e/c/Makefile
Normal file
@ -0,0 +1,48 @@
|
||||
# Build + run the native/CBOR concurrency stress test for the timer example.
|
||||
#
|
||||
# make stress # plain build + run
|
||||
# make stress SAN=address # AddressSanitizer
|
||||
# make stress SAN=thread # ThreadSanitizer
|
||||
# make clean
|
||||
#
|
||||
# Drives the real exported ABI of the timer example library, so it links the
|
||||
# same dylib the C/Go consumers do. The library is built from the repo root so
|
||||
# its vendored Nimble dependencies resolve.
|
||||
|
||||
REPO_ROOT := $(abspath ../../..)
|
||||
NIM_SRC := $(REPO_ROOT)/examples/timer/timer.nim
|
||||
HDR_DIR := $(REPO_ROOT)/examples/timer/c_bindings
|
||||
|
||||
UNAME_S := $(shell uname -s)
|
||||
ifeq ($(UNAME_S),Darwin)
|
||||
LIBNAME := libmy_timer.dylib
|
||||
RPATH := -Wl,-rpath,.
|
||||
else
|
||||
LIBNAME := libmy_timer.so
|
||||
RPATH := -Wl,-rpath,'$$ORIGIN'
|
||||
endif
|
||||
|
||||
CC ?= cc
|
||||
CFLAGS ?= -std=c11 -Wall -Wextra -O1 -g -I$(HDR_DIR)
|
||||
NIMFLAGS := --mm:orc -d:chronicles_log_level=WARN --app:lib --noMain \
|
||||
--nimMainPrefix:libmy_timer
|
||||
|
||||
# Optional sanitizer: SAN=address | thread | undefined
|
||||
ifneq ($(SAN),)
|
||||
SANFLAGS := -fsanitize=$(SAN) -fno-omit-frame-pointer
|
||||
CFLAGS += $(SANFLAGS)
|
||||
LDSAN := $(SANFLAGS)
|
||||
NIMFLAGS += -d:useMalloc --passC:-fsanitize=$(SAN) --passL:-fsanitize=$(SAN)
|
||||
endif
|
||||
|
||||
.PHONY: stress clean
|
||||
|
||||
$(LIBNAME):
|
||||
cd $(REPO_ROOT) && nim c $(NIMFLAGS) -o:$(CURDIR)/$(LIBNAME) $(NIM_SRC)
|
||||
|
||||
stress: stress.c $(HDR_DIR)/my_timer.h $(LIBNAME)
|
||||
$(CC) $(CFLAGS) stress.c -L. -lmy_timer -lpthread $(RPATH) $(LDSAN) -o stress
|
||||
./stress
|
||||
|
||||
clean:
|
||||
rm -f stress $(LIBNAME)
|
||||
202
tests/e2e/c/stress.c
Normal file
202
tests/e2e/c/stress.c
Normal file
@ -0,0 +1,202 @@
|
||||
// Concurrency + memory stress for BOTH ABIs of one library:
|
||||
// - native (pure C): EchoRequest struct in, typed EchoResponse* back
|
||||
// - CBOR: encoded request in, CBOR EchoResponse map back
|
||||
//
|
||||
// Many threads hammer a shared context with both call shapes, each verifying
|
||||
// the echoed message round-trips. Built to run under ASAN/TSAN to flush out
|
||||
// leaks, use-after-free in the POD deep-copy/free paths, and data races.
|
||||
//
|
||||
// make stress && ./stress # plain
|
||||
// make stress SAN=address # -fsanitize=address
|
||||
// make stress SAN=thread # -fsanitize=thread
|
||||
#include "my_timer.h"
|
||||
#include "my_timer_cbor.h"
|
||||
#include <pthread.h>
|
||||
#include <stdatomic.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
#ifndef STRESS_THREADS
|
||||
#define STRESS_THREADS 6
|
||||
#endif
|
||||
#ifndef STRESS_ITERS
|
||||
#define STRESS_ITERS 1500
|
||||
#endif
|
||||
|
||||
static atomic_long g_ok;
|
||||
static atomic_long g_bad;
|
||||
|
||||
// --- per-call blocking capture ----------------------------------------------
|
||||
typedef struct {
|
||||
int ret, done;
|
||||
char echoed[128]; // copied out of the typed return / decoded from CBOR
|
||||
pthread_mutex_t mu;
|
||||
pthread_cond_t cv;
|
||||
} Cap;
|
||||
static void cap_init(Cap *c) {
|
||||
memset(c, 0, sizeof(*c));
|
||||
pthread_mutex_init(&c->mu, NULL);
|
||||
pthread_cond_init(&c->cv, NULL);
|
||||
}
|
||||
static void cap_destroy(Cap *c) {
|
||||
pthread_mutex_destroy(&c->mu);
|
||||
pthread_cond_destroy(&c->cv);
|
||||
}
|
||||
static void cap_reset(Cap *c) {
|
||||
pthread_mutex_lock(&c->mu);
|
||||
c->done = 0;
|
||||
c->echoed[0] = 0;
|
||||
pthread_mutex_unlock(&c->mu);
|
||||
}
|
||||
static void cap_wait(Cap *c) {
|
||||
pthread_mutex_lock(&c->mu);
|
||||
while (!c->done) pthread_cond_wait(&c->cv, &c->mu);
|
||||
pthread_mutex_unlock(&c->mu);
|
||||
}
|
||||
|
||||
// Native EchoResponse return: read the typed struct in-callback.
|
||||
static void native_cb(int ret, const char *msg, size_t len, void *ud) {
|
||||
Cap *c = ud;
|
||||
pthread_mutex_lock(&c->mu);
|
||||
c->ret = ret;
|
||||
if (ret == RET_OK) {
|
||||
const EchoResponse *r = (const EchoResponse *)msg;
|
||||
strncpy(c->echoed, r->echoed, sizeof(c->echoed) - 1);
|
||||
}
|
||||
c->done = 1;
|
||||
pthread_cond_signal(&c->cv);
|
||||
pthread_mutex_unlock(&c->mu);
|
||||
(void)len;
|
||||
}
|
||||
|
||||
// --- minimal CBOR map text reader (for the CBOR EchoResponse) ---------------
|
||||
static size_t cbor_item_len(const uint8_t *p, size_t len) {
|
||||
if (len < 1) return 0;
|
||||
uint8_t major = p[0] >> 5, info = p[0] & 0x1f;
|
||||
size_t head = 1;
|
||||
uint64_t arg = info;
|
||||
if (info == 24) { if (len < 2) return 0; arg = p[1]; head = 2; }
|
||||
else if (info == 25) { if (len < 3) return 0; arg = ((uint64_t)p[1] << 8) | p[2]; head = 3; }
|
||||
else if (info == 26) { if (len < 5) return 0; arg = ((uint64_t)p[1] << 24)|((uint64_t)p[2] << 16)|((uint64_t)p[3] << 8)|p[4]; head = 5; }
|
||||
else if (info == 27) { if (len < 9) return 0; arg = 0; for (int i = 1; i <= 8; i++) arg = (arg << 8) | p[i]; head = 9; }
|
||||
else if (info >= 28) return 0;
|
||||
switch (major) {
|
||||
case 0: case 1: case 7: return head;
|
||||
case 2: case 3: return head + (size_t)arg;
|
||||
case 4: { size_t off = head; for (uint64_t i = 0; i < arg; i++) { size_t n = cbor_item_len(p+off, len-off); if (!n) return 0; off += n; } return off; }
|
||||
case 5: { size_t off = head; for (uint64_t i = 0; i < arg*2; i++) { size_t n = cbor_item_len(p+off, len-off); if (!n) return 0; off += n; } return off; }
|
||||
default: return 0;
|
||||
}
|
||||
}
|
||||
static int cbor_get_text(const uint8_t *p, size_t len, const char *key, char *out, size_t cap) {
|
||||
if (len < 1 || (p[0] >> 5) != 5) return 0;
|
||||
uint8_t info = p[0] & 0x1f;
|
||||
if (info >= 24) return 0;
|
||||
size_t off = 1, klen = strlen(key);
|
||||
for (uint64_t i = 0; i < info; i++) {
|
||||
const uint8_t *k = p + off;
|
||||
if ((k[0] >> 5) != 3) return 0;
|
||||
uint8_t ki = k[0] & 0x1f;
|
||||
if (ki >= 24) return 0;
|
||||
const uint8_t *v = p + off + 1 + ki;
|
||||
if ((size_t)ki == klen && memcmp(k + 1, key, klen) == 0 && (v[0] >> 5) == 3) {
|
||||
uint8_t vi = v[0] & 0x1f;
|
||||
if (vi >= 24) return 0;
|
||||
size_t n = vi < cap - 1 ? vi : cap - 1;
|
||||
memcpy(out, v + 1, n);
|
||||
out[n] = 0;
|
||||
return 1;
|
||||
}
|
||||
size_t vn = cbor_item_len(v, len - (off + 1 + ki));
|
||||
if (!vn) return 0;
|
||||
off += 1 + ki + vn;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// CBOR EchoResponse return: decode the "echoed" field in-callback.
|
||||
static void cbor_cb(int ret, const char *msg, size_t len, void *ud) {
|
||||
Cap *c = ud;
|
||||
pthread_mutex_lock(&c->mu);
|
||||
c->ret = ret;
|
||||
if (ret == RET_OK)
|
||||
cbor_get_text((const uint8_t *)msg, len, "echoed", c->echoed, sizeof(c->echoed));
|
||||
c->done = 1;
|
||||
pthread_cond_signal(&c->cv);
|
||||
pthread_mutex_unlock(&c->mu);
|
||||
}
|
||||
|
||||
struct Args { void *ctx; int id; };
|
||||
|
||||
static void *worker(void *p) {
|
||||
struct Args *a = p;
|
||||
Cap cap;
|
||||
cap_init(&cap);
|
||||
char want[128];
|
||||
for (int i = 0; i < STRESS_ITERS; i++) {
|
||||
snprintf(want, sizeof(want), "t%d-i%d", a->id, i);
|
||||
if (i & 1) {
|
||||
// native: EchoRequest in, typed EchoResponse* back
|
||||
cap_reset(&cap);
|
||||
EchoRequest req = {.message = want, .delayMs = 0};
|
||||
if (my_timer_echo(a->ctx, native_cb, &cap, req) == RET_OK) cap_wait(&cap);
|
||||
} else {
|
||||
// CBOR: { "req": { "message": want, "delayMs": 0 } }
|
||||
cap_reset(&cap);
|
||||
FfiCbor e = ffi_cbor_new();
|
||||
ffi_cbor_map(&e, 1);
|
||||
ffi_cbor_text(&e, "req");
|
||||
ffi_cbor_map(&e, 2);
|
||||
ffi_cbor_kv_text(&e, "message", want);
|
||||
ffi_cbor_kv_int(&e, "delayMs", 0);
|
||||
if (my_timer_echo_cbor(a->ctx, cbor_cb, &cap, e.buf, e.len) == RET_OK)
|
||||
cap_wait(&cap);
|
||||
ffi_cbor_free(&e);
|
||||
}
|
||||
if (cap.ret == RET_OK && strcmp(cap.echoed, want) == 0)
|
||||
atomic_fetch_add(&g_ok, 1);
|
||||
else
|
||||
atomic_fetch_add(&g_bad, 1);
|
||||
}
|
||||
cap_destroy(&cap);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void create_cb(int ret, const char *msg, size_t len, void *ud) {
|
||||
Cap *c = ud;
|
||||
pthread_mutex_lock(&c->mu);
|
||||
c->ret = ret;
|
||||
c->done = 1;
|
||||
pthread_cond_signal(&c->cv);
|
||||
pthread_mutex_unlock(&c->mu);
|
||||
(void)msg; (void)len;
|
||||
}
|
||||
|
||||
int main(void) {
|
||||
Cap cc;
|
||||
cap_init(&cc);
|
||||
TimerConfig cfg = {.name = "stress"};
|
||||
void *ctx = my_timer_create(cfg, create_cb, &cc);
|
||||
cap_wait(&cc);
|
||||
if (!ctx) { fprintf(stderr, "create failed\n"); return 1; }
|
||||
|
||||
pthread_t th[STRESS_THREADS];
|
||||
struct Args args[STRESS_THREADS];
|
||||
for (int i = 0; i < STRESS_THREADS; i++) {
|
||||
args[i].ctx = ctx;
|
||||
args[i].id = i;
|
||||
pthread_create(&th[i], NULL, worker, &args[i]);
|
||||
}
|
||||
for (int i = 0; i < STRESS_THREADS; i++) pthread_join(th[i], NULL);
|
||||
|
||||
my_timer_destroy(ctx);
|
||||
cap_destroy(&cc);
|
||||
|
||||
long ok = atomic_load(&g_ok), bad = atomic_load(&g_bad);
|
||||
long total = (long)STRESS_THREADS * STRESS_ITERS;
|
||||
printf("native+cbor stress: %ld/%ld ok, %ld bad (%d threads x %d iters)\n", ok,
|
||||
total, bad, STRESS_THREADS, STRESS_ITERS);
|
||||
printf(bad == 0 && ok == total ? "PASSED\n" : "FAILED\n");
|
||||
return (bad == 0 && ok == total) ? 0 : 1;
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user