Add column family implementations for put/merge/delete

master
Tyler Neely 9 years ago
parent fe0d4c5aeb
commit 36b4ad62cf
  1. 18
      src/ffi.rs
  2. 90
      src/rocksdb.rs

@ -190,6 +190,12 @@ extern {
k: *const u8, kLen: size_t, k: *const u8, kLen: size_t,
v: *const u8, vLen: size_t, v: *const u8, vLen: size_t,
err: *mut *const i8); err: *mut *const i8);
pub fn rocksdb_put_cf(db: RocksDBInstance,
writeopts: RocksDBWriteOptions,
cf: RocksDBCFHandle,
k: *const u8, kLen: size_t,
v: *const u8, vLen: size_t,
err: *mut *const i8);
pub fn rocksdb_readoptions_create() -> RocksDBReadOptions; pub fn rocksdb_readoptions_create() -> RocksDBReadOptions;
pub fn rocksdb_readoptions_destroy(readopts: RocksDBReadOptions); pub fn rocksdb_readoptions_destroy(readopts: RocksDBReadOptions);
pub fn rocksdb_readoptions_set_verify_checksums( pub fn rocksdb_readoptions_set_verify_checksums(
@ -241,6 +247,12 @@ extern {
k: *const u8, kLen: size_t, k: *const u8, kLen: size_t,
err: *mut *const i8 err: *mut *const i8
) -> *mut c_void; ) -> *mut c_void;
pub fn rocksdb_delete_cf(db: RocksDBInstance,
writeopts: RocksDBWriteOptions,
cf: RocksDBCFHandle,
k: *const u8, kLen: size_t,
err: *mut *const i8
) -> *mut c_void;
pub fn rocksdb_close(db: RocksDBInstance); pub fn rocksdb_close(db: RocksDBInstance);
pub fn rocksdb_destroy_db(options: RocksDBOptions, pub fn rocksdb_destroy_db(options: RocksDBOptions,
path: *const i8, err: *mut *const i8); path: *const i8, err: *mut *const i8);
@ -252,6 +264,12 @@ extern {
k: *const u8, kLen: size_t, k: *const u8, kLen: size_t,
v: *const u8, vLen: size_t, v: *const u8, vLen: size_t,
err: *mut *const i8); err: *mut *const i8);
pub fn rocksdb_merge_cf(db: RocksDBInstance,
writeopts: RocksDBWriteOptions,
cf: RocksDBCFHandle,
k: *const u8, kLen: size_t,
v: *const u8, vLen: size_t,
err: *mut *const i8);
pub fn rocksdb_mergeoperator_create( pub fn rocksdb_mergeoperator_create(
state: *mut c_void, state: *mut c_void,
destroy: extern fn(*mut c_void) -> (), destroy: extern fn(*mut c_void) -> (),

@ -163,8 +163,11 @@ impl <'a> Drop for Snapshot<'a> {
// This is for the RocksDB and write batches to share the same API // This is for the RocksDB and write batches to share the same API
pub trait Writable { pub trait Writable {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String>; fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn put_cf(&self, cf: &str, key: &[u8], value: &[u8]) -> Result<(), String>;
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String>; fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn merge_cf(&self, cf: &str, key: &[u8], value: &[u8]) -> Result<(), String>;
fn delete(&self, key: &[u8]) -> Result<(), String>; fn delete(&self, key: &[u8]) -> Result<(), String>;
fn delete_cf(&self, cf: &str, key: &[u8]) -> Result<(), String>;
} }
fn error_message(ptr: *const i8) -> String { fn error_message(ptr: *const i8) -> String {
@ -392,7 +395,7 @@ impl RocksDB {
} }
impl Writable for RocksDB { impl Writable for RocksDB {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let mut err: *const i8 = 0 as *const i8; let mut err: *const i8 = 0 as *const i8;
@ -404,10 +407,30 @@ impl Writable for RocksDB {
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
} }
return Ok(()) Ok(())
} }
} }
fn put_cf(&self, cf_name: &str, key: &[u8], value: &[u8]) -> Result<(), String> {
let cf = self.cfs.get(cf_name);
if cf.is_none() {
return Err(format!("Invalid column family: {}", cf_name).to_string());
}
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_put_cf(self.inner, writeopts.clone(), *cf.unwrap(),
key.as_ptr(), key.len() as size_t, value.as_ptr(),
value.len() as size_t, err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() {
return Err(error_message(err));
}
Ok(())
}
}
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
@ -420,7 +443,28 @@ impl Writable for RocksDB {
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
} }
return Ok(()) Ok(())
}
}
fn merge_cf(&self, cf_name: &str, key: &[u8], value: &[u8]) -> Result<(), String> {
let cf = self.cfs.get(cf_name);
if cf.is_none() {
return Err(format!("Invalid column family: {}", cf_name).to_string());
}
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_merge_cf(self.inner, writeopts.clone(),
*cf.unwrap(), key.as_ptr(),
key.len() as size_t, value.as_ptr(),
value.len() as size_t, err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() {
return Err(error_message(err));
}
Ok(())
} }
} }
@ -435,7 +479,27 @@ impl Writable for RocksDB {
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
} }
return Ok(()) Ok(())
}
}
fn delete_cf(&self, cf_name: &str, key: &[u8]) -> Result<(), String> {
let cf = self.cfs.get(cf_name);
if cf.is_none() {
return Err(format!("Invalid column family: {}", cf_name).to_string());
}
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_delete_cf(self.inner, writeopts.clone(),
*cf.unwrap(), key.as_ptr(),
key.len() as size_t, err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() {
return Err(error_message(err));
}
Ok(())
} }
} }
} }
@ -475,26 +539,38 @@ impl Writable for WriteBatch {
rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(), rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(),
key.len() as size_t, value.as_ptr(), key.len() as size_t, value.as_ptr(),
value.len() as size_t); value.len() as size_t);
return Ok(()) Ok(())
} }
} }
fn put_cf(&self, cf_name: &str, key: &[u8], value: &[u8]) -> Result<(), String> {
Err("not implemented for write batches yet".to_string())
}
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(), rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(),
key.len() as size_t, value.as_ptr(), key.len() as size_t, value.as_ptr(),
value.len() as size_t); value.len() as size_t);
return Ok(()) Ok(())
} }
} }
fn merge_cf(&self, cf: &str, key: &[u8], value: &[u8]) -> Result<(), String> {
Err("not implemented for write batches yet".to_string())
}
fn delete(&self, key: &[u8]) -> Result<(), String> { fn delete(&self, key: &[u8]) -> Result<(), String> {
unsafe { unsafe {
rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(), rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(),
key.len() as size_t); key.len() as size_t);
return Ok(()) Ok(())
} }
} }
fn delete_cf(&self, cf: &str, key: &[u8]) -> Result<(), String> {
Err("not implemented for write batches yet".to_string())
}
} }
impl Drop for ReadOptions { impl Drop for ReadOptions {

Loading…
Cancel
Save