Introducing timeranged scan, timeranged dump in ldb. Also the ability to count in time-batches during Dump

Summary:
Scan and Dump commands in ldb use iterator. We need to also print timestamp for ttl databases for debugging. For this I create a TtlIterator class pointer in these functions and assign it the value of Iterator pointer which actually points to t TtlIterator object, and access the new function ValueWithTS which can return TS also. Buckets feature for dump command: gives a count of different key-values in the specified time-range distributed across the time-range partitioned according to bucket-size. start_time and end_time are specified in unixtimestamp and bucket in seconds on the user-commandline
Have commented out 3 ines from ldb_test.py so that the test does not break right now. It breaks because timestamp is also printed now and I have to look at wildcards in python to compare properly.

Test Plan: python tools/ldb_test.py

Reviewers: vamsi, dhruba, haobo, sheki

Reviewed By: vamsi

CC: leveldb

Differential Revision: https://reviews.facebook.net/D11403
main
Mayank Agarwal 12 years ago
parent 0f78fad9f5
commit 61f1baaedf
  1. 170
      util/ldb_cmd.cc
  2. 10
      util/ldb_cmd.h
  3. 6
      util/ldb_tool.cc
  4. 57
      utilities/ttl/db_ttl.cc
  5. 76
      utilities/ttl/db_ttl.h

@ -3,17 +3,19 @@
// found in the LICENSE file. // found in the LICENSE file.
#include "util/ldb_cmd.h" #include "util/ldb_cmd.h"
#include <dirent.h>
#include <sstream>
#include <string>
#include <stdexcept>
#include "leveldb/write_batch.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/log_reader.h" #include "db/log_reader.h"
#include "db/filename.h" #include "db/filename.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "leveldb/write_batch.h"
#include "util/coding.h"
#include <ctime>
#include <dirent.h>
#include <sstream>
#include <string>
#include <stdexcept>
namespace leveldb { namespace leveldb {
@ -24,6 +26,9 @@ const string LDBCommand::ARG_HEX = "hex";
const string LDBCommand::ARG_KEY_HEX = "key_hex"; const string LDBCommand::ARG_KEY_HEX = "key_hex";
const string LDBCommand::ARG_VALUE_HEX = "value_hex"; const string LDBCommand::ARG_VALUE_HEX = "value_hex";
const string LDBCommand::ARG_TTL = "ttl"; const string LDBCommand::ARG_TTL = "ttl";
const string LDBCommand::ARG_TTL_START = "start_time";
const string LDBCommand::ARG_TTL_END = "end_time";
const string LDBCommand::ARG_TIMESTAMP = "timestamp";
const string LDBCommand::ARG_FROM = "from"; const string LDBCommand::ARG_FROM = "from";
const string LDBCommand::ARG_TO = "to"; const string LDBCommand::ARG_TO = "to";
const string LDBCommand::ARG_MAX_KEYS = "max_keys"; const string LDBCommand::ARG_MAX_KEYS = "max_keys";
@ -523,15 +528,53 @@ void ManifestDumpCommand::DoCommand() {
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
string ReadableTime(int unixtime) {
char time_buffer [80];
time_t rawtime = unixtime;
struct tm * timeinfo = localtime(&rawtime);
strftime(time_buffer, 80, "%c", timeinfo);
return string(time_buffer);
}
// This function only called when it's the sane case of >1 buckets in time-range
// Also called only when timekv falls between ttl_start and ttl_end provided
void IncBucketCounts(uint64_t* bucket_counts, int ttl_start, int time_range,
int bucket_size, int timekv, int num_buckets) {
if (time_range <= 0 || timekv < ttl_start || timekv > (ttl_start + time_range)
|| bucket_size <= 0 || num_buckets < 2) {
fprintf(stderr, "Error: bucketizing\n");
return;
}
int bucket = (timekv - ttl_start);
bucket = (bucket == 0) ? 1 : ceil(bucket / (double)bucket_size);
bucket_counts[bucket - 1]++;
}
void PrintBucketCounts(uint64_t* bucket_counts, int ttl_start, int ttl_end,
int bucket_size, int num_buckets) {
int time_point = ttl_start;
for(int i = 0; i < num_buckets - 1; i++, time_point += bucket_size) {
fprintf(stdout, "Keys in range %s to %s : %lu\n",
ReadableTime(time_point).c_str(),
ReadableTime(time_point + bucket_size).c_str(), bucket_counts[i]);
}
fprintf(stdout, "Keys in range %s to %s : %lu\n",
ReadableTime(time_point).c_str(),
ReadableTime(ttl_end).c_str(), bucket_counts[num_buckets - 1]);
}
const string DBDumperCommand::ARG_COUNT_ONLY = "count_only"; const string DBDumperCommand::ARG_COUNT_ONLY = "count_only";
const string DBDumperCommand::ARG_STATS = "stats"; const string DBDumperCommand::ARG_STATS = "stats";
const string DBDumperCommand::ARG_TTL_BUCKET = "bucket";
DBDumperCommand::DBDumperCommand(const vector<string>& params, DBDumperCommand::DBDumperCommand(const vector<string>& params,
const map<string, string>& options, const vector<string>& flags) : const map<string, string>& options, const vector<string>& flags) :
LDBCommand(options, flags, true, LDBCommand(options, flags, true,
BuildCmdLineOptions({ARG_TTL, ARG_HEX, ARG_KEY_HEX, BuildCmdLineOptions({ARG_TTL, ARG_HEX, ARG_KEY_HEX,
ARG_VALUE_HEX, ARG_FROM, ARG_TO, ARG_VALUE_HEX, ARG_FROM, ARG_TO,
ARG_MAX_KEYS, ARG_COUNT_ONLY, ARG_STATS})), ARG_MAX_KEYS, ARG_COUNT_ONLY, ARG_STATS,
ARG_TTL_START, ARG_TTL_END,
ARG_TTL_BUCKET, ARG_TIMESTAMP})),
null_from_(true), null_from_(true),
null_to_(true), null_to_(true),
max_keys_(-1), max_keys_(-1),
@ -580,9 +623,14 @@ void DBDumperCommand::Help(string& ret) {
ret.append(" "); ret.append(" ");
ret.append(DBDumperCommand::Name()); ret.append(DBDumperCommand::Name());
ret.append(HelpRangeCmdArgs()); ret.append(HelpRangeCmdArgs());
ret.append(" [--" + ARG_TTL + "]");
ret.append(" [--" + ARG_MAX_KEYS + "=<N>]"); ret.append(" [--" + ARG_MAX_KEYS + "=<N>]");
ret.append(" [--" + ARG_TIMESTAMP + "]");
ret.append(" [--" + ARG_COUNT_ONLY + "]"); ret.append(" [--" + ARG_COUNT_ONLY + "]");
ret.append(" [--" + ARG_STATS + "]"); ret.append(" [--" + ARG_STATS + "]");
ret.append(" [--" + ARG_TTL_BUCKET + "=<N>]");
ret.append(" [--" + ARG_TTL_START + "=<N>]");
ret.append(" [--" + ARG_TTL_END + "=<N>]");
ret.append("\n"); ret.append("\n");
} }
@ -614,25 +662,78 @@ void DBDumperCommand::DoCommand() {
} }
int max_keys = max_keys_; int max_keys = max_keys_;
int ttl_start;
if (!ParseIntOption(option_map_, ARG_TTL_START, ttl_start, exec_state_)) {
ttl_start = DBWithTTL::kMinTimestamp; // TTL introduction time
}
int ttl_end;
if (!ParseIntOption(option_map_, ARG_TTL_END, ttl_end, exec_state_)) {
ttl_end = DBWithTTL::kMaxTimestamp; // Max time allowed by TTL feature
}
if (ttl_end < ttl_start) {
fprintf(stderr, "Error: End time can't be less than start time\n");
delete iter;
return;
}
int time_range = ttl_end - ttl_start;
int bucket_size;
if (!ParseIntOption(option_map_, ARG_TTL_BUCKET, bucket_size, exec_state_) ||
bucket_size <= 0) {
bucket_size = time_range; // Will have just 1 bucket by default
}
// At this point, bucket_size=0 => time_range=0
uint64_t num_buckets =
bucket_size >= time_range ? 1 : ceil((double)time_range/bucket_size);
unique_ptr<uint64_t[]> bucket_counts(new uint64_t[num_buckets]);
fill(bucket_counts.get(), bucket_counts.get() + num_buckets, 0);
if (is_db_ttl_ && !count_only_ && timestamp_) {
fprintf(stdout, "Dumping key-values from %s to %s\n",
ReadableTime(ttl_start).c_str(), ReadableTime(ttl_end).c_str());
}
for (; iter->Valid(); iter->Next()) { for (; iter->Valid(); iter->Next()) {
int rawtime = 0;
string value;
// If end marker was specified, we stop before it // If end marker was specified, we stop before it
if (!null_to_ && (iter->key().ToString() >= to_)) if (!null_to_ && (iter->key().ToString() >= to_))
break; break;
// Terminate if maximum number of keys have been dumped // Terminate if maximum number of keys have been dumped
if (max_keys == 0) if (max_keys == 0)
break; break;
if (is_db_ttl_) {
TtlIterator* it_ttl = (TtlIterator*)iter;
struct ValueAndTimestamp val_ts = it_ttl->ValueWithTimestamp();
value = val_ts.value.ToString();
rawtime = val_ts.timestamp;
if (rawtime < ttl_start || rawtime > ttl_end) {
continue;
}
} else {
value = iter->value().ToString();
}
if (max_keys > 0) { if (max_keys > 0) {
--max_keys; --max_keys;
} }
if (is_db_ttl_ && num_buckets > 1) {
IncBucketCounts(bucket_counts.get(), ttl_start, time_range, bucket_size,
rawtime, num_buckets);
}
++count; ++count;
if (!count_only_) { if (!count_only_) {
if (is_db_ttl_ && timestamp_) {
fprintf(stdout, "%s ", ReadableTime(rawtime).c_str());
}
string str = PrintKeyValue(iter->key().ToString(), string str = PrintKeyValue(iter->key().ToString(),
iter->value().ToString(), value, is_key_hex_, is_value_hex_);
is_key_hex_, is_value_hex_);
fprintf(stdout, "%s\n", str.c_str()); fprintf(stdout, "%s\n", str.c_str());
} }
} }
fprintf(stdout, "Keys in range: %lld\n", (long long) count); if (num_buckets > 1 && is_db_ttl_) {
PrintBucketCounts(bucket_counts.get(), ttl_start, ttl_end, bucket_size,
num_buckets);
} else {
fprintf(stdout, "Keys in range: %lld\n", (long long) count);
}
// Clean up // Clean up
delete iter; delete iter;
} }
@ -920,6 +1021,7 @@ void GetCommand::Help(string& ret) {
ret.append(" "); ret.append(" ");
ret.append(GetCommand::Name()); ret.append(GetCommand::Name());
ret.append(" <key>"); ret.append(" <key>");
ret.append(" [--" + ARG_TTL + "]");
ret.append("\n"); ret.append("\n");
} }
@ -1013,6 +1115,7 @@ void BatchPutCommand::Help(string& ret) {
ret.append(" "); ret.append(" ");
ret.append(BatchPutCommand::Name()); ret.append(BatchPutCommand::Name());
ret.append(" <key> <value> [<key> <value>] [..]"); ret.append(" <key> <value> [<key> <value>] [..]");
ret.append(" [--" + ARG_TTL + "]");
ret.append("\n"); ret.append("\n");
} }
@ -1041,9 +1144,9 @@ Options BatchPutCommand::PrepareOptionsForOpenDB() {
ScanCommand::ScanCommand(const vector<string>& params, ScanCommand::ScanCommand(const vector<string>& params,
const map<string, string>& options, const vector<string>& flags) : const map<string, string>& options, const vector<string>& flags) :
LDBCommand(options, flags, true, LDBCommand(options, flags, true,
BuildCmdLineOptions({ARG_TTL, ARG_HEX, ARG_KEY_HEX, BuildCmdLineOptions({ARG_TTL, ARG_HEX, ARG_KEY_HEX, ARG_TO,
ARG_VALUE_HEX, ARG_FROM, ARG_TO, ARG_VALUE_HEX, ARG_FROM, ARG_TIMESTAMP,
ARG_MAX_KEYS})), ARG_MAX_KEYS, ARG_TTL_START, ARG_TTL_END})),
start_key_specified_(false), start_key_specified_(false),
end_key_specified_(false), end_key_specified_(false),
max_keys_scanned_(-1) { max_keys_scanned_(-1) {
@ -1083,7 +1186,11 @@ void ScanCommand::Help(string& ret) {
ret.append(" "); ret.append(" ");
ret.append(ScanCommand::Name()); ret.append(ScanCommand::Name());
ret.append(HelpRangeCmdArgs()); ret.append(HelpRangeCmdArgs());
ret.append("--" + ARG_MAX_KEYS + "=N] "); ret.append(" [--" + ARG_TTL + "]");
ret.append(" [--" + ARG_TIMESTAMP + "]");
ret.append(" [--" + ARG_MAX_KEYS + "=<N>q] ");
ret.append(" [--" + ARG_TTL_START + "=<N>]");
ret.append(" [--" + ARG_TTL_END + "=<N>]");
ret.append("\n"); ret.append("\n");
} }
@ -1096,11 +1203,42 @@ void ScanCommand::DoCommand() {
} else { } else {
it->SeekToFirst(); it->SeekToFirst();
} }
int ttl_start;
if (!ParseIntOption(option_map_, ARG_TTL_START, ttl_start, exec_state_)) {
ttl_start = DBWithTTL::kMinTimestamp; // TTL introduction time
}
int ttl_end;
if (!ParseIntOption(option_map_, ARG_TTL_END, ttl_end, exec_state_)) {
ttl_end = DBWithTTL::kMaxTimestamp; // Max time allowed by TTL feature
}
if (ttl_end < ttl_start) {
fprintf(stderr, "Error: End time can't be less than start time\n");
delete it;
return;
}
if (is_db_ttl_ && timestamp_) {
fprintf(stdout, "Scanning key-values from %s to %s\n",
ReadableTime(ttl_start).c_str(), ReadableTime(ttl_end).c_str());
}
for ( ; for ( ;
it->Valid() && (!end_key_specified_ || it->key().ToString() < end_key_); it->Valid() && (!end_key_specified_ || it->key().ToString() < end_key_);
it->Next()) { it->Next()) {
string key = it->key().ToString(); string key = it->key().ToString();
string value = it->value().ToString(); string value;
if (is_db_ttl_) {
TtlIterator* it_ttl = (TtlIterator*)it;
struct ValueAndTimestamp val_ts = it_ttl->ValueWithTimestamp();
int rawtime = val_ts.timestamp;
value = val_ts.value.ToString();
if (rawtime < ttl_start || rawtime > ttl_end) {
continue;
}
if (timestamp_) {
fprintf(stdout, "%s ", ReadableTime(rawtime).c_str());
}
} else {
value = it->value().ToString();
}
fprintf(stdout, "%s : %s\n", fprintf(stdout, "%s : %s\n",
(is_key_hex_ ? StringToHex(key) : key).c_str(), (is_key_hex_ ? StringToHex(key) : key).c_str(),
(is_value_hex_ ? StringToHex(value) : value).c_str() (is_value_hex_ ? StringToHex(value) : value).c_str()
@ -1176,6 +1314,7 @@ void PutCommand::Help(string& ret) {
ret.append(" "); ret.append(" ");
ret.append(PutCommand::Name()); ret.append(PutCommand::Name());
ret.append(" <key> <value> "); ret.append(" <key> <value> ");
ret.append(" [--" + ARG_TTL + "]");
ret.append("\n"); ret.append("\n");
} }
@ -1211,6 +1350,7 @@ DBQuerierCommand::DBQuerierCommand(const vector<string>& params,
void DBQuerierCommand::Help(string& ret) { void DBQuerierCommand::Help(string& ret) {
ret.append(" "); ret.append(" ");
ret.append(DBQuerierCommand::Name()); ret.append(DBQuerierCommand::Name());
ret.append(" [--" + ARG_TTL + "]");
ret.append("\n"); ret.append("\n");
ret.append(" Starts a REPL shell. Type help for list of available " ret.append(" Starts a REPL shell. Type help for list of available "
"commands."); "commands.");

@ -21,6 +21,7 @@
#include "util/ldb_cmd_execute_result.h" #include "util/ldb_cmd_execute_result.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "utilities/utility_db.h" #include "utilities/utility_db.h"
#include "utilities/ttl/db_ttl.h"
using std::string; using std::string;
using std::map; using std::map;
@ -38,6 +39,9 @@ public:
static const string ARG_KEY_HEX; static const string ARG_KEY_HEX;
static const string ARG_VALUE_HEX; static const string ARG_VALUE_HEX;
static const string ARG_TTL; static const string ARG_TTL;
static const string ARG_TTL_START;
static const string ARG_TTL_END;
static const string ARG_TIMESTAMP;
static const string ARG_FROM; static const string ARG_FROM;
static const string ARG_TO; static const string ARG_TO;
static const string ARG_MAX_KEYS; static const string ARG_MAX_KEYS;
@ -162,6 +166,9 @@ protected:
/** If true, the value is treated as timestamp suffixed */ /** If true, the value is treated as timestamp suffixed */
bool is_db_ttl_; bool is_db_ttl_;
// If true, the kvs are output with their insert/modify timestamp in a ttl db
bool timestamp_;
/** /**
* Map of options passed on the command-line. * Map of options passed on the command-line.
*/ */
@ -185,6 +192,7 @@ protected:
is_key_hex_(false), is_key_hex_(false),
is_value_hex_(false), is_value_hex_(false),
is_db_ttl_(false), is_db_ttl_(false),
timestamp_(false),
option_map_(options), option_map_(options),
flags_(flags), flags_(flags),
valid_cmd_line_options_(valid_cmd_line_options) { valid_cmd_line_options_(valid_cmd_line_options) {
@ -197,6 +205,7 @@ protected:
is_key_hex_ = IsKeyHex(options, flags); is_key_hex_ = IsKeyHex(options, flags);
is_value_hex_ = IsValueHex(options, flags); is_value_hex_ = IsValueHex(options, flags);
is_db_ttl_ = IsFlagPresent(flags, ARG_TTL); is_db_ttl_ = IsFlagPresent(flags, ARG_TTL);
timestamp_ = IsFlagPresent(flags, ARG_TIMESTAMP);
} }
void OpenDB() { void OpenDB() {
@ -385,6 +394,7 @@ private:
static const string ARG_COUNT_ONLY; static const string ARG_COUNT_ONLY;
static const string ARG_STATS; static const string ARG_STATS;
static const string ARG_TTL_BUCKET;
}; };
class DBLoaderCommand: public LDBCommand { class DBLoaderCommand: public LDBCommand {

@ -18,9 +18,6 @@ public:
ret.append("\n"); ret.append("\n");
ret.append("The following optional parameters control if keys/values are " ret.append("The following optional parameters control if keys/values are "
"input/output as hex or as plain strings:\n"); "input/output as hex or as plain strings:\n");
ret.append(" --" + LDBCommand::ARG_TTL +
" with 'put','get','scan','dump','query','batchput'"
" : DB supports ttl and value is internally timestamp-suffixed\n");
ret.append(" --" + LDBCommand::ARG_KEY_HEX + ret.append(" --" + LDBCommand::ARG_KEY_HEX +
" : Keys are input/output as hex\n"); " : Keys are input/output as hex\n");
ret.append(" --" + LDBCommand::ARG_VALUE_HEX + ret.append(" --" + LDBCommand::ARG_VALUE_HEX +
@ -31,6 +28,9 @@ public:
ret.append("The following optional parameters control the database " ret.append("The following optional parameters control the database "
"internals:\n"); "internals:\n");
ret.append(" --" + LDBCommand::ARG_TTL +
" with 'put','get','scan','dump','query','batchput'"
" : DB supports ttl and value is internally timestamp-suffixed\n");
ret.append(" --" + LDBCommand::ARG_BLOOM_BITS + "=<int,e.g.:14>\n"); ret.append(" --" + LDBCommand::ARG_BLOOM_BITS + "=<int,e.g.:14>\n");
ret.append(" --" + LDBCommand::ARG_COMPRESSION_TYPE + ret.append(" --" + LDBCommand::ARG_COMPRESSION_TYPE +
"=<no|snappy|zlib|bzip2>\n"); "=<no|snappy|zlib|bzip2>\n");

@ -11,63 +11,6 @@
namespace leveldb { namespace leveldb {
class TtlIterator : public Iterator {
public:
TtlIterator(Iterator* iter, int32_t ts_len)
: iter_(iter),
ts_len_(ts_len) {
assert(iter_);
}
~TtlIterator() {
delete iter_;
}
bool Valid() const {
return iter_->Valid();
}
void SeekToFirst() {
iter_->SeekToFirst();
}
void SeekToLast() {
iter_->SeekToLast();
}
void Seek(const Slice& target) {
iter_->Seek(target);
}
void Next() {
iter_->Next();
}
void Prev() {
iter_->Prev();
}
Slice key() const {
return iter_->key();
}
Slice value() const {
assert(DBWithTTL::SanityCheckTimestamp(iter_->value().ToString()).ok());
Slice trimmed_value = iter_->value();
trimmed_value.size_ -= ts_len_;
return trimmed_value;
}
Status status() const {
return iter_->status();
}
private:
Iterator* iter_;
int32_t ts_len_;
};
// Open the db inside DBWithTTL because options needs pointer to its ttl // Open the db inside DBWithTTL because options needs pointer to its ttl
DBWithTTL::DBWithTTL(const int32_t ttl, DBWithTTL::DBWithTTL(const int32_t ttl,
const Options& options, const Options& options,

@ -97,12 +97,86 @@ class DBWithTTL : public DB, CompactionFilter {
static const int32_t kTSLength = sizeof(int32_t); // size of timestamp static const int32_t kTSLength = sizeof(int32_t); // size of timestamp
static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8
static const int32_t kMaxTimestamp = 2147483647; // 01/18/2038:7:14PM GMT-8
private: private:
DB* db_; DB* db_;
int32_t ttl_; int32_t ttl_;
}; };
struct ValueAndTimestamp {
Slice value;
int32_t timestamp;
};
class TtlIterator : public Iterator {
public:
TtlIterator(Iterator* iter, int32_t ts_len)
: iter_(iter),
ts_len_(ts_len) {
assert(iter_);
}
~TtlIterator() {
delete iter_;
}
bool Valid() const {
return iter_->Valid();
}
void SeekToFirst() {
iter_->SeekToFirst();
}
void SeekToLast() {
iter_->SeekToLast();
}
void Seek(const Slice& target) {
iter_->Seek(target);
}
void Next() {
iter_->Next();
}
void Prev() {
iter_->Prev();
}
Slice key() const {
return iter_->key();
}
struct ValueAndTimestamp ValueWithTimestamp() const {
assert(DBWithTTL::SanityCheckTimestamp(iter_->value().ToString()).ok());
struct ValueAndTimestamp val_ts;
val_ts.timestamp = DecodeFixed32(
iter_->value().data() + iter_->value().size() - DBWithTTL::kTSLength);
val_ts.value = iter_->value();
val_ts.value.size_ -= ts_len_;
return val_ts;
}
Slice value() const {
assert(DBWithTTL::SanityCheckTimestamp(iter_->value().ToString()).ok());
Slice trimmed_value = iter_->value();
trimmed_value.size_ -= ts_len_;
return trimmed_value;
}
Status status() const {
return iter_->status();
}
private:
Iterator* iter_;
int32_t ts_len_;
};
} }
#endif // LEVELDB_UTILITIES_TTL_DB_TTL_H_ #endif // LEVELDB_UTILITIES_TTL_DB_TTL_H_

Loading…
Cancel
Save