由于elasticsearch不存在表的概念,其查询结果返回的是json格式的数据,在将数据插入MySQL之前需要将数据解析出来。数据逻辑:读取MySQL数据->解析->存储至MySQL数据库。
创建MySQL数据库连接以及elasticsearch连接from elasticsearch import Elasticsearchimport pandas as pdfrom sqlalchemy import create_engine##创建es连接es = Elasticsearch(hosts = 'host:port',http_auth=('username','password'))##创建mysql连接con_to_mysql = create_engine("mysql+pymysql://username:password@host:port/database",encoding = 'utf-8')
接下来就是从es中获取数据并解析数据了;
获取数据闭并解析这里定义一个函数,实现获取数据并解析数据的功能
def get_data(index,body,scroll): data = es.search(index = index,body = body,scroll = scroll) #获取数据标签 scrollid = data['_scroll_id'] #数据总量 total_num = data['hits']['total'] json_data = data['hits']['hits'] for i in range(round(total_num / 10000) + 1): res = es.scroll(scroll_id=scrollid, scroll='5m') json_data += res['hits']['hits'] data_dataframe = pd.Dataframe(json_data) detail_data = data_dataframe['_source'] detail_Data_Dataframe = pd.Dataframe(list(detail_data)) return detail_Data_Dataframe
因为elasticsearch无法全量查询数据,所以需要创建游标,类似于书签,每次查询10000条数据,es中返回的每条数据可能顺序不一,因此在解析的时候直接将json数据转换成dataframe。
创建主函数,将数据写入MySQL数据库if __name__ == '__main__': body = { "size": 10000, "query": { "match_all": {} }} gas_bottle = get_data('gas-bottle',body,'5m')# Dataframe数据框写入MySQL gas_bottle.to_sql('gas-bottle',con = con_to_mysql,if_exists = 'replace',index = False) es.close() #关闭es连接