#include <algorithm>
#include <arpa/inet.h>
+#include <assert.h>
#include <chrono>
#include <endian.h>
#include <fcntl.h>
class Serializer {
public:
- void do_or_wait(int seq, function<void()> cb);
+ bool ready_to_print(int seq) { return next_seq == seq; }
+ void print_delayed(int seq, const vector<string> msg);
+ void release_current();
private:
int next_seq = 0;
struct Element {
int seq;
- function<void()> cb;
+ vector<string> msg;
bool operator<(const Element &other) const
{
priority_queue<Element> pending;
};
-void Serializer::do_or_wait(int seq, function<void()> cb)
+void Serializer::print_delayed(int seq, const vector<string> msg)
{
- if (seq != next_seq) {
- pending.emplace(Element{ seq, move(cb) });
- return;
- }
+ pending.push(Element{seq, move(msg)});
+}
- cb();
+void Serializer::release_current()
+{
++next_seq;
+ // See if any delayed prints can now be dealt with.
while (!pending.empty() && pending.top().seq == next_seq) {
- pending.top().cb();
+ for (const string &msg : pending.top().msg) {
+ if (print_nul) {
+ printf("%s%c", msg.c_str(), 0);
+ } else {
+ printf("%s\n", msg.c_str());
+ }
+ }
pending.pop();
++next_seq;
}
}
size_t scan_file_block(const vector<string> &needles, string_view compressed,
- unordered_map<string, bool> *access_rx_cache)
+ unordered_map<string, bool> *access_rx_cache, int seq,
+ Serializer *serializer)
{
size_t matched = 0;
}
block[block.size() - 1] = '\0';
+ bool immediate_print = (serializer == nullptr || serializer->ready_to_print(seq));
+ vector<string> delayed;
+
for (const char *filename = block.data();
filename != block.data() + block.size();
filename += strlen(filename) + 1) {
}
if (found && has_access(filename, access_rx_cache)) {
++matched;
- if (print_nul) {
- printf("%s%c", filename, 0);
+ if (immediate_print) {
+ if (print_nul) {
+ printf("%s%c", filename, 0);
+ } else {
+ printf("%s\n", filename);
+ }
} else {
- printf("%s\n", filename);
+ delayed.push_back(filename);
}
}
}
+ if (serializer != nullptr) {
+ if (immediate_print) {
+ serializer->release_current();
+ } else {
+ serializer->print_delayed(seq, move(delayed));
+ }
+ }
return matched;
}
for (size_t i = 0; i < docids.size(); ++i) {
uint32_t docid = docids[i];
corpus.get_compressed_filename_block(docid, [i, &matched, &needles, &access_rx_cache, &docids_in_order](string compressed) {
- docids_in_order.do_or_wait(i, [&matched, &needles, compressed{ move(compressed) }, &access_rx_cache] {
- matched += scan_file_block(needles, compressed, &access_rx_cache);
- });
+ matched += scan_file_block(needles, compressed, &access_rx_cache, i, &docids_in_order);
});
}
engine->finish();
for (uint32_t docid = io_docid; docid < last_docid; ++docid) {
size_t relative_offset = offsets[docid] - offsets[io_docid];
size_t len = offsets[docid + 1] - offsets[docid];
- scan_file_block(needles, { &compressed[relative_offset], len }, &access_rx_cache);
+ scan_file_block(needles, { &compressed[relative_offset], len }, &access_rx_cache, 0, nullptr);
}
}
}