From 10b1ef7f8540d065a9e603413a56017307079095 Mon Sep 17 00:00:00 2001 From: pavel-mukhanov Date: Tue, 25 Jun 2019 12:24:15 +0300 Subject: [PATCH 1/7] Implement Send and Sync for Snapshot. --- src/db.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/db.rs b/src/db.rs index ca8430c..c799b01 100644 --- a/src/db.rs +++ b/src/db.rs @@ -104,6 +104,9 @@ pub struct Snapshot<'a> { inner: *const ffi::rocksdb_snapshot_t, } +unsafe impl<'a> Send for Snapshot<'a> {} +unsafe impl<'a> Sync for Snapshot<'a> {} + /// An iterator over a database or column family, with specifiable /// ranges and direction. /// From e08fe8f6e066587b1d47f35d4787f9626385788e Mon Sep 17 00:00:00 2001 From: pavel-mukhanov Date: Wed, 26 Jun 2019 16:04:01 +0300 Subject: [PATCH 2/7] Add test for sync snapshot. --- tests/test_db.rs | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tests/test_db.rs b/tests/test_db.rs index 47dac85..f0ec54c 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(thread_spawn_unchecked)] + extern crate libc; extern crate rocksdb; @@ -21,6 +23,7 @@ use libc::size_t; use rocksdb::{DBVector, Error, IteratorMode, Options, WriteBatch, DB}; use util::DBPath; +use std::thread; #[test] fn test_db_vector() { @@ -163,6 +166,36 @@ fn snapshot_test() { } } +#[test] +fn sync_snapshot_test() { + let path = DBPath::new("_rust_rocksdb_sync_snapshottest"); + { + let db = DB::open_default(&path).unwrap(); + + assert!(db.put(b"k1", b"v1").is_ok()); + assert!(db.put(b"k2", b"v2").is_ok()); + + let snapshot = db.snapshot(); + + // Unsafe here is safe, because `handler.join()` is called at the end of the + // method to ensure that snapshot will not outlive database. + let handler_1 = unsafe { + thread::Builder::new().spawn_unchecked(|| { + assert_eq!(snapshot.get(b"k1").unwrap().unwrap().to_utf8().unwrap(), "v1"); + }).unwrap() + }; + + let handler_2 = unsafe { + thread::Builder::new().spawn_unchecked(|| { + assert_eq!(snapshot.get(b"k2").unwrap().unwrap().to_utf8().unwrap(), "v2"); + }).unwrap() + }; + + handler_1.join().unwrap(); + handler_2.join().unwrap(); + } +} + #[test] fn set_option_test() { let path = DBPath::new("_rust_rocksdb_set_optionstest"); From c8d9f6e56f17d546355960e152e38337089fc0d2 Mon Sep 17 00:00:00 2001 From: pavel-mukhanov Date: Mon, 1 Jul 2019 13:51:12 +0300 Subject: [PATCH 3/7] Fix fmt. --- tests/test_db.rs | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/tests/test_db.rs b/tests/test_db.rs index f0ec54c..151513b 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -22,8 +22,8 @@ mod util; use libc::size_t; use rocksdb::{DBVector, Error, IteratorMode, Options, WriteBatch, DB}; -use util::DBPath; use std::thread; +use util::DBPath; #[test] fn test_db_vector() { @@ -180,15 +180,25 @@ fn sync_snapshot_test() { // Unsafe here is safe, because `handler.join()` is called at the end of the // method to ensure that snapshot will not outlive database. let handler_1 = unsafe { - thread::Builder::new().spawn_unchecked(|| { - assert_eq!(snapshot.get(b"k1").unwrap().unwrap().to_utf8().unwrap(), "v1"); - }).unwrap() + thread::Builder::new() + .spawn_unchecked(|| { + assert_eq!( + snapshot.get(b"k1").unwrap().unwrap().to_utf8().unwrap(), + "v1" + ); + }) + .unwrap() }; let handler_2 = unsafe { - thread::Builder::new().spawn_unchecked(|| { - assert_eq!(snapshot.get(b"k2").unwrap().unwrap().to_utf8().unwrap(), "v2"); - }).unwrap() + thread::Builder::new() + .spawn_unchecked(|| { + assert_eq!( + snapshot.get(b"k2").unwrap().unwrap().to_utf8().unwrap(), + "v2" + ); + }) + .unwrap() }; handler_1.join().unwrap(); From 7449fda48e7b82d1ec57a418073a3ba7aa558f37 Mon Sep 17 00:00:00 2001 From: pavel-mukhanov Date: Mon, 1 Jul 2019 14:29:54 +0300 Subject: [PATCH 4/7] Rewrite sync_snapshot_test without unchecked threads. --- tests/test_db.rs | 71 ++++++++++++++++++++++-------------------------- 1 file changed, 33 insertions(+), 38 deletions(-) diff --git a/tests/test_db.rs b/tests/test_db.rs index 151513b..d8403b3 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(thread_spawn_unchecked)] - extern crate libc; extern crate rocksdb; @@ -21,8 +19,8 @@ mod util; use libc::size_t; -use rocksdb::{DBVector, Error, IteratorMode, Options, WriteBatch, DB}; -use std::thread; +use rocksdb::{DBVector, Error, IteratorMode, Options, Snapshot, WriteBatch, DB}; +use std::{mem, thread}; use util::DBPath; #[test] @@ -166,44 +164,41 @@ fn snapshot_test() { } } +struct SnapshotWrapper { + snapshot: Snapshot<'static>, +} + +impl SnapshotWrapper { + fn new(db: &DB) -> Self { + Self { + snapshot: unsafe { mem::transmute(db.snapshot()) }, + } + } + + fn check(&self, key: K, value: &str) -> bool + where + K: AsRef<[u8]>, + { + self.snapshot.get(key).unwrap().unwrap().to_utf8().unwrap() == value + } +} + #[test] fn sync_snapshot_test() { let path = DBPath::new("_rust_rocksdb_sync_snapshottest"); - { - let db = DB::open_default(&path).unwrap(); + let db = DB::open_default(&path).unwrap(); - assert!(db.put(b"k1", b"v1").is_ok()); - assert!(db.put(b"k2", b"v2").is_ok()); - - let snapshot = db.snapshot(); - - // Unsafe here is safe, because `handler.join()` is called at the end of the - // method to ensure that snapshot will not outlive database. - let handler_1 = unsafe { - thread::Builder::new() - .spawn_unchecked(|| { - assert_eq!( - snapshot.get(b"k1").unwrap().unwrap().to_utf8().unwrap(), - "v1" - ); - }) - .unwrap() - }; - - let handler_2 = unsafe { - thread::Builder::new() - .spawn_unchecked(|| { - assert_eq!( - snapshot.get(b"k2").unwrap().unwrap().to_utf8().unwrap(), - "v2" - ); - }) - .unwrap() - }; - - handler_1.join().unwrap(); - handler_2.join().unwrap(); - } + assert!(db.put(b"k1", b"v1").is_ok()); + assert!(db.put(b"k2", b"v2").is_ok()); + + let wrapper = SnapshotWrapper::new(&db); + let handler_1 = thread::spawn(move || wrapper.check("k1", "v1")); + + let wrapper = SnapshotWrapper::new(&db); + let handler_2 = thread::spawn(move || wrapper.check("k2", "v2")); + + assert!(handler_1.join().unwrap()); + assert!(handler_2.join().unwrap()); } #[test] From bc63f2b057e7015e8e34444d8c86c68aa58d11d4 Mon Sep 17 00:00:00 2001 From: pavel-mukhanov Date: Mon, 1 Jul 2019 14:34:13 +0300 Subject: [PATCH 5/7] Clone snapshot in test instead of create new. --- tests/test_db.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/test_db.rs b/tests/test_db.rs index d8403b3..4240d96 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -20,6 +20,7 @@ mod util; use libc::size_t; use rocksdb::{DBVector, Error, IteratorMode, Options, Snapshot, WriteBatch, DB}; +use std::sync::Arc; use std::{mem, thread}; use util::DBPath; @@ -164,14 +165,15 @@ fn snapshot_test() { } } +#[derive(Clone)] struct SnapshotWrapper { - snapshot: Snapshot<'static>, + snapshot: Arc>, } impl SnapshotWrapper { fn new(db: &DB) -> Self { Self { - snapshot: unsafe { mem::transmute(db.snapshot()) }, + snapshot: Arc::new(unsafe { mem::transmute(db.snapshot()) }), } } @@ -192,10 +194,11 @@ fn sync_snapshot_test() { assert!(db.put(b"k2", b"v2").is_ok()); let wrapper = SnapshotWrapper::new(&db); - let handler_1 = thread::spawn(move || wrapper.check("k1", "v1")); + let wrapper_1 = wrapper.clone(); + let handler_1 = thread::spawn(move || wrapper_1.check("k1", "v1")); - let wrapper = SnapshotWrapper::new(&db); - let handler_2 = thread::spawn(move || wrapper.check("k2", "v2")); + let wrapper_2 = wrapper.clone(); + let handler_2 = thread::spawn(move || wrapper_2.check("k2", "v2")); assert!(handler_1.join().unwrap()); assert!(handler_2.join().unwrap()); From 892dea0fb7b755dd78d27ac863a823db6bf1d2be Mon Sep 17 00:00:00 2001 From: pavel-mukhanov Date: Tue, 9 Jul 2019 12:49:03 +0300 Subject: [PATCH 6/7] Add changelog and comment. --- CHANGELOG.md | 6 ++++++ src/db.rs | 2 ++ 2 files changed, 8 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 117b034..be588ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## 0.12.3 + +### Changes + +* Added `Sync` and `Send` implementations to `Snapshot` (pavel-mukhanov) + ## 0.12.2 (2019-05-03) ### Changes diff --git a/src/db.rs b/src/db.rs index c799b01..c59a871 100644 --- a/src/db.rs +++ b/src/db.rs @@ -104,6 +104,8 @@ pub struct Snapshot<'a> { inner: *const ffi::rocksdb_snapshot_t, } +/// `Send` and `Sync` implementations for `Snapshot` are safe, because `Snapshot` is +/// immutable and can be safely shared between threads. unsafe impl<'a> Send for Snapshot<'a> {} unsafe impl<'a> Sync for Snapshot<'a> {} From b5cca05e0193adb39085117b745a3f082c59979a Mon Sep 17 00:00:00 2001 From: pavel-mukhanov Date: Tue, 9 Jul 2019 13:14:59 +0300 Subject: [PATCH 7/7] Tiny changelog fix. --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index be588ba..91928dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## 0.12.3 +## Unreleased ### Changes