I've taken the code example from Stack Overflow question Disruptor.NET example and modified it to "measure" time. Full listing is below:
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;
namespace DisruptorTest
{
public sealed class ValueEntry
{
public long Value { get; set; }
public ValueEntry()
{
Console.WriteLine("New ValueEntry created");
}
}
public class ValueAdditionHandler : IEventHandler<ValueEntry>
{
public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
{
Program.sw.Stop();
long microseconds = Program.sw.ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
Console.WriteLine("elapsed microseconds = " + microseconds);
Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence);
}
}
class Program
{
public static Stopwatch sw = Stopwatch.StartNew();
private static readonly Random _random = new Random();
private static readonly int _ringSize = 16; // Must be multiple of 2
static void Main(string[] args)
{
var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);
disruptor.HandleEventsWith(new ValueAdditionHandler());
var ringBuffer = disruptor.Start();
while (true)
{
var valueToSet = _random.Next();
long sequenceNo = ringBuffer.Next();
ValueEntry entry = ringBuffer[sequenceNo];
entry.Value = valueToSet;
sw.Restart();
ringBuffer.Publish(sequenceNo);
Console.WriteLine("Published entry {0}, value {1}", sequenceNo, entry.Value);
Thread.Sleep(1000);
}
}
}
}
And the output is:
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
Published entry 0, value 1510145842
elapsed microseconds = 2205
Event handled: Value = 1510145842 (processed event 0
Published entry 1, value 1718075893
elapsed microseconds = 85
Event handled: Value = 1718075893 (processed event 1
Published entry 2, value 1675907645
elapsed microseconds = 32
Event handled: Value = 1675907645 (processed event 2
Published entry 3, value 1563009446
elapsed microseconds = 75
Event handled: Value = 1563009446 (processed event 3
Published entry 4, value 1782914062
elapsed microseconds = 34
Event handled: Value = 1782914062 (processed event 4
Published entry 5, value 1516398244
elapsed microseconds = 50
Event handled: Value = 1516398244 (processed event 5
Published entry 6, value 76829327
elapsed microseconds = 50
Event handled: Value = 76829327 (processed event 6
So it takes about 50 microseconds to pass data from one thread to another. But it is not fast at all! "The current version of the Disruptor can do ~50 ns between threads at a rate of 1 million messages per second." So my results are 1000 times slower than expected.
What's wrong with my example and how do achieve 50 ns speed?
I've modified program above and now receive 1 microsecond delay, which is much better. However, I am still waiting for the response from disruptor
pattern experts. I'm looking for an example which can prove that I can actually pass data in 50 ns.
Also I wrote the same test using BlockingCollection
and received 14 microseconds in average, which proves that Disruptor
is faster:
Using BlockingCollection:
average = 14 minimum = 0 0-5 = 890558, 5-10 = 1773781, 10-30 = 6900128, >30 = 435433
Using Disruptor:
average = 0 minimum = 0 0-5 = 9908469, 5-10 = 64464, 10-30 = 19902, >30 = 7065
BlockingCollection code:
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace DisruptorTest
{
public sealed class ValueEntry
{
public int Value { get; set; }
public ValueEntry()
{
// Console.WriteLine("New ValueEntry created");
}
}
//public class ValueAdditionHandler : IEventHandler<ValueEntry>
//{
// public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
// {
// long microseconds = Program.sw[data.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
// Program.results[data.Value] = microseconds;
// //Console.WriteLine("elapsed microseconds = " + microseconds);
// //Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence);
// }
//}
class Program
{
public const int length = 10000000;
public static Stopwatch[] sw = new Stopwatch[length];
public static long[] results = new long[length];
static BlockingCollection<ValueEntry> dataItems = new BlockingCollection<ValueEntry>(150);
static void Main(string[] args)
{
for (int i = 0; i < length; i++)
{
sw[i] = Stopwatch.StartNew();
}
// A simple blocking consumer with no cancellation.
Task.Factory.StartNew(() =>
{
while (!dataItems.IsCompleted)
{
ValueEntry ve = null;
try
{
ve = dataItems.Take();
long microseconds = sw[ve.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
results[ve.Value] = microseconds;
//Console.WriteLine("elapsed microseconds = " + microseconds);
//Console.WriteLine("Event handled: Value = {0} (processed event {1}", ve.Value, ve.Value);
}
catch (InvalidOperationException) { }
}
}, TaskCreationOptions.LongRunning);
for (int i = 0; i < length; i++)
{
var valueToSet = i;
ValueEntry entry = new ValueEntry();
entry.Value = valueToSet;
sw[i].Restart();
dataItems.Add(entry);
//Console.WriteLine("Published entry {0}, value {1}", valueToSet, entry.Value);
//Thread.Sleep(1000);
}
// Wait until all events are delivered
Thread.Sleep(5000);
long average = 0;
long minimum = 10000000000;
int firstFive = 0;
int fiveToTen = 0;
int tenToThirty = 0;
int moreThenThirty = 0;
// Do not count first 100 items because they could be extremely slow
for (int i = 100; i < length; i++)
{
average += results[i];
if (results[i] < minimum)
{
minimum = results[i];
}
if (results[i] < 5)
{
firstFive++;
}
else if (results[i] < 10)
{
fiveToTen++;
}
else if (results[i] < 30)
{
tenToThirty++;
} else
{
moreThenThirty++;
}
}
average /= (length - 100);
Console.WriteLine("average = {0} minimum = {1} 0-5 = {2}, 5-10 = {3}, 10-30 = {4}, >30 = {5}", average, minimum, firstFive, fiveToTen, tenToThirty, moreThenThirty);
}
}
}
Disruptor code:
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;
namespace DisruptorTest
{
public sealed class ValueEntry
{
public int Value { get; set; }
public ValueEntry()
{
// Console.WriteLine("New ValueEntry created");
}
}
public class ValueAdditionHandler : IEventHandler<ValueEntry>
{
public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
{
long microseconds = Program.sw[data.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
Program.results[data.Value] = microseconds;
//Console.WriteLine("elapsed microseconds = " + microseconds);
//Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence);
}
}
class Program
{
public const int length = 10000000;
public static Stopwatch[] sw = new Stopwatch[length];
public static long[] results = new long[length];
private static readonly Random _random = new Random();
private static readonly int _ringSize = 1024; // Must be multiple of 2
static void Main(string[] args)
{
for (int i = 0; i < length; i++)
{
sw[i] = Stopwatch.StartNew();
}
var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);
disruptor.HandleEventsWith(new ValueAdditionHandler());
var ringBuffer = disruptor.Start();
for (int i = 0; i < length; i++)
{
var valueToSet = i;
long sequenceNo = ringBuffer.Next();
ValueEntry entry = ringBuffer[sequenceNo];
entry.Value = valueToSet;
sw[i].Restart();
ringBuffer.Publish(sequenceNo);
//Console.WriteLine("Published entry {0}, value {1}", sequenceNo, entry.Value);
//Thread.Sleep(1000);
}
// wait until all events are delivered
Thread.Sleep(5000);
long average = 0;
long minimum = 10000000000;
int firstFive = 0;
int fiveToTen = 0;
int tenToThirty = 0;
int moreThenThirty = 0;
// Do not count first 100 items because they could be extremely slow
for (int i = 100; i < length; i++)
{
average += results[i];
if (results[i] < minimum)
{
minimum = results[i];
}
if (results[i] < 5)
{
firstFive++;
}
else if (results[i] < 10)
{
fiveToTen++;
}
else if (results[i] < 30)
{
tenToThirty++;
}
else
{
moreThenThirty++;