mirror of
https://github.com/logos-storage/logos-storage-nim-cs-dist-tests.git
synced 2026-01-03 22:13:10 +00:00
checking and responding to availability of codex node
This commit is contained in:
parent
36cc93ebee
commit
99e0e75ded
@ -299,7 +299,7 @@ namespace CodexClient
|
||||
|
||||
private void InitializePeerNodeId()
|
||||
{
|
||||
var debugInfo = Time.Retry(codexAccess.GetDebugInfo, "ensure online");
|
||||
var debugInfo = codexAccess.GetDebugInfo();
|
||||
if (!debugInfo.Version.IsValid())
|
||||
{
|
||||
throw new Exception($"Invalid version information received from Codex node {GetName()}: {debugInfo.Version}");
|
||||
|
||||
@ -35,6 +35,7 @@ namespace BiblioTech
|
||||
|
||||
public async Task SendInAdminChannel(string[] lines)
|
||||
{
|
||||
if (adminChannel == null) return;
|
||||
var chunker = new LineChunker(lines);
|
||||
var chunks = chunker.GetChunks();
|
||||
if (!chunks.Any()) return;
|
||||
|
||||
@ -15,6 +15,7 @@ namespace BiblioTech.CodexChecking
|
||||
Task CouldNotDownloadCid();
|
||||
Task GiveCidToUser(string cid);
|
||||
Task GiveDataFileToUser(string fileContent);
|
||||
Task CodexUnavailable();
|
||||
|
||||
Task ToAdminChannel(string msg);
|
||||
}
|
||||
@ -43,7 +44,13 @@ namespace BiblioTech.CodexChecking
|
||||
repo.SaveChanges();
|
||||
}
|
||||
|
||||
var cid = await UploadData(check.UniqueData);
|
||||
var cid = UploadData(check.UniqueData);
|
||||
if (cid == null)
|
||||
{
|
||||
await handler.CodexUnavailable();
|
||||
return;
|
||||
}
|
||||
|
||||
await handler.GiveCidToUser(cid);
|
||||
}
|
||||
|
||||
@ -87,7 +94,7 @@ namespace BiblioTech.CodexChecking
|
||||
return;
|
||||
}
|
||||
|
||||
var manifest = await GetManifest(receivedCid);
|
||||
var manifest = GetManifest(receivedCid);
|
||||
if (manifest == null)
|
||||
{
|
||||
await handler.CouldNotDownloadCid();
|
||||
@ -96,7 +103,12 @@ namespace BiblioTech.CodexChecking
|
||||
|
||||
if (IsManifestLengthCompatible(handler, check, manifest))
|
||||
{
|
||||
if (await IsContentCorrect(handler, check, receivedCid))
|
||||
var correct = IsContentCorrect(handler, check, receivedCid);
|
||||
if (!correct.HasValue) {
|
||||
await handler.CodexUnavailable();
|
||||
return;
|
||||
}
|
||||
if (correct.Value)
|
||||
{
|
||||
await CheckNowCompleted(handler, check, userId, "UploadCheck");
|
||||
return;
|
||||
@ -120,7 +132,7 @@ namespace BiblioTech.CodexChecking
|
||||
check.CompletedUtc < expiry;
|
||||
}
|
||||
|
||||
private async Task<string> UploadData(string uniqueData)
|
||||
private string? UploadData(string uniqueData)
|
||||
{
|
||||
var filePath = Path.Combine(config.ChecksDataPath, Guid.NewGuid().ToString());
|
||||
|
||||
@ -129,7 +141,7 @@ namespace BiblioTech.CodexChecking
|
||||
File.WriteAllText(filePath, uniqueData);
|
||||
var file = new TrackedFile(log, filePath, "checkData");
|
||||
|
||||
return await codexWrapper.OnCodex(node =>
|
||||
return codexWrapper.OnCodex(node =>
|
||||
{
|
||||
return node.UploadFile(file).Id;
|
||||
});
|
||||
@ -145,11 +157,11 @@ namespace BiblioTech.CodexChecking
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<Manifest?> GetManifest(string receivedCid)
|
||||
private Manifest? GetManifest(string receivedCid)
|
||||
{
|
||||
try
|
||||
{
|
||||
return await codexWrapper.OnCodex(node =>
|
||||
return codexWrapper.OnCodex(node =>
|
||||
{
|
||||
return node.DownloadManifestOnly(new ContentId(receivedCid)).Manifest;
|
||||
});
|
||||
@ -172,11 +184,11 @@ namespace BiblioTech.CodexChecking
|
||||
manifestLength < (dataLength + 1);
|
||||
}
|
||||
|
||||
private async Task<bool> IsContentCorrect(ICheckResponseHandler handler, TransferCheck check, string receivedCid)
|
||||
private bool? IsContentCorrect(ICheckResponseHandler handler, TransferCheck check, string receivedCid)
|
||||
{
|
||||
try
|
||||
{
|
||||
var content = await codexWrapper.OnCodex(node =>
|
||||
var content = codexWrapper.OnCodex(node =>
|
||||
{
|
||||
var file = node.DownloadContent(new ContentId(receivedCid));
|
||||
if (file == null) return string.Empty;
|
||||
@ -190,6 +202,8 @@ namespace BiblioTech.CodexChecking
|
||||
}
|
||||
});
|
||||
|
||||
if (content == null) return null;
|
||||
|
||||
Log($"Checking content: content={content},check={check.UniqueData}");
|
||||
return content == check.UniqueData;
|
||||
}
|
||||
|
||||
@ -21,38 +21,72 @@ namespace BiblioTech.CodexChecking
|
||||
|
||||
var httpFactory = CreateHttpFactory();
|
||||
factory = new CodexNodeFactory(log, httpFactory, dataDir: config.DataPath);
|
||||
|
||||
Task.Run(CheckCodexNode);
|
||||
}
|
||||
|
||||
public async Task OnCodex(Action<ICodexNode> action)
|
||||
public T? OnCodex<T>(Func<ICodexNode, T> func) where T : class
|
||||
{
|
||||
await Task.Run(() =>
|
||||
lock (codexLock)
|
||||
{
|
||||
lock (codexLock)
|
||||
{
|
||||
action(Get());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public async Task<T> OnCodex<T>(Func<ICodexNode, T> func)
|
||||
{
|
||||
return await Task<T>.Run(() =>
|
||||
{
|
||||
lock (codexLock)
|
||||
{
|
||||
return func(Get());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private ICodexNode Get()
|
||||
{
|
||||
if (currentCodexNode == null)
|
||||
{
|
||||
currentCodexNode = CreateCodex();
|
||||
if (currentCodexNode == null) return null;
|
||||
return func(currentCodexNode);
|
||||
}
|
||||
}
|
||||
|
||||
return currentCodexNode;
|
||||
private void CheckCodexNode()
|
||||
{
|
||||
Thread.Sleep(TimeSpan.FromSeconds(10.0));
|
||||
|
||||
while (true)
|
||||
{
|
||||
lock (codexLock)
|
||||
{
|
||||
var newNode = GetNewCodexNode();
|
||||
if (newNode != null && currentCodexNode == null) ShowConnectionRestored();
|
||||
if (newNode == null && currentCodexNode != null) ShowConnectionLost();
|
||||
currentCodexNode = newNode;
|
||||
}
|
||||
|
||||
Thread.Sleep(TimeSpan.FromMinutes(15.0));
|
||||
}
|
||||
}
|
||||
|
||||
private ICodexNode? GetNewCodexNode()
|
||||
{
|
||||
try
|
||||
{
|
||||
if (currentCodexNode != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Current instance is responsive? Keep it.
|
||||
var info = currentCodexNode.GetDebugInfo();
|
||||
if (info != null && info.Version != null &&
|
||||
!string.IsNullOrEmpty(info.Version.Revision)) return currentCodexNode;
|
||||
}
|
||||
catch
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
return CreateCodex();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
log.Error("Exception when trying to check codex node: " + ex.Message);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private void ShowConnectionLost()
|
||||
{
|
||||
Program.AdminChecker.SendInAdminChannel("Codex node connection lost.");
|
||||
}
|
||||
|
||||
private void ShowConnectionRestored()
|
||||
{
|
||||
Program.AdminChecker.SendInAdminChannel("Codex node connection restored.");
|
||||
}
|
||||
|
||||
private ICodexNode CreateCodex()
|
||||
@ -76,7 +110,7 @@ namespace BiblioTech.CodexChecking
|
||||
{
|
||||
if (string.IsNullOrEmpty(config.CodexEndpointAuth) || !config.CodexEndpointAuth.Contains(":"))
|
||||
{
|
||||
return new HttpFactory(log);
|
||||
return new HttpFactory(log, new SnappyTimeSet());
|
||||
}
|
||||
|
||||
var tokens = config.CodexEndpointAuth.Split(':');
|
||||
|
||||
@ -26,6 +26,11 @@ namespace BiblioTech.Commands
|
||||
await context.Followup("Could not download the CID.");
|
||||
}
|
||||
|
||||
public async Task CodexUnavailable()
|
||||
{
|
||||
await context.Followup("Couldn't perform check: Our Codex node appears unavailable. Try again later?");
|
||||
}
|
||||
|
||||
public async Task GiveCidToUser(string cid)
|
||||
{
|
||||
await context.Followup(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user