diff --git a/OneSTools.EventLog.Exporter.Core/ClickHouse/ClickHouseStorage.cs b/OneSTools.EventLog.Exporter.Core/ClickHouse/ClickHouseStorage.cs index 007e843..330efc2 100644 --- a/OneSTools.EventLog.Exporter.Core/ClickHouse/ClickHouseStorage.cs +++ b/OneSTools.EventLog.Exporter.Core/ClickHouse/ClickHouseStorage.cs @@ -35,13 +35,17 @@ public ClickHouseStorage(ILogger logger, IConfiguration confi Init(); } - public async Task ReadEventLogPositionAsync(CancellationToken cancellationToken = default) + public async Task ReadEventLogPositionAsync(CancellationToken cancellationToken = default, string filename = "") { await CreateConnectionAsync(cancellationToken); var commandText = $"SELECT TOP 1 FileName, EndPosition, LgfEndPosition, Id FROM {TableName} ORDER BY DateTime DESC, EndPosition DESC"; + if (filename != "") { + commandText = $"SELECT TOP 1 FileName, EndPosition, LgfEndPosition, Id FROM {_tableName} WHERE FileName = '{filename}' ORDER BY EndPosition DESC"; + } + await using var cmd = _connection.CreateCommand(); cmd.CommandText = commandText; diff --git a/OneSTools.EventLog.Exporter.Core/ElasticSearch/ElasticSearchStorage.cs b/OneSTools.EventLog.Exporter.Core/ElasticSearch/ElasticSearchStorage.cs index 982110b..c9dfe6d 100644 --- a/OneSTools.EventLog.Exporter.Core/ElasticSearch/ElasticSearchStorage.cs +++ b/OneSTools.EventLog.Exporter.Core/ElasticSearch/ElasticSearchStorage.cs @@ -53,7 +53,7 @@ public ElasticSearchStorage(ILogger logger, IConfiguration CheckSettings(); } - public async Task ReadEventLogPositionAsync(CancellationToken cancellationToken = default) + public async Task ReadEventLogPositionAsync(CancellationToken cancellationToken = default, string filename = "") { if (_client is null) await ConnectAsync(cancellationToken); diff --git a/OneSTools.EventLog.Exporter.Core/EventLogExporter.cs b/OneSTools.EventLog.Exporter.Core/EventLogExporter.cs index 40844cf..dc343e0 100644 --- a/OneSTools.EventLog.Exporter.Core/EventLogExporter.cs +++ b/OneSTools.EventLog.Exporter.Core/EventLogExporter.cs @@ -27,6 +27,8 @@ public class EventLogExporter : IDisposable private readonly DateTime _skipEventsBeforeDate; private string _currentLgpFile; + private EventLogPosition _currentPos; + private long _counterSkip = 0; private bool _disposedValue; @@ -37,6 +39,7 @@ public class EventLogExporter : IDisposable public EventLogExporter(EventLogExporterSettings settings, IEventLogStorage storage, ILogger logger = null) { + // Constructor - EventLogExportersManager _logger = logger; _storage = storage; @@ -55,6 +58,7 @@ public EventLogExporter(EventLogExporterSettings settings, IEventLogStorage stor public EventLogExporter(ILogger logger, IConfiguration configuration, IEventLogStorage storage) { + // Constructor - EventLogExporter _logger = logger; _storage = storage; @@ -107,7 +111,10 @@ public async Task StartAsync(CancellationToken cancellationToken = default) try { var settings = await GetReaderSettingsAsync(cancellationToken); + // Init file reader _eventLogReader = new EventLogReader(settings); + _currentLgpFile = settings.LgpFileName; + _logger?.LogInformation($"Reader started reading {_eventLogReader.LgpFileName}"); while (!cancellationToken.IsCancellationRequested && !_writeBlock.Completion.IsCompleted) { @@ -131,14 +138,36 @@ public async Task StartAsync(CancellationToken cancellationToken = default) if (item != null) { - await SendAsync(_batchBlock, item, cancellationToken); + if (!string.IsNullOrEmpty(_eventLogReader.LgpFileName) && _currentLgpFile != _eventLogReader.LgpFileName) { + if (_counterSkip > 0) + { + _logger?.LogInformation($"Reader skipped {_counterSkip} items. {_eventLogReader.LgpFileName}"); + _counterSkip = 0; + } - if (!string.IsNullOrEmpty(_eventLogReader.LgpFileName) && - _currentLgpFile != _eventLogReader.LgpFileName) - { - _logger?.LogInformation($"Reader started reading {_eventLogReader.LgpFileName}"); + _logger?.LogInformation($"Reader changed to {_eventLogReader.LgpFileName}"); _currentLgpFile = _eventLogReader.LgpFileName; + // Need fix batch + forceSending = true; + + var newPos = await _storage.ReadEventLogPositionAsync(cancellationToken, _eventLogReader.LgpFileName); + if (newPos != null) { + _currentPos = newPos; + } else { + _currentPos = new EventLogPosition(item.FileName, 0, item.LgfEndPosition, item.Id); + } + } + + //await SendAsync(_batchBlock, item, cancellationToken); + if (item.EndPosition > _currentPos.EndPosition) + { + await SendAsync(_batchBlock, item, cancellationToken); + } + else + { + _counterSkip++; + _eventLogReader.BackId(); } } else if (!settings.LiveMode) @@ -206,6 +235,7 @@ private async Task GetReaderSettingsAsync(CancellationTo if (position != null) { + _currentPos = position; var lgpFilePath = Path.Combine(_logFolder, position.FileName); if (!File.Exists(lgpFilePath)) diff --git a/OneSTools.EventLog.Exporter.Core/IEventLogStorage.cs b/OneSTools.EventLog.Exporter.Core/IEventLogStorage.cs index 6c1a423..8bc01ec 100644 --- a/OneSTools.EventLog.Exporter.Core/IEventLogStorage.cs +++ b/OneSTools.EventLog.Exporter.Core/IEventLogStorage.cs @@ -7,7 +7,7 @@ namespace OneSTools.EventLog.Exporter.Core { public interface IEventLogStorage : IDisposable { - Task ReadEventLogPositionAsync(CancellationToken cancellationToken = default); + Task ReadEventLogPositionAsync(CancellationToken cancellationToken = default, string filename = ""); Task WriteEventLogDataAsync(List entities, CancellationToken cancellationToken = default); } } \ No newline at end of file diff --git a/OneSTools.EventLog/EventLogReader.cs b/OneSTools.EventLog/EventLogReader.cs index a2fc92b..6ac89e0 100644 --- a/OneSTools.EventLog/EventLogReader.cs +++ b/OneSTools.EventLog/EventLogReader.cs @@ -46,6 +46,11 @@ public void Dispose() GC.SuppressFinalize(this); } + public void BackId() + { + _settings.ItemId--; + } + /// /// The behaviour of the method depends on the mode of the reader. In the "live" mode it'll be waiting for an appearing /// of the new event item, otherwise It'll just return null