Recently about Lucene
Blur is a new Apache 2.0 licensed software project that provides a search capability built on top of Hadoop and Lucene. Elastic Search and Solr already exist so why build something new? While these projects work well, they didn't have a solid integration with the Hadoop ecosystem. Blur was built specifically for Big Data, taking scalability, redundancy, and performance into consideration from the very start, while leveraging all the goodness that already exists in the Hadoop stack.
A year and a half ago, my project began using Hadoop for data processing. Very early on, we were having networking issues that would make our HDFS cluster network connectivity spotty at best. Over one weekend in particular, we steadily lost network connection to 47 of the 90 data nodes in the cluster. When we came in on Monday morning, I noticed that the MapReduce system was a little sluggish but still working. When I checked HDFS I saw that our capacity had dropped by about 50%. After running an fsck on the cluster I was amazed to find that what seemed like a catastrophic failure over the weekend resulted in a still healthy file system. This experience left a lasting impression on me. It was then that I got the idea to somehow leverage the redundancy and fault tolerance of HDFS for the next version of a search system that I was just beginning to (re)write.
I had already written a custom sharded Lucene server that had been in a production system for a couple of years. Lucene worked really well and did everything that we needed for search. The issue that we faced was that it was running on big iron that was not redundant and could not be easily expanded. After seeing the resilient characteristics of Hadoop first hand, I decided to look into marrying the already mature and impressive feature set of Lucene with the built in redundancy and scalability of the Hadoop platform. From this experiment Blur was created.
The biggest technical issues that Blur addresses, and the features it provides to do so:
- Rapid mass indexing of entire datasets
- Automatic Shard Server Failover
- Near Real-time update compatibility via Lucene NRT
- Compression of Lucene FDT files while maintaining random access performance
- Lucene WAL (Write Ahead Log) to provide data reliability
- Lucene R/W directly into HDFS (the seek on write problem)
- Random access performance with block caching of the Lucene Directory
Data Model
Data in Blur is stored in Tables that contain Rows. Rows must have a unique row id and contain one or more Records. Records have a unique record id (unique within the Row) and a column family for grouping columns that logically make up a single record. Columns contain a name and a value, and a Record can contain multiple columns with the same name.
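To make the nesting concrete, here is a minimal sketch of the Row / Record / Column relationships in plain Java. The class names (RowSketch, RecordSketch, ColumnSketch) are hypothetical and are not Blur's own types; they only mirror the rules stated above: record ids are unique within a Row, each Record belongs to one column family, and a Record may hold several columns with the same name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; not Blur's API.
final class ColumnSketch {
    final String name;
    final String value;
    ColumnSketch(String name, String value) { this.name = name; this.value = value; }
}

final class RecordSketch {
    final String recordId;  // unique within the enclosing row
    final String family;    // column family that groups these columns
    final List<ColumnSketch> columns = new ArrayList<>();
    RecordSketch(String recordId, String family) { this.recordId = recordId; this.family = family; }

    // Duplicate column names are allowed, so no uniqueness check here.
    RecordSketch add(String name, String value) {
        columns.add(new ColumnSketch(name, value));
        return this;
    }
}

final class RowSketch {
    final String rowId;
    final List<RecordSketch> records = new ArrayList<>();
    RowSketch(String rowId) { this.rowId = rowId; }

    // Reject a record whose id already exists in this row.
    RowSketch add(RecordSketch record) {
        for (RecordSketch existing : records) {
            if (existing.recordId.equals(record.recordId)) {
                throw new IllegalArgumentException("duplicate record id " + record.recordId);
            }
        }
        records.add(record);
        return this;
    }
}

class DataModelSketch {
    public static void main(String[] args) {
        RowSketch row = new RowSketch("row-1")
            .add(new RecordSketch("rec-1", "famA").add("colA", "value").add("colA", "other"))
            .add(new RecordSketch("rec-2", "famB").add("colA", "value"));
        System.out.println(row.rowId + " holds " + row.records.size() + " records");
    }
}
```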
Architecture
Blur uses Hadoop's MapReduce framework for indexing data, and Hadoop's HDFS filesystem for storing indexes. Thrift is used for all inter-process communication, and Zookeeper is used to track the state of the system and to store metadata. The Blur architecture is made up of two types of server processes:
- Blur Controller Server
- Blur Shard Server
The shard server serves 0 or more shards from all the currently online tables. Which shards are online in each shard server is calculated from the state information in Zookeeper. If a shard server goes down, the remaining shard servers detect the failure through their interaction with Zookeeper and determine which, if any, of the missing shards they need to serve from HDFS.
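One way to picture failover is as a pure function from "the set of online servers" to "which shards each server owns". The sketch below is an assumption for illustration, not Blur's actual layout logic: it uses a simple round-robin over sorted server names, and the names ShardLayoutSketch and layout are hypothetical. Because every server can compute the same answer from the same Zookeeper state, each one can work out independently which shards it has to open from HDFS when a peer disappears.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical round-robin layout; a stand-in, not Blur's real algorithm.
class ShardLayoutSketch {

    // Deterministically map every shard onto one of the online servers.
    static Map<String, List<String>> layout(Collection<String> onlineServers, List<String> shards) {
        // Sort so every server derives the same ordering from the same state.
        List<String> servers = new ArrayList<>(new TreeSet<>(onlineServers));
        Map<String, List<String>> result = new TreeMap<>();
        for (String server : servers) {
            result.put(server, new ArrayList<>());
        }
        if (servers.isEmpty()) {
            return result;  // nothing online, nothing served
        }
        for (int i = 0; i < shards.size(); i++) {
            result.get(servers.get(i % servers.size())).add(shards.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> shards = List.of("shard-0", "shard-1", "shard-2");
        System.out.println("both up:   " + layout(List.of("s1", "s2"), shards));
        // s2 has gone away: s1 recomputes and picks up the orphaned shard.
        System.out.println("s2 failed: " + layout(List.of("s1"), shards));
    }
}
```

A real layout would also try to minimize how many shards move when the server set changes; round-robin is used here only because it is easy to follow.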
The controller server provides a single point of entry (logically) to the cluster for spraying out queries, collecting the responses, and providing a single response. Both the controller and shard servers expose the same Thrift API which helps to ease debugging. It also allows developers to start a single shard server and interact with it the same way they would with a large cluster. Many controller servers can be (and should be) run for redundancy. The controllers act as gateways to all of the data that is being served by the shard servers.
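The controller's "spray out, collect, respond" role is a scatter-gather. The following is a rough sketch of that pattern using only the JDK; it is not Blur's controller code. Each shard server is modeled as a plain Function from a query string to a list of hits, and the Hit class, the search method, and the merge-by-score rule are all assumptions made for the example. In Blur the calls would go over Thrift instead.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical scatter-gather sketch; shard servers are modeled as functions.
class ScatterGatherSketch {

    static final class Hit {
        final String id;
        final double score;
        Hit(String id, double score) { this.id = id; this.score = score; }
    }

    // Send the query to every shard in parallel, then merge into one top-N list.
    static List<Hit> search(List<Function<String, List<Hit>>> shards, String query, int topN) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, shards.size()));
        try {
            List<Future<List<Hit>>> pending = new ArrayList<>();
            for (Function<String, List<Hit>> shard : shards) {
                pending.add(pool.submit(() -> shard.apply(query)));  // scatter
            }
            List<Hit> merged = new ArrayList<>();
            for (Future<List<Hit>> future : pending) {
                merged.addAll(future.get());                         // gather
            }
            // Highest score first, then cut down to a single response.
            merged.sort(Comparator.comparingDouble((Hit h) -> h.score).reversed());
            return new ArrayList<>(merged.subList(0, Math.min(topN, merged.size())));
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("shard query failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Function<String, List<Hit>> shardA = q -> List.of(new Hit("a1", 0.9), new Hit("a2", 0.1));
        Function<String, List<Hit>> shardB = q -> List.of(new Hit("b1", 0.5));
        for (Hit hit : search(List.of(shardA, shardB), "value", 2)) {
            System.out.println(hit.id + " " + hit.score);
        }
    }
}
```

Because the controller only merges, it holds no index data of its own, which is why running several of them side by side is cheap.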
Updating / Loading Data
Currently there are two ways to load and update data. The first is through a bulk load in MapReduce and the second is through mutation calls in Thrift.
Bulk Load MapReduce Example
Data Mutation Thrift Example
Searching Data
Any element in the Blur data model is searchable through the normal Lucene semantics: analyzers. Analyzers are defined per Blur table.
The standard Lucene query syntax is the default way to search Blur. If anything outside of the standard syntax is needed, you can create a Lucene query directly with Java objects, and submit them through the expert query API.
The column family grouping within Rows allows for results to be discovered across column families similar to what you would get with an inner join across two tables that share the same key (or in this case rowid). For complicated data models that have multiple column families, this makes for a very powerful search capability.
The following example searches for "value" as a full text search. If I had wanted to search for "value" in a single field like column "colA" in column family "famB" the query would look like "famB.colA:value".
Fetching Data
Fetches can be done by row or by record. This is done by creating a selector object in which you specify the rowid or recordid, and the specific column families or columns that you would like returned. When not specified, the entire Row or Record is returned.
Current State
Blur is nearing its first release, 0.1, and is relatively stable. The first release candidate should be available for download within the next few weeks. In the meantime you can check it out on github:
Thrift In a Nutshell
Essentially thrift is a serialization and RPC framework that allows you to communicate between programs that are not necessarily written in the same language. Thrift is used by defining data types and services in a .thrift file. You then run the .thrift file against the thrift compiler which generates the stub code needed for clients and servers. Currently thrift will generate code for C++, C#, Erlang, Haskell, Java, Objective C/Cocoa, OCaml, Perl, PHP, Python, Ruby, and Squeak. For a more detailed description of thrift along with instructions on how to install thrift if needed, consult the thrift wiki.
Generating the Lucene Index
Our first step is to generate a small, simple lucene index. To build our index, 50,000 fake person records were downloaded from the Fake Name Generator in a comma delimited file. Each person record will contain a first name, last name, address and email address. Our indexing code will be very simple and will not be using any of lucene's advanced features.
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

public class IndexBuilder {

    public static void main(String[] args) throws Exception {
        String namesFile = "names.csv";
        // One Document and one set of Fields are reused for every record
        Document doc = new Document();
        Field[] fields = new Field[]{new Field("firstName", "", Field.Store.YES, Field.Index.ANALYZED_NO_NORMS),
                                     new Field("lastName", "", Field.Store.YES, Field.Index.ANALYZED_NO_NORMS),
                                     new Field("address", "", Field.Store.YES, Field.Index.ANALYZED_NO_NORMS),
                                     new Field("email", "", Field.Store.YES, Field.Index.ANALYZED_NO_NORMS)};
        addFieldsToDocument(doc, fields);
        BufferedReader reader = new BufferedReader(new FileReader(namesFile));
        IndexWriter indexWriter = new IndexWriter(FSDirectory.open(new File("blog-index")),
                new IndexWriterConfig(Version.LUCENE_31, new StandardAnalyzer(Version.LUCENE_31)));
        String line;
        while ((line = reader.readLine()) != null) {
            String[] personData = getPersonData(line);
            setFieldData(personData, fields);
            indexWriter.addDocument(doc);
        }
        reader.close();
        indexWriter.optimize();
        indexWriter.close();
    }

    // Columns are expected in field order: first name, last name, address, email
    private static String[] getPersonData(String line) {
        return line.split(",");
    }

    private static void setFieldData(String[] data, Field[] fields) {
        int index = 0;
        for (Field field : fields) {
            field.setValue(data[index++]);
        }
    }

    private static void addFieldsToDocument(Document doc, Field[] fields) {
        for (Field field : fields) {
            doc.add(field);
        }
    }
}
Creating the .thrift File
The next step will be to define what objects and services we want in our .thrift file, which will be called lucene_search.thrift. The lucene_search.thrift file is intentionally very basic. For more details on the structure of .thrift files consult the thrift wiki tutorial.
//all generated java code will have the following for package name
namespace java bbejeck.thrift.gen

//this is the person object
struct Person {
    1: string firstName,
    2: string lastName,
    3: string address,
    4: string email
}

//exception used to send meaningful error messages back to user
exception LuceneSearchException {
    1: string message
}

//service definition used by client and server
service LuceneSearch {
    list<Person> search(1: string query) throws (1:LuceneSearchException error)
}
As you can see from the example above, the .thrift file format is completely language agnostic. Next we need to generate our java and ruby code. The following were run from the command line:
- $ thrift --gen java lucene_search.thrift
- $ thrift --gen rb lucene_search.thrift
Thrift Server - Java
Thrift generates all the stub code you need for a server to expose your service or program. The only code we will need to write is a class that implements the generated Iface interface (defined in the LuceneSearch class), which contains the search method defined in our .thrift file.
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;

import bbejeck.thrift.gen.LuceneSearch;

public class LuceneThriftServer {

    private static final int PORT = 9090;
    private static int numberThreads = 5;

    // args[0] is the path to the Lucene index, args[1] (optional) the worker thread count
    public static void main(String[] args) throws Exception {
        TServerSocket serverSocket = new TServerSocket(PORT, 100000);
        LuceneSearch.Processor searchProcessor = new LuceneSearch.Processor(new SearchHandler(args[0]));
        if (args.length > 1) {
            numberThreads = Integer.parseInt(args[1]);
        }
        TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverSocket);
        serverArgs.maxWorkerThreads(numberThreads);
        TServer thriftServer = new TThreadPoolServer(serverArgs.processor(searchProcessor).protocolFactory(new TBinaryProtocol.Factory()));
        thriftServer.serve();
    }
}
Iface Implementation
The SearchHandler class actually does the work of searching the lucene index. One tradeoff made here is that any exception while searching is caught and re-thrown as a LuceneSearchException. While it's usually not a great idea to just re-throw an exception, in this case it makes sense to do so. Since the LuceneSearchException is defined in the lucene_search.thrift file, the generated client code will handle that exception. So instead of receiving a generic thrift exception when an error occurs, the client should receive a more meaningful error message.
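import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.queryParser.QueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

import bbejeck.thrift.gen.LuceneSearch;
import bbejeck.thrift.gen.LuceneSearchException;
import bbejeck.thrift.gen.Person;

public class SearchHandler implements LuceneSearch.Iface {

    private IndexSearcher searcher;
    private QueryParser queryParser;
    private static final int MAX_RESULTS = 1000;

    public SearchHandler(String indexPath) {
        try {
            searcher = new IndexSearcher(FSDirectory.open(new File(indexPath)), true);
            // No default field: every query term must name its field, e.g. firstName:Tia
            queryParser = new QueryParser(Version.LUCENE_31, null, new StandardAnalyzer(Version.LUCENE_31));
            queryParser.setAllowLeadingWildcard(true);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public List<Person> search(String query) throws LuceneSearchException {
        List<Person> results = new ArrayList<Person>();
        try {
            Query q = queryParser.parse(query);
            TopDocs topDocs = searcher.search(q, MAX_RESULTS);
            for (ScoreDoc sd : topDocs.scoreDocs) {
                Document document = searcher.doc(sd.doc);
                results.add(getPersonFromDocument(document));
            }
        } catch (Exception e) {
            // Re-throw as the exception declared in lucene_search.thrift so the client sees the message
            throw new LuceneSearchException(e.getMessage());
        }
        return results;
    }

    private Person getPersonFromDocument(Document document) {
        Person p = new Person();
        p.firstName = document.get("firstName");
        p.lastName = document.get("lastName");
        p.address = document.get("address");
        p.email = document.get("email");
        return p;
    }
}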
The next step in our process is to write the client.
Thrift Client - Ruby
Writing the thrift ruby client is even easier than writing the server code. If you have not already done so, install the thrift gem by running "gem install thrift" to get the required thrift library code. All the code you need for your client is already generated by thrift. At this point we are only doing what is needed to get the client to communicate with the server.
module ThriftConnection
  class LuceneClient
    def initialize(host='localhost', port=9090)
      socket = Thrift::Socket.new(host, port)
      @transport = Thrift::BufferedTransport.new(socket)
      protocol_factory = ::Thrift::BinaryProtocolFactory.new
      protocol = protocol_factory.get_protocol(@transport)
      @transport.open
      @client = LuceneSearch::Client.new(protocol)
    end

    def search(query)
      @client.search(query)
    end

    def close
      @transport.close
    end
  end
end
Running With Scissors
This section has the odd title "Running With Scissors" because, like actually running with scissors, what we are about to do may not be a great idea. All the thrift generated code carries a warning at the top: "DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING". Obviously I don't, but I'm not going to let that stop me at this point (sometimes you just have to see if you can get something to work!). What we've done is implement method_missing in the generated Person class (found in lucene_search_types.rb) so we can specify searches à la ActiveRecord. To accomplish this, a regular expression pulls out which fields to search, and the arguments passed in are used as the search values. The regular expression is fairly simple and only aims to handle simple searches.
  #added to translate from symbol to expected search format
  SEARCH_KEYS_MAPPING = {:first_name => 'firstName',
                         :last_name => 'lastName',
                         :email => 'email',
                         :address => 'address'}

  def self.method_missing(method_name, *args)
    #handles find_by_first_name etc
    if method_name.to_s =~ /^find_by_([a-z]+_?[a-z]*)$/
      query = "#{SEARCH_KEYS_MAPPING[$1.to_sym]}:#{args[0]}"
    #handles find_by_first_name_or_last_name, find_by_first_name_and_email
    elsif method_name.to_s =~ /^find_by_([a-z]+_[a-z]+)_([a-z]+)_([a-z]+_?[a-z]*)$/
      query = "#{SEARCH_KEYS_MAPPING[$1.to_sym]}:#{args[0]} #{$2.upcase} #{SEARCH_KEYS_MAPPING[$3.to_sym]}:#{args[1]}"
    else
      raise ArgumentError.new("search method pattern #{method_name} not recognized")
    end
    #connect only once the query is known, and always close the connection
    lucene_client = ThriftConnection::LuceneClient.new
    begin
      lucene_client.search(query)
    ensure
      lucene_client.close
    end
  end
As we'll see in the next section, this actually worked, but I still view this more as a useful experiment. First of all this was placed in generated code, so any time you make changes you would have to manually get the method_missing definition back into the Person class. Secondly, Lucene search syntax is really not all that hard to learn.
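The translation itself does not have to live in generated code. As a rough sketch, the same two regular expressions can be applied in an ordinary helper that turns a finder-style name into a Lucene query string. The version below is written in Java, to match the server side, and uses only the JDK; the class and method names (FinderQuerySketch, toQuery) are hypothetical. Unlike the Ruby version it rejects unknown field names rather than emitting an empty field, which is a deliberate change for the example.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: finder-style name + arguments -> Lucene query string.
class FinderQuerySketch {

    private static final Map<String, String> FIELDS = new HashMap<>();
    static {
        FIELDS.put("first_name", "firstName");
        FIELDS.put("last_name", "lastName");
        FIELDS.put("email", "email");
        FIELDS.put("address", "address");
    }

    // Same patterns as the Ruby method_missing implementation.
    private static final Pattern SINGLE = Pattern.compile("^find_by_([a-z]+_?[a-z]*)$");
    private static final Pattern PAIR =
        Pattern.compile("^find_by_([a-z]+_[a-z]+)_([a-z]+)_([a-z]+_?[a-z]*)$");

    static String toQuery(String finder, String... args) {
        Matcher single = SINGLE.matcher(finder);
        if (single.matches()) {
            return field(single.group(1)) + ":" + args[0];
        }
        Matcher pair = PAIR.matcher(finder);
        if (pair.matches()) {
            // group(2) is the boolean operator: "and" or "or"
            return field(pair.group(1)) + ":" + args[0]
                + " " + pair.group(2).toUpperCase(Locale.ROOT) + " "
                + field(pair.group(3)) + ":" + args[1];
        }
        throw new IllegalArgumentException("search method pattern " + finder + " not recognized");
    }

    private static String field(String key) {
        String name = FIELDS.get(key);
        if (name == null) {
            throw new IllegalArgumentException("unknown search field " + key);
        }
        return name;
    }

    public static void main(String[] args) {
        System.out.println(toQuery("find_by_first_name", "Tia"));
        System.out.println(toQuery("find_by_first_name_or_last_name", "elizabeth", "krause"));
    }
}
```

Note that the second pattern inherits a limitation from the original: the first field must be a two-word name such as first_name, so a finder like find_by_email_and_address is not recognized.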
Testing
All of what we have done so far would not be worth much if we could not verify our work with some testing. Here is the unit test to verify that we are indeed able to search a Lucene index from Ruby. To get some names to search on I simply ran head names.csv and then used some of the information in various combinations to get counts of what searches should return. For example, to get an idea of what searching for a first name of Elizabeth or last name of Krause would return I ran
cat names.csv | grep -iE 'elizabeth|krause' | wc -l
which returned a count of 289. So, first making sure that our thrift server was running in the background, here is the unit test that was run to verify our Ruby client searching against a Lucene index.
class SearchTest < Test::Unit::TestCase
  def setup
    @lucene_client = ThriftConnection::LuceneClient.new
  end

  def teardown
    @lucene_client.close
  end

  def test_search_client_first_name
    persons = @lucene_client.search("firstName:Tia")
    assert_equal(5, persons.length)
    persons.each do |person|
      assert_equal("Tia", person.firstName)
    end
  end

  def test_search_person_class_first_name
    persons = Person.find_by_first_name("Tia")
    assert_equal(5, persons.length)
    persons.each do |person|
      assert_equal("Tia", person.firstName)
    end
  end

  def test_search_client_first_name_email_domain
    persons = @lucene_client.search("+firstName:Elizabeth +email:*pookmail.com")
    assert_equal(59, persons.length)
  end

  def test_search_person_class_first_name_email_domain
    persons = Person.find_by_first_name_and_email("elizabeth", "*pookmail.com")
    assert_equal(59, persons.length)
  end

  def test_search_client_first_name_and_last_name
    persons = @lucene_client.search("+firstName:Elizabeth +lastName:Krause")
    assert_equal(1, persons.length)
    person = persons[0]
    assert_equal("Elizabeth", person.firstName)
    assert_equal("Krause", person.lastName)
  end

  def test_search_person_class_first_name_and_last_name
    persons = Person.find_by_first_name_and_last_name("elizabeth", "krause")
    assert_equal(1, persons.length)
    person = persons[0]
    assert_equal("Elizabeth", person.firstName)
    assert_equal("Krause", person.lastName)
  end

  def test_search_person_class_first_name_or_last_name
    persons = Person.find_by_first_name_or_last_name("elizabeth", "krause")
    assert_equal(289, persons.length)
  end

  def test_invalid_search
    assert_raises ArgumentError do
      Person.find_person_by_name("tia")
    end
  end
end
Conclusion
Thrift is a compelling alternative for RPC or message passing where one might otherwise be using either REST, Java RMI or middleware (JMS, AMQP). There is a great comparison of how thrift performs against other forms of RPC in this thrift tutorial from OCI, found near the end of the article. It is hoped the reader was able to learn something useful. Thanks for your time.
Resources
Full source for the blog including the generated code can be found on github. If you are interested in running the test you can download lucene-thrift-example.tar.gz, extract the tar file and execute the runSearchTest.sh script. You do not need to have thrift installed to run the test.
- For more information on thrift the thrift wiki is a great start
- More information on Lucene can be found here
- Each of your indexes contains hundreds of millions of documents.
- You have 500 users on your system actively performing searches.
- You have 100 search results per page.
- And, your typical user pages through the first 10 pages of results. (Normal occurrence on some systems)
IndexSearcher searcher = new IndexSearcher(reader);
TermQuery query = new TermQuery(new Term("f1", "value"));
IterablePaging paging = new IterablePaging(searcher, query, 100);
for (ScoreDoc sd : paging.skipTo(90)) {
    System.out.println("doc id [" + sd.doc + "] " + "score [" + sd.score + "]");
}
IndexSearcher searcher = new IndexSearcher(reader);
TotalHitsRef totalHitsRef = new TotalHitsRef();
ProgressRef progressRef = new ProgressRef();
TermQuery query = new TermQuery(new Term("f1", "value"));
IterablePaging paging = new IterablePaging(searcher, query, 100);
for (ScoreDoc sd : paging.skipTo(90)
                         .gather(20)
                         .totalHits(totalHitsRef)
                         .progress(progressRef)) {
    System.out.println("time [" + progressRef.queryTime() + "] "
        + "total hits [" + totalHitsRef.totalHits() + "] "
        + "searches [" + progressRef.searchesPerformed() + "] "
        + "position [" + progressRef.currentHitPosition() + "] "
        + "doc id [" + sd.doc + "] " + "score [" + sd.score + "]");
}

