2
0
mirror of synced 2025-01-20 21:39:10 +00:00

289 lines
9.5 KiB
C#
Raw Normal View History

using CodexPlugin.Hooks;
using Core;
using FileUtils;
2023-09-19 11:51:59 +02:00
using GethPlugin;
2023-09-13 12:24:46 +02:00
using KubernetesWorkflow;
using KubernetesWorkflow.Types;
2024-03-26 11:39:59 +01:00
using Logging;
2023-09-13 11:59:21 +02:00
using MetricsPlugin;
2023-08-02 15:11:27 +02:00
using Utils;
2023-04-13 09:33:10 +02:00
namespace CodexPlugin
2023-04-13 09:33:10 +02:00
{
2023-09-19 11:51:59 +02:00
/// <summary>
/// Handle to a single running Codex node: identity and debug queries, file transfer,
/// peer connection, marketplace access and lifecycle control.
/// </summary>
public interface ICodexNode : IHasContainer, IHasMetricsScrapeTarget, IHasEthAddress
{
    // ----- Identity and diagnostics -----

    /// <summary>Returns the name used to identify this node.</summary>
    string GetName();

    /// <summary>Returns the peer id of this node.</summary>
    string GetPeerId();

    /// <summary>Queries the node for its current debug information.</summary>
    DebugInfo GetDebugInfo();

    /// <summary>Queries the node for its debug information about the peer with the given id.</summary>
    /// <param name="peerId">Id of the peer to look up.</param>
    DebugPeer GetDebugPeer(string peerId);

    // ----- File transfer -----

    /// <summary>Uploads a file to the node without supplying a failure callback.</summary>
    /// <param name="file">The file to upload.</param>
    /// <returns>The content id under which the node stored the file.</returns>
    ContentId UploadFile(TrackedFile file);

    /// <summary>Uploads a file to the node.</summary>
    /// <param name="file">The file to upload.</param>
    /// <param name="onFailure">Callback that receives failure information for the upload.</param>
    /// <returns>The content id under which the node stored the file.</returns>
    ContentId UploadFile(TrackedFile file, Action<Failure> onFailure);

    /// <summary>Downloads content from the node without supplying a failure callback.</summary>
    /// <param name="contentId">Id of the content to download.</param>
    /// <param name="fileLabel">Optional label for the file that receives the content.</param>
    /// <returns>The file holding the downloaded content. The return type is nullable.</returns>
    TrackedFile? DownloadContent(ContentId contentId, string fileLabel = "");

    /// <summary>Downloads content from the node.</summary>
    /// <param name="contentId">Id of the content to download.</param>
    /// <param name="onFailure">Callback that receives failure information for the download.</param>
    /// <param name="fileLabel">Optional label for the file that receives the content.</param>
    /// <returns>The file holding the downloaded content. The return type is nullable.</returns>
    TrackedFile? DownloadContent(ContentId contentId, Action<Failure> onFailure, string fileLabel = "");

    /// <summary>Queries the node for the list of datasets it holds locally.</summary>
    LocalDatasetList LocalFiles();

    /// <summary>Queries the node for its storage space information.</summary>
    CodexSpace Space();

    // ----- Networking -----

    /// <summary>Connects this node to the given peer node.</summary>
    /// <param name="node">The node to connect to.</param>
    void ConnectToPeer(ICodexNode node);

    // ----- State and collaborators -----

    /// <summary>Version information of the node.</summary>
    DebugInfoVersion Version { get; }

    /// <summary>Access to the marketplace functionality of this node.</summary>
    IMarketplaceAccess Marketplace { get; }

    /// <summary>Crash watcher associated with this node.</summary>
    CrashWatcher CrashWatcher { get; }

    /// <summary>Returns information about the pod this node runs in.</summary>
    PodInfo GetPodInfo();

    /// <summary>Transfer speed measurements of this node.</summary>
    ITransferSpeeds TransferSpeeds { get; }

    /// <summary>Ethereum account of this node.</summary>
    EthAccount EthAccount { get; }

    // ----- Lifecycle -----

    /// <summary>
    /// Warning! The node is not usable after this.
    /// TODO: Replace with delete-blocks debug call once available in Codex.
    /// </summary>
    void DeleteRepoFolder();

    /// <summary>Stops the node.</summary>
    /// <param name="waitTillStopped">Whether to wait until the node has stopped.</param>
    void Stop(bool waitTillStopped);
}
2023-09-19 11:51:59 +02:00
/// <summary>
/// <see cref="ICodexNode"/> implementation that forwards node operations to the
/// <see cref="CodexAccess"/> supplied at construction and reports lifecycle and
/// file-transfer events to the supplied <see cref="ICodexNodeHooks"/>.
/// </summary>
public class CodexNode : ICodexNode
{
    // Prefix of the upload response that signals the node could not store the data.
    private const string UploadFailedMessage = "Unable to store block";

    private readonly ILog log;
    private readonly IPluginTools tools;
    private readonly ICodexNodeHooks hooks;
    private readonly EthAccount? ethAccount;
    private readonly TransferSpeeds transferSpeeds;

    // Both ids stay empty until EnsureOnlineGetVersionResponse() has run.
    private string peerId = string.Empty;
    private string nodeId = string.Empty;

    /// <summary>Creates a node wrapper around the given access object.</summary>
    /// <param name="tools">Plugin tools; used for logging, file management and workflows.</param>
    /// <param name="codexAccess">Access object through which the node is reached.</param>
    /// <param name="group">Group this node belongs to; used to stop the node.</param>
    /// <param name="marketplaceAccess">Marketplace access exposed via <see cref="Marketplace"/>.</param>
    /// <param name="hooks">Receiver of lifecycle and file-transfer notifications.</param>
    /// <param name="ethAccount">
    /// Ethereum account of the node, or null. When null, <see cref="EthAddress"/> and
    /// <see cref="EthAccount"/> throw.
    /// </param>
    public CodexNode(IPluginTools tools, CodexAccess codexAccess, CodexNodeGroup group, IMarketplaceAccess marketplaceAccess, ICodexNodeHooks hooks, EthAccount? ethAccount)
    {
        this.tools = tools;
        this.ethAccount = ethAccount;
        CodexAccess = codexAccess;
        Group = group;
        Marketplace = marketplaceAccess;
        this.hooks = hooks;
        Version = new DebugInfoVersion();
        transferSpeeds = new TransferSpeeds();

        // GetName() reads the container through CodexAccess, so CodexAccess must be assigned before this line.
        log = new LogPrefixer(tools.GetLog(), $"{GetName()} ");
    }

    /// <summary>
    /// Notifies the hooks that the node is starting, passing the creation time and image
    /// of the container recipe.
    /// </summary>
    public void Awake()
    {
        hooks.OnNodeStarting(Container.Recipe.RecipeCreatedUtc, Container.Recipe.Image);
    }

    /// <summary>
    /// Notifies the hooks that the node has started, passing the peer id and node id.
    /// Both are empty strings unless <see cref="EnsureOnlineGetVersionResponse"/> ran first.
    /// </summary>
    public void Initialize()
    {
        hooks.OnNodeStarted(peerId, nodeId);
    }

    /// <summary>The running pod of this node, as held by <see cref="CodexAccess"/>.</summary>
    public RunningPod Pod => CodexAccess.Container;

    /// <summary>The container of <see cref="Pod"/>. Throws unless the pod has exactly one container.</summary>
    public RunningContainer Container => Pod.Containers.Single();

    /// <summary>Access object through which the node is reached.</summary>
    public CodexAccess CodexAccess { get; }

    /// <summary>Crash watcher of this node, as held by <see cref="CodexAccess"/>.</summary>
    public CrashWatcher CrashWatcher => CodexAccess.CrashWatcher;

    /// <summary>Group this node belongs to.</summary>
    public CodexNodeGroup Group { get; }

    /// <summary>Marketplace access supplied at construction.</summary>
    public IMarketplaceAccess Marketplace { get; }

    /// <summary>
    /// Version information. A default instance until
    /// <see cref="EnsureOnlineGetVersionResponse"/> replaces it with the node's reported version.
    /// </summary>
    public DebugInfoVersion Version { get; private set; }

    /// <summary>Upload and download speed samples recorded by this instance.</summary>
    public ITransferSpeeds TransferSpeeds => transferSpeeds;

    /// <summary>
    /// Creates a metrics scrape target for the first container of the pod, using the
    /// metrics port tag of the Codex container recipe. A new object is created on every access.
    /// </summary>
    public IMetricsScrapeTarget MetricsScrapeTarget
    {
        get
        {
            return new MetricsScrapeTarget(CodexAccess.Container.Containers.First(), CodexContainerRecipe.MetricsPortTag);
        }
    }

    /// <summary>Ethereum address of this node's account.</summary>
    /// <exception cref="Exception">No Ethereum account was supplied at construction.</exception>
    public EthAddress EthAddress
    {
        get
        {
            EnsureMarketplace();
            return ethAccount!.EthAddress;
        }
    }

    /// <summary>Ethereum account of this node.</summary>
    /// <exception cref="Exception">No Ethereum account was supplied at construction.</exception>
    public EthAccount EthAccount
    {
        get
        {
            EnsureMarketplace();
            return ethAccount!;
        }
    }

    /// <summary>Returns the name of this node's container.</summary>
    public string GetName()
    {
        return Container.Name;
    }

    /// <summary>
    /// Returns the peer id captured by <see cref="EnsureOnlineGetVersionResponse"/>,
    /// or an empty string if that has not run yet.
    /// </summary>
    public string GetPeerId()
    {
        return peerId;
    }

    /// <summary>
    /// Fetches the node's debug information and logs its id together with the peer ids
    /// found in its table.
    /// </summary>
    public DebugInfo GetDebugInfo()
    {
        var debugInfo = CodexAccess.GetDebugInfo();
        var known = string.Join(",", debugInfo.Table.Nodes.Select(n => n.PeerId));
        Log($"Got DebugInfo with id: {debugInfo.Id}. This node knows: [{known}]");
        return debugInfo;
    }

    /// <summary>Fetches the node's debug information about the peer with the given id.</summary>
    /// <param name="peerId">Id of the peer to look up.</param>
    public DebugPeer GetDebugPeer(string peerId)
    {
        return CodexAccess.GetDebugPeer(peerId);
    }

    /// <summary>Uploads a file using a failure callback that does nothing.</summary>
    /// <param name="file">The file to upload.</param>
    /// <returns>The content id built from the node's response.</returns>
    public ContentId UploadFile(TrackedFile file)
    {
        return UploadFile(file, DoNothing);
    }

    /// <summary>
    /// Uploads a file to the node, records an upload speed sample and notifies the hooks
    /// before and after the transfer.
    /// </summary>
    /// <param name="file">The file to upload; it is opened for reading from its filename.</param>
    /// <param name="onFailure">Failure callback handed to the underlying upload call.</param>
    /// <returns>The content id built from the node's response.</returns>
    public ContentId UploadFile(TrackedFile file, Action<Failure> onFailure)
    {
        using var fileStream = File.OpenRead(file.Filename);

        // The generated id ties the "uploading" and "uploaded" hook notifications together.
        var uniqueId = Guid.NewGuid().ToString();
        var size = file.GetFilesize();

        hooks.OnFileUploading(uniqueId, size);

        var logMessage = $"Uploading file {file.Describe()}...";
        var measurement = Stopwatch.Measure(log, logMessage, () =>
        {
            return CodexAccess.UploadFile(fileStream, onFailure);
        });

        var response = measurement.Value;
        transferSpeeds.AddUploadSample(size, measurement.Duration);

        if (string.IsNullOrEmpty(response)) FrameworkAssert.Fail("Received empty response.");

        // The failure marker is a fixed protocol string, so compare ordinally instead of
        // using the culture-sensitive default of StartsWith(string).
        if (response.StartsWith(UploadFailedMessage, StringComparison.Ordinal)) FrameworkAssert.Fail("Node failed to store block.");

        Log($"Uploaded file {file.Describe()}. Received contentId: '{response}'.");

        var cid = new ContentId(response);
        hooks.OnFileUploaded(uniqueId, size, cid);
        return cid;
    }

    /// <summary>Downloads content using a failure callback that does nothing.</summary>
    /// <param name="contentId">Id of the content to download.</param>
    /// <param name="fileLabel">Label passed to the file manager when creating the target file.</param>
    /// <returns>The file the content was written to.</returns>
    public TrackedFile? DownloadContent(ContentId contentId, string fileLabel = "")
    {
        return DownloadContent(contentId, DoNothing, fileLabel);
    }

    /// <summary>
    /// Downloads content into a newly created file, records a download speed sample and
    /// notifies the hooks before and after the transfer.
    /// </summary>
    /// <param name="contentId">Id of the content to download.</param>
    /// <param name="onFailure">Failure callback handed to the underlying download call.</param>
    /// <param name="fileLabel">Label passed to the file manager when creating the target file.</param>
    /// <returns>The file the content was written to. Exceptions from the download propagate.</returns>
    public TrackedFile? DownloadContent(ContentId contentId, Action<Failure> onFailure, string fileLabel = "")
    {
        var file = tools.GetFileManager().CreateEmptyFile(fileLabel);

        hooks.OnFileDownloading(contentId);
        Log($"Downloading '{contentId}'...");

        var logMessage = $"Downloaded '{contentId}' to '{file.Filename}'";
        var measurement = Stopwatch.Measure(log, logMessage, () => DownloadToFile(contentId.Id, file, onFailure));

        // The size is read after the download so it reflects what was actually written.
        var size = file.GetFilesize();
        transferSpeeds.AddDownloadSample(size, measurement);

        hooks.OnFileDownloaded(size, contentId);
        return file;
    }

    /// <summary>Fetches the list of datasets the node holds locally.</summary>
    public LocalDatasetList LocalFiles()
    {
        return CodexAccess.LocalFiles();
    }

    /// <summary>Fetches the node's storage space information.</summary>
    public CodexSpace Space()
    {
        return CodexAccess.Space();
    }

    /// <summary>
    /// Connects this node to the given peer, using the peer's debug-info id and its
    /// addresses rewritten to the peer's pod IP.
    /// </summary>
    /// <param name="node">The peer to connect to. Must be a <see cref="CodexNode"/>.</param>
    /// <exception cref="InvalidCastException"><paramref name="node"/> is not a <see cref="CodexNode"/>.</exception>
    public void ConnectToPeer(ICodexNode node)
    {
        // The concrete type is needed to reach the peer's pod.
        var peer = (CodexNode)node;

        Log($"Connecting to peer {peer.GetName()}...");
        var peerInfo = node.GetDebugInfo();
        CodexAccess.ConnectToPeer(peerInfo.Id, GetPeerMultiAddresses(peer, peerInfo));

        Log($"Successfully connected to peer {peer.GetName()}.");
    }

    /// <summary>Fetches information about the pod this node runs in.</summary>
    public PodInfo GetPodInfo()
    {
        return CodexAccess.GetPodInfo();
    }

    /// <summary>
    /// Deletes the node's repo folder. Per the interface contract, the node is not usable afterwards.
    /// </summary>
    public void DeleteRepoFolder()
    {
        CodexAccess.DeleteRepoFolder();
    }

    /// <summary>
    /// Stops the node: notifies the hooks, stops the crash watcher, then asks the group to stop this node.
    /// </summary>
    /// <param name="waitTillStopped">Passed through to the group's stop call.</param>
    public void Stop(bool waitTillStopped)
    {
        Log("Stopping...");
        hooks.OnNodeStopping();
        CrashWatcher.Stop();
        Group.Stop(this, waitTillStopped);
    }

    /// <summary>
    /// Retries fetching the node's debug info until it succeeds, stores the peer id, node id
    /// and version, and registers log replacements that map the ids (full and short form)
    /// to the node name.
    /// </summary>
    /// <exception cref="Exception">The version information reported by the node is not valid.</exception>
    public void EnsureOnlineGetVersionResponse()
    {
        var debugInfo = Time.Retry(CodexAccess.GetDebugInfo, "ensure online");
        peerId = debugInfo.Id;
        nodeId = debugInfo.Table.LocalNode.NodeId;
        var nodeName = CodexAccess.Container.Name;

        if (!debugInfo.Version.IsValid())
        {
            throw new Exception($"Invalid version information received from Codex node {GetName()}: {debugInfo.Version}");
        }

        // Register replacements so both full and shortened ids are shown as the node name in the log.
        log.AddStringReplace(peerId, nodeName);
        log.AddStringReplace(CodexUtils.ToShortId(peerId), nodeName);
        log.AddStringReplace(nodeId, nodeName);
        log.AddStringReplace(CodexUtils.ToShortId(nodeId), nodeName);

        Version = debugInfo.Version;
    }

    /// <summary>
    /// Returns the peer's advertised addresses with "0.0.0.0" replaced by the peer's pod IP.
    /// </summary>
    /// <param name="peer">The peer whose pod IP is looked up.</param>
    /// <param name="peerInfo">Debug info of the peer, providing the addresses.</param>
    private string[] GetPeerMultiAddresses(CodexNode peer, DebugInfo peerInfo)
    {
        // The peer we want to connect is in a different pod.
        // We must replace the default IP with the pod IP in the multiAddress.
        var workflow = tools.CreateWorkflow();
        var podInfo = workflow.GetPodInfo(peer.Pod);

        return peerInfo.Addrs
            .Select(a => a.Replace("0.0.0.0", podInfo.Ip))
            .ToArray();
    }

    /// <summary>
    /// Streams the content with the given id from the node into the given file.
    /// Failures are logged and rethrown.
    /// </summary>
    /// <param name="contentId">Id of the content to download.</param>
    /// <param name="file">Target file; it is opened for writing from its filename.</param>
    /// <param name="onFailure">Failure callback handed to the underlying download call.</param>
    private void DownloadToFile(string contentId, TrackedFile file, Action<Failure> onFailure)
    {
        // NOTE(review): File.OpenWrite does not truncate existing content; this relies on the
        // target file being empty when it gets here - confirm CreateEmptyFile guarantees that.
        using var fileStream = File.OpenWrite(file.Filename);
        try
        {
            using var downloadStream = CodexAccess.DownloadFile(contentId, onFailure);
            downloadStream.CopyTo(fileStream);
        }
        catch
        {
            Log($"Failed to download file '{contentId}'.");
            throw;
        }
    }

    /// <summary>Throws when no Ethereum account was supplied at construction.</summary>
    /// <exception cref="Exception">The Ethereum account is null.</exception>
    private void EnsureMarketplace()
    {
        if (ethAccount is null) throw new Exception("Marketplace is not enabled for this Codex node. Please start it with the option '.EnableMarketplace(...)' to enable it.");
    }

    /// <summary>Writes a message to this node's prefixed log.</summary>
    private void Log(string msg)
    {
        log.Log(msg);
    }

    /// <summary>No-op failure callback used by the overloads that take no callback.</summary>
    private void DoNothing(Failure failure)
    {
    }
}
}