mirror of
https://github.com/waku-org/telemetry.git
synced 2025-02-20 19:58:12 +00:00
fix: ensure message received (#5)
This commit is contained in:
parent
c92bd95a8b
commit
4749351837
@ -40,7 +40,7 @@ func (a *Aggregator) Run(d time.Duration) {
|
|||||||
// Ensure the specific key uids received a message after the end of the duration
|
// Ensure the specific key uids received a message after the end of the duration
|
||||||
// That way we know that this specific key uid has been connected
|
// That way we know that this specific key uid has been connected
|
||||||
for receiverKeyUID := range receiverKeyUIDs {
|
for receiverKeyUID := range receiverKeyUIDs {
|
||||||
ok, err := didReceivedMessageAfter(a.DB, receiverKeyUID, endsAt)
|
ok, err := didReceivedMessageBeforeAndAfter(a.DB, receiverKeyUID, startsAt, endsAt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("could not check key UID: %s, because of %s", receiverKeyUID, err)
|
log.Fatalf("could not check key UID: %s, because of %s", receiverKeyUID, err)
|
||||||
}
|
}
|
||||||
|
@ -40,6 +40,11 @@ func dropTables(db *sql.DB) {
|
|||||||
db.Close()
|
db.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func updateCreatedAt(db *sql.DB, m *ReceivedMessage) error {
|
||||||
|
_, err := db.Exec("UPDATE receivedMessages SET createdAt = $1 WHERE id = $2", m.CreatedAt, m.ID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func queryAggregatedMessage(db *sql.DB) ([]*ReceivedMessageAggregated, error) {
|
func queryAggregatedMessage(db *sql.DB) ([]*ReceivedMessageAggregated, error) {
|
||||||
rows, err := db.Query("SELECT * FROM receivedMessageAggregated")
|
rows, err := db.Query("SELECT * FROM receivedMessageAggregated")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -90,6 +95,20 @@ func TestRunAggregatorSimple(t *testing.T) {
|
|||||||
err = m.put(db)
|
err = m.put(db)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
twoHourAndHalf := 5*time.Hour + time.Minute*30
|
||||||
|
m = &ReceivedMessage{
|
||||||
|
ChatID: "3",
|
||||||
|
MessageHash: "3",
|
||||||
|
ReceiverKeyUID: "1",
|
||||||
|
SentAt: time.Now().Add(-twoHourAndHalf).Unix(),
|
||||||
|
Topic: "1",
|
||||||
|
}
|
||||||
|
err = m.put(db)
|
||||||
|
require.NoError(t, err)
|
||||||
|
m.CreatedAt = m.SentAt
|
||||||
|
err = updateCreatedAt(db, m)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
agg := NewAggregator(db)
|
agg := NewAggregator(db)
|
||||||
|
|
||||||
agg.Run(time.Hour)
|
agg.Run(time.Hour)
|
||||||
@ -138,6 +157,20 @@ func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) {
|
|||||||
err = m.put(db)
|
err = m.put(db)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
twoHourAndHalf := 5*time.Hour + time.Minute*30
|
||||||
|
m = &ReceivedMessage{
|
||||||
|
ChatID: "3",
|
||||||
|
MessageHash: "4",
|
||||||
|
ReceiverKeyUID: "1",
|
||||||
|
SentAt: time.Now().Add(-twoHourAndHalf).Unix(),
|
||||||
|
Topic: "1",
|
||||||
|
}
|
||||||
|
err = m.put(db)
|
||||||
|
require.NoError(t, err)
|
||||||
|
m.CreatedAt = m.SentAt
|
||||||
|
err = updateCreatedAt(db, m)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
m = &ReceivedMessage{
|
m = &ReceivedMessage{
|
||||||
ChatID: "1",
|
ChatID: "1",
|
||||||
MessageHash: "1",
|
MessageHash: "1",
|
||||||
@ -158,6 +191,19 @@ func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) {
|
|||||||
err = m.put(db)
|
err = m.put(db)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
m = &ReceivedMessage{
|
||||||
|
ChatID: "3",
|
||||||
|
MessageHash: "4",
|
||||||
|
ReceiverKeyUID: "2",
|
||||||
|
SentAt: time.Now().Add(-twoHourAndHalf).Unix(),
|
||||||
|
Topic: "1",
|
||||||
|
}
|
||||||
|
err = m.put(db)
|
||||||
|
require.NoError(t, err)
|
||||||
|
m.CreatedAt = m.SentAt
|
||||||
|
err = updateCreatedAt(db, m)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
agg := NewAggregator(db)
|
agg := NewAggregator(db)
|
||||||
|
|
||||||
agg.Run(time.Hour)
|
agg.Run(time.Hour)
|
||||||
|
@ -55,13 +55,20 @@ func queryReceivedMessagesBetween(db *sql.DB, startsAt time.Time, endsAt time.Ti
|
|||||||
return receivedMessages, nil
|
return receivedMessages, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func didReceivedMessageAfter(db *sql.DB, receiverPublicKey string, after time.Time) (bool, error) {
|
func didReceivedMessageBeforeAndAfter(db *sql.DB, receiverPublicKey string, before, after time.Time) (bool, error) {
|
||||||
var count int
|
var afterCount int
|
||||||
err := db.QueryRow("SELECT COUNT(*) FROM receivedMessages WHERE receiverKeyUID = $1 AND createdAt > $2", receiverPublicKey, after.Unix()).Scan(&count)
|
err := db.QueryRow("SELECT COUNT(*) FROM receivedMessages WHERE receiverKeyUID = $1 AND createdAt > $2", receiverPublicKey, after.Unix()).Scan(&afterCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
return count > 0, nil
|
|
||||||
|
var beforeCount int
|
||||||
|
err = db.QueryRow("SELECT COUNT(*) FROM receivedMessages WHERE receiverKeyUID = $1 AND createdAt < $2", receiverPublicKey, before.Unix()).Scan(&beforeCount)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return afterCount > 0 && beforeCount > 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ReceivedMessage) put(db *sql.DB) error {
|
func (r *ReceivedMessage) put(db *sql.DB) error {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user