thread_split.cpp
*/ #include <mpi.h> #include <stdlib.h> #include <string.h> #include <stdio.h> #include <vector> #include <string> #include <utility> #include <assert.h> #include <sys/time.h> // Choose threading model: enum { THR_OPENMP = 1, THR_POSIX = 2, THR_NONE = 0 } threading = THR_POSIX; template <typename T> MPI_Datatype get_mpi_type(); template <> MPI_Datatype get_mpi_type<char>() { return MPI_CHAR; } template <> MPI_Datatype get_mpi_type<int>() { return MPI_INT; } template <> MPI_Datatype get_mpi_type<float>() { return MPI_FLOAT; } template <> MPI_Datatype get_mpi_type<double>() { return MPI_DOUBLE; } int main_threaded(int argc, char **argv); template <typename T> bool work_portion_2(T *in, size_t count, size_t niter, int rank, int nranks) { memset(in, 0, sizeof(T) * count); } template <typename T> bool work_portion_1(T *in, size_t count, size_t niter, int rank, int nranks); template <> bool work_portion_1<char>(char *in, size_t count, size_t niter, int rank, int nranks) { return true; } template <> bool work_portion_1<int>(int *in, size_t count, size_t niter, int rank, int nranks) { for (size_t i = 0; i < count; i++) { in[i] = (int)(niter * (rank+1) * i); } return true; } template <typename T> bool work_portion_3(T *in, size_t count, size_t niter, int rank, int nranks); template <> bool work_portion_3<char>(char *out, size_t count, size_t niter, int rank, int nranks) { return true; } template <> bool work_portion_3<int>(int *out, size_t count, size_t niter, int rank, int nranks) { bool result = true; for (size_t i = 0; i < count; i++) { result = (result && (out[i] == (int)(niter * nranks*(nranks+1)*i/2))); } return result; } int main_threaded_openmp(int argc, char **argv); int main_threaded_posix(int argc, char **argv); int main(int argc, char **argv) { if (argc > 1) { if (!strcasecmp(argv[1], "openmp")) threading = THR_OPENMP; if (!strcasecmp(argv[1], "posix")) threading = THR_POSIX; if (!strcasecmp(argv[1], "none")) threading = THR_NONE; } if (threading == THR_OPENMP) { main_threaded_openmp(argc, argv); return 0; } else if (threading == THR_POSIX) { main_threaded_posix(argc, argv); return 0; } printf("No threading\n"); int rank, nranks; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &nranks); typedef int type; size_t count = 1024*1024; int niter = 100; type *in = (type *)malloc(count * sizeof(type)); type *out = (type *)malloc(count * sizeof(type)); for (int j = 1; j < niter+1; j++) { work_portion_1<type>(in, count, j, rank, nranks); work_portion_2<type>(out, count, j, rank, nranks); MPI_Allreduce(in, out, count, get_mpi_type<type>(), MPI_SUM, MPI_COMM_WORLD); assert(work_portion_3<type>(out, count, j, rank, nranks)); MPI_Barrier(MPI_COMM_WORLD); } MPI_Finalize(); return 0; } #include <omp.h> void omp_aware_barrier(MPI_Comm &comm, int thread) { assert(thread != 0 || comm != MPI_COMM_NULL); #pragma omp barrier if (thread == 0) MPI_Barrier(comm); #pragma omp barrier } struct offset_and_count { size_t offset; size_t count; }; int main_threaded_openmp(int argc, char **argv) { printf("OpenMP\n"); int rank, nranks, provided = 0; MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); assert(provided == MPI_THREAD_MULTIPLE); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &nranks); typedef int type; size_t count = 1024 * 1024; int niter = 100; type *in = (type *)malloc(count * sizeof(type)); type *out = (type *)malloc(count * sizeof(type)); // Divide workload for multiple threads. // Save (offset, count) pair for each piece size_t nthreads = 8; if (argc > 2) { nthreads = atoi(argv[2]); } size_t nparts = (count > nthreads) ? nthreads : count; // Use nparts, it might be less than nthreads size_t base = count / nparts; size_t rest = count % nparts; size_t base_off = 0; std::vector<offset_and_count> offs_and_counts(nparts); for (size_t i = 0; i < nparts; i++) { offs_and_counts[i].offset = base_off; // off base_off += (offs_and_counts[i].count = base + (i<rest?1:0)); // size } // Duplicate a communicator for each thread std::vector<MPI_Comm> comms(nparts, MPI_COMM_NULL); for (size_t i = 0; i < nparts; i++) { MPI_Comm &new_comm = comms[i]; MPI_Comm_dup(MPI_COMM_WORLD, &new_comm); } // Go into parallel region, use precalculated (offset, count) pairs to separate workload // use separated communicators from comms[] // use omp_aware_barrier instead of normal MPI_COMM_WORLD barrier #pragma omp parallel num_threads(nparts) { int thread = omp_get_thread_num(); offset_and_count &offs = offs_and_counts[thread]; MPI_Comm &comm = comms[thread]; for (int j = 1; j < niter+1; j++) { if (!offs.count) { omp_aware_barrier(comm, thread); continue; } work_portion_1<type>(in + offs.offset, offs.count, j, rank, nranks); work_portion_2<type>(out + offs.offset, offs.count, j, rank, nranks); MPI_Allreduce(in + offs.offset, out + offs.offset, offs.count, get_mpi_type<type>(), MPI_SUM, comm); assert(work_portion_3<type>(out + offs.offset, offs.count, j, rank, nranks)); omp_aware_barrier(comm, thread); } } MPI_Finalize(); return 0; } #include <pthread.h> #include <sys/time.h> #include <sched.h> void pthreads_aware_barrier(MPI_Comm &comm, pthread_barrier_t &barrier, int thread) { assert(thread != 0 || comm != MPI_COMM_NULL); pthread_barrier_wait(&barrier); if (thread == 0) MPI_Barrier(comm); pthread_barrier_wait(&barrier); } struct global_data { typedef int type; type *in, *out; int niter; size_t count; int rank, nranks; pthread_barrier_t barrier; }; struct thread_local_data { size_t offset; size_t count; int thread_id; MPI_Comm *comm; global_data *global; }; void *worker(void *arg_ptr) { thread_local_data &thr_local = *((thread_local_data *)arg_ptr); global_data &global = *(thr_local.global); global_data::type *in = global.in; global_data::type *out = global.out; int &niter = global.niter; int &rank = global.rank; int &nranks = global.nranks; pthread_barrier_t &barrier = global.barrier; size_t &offset = thr_local.offset; size_t &count = thr_local.count; int &thread = thr_local.thread_id; MPI_Comm &comm = *(thr_local.comm); cpu_set_t mask; CPU_ZERO(&mask); CPU_SET(thread, &mask); int res = sched_setaffinity(0, sizeof(mask), &mask); if (res == -1) printf("failed set_thread_affinity()\n"); for (int j = 1; j < global.niter+1; j++) { if (!thr_local.count) { pthreads_aware_barrier(comm, barrier, thread); continue; } work_portion_1<global_data::type>(in + offset, count, j, rank, nranks); work_portion_2<global_data::type>(out + offset, count, j, rank, nranks); MPI_Allreduce(in + offset, out + offset, count, get_mpi_type<global_data::type>(), MPI_SUM, comm); assert(work_portion_3<global_data::type>(out + offset, count, j, rank, nranks)); pthreads_aware_barrier(comm, barrier, thread); } } int main_threaded_posix(int argc, char **argv) { printf("POSIX\n"); int provided = 0; global_data global; MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); assert(provided == MPI_THREAD_MULTIPLE); MPI_Comm_rank(MPI_COMM_WORLD, &global.rank); MPI_Comm_size(MPI_COMM_WORLD, &global.nranks); global.count = 1024 * 1024; global.niter = 100; global.in = (global_data::type *)malloc(global.count * sizeof(global_data::type)); global.out = (global_data::type *)malloc(global.count * sizeof(global_data::type)); // Divide workload for multiple threads. // Save (offset, count) pair for each piece size_t nthreads = 8; if (argc > 2) { nthreads = atoi(argv[2]); } size_t nparts = ((global.count > nthreads) ? nthreads : global.count); pthread_barrier_init(&global.barrier, NULL, nparts); // Use nparts, it might be less than nthreads size_t base = global.count / nparts; size_t rest = global.count % nparts; size_t base_off = 0; std::vector<thread_local_data> thr_local(nparts); for (size_t i = 0; i < nparts; i++) { thr_local[i].offset = base_off; // off base_off += (thr_local[i].count = base + (i<rest?1:0)); // size thr_local[i].thread_id = i; } // Duplicate a communicator for each thread std::vector<MPI_Comm> comms(nparts); MPI_Info info; MPI_Info_create(&info); char s[16]; for (size_t i = 0; i < nparts; i++) { MPI_Comm &new_comm = comms[i]; MPI_Comm_dup(MPI_COMM_WORLD, &new_comm); snprintf(s, sizeof s, "%d", i); MPI_Info_set(info, "thread_id", s); MPI_Comm_set_info(new_comm, info); thr_local[i].comm = &new_comm; thr_local[i].global = &global; } // Start parallel POSIX threads std::vector<pthread_t> pids(nparts); for (size_t i = 0; i < nparts; i++) { pthread_create(&pids[i], NULL, worker, (void *)&thr_local[i]); } // Wait for all POSIX threads to complete for (size_t i = 0; i < nparts; i++) { pthread_join(pids[i], NULL); } MPI_Info_free(&info); MPI_Finalize(); return 0; }
See Also
Parent topic: Examples