Add concurrency to compacting SpatialDB

Summary: This will speed up our import times

Test Plan: Added simple unit test just to get code coverage

Reviewers: sdong, ljin, yhchiang, rven, mohaps

Reviewed By: mohaps

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D28869
main
Igor Canadi 10 years ago
parent 3c92e52338
commit cd0980150b
  1. 4
      include/rocksdb/utilities/spatial_db.h
  2. 61
      utilities/spatialdb/spatial_db.cc
  3. 5
      utilities/spatialdb/spatial_db_test.cc

@ -222,7 +222,9 @@ class SpatialDB : public StackableDB {
// Calling Compact() after inserting a bunch of elements should speed up
// reading. This is especially useful if you use SpatialDBOptions::bulk_load
virtual Status Compact() = 0;
// Num threads determines how many threads we'll use for compactions. Setting
// this to bigger number will use more IO and CPU, but finish faster
virtual Status Compact(int num_threads = 1) = 0;
// Query the specified spatial_index. Query will return all elements that
// intersect bbox, but it may also return some extra elements.

@ -11,10 +11,13 @@
#define __STDC_FORMAT_MACROS
#endif
#include <algorithm>
#include <condition_variable>
#include <inttypes.h>
#include <string>
#include <vector>
#include <algorithm>
#include <mutex>
#include <thread>
#include <set>
#include <unordered_set>
@ -561,27 +564,49 @@ class SpatialDBImpl : public SpatialDB {
return Write(write_options, &batch);
}
virtual Status Compact() override {
// TODO(icanadi) maybe do this in parallel?
Status s, t;
virtual Status Compact(int num_threads) override {
std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(data_column_family_);
for (auto& iter : name_to_index_) {
t = Flush(FlushOptions(), iter.second.column_family);
if (!t.ok()) {
s = t;
}
t = CompactRange(iter.second.column_family, nullptr, nullptr);
if (!t.ok()) {
s = t;
}
column_families.push_back(iter.second.column_family);
}
t = Flush(FlushOptions(), data_column_family_);
if (!t.ok()) {
s = t;
std::mutex state_mutex;
std::condition_variable cv;
Status s;
int threads_running = 0;
std::vector<std::thread> threads;
for (auto cfh : column_families) {
threads.emplace_back([&, cfh] {
{
std::unique_lock<std::mutex> lk(state_mutex);
cv.wait(lk, [&] { return threads_running < num_threads; });
threads_running++;
}
Status t = Flush(FlushOptions(), cfh);
if (t.ok()) {
t = CompactRange(cfh, nullptr, nullptr);
}
{
std::unique_lock<std::mutex> lk(state_mutex);
threads_running--;
if (s.ok() && !t.ok()) {
s = t;
}
cv.notify_one();
}
});
}
t = CompactRange(data_column_family_, nullptr, nullptr);
if (!t.ok()) {
s = t;
for (auto& t : threads) {
t.join();
}
return s;
}

@ -245,7 +245,10 @@ TEST(SpatialDBTest, RandomizedTest) {
elements.push_back(make_pair(blob, bbox));
}
db_->Compact();
// parallel
db_->Compact(2);
// serial
db_->Compact(1);
for (int i = 0; i < 1000; ++i) {
BoundingBox<int> int_bbox = RandomBoundingBox(128, &rnd, 10);

Loading…
Cancel
Save