因业务需求,需要每天早上上班在钉钉群发送每个业务员对于的流量图,遂写此脚本进行发送。目前有两种方案,
1。使用python下载图片到本地web服务器,web服务器打开目录浏览,钉钉发送本地服务器的图片连接。
2。使用pythone直接发送cacti服务器中的图片url(需要cacti打开图片下载链接的免鉴权)。
第一种方法如下:
img.list文件格式如下
图片名称1,图形ID,token,备注 图片名称2,图形ID,token,备注 图片名称3,图形ID,token,备注 图片名称3,图形ID,token,备注
import os
import requests
import time
from datetime import datetime, timedelta
from dingtalkchatbot.chatbot import DingtalkChatbot
# 发送图片到钉钉群
def send_image_to_dingding(token, image_name, image_path):
url = f"https://oapi.dingtalk.com/robot/send?access_token={token}"
headers = {'Content-Type': 'application/json'}
data = {
"msgtype": "markdown",
"markdown": {
"title": "今日流量报告",
"text": f"{image_name}\n\n"
},
"at": {
"isAtAll": False
},
}
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
print(f"Image sent successfully to {token}")
try:
error_data = response.json()
error_code = error_data.get('errcode', 'Unknown')
error_msg = error_data.get('errmsg', 'Unknown')
print(f"Error code: {error_code}, Error message: {error_msg}")
except Exception as e:
print(f"Failed to parse error response: {e}")
with open('error_log.txt', 'a') as error_log:
error_log.write(f"Failed to send image to {token}. Status code: {response.status_code}\n")
error_log.write(f"Response content: {response.content}\n")
else:
print(f"Failed to send image to {token}. Status code: {response.status_code}")
print(f"Response content: {response.content}")
try:
error_data = response.json()
error_code = error_data.get('errcode', 'Unknown')
error_msg = error_data.get('errmsg', 'Unknown')
print(f"Error code: {error_code}, Error message: {error_msg}")
except Exception as e:
print(f"Failed to parse error response: {e}")
with open('error_log.txt', 'a') as error_log:
error_log.write(f"Failed to send image to {token}. Status code: {response.status_code}\n")
error_log.write(f"Response content: {response.content}\n")
# 获取昨天的日期
yesterday = datetime.now() - timedelta(days=1)
yesterday_start = datetime(yesterday.year, yesterday.month, yesterday.day, 0, 0, 0)
yesterday_end = datetime(yesterday.year, yesterday.month, yesterday.day, 23, 59, 59)
# 将时间转换为 Unix 时间戳
start_timestamp = int(time.mktime(yesterday_start.timetuple()))
end_timestamp = int(time.mktime(yesterday_end.timetuple()))
# 存储图片的根目录
image_root_path = '/var/www/html'
# 读取 img.list 文件
with open('img.list', 'r') as file:
for line in file:
# 解析图片信息
name, img_id, token, sender = line.strip().split(',')
# 构建图片下载链接
download_link = f"http://url:port/cacti/graph_image.php?local_graph_id={img_id}&graph_start={start_timestamp}&graph_end={end_timestamp}"
# 下载图片
response = requests.get(download_link)
if response.status_code == 200:
# 创建文件夹
folder_name = yesterday.strftime("%Y-%m-%d")
folder_path = os.path.join(image_root_path, folder_name)
os.makedirs(folder_path, exist_ok=True)
# 保存图片
image_name = f"{name}-{yesterday.strftime('%Y-%m-%d')}.jpg"
image_path = f"http://caimg.xipunet.com/{folder_name}/{image_name}"
local_image_path = os.path.join(folder_path, image_name)
with open(local_image_path, 'wb') as img_file:
img_file.write(response.content)
print(f"Image path: {image_path}") # 打印图片路径
# 发送图片到钉钉群
send_image_to_dingding(token, image_name, image_path)
else:
print(f"Failed to download image for {name}")nginx的配置文件如下(打开目录浏览):
/etc/nginx/sites-available/ca.conf
server {
listen 80;
server_name caimg.xipunet.com;
root /var/www/html;
index index.html;
location / {
autoindex on;
charset utf-8;
}
# 添加其他可能需要的配置,如日志记录等
access_log /var/log/nginx/caimg_access.log;
error_log /var/log/nginx/caimg_error.log;
}第二种方法则直接从cacti数据库中抓取指定图形树中的图片,并根据检索图形title中的特定字符串匹配不同的token,然后直接发送cacti服务器的图片连接,这个方法理论上要比第一种方法简单,而且不需要在每次cacti服务器添加新图形后再去报警服务器添加列表。具体代码如下:
import pymysql
import requests
import time
# MySQL数据库连接信息
mysql_host = "数据库地址"
mysql_port = 数据库端口mysql_username = "数据库用户名"
mysql_password = "数据库密码"
database_name = "表名" # 数据库表位cacti
# 钉钉机器人token及发送地址
ding_token_mapping = {
"字符串1": "https://oapi.dingtalk.com/robot/send?access_token=token1",
"字符串2": "https://oapi.dingtalk.com/robot/send?access_token=token2",
"字符串3": "https://oapi.dingtalk.com/robot/send?access_token=token3",
"字符串4": "https://oapi.dingtalk.com/robot/send?access_token=token4",
}
# SQL查询语句 22为查询的图形树ID
sql_query = """
SELECT REPLACE
( REPLACE ( REPLACE ( title_cache, '/', '-' ), ' ', '' ), '*', '' ) AS title,
graph_tree_items.local_graph_id AS id
FROM
graph_tree_items
LEFT JOIN graph_templates_graph ON graph_templates_graph.local_graph_id = graph_tree_items.local_graph_id
WHERE
title_cache NOT LIKE '%总带宽%'
AND graph_tree_items.graph_tree_id = 22
AND graph_tree_items.local_graph_id <> 0
ORDER BY
'id' DESC
"""
# 获取昨天的日期
yesterday = datetime.now() - timedelta(days=1)
yesterday_start = datetime(yesterday.year, yesterday.month, yesterday.day, 0, 0, 0)
yesterday_end = datetime(yesterday.year, yesterday.month, yesterday.day, 23, 59, 59)
# 将时间转换为 Unix 时间戳
start_timestamp = int(time.mktime(yesterday_start.timetuple()))
end_timestamp = int(time.mktime(yesterday_end.timetuple()))
# 连接MySQL数据库
connection = pymysql.connect(host=mysql_host,
port=mysql_port,
user=mysql_username,
password=mysql_password,
database=database_name,
charset='utf8', # 修改字符集为utf8
cursorclass=pymysql.cursors.DictCursor)
try:
with connection.cursor() as cursor:
# 执行SQL查询
cursor.execute(sql_query)
result = cursor.fetchall()
for row in result:
title = row['title']
id = row['id']
# 检查标题中的特定字符串并匹配相应的token
for keyword, token in ding_token_mapping.items():
if keyword in title:
# 构建Markdown文本
markdown_title = "流量日报"
image_title = f"![{title}]"
image_link = f"http://{mysql_host}:CA端口/cacti/graph_image.php?local_graph_id={id}&graph_start={start_timestamp}&graph_end={end_timestamp}"
markdown_text = f"{image_title}\n\n{image_title}({image_link})"
# 发送Markdown文本到相应的钉钉token
ding_token = ding_token_mapping[keyword]
print({ding_token})
response = requests.post(ding_token, json={"msgtype": "markdown",
"markdown": {"title": markdown_title,
"text": markdown_text}})
print(response.text)
break # 如果找到匹配的关键字,则跳出内循环
else:
print("未找到匹配的关键字")
finally:
connection.close()本文链接:https://kinber.cn/post/4339.html 转载需授权!
推荐本站淘宝优惠价购买喜欢的宝贝:

支付宝微信扫一扫,打赏作者吧~
