$ pyhton strees.py 10
def send_request():
url = "http://127.0.0.1:11434/api/generate"
data = {
"model": "deepseek-r1:32b",
"prompt": "解释一下量子计算",
"stream": False
}
response = requests.post(url, json=data, timeout=None)
import requests import threading import sys import time def send_request(): url = "http://127.0.1.1:11434/api/generate" data = { "model": "deepseek-r1:32b", "prompt": "解释一下量子计算", "stream": False } while True: try: response = requests.post(url, json=data, timeout=None) print(f"Status Code: {response.status_code}, Time: {response.elapsed.total_seconds()}s") except Exception as e: print(f"Request failed: {e}") try: concurrent_workers = int(sys.argv[1]) # 启动工作线程 for _ in range(concurrent_workers): t = threading.Thread(target=send_request) t.daemon = True t.start() # 添加线程数监控 while True: # 计算实际工作线程数(总线程数 - 主线程) current_workers = threading.active_count() - 1 print(f"\n[监控] 目标线程数: {concurrent_workers} | 实际运行线程数: {current_workers}") time.sleep(3) # 每3秒输出一次监控信息 except KeyboardInterrupt: print("\n[!] 已终止持续并发请求")
本文链接:https://www.kinber.cn/post/4983.html 转载需授权!
推荐本站淘宝优惠价购买喜欢的宝贝: