Recently about Lucene

Blur is a new Apache 2.0 licensed software project that provides a search capability built on top of Hadoop and Lucene. Elastic Search and Solr already exist so why build something new? While these projects work well, they didn't have a solid integration with the Hadoop ecosystem. Blur was built specifically for Big Data, taking scalability, redundancy, and performance into consideration from the very start, while leveraging all the goodness that already exists in the Hadoop stack.

A year and a half ago, my project began using Hadoop for data processing. Very early on, we were having networking issues that would make our HDFS cluster network connectivity spotty at best. Over one weekend in particular, we steadily lost network connection to 47 of the 90 data nodes in the cluster. When we came in on Monday morning, I noticed that the MapReduce system was a little sluggish but still working. When I checked HDFS I saw that our capacity had dropped by about 50%. After running an fsck on the cluster I was amazed to find that what seemed like a catastrophic failure over the weekend resulted in a still healthy file system. This experience left a lasting impression on me. It was then that I got the idea to somehow leverage the redundancy and fault tolerance of HDFS for the next version of a search system that I was just beginning to (re)write.

I had already written a custom sharded Lucene server that had been in a production system for a couple of years. Lucene worked really well and did everything that we needed for search. The issue that we faced was that it was running on big iron that was not redundant and could not be easily expanded. After seeing the resilient characteristics of Hadoop first hand, I decided to look into marrying the already mature and impressive feature set of Lucene with the built in redundancy and scalability of the Hadoop platform. From this experiment Blur was created.

The biggest technical issues that Blur addresses, and the features it provides:

  • Rapid mass indexing of entire datasets
  • Automatic Shard Server Failover
  • Near Real-time update compatibility via Lucene NRT
  • Compression of Lucene FDT files while maintaining random access performance
  • Lucene WAL (Write Ahead Log) to provide data reliability
  • Lucene R/W directly into HDFS (the seek on write problem)
  • Random access performance with block caching of the Lucene Directory

Data Model

Data in Blur is stored in Tables that contain Rows. Rows must have a unique row id and contain one or more Records. Records have a unique record id (unique within the Row) and a column family for grouping columns that logically make up a single record. Columns contain a name and a value, and a Record can contain multiple columns with the same name.
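
To make the nesting concrete, here is a minimal sketch of the model in plain Java. The class names (DataModelSketch, Row, Record, Column) are hypothetical and chosen only for illustration; they are not Blur's actual types, which are defined by its Thrift API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical classes that mirror the data model described above.
// These are NOT Blur's real types; the names are made up for illustration.
final class DataModelSketch {

    // A column is a name/value pair.
    static final class Column {
        final String name;
        final String value;
        Column(String name, String value) { this.name = name; this.value = value; }
    }

    // A record has an id (unique within its row), a column family, and columns.
    // The same column name may appear more than once.
    static final class Record {
        final String recordId;
        final String family;
        final List<Column> columns = new ArrayList<>();
        Record(String recordId, String family) { this.recordId = recordId; this.family = family; }
        Record add(String name, String value) { columns.add(new Column(name, value)); return this; }
    }

    // A row has a unique row id and one or more records.
    static final class Row {
        final String rowId;
        final List<Record> records = new ArrayList<>();
        Row(String rowId) { this.rowId = rowId; }
        Row add(Record record) {
            for (Record existing : records) {
                if (existing.recordId.equals(record.recordId)) {
                    throw new IllegalArgumentException("duplicate record id " + record.recordId);
                }
            }
            records.add(record);
            return this;
        }
    }

    static Row exampleRow() {
        return new Row("row-1")
            .add(new Record("rec-1", "famA").add("colA", "value").add("colA", "another value"))
            .add(new Record("rec-2", "famB").add("colA", "value"));
    }

    public static void main(String[] args) {
        Row row = exampleRow();
        for (Record record : row.records) {
            for (Column column : record.columns) {
                System.out.println(row.rowId + "/" + record.recordId + " "
                    + record.family + "." + column.name + " = " + column.value);
            }
        }
    }
}
```

Note that rec-1 carries two columns named colA, which the model allows, while a second record with the id rec-1 in the same row is rejected.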

Architecture

Blur uses Hadoop's MapReduce framework for indexing data, and Hadoop's HDFS filesystem for storing indexes. Thrift is used for all inter-process communications and Zookeeper is used to know the state of the system and to store meta data. The Blur architecture is made up of two types of server processes:

  • Blur Controller Server
  • Blur Shard Server

The shard server serves zero or more shards from all the currently online tables. Which shards are online in each shard server is calculated from the state information in Zookeeper. If a shard server goes down, the remaining shard servers detect the failure through Zookeeper and determine which, if any, of the missing shards they need to serve from HDFS.
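
One way such a calculation could work is sketched below. This is an assumption for illustration, not Blur's actual layout algorithm: every server sorts the same list of online servers and the same list of shards, then takes the shards whose index maps to its own position. The class and method names are hypothetical, and the Zookeeper lookups are replaced by plain collections.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Hypothetical sketch of a deterministic shard layout; not Blur's real code.
final class ShardAssignmentSketch {

    // Returns the shards that `self` should serve, given the servers currently online.
    // Every server runs the same calculation over the same sorted inputs, so all of
    // them arrive at the same layout without talking to each other directly.
    static List<String> shardsFor(String self, Iterable<String> onlineServers, Iterable<String> allShards) {
        List<String> servers = new ArrayList<>(toSorted(onlineServers));
        List<String> shards = new ArrayList<>(toSorted(allShards));
        List<String> mine = new ArrayList<>();
        int position = servers.indexOf(self);
        if (position < 0) {
            return mine; // self is not registered as online
        }
        for (int i = 0; i < shards.size(); i++) {
            if (i % servers.size() == position) {
                mine.add(shards.get(i));
            }
        }
        return mine;
    }

    private static TreeSet<String> toSorted(Iterable<String> items) {
        TreeSet<String> sorted = new TreeSet<>();
        for (String item : items) {
            sorted.add(item);
        }
        return sorted;
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList("shard-0", "shard-1", "shard-2", "shard-3");
        System.out.println(shardsFor("server-a", Arrays.asList("server-a", "server-b"), shards));
        // server-b goes offline; server-a recomputes and picks up the missing shards.
        System.out.println(shardsFor("server-a", Arrays.asList("server-a"), shards));
    }
}
```

A simple modulo layout like this moves more shards than strictly necessary when a server disappears; it is used here only because it keeps the idea short.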

The controller server provides a single point of entry (logically) to the cluster for spraying out queries, collecting the responses, and providing a single response. Both the controller and shard servers expose the same Thrift API which helps to ease debugging. It also allows developers to start a single shard server and interact with it the same way they would with a large cluster. Many controller servers can be (and should be) run for redundancy. The controllers act as gateways to all of the data that is being served by the shard servers.
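
The "collect the responses and provide a single response" step can be pictured as a merge of per-shard hit lists. The sketch below is a simplified assumption, not the controller's real code: the Hit class and merge method are hypothetical, the shard responses are passed in as plain lists rather than fetched over Thrift, and ranking is by score only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the gather step of a scatter-gather query.
final class ScatterGatherSketch {

    static final class Hit {
        final String rowId;
        final double score;
        Hit(String rowId, double score) { this.rowId = rowId; this.score = score; }
    }

    // Merge the per-shard responses and keep the `limit` best hits overall.
    static List<Hit> merge(List<List<Hit>> shardResponses, int limit) {
        List<Hit> all = new ArrayList<>();
        for (List<Hit> response : shardResponses) {
            all.addAll(response);
        }
        // Highest score first.
        all.sort(Comparator.comparingDouble((Hit h) -> h.score).reversed());
        return new ArrayList<>(all.subList(0, Math.min(limit, all.size())));
    }

    public static void main(String[] args) {
        List<Hit> shardA = Arrays.asList(new Hit("r1", 0.9), new Hit("r2", 0.4));
        List<Hit> shardB = Arrays.asList(new Hit("r3", 0.7));
        for (Hit hit : merge(Arrays.asList(shardA, shardB), 2)) {
            System.out.println(hit.rowId + " " + hit.score);
        }
    }
}
```

In a real cluster the per-shard requests would be issued in parallel; the sequential merge here only shows how many responses collapse into one.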

Updating / Loading Data

Currently there are two ways to load and update data. The first is through a bulk load in MapReduce and the second is through mutation calls in Thrift.

Bulk Load MapReduce Example

Data Mutation Thrift Example

Searching Data

Any element in the Blur data model is searchable through the normal Lucene semantics: analyzers. Analyzers are defined per Blur table.

The standard Lucene query syntax is the default way to search Blur. If anything outside of the standard syntax is needed, you can create a Lucene query directly with Java objects, and submit them through the expert query API.

The column family grouping within Rows allows for results to be discovered across column families similar to what you would get with an inner join across two tables that share the same key (or in this case rowid). For complicated data models that have multiple column families, this makes for a very powerful search capability.

The following example searches for "value" as a full text search. If I had wanted to search for "value" in a single field like column "colA" in column family "famB" the query would look like "famB.colA:value".

Fetching Data

Fetches can be done by row or by record. This is done by creating a selector object in which you specify the rowid or recordid, and the specific column families or columns that you would like returned. When not specified, the entire Row or Record is returned.

Current State

Blur is nearing its first release, 0.1, and is relatively stable. The first release candidate should be available for download within the next few weeks. In the meantime you can check it out on github:

https://github.com/nearinfinity/blur

http://blur.io

This post is going to demonstrate thrift usage by searching a Lucene index from Ruby.

Thrift In a Nutshell

Essentially thrift is a serialization and RPC framework that allows you to communicate between programs that are not necessarily written in the same language. Thrift is used by defining data types and services in a .thrift file. You then run the thrift compiler against the .thrift file, which generates the stub code needed for clients and servers. Currently thrift will generate code for C++, C#, Erlang, Haskell, Java, Objective C/Cocoa, OCaml, Perl, PHP, Python, Ruby, and Squeak. For a more detailed description of thrift along with instructions on how to install thrift if needed, consult the thrift wiki.

Generating the Lucene Index

Our first step is to generate a small, simple lucene index. To build our index, 50,000 fake person records were downloaded from the Fake Name Generator in a comma delimited file. Each person record will contain a first name, last name, address and email address. Our indexing code will be very simple and will not be using any of lucene's advanced features.
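import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

public class IndexBuilder {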

    public static void main(String[] args) throws Exception {
        String namesFile = "names.csv";
        Document doc = new Document();
        Field[] fields = new Field[]{new Field("firstName", "", Field.Store.YES, Field.Index.ANALYZED_NO_NORMS),
                new Field("lastName", "", Field.Store.YES, Field.Index.ANALYZED_NO_NORMS),
                new Field("address", "", Field.Store.YES, Field.Index.ANALYZED_NO_NORMS),
                new Field("email", "", Field.Store.YES, Field.Index.ANALYZED_NO_NORMS)};
        addFieldsToDocument(doc, fields);

        BufferedReader reader = new BufferedReader(new FileReader(namesFile));

        IndexWriter indexWriter = new IndexWriter(FSDirectory.open(new File("blog-index")),new IndexWriterConfig(Version.LUCENE_31, new StandardAnalyzer(Version.LUCENE_31)));

        String line;
        while ((line = reader.readLine()) != null) {
            String[] personData = getPersonData(line);
            setFieldData(personData, fields);
            indexWriter.addDocument(doc);
        }
        indexWriter.optimize();
        indexWriter.close();
        reader.close();
    }

    private static String[] getPersonData(String line) {
        return line.split(",");
    }

    private static void setFieldData(String[] data, Field[] fields) {
        int index = 0;
        for (Field field : fields) {
            field.setValue(data[index++]);
        }
    }

    private static void addFieldsToDocument(Document doc, Field[] fields) {
        for (Field field : fields) {
            doc.add(field);
        }
    }
}

Creating the .thrift File

The next step will be to define what objects and services we want in our .thrift file, which will be called lucene_search.thrift. The lucene_search.thrift file is intentionally very basic. For more details on the structure of .thrift files, consult the thrift wiki tutorial.
//all generated java code will have the following for package name
namespace java bbejeck.thrift.gen

//this is the person object 
struct Person {
  1: string firstName,
  2: string lastName,
  3: string address,
  4: string email
}

//exception used to send meaningful error messages back to user
exception LuceneSearchException {
  1: string message
}

//service definition used by client and server
service LuceneSearch { 
    list<Person> search(1: string query) throws (1:LuceneSearchException error) 
}
As you can see from the example above, the .thrift file format is completely language agnostic. Next we need to generate our java and ruby code. The following were run from the command line:
  • $ thrift --gen java lucene_search.thrift
  • $ thrift --gen rb lucene_search.thrift
The generated code ends up in two directories named gen-java/ and gen-rb/ respectively. The files generated for java are LuceneSearch.java, LuceneSearchException.java and Person.java. The generated ruby files are lucene_search.rb, lucene_search_types.rb and lucene_search_constants.rb. In our next step, we are going to use generated java code to write our thrift server.

Thrift Server - Java

Thrift generates all the stub code you need for a server to expose your service or program. The only code we will need to write is a class that implements the generated Iface interface (defined in the LuceneSearch class), which contains the search method defined in our .thrift file.
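import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;

// generated by thrift from lucene_search.thrift
import bbejeck.thrift.gen.LuceneSearch;

public class LuceneThriftServer {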
    private static final int PORT = 9090;
    private static int numberThreads = 5;

    public static void main(String[] args) throws Exception {
        TServerSocket serverSocket = new TServerSocket(PORT, 100000);
        LuceneSearch.Processor searchProcessor = new LuceneSearch.Processor(new SearchHandler(args[0]));
        if (args.length > 1) {
            numberThreads = Integer.parseInt(args[1]);
        }
        TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverSocket);
        serverArgs.maxWorkerThreads(numberThreads);
        TServer thriftServer = new TThreadPoolServer(serverArgs.processor(searchProcessor).protocolFactory(new TBinaryProtocol.Factory()));
        thriftServer.serve();
    }
}

Iface Implementation

The SearchHandler class actually does the work of searching the lucene index. One tradeoff made here is that any exception while searching is caught and re-thrown as a LuceneSearchException. While it's usually not a great idea to just re-throw an exception, in this case it makes sense to do so. Since the LuceneSearchException is defined in the lucene_search.thrift file, the generated client code will handle that exception. So instead of receiving a generic thrift exception when an error occurs, the client should receive a more meaningful error message.
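import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.queryParser.QueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

// generated by thrift from lucene_search.thrift
import bbejeck.thrift.gen.LuceneSearch;
import bbejeck.thrift.gen.LuceneSearchException;
import bbejeck.thrift.gen.Person;

public class SearchHandler implements LuceneSearch.Iface {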
    private IndexSearcher searcher;
    private QueryParser queryParser;
    private static final int MAX_RESULTS = 1000;

    public SearchHandler(String indexPath) {
        try {
            searcher = new IndexSearcher(FSDirectory.open(new File(indexPath)), true);
            queryParser = new QueryParser(Version.LUCENE_31, null, new StandardAnalyzer(Version.LUCENE_31));
            queryParser.setAllowLeadingWildcard(true);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public List<Person> search(String query) throws LuceneSearchException {
        List<Person> results = new ArrayList<Person>();
        try {
            Query q = queryParser.parse(query);
            TopDocs topDocs = searcher.search(q, MAX_RESULTS);
            for (ScoreDoc sd : topDocs.scoreDocs) {
                Document document = searcher.doc(sd.doc);
                results.add(getPersonFromDocument(document));
            }
        } catch (Exception e) {
            throw new LuceneSearchException(e.getMessage());
        }
        return results;
    }

    private Person getPersonFromDocument(Document document) {
        Person p = new Person();
        p.firstName = document.get("firstName");
        p.lastName = document.get("lastName");
        p.address = document.get("address");
        p.email = document.get("email");

        return p;
    }
}
The next step in our process is to write the client.

Thrift Client - Ruby

Writing the thrift ruby client is even easier than writing the server code. If you have not already done so, install the thrift gem by running "gem install thrift" to get the required thrift library code. All the code you need for your client is already generated by thrift. At this point we are only doing what is needed to get the client to communicate with the server.
module ThriftConnection

  class LuceneClient

    def initialize(host='localhost', port=9090)
      socket = Thrift::Socket.new(host, port)
      @transport = Thrift::BufferedTransport.new(socket)
      protocol_factory = ::Thrift::BinaryProtocolFactory.new
      protocol = protocol_factory.get_protocol(@transport)
      @transport.open
      @client = LuceneSearch::Client.new(protocol)
    end

    def search(query)
      @client.search(query)
    end

    def close
      @transport.close
    end

  end
end

Running With Scissors

This section has the odd title "Running With Scissors" because, like actually running with scissors, what we are about to do may not be a great idea. All the thrift generated code carries a warning at the top: "DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING". Obviously I don't, but I'm not going to let that stop me at this point (sometimes you just have to see if you can get something to work!). What we've done is implement method_missing in the generated Person class (found in lucene_search_types.rb) so we can specify searches à la ActiveRecord. To accomplish this we use a regular expression to pull out which fields to search and use the arguments passed in as the search values. The regular expression here is fairly simple and only aims to handle simple searches.
  #added to translate from symbol to expected search format
  SEARCH_KEYS_MAPPING = {:first_name => 'firstName',
                         :last_name => 'lastName',
                         :email => 'email',
                         :address => 'address'}


  def self.method_missing(method_name, *args)
    lucene_client = ThriftConnection::LuceneClient.new
    query = ""
        #handles find_by_first_name etc
    if method_name.to_s =~ /^find_by_([a-z]+_?[a-z]*)$/
      query = "#{SEARCH_KEYS_MAPPING[$1.to_sym]}:#{args[0]}"
        #handles find_by_first_name_or_last_name, find_by_first_name_and_email 
    elsif method_name.to_s =~/^find_by_([a-z]+_[a-z]+)_([a-z]+)_([a-z]+_?[a-z]*)$/
      query ="#{SEARCH_KEYS_MAPPING[$1.to_sym]}:#{args[0]} #{$2.upcase} #{SEARCH_KEYS_MAPPING[$3.to_sym]}:#{args[1]}"
    else
       raise ArgumentError.new("search method pattern #{method_name} not recognized")
    end

    results = lucene_client.search(query)
    lucene_client.close
    results
  end
As we'll see in the next section, this actually worked, but I still view this more as a useful experiment. First of all, this was placed in generated code, so any time you regenerate the code you would have to manually put the method_missing definition back into the Person class. Secondly, Lucene search syntax is really not all that hard to learn.
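
To see what the two regular expressions accept without a running server, the translation step can be isolated. The sketch below ports only the name-to-query logic to Java; the class FinderQuerySketch and its toQuery method are hypothetical names, and unlike the Ruby version it rejects field names that are missing from the mapping instead of producing an empty field.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical Java port of the finder-name translation done in method_missing.
final class FinderQuerySketch {

    private static final Map<String, String> SEARCH_KEYS = new HashMap<>();
    static {
        SEARCH_KEYS.put("first_name", "firstName");
        SEARCH_KEYS.put("last_name", "lastName");
        SEARCH_KEYS.put("email", "email");
        SEARCH_KEYS.put("address", "address");
    }

    // Same patterns as the Ruby code above.
    private static final Pattern ONE_FIELD =
        Pattern.compile("^find_by_([a-z]+_?[a-z]*)$");
    private static final Pattern TWO_FIELDS =
        Pattern.compile("^find_by_([a-z]+_[a-z]+)_([a-z]+)_([a-z]+_?[a-z]*)$");

    static String toQuery(String methodName, String... args) {
        Matcher one = ONE_FIELD.matcher(methodName);
        if (one.matches()) {
            return field(one.group(1)) + ":" + args[0];
        }
        Matcher two = TWO_FIELDS.matcher(methodName);
        if (two.matches()) {
            return field(two.group(1)) + ":" + args[0]
                + " " + two.group(2).toUpperCase() + " "
                + field(two.group(3)) + ":" + args[1];
        }
        throw new IllegalArgumentException("search method pattern " + methodName + " not recognized");
    }

    // Deviation from the Ruby version: unknown field names are rejected.
    private static String field(String key) {
        String mapped = SEARCH_KEYS.get(key);
        if (mapped == null) {
            throw new IllegalArgumentException("unknown search field " + key);
        }
        return mapped;
    }

    public static void main(String[] args) {
        System.out.println(toQuery("find_by_first_name", "Tia"));
        System.out.println(toQuery("find_by_first_name_or_last_name", "elizabeth", "krause"));
        System.out.println(toQuery("find_by_first_name_and_email", "elizabeth", "*pookmail.com"));
    }
}
```

One limitation becomes visible here: the two-field pattern requires the first field to consist of two words, so a name such as find_by_email_and_address is not recognized.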

Testing

All of what we have done so far would not be worth much if we could not verify our work with some testing. Here is the unit test to verify that we are indeed able to search a Lucene index from Ruby. To get some names to search on I simply ran
head names.csv
and then used some of the information in various combinations to get counts of what searches should return. For example to get an idea of what searching for a first name of Elizabeth or last name of Krause would return I ran
cat names.csv | grep -iE 'elizabeth|krause' | wc -l 
which returned a count of 289. So, first making sure that our thrift server was running in the background, here is the unit test that was run to verify our Ruby client searching against a Lucene index.
class SearchTest < Test::Unit::TestCase

  def setup
    @lucene_client = ThriftConnection::LuceneClient.new
  end


  def teardown
    @lucene_client.close
  end

  def test_search_client_first_name
    persons = @lucene_client.search("firstName:Tia")
    assert_equal(5, persons.length)

    persons.each do |person|
      assert_equal("Tia", person.firstName)
    end
  end

  def test_search_person_class_first_name
    persons = Person.find_by_first_name("Tia")
    assert_equal(5, persons.length)

    persons.each do |person|
      assert_equal("Tia", person.firstName)
    end
  end

  def test_search_client_first_name_email_domain
    persons = @lucene_client.search("+firstName:Elizabeth +email:*pookmail.com")
    assert_equal(59, persons.length)
  end

  def test_search_person_class_first_name_email_domain
    persons = Person.find_by_first_name_and_email("elizabeth", "*pookmail.com")
    assert_equal(59, persons.length)
  end

  def test_search_client_first_name_and_last_name
    persons = @lucene_client.search("+firstName:Elizabeth +lastName:Krause")
    assert_equal(1, persons.length)
    person = persons[0]

    assert_equal("Elizabeth", person.firstName)
    assert_equal("Krause", person.lastName)
  end

  def test_search_person_class_first_name_and_last_name
    persons = Person.find_by_first_name_and_last_name("elizabeth", "krause")
    assert_equal(1, persons.length)
    person = persons[0]

    assert_equal("Elizabeth", person.firstName)
    assert_equal("Krause", person.lastName)
  end

  def test_search_person_class_first_name_or_last_name
    persons = Person.find_by_first_name_or_last_name("elizabeth", "krause")
    assert_equal(289, persons.length)
  end

  def test_invalid_search
    assert_raises ArgumentError do
      Person.find_person_by_name("tia")
    end
  end

end

Conclusion

Thrift is a compelling alternative for RPC or message passing where one might otherwise be using REST, Java RMI or middleware (JMS, AMQP). There is a great comparison of how thrift performs against other forms of RPC near the end of this thrift tutorial from OCI. I hope you were able to learn something useful. Thanks for your time.

Resources

Full source for the blog, including the generated code, can be found on github. If you are interested in running the test you can download lucene-thrift-example.tar.gz, extract the tar file and execute the runSearchTest.sh script. You do not need to have thrift installed to run the test.
  • For more information on thrift the thrift wiki is a great start
  • More information on Lucene can be found here

For a number of years I have used Lucene for both search and data storage, meaning that I store all the data necessary to display search results in the Lucene index.  As a result there is typically a lot of data in the FDT file, and compressing that data becomes a necessity as the indexes grow in size.

In version 3.0 support for field compression was dropped by Lucene.  I could compress each field into a byte array and store the data in the document, but if you have a lot of small fields in a document this doesn't work very well.  It typically won't save you any disk space, and it might actually cost you some depending on the compression algorithm.

So I have built a block level compression for the FDT file in the form of a Lucene directory.  It allows you to choose whatever compression algorithm you want and whatever block size makes sense for your data.

The block level compression allows you to compress the entire document (possibly multiple documents) into a single block and achieve a higher compression ratio than if you had compressed each field separately.
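
The size argument is easy to check outside of Lucene. The sketch below is only an illustration of the per-field versus per-block trade-off, not the directory implementation: it uses java.util.zip.Deflater as the compression algorithm, and the field values are synthetic data invented for the example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

// Hypothetical comparison of per-field and per-block compression sizes.
final class BlockCompressionSketch {

    // Size in bytes of `input` after DEFLATE compression (zlib wrapper included).
    static int compressedSize(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        byte[] buffer = new byte[4096];
        int total = 0;
        while (!deflater.finished()) {
            total += deflater.deflate(buffer);
        }
        deflater.end();
        return total;
    }

    // Synthetic stand-in for many small stored fields.
    static String[] sampleFields() {
        String[] fields = new String[200];
        for (int i = 0; i < fields.length; i++) {
            fields[i] = "person" + i + "@example.com";
        }
        return fields;
    }

    // Total size of the fields without any compression.
    static int rawSize(String[] fields) {
        int total = 0;
        for (String field : fields) {
            total += field.getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
    }

    // Each field is compressed on its own, as with per-field compression.
    static int perFieldSize(String[] fields) {
        int total = 0;
        for (String field : fields) {
            total += compressedSize(field.getBytes(StandardCharsets.UTF_8));
        }
        return total;
    }

    // All fields are packed into one block and compressed together.
    static int blockSize(String[] fields) {
        ByteArrayOutputStream block = new ByteArrayOutputStream();
        for (String field : fields) {
            byte[] bytes = field.getBytes(StandardCharsets.UTF_8);
            block.write(bytes, 0, bytes.length);
        }
        return compressedSize(block.toByteArray());
    }

    public static void main(String[] args) {
        String[] fields = sampleFields();
        System.out.println("raw bytes:       " + rawSize(fields));
        System.out.println("per-field bytes: " + perFieldSize(fields));
        System.out.println("block bytes:     " + blockSize(fields));
    }
}
```

On data like this the single block should come out far smaller than the sum of the individually compressed fields, because each tiny field pays the compressor's fixed overhead and offers almost no repetition to exploit on its own.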

Plus you don't have to modify any of your Lucene code, other than wrapping your real directory with the compressed implementation.

NOTE:  This only works with the compound files turned off.

While I work on getting my code into Lucene, I have patched 3.0.0 and 2.9.1 with my low memory patch.

The small memory footprint is enabled by default. To change back to the default implementation, set the following system property:

-Dorg.apache.lucene.index.TermInfosReader=default

Have fun!  If you have any problems or questions, please let me know or comment on LUCENE-2205.



Lucene gives users the ability to search massive amounts of data in a very short amount of time.  However, allowing users to page through the entire result set of their search can be difficult and risky, depending on how many users are performing searches and how many of those users are paging through hundreds if not thousands of hits per page.

Problem Scenario:

  • Each of your indexes contains hundreds of millions of documents.
  • You have 500 users on your system actively performing searches.
  • You have 100 search results per page.
  • And, your typical user pages through the first 10 pages of results.  (Normal occurrence on some systems)

So for the 10th page you will have to collect 1,000 hits, at a cost of a float plus an int plus some object overhead per hit.  Let's say 20 bytes per hit.  That gives 500 users * 1,000 hits * 20 bytes = 10,000,000 bytes, or 10 MB.  Easy, no problem, right?

Well, what if you also give the users an easy way to move to the end of the result set?  Hmm...  For a result set size of 10,000 it's no big deal.  But what if you hand out result sets on the order of 1,000,000 or even 10,000,000?

At this point you really just want to prevent the system from running out of memory.  If you have 25 users getting 10,000,000 results each and they all click last page at the same time, that's going to cost you 25 * 10,000,000 hits * 20 bytes = 5 GB of heap!  At least.  Some might say that it won't ever happen, but in my experience, if it can happen, it will.
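
The two estimates can be reproduced with a few lines of arithmetic. The 20 bytes per hit is the rough guess from above, not a measured value, and the class and method names are made up for the example.

```java
// Back-of-the-envelope heap estimate for collecting hits for many users at once.
final class PagingMemorySketch {

    // Rough per-hit cost assumed in the text: a float, an int and some object overhead.
    static final long BYTES_PER_HIT = 20;

    static long heapBytes(long users, long hitsPerUser) {
        return users * hitsPerUser * BYTES_PER_HIT;
    }

    public static void main(String[] args) {
        // 500 users each holding the first 10 pages of 100 hits.
        System.out.println(heapBytes(500, 1_000) + " bytes");
        // 25 users each jumping to the end of a 10,000,000-hit result set.
        System.out.println(heapBytes(25, 10_000_000) + " bytes");
    }
}
```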

So I created a Paging Hit Collector that windows the hits to the users.  It uses the last hit collected from the previous search pass to feed the next search pass.  So yes, if a user clicks the last page, it might perform multiple searches, but the system won't run out of memory.

The users will get their answer eventually, and if your system gives them some feedback as it searches and pages, they will probably sit and wait for it to come back instead of giving up and hitting cancel and search and cancel and search, making the system worse and worse.
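
The windowing idea can be sketched without Lucene. The code below is not the collector attached to LUCENE-2215; it is a simplified model in which an in-memory list stands in for the index, the class and method names are hypothetical, and the requested offset is assumed to be a multiple of the window size. Each pass keeps at most one window of hits in memory and only accepts hits that rank after the last hit of the previous pass.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of paging by repeated, bounded search passes.
final class PagingWindowSketch {

    static final class Hit {
        final int doc;
        final float score;
        Hit(int doc, float score) { this.doc = doc; this.score = score; }
    }

    // Best hit first: higher score wins, ties are broken by lower doc id.
    static final Comparator<Hit> BEST_FIRST = (a, b) -> {
        int byScore = Float.compare(b.score, a.score);
        return byScore != 0 ? byScore : Integer.compare(a.doc, b.doc);
    };

    // One search pass: keep only the `window` best hits that rank after `after`
    // (or the overall best hits when `after` is null).
    static List<Hit> searchPass(List<Hit> index, Hit after, int window) {
        // The worst-ranked hit sits at the head, so it is the one evicted on overflow.
        PriorityQueue<Hit> queue = new PriorityQueue<>(BEST_FIRST.reversed());
        for (Hit hit : index) {
            if (after != null && BEST_FIRST.compare(hit, after) <= 0) {
                continue; // already handed out in an earlier pass
            }
            queue.add(hit);
            if (queue.size() > window) {
                queue.poll();
            }
        }
        List<Hit> page = new ArrayList<>(queue);
        page.sort(BEST_FIRST);
        return page;
    }

    // Walks forward one window at a time until the page starting at `offset` is reached.
    static List<Hit> pageAt(List<Hit> index, int offset, int window) {
        List<Hit> page = searchPass(index, null, window);
        for (int skipped = 0; skipped + window <= offset && !page.isEmpty(); skipped += window) {
            Hit last = page.get(page.size() - 1);
            page = searchPass(index, last, window);
        }
        return page;
    }

    public static void main(String[] args) {
        List<Hit> index = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            index.add(new Hit(i, (i * 7) % 10));
        }
        for (Hit hit : pageAt(index, 6, 3)) {
            System.out.println("doc id [" + hit.doc + "] score [" + hit.score + "]");
        }
    }
}
```

Reaching a deep page costs one pass per window, which is the trade described above: more searches in exchange for a heap cost that never exceeds one window per user.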

The Simple Example:

IndexSearcher searcher = new IndexSearcher(reader);
TermQuery query = new TermQuery(new Term("f1", "value"));
IterablePaging paging = new IterablePaging(searcher, query, 100);
for (ScoreDoc sd : paging.skipTo(90)) {
  System.out.println("doc id [" + sd.doc + "] " +
    "score [" + sd.score + "]");
}

The More Advanced Example:

IndexSearcher searcher = new IndexSearcher(reader);
TotalHitsRef totalHitsRef = new TotalHitsRef();
ProgressRef progressRef = new ProgressRef();
TermQuery query = new TermQuery(new Term("f1", "value"));
IterablePaging paging = new IterablePaging(searcher, query, 100);
for (ScoreDoc sd : paging.skipTo(90).
                          gather(20).
                          totalHits(totalHitsRef).
                          progress(progressRef)) {
  System.out.println("time [" + progressRef.queryTime() + "] " +
    "total hits [" + totalHitsRef.totalHits() + "] " +
    "searches [" + progressRef.searchesPerformed() + "] " +
    "position [" + progressRef.currentHitPosition() + "] " +
    "doc id [" + sd.doc + "] " +
    "score [" + sd.score + "]");
}

Here's a link to the code: LUCENE-2215.