Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/SeqCli/Config/Forwarder/SeqCliForwarderConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ class SeqCliForwarderConfig
public SeqCliForwarderStorageConfig Storage { get; set; } = new();
public SeqCliForwarderDiagnosticConfig Diagnostics { get; set; } = new();
public SeqCliForwarderApiConfig Api { get; set; } = new();
public bool UseApiKeyForwarding { get; set; }
}
2 changes: 1 addition & 1 deletion src/SeqCli/Config/KeyValueSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static void Set(SeqCliConfig config, string key, string? value)
// would be more robust.
var targetProperty = receiver.GetType().GetTypeInfo().DeclaredProperties
.Where(p => p is { CanRead: true, CanWrite: true } && p.GetMethod!.IsPublic && p.SetMethod!.IsPublic && !p.GetMethod.IsStatic)
.SingleOrDefault(p => Camelize(p.Name) == steps[^1]);
.SingleOrDefault(p => Camelize(GetUserFacingName(p)) == steps[^1]);

if (targetProperty == null)
throw new ArgumentException("The key could not be found; run `seqcli config list` to view all keys.");
Expand Down
90 changes: 90 additions & 0 deletions src/SeqCli/Forwarder/Channel/ApiKeyForwardingChannelWrapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Seq.Api;
using SeqCli.Config;
using SeqCli.Forwarder.Filesystem.System;
using Serilog;

namespace SeqCli.Forwarder.Channel;

class ApiKeyForwardingChannelWrapper : ForwardingChannelWrapper
{
readonly Dictionary<string, ForwardingChannel> _channelsByApiKey = new();
const string EmptyApiKeyChannelId = "EmptyApiKey";

public ApiKeyForwardingChannelWrapper(string bufferPath, SeqConnection connection, SeqCliConfig config) : base(bufferPath, connection, config)
{
LoadChannels();
}

// Start forwarding channels found on the file system.
void LoadChannels()
{
foreach (var directoryPath in Directory.EnumerateDirectories(BufferPath))
{
if (directoryPath.Equals(GetStorePath(SeqCliConnectionChannelId)))
{
// data was stored when not using API key forwarding
continue;
}

string apiKey, channelId;

if (new SystemStoreDirectory(directoryPath).TryReadApiKey(Config, out var key))
{
apiKey = key!;
channelId = directoryPath;
}
else
{
// directory should contain an api key file but does not
continue;
}

var created = OpenOrCreateChannel(channelId, apiKey);
_channelsByApiKey.Add(apiKey, created);
}
}

public override ForwardingChannel GetForwardingChannel(string? requestApiKey)
{
lock (ChannelsSync)
{
// use empty string to represent no api key
if (_channelsByApiKey.TryGetValue(requestApiKey ?? "", out var channel))
{
return channel;
}

var channelId = ApiKeyToId(requestApiKey);
var created = OpenOrCreateChannel(channelId, requestApiKey);
var store = new SystemStoreDirectory(GetStorePath(channelId));
store.WriteApiKey(Config, requestApiKey ?? "");
_channelsByApiKey.Add(requestApiKey ?? "", created);
return created;
}
}

string ApiKeyToId(string? apiKey)
{
return string.IsNullOrEmpty(apiKey) ? EmptyApiKeyChannelId : Guid.NewGuid().ToString();
}

public override async Task StopAsync()
{
Log.ForContext<ApiKeyForwardingChannelWrapper>().Information("Flushing log buffers");
ShutdownTokenSource.CancelAfter(TimeSpan.FromSeconds(30));

Task[] stopChannels;
lock (ChannelsSync)
{
stopChannels = _channelsByApiKey.Values.Select(ch => ch.StopAsync()).ToArray();
}

await Task.WhenAll([..stopChannels]);
await ShutdownTokenSource.CancelAsync();
}
}
96 changes: 0 additions & 96 deletions src/SeqCli/Forwarder/Channel/ForwardingChannelMap.cs

This file was deleted.

49 changes: 49 additions & 0 deletions src/SeqCli/Forwarder/Channel/ForwardingChannelWrapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Seq.Api;
using SeqCli.Config;
using SeqCli.Forwarder.Filesystem.System;
using SeqCli.Forwarder.Storage;
using Serilog;

namespace SeqCli.Forwarder.Channel;

internal abstract class ForwardingChannelWrapper(string bufferPath, SeqConnection connection, SeqCliConfig config)
{
protected const string SeqCliConnectionChannelId = "SeqCliConnection";
protected readonly string BufferPath = bufferPath;
protected readonly SeqCliConfig Config = config;
protected readonly CancellationTokenSource ShutdownTokenSource = new();
protected readonly Lock ChannelsSync = new();

// <param name="id">The id used for the channel storage on the file system.</param>
// <param name="apiKey">The apiKey that will be used to connect to the downstream Seq instance.</param>
protected ForwardingChannel OpenOrCreateChannel(string id, string? apiKey)
{
var storePath = GetStorePath(id);
var store = new SystemStoreDirectory(storePath);

Log.ForContext<ForwardingChannelWrapper>().Information("Opening local buffer in {StorePath}", storePath);

return new ForwardingChannel(
BufferAppender.Open(store),
BufferReader.Open(store),
Bookmark.Open(store),
connection,
apiKey,
Config.Forwarder.Storage.TargetChunkSizeBytes,
Config.Forwarder.Storage.MaxChunks,
Config.Connection.BatchSizeLimitBytes,
ShutdownTokenSource.Token);
}

public abstract ForwardingChannel GetForwardingChannel(string? requestApiKey);

public abstract Task StopAsync();

protected string GetStorePath(string id)
{
return Path.Combine(BufferPath, id);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;
using Seq.Api;
using SeqCli.Config;
using Serilog;

namespace SeqCli.Forwarder.Channel;

class SeqCliConnectionForwardingChannelWrapper: ForwardingChannelWrapper
{
readonly ForwardingChannel _seqCliConnectionChannel;

public SeqCliConnectionForwardingChannelWrapper(string bufferPath, SeqConnection connection, SeqCliConfig config, string? seqCliApiKey): base(bufferPath, connection, config)
{
_seqCliConnectionChannel = OpenOrCreateChannel(SeqCliConnectionChannelId, seqCliApiKey);
}

public override ForwardingChannel GetForwardingChannel(string? _)
{
return _seqCliConnectionChannel;
}

public override async Task StopAsync()
{
Log.ForContext<SeqCliConnectionForwardingChannelWrapper>().Information("Flushing log buffers");
ShutdownTokenSource.CancelAfter(TimeSpan.FromSeconds(30));

await _seqCliConnectionChannel.StopAsync();
await ShutdownTokenSource.CancelAsync();
}
}
32 changes: 32 additions & 0 deletions src/SeqCli/Forwarder/Filesystem/System/SystemStoreDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Runtime.InteropServices;
using System.Text;
using SeqCli.Config;
using Serilog;

#if UNIX
using SeqCli.Forwarder.Filesystem.System.Unix;
Expand All @@ -34,6 +38,34 @@ public SystemStoreDirectory(string path)
if (!Directory.Exists(_directoryPath)) Directory.CreateDirectory(_directoryPath);
}

public void WriteApiKey(SeqCliConfig config, string apiKey)
{
File.WriteAllBytes(
Path.Combine(_directoryPath, "api.key"),
config.Encryption.DataProtector().Encrypt(Encoding.UTF8.GetBytes(apiKey)));
}

public bool TryReadApiKey(SeqCliConfig config, [NotNullWhen(true)] out string? apiKey)
{
apiKey = null;
var path = Path.Combine(_directoryPath, "api.key");

if (!File.Exists(path)) return false;

try
{
var encrypted = File.ReadAllBytes(path);
apiKey = Encoding.UTF8.GetString(config.Encryption.DataProtector().Decrypt(encrypted));
return true;
}
catch (Exception exception)
{
Log.Warning(exception, "Could not read or decrypt api key");
}

return false;
}

public override SystemStoreFile Create(string name)
{
var filePath = Path.Combine(_directoryPath, name);
Expand Down
14 changes: 13 additions & 1 deletion src/SeqCli/Forwarder/ForwarderModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,19 @@ public ForwarderModule(string bufferPath, SeqCliConfig config, SeqConnection con
protected override void Load(ContainerBuilder builder)
{
builder.RegisterType<ServerService>().SingleInstance();
builder.Register(_ => new ForwardingChannelMap(_bufferPath, _connection, _apiKey, _config)).SingleInstance();

if (_config.Forwarder.UseApiKeyForwarding)
{
builder.Register<ApiKeyForwardingChannelWrapper>(_ =>
new ApiKeyForwardingChannelWrapper(_bufferPath, _connection, _config))
.As<ForwardingChannelWrapper>().SingleInstance();
}
else
{
builder.Register<SeqCliConnectionForwardingChannelWrapper>(_ =>
new SeqCliConnectionForwardingChannelWrapper(_bufferPath, _connection, _config, _apiKey))
.As<ForwardingChannelWrapper>().SingleInstance();
}

builder.RegisterType<IngestionEndpoints>().As<IMapEndpoints>();

Expand Down
7 changes: 4 additions & 3 deletions src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ class IngestionEndpoints : IMapEndpoints
{
static readonly Encoding Utf8 = new UTF8Encoding(false);

readonly ForwardingChannelMap _forwardingChannels;
readonly ForwardingChannelWrapper _forwardingChannels;
readonly SeqCliConfig _config;

public IngestionEndpoints(ForwardingChannelMap forwardingChannels, SeqCliConfig config)
public IngestionEndpoints(ForwardingChannelWrapper forwardingChannels, SeqCliConfig config)
{
_forwardingChannels = forwardingChannels;
_config = config;
Expand Down Expand Up @@ -75,7 +75,8 @@ async Task<IResult> IngestCompactFormatAsync(HttpContext context)
var cts = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted);
cts.CancelAfter(TimeSpan.FromSeconds(5));

var log = _forwardingChannels.Get(GetApiKey(context.Request));
var requestApiKey = GetApiKey(context.Request);
var log = _forwardingChannels.GetForwardingChannel(requestApiKey);

// Add one for the extra newline that we have to insert at the end of batches.
var bufferSize = _config.Connection.BatchSizeLimitBytes + 1;
Expand Down
4 changes: 2 additions & 2 deletions src/SeqCli/Forwarder/Web/Host/ServerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ namespace SeqCli.Forwarder.Web.Host;
class ServerService
{
readonly IHost _host;
readonly ForwardingChannelMap _forwardingChannelMap;
readonly ForwardingChannelWrapper _forwardingChannelMap;
readonly string _listenUri;

public ServerService(IHost host, ForwardingChannelMap forwardingChannelMap, string listenUri)
public ServerService(IHost host, ForwardingChannelWrapper forwardingChannelMap, string listenUri)
{
_host = host;
_forwardingChannelMap = forwardingChannelMap;
Expand Down