Oracle Coherence Top n Query

In this blog I am going to talk about how to do an Oracle Coherence Top n query: a query that finds the top n things in a cache, where "top" is based on an attribute of the cache entry, for example the top 5 orders by order value. Obviously the same approach also covers the bottom n and other similar queries. The blog has a bit of preamble to set the scene and cover my example scenario, then we will look at the code that actually performs the Top n query, and finally we will look at a few more examples using different aggregators (and we will learn something not everyone realises about ValueExtractors too). Apologies to any .NET or C++ developers, but the code here is all Java. If you want to use this from either C# or C++ then you only need to create a stub of the LimitAggregator class I describe and deploy the Java version to your server.
So, with a little help from Shakespeare, we will go and count some words…

Whilst playing around with code from my previous blog on Oracle Coherence Pivot Table Queries I had a scenario where I wanted to perform a Top n query; I was trying to replicate something I had seen elsewhere as an example and realised that out of the box Coherence does not have anything built in that allows you to do this type of query.

— First Witch:
“In the poysond Entrailes throw
Toad, that vnder cold stone,
Dayes and Nights, ha’s thirty one:
Sweltred Venom sleeping got,
Boyle thou first i’th’ charmed pot”
Macbeth (IV, i)

The exact scenario was a trivial example where I have a cache of “words” and the word entry has a count of how many times that word occurred in a book, or in our case a play. I then want to query for something like the top n most used words. I could just as easily have wanted the least used words or some other similar query.

Caching Words

So, let's start with the scenario. I need a cache of words, so I need a simple Word class; the attributes will be the word itself, the usage count and the length of the word. I know I could get the length from the String without needing a field, but storing it makes it easier to extract using a PofExtractor later.

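import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;

import java.io.IOException;

public class Word implements PortableObject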
{
    private String word;

    private long count = 0;

    private int length;

    public Word()
    {
    }

    public Word(String word)
    {
        this.word = word;
        this.length = word.length();
    }

    public String getWord()
    {
        return word;
    }

    public int getLength()
    {
        return length;
    }

    public long getCount()
    {
        return count;
    }

    public void setCount(long count)
    {
        this.count = count;
    }

    public void incrementCount()
    {
        count++;
    }

    @Override
    public void readExternal(PofReader pofReader) throws IOException
    {
        word = pofReader.readString(0);
        count = pofReader.readLong(1);
        length = pofReader.readInt(2);
    }

    @Override
    public void writeExternal(PofWriter pofWriter) throws IOException
    {
        pofWriter.writeString(0, word);
        pofWriter.writeLong(1, count);
        pofWriter.writeInt(2, length);
    }

    @Override
    public String toString()
    {
        return "Word{" +
               "count=" + count +
               ", word='" + word + '\'' +
               ", length=" + length +
               '}';
    }
}

It does not get much simpler than that. The Word class implements PortableObject so it goes over the wire and has a couple of methods for getting and setting the usage count and other fields.

— Macbeth:
“Is this a Dagger, which I see before me,
The Handle toward my Hand? Come, let me clutch thee:
I haue thee not, and yet I see thee still.
Art thou not fatall Vision, sensible
To feeling, as to sight? or art thou but
A Dagger of the Minde, a false Creation,
Proceeding from the heat-oppressed Braine?”
Macbeth (II, i)

My Word cache will be a simple distributed cache. For this example any distributed cache configuration will do, as all I want to do is run an aggregator. I will not bother showing a cache configuration file as there is nothing special in it; even the default file in the Coherence jar would do for this example.

Loading the Word Cache

Now I have a class to represent a word I need to load some words into the cache. Well, Shakespeare knew a lot of words so what better than one of his plays, so we will go to Project Gutenberg where you can download some Shakespeare – and it should be obvious from the quotes that we are going to use Macbeth for this example.

Now we have a file we can write a trivial bit of code to load the words.

private static void loadMacbeth(int count) throws Exception
{
    NamedCache wordsCache = CacheFactory.getCache("words");

    WordUpdater updater = new WordUpdater();

    // try-with-resources makes sure the file is closed when loading finishes
    try (BufferedReader reader = new BufferedReader(new FileReader("/Users/jonathanknight/Documents/macbeth.txt")))
    {
        String line = reader.readLine();

        // skip 382 lines to miss the Project Gutenberg introduction
        for (int i=0; i<382; i++)
        {
            line = reader.readLine();
        }

        int wordCount = 0;
        while(line != null && wordCount < count)
        {
            System.out.println(line);
            // split on whitespace and on punctuation other than apostrophe and hyphen
            String[] words = line.split("\\s|((?!['-])\\p{Punct})");

            for (String word : words)
            {
                if (word.length() > 0)
                {
                    word = word.toLowerCase();
                    // the entry processor creates or increments the count for this word
                    wordsCache.invoke(word, updater);
                    wordCount++;
                }
            }
            line = reader.readLine();
        }
    }
}

And the WordUpdater entry processor used to update the counts in the cache…

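import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.processor.AbstractProcessor;

import java.io.IOException;

public class WordUpdater extends AbstractProcessor implements PortableObject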
{
    @Override
    public Object process(InvocableMap.Entry entry)
    {
        Word word;
        if (entry.isPresent())
        {
            word = (Word) entry.getValue();
        } else {
            word = new Word((String) entry.getKey());
        }
        word.incrementCount();
        entry.setValue(word);
        return null;
    }

    @Override
    public void readExternal(PofReader pofReader) throws IOException
    {
    }

    @Override
    public void writeExternal(PofWriter pofWriter) throws IOException
    {
    }
}

All the code does is read lines from the .txt file (skipping the first 382 as this is an introduction by Project Gutenberg) and for each line it uses the String.split() method to split the string. The regex used to split the line into words, "\\s|((?!['-])\\p{Punct})", splits on punctuation as well as on spaces. For those not up to speed on regex I shall explain the parts:

  • The first part \\s matches whitespace
  • Then we have a | which is OR
  • Then we have ((?!['-])\\p{Punct}) which is all punctuation except for single quote and hyphen
  • The \\p{Punct} matches all punctuation
  • The (?!['-]) is a negative look ahead, i.e. not whatever is inside the square brackets, in this case single quote and hyphen

This is because I want words that end with say a comma or full stop or come straight after say a double quote to be treated like a normal word. For example, taking the opening part of the play…

Thunder and Lightning. Enter three Witches.

1. When shall we three meet againe?
In Thunder, Lightning, or in Raine?
2. When the Hurley-burley’s done,
When the Battaile’s lost, and wonne

…I want the first “Thunder” in “Thunder and Lightning” to be treated as the same word as “Thunder,” in “In Thunder, Lightning, or in Raine?” so I need to strip the comma from the second Thunder; the same applies to stripping the question mark from the end of “Raine?”. Single quotes and hyphens are excluded from the split because they are used as apostrophes and to join words, as in “Hurley-burley’s”, and we do not want this to appear as three words: “Hurley”, “burley” and “s”.
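To see the effect of the regex in isolation, here is a minimal sketch that applies it to two lines from the opening of the play. The class name WordSplitDemo and the tokenize helper are made up for this illustration and are not part of the loader; the sample lines use a plain ASCII apostrophe.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of the loader: applies the split regex to a single line. */
class WordSplitDemo
{
    // Split on whitespace, or on any ASCII punctuation except apostrophe and hyphen
    static final String SPLIT_REGEX = "\\s|((?!['-])\\p{Punct})";

    static List<String> tokenize(String line)
    {
        List<String> words = new ArrayList<>();
        for (String token : line.split(SPLIT_REGEX))
        {
            // Adjacent delimiters such as ", " produce empty tokens, so skip them
            if (token.length() > 0)
            {
                words.add(token.toLowerCase());
            }
        }
        return words;
    }

    public static void main(String[] args)
    {
        // prints [in, thunder, lightning, or, in, raine]
        System.out.println(tokenize("In Thunder, Lightning, or in Raine?"));
        // prints [when, the, hurley-burley's, done]
        System.out.println(tokenize("When the Hurley-burley's done,"));
    }
}
```

One thing worth knowing: by default \p{Punct} in Java only covers ASCII punctuation, so a typographic apostrophe in the source text would not cause a split either.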

The reason I use the WordUpdater EntryProcessor to do the update of the count is so that it is atomic. The scenario I was trying to work on originally involved multiple simultaneous books/plays being loaded together so word count needs to be updated atomically as two or more threads could try to update the same word at the same time.
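If you want to convince yourself why the atomic update matters, the following is a local analogy in plain Java rather than Coherence code. It assumes a ConcurrentHashMap standing in for the cache and its merge() method standing in for the entry processor; the class and method names are invented for the sketch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Local analogy only: a ConcurrentHashMap plays the part of the words cache. */
class AtomicCountDemo
{
    /** Starts the given number of loader threads; each one counts every word once. */
    static ConcurrentMap<String, Long> countConcurrently(String[] words, int loaders)
    {
        ConcurrentMap<String, Long> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(loaders);
        for (int i = 0; i < loaders; i++)
        {
            pool.submit(() ->
            {
                for (String word : words)
                {
                    // merge() does the read, increment and write for one key atomically
                    counts.merge(word, 1L, Long::sum);
                }
            });
        }
        pool.shutdown();
        try
        {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS))
            {
                throw new IllegalStateException("loaders did not finish in time");
            }
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for loaders", e);
        }
        return counts;
    }

    public static void main(String[] args)
    {
        String[] words = {"macbeth", "banquo", "macbeth"};
        // With 4 loaders "macbeth" is counted 8 times and "banquo" 4 times
        System.out.println(countConcurrently(words, 4));
    }
}
```

Had each thread done a separate get, increment and put, two loaders could read the same old count and one of the increments would be lost.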

Counting Words

To start with we will do a simple query to bring back the word counts so we can see what we have in the words cache. Rather than just bring back all the entries, which would be a long list, I will do a query that returns each usage count with a list of all the words that have that count; oh… and one more thing, we will only look at words with five or more letters so we do not just end up with a list of words like “a”, “to”, “and” etc…

I can do this query with standard Coherence Aggregators, in this case a combination of the DistinctValues aggregator and a GroupAggregator. To do the word length filtering I just use a PofExtractor to extract word length. So here is the code to count everything…

NamedCache wordsCache = CacheFactory.getCache("words");

ValueExtractor wordExtractor = new PofExtractor(String.class, 0);
ValueExtractor countExtractor = new PofExtractor(Long.class, 1);

GroupAggregator groupAggregator =
        GroupAggregator.createInstance(countExtractor, new DistinctValues(wordExtractor));

// the word length is written at POF index 2 in Word.writeExternal
Filter wordSizeFilter = new GreaterFilter(new PofExtractor(Integer.class, 2), 4);

Map<Long,Collection> result = (Map<Long, Collection>) wordsCache.aggregate(wordSizeFilter, groupAggregator);

System.out.println("Result: shouldCountWords");
for (Map.Entry<Long,Collection> entry : result.entrySet())
{
    System.out.println(entry.getKey() + "\t" + entry.getValue());
}

If we load the cache with the first 10,000 words from the play and then run the code above we get the following output…

1 [manhood, blowes, drowsie, inuest, scoena, bright, folly, inuite, particular, sleeue, augment, surmise, colour …
2 [robes, hearts, neyther, wearie, hauing, sences, spoke, ready, began, banquet, filthie, partner, instant …
3 [gentlemen, sorrow, halfe, greater, honors, treason, shine, ghost, winde, thanks, enemie, secunda, throat …
4 [forth, darke, faces, second, attend, reason, innocent, ‘gainst, become, chance, bring, together, takes …
5 [might, state, house, hearke, through, fortune, though, let’s, father, while, giuen, whence, almost …
6 [would’st, words, liues, honor, feares, shake, leaue, lesse, earth, fleans, comes, chamber]
7 [hence, donalbaine, being, houre, about, royall, cannot, knocking, friends, who’s, malcolme, peace …
8 [businesse, thinke, murther, lords, kings, morrow, further, keepe, sight, present, stand, three …
9 [o’th’, highnesse, euery, could, heare, against, murth, thought, duncan, onely]
10 [haile, heart, other, whose, before, seruant, heere, downe, bloody]
11 [againe, still, without, within, i’th’]
12 [these, there’s]
13 [worthy, death]
14 [those, feare, scena]
15 [strange, where, nature]
17 [exeunt, blood, looke]
16 [knock, things]
19 [selfe, lenox, sleepe, should]
21 [great, cawdor]
20 [speake, there]
23 [thane, would]
25 [night]
26 [rosse]
30 [banquo]
34 [macbeth]
32 [their]
36 [shall]
48 [enter]
57 [which]
And yes, for those of you that are not native English speakers, they really are English words, or rather quite old English (or I should say Olde English), as Shakespeare wrote Macbeth in around 1603. The document I used is scanned from an original first edition and has some archaic spellings and printing idiosyncrasies that are explained very well in the introduction of the document. If you want a more readable version there is another one here: More readable Macbeth

I have truncated some of the longer lists of words otherwise the results would have filled the page. At first it appears that the list (or Map rather) is in some sort of order, but it is not really and you can see the later entries are not in usage count order.
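For anyone who has not used GroupAggregator before, the shape of that result can be reproduced locally with plain Java streams. This is only a sketch of the same "group by count, collect distinct words, filter on length" idea, not Coherence code; WordCount and groupByCount are names I have made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Plain-Java sketch of grouping words by usage count; not Coherence code. */
class GroupByCountDemo
{
    /** Minimal stand-in for the Word cache value */
    static final class WordCount
    {
        final String word;
        final long count;

        WordCount(String word, long count)
        {
            this.word = word;
            this.count = count;
        }
    }

    /** Keep words longer than minLength, then map each count to the set of words with that count. */
    static Map<Long, Set<String>> groupByCount(List<WordCount> words, int minLength)
    {
        return words.stream()
                .filter(w -> w.word.length() > minLength)
                .collect(Collectors.groupingBy(w -> w.count,
                        Collectors.mapping(w -> w.word, Collectors.toSet())));
    }

    public static void main(String[] args)
    {
        List<WordCount> sample = Arrays.asList(
                new WordCount("great", 21),
                new WordCount("cawdor", 21),
                new WordCount("night", 25),
                // hypothetical count, only here to show the length filter dropping a short word
                new WordCount("to", 99));

        // The result is a HashMap, so the iteration order of the counts is not guaranteed
        for (Map.Entry<Long, Set<String>> entry : groupByCount(sample, 4).entrySet())
        {
            System.out.println(entry.getKey() + "\t" + entry.getValue());
        }
    }
}
```

Just like the cache query, the map that comes back has no ordering you can rely on, which is exactly the problem the rest of this post deals with.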

An Oracle Coherence Top n Query – Find the Top 5 Word Usages

So back to our original requirement of trying to find the most used words. For my query requirement I want the top five lists of most used words – and I want them in order. From the list above this would be…

57 [which]
48 [enter]
36 [shall]
34 [macbeth]
32 [their]

…we would expect to see Macbeth make it into the top list with 34 occurrences but poor old Banquo just missed out on 30 occurrences – although that is the least of Banquo’s problems as Macbeth has him murdered.

Now, to obtain the top five I could run the same query as above and then, once the results are back on the client, sort them and pull off the top five entries. That is cheating a bit, though, and although the data set is small in this case, that may not always be so. Say I had a cache holding many gigabytes of data, which is not unusual on the size of cluster I work with: I could not bring all of that back to a client to sort and cut down. It is far better, and far faster, to get each cluster member to do a bit of the work.
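The general idea of spreading the work can be sketched in plain Java. This is an illustration under my own assumptions (lists standing in for cluster members, invented class and method names) and is not a description of how any particular Coherence aggregator divides its work: each partition trims its own values to the top n, and only those partial results are combined and trimmed again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-Java sketch of a partitioned top n; the lists stand in for cluster members. */
class PartitionedTopNDemo
{
    /** Sort descending and keep at most n values; the work one member would do locally. */
    static List<Long> topN(List<Long> values, int n)
    {
        List<Long> sorted = new ArrayList<>(values);
        sorted.sort(Collections.reverseOrder());
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    /** Combine the per-partition partial results and trim once more. */
    static List<Long> topNAcrossPartitions(List<List<Long>> partitions, int n)
    {
        List<Long> candidates = new ArrayList<>();
        for (List<Long> partition : partitions)
        {
            // Only n values per partition ever travel to the caller
            candidates.addAll(topN(partition, n));
        }
        return topN(candidates, n);
    }

    public static void main(String[] args)
    {
        List<List<Long>> partitions = Arrays.asList(
                Arrays.asList(57L, 30L, 21L, 19L),
                Arrays.asList(48L, 34L, 25L, 20L),
                Arrays.asList(36L, 32L, 26L, 23L));

        // prints [57, 48, 36]
        System.out.println(topNAcrossPartitions(partitions, 3));
    }
}
```

This works for plain values because the overall top n must be contained in the union of each partition's top n. For grouped results, where the same key can turn up on several members, the partial groups have to be merged before anything is trimmed.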

— Macbeth:
Auant, & quit my sight, let the earth hide thee:
Thy bones are marrowlesse, thy blood is cold:
Thou hast no speculation in those eyes
Which thou dost glare with.
Macbeth (III, iv)

Coherence has two ways to limit the results returned from a query: a Filter, an Aggregator, or a combination of both. In this case a Filter on its own will not do the job, as it just restricts the entries returned to those matching a criterion. An aggregator, on the other hand, does have the ability to do what we want. In my example I am using a GroupAggregator, but obviously I do not want to write a specialised GroupAggregator that limits its result set, as this would not work if I wanted to limit other types of aggregator. For example, say I didn’t want the GroupBy/DistinctValues combination but just wanted the top ten longest words; I would just use a ReducerAggregator. What I need to write is an aggregator that wraps another aggregator and can limit (optionally after ordering) the results, so without further ado (or, keeping the Shakespeare theme, “without much ado”) here is the code…

/*
 * File: LimitAggregator.java
 *
 * Copyright (c) 2012. All Rights Reserved. Jonathan Knight.
 *
 * Jonathan Knight makes no representations or warranties about the
 * suitability of the software, either express or implied, including but not
 * limited to the implied warranties of merchantability, fitness for a
 * particular purpose, or non-infringement. Jonathan Knight shall not be
 * liable for any damages suffered by licensee as a result of using, modifying
 * or distributing this software or its derivatives.
 *
 * This notice may not be removed or altered.
 */
package com.thegridman.coherence.aggregators;

import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.util.Filter;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.SimpleMapEntry;
import com.tangosol.util.comparator.EntryComparator;

import java.io.IOException;
import java.util.*;

/**
 * This class is an Aggregator that wraps another aggregator to limit the amount of data returned.
 * It would typically be used with aggregators that return Collections or Maps to perform the
 * equivalent of Top <i>n</i> queries, for example, find the top 5 valuations by valuation amount.
 *
 * @author Jonathan Knight
 */
public class LimitAggregator implements InvocableMap.ParallelAwareAggregator, PortableObject
{
    /** The underlying aggregator that will obtain the un-ordered, unlimited results */
    private InvocableMap.EntryAggregator aggregator;

    /** The maximum number of results to return */
    private long limit;

    /** A flag that indicates whether the results should be ordered prior to limiting */
    private boolean ordered;

    /** A comparator to use to order the results if the ordered flag is true.
     * If null and ordered is true the results should be Comparable */
    private Comparator comparator;

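    /** Controls how a Map result is sorted prior to truncation, for example by key or by value.
     * See {@link EntryComparator} */
    private int mapComparatorTarget = EntryComparator.CMP_KEY;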

    /** Empty constructor for POF */
    public LimitAggregator()
    {
    }

    /**
     * Create an instance of a LimitAggregator that will limit the results of the specified
     * aggregator to a maximum number of optionally sorted results.
     *
     * @param aggregator - the aggregator to get the data
     * @param limit      - the maximum number of results to return
     * @param ordered    - should the results be sorted prior to limiting
     * @param comparator - optional Comparator to use to sort results
     */
    public LimitAggregator(InvocableMap.EntryAggregator aggregator, long limit, boolean ordered, Comparator comparator)
    {
        this(aggregator, limit, ordered, comparator, EntryComparator.CMP_KEY);
    }

    /**
     * Create an instance of a LimitAggregator that will limit the results of the specified
     * aggregator to a maximum number of optionally sorted results.
     *
     * @param aggregator - the aggregator to get the data
     * @param limit      - the maximum number of results to return
     * @param ordered    - should the results be sorted prior to limiting
     * @param comparator - optional Comparator to use to sort results
     * @param mapComparatorTarget - specifies how to sort a Map result set prior to truncation. See {@link EntryComparator}
     */
    public LimitAggregator(InvocableMap.EntryAggregator aggregator, long limit, boolean ordered, Comparator comparator, int mapComparatorTarget)
    {
        this.aggregator = aggregator;
        this.limit = limit;
        this.ordered = ordered;
        this.comparator = comparator;
        this.mapComparatorTarget = mapComparatorTarget;
    }

    /**
     * Implementation of {@link InvocableMap.ParallelAwareAggregator} getParallelAggregator method
     *
     * @return this aggregator
     */
    @Override
    public InvocableMap.EntryAggregator getParallelAggregator()
    {
        return this;
    }

    /**
     * Implementation of {@link InvocableMap.ParallelAwareAggregator} aggregate method
     *
     * @return the result of this aggregator
     */
    @Override
    public Object aggregate(Set entries)
    {
        return aggregator.aggregate(entries);
    }

    /**
     * Implementation of {@link InvocableMap.ParallelAwareAggregator} method to aggregate the
     * Collection of results.
     *
     * @param results - the results to be aggregated.
     * @return the aggregated results
     */
    @SuppressWarnings({"unchecked"})
    @Override
    public Object aggregateResults(Collection results)
    {
        Object limited;
        if (aggregator instanceof InvocableMap.ParallelAwareAggregator)
        {
            Object result = ((InvocableMap.ParallelAwareAggregator)aggregator).aggregateResults(results);
            if (result instanceof Collection)
            {
                limited = truncateCollection((Collection) result);
            }
            else if (result instanceof Map)
            {
                limited = truncateMap((Map) result);
            }
            else
            {
                limited = Collections.singleton(result);
            }
        }
        else
        {
            limited = truncateCollection(results);
        }
        return limited;
    }

    /**
     * Method called by clients to execute the aggregator.
     *
     * This is required to make sure that any results are correctly sorted
     * as when a SortedMap is serialized on a server and deserialized on the
     * client it is no longer sorted.
     *
     * @param map    - the Map (cache) to aggregate
     * @param filter - the Filter to apply to the aggregate call
     * @return the results of the limited aggregator call.
     */
    @SuppressWarnings({"unchecked"})
    public Object aggregate(InvocableMap map, Filter filter)
    {
        Object results = map.aggregate(filter, this);
        if (results instanceof Collection)
        {
            results = copyCollection((Collection) results);
        }
        else if (results instanceof Map)
        {
            results = copyMap((Map) results);
        }
        return results;
    }

    /**
     * Method called by clients to execute the aggregator.
     *
     * This is required to make sure that any results are correctly sorted
     * as when a SortedMap is serialized on a server and deserialized on the
     * client it is no longer sorted.
     *
     * @param map    - the Map (cache) to aggregate
     * @param keys   - the set of keys to aggregate
     * @return the results of the limited aggregator call.
     */
    @SuppressWarnings({"unchecked"})
    public Object aggregate(InvocableMap map, Collection keys)
    {
        Object results = map.aggregate(keys, this);
        if (results instanceof Collection)
        {
            results = copyCollection((Collection) results);
        }
        else if (results instanceof Map)
        {
            results = copyMap((Map) results);
        }
        return results;
    }

    /**
     * Truncate a Map down to the required size.
     *
     * @param mapToTruncate - the map to be truncated
     * @return a copy of the specified Map truncated to the required number of entries.
     */
    public <K,V> Map<K,V> truncateMap(Map<K,V> mapToTruncate)
    {
        Map<K,V> map = copyMap(mapToTruncate);
        truncate(map.keySet());
        return map;
    }

    /**
     * Truncate a Collection down to the required size.
     * The Collection will be sorted prior to truncation if required.
     *
     * @param collectionToTruncate - the Collection to be truncated
     * @return a copy of the Collection truncated to the required size.
     */
    public <T> Collection<T> truncateCollection(Collection<T> collectionToTruncate)
    {
        Collection<T> collection = copyCollection(collectionToTruncate);
        truncate(collection);
        return collection;
    }

    /**
     * Truncate a Collection.
     *
     * @param collection - the collection to truncate
     */
    private void truncate(Collection collection)
    {
        Iterator it = collection.iterator();
        int count = 0;
        while(it.hasNext())
        {
            it.next();
            if (count < limit)
            {
                count++;
                continue;
            }
            it.remove();
        }
    }

    /**
     * Create a copy of the specified Collection.
     * If the ordered field is true then the resulting Collection
     * will be ordered.
     *
     * @param collection - the collection to copy and optionally sort.
     * @return a copy, optionally sorted, of the specified Collection.
     */
    @SuppressWarnings({"unchecked"})
    public <T> Collection<T> copyCollection(Collection<T> collection)
    {
        List list = new ArrayList(collection);
        if (ordered)
        {
            Collections.sort(list, comparator);
        }
        return list;
    }

    /**
     * Create a copy of the specified Map.
     * If the ordered field is true then the resulting Map
     * will be ordered.
     *
     * @param map - the Map to copy and optionally sort.
     * @return a copy, optionally sorted, of the specified Map.
     */
    @SuppressWarnings({"unchecked"})
    public <K,V> Map<K,V> copyMap(Map<K, V> map)
    {
        Map<K,V> newMap;
        if (ordered)
        {
            newMap = new LinkedHashMap<K, V>();
            List<Map.Entry<K,V>> entries = new ArrayList<Map.Entry<K,V>>();
            for (Map.Entry entry : map.entrySet())
            {
                entries.add(new SimpleMapEntry(entry.getKey(), entry.getValue()));
            }
            EntryComparator entryComparator = new EntryComparator(comparator, mapComparatorTarget);
            Collections.sort(entries, entryComparator);
            for (Map.Entry<K,V> entry : entries)
            {
                newMap.put(entry.getKey(), entry.getValue());
            }
        }
        else
        {
            newMap = new HashMap<K,V>();
            newMap.putAll(map);
        }
        return newMap;
    }

    @Override
    public void readExternal(PofReader pofReader) throws IOException
    {
        aggregator = (InvocableMap.EntryAggregator) pofReader.readObject(0);
        limit = pofReader.readLong(1);
        comparator = (Comparator) pofReader.readObject(2);
        ordered = pofReader.readBoolean(3);
        mapComparatorTarget = pofReader.readInt(4);
    }

    @Override
    public void writeExternal(PofWriter pofWriter) throws IOException
    {
        pofWriter.writeObject(0, aggregator);
        pofWriter.writeLong(1, limit);
        pofWriter.writeObject(2, comparator);
        pofWriter.writeBoolean(3, ordered);
        pofWriter.writeInt(4, mapComparatorTarget);
    }

    public InvocableMap.EntryAggregator getAggregator()
    {
        return aggregator;
    }

    public long getLimit()
    {
        return limit;
    }

    public boolean isOrdered()
    {
        return ordered;
    }

    public Comparator getComparator()
    {
        return comparator;
    }

}
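
If you just want to see the sort-then-truncate idea that copyMap and truncate implement, without needing Coherence on the classpath, here is a minimal sketch using only java.util. To be clear, TopNSketch and topNByValue are names I have made up for illustration; they are not part of Coherence or of LimitAggregator, and the word counts are invented. It also assumes Java 8 or later, unlike the class above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of Coherence or LimitAggregator. */
final class TopNSketch
{
    private TopNSketch()
    {
    }

    /**
     * Copy the entries of a map, sort them by value and keep only the first few.
     *
     * @param source     the map to read; it is not modified
     * @param limit      the maximum number of entries to keep
     * @param valueOrder the ordering to apply to the values
     * @return a new map whose iteration order follows valueOrder
     */
    static <K, V> Map<K, V> topNByValue(Map<K, V> source, int limit, Comparator<? super V> valueOrder)
    {
        // Copy the entries into a list so that the caller's map is left untouched
        List<Map.Entry<K, V>> entries = new ArrayList<>(source.entrySet());

        // Sort the copied entries on their values
        entries.sort(Map.Entry.comparingByValue(valueOrder));

        // A LinkedHashMap iterates in insertion order, so the sorted order survives
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : entries)
        {
            if (result.size() >= limit)
            {
                break;
            }
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }

    public static void main(String[] args)
    {
        // Made-up word counts, purely for the demonstration
        Map<String, Integer> counts = new HashMap<>();
        counts.put("toad", 1);
        counts.put("the", 27);
        counts.put("pot", 3);
        counts.put("and", 19);

        // Highest counts first, keep the top two; prints {the=27, and=19}
        System.out.println(topNByValue(counts, 2, Comparator.reverseOrder()));
    }
}
```

The sketch stops adding entries once the limit is reached, whereas LimitAggregator copies everything and then removes the surplus through an iterator; the resulting contents should be the same for the same input and ordering.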

…and for those who want it (and have JUnit, Mockito and Hamcrest), here is the unit test class for LimitAggregator…

/*
 * File: LimitAggregatorTest.java
 *
 * Copyright (c) 2012. All Rights Reserved. Jonathan Knight.
 *
 * Jonathan Knight makes no representations or warranties about the
 * suitability of the software, either express or implied, including but not
 * limited to the implied warranties of merchantability, fitness for a
 * particular purpose, or non-infringement. Jonathan Knight shall not be
 * liable for any damages suffered by licensee as a result of using, modifying
 * or distributing this software or its derivatives.
 *
 * This notice may not be removed or altered.
 */
package com.thegridman.coherence.aggregators;

import com.tangosol.io.pof.ConfigurablePofContext;
import com.tangosol.util.Binary;
import com.tangosol.util.ExternalizableHelper;
import com.tangosol.util.Filter;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.aggregator.DistinctValues;
import com.tangosol.util.comparator.EntryComparator;
import com.tangosol.util.comparator.InverseComparator;
import com.tangosol.util.filter.EqualsFilter;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
import static org.hamcrest.collection.IsMapContaining.hasEntry;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyCollection;
import static org.mockito.Matchers.anySet;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * @author Jonathan Knight
 */
public class LimitAggregatorTest
{
    @Test
    public void shouldSerializeAndDeserialize() throws Exception
    {
        ConfigurablePofContext pofContext = new ConfigurablePofContext("gridman-pof-config.xml");

        LimitAggregator aggregator = new LimitAggregator(new DistinctValues(), 100L, true, new EntryComparator());
        Binary binary = ExternalizableHelper.toBinary(aggregator, pofContext);
        LimitAggregator result = (LimitAggregator) ExternalizableHelper.fromBinary(binary, pofContext);

        assertThat(result.getAggregator(), is(instanceOf(DistinctValues.class)));
        assertThat(result.getLimit(), is(100L));
        assertThat(result.isOrdered(), is(true));
        assertThat(result.getComparator(), is(instanceOf(EntryComparator.class)));
    }

    @Test
    public void shouldReturnSelfForParallelAggregator() throws Exception
    {
        LimitAggregator aggregator = new LimitAggregator(new DistinctValues(), 100L, true, new EntryComparator());
        assertThat((LimitAggregator) aggregator.getParallelAggregator(), is(sameInstance(aggregator)));
    }

    @Test
    public void shouldCreateCopyOfCollectionInSameOrder() throws Exception
    {
        LimitAggregator aggregator = new LimitAggregator(new DistinctValues(), 100L, false, null);

        Collection<String> collection = Arrays.asList("1", "3", "4", "2");

        Collection<String> result = aggregator.copyCollection(collection);
        assertThat(result, is(not(sameInstance(collection))));
        assertThat(result, contains("1", "3", "4", "2"));
    }

    @Test
    public void shouldCreateOrderedCopyOfCollection() throws Exception
    {
        LimitAggregator aggregator = new LimitAggregator(new DistinctValues(), 100L, true, null);

        Collection<String> collection = Arrays.asList("1", "4", "3", "2");

        Collection<String> result = aggregator.copyCollection(collection);
        assertThat(result, is(not(sameInstance(collection))));
        assertThat(result, contains("1", "2", "3", "4"));
    }

    @Test
    public void shouldCreateOrderedCopyOfCollectionUsingComparator() throws Exception
    {
        LimitAggregator aggregator = new LimitAggregator(new DistinctValues(), 100L, true, new InverseComparator());

        Collection<String> collection = Arrays.asList("1", "4", "3", "2");

        Collection<String> result = aggregator.copyCollection(collection);
        assertThat(result, is(not(sameInstance(collection))));
        assertThat(result, contains("4", "3", "2", "1"));
    }

    @Test
    public void shouldCreateTruncatedCopyOfCollectionInSameOrder() throws Exception
    {
        LimitAggregator aggregator = new LimitAggregator(new DistinctValues(), 3, false, null);

        Collection<String> collection = Arrays.asList("1", "3", "4", "2");

        Collection<String> result = aggregator.truncateCollection(collection);
        assertThat(result, is(not(sameInstance(collection))));
        assertThat(result, contains("1", "3", "4"));
    }

    @Test
    public void shouldCreateTruncatedOrderedCopyOfCollection() throws Exception
    {
        LimitAggregator aggregator = new LimitAggregator(new DistinctValues(), 2, true, null);

        Collection<String> collection = Arrays.asList("1", "4", "3", "2");

        Collection<String> result = aggregator.truncateCollection(collection);
        assertThat(result, is(not(sameInstance(collection))));
        assertThat(result, contains("1", "2"));
    }

    @Test
    public void shouldCreateCopyOfMap() throws Exception
    {
        LimitAggregator aggregator = new LimitAggregator(new DistinctValues(), 100L, false, null);

        Map<String,String> map = new HashMap<String,String>();
        map.put("1", "value-1");
        map.put("4", "value-4");
        map.put("3", "value-3");
        map.put("2", "value-2");

        Map<String,String> result = aggregator.copyMap(map);
        assertThat(result, is(not(sameInstance(map))));
        assertThat(result, is(map));
    }

    @Test
    public void shouldCreateOrderedCopyOfMap() throws Exception
    {
        LimitAggregator aggregator = new LimitAggregator(new DistinctValues(), 100L, true, null);

        Map<String,String> map = new HashMap<String,String>();
        map.put("1", "value-1");
        map.put("4", "value-4");
        map.put("3", "value-3");
        map.put("2", "value-2");

        Map<String,String> result = aggregator.copyMap(map);
        assertThat(result, is(not(sameInstance(map))));
        assertThat(result, hasEntry("1", "value-1"));
        assertThat(result, hasEntry("4", "value-4"));
        assertThat(result, hasEntry("3", "value-3"));
        assertThat(result, hasEntry("2", "value-2"));
        assertThat(result.keySet(), contains("1", "2", "3", "4"));
    }

    @Test
    public void shouldCreateOrderedCopyOfMapUsingComparator() throws Exception
    {
        LimitAggregator aggregator = new LimitAggregator(new DistinctValues(), 100L, true, new InverseComparator());

        Map<String,String> map = new HashMap<String,String>();
        map.put("1", "value-1");
        map.put("4", "value-4");
        map.put("3", "value-3");
        map.put("2", "value-2");

        Map<String,String> result = aggregator.copyMap(map);
        assertThat(result, is(not(sameInstance(map))));
        assertThat(result, hasEntry("1", "value-1"));
        assertThat(result, hasEntry("4", "value-4"));
        assertThat(result, hasEntry("3", "value-3"));
        assertThat(result, hasEntry("2", "value-2"));
        assertThat(result.keySet(), contains("4", "3", "2", "1"));
    }

    @Test
    public void shouldCreateOrderedCopyOfMapSortingValues() throws Exception
    {
        LimitAggregator aggregator = new LimitAggregator(new DistinctValues(), 100L, true, null, EntryComparator.CMP_VALUE);

        Map<String,String> map = new HashMap<String,String>();
        map.put("1", "value-C");
        map.put("4", "value-D");
        map.put("3", "value-B");
        map.put("2", "value-A");

        Map<String,String> result = aggregator.copyMap(map);
        assertThat(result, is(not(sameInstance(map))));
        assertThat(result, hasEntry("1", "value-C"));
        assertThat(result, hasEntry("4", "value-D"));
        assertThat(result, hasEntry("3", "value-B"));
        assertThat(result, hasEntry("2", "value-A"));
        assertThat(result.keySet(), contains("2", "3", "1", "4"));
    }

    @Test
    public void shouldCreateOrderedCopyOfMapSortingValuesUsingComparator() throws Exception
    {
        LimitAggregator aggregator = new LimitAggregator(new DistinctValues(), 100L, true, new InverseComparator(), EntryComparator.CMP_VALUE);

        Map<String,String> map = new HashMap<String,String>();
        map.put("1", "value-C");
        map.put("4", "value-D");
        map.put("3", "value-B");
        map.put("2", "value-A");

        Map<String,String> result = aggregator.copyMap(map);
        assertThat(result, is(not(sameInstance(map))));
        assertThat(result, hasEntry("1", "value-C"));
        assertThat(result, hasEntry("4", "value-D"));
        assertThat(result, hasEntry("3", "value-B"));
        assertThat(result, hasEntry("2", "value-A"));
        assertThat(result.keySet(), contains("4", "1", "3", "2"));
    }

    @Test
    public void shouldCreateTruncatedCopyOfMap() throws Exception
    {
        LimitAggregator aggregator = new LimitAggregator(new DistinctValues(), 3, false, null);

        Map<String,String> map = new HashMap<String,String>();
        map.put("1", "value-1");
        map.put("4", "value-4");
        map.put("3", "value-3");
        map.put("2", "value-2");

        Map<String,String> result = aggregator.truncateMap(map);
        assertThat(result, is(not(sameInstance(map))));
        assertThat(result.size(), is(3));
    }

    @Test
    public void shouldCreateOrderedTruncatedCopyOfMap() throws Exception
    {
        LimitAggregator aggregator = new LimitAggregator(new DistinctValues(), 2, true, null);

        Map<String,String> map = new HashMap<String,String>();
        map.put("1", "value-1");
        map.put("4", "value-4");
        map.put("3", "value-3");
        map.put("2", "value-2");

        Map<String,String> result = aggregator.truncateMap(map);
        assertThat(result, is(not(sameInstance(map))));
        assertThat(result, hasEntry("1", "value-1"));
        assertThat(result, hasEntry("2", "value-2"));
        assertThat(result.keySet(), contains("1", "2"));
    }

    @Test
    public void shouldCallAggregatorToAggregateEntries() throws Exception
    {
        InvocableMap.EntryAggregator aggregator = mock(InvocableMap.EntryAggregator.class);
        Set entries = new HashSet();
        Object aggregatorResult = new Object();

        when(aggregator.aggregate(anySet())).thenReturn(aggregatorResult);

        LimitAggregator limitAggregator = new LimitAggregator(aggregator, 2, true, null);
        Object result = limitAggregator.aggregate(entries);

        assertThat(result, is(sameInstance(aggregatorResult)));
        verify(aggregator).aggregate(same(entries));
    }

    @SuppressWarnings({"unchecked"})
    @Test
    public void shouldJustTruncateResultWhenAggregatorIsNotParalleAware() throws Exception
    {
        Collection<String> collection = Arrays.asList("1", "4", "3", "2");
        InvocableMap.EntryAggregator aggregator = mock(InvocableMap.EntryAggregator.class);

        LimitAggregator limitAggregator = new LimitAggregator(aggregator, 2, true, null);
        Collection<String> result = (Collection<String>) limitAggregator.aggregateResults(collection);
        assertThat(result, contains("1", "2"));
    }

    @SuppressWarnings({"unchecked"})
    @Test
    public void shouldUseUnderlyingParallelAggregatorToAggregateResultsAndTruncateReturnedCollection() throws Exception
    {
        Collection<String> collection = Arrays.asList("1", "4", "3", "2");
        Collection<String> aggregatorResult = Arrays.asList("9", "7", "8", "6");
        InvocableMap.ParallelAwareAggregator aggregator = mock(InvocableMap.ParallelAwareAggregator.class);

        when(aggregator.aggregateResults(anyCollection())).thenReturn(aggregatorResult);

        LimitAggregator limitAggregator = new LimitAggregator(aggregator, 2, true, null);
        Collection<String> result = (Collection<String>) limitAggregator.aggregateResults(collection);

        verify(aggregator).aggregateResults(collection);
        assertThat(result, contains("6", "7"));
    }

    @SuppressWarnings({"unchecked"})
    @Test
    public void shouldUseUnderlyingParallelAggregatorToAggregateResultsAndTruncateReturnedMap() throws Exception
    {
        Collection<String> collection = Arrays.asList("1", "4", "3", "2");
        Map<String,String> aggregatorResult = new HashMap<String, String>();
        aggregatorResult.put("1", "value-1");
        aggregatorResult.put("4", "value-4");
        aggregatorResult.put("3", "value-3");
        aggregatorResult.put("2", "value-2");

        InvocableMap.ParallelAwareAggregator aggregator = mock(InvocableMap.ParallelAwareAggregator.class);

        when(aggregator.aggregateResults(anyCollection())).thenReturn(aggregatorResult);

        LimitAggregator limitAggregator = new LimitAggregator(aggregator, 2, true, null);
        Map<String,String> result = (Map<String,String>) limitAggregator.aggregateResults(collection);

        verify(aggregator).aggregateResults(collection);
        assertThat(result.keySet(), contains("1", "2"));
        assertThat(result, hasEntry("1", "value-1"));
        assertThat(result, hasEntry("2", "value-2"));
    }

    @SuppressWarnings({"unchecked"})
    @Test
    public void shouldJustReturnSingletonCollectionWhenParallelResultIsObject() throws Exception
    {
        Collection<String> collection = Arrays.asList("1", "4", "3", "2");
        Long aggregatorResult = 1234L;

        InvocableMap.ParallelAwareAggregator aggregator = mock(InvocableMap.ParallelAwareAggregator.class);

        when(aggregator.aggregateResults(anyCollection())).thenReturn(aggregatorResult);

        LimitAggregator limitAggregator = new LimitAggregator(aggregator, 2, true, null);
        Collection<Long> result = (Collection<Long>) limitAggregator.aggregateResults(collection);

        verify(aggregator).aggregateResults(collection);
        assertThat(result, contains(aggregatorResult));
    }

    @SuppressWarnings({"unchecked"})
    @Test
    public void shouldSortFinalCollectionResultFromCacheFilterAggregate() throws Exception
    {
        Collection<String> collection = Arrays.asList("1", "4", "3", "2");

        InvocableMap.ParallelAwareAggregator aggregator = mock(InvocableMap.ParallelAwareAggregator.class);
        InvocableMap cache = mock(InvocableMap.class);
        Filter filter = new EqualsFilter();

        when(cache.aggregate(any(Filter.class), any(InvocableMap.EntryAggregator.class))).thenReturn(collection);

        LimitAggregator limitAggregator = new LimitAggregator(aggregator, 4, true, null);
        Collection<String> result = (Collection<String>) limitAggregator.aggregate(cache, filter);
        assertThat(result, contains("1", "2", "3", "4"));
    }

    @SuppressWarnings({"unchecked"})
    @Test
    public void shouldSortFinalMapResultFromCacheFilterAggregate() throws Exception
    {
        Map<String,String> aggregatorResult = new HashMap<String, String>();
        aggregatorResult.put("1", "value-1");
        aggregatorResult.put("4", "value-4");
        aggregatorResult.put("3", "value-3");
        aggregatorResult.put("2", "value-2");

        InvocableMap.ParallelAwareAggregator aggregator = mock(InvocableMap.ParallelAwareAggregator.class);
        InvocableMap cache = mock(InvocableMap.class);
        Filter filter = new EqualsFilter();

        when(cache.aggregate(any(Filter.class), any(InvocableMap.EntryAggregator.class))).thenReturn(aggregatorResult);

        LimitAggregator limitAggregator = new LimitAggregator(aggregator, 4, true, null);
        Map<String,String> result = (Map<String, String>) limitAggregator.aggregate(cache, filter);
        assertThat(result, is(aggregatorResult));
        assertThat(result.keySet(), contains("1", "2", "3", "4"));
    }

    @SuppressWarnings({"unchecked"})
    @Test
    public void shouldSortFinalCollectionResultFromCacheKeySetAggregate() throws Exception
    {
        Collection<String> collection = Arrays.asList("1", "4", "3", "2");

        InvocableMap.ParallelAwareAggregator aggregator = mock(InvocableMap.ParallelAwareAggregator.class);
        InvocableMap cache = mock(InvocableMap.class);
        Collection keys = Arrays.asList("A", "B");

        when(cache.aggregate(anyCollection(), any(InvocableMap.EntryAggregator.class))).thenReturn(collection);

        LimitAggregator limitAggregator = new LimitAggregator(aggregator, 4, true, null);
        Collection<String> result = (Collection<String>) limitAggregator.aggregate(cache, keys);
        assertThat(result, contains("1", "2", "3", "4"));
    }

    @SuppressWarnings({"unchecked"})
    @Test
    public void shouldSortFinalMapResultFromCacheKeySetAggregate() throws Exception
    {
        Map<String,String> aggregatorResult = new HashMap<String, String>();
        aggregatorResult.put("1", "value-1");
        aggregatorResult.put("4", "value-4");
        aggregatorResult.put("3", "value-3");
        aggregatorResult.put("2", "value-2");

        InvocableMap.ParallelAwareAggregator aggregator = mock(InvocableMap.ParallelAwareAggregator.class);
        InvocableMap cache = mock(InvocableMap.class);
        Collection keys = Arrays.asList("A", "B");

        when(cache.aggregate(anyCollection(), any(InvocableMap.EntryAggregator.class))).thenReturn(aggregatorResult);

        LimitAggregator limitAggregator = new LimitAggregator(aggregator, 4, true, null);
        Map<String,String> result = (Map<String, String>) limitAggregator.aggregate(cache, keys);
        assertThat(result, is(aggregatorResult));
        assertThat(result.keySet(), contains("1", "2", "3", "4"));
    }
}

There is nothing too complex in the LimitAggregator; it is a pretty standard InvocableMap.ParallelAwareAggregator and PortableObject implementation.
The getParallelAggregator() method just returns itself. The aggregator is thread safe, provided the underlying aggregator and Comparator are also thread safe, so there is no harm in doing this.
The aggregate(Set entries) method just calls the underlying aggregator and returns whatever result it returns; we do not need to do anything special at this stage.
The bulk of the work is done in the aggregateResults(Collection results) method. In this method we do the following:

  • Check to see if the underlying aggregator is an InvocableMap.ParallelAwareAggregator. If it is not, we just assume the results Collection is the set of results to be truncated.
  • If we do have an InvocableMap.ParallelAwareAggregator then we call it to do the final aggregation of the results and then, depending on what is returned from that call, we process these results further.
  • If the underlying aggregator returns a Collection then we truncate it.
  • If the underlying aggregator returns a Map then we truncate it.
  • If the underlying aggregator returns anything else then we wrap it in a Collection and return that.
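
To make the decision flow in the list above a little more concrete, here is a minimal plain-Java sketch of it. This is not the LimitAggregator source: it uses no Coherence types, the class name AggregateResultsSketch is made up for illustration, the combiner function is a hypothetical stand-in for the final aggregation step of a parallel-aware underlying aggregator (null meaning "not parallel aware"), and sorting is left out so that only the branching is shown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java sketch of the aggregateResults decision flow; no Coherence types are used.
final class AggregateResultsSketch
{
    // 'combiner' is a hypothetical stand-in for the final aggregation step of a
    // parallel-aware underlying aggregator; null means "not parallel aware".
    static Object aggregateResults(Collection<?> partials,
                                   Function<Collection<?>, Object> combiner,
                                   int limit)
    {
        if (combiner == null)
        {
            // Not parallel aware: treat the partial results as the result set.
            return truncateCollection(partials, limit);
        }

        Object combined = combiner.apply(partials);
        if (combined instanceof Collection)
        {
            return truncateCollection((Collection<?>) combined, limit);
        }
        if (combined instanceof Map)
        {
            return truncateMap((Map<?, ?>) combined, limit);
        }
        // Any other result type is wrapped in a single-element collection.
        return Collections.singletonList(combined);
    }

    // Keeps the first 'limit' elements in iteration order (sorting is left out here).
    static List<Object> truncateCollection(Collection<?> source, int limit)
    {
        List<Object> copy = new ArrayList<>(source);
        return new ArrayList<>(copy.subList(0, Math.min(limit, copy.size())));
    }

    // Keeps the first 'limit' entries in iteration order (sorting is left out here).
    static Map<Object, Object> truncateMap(Map<?, ?> source, int limit)
    {
        Map<Object, Object> copy = new LinkedHashMap<>();
        for (Map.Entry<?, ?> entry : source.entrySet())
        {
            if (copy.size() >= limit)
            {
                break;
            }
            copy.put(entry.getKey(), entry.getValue());
        }
        return copy;
    }

    public static void main(String[] args)
    {
        Collection<String> partials = Arrays.asList("1", "4", "3", "2");
        System.out.println(aggregateResults(partials, null, 2));             // [1, 4]
        System.out.println(aggregateResults(partials, results -> 1234L, 2)); // [1234]
    }
}
```

The last branch mirrors the shouldJustReturnSingletonCollectionWhenParallelResultIsObject test above: a single value such as a Long comes back wrapped in a one-element Collection.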

The methods that truncate and order either a Collection or a Map result set need a little explaining though. We will start with the simple case, which is dealing with a Collection of results.

Truncating Collections
In our aggregateResults method if we have results that are a Collection then we call the truncateCollection method.
The truncateCollection method first calls copyCollection that creates a copy of the results. If the ordered flag is true then the copy of the Collection will be sorted using the comparator.
The truncateCollection method then truncates the copy down to the required size and returns it.
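
As a rough illustration of the copy, sort, then truncate sequence described above, here is a stand-alone sketch. It is an assumption about the shape of the logic rather than the real LimitAggregator methods: the class name CollectionTruncationSketch is made up, and the ordered flag and comparator are passed as parameters here instead of being fields on the aggregator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

// Stand-alone sketch of "copy, optionally sort, then truncate" for a Collection.
final class CollectionTruncationSketch
{
    // Copies the source so the caller's collection is never modified.
    static <T> List<T> copyCollection(Collection<T> source, boolean ordered,
                                      Comparator<? super T> comparator)
    {
        List<T> copy = new ArrayList<>(source);
        if (ordered)
        {
            // A null comparator means natural ordering, so elements must be Comparable.
            copy.sort(comparator);
        }
        return copy;
    }

    // Copies (and optionally sorts) the source, then keeps at most 'limit' elements.
    static <T> List<T> truncateCollection(Collection<T> source, int limit, boolean ordered,
                                          Comparator<? super T> comparator)
    {
        List<T> copy = copyCollection(source, ordered, comparator);
        return new ArrayList<>(copy.subList(0, Math.min(limit, copy.size())));
    }

    public static void main(String[] args)
    {
        List<String> values = Arrays.asList("1", "4", "3", "2");
        System.out.println(truncateCollection(values, 2, true, null));                      // [1, 2]
        System.out.println(truncateCollection(values, 2, true, Comparator.reverseOrder())); // [4, 3]
        System.out.println(truncateCollection(values, 3, false, null));                     // [1, 4, 3]
    }
}
```

Sorting before truncating is the important part: truncating first would keep an arbitrary n elements and then sort them, which is not a Top n result.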

Truncating Maps
In our aggregateResults method if we have results that are a Map then we call the truncateMap method.
The truncateMap method first calls the copyMap method that makes a copy of the results and if the ordered flag is true then the copy will be sorted.
Sorting the copy of the Map uses the Coherence EntryComparator class to do the sorting as this allows us to have a Comparator that targets either the Map’s key or value or combination of the two; we cover this in our final example later.
The truncateMap method then takes the copied Map and truncates it down to the required size – this is done by just truncating its keySet.
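
The Map case can be sketched in the same way. Again this is only an illustration under assumptions, not the LimitAggregator code: the class MapTruncationSketch and the Target enum are hypothetical stand-ins for the key or value targeting that EntryComparator provides, and a plain LinkedHashMap is used to hold the sorted copy.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-alone sketch of "copy, optionally sort, then truncate" for a Map.
final class MapTruncationSketch
{
    // Hypothetical stand-in for choosing whether the sort targets keys or values.
    enum Target { KEY, VALUE }

    static <K extends Comparable<? super K>, V extends Comparable<? super V>>
    Map<K, V> copyMap(Map<K, V> source, boolean ordered, boolean reverse, Target target)
    {
        List<Map.Entry<K, V>> entries = new ArrayList<>(source.entrySet());
        if (ordered)
        {
            Comparator<Map.Entry<K, V>> comparator;
            if (target == Target.VALUE)
            {
                comparator = Map.Entry.comparingByValue();
            }
            else
            {
                comparator = Map.Entry.comparingByKey();
            }
            entries.sort(reverse ? comparator.reversed() : comparator);
        }
        // A LinkedHashMap remembers insertion order, so the sorted order survives.
        Map<K, V> copy = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : entries)
        {
            copy.put(entry.getKey(), entry.getValue());
        }
        return copy;
    }

    static <K extends Comparable<? super K>, V extends Comparable<? super V>>
    Map<K, V> truncateMap(Map<K, V> source, int limit, boolean ordered, boolean reverse, Target target)
    {
        Map<K, V> copy = copyMap(source, ordered, reverse, target);
        // Truncate by removing every key after the first 'limit' keys.
        Iterator<K> keys = copy.keySet().iterator();
        for (int i = 0; keys.hasNext(); i++)
        {
            keys.next();
            if (i >= limit)
            {
                keys.remove();
            }
        }
        return copy;
    }

    public static void main(String[] args)
    {
        Map<String, String> map = new HashMap<>();
        map.put("1", "value-C");
        map.put("4", "value-D");
        map.put("3", "value-B");
        map.put("2", "value-A");

        System.out.println(truncateMap(map, 2, true, false, Target.KEY).keySet());   // [1, 2]
        System.out.println(truncateMap(map, 2, true, false, Target.VALUE).keySet()); // [2, 3]
        System.out.println(truncateMap(map, 2, true, true, Target.VALUE).keySet());  // [4, 1]
    }
}
```

The sample data is the same as in the shouldCreateOrderedCopyOfMapSortingValues test, so sorting on values gives the key order 2, 3, 1, 4 before truncation.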

— Lady Macbeth:
Come you Spirits,
That tend on mortall thoughts, vnsex me here,
And fill me from the Crowne to the Toe, top-full
Of direst Crueltie: make thick my blood,
Stop vp th’ accesse, and passage to Remorse,
That no compunctious visitings of Nature
Shake my fell purpose, nor keepe peace betweene
Th’ effect, and hit. Come to my Womans Brests,
And take my Milke for Gall, you murth’ring Ministers,
Where-euer, in your sightlesse substances,
You wait on Natures Mischiefe. Come thick Night,
And pall thee in the dunnest smoake of Hell
Macbeth (I, v)

When the LimitAggregator has run it will return either a Map, if the underlying aggregator returns a Map, or it will return a Collection.

One final problem is that if we use this aggregator just like a normal aggregator then it will return the Collection or Map and Coherence will serialize this, send it over the wire, and deserialize it. For a Collection we will be OK, as the order will be maintained during serialization, but for a Map it will not. For that reason I added two methods, aggregate(InvocableMap map, Filter filter) and aggregate(InvocableMap map, Collection keys), to the LimitAggregator. All these methods do is perform the normal call to cache.aggregate and then, if required, sort the returned value.

So finally, here is the client side code to perform our original requirement: get the top five most used lists of words of five letters or longer.

NamedCache wordsCache = CacheFactory.getCache("words");

ValueExtractor wordExtractor = new PofExtractor(String.class, 0);
ValueExtractor countExtractor = new PofExtractor(Long.class, 1);

GroupAggregator groupAggregator =
        GroupAggregator.createInstance(countExtractor, new DistinctValues(wordExtractor));

Comparator comparator = new InverseComparator();
LimitAggregator aggregator = new LimitAggregator(groupAggregator, 5, true, comparator);
Filter wordSizeFilter = new GreaterFilter(new PofExtractor(Integer.class, 2), 4);

Map<Long,Collection> result = (Map<Long, Collection>) aggregator.aggregate(wordsCache, wordSizeFilter);

System.out.println("Result: shouldCountWordsWithLimit");
for (Map.Entry<Long,Collection> entry : result.entrySet())
{
    System.out.println(entry.getKey() + "\t" + entry.getValue());
}

Which outputs the results we expected

57 [which]
48 [enter]
36 [shall]
34 [macbeth]
32 [their]

So there we are, a working, pretty generic, LimitAggregator which can be used to limit the result set of an underlying aggregator, optionally sorting the results.

More Oracle Coherence Top n Query Examples

So far in our example we have only wrapped a GroupAggregator inside our LimitAggregator but just to show this works with any aggregator we will do a few more queries.

A DistinctValues Aggregator Example

This time we want to find the first 10 words (sorted alphabetically) that start with the letter “M”.

  1. NamedCache wordsCache = CacheFactory.getCache("words");
  2.  
  3. ValueExtractor wordExtractor = new PofExtractor(String.class, 0);
  4.  
  5. InvocableMap.EntryAggregator aggregator = new DistinctValues(wordExtractor);
  6.  
  7. Filter wordFilter = new LikeFilter(wordExtractor, "m%", '\\', true);
  8.  
  9. LimitAggregator limitAggregator = new LimitAggregator(aggregator, 10, true, null);
  10.  
  11. Object result = limitAggregator.aggregate(wordsCache, wordFilter);
  12. System.out.println("Result:");
  13. System.out.println(result);

In this case we just use the DistinctValues aggregator created on line 5 to extract the words. On line 7 we create a LikeFilter that will match any String starting with “m”. We then create the LimitAggregator on line 9, passing in our DistinctValues aggregator, a limit of 10 and the sort flag set to true. There is no need to specify a Comparator as the values returned will be Strings, which are already Comparable. Running the above query against the same words cache we get…

[m, mac, macb, macbeth, macbeths, macd, macdonwald, macduff, macduffe, mad]

…a list of 10 words beginning with “M” (as we asked for) sorted alphabetically.

A ReducerAggregator Example

A ReducerAggregator is used to run a ValueExtractor against cache entries and return the extracted values. The result returned by the ReducerAggregator is a Map where the key is the key of the cache entry and the value is the extracted value. Looking back at our Word class, we see we have a word length field so as an example using the ReducerAggregator we are going to find the Top n longest words in the cache. The ReducerAggregator to extract word length would look like this…

ValueExtractor lengthExtractor = new PofExtractor(Integer.class, 2);
InvocableMap.EntryAggregator aggregator = new ReducerAggregator(lengthExtractor);

…and if we run it against the words cache and list the Map of returned results we see this…

ayre-drawne-dagger 18
selfe-comparisons 17
temple-haunting 15
trumpet-tongu’d 15
hurley-burley’s 15
heat-oppressed 14
new-borne-babe 14
deuill-porter 13
assassination 13
braine-sickly 13
vnaccompanied 13
gallowgrosses 13
distinguishes 13
nose-painting 13
chamberlaines 13
commendations 13
multitudinous 13
supernaturall 13
metaphysicall 13
construction 12
master-peece 12
… and so on …

I have manually sorted the above list to show the longest words; the result Map returned by Coherence is not actually ordered at all. You can see that Shakespeare likes his hyphenated words and, if you are wondering, “ayre-drawne-dagger” means “air drawn dagger”, which is the imaginary dagger Macbeth sees before him (Act II scene ii, quoted earlier).

We can now wrap the same ReducerAggregator inside our LimitAggregator and run the same query to bring back the top n words by length. There is a “but” though…
Previously we sorted the result Map by key, but this time the length is in the Map values. This is why our LimitAggregator has functionality to sort and truncate a Map based on keys, values or a combination of the two. In this case we want to sort on values.
So let's find the four longest words in the cache…

  1. NamedCache wordsCache = CacheFactory.getCache("words");
  2.  
  3. ValueExtractor lengthExtractor = new PofExtractor(Integer.class, 2);
  4.  
  5. InvocableMap.EntryAggregator aggregator = new ReducerAggregator(lengthExtractor);
  6.  
  7. LimitAggregator limitAggregator =
  8.     new LimitAggregator(aggregator, 4, true, new InverseComparator(), EntryComparator.CMP_VALUE);
  9.  
  10. Map result = (Map) limitAggregator.aggregate(wordsCache, AlwaysFilter.INSTANCE);
  11.  
  12. System.out.println("Result: shouldGetLongestFourWordsInOrder");
  13. for (Map.Entry entry : (Set<Map.Entry>) result.entrySet())
  14. {
  15.     System.out.println(entry.getKey() + "\t" + entry.getValue());
  16. }

On line 5 we create our ReducerAggregator, passing in the lengthExtractor, which is just the PofExtractor we created on line 3 to extract the length as an Integer.
On line 7 we create the LimitAggregator. We pass in our ReducerAggregator, a limit of 4, set the sorted flag to true and use a Coherence InverseComparator, which will sort in reverse order. Finally we tell the LimitAggregator we want the Comparator to target the values of the Map rather than the default of targeting the key. Obviously specifying a sort target only makes sense if the underlying aggregator is going to return a Map, which a ReducerAggregator does. There would be no point doing this with a DistinctValues aggregator, and the code in the LimitAggregator will ignore that value for non-Map results.
If we run the code above to get the top four words by length we see the correct results (sort of).

ayre-drawne-dagger 18
selfe-comparisons 17
temple-haunting 15
trumpet-tongu’d 15

There is no guarantee which of the words of 15 characters would be returned (in this case we got “temple-haunting” and “trumpet-tongu’d”; you’ll have to Google for what they mean). If we wanted to control the order of words with the same length then we would need to supply a more complex Comparator that also sorted the words of the same length alphabetically. Believe it or not we can do this with built-in Coherence Comparator classes (and classes you might not realise are Comparators). Here is the code…

  1. NamedCache wordsCache = CacheFactory.getCache("words");
  2.  
  3. ValueExtractor lengthExtractor = new PofExtractor(Integer.class, 2);
  4.  
  5. InvocableMap.EntryAggregator aggregator = new ReducerAggregator(lengthExtractor);
  6.  
  7. Comparator comparator = new ChainedComparator(
  8.         new InverseComparator(),
  9.         new KeyExtractor()
  10. );
  11.  
  12. LimitAggregator limitAggregator = new LimitAggregator(aggregator, 5, true, comparator, EntryComparator.CMP_ENTRY);
  13. Map result = (Map) limitAggregator.aggregate(wordsCache, AlwaysFilter.INSTANCE);
  14.  
  15. System.out.println("Result: shouldGetLongestFiveWordsInOrderWithSameLengthWordsSorted");
  16. for (Map.Entry entry : (Set<Map.Entry>) result.entrySet())
  17. {
  18.     System.out.println(entry.getKey() + "\t" + entry.getValue());
  19. }

The code is almost identical to the previous code with the exception that we now use a limit of 5 in our LimitAggregator and we use a ChainedComparator on line 7.

— Lady Macbeth:
Glamys thou art, and Cawdor, and shalt be
What thou art promis’d: yet doe I feare thy Nature,
It is too full o’th’ Milke of humane kindnesse,
To catch the neerest way. Thou would’st be great,
Art not without Ambition, but without
The illnesse should attend it.
Macbeth (I, v)

When we construct the LimitAggregator on line 12 you can see we target the Comparator to the result Map.Entry as a whole using the EntryComparator.CMP_ENTRY parameter; this will make sense when we describe the Comparator.
A Coherence ChainedComparator does just what its name suggests: it applies its comparators in order, chaining them together so we can have a nested sort order.
Now I can see people here going “Hang on, why are you passing a KeyExtractor to the ChainedComparator?” Well, believe it or not, almost all Coherence built-in ValueExtractor implementations also implement Comparator, or more specifically the Coherence QueryMapComparator interface, which means they can also be used to sort Map.Entry collections. See, Coherence never ceases to turn up something new after years of using it.
So in our case the first level of our ChainedComparator, on line 8, is an InverseComparator, which sorts in reverse order. An InverseComparator is entry-aware and by default will target the entry value, in this case the word length, so our first level of sorting is length, longest first.
The next level of the ChainedComparator, on line 9, is a KeyExtractor, which as we have already said is also a Comparator, but being a KeyExtractor it targets the key of the Map entry, in this case the word itself, so the second level of our sort is the word.
This time we asked for the top five words by word length and then alphabetically, and here are the results…

ayre-drawne-dagger 18
selfe-comparisons 17
hurley-burley’s 15
temple-haunting 15
trumpet-tongu’d 15

You can see that words with the same length are now sorted alphabetically, all done with built-in Coherence Comparators and classes you probably never realised were Comparators, unless you are totally geeky and read and digest every word of the JavaDocs.
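If the nested sort order is easier to picture without Coherence in the way, here is a minimal plain-JDK sketch of the same idea: sort word/length entries by value descending, break ties on the key, then truncate to n. It is only an illustration of the ordering, not how LimitAggregator works internally; the class name TopNSketch and the topN method are made up for this example, and the sample words are simply the ones from the output above with ASCII apostrophes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-JDK illustration only; TopNSketch and topN are hypothetical names,
// not part of Coherence or of the LimitAggregator described in this blog.
public class TopNSketch {

    // First level: entry value (word length), reversed so the longest comes first.
    // Second level: entry key (the word), so equal lengths sort alphabetically.
    static final Comparator<Map.Entry<String, Integer>> BY_LENGTH_THEN_WORD =
            Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder())
                    .thenComparing(Map.Entry.comparingByKey());

    // Sort a copy of the entries and keep at most the first n of them.
    static List<Map.Entry<String, Integer>> topN(Map<String, Integer> lengths, int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(lengths.entrySet());
        entries.sort(BY_LENGTH_THEN_WORD);
        return new ArrayList<>(entries.subList(0, Math.min(n, entries.size())));
    }

    public static void main(String[] args) {
        String[] words = {
            "temple-haunting", "heat-oppressed", "ayre-drawne-dagger",
            "trumpet-tongu'd", "selfe-comparisons", "hurley-burley's"
        };
        Map<String, Integer> lengths = new HashMap<>();
        for (String word : words) {
            lengths.put(word, word.length());
        }
        for (Map.Entry<String, Integer> entry : topN(lengths, 5)) {
            System.out.println(entry.getKey() + "\t" + entry.getValue());
        }
    }
}
```

Because the second level of the comparator decides between words of equal length, the three 15-character words always come back in the same alphabetical order, whatever order the HashMap happens to iterate in.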

A Complex Top n Query

Finally, what if we wanted something even more complex? Say we wanted the whole cache entry in the results returned to the client, but with the set of result entries sorted and truncated on one or more attributes of the value. In our scenario we will bring back the whole Word entry from the cache. We want the top 15 entries ordered by word length, then usage count, then alphabetically.
Here is the code…

NamedCache wordsCache = CacheFactory.getCache("words");

InvocableMap.EntryAggregator aggregator = new ReducerAggregator(new IdentityExtractor());

Comparator comparator = new EntryComparator(
        new ChainedComparator( new Comparator[] {
                new InverseComparator(new ReflectionExtractor("getLength")),
                new ReflectionExtractor("getCount"),
                new ReflectionExtractor("getWord")
        })
);

LimitAggregator limitAggregator = new LimitAggregator(aggregator, 15, true, comparator, EntryComparator.CMP_ENTRY);
Map result = (Map) limitAggregator.aggregate(wordsCache, AlwaysFilter.INSTANCE);

System.out.println("Result: shouldGetTopFifteenEntriesByLengthTheUsageThenAlphabetically");
for (Map.Entry entry : (Set<Map.Entry>) result.entrySet())
{
    System.out.println(entry.getKey() + "\t" + entry.getValue());
}

As in the previous example, where we were able to use a ValueExtractor as a Comparator, we can do the same thing again here. This time we create a ChainedComparator and within it we put ValueExtractors to extract the fields we want to sort on. We cannot use a PofExtractor here as the values will be real objects, so we use ReflectionExtractors.
Running the code gives us the following results…

[System:out] 3929: ayre-drawne-dagger Word{count=1, word=’ayre-drawne-dagger’, length=18}
[System:out] 3930: selfe-comparisons Word{count=1, word=’selfe-comparisons’, length=17}
[System:out] 3931: hurley-burley’s Word{count=1, word=’hurley-burley’s’, length=15}
[System:out] 3932: temple-haunting Word{count=1, word=’temple-haunting’, length=15}
[System:out] 3933: trumpet-tongu’d Word{count=1, word=’trumpet-tongu’d’, length=15}
[System:out] 3934: heat-oppressed Word{count=1, word=’heat-oppressed’, length=14}
[System:out] 3935: new-borne-babe Word{count=1, word=’new-borne-babe’, length=14}
[System:out] 3936: assassination Word{count=1, word=’assassination’, length=13}
[System:out] 3937: braine-sickly Word{count=1, word=’braine-sickly’, length=13}
[System:out] 3938: chamberlaines Word{count=1, word=’chamberlaines’, length=13}
[System:out] 3939: commendations Word{count=1, word=’commendations’, length=13}
[System:out] 3940: deuill-porter Word{count=1, word=’deuill-porter’, length=13}
[System:out] 3941: distinguishes Word{count=1, word=’distinguishes’, length=13}
[System:out] 3942: gallowgrosses Word{count=1, word=’gallowgrosses’, length=13}
[System:out] 3943: metaphysicall Word{count=1, word=’metaphysicall’, length=13}

You can see that within the set of words of 13 characters they are sorted by usage, then alphabetically.
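To make the “an extractor can also be a Comparator” trick a little less magical, here is a small plain-Java sketch of the pattern. Everything in it is hypothetical and invented for illustration (GetterExtractor, the cut-down Word class, the topN helper and the sample counts); it is not the Coherence source, just one way such a class could compare two objects by comparing the values it extracts from them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

// Illustration only: none of these classes are Coherence classes.
public class ExtractorComparatorSketch {

    // Cut-down stand-in for the blog's Word value class.
    static final class Word {
        private final String word;
        private final int count;

        Word(String word, int count) {
            this.word = word;
            this.count = count;
        }

        String getWord() { return word; }
        int getCount() { return count; }
        int getLength() { return word.length(); }
    }

    // Hypothetical extractor that doubles as a Comparator:
    // two targets are compared by comparing the values extracted from them.
    static final class GetterExtractor<T, E extends Comparable<E>> implements Comparator<T> {
        private final Function<T, E> getter;

        GetterExtractor(Function<T, E> getter) {
            this.getter = getter;
        }

        E extract(T target) {
            return getter.apply(target);
        }

        @Override
        public int compare(T a, T b) {
            return extract(a).compareTo(extract(b));
        }
    }

    // Length descending, then usage count ascending, then the word itself.
    static final Comparator<Word> ORDER =
            new GetterExtractor<Word, Integer>(Word::getLength).reversed()
                    .thenComparing(new GetterExtractor<Word, Integer>(Word::getCount))
                    .thenComparing(new GetterExtractor<Word, String>(Word::getWord));

    // Sort a copy of the words and keep at most the first n.
    static List<Word> topN(Collection<Word> words, int n) {
        List<Word> sorted = new ArrayList<>(words);
        sorted.sort(ORDER);
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    public static void main(String[] args) {
        // Made-up words and counts, purely to exercise all three sort levels.
        List<Word> words = Arrays.asList(
                new Word("alpha", 3), new Word("gamma", 1),
                new Word("epsilon", 2), new Word("delta", 1));
        for (Word w : topN(words, 3)) {
            System.out.println(w.getWord() + "\t" + w.getLength() + "\t" + w.getCount());
        }
    }
}
```

With the made-up data above, “epsilon” comes first because it is longest, then “delta” and “gamma” (both used once, so the word decides), and “alpha” is cut off by the limit of three.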

Finally…

So that’s it; I hope you’ve learnt something useful – and leaving you with a final bit of Shakespeare…

Whereof what’s past is prologue; what to come,
In yours and my discharge.
The Tempest, Act 2, scene 1, 245

…which means something like: everything we’ve done so far was the introduction, what we do from now on is up to us, so go and do something really cool.
