using System; using System.Collections.Generic; using System.ComponentModel; using System.IO; using System.Net.Http; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using BililiveRecorder.Core.Api; using BililiveRecorder.Core.Config.V3; using BililiveRecorder.Core.Danmaku; using BililiveRecorder.Core.Event; using BililiveRecorder.Core.Recording; using Microsoft.Extensions.DependencyInjection; using Newtonsoft.Json.Linq; using Polly; using Serilog; using Serilog.Events; using Timer = System.Timers.Timer; namespace BililiveRecorder.Core { internal class Room : IRoom { private readonly object recordStartLock = new object(); private readonly SemaphoreSlim recordRetryDelaySemaphoreSlim = new SemaphoreSlim(1, 1); private readonly Timer timer; private readonly IServiceScope scope; private readonly ILogger loggerWithoutContext; private readonly IDanmakuClient danmakuClient; private readonly IApiClient apiClient; private readonly IBasicDanmakuWriter basicDanmakuWriter; private readonly IRecordTaskFactory recordTaskFactory; private readonly CancellationTokenSource cts; private readonly CancellationToken ct; private ILogger logger; private bool disposedValue; private int shortId; private string name = string.Empty; private string title = string.Empty; private string areaNameParent = string.Empty; private string areaNameChild = string.Empty; private bool danmakuConnected; private bool streaming; private bool autoRecordForThisSession = true; private IRecordTask? recordTask; private DateTimeOffset recordTaskStartTime; private DateTimeOffset danmakuClientConnectTime; private static readonly TimeSpan danmakuClientReconnectNoDelay = TimeSpan.FromMinutes(1); public Room(IServiceScope scope, RoomConfig roomConfig, int initDelayFactor, ILogger logger, IDanmakuClient danmakuClient, IApiClient apiClient, IBasicDanmakuWriter basicDanmakuWriter, IRecordTaskFactory recordTaskFactory) { this.scope = scope ?? throw new ArgumentNullException(nameof(scope)); this.RoomConfig = roomConfig ?? throw new ArgumentNullException(nameof(roomConfig)); this.loggerWithoutContext = logger?.ForContext() ?? throw new ArgumentNullException(nameof(logger)); this.logger = this.loggerWithoutContext.ForContext(LoggingContext.RoomId, this.RoomConfig.RoomId); this.danmakuClient = danmakuClient ?? throw new ArgumentNullException(nameof(danmakuClient)); this.apiClient = apiClient ?? throw new ArgumentNullException(nameof(apiClient)); this.basicDanmakuWriter = basicDanmakuWriter ?? throw new ArgumentNullException(nameof(basicDanmakuWriter)); this.recordTaskFactory = recordTaskFactory ?? throw new ArgumentNullException(nameof(recordTaskFactory)); this.timer = new Timer(this.RoomConfig.TimingCheckInterval * 1000d); this.cts = new CancellationTokenSource(); this.ct = this.cts.Token; this.PropertyChanged += this.Room_PropertyChanged; this.RoomConfig.PropertyChanged += this.RoomConfig_PropertyChanged; this.timer.Elapsed += this.Timer_Elapsed; this.danmakuClient.StatusChanged += this.DanmakuClient_StatusChanged; this.danmakuClient.DanmakuReceived += this.DanmakuClient_DanmakuReceived; _ = Task.Run(async () => { await Task.Delay(1500 + (initDelayFactor * 500)); this.timer.Start(); await this.RefreshRoomInfoAsync(); }); } public int ShortId { get => this.shortId; private set => this.SetField(ref this.shortId, value); } public string Name { get => this.name; private set => this.SetField(ref this.name, value); } public string Title { get => this.title; private set => this.SetField(ref this.title, value); } public string AreaNameParent { get => this.areaNameParent; private set => this.SetField(ref this.areaNameParent, value); } public string AreaNameChild { get => this.areaNameChild; private set => this.SetField(ref this.areaNameChild, value); } public JObject? RawBilibiliApiJsonData { get; private set; } public bool Streaming { get => this.streaming; private set => this.SetField(ref this.streaming, value); } public bool AutoRecordForThisSession { get => this.autoRecordForThisSession; private set => this.SetField(ref this.autoRecordForThisSession, value); } public bool DanmakuConnected { get => this.danmakuConnected; private set => this.SetField(ref this.danmakuConnected, value); } public bool Recording => this.recordTask != null; public RoomConfig RoomConfig { get; } public RoomStats Stats { get; } = new RoomStats(); public Guid ObjectId { get; } = Guid.NewGuid(); public event EventHandler? RecordSessionStarted; public event EventHandler? RecordSessionEnded; public event EventHandler? RecordFileOpening; public event EventHandler? RecordFileClosed; public event EventHandler? IOStats; public event EventHandler? RecordingStats; public event PropertyChangedEventHandler? PropertyChanged; public void SplitOutput() { if (this.disposedValue) return; lock (this.recordStartLock) { this.recordTask?.SplitOutput(); } } public void StartRecord() { if (this.disposedValue) return; lock (this.recordStartLock) { this.AutoRecordForThisSession = true; _ = Task.Run(() => { try { // 手动触发录制,启动录制前再刷新一次房间信息 this.CreateAndStartNewRecordTask(skipFetchRoomInfo: false); } catch (Exception ex) { this.logger.Write(ex is ExecutionRejectedException ? LogEventLevel.Verbose : LogEventLevel.Warning, ex, "尝试开始录制时出错"); } }); } } public void StopRecord() { if (this.disposedValue) return; lock (this.recordStartLock) { this.AutoRecordForThisSession = false; if (this.recordTask == null) return; this.recordTask.RequestStop(); } } public async Task RefreshRoomInfoAsync() { if (this.disposedValue) return; try { // 如果直播状态从 false 改成 true,Room_PropertyChanged 会触发录制 await this.FetchRoomInfoAsync().ConfigureAwait(false); this.StartDamakuConnection(delay: false); } catch (Exception ex) { this.logger.Write(ex is ExecutionRejectedException ? LogEventLevel.Verbose : LogEventLevel.Warning, ex, "刷新房间信息时出错"); } } #region Recording /// private async Task FetchRoomInfoAsync() { if (this.disposedValue) return; var room = (await this.apiClient.GetRoomInfoAsync(this.RoomConfig.RoomId).ConfigureAwait(false)).Data; if (room != null) { this.RoomConfig.RoomId = room.Room.RoomId; this.ShortId = room.Room.ShortId; this.Title = room.Room.Title; this.AreaNameParent = room.Room.ParentAreaName; this.AreaNameChild = room.Room.AreaName; this.Streaming = room.Room.LiveStatus == 1; this.Name = room.User.BaseInfo.Name; this.RawBilibiliApiJsonData = room.RawBilibiliApiJsonData; } } /// private void CreateAndStartNewRecordTask(bool skipFetchRoomInfo = false) { lock (this.recordStartLock) { if (this.disposedValue) return; if (!this.Streaming) return; if (this.recordTask != null) return; var task = this.recordTaskFactory.CreateRecordTask(this); task.IOStats += this.RecordTask_IOStats; task.RecordingStats += this.RecordTask_RecordingStats; task.RecordFileOpening += this.RecordTask_RecordFileOpening; task.RecordFileClosed += this.RecordTask_RecordFileClosed; task.RecordSessionEnded += this.RecordTask_RecordSessionEnded; this.recordTask = task; this.recordTaskStartTime = DateTimeOffset.UtcNow; this.Stats.Reset(); this.OnPropertyChanged(nameof(this.Recording)); _ = Task.Run(async () => { try { if (!skipFetchRoomInfo) await this.FetchRoomInfoAsync(); await this.recordTask.StartAsync(); } catch (NoMatchingQnValueException) { this.recordTask = null; this.OnPropertyChanged(nameof(this.Recording)); // 无匹配的画质,重试录制之前等待更长时间 _ = Task.Run(() => this.RestartAfterRecordTaskFailedAsync(RestartRecordingReason.NoMatchingQnValue)); return; } catch (Exception ex) { this.logger.Write(ex is ExecutionRejectedException ? LogEventLevel.Verbose : LogEventLevel.Warning, ex, "启动录制出错"); this.recordTask = null; this.OnPropertyChanged(nameof(this.Recording)); // 请求直播流出错时的重试逻辑 _ = Task.Run(() => this.RestartAfterRecordTaskFailedAsync(RestartRecordingReason.GenericRetry)); return; } RecordSessionStarted?.Invoke(this, new RecordSessionStartedEventArgs(this) { SessionId = this.recordTask.SessionId }); }); } } /// private async Task RestartAfterRecordTaskFailedAsync(RestartRecordingReason restartRecordingReason) { if (this.disposedValue) return; if (!this.Streaming || !this.AutoRecordForThisSession) return; try { if (!await this.recordRetryDelaySemaphoreSlim.WaitAsync(0).ConfigureAwait(false)) return; try { var delay = restartRecordingReason switch { RestartRecordingReason.GenericRetry => this.RoomConfig.TimingStreamRetry, RestartRecordingReason.NoMatchingQnValue => this.RoomConfig.TimingStreamRetryNoQn * 1000, _ => throw new InvalidOperationException() }; await Task.Delay((int)delay, this.ct).ConfigureAwait(false); } catch (TaskCanceledException) { // 房间已经被删除 return; } finally { this.recordRetryDelaySemaphoreSlim.Release(); } // 如果状态是非直播中,跳过重试尝试。当状态切换到直播中时会开始新的录制任务。 if (!this.Streaming || !this.AutoRecordForThisSession) return; // 启动录制时更新房间信息 if (this.Streaming && this.AutoRecordForThisSession) this.CreateAndStartNewRecordTask(skipFetchRoomInfo: false); } catch (Exception ex) { this.logger.Write(ex is ExecutionRejectedException ? LogEventLevel.Verbose : LogEventLevel.Warning, ex, "重试开始录制时出错"); _ = Task.Run(() => this.RestartAfterRecordTaskFailedAsync(restartRecordingReason)); } } /// private void StartDamakuConnection(bool delay = true) => Task.Run(async () => { if (this.disposedValue) return; try { if (delay) { try { await Task.Delay((int)this.RoomConfig.TimingDanmakuRetry, this.ct).ConfigureAwait(false); } catch (TaskCanceledException) { // 房间已被删除 return; } } await this.danmakuClient.ConnectAsync(this.RoomConfig.RoomId, this.RoomConfig.DanmakuTransport, this.ct).ConfigureAwait(false); } catch (Exception ex) { this.logger.Write(ex is ExecutionRejectedException ? LogEventLevel.Verbose : LogEventLevel.Warning, ex, "连接弹幕服务器时出错"); if (!this.ct.IsCancellationRequested) this.StartDamakuConnection(delay: true); } }); #endregion #region Event Handlers /// private void RecordTask_IOStats(object sender, IOStatsEventArgs e) { this.logger.Verbose("IO stats: {@stats}", e); this.Stats.StreamHost = e.StreamHost; this.Stats.StartTime = e.StartTime; this.Stats.EndTime = e.EndTime; this.Stats.Duration = e.Duration; this.Stats.NetworkBytesDownloaded = e.NetworkBytesDownloaded; this.Stats.NetworkMbps = e.NetworkMbps; this.Stats.DiskWriteDuration = e.DiskWriteDuration; this.Stats.DiskBytesWritten = e.DiskBytesWritten; this.Stats.DiskMBps = e.DiskMBps; IOStats?.Invoke(this, e); } /// private void RecordTask_RecordingStats(object sender, RecordingStatsEventArgs e) { this.logger.Verbose("Recording stats: {@stats}", e); this.Stats.SessionDuration = TimeSpan.FromMilliseconds(e.SessionDuration); this.Stats.TotalInputBytes = e.TotalInputBytes; this.Stats.TotalOutputBytes = e.TotalOutputBytes; this.Stats.CurrentFileSize = e.CurrentFileSize; this.Stats.SessionMaxTimestamp = TimeSpan.FromMilliseconds(e.SessionMaxTimestamp); this.Stats.FileMaxTimestamp = TimeSpan.FromMilliseconds(e.FileMaxTimestamp); this.Stats.AddedDuration = e.AddedDuration; this.Stats.PassedTime = e.PassedTime; this.Stats.DurationRatio = e.DurationRatio; this.Stats.InputVideoBytes = e.InputVideoBytes; this.Stats.InputAudioBytes = e.InputAudioBytes; this.Stats.OutputVideoFrames = e.OutputVideoFrames; this.Stats.OutputAudioFrames = e.OutputAudioFrames; this.Stats.OutputVideoBytes = e.OutputVideoBytes; this.Stats.OutputAudioBytes = e.OutputAudioBytes; this.Stats.TotalInputVideoBytes = e.TotalInputVideoBytes; this.Stats.TotalInputAudioBytes = e.TotalInputAudioBytes; this.Stats.TotalOutputVideoFrames = e.TotalOutputVideoFrames; this.Stats.TotalOutputAudioFrames = e.TotalOutputAudioFrames; this.Stats.TotalOutputVideoBytes = e.TotalOutputVideoBytes; this.Stats.TotalOutputAudioBytes = e.TotalOutputAudioBytes; RecordingStats?.Invoke(this, e); } /// private void RecordTask_RecordFileClosed(object sender, RecordFileClosedEventArgs e) { this.basicDanmakuWriter.Disable(); RecordFileClosed?.Invoke(this, e); } /// private void RecordTask_RecordFileOpening(object sender, RecordFileOpeningEventArgs e) { if (this.RoomConfig.RecordDanmaku) this.basicDanmakuWriter.EnableWithPath(Path.ChangeExtension(e.FullPath, "xml"), this); else this.basicDanmakuWriter.Disable(); if (this.RoomConfig.SaveStreamCover) { // 保存直播间封面 _ = Task.Run(async () => { try { var coverUrl = this.RawBilibiliApiJsonData?["room_info"]?["cover"]?.ToObject(); if (string.IsNullOrWhiteSpace(coverUrl)) { this.logger.Information("没有直播间封面信息"); return; } var targetPath = Path.ChangeExtension(e.FullPath, "cover" + Path.GetExtension(coverUrl)); using var http = new HttpClient(); http.DefaultRequestHeaders.UserAgent.Clear(); var stream = await http.GetStreamAsync(coverUrl).ConfigureAwait(false); using var file = new FileStream(targetPath, FileMode.Create, FileAccess.Write, FileShare.None); await stream.CopyToAsync(file).ConfigureAwait(false); this.logger.Debug("直播间封面已成功从 {CoverUrl} 保存到 {FilePath}", coverUrl, targetPath); } catch (Exception ex) { this.logger.Warning(ex, "保存直播间封面时出错"); } }); } RecordFileOpening?.Invoke(this, e); } /// private void RecordTask_RecordSessionEnded(object sender, EventArgs e) { Guid id; lock (this.recordStartLock) { id = this.recordTask?.SessionId ?? default; this.recordTask = null; _ = Task.Run(async () => { await Task.Yield(); // 录制结束退出后的重试逻辑 // 比 RestartAfterRecordTaskFailedAsync 少了等待时间 // 如果状态是非直播中,跳过重试尝试。当状态切换到直播中时会开始新的录制任务。 if (!this.Streaming || !this.AutoRecordForThisSession) return; try { // 开始录制前刷新房间信息 if (this.Streaming && this.AutoRecordForThisSession) this.CreateAndStartNewRecordTask(skipFetchRoomInfo: false); } catch (Exception ex) { this.logger.Write(LogEventLevel.Warning, ex, "重试开始录制时出错"); _ = Task.Run(() => this.RestartAfterRecordTaskFailedAsync(RestartRecordingReason.GenericRetry)); } }); } this.basicDanmakuWriter.Disable(); this.OnPropertyChanged(nameof(this.Recording)); this.Stats.Reset(); RecordSessionEnded?.Invoke(this, new RecordSessionEndedEventArgs(this) { SessionId = id }); } private void DanmakuClient_DanmakuReceived(object sender, Api.Danmaku.DanmakuReceivedEventArgs e) { var d = e.Danmaku; switch (d.MsgType) { case Api.Danmaku.DanmakuMsgType.LiveStart: this.Streaming = true; break; case Api.Danmaku.DanmakuMsgType.LiveEnd: this.Streaming = false; break; case Api.Danmaku.DanmakuMsgType.RoomChange: this.Title = d.Title ?? this.Title; this.AreaNameParent = d.ParentAreaName ?? this.AreaNameParent; this.AreaNameChild = d.AreaName ?? this.AreaNameChild; break; default: break; } _ = Task.Run(async () => await this.basicDanmakuWriter.WriteAsync(d)); } private void DanmakuClient_StatusChanged(object sender, Api.Danmaku.StatusChangedEventArgs e) { if (e.Connected) { this.DanmakuConnected = true; this.danmakuClientConnectTime = DateTimeOffset.UtcNow; this.logger.Information("弹幕服务器已连接"); } else { this.DanmakuConnected = false; this.logger.Information("与弹幕服务器的连接被断开"); // 如果连接弹幕服务器的时间在至少 1 分钟之前,重连时不需要等待 // 针对偶尔的网络波动的优化,如果偶尔断开了尽快重连,减少漏录的弹幕量 this.StartDamakuConnection(delay: !((DateTimeOffset.UtcNow - this.danmakuClientConnectTime) > danmakuClientReconnectNoDelay)); this.danmakuClientConnectTime = DateTimeOffset.MaxValue; } } private void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) { this.StartDamakuConnection(delay: false); if (this.RoomConfig.AutoRecord) { _ = Task.Run(async () => { try { // 定时主动检查不需要错误重试 await this.FetchRoomInfoAsync().ConfigureAwait(false); // 刚更新了房间信息不需要再获取一次 if (this.Streaming && this.AutoRecordForThisSession && this.RoomConfig.AutoRecord) this.CreateAndStartNewRecordTask(skipFetchRoomInfo: true); } catch (Exception) { } }); } } private void Room_PropertyChanged(object sender, PropertyChangedEventArgs e) { switch (e.PropertyName) { case nameof(this.Streaming): if (this.Streaming) { // 如果开播状态是通过广播消息获取的,本地的直播间信息就不是最新的,需要重新获取。 if (this.AutoRecordForThisSession && this.RoomConfig.AutoRecord) this.CreateAndStartNewRecordTask(skipFetchRoomInfo: false); } else { this.AutoRecordForThisSession = true; } break; default: break; } } private void RoomConfig_PropertyChanged(object sender, PropertyChangedEventArgs e) { switch (e.PropertyName) { case nameof(this.RoomConfig.RoomId): this.logger = this.loggerWithoutContext.ForContext(LoggingContext.RoomId, this.RoomConfig.RoomId); break; case nameof(this.RoomConfig.TimingCheckInterval): this.timer.Interval = this.RoomConfig.TimingCheckInterval * 1000d; break; case nameof(this.RoomConfig.AutoRecord): if (this.RoomConfig.AutoRecord) { this.AutoRecordForThisSession = true; // 启动录制时更新一次房间信息 if (this.Streaming && this.AutoRecordForThisSession) this.CreateAndStartNewRecordTask(skipFetchRoomInfo: false); } break; default: break; } } #endregion #region PropertyChanged protected void SetField(ref T location, T value, [CallerMemberName] string? propertyName = null) { if (EqualityComparer.Default.Equals(location, value)) return; location = value; if (propertyName != null) this.OnPropertyChanged(propertyName); } protected void OnPropertyChanged(string propertyName) => PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(propertyName: propertyName)); #endregion #region Dispose protected virtual void Dispose(bool disposing) { if (!this.disposedValue) { this.disposedValue = true; if (disposing) { // dispose managed state (managed objects) this.cts.Cancel(); this.cts.Dispose(); this.recordTask?.RequestStop(); this.basicDanmakuWriter.Disable(); this.scope.Dispose(); } // free unmanaged resources (unmanaged objects) and override finalizer // set large fields to null } } // override finalizer only if 'Dispose(bool disposing)' has code to free unmanaged resources // ~Room() // { // // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method // Dispose(disposing: false); // } public void Dispose() { // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method this.Dispose(disposing: true); GC.SuppressFinalize(this); } #endregion } }