using namespace std;
// Constructor. The patch replaces the no-argument form with one that takes
// slop_bytes and stores it in the member of the same name.
-IOUringEngine::IOUringEngine()
+IOUringEngine::IOUringEngine(size_t slop_bytes)
+ : slop_bytes(slop_bytes)
{
#ifdef WITHOUT_URING
// In this branch ret is fixed at -1, so the test below yields false.
// NOTE(review): the matching #else/#endif is not visible in this excerpt.
int ret = -1;
using_uring = (ret >= 0);
}
// Reads len bytes from fd at offset and hands them to cb. The patch changes
// the callback argument from an owning string to a non-owning string_view.
-void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string)> cb)
+void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string_view)> cb)
{
if (!using_uring) {
// Synchronous read.
string s;
// The buffer gets slop_bytes of extra room past len; only len bytes are
// requested from complete_pread() and only len bytes are exposed to cb.
- s.resize(len);
+ s.resize(len + slop_bytes);
complete_pread(fd, &s[0], len, offset);
// The view points into the local s, so it is valid only for the duration
// of the cb call; a callback that needs the data later must copy it.
- cb(move(s));
+ cb(string_view(s.data(), len));
return;
}
}
#ifndef WITHOUT_URING
// Internal read submission for a given submission queue entry. The patch
// changes the callback argument from string to string_view.
-void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string)> cb)
+void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string_view)> cb)
{
void *buf;
// Allocate a 4096-byte-aligned buffer; after the patch it holds len plus
// slop_bytes bytes.
- if (posix_memalign(&buf, /*alignment=*/4096, len)) {
+ if (posix_memalign(&buf, /*alignment=*/4096, len + slop_bytes)) {
// NOTE(review): posix_memalign() reports failure through its return value
// and is not specified to set errno, so strerror(errno) may not describe
// the actual error. The message also prints len although len + slop_bytes
// was requested.
fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
exit(1);
}
// NOTE(review): the lines below appear to come from a later hunk (handling
// of a finished read); the code in between is not visible in this excerpt.
--pending_reads;
size_t old_pending_reads = pending_reads;
// After the patch the callback receives a view of pending->buf instead of a
// copy. The buffer is freed immediately after cb returns, so the callback
// must not keep the view.
- pending->cb(string(reinterpret_cast<char *>(pending->buf), pending->len));
+ pending->cb(string_view(reinterpret_cast<char *>(pending->buf), pending->len));
free(pending->buf);
delete pending;
// Read engine interface: reads are requested with submit_read() and their
// results are delivered through a callback.
class IOUringEngine {
public:
// The patch adds a slop_bytes constructor argument and switches the read
// callback from std::string to std::string_view.
// NOTE(review): the single-argument constructor is not marked explicit, so
// a size_t converts implicitly to IOUringEngine.
- IOUringEngine();
- void submit_read(int fd, size_t len, off_t offset, std::function<void(std::string)> cb);
+ IOUringEngine(size_t slop_bytes);
+ void submit_read(int fd, size_t len, off_t offset, std::function<void(std::string_view)> cb);
void finish();
// Reads counted in pending_reads plus reads still held in queued_reads.
size_t get_waiting_reads() const { return pending_reads + queued_reads.size(); }
private:
#ifndef WITHOUT_URING
- void submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, std::function<void(std::string)> cb);
+ void submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, std::function<void(std::string_view)> cb);
io_uring ring;
#endif
size_t pending_reads = 0; // Number of requests we have going in the ring.
bool using_uring;
// Set once in the constructor's initializer list; const, so it cannot be
// changed afterwards.
+ const size_t slop_bytes;
// Parameters of a read request (descriptor, length, offset, callback).
struct QueuedRead {
int fd;
size_t len;
off_t offset;
- std::function<void(std::string)> cb;
+ std::function<void(std::string_view)> cb;
};
std::queue<QueuedRead> queued_reads;
// State kept for a read: its buffer, length and callback.
// NOTE(review): the struct continues past the end of this excerpt.
struct PendingRead {
void *buf;
size_t len;
- std::function<void(std::string)> cb;
+ std::function<void(std::string_view)> cb;
// For re-submission.
int fd;
// Corpus member declarations (the enclosing class header is not visible in
// this excerpt). Takes a file descriptor and a pointer to the read engine.
Corpus(int fd, IOUringEngine *engine);
~Corpus();
void find_trigram(uint32_t trgm, function<void(const Trigram *trgmptr, size_t len)> cb);
// The patch changes the callback argument from string to string_view.
- void get_compressed_filename_block(uint32_t docid, function<void(string)> cb) const;
+ void get_compressed_filename_block(uint32_t docid, function<void(string_view)> cb) const;
size_t get_num_filename_blocks() const;
off_t offset_for_block(uint32_t docid) const
{
// Looks up trgm in the on-disk hash table and reports through cb.
void Corpus::find_trigram(uint32_t trgm, function<void(const Trigram *trgmptr, size_t len)> cb)
{
uint32_t bucket = hash_trigram(trgm, hdr.hashtable_size);
// Read hdr.extra_ht_slots + 2 consecutive Trigram entries starting at the
// bucket's slot. The patch only changes the lambda parameter from string to
// string_view.
- engine->submit_read(fd, sizeof(Trigram) * (hdr.extra_ht_slots + 2), hdr.hash_table_offset_bytes + sizeof(Trigram) * bucket, [this, trgm, cb{ move(cb) }](string s) {
+ engine->submit_read(fd, sizeof(Trigram) * (hdr.extra_ht_slots + 2), hdr.hash_table_offset_bytes + sizeof(Trigram) * bucket, [this, trgm, cb{ move(cb) }](string_view s) {
// Treat the bytes read as an array of Trigram and scan the first
// hdr.extra_ht_slots + 1 entries for a matching trgm field.
const Trigram *trgmptr = reinterpret_cast<const Trigram *>(s.data());
for (unsigned i = 0; i < hdr.extra_ht_slots + 1; ++i) {
if (trgmptr[i].trgm == trgm) {
});
}
// Fetches the compressed filename block for docid and passes it to cb. The
// patch changes the callback argument from string to string_view.
-void Corpus::get_compressed_filename_block(uint32_t docid, function<void(string)> cb) const
+void Corpus::get_compressed_filename_block(uint32_t docid, function<void(string_view)> cb) const
{
// Read the file offset from this docid and the next one.
// This is always allowed, since we have a sentinel block at the end.
- engine->submit_read(fd, sizeof(uint64_t) * 2, offset_for_block(docid), [this, cb{ move(cb) }](string s) {
+ engine->submit_read(fd, sizeof(uint64_t) * 2, offset_for_block(docid), [this, cb{ move(cb) }](string_view s) {
// The 16 bytes read are interpreted as two uint64_t values: the first is
// the block's offset, and the difference between them is its length.
const uint64_t *ptr = reinterpret_cast<const uint64_t *>(s.data());
off_t offset = ptr[0];
size_t len = ptr[1] - ptr[0];
// Fragment of a caller (its enclosing function is not visible here): for each
// docid, request its compressed filename block and add the result of
// scan_file_block() to matched.
uint64_t matched = 0;
for (size_t i = 0; i < docids.size(); ++i) {
uint32_t docid = docids[i];
// matched, needles, access_rx_cache and docids_in_order are captured by
// reference; i is captured by value. The patch only changes the parameter
// type from string to string_view.
- corpus.get_compressed_filename_block(docid, [i, &matched, &needles, &access_rx_cache, &docids_in_order](string compressed) {
+ corpus.get_compressed_filename_block(docid, [i, &matched, &needles, &access_rx_cache, &docids_in_order](string_view compressed) {
matched += scan_file_block(needles, compressed, &access_rx_cache, i, &docids_in_order);
});
}
return;
}
// Fragment of a caller: the engine is now constructed with an explicit
// slop_bytes value, and a pointer to it is passed to the Corpus constructor.
- IOUringEngine engine;
+ IOUringEngine engine(/*slop_bytes=*/16); // 16 slop bytes as described in turbopfor.h.
Corpus corpus(fd, &engine);
dprintf("Corpus init done after %.1f ms.\n", 1e3 * duration<float>(steady_clock::now() - start).count());
// NOTE(review): the loop this check belongs to starts outside the excerpt.
if (done)
break;
}
// Fragment of a caller: read len bytes at trgmptr.offset and process them in
// the callback. trgmptr and len are captured by value; done, in1, in2 and out
// are captured by reference. The patch changes the parameter from string to
// string_view.
- engine.submit_read(fd, len, trgmptr.offset, [trgmptr{trgmptr}, len{len}, &done, &in1, &in2, &out](string s) {
+ engine.submit_read(fd, len, trgmptr.offset, [trgmptr{trgmptr}, len{len}, &done, &in1, &in2, &out](string_view s) {
// Skip all work once done has been set.
if (done)
return;
uint32_t trgm __attribute__((unused)) = trgmptr.trgm;
size_t num = trgmptr.num_docids;
// string_view::data() returns a pointer to const, so the pointer and the
// cast become const-qualified after the patch.
- unsigned char *pldata = reinterpret_cast<unsigned char *>(s.data());
+ const unsigned char *pldata = reinterpret_cast<const unsigned char *>(s.data());
if (in1.empty()) {
// in1 is resized to num + 128 elements before the call.
// NOTE(review): the extra 128 presumably leaves room for the decoder to
// write past num in whole blocks; confirm against decode_pfor_delta1.
in1.resize(num + 128);
decode_pfor_delta1<128>(pldata, num, /*interleaved=*/true, &in1[0]);