From 36b4ad62cf2f2dabba6c52ef813cd41ab305fc88 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Sun, 2 Aug 2015 17:55:00 -0700 Subject: [PATCH] Add column family implementations for put/merge/delete --- src/ffi.rs | 18 ++++++++++ src/rocksdb.rs | 90 ++++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 101 insertions(+), 7 deletions(-) diff --git a/src/ffi.rs b/src/ffi.rs index c5ecee6..6e86d80 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -190,6 +190,12 @@ extern { k: *const u8, kLen: size_t, v: *const u8, vLen: size_t, err: *mut *const i8); + pub fn rocksdb_put_cf(db: RocksDBInstance, + writeopts: RocksDBWriteOptions, + cf: RocksDBCFHandle, + k: *const u8, kLen: size_t, + v: *const u8, vLen: size_t, + err: *mut *const i8); pub fn rocksdb_readoptions_create() -> RocksDBReadOptions; pub fn rocksdb_readoptions_destroy(readopts: RocksDBReadOptions); pub fn rocksdb_readoptions_set_verify_checksums( @@ -241,6 +247,12 @@ extern { k: *const u8, kLen: size_t, err: *mut *const i8 ) -> *mut c_void; + pub fn rocksdb_delete_cf(db: RocksDBInstance, + writeopts: RocksDBWriteOptions, + cf: RocksDBCFHandle, + k: *const u8, kLen: size_t, + err: *mut *const i8 + ) -> *mut c_void; pub fn rocksdb_close(db: RocksDBInstance); pub fn rocksdb_destroy_db(options: RocksDBOptions, path: *const i8, err: *mut *const i8); @@ -252,6 +264,12 @@ extern { k: *const u8, kLen: size_t, v: *const u8, vLen: size_t, err: *mut *const i8); + pub fn rocksdb_merge_cf(db: RocksDBInstance, + writeopts: RocksDBWriteOptions, + cf: RocksDBCFHandle, + k: *const u8, kLen: size_t, + v: *const u8, vLen: size_t, + err: *mut *const i8); pub fn rocksdb_mergeoperator_create( state: *mut c_void, destroy: extern fn(*mut c_void) -> (), diff --git a/src/rocksdb.rs b/src/rocksdb.rs index a88b3a3..61596f2 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -163,8 +163,11 @@ impl <'a> Drop for Snapshot<'a> { // This is for the RocksDB and write batches to share the same API pub trait Writable { fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String>; + fn put_cf(&self, cf: &str, key: &[u8], value: &[u8]) -> Result<(), String>; fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String>; + fn merge_cf(&self, cf: &str, key: &[u8], value: &[u8]) -> Result<(), String>; fn delete(&self, key: &[u8]) -> Result<(), String>; + fn delete_cf(&self, cf: &str, key: &[u8]) -> Result<(), String>; } fn error_message(ptr: *const i8) -> String { @@ -392,7 +395,7 @@ impl RocksDB { } impl Writable for RocksDB { - fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { + fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let mut err: *const i8 = 0 as *const i8; @@ -404,10 +407,30 @@ impl Writable for RocksDB { if !err.is_null() { return Err(error_message(err)); } - return Ok(()) + Ok(()) } } + fn put_cf(&self, cf_name: &str, key: &[u8], value: &[u8]) -> Result<(), String> { + let cf = self.cfs.get(cf_name); + if cf.is_none() { + return Err(format!("Invalid column family: {}", cf_name).to_string()); + } + unsafe { + let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; + rocksdb_ffi::rocksdb_put_cf(self.inner, writeopts.clone(), *cf.unwrap(), + key.as_ptr(), key.len() as size_t, value.as_ptr(), + value.len() as size_t, err_ptr); + rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); + if !err.is_null() { + return Err(error_message(err)); + } + Ok(()) + } + } + fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); @@ -420,7 +443,28 @@ impl Writable for RocksDB { if !err.is_null() { return Err(error_message(err)); } - return Ok(()) + Ok(()) + } + } + + fn merge_cf(&self, cf_name: &str, key: &[u8], value: &[u8]) -> Result<(), String> { + let cf = self.cfs.get(cf_name); + if cf.is_none() { + return Err(format!("Invalid column family: {}", cf_name).to_string()); + } + unsafe { + let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; + rocksdb_ffi::rocksdb_merge_cf(self.inner, writeopts.clone(), + *cf.unwrap(), key.as_ptr(), + key.len() as size_t, value.as_ptr(), + value.len() as size_t, err_ptr); + rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); + if !err.is_null() { + return Err(error_message(err)); + } + Ok(()) } } @@ -435,7 +479,27 @@ impl Writable for RocksDB { if !err.is_null() { return Err(error_message(err)); } - return Ok(()) + Ok(()) + } + } + + fn delete_cf(&self, cf_name: &str, key: &[u8]) -> Result<(), String> { + let cf = self.cfs.get(cf_name); + if cf.is_none() { + return Err(format!("Invalid column family: {}", cf_name).to_string()); + } + unsafe { + let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); + let mut err: *const i8 = 0 as *const i8; + let err_ptr: *mut *const i8 = &mut err; + rocksdb_ffi::rocksdb_delete_cf(self.inner, writeopts.clone(), + *cf.unwrap(), key.as_ptr(), + key.len() as size_t, err_ptr); + rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); + if !err.is_null() { + return Err(error_message(err)); + } + Ok(()) } } } @@ -475,26 +539,38 @@ impl Writable for WriteBatch { rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(), key.len() as size_t, value.as_ptr(), value.len() as size_t); - return Ok(()) + Ok(()) } } + fn put_cf(&self, cf_name: &str, key: &[u8], value: &[u8]) -> Result<(), String> { + Err("not implemented for write batches yet".to_string()) + } + fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { unsafe { rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(), key.len() as size_t, value.as_ptr(), value.len() as size_t); - return Ok(()) + Ok(()) } } + fn merge_cf(&self, cf: &str, key: &[u8], value: &[u8]) -> Result<(), String> { + Err("not implemented for write batches yet".to_string()) + } + fn delete(&self, key: &[u8]) -> Result<(), String> { unsafe { rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(), key.len() as size_t); - return Ok(()) + Ok(()) } } + + fn delete_cf(&self, cf: &str, key: &[u8]) -> Result<(), String> { + Err("not implemented for write batches yet".to_string()) + } } impl Drop for ReadOptions {