Merge branch 'feature/multiple-container-addresses'

This commit is contained in:
benbierens 2023-10-20 08:31:45 +02:00
commit b1bd1de027
No known key found for this signature in database
GPG Key ID: FE44815D96D0A1AA
15 changed files with 109 additions and 75 deletions

View File

@ -5,11 +5,9 @@ namespace Core
public static class SerializeGate
{
/// <summary>
/// SerializeGate was added to help ensure deployment objects are serializable
/// and remain viable after deserialization.
/// SerializeGate was added to help ensure deployment objects are serializable and remain viable after deserialization.
/// Tools can be built on top of the core interface that rely on deployment objects being serializable.
/// Insert the serialization gate after deployment but before wrapping to ensure any future changes
/// don't break this requirement.
/// Insert the serialization gate after deployment but before wrapping to ensure any future changes don't break this requirement.
/// </summary>
public static T Gate<T>(T anything)
{

View File

@ -24,6 +24,8 @@
{
Name = $"ctnr{Number}";
}
if (exposedPorts.Any(p => string.IsNullOrEmpty(p.Tag))) throw new Exception("Port tags are required for all exposed ports.");
}
public string Name { get; }
@ -65,6 +67,12 @@
public int Number { get; }
public string Tag { get; }
public override string ToString()
{
if (string.IsNullOrEmpty(Tag)) return $"untagged-port={Number}";
return $"{Tag}={Number}";
}
}
public class EnvVar

View File

@ -50,12 +50,12 @@ namespace KubernetesWorkflow
protected int Index { get; private set; } = 0;
protected abstract void Initialize(StartupConfig config);
protected Port AddExposedPort(string tag = "")
protected Port AddExposedPort(string tag)
{
return AddExposedPort(factory.CreatePort(tag));
}
protected Port AddExposedPort(int number, string tag = "")
protected Port AddExposedPort(int number, string tag)
{
return AddExposedPort(factory.CreatePort(number, tag));
}
@ -67,7 +67,7 @@ namespace KubernetesWorkflow
return p;
}
protected void AddExposedPortAndVar(string name, string tag = "")
protected void AddExposedPortAndVar(string name, string tag)
{
AddEnvVar(name, AddExposedPort(tag));
}
@ -132,11 +132,6 @@ namespace KubernetesWorkflow
private Port AddExposedPort(Port port)
{
if (exposedPorts.Any())
{
throw new NotImplementedException("Current implementation only support 1 exposed port per container recipe. " +
$"Methods for determining container addresses in {nameof(StartupWorkflow)} currently rely on this constraint.");
}
exposedPorts.Add(port);
return port;
}

View File

@ -572,16 +572,15 @@ namespace KubernetesWorkflow
var readback = client.Run(c => c.ReadNamespacedService(serviceSpec.Metadata.Name, K8sNamespace));
foreach (var r in containerRecipes)
{
if (r.ExposedPorts.Any())
foreach (var port in r.ExposedPorts)
{
var firstExposedPort = r.ExposedPorts.First();
var portName = GetNameForPort(r, firstExposedPort);
var portName = GetNameForPort(r, port);
var matchingServicePorts = readback.Spec.Ports.Where(p => p.Name == portName);
if (matchingServicePorts.Any())
{
// These service ports belongs to this recipe.
var optionals = matchingServicePorts.Select(p => MapNodePortIfAble(p, portName));
var optionals = matchingServicePorts.Select(p => MapNodePortIfAble(p, port.Tag));
var ports = optionals.Where(p => p != null).Select(p => p!).ToArray();
result.Add(new ContainerRecipePortMapEntry(r.Number, ports));

View File

@ -16,18 +16,26 @@ namespace KubernetesWorkflow
internal static RunnerLocation DetermineRunnerLocation(RunningContainer container)
{
if (knownLocation != null) return knownLocation.Value;
knownLocation = PingForLocation(container);
return knownLocation.Value;
}
private static RunnerLocation PingForLocation(RunningContainer container)
{
if (PingHost(container.Pod.PodInfo.Ip))
{
knownLocation = RunnerLocation.InternalToCluster;
}
else if (PingHost(Format(container.ClusterExternalAddress)))
{
knownLocation = RunnerLocation.ExternalToCluster;
return RunnerLocation.InternalToCluster;
}
if (knownLocation == null) throw new Exception("Unable to determine location relative to kubernetes cluster.");
return knownLocation.Value;
foreach (var port in container.ContainerPorts)
{
if (PingHost(Format(port.ExternalAddress)))
{
return RunnerLocation.ExternalToCluster;
}
}
throw new Exception("Unable to determine location relative to kubernetes cluster.");
}
private static string Format(Address host)

View File

@ -24,37 +24,46 @@ namespace KubernetesWorkflow
public class RunningContainer
{
public RunningContainer(RunningPod pod, ContainerRecipe recipe, Port[] servicePorts, string name, Address clusterExternalAddress, Address clusterInternalAddress)
public RunningContainer(RunningPod pod, ContainerRecipe recipe, Port[] servicePorts, string name, ContainerPort[] containerPorts)
{
Pod = pod;
Recipe = recipe;
ServicePorts = servicePorts;
Name = name;
ClusterExternalAddress = clusterExternalAddress;
ClusterInternalAddress = clusterInternalAddress;
ContainerPorts = containerPorts;
}
public string Name { get; }
public RunningPod Pod { get; }
public ContainerRecipe Recipe { get; }
public Port[] ServicePorts { get; }
public Address ClusterExternalAddress { get; }
public Address ClusterInternalAddress { get; }
public ContainerPort[] ContainerPorts { get; }
[JsonIgnore]
public Address Address
public Address GetAddress(string portTag)
{
get
var containerPort = ContainerPorts.Single(c => c.Port.Tag == portTag);
if (RunnerLocationUtils.DetermineRunnerLocation(this) == RunnerLocation.InternalToCluster)
{
if (RunnerLocationUtils.DetermineRunnerLocation(this) == RunnerLocation.InternalToCluster)
{
return ClusterInternalAddress;
}
return ClusterExternalAddress;
return containerPort.InternalAddress;
}
return containerPort.ExternalAddress;
}
}
public class ContainerPort
{
public ContainerPort(Port port, Address externalAddress, Address internalAddress)
{
Port = port;
ExternalAddress = externalAddress;
InternalAddress = internalAddress;
}
public Port Port { get; }
public Address ExternalAddress { get; }
public Address InternalAddress { get; }
}
public static class RunningContainersExtensions
{
public static RunningContainer[] Containers(this RunningContainers[] runningContainers)

View File

@ -19,12 +19,10 @@
public Port[] GetServicePortsForContainerRecipe(ContainerRecipe containerRecipe)
{
if (PortMapEntries.Any(p => p.ContainerNumber == containerRecipe.Number))
{
return PortMapEntries.Single(p => p.ContainerNumber == containerRecipe.Number).Ports;
}
return Array.Empty<Port>();
return PortMapEntries
.Where(p => p.ContainerNumber == containerRecipe.Number)
.SelectMany(p => p.Ports)
.ToArray();
}
}

View File

@ -118,8 +118,7 @@ namespace KubernetesWorkflow
var name = GetContainerName(r, startupConfig);
return new RunningContainer(runningPod, r, servicePorts, name,
GetContainerExternalAddress(runningPod, servicePorts),
GetContainerInternalAddress(r));
CreateContainerPorts(runningPod, r, servicePorts));
}).ToArray();
}
@ -137,35 +136,39 @@ namespace KubernetesWorkflow
}
}
private Address GetContainerExternalAddress(RunningPod pod, Port[] servicePorts)
private ContainerPort[] CreateContainerPorts(RunningPod pod, ContainerRecipe recipe, Port[] servicePorts)
{
return new Address(
pod.Cluster.HostAddress,
GetServicePort(servicePorts));
var result = new List<ContainerPort>();
foreach (var exposedPort in recipe.ExposedPorts)
{
result.Add(new ContainerPort(
exposedPort,
GetContainerExternalAddress(pod, servicePorts, exposedPort),
GetContainerInternalAddress(exposedPort)));
}
return result.ToArray();
}
private Address GetContainerInternalAddress(ContainerRecipe recipe)
private static Address GetContainerExternalAddress(RunningPod pod, Port[] servicePorts, Port exposedPort)
{
var servicePort = servicePorts.Single(p => p.Tag == exposedPort.Tag);
return new Address(
pod.Cluster.HostAddress,
servicePort.Number);
}
private Address GetContainerInternalAddress(Port exposedPort)
{
var serviceName = "service-" + numberSource.WorkflowNumber;
var port = GetInternalPort(recipe);
var port = exposedPort.Number;
return new Address(
$"http://{serviceName}.{k8sNamespace}.svc.cluster.local",
port);
}
private static int GetServicePort(Port[] servicePorts)
{
if (servicePorts.Any()) return servicePorts.First().Number;
return 0;
}
private static int GetInternalPort(ContainerRecipe recipe)
{
if (recipe.ExposedPorts.Any()) return recipe.ExposedPorts.First().Number;
return 0;
}
private ContainerRecipe[] CreateRecipes(int numberOfContainers, ContainerRecipeFactory recipeFactory, StartupConfig startupConfig)
{
log.Debug();

View File

@ -10,5 +10,10 @@
public string Host { get; }
public int Port { get; }
public override string ToString()
{
return $"{Host}:{Port}";
}
}
}

View File

@ -1,5 +1,6 @@
using Core;
using KubernetesWorkflow;
using Utils;
namespace CodexPlugin
{
@ -98,12 +99,17 @@ namespace CodexPlugin
private IHttp Http()
{
return tools.CreateHttp(Container.Address, baseUrl: "/api/codex/v1", CheckContainerCrashed, Container.Name);
return tools.CreateHttp(GetAddress(), baseUrl: "/api/codex/v1", CheckContainerCrashed, Container.Name);
}
private IHttp LongHttp()
{
return tools.CreateHttp(Container.Address, baseUrl: "/api/codex/v1", CheckContainerCrashed, new LongTimeSet(), Container.Name);
return tools.CreateHttp(GetAddress(), baseUrl: "/api/codex/v1", CheckContainerCrashed, new LongTimeSet(), Container.Name);
}
private Address GetAddress()
{
return Container.GetAddress(CodexContainerRecipe.ApiPortTag);
}
private void CheckContainerCrashed(HttpClient client)

View File

@ -8,8 +8,10 @@ namespace CodexPlugin
private readonly MarketplaceStarter marketplaceStarter = new MarketplaceStarter();
private const string DefaultDockerImage = "codexstorage/nim-codex:latest-dist-tests";
public const string MetricsPortTag = "metrics_port";
public const string DiscoveryPortTag = "discovery-port";
public const string ApiPortTag = "codex_api_port";
public const string ListenPortTag = "codex_listen_port";
public const string MetricsPortTag = "codex_metrics_port";
public const string DiscoveryPortTag = "codex_discovery_port";
// Used by tests for time-constraint assertions.
public static readonly TimeSpan MaxUploadTimePerMegabyte = TimeSpan.FromSeconds(2.0);
@ -27,20 +29,20 @@ namespace CodexPlugin
var config = startupConfig.Get<CodexStartupConfig>();
AddExposedPortAndVar("CODEX_API_PORT");
AddExposedPortAndVar("CODEX_API_PORT", ApiPortTag);
AddEnvVar("CODEX_API_BINDADDR", "0.0.0.0");
var dataDir = $"datadir{ContainerNumber}";
AddEnvVar("CODEX_DATA_DIR", dataDir);
AddVolume($"codex/{dataDir}", GetVolumeCapacity(config));
AddInternalPortAndVar("CODEX_DISC_PORT", DiscoveryPortTag);
AddExposedPortAndVar("CODEX_DISC_PORT", DiscoveryPortTag);
AddEnvVar("CODEX_LOG_LEVEL", config.LogLevelWithTopics());
// This makes the node announce itself to its local (pod) IP address.
AddEnvVar("NAT_IP_AUTO", "true");
var listenPort = AddInternalPort();
var listenPort = AddExposedPort(ListenPortTag);
AddEnvVar("CODEX_LISTEN_ADDRS", $"/ip4/0.0.0.0/tcp/{listenPort.Number}");
if (!string.IsNullOrEmpty(config.BootstrapSpr))

View File

@ -73,7 +73,7 @@ namespace GethPlugin
private NethereumInteraction StartInteraction()
{
var address = StartResult.Container.Address;
var address = StartResult.Container.GetAddress(GethContainerRecipe.HttpPortTag);
var account = Account;
var creator = new NethereumInteractionCreator(log, address.Host, address.Port, account.PrivateKey);

View File

@ -13,7 +13,7 @@ namespace MetricsPlugin
public MetricsQuery(IPluginTools tools, RunningContainer runningContainer)
{
RunningContainer = runningContainer;
http = tools.CreateHttp(RunningContainer.Address, "api/v1");
http = tools.CreateHttp(RunningContainer.GetAddress(PrometheusContainerRecipe.PortTag), "api/v1");
log = tools.GetLog();
}

View File

@ -7,11 +7,13 @@ namespace MetricsPlugin
public override string AppName => "prometheus";
public override string Image => "codexstorage/dist-tests-prometheus:latest";
public const string PortTag = "prometheus_port_tag";
protected override void Initialize(StartupConfig startupConfig)
{
var config = startupConfig.Get<PrometheusStartupConfig>();
AddExposedPortAndVar("PROM_PORT");
AddExposedPortAndVar("PROM_PORT", PortTag);
AddEnvVar("PROM_CONFIG", config.PrometheusConfigBase64);
}
}

View File

@ -87,7 +87,8 @@ namespace ContinuousTests
{
cancelToken.ThrowIfCancellationRequested();
log.Log($"Checking {n.Container.Name} @ '{n.Container.Address.Host}:{n.Container.Address.Port}'...");
var address = n.Container.GetAddress(CodexContainerRecipe.ApiPortTag);
log.Log($"Checking {n.Container.Name} @ '{address}'...");
if (EnsureOnline(log, n))
{
@ -95,7 +96,7 @@ namespace ContinuousTests
}
else
{
log.Error($"No response from '{n.Container.Address.Host}'.");
log.Error($"No response from '{address}'.");
pass = false;
}
}