Friday, March 12, 2010

Awesomely fast key value store design

I've got an interesting idea for an awesomely fast key-value store.

Writing data:
  1. When a data change request comes to the Client API, it stores the changed data in Temp Storage. The change request also gets appended to a Persistent Queue.
  2. The Queue Processing Job picks up the queued update and applies it to the data in Persistent Storage.

Reading data:
  1. A read request comes to the DB Client API, which checks whether data for the request is available in Temp Storage. If it is, use it; otherwise go to the Persistent Store.

This approach allows writing to a distributed DB at the speed of writing to a persistent queue, which is usually much faster than DB updates. And reads can easily be scaled by adding additional replicas.
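The two paths above can be sketched roughly as follows. All the names here are hypothetical: the in-memory maps and the queue are stand-ins for memcached, a real persistent queue, and the distributed DB.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the design above; in a real system tempStorage would
// be memcached, queue a persistent queue, and persistentStore the actual DB.
public class QueuedKeyValueStore {
    private final Map<String, String> tempStorage = new ConcurrentHashMap<>();
    private final Queue<String[]> queue = new ConcurrentLinkedQueue<>();
    private final Map<String, String> persistentStore = new ConcurrentHashMap<>();

    // Write path: record the change in Temp Storage and enqueue it.
    public void write(String key, String value) {
        tempStorage.put(key, value);
        queue.add(new String[]{key, value});
    }

    // Read path: prefer Temp Storage, fall back to the Persistent Store.
    public String read(String key) {
        String v = tempStorage.get(key);
        return v != null ? v : persistentStore.get(key);
    }

    // Queue Processing Job: drain queued updates into the Persistent Store.
    public void processQueue() {
        String[] update;
        while ((update = queue.poll()) != null) {
            persistentStore.put(update[0], update[1]);
        }
    }
}
```

Note that the write returns as soon as the change is enqueued; the Persistent Store catches up asynchronously, which is exactly where the speed win (and the consistency risk discussed below) comes from.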

Potential issues with this design:
  1. Temp Storage will eventually start overflowing: it's hard to make memcached storage capacity as big as Persistent Storage capacity. When that happens, the DB Client will fall back to the Persistent DB for records that have been pushed out of Temp Storage, so we need to make sure that Queue Processing has completed for the matching queued records.
  2. Temp Storage based on memcached is not that reliable, and if it goes down we might lose data consistency for a short period of time, until the data currently in the queue has been propagated to the persistent DB. Its reliability can be improved, but let's look at what might happen in this scenario. The first thing is that users will temporarily lose their changes for records that are still in the queue. That might not be so bad, considering it only lasts a short time. But if a user who already had a change waiting in the queue submits yet another change at that moment, the change could be permanently lost.
  3. This design relies very heavily on the Queue Processing Job being reliable and fast enough, so it should be well designed. On the other hand, the design allows the Queue Processing Job to be stopped temporarily (for maintenance tasks) without affecting end users, as long as Temp Storage is big enough and the job is fast enough to catch up with the queued changes later.
  4. Like any key-value store, this design has difficulty dealing with concurrent updates to the same record.
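One possible mitigation for the lost-update and concurrency issues, sketched here as a hypothetical addition (none of these names come from the design above): stamp each change with a sequence number on the write path, and have the Queue Processing Job drop any queued update that is older than what it has already applied for that key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: per-key version checks let the Queue Processing Job
// discard stale updates. This is a single-threaded sketch; a real processor
// would need an atomic check-and-apply per key.
public class VersionedApplier {
    private final AtomicLong sequence = new AtomicLong();
    private final Map<String, Long> appliedVersion = new ConcurrentHashMap<>();
    private final Map<String, String> persistentStore = new ConcurrentHashMap<>();

    /** Called on the write path: the version stamped onto the queued change. */
    public long nextVersion() {
        return sequence.incrementAndGet();
    }

    /** Called by the Queue Processing Job; stale updates are ignored. */
    public boolean apply(String key, String value, long version) {
        Long current = appliedVersion.get(key);
        if (current != null && current >= version) {
            return false; // an equal or newer change already reached the store
        }
        appliedVersion.put(key, version);
        persistentStore.put(key, value);
        return true;
    }

    public String get(String key) {
        return persistentStore.get(key);
    }
}
```

This doesn't solve true concurrent writers (issue 4 still needs something like compare-and-swap at the client), but it keeps a delayed or replayed queue from overwriting newer data.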


Thursday, February 18, 2010

Tokyo Tyrant/Memcached compatibility issue

Recently I was playing with Tokyo Tyrant through the Spy Memcached client, and discovered that according to the Tokyo Tyrant docs the "flags", "exptime", and "cas unique" parameters are ignored. This prevents the Spy Memcached client from handling serialized objects correctly: whenever I save a serialized object there, I get a String object back.
After some hacking I managed to come up with this little class that stores the "flags" data in the byte array itself:

import net.spy.memcached.CachedData;
import net.spy.memcached.transcoders.SerializingTranscoder;

/**
 * TTSerializingTranscoder makes the spymemcached client work correctly with Tokyo Tyrant
 * by working around the fact that Tokyo Tyrant does not store the put metadata defined
 * in the memcached protocol. The "flags" value is prepended to the stored byte array instead.
 */
public class TTSerializingTranscoder extends SerializingTranscoder {

    @Override
    public Object decode(CachedData d) {
        // Split the stored bytes back into the 4-byte flags prefix and the payload.
        byte[] result = d.getData();
        byte[] data = new byte[result.length - 4];
        byte[] flag = new byte[4];
        System.arraycopy(result, 0, flag, 0, 4);
        System.arraycopy(result, 4, data, 0, result.length - 4);
        int flags = byteArrayToInt(flag);
        return super.decode(new CachedData(flags, data, getMaxSize()));
    }

    @Override
    public CachedData encode(Object o) {
        // Prepend the 4-byte flags value to the payload, since Tokyo Tyrant
        // would otherwise discard it.
        final CachedData res = super.encode(o);
        byte[] b = res.getData();
        byte[] data = new byte[b.length + 4];
        final int flags = res.getFlags();
        System.arraycopy(intToByteArray(flags), 0, data, 0, 4);
        System.arraycopy(b, 0, data, 4, b.length);
        return new CachedData(flags, data, getMaxSize());
    }

    /** Big-endian int-to-bytes conversion. */
    public static byte[] intToByteArray(int value) {
        return new byte[]{
                (byte) (value >>> 24),
                (byte) (value >>> 16),
                (byte) (value >>> 8),
                (byte) value};
    }

    /** Big-endian bytes-to-int conversion. */
    public static int byteArrayToInt(byte[] b) {
        return (b[0] << 24)
                + ((b[1] & 0xFF) << 16)
                + ((b[2] & 0xFF) << 8)
                + (b[3] & 0xFF);
    }
}
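The 4-byte big-endian framing can be checked in isolation. This standalone snippet duplicates the two helper methods and a frame step equivalent to what encode() does, just to make the byte layout concrete; FlagFraming and frame() are names of my own, not part of spymemcached.

```java
// Standalone check of the big-endian framing used by TTSerializingTranscoder:
// the first four bytes of the stored value carry the memcached "flags".
public class FlagFraming {
    public static byte[] intToByteArray(int value) {
        return new byte[]{
                (byte) (value >>> 24),
                (byte) (value >>> 16),
                (byte) (value >>> 8),
                (byte) value};
    }

    public static int byteArrayToInt(byte[] b) {
        return (b[0] << 24)
                + ((b[1] & 0xFF) << 16)
                + ((b[2] & 0xFF) << 8)
                + (b[3] & 0xFF);
    }

    // Frame layout: [flags (4 bytes, big-endian)][payload], as produced by encode().
    public static byte[] frame(int flags, byte[] payload) {
        byte[] out = new byte[payload.length + 4];
        System.arraycopy(intToByteArray(flags), 0, out, 0, 4);
        System.arraycopy(payload, 0, out, 4, payload.length);
        return out;
    }
}
```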


To use this class I had to extend DefaultConnectionFactory (see code below) and pass it to the MemcachedClient constructor: new MemcachedClient(new SpyConnectionFactory(TTSerializingTranscoder.class), addrs).



import net.spy.memcached.DefaultConnectionFactory;
import net.spy.memcached.transcoders.SerializingTranscoder;
import net.spy.memcached.transcoders.Transcoder;

/**
 * Connection factory that lets the default transcoder class be swapped out,
 * e.g. for TTSerializingTranscoder when talking to Tokyo Tyrant.
 */
public class SpyConnectionFactory extends DefaultConnectionFactory {
    private Class transcoderClass = SerializingTranscoder.class;

    public SpyConnectionFactory() {
    }

    public SpyConnectionFactory(Class transcoderClass) {
        this.transcoderClass = transcoderClass;
    }

    @Override
    public Transcoder<Object> getDefaultTranscoder() {
        try {
            //noinspection unchecked
            return (Transcoder<Object>) transcoderClass.newInstance();
        } catch (Exception e) {
            throw new RuntimeException("Failed to create transcoder for " + transcoderClass, e);
        }
    }

    public Class getTranscoderClass() {
        return transcoderClass;
    }

    public void setTranscoderClass(Class transcoderClass) {
        this.transcoderClass = transcoderClass;
    }
}
