// Memory budget in megabytes, going by the name. No line visible here
// references it -- TODO confirm it is still used before relying on it.
13 #define MEMORY_BUDGET_MB 2048
// One meta-index record. Besides `pos`, the rest of the file also reads and
// writes a `size` member (its declaration is not visible here).
17 struct meta_index_entry {
18 unsigned pos; // offset in its respective index shard
// Global meta-index table. merge_shards() fills it in, indexing by trigram
// number, and dumps all 1 << 24 entries to disk in a single fwrite.
// NOTE(review): 1 << 24 == 256^3, presumably one slot per possible 3-byte
// trigram -- confirm against the indexer that produces the inputs.
21 meta_index_entry meta_index[1 << 24];
// Element stored in the merge heap. The members used elsewhere in this file
// are trigram, file_num, pos and index_num (declarations not visible here).
23 struct sort_buf_entry {
// Deliberately inverted ordering: an entry compares "less" when its
// (trigram, file_num, pos) key is LARGER. std::priority_queue exposes the
// greatest element according to operator<, so with this inversion top()
// returns the entry with the smallest key. index_num does not take part in
// the comparison.
29 bool operator< (const sort_buf_entry& other) const {
// Primary key: trigram.
30 if (trigram != other.trigram)
31 return trigram > other.trigram;
// Secondary key: file number.
32 if (file_num != other.file_num)
33 return file_num > other.file_num;
// Final tie-break: position.
34 return pos > other.pos;
// Read cursor state. curr_local_trigram indexes local_meta_index (0 up to
// POSTING_LISTS_PER_SHARD). num_this_trigram is compared against the current
// posting list's size in read(); presumably it counts entries already
// consumed from that list -- its updates are not visible here, TODO confirm.
42 int curr_local_trigram, num_this_trigram;
// This shard's slice of the per-index meta-index, loaded by the constructor.
43 meta_index_entry local_meta_index[POSTING_LISTS_PER_SHARD];
// Opens one shard of one input index for sequential reading.
//
// index_num: which input index to read (used to build both file names).
// shard:     which shard of that index to read.
//
// Steps visible here: open the shard data file, open the index's meta-index
// file, seek to this shard's slice, load POSTING_LISTS_PER_SHARD entries into
// local_meta_index, close the meta-index file, and reset the trigram cursor.
// What happens on each failure branch is not visible in these lines.
46 IndexReader(int index_num, int shard)
// NOTE(review): sprintf is unbounded and the size of `filename` is not
// visible here -- consider snprintf with the buffer size.
50 sprintf(filename, FILENAME_FORMAT, index_num, shard);
51 source_file = fopen(filename, "rb");
52 if (source_file == NULL) {
// `filename` is reused for the meta-index path of the same input index.
57 sprintf(filename, META_INDEX_PATTERN, index_num);
58 FILE *meta_index_file = fopen(filename, "rb");
59 if (meta_index_file == NULL) {
// Each shard owns POSTING_LISTS_PER_SHARD consecutive meta-index entries, so
// this shard's slice starts `shard` such slices into the file.
64 if (fseek(meta_index_file, shard * POSTING_LISTS_PER_SHARD * sizeof(meta_index_entry), SEEK_SET) != 0) {
// A short read (fewer than POSTING_LISTS_PER_SHARD entries) is treated as an
// error.
68 if (fread(local_meta_index, sizeof(meta_index_entry), POSTING_LISTS_PER_SHARD, meta_index_file) != POSTING_LISTS_PER_SHARD) {
69 perror("fread(meta index)");
// The meta-index file is only needed during construction; source_file stays
// open for read().
72 fclose(meta_index_file);
// Start reading at the first posting list of the shard.
74 curr_local_trigram = 0;
// Reads the next posting from this shard into *entry, filling in trigram,
// file_num and pos. entry->index_num is NOT set here; merge_shards() sets it
// after a successful call.
//
// merge_shards() treats a true return as "an entry was produced". The return
// statements themselves are not visible in these lines.
82 bool read(sort_buf_entry *entry) {
// Cursor is already past the last posting list of the shard.
83 if (curr_local_trigram == POSTING_LISTS_PER_SHARD) {
// Advance past posting lists whose size equals num_this_trigram (presumably
// empty or fully consumed lists -- the counter's reset/increment is not
// visible here).
86 while (num_this_trigram == local_meta_index[curr_local_trigram].size) {
// Advancing may run off the end of the shard.
88 if (++curr_local_trigram == POSTING_LISTS_PER_SHARD) {
// Convert the shard-local list index into a global trigram number.
94 entry->trigram = curr_local_trigram + shard * POSTING_LISTS_PER_SHARD;
// A record is read from the data file as two consecutive `unsigned` values:
// the file number first, then the position.
95 if (fread(&entry->file_num, sizeof(unsigned), 1, source_file) != 1) {
// Dump the cursor state to help diagnose a short or failed read.
// NOTE(review): %u is used for the int cursor fields; %d would match their
// declared type.
96 printf("curr local trigram=%u num_this=%u local meta index=%u\n", curr_local_trigram, num_this_trigram, local_meta_index[curr_local_trigram].size);
97 perror("fread(file num)");
100 if (fread(&entry->pos, sizeof(unsigned), 1, source_file) != 1) {
101 perror("fread(pos)");
// K-way merges the input indexes 0 .. num_files-1 into one global index.
//
// For every shard: open the global output file, create one IndexReader per
// input index, and merge their entries through a priority queue so they come
// out ordered by (trigram, file_num, pos). Each merged entry is written as a
// posting_list_entry, and the global meta_index is updated whenever the
// trigram changes. Finally the whole meta_index table is written to
// META_INDEX_PATTERN_GLOBAL.
//
// num_files: number of input indexes to merge.
//
// `pos` and `size` used below are declared and updated in lines not visible
// here, as are the error-branch bodies and the heap pop.
109 void merge_shards(int num_files)
111 for (int shard = 0; shard < NUM_SHARDS; ++shard) {
// NOTE(review): sprintf is unbounded and the size of `filename` is not
// visible here -- consider snprintf with the buffer size.
113 sprintf(filename, FILENAME_FORMAT_GLOBAL, shard);
114 FILE *output = fopen(filename, "wb");
115 if (output == NULL) {
// Progress display; "\r" returns to the start of the line so the next
// shard's name overwrites this one.
120 fprintf(stderr, "%s...\r", filename);
// One reader per input index for the current shard.
// NOTE(review): an array sized by a runtime value is a variable-length
// array, which is a compiler extension in C++ rather than standard; a
// std::vector<IndexReader *> would be the portable form.
122 IndexReader *readers[num_files];
123 for (int index_num = 0; index_num < num_files; ++index_num) {
124 readers[index_num] = new IndexReader(index_num, shard);
// Seed the heap with the first entry of every reader that has one. Because
// sort_buf_entry::operator< is inverted, top() is the smallest key.
128 priority_queue<sort_buf_entry> heap;
129 for (int index_num = 0; index_num < num_files; ++index_num) {
130 sort_buf_entry entry;
131 if (readers[index_num]->read(&entry)) {
// Tag the entry with its source so the heap can be refilled from the same
// reader later.
132 entry.index_num = index_num;
// -1 means "no trigram seen yet in this shard".
137 int last_trigram = -1;
140 while (!heap.empty()) {
// `entry` is a reference to the heap's top element, so it is only valid
// until that element is popped (the pop is not visible here). The source
// index is copied out for use after that point.
141 const sort_buf_entry& entry = heap.top();
142 int source_index = entry.index_num;
// Trigram boundary: finalize the meta-index record of the trigram that just
// ended. `pos - size` is its start, assuming `pos` is the running output
// position and `size` the length of the list just written -- TODO confirm
// against the lines that update them.
145 if (entry.trigram != last_trigram) {
146 if (last_trigram != -1) {
147 meta_index[last_trigram].pos = pos - size;
148 meta_index[last_trigram].size = size;
151 last_trigram = entry.trigram;
// Emit the merged posting to the global shard file.
154 posting_list_entry ple;
155 ple.file_num = entry.file_num;
157 if (fwrite(&ple, sizeof(posting_list_entry), 1, output) != 1) {
163 // fetch a new one from this stream
165 sort_buf_entry next_entry;
166 if (readers[source_index]->read(&next_entry)) {
167 next_entry.index_num = source_index;
168 heap.push(next_entry);
// The last trigram of the shard has no following boundary, so its meta-index
// record is finalized here.
172 if (last_trigram != -1) {
173 meta_index[last_trigram].pos = pos - size;
174 meta_index[last_trigram].size = size;
// Release this shard's readers.
177 for (int index_num = 0; index_num < num_files; ++index_num) {
178 delete readers[index_num];
// Write the complete global meta-index (all 1 << 24 entries) in one call.
// This appears to run once after the shard loop; the closing braces are not
// visible here -- TODO confirm.
184 fprintf(stderr, "Writing meta-index... ");
187 strcpy(filename, META_INDEX_PATTERN_GLOBAL);
188 fprintf(stderr, "%s]", filename);
190 FILE *index_file = fopen(filename, "wb");
191 if (index_file == NULL) {
// A short write (fewer than 1 << 24 entries) is treated as an error.
195 if (fwrite(meta_index, sizeof(meta_index_entry), 1 << 24, index_file) != (1 << 24)) {
196 perror("fwrite(meta_index)");
200 fprintf(stderr, "done.\n");
// Entry point: the first command-line argument is the number of input
// indexes to merge.
// NOTE(review): no argc check is visible before argv[1] is used, and atoi
// cannot report a malformed number -- consider validating argc and parsing
// with strtol.
203 int main(int argc, char **argv)
205 merge_shards(atoi(argv[1]));