FLV: Detect duplicated data by using hashes

TODO: fix test data
This commit is contained in:
genteure 2022-05-28 13:55:43 +08:00
parent e3f660f6df
commit 1cebee7c0e
2 changed files with 212 additions and 14270 deletions

View File

@ -1,8 +1,13 @@
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using BililiveRecorder.Flv;
using BililiveRecorder.Flv.Pipeline.Actions;
using FastHashes;
using StructLinq;
namespace BililiveRecorder.Flv.Pipeline.Rules
@ -12,10 +17,11 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
/// </summary>
public class RemoveDuplicatedChunkRule : ISimpleProcessingRule
{
private const int MAX_HISTORY = 8;
private const int MAX_HISTORY = 16;
private const string QUEUE_KEY = "DeDuplicationQueue";
private static readonly ProcessingComment comment = new ProcessingComment(CommentType.RepeatingData, "发现了重复的 Flv Chunk");
private static readonly FarmHash64 farmHash64 = new();
private static readonly ProcessingComment comment = new ProcessingComment(CommentType.RepeatingData, "重复数据");
public void Run(FlvProcessingContext context, Action next)
{
@ -27,67 +33,56 @@ namespace BililiveRecorder.Flv.Pipeline.Rules
{
if (action is PipelineDataAction data)
{
var feature = new List<long>(data.Tags.Count * 2 + 1)
{
data.Tags.Count
};
var tagHashs = new MemoryStream(4 + data.Tags.Count * 16);
unchecked
{
// TODO: 改成用 Hash 判断
// 计算一个特征码
// 此处并没有遵循什么特定的算法,只是随便取了一些代表性比较强的值,用简单又尽量可靠的方式糅合到一起而已
var buffer = ArrayPool<byte>.Shared.Rent(4);
try
{
BinaryPrimitives.WriteInt32BigEndian(buffer, data.Tags.Count);
tagHashs.Write(buffer, 0, 4);
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
foreach (var tag in data.Tags)
{
var f = 0L;
f ^= tag.Type switch
var tagHash = tag.DataHash ?? tag.UpdateDataHash();
if (tagHash is not null)
{
TagType.Audio => 0b01,
TagType.Video => 0b10,
TagType.Script => 0b11,
_ => 0b00,
};
f <<= 3;
f ^= (int)tag.Flag & ((1 << 3) - 1);
f <<= 32;
f ^= tag.Timestamp;
f <<= 32 - 5;
f ^= tag.Size & ((1 << (32 - 5)) - 1);
feature.Add(f);
var bytes = Encoding.UTF8.GetBytes(tagHash);
tagHashs.Write(bytes, 0, bytes.Length);
}
}
if (tag.Nalus == null)
feature.Add(long.MinValue);
else
{
long n = tag.Nalus.Count << 32;
foreach (var nalu in tag.Nalus)
n ^= (((int)nalu.Type) << 16) ^ ((int)nalu.FullSize);
feature.Add(n);
}
}
}
var hash = farmHash64.ComputeHash(tagHashs.GetBuffer(), (int)tagHashs.Length);
// 存储最近 MAX_HISTORY 个 Data Chunk 的特征的 Queue
Queue<List<long>> history;
if (context.SessionItems.TryGetValue(QUEUE_KEY, out var obj) && obj is Queue<List<long>> q)
history = q;
Queue<byte[]> hashHistory;
if (context.SessionItems.TryGetValue(QUEUE_KEY, out var obj) && obj is Queue<byte[]> q)
hashHistory = q;
else
{
history = new Queue<List<long>>(MAX_HISTORY + 1);
context.SessionItems[QUEUE_KEY] = history;
hashHistory = new Queue<byte[]>(MAX_HISTORY + 1);
context.SessionItems[QUEUE_KEY] = hashHistory;
}
// 对比历史特征
if (history.ToStructEnumerable().Any(x => x.SequenceEqual(feature), x => x))
if (hashHistory.ToStructEnumerable().Any(x => x.SequenceEqual(hash), x => x))
{
// 重复数据
context.AddComment(comment);
}
else
{
history.Enqueue(feature);
// 新数据
hashHistory.Enqueue(hash);
while (history.Count > MAX_HISTORY)
history.Dequeue();
while (hashHistory.Count > MAX_HISTORY)
hashHistory.Dequeue();
yield return action;
}