using System; using System.Buffers; using System.Buffers.Binary; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using BililiveRecorder.Flv; using BililiveRecorder.Flv.Pipeline.Actions; using FastHashes; using StructLinq; namespace BililiveRecorder.Flv.Pipeline.Rules { /// /// 删除重复的直播数据。 /// public class RemoveDuplicatedChunkRule : ISimpleProcessingRule { private const int MAX_HISTORY = 16; private const string QUEUE_KEY = "DeDuplicationQueue"; private static readonly FarmHash64 farmHash64 = new(); private static readonly ProcessingComment comment = new ProcessingComment(CommentType.RepeatingData, true, "重复数据"); public void Run(FlvProcessingContext context, Action next) { context.PerActionRun(this.RunPerAction); next(); } private IEnumerable RunPerAction(FlvProcessingContext context, PipelineAction action) { if (action is PipelineDataAction data) { var tagHashs = new MemoryStream(4 + data.Tags.Count * 16); { var buffer = ArrayPool.Shared.Rent(4); try { BinaryPrimitives.WriteInt32BigEndian(buffer, data.Tags.Count); tagHashs.Write(buffer, 0, 4); } finally { ArrayPool.Shared.Return(buffer); } } foreach (var tag in data.Tags) { var tagHash = tag.DataHash ?? tag.UpdateDataHash(); if (tagHash is not null) { var bytes = Encoding.UTF8.GetBytes(tagHash); tagHashs.Write(bytes, 0, bytes.Length); } } var hash = farmHash64.ComputeHash(tagHashs.GetBuffer(), (int)tagHashs.Length); // 存储最近 MAX_HISTORY 个 Data Chunk 的特征的 Queue Queue hashHistory; if (context.SessionItems.TryGetValue(QUEUE_KEY, out var obj) && obj is Queue q) hashHistory = q; else { hashHistory = new Queue(MAX_HISTORY + 1); context.SessionItems[QUEUE_KEY] = hashHistory; } // 对比历史特征 if (hashHistory.ToStructEnumerable().Any(x => x.SequenceEqual(hash), x => x)) { // 重复数据 context.AddComment(comment); } else { // 新数据 hashHistory.Enqueue(hash); while (hashHistory.Count > MAX_HISTORY) hashHistory.Dequeue(); yield return action; } } else yield return action; } } }