diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index 3ae5d95a7..f5caf08a1 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -17,7 +17,8 @@ import ./waku_archive/test_driver_sqlite_query, ./waku_archive/test_driver_sqlite, ./waku_archive/test_retention_policy, - ./waku_archive/test_waku_archive + ./waku_archive/test_waku_archive, + ./waku_archive/test_partition_manager const os* {.strdefine.} = "" when os == "Linux" and diff --git a/tests/waku_archive/test_partition_manager.nim b/tests/waku_archive/test_partition_manager.nim new file mode 100644 index 000000000..e238c7675 --- /dev/null +++ b/tests/waku_archive/test_partition_manager.nim @@ -0,0 +1,16 @@ +{.used.} + +import testutils/unittests, chronos +import + ../../../waku/waku_archive/driver/postgres_driver/partitions_manager, + ../../../waku/waku_core/time + +suite "Partition Manager": + test "Calculate end partition time": + # 1717372850 == Mon Jun 03 2024 00:00:50 GMT+0000 + # 1717376400 == Mon Jun 03 2024 01:00:00 GMT+0000 + check 1717376400 == partitions_manager.calcEndPartitionTime(Timestamp(1717372850)) + + # 1717372800 == Mon Jun 03 2024 00:00:00 GMT+0000 + # 1717376400 == Mon Jun 03 2024 01:00:00 GMT+0000 + check 1717376400 == partitions_manager.calcEndPartitionTime(Timestamp(1717372800)) diff --git a/waku/waku_archive/driver/postgres_driver/partitions_manager.nim b/waku/waku_archive/driver/postgres_driver/partitions_manager.nim index 52a01cef8..347cddaa3 100644 --- a/waku/waku_archive/driver/postgres_driver/partitions_manager.nim +++ b/waku/waku_archive/driver/postgres_driver/partitions_manager.nim @@ -4,8 +4,9 @@ ## The created partitions are referenced by the 'storedAt' field. ## -import std/deques +import std/[deques, times] import chronos, chronicles +import ../../../waku_core/time logScope: topics = "waku archive partitions_manager" @@ -95,6 +96,36 @@ proc containsMoment*(partition: Partition, time: int64): bool = return false +proc calcEndPartitionTime*(startTime: Timestamp): Timestamp = + ## Each partition has an "startTime" and "end" time. This proc calculates the "end" time so that + ## it precisely matches the next o'clock time. + ## This considers that the partitions should be 1 hour long. + ## For example, if `startTime` == 14:28 , then the returned end time should be 15:00. + ## Notice both `startTime` and returned time are in seconds since Epoch. + ## + ## startTime - seconds from Epoch that represents the partition start time + + let startDateTime: DateTime = times.fromUnix(startTime).utc() + + let maxPartitionDuration: times.Duration = times.initDuration(hours = 1) + ## Max time range covered by each parition + ## It is max because we aim to make the partition times synced to + ## o'clock hours. i.e. each partition edge will have min == sec == nanosec == 0 + + let endDateTime = startDateTime + maxPartitionDuration + let endDateTimeOClock = times.dateTime( + year = endDateTime.year, + month = endDateTime.month, + monthday = endDateTime.monthday, + hour = endDateTime.hour, + minute = 0, + second = 0, + nanosecond = 0, + zone = utc(), + ) + + return Timestamp(endDateTimeOClock.toTime().toUnix()) + proc getName*(partition: Partition): string = return partition.name diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 1a035b2c1..b80acbde4 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -892,14 +892,14 @@ proc performWriteQuery*( return ok() proc addPartition( - self: PostgresDriver, startTime: Timestamp, duration: timer.Duration + self: PostgresDriver, startTime: Timestamp ): Future[ArchiveDriverResult[void]] {.async.} = ## Creates a partition table that will store the messages that fall in the range ## `startTime` <= storedAt < `startTime + duration`. ## `startTime` is measured in seconds since epoch let beginning = startTime - let `end` = (startTime + duration.seconds) + let `end` = partitions_manager.calcEndPartitionTime(startTime) let fromInSec: string = $beginning let untilInSec: string = $`end` @@ -914,6 +914,11 @@ proc addPartition( "messages FOR VALUES FROM ('" & fromInNanoSec & "') TO ('" & untilInNanoSec & "');" (await self.performWriteQuery(createPartitionQuery)).isOkOr: + if error.contains("already exists"): + debug "skip create new partition as it already exists: ", skipped_error = $error + return ok() + + ## for any different error, just consider it return err(fmt"error adding partition [{partitionName}]: " & $error) debug "new partition added", query = createPartitionQuery @@ -953,7 +958,6 @@ proc initializePartitionsInfo( return ok() const DefaultDatabasePartitionCheckTimeInterval = timer.minutes(10) -const PartitionsRangeInterval = timer.hours(1) ## Time range covered by each parition proc loopPartitionFactory( self: PostgresDriver, onFatalError: OnFatalErrorHandler @@ -963,11 +967,6 @@ proc loopPartitionFactory( debug "starting loopPartitionFactory" - if PartitionsRangeInterval < DefaultDatabasePartitionCheckTimeInterval: - onFatalError( - "partition factory partition range interval should be bigger than check interval" - ) - ## First of all, let's make the 'partition_manager' aware of the current partitions (await self.initializePartitionsInfo()).isOkOr: onFatalError("issue in loopPartitionFactory: " & $error) @@ -979,7 +978,7 @@ proc loopPartitionFactory( if self.partitionMngr.isEmpty(): debug "adding partition because now there aren't more partitions" - (await self.addPartition(now, PartitionsRangeInterval)).isOkOr: + (await self.addPartition(now)).isOkOr: onFatalError("error when creating a new partition from empty state: " & $error) else: let newestPartitionRes = self.partitionMngr.getNewestPartition() @@ -992,18 +991,14 @@ proc loopPartitionFactory( ## The current used partition is the last one that was created. ## Thus, let's create another partition for the future. - ( - await self.addPartition( - newestPartition.getLastMoment(), PartitionsRangeInterval - ) - ).isOkOr: + (await self.addPartition(newestPartition.getLastMoment())).isOkOr: onFatalError("could not add the next partition for 'now': " & $error) elif now >= newestPartition.getLastMoment(): debug "creating a new partition to contain current messages" ## There is no partition to contain the current time. ## This happens if the node has been stopped for quite a long time. ## Then, let's create the needed partition to contain 'now'. - (await self.addPartition(now, PartitionsRangeInterval)).isOkOr: + (await self.addPartition(now)).isOkOr: onFatalError("could not add the next partition: " & $error) await sleepAsync(DefaultDatabasePartitionCheckTimeInterval)