+ uint32_t bucket = hash_trigram(trgm, hdr.hashtable_size);
+ engine->submit_read(fd, sizeof(Trigram) * (hdr.extra_ht_slots + 2), hdr.hash_table_offset_bytes + sizeof(Trigram) * bucket, [this, trgm, cb{ move(cb) }](string_view s) {
+ const Trigram *trgmptr = reinterpret_cast<const Trigram *>(s.data());
+ for (unsigned i = 0; i < hdr.extra_ht_slots + 1; ++i) {
+ if (trgmptr[i].trgm == trgm) {
+ cb(trgmptr + i, trgmptr[i + 1].offset - trgmptr[i].offset);
+ return;
+ }
+ }
+
+ // Not found.
+ cb(nullptr, 0);
+ });
+}
+
+ // Asynchronously fetch the zstd-compressed filename block for <docid> and
+ // hand it to <cb> when the read completes.
+ void Corpus::get_compressed_filename_block(uint32_t docid, function<void(string_view)> cb) const
+ {
+ // First read the start offsets of this docid's block and of the next
+ // one; the sentinel block at the end makes docid + 1 always valid.
+ engine->submit_read(fd, sizeof(uint64_t) * 2, offset_for_block(docid), [this, cb{ move(cb) }](string_view s) {
+ const uint64_t *offsets = reinterpret_cast<const uint64_t *>(s.data());
+ const off_t block_start = offsets[0];
+ const size_t block_len = offsets[1] - offsets[0];
+ // Chain a second read that delivers the compressed block itself to cb.
+ engine->submit_read(fd, block_len, block_start, cb);
+ });
+ }
+
+ size_t Corpus::get_num_filename_blocks() const
+ {
+ // There is one compressed filename block per docid (see
+ // get_compressed_filename_block()), so the header's docid count is also
+ // the block count.
+ return hdr.num_docids;
+ }
+
+ // Decompress one zstd block of NUL-separated filenames and test each one
+ // against all <needles> (logical AND). Every matching filename has its
+ // read permission checked — possibly asynchronously — through
+ // <access_rx_cache>; accessible hits bump *matched and all outcomes are
+ // reported to <serializer> so output stays in docid order. <seq> is this
+ // block's position among all blocks; the block owns the sequence-number
+ // range [seq << 32, (seq + 1) << 32).
+ void scan_file_block(const vector<Needle> &needles, string_view compressed,
+ AccessRXCache *access_rx_cache, uint64_t seq, ResultReceiver *serializer,
+ atomic<uint64_t> *matched)
+ {
+ unsigned long long uncompressed_len = ZSTD_getFrameContentSize(compressed.data(), compressed.size());
+ if (uncompressed_len == ZSTD_CONTENTSIZE_UNKNOWN || uncompressed_len == ZSTD_CONTENTSIZE_ERROR) {
+ fprintf(stderr, "ZSTD_getFrameContentSize() failed\n");
+ exit(1);
+ }
+
+ string block;
+ // One extra byte for the sentinel NUL written below, which guarantees
+ // the filename walk terminates even if the data lacks a trailing NUL.
+ block.resize(uncompressed_len + 1);
+
+ static thread_local ZSTD_DCtx *ctx = ZSTD_createDCtx(); // Reused across calls.
+ size_t err;
+
+ // NOTE(review): ddict is presumably a shared decompression dictionary
+ // set up elsewhere in the file — confirm against the full source.
+ if (ddict != nullptr) {
+ err = ZSTD_decompress_usingDDict(ctx, &block[0], block.size(), compressed.data(),
+ compressed.size(), ddict);
+ } else {
+ err = ZSTD_decompressDCtx(ctx, &block[0], block.size(), compressed.data(),
+ compressed.size());
+ }
+ if (ZSTD_isError(err)) {
+ fprintf(stderr, "ZSTD_decompress(): %s\n", ZSTD_getErrorName(err));
+ exit(1);
+ }
+ // Sentinel terminator. If the data already ends in NUL, the loop below
+ // sees one final empty filename, which cannot match a nonempty needle.
+ block[block.size() - 1] = '\0';
+
+ // Check access for one candidate and report it. <filename> points into
+ // <block>, which is gone by the time an async callback runs, so it is
+ // strdup()ed here and freed inside the callback.
+ auto test_candidate = [&](const char *filename, uint64_t local_seq, uint64_t next_seq) {
+ access_rx_cache->check_access(filename, /*allow_async=*/true, [matched, serializer, local_seq, next_seq, filename{ strdup(filename) }](bool ok) {
+ if (ok) {
+ ++*matched;
+ serializer->print(local_seq, next_seq - local_seq, filename);
+ } else {
+ // Inaccessible: emit an empty record so the serializer's
+ // sequence numbering still advances past this slot.
+ serializer->print(local_seq, next_seq - local_seq, "");
+ }
+ free(filename);
+ });
+ };
+
+ // We need to know the next sequence number before inserting into Serializer,
+ // so always buffer one candidate.
+ const char *pending_candidate = nullptr;
+
+ uint64_t local_seq = seq << 32;
+ // Walk the NUL-separated filenames in the decompressed block.
+ for (const char *filename = block.data();
+ filename != block.data() + block.size();
+ filename += strlen(filename) + 1) {
+ const char *haystack = filename;
+ // NOTE(review): match_basename looks like a global option flag defined
+ // outside this chunk: when set, match only the last path component.
+ if (match_basename) {
+ haystack = strrchr(filename, '/');
+ if (haystack == nullptr) {
+ haystack = filename;
+ } else {
+ ++haystack;
+ }
+ }
+
+ // All needles must match (AND semantics).
+ bool found = true;
+ for (const Needle &needle : needles) {
+ if (!matches(needle, haystack)) {
+ found = false;
+ break;
+ }
+ }
+ if (found) {
+ if (pending_candidate != nullptr) {
+ // The buffered match is now known not to be the last one in
+ // this block, so its record spans exactly one sequence number.
+ test_candidate(pending_candidate, local_seq, local_seq + 1);
+ ++local_seq;
+ }
+ pending_candidate = filename;
+ }
+ }
+ if (pending_candidate == nullptr) {
+ // No matches at all: one empty record covering the block's entire
+ // sequence range keeps the global ordering intact.
+ serializer->print(seq << 32, 1ULL << 32, "");
+ } else {
+ // The final match absorbs the rest of this block's sequence range.
+ test_candidate(pending_candidate, local_seq, (seq + 1) << 32);
+ }
+ }
+
+ // Scan every docid's filename block against <needles> and return how many
+ // accessible filenames matched. Blocks are fetched asynchronously, but the
+ // Serializer emits results in docid order regardless of completion order.
+ size_t scan_docids(const vector<Needle> &needles, const vector<uint32_t> &docids, const Corpus &corpus, IOUringEngine *engine)
+ {
+ Serializer serializer;
+ AccessRXCache cache(engine);
+ atomic<uint64_t> num_matched{ 0 };
+ for (size_t seq = 0; seq < docids.size(); ++seq) {
+ // The callback captures the locals above by reference; this is safe
+ // because engine->finish() below drains all I/O (and thus runs every
+ // callback) before they go out of scope.
+ corpus.get_compressed_filename_block(docids[seq], [seq, &num_matched, &needles, &cache, &serializer](string_view compressed) {
+ scan_file_block(needles, compressed, &cache, seq, &serializer, &num_matched);
+ });
+ }
+ engine->finish();
+ return num_matched;
+ }
+
+ // Per-worker scanning state. Each worker buffers its own results here
+ // (guarded by result_mu) rather than writing straight into a shared,
+ // locked Serializer; the queues are presumably drained into the global
+ // output elsewhere — confirm against the rest of the file.
+ struct WorkerThread {
+ thread t;
+
+ // We use a result queue instead of synchronizing Serializer,
+ // since a lock on it becomes a huge choke point if there are
+ // lots of threads.
+ mutex result_mu;
+ struct Result {
+ // seq/skip mirror the arguments of ResultReceiver::print(): the
+ // record's sequence number and how many sequence numbers it covers.
+ uint64_t seq;
+ uint64_t skip;
+ string msg;
+ };
+ vector<Result> results;
+ };
+
+class WorkerThreadReceiver : public ResultReceiver {
+public:
+ WorkerThreadReceiver(WorkerThread *wt)
+ : wt(wt) {}
+
+ void print(uint64_t seq, uint64_t skip, const string msg) override
+ {
+ lock_guard<mutex> lock(wt->result_mu);
+ if (msg.empty() && !wt->results.empty() && wt->results.back().seq + wt->results.back().skip == seq) {
+ wt->results.back().skip += skip;