Improve transaction C-API (#9252)

Summary:
This PR wants to improve support for transaction in C-API:
* Support two-phase commit.
* Support `get_pinned` and `multi_get` in transaction.
* Add `rocksdb_transactiondb_flush`
* Support get writebatch from transaction and rebuild transaction from writebatch.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9252

Reviewed By: jay-zhuang

Differential Revision: D36459007

Pulled By: riversand963

fbshipit-source-id: 47371d527be821c496353a7fe2fd18d628069a98
main
Yiyuan Liu 3 years ago committed by Facebook GitHub Bot
parent 9901e7f681
commit b71466e982
  1. 3
      HISTORY.md
  2. 303
      db/c.cc
  3. 302
      db/c_test.c
  4. 100
      include/rocksdb/c.h

@ -2,6 +2,9 @@
## Unreleased
### Public API changes
* Add new API GetUnixTime in Snapshot class which returns the unix time at which Snapshot is taken.
* Add transaction `get_pinned` and `multi_get` to C API.
* Add two-phase commit support to C API.
* Add `rocksdb_transaction_get_writebatch_wi` and `rocksdb_transaction_rebuild_from_writebatch` to C API.
## 7.3.0 (05/20/2022)
### Bug Fixes

@ -4844,6 +4844,11 @@ void rocksdb_transaction_options_set_max_write_batch_size(
opt->rep.max_write_batch_size = size;
}
void rocksdb_transaction_options_set_skip_prepare(
rocksdb_transaction_options_t* opt, unsigned char v) {
opt->rep.skip_prepare = v;
}
rocksdb_optimistictransaction_options_t*
rocksdb_optimistictransaction_options_create() {
return new rocksdb_optimistictransaction_options_t;
@ -4987,6 +4992,62 @@ rocksdb_transaction_t* rocksdb_transaction_begin(
return old_txn;
}
rocksdb_transaction_t** rocksdb_transactiondb_get_prepared_transactions(
rocksdb_transactiondb_t* txn_db, size_t* cnt) {
std::vector<Transaction*> txns;
txn_db->rep->GetAllPreparedTransactions(&txns);
*cnt = txns.size();
if (txns.empty()) {
return nullptr;
} else {
rocksdb_transaction_t** buf = (rocksdb_transaction_t**)malloc(
txns.size() * sizeof(rocksdb_transaction_t*));
for (size_t i = 0; i < txns.size(); i++) {
buf[i] = new rocksdb_transaction_t;
buf[i]->rep = txns[i];
}
return buf;
}
}
void rocksdb_transaction_set_name(rocksdb_transaction_t* txn, const char* name,
size_t name_len, char** errptr) {
std::string str = std::string(name, name_len);
SaveError(errptr, txn->rep->SetName(str));
}
char* rocksdb_transaction_get_name(rocksdb_transaction_t* txn,
size_t* name_len) {
auto name = txn->rep->GetName();
*name_len = name.size();
return CopyString(name);
}
void rocksdb_transaction_prepare(rocksdb_transaction_t* txn, char** errptr) {
SaveError(errptr, txn->rep->Prepare());
}
rocksdb_writebatch_wi_t* rocksdb_transaction_get_writebatch_wi(
rocksdb_transaction_t* txn) {
rocksdb_writebatch_wi_t* wi =
(rocksdb_writebatch_wi_t*)malloc(sizeof(rocksdb_writebatch_wi_t));
wi->rep = txn->rep->GetWriteBatch();
return wi;
}
void rocksdb_transaction_rebuild_from_writebatch(
rocksdb_transaction_t* txn, rocksdb_writebatch_t* writebatch,
char** errptr) {
SaveError(errptr, txn->rep->RebuildFromWriteBatch(&writebatch->rep));
}
void rocksdb_transaction_rebuild_from_writebatch_wi(rocksdb_transaction_t* txn,
rocksdb_writebatch_wi_t* wi,
char** errptr) {
SaveError(errptr, txn->rep->RebuildFromWriteBatch(wi->rep->GetWriteBatch()));
}
void rocksdb_transaction_commit(rocksdb_transaction_t* txn, char** errptr) {
SaveError(errptr, txn->rep->Commit());
}
@ -5038,6 +5099,21 @@ char* rocksdb_transaction_get(rocksdb_transaction_t* txn,
return result;
}
rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
const char* key, size_t klen, char** errptr) {
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
Status s = txn->rep->Get(options->rep, Slice(key, klen), &v->rep);
if (!s.ok()) {
delete (v);
if (!s.IsNotFound()) {
SaveError(errptr, s);
}
return nullptr;
}
return v;
}
char* rocksdb_transaction_get_cf(rocksdb_transaction_t* txn,
const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family,
@ -5059,6 +5135,23 @@ char* rocksdb_transaction_get_cf(rocksdb_transaction_t* txn,
return result;
}
rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key, size_t klen,
char** errptr) {
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
Status s = txn->rep->Get(options->rep, column_family->rep, Slice(key, klen),
&v->rep);
if (!s.ok()) {
delete (v);
if (!s.IsNotFound()) {
SaveError(errptr, s);
}
return nullptr;
}
return v;
}
// Read a key inside a transaction
char* rocksdb_transaction_get_for_update(rocksdb_transaction_t* txn,
const rocksdb_readoptions_t* options,
@ -5081,6 +5174,23 @@ char* rocksdb_transaction_get_for_update(rocksdb_transaction_t* txn,
return result;
}
rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_for_update(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
const char* key, size_t klen, unsigned char exclusive, char** errptr) {
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
Status s = txn->rep->GetForUpdate(options->rep, Slice(key, klen),
v->rep.GetSelf(), exclusive);
v->rep.PinSelf();
if (!s.ok()) {
delete (v);
if (!s.IsNotFound()) {
SaveError(errptr, s);
}
return nullptr;
}
return v;
}
char* rocksdb_transaction_get_for_update_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key, size_t klen,
@ -5101,6 +5211,86 @@ char* rocksdb_transaction_get_for_update_cf(
return result;
}
rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_for_update_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key, size_t klen,
unsigned char exclusive, char** errptr) {
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
Status s = txn->rep->GetForUpdate(options->rep, column_family->rep,
Slice(key, klen), &v->rep, exclusive);
if (!s.ok()) {
delete (v);
if (!s.IsNotFound()) {
SaveError(errptr, s);
}
return nullptr;
}
return v;
}
void rocksdb_transaction_multi_get(rocksdb_transaction_t* txn,
const rocksdb_readoptions_t* options,
size_t num_keys,
const char* const* keys_list,
const size_t* keys_list_sizes,
char** values_list,
size_t* values_list_sizes, char** errs) {
std::vector<Slice> keys(num_keys);
for (size_t i = 0; i < num_keys; i++) {
keys[i] = Slice(keys_list[i], keys_list_sizes[i]);
}
std::vector<std::string> values(num_keys);
std::vector<Status> statuses =
txn->rep->MultiGet(options->rep, keys, &values);
for (size_t i = 0; i < num_keys; i++) {
if (statuses[i].ok()) {
values_list[i] = CopyString(values[i]);
values_list_sizes[i] = values[i].size();
errs[i] = nullptr;
} else {
values_list[i] = nullptr;
values_list_sizes[i] = 0;
if (!statuses[i].IsNotFound()) {
errs[i] = strdup(statuses[i].ToString().c_str());
} else {
errs[i] = nullptr;
}
}
}
}
void rocksdb_transaction_multi_get_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
const rocksdb_column_family_handle_t* const* column_families,
size_t num_keys, const char* const* keys_list,
const size_t* keys_list_sizes, char** values_list,
size_t* values_list_sizes, char** errs) {
std::vector<Slice> keys(num_keys);
std::vector<ColumnFamilyHandle*> cfs(num_keys);
for (size_t i = 0; i < num_keys; i++) {
keys[i] = Slice(keys_list[i], keys_list_sizes[i]);
cfs[i] = column_families[i]->rep;
}
std::vector<std::string> values(num_keys);
std::vector<Status> statuses =
txn->rep->MultiGet(options->rep, cfs, keys, &values);
for (size_t i = 0; i < num_keys; i++) {
if (statuses[i].ok()) {
values_list[i] = CopyString(values[i]);
values_list_sizes[i] = values[i].size();
errs[i] = nullptr;
} else {
values_list[i] = nullptr;
values_list_sizes[i] = 0;
if (!statuses[i].IsNotFound()) {
errs[i] = strdup(statuses[i].ToString().c_str());
} else {
errs[i] = nullptr;
}
}
}
}
// Read a key outside a transaction
char* rocksdb_transactiondb_get(
rocksdb_transactiondb_t* txn_db,
@ -5123,6 +5313,22 @@ char* rocksdb_transactiondb_get(
return result;
}
rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned(
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
const char* key, size_t klen, char** errptr) {
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
Status s = txn_db->rep->Get(options->rep, txn_db->rep->DefaultColumnFamily(),
Slice(key, klen), &v->rep);
if (!s.ok()) {
delete (v);
if (!s.IsNotFound()) {
SaveError(errptr, s);
}
return nullptr;
}
return v;
}
char* rocksdb_transactiondb_get_cf(
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key,
@ -5143,6 +5349,86 @@ char* rocksdb_transactiondb_get_cf(
return result;
}
rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf(
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, char** errptr) {
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
Status s = txn_db->rep->Get(options->rep, column_family->rep,
Slice(key, keylen), &v->rep);
if (!s.ok()) {
delete (v);
if (!s.IsNotFound()) {
SaveError(errptr, s);
}
return nullptr;
}
return v;
}
void rocksdb_transactiondb_multi_get(rocksdb_transactiondb_t* txn_db,
const rocksdb_readoptions_t* options,
size_t num_keys,
const char* const* keys_list,
const size_t* keys_list_sizes,
char** values_list,
size_t* values_list_sizes, char** errs) {
std::vector<Slice> keys(num_keys);
for (size_t i = 0; i < num_keys; i++) {
keys[i] = Slice(keys_list[i], keys_list_sizes[i]);
}
std::vector<std::string> values(num_keys);
std::vector<Status> statuses =
txn_db->rep->MultiGet(options->rep, keys, &values);
for (size_t i = 0; i < num_keys; i++) {
if (statuses[i].ok()) {
values_list[i] = CopyString(values[i]);
values_list_sizes[i] = values[i].size();
errs[i] = nullptr;
} else {
values_list[i] = nullptr;
values_list_sizes[i] = 0;
if (!statuses[i].IsNotFound()) {
errs[i] = strdup(statuses[i].ToString().c_str());
} else {
errs[i] = nullptr;
}
}
}
}
void rocksdb_transactiondb_multi_get_cf(
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
const rocksdb_column_family_handle_t* const* column_families,
size_t num_keys, const char* const* keys_list,
const size_t* keys_list_sizes, char** values_list,
size_t* values_list_sizes, char** errs) {
std::vector<Slice> keys(num_keys);
std::vector<ColumnFamilyHandle*> cfs(num_keys);
for (size_t i = 0; i < num_keys; i++) {
keys[i] = Slice(keys_list[i], keys_list_sizes[i]);
cfs[i] = column_families[i]->rep;
}
std::vector<std::string> values(num_keys);
std::vector<Status> statuses =
txn_db->rep->MultiGet(options->rep, cfs, keys, &values);
for (size_t i = 0; i < num_keys; i++) {
if (statuses[i].ok()) {
values_list[i] = CopyString(values[i]);
values_list_sizes[i] = values[i].size();
errs[i] = nullptr;
} else {
values_list[i] = nullptr;
values_list_sizes[i] = 0;
if (!statuses[i].IsNotFound()) {
errs[i] = strdup(statuses[i].ToString().c_str());
} else {
errs[i] = nullptr;
}
}
}
}
// Put a key inside a transaction
void rocksdb_transaction_put(rocksdb_transaction_t* txn, const char* key,
size_t klen, const char* val, size_t vlen,
@ -5283,6 +5569,23 @@ void rocksdb_transactiondb_close(rocksdb_transactiondb_t* txn_db) {
delete txn_db;
}
void rocksdb_transactiondb_flush_wal(rocksdb_transactiondb_t* txn_db,
unsigned char sync, char** errptr) {
SaveError(errptr, txn_db->rep->FlushWAL(sync));
}
void rocksdb_transactiondb_flush(rocksdb_transactiondb_t* txn_db,
const rocksdb_flushoptions_t* options,
char** errptr) {
SaveError(errptr, txn_db->rep->Flush(options->rep));
}
void rocksdb_transactiondb_flush_cf(
rocksdb_transactiondb_t* txn_db, const rocksdb_flushoptions_t* options,
rocksdb_column_family_handle_t* column_family, char** errptr) {
SaveError(errptr, txn_db->rep->Flush(options->rep, column_family->rep));
}
rocksdb_checkpoint_t* rocksdb_transactiondb_checkpoint_object_create(
rocksdb_transactiondb_t* txn_db, char** errptr) {
Checkpoint* checkpoint;

@ -169,6 +169,16 @@ static void CheckPinGetCF(rocksdb_t* db, const rocksdb_readoptions_t* options,
rocksdb_pinnableslice_destroy(p);
}
static void CheckMultiGetValues(size_t num_keys, char** values,
size_t* values_sizes, char** errs,
const char** expected) {
for (size_t i = 0; i < num_keys; i++) {
CheckNoError(errs[i]);
CheckEqual(expected[i], values[i], values_sizes[i]);
Free(&values[i]);
}
}
static void CheckIter(rocksdb_iterator_t* iter,
const char* key, const char* val) {
size_t len;
@ -366,18 +376,49 @@ static void CheckTxnGetCF(rocksdb_transaction_t* txn,
Free(&val);
}
static void CheckTxnPinGet(rocksdb_transaction_t* txn,
const rocksdb_readoptions_t* options,
const char* key, const char* expected) {
rocksdb_pinnableslice_t* p = NULL;
const char* val = NULL;
char* err = NULL;
size_t val_len;
p = rocksdb_transaction_get_pinned(txn, options, key, strlen(key), &err);
CheckNoError(err);
val = rocksdb_pinnableslice_value(p, &val_len);
CheckEqual(expected, val, val_len);
rocksdb_pinnableslice_destroy(p);
}
static void CheckTxnPinGetCF(rocksdb_transaction_t* txn,
const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family,
const char* key, const char* expected) {
rocksdb_pinnableslice_t* p = NULL;
const char* val = NULL;
char* err = NULL;
size_t val_len;
p = rocksdb_transaction_get_pinned_cf(txn, options, column_family, key,
strlen(key), &err);
CheckNoError(err);
val = rocksdb_pinnableslice_value(p, &val_len);
CheckEqual(expected, val, val_len);
rocksdb_pinnableslice_destroy(p);
}
static void CheckTxnDBGet(
rocksdb_transactiondb_t* txn_db,
const rocksdb_readoptions_t* options,
const char* key,
const char* expected) {
char* err = NULL;
size_t val_len;
char* val;
val = rocksdb_transactiondb_get(txn_db, options, key, strlen(key), &val_len, &err);
CheckNoError(err);
CheckEqual(expected, val, val_len);
Free(&val);
char* err = NULL;
size_t val_len;
char* val;
val = rocksdb_transactiondb_get(txn_db, options, key, strlen(key), &val_len,
&err);
CheckNoError(err);
CheckEqual(expected, val, val_len);
Free(&val);
}
static void CheckTxnDBGetCF(rocksdb_transactiondb_t* txn_db,
@ -394,6 +435,36 @@ static void CheckTxnDBGetCF(rocksdb_transactiondb_t* txn_db,
Free(&val);
}
static void CheckTxnDBPinGet(rocksdb_transactiondb_t* txn_db,
const rocksdb_readoptions_t* options,
const char* key, const char* expected) {
rocksdb_pinnableslice_t* p = NULL;
const char* val = NULL;
char* err = NULL;
size_t val_len;
p = rocksdb_transactiondb_get_pinned(txn_db, options, key, strlen(key), &err);
CheckNoError(err);
val = rocksdb_pinnableslice_value(p, &val_len);
CheckEqual(expected, val, val_len);
rocksdb_pinnableslice_destroy(p);
}
static void CheckTxnDBPinGetCF(rocksdb_transactiondb_t* txn_db,
const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family,
const char* key, const char* expected) {
rocksdb_pinnableslice_t* p = NULL;
const char* val = NULL;
char* err = NULL;
size_t val_len;
p = rocksdb_transactiondb_get_pinned_cf(txn_db, options, column_family, key,
strlen(key), &err);
CheckNoError(err);
val = rocksdb_pinnableslice_value(p, &val_len);
CheckEqual(expected, val, val_len);
rocksdb_pinnableslice_destroy(p);
}
int main(int argc, char** argv) {
(void)argc;
(void)argv;
@ -914,24 +985,9 @@ int main(int argc, char** argv) {
char* vals[3];
size_t vals_sizes[3];
char* errs[3];
const char* expected[3] = {"c", "hello", NULL};
rocksdb_multi_get(db, roptions, 3, keys, keys_sizes, vals, vals_sizes, errs);
int i;
for (i = 0; i < 3; i++) {
CheckEqual(NULL, errs[i], 0);
switch (i) {
case 0:
CheckEqual("c", vals[i], vals_sizes[i]);
break;
case 1:
CheckEqual("hello", vals[i], vals_sizes[i]);
break;
case 2:
CheckEqual(NULL, vals[i], vals_sizes[i]);
break;
}
Free(&vals[i]);
}
CheckMultiGetValues(3, vals, vals_sizes, errs, expected);
}
StartPhase("pin_get");
@ -2623,11 +2679,13 @@ int main(int argc, char** argv) {
rocksdb_transactiondb_put(txn_db, woptions, "foo", 3, "hello", 5, &err);
CheckNoError(err);
CheckTxnDBGet(txn_db, roptions, "foo", "hello");
CheckTxnDBPinGet(txn_db, roptions, "foo", "hello");
// delete from outside transaction
rocksdb_transactiondb_delete(txn_db, woptions, "foo", 3, &err);
CheckNoError(err);
CheckTxnDBGet(txn_db, roptions, "foo", NULL);
CheckTxnDBPinGet(txn_db, roptions, "foo", NULL);
// write batch into TransactionDB
rocksdb_writebatch_t* wb = rocksdb_writebatch_create();
@ -2639,24 +2697,63 @@ int main(int argc, char** argv) {
rocksdb_transactiondb_write(txn_db, woptions, wb, &err);
rocksdb_writebatch_destroy(wb);
CheckTxnDBGet(txn_db, roptions, "box", "c");
CheckTxnDBPinGet(txn_db, roptions, "box", "c");
CheckNoError(err);
// multi get
{
const char* keys[3] = {"box", "foo", "notfound"};
const size_t keys_sizes[3] = {3, 3, 8};
char* vals[3];
size_t vals_sizes[3];
char* errs[3];
const char* expected[3] = {"c", NULL, NULL};
rocksdb_transactiondb_multi_get(txn_db, roptions, 3, keys, keys_sizes,
vals, vals_sizes, errs);
CheckMultiGetValues(3, vals, vals_sizes, errs, expected);
}
// begin a transaction
txn = rocksdb_transaction_begin(txn_db, woptions, txn_options, NULL);
// put
rocksdb_transaction_put(txn, "foo", 3, "hello", 5, &err);
CheckNoError(err);
CheckTxnGet(txn, roptions, "foo", "hello");
CheckTxnPinGet(txn, roptions, "foo", "hello");
{
const char* keys[3] = {"box", "foo", "notfound"};
const size_t keys_sizes[3] = {3, 3, 8};
char* vals[3];
size_t vals_sizes[3];
char* errs[3];
const char* expected[3] = {"c", "hello", NULL};
rocksdb_transaction_multi_get(txn, roptions, 3, keys, keys_sizes, vals,
vals_sizes, errs);
CheckMultiGetValues(3, vals, vals_sizes, errs, expected);
}
// delete
rocksdb_transaction_delete(txn, "foo", 3, &err);
CheckNoError(err);
CheckTxnGet(txn, roptions, "foo", NULL);
CheckTxnPinGet(txn, roptions, "foo", NULL);
rocksdb_transaction_put(txn, "foo", 3, "hello", 5, &err);
CheckNoError(err);
// read from outside transaction, before commit
CheckTxnDBGet(txn_db, roptions, "foo", NULL);
CheckTxnDBPinGet(txn_db, roptions, "foo", NULL);
{
const char* keys[3] = {"box", "foo", "notfound"};
const size_t keys_sizes[3] = {3, 3, 8};
char* vals[3];
size_t vals_sizes[3];
char* errs[3];
const char* expected[3] = {"c", NULL, NULL};
rocksdb_transactiondb_multi_get(txn_db, roptions, 3, keys, keys_sizes,
vals, vals_sizes, errs);
CheckMultiGetValues(3, vals, vals_sizes, errs, expected);
}
// commit
rocksdb_transaction_commit(txn, &err);
@ -2664,6 +2761,18 @@ int main(int argc, char** argv) {
// read from outside transaction, after commit
CheckTxnDBGet(txn_db, roptions, "foo", "hello");
CheckTxnDBPinGet(txn_db, roptions, "foo", "hello");
{
const char* keys[3] = {"box", "foo", "notfound"};
const size_t keys_sizes[3] = {3, 3, 8};
char* vals[3];
size_t vals_sizes[3];
char* errs[3];
const char* expected[3] = {"c", "hello", NULL};
rocksdb_transactiondb_multi_get(txn_db, roptions, 3, keys, keys_sizes,
vals, vals_sizes, errs);
CheckMultiGetValues(3, vals, vals_sizes, errs, expected);
}
// reuse old transaction
txn = rocksdb_transaction_begin(txn_db, woptions, txn_options, txn);
@ -2677,9 +2786,11 @@ int main(int argc, char** argv) {
CheckNoError(err);
CheckTxnDBGet(txn_db, roptions, "foo", "hello");
CheckTxnDBPinGet(txn_db, roptions, "foo", "hello");
rocksdb_readoptions_set_snapshot(roptions, NULL);
rocksdb_transactiondb_release_snapshot(txn_db, snapshot);
CheckTxnDBGet(txn_db, roptions, "foo", "hey");
CheckTxnDBPinGet(txn_db, roptions, "foo", "hey");
// iterate
rocksdb_transaction_put(txn, "bar", 3, "hi", 2, &err);
@ -2696,25 +2807,34 @@ int main(int argc, char** argv) {
rocksdb_transaction_rollback(txn, &err);
CheckNoError(err);
CheckTxnDBGet(txn_db, roptions, "bar", NULL);
CheckTxnDBPinGet(txn_db, roptions, "bar", NULL);
// save point
rocksdb_transaction_put(txn, "foo1", 4, "hi1", 3, &err);
rocksdb_transaction_set_savepoint(txn);
CheckTxnGet(txn, roptions, "foo1", "hi1");
CheckTxnPinGet(txn, roptions, "foo1", "hi1");
rocksdb_transaction_put(txn, "foo2", 4, "hi2", 3, &err);
CheckTxnGet(txn, roptions, "foo2", "hi2");
CheckTxnPinGet(txn, roptions, "foo2", "hi2");
// rollback to savepoint
rocksdb_transaction_rollback_to_savepoint(txn, &err);
CheckNoError(err);
CheckTxnGet(txn, roptions, "foo2", NULL);
CheckTxnGet(txn, roptions, "foo1", "hi1");
CheckTxnPinGet(txn, roptions, "foo2", NULL);
CheckTxnPinGet(txn, roptions, "foo1", "hi1");
CheckTxnDBGet(txn_db, roptions, "foo1", NULL);
CheckTxnDBGet(txn_db, roptions, "foo2", NULL);
CheckTxnDBPinGet(txn_db, roptions, "foo1", NULL);
CheckTxnDBPinGet(txn_db, roptions, "foo2", NULL);
rocksdb_transaction_commit(txn, &err);
CheckNoError(err);
CheckTxnDBGet(txn_db, roptions, "foo1", "hi1");
CheckTxnDBGet(txn_db, roptions, "foo2", NULL);
CheckTxnDBPinGet(txn_db, roptions, "foo1", "hi1");
CheckTxnDBPinGet(txn_db, roptions, "foo2", NULL);
// Column families.
rocksdb_column_family_handle_t* cfh;
@ -2726,15 +2846,130 @@ int main(int argc, char** argv) {
8, &err);
CheckNoError(err);
CheckTxnDBGetCF(txn_db, roptions, cfh, "cf_foo", "cf_hello");
CheckTxnDBPinGetCF(txn_db, roptions, cfh, "cf_foo", "cf_hello");
{
const rocksdb_column_family_handle_t* get_handles[2] = {cfh, cfh};
const char* keys[2] = {"cf_foo", "notfound"};
const size_t keys_sizes[2] = {6, 8};
char* vals[2];
size_t vals_sizes[2];
char* errs[2];
const char* expected[2] = {"cf_hello", NULL};
rocksdb_transactiondb_multi_get_cf(txn_db, roptions, get_handles, 2, keys,
keys_sizes, vals, vals_sizes, errs);
CheckMultiGetValues(2, vals, vals_sizes, errs, expected);
}
rocksdb_transactiondb_delete_cf(txn_db, woptions, cfh, "cf_foo", 6, &err);
CheckNoError(err);
CheckTxnDBGetCF(txn_db, roptions, cfh, "cf_foo", NULL);
CheckTxnDBPinGetCF(txn_db, roptions, cfh, "cf_foo", NULL);
rocksdb_column_family_handle_destroy(cfh);
// flush
rocksdb_flushoptions_t* flush_options = rocksdb_flushoptions_create();
rocksdb_flushoptions_set_wait(flush_options, 1);
rocksdb_transactiondb_flush_wal(txn_db, 1, &err);
CheckNoError(err);
rocksdb_transactiondb_flush_cf(txn_db, flush_options, cfh, &err);
CheckNoError(err);
rocksdb_transactiondb_flush(txn_db, flush_options, &err);
CheckNoError(err);
rocksdb_flushoptions_destroy(flush_options);
// close and destroy
rocksdb_column_family_handle_destroy(cfh);
rocksdb_transaction_destroy(txn);
rocksdb_transactiondb_close(txn_db);
rocksdb_destroy_db(options, dbname, &err);
CheckNoError(err);
rocksdb_transaction_options_destroy(txn_options);
rocksdb_transactiondb_options_destroy(txn_db_options);
}
StartPhase("two-phase commit");
{
// open a TransactionDB
txn_db_options = rocksdb_transactiondb_options_create();
txn_options = rocksdb_transaction_options_create();
rocksdb_options_set_create_if_missing(options, 1);
txn_db = rocksdb_transactiondb_open(options, txn_db_options, dbname, &err);
CheckNoError(err);
rocksdb_transaction_options_set_skip_prepare(txn_options, 0);
txn = rocksdb_transaction_begin(txn_db, woptions, txn_options, NULL);
rocksdb_transaction_commit(txn, &err);
CheckCondition(err != NULL);
Free(&err);
err = NULL;
rocksdb_transaction_prepare(txn, &err);
CheckCondition(err != NULL);
Free(&err);
err = NULL;
rocksdb_transaction_set_name(txn, "txn1", 4, &err);
CheckNoError(err);
rocksdb_transaction_prepare(txn, &err);
CheckNoError(err);
rocksdb_transaction_commit(txn, &err);
CheckNoError(err);
rocksdb_transaction_destroy(txn);
// prepare 2 transactions and close db.
rocksdb_transaction_t* txn1 =
rocksdb_transaction_begin(txn_db, woptions, txn_options, NULL);
rocksdb_transaction_put(txn1, "bar1", 4, "1", 1, &err);
CheckNoError(err);
rocksdb_transaction_set_name(txn1, "txn1", 4, &err);
CheckNoError(err);
rocksdb_transaction_prepare(txn1, &err);
CheckNoError(err);
rocksdb_transaction_t* txn2 =
rocksdb_transaction_begin(txn_db, woptions, txn_options, NULL);
rocksdb_transaction_put(txn2, "bar2", 4, "2", 1, &err);
CheckNoError(err);
rocksdb_transaction_set_name(txn2, "txn2", 4, &err);
CheckNoError(err);
rocksdb_transaction_prepare(txn2, &err);
CheckNoError(err);
rocksdb_transaction_destroy(txn1);
rocksdb_transaction_destroy(txn2);
rocksdb_transactiondb_close(txn_db);
rocksdb_transaction_options_destroy(txn_options);
rocksdb_transactiondb_options_destroy(txn_db_options);
// reopen db and get all prepared.
txn_db_options = rocksdb_transactiondb_options_create();
txn_options = rocksdb_transaction_options_create();
rocksdb_options_set_error_if_exists(options, 0);
txn_db = rocksdb_transactiondb_open(options, txn_db_options, dbname, &err);
CheckNoError(err);
CheckTxnDBPinGet(txn_db, roptions, "bar1", NULL);
CheckTxnDBPinGet(txn_db, roptions, "bar2", NULL);
size_t cnt;
rocksdb_transaction_t** txns =
rocksdb_transactiondb_get_prepared_transactions(txn_db, &cnt);
CheckCondition(cnt == 2);
size_t i;
for (i = 0; i < cnt; i++) {
txn = txns[i];
size_t name_len = 0;
char* name = rocksdb_transaction_get_name(txn, &name_len);
CheckCondition(name_len == 4);
if (strncmp(name, "txn1", name_len) == 0) {
rocksdb_transaction_commit(txn, &err);
} else if (strncmp(name, "txn2", name_len) == 0) {
rocksdb_transaction_rollback(txn, &err);
}
rocksdb_free(name);
CheckNoError(err);
rocksdb_transaction_destroy(txn);
}
rocksdb_free(txns);
CheckTxnDBGet(txn_db, roptions, "bar1", "1");
CheckTxnDBGet(txn_db, roptions, "bar2", NULL);
rocksdb_transactiondb_put(txn_db, woptions, "bar2", 4, "2", 1, &err);
CheckNoError(err);
// close and destroy
rocksdb_transactiondb_close(txn_db);
rocksdb_destroy_db(options, dbname, &err);
CheckNoError(err);
@ -2758,6 +2993,7 @@ int main(int argc, char** argv) {
rocksdb_transaction_put(txn2, "key1", 4, "value1", 6, &err);
CheckNoError(err);
CheckTxnGet(txn1, roptions, "key", "value");
CheckTxnPinGet(txn1, roptions, "key", "value");
rocksdb_transaction_commit(txn1, &err);
CheckNoError(err);
rocksdb_transaction_commit(txn2, &err);
@ -2784,6 +3020,19 @@ int main(int argc, char** argv) {
txn);
CheckGetCF(db, roptions, cfh1, "key_cf1", "val_cf1");
CheckTxnGetCF(txn, roptions, cfh1, "key_cf1", "val_cf1");
CheckTxnPinGetCF(txn, roptions, cfh1, "key_cf1", "val_cf1");
{
const rocksdb_column_family_handle_t* get_handles[3] = {cfh1, cfh2, cfh2};
const char* keys[3] = {"key_cf1", "key_cf2", "notfound"};
const size_t keys_sizes[3] = {7, 7, 8};
char* vals[3];
size_t vals_sizes[3];
char* errs[3];
const char* expected[3] = {"val_cf1", "val_cf2", NULL};
rocksdb_transaction_multi_get_cf(txn, roptions, get_handles, 3, keys,
keys_sizes, vals, vals_sizes, errs);
CheckMultiGetValues(3, vals, vals_sizes, errs, expected);
}
// Check iterator with column family
rocksdb_transaction_put_cf(txn, cfh1, "key1_cf", 7, "val1_cf", 7, &err);
@ -2829,6 +3078,9 @@ int main(int argc, char** argv) {
CheckTxnGetCF(txn_cf, roptions, cf_handles[0], "key", "value");
CheckTxnGetCF(txn_cf, roptions, cf_handles[1], "key_cf1", "val_cf1");
CheckTxnGetCF(txn_cf, roptions, cf_handles[2], "key_cf2", "val_cf2");
CheckTxnPinGetCF(txn_cf, roptions, cf_handles[0], "key", "value");
CheckTxnPinGetCF(txn_cf, roptions, cf_handles[1], "key_cf1", "val_cf1");
CheckTxnPinGetCF(txn_cf, roptions, cf_handles[2], "key_cf2", "val_cf2");
rocksdb_transaction_destroy(txn_cf);
rocksdb_options_destroy(cf_options);
rocksdb_column_family_handle_destroy(cf_handles[0]);

@ -2083,6 +2083,20 @@ extern ROCKSDB_LIBRARY_API rocksdb_transaction_t* rocksdb_transaction_begin(
const rocksdb_transaction_options_t* txn_options,
rocksdb_transaction_t* old_txn);
extern ROCKSDB_LIBRARY_API rocksdb_transaction_t**
rocksdb_transactiondb_get_prepared_transactions(rocksdb_transactiondb_t* txn_db,
size_t* cnt);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_set_name(
rocksdb_transaction_t* txn, const char* name, size_t name_len,
char** errptr);
extern ROCKSDB_LIBRARY_API char* rocksdb_transaction_get_name(
rocksdb_transaction_t* txn, size_t* name_len);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_prepare(
rocksdb_transaction_t* txn, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_commit(
rocksdb_transaction_t* txn, char** errptr);
@ -2098,6 +2112,17 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transaction_rollback_to_savepoint(
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_destroy(
rocksdb_transaction_t* txn);
extern ROCKSDB_LIBRARY_API rocksdb_writebatch_wi_t*
rocksdb_transaction_get_writebatch_wi(rocksdb_transaction_t* txn);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_rebuild_from_writebatch(
rocksdb_transaction_t* txn, rocksdb_writebatch_t* writebatch,
char** errptr);
// This rocksdb_writebatch_wi_t should be freed with rocksdb_free
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_rebuild_from_writebatch_wi(
rocksdb_transaction_t* txn, rocksdb_writebatch_wi_t* wi, char** errptr);
// This snapshot should be freed using rocksdb_free
extern ROCKSDB_LIBRARY_API const rocksdb_snapshot_t*
rocksdb_transaction_get_snapshot(rocksdb_transaction_t* txn);
@ -2106,30 +2131,91 @@ extern ROCKSDB_LIBRARY_API char* rocksdb_transaction_get(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
const char* key, size_t klen, size_t* vlen, char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t*
rocksdb_transaction_get_pinned(rocksdb_transaction_t* txn,
const rocksdb_readoptions_t* options,
const char* key, size_t klen, char** errptr);
extern ROCKSDB_LIBRARY_API char* rocksdb_transaction_get_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key, size_t klen,
size_t* vlen, char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t*
rocksdb_transaction_get_pinned_cf(rocksdb_transaction_t* txn,
const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t klen, char** errptr);
extern ROCKSDB_LIBRARY_API char* rocksdb_transaction_get_for_update(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
const char* key, size_t klen, size_t* vlen, unsigned char exclusive,
char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t*
rocksdb_transaction_get_pinned_for_update(rocksdb_transaction_t* txn,
const rocksdb_readoptions_t* options,
const char* key, size_t klen,
unsigned char exclusive,
char** errptr);
extern ROCKSDB_LIBRARY_API char* rocksdb_transaction_get_for_update_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key, size_t klen,
size_t* vlen, unsigned char exclusive, char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t*
rocksdb_transaction_get_pinned_for_update_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key, size_t klen,
unsigned char exclusive, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_multi_get(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
size_t num_keys, const char* const* keys_list,
const size_t* keys_list_sizes, char** values_list,
size_t* values_list_sizes, char** errs);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_multi_get_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
const rocksdb_column_family_handle_t* const* column_families,
size_t num_keys, const char* const* keys_list,
const size_t* keys_list_sizes, char** values_list,
size_t* values_list_sizes, char** errs);
extern ROCKSDB_LIBRARY_API char* rocksdb_transactiondb_get(
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
const char* key, size_t klen, size_t* vlen, char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t*
rocksdb_transactiondb_get_pinned(rocksdb_transactiondb_t* txn_db,
const rocksdb_readoptions_t* options,
const char* key, size_t klen, char** errptr);
extern ROCKSDB_LIBRARY_API char* rocksdb_transactiondb_get_cf(
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, size_t* vallen, char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t*
rocksdb_transactiondb_get_pinned_cf(
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_multi_get(
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
size_t num_keys, const char* const* keys_list,
const size_t* keys_list_sizes, char** values_list,
size_t* values_list_sizes, char** errs);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_multi_get_cf(
rocksdb_transactiondb_t* txn_db, const rocksdb_readoptions_t* options,
const rocksdb_column_family_handle_t* const* column_families,
size_t num_keys, const char* const* keys_list,
const size_t* keys_list_sizes, char** values_list,
size_t* values_list_sizes, char** errs);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_put(
rocksdb_transaction_t* txn, const char* key, size_t klen, const char* val,
size_t vlen, char** errptr);
@ -2205,6 +2291,17 @@ rocksdb_transactiondb_create_iterator_cf(
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_close(
rocksdb_transactiondb_t* txn_db);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush(
rocksdb_transactiondb_t* txn_db, const rocksdb_flushoptions_t* options,
char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_cf(
rocksdb_transactiondb_t* txn_db, const rocksdb_flushoptions_t* options,
rocksdb_column_family_handle_t* column_family, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_wal(
rocksdb_transactiondb_t* txn_db, unsigned char sync, char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_checkpoint_t*
rocksdb_transactiondb_checkpoint_object_create(rocksdb_transactiondb_t* txn_db,
char** errptr);
@ -2294,6 +2391,9 @@ extern ROCKSDB_LIBRARY_API void
rocksdb_transaction_options_set_max_write_batch_size(
rocksdb_transaction_options_t* opt, size_t size);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_options_set_skip_prepare(
rocksdb_transaction_options_t* opt, unsigned char v);
extern ROCKSDB_LIBRARY_API rocksdb_optimistictransaction_options_t*
rocksdb_optimistictransaction_options_create(void);

Loading…
Cancel
Save