一、概述
周末在家试着扩展SuperSocket,因为之前都是只支持.net framework, 后面出现支持.NET CORE 的SuperSocket 2.0 ,然后集成进来和dotnetty 做下对比,dotnetty 有多强,我压测可以支持20w/s, 然后客户提供的服务器,通过外网压测网关,把上行速度50MB带宽的网络跑满了,引擎主机CPU只是在15%左右,完全没有跑满。然后再试试国人开发的SuperSocket看下性能怎么样。
木舟 (Kayak) 是什么?
木舟(Kayak)是基于.NET6.0软件环境下的surging微服务引擎进行开发的, 平台包含了微服务和物联网平台。支持异步和响应式编程开发,功能包含了物模型,设备,产品,网络组件的统一管理和微服务平台下的注册中心,服务路由,模块,中间服务等管理。还有多协议适配(TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,等),通过灵活多样的配置适配能够接入不同厂家不同协议等设备。并且通过设备告警,消息通知,数据可视化等功能。能够让你能快速建立起微服务物联网平台系统。
凯亚物联网平台:http://117.72.121.2:3100(用户名:fanly 密码:123456)(木舟物联网有人取了,准备改名原神凯亚,凡是交托于他的任务,总能得到解决)
链路跟踪Skywalking V8:http://117.72.121.2:8080/
surging 微服务引擎开源地址:https://github.com/fanliang11/surging(后面surging 会移动到microsurging进行维护)
二、集成SuperSocket
作为去中心化的微服务引擎,相关的引擎组件,中间件都可以替换,就比如核心的RPC组件dotnetty 都可以替换成其它组件,下面介绍如何进行替换
创建服务端消息监听SuperSocketServerMessageListener,需要继承IMessageListener,代码如下:
public class SuperSocketServerMessageListener : IMessageListener, IDisposable { public event ReceivedDelegate Received; private readonly ILogger_logger; private readonly ITransportMessageDecoder _transportMessageDecoder; private readonly ITransportMessageEncoder _transportMessageEncoder; private readonly IServiceEngineLifetime _serviceEngineLifetime; public SuperSocketServerMessageListener(ILogger logger, ITransportMessageCodecFactory codecFactory, IServiceEngineLifetime serviceEngineLifetime) { _logger = logger; _transportMessageEncoder = codecFactory.GetEncoder(); _transportMessageDecoder = codecFactory.GetDecoder(); _serviceEngineLifetime = serviceEngineLifetime; } public async Task StartAsync(EndPoint endPoint) { _serviceEngineLifetime.ServiceEngineStarted.Register(async () => { try { var ipEndPoint = endPoint as IPEndPoint; var host = SuperSocketHostBuilder.Create () .UsePackageHandler( (s, p) => { Task.Run(async () => { var sender = new SuperSocketServerMessageSender(_transportMessageEncoder, s); await OnReceived(sender, p); }); return ValueTask.CompletedTask; }) .ConfigureSuperSocket(options => { options.Name = "Echo Server"; options.Logger = _logger; options.AddListener(new ListenOptions { Ip = ipEndPoint.Address.ToString(), Port = ipEndPoint.Port, } ); }) .ConfigureLogging((hostCtx, loggingBuilder) => { loggingBuilder.AddConsole(); }) .Build(); await host.RunAsync(); } catch (Exception ex) { _logger.LogError($"SuperSocket服务主机启动失败,监听地址:{endPoint}。 "); } }); } public async Task OnReceived(IMessageSender sender, TransportMessage message) { if (Received == null) return; await Received(sender, message); } public void Dispose() { } }
创建客户端消息监听SuperSocketTransportClientFactory,需要继承ITransportClientFactory,代码如下:
internal class SuperSocketTransportClientFactory : ITransportClientFactory, IDisposable { private readonly ITransportMessageEncoder _transportMessageEncoder; private readonly ITransportMessageDecoder _transportMessageDecoder; private readonly ILogger_logger; private readonly IServiceExecutor _serviceExecutor; private readonly IHealthCheckService _healthCheckService; private readonly ConcurrentDictionary >> _clients = new ConcurrentDictionary >>(); public SuperSocketTransportClientFactory(ITransportMessageCodecFactory codecFactory, IHealthCheckService healthCheckService, ILogger logger) : this(codecFactory, healthCheckService, logger, null) { } public SuperSocketTransportClientFactory(ITransportMessageCodecFactory codecFactory, IHealthCheckService healthCheckService, ILogger logger, IServiceExecutor serviceExecutor) { _transportMessageEncoder = codecFactory.GetEncoder(); _transportMessageDecoder = codecFactory.GetDecoder(); _logger = logger; _serviceExecutor = serviceExecutor; _healthCheckService = healthCheckService; } public async Task CreateClientAsync(EndPoint endPoint) { var key = endPoint; if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug($"准备为服务端地址:{key}创建客户端。"); try { return await _clients.GetOrAdd(key , k => new Lazy >(async () => { //客户端对象 var client = new EasyClient (new TransportMessagePipelineFilter()).AsClient(); var messageListener = new MessageListener(); var messageSender = new SuperSocketMessageClientSender(_transportMessageEncoder, client); await client.ConnectAsync(endPoint); client.PackageHandler += async (sender, package) => { await messageListener.OnReceived(messageSender, package); }; client.StartReceive(); //创建客户端 var transportClient = new TransportClient(messageSender, messageListener, _logger, _serviceExecutor); return transportClient; } )).Value;//返回实例 } catch { //移除 _clients.TryRemove(key, out var value); var ipEndPoint = endPoint as IPEndPoint; //标记这个地址是失败的请求 if (ipEndPoint != null) await _healthCheckService.MarkFailure(new IpAddressModel(ipEndPoint.Address.ToString(), ipEndPoint.Port)); throw; } } public void Dispose() { foreach (var client in _clients.Values) { (client as IDisposable)?.Dispose(); } } }
注册初始化SuperSocket引擎模块,需要继承EnginePartModule, 代码如下:
public class SuperSocketModule : EnginePartModule { public override void Initialize(AppModuleContext context) { base.Initialize(context); } ////// Inject dependent third-party components /// /// protected override void RegisterBuilder(ContainerBuilderWrapper builder) { base.RegisterBuilder(builder); builder.Register(provider => { IServiceExecutor serviceExecutor = null; if (provider.IsRegistered(typeof(IServiceExecutor))) serviceExecutor = provider.Resolve (); return new SuperSocketTransportClientFactory(provider.Resolve (), provider.Resolve (), provider.Resolve >(), serviceExecutor); }).As(typeof(ITransportClientFactory)).SingleInstance(); if (AppConfig.ServerOptions.Protocol == CommunicationProtocol.Tcp || AppConfig.ServerOptions.Protocol == CommunicationProtocol.None) { RegisterDefaultProtocol(builder); } } private void RegisterDefaultProtocol(ContainerBuilderWrapper builder) { builder.Register(provider => { return new SuperSocketServerMessageListener(provider.Resolve >(), provider.Resolve (), provider.Resolve ()); }).SingleInstance(); builder.Register(provider => { var serviceExecutor = provider.ResolveKeyed (CommunicationProtocol.Tcp.ToString()); var messageListener = provider.Resolve (); return new DefaultServiceHost(async endPoint => { await messageListener.StartAsync(endPoint); return messageListener; }, serviceExecutor); }).As (); } }
客户端服务端消息发送,需要继承IMessageSender, 代码如下:
public abstract class SuperSocketMessageSender { private readonly ITransportMessageEncoder _transportMessageEncoder; protected SuperSocketMessageSender(ITransportMessageEncoder transportMessageEncoder) { _transportMessageEncoder = transportMessageEncoder; } protected byte[] GetByteBuffer(TransportMessage message) { var data = _transportMessageEncoder.Encode(message).ToList(); data.AddRange(Encoding.UTF8.GetBytes("\r\n")); //var buffer = PooledByteBufferAllocator.Default.Buffer(); return data.ToArray(); } } public class SuperSocketMessageClientSender : SuperSocketMessageSender, IMessageSender { private readonly IEasyClient_client; public SuperSocketMessageClientSender(ITransportMessageEncoder transportMessageEncoder, IEasyClient client) : base(transportMessageEncoder) { _client = client; } /// /// 发送消息。 /// /// 消息内容。 /// 一个任务。 public async Task SendAsync(TransportMessage message) { var buffer = GetByteBuffer(message); await _client.SendAsync(buffer); } /// /// 发送消息并清空缓冲区。 /// /// 消息内容。 /// 一个任务。 public async Task SendAndFlushAsync(TransportMessage message) { var buffer = GetByteBuffer(message); await _client.SendAsync(buffer); // _client.StartReceive(); //var p= await _client.ReceiveAsync(); } } #region Implementation of IMessageSender public class SuperSocketServerMessageSender : SuperSocketMessageSender, IMessageSender { private readonly IAppSession _session; public SuperSocketServerMessageSender(ITransportMessageEncoder transportMessageEncoder, IAppSession session) : base(transportMessageEncoder) { _session = session; } /// /// 发送消息。 /// /// 消息内容。 /// 一个任务。 public async Task SendAsync(TransportMessage message) { var buffer = GetByteBuffer(message); await _session.SendAsync(buffer); } /// /// 发送消息并清空缓冲区。 /// /// 消息内容。 /// 一个任务。 public async Task SendAndFlushAsync(TransportMessage message) { var buffer = GetByteBuffer(message); await _session.SendAsync(buffer); } } #endregion
SuperSocket过滤器,需要继承TerminatorPipelineFilter
public class TransportMessagePipelineFilter : TerminatorPipelineFilter{ private readonly ITransportMessageDecoder _transportMessageDecoder; public TransportMessagePipelineFilter() : base(new[] { (byte)'\r', (byte)'\n' }) { _transportMessageDecoder = ServiceLocator.GetService ().GetDecoder(); } public override TransportMessage Filter(ref SequenceReader<byte> bufferStream) { try { var bytes = bufferStream.Sequence.Slice(0, bufferStream.Length - 2).ToArray(); var transportMessage = _transportMessageDecoder.Decode(bytes); return transportMessage; } finally { bufferStream.Advance(bufferStream.Length); } } }
三、如何加载SuperSocket引擎组件
第一种方式:
去掉Surging.Core.DotNetty引用,添加Surging.Core.SuperSocket
第二种方式
添加Surging.Core.DotNetty,Surging.Core.SuperSocket这两个应用,在surgingSettings.json配置文件中把Packages的using列表中的DotNettyModule改成SuperSocketModule
四、结果
可能是预发布版本,在测试当中还是有些问题,kayka物联网平台换成SuperSocket还是会发生错误,暂时没有条件进行压测,可能是因为预发布版本,作者还需要完善,等正式版之后再压测做对比吧
五、总结
因为这段时间比较忙,还需要协助客户拆分服务,缓存降级,消息队列,等忙完这段时间,线上物联网平台会开通端口给用户进行测试,我也会努力把物联网进行完善,让微服务物联网平台能走向新的高度。