mirror of
https://github.com/waku-org/telemetry.git
synced 2025-02-19 03:16:24 +00:00
fix: aggregation with user in chat before and after (#6)
As a user can leave a chat, he can also join it, Before accounting for it, we need to ensure that he was in a specific for chat for the whoe period
This commit is contained in:
parent
4749351837
commit
7ae26a8bb6
@ -2,6 +2,7 @@ package telemetry
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
@ -31,29 +32,20 @@ func (a *Aggregator) Run(d time.Duration) {
|
||||
log.Fatalf("could not query received message: %s", err)
|
||||
}
|
||||
|
||||
// Collect all key uids
|
||||
receiverKeyUIDs := make(map[string]bool)
|
||||
for _, receivedMessage := range receivedMessages {
|
||||
receiverKeyUIDs[receivedMessage.ReceiverKeyUID] = true
|
||||
}
|
||||
|
||||
// Ensure the specific key uids received a message after the end of the duration
|
||||
// That way we know that this specific key uid has been connected
|
||||
for receiverKeyUID := range receiverKeyUIDs {
|
||||
ok, err := didReceivedMessageBeforeAndAfter(a.DB, receiverKeyUID, startsAt, endsAt)
|
||||
if err != nil {
|
||||
log.Fatalf("could not check key UID: %s, because of %s", receiverKeyUID, err)
|
||||
}
|
||||
if !ok {
|
||||
receiverKeyUIDs[receiverKeyUID] = false
|
||||
}
|
||||
}
|
||||
|
||||
// Group the received messages by chat id and key uid
|
||||
groupedMessages := make(map[string]map[string]int)
|
||||
for _, receivedMessage := range receivedMessages {
|
||||
// Skip receiver key uid if it has not been connected
|
||||
if !receiverKeyUIDs[receivedMessage.ReceiverKeyUID] {
|
||||
// Skip receiver key uid if it has not been connected or was not in the chat after and before
|
||||
ok, err := didReceivedMessageBeforeAndAfterInChat(
|
||||
a.DB, receivedMessage.ReceiverKeyUID,
|
||||
startsAt,
|
||||
endsAt,
|
||||
receivedMessage.ChatID,
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatalf("could not check message id: %s, because of %s", fmt.Sprint(receivedMessage.ID), err)
|
||||
}
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -55,15 +55,25 @@ func queryReceivedMessagesBetween(db *sql.DB, startsAt time.Time, endsAt time.Ti
|
||||
return receivedMessages, nil
|
||||
}
|
||||
|
||||
func didReceivedMessageBeforeAndAfter(db *sql.DB, receiverPublicKey string, before, after time.Time) (bool, error) {
|
||||
func didReceivedMessageBeforeAndAfterInChat(db *sql.DB, receiverPublicKey string, before, after time.Time, chatId string) (bool, error) {
|
||||
var afterCount int
|
||||
err := db.QueryRow("SELECT COUNT(*) FROM receivedMessages WHERE receiverKeyUID = $1 AND createdAt > $2", receiverPublicKey, after.Unix()).Scan(&afterCount)
|
||||
err := db.QueryRow(
|
||||
"SELECT COUNT(*) FROM receivedMessages WHERE receiverKeyUID = $1 AND createdAt > $2 AND chatId = $3",
|
||||
receiverPublicKey,
|
||||
after.Unix(),
|
||||
chatId,
|
||||
).Scan(&afterCount)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
var beforeCount int
|
||||
err = db.QueryRow("SELECT COUNT(*) FROM receivedMessages WHERE receiverKeyUID = $1 AND createdAt < $2", receiverPublicKey, before.Unix()).Scan(&beforeCount)
|
||||
err = db.QueryRow(
|
||||
"SELECT COUNT(*) FROM receivedMessages WHERE receiverKeyUID = $1 AND createdAt < $2 AND chatId = $3",
|
||||
receiverPublicKey,
|
||||
before.Unix(),
|
||||
chatId,
|
||||
).Scan(&beforeCount)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user