LevelDB阅读笔记-Env
LevelDB与操作系统交互接口:Env
LevelDB中的Env
类是一个上层逻辑与操作系统进行交互的接口,由于LevelDB以文件方式对数据进行持久化,并且需要读取文件来完成一次Get
操作,上述过程都涉及到对文件的操作。但不同操作系统的文件系统调用接口不同,因此LevelDB定义了Env
类来屏蔽上述操作系统的实现差异,向上层的LevelDB逻辑提供统一的接口。
上图表示了Env
在整个系统架构中位置,包括了以下几部分:
- 定义了
SequentialFile
等几类具有不同读写行为的文件接口。 - 提供了和文件系统交互的其他接口,例如对文件重命名,创建文件夹等,上图未收录这部分。
- 提供了和线程控制相关的接口,例如
Schedule
用于向后台线程传递一个执行的函数。
抽象基类Env
Env
是一个抽象基类,它屏蔽了操作系统之间的实现差异,并向上层的leveldb逻辑提供统一的操作接口。总体上看,其接口可分为两类:文件系统相关与线程调度相关。
文件系统相关接口
文件抽象
LevelDB定义了三类不同的文件类型,即三种文件抽象基类,用于表示具有不同功能的文件抽象。
RandomAccessFile
, 该类文件允许访问任意偏移位置的任意字节数量的数据, 并且该类是只读的,因此可以允许多个线程同时访问同一个文件。class LEVELDB_EXPORT RandomAccessFile { public: // 析构函数采用默认,拷贝赋值函数和拷贝构造函数是delete的,下同 // 在offset位置读取n字节的数据,拷贝到scratch所指示的内存位置,result // 指明了读取的结果,即result->data()指向读取到的数据的首地址; // result->size()说明了读取到数据的长度,长度不一定为n virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const = 0; };
SequentialFile
是一个只允许顺序读的文件类,即一旦经过某个偏移位置,就不能再回过头来读这个位置的数据了。该类提供了Skip
函数来跳过接下来指定字节的数据,提供了Read
接口来读取当前位置的指定字节的数据。与RandomAccessFile
类似,该文件是只读的,但是并发访问需要外部锁的同步,原因在于Skip()
会修改当前位置指针。// A file abstraction for reading sequentially through a file class LEVELDB_EXPORT SequentialFile { public: // Read不同于RandomAccessFile, 不用指定offset,因为总是从“当前”位置开始读 // REQUIRES: External synchronization virtual Status Read(size_t n, Slice* result, char* scratch) = 0; // REQUIRES: External synchronization virtual Status Skip(uint64_t n) = 0; };
WritableFile
是一类只允许顺序写的文件类。并且为了提升写的效率,leveldb要求任何对该抽象类的实现都应该提供buffer,而不是每次都直接写入文件。类似于SequentialFile
, 该类也不是线程安全的,需要提供外部同步。WritableFile
的四个接口可以从名字上就看出其含义,值得注意的是Flush
和Sync
的不同,Flush
是指将数据从该类内置的buffer中写入文件,Sync
是指刷新该底层文件对应的文件系统cache,从而实现真正的落盘,换言之,如果Sync
调用成功,则在该sync调用之前的所有Append
操作的数据都保证是持久化的。class LEVELDB_EXPORT WritableFile { public: virtual Status Append(const Slice& data) = 0; virtual Status Close() = 0; virtual Status Flush() = 0; virtual Status Sync() = 0; };
FileLock
逻辑上可以视作一个锁,但该类并不提供锁的Lock() & Unlock()
接口,而是通过Env
的抽象接口LockFile()
返回一个FileLock
的Handle, 表示对指定文件上锁;然后通过将该Handle传递给UnlockFile()
来实现对文件的解锁。该类没有提供公共接口和内置的数据成员,但一个直觉的实现是保存指定文件元数据,如文件名等,以便
Env
能做出正确的操作。class LEVELDB_EXPORT FileLock { public: FileLock() = default; FileLock(const FileLock&) = delete; FileLock& operator=(const FileLock&) = delete; virtual ~FileLock(); };
文件的创建和并发控制
Env
类提供了对上述几类文件抽象的创建接口:
class Env {
virtual Status NewSequentialFile(const std::string& fname,
SequentialFile** result) = 0;
virtual Status NewRandomAccessFile(const std::string& fname,
RandomAccessFile** result) = 0;
virtual Status NewWritableFile(const std::string& fname,
WritableFile** result) = 0;
virtual Status NewAppendableFile(const std::string& fname,
WritableFile** result);
// If somebody else already holds the lock, finishes immediately
// with a failure. I.e., this call does not wait for existing locks
// to go away.
//
// May create the named file if it does not already exist.
virtual Status LockFile(const std::string& fname, FileLock** lock) = 0;
// Release the lock acquired by a previous successful call to LockFile.
// REQUIRES: lock was returned by a successful LockFile() call
// REQUIRES: lock has not already been unlocked.
virtual Status UnlockFile(FileLock* lock) = 0;
}
统一格式是提供创建文件名,并通过文件类指针进行返回,接口的返回值表明该次调用的结果,包括错误码等。
这几个接口名称稍微具有误导性,我们进行简单的解释:
- 对于只读的文件类型,
SequentialFile
和RandomAccessFile
,对应的New接口并不是在文件系统中创建指定名称的文件,而是打开已经存在的文件。 NewWritableFile
创建一个具有指定名称的WritableFile
,如果该文件已经存在,则覆盖它。与之对应的是NewAppendableFile
, 虽然它们都是返回一个WritableFile
,但是后者对于已经存在的文件会从文件末的位置继续写。NewAppendableFile
没有被定义为纯虚函数,因此其派生类严格来讲不需要override该函数,根据注释的说明,这是因为部分文件系统不支持向已存在的文件末尾append数据。
Env
还提供了LockFile
和UnlockFile
接口来实现并发控制:
LockFile
用于对指定文件上锁,如果成功,返回一个FileLock
的handle;如果失败,例如该文件已经上锁,则会立刻返回错误,因此该接口的实现必须是非阻塞的
文件系统元数据操作
文件系统元数据操作包括了一些文件系统的常用系统调用的封装,例如创建或删除文件夹,重命名文件,检测文件是否存在,罗列文件夹下的文件等等。
class Env {
// 检测指定名称的文件是否存在
virtual bool FileExists(const std::string& fname) = 0;
// 罗列指定文件夹下的文件名
virtual Status GetChildren(const std::string& dir,
std::vector<std::string>* result) = 0;
// 删除指定文件
virtual Status RemoveFile(const std::string& fname);
// 创建指定名称的文件夹
virtual Status CreateDir(const std::string& dirname) = 0;
// 删除指定名称的文件夹
virtual Status RemoveDir(const std::string& dirname);
// 获得指定名称的文件大小
virtual Status GetFileSize(const std::string& fname, uint64_t* file_size) = 0;
// 重命名文件
virtual Status RenameFile(const std::string& src,
const std::string& target) = 0;
}
线程相关操作
Env
提供了线程相关的操作,接口如下:
class Env {
virtual void Schedule(void (*function)(void* arg), void* arg) = 0;
// Start a new thread, invoking "function(arg)" within the new thread.
// When "function(arg)" returns, the thread will be destroyed.
virtual void StartThread(void (*function)(void* arg), void* arg) = 0;
// Sleep/delay the thread for the prescribed number of micro-seconds.
virtual void SleepForMicroseconds(int micros) = 0;
}
但由于C++11开始,标准库中已经支持了线程模型,所以没有必要再根据不同的操作系统为Env
实现这些函数,所以我认为这些函数都没有必要被设置为纯虚函数,使用虚函数即可。
基于Posix的Env实现:PosixEnv
Env
只是提供了抽象接口,还需要根据不同的操作系统特性进行对应的实现。由于服务器通常比较多使用Linux系统,因此我们只谈谈PosixEnv
的实现,它和Env
的关系大致可以用下面的UML图表示:
下面我们描述一下对应的实现。
文件系统相关接口实现
Posix文件对抽象类的实现
PosixSequentialFile
class PosixSequentialFile final : public SequentialFile {
public:
PosixSequentialFile(std::string filename, int fd)
: fd_(fd), filename_(filename) {}
~PosixSequentialFile() override { close(fd_); }
Status Read(size_t n, Slice* result, char* scratch) override {
Status status;
while (true) {
::ssize_t read_size = ::read(fd_, scratch, n);
if (read_size < 0) {
// 错误处理
}
*result = Slice(scratch, read_size);
break;
}
return status;
}
Status Skip(uint64_t n) override {
if (::lseek(fd_, n, SEEK_CUR) == static_cast<off_t>(-1)) {
return PosixError(filename_, errno);
}
return Status::OK();
}
private:
const int fd_;
const std::string filename_;
};
一个PosixSequentialFile
只包含了文件名和文件描述符(fd)。并通过对文件描述符操作来实现对应接口。例如,通过::read
来实现Read()
接口,通过lseek
来移动文件指针,实现Skip()
。
PosixRandomAccessFile
class PosixRandomAccessFile final : public RandomAccessFile {
private:
const bool has_permanent_fd_; // If false, the file is opened on every read.
const int fd_; // -1 if has_permanent_fd_ is false.
Limiter* const fd_limiter_;
const std::string filename_;
}
PosixRandomAccessFile
实现了RandomAccessFile
接口。其内部的数据结构包含了对应的文件名以及文件描述符,此外还包含了一个Limiter
对象用于保护并发控制。
Limiter的作用:由于RandomAccessFile
支持多读,因此可能有多个RandomAccessFile
的实例指向同一个fd,但这种情况不利于并发控制。因为多个线程可能在调用Read
函数时并发地修改同一个fd的内部文件指针, 这是不被允许的行为。为了解决这个问题,PosixRandomAccessFile
规定只能有一个实例拥有指定fd的所有权(Ownership), 其他实例如果需要读取相同的文件,需要实时地调用open打开文件,读取内容,然后使用close关闭文件,从而保证是在不同的fd上读取文件。为了保证仅有一个实例持有该fd的所有权,需要使用一个外部的Limiter对象。
构造函数:
PosixRandomAccessFile(std::string filename, int fd, Limiter* fd_limiter) : has_permanent_fd_(fd_limiter->Acquire()), fd_(has_permanent_fd_ ? fd : -1), fd_limiter_(fd_limiter), filename_(std::move(filename)) { if (!has_permanent_fd_) { assert(fd_ == -1); ::close(fd); // The file will be opened on every read. } }
Read
接口实现:我们描述了Read实现最核心的流程,而忽略了错误处理等细节。Read根据当前实例是否拥有fd_的所有权进行不同的处理:如果不拥有则需要每次Read时都打开文件读取数据。if (!has_permanent_fd_) { fd = ::open(file_name, O_RDONLY); } else { fd = fd_; } ::pread(fd, /*store=*/scratch, /*offset=*/offset, /*num=*/n); if (!has_permanent_fd_) { ::close(fd); }
PosixMmapReadableFile
利用Posix的mmap
接口,操作系统可以将文件内容映射到指定的内存区域,因此对该内存区域的操作就等同于对文件的操作。通过该技术可以大大简化PosixRandomAccessFile
中直接对fd进行操作的流程。
class PosixMmapReadableFile final : public RandomAccessFile {
PosixMmapReadableFile(std::string filename, char* mmap_base, size_t length,
Limiter* mmap_limiter)
: mmap_base_(mmap_base),
length_(length),
mmap_limiter_(mmap_limiter),
filename_(std::move(filename)) {}
~PosixMmapReadableFile() override {
::munmap(static_cast<void*>(mmap_base_), length_);
mmap_limiter_->Release();
}
private:
char* const mmap_base_;
const size_t length_;
Limiter* const mmap_limiter_;
const std::string filename_;
};
PosixMmapReadableFile
负责管理一块已经被映射的文件的内容,因此,mmap的调用需要在创建一个该类实例之前进行。一旦创建完成,则[mmap_base_, mmap_base_ + length_)
所标识的内存区域就完全等同于对应文件的内容。
PosixMmapReadableFile
通过直接返回标识区域中指定位置的Slice来实现Read()
接口。
PosixWritableFile
PosixWritableFile
的实现相比其他几类文件更加负责,因为它涉及到数据写入,以及与之相关的持久化问题。该类的描述如下所示,这里省略了WritableFile
定义的接口。
buf_
用于作为数据写入的缓冲,正如WritableFile
接口中描述的那样,为了性能考虑,不能每次写入都直接写入文件,而是需要先写入缓冲区,做数据的聚集。缓冲区的大小为64KB。pos_
用于记录当前缓冲区中最后的位置。即,buf_[0..pos_]
标识了当前缓冲区中有效的数据。下次写入需要从pos_
的位置开始向后append数据。is_manifest
标识该文件是否是一个manifest,manifest的意思是用于整个数据库系统控制的元数据。在后面看Version的时候我们会再说。
下面我们看看几个接口的实现:
首先是
Append
, 我们仍然采用接近伪代码的描述方式:Status Append(const Slice& data) override { // 1. Write as many data as possible into Buffer std::memcpy(buf_ + pos_, write_data, copy_size); if (no remaining data) { return Ok; } FlushBuffer(); if (remaining data can be written into buffer) { // Write remaining data into buffer std::memcpy(buf_, write_data, write_size); return Ok; } // Else directly write remaining data into file return WriteUnbuffered(write_data, write_size); }
Close
直接调用Posix的close
函数关闭指定文件,Flush
函数直接调用FlushBuffer
函数。Sync
函数首先处理当前文件为MANIFEST的特殊情况:Status status = SyncDirIfManifest(); if (!status.ok()) { return status; }
SyncDirIfManifest
的大概逻辑是查看当前file是否为manifest,如果是对其所在的directory进行sync操作。但我还没有弄清楚这里的逻辑为什么要这么实现。
然后刷新buffer并与文件系统同步:
status = FlushBuffer();
if (!status.ok()) {
return status;
}
return SyncFd(fd_, filename_);
而SyncFd
主要是调用fsync, fcntl
等系统调用确保写入数据的durability。
其他私有成员或者静态成员的实现这里不再赘述,事实上它们的实现也相当自然。
PosixFileLock
class PosixFileLock : public FileLock {
public:
PosixFileLock(int fd, std::string filename)
: fd_(fd), filename_(std::move(filename)) {}
int fd() const { return fd_; }
const std::string& filename() const { return filename_; }
private:
const int fd_;
const std::string filename_;
};
PosixFileLock
仅提供了保存被锁文件的文件名和描述符的功能。实际对并发控制的实现是在PosixEnv
中实现的PosixLockTable
, 这将在后面的文件系统控制部分进行描述。
Posix文件对象的创建
PosixEnv
实现了NewXXXFile
接口,它们的统一逻辑大概如下:
class PosixEnv : public Env {
Status NewXXXFile(const std::string& filename, XXXFile** result) override {
int fd = ::open(filename.c_str(), flags, mode);
// Deal with error
*result = new XXXFile(filename, fd);
return Ok;
}
}
大部分情况下,不同的接口仅在::open
调用中的flag和mode位置有所不同,例如:
File Type | flags | mode |
---|---|---|
SequentialFile |
O_RDONLY |kOpenBaseFlags |
Default |
RandomAccessFile |
O_RDONLY | kOpenBaseFlags |
Default |
WritableFile |
O_TRUNC | O_WRONLY | O_CREAT | kOpenBaseFlags |
0644 |
AppendableFile |
O_APPEND | O_WRONLY | O_CREAT | kOpenBaseFlags |
0644 |
(注:kOpenBaseFlags = O_CLOEXEC
,用于添加并发访问的支持)
上述四种文件类型中,NewRandomAccessFile
有所不同,由于有两种文件都支持该接口,因此需要在创建时考虑创建哪一种。LevelDB的策略是,尽可能使用PosixMmapReadableFile
,如果limiter不够,才创建RandomAccessFile
。上述策略的理由是,创建PosixMmapReadableFile
只需要一次mmap
系统调用,之后每次都可以从映射的内存段读数据,无系统调用开销;而后者则需要每次读都调用一次read
。对于RandomAccessFile
这种只读的文件类型而言,显然是采用mmap的方式能获得更好的性能。
if (!mmap_limiter_.Acquire()) {
*result = new PosixRandomAccessFile(filename, fd, &fd_limiter_);
return Status::OK();
}
uint64_t file_size;
Status status = GetFileSize(filename, &file_size);
if (status.ok()) {
void* mmap_base =
::mmap(/*addr=*/nullptr, file_size, PROT_READ, MAP_SHARED, fd, 0);
if (mmap_base != MAP_FAILED) {
*result = new PosixMmapReadableFile(filename,
reinterpret_cast<char*>(mmap_base),
file_size, &mmap_limiter_);
} else {
status = PosixError(filename, errno);
}
}
::close(fd);
if (!status.ok()) {
mmap_limiter_.Release();
}
return status;
}
代码中值得注意的一点是,在创建MmapReadableFile
时,完成mmap映射之后可以通过调用close
关闭fd对应的文件描述符,这是因为mmap
会给该文件的引用计数加1,因此文件真正被“释放”的时机是在unmap
的地方。
Posix文件系统控制相关实现
系统调用
PosixEnv
提供了文件系统控制接口,包括创建和删除文件夹,重命名文件等功能。这些接口基本上是对Posix系统调用的直接使用,只是额外在系统调用发生错误时对错误进行了处理。下面我们用一个表格来简单说明每个调用涉及到的系统调用, 为了简单起见,我们省略了参数。
PosixEnv 接口 |
涉及到的系统调用 |
---|---|
FileExists |
access() , 用于检测一个文件的accessbility, 使用F_OK Flag检测文件是否存在 |
GetChildren |
opendir() , 用于打开一个文件夹并把信息填入到DIR 结构体中readdir() , 用于从DIR 结构体中读取每个entry |
RemoveFile |
unlink() , 减少指定文件的链接数 |
CreateDir |
mkdir() , 创建指定名称的文件夹 |
RemoveDir |
rmdir() , 删除指定名称的文件夹 |
GetFileSize |
stat() ,获取指定名称的文件信息,填入到stat 结构体中,并从stat 结构体中直接读取file_size 字段获得文件大小 |
RenameFile |
std::rename() , 重命名指定名称的文件 |
LockFile |
open() , 打开指定文件,如果不存在则创建fnctl() , 给该文件指定范围加锁 |
UnlockFile |
fnctl() , 给文件指定范围解锁 |
对文件的加锁和解锁实现
PosixEnv
在对文件加锁和解锁时使用了两层,一层是逻辑层,即在用户向PosixEnv
发起加锁和解锁请求时由PosixEnv
来决定是否成功;另一层是在文件系统层面,使用fnctl
对整个文件上锁或解锁。
在逻辑层,PosixEnv
使用一个LockTable
的结构,用于保存所有已被上锁的文件的文件名:
class PosixLockTable {
public:
bool Insert(const std::string& fname) LOCKS_EXCLUDED(mu_) {
mu_.Lock();
bool succeeded = locked_files_.insert(fname).second;
mu_.Unlock();
return succeeded;
}
void Remove(const std::string& fname) LOCKS_EXCLUDED(mu_) {
mu_.Lock();
locked_files_.erase(fname);
mu_.Unlock();
}
private:
port::Mutex mu_;
std::set<std::string> locked_files_ GUARDED_BY(mu_);
};
该结果使用互斥锁来保证线程安全,并提供Insert
和Remove
函数来动态修改保存的文件名。
当PosixEnv
尝试给一个文件上锁时,它会查看PosixTable
中该文件是否已经上锁,如果没有,则将其加入PosixTable
中,并进行文件系统的加锁;如果上述过程都成功了,则返回一个PosixFileLock
的handle。
UnlockFile
是逆向的过程,它首先在文件系统层面解除该文件的锁,然后将其从PosixLockTable
中移除。
线程控制相关实现
PosixEnv
实现如下线程相关的接口:
接口名称 | 描述 |
---|---|
StartThread(thread_main, thread_main_arg) |
创建并执行一个线程,线程的函数体是thread_main , 参数是thread_main_arg |
Schedule(thread_main, thread_main_arg) |
让后台线程执行指定的任务,但并不是马上执行 |
SleepForMicroseconds(micros) |
让主线程sleep指定数量的时间 |
上述接口中,StartThread
和SleepForMicroseconds
都比较直接,它们分别通过调用C++11标准库中的std::thread
对象创建和std::this_thread::sleep_for
来实现。但leveldb早期开发的时候应该还没有C++11标准,所以这两个函数实现都是操作系统相关的,这也是它们被设置为纯虚函数的原因。
Schedule
将一个指定的任务交给后台线程执行,由于前端可能源源不断地涌入任务,因此PosixEnv
维护了一个队列,用于暂存任务。实际上,执行Schedule
的主线程和执行实际任务的Background
线程构成了一个生产者-消费者模型。
每个任务由一个BackgroundWorkItem
组成,实际上就是一个函数指针与参数指针组成的结构体:
struct BackgroundWorkItem {
void (*const function)(void*);
void* const arg;
};
在调用Schedule
时,主线程将一个BackgroundWorkItem
加入到任务队列中,然后使用信号量通知后台线程执行对应的任务:
void PosixEnv::Schedule(
void (*background_work_function)(void* background_work_arg),
void* background_work_arg) {
background_work_mutex_.Lock();
// Start the background thread, if we haven't done so already.
if (!started_background_thread_) {
started_background_thread_ = true;
std::thread background_thread(PosixEnv::BackgroundThreadEntryPoint, this);
background_thread.detach();
}
// If the queue is empty, the background thread may be waiting for work.
if (background_work_queue_.empty()) {
background_work_cv_.Signal();
}
background_work_queue_.emplace(background_work_function, background_work_arg);
background_work_mutex_.Unlock();
}
而后台线程则持续性地检查任务队列,一旦有任务则执行,没有则阻塞:
void PosixEnv::BackgroundThreadMain() {
while (true) {
background_work_mutex_.Lock();
// Wait until there is work to be done.
while (background_work_queue_.empty()) {
background_work_cv_.Wait();
}
assert(!background_work_queue_.empty());
auto background_work_function = background_work_queue_.front().function;
void* background_work_arg = background_work_queue_.front().arg;
background_work_queue_.pop();
background_work_mutex_.Unlock();
background_work_function(background_work_arg);
}
}