ldb to support --column_family option

Summary:
Add an option --column_family option, so that users can query or update specific column family.
Also add an create column family parameter to make unit test easier.
Still need to add unit tests.

Test Plan: Will add a test case in ldb python test.

Reviewers: yhchiang, rven, andrewkr, IslamAbdelRahman, kradhakrishnan, anthony

Reviewed By: anthony

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D53265
main
sdong 9 years ago
parent da33dfe188
commit 38e1d7fea3
  1. 4
      HISTORY.md
  2. 8
      include/rocksdb/ldb_tool.h
  3. 123
      tools/ldb_cmd.cc
  4. 141
      tools/ldb_cmd.h
  5. 31
      tools/ldb_test.py
  6. 20
      tools/ldb_tool.cc
  7. 4
      tools/reduce_levels_test.cc

@ -1,9 +1,11 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased ## Unreleased
### Public API Changes ### Public API Changes
* Add a new perf context level between kEnableCount and kEnableTime. Level 2 now doesn't include timers for mutexes. * Add a new perf context level between kEnableCount and kEnableTime. Level 2 now doesn't include timers for mutexes.
### New Features
* ldb tool now supports operations to non-default column families.
## 4.4.0 (1/14/2016) ## 4.4.0 (1/14/2016)
### Public API Changes ### Public API Changes
* Change names in CompactionPri and add a new one. * Change names in CompactionPri and add a new one.

@ -5,6 +5,8 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#pragma once #pragma once
#include <string> #include <string>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
namespace rocksdb { namespace rocksdb {
@ -28,8 +30,10 @@ struct LDBOptions {
class LDBTool { class LDBTool {
public: public:
void Run(int argc, char** argv, Options db_options= Options(), void Run(
const LDBOptions& ldb_options = LDBOptions()); int argc, char** argv, Options db_options = Options(),
const LDBOptions& ldb_options = LDBOptions(),
const std::vector<ColumnFamilyDescriptor>* column_families = nullptr);
}; };
} // namespace rocksdb } // namespace rocksdb

@ -44,6 +44,7 @@ const string LDBCommand::ARG_PATH = "path";
const string LDBCommand::ARG_HEX = "hex"; const string LDBCommand::ARG_HEX = "hex";
const string LDBCommand::ARG_KEY_HEX = "key_hex"; const string LDBCommand::ARG_KEY_HEX = "key_hex";
const string LDBCommand::ARG_VALUE_HEX = "value_hex"; const string LDBCommand::ARG_VALUE_HEX = "value_hex";
const string LDBCommand::ARG_CF_NAME = "column_family";
const string LDBCommand::ARG_TTL = "ttl"; const string LDBCommand::ARG_TTL = "ttl";
const string LDBCommand::ARG_TTL_START = "start_time"; const string LDBCommand::ARG_TTL_START = "start_time";
const string LDBCommand::ARG_TTL_END = "end_time"; const string LDBCommand::ARG_TTL_END = "end_time";
@ -72,16 +73,14 @@ void DumpSstFile(std::string filename, bool output_hex, bool show_properties);
}; };
LDBCommand* LDBCommand::InitFromCmdLineArgs( LDBCommand* LDBCommand::InitFromCmdLineArgs(
int argc, int argc, char** argv, const Options& options,
char** argv, const LDBOptions& ldb_options,
const Options& options, const std::vector<ColumnFamilyDescriptor>* column_families) {
const LDBOptions& ldb_options
) {
vector<string> args; vector<string> args;
for (int i = 1; i < argc; i++) { for (int i = 1; i < argc; i++) {
args.push_back(argv[i]); args.push_back(argv[i]);
} }
return InitFromCmdLineArgs(args, options, ldb_options); return InitFromCmdLineArgs(args, options, ldb_options, column_families);
} }
/** /**
@ -95,10 +94,9 @@ LDBCommand* LDBCommand::InitFromCmdLineArgs(
* Returns nullptr if the command-line cannot be parsed. * Returns nullptr if the command-line cannot be parsed.
*/ */
LDBCommand* LDBCommand::InitFromCmdLineArgs( LDBCommand* LDBCommand::InitFromCmdLineArgs(
const vector<string>& args, const vector<string>& args, const Options& options,
const Options& options, const LDBOptions& ldb_options,
const LDBOptions& ldb_options const std::vector<ColumnFamilyDescriptor>* column_families) {
) {
// --x=y command line arguments are added as x->y map entries. // --x=y command line arguments are added as x->y map entries.
map<string, string> option_map; map<string, string> option_map;
@ -184,6 +182,8 @@ LDBCommand* LDBCommand::SelectCommand(
return new ManifestDumpCommand(cmdParams, option_map, flags); return new ManifestDumpCommand(cmdParams, option_map, flags);
} else if (cmd == ListColumnFamiliesCommand::Name()) { } else if (cmd == ListColumnFamiliesCommand::Name()) {
return new ListColumnFamiliesCommand(cmdParams, option_map, flags); return new ListColumnFamiliesCommand(cmdParams, option_map, flags);
} else if (cmd == CreateColumnFamilyCommand::Name()) {
return new CreateColumnFamilyCommand(cmdParams, option_map, flags);
} else if (cmd == DBFileDumperCommand::Name()) { } else if (cmd == DBFileDumperCommand::Name()) {
return new DBFileDumperCommand(cmdParams, option_map, flags); return new DBFileDumperCommand(cmdParams, option_map, flags);
} else if (cmd == InternalDumpCommand::Name()) { } else if (cmd == InternalDumpCommand::Name()) {
@ -450,6 +450,10 @@ void CompactorCommand::Help(string& ret) {
} }
void CompactorCommand::DoCommand() { void CompactorCommand::DoCommand() {
if (!db_) {
assert(GetExecuteState().IsFailed());
return;
}
Slice* begin = nullptr; Slice* begin = nullptr;
Slice* end = nullptr; Slice* end = nullptr;
@ -513,6 +517,7 @@ Options DBLoaderCommand::PrepareOptionsForOpenDB() {
void DBLoaderCommand::DoCommand() { void DBLoaderCommand::DoCommand() {
if (!db_) { if (!db_) {
assert(GetExecuteState().IsFailed());
return; return;
} }
@ -527,7 +532,7 @@ void DBLoaderCommand::DoCommand() {
string key; string key;
string value; string value;
if (ParseKeyValue(line, &key, &value, is_key_hex_, is_value_hex_)) { if (ParseKeyValue(line, &key, &value, is_key_hex_, is_value_hex_)) {
db_->Put(write_options, Slice(key), Slice(value)); db_->Put(write_options, GetCfHandle(), Slice(key), Slice(value));
} else if (0 == line.find("Keys in range:")) { } else if (0 == line.find("Keys in range:")) {
// ignore this line // ignore this line
} else if (0 == line.find("Created bg thread 0x")) { } else if (0 == line.find("Created bg thread 0x")) {
@ -541,7 +546,7 @@ void DBLoaderCommand::DoCommand() {
cout << "Warning: " << bad_lines << " bad lines ignored." << endl; cout << "Warning: " << bad_lines << " bad lines ignored." << endl;
} }
if (compact_) { if (compact_) {
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); db_->CompactRange(CompactRangeOptions(), GetCfHandle(), nullptr, nullptr);
} }
} }
@ -696,6 +701,38 @@ void ListColumnFamiliesCommand::DoCommand() {
} }
} }
void CreateColumnFamilyCommand::Help(string& ret) {
ret.append(" ");
ret.append(CreateColumnFamilyCommand::Name());
ret.append(" --db=<db_path> <new_column_family_name>");
ret.append("\n");
}
CreateColumnFamilyCommand::CreateColumnFamilyCommand(
const vector<string>& params, const map<string, string>& options,
const vector<string>& flags)
: LDBCommand(options, flags, true, {ARG_DB}) {
if (params.size() != 1) {
exec_state_ = LDBCommandExecuteResult::Failed(
"new column family name must be specified");
} else {
new_cf_name_ = params[0];
}
}
void CreateColumnFamilyCommand::DoCommand() {
ColumnFamilyHandle* new_cf_handle;
Status st = db_->CreateColumnFamily(options_, new_cf_name_, &new_cf_handle);
if (st.ok()) {
fprintf(stdout, "OK\n");
} else {
exec_state_ = LDBCommandExecuteResult::Failed(
"Fail to create new column family: " + st.ToString());
}
delete new_cf_handle;
CloseDB();
}
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
namespace { namespace {
@ -800,12 +837,13 @@ void InternalDumpCommand::Help(string& ret) {
void InternalDumpCommand::DoCommand() { void InternalDumpCommand::DoCommand() {
if (!db_) { if (!db_) {
assert(GetExecuteState().IsFailed());
return; return;
} }
if (print_stats_) { if (print_stats_) {
string stats; string stats;
if (db_->GetProperty("rocksdb.stats", &stats)) { if (db_->GetProperty(GetCfHandle(), "rocksdb.stats", &stats)) {
fprintf(stdout, "%s\n", stats.c_str()); fprintf(stdout, "%s\n", stats.c_str());
} }
} }
@ -1050,7 +1088,7 @@ void DBDumperCommand::DoDumpCommand() {
} }
// Setup key iterator // Setup key iterator
Iterator* iter = db_->NewIterator(ReadOptions()); Iterator* iter = db_->NewIterator(ReadOptions(), GetCfHandle());
Status st = iter->status(); Status st = iter->status();
if (!st.ok()) { if (!st.ok()) {
exec_state_ = exec_state_ =
@ -1285,7 +1323,7 @@ void ReduceDBLevelsCommand::DoCommand() {
} }
// Compact the whole DB to put all files to the highest level. // Compact the whole DB to put all files to the highest level.
fprintf(stdout, "Compacting the db...\n"); fprintf(stdout, "Compacting the db...\n");
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); db_->CompactRange(CompactRangeOptions(), GetCfHandle(), nullptr, nullptr);
CloseDB(); CloseDB();
EnvOptions soptions; EnvOptions soptions;
@ -1377,8 +1415,9 @@ void ChangeCompactionStyleCommand::DoCommand() {
// print db stats before we have made any change // print db stats before we have made any change
std::string property; std::string property;
std::string files_per_level; std::string files_per_level;
for (int i = 0; i < db_->NumberLevels(); i++) { for (int i = 0; i < db_->NumberLevels(GetCfHandle()); i++) {
db_->GetProperty("rocksdb.num-files-at-level" + NumberToString(i), db_->GetProperty(GetCfHandle(),
"rocksdb.num-files-at-level" + NumberToString(i),
&property); &property);
// format print string // format print string
@ -1393,13 +1432,14 @@ void ChangeCompactionStyleCommand::DoCommand() {
CompactRangeOptions compact_options; CompactRangeOptions compact_options;
compact_options.change_level = true; compact_options.change_level = true;
compact_options.target_level = 0; compact_options.target_level = 0;
db_->CompactRange(compact_options, nullptr, nullptr); db_->CompactRange(compact_options, GetCfHandle(), nullptr, nullptr);
// verify compaction result // verify compaction result
files_per_level = ""; files_per_level = "";
int num_files = 0; int num_files = 0;
for (int i = 0; i < db_->NumberLevels(); i++) { for (int i = 0; i < db_->NumberLevels(); i++) {
db_->GetProperty("rocksdb.num-files-at-level" + NumberToString(i), db_->GetProperty(GetCfHandle(),
"rocksdb.num-files-at-level" + NumberToString(i),
&property); &property);
// format print string // format print string
@ -1622,8 +1662,12 @@ void GetCommand::Help(string& ret) {
} }
void GetCommand::DoCommand() { void GetCommand::DoCommand() {
if (!db_) {
assert(GetExecuteState().IsFailed());
return;
}
string value; string value;
Status st = db_->Get(ReadOptions(), key_, &value); Status st = db_->Get(ReadOptions(), GetCfHandle(), key_, &value);
if (st.ok()) { if (st.ok()) {
fprintf(stdout, "%s\n", fprintf(stdout, "%s\n",
(is_value_hex_ ? StringToHex(value) : value).c_str()); (is_value_hex_ ? StringToHex(value) : value).c_str());
@ -1670,11 +1714,14 @@ void ApproxSizeCommand::Help(string& ret) {
} }
void ApproxSizeCommand::DoCommand() { void ApproxSizeCommand::DoCommand() {
if (!db_) {
assert(GetExecuteState().IsFailed());
return;
}
Range ranges[1]; Range ranges[1];
ranges[0] = Range(start_key_, end_key_); ranges[0] = Range(start_key_, end_key_);
uint64_t sizes[1]; uint64_t sizes[1];
db_->GetApproximateSizes(ranges, 1, sizes); db_->GetApproximateSizes(GetCfHandle(), ranges, 1, sizes);
fprintf(stdout, "%lu\n", (unsigned long)sizes[0]); fprintf(stdout, "%lu\n", (unsigned long)sizes[0]);
/* Weird that GetApproximateSizes() returns void, although documentation /* Weird that GetApproximateSizes() returns void, although documentation
* says that it returns a Status object. * says that it returns a Status object.
@ -1718,11 +1765,15 @@ void BatchPutCommand::Help(string& ret) {
} }
void BatchPutCommand::DoCommand() { void BatchPutCommand::DoCommand() {
if (!db_) {
assert(GetExecuteState().IsFailed());
return;
}
WriteBatch batch; WriteBatch batch;
for (vector<pair<string, string>>::const_iterator itr for (vector<pair<string, string>>::const_iterator itr
= key_values_.begin(); itr != key_values_.end(); ++itr) { = key_values_.begin(); itr != key_values_.end(); ++itr) {
batch.Put(itr->first, itr->second); batch.Put(GetCfHandle(), itr->first, itr->second);
} }
Status st = db_->Write(WriteOptions(), &batch); Status st = db_->Write(WriteOptions(), &batch);
if (st.ok()) { if (st.ok()) {
@ -1798,9 +1849,13 @@ void ScanCommand::Help(string& ret) {
} }
void ScanCommand::DoCommand() { void ScanCommand::DoCommand() {
if (!db_) {
assert(GetExecuteState().IsFailed());
return;
}
int num_keys_scanned = 0; int num_keys_scanned = 0;
Iterator* it = db_->NewIterator(ReadOptions()); Iterator* it = db_->NewIterator(ReadOptions(), GetCfHandle());
if (start_key_specified_) { if (start_key_specified_) {
it->Seek(start_key_); it->Seek(start_key_);
} else { } else {
@ -1896,7 +1951,11 @@ void DeleteCommand::Help(string& ret) {
} }
void DeleteCommand::DoCommand() { void DeleteCommand::DoCommand() {
Status st = db_->Delete(WriteOptions(), key_); if (!db_) {
assert(GetExecuteState().IsFailed());
return;
}
Status st = db_->Delete(WriteOptions(), GetCfHandle(), key_);
if (st.ok()) { if (st.ok()) {
fprintf(stdout, "OK\n"); fprintf(stdout, "OK\n");
} else { } else {
@ -1937,7 +1996,11 @@ void PutCommand::Help(string& ret) {
} }
void PutCommand::DoCommand() { void PutCommand::DoCommand() {
Status st = db_->Put(WriteOptions(), key_, value_); if (!db_) {
assert(GetExecuteState().IsFailed());
return;
}
Status st = db_->Put(WriteOptions(), GetCfHandle(), key_, value_);
if (st.ok()) { if (st.ok()) {
fprintf(stdout, "OK\n"); fprintf(stdout, "OK\n");
} else { } else {
@ -1978,6 +2041,7 @@ void DBQuerierCommand::Help(string& ret) {
void DBQuerierCommand::DoCommand() { void DBQuerierCommand::DoCommand() {
if (!db_) { if (!db_) {
assert(GetExecuteState().IsFailed());
return; return;
} }
@ -2011,17 +2075,17 @@ void DBQuerierCommand::DoCommand() {
"delete <key>\n"); "delete <key>\n");
} else if (cmd == DELETE_CMD && tokens.size() == 2) { } else if (cmd == DELETE_CMD && tokens.size() == 2) {
key = (is_key_hex_ ? HexToString(tokens[1]) : tokens[1]); key = (is_key_hex_ ? HexToString(tokens[1]) : tokens[1]);
db_->Delete(write_options, Slice(key)); db_->Delete(write_options, GetCfHandle(), Slice(key));
fprintf(stdout, "Successfully deleted %s\n", tokens[1].c_str()); fprintf(stdout, "Successfully deleted %s\n", tokens[1].c_str());
} else if (cmd == PUT_CMD && tokens.size() == 3) { } else if (cmd == PUT_CMD && tokens.size() == 3) {
key = (is_key_hex_ ? HexToString(tokens[1]) : tokens[1]); key = (is_key_hex_ ? HexToString(tokens[1]) : tokens[1]);
value = (is_value_hex_ ? HexToString(tokens[2]) : tokens[2]); value = (is_value_hex_ ? HexToString(tokens[2]) : tokens[2]);
db_->Put(write_options, Slice(key), Slice(value)); db_->Put(write_options, GetCfHandle(), Slice(key), Slice(value));
fprintf(stdout, "Successfully put %s %s\n", fprintf(stdout, "Successfully put %s %s\n",
tokens[1].c_str(), tokens[2].c_str()); tokens[1].c_str(), tokens[2].c_str());
} else if (cmd == GET_CMD && tokens.size() == 2) { } else if (cmd == GET_CMD && tokens.size() == 2) {
key = (is_key_hex_ ? HexToString(tokens[1]) : tokens[1]); key = (is_key_hex_ ? HexToString(tokens[1]) : tokens[1]);
if (db_->Get(read_options, Slice(key), &value).ok()) { if (db_->Get(read_options, GetCfHandle(), Slice(key), &value).ok()) {
fprintf(stdout, "%s\n", PrintKeyValue(key, value, fprintf(stdout, "%s\n", PrintKeyValue(key, value,
is_key_hex_, is_value_hex_).c_str()); is_key_hex_, is_value_hex_).c_str());
} else { } else {
@ -2125,6 +2189,7 @@ void DBFileDumperCommand::Help(string& ret) {
void DBFileDumperCommand::DoCommand() { void DBFileDumperCommand::DoCommand() {
if (!db_) { if (!db_) {
assert(GetExecuteState().IsFailed());
return; return;
} }
Status s; Status s;

@ -44,6 +44,7 @@ public:
static const string ARG_HEX; static const string ARG_HEX;
static const string ARG_KEY_HEX; static const string ARG_KEY_HEX;
static const string ARG_VALUE_HEX; static const string ARG_VALUE_HEX;
static const string ARG_CF_NAME;
static const string ARG_TTL; static const string ARG_TTL;
static const string ARG_TTL_START; static const string ARG_TTL_START;
static const string ARG_TTL_END; static const string ARG_TTL_END;
@ -62,17 +63,14 @@ public:
static const string ARG_CREATE_IF_MISSING; static const string ARG_CREATE_IF_MISSING;
static LDBCommand* InitFromCmdLineArgs( static LDBCommand* InitFromCmdLineArgs(
const vector<string>& args, const vector<string>& args, const Options& options,
const Options& options, const LDBOptions& ldb_options,
const LDBOptions& ldb_options const std::vector<ColumnFamilyDescriptor>* column_families);
);
static LDBCommand* InitFromCmdLineArgs( static LDBCommand* InitFromCmdLineArgs(
int argc, int argc, char** argv, const Options& options,
char** argv, const LDBOptions& ldb_options,
const Options& options, const std::vector<ColumnFamilyDescriptor>* column_families);
const LDBOptions& ldb_options
);
bool ValidateCmdLineOptions(); bool ValidateCmdLineOptions();
@ -82,6 +80,15 @@ public:
options_ = options; options_ = options;
} }
virtual void SetColumnFamilies(
const std::vector<ColumnFamilyDescriptor>* column_families) {
if (column_families != nullptr) {
column_families_ = *column_families;
} else {
column_families_.clear();
}
}
void SetLDBOptions(const LDBOptions& ldb_options) { void SetLDBOptions(const LDBOptions& ldb_options) {
ldb_options_ = ldb_options; ldb_options_ = ldb_options;
} }
@ -90,10 +97,7 @@ public:
return false; return false;
} }
virtual ~LDBCommand() { virtual ~LDBCommand() { CloseDB(); }
delete db_;
db_ = nullptr;
}
/* Run the command, and return the execute result. */ /* Run the command, and return the execute result. */
void Run() { void Run() {
@ -181,8 +185,10 @@ protected:
LDBCommandExecuteResult exec_state_; LDBCommandExecuteResult exec_state_;
string db_path_; string db_path_;
string column_family_name_;
DB* db_; DB* db_;
DBWithTTL* db_ttl_; DBWithTTL* db_ttl_;
std::map<std::string, ColumnFamilyHandle*> cf_handles_;
/** /**
* true implies that this command can work if the db is opened in read-only * true implies that this command can work if the db is opened in read-only
@ -235,6 +241,13 @@ protected:
db_path_ = itr->second; db_path_ = itr->second;
} }
itr = options.find(ARG_CF_NAME);
if (itr != options.end()) {
column_family_name_ = itr->second;
} else {
column_family_name_ = kDefaultColumnFamilyName;
}
is_key_hex_ = IsKeyHex(options, flags); is_key_hex_ = IsKeyHex(options, flags);
is_value_hex_ = IsValueHex(options, flags); is_value_hex_ = IsValueHex(options, flags);
is_db_ttl_ = IsFlagPresent(flags, ARG_TTL); is_db_ttl_ = IsFlagPresent(flags, ARG_TTL);
@ -248,21 +261,75 @@ protected:
} }
// Open the DB. // Open the DB.
Status st; Status st;
std::vector<ColumnFamilyHandle*> handles_opened;
if (is_db_ttl_) { if (is_db_ttl_) {
// ldb doesn't yet support TTL DB with multiple column families
if (!column_family_name_.empty() || !column_families_.empty()) {
exec_state_ = LDBCommandExecuteResult::Failed(
"ldb doesn't support TTL DB with multiple column families");
}
if (is_read_only_) { if (is_read_only_) {
st = DBWithTTL::Open(opt, db_path_, &db_ttl_, 0, true); st = DBWithTTL::Open(opt, db_path_, &db_ttl_, 0, true);
} else { } else {
st = DBWithTTL::Open(opt, db_path_, &db_ttl_); st = DBWithTTL::Open(opt, db_path_, &db_ttl_);
} }
db_ = db_ttl_; db_ = db_ttl_;
} else if (is_read_only_) {
st = DB::OpenForReadOnly(opt, db_path_, &db_);
} else { } else {
st = DB::Open(opt, db_path_, &db_); if (column_families_.empty()) {
// Try to figure out column family lists
std::vector<std::string> cf_list;
st = DB::ListColumnFamilies(DBOptions(), db_path_, &cf_list);
// There is possible the DB doesn't exist yet, for "create if not
// "existing case". The failure is ignored here. We rely on DB::Open()
// to give us the correct error message for problem with opening
// existing DB.
if (st.ok() && cf_list.size() > 1) {
// Ignore single column family DB.
for (auto cf_name : cf_list) {
column_families_.emplace_back(cf_name, opt);
}
}
}
if (is_read_only_) {
if (column_families_.empty()) {
st = DB::OpenForReadOnly(opt, db_path_, &db_);
} else {
st = DB::OpenForReadOnly(opt, db_path_, column_families_,
&handles_opened, &db_);
}
} else {
if (column_families_.empty()) {
st = DB::Open(opt, db_path_, &db_);
} else {
st = DB::Open(opt, db_path_, column_families_, &handles_opened, &db_);
}
}
} }
if (!st.ok()) { if (!st.ok()) {
string msg = st.ToString(); string msg = st.ToString();
exec_state_ = LDBCommandExecuteResult::Failed(msg); exec_state_ = LDBCommandExecuteResult::Failed(msg);
} else if (!handles_opened.empty()) {
assert(handles_opened.size() == column_families_.size());
bool found_cf_name = false;
for (size_t i = 0; i < handles_opened.size(); i++) {
cf_handles_[column_families_[i].name] = handles_opened[i];
if (column_family_name_ == column_families_[i].name) {
found_cf_name = true;
}
}
if (!found_cf_name) {
exec_state_ = LDBCommandExecuteResult::Failed(
"Non-existing column family " + column_family_name_);
CloseDB();
}
} else {
// We successfully opened DB in single column family mode.
assert(column_families_.empty());
if (column_family_name_ != kDefaultColumnFamilyName) {
exec_state_ = LDBCommandExecuteResult::Failed(
"Non-existing column family " + column_family_name_);
CloseDB();
}
} }
options_ = opt; options_ = opt;
@ -270,11 +337,27 @@ protected:
void CloseDB () { void CloseDB () {
if (db_ != nullptr) { if (db_ != nullptr) {
for (auto& pair : cf_handles_) {
delete pair.second;
}
delete db_; delete db_;
db_ = nullptr; db_ = nullptr;
} }
} }
ColumnFamilyHandle* GetCfHandle() {
if (!cf_handles_.empty()) {
auto it = cf_handles_.find(column_family_name_);
if (it == cf_handles_.end()) {
exec_state_ = LDBCommandExecuteResult::Failed(
"Cannot find column family " + column_family_name_);
} else {
return it->second;
}
}
return db_->DefaultColumnFamily();
}
static string PrintKeyValue(const string& key, const string& value, static string PrintKeyValue(const string& key, const string& value,
bool is_key_hex, bool is_value_hex) { bool is_key_hex, bool is_value_hex) {
string result; string result;
@ -310,10 +393,10 @@ protected:
* passed in. * passed in.
*/ */
static vector<string> BuildCmdLineOptions(vector<string> options) { static vector<string> BuildCmdLineOptions(vector<string> options) {
vector<string> ret = {ARG_DB, ARG_BLOOM_BITS, vector<string> ret = {ARG_DB, ARG_BLOOM_BITS, ARG_BLOCK_SIZE,
ARG_BLOCK_SIZE, ARG_AUTO_COMPACTION, ARG_AUTO_COMPACTION, ARG_COMPRESSION_TYPE,
ARG_COMPRESSION_TYPE, ARG_WRITE_BUFFER_SIZE, ARG_WRITE_BUFFER_SIZE, ARG_FILE_SIZE,
ARG_FILE_SIZE, ARG_FIX_PREFIX_LEN}; ARG_FIX_PREFIX_LEN, ARG_CF_NAME};
ret.insert(ret.end(), options.begin(), options.end()); ret.insert(ret.end(), options.begin(), options.end());
return ret; return ret;
} }
@ -325,6 +408,7 @@ protected:
const string& option, string* value); const string& option, string* value);
Options options_; Options options_;
std::vector<ColumnFamilyDescriptor> column_families_;
LDBOptions ldb_options_; LDBOptions ldb_options_;
private: private:
@ -568,6 +652,23 @@ class ListColumnFamiliesCommand : public LDBCommand {
string dbname_; string dbname_;
}; };
class CreateColumnFamilyCommand : public LDBCommand {
public:
static string Name() { return "create_column_family"; }
CreateColumnFamilyCommand(const vector<string>& params,
const map<string, string>& options,
const vector<string>& flags);
static void Help(string& ret);
virtual void DoCommand() override;
virtual bool NoDBOpen() override { return false; }
private:
string new_cf_name_;
};
class ReduceDBLevelsCommand : public LDBCommand { class ReduceDBLevelsCommand : public LDBCommand {
public: public:
static string Name() { return "reduce_levels"; } static string Name() { return "reduce_levels"; }

@ -503,7 +503,36 @@ class LDBTestCase(unittest.TestCase):
# Test on empty path. # Test on empty path.
self.assertRunFAILFull(cmd % "") self.assertRunFAILFull(cmd % "")
def testColumnFamilies(self):
print "Running testColumnFamilies..."
dbPath = os.path.join(self.TMP_DIR, self.DB_NAME)
self.assertRunOK("put cf1_1 1 --create_if_missing", "OK")
self.assertRunOK("put cf1_2 2 --create_if_missing", "OK")
self.assertRunOK("put cf1_3 3", "OK")
# Given non-default column family to single CF DB.
self.assertRunFAIL("get cf1_1 --column_family=two")
self.assertRunOK("create_column_family two", "OK")
self.assertRunOK("put cf2_1 1 --create_if_missing --column_family=two",
"OK")
self.assertRunOK("put cf2_2 2 --create_if_missing --column_family=two",
"OK")
self.assertRunOK("delete cf1_2", "OK")
self.assertRunOK("create_column_family three", "OK")
self.assertRunOK("delete cf2_2 --column_family=two", "OK")
self.assertRunOK(
"put cf3_1 3 --create_if_missing --column_family=three",
"OK")
self.assertRunOK("get cf1_1 --column_family=default", "1")
self.assertRunOK("dump --column_family=two",
"cf2_1 ==> 1\nKeys in range: 1")
self.assertRunOK("dump",
"cf1_1 ==> 1\ncf1_3 ==> 3\nKeys in range: 2")
self.assertRunOK("get cf2_1 --column_family=two",
"1")
self.assertRunOK("get cf3_1 --column_family=three",
"3")
# non-existing column family.
self.assertRunFAIL("get cf3_1 --column_family=four")
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

@ -30,6 +30,10 @@ public:
" : Values are input/output as hex\n"); " : Values are input/output as hex\n");
ret.append(" --" + LDBCommand::ARG_HEX + ret.append(" --" + LDBCommand::ARG_HEX +
" : Both keys and values are input/output as hex\n"); " : Both keys and values are input/output as hex\n");
ret.append(
" --" + LDBCommand::ARG_CF_NAME +
" : name of the column family to operate on. default: default column "
"family\n");
ret.append("\n"); ret.append("\n");
ret.append("The following optional parameters control the database " ret.append("The following optional parameters control the database "
@ -77,15 +81,16 @@ public:
fprintf(stderr, "%s\n", ret.c_str()); fprintf(stderr, "%s\n", ret.c_str());
} }
static void RunCommand(int argc, char** argv, Options options, static void RunCommand(
const LDBOptions& ldb_options) { int argc, char** argv, Options options, const LDBOptions& ldb_options,
const std::vector<ColumnFamilyDescriptor>* column_families) {
if (argc <= 2) { if (argc <= 2) {
PrintHelp(argv[0]); PrintHelp(argv[0]);
exit(1); exit(1);
} }
LDBCommand* cmdObj = LDBCommand::InitFromCmdLineArgs(argc, argv, options, LDBCommand* cmdObj = LDBCommand::InitFromCmdLineArgs(
ldb_options); argc, argv, options, ldb_options, column_families);
if (cmdObj == nullptr) { if (cmdObj == nullptr) {
fprintf(stderr, "Unknown command\n"); fprintf(stderr, "Unknown command\n");
PrintHelp(argv[0]); PrintHelp(argv[0]);
@ -106,10 +111,11 @@ public:
}; };
void LDBTool::Run(int argc, char** argv, Options options, void LDBTool::Run(int argc, char** argv, Options options,
const LDBOptions& ldb_options) { const LDBOptions& ldb_options,
LDBCommandRunner::RunCommand(argc, argv, options, ldb_options); const std::vector<ColumnFamilyDescriptor>* column_families) {
LDBCommandRunner::RunCommand(argc, argv, options, ldb_options,
column_families);
} }
} // namespace rocksdb } // namespace rocksdb

@ -92,8 +92,8 @@ Status ReduceLevelTest::OpenDB(bool create_if_missing, int num_levels) {
bool ReduceLevelTest::ReduceLevels(int target_level) { bool ReduceLevelTest::ReduceLevels(int target_level) {
std::vector<std::string> args = rocksdb::ReduceDBLevelsCommand::PrepareArgs( std::vector<std::string> args = rocksdb::ReduceDBLevelsCommand::PrepareArgs(
dbname_, target_level, false); dbname_, target_level, false);
LDBCommand* level_reducer = LDBCommand::InitFromCmdLineArgs( LDBCommand* level_reducer =
args, Options(), LDBOptions()); LDBCommand::InitFromCmdLineArgs(args, Options(), LDBOptions(), nullptr);
level_reducer->Run(); level_reducer->Run();
bool is_succeed = level_reducer->GetExecuteState().IsSucceed(); bool is_succeed = level_reducer->GetExecuteState().IsSucceed();
delete level_reducer; delete level_reducer;

Loading…
Cancel
Save