Disruptor can be installed through Maven or by downloading the jar directly. Either way, the jar only needs to be on the Java classpath.
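For Maven, a dependency declaration along these lines should work. The coordinates com.lmax:disruptor are the library's published ones; the version number is an assumption, chosen because the examples in this article use the 3.x API, so substitute whichever 3.x release you need.

```xml
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <!-- Assumed version: the article only says "3.0 and later" -->
    <version>3.4.4</version>
</dependency>
```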
public class LongEvent {
    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}
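import com.lmax.disruptor.EventFactory;

// Parameterized with LongEvent so the factory is type-safe (the raw type only compiles with warnings).
public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}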
import com.lmax.disruptor.EventHandler;

// Consumer: prints the value carried by each event.
public class LongEventHandler implements EventHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent longEvent, long sequence, boolean endOfBatch) throws Exception {
        System.out.println(longEvent.getValue());
    }
}
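import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

public class LongEventProducer {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * Publishes one event per call. The argument is handed to the
     * consumer through the event.
     *
     * @param bb buffer whose first 8 bytes hold the value to publish
     */
    public void onData(ByteBuffer bb) {
        // Treat the ring buffer as an event queue: next() claims the next free slot.
        long sequence = ringBuffer.next();
        try {
            // Fetch the preallocated event at that sequence and fill it in.
            LongEvent event = ringBuffer.get(sequence);
            event.setValue(bb.getLong(0));
        } finally {
            // Publish the event; doing so in finally means a claimed sequence is always published.
            ringBuffer.publish(sequence);
        }
    }
}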
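The claim, fill, publish sequence in onData can be tried out without the library. The following is a toy, single-threaded sketch; ToyRing and all of its methods are hypothetical names made up for illustration and do not reflect how Disruptor is implemented. It shows why publish sits in a finally block: even when filling the slot fails, the claimed sequence is still published.

```java
import java.nio.ByteBuffer;

// Toy, single-threaded stand-in for a ring buffer (hypothetical, not Disruptor code).
final class ToyRing {
    private final long[] slots;
    private long claimed = -1;    // highest sequence handed out by next()
    private long published = -1;  // highest sequence visible to a consumer

    ToyRing(int size) {
        slots = new long[size];
    }

    // Step 1: claim the next sequence.
    long next() {
        return ++claimed;
    }

    // Step 2: fill the slot that belongs to the sequence.
    void set(long sequence, long value) {
        slots[(int) (sequence % slots.length)] = value;
    }

    // Step 3: make the sequence visible to consumers.
    void publish(long sequence) {
        published = sequence;
    }

    long published() {
        return published;
    }

    long valueAt(long sequence) {
        return slots[(int) (sequence % slots.length)];
    }

    // Same shape as onData in the producer: publish runs even if the fill throws.
    void onData(ByteBuffer bb) {
        long sequence = next();
        try {
            set(sequence, bb.getLong(0));
        } finally {
            publish(sequence);
        }
    }
}
```

Passing a buffer shorter than 8 bytes makes getLong(0) throw, yet published() still advances, which is the property the try/finally is there to guarantee.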
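Disruptor 3.0 provides a lambda-style API. It lets the Ring Buffer take over the more error-prone publishing steps, so from version 3.0 onward the preferred way to publish events is through an Event Publisher or Event Translator.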
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

public class LongEventProducerWithTranslator {
    // A translator acts as an event initializer: publishEvent calls it to fill the event.
    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
                @Override
                public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
                    event.setValue(bb.getLong(0));
                }
            };

    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb) {
        ringBuffer.publishEvent(TRANSLATOR, bb);
    }
}
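Another benefit of this style is that the translator can be pulled out into its own class and unit tested easily. Disruptor provides several interfaces (EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, and so on) that can be implemented to supply a translator. The arguments to the translator's method are passed through the RingBuffer's publishEvent call.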
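Because a translator is just a function from (event, sequence, argument) to a filled event, it can be exercised with no ring buffer at all. The sketch below uses a local stand-in interface with the same method shape as EventTranslatorOneArg so that it runs without the Disruptor jar; the stand-in, the class names, and the local LongEvent copy are assumptions for illustration only.

```java
import java.nio.ByteBuffer;

final class TranslatorTestSketch {
    // Hypothetical stand-in mirroring the method shape of EventTranslatorOneArg<T, A>.
    interface OneArgTranslator<T, A> {
        void translateTo(T event, long sequence, A arg0);
    }

    // Local copy of the article's event class, so the sketch is self-contained.
    static final class LongEvent {
        private long value;

        long getValue() {
            return value;
        }

        void setValue(long value) {
            this.value = value;
        }
    }

    // The logic under test: same body as the TRANSLATOR in the producer.
    static final OneArgTranslator<LongEvent, ByteBuffer> TRANSLATOR =
            (event, sequence, bb) -> event.setValue(bb.getLong(0));

    // Applies the translator to a fresh event and returns what it stored.
    static long translate(long input) {
        ByteBuffer bb = ByteBuffer.allocate(8);
        bb.putLong(0, input);
        LongEvent event = new LongEvent();
        TRANSLATOR.translateTo(event, 0L, bb);
        return event.getValue();
    }

    public static void main(String[] args) {
        System.out.println(translate(7L)); // prints 7
    }
}
```

With the real library on the classpath, the same check would call translateTo directly on the EventTranslatorOneArg instance.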
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class LongEventMain {
    public static void main(String[] args) throws InterruptedException {
        // Executor that will be used to construct new threads for consumers
        Executor executor = Executors.newCachedThreadPool();

        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor);

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());

        // Run handler1 after handler has finished; handlers can be chained into more complex pipelines
        //LongEventHandler1 handler1 = new LongEventHandler1();
        //disruptor.after(handler).then(handler1);

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            producer.onData(bb);
            Thread.sleep(1000);
        }
    }
}
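The "must be power of 2" comment on bufferSize has a practical reason: with a power-of-two size, an ever-increasing sequence number can be mapped to a slot with a bit mask instead of a modulo. A minimal sketch of that arithmetic follows; the helper names are hypothetical, and it illustrates the general technique rather than Disruptor's exact code.

```java
final class RingIndexSketch {
    // Maps a sequence to a slot index; only valid when bufferSize is a power of two.
    static int indexOf(long sequence, int bufferSize) {
        return (int) (sequence & (bufferSize - 1));
    }

    // A positive int is a power of two when exactly one bit is set.
    static boolean isPowerOfTwo(int n) {
        return n > 0 && Integer.bitCount(n) == 1;
    }

    public static void main(String[] args) {
        int bufferSize = 1024;
        System.out.println(isPowerOfTwo(bufferSize));   // true
        System.out.println(indexOf(1023L, bufferSize)); // 1023, the last slot
        System.out.println(indexOf(1024L, bufferSize)); // 0, wraps around
        System.out.println(indexOf(1025L, bufferSize)); // 1
    }
}
```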
Using Java 8
Disruptor's interfaces support Java 8 lambdas. Most of them qualify as functional interfaces (that is, they declare exactly one abstract method), so lambdas can be used widely in place of custom classes.
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class LongEventMainJava8 {
    /**
     * Registers the EventHandler and the event producer with lambda expressions.
     *
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        // Executor that will be used to construct new threads for consumers
        Executor executor = Executors.newCachedThreadPool();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);

        // A lambda can be registered as the EventHandler
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event.getValue()));

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            ringBuffer.publishEvent((event, sequence, buffer) -> event.setValue(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}
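By contrast, the following variant refers to bb from inside the lambda instead of passing it as an argument. That makes it a capturing lambda, which may allocate an object on each call, so the argument-passing form is preferable when low GC pressure matters:

ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
    bb.putLong(0, l);
    ringBuffer.publishEvent((event, sequence) -> event.setValue(bb.getLong(0)));
    Thread.sleep(1000);
}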
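The two publishing styles (handing bb to the lambda as an argument versus referring to it from inside the lambda) can be compared without Disruptor. In this sketch the two publish helpers are hypothetical stand-ins built on java.util.function types, not library calls. Both styles store the same value; the second lambda closes over bb, so the runtime may need a fresh object to hold it.

```java
import java.nio.ByteBuffer;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

final class CapturingLambdaSketch {
    // Hypothetical stand-in for publishEvent(translator, arg): the argument is passed through.
    static void publishWithArg(long[] slot, BiConsumer<long[], ByteBuffer> translator, ByteBuffer arg) {
        translator.accept(slot, arg);
    }

    // Hypothetical stand-in for publishEvent(translator): the lambda must find its data itself.
    static void publishNoArg(long[] slot, Consumer<long[]> translator) {
        translator.accept(slot);
    }

    static long viaArgument(ByteBuffer bb) {
        long[] slot = new long[1];
        // Non-capturing: the lambda only touches its own parameters.
        publishWithArg(slot, (s, buffer) -> s[0] = buffer.getLong(0), bb);
        return slot[0];
    }

    static long viaCapture(ByteBuffer bb) {
        long[] slot = new long[1];
        // Capturing: the lambda closes over bb.
        publishNoArg(slot, s -> s[0] = bb.getLong(0));
        return slot[0];
    }

    public static void main(String[] args) {
        ByteBuffer bb = ByteBuffer.allocate(8);
        bb.putLong(0, 5L);
        System.out.println(viaArgument(bb)); // prints 5
        System.out.println(viaCapture(bb));  // prints 5
    }
}
```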
Since a method reference is also a lambda in Java 8, the code above can be rewritten as follows:
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class LongEventWithMethodRef {
    public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) {
        System.out.println(event.getValue());
    }

    public static void translate(LongEvent event, long sequence, ByteBuffer buffer) {
        event.setValue(buffer.getLong(0));
    }

    public static void main(String[] args) throws Exception {
        // Executor that will be used to construct new threads for consumers
        Executor executor = Executors.newCachedThreadPool();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);

        // Connect the handler
        disruptor.handleEventsWith(LongEventWithMethodRef::handleEvent);

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            ringBuffer.publishEvent(LongEventWithMethodRef::translate, bb);
            Thread.sleep(1000);
        }
    }
}
Single or multiple event producers
public class singleProductorLongEventMain {
    public static void main(String[] args) throws Exception {
        //.....
        // Construct the Disruptor with a SingleProducerSequencer
        // (in the 3.x constructor the executor comes before the producer type and wait strategy)
        Disruptor<LongEvent> disruptor = new Disruptor<>(
                factory,
                bufferSize,
                executor,
                ProducerType.SINGLE, // Single producer
                new BlockingWaitStrategy());
        //.....
    }
}
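To show the difference the producer type makes, the following results were measured on a MacBook Air i7 (first set of runs: multiple producers; second set: single producer):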
Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/sec
Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/sec
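One plausible reason for the gap between the two sets of runs is how the next sequence gets claimed. When only one thread publishes, a plain counter is enough; when several threads may publish, every claim has to be an atomic operation that can be contended. The following rough sketch contrasts the two styles. The class names are hypothetical, and it illustrates the general idea rather than Disruptor's actual sequencer code.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class ClaimStrategySketch {
    // Single producer: only one thread ever claims, so a plain field suffices.
    static final class SingleProducerClaim {
        private long next = -1;

        long claim() {
            return ++next;
        }
    }

    // Multiple producers: threads race for sequences, so each claim is a CAS loop.
    static final class MultiProducerClaim {
        private final AtomicLong next = new AtomicLong(-1);

        long claim() {
            long current;
            do {
                current = next.get();
            } while (!next.compareAndSet(current, current + 1));
            return current + 1;
        }
    }

    // Runs several threads against the multi-producer claim and counts distinct sequences.
    static int distinctClaims(int threads, int perThread) {
        MultiProducerClaim claim = new MultiProducerClaim();
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    seen.add(claim.claim());
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        // Every claim is unique, so 4 threads x 1000 claims yield 4000 distinct sequences.
        System.out.println(distinctClaims(4, 1000));
    }
}
```

The single-producer version is only safe when exactly one thread publishes, which is the assumption ProducerType.SINGLE asks the caller to guarantee.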
Reposted from http://ifeve.com/disruptor-getting-started/