前言
在项目提测阶段,很多公司都会要求bug日清,那么对于这个指标要求,大部分公司都会将这个任务分配给测试去跟进。
那么咱们测试人员,总不能说每天去盯着禅道,然后去找对应的开发吧。那么下面就是咱们测试人员的高光时刻,通过脚本去实现统计bug情况,并且钉钉@对应的开发~
话不多说,直接开干!
数据库封装
首先,我们需要连接禅道的数据库,查询出所有项目未关闭的bug数据。
# -*- coding: utf-8 -*-# @Time : 2020/12/20 4:25 PM# @Author : 余少琪# @FileName: mysql_db.py# @email : 1603453211@qq.com# @Software: 蜂goimport pymysqlfrom warnings import filterwarningsfrom config.setting import DevConfig# 忽略 Mysql 告警信息filterwarnings("ignore", category=pymysql.Warning)class MysqlDB(object): def __init__(self): self.config = DevConfig() # 建立数据库连接 self.conn = pymysql.connect( host='', user='', password='', db='' ) # 使用 cursor 方法获取操作游标,得到一个可以执行sql语句,并且操作结果为字典返回的游标 self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor) def __del__(self): # 关闭游标 self.cur.close() # 关闭连接 self.conn.close() def query(self, sql, state="all"): """ 查询 :param sql: :param state: all 是默认查询全部 :return: """ self.cur.execute(sql) if state == "all": # 查询全部 data = self.cur.fetchall() else: # 查询单条 data = self.cur.fetchone() return data def excute(self, sql): """ 更新 、 删除、 新增 :param sql: :return: """ try: # 使用 excute 操作 sql rows = self.cur.execute(sql) # 提交事务 self.conn.commit() return rows except Exception as e: print("数据库操作异常{0}".format(e)) # 如果事务异常,则回滚数据 self.conn.rollback()
封装钉钉通知
这里钉钉通知,我们需要用到钉钉机器人的 webhook、 sgin,这两个数据我是放在配置文件中读取的,大家可以直接写在代码中。
#!/usr/bin/env python# -*- coding: utf-8 -*-# @Time : 2021/11/25 13:15# @Author : 余少琪import base64import hashlibimport hmacimport timeimport urllib.parsefrom utils.FileOperation.yamlControl import GetYamlDatafrom dingtalkchatbot.chatbot import DingtalkChatbot, Feedlinkfrom config.setting import ConfigHandlerfrom typing import Anyclass DingTalkSendMsg(object): def __init__(self): self.timeStamp = str(round(time.time() * 1000)) self.sign = self.get_sign() self.devConfig = ConfigHandler() # 从yaml文件中获取钉钉配置信息 self.getDingTalk = GetYamlData(self.devConfig.config_path).get_yaml_data()['DingTalk'] # 获取 webhook地址 self.webhook = self.getDingTalk["webhook"] + "×tamp=" + self.timeStamp + "&sign=" + self.sign self.xiaoDing = DingtalkChatbot(self.webhook) self.Process = CaseCount() def get_sign(self) -> str: """ 根据时间戳 + "sign" 生成密钥 :return: """ secret = GetYamlData(ConfigHandler().config_path).get_yaml_data()['DingTalk']['secret'] string_to_sign = '{}n{}'.format(self.timeStamp, secret).encode('utf-8') hmac_code = hmac.new(secret.encode('utf-8'), string_to_sign, digestmod=hashlib.sha256).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) return sign def send_text(self, msg: str, mobiles=None) -> None: """ 发送文本信息 :param msg: 文本内容 :param mobiles: 艾特用户电话 :return: """ if not mobiles: self.xiaoDing.send_text(msg=msg, is_at_all=True) else: if isinstance(mobiles, list): self.xiaoDing.send_text(msg=msg, at_mobiles=mobiles) else: raise TypeError("mobiles类型错误 不是list类型.") def send_link(self, title: str, text: str, message_url: str, pic_url: str) -> None: """ 发送link通知 :return: """ try: self.xiaoDing.send_link(title=title, text=text, message_url=message_url, pic_url=pic_url) except Exception: raise def send_markdown(self, title: str, msg: str, mobiles=None, is_at_all=False) -> None: """ :param is_at_all: :param mobiles: :param title: :param msg: markdown 格式 """ if mobiles is None: self.xiaoDing.send_markdown(title=title, text=msg, is_at_all=is_at_all) else: if isinstance(mobiles, list): self.xiaoDing.send_markdown(title=title, text=msg, at_mobiles=mobiles) else: raise TypeError("mobiles类型错误 不是list类型.") @staticmethod def feed_link(title: str, message_url: str, pic_url: str) -> Any: return Feedlink(title=title, message_url=message_url, pic_url=pic_url) def send_feed_link(self, *arg) -> None: try: self.xiaoDing.send_feed_card(list(arg)) except Exception: raise
代码实现
实现思路:
这里会涉及到查询禅道的数据库中的数据,所以采用了继承 MysqlDB类考虑到通常我们封装的数据库类,通常都会连接公司的数据库,所以我在初始化方法中,重新编写了子类的连接信息封装获取未关闭项目ID方法查询所有项目中未关闭的bug总数、对应负责人、负责人未关闭的bug总数发送钉钉通知并@对方#!/usr/bin/env python# -*- coding: utf-8 -*-# @Time : 2022/2/17 16:13# @Author : 余少琪from utils.mysql.mysqlControl import MysqlDBimport pymysqlfrom utils.notice.dingtalkControl import DingTalkSendMsgclass Bugs(MysqlDB): """ bug 日清通知 """ def __init__(self): MysqlDB.__init__(self) try: # 建立数据库连接 self.conn = pymysql.connect( host="", user="root", password="", db='', port=3334 ) # 使用 cursor 方法获取操作游标,得到一个可以执行sql语句,并且操作结果为字典返回的游标 self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor) except Exception as e: print("数据库连接失败,失败原因{0}".format(e)) def get_project_ids(self) -> list: """ 获取项目 ID """ sql = "select * from zt_product where acl = 'open' and deleted = '0';" project_data = self.query(sql) ids = [] for i in project_data: ids.append(i['id']) return ids def get_bugs(self) -> list: BUGS = [] for i in self.get_project_ids(): """查询所有项目中未关闭的bug(不包含延期处理)""" sql = "select count(a.id) as bugCount, b.realName , c.name, b.mobile from zt_bug as a " "inner join zt_user as b inner join zt_product as c where a.assignedTo = b.account " "and a.product = c.id and a.product = {0} and a.deleted = '0' and a.status != 'closed' " "and resolution != 'postponed'GROUP BY a.assignedTo".format(i) # 统计出该项目中未关闭的bug对应的负责人,以及未关闭的总数 bugsCount = self.query(sql) BUGS.append(bugsCount) return BUGS def send_Notice_Msg(self): """发送钉钉通知@对应的负责人""" for bugs in self.get_bugs(): for i in bugs: text1 = "#### **禅道处理通知**n 执 行 人: {0} nn 未处理bug数量: {1} 个 nn 所 属 项 目: {2} " "nn[点我查看](http://114.55.234.41:8000/bug-browse-3-0-assigntome.html)".format(i['realName'], i['bugCount'], i['name']) if i['mobile'] != "": # 判断如果禅道中的用户有手机号码,才会@对方 DingTalkSendMsg().send_markdown(title="禅道处理通知", msg=text1, mobiles=[i['mobile']]) else: DingTalkSendMsg().send_markdown(title="禅道处理通知", msg=text1)if __name__ == '__main__': Bugs().send_Notice_Msg()
下面我们来看看禅道的日清通知吧~~