+void Corpus::find_trigram(uint32_t trgm, function<void(const Trigram *trgmptr, size_t len)> cb)
+{
+ uint32_t bucket = hash_trigram(trgm, hdr.hashtable_size);
+ engine->submit_read(fd, sizeof(Trigram) * (hdr.extra_ht_slots + 2), hdr.hash_table_offset_bytes + sizeof(Trigram) * bucket, [this, trgm, cb{ move(cb) }](string_view s) {
+ const Trigram *trgmptr = reinterpret_cast<const Trigram *>(s.data());
+ for (unsigned i = 0; i < hdr.extra_ht_slots + 1; ++i) {
+ if (trgmptr[i].trgm == trgm) {
+ cb(trgmptr + i, trgmptr[i + 1].offset - trgmptr[i].offset);
+ return;
+ }
+ }
+
+ // Not found.
+ cb(nullptr, 0);
+ });
+}
+
+void Corpus::get_compressed_filename_block(uint32_t docid, function<void(string_view)> cb) const
+{
+ // Read the file offset from this docid and the next one.
+ // This is always allowed, since we have a sentinel block at the end.
+ engine->submit_read(fd, sizeof(uint64_t) * 2, offset_for_block(docid), [this, cb{ move(cb) }](string_view s) {
+ const uint64_t *ptr = reinterpret_cast<const uint64_t *>(s.data());
+ off_t offset = ptr[0];
+ size_t len = ptr[1] - ptr[0];
+ engine->submit_read(fd, len, offset, cb);
+ });
+}
+
size_t Corpus::get_num_filename_blocks() const
{
	// One filename block per docid, so the header's docid count is also
	// the block count.
	return hdr.num_docids;
}
+
// Decompress one filename block and test every filename in it against all
// needles, reporting matches through the Serializer (which restores output
// order across asynchronously-completing blocks).
//
// “seq” is this block's index; output sequence numbers are 64-bit, with the
// block index in the high 32 bits, so each block can assign its own
// per-candidate sequence numbers in the low 32 bits independently.
void scan_file_block(const vector<Needle> &needles, string_view compressed,
                     AccessRXCache *access_rx_cache, uint64_t seq, Serializer *serializer,
                     uint64_t *matched)
{
	// The zstd frame header stores the uncompressed size, which lets us
	// size the output buffer in one allocation.
	unsigned long long uncompressed_len = ZSTD_getFrameContentSize(compressed.data(), compressed.size());
	if (uncompressed_len == ZSTD_CONTENTSIZE_UNKNOWN || uncompressed_len == ZSTD_CONTENTSIZE_ERROR) {
		fprintf(stderr, "ZSTD_getFrameContentSize() failed\n");
		exit(1);
	}

	string block;
	block.resize(uncompressed_len + 1);  // One extra byte for the NUL sentinel below.

	// NOTE(review): function-local static, so the context is shared across
	// all calls — assumes this function is only run from one thread; confirm.
	static ZSTD_DCtx *ctx = ZSTD_createDCtx(); // Reused across calls.
	size_t err;

	// Use the index-wide dictionary if one was loaded (see scan_all_docids);
	// otherwise, plain decompression.
	if (ddict != nullptr) {
		err = ZSTD_decompress_usingDDict(ctx, &block[0], block.size(), compressed.data(),
		                                 compressed.size(), ddict);
	} else {
		err = ZSTD_decompressDCtx(ctx, &block[0], block.size(), compressed.data(),
		                          compressed.size());
	}
	if (ZSTD_isError(err)) {
		fprintf(stderr, "ZSTD_decompress(): %s\n", ZSTD_getErrorName(err));
		exit(1);
	}
	// Terminate the buffer so the strlen()-based walk below cannot run past
	// the end even if the data were to lack a final NUL.
	block[block.size() - 1] = '\0';

	// Check a matching filename against the access cache (the answer may
	// arrive asynchronously), then print it — or an empty entry, to keep
	// the sequence-number range contiguous — via the Serializer. The
	// filename is strdup()ed because “block” may be gone by the time the
	// callback runs.
	auto test_candidate = [&](const char *filename, uint64_t local_seq, uint64_t next_seq) {
		access_rx_cache->check_access(filename, /*allow_async=*/true, [matched, serializer, local_seq, next_seq, filename{ strdup(filename) }](bool ok) {
			if (ok) {
				++*matched;
				serializer->print(local_seq, next_seq - local_seq, filename);
			} else {
				serializer->print(local_seq, next_seq - local_seq, "");
			}
			free(filename);
		});
	};

	// We need to know the next sequence number before inserting into Serializer,
	// so always buffer one candidate.
	const char *pending_candidate = nullptr;

	uint64_t local_seq = seq << 32;
	// Filenames are stored back-to-back, each NUL-terminated. The sentinel
	// byte makes this walk visit one final empty string; presumably an
	// empty string cannot match any real needle, so it is harmless —
	// confirm against matches().
	for (const char *filename = block.data();
	     filename != block.data() + block.size();
	     filename += strlen(filename) + 1) {
		// AND semantics: every needle must match for the filename to count.
		bool found = true;
		for (const Needle &needle : needles) {
			if (!matches(needle, filename)) {
				found = false;
				break;
			}
		}
		if (found) {
			// Flush the previously buffered candidate now that we know
			// its successor's sequence number.
			if (pending_candidate != nullptr) {
				test_candidate(pending_candidate, local_seq, local_seq + 1);
				++local_seq;
			}
			pending_candidate = filename;
		}
	}
	if (pending_candidate == nullptr) {
		// No matches at all: emit one placeholder covering this block's
		// entire sequence range so the Serializer can advance past it.
		serializer->print(seq << 32, 1ULL << 32, "");
	} else {
		// The last candidate absorbs the remainder of the block's range.
		test_candidate(pending_candidate, local_seq, (seq + 1) << 32);
	}
}
+
+size_t scan_docids(const vector<Needle> &needles, const vector<uint32_t> &docids, const Corpus &corpus, IOUringEngine *engine)
+{
+ Serializer docids_in_order;
+ AccessRXCache access_rx_cache(engine);
+ uint64_t matched = 0;
+ for (size_t i = 0; i < docids.size(); ++i) {
+ uint32_t docid = docids[i];
+ corpus.get_compressed_filename_block(docid, [i, &matched, &needles, &access_rx_cache, &docids_in_order](string_view compressed) {
+ scan_file_block(needles, compressed, &access_rx_cache, i, &docids_in_order, &matched);
+ });
+ }
+ engine->finish();
+ return matched;
+}
+
+// We do this sequentially, as it's faster than scattering
+// a lot of I/O through io_uring and hoping the kernel will
+// coalesce it plus readahead for us.
+uint64_t scan_all_docids(const vector<Needle> &needles, int fd, const Corpus &corpus)
+{
+ {
+ const Header &hdr = corpus.get_hdr();
+ if (hdr.zstd_dictionary_length_bytes > 0) {
+ string dictionary;
+ dictionary.resize(hdr.zstd_dictionary_length_bytes);
+ complete_pread(fd, &dictionary[0], hdr.zstd_dictionary_length_bytes, hdr.zstd_dictionary_offset_bytes);
+ ddict = ZSTD_createDDict(dictionary.data(), dictionary.size());
+ }
+ }
+
+ AccessRXCache access_rx_cache(nullptr);
+ Serializer serializer; // Mostly dummy; handles only the limit.
+ uint32_t num_blocks = corpus.get_num_filename_blocks();
+ unique_ptr<uint64_t[]> offsets(new uint64_t[num_blocks + 1]);
+ complete_pread(fd, offsets.get(), (num_blocks + 1) * sizeof(uint64_t), corpus.offset_for_block(0));
+ string compressed;
+ uint64_t matched = 0;
+ for (uint32_t io_docid = 0; io_docid < num_blocks; io_docid += 32) {
+ uint32_t last_docid = std::min(io_docid + 32, num_blocks);
+ size_t io_len = offsets[last_docid] - offsets[io_docid];
+ if (compressed.size() < io_len) {
+ compressed.resize(io_len);
+ }
+ complete_pread(fd, &compressed[0], io_len, offsets[io_docid]);
+
+ for (uint32_t docid = io_docid; docid < last_docid; ++docid) {
+ size_t relative_offset = offsets[docid] - offsets[io_docid];
+ size_t len = offsets[docid + 1] - offsets[docid];
+ scan_file_block(needles, { &compressed[relative_offset], len }, &access_rx_cache, docid, &serializer, &matched);
+ }
+ }
+ return matched;
+}
+
// Takes the given posting list, unions it into the parts of the trigram disjunction
// already read; if the list is complete, intersects with “cur_candidates”.
//
// Returns true if the search should be aborted (we are done).
bool new_posting_list_read(TrigramDisjunction *td, vector<uint32_t> decoded, vector<uint32_t> *cur_candidates, vector<uint32_t> *tmp)
{
	// Merge the newly decoded posting list into this OR group's running
	// union. Both lists are sorted docid lists, so set_union() keeps the
	// sorted invariant; “tmp” is caller-provided scratch to avoid
	// reallocating on every merge.
	if (td->docids.empty()) {
		// First list for this group; adopt it wholesale.
		td->docids = move(decoded);
	} else {
		tmp->clear();
		set_union(decoded.begin(), decoded.end(), td->docids.begin(), td->docids.end(), back_inserter(*tmp));
		swap(*tmp, td->docids);
	}
	if (--td->remaining_trigrams_to_read > 0) {
		// Need to wait for more.
		// (Debug output only for case-insensitive searches; presumably
		// multi-trigram OR groups only arise with ignore_case — confirm.)
		if (ignore_case) {
			dprintf(" ... %u reads left in OR group %u (%zu docids in list)\n",
			        td->remaining_trigrams_to_read, td->index, td->docids.size());
		}
		return false;
	}
	// All trigrams in this OR group are now read; intersect the group's
	// union into the global candidate set (AND semantics between groups).
	if (cur_candidates->empty()) {
		if (ignore_case) {
			dprintf(" ... all reads done for OR group %u (%zu docids)\n",
			        td->index, td->docids.size());
		}
		// First completed group seeds the candidate set.
		*cur_candidates = move(td->docids);
	} else {
		tmp->clear();
		set_intersection(cur_candidates->begin(), cur_candidates->end(),
		                 td->docids.begin(), td->docids.end(),
		                 back_inserter(*tmp));
		swap(*cur_candidates, *tmp);
		// NOTE(review): the early abort (“return true” on an empty
		// intersection) is only taken on the ignore_case path; confirm
		// that the case-sensitive path detects exhaustion elsewhere.
		if (ignore_case) {
			if (cur_candidates->empty()) {
				dprintf(" ... all reads done for OR group %u (%zu docids), intersected (none left, search is done)\n",
				        td->index, td->docids.size());
				return true;
			} else {
				dprintf(" ... all reads done for OR group %u (%zu docids), intersected (%zu left)\n",
				        td->index, td->docids.size(), cur_candidates->size());
			}
		}
	}
	return false;
}