git.sesse.net Git - index/blob - mkindex-merger.cpp
Initial checkin for move to Git (no prior version history available).
[index] / mkindex-merger.cpp
1 #include <stdio.h>
2 #include <string.h>
3 #include <stdlib.h>
4 #include <unistd.h>
5 #include <sys/stat.h>
6 #include <sys/types.h>
7 #include <string>
8 #include <vector>
9 #include <algorithm>
10 #include <queue>
11 #include "common.h"
12
13 #define MEMORY_BUDGET_MB 2048
14
15 using namespace std;
16
// Locates one posting list inside the index shard file that holds it.
// Arrays of this struct are read from and written to disk verbatim, so the
// member order and types must stay as they are.
struct meta_index_entry {
        unsigned pos;   // offset in its respective index shard
        unsigned size;  // length of the posting list
};

// Global meta-index, addressed by trigram number (2^24 slots).  Static
// storage, so slots that are never assigned stay zero-filled.
meta_index_entry meta_index[1 << 24];
22
// A single posting travelling through the merge, tagged with the number of
// the partial index (index_num) it was read from.
struct sort_buf_entry {
        unsigned trigram;
        unsigned file_num;
        unsigned pos;
        unsigned index_num;

        // Intentionally reversed ordering: an entry compares "less" when its
        // (trigram, file_num, pos) key is the GREATER one.  A max-heap such as
        // priority_queue therefore yields the smallest key first.  index_num
        // takes no part in the comparison.
        bool operator< (const sort_buf_entry& rhs) const {
                if (trigram == rhs.trigram) {
                        if (file_num == rhs.file_num) {
                                return rhs.pos < pos;
                        }
                        return rhs.file_num < file_num;
                }
                return rhs.trigram < trigram;
        }
};
37
38 class IndexReader {
39 private:
40         FILE *source_file;
41         int shard;
42         int curr_local_trigram, num_this_trigram;
43         meta_index_entry local_meta_index[POSTING_LISTS_PER_SHARD];
44
45 public:
46         IndexReader(int index_num, int shard)
47             : shard(shard) {
48                 char filename[256];
49
50                 sprintf(filename, FILENAME_FORMAT, index_num, shard);
51                 source_file = fopen(filename, "rb");
52                 if (source_file == NULL) {
53                         perror(filename);
54                         exit(1);
55                 }
56                 
57                 sprintf(filename, META_INDEX_PATTERN, index_num);
58                 FILE *meta_index_file = fopen(filename, "rb");
59                 if (meta_index_file == NULL) {
60                         perror(filename);
61                         exit(1);
62                 }
63
64                 if (fseek(meta_index_file, shard * POSTING_LISTS_PER_SHARD * sizeof(meta_index_entry), SEEK_SET) != 0) {
65                         perror("fseek");
66                         exit(1);
67                 }
68                 if (fread(local_meta_index, sizeof(meta_index_entry), POSTING_LISTS_PER_SHARD, meta_index_file) != POSTING_LISTS_PER_SHARD) {
69                         perror("fread(meta index)");
70                         exit(1);
71                 }
72                 fclose(meta_index_file);
73
74                 curr_local_trigram = 0;
75                 num_this_trigram = 0;
76         }
77
78         ~IndexReader() {
79                 fclose(source_file);
80         }
81
82         bool read(sort_buf_entry *entry) {
83                 if (curr_local_trigram == POSTING_LISTS_PER_SHARD) {
84                         return false;
85                 }
86                 while (num_this_trigram == local_meta_index[curr_local_trigram].size) {
87                         num_this_trigram = 0;
88                         if (++curr_local_trigram == POSTING_LISTS_PER_SHARD) {
89                                 return false;
90                         }
91                 }
92                 ++num_this_trigram;
93                 
94                 entry->trigram = curr_local_trigram + shard * POSTING_LISTS_PER_SHARD;
95                 if (fread(&entry->file_num, sizeof(unsigned), 1, source_file) != 1) {
96                         printf("curr local trigram=%u  num_this=%u  local meta index=%u\n", curr_local_trigram, num_this_trigram, local_meta_index[curr_local_trigram].size);
97                         perror("fread(file num)");
98                         exit(1);
99                 }
100                 if (fread(&entry->pos, sizeof(unsigned), 1, source_file) != 1) {
101                         perror("fread(pos)");
102                         exit(1);
103                 }
104
105                 return true;
106         }
107 };
108
109 void merge_shards(int num_files)
110 {
111         for (int shard = 0; shard < NUM_SHARDS; ++shard) {
112                 char filename[256];
113                 sprintf(filename, FILENAME_FORMAT_GLOBAL, shard);
114                 FILE *output = fopen(filename, "wb");
115                 if (output == NULL) {
116                         perror(filename);
117                         exit(1);
118                 }
119
120                 fprintf(stderr, "%s...\r", filename);
121
122                 IndexReader *readers[num_files];
123                 for (int index_num = 0; index_num < num_files; ++index_num) {
124                         readers[index_num] = new IndexReader(index_num, shard);
125                 }
126
127                 // pre-fill the heap
128                 priority_queue<sort_buf_entry> heap;
129                 for (int index_num = 0; index_num < num_files; ++index_num) {
130                         sort_buf_entry entry;
131                         if (readers[index_num]->read(&entry)) {
132                                 entry.index_num = index_num;
133                                 heap.push(entry);
134                         }
135                 }
136
137                 int last_trigram = -1;
138                 int pos = 0;
139                 int size = 0;
140                 while (!heap.empty()) {
141                         const sort_buf_entry& entry = heap.top();
142                         int source_index = entry.index_num;
143
144                         // output the entry
145                         if (entry.trigram != last_trigram) {
146                                 if (last_trigram != -1) {
147                                         meta_index[last_trigram].pos = pos - size;
148                                         meta_index[last_trigram].size = size;
149                                         size = 0;
150                                 }
151                                 last_trigram = entry.trigram;
152                         }
153
154                         posting_list_entry ple;
155                         ple.file_num = entry.file_num;
156                         ple.pos = entry.pos;
157                         if (fwrite(&ple, sizeof(posting_list_entry), 1, output) != 1) {
158                                 perror("fwrite");
159                                 exit(1);
160                         }
161                         ++pos, ++size;
162
163                         // fetch a new one from this stream
164                         heap.pop();
165                         sort_buf_entry next_entry;
166                         if (readers[source_index]->read(&next_entry)) {
167                                 next_entry.index_num = source_index;
168                                 heap.push(next_entry);
169                         }
170                 }
171
172                 if (last_trigram != -1) {
173                         meta_index[last_trigram].pos = pos - size;
174                         meta_index[last_trigram].size = size;
175                 }
176
177                 for (int index_num = 0; index_num < num_files; ++index_num) {
178                         delete readers[index_num];
179                 }
180
181                 fclose(output);
182         }
183                 
184         fprintf(stderr, "Writing meta-index... ");
185         
186         char filename[256];
187         strcpy(filename, META_INDEX_PATTERN_GLOBAL);
188         fprintf(stderr, "%s]", filename);
189
190         FILE *index_file = fopen(filename, "wb");
191         if (index_file == NULL) {
192                 perror(filename);
193                 exit(1);
194         }
195         if (fwrite(meta_index, sizeof(meta_index_entry), 1 << 24, index_file) != (1 << 24)) {
196                 perror("fwrite(meta_index)");
197                 exit(1);
198         }
199         fclose(index_file);
200         fprintf(stderr, "done.\n");
201 }
202
203 int main(int argc, char **argv)
204 {
205         merge_shards(atoi(argv[1]));
206 }