Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Task AB# 1310793: [LevelDB] Allow per-platform log buffer size #7

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion db/corruption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ TEST_F(CorruptionTest, Recovery) {
Build(100);
Check(100, 100);
Corrupt(kLogFile, 19, 1); // WriteBatch tag for first record
Corrupt(kLogFile, log::kBlockSize + 1000, 1); // Somewhere in second block
Corrupt(kLogFile, port::kLogBlockSize + 1000, 1); // Somewhere in second block
Reopen();

// The 64 records in the first two log blocks are completely lost.
Expand Down
2 changes: 0 additions & 2 deletions db/log_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ enum RecordType {
};
static const int kMaxRecordType = kLastType;

static const int kBlockSize = 32768;

// Header is checksum (4 bytes), length (2 bytes), type (1 byte).
static const int kHeaderSize = 4 + 2 + 1;

Expand Down
14 changes: 7 additions & 7 deletions db/log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
: file_(file),
reporter_(reporter),
checksum_(checksum),
backing_store_(new char[kBlockSize]),
backing_store_(new char[port::kLogBlockSize]),
buffer_(),
eof_(false),
last_record_offset_(0),
Expand All @@ -31,12 +31,12 @@ Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
Reader::~Reader() { delete[] backing_store_; }

bool Reader::SkipToInitialBlock() {
const size_t offset_in_block = initial_offset_ % kBlockSize;
const size_t offset_in_block = initial_offset_ % port::kLogBlockSize;
uint64_t block_start_location = initial_offset_ - offset_in_block;

// Don't search a block if we'd be in the trailer
if (offset_in_block > kBlockSize - 6) {
block_start_location += kBlockSize;
if (offset_in_block > port::kLogBlockSize - 6) {
block_start_location += port::kLogBlockSize;
}

end_of_buffer_offset_ = block_start_location;
Expand Down Expand Up @@ -192,14 +192,14 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
if (!eof_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
Status status = file_->Read(port::kLogBlockSize, &buffer_, backing_store_);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
ReportDrop(port::kLogBlockSize, status);
eof_ = true;
return kEof;
} else if (buffer_.size() < kBlockSize) {
} else if (buffer_.size() < port::kLogBlockSize) {
eof_ = true;
}
continue;
Expand Down
2 changes: 1 addition & 1 deletion db/log_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class Reader {
bool const checksum_;
char* const backing_store_;
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
bool eof_; // Last Read() indicated EOF by returning < port::kLogBlockSize

// Offset of the last record returned by ReadRecord.
uint64_t last_record_offset_;
Expand Down
60 changes: 30 additions & 30 deletions db/log_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,20 +238,20 @@ class LogTest : public testing::Test {
size_t LogTest::initial_offset_record_sizes_[] = {
10000, // Two sizable records in first block
10000,
2 * log::kBlockSize - 1000, // Span three blocks
2 * port::kLogBlockSize - 1000, // Span three blocks
1,
13716, // Consume all but two bytes of block 3.
log::kBlockSize - kHeaderSize, // Consume the entirety of block 4.
port::kLogBlockSize - kHeaderSize, // Consume the entirety of block 4.
};

uint64_t LogTest::initial_offset_last_record_offsets_[] = {
0,
kHeaderSize + 10000,
2 * (kHeaderSize + 10000),
2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize +
2 * (kHeaderSize + 10000) + (2 * port::kLogBlockSize - 1000) + 3 * kHeaderSize,
2 * (kHeaderSize + 10000) + (2 * port::kLogBlockSize - 1000) + 3 * kHeaderSize +
kHeaderSize + 1,
3 * log::kBlockSize,
3 * port::kLogBlockSize,
};

// LogTest::initial_offset_last_record_offsets_ must be defined before this.
Expand Down Expand Up @@ -295,9 +295,9 @@ TEST_F(LogTest, Fragmentation) {

TEST_F(LogTest, MarginalTrailer) {
// Make a trailer that is exactly the same length as an empty record.
const int n = kBlockSize - 2 * kHeaderSize;
const int n = port::kLogBlockSize - 2 * kHeaderSize;
Write(BigString("foo", n));
ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
ASSERT_EQ(port::kLogBlockSize - kHeaderSize, WrittenBytes());
Write("");
Write("bar");
ASSERT_EQ(BigString("foo", n), Read());
Expand All @@ -308,9 +308,9 @@ TEST_F(LogTest, MarginalTrailer) {

TEST_F(LogTest, MarginalTrailer2) {
// Make a trailer that is exactly the same length as an empty record.
const int n = kBlockSize - 2 * kHeaderSize;
const int n = port::kLogBlockSize - 2 * kHeaderSize;
Write(BigString("foo", n));
ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
ASSERT_EQ(port::kLogBlockSize - kHeaderSize, WrittenBytes());
Write("bar");
ASSERT_EQ(BigString("foo", n), Read());
ASSERT_EQ("bar", Read());
Expand All @@ -320,9 +320,9 @@ TEST_F(LogTest, MarginalTrailer2) {
}

TEST_F(LogTest, ShortTrailer) {
const int n = kBlockSize - 2 * kHeaderSize + 4;
const int n = port::kLogBlockSize - 2 * kHeaderSize + 4;
Write(BigString("foo", n));
ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
ASSERT_EQ(port::kLogBlockSize - kHeaderSize + 4, WrittenBytes());
Write("");
Write("bar");
ASSERT_EQ(BigString("foo", n), Read());
Expand All @@ -332,9 +332,9 @@ TEST_F(LogTest, ShortTrailer) {
}

TEST_F(LogTest, AlignedEof) {
const int n = kBlockSize - 2 * kHeaderSize + 4;
const int n = port::kLogBlockSize - 2 * kHeaderSize + 4;
Write(BigString("foo", n));
ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
ASSERT_EQ(port::kLogBlockSize - kHeaderSize + 4, WrittenBytes());
ASSERT_EQ(BigString("foo", n), Read());
ASSERT_EQ("EOF", Read());
}
Expand Down Expand Up @@ -367,7 +367,7 @@ TEST_F(LogTest, ReadError) {
Write("foo");
ForceError();
ASSERT_EQ("EOF", Read());
ASSERT_EQ(kBlockSize, DroppedBytes());
ASSERT_EQ(port::kLogBlockSize, DroppedBytes());
ASSERT_EQ("OK", MatchError("read error"));
}

Expand All @@ -391,13 +391,13 @@ TEST_F(LogTest, TruncatedTrailingRecordIsIgnored) {
}

TEST_F(LogTest, BadLength) {
const int kPayloadSize = kBlockSize - kHeaderSize;
const int kPayloadSize = port::kLogBlockSize - kHeaderSize;
Write(BigString("bar", kPayloadSize));
Write("foo");
// Least significant size byte is stored in header[4].
IncrementByte(4, 1);
ASSERT_EQ("foo", Read());
ASSERT_EQ(kBlockSize, DroppedBytes());
ASSERT_EQ(port::kLogBlockSize, DroppedBytes());
ASSERT_EQ("OK", MatchError("bad record length"));
}

Expand Down Expand Up @@ -458,7 +458,7 @@ TEST_F(LogTest, UnexpectedFirstType) {
}

TEST_F(LogTest, MissingLastIsIgnored) {
Write(BigString("bar", kBlockSize));
Write(BigString("bar", port::kLogBlockSize));
// Remove the LAST block, including header.
ShrinkSize(14);
ASSERT_EQ("EOF", Read());
Expand All @@ -467,7 +467,7 @@ TEST_F(LogTest, MissingLastIsIgnored) {
}

TEST_F(LogTest, PartialLastIsIgnored) {
Write(BigString("bar", kBlockSize));
Write(BigString("bar", port::kLogBlockSize));
// Cause a bad record length in the LAST block.
ShrinkSize(1);
ASSERT_EQ("EOF", Read());
Expand All @@ -481,9 +481,9 @@ TEST_F(LogTest, SkipIntoMultiRecord) {
// If initial_offset points to a record after first(R1) but before first(R2)
// incomplete fragment errors are not actual errors, and must be suppressed
// until a new first or full record is encountered.
Write(BigString("foo", 3 * kBlockSize));
Write(BigString("foo", 3 * port::kLogBlockSize));
Write("correct");
StartReadingAt(kBlockSize);
StartReadingAt(port::kLogBlockSize);

ASSERT_EQ("correct", Read());
ASSERT_EQ("", ReportMessage());
Expand All @@ -498,20 +498,20 @@ TEST_F(LogTest, ErrorJoinsRecords) {
// first(R1),last(R2) to get joined and returned as a valid record.

// Write records that span two blocks
Write(BigString("foo", kBlockSize));
Write(BigString("bar", kBlockSize));
Write(BigString("foo", port::kLogBlockSize));
Write(BigString("bar", port::kLogBlockSize));
Write("correct");

// Wipe the middle block
for (int offset = kBlockSize; offset < 2 * kBlockSize; offset++) {
for (int offset = port::kLogBlockSize; offset < 2 * port::kLogBlockSize; offset++) {
SetByte(offset, 'x');
}

ASSERT_EQ("correct", Read());
ASSERT_EQ("EOF", Read());
const size_t dropped = DroppedBytes();
ASSERT_LE(dropped, 2 * kBlockSize + 100);
ASSERT_GE(dropped, 2 * kBlockSize);
ASSERT_LE(dropped, 2 * port::kLogBlockSize + 100);
ASSERT_GE(dropped, 2 * port::kLogBlockSize);
}

TEST_F(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }
Expand All @@ -529,25 +529,25 @@ TEST_F(LogTest, ReadThirdStart) { CheckInitialOffsetRecord(20014, 2); }
TEST_F(LogTest, ReadFourthOneOff) { CheckInitialOffsetRecord(20015, 3); }

TEST_F(LogTest, ReadFourthFirstBlockTrailer) {
CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
CheckInitialOffsetRecord(port::kLogBlockSize - 4, 3);
}

TEST_F(LogTest, ReadFourthMiddleBlock) {
CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
CheckInitialOffsetRecord(port::kLogBlockSize + 1, 3);
}

TEST_F(LogTest, ReadFourthLastBlock) {
CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
CheckInitialOffsetRecord(2 * port::kLogBlockSize + 1, 3);
}

TEST_F(LogTest, ReadFourthStart) {
CheckInitialOffsetRecord(
2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
2 * (kHeaderSize + 1000) + (2 * port::kLogBlockSize - 1000) + 3 * kHeaderSize,
3);
}

TEST_F(LogTest, ReadInitialOffsetIntoBlockPadding) {
CheckInitialOffsetRecord(3 * log::kBlockSize - 3, 5);
CheckInitialOffsetRecord(3 * port::kLogBlockSize - 3, 5);
}

TEST_F(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); }
Expand Down
10 changes: 5 additions & 5 deletions db/log_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Writer::Writer(WritableFile* dest) : dest_(dest), block_offset_(0) {
}

Writer::Writer(WritableFile* dest, uint64_t dest_length)
: dest_(dest), block_offset_(dest_length % kBlockSize) {
: dest_(dest), block_offset_(dest_length % port::kLogBlockSize) {
InitTypeCrc(type_crc_);
}

Expand All @@ -41,7 +41,7 @@ Status Writer::AddRecord(const Slice& slice) {
Status s;
bool begin = true;
do {
const int leftover = kBlockSize - block_offset_;
const int leftover = port::kLogBlockSize - block_offset_;
assert(leftover >= 0);
if (leftover < kHeaderSize) {
// Switch to a new block
Expand All @@ -54,9 +54,9 @@ Status Writer::AddRecord(const Slice& slice) {
}

// Invariant: we never leave < kHeaderSize bytes in a block.
assert(kBlockSize - block_offset_ - kHeaderSize >= 0);
assert(port::kLogBlockSize - block_offset_ - kHeaderSize >= 0);

const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
const size_t avail = port::kLogBlockSize - block_offset_ - kHeaderSize;
const size_t fragment_length = (left < avail) ? left : avail;

RecordType type;
Expand All @@ -82,7 +82,7 @@ Status Writer::AddRecord(const Slice& slice) {
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,
size_t length) {
assert(length <= 0xffff); // Must fit in two bytes
assert(block_offset_ + kHeaderSize + length <= kBlockSize);
assert(block_offset_ + kHeaderSize + length <= port::kLogBlockSize);

// Format the header
char buf[kHeaderSize];
Expand Down
3 changes: 3 additions & 0 deletions port/port_example.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ namespace port {
// TODO(jorlow): Many of these belong more in the environment class rather than
// here. We should try moving them and see if it affects perf.

// Buffer size for log
static const int kLogBlockSize = 32768;

// ------------------ Threading -------------------

// A Mutex represents an exclusive lock.
Expand Down
2 changes: 2 additions & 0 deletions port/port_stdcxx.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ namespace port {

class CondVar;

static const int kLogBlockSize = 32768;

// Thinly wraps std::mutex.
class LOCKABLE Mutex {
public:
Expand Down
Loading