欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

利用python将elasticsearch的数据存储至MySQL

时间:2023-06-11

由于elasticsearch不存在表的概念,其查询结果返回的是json格式的数据,在将数据插入MySQL之前需要将数据解析出来。数据逻辑:读取MySQL数据->解析->存储至MySQL数据库。

创建MySQL数据库连接以及elasticsearch连接

from elasticsearch import Elasticsearchimport pandas as pdfrom sqlalchemy import create_engine##创建es连接es = Elasticsearch(hosts = 'host:port',http_auth=('username','password'))##创建mysql连接con_to_mysql = create_engine("mysql+pymysql://username:password@host:port/database",encoding = 'utf-8')

接下来就是从es中获取数据并解析数据了;

获取数据闭并解析

这里定义一个函数,实现获取数据并解析数据的功能

def get_data(index,body,scroll): data = es.search(index = index,body = body,scroll = scroll) #获取数据标签 scrollid = data['_scroll_id'] #数据总量 total_num = data['hits']['total'] json_data = data['hits']['hits'] for i in range(round(total_num / 10000) + 1): res = es.scroll(scroll_id=scrollid, scroll='5m') json_data += res['hits']['hits'] data_dataframe = pd.Dataframe(json_data) detail_data = data_dataframe['_source'] detail_Data_Dataframe = pd.Dataframe(list(detail_data)) return detail_Data_Dataframe

因为elasticsearch无法全量查询数据,所以需要创建游标,类似于书签,每次查询10000条数据,es中返回的每条数据可能顺序不一,因此在解析的时候直接将json数据转换成dataframe。

创建主函数,将数据写入MySQL数据库

if __name__ == '__main__': body = { "size": 10000, "query": { "match_all": {} }} gas_bottle = get_data('gas-bottle',body,'5m')# Dataframe数据框写入MySQL gas_bottle.to_sql('gas-bottle',con = con_to_mysql,if_exists = 'replace',index = False) es.close() #关闭es连接

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。