feat: libstorage metrics potential fixes (#1458)

## Root Cause

The most likely failure is in the C test harness, not `storage_space`
itself.

The callback writes `r->ret` before it finishes writing `r->msg`, and
the polling thread treats `ret != -1` as “response is complete”. That
can make `is_resp_ok` return with `res == NULL`, after which
`check_space` calls `strstr(res, ...)` and segfaults.

The segfault is very likely caused by a race in `tests/cbindings/storage.c`.

In `callback`:

```c
r->ret = ret;
...
r->msg = malloc(len + 1);
memcpy(r->msg, msg, len);
```

But `wait_resp` treats `r->ret != -1` as “the response is ready”:

```c
while (get_ret(r) == -1 && retries < MAX_RETRIES)
```

So this can happen:

1. Storage worker thread enters `callback`.
2. Callback sets `r->ret = RET_OK`.
3. Main C test thread sees `ret != -1` and exits `wait_resp`.
4. `is_resp_ok` observes `r->msg == NULL`, so `res` remains `NULL`.
5. `check_space` calls:

```c
strstr(res, "totalBlocks")
```

6. `strstr(NULL, ...)` segfaults.

It can get worse: `is_resp_ok` may call `free_resp(r)` while the
callback is still writing into `r`, causing a use-after-free.

## Why It Appears At `check_space`

This is probably timing. Earlier responses happened to finish copying
before the polling thread observed `ret`; `check_space` just hit the
race.

The test run never reaches `check_get_metrics`, the test added by the
new metrics PR; the crash happens before it. But since the PR now makes
`make testLibstorage` run the C test consistently, it exposes this
pre-existing race in the test harness.

## Minimal Fix

1. In `callback`, set `r->ret = ret` last, after `msg`, `chunk`, and
`len` are fully populated.
2. Add defensive `res == NULL` checks in tests like `check_space`,
similar to `check_get_metrics`.

## Longer-Term Fix

Ideally, replace polling shared fields with a mutex/condition variable
or atomics, because the current cross-thread access is technically a C
data race.

Regarding the longer-term fix: setting `r->ret` last improves the ordering, but it still relies on unsynchronized reads and writes between threads. In C, one thread writing `r->ret`, `r->msg`, and `r->len` while another thread reads them without atomics or a lock is a data race, which is undefined behavior. In practice it may work most of the time, but neither the compiler nor the CPU is required to make those writes visible in the order we expect.

A proper fix would make `Resp` thread-safe. The cleanest approach is usually:

```c
typedef struct
{
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    bool done;
    int ret;
    char *msg;
    char *chunk;
    size_t len;
} Resp;
```

Then:

- `callback` locks the mutex.
- It writes `msg`, `chunk`, `len`, and `ret`.
- It sets `done = true`.
- It signals the condition variable.
- It unlocks.

And:

- `wait_resp` locks the mutex.
- It waits on the condition variable until `done == true` or timeout.
- It unlocks.

This removes the sleep-based polling and gives the test a real happens-before relationship between callback writes and test-thread reads.
This commit is contained in:
Marcin Czenko 2026-06-18 20:24:46 +02:00 committed by GitHub
commit 6acb78b6ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 192 additions and 56 deletions

View File

@ -85,6 +85,7 @@ endif
test \
testAll \
testIntegration \
testLibstorageC \
testLibstorage \
update

View File

@ -104,7 +104,6 @@ task test, "Run tests":
task testAll, "Run all tests (except for Taiko L2 tests)":
testStorageTask()
testLibstorageTask()
testIntegrationTask()
import strutils

View File

@ -1,11 +1,28 @@
import std/[json, locks, times, sets]
import std/[json, locks, strutils, times, sets]
import pkg/metrics
proc jsonHelp(collector: Collector): string =
  ## Returns the help text of `collector` without its leading
  ## "# HELP <name> " marker, with trailing whitespace removed.
  ## Returns an empty string when `collector.help` does not start
  ## with that marker.
  let marker = "# HELP " & collector.name & " "
  if not collector.help.startsWith(marker):
    return ""
  # Keep leading characters as they are; only trailing whitespace is trimmed.
  let remainder = collector.help[marker.len .. ^1]
  result = remainder.strip(leading = false, trailing = true)
proc jsonType(collector: Collector): string =
  ## Returns the metric type named in `collector.typ`.
  ##
  ## `collector.typ` is expected to split on whitespace into exactly
  ## four tokens: "#", "TYPE", the collector name, and the type.
  ## Anything else yields "unknown".
  let tokens = collector.typ.splitWhitespace()
  # `and` short-circuits, so the indexing below only runs once the
  # token count has been confirmed.
  let wellFormed =
    tokens.len == 4 and tokens[0] == "#" and tokens[1] == "TYPE" and
    tokens[2] == collector.name
  if wellFormed:
    result = tokens[3]
  else:
    result = "unknown"
proc toJson(collector: Collector, metrics: var seq[JsonNode]) =
# We know the closure won't outlive `metrics` so this is
# an acceptable hack.
let metricsPtr = addr metrics
let help = collector.jsonHelp()
let typ = collector.jsonType()
proc serializeMetric(
name: string,
@ -22,7 +39,9 @@ proc toJson(collector: Collector, metrics: var seq[JsonNode]) =
for i in 0 ..< min(labelValues.len, labels.len):
labelMap[labels[i]] = %labelValues[i]
metricsPtr[].add(%*{"name": name, "value": value, "labels": labelMap})
metricsPtr[].add(
%*{"name": name, "type": typ, "help": help, "value": value, "labels": labelMap}
)
collector.collect(serializeMetric)

View File

@ -95,8 +95,7 @@ when isMainModule:
config.dataDir / config.netPrivKeyFile
privateKey = setupKey(keyPath).valueOr:
fatal "Failed to set up the network private key",
path = keyPath, err = error.msg
fatal "Failed to set up the network private key", path = keyPath, err = error.msg
quit QuitFailure
server =

View File

@ -1,8 +1,11 @@
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <stdbool.h>
#include <time.h>
#include <unistd.h>
#include "../../library/libstorage.h"
@ -51,6 +54,9 @@
typedef struct
{
pthread_mutex_t mutex;
pthread_cond_t cond;
bool done;
int ret;
char *msg;
char *chunk;
@ -60,6 +66,9 @@ typedef struct
static Resp *alloc_resp(void)
{
Resp *r = (Resp *)calloc(1, sizeof(Resp));
pthread_mutex_init(&r->mutex, NULL);
pthread_cond_init(&r->cond, NULL);
r->done = false;
r->msg = NULL;
r->chunk = NULL;
r->ret = -1;
@ -83,6 +92,8 @@ static void free_resp(Resp *r)
free(r->chunk);
}
pthread_cond_destroy(&r->cond);
pthread_mutex_destroy(&r->mutex);
free(r);
}
@ -93,7 +104,11 @@ static int get_ret(Resp *r)
return RET_ERR;
}
return r->ret;
pthread_mutex_lock(&r->mutex);
int ret = r->ret;
pthread_mutex_unlock(&r->mutex);
return ret;
}
// wait_resp waits until the async response is ready or max retries is reached.
@ -101,13 +116,33 @@ static int get_ret(Resp *r)
// indicate that the response is ready to be consumed.
static void wait_resp(Resp *r)
{
int retries = 0;
while (get_ret(r) == -1 && retries < MAX_RETRIES)
if (!r)
{
usleep(1000 * 100); // 100 ms
retries++;
return;
}
const long timeout_ms = MAX_RETRIES * 100;
struct timespec deadline;
clock_gettime(CLOCK_REALTIME, &deadline);
deadline.tv_sec += timeout_ms / 1000;
deadline.tv_nsec += (timeout_ms % 1000) * 1000000;
if (deadline.tv_nsec >= 1000000000)
{
deadline.tv_sec += 1;
deadline.tv_nsec -= 1000000000;
}
pthread_mutex_lock(&r->mutex);
while (!r->done)
{
int rc = pthread_cond_timedwait(&r->cond, &r->mutex, &deadline);
if (rc == ETIMEDOUT)
{
break;
}
}
pthread_mutex_unlock(&r->mutex);
}
// is_resp_ok checks if the async response indicates success.
@ -122,6 +157,8 @@ static int is_resp_ok(Resp *r, char **res)
wait_resp(r);
pthread_mutex_lock(&r->mutex);
int ret = (r->ret == RET_OK) ? RET_OK : RET_ERR;
// If a response pointer is provided, it's safe to initialize it to NULL.
@ -142,6 +179,8 @@ static int is_resp_ok(Resp *r, char **res)
*res = strdup(r->msg);
}
pthread_mutex_unlock(&r->mutex);
free_resp(r);
return ret;
@ -168,8 +207,7 @@ static void callback(int ret, const char *msg, size_t len, void *userData)
return;
}
// Assign the return code to the response structure.
r->ret = ret;
pthread_mutex_lock(&r->mutex);
// If the response already has a message, just free it first.
if (r->msg)
@ -187,8 +225,8 @@ static void callback(int ret, const char *msg, size_t len, void *userData)
r->len = len;
}
// For other cases, copy the message data.
if (msg && len > 0)
// For terminal responses, copy the message data.
if (ret != RET_PROGRESS && msg && len > 0)
{
// Allocate memory for the message plus null terminator.
r->msg = (char *)malloc(len + 1);
@ -197,6 +235,10 @@ static void callback(int ret, const char *msg, size_t len, void *userData)
if (!r->msg)
{
r->len = 0;
r->ret = RET_ERR;
r->done = true;
pthread_cond_signal(&r->cond);
pthread_mutex_unlock(&r->mutex);
return;
}
@ -208,11 +250,25 @@ static void callback(int ret, const char *msg, size_t len, void *userData)
r->len = len;
}
else
else if (ret != RET_PROGRESS)
{
r->msg = NULL;
r->len = 0;
}
// Progress updates are intermediate callbacks. Keep any copied chunk data,
// but wait for the final RET_OK/RET_ERR before completing the response.
if (ret == RET_PROGRESS)
{
pthread_mutex_unlock(&r->mutex);
return;
}
// Publish completion last so wait_resp can only observe a fully written Resp.
r->ret = ret;
r->done = true;
pthread_cond_signal(&r->cond);
pthread_mutex_unlock(&r->mutex);
}
static int read_file(const char *filepath, char **res)
@ -256,7 +312,7 @@ int setup(void **storage_ctx)
wait_resp(r);
if (r->ret != RET_OK)
if (get_ret(r) != RET_OK)
{
free_resp(r);
return RET_ERR;
@ -343,9 +399,9 @@ int check_repo(void *storage_ctx)
int ret = is_resp_ok(r, &res);
if (strcmp(res, "./data-dir") != 0)
if (res == NULL || strcmp(res, "./data-dir") != 0)
{
printf("repo mismatch: %s\n", res);
printf("repo mismatch: %s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -368,9 +424,9 @@ int check_debug(void *storage_ctx)
int ret = is_resp_ok(r, &res);
// Simple check to ensure the response contains spr
if (strstr(res, "spr") == NULL)
if (res == NULL || strstr(res, "spr") == NULL)
{
fprintf(stderr, "debug content mismatch, res:%s\n", res);
fprintf(stderr, "debug content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -392,9 +448,9 @@ int check_spr(void *storage_ctx)
int ret = is_resp_ok(r, &res);
if (strstr(res, "spr") == NULL)
if (res == NULL || strstr(res, "spr") == NULL)
{
fprintf(stderr, "spr content mismatch, res:%s\n", res);
fprintf(stderr, "spr content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -562,7 +618,7 @@ int check_upload_file(void *storage_ctx, const char *filepath, char **res)
int ret = is_resp_ok(r, res);
if (res == NULL || strlen(*res) == 0)
if (res == NULL || *res == NULL || strlen(*res) == 0)
{
fprintf(stderr, "CID is missing\n");
return RET_ERR;
@ -600,9 +656,9 @@ int check_download_stream(void *storage_ctx, const char *cid, const char *filepa
int ret = is_resp_ok(r, &res);
if (strncmp(res, "Hello World!", strlen("Hello World!")) != 0)
if (res == NULL || strncmp(res, "Hello World!", strlen("Hello World!")) != 0)
{
fprintf(stderr, "downloaded content mismatch, res:%s\n", res);
fprintf(stderr, "downloaded content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -612,9 +668,9 @@ int check_download_stream(void *storage_ctx, const char *cid, const char *filepa
ret = RET_ERR;
}
if (strncmp(res, "Hello World!", strlen("Hello World!")) != 0)
if (res == NULL || strncmp(res, "Hello World!", strlen("Hello World!")) != 0)
{
fprintf(stderr, "downloaded content mismatch, res:%s\n", res);
fprintf(stderr, "downloaded content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -652,9 +708,9 @@ int check_download_chunk(void *storage_ctx, const char *cid)
int ret = is_resp_ok(r, &res);
if (strncmp(res, "Hello World!", strlen("Hello World!")) != 0)
if (res == NULL || strncmp(res, "Hello World!", strlen("Hello World!")) != 0)
{
fprintf(stderr, "downloaded chunk content mismatch, res:%s\n", res);
fprintf(stderr, "downloaded chunk content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -691,9 +747,9 @@ int check_download_manifest(void *storage_ctx, const char *cid)
const char *expected_manifest = "{\"manifestVersion\":0,\"treeCid\":\"zDzSvJTf8JYwvysKPmG7BtzpbiAHfuwFMRphxm4hdvnMJ4XPJjKX\",\"datasetSize\":12,\"blockSize\":65536,\"filename\":\"hello_world.txt\",\"mimetype\":\"text/plain\"}";
if (strncmp(res, expected_manifest, strlen(expected_manifest)) != 0)
if (res == NULL || strncmp(res, expected_manifest, strlen(expected_manifest)) != 0)
{
fprintf(stderr, "downloaded manifest content mismatch, res:%s\n", res);
fprintf(stderr, "downloaded manifest content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -717,9 +773,9 @@ int check_list(void *storage_ctx)
const char *expected_manifest = "{\"manifestVersion\":0,\"treeCid\":\"zDzSvJTf8JYwvysKPmG7BtzpbiAHfuwFMRphxm4hdvnMJ4XPJjKX\",\"datasetSize\":12,\"blockSize\":65536,\"filename\":\"hello_world.txt\",\"mimetype\":\"text/plain\"}";
if (strstr(res, expected_manifest) == NULL)
if (res == NULL || strstr(res, expected_manifest) == NULL)
{
fprintf(stderr, "downloaded manifest content mismatch, res:%s\n", res);
fprintf(stderr, "downloaded manifest content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -742,9 +798,9 @@ int check_space(void *storage_ctx)
int ret = is_resp_ok(r, &res);
// Simple check to ensure the response contains totalBlocks
if (strstr(res, "totalBlocks") == NULL)
if (res == NULL || strstr(res, "totalBlocks") == NULL)
{
fprintf(stderr, "list content mismatch, res:%s\n", res);
fprintf(stderr, "space content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -768,17 +824,17 @@ int check_exists(void *storage_ctx, const char *cid, bool expected)
if (expected)
{
if (strcmp(res, "true") != 0)
if (res == NULL || strcmp(res, "true") != 0)
{
fprintf(stderr, "exists content mismatch, res:%s\n", res);
fprintf(stderr, "exists content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
}
else
{
if (strcmp(res, "false") != 0)
if (res == NULL || strcmp(res, "false") != 0)
{
fprintf(stderr, "exists content mismatch, res:%s\n", res);
fprintf(stderr, "exists content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
}
@ -813,9 +869,9 @@ int check_toggle_private_queries(void *storage_ctx)
}
int ret = is_resp_ok(r, &res);
if (strcmp(res, "false") != 0)
if (res == NULL || strcmp(res, "false") != 0)
{
fprintf(stderr, "toggle private queries content mismatch, res:%s\n", res);
fprintf(stderr, "toggle private queries content mismatch, res:%s\n", res ? res : "(null)");
free(res);
return RET_ERR;
}
@ -830,9 +886,9 @@ int check_toggle_private_queries(void *storage_ctx)
}
ret = is_resp_ok(r, &res);
if (strcmp(res, "true") != 0)
if (res == NULL || strcmp(res, "true") != 0)
{
fprintf(stderr, "toggle private queries content mismatch, res:%s\n", res);
fprintf(stderr, "toggle private queries content mismatch, res:%s\n", res ? res : "(null)");
free(res);
return RET_ERR;
}
@ -860,7 +916,7 @@ int check_get_metrics(void *storage_ctx)
}
// Checks that response contains a metric we are SURE must exist
if (strstr(res, "libp2p_successful_dials_total") == NULL)
if (res == NULL || strstr(res, "libp2p_successful_dials_total") == NULL)
{
fprintf(stderr, "get_metrics missing expected metric\n");
free(res);
@ -928,4 +984,4 @@ int main(void)
RUN_TEST(cleanup(storage_ctx));
END_SUITE
}
}

View File

@ -56,15 +56,69 @@ suite "Metrics":
check filteredMetrics ==
%*{
"metrics": [
{"name": "myCounter_total", "value": 1.0, "labels": {"part_type": "screws"}},
{"name": "myCounter_total", "value": 1.0, "labels": {"part_type": "washers"}},
{"name": "myGauge", "value": 42.0, "labels": {}},
{"name": "myHistogram_sum", "value": 15.0, "labels": {}},
{"name": "myHistogram_count", "value": 5.0, "labels": {}},
{"name": "myHistogram_bucket", "value": 0.0, "labels": {"le": "0.0"}},
{"name": "myHistogram_bucket", "value": 1.0, "labels": {"le": "1.0"}},
{"name": "myHistogram_bucket", "value": 2.0, "labels": {"le": "2.0"}},
{"name": "myHistogram_bucket", "value": 5.0, "labels": {"le": "+Inf"}},
{
"name": "myCounter_total",
"type": "counter",
"help": "Parts Counter",
"value": 1.0,
"labels": {"part_type": "screws"},
},
{
"name": "myCounter_total",
"type": "counter",
"help": "Parts Counter",
"value": 1.0,
"labels": {"part_type": "washers"},
},
{
"name": "myGauge",
"type": "gauge",
"help": "My gauge",
"value": 42.0,
"labels": {},
},
{
"name": "myHistogram_sum",
"type": "histogram",
"help": "My histogram",
"value": 15.0,
"labels": {},
},
{
"name": "myHistogram_count",
"type": "histogram",
"help": "My histogram",
"value": 5.0,
"labels": {},
},
{
"name": "myHistogram_bucket",
"type": "histogram",
"help": "My histogram",
"value": 0.0,
"labels": {"le": "0.0"},
},
{
"name": "myHistogram_bucket",
"type": "histogram",
"help": "My histogram",
"value": 1.0,
"labels": {"le": "1.0"},
},
{
"name": "myHistogram_bucket",
"type": "histogram",
"help": "My histogram",
"value": 2.0,
"labels": {"le": "2.0"},
},
{
"name": "myHistogram_bucket",
"type": "histogram",
"help": "My histogram",
"value": 5.0,
"labels": {"le": "+Inf"},
},
]
}
@ -76,9 +130,17 @@ suite "Metrics":
check metrics ==
%*{
"metrics": [
{"name": "bad_metric", "value": 1.0, "labels": {"label1": "value1"}},
{
"name": "bad_metric",
"type": "gauge",
"help": "Badly behaved collector",
"value": 1.0,
"labels": {"label1": "value1"},
},
{
"name": "bad_metric",
"type": "gauge",
"help": "Badly behaved collector",
"value": 1.0,
"labels": {"label1": "value1", "label2": "value2"},
},