Merge branch 'master' into mutable_cf_map

# Conflicts:
#	src/db.rs
#	src/lib.rs
master
Oleksandr Anyshchenko 5 years ago
commit 165178989f
  1. 11
      CHANGELOG.md
  2. 2
      Cargo.toml
  3. 29
      librocksdb-sys/build.rs
  4. 78
      src/db.rs
  5. 43
      src/db_options.rs
  6. 46
      src/lib.rs
  7. 54
      tests/test_db.rs

@ -1,5 +1,16 @@
# Changelog # Changelog
## Unreleased
### 0.12.3 (2019-07-19)
* Enabled sse4.2/pclmul for accelerated crc32c (yjh0502)
* Added `set_db_write_buffer_size` to the Options API (rnarubin)
* Bumped RocksDB to 6.1.2 (lispy)
* Added `Sync` and `Send` implementations to `Snapshot` (pavel-mukhanov)
* Added `raw_iterator_cf_opt` to the DB API (rnarubin)
* Added `DB::latest_sequence_number` method (vitvakatu)
## 0.12.2 (2019-05-03) ## 0.12.2 (2019-05-03)
### Changes ### Changes

@ -1,7 +1,7 @@
[package] [package]
name = "rocksdb" name = "rocksdb"
description = "Rust wrapper for Facebook's RocksDB embeddable database" description = "Rust wrapper for Facebook's RocksDB embeddable database"
version = "0.12.2" version = "0.12.3"
authors = ["Tyler Neely <t@jujit.su>", "David Greenberg <dsg123456789@gmail.com>"] authors = ["Tyler Neely <t@jujit.su>", "David Greenberg <dsg123456789@gmail.com>"]
license = "Apache-2.0" license = "Apache-2.0"
keywords = ["database", "embedded", "LSM-tree", "persistence"] keywords = ["database", "embedded", "LSM-tree", "persistence"]

@ -45,6 +45,8 @@ fn bindgen_rocksdb() {
} }
fn build_rocksdb() { fn build_rocksdb() {
let target = env::var("TARGET").unwrap();
let mut config = cc::Build::new(); let mut config = cc::Build::new();
config.include("rocksdb/include/"); config.include("rocksdb/include/");
config.include("rocksdb/"); config.include("rocksdb/");
@ -90,27 +92,40 @@ fn build_rocksdb() {
.filter(|file| *file != "util/build_version.cc") .filter(|file| *file != "util/build_version.cc")
.collect::<Vec<&'static str>>(); .collect::<Vec<&'static str>>();
if cfg!(target_os = "macos") { if target.contains("x86_64") {
// This is needed to enable hardware CRC32C. Technically, SSE 4.2 is
// only available since Intel Nehalem (about 2010) and AMD Bulldozer
// (about 2011).
config.define("HAVE_PCLMUL", Some("1"));
config.define("HAVE_SSE42", Some("1"));
config.flag("-msse2");
config.flag("-msse4.1");
config.flag("-msse4.2");
config.flag("-mpclmul");
}
if target.contains("darwin") {
config.define("OS_MACOSX", Some("1")); config.define("OS_MACOSX", Some("1"));
config.define("ROCKSDB_PLATFORM_POSIX", Some("1")); config.define("ROCKSDB_PLATFORM_POSIX", Some("1"));
config.define("ROCKSDB_LIB_IO_POSIX", Some("1")); config.define("ROCKSDB_LIB_IO_POSIX", Some("1"));
} }
if cfg!(target_os = "linux") { if target.contains("linux") {
config.define("OS_LINUX", Some("1")); config.define("OS_LINUX", Some("1"));
config.define("ROCKSDB_PLATFORM_POSIX", Some("1")); config.define("ROCKSDB_PLATFORM_POSIX", Some("1"));
config.define("ROCKSDB_LIB_IO_POSIX", Some("1")); config.define("ROCKSDB_LIB_IO_POSIX", Some("1"));
// COMMON_FLAGS="$COMMON_FLAGS -fno-builtin-memcmp" // COMMON_FLAGS="$COMMON_FLAGS -fno-builtin-memcmp"
} }
if cfg!(target_os = "freebsd") { if target.contains("freebsd") {
config.define("OS_FREEBSD", Some("1")); config.define("OS_FREEBSD", Some("1"));
config.define("ROCKSDB_PLATFORM_POSIX", Some("1")); config.define("ROCKSDB_PLATFORM_POSIX", Some("1"));
config.define("ROCKSDB_LIB_IO_POSIX", Some("1")); config.define("ROCKSDB_LIB_IO_POSIX", Some("1"));
} }
if cfg!(target_os = "windows") { if target.contains("windows") {
link("rpcrt4", false); link("rpcrt4", false);
link("shlwapi", false); link("shlwapi", false);
config.define("OS_WIN", Some("1")); config.define("OS_WIN", Some("1"));
config.define("ROCKSDB_WINDOWS_UTF8_FILENAMES", Some("1"));
// Remove POSIX-specific sources // Remove POSIX-specific sources
lib_sources = lib_sources lib_sources = lib_sources
@ -131,7 +146,7 @@ fn build_rocksdb() {
lib_sources.push("port/win/win_thread.cc"); lib_sources.push("port/win/win_thread.cc");
} }
if cfg!(target_env = "msvc") { if target.contains("msvc") {
config.flag("-EHsc"); config.flag("-EHsc");
} else { } else {
config.flag("-std=c++11"); config.flag("-std=c++11");
@ -152,13 +167,15 @@ fn build_rocksdb() {
} }
fn build_snappy() { fn build_snappy() {
let target = env::var("TARGET").unwrap();
let mut config = cc::Build::new(); let mut config = cc::Build::new();
config.include("snappy/"); config.include("snappy/");
config.include("."); config.include(".");
config.define("NDEBUG", Some("1")); config.define("NDEBUG", Some("1"));
if cfg!(target_env = "msvc") { if target.contains("msvc") {
config.flag("-EHsc"); config.flag("-EHsc");
} else { } else {
config.flag("-std=c++11"); config.flag("-std=c++11");

@ -103,6 +103,11 @@ pub struct Snapshot<'a> {
inner: *const ffi::rocksdb_snapshot_t, inner: *const ffi::rocksdb_snapshot_t,
} }
/// `Send` and `Sync` implementations for `Snapshot` are safe, because `Snapshot` is
/// immutable and can be safely shared between threads.
unsafe impl<'a> Send for Snapshot<'a> {}
unsafe impl<'a> Sync for Snapshot<'a> {}
/// An iterator over a database or column family, with specifiable /// An iterator over a database or column family, with specifiable
/// ranges and direction. /// ranges and direction.
/// ///
@ -193,8 +198,6 @@ pub struct DBIterator<'a> {
just_seeked: bool, just_seeked: bool,
} }
unsafe impl<'a> Send for DBIterator<'a> {}
pub enum Direction { pub enum Direction {
Forward, Forward,
Reverse, Reverse,
@ -231,11 +234,28 @@ impl<'a> DBRawIterator<'a> {
} }
} }
/// Returns true if the iterator is valid. /// Returns `true` if the iterator is valid. An iterator is invalidated when
/// it reaches the end of its defined range, or when it encounters an error.
///
/// To check whether the iterator encountered an error after `valid` has
/// returned `false`, use the [`status`](DBRawIterator::status) method. `status` will never
/// return an error when `valid` is `true`.
pub fn valid(&self) -> bool { pub fn valid(&self) -> bool {
unsafe { ffi::rocksdb_iter_valid(self.inner) != 0 } unsafe { ffi::rocksdb_iter_valid(self.inner) != 0 }
} }
/// Returns an error `Result` if the iterator has encountered an error
/// during operation. When an error is encountered, the iterator is
/// invalidated and [`valid`](DBRawIterator::valid) will return `false` when called.
///
/// Performing a seek will discard the current status.
pub fn status(&self) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_iter_get_error(self.inner,));
}
Ok(())
}
/// Seeks to the first key in the database. /// Seeks to the first key in the database.
/// ///
/// # Examples /// # Examples
@ -511,9 +531,15 @@ impl<'a> DBIterator<'a> {
self.just_seeked = true; self.just_seeked = true;
} }
/// See [`valid`](DBRawIterator::valid)
pub fn valid(&self) -> bool { pub fn valid(&self) -> bool {
self.raw.valid() self.raw.valid()
} }
/// See [`status`](DBRawIterator::status)
pub fn status(&self) -> Result<(), Error> {
self.raw.status()
}
} }
impl<'a> Iterator for DBIterator<'a> { impl<'a> Iterator for DBIterator<'a> {
@ -587,21 +613,25 @@ impl<'a> Snapshot<'a> {
DBIterator::new_cf(self.db, cf_handle, &readopts, mode) DBIterator::new_cf(self.db, cf_handle, &readopts, mode)
} }
/// Opens a raw iterator over the data in this snapshot, using the default read options.
pub fn raw_iterator(&self) -> DBRawIterator { pub fn raw_iterator(&self) -> DBRawIterator {
let readopts = ReadOptions::default(); let readopts = ReadOptions::default();
self.raw_iterator_opt(readopts) self.raw_iterator_opt(readopts)
} }
/// Opens a raw iterator over the data in this snapshot under the given column family, using the default read options.
pub fn raw_iterator_cf(&self, cf_handle: &ColumnFamily) -> Result<DBRawIterator, Error> { pub fn raw_iterator_cf(&self, cf_handle: &ColumnFamily) -> Result<DBRawIterator, Error> {
let readopts = ReadOptions::default(); let readopts = ReadOptions::default();
self.raw_iterator_cf_opt(cf_handle, readopts) self.raw_iterator_cf_opt(cf_handle, readopts)
} }
/// Opens a raw iterator over the data in this snapshot, using the given read options.
pub fn raw_iterator_opt(&self, mut readopts: ReadOptions) -> DBRawIterator { pub fn raw_iterator_opt(&self, mut readopts: ReadOptions) -> DBRawIterator {
readopts.set_snapshot(self); readopts.set_snapshot(self);
DBRawIterator::new(self.db, &readopts) DBRawIterator::new(self.db, &readopts)
} }
/// Opens a raw iterator over the data in this snapshot under the given column family, using the given read options.
pub fn raw_iterator_cf_opt( pub fn raw_iterator_cf_opt(
&self, &self,
cf_handle: &ColumnFamily, cf_handle: &ColumnFamily,
@ -1151,16 +1181,32 @@ impl DB {
) )
} }
/// Opens a raw iterator over the database, using the default read options
pub fn raw_iterator(&self) -> DBRawIterator { pub fn raw_iterator(&self) -> DBRawIterator {
let opts = ReadOptions::default(); let opts = ReadOptions::default();
DBRawIterator::new(self, &opts) DBRawIterator::new(self, &opts)
} }
/// Opens a raw iterator over the given column family, using the default read options
pub fn raw_iterator_cf(&self, cf_handle: &ColumnFamily) -> Result<DBRawIterator, Error> { pub fn raw_iterator_cf(&self, cf_handle: &ColumnFamily) -> Result<DBRawIterator, Error> {
let opts = ReadOptions::default(); let opts = ReadOptions::default();
DBRawIterator::new_cf(self, cf_handle, &opts) DBRawIterator::new_cf(self, cf_handle, &opts)
} }
/// Opens a raw iterator over the database, using the given read options
pub fn raw_iterator_opt(&self, readopts: &ReadOptions) -> DBRawIterator {
DBRawIterator::new(self, readopts)
}
/// Opens a raw iterator over the given column family, using the given read options
pub fn raw_iterator_cf_opt(
&self,
cf_handle: &ColumnFamily,
readopts: &ReadOptions,
) -> Result<DBRawIterator, Error> {
DBRawIterator::new_cf(self, cf_handle, readopts)
}
pub fn snapshot(&self) -> Snapshot { pub fn snapshot(&self) -> Snapshot {
Snapshot::new(self) Snapshot::new(self)
} }
@ -1522,6 +1568,11 @@ impl DB {
Err(e) => Err(e), Err(e) => Err(e),
} }
} }
/// The sequence number of the most recent transaction.
pub fn latest_sequence_number(&self) -> u64 {
unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner) }
}
} }
impl WriteBatch { impl WriteBatch {
@ -1764,8 +1815,15 @@ impl ReadOptions {
} }
} }
/// Set the upper bound for an iterator, and the upper bound itself is not included on the iteration result. /// Set the upper bound for an iterator.
pub fn set_iterate_upper_bound<K: AsRef<[u8]>>(&mut self, key: K) { /// The upper bound itself is not included on the iteration result.
///
/// # Safety
///
/// This function will store a clone of key and will give a raw pointer of it to the
/// underlying C++ API, therefore, when given to any other [`DB`] method you must ensure
/// that this [`ReadOptions`] value does not leave the scope too early (e.g. `DB::iterator_cf_opt`).
pub unsafe fn set_iterate_upper_bound<K: AsRef<[u8]>>(&mut self, key: K) {
let key = key.as_ref(); let key = key.as_ref();
unsafe { unsafe {
ffi::rocksdb_readoptions_set_iterate_upper_bound( ffi::rocksdb_readoptions_set_iterate_upper_bound(
@ -1812,6 +1870,16 @@ impl Default for ReadOptions {
} }
} }
// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI
// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and
// rocksdb internally does not rely on thread-local information for its user-exposed types.
unsafe impl<'a> Send for DBRawIterator<'a> {}
unsafe impl Send for ReadOptions {}
// Sync is similarly safe for many types because they do not expose interior mutability, and their
// use within the rocksdb library is generally behind a const reference
unsafe impl Sync for ReadOptions {}
/// Vector of bytes stored in the database. /// Vector of bytes stored in the database.
/// ///
/// This is a `C` allocated byte array and a length value. /// This is a `C` allocated byte array and a length value.

@ -34,7 +34,17 @@ pub fn new_cache(capacity: size_t) -> *mut ffi::rocksdb_cache_t {
unsafe { ffi::rocksdb_cache_create_lru(capacity) } unsafe { ffi::rocksdb_cache_create_lru(capacity) }
} }
// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI
// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and
// rocksdb internally does not rely on thread-local information for its user-exposed types.
unsafe impl Send for Options {} unsafe impl Send for Options {}
unsafe impl Send for WriteOptions {}
unsafe impl Send for BlockBasedOptions {}
// Sync is similarly safe for many types because they do not expose interior mutability, and their
// use within the rocksdb library is generally behind a const reference
unsafe impl Sync for Options {}
unsafe impl Sync for WriteOptions {}
unsafe impl Sync for BlockBasedOptions {}
impl Drop for Options { impl Drop for Options {
fn drop(&mut self) { fn drop(&mut self) {
@ -603,30 +613,15 @@ impl Options {
} }
} }
/// Sets the total maximum number of write buffers to maintain in memory including /// Sets the maximum number of write buffers that are built up in memory.
/// copies of buffers that have already been flushed. Unlike /// The default and the minimum number is 2, so that when 1 write buffer
/// max_write_buffer_number, this parameter does not affect flushing. /// is being flushed to storage, new writes can continue to the other
/// This controls the minimum amount of write history that will be available /// write buffer.
/// in memory for conflict checking when Transactions are used. /// If max_write_buffer_number > 3, writing will be slowed down to
/// options.delayed_write_rate if we are writing to the last write buffer
/// allowed.
/// ///
/// When using an OptimisticTransactionDB: /// Default: `2`
/// If this value is too low, some transactions may fail at commit time due
/// to not being able to determine whether there were any write conflicts.
///
/// When using a TransactionDB:
/// If Transaction::SetSnapshot is used, TransactionDB will read either
/// in-memory write buffers or SST files to do write-conflict checking.
/// Increasing this value can reduce the number of reads to SST files
/// done for conflict detection.
///
/// Setting this value to `0` will cause write buffers to be freed immediately
/// after they are flushed.
/// If this value is set to `-1`, 'max_write_buffer_number' will be used.
///
/// Default:
/// If using a TransactionDB/OptimisticTransactionDB, the default value will
/// be set to the value of 'max_write_buffer_number' if it is not explicitly
/// set by the user. Otherwise, the default is 0.
/// ///
/// # Example /// # Example
/// ///
@ -634,7 +629,7 @@ impl Options {
/// use rocksdb::Options; /// use rocksdb::Options;
/// ///
/// let mut opts = Options::default(); /// let mut opts = Options::default();
/// opts.set_min_write_buffer_number(4); /// opts.set_max_write_buffer_number(4);
/// ``` /// ```
pub fn set_max_write_buffer_number(&mut self, nbuf: c_int) { pub fn set_max_write_buffer_number(&mut self, nbuf: c_int) {
unsafe { unsafe {

@ -286,3 +286,49 @@ pub struct ColumnFamily {
} }
unsafe impl Send for ColumnFamily {} unsafe impl Send for ColumnFamily {}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn is_send() {
// test (at compile time) that certain types implement the auto-trait Send, either directly for
// pointer-wrapping types or transitively for types with all Send fields
fn is_send<T: Send>() {
// dummy function just used for its parameterized type bound
}
is_send::<DB>();
is_send::<DBIterator<'_>>();
is_send::<DBRawIterator<'_>>();
is_send::<Snapshot>();
is_send::<Options>();
is_send::<ReadOptions>();
is_send::<WriteOptions>();
is_send::<BlockBasedOptions>();
is_send::<PlainTableFactoryOptions>();
is_send::<ColumnFamilyDescriptor>();
is_send::<ColumnFamily>();
}
#[test]
fn is_sync() {
// test (at compile time) that certain types implement the auto-trait Sync
fn is_sync<T: Sync>() {
// dummy function just used for its parameterized type bound
}
is_sync::<DB>();
is_sync::<Snapshot>();
is_sync::<Options>();
is_sync::<ReadOptions>();
is_sync::<WriteOptions>();
is_sync::<BlockBasedOptions>();
is_sync::<PlainTableFactoryOptions>();
is_sync::<ColumnFamilyDescriptor>();
}
}

@ -19,7 +19,9 @@ mod util;
use libc::size_t; use libc::size_t;
use rocksdb::{DBVector, Error, IteratorMode, Options, WriteBatch, DB}; use rocksdb::{DBVector, Error, IteratorMode, Options, Snapshot, WriteBatch, DB};
use std::sync::Arc;
use std::{mem, thread};
use util::DBPath; use util::DBPath;
#[test] #[test]
@ -163,6 +165,45 @@ fn snapshot_test() {
} }
} }
#[derive(Clone)]
struct SnapshotWrapper {
snapshot: Arc<Snapshot<'static>>,
}
impl SnapshotWrapper {
fn new(db: &DB) -> Self {
Self {
snapshot: Arc::new(unsafe { mem::transmute(db.snapshot()) }),
}
}
fn check<K>(&self, key: K, value: &str) -> bool
where
K: AsRef<[u8]>,
{
self.snapshot.get(key).unwrap().unwrap().to_utf8().unwrap() == value
}
}
#[test]
fn sync_snapshot_test() {
let path = DBPath::new("_rust_rocksdb_sync_snapshottest");
let db = DB::open_default(&path).unwrap();
assert!(db.put(b"k1", b"v1").is_ok());
assert!(db.put(b"k2", b"v2").is_ok());
let wrapper = SnapshotWrapper::new(&db);
let wrapper_1 = wrapper.clone();
let handler_1 = thread::spawn(move || wrapper_1.check("k1", "v1"));
let wrapper_2 = wrapper.clone();
let handler_2 = thread::spawn(move || wrapper_2.check("k2", "v2"));
assert!(handler_1.join().unwrap());
assert!(handler_2.join().unwrap());
}
#[test] #[test]
fn set_option_test() { fn set_option_test() {
let path = DBPath::new("_rust_rocksdb_set_optionstest"); let path = DBPath::new("_rust_rocksdb_set_optionstest");
@ -199,3 +240,14 @@ fn set_option_test() {
db.set_options(&multiple_options).unwrap(); db.set_options(&multiple_options).unwrap();
} }
} }
#[test]
fn test_sequence_number() {
let path = DBPath::new("_rust_rocksdb_test_sequence_number");
{
let db = DB::open_default(&path).unwrap();
assert_eq!(db.latest_sequence_number(), 0);
db.put(b"key", b"value");
assert_eq!(db.latest_sequence_number(), 1);
}
}

Loading…
Cancel
Save