一、背景
在当前团队中,我们测试不同的业务不同的需求之间往往有大量重复的测试用例,比如所有业务所有服务涉及的身份认证、签名校验都是同一套实现逻辑,开发可以直接复用的逻辑,但从测试角度所有功能实现下来都需要校验。为解决这个痛点问题,在这里实现apiSafetyScan接口自动化扫描策略服务并记录下来,后面工作中遇到类似问题可以来回顾下。
二、接口自动化检测流程
- 流程说明:开发在服务提测前,需要将服务所有的对外接口和内部接口在apiSafetyScan进行策略扫描,只有所有扫描策略通过后,才允许提测,需要在提测单贴上扫描结果图。
三、数据库设计
- scan_status表
CREATE TABLE `scan_status` (
`scan_id` VARCHAR(50) NOT NULL COLLATE 'utf8mb4_unicode_ci',
`status` VARCHAR(50) NOT NULL COLLATE 'utf8mb4_unicode_ci',
PRIMARY KEY (`scan_id`) USING BTREE
)
COMMENT='获取扫描状态'
COLLATE='utf8mb4_unicode_ci'
ENGINE=InnoDB
;
- scan_result表
CREATE TABLE `scan_results` (
`scan_id` VARCHAR(50) NOT NULL COLLATE 'utf8mb4_unicode_ci',
`result` VARCHAR(50) NOT NULL DEFAULT '0' COMMENT '0-未发现安全问题 1-存在安全问题' COLLATE 'utf8mb4_unicode_ci',
`status` VARCHAR(50) NOT NULL DEFAULT '' COLLATE 'utf8mb4_unicode_ci',
`scan_strategy` JSON NOT NULL,
`fail_data` JSON NOT NULL,
PRIMARY KEY (`scan_id`) USING BTREE
)
COMMENT='存储扫描结果'
COLLATE='utf8mb4_unicode_ci'
ENGINE=InnoDB
;
四、单接口扫描实现
@app.route('/scan', methods = ['POST'])
def api_scan():
# 入参协议校验
try:
validate(instance = request.json, schema = api_scan_schema)
except jsonschema.exceptions.ValidationError as err:
print(f'{request.remote_addr} request json error, err msg: {err}!')
return jsonify({"scan_id": None, "scan_status": False}), 400
# 生成scan_id
create_scan_id = str(int(time.time() * 1000)) + '_id'
# 基准测试
if 'params' in request.json.keys():
data_type = 'params'
re_data = request.json['params']
res = requests.request(method = request.json['method'], url = request.json['url'],
headers = request.json['headers'],
params = request.json['params'])
elif 'json' in request.json.keys():
data_type = 'json'
re_data = request.json['json']
res = requests.request(method = request.json['method'], url = request.json['url'],
headers = request.json['headers'],
json = request.json['json'])
else:
data_type = None
re_data = None
res = requests.request(method = request.json['method'], url = request.json['url'],
headers = request.json['headers'])
if res.status_code != 200:
return jsonify({"scan_id": create_scan_id, "scan_status": False}), 403
# 执行扫描策略
def scan(scan_id, method, url, headers, data, data_type):
all_scan_res = []
if strategy_config['sqlInjection']:
# sql注入扫描
all_scan_res.append(sql_injection_scan(method = method, url = url,
headers = headers,
data = data, data_type = data_type))
# xss扫描
if strategy_config['xssCheck']:
all_scan_res.append(xss_check_scan(method = method, url = url,
headers = headers,
data = data, data_type = data_type))
# 扫描完成后
redis_conn.set(f'scan_id:{scan_id}', json.dumps(all_scan_res), ex = 3600 * 24)
t = Thread(target = scan, args = (
create_scan_id, request.json['method'], request.json['url'], request.json['headers'], re_data, data_type))
t.start()
return jsonify({"scan_id": create_scan_id, "scan_status": True}), 200
五、获取扫描结果实现
@app.route('/scan_result', methods = ['GET'])
def scan_result():
scan_id = request.args['scan_id']
res = redis_conn.get(f'scan_id:{scan_id}')
if res is not None:
res = json.loads(res.decode('utf-8'))
else:
return jsonify({"result": False, "status": "no scanning", "fail_data": res}), 200
if res is []:
result = True
else:
result = False
return jsonify({"result": result, "status": "scan success", "fail_data": res}), 200
五、sql注入漏洞的扫描
将接口各入参进行sql注入字典集的遍历,如果存在sql注入漏洞,接口返回体可能会包含mysql的报错信息、返回多条数据。
def sql_injection_scan(method, url, headers=None, data=None, data_type=None):
if data is None:
return []
with open(file=DIR + '\\scan_business\\sqlInjectDb.txt', mode='r', encoding='utf-8') as f:
attack_f = f.readlines()
with open(file=DIR + '\\scan_business\\api_checkDB.txt', mode='r', encoding='utf-8') as f2:
check_list = f2.readlines()
data = deepcopy(data)
fail_data = []
# 遍历每一个字段
for k in data.keys():
# 遍历每一个攻击语句
for attack in attack_f:
data[k] = attack
# 接口参数篡改好后,发起请求
if data_type == 'params':
res = requests.request(method=method, url=url, params=data, headers=headers)
else:
res = requests.request(method=method, url=url, json=data, headers=headers)
# 接口结果分析
for i in check_list:
if i in res.text:
fail_data.append(
{"attack": attack, "scan_key": k, "status_code": res.status_code, "res": res.text})
return fail_data