From e310d4a37f23c835c40ecaea1994490ef1c1f147 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Tue, 22 Jan 2013 17:17:48 +0100 Subject: [PATCH 1/1] Initial checkin for move to Git (no prior version history available). --- common.h | 12 +++ lookup.cpp | 154 ++++++++++++++++++++++++++++++ mkindex-merger.cpp | 206 ++++++++++++++++++++++++++++++++++++++++ mkindex-sorter.cpp | 179 +++++++++++++++++++++++++++++++++++ mkindex.cpp | 228 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 779 insertions(+) create mode 100644 common.h create mode 100644 lookup.cpp create mode 100644 mkindex-merger.cpp create mode 100644 mkindex-sorter.cpp create mode 100644 mkindex.cpp diff --git a/common.h b/common.h new file mode 100644 index 0000000..a2e37be --- /dev/null +++ b/common.h @@ -0,0 +1,12 @@ +#define POSTING_LISTS_PER_SHARD 65536 +#define NUM_SHARDS ((1 << 24) / POSTING_LISTS_PER_SHARD) +#define DIRNAME_FORMAT "index%d" +#define FILENAME_FORMAT_GLOBAL "index/%02x" +#define FILENAME_FORMAT "index%d/%02x" +#define META_INDEX_PATTERN "index%d/meta" +#define META_INDEX_PATTERN_GLOBAL "index/meta" + +struct posting_list_entry { + unsigned file_num; + unsigned pos; +}; diff --git a/lookup.cpp b/lookup.cpp new file mode 100644 index 0000000..db26aee --- /dev/null +++ b/lookup.cpp @@ -0,0 +1,154 @@ +#include +#include +#include +#include "common.h" + +using namespace std; + +struct meta_index_entry { + unsigned pos; // offset in its respective index shard + unsigned size; +}; +meta_index_entry meta_index[1 << 24]; + +void read_meta_index() +{ + fprintf(stderr, "Reading meta-index... "); + + FILE *index_file = fopen("index/meta", "rb"); + if (index_file == NULL) { + perror("index/meta"); + exit(1); + } + + if (fread(meta_index, sizeof(meta_index_entry), 1 << 24, index_file) != (1 << 24)) { + perror("fread"); + exit(1); + } + + fprintf(stderr, "done.\n"); +} + +vector read_posting_list(unsigned trigram) +{ + int shard_num = trigram / POSTING_LISTS_PER_SHARD; + char filename[256]; + sprintf(filename, FILENAME_FORMAT_GLOBAL, shard_num); + FILE *index_shard = fopen(filename, "rb"); + if (index_shard == NULL) { + perror(filename); + exit(1); + } + + int pos = meta_index[trigram].pos * sizeof(posting_list_entry); + if (fseek(index_shard, pos, SEEK_SET) != 0) { + perror("fseek"); + exit(1); + } + + int size = meta_index[trigram].size; + vector ret(size); + for (int i = 0; i < size; ++i) { + if (fread(&ret[i], sizeof(posting_list_entry), 1, index_shard) != 1) { + perror("fread"); + exit(1); + } + } + + fclose(index_shard); + return ret; +} + +vector intersect_posting_lists( + const vector& first, + const vector& second, + int relative_pos) +{ + vector ret; + vector::const_iterator first_it = first.begin(); + vector::const_iterator second_it = second.begin(); + + while (first_it != first.end() && second_it != second.end()) { + if (first_it->file_num == second_it->file_num && + first_it->pos + relative_pos == second_it->pos) { + ret.push_back(*first_it); + ++first_it; + ++second_it; + continue; + } + if (first_it->file_num < second_it->file_num) { + ++first_it; + continue; + } + if (first_it->file_num > second_it->file_num) { + ++second_it; + continue; + } + if (first_it->pos + relative_pos < second_it->pos) { + ++first_it; + } else { + ++second_it; + } + } + + return ret; +} + +void print_posting_list(const vector& pl) +{ + for (int i = 0; i < pl.size(); ++i) { + fprintf(stderr, "[file %u pos %u] ", pl[i].file_num, pl[i].pos); + } + fprintf(stderr, "\n"); +} + +void lookup(unsigned char *needle, unsigned needle_len) +{ + // trigram and relative position + vector > trigrams; + for (int i = 0; i < needle_len - 2; i += 3) { + unsigned trigram = + (unsigned(needle[i]) << 16) | + (unsigned(needle[i + 1]) << 8) | + unsigned(needle[i + 2]); + trigrams.push_back(make_pair(trigram, i)); + } + if (needle_len % 3 != 0) { + int i = needle_len - 3; + unsigned trigram = + (unsigned(needle[i]) << 16) | + (unsigned(needle[i + 1]) << 8) | + unsigned(needle[i + 2]); + trigrams.push_back(make_pair(trigram, i)); + } + + fprintf(stderr, "Reading posting list for %06x... ", trigrams[0].first); + vector curr_posting_list = read_posting_list(trigrams[0].first); + fprintf(stderr, "%u entries.\n", curr_posting_list.size()); + + for (unsigned i = 1; i < trigrams.size(); ++i) { + fprintf(stderr, "Filtering by posting list for %06x (pos %u)... ", trigrams[i].first, trigrams[i].second); + vector filter_posting_list = read_posting_list(trigrams[i].first); + curr_posting_list = intersect_posting_lists(curr_posting_list, filter_posting_list, trigrams[i].second); + fprintf(stderr, "%u entries.\n", curr_posting_list.size()); + } + + print_posting_list(curr_posting_list); +} + +int main(int argc, char **argv) +{ + read_meta_index(); + + unsigned char ch1[] = { 0x12, 0x9e, 0x10, 0x11, 0x35 }; + lookup(ch1, 5); + + unsigned char ch2[] = { 0x7c, 0x9e, 0x7a, 0x89, 0x9d, 0xbc, 0xda, 0x59, 0xb0, 0x03, 0x0a, 0xba, 0x4e, 0xbc }; + lookup(ch2, 14); + + unsigned char ch3[] = { 0x0d, 0x91, 0x1f, 0xae, 0x2a, 0xc1, 0x84, 0xac, 0x92, 0xfd, 0x69, 0x1f, 0x95, 0xcf }; + lookup(ch3, 14); + + unsigned char ch4[] = { 0x25, 0x00, 0xc6, 0x4f, 0x60, 0xcf, 0xc4 }; + lookup(ch4, 7); +} diff --git a/mkindex-merger.cpp b/mkindex-merger.cpp new file mode 100644 index 0000000..7f88666 --- /dev/null +++ b/mkindex-merger.cpp @@ -0,0 +1,206 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "common.h" + +#define MEMORY_BUDGET_MB 2048 + +using namespace std; + +struct meta_index_entry { + unsigned pos; // offset in its respective index shard + unsigned size; +}; +meta_index_entry meta_index[1 << 24]; + +struct sort_buf_entry { + unsigned trigram; + unsigned file_num; + unsigned pos; + unsigned index_num; + + bool operator< (const sort_buf_entry& other) const { + if (trigram != other.trigram) + return trigram > other.trigram; + if (file_num != other.file_num) + return file_num > other.file_num; + return pos > other.pos; + } +}; + +class IndexReader { +private: + FILE *source_file; + int shard; + int curr_local_trigram, num_this_trigram; + meta_index_entry local_meta_index[POSTING_LISTS_PER_SHARD]; + +public: + IndexReader(int index_num, int shard) + : shard(shard) { + char filename[256]; + + sprintf(filename, FILENAME_FORMAT, index_num, shard); + source_file = fopen(filename, "rb"); + if (source_file == NULL) { + perror(filename); + exit(1); + } + + sprintf(filename, META_INDEX_PATTERN, index_num); + FILE *meta_index_file = fopen(filename, "rb"); + if (meta_index_file == NULL) { + perror(filename); + exit(1); + } + + if (fseek(meta_index_file, shard * POSTING_LISTS_PER_SHARD * sizeof(meta_index_entry), SEEK_SET) != 0) { + perror("fseek"); + exit(1); + } + if (fread(local_meta_index, sizeof(meta_index_entry), POSTING_LISTS_PER_SHARD, meta_index_file) != POSTING_LISTS_PER_SHARD) { + perror("fread(meta index)"); + exit(1); + } + fclose(meta_index_file); + + curr_local_trigram = 0; + num_this_trigram = 0; + } + + ~IndexReader() { + fclose(source_file); + } + + bool read(sort_buf_entry *entry) { + if (curr_local_trigram == POSTING_LISTS_PER_SHARD) { + return false; + } + while (num_this_trigram == local_meta_index[curr_local_trigram].size) { + num_this_trigram = 0; + if (++curr_local_trigram == POSTING_LISTS_PER_SHARD) { + return false; + } + } + ++num_this_trigram; + + entry->trigram = curr_local_trigram + shard * POSTING_LISTS_PER_SHARD; + if (fread(&entry->file_num, sizeof(unsigned), 1, source_file) != 1) { + printf("curr local trigram=%u num_this=%u local meta index=%u\n", curr_local_trigram, num_this_trigram, local_meta_index[curr_local_trigram].size); + perror("fread(file num)"); + exit(1); + } + if (fread(&entry->pos, sizeof(unsigned), 1, source_file) != 1) { + perror("fread(pos)"); + exit(1); + } + + return true; + } +}; + +void merge_shards(int num_files) +{ + for (int shard = 0; shard < NUM_SHARDS; ++shard) { + char filename[256]; + sprintf(filename, FILENAME_FORMAT_GLOBAL, shard); + FILE *output = fopen(filename, "wb"); + if (output == NULL) { + perror(filename); + exit(1); + } + + fprintf(stderr, "%s...\r", filename); + + IndexReader *readers[num_files]; + for (int index_num = 0; index_num < num_files; ++index_num) { + readers[index_num] = new IndexReader(index_num, shard); + } + + // pre-fill the heap + priority_queue heap; + for (int index_num = 0; index_num < num_files; ++index_num) { + sort_buf_entry entry; + if (readers[index_num]->read(&entry)) { + entry.index_num = index_num; + heap.push(entry); + } + } + + int last_trigram = -1; + int pos = 0; + int size = 0; + while (!heap.empty()) { + const sort_buf_entry& entry = heap.top(); + int source_index = entry.index_num; + + // output the entry + if (entry.trigram != last_trigram) { + if (last_trigram != -1) { + meta_index[last_trigram].pos = pos - size; + meta_index[last_trigram].size = size; + size = 0; + } + last_trigram = entry.trigram; + } + + posting_list_entry ple; + ple.file_num = entry.file_num; + ple.pos = entry.pos; + if (fwrite(&ple, sizeof(posting_list_entry), 1, output) != 1) { + perror("fwrite"); + exit(1); + } + ++pos, ++size; + + // fetch a new one from this stream + heap.pop(); + sort_buf_entry next_entry; + if (readers[source_index]->read(&next_entry)) { + next_entry.index_num = source_index; + heap.push(next_entry); + } + } + + if (last_trigram != -1) { + meta_index[last_trigram].pos = pos - size; + meta_index[last_trigram].size = size; + } + + for (int index_num = 0; index_num < num_files; ++index_num) { + delete readers[index_num]; + } + + fclose(output); + } + + fprintf(stderr, "Writing meta-index... "); + + char filename[256]; + strcpy(filename, META_INDEX_PATTERN_GLOBAL); + fprintf(stderr, "%s]", filename); + + FILE *index_file = fopen(filename, "wb"); + if (index_file == NULL) { + perror(filename); + exit(1); + } + if (fwrite(meta_index, sizeof(meta_index_entry), 1 << 24, index_file) != (1 << 24)) { + perror("fwrite(meta_index)"); + exit(1); + } + fclose(index_file); + fprintf(stderr, "done.\n"); +} + +int main(int argc, char **argv) +{ + merge_shards(atoi(argv[1])); +} diff --git a/mkindex-sorter.cpp b/mkindex-sorter.cpp new file mode 100644 index 0000000..f2487a5 --- /dev/null +++ b/mkindex-sorter.cpp @@ -0,0 +1,179 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "common.h" + +#define MEMORY_BUDGET_MB 32 + +using namespace std; + +struct meta_index_entry { + unsigned pos; // offset in its respective index shard + unsigned size; +}; +meta_index_entry meta_index[1 << 24]; + +struct sort_buf_entry { + unsigned trigram; + unsigned file_num; + unsigned pos; + + bool operator< (const sort_buf_entry& other) const { + // if (trigram != other.trigram) + return trigram < other.trigram; +#if 0 + if (file_num != other.file_num) + return file_num < other.file_num; + return pos < other.pos; +#endif + } +}; +sort_buf_entry *sort_buf = NULL; + +unsigned long long num_entries_used = 0; +unsigned long long num_entries_room; +unsigned index_num = 0; + +void write_index() +{ + fprintf(stderr, "[sorting"); + stable_sort(sort_buf, sort_buf + num_entries_used); + fprintf(stderr, "]"); + + fprintf(stderr, "[writing "); + + char dirname[256]; + sprintf(dirname, DIRNAME_FORMAT, index_num); + mkdir(dirname, 0700); // no checking + + int curr_shard_open = -1; + int last_trigram = -1; + int pos = 0; + int size = 0; + FILE *index_shard = NULL; + for (unsigned long long i = 0; i < num_entries_used; ++i) { + const sort_buf_entry& entry = sort_buf[i]; + + int shard = entry.trigram / POSTING_LISTS_PER_SHARD; + if (entry.trigram != last_trigram) { + if (last_trigram != -1) { + meta_index[last_trigram].pos = pos - size; + meta_index[last_trigram].size = size; + size = 0; + } + last_trigram = entry.trigram; + } + if (shard != curr_shard_open) { + if (index_shard != NULL) { + fclose(index_shard); + } + + char filename[256]; + sprintf(filename, FILENAME_FORMAT, index_num, shard); + fprintf(stderr, " %s", filename); + for (unsigned j = 0; j < strlen(filename) + 1; ++j) { + fprintf(stderr, "\b"); + } + + index_shard = fopen(filename, "wb"); + if (index_shard == NULL) { + perror(filename); + exit(1); + } + + curr_shard_open = shard; + pos = 0; + assert(size == 0); + } + + posting_list_entry ple; + ple.file_num = entry.file_num; + ple.pos = entry.pos; + if (fwrite(&ple, sizeof(posting_list_entry), 1, index_shard) != 1) { + perror("fwrite"); + exit(1); + } + ++pos, ++size; + } + + if (last_trigram != -1) { + meta_index[last_trigram].pos = pos - size; + meta_index[last_trigram].size = size; + } + if (index_shard != NULL) { + fclose(index_shard); + } + + char filename[256]; + sprintf(filename, META_INDEX_PATTERN, index_num); + fprintf(stderr, "%s]", filename); + + FILE *index_file = fopen(filename, "wb"); + if (index_file == NULL) { + perror(filename); + exit(1); + } + if (fwrite(meta_index, sizeof(meta_index_entry), 1 << 24, index_file) != (1 << 24)) { + perror("fwrite(meta_index)"); + exit(1); + } + fclose(index_file); + + ++index_num; + + memset(meta_index, 0, sizeof(meta_index)); + num_entries_used = 0; +} + +void index_file(const char *filename, int file_num) +{ + fprintf(stderr, "%s... ", filename); + + FILE *file = fopen(filename, "rb"); + if (file == NULL) { + perror(filename); + exit(1); + } + + unsigned trigram = 0; + unsigned pos = 0; + while (!feof(file)) { + int ch = getc(file); + ++pos; + if (ch == -1) { + break; + } + + trigram = ((trigram << 8) | ch) & 0xffffff; + + sort_buf[num_entries_used].trigram = trigram; + sort_buf[num_entries_used].file_num = file_num; + sort_buf[num_entries_used].pos = pos - 2; + + if (++num_entries_used >= num_entries_room) { + fprintf(stderr, "[index flush: "); + write_index(); + fprintf(stderr, "] "); + } + } + + fprintf(stderr, "%u bytes. [%llu MB RAM used]\n", pos, num_entries_used * sizeof(sort_buf_entry) / 1048576ULL); + fclose(file); +} + +int main(int argc, char **argv) +{ + num_entries_room = MEMORY_BUDGET_MB * 1048576ULL / sizeof(sort_buf_entry); + sort_buf = new sort_buf_entry[num_entries_room]; + + for (int i = 1; i < argc; ++i) { + index_file(argv[i], i - 1); + } + write_index(); +} diff --git a/mkindex.cpp b/mkindex.cpp new file mode 100644 index 0000000..fe1cd51 --- /dev/null +++ b/mkindex.cpp @@ -0,0 +1,228 @@ +#include +#include +#include +#include +#include +#include "common.h" + +#define MEMORY_BUDGET_MB 2048 + +using namespace std; + +struct meta_index_entry { + unsigned pos; // offset in its respective index shard + unsigned size; +}; +meta_index_entry meta_index[1 << 24]; + +struct posting_list_entry { + unsigned file_num; + unsigned pos; + + bool operator< (const posting_list_entry& other) const { + if (file_num != other.file_num) + return file_num < other.file_num; + return pos < other.pos; + } +}; +vector posting_lists[1 << 24]; +unsigned long long num_entries_used = 0; + +vector union_posting_lists( + const vector& first, + const vector& second) +{ + vector ret(first.size() + second.size()); + vector::const_iterator first_it = first.begin(); + vector::const_iterator second_it = second.begin(); + + while (first_it != first.end() && second_it != second.end()) { + if (first_it == first.end()) { + ret.push_back(*second_it++); + } else if (second_it == second.end()) { + ret.push_back(*first_it++); + continue; + } else if (*first_it < *second_it) { + ret.push_back(*first_it++); + } else { + ret.push_back(*second_it++); + } + } + + return ret; +} + +void write_index() +{ + for (int i = 0; i < NUM_SHARDS; ++i) { + int pos = 0; + + char filename[256]; + sprintf(filename, FILENAME_FORMAT, i); + fprintf(stderr, "%s...\r", filename); + FILE *index_shard = fopen(filename, "wb"); + if (index_shard == NULL) { + perror(filename); + exit(1); + } + for (int j = 0; j < POSTING_LISTS_PER_SHARD; ++j) { + unsigned trigram = (i * POSTING_LISTS_PER_SHARD) + j; + + for (int k = 0; k < posting_lists[trigram].size(); ++k) { + if (fwrite(&posting_lists[trigram][k], sizeof(posting_list_entry), 1, index_shard) != 1) { + perror("fwrite"); + exit(1); + } + } + unsigned size = posting_lists[trigram].size(); + meta_index[trigram].pos = pos; + meta_index[trigram].size = size; + pos += size; + } + fclose(index_shard); + + } + + fprintf(stderr, "%s...\r", META_INDEX_NAME); + + FILE *index_file = fopen("index/meta", "wb"); + if (index_file == NULL) { + perror("index/meta"); + exit(1); + } + if (fwrite(meta_index, sizeof(meta_index_entry), 1 << 24, index_file) != (1 << 24)) { + perror("fwrite(meta_index)"); + exit(1); + } + fclose(index_file); + + fprintf(stderr, "Index written.\n"); +} + +void merge_index() +{ + for (int i = 0; i < NUM_SHARDS; ++i) { + int pos = 0; + + char filename[256], temp_filename[256]; + sprintf(filename, FILENAME_FORMAT, i); + strcpy(temp_filename, filename); + strcat(temp_filename, ".temp"); + fprintf(stderr, "%s...\r", filename); + FILE *old_index_shard = fopen(filename, "rb"); + if (old_index_shard == NULL) { + perror(filename); + exit(1); + } + FILE *index_shard = fopen(temp_filename, "wb"); + if (index_shard == NULL) { + perror(filename); + exit(1); + } + for (int j = 0; j < POSTING_LISTS_PER_SHARD; ++j) { + unsigned trigram = (i * POSTING_LISTS_PER_SHARD) + j; + + // Read in the old posting list + int old_size = meta_index[trigram].size; + vector old_posting_list(old_size); + for (int k = 0; k < old_size; ++k) { + if (fread(&old_posting_list[k], sizeof(posting_list_entry), 1, old_index_shard) != 1) { + perror("fread(old posting list)"); + exit(1); + } + } + + vector posting_list = + union_posting_lists(old_posting_list, posting_lists[trigram]); + vector empty; + swap(posting_lists[trigram], empty); + + for (int k = 0; k < posting_list.size(); ++k) { + if (fwrite(&posting_list[k], sizeof(posting_list_entry), 1, index_shard) != 1) { + perror("fwrite"); + exit(1); + } + } + unsigned size = posting_list.size(); + meta_index[trigram].pos = pos; + meta_index[trigram].size = size; + pos += size; + } + fclose(old_index_shard); + fclose(index_shard); + + if (unlink(filename) == -1) { + perror("unlink"); + exit(1); + } + if (rename(temp_filename, filename) == -1) { + perror("rename"); + exit(1); + } + } + + fprintf(stderr, "%s...\r", META_INDEX_NAME); + + FILE *index_file = fopen("index/meta", "wb"); + if (index_file == NULL) { + perror("index/meta"); + exit(1); + } + if (fwrite(meta_index, sizeof(meta_index_entry), 1 << 24, index_file) != (1 << 24)) { + perror("fwrite(meta_index)"); + exit(1); + } + fclose(index_file); + + fprintf(stderr, "New index written.\n"); + + num_entries_used = 0; +} + +void index_file(const char *filename, int file_num) +{ + fprintf(stderr, "%s... ", filename); + + FILE *file = fopen(filename, "rb"); + if (file == NULL) { + perror(filename); + exit(1); + } + + unsigned trigram = 0; + unsigned pos = 0; + while (!feof(file)) { + int ch = getc(file); + ++pos; + if (ch == -1) { + break; + } + + trigram = ((trigram << 8) | ch) & 0xffffff; + + posting_list_entry ple; + ple.file_num = file_num; + ple.pos = pos - 2; + posting_lists[trigram].push_back(ple); + ++num_entries_used; + } + + fprintf(stderr, "%u bytes. [%llu MB RAM used]\n", pos, num_entries_used / (1048576 / sizeof(posting_list_entry))); + fclose(file); + + if (num_entries_used >= (MEMORY_BUDGET_MB * 1048576ULL / sizeof(posting_list_entry))) { + fprintf(stderr, "[need to flush index to disk]\n"); + merge_index(); + } +} + +int main(int argc, char **argv) +{ + fprintf(stderr, "Writing empty index.\n"); + write_index(); + + for (int i = 1; i < argc; ++i) { + index_file(argv[i], i - 1); + } + merge_index(); +} -- 2.39.2