mirror of https://github.com/waku-org/nwaku.git
chore: Postgres enhance get oldest timestamp (#2687)
* postgres: consider also the existing paritions when getting oldest timestamp * test_driver_postgres_query: adapt test to oldest timestamp
This commit is contained in:
parent
8d7b0ed0e8
commit
8451cf8e1b
|
@ -1,7 +1,10 @@
|
||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, sequtils, random, algorithm], testutils/unittests, chronos, chronicles
|
std/[options, sequtils, strformat, random, algorithm],
|
||||||
|
testutils/unittests,
|
||||||
|
chronos,
|
||||||
|
chronicles
|
||||||
import
|
import
|
||||||
../../../waku/waku_archive,
|
../../../waku/waku_archive,
|
||||||
../../../waku/waku_archive/driver as driver_module,
|
../../../waku/waku_archive/driver as driver_module,
|
||||||
|
@ -1767,9 +1770,21 @@ suite "Postgres driver - queries":
|
||||||
)
|
)
|
||||||
).isOk()
|
).isOk()
|
||||||
|
|
||||||
|
## just keep the second resolution.
|
||||||
|
## Notice that the oldest timestamps considers the minimum partition timestamp, which
|
||||||
|
## is expressed in seconds.
|
||||||
|
let oldestPartitionTimestamp =
|
||||||
|
Timestamp(float(oldestTime) / 1_000_000_000) * 1_000_000_000
|
||||||
|
|
||||||
var res = await driver.getOldestMessageTimestamp()
|
var res = await driver.getOldestMessageTimestamp()
|
||||||
assert res.isOk(), res.error
|
assert res.isOk(), res.error
|
||||||
assert res.get() == oldestTime, "Failed to retrieve the latest timestamp"
|
|
||||||
|
## We give certain margin of error. The oldest timestamp is obtained from
|
||||||
|
## the oldest partition timestamp and there might be at most one second of difference
|
||||||
|
## between the time created in the test and the oldest-partition-timestamp created within
|
||||||
|
## the driver logic.
|
||||||
|
assert abs(res.get() - oldestPartitionTimestamp) < (2 * 1_000_000_000),
|
||||||
|
fmt"Failed to retrieve the latest timestamp {res.get()} != {oldestPartitionTimestamp}"
|
||||||
|
|
||||||
res = await driver.getNewestMessageTimestamp()
|
res = await driver.getNewestMessageTimestamp()
|
||||||
assert res.isOk(), res.error
|
assert res.isOk(), res.error
|
||||||
|
|
|
@ -83,6 +83,9 @@ proc getLastMoment*(partition: Partition): int64 =
|
||||||
let lastTimeInSec = partition.timeRange.`end`
|
let lastTimeInSec = partition.timeRange.`end`
|
||||||
return lastTimeInSec
|
return lastTimeInSec
|
||||||
|
|
||||||
|
proc getPartitionStartTimeInNanosec*(partition: Partition): int64 =
|
||||||
|
return partition.timeRange.beginning * 1_000_000_000
|
||||||
|
|
||||||
proc containsMoment*(partition: Partition, time: int64): bool =
|
proc containsMoment*(partition: Partition, time: int64): bool =
|
||||||
## Returns true if the given moment is contained within the partition window,
|
## Returns true if the given moment is contained within the partition window,
|
||||||
## 'false' otherwise.
|
## 'false' otherwise.
|
||||||
|
|
|
@ -701,7 +701,10 @@ proc getInt(
|
||||||
try:
|
try:
|
||||||
retInt = parseInt(str)
|
retInt = parseInt(str)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return err("exception in getInt, parseInt: " & getCurrentExceptionMsg())
|
return err(
|
||||||
|
"exception in getInt, parseInt, str: " & str & " query: " & query & " exception: " &
|
||||||
|
getCurrentExceptionMsg()
|
||||||
|
)
|
||||||
|
|
||||||
return ok(retInt)
|
return ok(retInt)
|
||||||
|
|
||||||
|
@ -726,11 +729,20 @@ method getMessagesCount*(
|
||||||
method getOldestMessageTimestamp*(
|
method getOldestMessageTimestamp*(
|
||||||
s: PostgresDriver
|
s: PostgresDriver
|
||||||
): Future[ArchiveDriverResult[Timestamp]] {.async.} =
|
): Future[ArchiveDriverResult[Timestamp]] {.async.} =
|
||||||
|
## In some cases it could happen that we have
|
||||||
|
## empty partitions which are older than the current stored rows.
|
||||||
|
## In those cases we want to consider those older partitions as the oldest considered timestamp.
|
||||||
|
let oldestPartition = s.partitionMngr.getOldestPartition().valueOr:
|
||||||
|
return err("could not get oldest partition: " & $error)
|
||||||
|
|
||||||
|
let oldestPartitionTimeNanoSec = oldestPartition.getPartitionStartTimeInNanosec()
|
||||||
|
|
||||||
let intRes = await s.getInt("SELECT MIN(storedAt) FROM messages")
|
let intRes = await s.getInt("SELECT MIN(storedAt) FROM messages")
|
||||||
if intRes.isErr():
|
if intRes.isErr():
|
||||||
return err("error in getOldestMessageTimestamp: " & intRes.error)
|
## Just return the oldest partition time considering the partitions set
|
||||||
|
return ok(Timestamp(oldestPartitionTimeNanoSec))
|
||||||
|
|
||||||
return ok(Timestamp(intRes.get()))
|
return ok(Timestamp(min(intRes.get(), oldestPartitionTimeNanoSec)))
|
||||||
|
|
||||||
method getNewestMessageTimestamp*(
|
method getNewestMessageTimestamp*(
|
||||||
s: PostgresDriver
|
s: PostgresDriver
|
||||||
|
|
Loading…
Reference in New Issue