So, recently, when I was parsing zillions of rows of data, aggregating related data into partial CSV files, and then combining those partial files into reports, I thought - Aha! MapReduce.
For a whole bunch of good design reasons I was using Apache Camel - a neat pipelining tool which, with a bit of help from ActiveMQ, provides the sort of long-running stability that I needed. Camel, however, does not do Map/Reduce, but it does have the Aggregator integration pattern, which you can use to do a similar thing.
*Image courtesy of Carlos Oliveira*
Now, imagine you empty your jar of loose change into one of those coin-counting machines at the mall. Internally, all the coins are sorted by falling through a hole which is either nickel-, dime- or quarter-shaped, and as they emerge from the other side they are counted*. That's aggregation, Camel style.
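To make the analogy concrete, here's a minimal sketch of a coin-counting route in Camel's Java DSL. To be clear, this is invented for illustration - the `direct:coins` endpoint, the `coinType` and `count` headers, and the Camel 2.x import path are all my assumptions, not anything from a real machine:

```java
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class CoinCountingRoute extends RouteBuilder {

    // Counting strategy: keep the first exchange for each coin type and
    // bump a "count" header for every further coin of the same type.
    private static final AggregationStrategy COUNT = (oldExchange, newExchange) -> {
        if (oldExchange == null) {
            // first coin of this type: seed the count
            newExchange.getIn().setHeader("count", 1);
            return newExchange;
        }
        int soFar = oldExchange.getIn().getHeader("count", Integer.class);
        oldExchange.getIn().setHeader("count", soFar + 1);
        return oldExchange;
    };

    @Override
    public void configure() {
        from("direct:coins")
            // the correlation expression is the shaped hole: coins with
            // the same coinType header fall into the same group
            .aggregate(header("coinType"), COUNT)
            // close a group after a quiet second with no new coins
            .completionTimeout(1000)
            .log("counted ${header.count} coins of type ${header.coinType}");
    }
}
```

The correlation expression decides which group a message falls into; the strategy decides what "adding it to the group" actually means.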
I did hit a bit of a snag, though. I couldn't work out how to tell the Aggregator that there were no more files to come... Stop... Whoa there... Desist!
It turns out that, hidden away in the middle of the docs, the File endpoint's batch consumer rather usefully sets a CamelBatchComplete flag on each exchange - strictly an exchange property rather than a header, so in a Simple expression it's ${property.CamelBatchComplete} - which is just what I was looking for:
```xml
<route id="report_month_to_date">
  <!-- consume each partial CSV file as it appears -->
  <from uri="file:partials"/>
  <unmarshal><csv/></unmarshal>
  <to uri="bean:myCsvHandler?method=doHandleCsvData"/>
  <aggregate strategyRef="serviceStrategy">
    <!-- group everything belonging to the current month -->
    <correlationExpression>
      <simple>${header.month} == ${date:now:yyyyMM}</simple>
    </correlationExpression>
    <!-- the batch consumer flips this property on the last file -->
    <completionPredicate>
      <simple>${property.CamelBatchComplete}</simple>
    </completionPredicate>
    <!-- write the merged report once the group completes -->
    <to uri="file:reports/?fileName=${header.month}.csv"/>
  </aggregate>
</route>
```
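The route delegates the actual merging to a serviceStrategy bean, which I haven't shown. Something like this would do the job - a minimal sketch only, assuming each partial arrives as a plain String body (the class name and that assumption are mine, not part of the route above):

```java
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

// Illustrative stand-in for the "serviceStrategy" bean: appends each
// partial CSV fragment onto the body accumulated so far.
public class CsvAppendingStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // First fragment for this correlation key: nothing to merge yet.
        if (oldExchange == null) {
            return newExchange;
        }
        String soFar = oldExchange.getIn().getBody(String.class);
        String next = newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(soFar + "\n" + next);
        return oldExchange;
    }
}
```

Registered in Spring as `<bean id="serviceStrategy" class="com.example.CsvAppendingStrategy"/>` (a hypothetical package name), Camel calls aggregate() once per partial file, and the merged body is what lands in reports/ when the completion predicate fires.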
* I have no idea how a coin counting machine works.