EDIT: On 09/15/2013 - I am describing my scenario further broken into steps to help everybody understand my situation better. Added the source for whole application for download too. If you want to jump to the original question, scroll down to the last heading. Please let me know the questions. Thanks
Summary
Alaska state Capital Juneau has a AST (Alaska State Trooper) headquarters building where they would like to show a large screen with a single number displayed and updated automatically. That number is called (Crime Quotient Index) or CQI
CQI is basically a calculated number to show the current crime situation of the state...
How it is calculated?
The program that runs the screen is a .NET WPF Application that is constantly receiving CrimeReport objects through a Hot IObservable stream.
CQI is calculated per city and then a Sum() for all cities is taken which is called States CQI
Here are the steps of calculating the State CQI
Step 1 - Receive crime data
CrimeReport is sent to the .NET Application each time a crime is reported. It has the following components
DateTime of the crime
City - City/County of Jurisdiction
SeverityLevel - Serious/NonSerious
EstimatedSolveTime - Estimated number of days AST determines it would take to solve the crime.
So in this step, we subscribe to the IObservable and create instance of MainViewModel
IObservable<CrimeReport> reportSource = mainSource.Publish();
MainVM = new MainViewModel(reportSource);
reportSource.Connect();
Step 2 - Group by city and do maths per city
As you receive the Report, group it by city so
var cities = reportSource.GroupBy(k => k.City)
.Select(g => new CityDto(g.Key, g);
CityDto is a DTO class that takes all its report for current city and calculates the City's CQI.
The calculation of City's CQI is done by the following formula
if Ratio of Total number serious crimes to Total number of Non serious crimes is less than 1
then
City's CQI = Ratio x Minimum of Estimated Solve time
else
City's CQI = Ratio x Maximum Estimated Solve time
Here is class definition of CityDto
internal class CityDto
{
public string CityName { get; set; }
public IObservable<decimal> CityCqi {get; set;}
public CityDto(string cityName, IObservable<CrimeReport> cityReports)
{
CityName = cityName;
// Get all serious and non serious crimes
//
var totalSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.Serious)
.Scan(0, (p, _) => p++);
var totalnonSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.NonSerious)
.Scan(0, (p, _) => p++);
// Get the ratio
//
var ratio = Observable.CombineLatest(totalSeriousCrimes, totalnonSeriousCrimes,
(s, n) => n == 0? s : s/n); // Avoding DivideByZero here
// Get the minimum and maximum estimated solve time
//
var minEstimatedSolveTime = cityReports.Select(c => c.EstimatedSolveTime)
.Scan(5000, (p, n) => n < p? n : p);
var maxEstimatedSolveTime = cityReports.Select(c=>c.EstimatedSolveTime)
.Scan(0, (p, n) => n > p? n : p);
//Time for the City's CQI
//
CityCqi = Observable.CombineLatest(ratio, minEstimatedSolveTime, maxEstimatedSolveTime, (r, n, x) => r < 1.0? r * n : r * m);
}
}
Now that we have City DTO objects maintaining City's CQI values and exposing that live CQI through an IObservable, Alaska State's Capital would like to Sum() up all the Cities' CQI to show it as Alaska's CQI and show it live on the screen and Each crime reported anywhere in the City/County participating in CQI program should have an immediate effect on the State's CQI
Step 3 - Roll up cities' data for state
Now we have to calculate the whole state's CQI which is live updating on the big screen, we have State's view model called MainViewModel
internal class MainViewModel
{
public MainViewModel(IObservable<CrimeReport> mainReport)
{
/// Here is the snippet also mentioned in Step 2
//
var cities = mainReport.GroupBy(k => k.City)
.Select(g => new CityDto(g.Key, g));
///// T h i s ///// Is //// Where //// I /// am /// Stuck
//
var allCqis = cities.Select(c => c.CityCqi); // gives you IObservable<IObservable<decimal>> ,
/// Need to use latest of each observable in allCqi and sum them up
//// How do I do it ?
}
}
Constraints
- Not all cities in Alaska currently participate in state's CQI program, but cities are enrolling day by day so I cannot have List and adding all the cities regardless of enrollment is not practical either. So it is IObservable which maintains only those cities not only participating but also have sent at least one CrimeReport object.
Full Source code
Source can be downloaded by clicking Here
Originally asked question
I have a single hot observable of multiple hot observables...
IObservable<IObservable<decimal>>
I would like an observable that when subscribed will keep its observer informed of the sum of all "Latest" decimal numbers from all observables inside.
How can I achieve that ? I tried CombineLatest(...) but could not get it right.
Thanks
See Question&Answers more detail:
os