LevelDB与操作系统交互接口:Env

LevelDB中的Env类是一个上层逻辑与操作系统进行交互的接口,由于LevelDB以文件方式对数据进行持久化,并且需要读取文件来完成一次Get操作,上述过程都涉及到对文件的操作。但不同操作系统的文件系统调用接口不同,因此LevelDB定义了Env类来屏蔽上述操作系统的实现差异,向上层的LevelDB逻辑提供统一的接口。

上图表示了Env在整个系统架构中位置,包括了以下几部分:

  • 定义了SequentialFile等几类具有不同读写行为的文件接口。
  • 提供了和文件系统交互的其他接口,例如对文件重命名,创建文件夹等,上图未收录这部分。
  • 提供了和线程控制相关的接口,例如Schedule用于向后台线程传递一个执行的函数。

抽象基类Env

Env是一个抽象基类,它屏蔽了操作系统之间的实现差异,并向上层的leveldb逻辑提供统一的操作接口。总体上看,其接口可分为两类:文件系统相关与线程调度相关。


文件系统相关接口

文件抽象

LevelDB定义了三类不同的文件类型,即三种文件抽象基类,用于表示具有不同功能的文件抽象。

  • RandomAccessFile, 该类文件允许访问任意偏移位置的任意字节数量的数据, 并且该类是只读的,因此可以允许多个线程同时访问同一个文件。

    class LEVELDB_EXPORT RandomAccessFile {
     public:
    	// 析构函数采用默认,拷贝赋值函数和拷贝构造函数是delete的,下同
      
      // 在offset位置读取n字节的数据,拷贝到scratch所指示的内存位置,result
      // 指明了读取的结果,即result->data()指向读取到的数据的首地址;
      // result->size()说明了读取到数据的长度,长度不一定为n
      virtual Status Read(uint64_t offset, size_t n, Slice* result,
                          char* scratch) const = 0;
    };
    
  • SequentialFile是一个只允许顺序读的文件类,即一旦经过某个偏移位置,就不能再回过头来读这个位置的数据了。该类提供了Skip函数来跳过接下来指定字节的数据,提供了Read接口来读取当前位置的指定字节的数据。与RandomAccessFile类似,该文件是只读的,但是并发访问需要外部锁的同步,原因在于Skip()会修改当前位置指针。

    // A file abstraction for reading sequentially through a file
    class LEVELDB_EXPORT SequentialFile {
     public:
      // Read不同于RandomAccessFile, 不用指定offset,因为总是从“当前”位置开始读
      // REQUIRES: External synchronization
      virtual Status Read(size_t n, Slice* result, char* scratch) = 0;
      
      // REQUIRES: External synchronization
      virtual Status Skip(uint64_t n) = 0;
    };
    
  • WritableFile是一类只允许顺序写的文件类。并且为了提升写的效率,leveldb要求任何对该抽象类的实现都应该提供buffer,而不是每次都直接写入文件。类似于SequentialFile, 该类也不是线程安全的,需要提供外部同步。WritableFile的四个接口可以从名字上就看出其含义,值得注意的是FlushSync的不同,Flush是指将数据从该类内置的buffer中写入文件,Sync是指刷新该底层文件对应的文件系统cache,从而实现真正的落盘,换言之,如果Sync调用成功,则在该sync调用之前的所有Append操作的数据都保证是持久化的。

    class LEVELDB_EXPORT WritableFile {
     public:
      virtual Status Append(const Slice& data) = 0;
      virtual Status Close() = 0;
      virtual Status Flush() = 0;
      virtual Status Sync() = 0;
    };
  • FileLock 逻辑上可以视作一个锁,但该类并不提供锁的Lock() & Unlock()接口,而是通过Env的抽象接口LockFile()返回一个FileLock的Handle, 表示对指定文件上锁;然后通过将该Handle传递给UnlockFile()来实现对文件的解锁。

    该类没有提供公共接口和内置的数据成员,但一个直觉的实现是保存指定文件元数据,如文件名等,以便Env能做出正确的操作。

    class LEVELDB_EXPORT FileLock {
     public:
      FileLock() = default;
    
      FileLock(const FileLock&) = delete;
      FileLock& operator=(const FileLock&) = delete;
    
      virtual ~FileLock();
    };

文件的创建和并发控制

Env类提供了对上述几类文件抽象的创建接口:

class Env {
  virtual Status NewSequentialFile(const std::string& fname,
                                   SequentialFile** result) = 0;

  virtual Status NewRandomAccessFile(const std::string& fname,
                                     RandomAccessFile** result) = 0;

  virtual Status NewWritableFile(const std::string& fname,
                                 WritableFile** result) = 0;

  virtual Status NewAppendableFile(const std::string& fname,
                                   WritableFile** result);
  
  // If somebody else already holds the lock, finishes immediately
  // with a failure.  I.e., this call does not wait for existing locks
  // to go away.
  //
  // May create the named file if it does not already exist.
  virtual Status LockFile(const std::string& fname, FileLock** lock) = 0;

  // Release the lock acquired by a previous successful call to LockFile.
  // REQUIRES: lock was returned by a successful LockFile() call
  // REQUIRES: lock has not already been unlocked.
  virtual Status UnlockFile(FileLock* lock) = 0;
}

统一格式是提供创建文件名,并通过文件类指针进行返回,接口的返回值表明该次调用的结果,包括错误码等。

这几个接口名称稍微具有误导性,我们进行简单的解释:

  • 对于只读的文件类型,SequentialFileRandomAccessFile,对应的New接口并不是在文件系统中创建指定名称的文件,而是打开已经存在的文件。
  • NewWritableFile创建一个具有指定名称的WritableFile,如果该文件已经存在,则覆盖它。与之对应的是NewAppendableFile, 虽然它们都是返回一个WritableFile,但是后者对于已经存在的文件会从文件末的位置继续写。

  • NewAppendableFile没有被定义为纯虚函数,因此其派生类严格来讲不需要override该函数,根据注释的说明,这是因为部分文件系统不支持向已存在的文件末尾append数据

Env还提供了LockFileUnlockFile接口来实现并发控制:

  • LockFile用于对指定文件上锁,如果成功,返回一个FileLock的handle;如果失败,例如该文件已经上锁,则会立刻返回错误,因此该接口的实现必须是非阻塞的

文件系统元数据操作

文件系统元数据操作包括了一些文件系统的常用系统调用的封装,例如创建或删除文件夹,重命名文件,检测文件是否存在,罗列文件夹下的文件等等。

class Env { 
	// 检测指定名称的文件是否存在
  virtual bool FileExists(const std::string& fname) = 0;

  // 罗列指定文件夹下的文件名
  virtual Status GetChildren(const std::string& dir,
                             std::vector<std::string>* result) = 0;
  // 删除指定文件
  virtual Status RemoveFile(const std::string& fname);

  // 创建指定名称的文件夹
  virtual Status CreateDir(const std::string& dirname) = 0;

  // 删除指定名称的文件夹
  virtual Status RemoveDir(const std::string& dirname);

  // 获得指定名称的文件大小
  virtual Status GetFileSize(const std::string& fname, uint64_t* file_size) = 0;

  // 重命名文件
  virtual Status RenameFile(const std::string& src,
                            const std::string& target) = 0;
}

线程相关操作

Env提供了线程相关的操作,接口如下:

class Env {
  virtual void Schedule(void (*function)(void* arg), void* arg) = 0;

  // Start a new thread, invoking "function(arg)" within the new thread.
  // When "function(arg)" returns, the thread will be destroyed.
  virtual void StartThread(void (*function)(void* arg), void* arg) = 0;
  
  // Sleep/delay the thread for the prescribed number of micro-seconds.
  virtual void SleepForMicroseconds(int micros) = 0;
} 

但由于C++11开始,标准库中已经支持了线程模型,所以没有必要再根据不同的操作系统为Env实现这些函数,所以我认为这些函数都没有必要被设置为纯虚函数,使用虚函数即可。


基于Posix的Env实现:PosixEnv

Env只是提供了抽象接口,还需要根据不同的操作系统特性进行对应的实现。由于服务器通常比较多使用Linux系统,因此我们只谈谈PosixEnv的实现,它和Env的关系大致可以用下面的UML图表示:

下面我们描述一下对应的实现。


文件系统相关接口实现

Posix文件对抽象类的实现

PosixSequentialFile
class PosixSequentialFile final : public SequentialFile {
 public:
  PosixSequentialFile(std::string filename, int fd)
      : fd_(fd), filename_(filename) {}
  ~PosixSequentialFile() override { close(fd_); }

  Status Read(size_t n, Slice* result, char* scratch) override {
    Status status;
    while (true) {
      ::ssize_t read_size = ::read(fd_, scratch, n);
      if (read_size < 0) { 
        // 错误处理
      }
      *result = Slice(scratch, read_size);
      break;
    }
    return status;
  }

  Status Skip(uint64_t n) override {
    if (::lseek(fd_, n, SEEK_CUR) == static_cast<off_t>(-1)) {
      return PosixError(filename_, errno);
    }
    return Status::OK();
  }

 private:
  const int fd_;
  const std::string filename_;
};

一个PosixSequentialFile只包含了文件名和文件描述符(fd)。并通过对文件描述符操作来实现对应接口。例如,通过::read来实现Read()接口,通过lseek来移动文件指针,实现Skip()


PosixRandomAccessFile
class PosixRandomAccessFile final : public RandomAccessFile {
 private:
  const bool has_permanent_fd_;  // If false, the file is opened on every read.
  const int fd_;                 // -1 if has_permanent_fd_ is false.
  Limiter* const fd_limiter_;
  const std::string filename_;
}

PosixRandomAccessFile实现了RandomAccessFile接口。其内部的数据结构包含了对应的文件名以及文件描述符,此外还包含了一个Limiter对象用于保护并发控制。

Limiter的作用:由于RandomAccessFile支持多读,因此可能有多个RandomAccessFile的实例指向同一个fd,但这种情况不利于并发控制。因为多个线程可能在调用Read函数时并发地修改同一个fd的内部文件指针, 这是不被允许的行为。为了解决这个问题,PosixRandomAccessFile规定只能有一个实例拥有指定fd的所有权(Ownership), 其他实例如果需要读取相同的文件,需要实时地调用open打开文件,读取内容,然后使用close关闭文件,从而保证是在不同的fd上读取文件。为了保证仅有一个实例持有该fd的所有权,需要使用一个外部的Limiter对象。

  • 构造函数:

    PosixRandomAccessFile(std::string filename, int fd, Limiter* fd_limiter)
        : has_permanent_fd_(fd_limiter->Acquire()),
          fd_(has_permanent_fd_ ? fd : -1),
          fd_limiter_(fd_limiter),
          filename_(std::move(filename)) {
      if (!has_permanent_fd_) {
        assert(fd_ == -1);
        ::close(fd);  // The file will be opened on every read.
      }
    }
  • Read接口实现:我们描述了Read实现最核心的流程,而忽略了错误处理等细节。Read根据当前实例是否拥有fd_的所有权进行不同的处理:如果不拥有则需要每次Read时都打开文件读取数据。

    if (!has_permanent_fd_) {
      fd = ::open(file_name, O_RDONLY);
    } else {
      fd = fd_;
    }
    ::pread(fd, /*store=*/scratch, /*offset=*/offset, /*num=*/n);
    if (!has_permanent_fd_) {
      ::close(fd);
    }

PosixMmapReadableFile

利用Posix的mmap接口,操作系统可以将文件内容映射到指定的内存区域,因此对该内存区域的操作就等同于对文件的操作。通过该技术可以大大简化PosixRandomAccessFile中直接对fd进行操作的流程。

class PosixMmapReadableFile final : public RandomAccessFile {
  PosixMmapReadableFile(std::string filename, char* mmap_base, size_t length,
                        Limiter* mmap_limiter)
      : mmap_base_(mmap_base),
        length_(length),
        mmap_limiter_(mmap_limiter),
        filename_(std::move(filename)) {}

  ~PosixMmapReadableFile() override {
    ::munmap(static_cast<void*>(mmap_base_), length_);
    mmap_limiter_->Release();
  }
 private:
  char* const mmap_base_;
  const size_t length_;
  Limiter* const mmap_limiter_;
  const std::string filename_;
};

PosixMmapReadableFile负责管理一块已经被映射的文件的内容,因此,mmap的调用需要在创建一个该类实例之前进行。一旦创建完成,则[mmap_base_, mmap_base_ + length_)所标识的内存区域就完全等同于对应文件的内容。

PosixMmapReadableFile通过直接返回标识区域中指定位置的Slice来实现Read()接口。


PosixWritableFile

PosixWritableFile的实现相比其他几类文件更加负责,因为它涉及到数据写入,以及与之相关的持久化问题。该类的描述如下所示,这里省略了WritableFile定义的接口。

  • buf_用于作为数据写入的缓冲,正如WritableFile接口中描述的那样,为了性能考虑,不能每次写入都直接写入文件,而是需要先写入缓冲区,做数据的聚集。缓冲区的大小为64KB。
  • pos_用于记录当前缓冲区中最后的位置。即,buf_[0..pos_]标识了当前缓冲区中有效的数据。下次写入需要从pos_的位置开始向后append数据。
  • is_manifest标识该文件是否是一个manifest,manifest的意思是用于整个数据库系统控制的元数据。在后面看Version的时候我们会再说。

下面我们看看几个接口的实现:

  • 首先是Append, 我们仍然采用接近伪代码的描述方式:

    Status Append(const Slice& data) override {
      // 1. Write as many data as possible into Buffer
      std::memcpy(buf_ + pos_, write_data, copy_size);
      if (no remaining data) {
        return Ok;
      }
      
      FlushBuffer();
      
      if (remaining data can be written into buffer) { 
        // Write remaining data into buffer
        std::memcpy(buf_, write_data, write_size);
        return Ok;
      }
      // Else directly write remaining data into file
      return WriteUnbuffered(write_data, write_size);
    }
  • Close直接调用Posix的close函数关闭指定文件,Flush函数直接调用FlushBuffer函数。

  • Sync函数首先处理当前文件为MANIFEST的特殊情况:

    Status status = SyncDirIfManifest();
    if (!status.ok()) {
      return status;
    }

SyncDirIfManifest的大概逻辑是查看当前file是否为manifest,如果是对其所在的directory进行sync操作。但我还没有弄清楚这里的逻辑为什么要这么实现。

然后刷新buffer并与文件系统同步:

status = FlushBuffer();
if (!status.ok()) {
  return status;
}
return SyncFd(fd_, filename_);

SyncFd主要是调用fsync, fcntl等系统调用确保写入数据的durability。

其他私有成员或者静态成员的实现这里不再赘述,事实上它们的实现也相当自然。


PosixFileLock
class PosixFileLock : public FileLock {
 public:
  PosixFileLock(int fd, std::string filename)
      : fd_(fd), filename_(std::move(filename)) {}

  int fd() const { return fd_; }
  const std::string& filename() const { return filename_; }

 private:
  const int fd_;
  const std::string filename_;
};

PosixFileLock仅提供了保存被锁文件的文件名和描述符的功能。实际对并发控制的实现是在PosixEnv中实现的PosixLockTable, 这将在后面的文件系统控制部分进行描述。


Posix文件对象的创建

PosixEnv实现了NewXXXFile接口,它们的统一逻辑大概如下:

class PosixEnv : public Env {
  Status NewXXXFile(const std::string& filename, XXXFile** result) override {
    int fd = ::open(filename.c_str(), flags, mode);
    // Deal with error
    *result = new XXXFile(filename, fd);
    return Ok;
  }
}

大部分情况下,不同的接口仅在::open调用中的flag和mode位置有所不同,例如:

File Type flags mode
SequentialFile O_RDONLY |kOpenBaseFlags Default
RandomAccessFile O_RDONLY | kOpenBaseFlags Default
WritableFile O_TRUNC | O_WRONLY | O_CREAT | kOpenBaseFlags 0644
AppendableFile O_APPEND | O_WRONLY | O_CREAT | kOpenBaseFlags 0644

(注:kOpenBaseFlags = O_CLOEXEC,用于添加并发访问的支持)

上述四种文件类型中,NewRandomAccessFile有所不同,由于有两种文件都支持该接口,因此需要在创建时考虑创建哪一种。LevelDB的策略是,尽可能使用PosixMmapReadableFile,如果limiter不够,才创建RandomAccessFile。上述策略的理由是,创建PosixMmapReadableFile只需要一次mmap系统调用,之后每次都可以从映射的内存段读数据,无系统调用开销;而后者则需要每次读都调用一次read。对于RandomAccessFile这种只读的文件类型而言,显然是采用mmap的方式能获得更好的性能。

  if (!mmap_limiter_.Acquire()) {
    *result = new PosixRandomAccessFile(filename, fd, &fd_limiter_);
    return Status::OK();
  }

  uint64_t file_size;
  Status status = GetFileSize(filename, &file_size);
  if (status.ok()) {
    void* mmap_base =
        ::mmap(/*addr=*/nullptr, file_size, PROT_READ, MAP_SHARED, fd, 0);
    if (mmap_base != MAP_FAILED) {
      *result = new PosixMmapReadableFile(filename,
                                          reinterpret_cast<char*>(mmap_base),
                                          file_size, &mmap_limiter_);
    } else {
      status = PosixError(filename, errno);
    }
  }
  ::close(fd);
  if (!status.ok()) {
    mmap_limiter_.Release();
  }
  return status;
}

代码中值得注意的一点是,在创建MmapReadableFile时,完成mmap映射之后可以通过调用close关闭fd对应的文件描述符,这是因为mmap会给该文件的引用计数加1,因此文件真正被“释放”的时机是在unmap的地方。


Posix文件系统控制相关实现

系统调用

PosixEnv提供了文件系统控制接口,包括创建和删除文件夹,重命名文件等功能。这些接口基本上是对Posix系统调用的直接使用,只是额外在系统调用发生错误时对错误进行了处理。下面我们用一个表格来简单说明每个调用涉及到的系统调用, 为了简单起见,我们省略了参数。

PosixEnv接口 涉及到的系统调用
FileExists access(), 用于检测一个文件的accessbility, 使用F_OKFlag检测文件是否存在
GetChildren opendir(), 用于打开一个文件夹并把信息填入到DIR结构体中
readdir(), 用于从DIR结构体中读取每个entry
RemoveFile unlink(), 减少指定文件的链接数
CreateDir mkdir(), 创建指定名称的文件夹
RemoveDir rmdir(), 删除指定名称的文件夹
GetFileSize stat(),获取指定名称的文件信息,填入到stat结构体中,并从stat结构体中直接读取file_size字段获得文件大小
RenameFile std::rename(), 重命名指定名称的文件
LockFile open(), 打开指定文件,如果不存在则创建
fnctl(), 给该文件指定范围加锁
UnlockFile fnctl(), 给文件指定范围解锁
对文件的加锁和解锁实现

PosixEnv在对文件加锁和解锁时使用了两层,一层是逻辑层,即在用户向PosixEnv发起加锁和解锁请求时由PosixEnv来决定是否成功;另一层是在文件系统层面,使用fnctl对整个文件上锁或解锁。

在逻辑层,PosixEnv使用一个LockTable的结构,用于保存所有已被上锁的文件的文件名:

class PosixLockTable {
 public:
  bool Insert(const std::string& fname) LOCKS_EXCLUDED(mu_) {
    mu_.Lock();
    bool succeeded = locked_files_.insert(fname).second;
    mu_.Unlock();
    return succeeded;
  }
  void Remove(const std::string& fname) LOCKS_EXCLUDED(mu_) {
    mu_.Lock();
    locked_files_.erase(fname);
    mu_.Unlock();
  }

 private:
  port::Mutex mu_;
  std::set<std::string> locked_files_ GUARDED_BY(mu_);
};

该结果使用互斥锁来保证线程安全,并提供InsertRemove函数来动态修改保存的文件名。

PosixEnv尝试给一个文件上锁时,它会查看PosixTable中该文件是否已经上锁,如果没有,则将其加入PosixTable中,并进行文件系统的加锁;如果上述过程都成功了,则返回一个PosixFileLock的handle。

UnlockFile是逆向的过程,它首先在文件系统层面解除该文件的锁,然后将其从PosixLockTable中移除。


线程控制相关实现

PosixEnv实现如下线程相关的接口:

接口名称 描述
StartThread(thread_main, thread_main_arg) 创建并执行一个线程,线程的函数体是thread_main, 参数是thread_main_arg
Schedule(thread_main, thread_main_arg) 让后台线程执行指定的任务,但并不是马上执行
SleepForMicroseconds(micros) 让主线程sleep指定数量的时间

上述接口中,StartThreadSleepForMicroseconds都比较直接,它们分别通过调用C++11标准库中的std::thread对象创建和std::this_thread::sleep_for来实现。但leveldb早期开发的时候应该还没有C++11标准,所以这两个函数实现都是操作系统相关的,这也是它们被设置为纯虚函数的原因。

Schedule将一个指定的任务交给后台线程执行,由于前端可能源源不断地涌入任务,因此PosixEnv维护了一个队列,用于暂存任务。实际上,执行Schedule的主线程和执行实际任务的Background线程构成了一个生产者-消费者模型。

每个任务由一个BackgroundWorkItem组成,实际上就是一个函数指针与参数指针组成的结构体:

struct BackgroundWorkItem {
  void (*const function)(void*);
  void* const arg;
};

在调用Schedule时,主线程将一个BackgroundWorkItem加入到任务队列中,然后使用信号量通知后台线程执行对应的任务:

void PosixEnv::Schedule(
    void (*background_work_function)(void* background_work_arg),
    void* background_work_arg) {
  background_work_mutex_.Lock();

  // Start the background thread, if we haven't done so already.
  if (!started_background_thread_) {
    started_background_thread_ = true;
    std::thread background_thread(PosixEnv::BackgroundThreadEntryPoint, this);
    background_thread.detach();
  }
  
  // If the queue is empty, the background thread may be waiting for work.
  if (background_work_queue_.empty()) {
    background_work_cv_.Signal();
  }

  background_work_queue_.emplace(background_work_function, background_work_arg);
  background_work_mutex_.Unlock();
}

而后台线程则持续性地检查任务队列,一旦有任务则执行,没有则阻塞:

void PosixEnv::BackgroundThreadMain() {
  while (true) {
    background_work_mutex_.Lock();

    // Wait until there is work to be done.
    while (background_work_queue_.empty()) {
      background_work_cv_.Wait();
    }

    assert(!background_work_queue_.empty());
    auto background_work_function = background_work_queue_.front().function;
    void* background_work_arg = background_work_queue_.front().arg;
    background_work_queue_.pop();

    background_work_mutex_.Unlock();
    background_work_function(background_work_arg);
  }
}