Recently by Aaron McCurry

Blur is a new Apache 2.0 licensed software project that provides a search capability built on top of Hadoop and Lucene. Elastic Search and Solr already exist so why build something new? While these projects work well, they didn't have a solid integration with the Hadoop ecosystem. Blur was built specifically for Big Data, taking scalability, redundancy, and performance into consideration from the very start, while leveraging all the goodness that already exists in the Hadoop stack.

A year and a half ago, my project began using Hadoop for data processing. Very early on, we were having networking issues that would make our HDFS cluster network connectivity spotty at best. Over one weekend in particular, we steadily lost network connection to 47 of the 90 data nodes in the cluster. When we came in on Monday morning, I noticed that the MapReduce system was a little sluggish but still working. When I checked HDFS I saw that our capacity had dropped by about 50%. After running an fsck on the cluster I was amazed to find that what seemed like a catastrophic failure over the weekend resulted in a still healthy file system. This experience left a lasting impression on me. It was then that I got the idea to somehow leverage the redundancy and fault tolerance of HDFS for the next version of a search system that I was just beginning to (re)write.

I had already written a custom sharded Lucene server that had been in a production system for a couple of years. Lucene worked really well and did everything that we needed for search. The issue that we faced was that it was running on big iron that was not redundant and could not be easily expanded. After seeing the resilient characteristics of Hadoop first hand, I decided to look into marrying the already mature and impressive feature set of Lucene with the built in redundancy and scalability of the Hadoop platform. From this experiment Blur was created.

The biggest technical issues/features that Blur solves:

  • Rapid mass indexing of entire datasets
  • Automatic Shard Server Failover
  • Near Real-time update compatibility via Lucene NRT
  • Compression of Lucene FDT files while maintaining random access performance
  • Lucene WAL (Write Ahead Log) to provide data reliability
  • Lucene R/W directly into HDFS (the seek on write problem)
  • Random access performance with block caching of the Lucene Directory

Data Model

Data in Blur is stored in Tables that contain Rows. Rows must have a unique row id and contain one or more Records. Records have a unique record id (unique within the Row) and a column family for grouping columns that logically make up a single record. Columns contain a name and a value, and a Record can contain multiple columns with the same name.
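
To make the nesting concrete, here is a minimal sketch of the Table / Row / Record / Column shape in plain Java. The class names (ModelRow, ModelRecord, ModelColumn, DataModelSketch) are hypothetical and are not Blur's actual Thrift-generated types; the sketch only illustrates the rules stated above: record ids are unique within a Row, and a Record may hold repeated column names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical classes mirroring the data model described above;
// these are not Blur's actual Thrift-generated types.
final class ModelColumn {
    final String name;
    final String value;

    ModelColumn(String name, String value) {
        this.name = name;
        this.value = value;
    }
}

final class ModelRecord {
    final String recordId; // unique within the enclosing row
    final String family;   // column family grouping this record's columns
    final List<ModelColumn> columns = new ArrayList<>();

    ModelRecord(String recordId, String family) {
        this.recordId = recordId;
        this.family = family;
    }

    // A record may hold several columns with the same name.
    ModelRecord add(String name, String value) {
        columns.add(new ModelColumn(name, value));
        return this;
    }
}

final class ModelRow {
    final String rowId; // unique within the table
    final Map<String, ModelRecord> records = new LinkedHashMap<>();

    ModelRow(String rowId) {
        this.rowId = rowId;
    }

    // Rejects a second record with the same id in this row.
    ModelRow add(ModelRecord record) {
        if (records.putIfAbsent(record.recordId, record) != null) {
            throw new IllegalArgumentException("duplicate record id: " + record.recordId);
        }
        return this;
    }
}

class DataModelSketch {
    static ModelRow sampleRow() {
        return new ModelRow("row-1")
            .add(new ModelRecord("rec-1", "famA").add("colA", "x").add("colA", "y"))
            .add(new ModelRecord("rec-2", "famB").add("colA", "value"));
    }

    public static void main(String[] args) {
        ModelRow row = sampleRow();
        for (ModelRecord r : row.records.values()) {
            for (ModelColumn c : r.columns) {
                System.out.println(row.rowId + " " + r.recordId + " "
                    + r.family + "." + c.name + "=" + c.value);
            }
        }
    }
}
```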

Architecture

Blur uses Hadoop's MapReduce framework for indexing data, and Hadoop's HDFS filesystem for storing indexes. Thrift is used for all inter-process communications and Zookeeper is used to know the state of the system and to store meta data. The Blur architecture is made up of two types of server processes:

  • Blur Controller Server
  • Blur Shard Server

Each shard server serves zero or more shards from all the currently online tables. Which shards are online in each shard server is calculated from the state information in Zookeeper. If a shard server goes down, the remaining shard servers detect the failure through Zookeeper and determine which, if any, of the missing shards they need to serve from HDFS.

The controller server provides a single point of entry (logically) to the cluster for spraying out queries, collecting the responses, and providing a single response. Both the controller and shard servers expose the same Thrift API which helps to ease debugging. It also allows developers to start a single shard server and interact with it the same way they would with a large cluster. Many controller servers can be (and should be) run for redundancy. The controllers act as gateways to all of the data that is being served by the shard servers.

Updating / Loading Data

Currently there are two ways to load and update data. The first is through a bulk load in MapReduce and the second is through mutation calls in Thrift.

Bulk Load MapReduce Example

Data Mutation Thrift Example

Searching Data

Any element in the Blur data model is searchable through the normal Lucene semantics: analyzers. Analyzers are defined per Blur table.

The standard Lucene query syntax is the default way to search Blur. If anything outside of the standard syntax is needed, you can create a Lucene query directly with Java objects, and submit them through the expert query API.

The column family grouping within Rows allows for results to be discovered across column families similar to what you would get with an inner join across two tables that share the same key (or in this case rowid). For complicated data models that have multiple column families, this makes for a very powerful search capability.

The following example searches for "value" as a full text search. If I had wanted to search for "value" in a single field like column "colA" in column family "famB" the query would look like "famB.colA:value".

Fetching Data

Fetches can be done by row or by record. This is done by creating a selector object in which you specify the rowid or recordid, and the specific column families or columns that you would like returned. When not specified, the entire Row or Record is returned.

Current State

Blur is nearing its first release, 0.1, and is relatively stable. The first release candidate should be available for download within the next few weeks. In the meantime you can check it out on GitHub:

https://github.com/nearinfinity/blur

http://blur.io

For a number of years I have used Lucene for both search and data storage, meaning that I store all the data necessary to display search results in the Lucene index.  This means that there is typically a lot of data in the FDT file, and compressing that data becomes a necessity as the indexes grow in size.

In version 3.0, Lucene dropped support for field compression.  I could compress each field into a byte array and store the data in the document, but if you have a lot of small fields in a document this doesn't work very well.  It typically won't save you any disk space, and it might actually cost you some, depending on the compression algorithm.

So I have built a block level compression for the FDT file in the form of a Lucene directory.  It allows you to choose whatever compression algorithm you want and whatever block size makes sense for your data.

The block level compression allows you to compress the entire document (possibly multiple documents) into a single block and achieve a higher compression ratio than if you had compressed each field separately.

Plus you don't have to modify any of your Lucene code, other than wrapping your real directory with the compressed implementation.
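
The idea can be sketched outside Lucene as follows. This is not the directory implementation itself: it is a minimal in-memory illustration, assuming fixed-size uncompressed blocks, java.util.zip's Deflater as the algorithm, and an int offset table for locating each compressed block. The names BlockCompressedBytes and BlockCompressionSketch are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Compresses data in independent fixed-size blocks so that a random read
// only has to inflate the one block that contains the requested position.
class BlockCompressedBytes {
    private final int blockSize;     // uncompressed bytes per block
    private final int length;        // total uncompressed length
    private final byte[] compressed; // all compressed blocks, back to back
    private final int[] blockStart;  // blockStart[i] = offset of block i in `compressed`

    BlockCompressedBytes(byte[] data, int blockSize) {
        this.blockSize = blockSize;
        this.length = data.length;
        int blocks = (data.length + blockSize - 1) / blockSize;
        blockStart = new int[blocks + 1];
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        for (int b = 0; b < blocks; b++) {
            blockStart[b] = out.size();
            int off = b * blockSize;
            Deflater deflater = new Deflater();
            deflater.setInput(data, off, Math.min(blockSize, data.length - off));
            deflater.finish();
            while (!deflater.finished()) {
                out.write(buf, 0, deflater.deflate(buf));
            }
            deflater.end();
        }
        blockStart[blocks] = out.size();
        compressed = out.toByteArray();
    }

    int compressedSize() {
        return compressed.length;
    }

    // Random access: inflate only the block that holds `position`.
    byte readByte(int position) {
        if (position < 0 || position >= length) {
            throw new IndexOutOfBoundsException("position " + position);
        }
        int block = position / blockSize;
        int plainLen = Math.min(blockSize, length - block * blockSize);
        byte[] plain = new byte[plainLen];
        Inflater inflater = new Inflater();
        inflater.setInput(compressed, blockStart[block],
            blockStart[block + 1] - blockStart[block]);
        try {
            int done = 0;
            while (done < plainLen) {
                int n = inflater.inflate(plain, done, plainLen - done);
                if (n == 0 && (inflater.finished() || inflater.needsInput())) {
                    break;
                }
                done += n;
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException(e);
        } finally {
            inflater.end();
        }
        return plain[position % blockSize];
    }
}

class BlockCompressionSketch {
    // Many small, similar "fields": the case where per-field compression does poorly.
    static byte[] sampleData() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 2000; i++) {
            sb.append("name=value").append(i % 10).append(';');
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] data = sampleData();
        BlockCompressedBytes store = new BlockCompressedBytes(data, 8192);
        System.out.println("raw " + data.length + " bytes, compressed "
            + store.compressedSize() + " bytes");
    }
}
```

The block size is the trade-off knob: larger blocks usually compress better, but every random read has to inflate a whole block.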

NOTE:  This only works with the compound files turned off.

While I'm working on getting my code into Lucene, I have patched 3.0.0 and 2.9.1 with my low memory patch.

By default the small memory footprint is enabled. To change back to the default implementation, set the following system property.

-Dorg.apache.lucene.index.TermInfosReader=default

Have fun!  If you have any problems or questions, please let me know or comment on LUCENE-2205.



Lucene gives users the ability to search massive amounts of data in a very short amount of time.  However, allowing users to page through the entire result set of their search can be difficult and risky, depending on how many users are performing searches and how many of those users are paging through hundreds if not thousands of hits per page.

Problem Scenario:

  • Each of your indexes contains hundreds of millions of documents.
  • You have 500 users on your system actively performing searches.
  • You have 100 search results per page.
  • And, your typical user pages through the first 10 pages of results.  (Normal occurrence on some systems)

So for the 10th page you will have to collect 1,000 hits, at a cost of a float plus an int plus some object overhead per hit.  So let's say 20 bytes per hit.  That gives you 500 users * 1,000 hits * 20 bytes = 10,000,000 bytes, or 10 MB.  Easy, no problem, right?

Well, what if you also give the users an easy way to move to the end of the result set?  Hmm...  For a result set size of 10,000 it's no big deal.  But what if you hand out result sets on the order of 1,000,000 or even 10,000,000 hits?

At this point you really just want to prevent the system from running out of memory, because if you have 25 users getting 10,000,000 results each and they all click last page at the same time, that's going to cost you 5 GB of heap (25 * 10,000,000 hits * 20 bytes), at least.  Some might say that it won't ever happen, but in my experience, if it can happen, it will.
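
The arithmetic for both scenarios can be checked with a few lines of Java. This is just a back-of-the-envelope sketch using the rough 20 bytes per hit assumed above; the class and method names are made up for illustration.

```java
// Back-of-the-envelope heap estimate for collecting every hit up to the
// requested page, using the rough 20 bytes per hit assumed in the text.
class PagingHeapEstimate {
    static final long BYTES_PER_HIT = 20;

    static long heapBytes(long users, long hitsPerUser) {
        return users * hitsPerUser * BYTES_PER_HIT;
    }

    public static void main(String[] args) {
        // 500 users each on page 10 with 100 hits per page
        System.out.println(heapBytes(500, 1_000) + " bytes");
        // 25 users each jumping to the end of a 10,000,000 hit result set
        System.out.println(heapBytes(25, 10_000_000) + " bytes");
    }
}
```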

So I created a Paging Hit Collector that windows the hits for the users.  It uses the last hit collected from the previous search pass to feed the next search pass.  So yes, if a user clicks the last page, it might perform multiple searches, but the system won't run out of memory.

The users will get their answer eventually, and if your system gives them some feedback as it searches and pages, they will probably sit and wait for it to come back, instead of giving up and hitting cancel and search and cancel and search, making the system worse and worse.
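
The windowing idea can be sketched without Lucene. The code below is not the collector from the patch: it is a self-contained illustration in which a float array of scores stands in for the index, and the names Hit, nextWindow, and hitsFrom are hypothetical. Each pass keeps at most one window of hits in a bounded heap and accepts only hits that rank after the last hit of the previous pass.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class Hit {
    final int doc;
    final float score;

    Hit(int doc, float score) {
        this.doc = doc;
        this.score = score;
    }
}

class WindowedPagingSketch {
    // Best-first order: higher score first, ties broken by lower doc id.
    static final Comparator<Hit> BEST_FIRST = (a, b) -> {
        int c = Float.compare(b.score, a.score);
        return c != 0 ? c : Integer.compare(a.doc, b.doc);
    };

    // One search pass: the best `window` hits ranked strictly after `last`
    // (or from the top when `last` is null). Retains at most window + 1 hits.
    static List<Hit> nextWindow(float[] scores, Hit last, int window) {
        // Reversed order puts the worst retained hit at the head for eviction.
        PriorityQueue<Hit> heap = new PriorityQueue<>(window + 1, BEST_FIRST.reversed());
        for (int doc = 0; doc < scores.length; doc++) {
            Hit hit = new Hit(doc, scores[doc]);
            if (last != null && BEST_FIRST.compare(hit, last) <= 0) {
                continue; // already handed out by an earlier pass
            }
            heap.add(hit);
            if (heap.size() > window) {
                heap.poll();
            }
        }
        List<Hit> page = new ArrayList<>(heap);
        page.sort(BEST_FIRST);
        return page;
    }

    // Skips `skip` hits, then gathers up to `count`, one window per pass.
    static List<Hit> hitsFrom(float[] scores, int skip, int count, int window) {
        List<Hit> result = new ArrayList<>();
        Hit last = null;
        int position = 0;
        while (result.size() < count) {
            List<Hit> pass = nextWindow(scores, last, window);
            if (pass.isEmpty()) {
                break; // result set exhausted
            }
            for (Hit hit : pass) {
                if (position >= skip && result.size() < count) {
                    result.add(hit);
                }
                position++;
            }
            last = pass.get(pass.size() - 1);
        }
        return result;
    }

    public static void main(String[] args) {
        float[] scores = new float[1000];
        for (int doc = 0; doc < scores.length; doc++) {
            scores[doc] = (doc * 37) % 101; // deterministic scores with ties
        }
        for (Hit hit : hitsFrom(scores, 90, 20, 25)) {
            System.out.println("doc id [" + hit.doc + "] score [" + hit.score + "]");
        }
    }
}
```

The cost is extra search passes for deep pages; the benefit is that memory per request is bounded by the window size rather than by how far into the result set the user jumps.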

The Simple Example:

IndexSearcher searcher = new IndexSearcher(reader);
TermQuery query = new TermQuery(new Term("f1", "value"));
IterablePaging paging = new IterablePaging(searcher, query, 100);
for (ScoreDoc sd : paging.skipTo(90)) {
  System.out.println("doc id [" + sd.doc + "] " +
    "score [" + sd.score + "]");
}

The More Advanced Example:

IndexSearcher searcher = new IndexSearcher(reader);
TotalHitsRef totalHitsRef = new TotalHitsRef();
ProgressRef progressRef = new ProgressRef();
TermQuery query = new TermQuery(new Term("f1", "value"));
IterablePaging paging = new IterablePaging(searcher, query, 100);
for (ScoreDoc sd : paging.skipTo(90).
                          gather(20).
                          totalHits(totalHitsRef).
                          progress(progressRef)) {
  System.out.println("time [" + progressRef.queryTime() + "] " +
    "total hits [" + totalHitsRef.totalHits() + "] " +
    "searches [" + progressRef.searchesPerformed() + "] " +
    "position [" + progressRef.currentHitPosition() + "] " +
    "doc id [" + sd.doc + "] " +
    "score [" + sd.score + "]");
}

Here's a link to the code LUCENE-2215.

I've been using Lucene for the better part of 2 years, from initial playing around, to prototyping, to production application.  It's an impressive library and it has come a long way in the past couple of years.

When I first started playing around with it the version was 2.1 and the search times were so much faster than what we were trying to use at the time (Oracle Text).  The first test was indexing a monster dataset and searching it quickly.  It passed with flying colors!

Next was to add in record level access control.  Easy and extremely fast.

Next was to add in all the other data needed for our application.  That was a little bit harder, considering that we have close to 150 fields in our index and well into the billion record range (growing every day).

The problem was that we needed more memory and there was no extra money for any more servers (or upgrades).  So there we were, stuck.  So I decided to start poking around using visualvm to see if there were any places in our application or in Lucene to save some memory.

We had already disabled norms on all our fields (we really didn't need norms for our data, nor did we have the resources).  I took a long look at all the fields we were indexing to see if there were any we didn't need, but we really did need them all.  Then I stumbled across the TermInfosReader class in Lucene.

This is where Lucene really gets its speed, but it also uses quite a bit of memory to do it.  And this is where I wrote my first Lucene patch.

In TermInfosReader there is a bunch of stuff, but the big memory hogs are three arrays.

  • Terms[]
  • TermInfos[]
  • long[]
Basically Lucene does a binary search across the Terms array (that by default contains every 128th Term in the index) with a given Term to find where on disk the exact Term needed lives.  There's a little bit more going on in the class than that, but that's basically what it's doing.
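
As a rough illustration of that lookup (not Lucene's actual TermInfosReader code), the sketch below samples every 128th term of a sorted array into an in-memory index, binary searches the index, and then scans forward at most one interval in the full list, which here stands in for the on-disk term dictionary. All names are hypothetical.

```java
import java.util.Arrays;
import java.util.Locale;

class SparseTermIndexSketch {
    static final int INTERVAL = 128; // every 128th term is kept in memory

    // Builds the in-memory index by sampling the sorted full term list.
    static String[] sample(String[] sortedTerms, int interval) {
        String[] index = new String[(sortedTerms.length + interval - 1) / interval];
        for (int i = 0; i < index.length; i++) {
            index[i] = sortedTerms[i * interval];
        }
        return index;
    }

    // Position in the full list where a sequential scan for `target` should begin.
    static int scanStart(String[] index, int interval, String target) {
        int i = Arrays.binarySearch(index, target);
        if (i < 0) {
            i = -i - 2; // the index entry just before the insertion point
        }
        return Math.max(i, 0) * interval;
    }

    // Full lookup: binary search in memory, then scan at most one interval.
    static int find(String[] sortedTerms, String[] index, int interval, String target) {
        int start = scanStart(index, interval, target);
        int end = Math.min(start + interval, sortedTerms.length);
        for (int p = start; p < end; p++) {
            int c = sortedTerms[p].compareTo(target);
            if (c == 0) {
                return p;
            }
            if (c > 0) {
                break; // passed the spot where the term would be
            }
        }
        return -1;
    }

    // Zero-padded terms so that string order matches numeric order.
    static String[] sampleTerms(int n) {
        String[] terms = new String[n];
        for (int i = 0; i < n; i++) {
            terms[i] = String.format(Locale.ROOT, "term%06d", i);
        }
        return terms;
    }

    public static void main(String[] args) {
        String[] terms = sampleTerms(1000);
        String[] index = sample(terms, INTERVAL);
        System.out.println("index entries: " + index.length);
        System.out.println("position of term000777: " + find(terms, index, INTERVAL, "term000777"));
    }
}
```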

So, I started this patch with the need to save memory.  But how in the world do you do that in Java when everything is already in basic arrays and everything is needed in memory?  Well, you have to save it another way: references.  References are a hidden cost in Java.  Every single reference costs you 4 bytes in a 32-bit JVM and 8 bytes in a 64-bit JVM (assuming that you don't have compressed references).

Let's count the references.

  • Terms[] length * 3, 1 reference for the Term and 2 references for the two Strings inside the Term
  • TermInfos[] length * 1
  • long[] = 1 reference total

So, let's talk numbers.  If you have a billion terms in your index, that's 125 MB of memory for the references (1,000,000,000 / 128 * (3 + 1 references) * 4 bytes per reference).  In a 64-bit JVM that doubles to 250 MB.  Not to mention the object overhead for every one of those Term and TermInfos objects.  Wow, that's a lot!

So I decided to remove nearly all of those references by using a byte array and an int array as an offset index.
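
A minimal sketch of that layout follows. It is not the patch itself, and the class name PackedTermsSketch is hypothetical: all term bytes are packed into a single byte array, an int array records where each term starts, and the binary search compares bytes in place. The structure holds two array references in total, however many terms it stores.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Stores sorted terms as UTF-8 bytes in one array, with an int offset per term.
// Assumes the input is sorted in UTF-8 byte order (same as String order for ASCII).
class PackedTermsSketch {
    private final byte[] bytes;  // all terms, back to back
    private final int[] offsets; // offsets[i] = start of term i; last entry = bytes.length

    PackedTermsSketch(String[] sortedTerms) {
        offsets = new int[sortedTerms.length + 1];
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < sortedTerms.length; i++) {
            offsets[i] = out.size();
            byte[] encoded = sortedTerms[i].getBytes(StandardCharsets.UTF_8);
            out.write(encoded, 0, encoded.length);
        }
        offsets[sortedTerms.length] = out.size();
        bytes = out.toByteArray();
    }

    int size() {
        return offsets.length - 1;
    }

    // Materializes a term only when the caller actually needs the String.
    String get(int i) {
        return new String(bytes, offsets[i], offsets[i + 1] - offsets[i], StandardCharsets.UTF_8);
    }

    // Compares term i with the key as unsigned bytes, without creating objects.
    private int compare(int i, byte[] key) {
        int start = offsets[i];
        int len = offsets[i + 1] - start;
        int n = Math.min(len, key.length);
        for (int k = 0; k < n; k++) {
            int a = bytes[start + k] & 0xff;
            int b = key[k] & 0xff;
            if (a != b) {
                return a - b;
            }
        }
        return len - key.length;
    }

    // Binary search over the packed terms; returns -1 when the term is absent.
    int indexOf(String term) {
        byte[] key = term.getBytes(StandardCharsets.UTF_8);
        int lo = 0;
        int hi = size() - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            int c = compare(mid, key);
            if (c == 0) {
                return mid;
            }
            if (c < 0) {
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        PackedTermsSketch terms = new PackedTermsSketch(
            new String[] {"apple", "banana", "cherry", "date", "fig"});
        System.out.println("cherry is at " + terms.indexOf("cherry"));
    }
}
```

The price of dropping the per-term objects is visible even in this small sketch: comparisons and lookups have to work on raw offsets and bytes, which is harder to read and easier to get wrong than code over Term objects.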

The results were impressive!

Given a 6.2 GB index with 1,010,000 documents and 179,822,683 terms, the default implementation uses 292,235,512 bytes just to get the index usable.

My no-ref implementation uses only 49,849,744 bytes to get the same index usable.  That's 17% of the original size, an 83% savings!

And the best part is that it loads the segments into memory faster, so those real-time updates will be online sooner.  The run-time performance is slightly faster as well.  But the huge performance saving is in garbage collection: full GCs were over 7 times faster on my MacBook Pro.  Wow!

I think that the results speak for themselves, and I hope that the Lucene folks will accept my patch.  That way I won't have to continue patching each version after the fact.  Also removing references can be great, but the code required to do it, and maintain the same level of performance, is ugly!  So don't try this at home!