diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 9985bad86..8476f3e8e 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -3,17 +3,19 @@ // found in the LICENSE file. #include "util/ldb_cmd.h" -#include - -#include -#include -#include -#include "leveldb/write_batch.h" #include "db/dbformat.h" #include "db/log_reader.h" #include "db/filename.h" #include "db/write_batch_internal.h" +#include "leveldb/write_batch.h" +#include "util/coding.h" + +#include +#include +#include +#include +#include namespace leveldb { @@ -24,6 +26,9 @@ const string LDBCommand::ARG_HEX = "hex"; const string LDBCommand::ARG_KEY_HEX = "key_hex"; const string LDBCommand::ARG_VALUE_HEX = "value_hex"; const string LDBCommand::ARG_TTL = "ttl"; +const string LDBCommand::ARG_TTL_START = "start_time"; +const string LDBCommand::ARG_TTL_END = "end_time"; +const string LDBCommand::ARG_TIMESTAMP = "timestamp"; const string LDBCommand::ARG_FROM = "from"; const string LDBCommand::ARG_TO = "to"; const string LDBCommand::ARG_MAX_KEYS = "max_keys"; @@ -523,15 +528,53 @@ void ManifestDumpCommand::DoCommand() { // ---------------------------------------------------------------------------- +string ReadableTime(int unixtime) { + char time_buffer [80]; + time_t rawtime = unixtime; + struct tm * timeinfo = localtime(&rawtime); + strftime(time_buffer, 80, "%c", timeinfo); + return string(time_buffer); +} + +// This function only called when it's the sane case of >1 buckets in time-range +// Also called only when timekv falls between ttl_start and ttl_end provided +void IncBucketCounts(uint64_t* bucket_counts, int ttl_start, int time_range, + int bucket_size, int timekv, int num_buckets) { + if (time_range <= 0 || timekv < ttl_start || timekv > (ttl_start + time_range) + || bucket_size <= 0 || num_buckets < 2) { + fprintf(stderr, "Error: bucketizing\n"); + return; + } + int bucket = (timekv - ttl_start); + bucket = (bucket == 0) ? 1 : ceil(bucket / (double)bucket_size); + bucket_counts[bucket - 1]++; +} + +void PrintBucketCounts(uint64_t* bucket_counts, int ttl_start, int ttl_end, + int bucket_size, int num_buckets) { + int time_point = ttl_start; + for(int i = 0; i < num_buckets - 1; i++, time_point += bucket_size) { + fprintf(stdout, "Keys in range %s to %s : %lu\n", + ReadableTime(time_point).c_str(), + ReadableTime(time_point + bucket_size).c_str(), bucket_counts[i]); + } + fprintf(stdout, "Keys in range %s to %s : %lu\n", + ReadableTime(time_point).c_str(), + ReadableTime(ttl_end).c_str(), bucket_counts[num_buckets - 1]); +} + const string DBDumperCommand::ARG_COUNT_ONLY = "count_only"; const string DBDumperCommand::ARG_STATS = "stats"; +const string DBDumperCommand::ARG_TTL_BUCKET = "bucket"; DBDumperCommand::DBDumperCommand(const vector& params, const map& options, const vector& flags) : LDBCommand(options, flags, true, BuildCmdLineOptions({ARG_TTL, ARG_HEX, ARG_KEY_HEX, ARG_VALUE_HEX, ARG_FROM, ARG_TO, - ARG_MAX_KEYS, ARG_COUNT_ONLY, ARG_STATS})), + ARG_MAX_KEYS, ARG_COUNT_ONLY, ARG_STATS, + ARG_TTL_START, ARG_TTL_END, + ARG_TTL_BUCKET, ARG_TIMESTAMP})), null_from_(true), null_to_(true), max_keys_(-1), @@ -580,9 +623,14 @@ void DBDumperCommand::Help(string& ret) { ret.append(" "); ret.append(DBDumperCommand::Name()); ret.append(HelpRangeCmdArgs()); + ret.append(" [--" + ARG_TTL + "]"); ret.append(" [--" + ARG_MAX_KEYS + "=]"); + ret.append(" [--" + ARG_TIMESTAMP + "]"); ret.append(" [--" + ARG_COUNT_ONLY + "]"); ret.append(" [--" + ARG_STATS + "]"); + ret.append(" [--" + ARG_TTL_BUCKET + "=]"); + ret.append(" [--" + ARG_TTL_START + "=]"); + ret.append(" [--" + ARG_TTL_END + "=]"); ret.append("\n"); } @@ -614,25 +662,78 @@ void DBDumperCommand::DoCommand() { } int max_keys = max_keys_; + int ttl_start; + if (!ParseIntOption(option_map_, ARG_TTL_START, ttl_start, exec_state_)) { + ttl_start = DBWithTTL::kMinTimestamp; // TTL introduction time + } + int ttl_end; + if (!ParseIntOption(option_map_, ARG_TTL_END, ttl_end, exec_state_)) { + ttl_end = DBWithTTL::kMaxTimestamp; // Max time allowed by TTL feature + } + if (ttl_end < ttl_start) { + fprintf(stderr, "Error: End time can't be less than start time\n"); + delete iter; + return; + } + int time_range = ttl_end - ttl_start; + int bucket_size; + if (!ParseIntOption(option_map_, ARG_TTL_BUCKET, bucket_size, exec_state_) || + bucket_size <= 0) { + bucket_size = time_range; // Will have just 1 bucket by default + } + // At this point, bucket_size=0 => time_range=0 + uint64_t num_buckets = + bucket_size >= time_range ? 1 : ceil((double)time_range/bucket_size); + unique_ptr bucket_counts(new uint64_t[num_buckets]); + fill(bucket_counts.get(), bucket_counts.get() + num_buckets, 0); + if (is_db_ttl_ && !count_only_ && timestamp_) { + fprintf(stdout, "Dumping key-values from %s to %s\n", + ReadableTime(ttl_start).c_str(), ReadableTime(ttl_end).c_str()); + } + for (; iter->Valid(); iter->Next()) { + int rawtime = 0; + string value; // If end marker was specified, we stop before it if (!null_to_ && (iter->key().ToString() >= to_)) break; // Terminate if maximum number of keys have been dumped if (max_keys == 0) break; + if (is_db_ttl_) { + TtlIterator* it_ttl = (TtlIterator*)iter; + struct ValueAndTimestamp val_ts = it_ttl->ValueWithTimestamp(); + value = val_ts.value.ToString(); + rawtime = val_ts.timestamp; + if (rawtime < ttl_start || rawtime > ttl_end) { + continue; + } + } else { + value = iter->value().ToString(); + } if (max_keys > 0) { --max_keys; } + if (is_db_ttl_ && num_buckets > 1) { + IncBucketCounts(bucket_counts.get(), ttl_start, time_range, bucket_size, + rawtime, num_buckets); + } ++count; if (!count_only_) { + if (is_db_ttl_ && timestamp_) { + fprintf(stdout, "%s ", ReadableTime(rawtime).c_str()); + } string str = PrintKeyValue(iter->key().ToString(), - iter->value().ToString(), - is_key_hex_, is_value_hex_); + value, is_key_hex_, is_value_hex_); fprintf(stdout, "%s\n", str.c_str()); } } - fprintf(stdout, "Keys in range: %lld\n", (long long) count); + if (num_buckets > 1 && is_db_ttl_) { + PrintBucketCounts(bucket_counts.get(), ttl_start, ttl_end, bucket_size, + num_buckets); + } else { + fprintf(stdout, "Keys in range: %lld\n", (long long) count); + } // Clean up delete iter; } @@ -920,6 +1021,7 @@ void GetCommand::Help(string& ret) { ret.append(" "); ret.append(GetCommand::Name()); ret.append(" "); + ret.append(" [--" + ARG_TTL + "]"); ret.append("\n"); } @@ -1013,6 +1115,7 @@ void BatchPutCommand::Help(string& ret) { ret.append(" "); ret.append(BatchPutCommand::Name()); ret.append(" [ ] [..]"); + ret.append(" [--" + ARG_TTL + "]"); ret.append("\n"); } @@ -1041,9 +1144,9 @@ Options BatchPutCommand::PrepareOptionsForOpenDB() { ScanCommand::ScanCommand(const vector& params, const map& options, const vector& flags) : LDBCommand(options, flags, true, - BuildCmdLineOptions({ARG_TTL, ARG_HEX, ARG_KEY_HEX, - ARG_VALUE_HEX, ARG_FROM, ARG_TO, - ARG_MAX_KEYS})), + BuildCmdLineOptions({ARG_TTL, ARG_HEX, ARG_KEY_HEX, ARG_TO, + ARG_VALUE_HEX, ARG_FROM, ARG_TIMESTAMP, + ARG_MAX_KEYS, ARG_TTL_START, ARG_TTL_END})), start_key_specified_(false), end_key_specified_(false), max_keys_scanned_(-1) { @@ -1083,7 +1186,11 @@ void ScanCommand::Help(string& ret) { ret.append(" "); ret.append(ScanCommand::Name()); ret.append(HelpRangeCmdArgs()); - ret.append("--" + ARG_MAX_KEYS + "=N] "); + ret.append(" [--" + ARG_TTL + "]"); + ret.append(" [--" + ARG_TIMESTAMP + "]"); + ret.append(" [--" + ARG_MAX_KEYS + "=q] "); + ret.append(" [--" + ARG_TTL_START + "=]"); + ret.append(" [--" + ARG_TTL_END + "=]"); ret.append("\n"); } @@ -1096,11 +1203,42 @@ void ScanCommand::DoCommand() { } else { it->SeekToFirst(); } + int ttl_start; + if (!ParseIntOption(option_map_, ARG_TTL_START, ttl_start, exec_state_)) { + ttl_start = DBWithTTL::kMinTimestamp; // TTL introduction time + } + int ttl_end; + if (!ParseIntOption(option_map_, ARG_TTL_END, ttl_end, exec_state_)) { + ttl_end = DBWithTTL::kMaxTimestamp; // Max time allowed by TTL feature + } + if (ttl_end < ttl_start) { + fprintf(stderr, "Error: End time can't be less than start time\n"); + delete it; + return; + } + if (is_db_ttl_ && timestamp_) { + fprintf(stdout, "Scanning key-values from %s to %s\n", + ReadableTime(ttl_start).c_str(), ReadableTime(ttl_end).c_str()); + } for ( ; it->Valid() && (!end_key_specified_ || it->key().ToString() < end_key_); it->Next()) { string key = it->key().ToString(); - string value = it->value().ToString(); + string value; + if (is_db_ttl_) { + TtlIterator* it_ttl = (TtlIterator*)it; + struct ValueAndTimestamp val_ts = it_ttl->ValueWithTimestamp(); + int rawtime = val_ts.timestamp; + value = val_ts.value.ToString(); + if (rawtime < ttl_start || rawtime > ttl_end) { + continue; + } + if (timestamp_) { + fprintf(stdout, "%s ", ReadableTime(rawtime).c_str()); + } + } else { + value = it->value().ToString(); + } fprintf(stdout, "%s : %s\n", (is_key_hex_ ? StringToHex(key) : key).c_str(), (is_value_hex_ ? StringToHex(value) : value).c_str() @@ -1176,6 +1314,7 @@ void PutCommand::Help(string& ret) { ret.append(" "); ret.append(PutCommand::Name()); ret.append(" "); + ret.append(" [--" + ARG_TTL + "]"); ret.append("\n"); } @@ -1211,6 +1350,7 @@ DBQuerierCommand::DBQuerierCommand(const vector& params, void DBQuerierCommand::Help(string& ret) { ret.append(" "); ret.append(DBQuerierCommand::Name()); + ret.append(" [--" + ARG_TTL + "]"); ret.append("\n"); ret.append(" Starts a REPL shell. Type help for list of available " "commands."); diff --git a/util/ldb_cmd.h b/util/ldb_cmd.h index 1fc51c4e3..d8e4c4b11 100644 --- a/util/ldb_cmd.h +++ b/util/ldb_cmd.h @@ -21,6 +21,7 @@ #include "util/ldb_cmd_execute_result.h" #include "util/string_util.h" #include "utilities/utility_db.h" +#include "utilities/ttl/db_ttl.h" using std::string; using std::map; @@ -38,6 +39,9 @@ public: static const string ARG_KEY_HEX; static const string ARG_VALUE_HEX; static const string ARG_TTL; + static const string ARG_TTL_START; + static const string ARG_TTL_END; + static const string ARG_TIMESTAMP; static const string ARG_FROM; static const string ARG_TO; static const string ARG_MAX_KEYS; @@ -162,6 +166,9 @@ protected: /** If true, the value is treated as timestamp suffixed */ bool is_db_ttl_; + // If true, the kvs are output with their insert/modify timestamp in a ttl db + bool timestamp_; + /** * Map of options passed on the command-line. */ @@ -185,6 +192,7 @@ protected: is_key_hex_(false), is_value_hex_(false), is_db_ttl_(false), + timestamp_(false), option_map_(options), flags_(flags), valid_cmd_line_options_(valid_cmd_line_options) { @@ -197,6 +205,7 @@ protected: is_key_hex_ = IsKeyHex(options, flags); is_value_hex_ = IsValueHex(options, flags); is_db_ttl_ = IsFlagPresent(flags, ARG_TTL); + timestamp_ = IsFlagPresent(flags, ARG_TIMESTAMP); } void OpenDB() { @@ -385,6 +394,7 @@ private: static const string ARG_COUNT_ONLY; static const string ARG_STATS; + static const string ARG_TTL_BUCKET; }; class DBLoaderCommand: public LDBCommand { diff --git a/util/ldb_tool.cc b/util/ldb_tool.cc index 3f4cfe950..e46aee39d 100644 --- a/util/ldb_tool.cc +++ b/util/ldb_tool.cc @@ -18,9 +18,6 @@ public: ret.append("\n"); ret.append("The following optional parameters control if keys/values are " "input/output as hex or as plain strings:\n"); - ret.append(" --" + LDBCommand::ARG_TTL + - " with 'put','get','scan','dump','query','batchput'" - " : DB supports ttl and value is internally timestamp-suffixed\n"); ret.append(" --" + LDBCommand::ARG_KEY_HEX + " : Keys are input/output as hex\n"); ret.append(" --" + LDBCommand::ARG_VALUE_HEX + @@ -31,6 +28,9 @@ public: ret.append("The following optional parameters control the database " "internals:\n"); + ret.append(" --" + LDBCommand::ARG_TTL + + " with 'put','get','scan','dump','query','batchput'" + " : DB supports ttl and value is internally timestamp-suffixed\n"); ret.append(" --" + LDBCommand::ARG_BLOOM_BITS + "=\n"); ret.append(" --" + LDBCommand::ARG_COMPRESSION_TYPE + "=\n"); diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index d4c8fc557..af99f80e4 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -11,63 +11,6 @@ namespace leveldb { -class TtlIterator : public Iterator { - - public: - TtlIterator(Iterator* iter, int32_t ts_len) - : iter_(iter), - ts_len_(ts_len) { - assert(iter_); - } - - ~TtlIterator() { - delete iter_; - } - - bool Valid() const { - return iter_->Valid(); - } - - void SeekToFirst() { - iter_->SeekToFirst(); - } - - void SeekToLast() { - iter_->SeekToLast(); - } - - void Seek(const Slice& target) { - iter_->Seek(target); - } - - void Next() { - iter_->Next(); - } - - void Prev() { - iter_->Prev(); - } - - Slice key() const { - return iter_->key(); - } - - Slice value() const { - assert(DBWithTTL::SanityCheckTimestamp(iter_->value().ToString()).ok()); - Slice trimmed_value = iter_->value(); - trimmed_value.size_ -= ts_len_; - return trimmed_value; - } - - Status status() const { - return iter_->status(); - } - - private: - Iterator* iter_; - int32_t ts_len_; -}; - // Open the db inside DBWithTTL because options needs pointer to its ttl DBWithTTL::DBWithTTL(const int32_t ttl, const Options& options, diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index 924bd8175..b7533cd49 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -97,12 +97,86 @@ class DBWithTTL : public DB, CompactionFilter { static const int32_t kTSLength = sizeof(int32_t); // size of timestamp - static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM + static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8 + + static const int32_t kMaxTimestamp = 2147483647; // 01/18/2038:7:14PM GMT-8 private: DB* db_; int32_t ttl_; }; +struct ValueAndTimestamp { + Slice value; + int32_t timestamp; +}; + +class TtlIterator : public Iterator { + + public: + TtlIterator(Iterator* iter, int32_t ts_len) + : iter_(iter), + ts_len_(ts_len) { + assert(iter_); + } + + ~TtlIterator() { + delete iter_; + } + + bool Valid() const { + return iter_->Valid(); + } + + void SeekToFirst() { + iter_->SeekToFirst(); + } + + void SeekToLast() { + iter_->SeekToLast(); + } + + void Seek(const Slice& target) { + iter_->Seek(target); + } + + void Next() { + iter_->Next(); + } + + void Prev() { + iter_->Prev(); + } + + Slice key() const { + return iter_->key(); + } + + struct ValueAndTimestamp ValueWithTimestamp() const { + assert(DBWithTTL::SanityCheckTimestamp(iter_->value().ToString()).ok()); + struct ValueAndTimestamp val_ts; + val_ts.timestamp = DecodeFixed32( + iter_->value().data() + iter_->value().size() - DBWithTTL::kTSLength); + val_ts.value = iter_->value(); + val_ts.value.size_ -= ts_len_; + return val_ts; + } + + Slice value() const { + assert(DBWithTTL::SanityCheckTimestamp(iter_->value().ToString()).ok()); + Slice trimmed_value = iter_->value(); + trimmed_value.size_ -= ts_len_; + return trimmed_value; + } + + Status status() const { + return iter_->status(); + } + + private: + Iterator* iter_; + int32_t ts_len_; +}; + } #endif // LEVELDB_UTILITIES_TTL_DB_TTL_H_