Event Compression - A Case of Foreign Exchange Rates

Often in financial industry we deal with extremely fast rate of data production and consumption. Foreign Exchange(FX) rates or stock prices are common example. We also many a times need to throttle because the consumer is slow. For example display of FX Rates or stocks in a web portal or display boards. In this blog post I am going to discuss once such problem I solved long time back.

We had FX pricing engine which used to publish FX rates at around 1500-3000 rates per second to a topic. These rates were then consumed by different internal applications. Once such downstream application was heavily reliant on latest rates(but may not need all) being available. We had a integration layer to consume price from topic and use the downstream application’s API to get the rates in. Over the time when number number of published rates increased we received customer complaining about rates being lagged.

Though 1500 rates per second is not much, the integration layer and the downstream applications were not capable enough to handle that much of volume. The integration layer had a bit of data compression logic but it was not enough. Consuming from topic and converting the data format and then publishing to the downstream took almost 15-20ms. Even with multithreading we were not able to publish more than 1000 rates per second to downstream. Also thanks to yourkit profiler, we were able to find other performance issues related to Java Serialization.

When I was working on this, I read a blog post on Coalescing Ring Buffer at this

With the coalescing ring buffer my design became much simpler. Here is some pseudo code of my implementation

This is pseudo code to communicate the idea. This does not work.

The subscriber to topic

public class TopicSubscriber {
    //Injected
    CoalescingBuffer<String, Price> buffer;

    public void onMessage(ObjectMessage message) {
        Price price = (Price) message.getObject();
        buffer.offer(price.getCurrencyPair(), price);
    }
}

The consumer of coalescing ring buffer

public class RingBufferConsumerThread extends Thread {
    //Injected
    CoalescingBuffer<String, Price> buffer;
    DownStreamService downStream;
    ConcurrentHashMap<String, PriceDispatcherThread> threads;

    public void run() {
        while (true) {
            if (!buffer.isEmpty()) {
                List<Price> prices = new ArrayList<Price>();
                buffer.poll(prices);
                for (Price price : prices) {
                    //Assuming that threads are already started for each currency pair
                    PriceDispatcherThread thread = threads.get(price.getCurrencyPair());
                    thread.setPrice(price);
                }
            }
        }
    }
}

One thread per currency pair to publish to down stream

class PriceDispatcherThread extends Thread {
    private AtomicReference<Price> atomicPrice;
    private final DownStreamService downStream;

    public PriceDispatcherThread(DownloadService downloadService) {
        this.downStream = downloadService;
    }

    public void setPrice(Price price) {
        this.atomicPrice.set(price);
    }

    public void run() {
        while (true) {
            if (atomicPrice.get() != null) {
                Price price = atomicPrice.getAndSet(null);
                downStream.send(price);
            }
        }
    }
}

Though there is more to this story I have tried to capture the idea here. I also had to use kryo to improve serialization.

Have you solved a similar problem, I would be interest to know how you solved it.

comments powered by Disqus