BililiveRecorder/BililiveRecorder.Flv/Pipeline/Rules/RemoveDuplicatedChunkRule.cs

using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using BililiveRecorder.Flv;
using BililiveRecorder.Flv.Pipeline.Actions;
using FastHashes;
using StructLinq;
namespace BililiveRecorder.Flv.Pipeline.Rules
{
/// <summary>
    /// Removes duplicated live stream data.
/// </summary>
public class RemoveDuplicatedChunkRule : ISimpleProcessingRule
{
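        // Number of recent chunk fingerprints kept for duplicate detection.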
private const int MAX_HISTORY = 16;
private const string QUEUE_KEY = "DeDuplicationQueue";
private const string DUPLICATED_COUNT_KEY = "DuplicatedFlvDataCount";
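
        // Shared non-cryptographic 64-bit hash used to fingerprint each data chunk.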
private static readonly FarmHash64 farmHash64 = new();
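
        // Comment attached to the processing context whenever a duplicated chunk is detected.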
private static readonly ProcessingComment comment = new ProcessingComment(CommentType.RepeatingData, true, "重复数据");
public void Run(FlvProcessingContext context, Action next)
{
context.PerActionRun(this.RunPerAction);
next();
}
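
        // Fingerprints each data action and drops it if an identical chunk was seen recently.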
private IEnumerable<PipelineAction?> RunPerAction(FlvProcessingContext context, PipelineAction action)
{
if (action is PipelineDataAction data)
{
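                // Build a fingerprint for this chunk: the tag count followed by each tag's data hash.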
                var tagHashes = new MemoryStream(4 + data.Tags.Count * 16);
{
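                    // Prefix the fingerprint with the tag count, written in big-endian.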
var buffer = ArrayPool<byte>.Shared.Rent(4);
try
{
BinaryPrimitives.WriteInt32BigEndian(buffer, data.Tags.Count);
                        tagHashes.Write(buffer, 0, 4);
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
foreach (var tag in data.Tags)
{
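                    // Use the cached per-tag data hash, computing it on demand if missing.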
var tagHash = tag.DataHash ?? tag.UpdateDataHash();
if (tagHash is not null)
{
var bytes = Encoding.UTF8.GetBytes(tagHash);
                            tagHashes.Write(bytes, 0, bytes.Length);
}
}
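
                // Collapse the concatenated tag hashes into a single 64-bit FarmHash value.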
                var hash = farmHash64.ComputeHash(tagHashes.GetBuffer(), (int)tagHashes.Length);

                // Queue of fingerprints for the most recent MAX_HISTORY data chunks
Queue<byte[]> hashHistory;
if (context.SessionItems.TryGetValue(QUEUE_KEY, out var obj) && obj is Queue<byte[]> q)
hashHistory = q;
else
{
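                    // First data chunk of this session: create the history queue lazily.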
hashHistory = new Queue<byte[]>(MAX_HISTORY + 1);
context.SessionItems[QUEUE_KEY] = hashHistory;
}

                // Compare against the stored fingerprints
if (hashHistory.ToStructEnumerable().Any(x => x.SequenceEqual(hash), x => x))
{
                    // Duplicated data
context.AddComment(comment);
                    // Count how many consecutive duplicated chunks have been received
if (context.SessionItems.ContainsKey(DUPLICATED_COUNT_KEY) && context.SessionItems[DUPLICATED_COUNT_KEY] is int count)
{
count += 1;
}
else
{
count = 1;
}
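
                    // Request a disconnect once too many consecutive duplicated chunks arrive.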
const int DisconnectOnDuplicatedDataCount = 10;
if (count > DisconnectOnDuplicatedDataCount)
{
yield return new PipelineDisconnectAction($"连续收到了 {DisconnectOnDuplicatedDataCount} 段重复数据");
context.SessionItems.Remove(DUPLICATED_COUNT_KEY);
}
else
{
context.SessionItems[DUPLICATED_COUNT_KEY] = count;
}
}
else
{
                    // New data
hashHistory.Enqueue(hash);
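
                    // Keep only the most recent MAX_HISTORY fingerprints.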
while (hashHistory.Count > MAX_HISTORY)
hashHistory.Dequeue();
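
                    // Receiving new data resets the consecutive-duplicate counter.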
context.SessionItems.Remove(DUPLICATED_COUNT_KEY);
yield return action;
}
}
else
{
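                // Non-data actions pass through unchanged.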
yield return action;
}
}
}
}