mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-03 22:13:12 +00:00
chore: create new httpClient per request (#1136)
* Create new httpClient per request * Fix tests after rebase and close the clients at the end
This commit is contained in:
parent
17d3bb55cf
commit
2538ff8da3
@ -14,33 +14,37 @@ import pkg/codex/sales/reservations
|
||||
export purchasing
|
||||
|
||||
type CodexClient* = ref object
|
||||
http: HttpClient
|
||||
baseurl: string
|
||||
session: HttpSessionRef
|
||||
httpClients: seq[HttpClient]
|
||||
|
||||
type CodexClientError* = object of CatchableError
|
||||
|
||||
const HttpClientTimeoutMs = 60 * 1000
|
||||
|
||||
proc new*(_: type CodexClient, baseurl: string): CodexClient =
|
||||
CodexClient(
|
||||
http: newHttpClient(timeout = HttpClientTimeoutMs),
|
||||
baseurl: baseurl,
|
||||
session: HttpSessionRef.new({HttpClientFlag.Http11Pipeline}),
|
||||
)
|
||||
CodexClient(baseurl: baseurl, httpClients: newSeq[HttpClient]())
|
||||
|
||||
proc http*(client: CodexClient): HttpClient =
|
||||
let httpClient = newHttpClient(timeout = HttpClientTimeoutMs)
|
||||
client.httpClients.insert(httpClient)
|
||||
return httpClient
|
||||
|
||||
proc close*(client: CodexClient): void =
|
||||
for httpClient in client.httpClients:
|
||||
httpClient.close()
|
||||
|
||||
proc info*(client: CodexClient): ?!JsonNode =
|
||||
let url = client.baseurl & "/debug/info"
|
||||
JsonNode.parse(client.http.getContent(url))
|
||||
JsonNode.parse(client.http().getContent(url))
|
||||
|
||||
proc setLogLevel*(client: CodexClient, level: string) =
|
||||
let url = client.baseurl & "/debug/chronicles/loglevel?level=" & level
|
||||
let headers = newHttpHeaders({"Content-Type": "text/plain"})
|
||||
let response = client.http.request(url, httpMethod = HttpPost, headers = headers)
|
||||
let response = client.http().request(url, httpMethod = HttpPost, headers = headers)
|
||||
assert response.status == "200 OK"
|
||||
|
||||
proc upload*(client: CodexClient, contents: string): ?!Cid =
|
||||
let response = client.http.post(client.baseurl & "/data", contents)
|
||||
let response = client.http().post(client.baseurl & "/data", contents)
|
||||
assert response.status == "200 OK"
|
||||
Cid.init(response.body).mapFailure
|
||||
|
||||
@ -48,9 +52,9 @@ proc upload*(client: CodexClient, bytes: seq[byte]): ?!Cid =
|
||||
client.upload(string.fromBytes(bytes))
|
||||
|
||||
proc download*(client: CodexClient, cid: Cid, local = false): ?!string =
|
||||
let response = client.http.get(
|
||||
client.baseurl & "/data/" & $cid & (if local: "" else: "/network/stream")
|
||||
)
|
||||
let response = client.http().get(
|
||||
client.baseurl & "/data/" & $cid & (if local: "" else: "/network/stream")
|
||||
)
|
||||
|
||||
if response.status != "200 OK":
|
||||
return failure(response.status)
|
||||
@ -58,7 +62,8 @@ proc download*(client: CodexClient, cid: Cid, local = false): ?!string =
|
||||
success response.body
|
||||
|
||||
proc downloadManifestOnly*(client: CodexClient, cid: Cid): ?!string =
|
||||
let response = client.http.get(client.baseurl & "/data/" & $cid & "/network/manifest")
|
||||
let response =
|
||||
client.http().get(client.baseurl & "/data/" & $cid & "/network/manifest")
|
||||
|
||||
if response.status != "200 OK":
|
||||
return failure(response.status)
|
||||
@ -66,7 +71,7 @@ proc downloadManifestOnly*(client: CodexClient, cid: Cid): ?!string =
|
||||
success response.body
|
||||
|
||||
proc downloadNoStream*(client: CodexClient, cid: Cid): ?!string =
|
||||
let response = client.http.post(client.baseurl & "/data/" & $cid & "/network")
|
||||
let response = client.http().post(client.baseurl & "/data/" & $cid & "/network")
|
||||
|
||||
if response.status != "200 OK":
|
||||
return failure(response.status)
|
||||
@ -78,8 +83,7 @@ proc downloadBytes*(
|
||||
): Future[?!seq[byte]] {.async.} =
|
||||
let uri = client.baseurl & "/data/" & $cid & (if local: "" else: "/network/stream")
|
||||
|
||||
let httpClient = newHttpClient()
|
||||
let response = httpClient.get(uri)
|
||||
let response = client.http().get(uri)
|
||||
|
||||
if response.status != "200 OK":
|
||||
return failure("fetch failed with status " & $response.status)
|
||||
@ -89,7 +93,7 @@ proc downloadBytes*(
|
||||
proc delete*(client: CodexClient, cid: Cid): ?!void =
|
||||
let
|
||||
url = client.baseurl & "/data/" & $cid
|
||||
response = client.http.delete(url)
|
||||
response = client.http().delete(url)
|
||||
|
||||
if response.status != "204 No Content":
|
||||
return failure(response.status)
|
||||
@ -98,7 +102,7 @@ proc delete*(client: CodexClient, cid: Cid): ?!void =
|
||||
|
||||
proc list*(client: CodexClient): ?!RestContentList =
|
||||
let url = client.baseurl & "/data"
|
||||
let response = client.http.get(url)
|
||||
let response = client.http().get(url)
|
||||
|
||||
if response.status != "200 OK":
|
||||
return failure(response.status)
|
||||
@ -107,7 +111,7 @@ proc list*(client: CodexClient): ?!RestContentList =
|
||||
|
||||
proc space*(client: CodexClient): ?!RestRepoStore =
|
||||
let url = client.baseurl & "/space"
|
||||
let response = client.http.get(url)
|
||||
let response = client.http().get(url)
|
||||
|
||||
if response.status != "200 OK":
|
||||
return failure(response.status)
|
||||
@ -141,7 +145,7 @@ proc requestStorageRaw*(
|
||||
if expiry != 0:
|
||||
json["expiry"] = %($expiry)
|
||||
|
||||
return client.http.post(url, $json)
|
||||
return client.http().post(url, $json)
|
||||
|
||||
proc requestStorage*(
|
||||
client: CodexClient,
|
||||
@ -167,7 +171,7 @@ proc requestStorage*(
|
||||
proc getPurchase*(client: CodexClient, purchaseId: PurchaseId): ?!RestPurchase =
|
||||
let url = client.baseurl & "/storage/purchases/" & purchaseId.toHex
|
||||
try:
|
||||
let body = client.http.getContent(url)
|
||||
let body = client.http().getContent(url)
|
||||
return RestPurchase.fromJson(body)
|
||||
except CatchableError as e:
|
||||
return failure e.msg
|
||||
@ -175,14 +179,14 @@ proc getPurchase*(client: CodexClient, purchaseId: PurchaseId): ?!RestPurchase =
|
||||
proc getSalesAgent*(client: CodexClient, slotId: SlotId): ?!RestSalesAgent =
|
||||
let url = client.baseurl & "/sales/slots/" & slotId.toHex
|
||||
try:
|
||||
let body = client.http.getContent(url)
|
||||
let body = client.http().getContent(url)
|
||||
return RestSalesAgent.fromJson(body)
|
||||
except CatchableError as e:
|
||||
return failure e.msg
|
||||
|
||||
proc getSlots*(client: CodexClient): ?!seq[Slot] =
|
||||
let url = client.baseurl & "/sales/slots"
|
||||
let body = client.http.getContent(url)
|
||||
let body = client.http().getContent(url)
|
||||
seq[Slot].fromJson(body)
|
||||
|
||||
proc postAvailability*(
|
||||
@ -200,7 +204,7 @@ proc postAvailability*(
|
||||
"minPricePerBytePerSecond": minPricePerBytePerSecond,
|
||||
"totalCollateral": totalCollateral,
|
||||
}
|
||||
let response = client.http.post(url, $json)
|
||||
let response = client.http().post(url, $json)
|
||||
doAssert response.status == "201 Created",
|
||||
"expected 201 Created, got " & response.status & ", body: " & response.body
|
||||
Availability.fromJson(response.body)
|
||||
@ -233,7 +237,7 @@ proc patchAvailabilityRaw*(
|
||||
if totalCollateral =? totalCollateral:
|
||||
json["totalCollateral"] = %totalCollateral
|
||||
|
||||
client.http.patch(url, $json)
|
||||
client.http().patch(url, $json)
|
||||
|
||||
proc patchAvailability*(
|
||||
client: CodexClient,
|
||||
@ -253,7 +257,7 @@ proc patchAvailability*(
|
||||
proc getAvailabilities*(client: CodexClient): ?!seq[Availability] =
|
||||
## Call sales availability REST endpoint
|
||||
let url = client.baseurl & "/sales/availability"
|
||||
let body = client.http.getContent(url)
|
||||
let body = client.http().getContent(url)
|
||||
seq[Availability].fromJson(body)
|
||||
|
||||
proc getAvailabilityReservations*(
|
||||
@ -261,16 +265,9 @@ proc getAvailabilityReservations*(
|
||||
): ?!seq[Reservation] =
|
||||
## Retrieves Availability's Reservations
|
||||
let url = client.baseurl & "/sales/availability/" & $availabilityId & "/reservations"
|
||||
let body = client.http.getContent(url)
|
||||
let body = client.http().getContent(url)
|
||||
seq[Reservation].fromJson(body)
|
||||
|
||||
proc close*(client: CodexClient) =
|
||||
client.http.close()
|
||||
|
||||
proc restart*(client: CodexClient) =
|
||||
client.http.close()
|
||||
client.http = newHttpClient(timeout = HttpClientTimeoutMs)
|
||||
|
||||
proc purchaseStateIs*(client: CodexClient, id: PurchaseId, state: string): bool =
|
||||
client.getPurchase(id).option .? state == some state
|
||||
|
||||
@ -283,18 +280,23 @@ proc requestId*(client: CodexClient, id: PurchaseId): ?RequestId =
|
||||
proc uploadRaw*(
|
||||
client: CodexClient, contents: string, headers = newHttpHeaders()
|
||||
): Response =
|
||||
return client.http.request(
|
||||
client.baseurl & "/data", body = contents, httpMethod = HttpPost, headers = headers
|
||||
)
|
||||
return client.http().request(
|
||||
client.baseurl & "/data",
|
||||
body = contents,
|
||||
httpMethod = HttpPost,
|
||||
headers = headers,
|
||||
)
|
||||
|
||||
proc listRaw*(client: CodexClient): Response =
|
||||
return client.http.request(client.baseurl & "/data", httpMethod = HttpGet)
|
||||
return client.http().request(client.baseurl & "/data", httpMethod = HttpGet)
|
||||
|
||||
proc downloadRaw*(client: CodexClient, cid: string, local = false): Response =
|
||||
return client.http.request(
|
||||
proc downloadRaw*(
|
||||
client: CodexClient, cid: string, local = false, httpClient = client.http()
|
||||
): Response =
|
||||
return httpClient.request(
|
||||
client.baseurl & "/data/" & cid & (if local: "" else: "/network/stream"),
|
||||
httpMethod = HttpGet,
|
||||
)
|
||||
|
||||
proc deleteRaw*(client: CodexClient, cid: string): Response =
|
||||
return client.http.request(client.baseurl & "/data/" & cid, httpMethod = HttpDelete)
|
||||
return client.http().request(client.baseurl & "/data/" & cid, httpMethod = HttpDelete)
|
||||
|
||||
@ -89,7 +89,6 @@ twonodessuite "Purchasing":
|
||||
check eventually(client1.purchaseStateIs(id, "submitted"), timeout = 3 * 60 * 1000)
|
||||
|
||||
await node1.restart()
|
||||
client1.restart()
|
||||
|
||||
check eventually(client1.purchaseStateIs(id, "submitted"), timeout = 3 * 60 * 1000)
|
||||
let request = client1.getPurchase(id).get.request.get
|
||||
|
||||
@ -314,24 +314,25 @@ twonodessuite "REST API":
|
||||
privateAccess(client1.http.type)
|
||||
|
||||
let cid = client1.upload(repeat("some file contents", 1000)).get
|
||||
let httpClient = client1.http()
|
||||
|
||||
try:
|
||||
# Sadly, there's no high level API for preventing the client from
|
||||
# consuming the whole response, and we need to close the socket
|
||||
# before that happens if we want to trigger the bug, so we need to
|
||||
# resort to this.
|
||||
client1.http.getBody = false
|
||||
let response = client1.downloadRaw($cid)
|
||||
httpClient.getBody = false
|
||||
let response = client1.downloadRaw($cid, httpClient = httpClient)
|
||||
|
||||
# Read 4 bytes from the stream just to make sure we actually
|
||||
# receive some data.
|
||||
let data = client1.http.socket.recv(4)
|
||||
let data = httpClient.socket.recv(4)
|
||||
check data.len == 4
|
||||
|
||||
# Prematurely closes the connection.
|
||||
client1.http.close()
|
||||
httpClient.close()
|
||||
finally:
|
||||
client1.http.getBody = true
|
||||
httpClient.getBody = true
|
||||
|
||||
let response = client1.downloadRaw($cid)
|
||||
let response = client1.downloadRaw($cid, httpClient = httpClient)
|
||||
check response.body == repeat("some file contents", 1000)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user