欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

大一上--数科作业(连载完结篇)

时间:2023-06-03

说明(read!me!please!!!):

①本篇目旨在讲解,所附代码为片段截取式,每一个片段侧重于展现一项功能,不同片段各有省略和重合部分,完整项目代码详见GitHub哦

②注意看代码注释,可以说是 相 当 之 详 细(不吹)


目录:

①功能要求

②流程概况

③代码简介


序:一些准备工作

python基本知识补充(附代码说明)

python相关学习笔记:

https://blog.csdn.net/zhuaishao_/category_11588953.html?spm=1001.2014.3001.5482

https://blog.csdn.net/zhuaishao_/category_11581861.html?spm=1001.2014.3001.5482

# 定义一个类# class 类名class Gun: # 类中的初始化方法:实例化类时自动调用 def __init__(self,model,count): Gun.model = model Gun.count = count # 类中的方法,可通过类名.方法名调用 # 类中的实例方法应含有self变量,self为实例名 def add_bullet(self,count_add): Gun.count = Gun.count + count_add def shoot(self): if Gun.count > 0: print('子弹发射') Gun.count -= 1 else: print("没子弹了")class Soldier: def __init__(self,name,gun): Soldier.name = name Soldier.gun = gun def __name__(self): return "士兵%s持有枪支%s" %(Soldier.name,Soldier.gun) def fire(self): if Gun.count > 0: print('开火') Gun.shoot(gun) print("子弹剩余%d" %Gun.count) else: print('没子弹了')# 实例化Gun类gun = Gun("AK47",3)# 输出实例的存储地址等信息print(gun)# 实例化Soldier,传入参数Xu = Soldier("许三多","AK47")print(Xu)# 使用类名调用类中封装的方法Gun.add_bullet(gun,3)Soldier.fire(Xu)# 类属性和类方法就是针对类对象定义的属性和方法# 在类方法中可以直接访问类属性和其他的类方法;# 除类方法外类中定义的其他方法为实例方法# 定义类,以object为父类(继承自object)class Tool(object): # 定义类变量count count = 0 # 修饰器,表明定义类方法 @classmethod # 类方法参数必含cls def show_tool_count(cls): print("内含工具的数量%d" %Tool.count) def __init__(self): Tool.count += 1# 开发时,如果需要在类中封装一个方法,这个方法:# ①需要访问实例属性,定义为实例方法(以self作为第一个参数)# ②需要访问类属性,定义为类方法# ③实例方法与类方法均不需要访问,则定义为静态方法即可(无需参数)class Dog(object): # 定义为静态方法 @staticmethod def run(): print("小狗跑步")# 使用类名.方法名调用静态方法,无需创建实例Dog.run()class Game(object): # 定义类变量 highest_score = 0 # 定义类方法 @classmethod def show_score(cls): print("历史最高分为%d" %cls.highest_score) def __init__(self,name): self.name = name def start_game(self): print("%s开始游戏" %self.name) # 定义静态方法 @staticmethod def help(): print("游戏说明")# 使用类名调用静态方法Game.help()# 使用类名调用类方法Game.show_score()# 实例化Game类player = Game("玩家1")# 使用实例名调用实例方法player.start_game()

flask官方参考文档:Welcome to Flask — Flask documentation (2.0.x)

websocket官方参考文档:websocket.org - Powered by Kaazing

redis官方参考文档:Welcome to redis-py’s documentation! — redis-py dev documentation


PART1:功能要求

制作一个老年求助机,设备实现当用户按下求助按键时,将不同的求助信息通过企业微信转发,显示到用户微信消息,用户通过微信回复消息,该消息可由求助机收到并显示

实现用户端(求助机)---用户端(手机微信)的双向通信


PART2:流程概况

前三步完成的功能逻辑:

第一步:开通企业微信,新建自定义应用用于消息转接 

第二步:调试

①发送HTTP请求,获取调用企业微信发送消息API的凭证

②使用API,发送HTTP请求,验证API有效性

③用户若成功接收到自定义应用发来的消息则证明发送消息接口可用,可进一步编写入代码中

第三步:验证

验证URL有效性,使自定义应用的后台(服务器)能接收到来自客户端的回复

前三步整合:企业微信自定义应用的收发消息---实际实现了HTTP服务器与用户之间的通信

第四步:

①在服务器中部署写入redis的代码实现服务器间通信

②在websocket服务器中部署写入websocket的代码实现服务器到单片机设备的双向通信


PART3:代码简介

1.HTTP服务器端代码:

基础配置:

from flask import Flask, request, jsonify, make_responseimport apifrom WXBizMsgCrypt import WXBizMsgCrypt, ETimport redis# 导入模块app = Flask(__name__)# 实例化flask类,创建项目对象token = api.get_token()my_crypt = WXBizMsgCrypt(api.receive_token, api.AESKey, api.corpid)# 调用企业微信api接口,获取token、AESkey、corpid等信息r = redis.Redis(host='redis', port=6379, db=0)p = r.pubsub()# 创建redis pubsub对象# redis 提供两个类 Redis 和 StrictRedis, StrictRedis 用于实现大部分官方的命令# Redis 是StrictRedis 的子类,用于向后兼用旧版本。

初始化设置: 

功能:

①验证URL有效性---回调请求

@app.route('/', methods=["GET"])# 注册路由:当访问域名根目录且使用GET请求方法时,执行以下函数def check(): # 企业微信验证SSL接口 if request.method == 'GET': msg_signature = request.args.get('msg_signature') timestamp = request.args.get('timestamp') nonce = request.args.get('nonce') echostr = request.args.get('echostr') content = my_crypt.VerifyURL(msg_signature, timestamp, nonce, echostr) response = make_response(content) return response # 将请求回调所需信息打包回复,以通过验证

主要应用flask中表示当前请求的request对象,request对象中保存一次HTTP请求的所有信息 

表单数据:通过表单让用户填写内容然后提交到服务器端的数据

查询参数:用函数方法进行查询操作时,函数或方法要使用的参数

form和data是用来提取请求体数据,通过request.form可以直接提取请求体中的表单格式的数据,是一个类字典的对象;args是用来提取url中的参数

Flask——request的form_data_args用法_活动的笑脸的博客-CSDN博客

②接收用户消息

@app.route('/', methods=["POST"]) # 注册路由def receive(): # 企业微信用户发送消息的处理接口 if request.method == 'POST': msg_signature = request.args.get('msg_signature') timestamp = request.args.get('timestamp') nonce = request.args.get('nonce') _, xml_content = my_crypt.DecryptMsg( request.get_data(), msg_signature, timestamp, nonce) content = ET.fromstring(xml_content).find("Content") user = ET.fromstring(xml_content).find("FromUserName")# 使用企业微信封装api,解码出消息内容和用户名 response = make_response(jsonify({'message': 'OK'}, 200))# 返回消息,表示成功接收 pass# ... return response

③redis订阅/广播消息

@app.route('/', methods=["POST"])def receive(): pass# ... r.publish('w2d-channel', user.text + ": " + content.text)# http服务器端需要向websocket服务器端发送消息# 即:向 ① wechat to desk 频道广播消息 return responsedef message_handler(message): api.send_message_all(message['data'].decode('utf-8'), token)# @message: String --从websocket服务器端publish来的消息对象# 将收到的订阅消息显示在自定义应用后台if __name__ == "__main__": app.config['SERVER_NAME'] = 'chichibomm.com:6666' p.subscribe(**{'d2w-channel': message_handler})# http服务器需要接收来自websocket服务器端的消息# 订阅 ② desk to wechat 频道至 message_handler 函数 p.run_in_thread(sleep_time=0.001)# 新开一个进程来保证订阅消息被处理# 多线程 app.run('0.0.0.0', debug=True, port=6666, ssl_context=( "/ssl/chichibomm.com.pem", "/ssl/chichibomm.com.key"))# 配置flask运行的服务器端口、ssl证书等

redis-py 带有可以订阅频道又可以聆听新信息的PubSub目标

创建一个PubSub的方法如下:

r = redis.StrictRedis(...)p = r.pubsub()

创建PubSub 实例后就可以聆听频道和模式了。

p.subscribe('my-first-channel', 'my-second-channel', ...)p.psubscribe('my-*', ...)

Redis 发布订阅 (pub/sub)是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。Redis 客户端可以订阅任意数量的频道。

      

当我们创建1个Flask实例对象(app),就可以通过app.config来查看这个app的所有配置变量

官方说明文档:Configuration Handling — Flask documentation (0.12.x)

SERVER_NAME 的作用:

①协助flask在活动的请求(request)之外生成绝对url

②用于子域名支持 

线程是程序中的一个单一的顺序控制流程,在单个程序中同时运行多个线程完成不同的工作,称为多线程

线程和进程的区别在于,子进程和父进程有不同的代码和数据空间,而多个线程则共享数据空间,多线程主要是为了节约CPU时间 

2.websocket服务器端代码:

基础配置:

from websocket_server import WebsocketServerimport redis# 导入redis和websocket模块r = redis.Redis(host='redis', port=6379, db=0)p = r.pubsub()# 连接到redisserver = WebsocketServer(host="0.0.0.0", port=2333)# 创建websocket对象

websocket是一种通信协议,实现网络全双工

通信

websocket提供了三个简单的函数,onopen,onclose以及onmessage,分别监听socket的开启、断开和消息状态。

服务器端需要给客户端推送消息,通常的做法就是轮询或长轮询
轮询:就是在每特定的时间内(例如每个2秒),向服务器端发送http请求,然后由服务器端返回最新的数据给用户端

第一次http请求----------客户端:有消息?---------------------服务器端:没有消息第二次http请求----------客户端:有消息?---------------------服务器端:没有消息重复http请求....

长轮询:客户端向服务器发送http请求,如果服务器端没有数据就不返回数据,等待时一直保持http连接,等到服务器端有数据后在返回数据,然后再继续请求。

第一次http请求----------客户端:有消息?---------------------服务器端:服务器没有消息,则不回应,一直保持http连接,等啊等。---------------------服务器端:有消息,返回给客户端信息,第二次http请求----------客户端:有消息?---------------------服务器端:服务器没有消息,则不回应,一直保持http连接,等啊等。---------------------服务器端:有消息,返回给客户端信息,一直重复http请求....

缺点:

①每次都要建立http连接,连接次数多,占用服务器带宽造成带宽浪费

②不是服务器发送消息主动发起的请求 

WebSocket实现网页实时聊天工具 | 消息推送 | 搭建WebSocket服务、python WebsocketServer_九瓜的博客-CSDN博客_网页聊天工具

功能:

①对是否连接的判断及反应:

# 定义连接成功和断开连接两个功能函数def new_client(client, server): """ 新的求助机连接的事件处理函数 @client: websocket client对象 @server: websocket server对象 """ print("当新的客户端连接时会提示:%s" % client['id']) r.publish('d2w-channel',"%s号求助机已连接" % client['id']) # 广播消息到d2w频道 def client_left(client, server): """ 求助机断开连接的事件处理函数 @client: websocket client对象 @server: websocket server对象 """ r.publish('d2w-channel',"%s号求助机断开连接" % client['id']) # 广播消息到d2w频道

 ②发送消息到服务器及处理接收到的消息

def message_received(client, server, message): """ 求助机向服务器端发送消息的事件处理函数 @client: websocket client对象 @server: websocket server对象 @message: 发送的消息对象 """ r.publish('d2w-channel',"%s号求助机发送消息:" % client['id'] + message) # 向 d2w 频道广播消息def message_handler(message): """ 订阅消息的处理函数 @message: String //从HTTP服务器端publish来的消息对象 """ server.send_message_to_all(message['data'].decode('utf-8')) # 将收到的消息广播至全体求助机(websocket发送消息到所有连接)

程序运行:

if __name__ == '__main__': p.subscribe(**{'w2d-channel': message_handler}) # 订阅 wechat to desk 频道至 message_handler 函数 server.set_fn_new_client(new_client) # 有新设备连接上(()内为定义的设备连接函数) server.set_fn_client_left(client_left) # 有设备断开连接(()内为定义的设备断开函数) server.set_fn_message_received(message_received) # 开始监听消息 thread = p.run_in_thread(sleep_time=0.001) # 新开一个进程来保证订阅消息被处理 server.run_forever() # 持续运行

附:服务器端 docker的使用

Docker作为一个软件集装箱化平台,可以让开发者构建应用程序时,将它与其依赖环境一起打包到一个容器中,然后很容易地发布和应用到任意平台中。

Docker广泛应用于Web应用的自动化打包和发布。

什么是Docker,它可干什么? - 听海漫步 - 博客园

version: '2'services: http_server: build: ./http_server/. links: - redis ports: - 6666:6666 networks: - helpdesk_network depends_on: - redis volumes: - /root/ssl:/ssl websocket_server: build: ./websocket_server/. links: - redis ports: - 2333:2333 networks: - helpdesk_network depends_on: - redis redis: image: redis networks: - helpdesk_network expose: - 6379networks: helpdesk_network: {}

3.MCU端代码:

MCU大部分功能在上一部分已经实现,以下主要展开websocket服务相关部分代码

基础配置:

①接线方式:

②导入库:

// Arduino架构库#include // 基于Arduino架构的显示屏驱动库#include // 基于Arduino架构的多按钮驱动库#include // 基于Arduino架构的蜂鸣器驱动库#include // 基于Arduino架构的WIFI连接库#include // 基于Arduino架构的Websocket协议库#include

③封装websocket服务:

附:以代码方式代替AT指令连接WiFi

// 导入库// 基于Arduino架构的WIFI连接库#include // 定义相关参数// 2.4G WiFi名称#define WIFI_SSID "luckin"// 对应的WiFi密码#define WIFI_PASS "20030708"// Websocket服务器域名/IP地址#define WEBSOCKET_SERVER "chichibomm.com"// 二次封装WiFi连接函数char connect(){ // 调用WiFi连接库,接入WiFiWiFi.begin(WIFI_SSID, WIFI_PASS); Serial.print("等待连接WIFI");int timeout_s = 30;// 当WiFi未接入,持续等待时显示“...” // while条件判断中与运算:需要同时满足&&前后两个条件,执行循环 // timeout_s--:每次循环timeout自减1 // 等待时延:delay(1000)*30 while (WiFi.status() != WL_ConNECTED && timeout_s-- > 0){delay(1000);Serial.print(".");} // 检测到超过时延后WiFi状态仍为未连接if (WiFi.status() != WL_CONNECTED){Serial.print("无法连接,请检查密码"); message = "无法连接,请检查密码";return 0;} // 检测到时延内WiFi状态为成功连接else{Serial.println("WIFI连接成功!"); message = "WIFI连接成功!";Serial.print(WiFi.localIP());return 1;}}

// 基于Arduino架构的Websocket协议库#include // 定义Websocket服务端口#define WEBSOCKET_PORT 2333// 定义Websocket方法路由#define WEBSOCKET_PATH "/"

// websocket事件逻辑定义// 定义事件处理函数// 参数WStype_t type判断websocket与服务器连接状态void webSocketEvent(WStype_t type, uint8_t * payload, size_t length){// C语言switch语句(不同并列条件对应不同处理方式) switch(type) { // 情况1:websocket未成功连接 case WStype_DISCONNECTED:Serial.printf("Websocket断开连接!n");message = "服务器断开连接"; // 调用封装好的蜂鸣器函数ez_beep(2);break;// 情况2:websocket成功连接 case WStype_CONNECTED:Serial.printf("Websocket已连接至路由:%sn", payload);message = "服务器已连接";break; // 情况3:websocket收到来自服务器的消息case WStype_TEXT:Serial.printf("Websocket收到信息: %sn", payload);message = String((const char*)payload);ez_beep(1);break;}}

// 尝试连接服务器webSocket.begin(WEBSOCKET_SERVER, WEBSOCKET_PORT, WEBSOCKET_PATH);webSocket.setReconnectInterval(5000);webSocket.onEvent(webSocketEvent);

主程序展现:

// 按钮按下时,定义触发逻辑void click(Button2 &btn){if (btn == buttonA){ // 单片机显示Serial.println("按键A按下"); // 传入显示函数的message参数message = "我生病了"; // 调用ez_beep(1); // 使用websocket发送消息"我生病了"到websocket服务器端webSocket.sendTXT("我生病了");} // ... pass}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。