diff --git a/telemetry/aggregator.go b/telemetry/aggregator.go index 875aaf4..1298ca2 100644 --- a/telemetry/aggregator.go +++ b/telemetry/aggregator.go @@ -40,7 +40,7 @@ func (a *Aggregator) Run(d time.Duration) { // Ensure the specific key uids received a message after the end of the duration // That way we know that this specific key uid has been connected for receiverKeyUID := range receiverKeyUIDs { - ok, err := didReceivedMessageAfter(a.DB, receiverKeyUID, endsAt) + ok, err := didReceivedMessageBeforeAndAfter(a.DB, receiverKeyUID, startsAt, endsAt) if err != nil { log.Fatalf("could not check key UID: %s, because of %s", receiverKeyUID, err) } diff --git a/telemetry/aggregator_test.go b/telemetry/aggregator_test.go index 245f233..2107721 100644 --- a/telemetry/aggregator_test.go +++ b/telemetry/aggregator_test.go @@ -40,6 +40,11 @@ func dropTables(db *sql.DB) { db.Close() } +func updateCreatedAt(db *sql.DB, m *ReceivedMessage) error { + _, err := db.Exec("UPDATE receivedMessages SET createdAt = $1 WHERE id = $2", m.CreatedAt, m.ID) + return err +} + func queryAggregatedMessage(db *sql.DB) ([]*ReceivedMessageAggregated, error) { rows, err := db.Query("SELECT * FROM receivedMessageAggregated") if err != nil { @@ -90,6 +95,20 @@ func TestRunAggregatorSimple(t *testing.T) { err = m.put(db) require.NoError(t, err) + twoHourAndHalf := 5*time.Hour + time.Minute*30 + m = &ReceivedMessage{ + ChatID: "3", + MessageHash: "3", + ReceiverKeyUID: "1", + SentAt: time.Now().Add(-twoHourAndHalf).Unix(), + Topic: "1", + } + err = m.put(db) + require.NoError(t, err) + m.CreatedAt = m.SentAt + err = updateCreatedAt(db, m) + require.NoError(t, err) + agg := NewAggregator(db) agg.Run(time.Hour) @@ -138,6 +157,20 @@ func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) { err = m.put(db) require.NoError(t, err) + twoHourAndHalf := 5*time.Hour + time.Minute*30 + m = &ReceivedMessage{ + ChatID: "3", + MessageHash: "4", + ReceiverKeyUID: "1", + SentAt: time.Now().Add(-twoHourAndHalf).Unix(), + Topic: "1", + } + err = m.put(db) + require.NoError(t, err) + m.CreatedAt = m.SentAt + err = updateCreatedAt(db, m) + require.NoError(t, err) + m = &ReceivedMessage{ ChatID: "1", MessageHash: "1", @@ -158,6 +191,19 @@ func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) { err = m.put(db) require.NoError(t, err) + m = &ReceivedMessage{ + ChatID: "3", + MessageHash: "4", + ReceiverKeyUID: "2", + SentAt: time.Now().Add(-twoHourAndHalf).Unix(), + Topic: "1", + } + err = m.put(db) + require.NoError(t, err) + m.CreatedAt = m.SentAt + err = updateCreatedAt(db, m) + require.NoError(t, err) + agg := NewAggregator(db) agg.Run(time.Hour) diff --git a/telemetry/receivedmessage.go b/telemetry/receivedmessage.go index 874e769..4de8d47 100644 --- a/telemetry/receivedmessage.go +++ b/telemetry/receivedmessage.go @@ -55,13 +55,20 @@ func queryReceivedMessagesBetween(db *sql.DB, startsAt time.Time, endsAt time.Ti return receivedMessages, nil } -func didReceivedMessageAfter(db *sql.DB, receiverPublicKey string, after time.Time) (bool, error) { - var count int - err := db.QueryRow("SELECT COUNT(*) FROM receivedMessages WHERE receiverKeyUID = $1 AND createdAt > $2", receiverPublicKey, after.Unix()).Scan(&count) +func didReceivedMessageBeforeAndAfter(db *sql.DB, receiverPublicKey string, before, after time.Time) (bool, error) { + var afterCount int + err := db.QueryRow("SELECT COUNT(*) FROM receivedMessages WHERE receiverKeyUID = $1 AND createdAt > $2", receiverPublicKey, after.Unix()).Scan(&afterCount) if err != nil { return false, err } - return count > 0, nil + + var beforeCount int + err = db.QueryRow("SELECT COUNT(*) FROM receivedMessages WHERE receiverKeyUID = $1 AND createdAt < $2", receiverPublicKey, before.Unix()).Scan(&beforeCount) + if err != nil { + return false, err + } + + return afterCount > 0 && beforeCount > 0, nil } func (r *ReceivedMessage) put(db *sql.DB) error {