Reuse RocksDB transactional DB

First step for snapshots and transactions support
pull/171/head
Tpt 3 years ago
parent 196d6d6576
commit 367a1b4585
  1. 55
      lib/src/storage/backend/rocksdb.rs
  2. 44
      rocksdb-sys/api/c.cc
  3. 20
      rocksdb-sys/api/c.h

@ -56,8 +56,9 @@ unsafe impl Send for Db {}
unsafe impl Sync for Db {}
struct DbHandler {
db: *mut rocksdb_t,
db: *mut rocksdb_transactiondb_t,
options: *mut rocksdb_options_t,
txn_options: *mut rocksdb_transactiondb_options_t,
read_options: *mut rocksdb_readoptions_t,
write_options: *mut rocksdb_writeoptions_t,
flush_options: *mut rocksdb_flushoptions_t,
@ -79,7 +80,7 @@ impl Drop for DbHandler {
for cf_handle in &self.cf_handles {
rocksdb_column_family_handle_destroy(*cf_handle);
}
rocksdb_close(self.db);
rocksdb_transactiondb_close(self.db);
for cf_option in &self.cf_options {
rocksdb_options_destroy(*cf_option);
}
@ -89,6 +90,7 @@ impl Drop for DbHandler {
rocksdb_envoptions_destroy(self.env_options);
rocksdb_ingestexternalfileoptions_destroy(self.ingest_external_file_options);
rocksdb_compactoptions_destroy(self.compaction_options);
rocksdb_transactiondb_options_destroy(self.txn_options);
rocksdb_options_destroy(self.options);
rocksdb_block_based_options_destroy(self.block_based_table_options);
if let Some(env) = self.env {
@ -179,10 +181,17 @@ impl Db {
);
rocksdb_options_set_block_based_table_factory(options, block_based_table_options);
let txn_options = rocksdb_transactiondb_options_create();
assert!(
!txn_options.is_null(),
"rocksdb_transactiondb_options_create returned null"
);
let env = if in_memory {
let env = rocksdb_create_mem_env();
if env.is_null() {
rocksdb_options_destroy(options);
rocksdb_transactiondb_options_destroy(txn_options);
return Err(other_error("Not able to create an in-memory environment."));
}
rocksdb_options_set_env(options, env);
@ -256,8 +265,9 @@ impl Db {
let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> =
vec![ptr::null_mut(); column_family_names.len()];
let db = ffi_result!(rocksdb_open_column_families(
let db = ffi_result!(rocksdb_transactiondb_open_column_families(
options,
txn_options,
c_path.as_ptr(),
c_column_families.len().try_into().unwrap(),
c_column_families
@ -271,7 +281,7 @@ impl Db {
assert!(!db.is_null(), "rocksdb_create returned null");
for handle in &cf_handles {
if handle.is_null() {
rocksdb_close(db);
rocksdb_transactiondb_close(db);
return Err(other_error(
"Received null column family handle from RocksDB.",
));
@ -320,6 +330,7 @@ impl Db {
Ok(DbHandler {
db,
options,
txn_options,
read_options,
write_options,
flush_options,
@ -348,7 +359,7 @@ impl Db {
pub fn flush(&self, column_family: &ColumnFamily) -> Result<()> {
unsafe {
ffi_result!(rocksdb_flush_cf(
ffi_result!(rocksdb_transactiondb_flush_cf(
self.0.db,
self.0.flush_options,
column_family.0,
@ -359,7 +370,7 @@ impl Db {
#[allow(clippy::unnecessary_wraps)]
pub fn compact(&self, column_family: &ColumnFamily) -> Result<()> {
unsafe {
rocksdb_compact_range_cf_opt(
ffi_result!(rocksdb_transactiondb_compact_range_cf_opt(
self.0.db,
column_family.0,
self.0.compaction_options,
@ -367,14 +378,13 @@ impl Db {
0,
ptr::null(),
0,
);
))
}
Ok(())
}
pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<Option<PinnableSlice>> {
unsafe {
let slice = ffi_result!(rocksdb_get_pinned_cf(
let slice = ffi_result!(rocksdb_transactiondb_get_pinned_cf(
self.0.db,
self.0.read_options,
column_family.0,
@ -406,7 +416,7 @@ impl Db {
pub fn write(&self, batch: &mut WriteBatchWithIndex) -> Result<()> {
unsafe {
ffi_result!(rocksdb_write_writebatch_wi(
ffi_result!(rocksdb_transactiondb_write_writebatch_wi(
self.0.db,
self.0.write_options,
batch.batch
@ -452,7 +462,8 @@ impl Db {
upper_bound.len(),
);
}
let iter = rocksdb_create_iterator_cf(self.0.db, options, column_family.0);
let iter =
rocksdb_transactiondb_create_iterator_cf(self.0.db, options, column_family.0);
assert!(!iter.is_null(), "rocksdb_create_iterator returned null");
if prefix.is_empty() {
rocksdb_iter_seek_to_first(iter);
@ -510,7 +521,7 @@ impl Db {
for (cf, writer) in writers_with_cf {
unsafe {
ffi_result!(rocksdb_sstfilewriter_finish(writer.writer))?;
ffi_result!(rocksdb_ingest_external_file_cf(
ffi_result!(rocksdb_transactiondb_ingest_external_file_cf(
self.0.db,
cf.0,
&writer.cpath.as_ptr(),
@ -587,15 +598,17 @@ impl WriteBatchWithIndex {
pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<Option<Buffer>> {
unsafe {
let mut len = 0;
let base = ffi_result!(rocksdb_writebatch_wi_get_from_batch_and_db_cf(
self.batch,
self.db.0.db,
self.db.0.read_options,
column_family.0,
key.as_ptr() as *const c_char,
key.len(),
&mut len
))? as *mut u8;
let base = ffi_result!(
rocksdb_transactiondb_writebatch_wi_get_from_batch_and_db_cf(
self.batch,
self.db.0.db,
self.db.0.read_options,
column_family.0,
key.as_ptr() as *const c_char,
key.len(),
&mut len
)
)? as *mut u8;
Ok(if base.is_null() {
None
} else {

@ -42,6 +42,50 @@ void rocksdb_transactiondb_compact_range_cf_opt(rocksdb_transactiondb_t* db,
(limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr)));
}
void rocksdb_transactiondb_write_writebatch_wi(
rocksdb_transactiondb_t* db,
const rocksdb_writeoptions_t* options,
rocksdb_writebatch_wi_t* wbwi,
char** errptr) {
WriteBatch* wb = wbwi->rep->GetWriteBatch();
SaveError(errptr, db->rep->Write(options->rep, wb));
}
char* rocksdb_transactiondb_writebatch_wi_get_from_batch_and_db_cf(
rocksdb_writebatch_wi_t* wbwi,
rocksdb_transactiondb_t* db,
const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t keylen,
size_t* vallen,
char** errptr) {
char* result = nullptr;
std::string tmp;
Status s = wbwi->rep->GetFromBatchAndDB(db->rep, options->rep, column_family->rep,
Slice(key, keylen), &tmp);
if (s.ok()) {
*vallen = tmp.size();
result = CopyString(tmp);
} else {
*vallen = 0;
if (!s.IsNotFound()) {
SaveError(errptr, s);
}
}
return result;
}
void rocksdb_transactiondb_ingest_external_file_cf(
rocksdb_transactiondb_t* db, rocksdb_column_family_handle_t* handle,
const char* const* file_list, const size_t list_len,
const rocksdb_ingestexternalfileoptions_t* opt, char** errptr) {
std::vector<std::string> files(list_len);
for (size_t i = 0; i < list_len; ++i) {
files[i] = std::string(file_list[i]);
}
SaveError(errptr, db->rep->IngestExternalFile(handle->rep, files, opt->rep));
}
rocksdb_writeoptions_t* rocksdb_writeoptions_create_copy(rocksdb_writeoptions_t* options) {
return new rocksdb_writeoptions_t(*options);
}

@ -20,6 +20,26 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_compact_range_cf_opt(
rocksdb_compactoptions_t* opt, const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_write_writebatch_wi(
rocksdb_transactiondb_t* db,
const rocksdb_writeoptions_t* options,
rocksdb_writebatch_wi_t* wbwi,
char** errptr);
extern ROCKSDB_LIBRARY_API char* rocksdb_transactiondb_writebatch_wi_get_from_batch_and_db_cf(
rocksdb_writebatch_wi_t* wbwi,
rocksdb_transactiondb_t* db,
const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t keylen,
size_t* vallen,
char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_ingest_external_file_cf(
rocksdb_transactiondb_t* db, rocksdb_column_family_handle_t* handle,
const char* const* file_list, const size_t list_len,
const rocksdb_ingestexternalfileoptions_t* opt, char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_writeoptions_t* rocksdb_writeoptions_create_copy(
rocksdb_writeoptions_t*);

Loading…
Cancel
Save