×

cacti钉钉流量日报

hqy hqy 发表于2024-12-25 14:55:48 浏览39 评论0

抢沙发发表评论

mportant;">因业务需求,需要每天早上上班在钉钉群发送每个业务员对于的流量图,遂写此脚本进行发送。目前有两种方案,

1。使用python下载图片到本地web服务器,web服务器打开目录浏览,钉钉发送本地服务器的图片连接。

2。使用pythone直接发送cacti服务器中的图片url(需要cacti打开图片下载链接的免鉴权)。

第一种方法如下:

img.list文件格式如下

图片名称1,图形ID,token,备注
图片名称2,图形ID,token,备注
图片名称3,图形ID,token,备注
图片名称3,图形ID,token,备注
import os
import requests
import time
from datetime import datetime, timedelta
from dingtalkchatbot.chatbot import DingtalkChatbot

# 发送图片到钉钉群
def send_image_to_dingding(token, image_name, image_path):
    url = f"https://oapi.dingtalk.com/robot/send?access_token={token}"
    headers = {'Content-Type': 'application/json'}
    data = {
        "msgtype": "markdown",
        "markdown": {
            "title": "今日流量报告",
            "text": f"{image_name}\n\n![{image_name}]({image_path})"
        },
        "at": {
            "isAtAll": False
        },
    }
    response = requests.post(url, headers=headers, json=data)
    if response.status_code == 200:
        print(f"Image sent successfully to {token}")
        try:
            error_data = response.json()
            error_code = error_data.get('errcode', 'Unknown')
            error_msg = error_data.get('errmsg', 'Unknown')
            print(f"Error code: {error_code}, Error message: {error_msg}")
        except Exception as e:
            print(f"Failed to parse error response: {e}")
        with open('error_log.txt', 'a') as error_log:
            error_log.write(f"Failed to send image to {token}. Status code: {response.status_code}\n")
            error_log.write(f"Response content: {response.content}\n")
    else:
        print(f"Failed to send image to {token}. Status code: {response.status_code}")
        print(f"Response content: {response.content}")
        try:
            error_data = response.json()
            error_code = error_data.get('errcode', 'Unknown')
            error_msg = error_data.get('errmsg', 'Unknown')
            print(f"Error code: {error_code}, Error message: {error_msg}")
        except Exception as e:
            print(f"Failed to parse error response: {e}")
        with open('error_log.txt', 'a') as error_log:
            error_log.write(f"Failed to send image to {token}. Status code: {response.status_code}\n")
            error_log.write(f"Response content: {response.content}\n")

# 获取昨天的日期
yesterday = datetime.now() - timedelta(days=1)
yesterday_start = datetime(yesterday.year, yesterday.month, yesterday.day, 0, 0, 0)
yesterday_end = datetime(yesterday.year, yesterday.month, yesterday.day, 23, 59, 59)

# 将时间转换为 Unix 时间戳
start_timestamp = int(time.mktime(yesterday_start.timetuple()))
end_timestamp = int(time.mktime(yesterday_end.timetuple()))

# 存储图片的根目录
image_root_path = '/var/www/html'

# 读取 img.list 文件
with open('img.list', 'r') as file:
    for line in file:
        # 解析图片信息
        name, img_id, token, sender = line.strip().split(',')
        
        # 构建图片下载链接
        download_link = f"http://url:port/cacti/graph_image.php?local_graph_id={img_id}&graph_start={start_timestamp}&graph_end={end_timestamp}"
        
        # 下载图片
        response = requests.get(download_link)
        if response.status_code == 200:
            # 创建文件夹
            folder_name = yesterday.strftime("%Y-%m-%d")
            folder_path = os.path.join(image_root_path, folder_name)
            os.makedirs(folder_path, exist_ok=True)
            
            # 保存图片
            image_name = f"{name}-{yesterday.strftime('%Y-%m-%d')}.jpg"
            image_path = f"http://caimg.xipunet.com/{folder_name}/{image_name}"
            local_image_path = os.path.join(folder_path, image_name)
            with open(local_image_path, 'wb') as img_file:
                img_file.write(response.content)
                
            print(f"Image path: {image_path}")  # 打印图片路径
            
            # 发送图片到钉钉群
            send_image_to_dingding(token, image_name, image_path)

        else:
            print(f"Failed to download image for {name}")

nginx的配置文件如下(打开目录浏览):

/etc/nginx/sites-available/ca.conf
server {
    listen 80;
    server_name caimg.xipunet.com;

    root /var/www/html;
    index index.html;

    location / {
        autoindex on;
        charset utf-8;
    }

    # 添加其他可能需要的配置,如日志记录等
    access_log /var/log/nginx/caimg_access.log;
    error_log /var/log/nginx/caimg_error.log;
}

第二种方法则直接从cacti数据库中抓取指定图形树中的图片,并根据检索图形title中的特定字符串匹配不同的token,然后直接发送cacti服务器的图片连接,这个方法理论上要比第一种方法简单,而且不需要在每次cacti服务器添加新图形后再去报警服务器添加列表。具体代码如下:

import pymysql
import requests
import time

# MySQL数据库连接信息
mysql_host = "数据库地址"
mysql_port = 数据库端口mysql_username = "数据库用户名"
mysql_password = "数据库密码"
database_name = "表名"  # 数据库表位cacti

# 钉钉机器人token及发送地址
ding_token_mapping = {
    "字符串1": "https://oapi.dingtalk.com/robot/send?access_token=token1",
    "字符串2": "https://oapi.dingtalk.com/robot/send?access_token=token2",
    "字符串3": "https://oapi.dingtalk.com/robot/send?access_token=token3",
    "字符串4": "https://oapi.dingtalk.com/robot/send?access_token=token4",
}

# SQL查询语句 22为查询的图形树ID
sql_query = """
SELECT REPLACE
    ( REPLACE ( REPLACE ( title_cache, '/', '-' ), ' ', '' ), '*', '' ) AS title,
    graph_tree_items.local_graph_id AS id
FROM
    graph_tree_items
    LEFT JOIN graph_templates_graph ON graph_templates_graph.local_graph_id = graph_tree_items.local_graph_id 
WHERE
    title_cache NOT LIKE '%总带宽%' 
    AND graph_tree_items.graph_tree_id = 22 
    AND graph_tree_items.local_graph_id <> 0 
ORDER BY
    'id' DESC
"""

# 获取昨天的日期
yesterday = datetime.now() - timedelta(days=1)
yesterday_start = datetime(yesterday.year, yesterday.month, yesterday.day, 0, 0, 0)
yesterday_end = datetime(yesterday.year, yesterday.month, yesterday.day, 23, 59, 59)

# 将时间转换为 Unix 时间戳
start_timestamp = int(time.mktime(yesterday_start.timetuple()))
end_timestamp = int(time.mktime(yesterday_end.timetuple()))



# 连接MySQL数据库
connection = pymysql.connect(host=mysql_host,
                             port=mysql_port,
                             user=mysql_username,
                             password=mysql_password,
                             database=database_name,
                             charset='utf8',  # 修改字符集为utf8
                             cursorclass=pymysql.cursors.DictCursor)

try:
    with connection.cursor() as cursor:
        # 执行SQL查询
        cursor.execute(sql_query)
        result = cursor.fetchall()

        for row in result:
            title = row['title']
            id = row['id']

            # 检查标题中的特定字符串并匹配相应的token
            for keyword, token in ding_token_mapping.items():
                if keyword in title:
                    # 构建Markdown文本
                    markdown_title = "流量日报"
                    image_title = f"![{title}]"
                    image_link = f"http://{mysql_host}:CA端口/cacti/graph_image.php?local_graph_id={id}&graph_start={start_timestamp}&graph_end={end_timestamp}"
                    markdown_text = f"{image_title}\n\n{image_title}({image_link})"

                    # 发送Markdown文本到相应的钉钉token
                    ding_token = ding_token_mapping[keyword]
                    print({ding_token})
                    response = requests.post(ding_token, json={"msgtype": "markdown",
                                                               "markdown": {"title": markdown_title,
                                                                            "text": markdown_text}})
                    print(response.text)
                    break  # 如果找到匹配的关键字,则跳出内循环
        else:
            print("未找到匹配的关键字")
finally:
    connection.close()


打赏

本文链接:https://www.kinber.cn/post/4339.html 转载需授权!

分享到:


推荐本站淘宝优惠价购买喜欢的宝贝:

image.png

 您阅读本篇文章共花了: 

群贤毕至

访客