Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
717 views
in Technique[技术] by (71.8m points)

r - Geographical distance by group - Applying a function on each pair of rows

I want to calculate the average geographical distance between a number of houses per province.

Suppose I have the following data.

df1 <- data.frame(province = c(1, 1, 1, 2, 2, 2),
              house = c(1, 2, 3, 4, 5, 6),
              lat = c(-76.6, -76.5, -76.4, -75.4, -80.9, -85.7), 
              lon = c(39.2, 39.1, 39.3, 60.8, 53.3, 40.2))

Using the geosphere library I can find the distance between two houses. For instance:

library(geosphere)
distm(c(df1$lon[1], df1$lat[1]), c(df1$lon[2], df1$lat[2]), fun = distHaversine)

#11429.1

How do I calculate the distance between all the houses in the province and gather the mean distance per province?

The original data-set has millions of observations per province, so performance is an issue here, too.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

My initial idea was to look at the source code of distHaversine and replicate it in a function that I would use with proxy. That would work like this (note that lon is expected to be the first column):

library(geosphere)
library(dplyr)
library(proxy)

df1 <- data.frame(province = as.integer(c(1, 1, 1, 2, 2, 2)),
                  house = as.integer(c(1, 2, 3, 4, 5, 6)),
                  lat = c(-76.6, -76.5, -76.4, -75.4, -80.9, -85.7), 
                  lon = c(39.2, 39.1, 39.3, 60.8, 53.3, 40.2))

custom_haversine <- function(x, y) {
  toRad <- pi / 180

  diff <- (y - x) * toRad
  dLon <- diff[1L]
  dLat <- diff[2L]

  a <- sin(dLat / 2) ^ 2 + cos(x[2L] * toRad) * cos(y[2L] * toRad) * sin(dLon / 2) ^ 2
  a <- min(a, 1)
  # return
  2 * atan2(sqrt(a), sqrt(1 - a)) * 6378137
}

pr_DB$set_entry(FUN=custom_haversine, names="haversine", loop=TRUE, distance=TRUE)

average_dist <- df1 %>%
  select(-house) %>%
  group_by(province) %>%
  group_map(~ data.frame(avg=mean(proxy::dist(.x[ , c("lon", "lat")], method="haversine"))))

However, if you're expecting millions of rows per province, proxy probably won't be able to allocate the intermediate (lower triangular of the) matrices. So I ported the code to C++ and added multi-threading as a bonus:

EDIT: turns out the s2d helper was far from optimal, this version now uses the formulas given here.

EDIT2: I just found out about RcppThread, and it can be used to detect user interrupt.

// [[Rcpp::plugins(cpp11)]]
// [[Rcpp::depends(RcppParallel,RcppThread)]]

#include <cstddef> // size_t
#include <math.h> // sin, cos, sqrt, atan2, pow
#include <vector>

#include <RcppThread.h>
#include <Rcpp.h>
#include <RcppParallel.h>

using namespace std;
using namespace Rcpp;
using namespace RcppParallel;

// single to double indices for lower triangular of matrices without diagonal
void s2d(const size_t id, const size_t nrow, size_t& i, size_t& j) {
  j = nrow - 2 - static_cast<size_t>(sqrt(-8 * id + 4 * nrow * (nrow - 1) - 7) / 2 - 0.5);
  i = id + j + 1 - nrow * (nrow - 1) / 2 + (nrow - j) * ((nrow - j) - 1) / 2;
}

class HaversineCalculator : public Worker
{
public:
  HaversineCalculator(const NumericVector& lon,
                      const NumericVector& lat,
                      double& avg,
                      const int n)
    : lon_(lon)
    , lat_(lat)
    , avg_(avg)
    , n_(n)
    , cos_lat_(lon.length())
  {
    // terms for distance calculation
    for (size_t i = 0; i < cos_lat_.size(); i++) {
      cos_lat_[i] = cos(lat_[i] * 3.1415926535897 / 180);
    }
  }

  void operator()(size_t begin, size_t end) {
    // for Kahan summation
    double sum = 0;
    double c = 0;

    double to_rad = 3.1415926535897 / 180;

    size_t i, j;
    for (size_t ind = begin; ind < end; ind++) {
      if (RcppThread::isInterrupted(ind % static_cast<int>(1e5) == 0)) return;

      s2d(ind, lon_.length(), i, j);

      // haversine distance
      double d_lon = (lon_[j] - lon_[i]) * to_rad;
      double d_lat = (lat_[j] - lat_[i]) * to_rad;
      double d_hav = pow(sin(d_lat / 2), 2) + cos_lat_[i] * cos_lat_[j] * pow(sin(d_lon / 2), 2);
      if (d_hav > 1) d_hav = 1;
      d_hav = 2 * atan2(sqrt(d_hav), sqrt(1 - d_hav)) * 6378137;

      // the average part
      d_hav /= n_;

      // Kahan sum step
      double y = d_hav - c;
      double t = sum + y;
      c = (t - sum) - y;
      sum = t;
    }

    mutex_.lock();
    avg_ += sum;
    mutex_.unlock();
  }

private:
  const RVector<double> lon_;
  const RVector<double> lat_;
  double& avg_;
  const int n_;
  tthread::mutex mutex_;
  vector<double> cos_lat_;
};

// [[Rcpp::export]]
double avg_haversine(const DataFrame& input, const int nthreads) {
  NumericVector lon = input["lon"];
  NumericVector lat = input["lat"];

  double avg = 0;
  int size = lon.length() * (lon.length() - 1) / 2;
  HaversineCalculator hc(lon, lat, avg, size);

  int grain = size / nthreads / 10;
  RcppParallel::parallelFor(0, size, hc, grain);
  RcppThread::checkUserInterrupt();

  return avg;
}

This code won't allocate any intermediate matrix, it will simply calculate the distance for each pair of what would be the lower triangular and accumulate the values for an average in the end. See here for the Kahan summation part.

If you save that code in, say, haversine.cpp, then you can do the following:

library(dplyr)
library(Rcpp)
library(RcppParallel)
library(RcppThread)

sourceCpp("haversine.cpp")

df1 %>%
  group_by(province) %>%
  group_map(~ data.frame(avg=avg_haversine(.x, parallel::detectCores())))
# A tibble: 2 x 2
# Groups:   province [2]
  province     avg
     <int>   <dbl>
1        1  15379.
2        2 793612.

Here's a sanity check too:

pr_DB$set_entry(FUN=geosphere::distHaversine, names="distHaversine", loop=TRUE, distance=TRUE)

df1 %>%
  select(-house) %>%
  group_by(province) %>%
  group_map(~ data.frame(avg=mean(proxy::dist(.x[ , c("lon", "lat")], method="distHaversine"))))

A word of caution though:

df <- data.frame(lon=runif(1e3, -90, 90), lat=runif(1e3, -90, 90))

system.time(proxy::dist(df, method="distHaversine"))
   user  system elapsed 
 34.353   0.005  34.394

system.time(proxy::dist(df, method="haversine"))
   user  system elapsed 
  0.789   0.020   0.809

system.time(avg_haversine(df, 4L))
   user  system elapsed 
  0.054   0.000   0.014

df <- data.frame(lon=runif(1e5, -90, 90), lat=runif(1e5, -90, 90))

system.time(avg_haversine(df, 4L))
   user  system elapsed 
 73.861   0.238  19.670

You'll probably have to wait quite a while if you have millions of rows...

I should also mention that it's not possible to detect user interrupt inside the threads created through RcppParallel, so if you start the calculation you should either wait until it finishes, or restart R/RStudio entirely. See EDIT2 above.


Regarding complexity

Depending on your actual data and how many cores your computer has, you may very well end up waiting days for the calculation to finish. This problem has quadratic complexity (per province, so to speak). This line:

int size = lon.length() * (lon.length() - 1) / 2;

signifies the amount of (haversine) distance calculations that must be performed. So if the number of rows increases by a factor of n, the number of calculations increases by a factor of n^2 / 2, roughly speaking.

There is no way to optimize this; you can't calculate the average of N numbers without actually computing each number first, and you'll have a hard time finding something faster than multi-threaded C++ code, so you'll either have to wait it out, or throw more cores at the problem, either with a single machine or with many machines working together. Otherwise you can't solve this problem.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...