Block的種類很多,包括Data Block、Meta Block等,每個Block由三部分組成,如下圖所示:
block data是具體的KV對存儲區(qū)域。雖然Block有好幾種,但是block data都是有序的KV對,因此寫入、讀取block data的接口都是統(tǒng)一的。
2.type
type指明使用的是哪種壓縮方式,當前支持none和snappy壓縮。
3.crc32
數(shù)據(jù)校驗位
LevelDB對block data的管理是讀寫分離的,讀取后的遍歷查詢操作由Block類實現(xiàn),block data的構(gòu)建則由BlockBuilder類實現(xiàn)。
二.block data的結(jié)構(gòu)
假設(shè)每一個KV對是一條記錄(Record),則記錄的格式如下。
——共享前綴長度 ? ? ? ? ? ? ? ? ? ? shared_bytes: ? ? ? varint32
——前綴之后的字符串長度 ? ? ? ?unshared_bytes: ? varint32
——值的長度 ? ? ? ? ? ? ? ? ? ? ? ? ? value_length: ? ? ? ?varint32
——前綴之后的字符串 ? ? ? ? ? ? ?key_delta: ? ? ? ? ? ? char[unshared_bytes]
——值 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?value: ? ? ? ? ? ? ? ? ? ?char[value_length]
對于重啟點而言,shared_bytes = 0,重啟點存儲完整的key。
block data的結(jié)尾段格式是:
——restarts: ? ? ?uint32[num_restarts]
——num_restarts: ?uint32?
尾段存儲的是重啟點相關(guān)信息,包括重啟點的位置和個數(shù)。元素restarts[i]存儲的是block data第i個重啟點距離block data首地址的偏移。很明顯第一條記錄,總是第一個重啟點,也就是restarts[0] = 0。num_restarts是重啟點的個數(shù)。
block data的結(jié)構(gòu)圖如下所示:
總體來看block data可分為KV存儲區(qū)和重啟點信息存儲區(qū)兩部分。
三.block data的構(gòu)建
block data的構(gòu)建是通過BlockBuilder實現(xiàn)的,BlockBuilder類的頭文件如下所示。
class BlockBuilder {
public:
explicit BlockBuilder(const Options* options);
// 重置BlockBuilder
void Reset();
// Add的調(diào)用應該在Reset之后,在Finish之前。
// Add只添加KV對(一條記錄),重啟點信息部分由Finish添加。
// 每次調(diào)用Add時,key應該越來越大。
void Add(const Slice& key, const Slice& value);
// 組建block data完成,返回block data
Slice Finish();
// 估算當前block data的大小
size_t CurrentSizeEstimate() const;
// 是否已經(jīng)開始組建block data
bool empty() const {
return buffer_.empty();
}
private:
const Options* options_;
std::string buffer_; // 用于存放block data
std::vector restarts_; // 用于存放重啟點的位置信息
int counter_; // 從上個重啟點遍歷到下個重啟點時的計數(shù)
bool finished_; // 是否調(diào)用了Finish
std::string last_key_; // 記錄最后Add的key
// No copying allowed
BlockBuilder(const BlockBuilder&);
void operator=(const BlockBuilder&);
};
下面重點介紹BlockBuilder類中的方法。1.構(gòu)造函數(shù)
BlockBuilder::BlockBuilder(const Options* options)
: options_(options),
restarts_(),
counter_(0),
finished_(false) {
assert(options->block_restart_interval >= 1);
restarts_.push_back(0);
}
options->block_restart_interval表示當前重啟點(其實也是一條記錄)和上個重啟點之間間隔了多少條記錄。restarts_.push_back(0),表示第一個重啟點距離block data起始位置的偏移為0,也就是說第一條記錄就是重啟點。
2.Add函數(shù)
void BlockBuilder::Add(const Slice& key, const Slice& value) {
Slice last_key_piece(last_key_);
assert(!finished_);
assert(counter_ <= options_->block_restart_interval);
assert(buffer_.empty() // No values yet?
|| options_->comparator->Compare(key, last_key_piece) > 0);
size_t shared = 0;
if (counter_ < options_->block_restart_interval) {
// See how much sharing to do with previous string
const size_t min_length = std::min(last_key_piece.size(), key.size());
while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
shared++;
}
} else {
// 如果counter_=options_->block_restart_interval,說明這條記錄就是重啟點。
// 將這條記錄距離block data首地址的偏移添加到restarts_中,并使counter_ = 0,
restarts_.push_back(buffer_.size());
counter_ = 0;
}
const size_t non_shared = key.size() - shared;
// 開始組建一條記錄
PutVarint32(&buffer_, shared);
PutVarint32(&buffer_, non_shared);
PutVarint32(&buffer_, value.size());
// Add string delta to buffer_ followed by value
buffer_.append(key.data() + shared, non_shared);
buffer_.append(value.data(), value.size());
// 此時last_key_還等于上一個key。resize用于取出last_key_與當前key中相同的前綴
last_key_.resize(shared);
// last_key_添加當前key中與上一個key的不同部分,此時last_key_與當前key是相等的。
last_key_.append(key.data() + shared, non_shared);
// 上面兩句其實等效于last_key_=key.ToString(),但是像上面那樣寫可以使內(nèi)存copy最小化
assert(Slice(last_key_) == key);
counter_++;
}
在求相同前綴的長度時,為何要調(diào)用std::min來計算上一個key(last_key_piece)和當前key的長度呢?因為當前key雖然比上一個key大(通過Compare得出),但是不一定就比上一個key長。比如mouse和morning,由于u大于r,mouse是大于moring的。關(guān)于comparator可以參考:LevelDB源碼分析之二:comparator需要注意的是,為了節(jié)約存儲空間,每條記錄的前三個字段是被壓縮存儲的(通過PutVarint32實現(xiàn)),關(guān)于壓縮,詳見:LevelDB源碼分析之一:coding
3.Finish函數(shù)
Slice BlockBuilder::Finish() {
// 添加重啟點信息部分
for (size_t i = 0; i < restarts_.size(); i++) {
PutFixed32(&buffer_, restarts_[i]);
}
PutFixed32(&buffer_, restarts_.size());
finished_ = true;
return Slice(buffer_);
}
Finish只是在記錄存儲區(qū)后邊添加了重啟點信息,重啟點信息沒有進行壓縮,關(guān)于PutFixed32也可以參考:LevelDB源碼分析之一:coding假設(shè)添加的5個KV對分別是("the bus","1"),("the car","11"),("the color","111"),("the mouse","1111"),("the tree","11111"),那么當options_->block_restart_interval=3時,block data的示意圖如下所示。
四.block data的解析
block data的解析是通過Block類實現(xiàn)的。
inline uint32_t Block::NumRestarts() const {
// size_為何要大于等于2*sizeof(uint32_t),因為如果只調(diào)用BlockBuilder中
// 的Finish函數(shù),那么block data至少包含一個uint32_t類型的重啟點位置信息和
// 一個uint32_t類型的重啟點個數(shù)信息
assert(size_ >= 2*sizeof(uint32_t));
// block data的最后一個uint32_t類型字段表示重啟點個數(shù)。
return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
}
Block::Block(const char* data, size_t size)
: data_(data),
size_(size) {
if (size_ < sizeof(uint32_t)) {
size_ = 0; // 出錯時,使size_=0
} else {
// 總大小減去重啟點信息的大小,重啟點信息包括重啟點位置數(shù)組和重啟點個數(shù),
// 他們都是uint32_t類型的
restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t);
// 這里的條件判斷是防止NumRestarts()返回值為負數(shù)
if (restart_offset_ > size_ - sizeof(uint32_t)) {
size_ = 0;
}
}
}
Block::~Block() {
delete[] data_;
}
// Helper routine: decode the next block entry starting at "p",
// storing the number of shared key bytes, non_shared key bytes,
// and the length of the value in "*shared", "*non_shared", and
// "*value_length", respectively. Will not derefence past "limit".
//
// 解析從block data的p位置開始的數(shù)據(jù),將解析得到的shared、non_shared和value length
// 分別放到"*shared"、"*non_shared"和"*value_length"
// 從源碼看出,p應該是一條記錄的起始位置
// 如果解析錯誤,返回NULL。否則,返回指向一條記錄的key_delta字段的指針
static inline const char* DecodeEntry(const char* p, const char* limit,
uint32_t* shared,
uint32_t* non_shared,
uint32_t* value_length) {
if (limit - p < 3) return NULL;
*shared = reinterpret_cast(p)[0];
*non_shared = reinterpret_cast(p)[1];
*value_length = reinterpret_cast(p)[2];
// 如果最高位都是0, 那么按照壓縮規(guī)則,每個值只占一個字節(jié),且小于128
// 這里相當于做了一個優(yōu)化,如果三個值之和都小于128,那肯定是每個值只占一個字節(jié)
if ((*shared | *non_shared | *value_length) < 128) {
p += 3;
} else {
if ((p = GetVarint32Ptr(p, limit, shared)) == NULL) return NULL;
if ((p = GetVarint32Ptr(p, limit, non_shared)) == NULL) return NULL;
if ((p = GetVarint32Ptr(p, limit, value_length)) == NULL) return NULL;
}
// 如果limit中剩下的空間小于*non_shared、*value_length之和,說明limit中
// 已經(jīng)容納不下記錄中的key_delta和value字段了。
if (static_cast(limit - p) < (*non_shared + *value_length)) {
return NULL;
}
return p;
}
class Block::Iter : public Iterator {
private:
const Comparator* const comparator_;
const char* const data_; // block data
uint32_t const restarts_; // 重啟點信息在block data中的偏移
uint32_t const num_restarts_; // 重啟點個數(shù)
// current_是當前記錄在bock data中的偏移,如果current_>=restarts_,說明出錯啦。
uint32_t current_;
uint32_t restart_index_; // 重啟點的索引
std::string key_;
Slice value_;
Status status_;
inline int Compare(const Slice& a, const Slice& b) const {
return comparator_->Compare(a, b);
}
// 因為value_是一條記錄的最后一個字段,所以這里返回的是下一條記錄的偏移量,也就是current_
// 但是如果在該函數(shù)之前調(diào)用了SeekToRestartPoint,此時的value_.data()=data_,value.size=0
// 這樣的話即使是block data的第一條記錄,也可以用使用該函數(shù),此時返回的偏移量為0
inline uint32_t NextEntryOffset() const {
return (value_.data() + value_.size()) - data_;
}
// 獲取第index個重啟點的偏移
uint32_t GetRestartPoint(uint32_t index) {
assert(index < num_restarts_);
return DecodeFixed32(data_ + restarts_ + index * sizeof(uint32_t));
}
// 該函數(shù)只是設(shè)置了幾個有限的狀態(tài),其它值將在函數(shù)ParseNextKey()中設(shè)置。
// 需要注意的是,這里的value_并不是記錄中的value字段,而只是一個指向記錄起始位置的0長度指針,
// 這樣后面的ParseNextKey函數(shù)將會解析出重啟點的value字段,并賦值到value_中。
void SeekToRestartPoint(uint32_t index) {
key_.clear();
restart_index_ = index;
// current_ will be fixed by ParseNextKey();
// current_的值會在ParseNextKey()方法中不斷被修改
// ParseNextKey() starts at the end of value_, so set value_ accordingly
uint32_t offset = GetRestartPoint(index);
value_ = Slice(data_ + offset, 0);
}
public:
Iter(const Comparator* comparator,
const char* data,
uint32_t restarts,
uint32_t num_restarts)
: comparator_(comparator),
data_(data),
restarts_(restarts),
num_restarts_(num_restarts),
current_(restarts_),
restart_index_(num_restarts_) {
assert(num_restarts_ > 0);
}
virtual bool Valid() const { return current_ < restarts_; }
virtual Status status() const { return status_; }
virtual Slice key() const {
assert(Valid());
return key_;
}
virtual Slice value() const {
assert(Valid());
return value_;
}
// 向后解析比較簡單,就是解析當前記錄的下一個記錄
virtual void Next() {
assert(Valid());
ParseNextKey();
}
// 向前解析復雜一些,步驟如下
// 1.先向前查找當前記錄之前的重啟點
// 2.當循環(huán)到了第一個重啟點,其偏移量(0)依然與當前記錄的偏移量相等
// 說明當前記錄就是第一條記錄,此時初始化current_和restart_index_,并返回
// 3.調(diào)用SeekToRestartPoint定位到符合要求的啟動點
// 4.向后循環(huán)解析,直到解析了原記錄之前的一條記錄,結(jié)束
virtual void Prev() {
assert(Valid());
const uint32_t original = current_;
while (GetRestartPoint(restart_index_) >= original) {
if (restart_index_ == 0) {
// No more entries
current_ = restarts_;
restart_index_ = num_restarts_;
return;
}
restart_index_--;
}
SeekToRestartPoint(restart_index_);
do {
// Loop until end of current entry hits the start of original entry
} while (ParseNextKey() && NextEntryOffset() < original);
}
// 從左到右(從前到后)查找第一條key大于target的記錄
// 1.二分查找,找到key < target的最后一個重啟點
// 2.定位到該重啟點,其索引由left指定,這是前面二分查找到的結(jié)果。如前面所分析的,
// value_指向重啟點的地址,而size_指定為0,這樣ParseNextKey函數(shù)將會解析出重啟點key和value。
// 3.自重啟點線性向下查找,直到遇到key>=target的記錄或者直到最后一條記錄,也不滿足key>=target,返回
virtual void Seek(const Slice& target) {
// Binary search in restart array to find the first restart point
// with a key >= target
uint32_t left = 0;
uint32_t right = num_restarts_ - 1;
while (left < right) {
uint32_t mid = (left + right + 1) / 2;
uint32_t region_offset = GetRestartPoint(mid);
uint32_t shared, non_shared, value_length;
const char* key_ptr = DecodeEntry(data_ + region_offset,
data_ + restarts_,
&shared, &non_shared, &value_length);
// 需要注意的是重啟點保存的key是完整的,所以它的shared字段等于0
if (key_ptr == NULL || (shared != 0)) {
CorruptionError();
return;
}
Slice mid_key(key_ptr, non_shared);
if (Compare(mid_key, target) < 0) {
// Key at "mid" is smaller than "target". Therefore all
// blocks before "mid" are uninteresting.
left = mid;
} else {
// Key at "mid" is >= "target". Therefore all blocks at or
// after "mid" are uninteresting.
right = mid - 1;
}
}
// Linear search (within restart block) for first key >= target
SeekToRestartPoint(left);
while (true) {
if (!ParseNextKey()) {
return;
}
if (Compare(key_, target) >= 0) {
return;
}
}
}
// 解析block data的第一條記錄
virtual void SeekToFirst() {
//先定位到第一個重啟點
SeekToRestartPoint(0);
ParseNextKey();
}
// 解析block data的最后一條記錄
virtual void SeekToLast() {
// 先定位到最后一個重啟點
SeekToRestartPoint(num_restarts_ - 1);
while (ParseNextKey() && NextEntryOffset() < restarts_) {
// Keep skipping
}
}
private:
void CorruptionError() {
current_ = restarts_;
restart_index_ = num_restarts_;
status_ = Status::Corruption("bad entry in block");
key_.clear();
value_.clear();
}
bool ParseNextKey() {
current_ = NextEntryOffset();
const char* p = data_ + current_;// 指向當前記錄
const char* limit = data_ + restarts_; // limit限制了記錄存儲區(qū)的范圍
if (p >= limit) {
// 如果出錯,恢復到默認值,并返回false
current_ = restarts_;
restart_index_ = num_restarts_;
return false;
}
// Decode next entry
uint32_t shared, non_shared, value_length;
p = DecodeEntry(p, limit, &shared, &non_shared, &value_length);
if (p == NULL || key_.size() < shared) {
CorruptionError();
return false;
} else {
// 解析出記錄中的key和value
key_.resize(shared);
key_.append(p, non_shared);
value_ = Slice(p + non_shared, value_length);
// 如果你當前記錄的偏移已經(jīng)比下一個重啟點的偏移還有大了
// 那么關(guān)鍵點索引restart_index_加1,且后面記錄的解析都是
// 以這個重啟點為參照的。
// 因為restart_index_=0的重啟點就是block data的第一條記錄
// 所以下一個重啟點的索引是restart_index_ + 1
while (restart_index_ + 1 < num_restarts_ &&
GetRestartPoint(restart_index_ + 1) < current_) {
++restart_index_;
}
return true;
}
}
};
Iterator* Block::NewIterator(const Comparator* cmp) {
if (size_ < 2*sizeof(uint32_t)) {
return NewErrorIterator(Status::Corruption("bad block contents"));
}
const uint32_t num_restarts = NumRestarts();
if (num_restarts == 0) {
return NewEmptyIterator();
} else {
return new Iter(cmp, data_, restart_offset_, num_restarts);
}
}
初始化迭代器時,為什么是把current設(shè)置為restarts,把restart_index_設(shè)置為num_restarts_?
創(chuàng)建一個Block::Itr之后,它是處于invalid狀態(tài)的,即不能Prev也不能Next;只能先Seek/SeekToxxx之后,才能調(diào)用next/prev。想想和std的iterator行為很像吧,比如你聲明一個vector::iterator,必須先賦值才能使用。
一個問題,既然通過Comparator可以極大的節(jié)省key的存儲空間,那為什么又要使用重啟點機制來額外占用一下空間呢?這是因為如果最開頭的記錄數(shù)據(jù)損壞,其后的所有記錄都將無法恢復。為了降低這個風險,leveldb引入了重啟點,每隔固定條數(shù)記錄會強制加入一個重啟點,這個位置的Entry會完整的記錄自己的Key。
參考鏈接:http://blog.csdn.net/sparkliang/article/details/8635821