surging 集成SuperSocket预发布版本2.0

一、概述

周末在家试着扩展SuperSocket,因为之前都是只支持.net framework, 后面出现支持.NET CORE 的SuperSocket 2.0 ,然后集成进来和dotnetty 做下对比,dotnetty 有多强,我压测可以支持20w/s, 然后客户提供的服务器,通过外网压测网关,把上行速度50MB带宽的网络跑满了,引擎主机CPU只是在15%左右,完全没有跑满。然后再试试国人开发的SuperSocket看下性能怎么样。

 

 

木舟 (Kayak) 是什么?

       木舟(Kayak)是基于.NET6.0软件环境下的surging微服务引擎进行开发的, 平台包含了微服务和物联网平台。支持异步和响应式编程开发,功能包含了物模型,设备,产品,网络组件的统一管理和微服务平台下的注册中心,服务路由,模块,中间服务等管理。还有多协议适配(TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,等),通过灵活多样的配置适配能够接入不同厂家不同协议等设备。并且通过设备告警,消息通知,数据可视化等功能。能够让你能快速建立起微服务物联网平台系统。

     凯亚物联网平台:http://117.72.121.2:3100(用户名:fanly  密码:123456)(木舟物联网有人取了,准备改名原神凯亚,凡是交托于他的任务,总能得到解决

    链路跟踪Skywalking V8:http://117.72.121.2:8080/

      surging 微服务引擎开源地址:https://github.com/fanliang11/surging(后面surging 会移动到microsurging进行维护)

 

二、集成SuperSocket

作为去中心化的微服务引擎,相关的引擎组件,中间件都可以替换,就比如核心的RPC组件dotnetty 都可以替换成其它组件,下面介绍如何进行替换

创建服务端消息监听SuperSocketServerMessageListener,需要继承IMessageListener,代码如下:

  public class SuperSocketServerMessageListener : IMessageListener, IDisposable
  {
      public event ReceivedDelegate Received;

      private readonly ILogger _logger;

      private readonly ITransportMessageDecoder _transportMessageDecoder;
      private readonly ITransportMessageEncoder _transportMessageEncoder;
      private readonly IServiceEngineLifetime _serviceEngineLifetime;

      public SuperSocketServerMessageListener(ILogger logger, ITransportMessageCodecFactory codecFactory, IServiceEngineLifetime serviceEngineLifetime)
      {
          _logger = logger;
          _transportMessageEncoder = codecFactory.GetEncoder();
          _transportMessageDecoder = codecFactory.GetDecoder();
          _serviceEngineLifetime = serviceEngineLifetime;
      }
      public async Task StartAsync(EndPoint endPoint)
      {

          _serviceEngineLifetime.ServiceEngineStarted.Register(async () =>
          {
              try
              {
                  var ipEndPoint = endPoint as IPEndPoint;
                  var host = SuperSocketHostBuilder.Create()
              
                  .UsePackageHandler( (s, p) =>
                  {
                      Task.Run(async () =>
                      {
                          var sender = new SuperSocketServerMessageSender(_transportMessageEncoder, s);
                          await OnReceived(sender, p);
                      });
                      return ValueTask.CompletedTask;
                  })
                  .ConfigureSuperSocket(options =>
                  {
                      options.Name = "Echo Server";
                      options.Logger = _logger; 
                      options.AddListener(new ListenOptions
                      {
                          Ip = ipEndPoint.Address.ToString(),
                          Port = ipEndPoint.Port,
                        
                      }
                      );
                  })
                  .ConfigureLogging((hostCtx, loggingBuilder) =>
                  {
                      loggingBuilder.AddConsole();
                  })
                  .Build();
                  await host.RunAsync();
              }
              catch (Exception ex)
              {
                  _logger.LogError($"SuperSocket服务主机启动失败,监听地址:{endPoint}。 ");
              }
          });

      }
       
      public async Task OnReceived(IMessageSender sender, TransportMessage message)
      {
          if (Received == null)
              return;
          await Received(sender, message);
      }

      public void Dispose()
      { 
      }
  }

创建客户端消息监听SuperSocketTransportClientFactory,需要继承ITransportClientFactory,代码如下:

 

 internal class SuperSocketTransportClientFactory : ITransportClientFactory, IDisposable
 {
     private readonly ITransportMessageEncoder _transportMessageEncoder;
     private readonly ITransportMessageDecoder _transportMessageDecoder;
     private readonly ILogger _logger;
     private readonly IServiceExecutor _serviceExecutor;
     private readonly IHealthCheckService _healthCheckService;
     private readonly ConcurrentDictionary>> _clients = new ConcurrentDictionary>>();

     public SuperSocketTransportClientFactory(ITransportMessageCodecFactory codecFactory, IHealthCheckService healthCheckService, ILogger logger)
 : this(codecFactory, healthCheckService, logger, null)
     {
     }

     public SuperSocketTransportClientFactory(ITransportMessageCodecFactory codecFactory, IHealthCheckService healthCheckService, ILogger logger, IServiceExecutor serviceExecutor)
     {
         _transportMessageEncoder = codecFactory.GetEncoder();
         _transportMessageDecoder = codecFactory.GetDecoder();
         _logger = logger;
         _serviceExecutor = serviceExecutor;
         _healthCheckService = healthCheckService;
     }
     public async Task CreateClientAsync(EndPoint endPoint)
     {
         var key = endPoint;
         if (_logger.IsEnabled(LogLevel.Debug))
             _logger.LogDebug($"准备为服务端地址:{key}创建客户端。");
         try
         {
             return await _clients.GetOrAdd(key
          , k => new Lazy>(async () =>
          {
              //客户端对象 
              var client = new EasyClient(new TransportMessagePipelineFilter()).AsClient();
              var messageListener = new MessageListener();
              var messageSender = new SuperSocketMessageClientSender(_transportMessageEncoder, client);
              await client.ConnectAsync(endPoint);
              client.PackageHandler += async (sender, package) =>
              {
                  await messageListener.OnReceived(messageSender, package);
              };
              client.StartReceive();
              //创建客户端
              var transportClient = new TransportClient(messageSender, messageListener, _logger, _serviceExecutor);
              return transportClient;
          }
              )).Value;//返回实例
         }
         catch
         {
             //移除
             _clients.TryRemove(key, out var value);
             var ipEndPoint = endPoint as IPEndPoint;
             //标记这个地址是失败的请求
             if (ipEndPoint != null)
                 await _healthCheckService.MarkFailure(new IpAddressModel(ipEndPoint.Address.ToString(), ipEndPoint.Port));
             throw;
         }
     }


     public void Dispose()
     {
         foreach (var client in _clients.Values)
         {
             (client as IDisposable)?.Dispose();
         }
     }
 }

注册初始化SuperSocket引擎模块,需要继承EnginePartModule, 代码如下:

  public class SuperSocketModule : EnginePartModule
  {
      public override void Initialize(AppModuleContext context)
      {
          base.Initialize(context);
      }

      /// 
      /// Inject dependent third-party components
      /// 
      /// 
      protected override void RegisterBuilder(ContainerBuilderWrapper builder)
      {
          base.RegisterBuilder(builder);
          builder.Register(provider =>
          {
              IServiceExecutor serviceExecutor = null;
              if (provider.IsRegistered(typeof(IServiceExecutor)))
                  serviceExecutor = provider.Resolve();
              return new SuperSocketTransportClientFactory(provider.Resolve(),
                    provider.Resolve(),
                  provider.Resolve>(),
                  serviceExecutor);
          }).As(typeof(ITransportClientFactory)).SingleInstance();
          if (AppConfig.ServerOptions.Protocol == CommunicationProtocol.Tcp ||
              AppConfig.ServerOptions.Protocol == CommunicationProtocol.None)
          {
              RegisterDefaultProtocol(builder);
          }
      }

      private void RegisterDefaultProtocol(ContainerBuilderWrapper builder)
      {
          builder.Register(provider =>
          {
              return new SuperSocketServerMessageListener(provider.Resolve>(),
                    provider.Resolve(),
                       provider.Resolve());
          }).SingleInstance();
          builder.Register(provider =>
          {
              var serviceExecutor = provider.ResolveKeyed(CommunicationProtocol.Tcp.ToString());
              var messageListener = provider.Resolve();
              return new DefaultServiceHost(async endPoint =>
              {
                  await messageListener.StartAsync(endPoint);
                  return messageListener;
              }, serviceExecutor);
          }).As();
      }
  }

客户端服务端消息发送,需要继承IMessageSender, 代码如下:

 

    public abstract class SuperSocketMessageSender
    {
        private readonly ITransportMessageEncoder _transportMessageEncoder;

        protected SuperSocketMessageSender(ITransportMessageEncoder transportMessageEncoder)
        {
            _transportMessageEncoder = transportMessageEncoder;
        }

        protected byte[] GetByteBuffer(TransportMessage message)
        {
            var data = _transportMessageEncoder.Encode(message).ToList();
            data.AddRange(Encoding.UTF8.GetBytes("\r\n"));
            //var buffer = PooledByteBufferAllocator.Default.Buffer();
            return data.ToArray();
        }
    }

    public class SuperSocketMessageClientSender : SuperSocketMessageSender, IMessageSender
    {
        private readonly IEasyClient _client; 

        public SuperSocketMessageClientSender(ITransportMessageEncoder transportMessageEncoder, IEasyClient client) : base(transportMessageEncoder)
        {
            _client = client; 
        }

        /// 
        /// 发送消息。
        /// 
        /// 消息内容。
        /// 一个任务。 
        public async Task SendAsync(TransportMessage message)
        {
            var buffer = GetByteBuffer(message);
           await _client.SendAsync(buffer); 
        }

        /// 
        /// 发送消息并清空缓冲区。
        /// 
        /// 消息内容。
        /// 一个任务。 
        public async Task SendAndFlushAsync(TransportMessage message)
        {
            var buffer = GetByteBuffer(message);
            await _client.SendAsync(buffer); 
            // _client.StartReceive();
            //var p=  await _client.ReceiveAsync();
        }
    }

    #region Implementation of IMessageSender
    public class SuperSocketServerMessageSender : SuperSocketMessageSender, IMessageSender
    {
        private readonly IAppSession _session;

        public SuperSocketServerMessageSender(ITransportMessageEncoder transportMessageEncoder, IAppSession session) : base(transportMessageEncoder)
        {
            _session = session;
        }


        /// 
        /// 发送消息。
        /// 
        /// 消息内容。
        /// 一个任务。 
        public async Task SendAsync(TransportMessage message)
        {
            var buffer = GetByteBuffer(message);
           await _session.SendAsync(buffer);
        }

        /// 
        /// 发送消息并清空缓冲区。
        /// 
        /// 消息内容。
        /// 一个任务。 
        public async Task SendAndFlushAsync(TransportMessage message)
        {
            var buffer = GetByteBuffer(message);
           await _session.SendAsync(buffer);
        }

    }
    #endregion

 

SuperSocket过滤器,需要继承TerminatorPipelineFilter, 代码如下:

 

    public class TransportMessagePipelineFilter : TerminatorPipelineFilter
    {
        private readonly ITransportMessageDecoder _transportMessageDecoder;
        public TransportMessagePipelineFilter() : base(new[] { (byte)'\r', (byte)'\n' })
        {
            _transportMessageDecoder = ServiceLocator.GetService().GetDecoder();
        }


        public override TransportMessage Filter(ref SequenceReader<byte> bufferStream)
        {
            try
            {
                var bytes = bufferStream.Sequence.Slice(0, bufferStream.Length - 2).ToArray();
                var transportMessage = _transportMessageDecoder.Decode(bytes);
                return transportMessage;
            }
            finally
            {
                bufferStream.Advance(bufferStream.Length);
            }
        }
    }

 

三、如何加载SuperSocket引擎组件

第一种方式:

去掉Surging.Core.DotNetty引用,添加Surging.Core.SuperSocket

 第二种方式

添加Surging.Core.DotNetty,Surging.Core.SuperSocket这两个应用,在surgingSettings.json配置文件中把Packages的using列表中的DotNettyModule改成SuperSocketModule

四、结果

可能是预发布版本,在测试当中还是有些问题,kayka物联网平台换成SuperSocket还是会发生错误,暂时没有条件进行压测,可能是因为预发布版本,作者还需要完善,等正式版之后再压测做对比吧

 

五、总结

因为这段时间比较忙,还需要协助客户拆分服务,缓存降级,消息队列,等忙完这段时间,线上物联网平台会开通端口给用户进行测试,我也会努力把物联网进行完善,让微服务物联网平台能走向新的高度。

 

From:https://www.cnblogs.com/fanliang11/p/18824796
fanly11
100+评论
captcha