Coherence – Backing Map Filter Queries and Cache Join Queries

This post describes how to run Filter queries directly against Binary Backing Maps in Oracle Coherence. Querying a backing map is not the same as querying a NamedCache. A backing map is the internal map that Coherence uses to store data for a cache on each storage enabled member of the cluster. Direct use of backing maps is a pretty advanced Coherence topic and not something that everyone does so even if you do not use it yet in your application maybe this post will give you some ideas for where it might fit in.

This post applies specifically to Distributed Cache backing maps as the data is stored in Binary form, that is usually POF serialized (in fact you could use the techniques described with any Map that holds a Binary key and a Binary value). The code I describe below would be no use for something like a Replicated Cache or a Near Cache as they hold their data in Object form so you would not be using Filters with PofExtractors to query them.

Querying a backing map using a PofExtractor is something that is not possible out of the box with Coherence, but with a small bit of utility code is very easy and very useful. After showing you the querying code I will later describe how it can be used to create a filter which does the same as a join clause in a SQL statement to allow you to do join queries in Coherence.

Oracle Coherence has always allowed direct access to backing maps from inside various classes, for example inside an EntryProcessor, Trigger, Filter, etc… Since Coherence 3.7 accessing backing maps became much easier as they are properly exposed on the API. What makes querying easier is that 3.7 also made it much easier to access the indexes for a cache, again this was possible prior to 3.7 but the code was not very nice.

The main use of backing map access across caches is when using key association. This is where related data is co-located in the same Coherence partition and therefore on the same cluster member. This means if you are doing processing in one cache you can access the related data directly from the backing map of the associated caches as you know they will be in the same process memory.

Oracle Coherence already includes class called InvocableMapHelper that allows you to run Filter queries against any Map. InvocableMapHelper is a nice little utility class that allows you to do entrySet and keySet queries with a Filter as well as create indexes on any type of Map. The problem with InvocableMapHelper though is that it does not support using a Filter that use a PofExtractor or any Filter that expects a BinaryEntry – which is what a Filter gets passed when it normally executes against a Distributed Cache. The results of calling InvocableMapHelper.entrySet also return a set of Map.Entry instances, which are no good if you wanted to do something like run an Aggregator over the results and the Aggregator uses a PofExtractor. Of course we could just use a ReflectionExtractor or make sure we index everything we want to query on but I think the disadvantages of reflection and deserialization (i.e. a lot of garbage generation) and the fact we do not want to always index every attribute (a lot of memory usage) mean we need an alternative. There are various ways to work around this, but rather than have code a work-around on a case by case basis depending on the use case, the simplest for me would be to have an equivalent to the InvocableMapHelper methods but instead have methods that work with and return BinaryEntry instance. Let’s look at an example use case to try and explain things.

In my example below rather than write my own domain model I am going to pinch one from the Coherence book (I’m sure Aleks will not mind). In the book the examples all use a fictional Coherent Bank banking system, with customers, accounts, transactions, money etc… For the example I am going to use the Account class and Transaction class as these are key associated; that is Transactions are co-located with the corresponding Account. So, if I am processing an Account, say in an EntryProcessor or Aggregator, I can easily get the Transactions and vice-versa. If you have not read the Coherence book you really should, it is excellent; you can get it from the publisher’s web site. Although the book is titled Coherence 3.5 and we are now on 3.7, and soon no doubt Oracle will bring out the next version, the book is still very relevant to anyone building Coherence applications.

The requirement for my example is a typical group by query – I want the total value of all Transactions for today grouped by customer ID – a customer can have many accounts. The customer ID is an attribute of Account and obviously the Transaction value is an attribute of Transaction. On a database this would be a simple SQL query with a join but not so simple in Coherence. Now we could easily do this with multiple queries. We can aggregate over transactions and get a total for today by account ID. We can then get the customer ID for each of those accounts and sum the results up by customer ID. Coherence even has a GroupAggregator that makes getting the total transaction value for an account quite simple. But we don’t want to do two queries, otherwise we wouldn’t have much of an example.

So, I am running an Aggregator against Accounts and I want to sum the total of all of todays Transactions by Customer ID. Here is the aggregate method of my Aggregator.

public Object aggregate(Set entries) {
    Map<Long,BigDecimal> totals = new HashMap<Long, BigDecimal>();

    for (BinaryEntry entry : (Set<BinaryEntry>) entries) {
        Account account = (Account) entry.getValue();

        Long customerId = account.getCustomerId();
        if (!totals.containsKey(customerId)) {
            totals.put(customerId, new BigDecimal(0.0d));
        }
        BigDecimal total = totals.get(customerId);

        BackingMapContext transactionsBackingMapContext =
            entry.getContext().getBackingMapContext("transactions");

        // Find all the transactions for the account for today...
        Set<Map.Entry> transactionsForAccount = 
            getTransactionsForAccount(account.getId(), transactionsBackingMapContext);

        for (Map.Entry transactionEntry : transactionsForAccount) {
            Transaction transaction = (Transaction) transactionEntry.getValue();
            total = total.add(transaction.getAmount().getAmount());
        }
        totals.put(customerId, total);
    }
    return totals;
}

Nothing too complicated; but the important part is the call to the getTransactionsForAccount method. How does this method search the transactions backing map for transactions for today and the relevant account ID. Well we could do the brute force approach…

    private Set<BinaryEntry> getTransactionsForAccount(Long accountId, 
        BackingMapContext transactionsBackingMapContext) {

        Map transactions = transactionsBackingMapContext.getBackingMap();
        BackingMapManagerContext context = transactionsBackingMapContext.getManagerContext();
        Converter converter = context.getValueFromInternalConverter();

        long start = getStartOfToday();
        long end = getEndOfToday();

        Set<BinaryEntry> transactionsForAccount = new HashSet<BinaryEntry>();

        for (Map.Entry transactionEntry : (Set<Map.Entry>) transactions.entrySet()) {
            Binary binary = (Binary) transactionEntry.getValue();
            Transaction transaction = (Transaction) converter.convert(binary);

            long txAccountId = transaction.getId().getAccountId();
            long txTime = transaction.getTime().getTime();

            if (txAccountId == accountId && txTime >= start && txTime <= end) {
                BinaryEntry binaryEntry = new BackingMapBinaryEntry(    
                        (Binary) transactionEntry.getKey(),
                        (Binary) transactionEntry.getValue(), null, context);
                transactionsForAccount.add(binaryEntry);
            }
        }

        return transactionsForAccount;
    }

Again, nothing too complex, but a lot of work and the code iterates over the whole backing map so it is very inefficient. I know there is some deserialization in there which we could remove by using PofExtractors, but the example here is about how to avoid the brute force searching of the backing map.

So, what is the alternative; well out of the box there isn't one. What we want to be able to do is something like the InvocableMapHelper method call but one that works with BinaryEntries so we can do something like this...

Date start = getStartOfToday();
Date end = getEndOfToday();

SimplePofPath accountIdPath = new SimplePofPath(new int[]{1, 0});
PofExtractor accountIdExtractor = new PofExtractor(Long.class, accountIdPath);
EqualsFilter transactionIdFilter = new EqualsFilter(accountIdExtractor, accountId);

PofExtractor timeExtractor = new PofExtractor(Date.class, 2);
ChainedExtractor timeAsLongExtractor = new ChainedExtractor(timeExtractor, new ReflectionExtractor("getTime"));
BetweenFilter timeFilter = new BetweenFilter(timeAsLongExtractor, start.getTime(), end.getTime());

Filter filter = new AndFilter(transactionIdFilter, timeFilter);

Set<Map.Entry> transactionsForAccount = QueryHelper.INSTANCE.entrySet(transactionsBackingMapContext, filter);

We create some filters, one on Account ID and the other a BetweenFilter to get today's transactions, then we use them with a QueryHelper to query the backing map. So what does the QueryHelper look like?

The QueryHelper

The QueryHelper class is going to provide some of the same functionality as InvocableMapHelper but use BinaryEntry instances rather than the SimpleMapEntry that InvocableMapHelper uses. Here is the code...

public class QueryHelper {

    public static final QueryHelper INSTANCE = new QueryHelper();

    @SuppressWarnings({"unchecked"})
    public <T> Set<T> query(BackingMapContext backingMapContext, Filter filter, boolean shouldReturnEntries,
                            boolean shouldSort, Comparator comparator) {

        Map backingMap = backingMapContext.getBackingMap();
        Map indexMap = backingMapContext.getIndexMap();

        boolean matchAll = AlwaysFilter.INSTANCE.equals(filter);

        Filter remainingFilter = null;
        Object[] results;
        if (matchAll || indexMap == null || !(filter instanceof IndexAwareFilter)) {
            results = backingMap.keySet().toArray();
        } else {
            Set filteredKeys = new SubSet(backingMap.keySet());
            try {
                remainingFilter = ((IndexAwareFilter)filter).applyIndex(indexMap, filteredKeys);
            } catch (ConcurrentModificationException e) {
                filteredKeys = new SubSet(new ImmutableArrayList(backingMap.keySet().toArray()));
                remainingFilter = ((IndexAwareFilter)filter).applyIndex(indexMap, filteredKeys);
            }
            results = filteredKeys.toArray();
            matchAll = (remainingFilter == null);
        }

        int numberOfResults = 0;
        if (matchAll && !shouldReturnEntries) {
            numberOfResults = results.length;
        } else {
            for (Object key : results) {
                Object value = backingMap.get(key);
                if (value == null && !backingMap.containsKey(key)) {
                    continue;
                }
                Map.Entry entry = new QueryBinaryEntry((Binary)key, (Binary)value, null, backingMapContext);
                 if (matchAll || InvocableMapHelper.evaluateEntry(remainingFilter, entry)) {
                    results[numberOfResults++] = shouldReturnEntries ? entry : key;
                }
            }
        }

        boolean isLimitFilter = filter instanceof LimitFilter;

        if (isLimitFilter || (shouldReturnEntries && shouldSort)) {
            if (numberOfResults < results.length) {
                Object[] copy = new Object[numberOfResults];
                System.arraycopy(results, 0, copy, 0, numberOfResults);
                results = copy;
            }

            if (shouldReturnEntries && shouldSort) {
                if (comparator == null) {
                    comparator = SafeComparator.INSTANCE;
                }
                Arrays.sort(results, new EntryComparator(comparator));
            } else if (shouldSort) {
                Arrays.sort(results, comparator);
            }

            if (isLimitFilter) {
                LimitFilter limitFilter = (LimitFilter)filter;
                limitFilter.setComparator(null);
                results = limitFilter.extractPage(results);
                numberOfResults = results.length;
                limitFilter.setComparator(comparator);
            }
        }

        return new ImmutableArrayList(results, 0, numberOfResults).getSet();
    }

    public <T> Set<T> keySet(BinaryEntry entry, String cacheName, Filter filter) {
        return keySet(entry, cacheName, filter, false, null);
    }

    public <T> Set<T> keySet(BinaryEntry entry, String cacheName, Filter filter, 
                             boolean shouldSort, Comparator comparator) {
        BackingMapContext backingMapContext = entry.getContext().getBackingMapContext(cacheName);
        return query(backingMapContext, filter, false, shouldSort, comparator);
    }

    public Set entrySet(BinaryEntry entry, String cacheName, Filter filter) {
        return entrySet(entry, cacheName, filter, false, null);
    }

    public Set entrySet(BinaryEntry entry, String cacheName, Filter filter, 
                        boolean shouldSort, Comparator comparator) {
        BackingMapContext backingMapContext = entry.getContext().getBackingMapContext(cacheName);
        return query(backingMapContext, filter, true, shouldSort, comparator);
    }

    public <T> Set<T> entrySet(BackingMapContext backingMapContext, Filter filter) {
        return query(backingMapContext, filter, true, false, null);
    }

    private class QueryBinaryEntry extends BackingMapBinaryEntry {
        private BackingMapContext backingMapContext;

        private QueryBinaryEntry(Binary key, Binary value, 
                                 Binary originalValue, BackingMapContext backingMapContext) {
            super(key, value, originalValue, backingMapContext.getManagerContext());
            this.backingMapContext = backingMapContext;
        }

        @Override
        public ObservableMap getBackingMap() {
            return backingMapContext.getBackingMap();
        }

        @Override
        public BackingMapContext getBackingMapContext() {
            return backingMapContext;
        }

        @Override
        public boolean isReadOnly() {
            return true;
        }

        @Override
        public boolean isPresent() {
            return true;
        }

    }
}

The bulk of the work is done in the query method. The code checks to see if we have an AlwaysFilter, if not then it applies any indexes. After that the filter is evaluated by creating a BinaryEntry for each entry to be evaluated. Finally we apply any LimitFilter.

So that is it, we now have a method that allows us to query a backing map directly using Filters that have ValuExtractors that expect a BinaryEntry instance. The results of the entrySet methods will return a Set containing BinaryEntry instances.

The QueryHelper can be used in a number of places where we can get a backing map (or any other Map of Binary key and Binary value), this could be Aggregators, as above, Filters, Invocables, Entry Processors, Triggers etc...

Join Filters

Now as promised I am going to use the QueryHelper above to create a Filter that can replicate the action of a join in a SQL Where clause.

Using the same Coherent Bank domain as above say I want to get all the Account instances that have had a deposit in GBP. Again this would be an easy DB SQL query but requires two caches in Coherence. Just like previoulsy we could do this in two calls, get the distinct list of account IDs from the Transaction cache then get the Account instances with a getAll, but this would be two network calls and again I wouldn't have much of an example to show. I also could use an aggregator like the one we wrote ablove but this example is about just using a Filter.

Of course, we cannot use a normal Coherence Filter as a normal filter only executes against a single cache – what we need is a JoinFilter

public class JoinFilter implements EntryFilter, PortableObject {

    private String cacheName;
    private AndFilter filter;
    private QueryHelper queryHelper = QueryHelper.INSTANCE;

    public JoinFilter() {

    }

    public JoinFilter(Filter criteria, String cacheName, Join... joins) {
        if (joins.length < 1) {
            throw new IllegalArgumentException("There must be at least one join parameter");
        }
        this.cacheName = cacheName;
        this.filter = new AndFilter(new AllFilter(joins), criteria);
    }

    @Override
    public boolean evaluateEntry(Map.Entry entry) {
        AllFilter allFilter = (AllFilter) filter.getFilters()[0];
        Filter[] joins = allFilter.getFilters();
        for (Filter join : joins) {
            ((Join)join).setValue(entry);
        }
        return !queryHelper.keySet((BinaryEntry)entry, cacheName, filter).isEmpty();
    }

    public Filter getFilter() {
        return filter;
    }

    @Override
    public boolean evaluate(Object o) {
        return false;
    }

    public void setQueryHelper(QueryHelper queryHelper) {
        this.queryHelper = queryHelper;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        JoinFilter that = (JoinFilter) o;

        if (!cacheName.equals(that.cacheName)) {
            return false;
        }
        if (!filter.equals(that.filter)) {
            return false;
        }

        return true;
    }

    @Override
    public String toString() {
        return "JoinFilter (cacheName=" + cacheName + " join=" + filter + ")";
    }

    @Override
    public int hashCode() {
        int result = cacheName.hashCode();
        result = 31 * result + filter.hashCode();
        return result;
    }

    @SuppressWarnings({"unchecked"})
    @Override
    public void readExternal(PofReader pofReader) throws IOException {
        cacheName = pofReader.readString(1);
        filter = (AndFilter) pofReader.readObject(2);
    }

    @Override
    public void writeExternal(PofWriter pofWriter) throws IOException {
        pofWriter.writeString(1, cacheName);
        pofWriter.writeObject(2, filter);
    }

}

The JoinFilter class above takes three constructor parameters, The first is the criteria (a Filter) to apply to the cache being joined to, the second is the name of the cache being joined to and the third is one or more Join instances that describe how the left hand side cache (or domain class) of the query is joined to the right hand side cache (or domain class).
The Join class is another simple class

public class Join extends EqualsFilter implements PortableObject {
    private ValueExtractor left;

    public Join() {
    }

    public Join(ValueExtractor left, ValueExtractor right) {
        super(right, null);
        this.left = left;
    }

    public ValueExtractor getLeftValueExtractor() {
        return left;
    }

    public ValueExtractor getRightValueExtractor() {
        return getValueExtractor();
    }

    public void setValue(Map.Entry entry) {
        m_oValue = InvocableMapHelper.extractFromEntry(left, entry);
    }

    @Override
    public void readExternal(PofReader pofReader) throws IOException {
        super.readExternal(pofReader);
        left = (ValueExtractor) pofReader.readObject(2);
    }

    @Override
    public void writeExternal(PofWriter pofWriter) throws IOException {
        super.writeExternal(pofWriter);
        pofWriter.writeObject(2, left);
    }

    public String toString()
    {
      return "Join(left=" + left + " right=" + getValueExtractor() + ')';
    }

    public boolean equals(Object o)
    {
      if ((o instanceof Join))
      {
        Join that = (Join)o;
        return (getClass() == that.getClass())
                && (equals(this.left, that.left))
                && (equals(this.m_extractor, that.getValueExtractor()))
                && (equals(this.m_oValue, that.getValue()));
      }
      return false;
    }

    public int hashCode()
    {
      return hashCode(left) + hashCode(this.m_extractor) + hashCode(this.m_oValue);
    }
}

You will see the Join class is a special sub class of the Coherence EqualsFilter that takes two ValueExtractor instances as parameters. The first extractor is to be applied to the left side cache (in our example the Accounts cache) the second will be applied to the right side cache being joined to (in our example the Transactions cache). A join is nothing more than an equality check on attributes from two tables in SQL or caches in Coherence so it makes sense that it is a spcialisation of EqulasFilter. Here is the code to use the above classes to perform our query, which was get Accounts that have had a transaction of type Deposit in GBP.

// Create the left (Account m_id field) side join extractor
PofExtractor accountIdExtractor = new PofExtractor(Long.class, 0);

// Create the right (Transaction m_id.m_accountId field) side join extractor
SimplePofPath transactionAccountIdPath = new SimplePofPath(new int[]{0, 0});
PofExtractor transactionAccountIdExtractor = new PofExtractor(Long.class, transactionAccountIdPath);

// Create the Join
Join join = new Join(accountIdExtractor, transactionAccountIdExtractor);

// Create the first part of criteria (transaction type is Deposit - Transaction m_type field)
PofExtractor transactionTypeExtractor = new PofExtractor(null, 1);
Filter isDeposit = new EqualsFilter(transactionTypeExtractor, TransactionType.DEPOSIT);

// Create the second part of criteria (transaction is in GBP - Transaction m_amount.m_currency.currencyCode field)
SimplePofPath currencyCodePath = new SimplePofPath(new int[]{4,2,0});
PofExtractor currencyCodeExtractor = new PofExtractor(null, currencyCodePath);
Filter isGBP = new EqualsFilter(currencyCodeExtractor, "GBP");

// And the two criteria to create the criteria Filter
Filter criteria = new AndFilter(isDeposit, isGBP);

// Create the JoinFilter - joining to transactions
Filter joinFilter = new JoinFilter(criteria, "transactions", join);

// Execute the query
Set<BinaryEntry> entries = accounts.entrySet(joinFilter);

Using the code above we now have a very flexible way of creating Filters that effectively have joins in the where clauses for caches that are key-associated. I could even nest them in the criteria paramter to hop further along a chain of joins if I had multiple key associated caches.

The fact that the JoinFilter is a normal Coherence Filter means we can use it anywhere we would normally be able to use a Filter, keySet, entrySet, aggregate, invokeAll etc. Anyone interested in joins with Coherence should check out my colleague Ben Stopford's blog www.benstopford.com

You may also like...

11 Responses

  1. And they said distributed joins are difficult to do (sarcasm). The amount code required for this is scary. GridGain handles joins with a lot less code (I think).

  2. jk says:

    Hi Ashwin,

    The code above is not about distributed joins. The post started off being about how to query a Coherence backing map that contains Binary data using Filters that require a BinaryEntry. Off the back of that was some code on how to use that to do a join Filter – but it is a very restrictive use case where the caches are already using Key Association.

    In theory distributed joins are not actually that hard, they are just not very efficient unless done carefully. From what I can see the only way to implement a true distributed join has to require some sort of map reduce type processing. Nodes are only going to be able to return partial results sets from parts of the query using the data they have. I cannot see any other way than splitting the query into multiple smaller queries to obtain sub-sets of data that are then further reduced. These sub-sets would need to all be fetched back to one place and the final join and reduction done on that node. Which is sort of how we do joins with aggregators in the project I work on.

    Personally I don’t think the code above is complex. I wrote it in a couple of hours, including a full set of unit tests. I know GridGain provides a way of writing a query using SQL which includes joins, so I agree, the interface is very simple. But that is probably hiding you from the complex code underneath that is working out how to run and join the query. I have not really looked at GridGain so I couldn’t comment on how well a distributed join query scales very large caches. Having been through a few iterations of the query API we have built into our Coherence application making queries fast and scalable, especially queries that do joins, takes some effort.

  3. Henry says:

    Hi JK,

    I saw below comments you did on Ben’s blog:

    “Coherence has a utility class called InvocableMapHelper which can run Filter queries agains any Map. Note though that when running a query against a backing map you will get back a Set of Map.Entry instances that contain the Binary key and value, if you want then as proper objects you would need to convert them. This is not really going to be any different than iterating over the backing map yourself, it is just done in a single method call but…”

    So how the above QueryHelper perform better than InvocableMapHelper? Can InvocableMapHelper used for querying backing map as below?

    public Set queryBackingMap(String nameOfCacheToSearch, Filter filter, BinaryEntry entry) {
    Map backingMapToSearch = entry.getContext().getBackingMap(nameOfCacheToSearch);
    Map indexMap = entry.getBackingMapContext().getIndexMap();
    return InvocableMapHelper.query(backingMapToSearch, indexMap, filter, true, false, null);
    }

    • jk says:

      Hi Henry,

      You can indeed use InvocableMapHelper to query any Map, but it will only work with certain types of Filter; or more accurately it will only work with Filters using certain types of ValueExtractor. If you use Filters and ValueExtractors that are reflection based then it will work. Most people I work with use POF as it is more efficient and POF extractors will not work with InvocableMapHelper as the need to work with a BinaryEntry.

      So… the code in you comment above will work depending on the type of filter/value extractor used.

      JK

      • Phil says:

        InvokableMapHelper *can* work with POF extractors, if you’ve built a POF-based index that exactly matches the query. It will evaluate the query using the index, (but fail horribly if there’s no such index).

  4. Henry says:

    Hi JK,

    Thanks for prompt response. I am working on a securities trading system that we need to update multiple child caches inside a EntryProcessor using backing map, which is invoked against the parent cache for providing concurrency control.

    I realized that the change to parent cache will be rolled back by coherence automatically if any exception thrown inside EntryProcessor, but not the changes on those child caches made through backing map. I wonder should we really worry about exception in this case as it is just updating local JVM memory (ignoring extreme case such as sudden crash of JVM)?

    If exception should be considered, what I can think is to keep before-image at the start of EntryProcessor and rollback by coding if exception occur. Is there any better idea?

    Thanks a lot.

  5. Charlie says:

    Very nice post JK!!

    One little detail though that could be important to note. In Coherence the indexes and backing maps are updated at separate times. That is the update happens to the backing map and then each index is modified without blocking access for other threads. Which means that you may get results back from applyIndex that no longer matches the filter. In coherence there compensation logic taking care of this precise scenario, which wouldn’t apply if you would use applyIndex manually. This of course only applies when you are using worker threads, or a backing map with some kind of expiry/eviction or configured refresh-ahead.

  6. Henry says:

    Hi JK,

    As far as I understand Near cache is composed with front as local cache and back as distributed cache, then why you said that we cannot apply the same approach, using backing map to manipulate a Near cache? Is that my misunderstanding? Thanks.

    Henry

  1. July 23, 2012

    […] JK’s post on Join filters here (much better than mine […]

Leave a Reply to Henry Cancel reply

Your email address will not be published. Required fields are marked *