Monday, April 30, 2012

From NOSQL to MySql (BDB to mysql) - Part3

We finally migrated 14 nodes this weekend. As there were millions of files to migrate, even with 5 parallel threads running on each node it took close to 3-4 hours per node. We ran 3 nodes at a time, otherwise the db load shot up too high. As we can't afford downtime on weekdays, the migration has to happen on weekend nights. On Friday night we were up till 6:00 AM and on Saturday night we were up till 4:00 AM. We wanted to do more nodes, but the number of files per host was going overboard and I wanted to start conservative; if the mysql servers can handle more, we can later consolidate shards or move more onto these hosts. New mysql hosts are being provisioned next week, so I am hoping to migrate more nodes this week. Monday was a calm day after a long time, as we chose the nodes that were spiking to be migrated first.

Db loads in all DCs are low, and even app node loads are low. Surprisingly, the slave dbs are getting pounded more than the master dbs after last week's code optimisation that moved some heavily used queries that can tolerate stale data to the slaves. That is a good sign, as we can easily add more slaves to spread the load.

Master db load on one DC

Slave DB load on one DC


Mysql Sharding at our company- Part1

Sharding is a double edged sword: on one hand it allows you to scale your application as your customer base grows, and on the other hand the concept is very hard for junior developers to grasp. I try to abstract and encapsulate as much of the complexity as I can in the framework.

Typically people do one of:
1. Functional sharding/vertical sharding: For huge datasets this will only buy you time. When I joined the company everything was stored in Berkeley db, Lucene, or files on the filesystem. I tried moving everything to Mysql in one project but it became a monster, so instead I started moving pieces of the application into mysql and shipping them to production one by one. This also boosted the team's confidence in mysql: as more features went live on Mysql, they saw that Mysql was reliable and scalable. I started with smaller pieces that didn't have >10-20M rows but needed mysql for its ACID properties. Anticipating growth, we decided to create one schema per feature and avoided joins between two schemas, joining them in the application if required. This way, if one feature became hot and started using Mysql like crazy, we could easily move that schema to its own server or add more slaves for that schema.

  Pros:
  1. Easier for developers to understand.
  2. Local development can happen on one mysql server while deployment can use a different topology.
  3. You can keep adding more slaves if the app is not write intensive, or keep upgrading to newer hardware with more RAM.
  Cons:
  1. For bigger datasets that can't fit in one server, this will only buy you some more time before a move to horizontal sharding.
2. Horizontal sharding: In this approach you distribute your data across many mysql servers. There are various ways to do this as well:
  1. By customer or user: When a customer registers you assign him a shard, and all his data lives on that mysql shard. Joins are easy in this approach, but hotspots become an issue and you will need to redistribute the shards across hosts.
  2. By hash on a key: In this approach you distribute a customer's data across many shards; each record is assigned its own shard. There is rarely a need to redistribute shards here, but joins are a pain as you have to do them in the application.
We use both sharding approaches.
Sharding by customer or user: In our metadata db for files/folders we assign a shard to a customer on registration. We had to do this because most of our metadata queries require joins, and I could not find an easy way to spread them across multiple servers in the allotted time. We find the four least used shards, randomly pick one of them, and assign the customer to it. We store the customer-to-shard mapping in a global db (a.k.a. dns or cluster metadata db). Each incoming request looks up the shard id by customer, picks the matching connection pool out of the pool of connections, and executes the request. I have described it here.
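That pick-one-of-the-four-least-used-shards assignment can be sketched roughly like this (the class and method names are hypothetical, not our actual code; the caller is assumed to supply shard ids already sorted by load):

```java
import java.util.List;
import java.util.Random;

public class ShardAssigner {
    private static final Random random = new Random();

    // Pick one of the four least used shards at random so that new
    // customers spread out instead of all landing on the single
    // emptiest shard at the same time.
    public static int assignShard(List<Integer> shardsByLoadAscending) {
        int candidates = Math.min(4, shardsByLoadAscending.size());
        return shardsByLoadAscending.get(random.nextInt(candidates));
    }
}
```

The chosen shard id would then be written to the global customer-to-shard mapping table as part of registration.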

We divide our servers into clusters so we can update them when any non backward compatible change is introduced, and so that we can manage migrations or do rolling updates (topic of the next blog post). Several clusters can live on the same host, or we can do one cluster per host, or we can split one cluster onto many hosts.

Customer acme lives on shard 64, which lives on physical shard 5.
Physical shard 5 lives on host1, schema2.
host1 belongs to cluster C1.

Sharding by hash on key: In our S3-clone database (instead of using S3 for storage we use filers for storage and Mysql for object metadata) we use the other sharding approach, where each object is assigned a shard id based on the least used shard, and the shard id is embedded in the object key itself. We could spread this across multiple servers because all queries are by objectId and do not require any joins at all. We maintain a logical shard to physical shard mapping here in case we need to move a shard to some other host or redistribute it. Whenever an object needs to be looked up, we parse the objectId to get the logical shard id, from that we look up the physical shard id, get a connection pool, and run the query there.

ObjectId = 32.XXXXXXX maps to logical shard 32.
logical shard 32 lives on physical shard 4
physical shard 4 lives on host2, schema5
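That lookup chain can be sketched as follows (a simplified illustration: the class name is hypothetical and the map contents are hardcoded here, whereas in reality the logical-to-physical mapping lives in the global db):

```java
import java.util.HashMap;
import java.util.Map;

public class ObjectShardResolver {
    // logical shard -> physical shard, kept in the global db so a logical
    // shard can be moved to another host without rewriting object ids
    private final Map<Integer, Integer> logicalToPhysical = new HashMap<Integer, Integer>();
    // physical shard -> "host/schema" coordinates
    private final Map<Integer, String> physicalToLocation = new HashMap<Integer, String>();

    public ObjectShardResolver() {
        logicalToPhysical.put(32, 4);
        physicalToLocation.put(4, "host2/schema5");
    }

    // The logical shard id is embedded in the object id itself: "32.XXXXXXX"
    public static int parseLogicalShard(String objectId) {
        return Integer.parseInt(objectId.substring(0, objectId.indexOf('.')));
    }

    public String locate(String objectId) {
        int logical = parseLogicalShard(objectId);
        int physical = logicalToPhysical.get(logical);
        return physicalToLocation.get(physical);
    }
}
```

The string returned by locate() stands in for looking up the actual connection pool for that host and schema.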

In the next parts of the series I will cover more details on how the logical and physical shard mapping is done and the reasons behind the cluster approach.

Part1 of series
Part2 of series
Part3 of series
Part4 of series 

Thursday, April 26, 2012

From NOSQL to MySql (BDB to mysql) - Part2

So we are now in the third week of the Mysql deployment, slowly moving nodes from BDB to Mysql. Results so far have been promising.
1) We ran into some query optimisation issues with Mysql that I blogged about here.
2) Just ran into a 4 byte unicode issue where Mysql's utf8 charset won't store 4 byte characters and apparently you have to use utf8mb4 for that, how stupid of Mysql. Will write a separate blog post about it.

But as far as performance is concerned it's rocking so far. We did the deployment on the 20th, and as soon as the node went live on Mysql you can see an immediate, consistent drop in load average for this week. This is because most of the processing that was done by the in-memory db on the app node is now handled by Mysql. But in the second graph you can see that Mysql is barely breaking a sweat and is nowhere close to the app node's load average. This is with 3 nodes pounding it; we would start with 10+ app nodes per Mysql server and then consolidate later, as I want to start conservative.

The biggest benefit is that instead of managing 120+ BDBs on each node, we will manage 10 Mysql servers, and later we can consolidate even those once we see them sitting idle. I will share more details on the sharding architecture I came up with in another blog post soon.
App Node Load Average
Mysql server load average


Saturday, April 21, 2012

Quartz delete/unschedule a job at runtime

I had a requirement to remove a quartz job from tomcat without restarting it. The best solution is to write a jsp to do this. I had earlier written a Quartz Admin jsp blog post, so I added a Delete button to that jsp, and it appears all I need to do is call:

    if ("Delete".equals(btnInterrupt)) {
        scheduler.deleteJob(jobNameToRun, groupNameToRun);
    %>
    Job <%=jobNameToRun%> Deleted.
    <%
    }


There is an unschedule api as well, but that will only unschedule the job's next execution: when tomcat is restarted the job will come back, and I wanted to remove the job entirely.

Friday, April 20, 2012

Adding Jitter to cache layer

Thundering herd is an issue common to webapps that rely on heavy caching: if lots of items expire at the same time due to a server restart or some temporal event, then suddenly lots of calls go to the database at the same time. This can even bring down the database in extreme cases. I won't go into much detail, but the app needs to do two things to solve this issue.

1) Add consistent hashing to the cache layer: This way, when a memcache server is added/removed from the pool, the entire cache is not invalidated. We use memcache from both the python and Java layers, and I still have to find a consistent hashing solution that is portable across both languages; hash_ring and spymemcached place servers at different points on the ring, so I need to read/test more.
2) Add jitter to the cache, i.e. randomise the expiry time: We expire long term cache records 8 hours after the key was added, and short term cache entries after 2 hours. As our customers usually come to work in the morning and access the cloud file server, lots of them can hit us at the same time, populate the short term cache together, and then have that cache invalidated at the same time, causing lots of db access at once. To avoid this we expire keys randomly between 2 hours and 2 hours 10 minutes.
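The randomized expiry can be sketched as below (the class name is hypothetical; the 2 hour base and 10 minute jitter window are the numbers from the paragraph above):

```java
import java.util.Random;

public class CacheExpiry {
    private static final int BASE_SECONDS = 2 * 60 * 60; // 2 hour short term TTL
    private static final int JITTER_SECONDS = 10 * 60;   // up to 10 extra minutes
    private static final Random random = new Random();

    // Returns a TTL between 2h and 2h10m so that keys populated together
    // in the morning rush do not all expire together.
    public static int shortTermTtlSeconds() {
        return BASE_SECONDS + random.nextInt(JITTER_SECONDS + 1);
    }
}
```

The returned TTL is what you would pass to memcache's set call instead of a fixed 7200 seconds.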

Monday, April 16, 2012

shell file $10 parameter

Normally you don't have a shell file that calls a java program with 10 parameters, but I had a migration script with 10 arguments and I kept getting the first parameter in the 10th argument: the shell interprets $10 as $1 followed by a literal 0, so it was passing the first param concatenated with 0. The solution is to enclose all parameters > 9 in curly brackets.

so use something like

$JAVA_HOME/bin/java -verbose:gc -Duser.timezone=GMT -Dfile.encoding=UTF8 -server -Xms512m -Xmx2048m $1 $2 $3 $4 $5 $6 $7 $8 $9 ${10} ${11} ${12}

instead of

$JAVA_HOME/bin/java -verbose:gc -Duser.timezone=GMT -Dfile.encoding=UTF8 -server -Xms512m -Xmx2048m $1 $2 $3 $4 $5 $6 $7 $8 $9 $10 $11 $12

Sunday, April 15, 2012

mysql subquery multiple columns

If you need a subquery to return multiple columns for comparison purposes, that's allowed, so you can do something like

select x from table1 where (a,b)= (select a,b from table2)

but if you want something like

select (select a,b from table1 where table1.x=table2.x) fields, table2.y, table2.z from table2 join table3 on table3.x=table2.x where table2.xx=10

then mysql won't allow you to select 2 columns in that subquery.

To avoid this you can turn the query into a join and write a wrapper query on top, like

select a,b,y,z from (table1 join table2 on table1.x=table2.x) join table3 on table3.x=table2.x where table2.xx=10

But I ran into an issue over the weekend where table1 had 1.4M rows, table2 had 40K rows, and table3 had 3K rows. Joining the 1.4M rows to the 44K rows would not come out of the Mysql database even in 10 minutes, so this approach was out of the question. The final result set size was only 44K.

So the best solution I could find over the weekend was to concatenate the columns in the subquery and split them in java code:

select (select concat(a,'|',b) from table1 where table1.x=table2.x) fields, table2.y, table2.z from table2 join table3 on table3.x=table2.x where table2.xx=10
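Splitting the concatenated column back out in java is then trivial (a hypothetical helper; this assumes the '|' separator cannot appear inside column a):

```java
public class ConcatSplitter {
    // "a|b", as selected by concat(a, '|', b), -> {"a", "b"}
    public static String[] splitFields(String fields) {
        int sep = fields.indexOf('|');
        return new String[] {
            fields.substring(0, sep),
            fields.substring(sep + 1)
        };
    }
}
```

If a column could legitimately contain the separator you would pick a character that cannot occur in the data, or escape it.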

Friday, April 6, 2012

java Spring Shard datasource with Mysql/Oracle

If you are implementing database sharding and using Spring JDBC, then you are out of luck using declarative transactions or finding a Spring DataSource that handles sharding. I had to implement my own datasource manager and my own annotations to provide declarative-style transactions and hide the complexity from the average developer. It's very important to abstract out cross cutting concerns such as sharding and transactions so that junior developers won't be confused and start copying code left and right without understanding the global impact of their changes.

So the idea is:
1) You implement a ShardDataSourceManager that is basically a pool of connection pools, letting you look up a datasource by shard id.
2) You define your own Transactional annotation and annotate methods with it.
3) You write an interceptor at the dao layer that reads the annotation on the method plus some context info. From the context info it looks up the shard id, looks up the datasource, and injects it into a thread local.
4) The dao layer, when it looks up a datasource, looks in the thread local, constructs a jdbc template, and executes queries on it.

Here is a sample ShardTransactional annotation, ShardTransactionInterceptor and ShardDataSourceManager:

public @interface ShardTransactional {
    boolean readOnly() default false;
}

public class ShardTransactionInterceptor implements MethodInterceptor {
    private static final AppLogger logger = AppLogger.getLogger(ShardTransactionInterceptor.class);
    private static ThreadLocal<DataSource> dataSourceThreadLocal = new ThreadLocal<DataSource>();
    private ShardDataSourceManager shardDataSourceManager;

    public ShardDataSourceManager getShardDataSourceManager() {
        return shardDataSourceManager;
    }

    public void setShardDataSourceManager(ShardDataSourceManager shardDataSourceManager) {
        this.shardDataSourceManager = shardDataSourceManager;
    }

    public Object invoke(final MethodInvocation method) throws Throwable {
        if (method.getMethod().isAnnotationPresent(ShardTransactional.class)) {
            try {
                ShardTransactional annotation = method.getMethod().getAnnotation(ShardTransactional.class);
                User user = getParam(method, User.class);
                if (user == null) {
                    throw new IllegalStateException("All transactional methods must have a user argument");
                }
                boolean readOnly = annotation.readOnly();
                ShardInfo shardInfo = getShardInfo(user);
                // make the datasource available to the dao layer for this thread
                cacheDataSourceInThreadLocal(shardInfo.getHostId(), readOnly);
                TransactionTemplate transactionTemplate = new TransactionTemplate();
                transactionTemplate.setTransactionManager(shardDataSourceManager.getTransactionManagerByHostId(shardInfo.getHostId(), readOnly));
                return transactionTemplate.execute(new TransactionCallback() {
                    public Object doInTransaction(TransactionStatus transactionStatus) {
                        try {
                            return method.proceed();
                        } catch (Throwable t) {
                            logger.error("Rolling back transaction due to", t);
                            throw new RuntimeException(t);
                        }
                    }
                });
            } finally {
                dataSourceThreadLocal.remove();
            }
        } else {
            return method.proceed();
        }
    }

    private ShardInfo getShardInfo(User user) {
        ...code to lookup shard by user
        return shardInfo;
    }

    public static DataSource getDataSource() {
        return dataSourceThreadLocal.get();
    }

    private DataSource cacheDataSourceInThreadLocal(int hostId, boolean readOnly) {
        DataSource dataSource = shardDataSourceManager.getDataSourceByHostId(hostId, readOnly);
        dataSourceThreadLocal.set(dataSource);
        return dataSource;
    }

    private <T> T getParam(MethodInvocation method, Class<T> clazz) {
        Class[] parameterTypes = method.getMethod().getParameterTypes();
        if (parameterTypes != null) {
            int i = 0;
            for (Class parameterType : parameterTypes) {
                if (clazz.isAssignableFrom(parameterType)) {
                    return (T) method.getArguments()[i];
                }
                i++;
            }
        }
        return null;
    }
}

public class ShardDataSourceManager {
    private static final AppLogger logger = AppLogger.getLogger(ShardDataSourceManager.class);
    private static boolean autoCommit = false;
    private Map<Integer, BasicDataSource> dataSourceMap = new HashMap<Integer, BasicDataSource>();
    private Map<Integer, DataSourceTransactionManager> transactionManagerMap = new HashMap<Integer, DataSourceTransactionManager>();
    private ShardManager shardManager;
    private String driverClassName = "";
    private int maxActive = 20;
    private int maxIdle = 5;
    private int maxWait = 180000;
    private int minEvictableIdleTimeMillis = 300000;
    private boolean testWhileIdle = true;
    private String validationQuery = "select 1 from dual";
    private String userName;
    private String userPassword;

    public String getDriverClassName() { return driverClassName; }
    public void setDriverClassName(String driverClassName) { this.driverClassName = driverClassName; }
    public int getMaxActive() { return maxActive; }
    public void setMaxActive(int maxActive) { this.maxActive = maxActive; }
    public int getMaxIdle() { return maxIdle; }
    public void setMaxIdle(int maxIdle) { this.maxIdle = maxIdle; }
    public int getMaxWait() { return maxWait; }
    public void setMaxWait(int maxWait) { this.maxWait = maxWait; }
    public int getMinEvictableIdleTimeMillis() { return minEvictableIdleTimeMillis; }
    public void setMinEvictableIdleTimeMillis(int minEvictableIdleTimeMillis) { this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis; }
    public boolean isTestWhileIdle() { return testWhileIdle; }
    public void setTestWhileIdle(boolean testWhileIdle) { this.testWhileIdle = testWhileIdle; }
    public String getValidationQuery() { return validationQuery; }
    public void setValidationQuery(String validationQuery) { this.validationQuery = validationQuery; }
    public String getUserPassword() { return userPassword; }
    public void setUserPassword(String userPassword) { this.userPassword = userPassword; }
    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }

    public void init() throws Exception {
        for (DbHost shardInfo : shardManager.getDbHosts()) {
            String url = "jdbc:mysql://" + shardInfo.getMasterHost();
            BasicDataSource dataSource = createDataSource(url, userName);
            dataSourceMap.put(shardInfo.getHostId(), dataSource);
            DataSourceTransactionManager masterTransactionManager = new DataSourceTransactionManager(dataSource);
            transactionManagerMap.put(shardInfo.getHostId(), masterTransactionManager);
            logger.info("DataSource Created for hostid= {}, url= {}", shardInfo.getHostId(), dataSource.getUrl());
        }
    }

    private BasicDataSource createDataSource(String url, String username) {
        logger.info("Creating Datasource {}", url);
        BasicDataSource dataSource = new BasicDataSource();
        dataSource.setDriverClassName(driverClassName);
        dataSource.setUrl(url);
        dataSource.setUsername(username);
        dataSource.setPassword(userPassword);
        dataSource.setDefaultAutoCommit(autoCommit);
        dataSource.setMaxActive(maxActive);
        dataSource.setMaxIdle(maxIdle);
        dataSource.setMaxWait(maxWait);
        dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        dataSource.setTestWhileIdle(testWhileIdle);
        dataSource.setValidationQuery(validationQuery);
        return dataSource;
    }

    private DataSource getDataSourceByHostId(int hostId) {
        DataSource dataSource = dataSourceMap.get(hostId);
        if (dataSource == null) {
            logger.warn("Could not find a data source for: {}", hostId);
            throw new IllegalArgumentException("Invalid dbname, no such pool configured: " + hostId);
        }
        return dataSource;
    }

    public DataSource getDataSourceByHostId(int hostId, boolean readOnly) {
        // readOnly would pick a slave datasource; for now everything goes to the master
        logger.debug("Using Master datasource for hostid={}", hostId);
        return getDataSourceByHostId(hostId);
    }

    public DataSourceTransactionManager getTransactionManagerByHostId(int hostId, boolean readOnly) {
        logger.debug("Using Master transactionmanager for hostid={}", hostId);
        DataSourceTransactionManager transactionManager = transactionManagerMap.get(hostId);
        if (transactionManager == null) {
            throw new IllegalArgumentException("Could not find a transaction manager for hostId=" + hostId);
        }
        return transactionManager;
    }

    public void destroy() throws Exception {
        logger.info("destroying pools");
        destroyPool(dataSourceMap);
    }

    private void destroyPool(Map<Integer, BasicDataSource> dsMap) throws SQLException {
        if (dsMap != null) {
            for (BasicDataSource dataSource : dsMap.values()) {
                logger.info("Discarding pool: {}", dataSource);
                dataSource.close();
            }
        }
    }
}

Annotating SQL queries

I have been using oracle's and mysql's top 10 queries views to proactively diagnose slow queries and optimize them for the next release. But sometimes the same query will have different plans depending on the data set it's executed against. So if you have a sharded database, a query like select * from files where file_path=? and customer_id=? might run fine locally and for most shards, but not for some shards.

To diagnose live production issues, or issues that occurred in the past, it helps to have some query context, and one trick is to annotate your queries with context information in comments.

    public static String annotateSql(String contextInfo, String sql) {
        return "  /*" + contextInfo + "*/" + sql;
    }

Now your top queries report over the last day will show the top queries along with their context info. This context info can be a customerId or a shard id.
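For example, here is a self-contained version of the helper with a sample annotation (the context string is made up for illustration):

```java
public class SqlAnnotator {
    // Prepend the context as a SQL comment; the comment travels with the
    // statement, so it shows up next to the query in the server's reports.
    public static String annotateSql(String contextInfo, String sql) {
        return "  /*" + contextInfo + "*/" + sql;
    }

    public static void main(String[] args) {
        System.out.println(annotateSql("customerId=1234,shard=64",
                "select * from files where file_path=?"));
        // prints:   /*customerId=1234,shard=64*/select * from files where file_path=?
    }
}
```

Seeing the customerId or shard id alongside a slow query tells you immediately which shard's data distribution triggered the bad plan.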

From NOSQL to MySql (BDB to mysql) - Part1

Before I joined, the company was evolving from a search and share company into a cloud file server, and we were using lucene as a datastore for metadata, but it was always lagging on updates as you can't update data in realtime in lucene. Then we were going to NFS for metadata, but it was not scaling for metadata calls, as the NFS protocol was never meant to scale to web2.0 volume. So someone picked Berkeley DB as a backend. The only reason not to pick any other NoSQL product was that none of them had multi row transaction capabilities, and a relational db was not picked as it would take time to port the code to it. I had recently joined at that time; then the company became a hit and we scaled from 3-4 nodes in one data centre to 60-100 nodes across multiple data centres. We are still going through the scaling phase, and believe me, it is not fun scaling Berkeley db. Berkeley db works fine if you can hold your dataset in memory, but as soon as you start hitting dbs of 10G or more in size it becomes a nightmare. The main issues we found with Berkeley db are:

  1. It's difficult, or I should say impossible, to find people qualified enough to diagnose Berkeley db crashes. We hired a consultant at a costly hourly rate, but all he could give was theoretical knowledge, not details of the crash.
  2. As it's an in-memory db, if tomcat is hung and kill -hup won't work, we always have to cross our fingers doing a kill -9, as sometimes we would lose data.
  3. Sometimes the DB would get corrupt pages and we would have to do a catastrophic recovery and lose data, so as a stopgap I had to build an alternate replicated mysql db, writing some ETL code for partial recovery of metadata.
  4. As Berkeley db is an in-memory db, the only way to query data is to write a JSP. Simple queries from the marketing or product teams would require us to write a jsp. Updating data to fix corruption was no fun either.
  5. We tried hard to enable replication to another node to scale read operations, but our write rates were so high that replication would always lag; it worked in development but never in production.
  6. Backing up Berkeley db and rsyncing logs to filers would cause IO issues on the nodes.
  7. Log churn on Berkeley db would cause IO issues.
  8. To add an index you have to write code, and the db would become fragmented, so lately we have been shutting down nodes every week, copying the db to another folder and moving it back to defrag it, or in the worst case dumping/reloading the db.
  9. Due to the above issues we were left with pinning customers to a node, so nodes would become hotspots and one bad customer would affect other customers.
  10. We had to put throttles in the system, as we can't serve a customer from two or more nodes due to the in-memory design.
  11. Very few engineers have the aptitude to dig deep into Berkeley db code and fix issues. It took me some time to digest, but it opened my vision to NOSQL dbs and I was able to apply the same concepts when we added cassandra for another usecase.
  12. Sometimes we throw more hardware at the issue, and that is not cost effective in the longer term.
  13. Indexes were another issue: you have to write code to add an index, which means going through a whole release cycle to get an index on a db.
  14. Adding insult to injury, indexes won't get created on db startup; they only get created when someone accesses the db. For a customer with 3-4M files this could take anywhere from 10-20 mins if the server is busy, or 1 min if it's idle. This unpredictable performance of bdb was driving us nuts. Whenever we created an index we had to remember to run empty snapshot queries on the weekend to generate the indexes, a painful exercise to repeat on 60 nodes, as one node or another would get hung and you would end up spending 4-5 hours fixing it.
  15. BDB locks were another pain: bdb supports page level locks instead of row level locks, which means more chance of deadlocks. We would get constant deadlocks if a background thread like the Trash purge Quartz job was aggressively purging data while the same customer was adding files. We had to implement application level locks to prevent such scenarios, and that complicates the code.

Ultimately we grew tired and decided to move to Mysql; it took almost a year to port the code, with so many fires going on to keep scaling Berkeley db through 20% month-over-month growth. In short, NOSQL is good and is still used at the company for many usecases, but when it comes to transactions/adhoc querying, Mysql is still the champion, and unless you are writing a desktop client application, in-memory dbs are a bad idea. I despise Berkeley db every day for being used in a server application.

Finally, the Mysql backend is going live today for some customers, and once it's live and scaling, we will start removing throttles and start serving one customer from multiple nodes in the coming months. My anticipation is that we will consolidate hardware and cut the number of servers in half, as app nodes will now be sitting mostly idle. I will cover more details on the Mysql sharding and how scaling with Mysql goes in the coming months.


Tuesday, April 3, 2012

Jersey file upload ContentDisposition getSize

If you are using Jersey for file upload, with @FormDataParam("file") InputStream file to receive the file and @FormDataParam("file") FormDataContentDisposition fileDetail to receive the file metadata, then you will run into an issue where fileDetail.getSize() comes back as -1. It only gets populated if the "size" header is passed in the multipart request, and Firefox was not sending it when I uploaded a file.

The only way I could make it work was to add a TeeInputStream and calculate the length myself. I wrote a class:

    private static class OutputStreamSizeCalculator extends OutputStream {
        private long length;

        public void write(int b) throws IOException {
            length++;
        }

        public long getLength() {
            return length;
        }
    }

and then wrapped my InputStream with it:
        OutputStreamSizeCalculator out = new OutputStreamSizeCalculator();
        TeeInputStream tee = new TeeInputStream(in, out, true);
...pass tee to the underlying methods and then call

        long length = out.getLength();