add CSV support, adhoc scripts and adhoc analysis

This commit is contained in:
gmega 2023-10-19 18:07:07 -03:00
parent 5a855a97e7
commit b63ece0fea
No known key found for this signature in database
GPG Key ID: FFD8DAF00660270F
25 changed files with 931 additions and 253 deletions

7
.gitignore vendored
View File

@ -1,4 +1,9 @@
.idea
data
dist
**/__pycache__
**/__pycache__
.Rproj.user
.RData
*.html
*.log
*.csv

View File

@ -2,27 +2,40 @@
## Installation
```
pip install logtools
```sh
pip install pip install git+https://github.com/gmega/logtools.git
```
## Usage
### Merge by Timestamp
```
```sh
log-merge log1.log log2.log
```
### Merge by Timestamp Showing Aliases Instead of File Name
```
```sh
log-merge log1.log log2.log --aliases bootstrap codex21
```
### Merge and Filter by Timestamp
```
```sh
# If no timezone is provided, assumes UTC
log-merge log1.log log2.log --from 2021-01-01T00:00:00 --to 2021-01-02T00:00:00
```
### Transform Raw Logs into CSV
```sh
cat ./log1.log | log-to-csv
```
### Transform Raw Logs into CSV, Extracting Topics Into Column
```sh
cat ./log1.log | log-to-csv --extract-fields topics
```

0
adhoc/__init__.py Normal file
View File

18
adhoc/identify_uploads.py Normal file
View File

@ -0,0 +1,18 @@
"""Ad-hoc script which tags uploads with a sequential number."""
import sys
uploading = False
upload_no = 0
for line in sys.stdin:
if 'Handling file upload' in line:
upload_no += 1
uploading = True
if uploading:
line = line.strip()
line = line.rsplit(' ', maxsplit=1)
line = ' '.join([line[0], f'upload={upload_no}', line[1]])
print(line)
if 'Uploaded file' in line:
uploading = False

512
analysis/.Rhistory Normal file
View File

@ -0,0 +1,512 @@
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1:2], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 13) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[c(-1,-2)], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 13) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[c(-1,-2,-3)], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 13) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[c(-1,-2,-4)], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 13) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[c(-1,-2,-3)], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 13) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 18) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 18) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-(1:3)], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 18) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-(1:4)], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 18) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-(1:5)], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 18) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-(1:10)], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
c(1,2,3,4)
c(1,2,3,4)[-1]
c(1,2,3,4)[-2]
c(1,2,3,4)[-(1:2)]
ggplot(
interlog_intervals |>
filter(upload == 18) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval, probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 18) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
interlog_intervals |>
filter(upload == 18) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds'))
ggplot(
interlog_intervals |>
filter(upload == 18) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 1) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 2) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 3) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 4) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 5) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 6) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 7) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 8) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 7) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 8) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 9) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 10) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 11) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 12) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 13) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 14) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 15) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 16) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 17) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
ggplot(
interlog_intervals |>
filter(upload == 18
) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()

152
analysis/analysis.Rmd Normal file
View File

@ -0,0 +1,152 @@
---
title: "R Notebook"
output: html_notebook
---
```{r}
library(tidyverse)
library(lubridate)
```
```{r}
uploads <- read_csv('./codex-continuous-tests-0codex3-5-77bdb95dc7-j7f46_codex3-5-uploads.csv')
```
```{r}
durations <- uploads |>
arrange(count) |>
group_by(upload) |>
summarise(
start = timestamp[1],
end = timestamp[n()],
) |>
mutate(duration = end - start)
```
How long are uploads taking?
```{r}
ggplot(durations, aes(x = upload, y = duration)) +
geom_point() +
geom_line() +
ylab('upload duration') +
xlab('upload number') +
theme_minimal()
```
Are all uploads completing?
```{r}
uploads |>
filter(message == 'Got data from stream') |>
group_by(upload) |>
count(name = 'blocks')
```
Does the end of the upload coincide with the last chunk that gets stored?
```{r}
uploads |>
filter(grepl('Got data from stream', message)) |>
group_by(upload) |>
summarise(
last_store = max(timestamp)
) |>
inner_join(durations, by='upload')
```
```{r}
durations
```
```{r}
uploads |> filter(grepl('Exception', message)) |> group_by(message) |> count() |> arrange(n)
```
```{r}
uploads |> filter(upload == 18) |> group_by(message) |> count() |> arrange(n)
```
```{r}
uploads |> filter(upload == 17) |> group_by(message) |> count() |> arrange(n)
```
```{r}
messages <- uploads |> group_by(message) |> count() |> filter(n > 100) |> pull(message)
```
```{r fig.height = 10}
uploads |> filter(message %in% messages) |> group_by(upload, message) |> count() %>% {
ggplot(.) +
geom_point(aes(x = message, y = n, color = as.factor(upload))) + theme_minimal() + theme(axis.text.x = element_text(angle = 45, hjust=1)) +
ylab('count') +
scale_color_manual(values=c('18'='red'))
}
```
```{r}
interlog_intervals <- uploads |>
group_by(upload) |>
arrange(timestamp) |>
mutate(log_interval = as.numeric(timestamp - lag(timestamp))) |>
ungroup()
```
```{r}
interlog_intervals |>
group_by(upload) |>
summarise(
mean_li = mean(log_interval, na.rm=TRUE),
median_li = median(log_interval, na.rm=TRUE),
max_li = max(log_interval, na.rm=TRUE),
) |>
pivot_longer(-upload) %>% {
ggplot(.) +
geom_line(aes(x = upload, y = value, col = name)) +
scale_y_log10() +
theme_minimal() +
ylab('duration (logscale, seconds)')
}
```
```{r}
interlog_intervals |> group_by(upload) |> count() |> arrange(desc(n))
```
```{r fig.height=5}
interlog_intervals |>
group_by(upload) |>
arrange(log_interval) |>
mutate(rank = seq_along(log_interval)) |> ungroup() %>% {
ggplot(.) +
geom_point(aes(x = rank, y = log_interval, col = as.factor(upload))) +
theme_minimal() +
xlab('rank') +
ylab('time between two consecutive log messages') +
guides(col = guide_legend(title = 'upload #'))
}
```
```{r}
ggplot(
interlog_intervals |>
filter(upload == 18
) |>
mutate(bucket = floor_date(timestamp, unit = '5 seconds')) |>
group_by(bucket) |>
mutate(
mean_interval = mean(log_interval),
p_70 = quantile(log_interval[-1], probs = c(0.95))
) |>
ungroup()
) +
geom_point(aes(x = timestamp, y = log_interval)) +
geom_line(aes(x = bucket, y = mean_interval), col = 'red', lwd = 2) +
geom_line(aes(x = bucket, y = p_70), col = 'orange', lwd = 2) +
theme_minimal()
```

13
analysis/analysis.Rproj Normal file
View File

@ -0,0 +1,13 @@
Version: 1.0
RestoreWorkspace: Default
SaveWorkspace: Default
AlwaysSaveHistory: Default
EnableCodeIndexing: Yes
UseSpacesForTab: Yes
NumSpacesForTab: 2
Encoding: UTF-8
RnwWeave: Sweave
LaTeX: pdfLaTeX

View File

@ -57,8 +57,9 @@ def _assign_colors(names: Dict[str, str]) -> Dict[str, str]:
def _filtering_predicate(args):
if args.from_ or args.to:
return timestamp_range(
_ensure_utc(tsparser.parse(args.from_)),
_ensure_utc(tsparser.parse(args.to))
_ensure_utc(args.from_) if args.from_ is not None else datetime(
year=1980, month=1, day=1, hour=0, minute=0, second=0, tzinfo=pytz.UTC),
_ensure_utc(args.to) if args.to is not None else datetime.utcnow().replace(tzinfo=pytz.UTC)
)
return lambda x: True

38
logtools/cli/to_csv.py Normal file
View File

@ -0,0 +1,38 @@
"""Parses a log stream, possibly pre-filtered and/or merged, into a CSV file. Allows chronicles topics to be
extracted into their own columns."""
import sys
from csv import DictWriter
from traitlets.config.loader import ArgumentParser
from logtools.log.sources.stream_log_source import StreamLogSource
def to_csv(args):
fields = args.extract_fields
writer = DictWriter(sys.stdout,
fieldnames=['timestamp', 'line_number', 'level', 'fields', 'count', 'message'] + fields)
writer.writeheader()
for line in StreamLogSource(sys.stdin):
line_fields = {field: line.fields.get(field, 'NA') for field in fields}
writer.writerow({
'timestamp': line.timestamp.isoformat(),
'line_number': line.location.line_number,
'level': line.level.value,
'fields': line.topics,
'count': line.count,
'message': line.message,
**line_fields,
})
def main():
argparse = ArgumentParser()
argparse.add_argument('--extract-fields', nargs='+', default=[],
help='Extract chronicles topics into CSV columns')
to_csv(argparse.parse_args())
if __name__ == '__main__':
main()

View File

@ -1,165 +0,0 @@
# import abc
# import re
# from abc import abstractmethod
# from datetime import datetime
# from heapq import heapify, heappop, heappush
# from pathlib import Path
# from typing import TypedDict, Iterable, Union, Generator, Optional, Iterator, List
#
# from parse.utils import group_match
#
# class LogIterator(abc.ABC, Iterator[LogLine]):
# @abstractmethod
# def peek(self) -> Optional[LogLine]:
# ...
#
# def context(self) -> str:
# ...
#
#
# class SingleLogIterator(LogIterator):
#
# def __init__(
# self,
# path: Path,
# alias: str,
# from_ts: Optional[datetime] = None,
# to_ts: Optional[datetime] = None,
# parse_datetime=False
# ):
# self.path = path
# self.line_number = 0
# self.parse_datetime = parse_datetime
# self.alias = alias
#
# # If from_ts or to_ts is specified, then timestamp parsing is mandatory.
# self.parse_datetime = self.parse_datetime or (from_ts is not None or to_ts is not None)
# self.from_ts = from_ts
# self.to_ts = to_ts
#
# self.inner_iterator = self._iterator()
# self.look_ahead = next(self.inner_iterator, None)
#
# def __next__(self) -> LogLine:
# next_element = self.look_ahead if self.look_ahead is not None else next(self.inner_iterator)
# self.look_ahead = next(self.inner_iterator, None)
# return next_element
#
# def __iter__(self):
# return self
#
# def __lt__(self, other):
# return self.latest_timestamp() < other.latest_timestamp()
#
# def __le__(self, other):
# return self.latest_timestamp() <= other.latest_timestamp()
#
# def _iterator(self) -> Generator[LogLine, None, None]:
# with self.path.open() as f:
# for line in f:
# self.line_number += 1
# contents = group_match(line, LOG_LINE)
# if not contents:
# continue
#
# line = LogLine(
# parent=self,
# log=self.alias,
# raw=line,
# line_number=self.line_number,
# timestamp=(datetime.fromisoformat(contents['timestamp']) if self.parse_datetime
# else contents['timestamp']),
# message=contents['message'],
# )
#
# if self.should_accept(line):
# yield line
#
# def should_accept(self, line: LogLine) -> bool:
# timestamp = line['timestamp']
# if self.from_ts is not None and timestamp <= self.from_ts:
# return False
#
# if self.to_ts is not None and timestamp >= self.to_ts:
# return False
#
# return True
#
# def peek(self) -> Optional[LogLine]:
# return self.look_ahead
#
# def latest_timestamp(self) -> Optional[datetime]:
# return self.peek()['timestamp'] if self.peek() is not None else None
#
# def context(self) -> str:
# return f'{self.path}:{self.line_number}'
#
#
# def _exclude_empty(logs: Iterable[LogIterator]):
# return [log for log in logs if log.peek() is not None]
#
#
# class CollatingLogIterator(LogIterator):
#
# def __init__(self, logs: List[SingleLogIterator]):
# self.logs = _exclude_empty(logs)
#
# def __iter__(self):
# return self
#
# def __next__(self):
# if not self.logs:
# raise StopIteration()
#
# log = self.logs[0]
# value = next(log)
# if log.peek() is None:
# self.logs.pop(0)
# return value
#
# def peek(self) -> Optional[LogLine]:
# if not self.logs:
# return None
#
# return self.logs[0].peek()
#
# def context(self) -> str:
# if not self.logs:
# raise Exception('Undefined context.')
#
# return self.logs[0].context()
#
#
# class MergingLogIterator(LogIterator):
# def __init__(self, logs: List[SingleLogIterator]):
# self.logs = _exclude_empty(logs)
# heapify(self.logs)
#
# def __iter__(self):
# return self
#
# def __next__(self) -> LogLine:
# if not self.logs:
# raise StopIteration()
#
# # by construction, we can't have any empty iterators at this point, so the call to next always succeeds.
# log = heappop(self.logs)
# value = next(log)
#
# # if the iterator still has stuff in it...
# if log.peek() is not None:
# heappush(self.logs, log)
#
# return value
#
# def peek(self) -> Optional[LogLine]:
# if not self.logs:
# return None
#
# return self.logs[0].peek()
#
# def context(self) -> str:
# if not self.logs:
# raise Exception('Undefined context.')
#
# return self.logs[0].context()

View File

@ -2,7 +2,9 @@ import re
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Union, Self, Optional
from typing import Union, Optional
TOPICS = re.compile(r'(?P<key>\w+)=(?P<value>"[\w\s]+"|\S+)')
class LogLevel(Enum):
@ -10,35 +12,20 @@ class LogLevel(Enum):
debug = 'DBG'
info = 'INF'
error = 'ERR'
LOG_LINE = re.compile(
r'(?P<line_type>\w{3}) (?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3}\+\d{2}:\d{2}) (?P<message>.*) '
r'count=(?P<count>\d+)$'
)
warning = 'WRN'
note = 'NOT'
@dataclass
class LogLine:
raw: str
level: LogLevel
line_number: int
timestamp: Union[str, datetime]
message: str
topics: str
count: Optional[int]
@classmethod
def from_str(cls, source: str, parse_datetime: bool = False) -> Self:
parsed = LOG_LINE.search(source)
if not parsed:
raise ValueError(f'Could not parse log line: {source}')
return cls(
raw=source,
level=LogLevel(parsed['line_type'].upper()),
line_number=0,
timestamp=(datetime.fromisoformat(parsed['timestamp']) if parse_datetime
else parsed['timestamp']),
message=parsed['message'],
count=int(parsed['count']) if parsed['count'] else None,
)
@property
def fields(self):
fields = TOPICS.findall(self.topics)
return {key: value for key, value in fields} if fields else {}

View File

@ -1,30 +1,25 @@
import sys
from dataclasses import dataclass
from pathlib import Path
from logtools.log.sources.log_source import TrackedLogLine, LogSource
from logtools.log.sources.log_parsers import LineNumberLocation
from logtools.log.sources.stream_log_source import StreamLogSource
@dataclass
class FileLineLocation:
class FileLineLocation(LineNumberLocation):
path: Path
line_number: int
class FileLogSource(LogSource[TrackedLogLine[FileLineLocation]]):
class FileLogSource(StreamLogSource):
def __init__(self, path: Path, parse_datetime=True):
self.path = path
self.parse_datetime = parse_datetime
super().__init__(self.path.open(encoding='utf-8'), parse_datetime=parse_datetime)
def __iter__(self):
with self.path.open(encoding='utf-8') as f:
for line_number, line in enumerate(f, start=1):
try:
parsed = TrackedLogLine.from_str(line, parse_datetime=True)
parsed.location = FileLineLocation(self.path, line_number)
try:
yield from super().__iter__()
finally:
self.stream.close()
yield parsed
except ValueError:
# FIXME we should probably relax parsing restrictions and output
# these too but for now just skip it.
print(f'Skip unparseable line: {line}', file=sys.stderr)
def _location(self, line_number: int) -> LineNumberLocation:
return FileLineLocation(path=self.path, line_number=line_number)

View File

@ -17,6 +17,6 @@ class FilteredSource(LogSource[TrackedLogLine[TLocation]]):
def timestamp_range(start: datetime, end: datetime):
def predicate(line: TrackedLogLine[TLocation]):
return start <= line.timestamp <= end
return start <= line.timestamp <= end # type: ignore
return predicate

View File

@ -0,0 +1,76 @@
import re
import sys
from csv import DictReader
from dataclasses import dataclass
from typing import Callable, TextIO, Optional, cast
from dateutil import parser as tsparser
from logtools.log.log_line import LogLevel
from logtools.log.sources.log_source import TrackedLogLine, LogSource
@dataclass
class LineNumberLocation:
line_number: int
"""A :class:`LogParser` is a function that takes a raw text stream and returns a :class:`LogSource`, which in turn
is an iterable of parsed lines."""
LogParser = Callable[[TextIO], LogSource[LineNumberLocation]]
LOG_LINE = re.compile(
r'(?P<line_type>\w{3}) (?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3}\+\d{2}:\d{2}) (?P<message>.*) '
r'count=(?P<count>\d+)$'
)
TOPICS = re.compile(r'((\w+=("[\w\s]+"|\S+) )+)?\w+=("[\w\s]+"|\S+)$')
def parse_raw(line: str, parse_datetime: bool = True) -> Optional[TrackedLogLine[LineNumberLocation]]:
parsed = LOG_LINE.search(line)
topics = TOPICS.search(parsed['message'])
if not parsed or not topics:
return None
return TrackedLogLine(
raw=line,
level=LogLevel(parsed['line_type'].upper()),
timestamp=(tsparser.parse(parsed['timestamp']) if parse_datetime
else parsed['timestamp']),
message=parsed['message'][:topics.start() - 1].strip(),
count=int(parsed['count']) if parsed['count'] else None,
topics=topics.group()
)
def raw_parser(stream: TextIO, parse_datetime=True) -> LogSource:
for line_number, line in enumerate(stream, start=1):
parsed = parse_raw(line, parse_datetime=parse_datetime)
if not parsed:
# FIXME we should probably relax parsing restrictions and output
# these too but for now just skip it.
print(f'Skip unparseable line: {line}', file=sys.stderr)
continue
yield parsed
def csv_parser(stream: TextIO, parse_datetime=True) -> LogSource:
for line_number, line in enumerate(DictReader(stream), start=1):
try:
line = TrackedLogLine(
raw=line['message'], # FIXME this is NOT the raw line...
timestamp=line['timestamp'],
message=line['message'],
count=int(line['count']) if line['count'] else None,
topics=line['topics'],
level=LogLevel[line['level']],
)
if parse_datetime:
line.timestamp = tsparser.parse(cast(str, line.timestamp))
yield line
except ValueError:
print(f'Skip unparseable line: {line}', file=sys.stderr)

View File

@ -0,0 +1,19 @@
from typing import TextIO
from logtools.log.sources.log_parsers import raw_parser, LineNumberLocation, LogParser
from logtools.log.sources.log_source import LogSource, TrackedLogLine
class StreamLogSource(LogSource[TrackedLogLine[LineNumberLocation]]):
def __init__(self, stream: TextIO, parse_datetime=True, log_format: LogParser = raw_parser):
self.stream = stream
self.format = log_format
self.parse_datetime = parse_datetime
def __iter__(self):
for line_number, line in enumerate(self.format(self.stream), start=1):
line.location = self._location(line_number)
yield line
def _location(self, line_number: int) -> LineNumberLocation:
return LineNumberLocation(line_number)

View File

@ -1,22 +1,19 @@
from dataclasses import dataclass
from io import StringIO
from logtools.log.sources.log_source import LogSource, TrackedLogLine
from logtools.log.sources.log_parsers import LogParser
from logtools.log.sources.stream_log_source import StreamLogSource, LineNumberLocation, raw_parser
@dataclass
class ParseLocation:
class ParseLocation(LineNumberLocation):
name: str
number: int
class StringLogSource(LogSource[TrackedLogLine[ParseLocation]]):
def __init__(self, name: str, lines: str):
class StringLogSource(StreamLogSource):
def __init__(self, name: str, lines: str, log_format: LogParser = raw_parser):
self.name = name
self.lines = lines
super().__init__(stream=StringIO(lines), log_format=log_format)
def __iter__(self):
for line_number, line in enumerate(self.lines.splitlines(), start=1):
parsed = TrackedLogLine.from_str(line, parse_datetime=True)
parsed.location = ParseLocation(self.name, line_number)
yield parsed
def _location(self, line_number: int) -> LineNumberLocation:
return ParseLocation(name=self.name, line_number=line_number)

View File

@ -22,7 +22,7 @@ def test_should_collate_lines_from_log_sources():
)
collated = CollatingSource(log1, log2)
entries = [(line.location.name, line.location.number, line.count) for line in collated]
entries = [(line.location.name, line.location.line_number, line.count) for line in collated]
assert entries == [
('log1', 1, 1),
('log1', 2, 2),

View File

@ -2,6 +2,7 @@ from dateutil import parser
from logtools.log.log_line import LogLine
from logtools.log.sources.filtered_source import FilteredSource, timestamp_range
from logtools.log.sources.log_parsers import parse_raw
from logtools.log.sources.tests.string_log_source import StringLogSource
@ -32,7 +33,7 @@ def test_should_generate_correct_datetime_range_predicate():
matches = timestamp_range(start=parser.parse('2023-10-16 22:29:24.597+00:00'),
end=parser.parse('2023-10-18 20:29:25.597+00:00'))
lines = [LogLine.from_str(line, parse_datetime=True) for line in raw_lines]
lines = [parse_raw(line, parse_datetime=True) for line in raw_lines]
filtered = [line.count for line in lines if matches(line)]
assert filtered == [2, 3]

View File

@ -0,0 +1,19 @@
from datetime import datetime
import pytz
from logtools.log.log_line import LogLevel
from logtools.log.sources.log_parsers import parse_raw
def test_raw_parser_should_parse_logline_from_string():
line = parse_raw('TRC 2023-10-16 17:28:46.579+00:00 Sending want list to peer '
'topics="codex blockexcnetwork" tid=1 peer=16U*7mogoM '
'type=WantBlock items=1 count=870781', parse_datetime=True)
assert line.level == LogLevel.trace
assert line.timestamp == datetime(2023, 10, 16, 17, 28, 46,
579000, tzinfo=pytz.utc)
assert line.message == 'Sending want list to peer'
assert line.topics == 'topics="codex blockexcnetwork" tid=1 peer=16U*7mogoM type=WantBlock items=1'
assert line.count == 870781

View File

@ -5,15 +5,12 @@ from logtools.log.sources.tests.string_log_source import StringLogSource
def test_should_order_sources_by_lookahead_timestamp():
contents = StringLogSource(
name='log1',
lines="""TRC 2023-10-16 20:29:24.595+00:00 Advertising block topics="codex discoveryengine" count=1
lines = """TRC 2023-10-16 20:29:24.595+00:00 Advertising block topics="codex discoveryengine" count=1
TRC 2023-10-16 20:29:24.597+00:00 Provided to nodes topics="codex discovery" tid=1 count=2
TRC 2023-10-16 20:29:24.646+00:00 Retrieved record from repo topics="codex repostore" count=3"""
)
log1 = OrderedSource(contents)
log2 = OrderedSource(contents)
log1 = OrderedSource(StringLogSource(name='log1', lines=lines))
log2 = OrderedSource(StringLogSource(name='log2', lines=lines))
next(log1)
assert log2 < log1
@ -24,19 +21,15 @@ def test_should_order_sources_by_lookahead_timestamp():
def test_should_raise_error_if_comparing_empty_sources():
contents = StringLogSource(
name='log1',
lines="""TRC 2023-10-16 20:29:24.595+00:00 Advertising block topics="codex discoveryengine" count=1
lines = """TRC 2023-10-16 20:29:24.595+00:00 Advertising block topics="codex discoveryengine" count=1
TRC 2023-10-16 20:29:24.597+00:00 Provided to nodes topics="codex discovery" tid=1 count=2
TRC 2023-10-16 20:29:24.646+00:00 Retrieved record from repo topics="codex repostore" count=3"""
)
log1 = OrderedSource(contents)
log2 = OrderedSource(contents)
log1 = OrderedSource(StringLogSource(name='log1', lines=lines))
log2 = OrderedSource(StringLogSource(name='log2', lines=lines))
for _ in log1:
...
with pytest.raises(ValueError):
_ = log1 < log2

View File

@ -1,16 +1,19 @@
from datetime import datetime
import pytz
from logtools.log.log_line import LogLine, LogLevel
def test_should_parse_logline_from_string():
line = LogLine.from_str('TRC 2023-10-16 17:28:46.579+00:00 Sending want list to peer '
'topics="codex blockexcnetwork" tid=1 peer=16U*7mogoM '
'type=WantBlock items=1 count=870781', parse_datetime=True)
def test_should_parse_chronicles_fields():
line = LogLine(message='Sending want list to peer',
topics='topics="codex blockexcnetwork" tid=1 peer=16U*7mogoM '
'type=WantBlock items=1',
timestamp='',
count=0,
raw='',
level=LogLevel.trace)
assert line.level == LogLevel.trace
assert line.timestamp == datetime(2023, 10, 16, 17, 28, 46,
579000, tzinfo=pytz.utc)
assert line.count == 870781
assert line.fields == {
'topics': '"codex blockexcnetwork"',
'tid': '1',
'peer': '16U*7mogoM',
'type': 'WantBlock',
'items': '1',
}

View File

View File

@ -24,4 +24,5 @@ requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
log-merge = 'logtools.cli.merge:main'
log-merge = 'logtools.cli.merge:main'
log-to-csv = 'logtools.cli.to_csv:main'