1. MyServerHandler是否必须继承ChannelInboundHandlerAdapter?

不,不是固定的。 继承 ChannelInboundHandlerAdapter 是最常见的方式,因为它让你可以选择性地重写你关心的事件方法(如 channelRead, exceptionCaught)。

但还有另一种选择:实现 ChannelInboundHandler 接口

  • ChannelInboundHandlerAdapter:是一个适配器类,它实现了 ChannelInboundHandler 接口,但所有方法都是空实现。你可以只重写你需要的方法,这是一种推荐的做法,更简洁。
  • ChannelInboundHandler:是一个接口,你必须实现里面的所有方法(包括你可能不关心的)。

所以,MyServerHandler 的核心是成为一个 ChannelInboundHandler,而继承适配器是实现这一目标最方便的方式。


2. 是否可以添加多个 .childHandler?流程是怎样的?

不可以直接添加多个 .childHandler 方法。 后一个 .childHandler 会覆盖前一个。

正确的做法是:在一个 ChannelInitializerinitChannel 方法中,向管道(Pipeline)添加多个处理器(Handler)。

Netty的处理器模型是基于 责任链模式 的。数据(比如接收到的消息)会在管道中的各个处理器之间传递。每个处理器负责完成一项特定的任务。

流程如下:

  1. 入站(Inbound)流程:当数据从网络通道(Channel)读取后,会从管道的头部开始,依次传递给每一个 入站处理器(InboundHandler)。例如:channelRead 事件会从第一个 InboundHandler 传递到最后一个。
  2. 出站(Outbound)流程:当要向通道写入数据时,会从管道的尾部开始,逆序传递给每一个 出站处理器(OutboundHandler)。例如:write 事件会从最后一个 OutboundHandler 传递到第一个。

示例:一个包含多个处理器的服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 获取管道
ChannelPipeline pipeline = ch.pipeline();

// 1. 首先添加一个解码器:将收到的ByteBuf解码成String
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));

// 2. 再添加一个自定义的业务处理器:处理String类型的消息
pipeline.addLast(new MyServerHandler());

// 3. 最后添加一个编码器:将发出的String编码成ByteBuf
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));

// 注意:还可以添加更多,比如日志处理器、加密解密处理器等。
// pipeline.addLast("logging", new LoggingHandler(LogLevel.INFO));
}
});

假设客户端发送了数据 “Hello”:

  1. StringDecoderchannelRead 方法被触发,它将 ByteBuf 类型的 msg 转换成了 String 类型的 “Hello”。
  2. StringDecoder 调用 ctx.fireChannelRead("Hello"),将转换后的字符串传递给下一个处理器。
  3. MyServerHandlerchannelRead 方法被触发,此时参数 msg 已经是一个 String 对象了,可以直接处理。
  4. MyServerHandler 处理完后,调用 ctx.writeAndFlush("服务端回复") 发送消息。
  5. 这个出站消息会先传递给 StringEncoder
  6. StringEncoderwrite 方法将 “服务端回复” 这个字符串编码回 ByteBuf,然后继续传递,最终通过网络发送给客户端。

关键点:

  • 顺序很重要! 解码器必须在业务处理器之前,编码器通常在之后。
  • 处理器需要调用 ctx.fireChannelRead(msg)ctx.write(msg) 将事件传递给链中的下一个处理器。如果忘记调用,责任链就会中断。

3. 如何实现文字与文件的传输?

实现文字和文件的混合传输,核心在于设计一个简单的协议来区分不同类型的消息。通常有两种主流方式:

方法一:使用定长消息头(推荐用于学习)

在消息开头用一个固定长度的字段来表示消息类型和长度。

例如,设计一个8字节的消息头:

  • 第1个字节:消息类型(比如 1 代表文本,2 代表文件)
  • 第2-8个字节:消息体的长度(7个字节可以表示很大的文件)

流程:

  1. 服务端/客户端首先有一个 解码器,它专门读取8个字节的头。
  2. 根据头部的类型,判断接下来要读取的是文本还是文件。
  3. 根据头部的长度,读取指定字节数的内容。
  4. 将读取到的内容(ByteBuf)交给不同的业务处理器处理。

Netty提供了 LengthFieldBasedFrameDecoder 来帮助你处理这种基于长度的分包,这是最专业和高效的方式。

方法二:使用分隔符(简单,适用于小文件或内网)

用特殊的字符序列来分隔不同的消息。例如,用 \n 分隔文本行,用一个特殊的 [FILE_END] 标记来表示文件传输结束。这种方式对于大文件不太可靠。

代码示例:方法一的简化版(自定义解码器)

这里提供一个非常简化的概念性代码,展示如何区分文本和文件。

1. 自定义协议消息类

1
2
3
4
5
6
7
public class MyMessage {
private byte type; // 1: text, 2: file
private int length; // 数据长度
private ByteBuf data; // 数据内容

// ... 构造方法、getter、setter ...
}

2. 自定义解码器(继承 ByteToMessageDecoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class MyMessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 可读字节必须大于头部长度的5 (1字节类型 + 4字节长度)
if (in.readableBytes() < 5) {
return; // 等待更多数据
}

in.markReaderIndex(); // 标记读取位置

byte type = in.readByte();
int length = in.readInt();

// 如果数据体还没有完全到达,重置读指针,等待下次触发
if (in.readableBytes() < length) {
in.resetReaderIndex();
return;
}

// 读取数据体
ByteBuf dataBuf = in.readBytes(length);
MyMessage message = new MyMessage();
message.setType(type);
message.setLength(length);
message.setData(dataBuf);

out.add(message); // 将解码好的对象传递给下一个Handler
}
}

3. 服务端管道配置

1
2
3
4
5
6
7
8
9
10
11
12
13
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

// 添加自定义解码器
pipeline.addLast(new MyMessageDecoder());
// 添加业务处理器
pipeline.addLast(new MyServerHandler());
// 如果需要回复,还要添加编码器(将MyMessage对象编码成ByteBuf)
// pipeline.addLast(new MyMessageEncoder());
}
});

4. 业务处理器 MyServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 现在收到的msg已经是解码后的MyMessage对象了
MyMessage myMessage = (MyMessage) msg;

if (myMessage.getType() == 1) {
// 处理文本
String text = myMessage.getData().toString(CharsetUtil.UTF_8);
System.out.println("收到文本: " + text);
// ... 处理文本逻辑 ...
} else if (myMessage.getType() == 2) {
// 处理文件
ByteBuf fileData = myMessage.getData();
// 将fileData写入到本地文件
// ... 处理文件逻辑(使用FileOutputStream等)...
System.out.println("收到文件,大小: " + myMessage.getLength() + " 字节");
}

// 记得释放ByteBuf,或者如果在MyMessage中已经retain()了,在这里release()
myMessage.getData().release();
}
}

总结:

  • 多个Handler:通过向Pipeline添加多个处理器实现,形成责任链。
  • 文件文字传输:核心是设计协议(如长度前缀法),并编写相应的解码器(Decoder) 来区分消息类型。业务处理器则根据类型进行不同的处理。对于大型文件,还需要考虑分块传输、断点续传等更复杂的机制。

设计一个简单的文本与文件互发案例

1. 协议设计

我们设计一个简单的协议:

  • 消息头:8字节
    • 第1字节:消息类型(1=文本,2=文件块)
    • 第2-5字节:数据长度(int,4字节)
    • 第6-8字节:文件块序号(int,3字节,0=文本消息或文件结束标记)

2. 公共组件

消息类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class FileMessage {
private byte type; // 消息类型:1=文本,2=文件块
private int length; // 数据长度
private int chunkIndex; // 文件块序号(0表示文本或文件结束)
private String fileName; // 文件名(仅文件传输使用)
private ByteBuf data; // 数据内容

// 构造方法
public FileMessage(byte type, int length, int chunkIndex, String fileName, ByteBuf data) {
this.type = type;
this.length = length;
this.chunkIndex = chunkIndex;
this.fileName = fileName;
this.data = data;
}

// getter和setter...
public byte getType() { return type; }
public int getLength() { return length; }
public int getChunkIndex() { return chunkIndex; }
public String getFileName() { return fileName; }
public ByteBuf getData() { return data; }

public void setFileName(String fileName) { this.fileName = fileName; }

@Override
public String toString() {
return "FileMessage{" +
"type=" + type +
", length=" + length +
", chunkIndex=" + chunkIndex +
", fileName='" + fileName + '\'' +
'}';
}
}

编码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class FileMessageEncoder extends MessageToByteEncoder<FileMessage> {
private static final Charset CHARSET = StandardCharsets.UTF_8;

@Override
protected void encode(ChannelHandlerContext ctx, FileMessage msg, ByteBuf out) throws Exception {
// 写入消息头:1字节类型 + 4字节长度 + 3字节块序号
out.writeByte(msg.getType());
out.writeInt(msg.getLength());
out.writeMedium(msg.getChunkIndex()); // 写入3字节

// 如果是文件传输,写入文件名(先写长度,再写内容)
if (msg.getType() == 2 && msg.getFileName() != null) {
byte[] fileNameBytes = msg.getFileName().getBytes(CHARSET);
out.writeByte(fileNameBytes.length); // 文件名长度(1字节)
out.writeBytes(fileNameBytes);
}

// 写入数据
if (msg.getData() != null) {
out.writeBytes(msg.getData());
}
}
}

解码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class FileMessageDecoder extends LengthFieldBasedFrameDecoder {
private static final Charset CHARSET = StandardCharsets.UTF_8;
private static final int MAX_FRAME_LENGTH = 1024 * 1024; // 1MB
private static final int LENGTH_FIELD_OFFSET = 1;
private static final int LENGTH_FIELD_LENGTH = 4;
private static final int LENGTH_ADJUSTMENT = 3; // 块序号3字节
private static final int INITIAL_BYTES_TO_STRIP = 0;

public FileMessageDecoder() {
super(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH,
LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP);
}

@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, in);
if (frame == null) {
return null;
}

try {
byte type = frame.readByte();
int length = frame.readInt();
int chunkIndex = frame.readUnsignedMedium(); // 读取3字节

String fileName = null;
if (type == 2) {
// 读取文件名
byte fileNameLength = frame.readByte();
byte[] fileNameBytes = new byte[fileNameLength];
frame.readBytes(fileNameBytes);
fileName = new String(fileNameBytes, CHARSET);
}

// 读取数据
ByteBuf data = frame.readRetainedSlice(length);

return new FileMessage(type, length, chunkIndex, fileName, data);
} finally {
frame.release();
}
}
}

3. 服务端实现

服务端处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public class FileServerHandler extends ChannelInboundHandlerAdapter {
private static final String FILE_STORAGE_DIR = "server_files/";
private Map<String, FileOutputStream> fileStreams = new ConcurrentHashMap<>();
private Map<String, Integer> expectedChunks = new ConcurrentHashMap<>();

static {
new File(FILE_STORAGE_DIR).mkdirs();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FileMessage) {
FileMessage fileMsg = (FileMessage) msg;

if (fileMsg.getType() == 1) {
// 文本消息
handleTextMessage(ctx, fileMsg);
} else if (fileMsg.getType() == 2) {
// 文件块消息
handleFileChunk(ctx, fileMsg);
}
} else {
ctx.fireChannelRead(msg);
}
}

private void handleTextMessage(ChannelHandlerContext ctx, FileMessage msg) {
String text = msg.getData().toString(StandardCharsets.UTF_8);
System.out.println("【服务端】收到文本: " + text);

// 回复确认
String response = "服务端已收到文本: " + text.substring(0, Math.min(text.length(), 10)) + "...";
sendTextMessage(ctx, response);

msg.getData().release();
}

private void handleFileChunk(ChannelHandlerContext ctx, FileMessage msg) {
String fileName = msg.getFileName();
int chunkIndex = msg.getChunkIndex();
ByteBuf chunkData = msg.getData();

try {
if (chunkIndex == 0) {
// 文件开始传输,创建文件流
FileOutputStream fos = new FileOutputStream(FILE_STORAGE_DIR + fileName);
fileStreams.put(fileName, fos);
expectedChunks.put(fileName, 0);
System.out.println("【服务端】开始接收文件: " + fileName);
}

FileOutputStream fos = fileStreams.get(fileName);
if (fos != null) {
// 写入文件块
byte[] bytes = new byte[chunkData.readableBytes()];
chunkData.readBytes(bytes);
fos.write(bytes);

expectedChunks.put(fileName, expectedChunks.get(fileName) + 1);

System.out.println("【服务端】收到文件块: " + fileName + " [块" + chunkIndex + "]");

// 发送确认(简单实现,实际应该更复杂)
if (chunkIndex % 10 == 0) {
sendTextMessage(ctx, "已收到 " + fileName + " 的第 " + chunkIndex + " 块");
}
}

} catch (Exception e) {
e.printStackTrace();
sendTextMessage(ctx, "文件传输错误: " + e.getMessage());
} finally {
chunkData.release();
}
}

private void sendTextMessage(ChannelHandlerContext ctx, String text) {
ByteBuf buffer = Unpooled.copiedBuffer(text, StandardCharsets.UTF_8);
FileMessage response = new FileMessage((byte) 1, buffer.readableBytes(), 0, null, buffer);
ctx.writeAndFlush(response);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 关闭所有文件流
for (FileOutputStream fos : fileStreams.values()) {
try { fos.close(); } catch (Exception e) { e.printStackTrace(); }
}
fileStreams.clear();
expectedChunks.clear();
super.channelInactive(ctx);
}
}

服务端启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class FileServer {
private static final int PORT = 8888;

public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new FileMessageDecoder(), // 解码器
new FileMessageEncoder(), // 编码器
new FileServerHandler() // 业务处理器
);
}
});

ChannelFuture future = bootstrap.bind(PORT).sync();
System.out.println("文件服务器启动成功,端口: " + PORT);
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

4. 客户端实现

客户端处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
public class FileClientHandler extends ChannelInboundHandlerAdapter {
private ChannelHandlerContext ctx;
private final String fileToSend;

public FileClientHandler(String fileToSend) {
this.fileToSend = fileToSend;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
System.out.println("【客户端】连接服务器成功");

// 连接成功后先发送文本消息
sendTextMessage("Hello Server! 准备发送文件: " + fileToSend);

// 然后发送文件
if (fileToSend != null && new File(fileToSend).exists()) {
new Thread(() -> sendFileInChunks(fileToSend)).start();
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FileMessage) {
FileMessage fileMsg = (FileMessage) msg;
if (fileMsg.getType() == 1) {
String text = fileMsg.getData().toString(StandardCharsets.UTF_8);
System.out.println("【客户端】收到回复: " + text);
}
fileMsg.getData().release();
}
}

private void sendTextMessage(String text) {
ByteBuf buffer = Unpooled.copiedBuffer(text, StandardCharsets.UTF_8);
FileMessage message = new FileMessage((byte) 1, buffer.readableBytes(), 0, null, buffer);
ctx.writeAndFlush(message);
}

private void sendFileInChunks(String filePath) {
try {
File file = new File(filePath);
String fileName = file.getName();
FileInputStream fis = new FileInputStream(file);
byte[] buffer = new byte[1024]; // 1KB块大小
int bytesRead;
int chunkIndex = 0;

// 发送文件开始标记(块序号为0)
sendFileChunk(fileName, new byte[0], 0);
Thread.sleep(100); // 稍作延迟

System.out.println("【客户端】开始发送文件: " + fileName);

while ((bytesRead = fis.read(buffer)) != -1) {
byte[] chunkData = Arrays.copyOf(buffer, bytesRead);
sendFileChunk(fileName, chunkData, ++chunkIndex);

// 模拟网络延迟
Thread.sleep(10);

if (chunkIndex % 50 == 0) {
System.out.println("【客户端】已发送 " + chunkIndex + " 个数据块");
}
}

fis.close();
System.out.println("【客户端】文件发送完成,总共 " + chunkIndex + " 个数据块");

// 发送完成消息
sendTextMessage("文件 " + fileName + " 发送完成!");

} catch (Exception e) {
e.printStackTrace();
sendTextMessage("文件发送错误: " + e.getMessage());
}
}

private void sendFileChunk(String fileName, byte[] data, int chunkIndex) {
ByteBuf buffer = Unpooled.copiedBuffer(data);
FileMessage message = new FileMessage((byte) 2, data.length, chunkIndex, fileName, buffer);
ctx.writeAndFlush(message);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

客户端启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class FileClient {
private static final String HOST = "localhost";
private static final int PORT = 8888;

public static void main(String[] args) throws Exception {
String fileToSend = args.length > 0 ? args[0] : "test.txt";

EventLoopGroup group = new NioEventLoopGroup();

try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new FileMessageDecoder(), // 解码器
new FileMessageEncoder(), // 编码器
new FileClientHandler(fileToSend) // 业务处理器
);
}
});

ChannelFuture future = bootstrap.connect(HOST, PORT).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}

5. 测试文件创建

创建一个测试文件 test.txt

1
2
3
echo "这是一个测试文件,用于验证Netty文件分块传输功能。" > test.txt
# 或者创建一个大文件进行测试
dd if=/dev/zero of=largefile.bin bs=1M count=5

6. 运行方式

  1. 启动服务端
1
2
javac *.java
java FileServer
  1. 启动客户端
1
java FileClient test.txt

关键特性说明

  1. 分块传输:文件被分成1KB的块逐个发送
  2. 协议设计:使用固定头部分辨消息类型
  3. 内存管理:正确使用ByteBuf的retain/release机制
  4. 并发安全:使用ConcurrentHashMap处理并发文件写入
  5. 错误处理:完善的异常处理和资源释放

这个实现展示了Netty如何处理混合类型的数据传输,你可以根据需要调整块大小、协议格式等参数。