Core: Add disk IO detection

This commit is contained in:
genteure 2021-12-19 00:56:41 +08:00
parent 1711b9fa57
commit 95e4f1d5dd
12 changed files with 140 additions and 61 deletions

View File

@ -0,0 +1,29 @@
using System;
namespace BililiveRecorder.Core.Event
{
public class IOStatsEventArgs : EventArgs
{
public DateTimeOffset StartTime { get; set; }
public DateTimeOffset EndTime { get; set; }
public TimeSpan Duration { get; set; }
public int NetworkBytesDownloaded { get; set; }
/// <summary>
/// mibi-bits per seconds
/// </summary>
public double NetworkMbps { get; set; }
public TimeSpan DiskWriteTime { get; set; }
public int DiskBytesWritten { get; set; }
/// <summary>
/// mibi-bytes per seconds
/// </summary>
public double DiskMBps { get; set; }
}
}

View File

@ -1,17 +0,0 @@
using System;
namespace BililiveRecorder.Core.Event
{
public class NetworkingStatsEventArgs : EventArgs
{
public DateTimeOffset StartTime { get; set; }
public DateTimeOffset EndTime { get; set; }
public TimeSpan Duration { get; set; }
public int BytesDownloaded { get; set; }
public double Mbps { get; set; }
}
}

View File

@ -15,7 +15,7 @@ namespace BililiveRecorder.Core
event EventHandler<AggregatedRoomEventArgs<RecordSessionEndedEventArgs>>? RecordSessionEnded;
event EventHandler<AggregatedRoomEventArgs<RecordFileOpeningEventArgs>>? RecordFileOpening;
event EventHandler<AggregatedRoomEventArgs<RecordFileClosedEventArgs>>? RecordFileClosed;
event EventHandler<AggregatedRoomEventArgs<NetworkingStatsEventArgs>>? NetworkingStats;
event EventHandler<AggregatedRoomEventArgs<IOStatsEventArgs>>? IOStats;
event EventHandler<AggregatedRoomEventArgs<RecordingStatsEventArgs>>? RecordingStats;
IRoom AddRoom(int roomid);

View File

@ -29,7 +29,7 @@ namespace BililiveRecorder.Core
event EventHandler<RecordFileOpeningEventArgs>? RecordFileOpening;
event EventHandler<RecordFileClosedEventArgs>? RecordFileClosed;
event EventHandler<RecordingStatsEventArgs>? RecordingStats;
event EventHandler<NetworkingStatsEventArgs>? NetworkingStats;
event EventHandler<IOStatsEventArgs>? IOStats;
void StartRecord();
void StopRecord();

View File

@ -50,7 +50,7 @@ namespace BililiveRecorder.Core
public event EventHandler<AggregatedRoomEventArgs<RecordSessionEndedEventArgs>>? RecordSessionEnded;
public event EventHandler<AggregatedRoomEventArgs<RecordFileOpeningEventArgs>>? RecordFileOpening;
public event EventHandler<AggregatedRoomEventArgs<RecordFileClosedEventArgs>>? RecordFileClosed;
public event EventHandler<AggregatedRoomEventArgs<NetworkingStatsEventArgs>>? NetworkingStats;
public event EventHandler<AggregatedRoomEventArgs<IOStatsEventArgs>>? IOStats;
public event EventHandler<AggregatedRoomEventArgs<RecordingStatsEventArgs>>? RecordingStats;
public event PropertyChangedEventHandler? PropertyChanged;
@ -81,7 +81,7 @@ namespace BililiveRecorder.Core
room.RecordSessionEnded += this.Room_RecordSessionEnded;
room.RecordFileOpening += this.Room_RecordFileOpening;
room.RecordFileClosed += this.Room_RecordFileClosed;
room.NetworkingStats += this.Room_NetworkingStats;
room.IOStats += this.Room_IOStats;
room.RecordingStats += this.Room_RecordingStats;
room.PropertyChanged += this.Room_PropertyChanged;
@ -122,10 +122,10 @@ namespace BililiveRecorder.Core
#region Events
private void Room_NetworkingStats(object sender, NetworkingStatsEventArgs e)
private void Room_IOStats(object sender, IOStatsEventArgs e)
{
var room = (IRoom)sender;
NetworkingStats?.Invoke(this, new AggregatedRoomEventArgs<NetworkingStatsEventArgs>(room, e));
IOStats?.Invoke(this, new AggregatedRoomEventArgs<IOStatsEventArgs>(room, e));
}
private void Room_RecordingStats(object sender, RecordingStatsEventArgs e)

View File

@ -8,7 +8,7 @@ namespace BililiveRecorder.Core.Recording
{
Guid SessionId { get; }
event EventHandler<NetworkingStatsEventArgs>? NetworkingStats;
event EventHandler<IOStatsEventArgs>? IOStats;
event EventHandler<RecordingStatsEventArgs>? RecordingStats;
event EventHandler<RecordFileOpeningEventArgs>? RecordFileOpening;
event EventHandler<RecordFileClosedEventArgs>? RecordFileClosed;

View File

@ -63,9 +63,18 @@ namespace BililiveRecorder.Core.Recording
if (bytesRead == 0)
break;
Interlocked.Add(ref this.fillerDownloadedBytes, bytesRead);
Interlocked.Add(ref this.ioNetworkDownloadedBytes, bytesRead);
this.ioDiskStopwatch.Restart();
await file.WriteAsync(buffer, 0, bytesRead).ConfigureAwait(false);
this.ioDiskStopwatch.Stop();
lock (this.ioDiskStatsLock)
{
this.ioDiskWriteTime += this.ioDiskStopwatch.Elapsed;
this.ioDiskWrittenBytes += bytesRead;
}
this.ioDiskStopwatch.Reset();
}
}
catch (OperationCanceledException ex)

View File

@ -1,4 +1,5 @@
using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Http;
@ -35,9 +36,17 @@ namespace BililiveRecorder.Core.Recording
protected bool started = false;
protected bool timeoutTriggered = false;
private readonly object fillerStatsLock = new object();
protected int fillerDownloadedBytes;
private DateTimeOffset fillerStatsLastTrigger;
private readonly object ioStatsLock = new();
protected int ioNetworkDownloadedBytes;
protected Stopwatch ioDiskStopwatch = new();
protected object ioDiskStatsLock = new();
protected TimeSpan ioDiskWriteTime;
protected int ioDiskWrittenBytes;
private DateTimeOffset ioDiskWarningTimeout;
private DateTimeOffset ioStatsLastTrigger;
private TimeSpan durationSinceNoDataReceived;
protected RecordTaskBase(IRoom room, ILogger logger, IApiClient apiClient, FileNameGenerator fileNameGenerator)
@ -48,20 +57,20 @@ namespace BililiveRecorder.Core.Recording
this.fileNameGenerator = fileNameGenerator ?? throw new ArgumentNullException(nameof(fileNameGenerator));
this.ct = this.cts.Token;
this.timer.Elapsed += this.Timer_Elapsed_TriggerNetworkStats;
this.timer.Elapsed += this.Timer_Elapsed_TriggerIOStats;
}
public Guid SessionId { get; } = Guid.NewGuid();
#region Events
public event EventHandler<NetworkingStatsEventArgs>? NetworkingStats;
public event EventHandler<IOStatsEventArgs>? IOStats;
public event EventHandler<RecordingStatsEventArgs>? RecordingStats;
public event EventHandler<RecordFileOpeningEventArgs>? RecordFileOpening;
public event EventHandler<RecordFileClosedEventArgs>? RecordFileClosed;
public event EventHandler? RecordSessionEnded;
protected void OnNetworkingStats(NetworkingStatsEventArgs e) => NetworkingStats?.Invoke(this, e);
protected void OnIOStats(IOStatsEventArgs e) => IOStats?.Invoke(this, e);
protected void OnRecordingStats(RecordingStatsEventArgs e) => RecordingStats?.Invoke(this, e);
protected void OnRecordFileOpening(RecordFileOpeningEventArgs e) => RecordFileOpening?.Invoke(this, e);
protected void OnRecordFileClosed(RecordFileClosedEventArgs e) => RecordFileClosed?.Invoke(this, e);
@ -98,7 +107,7 @@ namespace BililiveRecorder.Core.Recording
var stream = await this.GetStreamAsync(fullUrl: fullUrl, timeout: (int)this.room.RoomConfig.TimingStreamConnect).ConfigureAwait(false);
this.fillerStatsLastTrigger = DateTimeOffset.UtcNow;
this.ioStatsLastTrigger = DateTimeOffset.UtcNow;
this.durationSinceNoDataReceived = TimeSpan.Zero;
this.ct.Register(state => Task.Run(async () =>
@ -118,38 +127,61 @@ namespace BililiveRecorder.Core.Recording
protected abstract void StartRecordingLoop(Stream stream);
private void Timer_Elapsed_TriggerNetworkStats(object sender, ElapsedEventArgs e)
private void Timer_Elapsed_TriggerIOStats(object sender, ElapsedEventArgs e)
{
int bytes;
TimeSpan diff;
DateTimeOffset start, end;
int networkDownloadBytes, diskWriteBytes;
TimeSpan durationDiff, diskWriteTime;
DateTimeOffset startTime, endTime;
lock (this.fillerStatsLock)
lock (this.ioStatsLock) // 锁 timer elapsed 事件本身防止并行运行
{
bytes = Interlocked.Exchange(ref this.fillerDownloadedBytes, 0);
end = DateTimeOffset.UtcNow;
start = this.fillerStatsLastTrigger;
this.fillerStatsLastTrigger = end;
diff = end - start;
// networks
networkDownloadBytes = Interlocked.Exchange(ref this.ioNetworkDownloadedBytes, 0); // 锁网络统计
endTime = DateTimeOffset.UtcNow;
startTime = this.ioStatsLastTrigger;
this.ioStatsLastTrigger = endTime;
durationDiff = endTime - startTime;
this.durationSinceNoDataReceived = bytes > 0 ? TimeSpan.Zero : this.durationSinceNoDataReceived + diff;
this.durationSinceNoDataReceived = networkDownloadBytes > 0 ? TimeSpan.Zero : this.durationSinceNoDataReceived + durationDiff;
// disks
lock (this.ioDiskStatsLock) // 锁硬盘统计
{
diskWriteTime = this.ioDiskWriteTime;
diskWriteBytes = this.ioDiskWrittenBytes;
this.ioDiskWriteTime = TimeSpan.Zero;
this.ioDiskWrittenBytes = 0;
}
}
var mbps = bytes * (8d / 1024d / 1024d) / diff.TotalSeconds;
var netMbps = networkDownloadBytes * (8d / 1024d / 1024d) / durationDiff.TotalSeconds;
var diskMBps = diskWriteBytes / (1024d * 1024d) / diskWriteTime.TotalSeconds;
this.OnNetworkingStats(new NetworkingStatsEventArgs
this.OnIOStats(new IOStatsEventArgs
{
BytesDownloaded = bytes,
Duration = diff,
StartTime = start,
EndTime = end,
Mbps = mbps
NetworkBytesDownloaded = networkDownloadBytes,
Duration = durationDiff,
StartTime = startTime,
EndTime = endTime,
NetworkMbps = netMbps,
DiskBytesWritten = diskWriteBytes,
DiskWriteTime = diskWriteTime,
DiskMBps = diskMBps,
});
var now = DateTimeOffset.Now;
if (diskWriteBytes > 0 && this.ioDiskWarningTimeout < now && (diskWriteTime.TotalSeconds > 1d || diskMBps < 2d))
{
// 硬盘 IO 可能不能满足录播
this.ioDiskWarningTimeout = now + TimeSpan.FromMinutes(2); // 最多每 2 分钟提醒一次
this.logger.Warning("检测到硬盘写入速度较慢可能影响录播,请检查是否有其他软件或游戏正在使用硬盘");
}
if ((!this.timeoutTriggered) && (this.durationSinceNoDataReceived.TotalMilliseconds > this.room.RoomConfig.TimingWatchdogTimeout))
{
this.timeoutTriggered = true;
this.logger.Warning("直播服务器未断开连接但停止发送直播数据,将会主动断开连接");
this.logger.Warning("检测到录制卡住,可能是网络或硬盘原因,将会主动断开连接");
this.RequestStop();
}
}

View File

@ -129,7 +129,7 @@ namespace BililiveRecorder.Core.Recording
if (bytesRead == 0)
break;
writer.Advance(bytesRead);
Interlocked.Add(ref this.fillerDownloadedBytes, bytesRead);
Interlocked.Add(ref this.ioNetworkDownloadedBytes, bytesRead);
}
catch (Exception ex)
{
@ -171,7 +171,16 @@ namespace BililiveRecorder.Core.Recording
if (this.context.Comments.Count > 0)
this.logger.Debug("修复逻辑输出 {@Comments}", this.context.Comments);
await this.writer.WriteAsync(this.context).ConfigureAwait(false);
this.ioDiskStopwatch.Restart();
var bytesWritten = await this.writer.WriteAsync(this.context).ConfigureAwait(false);
this.ioDiskStopwatch.Stop();
lock (this.ioDiskStatsLock)
{
this.ioDiskWriteTime += this.ioDiskStopwatch.Elapsed;
this.ioDiskWrittenBytes += bytesWritten;
}
this.ioDiskStopwatch.Reset();
if (this.context.Actions.Any(x => x is PipelineDisconnectAction))
{

View File

@ -102,7 +102,7 @@ namespace BililiveRecorder.Core
public event EventHandler<RecordSessionEndedEventArgs>? RecordSessionEnded;
public event EventHandler<RecordFileOpeningEventArgs>? RecordFileOpening;
public event EventHandler<RecordFileClosedEventArgs>? RecordFileClosed;
public event EventHandler<NetworkingStatsEventArgs>? NetworkingStats;
public event EventHandler<IOStatsEventArgs>? IOStats;
public event EventHandler<RecordingStatsEventArgs>? RecordingStats;
public event PropertyChangedEventHandler? PropertyChanged;
@ -219,7 +219,7 @@ namespace BililiveRecorder.Core
return;
var task = this.recordTaskFactory.CreateRecordTask(this);
task.NetworkingStats += this.RecordTask_NetworkingStats;
task.IOStats += this.RecordTask_IOStats;
task.RecordingStats += this.RecordTask_RecordingStats;
task.RecordFileOpening += this.RecordTask_RecordFileOpening;
task.RecordFileClosed += this.RecordTask_RecordFileClosed;
@ -349,13 +349,13 @@ namespace BililiveRecorder.Core
#region Event Handlers
///
private void RecordTask_NetworkingStats(object sender, NetworkingStatsEventArgs e)
private void RecordTask_IOStats(object sender, IOStatsEventArgs e)
{
this.logger.Verbose("Networking stats: {@stats}", e);
this.logger.Verbose("IO stats: {@stats}", e);
this.Stats.NetworkMbps = e.Mbps;
this.Stats.NetworkMbps = e.NetworkMbps;
NetworkingStats?.Invoke(this, e);
IOStats?.Invoke(this, e);
}
///

View File

@ -12,6 +12,6 @@ namespace BililiveRecorder.Flv
event EventHandler<FileClosedEventArgs> FileClosed;
Task WriteAsync(FlvProcessingContext context);
Task<int> WriteAsync(FlvProcessingContext context);
}
}

View File

@ -25,6 +25,8 @@ namespace BililiveRecorder.Flv.Writer
private KeyframesScriptDataValue? keyframesScriptDataValue = null;
private double lastDuration;
private int bytesWrittenByCurrentWriteCall { get; set; }
public event EventHandler<FileClosedEventArgs>? FileClosed;
public Action<ScriptTagBody>? BeforeScriptTagWrite { get; set; }
@ -37,7 +39,7 @@ namespace BililiveRecorder.Flv.Writer
this.disableKeyframes = disableKeyframes;
}
public async Task WriteAsync(FlvProcessingContext context)
public async Task<int> WriteAsync(FlvProcessingContext context)
{
if (this.state == WriterState.Invalid)
throw new InvalidOperationException("FlvProcessingContextWriter is in a invalid state.");
@ -70,11 +72,16 @@ namespace BililiveRecorder.Flv.Writer
this.semaphoreSlim.Release();
}
var bytesWritten = this.bytesWrittenByCurrentWriteCall;
this.bytesWrittenByCurrentWriteCall = 0;
// Dispose tags
foreach (var action in context.Actions)
if (action is PipelineDataAction dataAction)
foreach (var tag in dataAction.Tags)
tag.BinaryData?.Dispose();
return bytesWritten;
}
#region Flv Writer Implementation
@ -214,6 +221,8 @@ namespace BililiveRecorder.Flv.Writer
await this.tagWriter.WriteTag(this.nextAudioHeaderTag).ConfigureAwait(false);
}
this.bytesWrittenByCurrentWriteCall += (int)this.tagWriter.FileSize;
this.state = WriterState.Writing;
}
@ -236,9 +245,13 @@ namespace BililiveRecorder.Flv.Writer
var duration = tags[tags.Count - 1].Timestamp / 1000d;
this.lastDuration = duration;
var beforeFileSize = this.tagWriter.FileSize;
foreach (var tag in tags)
await this.tagWriter.WriteTag(tag).ConfigureAwait(false);
this.bytesWrittenByCurrentWriteCall += (int)(this.tagWriter.FileSize - beforeFileSize);
await this.RewriteScriptTagImpl(duration, firstTag.IsKeyframeData(), firstTag.Timestamp, pos).ConfigureAwait(false);
}
@ -255,7 +268,11 @@ namespace BililiveRecorder.Flv.Writer
throw new InvalidOperationException($"Can't write data tag with current state ({this.state})");
}
var beforeFileSize = this.tagWriter.FileSize;
await this.tagWriter.WriteTag(endAction.Tag).ConfigureAwait(false);
this.bytesWrittenByCurrentWriteCall += (int)(this.tagWriter.FileSize - beforeFileSize);
}
#endregion