Replicating mysql data into elasticsearch

Why

先前使用過的 Mysql 資料轉置,隨著後來爬蟲累積的資料逐漸成長,query開始越變越慢,到最近已經有點受不了了,因此試著用別的方式解決。

Elasticsearch 的 query 簡直太好用,不過目前還在摸索中,因此僅記錄我是如何將資料從 MySQL 丟到Elasticsearch 的(7.0.0)。而由於並不是建置時就有這個需求,因此以下方法適用於一次性的複製而非同步,關於其他方式(例如某個透過binlog的GO版本),請參考How to sync your MySQL data to Elasticsearch

 

MySQL dump data

關於如何從 MySQL 備份資料有各種技巧,最常用的應該是直接使用 mysqldump,另外為了讓 Elasticsearch 讀懂我打算將它輸出成 CSV 再轉成 JSON 。輸出成 SCV

mysqldump DB_NAME --fields-terminated-by=',' --lines-terminated-by='\n' -T DIR_NAME> dump.sql

或是 MySQL 裡:

SELECT * FROM table_name
INTO OUTFILE '/path/to/dump.csv'                                
FIELDS TERMINATED BY ',';

#BTW,這邊遇到一個MySQL輸出的坑,以後有空再補。

 

BUT,為了將這些動作串起來以及少打一點字,後來決定用懶人專用 Pandas ,所以輸出資料就改成了:

import pandas as pd
from sqlalchemy import create_engine

MYSQL_HOST = 'localhost'
MYSQL_USER = 'USER_NAME'
MYSQL_PASSWORD = '*****'
MYSQL_DB = 'DB_NAME'
TABLE_TO_EXPORT = 'TABLE_NAME'
OUTPUT_FILE = 'result.json'


connection = create_engine('mysql+mysqldb://' + MYSQL_USER + ':' + MYSQL_PASSWORD + '@' + MYSQL_HOST + '/' + MYSQL_DB)
# 讓pandas幫我取出資料,一行搞定
data = pd.read_sql_table(TABLE_TO_EXPORT, connection)

這邊當然是是為了寫更簡潔的code才用 pandas 的,跟我懶不懶絕對一點關係都沒有。

 

One-shot replication to Elasticsearch

抓好的 DataFrame 其實透過 to_json() 就可以直接變成 JSON string 了,但是為了餵給 Elasticsearch 吃,必須做一些處理,讓他看得懂。

json_string = data.to_json(orient = 'records', lines = True)

orient = 'records' 指的是以 record 為單位來組成 object ,而 lines = True 可以直接以斷行表示每個 object ,對某些資料處理 piping 很實用(例如 Hadoop or Spark)。

 

轉成 JSON 後,資料應該會長這樣:

{"column1" : "value1", "column2" : "value2", ...}
{"column1" : "value3", "column2" : "value4", ...}
...

為了把它餵給 Elasticsearch_bulk API ,我們必須讓他符合格式:

json_string = re.sub(r'^', '{"index":{}}\n', json_string, flags = re.MULTILINE)

在每筆資料之前需要有指令表明要幹嘛,以這個case來說我們是要做 bulk indexing(這邊可以指定或不指定 index)。

 

接下來剩下的就單純是存檔,以及喂給 Elasticsearch 了:

url = ES_HOST + ':' + ES_PORT + '/' + ES_INDEX + '/_doc/_bulk?pretty&refresh'
data = open(DATA_FILE).read()
headers = {'content-type' : 'application/json'}
r = requests.post(url, headers = headers, data = data)
# the response
print(r.text)