欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Python操作MongoDb与Redis以及ODM

时间:2023-09-01
前言:

今天在回家的火车上把这篇文章写了~

上篇文章连接:https://blog.csdn.net/weixin_51485807/article/details/122568825

祝大家新年快乐吧~具体看上篇前言

首发于:https://sleepymonster.cn

Python操作MongoDb

$ pip install pymongo

这里的能满足基本需要,要聚合查询或者整花活记得取官方文档去看看

import pymongofrom bson import ObjectIdfrom pymongo import MongoClient# 连接数据库client = MongoClient("mongodb://localhost:27017/")# 管理数据库pythonDb = client.get_database("python") # 通过逻辑库的名字找到listAll = client.list_databases() # 查看所有数据库listNameAll = client.list_database_names() # 查看所有数据库名称client.drop_database("temp") # 删除数据库# 管理集合listAllName = pythonDb.list_collection_names() # 查看集合名称main = pythonDb.get_collection("python_main") # 拿到集合data = main.find_one() # 拿到数据# 新增文档doc1 = { # 定义一个 'name': 'world1', 'age': 20, 'hobby': { 'up': 'ball', 'down': 'run' }}doc2 = { # 定义一个 'name': 'world2', 'age': 20, 'hobby': { 'up': 'ball', 'down': 'run' }}doc3 = { # 定义一个 'name': 'world3', 'age': 20, 'hobby': { 'up': 'ball', 'down': 'run' }}res = main.insert_one(doc1) # 拿到那个集合resId = res.inserted_id # 会返回主键iddocList = [doc2, doc3] # 准备批量插入resList = main.insert_many(docList)resListId = resList.inserted_ids # 列表形式# 查询文档findOne = main.find_one() # 查询一条数据findByMainKey = main.find_one({'_id': ObjectId('61ea4b7af17f38a6ff270a9f')}) # 根据主键进行查询findMany = main.find() # 查询多条findManyByOrder1 = main.find({}, {'name': 1}) # 多条查询的条件查询(只返回name列)findManyByOrder2 = main.find({'age': {'$gt': 19}}, {'name': 1, 'age': 1}) # 多条查询的条件查询 年龄大于19的findPage = main.find().skip(20).limit(10) # 进行分页 跳过前2页取10条数据findOrder1 = main.find().sort('age', pymongo.ASCENDING) # 进行排序findOrder2 = main.find({"name": "world1"}).sort('age', pymongo.ASCENDING) # 条件排序findOrder3 = main.find({"name": "world1"}).sort([('age', pymongo.ASCENDING), ('name', pymongo.ASCENDING)]) # 多列排序findCount = main.count_documents({}) # 文档总数# 更新文档res = main.update_one({}, {'$set': {'age': 19}}) # 更新resUpdate = main.replace_one({}, {"newName": "Hello"})resUpdateMany = main.update_many({}, {'$set': {'age': 19}}) # 批量更新main.find_one_and_update({}, {'$set': {'age': 21}}) # 返回的是文档main.find_one_and_replace({}, {"newName": "Hello World"})# 删除文档resDel = main.delete_one({}) # 根据条件查询并且删除第一条redDelFind = main.find_one_and_delete({}) # 如果删除且返回此文档# resDelMany = main.delete_many({}) # 根据条件删除

ODM操作MongoDb

$ pip install mongoengine

import reimport timefrom enum import Enumfrom mongoengine import connect, document, Embeddeddocument, ValidationErrorfrom mongoengine.queryset.visitor import Qfrom mongoengine.fields import IntField, StringField, EnumField, ListField, EmbeddeddocumentFieldconnect("students", host='mongodb://localhost:27017/students')# 自定义一个验证器def phone_required(value): pattern = r'^1[0-9]{10}$' if not re.search(pattern, value): raise ValidationError('请输出手机号码')class SexChoices(Enum): MEN = '男' WOMEN = '女'class CourseGrade(Embeddeddocument): # 被嵌套的文档 course_name = StringField(max_length=64, required=True, verbose_name='课程名称') teacher = StringField(max_length=16, verbose_name='老师') age = IntField(min_value=0, max_value=100, required=True, verbose_name='分数')class Student(document): stu_no = IntField(required=True, unique=True, verbose_name='学号') stu_name = StringField(required=True, max_length=16, verbose_name='姓名') sex = EnumField(enum=SexChoices, default=SexChoices('男'), verbose_name='性别') class_name = StringField(max_length=10, default="001", verbose_name='班级') address = StringField(max_length=255, default="中国", verbose_name='家庭住址') phone_no = StringField(max_length=11, default="10010000000", verbose_name='电话号码', validation=phone_required) age = IntField(min_value=0, max_value=150, default=18, verbose_name='年龄') # EmbeddeddocumentField为自定义 # 此处的gradeList实现了 [{dict}, {dict}] 的效果 gradeList = ListField(EmbeddeddocumentField(CourseGrade), verbose_name='成绩列表') # grade实现了{dict}的效果 grade = EmbeddeddocumentField(CourseGrade, verbose_name='成绩文档') # gradeArr实现了[x, x, x]的效果 gradeArr = ListField(IntField()) meta = { 'collection': 'students_main', 'ordering': ['-age'] }# 新增数据# todo 构造ODM对象stu_obj = Student(stu_no=time.time(), stu_name="jack", gradeList=[{'course_name': 'math', 'age': 18}], gradeArr=[18, 18], grade={'course_name': 'math', 'age': 18})# todo 验证数据stu_obj.validate()# todo 进行保存res = stu_obj.save() # 可以访问对应的属性# 或者使用createresCreate = Student.objects.create(stu_no=time.time() + 1, stu_name="jack", gradeList=[{'course_name': 'math', 'age': 18}], gradeArr=[18, 18], grade={'course_name': 'math', 'age': 18})# 查询数据first = Student.objects.first() # 查询结构的第一个get = Student.objects.get(id='61ea7bc06af381879c6fa89b') # 根据id查询# noGet = Student.objects.get(id='1') # 不存在会报错# noGet = Student.objects.get(stu_name='jack') # 多个也会报错# 条件查询queryAll = Student.objects.all()queryOrder = Student.objects.filter(age__gt=12) # 年龄大于12的 filed__symbolnestedQueryOrder = Student.objects.filter(grade__age__gte=60) # 嵌套grade.age数据大于60stringQueryOrder = Student.objects.filter(stu_name__startwith="j") # stu_name 开头为 j# 多条件查询querySet1 = Student.objects.filter(Q(age__gt=12) & Q(age__lt=21)) # 同时满足querySet2 = Student.objects.filter(Q(age__gt=12) | Q(age__lt=21)) # 部分满足querySet3 = Student.objects.filter(Q(age__gt=12, sex=SexChoices.MEN) | Q(age__lt=21, sex=SexChoices.WOMEN)) # 部分满足# 聚合统计counts = Student.objects.count()avgAge = Student.objects.all().average('age')sumAge = Student.objects.all().sum('age')order = Student.objects.order_by('+age', '-class_name') # 指定排序sliceData = Student.objects.all()[10:15] # 10,11,12,13,14# 修改数据Student.objects.filter(stu_no=1).update_one(age=20) # 单条修改Student.objects.filter(stu_no=2).update_one(age=20, unset__class_name=True) # unset不要这个字段了stuObj = Student.objects.filter(stu_no=2).first()stuObj.stu_name = 'Rose'stuObj.save() # 获取-》更改-》保存Student.objects.filter(age=18).update(inc__age=1) # 批量修改每个增加一个# 删除数据Student.objects(age__gt=11).delete()

Python操作Redis

$ pip install redis

import redis# 连接# connect = redis.Redis(# host='localhost',# port=6379,# db=1# )pool = redis.ConnectionPool( host='localhost', port=6379, db=1, max_connections=20, decode_responses=True) # 连接池connection = redis.Redis( connection_pool=pool)connection.set('hello', 'world') # 设置connection.close() # 关闭连接# 操作字符串done = connection.set('key', '1') # Trueparam = connection.get('key') # decode_responses解码成功connection.set('key2', 'value2', ex=10) # 10秒钟自动超时noneData = connection.get('None') # Nonewant = { 'key3': 'value3', 'key4': 'value4'}connection.mset(want) # 设置多个键值对gets = connection.mget(['key2', 'key3']) # 获取多个键值对incr = connection.incr('age') # 原子增加delData = connection.delete('key') # 删除0/1# 操作列表connection.lpush('list1', 'a', 'b') # 左插入列表connection.rpush('list2', 'a', 'b') # 插入列表resList = connection.lrange('list1', 0, -1) # 查看但不取出来res = connection.lpop('list2') # 拿出来一个resLength = connection.llen('list1') # 查看长度# 操作散列connection.hset('stu:001', 'name', 'Jack')isExists = connection.hexists('stu:001', 'name') # 是否存在connection.hsetnx('stu:002', 'name', 'Jack') # 不存在就插入connection.hset('stu:003', mapping=want) # 批量插入connection.hdel('stu:002', 'name') # 删除keys = connection.hkeys('stu:001') # 要所有的keysvalue = connection.hvals('stu:001') # 要所有的value# 操作集合connection.sadd('zoo', 'dog', 'cat') # 添加到集合中members = connection.smembers('zoo') # 查看成员isMembers = connection.sismember('zoo', 'monkey') # 是否是connection.srem('zoo', 'dog') # 删除某个成员connection.sadd('anotherZoo', 'dog', 'monkey')diff = connection.sdiff('zoo', 'anotherZoo') # 以第一个为准与后面的差别inter = connection.sinter('zoo', 'anotherZoo') # 交集union = connection.sunion('zoo', 'anotherZoo') # 并集# 操作有序集合rank = { 'member1': 2, 'member2': 4, 'member3': 3, 'member4': 5, 'member5': 1,}connection.zadd('ranking', rank) # 自动排序count = connection.zcount('ranking', 0, 100) # 最小到最大之间connection.zrem('ranking', 'member3') # 移除score = connection.zscore('ranking', 'member2') # 拿到分数memberList = connection.zrange('ranking', 0, 100) # 0~100之间的成员rankList = connection.zrank('ranking', 'member2') # 查看排名

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。