Recently about Hadoop

Blur is a new Apache 2.0 licensed software project that provides a search capability built on top of Hadoop and Lucene. Elastic Search and Solr already exist, so why build something new? While these projects work well, they don't have a solid integration with the Hadoop ecosystem. Blur was built specifically for Big Data, taking scalability, redundancy, and performance into consideration from the very start, while leveraging all the goodness that already exists in the Hadoop stack.

A year and a half ago, my project began using Hadoop for data processing. Very early on, we were having networking issues that would make our HDFS cluster network connectivity spotty at best. Over one weekend in particular, we steadily lost network connection to 47 of the 90 data nodes in the cluster. When we came in on Monday morning, I noticed that the MapReduce system was a little sluggish but still working. When I checked HDFS I saw that our capacity had dropped by about 50%. After running an fsck on the cluster I was amazed to find that what seemed like a catastrophic failure over the weekend resulted in a still healthy file system. This experience left a lasting impression on me. It was then that I got the idea to somehow leverage the redundancy and fault tolerance of HDFS for the next version of a search system that I was just beginning to (re)write.

I had already written a custom sharded Lucene server that had been in a production system for a couple of years. Lucene worked really well and did everything that we needed for search. The issue that we faced was that it was running on big iron that was not redundant and could not be easily expanded. After seeing the resilient characteristics of Hadoop first hand, I decided to look into marrying the already mature and impressive feature set of Lucene with the built in redundancy and scalability of the Hadoop platform. From this experiment Blur was created.

The biggest technical issues/features that Blur solves:

  • Rapid mass indexing of entire datasets
  • Automatic Shard Server Failover
  • Near Real-time update compatibility via Lucene NRT
  • Compression of Lucene FDT files while maintaining random access performance
  • Lucene WAL (Write Ahead Log) to provide data reliability
  • Lucene R/W directly into HDFS (the seek on write problem)
  • Random access performance with block caching of the Lucene Directory

Data Model

Data in Blur is stored in Tables that contain Rows. Rows must have a unique row id and contain one or more Records. Records have a unique record id (unique within the Row) and a column family for grouping columns that logically make up a single record. Columns contain a name and a value, and a Record can contain multiple columns with the same name.
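To make the nesting concrete, here is a minimal sketch of the model in plain Java. The class names (SketchRow, SketchRecord, SketchColumn) are hypothetical and exist only for illustration; they are not Blur's actual API, and the validation shown is just one reading of the rules described above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical classes for illustration only; these are NOT Blur's actual API.
public class DataModelSketch {

    /** A Column is a name/value pair. */
    static final class SketchColumn {
        final String name;
        final String value;

        SketchColumn(String name, String value) {
            this.name = name;
            this.value = value;
        }
    }

    /** A Record has an id (unique within its Row), a column family, and columns. */
    static final class SketchRecord {
        final String recordId;
        final String family;
        final List<SketchColumn> columns; // duplicate column names are allowed

        SketchRecord(String recordId, String family, SketchColumn... columns) {
            this.recordId = recordId;
            this.family = family;
            this.columns = Arrays.asList(columns);
        }
    }

    /** A Row has a row id and one or more Records. */
    static final class SketchRow {
        final String rowId;
        final List<SketchRecord> records;

        SketchRow(String rowId, SketchRecord... records) {
            if (records.length == 0) {
                throw new IllegalArgumentException("a Row needs at least one Record");
            }
            // Record ids only need to be unique within this Row.
            Set<String> seen = new HashSet<String>();
            for (SketchRecord record : records) {
                if (!seen.add(record.recordId)) {
                    throw new IllegalArgumentException("duplicate record id: " + record.recordId);
                }
            }
            this.rowId = rowId;
            this.records = Arrays.asList(records);
        }
    }

    public static void main(String[] args) {
        SketchRow row = new SketchRow("row-1",
            new SketchRecord("rec-1", "famA",
                new SketchColumn("email", "a@example.com"),
                new SketchColumn("email", "b@example.com")), // same column name twice
            new SketchRecord("rec-2", "famB",
                new SketchColumn("colA", "value")));
        System.out.println(row.rowId + " has " + row.records.size() + " records");
    }
}
```

Uniqueness of row ids across a table is not enforced here, since that would be the job of whatever holds the Rows.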

Architecture

Blur uses Hadoop's MapReduce framework for indexing data, and Hadoop's HDFS filesystem for storing indexes. Thrift is used for all inter-process communications and Zookeeper is used to track the state of the system and to store metadata. The Blur architecture is made up of two types of server processes:

  • Blur Controller Server
  • Blur Shard Server

The shard server serves zero or more shards from all of the currently online tables. Which shards are online in each shard server is calculated from the state information in Zookeeper. If a shard server goes down, the remaining shard servers detect the failure through Zookeeper and determine which, if any, of the missing shards they need to serve from HDFS.
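The idea that every shard server can work out its own share from a shared view of who is online can be sketched as follows. This is only an illustration under stated assumptions: the round-robin rule, the shardsFor method, and the server and shard names are all hypothetical, the list of online servers is passed in directly rather than read from Zookeeper, and Blur's real layout logic is not shown in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; the assignment rule below is an assumption, not Blur's algorithm.
public class ShardLayoutSketch {

    /** Returns the shards that `self` should serve, given the servers currently online. */
    static List<String> shardsFor(String self, List<String> onlineServers, List<String> shards) {
        // Sort both lists so that every server computes the same layout independently.
        List<String> servers = new ArrayList<String>(onlineServers);
        Collections.sort(servers);
        List<String> sortedShards = new ArrayList<String>(shards);
        Collections.sort(sortedShards);

        List<String> mine = new ArrayList<String>();
        int selfIndex = servers.indexOf(self);
        if (selfIndex < 0) {
            return mine; // this server is not registered as online
        }
        // Simple round-robin: shard i goes to server (i mod serverCount).
        for (int i = 0; i < sortedShards.size(); i++) {
            if (i % servers.size() == selfIndex) {
                mine.add(sortedShards.get(i));
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList("shard-0", "shard-1", "shard-2", "shard-3");

        // Three servers online.
        System.out.println(shardsFor("server-a",
            Arrays.asList("server-a", "server-b", "server-c"), shards));

        // server-b has gone down; the survivors recompute and cover every shard.
        System.out.println(shardsFor("server-a", Arrays.asList("server-a", "server-c"), shards));
        System.out.println(shardsFor("server-c", Arrays.asList("server-a", "server-c"), shards));
    }
}
```

Plain round-robin moves more shards than strictly necessary when a server disappears; a production layout would likely try to keep healthy shards where they are and reassign only the missing ones.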

The controller server provides a single point of entry (logically) to the cluster for spraying out queries, collecting the responses, and providing a single response. Both the controller and shard servers expose the same Thrift API which helps to ease debugging. It also allows developers to start a single shard server and interact with it the same way they would with a large cluster. Many controller servers can be (and should be) run for redundancy. The controllers act as gateways to all of the data that is being served by the shard servers.
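The controller's job of spraying out a query and combining the answers is a classic scatter-gather. A rough sketch is below; ShardClient, FixedShard, and Hit are hypothetical stand-ins invented for this example (the real servers talk Thrift), and merging by score is an assumption about how a single response might be assembled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of scatter-gather; none of these types come from Blur.
public class ScatterGatherSketch {

    /** One search hit: a row id and its relevance score. */
    static final class Hit {
        final String rowId;
        final double score;

        Hit(String rowId, double score) {
            this.rowId = rowId;
            this.score = score;
        }
    }

    /** Stand-in for a remote shard server. */
    interface ShardClient {
        List<Hit> search(String query);
    }

    /** In-memory shard that returns canned hits, used only for the demo. */
    static final class FixedShard implements ShardClient {
        private final List<Hit> hits;

        FixedShard(Hit... hits) {
            this.hits = Arrays.asList(hits);
        }

        public List<Hit> search(String query) {
            return hits;
        }
    }

    /** Sends the query to every shard in parallel and returns the topN hits by score. */
    static List<Hit> search(List<ShardClient> shards, final String query, int topN) {
        if (shards.isEmpty()) {
            return new ArrayList<Hit>();
        }
        ExecutorService pool = Executors.newFixedThreadPool(shards.size());
        try {
            // Scatter: one task per shard.
            List<Future<List<Hit>>> futures = new ArrayList<Future<List<Hit>>>();
            for (final ShardClient shard : shards) {
                futures.add(pool.submit(new Callable<List<Hit>>() {
                    public List<Hit> call() {
                        return shard.search(query);
                    }
                }));
            }
            // Gather: wait for every shard and pool the hits.
            List<Hit> merged = new ArrayList<Hit>();
            for (Future<List<Hit>> future : futures) {
                try {
                    merged.addAll(future.get());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
            // Highest score first, then trim to the requested size.
            Collections.sort(merged, new Comparator<Hit>() {
                public int compare(Hit a, Hit b) {
                    return Double.compare(b.score, a.score);
                }
            });
            return new ArrayList<Hit>(merged.subList(0, Math.min(topN, merged.size())));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<ShardClient> shards = Arrays.<ShardClient>asList(
            new FixedShard(new Hit("row-1", 0.9), new Hit("row-2", 0.4)),
            new FixedShard(new Hit("row-3", 0.7)));
        for (Hit hit : search(shards, "value", 2)) {
            System.out.println(hit.rowId + " " + hit.score);
        }
    }
}
```

Because the caller only sees the search method, the same calling code works whether it is pointed at one shard or at a gateway in front of many, which mirrors the benefit of the shared API described above.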

Updating / Loading Data

Currently there are two ways to load and update data. The first is through a bulk load in MapReduce and the second is through mutation calls in Thrift.

Bulk Load MapReduce Example

Data Mutation Thrift Example

Searching Data

Any element in the Blur data model is searchable using normal Lucene semantics, including analyzers. Analyzers are defined per Blur table.

The standard Lucene query syntax is the default way to search Blur. If anything outside of the standard syntax is needed, you can create a Lucene query directly with Java objects and submit it through the expert query API.

The column family grouping within Rows allows for results to be discovered across column families similar to what you would get with an inner join across two tables that share the same key (or in this case rowid). For complicated data models that have multiple column families, this makes for a very powerful search capability.

The following example searches for "value" as a full text search. If I had wanted to search for "value" in a single field like column "colA" in column family "famB" the query would look like "famB.colA:value".

Fetching Data

Fetches can be done by row or by record. This is done by creating a selector object in which you specify the rowid or recordid, and the specific column families or columns that you would like returned. When not specified, the entire Row or Record is returned.

Current State

Blur is nearing its first release, 0.1, and is relatively stable. The first release candidate should be available for download within the next few weeks. In the meantime you can check it out on GitHub:

https://github.com/nearinfinity/blur

http://blur.io

Last Thursday (on Cinco de Mayo) I gave a presentation on Hadoop and Hive at the Nova/DC Java Users Group. As several people asked about getting the slides, I've shared them here on Slideshare. I also posted the presentation sample code on Github at basic-hadoop-examples.

So, you have a ton of data and are trying to figure out what to do with it.  Big data is the newest industry buzzword and there are a myriad of solutions out there that claim to solve the big data problem.  One of the industry leading technologies at the moment is Hadoop.  But what is Hadoop and how does it make working with big data manageable?  Let's see if we can take some of the essentials of Hadoop and explain them in more business terms. While no analogy is perfect, I think that this one is pretty good.

Coca Cola started in Atlanta, Georgia.  Coke products were all made in one plant and distributed locally in Georgia.  As the popularity of Coke increased over the years, the Coca Cola Company had to change the production and distribution of its product in order to meet global demand.  Now there are bottling plants all over the world that use raw materials and Coke recipes to produce and distribute billions of servings of Coke products a year.  As your data grows like the demand for Coke, one plant will not be enough to satisfy your data production needs.  Enter Hadoop.

Hadoop is a distributed data crunching architecture very much like Coke's bottling and distribution network.  So, why is it impractical to just scale one bottling plant as demand grows? Let's see.  Physical infrastructure becomes a problem.  As production scales up, you need more raw material.  This means a larger loading dock, more trucks, and larger roads to accommodate increased traffic.  You also need more machinery, more power, and more people to run the plant.  Of course all of this can scale to a point but could you imagine the infrastructure that Coca Cola would need in Atlanta to produce all of the billions of cans of Coke that they sell a year?

You also need a delivery network that delivers product in a timely manner. This is more problematic.  Delivering Coke to new markets that are further away from the plant means more trucks and aged product.  For all of these reasons Coca Cola made a transformational decision to ship raw material all over the country, and eventually the world, and have regional plants bottle Coke locally.  You can do the same thing with data.

Instead of having one large plant that has a finite production capability, Hadoop allows you to have thousands of smaller distributed plants that work together in a network of data production.  This solves many big data problems in the same way that Coke solved their production problem.  A Hadoop cluster of computers is made up of as many production plants as you need to process your data. Your raw material is your data.  Hadoop automatically distributes this raw material to all of your data production plants or nodes.  When you run a Hadoop job, instead of pulling data to the program, it pushes the program to the data just like a recipe would be distributed to all of the bottling plants that produce Coke.  This approach has many benefits over large singular databases or having one huge bottling plant.

Increase capacity without affecting production

Adding a new plant has no impact on the other plants in the network.  When a new plant comes online, the Hadoop system automatically distributes raw material to it and sends it data crunching recipes so that it can immediately increase capacity.


Upgrade individual plants without halting production

If there is a new conveyor belt that can increase the capacity of a plant, while one plant is being upgraded the rest can increase production slightly in order to absorb the temporarily reduced capacity.  


Absorb plant failures

With one large system, if something catastrophic happens, all production stops.  With a bottling plant network, if a plant in Wisconsin has to halt production because of local flooding, the plants in Illinois and Ohio can ratchet up capacity temporarily to meet demand while flood waters subside.


And then there's scalability

If I have one big plant that is at capacity, what do I do?  Do I build another big plant and double my capacity even though I may initially only need to increase production by 5%?  Since the plants are smaller, Hadoop allows you to add what you need when you need it.


There is much more to Hadoop than described here, but I think that this gives you a good idea as to why you should take a look at it if you have big data that you want to exploit in some way.  This distribution and production technique did wonders for Coca Cola.  Just think of what it could do for you.

Using HBase-dsl

At the beginning of last month I started prototyping various solutions for a customer using HBase.  However, I found myself writing tons of code to perform some fairly simple tasks.  So I set out to simplify my HBase code and ended up writing a Java HBase DSL.  It's still fairly rough around the edges, but it does allow the use of standard Java types and it's extensible.

Simple Put and Get Example

Direct HBase API:

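import java.io.IOException;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class PutAndGet {
   public static void main(String[] args) throws IOException {
      HTable hTable = new HTable("test");

      // Every row id, family, qualifier, and value must be converted to bytes by hand.
      byte[] rowId = Bytes.toBytes("abcd");
      byte[] famA = Bytes.toBytes("famA");
      byte[] col1 = Bytes.toBytes("col1");

      // Write one cell.
      Put put = new Put(rowId).
         add(famA, col1, Bytes.toBytes("hello world!"));
      hTable.put(put);

      // Read it back and convert the bytes to a String.
      Get get = new Get(rowId);
      Result result = hTable.get(get);
      byte[] value = result.getValue(famA, col1);
      System.out.println(Bytes.toString(value));
   }
}

HBase-dsl API: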

public class PutAndGetWithDsl {
   public static void main(String[] args) throws IOException {
      HBase<QueryOps<String>, String> hBase = new HBase<QueryOps<String>, String>(String.class);

      hBase.save("test").
         row("abcd").
            family("famA").
               col("col1", "hello world!");

      String value = hBase.fetch("test").
         row("abcd").
            family("famA").
               value("col1", String.class);
      System.out.println(value);
   }
}

Now this is where the DSL becomes more powerful!

Scanner Example

Direct HBase API:

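import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class Scanner {
   public static void main(String[] args) throws IOException {
      byte[] famA = Bytes.toBytes("famA");
      byte[] col1 = Bytes.toBytes("col1");

      HTable hTable = new HTable("test");

      // Scan rows from "a" (inclusive) to "z" (exclusive), returning only famA:col1.
      Scan scan = new Scan(Bytes.toBytes("a"), Bytes.toBytes("z"));
      scan.addColumn(famA, col1);

      // Keep rows whose famA:col1 equals "hello world!" ...
      SingleColumnValueFilter singleColumnValueFilterA = new SingleColumnValueFilter(
           famA, col1, CompareOp.EQUAL, Bytes.toBytes("hello world!"));
      singleColumnValueFilterA.setFilterIfMissing(true);

      // ... or "hello hbase!".
      SingleColumnValueFilter singleColumnValueFilterB = new SingleColumnValueFilter(
           famA, col1, CompareOp.EQUAL, Bytes.toBytes("hello hbase!"));
      singleColumnValueFilterB.setFilterIfMissing(true);

      // MUST_PASS_ONE combines the two filters with OR.
      FilterList filter = new FilterList(Operator.MUST_PASS_ONE, Arrays
           .asList((Filter) singleColumnValueFilterA,
                singleColumnValueFilterB));

      scan.setFilter(filter);

      ResultScanner scanner = hTable.getScanner(scan);
      try {
         for (Result result : scanner) {
            System.out.println(Bytes.toString(result.getValue(famA, col1)));
         }
      } finally {
         scanner.close(); // release the server-side scanner
      }
   }
}

HBase-dsl API: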

public class ScannerWithDsl {
   public static void main(String[] args) throws IOException {
      HBase<QueryOps<String>, String> hBase = new HBase<QueryOps<String>, String>(String.class);

      hBase.scan("test","a","z").
         select().
            family("famA").
               col("col1").
         where().
            family("famA").
               col("col1").eq("hello world!","hello hbase!").
         foreach(new ForEach() {
            @Override
            public void process(Row row) {
               System.out.println(row.value("famA", "col1", String.class));
            }
         });
   }
}

See the unit tests for more examples.

In the past few weeks I have been spending more and more time working with Hadoop and Hive.  For those of you that don't know what Hadoop is, check out what Wikipedia has to say.  Hive is built on top of Hadoop; simply stated, it is a SQL engine that submits map/reduce jobs to Hadoop for execution.

So next you ask yourself, "why do I care"?  Well with Hive using Hadoop for all the heavy lifting, the amount of data that you can process is only limited by the amount of hardware you have in your cluster.  Hive is used for data warehousing which means that it is designed to work on huge datasets, huge joins, huge data loads, huge query results, etc.  However before you start thinking about getting rid of that MySQL database, think again.  Hive is not and never will be low latency.  All queries submit map/reduce jobs to Hadoop which then operates on files stored in HDFS.
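To get a feel for why every query becomes a map/reduce job, here is a rough single-process sketch of the work behind a query like SELECT method, COUNT(*) FROM logs GROUP BY method. The table name, column name, and sample lines are made up for illustration, and this is not the code Hive generates; it only shows the shape of the map step (pull out the grouping key) and the reduce step (count per key).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: a GROUP BY count expressed as a map step and a reduce step.
public class GroupByAsMapReduceSketch {

    /** Map step: emit the grouping key for one input line (here, the first CSV field). */
    static String map(String csvLine) {
        return csvLine.split(",")[0];
    }

    /** Shuffle and reduce steps collapsed into one loop: count the lines seen per key. */
    static Map<String, Integer> run(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (String line : lines) {
            String key = map(line);
            Integer current = counts.get(key);
            counts.put(key, current == null ? 1 : current + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical raw log lines, as they might sit in a file on HDFS.
        List<String> lines = Arrays.asList(
            "GET,/index.html",
            "POST,/login",
            "GET,/about.html");
        System.out.println(run(lines)); // prints {GET=2, POST=1}
    }
}
```

On a real cluster the map step runs on many nodes at once, next to the file blocks, and the per-key counting happens in reducers after the shuffle. Starting and coordinating those jobs is where the latency comes from.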

Hive has a lot of nice features built in, like:
  • It can operate on raw files located in HDFS, like logs from your application or csv files from your database(s).  This can reduce your load time, because you don't have to actually load the data into a database before you can use it.
  • It can operate on compressed files.  I started using this feature last week because I am getting a 4 to 1 compression ratio with no difference in performance (I am using sequence files with block compression).
  • In your SQL statements you can actually use the Hadoop streaming API to build your own mappers and reducers, and they don't even have to be written in Java!
  • You can also create your own user defined functions, so when you have to do something crazy with the data, you can!

And there are lots more, so go check it out!

Hive, the real Netezza killer.