$ pyhton strees.py 10

def send_request():
url = "http://127.0.0.1:11434/api/generate"
data = {
"model": "deepseek-r1:32b",
"prompt": "解释一下量子计算",
"stream": False
}
response = requests.post(url, json=data, timeout=None)


import requests
import threading
import sys
import time
def send_request():
url = "http://127.0.1.1:11434/api/generate"
data = {
"model": "deepseek-r1:32b",
"prompt": "解释一下量子计算",
"stream": False
}
while True:
try:
response = requests.post(url, json=data, timeout=None)
print(f"Status Code: {response.status_code}, Time: {response.elapsed.total_seconds()}s")
except Exception as e:
print(f"Request failed: {e}")
try:
concurrent_workers = int(sys.argv[1])
# 启动工作线程
for _ in range(concurrent_workers):
t = threading.Thread(target=send_request)
t.daemon = True
t.start()
# 添加线程数监控
while True:
# 计算实际工作线程数(总线程数 - 主线程)
current_workers = threading.active_count() - 1
print(f"\n[监控] 目标线程数: {concurrent_workers} | 实际运行线程数: {current_workers}")
time.sleep(3) # 每3秒输出一次监控信息
except KeyboardInterrupt:
print("\n[!] 已终止持续并发请求")本文链接:https://kinber.cn/post/4983.html 转载需授权!
推荐本站淘宝优惠价购买喜欢的宝贝:

支付宝微信扫一扫,打赏作者吧~
