fork of https://github.com/rust-rocksdb/rust-rocksdb for nextgraph
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
368 lines
13 KiB
368 lines
13 KiB
2 years ago
|
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
|
||
|
// This source code is licensed under both the GPLv2 (found in the
|
||
|
// COPYING file in the root directory) and Apache 2.0 License
|
||
|
// (found in the LICENSE.Apache file in the root directory).
|
||
|
|
||
|
#include "format.h"
|
||
|
|
||
|
#include <algorithm>
|
||
|
#include <map>
|
||
|
#include <memory>
|
||
|
|
||
|
#include "utilities/cassandra/serialize.h"
|
||
|
|
||
|
namespace ROCKSDB_NAMESPACE {
|
||
|
namespace cassandra {
|
||
|
namespace {
// Sentinel deletion fields carried by a row that is not a row tombstone:
// the largest int32_t for the local deletion time and the smallest int64_t
// for the marked-for-delete timestamp.
constexpr int32_t kDefaultLocalDeletionTime =
    std::numeric_limits<int32_t>::max();
constexpr int64_t kDefaultMarkedForDeleteAt =
    std::numeric_limits<int64_t>::min();
}  // namespace
|
||
|
|
||
|
// Records the type mask and the column index common to every column kind.
ColumnBase::ColumnBase(int8_t mask, int8_t index)
    : mask_(mask),
      index_(index) {}
|
||
|
|
||
|
// Byte count of the common column header: the mask followed by the index.
std::size_t ColumnBase::Size() const {
  const std::size_t header_bytes = sizeof(mask_) + sizeof(index_);
  return header_bytes;
}
|
||
|
|
||
|
// Returns the stored type mask of this column.
int8_t ColumnBase::Mask() const {
  return mask_;
}
|
||
|
|
||
|
// Returns the stored index of this column.
int8_t ColumnBase::Index() const {
  return index_;
}
|
||
|
|
||
|
void ColumnBase::Serialize(std::string* dest) const {
|
||
|
ROCKSDB_NAMESPACE::cassandra::Serialize<int8_t>(mask_, dest);
|
||
|
ROCKSDB_NAMESPACE::cassandra::Serialize<int8_t>(index_, dest);
|
||
|
}
|
||
|
|
||
|
std::shared_ptr<ColumnBase> ColumnBase::Deserialize(const char* src,
|
||
|
std::size_t offset) {
|
||
|
int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
|
||
|
if ((mask & ColumnTypeMask::DELETION_MASK) != 0) {
|
||
|
return Tombstone::Deserialize(src, offset);
|
||
|
} else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) {
|
||
|
return ExpiringColumn::Deserialize(src, offset);
|
||
|
} else {
|
||
|
return Column::Deserialize(src, offset);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Builds a value-carrying column. `value` and `value_size` describe the
// payload bytes; the members are initialized directly from the arguments.
// NOTE(review): no copy of the payload is made here, so the buffer behind
// `value` presumably has to outlive this object - confirm against callers.
Column::Column(int8_t mask, int8_t index, int64_t timestamp,
               int32_t value_size, const char* value)
    : ColumnBase(mask, index),
      timestamp_(timestamp),
      value_size_(value_size),
      value_(value) {}
|
||
|
|
||
|
// Returns the write timestamp stored with this column.
int64_t Column::Timestamp() const {
  return timestamp_;
}
|
||
|
|
||
|
std::size_t Column::Size() const {
|
||
|
return ColumnBase::Size() + sizeof(timestamp_) + sizeof(value_size_) +
|
||
|
value_size_;
|
||
|
}
|
||
|
|
||
|
void Column::Serialize(std::string* dest) const {
|
||
|
ColumnBase::Serialize(dest);
|
||
|
ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(timestamp_, dest);
|
||
|
ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(value_size_, dest);
|
||
|
dest->append(value_, value_size_);
|
||
|
}
|
||
|
|
||
|
std::shared_ptr<Column> Column::Deserialize(const char* src,
|
||
|
std::size_t offset) {
|
||
|
int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
|
||
|
offset += sizeof(mask);
|
||
|
int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
|
||
|
offset += sizeof(index);
|
||
|
int64_t timestamp =
|
||
|
ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
|
||
|
offset += sizeof(timestamp);
|
||
|
int32_t value_size =
|
||
|
ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
|
||
|
offset += sizeof(value_size);
|
||
|
return std::make_shared<Column>(mask, index, timestamp, value_size,
|
||
|
src + offset);
|
||
|
}
|
||
|
|
||
|
// Builds a column that additionally carries a time-to-live. Ttl() interprets
// `ttl` as a number of seconds.
ExpiringColumn::ExpiringColumn(int8_t mask, int8_t index, int64_t timestamp,
                               int32_t value_size, const char* value,
                               int32_t ttl)
    : Column(mask, index, timestamp, value_size, value),
      ttl_(ttl) {}
|
||
|
|
||
|
std::size_t ExpiringColumn::Size() const {
|
||
|
return Column::Size() + sizeof(ttl_);
|
||
|
}
|
||
|
|
||
|
void ExpiringColumn::Serialize(std::string* dest) const {
|
||
|
Column::Serialize(dest);
|
||
|
ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(ttl_, dest);
|
||
|
}
|
||
|
|
||
|
std::chrono::time_point<std::chrono::system_clock> ExpiringColumn::TimePoint()
|
||
|
const {
|
||
|
return std::chrono::time_point<std::chrono::system_clock>(
|
||
|
std::chrono::microseconds(Timestamp()));
|
||
|
}
|
||
|
|
||
|
std::chrono::seconds ExpiringColumn::Ttl() const {
|
||
|
return std::chrono::seconds(ttl_);
|
||
|
}
|
||
|
|
||
|
bool ExpiringColumn::Expired() const {
|
||
|
return TimePoint() + Ttl() < std::chrono::system_clock::now();
|
||
|
}
|
||
|
|
||
|
std::shared_ptr<Tombstone> ExpiringColumn::ToTombstone() const {
|
||
|
auto expired_at = (TimePoint() + Ttl()).time_since_epoch();
|
||
|
int32_t local_deletion_time = static_cast<int32_t>(
|
||
|
std::chrono::duration_cast<std::chrono::seconds>(expired_at).count());
|
||
|
int64_t marked_for_delete_at =
|
||
|
std::chrono::duration_cast<std::chrono::microseconds>(expired_at).count();
|
||
|
return std::make_shared<Tombstone>(
|
||
|
static_cast<int8_t>(ColumnTypeMask::DELETION_MASK), Index(),
|
||
|
local_deletion_time, marked_for_delete_at);
|
||
|
}
|
||
|
|
||
|
std::shared_ptr<ExpiringColumn> ExpiringColumn::Deserialize(
|
||
|
const char* src, std::size_t offset) {
|
||
|
int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
|
||
|
offset += sizeof(mask);
|
||
|
int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
|
||
|
offset += sizeof(index);
|
||
|
int64_t timestamp =
|
||
|
ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
|
||
|
offset += sizeof(timestamp);
|
||
|
int32_t value_size =
|
||
|
ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
|
||
|
offset += sizeof(value_size);
|
||
|
const char* value = src + offset;
|
||
|
offset += value_size;
|
||
|
int32_t ttl = ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
|
||
|
return std::make_shared<ExpiringColumn>(mask, index, timestamp, value_size,
|
||
|
value, ttl);
|
||
|
}
|
||
|
|
||
|
// Builds a column tombstone. Collectable() reads `local_deletion_time` as
// seconds; `marked_for_delete_at` is what Timestamp() reports.
Tombstone::Tombstone(int8_t mask, int8_t index, int32_t local_deletion_time,
                     int64_t marked_for_delete_at)
    : ColumnBase(mask, index),
      local_deletion_time_(local_deletion_time),
      marked_for_delete_at_(marked_for_delete_at) {}
|
||
|
|
||
|
// A tombstone's timestamp is its marked-for-delete time.
int64_t Tombstone::Timestamp() const {
  return marked_for_delete_at_;
}
|
||
|
|
||
|
std::size_t Tombstone::Size() const {
|
||
|
return ColumnBase::Size() + sizeof(local_deletion_time_) +
|
||
|
sizeof(marked_for_delete_at_);
|
||
|
}
|
||
|
|
||
|
void Tombstone::Serialize(std::string* dest) const {
|
||
|
ColumnBase::Serialize(dest);
|
||
|
ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
|
||
|
ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
|
||
|
}
|
||
|
|
||
|
bool Tombstone::Collectable(int32_t gc_grace_period_in_seconds) const {
|
||
|
auto local_deleted_at = std::chrono::time_point<std::chrono::system_clock>(
|
||
|
std::chrono::seconds(local_deletion_time_));
|
||
|
auto gc_grace_period = std::chrono::seconds(gc_grace_period_in_seconds);
|
||
|
return local_deleted_at + gc_grace_period < std::chrono::system_clock::now();
|
||
|
}
|
||
|
|
||
|
std::shared_ptr<Tombstone> Tombstone::Deserialize(const char* src,
|
||
|
std::size_t offset) {
|
||
|
int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
|
||
|
offset += sizeof(mask);
|
||
|
int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
|
||
|
offset += sizeof(index);
|
||
|
int32_t local_deletion_time =
|
||
|
ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
|
||
|
offset += sizeof(int32_t);
|
||
|
int64_t marked_for_delete_at =
|
||
|
ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
|
||
|
return std::make_shared<Tombstone>(mask, index, local_deletion_time,
|
||
|
marked_for_delete_at);
|
||
|
}
|
||
|
|
||
|
// Builds a row that carries only row-level deletion information: it has no
// columns and a last-modified time of 0.
RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at)
    : local_deletion_time_(local_deletion_time),
      marked_for_delete_at_(marked_for_delete_at),
      columns_(),
      last_modified_time_(0) {}
|
||
|
|
||
|
// Builds a row out of columns. Both deletion fields are set to their default
// sentinels, so IsTombstone() is false for such a row. `columns` is taken by
// value and moved into the member.
RowValue::RowValue(Columns columns, int64_t last_modified_time)
    : local_deletion_time_(kDefaultLocalDeletionTime),
      marked_for_delete_at_(kDefaultMarkedForDeleteAt),
      columns_(std::move(columns)),
      last_modified_time_(last_modified_time) {}
|
||
|
|
||
|
std::size_t RowValue::Size() const {
|
||
|
std::size_t size =
|
||
|
sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_);
|
||
|
for (const auto& column : columns_) {
|
||
|
size += column->Size();
|
||
|
}
|
||
|
return size;
|
||
|
}
|
||
|
|
||
|
int64_t RowValue::LastModifiedTime() const {
|
||
|
if (IsTombstone()) {
|
||
|
return marked_for_delete_at_;
|
||
|
} else {
|
||
|
return last_modified_time_;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
bool RowValue::IsTombstone() const {
|
||
|
return marked_for_delete_at_ > kDefaultMarkedForDeleteAt;
|
||
|
}
|
||
|
|
||
|
void RowValue::Serialize(std::string* dest) const {
|
||
|
ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
|
||
|
ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
|
||
|
for (const auto& column : columns_) {
|
||
|
column->Serialize(dest);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Returns a copy of this row without the expiring columns whose TTL has
// elapsed. `*changed` is set to true when at least one column was dropped
// and to false otherwise. Only columns whose mask equals EXPIRATION_MASK are
// examined.
RowValue RowValue::RemoveExpiredColumns(bool* changed) const {
  *changed = false;

  Columns kept;
  for (const auto& column : columns_) {
    const bool is_expiring =
        column->Mask() == ColumnTypeMask::EXPIRATION_MASK;
    if (is_expiring &&
        std::static_pointer_cast<ExpiringColumn>(column)->Expired()) {
      *changed = true;
      continue;
    }
    kept.push_back(column);
  }

  return RowValue(std::move(kept), last_modified_time_);
}
|
||
|
|
||
|
// Returns a copy of this row in which each expired expiring column is
// replaced, in place, by the tombstone from ToTombstone(). `*changed` is set
// to true when at least one column was converted and to false otherwise.
// Only columns whose mask equals EXPIRATION_MASK are examined.
RowValue RowValue::ConvertExpiredColumnsToTombstones(bool* changed) const {
  *changed = false;

  Columns converted;
  for (const auto& column : columns_) {
    const bool is_expiring =
        column->Mask() == ColumnTypeMask::EXPIRATION_MASK;
    if (is_expiring) {
      const std::shared_ptr<ExpiringColumn> expiring =
          std::static_pointer_cast<ExpiringColumn>(column);
      if (expiring->Expired()) {
        converted.push_back(expiring->ToTombstone());
        *changed = true;
        continue;
      }
    }
    converted.push_back(column);
  }

  return RowValue(std::move(converted), last_modified_time_);
}
|
||
|
|
||
|
// Returns a copy of this row without the column tombstones that are
// collectable under `gc_grace_period` (passed to Tombstone::Collectable).
// Only columns whose mask equals DELETION_MASK are examined.
RowValue RowValue::RemoveTombstones(int32_t gc_grace_period) const {
  Columns kept;
  for (const auto& column : columns_) {
    const bool is_tombstone = column->Mask() == ColumnTypeMask::DELETION_MASK;
    if (is_tombstone &&
        std::static_pointer_cast<Tombstone>(column)->Collectable(
            gc_grace_period)) {
      continue;
    }
    kept.push_back(column);
  }

  return RowValue(std::move(kept), last_modified_time_);
}
|
||
|
|
||
|
bool RowValue::Empty() const { return columns_.empty(); }
|
||
|
|
||
|
// Decodes a row from the first `size` bytes of `src`.
//
// The buffer starts with an int32 local deletion time and an int64
// marked-for-delete timestamp. If nothing follows, the result is a row built
// from those two fields alone. Otherwise the remaining bytes are decoded as
// consecutive columns, and the row's last-modified time is the largest column
// timestamp (never less than 0).
//
// Buffer bounds and the expected default deletion fields are verified with
// assert only.
RowValue RowValue::Deserialize(const char* src, std::size_t size) {
  namespace wire = ROCKSDB_NAMESPACE::cassandra;

  std::size_t cursor = 0;
  assert(size >= sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_));

  const int32_t local_deletion_time = wire::Deserialize<int32_t>(src, cursor);
  cursor += sizeof(int32_t);
  const int64_t marked_for_delete_at =
      wire::Deserialize<int64_t>(src, cursor);
  cursor += sizeof(int64_t);

  // Nothing after the deletion fields: the row has no columns.
  if (cursor == size) {
    return RowValue(local_deletion_time, marked_for_delete_at);
  }

  // A row that has columns is expected to carry the default deletion fields.
  assert(local_deletion_time == kDefaultLocalDeletionTime);
  assert(marked_for_delete_at == kDefaultMarkedForDeleteAt);

  Columns columns;
  int64_t newest_timestamp = 0;
  while (cursor < size) {
    auto column = ColumnBase::Deserialize(src, cursor);
    cursor += column->Size();
    assert(cursor <= size);

    const int64_t column_timestamp = column->Timestamp();
    if (newest_timestamp < column_timestamp) {
      newest_timestamp = column_timestamp;
    }
    columns.push_back(std::move(column));
  }

  return RowValue(std::move(columns), newest_timestamp);
}
|
||
|
|
||
|
// Merge multiple row values into one.
|
||
|
// For each column in rows with same index, we pick the one with latest
|
||
|
// timestamp. And we also take row tombstone into consideration, by iterating
|
||
|
// each row from reverse timestamp order, and stop once we hit the first
|
||
|
// row tombstone.
|
||
|
RowValue RowValue::Merge(std::vector<RowValue>&& values) {
|
||
|
assert(values.size() > 0);
|
||
|
if (values.size() == 1) {
|
||
|
return std::move(values[0]);
|
||
|
}
|
||
|
|
||
|
// Merge columns by their last modified time, and skip once we hit
|
||
|
// a row tombstone.
|
||
|
std::sort(values.begin(), values.end(),
|
||
|
[](const RowValue& r1, const RowValue& r2) {
|
||
|
return r1.LastModifiedTime() > r2.LastModifiedTime();
|
||
|
});
|
||
|
|
||
|
std::map<int8_t, std::shared_ptr<ColumnBase>> merged_columns;
|
||
|
int64_t tombstone_timestamp = 0;
|
||
|
|
||
|
for (auto& value : values) {
|
||
|
if (value.IsTombstone()) {
|
||
|
if (merged_columns.size() == 0) {
|
||
|
return std::move(value);
|
||
|
}
|
||
|
tombstone_timestamp = value.LastModifiedTime();
|
||
|
break;
|
||
|
}
|
||
|
for (auto& column : value.columns_) {
|
||
|
int8_t index = column->Index();
|
||
|
if (merged_columns.find(index) == merged_columns.end()) {
|
||
|
merged_columns[index] = column;
|
||
|
} else {
|
||
|
if (column->Timestamp() > merged_columns[index]->Timestamp()) {
|
||
|
merged_columns[index] = column;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
int64_t last_modified_time = 0;
|
||
|
Columns columns;
|
||
|
for (auto& pair : merged_columns) {
|
||
|
// For some row, its last_modified_time > row tombstone_timestamp, but
|
||
|
// it might have rows whose timestamp is ealier than tombstone, so we
|
||
|
// ned to filter these rows.
|
||
|
if (pair.second->Timestamp() <= tombstone_timestamp) {
|
||
|
continue;
|
||
|
}
|
||
|
last_modified_time = std::max(last_modified_time, pair.second->Timestamp());
|
||
|
columns.push_back(std::move(pair.second));
|
||
|
}
|
||
|
return RowValue(std::move(columns), last_modified_time);
|
||
|
}
|
||
|
|
||
|
} // namespace cassandra
|
||
|
} // namespace ROCKSDB_NAMESPACE
|