You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

368 lines
13 KiB

// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "format.h"
#include <algorithm>
#include <map>
#include <memory>
#include "utilities/cassandra/serialize.h"
namespace ROCKSDB_NAMESPACE {
namespace cassandra {
namespace {
const int32_t kDefaultLocalDeletionTime = std::numeric_limits<int32_t>::max();
const int64_t kDefaultMarkedForDeleteAt = std::numeric_limits<int64_t>::min();
} // namespace
ColumnBase::ColumnBase(int8_t mask, int8_t index)
: mask_(mask), index_(index) {}
std::size_t ColumnBase::Size() const { return sizeof(mask_) + sizeof(index_); }
int8_t ColumnBase::Mask() const { return mask_; }
int8_t ColumnBase::Index() const { return index_; }
void ColumnBase::Serialize(std::string* dest) const {
ROCKSDB_NAMESPACE::cassandra::Serialize<int8_t>(mask_, dest);
ROCKSDB_NAMESPACE::cassandra::Serialize<int8_t>(index_, dest);
}
std::shared_ptr<ColumnBase> ColumnBase::Deserialize(const char* src,
std::size_t offset) {
int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
if ((mask & ColumnTypeMask::DELETION_MASK) != 0) {
return Tombstone::Deserialize(src, offset);
} else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) {
return ExpiringColumn::Deserialize(src, offset);
} else {
return Column::Deserialize(src, offset);
}
}
Column::Column(int8_t mask, int8_t index, int64_t timestamp, int32_t value_size,
const char* value)
: ColumnBase(mask, index),
timestamp_(timestamp),
value_size_(value_size),
value_(value) {}
int64_t Column::Timestamp() const { return timestamp_; }
std::size_t Column::Size() const {
return ColumnBase::Size() + sizeof(timestamp_) + sizeof(value_size_) +
value_size_;
}
void Column::Serialize(std::string* dest) const {
ColumnBase::Serialize(dest);
ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(timestamp_, dest);
ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(value_size_, dest);
dest->append(value_, value_size_);
}
std::shared_ptr<Column> Column::Deserialize(const char* src,
std::size_t offset) {
int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(mask);
int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(index);
int64_t timestamp =
ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
offset += sizeof(timestamp);
int32_t value_size =
ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
offset += sizeof(value_size);
return std::make_shared<Column>(mask, index, timestamp, value_size,
src + offset);
}
ExpiringColumn::ExpiringColumn(int8_t mask, int8_t index, int64_t timestamp,
int32_t value_size, const char* value,
int32_t ttl)
: Column(mask, index, timestamp, value_size, value), ttl_(ttl) {}
std::size_t ExpiringColumn::Size() const {
return Column::Size() + sizeof(ttl_);
}
void ExpiringColumn::Serialize(std::string* dest) const {
Column::Serialize(dest);
ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(ttl_, dest);
}
std::chrono::time_point<std::chrono::system_clock> ExpiringColumn::TimePoint()
const {
return std::chrono::time_point<std::chrono::system_clock>(
std::chrono::microseconds(Timestamp()));
}
std::chrono::seconds ExpiringColumn::Ttl() const {
return std::chrono::seconds(ttl_);
}
bool ExpiringColumn::Expired() const {
return TimePoint() + Ttl() < std::chrono::system_clock::now();
}
std::shared_ptr<Tombstone> ExpiringColumn::ToTombstone() const {
auto expired_at = (TimePoint() + Ttl()).time_since_epoch();
int32_t local_deletion_time = static_cast<int32_t>(
std::chrono::duration_cast<std::chrono::seconds>(expired_at).count());
int64_t marked_for_delete_at =
std::chrono::duration_cast<std::chrono::microseconds>(expired_at).count();
return std::make_shared<Tombstone>(
static_cast<int8_t>(ColumnTypeMask::DELETION_MASK), Index(),
local_deletion_time, marked_for_delete_at);
}
std::shared_ptr<ExpiringColumn> ExpiringColumn::Deserialize(
const char* src, std::size_t offset) {
int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(mask);
int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(index);
int64_t timestamp =
ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
offset += sizeof(timestamp);
int32_t value_size =
ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
offset += sizeof(value_size);
const char* value = src + offset;
offset += value_size;
int32_t ttl = ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
return std::make_shared<ExpiringColumn>(mask, index, timestamp, value_size,
value, ttl);
}
Tombstone::Tombstone(int8_t mask, int8_t index, int32_t local_deletion_time,
int64_t marked_for_delete_at)
: ColumnBase(mask, index),
local_deletion_time_(local_deletion_time),
marked_for_delete_at_(marked_for_delete_at) {}
int64_t Tombstone::Timestamp() const { return marked_for_delete_at_; }
std::size_t Tombstone::Size() const {
return ColumnBase::Size() + sizeof(local_deletion_time_) +
sizeof(marked_for_delete_at_);
}
void Tombstone::Serialize(std::string* dest) const {
ColumnBase::Serialize(dest);
ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
}
bool Tombstone::Collectable(int32_t gc_grace_period_in_seconds) const {
auto local_deleted_at = std::chrono::time_point<std::chrono::system_clock>(
std::chrono::seconds(local_deletion_time_));
auto gc_grace_period = std::chrono::seconds(gc_grace_period_in_seconds);
return local_deleted_at + gc_grace_period < std::chrono::system_clock::now();
}
std::shared_ptr<Tombstone> Tombstone::Deserialize(const char* src,
std::size_t offset) {
int8_t mask = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(mask);
int8_t index = ROCKSDB_NAMESPACE::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(index);
int32_t local_deletion_time =
ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
offset += sizeof(int32_t);
int64_t marked_for_delete_at =
ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
return std::make_shared<Tombstone>(mask, index, local_deletion_time,
marked_for_delete_at);
}
RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at)
: local_deletion_time_(local_deletion_time),
marked_for_delete_at_(marked_for_delete_at),
columns_(),
last_modified_time_(0) {}
RowValue::RowValue(Columns columns, int64_t last_modified_time)
: local_deletion_time_(kDefaultLocalDeletionTime),
marked_for_delete_at_(kDefaultMarkedForDeleteAt),
columns_(std::move(columns)),
last_modified_time_(last_modified_time) {}
std::size_t RowValue::Size() const {
std::size_t size =
sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_);
for (const auto& column : columns_) {
size += column->Size();
}
return size;
}
int64_t RowValue::LastModifiedTime() const {
if (IsTombstone()) {
return marked_for_delete_at_;
} else {
return last_modified_time_;
}
}
bool RowValue::IsTombstone() const {
return marked_for_delete_at_ > kDefaultMarkedForDeleteAt;
}
void RowValue::Serialize(std::string* dest) const {
ROCKSDB_NAMESPACE::cassandra::Serialize<int32_t>(local_deletion_time_, dest);
ROCKSDB_NAMESPACE::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
for (const auto& column : columns_) {
column->Serialize(dest);
}
}
RowValue RowValue::RemoveExpiredColumns(bool* changed) const {
*changed = false;
Columns new_columns;
for (auto& column : columns_) {
if (column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
std::shared_ptr<ExpiringColumn> expiring_column =
std::static_pointer_cast<ExpiringColumn>(column);
if (expiring_column->Expired()) {
*changed = true;
continue;
}
}
new_columns.push_back(column);
}
return RowValue(std::move(new_columns), last_modified_time_);
}
RowValue RowValue::ConvertExpiredColumnsToTombstones(bool* changed) const {
*changed = false;
Columns new_columns;
for (auto& column : columns_) {
if (column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
std::shared_ptr<ExpiringColumn> expiring_column =
std::static_pointer_cast<ExpiringColumn>(column);
if (expiring_column->Expired()) {
std::shared_ptr<Tombstone> tombstone = expiring_column->ToTombstone();
new_columns.push_back(tombstone);
*changed = true;
continue;
}
}
new_columns.push_back(column);
}
return RowValue(std::move(new_columns), last_modified_time_);
}
RowValue RowValue::RemoveTombstones(int32_t gc_grace_period) const {
Columns new_columns;
for (auto& column : columns_) {
if (column->Mask() == ColumnTypeMask::DELETION_MASK) {
std::shared_ptr<Tombstone> tombstone =
std::static_pointer_cast<Tombstone>(column);
if (tombstone->Collectable(gc_grace_period)) {
continue;
}
}
new_columns.push_back(column);
}
return RowValue(std::move(new_columns), last_modified_time_);
}
bool RowValue::Empty() const { return columns_.empty(); }
RowValue RowValue::Deserialize(const char* src, std::size_t size) {
std::size_t offset = 0;
assert(size >= sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_));
int32_t local_deletion_time =
ROCKSDB_NAMESPACE::cassandra::Deserialize<int32_t>(src, offset);
offset += sizeof(int32_t);
int64_t marked_for_delete_at =
ROCKSDB_NAMESPACE::cassandra::Deserialize<int64_t>(src, offset);
offset += sizeof(int64_t);
if (offset == size) {
return RowValue(local_deletion_time, marked_for_delete_at);
}
assert(local_deletion_time == kDefaultLocalDeletionTime);
assert(marked_for_delete_at == kDefaultMarkedForDeleteAt);
Columns columns;
int64_t last_modified_time = 0;
while (offset < size) {
auto c = ColumnBase::Deserialize(src, offset);
offset += c->Size();
assert(offset <= size);
last_modified_time = std::max(last_modified_time, c->Timestamp());
columns.push_back(std::move(c));
}
return RowValue(std::move(columns), last_modified_time);
}
// Merge multiple row values into one.
// For each column in rows with same index, we pick the one with latest
// timestamp. And we also take row tombstone into consideration, by iterating
// each row from reverse timestamp order, and stop once we hit the first
// row tombstone.
RowValue RowValue::Merge(std::vector<RowValue>&& values) {
assert(values.size() > 0);
if (values.size() == 1) {
return std::move(values[0]);
}
// Merge columns by their last modified time, and skip once we hit
// a row tombstone.
std::sort(values.begin(), values.end(),
[](const RowValue& r1, const RowValue& r2) {
return r1.LastModifiedTime() > r2.LastModifiedTime();
});
std::map<int8_t, std::shared_ptr<ColumnBase>> merged_columns;
int64_t tombstone_timestamp = 0;
for (auto& value : values) {
if (value.IsTombstone()) {
if (merged_columns.size() == 0) {
return std::move(value);
}
tombstone_timestamp = value.LastModifiedTime();
break;
}
for (auto& column : value.columns_) {
int8_t index = column->Index();
if (merged_columns.find(index) == merged_columns.end()) {
merged_columns[index] = column;
} else {
if (column->Timestamp() > merged_columns[index]->Timestamp()) {
merged_columns[index] = column;
}
}
}
}
int64_t last_modified_time = 0;
Columns columns;
for (auto& pair : merged_columns) {
// For some row, its last_modified_time > row tombstone_timestamp, but
// it might have rows whose timestamp is ealier than tombstone, so we
// ned to filter these rows.
if (pair.second->Timestamp() <= tombstone_timestamp) {
continue;
}
last_modified_time = std::max(last_modified_time, pair.second->Timestamp());
columns.push_back(std::move(pair.second));
}
return RowValue(std::move(columns), last_modified_time);
}
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE