FLV: Add new rule & other changes

This commit is contained in:
Genteure 2021-04-20 20:41:26 +08:00
parent cf5dae1851
commit a0a3fd9044
13 changed files with 248 additions and 73 deletions

View File

@ -7,6 +7,7 @@ using BililiveRecorder.Core.Config;
using BililiveRecorder.Core.Config.V2;
using BililiveRecorder.Core.Event;
using BililiveRecorder.Core.SimpleWebhook;
using Serilog;
namespace BililiveRecorder.Core
{
@ -15,15 +16,17 @@ namespace BililiveRecorder.Core
private readonly object lockObject = new object();
private readonly ObservableCollection<IRoom> roomCollection;
private readonly IRoomFactory roomFactory;
private readonly ILogger logger;
private readonly BasicWebhookV1 basicWebhookV1;
private readonly BasicWebhookV2 basicWebhookV2;
private bool disposedValue;
public Recorder(IRoomFactory roomFactory, ConfigV2 config)
public Recorder(IRoomFactory roomFactory, ConfigV2 config, ILogger logger)
{
this.roomFactory = roomFactory ?? throw new ArgumentNullException(nameof(roomFactory));
this.Config = config ?? throw new ArgumentNullException(nameof(config));
this.logger = logger?.ForContext<Recorder>() ?? throw new ArgumentNullException(nameof(logger));
this.roomCollection = new ObservableCollection<IRoom>();
this.Rooms = new ReadOnlyObservableCollection<IRoom>(this.roomCollection);
@ -169,6 +172,7 @@ namespace BililiveRecorder.Core
if (disposing)
{
// dispose managed state (managed objects)
this.logger.Debug("Dispose called");
this.SaveConfig();
foreach (var room in this.roomCollection)
room.Dispose();

View File

@ -6,6 +6,7 @@ namespace BililiveRecorder.Flv.Pipeline
Logging,
Unrepairable,
TimestampJump,
TimestampOffset,
DecodingHeader,
RepeatingData,
}

View File

@ -26,10 +26,11 @@ namespace BililiveRecorder.Flv.Pipeline
builder
.Add<HandleEndTagRule>()
.Add<HandleDelayedAudioHeaderRule>()
// TODO .Add<CheckMissingKeyframeRule>()
.Add<UpdateDataTagOrderRule>()
.Add<CheckDiscontinuityRule>()
.Add<UpdateTimestampRule>()
// TODO .Add<CheckMissingKeyframeRule>()
// .Add<UpdateDataTagOrderRule>()
// .Add<CheckDiscontinuityRule>()
.Add<UpdateTimestampOffsetRule>()
.Add<UpdateTimestampJumpRule>()
.Add<HandleNewScriptRule>()
.Add<HandleNewHeaderRule>()
.Add<RemoveDuplicatedChunkRule>()

View File

@ -4,7 +4,7 @@ using System.Linq;
namespace BililiveRecorder.Flv.Pipeline.Rules
{
public class UpdateTimestampRule : ISimpleProcessingRule
public class UpdateTimestampJumpRule : ISimpleProcessingRule
{
private const string TS_STORE_KEY = "Timestamp_Store_Key";
@ -66,16 +66,17 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
private void SetDataTimestamp(IReadOnlyList<Tag> tags, TimestampStore ts, FlvProcessingContext context)
{
var diff = tags[0].Timestamp - ts.LastOriginal;
var currentTimestamp = tags[0].Timestamp;
var diff = currentTimestamp - ts.LastOriginal;
if (diff < 0)
{
context.AddComment(new ProcessingComment(CommentType.TimestampJump, "时间戳问题:变小, Diff: " + diff));
ts.CurrentOffset = tags[0].Timestamp - ts.NextTimestampTarget;
context.AddComment(new ProcessingComment(CommentType.TimestampJump, $"时间戳变小, curr: {currentTimestamp}, diff: {diff}"));
ts.CurrentOffset = currentTimestamp - ts.NextTimestampTarget;
}
else if (diff > JUMP_THRESHOLD)
{
context.AddComment(new ProcessingComment(CommentType.TimestampJump, "时间戳问题:间隔过大, Diff: " + diff));
ts.CurrentOffset = tags[0].Timestamp - ts.NextTimestampTarget;
context.AddComment(new ProcessingComment(CommentType.TimestampJump, $"时间戳间隔过大, curr: {currentTimestamp}, diff: {diff}"));
ts.CurrentOffset = currentTimestamp - ts.NextTimestampTarget;
}
ts.LastOriginal = tags.Last().Timestamp;

View File

@ -0,0 +1,67 @@
using System;
using System.Collections.Generic;
using System.Linq;
namespace BililiveRecorder.Flv.Pipeline.Rules
{
public class UpdateTimestampOffsetRule : ISimpleProcessingRule
{
private const int MAX_ALLOWED_DIFF = 1000 * 10; // 10 seconds
private static readonly ProcessingComment comment1 = new ProcessingComment(CommentType.Unrepairable, "GOP 内音频或视频时间戳不连续");
public void Run(FlvProcessingContext context, Action next)
{
context.PerActionRun(this.RunPerAction);
next();
}
private bool CheckIfNormal(IEnumerable<Tag> data) => !data.Any2((a, b) => a.Timestamp > b.Timestamp);
private IEnumerable<PipelineAction?> RunPerAction(FlvProcessingContext context, PipelineAction action)
{
if (action is PipelineDataAction data)
{
var isNormal = this.CheckIfNormal(data.Tags);
if (isNormal)
{
yield return data;
yield break;
}
// 这个问题可能不能稳定修复,如果是在录直播最好还是断开重连,获取正常的直播流
// TODO 确认修复效果
yield return PipelineDisconnectAction.Instance;
if (!(this.CheckIfNormal(data.Tags.Where(x => x.Type == TagType.Audio)) && this.CheckIfNormal(data.Tags.Where(x => x.Type == TagType.Video))))
{
context.AddComment(comment1);
yield break;
}
else
{
var audio = data.Tags.First(x => x.Type == TagType.Audio);
var video = data.Tags.First(x => x.Type == TagType.Video);
var diff = audio.Timestamp - video.Timestamp;
if (diff > 50)
{
context.AddComment(new ProcessingComment(CommentType.TimestampOffset, $"音视频时间戳偏移, A: {audio.Timestamp}, V: {video.Timestamp}, D: {diff}"));
foreach (var tag in data.Tags.Where(x => x.Type == TagType.Audio))
{
tag.Timestamp -= diff;
}
}
// 因为上面已经检查了音频或视频单独不存在时间戳跳变问题,所以可以进行排序
data.Tags = data.Tags.OrderBy(x => x.Timestamp).ToList();
yield return data;
}
}
else
yield return action;
}
}
}

View File

@ -3,7 +3,7 @@ using System.Collections.Generic;
using System.Threading.Tasks;
using BililiveRecorder.Flv.Amf;
namespace BililiveRecorder.Flv.RuleTests
namespace BililiveRecorder.Flv.Writer
{
public class FlvTagListWriter : IFlvTagWriter
{

View File

@ -34,6 +34,7 @@ namespace BililiveRecorder.ToolBox.Commands
public int IssueTypeOther { get; set; }
public int IssueTypeUnrepairable { get; set; }
public int IssueTypeTimestampJump { get; set; }
public int IssueTypeTimestampOffset { get; set; }
public int IssueTypeDecodingHeader { get; set; }
public int IssueTypeRepeatingData { get; set; }
}
@ -46,18 +47,17 @@ namespace BililiveRecorder.ToolBox.Commands
public async Task<CommandResponse<AnalyzeResponse>> Handle(AnalyzeRequest request, Func<double, Task>? progress)
{
FileStream? flvFileStream = null;
try
{
var memoryStreamProvider = new DefaultMemoryStreamProvider();
var tagWriter = new AnalyzeMockFlvTagWriter();
var comments = new List<ProcessingComment>();
var context = new FlvProcessingContext();
var session = new Dictionary<object, object?>();
// Input
string? inputPath;
IFlvTagReader tagReader;
FileStream? flvFileStream = null;
try
{
inputPath = Path.GetFullPath(request.Input);
@ -77,7 +77,7 @@ namespace BililiveRecorder.ToolBox.Commands
});
else
{
flvFileStream = File.Open(inputPath, FileMode.Open, FileAccess.Read, FileShare.Read);
flvFileStream = new FileStream(inputPath, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, FileOptions.Asynchronous | FileOptions.SequentialScan);
tagReader = new FlvTagPipeReader(PipeReader.Create(flvFileStream), memoryStreamProvider, skipData: false, logger: logger);
}
}
@ -91,37 +91,46 @@ namespace BililiveRecorder.ToolBox.Commands
};
}
// Output
var tagWriter = new AnalyzeMockFlvTagWriter();
// Pipeline
using var grouping = new TagGroupReader(tagReader);
using var writer = new FlvProcessingContextWriter(tagWriter: tagWriter, allowMissingHeader: true);
var pipeline = new ProcessingPipelineBuilder(new ServiceCollection().BuildServiceProvider()).AddDefault().AddRemoveFillerData().Build();
var count = 0;
while (true)
// Run
await Task.Run(async () =>
{
var group = await grouping.ReadGroupAsync(default).ConfigureAwait(false);
if (group is null)
break;
context.Reset(group, session);
pipeline(context);
if (context.Comments.Count > 0)
var count = 0;
while (true)
{
comments.AddRange(context.Comments);
logger.Debug("分析逻辑输出 {@Comments}", context.Comments);
var group = await grouping.ReadGroupAsync(default).ConfigureAwait(false);
if (group is null)
break;
context.Reset(group, session);
pipeline(context);
if (context.Comments.Count > 0)
{
comments.AddRange(context.Comments);
logger.Debug("分析逻辑输出 {@Comments}", context.Comments);
}
await writer.WriteAsync(context).ConfigureAwait(false);
foreach (var action in context.Actions)
if (action is PipelineDataAction dataAction)
foreach (var tag in dataAction.Tags)
tag.BinaryData?.Dispose();
if (count++ % 10 == 0 && flvFileStream is not null && progress is not null)
await progress((double)flvFileStream.Position / flvFileStream.Length);
}
}).ConfigureAwait(false);
await writer.WriteAsync(context).ConfigureAwait(false);
foreach (var action in context.Actions)
if (action is PipelineDataAction dataAction)
foreach (var tag in dataAction.Tags)
tag.BinaryData?.Dispose();
if (count++ % 10 == 0 && flvFileStream is not null && progress is not null)
await progress((double)flvFileStream.Position / flvFileStream.Length);
}
// Result
var response = await Task.Run(() =>
{
var countableComments = comments.Where(x => x.T != CommentType.Logging).ToArray();
@ -137,6 +146,7 @@ namespace BililiveRecorder.ToolBox.Commands
IssueTypeOther = countableComments.Count(x => x.T == CommentType.Other),
IssueTypeUnrepairable = countableComments.Count(x => x.T == CommentType.Unrepairable),
IssueTypeTimestampJump = countableComments.Count(x => x.T == CommentType.TimestampJump),
IssueTypeTimestampOffset = countableComments.Count(x => x.T == CommentType.TimestampOffset),
IssueTypeDecodingHeader = countableComments.Count(x => x.T == CommentType.DecodingHeader),
IssueTypeRepeatingData = countableComments.Count(x => x.T == CommentType.RepeatingData)
};
@ -175,6 +185,10 @@ namespace BililiveRecorder.ToolBox.Commands
ErrorMessage = ex.Message
};
}
finally
{
flvFileStream?.Dispose();
}
}
public void PrintResponse(AnalyzeResponse response)

View File

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.IO.Pipelines;
using System.Linq;
using System.Threading.Tasks;
@ -9,6 +10,7 @@ using BililiveRecorder.Flv.Grouping;
using BililiveRecorder.Flv.Parser;
using BililiveRecorder.Flv.Pipeline;
using BililiveRecorder.Flv.Writer;
using BililiveRecorder.Flv.Xml;
using Microsoft.Extensions.DependencyInjection;
using Serilog;
@ -35,6 +37,7 @@ namespace BililiveRecorder.ToolBox.Commands
public int IssueTypeOther { get; set; }
public int IssueTypeUnrepairable { get; set; }
public int IssueTypeTimestampJump { get; set; }
public int IssueTypeTimestampOffset { get; set; }
public int IssueTypeDecodingHeader { get; set; }
public int IssueTypeRepeatingData { get; set; }
}
@ -47,41 +50,79 @@ namespace BililiveRecorder.ToolBox.Commands
public async Task<CommandResponse<FixResponse>> Handle(FixRequest request, Func<double, Task>? progress)
{
FileStream? inputStream = null;
FileStream? flvFileStream = null;
try
{
var inputPath = Path.GetFullPath(request.Input);
var outputPaths = new List<string>();
var targetProvider = new AutoFixFlvWriterTargetProvider(request.OutputBase);
targetProvider.BeforeFileOpen += (sender, path) => outputPaths.Add(path);
var memoryStreamProvider = new DefaultMemoryStreamProvider();
var tagWriter = new FlvTagFileWriter(targetProvider, memoryStreamProvider, logger);
var comments = new List<ProcessingComment>();
var context = new FlvProcessingContext();
var session = new Dictionary<object, object?>();
// Input
string? inputPath;
IFlvTagReader tagReader;
var xmlMode = false;
try
{
try
inputPath = Path.GetFullPath(request.Input);
if (inputPath.EndsWith(".gz", StringComparison.OrdinalIgnoreCase))
{
inputStream = File.Open(inputPath, FileMode.Open, FileAccess.Read, FileShare.Read);
}
catch (Exception ex) when (ex is not FlvException)
{
return new CommandResponse<FixResponse>
xmlMode = true;
tagReader = await Task.Run(() =>
{
Status = ResponseStatus.InputIOError,
Exception = ex,
ErrorMessage = ex.Message
};
using var stream = new GZipStream(File.Open(inputPath, FileMode.Open, FileAccess.Read, FileShare.Read), CompressionMode.Decompress);
var xmlFlvFile = (XmlFlvFile)XmlFlvFile.Serializer.Deserialize(stream);
return new FlvTagListReader(xmlFlvFile.Tags);
});
}
else if (inputPath.EndsWith(".xml", StringComparison.OrdinalIgnoreCase))
{
xmlMode = true;
tagReader = await Task.Run(() =>
{
using var stream = File.Open(inputPath, FileMode.Open, FileAccess.Read, FileShare.Read);
var xmlFlvFile = (XmlFlvFile)XmlFlvFile.Serializer.Deserialize(stream);
return new FlvTagListReader(xmlFlvFile.Tags);
});
}
else
{
flvFileStream = new FileStream(inputPath, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, FileOptions.Asynchronous | FileOptions.SequentialScan);
tagReader = new FlvTagPipeReader(PipeReader.Create(flvFileStream), memoryStreamProvider, skipData: false, logger: logger);
}
}
catch (Exception ex) when (ex is not FlvException)
{
return new CommandResponse<FixResponse>
{
Status = ResponseStatus.InputIOError,
Exception = ex,
ErrorMessage = ex.Message
};
}
using var grouping = new TagGroupReader(new FlvTagPipeReader(PipeReader.Create(inputStream), memoryStreamProvider, skipData: false, logger: logger));
using var writer = new FlvProcessingContextWriter(tagWriter: tagWriter, allowMissingHeader: true);
var pipeline = new ProcessingPipelineBuilder(new ServiceCollection().BuildServiceProvider()).AddDefault().AddRemoveFillerData().Build();
// Output
var outputPaths = new List<string>();
IFlvTagWriter tagWriter;
if (xmlMode)
{
tagWriter = new FlvTagListWriter();
}
else
{
var targetProvider = new AutoFixFlvWriterTargetProvider(request.OutputBase);
targetProvider.BeforeFileOpen += (sender, path) => outputPaths.Add(path);
tagWriter = new FlvTagFileWriter(targetProvider, memoryStreamProvider, logger);
}
// Pipeline
using var grouping = new TagGroupReader(tagReader);
using var writer = new FlvProcessingContextWriter(tagWriter: tagWriter, allowMissingHeader: true);
var pipeline = new ProcessingPipelineBuilder(new ServiceCollection().BuildServiceProvider()).AddDefault().AddRemoveFillerData().Build();
// Run
await Task.Run(async () =>
{
var count = 0;
while (true)
{
@ -105,11 +146,41 @@ namespace BililiveRecorder.ToolBox.Commands
foreach (var tag in dataAction.Tags)
tag.BinaryData?.Dispose();
if (count++ % 10 == 0 && progress is not null)
await progress((double)inputStream.Position / inputStream.Length);
if (count++ % 10 == 0 && progress is not null && flvFileStream is not null)
await progress((double)flvFileStream.Position / flvFileStream.Length);
}
}).ConfigureAwait(false);
// Post Run
if (xmlMode)
{
await Task.Run(() =>
{
var w = (FlvTagListWriter)tagWriter;
for (var i = 0; i < w.Files.Count; i++)
{
var path = Path.ChangeExtension(request.OutputBase, $"fix_p{i + 1}.brec.xml");
outputPaths.Add(path);
using var file = new StreamWriter(File.Create(path));
XmlFlvFile.Serializer.Serialize(file, new XmlFlvFile { Tags = w.Files[i] });
}
if (w.AlternativeHeaders.Count > 0)
{
var path = Path.ChangeExtension(request.OutputBase, $"headers.txt");
using var writer = new StreamWriter(File.Open(path, FileMode.Append, FileAccess.Write, FileShare.None));
foreach (var tag in w.AlternativeHeaders)
{
writer.WriteLine();
writer.WriteLine(tag.ToString());
writer.WriteLine(tag.BinaryDataForSerializationUseOnly);
}
}
});
}
// Result
var response = await Task.Run(() =>
{
var countableComments = comments.Where(x => x.T != CommentType.Logging).ToArray();
@ -125,6 +196,7 @@ namespace BililiveRecorder.ToolBox.Commands
IssueTypeOther = countableComments.Count(x => x.T == CommentType.Other),
IssueTypeUnrepairable = countableComments.Count(x => x.T == CommentType.Unrepairable),
IssueTypeTimestampJump = countableComments.Count(x => x.T == CommentType.TimestampJump),
IssueTypeTimestampOffset = countableComments.Count(x => x.T == CommentType.TimestampOffset),
IssueTypeDecodingHeader = countableComments.Count(x => x.T == CommentType.DecodingHeader),
IssueTypeRepeatingData = countableComments.Count(x => x.T == CommentType.RepeatingData)
};
@ -161,7 +233,7 @@ namespace BililiveRecorder.ToolBox.Commands
}
finally
{
inputStream?.Dispose();
flvFileStream?.Dispose();
}
}
@ -218,7 +290,7 @@ namespace BililiveRecorder.ToolBox.Commands
{
var i = this.fileIndex++;
var path = Path.ChangeExtension(this.pathTemplate, $"fix_p{i}.flv");
var fileStream = File.Create(path);
var fileStream = File.Open(path, FileMode.CreateNew, FileAccess.Write, FileShare.Read);
BeforeFileOpen?.Invoke(this, path);
return (fileStream, null!);
}

View File

@ -5,6 +5,7 @@ using System.Windows;
using BililiveRecorder.WPF.Controls;
using Hardcodet.Wpf.TaskbarNotification;
using ModernWpf.Controls;
using Serilog;
using WPFLocalizeExtension.Engine;
using WPFLocalizeExtension.Extensions;
@ -111,6 +112,7 @@ namespace BililiveRecorder.WPF
else
{
SingleInstance.NotificationReceived -= this.SingleInstance_NotificationReceived;
Log.Logger.ForContext<NewMainWindow>().Debug("Window Closing");
NativeBeforeWindowClose?.Invoke(this, EventArgs.Empty);
return;
}

View File

@ -88,7 +88,8 @@
</StackPanel>
<StackPanel Grid.Row="3" HorizontalAlignment="Center" Margin="10">
<TextBlock Text="{Binding OutputFileCount,StringFormat=修复将会输出 {0} 个文件}" Margin="0,0,0,5"/>
<TextBlock Text="{Binding IssueTypeTimestampJump,StringFormat=时间戳问题 {0} 处}"/>
<TextBlock Text="{Binding IssueTypeTimestampOffset,StringFormat=时间戳错位问题 {0} 处}"/>
<TextBlock Text="{Binding IssueTypeTimestampJump,StringFormat=时间戳跳变问题 {0} 处}"/>
<TextBlock Text="{Binding IssueTypeDecodingHeader,StringFormat=分辨率、解码问题 {0} 处}"/>
<TextBlock Text="{Binding IssueTypeRepeatingData,StringFormat=重复片段 {0} 处}"/>
<TextBlock Text="{Binding IssueTypeOther,StringFormat=其他问题 {0} 处}"/>

View File

@ -5,6 +5,7 @@ using System.Linq;
using System.Threading.Tasks;
using BililiveRecorder.Flv.Grouping;
using BililiveRecorder.Flv.Pipeline;
using BililiveRecorder.Flv.Writer;
using BililiveRecorder.Flv.Xml;
using Newtonsoft.Json;
using Xunit;

View File

@ -1,7 +1,9 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using BililiveRecorder.Flv.Grouping;
using BililiveRecorder.Flv.Pipeline;
using BililiveRecorder.Flv.Writer;
using BililiveRecorder.Flv.Xml;
using Xunit;

View File

@ -16,14 +16,23 @@ namespace BililiveRecorder.Flv.RuleTests.Integrated
{
protected XmlFlvFile LoadFile(string path)
{
if (Path.GetExtension(path) == ".gz")
return (XmlFlvFile)XmlFlvFile.Serializer.Deserialize(new GZipStream(File.Open(path, FileMode.Open, FileAccess.Read, FileShare.Read), CompressionMode.Decompress));
Stream? stream = null;
try
{
var gz_path = path + ".gz";
if (Path.GetExtension(path) == ".gz")
stream = new GZipStream(File.Open(path, FileMode.Open, FileAccess.Read, FileShare.Read), CompressionMode.Decompress);
else if (File.Exists(gz_path))
stream = new GZipStream(File.Open(gz_path, FileMode.Open, FileAccess.Read, FileShare.Read), CompressionMode.Decompress);
else
stream = File.Open(path, FileMode.Open, FileAccess.Read, FileShare.Read);
var gz = path + ".gz";
if (File.Exists(gz))
return (XmlFlvFile)XmlFlvFile.Serializer.Deserialize(new GZipStream(File.Open(gz, FileMode.Open, FileAccess.Read, FileShare.Read), CompressionMode.Decompress));
return (XmlFlvFile)XmlFlvFile.Serializer.Deserialize(File.Open(path, FileMode.Open, FileAccess.Read, FileShare.Read));
return (XmlFlvFile)XmlFlvFile.Serializer.Deserialize(stream);
}
finally
{
stream?.Dispose();
}
}
protected ProcessingDelegate BuildPipeline() =>