diff --git a/HISTORY.md b/HISTORY.md index 4e5f4f93e..055df7a59 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Public API Changes * `SstFileWriter::DeleteRange()` now returns `Status::InvalidArgument` if the range's end key comes before its start key according to the user comparator. Previously the behavior was undefined. +* Add `multi_get_for_update` to C API. ## 8.1.0 (03/18/2023) ### Behavior changes diff --git a/db/c.cc b/db/c.cc index ed382d4e4..e4569b967 100644 --- a/db/c.cc +++ b/db/c.cc @@ -5853,6 +5853,35 @@ void rocksdb_transaction_multi_get(rocksdb_transaction_t* txn, } } +void rocksdb_transaction_multi_get_for_update( + rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, + size_t num_keys, const char* const* keys_list, + const size_t* keys_list_sizes, char** values_list, + size_t* values_list_sizes, char** errs) { + std::vector keys(num_keys); + for (size_t i = 0; i < num_keys; i++) { + keys[i] = Slice(keys_list[i], keys_list_sizes[i]); + } + std::vector values(num_keys); + std::vector statuses = + txn->rep->MultiGetForUpdate(options->rep, keys, &values); + for (size_t i = 0; i < num_keys; i++) { + if (statuses[i].ok()) { + values_list[i] = CopyString(values[i]); + values_list_sizes[i] = values[i].size(); + errs[i] = nullptr; + } else { + values_list[i] = nullptr; + values_list_sizes[i] = 0; + if (!statuses[i].IsNotFound()) { + errs[i] = strdup(statuses[i].ToString().c_str()); + } else { + errs[i] = nullptr; + } + } + } +} + void rocksdb_transaction_multi_get_cf( rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, const rocksdb_column_family_handle_t* const* column_families, @@ -5885,6 +5914,38 @@ void rocksdb_transaction_multi_get_cf( } } +void rocksdb_transaction_multi_get_for_update_cf( + rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, + const rocksdb_column_family_handle_t* const* column_families, + size_t num_keys, const char* const* keys_list, + const size_t* keys_list_sizes, char** values_list, + size_t* values_list_sizes, char** errs) { + std::vector keys(num_keys); + std::vector cfs(num_keys); + for (size_t i = 0; i < num_keys; i++) { + keys[i] = Slice(keys_list[i], keys_list_sizes[i]); + cfs[i] = column_families[i]->rep; + } + std::vector values(num_keys); + std::vector statuses = + txn->rep->MultiGetForUpdate(options->rep, cfs, keys, &values); + for (size_t i = 0; i < num_keys; i++) { + if (statuses[i].ok()) { + values_list[i] = CopyString(values[i]); + values_list_sizes[i] = values[i].size(); + errs[i] = nullptr; + } else { + values_list[i] = nullptr; + values_list_sizes[i] = 0; + if (!statuses[i].IsNotFound()) { + errs[i] = strdup(statuses[i].ToString().c_str()); + } else { + errs[i] = nullptr; + } + } + } +} + // Read a key outside a transaction char* rocksdb_transactiondb_get(rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options, diff --git a/db/c_test.c b/db/c_test.c index 415f30d36..57877c06b 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -488,6 +488,19 @@ static void CheckTxnPinGetCF(rocksdb_transaction_t* txn, rocksdb_pinnableslice_destroy(p); } +static void CheckTxnGetForUpdate(rocksdb_transaction_t* txn, + const rocksdb_readoptions_t* options, + const char* key, const char* expected) { + char* err = NULL; + size_t val_len; + char* val; + val = rocksdb_transaction_get_for_update(txn, options, key, strlen(key), + &val_len, true, &err); + CheckNoError(err); + CheckEqual(expected, val, val_len); + Free(&val); +} + static void CheckTxnDBGet(rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options, const char* key, const char* expected) { @@ -515,6 +528,20 @@ static void CheckTxnDBGetCF(rocksdb_transactiondb_t* txn_db, Free(&val); } +static void CheckTxnGetForUpdateCF( + rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, + rocksdb_column_family_handle_t* column_family, const char* key, + const char* expected) { + char* err = NULL; + size_t val_len; + char* val; + val = rocksdb_transaction_get_for_update_cf( + txn, options, column_family, key, strlen(key), &val_len, true, &err); + CheckNoError(err); + CheckEqual(expected, val, val_len); + Free(&val); +} + static void CheckTxnDBPinGet(rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options, const char* key, const char* expected) { @@ -3204,6 +3231,120 @@ int main(int argc, char** argv) { rocksdb_transactiondb_options_destroy(txn_db_options); } + StartPhase("transactions_multi_get_for_update"); + { + // open a TransactionDB + txn_db_options = rocksdb_transactiondb_options_create(); + rocksdb_transactiondb_options_set_transaction_lock_timeout(txn_db_options, + 0); + txn_options = rocksdb_transaction_options_create(); + rocksdb_options_set_create_if_missing(options, 1); + txn_db = rocksdb_transactiondb_open(options, txn_db_options, dbname, &err); + CheckNoError(err); + + rocksdb_transactiondb_put(txn_db, woptions, "foo", 3, "hey", 3, &err); + CheckNoError(err); + rocksdb_transactiondb_put(txn_db, woptions, "bar", 3, "hello", 5, &err); + CheckNoError(err); + + // begin transactions + txn = rocksdb_transaction_begin(txn_db, woptions, txn_options, NULL); + rocksdb_transaction_t* txn2 = + rocksdb_transaction_begin(txn_db, woptions, txn_options, NULL); + + // multi get + { + const char* keys[2] = {"foo", "bar"}; + const size_t keys_sizes[2] = {3, 3}; + char* vals[2]; + size_t vals_sizes[2]; + char* errs[2]; + const char* expected[2] = {"hey", "hello"}; + rocksdb_transaction_multi_get_for_update( + txn, roptions, 2, keys, keys_sizes, vals, vals_sizes, errs); + CheckMultiGetValues(2, vals, vals_sizes, errs, expected); + } + + char* conflict_err = NULL; + size_t val_len; + rocksdb_transaction_get_for_update(txn2, roptions, "foo", 3, &val_len, true, + &conflict_err); + // get-for-update conflict + CheckCondition(conflict_err != NULL); + Free(&conflict_err); + + // commit + rocksdb_transaction_commit(txn, &err); + CheckNoError(err); + + // should work after first tx is commited + CheckTxnGetForUpdate(txn2, roptions, "foo", "hey"); + + // commit the second one + rocksdb_transaction_commit(txn2, &err); + CheckNoError(err); + + // destroy txns + rocksdb_transaction_destroy(txn); + rocksdb_transaction_destroy(txn2); + + // same for column families + + rocksdb_column_family_handle_t* cfh; + cfh = rocksdb_transactiondb_create_column_family(txn_db, options, + "txn_db_cf", &err); + CheckNoError(err); + + rocksdb_transactiondb_put_cf(txn_db, woptions, cfh, "cf_foo", 6, "cf_hello", + 8, &err); + CheckNoError(err); + rocksdb_transactiondb_put_cf(txn_db, woptions, cfh, "cf_bar", 6, "cf_hey", + 6, &err); + CheckNoError(err); + + txn = rocksdb_transaction_begin(txn_db, woptions, txn_options, NULL); + txn2 = rocksdb_transaction_begin(txn_db, woptions, txn_options, NULL); + + { + const rocksdb_column_family_handle_t* get_handles[2] = {cfh, cfh}; + const char* keys[2] = {"cf_foo", "cf_bar"}; + const size_t keys_sizes[2] = {6, 6}; + char* vals[2]; + size_t vals_sizes[2]; + char* errs[2]; + const char* expected[2] = {"cf_hello", "cf_hey"}; + rocksdb_transaction_multi_get_for_update_cf(txn, roptions, get_handles, 2, + keys, keys_sizes, vals, + vals_sizes, errs); + CheckMultiGetValues(2, vals, vals_sizes, errs, expected); + } + + char* conflict_err_cf = NULL; + size_t val_len_cf; + rocksdb_transaction_get_for_update_cf(txn2, roptions, cfh, "cf_foo", 6, + &val_len_cf, true, &conflict_err_cf); + CheckCondition(conflict_err_cf != NULL); + Free(&conflict_err_cf); + + rocksdb_transaction_commit(txn, &err); + CheckNoError(err); + + CheckTxnGetForUpdateCF(txn2, roptions, cfh, "cf_foo", "cf_hello"); + + rocksdb_transaction_commit(txn2, &err); + CheckNoError(err); + + // close and destroy + rocksdb_column_family_handle_destroy(cfh); + rocksdb_transaction_destroy(txn); + rocksdb_transaction_destroy(txn2); + rocksdb_transactiondb_close(txn_db); + rocksdb_destroy_db(options, dbname, &err); + CheckNoError(err); + rocksdb_transaction_options_destroy(txn_options); + rocksdb_transactiondb_options_destroy(txn_db_options); + } + StartPhase("optimistic_transactions"); { rocksdb_options_t* db_options = rocksdb_options_create(); diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index e1835f8c5..7811f3ee4 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -2540,6 +2540,12 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transaction_multi_get( const size_t* keys_list_sizes, char** values_list, size_t* values_list_sizes, char** errs); +extern ROCKSDB_LIBRARY_API void rocksdb_transaction_multi_get_for_update( + rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, + size_t num_keys, const char* const* keys_list, + const size_t* keys_list_sizes, char** values_list, + size_t* values_list_sizes, char** errs); + extern ROCKSDB_LIBRARY_API void rocksdb_transaction_multi_get_cf( rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, const rocksdb_column_family_handle_t* const* column_families, @@ -2547,6 +2553,13 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transaction_multi_get_cf( const size_t* keys_list_sizes, char** values_list, size_t* values_list_sizes, char** errs); +extern ROCKSDB_LIBRARY_API void rocksdb_transaction_multi_get_for_update_cf( + rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, + const rocksdb_column_family_handle_t* const* column_families, + size_t num_keys, const char* const* keys_list, + const size_t* keys_list_sizes, char** values_list, + size_t* values_list_sizes, char** errs); + extern ROCKSDB_LIBRARY_API char* rocksdb_transactiondb_get( rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options, const char* key, size_t klen, size_t* vlen, char** errptr);