? ? ? ? 下面我們帶大家看看log文件的具體物理和邏輯布局是怎樣的,LevelDB對于一個log文件,會把它切割成以32K為單位的物理Block,每次讀取的單位以一個Block作為基本讀取單位,下圖展示的log文件由3個Block構成,所以從物理布局來講,一個log文件就是由連續(xù)的32K大小Block構成的。
? ? ? ? 在應用的視野里是看不到這些Block的,應用看到的是一系列的Key/Value對,在LevelDB內部,會將一個Key/Value對看做一條記錄的數據,另外在這個數據前增加一個記錄頭,用來記載一些管理信息,以方便內部處理,下圖顯示了一個記錄在LevelDB內部是如何表示的。
一.log文件的格式
namespace log {
// 記錄的類型
enum RecordType {
// 保留位,用于預分配的文件
kZeroType = 0,
// 整個存儲
kFullType = 1,
// 分段存儲
kFirstType = 2,
kMiddleType = 3,
kLastType = 4
};
static const int kMaxRecordType = kLastType;
// 32K
static const int kBlockSize = 32768;
// Header is checksum (4 bytes), type (1 byte), length (2 bytes).
// chcksum是類型和數據字段的校驗碼,type是記錄類型,length是數據字段的長度。
static const int kHeaderSize = 4 + 1 + 2;
}
二.log文件的寫
Writer類的頭文件很簡單,看下cpp文件
namespace log {
Writer::Writer(WritableFile* dest)
: dest_(dest),
block_offset_(0) {// block_offset_
// 分別校驗所有類型,并把校驗碼存儲到數組type_crc_中
// 放在構造函數里提前計算類型的校驗碼,是為了減少運行中計算時的性能損耗
for (int i = 0; i <= kMaxRecordType; i++) {
// 這里直接將int轉換為char,因為int的值較小,不會造成精度丟失
char t = static_cast(i);
type_crc_[i] = crc32c::Value(&t, 1);
}
}
Writer::~Writer() {
}
Status Writer::AddRecord(const Slice& slice) {
const char* ptr = slice.data();
size_t left = slice.size();
// Fragment the record if necessary and emit it. Note that if slice
// is empty, we still want to iterate once to emit a single
// zero-length record
Status s;
bool begin = true;
do {
const int leftover = kBlockSize - block_offset_;
assert(leftover >= 0);
// 如果當前Block中剩下的容量leftover小于kHeaderSize的大小
// 則將剩下的容量填充空字符,因為leftover小于kHeaderSize
// 所以最多只能填充六個空字符,當leftover大于等于kHeaderSize時,
// Slice會自行截斷
if (leftover < kHeaderSize) {
if (leftover > 0) {
// Fill the trailer (literal below relies on kHeaderSize being 7)
assert(kHeaderSize == 7);
dest_->Append(Slice("x00x00x00x00x00x00", leftover));
}
// 切換到一個新的Block
block_offset_ = 0;
}
// Invariant: we never leave < kHeaderSize bytes in a block.
assert(kBlockSize - block_offset_ - kHeaderSize >= 0);
// 如果當前Block中剩下的容量leftover大于等于kHeaderSize的大小
// 則leftover-kHeaderSize為可用大小,即avail
const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
const size_t fragment_length = (left < avail) ? left : avail;
// 如果新的slice小于avail,則該slice可用整個添加到當前Block中,
// 不需要分段,此時type=kFullType
// 如果slice大于等于avail,則該slice需要分段存儲,如果是第一段
// type = kFirstType,如果是最后一段type = kLastType,否則type = kMiddleType
RecordType type;
const bool end = (left == fragment_length);
if (begin && end) {
type = kFullType;
} else if (begin) {
type = kFirstType;
} else if (end) {
type = kLastType;
} else {
type = kMiddleType;
}
// 將數據組建成指定格式后存儲到磁盤
s = EmitPhysicalRecord(type, ptr, fragment_length);
ptr += fragment_length;
left -= fragment_length;
begin = false;
} while (s.ok() && left > 0);
return s;
}
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
assert(n <= 0xffff); // 最大為兩個字節(jié)
assert(block_offset_ + kHeaderSize + n <= kBlockSize);
// Format the header
char buf[kHeaderSize];
// 長度的低位放到數組的第五個字節(jié)
// 長度的高位放到數組的第六個字節(jié)
buf[4] = static_cast(n & 0xff);
buf[5] = static_cast(n >> 8);
// 類型放到數組的第七個字節(jié)
buf[6] = static_cast(t);
// Compute the crc of the record type and the payload.
uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
crc = crc32c::Mask(crc); // Adjust for storage
// 1.添加校驗碼到header中(包括類型字段和數據字段的校驗)
EncodeFixed32(buf, crc);
// 2.添加header
// Write the header and the payload
Status s = dest_->Append(Slice(buf, kHeaderSize));
if (s.ok()) {
// 3.添加數據
s = dest_->Append(Slice(ptr, n));
if (s.ok()) {
// 寫入到磁盤
s = dest_->Flush();
}
}
// 偏移的自增
block_offset_ += kHeaderSize + n;
return s;
}
}
三.log文件的讀
Reader類的頭文件
namespace log {
class Reader {
public:
// 報告錯誤的接口
class Reporter {
public:
virtual ~Reporter();
// 如果有損壞被檢測到,那么bytes就是由于檢測到的損壞而丟失大概字節(jié)數
virtual void Corruption(size_t bytes, const Status& status) = 0;
};
// Reader的功能時從log文件中讀取記錄
// 如果reporter不是NULL,只要有一些數據由于檢測到的損壞而丟失,就會通知它。
// 如果“校驗和”為真,則驗證校驗和是否可用。
// Reader會從文件內物理位置大于等于initial_offset的第一條記錄開始讀
Reader(SequentialFile* file, Reporter* reporter, bool checksum,
uint64_t initial_offset);
~Reader();
// 讀取下一個記錄到*record中,*scratch用于臨時存儲
bool ReadRecord(Slice* record, std::string* scratch);
// 返回上一條記錄的物理偏移
// 在第一次調用ReadRecord前調用該函數是無定義的。
// 因此要在ReadRecord之后調用該函數。
uint64_t LastRecordOffset();
private:
SequentialFile* const file_;
// 數據損壞報告
Reporter* const reporter_;
// 是否進行數據校驗
bool const checksum_;
// read以Block為單位去從磁盤取數據,取完數據就是存在blocking_store_里面,
// 其實就是讀取數據的buffer
char* const backing_store_;
// 指向blocking_store_的slice對象,方便對blocking_store_的操作
Slice buffer_;
// 是否到了文件尾
bool eof_;
// 上一條記錄的偏移
uint64_t last_record_offset_;
// 當前Block的結束位置的偏移
uint64_t end_of_buffer_offset_;
// 初始Offset,從該偏移出查找第一條記錄
uint64_t const initial_offset_;
// 這些特殊值是記錄類型的擴展
enum {
kEof = kMaxRecordType + 1,
// Returned whenever we find an invalid physical record.
// Currently there are three situations in which this happens:
// * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
// * The record is a 0-length record (No drop is reported)
// * The record is below constructor's initial_offset (No drop is reported)
kBadRecord = kMaxRecordType + 2
};
// 跳過"initial_offset_"之前的所有Block.
bool SkipToInitialBlock();
// 讀取一條記錄中的數據字段,存儲在result中,返回記錄類型或者上面的特殊值之一
unsigned int ReadPhysicalRecord(Slice* result);
// 將損壞的字節(jié)數報告給reporter.
void ReportCorruption(size_t bytes, const char* reason);
void ReportDrop(size_t bytes, const Status& reason);
// No copying allowed
Reader(const Reader&);
void operator=(const Reader&);
};
}
Reader類的源文件
namespace log {
Reader::Reporter::~Reporter() {
}
Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
uint64_t initial_offset)
: file_(file),
reporter_(reporter),
checksum_(checksum),
backing_store_(new char[kBlockSize]),
buffer_(),
eof_(false),
last_record_offset_(0),
end_of_buffer_offset_(0),
initial_offset_(initial_offset) {
}
Reader::~Reader() {
delete[] backing_store_;
}
bool Reader::SkipToInitialBlock() {
// 構造時傳入的initial_offset大于等于kBlockSize,則block_start_location
// 是第(initial_offset_ / kBlockSize)+1個Block起始位置的偏移。
// 當initial_offset比kBlockSize小時,則block_start_location是第1個Block
// 起始位置的偏移
size_t offset_in_block = initial_offset_ % kBlockSize;
uint64_t block_start_location = initial_offset_ - offset_in_block;
// offset_in_block > kBlockSize - 6,說明已經到了一個Block的尾部,
// 尾部填充的是6個空字符。此時只能定位到下一個Block的開頭。
if (offset_in_block > kBlockSize - 6) {
offset_in_block = 0;
block_start_location += kBlockSize;
}
end_of_buffer_offset_ = block_start_location;
// 如果block_start_location大于0,則文件中應該跳過block_start_location
// 個字節(jié),到達目標Block的開頭。否則將數據損壞信息打印到LOG文件。
if (block_start_location > 0) {
Status skip_status = file_->Skip(block_start_location);
if (!skip_status.ok()) {
ReportDrop(block_start_location, skip_status);
return false;
}
}
return true;
}
bool Reader::ReadRecord(Slice* record, std::string* scratch) {
if (last_record_offset_ < initial_offset_) {
if (!SkipToInitialBlock()) {
return false;
}
}
scratch->clear();
record->clear();
// 是否是分段的記錄
bool in_fragmented_record = false;
// 當前讀取的記錄的邏輯偏移
uint64_t prospective_record_offset = 0;
Slice fragment;
while (true) {
// buffer_會在ReadPhysicalRecord中自偏移,實際上buffer_中存儲的是當前Block
// 還未解析的記錄,而end_of_buffer_offset_是當前Block的結束位置的偏移
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
const unsigned int record_type = ReadPhysicalRecord(&fragment);
switch (record_type) {
case kFullType:
if (in_fragmented_record) {
// Handle bug in earlier versions of log::Writer where
// it could emit an empty kFirstType record at the tail end
// of a block followed by a kFullType or kFirstType record
// at the beginning of the next block.
if (scratch->empty()) {
in_fragmented_record = false;
} else {
ReportCorruption(scratch->size(), "partial record without end(1)");
}
}
// 當為kFullType時,物理記錄和邏輯記錄1:1的關系,所以offset也是一樣的
prospective_record_offset = physical_record_offset;
scratch->clear();
*record = fragment;
last_record_offset_ = prospective_record_offset;
return true;
case kFirstType:
if (in_fragmented_record) {
// Handle bug in earlier versions of log::Writer where
// it could emit an empty kFirstType record at the tail end
// of a block followed by a kFullType or kFirstType record
// at the beginning of the next block.
if (scratch->empty()) {
in_fragmented_record = false;
} else {
ReportCorruption(scratch->size(), "partial record without end(2)");
}
}
// 因為是第一分段,所以物理記錄的offset,也是邏輯記錄的offset
// 注意第一個分段用的是assign添加到scratch
prospective_record_offset = physical_record_offset;
scratch->assign(fragment.data(), fragment.size());
in_fragmented_record = true;
break;
case kMiddleType:
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(1)");
} else {
scratch->append(fragment.data(), fragment.size());
}
break;
case kLastType:
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(2)");
} else {
scratch->append(fragment.data(), fragment.size());
*record = Slice(*scratch);
// 邏輯記錄結束,更新最近一條邏輯記錄的offset
last_record_offset_ = prospective_record_offset;
return true;
}
break;
case kEof:
if (in_fragmented_record) {
ReportCorruption(scratch->size(), "partial record without end(3)");
scratch->clear();
}
return false;
case kBadRecord:
if (in_fragmented_record) {
ReportCorruption(scratch->size(), "error in middle of record");
in_fragmented_record = false;
scratch->clear();
}
break;
default: {
char buf[40];
snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
ReportCorruption(
(fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
buf);
in_fragmented_record = false;
scratch->clear();
break;
}
}
}
return false;
}
uint64_t Reader::LastRecordOffset() {
return last_record_offset_;
}
void Reader::ReportCorruption(size_t bytes, const char* reason) {
ReportDrop(bytes, Status::Corruption(reason));
}
void Reader::ReportDrop(size_t bytes, const Status& reason) {
if (reporter_ != NULL &&
end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) {
reporter_->Corruption(bytes, reason);
}
}
unsigned int Reader::ReadPhysicalRecord(Slice* result) {
while (true) {
// 兩種情況下該條件成立
// 1.出現在第一次read,因為buffer_在reader的構造函數里是初始化空
// 2.當前buffer_的內容為Block尾部的6個空字符,這時實際上當前Block
// 以及解析完了,準備解析下一個Block
if (buffer_.size() < kHeaderSize) {
if (!eof_) {
// 清空buffer_,存儲下一個Block
buffer_.clear();
// 從文件中每次讀取一個Block,Read內部會做偏移,保證按順序讀取
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
// 當前Block結束位置的偏移
end_of_buffer_offset_ += buffer_.size();
// 讀取失敗,打印LOG信息,并將eof_設置為true,終止log文件的解析
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
eof_ = true;
return kEof;
// 如果讀到的數據小于kBlockSize,也說明到了文件結尾,eof_設為true
} else if (buffer_.size() < kBlockSize) {
eof_ = true;
}
// 跳過后面的解析,因為buffer_.size() < kHeaderSize時,buffer是無法解析的
continue;
} else if (buffer_.size() == 0) {
// 如果eof_為false,但是buffer_.size,說明遇到了Bad Record,也應該終止log文件的解析
return kEof;
} else {
// 如果最后一個Block的大小剛好為kBlockSize,且結尾為6個空字符
size_t drop_size = buffer_.size();
buffer_.clear();
ReportCorruption(drop_size, "truncated record at end of file");
return kEof;
}
}
// Parse the header
const char* header = buffer_.data();
const uint32_t a = static_cast(header[4]) & 0xff;
const uint32_t b = static_cast(header[5]) & 0xff;
const unsigned int type = header[6];
const uint32_t length = a | (b << 8);
// 一個Block里放不下一條記錄,顯示是Bad Record
if (kHeaderSize + length > buffer_.size()) {
size_t drop_size = buffer_.size();
buffer_.clear();
ReportCorruption(drop_size, "bad record length");
return kBadRecord;
}
// 長度為0的記錄,顯然也是Bad Record
if (type == kZeroType && length == 0) {
// Skip zero length record without reporting any drops since
// such records are produced by the mmap based writing code in
// env_posix.cc that preallocates file regions.
buffer_.clear();
return kBadRecord;
}
// 如果校驗失敗,也是Bad Record
if (checksum_) {
uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
if (actual_crc != expected_crc) {
// Drop the rest of the buffer since "length" itself may have
// been corrupted and if we trust it, we could find some
// fragment of a real log record that just happens to look
// like a valid log record.
size_t drop_size = buffer_.size();
buffer_.clear();
ReportCorruption(drop_size, "checksum mismatch");
return kBadRecord;
}
}
// buffer_的自偏移
buffer_.remove_prefix(kHeaderSize + length);
// 這樣的記錄也是Bad Record,不解釋了,太明顯
if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
initial_offset_) {
result->clear();
return kBadRecord;
}
// 取出記錄中的數據字段
*result = Slice(header + kHeaderSize, length);
return type;
}
}
}
參考鏈接:http://blog.csdn.net/tankles/article/details/7663873