From 72aacf6b962abc7f7b197fad17f618e73b20474c Mon Sep 17 00:00:00 2001
From: Albert Strasheim <albert@cloudflare.com>
Date: Tue, 25 Feb 2014 10:32:28 -0800
Subject: [PATCH 1/3] A few more C API functions.

---
 db/c.cc             | 82 ++++++++++++++++++++++++++++++++++++++++++++-
 include/rocksdb/c.h | 38 +++++++++++++++++++++
 2 files changed, 119 insertions(+), 1 deletion(-)

diff --git a/db/c.cc b/db/c.cc
index 935a297f1..0a8e0700e 100644
--- a/db/c.cc
+++ b/db/c.cc
@@ -51,6 +51,7 @@ using rocksdb::Status;
 using rocksdb::WritableFile;
 using rocksdb::WriteBatch;
 using rocksdb::WriteOptions;
+using rocksdb::LiveFileMetaData;
 
 using std::shared_ptr;
 
@@ -70,6 +71,7 @@ struct rocksdb_writablefile_t    { WritableFile*     rep; };
 struct rocksdb_filelock_t        { FileLock*         rep; };
 struct rocksdb_logger_t          { shared_ptr<Logger>  rep; };
 struct rocksdb_cache_t           { shared_ptr<Cache>   rep; };
+struct rocksdb_livefiles_t       { std::vector<LiveFileMetaData> rep; };
 
 struct rocksdb_comparator_t : public Comparator {
   void* state_;
@@ -435,6 +437,19 @@ void rocksdb_approximate_sizes(
   delete[] ranges;
 }
 
+void rocksdb_delete_file(
+    rocksdb_t* db,
+    const char* name) {
+  db->rep->DeleteFile(name);
+}
+
+const rocksdb_livefiles_t* rocksdb_livefiles(
+    rocksdb_t* db) {
+  rocksdb_livefiles_t* result = new rocksdb_livefiles_t;
+  db->rep->GetLiveFilesMetaData(&result->rep);
+  return result;
+}
+
 void rocksdb_compact_range(
     rocksdb_t* db,
     const char* start_key, size_t start_key_len,
@@ -537,6 +552,10 @@ void rocksdb_writebatch_clear(rocksdb_writebatch_t* b) {
   b->rep.Clear();
 }
 
+int rocksdb_writebatch_count(rocksdb_writebatch_t* b) {
+  return b->rep.Count();
+}
+
 void rocksdb_writebatch_put(
     rocksdb_writebatch_t* b,
     const char* key, size_t klen,
@@ -581,6 +600,11 @@ void rocksdb_writebatch_iterate(
   b->rep.Iterate(&handler);
 }
 
+const char* rocksdb_writebatch_data(rocksdb_writebatch_t* b, size_t* size) {
+  *size = b->rep.GetDataSize();
+  return b->rep.Data().c_str();
+}
+
 rocksdb_options_t* rocksdb_options_create() {
   return new rocksdb_options_t;
 }
@@ -983,7 +1007,6 @@ DB::GetSortedWalFiles
 DB::GetLatestSequenceNumber
 DB::GetUpdatesSince
 DB::DeleteFile
-DB::GetLiveFilesMetaData
 DB::GetDbIdentity
 DB::RunManualCompaction
 custom cache
@@ -1304,4 +1327,61 @@ void rocksdb_universal_compaction_options_destroy(
   delete uco;
 }
 
+void rocksdb_options_set_min_level_to_compress(rocksdb_options_t* opt, int level) {
+  if (level >= 0) {
+    assert(level <= opt->rep.num_levels);
+    opt->rep.compression_per_level.resize(opt->rep.num_levels);
+    for (int i = 0; i < level; i++) {
+      opt->rep.compression_per_level[i] = rocksdb::kNoCompression;
+    }
+    for (int i = level; i < opt->rep.num_levels; i++) {
+      opt->rep.compression_per_level[i] = opt->rep.compression;
+    }
+  }
+}
+
+int rocksdb_livefiles_count(
+  const rocksdb_livefiles_t* lf) {
+  return lf->rep.size();
+}
+
+const char* rocksdb_livefiles_name(
+  const rocksdb_livefiles_t* lf,
+  int index) {
+  return lf->rep[index].name.c_str();
+}
+
+int rocksdb_livefiles_level(
+  const rocksdb_livefiles_t* lf,
+  int index) {
+  return lf->rep[index].level;
+}
+
+size_t rocksdb_livefiles_size(
+  const rocksdb_livefiles_t* lf,
+  int index) {
+  return lf->rep[index].size;
+}
+
+const char* rocksdb_livefiles_smallestkey(
+  const rocksdb_livefiles_t* lf,
+  int index,
+  size_t* size) {
+  *size = lf->rep[index].smallestkey.size();
+  return lf->rep[index].smallestkey.data();
+}
+
+const char* rocksdb_livefiles_largestkey(
+  const rocksdb_livefiles_t* lf,
+  int index,
+  size_t* size) {
+  *size = lf->rep[index].largestkey.size();
+  return lf->rep[index].largestkey.data();
+}
+
+extern void rocksdb_livefiles_destroy(
+  const rocksdb_livefiles_t* lf) {
+  delete lf;
+}
+
 }  // end extern "C"
diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h
index 91efed37f..62be94fe4 100644
--- a/include/rocksdb/c.h
+++ b/include/rocksdb/c.h
@@ -74,6 +74,7 @@ typedef struct rocksdb_writablefile_t    rocksdb_writablefile_t;
 typedef struct rocksdb_writebatch_t      rocksdb_writebatch_t;
 typedef struct rocksdb_writeoptions_t    rocksdb_writeoptions_t;
 typedef struct rocksdb_universal_compaction_options_t rocksdb_universal_compaction_options_t;
+typedef struct rocksdb_livefiles_t     rocksdb_livefiles_t;
 
 /* DB operations */
 
@@ -148,6 +149,13 @@ extern void rocksdb_compact_range(
     const char* start_key, size_t start_key_len,
     const char* limit_key, size_t limit_key_len);
 
+extern void rocksdb_delete_file(
+    rocksdb_t* db,
+    const char* name);
+
+extern const rocksdb_livefiles_t* rocksdb_livefiles(
+    rocksdb_t* db);
+
 extern void rocksdb_flush(
     rocksdb_t* db,
     const rocksdb_flushoptions_t* options,
@@ -192,6 +200,7 @@ extern void rocksdb_iter_get_error(const rocksdb_iterator_t*, char** errptr);
 extern rocksdb_writebatch_t* rocksdb_writebatch_create();
 extern void rocksdb_writebatch_destroy(rocksdb_writebatch_t*);
 extern void rocksdb_writebatch_clear(rocksdb_writebatch_t*);
+extern int rocksdb_writebatch_count(rocksdb_writebatch_t*);
 extern void rocksdb_writebatch_put(
     rocksdb_writebatch_t*,
     const char* key, size_t klen,
@@ -208,6 +217,7 @@ extern void rocksdb_writebatch_iterate(
     void* state,
     void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen),
     void (*deleted)(void*, const char* k, size_t klen));
+extern const char* rocksdb_writebatch_data(rocksdb_writebatch_t*, size_t *size);
 
 /* Options */
 
@@ -336,6 +346,12 @@ extern void rocksdb_options_set_delete_obsolete_files_period_micros(
 extern void rocksdb_options_set_source_compaction_factor(rocksdb_options_t*, int);
 extern void rocksdb_options_prepare_for_bulk_load(rocksdb_options_t*);
 extern void rocksdb_options_set_memtable_vector_rep(rocksdb_options_t*);
+
+extern void rocksdb_options_set_max_bytes_for_level_base(rocksdb_options_t* opt, uint64_t n);
+extern void rocksdb_options_set_stats_dump_period_sec(rocksdb_options_t* opt, unsigned int sec);
+
+extern void rocksdb_options_set_min_level_to_compress(rocksdb_options_t* opt, int level);
+
 extern void rocksdb_options_set_memtable_prefix_bloom_bits(
     rocksdb_options_t*, uint32_t);
 extern void rocksdb_options_set_memtable_prefix_bloom_probes(
@@ -508,6 +524,28 @@ extern void rocksdb_universal_compaction_options_set_stop_style(
 extern void rocksdb_universal_compaction_options_destroy(
   rocksdb_universal_compaction_options_t*);
 
+extern int rocksdb_livefiles_count(
+  const rocksdb_livefiles_t*);
+extern const char* rocksdb_livefiles_name(
+  const rocksdb_livefiles_t*,
+  int index);
+extern int rocksdb_livefiles_level(
+  const rocksdb_livefiles_t*,
+  int index);
+extern size_t rocksdb_livefiles_size(
+  const rocksdb_livefiles_t*,
+  int index);
+extern const char* rocksdb_livefiles_smallestkey(
+  const rocksdb_livefiles_t*,
+  int index,
+  size_t* size);
+extern const char* rocksdb_livefiles_largestkey(
+  const rocksdb_livefiles_t*,
+  int index,
+  size_t* size);
+extern void rocksdb_livefiles_destroy(
+  const rocksdb_livefiles_t*);
+
 #ifdef __cplusplus
 }  /* end extern "C" */
 #endif

From dea894ef8de6f5dabd616c14a05af2d261239ae5 Mon Sep 17 00:00:00 2001
From: Lei Jin <lei@fb.com>
Date: Tue, 25 Feb 2014 10:43:46 -0800
Subject: [PATCH 2/3] expose wal_dir in db_bench

Summary: as title

Test Plan: ran db_bench

Reviewers: dhruba, haobo

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16269
---
 db/db_bench.cc | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/db/db_bench.cc b/db/db_bench.cc
index e40732f28..291a0ce8c 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -265,6 +265,8 @@ DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync");
 
 DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");
 
+DEFINE_string(wal_dir, "", "If not empty, use the given dir for WAL");
+
 DEFINE_bool(use_snapshot, false, "If true, create a snapshot per query when"
             " randomread benchmark is used");
 
@@ -1478,6 +1480,7 @@ class Benchmark {
     options.env = FLAGS_env;
     options.disableDataSync = FLAGS_disable_data_sync;
     options.use_fsync = FLAGS_use_fsync;
+    options.wal_dir = FLAGS_wal_dir;
     options.num_levels = FLAGS_num_levels;
     options.target_file_size_base = FLAGS_target_file_size_base;
     options.target_file_size_multiplier = FLAGS_target_file_size_multiplier;

From 4209516359860ed4da676f217a648c5ce932d743 Mon Sep 17 00:00:00 2001
From: Igor Canadi <icanadi@fb.com>
Date: Tue, 25 Feb 2014 12:04:14 -0800
Subject: [PATCH 3/3] Schedule flush when waiting on flush

Summary:
This will also help with avoiding the deadlock. If a flush failed and we're waiting for a memtable to be flushed, we should schedule a new flush and hope a new one succeedes.

If paranoid_checks = false, Wait() will still hang on ENOSPC, but at least it will automatically continue when the space frees up. Current behavior both hangs and deadlocks.

Also, I renamed some 'compaction' to 'flush'. 'compaction' was leveldb way of saying things.

Test Plan: make check

Reviewers: dhruba, haobo, ljin

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16281
---
 db/db_impl.cc | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/db/db_impl.cc b/db/db_impl.cc
index 12042585b..b1ce96e1b 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -3364,9 +3364,10 @@ Status DBImpl::MakeRoomForWrite(bool force,
       break;
     } else if (imm_.size() == options_.max_write_buffer_number - 1) {
       // We have filled up the current memtable, but the previous
-      // ones are still being compacted, so we wait.
+      // ones are still being flushed, so we wait.
       DelayLoggingAndReset();
-      Log(options_.info_log, "wait for memtable compaction...\n");
+      Log(options_.info_log, "wait for memtable flush...\n");
+      MaybeScheduleFlushOrCompaction();
       uint64_t stall;
       {
         StopWatch sw(env_, options_.statistics.get(),
@@ -3440,7 +3441,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
       unique_ptr<WritableFile> lfile;
       MemTable* new_mem = nullptr;
 
-      // Attempt to switch to a new memtable and trigger compaction of old.
+      // Attempt to switch to a new memtable and trigger flush of old.
       // Do this without holding the dbmutex lock.
       assert(versions_->PrevLogNumber() == 0);
       uint64_t new_log_number = versions_->NewFileNumber();