之前文章有講到借助:fingerprint filter 插件實現(xiàn)去重,。
近期又有社群小伙伴問到同樣的問題,,這里一并梳理回復(fù)如下。
1,、python 腳本實現(xiàn)文檔去重
這里講的實現(xiàn),,借助 python 腳本實現(xiàn)。
由于涉及 8.X 版本 Elasticsearch 以安全方式的連接,,這里需要 python 升級到 3.10+ 版本才可以,。
1.1 實現(xiàn)前提
標(biāo)定文檔重復(fù)標(biāo)記——一般文檔中幾個字段或者全部字段重復(fù),才認(rèn)為文檔是一樣的,。
業(yè)務(wù)層面自己指定就可用 md5
值實現(xiàn),。
對于新聞類類線上業(yè)務(wù)的文檔舉例:
https://3g.163.com/news/article/H5APDMGH00019UD6.html
https://news.sina.com.cn/sx/2022-04-19/detail-imcwiwst2726757.shtml
如果拿文章標(biāo)題(title) + 正文內(nèi)容(content)內(nèi)容組合取 md5,然后對比的話,,兩者發(fā)布網(wǎng)站不同,,但內(nèi)容可以認(rèn)為是重復(fù)的。
1.2 實現(xiàn)原理
Step 1:scan遍歷全部文檔,,生成文檔 md5,。
Step2:生成字典,字典兩部分組成,,md5 值是 key,,value 是一個數(shù)組,里面存的是文檔id,。
Step3:遍歷字典的value部分大于1的值,,就代表存在重復(fù)文檔。
2,、實現(xiàn)代碼
#!/usr/local/bin/python3
from elasticsearch import Elasticsearch, helpers
import hashlib
import ssl
# 全局變量
ES_HOST = 'https://192.168.1.10:9200'
ES_USER = 'elastic'
ES_PASSWORD = '9Z=T2wOWIXXXXXXXX'
CERT_FINGERPRINT = "a4d0fe1eb7e1fd9874XXXXXXXXXX"
# 全局詞典
dict_of_duplicate_docs = {}
# https://www./guide/en/elasticsearch/client/python-api/current/config.html
# 要求python 版本3.10及以上
# fingerprint 生成方式,,方式一:Elasticsearch 首次啟動的時候自動生成,。
# 方式二:借助命令行再次生成(結(jié)果同方式一)
# bash-4.2$ openssl x509 -fingerprint -sha256 -in config/certs/http_ca.crt
# SHA256 Fingerprint=A4:D0:FE:1E:B7:E1:FD:98:74:A4:10:6F:E1:XXXXXXXX
def es_init():
es = Elasticsearch( ES_HOST,
ssl_assert_fingerprint=CERT_FINGERPRINT,
basic_auth=(ES_USER, ES_PASSWORD),
verify_certs=False )
return es
# 對每個文檔生成唯一id(自定義生成)
def populate_dict_of_duplicate_docs(hit):
combined_key = ""
# 三個字段決定索引是否重復(fù),自動是根據(jù)業(yè)務(wù)指定的
keys_to_include_in_hash = ["CAC", "FTSE", "SMI"]
for mykey in keys_to_include_in_hash:
combined_key += str(hit['_source'][mykey])
_id = hit["_id"]
# 基于三個字段的組合,,生成md5值,,便于后續(xù)去重用。
hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
# 生成鍵值對詞典,key使用md5值,,value 為數(shù)組類型,。
# 相同的 key 值也就是相同的文檔,value 是文檔id列表
dict_of_duplicate_docs.setdefault(hashval, []).append(_id)
print(dict_of_duplicate_docs)
# 待去重索引遍歷
def scroll_over_all_docs():
es = es_init()
for hit in helpers.scan(es, index='stocks'):
populate_dict_of_duplicate_docs(hit)
# 去重處理函數(shù)
def loop_over_hashes_and_remove_duplicates():
es = es_init()
for hashval, array_of_ids in dict_of_duplicate_docs.items():
# 對id列表長度大于1的做去重處理操作
if len(array_of_ids) > 1:
print(" Duplicate docs hash=%s ****" % hashval)
# 獲取相同的文檔
matching_docs = es.mget(index="stocks", ids= array_of_ids[0:len(array_of_ids)-1])
for doc in matching_docs['docs']:
print("doc=%s\n" % doc)
es.delete(index="stocks", id = doc['_id'])
def main():
scroll_over_all_docs()
loop_over_hashes_and_remove_duplicates()
main()
代碼的核心:
使用了 8.X 版本的 Elasticsearch 訪問方式,。借助:fingerprint 訪問實現(xiàn),。
fingerprint 兩種獲取方式:
方式一:Elasticsearch 啟動的時候已經(jīng)包含。
方式二:可以借助命令行再生成,。
openssl x509 -fingerprint -sha256 -in config/certs/http_ca.crt
3,、小結(jié)
文章給出 8.X 版本實現(xiàn)文檔去重的完整思路和Python 代碼實現(xiàn),加上之前講解的 logstash fingerprint filter 插件實現(xiàn)去重實現(xiàn),,共2種方案解決文檔重復(fù)問題,。
你的項目實戰(zhàn)環(huán)節(jié)有沒有遇到文檔去重問題、刪除重復(fù)文檔問題,?如何解決的,?歡迎留言交流。
參考
https://github.com/deric/es-dedupe/blob/master/esdedupe/esdedupe.py
https://github.com/alexander-marquardt/deduplicate-elasticsearch
https:///2018/07/23/deduplicating-documents-in-elasticsearch/