mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-04 06:53:12 +00:00
Fix protocol connection close (#3588)
* Added connection closeWithEof for protocol handler and clients of lightpush/legacy lightpush and filter (except filer push case) * Store/Legacy store close connections note: this enhancement is fully made by Zoltán. Me I just resubmitted it after nwaku history cleanup. --------- Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
This commit is contained in:
parent
5b5ff4cbe7
commit
74b3770f6c
@ -58,6 +58,9 @@ proc sendSubscribeRequest(
|
|||||||
|
|
||||||
let connection = connOpt.get()
|
let connection = connOpt.get()
|
||||||
|
|
||||||
|
defer:
|
||||||
|
await connection.closeWithEOF()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await connection.writeLP(filterSubscribeRequest.encode().buffer)
|
await connection.writeLP(filterSubscribeRequest.encode().buffer)
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
|
|||||||
@ -293,6 +293,9 @@ proc initProtocolHandler(wf: WakuFilter) =
|
|||||||
|
|
||||||
var response: FilterSubscribeResponse
|
var response: FilterSubscribeResponse
|
||||||
|
|
||||||
|
defer:
|
||||||
|
await conn.closeWithEOF()
|
||||||
|
|
||||||
wf.peerRequestRateLimiter.checkUsageLimit(WakuFilterSubscribeCodec, conn):
|
wf.peerRequestRateLimiter.checkUsageLimit(WakuFilterSubscribeCodec, conn):
|
||||||
var buf: seq[byte]
|
var buf: seq[byte]
|
||||||
try:
|
try:
|
||||||
|
|||||||
@ -43,6 +43,9 @@ proc sendPushRequest(
|
|||||||
dialFailure & ": " & $peer & " is not accessible",
|
dialFailure & ": " & $peer & " is not accessible",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
defer:
|
||||||
|
await connection.closeWithEOF()
|
||||||
|
|
||||||
await connection.writeLP(req.encode().buffer)
|
await connection.writeLP(req.encode().buffer)
|
||||||
|
|
||||||
var buffer: seq[byte]
|
var buffer: seq[byte]
|
||||||
|
|||||||
@ -107,6 +107,9 @@ proc handleRequest*(
|
|||||||
proc initProtocolHandler(wl: WakuLightPush) =
|
proc initProtocolHandler(wl: WakuLightPush) =
|
||||||
proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
|
proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
|
||||||
var rpc: LightPushResponse
|
var rpc: LightPushResponse
|
||||||
|
defer:
|
||||||
|
await conn.closeWithEOF()
|
||||||
|
|
||||||
wl.requestRateLimiter.checkUsageLimit(WakuLightPushCodec, conn):
|
wl.requestRateLimiter.checkUsageLimit(WakuLightPushCodec, conn):
|
||||||
var buffer: seq[byte]
|
var buffer: seq[byte]
|
||||||
try:
|
try:
|
||||||
|
|||||||
@ -40,6 +40,9 @@ proc sendPushRequest(
|
|||||||
return err(dialFailure)
|
return err(dialFailure)
|
||||||
let connection = connOpt.get()
|
let connection = connOpt.get()
|
||||||
|
|
||||||
|
defer:
|
||||||
|
await connection.closeWithEOF()
|
||||||
|
|
||||||
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: some(req))
|
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: some(req))
|
||||||
await connection.writeLP(rpc.encode().buffer)
|
await connection.writeLP(rpc.encode().buffer)
|
||||||
|
|
||||||
|
|||||||
@ -67,6 +67,9 @@ proc handleRequest*(
|
|||||||
proc initProtocolHandler(wl: WakuLegacyLightPush) =
|
proc initProtocolHandler(wl: WakuLegacyLightPush) =
|
||||||
proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
|
proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
|
||||||
var rpc: PushRPC
|
var rpc: PushRPC
|
||||||
|
defer:
|
||||||
|
await conn.closeWithEOF()
|
||||||
|
|
||||||
wl.requestRateLimiter.checkUsageLimit(WakuLegacyLightPushCodec, conn):
|
wl.requestRateLimiter.checkUsageLimit(WakuLegacyLightPushCodec, conn):
|
||||||
var buffer: seq[byte]
|
var buffer: seq[byte]
|
||||||
try:
|
try:
|
||||||
|
|||||||
@ -25,6 +25,9 @@ proc sendStoreRequest(
|
|||||||
): Future[StoreQueryResult] {.async, gcsafe.} =
|
): Future[StoreQueryResult] {.async, gcsafe.} =
|
||||||
var req = request
|
var req = request
|
||||||
|
|
||||||
|
defer:
|
||||||
|
await connection.closeWithEof()
|
||||||
|
|
||||||
if req.requestId == "":
|
if req.requestId == "":
|
||||||
req.requestId = generateRequestId(self.rng)
|
req.requestId = generateRequestId(self.rng)
|
||||||
|
|
||||||
|
|||||||
@ -92,6 +92,10 @@ proc initProtocolHandler(self: WakuStore) =
|
|||||||
var successfulQuery = false ## only consider the correct queries in metrics
|
var successfulQuery = false ## only consider the correct queries in metrics
|
||||||
var resBuf: StoreResp
|
var resBuf: StoreResp
|
||||||
var queryDuration: float
|
var queryDuration: float
|
||||||
|
|
||||||
|
defer:
|
||||||
|
await conn.closeWithEof()
|
||||||
|
|
||||||
self.requestRateLimiter.checkUsageLimit(WakuStoreCodec, conn):
|
self.requestRateLimiter.checkUsageLimit(WakuStoreCodec, conn):
|
||||||
let readRes = catch:
|
let readRes = catch:
|
||||||
await conn.readLp(DefaultMaxRpcSize.int)
|
await conn.readLp(DefaultMaxRpcSize.int)
|
||||||
|
|||||||
@ -43,6 +43,9 @@ proc sendHistoryQueryRPC(
|
|||||||
|
|
||||||
let connection = connOpt.get()
|
let connection = connOpt.get()
|
||||||
|
|
||||||
|
defer:
|
||||||
|
await connection.closeWithEof()
|
||||||
|
|
||||||
let requestId =
|
let requestId =
|
||||||
if req.requestId != "":
|
if req.requestId != "":
|
||||||
req.requestId
|
req.requestId
|
||||||
|
|||||||
@ -114,6 +114,10 @@ proc initProtocolHandler(ws: WakuStore) =
|
|||||||
var successfulQuery = false ## only consider the correct queries in metrics
|
var successfulQuery = false ## only consider the correct queries in metrics
|
||||||
var resBuf: StoreResp
|
var resBuf: StoreResp
|
||||||
var queryDuration: float
|
var queryDuration: float
|
||||||
|
|
||||||
|
defer:
|
||||||
|
await conn.closeWithEof()
|
||||||
|
|
||||||
ws.requestRateLimiter.checkUsageLimit(WakuLegacyStoreCodec, conn):
|
ws.requestRateLimiter.checkUsageLimit(WakuLegacyStoreCodec, conn):
|
||||||
let readRes = catch:
|
let readRes = catch:
|
||||||
await conn.readLp(DefaultMaxRpcSize.int)
|
await conn.readLp(DefaultMaxRpcSize.int)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user