Add in-transaction multi-get-for-update to the C interface (#11107)

Summary:
Hi, this is basically a part of https://github.com/facebook/rocksdb/pull/6488 that only adds `multi_get_for_update` functionality to C API (I'd like to call it from Rust), since `multi_get` was already added here https://github.com/facebook/rocksdb/pull/9252

https://github.com/facebook/rocksdb/pull/6488 has conflicts, so I guess it might be easier to get this one in

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11107

Reviewed By: pdillinger

Differential Revision: D42680764

Pulled By: ajkr

fbshipit-source-id: a50f96e1c7f3d470b4ab07e9ff5a283e5cf44865
oxigraph-8.3.2
karemta-orday 2 years ago committed by Facebook GitHub Bot
parent 9f8cdc8ad6
commit 40c2ec6d08
  1. 1
      HISTORY.md
  2. 61
      db/c.cc
  3. 141
      db/c_test.c
  4. 13
      include/rocksdb/c.h

@ -2,6 +2,7 @@
## Unreleased
### Public API Changes
* `SstFileWriter::DeleteRange()` now returns `Status::InvalidArgument` if the range's end key comes before its start key according to the user comparator. Previously the behavior was undefined.
* Add `multi_get_for_update` to C API.
## 8.1.0 (03/18/2023)
### Behavior changes

@ -5853,6 +5853,35 @@ void rocksdb_transaction_multi_get(rocksdb_transaction_t* txn,
}
}
void rocksdb_transaction_multi_get_for_update(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
size_t num_keys, const char* const* keys_list,
const size_t* keys_list_sizes, char** values_list,
size_t* values_list_sizes, char** errs) {
std::vector<Slice> keys(num_keys);
for (size_t i = 0; i < num_keys; i++) {
keys[i] = Slice(keys_list[i], keys_list_sizes[i]);
}
std::vector<std::string> values(num_keys);
std::vector<Status> statuses =
txn->rep->MultiGetForUpdate(options->rep, keys, &values);
for (size_t i = 0; i < num_keys; i++) {
if (statuses[i].ok()) {
values_list[i] = CopyString(values[i]);
values_list_sizes[i] = values[i].size();
errs[i] = nullptr;
} else {
values_list[i] = nullptr;
values_list_sizes[i] = 0;
if (!statuses[i].IsNotFound()) {
errs[i] = strdup(statuses[i].ToString().c_str());
} else {
errs[i] = nullptr;
}
}
}
}
void rocksdb_transaction_multi_get_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
const rocksdb_column_family_handle_t* const* column_families,
@ -5885,6 +5914,38 @@ void rocksdb_transaction_multi_get_cf(
}
}
void rocksdb_transaction_multi_get_for_update_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
const rocksdb_column_family_handle_t* const* column_families,
size_t num_keys, const char* const* keys_list,
const size_t* keys_list_sizes, char** values_list,
size_t* values_list_sizes, char** errs) {
std::vector<Slice> keys(num_keys);
std::vector<ColumnFamilyHandle*> cfs(num_keys);
for (size_t i = 0; i < num_keys; i++) {
keys[i] = Slice(keys_list[i], keys_list_sizes[i]);
cfs[i] = column_families[i]->rep;
}
std::vector<std::string> values(num_keys);
std::vector<Status> statuses =
txn->rep->MultiGetForUpdate(options->rep, cfs, keys, &values);
for (size_t i = 0; i < num_keys; i++) {
if (statuses[i].ok()) {
values_list[i] = CopyString(values[i]);
values_list_sizes[i] = values[i].size();
errs[i] = nullptr;
} else {
values_list[i] = nullptr;
values_list_sizes[i] = 0;
if (!statuses[i].IsNotFound()) {
errs[i] = strdup(statuses[i].ToString().c_str());
} else {
errs[i] = nullptr;
}
}
}
}
// Read a key outside a transaction
char* rocksdb_transactiondb_get(rocksdb_transactiondb_t* txn_db,
const rocksdb_readoptions_t* options,

@ -488,6 +488,19 @@ static void CheckTxnPinGetCF(rocksdb_transaction_t* txn,
rocksdb_pinnableslice_destroy(p);
}
static void CheckTxnGetForUpdate(rocksdb_transaction_t* txn,
const rocksdb_readoptions_t* options,
const char* key, const char* expected) {
char* err = NULL;
size_t val_len;
char* val;
val = rocksdb_transaction_get_for_update(txn, options, key, strlen(key),
&val_len, true, &err);
CheckNoError(err);
CheckEqual(expected, val, val_len);
Free(&val);
}
static void CheckTxnDBGet(rocksdb_transactiondb_t* txn_db,
const rocksdb_readoptions_t* options, const char* key,
const char* expected) {
@ -515,6 +528,20 @@ static void CheckTxnDBGetCF(rocksdb_transactiondb_t* txn_db,
Free(&val);
}
static void CheckTxnGetForUpdateCF(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key,
const char* expected) {
char* err = NULL;
size_t val_len;
char* val;
val = rocksdb_transaction_get_for_update_cf(
txn, options, column_family, key, strlen(key), &val_len, true, &err);
CheckNoError(err);
CheckEqual(expected, val, val_len);
Free(&val);
}
static void CheckTxnDBPinGet(rocksdb_transactiondb_t* txn_db,
const rocksdb_readoptions_t* options,
const char* key, const char* expected) {
@ -3204,6 +3231,120 @@ int main(int argc, char** argv) {
rocksdb_transactiondb_options_destroy(txn_db_options);
}
StartPhase("transactions_multi_get_for_update");
{
// open a TransactionDB
txn_db_options = rocksdb_transactiondb_options_create();
rocksdb_transactiondb_options_set_transaction_lock_timeout(txn_db_options,
0);
txn_options = rocksdb_transaction_options_create();
rocksdb_options_set_create_if_missing(options, 1);
txn_db = rocksdb_transactiondb_open(options, txn_db_options, dbname, &err);
CheckNoError(err);
rocksdb_transactiondb_put(txn_db, woptions, "foo", 3, "hey", 3, &err);
CheckNoError(err);
rocksdb_transactiondb_put(txn_db, woptions, "bar", 3, "hello", 5, &err);
CheckNoError(err);
// begin transactions
txn = rocksdb_transaction_begin(txn_db, woptions, txn_options, NULL);
rocksdb_transaction_t* txn2 =
rocksdb_transaction_begin(txn_db, woptions, txn_options, NULL);
// multi get
{
const char* keys[2] = {"foo", "bar"};
const size_t keys_sizes[2] = {3, 3};
char* vals[2];
size_t vals_sizes[2];
char* errs[2];
const char* expected[2] = {"hey", "hello"};
rocksdb_transaction_multi_get_for_update(
txn, roptions, 2, keys, keys_sizes, vals, vals_sizes, errs);
CheckMultiGetValues(2, vals, vals_sizes, errs, expected);
}
char* conflict_err = NULL;
size_t val_len;
rocksdb_transaction_get_for_update(txn2, roptions, "foo", 3, &val_len, true,
&conflict_err);
// get-for-update conflict
CheckCondition(conflict_err != NULL);
Free(&conflict_err);
// commit
rocksdb_transaction_commit(txn, &err);
CheckNoError(err);
// should work after first tx is commited
CheckTxnGetForUpdate(txn2, roptions, "foo", "hey");
// commit the second one
rocksdb_transaction_commit(txn2, &err);
CheckNoError(err);
// destroy txns
rocksdb_transaction_destroy(txn);
rocksdb_transaction_destroy(txn2);
// same for column families
rocksdb_column_family_handle_t* cfh;
cfh = rocksdb_transactiondb_create_column_family(txn_db, options,
"txn_db_cf", &err);
CheckNoError(err);
rocksdb_transactiondb_put_cf(txn_db, woptions, cfh, "cf_foo", 6, "cf_hello",
8, &err);
CheckNoError(err);
rocksdb_transactiondb_put_cf(txn_db, woptions, cfh, "cf_bar", 6, "cf_hey",
6, &err);
CheckNoError(err);
txn = rocksdb_transaction_begin(txn_db, woptions, txn_options, NULL);
txn2 = rocksdb_transaction_begin(txn_db, woptions, txn_options, NULL);
{
const rocksdb_column_family_handle_t* get_handles[2] = {cfh, cfh};
const char* keys[2] = {"cf_foo", "cf_bar"};
const size_t keys_sizes[2] = {6, 6};
char* vals[2];
size_t vals_sizes[2];
char* errs[2];
const char* expected[2] = {"cf_hello", "cf_hey"};
rocksdb_transaction_multi_get_for_update_cf(txn, roptions, get_handles, 2,
keys, keys_sizes, vals,
vals_sizes, errs);
CheckMultiGetValues(2, vals, vals_sizes, errs, expected);
}
char* conflict_err_cf = NULL;
size_t val_len_cf;
rocksdb_transaction_get_for_update_cf(txn2, roptions, cfh, "cf_foo", 6,
&val_len_cf, true, &conflict_err_cf);
CheckCondition(conflict_err_cf != NULL);
Free(&conflict_err_cf);
rocksdb_transaction_commit(txn, &err);
CheckNoError(err);
CheckTxnGetForUpdateCF(txn2, roptions, cfh, "cf_foo", "cf_hello");
rocksdb_transaction_commit(txn2, &err);
CheckNoError(err);
// close and destroy
rocksdb_column_family_handle_destroy(cfh);
rocksdb_transaction_destroy(txn);
rocksdb_transaction_destroy(txn2);
rocksdb_transactiondb_close(txn_db);
rocksdb_destroy_db(options, dbname, &err);
CheckNoError(err);
rocksdb_transaction_options_destroy(txn_options);
rocksdb_transactiondb_options_destroy(txn_db_options);
}
StartPhase("optimistic_transactions");
{
rocksdb_options_t* db_options = rocksdb_options_create();

@ -2540,6 +2540,12 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transaction_multi_get(
const size_t* keys_list_sizes, char** values_list,
size_t* values_list_sizes, char** errs);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_multi_get_for_update(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
size_t num_keys, const char* const* keys_list,
const size_t* keys_list_sizes, char** values_list,
size_t* values_list_sizes, char** errs);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_multi_get_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
const rocksdb_column_family_handle_t* const* column_families,
@ -2547,6 +2553,13 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transaction_multi_get_cf(
const size_t* keys_list_sizes, char** values_list,
size_t* values_list_sizes, char** errs);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_multi_get_for_update_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
const rocksdb_column_family_handle_t* const* column_families,
size_t num_keys, const char* const* keys_list,
const size_t* keys_list_sizes, char** values_list,
size_t* values_list_sizes, char** errs);
extern ROCKSDB_LIBRARY_API char* rocksdb_transactiondb_get(
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
const char* key, size_t klen, size_t* vlen, char** errptr);

Loading…
Cancel
Save