c++ - Creating a counter that stays synchronized across MPI processes

I have quite a bit of experience using the basic communicator and group MPI-2 methods, and do quite a bit of embarrassingly parallel simulation work using MPI. Up until now, I have structured my code to have a dispatch node and a bunch of worker nodes. The dispatch node has a list of parameter files that will be run with the simulator. It seeds each worker node with a parameter file. The worker nodes run their simulation, then request another parameter file, which the dispatch node provides. Once all parameter files have been run, the dispatch node shuts down each worker node before shutting itself down.

The parameter files are typically named "Par_N.txt", where N is the identifying integer (e.g. N = 1-1000). So I was thinking: if I could create a counter, and have it synchronized across all of my nodes, I could eliminate the need for a dispatch node and make the system a bit simpler. As simple as this sounds in theory, in practice I suspect it is a bit more difficult, as I'd need to ensure the counter is locked while being changed, and so on. I thought there might be a built-in way for MPI to handle this. Any thoughts? Am I overthinking this?

1 Reply

Implementing a shared counter isn't trivial, but once you do it and have it in a library somewhere you can do a lot with it.

In the Using MPI-2 book, which you should have to hand if you're going to implement this stuff, one of the examples (the code is available online) is a shared counter. The "non-scalable" one should work well out to several dozen processes -- the counter is an array of size integers, one per rank, and the "get next work item" operation consists of locking the window, reading everyone else's contribution to the counter (in this case, how many items they've taken), updating your own (++), closing the window, and calculating the total. This is all done with passive one-sided operations. (The better-scaling one just uses a tree rather than a 1-d array.)

So the usage would be: have, say, rank 0 host the counter, and everyone keeps doing work units and updating the counter to get the next one until there's no more work; then you wait at a barrier or something and finalize. (A sketch of how this maps onto the parameter-file setup from the question follows the code below.)

Once you have something like this - using a shared value to get the next available work unit - working, then you can generalize to more sophisticated approaches. So, as suzterpatt suggested, everyone taking "their share" of work units at the start works great, but what do you do if some finish faster than others? The usual answer now is work stealing: everyone keeps their list of work units in a deque, and when one runs out of work, it steals work units from the other end of someone else's deque, until there's no more work left. This is really the completely distributed version of master-worker, where there's no longer a single master partitioning the work. Once you have a single shared counter working, you can build mutexes from it (see the ticket-lock sketch after the code below), and from those you can implement the deque. But if the simple shared counter works well enough, you may not need to go there.

Update: OK, so here's a hacky attempt at doing the shared counter - my version of the simple one in the MPI-2 book. It seems to work, but I wouldn't say anything much stronger than that (I haven't played with this stuff for a long time). There's a simple counter implementation (corresponding to the non-scaling version in the MPI-2 book) with two simple tests, one corresponding roughly to your work case; each rank updates the counter to get a work item, then does the "work" (sleeps for a random amount of time). At the end of each test, the counter data structure is printed out, which is the number of increments each rank has done.

#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>

struct mpi_counter_t {
    MPI_Win win;
    int hostrank;   /* rank that owns the counter data */
    int myval;      /* this rank's contribution to the total */
    int *data;      /* per-rank contributions; allocated on the host rank only */
    int rank, size;
};

struct mpi_counter_t *create_counter(int hostrank) {
    struct mpi_counter_t *count;

    count = (struct mpi_counter_t *)malloc(sizeof(struct mpi_counter_t));
    count->hostrank = hostrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &(count->rank));
    MPI_Comm_size(MPI_COMM_WORLD, &(count->size));

    if (count->rank == hostrank) {
        /* the host rank allocates and exposes one int per rank, all zeroed */
        MPI_Alloc_mem(count->size * sizeof(int), MPI_INFO_NULL, &(count->data));
        for (int i=0; i<count->size; i++) count->data[i] = 0;
        MPI_Win_create(count->data, count->size * sizeof(int), sizeof(int),
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
    } else {
        /* everyone else exposes no memory; they only access the host's window */
        count->data = NULL;
        MPI_Win_create(count->data, 0, sizeof(int),
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
    }
    count->myval = 0;

    return count;
}

int increment_counter(struct mpi_counter_t *count, int increment) {
    int *vals = (int *)malloc( count->size * sizeof(int) );
    int val;

    /* an exclusive lock on the window serializes updates to the counter */
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);

    for (int i=0; i<count->size; i++) {

        if (i == count->rank) {
            /* add our increment to our own slot on the host rank */
            MPI_Accumulate(&increment, 1, MPI_INT, count->hostrank, i, 1, MPI_INT,
                           MPI_SUM, count->win);
        } else {
            /* read everyone else's current contribution */
            MPI_Get(&vals[i], 1, MPI_INT, count->hostrank, i, 1, MPI_INT, count->win);
        }
    }

    MPI_Win_unlock(count->hostrank, count->win);
    count->myval += increment;

    /* our own slot wasn't read remotely; fill it in from the local copy */
    vals[count->rank] = count->myval;
    val = 0;
    for (int i=0; i<count->size; i++)
        val += vals[i];

    free(vals);
    return val;
}

void delete_counter(struct mpi_counter_t **count) {
    if ((*count)->rank == (*count)->hostrank) {
        MPI_Free_mem((*count)->data);
    }
    MPI_Win_free(&((*count)->win));
    free((*count));
    *count = NULL;

    return;
}

void print_counter(struct mpi_counter_t *count) {
    if (count->rank == count->hostrank) {
        for (int i=0; i<count->size; i++) {
            printf("%2d ", count->data[i]);
        }
        puts("");
    }
}

void test1() {
    struct mpi_counter_t *c;
    int rank;
    int result;

    c = create_counter(0);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    result = increment_counter(c, 1);
    printf("%d got counter %d
", rank, result);

    MPI_Barrier(MPI_COMM_WORLD);
    print_counter(c);
    delete_counter(&c);
}


void test2() {
    const int WORKITEMS=50;

    struct mpi_counter_t *c;
    int rank;
    int result = 0;

    c = create_counter(0);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    srandom(rank);

    while (result < WORKITEMS) {
        result = increment_counter(c, 1);
        if (result <= WORKITEMS) {
            printf("%d working on item %d...\n", rank, result);
            sleep(random() % 10);
        } else {
            printf("%d done\n", rank);
        }
    }

    MPI_Barrier(MPI_COMM_WORLD);
    print_counter(c);
    delete_counter(&c);
}

int main(int argc, char **argv) {

    MPI_Init(&argc, &argv);

    test1();
    test2();

    MPI_Finalize();
}
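
To connect this back to the question, here's a rough sketch (untested) of what the worker loop might look like with your parameter files, assuming the counter code above has been split out into a hypothetical mpi_counter.h; run_simulation() is a stand-in for your actual simulator. Each rank claims the next unclaimed index from the shared counter and runs that file, so no dispatch rank is needed.

/* Sketch only: a counter-driven replacement for the dispatch node.
 * NPARAMS, mpi_counter.h and run_simulation() are placeholders. */
#include <mpi.h>
#include <stdio.h>
#include "mpi_counter.h"   /* hypothetical header for the counter code above */

#define NPARAMS 1000        /* Par_1.txt .. Par_1000.txt */

void run_simulation(const char *parfile);   /* provided by the simulator */

void run_all_params(void) {
    struct mpi_counter_t *c = create_counter(0);   /* rank 0 hosts the counter */
    int n;

    /* each rank claims the next unused index until all files are done */
    while ((n = increment_counter(c, 1)) <= NPARAMS) {
        char fname[64];
        snprintf(fname, sizeof(fname), "Par_%d.txt", n);
        run_simulation(fname);
    }

    MPI_Barrier(MPI_COMM_WORLD);   /* wait until everyone has run out of work */
    delete_counter(&c);
}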

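And, to make the "mutexes from counters" remark above a bit more concrete, here's a sketch of a ticket-style mutex built from two of these counters. This is my own rough illustration, not the approach from the Using MPI-2 book; the polling loop does a full lock/unlock epoch on the host rank every iteration, so treat it as a proof of concept rather than something to lean on.

/* Sketch only: a ticket-lock mutex built from two shared counters.
 * acquire: take a unique ticket, then wait until the "now serving"
 * count catches up; release: advance "now serving". */
#include <stdlib.h>
#include <unistd.h>
#include "mpi_counter.h"   /* hypothetical header for the counter code above */

struct mpi_mutex_t {
    struct mpi_counter_t *tickets;   /* hands out unique, increasing tickets */
    struct mpi_counter_t *serving;   /* counts how many holders have released */
};

struct mpi_mutex_t *create_mutex(int hostrank) {
    struct mpi_mutex_t *m = (struct mpi_mutex_t *)malloc(sizeof(struct mpi_mutex_t));
    m->tickets = create_counter(hostrank);
    m->serving = create_counter(hostrank);
    return m;
}

void acquire_mutex(struct mpi_mutex_t *m) {
    int ticket = increment_counter(m->tickets, 1);   /* first caller gets 1 */
    /* an increment of 0 just reads the current total; wait until every
     * smaller ticket has been released */
    while (increment_counter(m->serving, 0) < ticket - 1)
        usleep(1000);   /* back off a little between polls */
}

void release_mutex(struct mpi_mutex_t *m) {
    increment_counter(m->serving, 1);
}

void delete_mutex(struct mpi_mutex_t **m) {
    delete_counter(&(*m)->tickets);
    delete_counter(&(*m)->serving);
    free(*m);
    *m = NULL;
}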