black-hole

先构建docker镜像

1
docker build -f challenge/Dockerfile challenge -t black_hole:challenge

然后

1
socat -v tcp-listen:31337,reuseaddr exec:"docker run --rm -i black_hole\:challenge"

后面就能通过nc与31337端口交互

chal.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python3

import binascii
import numpy as np
import os
import random
import secrets
import sys

from collections import defaultdict

# Local imports
from libs.timeout import timeout, TimeoutError, MINUTE

MSG_SIZE = 256
LOWER_RANDOM_MSG_SIZE = int(MSG_SIZE * 0.55)
UPPER_RANDOM_MSG_SIZE = int(LOWER_RANDOM_MSG_SIZE * 0.37) + LOWER_RANDOM_MSG_SIZE
NUM_OF_BYTES_GUESSES = UPPER_RANDOM_MSG_SIZE*(UPPER_RANDOM_MSG_SIZE+1) // 2 + 3*UPPER_RANDOM_MSG_SIZE

# Challenge enviroment variables
FLAG = os.getenv("FLAG", "flag{0bscur3!s3cur3}")
TO = int(os.getenv("TIMEOUT", 5 * MINUTE))

# Print properly in docker/socket env
def p(s, end="\n", flush=True):
sys.stdout.write(f"{s}{end}")
if flush: sys.stdout.flush()

class BlackHole():
def __init__(self):
self.encoding = defaultdict(dict)
self.create_transpositions()
self.create_random_msgs()

def create_transpositions(self):
for num_len in range(1,MSG_SIZE+1):
perm = np.random.permutation(MSG_SIZE)
self.encoding[num_len]["trans"] = list(perm[:num_len])

def create_random_msgs(self):
for num_len in range(1,MSG_SIZE+1):
self.encoding[num_len]['encode'] = b'\x00'
while(b'\x00' in self.encoding[num_len]['encode']):
self.encoding[num_len]['encode'] = secrets.token_bytes(MSG_SIZE)

def encode_msg(self, msg):
msg_nums, msg_len = list(msg), len(msg)

encoded_msg = list(self.encoding[msg_len]["encode"])

for idx, num in enumerate(msg_nums):
encoded_msg[self.encoding[msg_len]["trans"][idx]] ^= num

assert len(encoded_msg) == MSG_SIZE

return bytes(encoded_msg)

@timeout(TO)
def challenge(bh):
success = False

rnd_msg_size = random.randint(LOWER_RANDOM_MSG_SIZE, UPPER_RANDOM_MSG_SIZE)
while(b'\x00' in (random_msg := secrets.token_bytes(rnd_msg_size))): continue

random_msg = bh.encode_msg(random_msg)

p(f"Encoded msg is: {binascii.hexlify(random_msg).decode()}")
p(f"We can stream {NUM_OF_BYTES_GUESSES} bytes of data before the sat kills the connection. Please help. (Send your message in hex.)")

def p_err_and_sub(err_msg):
nonlocal bytes_left
p(err_msg)
bytes_left -= l_len

bytes_left = NUM_OF_BYTES_GUESSES
while bytes_left > 0 and not success:
p(f"({bytes_left}) Msg: ", end="")
line = sys.stdin.readline()

line = line.replace("\n","")

l_len = len(line)
bytes_left -= (l_len // 2)
if l_len % 2 == 1:
p("Must provide even-length string")
continue
elif l_len > 2*(MSG_SIZE-1):
p(f"Size of msg cannot be greater than {MSG_SIZE-1}")
continue
elif l_len == 0:
p(f"Size of msg must be greater than 0")
continue
elif bytes_left < 0:
bytes_left += (l_len // 2)
p(f"Msg too large, you only have {bytes_left} bytes left...")
continue

# Check to make sure that the message doesn't contain "00" on a byte alignment
for b_idx in range(0, len(line)-2, 2):
if "00" == line[b_idx:b_idx+2]:
p("Must provide message with no NULL bytes (00)")
continue

try:
msg = binascii.unhexlify(line)
except binascii.Error as e:
if str(e) == "Odd-length string":
p("Must provide even-length string")
continue
# exit(-1)
elif str(e) == "Non-hexadecimal digit found":
p("Must provide hexadecimal digits only")
continue
# exit(-1)
continue

encoded_msg = bh.encode_msg(msg)
encoded_msg_hex = binascii.hexlify(encoded_msg).decode()

p(encoded_msg_hex)

if encoded_msg == random_msg:
p(f"Satellite-link synced! Flag: {FLAG}")
exit(0)

return success

if __name__ == "__main__":
p("Generating black hole...\n")
bh = BlackHole()

sys.stdin.flush()

try:
success = challenge(bh)
except TimeoutError:
p("\nTimeout, Bye")
sys.exit(1)

if not success:
p("You ran out of data to send! Bye.")

程序生成一个长度为 n 的随机明文(不含 0x00),经自定义编码器得到 256 字节密文 Y(十六进制打印给挑战者)。随后进入交互循环,可以多次提交十六进制字符串,服务将返回对应编码后的 256 字节十六进制。若某次返回值恰好等于最初打印的 Y,即判定同步成功并输出 flag。

全局参数与配额

  • MSG_SIZE = 256
  • 随机明文长度 n 在区间
    LOWER_RANDOM_MSG_SIZE = ⌊256×0.55⌋ = 140
    UPPER_RANDOM_MSG_SIZE = 140 + ⌊140×0.37⌋ = 191 之间;
  • 交互总预算字节(按提交明文字节数累计)为
    NUM_OF_BYTES_GUESSES = 191×192/2 + 3×191 = 18909

输入限制在循环内严格检查:必须偶数字符、长度 ≤ 2*(MSG_SIZE-1)、不得包含“00”字节(按字节对齐),否则提示并继续。

编码器结构

BlackHole 的初始化构造了两套与长度相关的参数:

  • 置换表 trans_L:对每个 L∈[1..256],先生成 np.random.permutation(256),取前 L 个作为该长度的投影位置们;
  • 底板 E_L:对每个 L,生成一个不含 0x00的 256 字节随机串。

编码函数 encode_msg(m) 的规则是:
E_|m| 复制为可变数组,然后对 i=0..|m|-1,令
encoded[trans_|m|[i]] ^= m[i]。返回 256 字节结果

目标密文

挑战开始时,随机取 n∈[140..191],生成长度为 n 的随机明文 R(不含 0x00),计算
Y = encode_msg(R) = E_n ⊕ P_n(R),并把 Y 的 hex 打印出来。这里 P_n(·) 表示把长度为 n 的向量按 trans_n 扔到 256 维上并异或聚合的线性映射。

解题推导(线性 + 碰撞判定)

目标是构造某条提交 M* 使 encode_msg(M*) == Y。由线性结构可知:

$$\text{encode}_n(M) = E_n \oplus P_n(M)$$

因此当我们能恢复 ntrans_nE_n 的受影响位置 后,便可由

$$R[i] = Y[\text{trans}_n[i]] \oplus E_n[\text{trans}_n[i]]$$

逐字节反推出目标明文 R,提交即获胜。关键是:如何在预算内恢复这些量

第一步:判定真实长度 n

策略:对每个候选 L∈[140..191],发送全 0x01 的消息 1^L,得到

$$O1_L = E_L \oplus P_L(1,1,\dots,1)$$

与目标 Y=E_n\oplus P_n(R) 做逐字节比较,计算不等计数 H_L

  • L≠nE_LE_n 独立随机,几乎所有 256 处都会不同,H_L≈256(实验中通常 250+)。
  • L= nE_n 抵消,只在 trans_nn 个位置出现 1⊕R[i] 的差异,其余 256−n 位置两边都是 E_n,因而 H_n≈n

于是H_L 最小的 L 即为 n。这一步的字节成本为

$$\sum_{L=140}^{191} L = 8606 \ll 18909$$

完全可行。上述判定逻辑与预算上限,均可在源码参数与循环里印证。

赛中日志如 Detected n = 177, distance = 175 即是该步的产物,距离值与 n 接近符合预期。

第二步:一次”打标”恢复置换 trans_n 与底板差分

在已经固定 n 的情况下,再发送一条长度为 n标号消息

$$M_{\text{map}} = (2,3,4,\dots, n+1)$$

(避免 0x00 与 0x01),得到

$$Omap = E_n \oplus P_n(M_{\text{map}})$$

把它与 $O1_n = E_n \oplus P_n(1,1,\dots,1)$ 做差:

$$
Omap[p] \oplus O1_n[p] =
\begin{cases}
v_i \oplus 0x01, & p = \text{trans}_n[i] \newline
0, & \text{否则}
\end{cases}
$$
因此所有非零差分的位置集合正好是置换像集 $$S = {\text{trans}_n[i]}$$。并且每个差分值唯一对应 i

$$i = \big(Omap[p]\oplus O1_n[p]\oplus 0x01\big) - 2$$

即可恢复完整的 trans_n 映射。此外,在这些受影响位置 p 上,

$$E_n[p] = O1_n[p] \oplus 0x01$$

从而也得到与 R 相关的 E_n 片段。该步只需再花费 n 字节预算。

第三步:反推 R 并提交

对每个 i∈[0..n-1],令 p = trans_n[i],有

$$R[i] = Y[p] \oplus E_n[p]$$

得到 R 后把它发回即可使 encode_n(R)=Y,触发输出 flag。该步再用 n 字节。合计预算

$$8606 + n + n \le 8606 + 382 = 8988 \ll 18909$$

稳稳在限制之内。预算与限制参见源码定义与输入计数逻辑。

exp.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from pwn import *
import re

HOST, PORT = "10.249.10.107", 31337
MSG_SIZE = 256
LOWER = int(MSG_SIZE * 0.55) # 140
UPPER = int(LOWER * 0.37) + LOWER # 191

def to_bytes(hexstr):
return bytes.fromhex(hexstr.strip())

def to_hex(bs):
return bs.hex()

def recv_until_prompt(io):
data = io.recvuntil(b") Msg: ")
return data.decode(errors="ignore")

def send_and_get(io, msg_bytes):
io.sendline(to_hex(msg_bytes).encode())
# 服务先回一行编码后的 512 hex,再打印下一次提示
enc_line = io.recvline().strip().decode()
_ = io.recvuntil(b") Msg: ")
return to_bytes(enc_line)

def main():
io = remote(HOST, PORT)
banner = recv_until_prompt(io)

# 抓目标 Y
m = re.search(r"Encoded msg is:\s*([0-9a-fA-F]+)", banner)
assert m, "Failed to find target hex"
Y = to_bytes(m.group(1))
assert len(Y) == MSG_SIZE

# Phase 1: 扫描 L,发 0x01*L
O1_by_L = {}
dist_by_L = {}
for L in range(LOWER, UPPER + 1):
msg = bytes([0x01]) * L
out = send_and_get(io, msg)
O1_by_L[L] = out
# Hamming(逐字节不相等计数)
dist_by_L[L] = sum(1 for i in range(MSG_SIZE) if out[i] != Y[i])

n = min(dist_by_L, key=dist_by_L.get)
log.info(f"Detected n = {n}, distance = {dist_by_L[n]}")

O1 = O1_by_L[n]

# Phase 2: 一次性标号,恢复 trans 和 E
v = [i + 2 for i in range(n)] # all distinct, avoid 0x00/0x01
Mmap = bytes(v)
Omap = send_and_get(io, Mmap)

# 找出被影响的位置集合 S(Omap 与 O1 不同的位置)
S = [p for p in range(MSG_SIZE) if Omap[p] != O1[p]]
assert len(S) == n, f"Unexpected affected positions: {len(S)} != {n}"

trans = [None] * n # trans[i] = position p
En = bytearray(O1) # start from O1; fix S positions to get E
for p in S:
delta = Omap[p] ^ O1[p] # = v_i ^ 0x01
vi = delta ^ 0x01
i = vi - 2
assert 0 <= i < n
trans[i] = p
En[p] = O1[p] ^ 0x01 # restore E at this position

assert all(t is not None for t in trans)

# Phase 3: 还原 R 并提交
R = bytearray(n)
for i in range(n):
p = trans[i]
R[i] = Y[p] ^ En[p]
assert R[i] != 0 # 题目保证随机明文不含 0x00

final = bytes(R)
# 发送最终消息,服务应回 Flag
io.sendline(to_hex(final).encode())

# 服务器会先回一行编码后的hex,再回一行包含 Flag 的提示,然后断开
for _ in range(3):
line = io.recvline(timeout=2)
if not line:
break
print(line.decode(errors="ignore"), end="")
if "Flag" in line.decode(errors="ignore"):
print("Flag found!")
break

io.close()

if __name__ == "__main__":
main()

sunfun

Dockerfile里要把python-dev换为python3-dev

先构建docker镜像

1
docker build challenge -t sunfun:challenge

然后

1
socat -v tcp-listen:12345,reuseaddr exec:"docker run --rm -i -e FLAG=flag{zulu49225delta\:GG1EnNVMK3-hPvlNKAdEJxcujvp9WK4rEchuEdlDp3yv_Wh_uvB5ehGq-fyRowvwkWpdAMTKbidqhK4JhFsaz1k} sunfun\:challenge"

后面就能通过nc与12345端口交互

challenge.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# Fun in the Sun Challenge
from cmath import acos, pi
from numpy import dot, cross
from numpy.linalg import norm
from skyfield.api import load, wgs84
ts = load.timescale()

import os, sys
from time import sleep
from timeout import timeout, TimeoutError
time = int(os.getenv("TIMEOUT",90))

# Challenge Intro
def render_intro():
art = [
" FUN ",
" IN ",
" THE ",
" SUN!! ",
" ",
" y ",
" / . ",
" //// <O> ",
" ////o--z ' ",
" | ",
" x ",
" ",
" "
]

for row in art:
print(row)
sleep(0.05)
sys.stdout.flush()

return

def quaternion(u,v):
u = u / norm(u)
v = v / norm(v)

q = [0,0,0,0]
q[0:3] = cross(u,v)
q[3] = 1 + dot(u,v)
q = q / norm(q)
return q

def vectorFromQuaternion(u,q):
v = u + 2*q[3]*cross(q[0:3],u) + 2*(cross(q[0:3],cross(q[0:3],u)))
return v

def angleBetweenVectors(u,v):
angle = acos(dot(u,v)/(norm(u)*norm(v)))
return angle.real

@timeout(time)
def challenge():
# Load satellites
sat = load.tle_file('sat.tle')
#if sat: print("Loaded",len(sat),"TLE")
#else: print("No TLEs :(")
sat = sat[0]

eph = load('de440s.bsp')
sun, earth, mars = eph['Sun'], eph['Earth'], eph['Mars Barycenter']

# Set simulation epoch
# May 21, 2022, 14:00 UTC
t = ts.utc(2022, 5, 21, 14, 0)

# Challenge Question
print("Provide the the quaternion (Qx, Qy, Qz, Qw) to point your spacecraft at the sun at",t.utc_strftime())
print("The solar panels face the -X axis of the spacecraft body frame or [-1,0,0]")

print("Qx = ",end='')
x = float(input())
print("Qy = ",end='')
y = float(input())
print("Qz = ",end='')
z = float(input())
print("Qw = ",end='')
w = float(input())

ansQ = [x, y, z, w]
ansQ = ansQ / norm(ansQ)
print("Quaternion normalized to:",ansQ)

# Check answer
bodyV = [-1,0,0] # -x axis is solar panels in sat body frame

# TLE doesn't matter b/c this sat is so close to Earth relative to Earth-Sun distance
# If it was farther away, need to convert the satellite geocentric coordinates (earth origin) to barycentric (sun origin)
sunV = earth.at(t).observe(sun).position.km
sunV = sunV / norm(sunV)
#print(sunV)
#sunQ = quaternion(bodyV,sunV)
#print(sunQ)

ansV = vectorFromQuaternion(bodyV,ansQ)
angle = angleBetweenVectors(sunV,ansV) * 180/pi

print("The solar panels are facing %.3f degrees away from the sun" %angle)
# If pointed at sun within 1 degree, accept the answer
if angle < 1:
return 1

return 0

if __name__ == "__main__":

render_intro()

# Challenge
success = challenge()

if success:
print("You got it! Here's your flag:")
flag = os.getenv('FLAG')
print(flag)

else:
print("That didn't work, try again!")

给出了天体历de440s.bsp和卫星信息sat.tle

题目分析

数学工具函数

  • quaternion(u,v):构造把单位向量 u 旋到单位向量 v 的最小转角四元数,返回 [qx, qy, qz, qw]最后一位为标量部)。核心:
    $$q_{\text{vec}} = u \times v,\quad q_w = 1 + u\cdot v,\quad q = \frac{[q_{\text{vec}},, q_w]}{||[q_{\text{vec}},, q_w]||}$$

  • vectorFromQuaternion(u,q):用单位四元数 $q=[\mathbf{q}, w]$ 旋转三维向量 u
    $$v = u + 2w(\mathbf{q}\times u) + 2,\mathbf{q}\times(\mathbf{q}\times u)$$
    这是把 $v’ = q,v,q^*$ 展开到纯向量形式后的高效实现。

  • angleBetweenVectors(u,v):$\theta=\arccos!\frac{u\cdot v}{||u||||v||}$,返回弧度(调用处再转度)。

challenge() 主流程

  1. 读取 TLE 与天体历(TLE 在这里基本无用,因为地—日距离远,地球附近卫星位置对单位方向几乎不影响)。
  2. 固定时刻:2022-05-21 14:00:00 UTC
  3. 提示输入四元数 (Qx,Qy,Qz,Qw),并单位化。
  4. 设机体系 -X 轴 为太阳能板法向:bodyV = [-1,0,0]
  5. 用 Skyfield 计算 sunV = unit( earth.at(t).observe(sun).position.km ),即地球观测太阳方向单位向量。
  6. 用用户四元数旋转 bodyVansV,与 sunV 求夹角;角度 < 1° 即判定正确,打印 FLAG

关键点:问题本质是求一个把机体 -X 轴对准太阳方向的旋转,以四元数形式提交。

解题步骤

步骤 0:明确坐标与约定

  • 机体坐标系:太阳能板朝向为 -X 轴,向量 $u = [-1,0,0]$。
  • 惯性/天球坐标earth.at(t).observe(sun) 得到的 $\mathbf{s}$ 是 ICRF 中的地球→太阳向量。单位化 $v=\frac{\mathbf{s}}{||\mathbf{s}||}$。
  • 四元数顺序:题目与代码均采用 $[Q_x, Q_y, Q_z, Q_w]$,标量在最后

步骤 1:计算目标方向

$v = \text{unit}\big(\text{Earth}(t)\rightarrow\text{Sun}(t)\big)$

步骤 2:由两向量求最小转角四元数

设 $u=[-1,0,0]$,$v$ 如上。

  • 旋转轴方向 $\propto u\times v$;
  • 转角 $\theta=\arccos(u\cdot v)$;
  • 轴角到四元数:
    $$q = \big[\hat{n}\sin\frac{\theta}{2}, \cos\frac{\theta}{2}\big]
    \quad\text{其中};\hat{n}=\frac{u\times v}{||u\times v||}$$
  • 等价的快捷构造(避免先显式求 $\theta$):
    $$q_{\text{vec}}=u\times v,\quad q_w=1+u\cdot v,\quad
    q=\frac{[q_{\text{vec}},q_w]}{||[q_{\text{vec}},q_w]||}$$
    该式与上式在非 180° 情况下等价,而且数值稳定、实现简单。

得到 $q=[Q_x,Q_y,Q_z,Q_w]$ 后,单位化并注意:$q$ 与 $-q$ 表示同一旋转,二者都正确。

步骤 3:自检(必做)

  • vectorFromQuaternion(或等价 q v q*)旋转 $u$ 得 $u’$。
  • 夹角:
    $$\varepsilon = \arccos\frac{u’\cdot v}{||u’||||v||}$$
    应接近 0°;满足题目阈值 $\varepsilon<1^\circ$ 即通过。

题面之所以说“TLE 不重要”,就在于我们只取单位方向 (v),地球附近几百/几千公里的位置差对 1AU 量级几乎无贡献。

相关公式与实现细节

  1. 角度与点积

$$\cos\theta = \frac{u\cdot v}{||u||,||v||}
\quad\Rightarrow\quad
\theta=\arccos(\cos\theta)$$
实现时应把 $\cos\theta$ 钳制到 $[-1,1]$:
cosang = max(min(cosang, 1.0), -1.0) 以避免浮点微超界导致 acos 返回 NaN。

  1. 轴角到四元数

设旋转轴单位向量 $\hat{n}$、转角 $\theta$,则
$$q =
\left[
\hat{n}\sin\frac{\theta}{2},
\cos\frac{\theta}{2}
\right]
= [q_x, q_y, q_z, q_w]$$
本题以标量部在最后为约定。

  1. “两向量最小旋转”快捷式的由来

$$\begin{aligned}
&\cos\theta = u\cdot v,\quad
\sin\theta\hat{n} = u\times v,\newline
&\Rightarrow
\cos\frac{\theta}{2} = \sqrt{\frac{1+\cos\theta}{2}}
=\sqrt{\frac{1+u\cdot v}{2}},\newline
&\sin\frac{\theta}{2}\hat{n}
=\frac{u\times v}{||u\times v||}\sin\frac{\theta}{2}
=\frac{u\times v}{\sqrt{2(1+u\cdot v)}},\newline
&\Rightarrow q \propto \big[u\times v, 1+u\cdot v\big],
\end{aligned}$$
归一化即可。

  1. 用四元数旋转向量(无显式矩阵)

把三维向量视作纯虚四元数 $p=[\mathbf{u},0]$,单位四元数 $q=[\mathbf{q},w]$:
$$p’ = qpq^*
\Rightarrow
\mathbf{u}’ =
\mathbf{u} + 2w(\mathbf{q}\times\mathbf{u}) + 2\mathbf{q}\times(\mathbf{q}\times\mathbf{u})$$
这是代码 vectorFromQuaternion 的来源,无需建 3×3 旋转矩阵

  1. 退化与鲁棒性
  • 180° 情况:若 $u\cdot v\approx -1$,则 $q_w=1+u\cdot v\approx 0$,且 $u\times v\approx 0$(两向量共线但反向),快捷式失效。
    处理:任选一条与 $u$ 不共线的轴(如 $[0,1,0]$ 或 $[0,0,1]$),令 $\hat{n}=\text{unit}(u\times \text{fallback})$,取 $q=[\hat{n}\sin\pi/2,\cos\pi/2]=[\hat{n},0]$。
  • 单位化:四元数与方向向量都要做单位化,避免累计误差。
  • 符号等价:$q$ 与 $-q$ 等价;若下游系统约定不同,需统一。
  1. 坐标系与方向
  • earth.at(t).observe(sun) 结果位于 ICRF(与 J2000 非常接近的惯性系),不需要为“近地卫星”额外将地心向量改为太阳系质心向量,因为只取单位方向
  • 航天器机体的“面向太阳”定义清晰:以-X 为板法向;若题面改为 “+X/±Y/±Z”,只需更新 $u$。

exp.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# -*- coding: utf-8 -*-
"""
计算把航天器机体 -X 轴旋转到太阳方向的四元数 [Qx, Qy, Qz, Qw]
时刻:2022-05-21 14:00:00 UTC
依赖:numpy, skyfield (pip install skyfield)
输入:sat.tle(不严格必需,但保留加载演示), de440s.bsp
"""

from numpy import array, cross, dot
from numpy.linalg import norm
from cmath import acos, pi
from skyfield.api import load
import numpy as np
import os
import sys

# === 如果 sat.tle / de440s.bsp 不与脚本同目录,请替换为你的真实路径 ===
TLE_PATH = os.path.join(os.path.dirname(__file__), "sat.tle")
BSP_PATH = os.path.join(os.path.dirname(__file__), "de440s.bsp")

def unit(v):
v = np.array(v, dtype=float)
return v / norm(v)

def quaternion_from_u_to_v(u, v):
"""
计算将向量 u 旋转到向量 v 的四元数 [qx, qy, qz, qw](qw 为标量部)。
假设不处于 180° 退化情形;若 dot(u,v) 接近 -1,建议添加备用轴处理。
"""
u = unit(u)
v = unit(v)
q_vec = cross(u, v)
q_w = 1.0 + float(dot(u, v))
q = np.array([q_vec[0], q_vec[1], q_vec[2], q_w], dtype=float)
q = q / norm(q)
return q

def rotate_vector_by_quaternion(u, q):
"""
用单位四元数 q=[qx,qy,qz,qw] 旋转向量 u,返回旋转后的向量。
v = u + 2*w*(q_vec × u) + 2*(q_vec × (q_vec × u))
"""
u = np.array(u, dtype=float)
q_vec = np.array(q[:3], dtype=float)
w = float(q[3])
return u + 2*w*cross(q_vec, u) + 2*cross(q_vec, cross(q_vec, u))

def angle_between(u, v):
"""返回向量夹角(度)。"""
u = np.array(u, dtype=float)
v = np.array(v, dtype=float)
cu, cv = norm(u), norm(v)
if cu == 0 or cv == 0:
return float('nan')
cosang = float(dot(u, v) / (cu * cv))
# 钳制到 [-1, 1] 防止浮点越界
cosang = max(min(cosang, 1.0), -1.0)
return float(acos(cosang).real * 180.0 / pi)

def main():
# 载入时标与星历
ts = load.timescale()
# TLE 实际不参与地日方向计算,这里只示范可用性;失败也不影响后续
try:
_ = load.tle_file(TLE_PATH)
except Exception as e:
print(f"[WARN] 加载 TLE 失败(无妨):{e}", file=sys.stderr)

eph = load(BSP_PATH)
sun, earth = eph["Sun"], eph["Earth"]

# 指定挑战时刻:2022-05-21 14:00:00 UTC
t = ts.utc(2022, 5, 21, 14, 0, 0)

# 地球在 t 时刻观测太阳的方向(单位向量)
sun_vec = earth.at(t).observe(sun).position.km
sun_dir = unit(sun_vec)

# 机体 -X 轴(太阳能板法向)
body_neg_x = np.array([-1.0, 0.0, 0.0])

# 计算四元数并单位化
q = quaternion_from_u_to_v(body_neg_x, sun_dir)

# 验证:旋转机体 -X 后与太阳方向的偏差角
rotated = rotate_vector_by_quaternion(body_neg_x, q)
err_deg = angle_between(rotated, sun_dir)

print("=== 结果 ===")
print("Qx, Qy, Qz, Qw =")
print("{:.16f} {:.16f} {:.16f} {:.16f}".format(q[0], q[1], q[2], q[3]))
print("验证偏差角(度):{:.6f}".format(err_deg))

# 如需模仿挑战程序的“是否通过”逻辑:
if err_deg < 1.0:
print("通过判定:夹角 < 1° ✅")
else:
print("未通过判定:夹角 >= 1° ❌")

if __name__ == "__main__":
main()

matters_of_state

Dockerfile里要把python-dev换为python3-dev

requirements.txt修改为

1
2
3
orbitalpy==0.7.0
numpy
represent==1.6.0.post0

先构建docker镜像

1
docker build challenge -t matters_of_state:challenge

然后

1
socat -v tcp-listen:12345,reuseaddr exec:"docker run --rm -i -e SEED=1465500232115169100 -e FLAG=flag{TESTflag1234} matters_of_state\:challenge"

后面就能通过nc与12345端口交互

challenge.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import os
import sys
import random
import orbital
import numpy as np
import astropy
from timeout import timeout,TimeoutError

to = int( os.getenv("TIMEOUT",120))

def str_to_vec( str_in ):
vec = np.fromstring( str_in , dtype=float, count=-1 , sep=',')
if( vec.size != 3 ):
raise "Vector doesnt have 3 elements"
return vec
def single( ):
pos_tolerance = 10.0
vel_tolerance = 0.1
correct = True
print("Orbital Elements: ")
RE = 6378.0
a = 10.0 * RE
e = 0.3
i = 63.0
RAAN = 25.0
peri = 78.0
mu = 398600.5 # km^3 / s^2
n = np.sqrt( mu / np.power(a,3))
M = 10.0 #random.random( ) * 360.0
times = ['2022-01-01T00:00:00']
t = astropy.time.Time(times, format='isot', scale='utc')
orbit = orbital.elements.KeplerianElements( a=a*1000.0, e=e, i=np.deg2rad(i), raan=np.deg2rad(RAAN) , arg_pe=np.deg2rad(peri), M0=np.deg2rad(M), body=orbital.bodies.earth , ref_epoch=t )
orbit.propagate_anomaly_by(M=0.0)
pos_truth = orbit.r/1000.0
vel_truth = orbit.v/1000.0
print("Given the following orbit elements: ")
print("semi-major axis: {} km".format(a))
print("eccentricity: {}".format(e))
print("inclination: {} deg".format(i))
print("RAAN: {} deg".format(RAAN))
print("Mean anomaly: {} deg".format(M))
print("Argument of periapsis: {} deg".format(peri))
print("Time: {}".format( t[0] ))
print("Find the state vector of the statellite")
print("Position: X,Y,Z " , flush=True)
pos_answer = input( )
p = str_to_vec( pos_answer )
print("Velocity: Vx,Vy,Vz", flush=True)
vel_answer = input( )
v = str_to_vec( vel_answer )
dP = np.linalg.norm( pos_truth - p )
dV = np.linalg.norm( vel_truth - v )
print("dP: {} dV: {}".format( dP ,dV ), flush=True)
if( dP > pos_tolerance ):
print("Position incorrect", flush=True)
correct = False
if( dV > vel_tolerance ):
print("Velocity incorrect", flush=True)
correct = False

return correct

@timeout( to )
def main():
N = 5

for k in range( N ):
ok = single( )
if( False == ok ):
print("Wrong", flush=True)
print("....Later" , flush=True)
sys.exit(0)
print("Correct" , flush=True)
flag = os.getenv("FLAG")
print("Flag: {}".format( flag ) , flush=True)

if __name__ == "__main__":
try:
main()
except TimeoutError:
print("Timeout.....bye")
sys.exit(0)

源码分析

给出开普勒轨道要素时间,让选手在给定公差内返回三维位置/速度向量(km 和 km/s)。

关键点:

  • 轨道参数(固定示例):

    • 地球半径 $R_E=6378\text{ km}$,半长轴 $a=10R_E$
    • 偏心率 $e=0.3$,倾角 $i=63^\circ$
    • 升交点赤经 $\Omega=25^\circ$
    • 近地点幅角 $\omega=78^\circ$
    • 平近点角 $M=10^\circ$
    • 历元 $t=$ 2022-01-01T00:00:00 UTC
  • 利用 orbital + astropy 构造轨道,并得到“真值” $\mathbf r,\mathbf v$(以 m、m/s 返回,随后换算成 km、km/s)。

  • 用户需输入:

    • 位置 X,Y,Z(km)
    • 速度 Vx,Vy,Vz(km/s)
  • 评分:

    • 位置误差范数 $||\Delta \mathbf r||\le 10 \text{ km}$
    • 速度误差范数 $||\Delta \mathbf v||\le 0.1 \text{ km/s}$
    • 若有一项超标,本轮失败并立刻结束;连续答对 N=5 轮后打印环境变量 FLAG

换言之,参赛者只要能把给定六根数按地球二体轨道模型,转换为 ECI 坐标系下的 $\mathbf r,\mathbf v$,并满足阈值,就能通关。

solver.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import socket
import os
import numpy as np
import sys
import re
import sgp4.api as sp4
import datetime
import helper

def get_value( text , preamble ):
lines = text.split("\n")
out = ""
for line in lines:
if( preamble in line ):
out = line
value = re.findall("\d+\.\d+", out)

return float(value[0] )
def get_date( text ,preamble):
lines = text.split("\n")
out = ""
for line in lines:
if( preamble in line ):
out = line.replace(preamble,"")
out = out.replace("UTC","")
out = out.strip()
out=out+"+0000"
date = datetime.datetime.strptime( out , '%Y-%m-%d %H:%M:%S%z')

return date
def mean_to_eccentic( M , e , tol=1e-5 ):

if( M > np.pi ):
M0 = np.pi*2 - M
else:
M0 = M
E_guess = np.arange( 0 , np.pi , tol )
M_Guess = E_guess - e*np.sin(E_guess )

dM =abs( M0 - M_Guess )
ind = np.argmin( dM , axis=0)
if( M > np.pi ):
E = 2*np.pi - E_guess[ind]
else:
E = E_guess[ind]
print("Eccentric anomaly is: {}".format( E))
return E


def solve_single( sock ):
print("Trying to solve", flush=True)

challenge = sock.read_until(["Find" ,"Flag","Wrong"])
print(challenge , flush=True)
if( "Wrong" in challenge):
sys.exit(0)
if( "Flag" in challenge):
print(sock.get_remaining() , flush=True)
sys.exit(0)



a = get_value( challenge , "semi-major")
e = get_value( challenge , "eccentricity")
i = get_value( challenge , "inc")
RAAN = get_value( challenge , "RAAN")
peri = get_value(challenge, "Argument")
M = get_value(challenge, "Mean")
mu = 398600.4418 # km^3 / s^2
#d = get_date( challenge , "Time:")
inc = np.deg2rad( i )
E = mean_to_eccentic( np.deg2rad(M), e )
TA = np.arccos( (np.cos(E)-e)/(1- e*np.cos(E)))
print( "True Anomaly is: {}".format( TA ))
R = np.deg2rad( RAAN )
W = np.deg2rad( peri ) + TA

r = a*(1- e*np.cos(E))
v = np.sqrt( mu * ( (2/r) - (1/a) ) )
fpa = np.arctan( e*np.sin(TA ) / (1+ e*np.cos(TA)))
if( TA > np.pi ):
fpa = -fpa
D1 = np.array( [np.cos(-W), np.sin(-W), 0 , -np.sin(-W), np.cos(-W) , 0 ,0 ,0 , 1]) # true anomaly rotation and argp
D2 = np.array( [1 , 0, 0 , 0, np.cos(-inc) , np.sin(-inc) , 0, -np.sin(-inc), np.cos(-inc)] ) # incl rotation
D3 = np.array( [np.cos(-R), np.sin(-R), 0 , -np.sin(-R), np.cos(-R) , 0 ,0 ,0 , 1]) # RAAN rotation

D1 = D1.reshape(3,3)
D2 = D2.reshape(3,3)
D3 = D3.reshape(3,3)

rot = np.matmul( D3, np.matmul( D2, D1 ))
rot2 = rot.transpose()
print( "FPA is {}".format( fpa))
print("Velocity is: {}".format( v))
v_r = v * np.sin( fpa )
v_w = v * np.cos( fpa )
R_VEC = np.array([r , 0 , 0 ])
V_VEC = np.array([v_r , v_w , 0 ])
R_VEC = R_VEC.reshape(3,1)
V_VEC = V_VEC.reshape(3,1)
print( V_VEC )

position = np.matmul( rot , R_VEC ).reshape(1,3)
velocity = np.matmul( rot,V_VEC).reshape(1,3)
position = position[0]
velocity = velocity[0]
pos = "{},{},{}\n".format( position[0], position[1], position[2])
vel = "{},{},{}\n".format( velocity[0], velocity[1], velocity[2])
print("I think the position is: {}".format( pos), flush=True)
print("I think the velocity is: {}".format( vel ), flush=True)

sock.send( pos )
sock.send( vel)



def batch_solve(host, port ):
s = helper.TcpReader( host, port )
ticket = os.getenv("CHAL_TICKET")
if( ticket != None ):
s.read_until("Ticket please:")
print("Sending ticket - {}".format( ticket ), flush=True)
s.send( ticket)
s.send( "\n")
ticket_sent = True

keep_going = True
while( keep_going ):
solve_single(s)


if __name__ == "__main__":
host = '10.249.10.107'
port = 12345
batch_solve( host , port )

解题脚本流程:

  1. 从环境变量获取连接参数:CHAL_HOSTCHAL_PORT。若有工单/凭证 CHAL_TICKET,先按服务端提示发送。

  2. 轮询读取服务端的挑战文本,判断三种情形:

    • Wrong:退出(上一轮答错)
    • Flag:读取并打印剩余内容,退出(通关)
    • Find:这是题面,进入求解。
  3. 解析题面中的数值:semi-majoreccentricityinc(倾角)、RAANArgument(近地点幅角)、Mean(平近点角)。

  4. 以标准地球引力常数 $\mu=398600.4418,\text{km}^3/\text{s}^2$ 进行二体问题求解:

    • $M\to E$(平近点角到偏近点角)
    • $E\to \nu$(偏近点角到真近点角)
    • $a,e,\nu\to r,v$(轨道半径与速度模长)
    • 计算飞行路径角 $\gamma$(Flight Path Angle, FPA),把 (v) 分解为径向与切向分量 $v_r,v_t$
    • 用旋转矩阵把**轨道平面坐标系(PQW/Perifocal)**矢量转到 ECI
  5. 把得到的 positionvelocity 以 “逗号分隔+换行” 的格式回发。

数学推导与公式说明

轨道六根数回顾

给定地球二体模型下的六根数:
$$(a, e, i, \Omega, \omega, M)$$
其中 $a$ 半长轴、$e$ 偏心率、$i$ 倾角、$\Omega$ 升交点赤经、$\omega$ 近地点幅角、$M$ 平近点角。假设历元即题面时间(脚本中未做历元传播)。

Kepler 方程:求偏近点角 $E$

Kepler 方程(椭圆轨道):
$$M = E - e\sin E \quad (0\le E < 2\pi)$$
解题脚本函数 mean_to_eccentic(M,e,tol) 用网格枚举在 $[0,\pi]$ 以 tol 为步长近似求解,并对 $M>\pi$ 做对称处理。虽然不如牛顿法精确高效,但在该题误差阈值下通常足够。

偏近点角 $E$ 到真近点角 $\nu$

脚本采用:
$$\nu=\arccos\left(\frac{\cos E - e}{1-e\cos E}\right)$$
然后根据 $M$ 的半周期对称性处理符号/象限问题(通过后续对飞行路径角的符号进行修正)。更稳健的办法是用半角公式配合 atan2
$$\tan\frac{\nu}{2}=\sqrt{\frac{1+e}{1-e}}\tan\frac{E}{2},\quad
\nu=2\ \mathrm{atan2}\left(\sqrt{1+e}\sin\frac{E}{2},\ \sqrt{1-e}\cos\frac{E}{2}\right)$$
atan2 能自动处理象限。

轨道半径 $r$ 与速度模长 $v$

  • 轨道半径(椭圆几何关系):
    $$r = a(1-e\cos E)$$
    也可写成
    $$r=\frac{p}{1+e\cos\nu},\quad p=a(1-e^2)$$
  • Vis-Viva 方程 得速度模长:
    $$v=\sqrt{\mu\left(\frac{2}{r}-\frac{1}{a}\right)}$$

飞行路径角 $\gamma$ 与速度分解

脚本用:
$$\gamma=\arctan\left(\frac{e\sin\nu}{1+e\cos\nu}\right)$$
若 $\nu>\pi$ 则取负号(相当于在远地点后半周速度方向的径向分量变号)。这与经典关系一致:
$$v_r = v\sin\gamma,\qquad v_t = v\cos\gamma$$
用更“教科书”的表达是(直接用 $p=a(1-e^2)$):
$$v_r=\sqrt{\frac{\mu}{p}}e\sin\nu,\qquad
v_t=\sqrt{\frac{\mu}{p}}(1+e\cos\nu)$$
两种写法等价;脚本选择先求 $v$ 再按 $\gamma$ 分解。

坐标系与旋转(PQW → ECI)

  • PQW/Perifocal(轨道平面)坐标系:
    $ \hat{\mathbf p} $ 指向近地点、$ \hat{\mathbf q} $ 为轨道切向、$ \hat{\mathbf w}=\hat{\mathbf p}\times\hat{\mathbf q} $ 指出轨道面法向。
  • ECI:惯性坐标系(地球赤道春分点为参考)。

标准从 PQW 到 ECI 的方向余弦矩阵(DCM)是:
$$\mathbf C_{PQW\to ECI} = R_3(-\Omega),R_1(-i),R_3(-\omega),$$
其中
$$R_3(\theta)=
\begin{bmatrix}
\cos\theta&\sin\theta&0\newline
-\sin\theta&\cos\theta&0\newline
0&0&1
\end{bmatrix},\quad
R_1(\phi)=
\begin{bmatrix}
1&0&0\newline
0&\cos\phi&\sin\phi\newline
0&-\sin\phi&\cos\phi
\end{bmatrix}$$

脚本做了一个等效变换:先把真近点角 $\nu$ 并入到第一步旋转
$$W = \omega + \nu$$
然后使用
$$\mathbf{rot} = R_3(-\Omega),R_1(-i),R_3(-W)$$
再令
$$\mathbf r_{PQW} = \begin{bmatrix} r \newline 0 \newline 0\end{bmatrix},\quad
\mathbf v_{PQW} = \begin{bmatrix} v_r \newline v_t \newline 0\end{bmatrix}$$
最后
$$\mathbf r_{ECI}=\mathbf{rot},\mathbf r_{PQW},\qquad
\mathbf v_{ECI}=\mathbf{rot},\mathbf v_{PQW}$$
这是完全正确的:将 $\nu$ 并到最后一个 $R_3$ 里等价于先在 PQW 里绕 $\hat{\mathbf w}$ 旋转 $\nu$ 后再做三旋。脚本中 D1,D2,D3 正是这三步的矩阵展开。

小结:位置在 PQW 中位于 $x$ 轴(近地点方向),速度在 PQW 中由 $(v_r,v_t,0)$ 表示,随后一次旋转到 ECI。

脚本实现细节与注意事项

文本解析(get_valueget_date

  • get_value(text, preamble):逐行寻找包含关键前缀的行,用正则 \d+\.\d+ 抓取第一个小数并转为 float

    • 局限:不支持整数(如 10 没有小数点)、负数、科学计数法;若行里有多个数字,它只取第一个。
    • 但题面格式固定、均为小数形式,因此能用。
  • get_date 未使用(被注释掉),题面给的时间只作为展示,脚本假定当前历元即题面时间,因此不做传播。

Kepler 方程求解(mean_to_eccentic

  • 采用网格搜索(步长 tol),并用对称性把 $[0,2\pi)$ 的问题折半到 $[0,\pi]$。
  • 精度由 tol 控制;容差适当即可满足误差阈值。

姿态旋转矩阵构造

  • 构造 D1=R3(-W)D2=R1(-i)D3=R3(-Ω),并按 rot = D3 @ (D2 @ D1) 相乘(从右到左依次作用)。
  • 然后 r=rot @ [r,0,0]^Tv=rot @ [v_r,v_t,0]^T,得到 ECI 坐标。

解题思路

  • 读题面 → 解析六根数
  • 求 $E$:解 $M=E-e\sin E$
  • 求 $\nu$:由 $E$ 得到真近点角
  • 求 $r,v$:$r=a(1-e\cos E)$,$v=\sqrt{\mu(2/r-1/a)}$
  • 分解速度:$(v_r,v_t)=(v\sin\gamma,,v\cos\gamma)$ 或直接用 $\sqrt{\mu/p}$ 公式
  • 坐标变换:$\mathbf r=\mathbf{rot}[r,0,0]^T$,$\mathbf v=\mathbf{rot}[v_r,v_t,0]^T$,其中
    $\mathbf{rot}=R_3(-\Omega)R_1(-i)R_3(-(\omega+\nu))$
  • 按协议回填:格式 x,y,z\nvx,vy,vz\n
  • 循环处理直至拿到 Flag

power_level

先构建docker镜像

1
docker build challenge -t power_level:challenge

然后

1
socat -v tcp-listen:10000,reuseaddr exec:"docker run --rm -i -e SEED=1465500232115169100 -e FLAG=flag{TESTflag1234} -p 25000\:10001 power_level\:challenge"

后面就能通过nc与10000和25000端口交互

challenge.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import os
from sqlite3 import Time
import numpy as np
import random
import siggen
import argparse
import threading
import easteregg
import sys

from timeout import timeout,TimeoutError

to = int( os.getenv("TIMEOUT",120))

def run_siggen( gen , port ) :
gen.send(port)


@timeout(to)
def challenge( args ):
# Constants
Fs = 100000.0
# allow a few random amplitudes
amp_choices = [200, 400, 800] # increment by 2x amplitude ( 4x power since power is A^2)
# Only one frquency
freq_choices = [ Fs/32 ]
freq_tolerance = 100 # Hz
snr_tolerance = 0.5 # dB
N = int(Fs)
# Noise variance / Noise pwower
noise_variance = 100.0

# Make choices at random
amp = random.choice( amp_choices )
freq = random.choice( freq_choices )
# This is both the textbook and wikipedia definition of snr.
# SNR = Psig / Pnoise
# Psig = A^2
# Pnoise = var( noise )
snr = 10 * np.log10( amp*amp / noise_variance )
sample_host = os.getenv( "SERVICE_HOST", )
sample_port = int( os.getenv("SERVICE_PORT", 1000))
print("Connect via TCP to get the samples at {}:{}".format( sample_host,sample_port) , flush=True)
print("The sample rate is {}".format( Fs ), flush=True)
# Create some samples on a new thread
gen = siggen.tcp_siggen( samp_rate=Fs , freq=freq , amp=amp, noise_var=noise_variance , N=N)
t = threading.Thread( target=run_siggen , args=(gen,args.port) , daemon=True)
t.start()
# Ask the player what they think the answer is
# If they answer with the correct dragonball quote then print the easter egg!
print("What is the frequency of the signal? (Hz)")
freq_str = input()
easteregg.check_easteregg( freq_str )
print("What is the signal to noise ratio (dB)")
snr_str = input()
easteregg.check_easteregg( snr_str )
print("You answered freq: {} snr: {}".format( freq_str,snr_str))
# Make sure the answers are floats
try:
snr_answer = float( snr_str )
freq_answer = float( freq_str )
except:
print("input malformed - type in numbers like 123.456789")
sys.exit(0)

# Check if the answers are within the tolerance
snr_correct = np.abs( snr - snr_answer ) < snr_tolerance
freq_correct = np.abs( freq - freq_answer ) < freq_tolerance

if( snr_correct and freq_correct ):
flag = os.getenv("FLAG")
print("Here is your flag:")
print(flag)
else:
# Wrong answer results in Napa's quote!
print("What!!! there is no way that can be right!")


print("Exiting")

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--sample-port", dest="port", type=int, required=True)
args = parser.parse_args()
service_host = os.getenv("SERVICE_HOST" , "localhost")
service_port = os.getenv("SERVICE_PORT" , args.port)
print("Samples available at: {} {} ".format( service_host, service_port))
try:
challenge(args)
except TimeoutError:
print("Timeout ....bye")

exp.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import os 
import socket
import numpy as np
import matplotlib.pyplot as plt
import time
def work( sock , Fs , N ):
print("Trying to get {} samples at {}".format(N,Fs))
count = 0
bytes_total = b""
# Get the sample bytes
while( count < N*8 ):
bytes_in = sock.recv( N*8 )# receive N complex64 bit numbers
bytes_total = bytes_total + bytes_in
count = count + len( bytes_in)
# Make them into a complex 64
samples = np.frombuffer( bytes_total , dtype=np.dtype('complex64'))
print("Got {} samples".format(len(samples)))
# Take a FFT of the samples
spectrum = np.fft.fft( samples , norm="forward")
freqs = np.fft.fftfreq(samples.shape[-1]) * Fs
power = np.real(spectrum * np.conj( spectrum ) )
max_ind = np.argmax( power )
# Make a copy of the spectrum but zero out the values around the signal - this leaves only noise
noise_band = spectrum
noise_band[max_ind] = 0 # remove the tone from the signal to get the noise only (only works for a pure carrier like this problem)
noise_band[max_ind-1] = 0
noise_band[max_ind+1] = 0
# Take an iift of the noise only signal so that we get things in time domain
noise_only = np.fft.ifft( noise_band , norm='forward' )

# Get the frequency of the sinusioid and its power
freq = freqs[ max_ind ]
max_power = np.real(power[max_ind])
# Take the variance of the noise only signal to get its power
noise_power = np.var( noise_only ) # Textbook definition of noise power is the variance of the noise
# Put things in DB which is nice!
sig_power_db = 10*np.log10( max_power)
noise_power_db = 10*np.log10( noise_power )
snr = sig_power_db - noise_power_db
print("I think the answers are")
print("Frequency (HZ): {}".format(freq))
print("Signal Power: {}".format(sig_power_db))
print("Noise Power: {}".format(noise_power_db))
print("SNR (dB): {}".format(snr))
return (freq,snr)

def get_samp_rate( text ):
preamble = "The sample rate is"
lines = text.split("\n")
for line in lines:
if( preamble in line ):
fs_str = line.replace(preamble,"")
fs_str = fs_str.replace(" ","")
fs_str = fs_str.replace("\n","")
return float( fs_str )


def solve( host , port , sample_port ):
# Get the challenge prompt
print("Solver listening for challenge prompt on: {} {}".format( host,port))
prompts = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
prompts.connect((host, port))
ticket = os.getenv("CHAL_TICKET")
if( ticket != None ):
prompts.recv(1000)
print("Sending ticket - {}".format( ticket ), flush=True)
prompts.send( ticket.encode('utf-8'))
prompts.send( "\n".encode('utf-8'))
ticket_sent = True

out = prompts.recv(1000)
text = out.decode('utf-8')
print(text, flush=True)
# Create a socket to listen for the samples
sample_host = '10.249.10.107'
sample_port = 25000
print("Solver listening for smaples on: {} {}".format( sample_host, sample_port))
samps = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
samps.connect((sample_host, sample_port))
# Deduce the sample rate from the challenge
Fs = get_samp_rate( text )
# Figure out what the frequency and snr is
answers = work( samps, Fs, int(Fs) )
# Send the answers as text
prompts.send( "{}\n".format(answers[0]).encode('utf-8'))
prompts.send( "{}\n".format(answers[1]).encode('utf-8'))

time.sleep(3)
out = prompts.recv(1000)
print(out.decode('utf-8'), flush=True)
print("Solver exiting")

if __name__ == "__main__":
host = '10.249.10.107'
port = 10000
sample_port = 25000
solve( host , port , sample_port )