using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using BililiveRecorder.Flv;
using BililiveRecorder.Flv.Pipeline.Actions;
using FastHashes;
using StructLinq;
namespace BililiveRecorder.Flv.Pipeline.Rules
{
    /// <summary>
    /// Removes duplicated live stream data.
    /// </summary>
public class RemoveDuplicatedChunkRule : ISimpleProcessingRule
{
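        // How many recent data-chunk fingerprints to keep for duplicate detection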
private const int MAX_HISTORY = 16;
private const string QUEUE_KEY = "DeDuplicationQueue";
private static readonly FarmHash64 farmHash64 = new();
private static readonly ProcessingComment comment = new ProcessingComment(CommentType.RepeatingData, true, "重复数据");

        public void Run(FlvProcessingContext context, Action next)
{
context.PerActionRun(this.RunPerAction);
next();
}

        private IEnumerable<PipelineAction?> RunPerAction(FlvProcessingContext context, PipelineAction action)
{
if (action is PipelineDataAction data)
{
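                // Build a fingerprint of this data chunk: write the tag count followed by every
                // tag's data hash into a buffer, hash the buffer, and compare it with recent chunks.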
var tagHashs = new MemoryStream(4 + data.Tags.Count * 16);
{
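                    // Prefix the buffer with the tag count as a 4-byte big-endian integer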
                    var buffer = ArrayPool<byte>.Shared.Rent(4);
try
{
BinaryPrimitives.WriteInt32BigEndian(buffer, data.Tags.Count);
tagHashs.Write(buffer, 0, 4);
}
finally
{
                        ArrayPool<byte>.Shared.Return(buffer);
}
}
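
                // Append each tag's per-tag data hash (UTF-8 encoded) to the buffer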
foreach (var tag in data.Tags)
{
var tagHash = tag.DataHash ?? tag.UpdateDataHash();
if (tagHash is not null)
{
var bytes = Encoding.UTF8.GetBytes(tagHash);
tagHashs.Write(bytes, 0, bytes.Length);
}
}
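
                // Hash the accumulated buffer with FarmHash64 to get the chunk fingerprint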
var hash = farmHash64.ComputeHash(tagHashs.GetBuffer(), (int)tagHashs.Length);
                // Queue storing the fingerprints of the most recent MAX_HISTORY data chunks
                Queue<byte[]> hashHistory;
                if (context.SessionItems.TryGetValue(QUEUE_KEY, out var obj) && obj is Queue<byte[]> q)
hashHistory = q;
else
{
                    hashHistory = new Queue<byte[]>(MAX_HISTORY + 1);
context.SessionItems[QUEUE_KEY] = hashHistory;
}

                // Compare this chunk's fingerprint against the recorded history
if (hashHistory.ToStructEnumerable().Any(x => x.SequenceEqual(hash), x => x))
{
                    // Duplicated data: drop the action and record a comment
context.AddComment(comment);
}
else
{
                    // New data: remember its fingerprint and pass the action through
hashHistory.Enqueue(hash);
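                    // Keep the history bounded to the most recent MAX_HISTORY fingerprints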
while (hashHistory.Count > MAX_HISTORY)
hashHistory.Dequeue();
yield return action;
}
}
else
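                // Non-data actions pass through unchanged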
yield return action;
}
}
}