master
Tyler Neely 9 years ago
parent ad201263f8
commit edf9421ff4
  1. 4
      .gitignore
  2. 3
      rustfmt.toml
  3. 46
      src/comparator.rs
  4. 338
      src/ffi.rs
  5. 36
      src/lib.rs
  6. 160
      src/main.rs
  7. 68
      src/merge_operator.rs
  8. 422
      src/rocksdb.rs
  9. 138
      src/rocksdb_options.rs
  10. 55
      test/test_column_family.rs
  11. 32
      test/test_iterator.rs
  12. 8
      test/test_multithreaded.rs

4
.gitignore vendored

@ -3,3 +3,7 @@
target target
Cargo.lock Cargo.lock
*.orig *.orig
*.bk
_rust_rocksdb*
*rlib
tags

@ -0,0 +1,3 @@
reorder_imports = true
max_width = 80
ideal_width = 80

@ -1,18 +1,18 @@
/* //
Copyright 2014 Tyler Neely // Copyright 2014 Tyler Neely
//
Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
You may obtain a copy of the License at // You may obtain a copy of the License at
//
http://www.apache.org/licenses/LICENSE-2.0 // http://www.apache.org/licenses/LICENSE-2.0
//
Unless required by applicable law or agreed to in writing, software // Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, // distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
limitations under the License. // limitations under the License.
*/ //
extern crate libc; extern crate libc;
use self::libc::{c_char, c_int, c_void, size_t}; use self::libc::{c_char, c_int, c_void, size_t};
use std::ffi::CString; use std::ffi::CString;
@ -30,7 +30,7 @@ pub struct ComparatorCallback {
pub extern "C" fn destructor_callback(raw_cb: *mut c_void) { pub extern "C" fn destructor_callback(raw_cb: *mut c_void) {
// turn this back into a local variable so rust will reclaim it // turn this back into a local variable so rust will reclaim it
let _: Box<ComparatorCallback> = unsafe {mem::transmute(raw_cb)}; let _: Box<ComparatorCallback> = unsafe { mem::transmute(raw_cb) };
} }
pub extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char { pub extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char {
@ -51,8 +51,10 @@ pub extern "C" fn compare_callback(raw_cb: *mut c_void,
unsafe { unsafe {
let cb: &mut ComparatorCallback = let cb: &mut ComparatorCallback =
&mut *(raw_cb as *mut ComparatorCallback); &mut *(raw_cb as *mut ComparatorCallback);
let a: &[u8] = slice::from_raw_parts(a_raw as *const u8, a_len as usize); let a: &[u8] = slice::from_raw_parts(a_raw as *const u8,
let b: &[u8] = slice::from_raw_parts(b_raw as *const u8, b_len as usize); a_len as usize);
let b: &[u8] = slice::from_raw_parts(b_raw as *const u8,
b_len as usize);
(cb.f)(a, b) (cb.f)(a, b)
} }
} }
@ -67,9 +69,9 @@ fn test_reverse_compare(a: &[u8], b: &[u8]) -> c_int {
} }
} }
//#[allow(dead_code)] // #[allow(dead_code)]
//#[test] // #[test]
//fn compare_works() { // fn compare_works() {
// let path = "_rust_rocksdb_comparetest"; // let path = "_rust_rocksdb_comparetest";
// let mut opts = Options::new(); // let mut opts = Options::new();
// opts.create_if_missing(true); // opts.create_if_missing(true);
@ -79,4 +81,4 @@ fn test_reverse_compare(a: &[u8], b: &[u8]) -> c_int {
// // TODO add interesting test // // TODO add interesting test
// } // }
// assert!(DB::destroy(&opts, path).is_ok()); // assert!(DB::destroy(&opts, path).is_ok());
//} // }

@ -1,21 +1,21 @@
/* //
Copyright 2014 Tyler Neely // Copyright 2014 Tyler Neely
//
Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
You may obtain a copy of the License at // You may obtain a copy of the License at
//
http://www.apache.org/licenses/LICENSE-2.0 // http://www.apache.org/licenses/LICENSE-2.0
//
Unless required by applicable law or agreed to in writing, software // Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, // distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
limitations under the License. // limitations under the License.
*/ //
extern crate libc; extern crate libc;
use self::libc::{c_char, c_int, c_void, size_t}; use self::libc::{c_char, c_int, c_void, size_t};
use std::ffi::{CString, CStr}; use std::ffi::{CStr, CString};
use std::str::from_utf8; use std::str::from_utf8;
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
@ -59,15 +59,11 @@ pub struct DBWriteBatch(pub *const c_void);
pub struct DBComparator(pub *const c_void); pub struct DBComparator(pub *const c_void);
pub fn new_bloom_filter(bits: c_int) -> DBFilterPolicy { pub fn new_bloom_filter(bits: c_int) -> DBFilterPolicy {
unsafe { unsafe { rocksdb_filterpolicy_create_bloom(bits) }
rocksdb_filterpolicy_create_bloom(bits)
}
} }
pub fn new_cache(capacity: size_t) -> DBCache { pub fn new_cache(capacity: size_t) -> DBCache {
unsafe { unsafe { rocksdb_cache_create_lru(capacity) }
rocksdb_cache_create_lru(capacity)
}
} }
#[repr(C)] #[repr(C)]
@ -96,13 +92,14 @@ pub enum DBUniversalCompactionStyle {
pub fn error_message(ptr: *const i8) -> String { pub fn error_message(ptr: *const i8) -> String {
let c_str = unsafe { CStr::from_ptr(ptr) }; let c_str = unsafe { CStr::from_ptr(ptr) };
let s = from_utf8(c_str.to_bytes()).unwrap().to_owned(); let s = from_utf8(c_str.to_bytes()).unwrap().to_owned();
unsafe{ unsafe {
libc::free(ptr as *mut libc::c_void); libc::free(ptr as *mut libc::c_void);
} }
s s
} }
//TODO audit the use of boolean arguments, b/c I think they need to be u8 instead... // TODO audit the use of boolean arguments, b/c I think they need to be u8
// instead...
#[link(name = "rocksdb")] #[link(name = "rocksdb")]
extern { extern {
pub fn rocksdb_options_create() -> DBOptions; pub fn rocksdb_options_create() -> DBOptions;
@ -135,151 +132,155 @@ extern {
pub fn rocksdb_options_set_block_based_table_factory( pub fn rocksdb_options_set_block_based_table_factory(
options: DBOptions, options: DBOptions,
block_options: DBBlockBasedTableOptions); block_options: DBBlockBasedTableOptions);
pub fn rocksdb_options_increase_parallelism( pub fn rocksdb_options_increase_parallelism(options: DBOptions,
options: DBOptions, threads: c_int); threads: c_int);
pub fn rocksdb_options_optimize_level_style_compaction( pub fn rocksdb_options_optimize_level_style_compaction(
options: DBOptions, memtable_memory_budget: c_int); options: DBOptions, memtable_memory_budget: c_int);
pub fn rocksdb_options_set_create_if_missing( pub fn rocksdb_options_set_create_if_missing(options: DBOptions, v: bool);
options: DBOptions, v: bool); pub fn rocksdb_options_set_max_open_files(options: DBOptions,
pub fn rocksdb_options_set_max_open_files( files: c_int);
options: DBOptions, files: c_int); pub fn rocksdb_options_set_use_fsync(options: DBOptions, v: c_int);
pub fn rocksdb_options_set_use_fsync( pub fn rocksdb_options_set_bytes_per_sync(options: DBOptions, bytes: u64);
options: DBOptions, v: c_int); pub fn rocksdb_options_set_disable_data_sync(options: DBOptions,
pub fn rocksdb_options_set_bytes_per_sync( v: c_int);
options: DBOptions, bytes: u64); pub fn rocksdb_options_optimize_for_point_lookup(options: DBOptions,
pub fn rocksdb_options_set_disable_data_sync( block_cache_size_mb: u64);
options: DBOptions, v: c_int); pub fn rocksdb_options_set_table_cache_numshardbits(options: DBOptions,
pub fn rocksdb_options_optimize_for_point_lookup( bits: c_int);
options: DBOptions, block_cache_size_mb: u64); pub fn rocksdb_options_set_max_write_buffer_number(options: DBOptions,
pub fn rocksdb_options_set_table_cache_numshardbits( bufno: c_int);
options: DBOptions, bits: c_int);
pub fn rocksdb_options_set_max_write_buffer_number(
options: DBOptions, bufno: c_int);
pub fn rocksdb_options_set_min_write_buffer_number_to_merge( pub fn rocksdb_options_set_min_write_buffer_number_to_merge(
options: DBOptions, bufno: c_int); options: DBOptions, bufno: c_int);
pub fn rocksdb_options_set_level0_file_num_compaction_trigger( pub fn rocksdb_options_set_level0_file_num_compaction_trigger(
options: DBOptions, no: c_int); options: DBOptions, no: c_int);
pub fn rocksdb_options_set_level0_slowdown_writes_trigger( pub fn rocksdb_options_set_level0_slowdown_writes_trigger(
options: DBOptions, no: c_int); options: DBOptions, no: c_int);
pub fn rocksdb_options_set_level0_stop_writes_trigger( pub fn rocksdb_options_set_level0_stop_writes_trigger(options: DBOptions,
options: DBOptions, no: c_int); no: c_int);
pub fn rocksdb_options_set_write_buffer_size( pub fn rocksdb_options_set_write_buffer_size(options: DBOptions,
options: DBOptions, bytes: u64); bytes: u64);
pub fn rocksdb_options_set_target_file_size_base( pub fn rocksdb_options_set_target_file_size_base(options: DBOptions,
options: DBOptions, bytes: u64); bytes: u64);
pub fn rocksdb_options_set_target_file_size_multiplier( pub fn rocksdb_options_set_target_file_size_multiplier(options: DBOptions,
options: DBOptions, mul: c_int); mul: c_int);
pub fn rocksdb_options_set_max_log_file_size( pub fn rocksdb_options_set_max_log_file_size(options: DBOptions,
options: DBOptions, bytes: u64); bytes: u64);
pub fn rocksdb_options_set_max_manifest_file_size( pub fn rocksdb_options_set_max_manifest_file_size(options: DBOptions,
options: DBOptions, bytes: u64); bytes: u64);
pub fn rocksdb_options_set_hash_skip_list_rep( pub fn rocksdb_options_set_hash_skip_list_rep(options: DBOptions,
options: DBOptions, bytes: u64, a1: i32, a2: i32); bytes: u64,
pub fn rocksdb_options_set_compaction_style( a1: i32,
options: DBOptions, cs: DBCompactionStyle); a2: i32);
pub fn rocksdb_options_set_compression( pub fn rocksdb_options_set_compaction_style(options: DBOptions,
options: DBOptions, compression_style_no: c_int); cs: DBCompactionStyle);
pub fn rocksdb_options_set_compression(options: DBOptions,
compression_style_no: c_int);
pub fn rocksdb_options_set_max_background_compactions( pub fn rocksdb_options_set_max_background_compactions(
options: DBOptions, max_bg_compactions: c_int); options: DBOptions, max_bg_compactions: c_int);
pub fn rocksdb_options_set_max_background_flushes( pub fn rocksdb_options_set_max_background_flushes(options: DBOptions,
options: DBOptions, max_bg_flushes: c_int); max_bg_flushes: c_int);
pub fn rocksdb_options_set_filter_deletes( pub fn rocksdb_options_set_filter_deletes(options: DBOptions, v: bool);
options: DBOptions, v: bool); pub fn rocksdb_options_set_disable_auto_compactions(options: DBOptions,
pub fn rocksdb_options_set_disable_auto_compactions( v: c_int);
options: DBOptions, v: c_int); pub fn rocksdb_filterpolicy_create_bloom(bits_per_key: c_int)
pub fn rocksdb_filterpolicy_create_bloom( -> DBFilterPolicy;
bits_per_key: c_int) -> DBFilterPolicy;
pub fn rocksdb_open(options: DBOptions, pub fn rocksdb_open(options: DBOptions,
path: *const i8, path: *const i8,
err: *mut *const i8 err: *mut *const i8)
) -> DBInstance; -> DBInstance;
pub fn rocksdb_writeoptions_create() -> DBWriteOptions; pub fn rocksdb_writeoptions_create() -> DBWriteOptions;
pub fn rocksdb_writeoptions_destroy(writeopts: DBWriteOptions); pub fn rocksdb_writeoptions_destroy(writeopts: DBWriteOptions);
pub fn rocksdb_put(db: DBInstance, pub fn rocksdb_put(db: DBInstance,
writeopts: DBWriteOptions, writeopts: DBWriteOptions,
k: *const u8, kLen: size_t, k: *const u8,
v: *const u8, vLen: size_t, kLen: size_t,
v: *const u8,
vLen: size_t,
err: *mut *const i8); err: *mut *const i8);
pub fn rocksdb_put_cf(db: DBInstance, pub fn rocksdb_put_cf(db: DBInstance,
writeopts: DBWriteOptions, writeopts: DBWriteOptions,
cf: DBCFHandle, cf: DBCFHandle,
k: *const u8, kLen: size_t, k: *const u8,
v: *const u8, vLen: size_t, kLen: size_t,
err: *mut *const i8); v: *const u8,
vLen: size_t,
err: *mut *const i8);
pub fn rocksdb_readoptions_create() -> DBReadOptions; pub fn rocksdb_readoptions_create() -> DBReadOptions;
pub fn rocksdb_readoptions_destroy(readopts: DBReadOptions); pub fn rocksdb_readoptions_destroy(readopts: DBReadOptions);
pub fn rocksdb_readoptions_set_verify_checksums( pub fn rocksdb_readoptions_set_verify_checksums(readopts: DBReadOptions,
readopts: DBReadOptions, v: bool);
v: bool); pub fn rocksdb_readoptions_set_fill_cache(readopts: DBReadOptions,
pub fn rocksdb_readoptions_set_fill_cache( v: bool);
readopts: DBReadOptions, pub fn rocksdb_readoptions_set_snapshot(readopts: DBReadOptions,
v: bool); snapshot: DBSnapshot); //TODO how do I make this a const ref?
pub fn rocksdb_readoptions_set_snapshot( pub fn rocksdb_readoptions_set_iterate_upper_bound(readopts: DBReadOptions,
readopts: DBReadOptions, k: *const u8,
snapshot: DBSnapshot); //TODO how do I make this a const ref? kLen: size_t);
pub fn rocksdb_readoptions_set_iterate_upper_bound( pub fn rocksdb_readoptions_set_read_tier(readopts: DBReadOptions,
readopts: DBReadOptions, tier: c_int);
k: *const u8, pub fn rocksdb_readoptions_set_tailing(readopts: DBReadOptions, v: bool);
kLen: size_t);
pub fn rocksdb_readoptions_set_read_tier(
readopts: DBReadOptions,
tier: c_int);
pub fn rocksdb_readoptions_set_tailing(
readopts: DBReadOptions,
v: bool);
pub fn rocksdb_get(db: DBInstance, pub fn rocksdb_get(db: DBInstance,
readopts: DBReadOptions, readopts: DBReadOptions,
k: *const u8, kLen: size_t, k: *const u8,
kLen: size_t,
valLen: *const size_t, valLen: *const size_t,
err: *mut *const i8 err: *mut *const i8)
) -> *mut c_void; -> *mut c_void;
pub fn rocksdb_get_cf(db: DBInstance, pub fn rocksdb_get_cf(db: DBInstance,
readopts: DBReadOptions, readopts: DBReadOptions,
cf_handle: DBCFHandle, cf_handle: DBCFHandle,
k: *const u8, kLen: size_t, k: *const u8,
valLen: *const size_t, kLen: size_t,
err: *mut *const i8 valLen: *const size_t,
) -> *mut c_void; err: *mut *const i8)
-> *mut c_void;
pub fn rocksdb_create_iterator(db: DBInstance, pub fn rocksdb_create_iterator(db: DBInstance,
readopts: DBReadOptions readopts: DBReadOptions)
) -> DBIterator; -> DBIterator;
pub fn rocksdb_create_iterator_cf(db: DBInstance, pub fn rocksdb_create_iterator_cf(db: DBInstance,
readopts: DBReadOptions, readopts: DBReadOptions,
cf_handle: DBCFHandle cf_handle: DBCFHandle)
) -> DBIterator; -> DBIterator;
pub fn rocksdb_create_snapshot(db: DBInstance) -> DBSnapshot; pub fn rocksdb_create_snapshot(db: DBInstance) -> DBSnapshot;
pub fn rocksdb_release_snapshot(db: DBInstance, pub fn rocksdb_release_snapshot(db: DBInstance, snapshot: DBSnapshot);
snapshot: DBSnapshot);
pub fn rocksdb_delete(db: DBInstance, pub fn rocksdb_delete(db: DBInstance,
writeopts: DBWriteOptions, writeopts: DBWriteOptions,
k: *const u8, kLen: size_t, k: *const u8,
err: *mut *const i8 kLen: size_t,
) -> *mut c_void; err: *mut *const i8)
-> *mut c_void;
pub fn rocksdb_delete_cf(db: DBInstance, pub fn rocksdb_delete_cf(db: DBInstance,
writeopts: DBWriteOptions, writeopts: DBWriteOptions,
cf: DBCFHandle, cf: DBCFHandle,
k: *const u8, kLen: size_t, k: *const u8,
err: *mut *const i8 kLen: size_t,
) -> *mut c_void; err: *mut *const i8)
-> *mut c_void;
pub fn rocksdb_close(db: DBInstance); pub fn rocksdb_close(db: DBInstance);
pub fn rocksdb_destroy_db(options: DBOptions, pub fn rocksdb_destroy_db(options: DBOptions,
path: *const i8, err: *mut *const i8); path: *const i8,
err: *mut *const i8);
pub fn rocksdb_repair_db(options: DBOptions, pub fn rocksdb_repair_db(options: DBOptions,
path: *const i8, err: *mut *const i8); path: *const i8,
err: *mut *const i8);
// Merge // Merge
pub fn rocksdb_merge(db: DBInstance, pub fn rocksdb_merge(db: DBInstance,
writeopts: DBWriteOptions, writeopts: DBWriteOptions,
k: *const u8, kLen: size_t, k: *const u8,
v: *const u8, vLen: size_t, kLen: size_t,
v: *const u8,
vLen: size_t,
err: *mut *const i8); err: *mut *const i8);
pub fn rocksdb_merge_cf(db: DBInstance, pub fn rocksdb_merge_cf(db: DBInstance,
writeopts: DBWriteOptions, writeopts: DBWriteOptions,
cf: DBCFHandle, cf: DBCFHandle,
k: *const u8, kLen: size_t, k: *const u8,
v: *const u8, vLen: size_t, kLen: size_t,
err: *mut *const i8); v: *const u8,
vLen: size_t,
err: *mut *const i8);
pub fn rocksdb_mergeoperator_create( pub fn rocksdb_mergeoperator_create(
state: *mut c_void, state: *mut c_void,
destroy: extern fn(*mut c_void) -> (), destroy: extern fn(*mut c_void) -> (),
@ -313,50 +314,53 @@ extern {
pub fn rocksdb_iter_valid(iter: DBIterator) -> bool; pub fn rocksdb_iter_valid(iter: DBIterator) -> bool;
pub fn rocksdb_iter_seek_to_first(iter: DBIterator); pub fn rocksdb_iter_seek_to_first(iter: DBIterator);
pub fn rocksdb_iter_seek_to_last(iter: DBIterator); pub fn rocksdb_iter_seek_to_last(iter: DBIterator);
pub fn rocksdb_iter_seek(iter: DBIterator, pub fn rocksdb_iter_seek(iter: DBIterator, key: *const u8, klen: size_t);
key: *const u8, klen: size_t);
pub fn rocksdb_iter_next(iter: DBIterator); pub fn rocksdb_iter_next(iter: DBIterator);
pub fn rocksdb_iter_prev(iter: DBIterator); pub fn rocksdb_iter_prev(iter: DBIterator);
pub fn rocksdb_iter_key(iter: DBIterator, pub fn rocksdb_iter_key(iter: DBIterator, klen: *mut size_t) -> *mut u8;
klen: *mut size_t) -> *mut u8; pub fn rocksdb_iter_value(iter: DBIterator, vlen: *mut size_t) -> *mut u8;
pub fn rocksdb_iter_value(iter: DBIterator, pub fn rocksdb_iter_get_error(iter: DBIterator, err: *mut *const u8);
vlen: *mut size_t) -> *mut u8;
pub fn rocksdb_iter_get_error(iter: DBIterator,
err: *mut *const u8);
// Write batch // Write batch
pub fn rocksdb_write(db: DBInstance, pub fn rocksdb_write(db: DBInstance,
writeopts: DBWriteOptions, writeopts: DBWriteOptions,
batch : DBWriteBatch, batch: DBWriteBatch,
err: *mut *const i8); err: *mut *const i8);
pub fn rocksdb_writebatch_create() -> DBWriteBatch; pub fn rocksdb_writebatch_create() -> DBWriteBatch;
pub fn rocksdb_writebatch_create_from(rep: *const u8, pub fn rocksdb_writebatch_create_from(rep: *const u8,
size: size_t) -> DBWriteBatch; size: size_t)
-> DBWriteBatch;
pub fn rocksdb_writebatch_destroy(batch: DBWriteBatch); pub fn rocksdb_writebatch_destroy(batch: DBWriteBatch);
pub fn rocksdb_writebatch_clear(batch: DBWriteBatch); pub fn rocksdb_writebatch_clear(batch: DBWriteBatch);
pub fn rocksdb_writebatch_count(batch: DBWriteBatch) -> c_int; pub fn rocksdb_writebatch_count(batch: DBWriteBatch) -> c_int;
pub fn rocksdb_writebatch_put(batch: DBWriteBatch, pub fn rocksdb_writebatch_put(batch: DBWriteBatch,
key: *const u8, klen: size_t, key: *const u8,
val: *const u8, vlen: size_t); klen: size_t,
val: *const u8,
vlen: size_t);
pub fn rocksdb_writebatch_put_cf(batch: DBWriteBatch, pub fn rocksdb_writebatch_put_cf(batch: DBWriteBatch,
cf: DBCFHandle, cf: DBCFHandle,
key: *const u8, klen: size_t, key: *const u8,
val: *const u8, vlen: size_t); klen: size_t,
pub fn rocksdb_writebatch_merge( val: *const u8,
batch: DBWriteBatch, vlen: size_t);
key: *const u8, klen: size_t, pub fn rocksdb_writebatch_merge(batch: DBWriteBatch,
val: *const u8, vlen: size_t); key: *const u8,
pub fn rocksdb_writebatch_merge_cf( klen: size_t,
batch: DBWriteBatch, val: *const u8,
cf: DBCFHandle, vlen: size_t);
key: *const u8, klen: size_t, pub fn rocksdb_writebatch_merge_cf(batch: DBWriteBatch,
val: *const u8, vlen: size_t); cf: DBCFHandle,
pub fn rocksdb_writebatch_delete( key: *const u8,
batch: DBWriteBatch, klen: size_t,
key: *const u8, klen: size_t); val: *const u8,
pub fn rocksdb_writebatch_delete_cf( vlen: size_t);
batch: DBWriteBatch, pub fn rocksdb_writebatch_delete(batch: DBWriteBatch,
cf: DBCFHandle, key: *const u8,
key: *const u8, klen: size_t); klen: size_t);
pub fn rocksdb_writebatch_delete_cf(batch: DBWriteBatch,
cf: DBCFHandle,
key: *const u8,
klen: size_t);
pub fn rocksdb_writebatch_iterate( pub fn rocksdb_writebatch_iterate(
batch: DBWriteBatch, batch: DBWriteBatch,
state: *mut c_void, state: *mut c_void,
@ -366,7 +370,8 @@ extern {
deleted_fn: extern fn(state: *mut c_void, deleted_fn: extern fn(state: *mut c_void,
k: *const u8, klen: size_t)); k: *const u8, klen: size_t));
pub fn rocksdb_writebatch_data(batch: DBWriteBatch, pub fn rocksdb_writebatch_data(batch: DBWriteBatch,
size: *mut size_t) -> *const u8; size: *mut size_t)
-> *const u8;
// Comparator // Comparator
pub fn rocksdb_options_set_comparator(options: DBOptions, pub fn rocksdb_options_set_comparator(options: DBOptions,
@ -394,8 +399,8 @@ extern {
pub fn rocksdb_create_column_family(db: DBInstance, pub fn rocksdb_create_column_family(db: DBInstance,
column_family_options: DBOptions, column_family_options: DBOptions,
column_family_name: *const i8, column_family_name: *const i8,
err: *mut *const i8 err: *mut *const i8)
) -> DBCFHandle; -> DBCFHandle;
pub fn rocksdb_drop_column_family(db: DBInstance, pub fn rocksdb_drop_column_family(db: DBInstance,
column_family_handle: DBCFHandle, column_family_handle: DBCFHandle,
err: *mut *const i8); err: *mut *const i8);
@ -430,7 +435,13 @@ fn internal() {
let key = b"name\x00"; let key = b"name\x00";
let val = b"spacejam\x00"; let val = b"spacejam\x00";
rocksdb_put(db, writeopts.clone(), key.as_ptr(), 4, val.as_ptr(), 8, err_ptr); rocksdb_put(db,
writeopts.clone(),
key.as_ptr(),
4,
val.as_ptr(),
8,
err_ptr);
rocksdb_writeoptions_destroy(writeopts); rocksdb_writeoptions_destroy(writeopts);
assert!(err.is_null()); assert!(err.is_null());
@ -439,7 +450,12 @@ fn internal() {
let val_len: size_t = 0; let val_len: size_t = 0;
let val_len_ptr = &val_len as *const size_t; let val_len_ptr = &val_len as *const size_t;
rocksdb_get(db, readopts.clone(), key.as_ptr(), 4, val_len_ptr, err_ptr); rocksdb_get(db,
readopts.clone(),
key.as_ptr(),
4,
val_len_ptr,
err_ptr);
rocksdb_readoptions_destroy(readopts); rocksdb_readoptions_destroy(readopts);
assert!(err.is_null()); assert!(err.is_null());
rocksdb_close(db); rocksdb_close(db);

@ -1,22 +1,22 @@
/* //
Copyright 2014 Tyler Neely // Copyright 2014 Tyler Neely
//
Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
You may obtain a copy of the License at // You may obtain a copy of the License at
//
http://www.apache.org/licenses/LICENSE-2.0 // http://www.apache.org/licenses/LICENSE-2.0
//
Unless required by applicable law or agreed to in writing, software // Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, // distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
limitations under the License. // limitations under the License.
*/ //
pub use ffi as rocksdb_ffi; pub use ffi as rocksdb_ffi;
pub use ffi::{new_bloom_filter, DBCompactionStyle, DBComparator}; pub use ffi::{DBCompactionStyle, DBComparator, new_bloom_filter};
pub use rocksdb::{DB, DBVector, WriteBatch, Writable, Direction}; pub use rocksdb::{DB, DBVector, Direction, Writable, WriteBatch};
pub use rocksdb_options::{Options, BlockBasedOptions}; pub use rocksdb_options::{BlockBasedOptions, Options};
pub use merge_operator::MergeOperands; pub use merge_operator::MergeOperands;
pub mod rocksdb; pub mod rocksdb;
pub mod ffi; pub mod ffi;

@ -1,22 +1,22 @@
/* //
Copyright 2014 Tyler Neely // Copyright 2014 Tyler Neely
//
Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
You may obtain a copy of the License at // You may obtain a copy of the License at
//
http://www.apache.org/licenses/LICENSE-2.0 // http://www.apache.org/licenses/LICENSE-2.0
//
Unless required by applicable law or agreed to in writing, software // Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, // distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
limitations under the License. // limitations under the License.
*/ //
extern crate rocksdb; extern crate rocksdb;
use rocksdb::{Options, DB, MergeOperands, Writable, }; use rocksdb::{DB, MergeOperands, Options, Writable};
//fn snapshot_test() { // fn snapshot_test() {
// let path = "_rust_rocksdb_iteratortest"; // let path = "_rust_rocksdb_iteratortest";
// { // {
// let mut db = DB::open_default(path).unwrap(); // let mut db = DB::open_default(path).unwrap();
@ -30,18 +30,21 @@ use rocksdb::{Options, DB, MergeOperands, Writable, };
// let mut view1 = snap.iterator(); // let mut view1 = snap.iterator();
// println!("See the output of the first iter"); // println!("See the output of the first iter");
// for (k,v) in view1.from_start() { // for (k,v) in view1.from_start() {
// println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); // println!("Hello {}: {}", std::str::from_utf8(k).unwrap(),
// std::str::from_utf8(v).unwrap());
// }; // };
// for (k,v) in view1.from_start() { // for (k,v) in view1.from_start() {
// println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); // println!("Hello {}: {}", std::str::from_utf8(k).unwrap(),
// std::str::from_utf8(v).unwrap());
// }; // };
// for (k,v) in view1.from_end() { // for (k,v) in view1.from_end() {
// println!("Hello {}: {}", std::str::from_utf8(k).unwrap(), std::str::from_utf8(v).unwrap()); // println!("Hello {}: {}", std::str::from_utf8(k).unwrap(),
// std::str::from_utf8(v).unwrap());
// }; // };
// } // }
// let opts = Options::new(); // let opts = Options::new();
// assert!(DB::destroy(&opts, path).is_ok()); // assert!(DB::destroy(&opts, path).is_ok());
//} // }
#[cfg(not(feature = "valgrind"))] #[cfg(not(feature = "valgrind"))]
fn main() { fn main() {
@ -51,12 +54,10 @@ fn main() {
match db.get(b"my key") { match db.get(b"my key") {
Ok(Some(value)) => { Ok(Some(value)) => {
match value.to_utf8() { match value.to_utf8() {
Some(v) => Some(v) => println!("retrieved utf8 value: {}", v),
println!("retrieved utf8 value: {}", v), None => println!("did not read valid utf-8 out of the db"),
None =>
println!("did not read valid utf-8 out of the db"),
} }
}, }
Err(e) => println!("error retrieving value: {}", e), Err(e) => println!("error retrieving value: {}", e),
_ => panic!("value not present!"), _ => panic!("value not present!"),
} }
@ -66,8 +67,10 @@ fn main() {
custom_merge(); custom_merge();
} }
fn concat_merge(_: &[u8], existing_val: Option<&[u8]>, fn concat_merge(_: &[u8],
operands: &mut MergeOperands) -> Vec<u8> { existing_val: Option<&[u8]>,
operands: &mut MergeOperands)
-> Vec<u8> {
let mut result: Vec<u8> = Vec::with_capacity(operands.size_hint().0); let mut result: Vec<u8> = Vec::with_capacity(operands.size_hint().0);
match existing_val { match existing_val {
Some(v) => for e in v { Some(v) => for e in v {
@ -99,10 +102,8 @@ fn custom_merge() {
match db.get(b"k1") { match db.get(b"k1") {
Ok(Some(value)) => { Ok(Some(value)) => {
match value.to_utf8() { match value.to_utf8() {
Some(v) => Some(v) => println!("retrieved utf8 value: {}", v),
println!("retrieved utf8 value: {}", v), None => println!("did not read valid utf-8 out of the db"),
None =>
println!("did not read valid utf-8 out of the db"),
} }
} }
Err(e) => println!("error retrieving value: {}", e), Err(e) => println!("error retrieving value: {}", e),
@ -126,13 +127,14 @@ fn main() {
db.merge(b"k1", b"d"); db.merge(b"k1", b"d");
db.merge(b"k1", b"efg"); db.merge(b"k1", b"efg");
db.merge(b"k1", b"h"); db.merge(b"k1", b"h");
db.get(b"k1").map( |value| { db.get(b"k1")
match value.to_utf8() { .map(|value| {
Some(v) => (), match value.to_utf8() {
None => panic!("value corrupted"), Some(v) => (),
} None => panic!("value corrupted"),
}) }
.or_else( |e| { panic!("error retrieving value: {}", e) }); })
.or_else(|e| panic!("error retrieving value: {}", e));
db.delete(b"k1"); db.delete(b"k1");
} }
} }
@ -142,10 +144,14 @@ fn main() {
mod tests { mod tests {
use std::thread::sleep_ms; use std::thread::sleep_ms;
use rocksdb::{BlockBasedOptions, Options, DB, MergeOperands, new_bloom_filter, Writable }; use rocksdb::{BlockBasedOptions, DB, MergeOperands, Options, Writable,
new_bloom_filter};
use rocksdb::DBCompactionStyle::DBUniversalCompaction; use rocksdb::DBCompactionStyle::DBUniversalCompaction;
fn tuned_for_somebody_elses_disk(path: &str, opts: & mut Options, blockopts: &mut BlockBasedOptions) -> DB { fn tuned_for_somebody_elses_disk(path: &str,
opts: &mut Options,
blockopts: &mut BlockBasedOptions)
-> DB {
opts.create_if_missing(true); opts.create_if_missing(true);
opts.set_max_open_files(10000); opts.set_max_open_files(10000);
opts.set_use_fsync(false); opts.set_use_fsync(false);
@ -168,44 +174,44 @@ mod tests {
opts.set_disable_auto_compactions(true); opts.set_disable_auto_compactions(true);
let filter = new_bloom_filter(10); let filter = new_bloom_filter(10);
//opts.set_filter(filter); // opts.set_filter(filter);
DB::open(&opts, path).unwrap() DB::open(&opts, path).unwrap()
} }
/* TODO(tyler) unstable
#[bench]
fn a_writes(b: &mut Bencher) {
// dirty hack due to parallel tests causing contention.
sleep_ms(1000);
let path = "_rust_rocksdb_optimizetest";
let mut opts = Options::new();
let mut blockopts = BlockBasedOptions::new();
let mut db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts);
let mut i = 0 as u64;
b.iter(|| {
db.put(i.to_string().as_bytes(), b"v1111");
i += 1;
});
}
#[bench] // TODO(tyler) unstable
fn b_reads(b: &mut Bencher) { // #[bench]
let path = "_rust_rocksdb_optimizetest"; // fn a_writes(b: &mut Bencher) {
let mut opts = Options::new(); // dirty hack due to parallel tests causing contention.
let mut blockopts = BlockBasedOptions::new(); // sleep_ms(1000);
{ // let path = "_rust_rocksdb_optimizetest";
let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts); // let mut opts = Options::new();
let mut i = 0 as u64; // let mut blockopts = BlockBasedOptions::new();
b.iter(|| { // let mut db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts);
db.get(i.to_string().as_bytes()).on_error( |e| { // let mut i = 0 as u64;
println!("error: {}", e); // b.iter(|| {
e // db.put(i.to_string().as_bytes(), b"v1111");
}); // i += 1;
i += 1; // });
}); // }
} //
DB::destroy(&opts, path).is_ok(); // #[bench]
} // fn b_reads(b: &mut Bencher) {
*/ // let path = "_rust_rocksdb_optimizetest";
// let mut opts = Options::new();
// let mut blockopts = BlockBasedOptions::new();
// {
// let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts);
// let mut i = 0 as u64;
// b.iter(|| {
// db.get(i.to_string().as_bytes()).on_error( |e| {
// println!("error: {}", e);
// e
// });
// i += 1;
// });
// }
// DB::destroy(&opts, path).is_ok();
// }
//
} }

@ -1,18 +1,18 @@
/* //
Copyright 2014 Tyler Neely // Copyright 2014 Tyler Neely
//
Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
You may obtain a copy of the License at // You may obtain a copy of the License at
//
http://www.apache.org/licenses/LICENSE-2.0 // http://www.apache.org/licenses/LICENSE-2.0
//
Unless required by applicable law or agreed to in writing, software // Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, // distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
limitations under the License. // limitations under the License.
*/ //
extern crate libc; extern crate libc;
use self::libc::{c_char, c_int, c_void, size_t}; use self::libc::{c_char, c_int, c_void, size_t};
use std::ffi::CString; use std::ffi::CString;
@ -30,7 +30,7 @@ pub struct MergeOperatorCallback {
pub extern "C" fn destructor_callback(raw_cb: *mut c_void) { pub extern "C" fn destructor_callback(raw_cb: *mut c_void) {
// turn this back into a local variable so rust will reclaim it // turn this back into a local variable so rust will reclaim it
let _: Box<MergeOperatorCallback> = unsafe {mem::transmute(raw_cb)}; let _: Box<MergeOperatorCallback> = unsafe { mem::transmute(raw_cb) };
} }
@ -57,17 +57,16 @@ pub extern "C" fn full_merge_callback(raw_cb: *mut c_void,
unsafe { unsafe {
let cb: &mut MergeOperatorCallback = let cb: &mut MergeOperatorCallback =
&mut *(raw_cb as *mut MergeOperatorCallback); &mut *(raw_cb as *mut MergeOperatorCallback);
let operands = let operands = &mut MergeOperands::new(operands_list,
&mut MergeOperands::new(operands_list, operands_list_len,
operands_list_len, num_operands);
num_operands); let key: &[u8] = slice::from_raw_parts(raw_key as *const u8,
let key: &[u8] = slice::from_raw_parts(raw_key as *const u8, key_len as usize); key_len as usize);
let oldval: &[u8] = slice::from_raw_parts(existing_value as *const u8, let oldval: &[u8] = slice::from_raw_parts(existing_value as *const u8,
existing_value_len as usize); existing_value_len as usize);
let mut result = let mut result = (cb.merge_fn)(key, Some(oldval), operands);
(cb.merge_fn)(key, Some(oldval), operands);
result.shrink_to_fit(); result.shrink_to_fit();
//TODO(tan) investigate zero-copy techniques to improve performance // TODO(tan) investigate zero-copy techniques to improve performance
let buf = libc::malloc(result.len() as size_t); let buf = libc::malloc(result.len() as size_t);
assert!(!buf.is_null()); assert!(!buf.is_null());
*new_value_length = result.len() as size_t; *new_value_length = result.len() as size_t;
@ -92,10 +91,11 @@ pub extern "C" fn partial_merge_callback(raw_cb: *mut c_void,
let operands = &mut MergeOperands::new(operands_list, let operands = &mut MergeOperands::new(operands_list,
operands_list_len, operands_list_len,
num_operands); num_operands);
let key: &[u8] = slice::from_raw_parts(raw_key as *const u8, key_len as usize); let key: &[u8] = slice::from_raw_parts(raw_key as *const u8,
key_len as usize);
let mut result = (cb.merge_fn)(key, None, operands); let mut result = (cb.merge_fn)(key, None, operands);
result.shrink_to_fit(); result.shrink_to_fit();
//TODO(tan) investigate zero-copy techniques to improve performance // TODO(tan) investigate zero-copy techniques to improve performance
let buf = libc::malloc(result.len() as size_t); let buf = libc::malloc(result.len() as size_t);
assert!(!buf.is_null()); assert!(!buf.is_null());
*new_value_length = 1 as size_t; *new_value_length = 1 as size_t;
@ -168,7 +168,7 @@ fn test_provided_merge(new_key: &[u8],
for e in v { for e in v {
result.push(*e); result.push(*e);
} }
}, }
None => (), None => (),
} }
for op in operands { for op in operands {
@ -199,13 +199,13 @@ fn mergetest() {
match db.get(b"k1") { match db.get(b"k1") {
Ok(Some(value)) => { Ok(Some(value)) => {
match value.to_utf8() { match value.to_utf8() {
Some(v) => Some(v) => println!("retrieved utf8 value: {}", v),
println!("retrieved utf8 value: {}", v), None => println!("did not read valid utf-8 out of the db"),
None =>
println!("did not read valid utf-8 out of the db"),
} }
}, }
Err(e) => { println!("error reading value")}, Err(e) => {
println!("error reading value")
}
_ => panic!("value not present"), _ => panic!("value not present"),
} }

@ -1,23 +1,23 @@
/* //
Copyright 2014 Tyler Neely // Copyright 2014 Tyler Neely
//
Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
You may obtain a copy of the License at // You may obtain a copy of the License at
//
http://www.apache.org/licenses/LICENSE-2.0 // http://www.apache.org/licenses/LICENSE-2.0
//
Unless required by applicable law or agreed to in writing, software // Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, // distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
limitations under the License. // limitations under the License.
*/ //
extern crate libc; extern crate libc;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::ffi::{CString, CStr}; use std::ffi::{CStr, CString};
use std::fs; use std::fs;
use std::io; use std::io;
use std::ops::Deref; use std::ops::Deref;
@ -87,12 +87,21 @@ impl <'a> Iterator for SubDBIterator<'a> {
let key_len_ptr: *mut size_t = &mut key_len; let key_len_ptr: *mut size_t = &mut key_len;
let mut val_len: size_t = 0; let mut val_len: size_t = 0;
let val_len_ptr: *mut size_t = &mut val_len; let val_len_ptr: *mut size_t = &mut val_len;
let key_ptr = unsafe { rocksdb_ffi::rocksdb_iter_key(native_iter, key_len_ptr) }; let key_ptr = unsafe {
let key = unsafe { slice::from_raw_parts(key_ptr, key_len as usize) }; rocksdb_ffi::rocksdb_iter_key(native_iter, key_len_ptr)
let val_ptr = unsafe { rocksdb_ffi::rocksdb_iter_value(native_iter, val_len_ptr) }; };
let val = unsafe { slice::from_raw_parts(val_ptr, val_len as usize) }; let key = unsafe {
slice::from_raw_parts(key_ptr, key_len as usize)
Some((key.to_vec().into_boxed_slice(),val.to_vec().into_boxed_slice())) };
let val_ptr = unsafe {
rocksdb_ffi::rocksdb_iter_value(native_iter, val_len_ptr)
};
let val = unsafe {
slice::from_raw_parts(val_ptr, val_len as usize)
};
Some((key.to_vec().into_boxed_slice(),
val.to_vec().into_boxed_slice()))
} else { } else {
None None
} }
@ -100,26 +109,41 @@ impl <'a> Iterator for SubDBIterator<'a> {
} }
impl DBIterator { impl DBIterator {
//TODO alias db & opts to different lifetimes, and DBIterator to the db's lifetime // TODO alias db & opts to different lifetimes, and DBIterator to the db's
// lifetime
fn new(db: &DB, readopts: &ReadOptions) -> DBIterator { fn new(db: &DB, readopts: &ReadOptions) -> DBIterator {
unsafe { unsafe {
let iterator = rocksdb_ffi::rocksdb_create_iterator(db.inner, readopts.inner); let iterator = rocksdb_ffi::rocksdb_create_iterator(db.inner,
readopts.inner);
rocksdb_ffi::rocksdb_iter_seek_to_first(iterator); rocksdb_ffi::rocksdb_iter_seek_to_first(iterator);
DBIterator{ inner: iterator, direction: Direction::forward, just_seeked: true } DBIterator {
inner: iterator,
direction: Direction::forward,
just_seeked: true,
}
} }
} }
fn new_cf(db: &DB, cf_name: &str, readopts: &ReadOptions) -> Result<DBIterator, String> { fn new_cf(db: &DB,
cf_name: &str,
readopts: &ReadOptions)
-> Result<DBIterator, String> {
let cf = db.cfs.get(cf_name); let cf = db.cfs.get(cf_name);
if cf.is_none() { if cf.is_none() {
return Err(format!("Invalid column family: {}", cf_name).to_string()); return Err(format!("Invalid column family: {}", cf_name)
.to_string());
} }
unsafe { unsafe {
let iterator = rocksdb_ffi::rocksdb_create_iterator_cf(db.inner, let iterator =
readopts.inner, rocksdb_ffi::rocksdb_create_iterator_cf(db.inner,
*cf.unwrap()); readopts.inner,
*cf.unwrap());
rocksdb_ffi::rocksdb_iter_seek_to_first(iterator); rocksdb_ffi::rocksdb_iter_seek_to_first(iterator);
Ok(DBIterator{ inner: iterator, direction: Direction::forward, just_seeked: true }) Ok(DBIterator {
inner: iterator,
direction: Direction::forward,
just_seeked: true,
})
} }
} }
@ -127,24 +151,35 @@ impl DBIterator {
self.just_seeked = true; self.just_seeked = true;
unsafe { unsafe {
rocksdb_ffi::rocksdb_iter_seek_to_first(self.inner); rocksdb_ffi::rocksdb_iter_seek_to_first(self.inner);
}; }
SubDBIterator { iter: self, direction: Direction::forward } SubDBIterator {
iter: self,
direction: Direction::forward,
}
} }
pub fn from_end(&mut self) -> SubDBIterator { pub fn from_end(&mut self) -> SubDBIterator {
self.just_seeked = true; self.just_seeked = true;
unsafe { unsafe {
rocksdb_ffi::rocksdb_iter_seek_to_last(self.inner); rocksdb_ffi::rocksdb_iter_seek_to_last(self.inner);
}; }
SubDBIterator { iter: self, direction: Direction::reverse } SubDBIterator {
iter: self,
direction: Direction::reverse,
}
} }
pub fn from(&mut self, key: &[u8], dir: Direction) -> SubDBIterator { pub fn from(&mut self, key: &[u8], dir: Direction) -> SubDBIterator {
self.just_seeked = true; self.just_seeked = true;
unsafe { unsafe {
rocksdb_ffi::rocksdb_iter_seek(self.inner, key.as_ptr(), key.len() as size_t); rocksdb_ffi::rocksdb_iter_seek(self.inner,
key.as_ptr(),
key.len() as size_t);
}
SubDBIterator {
iter: self,
direction: dir,
} }
SubDBIterator { iter: self, direction: dir }
} }
} }
@ -158,8 +193,13 @@ impl Drop for DBIterator {
impl <'a> Snapshot<'a> { impl <'a> Snapshot<'a> {
pub fn new(db: &DB) -> Snapshot { pub fn new(db: &DB) -> Snapshot {
let snapshot = unsafe { rocksdb_ffi::rocksdb_create_snapshot(db.inner) }; let snapshot = unsafe {
Snapshot { db: db, inner: snapshot } rocksdb_ffi::rocksdb_create_snapshot(db.inner)
};
Snapshot {
db: db,
inner: snapshot,
}
} }
pub fn iterator(&self) -> DBIterator { pub fn iterator(&self) -> DBIterator {
@ -180,9 +220,17 @@ impl <'a> Drop for Snapshot<'a> {
// This is for the DB and write batches to share the same API // This is for the DB and write batches to share the same API
pub trait Writable { pub trait Writable {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String>; fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn put_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String>; fn put_cf(&self,
cf: DBCFHandle,
key: &[u8],
value: &[u8])
-> Result<(), String>;
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String>; fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn merge_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String>; fn merge_cf(&self,
cf: DBCFHandle,
key: &[u8],
value: &[u8])
-> Result<(), String>;
fn delete(&self, key: &[u8]) -> Result<(), String>; fn delete(&self, key: &[u8]) -> Result<(), String>;
fn delete_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<(), String>; fn delete_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<(), String>;
} }
@ -198,17 +246,22 @@ impl DB {
DB::open_cf(opts, path, &[]) DB::open_cf(opts, path, &[])
} }
pub fn open_cf(opts: &Options, path: &str, cfs: &[&str]) -> Result<DB, String> { pub fn open_cf(opts: &Options,
path: &str,
cfs: &[&str])
-> Result<DB, String> {
let cpath = match CString::new(path.as_bytes()) { let cpath = match CString::new(path.as_bytes()) {
Ok(c) => c, Ok(c) => c,
Err(_) => Err(_) => return Err("Failed to convert path to CString when \
return Err("Failed to convert path to CString when opening rocksdb".to_string()), opening rocksdb"
.to_string()),
}; };
let cpath_ptr = cpath.as_ptr(); let cpath_ptr = cpath.as_ptr();
let ospath = Path::new(path); let ospath = Path::new(path);
match fs::create_dir_all(&ospath) { match fs::create_dir_all(&ospath) {
Err(e) => return Err("Failed to create rocksdb directory.".to_string()), Err(e) =>
return Err("Failed to create rocksdb directory.".to_string()),
Ok(_) => (), Ok(_) => (),
} }
@ -230,24 +283,28 @@ impl DB {
// We need to store our CStrings in an intermediate vector // We need to store our CStrings in an intermediate vector
// so that their pointers remain valid. // so that their pointers remain valid.
let c_cfs: Vec<CString> = cfs_v.iter().map( |cf| { let c_cfs: Vec<CString> = cfs_v.iter()
CString::new(cf.as_bytes()).unwrap() .map(|cf| {
}).collect(); CString::new(cf.as_bytes())
.unwrap()
})
.collect();
let cfnames: Vec<*const i8> = c_cfs.iter().map( |cf| { let cfnames: Vec<*const i8> = c_cfs.iter()
cf.as_ptr() .map(|cf| cf.as_ptr())
}).collect(); .collect();
// These handles will be populated by DB. // These handles will be populated by DB.
let mut cfhandles: Vec<rocksdb_ffi::DBCFHandle> = let mut cfhandles: Vec<rocksdb_ffi::DBCFHandle> =
cfs_v.iter().map( |_| { cfs_v.iter()
rocksdb_ffi::DBCFHandle(0 as *mut c_void) .map(|_| rocksdb_ffi::DBCFHandle(0 as *mut c_void))
}).collect(); .collect();
// TODO(tyler) allow options to be passed in. // TODO(tyler) allow options to be passed in.
let cfopts: Vec<rocksdb_ffi::DBOptions> = cfs_v.iter().map( |_| { let cfopts: Vec<rocksdb_ffi::DBOptions> =
unsafe { rocksdb_ffi::rocksdb_options_create() } cfs_v.iter()
}).collect(); .map(|_| unsafe { rocksdb_ffi::rocksdb_options_create() })
.collect();
// Prepare to ship to C. // Prepare to ship to C.
let copts: *const rocksdb_ffi::DBOptions = cfopts.as_ptr(); let copts: *const rocksdb_ffi::DBOptions = cfopts.as_ptr();
@ -262,7 +319,8 @@ impl DB {
for handle in cfhandles.iter() { for handle in cfhandles.iter() {
if handle.0.is_null() { if handle.0.is_null() {
return Err("Received null column family handle from DB.".to_string()); return Err("Received null column family handle from DB."
.to_string());
} }
} }
@ -278,7 +336,10 @@ impl DB {
return Err("Could not initialize database.".to_string()); return Err("Could not initialize database.".to_string());
} }
Ok(DB { inner: db, cfs: cfMap }) Ok(DB {
inner: db,
cfs: cfMap,
})
} }
pub fn destroy(opts: &Options, path: &str) -> Result<(), String> { pub fn destroy(opts: &Options, path: &str) -> Result<(), String> {
@ -318,31 +379,40 @@ impl DB {
let mut err: *const i8 = 0 as *const i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err; let err_ptr: *mut *const i8 = &mut err;
unsafe { unsafe {
rocksdb_ffi::rocksdb_write(self.inner, writeopts.clone(), batch.inner, err_ptr); rocksdb_ffi::rocksdb_write(self.inner,
writeopts.clone(),
batch.inner,
err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
} }
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
} }
return Ok(()) return Ok(());
} }
pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, String> { pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, String> {
unsafe { unsafe {
let readopts = rocksdb_ffi::rocksdb_readoptions_create(); let readopts = rocksdb_ffi::rocksdb_readoptions_create();
if readopts.0.is_null() { if readopts.0.is_null() {
return Err("Unable to create rocksdb read \ return Err("Unable to create rocksdb read options. This is \
options. This is a fairly trivial call, and its failure \ a fairly trivial call, and its failure may be \
may be indicative of a mis-compiled or mis-loaded rocksdb \ indicative of a mis-compiled or mis-loaded \
library.".to_string()); rocksdb library."
.to_string());
} }
let val_len: size_t = 0; let val_len: size_t = 0;
let val_len_ptr = &val_len as *const size_t; let val_len_ptr = &val_len as *const size_t;
let mut err: *const i8 = 0 as *const i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err; let err_ptr: *mut *const i8 = &mut err;
let val = rocksdb_ffi::rocksdb_get(self.inner, readopts.clone(), let val =
key.as_ptr(), key.len() as size_t, val_len_ptr, err_ptr) as *mut u8; rocksdb_ffi::rocksdb_get(self.inner,
readopts.clone(),
key.as_ptr(),
key.len() as size_t,
val_len_ptr,
err_ptr) as *mut u8;
rocksdb_ffi::rocksdb_readoptions_destroy(readopts); rocksdb_ffi::rocksdb_readoptions_destroy(readopts);
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
@ -356,23 +426,32 @@ impl DB {
} }
} }
pub fn get_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<Option<DBVector>, String> { pub fn get_cf(&self,
cf: DBCFHandle,
key: &[u8])
-> Result<Option<DBVector>, String> {
unsafe { unsafe {
let readopts = rocksdb_ffi::rocksdb_readoptions_create(); let readopts = rocksdb_ffi::rocksdb_readoptions_create();
if readopts.0.is_null() { if readopts.0.is_null() {
return Err("Unable to create rocksdb read \ return Err("Unable to create rocksdb read options. This is \
options. This is a fairly trivial call, and its failure \ a fairly trivial call, and its failure may be \
may be indicative of a mis-compiled or mis-loaded rocksdb \ indicative of a mis-compiled or mis-loaded \
library.".to_string()); rocksdb library."
.to_string());
} }
let val_len: size_t = 0; let val_len: size_t = 0;
let val_len_ptr = &val_len as *const size_t; let val_len_ptr = &val_len as *const size_t;
let mut err: *const i8 = 0 as *const i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err; let err_ptr: *mut *const i8 = &mut err;
let val = rocksdb_ffi::rocksdb_get_cf(self.inner, readopts.clone(), let val =
cf, key.as_ptr(), key.len() as size_t, val_len_ptr, rocksdb_ffi::rocksdb_get_cf(self.inner,
err_ptr) as *mut u8; readopts.clone(),
cf,
key.as_ptr(),
key.len() as size_t,
val_len_ptr,
err_ptr) as *mut u8;
rocksdb_ffi::rocksdb_readoptions_destroy(readopts); rocksdb_ffi::rocksdb_readoptions_destroy(readopts);
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
@ -386,18 +465,25 @@ impl DB {
} }
} }
pub fn create_cf(&mut self, name: &str, opts: &Options) -> Result<DBCFHandle, String> { pub fn create_cf(&mut self,
name: &str,
opts: &Options)
-> Result<DBCFHandle, String> {
let cname = match CString::new(name.as_bytes()) { let cname = match CString::new(name.as_bytes()) {
Ok(c) => c, Ok(c) => c,
Err(_) => Err(_) => return Err("Failed to convert path to CString when \
return Err("Failed to convert path to CString when opening rocksdb".to_string()), opening rocksdb"
.to_string()),
}; };
let cname_ptr = cname.as_ptr(); let cname_ptr = cname.as_ptr();
let mut err: *const i8 = 0 as *const i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err; let err_ptr: *mut *const i8 = &mut err;
let cf_handler = unsafe { let cf_handler = unsafe {
let cf_handler = rocksdb_ffi::rocksdb_create_column_family( let cf_handler =
self.inner, opts.inner, cname_ptr, err_ptr); rocksdb_ffi::rocksdb_create_column_family(self.inner,
opts.inner,
cname_ptr,
err_ptr);
self.cfs.insert(name.to_string(), cf_handler); self.cfs.insert(name.to_string(), cf_handler);
cf_handler cf_handler
}; };
@ -416,7 +502,9 @@ impl DB {
let mut err: *const i8 = 0 as *const i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err; let err_ptr: *mut *const i8 = &mut err;
unsafe { unsafe {
rocksdb_ffi::rocksdb_drop_column_family(self.inner, *cf.unwrap(), err_ptr); rocksdb_ffi::rocksdb_drop_column_family(self.inner,
*cf.unwrap(),
err_ptr);
} }
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
@ -445,14 +533,18 @@ impl DB {
} }
impl Writable for DB { impl Writable for DB {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let mut err: *const i8 = 0 as *const i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err; let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_put(self.inner, writeopts.clone(), key.as_ptr(), rocksdb_ffi::rocksdb_put(self.inner,
key.len() as size_t, value.as_ptr(), writeopts.clone(),
value.len() as size_t, err_ptr); key.as_ptr(),
key.len() as size_t,
value.as_ptr(),
value.len() as size_t,
err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
@ -461,14 +553,23 @@ impl Writable for DB {
} }
} }
fn put_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String> { fn put_cf(&self,
cf: DBCFHandle,
key: &[u8],
value: &[u8])
-> Result<(), String> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let mut err: *const i8 = 0 as *const i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err; let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_put_cf(self.inner, writeopts.clone(), cf, rocksdb_ffi::rocksdb_put_cf(self.inner,
key.as_ptr(), key.len() as size_t, value.as_ptr(), writeopts.clone(),
value.len() as size_t, err_ptr); cf,
key.as_ptr(),
key.len() as size_t,
value.as_ptr(),
value.len() as size_t,
err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
@ -482,9 +583,13 @@ impl Writable for DB {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let mut err: *const i8 = 0 as *const i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err; let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_merge(self.inner, writeopts.clone(), key.as_ptr(), rocksdb_ffi::rocksdb_merge(self.inner,
key.len() as size_t, value.as_ptr(), writeopts.clone(),
value.len() as size_t, err_ptr); key.as_ptr(),
key.len() as size_t,
value.as_ptr(),
value.len() as size_t,
err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
@ -493,15 +598,23 @@ impl Writable for DB {
} }
} }
fn merge_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String> { fn merge_cf(&self,
cf: DBCFHandle,
key: &[u8],
value: &[u8])
-> Result<(), String> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let mut err: *const i8 = 0 as *const i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err; let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_merge_cf(self.inner, writeopts.clone(), rocksdb_ffi::rocksdb_merge_cf(self.inner,
cf, key.as_ptr(), writeopts.clone(),
key.len() as size_t, value.as_ptr(), cf,
value.len() as size_t, err_ptr); key.as_ptr(),
key.len() as size_t,
value.as_ptr(),
value.len() as size_t,
err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
@ -515,8 +628,11 @@ impl Writable for DB {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let mut err: *const i8 = 0 as *const i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err; let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_delete(self.inner, writeopts.clone(), key.as_ptr(), rocksdb_ffi::rocksdb_delete(self.inner,
key.len() as size_t, err_ptr); writeopts.clone(),
key.as_ptr(),
key.len() as size_t,
err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
@ -530,9 +646,12 @@ impl Writable for DB {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let mut err: *const i8 = 0 as *const i8; let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err; let err_ptr: *mut *const i8 = &mut err;
rocksdb_ffi::rocksdb_delete_cf(self.inner, writeopts.clone(), rocksdb_ffi::rocksdb_delete_cf(self.inner,
cf, key.as_ptr(), writeopts.clone(),
key.len() as size_t, err_ptr); cf,
key.as_ptr(),
key.len() as size_t,
err_ptr);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts); rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
@ -545,18 +664,14 @@ impl Writable for DB {
impl WriteBatch { impl WriteBatch {
pub fn new() -> WriteBatch { pub fn new() -> WriteBatch {
WriteBatch { WriteBatch {
inner: unsafe { inner: unsafe { rocksdb_ffi::rocksdb_writebatch_create() },
rocksdb_ffi::rocksdb_writebatch_create()
},
} }
} }
} }
impl Drop for WriteBatch { impl Drop for WriteBatch {
fn drop(&mut self) { fn drop(&mut self) {
unsafe { unsafe { rocksdb_ffi::rocksdb_writebatch_destroy(self.inner) }
rocksdb_ffi::rocksdb_writebatch_destroy(self.inner)
}
} }
} }
@ -574,53 +689,73 @@ impl Drop for DB {
impl Writable for WriteBatch { impl Writable for WriteBatch {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(), rocksdb_ffi::rocksdb_writebatch_put(self.inner,
key.len() as size_t, value.as_ptr(), key.as_ptr(),
value.len() as size_t); key.len() as size_t,
value.as_ptr(),
value.len() as size_t);
Ok(()) Ok(())
} }
} }
fn put_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String> { fn put_cf(&self,
cf: DBCFHandle,
key: &[u8],
value: &[u8])
-> Result<(), String> {
unsafe { unsafe {
rocksdb_ffi::rocksdb_writebatch_put_cf(self.inner, cf, key.as_ptr(), rocksdb_ffi::rocksdb_writebatch_put_cf(self.inner,
key.len() as size_t, value.as_ptr(), cf,
value.len() as size_t); key.as_ptr(),
key.len() as size_t,
value.as_ptr(),
value.len() as size_t);
Ok(()) Ok(())
} }
} }
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(), rocksdb_ffi::rocksdb_writebatch_merge(self.inner,
key.len() as size_t, value.as_ptr(), key.as_ptr(),
value.len() as size_t); key.len() as size_t,
value.as_ptr(),
value.len() as size_t);
Ok(()) Ok(())
} }
} }
fn merge_cf(&self, cf: DBCFHandle, key: &[u8], value: &[u8]) -> Result<(), String> { fn merge_cf(&self,
cf: DBCFHandle,
key: &[u8],
value: &[u8])
-> Result<(), String> {
unsafe { unsafe {
rocksdb_ffi::rocksdb_writebatch_merge_cf(self.inner, cf, key.as_ptr(), rocksdb_ffi::rocksdb_writebatch_merge_cf(self.inner,
key.len() as size_t, value.as_ptr(), cf,
value.len() as size_t); key.as_ptr(),
key.len() as size_t,
value.as_ptr(),
value.len() as size_t);
Ok(()) Ok(())
} }
} }
fn delete(&self, key: &[u8]) -> Result<(), String> { fn delete(&self, key: &[u8]) -> Result<(), String> {
unsafe { unsafe {
rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(), rocksdb_ffi::rocksdb_writebatch_delete(self.inner,
key.len() as size_t); key.as_ptr(),
key.len() as size_t);
Ok(()) Ok(())
} }
} }
fn delete_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<(), String> { fn delete_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<(), String> {
unsafe { unsafe {
rocksdb_ffi::rocksdb_writebatch_delete_cf(self.inner, rocksdb_ffi::rocksdb_writebatch_delete_cf(self.inner,
cf, key.as_ptr(), cf,
key.len() as size_t); key.as_ptr(),
key.len() as size_t);
Ok(()) Ok(())
} }
} }
@ -628,21 +763,19 @@ impl Writable for WriteBatch {
impl Drop for ReadOptions { impl Drop for ReadOptions {
fn drop(&mut self) { fn drop(&mut self) {
unsafe { unsafe { rocksdb_ffi::rocksdb_readoptions_destroy(self.inner) }
rocksdb_ffi::rocksdb_readoptions_destroy(self.inner)
}
} }
} }
impl ReadOptions { impl ReadOptions {
fn new() -> ReadOptions { fn new() -> ReadOptions {
unsafe { unsafe {
ReadOptions{inner: rocksdb_ffi::rocksdb_readoptions_create()} ReadOptions { inner: rocksdb_ffi::rocksdb_readoptions_create() }
} }
} }
// TODO add snapshot setting here // TODO add snapshot setting here
// TODO add snapshot wrapper structs with proper destructors; // TODO add snapshot wrapper structs with proper destructors;
// that struct needs an "iterator" impl too. // that struct needs an "iterator" impl too.
fn fill_cache(&mut self, v: bool) { fn fill_cache(&mut self, v: bool) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_readoptions_set_fill_cache(self.inner, v); rocksdb_ffi::rocksdb_readoptions_set_fill_cache(self.inner, v);
@ -651,7 +784,8 @@ impl ReadOptions {
fn set_snapshot(&mut self, snapshot: &Snapshot) { fn set_snapshot(&mut self, snapshot: &Snapshot) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_readoptions_set_snapshot(self.inner, snapshot.inner); rocksdb_ffi::rocksdb_readoptions_set_snapshot(self.inner,
snapshot.inner);
} }
} }
} }
@ -715,8 +849,10 @@ fn errors_do_stuff() {
let opts = Options::new(); let opts = Options::new();
// The DB will still be open when we try to destroy and the lock should fail // The DB will still be open when we try to destroy and the lock should fail
match DB::destroy(&opts, path) { match DB::destroy(&opts, path) {
Err(ref s) => assert!(s == "IO error: lock _rust_rocksdb_error/LOCK: No locks available"), Err(ref s) => assert!(s ==
Ok(_) => panic!("should fail") "IO error: lock _rust_rocksdb_error/LOCK: No \
locks available"),
Ok(_) => panic!("should fail"),
} }
} }
@ -725,7 +861,8 @@ fn writebatch_works() {
let path = "_rust_rocksdb_writebacktest"; let path = "_rust_rocksdb_writebacktest";
{ {
let mut db = DB::open_default(path).unwrap(); let mut db = DB::open_default(path).unwrap();
{ // test put {
// test put
let mut batch = WriteBatch::new(); let mut batch = WriteBatch::new();
assert!(db.get(b"k1").unwrap().is_none()); assert!(db.get(b"k1").unwrap().is_none());
batch.put(b"k1", b"v1111"); batch.put(b"k1", b"v1111");
@ -735,7 +872,8 @@ fn writebatch_works() {
let r: Result<Option<DBVector>, String> = db.get(b"k1"); let r: Result<Option<DBVector>, String> = db.get(b"k1");
assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111"); assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111");
} }
{ // test delete {
// test delete
let mut batch = WriteBatch::new(); let mut batch = WriteBatch::new();
batch.delete(b"k1"); batch.delete(b"k1");
let p = db.write(batch); let p = db.write(batch);
@ -759,8 +897,10 @@ fn iterator_test() {
let p = db.put(b"k3", b"v3333"); let p = db.put(b"k3", b"v3333");
assert!(p.is_ok()); assert!(p.is_ok());
let mut iter = db.iterator(); let mut iter = db.iterator();
for (k,v) in iter.from_start() { for (k, v) in iter.from_start() {
println!("Hello {}: {}", from_utf8(&*k).unwrap(), from_utf8(&*v).unwrap()); println!("Hello {}: {}",
from_utf8(&*k).unwrap(),
from_utf8(&*v).unwrap());
} }
} }
let opts = Options::new(); let opts = Options::new();

@ -1,25 +1,25 @@
/* //
Copyright 2014 Tyler Neely // Copyright 2014 Tyler Neely
//
Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
You may obtain a copy of the License at // You may obtain a copy of the License at
//
http://www.apache.org/licenses/LICENSE-2.0 // http://www.apache.org/licenses/LICENSE-2.0
//
Unless required by applicable law or agreed to in writing, software // Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, // distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
limitations under the License. // limitations under the License.
*/ //
extern crate libc; extern crate libc;
use self::libc::{c_int, size_t}; use self::libc::{c_int, size_t};
use std::ffi::CString; use std::ffi::CString;
use std::mem; use std::mem;
use rocksdb_ffi; use rocksdb_ffi;
use merge_operator::{self, MergeOperatorCallback, MergeOperands, use merge_operator::{self, MergeOperands, MergeOperatorCallback,
full_merge_callback, partial_merge_callback}; full_merge_callback, partial_merge_callback};
use comparator::{self, ComparatorCallback, compare_callback}; use comparator::{self, ComparatorCallback, compare_callback};
@ -49,7 +49,9 @@ impl Drop for BlockBasedOptions {
impl BlockBasedOptions { impl BlockBasedOptions {
pub fn new() -> BlockBasedOptions { pub fn new() -> BlockBasedOptions {
let block_opts = unsafe {rocksdb_ffi::rocksdb_block_based_options_create() }; let block_opts = unsafe {
rocksdb_ffi::rocksdb_block_based_options_create()
};
let rocksdb_ffi::DBBlockBasedTableOptions(opt_ptr) = block_opts; let rocksdb_ffi::DBBlockBasedTableOptions(opt_ptr) = block_opts;
if opt_ptr.is_null() { if opt_ptr.is_null() {
panic!("Could not create rocksdb block based options".to_string()); panic!("Could not create rocksdb block based options".to_string());
@ -59,34 +61,35 @@ impl BlockBasedOptions {
pub fn set_block_size(&mut self, size: u64) { pub fn set_block_size(&mut self, size: u64) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_block_based_options_set_block_size( rocksdb_ffi::rocksdb_block_based_options_set_block_size(self.inner,
self.inner, size); size);
} }
} }
}
//TODO figure out how to create these in a Rusty way // TODO figure out how to create these in a Rusty way
////pub fn set_filter(&mut self, filter: rocksdb_ffi::DBFilterPolicy) { // /pub fn set_filter(&mut self, filter: rocksdb_ffi::DBFilterPolicy) {
//// unsafe { // / unsafe {
//// rocksdb_ffi::rocksdb_block_based_options_set_filter_policy( // / rocksdb_ffi::rocksdb_block_based_options_set_filter_policy(
//// self.inner, filter); // / self.inner, filter);
//// } // / }
////} // /}
////pub fn set_cache(&mut self, cache: rocksdb_ffi::DBCache) { /// /pub fn set_cache(&mut self, cache: rocksdb_ffi::DBCache) {
//// unsafe { /// / unsafe {
//// rocksdb_ffi::rocksdb_block_based_options_set_block_cache( /// / rocksdb_ffi::rocksdb_block_based_options_set_block_cache(
//// self.inner, cache); /// / self.inner, cache);
//// } /// / }
////} /// /}
////pub fn set_cache_compressed(&mut self, cache: rocksdb_ffi::DBCache) { /// /pub fn set_cache_compressed(&mut self, cache: rocksdb_ffi::DBCache) {
//// unsafe { /// / unsafe {
//// rocksdb_ffi::rocksdb_block_based_options_set_block_cache_compressed( /// / rocksdb_ffi::
//// self.inner, cache); /// rocksdb_block_based_options_set_block_cache_compressed(
//// } /// / self.inner, cache);
////} /// / }
/// /}
}
impl Options { impl Options {
pub fn new() -> Options { pub fn new() -> Options {
@ -96,18 +99,19 @@ impl Options {
if opt_ptr.is_null() { if opt_ptr.is_null() {
panic!("Could not create rocksdb options".to_string()); panic!("Could not create rocksdb options".to_string());
} }
Options{ inner: opts, } Options { inner: opts }
} }
} }
pub fn increase_parallelism(&mut self, parallelism: i32) { pub fn increase_parallelism(&mut self, parallelism: i32) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_increase_parallelism( rocksdb_ffi::rocksdb_options_increase_parallelism(self.inner,
self.inner, parallelism); parallelism);
} }
} }
pub fn optimize_level_style_compaction(&mut self, memtable_memory_budget: i32) { pub fn optimize_level_style_compaction(&mut self,
memtable_memory_budget: i32) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_optimize_level_style_compaction( rocksdb_ffi::rocksdb_options_optimize_level_style_compaction(
self.inner, memtable_memory_budget); self.inner, memtable_memory_budget);
@ -141,7 +145,9 @@ impl Options {
} }
} }
pub fn add_comparator<'a>(&mut self, name: &str, compare_fn: fn(&[u8], &[u8]) -> i32) { pub fn add_comparator<'a>(&mut self,
name: &str,
compare_fn: fn(&[u8], &[u8]) -> i32) {
let cb = Box::new(ComparatorCallback { let cb = Box::new(ComparatorCallback {
name: CString::new(name.as_bytes()).unwrap(), name: CString::new(name.as_bytes()).unwrap(),
f: compare_fn, f: compare_fn,
@ -160,8 +166,8 @@ impl Options {
pub fn set_block_cache_size_mb(&mut self, cache_size: u64) { pub fn set_block_cache_size_mb(&mut self, cache_size: u64) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_optimize_for_point_lookup( rocksdb_ffi::rocksdb_options_optimize_for_point_lookup(self.inner,
self.inner, cache_size); cache_size);
} }
} }
@ -184,8 +190,7 @@ impl Options {
pub fn set_bytes_per_sync(&mut self, nbytes: u64) { pub fn set_bytes_per_sync(&mut self, nbytes: u64) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_bytes_per_sync( rocksdb_ffi::rocksdb_options_set_bytes_per_sync(self.inner, nbytes);
self.inner, nbytes);
} }
} }
@ -204,8 +209,8 @@ impl Options {
pub fn set_table_cache_num_shard_bits(&mut self, nbits: c_int) { pub fn set_table_cache_num_shard_bits(&mut self, nbits: c_int) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_table_cache_numshardbits( rocksdb_ffi::rocksdb_options_set_table_cache_numshardbits(self.inner,
self.inner, nbits); nbits);
} }
} }
@ -218,22 +223,22 @@ impl Options {
pub fn set_max_write_buffer_number(&mut self, nbuf: c_int) { pub fn set_max_write_buffer_number(&mut self, nbuf: c_int) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_max_write_buffer_number( rocksdb_ffi::rocksdb_options_set_max_write_buffer_number(self.inner,
self.inner, nbuf); nbuf);
} }
} }
pub fn set_write_buffer_size(&mut self, size: size_t) { pub fn set_write_buffer_size(&mut self, size: size_t) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_write_buffer_size( rocksdb_ffi::rocksdb_options_set_write_buffer_size(self.inner,
self.inner, size); size);
} }
} }
pub fn set_target_file_size_base(&mut self, size: u64) { pub fn set_target_file_size_base(&mut self, size: u64) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_target_file_size_base( rocksdb_ffi::rocksdb_options_set_target_file_size_base(self.inner,
self.inner, size); size);
} }
} }
@ -258,10 +263,11 @@ impl Options {
} }
} }
pub fn set_compaction_style(&mut self, style: rocksdb_ffi::DBCompactionStyle) { pub fn set_compaction_style(&mut self,
style: rocksdb_ffi::DBCompactionStyle) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_compaction_style( rocksdb_ffi::rocksdb_options_set_compaction_style(self.inner,
self.inner, style); style);
} }
} }
@ -274,15 +280,14 @@ impl Options {
pub fn set_max_background_flushes(&mut self, n: c_int) { pub fn set_max_background_flushes(&mut self, n: c_int) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_max_background_flushes( rocksdb_ffi::rocksdb_options_set_max_background_flushes(self.inner,
self.inner, n); n);
} }
} }
pub fn set_filter_deletes(&mut self, filter: bool) { pub fn set_filter_deletes(&mut self, filter: bool) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_filter_deletes( rocksdb_ffi::rocksdb_options_set_filter_deletes(self.inner, filter);
self.inner, filter);
} }
} }
@ -299,7 +304,8 @@ impl Options {
} }
} }
pub fn set_block_based_table_factory(&mut self, factory: &BlockBasedOptions) { pub fn set_block_based_table_factory(&mut self,
factory: &BlockBasedOptions) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_block_based_table_factory(self.inner, factory.inner); rocksdb_ffi::rocksdb_options_set_block_based_table_factory(self.inner, factory.inner);
} }

@ -1,19 +1,19 @@
/* //
Copyright 2014 Tyler Neely // Copyright 2014 Tyler Neely
//
Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
You may obtain a copy of the License at // You may obtain a copy of the License at
//
http://www.apache.org/licenses/LICENSE-2.0 // http://www.apache.org/licenses/LICENSE-2.0
//
Unless required by applicable law or agreed to in writing, software // Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, // distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
limitations under the License. // limitations under the License.
*/ //
use rocksdb::{Options, DB, Writable, MergeOperands}; use rocksdb::{DB, MergeOperands, Options, Writable};
#[test] #[test]
pub fn test_column_family() { pub fn test_column_family() {
@ -30,7 +30,7 @@ pub fn test_column_family() {
Ok(_) => println!("cf1 created successfully"), Ok(_) => println!("cf1 created successfully"),
Err(e) => { Err(e) => {
panic!("could not create column family: {}", e); panic!("could not create column family: {}", e);
}, }
} }
} }
@ -39,9 +39,11 @@ pub fn test_column_family() {
let mut opts = Options::new(); let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
match DB::open(&opts, path) { match DB::open(&opts, path) {
Ok(_) => panic!("should not have opened DB successfully without specifying column Ok(_) => panic!("should not have opened DB successfully without \
specifying column
families"), families"),
Err(e) => assert!(e.starts_with("Invalid argument: You have to open all column families.")), Err(e) => assert!(e.starts_with("Invalid argument: You have to \
open all column families.")),
} }
} }
@ -62,12 +64,13 @@ pub fn test_column_family() {
Ok(db) => { Ok(db) => {
println!("successfully opened db with column family"); println!("successfully opened db with column family");
db db
}, }
Err(e) => panic!("failed to open db with column family: {}", e), Err(e) => panic!("failed to open db with column family: {}", e),
}; };
let cf1 = *db.cf_handle("cf1").unwrap(); let cf1 = *db.cf_handle("cf1").unwrap();
assert!(db.put_cf(cf1, b"k1", b"v1").is_ok()); assert!(db.put_cf(cf1, b"k1", b"v1").is_ok());
assert!(db.get_cf(cf1, b"k1").unwrap().unwrap().to_utf8().unwrap() == "v1"); assert!(db.get_cf(cf1, b"k1").unwrap().unwrap().to_utf8().unwrap() ==
"v1");
let p = db.put_cf(cf1, b"k1", b"a"); let p = db.put_cf(cf1, b"k1", b"a");
assert!(p.is_ok()); assert!(p.is_ok());
db.merge_cf(cf1, b"k1", b"b").unwrap(); db.merge_cf(cf1, b"k1", b"b").unwrap();
@ -80,12 +83,10 @@ pub fn test_column_family() {
match db.get(b"k1") { match db.get(b"k1") {
Ok(Some(value)) => { Ok(Some(value)) => {
match value.to_utf8() { match value.to_utf8() {
Some(v) => Some(v) => println!("retrieved utf8 value: {}", v),
println!("retrieved utf8 value: {}", v), None => println!("did not read valid utf-8 out of the db"),
None =>
println!("did not read valid utf-8 out of the db"),
} }
}, }
Err(e) => println!("error reading value"), Err(e) => println!("error reading value"),
_ => panic!("value not present!"), _ => panic!("value not present!"),
} }
@ -124,7 +125,7 @@ fn test_provided_merge(_: &[u8],
for e in v { for e in v {
result.push(*e); result.push(*e);
} }
}, }
None => (), None => (),
} }
for op in operands { for op in operands {

@ -1,4 +1,4 @@
use rocksdb::{Options, DB, Writable, Direction}; use rocksdb::{DB, Direction, Options, Writable};
fn cba(input: &Box<[u8]>) -> Box<[u8]> { fn cba(input: &Box<[u8]>) -> Box<[u8]> {
input.iter().cloned().collect::<Vec<_>>().into_boxed_slice() input.iter().cloned().collect::<Vec<_>>().into_boxed_slice()
@ -8,14 +8,14 @@ fn cba(input: &Box<[u8]>) -> Box<[u8]> {
pub fn test_iterator() { pub fn test_iterator() {
let path = "_rust_rocksdb_iteratortest"; let path = "_rust_rocksdb_iteratortest";
{ {
let k1:Box<[u8]> = b"k1".to_vec().into_boxed_slice(); let k1: Box<[u8]> = b"k1".to_vec().into_boxed_slice();
let k2:Box<[u8]> = b"k2".to_vec().into_boxed_slice(); let k2: Box<[u8]> = b"k2".to_vec().into_boxed_slice();
let k3:Box<[u8]> = b"k3".to_vec().into_boxed_slice(); let k3: Box<[u8]> = b"k3".to_vec().into_boxed_slice();
let k4:Box<[u8]> = b"k4".to_vec().into_boxed_slice(); let k4: Box<[u8]> = b"k4".to_vec().into_boxed_slice();
let v1:Box<[u8]> = b"v1111".to_vec().into_boxed_slice(); let v1: Box<[u8]> = b"v1111".to_vec().into_boxed_slice();
let v2:Box<[u8]> = b"v2222".to_vec().into_boxed_slice(); let v2: Box<[u8]> = b"v2222".to_vec().into_boxed_slice();
let v3:Box<[u8]> = b"v3333".to_vec().into_boxed_slice(); let v3: Box<[u8]> = b"v3333".to_vec().into_boxed_slice();
let v4:Box<[u8]> = b"v4444".to_vec().into_boxed_slice(); let v4: Box<[u8]> = b"v4444".to_vec().into_boxed_slice();
let db = DB::open_default(path).unwrap(); let db = DB::open_default(path).unwrap();
let p = db.put(&*k1, &*v1); let p = db.put(&*k1, &*v1);
assert!(p.is_ok()); assert!(p.is_ok());
@ -24,7 +24,9 @@ pub fn test_iterator() {
let p = db.put(&*k3, &*v3); let p = db.put(&*k3, &*v3);
assert!(p.is_ok()); assert!(p.is_ok());
let mut view1 = db.iterator(); let mut view1 = db.iterator();
let expected = vec![(cba(&k1), cba(&v1)), (cba(&k2), cba(&v2)), (cba(&k3), cba(&v3))]; let expected = vec![(cba(&k1), cba(&v1)),
(cba(&k2), cba(&v2)),
(cba(&k3), cba(&v3))];
{ {
let iterator1 = view1.from_start(); let iterator1 = view1.from_start();
assert_eq!(iterator1.collect::<Vec<_>>(), expected); assert_eq!(iterator1.collect::<Vec<_>>(), expected);
@ -86,7 +88,10 @@ pub fn test_iterator() {
let p = db.put(&*k4, &*v4); let p = db.put(&*k4, &*v4);
assert!(p.is_ok()); assert!(p.is_ok());
let mut view3 = db.iterator(); let mut view3 = db.iterator();
let expected2 = vec![(cba(&k1), cba(&v1)), (cba(&k2), cba(&v2)), (cba(&k3), cba(&v3)), (cba(&k4), cba(&v4))]; let expected2 = vec![(cba(&k1), cba(&v1)),
(cba(&k2), cba(&v2)),
(cba(&k3), cba(&v3)),
(cba(&k4), cba(&v4))];
{ {
let iterator1 = view1.from_start(); let iterator1 = view1.from_start();
assert_eq!(iterator1.collect::<Vec<_>>(), expected); assert_eq!(iterator1.collect::<Vec<_>>(), expected);
@ -97,7 +102,9 @@ pub fn test_iterator() {
} }
{ {
let iterator1 = view3.from(b"k2", Direction::forward); let iterator1 = view3.from(b"k2", Direction::forward);
let expected = vec![(cba(&k2), cba(&v2)), (cba(&k3), cba(&v3)), (cba(&k4), cba(&v4))]; let expected = vec![(cba(&k2), cba(&v2)),
(cba(&k3), cba(&v3)),
(cba(&k4), cba(&v4))];
assert_eq!(iterator1.collect::<Vec<_>>(), expected); assert_eq!(iterator1.collect::<Vec<_>>(), expected);
} }
{ {
@ -109,4 +116,3 @@ pub fn test_iterator() {
let opts = Options::new(); let opts = Options::new();
assert!(DB::destroy(&opts, path).is_ok()); assert!(DB::destroy(&opts, path).is_ok());
} }

@ -1,4 +1,4 @@
use rocksdb::{Options, DB, Writable}; use rocksdb::{DB, Options, Writable};
use std::thread; use std::thread;
use std::sync::Arc; use std::sync::Arc;
@ -14,21 +14,21 @@ pub fn test_multithreaded() {
db.put(b"key", b"value1").unwrap(); db.put(b"key", b"value1").unwrap();
let db1 = db.clone(); let db1 = db.clone();
let j1 = thread::spawn(move|| { let j1 = thread::spawn(move || {
for _ in 1..N { for _ in 1..N {
db1.put(b"key", b"value1").unwrap(); db1.put(b"key", b"value1").unwrap();
} }
}); });
let db2 = db.clone(); let db2 = db.clone();
let j2 = thread::spawn(move|| { let j2 = thread::spawn(move || {
for _ in 1..N { for _ in 1..N {
db2.put(b"key", b"value2").unwrap(); db2.put(b"key", b"value2").unwrap();
} }
}); });
let db3 = db.clone(); let db3 = db.clone();
let j3 = thread::spawn(move|| { let j3 = thread::spawn(move || {
for _ in 1..N { for _ in 1..N {
match db3.get(b"key") { match db3.get(b"key") {
Ok(Some(v)) => { Ok(Some(v)) => {

Loading…
Cancel
Save