2024-07-26 08:39:27 +02:00
using CodexPlugin.Hooks ;
using Core ;
2023-09-11 10:43:27 +02:00
using FileUtils ;
2023-09-19 11:51:59 +02:00
using GethPlugin ;
2023-09-13 12:24:46 +02:00
using KubernetesWorkflow ;
2023-11-12 10:07:23 +01:00
using KubernetesWorkflow.Types ;
2024-03-26 11:39:59 +01:00
using Logging ;
2023-09-13 11:59:21 +02:00
using MetricsPlugin ;
2023-08-02 15:11:27 +02:00
using Utils ;
2023-04-13 09:33:10 +02:00
2023-09-11 11:59:33 +02:00
namespace CodexPlugin
2023-04-13 09:33:10 +02:00
{
2023-09-19 11:51:59 +02:00
public interface ICodexNode : IHasContainer , IHasMetricsScrapeTarget , IHasEthAddress
2023-04-13 09:33:10 +02:00
{
2023-04-19 09:19:06 +02:00
string GetName ( ) ;
2024-07-26 08:39:27 +02:00
string GetPeerId ( ) ;
2024-03-25 15:46:45 +01:00
DebugInfo GetDebugInfo ( ) ;
DebugPeer GetDebugPeer ( string peerId ) ;
2023-09-12 13:32:06 +02:00
ContentId UploadFile ( TrackedFile file ) ;
2024-06-06 09:54:50 +02:00
ContentId UploadFile ( TrackedFile file , Action < Failure > onFailure ) ;
2023-09-12 13:32:06 +02:00
TrackedFile ? DownloadContent ( ContentId contentId , string fileLabel = "" ) ;
2024-06-06 09:54:50 +02:00
TrackedFile ? DownloadContent ( ContentId contentId , Action < Failure > onFailure , string fileLabel = "" ) ;
2024-03-26 14:07:06 +01:00
LocalDatasetList LocalFiles ( ) ;
2024-06-05 09:20:00 +02:00
CodexSpace Space ( ) ;
2023-09-19 11:51:59 +02:00
void ConnectToPeer ( ICodexNode node ) ;
2024-03-26 10:03:52 +01:00
DebugInfoVersion Version { get ; }
2023-09-20 08:45:55 +02:00
IMarketplaceAccess Marketplace { get ; }
2023-09-20 12:55:09 +02:00
CrashWatcher CrashWatcher { get ; }
2023-11-06 14:33:47 +01:00
PodInfo GetPodInfo ( ) ;
2023-12-06 09:59:45 +01:00
ITransferSpeeds TransferSpeeds { get ; }
2024-05-24 15:34:42 +02:00
EthAccount EthAccount { get ; }
2024-06-13 08:51:52 +02:00
/// <summary>
/// Warning! The node is not usable after this.
/// TODO: Replace with delete-blocks debug call once available in Codex.
/// </summary>
void DeleteRepoFolder ( ) ;
2024-03-13 10:57:26 +01:00
void Stop ( bool waitTillStopped ) ;
2023-04-13 09:33:10 +02:00
}
2023-09-19 11:51:59 +02:00
public class CodexNode : ICodexNode
2023-04-13 09:33:10 +02:00
{
private const string UploadFailedMessage = "Unable to store block" ;
2024-08-01 10:39:06 +02:00
private readonly ILog log ;
2023-09-12 11:43:46 +02:00
private readonly IPluginTools tools ;
2024-07-26 08:39:27 +02:00
private readonly ICodexNodeHooks hooks ;
2024-05-24 15:34:42 +02:00
private readonly EthAccount ? ethAccount ;
2023-12-06 09:59:45 +01:00
private readonly TransferSpeeds transferSpeeds ;
2024-07-26 08:39:27 +02:00
private string peerId = string . Empty ;
2024-08-13 13:48:54 +02:00
private string nodeId = string . Empty ;
2023-04-13 09:33:10 +02:00
2024-07-26 08:39:27 +02:00
public CodexNode ( IPluginTools tools , CodexAccess codexAccess , CodexNodeGroup group , IMarketplaceAccess marketplaceAccess , ICodexNodeHooks hooks , EthAccount ? ethAccount )
2023-04-13 09:33:10 +02:00
{
2023-09-12 11:43:46 +02:00
this . tools = tools ;
2024-05-24 15:34:42 +02:00
this . ethAccount = ethAccount ;
2023-04-13 09:33:10 +02:00
CodexAccess = codexAccess ;
Group = group ;
2023-09-20 08:45:55 +02:00
Marketplace = marketplaceAccess ;
2024-07-26 08:39:27 +02:00
this . hooks = hooks ;
2024-03-26 10:03:52 +01:00
Version = new DebugInfoVersion ( ) ;
2023-12-06 09:59:45 +01:00
transferSpeeds = new TransferSpeeds ( ) ;
2024-08-01 10:39:06 +02:00
log = new LogPrefixer ( tools . GetLog ( ) , $"{GetName()} " ) ;
2023-04-13 09:33:10 +02:00
}
2024-07-29 10:16:37 +02:00
public void Awake ( )
{
2024-08-20 11:44:15 +02:00
hooks . OnNodeStarting ( Container . Recipe . RecipeCreatedUtc , Container . Recipe . Image , ethAccount ) ;
2024-07-29 10:16:37 +02:00
}
2024-07-26 08:39:27 +02:00
public void Initialize ( )
{
2024-08-13 13:48:54 +02:00
hooks . OnNodeStarted ( peerId , nodeId ) ;
2024-07-26 08:39:27 +02:00
}
2024-04-13 17:09:17 +03:00
public RunningPod Pod { get { return CodexAccess . Container ; } }
public RunningContainer Container { get { return Pod . Containers . Single ( ) ; } }
2023-04-13 09:33:10 +02:00
public CodexAccess CodexAccess { get ; }
2023-09-20 12:55:09 +02:00
public CrashWatcher CrashWatcher { get = > CodexAccess . CrashWatcher ; }
2023-04-13 09:33:10 +02:00
public CodexNodeGroup Group { get ; }
2023-09-20 08:45:55 +02:00
public IMarketplaceAccess Marketplace { get ; }
2024-03-26 10:03:52 +01:00
public DebugInfoVersion Version { get ; private set ; }
2023-12-06 09:59:45 +01:00
public ITransferSpeeds TransferSpeeds { get = > transferSpeeds ; }
2024-03-26 10:03:52 +01:00
2023-09-13 11:59:21 +02:00
public IMetricsScrapeTarget MetricsScrapeTarget
{
get
{
2024-04-13 17:09:17 +03:00
return new MetricsScrapeTarget ( CodexAccess . Container . Containers . First ( ) , CodexContainerRecipe . MetricsPortTag ) ;
2023-09-13 11:59:21 +02:00
}
}
2024-03-26 10:03:52 +01:00
2023-09-20 10:13:29 +02:00
public EthAddress EthAddress
2023-09-19 11:51:59 +02:00
{
get
{
2024-05-24 15:34:42 +02:00
EnsureMarketplace ( ) ;
return ethAccount ! . EthAddress ;
}
}
public EthAccount EthAccount
{
get
{
EnsureMarketplace ( ) ;
return ethAccount ! ;
2023-09-19 11:51:59 +02:00
}
}
2023-04-13 09:33:10 +02:00
public string GetName ( )
{
2024-04-19 11:52:39 +02:00
return Container . Name ;
2023-04-13 09:33:10 +02:00
}
2024-07-26 08:39:27 +02:00
public string GetPeerId ( )
{
return peerId ;
}
2024-03-25 15:46:45 +01:00
public DebugInfo GetDebugInfo ( )
2023-04-13 09:33:10 +02:00
{
2023-06-29 16:07:49 +02:00
var debugInfo = CodexAccess . GetDebugInfo ( ) ;
2024-03-26 10:03:52 +01:00
var known = string . Join ( "," , debugInfo . Table . Nodes . Select ( n = > n . PeerId ) ) ;
2024-06-03 11:42:52 +02:00
Log ( $"Got DebugInfo with id: {debugInfo.Id}. This node knows: [{known}]" ) ;
2023-04-19 14:57:00 +02:00
return debugInfo ;
2023-04-13 09:33:10 +02:00
}
2024-03-25 15:46:45 +01:00
public DebugPeer GetDebugPeer ( string peerId )
2023-05-11 12:44:53 +02:00
{
2023-06-29 16:07:49 +02:00
return CodexAccess . GetDebugPeer ( peerId ) ;
2023-05-11 12:44:53 +02:00
}
2023-09-12 13:32:06 +02:00
public ContentId UploadFile ( TrackedFile file )
2023-04-13 09:33:10 +02:00
{
2024-06-06 09:54:50 +02:00
return UploadFile ( file , DoNothing ) ;
}
public ContentId UploadFile ( TrackedFile file , Action < Failure > onFailure )
2023-04-13 09:33:10 +02:00
{
2023-09-12 11:43:46 +02:00
using var fileStream = File . OpenRead ( file . Filename ) ;
2024-08-01 09:09:30 +02:00
var uniqueId = Guid . NewGuid ( ) . ToString ( ) ;
var size = file . GetFilesize ( ) ;
hooks . OnFileUploading ( uniqueId , size ) ;
2023-06-07 08:30:10 +02:00
2024-08-01 14:50:25 +02:00
var logMessage = $"Uploading file {file.Describe()}..." ;
2024-08-01 10:39:06 +02:00
var measurement = Stopwatch . Measure ( log , logMessage , ( ) = >
2023-09-12 11:43:46 +02:00
{
2024-06-06 09:54:50 +02:00
return CodexAccess . UploadFile ( fileStream , onFailure ) ;
2023-09-12 11:43:46 +02:00
} ) ;
2023-06-07 08:30:10 +02:00
2023-12-06 09:59:45 +01:00
var response = measurement . Value ;
2024-08-01 09:09:30 +02:00
transferSpeeds . AddUploadSample ( size , measurement . Duration ) ;
2023-12-06 09:59:45 +01:00
2023-09-20 10:51:47 +02:00
if ( string . IsNullOrEmpty ( response ) ) FrameworkAssert . Fail ( "Received empty response." ) ;
if ( response . StartsWith ( UploadFailedMessage ) ) FrameworkAssert . Fail ( "Node failed to store block." ) ;
2023-07-11 08:19:14 +02:00
2024-08-01 14:50:25 +02:00
Log ( $"Uploaded file {file.Describe()}. Received contentId: '{response}'." ) ;
2024-06-12 15:28:08 +02:00
2024-07-26 08:39:27 +02:00
var cid = new ContentId ( response ) ;
2024-08-01 09:09:30 +02:00
hooks . OnFileUploaded ( uniqueId , size , cid ) ;
2024-07-26 08:39:27 +02:00
return cid ;
2023-04-13 09:33:10 +02:00
}
2023-09-12 13:32:06 +02:00
public TrackedFile ? DownloadContent ( ContentId contentId , string fileLabel = "" )
2024-06-06 09:54:50 +02:00
{
return DownloadContent ( contentId , DoNothing , fileLabel ) ;
}
public TrackedFile ? DownloadContent ( ContentId contentId , Action < Failure > onFailure , string fileLabel = "" )
2023-04-13 09:33:10 +02:00
{
2023-09-13 10:03:11 +02:00
var file = tools . GetFileManager ( ) . CreateEmptyFile ( fileLabel ) ;
2024-08-01 10:39:06 +02:00
hooks . OnFileDownloading ( contentId ) ;
2024-08-01 11:19:05 +02:00
Log ( $"Downloading '{contentId}'..." ) ;
2024-08-01 10:39:06 +02:00
2024-08-01 11:19:05 +02:00
var logMessage = $"Downloaded '{contentId}' to '{file.Filename}'" ;
2024-08-01 10:39:06 +02:00
var measurement = Stopwatch . Measure ( log , logMessage , ( ) = > DownloadToFile ( contentId . Id , file , onFailure ) ) ;
2024-08-01 09:09:30 +02:00
var size = file . GetFilesize ( ) ;
transferSpeeds . AddDownloadSample ( size , measurement ) ;
hooks . OnFileDownloaded ( size , contentId ) ;
2024-08-01 10:39:06 +02:00
2023-09-12 11:43:46 +02:00
return file ;
2023-04-13 09:33:10 +02:00
}
2024-03-26 14:07:06 +01:00
public LocalDatasetList LocalFiles ( )
2023-11-10 08:20:08 +01:00
{
2024-03-26 11:39:59 +01:00
return CodexAccess . LocalFiles ( ) ;
2023-11-10 08:20:08 +01:00
}
2024-06-05 09:20:00 +02:00
public CodexSpace Space ( )
{
return CodexAccess . Space ( ) ;
}
2023-09-19 11:51:59 +02:00
public void ConnectToPeer ( ICodexNode node )
2023-04-13 09:33:10 +02:00
{
2023-09-19 11:51:59 +02:00
var peer = ( CodexNode ) node ;
2023-04-13 09:33:10 +02:00
Log ( $"Connecting to peer {peer.GetName()}..." ) ;
var peerInfo = node . GetDebugInfo ( ) ;
2024-03-26 12:14:02 +01:00
CodexAccess . ConnectToPeer ( peerInfo . Id , GetPeerMultiAddresses ( peer , peerInfo ) ) ;
2023-04-13 09:33:10 +02:00
Log ( $"Successfully connected to peer {peer.GetName()}." ) ;
}
2023-11-06 14:33:47 +01:00
public PodInfo GetPodInfo ( )
{
return CodexAccess . GetPodInfo ( ) ;
}
2024-06-13 08:51:52 +02:00
public void DeleteRepoFolder ( )
{
CodexAccess . DeleteRepoFolder ( ) ;
}
2024-03-13 10:57:26 +01:00
public void Stop ( bool waitTillStopped )
2023-04-25 12:52:11 +02:00
{
2024-06-12 15:28:08 +02:00
Log ( "Stopping..." ) ;
2024-07-26 08:39:27 +02:00
hooks . OnNodeStopping ( ) ;
2024-04-13 17:09:17 +03:00
CrashWatcher . Stop ( ) ;
2024-04-13 17:20:23 +03:00
Group . Stop ( this , waitTillStopped ) ;
2023-04-25 12:52:11 +02:00
}
2023-07-31 11:51:29 +02:00
public void EnsureOnlineGetVersionResponse ( )
{
2023-08-02 15:11:27 +02:00
var debugInfo = Time . Retry ( CodexAccess . GetDebugInfo , "ensure online" ) ;
2024-07-26 08:39:27 +02:00
peerId = debugInfo . Id ;
2024-08-13 13:48:54 +02:00
nodeId = debugInfo . Table . LocalNode . NodeId ;
2023-07-31 11:51:29 +02:00
var nodeName = CodexAccess . Container . Name ;
2024-03-26 10:03:52 +01:00
if ( ! debugInfo . Version . IsValid ( ) )
2023-07-31 11:51:29 +02:00
{
2024-03-26 10:03:52 +01:00
throw new Exception ( $"Invalid version information received from Codex node {GetName()}: {debugInfo.Version}" ) ;
2023-07-31 11:51:29 +02:00
}
2024-07-26 08:39:27 +02:00
log . AddStringReplace ( peerId , nodeName ) ;
2024-07-26 09:14:46 +02:00
log . AddStringReplace ( CodexUtils . ToShortId ( peerId ) , nodeName ) ;
2024-03-26 10:03:52 +01:00
log . AddStringReplace ( debugInfo . Table . LocalNode . NodeId , nodeName ) ;
2024-07-26 09:14:46 +02:00
log . AddStringReplace ( CodexUtils . ToShortId ( debugInfo . Table . LocalNode . NodeId ) , nodeName ) ;
2024-03-26 10:03:52 +01:00
Version = debugInfo . Version ;
2023-07-31 11:51:29 +02:00
}
2024-09-24 10:10:59 +02:00
public override string ToString ( )
{
return $"CodexNode:{GetName()}" ;
}
2024-03-26 12:14:02 +01:00
private string [ ] GetPeerMultiAddresses ( CodexNode peer , DebugInfo peerInfo )
2023-04-13 09:33:10 +02:00
{
// The peer we want to connect is in a different pod.
// We must replace the default IP with the pod IP in the multiAddress.
2023-11-06 14:33:47 +01:00
var workflow = tools . CreateWorkflow ( ) ;
2024-04-13 17:09:17 +03:00
var podInfo = workflow . GetPodInfo ( peer . Pod ) ;
2023-11-06 14:33:47 +01:00
2024-03-26 12:14:02 +01:00
return peerInfo . Addrs . Select ( a = > a
. Replace ( "0.0.0.0" , podInfo . Ip ) )
. ToArray ( ) ;
2023-04-13 09:33:10 +02:00
}
2024-06-06 09:54:50 +02:00
private void DownloadToFile ( string contentId , TrackedFile file , Action < Failure > onFailure )
2023-04-13 09:33:10 +02:00
{
using var fileStream = File . OpenWrite ( file . Filename ) ;
2023-04-19 14:57:00 +02:00
try
{
2024-06-06 09:54:50 +02:00
using var downloadStream = CodexAccess . DownloadFile ( contentId , onFailure ) ;
2023-04-19 14:57:00 +02:00
downloadStream . CopyTo ( fileStream ) ;
}
catch
{
Log ( $"Failed to download file '{contentId}'." ) ;
throw ;
}
2023-04-13 09:33:10 +02:00
}
2024-05-24 15:34:42 +02:00
private void EnsureMarketplace ( )
{
if ( ethAccount = = null ) throw new Exception ( "Marketplace is not enabled for this Codex node. Please start it with the option '.EnableMarketplace(...)' to enable it." ) ;
}
2023-04-13 09:33:10 +02:00
private void Log ( string msg )
{
2024-08-01 10:39:06 +02:00
log . Log ( msg ) ;
2023-04-13 09:33:10 +02:00
}
2024-06-06 09:54:50 +02:00
private void DoNothing ( Failure failure )
{
}
2023-04-13 09:33:10 +02:00
}
}