+ // NOTE(review): the function header is above this hunk; from the body, this
+ // decompresses one zstd block of NUL-separated filenames and tests each one
+ // against the needles — confirm against the full file.
+ // Determine the uncompressed size from the zstd frame header, so the output
+ // buffer can be allocated up front.
+ unsigned long long uncompressed_len = ZSTD_getFrameContentSize(compressed.data(), compressed.size());
+ if (uncompressed_len == ZSTD_CONTENTSIZE_UNKNOWN || uncompressed_len == ZSTD_CONTENTSIZE_ERROR) {
+ fprintf(stderr, "ZSTD_getFrameContentSize() failed\n");
+ exit(1);
+ }
+
+ string block;
+ // One extra byte so we can always place a terminating NUL at the end
+ // (done right after decompression below).
+ block.resize(uncompressed_len + 1);
+
+ static thread_local ZSTD_DCtx *ctx = ZSTD_createDCtx(); // Reused across calls.
+ size_t err;
+
+ // NOTE(review): ddict comes from outside this hunk; presumably a shared zstd
+ // dictionary loaded from the database — verify in the full file.
+ if (ddict != nullptr) {
+ err = ZSTD_decompress_usingDDict(ctx, &block[0], block.size(), compressed.data(),
+ compressed.size(), ddict);
+ } else {
+ err = ZSTD_decompressDCtx(ctx, &block[0], block.size(), compressed.data(),
+ compressed.size());
+ }
+ if (ZSTD_isError(err)) {
+ fprintf(stderr, "ZSTD_decompress(): %s\n", ZSTD_getErrorName(err));
+ exit(1);
+ }
+ // Guarantee the buffer ends in NUL, so the strlen()-based iteration below
+ // cannot run past the end even on malformed data.
+ block[block.size() - 1] = '\0';
+
+ // Queue an (possibly asynchronous) access check for a matching candidate.
+ // On completion, either print the filename and bump the match counter, or
+ // print an empty entry so the sequence range is still accounted for.
+ // The filename is strdup()ed because the callback can outlive this
+ // function's block buffer; it is freed when the callback runs.
+ auto test_candidate = [&](const char *filename, uint64_t local_seq, uint64_t next_seq) {
+ access_rx_cache->check_access(filename, /*allow_async=*/true, [matched, serializer, local_seq, next_seq, filename{ strdup(filename) }](bool ok) {
+ if (ok) {
+ ++*matched;
+ serializer->print(local_seq, next_seq - local_seq, filename);
+ } else {
+ serializer->print(local_seq, next_seq - local_seq, "");
+ }
+ free(filename);
+ });
+ };
+
+ // We need to know the next sequence number before inserting into Serializer,
+ // so always buffer one candidate.
+ const char *pending_candidate = nullptr;
+
+ // Each block owns its own 2^32-wide slice of the sequence space (seq is the
+ // block index), so blocks can finish out of order and still serialize
+ // correctly.
+ uint64_t local_seq = seq << 32;
+ // Walk the NUL-separated filenames in the decompressed block; a candidate
+ // must match every needle to qualify.
+ for (const char *filename = block.data();
+ filename != block.data() + block.size();
+ filename += strlen(filename) + 1) {
+ bool found = true;
+ for (const Needle &needle : needles) {
+ if (!matches(needle, filename)) {
+ found = false;
+ break;
+ }
+ }
+ if (found) {
+ if (pending_candidate != nullptr) {
+ test_candidate(pending_candidate, local_seq, local_seq + 1);
+ ++local_seq;
+ }
+ pending_candidate = filename;
+ }
+ }
+ if (pending_candidate == nullptr) {
+ // Nothing matched in this block; emit a single empty entry covering the
+ // block's entire sequence range so the Serializer can move past it.
+ serializer->print(seq << 32, 1ULL << 32, "");
+ } else {
+ // The final candidate gets the remainder of this block's sequence range.
+ test_candidate(pending_candidate, local_seq, (seq + 1) << 32);
+ }
+}
+
+// Scans the given docid blocks for filenames matching all needles.
+// Kicks off an (asynchronous, io_uring-backed) read of every requested block,
+// scans each one as it arrives, then waits for all I/O and access checks to
+// complete. Results are printed in docid order via the Serializer.
+// Returns the total number of matches printed.
+size_t scan_docids(const vector<Needle> &needles, const vector<uint32_t> &docids, const Corpus &corpus, IOUringEngine *engine)
+{
+ Serializer docids_in_order;
+ AccessRXCache access_rx_cache(engine);
+ atomic<uint64_t> matched{ 0 };
+ // The block index doubles as the sequence number handed to the Serializer,
+ // so output comes back in submission order regardless of I/O completion order.
+ size_t block_index = 0;
+ for (uint32_t docid : docids) {
+ corpus.get_compressed_filename_block(docid, [block_index, &matched, &needles, &access_rx_cache, &docids_in_order](string_view compressed) {
+ scan_file_block(needles, compressed, &access_rx_cache, block_index, &docids_in_order, &matched);
+ });
+ ++block_index;
+ }
+ // Drain all outstanding requests; the callbacks above (which reference the
+ // locals in this frame) all run before finish() returns.
+ engine->finish();
+ return matched;
+}
+
+// Per-thread scanning state: the thread itself plus a mutex-protected queue
+// of results it has produced, to be drained by whoever owns the Serializer.
+struct WorkerThread {
+ thread t;
+
+ // We use a result queue instead of synchronizing Serializer,
+ // since a lock on it becomes a huge choke point if there are
+ // lots of threads.
+ mutex result_mu;
+ // One scan result: the starting sequence number, how many sequence numbers
+ // the entry covers (skip), and the text to print (empty when nothing
+ // matched in that range).
+ struct Result {
+ uint64_t seq;
+ uint64_t skip;
+ string msg;
+ };
+ vector<Result> results; // Protected by result_mu.
+};
+
+// ResultReceiver that buffers results into its WorkerThread's queue instead
+// of printing directly, avoiding contention on a shared Serializer.
+// NOTE(review): the class continues past this hunk; comments below cover only
+// the visible part.
+class WorkerThreadReceiver : public ResultReceiver {
+public:
+ WorkerThreadReceiver(WorkerThread *wt)
+ : wt(wt) {}
+
+ // Append a result under the thread's mutex. A run of consecutive empty
+ // results is coalesced by extending the previous entry's skip count.
+ void print(uint64_t seq, uint64_t skip, const string msg) override
+ {
+ lock_guard<mutex> lock(wt->result_mu);
+ if (msg.empty() && !wt->results.empty() && wt->results.back().seq + wt->results.back().skip == seq) {
+ wt->results.back().skip += skip;