Intel® MPI Library

Developer Guide for Linux* OS

ID 768728
Date 10/31/2024
Public
Document Table of Contents

thread_split.cpp

*/
#include <mpi.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <vector>
#include <string>
#include <utility>
#include <assert.h>
#include <sys/time.h>
// Choose threading model:
enum { THR_OPENMP = 1, THR_POSIX = 2, THR_NONE = 0 } threading = THR_POSIX;
template <typename T> MPI_Datatype get_mpi_type();
template <> MPI_Datatype get_mpi_type<char>() { return MPI_CHAR; }
template <> MPI_Datatype get_mpi_type<int>() { return MPI_INT; }
template <> MPI_Datatype get_mpi_type<float>() { return MPI_FLOAT; }
template <> MPI_Datatype get_mpi_type<double>() { return MPI_DOUBLE; }
int main_threaded(int argc, char **argv);
template <typename T>
bool work_portion_2(T *in, size_t count, size_t niter, int rank, int nranks)
{
    memset(in, 0, sizeof(T) * count);
}
template <typename T>
bool work_portion_1(T *in, size_t count, size_t niter, int rank, int nranks);
template <> bool work_portion_1<char>(char *in, size_t count, size_t niter, int rank, int nranks) { return true; }
template <> bool work_portion_1<int>(int *in, size_t count, size_t niter, int rank, int nranks)
{ 
    for (size_t i = 0; i < count; i++) {
        in[i] = (int)(niter * (rank+1) * i);
    }
    return true;
}
template <typename T>
bool work_portion_3(T *in, size_t count, size_t niter, int rank, int nranks);
template <> bool work_portion_3<char>(char *out, size_t count, size_t niter, int rank, int nranks) { return true; }
template <> bool work_portion_3<int>(int *out, size_t count, size_t niter, int rank, int nranks)
{
    bool result = true;
    for (size_t i = 0; i < count; i++) {
        result = (result && (out[i] == (int)(niter * nranks*(nranks+1)*i/2)));
    }
    return result;
}
int main_threaded_openmp(int argc, char **argv);
int main_threaded_posix(int argc, char **argv);
int main(int argc, char **argv)
{
    if (argc > 1) {
        if (!strcasecmp(argv[1], "openmp")) threading = THR_OPENMP;
        if (!strcasecmp(argv[1], "posix")) threading = THR_POSIX;
        if (!strcasecmp(argv[1], "none")) threading = THR_NONE;
    }
    if (threading == THR_OPENMP) {
        main_threaded_openmp(argc, argv);
        return 0;
    } else if (threading == THR_POSIX) {
        main_threaded_posix(argc, argv);
        return 0;
    }
    printf("No threading\n");
    int rank, nranks;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nranks);
    typedef int type;
    size_t count = 1024*1024;
    int niter = 100;
    type *in = (type *)malloc(count * sizeof(type));
    type *out = (type *)malloc(count * sizeof(type));
    for (int j = 1; j < niter+1; j++) {
        work_portion_1<type>(in, count, j, rank, nranks);
        work_portion_2<type>(out, count, j, rank, nranks);
        MPI_Allreduce(in, out, count, get_mpi_type<type>(), MPI_SUM, MPI_COMM_WORLD);
        assert(work_portion_3<type>(out, count, j, rank, nranks));
        MPI_Barrier(MPI_COMM_WORLD);
    }
    MPI_Finalize();
    return 0;
}
#include <omp.h>
void omp_aware_barrier(MPI_Comm &comm, int thread)
{
    assert(thread != 0 || comm != MPI_COMM_NULL);
#pragma omp barrier
    if (thread == 0)
        MPI_Barrier(comm);
#pragma omp barrier
}
struct offset_and_count { size_t offset; size_t count; };
int main_threaded_openmp(int argc, char **argv)
{
    printf("OpenMP\n");
    int rank, nranks, provided = 0;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    assert(provided == MPI_THREAD_MULTIPLE);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nranks);
    typedef int type;
    size_t count = 1024 * 1024;
    int niter = 100;
    type *in = (type *)malloc(count * sizeof(type));
    type *out = (type *)malloc(count * sizeof(type));
    // Divide workload for multiple threads.
    // Save (offset, count) pair for each piece
    size_t nthreads = 8;
    if (argc > 2) {
        nthreads = atoi(argv[2]);
    }
    size_t nparts = (count > nthreads) ? nthreads : count;
    // Use nparts, it might be less than nthreads
    size_t base = count / nparts;
    size_t rest = count % nparts;
    size_t base_off = 0;
    std::vector<offset_and_count> offs_and_counts(nparts);
    for (size_t i = 0; i < nparts; i++) {
        offs_and_counts[i].offset = base_off; // off
        base_off += (offs_and_counts[i].count = base + (i<rest?1:0)); // size
    }
    // Duplicate a communicator for each thread
    std::vector<MPI_Comm> comms(nparts, MPI_COMM_NULL);
    for (size_t i = 0; i < nparts; i++) {
        MPI_Comm &new_comm = comms[i];
        MPI_Comm_dup(MPI_COMM_WORLD, &new_comm);
    }
    // Go into parallel region, use precalculated (offset, count) pairs to separate workload
    // use separated communicators from comms[]
    // use omp_aware_barrier instead of normal MPI_COMM_WORLD barrier
#pragma omp parallel num_threads(nparts)
    {
        int thread = omp_get_thread_num();
        offset_and_count &offs = offs_and_counts[thread];
        MPI_Comm &comm = comms[thread];
        for (int j = 1; j < niter+1; j++) {
            if (!offs.count) { omp_aware_barrier(comm, thread); continue; }
            work_portion_1<type>(in + offs.offset, offs.count, j, rank, nranks);
            work_portion_2<type>(out + offs.offset, offs.count, j, rank, nranks);
            MPI_Allreduce(in + offs.offset, out + offs.offset, offs.count, get_mpi_type<type>(), MPI_SUM, comm);
            assert(work_portion_3<type>(out + offs.offset, offs.count, j, rank, nranks));
            omp_aware_barrier(comm, thread);
        }
    }
    MPI_Finalize();
    return 0;
}
#include <pthread.h>
#include <sys/time.h>
#include <sched.h>
void pthreads_aware_barrier(MPI_Comm &comm, pthread_barrier_t &barrier, int thread)
{
    assert(thread != 0 || comm != MPI_COMM_NULL);
    pthread_barrier_wait(&barrier);
    if (thread == 0)
        MPI_Barrier(comm);
    pthread_barrier_wait(&barrier);
}
struct global_data {
    typedef int type;
    type *in, *out;
    int niter;
    size_t count;
    int rank, nranks;
    pthread_barrier_t barrier;
};
struct thread_local_data {
    size_t offset;
    size_t count;
    int thread_id;
    MPI_Comm *comm;
    global_data *global;
};
void *worker(void *arg_ptr)
{
    thread_local_data &thr_local = *((thread_local_data *)arg_ptr);
    global_data &global = *(thr_local.global);
    global_data::type *in = global.in;
    global_data::type *out = global.out;
    int &niter = global.niter;
    int &rank = global.rank;
    int &nranks = global.nranks;
    pthread_barrier_t &barrier = global.barrier;
    size_t &offset = thr_local.offset;
    size_t &count = thr_local.count;
    int &thread = thr_local.thread_id;
    MPI_Comm &comm = *(thr_local.comm);
    cpu_set_t mask;
    CPU_ZERO(&mask);
    CPU_SET(thread, &mask);
    int res = sched_setaffinity(0, sizeof(mask), &mask);
    if (res == -1)
        printf("failed set_thread_affinity()\n");
    for (int j = 1; j < global.niter+1; j++) {
        if (!thr_local.count) { pthreads_aware_barrier(comm, barrier, thread); continue; }
        work_portion_1<global_data::type>(in + offset, count, j, rank, nranks);
        work_portion_2<global_data::type>(out + offset, count, j, rank, nranks);
        MPI_Allreduce(in + offset, out + offset, count, get_mpi_type<global_data::type>(), MPI_SUM, comm);
        assert(work_portion_3<global_data::type>(out + offset, count, j, rank, nranks));
        pthreads_aware_barrier(comm, barrier, thread);
    }
}
int main_threaded_posix(int argc, char **argv)
{
    printf("POSIX\n");
    int provided = 0;
    global_data global;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    assert(provided == MPI_THREAD_MULTIPLE);
    MPI_Comm_rank(MPI_COMM_WORLD, &global.rank);
    MPI_Comm_size(MPI_COMM_WORLD, &global.nranks);
    global.count = 1024 * 1024;
    global.niter = 100;
    global.in = (global_data::type *)malloc(global.count * sizeof(global_data::type));
    global.out = (global_data::type *)malloc(global.count * sizeof(global_data::type));
    // Divide workload for multiple threads.
    // Save (offset, count) pair for each piece
    size_t nthreads = 8;
    if (argc > 2) {
        nthreads = atoi(argv[2]);
    }
    size_t nparts = ((global.count > nthreads) ? nthreads : global.count);
    pthread_barrier_init(&global.barrier, NULL, nparts);
    // Use nparts, it might be less than nthreads
    size_t base = global.count / nparts;
    size_t rest = global.count % nparts;
    size_t base_off = 0;
    std::vector<thread_local_data> thr_local(nparts);
    for (size_t i = 0; i < nparts; i++) {
        thr_local[i].offset = base_off; // off
        base_off += (thr_local[i].count = base + (i<rest?1:0)); // size
        thr_local[i].thread_id = i;
    }
    // Duplicate a communicator for each thread
    std::vector<MPI_Comm> comms(nparts);
    MPI_Info info;
    MPI_Info_create(&info);
    char s[16];
    for (size_t i = 0; i < nparts; i++) {
        MPI_Comm &new_comm = comms[i];
        MPI_Comm_dup(MPI_COMM_WORLD, &new_comm);
        snprintf(s, sizeof s, "%d", i);
        MPI_Info_set(info, "thread_id", s);
        MPI_Comm_set_info(new_comm, info);
        thr_local[i].comm = &new_comm;
        thr_local[i].global = &global;
    }
    // Start parallel POSIX threads
    std::vector<pthread_t> pids(nparts);
    for (size_t i = 0; i < nparts; i++) {
        pthread_create(&pids[i], NULL, worker, (void *)&thr_local[i]);
    }
    // Wait for all POSIX threads to complete
    for (size_t i = 0; i < nparts; i++) {
        pthread_join(pids[i], NULL);
    }
    MPI_Info_free(&info);
    MPI_Finalize();
    return 0;
}