澳门新萄京官方网站-www.8455.com-澳门新萄京赌场网址

澳门新萄京官方网站:开源项目rocksdb分析,Com

2019-09-22 作者:数据库网络   |   浏览(85)

概述

       后面会在微信公众号中推送后续的翻译章节,与TensorFlow的第一次接触系列已整理成pdf,关注公众号后回复:tensorflow即可下载~~公众号:源码之心

一 RocksDB的磁盘数据组织层次

rocksdb 介绍

Rocksdb同样是一种基于operation log的文件系统。
由于采用了op log,将对磁盘的随机写转换成了对op log的顺序写,最新的数据是存储在内存的memrory中,可以提高IO效率。每一个的column family分别有一个memtable与sstablle.当某一coloumn family内存中的memory table超过阈值时,转换成immute memtable并创建新的op log,immute memtable由一系列的memtable组成,它们是只读的,可供查询,不能更新数据。当immute memtable的数目超过设置的数值时,会触发flush,DB会调度后台线程将多个memtable合并后再dump到磁盘生成Level0中一个新的sstable文件,Level0中的sstable文件不断累积,会触发compaction,DB会调度后台compaction线程将Level0中的sstable文件根据key与Level1中的sstable合并并生成新的sstable,依次类推,根据key的空间从低层往上compact,最终形成了一层层的结构,层级数目是由用户设置的。

leveldb中,memtable在内存中核心s的数据结构为skiplist,而在rocksdb中,memtable在内存中的形式有三种:skiplist、hash-skiplist、hash-linklist,从字面中就可以看出数据结构的大体形式,hash-skiplist就是每个hash bucket中是一个skiplist,hash-linklist中,每个hash bucket中是一个link-list,启用何用数据结构可在配置中选择。

     compaction主要包括两类:将内存中imutable 转储到磁盘上sst的过程称之为flush或者minor compaction;磁盘上的sst文件从低层向高层转储的过程称之为compaction或者是major compaction。对于myrocks来说,compaction过程都由后台线程触发,对于minor compaction和major compaction分别对应一组线程,通过参数rocksdb_max_background_flushes和rocksdb_max_background_compactions可以来控制。通过minor compaction,内存中的数据不断地写入的磁盘,保证有足够的内存来应对新的写入;而通过major compaction,多层之间的SST文件的重复数据和无用的数据可以迅速减少,进而减少sst文件占用的磁盘空间。对于读而言,由于需要访问的sst文件变少了,也会有性能的提升。由于compaction过程在后台不断地做,单位时间内compaction的内容不多,不会影响整体的性能,当然这个可以根据实际的场景对参数进行调整,compaction的整体架构可以参见图1。了解了compaction的基本概念,下面会详细介绍compaction的流程,主要包括两部分flush(minor compaction),compaction(major compaction),对应的入口函数分别是BackgroundFlush和BackgroundCompaction。

澳门新萄京官方网站 1

1 磁盘文件的组织方式

澳门新萄京官方网站,rocksdb在磁盘上的文件是分为多层的,分别叫做level-0, level-1等等
level0上包含的文件,是由内存中的memtable dump到磁盘上生成的,单个文件内部按key有序,文件之间无序。
其它level上的多个文件都是按照key有序的。

澳门新萄京官方网站 2

sst文件在磁盘上的组织方式

日志结构树

从概念上说,最基本的LSM是很简单的 。将之前使用一个大的查找结构(造成随机读写,影响写性能),变换为将写操作顺序的保存到一些相似的有序文件(也就是sstable)中。所以每个文件包 含短时间内的一些改动。因为文件是有序的,所以之后查找也会很快。文件是不可修改的,他们永远不会被更新,新的更新操作只会写到新的文件中。读操作检查很 有的文件。通过周期性的合并这些文件来减少文件个数。

澳门新萄京官方网站 3

源码之心

2 data range partition

非0 level上的key,按序分片,保存在不同的文件中。

澳门新萄京官方网站 4

data range partition

读操作

澳门新萄京官方网站 5

LevelDb 读操作顺序

Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   PinnableSlice* value) {
  return GetImpl(read_options, column_family, key, value);
}

Status DBImpl::GetImpl(const ReadOptions& read_options,
                       ColumnFamilyHandle* column_family, const Slice& key,
                       PinnableSlice* pinnable_val, bool* value_found,
                       ReadCallback* callback, bool* is_blob_index) {
  assert(pinnable_val != nullptr);
  StopWatch sw(env_, stats_, DB_GET);
  PERF_TIMER_GUARD(get_snapshot_time);

  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();

  // Acquire SuperVersion
// holds references to memtable, all immutable memtables and version

  SuperVersion* sv = GetAndRefSuperVersion(cfd);

  TEST_SYNC_POINT("DBImpl::GetImpl:1");
  TEST_SYNC_POINT("DBImpl::GetImpl:2");

  SequenceNumber snapshot;
  if (read_options.snapshot != nullptr) {
    // Note: In WritePrepared txns this is not necessary but not harmful either.
    // Because prep_seq > snapshot => commit_seq > snapshot so if a snapshot is
    // specified we should be fine with skipping seq numbers that are greater
    // than that.

// Abstract handle to particular state of a DB.
// A Snapshot is an immutable object and can therefore be safely
// accessed from multiple threads without any external synchronization.

    snapshot = reinterpret_cast<const SnapshotImpl*>(
        read_options.snapshot)->number_;
  } else {
    // Since we get and reference the super version before getting
    // the snapshot number, without a mutex protection, it is possible
    // that a memtable switch happened in the middle and not all the
    // data for this snapshot is available. But it will contain all
    // the data available in the super version we have, which is also
    // a valid snapshot to read from.
    // We shouldn't get snapshot before finding and referencing the
    // super versipon because a flush happening in between may compact
    // away data for the snapshot, but the snapshot is earlier than the
    // data overwriting it, so users may see wrong results.
    snapshot = last_seq_same_as_publish_seq_
                   ? versions_->LastSequence()
                   : versions_->LastPublishedSequence();
  }
  TEST_SYNC_POINT("DBImpl::GetImpl:3");
  TEST_SYNC_POINT("DBImpl::GetImpl:4");

  // Prepare to store a list of merge operations if merge occurs.
  MergeContext merge_context;
  RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot);

  Status s;
  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
  // merge_operands will contain the sequence of merges in the latter case.
  LookupKey lkey(key, snapshot);
  PERF_TIMER_STOP(get_snapshot_time);

  bool skip_memtable = (read_options.read_tier == kPersistedTier &&
                        has_unpersisted_data_.load(std::memory_order_relaxed));
  bool done = false;
  if (!skip_memtable) {
    if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
                     &range_del_agg, read_options, callback, is_blob_index)) {
      done = true;
      pinnable_val->PinSelf();
      RecordTick(stats_, MEMTABLE_HIT);
    } else if ((s.ok() || s.IsMergeInProgress()) &&
               sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
                            &range_del_agg, read_options, callback,
                            is_blob_index)) {
      done = true;
      pinnable_val->PinSelf();
      RecordTick(stats_, MEMTABLE_HIT);
    }
    if (!done && !s.ok() && !s.IsMergeInProgress()) {
      ReturnAndCleanupSuperVersion(cfd, sv);
      return s;
    }
  }
  if (!done) {
    PERF_TIMER_GUARD(get_from_output_files_time);
    sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,
                     &range_del_agg, value_found, nullptr, nullptr, callback,
                     is_blob_index);
    RecordTick(stats_, MEMTABLE_MISS);
  }

  {
    PERF_TIMER_GUARD(get_post_process_time);

    ReturnAndCleanupSuperVersion(cfd, sv);

    RecordTick(stats_, NUMBER_KEYS_READ);
    size_t size = pinnable_val->size();
    RecordTick(stats_, BYTES_READ, size);
    MeasureTime(stats_, BYTES_PER_READ, size);
    PERF_COUNTER_ADD(get_read_bytes, size);
  }
  return s;
}

其中,LookupKey 实现如下:

LookupKey::LookupKey(const Slice& _user_key, SequenceNumber s) {
  size_t usize = _user_key.size();
  size_t needed = usize   13;  // A conservative estimate
  char* dst;
  if (needed <= sizeof(space_)) {
    dst = space_;
  } else {
    dst = new char[needed];
  }
  start_ = dst;
  // NOTE: We don't support users keys of more than 2GB :)
  dst = EncodeVarint32(dst, static_cast<uint32_t>(usize   8));
  kstart_ = dst;
  memcpy(dst, _user_key.data(), usize);
  dst  = usize;
  EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
  dst  = 8;
  end_ = dst;
}

                                          图1

       因为本人对一些经典的开源项目很有兴趣,也想从大牛设计的开源系统中学习架构设计经验,所以喜欢分析一些开源代码,这次因为项目中需要使用rocksdb,故在使用的时候仔细分析了rocksdb的实现细节,从2015年11月11日下决心整理出这一系列的blog,也算是对工作的总结吧。分享出来希望能帮到有需要的朋友。因为之前已经读完LevelDB的源码,读的过程中也参考了网上的相关文章,此小节的介绍会与LevelDB有些类似,毕竟rocksdb是基于LevelDB设计实现的,只在一些地方做了优化而已,有些代码甚至都是一样的。源码分析的后面章节会具体分析rocksdb的实现。

3 key在SST文件中查找

每个level的文件都是整体有序,并且文件内有序的。
要在某个level上查找某个key时:

  • 先根据每个文件的start/end key对所有文件进行二分查找来确定哪些文件可能包含key
  • 再通过二分查找在候选的文件中定位key的准确位置
    这是一次对一个level上所有文件的二分查找的过程。

写操作

rocks db 的写操作是通过 WriteImpl实现的,其结构如下所示:

Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
  return WriteImpl(write_options, my_batch, nullptr, nullptr);
}

// The main write queue. This is the only write queue that updates LastSequence.
// When using one write queue, the same sequence also indicates the last
// published sequence.
Status DBImpl::WriteImpl(const WriteOptions& write_options,
                         WriteBatch* my_batch, WriteCallback* callback,
                         uint64_t* log_used, uint64_t log_ref,
                         bool disable_memtable, uint64_t* seq_used,
                         PreReleaseCallback* pre_release_callback) {
  if (my_batch == nullptr) {
    return Status::Corruption("Batch is nullptr!");
  }
  if (write_options.sync && write_options.disableWAL) {
    return Status::InvalidArgument("Sync writes has to enable WAL.");
  }
  if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) {
    return Status::NotSupported(
        "pipelined_writes is not compatible with concurrent prepares");
  }
  if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
    return Status::NotSupported(
        "pipelined_writes is not compatible with seq_per_batch");
  }
  // Otherwise IsLatestPersistentState optimization does not make sense
  assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) ||
         disable_memtable);

  Status status;
  if (write_options.low_pri) {
    status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
    if (!status.ok()) {
      return status;
    }
  }

  if (two_write_queues_ && disable_memtable) {
    return WriteImplWALOnly(write_options, my_batch, callback, log_used,
                            log_ref, seq_used, pre_release_callback);
  }

  if (immutable_db_options_.enable_pipelined_write) {
    return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
                              log_ref, disable_memtable, seq_used);
  }

  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                        disable_memtable, pre_release_callback);

  if (!write_options.disableWAL) {
    RecordTick(stats_, WRITE_WITH_WAL);
  }

  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  write_thread_.JoinBatchGroup(&w);
  if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    // we are a non-leader in a parallel group
    PERF_TIMER_GUARD(write_memtable_time);

    if (w.ShouldWriteToMemtable()) {
      ColumnFamilyMemTablesImpl column_family_memtables(
          versions_->GetColumnFamilySet());
      w.status = WriteBatchInternal::InsertInto(
          &w, w.sequence, &column_family_memtables, &flush_scheduler_,
          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
          true /*concurrent_memtable_writes*/, seq_per_batch_);
    }

    if (write_thread_.CompleteParallelMemTableWriter(&w)) {
      // we're responsible for exit batch group
      for (auto* writer : *(w.write_group)) {
        if (!writer->CallbackFailed() && writer->pre_release_callback) {
          assert(writer->sequence != kMaxSequenceNumber);
          Status ws = writer->pre_release_callback->Callback(writer->sequence);
          if (!ws.ok()) {
            status = ws;
            break;
          }
        }
      }
      // TODO(myabandeh): propagate status to write_group
      auto last_sequence = w.write_group->last_sequence;
      versions_->SetLastSequence(last_sequence);
      MemTableInsertStatusCheck(w.status);
      write_thread_.ExitAsBatchGroupFollower(&w);
    }
    assert(w.state == WriteThread::STATE_COMPLETED);
    // STATE_COMPLETED conditional below handles exit

    status = w.FinalStatus();
  }
  if (w.state == WriteThread::STATE_COMPLETED) {
    if (log_used != nullptr) {
      *log_used = w.log_used;
    }
    if (seq_used != nullptr) {
      *seq_used = w.sequence;
    }
    // write is complete and leader has updated sequence
    return w.FinalStatus();
  }
  // else we are the leader of the write batch group
  assert(w.state == WriteThread::STATE_GROUP_LEADER);

  // Once reaches this point, the current writer "w" will try to do its write
  // job.  It may also pick up some of the remaining writers in the "writers_"
  // when it finds suitable, and finish them in the same write batch.
  // This is how a write job could be done by the other writer.
  WriteContext write_context;
  WriteThread::WriteGroup write_group;
  bool in_parallel_group = false;
  uint64_t last_sequence = kMaxSequenceNumber;
  if (!two_write_queues_) {
    last_sequence = versions_->LastSequence();
  }

  mutex_.Lock();

  bool need_log_sync = write_options.sync;
  bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
  if (!two_write_queues_ || !disable_memtable) {
    // With concurrent writes we do preprocess only in the write thread that
    // also does write to memtable to avoid sync issue on shared data structure
    // with the other thread
    status = PreprocessWrite(write_options, &need_log_sync, &write_context);
  }
  log::Writer* log_writer = logs_.back().writer;

  mutex_.Unlock();

  // Add to log and apply to memtable.  We can release the lock
  // during this phase since &w is currently responsible for logging
  // and protects against concurrent loggers and concurrent writes
  // into memtables

  last_batch_group_size_ =
      write_thread_.EnterAsBatchGroupLeader(&w, &write_group);

  if (status.ok()) {
    // Rules for when we can update the memtable concurrently
    // 1. supported by memtable
    // 2. Puts are not okay if inplace_update_support
    // 3. Merges are not okay
    //
    // Rules 1..2 are enforced by checking the options
    // during startup (CheckConcurrentWritesSupported), so if
    // options.allow_concurrent_memtable_write is true then they can be
    // assumed to be true.  Rule 3 is checked for each batch.  We could
    // relax rules 2 if we could prevent write batches from referring
    // more than once to a particular key.
    bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
                    write_group.size > 1;
    size_t total_count = 0;
    size_t valid_batches = 0;
    uint64_t total_byte_size = 0;
    for (auto* writer : write_group) {
      if (writer->CheckCallback(this)) {
        valid_batches  ;
        if (writer->ShouldWriteToMemtable()) {
          total_count  = WriteBatchInternal::Count(writer->batch);
          parallel = parallel && !writer->batch->HasMerge();
        }

        total_byte_size = WriteBatchInternal::AppendedByteSize(
            total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
      }
    }
    // Note about seq_per_batch_: either disableWAL is set for the entire write
    // group or not. In either case we inc seq for each write batch with no
    // failed callback. This means that there could be a batch with
    // disalbe_memtable in between; although we do not write this batch to
    // memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc
    // the seq per valid written key to mem.
    size_t seq_inc = seq_per_batch_ ? valid_batches : total_count;

    const bool concurrent_update = two_write_queues_;
    // Update stats while we are an exclusive group leader, so we know
    // that nobody else can be writing to these particular stats.
    // We're optimistic, updating the stats before we successfully
    // commit.  That lets us release our leader status early.
    auto stats = default_cf_internal_stats_;
    stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count,
                      concurrent_update);
    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
    stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size,
                      concurrent_update);
    RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
    stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1, concurrent_update);
    RecordTick(stats_, WRITE_DONE_BY_SELF);
    auto write_done_by_other = write_group.size - 1;
    if (write_done_by_other > 0) {
      stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other,
                        concurrent_update);
      RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
    }
    MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);

    if (write_options.disableWAL) {
      has_unpersisted_data_.store(true, std::memory_order_relaxed);
    }

    PERF_TIMER_STOP(write_pre_and_post_process_time);

    if (!two_write_queues_) {
      if (status.ok() && !write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
                            need_log_dir_sync, last_sequence   1);
      }
    } else {
      if (status.ok() && !write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        // LastAllocatedSequence is increased inside WriteToWAL under
        // wal_write_mutex_ to ensure ordered events in WAL
        status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
                                      seq_inc);
      } else {
        // Otherwise we inc seq number for memtable writes
        last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
      }
    }
    assert(last_sequence != kMaxSequenceNumber);
    const SequenceNumber current_sequence = last_sequence   1;
    last_sequence  = seq_inc;

    if (status.ok()) {
      PERF_TIMER_GUARD(write_memtable_time);

      if (!parallel) {
        // w.sequence will be set inside InsertInto
        w.status = WriteBatchInternal::InsertInto(
            write_group, current_sequence, column_family_memtables_.get(),
            &flush_scheduler_, write_options.ignore_missing_column_families,
            0 /*recovery_log_number*/, this, parallel, seq_per_batch_);
      } else {
        SequenceNumber next_sequence = current_sequence;
        // Note: the logic for advancing seq here must be consistent with the
        // logic in WriteBatchInternal::InsertInto(write_group...) as well as
        // with WriteBatchInternal::InsertInto(write_batch...) that is called on
        // the merged batch during recovery from the WAL.
        for (auto* writer : write_group) {
          if (writer->CallbackFailed()) {
            continue;
          }
          writer->sequence = next_sequence;
          if (seq_per_batch_) {
            next_sequence  ;
          } else if (writer->ShouldWriteToMemtable()) {
            next_sequence  = WriteBatchInternal::Count(writer->batch);
          }
        }
        write_group.last_sequence = last_sequence;
        write_group.running.store(static_cast<uint32_t>(write_group.size),
                                  std::memory_order_relaxed);
        write_thread_.LaunchParallelMemTableWriters(&write_group);
        in_parallel_group = true;

        // Each parallel follower is doing each own writes. The leader should
        // also do its own.
        if (w.ShouldWriteToMemtable()) {
          ColumnFamilyMemTablesImpl column_family_memtables(
              versions_->GetColumnFamilySet());
          assert(w.sequence == current_sequence);
          w.status = WriteBatchInternal::InsertInto(
              &w, w.sequence, &column_family_memtables, &flush_scheduler_,
              write_options.ignore_missing_column_families, 0 /*log_number*/,
              this, true /*concurrent_memtable_writes*/, seq_per_batch_);
        }
      }
      if (seq_used != nullptr) {
        *seq_used = w.sequence;
      }
    }
  }
  PERF_TIMER_START(write_pre_and_post_process_time);

  if (!w.CallbackFailed()) {
    WriteCallbackStatusCheck(status);
  }

  if (need_log_sync) {
    mutex_.Lock();
    MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
    mutex_.Unlock();
    // Requesting sync with two_write_queues_ is expected to be very rare. We
    // hance provide a simple implementation that is not necessarily efficient.
    if (two_write_queues_) {
      if (manual_wal_flush_) {
        status = FlushWAL(true);
      } else {
        status = SyncWAL();
      }
    }
  }

  bool should_exit_batch_group = true;
  if (in_parallel_group) {
    // CompleteParallelWorker returns true if this thread should
    // handle exit, false means somebody else did
    should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
  }
  if (should_exit_batch_group) {
    if (status.ok()) {
      for (auto* writer : write_group) {
        if (!writer->CallbackFailed() && writer->pre_release_callback) {
          assert(writer->sequence != kMaxSequenceNumber);
          Status ws = writer->pre_release_callback->Callback(writer->sequence);
          if (!ws.ok()) {
            status = ws;
            break;
          }
        }
      }
      versions_->SetLastSequence(last_sequence);
    }
    MemTableInsertStatusCheck(w.status);
    write_thread_.ExitAsBatchGroupLeader(write_group, status);
  }

  if (status.ok()) {
    status = w.FinalStatus();
  }
  return status;
}

flush(minor-compaction)

       Rocksdb是facebook开源的NOSQL存储系统,其设计是基于Google开源的Leveldb,优化了LevelDB中存在的一些问题,其性能号称要比LevelDB强,rocksdb的设计跟Leveldb的极其类似,读过LevelDB源码的再读rocksdb的源码基本毫无压力,rocksdb也包括了内存memtable,LRUcache,磁盘上的sstable,operation log等等。本系列就是从rocksdb的源码级别来分析其设计实现与性能

二 数据压缩 Compaction

Compaction操作

澳门新萄京官方网站 6

minor Compaction操作

跳表直接存到本地磁盘

澳门新萄京官方网站 7

major compaction 操作

多路归并排序算法

      Rockdb中在内存的数据都是通过memtable存储,主要包括两种形式,active-memtable和immutable-memtable。active-memtable是当前正在提供写操作的memtable,当active-memtable写入超过阀值(通过参数wirte_buffer_size控制),会将这个memtable标记为read-only,然后再创建一个新的memtable供新的写入,这个read-only的memtable就是immutable-memtable。我们所说的flush操作就是将imumutable-memtable 写入到level0的过程。flush过程以column family为单位进行,一个column family是一组sst文件的集合,在myrocks中一个表可以是一个单独的column family,也可以多个表共用一个column family。每个column family中可能包含一个或多个immutable-memtable,一个flush线程会抓取column family中所有的immutable-memtable进行merge,然后flush到level0。由于一个线程在flush过程中,新的写入也源源不断进来,进而产生新的immutable-memtable,其它flush线程可以新起一个任务进行flush,因此在rocksdb体系下,active-memtable->immutable-memtable->sst文件转换过程是流水作业,并且flush可以并发执行,相对于levelDB,并发compaction的速度要快很多。通过参数max_write_buffer_number可以控制memtable的总数量,如果写入非常快,而compaction很慢,会导致memtable数量超过阀值,导致write stall的严重后果。另外一个参数是min_write_buffer_number_to_merge,整个参数是控制至少几个immutable才会触发flush,默认是1。flush的基本流程如下:

       既然rocksdb是基于leveldb设计实现并优化了一些细节,那我们先看一下leveldb的基本框架,

1 L0 compaction

当L0的文件数量达到level0_file_num_compaction_trigger的值时,触发L0和L1的合并。通常必须将所有L0的文件合并到L1中,因为L0的文件的key是有交叠的(overlapping)。

澳门新萄京官方网站 8

L0与L1的compaction

Column Family

自从RocksDB 3.0引入支持column family,每个KV数据对能够指定关联唯一的column family(默认为“default”),做到相对独立的隔离存储;column family提供了逻辑上划分数据库的能力,支持跨column family进行原子写操作(借助WriteBatch实现)等。

所有的column family共享WAL日志文件(write-head log),但是每个column family有独立的MemTable和SSTable。
共享WAL有利于原子操作实现,更高效的成组提交。
独立的MemTable和SSTable则更方便数据压缩,配置选项,快速的删除某个column family。

参考

  • 写优化之JoinBatchGroup
  • leveldb 源码分析(一)
  • LevelDB 源码分析(二):主体结构
  • Log Structured Merge Trees(LSM) 原理
  • FlatBuffers 介绍
  • Android 数据库 ObjectBox 源码解析
  • Column Family-RocksDB源码剖析(1)
  • rocksdb读/写/空间放大分析

1.遍历immutable-list,如果没有其它线程flush,则加入队列

澳门新萄京官方网站 9

2 高层Compaction

当L0 compaction完成后,L1的文件总size或者文件数量可能会超过阈值,触发L1向L2的合并。从L1至少选择一个文件,合并到L2中key有交叠的文件中。

澳门新萄京官方网站 10

L1向L2合并

同样的,合并后可能会触发下一各level的compaction。

澳门新萄京官方网站 11

合并后的L2

澳门新萄京官方网站 12

L2向L3合并

合并后的L3也需要做Compaction.

澳门新萄京官方网站 13

合并后的L3

2.通过迭代器逐一扫描key-value,将key-value写入到data-block 

       由该架构图可以看到,Leveldb是由memtable, immute memtable,wal log,sstable组成内存中的memtalbe与imm memtable各为一个,imm memtable是由memtable达到阈值后转化而成的,其数据结构是一样的。这里对于Leveldb的具体实现细节这里不详细论述,有兴趣的可以参考下其它Leveldb的源码分析,或者继续看后续分析章节,对理解Leveldb也很有帮助。

3 并行Compaction

澳门新萄京官方网站 14

并行compaction

max_background_compactions控制了并行compaction的最大数量。

3.如果data block大小已经超过block_size(比如16k),或者已经key-value对是最后的一对,则触发一次block-flush

       rocksdb对leveldb的优化有:

4 L0 subcompaction

L0向L1的compaction不可以与其他level compaction并行。这可能成为整体compaction速度的瓶颈,可以通过设置max_subcompactions来加速L0到L1的compaction。

澳门新萄京官方网站 15

subcompaction

4.根据压缩算法对block进行压缩,并生成对应的index block记录(begin_key, last_key, offset)

增加了column family,有了列簇的概念,可把一些相关的key存储在一起,column famiy的设计挺有意思的,后面会单独分析

5 Compaction的选择策略

当多个level都满足触发compaction的条件,rocksdb通过计算得分来选择先做哪一个level的compaction。

  • 对于非0 level,score = 该level文件的总长度 / 阈值。已经正在做compaction的文件不计入总长度中。
  • 对于L0,score = max{文件数量 / level0_file_num_compaction_trigger, L0文件总长度 / max_bytes_for_level_base} 并且 L0文件数量 > level0_file_num_compaction_trigger。
    得分最高的level有限做compaction。

5.至此若干个block已经写入文件,并为每个block生成了indexblock记录

内存中有多个immute memtalbe,可防止Leveldb中的 write stall

6 compaction触发阈值

每一层的compaction阈值设置策略由level_compaction_dynamic_level_bytes来决定。

6.写入index block,meta block,metaindex block以及footer信息到文件尾

可支持多线程同时compation,理论上多线程同时comaption会比一个线程compation要快

当level_compaction_dynamic_level_bytes为false

L1 触发阈值:max_bytes_for_level_base
下面的level触发阈值通过公式计算:Target_Size(Ln 1) = Target_Size(Ln) * max_bytes_for_level_multiplier * max_bytes_for_level_multiplier_additional[n]. max_bytes_for_level_multiplier_additional

例如:
max_bytes_for_level_base = 16384
max_bytes_for_level_multiplier = 10
max_bytes_for_level_multiplier_additional = 1
那么每个level的触发阈值为 L1, L2, L3 and L4 分别为 16384, 163840, 1638400, and 16384000

7.将变化sst文件的元信息写入manifest文件

增加了merge operator,也就是原地更新,优化了modify的效率

当level_compaction_dynamic_level_bytes为true

最后一个level的文件长度总是固定的。
上面level触发阈值通过公式计算:Target_Size(Ln-1) = Target_Size(Ln) / max_bytes_for_level_multiplier
如果计算得到的值小于 max_bytes_for_level_base / max_bytes_for_level_multiplier, 那么该level将维持为空,L0做compaction时将直接merge到第一个有合法阈值的level上。
例如:
max_bytes_for_level_base = 1G
num_levels = 6
level 6 size = 276G
那么从L1到L6的触发阈值分别为:0, 0, 0.276G, 2.76G, 27.6G,276G。

这样分配,保证了稳定的LSM-tree结构。并且有90%的数据存储在最后一层,9%的数据保存在倒数第二层。

澳门新萄京官方网站 16

image.png


参考资料:官方wiki

      flush实质是对memtable中的记录进行一次有序遍历,在这个过程中会去掉一些冗余的记录,然后以block为单位写入sst文件,写入文件时根据压缩策略确定是否对block进行压缩。为什么会有冗余记录?这个主要是因为rocksdb中无论是insert,update还是delete,所有的写入操作都是以append的方式写入memtable,比如先后对key=1的记录执行三个操作insert(1),update(1),delete(1),在rocksdb中会产生3条不同记录。(在innodb中,对于同一个key的操作都是原地更新,只有一条记录)。实际上delete后这个记录不应该存在了,所以在合并时,可以干掉这些冗余的记录,比如这里的insert(1),update(1),这种合并使得flush到level0的sst已经比较紧凑。冗余记录主要有以下三种情况:(user_key, op)表示对user_key的操作,比如put,delete等。

支持DB级的TTL

1.对于(user_key,put),(user_key,delete),则可以将put删掉

flush与compation分开不同的线程池来调度,并具有不同的优先级,flush要优于compation,这样可以加快flush,防止stall

2.对于(user_key,single-delete),(user_key,put),single-delete保证put,delete成对出现,可以同时将两条记录都删掉。

对SSD存储做了优化,可以以in-memory方式运行

3.对于(user_key,put1),(user_key,put2),(user_key,put3)可以干掉比较老的put

增加了对 write ahead log(WAL)的管理机制,更方便管理WAL,WAL是binlog文件

对于以上3种情况,都要考虑snapshot,如果要删除的key在某个snapshot可见,则不能删除。注意第1种情况,(user_key,delete)这条记录是不能被删除的,因为对用户而言,这条记录已经不存在了,但由于rocksdb的LSM-tree存储结构,这个user_key的记录可能在level0,level1或者levelN,所以(user_key, delete)这条记录要保留,直到进行最后一层的compaction操作时才能将它干掉。第2种情况,single-delete是一个特殊的delete操作,这个操作保证了put,delete一定是成对出现的,所以flush时,可以将这两条记录同时干掉。 

上面只要简单的总结,更多的细节还需要进一步分析,rocksdb的基本框架如下图,

compaction(major-compaction)

澳门新萄京官方网站 17

       我们通常所说的compaction就是major-compaction,sst文件从低level合并到高level的过程,这个过程与flush过程类似,也是通过迭代器将多个sst文件的key进行merge,遍历key然后创建sst文件。flush的触发条件是immutable memtable的数量是否超过了min_write_buffer_number_to_merge,而compaction的触发条件是两类:文件个数和文件大小。对于level0,触发条件是sst文件个数,通过参数level0_file_num_compaction_trigger控制,score通过sst文件数目与level0_file_num_compaction_trigger的比值得到。level1-levelN触发条件是sst文件的大小,通过参数max_bytes_for_level_base和max_bytes_for_level_multiplier来控制每一层最大的容量,score是本层当前的总容量与能存放的最大容量的比值。rocksdb中通过一个任务队列维护compaction任务流,通过判断某个level是否满足compaction条件来加入队列,然后从队列中获取任务来进行compact。compaction的主要流程如下:

从图1-1与图1-2可以看出,Leveldb的框架与Rocksdb的框架十分类似,rocksdb从3.0开始支持ColumnFamily的概念,所以我们从ColumnFamily的角度来看rocksdb的框架,

1.首先找score最高的level,如果level的score>1,则选择从这个level进行compaction

澳门新萄京官方网站 18

2.根据一定的策略,从level中选择一个sst文件进行compact,对于level0,由于sst文件之间(minkey,maxkey)有重叠,所以可能有多个。

      每个columnfamilyl的meltable与sstable都是分开的,所以每一个column family都可以单独配置,所有column family共用同一个WAL log文件,可以保证跨column family写入时的原子性

3.从level中选出的文件,我们能计算出(minkey,maxkey)

      Rocksdb同样是一种基于operation log的文件系统,

4.从level 1中选出与(minkey,maxkey)有重叠的sst文件

      由于采用了op log,将对磁盘的随机写转换成了对op log的顺序写,最新的数据是存储在内存的memrory中,可以提高IO效率。每一个的column family分别有一个memtable与sstablle.当某一coloumn family内存中的memory table超过阈值时,转换成immute memtable并创建新的op log,immute memtable由一系列的memtable组成,它们是只读的,可供查询,不能更新数据。当immute memtable的数目超过设置的数值时,会触发flush,DB会调度后台线程将多个memtable合并后再dump到磁盘生成Level0中一个新的sstable文件,Level0中的sstable文件不断累积,会触发compaction,DB会调度后台compaction线程将Level0中的sstable文件根据key与Level1中的sstable合并并生成新的sstable,依次类推,根据key的空间从低层往上compact,最终形成了一层层的结构,层级数目是由用户设置的。

5.多个sst文件进行归并排序,合并写出到sst文件

二.

6.根据压缩策略,对写出的sst文件进行压缩

       在阅读rocksdb源码前,需要提前储备下一些基本知识,要是对LevelDB的架构已经比较熟悉的话,基本可以略过此处,开始关注后面的章节。

7.合并结束后,利用VersionEdit更新VersionSet,更新统计信息

必备知识点

     上面的步骤基本介绍了compaction的流程,简单来说就是选择某个level的sst文件与level 1中存在重叠的sst文件进行合并,然后将合并后的文件写入到level 1层的过程。通过判断每个level的score是否大于1,确定level是否需要compact;对于level中sst文件的选择,会有几种策略,默认是选择文件size较大,包含delete记录较多的sst文件,这种文件尽快合并有利于缩小空间。关于选择sst文件的策略可以参考options.h中的CompactionPri的定义。每次会从level中选取一个sst文件与下层compact,但由于level0中可能会有多个sst文件存在重叠的范围,因此一次compaction可能有多个level0的sst文件参与。rocksdb后台一般有多个线程执行compact任务,compaction线程不断地从任务队列中获取任务,也会不断地检查每个level是否需要compact,然后加入到队列,因此整体来看,compact过程是并发的,但并发的基本原则是,多个并发任务不会有重叠的key。对于level0来说,由于多个sst文件会存在重叠的key范围,根据level0,level 1中参与compact的sst文件key范围进行分区,划分为多个子任务进行compact,所有子任务并发执行,都执行完成后,整个compact过程结束。另外还有一个问题要说明的是,compact时并不是都需要合并,如果level中的输入sst文件与level 1中无重叠,则可以直接将文件移到level 1中。 

  1. 字节序

Universal Compaction

rocksdb中与leveldb类似,数字的存储是little-endian的,也就是说在把int32与int64转换成char*的函数中,是按照先低位再高位的顺序存放的。

      前面介绍的compaction类型是level compaction,在rocksdb中还有一类compaction,称之为Univeral Compaction。Univeral模式中,所有的sst文件都可能存在重叠的key范围。对于R1,R2,R3,...,Rn,每个R是一个sst文件,R1中包含了最新的数据,而Rn包含了最老的数据。合并的前提条件是sst文件数目大于level0_file_num_compaction_trigger,如果没有达到这个阀值,则不会触发合并。在满足前置条件的情况下,按优先级顺序触发以下合并。

  1. Varint

1.如果空间放大超过一定的比例,则所有sst进行一次compaction,所谓的full compaction,通过参数max_size_amplification_percent控制。

在把int32或int64转换到字符串中时,为减小存储空间,采用变长存储,也就是VarInt。变长存储的实现与PB中的基本一样,即每byte的有效存储是7bit的,最高的8bit位表示是否结束, 最高bit位为1时,表示后面还有一个byte的数字,为0时,表示已经结束,具体实现可参考Encodexxx和Decodexxx系列函数

2.如果前size(R1)小于size(R2)在一定比例,默认1%,则与R1与R2一起进行compaction,如果(R1 R2)*(100 ratio)0<R3,则将R3也加入到compaction任务中,依次顺序加入sst文件

  1. 基本数据结构

3.如果第1和第2种情况都没有compaction,则强制选择前N个文件进行合并。

3.1 Slice

      相对于level compaction,Univeral compaction由于每一次合并的文件较多,相对于level compaction的多层合并,写放大较小,付出的代价是空间放大较大。除了前面介绍的level compaction和univeral compaction,rocksdb还支持一种FIFO的compaction。FIFO顾名思义就是先进先出,这种模式周期性地删除旧数据。在FIFO模式下,所有文件都在level0,当sst文件总大小超过阀值max_table_files_size,则删除最老的sst文件。整个compaction是LSM-tree数据结构的核心,也是rocksDB的核心,本文梳理了几种compaction方式的基本流程,里面还有很多的细节没有涉及到,有兴趣的同学可以在本文的基础上仔细阅读源码,加深对compaction的理解。

rocksdb的基本数据结构,成员包括length与一个指向外部存储空间的指针,是二进制安全的,可以包含‘',提供了一些可与string/char*相互转换的接口,

附录

3.2 Status

相关文件:

rocksdb的状态类,将错误号与错误信息封装,同样是为了节省空间,Status类将返回码,错误信息与长度打包存储在一个字符数组中。

rocksdb/db/flush_job.cc 

格式如下:

include/rocksdb/universal_compaction.h

state_[0..3] ==消息长度

rocksdb/db/compaction_job.cc

state_[4]    ==消息code

db/compaction_picker.cc

state_[5..]  ==消息

rocksdb/table/block_based_table_builder.cc

3.3 Arena

相关接口:

rocksdb的简单内存池,

FlushMemTableToOutputFile //flush memtable到level0

申请内存时,将申请到的内存块放入std::vector blocks_中,在Arena的生命周期结束后,统一释放掉所有申请到的内存,内部结构如图1-3所示。

FlushJob::Run  //flush memtable 任务

澳门新萄京官方网站 19

PickMemtablesToFlush //选择可以flush的immutable-memtable

另外,rocksdb同样可以使用tcmalloc与jemalloc,在性能方面还是会有不小的提升.

WriteLevel0Table //刷sst文件到level0

3.4memtable

BuildTable //实现创建sst文件

leveldb中,memtable在内存中核心s的数据结构为skiplist,而在rocksdb中,memtable在内存中的形式有三种:skiplist、hash-skiplist、hash-linklist,从字面中就可以看出数据结构的大体形式,hash-skiplist就是每个hash bucket中是一个skiplist,hash-linklist中,每个hash bucket中是一个link-list,启用何用数据结构可在配置中选择,下面是skiplist的数据结构:

UniversalCompactionPicker::NeedsCompaction //是否需要compact

澳门新萄京官方网站 20

PickCompaction //需要进行compact的sst文件

下面是hash-skiplist的结构,

PickCompactionUniversalReadAmp //选择相邻的sst文件进行合并

澳门新萄京官方网站 21

NeedsCompaction //判断文件是否level是否需要compact

下面是hash-linklist的框架图,

LevelCompactionPicker::PickCompaction // 获取level中sst文件进行compact

澳门新萄京官方网站 22

LevelCompactionPicker::PickCompactionBySize

3.5 Cache

IsTrivialMove // 是否可以移动更深的Level,没有overlap的情况下。

澳门新萄京官方网站:开源项目rocksdb分析,Compaction原理分析。rocksdb内部根据双向链表实现了一个标准的LRUCache,由于LRUCache的设计实现比较通用经典,这里详细分析一下LRUCache的实现过程,根据LRUCache的从小到大的顺序来看基本组件,

ShouldFormSubcompactions  // 判断是否可以将compaction任务分片

A. LRUHandle结构体,Cache中最小粒度的元素,代表了一个k/v存储对,下面是LRUHandle的所有信息,

CompactionJob::Prepare    // 划分子任务

struct LRUHandle {

CompactionJob::Run()      // compaction的具体实现 

void* value;  // value信息

BlockBasedTableBuilder::Finish  //生成sst文件

void (*deleter)(const Slice&, void* value); //删除元素时,可调用的回调函数

参考文档

LRUHandle* next_hash; //解决hash冲突时,使用链表法

LRUHandle* next;//next/prev构成了双链,由LRU算法使用

LRUHandle* prev;

size_t charge;      // TODO(opt): Only allow uint32_t?

size_t key_length; //key的长度

uint32_t refs;      // a number of refs to this entry

// cache itself is counted as 1

bool in_cache;      // true, if this entry is referenced by the hash table

uint32_t hash;      // Hash of key(); used for fast sharding and comparisons

char key_data[1];   // Beginning of key

Slice key() const {

// For cheaper lookups, we allow a temporary Handle object

// to store a pointer to a key in "value".

if (next == this) {

return *(reinterpret_cast(value));

} else {

return Slice(key_data, key_length);

}

}

void Free() {

assert((refs == 1 && in_cache) || (refs == 0 && !in_cache));

(*deleter)(key(), value);

free(this);

}

};

B. 实现了rocksdb自己的HandleTable,其实就是实现了自己的hash table,  速度号称比g 4.4.3版本自带的hash table的速度要快不少

class HandleTable {

public:

HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }

template

void ApplyToAllCacheEntries(T func) {

for (uint32_t i = 0; i < length_; i ) {

LRUHandle* h = list_[i];

while (h != nullptr) {

auto n = h->next_hash;

assert(h->in_cache);

func(h);

h = n;

}

}

}

~HandleTable() {

ApplyToAllCacheEntries([](LRUHandle* h) {

if (h->refs == 1) {

h->Free();

}

});

delete[] list_;

}

LRUHandle* Lookup(const Slice& key, uint32_t hash) {

return *FindPointer(key, hash);

}

LRUHandle* Insert(LRUHandle* h) {

LRUHandle** ptr = FindPointer(h->key(), h->hash);

LRUHandle*澳门新萄京官方网站:开源项目rocksdb分析,Compaction原理分析。 old = *ptr;

h->next_hash = (old == nullptr ? nullptr : old->next_hash);

*ptr = h;

if (old == nullptr) {

elems_;

if (elems_ > length_) {

// Since each cache entry is fairly large, we aim for a small

// average linked list length (<= 1).

Resize();

}

}

return old;

}

LRUHandle* Remove(const Slice& key, uint32_t hash) {

LRUHandle** ptr = FindPointer(key, hash);

LRUHandle* result = *ptr;

if (result != nullptr) {

*ptr = result->next_hash;

--elems_;

}

return result;

}

private:

// The table consists of an array of buckets where each bucket is

// a linked list of cache entries that hash into the bucket.

uint32_t length_;

uint32_t elems_;

LRUHandle** list_;

// Return a pointer to slot that points to a cache entry that

// matches key/hash.  If there is no such cache entry, return a

// pointer to the trailing slot in the corresponding linked list.

LRUHandle** FindPointer(const Slice& key, uint32_t hash) {

LRUHandle** ptr = &list_[hash & (length_ - 1)];

while (*ptr != nullptr &&

((*ptr)->hash != hash || key != (*ptr)->key())) {

ptr = &(*ptr)->next_hash;

}

return ptr;

}

void Resize() {

uint32_t new_length = 16;

while (new_length < elems_ * 1.5) {

new_length *= 2;

}

LRUHandle** new_list = new LRUHandle*[new_length];

memset(new_list, 0, sizeof(new_list[0]) * new_length);

uint32_t count = 0;

for (uint32_t i = 0; i < length_; i ) {

LRUHandle* h = list_[i];

while (h != nullptr) {

LRUHandle* next = h->next_hash;

uint32_t hash = h->hash;

LRUHandle** ptr = &new_list[hash & (new_length - 1)];

h->next_hash = *ptr;

*ptr = h;

h = next;

count ;

}

}

assert(elems_ == count);

delete[] list_;

list_ = new_list;

length_ = new_length;

}

};

HandleTable的结构也是很简单,就是连续一些hash slot,然后用链表法解决hash 冲突,

澳门新萄京官方网站 23

C. LRUCahe

LRUCache是由LRUHandle与HandleTable组成,并且LRUCache内部是有锁的,所以外部多线程可以安全使用。

HandleTable很好理解,就是把Cache中的数据hash散列存储,可以加快查找速度;

LRUHandle lru_是个dummy pointer,也就是双链表的头,也就是LRU是由双链表保存的,队头是最早进入Cache的,队尾是最后进入Cache的,所以,在Cache满了需要释放空间的时候是从队头开始的,队尾是刚进入Cache的元素

class LRUCache {

public:

LRUCache();

~LRUCache();

// Separate from constructor so caller can easily make an array of LRUCache

// if current usage is more than new capacity, the function will attempt to

// free the needed space

void SetCapacity(size_t capacity);

// Like Cache methods, but with an extra "hash" parameter.

Cache::Handle* Insert(const Slice& key, uint32_t hash,

void* value, size_t charge,

void (*deleter)(const Slice& key, void* value));

Cache::Handle* Lookup(const Slice& key, uint32_t hash);

void Release(Cache::Handle* handle);

void Erase(const Slice& key, uint32_t hash);

// Although in some platforms the update of size_t is atomic, to make sure

// GetUsage() and GetPinnedUsage() work correctly under any platform, we'll

// protect them with mutex_.

size_t GetUsage() const {

MutexLock l(&mutex_);

return usage_;

}

size_t GetPinnedUsage() const {

MutexLock l(&mutex_);

assert(usage_ >= lru_usage_);

return usage_ - lru_usage_;

}

void ApplyToAllCacheEntries(void (*callback)(void*, size_t),

bool thread_safe);

private:

void LRU_Remove(LRUHandle* e);

void LRU_Append(LRUHandle* e);

// Just reduce the reference count by 1.

// Return true if last reference

bool Unref(LRUHandle* e);

// Free some space following strict LRU policy until enough space

// to hold (usage_ charge) is freed or the lru list is empty

// This function is not thread safe - it needs to be executed while

// holding the mutex_

void EvictFromLRU(size_t charge,

autovector* deleted);

// Initialized before use.

size_t capacity_;

// Memory size for entries residing in the cache

size_t usage_;

// Memory size for entries residing only in the LRU list

size_t lru_usage_;

// mutex_ protects the following state.

// We don't count mutex_ as the cache's internal state so semantically we

// don't mind mutex_ invoking the non-const actions.

mutable port::Mutex mutex_;

// Dummy head of LRU list.

// lru.prev is newest entry, lru.next is oldest entry.

// LRU contains items which can be evicted, ie reference only by cache

LRUHandle lru_;

HandleTable table_;

};

     到这,我们从设计现实就能看出一个标准的LRUCache已经成形了,接下来更有意思的是rocksdb又实现了一个ShardedLRUCache,它就是一个封装类,实现了分片LRUCache,在多线程使用的时候,根据key散列到不同的分片LRUCache中,以降低锁的竞争,尽量提高性能。下面一行的代码是一目了然,

LRUCache shard_[kNumShards]

D. 另一个很有用的就是ENV,基于不同的平台继承实现了不同的ENV,提供了系统级的各种实现,功能很是强大,对于想做跨平台软件的同学很有借鉴意义。ENV的具体实现就不贴了,主要就是太多。对于其它的工具类,具体可参考src下的相关实现。

本文由澳门新萄京官方网站发布于数据库网络,转载请注明出处:澳门新萄京官方网站:开源项目rocksdb分析,Com

关键词: