p(f"Encoded msg is: {binascii.hexlify(random_msg).decode()}") p(f"We can stream {NUM_OF_BYTES_GUESSES} bytes of data before the sat kills the connection. Please help. (Send your message in hex.)")
bytes_left = NUM_OF_BYTES_GUESSES while bytes_left > 0andnot success: p(f"({bytes_left}) Msg: ", end="") line = sys.stdin.readline() line = line.replace("\n","")
l_len = len(line) bytes_left -= (l_len // 2) if l_len % 2 == 1: p("Must provide even-length string") continue elif l_len > 2*(MSG_SIZE-1): p(f"Size of msg cannot be greater than {MSG_SIZE-1}") continue elif l_len == 0: p(f"Size of msg must be greater than 0") continue elif bytes_left < 0: bytes_left += (l_len // 2) p(f"Msg too large, you only have {bytes_left} bytes left...") continue
# Check to make sure that the message doesn't contain "00" on a byte alignment for b_idx inrange(0, len(line)-2, 2): if"00" == line[b_idx:b_idx+2]: p("Must provide message with no NULL bytes (00)") continue try: msg = binascii.unhexlify(line) except binascii.Error as e: ifstr(e) == "Odd-length string": p("Must provide even-length string") continue # exit(-1) elifstr(e) == "Non-hexadecimal digit found": p("Must provide hexadecimal digits only") continue # exit(-1) continue encoded_msg = bh.encode_msg(msg) encoded_msg_hex = binascii.hexlify(encoded_msg).decode()
p(encoded_msg_hex)
if encoded_msg == random_msg: p(f"Satellite-link synced! Flag: {FLAG}") exit(0)
return success
if __name__ == "__main__": p("Generating black hole...\n") bh = BlackHole()
# 抓目标 Y m = re.search(r"Encoded msg is:\s*([0-9a-fA-F]+)", banner) assert m, "Failed to find target hex" Y = to_bytes(m.group(1)) assertlen(Y) == MSG_SIZE
# Phase 1: 扫描 L,发 0x01*L O1_by_L = {} dist_by_L = {} for L inrange(LOWER, UPPER + 1): msg = bytes([0x01]) * L out = send_and_get(io, msg) O1_by_L[L] = out # Hamming(逐字节不相等计数) dist_by_L[L] = sum(1for i inrange(MSG_SIZE) if out[i] != Y[i])
n = min(dist_by_L, key=dist_by_L.get) log.info(f"Detected n = {n}, distance = {dist_by_L[n]}")
O1 = O1_by_L[n]
# Phase 2: 一次性标号,恢复 trans 和 E v = [i + 2for i inrange(n)] # all distinct, avoid 0x00/0x01 Mmap = bytes(v) Omap = send_and_get(io, Mmap)
# 找出被影响的位置集合 S(Omap 与 O1 不同的位置) S = [p for p inrange(MSG_SIZE) if Omap[p] != O1[p]] assertlen(S) == n, f"Unexpected affected positions: {len(S)} != {n}"
trans = [None] * n # trans[i] = position p En = bytearray(O1) # start from O1; fix S positions to get E for p in S: delta = Omap[p] ^ O1[p] # = v_i ^ 0x01 vi = delta ^ 0x01 i = vi - 2 assert0 <= i < n trans[i] = p En[p] = O1[p] ^ 0x01# restore E at this position
assertall(t isnotNonefor t in trans)
# Phase 3: 还原 R 并提交 R = bytearray(n) for i inrange(n): p = trans[i] R[i] = Y[p] ^ En[p] assert R[i] != 0# 题目保证随机明文不含 0x00
final = bytes(R) # 发送最终消息,服务应回 Flag io.sendline(to_hex(final).encode())
# 服务器会先回一行编码后的hex,再回一行包含 Flag 的提示,然后断开 for _ inrange(3): line = io.recvline(timeout=2) ifnot line: break print(line.decode(errors="ignore"), end="") if"Flag"in line.decode(errors="ignore"): print("Flag found!") break
io.close()
if __name__ == "__main__": main()
sunfun
Dockerfile里要把python-dev换为python3-dev
先构建docker镜像
1
docker build challenge -t sunfun:challenge
然后
1
socat -v tcp-listen:12345,reuseaddr exec:"docker run --rm -i -e FLAG=flag{zulu49225delta\:GG1EnNVMK3-hPvlNKAdEJxcujvp9WK4rEchuEdlDp3yv_Wh_uvB5ehGq-fyRowvwkWpdAMTKbidqhK4JhFsaz1k} sunfun\:challenge"
# Fun in the Sun Challenge from cmath import acos, pi from numpy import dot, cross from numpy.linalg import norm from skyfield.api import load, wgs84 ts = load.timescale()
import os, sys from time import sleep from timeout import timeout, TimeoutError time = int(os.getenv("TIMEOUT",90))
# Challenge Intro defrender_intro(): art = [ " FUN ", " IN ", " THE ", " SUN!! ", " ", " y ", " / . ", " //// <O> ", " ////o--z ' ", " | ", " x ", " ", " " ]
for row in art: print(row) sleep(0.05) sys.stdout.flush() return
defquaternion(u,v): u = u / norm(u) v = v / norm(v)
# Set simulation epoch # May 21, 2022, 14:00 UTC t = ts.utc(2022, 5, 21, 14, 0)
# Challenge Question print("Provide the the quaternion (Qx, Qy, Qz, Qw) to point your spacecraft at the sun at",t.utc_strftime()) print("The solar panels face the -X axis of the spacecraft body frame or [-1,0,0]") print("Qx = ",end='') x = float(input()) print("Qy = ",end='') y = float(input()) print("Qz = ",end='') z = float(input()) print("Qw = ",end='') w = float(input())
# Check answer bodyV = [-1,0,0] # -x axis is solar panels in sat body frame
# TLE doesn't matter b/c this sat is so close to Earth relative to Earth-Sun distance # If it was farther away, need to convert the satellite geocentric coordinates (earth origin) to barycentric (sun origin) sunV = earth.at(t).observe(sun).position.km sunV = sunV / norm(sunV) #print(sunV) #sunQ = quaternion(bodyV,sunV) #print(sunQ)
print("The solar panels are facing %.3f degrees away from the sun" %angle) # If pointed at sun within 1 degree, accept the answer if angle < 1: return1
return0
if __name__ == "__main__": render_intro()
# Challenge success = challenge() if success: print("You got it! Here's your flag:") flag = os.getenv('FLAG') print(flag)
else: print("That didn't work, try again!")
给出了天体历de440s.bsp和卫星信息sat.tle
题目分析
数学工具函数
quaternion(u,v):构造把单位向量 u 旋到单位向量 v 的最小转角四元数,返回 [qx, qy, qz, qw](最后一位为标量部)。核心: $$q_{\text{vec}} = u \times v,\quad q_w = 1 + u\cdot v,\quad q = \frac{[q_{\text{vec}},, q_w]}{||[q_{\text{vec}},, q_w]||}$$
from numpy import array, cross, dot from numpy.linalg import norm from cmath import acos, pi from skyfield.api import load import numpy as np import os import sys
import os import sys import random import orbital import numpy as np import astropy from timeout import timeout,TimeoutError
to = int( os.getenv("TIMEOUT",120))
defstr_to_vec( str_in ): vec = np.fromstring( str_in , dtype=float, count=-1 , sep=',') if( vec.size != 3 ): raise"Vector doesnt have 3 elements" return vec defsingle( ): pos_tolerance = 10.0 vel_tolerance = 0.1 correct = True print("Orbital Elements: ") RE = 6378.0 a = 10.0 * RE e = 0.3 i = 63.0 RAAN = 25.0 peri = 78.0 mu = 398600.5# km^3 / s^2 n = np.sqrt( mu / np.power(a,3)) M = 10.0#random.random( ) * 360.0 times = ['2022-01-01T00:00:00'] t = astropy.time.Time(times, format='isot', scale='utc') orbit = orbital.elements.KeplerianElements( a=a*1000.0, e=e, i=np.deg2rad(i), raan=np.deg2rad(RAAN) , arg_pe=np.deg2rad(peri), M0=np.deg2rad(M), body=orbital.bodies.earth , ref_epoch=t ) orbit.propagate_anomaly_by(M=0.0) pos_truth = orbit.r/1000.0 vel_truth = orbit.v/1000.0 print("Given the following orbit elements: ") print("semi-major axis: {} km".format(a)) print("eccentricity: {}".format(e)) print("inclination: {} deg".format(i)) print("RAAN: {} deg".format(RAAN)) print("Mean anomaly: {} deg".format(M)) print("Argument of periapsis: {} deg".format(peri)) print("Time: {}".format( t[0] )) print("Find the state vector of the statellite") print("Position: X,Y,Z " , flush=True) pos_answer = input( ) p = str_to_vec( pos_answer ) print("Velocity: Vx,Vy,Vz", flush=True) vel_answer = input( ) v = str_to_vec( vel_answer ) dP = np.linalg.norm( pos_truth - p ) dV = np.linalg.norm( vel_truth - v ) print("dP: {} dV: {}".format( dP ,dV ), flush=True) if( dP > pos_tolerance ): print("Position incorrect", flush=True) correct = False if( dV > vel_tolerance ): print("Velocity incorrect", flush=True) correct = False return correct
@timeout( to ) defmain(): N = 5 for k inrange( N ): ok = single( ) if( False == ok ): print("Wrong", flush=True) print("....Later" , flush=True) sys.exit(0) print("Correct" , flush=True) flag = os.getenv("FLAG") print("Flag: {}".format( flag ) , flush=True)
if __name__ == "__main__": try: main() except TimeoutError: print("Timeout.....bye") sys.exit(0)
import socket import os import numpy as np import sys import re import sgp4.api as sp4 import datetime import helper
defget_value( text , preamble ): lines = text.split("\n") out = "" for line in lines: if( preamble in line ): out = line value = re.findall("\d+\.\d+", out)
returnfloat(value[0] ) defget_date( text ,preamble): lines = text.split("\n") out = "" for line in lines: if( preamble in line ): out = line.replace(preamble,"") out = out.replace("UTC","") out = out.strip() out=out+"+0000" date = datetime.datetime.strptime( out , '%Y-%m-%d %H:%M:%S%z')
return date defmean_to_eccentic( M , e , tol=1e-5):
if( M > np.pi ): M0 = np.pi*2 - M else: M0 = M E_guess = np.arange( 0 , np.pi , tol ) M_Guess = E_guess - e*np.sin(E_guess )
dM =abs( M0 - M_Guess ) ind = np.argmin( dM , axis=0) if( M > np.pi ): E = 2*np.pi - E_guess[ind] else: E = E_guess[ind] print("Eccentric anomaly is: {}".format( E)) return E
defsolve_single( sock ): print("Trying to solve", flush=True)
import os from sqlite3 import Time import numpy as np import random import siggen import argparse import threading import easteregg import sys
from timeout import timeout,TimeoutError
to = int( os.getenv("TIMEOUT",120))
defrun_siggen( gen , port ) : gen.send(port)
@timeout(to) defchallenge( args ): # Constants Fs = 100000.0 # allow a few random amplitudes amp_choices = [200, 400, 800] # increment by 2x amplitude ( 4x power since power is A^2) # Only one frquency freq_choices = [ Fs/32 ] freq_tolerance = 100# Hz snr_tolerance = 0.5# dB N = int(Fs) # Noise variance / Noise pwower noise_variance = 100.0 # Make choices at random amp = random.choice( amp_choices ) freq = random.choice( freq_choices ) # This is both the textbook and wikipedia definition of snr. # SNR = Psig / Pnoise # Psig = A^2 # Pnoise = var( noise ) snr = 10 * np.log10( amp*amp / noise_variance ) sample_host = os.getenv( "SERVICE_HOST", ) sample_port = int( os.getenv("SERVICE_PORT", 1000)) print("Connect via TCP to get the samples at {}:{}".format( sample_host,sample_port) , flush=True) print("The sample rate is {}".format( Fs ), flush=True) # Create some samples on a new thread gen = siggen.tcp_siggen( samp_rate=Fs , freq=freq , amp=amp, noise_var=noise_variance , N=N) t = threading.Thread( target=run_siggen , args=(gen,args.port) , daemon=True) t.start() # Ask the player what they think the answer is # If they answer with the correct dragonball quote then print the easter egg! print("What is the frequency of the signal? (Hz)") freq_str = input() easteregg.check_easteregg( freq_str ) print("What is the signal to noise ratio (dB)") snr_str = input() easteregg.check_easteregg( snr_str ) print("You answered freq: {} snr: {}".format( freq_str,snr_str)) # Make sure the answers are floats try: snr_answer = float( snr_str ) freq_answer = float( freq_str ) except: print("input malformed - type in numbers like 123.456789") sys.exit(0)
# Check if the answers are within the tolerance snr_correct = np.abs( snr - snr_answer ) < snr_tolerance freq_correct = np.abs( freq - freq_answer ) < freq_tolerance
if( snr_correct and freq_correct ): flag = os.getenv("FLAG") print("Here is your flag:") print(flag) else: # Wrong answer results in Napa's quote! print("What!!! there is no way that can be right!")
import os import socket import numpy as np import matplotlib.pyplot as plt import time defwork( sock , Fs , N ): print("Trying to get {} samples at {}".format(N,Fs)) count = 0 bytes_total = b"" # Get the sample bytes while( count < N*8 ): bytes_in = sock.recv( N*8 )# receive N complex64 bit numbers bytes_total = bytes_total + bytes_in count = count + len( bytes_in) # Make them into a complex 64 samples = np.frombuffer( bytes_total , dtype=np.dtype('complex64')) print("Got {} samples".format(len(samples))) # Take a FFT of the samples spectrum = np.fft.fft( samples , norm="forward") freqs = np.fft.fftfreq(samples.shape[-1]) * Fs power = np.real(spectrum * np.conj( spectrum ) ) max_ind = np.argmax( power ) # Make a copy of the spectrum but zero out the values around the signal - this leaves only noise noise_band = spectrum noise_band[max_ind] = 0# remove the tone from the signal to get the noise only (only works for a pure carrier like this problem) noise_band[max_ind-1] = 0 noise_band[max_ind+1] = 0 # Take an iift of the noise only signal so that we get things in time domain noise_only = np.fft.ifft( noise_band , norm='forward' )
# Get the frequency of the sinusioid and its power freq = freqs[ max_ind ] max_power = np.real(power[max_ind]) # Take the variance of the noise only signal to get its power noise_power = np.var( noise_only ) # Textbook definition of noise power is the variance of the noise # Put things in DB which is nice! sig_power_db = 10*np.log10( max_power) noise_power_db = 10*np.log10( noise_power ) snr = sig_power_db - noise_power_db print("I think the answers are") print("Frequency (HZ): {}".format(freq)) print("Signal Power: {}".format(sig_power_db)) print("Noise Power: {}".format(noise_power_db)) print("SNR (dB): {}".format(snr)) return (freq,snr)
defget_samp_rate( text ): preamble = "The sample rate is" lines = text.split("\n") for line in lines: if( preamble in line ): fs_str = line.replace(preamble,"") fs_str = fs_str.replace(" ","") fs_str = fs_str.replace("\n","") returnfloat( fs_str )
defsolve( host , port , sample_port ): # Get the challenge prompt print("Solver listening for challenge prompt on: {} {}".format( host,port)) prompts = socket.socket(socket.AF_INET, socket.SOCK_STREAM) prompts.connect((host, port)) ticket = os.getenv("CHAL_TICKET") if( ticket != None ): prompts.recv(1000) print("Sending ticket - {}".format( ticket ), flush=True) prompts.send( ticket.encode('utf-8')) prompts.send( "\n".encode('utf-8')) ticket_sent = True out = prompts.recv(1000) text = out.decode('utf-8') print(text, flush=True) # Create a socket to listen for the samples sample_host = '10.249.10.107' sample_port = 25000 print("Solver listening for smaples on: {} {}".format( sample_host, sample_port)) samps = socket.socket(socket.AF_INET, socket.SOCK_STREAM) samps.connect((sample_host, sample_port)) # Deduce the sample rate from the challenge Fs = get_samp_rate( text ) # Figure out what the frequency and snr is answers = work( samps, Fs, int(Fs) ) # Send the answers as text prompts.send( "{}\n".format(answers[0]).encode('utf-8')) prompts.send( "{}\n".format(answers[1]).encode('utf-8'))
time.sleep(3) out = prompts.recv(1000) print(out.decode('utf-8'), flush=True) print("Solver exiting")
if __name__ == "__main__": host = '10.249.10.107' port = 10000 sample_port = 25000 solve( host , port , sample_port )