FLV: 重构处理规则,去除 async

This commit is contained in:
Genteure 2021-03-09 00:50:13 +08:00
parent 3580313bd8
commit 44d3d672c9
22 changed files with 243 additions and 187 deletions

View File

@ -52,7 +52,7 @@ namespace BililiveRecorder.Core.ProcessingRules
var groups = new List<List<PipelineDataAction>?>();
{
List<PipelineDataAction>? curr = null;
foreach (var action in context.Output)
foreach (var action in context.Actions)
{
if (action is PipelineDataAction dataAction)
{

View File

@ -266,14 +266,14 @@ namespace BililiveRecorder.Core.Recording
this.context.Reset(group, this.session);
await this.pipeline(this.context).ConfigureAwait(false);
this.pipeline(this.context);
if (this.context.Comments.Count > 0)
this.logger.Debug("修复逻辑输出 {@Comments}", this.context.Comments);
await this.writer.WriteAsync(this.context).ConfigureAwait(false);
if (this.context.Output.Any(x => x is PipelineDisconnectAction))
if (this.context.Actions.Any(x => x is PipelineDisconnectAction))
{
this.logger.Information("根据修复逻辑的要求结束录制");
break;

View File

@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Linq;
namespace BililiveRecorder.Flv.Pipeline
{
@ -17,9 +18,10 @@ namespace BililiveRecorder.Flv.Pipeline
}
#pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.
[Obsolete("obsolete", true)]
public PipelineAction OriginalInput { get; private set; }
public List<PipelineAction> Output { get; set; }
public List<PipelineAction> Actions { get; set; }
public IDictionary<object, object?> SessionItems { get; private set; }
@ -27,11 +29,16 @@ namespace BililiveRecorder.Flv.Pipeline
public List<ProcessingComment> Comments { get; private set; }
public void Reset(PipelineAction data, IDictionary<object, object?> sessionItems)
public void Reset(PipelineAction action, IDictionary<object, object?> sessionItems)
{
var actions = new List<PipelineAction> { action ?? throw new ArgumentNullException(nameof(action)) };
this.Reset(actions, sessionItems);
}
public void Reset(List<PipelineAction> actions, IDictionary<object, object?> sessionItems)
{
this.OriginalInput = data ?? throw new ArgumentNullException(nameof(data));
this.SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
this.Output = new List<PipelineAction> { this.OriginalInput.Clone() };
this.Actions = actions;
this.LocalItems = new Dictionary<object, object?>();
this.Comments = new List<ProcessingComment>();
}
@ -45,18 +52,39 @@ namespace BililiveRecorder.Flv.Pipeline
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void AddNewFileAtStart(this FlvProcessingContext context)
=> context.Output.Insert(0, PipelineNewFileAction.Instance);
=> context.Actions.Insert(0, PipelineNewFileAction.Instance);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void AddNewFileAtEnd(this FlvProcessingContext context)
=> context.Output.Add(PipelineNewFileAction.Instance);
=> context.Actions.Add(PipelineNewFileAction.Instance);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void AddDisconnectAtStart(this FlvProcessingContext context)
=> context.Output.Insert(0, PipelineDisconnectAction.Instance);
=> context.Actions.Insert(0, PipelineDisconnectAction.Instance);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void ClearOutput(this FlvProcessingContext context)
=> context.Output.Clear();
=> context.Actions.Clear();
public static bool PerActionRun(this FlvProcessingContext context, Func<FlvProcessingContext, PipelineAction, IEnumerable<PipelineAction?>> func)
{
var success = true;
var actions = context.Actions;
var result = new List<PipelineAction>();
foreach (var output in actions.SelectMany(action => func(context, action)))
{
if (output is null)
{
success = false;
goto exit;
}
result.Add(output);
}
exit:
context.Actions = result;
return success;
}
}
}

View File

@ -1,9 +1,7 @@
using System.Threading.Tasks;
namespace BililiveRecorder.Flv.Pipeline
{
public interface IFullProcessingRule : IProcessingRule
{
Task RunAsync(FlvProcessingContext context, ProcessingDelegate next);
void Run(FlvProcessingContext context, ProcessingDelegate next);
}
}

View File

@ -9,16 +9,16 @@ namespace BililiveRecorder.Flv.Pipeline
public static IProcessingPipelineBuilder Add<T>(this IProcessingPipelineBuilder builder) where T : IProcessingRule =>
builder.Add(next => (ActivatorUtilities.GetServiceOrCreateInstance<T>(builder.ServiceProvider)) switch
{
ISimpleProcessingRule simple => context => simple.RunAsync(context, () => next(context)),
IFullProcessingRule full => context => full.RunAsync(context, next),
ISimpleProcessingRule simple => context => simple.Run(context, () => next(context)),
IFullProcessingRule full => context => full.Run(context, next),
_ => throw new ArgumentException($"Type ({typeof(T).FullName}) does not ISimpleProcessingRule or IFullProcessingRule")
});
public static IProcessingPipelineBuilder Add<T>(this IProcessingPipelineBuilder builder, T instance) where T : IProcessingRule =>
instance switch
{
ISimpleProcessingRule simple => builder.Add(next => context => simple.RunAsync(context, () => next(context))),
IFullProcessingRule full => builder.Add(next => context => full.RunAsync(context, next)),
ISimpleProcessingRule simple => builder.Add(next => context => simple.Run(context, () => next(context))),
IFullProcessingRule full => builder.Add(next => context => full.Run(context, next)),
_ => throw new ArgumentException($"Type ({typeof(T).FullName}) does not ISimpleProcessingRule or IFullProcessingRule")
};

View File

@ -1,10 +1,9 @@
using System;
using System.Threading.Tasks;
namespace BililiveRecorder.Flv.Pipeline
{
public interface ISimpleProcessingRule : IProcessingRule
{
Task RunAsync(FlvProcessingContext context, Func<Task> next);
void Run(FlvProcessingContext context, Action next);
}
}

View File

@ -1,6 +1,4 @@
using System.Threading.Tasks;
namespace BililiveRecorder.Flv.Pipeline
{
public delegate Task ProcessingDelegate(FlvProcessingContext context);
public delegate void ProcessingDelegate(FlvProcessingContext context);
}

View File

@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace BililiveRecorder.Flv.Pipeline
{
@ -23,6 +22,6 @@ namespace BililiveRecorder.Flv.Pipeline
}
public ProcessingDelegate Build()
=> this.rules.AsEnumerable().Reverse().Aggregate((ProcessingDelegate)(_ => Task.CompletedTask), (i, o) => o(i));
=> this.rules.AsEnumerable().Reverse().Aggregate((ProcessingDelegate)(_ => { }), (i, o) => o(i));
}
}

View File

@ -1,5 +1,5 @@
using System;
using System.Threading.Tasks;
using System.Collections.Generic;
namespace BililiveRecorder.Flv.Pipeline.Rules
{
@ -19,9 +19,15 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
private static readonly ProcessingComment Comment1 = new ProcessingComment(CommentType.Unrepairable, "Flv Chunk 内出现时间戳跳变(变小)");
private static readonly ProcessingComment Comment2 = new ProcessingComment(CommentType.Unrepairable, "Flv Chunk 内出现时间戳跳变(间隔过大)");
public Task RunAsync(FlvProcessingContext context, Func<Task> next)
public void Run(FlvProcessingContext context, Action next)
{
if (context.OriginalInput is PipelineDataAction data)
context.PerActionRun(this.RunPerAction);
next();
}
private IEnumerable<PipelineAction?> RunPerAction(FlvProcessingContext context, PipelineAction action)
{
if (action is PipelineDataAction data)
{
for (var i = 0; i < data.Tags.Count - 1; i++)
{
@ -30,22 +36,24 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
if (f1.Timestamp > f2.Timestamp)
{
context.ClearOutput();
context.AddDisconnectAtStart();
context.AddComment(Comment1);
return Task.CompletedTask;
yield return PipelineDisconnectAction.Instance;
yield return null;
yield break;
}
else if ((f2.Timestamp - f1.Timestamp) > MAX_ALLOWED_DIFF)
{
context.ClearOutput();
context.AddDisconnectAtStart();
context.AddComment(Comment2);
return Task.CompletedTask;
yield return PipelineDisconnectAction.Instance;
yield return null;
yield break;
}
}
return next();
yield return data;
}
else return next();
else
yield return action;
}
}
}

View File

@ -1,6 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace BililiveRecorder.Flv.Pipeline.Rules
{
@ -17,20 +17,29 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
{
private static readonly ProcessingComment comment = new ProcessingComment(CommentType.Unrepairable, "Flv Chunk 内缺少关键帧");
public Task RunAsync(FlvProcessingContext context, Func<Task> next)
public void Run(FlvProcessingContext context, Action next)
{
if (context.OriginalInput is PipelineDataAction data)
context.PerActionRun(this.RunPerAction);
next();
}
private IEnumerable<PipelineAction?> RunPerAction(FlvProcessingContext context, PipelineAction action)
{
if (action is PipelineDataAction data)
{
var f = data.Tags.FirstOrDefault(x => x.Type == TagType.Video);
if (f == null || (0 == (f.Flag & TagFlag.Keyframe)))
{
context.AddComment(comment);
context.AddDisconnectAtStart();
return Task.CompletedTask;
yield return PipelineDisconnectAction.Instance;
yield return null;
yield break;
}
else return next();
else
yield return action;
}
else return next();
else
yield return action;
}
}
}

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace BililiveRecorder.Flv.Pipeline.Rules
{
@ -9,59 +10,50 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
/// <remarks>
/// 本规则应该放在所有规则前面
/// </remarks>
public class HandleDelayedAudioHeaderRule : IFullProcessingRule
public class HandleDelayedAudioHeaderRule : ISimpleProcessingRule
{
private static readonly ProcessingComment comment = new ProcessingComment(CommentType.DecodingHeader, "检测到延后收到的音频头");
public Task RunAsync(FlvProcessingContext context, ProcessingDelegate next)
public void Run(FlvProcessingContext context, Action next)
{
if (context.OriginalInput is PipelineDataAction dataAction)
{
if (!dataAction.Tags.Any(x => x.IsHeader()))
return next(context);
else
return this.RunAsyncCore(dataAction, context, next);
}
else
return next(context);
context.PerActionRun(this.RunPerAction);
next();
}
private async Task RunAsyncCore(PipelineDataAction dataAction, FlvProcessingContext context, ProcessingDelegate next)
private IEnumerable<PipelineAction?> RunPerAction(FlvProcessingContext context, PipelineAction action)
{
context.ClearOutput();
context.AddComment(comment);
var tags = dataAction.Tags;
var index = tags.IndexOf(tags.Last(x => x.Flag == TagFlag.Header));
for (var i = 0; i < index; i++)
if (action is PipelineDataAction data)
{
if (tags[i].Type == TagType.Audio)
var tags = data.Tags;
if (tags.Any(x => x.IsHeader()))
{
context.AddDisconnectAtStart();
return;
context.AddComment(comment);
var index = tags.IndexOf(tags.Last(x => x.Flag == TagFlag.Header));
for (var i = 0; i < index; i++)
{
if (tags[i].Type == TagType.Audio)
{
// 在一段数据内 Header 之前出现了音频数据
yield return PipelineDisconnectAction.Instance;
yield return null;
yield break;
}
}
var headerTags = tags.Where(x => x.Flag == TagFlag.Header).ToList();
var newHeaderAction = new PipelineHeaderAction(headerTags);
var dataTags = tags.Where(x => x.Flag != TagFlag.Header).ToList();
var newDataAction = new PipelineDataAction(dataTags);
yield return newHeaderAction;
yield return newDataAction;
}
else
yield return data;
}
var headerTags = tags.Where(x => x.Flag == TagFlag.Header).ToList();
var newHeaderAction = new PipelineHeaderAction(headerTags);
var dataTags = tags.Where(x => x.Flag != TagFlag.Header).ToList();
var newDataAction = new PipelineDataAction(dataTags);
var localContext = new FlvProcessingContext(newHeaderAction, context.SessionItems);
await next(localContext).ConfigureAwait(false);
context.Output.AddRange(localContext.Output);
context.Comments.AddRange(localContext.Comments);
localContext.Reset(newDataAction, context.SessionItems);
await next(localContext).ConfigureAwait(false);
context.Output.AddRange(localContext.Output);
context.Comments.AddRange(localContext.Comments);
// TODO fix me
//var oi = context.Output.IndexOf(dataAction);
//context.Output.Insert(oi,newHeaderAction);
else
yield return action;
}
}
}

View File

@ -1,18 +1,24 @@
using System.Threading.Tasks;
using System;
using System.Collections.Generic;
namespace BililiveRecorder.Flv.Pipeline.Rules
{
/// <summary>
/// 处理 end tag
/// </summary>
public class HandleEndTagRule : IFullProcessingRule
public class HandleEndTagRule : ISimpleProcessingRule
{
public Task RunAsync(FlvProcessingContext context, ProcessingDelegate next)
public void Run(FlvProcessingContext context, Action next)
{
if (context.OriginalInput is PipelineEndAction)
context.AddNewFileAtEnd();
context.PerActionRun(this.RunPerAction);
next();
}
return next(context);
private IEnumerable<PipelineAction?> RunPerAction(FlvProcessingContext context, PipelineAction action)
{
yield return action;
if (action is PipelineEndAction)
yield return PipelineNewFileAction.Instance;
}
}
}

View File

@ -1,6 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace BililiveRecorder.Flv.Pipeline.Rules
{
@ -21,11 +21,15 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
private static readonly ProcessingComment MultipleHeaderComment = new ProcessingComment(CommentType.DecodingHeader, "收到了连续多个 Header", skipCounting: true);
private static readonly ProcessingComment SplitFileComment = new ProcessingComment(CommentType.DecodingHeader, "因为 Header 问题新建文件");
public Task RunAsync(FlvProcessingContext context, Func<Task> next)
public void Run(FlvProcessingContext context, Action next)
{
if (context.OriginalInput is not PipelineHeaderAction header)
return next();
else
context.PerActionRun(this.RunPerAction);
next();
}
private IEnumerable<PipelineAction?> RunPerAction(FlvProcessingContext context, PipelineAction action)
{
if (action is PipelineHeaderAction header)
{
Tag? lastVideoHeader, lastAudioHeader;
Tag? currentVideoHeader, currentAudioHeader;
@ -95,15 +99,13 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
if (lastAudioHeader is null && lastVideoHeader is null)
{
// 本 session 第一次输出 header
context.ClearOutput();
var output = new PipelineHeaderAction(Array.Empty<Tag>())
{
AudioHeader = currentAudioHeader?.Clone(),
VideoHeader = currentVideoHeader?.Clone(),
};
context.Output.Add(output);
yield return output;
}
else
{
@ -120,11 +122,11 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
if (split_file)
context.AddComment(SplitFileComment);
context.ClearOutput();
if (split_file)
{
context.AddNewFileAtStart();
yield return PipelineNewFileAction.Instance;
var output = new PipelineHeaderAction(Array.Empty<Tag>())
{
@ -132,16 +134,17 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
VideoHeader = currentVideoHeader?.Clone(),
};
context.Output.Add(output);
yield return output;
// 输出所有 Header 到一个独立的文件,以防出现无法播放的问题
// 如果不能正常播放,后期可以使用这里保存的 Header 配合 FlvInteractiveRebase 工具手动修复
if (multiple_header_present)
context.Output.Add(new PipelineLogAlternativeHeaderAction(header.AllTags));
yield return new PipelineLogAlternativeHeaderAction(header.AllTags);
}
}
return Task.CompletedTask;
}
else
yield return action;
}
}
}

View File

@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using BililiveRecorder.Flv.Amf;
namespace BililiveRecorder.Flv.Pipeline.Rules
@ -15,9 +14,15 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
{
private static readonly ProcessingComment comment = new ProcessingComment(CommentType.Other, "收到了非 onMetaData 的 Script Tag");
public Task RunAsync(FlvProcessingContext context, Func<Task> next)
public void Run(FlvProcessingContext context, Action next)
{
if (context.OriginalInput is PipelineScriptAction scriptAction)
context.PerActionRun(this.RunPerAction);
next();
}
private IEnumerable<PipelineAction?> RunPerAction(FlvProcessingContext context, PipelineAction action)
{
if (action is PipelineScriptAction scriptAction)
{
var data = scriptAction.Tag.ScriptData;
if (!(data is null)
@ -35,9 +40,8 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
if (value is null)
value = new ScriptDataEcmaArray();
context.ClearOutput();
context.AddNewFileAtStart();
context.Output.Add(new PipelineScriptAction(new Tag
yield return PipelineNewFileAction.Instance;
yield return (new PipelineScriptAction(new Tag
{
Type = TagType.Script,
ScriptData = new ScriptTagBody(new List<IScriptDataValue>
@ -50,12 +54,10 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
else
{
context.AddComment(comment);
context.ClearOutput();
}
return Task.CompletedTask;
}
else
return next();
yield return action;
}
}
}

View File

@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using BililiveRecorder.Flv;
namespace BililiveRecorder.Flv.Pipeline.Rules
@ -21,11 +20,15 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
private static readonly ProcessingComment comment = new ProcessingComment(CommentType.RepeatingData, "发现了重复的 Flv Chunk");
public Task RunAsync(FlvProcessingContext context, Func<Task> next)
public void Run(FlvProcessingContext context, Action next)
{
if (context.OriginalInput is not PipelineDataAction data)
return next();
else
context.PerActionRun(this.RunPerAction);
next();
}
private IEnumerable<PipelineAction?> RunPerAction(FlvProcessingContext context, PipelineAction action)
{
if (action is PipelineDataAction data)
{
var feature = new List<long>(data.Tags.Count * 2 + 1)
{
@ -79,9 +82,7 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
// 对比历史特征
if (history.Any(x => x.SequenceEqual(feature)))
{
context.ClearOutput();
context.AddComment(comment);
return Task.CompletedTask;
}
else
{
@ -90,9 +91,11 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
while (history.Count > MAX_HISTORY)
history.Dequeue();
return next();
yield return action;
}
}
else
yield return action;
}
}
}

View File

@ -1,5 +1,4 @@
using System;
using System.Threading.Tasks;
namespace BililiveRecorder.Flv.Pipeline.Rules
{
@ -14,13 +13,13 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
/// </remarks>
public class RemoveFillerDataRule : ISimpleProcessingRule
{
public async Task RunAsync(FlvProcessingContext context, Func<Task> next)
public void Run(FlvProcessingContext context, Action next)
{
// 先运行下层规则
await next().ConfigureAwait(false);
next();
// 处理下层规则返回的数据
context.Output.ForEach(action =>
context.Actions.ForEach(action =>
{
if (action is PipelineDataAction dataAction)
foreach (var tag in dataAction.Tags)

View File

@ -1,6 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace BililiveRecorder.Flv.Pipeline.Rules
{
@ -12,54 +12,67 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
private static readonly ProcessingComment comment1 = new ProcessingComment(CommentType.Unrepairable, "GOP内音频或视频时间戳变小");
private static readonly ProcessingComment comment2 = new ProcessingComment(CommentType.Unrepairable, "GOP内音频时间戳相比视频时间戳过小");
public Task RunAsync(FlvProcessingContext context, Func<Task> next)
public void Run(FlvProcessingContext context, Action next)
{
if (context.OriginalInput is not PipelineDataAction data)
return next();
context.PerActionRun(this.RunPerAction);
next();
}
private IEnumerable<PipelineAction?> RunPerAction(FlvProcessingContext context, PipelineAction action)
{
if (action is not PipelineDataAction data)
{
yield return action;
yield break;
}
// TODO: 重写这个规则,现在有问题
// 如果一切正常,直接跳过
if (!data.Tags.Any2((t1, t2) => t1.Timestamp > t2.Timestamp))
return next();
// 如果音频和视频单独判断还有问题,则判定为无法修复
if (data.Tags.Where(x => x.Type == TagType.Audio).Any2((a, b) => a.Timestamp > b.Timestamp)
|| data.Tags.Where(x => x.Type == TagType.Video).Any2((a, b) => a.Timestamp > b.Timestamp))
if (data.Tags.Any2((t1, t2) => t1.Timestamp > t2.Timestamp))
{
context.ClearOutput();
context.AddDisconnectAtStart();
context.AddComment(comment1);
return Task.CompletedTask;
// 如果音频和视频单独判断还有问题,则判定为无法修复
if (data.Tags.Where(x => x.Type == TagType.Audio).Any2((a, b) => a.Timestamp > b.Timestamp)
|| data.Tags.Where(x => x.Type == TagType.Video).Any2((a, b) => a.Timestamp > b.Timestamp))
{
context.AddComment(comment1);
yield return PipelineDisconnectAction.Instance;
yield return null;
yield break;
}
var audio1 = data.Tags.First(x => x.Type == TagType.Audio);
var video1 = data.Tags.First(x => x.Type == TagType.Video);
var diff = audio1.Timestamp - video1.Timestamp;
if (diff >= 0)
{
// 正常
}
else if (diff >= -5)
{
// 音频时间戳比视频早,把音频整体向后偏移
context.AddComment(new ProcessingComment(CommentType.TimestampJump, "GOP内音频时间戳轻度偏移"));
foreach (var tag in data.Tags.Where(x => x.Type == TagType.Audio))
tag.Timestamp -= diff;
// TODO: 重写改为排序而不是修改时间戳
}
else
{
// 音频时间戳比视频早太多,判定为无法修复
context.AddComment(comment2);
yield return PipelineDisconnectAction.Instance;
yield return null;
yield break;
}
// 排序
data.Tags = data.Tags.OrderBy(x => x.Timestamp).ToList();
}
var audio1 = data.Tags.First(x => x.Type == TagType.Audio);
var video1 = data.Tags.First(x => x.Type == TagType.Video);
var diff = audio1.Timestamp - video1.Timestamp;
if (diff >= 0)
{
// 正常
}
else if (diff >= -5)
{
// 音频时间戳比视频早,把音频整体向后偏移
context.AddComment(new ProcessingComment(CommentType.TimestampJump, "GOP内音频时间戳轻度偏移"));
foreach (var tag in data.Tags.Where(x => x.Type == TagType.Audio))
tag.Timestamp -= diff;
}
else
{
// 音频时间戳比视频早太多,判定为无法修复
context.ClearOutput();
context.AddDisconnectAtStart();
context.AddComment(comment2);
return Task.CompletedTask;
}
// 排序
data.Tags = data.Tags.OrderBy(x => x.Timestamp).ToList();
return next();
yield return data;
}
}
}

View File

@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace BililiveRecorder.Flv.Pipeline.Rules
{
@ -19,14 +18,14 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
private const int VIDEO_DURATION_MIN = 15;
private const int VIDEO_DURATION_MAX = 50;
public async Task RunAsync(FlvProcessingContext context, Func<Task> next)
public void Run(FlvProcessingContext context, Action next)
{
await next().ConfigureAwait(false);
next();
var ts = context.SessionItems.ContainsKey(TS_STORE_KEY) ? context.SessionItems[TS_STORE_KEY] as TimestampStore ?? new TimestampStore() : new TimestampStore();
context.SessionItems[TS_STORE_KEY] = ts;
foreach (var action in context.Output)
foreach (var action in context.Actions)
{
if (action is PipelineDataAction dataAction)
{

View File

@ -46,7 +46,7 @@ namespace BililiveRecorder.Flv.Writer
await this.semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
foreach (var item in context.Output)
foreach (var item in context.Actions)
{
try
{
@ -65,7 +65,7 @@ namespace BililiveRecorder.Flv.Writer
}
// Dispose tags
foreach (var action in context.Output)
foreach (var action in context.Actions)
if (action is PipelineDataAction dataAction)
foreach (var tag in dataAction.Tags)
tag.BinaryData?.Dispose();

View File

@ -110,14 +110,14 @@ namespace BililiveRecorder.WPF.Pages
break;
context.Reset(group, session);
await pipeline(context).ConfigureAwait(false);
pipeline(context);
if (context.Comments.Count > 0)
logger.Debug("修复逻辑输出 {@Comments}", context.Comments);
await writer.WriteAsync(context).ConfigureAwait(false);
foreach (var action in context.Output)
foreach (var action in context.Actions)
if (action is PipelineDataAction dataAction)
foreach (var tag in dataAction.Tags)
tag.BinaryData?.Dispose();
@ -180,7 +180,7 @@ namespace BililiveRecorder.WPF.Pages
break;
context.Reset(group, session);
await pipeline(context).ConfigureAwait(false);
pipeline(context);
if (context.Comments.Count > 0)
{
@ -188,7 +188,7 @@ namespace BililiveRecorder.WPF.Pages
comments.AddRange(context.Comments);
}
foreach (var action in context.Output)
foreach (var action in context.Actions)
if (action is PipelineDataAction dataAction)
foreach (var tag in dataAction.Tags)
tag.BinaryData?.Dispose();

View File

@ -31,7 +31,7 @@ namespace BililiveRecorder.Flv.RuleTests.Integrated
break;
context.Reset(group, session);
await pipeline(context).ConfigureAwait(false);
pipeline(context);
comments.AddRange(context.Comments);
await writer.WriteAsync(context).ConfigureAwait(false);

View File

@ -20,7 +20,7 @@ namespace BililiveRecorder.Flv.UnitTests.Grouping
public class TestOutputProvider : IFlvWriterTargetProvider
{
public Stream CreateAlternativeHeaderStream() => throw new NotImplementedException();
public (Stream, object) CreateOutputStream() => (File.Open(Path.Combine(TEST_OUTPUT_PATH, DateTimeOffset.Now.ToString("s").Replace(':', '-') + ".flv"), FileMode.CreateNew, FileAccess.ReadWrite, FileShare.None), null);
public (Stream, object) CreateOutputStream() => (File.Open(Path.Combine(TEST_OUTPUT_PATH, DateTimeOffset.Now.ToString("s").Replace(':', '-') + ".flv"), FileMode.CreateNew, FileAccess.ReadWrite, FileShare.None), null!);
}
[Fact(Skip = "Not ready")]
@ -33,7 +33,7 @@ namespace BililiveRecorder.Flv.UnitTests.Grouping
var grouping = new TagGroupReader(new FlvTagPipeReader(PipeReader.Create(File.OpenRead(path)), new TestRecyclableMemoryStreamProvider(), skipData: true, logger: null));
var context = new FlvProcessingContext();
var session = new Dictionary<object, object>();
var session = new Dictionary<object, object?>();
var sp = new ServiceCollection().BuildServiceProvider();
var pipeline = new ProcessingPipelineBuilder(sp).AddDefault().AddRemoveFillerData().Build();
@ -47,9 +47,9 @@ namespace BililiveRecorder.Flv.UnitTests.Grouping
context.Reset(g, session);
await pipeline(context);
pipeline(context);
foreach (var item in context.Output)
foreach (var item in context.Actions)
{
results.Add(item);
}
@ -75,7 +75,7 @@ namespace BililiveRecorder.Flv.UnitTests.Grouping
var comments = new List<string>();
var context = new FlvProcessingContext();
var session = new Dictionary<object, object>();
var session = new Dictionary<object, object?>();
var sp = new ServiceCollection().BuildServiceProvider();
var pipeline = new ProcessingPipelineBuilder(sp).AddDefault().AddRemoveFillerData().Build();
@ -91,12 +91,12 @@ namespace BililiveRecorder.Flv.UnitTests.Grouping
context.Reset(g, session);
await pipeline(context).ConfigureAwait(false);
pipeline(context);
comments.AddRange(context.Comments.Select(x => x.Comment));
await writer.WriteAsync(context).ConfigureAwait(false);
foreach (var action in context.Output)
foreach (var action in context.Actions)
{
// TODO action.Dispose();