diff --git a/CHANGELOG.md b/CHANGELOG.md index 117b034..85e2c0a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,16 @@ # Changelog +## Unreleased + +## 0.12.3 (2019-07-19) + +* Enabled sse4.2/pclmul for accelerated crc32c (yjh0502) +* Added `set_db_write_buffer_size` to the Options API (rnarubin) +* Bumped RocksDB to 6.1.2 (lispy) +* Added `Sync` and `Send` implementations to `Snapshot` (pavel-mukhanov) +* Added `raw_iterator_cf_opt` to the DB API (rnarubin) +* Added `DB::latest_sequence_number` method (vitvakatu) + ## 0.12.2 (2019-05-03) ### Changes diff --git a/Cargo.toml b/Cargo.toml index a2db300..4b0f81f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "rocksdb" description = "Rust wrapper for Facebook's RocksDB embeddable database" -version = "0.12.2" +version = "0.12.3" authors = ["Tyler Neely ", "David Greenberg "] license = "Apache-2.0" keywords = ["database", "embedded", "LSM-tree", "persistence"] diff --git a/librocksdb-sys/build.rs b/librocksdb-sys/build.rs index 503fecf..1b1567a 100644 --- a/librocksdb-sys/build.rs +++ b/librocksdb-sys/build.rs @@ -45,6 +45,8 @@ fn bindgen_rocksdb() { } fn build_rocksdb() { + let target = env::var("TARGET").unwrap(); + let mut config = cc::Build::new(); config.include("rocksdb/include/"); config.include("rocksdb/"); @@ -90,27 +92,40 @@ fn build_rocksdb() { .filter(|file| *file != "util/build_version.cc") .collect::<Vec<&'static str>>(); - if cfg!(target_os = "macos") { + if target.contains("x86_64") { + // This is needed to enable hardware CRC32C. Technically, SSE 4.2 is + // only available since Intel Nehalem (about 2010) and AMD Bulldozer + // (about 2011). 
+ config.define("HAVE_PCLMUL", Some("1")); + config.define("HAVE_SSE42", Some("1")); + config.flag("-msse2"); + config.flag("-msse4.1"); + config.flag("-msse4.2"); + config.flag("-mpclmul"); + } + + if target.contains("darwin") { config.define("OS_MACOSX", Some("1")); config.define("ROCKSDB_PLATFORM_POSIX", Some("1")); config.define("ROCKSDB_LIB_IO_POSIX", Some("1")); } - if cfg!(target_os = "linux") { + if target.contains("linux") { config.define("OS_LINUX", Some("1")); config.define("ROCKSDB_PLATFORM_POSIX", Some("1")); config.define("ROCKSDB_LIB_IO_POSIX", Some("1")); // COMMON_FLAGS="$COMMON_FLAGS -fno-builtin-memcmp" } - if cfg!(target_os = "freebsd") { + if target.contains("freebsd") { config.define("OS_FREEBSD", Some("1")); config.define("ROCKSDB_PLATFORM_POSIX", Some("1")); config.define("ROCKSDB_LIB_IO_POSIX", Some("1")); } - if cfg!(target_os = "windows") { + if target.contains("windows") { link("rpcrt4", false); link("shlwapi", false); config.define("OS_WIN", Some("1")); + config.define("ROCKSDB_WINDOWS_UTF8_FILENAMES", Some("1")); // Remove POSIX-specific sources lib_sources = lib_sources @@ -131,7 +146,7 @@ fn build_rocksdb() { lib_sources.push("port/win/win_thread.cc"); } - if cfg!(target_env = "msvc") { + if target.contains("msvc") { config.flag("-EHsc"); } else { config.flag("-std=c++11"); @@ -152,13 +167,15 @@ fn build_rocksdb() { } fn build_snappy() { + let target = env::var("TARGET").unwrap(); + let mut config = cc::Build::new(); config.include("snappy/"); config.include("."); config.define("NDEBUG", Some("1")); - if cfg!(target_env = "msvc") { + if target.contains("msvc") { config.flag("-EHsc"); } else { config.flag("-std=c++11"); diff --git a/src/db.rs b/src/db.rs index 198eec2..261f1f0 100644 --- a/src/db.rs +++ b/src/db.rs @@ -103,6 +103,11 @@ pub struct Snapshot<'a> { inner: *const ffi::rocksdb_snapshot_t, } +/// `Send` and `Sync` implementations for `Snapshot` are safe, because `Snapshot` is +/// immutable and can be safely shared between 
threads. +unsafe impl<'a> Send for Snapshot<'a> {} +unsafe impl<'a> Sync for Snapshot<'a> {} + /// An iterator over a database or column family, with specifiable /// ranges and direction. /// @@ -193,8 +198,6 @@ pub struct DBIterator<'a> { just_seeked: bool, } -unsafe impl<'a> Send for DBIterator<'a> {} - pub enum Direction { Forward, Reverse, @@ -231,11 +234,28 @@ impl<'a> DBRawIterator<'a> { } } - /// Returns true if the iterator is valid. + /// Returns `true` if the iterator is valid. An iterator is invalidated when + /// it reaches the end of its defined range, or when it encounters an error. + /// + /// To check whether the iterator encountered an error after `valid` has + /// returned `false`, use the [`status`](DBRawIterator::status) method. `status` will never + /// return an error when `valid` is `true`. pub fn valid(&self) -> bool { unsafe { ffi::rocksdb_iter_valid(self.inner) != 0 } } + /// Returns an error `Result` if the iterator has encountered an error + /// during operation. When an error is encountered, the iterator is + /// invalidated and [`valid`](DBRawIterator::valid) will return `false` when called. + /// + /// Performing a seek will discard the current status. + pub fn status(&self) -> Result<(), Error> { + unsafe { + ffi_try!(ffi::rocksdb_iter_get_error(self.inner,)); + } + Ok(()) + } + /// Seeks to the first key in the database. /// /// # Examples @@ -511,9 +531,15 @@ impl<'a> DBIterator<'a> { self.just_seeked = true; } + /// See [`valid`](DBRawIterator::valid) pub fn valid(&self) -> bool { self.raw.valid() } + + /// See [`status`](DBRawIterator::status) + pub fn status(&self) -> Result<(), Error> { + self.raw.status() + } } impl<'a> Iterator for DBIterator<'a> { @@ -587,21 +613,25 @@ impl<'a> Snapshot<'a> { DBIterator::new_cf(self.db, cf_handle, &readopts, mode) } + /// Opens a raw iterator over the data in this snapshot, using the default read options. 
pub fn raw_iterator(&self) -> DBRawIterator { let readopts = ReadOptions::default(); self.raw_iterator_opt(readopts) } + /// Opens a raw iterator over the data in this snapshot under the given column family, using the default read options. pub fn raw_iterator_cf(&self, cf_handle: &ColumnFamily) -> Result<DBRawIterator, Error> { let readopts = ReadOptions::default(); self.raw_iterator_cf_opt(cf_handle, readopts) } + /// Opens a raw iterator over the data in this snapshot, using the given read options. pub fn raw_iterator_opt(&self, mut readopts: ReadOptions) -> DBRawIterator { readopts.set_snapshot(self); DBRawIterator::new(self.db, &readopts) } + /// Opens a raw iterator over the data in this snapshot under the given column family, using the given read options. pub fn raw_iterator_cf_opt( &self, cf_handle: &ColumnFamily, @@ -1151,16 +1181,32 @@ impl DB { ) } + /// Opens a raw iterator over the database, using the default read options. pub fn raw_iterator(&self) -> DBRawIterator { let opts = ReadOptions::default(); DBRawIterator::new(self, &opts) } + /// Opens a raw iterator over the given column family, using the default read options. pub fn raw_iterator_cf(&self, cf_handle: &ColumnFamily) -> Result<DBRawIterator, Error> { let opts = ReadOptions::default(); DBRawIterator::new_cf(self, cf_handle, &opts) } + /// Opens a raw iterator over the database, using the given read options. + pub fn raw_iterator_opt(&self, readopts: &ReadOptions) -> DBRawIterator { + DBRawIterator::new(self, readopts) + } + + /// Opens a raw iterator over the given column family, using the given read options. + pub fn raw_iterator_cf_opt( + &self, + cf_handle: &ColumnFamily, + readopts: &ReadOptions, + ) -> Result<DBRawIterator, Error> { + DBRawIterator::new_cf(self, cf_handle, readopts) + } + pub fn snapshot(&self) -> Snapshot { Snapshot::new(self) } @@ -1522,6 +1568,11 @@ impl DB { Err(e) => Err(e), } } + + /// The sequence number of the most recent transaction. 
+ pub fn latest_sequence_number(&self) -> u64 { + unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner) } + } } impl WriteBatch { @@ -1764,8 +1815,15 @@ impl ReadOptions { } } - /// Set the upper bound for an iterator, and the upper bound itself is not included on the iteration result. - pub fn set_iterate_upper_bound<K: AsRef<[u8]>>(&mut self, key: K) { + /// Sets the upper bound for an iterator. + /// The upper bound itself is not included in the iteration result. + /// + /// # Safety + /// + /// This function stores a clone of `key` and passes a raw pointer to that clone to the + /// underlying C++ API. When these options are given to any [`DB`] method (e.g. `DB::iterator_cf_opt`), + /// you must ensure that this [`ReadOptions`] value is not dropped while the result of that call is still in use. + pub unsafe fn set_iterate_upper_bound<K: AsRef<[u8]>>(&mut self, key: K) { let key = key.as_ref(); unsafe { ffi::rocksdb_readoptions_set_iterate_upper_bound( @@ -1812,6 +1870,16 @@ impl Default for ReadOptions { } } +// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI +// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and +// rocksdb internally does not rely on thread-local information for its user-exposed types. +unsafe impl<'a> Send for DBRawIterator<'a> {} +unsafe impl Send for ReadOptions {} + +// Sync is similarly safe for many types because they do not expose interior mutability, and their +// use within the rocksdb library is generally behind a const reference +unsafe impl Sync for ReadOptions {} + /// Vector of bytes stored in the database. /// /// This is a `C` allocated byte array and a length value. 
diff --git a/src/db_options.rs b/src/db_options.rs index 1e72037..890760e 100644 --- a/src/db_options.rs +++ b/src/db_options.rs @@ -34,7 +34,17 @@ pub fn new_cache(capacity: size_t) -> *mut ffi::rocksdb_cache_t { unsafe { ffi::rocksdb_cache_create_lru(capacity) } } +// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI +// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and +// rocksdb internally does not rely on thread-local information for its user-exposed types. unsafe impl Send for Options {} +unsafe impl Send for WriteOptions {} +unsafe impl Send for BlockBasedOptions {} +// Sync is similarly safe for many types because they do not expose interior mutability, and their +// use within the rocksdb library is generally behind a const reference +unsafe impl Sync for Options {} +unsafe impl Sync for WriteOptions {} +unsafe impl Sync for BlockBasedOptions {} impl Drop for Options { fn drop(&mut self) { @@ -603,30 +613,15 @@ impl Options { } } - /// Sets the total maximum number of write buffers to maintain in memory including - /// copies of buffers that have already been flushed. Unlike - /// max_write_buffer_number, this parameter does not affect flushing. - /// This controls the minimum amount of write history that will be available - /// in memory for conflict checking when Transactions are used. + /// Sets the maximum number of write buffers that are built up in memory. + /// The default and the minimum number is 2, so that when 1 write buffer + /// is being flushed to storage, new writes can continue to the other + /// write buffer. + /// If max_write_buffer_number > 3, writing will be slowed down to + /// options.delayed_write_rate if we are writing to the last write buffer + /// allowed. 
/// - /// When using an OptimisticTransactionDB: - /// If this value is too low, some transactions may fail at commit time due - /// to not being able to determine whether there were any write conflicts. - /// - /// When using a TransactionDB: - /// If Transaction::SetSnapshot is used, TransactionDB will read either - /// in-memory write buffers or SST files to do write-conflict checking. - /// Increasing this value can reduce the number of reads to SST files - /// done for conflict detection. - /// - /// Setting this value to `0` will cause write buffers to be freed immediately - /// after they are flushed. - /// If this value is set to `-1`, 'max_write_buffer_number' will be used. - /// - /// Default: - /// If using a TransactionDB/OptimisticTransactionDB, the default value will - /// be set to the value of 'max_write_buffer_number' if it is not explicitly - /// set by the user. Otherwise, the default is 0. + /// Default: `2` /// /// # Example /// @@ -634,7 +629,7 @@ impl Options { /// use rocksdb::Options; /// /// let mut opts = Options::default(); - /// opts.set_min_write_buffer_number(4); + /// opts.set_max_write_buffer_number(4); /// ``` pub fn set_max_write_buffer_number(&mut self, nbuf: c_int) { unsafe { diff --git a/src/lib.rs b/src/lib.rs index c943f6e..0cc760c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -286,3 +286,49 @@ pub struct ColumnFamily { } unsafe impl Send for ColumnFamily {} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn is_send() { + // test (at compile time) that certain types implement the auto-trait Send, either directly for + // pointer-wrapping types or transitively for types with all Send fields + + fn is_send() { + // dummy function just used for its parameterized type bound + } + + is_send::(); + is_send::>(); + is_send::>(); + is_send::(); + is_send::(); + is_send::(); + is_send::(); + is_send::(); + is_send::(); + is_send::(); + is_send::(); + } + + #[test] + fn is_sync() { + // test (at compile time) that certain types 
implement the auto-trait Sync + + fn is_sync() { + // dummy function just used for its parameterized type bound + } + + is_sync::(); + is_sync::(); + is_sync::(); + is_sync::(); + is_sync::(); + is_sync::(); + is_sync::(); + is_sync::(); + } + +} diff --git a/tests/test_db.rs b/tests/test_db.rs index 47dac85..a2da749 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -19,7 +19,9 @@ mod util; use libc::size_t; -use rocksdb::{DBVector, Error, IteratorMode, Options, WriteBatch, DB}; +use rocksdb::{DBVector, Error, IteratorMode, Options, Snapshot, WriteBatch, DB}; +use std::sync::Arc; +use std::{mem, thread}; use util::DBPath; #[test] @@ -163,6 +165,45 @@ fn snapshot_test() { } } +#[derive(Clone)] +struct SnapshotWrapper { + snapshot: Arc>, +} + +impl SnapshotWrapper { + fn new(db: &DB) -> Self { + Self { + snapshot: Arc::new(unsafe { mem::transmute(db.snapshot()) }), + } + } + + fn check(&self, key: K, value: &str) -> bool + where + K: AsRef<[u8]>, + { + self.snapshot.get(key).unwrap().unwrap().to_utf8().unwrap() == value + } +} + +#[test] +fn sync_snapshot_test() { + let path = DBPath::new("_rust_rocksdb_sync_snapshottest"); + let db = DB::open_default(&path).unwrap(); + + assert!(db.put(b"k1", b"v1").is_ok()); + assert!(db.put(b"k2", b"v2").is_ok()); + + let wrapper = SnapshotWrapper::new(&db); + let wrapper_1 = wrapper.clone(); + let handler_1 = thread::spawn(move || wrapper_1.check("k1", "v1")); + + let wrapper_2 = wrapper.clone(); + let handler_2 = thread::spawn(move || wrapper_2.check("k2", "v2")); + + assert!(handler_1.join().unwrap()); + assert!(handler_2.join().unwrap()); +} + #[test] fn set_option_test() { let path = DBPath::new("_rust_rocksdb_set_optionstest"); @@ -199,3 +240,14 @@ fn set_option_test() { db.set_options(&multiple_options).unwrap(); } } + +#[test] +fn test_sequence_number() { + let path = DBPath::new("_rust_rocksdb_test_sequence_number"); + { + let db = DB::open_default(&path).unwrap(); + assert_eq!(db.latest_sequence_number(), 0); + 
db.put(b"key", b"value"); + assert_eq!(db.latest_sequence_number(), 1); + } +}