A-A+

快速批量向elasticsearch插入数据 Python-elasticsearch-bulk

2019年01月09日 20:21 汪洋大海 暂无评论 共1624字 (阅读5,526 views次)

elasticsearch 如何更快速地导入数据?
利用下面的代码我把aaa.txt中的每一行导入到elasticsearch中,发现速度相比MySQL实在太慢了,不知道是不是因为它默认有索引的缘故。

1
2
3
4
5
6
from elasticsearch import Elasticsearch
 
es=Elasticsearch()
file=open("/home/allen/aaa.txt")
for text in file:
    es.create(index="info",doc_type="line",body={"content":text})

还是认真回复一下吧:

1.用bulk来进行批量插入,不要一条一条插

2.在调用bulk前,修改index.refresh_interval为-1,刷新是成本比较高的操作,如果不改的话,可能在你插入的时候索引刷新会降低插入效率,调用完了记得再改回去

另外,es的问题还是去es的社区(比如elasticsearch.cn)提问能更好的得到回复,不是黑,但sf毕竟是综合社区,以上。

使用elasticsearch内置的bulk API进行批量的插入操作。
同样,python elasticsearch lib也提供了bulk API的功能,因此便有如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import pymysql
import time
 
# 连接ES
es = Elasticsearch(
    ['127.0.0.1'],
    port=9200
)
 
# 连接MySQL
print("Connect to mysql...")
mysql_db = "test"
m_conn = pymysql.connect(host='127.0.0.1, port=3306, user='root', passwd='root', db=mysql_db, charset='utf8')
m_cursor = m_conn.cursor()
 
try:
    num_id = 0
    while True:
        s = time.time()
        # 查询数据
        sql = "select name,age,area from testTable LIMIT {}, 100000".format(num_id*100000)
        # 这里假设查询出来的结果为 张三 26 北京
        m_cursor.execute(sql)
        query_results = m_cursor.fetchall()
 
        if not query_results:
            print("MySQL查询结果为空 num_id=<{}>".format(num_id))
            break
        else:
            actions = []
            for line in query_results:
            # 拼接插入数据结构
                action = {
                    "_index": "company_base_info_2",
                    "_type": "company_info",
                    "_source": {
                        "name": line[0],
                        "age": line[1],
                        "area": line[2],
                    }
                }
				# 形成一个长度与查询结果数量相等的列表
                actions.append(action)
            # 批量插入
            a = helpers.bulk(es, actions)
            e = time.time()
            print("{} {}s".format(a, e-s))
        num_id += 1
 
finally:
    m_cursor.close()
    m_conn.close()
    print("MySQL connection close...")

代码的关键在于构造action结构,放入列表中,给helpers.bulk(es, actions)传参,调用方法真的是很简单了。

文章来源:https://segmentfault.com/q/1010000005027014
https://blog.csdn.net/weixin_39198406/article/details/82983256

布施恩德可便相知重

微信扫一扫打赏

支付宝扫一扫打赏

×

给我留言