Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ public ClickHouseStorage(ILogger<ClickHouseStorage> logger, IConfiguration confi
Init();
}

public async Task<EventLogPosition> ReadEventLogPositionAsync(CancellationToken cancellationToken = default)
public async Task<EventLogPosition> ReadEventLogPositionAsync(CancellationToken cancellationToken = default, string filename = "")
{
await CreateConnectionAsync(cancellationToken);

var commandText =
$"SELECT TOP 1 FileName, EndPosition, LgfEndPosition, Id FROM {TableName} ORDER BY DateTime DESC, EndPosition DESC";

if (filename != "") {
commandText = $"SELECT TOP 1 FileName, EndPosition, LgfEndPosition, Id FROM {_tableName} WHERE FileName = '{filename}' ORDER BY EndPosition DESC";
}

await using var cmd = _connection.CreateCommand();
cmd.CommandText = commandText;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public ElasticSearchStorage(ILogger<ElasticSearchStorage> logger, IConfiguration
CheckSettings();
}

public async Task<EventLogPosition> ReadEventLogPositionAsync(CancellationToken cancellationToken = default)
public async Task<EventLogPosition> ReadEventLogPositionAsync(CancellationToken cancellationToken = default, string filename = "")
{
if (_client is null)
await ConnectAsync(cancellationToken);
Expand Down
40 changes: 35 additions & 5 deletions OneSTools.EventLog.Exporter.Core/EventLogExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class EventLogExporter : IDisposable
private readonly DateTime _skipEventsBeforeDate;

private string _currentLgpFile;
private EventLogPosition _currentPos;
private long _counterSkip = 0;

private bool _disposedValue;

Expand All @@ -37,6 +39,7 @@ public class EventLogExporter : IDisposable
public EventLogExporter(EventLogExporterSettings settings, IEventLogStorage storage,
ILogger<EventLogExporter> logger = null)
{
// Constructor - EventLogExportersManager
_logger = logger;
_storage = storage;

Expand All @@ -55,6 +58,7 @@ public EventLogExporter(EventLogExporterSettings settings, IEventLogStorage stor
public EventLogExporter(ILogger<EventLogExporter> logger, IConfiguration configuration,
IEventLogStorage storage)
{
// Constructor - EventLogExporter
_logger = logger;
_storage = storage;

Expand Down Expand Up @@ -107,7 +111,10 @@ public async Task StartAsync(CancellationToken cancellationToken = default)
try
{
var settings = await GetReaderSettingsAsync(cancellationToken);
// Init file reader
_eventLogReader = new EventLogReader(settings);
_currentLgpFile = settings.LgpFileName;
_logger?.LogInformation($"Reader started reading {_eventLogReader.LgpFileName}");

while (!cancellationToken.IsCancellationRequested && !_writeBlock.Completion.IsCompleted)
{
Expand All @@ -131,14 +138,36 @@ public async Task StartAsync(CancellationToken cancellationToken = default)

if (item != null)
{
await SendAsync(_batchBlock, item, cancellationToken);
if (!string.IsNullOrEmpty(_eventLogReader.LgpFileName) && _currentLgpFile != _eventLogReader.LgpFileName) {
if (_counterSkip > 0)
{
_logger?.LogInformation($"Reader skipped {_counterSkip} items. {_eventLogReader.LgpFileName}");
_counterSkip = 0;
}

if (!string.IsNullOrEmpty(_eventLogReader.LgpFileName) &&
_currentLgpFile != _eventLogReader.LgpFileName)
{
_logger?.LogInformation($"Reader started reading {_eventLogReader.LgpFileName}");
_logger?.LogInformation($"Reader changed to {_eventLogReader.LgpFileName}");

_currentLgpFile = _eventLogReader.LgpFileName;
// Need fix batch
forceSending = true;

var newPos = await _storage.ReadEventLogPositionAsync(cancellationToken, _eventLogReader.LgpFileName);
if (newPos != null) {
_currentPos = newPos;
} else {
_currentPos = new EventLogPosition(item.FileName, 0, item.LgfEndPosition, item.Id);
}
}

//await SendAsync(_batchBlock, item, cancellationToken);
if (item.EndPosition > _currentPos.EndPosition)
{
await SendAsync(_batchBlock, item, cancellationToken);
}
else
{
_counterSkip++;
_eventLogReader.BackId();
}
}
else if (!settings.LiveMode)
Expand Down Expand Up @@ -206,6 +235,7 @@ private async Task<EventLogReaderSettings> GetReaderSettingsAsync(CancellationTo

if (position != null)
{
_currentPos = position;
var lgpFilePath = Path.Combine(_logFolder, position.FileName);

if (!File.Exists(lgpFilePath))
Expand Down
2 changes: 1 addition & 1 deletion OneSTools.EventLog.Exporter.Core/IEventLogStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace OneSTools.EventLog.Exporter.Core
{
public interface IEventLogStorage : IDisposable
{
Task<EventLogPosition> ReadEventLogPositionAsync(CancellationToken cancellationToken = default);
Task<EventLogPosition> ReadEventLogPositionAsync(CancellationToken cancellationToken = default, string filename = "");
Task WriteEventLogDataAsync(List<EventLogItem> entities, CancellationToken cancellationToken = default);
}
}
5 changes: 5 additions & 0 deletions OneSTools.EventLog/EventLogReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public void Dispose()
GC.SuppressFinalize(this);
}

public void BackId()
{
_settings.ItemId--;
}

/// <summary>
/// The behaviour of the method depends on the mode of the reader. In the "live" mode it'll be waiting for an appearing
/// of the new event item, otherwise It'll just return null
Expand Down