Exposes more of RocksDB internal Status struct

pull/190/head
Tpt 3 years ago
parent 1e3fe90c5e
commit 18ec80c362
  1. 48
      Cargo.lock
  2. 129
      lib/src/storage/backend/rocksdb.rs
  3. 135
      rocksdb-sys/api/c.cc
  4. 120
      rocksdb-sys/api/c.h

48
Cargo.lock generated

@ -292,9 +292,9 @@ dependencies = [
[[package]] [[package]]
name = "crypto-common" name = "crypto-common"
version = "0.1.0" version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567569e659735adb39ff2d4c20600f7cd78be5471f8c58ab162bce3c03fdbc5f" checksum = "683d6b536309245c849479fba3da410962a43ed8e51c26b729208ec0ac2798d0"
dependencies = [ dependencies = [
"generic-array", "generic-array",
] ]
@ -307,7 +307,7 @@ checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1"
dependencies = [ dependencies = [
"bstr", "bstr",
"csv-core", "csv-core",
"itoa", "itoa 0.4.8",
"ryu", "ryu",
"serde", "serde",
] ]
@ -333,9 +333,9 @@ dependencies = [
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.10.0" version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8549e6bfdecd113b7e221fe60b433087f6957387a20f8118ebca9b12af19143d" checksum = "b697d66081d42af4fba142d56918a3cb21dc8eb63372c6b85d14f44fb9c5979b"
dependencies = [ dependencies = [
"block-buffer", "block-buffer",
"crypto-common", "crypto-common",
@ -524,6 +524,12 @@ version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]]
name = "itoa"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
[[package]] [[package]]
name = "jobserver" name = "jobserver"
version = "0.1.24" version = "0.1.24"
@ -582,9 +588,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.111" version = "0.2.112"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e167738f1866a7ec625567bae89ca0d44477232a4f7c52b1c7f2adc2c98804f" checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125"
[[package]] [[package]]
name = "libloading" name = "libloading"
@ -706,9 +712,9 @@ dependencies = [
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.8.0" version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5"
[[package]] [[package]]
name = "oorandom" name = "oorandom"
@ -984,9 +990,9 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.33" version = "1.0.34"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb37d2df5df740e582f28f8560cf425f52bb267d872fe58358eadb554909f07a" checksum = "2f84e92c0f7c9d58328b85a78557813e4bd845130db68d7184635344399423b1"
dependencies = [ dependencies = [
"unicode-xid", "unicode-xid",
] ]
@ -1272,9 +1278,9 @@ dependencies = [
[[package]] [[package]]
name = "ryu" name = "ryu"
version = "1.0.8" version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b30e4c09749c107e83dd61baf9604198efc4542863c88af39dafcaca89c7c9f9" checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f"
[[package]] [[package]]
name = "same-file" name = "same-file"
@ -1348,9 +1354,9 @@ checksum = "568a8e6258aa33c13358f81fd834adb854c6f7c9468520910a9b1e8fac068012"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.131" version = "1.0.132"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4ad69dfbd3e45369132cc64e6748c2d65cdfb001a2b1c232d128b4ad60561c1" checksum = "8b9875c23cf305cd1fd7eb77234cbb705f21ea6a72c637a5c6db5fe4b8e7f008"
[[package]] [[package]]
name = "serde_cbor" name = "serde_cbor"
@ -1364,9 +1370,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.131" version = "1.0.132"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b710a83c4e0dff6a3d511946b95274ad9ca9e5d3ae497b63fda866ac955358d2" checksum = "ecc0db5cb2556c0e558887d9bbdcf6ac4471e83ff66cf696e5419024d1606276"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -1375,11 +1381,11 @@ dependencies = [
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.72" version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0ffa0837f2dfa6fb90868c2b5468cad482e175f7dad97e7421951e663f2b527" checksum = "bcbd0344bc6533bc7ec56df11d42fb70f1b912351c0825ccb7211b59d8af7cf5"
dependencies = [ dependencies = [
"itoa", "itoa 1.0.1",
"ryu", "ryu",
"serde", "serde",
] ]
@ -1561,7 +1567,7 @@ version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41effe7cfa8af36f439fac33861b66b049edc6f9a32331e2312660529c1c24ad" checksum = "41effe7cfa8af36f439fac33861b66b049edc6f9a32331e2312660529c1c24ad"
dependencies = [ dependencies = [
"itoa", "itoa 0.4.8",
"libc", "libc",
] ]

@ -13,6 +13,7 @@ use std::borrow::Borrow;
use std::collections::HashMap; use std::collections::HashMap;
use std::env::temp_dir; use std::env::temp_dir;
use std::ffi::{CStr, CString}; use std::ffi::{CStr, CString};
use std::fmt;
use std::fs::remove_dir_all; use std::fs::remove_dir_all;
use std::io::{Error, ErrorKind, Result}; use std::io::{Error, ErrorKind, Result};
use std::marker::PhantomData; use std::marker::PhantomData;
@ -35,13 +36,14 @@ macro_rules! ffi_result {
macro_rules! ffi_result_impl { macro_rules! ffi_result_impl {
( $($function:ident)::*( $($arg:expr,)*) ) => {{ ( $($function:ident)::*( $($arg:expr,)*) ) => {{
let mut err: *mut ::libc::c_char = ::std::ptr::null_mut(); let mut status = rocksdb_status_t {
let result = $($function)::*($($arg,)* &mut err); code: rocksdb_status_code_t_rocksdb_status_code_ok,
if err.is_null() { subcode: rocksdb_status_subcode_t_rocksdb_status_subcode_none,
Ok(result) severity: rocksdb_status_severity_t_rocksdb_status_severity_none,
} else { string: ptr::null()
Err(convert_error(err)) };
} let result = $($function)::*($($arg,)* &mut status);
convert_status(status).map(move |_| result)
}} }}
} }
@ -224,7 +226,7 @@ impl Db {
let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> = let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> =
vec![ptr::null_mut(); column_family_names.len()]; vec![ptr::null_mut(); column_family_names.len()];
let db = ffi_result!(rocksdb_transactiondb_open_column_families( let db = ffi_result!(rocksdb_transactiondb_open_column_families_with_status(
options, options,
transactiondb_options, transactiondb_options,
c_path.as_ptr(), c_path.as_ptr(),
@ -241,7 +243,8 @@ impl Db {
for handle in &cf_handles { for handle in &cf_handles {
if handle.is_null() { if handle.is_null() {
rocksdb_transactiondb_close(db); rocksdb_transactiondb_close(db);
return Err(other_error( return Err(Error::new(
ErrorKind::Other,
"Received null column family handle from RocksDB.", "Received null column family handle from RocksDB.",
)); ));
} }
@ -377,7 +380,7 @@ impl Db {
match result { match result {
Ok(result) => { Ok(result) => {
unsafe { unsafe {
ffi_result!(rocksdb_transaction_commit(transaction))?; ffi_result!(rocksdb_transaction_commit_with_status(transaction))?;
rocksdb_transaction_destroy(transaction); rocksdb_transaction_destroy(transaction);
rocksdb_readoptions_destroy(read_options); rocksdb_readoptions_destroy(read_options);
} }
@ -385,14 +388,21 @@ impl Db {
} }
Err(e) => { Err(e) => {
unsafe { unsafe {
ffi_result!(rocksdb_transaction_rollback(transaction))?; ffi_result!(rocksdb_transaction_rollback_with_status(transaction))?;
rocksdb_transaction_destroy(transaction); rocksdb_transaction_destroy(transaction);
rocksdb_readoptions_destroy(read_options); rocksdb_readoptions_destroy(read_options);
} }
let is_conflict_error = e.get_ref().map_or(false, |e| { let is_conflict_error = e.get_ref().map_or(false, |error| {
let msg = e.to_string(); let mut error: &dyn std::error::Error = error;
msg == "Resource busy: " // We use the narroest source: as a chance to be a RocksDB status
|| msg == "Operation timed out: Timeout waiting to lock key" while let Some(e) = error.source() {
error = e;
}
error.downcast_ref::<ErrorStatus>().map_or(false, |e| {
e.0.code == rocksdb_status_code_t_rocksdb_status_code_busy
|| e.0.code == rocksdb_status_code_t_rocksdb_status_code_timed_out
|| e.0.code == rocksdb_status_code_t_rocksdb_status_code_try_again
})
}); });
if is_conflict_error { if is_conflict_error {
// We give a chance to the OS to do something else before retrying in order to help avoiding an other conflict // We give a chance to the OS to do something else before retrying in order to help avoiding an other conflict
@ -408,7 +418,7 @@ impl Db {
pub fn flush(&self, column_family: &ColumnFamily) -> Result<()> { pub fn flush(&self, column_family: &ColumnFamily) -> Result<()> {
unsafe { unsafe {
ffi_result!(rocksdb_transactiondb_flush_cf( ffi_result!(rocksdb_transactiondb_flush_cf_with_status(
self.0.db, self.0.db,
self.0.flush_options, self.0.flush_options,
column_family.0, column_family.0,
@ -419,7 +429,7 @@ impl Db {
#[allow(clippy::unnecessary_wraps)] #[allow(clippy::unnecessary_wraps)]
pub fn compact(&self, column_family: &ColumnFamily) -> Result<()> { pub fn compact(&self, column_family: &ColumnFamily) -> Result<()> {
unsafe { unsafe {
ffi_result!(rocksdb_transactiondb_compact_range_cf_opt( ffi_result!(rocksdb_transactiondb_compact_range_cf_opt_with_status(
self.0.db, self.0.db,
column_family.0, column_family.0,
self.0.compaction_options, self.0.compaction_options,
@ -435,7 +445,7 @@ impl Db {
unsafe { unsafe {
let path = self.0.path.join(random::<u128>().to_string()); let path = self.0.path.join(random::<u128>().to_string());
let writer = rocksdb_sstfilewriter_create(self.0.env_options, self.0.options); let writer = rocksdb_sstfilewriter_create(self.0.env_options, self.0.options);
ffi_result!(rocksdb_sstfilewriter_open( ffi_result!(rocksdb_sstfilewriter_open_with_status(
writer, writer,
path_to_cstring(&path)?.as_ptr() path_to_cstring(&path)?.as_ptr()
))?; ))?;
@ -465,7 +475,7 @@ impl Db {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
unsafe { unsafe {
ffi_result!(rocksdb_transactiondb_ingest_external_files( ffi_result!(rocksdb_transactiondb_ingest_external_files_with_status(
self.0.db, self.0.db,
args.as_ptr(), args.as_ptr(),
args.len() args.len()
@ -523,16 +533,18 @@ impl Reader {
pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<Option<PinnableSlice>> { pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<Option<PinnableSlice>> {
unsafe { unsafe {
let slice = match &self.inner { let slice = match &self.inner {
InnerReader::Snapshot(inner) => ffi_result!(rocksdb_transactiondb_get_pinned_cf( InnerReader::Snapshot(inner) => {
ffi_result!(rocksdb_transactiondb_get_pinned_cf_with_status(
inner.db.db, inner.db.db,
self.options, self.options,
column_family.0, column_family.0,
key.as_ptr() as *const c_char, key.as_ptr() as *const c_char,
key.len() key.len()
)), ))
}
InnerReader::Transaction(inner) => { InnerReader::Transaction(inner) => {
if let Some(inner) = inner.upgrade() { if let Some(inner) = inner.upgrade() {
ffi_result!(rocksdb_transaction_get_pinned_cf( ffi_result!(rocksdb_transaction_get_pinned_cf_with_status(
*inner, *inner,
self.options, self.options,
column_family.0, column_family.0,
@ -659,7 +671,7 @@ impl Transaction<'_> {
key: &[u8], key: &[u8],
) -> Result<bool> { ) -> Result<bool> {
unsafe { unsafe {
let slice = ffi_result!(rocksdb_transaction_get_for_update_pinned_cf( let slice = ffi_result!(rocksdb_transaction_get_for_update_pinned_cf_with_status(
*self.transaction, *self.transaction,
self.read_options, self.read_options,
column_family.0, column_family.0,
@ -672,7 +684,7 @@ impl Transaction<'_> {
pub fn insert(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> { pub fn insert(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
unsafe { unsafe {
ffi_result!(rocksdb_transaction_put_cf( ffi_result!(rocksdb_transaction_put_cf_with_status(
*self.transaction, *self.transaction,
column_family.0, column_family.0,
key.as_ptr() as *const c_char, key.as_ptr() as *const c_char,
@ -689,7 +701,7 @@ impl Transaction<'_> {
pub fn remove(&mut self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> { pub fn remove(&mut self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> {
unsafe { unsafe {
ffi_result!(rocksdb_transaction_delete_cf( ffi_result!(rocksdb_transaction_delete_cf_with_status(
*self.transaction, *self.transaction,
column_family.0, column_family.0,
key.as_ptr() as *const c_char, key.as_ptr() as *const c_char,
@ -804,7 +816,7 @@ impl Iter {
} }
pub fn status(&self) -> Result<()> { pub fn status(&self) -> Result<()> {
unsafe { ffi_result!(rocksdb_iter_get_error(self.iter)) } unsafe { ffi_result!(rocksdb_iter_get_status(self.iter)) }
} }
pub fn next(&mut self) { pub fn next(&mut self) {
@ -843,7 +855,7 @@ impl Drop for SstFileWriter {
impl SstFileWriter { impl SstFileWriter {
pub fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<()> { pub fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
unsafe { unsafe {
ffi_result!(rocksdb_sstfilewriter_put( ffi_result!(rocksdb_sstfilewriter_put_with_status(
self.writer, self.writer,
key.as_ptr() as *const c_char, key.as_ptr() as *const c_char,
key.len(), key.len(),
@ -859,25 +871,70 @@ impl SstFileWriter {
pub fn finish(self) -> Result<PathBuf> { pub fn finish(self) -> Result<PathBuf> {
unsafe { unsafe {
ffi_result!(rocksdb_sstfilewriter_finish(self.writer))?; ffi_result!(rocksdb_sstfilewriter_finish_with_status(self.writer))?;
} }
Ok(self.path.clone()) Ok(self.path.clone())
} }
} }
fn convert_error(ptr: *const c_char) -> Error { fn convert_status(status: rocksdb_status_t) -> Result<()> {
let message = unsafe { let kind = if status.code == rocksdb_status_code_t_rocksdb_status_code_ok {
let s = CStr::from_ptr(ptr).to_string_lossy().into_owned(); return Ok(()); // No error
free(ptr as *mut c_void); } else if status.code == rocksdb_status_code_t_rocksdb_status_code_not_supported {
s ErrorKind::Unsupported
} else if status.code == rocksdb_status_code_t_rocksdb_status_code_invalid_argument {
ErrorKind::InvalidInput
} else if status.code == rocksdb_status_code_t_rocksdb_status_code_timed_out {
ErrorKind::TimedOut
} else if status.code == rocksdb_status_code_t_rocksdb_status_code_busy {
ErrorKind::Other // TODO ErrorKind::ResourceBusy
} else if status.code == rocksdb_status_code_t_rocksdb_status_code_expired {
ErrorKind::TimedOut
} else if status.code == rocksdb_status_code_t_rocksdb_status_code_io_error
&& status.subcode == rocksdb_status_subcode_t_rocksdb_status_subcode_no_space
{
ErrorKind::Other // TODO ErrorKind::StorageFull
} else if status.code == rocksdb_status_code_t_rocksdb_status_code_aborted
&& status.subcode == rocksdb_status_subcode_t_rocksdb_status_subcode_memory_limit
{
ErrorKind::OutOfMemory
} else if (status.code == rocksdb_status_code_t_rocksdb_status_code_not_found
|| status.code == rocksdb_status_code_t_rocksdb_status_code_io_error)
&& status.subcode == rocksdb_status_subcode_t_rocksdb_status_subcode_path_not_found
{
ErrorKind::NotFound
} else {
ErrorKind::Other
}; };
other_error(message) Err(Error::new(kind, ErrorStatus(status)))
}
#[derive(Debug)]
struct ErrorStatus(rocksdb_status_t);
unsafe impl Send for ErrorStatus {}
unsafe impl Sync for ErrorStatus {}
impl Drop for ErrorStatus {
fn drop(&mut self) {
unsafe {
free(self.0.string as *mut c_void);
}
}
} }
fn other_error(error: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> Error { impl fmt::Display for ErrorStatus {
Error::new(ErrorKind::Other, error) fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(
unsafe { CStr::from_ptr(self.0.string) }
.to_str()
.map_err(|_| fmt::Error)?,
)
}
} }
impl std::error::Error for ErrorStatus {}
struct UnsafeEnv(*mut rocksdb_env_t); struct UnsafeEnv(*mut rocksdb_env_t);
// Hack for lazy_static. OK because only written in lazy static and used in a thread-safe way by RocksDB // Hack for lazy_static. OK because only written in lazy static and used in a thread-safe way by RocksDB

@ -1,61 +1,91 @@
#include "../rocksdb/db/c.cc" #include "../rocksdb/db/c.cc"
#include "c.h" #include "c.h"
static bool SaveStatus(rocksdb_status_t* target, const Status source) {
target->code = static_cast<rocksdb_status_code_t>(source.code());
target->subcode = static_cast<rocksdb_status_subcode_t>(source.subcode());
target->severity = static_cast<rocksdb_status_severity_t>(source.severity());
target->string = source.ok() ? nullptr : source.ToString().c_str();
return !source.ok();
}
extern "C" { extern "C" {
rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf(
rocksdb_transactiondb_t* rocksdb_transactiondb_open_column_families_with_status(
const rocksdb_options_t* options,
const rocksdb_transactiondb_options_t* txn_db_options, const char* name,
int num_column_families, const char* const* column_family_names,
const rocksdb_options_t* const* column_family_options,
rocksdb_column_family_handle_t** column_family_handles, rocksdb_status_t* statusptr) {
std::vector<ColumnFamilyDescriptor> column_families;
for (int i = 0; i < num_column_families; i++) {
column_families.push_back(ColumnFamilyDescriptor(
std::string(column_family_names[i]),
ColumnFamilyOptions(column_family_options[i]->rep)));
}
TransactionDB* txn_db;
std::vector<ColumnFamilyHandle*> handles;
if (SaveStatus(statusptr, TransactionDB::Open(options->rep, txn_db_options->rep,
std::string(name), column_families,
&handles, &txn_db))) {
return nullptr;
}
for (size_t i = 0; i < handles.size(); i++) {
rocksdb_column_family_handle_t* c_handle =
new rocksdb_column_family_handle_t;
c_handle->rep = handles[i];
column_family_handles[i] = c_handle;
}
rocksdb_transactiondb_t* result = new rocksdb_transactiondb_t;
result->rep = txn_db;
return result;
}
rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf_with_status(
rocksdb_transactiondb_t* db, const rocksdb_readoptions_t* options, rocksdb_transactiondb_t* db, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key, rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, char** errptr) { size_t keylen, rocksdb_status_t* statusptr) {
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t); rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
Status s = db->rep->Get(options->rep, column_family->rep, Slice(key, keylen), Status s = db->rep->Get(options->rep, column_family->rep, Slice(key, keylen),
&v->rep); &v->rep);
if (!s.ok()) { if (!s.ok()) {
delete v; delete v;
if (!s.IsNotFound()) { if (!s.IsNotFound()) {
SaveError(errptr, s); SaveStatus(statusptr, s);
} }
return nullptr; return nullptr;
} }
return v; return v;
} }
void rocksdb_transactiondb_flush_cf( void rocksdb_transactiondb_flush_cf_with_status(
rocksdb_transactiondb_t* db, rocksdb_transactiondb_t* db,
const rocksdb_flushoptions_t* options, const rocksdb_flushoptions_t* options,
rocksdb_column_family_handle_t* column_family, rocksdb_column_family_handle_t* column_family,
char** errptr) { rocksdb_status_t* statusptr) {
SaveError(errptr, db->rep->Flush(options->rep, column_family->rep)); SaveStatus(statusptr, db->rep->Flush(options->rep, column_family->rep));
} }
void rocksdb_transactiondb_compact_range_cf_opt(rocksdb_transactiondb_t* db, void rocksdb_transactiondb_compact_range_cf_opt_with_status(rocksdb_transactiondb_t* db,
rocksdb_column_family_handle_t* column_family, rocksdb_column_family_handle_t* column_family,
rocksdb_compactoptions_t* opt, rocksdb_compactoptions_t* opt,
const char* start_key, size_t start_key_len, const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len, const char* limit_key, size_t limit_key_len,
char** errptr) { rocksdb_status_t* statusptr) {
Slice a, b; Slice a, b;
SaveError(errptr, db->rep->CompactRange( SaveStatus(statusptr, db->rep->CompactRange(
opt->rep, column_family->rep, opt->rep, column_family->rep,
// Pass nullptr Slice if corresponding "const char*" is nullptr // Pass nullptr Slice if corresponding "const char*" is nullptr
(start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr), (start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr),
(limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr))); (limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr)));
} }
void rocksdb_transactiondb_ingest_external_file_cf( void rocksdb_transactiondb_ingest_external_files_with_status(
rocksdb_transactiondb_t* db, rocksdb_column_family_handle_t* handle,
const char* const* file_list, const size_t list_len,
const rocksdb_ingestexternalfileoptions_t* opt, char** errptr) {
std::vector<std::string> files(list_len);
for (size_t i = 0; i < list_len; ++i) {
files[i] = std::string(file_list[i]);
}
SaveError(errptr, db->rep->IngestExternalFile(handle->rep, files, opt->rep));
}
void rocksdb_transactiondb_ingest_external_files(
rocksdb_transactiondb_t* db, const rocksdb_ingestexternalfilearg_t* list, rocksdb_transactiondb_t* db, const rocksdb_ingestexternalfilearg_t* list,
const size_t list_len, char** errptr) { const size_t list_len, rocksdb_status_t* statusptr) {
std::vector<rocksdb::IngestExternalFileArg> args(list_len); std::vector<rocksdb::IngestExternalFileArg> args(list_len);
for (size_t i = 0; i < list_len; ++i) { for (size_t i = 0; i < list_len; ++i) {
args[i].column_family = list[i].column_family->rep; args[i].column_family = list[i].column_family->rep;
@ -66,43 +96,90 @@ void rocksdb_transactiondb_ingest_external_files(
args[i].external_files = files; args[i].external_files = files;
args[i].options = list[i].options->rep; args[i].options = list[i].options->rep;
} }
SaveError(errptr, db->rep->IngestExternalFiles(args)); SaveStatus(statusptr, db->rep->IngestExternalFiles(args));
} }
rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_cf(
void rocksdb_transaction_commit_with_status(rocksdb_transaction_t* txn, rocksdb_status_t* statusptr) {
SaveStatus(statusptr, txn->rep->Commit());
}
void rocksdb_transaction_rollback_with_status(rocksdb_transaction_t* txn, rocksdb_status_t* statusptr) {
SaveStatus(statusptr, txn->rep->Rollback());
}
rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_cf_with_status(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key, rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, char** errptr) { size_t keylen, rocksdb_status_t* statusptr) {
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t); rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
Status s = txn->rep->Get(options->rep, column_family->rep, Slice(key, keylen), Status s = txn->rep->Get(options->rep, column_family->rep, Slice(key, keylen),
&v->rep); &v->rep);
if (!s.ok()) { if (!s.ok()) {
delete v; delete v;
if (!s.IsNotFound()) { if (!s.IsNotFound()) {
SaveError(errptr, s); SaveStatus(statusptr, s);
} }
return nullptr; return nullptr;
} }
return v; return v;
} }
rocksdb_pinnableslice_t* rocksdb_transaction_get_for_update_pinned_cf( rocksdb_pinnableslice_t* rocksdb_transaction_get_for_update_pinned_cf_with_status(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key, rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, char** errptr) { size_t keylen, rocksdb_status_t* statusptr) {
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t); rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
Status s = txn->rep->GetForUpdate(options->rep, column_family->rep, Slice(key, keylen), Status s = txn->rep->GetForUpdate(options->rep, column_family->rep, Slice(key, keylen),
&v->rep); &v->rep);
if (!s.ok()) { if (!s.ok()) {
delete v; delete v;
if (!s.IsNotFound()) { if (!s.IsNotFound()) {
SaveError(errptr, s); SaveStatus(statusptr, s);
} }
return nullptr; return nullptr;
} }
return v; return v;
} }
void rocksdb_transaction_put_cf_with_status(rocksdb_transaction_t* txn,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t klen, const char* val,
size_t vlen, rocksdb_status_t* statusptr) {
SaveStatus(statusptr, txn->rep->Put(column_family->rep, Slice(key, klen),
Slice(val, vlen)));
}
void rocksdb_transaction_delete_cf_with_status(
rocksdb_transaction_t* txn, rocksdb_column_family_handle_t* column_family,
const char* key, size_t klen, rocksdb_status_t* statusptr) {
SaveStatus(statusptr, txn->rep->Delete(column_family->rep, Slice(key, klen)));
}
void rocksdb_sstfilewriter_open_with_status(rocksdb_sstfilewriter_t* writer,
const char* name, rocksdb_status_t* statusptr) {
SaveStatus(statusptr, writer->rep->Open(std::string(name)));
}
void rocksdb_sstfilewriter_put_with_status(rocksdb_sstfilewriter_t* writer, const char* key,
size_t keylen, const char* val, size_t vallen,
rocksdb_status_t* statusptr) {
SaveStatus(statusptr, writer->rep->Put(Slice(key, keylen), Slice(val, vallen)));
}
void rocksdb_sstfilewriter_finish_with_status(rocksdb_sstfilewriter_t* writer,
rocksdb_status_t* statusptr) {
SaveStatus(statusptr, writer->rep->Finish(nullptr));
}
void rocksdb_iter_get_status(const rocksdb_iterator_t* iter, rocksdb_status_t* statusptr) {
SaveStatus(statusptr, iter->rep->status());
}
rocksdb_readoptions_t* rocksdb_readoptions_create_copy(rocksdb_readoptions_t* options) { rocksdb_readoptions_t* rocksdb_readoptions_create_copy(rocksdb_readoptions_t* options) {
return new rocksdb_readoptions_t(*options); return new rocksdb_readoptions_t(*options);
} }

@ -6,6 +6,58 @@
extern "C" { extern "C" {
#endif #endif
typedef enum rocksdb_status_code_t {
rocksdb_status_code_ok = 0,
rocksdb_status_code_not_found = 1,
rocksdb_status_code_corruption = 2,
rocksdb_status_code_not_supported = 3,
rocksdb_status_code_invalid_argument = 4,
rocksdb_status_code_io_error = 5,
rocksdb_status_code_merge_in_progress = 6,
rocksdb_status_code_incomplete = 7,
rocksdb_status_code_shutdown_in_progress = 8,
rocksdb_status_code_timed_out = 9,
rocksdb_status_code_aborted = 10,
rocksdb_status_code_busy = 11,
rocksdb_status_code_expired = 12,
rocksdb_status_code_try_again = 13,
rocksdb_status_code_compaction_too_large = 14,
rocksdb_status_code_column_family_dropped = 15,
} rocksdb_status_code_t;
typedef enum rocksdb_status_subcode_t {
rocksdb_status_subcode_none = 0,
rocksdb_status_subcode_mutex_timeout = 1,
rocksdb_status_subcode_lock_timeout = 2,
rocksdb_status_subcode_lock_limit = 3,
rocksdb_status_subcode_no_space = 4,
rocksdb_status_subcode_deadlock = 5,
rocksdb_status_subcode_stale_file = 6,
rocksdb_status_subcode_memory_limit = 7,
rocksdb_status_subcode_space_limit = 8,
rocksdb_status_subcode_path_not_found = 9,
rocksdb_status_subcode_merge_operands_insufficient_capacity = 10,
rocksdb_status_subcode_manual_compaction_paused = 11,
rocksdb_status_subcode_overwritten = 12,
rocksdb_status_subcode_txn_not_prepared = 13,
rocksdb_status_subcode_io_fenced = 14,
} rocksdb_status_subcode_t;
typedef enum rocksdb_status_severity_t {
rocksdb_status_severity_none = 0,
rocksdb_status_severity_soft_error = 1,
rocksdb_status_severity_hard_error = 2,
rocksdb_status_severity_fatal_error = 3,
rocksdb_status_severity_unrecoverable_error = 4,
} rocksdb_status_severity_t;
typedef struct rocksdb_status_t {
rocksdb_status_code_t code;
rocksdb_status_subcode_t subcode;
rocksdb_status_severity_t severity;
const char* string;
} rocksdb_status_t;
typedef struct rocksdb_ingestexternalfilearg_t { typedef struct rocksdb_ingestexternalfilearg_t {
rocksdb_column_family_handle_t* column_family; rocksdb_column_family_handle_t* column_family;
char const* const* external_files; char const* const* external_files;
@ -13,38 +65,72 @@ typedef struct rocksdb_ingestexternalfilearg_t {
rocksdb_ingestexternalfileoptions_t* options; rocksdb_ingestexternalfileoptions_t* options;
} rocksdb_ingestexternalfilearg_t; } rocksdb_ingestexternalfilearg_t;
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf( extern ROCKSDB_LIBRARY_API rocksdb_transactiondb_t* rocksdb_transactiondb_open_column_families_with_status(
const rocksdb_options_t* options,
const rocksdb_transactiondb_options_t* txn_db_options, const char* name,
int num_column_families, const char* const* column_family_names,
const rocksdb_options_t* const* column_family_options,
rocksdb_column_family_handle_t** column_family_handles, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf_with_status(
rocksdb_transactiondb_t* db, const rocksdb_readoptions_t* options, rocksdb_transactiondb_t* db, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key, rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, char** errptr); size_t keylen, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_cf( extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_cf_with_status(
rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options,
rocksdb_column_family_handle_t* column_family, char** errptr); rocksdb_column_family_handle_t* column_family, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_compact_range_cf_opt( extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_compact_range_cf_opt_with_status(
rocksdb_transactiondb_t* db, rocksdb_column_family_handle_t* column_family, rocksdb_transactiondb_t* db, rocksdb_column_family_handle_t* column_family,
rocksdb_compactoptions_t* opt, const char* start_key, size_t start_key_len, rocksdb_compactoptions_t* opt, const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len, char** errptr); const char* limit_key, size_t limit_key_len, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_ingest_external_file_cf(
rocksdb_transactiondb_t* db, rocksdb_column_family_handle_t* handle,
const char* const* file_list, const size_t list_len,
const rocksdb_ingestexternalfileoptions_t* opt, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_ingest_external_files( extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_ingest_external_files_with_status(
rocksdb_transactiondb_t* db, const rocksdb_ingestexternalfilearg_t* list, rocksdb_transactiondb_t* db, const rocksdb_ingestexternalfilearg_t* list,
const size_t list_len, char** errptr); const size_t list_len, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_commit_with_status(
rocksdb_transaction_t* txn, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_rollback_with_status(
rocksdb_transaction_t* txn, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_cf( extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_cf_with_status(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key, rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, char** errptr); size_t keylen, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transaction_get_for_update_pinned_cf( extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transaction_get_for_update_pinned_cf_with_status(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key, rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, char** errptr); size_t keylen, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_put_cf_with_status(
rocksdb_transaction_t* txn, rocksdb_column_family_handle_t* column_family,
const char* key, size_t klen, const char* val, size_t vlen, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_delete_cf_with_status(
rocksdb_transaction_t* txn, rocksdb_column_family_handle_t* column_family,
const char* key, size_t klen, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API void rocksdb_sstfilewriter_open_with_status(
rocksdb_sstfilewriter_t* writer, const char* name, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API void rocksdb_sstfilewriter_put_with_status(
rocksdb_sstfilewriter_t* writer, const char* key, size_t keylen,
const char* val, size_t vallen, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API void rocksdb_sstfilewriter_finish_with_status(
rocksdb_sstfilewriter_t* writer, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API void rocksdb_iter_get_status(
const rocksdb_iterator_t*, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API rocksdb_readoptions_t* rocksdb_readoptions_create_copy( extern ROCKSDB_LIBRARY_API rocksdb_readoptions_t* rocksdb_readoptions_create_copy(
rocksdb_readoptions_t*); rocksdb_readoptions_t*);

Loading…
Cancel
Save