倒排索引的分布式存储

倒排索引又叫反向索引

背景

索引数据的规模为TB级。TB相当于1 000 GB,一个1 000 GB的文件是不可想象的。因此将全部索引文件存放在一台主机上,不仅是不合适的,而且是不安全的。这样一旦这个倒排文件损坏,全部服务就会受到很大影响,因此倒排索引的分布式存储技术应运而生了。

大数据遇到的问题

单机的瓶颈

  • 存储:索引数据大
  • 网络:传输瓶颈(网络负载),尽量减少网络开销
  • 磁盘I/O:

多机需要解决的问题 (集群或分布式)

  • 数据倾斜问题
  • 可靠性
  • 网络
  • 查询速度,memory db?如何

策略

如何解决以上各种问题?

  • 没什么特别好的办法…就是各种切分索引,然后把结果合并之类的

常见操作

  • 重建索引 周期性重建索引
  • 基于主索引的前提下,构建辅助索引,用于储存新文档,维护于内存中,当辅助索引达到一定的内存占用时,写入磁盘与主索引进行合并;

种切分索引,然后把结果合并之类的

服务功能的分布式拆分

  • 尽量减少网络开销
  • 各个子服务应该是无状态的
  • 每个子服务都应该是可横向扩展的

分布式 VS 集群

数据的分布式拆分

  • 搜索引擎索引分片
  • Log

参考 https://juejin.im/entry/58abf9432f301e006bdbc373

常见疑问

切分索引

多机分布式索引一般按照文档编号(docId)或者按照索引词编号(wordId)进行划分。按照DocId划分的结果称为局部倒排文件(Local inverted file);按照WordId划分的结果称为全局倒排文件(Global inverted file)

1
2
3
4
5
6
7
apple -> 1, 13, 24, 33, 46, 52, 77
banana -> 4, 8, 33, 34, 52, 66, 88
grapes -> 7, 22, 46, 77, 89
pineapple -> 15, 37, 52
delicious -> 24, 34, 46, 77, 89
rotten -> 8, 66
exotic -> 37

按照docid来切分,比如1-100101-200分在不同的服务器。

按照wordid来切分,比如apple banana分在不同的服务器。

group_by term (全局方案) index (局部倒排文件)
index的获取 并行度=term数 并行度不限
网络负载 单点压力大 分布式结点分担网络负载
磁盘IO 节约磁盘I/O (如果只检索一个单词,那么只需要在一个索引结点中检索即可)
可靠性 单点故障很危险 单点故障影响不大

索引的存储结果我们人为能看到的就是segment文件,其实索引文件segment的下层结构就是field域(类似于数据库里面的列名,但是这两个概念区别还是蛮大的,只是拿过来类比),对于每一个field里面存储的就是倒排文件,而我们进行查询的过程时,为了加快查询效率就会制定field域去查询,对于每一个term来说会·去查找字典的一种结构(现在存储结构有FST(英文字典存储结构),前缀树等),因为字典是已经排好序的了,所以这里只需要进行二分查找就可以了,对于每一个term查找到的倒排链进行交集或者并集的合并,在合并的过程若要是按照文本相关性排序(不指定排序股则),就会在合并的过程中会进行相关的score分数计算(例如BM25,或者TF-IDF等一些算法),计算出来的文档会存储在一个top-N的小根堆里面,最后返回给用户。对于倒排链的合并过程交集是一个比较消耗性能的操作,比如lucene对于OR操作的优化比较多,比如说把现在N条倒排链按照长度排序(短的文档在前,长的在后),然后分成两组最短的1条一个组,剩下的N-1条一组,然后对于这两个组进行合并。在OR的合并过程中,可以指定最少有几个term满足要求,这样在前N-1中要是没有满足要求,这样最后一条就不需要在进行合并了。

##

倒排索引,当有100台server,要把索引表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/*
An inverted index is used to support text search. The inverted index has a record for each term. Each record has the list of documents that the term appears in. Documents are identified by an integer document ID. The list of document IDs is sorted in ascending order. For the purpose of this problem, assume that the only operation performed on the inverted index is intersection to find the documents that contain all terms in the search query.
For example, the inverted index could have the following data.
Term
Document IDs

apple -> 1, 13, 24, 33, 46, 52, 77
banana -> 4, 8, 33, 34, 52, 66, 88
grapes -> 7, 22, 46, 77, 89
pineapple -> 15, 37, 52
delicious -> 24, 34, 46, 77, 89
rotten -> 8, 66
exotic -> 37

Expected results from intersections are as follows:
Terms intersected
Document IDs with all terms

delicious apple -> 24, 46, 77
delicious apple grapes -> 46, 77
apple banana -> 33, 52
We have an inverted index that is very large and requires N servers to "fit". Assume N is 100.
*/




/* This class will be given a list of words (such as might be tokenized
* from a paragraph of text), and will provide a method that takes two
* words and returns the shortest distance (in words) between those two
* words in the provided text.
* Example:
* WordDistanceFinder finder = new WordDistanceFinder(Arrays.asList("the", "quick", "brown", "fox", "quick"));
* assert(finder.distance("fox", "the") == 3);
* assert(finder.distance("quick", "fox") == 1);
*
* "quick" appears twice in the input. There are two possible distance values for "quick" and "fox":
* (3 - 1) = 2 and (4 - 3) = 1.
* Since we have to return the shortest distance between the two words we return 1.
*/
public class WordDistanceFinder {
public WordDistanceFinder (List<String> words) {
Map<String, List<Integer>> map = new HashMap<String, List<Integer>>();
for(int i = 0; i < words.size(); i++) {
if(!map.contains(word))
map.put(word, new ArrayList<Integer>());
map.get(word).add(i);
}
}

public int distance (String wordOne, String wordTwo) {
List<Integer> index1 = map.get(wordOne);
List<integer> index2 = map.get(wordTwo);
int i = 0; j = 0;
int min_distance = map.size();
while(i < index1.size() && j < index2.size()) {
ind1 = index1[i];
ind2 = index[j];
current_distance = math.abs(ind1 - ind2);
min_distance = current_distance>min_distance?min_distance:current_distance;

if(ind1 < ind2) {
i++;
} else {
j++;
}
}
return min_distance;
// implementation here
}
}