BililiveRecorder/BililiveRecorder.Core/StreamMonitor.cs

430 lines
14 KiB
C#
Raw Normal View History

2018-12-18 00:16:24 +08:00
using BililiveRecorder.Core.Config;
using NLog;
2018-03-21 20:56:56 +08:00
using System;
using System.IO;
2020-04-10 18:47:07 +08:00
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
2018-03-21 20:56:56 +08:00
using System.Threading;
2018-03-21 00:33:34 +08:00
using System.Threading.Tasks;
2018-12-18 00:16:24 +08:00
using Timer = System.Timers.Timer;
2018-03-13 14:23:53 +08:00
namespace BililiveRecorder.Core
{
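/**
 * Live-stream status monitor.
 * It consists of two parts: the danmaku connection and HTTP polling.
 *
 * Danmaku connection:
 * A TCP connection to the danmaku server is opened as soon as the monitor is
 * constructed and is re-established after a disconnect.
 *
 * HTTP polling:
 * Runs only while monitoring is started; raises events according to the
 * streaming status returned for the room.
 * */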
2018-10-24 14:33:05 +08:00
public class StreamMonitor : IStreamMonitor
2018-03-13 14:23:53 +08:00
{
2018-03-21 20:56:56 +08:00
/// <summary>Logger shared by all instances of this class.</summary>
private static readonly Logger logger = LogManager.GetCurrentClassLogger();
/// <summary>Factory invoked by Connect() to obtain a new TcpClient for each connection attempt.</summary>
private readonly Func<TcpClient> funcTcpClient;
2018-12-18 00:16:24 +08:00
/// <summary>Configuration; TimingCheckInterval and TimingDanmakuRetry are read from it.</summary>
private readonly ConfigV1 config;
2019-08-22 01:48:19 +08:00
/// <summary>Host name passed to TcpClient.Connect for the danmaku connection.</summary>
private const string DM_SERVER_HOST = "broadcastlv.chat.bilibili.com";
/// <summary>TCP port passed to TcpClient.Connect for the danmaku connection.</summary>
private const int DM_SERVER_PORT = 2243;
#pragma warning disable IDE1006 // naming style
/// <summary>True when a client exists and reports itself as connected; false when dmClient is null.</summary>
private bool dmTcpConnected => dmClient?.Connected ?? false;
#pragma warning restore IDE1006 // naming style
/// <summary>Last exception recorded by Connect() or ReceiveMessageLoop().</summary>
private Exception dmError = null;
/// <summary>Client of the current danmaku connection (replaced on every connection attempt).</summary>
private TcpClient dmClient;
/// <summary>Stream of the current danmaku connection; set to null on disconnect and on dispose.</summary>
private NetworkStream dmNetStream;
/// <summary>Background thread running ReceiveMessageLoop() for the current connection.</summary>
private Thread dmReceiveMessageLoopThread;
/// <summary>Created in the constructor and cancelled in Dispose; its token is handed to the periodic heartbeat.</summary>
private CancellationTokenSource dmTokenSource = null;
2018-12-18 00:16:24 +08:00
/// <summary>Timer whose Elapsed handler calls Check(TriggerType.HttpApi); started by Start() and stopped by Stop().</summary>
private readonly Timer httpTimer;
2018-03-21 20:56:56 +08:00
2018-03-21 00:33:34 +08:00
/// <summary>Id of the monitored room, as supplied to the constructor.</summary>
public int Roomid { get; private set; } = 0;
/// <summary>True between Start() and Stop().</summary>
public bool IsMonitoring { get; private set; } = false;
/// <summary>Raised by FetchRoomInfoAsync() after room information has been fetched.</summary>
public event RoomInfoUpdatedEvent RoomInfoUpdated;
/// <summary>Raised when Check() finds the room streaming, or when a LiveStart danmaku arrives while monitoring.</summary>
public event StreamStartedEvent StreamStarted;
/// <summary>Raised by the receive loop for every action-5 message it decodes.</summary>
public event ReceivedDanmakuEvt ReceivedDanmaku;
2018-03-13 14:23:53 +08:00
2018-12-18 00:16:24 +08:00
/// <summary>
/// Creates a monitor for one room. Schedules a periodic action-2 packet (the protocol
/// heartbeat) every 30 seconds, prepares the HTTP polling timer (initially stopped) and
/// starts connecting to the danmaku server on a background task.
/// </summary>
/// <param name="roomid">Room id; sent in the danmaku handshake and passed to the room-info API.</param>
/// <param name="funcTcpClient">Factory invoked for every danmaku connection attempt.</param>
/// <param name="config">Configuration supplying the polling and retry intervals.</param>
public StreamMonitor(int roomid, Func<TcpClient> funcTcpClient, ConfigV1 config)
{
    this.funcTcpClient = funcTcpClient;
    this.config = config;
    Roomid = roomid;

    ReceivedDanmaku += Receiver_ReceivedDanmaku;

    dmTokenSource = new CancellationTokenSource();
    Repeat.Interval(TimeSpan.FromSeconds(30), () =>
    {
        if (dmNetStream != null && dmNetStream.CanWrite)
        {
            try
            {
                SendSocketData(2);
            }
            catch (Exception ex)
            {
                // Best effort: a broken connection is detected and repaired by the
                // receive loop, so only leave a trace here instead of failing.
                logger.Log(Roomid, LogLevel.Debug, "Heartbeat send failed", ex);
            }
        }
    }, dmTokenSource.Token);

    // System.Timers.Timer takes milliseconds, hence the multiplication by 1000.
    httpTimer = new Timer(config.TimingCheckInterval * 1000)
    {
        Enabled = false,
        AutoReset = true,
        SynchronizingObject = null,
        Site = null
    };
    httpTimer.Elapsed += (sender, e) =>
    {
        try
        {
            Check(TriggerType.HttpApi);
        }
        catch (Exception ex)
        {
            logger.Log(Roomid, LogLevel.Warn, "获取直播间开播状态出错", ex);
        }
    };

    config.PropertyChanged += (sender, e) =>
    {
        // A null or empty PropertyName means "all properties changed"; treat it as a
        // change of the interval too instead of throwing on the null reference.
        if (string.IsNullOrEmpty(e.PropertyName) || e.PropertyName == nameof(config.TimingCheckInterval))
        {
            httpTimer.Interval = config.TimingCheckInterval * 1000;
        }
    };

    Task.Run(() => ConnectWithRetry());
}
2018-03-21 00:33:34 +08:00
/// <summary>
/// Internal subscriber of <see cref="ReceivedDanmaku"/>: turns a LiveStart message into a
/// <see cref="StreamStarted"/> event (raised on a thread-pool task) while monitoring is on.
/// Every other message type, including LiveEnd, is ignored.
/// </summary>
private void Receiver_ReceivedDanmaku(object sender, ReceivedDanmakuArgs e)
{
    if (e.Danmaku.MsgType != MsgTypeEnum.LiveStart || !IsMonitoring)
    {
        return;
    }

    Task.Run(() => StreamStarted?.Invoke(this, new StreamStartedArgs() { type = TriggerType.Danmaku }));
}
#region API
/// <summary>
/// Turns monitoring on: enables the HTTP polling timer and triggers one immediate check.
/// </summary>
/// <returns>Always <c>true</c>.</returns>
/// <exception cref="ObjectDisposedException">The monitor has been disposed.</exception>
public bool Start()
{
    if (disposedValue)
    {
        throw new ObjectDisposedException(nameof(StreamMonitor));
    }

    IsMonitoring = true;
    // Timer.Start() is defined as setting Enabled to true.
    httpTimer.Enabled = true;
    Check(TriggerType.HttpApi);

    return true;
}
/// <summary>
/// Turns monitoring off and halts the HTTP polling timer.
/// </summary>
/// <exception cref="ObjectDisposedException">The monitor has been disposed.</exception>
public void Stop()
{
    if (disposedValue)
    {
        throw new ObjectDisposedException(nameof(StreamMonitor));
    }

    IsMonitoring = false;
    // Timer.Stop() is defined as setting Enabled to false.
    httpTimer.Enabled = false;
}
2018-12-18 00:16:24 +08:00
/// <summary>
/// Schedules a background check of the room: after the optional delay the room information
/// is fetched and, if the room is streaming, <see cref="StreamStarted"/> is raised.
/// The method returns immediately; the check itself runs on a thread-pool task.
/// </summary>
/// <param name="type">Trigger type reported in the <see cref="StreamStarted"/> event arguments.</param>
/// <param name="millisecondsDelay">Delay before the check, in milliseconds; must not be negative.</param>
/// <exception cref="ObjectDisposedException">The monitor has been disposed.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="millisecondsDelay"/> is negative.</exception>
public void Check(TriggerType type, int millisecondsDelay = 0)
{
    if (disposedValue)
    {
        throw new ObjectDisposedException(nameof(StreamMonitor));
    }
    if (millisecondsDelay < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(millisecondsDelay), "不能小于0");
    }

    Task.Run(async () =>
    {
        RoomInfo roomInfo;
        try
        {
            await Task.Delay(millisecondsDelay).ConfigureAwait(false);
            roomInfo = await FetchRoomInfoAsync().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // The task is fire-and-forget, so a failure here would otherwise vanish as an
            // unobserved task exception; callers' try/catch around Check cannot see it.
            logger.Log(Roomid, LogLevel.Warn, "获取直播间开播状态出错", ex);
            return;
        }

        if (roomInfo.IsStreaming)
        {
            StreamStarted?.Invoke(this, new StreamStartedArgs() { type = type });
        }
    });
}
2018-10-24 14:33:05 +08:00
2019-08-22 01:26:18 +08:00
/// <summary>
/// Fetches the current information of the room and publishes it through
/// <see cref="RoomInfoUpdated"/> before returning it.
/// </summary>
/// <returns>The room information returned by the API call.</returns>
public async Task<RoomInfo> FetchRoomInfoAsync()
{
    RoomInfo fetched = await BililiveAPI.GetRoomInfoAsync(Roomid).ConfigureAwait(false);

    var subscribers = RoomInfoUpdated;
    if (subscribers != null)
    {
        subscribers(this, new RoomInfoUpdatedArgs { RoomInfo = fetched });
    }

    return fetched;
}
#endregion
#region
2018-03-21 20:56:56 +08:00
/// <summary>
/// Repeatedly tries to connect to the danmaku server until a connection exists or the
/// monitor has been cancelled. Each attempt is preceded by a pause of
/// <c>config.TimingDanmakuRetry</c> (passed to Thread.Sleep, i.e. milliseconds).
/// </summary>
private void ConnectWithRetry()
{
    bool connected = false;

    // Query the source directly: CancellationTokenSource.Token throws
    // ObjectDisposedException once Dispose() has disposed the source, and this loop can
    // still be running at that point (possibly on the receive thread, where an unhandled
    // exception would be fatal). IsCancellationRequested stays readable after disposal.
    while (!dmTcpConnected && !dmTokenSource.IsCancellationRequested)
    {
        Thread.Sleep((int)Math.Max(config.TimingDanmakuRetry, 0));

        // Cancellation may have arrived during the pause; do not open a new connection
        // (and start a new receive thread) for a monitor that is being disposed.
        if (dmTokenSource.IsCancellationRequested)
        {
            break;
        }

        logger.Log(Roomid, LogLevel.Info, "连接弹幕服务器...");
        connected = Connect();
    }

    if (connected)
    {
        logger.Log(Roomid, LogLevel.Info, "弹幕服务器连接成功");
    }
}
/// <summary>
/// Performs a single connection attempt: opens the TCP connection, starts the receive
/// thread, then sends the room handshake (action 7) followed by an action-2 packet.
/// </summary>
/// <returns>
/// <c>true</c> if already connected or the attempt succeeded; <c>false</c> if any step threw
/// (the exception is stored in <c>dmError</c> and logged).
/// </returns>
private bool Connect()
{
    if (dmTcpConnected)
    {
        return true;
    }

    try
    {
        dmClient = funcTcpClient();
        dmClient.Connect(DM_SERVER_HOST, DM_SERVER_PORT);
        dmNetStream = dmClient.GetStream();

        var receiver = new Thread(ReceiveMessageLoop);
        receiver.Name = "ReceiveMessageLoop " + Roomid;
        receiver.IsBackground = true;
        dmReceiveMessageLoopThread = receiver;
        receiver.Start();

        var handshake = "{\"roomid\":" + Roomid + ",\"uid\":0}";
        SendSocketData(7, handshake);
        SendSocketData(2);

        return true;
    }
    catch (Exception failure)
    {
        dmError = failure;
        logger.Log(Roomid, LogLevel.Error, "连接弹幕服务器错误", failure);
        return false;
    }
}
/// <summary>
/// Body of the receive thread. Reads packets (16-byte header + payload) from the danmaku
/// stream while the connection is up and dispatches them. A version-2 / action-5 packet
/// carries a zlib-wrapped deflate stream that itself contains a sequence of packets.
/// Any exception ends the loop; unless cancellation was requested, a reconnect is started.
/// </summary>
private void ReceiveMessageLoop()
{
    logger.Log(Roomid, LogLevel.Trace, "ReceiveMessageLoop Started");
    try
    {
        var headerBuffer = new byte[16];
        var buffer = new byte[4096];
        // Inflated packets get their own buffer: "buffer" backs the MemoryStream the
        // DeflateStream is still reading compressed bytes from, so it must not be overwritten.
        var inflatedBuffer = new byte[4096];

        while (dmTcpConnected)
        {
            dmNetStream.ReadB(headerBuffer, 0, 16);
            Parse2Protocol(headerBuffer, out DanmakuProtocol protocol);

            if (protocol.PacketLength < 16)
            {
                throw new NotSupportedException("协议失败: (L:" + protocol.PacketLength + ")");
            }

            var payloadLength = protocol.PacketLength - 16;
            if (payloadLength == 0)
            {
                continue; // header only, nothing to process
            }

            if (buffer.Length < payloadLength) // grow only when the current buffer is too small
            {
                buffer = new byte[payloadLength];
            }

            dmNetStream.ReadB(buffer, 0, payloadLength);

            if (protocol.Version == 2 && protocol.Action == 5) // deflate-compressed batch
            {
                // Skip the two-byte zlib header (0x78 0xDA); DeflateStream expects raw deflate data.
                using (var deflate = new DeflateStream(new MemoryStream(buffer, 2, payloadLength - 2), CompressionMode.Decompress))
                {
                    // Stream.Read may legitimately return fewer bytes than requested, so
                    // keep reading until a whole header / payload has been collected.
                    while (ReadFromStreamFully(deflate, headerBuffer, 16) == 16)
                    {
                        Parse2Protocol(headerBuffer, out DanmakuProtocol inner);

                        if (inner.PacketLength < 16)
                        {
                            throw new NotSupportedException("协议失败: (L:" + inner.PacketLength + ")");
                        }

                        var innerLength = inner.PacketLength - 16;
                        if (innerLength == 0)
                        {
                            continue; // header only, nothing to process
                        }

                        if (inflatedBuffer.Length < innerLength)
                        {
                            inflatedBuffer = new byte[innerLength];
                        }

                        if (ReadFromStreamFully(deflate, inflatedBuffer, innerLength) != innerLength)
                        {
                            break; // truncated batch: drop the incomplete trailing packet
                        }

                        ProcessDanmaku(inner.Action, inflatedBuffer, innerLength);
                    }
                }
            }
            else
            {
                ProcessDanmaku(protocol.Action, buffer, payloadLength);
            }
        }
    }
    catch (Exception ex)
    {
        dmError = ex;
        logger.Log(Roomid, LogLevel.Debug, "Disconnected");
        dmClient?.Close();
        dmNetStream = null;
        if (!(dmTokenSource?.IsCancellationRequested ?? true))
        {
            logger.Log(Roomid, LogLevel.Warn, "弹幕连接被断开,将尝试重连", ex);
            ConnectWithRetry();
        }
    }

    // Dispatches one decoded packet according to its action code.
    void ProcessDanmaku(int action, byte[] local_buffer, int length)
    {
        switch (action)
        {
            case 3:
                // Viewer count (first four bytes, big-endian); currently not used.
                break;
            case 5: // playerCommand: UTF-8 JSON payload
                var json = Encoding.UTF8.GetString(local_buffer, 0, length);
                try
                {
                    ReceivedDanmaku?.Invoke(this, new ReceivedDanmakuArgs() { Danmaku = new DanmakuModel(json) });
                }
                catch (Exception ex)
                {
                    // A bad message or a failing subscriber must not tear down the connection.
                    logger.Log(Roomid, LogLevel.Warn, "", ex);
                }
                break;
            default:
                break;
        }
    }
}

/// <summary>
/// Reads from <paramref name="source"/> until <paramref name="count"/> bytes have been stored
/// at the start of <paramref name="target"/> or the stream ends.
/// </summary>
/// <returns>The number of bytes actually read; less than <paramref name="count"/> only at end of stream.</returns>
private static int ReadFromStreamFully(Stream source, byte[] target, int count)
{
    var total = 0;
    while (total < count)
    {
        var read = source.Read(target, total, count - total);
        if (read <= 0)
        {
            break;
        }
        total += read;
    }
    return total;
}
/// <summary>
/// Builds one packet (16-byte big-endian header followed by the UTF-8 encoded body) and
/// writes it to the danmaku stream, flushing afterwards.
/// </summary>
/// <param name="action">Action code placed in the header.</param>
/// <param name="body">Optional text payload; empty by default.</param>
private void SendSocketData(int action, string body = "")
{
    const int param = 1;
    const short magic = 16;
    const short ver = 1;

    byte[] payload = Encoding.UTF8.GetBytes(body);
    byte[] packet = new byte[payload.Length + 16];
    int offset = 0;

    // Copies the first "count" bytes of "bytes" to the current write position.
    void Append(byte[] bytes, int count)
    {
        Buffer.BlockCopy(bytes, 0, packet, offset, count);
        offset += count;
    }

    Append(BitConverter.GetBytes(packet.Length).ToBE(), 4); // total packet length
    Append(BitConverter.GetBytes(magic).ToBE(), 2);         // header length
    Append(BitConverter.GetBytes(ver).ToBE(), 2);           // version
    Append(BitConverter.GetBytes(action).ToBE(), 4);        // action
    Append(BitConverter.GetBytes(param).ToBE(), 4);         // parameter

    if (payload.Length > 0)
    {
        Append(payload, payload.Length);
    }

    dmNetStream.Write(packet, 0, packet.Length);
    dmNetStream.Flush();
}
2020-04-10 18:47:07 +08:00
/// <summary>
/// Decodes the 16-byte packet header at the start of <paramref name="buffer"/> and converts
/// its fields to host byte order.
/// </summary>
/// <param name="buffer">Buffer holding at least 16 header bytes.</param>
/// <param name="protocol">Receives the decoded header.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
/// <exception cref="ArgumentException"><paramref name="buffer"/> is shorter than 16 bytes.</exception>
private static void Parse2Protocol(byte[] buffer, out DanmakuProtocol protocol)
{
    // Bounds-checked reads instead of reinterpreting the array through a raw pointer, which
    // would silently read past the end of a buffer shorter than the header.
    // Offsets follow the sequential field layout of DanmakuProtocol (4 + 2 + 2 + 4 + 4 bytes).
    protocol = new DanmakuProtocol
    {
        PacketLength = BitConverter.ToInt32(buffer, 0),
        HeaderLength = BitConverter.ToInt16(buffer, 4),
        Version = BitConverter.ToInt16(buffer, 6),
        Action = BitConverter.ToInt32(buffer, 8),
        Parameter = BitConverter.ToInt32(buffer, 12)
    };
    protocol.ChangeEndian();
}
/// <summary>
/// Packet header of the danmaku protocol (16 bytes in total).
/// </summary>
private struct DanmakuProtocol
{
    /// <summary>
    /// Total packet length (header + payload).
    /// </summary>
    public int PacketLength;

    /// <summary>
    /// Header length (fixed at 16, i.e. sizeof(DanmakuProtocol)).
    /// </summary>
    public short HeaderLength;

    /// <summary>
    /// Protocol version.
    /// </summary>
    public short Version;

    /// <summary>
    /// Message type.
    /// </summary>
    public int Action;

    /// <summary>
    /// Parameter, fixed at 1.
    /// </summary>
    public int Parameter;

    /// <summary>
    /// Converts every field from network byte order to host byte order.
    /// </summary>
    public void ChangeEndian()
    {
        PacketLength = IPAddress.NetworkToHostOrder(PacketLength);
        HeaderLength = IPAddress.NetworkToHostOrder(HeaderLength);
        Version = IPAddress.NetworkToHostOrder(Version);
        Action = IPAddress.NetworkToHostOrder(Action);
        Parameter = IPAddress.NetworkToHostOrder(Parameter);
    }
}
#endregion
#region IDisposable Support
// Guards against redundant Dispose calls.
private bool disposedValue = false;

/// <summary>
/// Releases the resources held by the monitor. Only the first call has an effect.
/// </summary>
/// <param name="disposing">
/// When <c>true</c>, the cancellation source is cancelled and disposed, the HTTP timer is
/// disposed and the danmaku client is closed.
/// </param>
protected virtual void Dispose(bool disposing)
{
    if (disposedValue)
    {
        return;
    }

    if (disposing)
    {
        dmTokenSource?.Cancel();
        dmTokenSource?.Dispose();
        httpTimer?.Dispose();
        dmClient?.Close();
    }

    dmNetStream = null;
    disposedValue = true;
}
2018-03-21 00:33:34 +08:00
/// <summary>
/// Disposes the monitor. Cleanup code belongs in <see cref="Dispose(bool)"/>, not here.
/// </summary>
public void Dispose() => Dispose(true);
#endregion
2018-03-13 14:23:53 +08:00
}
}