mirror of
https://github.com/BililiveRecorder/BililiveRecorder.git
synced 2024-12-26 20:26:00 +08:00
1cebee7c0e
TODO: fix test data
95 lines
3.1 KiB
C#
95 lines
3.1 KiB
C#
using System;
|
|
using System.Buffers;
|
|
using System.Buffers.Binary;
|
|
using System.Collections.Generic;
|
|
using System.IO;
|
|
using System.Linq;
|
|
using System.Text;
|
|
using BililiveRecorder.Flv;
|
|
using BililiveRecorder.Flv.Pipeline.Actions;
|
|
using FastHashes;
|
|
using StructLinq;
|
|
|
|
namespace BililiveRecorder.Flv.Pipeline.Rules
|
|
{
|
|
/// <summary>
/// Removes duplicated live-stream data chunks.
/// </summary>
/// <remarks>
/// Every <see cref="PipelineDataAction"/> is reduced to a fingerprint computed by
/// <see cref="FarmHash64"/> over its tag count followed by the data hash of each tag.
/// The fingerprints of the most recent <see cref="MAX_HISTORY"/> non-duplicate chunks are
/// stored in <c>context.SessionItems</c> under <see cref="QUEUE_KEY"/>. A data action whose
/// fingerprint is already in that history is not yielded back; a
/// <see cref="CommentType.RepeatingData"/> comment is added to the context instead.
/// Actions of any other type are yielded unchanged.
/// </remarks>
public class RemoveDuplicatedChunkRule : ISimpleProcessingRule
{
    // Number of chunk fingerprints remembered for comparison.
    private const int MAX_HISTORY = 16;

    // Key under which the fingerprint history queue is stored in the session items.
    private const string QUEUE_KEY = "DeDuplicationQueue";

    private static readonly FarmHash64 farmHash64 = new();
    private static readonly ProcessingComment comment = new ProcessingComment(CommentType.RepeatingData, "重复数据");

    /// <summary>
    /// Applies the de-duplication to every action of <paramref name="context"/>,
    /// then invokes the next rule.
    /// </summary>
    /// <param name="context">Processing context whose actions are filtered.</param>
    /// <param name="next">Continuation that runs the rest of the pipeline.</param>
    public void Run(FlvProcessingContext context, Action next)
    {
        context.PerActionRun(this.RunPerAction);
        next();
    }

    /// <summary>
    /// Yields <paramref name="action"/> unless it is a data action whose fingerprint
    /// matches one of the recently seen chunks.
    /// </summary>
    /// <param name="context">Context providing the session items and receiving the comment.</param>
    /// <param name="action">The action being inspected.</param>
    /// <returns>The action itself, or nothing when it is a duplicate data chunk.</returns>
    private IEnumerable<PipelineAction?> RunPerAction(FlvProcessingContext context, PipelineAction action)
    {
        // Only data chunks are subject to de-duplication; everything else passes through.
        if (action is not PipelineDataAction data)
        {
            yield return action;
            yield break;
        }

        // The fingerprint is computed before the history is fetched, as in the original order.
        var hash = ComputeChunkHash(data);
        var hashHistory = GetHashHistory(context);

        // Compare against the remembered fingerprints.
        if (hashHistory.ToStructEnumerable().Any(x => x.SequenceEqual(hash), x => x))
        {
            // Duplicate data: record a comment and do not yield the action.
            context.AddComment(comment);
            yield break;
        }

        // New data: remember its fingerprint and trim the history to MAX_HISTORY entries.
        hashHistory.Enqueue(hash);
        while (hashHistory.Count > MAX_HISTORY)
        {
            hashHistory.Dequeue();
        }

        yield return action;
    }

    /// <summary>
    /// Builds the fingerprint of a data chunk: the big-endian tag count followed by the
    /// UTF-8 bytes of every available tag data hash, hashed with <see cref="farmHash64"/>.
    /// </summary>
    /// <param name="data">The data action to fingerprint.</param>
    /// <returns>The hash bytes produced by <see cref="farmHash64"/>.</returns>
    private static byte[] ComputeChunkHash(PipelineDataAction data)
    {
        // Disposed when the method returns; the hash is computed before that happens.
        using var tagHashes = new MemoryStream(4 + data.Tags.Count * 16);

        // Prefix with the tag count. The rented array may be larger than 4 bytes,
        // so only the first 4 are written to the stream.
        var countBuffer = ArrayPool<byte>.Shared.Rent(4);
        try
        {
            BinaryPrimitives.WriteInt32BigEndian(countBuffer, data.Tags.Count);
            tagHashes.Write(countBuffer, 0, 4);
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(countBuffer);
        }

        foreach (var tag in data.Tags)
        {
            // Use the cached hash when present, otherwise ask the tag to compute it.
            var tagHash = tag.DataHash ?? tag.UpdateDataHash();
            if (tagHash is not null)
            {
                var bytes = Encoding.UTF8.GetBytes(tagHash);
                tagHashes.Write(bytes, 0, bytes.Length);
            }
        }

        // GetBuffer exposes the whole backing array, so the written length is passed explicitly.
        return farmHash64.ComputeHash(tagHashes.GetBuffer(), (int)tagHashes.Length);
    }

    /// <summary>
    /// Returns the fingerprint history stored in the session items, creating and
    /// registering a new queue when none (or a value of another type) is stored.
    /// </summary>
    /// <param name="context">Context whose session items hold the history.</param>
    /// <returns>The queue of recent chunk fingerprints.</returns>
    private static Queue<byte[]> GetHashHistory(FlvProcessingContext context)
    {
        if (context.SessionItems.TryGetValue(QUEUE_KEY, out var stored) && stored is Queue<byte[]> existing)
        {
            return existing;
        }

        // One extra slot: an entry is enqueued before the oldest one is dequeued.
        var created = new Queue<byte[]>(MAX_HISTORY + 1);
        context.SessionItems[QUEUE_KEY] = created;
        return created;
    }
}
|
|
}
|