The rough write flow is as follows.
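Roughly: a Put goes through the convenience wrapper DB::Put, which packs the key/value pair into a WriteBatch and hands it to Write(); DBImpl::Write then queues the batch as a Writer in writers_, the Writer at the front of the queue makes room via MakeRoomForWrite, merges the queued batches with BuildBatchGroup, appends the merged batch to the log file, applies it to the memtable, and finally wakes the waiting writers. For reference, the entry path looks roughly like this (paraphrased from leveldb's db_impl.cc; details may differ between versions):
// DBImpl::Put just forwards to the default DB::Put implementation.
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  return DB::Put(o, key, val);
}
// Default convenience implementation: wrap the single Put into a WriteBatch
// and send it through the common Write() path shown below.
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}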
Writing
// Write the data to the log and to the memtable.
// Multiple threads call DB::Put --> DBImpl::Write; inside Write() each thread's updates are
// wrapped in a Writer and appended to a single queue (writers_).
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
// Writer bundles the batch together with a few synchronization primitives
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;
// Take the mutex so execution is serialized. MutexLock is RAII: it locks in the constructor and unlocks in the destructor.
MutexLock l(&mutex_);
// Append this Writer to the write queue
writers_.push_back(&w);
// Wait until either another thread has already completed w's write, or
// w has reached the front of writers_ (it is this thread's turn; all earlier tasks are done).
// In other words, leveldb picks one producer to act as the consumer: a thread only proceeds when
// the task it enqueued sits at the head of the queue, or when its task has already been handled
// (writer.done == true).
// The extra !w.done check is needed because several queued tasks may be completed in one pass.
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
// If w has already been completed, return its status right away
if (w.done) {
return w.status;
}
// Check whether there is enough room for the write; this is called by the thread at the front of the write queue
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(updates == nullptr);
// last_sequence is the largest sequence number of the data already written to leveldb
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
// Combine all of the tasks in the producer queue into one large task: the WriteBatches of all
// queued Writers are merged into a single WriteBatch holding all of their key-value operations.
// BuildBatchGroup therefore walks every Writer currently in writers_ and appends their batches
// together (see the paraphrased sketch after this function).
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
// Append the WriteBatch to the log file; the record contains the sequence number, the operation
// count, and for each operation its type (Put/Delete) plus the key/value and their lengths.
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
// Once the record has reached the file system we no longer have to worry about losing it, so go on and insert into the MemTable
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();
// Update last_sequence. versions_ records the monotonically increasing sequence number; for the
// same key it is this number that decides which version of the value is newer.
// During the write the sequence is advanced entry by entry inside the memtable, but it is only
// recorded into versions_ once, here:
versions_->SetLastSequence(last_sequence);
}
while (true) {
// This loop is the counterpart of the condition-variable wait above: it pops the tasks that have
// been handled off the queue and notifies each producer thread (via ready->done and a signal)
// that the task it added has been completed.
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// Notify new head of write queue
// i.e. wake the first Writer in the queue whose write has not been handled yet
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
return status;
}
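For reference, BuildBatchGroup, which the function above uses to merge the queued writes, looks roughly like the following (paraphrased from leveldb's db_impl.cc; the size limits and other details may differ between versions). It starts from the Writer at the head of writers_, keeps appending the batches of the following Writers (switching to tmp_batch_ so the caller's batch is not modified) until the group would grow too large or a sync write would be folded into a non-sync group, and reports the last merged Writer back through last_writer:
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  Writer* first = writers_.front();
  WriteBatch* result = first->batch;
  assert(result != nullptr);
  size_t size = WriteBatchInternal::ByteSize(first->batch);
  // Allow the group to grow up to a maximum size, but if the original write is
  // small, limit the growth so we do not slow down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128 << 10)) {
    max_size = size + (128 << 10);
  }
  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }
    if (w->batch != nullptr) {
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {
        // Do not make the batch too big.
        break;
      }
      // Append to *result
      if (result == first->batch) {
        // Switch to the temporary batch instead of disturbing the caller's batch.
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);
      }
      WriteBatchInternal::Append(result, w->batch);
    }
    *last_writer = w;
  }
  return result;
}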
Checking whether there is enough room for the write
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
assert(!writers_.empty());
bool allow_delay = !force;
Status s;
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
} else if (allow_delay && versions_->NumLevelFiles(0) >=
config::kL0_SlowdownWritesTrigger) {
// If the number of level-0 files has reached kL0_SlowdownWritesTrigger (8 by default; see the
// config sketch after this function), sleep for 1 ms and retry on the next loop iteration,
// giving the compaction thread time to merge files.
// We are getting close to hitting a hard limit on the number of L0 files. Rather than delaying a
// single write by several seconds when we hit the hard limit, start delaying each individual
// write by 1 ms to reduce latency variance. Also, this delay hands over some CPU to the
// compaction thread in case it is sharing the same core as the writer.
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// The memtable has not yet reached options_.write_buffer_size, so this write still fits: break.
// There is room in current memtable
break;
} else if (imm_ != nullptr) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
Log(options_.info_log, "Current memtable full; waiting...\n");
background_work_finished_signal_.Wait();
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// If there are too many level-0 files (the count has reached kL0_StopWritesTrigger, 12 by
// default), wait for the background compaction to finish, then re-check the conditions.
// There are too many level-0 files.
Log(options_.info_log, "Too many L0 files; waiting...\n");
background_work_finished_signal_.Wait();
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
// Create a new log file
WritableFile* lfile = nullptr;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
// Delete the old log writer and its file handle, then switch over to the new log
delete log_;
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
// Turn the current Memtable into the Immutable memtable simply by switching pointers
imm_ = mem_;
has_imm_.store(true, std::memory_order_release);
// Construct a new Memtable
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
// This may schedule a compaction
MaybeScheduleCompaction();
}
}
return s;
}
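For reference, the level-0 thresholds used above are constants in the config namespace of leveldb's dbformat.h; roughly (values may differ between versions):
namespace config {
// Level-0 compaction is started when we hit this many files.
static const int kL0_CompactionTrigger = 4;
// Soft limit on the number of level-0 files; writes are slowed down at this point.
static const int kL0_SlowdownWritesTrigger = 8;
// Maximum number of level-0 files; writes are stopped at this point.
static const int kL0_StopWritesTrigger = 12;
}  // namespace config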