feat: add metrics collector endpoint to libstorage (#1454)

Co-authored-by: Marcin Czenko <marcin.czenko@pm.me>
This commit is contained in:
Giuliano Mega 2026-06-18 17:10:08 -03:00 committed by GitHub
parent 647fbb84a5
commit acdc0fc325
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 406 additions and 52 deletions

View File

@ -85,6 +85,7 @@ endif
test \
testAll \
testIntegration \
testLibstorageC \
testLibstorage \
update
@ -152,7 +153,7 @@ testIntegration: | build deps
$(ENV_SCRIPT) nim testIntegration $(TEST_PARAMS) $(NIM_PARAMS) build.nims
# Builds a C example that uses the libstorage C library and runs it
testLibstorage: | build deps
testLibstorageC: | build deps
$(MAKE) $(if $(ncpu),-j$(ncpu),) libstorage
cd tests/cbindings && \
if [ "$(detected_OS)" = "Windows" ]; then \
@ -163,6 +164,10 @@ testLibstorage: | build deps
LD_LIBRARY_PATH=../../build ./storage; \
fi
testLibstorage: | testLibstorageC
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim testLibstorage $(TEST_PARAMS) $(NIM_PARAMS) build.nims
# Builds and runs all tests
testAll: | build deps
echo -e $(BUILD_MSG) "build/$@" && \

View File

@ -69,18 +69,17 @@ task storage, "build logos storage binary":
task mixTools, "build mix tools (mix_pool, mix_relay_dht)":
let (desc, ec) = gorgeEx("git describe --always --dirty")
let mixVersion =
if ec == 0 and desc.strip().len > 0: desc.strip() else: "unknown"
if ec == 0 and desc.strip().len > 0:
desc.strip()
else:
"unknown"
let mixParams =
"-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE " &
"-d:mixVersion:" & mixVersion
"-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE " & "-d:mixVersion:" &
mixVersion
buildBinary "mix_pool",
outName = "mix_pool",
srcDir = "tools/mix/",
params = mixParams
outName = "mix_pool", srcDir = "tools/mix/", params = mixParams
buildBinary "mix_relay_dht",
outName = "mix_relay_dht",
srcDir = "tools/mix/",
params = mixParams
outName = "mix_relay_dht", srcDir = "tools/mix/", params = mixParams
task testStorage, "Build & run Logos Storage tests":
test "testStorage", outName = "testStorage"
@ -94,6 +93,9 @@ task testIntegration, "Run integration tests":
# test "testIntegration", params = "-d:chronicles_sinks=textlines[notimestamps,stdout],textlines[dynamic] " &
# "-d:chronicles_enabled_topics:integration:TRACE"
task testLibstorage, "Run libstorage Nim tests":
test "testLibstorage", outName = "testLibstorage"
task build, "build Logos Storage binary":
storageTask()

View File

@ -120,6 +120,13 @@ extern "C"
StorageCallback callback,
void *userData);
// Returns node metrics in the Logos openmetrics-compatible
// format (https://github.com/logos-co/openmetrics-module).
int storage_get_metrics(
void *ctx,
StorageCallback callback,
void *userData);
// Set the log level at run time.
// `logLevel` can be one of:
// TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL

View File

@ -185,6 +185,19 @@ proc storage_peer_id(
return callback.okOrError(res, userData)
proc storage_get_metrics(
ctx: ptr StorageContext, callback: StorageCallback, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibstorageParams(ctx, callback, userData)
let reqContent = NodeInfoRequest.createShared(NodeInfoMsgType.METRICS)
let res = storage_context.sendRequestToStorageThread(
ctx, RequestType.INFO, reqContent, callback, userData
)
return callback.okOrError(res, userData)
## Set the log level of the library at runtime.
## It uses updateLogLevel which is a synchronous proc and
## cannot be used inside an async context because of gcsafe issue.

66
library/logosmetrics.nim Normal file
View File

@ -0,0 +1,66 @@
import std/[json, locks, strutils, times, sets]
import pkg/metrics
proc jsonHelp(collector: Collector): string =
let prefix = "# HELP " & collector.name & " "
if collector.help.startsWith(prefix):
return collector.help[prefix.len .. ^1].strip(leading = false, trailing = true)
return ""
proc jsonType(collector: Collector): string =
let parts = collector.typ.splitWhitespace()
if parts.len == 4 and parts[0] == "#" and parts[1] == "TYPE" and
parts[2] == collector.name:
return parts[3]
return "unknown"
proc toJson(collector: Collector, metrics: var seq[JsonNode]) =
# We know the closure won't outlive `metrics` so this is
# an acceptable hack.
let metricsPtr = addr metrics
let help = collector.jsonHelp()
let typ = collector.jsonType()
proc serializeMetric(
name: string,
value: float64,
labels: openArray[string] = [],
labelValues: openArray[string] = [],
timestamp: Time,
) {.raises: [].} =
# The logos openmetrics format (https://github.com/logos-co/openmetrics-module)
# does not include the timestamp, so we don't include it either.
var labelMap = newJObject()
# When a label is missing, it's simply not included in the values, so we take
# the minimum.
for i in 0 ..< min(labelValues.len, labels.len):
labelMap[labels[i]] = %labelValues[i]
metricsPtr[].add(
%*{"name": name, "type": typ, "help": help, "value": value, "labels": labelMap}
)
collector.collect(serializeMetric)
# Serializes all collectors in a given registry to a Logos openmetrics-compatible
# format. Allows including only specific collectors by name.
proc toJson*(
registry: Registry,
exclude: openArray[string] = [],
includeOnly: openArray[string] = [],
): JsonNode =
var metrics = newSeq[JsonNode]()
withLock registry.lock:
for collector in registry.collectors:
if exclude.len > 0:
if collector.name in exclude:
continue
if includeOnly.len > 0:
if collector.name notin includeOnly:
continue
collector.toJson(metrics)
result = %*{"metrics": metrics}

View File

@ -5,6 +5,8 @@ import chronos
import chronicles
import confutils
import codexdht/discv5/spr
import metrics
import ../../logosmetrics
import ../../../storage/conf
import ../../../storage/rest/json
import ../../../storage/node
@ -18,6 +20,8 @@ type NodeInfoMsgType* = enum
REPO
SPR
PEERID
# Not sure this belongs here but for now OK.
METRICS
type NodeInfoRequest* = object
operation: NodeInfoMsgType
@ -74,3 +78,22 @@ proc process*(
error "Failed to get PEERID.", error = res.error
return err($res.error)
return res
of METRICS:
# We're making a few assumptions here, which need to be double-checked:
# 1. That defaultRegistry and all collectors we're using were
# created on the host thread that called libstorageNimMain as globals
# were initialized.
# 2. That collectors themselves are thread-safe.
# 3. That reading from defaultRegistry from this (worker) thread is safe.
#
# For (3) in particular, we lean on chronos_httpserver which does the exact
# thing we are doing here: reading defaultRegistry and its collectors and
# metrics from another thread.
#
# TODO double-check these assumptions.
{.cast(gcsafe).}:
# Excludes nim_runtime_info as dumpHeapInstances seems to be returning
# an infinite number of objects (could be related to some assumption
# failure above).
# FIXME figure out what's going on and add this back.
return ok($defaultRegistry.toJson(exclude = @["nim_runtime_info"]))

View File

@ -95,8 +95,7 @@ when isMainModule:
config.dataDir / config.netPrivKeyFile
privateKey = setupKey(keyPath).valueOr:
fatal "Failed to set up the network private key",
path = keyPath, err = error.msg
fatal "Failed to set up the network private key", path = keyPath, err = error.msg
quit QuitFailure
server =

View File

@ -1,8 +1,11 @@
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <stdbool.h>
#include <time.h>
#include <unistd.h>
#include "../../library/libstorage.h"
@ -51,6 +54,9 @@
typedef struct
{
pthread_mutex_t mutex;
pthread_cond_t cond;
bool done;
int ret;
char *msg;
char *chunk;
@ -60,6 +66,9 @@ typedef struct
static Resp *alloc_resp(void)
{
Resp *r = (Resp *)calloc(1, sizeof(Resp));
pthread_mutex_init(&r->mutex, NULL);
pthread_cond_init(&r->cond, NULL);
r->done = false;
r->msg = NULL;
r->chunk = NULL;
r->ret = -1;
@ -83,6 +92,8 @@ static void free_resp(Resp *r)
free(r->chunk);
}
pthread_cond_destroy(&r->cond);
pthread_mutex_destroy(&r->mutex);
free(r);
}
@ -93,7 +104,11 @@ static int get_ret(Resp *r)
return RET_ERR;
}
return r->ret;
pthread_mutex_lock(&r->mutex);
int ret = r->ret;
pthread_mutex_unlock(&r->mutex);
return ret;
}
// wait_resp waits until the async response is ready or max retries is reached.
@ -101,13 +116,33 @@ static int get_ret(Resp *r)
// indicate that the response is ready to be consumed.
static void wait_resp(Resp *r)
{
int retries = 0;
while (get_ret(r) == -1 && retries < MAX_RETRIES)
if (!r)
{
usleep(1000 * 100); // 100 ms
retries++;
return;
}
const long timeout_ms = MAX_RETRIES * 100;
struct timespec deadline;
clock_gettime(CLOCK_REALTIME, &deadline);
deadline.tv_sec += timeout_ms / 1000;
deadline.tv_nsec += (timeout_ms % 1000) * 1000000;
if (deadline.tv_nsec >= 1000000000)
{
deadline.tv_sec += 1;
deadline.tv_nsec -= 1000000000;
}
pthread_mutex_lock(&r->mutex);
while (!r->done)
{
int rc = pthread_cond_timedwait(&r->cond, &r->mutex, &deadline);
if (rc == ETIMEDOUT)
{
break;
}
}
pthread_mutex_unlock(&r->mutex);
}
// is_resp_ok checks if the async response indicates success.
@ -122,6 +157,8 @@ static int is_resp_ok(Resp *r, char **res)
wait_resp(r);
pthread_mutex_lock(&r->mutex);
int ret = (r->ret == RET_OK) ? RET_OK : RET_ERR;
// If a response pointer is provided, its safe to initialize it to NULL.
@ -142,6 +179,8 @@ static int is_resp_ok(Resp *r, char **res)
*res = strdup(r->msg);
}
pthread_mutex_unlock(&r->mutex);
free_resp(r);
return ret;
@ -168,8 +207,7 @@ static void callback(int ret, const char *msg, size_t len, void *userData)
return;
}
// Assign the return code to the response structure.
r->ret = ret;
pthread_mutex_lock(&r->mutex);
// If the reponse already has a message, just free it first.
if (r->msg)
@ -187,8 +225,8 @@ static void callback(int ret, const char *msg, size_t len, void *userData)
r->len = len;
}
// For other cases, copy the message data.
if (msg && len > 0)
// For terminal responses, copy the message data.
if (ret != RET_PROGRESS && msg && len > 0)
{
// Allocate memory for the message plus null terminator.
r->msg = (char *)malloc(len + 1);
@ -197,6 +235,10 @@ static void callback(int ret, const char *msg, size_t len, void *userData)
if (!r->msg)
{
r->len = 0;
r->ret = RET_ERR;
r->done = true;
pthread_cond_signal(&r->cond);
pthread_mutex_unlock(&r->mutex);
return;
}
@ -208,11 +250,25 @@ static void callback(int ret, const char *msg, size_t len, void *userData)
r->len = len;
}
else
else if (ret != RET_PROGRESS)
{
r->msg = NULL;
r->len = 0;
}
// Progress updates are intermediate callbacks. Keep any copied chunk data,
// but wait for the final RET_OK/RET_ERR before completing the response.
if (ret == RET_PROGRESS)
{
pthread_mutex_unlock(&r->mutex);
return;
}
// Publish completion last so wait_resp can only observe a fully written Resp.
r->ret = ret;
r->done = true;
pthread_cond_signal(&r->cond);
pthread_mutex_unlock(&r->mutex);
}
static int read_file(const char *filepath, char **res)
@ -256,7 +312,7 @@ int setup(void **storage_ctx)
wait_resp(r);
if (r->ret != RET_OK)
if (get_ret(r) != RET_OK)
{
free_resp(r);
return RET_ERR;
@ -343,9 +399,9 @@ int check_repo(void *storage_ctx)
int ret = is_resp_ok(r, &res);
if (strcmp(res, "./data-dir") != 0)
if (res == NULL || strcmp(res, "./data-dir") != 0)
{
printf("repo mismatch: %s\n", res);
printf("repo mismatch: %s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -368,9 +424,9 @@ int check_debug(void *storage_ctx)
int ret = is_resp_ok(r, &res);
// Simple check to ensure the response contains spr
if (strstr(res, "spr") == NULL)
if (res == NULL || strstr(res, "spr") == NULL)
{
fprintf(stderr, "debug content mismatch, res:%s\n", res);
fprintf(stderr, "debug content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -392,9 +448,9 @@ int check_spr(void *storage_ctx)
int ret = is_resp_ok(r, &res);
if (strstr(res, "spr") == NULL)
if (res == NULL || strstr(res, "spr") == NULL)
{
fprintf(stderr, "spr content mismatch, res:%s\n", res);
fprintf(stderr, "spr content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -562,7 +618,7 @@ int check_upload_file(void *storage_ctx, const char *filepath, char **res)
int ret = is_resp_ok(r, res);
if (res == NULL || strlen(*res) == 0)
if (res == NULL || *res == NULL || strlen(*res) == 0)
{
fprintf(stderr, "CID is missing\n");
return RET_ERR;
@ -600,9 +656,9 @@ int check_download_stream(void *storage_ctx, const char *cid, const char *filepa
int ret = is_resp_ok(r, &res);
if (strncmp(res, "Hello World!", strlen("Hello World!")) != 0)
if (res == NULL || strncmp(res, "Hello World!", strlen("Hello World!")) != 0)
{
fprintf(stderr, "downloaded content mismatch, res:%s\n", res);
fprintf(stderr, "downloaded content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -612,9 +668,9 @@ int check_download_stream(void *storage_ctx, const char *cid, const char *filepa
ret = RET_ERR;
}
if (strncmp(res, "Hello World!", strlen("Hello World!")) != 0)
if (res == NULL || strncmp(res, "Hello World!", strlen("Hello World!")) != 0)
{
fprintf(stderr, "downloaded content mismatch, res:%s\n", res);
fprintf(stderr, "downloaded content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -652,9 +708,9 @@ int check_download_chunk(void *storage_ctx, const char *cid)
int ret = is_resp_ok(r, &res);
if (strncmp(res, "Hello World!", strlen("Hello World!")) != 0)
if (res == NULL || strncmp(res, "Hello World!", strlen("Hello World!")) != 0)
{
fprintf(stderr, "downloaded chunk content mismatch, res:%s\n", res);
fprintf(stderr, "downloaded chunk content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -691,9 +747,9 @@ int check_download_manifest(void *storage_ctx, const char *cid)
const char *expected_manifest = "{\"manifestVersion\":0,\"treeCid\":\"zDzSvJTf8JYwvysKPmG7BtzpbiAHfuwFMRphxm4hdvnMJ4XPJjKX\",\"datasetSize\":12,\"blockSize\":65536,\"filename\":\"hello_world.txt\",\"mimetype\":\"text/plain\"}";
if (strncmp(res, expected_manifest, strlen(expected_manifest)) != 0)
if (res == NULL || strncmp(res, expected_manifest, strlen(expected_manifest)) != 0)
{
fprintf(stderr, "downloaded manifest content mismatch, res:%s\n", res);
fprintf(stderr, "downloaded manifest content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -717,9 +773,9 @@ int check_list(void *storage_ctx)
const char *expected_manifest = "{\"manifestVersion\":0,\"treeCid\":\"zDzSvJTf8JYwvysKPmG7BtzpbiAHfuwFMRphxm4hdvnMJ4XPJjKX\",\"datasetSize\":12,\"blockSize\":65536,\"filename\":\"hello_world.txt\",\"mimetype\":\"text/plain\"}";
if (strstr(res, expected_manifest) == NULL)
if (res == NULL || strstr(res, expected_manifest) == NULL)
{
fprintf(stderr, "downloaded manifest content mismatch, res:%s\n", res);
fprintf(stderr, "downloaded manifest content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -742,9 +798,9 @@ int check_space(void *storage_ctx)
int ret = is_resp_ok(r, &res);
// Simple check to ensure the response contains totalBlocks
if (strstr(res, "totalBlocks") == NULL)
if (res == NULL || strstr(res, "totalBlocks") == NULL)
{
fprintf(stderr, "list content mismatch, res:%s\n", res);
fprintf(stderr, "space content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
@ -768,17 +824,17 @@ int check_exists(void *storage_ctx, const char *cid, bool expected)
if (expected)
{
if (strcmp(res, "true") != 0)
if (res == NULL || strcmp(res, "true") != 0)
{
fprintf(stderr, "exists content mismatch, res:%s\n", res);
fprintf(stderr, "exists content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
}
else
{
if (strcmp(res, "false") != 0)
if (res == NULL || strcmp(res, "false") != 0)
{
fprintf(stderr, "exists content mismatch, res:%s\n", res);
fprintf(stderr, "exists content mismatch, res:%s\n", res ? res : "(null)");
ret = RET_ERR;
}
}
@ -813,9 +869,9 @@ int check_toggle_private_queries(void *storage_ctx)
}
int ret = is_resp_ok(r, &res);
if (strcmp(res, "false") != 0)
if (res == NULL || strcmp(res, "false") != 0)
{
fprintf(stderr, "toggle private queries content mismatch, res:%s\n", res);
fprintf(stderr, "toggle private queries content mismatch, res:%s\n", res ? res : "(null)");
free(res);
return RET_ERR;
}
@ -830,9 +886,39 @@ int check_toggle_private_queries(void *storage_ctx)
}
ret = is_resp_ok(r, &res);
if (strcmp(res, "true") != 0)
if (res == NULL || strcmp(res, "true") != 0)
{
fprintf(stderr, "toggle private queries content mismatch, res:%s\n", res);
fprintf(stderr, "toggle private queries content mismatch, res:%s\n", res ? res : "(null)");
free(res);
return RET_ERR;
}
free(res);
return RET_OK;
}
int check_get_metrics(void *storage_ctx)
{
Resp *r = alloc_resp();
char *res = NULL;
if (storage_get_metrics(storage_ctx, (StorageCallback)callback, r) != RET_OK)
{
free_resp(r);
return RET_ERR;
}
int ret = is_resp_ok(r, &res);
if (ret != RET_OK)
{
free(res);
return ret;
}
// Checks that response contains a metric we are SURE must exist
if (res == NULL || strstr(res, "libp2p_successful_dials_total") == NULL)
{
fprintf(stderr, "get_metrics missing expected metric\n");
free(res);
return RET_ERR;
}
@ -894,7 +980,8 @@ int main(void)
RUN_TEST(check_toggle_private_queries(storage_ctx));
RUN_TEST(update_log_level(storage_ctx, "TRACE"));
RUN_TEST(check_get_metrics(storage_ctx));
RUN_TEST(cleanup(storage_ctx));
END_SUITE
}
}

View File

@ -0,0 +1,148 @@
import std/json
import std/times
import pkg/unittest2
import pkg/metrics
import ../../library/logosmetrics
declareCounter myCounter, "Parts Counter", ["part_type"]
declareGauge myGauge, "My gauge"
declareHistogram myHistogram, "My histogram", buckets = [0.0, 1.0, 2.0]
# A badly-behaved collector that emits label/labelValue arrays of differing
# lengths.
type BadCollector = ref object of Gauge
method collect(collector: BadCollector, output: MetricHandler) =
output(
name = "bad_metric",
value = 1.0,
labels = ["label1", "label2"],
labelValues = ["value1"],
timestamp = Time(),
)
output(
name = "bad_metric",
value = 1.0,
labels = ["label1", "label2"],
labelValues = ["value1", "value2"],
timestamp = Time(),
)
suite "Metrics":
test "should serialize Nim metrics to Logos Metrics format":
myCounter.inc(labelValues = ["screws"])
myCounter.inc(labelValues = ["washers"])
myGauge.set(42.0)
myHistogram.observe(1)
myHistogram.observe(2)
myHistogram.observe(3)
myHistogram.observe(4)
myHistogram.observe(5)
let metrics =
defaultRegistry.toJson(includeOnly = @["myCounter", "myGauge", "myHistogram"])
# Remove "created" metrics as we can't pin those down.
var filteredMetrics = %*{"metrics": @[]}
for metric in metrics["metrics"]:
if metric["name"].getStr() != "myCounter_created" and
metric["name"].getStr() != "myHistogram_created":
filteredMetrics["metrics"].add(metric)
check filteredMetrics ==
%*{
"metrics": [
{
"name": "myCounter_total",
"type": "counter",
"help": "Parts Counter",
"value": 1.0,
"labels": {"part_type": "screws"},
},
{
"name": "myCounter_total",
"type": "counter",
"help": "Parts Counter",
"value": 1.0,
"labels": {"part_type": "washers"},
},
{
"name": "myGauge",
"type": "gauge",
"help": "My gauge",
"value": 42.0,
"labels": {},
},
{
"name": "myHistogram_sum",
"type": "histogram",
"help": "My histogram",
"value": 15.0,
"labels": {},
},
{
"name": "myHistogram_count",
"type": "histogram",
"help": "My histogram",
"value": 5.0,
"labels": {},
},
{
"name": "myHistogram_bucket",
"type": "histogram",
"help": "My histogram",
"value": 0.0,
"labels": {"le": "0.0"},
},
{
"name": "myHistogram_bucket",
"type": "histogram",
"help": "My histogram",
"value": 1.0,
"labels": {"le": "1.0"},
},
{
"name": "myHistogram_bucket",
"type": "histogram",
"help": "My histogram",
"value": 2.0,
"labels": {"le": "2.0"},
},
{
"name": "myHistogram_bucket",
"type": "histogram",
"help": "My histogram",
"value": 5.0,
"labels": {"le": "+Inf"},
},
]
}
test "should drop labels when collector emits fewer values than labels":
discard BadCollector.newCollector("badMetric", "Badly behaved collector")
let metrics = defaultRegistry.toJson(includeOnly = @["badMetric"])
check metrics ==
%*{
"metrics": [
{
"name": "bad_metric",
"type": "gauge",
"help": "Badly behaved collector",
"value": 1.0,
"labels": {"label1": "value1"},
},
{
"name": "bad_metric",
"type": "gauge",
"help": "Badly behaved collector",
"value": 1.0,
"labels": {"label1": "value1", "label2": "value2"},
},
]
}

4
tests/testLibstorage.nim Normal file
View File

@ -0,0 +1,4 @@
# Tests the Nim side of libstorage.
import ./libstorage/logosmetrics
{.warning[UnusedImport]: off.}