Prevent concurrent multiple opens of leveldb database.

Summary:
The fcntl call cannot detect lock conflicts when invoked multiple times
from the same thread.
Use a static lockedFile Set to record the paths that are locked.
A lockfile request checks to see if htis filename already exists in
lockedFiles, if so, then it triggers an error. Otherwise, it inserts
the filename in the lockedFiles Set.
A unlock file request verifies that the filename is in the lockedFiles
set and removes it from lockedFiles set.

Test Plan: unit test attached

Reviewers: heyongqiang

Reviewed By: heyongqiang

Differential Revision: https://reviews.facebook.net/D4755
main
Dhruba Borthakur 13 years ago
parent deb1a1fa9b
commit e56b2c5a31
  1. 7
      Makefile
  2. 11
      db/db_test.cc
  3. 42
      util/env_posix.cc
  4. 57
      util/filelock_test.cc

@ -52,7 +52,8 @@ TESTS = \
table_test \
version_edit_test \
version_set_test \
write_batch_test
write_batch_test \
filelock_test
TOOLS = \
manifest_dump
@ -174,6 +175,10 @@ leveldb_server_test: thrift/test/simpletest.o $(LIBRARY)
manifest_dump: tools/manifest_dump.o $(LIBOBJECTS)
$(CXX) tools/manifest_dump.o $(LIBOBJECTS) -o $@ $(LDFLAGS)
filelock_test: util/filelock_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) util/filelock_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LDFLAGS)
ifeq ($(PLATFORM), IOS)
# For iOS, create universal object files to be used on both the simulator and
# a device.

@ -213,6 +213,10 @@ class DBTest {
ASSERT_OK(TryReopen(options));
}
Status PureReopen(Options* options, DB** db) {
return DB::Open(*options, dbname_, db);
}
Status TryReopen(Options* options) {
delete db_;
db_ = NULL;
@ -828,6 +832,13 @@ TEST(DBTest, WAL) {
ASSERT_EQ("v3", Get("foo"));
}
TEST(DBTest, CheckLock) {
DB* localdb;
Options options = CurrentOptions();
ASSERT_TRUE(TryReopen(&options).ok());
ASSERT_TRUE(!(PureReopen(&options, &localdb).ok())); // second open should fail
}
TEST(DBTest, FLUSH) {
Options options = CurrentOptions();
WriteOptions writeOpt = WriteOptions();

@ -3,6 +3,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <deque>
#include <set>
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
@ -31,6 +32,10 @@ namespace leveldb {
namespace {
// list of pathnames that are locked
static std::set<std::string> lockedFiles;
static port::Mutex mutex_lockedFiles;
static Status IOError(const std::string& context, int err_number) {
return Status::IOError(context, strerror(err_number));
}
@ -291,7 +296,28 @@ class PosixMmapFile : public WritableFile {
}
};
static int LockOrUnlock(int fd, bool lock) {
static int LockOrUnlock(const std::string& fname, int fd, bool lock) {
mutex_lockedFiles.Lock();
if (lock) {
// If it already exists in the lockedFiles set, then it is already locked,
// and fail this lock attempt. Otherwise, insert it into lockedFiles.
// This check is needed because fcntl() does not detect lock conflict
// if the fcntl is issued by the same thread that earlier acquired
// this lock.
if (lockedFiles.insert(fname).second == false) {
mutex_lockedFiles.Unlock();
errno = ENOLCK;
return -1;
}
} else {
// If we are unlocking, then verify that we had locked it earlier,
// it should already exist in lockedFiles. Remove it from lockedFiles.
if (lockedFiles.erase(fname) != 1) {
mutex_lockedFiles.Unlock();
errno = ENOLCK;
return -1;
}
}
errno = 0;
struct flock f;
memset(&f, 0, sizeof(f));
@ -299,12 +325,19 @@ static int LockOrUnlock(int fd, bool lock) {
f.l_whence = SEEK_SET;
f.l_start = 0;
f.l_len = 0; // Lock/unlock entire file
return fcntl(fd, F_SETLK, &f);
int value = fcntl(fd, F_SETLK, &f);
if (value == -1 && lock) {
// if there is an error in locking, then remove the pathname from lockedfiles
lockedFiles.erase(fname);
}
mutex_lockedFiles.Unlock();
return value;
}
class PosixFileLock : public FileLock {
public:
int fd_;
std::string filename;
};
class PosixEnv : public Env {
@ -435,12 +468,13 @@ class PosixEnv : public Env {
int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
if (fd < 0) {
result = IOError(fname, errno);
} else if (LockOrUnlock(fd, true) == -1) {
} else if (LockOrUnlock(fname, fd, true) == -1) {
result = IOError("lock " + fname, errno);
close(fd);
} else {
PosixFileLock* my_lock = new PosixFileLock;
my_lock->fd_ = fd;
my_lock->filename = fname;
*lock = my_lock;
}
return result;
@ -449,7 +483,7 @@ class PosixEnv : public Env {
virtual Status UnlockFile(FileLock* lock) {
PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
Status result;
if (LockOrUnlock(my_lock->fd_, false) == -1) {
if (LockOrUnlock(my_lock->filename, my_lock->fd_, false) == -1) {
result = IOError("unlock", errno);
}
close(my_lock->fd_);

@ -0,0 +1,57 @@
// Copyright (c) 2012 Facebook. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "leveldb/status.h"
#include "leveldb/env.h"
#include <vector>
#include "util/coding.h"
#include "util/testharness.h"
namespace leveldb {
class LockTest {
public:
static LockTest* current_;
std::string file_;
leveldb::Env* env_;
LockTest() : file_(test::TmpDir() + "/db_testlock_file"),
env_(leveldb::Env::Default()) {
current_ = this;
}
~LockTest() {
}
Status LockFile(FileLock** db_lock) {
return env_->LockFile(file_, db_lock);
}
Status UnlockFile(FileLock* db_lock) {
return env_->UnlockFile(db_lock);
}
};
LockTest* LockTest::current_;
TEST(LockTest, LockBySameThread) {
FileLock* lock1;
FileLock* lock2;
// acquire a lock on a file
ASSERT_OK(LockFile(&lock1));
// re-acquire the lock on the same file. This should fail.
ASSERT_TRUE(LockFile(&lock2).IsIOError());
// release the lock
ASSERT_OK(UnlockFile(lock1));
}
} // namespace leveldb
int main(int argc, char** argv) {
return leveldb::test::RunAllTests();
}
Loading…
Cancel
Save