Running Rings Around Plain Java - The Killer Code

I wrote my previous post too quickly: I have since found a very simple change that makes the ring about 6x faster!

The idea is to process messages in a ThreadPoolExecutor. As my nodes are Runnable, I just needed to initialize a shared ThreadPoolExecutor and, in sendMessage, submit the node's Runnable to it each time a message arrives.
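To isolate the pattern before the full listing, here is a minimal sketch. The names `PooledNodeSketch`, `Node`, `handled` and `deliver` are made up for illustration and are not part of the ring code; the sketch also assumes Java 7 or later. It only shows the enqueue-then-execute step: a node owns a mailbox but no thread, and every message schedules exactly one `run()` on the shared pool.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PooledNodeSketch {

    // Hypothetical minimal node: it has a mailbox but no thread of its own.
    static class Node implements Runnable {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        private final ExecutorService executor;
        final AtomicInteger handled = new AtomicInteger();

        Node(ExecutorService executor) {
            this.executor = executor;
        }

        // Enqueue the message first, then schedule one run() for it.
        void sendMessage(String m) {
            queue.add(m);
            executor.execute(this);
        }

        // Each run() handles at most one message. poll() never blocks,
        // so a pool thread is never parked waiting on an empty mailbox.
        public void run() {
            String m = queue.poll();
            if (m != null) {
                handled.incrementAndGet();
            }
        }
    }

    // Sends `count` messages to a single node and returns how many were handled.
    static int deliver(int threads, int count) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        Node node = new Node(executor);
        for (int i = 0; i < count; i++) {
            node.sendMessage("msg-" + i);
        }
        executor.shutdown(); // no new tasks; already queued runs still execute
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return node.handled.get();
    }

    public static void main(String[] args) {
        System.out.println("handled " + deliver(4, 1000) + " messages");
    }
}
```

Because the message is enqueued before the task is submitted, every `run()` finds something in the mailbox. The sketch shuts the pool down explicitly so that the JVM can exit.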

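Here is the full code:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class OptimizedRing {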

    private ExecutorService executor;

    public static void main(String[] args) throws Exception {
        OptimizedRing ring = new OptimizedRing();
        RingNode node = ring.startRing(Integer.parseInt(args[0]));
        node.sendMessage(new StartMessage());
    }

    public RingNode startRing(int n) {
        RingNode[] nodes = spawnNodes(n, startTimer());
        connectNodes(n, nodes);
        return nodes[0];
    }

    private Timer startTimer() {
        Timer timer = new Timer();
        new Thread(timer).start();
        return timer;
    }

    private RingNode[] spawnNodes(int n, final Timer timer) {
        System.out.println("constructing nodes");
        long start = System.currentTimeMillis();
        // Shared pool for all nodes: 4 threads here; the results below also cover 1 and 5+ threads.
        executor = Executors.newFixedThreadPool(4);
        RingNode[] nodes = new RingNode[n+1];
        for (int i = 0; i < n ; i++) {
            nodes[i] = new RingNode(i, timer, null);
        }
        long end = System.currentTimeMillis();
        System.out.println("Took "+(end-start)+"ms to construct "+n+" nodes");
        return nodes;
    }

    private void connectNodes(int n, RingNode[] nodes) {
        System.out.println("connecting nodes");
        nodes[n] = nodes[0];
        for (int i=0; i<n; i++) {
            nodes[i].connect(nodes[i+1]);
        }
    }

    interface Message {
        String getType();
    }

    private static class StartMessage implements Message {
        public String getType() {
            return "START";
        }
    }

    private static class StopMessage implements Message {
        public String getType() {
            return "STOP";            
        }
    }

    private static class CancelMessage implements Message {
        public String getType() {
            return "CANCEL";
        }
    }

    private static class TokenMessage implements Message {
        private int nodeId;
        private int value;

        public TokenMessage(int nodeId, int value) {
            this.nodeId = nodeId;
            this.value = value;
        }

        public String getType() {
            return "TOKEN";
        }
    }

    private class RingNode implements Runnable {
        private int nodeId;
        private Timer timer;
        private RingNode nextNode;
        private BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>();
        // volatile: the flag is written and read by different pool threads
        private volatile boolean isActive = false;
        
        public RingNode(int id, Timer timer, RingNode nextNode) {
            nodeId = id;
            this.timer = timer;
            this.nextNode = nextNode;                        
        }

        public void connect(RingNode node) {
            nextNode = node;
            isActive = true;
        }

        public void sendMessage(Message m) {
            queue.add(m);
            executor.execute(this);
        }

        public void run() {
            if (isActive) {
                try {
                    Message m = queue.take();
                    if (m instanceof StartMessage) {
                        log("Starting messages");
                        timer.sendMessage(m);
                        nextNode.sendMessage(new TokenMessage(nodeId, 0));
                    } else if (m instanceof StopMessage) {
                        log("Stopping");
                        nextNode.sendMessage(m);
                        isActive = false;
                    } else if (m instanceof TokenMessage) {
                        if (((TokenMessage)m).nodeId == nodeId) {
                            int nextValue = ((TokenMessage)m).value + 1;
                            if (nextValue % 10000 == 0) {
                                log("Around ring "+nextValue+" times");
                            }
                            if (nextValue == 1000000) {
                                timer.sendMessage(new StopMessage());
                                timer.sendMessage(new CancelMessage());
                                nextNode.sendMessage(new StopMessage());
                                isActive = false;
                            } else {
                                nextNode.sendMessage(new TokenMessage(nodeId, nextValue));
                            }

                        } else {
                            nextNode.sendMessage(m);
                        }
                    }

                } catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
            }
        }

        public void log(String s) {
            System.out.println(System.currentTimeMillis()+" "+nodeId+": "+s);
        }

    }

    private static class Timer implements Runnable {
        private BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>();
        private boolean timing = false;
        private long startTime;

        public void sendMessage(Message m) {
            //we don't need to change this implementation as timer is rarely called
            queue.add(m);
        }

        public void run() {
            while (true) {
                Message m;
                try {
                    m = queue.take();
                    if (m instanceof StartMessage) {
                        startTime = System.currentTimeMillis();
                        timing = true;
                    } else if (m instanceof StopMessage) {
                        long end = System.currentTimeMillis();
                        System.out.println("Start="+startTime+" Stop="+end+" Elapsed="+(end-startTime));
                        timing = false;                                        
                    } else if (m instanceof CancelMessage) {
                        break;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}




| Code | Spawn | Send 100M messages |
|---|---|---|
| Scala Actors | 15ms | 270104ms |
| SimpleRing | 11ms | 493073ms |
| OptimizedRing (4 threads) | 6ms | 84727ms |
| OptimizedRing (5+ threads) | 5ms | 62593ms |
| OptimizedRing (1 thread) | 5ms | 60660ms |


I finally saw my 4 cores used! Maximum multithreaded throughput is reached at 5 threads. However, 1 thread is faster still. Is this related to a memory bandwidth limit?
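To repeat the comparison across pool sizes without editing the constant by hand, something like the following stripped-down harness could be used. It is a sketch, not the benchmark above: `RingThreadSweep`, `Node`, `sendToken` and `runRing` are hypothetical names, the Timer and message classes are left out, and the ring is far smaller than the 100M-message run. It assumes Java 8 or later for the lambda. One thing it makes visible is that only a single token circulates, so at most one task is runnable at any moment. That may be part of why extra threads do not help, but that is a guess rather than something measured here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

class RingThreadSweep {

    // Hypothetical stripped-down node: it forwards a countdown token to its neighbour.
    static final class Node {
        private final ExecutorService executor;
        private final AtomicLong hops;
        private final CountDownLatch done;
        Node next;

        Node(ExecutorService executor, AtomicLong hops, CountDownLatch done) {
            this.executor = executor;
            this.hops = hops;
            this.done = done;
        }

        // Schedule one task on the shared pool; it forwards the token or finishes the run.
        void sendToken(final long remaining) {
            executor.execute(() -> {
                hops.incrementAndGet();
                if (remaining == 1) {
                    done.countDown();
                } else {
                    next.sendToken(remaining - 1);
                }
            });
        }
    }

    // Passes one token around `nodes` nodes `rounds` times on a pool of
    // `threads` threads and returns the number of hops performed.
    static long runRing(int threads, int nodes, int rounds) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        AtomicLong hops = new AtomicLong();
        CountDownLatch done = new CountDownLatch(1);

        Node[] ring = new Node[nodes];
        for (int i = 0; i < nodes; i++) {
            ring[i] = new Node(executor, hops, done);
        }
        for (int i = 0; i < nodes; i++) {
            ring[i].next = ring[(i + 1) % nodes]; // last node points back to the first
        }

        ring[0].sendToken((long) nodes * rounds);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown(); // let the pool threads exit so the JVM can stop
        return hops.get();
    }

    public static void main(String[] args) {
        for (int threads : new int[] {1, 4, 5}) {
            long start = System.currentTimeMillis();
            long hops = runRing(threads, 100, 10000);
            long elapsed = System.currentTimeMillis() - start;
            System.out.println(threads + " thread(s): " + hops + " hops in " + elapsed + "ms");
        }
    }
}
```

Timings from a sketch like this will vary with the machine and JVM warm-up, so they should be read as a rough comparison between pool sizes, not as a reproduction of the figures in the table.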

Now I am left wondering if actors are really that important if one can achieve much higher throughput using plain Java and very simple concepts (BlockingQueue, ThreadPoolExecutor). Worse, this test is actually faster with only 1 thread...
