From 693a1778d19bc73cfa041f65476c13d2fd0b2083 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Mon, 18 Mar 2024 15:59:45 +0100 Subject: [PATCH] chore: Better postgres duplicate insert (#2535) * postgres_driver: add ON CONFLICT DO NOTHING in the insert statement * test_driver_postgres: adapt test to ON CONFLICT DO NOTHING The insert does not fail when duplicate, it returns a positive response when doing 'put' of a duplicated row. The test is adapted so that we just check that the number of messages doesn't grow after trying to add a duplicated row. --- tests/waku_archive/test_driver_postgres.nim | 18 +++++++++++++++++- waku/waku_archive/archive.nim | 9 +-------- .../driver/postgres_driver/postgres_driver.nim | 3 ++- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/tests/waku_archive/test_driver_postgres.nim b/tests/waku_archive/test_driver_postgres.nim index 7ff8ef404..a42cf1fb1 100644 --- a/tests/waku_archive/test_driver_postgres.nim +++ b/tests/waku_archive/test_driver_postgres.nim @@ -164,6 +164,9 @@ suite "Postgres driver": let msg1 = fakeWakuMessage(ts = now) let msg2 = fakeWakuMessage(ts = now) + let initialNumMsgs = (await driver.getMessagesCount()).valueOr: + raiseAssert "could not get num mgs correctly: " & $error + var putRes = await driver.put( DefaultPubsubTopic, msg1, @@ -173,6 +176,12 @@ suite "Postgres driver": ) assert putRes.isOk(), putRes.error + var newNumMsgs = (await driver.getMessagesCount()).valueOr: + raiseAssert "could not get num mgs correctly: " & $error + + assert newNumMsgs == (initialNumMsgs + 1.int64), + "wrong number of messages: " & $newNumMsgs + putRes = await driver.put( DefaultPubsubTopic, msg2, @@ -180,4 +189,11 @@ suite "Postgres driver": computeMessageHash(DefaultPubsubTopic, msg2), msg2.timestamp, ) - assert not putRes.isOk() + + assert putRes.isOk() + + newNumMsgs = (await driver.getMessagesCount()).valueOr: + raiseAssert "could not get num mgs correctly: " & $error + + assert newNumMsgs == (initialNumMsgs + 1.int64), + "wrong number of messages: " & $newNumMsgs diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index 903852a2b..6f5330f65 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -113,14 +113,7 @@ proc handleMessage*( (await self.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgTimestamp)).isOkOr: waku_archive_errors.inc(labelValues = [insertFailure]) - # Prevent spamming the logs when multiple nodes are connected to the same database. - # In that case, the message cannot be inserted but is an expected "insert error" - # and therefore we reduce its visibility by having the log in trace level. - if "duplicate key value violates unique constraint" in error: - trace "failed to insert message", err = error - else: - debug "failed to insert message", err = error - + debug "failed to insert message", err = error let insertDuration = getTime().toUnixFloat() - insertStartTime waku_archive_insert_duration_seconds.observe(insertDuration) diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index e62634b08..94c681ba9 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -31,7 +31,7 @@ type PostgresDriver* = ref object of ArchiveDriver const InsertRowStmtName = "InsertRow" const InsertRowStmtDefinition = # TODO: get the sql queries from a file """INSERT INTO messages (id, messageHash, storedAt, contentTopic, payload, pubsubTopic, - version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7, $8);""" + version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT DO NOTHING;""" const SelectNoCursorAscStmtName = "SelectWithoutCursorAsc" const SelectNoCursorAscStmtDef = @@ -679,6 +679,7 @@ proc loopPartitionFactory( debug "creating a new partition for the future" ## The current used partition is the last one that was created. ## Thus, let's create another partition for the future. + ( await self.addPartition( newestPartition.getLastMoment(), PartitionsRangeInterval