Coherence – Alternative putAll

The Oracle Coherence putAll method is often used as a more efficient way to perform bulk updates of a Coherence cache, and it is actually more efficient for single updates too. This is something most Oracle Coherence developers find out within their first week or two of using the product, but putAll has some drawbacks, particularly if you are using triggers or cache stores. This blog post describes an alternative approach to the Oracle Coherence putAll method that works around some of the limitations of the standard putAll. The concepts were covered in a recent post by a colleague of mine, Ben Stopford, on his blog at www.benstopford.com. Ben covered why we needed a new approach to putAll in Oracle Coherence and gave a high-level overview of the approach we used to implement it.

The problem with putAll is that it is not safe to use if an update might throw an exception, for example from a write-through CacheStore or a Trigger. As Oracle Coherence is a distributed system, the data passed to a putAll call is distributed to many cluster members, each of which applies its own updates independently. If any of the updates throws an exception, it is returned to the client; if there are several exceptions, the client receives only one of them. On receiving an exception the client cannot be certain how many keys from the putAll failed and how many entries were actually updated. The cache is now in an unknown state, which makes recovering from the exception very difficult. Worse, if you were using write-through to update your database, the database is now in an unknown state too.
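To make the failure mode concrete, here is a toy, single-JVM model. It is not Coherence code: a HashMap stands in for the cache, a hypothetical validate() method stands in for a trigger or write-through CacheStore, and the loop stands in for members applying their share of the update independently. It shows both problems described above: some entries are written, and the caller learns about only one of the failures.

```java
import java.util.Map;

// Toy model (not Coherence code) of a bulk update where every entry is
// applied independently but the caller only ever sees one exception.
class NaivePutAllModel {

    // Hypothetical validation rule standing in for a trigger or CacheStore:
    // negative values are rejected.
    static void validate(String key, int value) {
        if (value < 0) {
            throw new IllegalArgumentException("rejected " + key);
        }
    }

    // Applies every entry it can, then rethrows only the first failure seen.
    static void putAll(Map<String, Integer> cache, Map<String, Integer> updates) {
        RuntimeException first = null;
        for (Map.Entry<String, Integer> e : updates.entrySet()) {
            try {
                validate(e.getKey(), e.getValue());
                cache.put(e.getKey(), e.getValue());
            } catch (RuntimeException ex) {
                if (first == null) {
                    first = ex;   // any later failures are silently dropped
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

With updates a=1, b=-1, c=2, d=-5 applied in that order, the caller sees only "rejected b", while a and c have already been written and the rejection of d is never reported.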

The project I am currently working on required a more reliable equivalent to Coherence's built-in putAll; that is, we needed to know every entry that failed. We accepted that some entries might fail during a putAll, as we used triggers for validation, but as long as we knew all of the entries that failed we could take corrective action and be sure the caches were in a consistent state. On top of being more reliable, the implementation needed to be as fast as possible. It took a couple of iterations of the design, but in the end we came up with a workable approach, which I describe below.

First, a brief explanation of how the alternative approach works. I am assuming that you already have a basic understanding of Coherence and concepts such as distributed caches, partitions, invocation services and entry processors. To implement putAll in Coherence, these are the steps we need to take:

  1. Take the Map of key/value pairs we want to put into the cache and split the entries into sets by owning cluster member. In Coherence each key is associated with one of the partitions of a distributed cache, and each member of the cluster owns a number of partitions.
  2. Send each set of entries to the relevant member.
  3. On that member, iterate over the entries, putting them into the cache. As all the entries are owned by this member, these will all be local updates.
  4. As each entry is put, catch any exception thrown and record it against that entry's key.
  5. After all the entries have been put into the cache, return the map of exceptions to the client.
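The five steps can be sketched in plain Java, without any Coherence classes, to show the shape of the algorithm. This is an illustration under assumptions, not the implementation that follows: the ownerOf function stands in for PartitionedService.getKeyOwner(), localPut stands in for a put on the owning member, and "sending" a set of entries to a member is just a method call here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Plain-Java sketch of the five steps. ownerOf and localPut are assumed
// stand-ins for the partition owner lookup and the member-local put.
class PutAllSteps {

    static <K, V, M> Map<K, Throwable> putAll(Map<K, V> values,
                                              Function<K, M> ownerOf,
                                              BiConsumer<K, V> localPut) {
        // Step 1: split the entries by owning member.
        Map<M, Map<K, V>> byMember = new HashMap<>();
        for (Map.Entry<K, V> e : values.entrySet()) {
            byMember.computeIfAbsent(ownerOf.apply(e.getKey()), m -> new HashMap<>())
                    .put(e.getKey(), e.getValue());
        }

        // Steps 2 to 5: hand each set to its member, then merge the error maps.
        Map<K, Throwable> errors = new HashMap<>();
        for (Map<K, V> group : byMember.values()) {
            errors.putAll(putLocally(group, localPut));
        }
        return errors;
    }

    // What runs on one member: every entry is attempted and each failure is
    // recorded against its own key instead of aborting the whole batch.
    static <K, V> Map<K, Throwable> putLocally(Map<K, V> group, BiConsumer<K, V> localPut) {
        Map<K, Throwable> errors = new HashMap<>();
        for (Map.Entry<K, V> e : group.entrySet()) {
            try {
                localPut.accept(e.getKey(), e.getValue());
            } catch (RuntimeException ex) {
                errors.put(e.getKey(), ex);
            }
        }
        return errors;
    }
}
```

Because every entry is attempted and each failure is kept against its own key, the caller gets back the complete set of failed keys rather than a single exception.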

The Basic Code

That all sounds pretty simple, so on with some code. The majority of the code in this article is Java, as you would expect, since the bulk of it runs on the server side. It is possible to use the code from other types of Coherence*Extend client, such as C# or C++, which we will cover at the end. The code I describe first executes from a Coherence*Extend client; we will talk about making it run from a cluster member later in the article.

This approach was first implemented for Coherence 3.5; the project is now on 3.7 and the code has not required any changes, although we have enhanced it beyond what I describe here to meet the particular requirements of some of our use-cases. We will take an iterative approach for the rest of the article, starting with the most basic code required to do the job and then optimising and enhancing it. Hopefully this will give you a bit of insight into how we usually approach this sort of requirement.

The easiest way to implement the steps above is with an InvocationService. To run from an Extend client we need two invocation services configured: a Remote Invocation Service on the client (and, obviously, the corresponding Proxy Service on the Extend proxy), and a standard Invocation Service that runs on all the cluster members.

So, for step one, we need an Invocable that the client will execute and that takes the entries to be updated; let's call this new class PutAll. PutAll needs to know which cache to update, so we will add a constructor parameter for the cache name. Just like the normal putAll method on a cache, we want to be able to provide a Map of values to be put, so we will also add a constructor parameter for the map. We also need to make the class a PortableObject and include a public no-argument constructor for POF to work.

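import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.net.AbstractInvocable;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class PutAll extends AbstractInvocable implements PortableObject {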

    private String cacheName;
    private Map<Object,Object> values;

    public PutAll() {
    }

    public PutAll(String cacheName, Map<Object, Object> values) {
        this.cacheName = cacheName;
        this.values = values;
    }

    @Override
    public void run() {
    }

    @Override
    public void readExternal(PofReader pofReader) throws IOException {
        cacheName = pofReader.readString(1);
        values = pofReader.readMap(2, new HashMap());
    }

    @Override
    public void writeExternal(PofWriter pofWriter) throws IOException {
        pofWriter.writeString(1, cacheName);
        pofWriter.writeMap(2, values);
    }
}

That is the easy bit done; now to implement the run() method. Let's recap what we need to do in this method.

  1. Split the Map of values into values owned by different cluster Members
  2. Send each set of entries to the relevant Member using another Invocable

For the first part we can rely on the fact that the run method will be executing on a cluster member, in this case the Extend proxy, whose proxy service executes it. This means we can get a reference to the cache we will be updating and use that cache's CacheService to tell us which member owns each key. It only makes sense to use this type of putAll on a distributed cache, so we know the cache service will be an instance of PartitionedService.

The run method with the first step implemented looks like this.

@Override
public void run() {
    NamedCache cache = CacheFactory.getCache(cacheName);
    PartitionedService cacheService = (PartitionedService) cache.getCacheService();

    Map<Member,Map<Object,Object>> valuesByMember = new HashMap<Member,Map<Object,Object>>();
    for (Map.Entry entry : values.entrySet()) {
        Object key = entry.getKey();
        Member member = cacheService.getKeyOwner(key);
        if (!valuesByMember.containsKey(member)) {
            valuesByMember.put(member, new HashMap<Object,Object>());
        }
        valuesByMember.get(member).put(key, entry.getValue());
    }
}

We iterate over the values and use the PartitionedService getKeyOwner() method to find the owning Member of each key.
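For intuition about the question getKeyOwner() answers, here is a simplified model of key ownership: key to partition, then partition to member. The hashCode-modulo mapping and the round-robin ownership table are assumptions made for illustration only; Coherence's actual partitioning strategy works on the serialized key and assigns partitions to members dynamically.

```java
// Simplified, hypothetical model of "which member owns this key?".
// Not Coherence's algorithm: partitions come from hashCode() modulo the
// partition count, and partitions are assigned to members round-robin.
class OwnershipModel {
    private final String[] ownerOfPartition;   // index = partition number

    OwnershipModel(int partitionCount, String... members) {
        ownerOfPartition = new String[partitionCount];
        for (int p = 0; p < partitionCount; p++) {
            ownerOfPartition[p] = members[p % members.length];
        }
    }

    int partitionOf(Object key) {
        // floorMod keeps the partition non-negative for negative hash codes.
        return Math.floorMod(key.hashCode(), ownerOfPartition.length);
    }

    String keyOwner(Object key) {
        return ownerOfPartition[partitionOf(key)];
    }
}
```

One consequence worth keeping in mind: ownership can move while partitions are being redistributed, so an entry grouped for one member may belong to another by the time it is put. Because the put still goes through the cache service it is applied correctly; it just may not be a local update.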

We now have a Map of values for each Member of the cluster, so the next step is to send them. This is where we use a second Invocable and the InvocationService that runs on all the cluster members. We will call the second Invocable PutAllForMember; its constructor takes the name of the cache to update and the values for that Member.

Here is the skeleton code for the PutAllForMember class. Again, it is an Invocable and implements PortableObject.

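import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.net.AbstractInvocable;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class PutAllForMember extends AbstractInvocable implements PortableObject {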

    private String cacheName;
    private Map<Object,Object> values;

    public PutAllForMember() {
    }

    public PutAllForMember(String cacheName, Map<Object, Object> values) {
        this.cacheName = cacheName;
        this.values = values;
    }

    @Override
    public void run() {
    }

    @Override
    public void readExternal(PofReader pofReader) throws IOException {
        cacheName = pofReader.readString(1);
        values = pofReader.readMap(2, new HashMap());
    }

    @Override
    public void writeExternal(PofWriter pofWriter) throws IOException {
        pofWriter.writeString(1, cacheName);
        pofWriter.writeMap(2, values);
    }
}

Now we have to implement the code that does the local update. As we have already said, the run method iterates over the values, updating the cache with each one and catching any exceptions. It then sets as its result a map of the exceptions thrown, keyed by cache key.

@Override
public void run() {
    Map<Object,Throwable> errors = new HashMap<Object,Throwable>();
    NamedCache cache = CacheFactory.getCache(cacheName);
    for (Map.Entry entry : values.entrySet()) {
        try {
            cache.put(entry.getKey(), entry.getValue());
        } catch (Throwable t) {
            errors.put(entry.getKey(), t);
        }
    }
    setResult(errors);
}

Now that we have the Invocable to do the local update we can go back and finish the PutAll.run() method.

  1. @Override
  2. public void run() {
  3.     NamedCache cache = CacheFactory.getCache(cacheName);
  4.     DistributedCacheService cacheService = (DistributedCacheService) cache.getCacheService();
  5.  
  6.     Map<Member,Map<Object,Object>> valuesByMember = new HashMap<Member,Map<Object,Object>>();
  7.     for (Map.Entry entry : values.entrySet()) {
  8.         Object key = entry.getKey();
  9.         Member member = cacheService.getKeyOwner(key);
  10.         if (!valuesByMember.containsKey(member)) {
  11.             valuesByMember.put(member, new HashMap<Object,Object>());
  12.         }
  13.         valuesByMember.get(member).put(key, entry.getValue());
  14.     }
  15.  
  16.     Map<Object,Throwable> results = new HashMap<Object,Throwable>();
  17.     InvocationService service = (InvocationService) CacheFactory.getService("InvocationService");
  18.     for (Map.Entry<Member,Map<Object,Object>> entry : valuesByMember.entrySet()) {
  19.         PutAllForMember putAllForMember = new PutAllForMember(cacheName, entry.getValue());
  20.         Map<Member,Map<Object,Throwable>> memberResults = service.query(putAllForMember, Collections.singleton(entry.getKey()));
  21.         results.putAll(memberResults.get(entry.getKey()));
  22.     }
  23.  
  24.     setResult(results);
  25. }

The new lines (16 to 24) iterate over the map of values for each member and call the PutAllForMember Invocable for each member using the Invocation Service query method. The results returned from each member are added to a results map, which is finally set as the result of the PutAll Invocable. The Invocation Service query method returns its results in a Map where the key is the member and the value is the result from that member. In our case we only executed on a single member, so the results map should have a single key with the map of errors as its value.

Line 17 is where we get the Invocation Service used to execute the PutAllForMember invocable on each storage node. In the code above the name of the service is hard coded as “InvocationService”, but this could easily be made a constant or passed as another constructor parameter and serialised like the cache name. Obviously there has to be an invocation service with that name on all the members.

That is the basic code complete. Using the new PutAll is straightforward too; on a Coherence*Extend client that has a remote invocation service called “remote-invocation-service”, we would update the cache called “dist-test” as follows:

Map<Object,Object> values = new HashMap<Object,Object>();
// --- Populate the values map with the key/value pairs to put ---
PutAll putAll = new PutAll("dist-test", values);
InvocationService service = (InvocationService) CacheFactory.getService("remote-invocation-service");
Map<Member,Map<Object,Throwable>> results = service.query(putAll, null);
Map<Object,Throwable> errors = results.values().iterator().next();
// --- check the errors map for any failures ---

So after some testing we see that it does indeed work. I will talk about patterns for testing Coherence code in another blog post. The only problem with the code above is that it is too slow!

Optimisations

Although the basic code described above works, it is far from optimal, and efficiency is one of the main reasons for using putAll rather than a normal put in the first place.

If you build and run the code above you will find that it is a little slow. A basic performance test using the standard Coherence putAll to put 1000 key/value pairs into a cache took around 170ms. These timings were taken on a simple three-node cluster running on my MacBook Pro, so they are not exhaustive tests, but we are only interested in relative performance at this point. Using the same test harness, the code above took an average of 700ms to put the same values, so it is not really good enough!

Asynchronous Invocations

The first optimisation we can make is in the PutAll run() method. At the moment it iterates over the valuesByMember map and submits each PutAllForMember Invocable one at a time, waiting for the results to come back before submitting to the next Member. Coherence allows us to submit calls to an InvocationService asynchronously using the execute() method and to be called back as each invocation returns; this is much more efficient for our use-case, as we can submit all the PutAllForMember Invocables at once.
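The effect of submitting everything before waiting can be sketched with java.util.concurrent. This swaps CompletableFuture in for InvocationService.execute() and an InvocationObserver, so it is an analogy rather than the Coherence API: member names are plain Strings, and perMember is an assumed stand-in for running PutAllForMember on that member and getting its error map back.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

// Fan-out/fan-in sketch using CompletableFuture instead of Coherence's
// InvocationService.execute() with an observer.
class AsyncFanOut {

    static <K, V> Map<K, Throwable> putAllAsync(Map<String, Map<K, V>> valuesByMember,
                                                Function<Map<K, V>, Map<K, Throwable>> perMember,
                                                ExecutorService pool) {
        // Submit every member's batch first, without waiting on any of them.
        Map<String, CompletableFuture<Map<K, Throwable>>> pending = new HashMap<>();
        for (Map.Entry<String, Map<K, V>> e : valuesByMember.entrySet()) {
            Map<K, V> batch = e.getValue();
            pending.put(e.getKey(), CompletableFuture.supplyAsync(() -> perMember.apply(batch), pool));
        }

        // Then collect: merge each member's error map; if a whole batch failed,
        // record that failure against every key that was sent to that member.
        Map<K, Throwable> errors = new HashMap<>();
        for (Map.Entry<String, CompletableFuture<Map<K, Throwable>>> e : pending.entrySet()) {
            try {
                errors.putAll(e.getValue().join());
            } catch (CompletionException ex) {
                for (K key : valuesByMember.get(e.getKey()).keySet()) {
                    errors.put(key, ex.getCause());
                }
            }
        }
        return errors;
    }
}
```

The total wait is roughly that of the slowest member rather than the sum over all members, which is the same saving the asynchronous execute() call gives.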

To use the asynchronous invocation method we need to write another class to receive these callbacks, an implementation of InvocationObserver. This class has a number of things to do:

  • Listen for the outcome of each invocation; this could be successful completion, an error due to an execution failure, or an error due to the executing member leaving the cluster.
  • Collect the results returned from each invocation.
  • Provide a blocking method that waits until all of the invocables have completed before returning the full result set.

Here is the basic code for our InvocationObserver; let's call it PutAllObserver.

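import com.tangosol.net.InvocationObserver;
import com.tangosol.net.Member;

import java.util.HashMap;
import java.util.Map;

public class PutAllObserver implements InvocationObserver {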

    private Map<Member,Map<Object,Object>> membersAndValues;
    private Map<Object,Throwable> results;
    private volatile int actualCount = 0;

    public PutAllObserver(Map<Member,Map<Object,Object>> membersAndValues) {
        this.membersAndValues = membersAndValues;
        this.results = new HashMap<Object,Throwable>();
    }

    public synchronized Map<Object,Throwable> getResults() {
        return results; // placeholder so the skeleton compiles; the blocking version follows
    }

    @Override
    public synchronized void memberCompleted(Member member, Object memberResult) {
    }

    @Override
    public synchronized void memberFailed(Member member, Throwable throwable) {
    }

    @Override
    public synchronized void memberLeft(Member member) {
    }

    @Override
    public synchronized void invocationCompleted() {
    }
}

We pass into the constructor the Map of members and their key/value pairs that we built up in the PutAll invocable’s run() method. This tells the PutAllObserver how many invocation results to expect (one for each member in the map's key set). In the constructor we also create an empty map to hold all of the results returned from the invocations, and we have a count field to keep track of how many invocations have completed.

All of the methods are synchronized because the callbacks may arrive on more than one thread as results come in, so the class needs to be thread safe.

We have also added the getResults() method which will be called by the PutAll class and will block until all the invocables are finished.

Now we can add the code for each of the methods, we will start with the blocking getResults() method.

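public synchronized Map<Object,Throwable> getResults() {
    boolean interrupted = false;
    while (actualCount < membersAndValues.size()) {
        try {
            this.wait();
        } catch (InterruptedException e) {
            // Re-interrupting here would make the next wait() throw at once and
            // spin, so remember the interrupt and keep waiting for the results.
            interrupted = true;
        }
    }
    if (interrupted) {
        Thread.currentThread().interrupt(); // restore the caller's interrupt status
    }
    return results;
}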

This is a pretty simple method. It checks whether actualCount is less than the size of the membersAndValues map, as that is the number of invocation results we expect to receive. While the count is below that number we wait until notified; once the count reaches the required number we return the results map. The notification that wakes us from the wait comes from the other methods in the class each time a result is received.
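If you prefer not to hand-roll the wait()/notifyAll() loop, a CountDownLatch sized to the number of expected invocations does the same job. This is a swapped-in java.util.concurrent technique in a hypothetical class (LatchResults), not what PutAllObserver uses; in an observer built this way, the member-failed and member-left callbacks would call completed() with a map marking every key for that member as failed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

// Hypothetical alternative to the wait()/notifyAll() loop: one latch count
// per expected invocation result.
class LatchResults<K> {
    private final CountDownLatch remaining;
    private final Map<K, Throwable> results = new HashMap<>();

    LatchResults(int expectedInvocations) {
        remaining = new CountDownLatch(expectedInvocations);
    }

    // Called once per invocation, from whichever thread delivers the result.
    void completed(Map<K, Throwable> memberErrors) {
        synchronized (results) {
            results.putAll(memberErrors);
        }
        remaining.countDown();
    }

    // Blocks until every expected invocation has reported in.
    Map<K, Throwable> getResults() throws InterruptedException {
        remaining.await();
        synchronized (results) {
            return new HashMap<>(results);
        }
    }
}
```

One design difference from the loop version: getResults() lets InterruptedException propagate, so the caller decides what an interrupt means instead of having it absorbed inside the observer.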

The next method is the memberCompleted() method, which signifies the invocation completed successfully.

@Override
public synchronized void memberCompleted(Member member, Object memberResult) {
    results.putAll((Map<Object,Throwable>) memberResult);
    actualCount++;
    this.notifyAll();
}

All we do in this method is add the results from the invocable, passed as the second parameter, to the results map. We then increment the count and call notifyAll() to wake up the getResults() method.

The next method is the memberFailed() method which will be called if the invocable fails to execute.

@Override
public synchronized void memberFailed(Member member, Throwable throwable) {
    for (Object key : membersAndValues.get(member).keySet()) {
        results.put(key, throwable);
    }
    actualCount++;
    this.notifyAll();
}

The first parameter to this method is the failed Member and the second parameter is the cause of the failure, so in this case we add an entry to the results map for each key for that member and use the cause as the value. We then increment the count and wake up the getResults method.

Finally the memberLeft method.

@Override
public synchronized void memberLeft(Member member) {
    Throwable throwable = new PutAllMemberLeftException(member);
    for (Object key : membersAndValues.get(member).keySet()) {
        results.put(key, throwable);
    }
    actualCount++;
    this.notifyAll();
}

This signifies that the member executing the invocable has left the cluster, so we cannot tell whether the invocable executed. As with the memberFailed method, we record an error for each key sent to that member, increment the count and wake up the getResults method.
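PutAllMemberLeftException is used above, but its definition is not shown. A minimal, hypothetical definition might look like the following. It takes the member as a plain Object so the sketch compiles without Coherence on the classpath; a com.tangosol.net.Member can be passed straight in, since the constructor only uses its string form. In a real build you would probably make it public and, because it travels back to the client inside the result map, check that your serialization configuration can carry it.

```java
// Hypothetical exception recorded against every key sent to a member that
// left the cluster before reporting its results.
class PutAllMemberLeftException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    PutAllMemberLeftException(Object member) {
        super("Member left the cluster before confirming its putAll results: " + member);
    }
}
```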

We now have an InvocationObserver to use when making asynchronous invocable calls, so we can add it to the PutAll run method.

  1. @Override
  2. public void run() {
  3.     NamedCache cache = CacheFactory.getCache(cacheName);
  4.     DistributedCacheService cacheService = (DistributedCacheService) cache.getCacheService();
  5.  
  6.     Map<Member,Map<Object,Object>> valuesByMember = new HashMap<Member,Map<Object,Object>>();
  7.     for (Map.Entry entry : values.entrySet()) {
  8.         Object key = entry.getKey();
  9.         Member member = cacheService.getKeyOwner(key);
  10.         if (!valuesByMember.containsKey(member)) {
  11.             valuesByMember.put(member, new HashMap<Object,Object>());
  12.         }
  13.         valuesByMember.get(member).put(key, entry.getValue());
  14.     }
  15.  
  16.     InvocationService service = (InvocationService) CacheFactory.getService("InvocationService");
  17.     PutAllObserver observer = new PutAllObserver(valuesByMember);
  18.     for (Map.Entry<Member,Map<Object,Object>> entry : valuesByMember.entrySet()) {
  19.         PutAllForMember putAllForMember = new PutAllForMember(cacheName, entry.getValue());
  20.         service.execute(putAllForMember, Collections.singleton(entry.getKey()), observer);
  21.     }
  22.  
  23.     Map<Object, Throwable> results = observer.getResults();
  24.     setResult(results);
  25. }

You can see that the asynchronous execute() method is now used on line 20 to make the call to the InvocationService with the PutAllObserver. After making all the invocation calls, we call getResults() on the PutAllObserver on line 23, which blocks until all the invocables have finished.

If we now run this in the performance test harness we get a time of 241ms: much better, but still slower than the standard Coherence putAll, so we still have some work to do.

Serialization

Our PutAll implementation performs a lot of serialization and deserialization of the keys and values being updated, whereas the standard Coherence putAll does very little of this. So we can now look at how to optimise our own serialization. As with a lot of things in Oracle Coherence, there are usually many ways to implement a requirement, and I am sure there are several ways to optimise the serialisation of our invocables; I am going to describe the way we used, but you are welcome to come up with better ones.

In the PutAll class we are passed the cache name and a map of key/value pairs. In the code above we need the cacheName, and we need the keys in deserialized form to work out the partition owner, but we do not need the values in deserialized form. We can therefore change the serialisation of the PutAll invocable so that it does not deserialize the values of the map at all.

@Override
public void writeExternal(PofWriter pofWriter) throws IOException {
    pofWriter.writeString(1, cacheName);
    // Index 2 holds the entry count; the key/value pairs follow from index 3.
    pofWriter.writeInt(2, values.size());
    Serializer serializer = pofWriter.getPofContext();
    int id = 3;
    for (Map.Entry entry : values.entrySet()) {
        pofWriter.writeObject(id++, entry.getKey());
        // Serialize the value ourselves so the receiver can keep it as a Binary.
        pofWriter.writeBinary(id++, ExternalizableHelper.toBinary(entry.getValue(), serializer));
    }
}

@SuppressWarnings({"unchecked"})
@Override
public void readExternal(PofReader pofReader) throws IOException {
    cacheName = pofReader.readString(1);
    values = new HashMap<Object,Object>();
    int count = pofReader.readInt(2);
    int id = 3;
    for (int i=0; i<count; i++) {
        // Keys are deserialized (getKeyOwner needs them); values stay as Binary.
        Object key = pofReader.readObject(id++);
        Binary value = pofReader.readBinary(id++);
        values.put(key, value);
    }
}

First, the serialization in the writeExternal method: instead of using pofWriter.writeMap we first write the size of the map and then write each key and value individually. We serialize each value into a Binary ourselves, using the ExternalizableHelper class and the serializer from the PofWriter.

For the deserialization, we first read the count and then the keys and values, but this time we leave each value in Binary form, so it is not deserialized at all.
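The shape of this layout can be modelled without Coherence. In the sketch below a Map<Integer,Object> stands in for the PofWriter/PofReader property indexes, and Java serialization to byte[] stands in for ExternalizableHelper.toBinary(); both substitutions are assumptions for illustration. What carries over is the technique: the reader gets keys back as usable objects, while each value stays as opaque bytes until something chooses to deserialize it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Model of the count-then-pairs layout with values kept in serialized form.
class LazyValueLayout {

    // Stand-in for ExternalizableHelper.toBinary(): value -> opaque bytes.
    static byte[] toBytes(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deserialize only when (and if) a value is actually needed.
    static Object fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // writeExternal analogue: 1 = cache name, 2 = count, then pairs from 3.
    static Map<Integer, Object> write(String cacheName, Map<?, ?> values) {
        Map<Integer, Object> slots = new LinkedHashMap<>();
        slots.put(1, cacheName);
        slots.put(2, values.size());
        int id = 3;
        for (Map.Entry<?, ?> e : values.entrySet()) {
            slots.put(id++, e.getKey());             // key kept as an object
            slots.put(id++, toBytes(e.getValue()));  // value pre-serialized
        }
        return slots;
    }

    // readExternal analogue: keys come back usable, values stay as bytes.
    static Map<Object, byte[]> readValues(Map<Integer, Object> slots) {
        int count = (Integer) slots.get(2);
        Map<Object, byte[]> values = new HashMap<>();
        int id = 3;
        for (int i = 0; i < count; i++) {
            Object key = slots.get(id++);
            byte[] value = (byte[]) slots.get(id++);
            values.put(key, value);
        }
        return values;
    }
}
```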

This leaves us with a problem, though: our PutAllForMember class now receives a Map of keys in Object form and values in Binary form, so we can no longer simply call cache.put(), which expects the value in Object form. We do not want to deserialize the value just to do a put, as the whole point here is to avoid deserialization. Instead we need a way to update the binary value for each key directly.

It is possible to get hold of the local backing map for a cache, and we could then put the keys and values directly into the backing map, making sure we converted each key to Binary first. The problem is that direct backing map updates bypass any triggers on the cache, so this is no good.
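To see why the backing-map route is unacceptable, here is a toy model in plain Java (not Coherence). `TriggerBypassSketch` and its methods are hypothetical: `put()` runs every registered trigger before storing, while writing to the exposed backing map skips them entirely.

```java
import java.util.*;
import java.util.function.BiConsumer;

// Toy model, not Coherence: a cache whose put() runs triggers, wrapping a
// backing map that can also be written to directly.
class TriggerBypassSketch {
    private final Map<String, Integer> backingMap = new HashMap<>();
    private final List<BiConsumer<String, Integer>> triggers = new ArrayList<>();

    void addTrigger(BiConsumer<String, Integer> trigger) {
        triggers.add(trigger);
    }

    // "Front door": every trigger sees (and may veto) the update.
    void put(String key, Integer value) {
        for (BiConsumer<String, Integer> trigger : triggers) {
            trigger.accept(key, value);
        }
        backingMap.put(key, value);
    }

    // "Back door": direct backing-map access, no triggers run.
    Map<String, Integer> backingMap() {
        return backingMap;
    }

    public static void main(String[] args) {
        TriggerBypassSketch cache = new TriggerBypassSketch();
        cache.addTrigger((k, v) -> {
            if (v < 0) throw new IllegalArgumentException("rejected " + k);
        });
        try {
            cache.put("a", -1);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // rejected a
        }
        cache.backingMap().put("a", -1);        // validation silently skipped
        System.out.println(cache.backingMap()); // {a=-1}
    }
}
```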

Instead, to go through the front door and make sure we bypass nothing, we can use an EntryProcessor, as this has access to the BinaryEntry and can do a binary update. "EntryProcessors are slow," I hear you say, and normally they would be, but this EntryProcessor executes locally: there are no over-the-wire calls and no serialization involved, just a bit of locking, which is not too bad. The code for the entry processor is very simple as it only has to update the entry.

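import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.util.Binary;
import com.tangosol.util.BinaryEntry;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.processor.AbstractProcessor;
import java.io.IOException;

// Replaces an entry's value with an already-serialized Binary, so the value
// is never deserialized on the owning member.
public class BinaryValueUpdater extends AbstractProcessor implements PortableObject {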

    private Binary binaryValue;

    public BinaryValueUpdater() {
    }

    public BinaryValueUpdater(Binary binaryValue) {
        this.binaryValue = binaryValue;
    }

    @Override
    public Object process(InvocableMap.Entry entry) {
        ((BinaryEntry)entry).updateBinaryValue(binaryValue);
        return null;
    }

    @Override
    public void readExternal(PofReader pofReader) throws IOException {
        binaryValue = pofReader.readBinary(1);
    }

    @Override
    public void writeExternal(PofWriter pofWriter) throws IOException {
        pofWriter.writeBinary(1, binaryValue);
    }
}

That’s all there is to it. We can now amend the PutAllForMember run() method to work with the Binary values from the map.

@Override
public void run() {
    Map<Object,Throwable> errors = new HashMap<Object,Throwable>();
    NamedCache cache = CacheFactory.getCache(cacheName);
    for (Map.Entry entry : values.entrySet()) {
        try {
            cache.invoke(entry.getKey(), new BinaryValueUpdater((Binary) entry.getValue()));
        } catch (Throwable t) {
            errors.put(entry.getKey(), t);
        }
    }
    setResult(errors);
}

Now the PutAllForMember run method calls cache.invoke with a BinaryValueUpdater wrapping each Binary value, so the values are never deserialized. Running our performance test again, we can now put 1000 key/value pairs in 92ms, which is actually faster than the built-in Coherence putAll; not a bad result.
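The per-key error collection in PutAllForMember.run() is the heart of the approach, and it can be exercised without a cluster. In this JDK-only sketch a TreeMap plays the cache and a hypothetical validation callback plays a trigger; `putEach` and the callback are illustrative names, not Coherence API, and the callback stands in for `cache.invoke(key, new BinaryValueUpdater(...))`.

```java
import java.util.*;
import java.util.function.BiConsumer;

// JDK-only sketch of per-key error collection: each entry is applied on its
// own, and any failure is recorded against its key instead of aborting the batch.
class PerKeyErrorsSketch {

    static <K, V> Map<K, Throwable> putEach(Map<K, V> values, BiConsumer<K, V> applyOne) {
        Map<K, Throwable> errors = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : values.entrySet()) {
            try {
                applyOne.accept(entry.getKey(), entry.getValue());
            } catch (Throwable t) { // mirrors the catch in PutAllForMember.run()
                errors.put(entry.getKey(), t);
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        Map<String, Integer> cache = new TreeMap<>();
        // Stand-in for a validating trigger: negative values are rejected.
        BiConsumer<String, Integer> putWithTrigger = (k, v) -> {
            if (v < 0) throw new IllegalArgumentException("negative value for " + k);
            cache.put(k, v);
        };
        Map<String, Integer> batch = new LinkedHashMap<>();
        batch.put("a", 1);
        batch.put("b", -1);
        batch.put("c", 3);
        Map<String, Throwable> errors = putEach(batch, putWithTrigger);
        System.out.println(errors.keySet()); // [b]
        System.out.println(cache);           // {a=1, c=3}
    }
}
```

Because every key is attempted independently, the caller ends up knowing exactly which keys failed and that every other key was written, which is the consistency guarantee the built-in putAll cannot give.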

Non-Java Coherence*Extend Clients

It is possible to use this PutAll implementation from a non-Java client. You will need to write a client-side stub of the PutAll invocable and add it to the client-side POF configuration file with the same POF user type ID as the server-side class. Below is the C# version of the PutAll invocable...

using System;
using System.Collections;
using Tangosol.Net;
using Tangosol.IO.Pof;
using Tangosol.Util;

namespace TheGridMan
{
    class PutAll : AbstractInvocable
    {
        #region Constructors

        public PutAll(string cacheName, IDictionary values)
        {
            CacheName = cacheName;
            Values = values;
        }

        #endregion

        #region Properties

        public string CacheName { get; set; }

        public IDictionary Values { get; set; }

        #endregion

        #region AbstractInvocable

        public override void Run()
        { }

        public override void ReadExternal(IPofReader reader)
        {
            CacheName = reader.ReadString(1);
            int count = reader.ReadInt32(2);
            Values = new Hashtable();
            int id = 3;
            for (int i = 0; i < count; i++)
            {
                Object key = reader.ReadObject(id++);
                Object value = reader.ReadObject(id++);
                Values[key] = value;
            }
        }

        public override void WriteExternal(IPofWriter writer)
        {
            writer.WriteString(1, CacheName);
            writer.WriteInt32(2, Values.Count);
            int id = 3;
            foreach (DictionaryEntry entry in Values)
            {
                writer.WriteObject(id++, entry.Key);
                writer.WriteBinary(id++, SerializationHelper.ToBinary(entry.Value, writer.PofContext));
            }
        }

        #endregion
    }
}

The Run method is empty as it will never be executed on the client. I expect you could leave the ReadExternal method empty as well, as we would never expect a PutAll object to be deserialized on the client side either.

To use PutAll in a C# client we can do the following...

IDictionary values = new Hashtable();
// populate values dictionary...
values["Key-1"] = "Value-1";

PutAll putAll = new PutAll("dist-test", values);
IInvocationService service = (IInvocationService) CacheFactory.GetService("remote-invocation-service");
IDictionary results = service.Query(putAll, null);
IDictionary errors = (IDictionary) results[CacheFactory.ConfigurableCacheFactory.LocalMember];
if (errors.Count > 0)
{
    // Oops... some puts failed!
}

If the errors IDictionary contains anything then some of the entries failed.

Sorry, C++ users: as a Java developer I can knock out some C# code, but I would have difficulty with a C++ example. Still, all you C++ devs must be pretty smart, so I'm sure you can work it out from the C# example above.

Further Enhancements

There are various other enhancements that could be made to the code to add other functionality.

Running from Cluster Members or Extend Clients

The code above only works from a Coherence*Extend client, as it relies on the PutAll invocable being serialized over the wire as the first step. With a few changes it can be called from a cluster member too. (In the system we originally wrote this for, we have a way to determine whether an invocable is being run from a client or a cluster member and take the appropriate action.) When running on a cluster member we do not need the first invocation call, but we still need to make sure the values have been serialized into Binary form. So we add a flag, defaulting to false, that records whether the values have been converted. On the Extend path, readExternal will already have read the values as Binary, so it sets the flag to true. The run method checks the flag and, if it is false, calls a method to convert the values to Binary. The code looks like this…

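import com.tangosol.io.Serializer;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.net.AbstractInvocable;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.DistributedCacheService;
import com.tangosol.net.InvocationService;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.util.Binary;
import com.tangosol.util.ExternalizableHelper;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// PutAllForMember and PutAllObserver are the classes described earlier,
// assumed to be in the same package.
public class PutAll extends AbstractInvocable implements PortableObject {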
    public static final String INVOCATION_SERVICE_NAME = "InvocationService";

    private String cacheName;
    private Map<Object,Object> values;
    private boolean valuesAreBinary = false;

    public PutAll() {
    }

    public PutAll(String cacheName, Map<Object, Object> values) {
        this.cacheName = cacheName;
        this.values = values;
    }

    @SuppressWarnings({"unchecked"})
    @Override
    public void run() {
        NamedCache cache = CacheFactory.getCache(cacheName);
        DistributedCacheService cacheService = (DistributedCacheService) cache.getCacheService();

        ensureValuesAreBinary(cacheService.getSerializer());

        Map<Member,Map<Object,Object>> valuesByMember = new HashMap<Member,Map<Object,Object>>();
        for (Map.Entry entry : values.entrySet()) {
            Object key = entry.getKey();
            Member member = cacheService.getKeyOwner(key);
            if (!valuesByMember.containsKey(member)) {
                valuesByMember.put(member, new HashMap<Object,Object>());
            }
            valuesByMember.get(member).put(key, entry.getValue());
        }

        InvocationService service = (InvocationService) CacheFactory.getService(INVOCATION_SERVICE_NAME);
        PutAllObserver observer = new PutAllObserver(valuesByMember);
        for (Map.Entry<Member,Map<Object,Object>> entry : valuesByMember.entrySet()) {
            PutAllForMember putAllForMember = new PutAllForMember(cacheName, entry.getValue());
            service.execute(putAllForMember, Collections.singleton(entry.getKey()), observer);
        }

        Map<Object, Throwable> results = observer.getResults();
        setResult(results);
    }

    @SuppressWarnings({"unchecked"})
    @Override
    public void readExternal(PofReader pofReader) throws IOException {
        cacheName = pofReader.readString(1);
        values = new HashMap<Object,Object>();
        int count = pofReader.readInt(2);
        int id = 3;
        for (int i=0; i<count; i++) {
            Object key = pofReader.readObject(id++);
            Binary value = pofReader.readBinary(id++);
            values.put(key, value);
        }
        valuesAreBinary = true;
    }

    @Override
    public void writeExternal(PofWriter pofWriter) throws IOException {
        pofWriter.writeString(1, cacheName);
        pofWriter.writeInt(2, values.size());
        Serializer serializer = pofWriter.getPofContext();
        int id = 3;
        for (Map.Entry entry : values.entrySet()) {
            pofWriter.writeObject(id++, entry.getKey());
            pofWriter.writeBinary(id++, ExternalizableHelper.toBinary(entry.getValue(), serializer));
        }
    }

    private synchronized void ensureValuesAreBinary(Serializer serializer) {
        if (!valuesAreBinary) {
            Map<Object, Object> converted;
            if (this.values.isEmpty()) {
                converted = this.values;
            } else {
                converted = new HashMap<Object,Object>();
                for (Map.Entry entry : values.entrySet()) {
                    converted.put(entry.getKey(), ExternalizableHelper.toBinary(entry.getValue(), serializer));
                }
            }
            values = converted;
            valuesAreBinary = true;
        }
    }
}
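The grouping step at the start of run() is what lets each PutAllForMember do purely local work. Below is a JDK-only sketch of the same grouping. The `ownerOf` function is a hypothetical stand-in for `DistributedCacheService.getKeyOwner`: hash modulo member count is not how Coherence assigns partitions, it is just a deterministic substitute so the sketch can run.

```java
import java.util.*;
import java.util.function.Function;

// JDK-only sketch of splitting a batch by owning member.
class GroupByOwnerSketch {

    static <K, V, M> Map<M, Map<K, V>> groupByOwner(Map<K, V> values, Function<K, M> ownerOf) {
        Map<M, Map<K, V>> byOwner = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : values.entrySet()) {
            M owner = ownerOf.apply(entry.getKey());
            // computeIfAbsent replaces the containsKey/put pair used in run()
            byOwner.computeIfAbsent(owner, m -> new LinkedHashMap<>())
                   .put(entry.getKey(), entry.getValue());
        }
        return byOwner;
    }

    public static void main(String[] args) {
        int memberCount = 3;
        // Hypothetical owner function; real ownership comes from partition assignment.
        Function<Integer, String> ownerOf =
                key -> "member-" + Math.floorMod(key.hashCode(), memberCount);
        Map<Integer, String> values = new LinkedHashMap<>();
        for (int i = 0; i < 7; i++) {
            values.put(i, "Value-" + i);
        }
        groupByOwner(values, ownerOf).forEach(
                (member, batch) -> System.out.println(member + " -> " + batch.keySet()));
        // member-0 -> [0, 3, 6]
        // member-1 -> [1, 4]
        // member-2 -> [2, 5]
    }
}
```

Each key lands in exactly one member's batch, so one invocable per member covers the whole putAll with no overlap.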

To use PutAll from a cluster member we skip the invocation service for the first step: we just call the PutAll run method directly and then call getResult() to get the errors.

Map<Object,Object> values = new HashMap<Object,Object>();
// --- Populate the values map with the key/value pairs to put ---
PutAll putAll = new PutAll("dist-test", values);
putAll.run();
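// getResult() is declared to return Object, so the errors map needs a cast
Map<Object,Throwable> errors = (Map<Object,Throwable>) putAll.getResult();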
// --- check the errors map for any failures

Ordering

One use of putAll in the system I am currently working on is the initial data population when the cluster starts up. Because of the way the data is loaded, we do multiple updates for the same key as we build up the data. We needed putAll to preserve the ordering of the key/value pairs, so that we could in effect send multiple values for the same key in a single putAll call and still have them applied in order. This is easy to do by replacing the Maps holding the keys and values in the PutAll and PutAllForMember invocables with a List of a simple tuple class holding each key and its corresponding value. Iterating over a List instead of a Map maintains the order.
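A minimal JDK-only sketch of the ordered variant follows. `KeyValue`, `groupInOrder` and `applyInOrder` are hypothetical names, and the plain Map stands in for the real per-key BinaryValueUpdater invocation. Because a given key always has a single owner, grouping the list per member keeps all updates for that key on the same member and in their original relative order.

```java
import java.util.*;
import java.util.function.Function;

// Hypothetical tuple replacing the Map of key/value pairs.
final class KeyValue<K, V> {
    final K key;
    final V value;

    KeyValue(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

class OrderedPutSketch {

    // Splits the batch by owner; each member's List keeps the original
    // relative order, including repeated keys.
    static <K, V, M> Map<M, List<KeyValue<K, V>>> groupInOrder(
            List<KeyValue<K, V>> entries, Function<K, M> ownerOf) {
        Map<M, List<KeyValue<K, V>>> byOwner = new LinkedHashMap<>();
        for (KeyValue<K, V> kv : entries) {
            byOwner.computeIfAbsent(ownerOf.apply(kv.key), m -> new ArrayList<>()).add(kv);
        }
        return byOwner;
    }

    // Applies one member's entries in list order, so the last value for a key wins.
    static <K, V> void applyInOrder(List<KeyValue<K, V>> entries, Map<K, V> cache) {
        for (KeyValue<K, V> kv : entries) {
            cache.put(kv.key, kv.value);
        }
    }

    public static void main(String[] args) {
        List<KeyValue<String, String>> batch = Arrays.asList(
                new KeyValue<>("Key-1", "partial"),
                new KeyValue<>("Key-2", "other"),
                new KeyValue<>("Key-1", "complete"));
        Map<String, String> cache = new HashMap<>();
        Function<String, Integer> ownerOf = key -> Math.floorMod(key.hashCode(), 2);
        for (List<KeyValue<String, String>> memberBatch : groupInOrder(batch, ownerOf).values()) {
            applyInOrder(memberBatch, cache);
        }
        System.out.println(cache.get("Key-1")); // complete
    }
}
```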

Setting Expiry

The NamedCache interface has a version of the put method that takes a third parameter specifying the expiry time for the key and value being put, but there is no putAll equivalent. We could enhance our version of PutAll to set a bulk expiry value for all the entries; we just need to pass a long value to the constructors of the PutAll and PutAllForMember invocables, not forgetting to add it to their readExternal and writeExternal methods.
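A JDK-only sketch of the idea: the per-member task carries one expiry value and applies it to every entry, still collecting per-key errors. `ExpiringStore` is a hypothetical stand-in for the three-argument `NamedCache.put(key, value, cMillis)`. In the real binary path the expiry would have to be applied inside the entry processor instead; Coherence 3.6 and later appear to offer `BinaryEntry.expire(long)` for this, but that is an assumption to verify against your version.

```java
import java.util.*;

// Hypothetical stand-in for the three-argument NamedCache.put(key, value, cMillis).
interface ExpiringStore<K, V> {
    void put(K key, V value, long expiryMillis);
}

// Sketch of a PutAllForMember-style task carrying one expiry for the whole batch.
// In the real invocable, expiryMillis would also be written in writeExternal
// and read back in readExternal.
class PutAllWithExpirySketch<K, V> {
    private final Map<K, V> values;
    private final long expiryMillis;

    PutAllWithExpirySketch(Map<K, V> values, long expiryMillis) {
        this.values = values;
        this.expiryMillis = expiryMillis;
    }

    // Applies every entry with the same expiry, collecting per-key failures.
    Map<K, Throwable> run(ExpiringStore<K, V> store) {
        Map<K, Throwable> errors = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : values.entrySet()) {
            try {
                store.put(entry.getKey(), entry.getValue(), expiryMillis);
            } catch (Throwable t) {
                errors.put(entry.getKey(), t);
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        Map<String, Long> expiries = new TreeMap<>();
        ExpiringStore<String, String> store = (key, value, ms) -> expiries.put(key, ms);
        Map<String, String> batch = new LinkedHashMap<>();
        batch.put("Key-1", "Value-1");
        batch.put("Key-2", "Value-2");
        Map<String, Throwable> errors = new PutAllWithExpirySketch<>(batch, 60_000L).run(store);
        System.out.println(errors.isEmpty() + " " + expiries); // true {Key-1=60000, Key-2=60000}
    }
}
```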

Conclusions

So there you have it: an alternative version of Coherence putAll that allows you to recover from exceptions thrown during updates. The approach has proved to work well and to be remarkably fast in a stable cluster, that is, one in which partitions are not moving. Given that partitions only move when nodes leave or join the cluster, that is not too bad. If a node leaves the cluster there might be problems if that node was executing one of the PutAllForMember invocables, but you should be able to catch the error and recover. But then you really don't want nodes leaving your cluster very often, if ever, do you?


3 Responses

  1. Craig Day says:

    Nice work. Really nice! Being able to implement conceptually tricky stuff so elegantly is one of the reasons I love Coherence.

  2. Hi JK,

    Very interesting read! One thing I wanted to point out is that if you want to put binaries into a cache, you can use the following to get the NamedCache:


    import com.tangosol.util.NullImplementation;

    ...

    CacheFactory.getCache("name", NullImplementation.getClassLoader());

    It would be interesting to compare this with BinaryValueUpdater. If you do try it, I would still do a putAll instead of a put for a single value so that we don’t have to return the old value (on a local VM this is essentially free, but still unnecessary).

    -Patrick

    • jk says:

      Hi Patrick,

      Interesting, that’s one I didn’t know, thanks for the tip. I will give it a try and see if it makes any difference.

      Cheers
      JK
