fork of https://github.com/oxigraph/rocksdb and https://github.com/facebook/rocksdb for nextgraph and oxigraph
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
327 lines
10 KiB
327 lines
10 KiB
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "rocksdb/table_builder.h"
|
|
|
|
#include <assert.h>
|
|
#include "rocksdb/comparator.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/filter_policy.h"
|
|
#include "rocksdb/options.h"
|
|
#include "table/block_builder.h"
|
|
#include "table/filter_block.h"
|
|
#include "table/format.h"
|
|
#include "util/coding.h"
|
|
#include "util/crc32c.h"
|
|
#include "util/stop_watch.h"
|
|
|
|
namespace leveldb {
|
|
|
|
struct TableBuilder::Rep {
|
|
Options options;
|
|
Options index_block_options;
|
|
WritableFile* file;
|
|
uint64_t offset;
|
|
Status status;
|
|
BlockBuilder data_block;
|
|
BlockBuilder index_block;
|
|
std::string last_key;
|
|
int64_t num_entries;
|
|
bool closed; // Either Finish() or Abandon() has been called.
|
|
FilterBlockBuilder* filter_block;
|
|
|
|
// We do not emit the index entry for a block until we have seen the
|
|
// first key for the next data block. This allows us to use shorter
|
|
// keys in the index block. For example, consider a block boundary
|
|
// between the keys "the quick brown fox" and "the who". We can use
|
|
// "the r" as the key for the index block entry since it is >= all
|
|
// entries in the first block and < all entries in subsequent
|
|
// blocks.
|
|
//
|
|
// Invariant: r->pending_index_entry is true only if data_block is empty.
|
|
bool pending_index_entry;
|
|
BlockHandle pending_handle; // Handle to add to index block
|
|
|
|
std::string compressed_output;
|
|
|
|
Rep(const Options& opt, WritableFile* f)
|
|
: options(opt),
|
|
index_block_options(opt),
|
|
file(f),
|
|
offset(0),
|
|
data_block(&options),
|
|
index_block(&index_block_options),
|
|
num_entries(0),
|
|
closed(false),
|
|
filter_block(opt.filter_policy == nullptr ? nullptr
|
|
: new FilterBlockBuilder(opt)),
|
|
pending_index_entry(false) {
|
|
index_block_options.block_restart_interval = 1;
|
|
}
|
|
};
|
|
|
|
TableBuilder::TableBuilder(const Options& options, WritableFile* file,
|
|
int level)
|
|
: rep_(new Rep(options, file)), level_(level) {
|
|
if (rep_->filter_block != nullptr) {
|
|
rep_->filter_block->StartBlock(0);
|
|
}
|
|
}
|
|
|
|
TableBuilder::~TableBuilder() {
|
|
assert(rep_->closed); // Catch errors where caller forgot to call Finish()
|
|
delete rep_->filter_block;
|
|
delete rep_;
|
|
}
|
|
|
|
Status TableBuilder::ChangeOptions(const Options& options) {
|
|
// Note: if more fields are added to Options, update
|
|
// this function to catch changes that should not be allowed to
|
|
// change in the middle of building a Table.
|
|
if (options.comparator != rep_->options.comparator) {
|
|
return Status::InvalidArgument("changing comparator while building table");
|
|
}
|
|
|
|
// Note that any live BlockBuilders point to rep_->options and therefore
|
|
// will automatically pick up the updated options.
|
|
rep_->options = options;
|
|
rep_->index_block_options = options;
|
|
rep_->index_block_options.block_restart_interval = 1;
|
|
return Status::OK();
|
|
}
|
|
|
|
void TableBuilder::Add(const Slice& key, const Slice& value) {
|
|
Rep* r = rep_;
|
|
assert(!r->closed);
|
|
if (!ok()) return;
|
|
if (r->num_entries > 0) {
|
|
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
|
|
}
|
|
|
|
const size_t curr_size = r->data_block.CurrentSizeEstimate();
|
|
const size_t estimated_size_after = r->data_block.EstimateSizeAfterKV(key,
|
|
value);
|
|
// Do flush if one of the below two conditions is true:
|
|
// 1) if the current estimated size already exceeds the block size,
|
|
// 2) block_size_deviation is set and the estimated size after appending
|
|
// the kv will exceed the block size and the current size is under the
|
|
// the deviation.
|
|
if (curr_size >= r->options.block_size ||
|
|
(estimated_size_after > r->options.block_size &&
|
|
r->options.block_size_deviation > 0 &&
|
|
(curr_size * 100) >
|
|
r->options.block_size * (100 - r->options.block_size_deviation))) {
|
|
Flush();
|
|
}
|
|
|
|
if (r->pending_index_entry) {
|
|
assert(r->data_block.empty());
|
|
r->options.comparator->FindShortestSeparator(&r->last_key, key);
|
|
std::string handle_encoding;
|
|
r->pending_handle.EncodeTo(&handle_encoding);
|
|
r->index_block.Add(r->last_key, Slice(handle_encoding));
|
|
r->pending_index_entry = false;
|
|
}
|
|
|
|
if (r->filter_block != nullptr) {
|
|
r->filter_block->AddKey(key);
|
|
}
|
|
|
|
r->last_key.assign(key.data(), key.size());
|
|
r->num_entries++;
|
|
r->data_block.Add(key, value);
|
|
}
|
|
|
|
void TableBuilder::Flush() {
|
|
Rep* r = rep_;
|
|
assert(!r->closed);
|
|
if (!ok()) return;
|
|
if (r->data_block.empty()) return;
|
|
assert(!r->pending_index_entry);
|
|
WriteBlock(&r->data_block, &r->pending_handle);
|
|
if (ok()) {
|
|
r->pending_index_entry = true;
|
|
r->status = r->file->Flush();
|
|
}
|
|
if (r->filter_block != nullptr) {
|
|
r->filter_block->StartBlock(r->offset);
|
|
}
|
|
}
|
|
|
|
static bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
|
|
// Check to see if compressed less than 12.5%
|
|
return compressed_size < raw_size - (raw_size / 8u);
|
|
}
|
|
|
|
void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
|
|
// File format contains a sequence of blocks where each block has:
|
|
// block_data: uint8[n]
|
|
// type: uint8
|
|
// crc: uint32
|
|
assert(ok());
|
|
Rep* r = rep_;
|
|
Slice raw = block->Finish();
|
|
|
|
Slice block_contents;
|
|
std::string* compressed = &r->compressed_output;
|
|
CompressionType type;
|
|
// If the use has specified a different compression level for each level,
|
|
// then pick the compresison for that level.
|
|
if (!r->options.compression_per_level.empty()) {
|
|
const int n = r->options.compression_per_level.size();
|
|
// It is possible for level_ to be -1; in that case, we use level
|
|
// 0's compression. This occurs mostly in backwards compatibility
|
|
// situations when the builder doesn't know what level the file
|
|
// belongs to. Likewise, if level_ is beyond the end of the
|
|
// specified compression levels, use the last value.
|
|
type = r->options.compression_per_level[std::max(0, std::min(level_, n))];
|
|
} else {
|
|
type = r->options.compression;
|
|
}
|
|
switch (type) {
|
|
case kNoCompression:
|
|
block_contents = raw;
|
|
break;
|
|
|
|
case kSnappyCompression: {
|
|
std::string* compressed = &r->compressed_output;
|
|
if (port::Snappy_Compress(r->options.compression_opts, raw.data(),
|
|
raw.size(), compressed) &&
|
|
GoodCompressionRatio(compressed->size(), raw.size())) {
|
|
block_contents = *compressed;
|
|
} else {
|
|
// Snappy not supported, or not good compression ratio, so just
|
|
// store uncompressed form
|
|
block_contents = raw;
|
|
type = kNoCompression;
|
|
}
|
|
break;
|
|
}
|
|
case kZlibCompression:
|
|
if (port::Zlib_Compress(r->options.compression_opts, raw.data(),
|
|
raw.size(), compressed) &&
|
|
GoodCompressionRatio(compressed->size(), raw.size())) {
|
|
block_contents = *compressed;
|
|
} else {
|
|
// Zlib not supported, or not good compression ratio, so just
|
|
// store uncompressed form
|
|
block_contents = raw;
|
|
type = kNoCompression;
|
|
}
|
|
break;
|
|
case kBZip2Compression:
|
|
if (port::BZip2_Compress(r->options.compression_opts, raw.data(),
|
|
raw.size(), compressed) &&
|
|
GoodCompressionRatio(compressed->size(), raw.size())) {
|
|
block_contents = *compressed;
|
|
} else {
|
|
// BZip not supported, or not good compression ratio, so just
|
|
// store uncompressed form
|
|
block_contents = raw;
|
|
type = kNoCompression;
|
|
}
|
|
break;
|
|
}
|
|
WriteRawBlock(block_contents, type, handle);
|
|
r->compressed_output.clear();
|
|
block->Reset();
|
|
}
|
|
|
|
void TableBuilder::WriteRawBlock(const Slice& block_contents,
|
|
CompressionType type,
|
|
BlockHandle* handle) {
|
|
Rep* r = rep_;
|
|
StopWatch sw(r->options.env, r->options.statistics, WRITE_RAW_BLOCK_MICROS);
|
|
handle->set_offset(r->offset);
|
|
handle->set_size(block_contents.size());
|
|
r->status = r->file->Append(block_contents);
|
|
if (r->status.ok()) {
|
|
char trailer[kBlockTrailerSize];
|
|
trailer[0] = type;
|
|
uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
|
|
crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
|
|
EncodeFixed32(trailer+1, crc32c::Mask(crc));
|
|
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
|
|
if (r->status.ok()) {
|
|
r->offset += block_contents.size() + kBlockTrailerSize;
|
|
}
|
|
}
|
|
}
|
|
|
|
Status TableBuilder::status() const {
|
|
return rep_->status;
|
|
}
|
|
|
|
Status TableBuilder::Finish() {
|
|
Rep* r = rep_;
|
|
Flush();
|
|
assert(!r->closed);
|
|
r->closed = true;
|
|
|
|
BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
|
|
|
|
// Write filter block
|
|
if (ok() && r->filter_block != nullptr) {
|
|
WriteRawBlock(r->filter_block->Finish(), kNoCompression,
|
|
&filter_block_handle);
|
|
}
|
|
|
|
// Write metaindex block
|
|
if (ok()) {
|
|
BlockBuilder meta_index_block(&r->options);
|
|
if (r->filter_block != nullptr) {
|
|
// Add mapping from "filter.Name" to location of filter data
|
|
std::string key = "filter.";
|
|
key.append(r->options.filter_policy->Name());
|
|
std::string handle_encoding;
|
|
filter_block_handle.EncodeTo(&handle_encoding);
|
|
meta_index_block.Add(key, handle_encoding);
|
|
}
|
|
|
|
// TODO(postrelease): Add stats and other meta blocks
|
|
WriteBlock(&meta_index_block, &metaindex_block_handle);
|
|
}
|
|
|
|
// Write index block
|
|
if (ok()) {
|
|
if (r->pending_index_entry) {
|
|
r->options.comparator->FindShortSuccessor(&r->last_key);
|
|
std::string handle_encoding;
|
|
r->pending_handle.EncodeTo(&handle_encoding);
|
|
r->index_block.Add(r->last_key, Slice(handle_encoding));
|
|
r->pending_index_entry = false;
|
|
}
|
|
WriteBlock(&r->index_block, &index_block_handle);
|
|
}
|
|
|
|
// Write footer
|
|
if (ok()) {
|
|
Footer footer;
|
|
footer.set_metaindex_handle(metaindex_block_handle);
|
|
footer.set_index_handle(index_block_handle);
|
|
std::string footer_encoding;
|
|
footer.EncodeTo(&footer_encoding);
|
|
r->status = r->file->Append(footer_encoding);
|
|
if (r->status.ok()) {
|
|
r->offset += footer_encoding.size();
|
|
}
|
|
}
|
|
return r->status;
|
|
}
|
|
|
|
void TableBuilder::Abandon() {
|
|
Rep* r = rep_;
|
|
assert(!r->closed);
|
|
r->closed = true;
|
|
}
|
|
|
|
uint64_t TableBuilder::NumEntries() const {
|
|
return rep_->num_entries;
|
|
}
|
|
|
|
uint64_t TableBuilder::FileSize() const {
|
|
return rep_->offset;
|
|
}
|
|
|
|
} // namespace leveldb
|
|
|