mirror of
https://github.com/logos-storage/logos-storage-nim-cs-dist-tests.git
synced 2026-01-03 22:13:10 +00:00
pushes container concepts out of codexAccess
This commit is contained in:
parent
2292d2e672
commit
e45ed0c21e
@ -1,9 +1,10 @@
|
||||
using k8s;
|
||||
using Logging;
|
||||
using Utils;
|
||||
|
||||
namespace KubernetesWorkflow
|
||||
{
|
||||
public class CrashWatcher
|
||||
public class ContainerCrashWatcher : ICrashWatcher
|
||||
{
|
||||
private readonly ILog log;
|
||||
private readonly KubernetesClientConfiguration config;
|
||||
@ -15,7 +16,7 @@ namespace KubernetesWorkflow
|
||||
private Task? worker;
|
||||
private Exception? workerException;
|
||||
|
||||
public CrashWatcher(ILog log, KubernetesClientConfiguration config, string containerName, string podName, string recipeName, string k8sNamespace)
|
||||
public ContainerCrashWatcher(ILog log, KubernetesClientConfiguration config, string containerName, string podName, string recipeName, string k8sNamespace)
|
||||
{
|
||||
this.log = log;
|
||||
this.config = config;
|
||||
@ -45,7 +46,7 @@ namespace KubernetesWorkflow
|
||||
if (workerException != null) throw new Exception("Exception occurred in CrashWatcher worker thread.", workerException);
|
||||
}
|
||||
|
||||
public bool HasContainerCrashed()
|
||||
public bool HasCrashed()
|
||||
{
|
||||
using var client = new Kubernetes(config);
|
||||
var result = HasContainerBeenRestarted(client);
|
||||
@ -946,13 +946,13 @@ namespace KubernetesWorkflow
|
||||
|
||||
#endregion
|
||||
|
||||
public CrashWatcher CreateCrashWatcher(RunningContainer container)
|
||||
public ContainerCrashWatcher CreateCrashWatcher(RunningContainer container)
|
||||
{
|
||||
var containerName = container.Name;
|
||||
var podName = GetPodName(container);
|
||||
var recipeName = container.Recipe.Name;
|
||||
|
||||
return new CrashWatcher(log, cluster.GetK8sClientConfig(), containerName, podName, recipeName, K8sNamespace);
|
||||
return new ContainerCrashWatcher(log, cluster.GetK8sClientConfig(), containerName, podName, recipeName, K8sNamespace);
|
||||
}
|
||||
|
||||
private V1Pod[] FindPodsByLabel(string podLabel)
|
||||
|
||||
@ -13,7 +13,7 @@ namespace KubernetesWorkflow
|
||||
FutureContainers Start(int numberOfContainers, ILocation location, ContainerRecipeFactory recipeFactory, StartupConfig startupConfig);
|
||||
PodInfo GetPodInfo(RunningContainer container);
|
||||
PodInfo GetPodInfo(RunningPod pod);
|
||||
CrashWatcher CreateCrashWatcher(RunningContainer container);
|
||||
ContainerCrashWatcher CreateCrashWatcher(RunningContainer container);
|
||||
void Stop(RunningPod pod, bool waitTillStopped);
|
||||
void DownloadContainerLog(RunningContainer container, ILogHandler logHandler, int? tailLines = null, bool? previous = null);
|
||||
IDownloadedLog DownloadContainerLog(RunningContainer container, int? tailLines = null, bool? previous = null);
|
||||
@ -93,7 +93,7 @@ namespace KubernetesWorkflow
|
||||
return K8s(c => c.GetPodInfo(pod.StartResult.Deployment));
|
||||
}
|
||||
|
||||
public CrashWatcher CreateCrashWatcher(RunningContainer container)
|
||||
public ContainerCrashWatcher CreateCrashWatcher(RunningContainer container)
|
||||
{
|
||||
return K8s(c => c.CreateCrashWatcher(container));
|
||||
}
|
||||
@ -209,6 +209,7 @@ namespace KubernetesWorkflow
|
||||
var port = startResult.GetExternalServicePorts(recipe, tag);
|
||||
|
||||
return new Address(
|
||||
logName: $"{recipe.Name}:{tag}",
|
||||
startResult.Cluster.HostAddress,
|
||||
port.Number);
|
||||
}
|
||||
@ -220,6 +221,7 @@ namespace KubernetesWorkflow
|
||||
var port = startResult.GetInternalServicePorts(recipe, tag);
|
||||
|
||||
return new Address(
|
||||
logName: $"{serviceName}:{tag}",
|
||||
$"http://{serviceName}.{namespaceName}.svc.cluster.local",
|
||||
port.Number);
|
||||
}
|
||||
|
||||
@ -2,12 +2,14 @@
|
||||
{
|
||||
public class Address
|
||||
{
|
||||
public Address(string host, int port)
|
||||
public Address(string logName, string host, int port)
|
||||
{
|
||||
LogName = logName;
|
||||
Host = host;
|
||||
Port = port;
|
||||
}
|
||||
|
||||
public string LogName { get; }
|
||||
public string Host { get; }
|
||||
public int Port { get; }
|
||||
|
||||
|
||||
9
Framework/Utils/CrashWatcher.cs
Normal file
9
Framework/Utils/CrashWatcher.cs
Normal file
@ -0,0 +1,9 @@
|
||||
namespace Utils
|
||||
{
|
||||
public interface ICrashWatcher
|
||||
{
|
||||
void Start();
|
||||
void Stop();
|
||||
bool HasCrashed();
|
||||
}
|
||||
}
|
||||
@ -1,7 +1,5 @@
|
||||
using CodexOpenApi;
|
||||
using Core;
|
||||
using KubernetesWorkflow;
|
||||
using KubernetesWorkflow.Types;
|
||||
using Logging;
|
||||
using Newtonsoft.Json;
|
||||
using Utils;
|
||||
@ -12,20 +10,30 @@ namespace CodexPlugin
|
||||
{
|
||||
private readonly ILog log;
|
||||
private readonly IPluginTools tools;
|
||||
private readonly ICodexInstance instance;
|
||||
private readonly Mapper mapper = new Mapper();
|
||||
|
||||
public CodexAccess(IPluginTools tools, RunningPod container, CrashWatcher crashWatcher)
|
||||
public CodexAccess(IPluginTools tools, ICodexInstance instance, ICrashWatcher crashWatcher)
|
||||
{
|
||||
this.tools = tools;
|
||||
this.instance = instance;
|
||||
log = tools.GetLog();
|
||||
Container = container;
|
||||
CrashWatcher = crashWatcher;
|
||||
|
||||
CrashWatcher.Start();
|
||||
}
|
||||
|
||||
public RunningPod Container { get; }
|
||||
public CrashWatcher CrashWatcher { get; }
|
||||
public ICrashWatcher CrashWatcher { get; }
|
||||
|
||||
public string GetImageName()
|
||||
{
|
||||
return instance.ImageName;
|
||||
}
|
||||
|
||||
public DateTime GetStartUtc()
|
||||
{
|
||||
return instance.StartUtc;
|
||||
}
|
||||
|
||||
public DebugInfo GetDebugInfo()
|
||||
{
|
||||
@ -170,29 +178,35 @@ namespace CodexPlugin
|
||||
|
||||
public string GetName()
|
||||
{
|
||||
return Container.Name;
|
||||
return instance.Name;
|
||||
}
|
||||
|
||||
public PodInfo GetPodInfo()
|
||||
public Address GetDiscoveryEndpoint()
|
||||
{
|
||||
var workflow = tools.CreateWorkflow();
|
||||
return workflow.GetPodInfo(Container);
|
||||
return instance.DiscoveryEndpoint;
|
||||
//var info = codexAccess.GetPodInfo();
|
||||
//return new Address(
|
||||
// logName: $"{GetName()}:DiscoveryPort",
|
||||
// host: info.Ip,
|
||||
// port: Container.Recipe.GetPortByTag(CodexContainerRecipe.DiscoveryPortTag)!.Number
|
||||
//);
|
||||
}
|
||||
|
||||
public void DeleteRepoFolder()
|
||||
public void DeleteDataDirFolder()
|
||||
{
|
||||
try
|
||||
{
|
||||
var containerNumber = Container.Containers.First().Recipe.Number;
|
||||
var dataDir = $"datadir{containerNumber}";
|
||||
var workflow = tools.CreateWorkflow();
|
||||
workflow.ExecuteCommand(Container.Containers.First(), "rm", "-Rfv", $"/codex/{dataDir}/repo");
|
||||
Log("Deleted repo folder.");
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Log("Unable to delete repo folder: " + e);
|
||||
}
|
||||
//try
|
||||
//{
|
||||
// var containerNumber = Container.Containers.First().Recipe.Number;
|
||||
// var dataDir = $"datadir{containerNumber}";
|
||||
// var workflow = tools.CreateWorkflow();
|
||||
// workflow.ExecuteCommand(Container.Containers.First(), "rm", "-Rfv", $"/codex/{dataDir}/repo");
|
||||
// Log("Deleted repo folder.");
|
||||
//}
|
||||
//catch (Exception e)
|
||||
//{
|
||||
// Log("Unable to delete repo folder: " + e);
|
||||
//}
|
||||
instance.DeleteDataDirFolder();
|
||||
}
|
||||
|
||||
private T OnCodex<T>(Func<CodexApi, Task<T>> action)
|
||||
@ -223,7 +237,7 @@ namespace CodexPlugin
|
||||
}
|
||||
finally
|
||||
{
|
||||
CrashWatcher.HasContainerCrashed();
|
||||
CrashWatcher.HasCrashed();
|
||||
}
|
||||
}
|
||||
|
||||
@ -231,12 +245,12 @@ namespace CodexPlugin
|
||||
{
|
||||
return tools
|
||||
.CreateHttp(GetHttpId(), CheckContainerCrashed)
|
||||
.CreateEndpoint(GetAddress(), "/api/codex/v1/", Container.Name);
|
||||
.CreateEndpoint(GetAddress(), "/api/codex/v1/", GetName());
|
||||
}
|
||||
|
||||
private Address GetAddress()
|
||||
{
|
||||
return Container.Containers.Single().GetAddress(CodexContainerRecipe.ApiPortTag);
|
||||
return instance.ApiEndpoint;
|
||||
}
|
||||
|
||||
private string GetHttpId()
|
||||
@ -246,7 +260,7 @@ namespace CodexPlugin
|
||||
|
||||
private void CheckContainerCrashed(HttpClient client)
|
||||
{
|
||||
if (CrashWatcher.HasContainerCrashed()) throw new Exception($"Container {GetName()} has crashed.");
|
||||
if (CrashWatcher.HasCrashed()) throw new Exception($"Container {GetName()} has crashed.");
|
||||
}
|
||||
|
||||
private Retry CreateRetryConfig(string description, Action<Failure> onFailure)
|
||||
|
||||
14
ProjectPlugins/CodexPlugin/CodexInstance.cs
Normal file
14
ProjectPlugins/CodexPlugin/CodexInstance.cs
Normal file
@ -0,0 +1,14 @@
|
||||
using Utils;
|
||||
|
||||
namespace CodexPlugin
|
||||
{
|
||||
public interface ICodexInstance
|
||||
{
|
||||
string Name { get; }
|
||||
string ImageName { get; }
|
||||
DateTime StartUtc { get; }
|
||||
Address DiscoveryEndpoint { get; }
|
||||
Address ApiEndpoint { get; }
|
||||
void DeleteDataDirFolder();
|
||||
}
|
||||
}
|
||||
@ -2,15 +2,13 @@
|
||||
using Core;
|
||||
using FileUtils;
|
||||
using GethPlugin;
|
||||
using KubernetesWorkflow;
|
||||
using KubernetesWorkflow.Types;
|
||||
using Logging;
|
||||
using MetricsPlugin;
|
||||
using Utils;
|
||||
|
||||
namespace CodexPlugin
|
||||
{
|
||||
public interface ICodexNode : IHasContainer, IHasMetricsScrapeTarget, IHasEthAddress
|
||||
public interface ICodexNode : IHasMetricsScrapeTarget, IHasEthAddress
|
||||
{
|
||||
string GetName();
|
||||
string GetPeerId();
|
||||
@ -43,7 +41,7 @@ namespace CodexPlugin
|
||||
/// Warning! The node is not usable after this.
|
||||
/// TODO: Replace with delete-blocks debug call once available in Codex.
|
||||
/// </summary>
|
||||
void DeleteRepoFolder();
|
||||
void DeleteDataDirFolder();
|
||||
void Stop(bool waitTillStopped);
|
||||
bool HasCrashed();
|
||||
}
|
||||
@ -58,12 +56,13 @@ namespace CodexPlugin
|
||||
private readonly TransferSpeeds transferSpeeds;
|
||||
private string peerId = string.Empty;
|
||||
private string nodeId = string.Empty;
|
||||
private readonly CodexAccess codexAccess;
|
||||
|
||||
public CodexNode(IPluginTools tools, CodexAccess codexAccess, CodexNodeGroup group, IMarketplaceAccess marketplaceAccess, ICodexNodeHooks hooks, EthAccount? ethAccount)
|
||||
{
|
||||
this.tools = tools;
|
||||
this.ethAccount = ethAccount;
|
||||
CodexAccess = codexAccess;
|
||||
this.codexAccess = codexAccess;
|
||||
Group = group;
|
||||
Marketplace = marketplaceAccess;
|
||||
this.hooks = hooks;
|
||||
@ -75,7 +74,7 @@ namespace CodexPlugin
|
||||
|
||||
public void Awake()
|
||||
{
|
||||
hooks.OnNodeStarting(Container.Recipe.RecipeCreatedUtc, Container.Recipe.Image, ethAccount);
|
||||
hooks.OnNodeStarting(codexAccess.GetStartUtc(), codexAccess.GetImageName(), ethAccount);
|
||||
}
|
||||
|
||||
public void Initialize()
|
||||
@ -83,21 +82,17 @@ namespace CodexPlugin
|
||||
hooks.OnNodeStarted(peerId, nodeId);
|
||||
}
|
||||
|
||||
public RunningPod Pod { get { return CodexAccess.Container; } }
|
||||
|
||||
public RunningContainer Container { get { return Pod.Containers.Single(); } }
|
||||
public CodexAccess CodexAccess { get; }
|
||||
public CrashWatcher CrashWatcher { get => CodexAccess.CrashWatcher; }
|
||||
public CodexNodeGroup Group { get; }
|
||||
public IMarketplaceAccess Marketplace { get; }
|
||||
public DebugInfoVersion Version { get; private set; }
|
||||
public ITransferSpeeds TransferSpeeds { get => transferSpeeds; }
|
||||
|
||||
public IMetricsScrapeTarget MetricsScrapeTarget
|
||||
public Address MetricsScrapeTarget
|
||||
{
|
||||
get
|
||||
{
|
||||
return new MetricsScrapeTarget(CodexAccess.Container.Containers.First(), CodexContainerRecipe.MetricsPortTag);
|
||||
throw new Exception("todo");
|
||||
//return new MetricsScrapeTarget(CodexAccess.Container.Containers.First(), CodexContainerRecipe.MetricsPortTag);
|
||||
}
|
||||
}
|
||||
|
||||
@ -121,7 +116,7 @@ namespace CodexPlugin
|
||||
|
||||
public string GetName()
|
||||
{
|
||||
return Container.Name;
|
||||
return codexAccess.GetName();
|
||||
}
|
||||
|
||||
public string GetPeerId()
|
||||
@ -131,7 +126,7 @@ namespace CodexPlugin
|
||||
|
||||
public DebugInfo GetDebugInfo(bool log = false)
|
||||
{
|
||||
var debugInfo = CodexAccess.GetDebugInfo();
|
||||
var debugInfo = codexAccess.GetDebugInfo();
|
||||
if (log)
|
||||
{
|
||||
var known = string.Join(",", debugInfo.Table.Nodes.Select(n => n.PeerId));
|
||||
@ -142,12 +137,12 @@ namespace CodexPlugin
|
||||
|
||||
public string GetSpr()
|
||||
{
|
||||
return CodexAccess.GetSpr();
|
||||
return codexAccess.GetSpr();
|
||||
}
|
||||
|
||||
public DebugPeer GetDebugPeer(string peerId)
|
||||
{
|
||||
return CodexAccess.GetDebugPeer(peerId);
|
||||
return codexAccess.GetDebugPeer(peerId);
|
||||
}
|
||||
|
||||
public ContentId UploadFile(TrackedFile file)
|
||||
@ -172,7 +167,7 @@ namespace CodexPlugin
|
||||
var logMessage = $"Uploading file {file.Describe()} with contentType: '{input.ContentType}' and disposition: '{input.ContentDisposition}'...";
|
||||
var measurement = Stopwatch.Measure(log, logMessage, () =>
|
||||
{
|
||||
return CodexAccess.UploadFile(input, onFailure);
|
||||
return codexAccess.UploadFile(input, onFailure);
|
||||
});
|
||||
|
||||
var response = measurement.Value;
|
||||
@ -212,7 +207,7 @@ namespace CodexPlugin
|
||||
public LocalDataset DownloadStreamless(ContentId cid)
|
||||
{
|
||||
Log($"Downloading streamless '{cid}' (no-wait)");
|
||||
return CodexAccess.DownloadStreamless(cid);
|
||||
return codexAccess.DownloadStreamless(cid);
|
||||
}
|
||||
|
||||
public LocalDataset DownloadStreamlessWait(ContentId cid, ByteSize size)
|
||||
@ -222,7 +217,7 @@ namespace CodexPlugin
|
||||
var sw = Stopwatch.Measure(log, nameof(DownloadStreamlessWait), () =>
|
||||
{
|
||||
var startSpace = Space();
|
||||
var result = CodexAccess.DownloadStreamless(cid);
|
||||
var result = codexAccess.DownloadStreamless(cid);
|
||||
WaitUntilQuotaUsedIncreased(startSpace, size);
|
||||
return result;
|
||||
});
|
||||
@ -233,17 +228,17 @@ namespace CodexPlugin
|
||||
public LocalDataset DownloadManifestOnly(ContentId cid)
|
||||
{
|
||||
Log($"Downloading manifest-only '{cid}'");
|
||||
return CodexAccess.DownloadManifestOnly(cid);
|
||||
return codexAccess.DownloadManifestOnly(cid);
|
||||
}
|
||||
|
||||
public LocalDatasetList LocalFiles()
|
||||
{
|
||||
return CodexAccess.LocalFiles();
|
||||
return codexAccess.LocalFiles();
|
||||
}
|
||||
|
||||
public CodexSpace Space()
|
||||
{
|
||||
return CodexAccess.Space();
|
||||
return codexAccess.Space();
|
||||
}
|
||||
|
||||
public void ConnectToPeer(ICodexNode node)
|
||||
@ -252,31 +247,30 @@ namespace CodexPlugin
|
||||
|
||||
Log($"Connecting to peer {peer.GetName()}...");
|
||||
var peerInfo = node.GetDebugInfo();
|
||||
CodexAccess.ConnectToPeer(peerInfo.Id, GetPeerMultiAddresses(peer, peerInfo));
|
||||
codexAccess.ConnectToPeer(peerInfo.Id, GetPeerMultiAddresses(peer, peerInfo));
|
||||
|
||||
Log($"Successfully connected to peer {peer.GetName()}.");
|
||||
}
|
||||
|
||||
public void DeleteRepoFolder()
|
||||
public void DeleteDataDirFolder()
|
||||
{
|
||||
CodexAccess.DeleteRepoFolder();
|
||||
codexAccess.DeleteDataDirFolder();
|
||||
}
|
||||
|
||||
public void Stop(bool waitTillStopped)
|
||||
{
|
||||
Log("Stopping...");
|
||||
hooks.OnNodeStopping();
|
||||
|
||||
CrashWatcher.Stop();
|
||||
codexAccess.CrashWatcher.Stop();
|
||||
Group.Stop(this, waitTillStopped);
|
||||
}
|
||||
|
||||
public void EnsureOnlineGetVersionResponse()
|
||||
{
|
||||
var debugInfo = Time.Retry(CodexAccess.GetDebugInfo, "ensure online");
|
||||
var debugInfo = Time.Retry(codexAccess.GetDebugInfo, "ensure online");
|
||||
peerId = debugInfo.Id;
|
||||
nodeId = debugInfo.Table.LocalNode.NodeId;
|
||||
var nodeName = CodexAccess.Container.Name;
|
||||
var nodeName = codexAccess.Container.Name;
|
||||
|
||||
if (!debugInfo.Version.IsValid())
|
||||
{
|
||||
@ -292,16 +286,12 @@ namespace CodexPlugin
|
||||
|
||||
public Address GetDiscoveryEndpoint()
|
||||
{
|
||||
var info = CodexAccess.GetPodInfo();
|
||||
return new Address(
|
||||
host: info.Ip,
|
||||
port: Container.Recipe.GetPortByTag(CodexContainerRecipe.DiscoveryPortTag)!.Number
|
||||
);
|
||||
return codexAccess.GetDiscoveryEndpoint();
|
||||
}
|
||||
|
||||
public bool HasCrashed()
|
||||
{
|
||||
return CrashWatcher.HasContainerCrashed();
|
||||
return codexAccess.CrashWatcher.HasCrashed();
|
||||
}
|
||||
|
||||
public override string ToString()
|
||||
@ -311,13 +301,12 @@ namespace CodexPlugin
|
||||
|
||||
private string[] GetPeerMultiAddresses(CodexNode peer, DebugInfo peerInfo)
|
||||
{
|
||||
// The peer we want to connect is in a different pod.
|
||||
// We must replace the default IP with the pod IP in the multiAddress.
|
||||
var workflow = tools.CreateWorkflow();
|
||||
var podInfo = workflow.GetPodInfo(peer.Pod);
|
||||
var peerId = peer.GetDiscoveryEndpoint().Host
|
||||
.Replace("http://", "")
|
||||
.Replace("https://", "");
|
||||
|
||||
return peerInfo.Addrs.Select(a => a
|
||||
.Replace("0.0.0.0", podInfo.Ip))
|
||||
.Replace("0.0.0.0", peerId))
|
||||
.ToArray();
|
||||
}
|
||||
|
||||
@ -330,11 +319,11 @@ namespace CodexPlugin
|
||||
// Type of stream generated by openAPI client does not support timeouts.
|
||||
var start = DateTime.UtcNow;
|
||||
var cts = new CancellationTokenSource();
|
||||
var downloadTask = Task.Run(() =>
|
||||
var downloadTask = Task.Run((Action)(() =>
|
||||
{
|
||||
using var downloadStream = CodexAccess.DownloadFile(contentId, onFailure);
|
||||
downloadStream.CopyTo(fileStream);
|
||||
}, cts.Token);
|
||||
using var downloadStream = this.codexAccess.DownloadFile(contentId, onFailure);
|
||||
downloadStream.CopyTo((Stream)fileStream);
|
||||
}), cts.Token);
|
||||
|
||||
while (DateTime.UtcNow - start < timeout)
|
||||
{
|
||||
|
||||
@ -9,7 +9,7 @@ namespace CodexPlugin
|
||||
public interface ICodexNodeFactory
|
||||
{
|
||||
CodexNode CreateOnlineCodexNode(CodexAccess access, CodexNodeGroup group);
|
||||
CrashWatcher CreateCrashWatcher(RunningContainer c);
|
||||
ContainerCrashWatcher CreateCrashWatcher(RunningContainer c);
|
||||
}
|
||||
|
||||
public class CodexNodeFactory : ICodexNodeFactory
|
||||
@ -45,7 +45,7 @@ namespace CodexPlugin
|
||||
return ethAccount;
|
||||
}
|
||||
|
||||
public CrashWatcher CreateCrashWatcher(RunningContainer c)
|
||||
public ContainerCrashWatcher CreateCrashWatcher(RunningContainer c)
|
||||
{
|
||||
return tools.CreateWorkflow().CreateCrashWatcher(c);
|
||||
}
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
using Core;
|
||||
using KubernetesWorkflow.Types;
|
||||
using MetricsPlugin;
|
||||
using System.Collections;
|
||||
using Utils;
|
||||
|
||||
namespace CodexPlugin
|
||||
{
|
||||
@ -14,12 +14,12 @@ namespace CodexPlugin
|
||||
public class CodexNodeGroup : ICodexNodeGroup
|
||||
{
|
||||
private readonly CodexStarter starter;
|
||||
private CodexNode[] nodes;
|
||||
|
||||
public CodexNodeGroup(CodexStarter starter, IPluginTools tools, RunningPod[] containers, ICodexNodeFactory codexNodeFactory)
|
||||
public CodexNodeGroup(CodexStarter starter, IPluginTools tools, CodexNode[] nodes)
|
||||
{
|
||||
this.starter = starter;
|
||||
Containers = containers;
|
||||
Nodes = containers.Select(c => CreateOnlineCodexNode(c, tools, codexNodeFactory)).ToArray();
|
||||
this.nodes = nodes;
|
||||
Version = new DebugInfoVersion();
|
||||
}
|
||||
|
||||
@ -35,21 +35,18 @@ namespace CodexPlugin
|
||||
{
|
||||
starter.BringOffline(this, waitTillStopped);
|
||||
// Clear everything. Prevent accidental use.
|
||||
Nodes = Array.Empty<CodexNode>();
|
||||
Containers = null!;
|
||||
nodes = Array.Empty<CodexNode>();
|
||||
}
|
||||
|
||||
public void Stop(CodexNode node, bool waitTillStopped)
|
||||
{
|
||||
starter.Stop(node.Pod, waitTillStopped);
|
||||
Nodes = Nodes.Where(n => n != node).ToArray();
|
||||
Containers = Containers.Where(c => c != node.Pod).ToArray();
|
||||
starter.Stop(node, waitTillStopped);
|
||||
nodes = nodes.Where(n => n != node).ToArray();
|
||||
}
|
||||
|
||||
public RunningPod[] Containers { get; private set; }
|
||||
public CodexNode[] Nodes { get; private set; }
|
||||
public ICodexNode[] Nodes => nodes;
|
||||
public DebugInfoVersion Version { get; private set; }
|
||||
public IMetricsScrapeTarget[] ScrapeTargets => Nodes.Select(n => n.MetricsScrapeTarget).ToArray();
|
||||
public Address[] ScrapeTargets => Nodes.Select(n => n.MetricsScrapeTarget).ToArray();
|
||||
|
||||
public IEnumerator<ICodexNode> GetEnumerator()
|
||||
{
|
||||
@ -63,12 +60,12 @@ namespace CodexPlugin
|
||||
|
||||
public string Describe()
|
||||
{
|
||||
return $"group:[{Containers.Describe()}]";
|
||||
return $"group:[{string.Join(",", Nodes.Select(n => n.GetName()))}]";
|
||||
}
|
||||
|
||||
public void EnsureOnline()
|
||||
{
|
||||
foreach (var node in Nodes) node.EnsureOnlineGetVersionResponse();
|
||||
foreach (var node in nodes) node.EnsureOnlineGetVersionResponse();
|
||||
var versionResponses = Nodes.Select(n => n.Version);
|
||||
|
||||
var first = versionResponses.First();
|
||||
@ -79,16 +76,7 @@ namespace CodexPlugin
|
||||
}
|
||||
|
||||
Version = first;
|
||||
foreach (var node in Nodes) node.Initialize();
|
||||
}
|
||||
|
||||
private CodexNode CreateOnlineCodexNode(RunningPod c, IPluginTools tools, ICodexNodeFactory factory)
|
||||
{
|
||||
var watcher = factory.CreateCrashWatcher(c.Containers.Single());
|
||||
var access = new CodexAccess(tools, c, watcher);
|
||||
var node = factory.CreateOnlineCodexNode(access, this);
|
||||
node.Awake();
|
||||
return node;
|
||||
foreach (var node in nodes) node.Initialize();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -68,7 +68,7 @@ namespace CodexPlugin
|
||||
Log("Stopped.");
|
||||
}
|
||||
|
||||
public void Stop(RunningPod pod, bool waitTillStopped)
|
||||
public void Stop(CodexNode pod, bool waitTillStopped)
|
||||
{
|
||||
Log($"Stopping node...");
|
||||
var workflow = pluginTools.CreateWorkflow();
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
using Core;
|
||||
using KubernetesWorkflow.Types;
|
||||
using Logging;
|
||||
using Utils;
|
||||
|
||||
namespace MetricsPlugin
|
||||
{
|
||||
@ -11,7 +12,7 @@ namespace MetricsPlugin
|
||||
return Plugin(ci).DeployMetricsCollector(scrapeTargets.Select(t => t.MetricsScrapeTarget).ToArray(), scrapeInterval);
|
||||
}
|
||||
|
||||
public static RunningPod DeployMetricsCollector(this CoreInterface ci, TimeSpan scrapeInterval, params IMetricsScrapeTarget[] scrapeTargets)
|
||||
public static RunningPod DeployMetricsCollector(this CoreInterface ci, TimeSpan scrapeInterval, params Address[] scrapeTargets)
|
||||
{
|
||||
return Plugin(ci).DeployMetricsCollector(scrapeTargets, scrapeInterval);
|
||||
}
|
||||
@ -21,7 +22,7 @@ namespace MetricsPlugin
|
||||
return ci.WrapMetricsCollector(metricsPod, scrapeTarget.MetricsScrapeTarget);
|
||||
}
|
||||
|
||||
public static IMetricsAccess WrapMetricsCollector(this CoreInterface ci, RunningPod metricsPod, IMetricsScrapeTarget scrapeTarget)
|
||||
public static IMetricsAccess WrapMetricsCollector(this CoreInterface ci, RunningPod metricsPod, Address scrapeTarget)
|
||||
{
|
||||
return Plugin(ci).WrapMetricsCollectorDeployment(metricsPod, scrapeTarget);
|
||||
}
|
||||
@ -36,7 +37,7 @@ namespace MetricsPlugin
|
||||
return ci.GetMetricsFor(scrapeInterval, scrapeTargets.Select(t => t.MetricsScrapeTarget).ToArray());
|
||||
}
|
||||
|
||||
public static IMetricsAccess[] GetMetricsFor(this CoreInterface ci, TimeSpan scrapeInterval, params IMetricsScrapeTarget[] scrapeTargets)
|
||||
public static IMetricsAccess[] GetMetricsFor(this CoreInterface ci, TimeSpan scrapeInterval, params Address[] scrapeTargets)
|
||||
{
|
||||
var rc = ci.DeployMetricsCollector(scrapeInterval, scrapeTargets);
|
||||
return scrapeTargets.Select(t => ci.WrapMetricsCollector(rc, t)).ToArray();
|
||||
|
||||
@ -15,13 +15,13 @@ namespace MetricsPlugin
|
||||
public class MetricsAccess : IMetricsAccess
|
||||
{
|
||||
private readonly MetricsQuery query;
|
||||
private readonly IMetricsScrapeTarget target;
|
||||
private readonly Address target;
|
||||
|
||||
public MetricsAccess(MetricsQuery query, IMetricsScrapeTarget target)
|
||||
public MetricsAccess(MetricsQuery query, Address target)
|
||||
{
|
||||
this.query = query;
|
||||
this.target = target;
|
||||
TargetName = target.Container.Name;
|
||||
TargetName = $"'{target.Host}'";
|
||||
}
|
||||
|
||||
public string TargetName { get; }
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
using Core;
|
||||
using KubernetesWorkflow.Types;
|
||||
using Logging;
|
||||
using Utils;
|
||||
|
||||
namespace MetricsPlugin
|
||||
{
|
||||
@ -31,12 +32,12 @@ namespace MetricsPlugin
|
||||
{
|
||||
}
|
||||
|
||||
public RunningPod DeployMetricsCollector(IMetricsScrapeTarget[] scrapeTargets, TimeSpan scrapeInterval)
|
||||
public RunningPod DeployMetricsCollector(Address[] scrapeTargets, TimeSpan scrapeInterval)
|
||||
{
|
||||
return starter.CollectMetricsFor(scrapeTargets, scrapeInterval);
|
||||
}
|
||||
|
||||
public IMetricsAccess WrapMetricsCollectorDeployment(RunningPod runningPod, IMetricsScrapeTarget target)
|
||||
public IMetricsAccess WrapMetricsCollectorDeployment(RunningPod runningPod, Address target)
|
||||
{
|
||||
runningPod = SerializeGate.Gate(runningPod);
|
||||
return starter.CreateAccessForTarget(runningPod, target);
|
||||
|
||||
@ -3,6 +3,7 @@ using IdentityModel;
|
||||
using KubernetesWorkflow.Types;
|
||||
using Logging;
|
||||
using System.Globalization;
|
||||
using Utils;
|
||||
|
||||
namespace MetricsPlugin
|
||||
{
|
||||
@ -23,7 +24,7 @@ namespace MetricsPlugin
|
||||
|
||||
public RunningContainer RunningContainer { get; }
|
||||
|
||||
public Metrics GetMostRecent(string metricName, IMetricsScrapeTarget target)
|
||||
public Metrics GetMostRecent(string metricName, Address target)
|
||||
{
|
||||
var response = GetLastOverTime(metricName, GetInstanceStringForNode(target));
|
||||
if (response == null) throw new Exception($"Failed to get most recent metric: {metricName}");
|
||||
@ -53,7 +54,7 @@ namespace MetricsPlugin
|
||||
return result;
|
||||
}
|
||||
|
||||
public Metrics GetAllMetricsForNode(IMetricsScrapeTarget target)
|
||||
public Metrics GetAllMetricsForNode(Address target)
|
||||
{
|
||||
var instanceString = GetInstanceStringForNode(target);
|
||||
var response = endpoint.HttpGetJson<PrometheusQueryResponse>($"query?query={instanceString}{GetQueryTimeRange()}");
|
||||
@ -139,12 +140,12 @@ namespace MetricsPlugin
|
||||
};
|
||||
}
|
||||
|
||||
private string GetInstanceNameForNode(IMetricsScrapeTarget target)
|
||||
private string GetInstanceNameForNode(Address target)
|
||||
{
|
||||
return ScrapeTargetHelper.FormatTarget(log, target);
|
||||
return ScrapeTargetHelper.FormatTarget(target);
|
||||
}
|
||||
|
||||
private string GetInstanceStringForNode(IMetricsScrapeTarget target)
|
||||
private string GetInstanceStringForNode(Address target)
|
||||
{
|
||||
return "{instance=\"" + GetInstanceNameForNode(target) + "\"}";
|
||||
}
|
||||
@ -172,9 +173,9 @@ namespace MetricsPlugin
|
||||
return new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).AddSeconds(unixSeconds);
|
||||
}
|
||||
|
||||
private void Log(IMetricsScrapeTarget target, string metricName, Metrics result)
|
||||
private void Log(Address target, string metricName, Metrics result)
|
||||
{
|
||||
Log($"{target.Container.Name} '{metricName}' = {result}");
|
||||
Log($"{target.LogName} '{metricName}' = {result}");
|
||||
}
|
||||
|
||||
private void Log(string metricName, Metrics result)
|
||||
@ -182,9 +183,9 @@ namespace MetricsPlugin
|
||||
Log($"'{metricName}' = {result}");
|
||||
}
|
||||
|
||||
private void Log(IMetricsScrapeTarget target, Metrics result)
|
||||
private void Log(Address target, Metrics result)
|
||||
{
|
||||
Log($"{target.Container.Name} => {result}");
|
||||
Log($"{target.LogName} => {result}");
|
||||
}
|
||||
|
||||
private void Log(string msg)
|
||||
|
||||
@ -1,32 +1,14 @@
|
||||
using KubernetesWorkflow.Types;
|
||||
using Utils;
|
||||
|
||||
namespace MetricsPlugin
|
||||
{
|
||||
public interface IMetricsScrapeTarget
|
||||
{
|
||||
RunningContainer Container { get; }
|
||||
string MetricsPortTag { get; }
|
||||
}
|
||||
|
||||
public interface IHasMetricsScrapeTarget
|
||||
{
|
||||
IMetricsScrapeTarget MetricsScrapeTarget { get; }
|
||||
Address MetricsScrapeTarget { get; }
|
||||
}
|
||||
|
||||
public interface IHasManyMetricScrapeTargets
|
||||
{
|
||||
IMetricsScrapeTarget[] ScrapeTargets { get; }
|
||||
}
|
||||
|
||||
public class MetricsScrapeTarget : IMetricsScrapeTarget
|
||||
{
|
||||
public MetricsScrapeTarget(RunningContainer container, string metricsPortTag)
|
||||
{
|
||||
Container = container;
|
||||
MetricsPortTag = metricsPortTag;
|
||||
}
|
||||
|
||||
public RunningContainer Container { get; }
|
||||
public string MetricsPortTag { get; }
|
||||
Address[] ScrapeTargets { get; }
|
||||
}
|
||||
}
|
||||
|
||||
@ -3,6 +3,7 @@ using KubernetesWorkflow;
|
||||
using KubernetesWorkflow.Types;
|
||||
using Logging;
|
||||
using System.Text;
|
||||
using Utils;
|
||||
|
||||
namespace MetricsPlugin
|
||||
{
|
||||
@ -16,7 +17,7 @@ namespace MetricsPlugin
|
||||
this.tools = tools;
|
||||
}
|
||||
|
||||
public RunningPod CollectMetricsFor(IMetricsScrapeTarget[] targets, TimeSpan scrapeInterval)
|
||||
public RunningPod CollectMetricsFor(Address[] targets, TimeSpan scrapeInterval)
|
||||
{
|
||||
if (!targets.Any()) throw new ArgumentException(nameof(targets) + " must not be empty.");
|
||||
|
||||
@ -32,7 +33,7 @@ namespace MetricsPlugin
|
||||
return runningContainers;
|
||||
}
|
||||
|
||||
public MetricsAccess CreateAccessForTarget(RunningPod metricsPod, IMetricsScrapeTarget target)
|
||||
public MetricsAccess CreateAccessForTarget(RunningPod metricsPod, Address target)
|
||||
{
|
||||
var metricsQuery = new MetricsQuery(tools, metricsPod.Containers.Single());
|
||||
return new MetricsAccess(metricsQuery, target);
|
||||
@ -48,7 +49,7 @@ namespace MetricsPlugin
|
||||
tools.GetLog().Log(msg);
|
||||
}
|
||||
|
||||
private string GeneratePrometheusConfig(IMetricsScrapeTarget[] targets, TimeSpan scrapeInterval)
|
||||
private string GeneratePrometheusConfig(Address[] targets, TimeSpan scrapeInterval)
|
||||
{
|
||||
var secs = Convert.ToInt32(scrapeInterval.TotalSeconds);
|
||||
if (secs < 1) throw new Exception("ScrapeInterval can't be < 1s");
|
||||
@ -74,19 +75,18 @@ namespace MetricsPlugin
|
||||
return Convert.ToBase64String(bytes);
|
||||
}
|
||||
|
||||
private string FormatTarget(IMetricsScrapeTarget target)
|
||||
private string FormatTarget(Address target)
|
||||
{
|
||||
return ScrapeTargetHelper.FormatTarget(tools.GetLog(), target);
|
||||
return ScrapeTargetHelper.FormatTarget(target);
|
||||
}
|
||||
}
|
||||
|
||||
public static class ScrapeTargetHelper
|
||||
{
|
||||
public static string FormatTarget(ILog log, IMetricsScrapeTarget target)
|
||||
public static string FormatTarget(Address target)
|
||||
{
|
||||
var a = target.Container.GetAddress(target.MetricsPortTag);
|
||||
var host = a.Host.Replace("http://", "").Replace("https://", "");
|
||||
return $"{host}:{a.Port}";
|
||||
var host = target.Host.Replace("http://", "").Replace("https://", "");
|
||||
return $"{host}:{target.Port}";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -47,7 +47,7 @@ public class ScalabilityTests : CodexDistTest
|
||||
|
||||
downloadedFile!.AssertIsEqual(testFile);
|
||||
|
||||
uploader.DeleteRepoFolder();
|
||||
uploader.DeleteDataDirFolder();
|
||||
uploader.Stop(true);
|
||||
|
||||
var otherDownloader = nodes.PickOneRandom();
|
||||
@ -55,8 +55,8 @@ public class ScalabilityTests : CodexDistTest
|
||||
|
||||
downloadedFile!.AssertIsEqual(testFile);
|
||||
|
||||
downloader.DeleteRepoFolder();
|
||||
otherDownloader.DeleteRepoFolder();
|
||||
downloader.DeleteDataDirFolder();
|
||||
otherDownloader.DeleteDataDirFolder();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user