前段时间需要写一个模拟大量客户端的程序来对服务器做压力测试，选用了Netty作为通信框架，通信协议采用了WebSocket。根据官方下载的源码和Demo写完后，发现建立几十个连接后就无法继续获取新的Socket通道了；后来经过各种尝试，发现是EventLoopGroup用法不正确导致的。正确用法是所有的Socket连接共用一个EventLoopGroup，这样就可以正常模拟大量的客户端连接了。
文中所有代码主要参照Netty官方源码中 netty-netty-4.1.84.Final/example/src/main/java/io/netty/example/http/websocketx/client/ 目录下的示例。
public class Simulator {
public static void start() {
String serverIp = ;127.0.0.1;;
int serverPort = 8005;
for (int i = 0; i < 10000; i;;) {
WebSocketConnector client = new WebSocketConnector(serverIp,serverPort);
client.doConnect();
}
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
/**
* WebSocket协议类型的模拟客户端连接器类
*
* ;author duyanjun
* ;since 2022/10/13 杜燕军 新建
*/
;Slf4j
public class WebSocketConnector {
// 服务器ip
protected String serverIp;
// 服务器通信端口
protected int serverSocketPort;
// 网络通道
private Channel channel;
/**
* WebSocket协议类型的模拟客户端连接器构造方法
*
* ;param serverIp
* ;param serverSocketPort
*/
public WebSocketConnector(String serverIp,int serverSocketPort) {
this.serverIp = serverIp;
this.serverSocketPort = serverSocketPort;
}
public void doConnect() {
try {
String URL = ;ws://;; this.serverIp ; ;:; ; this.serverSocketPort ; ;/;;
URI uri = new URI(URL);
final WebSocketIoHandler handler =
new WebSocketIoHandler(
WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
//.option(ChannelOption.TCP_nodeLAY, true)
.option(ChannelOption.SO_KEEPALIVE,true)
.handler(new ChannelInitializer<SocketChannel>() {
;Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加一个http的编解码器
pipeline.addLast(new HttpClientCodec());
// 添加一个用于支持大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 添加一个聚合器;这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response
pipeline.addLast(new HttpObjectAggregator(1024 * 64));
pipeline.addLast(handler);
}
});
try {
synchronized (bootstrap) {
final ChannelFuture future = bootstrap.connect(this.serverIp, this.serverSocketPort).sync();
this.channel = future.channel();
}
} catch (InterruptedException e) {
log.error(;连接服务失败.......................uri:; ; uri.toString(),e);
}catch (Exception e) {
log.error(;连接服务失败.......................uri:; ; uri.toString(),e);
}
} catch (Exception e) {
log.error(;连接服务失败.......................;,e);
} finally {
}
}
public void disConnect() {
this.channel.close();
}
- 问题主要出在上述代码中的 EventLoopGroup group = new NioEventLoopGroup()：这样使用会导致每个WebSocket连接都创建一个新的EventLoopGroup对象，而每个EventLoopGroup都会持有自己的一组线程和Selector，资源很快被耗尽，从而无法创建大量的客户端连接；
- 正确用法应该是将EventLoopGroup提到WebSocketConnector类的上层，由WebSocketConnector对象的创建者维护一个公共的EventLoopGroup对象，所有WebSocketConnector对象共享这一个EventLoopGroup对象。
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
/**
* WebSocket协议类型的模拟客户端IO处理器类
*
* ;author duyanjun
* ;since 2022/10/13 杜燕军 新建
*/
;Slf4j
public class WebSocketIoHandler extends SimpleChannelInboundHandler<Object> {
private final WebSocketClientHandshaker handShaker;
private ChannelPromise handshakeFuture;
public WebSocketIoHandler(WebSocketClientHandshaker handShaker) {
this.handShaker = handShaker;
}
public ChannelFuture handshakeFuture() {
return handshakeFuture;
}
;Override
public void handlerAdded(ChannelHandlerContext ctx) {
handshakeFuture = ctx.newPromise();
}
;Override
public void channelActive(ChannelHandlerContext ctx) {
handShaker.handshake(ctx.channel());
}
;Override
public void channelInactive(ChannelHandlerContext ctx) {
ctx.close();
try {
super.channelInactive(ctx);
} catch (Exception e) {
log.error(;channelInactive 异常.;, e);
}
log.warn(;WebSocket链路与服务器连接已断开.;);
}
;Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (!handShaker.isHandshakeComplete()) {
try {
handShaker.finishHandshake(ch, (FullHttpResponse) msg);
handshakeFuture.setSuccess();
log.info(;WebSocket握手成功;可以传输数据了.;);
// 数据一定要封装成WebSocketFrame才能发达
String data = ;Hello;;
WebSocketFrame frame = new TextWebSocketFrame(data);
ch.writeAndFlush(frame);
} catch (WebSocketHandshakeException e) {
log.warn(;WebSocket Client failed to connect;);
handshakeFuture.setFailure(e);
}
return;
}
if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
throw new IllegalStateException(
;Unexpected FullHttpResponse (getStatus=; ; response.status() ;
;, content=; ; response.content().toString(CharsetUtil.UTF_8) ; ;););
}
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
String s = textFrame.text();
log.info(;WebSocket Client received message: ; ; s);
} else if (frame instanceof PongWebSocketFrame) {
log.info(;WebSocket Client received pong;);
} else if (frame instanceof CloseWebSocketFrame) {
log.info(;WebSocket Client received closing;);
ch.close();
}
}
;Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error(;WebSocket链路由于发生异常,与服务器连接已断开.;, cause);
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause);
}
ctx.close();
super.exceptionCaught(ctx, cause);
}
;Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
// 如果写通道处于空闲状态,就发送心跳命令
if (IdleState.WRITER_IDLE.equals(event.state()) || IdleState.READER_IDLE.equals(event.state())) {
// 发送心跳数据
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
public class Simulator {
public static void start() {
String serverIp = ;127.0.0.1;;
int serverPort = 8005;
EventLoopGroup group = new NioEventLoopGroup();
for (int i = 0; i < 10000; i;;) {
WebSocketConnector client = new WebSocketConnector(serverIp,serverPort,group);
client.doConnect();
}
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
/**
* WebSocket协议类型的模拟客户端连接器类
*
* ;author duyanjun
* ;since 2022/10/13 杜燕军 新建
*/
;Slf4j
public class WebSocketConnector {
// 服务器ip
protected String serverIp;
// 服务器通信端口
protected int serverSocketPort;
// 事件循环线程池
protected EventLoopGroup group;
// 网络通道
private Channel channel;
/**
* WebSocket协议类型的模拟客户端连接器构造方法
*
* ;param serverIp
* ;param serverSocketPort
* ;param group
*/
public WebSocketConnector(String serverIp,int serverSocketPort,EventLoopGroup group) {
this.serverIp = serverIp;
this.serverSocketPort = serverSocketPort;
this.group = group;
}
public void doConnect() {
try {
String URL = ;ws://;; this.serverIp ; ;:; ; this.serverSocketPort ; ;/;;
URI uri = new URI(URL);
final WebSocketIoHandler handler =
new WebSocketIoHandler(
WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
//.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE,true)
.handler(new ChannelInitializer<SocketChannel>() {
;Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加一个http的编解码器
pipeline.addLast(new HttpClientCodec());
// 添加一个用于支持大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 添加一个聚合器;这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response
pipeline.addLast(new HttpObjectAggregator(1024 * 64));
pipeline.addLast(handler);
}
});
try {
synchronized (bootstrap) {
final ChannelFuture future = bootstrap.connect(this.serverIp, this.serverSocketPort).sync();
this.channel = future.channel();
}
} catch (InterruptedException e) {
log.error(;连接服务失败.......................uri:; ; uri.toString(),e);
}catch (Exception e) {
log.error(;连接服务失败.......................uri:; ; uri.toString(),e);
}
} catch (Exception e) {
log.error(;连接服务失败.......................;,e);
} finally {
}
}
public void disConnect() {
this.channel.close();
}
}
// IO数据处理类WebSocketIoHandler.java没有变化，使用“二、错误用法演示 -> 3、IO数据处理类”中所示代码即可