mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-05 15:33:08 +00:00
retry postgres driver msg row insert on fail
This commit is contained in:
parent
7920368a36
commit
8ee16196af
@ -291,6 +291,9 @@ proc rowCallbackImpl(
|
||||
|
||||
outRows.add((msgHash, pubSubTopic, wakuMessage))
|
||||
|
||||
const DefaultDatabasePutRetryInterval = timer.milliseconds(200)
|
||||
const DefaultDatabasePutRetryCount = 5
|
||||
|
||||
method put*(
|
||||
s: PostgresDriver,
|
||||
messageHash: WakuMessageHash,
|
||||
@ -312,29 +315,37 @@ method put*(
|
||||
## until we completely remove the store/archive-v2 logic
|
||||
let fakeId = "0"
|
||||
|
||||
(
|
||||
## Add the row to the messages table
|
||||
await s.writeConnPool.runStmt(
|
||||
InsertRowStmtName,
|
||||
InsertRowStmtDefinition,
|
||||
@[
|
||||
fakeId, messageHash, pubsubTopic, contentTopic, payload, version, timestamp,
|
||||
meta,
|
||||
],
|
||||
@[
|
||||
int32(fakeId.len),
|
||||
int32(messageHash.len),
|
||||
int32(pubsubTopic.len),
|
||||
int32(contentTopic.len),
|
||||
int32(payload.len),
|
||||
int32(version.len),
|
||||
int32(timestamp.len),
|
||||
int32(meta.len),
|
||||
],
|
||||
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
|
||||
)
|
||||
).isOkOr:
|
||||
return err("could not put msg in messages table: " & $error)
|
||||
# Briefly retry puts to avoid failing permanently on temporary failures such as
|
||||
# partition being created in parallel to row insert.
|
||||
for i in 1 .. DefaultDatabasePutRetryCount:
|
||||
(
|
||||
await s.writeConnPool.runStmt(
|
||||
InsertRowStmtName,
|
||||
InsertRowStmtDefinition,
|
||||
@[
|
||||
fakeId, messageHash, pubsubTopic, contentTopic, payload, version, timestamp,
|
||||
meta,
|
||||
],
|
||||
@[
|
||||
int32(fakeId.len),
|
||||
int32(messageHash.len),
|
||||
int32(pubsubTopic.len),
|
||||
int32(contentTopic.len),
|
||||
int32(payload.len),
|
||||
int32(version.len),
|
||||
int32(timestamp.len),
|
||||
int32(meta.len),
|
||||
],
|
||||
@[
|
||||
int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)
|
||||
],
|
||||
)
|
||||
).isOkOr:
|
||||
if i == DefaultDatabasePutRetryCount:
|
||||
return err("could not put msg in messages table: " & $error)
|
||||
await sleepAsync(DefaultDatabasePutRetryInterval)
|
||||
continue
|
||||
break
|
||||
|
||||
## Now add the row to messages_lookup
|
||||
let ret = await s.writeConnPool.runStmt(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user