Recently about Java

Recently, I was looking into computer vision technologies. One interesting technique is the "integral image", which enables more advanced techniques, most notably the Viola-Jones object detection framework. Viola-Jones uses a series of simple Haar-like features (rectangular areas with dark and light regions) to find parts of an image that match patterns corresponding to the object you are trying to find.

To do this computation, you search an image by sliding an imaginary window containing the pattern over the entire image at multiple scales, looking for differences between the dark and light areas of the pattern that meet a threshold determined by the training algorithm. Doing this can require many computations, so how can we speed it up? Let's frame the question this way: starting with a grayscale image in which each pixel has a value from 0 to 255, how can we quickly compute the sum (and from it the average) of the pixel values in a sub-region of an image?

The conventional way would be to write a loop similar to the following:
private static int computeSum(BufferedImage image, int regionX1, int regionY1, int regionX2, int regionY2) {
   // Sum the pixel values in a sub-region of the image (bounds are inclusive; x is the column, y is the row)
   int sum = 0;
   Raster data = image.getData();
   for (int x = regionX1; x <= regionX2; x++) {
      for (int y = regionY1; y <= regionY2; y++) {
         // Raster.getSample takes (x, y, band); band 0 is the gray channel
         int value = data.getSample(x, y, 0);
         sum = sum + value;
      }
   }
   return sum;
}

But we can do better than this, especially if we need to compute sums for multiple regions and don't want to constantly recompute them. To create an integral image, we make a single pass over the entire image and, for each pixel in the original image, compute the sum of all the pixels to the left of and above this pixel and add it to the value of the original pixel. The code for that is below.


import java.awt.image.BufferedImage;
import java.awt.image.Raster;

public class IntegralImage {
   // Indexed as [row][column]
   private int[][] integralImage = null;

   public IntegralImage(BufferedImage image) {
      int originalImageHeight = image.getHeight();
      int originalImageWidth = image.getWidth();
      integralImage = new int[originalImageHeight][originalImageWidth];
      Raster originalPixels = image.getData();

      int originalPixelValue = 0;
      for (int row = 0; row < originalImageHeight; row++) {
         for (int column = 0; column < originalImageWidth; column++) {
            originalPixelValue = originalPixels.getSample(column, row, 0);

            //For the top-left pixel, just copy the value from the original
            if (row == 0 && column == 0) {
               integralImage[row][column] = originalPixelValue;
            }

            //For the first row, just add the value to the left of this pixel
            else if (row == 0) {
               integralImage[row][column] = originalPixelValue + integralImage[row][column - 1];
            }

            //For the first column, just add the value above this pixel
            else if (column == 0) {
               integralImage[row][column] = originalPixelValue + integralImage[row - 1][column];
            }

            //For a pixel that has pixels to its left, above it, and to the left and above diagonally,
            //add the left and above values and subtract the value to the left and above diagonally
            else {
               integralImage[row][column] = originalPixelValue + integralImage[row][column - 1] + integralImage[row - 1][column] - integralImage[row - 1][column - 1];
            }
         }
      }
   }

   // The total(...) method shown below also belongs inside this class.
}

After this pass through the image, we can compute the sum of any region in constant time by doing the following:

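public int total(int x1, int y1, int x2, int y2) {
   // integralImage is indexed [row][column], i.e. [y][x]; the region bounds are inclusive
   int a = x1 > 0 && y1 > 0 ? integralImage[y1-1][x1-1] : 0; // above and to the left of the region
   int b = y1 > 0 ? integralImage[y1-1][x2] : 0;             // everything above the region
   int c = x1 > 0 ? integralImage[y2][x1-1] : 0;             // everything to the left of the region
   int d = integralImage[y2][x2];                            // everything up to the bottom-right corner
   return a + d - b - c;
}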
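
To make the idea concrete, here is a small self-contained sketch that works on a plain int[][] of gray values instead of a BufferedImage. The class and method names (IntegralImageSketch, integrate, regionSum, leftRightFeature) are my own and purely illustrative. It builds the integral image, answers a region-sum query with four lookups, and then uses two such queries to evaluate a simple two-rectangle Haar-like feature (right half minus left half of a window).

```java
public class IntegralImageSketch {

    // Build the integral image: out[r][c] is the sum of pixels[0..r][0..c].
    public static int[][] integrate(int[][] pixels) {
        int rows = pixels.length;
        int cols = pixels[0].length;
        int[][] out = new int[rows][cols];
        for (int r = 0; r < rows; r++) {
            int rowSum = 0; // running sum of the current row up to column c
            for (int c = 0; c < cols; c++) {
                rowSum += pixels[r][c];
                out[r][c] = rowSum + (r > 0 ? out[r - 1][c] : 0);
            }
        }
        return out;
    }

    // Sum of the inclusive rectangle (x1, y1)-(x2, y2); x is the column, y is the row.
    public static int regionSum(int[][] integral, int x1, int y1, int x2, int y2) {
        int a = (x1 > 0 && y1 > 0) ? integral[y1 - 1][x1 - 1] : 0;
        int b = (y1 > 0) ? integral[y1 - 1][x2] : 0;
        int c = (x1 > 0) ? integral[y2][x1 - 1] : 0;
        return integral[y2][x2] - b - c + a;
    }

    // Two-rectangle Haar-like feature: sum of the right half minus sum of the left half.
    public static int leftRightFeature(int[][] integral, int x, int y, int width, int height) {
        int half = width / 2;
        int left = regionSum(integral, x, y, x + half - 1, y + height - 1);
        int right = regionSum(integral, x + half, y, x + 2 * half - 1, y + height - 1);
        return right - left;
    }

    public static void main(String[] args) {
        // A tiny image with a dark left half and a light right half
        int[][] pixels = {
            {10, 10, 200, 200},
            {10, 10, 200, 200},
            {10, 10, 200, 200}
        };
        int[][] integral = integrate(pixels);
        System.out.println(regionSum(integral, 1, 1, 2, 2));        // 10 + 200 + 10 + 200 = 420
        System.out.println(leftRightFeature(integral, 0, 0, 4, 3)); // 1200 - 60 = 1140
    }
}
```

A large feature value like the one above is what a detector would compare against its trained threshold. Note that an int can overflow on very large images, so a long[][] may be the safer choice there.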

And that's it! We can now apply this faster computation to a number of interesting problems. For a nice tutorial on this, you may want to check out the following.

Blur is a new Apache 2.0 licensed software project that provides a search capability built on top of Hadoop and Lucene. Elastic Search and Solr already exist so why build something new? While these projects work well, they didn't have a solid integration with the Hadoop ecosystem. Blur was built specifically for Big Data, taking scalability, redundancy, and performance into consideration from the very start, while leveraging all the goodness that already exists in the Hadoop stack.

A year and a half ago, my project began using Hadoop for data processing. Very early on, we were having networking issues that would make our HDFS cluster network connectivity spotty at best. Over one weekend in particular, we steadily lost network connection to 47 of the 90 data nodes in the cluster. When we came in on Monday morning, I noticed that the MapReduce system was a little sluggish but still working. When I checked HDFS I saw that our capacity had dropped by about 50%. After running an fsck on the cluster I was amazed to find that what seemed like a catastrophic failure over the weekend resulted in a still healthy file system. This experience left a lasting impression on me. It was then that I got the idea to somehow leverage the redundancy and fault tolerance of HDFS for the next version of a search system that I was just beginning to (re)write.

I had already written a custom sharded Lucene server that had been in a production system for a couple of years. Lucene worked really well and did everything that we needed for search. The issue that we faced was that it was running on big iron that was not redundant and could not be easily expanded. After seeing the resilient characteristics of Hadoop firsthand, I decided to look into marrying the already mature and impressive feature set of Lucene with the built-in redundancy and scalability of the Hadoop platform. From this experiment Blur was created.

The biggest technical issues/features that Blur solves:

  • Rapid mass indexing of entire datasets
  • Automatic Shard Server Failover
  • Near Real-time update compatibility via Lucene NRT
  • Compression of Lucene FDT files while maintaining random access performance
  • Lucene WAL (Write Ahead Log) to provide data reliability
  • Lucene R/W directly into HDFS (the seek on write problem)
  • Random access performance with block caching of the Lucene Directory

Data Model

Data in Blur is stored in Tables that contain Rows. Rows must have a unique row id and contain one or more Records. Records have a unique record id (unique within the Row) and a column family for grouping columns that logically make up a single record. Columns contain a name and a value, and a Record can contain multiple columns with the same name.
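
The shape of this data model can be sketched with a few plain Java classes. These are not Blur's actual types; the class names (DataModelSketch, Row, Record, Column) and the sample values are hypothetical and only mirror the rules described above: record ids are unique within a row, each record belongs to one column family, and a column name may repeat within a record.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DataModelSketch {

    // A column is a simple name/value pair.
    public static final class Column {
        public final String name;
        public final String value;
        public Column(String name, String value) { this.name = name; this.value = value; }
    }

    // A record groups columns under one column family; column names may repeat.
    public static final class Record {
        public final String recordId;
        public final String family;
        public final List<Column> columns = new ArrayList<>();
        public Record(String recordId, String family) { this.recordId = recordId; this.family = family; }
        public Record add(String name, String value) { columns.add(new Column(name, value)); return this; }
    }

    // A row has a unique row id and holds records keyed by record id (unique within the row).
    public static final class Row {
        public final String rowId;
        private final Map<String, Record> records = new LinkedHashMap<>();
        public Row(String rowId) { this.rowId = rowId; }
        public Row add(Record record) {
            if (records.containsKey(record.recordId)) {
                throw new IllegalArgumentException("duplicate record id: " + record.recordId);
            }
            records.put(record.recordId, record);
            return this;
        }
        public Record get(String recordId) { return records.get(recordId); }
        public int recordCount() { return records.size(); }
    }

    // One row with two records in different column families.
    public static Row sampleRow() {
        return new Row("row-1")
            .add(new Record("rec-1", "person")
                .add("name", "Ann")
                .add("email", "ann@example.com")
                .add("email", "ann@example.org")) // same column name twice is allowed
            .add(new Record("rec-2", "address")
                .add("city", "Reston"));
    }

    public static void main(String[] args) {
        Row row = sampleRow();
        System.out.println(row.rowId + " has " + row.recordCount() + " records");
    }
}
```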

Architecture

Blur uses Hadoop's MapReduce framework for indexing data, and Hadoop's HDFS filesystem for storing indexes. Thrift is used for all inter-process communications and ZooKeeper is used to know the state of the system and to store metadata. The Blur architecture is made up of two types of server processes:

  • Blur Controller Server
  • Blur Shard Server

The shard server serves zero or more shards from all the currently online tables. Which shards are online in each shard server is calculated from the state information in ZooKeeper. If a shard server goes down, the remaining shard servers detect the failure through interaction with ZooKeeper and determine which, if any, of the missing shards they need to serve from HDFS.
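
To illustrate the failover idea (and only the idea: this is not Blur's actual layout algorithm), here is a minimal sketch in which every server computes the same shard layout from the list of live servers. The list of live servers is assumed to come from somewhere like ZooKeeper; in this sketch it is just a method argument, and the names ShardLayoutSketch and layout are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class ShardLayoutSketch {

    // Deterministic layout: sort servers and shards, then deal shards out round-robin.
    // Because the result depends only on the inputs, every server that sees the same
    // list of live servers arrives at the same answer without further coordination.
    public static Map<String, List<String>> layout(Collection<String> liveServers,
                                                   Collection<String> shards) {
        List<String> servers = new ArrayList<>(new TreeSet<>(liveServers));
        List<String> sortedShards = new ArrayList<>(new TreeSet<>(shards));
        Map<String, List<String>> result = new TreeMap<>();
        for (String server : servers) {
            result.put(server, new ArrayList<>());
        }
        if (servers.isEmpty()) {
            return result; // nobody is alive to serve anything
        }
        for (int i = 0; i < sortedShards.size(); i++) {
            String owner = servers.get(i % servers.size());
            result.get(owner).add(sortedShards.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> shards = List.of("shard-0", "shard-1", "shard-2", "shard-3", "shard-4", "shard-5");
        System.out.println(layout(List.of("a", "b", "c"), shards));
        // Server "b" disappears: the survivors recompute and pick up its shards
        System.out.println(layout(List.of("a", "c"), shards));
    }
}
```

Round-robin is the simplest possible choice and reshuffles more shards than strictly necessary when a server leaves; a production system would likely want a layout that moves as few shards as possible.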

The controller server provides a single point of entry (logically) to the cluster for spraying out queries, collecting the responses, and providing a single response. Both the controller and shard servers expose the same Thrift API which helps to ease debugging. It also allows developers to start a single shard server and interact with it the same way they would with a large cluster. Many controller servers can be (and should be) run for redundancy. The controllers act as gateways to all of the data that is being served by the shard servers.
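
The "spray out, collect, respond" role of the controller is a classic scatter-gather. Below is a minimal sketch of that pattern using only the JDK. It does not use Blur's Thrift API: the Shard interface and the ScatterGatherSketch class are hypothetical stand-ins, and the merge step simply concatenates results where a real controller would also have to merge by score.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ScatterGatherSketch {

    // Stand-in for a shard server: takes a query and returns matching row ids.
    public interface Shard {
        List<String> search(String query);
    }

    // Send the query to every shard in parallel, then combine the answers into one response.
    public static List<String> search(List<Shard> shards, String query) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, shards.size()));
        try {
            List<Future<List<String>>> pending = new ArrayList<>();
            for (Shard shard : shards) {
                Callable<List<String>> task = () -> shard.search(query);
                pending.add(pool.submit(task)); // scatter
            }
            List<String> merged = new ArrayList<>();
            for (Future<List<String>> future : pending) {
                merged.addAll(future.get()); // gather, in shard order
            }
            return merged;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Shard first = q -> Arrays.asList("row-1 matches " + q);
        Shard second = q -> Arrays.asList("row-7 matches " + q, "row-9 matches " + q);
        System.out.println(search(Arrays.asList(first, second), "value"));
    }
}
```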

Updating / Loading Data

Currently there are two ways to load and update data. The first is through a bulk load in MapReduce and the second is through mutation calls in Thrift.

Bulk Load MapReduce Example

Data Mutation Thrift Example

Searching Data

Any element in the Blur data model is searchable through the normal Lucene semantics: analyzers. Analyzers are defined per Blur table.

The standard Lucene query syntax is the default way to search Blur. If anything outside of the standard syntax is needed, you can create a Lucene query directly with Java objects, and submit them through the expert query API.

The column family grouping within Rows allows for results to be discovered across column families similar to what you would get with an inner join across two tables that share the same key (or in this case rowid). For complicated data models that have multiple column families, this makes for a very powerful search capability.

The following example searches for "value" as a full text search. If I had wanted to search for "value" in a single field like column "colA" in column family "famB" the query would look like "famB.colA:value".

Fetching Data

Fetches can be done by row or by record. This is done by creating a selector object in which you specify the rowid or recordid, and the specific column families or columns that you would like returned. When not specified, the entire Row or Record is returned.

Current State

Blur is nearing its first release, 0.1, and is relatively stable. The first release candidate should be available for download within the next few weeks. In the meantime you can check it out on github:

https://github.com/nearinfinity/blur

http://blur.io

Near Infinity is proud to host a new Gradle Expert Training class, taught by Tim Berglund, the Author of the Gradle O'Reilly Book Series. This is an intensive and highly practical 3-day Gradle course. You will become familiar with all major concepts of Gradle and how to best use Gradle for simple as well as complex build scenarios. This course is packed with hands-on exercises.

To learn more, please visit Gradleware's Gradle Expert Training course description. To register, visit Gradleware's Gradle course registration web page.

Be sure to check out our Upcoming NIC-U Training Courses page for information on more upcoming courses.

This post is going to demonstrate thrift usage by searching a Lucene index from Ruby.

Thrift In a Nutshell

Essentially, thrift is a serialization and RPC framework that allows you to communicate between programs that are not necessarily written in the same language. Thrift is used by defining data types and services in a .thrift file. You then run the .thrift file through the thrift compiler, which generates the stub code needed for clients and servers. Currently thrift will generate code for C++, C#, Erlang, Haskell, Java, Objective C/Cocoa, OCaml, Perl, PHP, Python, Ruby, and Squeak. For a more detailed description of thrift, along with instructions on how to install it if needed, consult the thrift wiki.

Generating the Lucene Index

Our first step is to generate a small, simple lucene index. To build our index, 50,000 fake person records were downloaded from the Fake Name Generator in a comma delimited file. Each person record will contain a first name, last name, address and email address. Our indexing code will be very simple and will not be using any of lucene's advanced features.
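import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

public class IndexBuilder {

    public static void main(String[] args) throws Exception {
        String namesFile = "names.csv";
        // One Document and one set of Field instances are reused for every person; only the values change
        Document doc = new Document();
        Field[] fields = new Field[]{new Field("firstName", "", Field.Store.YES, Field.Index.ANALYZED_NO_NORMS),
                new Field("lastName", "", Field.Store.YES, Field.Index.ANALYZED_NO_NORMS),
                new Field("address", "", Field.Store.YES, Field.Index.ANALYZED_NO_NORMS),
                new Field("email", "", Field.Store.YES, Field.Index.ANALYZED_NO_NORMS)};
        addFieldsToDocument(doc, fields);

        BufferedReader reader = new BufferedReader(new FileReader(namesFile));

        IndexWriter indexWriter = new IndexWriter(FSDirectory.open(new File("blog-index")),new IndexWriterConfig(Version.LUCENE_31, new StandardAnalyzer(Version.LUCENE_31)));

        String line;
        while ((line = reader.readLine()) != null) {
            String[] personData = getPersonData(line);
            setFieldData(personData, fields);
            indexWriter.addDocument(doc);
        }
        reader.close();
        indexWriter.optimize();
        indexWriter.close();
    }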

    private static String[] getPersonData(String line) {
        return line.split(",");
    }

    private static void setFieldData(String[] data, Field[] fields) {
        int index = 0;
        for (Field field : fields) {
            field.setValue(data[index++]);
        }
    }

    private static void addFieldsToDocument(Document doc, Field[] fields) {
        for (Field field : fields) {
            doc.add(field);
        }
    }
}

Creating the .thrift File

The next step will be to define what objects and services we want in our .thrift file, which will be called lucene_search.thrift. The lucene_search.thrift file is intentionally very basic. For more details on the structure of .thrift files, consult the thrift wiki tutorial.
//all generated java code will have the following for package name
namespace java bbejeck.thrift.gen

//this is the person object 
struct Person {
  1: string firstName,
  2: string lastName,
  3: string address,
  4: string email
}

//exception used to send meaningful error messages back to user
exception LuceneSearchException {
  1: string message
}

//service definition used by client and server
service LuceneSearch { 
    list<Person> search(1: string query) throws (1:LuceneSearchException error) 
}
As you can see from the example above, the .thrift file format is completely language agnostic. Next we need to generate our java and ruby code. The following were run from the command line:
  • $ thrift --gen java lucene_search.thrift
  • $ thrift --gen rb lucene_search.thrift
The generated code ends up in two directories named gen-java/ and gen-rb/ respectively. The files generated for java are LuceneSearch.java, LuceneSearchException.java and Person.java. The generated ruby files are lucene_search.rb, lucene_search_types.rb and lucene_search_constants.rb. In our next step, we are going to use generated java code to write our thrift server.

Thrift Server - Java

Thrift generates all the stub code you need for a server to expose your service or program. The only code we will need to write is a class that implements the generated Iface interface (defined in the LuceneSearch class), which contains the search method defined in our .thrift file.
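import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;

public class LuceneThriftServer {
    private static final int PORT = 9090;
    private static int numberThreads = 5;

    // Usage: LuceneThriftServer <indexPath> [numberOfWorkerThreads]
    public static void main(String[] args) throws Exception {
        // Listen on PORT with a client timeout of 100000 ms
        TServerSocket serverSocket = new TServerSocket(PORT, 100000);
        LuceneSearch.Processor searchProcessor = new LuceneSearch.Processor(new SearchHandler(args[0]));
        if (args.length > 1) {
            numberThreads = Integer.parseInt(args[1]);
        }
        TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverSocket);
        serverArgs.maxWorkerThreads(numberThreads);
        TServer thriftServer = new TThreadPoolServer(serverArgs.processor(searchProcessor).protocolFactory(new TBinaryProtocol.Factory()));
        // Blocks and serves requests until the process is stopped
        thriftServer.serve();
    }
}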

Iface Implementation

The SearchHandler class actually does the work of searching the lucene index. One tradeoff made here is that any exception while searching is caught and re-thrown as a LuceneSearchException. While it's usually not a great idea to just re-throw an exception, in this case it makes sense to do so. Since the LuceneSearchException is defined in the lucene_search.thrift file, the generated client code will handle that exception. So instead of receiving a generic thrift exception when an error occurs, the client should receive a more meaningful error message.
public class SearchHandler implements LuceneSearch.Iface {
    private IndexSearcher searcher;
    private QueryParser queryParser;
    private static final int MAX_RESULTS = 1000;

    public SearchHandler(String indexPath) {
        try {
            searcher = new IndexSearcher(FSDirectory.open(new File(indexPath)), true);
            queryParser = new QueryParser(Version.LUCENE_31, null, new StandardAnalyzer(Version.LUCENE_31));
            queryParser.setAllowLeadingWildcard(true);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public List<Person> search(String query) throws LuceneSearchException {
        List<Person> results = new ArrayList<Person>();
        try {
            Query q = queryParser.parse(query);
            TopDocs topDocs = searcher.search(q, MAX_RESULTS);
            for (ScoreDoc sd : topDocs.scoreDocs) {
                Document document = searcher.doc(sd.doc);
                results.add(getPersonFromDocument(document));
            }
        } catch (Exception e) {
            throw new LuceneSearchException(e.getMessage());
        }
        return results;
    }

    private Person getPersonFromDocument(Document document) {
        Person p = new Person();
        p.firstName = document.get("firstName");
        p.lastName = document.get("lastName");
        p.address = document.get("address");
        p.email = document.get("email");

        return p;
    }
}
The next step in our process is to write the client.

Thrift Client - Ruby

Writing the thrift ruby client is even easier than writing the server code. If you have not already done so, install the thrift gem by running "gem install thrift" to get the required thrift library code. All the code you need for your client is already generated by thrift. At this point we are only doing what is needed to get the client to communicate with the server.
module ThriftConnection

  class LuceneClient

    def initialize(host='localhost', port=9090)
      socket = Thrift::Socket.new(host, port)
      @transport = Thrift::BufferedTransport.new(socket)
      protocol_factory = ::Thrift::BinaryProtocolFactory.new
      protocol = protocol_factory.get_protocol(@transport)
      @transport.open
      @client = LuceneSearch::Client.new(protocol)
    end

    def search(query)
      @client.search(query)
    end

    def close
      @transport.close
    end

  end
end

Running With Scissors

This section has the odd title "Running With Scissors" because, like actually running with scissors, what we are about to do may not be a great idea. All the thrift generated code carries a warning at the top: "DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING". Obviously I don't, but I'm not going to let that stop me at this point (sometimes you just have to see if you can get something to work!). What we've done is implement method_missing in the generated Person class (found in lucene_search_types.rb) so we can specify searches à la ActiveRecord. To accomplish this, we use a regular expression to pull out which fields to search and use the arguments passed in as the search values. The regular expression here is fairly simple and only aims to handle simple searches.
#added to translate from symbol to expected search format
  SEARCH_KEYS_MAPPING = {:first_name => 'firstName',
                                            :last_name => 'lastName',
                                            :email => 'email',
                                            :address => 'address'}


  def self.method_missing(method_name, *args)
    lucene_client = ThriftConnection::LuceneClient.new
    query = ""
        #handles find_by_first_name etc
    if method_name.to_s =~ /^find_by_([a-z]+_?[a-z]*)$/
      query = "#{SEARCH_KEYS_MAPPING[$1.to_sym]}:#{args[0]}"
        #handles find_by_first_name_or_last_name, find_by_first_name_and_email 
    elsif method_name.to_s =~/^find_by_([a-z]+_[a-z]+)_([a-z]+)_([a-z]+_?[a-z]*)$/
      query ="#{SEARCH_KEYS_MAPPING[$1.to_sym]}:#{args[0]} #{$2.upcase} #{SEARCH_KEYS_MAPPING[$3.to_sym]}:#{args[1]}"
    else
       raise ArgumentError.new("search method pattern #{method_name} not recognized")
    end

    results = lucene_client.search(query)
    lucene_client.close
    results
  end
As we'll see in the next section, this actually worked, but I still view this more as a useful experiment. First of all this was placed in generated code, so any time you make changes you would have to manually get the method_missing definition back into the Person class. Secondly, Lucene search syntax is really not all that hard to learn.
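
For readers more at home on the Java side of this post, the name-to-query translation can be sketched without method_missing. This is a rough Java port of the two regular expressions above, not part of the original project: the class FinderQuerySketch and the method toQuery are hypothetical names, and unlike the Ruby version this sketch rejects unknown field names outright.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FinderQuerySketch {

    // Maps finder-style names to the field names used in the Lucene index.
    private static final Map<String, String> FIELDS = new HashMap<>();
    static {
        FIELDS.put("first_name", "firstName");
        FIELDS.put("last_name", "lastName");
        FIELDS.put("email", "email");
        FIELDS.put("address", "address");
    }

    // Handles find_by_first_name, find_by_email, etc.
    private static final Pattern SINGLE =
        Pattern.compile("^find_by_([a-z]+_?[a-z]*)$");
    // Handles find_by_first_name_or_last_name, find_by_first_name_and_email, etc.
    private static final Pattern DOUBLE =
        Pattern.compile("^find_by_([a-z]+_[a-z]+)_([a-z]+)_([a-z]+_?[a-z]*)$");

    private static String field(String key) {
        String name = FIELDS.get(key);
        if (name == null) {
            throw new IllegalArgumentException("unknown search field " + key);
        }
        return name;
    }

    // Turn a finder-style method name plus its arguments into a Lucene query string.
    public static String toQuery(String methodName, String... args) {
        Matcher single = SINGLE.matcher(methodName);
        if (single.matches()) {
            return field(single.group(1)) + ":" + args[0];
        }
        Matcher both = DOUBLE.matcher(methodName);
        if (both.matches()) {
            String operator = both.group(2).toUpperCase(Locale.ROOT); // "and" -> "AND"
            return field(both.group(1)) + ":" + args[0] + " " + operator + " "
                + field(both.group(3)) + ":" + args[1];
        }
        throw new IllegalArgumentException("search method pattern " + methodName + " not recognized");
    }

    public static void main(String[] args) {
        System.out.println(toQuery("find_by_first_name", "Tia"));
        System.out.println(toQuery("find_by_first_name_or_last_name", "elizabeth", "krause"));
    }
}
```

As in the Ruby version, the second pattern expects a two-word field such as first_name on the left-hand side, so a name like find_by_email_and_first_name is not recognized.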

Testing

All of what we have done so far would not be worth much if we could not verify our work with some testing. Here is the unit test to verify that we are indeed able to search a Lucene index from Ruby. To get some names to search on I simply ran
head names.csv
and then used some of the information in various combinations to get counts of what searches should return. For example to get an idea of what searching for a first name of Elizabeth or last name of Krause would return I ran
cat names.csv | grep -iE 'elizabeth|krause' | wc -l 
which returned a count of 289. So, first making sure that our thrift server was running in the background, here is the unit test that was run to verify our Ruby client searching against a Lucene index.
class SearchTest < Test::Unit::TestCase

  def setup
    @lucene_client = ThriftConnection::LuceneClient.new
  end


  def teardown
    @lucene_client.close
  end

  def test_search_client_first_name
    persons = @lucene_client.search("firstName:Tia")
    assert_equal(5, persons.length)

    persons.each do |person|
      assert_equal("Tia", person.firstName)
    end
  end

  def test_search_person_class_first_name
    persons = Person.find_by_first_name("Tia")
    assert_equal(5, persons.length)

    persons.each do |person|
      assert_equal("Tia", person.firstName)
    end
  end

  def test_search_client_first_name_email_domain
    persons = @lucene_client.search("+firstName:Elizabeth +email:*pookmail.com")
    assert_equal(59, persons.length)
  end

  def test_search_person_class_first_name_email_domain
    persons = Person.find_by_first_name_and_email("elizabeth", "*pookmail.com")
    assert_equal(59, persons.length)
  end

  def test_search_client_first_name_and_last_name
    persons = @lucene_client.search("+firstName:Elizabeth +lastName:Krause")
    assert_equal(1, persons.length)
    person = persons[0]

    assert_equal("Elizabeth", person.firstName)
    assert_equal("Krause", person.lastName)
  end

  def test_search_person_class_first_name_and_last_name
    persons = Person.find_by_first_name_and_last_name("elizabeth", "krause")
    assert_equal(1, persons.length)
    person = persons[0]

    assert_equal("Elizabeth", person.firstName)
    assert_equal("Krause", person.lastName)
  end

  def test_search_person_class_first_name_or_last_name
    persons = Person.find_by_first_name_or_last_name("elizabeth", "krause")
    assert_equal(289, persons.length)
  end

  def test_invalid_search
    assert_raises ArgumentError do
      Person.find_person_by_name("tia")
    end
  end

end

Conclusion

Thrift is a compelling alternative for RPC or message passing where one might otherwise be using REST, Java RMI or middleware (JMS, AMQP). There is a great comparison of how thrift performs against other forms of RPC near the end of this thrift tutorial from OCI. I hope you were able to learn something useful. Thanks for your time.

Resources

Full source for the blog, including the generated code, can be found on github. If you are interested in running the test, you can download lucene-thrift-example.tar.gz, extract the tar file and execute the runSearchTest.sh script. You do not need to have thrift installed to run the test.
  • For more information on thrift the thrift wiki is a great start
  • More information on Lucene can be found here

Recently, I needed to enhance some existing code to apply optional filters when retrieving the status of application-created JMS queues/topics (resources) from WebLogic. These filters would be based upon a pre-defined set of categories for the JMS resources in our application.

The existing code used the deprecated MBeanHome interface to connect to the WebLogic MBean server, retrieved all "JMSServerRuntime" MBeans, and returned the status of only application-created JMS resources through the use of nested iterations over the returned MBean set. Since I just needed to add a filter on top of the returned MBean set, the easiest route would have been to add an iteration over the returned JMSServerRuntimeMBeans to return only the ones that matched the provided filter. However, since the existing performance was already slower than desired, I knew that adding an iteration would degrade it even further.

As a result, I decided to leverage MBeanServerConnection queryNames when retrieving the MBeans directly from WebLogic to allow WebLogic to do the filtering for me instead of iterating over the results after the MBeans are returned. Also, I figured I'd refactor the existing code to no longer use deprecated WebLogic APIs in favor of the standard JMX programming model.

Here is an example of the existing code.  It performs the following steps:
1.  Connect to the MBeanServer using MBeanHome  
2.  Retrieve all MBeans of type "JMSServerRuntime"
3.  Iterate over the set to find the MBean for the JMS Server created by the application
4.  Retrieve all JMS destinations for the application-created JMS Server, and then iterate over them to retrieve each destination

/**
 * Obtain the status of JMS queues and topics
 * @return List of all JMSDestinationRuntimeMBean objects (one per destination)
 */
private static List<JMSDestinationRuntimeMBean> getJMSStatus()
{
    List<JMSDestinationRuntimeMBean> destinations = new ArrayList<JMSDestinationRuntimeMBean>();
    try
    {
        // Look up the management bean home for the admin server
        Context ctx = new InitialContext();
        MBeanHome home = (MBeanHome) ctx.lookup(MBeanHome.ADMIN_JNDI_NAME);

        // Get all JMS server MBeans
        Set mbeanSet = home.getMBeansByType("JMSServerRuntime");

        // Iterate over the set to find the JMS Server used by our application
        Iterator mBeanIterator = mbeanSet.iterator();
        while ( mBeanIterator.hasNext() )
        {
            JMSServerRuntimeMBean server = (JMSServerRuntimeMBean) mBeanIterator.next();

            // If it's our application-created JMS server, then get all of its destinations
            // (the comparison is case-insensitive, so both sides are upper-cased)
            String name = server.getName().toUpperCase();
            if ( name.startsWith("JMSServerName".toUpperCase()) ) //insert the name of the JMS server used by the application
            {
                JMSDestinationRuntimeMBean[] dest = server.getDestinations();
                for ( int i=0; i < dest.length; i++ )
                {
                    //NOTE: This is where we would add another iteration to determine if the destination meets any provided filters
                    destinations.add(dest[i]);
                }
            }
        }
    }
    catch (Exception ex)
    {
        LOG.warn("Could not retrieve JMS statistics for all destinations", ex);
    }

    return(destinations);
}

Based on my analysis and testing, I found that the performance of this method degraded with the number of MBeans returned by WebLogic and with the nested iterations over the returned MBean set. So instead of retrieving MBeans using MBeanHome getMBeansByType, I decided to try queryNames from the MBeanServerConnection interface. The queryNames method takes an ObjectName and a QueryExp as parameters and returns the names of the matching MBeans from the MBean server. Since the ObjectName identifies the MBean names to be retrieved, I used "Type=JMSDestinationRuntime". The QueryExp parameter was where I was hoping WebLogic would do the filtering for me. After doing some more testing, I found that since WebLogic performed the filtering before returning the MBean set, the results were returned much faster than retrieving the full list of MBeans. The only drawback of this approach was that queryNames returns only the ObjectNames of the selected MBeans, not the attributes our application needs for the status of the JMS queues/topics. An additional query to the MBean server is necessary to retrieve those attributes. Even with this additional query, the enhanced code returned the results faster than the existing code.
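
If you want to try the queryNames filtering without a WebLogic server at hand, the same JMX calls work against a plain in-process MBean server. The sketch below is only an illustration under that assumption: the FakeDestination MBean, the "demo" domain and the destination names are made up, while queryNames, Query.initialSubString and Query.or are the standard javax.management API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
import javax.management.Query;
import javax.management.QueryExp;

public class QueryNamesSketch {

    // Standard MBean convention: the management interface is the class name plus "MBean".
    public interface FakeDestinationMBean {
        String getName();
    }

    public static class FakeDestination implements FakeDestinationMBean {
        private final String name;
        public FakeDestination(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    // OR together one "Name starts with <prefix>" test per prefix; null means "no filter".
    public static QueryExp namePrefixQuery(List<String> prefixes) {
        QueryExp full = null;
        for (String prefix : prefixes) {
            QueryExp one = Query.initialSubString(Query.attr("Name"), Query.value(prefix));
            full = (full == null) ? one : Query.or(full, one);
        }
        return full;
    }

    // Register a few fake destinations, then let the MBean server do the filtering.
    public static Set<String> matchingNames(List<String> prefixes) {
        try {
            MBeanServer server = MBeanServerFactory.newMBeanServer();
            for (String name : Arrays.asList("orders.in", "orders.out", "billing.in", "audit.log")) {
                server.registerMBean(new FakeDestination(name),
                    new ObjectName("demo:Type=JMSDestinationRuntime,Name=" + name));
            }
            Set<ObjectName> matches = server.queryNames(
                new ObjectName("demo:Type=JMSDestinationRuntime,*"), namePrefixQuery(prefixes));
            Set<String> names = new TreeSet<>();
            for (ObjectName match : matches) {
                // queryNames only returns names, so attributes need a second call
                names.add((String) server.getAttribute(match, "Name"));
            }
            return names;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(matchingNames(Arrays.asList("orders", "audit")));
    }
}
```

The second call inside the loop is the "additional query" mentioned above: the query itself only yields ObjectNames.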

Here is an example of the updated code.  It performs the following steps:
1.  Create the QueryExp based upon the provided filters
2.  Connect to the WebLogic Admin MBeanServer using JMXConnector.   
3.  Retrieve the MBean names (using queryNames) of type "JMSDestinationRuntime" that match the QueryExp
4.  Check that each returned MBean belongs to the application-created JMS Server
5.  Retrieve the status attributes for each remaining destination

    /**
    * Obtain the status of JMS queues and topics
    * @param jmsNameList list of leading substrings (prefixes) of the queue/topic names to match (if null then all queues/topics returned)
    * @return List of all JMSStatus objects (one per destination)
    */
    public static List<JMSStatus> getJMSStatusByNamedList(List<String> jmsNameList)
    {
        QueryExp fullQuery = null;
        if (jmsNameList != null)
        {
            QueryExp query = null;
            for (String jmsName : jmsNameList)
            {
                // mBeanName is defined elsewhere in the class (not shown): the name of the MBean attribute to match against
                AttributeValueExp attribute = Query.attr(mBeanName);
                StringValueExp name = Query.value(jmsName);
                // Match destinations whose attribute value starts with jmsName, OR-ed with the previous prefixes
                query = Query.initialSubString(attribute, name);
                fullQuery = (fullQuery != null) ? Query.or(fullQuery, query) : query;
            }
        }

        return getJMSStatusByQuery(fullQuery);

    }

    /**
    * Obtain the status of JMS queues and topics
    * @param query optional QueryExp to filter the returned jms destinations (if null then all queues/topics returned)
    * @return List of all JMSStatus objects (one per destination) sorted by name
    */
    private static List<JMSStatus> getJMSStatusByQuery(QueryExp query)
    {
        List<JMSStatus> destinations = new ArrayList<JMSStatus>();
        // Attributes to fetch for each destination; they are read back by position below
        String[] mBeanAttributes = {"Name", "DestinationType", "MessagesCurrentCount",
            "MessagesPendingCount", "MessagesReceivedCount", "MessagesHighCount"};

        MBeanServerConnection connection = null;
        JMXConnector connector = null;

        try
        {
            connector = initConnection();
            connection = connector.getMBeanServerConnection();

            // The QueryExp is passed to the MBean server, which returns only the matching ObjectNames
            Set<ObjectName> mbeanSet = connection.queryNames(new ObjectName("*:*,Type=JMSDestinationRuntime"), query);
            for (ObjectName mbean : mbeanSet)
            {
                // The comparison is case-insensitive, so both sides are lower-cased
                String jmsServer = mbean.getKeyProperty("JMSServerRuntime");
                if (jmsServer != null && jmsServer.toLowerCase().startsWith("JMSServerName".toLowerCase())) // Name of the application-created JMS Server
                {
                    AttributeList attributes = connection.getAttributes(mbean, mBeanAttributes);
                    if (attributes.size() == mBeanAttributes.length)
                    {
                        JMSStatus stat = new JMSStatus(
                                (Attribute) attributes.get(0),
                                (Attribute) attributes.get(1),
                                (Attribute) attributes.get(2),
                                (Attribute) attributes.get(3),
                                (Attribute) attributes.get(4),
                                (Attribute) attributes.get(5),
                                mbean.getKeyProperty("JMSServerRuntime"),
                                mbean.getKeyProperty("ServerRuntime"));

                        destinations.add(stat);
                    }
                }
            }
        }
        catch (Exception ex)
        {
            LOG.warn("Could not retrieve JMS statistics for all destinations", ex);
        }
        finally
        {
            if (connector != null)
            {
                try {
                    connector.close();
                } catch (IOException ex) {
                    LOG.warn("Could not close the MBean Server connection", ex);
                }
            }
        }

        return(destinations);

    }

    /**
    * Initialize the JMX connection to the WebLogic Admin MBeanServer.
    * @return JMXConnector for the MBeanServer connection
    */
    private static JMXConnector initConnection() throws IOException {

        JMXServiceURL serviceURL = new JMXServiceURL(protocol, host, port, jndiPath); // Update using the appropriate protocol, host, port, jndiPath

        Map<String,Object> env = new HashMap<String,Object>();
        env.put(Context.SECURITY_PRINCIPAL, "login");  // update with admin user login
        env.put(Context.SECURITY_CREDENTIALS, "password");  // update with admin user password
        env.put(JMXConnectorFactory.PROTOCOL_PROVIDER_PACKAGES, "weblogic.management.remote");
        env.put("jmx.remote.x.request.waiting.timeout", Long.valueOf(10000)); // timeout in milliseconds, passed as a Long

        return JMXConnectorFactory.connect(serviceURL, env);
   }
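
For the placeholders in initConnection, here is a sketch of what the service URL typically looks like. The values are assumptions, not taken from our application: it assumes the t3 protocol, an Administration Server on localhost:7001, and the Domain Runtime MBean Server as the target. Building the URL needs only the JDK; actually connecting additionally requires the WebLogic client classes on the classpath.

```java
import java.net.MalformedURLException;
import javax.management.remote.JMXServiceURL;

public class ServiceUrlSketch {

    // JNDI name of WebLogic's Domain Runtime MBean Server (assumed target for this sketch)
    private static final String DOMAIN_RUNTIME_JNDI =
        "/jndi/weblogic.management.mbeanservers.domainruntime";

    // Build the service URL for the given admin server host and port.
    public static JMXServiceURL domainRuntimeUrl(String host, int port) {
        try {
            return new JMXServiceURL("t3", host, port, DOMAIN_RUNTIME_JNDI);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("invalid JMX service URL", e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical host and port; replace with your own admin server
        System.out.println(domainRuntimeUrl("localhost", 7001));
    }
}
```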