考慮到移植以及靈活性,LevelDB將系統(tǒng)相關的處理(文件/進程/時間)抽象成Evn,用戶可以自己實現(xiàn)相應的接口,作為option的一部分傳入,默認使用自帶的實現(xiàn)。?
env.h中聲明了:
虛基類env,在env_posix.cc中,派生類PosixEnv繼承自env類,是LevelDB的默認實現(xiàn)。虛基類WritableFile、SequentialFile、RandomAccessFile,分別是文件的寫抽象類,順序讀抽象類和隨機讀抽象類類Logger,log文件的寫入接口,log文件是防止系統(tǒng)異常終止造成數(shù)據(jù)丟失,是memtable在磁盤的備份類FileLock,為文件上鎖WriteStringToFile、ReadFileToString、Log三個全局函數(shù),封裝了上述接口
下面來看看env_posix.cc中為我們寫好的默認實現(xiàn)
順序讀:
class?PosixSequentialFile:?public?SequentialFile?{ ?private: ??std::string?filename_; ??FILE*?file_; ?public: ??PosixSequentialFile(const?std::string&?fname,?FILE*?f) ????:?filename_(fname),?file_(f)?{?} ??virtual?~PosixSequentialFile()?{?fclose(file_);?} ??//?從文件中讀取n個字節(jié)存放到?"scratch[0..n-1]",?然后將"scratch[0..n-1]"轉(zhuǎn)化為Slice類型并存放到*result中 ??//?如果正確讀取,則返回OK?status,否則返回non-OK?status ??virtual?Status?Read(size_t?n,?Slice*?result,?char*?scratch)?{ ??Status?s; #ifdef?BSD ??//?fread_unlocked?doesn't?exist?on?FreeBSD ??size_t?r?=?fread(scratch,?1,?n,?file_); #else ??//?size_t?fread_unlocked(void?*ptr,?size_t?size,?size_t?n,FILE?*stream); ??//?ptr:用于接收數(shù)據(jù)的內(nèi)存地址 ??//?size:要讀的每個數(shù)據(jù)項的字節(jié)數(shù),單位是字節(jié) ??//?n:要讀n個數(shù)據(jù)項,每個數(shù)據(jù)項size個字節(jié) ??//?stream:輸入流 ??//?返回值:返回實際讀取的數(shù)據(jù)大小 ??//?因為函數(shù)名帶了"_unlocked"后綴,所以它不是線程安全的 ??size_t?r?=?fread_unlocked(scratch,?1,?n,?file_); #endif ??//?Slice的第二個參數(shù)要用實際讀到的數(shù)據(jù)大小,因為讀到文件尾部,剩下的字節(jié)數(shù)可能小于n ??*result?=?Slice(scratch,?r); ??if?(r?<?n)?{ ????if?(feof(file_))?{ ????//?We?leave?status?as?ok?if?we?hit?the?end?of?the?file ????//?如果r<n,且feof(file_)非零,說明到了文件結(jié)尾,什么都不用做,函數(shù)結(jié)束后會返回OK?Status ????}?else?{ ????//?A?partial?read?with?an?error:?return?a?non-ok?status ????//?否則返回錯誤信息 ????s?=?Status::IOError(filename_,?strerror(errno)); ????} ??} ??return?s; ??} ??//?跳過n字節(jié)的內(nèi)容,這并不比讀取n字節(jié)的內(nèi)容慢,而且會更快。 ??//?如果到達了文件尾部,則會停留在文件尾部,并返回OK?Status。 ??//?否則,返回錯誤信息 ??virtual?Status?Skip(uint64_t?n)?{ ???//?int?fseek(FILE?*stream,?long?offset,?int?origin); ???//?stream:文件指針 ???//?offset:偏移量,整數(shù)表示正向偏移,負數(shù)表示負向偏移 ???//?origin:設定從文件的哪里開始偏移,?可能取值為:SEEK_CUR、?SEEK_END?或?SEEK_SET ???//?SEEK_SET:?文件開頭 ???//?SEEK_CUR:?當前位置 ???//?SEEK_END:?文件結(jié)尾 ???//?其中SEEK_SET,?SEEK_CUR和SEEK_END和依次為0,1和2. ???//?舉例: ???//?fseek(fp,?100L,?0);?把fp指針移動到離文件開頭100字節(jié)處; ???//?fseek(fp,?100L,?1);?把fp指針移動到離文件當前位置100字節(jié)處; ???//?fseek(fp,?100L,?2);?把fp指針退回到離文件結(jié)尾100字節(jié)處。 ???//?返回值:成功返回0,失敗返回非0 ??if?(fseek(file_,?n,?SEEK_CUR))?{ ????return?Status::IOError(filename_,?strerror(errno)); ??} ??return?Status::OK(); ??} };
這就是LevelDB從磁盤順序讀取文件的接口了,用的是C的流文件操作和FILE結(jié)構(gòu)體。需要注意的是Read接口讀取文件時不會鎖住文件流,因此外部的并發(fā)訪問需要自行提供并發(fā)控制。
隨機讀:
class?PosixRandomAccessFile:?public?RandomAccessFile?{ ?private: ??std::string?filename_; ??int?fd_; ??mutable?boost::mutex?mu_; ?public: ??PosixRandomAccessFile(const?std::string&?fname,?int?fd) ????:?filename_(fname),?fd_(fd)?{?} ??virtual?~PosixRandomAccessFile()?{?close(fd_);?} ??//?這里與順序讀的同名函數(shù)相比,多了一個參數(shù)offset,offset用來指定 ??//?讀取位置距離文件起始位置的偏移量,這樣就可以實現(xiàn)隨機讀了。 ??virtual?Status?Read(uint64_t?offset,?size_t?n,?Slice*?result, ????????????char*?scratch)?const?{ ????Status?s; #ifdef?WIN32 ????//?no?pread?on?Windows?so?we?emulate?it?with?a?mutex ????boost::unique_locklock(mu_); ????if?(::_lseeki64(fd_,?offset,?SEEK_SET)?==?-1L)?{ ??????return?Status::IOError(filename_,?strerror(errno)); ????} //?int?_read(int?_FileHandle,?void?*?_DstBuf,?unsigned?int?_MaxCharCount) //?_FileHandle:文件描述符 //?_DstBuf:保存讀取數(shù)據(jù)的緩沖區(qū) //?_MaxCharCount:讀取的字節(jié)數(shù) //?返回值:成功返回讀取的字節(jié)數(shù),出錯返回-1并設置errno。 ????int?r?=?::_read(fd_,?scratch,?n); ????*result?=?Slice(scratch,?(r?<?0)???0?:?r); ????lock.unlock(); #else //?在非windows系統(tǒng)上使用pread進行隨機讀,為何此時不用鎖呢?詳見下文分析 ????ssize_t?r?=?pread(fd_,?scratch,?n,?static_cast(offset)); ????*result?=?Slice(scratch,?(r?<?0)???0?:?r); #endif ????if?(r?<?0)?{ ??????//?An?error:?return?a?non-ok?status ??????s?=?Status::IOError(filename_,?strerror(errno)); ????} ????return?s; ??} };
可以看到的是,PosixRandomAccessFile 在非windows系統(tǒng)上使用了 pread 來實現(xiàn)原子的定位加訪問功能。常規(guī)的隨機訪問文件的過程可以分為兩步,fseek (seek) 定位到訪問點,調(diào)用 fread (read) 來從特定位置開始訪問 FILE* (fd)。然而,這兩個操作組合在一起并不是原子的,即 fseek 和 fread 之間可能會插入其他線程的文件操作。相比之下 pread 由系統(tǒng)來保證實現(xiàn)原子的定位和讀取組合功能。需要注意的是,pread 操作不會更新文件指針。
需要注意的是,在隨機讀和順序讀中,分別用fd和FILE *來表示一個文件。文件描述符(file descriptor)是系統(tǒng)層的概念, fd 對應于系統(tǒng)打開文件表里面的一個文件;FILE* 是應用層的概念,其包含了應用層操作文件的數(shù)據(jù)結(jié)構(gòu)。
順序?qū)懀?/p>
class?BoostFile?:?public?WritableFile?{ public: ??explicit?BoostFile(std::string?path)?:?path_(path),?written_(0)?{ ????Open(); ??} ??virtual?~BoostFile()?{ ????Close(); ??} private: ??void?Open()?{ ????//?we?truncate?the?file?as?implemented?in?env_posix ?//?trunc:先將文件中原有的內(nèi)容清空 ?//?out:為輸出(寫)而打開文件 ?//?binary:以二進制方式打開文件 ?????file_.open(path_.generic_string().c_str(),? ?????????std::ios_base::trunc?|?std::ios_base::out?|?std::ios_base::binary); ?????written_?=?0; ??} public: ??virtual?Status?Append(const?Slice&?data)?{ ????Status?result; ????file_.write(data.data(),?data.size()); ????if?(!file_.good())?{ ??????result?=?Status::IOError( ??????????path_.generic_string()?+?"?Append",?"cannot?write"); ????} ????return?result; ??} ??virtual?Status?Close()?{ ????Status?result; ????try?{ ??????if?(file_.is_open())?{ ????????Sync(); //?關閉流時,緩沖區(qū)中的數(shù)據(jù)會自動寫入到文件 //?上面調(diào)用Sync()強制刷新,是為了確保數(shù)據(jù)寫入,防止數(shù)據(jù)丟失 ????????file_.close(); ??????} ????}?catch?(const?std::exception?&?e)?{ ??????result?=?Status::IOError(path_.generic_string()?+?"?close",?e.what()); ????} ????return?result; ??} ??virtual?Status?Flush()?{ ????file_.flush(); ????return?Status::OK(); ??} ??//?手動刷新(清空輸出緩沖區(qū),并把緩沖區(qū)內(nèi)容同步到文件) ??virtual?Status?Sync()?{ ????Status?result; ????try?{ ??????Flush(); ????}?catch?(const?std::exception?&?e)?{ ??????result?=?Status::IOError(path_.string()?+?"?sync",?e.what()); ????} ????return?result; ??} private: ??boost::filesystem::path?path_; ??boost::uint64_t?written_; ??std::ofstream?file_; };
關于ofstream::flush和ofstream::Close的區(qū)別,詳見:C++之ofstream::flush與ofstream::close
文件鎖:
class?BoostFileLock?:?public?FileLock?{ ?public: ??boost::interprocess::file_lock?fl_; };
virtual?Status?LockFile(const?std::string&?fname,?FileLock**?lock)?{ ????*lock?=?NULL; ????Status?result; ????try?{ ??????if?(!boost::filesystem::exists(fname))?{ ????????std::ofstream?of(fname,?std::ios_base::trunc?|?std::ios_base::out); ??????} ??????assert(boost::filesystem::exists(fname)); ??????boost::interprocess::file_lock?fl(fname.c_str()); ??????BoostFileLock?*?my_lock?=?new?BoostFileLock(); ??????my_lock->fl_?=?std::move(fl); ??????if?(my_lock->fl_.try_lock()) ????????*lock?=?my_lock; ??????else ????????result?=?Status::IOError("acquiring?lock?"?+?fname?+?"?failed"); ????}?catch?(const?std::exception?&?e)?{ ??????result?=?Status::IOError("lock?"?+?fname,?e.what()); ????} ????return?result; ??}
?virtual?Status?UnlockFile(FileLock*?lock)?{ ????Status?result; ????try?{ ??????BoostFileLock?*?my_lock?=?static_cast(lock); ??????my_lock->fl_.unlock(); ??????delete?my_lock; ????}?catch?(const?std::exception?&?e)?{ ??????result?=?Status::IOError("unlock",?e.what()); ????} ????return?result; ??}
文件的鎖操作是調(diào)用Boost的鎖實現(xiàn)的。加鎖是為了防止多進程的并發(fā)沖突,如果加鎖失敗,*lock=NULL,且返回non-OK;如果加鎖成功,*lock存放的的是鎖的指針,并返回OK。如果進程退出,鎖會自動釋放,否則用戶需要調(diào)用UnlockFile顯式的釋放鎖。
這幾個方法都非常簡單,比較晦澀的是這句:my_lock->std::move(f1),從函數(shù)名來看,是要移動f1。其實std::move是C++11標準庫在
計劃任務:
PosixEnv還有一個很重要的功能,計劃任務,也就是后臺的compaction線程。compaction就是壓縮合并的意思,在LevelDB源碼分析之六:skiplist(2)中也有提到。對于LevelDB來說,寫入記錄操作很簡單,刪除記錄僅僅寫入一個刪除標記就算完事,但是讀取記錄比較復雜,需要在內(nèi)存以及各個層級文件中依照新鮮程度依次查找,代價很高。為了加快讀取速度,LevelDB采取了compaction的方式來對已有的記錄進行整理壓縮,通過這種方式,來刪除掉一些不再有效的KV數(shù)據(jù),減小數(shù)據(jù)規(guī)模,減少文件數(shù)量等。
PosixEnv中定義了一個任務隊列:
??struct?BGItem?{?void*?arg;?void?(*function)(void*);?}; ??//用的是deque雙端隊列作為底層的數(shù)據(jù)結(jié)構(gòu) ??typedef?std::dequeBGQueue; ??BGQueue?queue_;
主線程一旦判定需要進行compaction操作,就把compaction任務壓進隊列queue_中,BGItem是存有任務函數(shù)和db對象指針的結(jié)構(gòu)。而后臺線程從一開始就不斷根據(jù)隊列中的函數(shù)指針執(zhí)行compaction任務。BGThread()函數(shù)就是不停的在queue_中取出函數(shù)指針,執(zhí)行。
后臺進程一直執(zhí)行queue_中的任務,由于queue_是動態(tài)的,自然需要考慮queue_空了怎么辦,LevelDB采用的是條件變量boost::condition_variable bgsignal_,隊列空了就進入等待,直至有新的任務加入進來。而條件變量一般是要和boost::mutex mu_搭配使用,防止某些邏輯錯誤。
?//?BGThread函數(shù)的包裝,里面調(diào)用的就是BGThread函數(shù) ??static?void*?BGThreadWrapper(void*?arg)?{ ????reinterpret_cast(arg)->BGThread(); ????return?NULL; ??}
void?PosixEnv::Schedule(void?(*function)(void*),?void*?arg)?{ ??boost::unique_locklock(mu_); ??//?Start?background?thread?if?necessary ??if?(!bgthread_)?{ ?????bgthread_.reset( ?????????new?boost::thread(boost::bind(&PosixEnv::BGThreadWrapper,?this))); ??} ??//?Add?to?priority?queue ??//?將任務壓進隊列中 ??queue_.push_back(BGItem()); ??queue_.back().function?=?function; ??queue_.back().arg?=?arg; ??lock.unlock(); ??bgsignal_.notify_one(); }
void?PosixEnv::BGThread()?{ ??while?(true)?{ ??//?加鎖,防止并發(fā)沖突 ??boost::unique_locklock(mu_); ??//?如果隊列為空,等待,直到收到通知(notification) ??while?(queue_.empty())?{ ????bgsignal_.wait(lock); ??} ??//?從隊列頭取出任務的函數(shù)及其參數(shù) ??void?(*function)(void*)?=?queue_.front().function; ??void*?arg?=?queue_.front().arg; ??queue_.pop_front(); ??lock.unlock(); ??//?調(diào)用函數(shù) ??(*function)(arg); ??} }
此外PosixEnv中還有FileExists、GetChildren、DeleteFile、CreateDir、DeleteDir、GetFileSize、RenameFile等等函數(shù),他們見名知義,都是調(diào)用Boot的相應函數(shù)實現(xiàn)的。
EnvWrapper:
在levelDB中還實現(xiàn)了一個EnvWrapper類,該類繼承自Env,且只有一個成員函數(shù)Env* target_,該類的所有變量都調(diào)用Env類相應的成員變量,我們知道,Env是一個抽象類,是不能定義Env 類型的對象的。我們傳給EnvWrapper 的構(gòu)造函數(shù)的類型是PosixEnv,所以,最后調(diào)用的都是PosixEnv類的成員變量,你可能已經(jīng)猜到了,這就是設計模式中的代理模式,EnvWrapper只是進行了簡單的封裝,它的代理了Env的子類PosixEnv。
EnvWrapper和Env與PosixEnv的關系如下:
由于篇幅限制,Env中的Logger類就放在后面分析了,參考:LevelDB源碼分析之十:LOG文件,從env給我的收獲就是:
利用虛基類的特性提供了默認的實現(xiàn),也開放了用戶自定義操作的權(quán)限面向?qū)ο缶幊谭妒降膶W習,把一切操作定義成類文件的加鎖解鎖,線程的同步C的文件流操作,對文件名的字符提取操作,創(chuàng)建、刪除文件和路徑,這些都可以直接用到將來自己的項目中參考.