LevelDB源碼分析之十三:table
一.Table的邏輯結(jié)構(gòu)
Table也叫SSTable(Sorted String Table),是數(shù)據(jù)在.sst文件中的存儲(chǔ)形式。Table的邏輯結(jié)構(gòu)如下所示,包括存儲(chǔ)數(shù)據(jù)的Block,存儲(chǔ)索引信息的Block,存儲(chǔ)Filter的Block:
Footer:為于Table尾部,記錄指向Metaindex Block的Handle和指向Index Block的Handle。需要說(shuō)明的是Table中所有的Handle是通過(guò)偏移量Offset以及Size一同來(lái)表示的,用來(lái)指明所指向的Block位置。Footer是SST文件解析開始的地方,通過(guò)Footer中記錄的這兩個(gè)關(guān)鍵元信息Block的位置,可以方便的開啟之后的解析工作。另外Footer種還記錄了用于驗(yàn)證文件是否為合法SST文件的常數(shù)值Magicnum。
Index Block:記錄Data Block位置信息的Block,其中的每一條Entry指向一個(gè)Data Block,其Key值為所指向的Data Block最后一條數(shù)據(jù)的Key,Value為指向該Data Block位置的Handle。
Metaindex Block:與Index Block類似,由一組Handle組成,不同的是這里的Handle指向的Meta Block。
Data Block:以Key-Value的方式存儲(chǔ)實(shí)際數(shù)據(jù),其中Key定義為:
DataBlock Key := UserKey + SequenceNum + Type
Type := kDelete or kValue
對(duì)比Memtable中的Key,可以發(fā)現(xiàn)Data Block中的Key并沒有拼接UserKey的長(zhǎng)度在UserKey前,這是由于上面講到的物理結(jié)構(gòu)中已經(jīng)有了Key的長(zhǎng)度信息。Meta Block:比較特殊的Block,用來(lái)存儲(chǔ)元信息,目前LevelDB使用的僅有對(duì)布隆過(guò)濾器的存儲(chǔ)。寫入Data Block的數(shù)據(jù)會(huì)同時(shí)更新對(duì)應(yīng)Meta Block中的過(guò)濾器。讀取數(shù)據(jù)時(shí)也會(huì)首先經(jīng)過(guò)布隆過(guò)濾器(Bloom Filter)過(guò)濾,我看的源碼還未用到Bloom Filter,可參考:BloomFilter——大規(guī)模數(shù)據(jù)處理利器。Meta Block的物理結(jié)構(gòu)也與其他Block有所不同:
[filter 0]
[filter 1]
[filter 2]
...
[filter N-1]
[offset of filter 0] : 4 bytes
[offset of filter 1] : 4 bytes
[offset of filter 2] : 4 bytes
...
[offset of filter N-1] : 4 bytes
[offset of beginning of offset array] : 4 bytes
lg(base) : 1 byte
其中每個(gè)filter節(jié)對(duì)應(yīng)一段Key Range,落在某個(gè)Key Range的Key需要到對(duì)應(yīng)的filter節(jié)中查找自己的過(guò)濾信息,base指定這個(gè)Range的大小。關(guān)于Block的結(jié)構(gòu)詳見:LevelDB源碼分析之十二:block
與Block類似,Table的管理也是讀寫分離的,讀取后的遍歷查詢操作由table類實(shí)現(xiàn),構(gòu)建則由TableBuilder類實(shí)現(xiàn)。
二.Table的構(gòu)建
leveldb通過(guò)TableBuilder類來(lái)構(gòu)建每一個(gè).sst文件,TableBuilder類的成員變量只有一個(gè)結(jié)構(gòu)體Rep* rep_,Rep的結(jié)構(gòu)為:
struct TableBuilder::Rep {
Options options;
Options index_block_options;
WritableFile* file;//要生成的.sst文件
uint64_t offset;//累加每個(gè)Data Block的偏移量
Status status;
BlockBuilder data_block;//存儲(chǔ)KV對(duì)的數(shù)據(jù)塊
BlockBuilder index_block;//數(shù)據(jù)塊對(duì)應(yīng)的索引塊
std::string last_key;//上一個(gè)插入的key值,新插入的key必須比它大,保證.sst文件中的key是從小到大排列的
int64_t num_entries;//.sst文件中存儲(chǔ)的所有記錄總數(shù)。關(guān)于記錄可以參考LevelDB源碼分析之十二:block
bool closed;// 調(diào)用Finish()或Abandon()時(shí),closed=true,表示Table構(gòu)建結(jié)束。
bool pending_index_entry;//當(dāng)一個(gè)Data Block被寫入到.sst文件時(shí),為true
BlockHandle pending_handle; //BlockHandle只有offset_和size_兩個(gè)變量,用來(lái)記錄每個(gè)Data Block在.sst文件中的偏移量和大小
std::string compressed_output;//Data Block的block_data字段壓縮后的結(jié)果
Rep(const Options& opt, WritableFile* f)
: options(opt),
index_block_options(opt),
file(f),
offset(0),
data_block(&options),
index_block(&index_block_options),
num_entries(0),
closed(false),
pending_index_entry(false) {
index_block_options.block_restart_interval = 1;//Index Block的block_data字段中重啟點(diǎn)的間隔
}
};
可以看到Rep中不僅接管了各種Block的生成細(xì)節(jié),而且還會(huì)記錄生成Block需要的一些統(tǒng)計(jì)信息。因此我們可以認(rèn)為,TableBuilder只不過(guò)是對(duì)Block的一層淺封裝,真正做事情的是Rep。而TableBuilder中的Add函數(shù)本質(zhì)上不過(guò)是對(duì)Rep中BlockBuilder的Add函數(shù)的調(diào)用。
1.Add函數(shù)
通過(guò)Add函數(shù)向.sst文件中寫入一個(gè)Data Block。
void TableBuilder::Add(const Slice& key, const Slice& value) {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->num_entries > 0) {
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
}
// 當(dāng)一個(gè)Data Block被寫入到磁盤時(shí),為true
if (r->pending_index_entry) {
// 說(shuō)明到了新的一個(gè)Data Block
assert(r->data_block.empty());
// 考慮這兩個(gè)key"the quick brown fox"和"the who", 進(jìn)FindShortestSeparator
// 處理后,r->last_key=the r。這樣的話r->last_key就大于上一個(gè)Data Block的
// 所有key,并且小于后面所有Data Block的key。
r->options.comparator->FindShortestSeparator(&r->last_key, key);
// 將上一個(gè)Data Block的偏移和大小編碼后作為Value存放到index_block中
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
r->last_key.assign(key.data(), key.size());
r->num_entries++;
r->data_block.Add(key, value);
const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
// 如果Data Block的block_data字段大小滿足要求,準(zhǔn)備寫入到磁盤
if (estimated_block_size >= r->options.block_size) {
Flush();
}
}
2.Flush函數(shù)
當(dāng)一個(gè)Data Block大小超過(guò)設(shè)定值(默認(rèn)為4K)時(shí),執(zhí)行Flush()操作。
void TableBuilder::Flush() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return;
assert(!r->pending_index_entry);
WriteBlock(&r->data_block, &r->pending_handle);
if (ok()) {
r->pending_index_entry = true;
// 將Data Block實(shí)時(shí)寫入到磁盤,防止緩存中的file過(guò)大
r->status = r->file->Flush();
}
}
Flush函數(shù)先調(diào)用WriteBlock向文件添加數(shù)據(jù),然后執(zhí)行file的Flush()函數(shù)將文件寫入磁盤。3.WriteBlock
void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
// File format contains a sequence of blocks where each block has:
// block_data: uint8[n]
// type: uint8
// crc: uint32
assert(ok());
Rep* r = rep_;
// 返回完整的block_data字段
Slice raw = block->Finish();
Slice block_contents;
CompressionType type = r->options.compression;
// TODO(postrelease): Support more compression options: zlib?
switch (type) {
case kNoCompression:
block_contents = raw;
break;
// 采用Snappy壓縮,Snappy是谷歌開源的壓縮庫(kù)
case kSnappyCompression: {
std::string* compressed = &r->compressed_output;
if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Snappy not supported, or compressed less than 12.5%, so just
// store uncompressed form
// 如果不支持Snappy壓縮,或者壓縮比小于12.5%,那就使用原始數(shù)據(jù)
block_contents = raw;
type = kNoCompression;
}
break;
}
}
// 設(shè)置Data Block的偏移和該Data Block的block_data字段的大小
// 第一個(gè)Data Block的偏移為0
handle->set_offset(r->offset);
handle->set_size(block_contents.size());
r->status = r->file->Append(block_contents);
if (r->status.ok()) {
char trailer[kBlockTrailerSize];
trailer[0] = type;
// 為block_contents添加校驗(yàn)
uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
// 為type也添加校驗(yàn)
crc = crc32c::Extend(crc, trailer, 1);
// 將校驗(yàn)碼拷貝到trailer的后四個(gè)字節(jié)
EncodeFixed32(trailer+1, crc32c::Mask(crc));
// 向文件尾部添加壓縮類型和校驗(yàn)碼,這樣一個(gè)完整的Block Data誕生
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (r->status.ok()) {
// 偏移應(yīng)該包括壓縮類型和校驗(yàn)碼的大小
r->offset += block_contents.size() + kBlockTrailerSize;
}
}
r->compressed_output.clear();
// 重置block
block->Reset();
}
WriteBlock函數(shù)實(shí)際上就是把block_data進(jìn)行Snappy壓縮(如果支持),然后包裝成完整的Block Data,并記錄一些統(tǒng)計(jì)信息。我用的Windows版LevelDB,代碼比較老,默認(rèn)不支持Snappy壓縮。4.Finish函數(shù)
Status TableBuilder::Finish() {
Rep* r = rep_;
// 為何要調(diào)用一次Flush,是因?yàn)檎{(diào)用Finish的時(shí)候,
// block_data不一定大于等于block_size,所以要調(diào)用Flush
// 將這部分block_data寫入到磁盤
Flush();
assert(!r->closed);
r->closed = true;
BlockHandle metaindex_block_handle;
BlockHandle index_block_handle;
if (ok()) {
// 我看的源碼不支持Meta Block,這里的meta_index_block也沒有實(shí)際作用
BlockBuilder meta_index_block(&r->options);
// metaindex_block_handle記錄了Meta Index Block的偏移和大小
WriteBlock(&meta_index_block, &metaindex_block_handle);
}
if (ok()) {
// 最后一個(gè)Data_Block,無(wú)法進(jìn)行r->last_key和key的比較,
// 所以只能調(diào)用FindShortSuccessor,直接取一個(gè)比r->last_key大的key
if (r->pending_index_entry) {
r->options.comparator->FindShortSuccessor(&r->last_key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
// index_block_handle記錄了Index Block的偏移和大小
WriteBlock(&r->index_block, &index_block_handle);
}
if (ok()) {
// 組建Footer,并添加到文件結(jié)尾
Footer footer;
footer.set_metaindex_handle(metaindex_block_handle);
footer.set_index_handle(index_block_handle);
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
r->status = r->file->Append(footer_encoding);
if (r->status.ok()) {
r->offset += footer_encoding.size();
}
}
return r->status;
}
分析Footer類的源碼可知,F(xiàn)ooter更詳細(xì)的結(jié)構(gòu)如下:
metaindex_handle: char[p]; // Block handle for metaindex
index_handle: char[q]; // Block handle for index
padding: char[40-p-q]; // 0 bytes to make fixed length
// (40==2*BlockHandle::kMaxEncodedLength)
magic: fixed64; // == 0xdb4775248b80fb57
注意,只在Finish的時(shí)候才調(diào)用WriteBlock給Index Block添加了type和crc,但是對(duì)于Data Block,每次寫入到磁盤都會(huì)調(diào)用一次WriteBlock。
Mate Index Block、Index Block和Footer應(yīng)該是在WritableFile析構(gòu)時(shí)被寫入到磁盤的,WritableFile析構(gòu)時(shí)會(huì)調(diào)用其Flush函數(shù)。
關(guān)于WritableFile,詳見:LevelDB源碼分析之九:env
三.Table的解析
leveldb通過(guò)Table類來(lái)解析每一個(gè).sst文件,Table類的成員變量也只有一個(gè)結(jié)構(gòu)體Rep* rep_,Rep的結(jié)構(gòu)為:
struct Table::Rep {
~Rep() {
delete index_block;
}
Options options;
Status status;
RandomAccessFile* file;
uint64_t cache_id;//block cache的ID,用于組建block cache結(jié)點(diǎn)的key
BlockHandle metaindex_handle; //用于存儲(chǔ)從footer中解析出的metaindex_handle
Block* index_block;
};
1.Open函數(shù)
Open函數(shù)比較簡(jiǎn)單,就是打開一個(gè)本地.sst文件,然后從文件尾部讀取footer,根據(jù)footer中的index_handle,調(diào)用ReadBock函數(shù)讀取index_block。接著對(duì)結(jié)構(gòu)體Rep賦值,并將其當(dāng)做參數(shù)傳給Table的構(gòu)造函數(shù)。
Status Table::Open(const Options& options,
RandomAccessFile* file,
uint64_t size,
Table** table) {
*table = NULL;
if (size < Footer::kEncodedLength) {
return Status::InvalidArgument("file is too short to be an sstable");
}
char footer_space[Footer::kEncodedLength];
Slice footer_input;
Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
&footer_input, footer_space);
if (!s.ok()) return s;
Footer footer;
s = footer.DecodeFrom(&footer_input);
if (!s.ok()) return s;
// Read the index block
Block* index_block = NULL;
if (s.ok()) {
s = ReadBlock(file, ReadOptions(), footer.index_handle(), &index_block);
}
if (s.ok()) {
// We've successfully read the footer and the index block: we're
// ready to serve requests.
Rep* rep = new Table::Rep;
rep->options = options;
rep->file = file;
rep->metaindex_handle = footer.metaindex_handle();
rep->index_block = index_block;
rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0);
*table = new Table(rep);
} else {
if (index_block) delete index_block;
}
return s;
}
2.ReadBlock函數(shù)
// 根據(jù)BlockHandle從file中讀取block_data,放在*block中
Status ReadBlock(RandomAccessFile* file,
const ReadOptions& options,
const BlockHandle& handle,
Block** block) {
*block = NULL;
// n是block_data的大小
size_t n = static_cast(handle.size());
// n + kBlockTrailerSize就是block_data+type+crc的大小
char* buf = new char[n + kBlockTrailerSize];
Slice contents;
// 根據(jù)Block的偏移讀取指定內(nèi)容
Status s = file->Read(handle.offset(), n + kBlockTrailerSize, &contents, buf);
if (!s.ok()) {
delete[] buf;
return s;
}
if (contents.size() != n + kBlockTrailerSize) {
delete[] buf;
return Status::Corruption("truncated block read");
}
// 從contents解析出crc并校驗(yàn),可以通過(guò)options.verify_checksums配置不校驗(yàn)
const char* data = contents.data();
if (options.verify_checksums) {
const uint32_t crc = crc32c::Unmask(DecodeFixed32(data + n + 1));
const uint32_t actual = crc32c::Value(data, n + 1);
if (actual != crc) {
delete[] buf;
s = Status::Corruption("block checksum mismatch");
return s;
}
}
// data[n]實(shí)際上就是type字段
switch (data[n]) {
case kNoCompression:
//
if (data != buf) {
// File implementation gave us pointer to some other data.
// Copy into buf[].
memcpy(buf, data, n + kBlockTrailerSize);
}
// Ok
break;
// 我用的源碼比較老,不涉及Snappy壓縮
case kSnappyCompression: {
size_t ulength = 0;
if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) {
delete[] buf;
return Status::Corruption("corrupted compressed block contents");
}
char* ubuf = new char[ulength];
if (!port::Snappy_Uncompress(data, n, ubuf)) {
delete[] buf;
delete[] ubuf;
return Status::Corruption("corrupted compressed block contents");
}
delete[] buf;
buf = ubuf;
n = ulength;
break;
}
default:
delete[] buf;
return Status::Corruption("bad block type");
}
*block = new Block(buf, n); // Block takes ownership of buf[]
return Status::OK();
}
3.NewIterator函數(shù)
NewIterator用于創(chuàng)建Table的迭代器,此迭代器是一個(gè)雙層迭代器,詳見:LevelDB源碼分析之十四:TwoLevelIterator
Iterator* Table::NewIterator(const ReadOptions& options) const {
return NewTwoLevelIterator(
rep_->index_block->NewIterator(rep_->options.comparator),
&Table::BlockReader, const_cast(this), options);
}
傳入的參數(shù)包括Index Block中block_data字段的迭代器和函數(shù)BlockReader的指針,該函數(shù)用于創(chuàng)建Data Block中block_data字段的迭代器。
4.BlockReader
// arg:Table的指針
// index_value:Data Block中block_data字段的迭代器的value值,
// 也就是Data Block的偏移和該Data Block的block_data字段大小
// 編碼后的結(jié)果
// return:Data Block中block_data字段的迭代器
Iterator* Table::BlockReader(void* arg,
const ReadOptions& options,
const Slice& index_value) {
Table* table = reinterpret_cast(arg);
Cache* block_cache = table->rep_->options.block_cache;
Block* block = NULL;
Cache::Handle* cache_handle = NULL;
BlockHandle handle;
Slice input = index_value;
// 解碼得到Data Block的偏移和該Data Block的block_data字段大小
Status s = handle.DecodeFrom(&input);
if (s.ok()) {
// 1.如果Block Cache不為NULL,先去Block Cache中查找結(jié)點(diǎn),
// 如果沒找到,再去文件中讀取Data Block的block_data字段,
// 并將該block_data插入到Block Cache
// 2.如果Block Cache為NULL,直接去文件里讀
if (block_cache != NULL) {
// 組建key
char cache_key_buffer[16];
EncodeFixed64(cache_key_buffer, table->rep_->cache_id);
EncodeFixed64(cache_key_buffer+8, handle.offset());
Slice key(cache_key_buffer, sizeof(cache_key_buffer));
cache_handle = block_cache->Lookup(key);
if (cache_handle != NULL) {
block = reinterpret_cast(block_cache->Value(cache_handle));
} else {
s = ReadBlock(table->rep_->file, options, handle, &block);
if (s.ok() && options.fill_cache) {
cache_handle = block_cache->Insert(
key, block, block->size(), &DeleteCachedBlock);
}
}
} else {
s = ReadBlock(table->rep_->file, options, handle, &block);
}
}
Iterator* iter;
// 如果block讀取成功
if (block != NULL) {
iter = block->NewIterator(table->rep_->options.comparator);
// iter->RegisterCleanup函數(shù)實(shí)現(xiàn)會(huì)有點(diǎn)繞,被它注冊(cè)的函數(shù)會(huì)在iter析構(gòu)時(shí)被調(diào)用
// 如果block_cache為NULL,說(shuō)明block不在緩存中,iter析構(gòu)時(shí)調(diào)用DeleteBlock刪除這個(gè)block。
// 否則調(diào)用ReleaseBlock使block_cache的cache_handle結(jié)點(diǎn)減少一個(gè)引用計(jì)數(shù)
if (cache_handle == NULL) {
iter->RegisterCleanup(&DeleteBlock, block, NULL);
} else {
iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
}
} else {
// 否則返回錯(cuò)誤
iter = NewErrorIterator(s);
}
return iter;
}
5.ApproximateOffsetOf
這個(gè)函數(shù)用于估算key值所在記錄的偏移,不準(zhǔn)確。代碼中的注釋是我的個(gè)人理解,也不知對(duì)不對(duì)。只有看到了調(diào)用該函數(shù)的代碼,才能更深入的理解。
// 估算key的偏移,不準(zhǔn)確
uint64_t Table::ApproximateOffsetOf(const Slice& key) const {
Iterator* index_iter =
rep_->index_block->NewIterator(rep_->options.comparator);
index_iter->Seek(key);
uint64_t result;
if (index_iter->Valid()) {
BlockHandle handle;
Slice input = index_iter->value();
Status s = handle.DecodeFrom(&input);
//
if (s.ok()) {
// result比key的真實(shí)偏移小
result = handle.offset();
} else {
// Strange: we can't decode the block handle in the index block.
// We'll just return the offset of the metaindex block, which is
// close to the whole file size for this case.
// result比key的真實(shí)偏移大
result = rep_->metaindex_handle.offset();
}
} else {
// key is past the last key in the file. Approximate the offset
// by returning the offset of the metaindex block (which is
// right near the end of the file).
// result比key的真實(shí)偏移小
result = rep_->metaindex_handle.offset();
}
delete index_iter;
return result;
}
參考鏈接:http://catkang.github.io/2017/01/17/leveldb-data.html
本站聲明: 本文章由作者或相關(guān)機(jī)構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點(diǎn),本站亦不保證或承諾內(nèi)容真實(shí)性等。需要轉(zhuǎn)載請(qǐng)聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請(qǐng)及時(shí)聯(lián)系本站刪除。
換一批
延伸閱讀
倫敦2024年8月29日 /美通社/ -- 英國(guó)汽車技術(shù)公司SODA.Auto推出其旗艦產(chǎn)品SODA V,這是全球首款涵蓋汽車工程師從創(chuàng)意到認(rèn)證的所有需求的工具,可用于創(chuàng)建軟件定義汽車。 SODA V工具的開發(fā)耗時(shí)1.5...
關(guān)鍵字:
汽車
人工智能
智能驅(qū)動(dòng)
BSP
要點(diǎn): 有效應(yīng)對(duì)環(huán)境變化,經(jīng)營(yíng)業(yè)績(jī)穩(wěn)中有升 落實(shí)提質(zhì)增效舉措,毛利潤(rùn)率延續(xù)升勢(shì) 戰(zhàn)略布局成效顯著,戰(zhàn)新業(yè)務(wù)引領(lǐng)增長(zhǎng) 以科技創(chuàng)新為引領(lǐng),提升企業(yè)核心競(jìng)爭(zhēng)力 堅(jiān)持高質(zhì)量發(fā)展策略,塑強(qiáng)核心競(jìng)爭(zhēng)優(yōu)勢(shì)...
關(guān)鍵字:
通信
BSP
電信運(yùn)營(yíng)商
數(shù)字經(jīng)濟(jì)