Wednesday, March 18, 2009

Oracle Coherence learnings, so far... Part 1

I've been going through a lab that covers a new set of features in each chapter. Here are my learnings so far:

1. Definition: Oracle Coherence is an in-memory data grid solution that enables organizations to predictably scale mission-critical applications by providing fast access to frequently used data.

2. The standard way of putting data into, and getting it out of, the cache:
NamedCache cache = CacheFactory.getCache("person");
Person p1 = new Person(2, "Jade", "Goody", "London, 38", 36, Person.FEMALE);
cache.put(p1.getId(), p1);                   // store under the person's id (2)
Person p2 = (Person) cache.get(p1.getId());  // read it back with the same key


3. Custom objects that you put into a Coherence cache must be at least Serializable. If you want a more efficient alternative, implement com.tangosol.io.ExternalizableLite. This requires you to implement two methods (readExternal and writeExternal), but marshalling and unmarshalling of objects becomes much more efficient.
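
As a rough sketch, here is what a trimmed-down Person might look like with ExternalizableLite (the fields shown are assumptions, not the lab's actual class; ExternalizableHelper is the Coherence utility class for null-safe string I/O):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.tangosol.io.ExternalizableLite;
import com.tangosol.util.ExternalizableHelper;

public class Person implements ExternalizableLite {
    private int id;
    private String firstName;  // assumed field, adjust to the real class
    private int age;

    public Person() {}  // a public no-arg constructor is needed for deserialization

    // read fields in exactly the order they were written
    public void readExternal(DataInput in) throws IOException {
        id = in.readInt();
        firstName = ExternalizableHelper.readSafeUTF(in);
        age = in.readInt();
    }

    // write fields; the order must match readExternal
    public void writeExternal(DataOutput out) throws IOException {
        out.writeInt(id);
        ExternalizableHelper.writeSafeUTF(out, firstName);
        out.writeInt(age);
    }
}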

4. To bulk-load data into the in-memory cache, you want to batch the puts with putAll rather than calling put once per row, like this:
public static void bulkLoad(NamedCache cache, Connection conn)
{
    Statement s;
    ResultSet rs;
    Map buffer = new HashMap();

    try
    {
        int count = 0;
        s = conn.createStatement();
        rs = s.executeQuery("select key, value from table");
        while (rs.next())
        {
            Integer key = new Integer(rs.getInt(1));
            String value = rs.getString(2);
            buffer.put(key, value);

            // this loads 1000 items at a time into the cache
            if ((++count % 1000) == 0)
            {
                cache.putAll(buffer);
                buffer.clear();
            }
        }
        // load the last partial batch (if any)
        if (!buffer.isEmpty())
        {
            cache.putAll(buffer);
        }
        ...
    }
    catch (SQLException e)
    {...}
}
5. To carry out efficient processing of filtered results, you may want to do this (bulk reads with getAll) instead of fetching entries one at a time with the regular iterator approach:
public static void performQuery()
{
    NamedCache c = CacheFactory.getCache("test");

    // Search for entries that start with 'c'
    Filter query = new LikeFilter(IdentityExtractor.INSTANCE, "c%", '\\', true);

    // Perform query, return keys of entries that match
    Set keys = c.keySet(query);

    // The number of objects to process at a time
    final int BUFFER_SIZE = 100;

    // Object buffer
    Set buffer = new HashSet(BUFFER_SIZE);

    for (Iterator i = keys.iterator(); i.hasNext();)
    {
        buffer.add(i.next());

        if (buffer.size() >= BUFFER_SIZE)
        {
            // Bulk load BUFFER_SIZE number of objects from cache
            Map entries = c.getAll(buffer);

            // Process each entry
            process(entries);

            // Done processing these keys, clear buffer
            buffer.clear();
        }
    }
    // Handle the last partial chunk (if any)
    if (!buffer.isEmpty())
    {
        process(c.getAll(buffer));
    }
}
6. This is how filtering works:
Set malesOver35 = cache.entrySet(
        new AndFilter(new EqualsFilter("getGender", Person.MALE),
                      new GreaterEqualsFilter("getAge", 35)));

7. This is how aggregation works:
Double avgAgeMales =
        (Double) cache.aggregate(new EqualsFilter("getGender", Person.MALE),
                                 new DoubleAverage("getAge"));


8. Entry processors are agents that perform processing against cache entries, and they carry this out directly on the node where the data is held. The processing can change the data (e.g. create, update or remove entries) or just perform calculations on it. Entry processors that work against the same key are logically queued, which means you can achieve lock-free (high-performance) processing. A small example is as follows:
class RaiseSalary extends AbstractProcessor {
    ...
    public Object process(InvocableMap.Entry entry) {
        Employee emp = (Employee) entry.getValue();
        emp.setSalary(emp.getSalary() * 1.10);  // give a 10% raise, in place on the storage node
        entry.setValue(emp);
        return null;
    }
}
To invoke this against every entry in the cache, you then do the following:
empCache.invokeAll(AlwaysFilter.INSTANCE, new RaiseSalary());
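
Since processors against the same key are queued, you can also target a single entry. A minimal sketch, assuming empId holds the key of one cached Employee:
Object result = empCache.invoke(empId, new RaiseSalary());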
