diff --git a/java/Makefile b/java/Makefile
index 26fa38d05..c8f443f7b 100644
--- a/java/Makefile
+++ b/java/Makefile
@@ -36,6 +36,8 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\
 	org.rocksdb.test.WriteBatchInternal\
 	org.rocksdb.test.WriteBatchTest\
         org.rocksdb.WriteOptions\
+	org.rocksdb.WriteBatchWithIndex\
+	org.rocksdb.WBWIRocksIterator
 
 ROCKSDB_MAJOR = $(shell egrep "ROCKSDB_MAJOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3)
 ROCKSDB_MINOR = $(shell egrep "ROCKSDB_MINOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3)
@@ -81,6 +83,7 @@ JAVA_TESTS = org.rocksdb.test.BackupableDBOptionsTest\
 		org.rocksdb.test.WriteBatchHandlerTest\
 		org.rocksdb.test.WriteBatchTest\
 		org.rocksdb.test.WriteOptionsTest\
+		org.rocksdb.test.WriteBatchWithIndexTest
 
 JAVA_TEST_LIBDIR = ./test-libs/
 JAVA_JUNIT_JAR = $(JAVA_TEST_LIBDIR)junit-4.12-beta-2.jar
diff --git a/java/org/rocksdb/AbstractRocksIterator.java b/java/org/rocksdb/AbstractRocksIterator.java
new file mode 100644
index 000000000..cc7cf064f
--- /dev/null
+++ b/java/org/rocksdb/AbstractRocksIterator.java
@@ -0,0 +1,105 @@
+// Copyright (c) 2014, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+package org.rocksdb;
+
+/**
+ * Base class implementation for Rocks Iterators
+ * in the Java API.
+ *
+ * <p>Multiple threads can invoke const methods on a RocksIterator without
+ * external synchronization, but if any of the threads may call a
+ * non-const method, all threads accessing the same RocksIterator must use
+ * external synchronization.</p>
+ *
+ * @param <P> The type of the parent object from which the Rocks Iterator was
+ *            created. This is used by disposeInternal to avoid double-free
+ *            issues with the underlying C++ object.
+ * @see org.rocksdb.RocksObject
+ */
+public abstract class AbstractRocksIterator<P extends RocksObject>
+    extends RocksObject implements RocksIteratorInterface {
+  final P parent_;
+
+  protected AbstractRocksIterator(P parent, long nativeHandle) {
+    super();
+    nativeHandle_ = nativeHandle;
+    // parent must point to a valid RocksDB instance.
+    assert (parent != null);
+    // RocksIterator must hold a reference to the related parent instance
+    // to guarantee that, when a GC cycle starts, RocksIterator instances
+    // are freed before their parent instances.
+    parent_ = parent;
+  }
+
+  @Override
+  public boolean isValid() {
+    assert (isInitialized());
+    return isValid0(nativeHandle_);
+  }
+
+  @Override
+  public void seekToFirst() {
+    assert (isInitialized());
+    seekToFirst0(nativeHandle_);
+  }
+
+  @Override
+  public void seekToLast() {
+    assert (isInitialized());
+    seekToLast0(nativeHandle_);
+  }
+
+  @Override
+  public void seek(byte[] target) {
+    assert (isInitialized());
+    seek0(nativeHandle_, target, target.length);
+  }
+
+  @Override
+  public void next() {
+    assert (isInitialized());
+    next0(nativeHandle_);
+  }
+
+  @Override
+  public void prev() {
+    assert (isInitialized());
+    prev0(nativeHandle_);
+  }
+
+  @Override
+  public void status() throws RocksDBException {
+    assert (isInitialized());
+    status0(nativeHandle_);
+  }
+
+  /**
+   * <p>Deletes the underlying C++ iterator pointer.</p>
+   *
+   * <p>Note: the underlying handle can only be safely deleted if the parent
+   * instance related to a certain RocksIterator is still valid and initialized.
+   * Therefore {@code disposeInternal()} checks if the parent is initialized
+   * before freeing the native handle.</p>
+   */
+  @Override
+  protected void disposeInternal() {
+    synchronized (parent_) {
+      assert (isInitialized());
+      if (parent_.isInitialized()) {
+        disposeInternal(nativeHandle_);
+      }
+    }
+  }
+
+  abstract void disposeInternal(long handle);
+  abstract boolean isValid0(long handle);
+  abstract void seekToFirst0(long handle);
+  abstract void seekToLast0(long handle);
+  abstract void next0(long handle);
+  abstract void prev0(long handle);
+  abstract void seek0(long handle, byte[] target, int targetLen);
+  abstract void status0(long handle) throws RocksDBException;
+}
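The base class encodes a lifecycle contract: an iterator keeps a strong reference to its parent so the parent cannot be garbage collected first, and user code should likewise dispose the iterator before the parent. A minimal sketch of that ordering, assuming a database at a hypothetical path:

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public class IteratorLifecycleExample {
  public static void main(final String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    final Options options = new Options().setCreateIfMissing(true);
    final RocksDB db = RocksDB.open(options, "/tmp/rocksdb-example"); // hypothetical path
    final RocksIterator it = db.newIterator();
    try {
      for (it.seekToFirst(); it.isValid(); it.next()) {
        System.out.println(new String(it.key()) + " -> " + new String(it.value()));
      }
      it.status(); // surfaces any deferred iteration error as a RocksDBException
    } finally {
      it.dispose(); // free the C++ iterator first...
      db.close();   // ...then its parent; disposeInternal() guards this ordering
      options.dispose();
    }
  }
}
```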
diff --git a/java/org/rocksdb/AbstractWriteBatch.java b/java/org/rocksdb/AbstractWriteBatch.java
new file mode 100644
index 000000000..b380c5d8a
--- /dev/null
+++ b/java/org/rocksdb/AbstractWriteBatch.java
@@ -0,0 +1,92 @@
+// Copyright (c) 2014, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+package org.rocksdb;
+
+public abstract class AbstractWriteBatch extends RocksObject implements WriteBatchInterface {
+
+  @Override
+  public int count() {
+    assert (isInitialized());
+    return count0();
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) {
+    assert (isInitialized());
+    put(key, key.length, value, value.length);
+  }
+
+  @Override
+  public void put(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) {
+    assert (isInitialized());
+    put(key, key.length, value, value.length, columnFamilyHandle.nativeHandle_);
+  }
+
+  @Override
+  public void merge(byte[] key, byte[] value) {
+    assert (isInitialized());
+    merge(key, key.length, value, value.length);
+  }
+
+  @Override
+  public void merge(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) {
+    assert (isInitialized());
+    merge(key, key.length, value, value.length, columnFamilyHandle.nativeHandle_);
+  }
+
+  @Override
+  public void remove(byte[] key) {
+    assert (isInitialized());
+    remove(key, key.length);
+  }
+
+  @Override
+  public void remove(ColumnFamilyHandle columnFamilyHandle, byte[] key) {
+    assert (isInitialized());
+    remove(key, key.length, columnFamilyHandle.nativeHandle_);
+  }
+
+  @Override
+  public void putLogData(byte[] blob) {
+    assert (isInitialized());
+    putLogData(blob, blob.length);
+  }
+
+  @Override
+  public void clear() {
+    assert (isInitialized());
+    clear0();
+  }
+
+  /**
+   * Delete the c++ side pointer.
+   */
+  @Override
+  protected void disposeInternal() {
+    assert (isInitialized());
+    disposeInternal(nativeHandle_);
+  }
+
+  abstract void disposeInternal(long handle);
+
+  abstract int count0();
+
+  abstract void put(byte[] key, int keyLen, byte[] value, int valueLen);
+
+  abstract void put(byte[] key, int keyLen, byte[] value, int valueLen, long cfHandle);
+
+  abstract void merge(byte[] key, int keyLen, byte[] value, int valueLen);
+
+  abstract void merge(byte[] key, int keyLen, byte[] value, int valueLen, long cfHandle);
+
+  abstract void remove(byte[] key, int keyLen);
+
+  abstract void remove(byte[] key, int keyLen, long cfHandle);
+
+  abstract void putLogData(byte[] blob, int blobLen);
+
+  abstract void clear0();
+}
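All of the overloads above simply length-prefix their byte array arguments and forward to the concrete subclass, so the same calls work identically on WriteBatch and WriteBatchWithIndex. A short sketch against WriteBatch, assuming an open `db` as in the previous example (the merge call additionally assumes a merge operator is configured on the database):

```java
// Sketch only: assumes `db` is an open RocksDB and a merge operator is set.
final WriteBatch batch = new WriteBatch();
final WriteOptions writeOpts = new WriteOptions();
try {
  batch.put("k1".getBytes(), "v1".getBytes());
  batch.merge("counter".getBytes(), "1".getBytes());
  batch.remove("stale".getBytes());
  batch.putLogData("replication-marker".getBytes()); // logged only, not persisted to SSTs
  assert batch.count() == 3; // putLogData does not increase the count
  db.write(writeOpts, batch); // applied atomically
} finally {
  writeOpts.dispose();
  batch.dispose();
}
```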
diff --git a/java/org/rocksdb/DirectSlice.java b/java/org/rocksdb/DirectSlice.java
index c69b61460..765b01586 100644
--- a/java/org/rocksdb/DirectSlice.java
+++ b/java/org/rocksdb/DirectSlice.java
@@ -16,6 +16,9 @@ import java.nio.ByteBuffer;
  * values consider using @see org.rocksdb.Slice
  */
 public class DirectSlice extends AbstractSlice<ByteBuffer> {
+  //TODO(AR) only needed by WriteBatchWithIndexTest until JDK8
+  public final static DirectSlice NONE = new DirectSlice();
+
   /**
    * Called from JNI to construct a new Java DirectSlice
    * without an underlying C++ object set
@@ -24,12 +27,12 @@ public class DirectSlice extends AbstractSlice<ByteBuffer> {
    * Note: You should be aware that
    * {@see org.rocksdb.RocksObject#disOwnNativeHandle()} is intentionally
    * called from the default DirectSlice constructor, and that it is marked as
-   * private. This is so that developers cannot construct their own default
+   * package-private. This is so that developers cannot construct their own default
    * DirectSlice objects (at present). As developers cannot construct their own
    * DirectSlice objects through this, they are not creating underlying C++
    * DirectSlice objects, and so there is nothing to free (dispose) from Java.
    */
-  private DirectSlice() {
+  DirectSlice() {
     super();
     disOwnNativeHandle();
   }
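The new NONE constant gives tests a shared sentinel for entries that carry no value; for example, an expected DELETE entry can be written as:

```java
// Hypothetical fragment: DELETE entries have a key but no value.
new WBWIRocksIterator.WriteEntry(WBWIRocksIterator.WriteType.DELETE,
    new DirectSlice("key4"), DirectSlice.NONE);
```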
diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java
index 22a608207..089882532 100644
--- a/java/org/rocksdb/RocksDB.java
+++ b/java/org/rocksdb/RocksDB.java
@@ -539,7 +539,21 @@ public class RocksDB extends RocksObject {
    */
   public void write(WriteOptions writeOpts, WriteBatch updates)
       throws RocksDBException {
-    write(writeOpts.nativeHandle_, updates.nativeHandle_);
+    write0(writeOpts.nativeHandle_, updates.nativeHandle_);
+  }
+
+  /**
+   * Apply the specified updates to the database.
+   *
+   * @param writeOpts WriteOptions instance
+   * @param updates WriteBatchWithIndex instance
+   *
+   * @throws RocksDBException thrown if error happens in underlying
+   *    native library.
+   */
+  public void write(WriteOptions writeOpts, WriteBatchWithIndex updates)
+      throws RocksDBException {
+    write1(writeOpts.nativeHandle_, updates.nativeHandle_);
   }
 
   /**
@@ -1180,6 +1194,15 @@ public class RocksDB extends RocksObject {
     return iterators;
   }
 
+  /**
+   * Gets the handle for the default column family
+   *
+   * @return The handle of the default column family
+   */
+  public ColumnFamilyHandle getDefaultColumnFamily() {
+    return new ColumnFamilyHandle(this, getDefaultColumnFamily(nativeHandle_));
+  }
+
   /**
    * Creates a new column family with the name columnFamilyName and
    * allocates a ColumnFamilyHandle within an internal structure.
@@ -1538,8 +1561,10 @@ public class RocksDB extends RocksObject {
       long handle, long writeOptHandle,
       byte[] key, int keyLen,
       byte[] value, int valueLen, long cfHandle) throws RocksDBException;
-  protected native void write(
-      long writeOptHandle, long batchHandle) throws RocksDBException;
+  protected native void write0(
+      long writeOptHandle, long wbHandle) throws RocksDBException;
+  protected native void write1(
+      long writeOptHandle, long wbwiHandle) throws RocksDBException;
   protected native boolean keyMayExist(byte[] key, int keyLen,
       StringBuffer stringBuffer);
   protected native boolean keyMayExist(byte[] key, int keyLen,
@@ -1620,6 +1645,7 @@ public class RocksDB extends RocksObject {
   protected native void releaseSnapshot(
       long nativeHandle, long snapshotHandle);
   private native void disposeInternal(long handle);
+  private native long getDefaultColumnFamily(long handle);
   private native long createColumnFamily(long handle,
       ColumnFamilyDescriptor columnFamilyDescriptor) throws RocksDBException;
   private native void dropColumnFamily(long handle, long cfHandle) throws RocksDBException;
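The two additions to RocksDB pair naturally: getDefaultColumnFamily() supplies a handle without opening the database with explicit column family descriptors, and the new write() overload dispatches to the native write1(). A sketch, assuming an open `db`:

```java
// Sketch only: assumes `db` is an open RocksDB instance.
final WriteBatchWithIndex wbwi = new WriteBatchWithIndex();
final WriteOptions writeOpts = new WriteOptions();
try {
  final ColumnFamilyHandle defaultCf = db.getDefaultColumnFamily();
  wbwi.put(defaultCf, "k".getBytes(), "v".getBytes());
  db.write(writeOpts, wbwi); // routed through the native write1()
} finally {
  writeOpts.dispose();
  wbwi.dispose();
}
```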
diff --git a/java/org/rocksdb/RocksIterator.java b/java/org/rocksdb/RocksIterator.java
index 1abe7e704..bb9a6e697 100644
--- a/java/org/rocksdb/RocksIterator.java
+++ b/java/org/rocksdb/RocksIterator.java
@@ -6,9 +6,9 @@
 package org.rocksdb;
 
 /**
- * <p>An iterator yields a sequence of key/value pairs from a source.
- * The following class defines the interface. Multiple implementations
- * are provided by this library.  In particular, iterators are provided
+ * <p>An iterator that yields a sequence of key/value pairs from a source.
+ * Multiple implementations are provided by this library.
+ * In particular, iterators are provided
  * to access the contents of a Table or a DB.</p>
  *
  * <p>Multiple threads can invoke const methods on an RocksIterator without
@@ -18,67 +18,9 @@ package org.rocksdb;
  *
  * @see org.rocksdb.RocksObject
  */
-public class RocksIterator extends RocksObject {
-  public RocksIterator(RocksDB rocksDB, long nativeHandle) {
-    super();
-    nativeHandle_ = nativeHandle;
-    // rocksDB must point to a valid RocksDB instance.
-    assert(rocksDB != null);
-    // RocksIterator must hold a reference to the related RocksDB instance
-    // to guarantee that while a GC cycle starts RocksDBIterator instances
-    // are freed prior to RocksDB instances.
-    rocksDB_ = rocksDB;
-  }
-
-  /**
-   * An iterator is either positioned at a key/value pair, or
-   * not valid.  This method returns true iff the iterator is valid.
-   *
-   * @return true if iterator is valid.
-   */
-  public boolean isValid() {
-    assert(isInitialized());
-    return isValid0(nativeHandle_);
-  }
-
-  /**
-   * Position at the first key in the source.  The iterator is Valid()
-   * after this call iff the source is not empty.
-   */
-  public void seekToFirst() {
-    assert(isInitialized());
-    seekToFirst0(nativeHandle_);
-  }
-
-  /**
-   * Position at the last key in the source.  The iterator is
-   * valid after this call iff the source is not empty.
-   */
-  public void seekToLast() {
-    assert(isInitialized());
-    seekToLast0(nativeHandle_);
-  }
-
-  /**
-   * <p>Moves to the next entry in the source.  After this call, Valid() is
-   * true iff the iterator was not positioned at the last entry in the source.</p>
-   *
-   * <p>REQUIRES: {@link #isValid()}</p>
-   */
-  public void next() {
-    assert(isInitialized());
-    next0(nativeHandle_);
-  }
-
-  /**
-   * <p>Moves to the previous entry in the source.  After this call, Valid() is
-   * true iff the iterator was not positioned at the first entry in source.</p>
-   *
-   * <p>REQUIRES: {@link #isValid()}</p>
-   */
-  public void prev() {
-    assert(isInitialized());
-    prev0(nativeHandle_);
+public class RocksIterator extends AbstractRocksIterator<RocksDB> {
+  protected RocksIterator(RocksDB rocksDB, long nativeHandle) {
+    super(rocksDB, nativeHandle);
   }
 
   /**
@@ -108,59 +50,15 @@ public class RocksIterator extends RocksObject {
     return value0(nativeHandle_);
   }
 
-  /**
-   * <p>Position at the first key in the source that at or past target
-   * The iterator is valid after this call iff the source contains
-   * an entry that comes at or past target.</p>
-   *
-   * @param target byte array describing a key or a
-   *     key prefix to seek for.
-   */
-  public void seek(byte[] target) {
-    assert(isInitialized());
-    seek0(nativeHandle_, target, target.length);
-  }
+  @Override final native void disposeInternal(long handle);
+  @Override final native boolean isValid0(long handle);
+  @Override final native void seekToFirst0(long handle);
+  @Override final native void seekToLast0(long handle);
+  @Override final native void next0(long handle);
+  @Override final native void prev0(long handle);
+  @Override final native void seek0(long handle, byte[] target, int targetLen);
+  @Override final native void status0(long handle) throws RocksDBException;
 
-  /**
-   * If an error has occurred, return it.  Else return an ok status.
-   * If non-blocking IO is requested and this operation cannot be
-   * satisfied without doing some IO, then this returns Status::Incomplete().
-   *
-   * @throws RocksDBException thrown if error happens in underlying
-   *    native library.
-   */
-  public void status() throws RocksDBException {
-    assert(isInitialized());
-    status0(nativeHandle_);
-  }
-
-  /**
-   * <p>Deletes underlying C++ iterator pointer.</p>
-   *
-   * <p>Note: the underlying handle can only be safely deleted if the RocksDB
-   * instance related to a certain RocksIterator is still valid and initialized.
-   * Therefore {@code disposeInternal()} checks if the RocksDB is initialized
-   * before freeing the native handle.</p>
-   */
-  @Override protected void disposeInternal() {
-    synchronized (rocksDB_) {
-      assert (isInitialized());
-      if (rocksDB_.isInitialized()) {
-        disposeInternal(nativeHandle_);
-      }
-    }
-  }
-
-  private native boolean isValid0(long handle);
-  private native void disposeInternal(long handle);
-  private native void seekToFirst0(long handle);
-  private native void seekToLast0(long handle);
-  private native void next0(long handle);
-  private native void prev0(long handle);
   private native byte[] key0(long handle);
   private native byte[] value0(long handle);
-  private native void seek0(long handle, byte[] target, int targetLen);
-  private native void status0(long handle);
-
-  final RocksDB rocksDB_;
 }
diff --git a/java/org/rocksdb/RocksIteratorInterface.java b/java/org/rocksdb/RocksIteratorInterface.java
new file mode 100644
index 000000000..15f3a9aa9
--- /dev/null
+++ b/java/org/rocksdb/RocksIteratorInterface.java
@@ -0,0 +1,80 @@
+// Copyright (c) 2014, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+package org.rocksdb;
+
+/**
+ * <p>Defines the interface for an Iterator which provides
+ * access to data one entry at a time. Multiple implementations
+ * are provided by this library.  In particular, iterators are provided
+ * to access the contents of a DB and Write Batch.</p>
+ *
+ * <p>Multiple threads can invoke const methods on a RocksIterator without
+ * external synchronization, but if any of the threads may call a
+ * non-const method, all threads accessing the same RocksIterator must use
+ * external synchronization.</p>
+ *
+ * @see org.rocksdb.RocksObject
+ */
+public interface RocksIteratorInterface {
+
+  /**
+   * <p>An iterator is either positioned at an entry, or
+   * not valid.  This method returns true iff the iterator is valid.</p>
+   *
+   * @return true if iterator is valid.
+   */
+  public boolean isValid();
+
+  /**
+   * <p>Position at the first entry in the source.  The iterator is Valid()
+   * after this call iff the source is not empty.</p>
+   */
+  public void seekToFirst();
+
+  /**
+   * <p>Position at the last entry in the source.  The iterator is
+   * valid after this call iff the source is not empty.</p>
+   */
+  public void seekToLast();
+
+  /**
+   * <p>Position at the first entry in the source whose key is at or
+   * past target.</p>
+   *
+   * <p>The iterator is valid after this call iff the source contains
+   * a key that comes at or past target.</p>
+   *
+   * @param target byte array describing a key or a
+   *               key prefix to seek for.
+   */
+  public void seek(byte[] target);
+
+  /**
+   * <p>Moves to the next entry in the source.  After this call, Valid() is
+   * true iff the iterator was not positioned at the last entry in the source.</p>
+   *
+   * <p>REQUIRES: {@link #isValid()}</p>
+   */
+  public void next();
+
+  /**
+   * <p>Moves to the previous entry in the source.  After this call, Valid() is
+   * true iff the iterator was not positioned at the first entry in the source.</p>
+   *
+   * <p>REQUIRES: {@link #isValid()}</p>
+   */
+  public void prev();
+
+  /**
+   * <p>If an error has occurred, return it.  Else return an ok status.
+   * If non-blocking IO is requested and this operation cannot be
+   * satisfied without doing some IO, then this returns Status::Incomplete().</p>
+   *
+   * @throws RocksDBException thrown if error happens in underlying
+   *                          native library.
+   */
+  public void status() throws RocksDBException;
+}
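To make the seek contract concrete, a small hypothetical fragment where `it` is any RocksIteratorInterface over a source holding only the keys "k1" and "k3":

```java
it.seek("k2".getBytes());
// Positioned at "k3", the first entry whose key is at or past "k2".
assert it.isValid();

it.seek("k9".getBytes());
// No key at or past "k9" exists, so the iterator is no longer valid.
assert !it.isValid();
```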
diff --git a/java/org/rocksdb/WBWIRocksIterator.java b/java/org/rocksdb/WBWIRocksIterator.java
new file mode 100644
index 000000000..3171cc4ee
--- /dev/null
+++ b/java/org/rocksdb/WBWIRocksIterator.java
@@ -0,0 +1,137 @@
+// Copyright (c) 2014, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+package org.rocksdb;
+
+public class WBWIRocksIterator extends AbstractRocksIterator<WriteBatchWithIndex> {
+  private final WriteEntry entry = new WriteEntry();
+
+  protected WBWIRocksIterator(WriteBatchWithIndex wbwi, long nativeHandle) {
+    super(wbwi, nativeHandle);
+  }
+
+  /**
+   * Get the current entry
+   *
+   * The WriteEntry is only valid
+   * until the iterator is repositioned.
+   * If you want to keep the WriteEntry across iterator
+   * movements, you must make a copy of its data!
+   *
+   * @return The WriteEntry of the current entry
+   */
+  public WriteEntry entry() {
+    assert(isInitialized());
+    assert(entry != null);
+    entry1(nativeHandle_, entry);
+    return entry;
+  }
+
+  @Override final native void disposeInternal(long handle);
+  @Override final native boolean isValid0(long handle);
+  @Override final native void seekToFirst0(long handle);
+  @Override final native void seekToLast0(long handle);
+  @Override final native void next0(long handle);
+  @Override final native void prev0(long handle);
+  @Override final native void seek0(long handle, byte[] target, int targetLen);
+  @Override final native void status0(long handle) throws RocksDBException;
+
+  private native void entry1(long handle, WriteEntry entry);
+
+  /**
+   * Enumeration of the Write operation
+   * that created the record in the Write Batch
+   */
+  public enum WriteType {
+    PUT,
+    MERGE,
+    DELETE,
+    LOG
+  }
+
+  /**
+   * Represents an entry returned by
+   * {@link org.rocksdb.WBWIRocksIterator#entry()}
+   *
+   * It is worth noting that a WriteEntry with
+   * the type {@link org.rocksdb.WBWIRocksIterator.WriteType#DELETE}
+   * or {@link org.rocksdb.WBWIRocksIterator.WriteType#LOG}
+   * will not have a value.
+   */
+  public static class WriteEntry {
+    WriteType type = null;
+    final DirectSlice key;
+    final DirectSlice value;
+
+    /**
+     * Intentionally private as this
+     * should only be instantiated in
+     * this manner by the outer WBWIRocksIterator
+     * class; the class members are then modified
+     * by calling {@link org.rocksdb.WBWIRocksIterator#entry()}
+     */
+    private WriteEntry() {
+      key = new DirectSlice();
+      value = new DirectSlice();
+    }
+
+    public WriteEntry(WriteType type, DirectSlice key, DirectSlice value) {
+      this.type = type;
+      this.key = key;
+      this.value = value;
+    }
+
+    /**
+     * Returns the type of the Write Entry
+     *
+     * @return the WriteType of the WriteEntry
+     */
+    public WriteType getType() {
+      return type;
+    }
+
+    /**
+     * Returns the key of the Write Entry
+     *
+     * @return The slice containing the key
+     * of the WriteEntry
+     */
+    public DirectSlice getKey() {
+      return key;
+    }
+
+    /**
+     * Returns the value of the Write Entry
+     *
+     * @return The slice containing the value of
+     * the WriteEntry or null if the WriteEntry has
+     * no value
+     */
+    public DirectSlice getValue() {
+      if(!value.isInitialized()) {
+        return null; //TODO(AR) migrate to JDK8 java.util.Optional#empty()
+      } else {
+        return value;
+      }
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if(other == null) {
+        return false;
+      } else if (this == other) {
+        return true;
+      } else if(other instanceof WriteEntry) {
+        final WriteEntry otherWriteEntry = (WriteEntry)other;
+        return type == otherWriteEntry.type
+            && key.equals(otherWriteEntry.key)
+            && (value.isInitialized() ? value.equals(otherWriteEntry.value)
+                : !otherWriteEntry.value.isInitialized());
+      } else {
+        return false;
+      }
+    }
+  }
+}
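Because entry() refills and returns the same WriteEntry on every call, and its DirectSlices wrap native memory that moves with the iterator, callers must copy data out before repositioning. A sketch, assuming a populated WriteBatchWithIndex `wbwi`:

```java
final WBWIRocksIterator it = wbwi.newIterator();
try {
  for (it.seekToFirst(); it.isValid(); it.next()) {
    final WBWIRocksIterator.WriteEntry entry = it.entry(); // same object each call
    final java.nio.ByteBuffer keyBuf = entry.getKey().data();
    final byte[] keyCopy = new byte[keyBuf.remaining()]; // copy before the next move
    keyBuf.get(keyCopy);
    System.out.println(entry.getType() + " " + new String(keyCopy));
  }
} finally {
  it.dispose();
}
```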
diff --git a/java/org/rocksdb/WriteBatch.java b/java/org/rocksdb/WriteBatch.java
index 3407033ab..24133ec39 100644
--- a/java/org/rocksdb/WriteBatch.java
+++ b/java/org/rocksdb/WriteBatch.java
@@ -22,7 +22,7 @@ package org.rocksdb;
  * non-const method, all threads accessing the same WriteBatch must use
  * external synchronization.
  */
-public class WriteBatch extends RocksObject {
+public class WriteBatch extends AbstractWriteBatch {
   /**
    * Constructs a WriteBatch instance.
    */
@@ -41,102 +41,6 @@ public class WriteBatch extends RocksObject {
     newWriteBatch(reserved_bytes);
   }
 
-  /**
-   * Returns the number of updates in the batch.
-   *
-   * @return number of items in WriteBatch
-   */
-  public native int count();
-
-  /**
-   * <p>Store the mapping "key-&gt;value" in the database.</p>
-   *
-   * @param key the specified key to be inserted.
-   * @param value the value associated with the specified key.
-   */
-  public void put(byte[] key, byte[] value) {
-    put(key, key.length, value, value.length);
-  }
-
-  /**
-   * <p>Store the mapping "key-&gt;value" within given column
-   * family.</p>
-   *
-   * @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle}
-   *     instance
-   * @param key the specified key to be inserted.
-   * @param value the value associated with the specified key.
-   */
-  public void put(ColumnFamilyHandle columnFamilyHandle,
-      byte[] key, byte[] value) {
-    put(key, key.length, value, value.length,
-        columnFamilyHandle.nativeHandle_);
-  }
-
-  /**
-   * <p>Merge "value" with the existing value of "key" in the database.
-   * "key-&gt;merge(existing, value)"</p>
-   *
-   * @param key the specified key to be merged.
-   * @param value the value to be merged with the current value for
-   * the specified key.
-   */
-  public void merge(byte[] key, byte[] value) {
-    merge(key, key.length, value, value.length);
-  }
-
-  /**
-   * <p>Merge "value" with the existing value of "key" in given column family.
-   * "key-&gt;merge(existing, value)"</p>
-   *
-   * @param columnFamilyHandle {@link ColumnFamilyHandle} instance
-   * @param key the specified key to be merged.
-   * @param value the value to be merged with the current value for
-   * the specified key.
-   */
-  public void merge(ColumnFamilyHandle columnFamilyHandle,
-      byte[] key, byte[] value) {
-    merge(key, key.length, value, value.length,
-        columnFamilyHandle.nativeHandle_);
-  }
-
-  /**
-   * <p>If the database contains a mapping for "key", erase it.  Else do nothing.</p>
-   *
-   * @param key Key to delete within database
-   */
-  public void remove(byte[] key) {
-    remove(key, key.length);
-  }
-
-  /**
-   * <p>If column family contains a mapping for "key", erase it.  Else do nothing.</p>
-   *
-   * @param columnFamilyHandle {@link ColumnFamilyHandle} instance
-   * @param key Key to delete within database
-   */
-  public void remove(ColumnFamilyHandle columnFamilyHandle, byte[] key) {
-    remove(key, key.length, columnFamilyHandle.nativeHandle_);
-  }
-
-  /**
-   * Append a blob of arbitrary size to the records in this batch. The blob will
-   * be stored in the transaction log but not in any other file. In particular,
-   * it will not be persisted to the SST files. When iterating over this
-   * WriteBatch, WriteBatch::Handler::LogData will be called with the contents
-   * of the blob as it is encountered. Blobs, puts, deletes, and merges will be
-   * encountered in the same order in thich they were inserted. The blob will
-   * NOT consume sequence number(s) and will NOT increase the count of the batch
-   *
-   * Example application: add timestamps to the transaction log for use in
-   * replication.
-   *
-   * @param blob binary object to be inserted
-   */
-  public void putLogData(byte[] blob) {
-    putLogData(blob, blob.length);
-  }
-
   /**
    * Support for iterating over the contents of a batch.
    *
@@ -149,36 +53,22 @@ public class WriteBatch extends RocksObject {
     iterate(handler.nativeHandle_);
   }
 
-  /**
-   * Clear all updates buffered in this batch
-   */
-  public native void clear();
-
-  /**
-   * Delete the c++ side pointer.
-   */
-  @Override protected void disposeInternal() {
-    assert(isInitialized());
-    disposeInternal(nativeHandle_);
-  }
+  @Override final native void disposeInternal(long handle);
+  @Override final native int count0();
+  @Override final native void put(byte[] key, int keyLen, byte[] value, int valueLen);
+  @Override final native void put(byte[] key, int keyLen, byte[] value, int valueLen,
+      long cfHandle);
+  @Override final native void merge(byte[] key, int keyLen, byte[] value, int valueLen);
+  @Override final native void merge(byte[] key, int keyLen, byte[] value, int valueLen,
+      long cfHandle);
+  @Override final native void remove(byte[] key, int keyLen);
+  @Override final native void remove(byte[] key, int keyLen, long cfHandle);
+  @Override final native void putLogData(byte[] blob, int blobLen);
+  @Override final native void clear0();
 
   private native void newWriteBatch(int reserved_bytes);
-  private native void put(byte[] key, int keyLen,
-                          byte[] value, int valueLen);
-  private native void put(byte[] key, int keyLen,
-                          byte[] value, int valueLen,
-                          long cfHandle);
-  private native void merge(byte[] key, int keyLen,
-                            byte[] value, int valueLen);
-  private native void merge(byte[] key, int keyLen,
-                            byte[] value, int valueLen,
-                            long cfHandle);
-  private native void remove(byte[] key, int keyLen);
-  private native void remove(byte[] key, int keyLen,
-                            long cfHandle);
-  private native void putLogData(byte[] blob, int blobLen);
   private native void iterate(long handlerHandle) throws RocksDBException;
-  private native void disposeInternal(long handle);
 
   /**
    * Handler callback for iterating over the contents of a batch.
diff --git a/java/org/rocksdb/WriteBatchInterface.java b/java/org/rocksdb/WriteBatchInterface.java
new file mode 100644
index 000000000..4eaf1ad9d
--- /dev/null
+++ b/java/org/rocksdb/WriteBatchInterface.java
@@ -0,0 +1,98 @@
+// Copyright (c) 2014, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+package org.rocksdb;
+
+/**
+ * <p>Defines the interface for a Write Batch which
+ * holds a collection of updates to apply atomically to a DB.</p>
+ */
+public interface WriteBatchInterface {
+
+    /**
+     * Returns the number of updates in the batch.
+     *
+     * @return number of items in WriteBatch
+     */
+    public int count();
+
+    /**
+     * <p>Store the mapping "key-&gt;value" in the database.</p>
+     *
+     * @param key the specified key to be inserted.
+     * @param value the value associated with the specified key.
+     */
+    public void put(byte[] key, byte[] value);
+
+    /**
+     * <p>Store the mapping "key-&gt;value" within given column
+     * family.</p>
+     *
+     * @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle}
+     *     instance
+     * @param key the specified key to be inserted.
+     * @param value the value associated with the specified key.
+     */
+    public void put(ColumnFamilyHandle columnFamilyHandle,
+                    byte[] key, byte[] value);
+
+    /**
+     * <p>Merge "value" with the existing value of "key" in the database.
+     * "key-&gt;merge(existing, value)"</p>
+     *
+     * @param key the specified key to be merged.
+     * @param value the value to be merged with the current value for
+     * the specified key.
+     */
+    public void merge(byte[] key, byte[] value);
+
+    /**
+     * <p>Merge "value" with the existing value of "key" in given column family.
+     * "key-&gt;merge(existing, value)"</p>
+     *
+     * @param columnFamilyHandle {@link ColumnFamilyHandle} instance
+     * @param key the specified key to be merged.
+     * @param value the value to be merged with the current value for
+     * the specified key.
+     */
+    public void merge(ColumnFamilyHandle columnFamilyHandle,
+                      byte[] key, byte[] value);
+
+    /**
+     * <p>If the database contains a mapping for "key", erase it.  Else do nothing.</p>
+     *
+     * @param key Key to delete within database
+     */
+    public void remove(byte[] key);
+
+    /**
+     * <p>If column family contains a mapping for "key", erase it.  Else do nothing.</p>
+     *
+     * @param columnFamilyHandle {@link ColumnFamilyHandle} instance
+     * @param key Key to delete within database
+     */
+    public void remove(ColumnFamilyHandle columnFamilyHandle, byte[] key);
+
+    /**
+     * Append a blob of arbitrary size to the records in this batch. The blob will
+     * be stored in the transaction log but not in any other file. In particular,
+     * it will not be persisted to the SST files. When iterating over this
+     * WriteBatch, WriteBatch::Handler::LogData will be called with the contents
+     * of the blob as it is encountered. Blobs, puts, deletes, and merges will be
+     * encountered in the same order in thich they were inserted. The blob will
+     * NOT consume sequence number(s) and will NOT increase the count of the batch
+     *
+     * Example application: add timestamps to the transaction log for use in
+     * replication.
+     *
+     * @param blob binary object to be inserted
+     */
+    public void putLogData(byte[] blob);
+
+    /**
+     * Clear all updates buffered in this batch
+     */
+    public void clear();
+}
diff --git a/java/org/rocksdb/WriteBatchWithIndex.java b/java/org/rocksdb/WriteBatchWithIndex.java
new file mode 100644
index 000000000..5204146c4
--- /dev/null
+++ b/java/org/rocksdb/WriteBatchWithIndex.java
@@ -0,0 +1,149 @@
+// Copyright (c) 2014, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+package org.rocksdb;
+
+/**
+ * Similar to {@link org.rocksdb.WriteBatch} but with a binary searchable
+ * index built for all the keys inserted.
+ *
+ * Calling put, merge, remove or putLogData calls the same function
+ * as with {@link org.rocksdb.WriteBatch} whilst also building an index.
+ *
+ * A user can call {@link org.rocksdb.WriteBatchWithIndex#newIterator()} to create an iterator
+ * over the write batch or
+ * {@link org.rocksdb.WriteBatchWithIndex#newIteratorWithBase(org.rocksdb.RocksIterator)} to
+ * get an iterator for the database with Read-Your-Own-Writes like capability
+ */
+public class WriteBatchWithIndex extends AbstractWriteBatch {
+  /**
+   * Creates a WriteBatchWithIndex where no bytes
+   * are reserved up-front, bytewise comparison is
+   * used for fallback key comparisons,
+   * and duplicate key operations are retained.
+   */
+  public WriteBatchWithIndex() {
+    super();
+    newWriteBatchWithIndex();
+  }
+
+  /**
+   * Creates a WriteBatchWithIndex where no bytes
+   * are reserved up-front, bytewise comparison is
+   * used for fallback key comparisons, and duplicate key
+   * assignment is determined by the constructor argument
+   *
+   * @param overwriteKey if true, overwrite the key in the index when
+   *   inserting a duplicate key, in this way an iterator will never
+   *   show two entries with the same key.
+   */
+  public WriteBatchWithIndex(boolean overwriteKey) {
+    super();
+    newWriteBatchWithIndex(overwriteKey);
+  }
+
+  /**
+   * Creates a WriteBatchWithIndex
+   *
+   * @param fallbackIndexComparator We fall back to this comparator
+   *  to compare keys within a column family if we cannot determine
+   *  the column family, and so cannot look up its comparator.
+   *
+   * @param reservedBytes reserved bytes in underlying WriteBatch
+   *
+   * @param overwriteKey if true, overwrite the key in the index when
+   *   inserting a duplicate key, in this way an iterator will never
+   *   show two entries with the same key.
+   */
+  public WriteBatchWithIndex(AbstractComparator fallbackIndexComparator, int reservedBytes,
+      boolean overwriteKey) {
+    super();
+    newWriteBatchWithIndex(fallbackIndexComparator.nativeHandle_, reservedBytes, overwriteKey);
+  }
+
+  /**
+   * Create an iterator of a column family. User can call
+   * {@link org.rocksdb.RocksIteratorInterface#seek(byte[])} to
+   * search to the next entry of or after a key. Keys will be iterated in the
+   * order given by index_comparator. For multiple updates on the same key,
+   * each update will be returned as a separate entry, in the order of update
+   * time.
+   *
+   * @param columnFamilyHandle The column family to iterate over
+   * @return An iterator for the Write Batch contents, restricted to the column family
+   */
+  public WBWIRocksIterator newIterator(ColumnFamilyHandle columnFamilyHandle) {
+    return new WBWIRocksIterator(this, iterator1(columnFamilyHandle.nativeHandle_));
+  }
+
+  /**
+   * Create an iterator of the default column family. User can call
+   * {@link org.rocksdb.RocksIteratorInterface#seek(byte[])} to
+   * search to the next entry of or after a key. Keys will be iterated in the
+   * order given by index_comparator. For multiple updates on the same key,
+   * each update will be returned as a separate entry, in the order of update
+   * time.
+   *
+   * @return An iterator for the Write Batch contents
+   */
+  public WBWIRocksIterator newIterator() {
+    return new WBWIRocksIterator(this, iterator0());
+  }
+
+  /**
+   * Provides Read-Your-Own-Writes like functionality by
+   * creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator}
+   * as a delta and baseIterator as a base
+   *
+   * @param columnFamilyHandle The column family to iterate over
+   * @param baseIterator The base iterator, e.g. {@link org.rocksdb.RocksDB#newIterator()}
+   * @return An iterator which shows a view comprised of both the database point-in-time
+   * from baseIterator and modifications made in this write batch.
+   */
+  public RocksIterator newIteratorWithBase(ColumnFamilyHandle columnFamilyHandle,
+      RocksIterator baseIterator) {
+    RocksIterator iterator = new RocksIterator(
+        baseIterator.parent_,
+        iteratorWithBase(columnFamilyHandle.nativeHandle_, baseIterator.nativeHandle_));
+    //when the iterator is deleted it will also delete the baseIterator
+    baseIterator.disOwnNativeHandle();
+    return iterator;
+  }
+
+  /**
+   * Provides Read-Your-Own-Writes like functionality by
+   * creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator}
+   * as a delta and baseIterator as a base. Operates on the default column family.
+   *
+   * @param baseIterator The base iterator, e.g. {@link org.rocksdb.RocksDB#newIterator()}
+   * @return An iterator which shows a view comprised of both the database point-in-time
+   * from baseIterator and modifications made in this write batch.
+   */
+  public RocksIterator newIteratorWithBase(RocksIterator baseIterator) {
+    return newIteratorWithBase(baseIterator.parent_.getDefaultColumnFamily(), baseIterator);
+  }
+
+  @Override final native void disposeInternal(long handle);
+  @Override final native int count0();
+  @Override final native void put(byte[] key, int keyLen, byte[] value, int valueLen);
+  @Override final native void put(byte[] key, int keyLen, byte[] value, int valueLen,
+      long cfHandle);
+  @Override final native void merge(byte[] key, int keyLen, byte[] value, int valueLen);
+  @Override final native void merge(byte[] key, int keyLen, byte[] value, int valueLen,
+      long cfHandle);
+  @Override final native void remove(byte[] key, int keyLen);
+  @Override final native void remove(byte[] key, int keyLen, long cfHandle);
+  @Override final native void putLogData(byte[] blob, int blobLen);
+  @Override final native void clear0();
+
+  private native void newWriteBatchWithIndex();
+  private native void newWriteBatchWithIndex(boolean overwriteKey);
+  private native void newWriteBatchWithIndex(long fallbackIndexComparatorHandle, int reservedBytes,
+      boolean overwriteKey);
+  private native long iterator0();
+  private native long iterator1(long cfHandle);
+  private native long iteratorWithBase(long cfHandle, long baseIteratorHandle);
+}
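The read-your-own-writes flow the class doc describes, as a compact hypothetical sketch; note that newIteratorWithBase() takes ownership of the base iterator, so only the merged iterator is disposed:

```java
// Sketch only: assumes `db` is open and already contains "a" -> "1".
final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true); // overwriteKey
final RocksIterator it = wbwi.newIteratorWithBase(db.newIterator());
try {
  wbwi.put("a".getBytes(), "2".getBytes()); // buffered, not yet in the DB
  it.seek("a".getBytes());
  // The merged view prefers the batch: it.value() is "2", not "1".
} finally {
  it.dispose(); // also frees the base iterator it now owns
  wbwi.dispose();
}
```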
diff --git a/java/org/rocksdb/test/ColumnFamilyTest.java b/java/org/rocksdb/test/ColumnFamilyTest.java
index 703ed296f..fb95e8010 100644
--- a/java/org/rocksdb/test/ColumnFamilyTest.java
+++ b/java/org/rocksdb/test/ColumnFamilyTest.java
@@ -56,6 +56,40 @@ public class ColumnFamilyTest {
     }
   }
 
+  @Test
+  public void defaultColumnFamily() throws RocksDBException {
+    RocksDB db = null;
+    Options options = null;
+    try {
+      options = new Options();
+      options.setCreateIfMissing(true);
+
+      db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath());
+      ColumnFamilyHandle cfh = db.getDefaultColumnFamily();
+      assertThat(cfh).isNotNull();
+
+      final byte[] key = "key".getBytes();
+      final byte[] value = "value".getBytes();
+
+      db.put(cfh, key, value);
+
+      final byte[] actualValue = db.get(cfh, key);
+
+      assertThat(actualValue).isEqualTo(value);
+    } finally {
+      if (db != null) {
+        db.close();
+      }
+      if (options != null) {
+        options.dispose();
+      }
+    }
+  }
+
   @Test
   public void createColumnFamily() throws RocksDBException {
     RocksDB db = null;
diff --git a/java/org/rocksdb/test/WriteBatchWithIndexTest.java b/java/org/rocksdb/test/WriteBatchWithIndexTest.java
new file mode 100644
index 000000000..de2b637ff
--- /dev/null
+++ b/java/org/rocksdb/test/WriteBatchWithIndexTest.java
@@ -0,0 +1,247 @@
+//  Copyright (c) 2014, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under the BSD-style license found in the
+//  LICENSE file in the root directory of this source tree. An additional grant
+//  of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+package org.rocksdb.test;
+
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.WriteBatchWithIndex;
+import org.rocksdb.DirectSlice;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+import org.rocksdb.WBWIRocksIterator;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+public class WriteBatchWithIndexTest {
+
+  @ClassRule
+  public static final RocksMemoryResource rocksMemoryResource =
+      new RocksMemoryResource();
+
+  @Rule
+  public TemporaryFolder dbFolder = new TemporaryFolder();
+
+  @Test
+  public void readYourOwnWrites() throws RocksDBException {
+    RocksDB db = null;
+    Options options = null;
+    try {
+      options = new Options();
+      // Setup options
+      options.setCreateIfMissing(true);
+      db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath());
+
+      final byte[] k1 = "key1".getBytes();
+      final byte[] v1 = "value1".getBytes();
+      final byte[] k2 = "key2".getBytes();
+      final byte[] v2 = "value2".getBytes();
+
+      db.put(k1, v1);
+      db.put(k2, v2);
+
+      final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
+
+      RocksIterator base = null;
+      RocksIterator it = null;
+      try {
+        base = db.newIterator();
+        it = wbwi.newIteratorWithBase(base);
+
+        it.seek(k1);
+        assertThat(it.isValid()).isTrue();
+        assertThat(it.key()).isEqualTo(k1);
+        assertThat(it.value()).isEqualTo(v1);
+
+        it.seek(k2);
+        assertThat(it.isValid()).isTrue();
+        assertThat(it.key()).isEqualTo(k2);
+        assertThat(it.value()).isEqualTo(v2);
+
+        //put data to the write batch and make sure we can read it.
+        final byte[] k3 = "key3".getBytes();
+        final byte[] v3 = "value3".getBytes();
+        wbwi.put(k3, v3);
+        it.seek(k3);
+        assertThat(it.isValid()).isTrue();
+        assertThat(it.key()).isEqualTo(k3);
+        assertThat(it.value()).isEqualTo(v3);
+
+        //update k2 in the write batch and check the value
+        final byte[] v2Other = "otherValue2".getBytes();
+        wbwi.put(k2, v2Other);
+        it.seek(k2);
+        assertThat(it.isValid()).isTrue();
+        assertThat(it.key()).isEqualTo(k2);
+        assertThat(it.value()).isEqualTo(v2Other);
+
+        //remove k1 and make sure the iterator no longer sees it
+        wbwi.remove(k1);
+        it.seek(k1);
+        assertThat(it.key()).isNotEqualTo(k1);
+
+        //reinsert k1 and make sure we see the new value
+        final byte[] v1Other = "otherValue1".getBytes();
+        wbwi.put(k1, v1Other);
+        it.seek(k1);
+        assertThat(it.isValid()).isTrue();
+        assertThat(it.key()).isEqualTo(k1);
+        assertThat(it.value()).isEqualTo(v1Other);
+      } finally {
+        if (it != null) {
+          it.dispose();
+        }
+        if (base != null) {
+          base.dispose();
+        }
+      }
+
+    } finally {
+      if (db != null) {
+        db.close();
+      }
+      if (options != null) {
+        options.dispose();
+      }
+    }
+  }
+
+  @Test
+  public void write_writeBatchWithIndex() throws RocksDBException {
+    RocksDB db = null;
+    Options options = null;
+    try {
+      options = new Options();
+      // Setup options
+      options.setCreateIfMissing(true);
+      db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath());
+
+      final byte[] k1 = "key1".getBytes();
+      final byte[] v1 = "value1".getBytes();
+      final byte[] k2 = "key2".getBytes();
+      final byte[] v2 = "value2".getBytes();
+
+      WriteBatchWithIndex wbwi = null;
+
+      try {
+        wbwi = new WriteBatchWithIndex();
+
+        wbwi.put(k1, v1);
+        wbwi.put(k2, v2);
+
+        db.write(new WriteOptions(), wbwi);
+      } finally {
+        if(wbwi != null) {
+          wbwi.dispose();
+        }
+      }
+
+      assertThat(db.get(k1)).isEqualTo(v1);
+      assertThat(db.get(k2)).isEqualTo(v2);
+
+    } finally {
+      if (db != null) {
+        db.close();
+      }
+      if (options != null) {
+        options.dispose();
+      }
+    }
+  }
+
+  @Test
+  public void iterator() throws RocksDBException {
+    final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
+
+    final String k1 = "key1";
+    final String v1 = "value1";
+    final String k2 = "key2";
+    final String v2 = "value2";
+    final String k3 = "key3";
+    final String v3 = "value3";
+    final byte[] k1b = k1.getBytes();
+    final byte[] v1b = v1.getBytes();
+    final byte[] k2b = k2.getBytes();
+    final byte[] v2b = v2.getBytes();
+    final byte[] k3b = k3.getBytes();
+    final byte[] v3b = v3.getBytes();
+
+    //add put records
+    wbwi.put(k1b, v1b);
+    wbwi.put(k2b, v2b);
+    wbwi.put(k3b, v3b);
+
+    //add a deletion record
+    final String k4 = "key4";
+    final byte[] k4b = k4.getBytes();
+    wbwi.remove(k4b);
+
+    WBWIRocksIterator.WriteEntry[] expected = {
+        new WBWIRocksIterator.WriteEntry(WBWIRocksIterator.WriteType.PUT,
+            new DirectSlice(k1), new DirectSlice(v1)),
+        new WBWIRocksIterator.WriteEntry(WBWIRocksIterator.WriteType.PUT,
+            new DirectSlice(k2), new DirectSlice(v2)),
+        new WBWIRocksIterator.WriteEntry(WBWIRocksIterator.WriteType.PUT,
+            new DirectSlice(k3), new DirectSlice(v3)),
+        new WBWIRocksIterator.WriteEntry(WBWIRocksIterator.WriteType.DELETE,
+            new DirectSlice(k4), DirectSlice.NONE)
+    };
+
+    WBWIRocksIterator it = null;
+    try {
+      it = wbwi.newIterator();
+
+      //direct access - seek to key offsets
+      final int[] testOffsets = {2, 0, 1, 3};
+
+      for(int i = 0; i < testOffsets.length; i++) {
+        final int testOffset = testOffsets[i];
+        final byte[] key = toArray(expected[testOffset].getKey().data());
+
+        it.seek(key);
+        assertThat(it.isValid()).isTrue();
+        assertThat(it.entry()).isEqualTo(expected[testOffset]);
+      }
+
+      //forward iterative access
+      int i = 0;
+      for(it.seekToFirst(); it.isValid(); it.next()) {
+        assertThat(it.entry()).isEqualTo(expected[i++]);
+      }
+
+      //reverse iterative access
+      i = expected.length - 1;
+      for(it.seekToLast(); it.isValid(); it.prev()) {
+        assertThat(it.entry()).isEqualTo(expected[i--]);
+      }
+
+    } finally {
+      if(it != null) {
+        it.dispose();
+      }
+    }
+  }
+
+  private byte[] toArray(final ByteBuffer buf) {
+    final byte[] ary = new byte[buf.remaining()];
+    buf.get(ary);
+    return ary;
+  }
+}
diff --git a/java/rocksjni/iterator.cc b/java/rocksjni/iterator.cc
index c7667a018..e9eb0bb37 100644
--- a/java/rocksjni/iterator.cc
+++ b/java/rocksjni/iterator.cc
@@ -14,6 +14,17 @@
 #include "rocksjni/portal.h"
 #include "rocksdb/iterator.h"
 
+/*
+ * Class:     org_rocksdb_RocksIterator
+ * Method:    disposeInternal
+ * Signature: (J)V
+ */
+void Java_org_rocksdb_RocksIterator_disposeInternal(
+    JNIEnv* env, jobject jobj, jlong handle) {
+  auto it = reinterpret_cast<rocksdb::Iterator*>(handle);
+  delete it;
+}
+
 /*
  * Class:     org_rocksdb_RocksIterator
  * Method:    isValid0
@@ -36,7 +47,7 @@ void Java_org_rocksdb_RocksIterator_seekToFirst0(
 
 /*
  * Class:     org_rocksdb_RocksIterator
- * Method:    seekToFirst0
+ * Method:    seekToLast0
  * Signature: (J)V
  */
 void Java_org_rocksdb_RocksIterator_seekToLast0(
@@ -46,7 +57,7 @@ void Java_org_rocksdb_RocksIterator_seekToLast0(
 
 /*
  * Class:     org_rocksdb_RocksIterator
- * Method:    seekToLast0
+ * Method:    next0
  * Signature: (J)V
  */
 void Java_org_rocksdb_RocksIterator_next0(
@@ -56,7 +67,7 @@ void Java_org_rocksdb_RocksIterator_next0(
 
 /*
  * Class:     org_rocksdb_RocksIterator
- * Method:    next0
+ * Method:    prev0
  * Signature: (J)V
  */
 void Java_org_rocksdb_RocksIterator_prev0(
@@ -66,41 +77,8 @@ void Java_org_rocksdb_RocksIterator_prev0(
 
 /*
  * Class:     org_rocksdb_RocksIterator
- * Method:    prev0
- * Signature: (J)V
- */
-jbyteArray Java_org_rocksdb_RocksIterator_key0(
-    JNIEnv* env, jobject jobj, jlong handle) {
-  auto it = reinterpret_cast<rocksdb::Iterator*>(handle);
-  rocksdb::Slice key_slice = it->key();
-
-  jbyteArray jkey = env->NewByteArray(static_cast<jsize>(key_slice.size()));
-  env->SetByteArrayRegion(jkey, 0, static_cast<jsize>(key_slice.size()),
-                          reinterpret_cast<const jbyte*>(key_slice.data()));
-  return jkey;
-}
-
-/*
- * Class:     org_rocksdb_RocksIterator
- * Method:    key0
- * Signature: (J)[B
- */
-jbyteArray Java_org_rocksdb_RocksIterator_value0(
-    JNIEnv* env, jobject jobj, jlong handle) {
-  auto it = reinterpret_cast<rocksdb::Iterator*>(handle);
-  rocksdb::Slice value_slice = it->value();
-
-  jbyteArray jkeyValue =
-      env->NewByteArray(static_cast<jsize>(value_slice.size()));
-  env->SetByteArrayRegion(jkeyValue, 0, static_cast<jsize>(value_slice.size()),
-                          reinterpret_cast<const jbyte*>(value_slice.data()));
-  return jkeyValue;
-}
-
-/*
- * Class:     org_rocksdb_RocksIterator
- * Method:    value0
- * Signature: (J)[B
+ * Method:    seek0
+ * Signature: (J[BI)V
  */
 void Java_org_rocksdb_RocksIterator_seek0(
     JNIEnv* env, jobject jobj, jlong handle,
@@ -117,8 +95,8 @@ void Java_org_rocksdb_RocksIterator_seek0(
 
 /*
  * Class:     org_rocksdb_RocksIterator
- * Method:    seek0
- * Signature: (J[BI)V
+ * Method:    status0
+ * Signature: (J)V
  */
 void Java_org_rocksdb_RocksIterator_status0(
     JNIEnv* env, jobject jobj, jlong handle) {
@@ -134,11 +112,33 @@ void Java_org_rocksdb_RocksIterator_status0(
 
 /*
  * Class:     org_rocksdb_RocksIterator
- * Method:    disposeInternal
- * Signature: (J)V
+ * Method:    key0
+ * Signature: (J)[B
  */
-void Java_org_rocksdb_RocksIterator_disposeInternal(
+jbyteArray Java_org_rocksdb_RocksIterator_key0(
     JNIEnv* env, jobject jobj, jlong handle) {
   auto it = reinterpret_cast<rocksdb::Iterator*>(handle);
-  delete it;
+  rocksdb::Slice key_slice = it->key();
+
+  jbyteArray jkey = env->NewByteArray(static_cast<jsize>(key_slice.size()));
+  env->SetByteArrayRegion(jkey, 0, static_cast<jsize>(key_slice.size()),
+                          reinterpret_cast<const jbyte*>(key_slice.data()));
+  return jkey;
+}
+
+/*
+ * Class:     org_rocksdb_RocksIterator
+ * Method:    value0
+ * Signature: (J)[B
+ */
+jbyteArray Java_org_rocksdb_RocksIterator_value0(
+    JNIEnv* env, jobject jobj, jlong handle) {
+  auto it = reinterpret_cast<rocksdb::Iterator*>(handle);
+  rocksdb::Slice value_slice = it->value();
+
+  jbyteArray jkeyValue =
+      env->NewByteArray(static_cast<jsize>(value_slice.size()));
+  env->SetByteArrayRegion(jkeyValue, 0, static_cast<jsize>(value_slice.size()),
+                          reinterpret_cast<const jbyte*>(value_slice.data()));
+  return jkeyValue;
 }
diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h
index 539e824e5..771223dba 100644
--- a/java/rocksjni/portal.h
+++ b/java/rocksjni/portal.h
@@ -19,6 +19,7 @@
 #include "rocksdb/filter_policy.h"
 #include "rocksdb/status.h"
 #include "rocksdb/utilities/backupable_db.h"
+#include "rocksdb/utilities/write_batch_with_index.h"
 #include "rocksjni/comparatorjnicallback.h"
 #include "rocksjni/writebatchhandlerjnicallback.h"
 
@@ -390,6 +391,37 @@ class WriteBatchHandlerJni {
   }
 };
 
+class WriteBatchWithIndexJni {
+ public:
+  static jclass getJClass(JNIEnv* env) {
+    jclass jclazz = env->FindClass("org/rocksdb/WriteBatchWithIndex");
+    assert(jclazz != nullptr);
+    return jclazz;
+  }
+
+  static jfieldID getHandleFieldID(JNIEnv* env) {
+    static jfieldID fid = env->GetFieldID(
+        getJClass(env), "nativeHandle_", "J");
+    assert(fid != nullptr);
+    return fid;
+  }
+
+  // Get the pointer to rocksdb::WriteBatchWithIndex of the specified
+  // org.rocksdb.WriteBatchWithIndex.
+  static rocksdb::WriteBatchWithIndex* getHandle(JNIEnv* env, jobject jwbwi) {
+    return reinterpret_cast<rocksdb::WriteBatchWithIndex*>(
+        env->GetLongField(jwbwi, getHandleFieldID(env)));
+  }
+
+  // Pass the rocksdb::WriteBatchWithIndex pointer to the java side.
+  static void setHandle(JNIEnv* env, jobject jwbwi,
+      rocksdb::WriteBatchWithIndex* wbwi) {
+    env->SetLongField(
+        jwbwi, getHandleFieldID(env),
+        reinterpret_cast<jlong>(wbwi));
+  }
+};
+
 class HistogramDataJni {
  public:
   static jmethodID getConstructorMethodId(JNIEnv* env, jclass jclazz) {
@@ -831,9 +863,153 @@ class BackupInfoListJni {
   }
 };
 
+class WBWIRocksIteratorJni {
+ public:
+    // Get the java class id of org.rocksdb.WBWIRocksIterator.
+    static jclass getJClass(JNIEnv* env) {
+      static jclass jclazz = env->FindClass("org/rocksdb/WBWIRocksIterator");
+      assert(jclazz != nullptr);
+      return jclazz;
+    }
+
+    static jfieldID getWriteEntryField(JNIEnv* env) {
+      static jfieldID fid =
+          env->GetFieldID(getJClass(env), "entry",
+          "Lorg/rocksdb/WBWIRocksIterator$WriteEntry;");
+      assert(fid != nullptr);
+      return fid;
+    }
+
+    static jobject getWriteEntry(JNIEnv* env, jobject jwbwi_rocks_iterator) {
+      jobject jwe =
+          env->GetObjectField(jwbwi_rocks_iterator, getWriteEntryField(env));
+      assert(jwe != nullptr);
+      return jwe;
+    }
+};
+
+class WriteTypeJni {
+ public:
+    // Get the PUT enum field of org.rocksdb.WBWIRocksIterator.WriteType
+    static jobject PUT(JNIEnv* env) {
+      return getEnum(env, "PUT");
+    }
+
+    // Get the MERGE enum field of org.rocksdb.WBWIRocksIterator.WriteType
+    static jobject MERGE(JNIEnv* env) {
+      return getEnum(env, "MERGE");
+    }
+
+    // Get the DELETE enum field of org.rocksdb.WBWIRocksIterator.WriteType
+    static jobject DELETE(JNIEnv* env) {
+      return getEnum(env, "DELETE");
+    }
+
+    // Get the LOG enum field of org.rocksdb.WBWIRocksIterator.WriteType
+    static jobject LOG(JNIEnv* env) {
+      return getEnum(env, "LOG");
+    }
+
+ private:
+    // Get the java class id of org.rocksdb.WBWIRocksIterator.WriteType.
+    static jclass getJClass(JNIEnv* env) {
+      // Note: do NOT cache the result of FindClass in a static local.
+      // FindClass returns a JNI local reference, which is only valid for
+      // the duration of the native frame that created it; reusing a
+      // cached local reference later fails intermittently with SIGSEGV
+      // (e.g. in the WriteBatchWithIndex#iterator() test).
+      jclass jclazz = env->FindClass("org/rocksdb/WBWIRocksIterator$WriteType");
+      assert(jclazz != nullptr);
+      return jclazz;
+    }
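+
+    // A possible future fix (untested sketch, not applied here): promote
+    // the local reference to a global one before caching it, e.g.
+    //
+    //   static jclass jclazz = static_cast<jclass>(env->NewGlobalRef(
+    //       env->FindClass("org/rocksdb/WBWIRocksIterator$WriteType")));
+    //
+    // at the cost of a global reference that is never released.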
+
+    // Get an enum field of org.rocksdb.WBWIRocksIterator.WriteType
+    static jobject getEnum(JNIEnv* env, const char name[]) {
+      // See the note in getJClass above: the class reference is
+      // deliberately re-resolved on each call rather than cached.
+      jclass jclazz = getJClass(env);
+      jfieldID jfid =
+          env->GetStaticFieldID(jclazz, name,
+          "Lorg/rocksdb/WBWIRocksIterator$WriteType;");
+      assert(jfid != nullptr);
+      return env->GetStaticObjectField(jclazz, jfid);
+    }
+};
+
+class WriteEntryJni {
+ public:
+    // Get the java class id of org.rocksdb.WBWIRocksIterator.WriteEntry.
+    static jclass getJClass(JNIEnv* env) {
+      // Not cached: FindClass returns a JNI local reference (see the
+      // note in WriteTypeJni::getJClass above).
+      jclass jclazz =
+          env->FindClass("org/rocksdb/WBWIRocksIterator$WriteEntry");
+      assert(jclazz != nullptr);
+      return jclazz;
+    }
+
+    static void setWriteType(JNIEnv* env, jobject jwrite_entry,
+        WriteType write_type) {
+      jobject jwrite_type;
+      switch (write_type) {
+        case kPutRecord:
+          jwrite_type = WriteTypeJni::PUT(env);
+          break;
+
+        case kMergeRecord:
+          jwrite_type = WriteTypeJni::MERGE(env);
+          break;
+
+        case kDeleteRecord:
+          jwrite_type = WriteTypeJni::DELETE(env);
+          break;
+
+        case kLogDataRecord:
+          jwrite_type = WriteTypeJni::LOG(env);
+          break;
+
+        default:
+          jwrite_type = nullptr;
+      }
+      assert(jwrite_type != nullptr);
+      env->SetObjectField(jwrite_entry, getWriteTypeField(env), jwrite_type);
+    }
+
+    static void setKey(JNIEnv* env, jobject jwrite_entry,
+        const rocksdb::Slice* slice) {
+      jobject jkey = env->GetObjectField(jwrite_entry, getKeyField(env));
+      AbstractSliceJni::setHandle(env, jkey, slice);
+    }
+
+    static void setValue(JNIEnv* env, jobject jwrite_entry,
+        const rocksdb::Slice* slice) {
+      jobject jvalue = env->GetObjectField(jwrite_entry, getValueField(env));
+      AbstractSliceJni::setHandle(env, jvalue, slice);
+    }
+
+ private:
+    static jfieldID getWriteTypeField(JNIEnv* env) {
+      static jfieldID fid = env->GetFieldID(
+          getJClass(env), "type", "Lorg/rocksdb/WBWIRocksIterator$WriteType;");
+      assert(fid != nullptr);
+      return fid;
+    }
+
+    static jfieldID getKeyField(JNIEnv* env) {
+      static jfieldID fid = env->GetFieldID(
+          getJClass(env), "key", "Lorg/rocksdb/DirectSlice;");
+      assert(fid != nullptr);
+      return fid;
+    }
+
+    static jfieldID getValueField(JNIEnv* env) {
+      static jfieldID fid = env->GetFieldID(
+          getJClass(env), "value", "Lorg/rocksdb/DirectSlice;");
+      assert(fid != nullptr);
+      return fid;
+    }
+};
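+
+// Note: setKey/setValue above merely repoint the native handles of the
+// DirectSlice objects held by the iterator's cached WriteEntry; the
+// slice memory is owned by the write batch, so an entry's contents stay
+// valid only while the batch is unmodified.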
+
 class JniUtil {
  public:
-    /**
+    /*
      * Copies a jstring to a std::string
      * and releases the original jstring
      */
@@ -843,6 +1019,49 @@ class JniUtil {
       env->ReleaseStringUTFChars(js, utf);
       return name;
     }
+
+    /*
+     * Helper for operations on a key and value
+     * for example WriteBatch->Put
+     *
+     * TODO(AR) could be extended to cover returning rocksdb::Status
+     * from `op` and used for RocksDB->Put etc.
+     */
+    static void kv_op(
+        std::function<void(rocksdb::Slice, rocksdb::Slice)> op,
+        JNIEnv* env, jobject jobj,
+        jbyteArray jkey, jint jkey_len,
+        jbyteArray jentry_value, jint jentry_value_len) {
+      jbyte* key = env->GetByteArrayElements(jkey, nullptr);
+      jbyte* value = env->GetByteArrayElements(jentry_value, nullptr);
+      rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
+      rocksdb::Slice value_slice(reinterpret_cast<char*>(value),
+          jentry_value_len);
+
+      op(key_slice, value_slice);
+
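+      // JNI_ABORT releases the pinned/copied array buffers without
+      // copying anything back to the Java arrays; correct here because
+      // `op` only reads the slices.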
+      env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
+      env->ReleaseByteArrayElements(jentry_value, value, JNI_ABORT);
+    }
+
+    /*
+     * Helper for operations on a key
+     * for example WriteBatch->Delete
+     *
+     * TODO(AR) could be extended to cover returning rocksdb::Status
+     * from `op` and used for RocksDB->Delete etc.
+     */
+    static void k_op(
+        std::function<void(rocksdb::Slice)> op,
+        JNIEnv* env, jobject jobj,
+        jbyteArray jkey, jint jkey_len) {
+      jbyte* key = env->GetByteArrayElements(jkey, nullptr);
+      rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
+
+      op(key_slice);
+
+      env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
+    }
 };
 
 }  // namespace rocksdb
diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc
index be70670ae..54eef7f53 100644
--- a/java/rocksjni/rocksjni.cc
+++ b/java/rocksjni/rocksjni.cc
@@ -390,18 +390,39 @@ void Java_org_rocksdb_RocksDB_put__JJ_3BI_3BIJ(
 // rocksdb::DB::Write
 /*
  * Class:     org_rocksdb_RocksDB
- * Method:    write
+ * Method:    write0
  * Signature: (JJ)V
  */
-void Java_org_rocksdb_RocksDB_write(
+void Java_org_rocksdb_RocksDB_write0(
     JNIEnv* env, jobject jdb,
-    jlong jwrite_options_handle, jlong jbatch_handle) {
+    jlong jwrite_options_handle, jlong jwb_handle) {
   rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb);
-  auto write_options = reinterpret_cast<rocksdb::WriteOptions*>(
+  auto* write_options = reinterpret_cast<rocksdb::WriteOptions*>(
       jwrite_options_handle);
-  auto batch = reinterpret_cast<rocksdb::WriteBatch*>(jbatch_handle);
+  auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
 
-  rocksdb::Status s = db->Write(*write_options, batch);
+  rocksdb::Status s = db->Write(*write_options, wb);
+
+  if (!s.ok()) {
+    rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
+  }
+}
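+
+// Note: write0 and write1 are presumed to back overloaded Java
+// RocksDB#write methods, one taking a WriteBatch and one taking a
+// WriteBatchWithIndex; write1 below unwraps the indexed batch via
+// GetWriteBatch() before handing it to DB::Write.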
+
+/*
+ * Class:     org_rocksdb_RocksDB
+ * Method:    write1
+ * Signature: (JJ)V
+ */
+void Java_org_rocksdb_RocksDB_write1(
+    JNIEnv* env, jobject jdb,
+    jlong jwrite_options_handle, jlong jwbwi_handle) {
+  rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb);
+  auto* write_options = reinterpret_cast<rocksdb::WriteOptions*>(
+      jwrite_options_handle);
+  auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
+  auto* wb = wbwi->GetWriteBatch();
+
+  rocksdb::Status s = db->Write(*write_options, wb);
 
   if (!s.ok()) {
     rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
@@ -1174,6 +1195,18 @@ jlongArray Java_org_rocksdb_RocksDB_iterators(
   return env->NewLongArray(0);
 }
 
+/*
+ * Class:     org_rocksdb_RocksDB
+ * Method:    getDefaultColumnFamily
+ * Signature: (J)J
+ */
+jlong Java_org_rocksdb_RocksDB_getDefaultColumnFamily(
+    JNIEnv* env, jobject jobj, jlong jdb_handle) {
+  auto* db_handle = reinterpret_cast<rocksdb::DB*>(jdb_handle);
+  auto* cf_handle = db_handle->DefaultColumnFamily();
+  return reinterpret_cast<jlong>(cf_handle);
+}
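+
+// Note: the ColumnFamilyHandle returned above remains owned by the DB
+// instance; the Java side must not free it through disposeInternal.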
+
 /*
  * Class:     org_rocksdb_RocksDB
  * Method:    createColumnFamily
diff --git a/java/rocksjni/write_batch.cc b/java/rocksjni/write_batch.cc
index dbf2e25e2..20eb55407 100644
--- a/java/rocksjni/write_batch.cc
+++ b/java/rocksjni/write_batch.cc
@@ -41,10 +41,10 @@ void Java_org_rocksdb_WriteBatch_newWriteBatch(
 
 /*
  * Class:     org_rocksdb_WriteBatch
- * Method:    count
+ * Method:    count0
  * Signature: ()I
  */
-jint Java_org_rocksdb_WriteBatch_count(JNIEnv* env, jobject jobj) {
+jint Java_org_rocksdb_WriteBatch_count0(JNIEnv* env, jobject jobj) {
   rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj);
   assert(wb != nullptr);
 
@@ -53,42 +53,16 @@ jint Java_org_rocksdb_WriteBatch_count(JNIEnv* env, jobject jobj) {
 
 /*
  * Class:     org_rocksdb_WriteBatch
- * Method:    clear
+ * Method:    clear0
  * Signature: ()V
  */
-void Java_org_rocksdb_WriteBatch_clear(JNIEnv* env, jobject jobj) {
+void Java_org_rocksdb_WriteBatch_clear0(JNIEnv* env, jobject jobj) {
   rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj);
   assert(wb != nullptr);
 
   wb->Clear();
 }
 
-/*
- * Helper for WriteBatch put operations
- */
-void write_batch_put_helper(
-    JNIEnv* env, jobject jobj,
-    jbyteArray jkey, jint jkey_len,
-    jbyteArray jentry_value, jint jentry_value_len,
-    rocksdb::ColumnFamilyHandle* cf_handle) {
-  rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj);
-  assert(wb != nullptr);
-
-  jbyte* key = env->GetByteArrayElements(jkey, nullptr);
-  jbyte* value = env->GetByteArrayElements(jentry_value, nullptr);
-  rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
-  rocksdb::Slice value_slice(reinterpret_cast<char*>(value),
-      jentry_value_len);
-  if (cf_handle != nullptr) {
-    wb->Put(cf_handle, key_slice, value_slice);
-  } else {
-    // backwards compatibility
-    wb->Put(key_slice, value_slice);
-  }
-  env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
-  env->ReleaseByteArrayElements(jentry_value, value, JNI_ABORT);
-}
-
 /*
  * Class:     org_rocksdb_WriteBatch
  * Method:    put
@@ -98,8 +72,13 @@ void Java_org_rocksdb_WriteBatch_put___3BI_3BI(
     JNIEnv* env, jobject jobj,
     jbyteArray jkey, jint jkey_len,
     jbyteArray jentry_value, jint jentry_value_len) {
-  write_batch_put_helper(env, jobj, jkey, jkey_len, jentry_value,
-      jentry_value_len, nullptr);
+  auto* wb = rocksdb::WriteBatchJni::getHandle(env, jobj);
+  assert(wb != nullptr);
+  auto put = [&wb] (rocksdb::Slice key, rocksdb::Slice value) {
+    wb->Put(key, value);
+  };
+  rocksdb::JniUtil::kv_op(put, env, jobj, jkey, jkey_len, jentry_value,
+      jentry_value_len);
 }
 
 /*
@@ -111,35 +90,15 @@ void Java_org_rocksdb_WriteBatch_put___3BI_3BIJ(
     JNIEnv* env, jobject jobj,
     jbyteArray jkey, jint jkey_len,
     jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) {
-  auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
-  write_batch_put_helper(env, jobj, jkey, jkey_len, jentry_value,
-      jentry_value_len, cf_handle);
-}
-
-/*
- * Helper for write batch merge operations
- */
-void write_batch_merge_helper(
-    JNIEnv* env, jobject jobj,
-    jbyteArray jkey, jint jkey_len,
-    jbyteArray jentry_value, jint jentry_value_len,
-    rocksdb::ColumnFamilyHandle* cf_handle) {
-  rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj);
+  auto* wb = rocksdb::WriteBatchJni::getHandle(env, jobj);
   assert(wb != nullptr);
-
-  jbyte* key = env->GetByteArrayElements(jkey, nullptr);
-  jbyte* value = env->GetByteArrayElements(jentry_value, nullptr);
-  rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
-  rocksdb::Slice value_slice(reinterpret_cast<char*>(value),
+  auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
+  assert(cf_handle != nullptr);
+  auto put = [&wb, &cf_handle] (rocksdb::Slice key, rocksdb::Slice value) {
+    wb->Put(cf_handle, key, value);
+  };
+  rocksdb::JniUtil::kv_op(put, env, jobj, jkey, jkey_len, jentry_value,
       jentry_value_len);
-  if (cf_handle != nullptr) {
-    wb->Merge(cf_handle, key_slice, value_slice);
-  } else {
-    // backwards compatibility
-    wb->Merge(key_slice, value_slice);
-  }
-  env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
-  env->ReleaseByteArrayElements(jentry_value, value, JNI_ABORT);
 }
 
 /*
@@ -151,8 +110,13 @@ void Java_org_rocksdb_WriteBatch_merge___3BI_3BI(
     JNIEnv* env, jobject jobj,
     jbyteArray jkey, jint jkey_len,
     jbyteArray jentry_value, jint jentry_value_len) {
-  write_batch_merge_helper(env, jobj, jkey, jkey_len,
-      jentry_value, jentry_value_len, nullptr);
+  auto* wb = rocksdb::WriteBatchJni::getHandle(env, jobj);
+  assert(wb != nullptr);
+  auto merge = [&wb] (rocksdb::Slice key, rocksdb::Slice value) {
+    wb->Merge(key, value);
+  };
+  rocksdb::JniUtil::kv_op(merge, env, jobj, jkey, jkey_len, jentry_value,
+      jentry_value_len);
 }
 
 /*
@@ -164,29 +128,15 @@ void Java_org_rocksdb_WriteBatch_merge___3BI_3BIJ(
     JNIEnv* env, jobject jobj,
     jbyteArray jkey, jint jkey_len,
     jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) {
-  auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
-  write_batch_merge_helper(env, jobj, jkey, jkey_len,
-      jentry_value, jentry_value_len, cf_handle);
-}
-
-/*
- * Helper for write batch remove operations
- */
-void write_batch_remove_helper(
-    JNIEnv* env, jobject jobj,
-    jbyteArray jkey, jint jkey_len,
-    rocksdb::ColumnFamilyHandle* cf_handle) {
-  rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj);
+  auto* wb = rocksdb::WriteBatchJni::getHandle(env, jobj);
   assert(wb != nullptr);
-
-  jbyte* key = env->GetByteArrayElements(jkey, nullptr);
-  rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
-  if (cf_handle != nullptr) {
-    wb->Delete(cf_handle, key_slice);
-  } else {
-    wb->Delete(key_slice);
-  }
-  env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
+  auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
+  assert(cf_handle != nullptr);
+  auto merge = [&wb, &cf_handle] (rocksdb::Slice key, rocksdb::Slice value) {
+    wb->Merge(cf_handle, key, value);
+  };
+  rocksdb::JniUtil::kv_op(merge, env, jobj, jkey, jkey_len, jentry_value,
+      jentry_value_len);
 }
 
 /*
@@ -197,7 +147,12 @@ void write_batch_remove_helper(
 void Java_org_rocksdb_WriteBatch_remove___3BI(
     JNIEnv* env, jobject jobj,
     jbyteArray jkey, jint jkey_len) {
-  write_batch_remove_helper(env, jobj, jkey, jkey_len, nullptr);
+  auto* wb = rocksdb::WriteBatchJni::getHandle(env, jobj);
+  assert(wb != nullptr);
+  auto remove = [&wb] (rocksdb::Slice key) {
+    wb->Delete(key);
+  };
+  rocksdb::JniUtil::k_op(remove, env, jobj, jkey, jkey_len);
 }
 
 /*
@@ -208,8 +163,14 @@ void Java_org_rocksdb_WriteBatch_remove___3BI(
 void Java_org_rocksdb_WriteBatch_remove___3BIJ(
     JNIEnv* env, jobject jobj,
     jbyteArray jkey, jint jkey_len, jlong jcf_handle) {
-  auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
-  write_batch_remove_helper(env, jobj, jkey, jkey_len, cf_handle);
+  auto* wb = rocksdb::WriteBatchJni::getHandle(env, jobj);
+  assert(wb != nullptr);
+  auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
+  assert(cf_handle != nullptr);
+  auto remove = [&wb, &cf_handle] (rocksdb::Slice key) {
+    wb->Delete(cf_handle, key);
+  };
+  rocksdb::JniUtil::k_op(remove, env, jobj, jkey, jkey_len);
 }
 
 /*
@@ -219,13 +180,12 @@ void Java_org_rocksdb_WriteBatch_remove___3BIJ(
  */
 void Java_org_rocksdb_WriteBatch_putLogData(
     JNIEnv* env, jobject jobj, jbyteArray jblob, jint jblob_len) {
-  rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj);
+  auto* wb = rocksdb::WriteBatchJni::getHandle(env, jobj);
   assert(wb != nullptr);
-
-  jbyte* blob = env->GetByteArrayElements(jblob, nullptr);
-  rocksdb::Slice blob_slice(reinterpret_cast<char*>(blob), jblob_len);
-  wb->PutLogData(blob_slice);
-  env->ReleaseByteArrayElements(jblob, blob, JNI_ABORT);
+  auto putLogData = [&wb] (rocksdb::Slice blob) {
+    wb->PutLogData(blob);
+  };
+  rocksdb::JniUtil::k_op(putLogData, env, jobj, jblob, jblob_len);
 }
 
 /*
diff --git a/java/rocksjni/write_batch_with_index.cc b/java/rocksjni/write_batch_with_index.cc
new file mode 100644
index 000000000..92f2ec068
--- /dev/null
+++ b/java/rocksjni/write_batch_with_index.cc
@@ -0,0 +1,378 @@
+// Copyright (c) 2014, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+// This file implements the "bridge" between Java and C++ and enables
+// calling C++ rocksdb::WriteBatchWithIndex methods from the Java side.
+
+#include "include/org_rocksdb_WBWIRocksIterator.h"
+#include "include/org_rocksdb_WriteBatchWithIndex.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/utilities/write_batch_with_index.h"
+#include "rocksjni/portal.h"
+
+/*
+ * Class:     org_rocksdb_WriteBatchWithIndex
+ * Method:    newWriteBatchWithIndex
+ * Signature: ()V
+ */
+void Java_org_rocksdb_WriteBatchWithIndex_newWriteBatchWithIndex__(
+    JNIEnv* env, jobject jobj) {
+  rocksdb::WriteBatchWithIndex* wbwi = new rocksdb::WriteBatchWithIndex();
+  rocksdb::WriteBatchWithIndexJni::setHandle(env, jobj, wbwi);
+}
+
+/*
+ * Class:     org_rocksdb_WriteBatchWithIndex
+ * Method:    newWriteBatchWithIndex
+ * Signature: (Z)V
+ */
+void Java_org_rocksdb_WriteBatchWithIndex_newWriteBatchWithIndex__Z(
+    JNIEnv* env, jobject jobj, jboolean joverwrite_key) {
+  rocksdb::WriteBatchWithIndex* wbwi =
+      new rocksdb::WriteBatchWithIndex(rocksdb::BytewiseComparator(), 0,
+      static_cast<bool>(joverwrite_key));
+  rocksdb::WriteBatchWithIndexJni::setHandle(env, jobj, wbwi);
+}
+
+/*
+ * Class:     org_rocksdb_WriteBatchWithIndex
+ * Method:    newWriteBatchWithIndex
+ * Signature: (JIZ)V
+ */
+void Java_org_rocksdb_WriteBatchWithIndex_newWriteBatchWithIndex__JIZ(
+    JNIEnv* env, jobject jobj, jlong jfallback_index_comparator_handle,
+    jint jreserved_bytes, jboolean joverwrite_key) {
+  rocksdb::WriteBatchWithIndex* wbwi =
+      new rocksdb::WriteBatchWithIndex(
+      reinterpret_cast<rocksdb::Comparator*>(jfallback_index_comparator_handle),
+      static_cast<size_t>(jreserved_bytes), static_cast<bool>(joverwrite_key));
+  rocksdb::WriteBatchWithIndexJni::setHandle(env, jobj, wbwi);
+}
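+
+// Note: the three overloads above correspond to the Java constructors
+// with JNI signatures ()V, (Z)V and (JIZ)V respectively; the (Z)V form
+// hardcodes BytewiseComparator with zero reserved bytes.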
+
+/*
+ * Class:     org_rocksdb_WriteBatchWithIndex
+ * Method:    count0
+ * Signature: ()I
+ */
+jint Java_org_rocksdb_WriteBatchWithIndex_count0(
+    JNIEnv* env, jobject jobj) {
+  rocksdb::WriteBatchWithIndex* wbwi =
+      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
+  assert(wbwi != nullptr);
+
+  return static_cast<jint>(wbwi->GetWriteBatch()->Count());
+}
+
+/*
+ * Class:     org_rocksdb_WriteBatchWithIndex
+ * Method:    put
+ * Signature: ([BI[BI)V
+ */
+void Java_org_rocksdb_WriteBatchWithIndex_put___3BI_3BI(
+    JNIEnv* env, jobject jobj, jbyteArray jkey, jint jkey_len,
+    jbyteArray jentry_value, jint jentry_value_len) {
+  auto* wbwi =
+      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
+  assert(wbwi != nullptr);
+  auto put = [&wbwi] (rocksdb::Slice key, rocksdb::Slice value) {
+    wbwi->Put(key, value);
+  };
+  rocksdb::JniUtil::kv_op(put, env, jobj, jkey, jkey_len, jentry_value,
+      jentry_value_len);
+}
+
+/*
+ * Class:     org_rocksdb_WriteBatchWithIndex
+ * Method:    put
+ * Signature: ([BI[BIJ)V
+ */
+void Java_org_rocksdb_WriteBatchWithIndex_put___3BI_3BIJ(
+    JNIEnv* env, jobject jobj, jbyteArray jkey, jint jkey_len,
+    jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) {
+  auto* wbwi =
+      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
+  assert(wbwi != nullptr);
+  auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
+  assert(cf_handle != nullptr);
+  auto put = [&wbwi, &cf_handle] (rocksdb::Slice key, rocksdb::Slice value) {
+    wbwi->Put(cf_handle, key, value);
+  };
+  rocksdb::JniUtil::kv_op(put, env, jobj, jkey, jkey_len, jentry_value,
+      jentry_value_len);
+}
+
+/*
+ * Class:     org_rocksdb_WriteBatchWithIndex
+ * Method:    merge
+ * Signature: ([BI[BI)V
+ */
+void Java_org_rocksdb_WriteBatchWithIndex_merge___3BI_3BI(
+    JNIEnv* env, jobject jobj, jbyteArray jkey, jint jkey_len,
+    jbyteArray jentry_value, jint jentry_value_len) {
+  auto* wbwi =
+      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
+  assert(wbwi != nullptr);
+  auto merge = [&wbwi] (rocksdb::Slice key, rocksdb::Slice value) {
+    wbwi->Merge(key, value);
+  };
+  rocksdb::JniUtil::kv_op(merge, env, jobj, jkey, jkey_len, jentry_value,
+      jentry_value_len);
+}
+
+/*
+ * Class:     org_rocksdb_WriteBatchWithIndex
+ * Method:    merge
+ * Signature: ([BI[BIJ)V
+ */
+void Java_org_rocksdb_WriteBatchWithIndex_merge___3BI_3BIJ(
+    JNIEnv* env, jobject jobj, jbyteArray jkey, jint jkey_len,
+    jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) {
+  auto* wbwi =
+      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
+  assert(wbwi != nullptr);
+  auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
+  assert(cf_handle != nullptr);
+  auto merge = [&wbwi, &cf_handle] (rocksdb::Slice key, rocksdb::Slice value) {
+    wbwi->Merge(cf_handle, key, value);
+  };
+  rocksdb::JniUtil::kv_op(merge, env, jobj, jkey, jkey_len, jentry_value,
+      jentry_value_len);
+}
+
+/*
+ * Class:     org_rocksdb_WriteBatchWithIndex
+ * Method:    remove
+ * Signature: ([BI)V
+ */
+void Java_org_rocksdb_WriteBatchWithIndex_remove___3BI(
+    JNIEnv* env, jobject jobj, jbyteArray jkey, jint jkey_len) {
+  auto* wbwi =
+      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
+  assert(wbwi != nullptr);
+  auto remove = [&wbwi] (rocksdb::Slice key) {
+    wbwi->Delete(key);
+  };
+  rocksdb::JniUtil::k_op(remove, env, jobj, jkey, jkey_len);
+}
+
+/*
+ * Class:     org_rocksdb_WriteBatchWithIndex
+ * Method:    remove
+ * Signature: ([BIJ)V
+ */
+void Java_org_rocksdb_WriteBatchWithIndex_remove___3BIJ(
+    JNIEnv* env, jobject jobj,
+    jbyteArray jkey, jint jkey_len, jlong jcf_handle) {
+  auto* wbwi =
+      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
+  assert(wbwi != nullptr);
+  auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
+  assert(cf_handle != nullptr);
+  auto remove = [&wbwi, &cf_handle] (rocksdb::Slice key) {
+    wbwi->Delete(cf_handle, key);
+  };
+  rocksdb::JniUtil::k_op(remove, env, jobj, jkey, jkey_len);
+}
+
+/*
+ * Class:     org_rocksdb_WriteBatchWithIndex
+ * Method:    putLogData
+ * Signature: ([BI)V
+ */
+void Java_org_rocksdb_WriteBatchWithIndex_putLogData(
+    JNIEnv* env, jobject jobj, jbyteArray jblob, jint jblob_len) {
+  auto* wbwi =
+      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
+  assert(wbwi != nullptr);
+  auto putLogData = [&wbwi] (rocksdb::Slice blob) {
+    wbwi->PutLogData(blob);
+  };
+  rocksdb::JniUtil::k_op(putLogData, env, jobj, jblob, jblob_len);
+}
+
+/*
+ * Class:     org_rocksdb_WriteBatchWithIndex
+ * Method:    clear0
+ * Signature: ()V
+ */
+void Java_org_rocksdb_WriteBatchWithIndex_clear0(
+    JNIEnv* env, jobject jobj) {
+  rocksdb::WriteBatchWithIndex* wbwi =
+      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
+  assert(wbwi != nullptr);
+
+  wbwi->GetWriteBatch()->Clear();
+}
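+
+// Caveat: clear0 clears the underlying WriteBatch but there is no call
+// here to reset the index built over it; whether stale index entries
+// can outlive the cleared batch contents is not addressed by this
+// change.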
+
+/*
+ * Class:     org_rocksdb_WriteBatchWithIndex
+ * Method:    iterator0
+ * Signature: ()J
+ */
+jlong Java_org_rocksdb_WriteBatchWithIndex_iterator0(
+    JNIEnv* env, jobject jobj) {
+  rocksdb::WriteBatchWithIndex* wbwi =
+      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
+  rocksdb::WBWIIterator* wbwi_iterator = wbwi->NewIterator();
+  return reinterpret_cast<jlong>(wbwi_iterator);
+}
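+
+// Note: the raw WBWIIterator pointer returned by iterator0/iterator1 is
+// expected to be wrapped by a Java WBWIRocksIterator, which frees it in
+// disposeInternal below.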
+
+/*
+ * Class:     org_rocksdb_WriteBatchWithIndex
+ * Method:    iterator1
+ * Signature: (J)J
+ */
+jlong Java_org_rocksdb_WriteBatchWithIndex_iterator1(
+    JNIEnv* env, jobject jobj, jlong jcf_handle) {
+  rocksdb::WriteBatchWithIndex* wbwi =
+      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
+  auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
+  rocksdb::WBWIIterator* wbwi_iterator = wbwi->NewIterator(cf_handle);
+  return reinterpret_cast<jlong>(wbwi_iterator);
+}
+
+/*
+ * Class:     org_rocksdb_WriteBatchWithIndex
+ * Method:    iteratorWithBase
+ * Signature: (JJ)J
+ */
+jlong Java_org_rocksdb_WriteBatchWithIndex_iteratorWithBase(
+    JNIEnv* env, jobject jobj, jlong jcf_handle, jlong jbi_handle) {
+  rocksdb::WriteBatchWithIndex* wbwi =
+      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
+  auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
+  auto* base_iterator = reinterpret_cast<rocksdb::Iterator*>(jbi_handle);
+  auto* iterator = wbwi->NewIteratorWithBase(cf_handle, base_iterator);
+  return reinterpret_cast<jlong>(iterator);
+}
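+
+// Note: NewIteratorWithBase yields a merged view of this batch overlaid
+// on base_iterator (typically an iterator over the DB itself); per the
+// WriteBatchWithIndex API, the returned iterator takes ownership of
+// base_iterator.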
+
+/*
+ * Class:     org_rocksdb_WriteBatchWithIndex
+ * Method:    disposeInternal
+ * Signature: (J)V
+ */
+void Java_org_rocksdb_WriteBatchWithIndex_disposeInternal(
+    JNIEnv* env, jobject jobj, jlong handle) {
+  auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(handle);
+  delete wbwi;
+}
+
+/* WBWIRocksIterator below */
+
+/*
+ * Class:     org_rocksdb_WBWIRocksIterator
+ * Method:    disposeInternal
+ * Signature: (J)V
+ */
+void Java_org_rocksdb_WBWIRocksIterator_disposeInternal(
+    JNIEnv* env, jobject jobj, jlong handle) {
+  auto* it = reinterpret_cast<rocksdb::WBWIIterator*>(handle);
+  delete it;
+}
+
+/*
+ * Class:     org_rocksdb_WBWIRocksIterator
+ * Method:    isValid0
+ * Signature: (J)Z
+ */
+jboolean Java_org_rocksdb_WBWIRocksIterator_isValid0(
+    JNIEnv* env, jobject jobj, jlong handle) {
+  return reinterpret_cast<rocksdb::WBWIIterator*>(handle)->Valid();
+}
+
+/*
+ * Class:     org_rocksdb_WBWIRocksIterator
+ * Method:    seekToFirst0
+ * Signature: (J)V
+ */
+void Java_org_rocksdb_WBWIRocksIterator_seekToFirst0(
+    JNIEnv* env, jobject jobj, jlong handle) {
+  reinterpret_cast<rocksdb::WBWIIterator*>(handle)->SeekToFirst();
+}
+
+/*
+ * Class:     org_rocksdb_WBWIRocksIterator
+ * Method:    seekToLast0
+ * Signature: (J)V
+ */
+void Java_org_rocksdb_WBWIRocksIterator_seekToLast0(
+    JNIEnv* env, jobject jobj, jlong handle) {
+  reinterpret_cast<rocksdb::WBWIIterator*>(handle)->SeekToLast();
+}
+
+/*
+ * Class:     org_rocksdb_WBWIRocksIterator
+ * Method:    next0
+ * Signature: (J)V
+ */
+void Java_org_rocksdb_WBWIRocksIterator_next0(
+    JNIEnv* env, jobject jobj, jlong handle) {
+  reinterpret_cast<rocksdb::WBWIIterator*>(handle)->Next();
+}
+
+/*
+ * Class:     org_rocksdb_WBWIRocksIterator
+ * Method:    prev0
+ * Signature: (J)V
+ */
+void Java_org_rocksdb_WBWIRocksIterator_prev0(
+    JNIEnv* env, jobject jobj, jlong handle) {
+  reinterpret_cast<rocksdb::WBWIIterator*>(handle)->Prev();
+}
+
+/*
+ * Class:     org_rocksdb_WBWIRocksIterator
+ * Method:    seek0
+ * Signature: (J[BI)V
+ */
+void Java_org_rocksdb_WBWIRocksIterator_seek0(
+    JNIEnv* env, jobject jobj, jlong handle, jbyteArray jtarget,
+    jint jtarget_len) {
+  auto* it = reinterpret_cast<rocksdb::WBWIIterator*>(handle);
+  jbyte* target = env->GetByteArrayElements(jtarget, nullptr);
+  rocksdb::Slice target_slice(
+      reinterpret_cast<char*>(target), jtarget_len);
+
+  it->Seek(target_slice);
+
+  env->ReleaseByteArrayElements(jtarget, target, JNI_ABORT);
+}
+
+/*
+ * Class:     org_rocksdb_WBWIRocksIterator
+ * Method:    status0
+ * Signature: (J)V
+ */
+void Java_org_rocksdb_WBWIRocksIterator_status0(
+    JNIEnv* env, jobject jobj, jlong handle) {
+  auto* it = reinterpret_cast<rocksdb::WBWIIterator*>(handle);
+  rocksdb::Status s = it->status();
+
+  if (s.ok()) {
+    return;
+  }
+
+  rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
+}
+
+/*
+ * Class:     org_rocksdb_WBWIRocksIterator
+ * Method:    entry1
+ * Signature: (JLorg/rocksdb/WBWIRocksIterator$WriteEntry;)V
+ */
+void Java_org_rocksdb_WBWIRocksIterator_entry1(
+    JNIEnv* env, jobject jobj, jlong handle, jobject jwrite_entry) {
+  auto* it = reinterpret_cast<rocksdb::WBWIIterator*>(handle);
+  const rocksdb::WriteEntry& we = it->Entry();
+  jobject jwe = rocksdb::WBWIRocksIteratorJni::getWriteEntry(env, jobj);
+  rocksdb::WriteEntryJni::setWriteType(env, jwe, we.type);
+  rocksdb::WriteEntryJni::setKey(env, jwe, &we.key);
+  if (we.type == rocksdb::kDeleteRecord || we.type == rocksdb::kLogDataRecord) {
+    // set the native handle of the value slice to null, as no value is
+    // available for this record type
+    rocksdb::WriteEntryJni::setValue(env, jwe, nullptr);
+  } else {
+    rocksdb::WriteEntryJni::setValue(env, jwe, &we.value);
+  }
+}
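+
+// Note: entry1 refreshes the WriteEntry cached on the Java iterator in
+// place instead of allocating a new object per call; for DELETE and LOG
+// records the value slice handle is cleared, as those record types
+// carry no value.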