#include "dprintf.h"
#include "io_uring_engine.h"
#include "parse_trigrams.h"
+#include "serializer.h"
#include "turbopfor.h"
#include "unique_sort.h"
#include <algorithm>
-#include <atomic>
#include <assert.h>
+#include <atomic>
#include <chrono>
#include <condition_variable>
+#include <deque>
#include <fcntl.h>
#include <fnmatch.h>
#include <functional>
#include <getopt.h>
#include <inttypes.h>
-#include <iosfwd>
#include <iterator>
#include <limits>
+#include <locale.h>
#include <map>
#include <memory>
#include <mutex>
-#include <queue>
#include <regex.h>
-#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
+#include <stdint.h>
#include <string.h>
#include <string>
#include <string_view>
#include <thread>
+#include <tuple>
#include <unistd.h>
#include <unordered_map>
#include <unordered_set>
regex_t compile_regex(const string &needle);
-void apply_limit()
-{
- if (--limit_left > 0) {
- return;
- }
- dprintf("Done in %.1f ms, found %" PRId64 " matches.\n",
- 1e3 * duration<float>(steady_clock::now() - start).count(), limit_matches);
- if (only_count) {
- printf("%" PRId64 "\n", limit_matches);
- }
- exit(0);
-}
-
-class ResultReceiver {
-public:
- virtual ~ResultReceiver() = default;
- virtual void print(uint64_t seq, uint64_t skip, const string msg) = 0;
-};
-
-class Serializer : public ResultReceiver {
-public:
- ~Serializer() { assert(limit_left <= 0 || pending.empty()); }
- void print(uint64_t seq, uint64_t skip, const string msg) override;
-
-private:
- uint64_t next_seq = 0;
- struct Element {
- uint64_t seq, skip;
- string msg;
-
- bool operator<(const Element &other) const
- {
- return seq > other.seq;
- }
- };
- priority_queue<Element> pending;
-};
-
-void Serializer::print(uint64_t seq, uint64_t skip, const string msg)
-{
- if (only_count) {
- if (!msg.empty()) {
- apply_limit();
- }
- return;
- }
-
- if (next_seq != seq) {
- pending.push(Element{ seq, skip, move(msg) });
- return;
- }
-
- if (!msg.empty()) {
- if (print_nul) {
- printf("%s%c", msg.c_str(), 0);
- } else {
- printf("%s\n", msg.c_str());
- }
- apply_limit();
- }
- next_seq += skip;
-
- // See if any delayed prints can now be dealt with.
- while (!pending.empty() && pending.top().seq == next_seq) {
- if (!pending.top().msg.empty()) {
- if (print_nul) {
- printf("%s%c", pending.top().msg.c_str(), 0);
- } else {
- printf("%s\n", pending.top().msg.c_str());
- }
- apply_limit();
- }
- next_seq += pending.top().skip;
- pending.pop();
- }
-}
-
struct Needle {
enum { STRSTR,
REGEX,
{
Serializer docids_in_order;
AccessRXCache access_rx_cache(engine);
- atomic<uint64_t> matched{0};
+ atomic<uint64_t> matched{ 0 };
for (size_t i = 0; i < docids.size(); ++i) {
uint32_t docid = docids[i];
corpus.get_compressed_filename_block(docid, [i, &matched, &needles, &access_rx_cache, &docids_in_order](string_view compressed) {
// since a lock on it becomes a huge choke point if there are
// lots of threads.
mutex result_mu;
- vector<tuple<uint64_t, uint64_t, string>> results;
+ struct Result {
+ uint64_t seq;
+ uint64_t skip;
+ string msg;
+ };
+ vector<Result> results;
};
class WorkerThreadReceiver : public ResultReceiver {
public:
- WorkerThreadReceiver(WorkerThread *wt) : wt(wt) {}
+ WorkerThreadReceiver(WorkerThread *wt)
+ : wt(wt) {}
void print(uint64_t seq, uint64_t skip, const string msg) override
{
lock_guard<mutex> lock(wt->result_mu);
- wt->results.emplace_back(seq, skip, move(msg));
+ if (msg.empty() && !wt->results.empty() && wt->results.back().seq + wt->results.back().skip == seq) {
+ wt->results.back().skip += skip;
+ } else {
+ wt->results.emplace_back(WorkerThread::Result{ seq, skip, move(msg) });
+ }
}
private:
void deliver_results(WorkerThread *wt, Serializer *serializer)
{
- vector<tuple<uint64_t, uint64_t, string>> results;
+ vector<WorkerThread::Result> results;
{
lock_guard<mutex> lock(wt->result_mu);
results = move(wt->results);
}
- for (const auto &result : results) {
- serializer->print(get<0>(result), get<1>(result), move(get<2>(result)));
+ for (const WorkerThread::Result &result : results) {
+ serializer->print(result.seq, result.skip, move(result.msg));
}
}