00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include <iostream>
00037 #include <mpi.h>
00038
00039 #include "MPICommunicator.h"
00040
00041 const KokoTag KOKO_ANY_TAG = MPI_ANY_TAG;
00042 const KokoProcID KOKO_ANY_ID = MPI_ANY_SOURCE;
00043 const KokoMesgHandle KOKO_NULL_HANDLE = 0;
00044
00045 class AsyncMesgHandle {
00046 public:
00047 KokoBuffer *_kbuf;
00048
00049
00050 #ifdef WIN32
00051 unsigned int _handle;
00052 #else
00053 void *_handle;
00054 #endif
00055 };
00056
00057 using namespace std;
00058
00059
00060
00061
00062
00063
00064
00065 MPICommunicator::MPICommunicator(void)
00066 {
00067 int flag;
00068
00069
00070 MPI_Init(NULL, NULL);
00071 MPI_Comm_size(MPI_COMM_WORLD, &_nprocs);
00072 MPI_Comm_rank(MPI_COMM_WORLD, &_id);
00073
00074
00075 MPI_Attr_get(MPI_COMM_WORLD,MPI_TAG_UB, &_maxtag, &flag);
00076 if (flag == false)
00077 cerr << "MPICommunicator::MPICommunicator() : Warning! Unable to get "
00078 << "maximum tag value.\n";
00079 _handlecount = 0;
00080 }
00081
00082
00083
00084
00085
00086
00087
00088
00089 MPICommunicator::~MPICommunicator(void)
00090 {
00091 MPI_Finalize();
00092
00093
00094 AMHIterator first = _handlemap.begin();
00095 AMHIterator last = _handlemap.end();
00096 while(first != last) {
00097 delete (*first).second;
00098 first++;
00099 }
00100 _handlemap.clear();
00101 }
00102
00103
00104
00105
00106
00107
00108
00109
00110 bool MPICommunicator::Barrier(void)
00111 {
00112 int err = MPI_Barrier(MPI_COMM_WORLD);
00113
00114 if (err != MPI_SUCCESS) {
00115 cerr << "MPICommunicator::Barrier() : Error on barrier.\n";
00116 ErrorMesg(err);
00117 return false;
00118 } else {
00119 return true;
00120 }
00121 }
00122
00123
00124
00125
00126
00127
00128
00129
00130 bool MPICommunicator::Send(KokoProcID id, KokoBuffer &buf, KokoTag tag)
00131 {
00132 if (tag > _maxtag) {
00133 cerr << "MPICommunicator::Send() : KokoTag value execeeds upper bound.\n";
00134 cerr << "\tKokoTags must be <= " << _maxtag << endl;
00135 return false;
00136 }
00137
00138 if (buf.Locked()) {
00139 cerr << "MPICommunicator::Send() : KokoBuffer is locked.\n";
00140 return false;
00141 }
00142
00143 int err = MPI_Send(buf, buf.Size(), MPI_UNSIGNED_CHAR, id, tag,
00144 MPI_COMM_WORLD);
00145
00146 if (err != MPI_SUCCESS) {
00147 cerr << "MPICommunicator::Send() : Error sending message.\n";
00148 ErrorMesg(err);
00149 return false;
00150 } else {
00151 return true;
00152 }
00153 }
00154
00155
00156
00157
00158
00159
00160
00161
00162 int MPICommunicator::Recv(KokoProcID id, KokoBuffer &buf, KokoTag tag)
00163 {
00164 if (tag > _maxtag) {
00165 cerr << "MPICommunicator::Recv() : KokoTag value exceeds upper bounds.\n";
00166 cerr << "\tKokoTags must be <= " << _maxtag << endl;
00167 return 0;
00168 }
00169
00170 if (buf.Locked()) {
00171 cerr << "MPICommunicator::Recv() : KokoBuffer is locked.\n";
00172 return false;
00173 }
00174
00175 MPI_Status status;
00176
00177 int err = MPI_Recv(buf, buf.TotalSize(), MPI_UNSIGNED_CHAR, id, tag,
00178 MPI_COMM_WORLD, &status);
00179
00180 if (err != MPI_SUCCESS) {
00181 cerr << "MPICommunicator::Recv() : Error receiving message.\n";
00182 ErrorMesg(err);
00183 return 0;
00184 } else {
00185 MPI_Get_count(&status, MPI_UNSIGNED_CHAR, &_recv_bytes);
00186 _recv_tag = status.MPI_TAG;
00187 _recv_src = status.MPI_SOURCE;
00188 buf.SetCurSize(_recv_bytes);
00189 return _recv_bytes;
00190 }
00191 }
00192
00193
00194
00195
00196
00197
00198
00199
00200 int MPICommunicator::SendRecv(int dest, KokoBuffer &sendbuf, KokoTag stag,
00201 int src, KokoBuffer &recvbuf, KokoTag rtag)
00202 {
00203 if (stag > _maxtag || rtag > _maxtag) {
00204 cerr << "MPICommunicator::SendRecv() : Warning, send or recveive tag "
00205 << "exceeds upper bound.\n";
00206 cerr << "\tKokoTags must be <= " << _maxtag << endl;
00207 return 0;
00208 }
00209
00210 if (sendbuf.Locked()) {
00211 cerr << "MPICommunicator::SendRecv() : send KokoBuffer is locked.\n";
00212 return false;
00213 }
00214
00215 if (recvbuf.Locked()) {
00216 cerr << "MPICommunicator::SendRecv() : send KokoBuffer is locked.\n";
00217 return false;
00218 }
00219
00220 MPI_Status status;
00221 int err = MPI_Sendrecv(sendbuf, sendbuf.Size(), MPI_UNSIGNED_CHAR, dest,
00222 stag, recvbuf, recvbuf.Size(), MPI_UNSIGNED_CHAR,
00223 src, rtag, MPI_COMM_WORLD, &status);
00224
00225 if (err != MPI_SUCCESS) {
00226 cerr << "MPICommunicator::SendRecv() : Error sending/receiving message.\n";
00227 ErrorMesg(err);
00228 return 0;
00229 } else {
00230 MPI_Get_count(&status, MPI_UNSIGNED_CHAR, &_recv_bytes);
00231 _recv_tag = status.MPI_TAG;
00232 _recv_src = status.MPI_SOURCE;
00233 recvbuf.SetCurSize(_recv_bytes);
00234 return _recv_bytes;
00235 }
00236 }
00237
00238
00239
00240
00241
00242
00243
00244 KokoMesgHandle MPICommunicator::ASend(KokoProcID id, KokoBuffer &buf,
00245 KokoTag tag)
00246 {
00247 if (tag > _maxtag) {
00248 cerr << "MPICommunicator::ASend() : Warning, tag exceeds upper bound.\n";
00249 cerr << "\tKokoTags must be <= " << _maxtag << endl;
00250 return KOKO_NULL_HANDLE;
00251 }
00252
00253 if (buf.Locked()) {
00254 cerr << "MPICommunicator::ASend() : send KokoBuffer is locked.\n";
00255 return false;
00256 }
00257
00258 MPI_Request req;
00259 int err = MPI_Isend(buf, buf.Size(), MPI_UNSIGNED_CHAR, id, tag,
00260 MPI_COMM_WORLD, &req);
00261
00262 if (err != MPI_SUCCESS) {
00263 cerr << "MPICommunicator::ASend() : Error sending message.\n";
00264 ErrorMesg(err);
00265 return KOKO_NULL_HANDLE;
00266 } else {
00267 AsyncMesgHandle *amhand = new AsyncMesgHandle;
00268 amhand->_handle = req;
00269 amhand->_kbuf = &buf;
00270 _handlecount++;
00271 if (_handlecount == 0)
00272 _handlecount = 1;
00273 AMHPair amhpair(_handlecount, amhand);
00274 AMHBool flag = _handlemap.insert(amhpair);
00275
00276 buf.Lock();
00277
00278
00279 if (flag.second == true)
00280 return amhpair.first;
00281 else
00282 return KOKO_NULL_HANDLE;
00283 }
00284 }
00285
00286
00287
00288
00289
00290
00291
00292
00293 KokoMesgHandle MPICommunicator::ARecv(KokoProcID id, KokoBuffer &buf,
00294 KokoTag tag)
00295 {
00296 if (tag > _maxtag) {
00297 cerr << "MPICommunicator::ARecv() : Warning, tag exceeds upper bound.\n";
00298 cerr << "\tKokoTags must be <= " << _maxtag << endl;
00299 return KOKO_NULL_HANDLE;
00300 }
00301
00302 if (buf.Locked()) {
00303 cerr << "MPICommunicator::ARecv() : send KokoBuffer is locked.\n";
00304 return false;
00305 }
00306
00307 MPI_Request req;
00308 int err = MPI_Irecv(buf, buf.Size(), MPI_UNSIGNED_CHAR, id, tag,
00309 MPI_COMM_WORLD, &req);
00310
00311 if (err != MPI_SUCCESS) {
00312 cerr << "MPICommunicator::ARecv() : Error receiving message.\n";
00313 ErrorMesg(err);
00314 return KOKO_NULL_HANDLE;
00315 } else {
00316 AsyncMesgHandle *amhand = new AsyncMesgHandle;
00317 amhand->_handle = req;
00318 amhand->_kbuf = &buf;
00319 _handlecount++;
00320 if (_handlecount == 0)
00321 _handlecount = 1;
00322 AMHPair amhpair(_handlecount, amhand);
00323 AMHBool flag = _handlemap.insert(amhpair);
00324
00325 buf.Lock();
00326
00327
00328 if (flag.second == true)
00329 return amhpair.first;
00330 else
00331 return KOKO_NULL_HANDLE;
00332 }
00333 }
00334
00335
00336
00337
00338
00339
00340
00341
00342 bool MPICommunicator::Wait(KokoMesgHandle &handle)
00343 {
00344 if (handle == KOKO_NULL_HANDLE) {
00345 cerr << "MPICommunicator::Wait() : Warning! Invalid message handle.\n";
00346 return true;
00347 }
00348
00349 MPI_Status status;
00350
00351 int err = MPI_Wait((MPI_Request *)&handle, &status);
00352
00353 if (err != MPI_SUCCESS) {
00354 cerr << "MPICommunicator::Wait() : Error waiting...\n";
00355 ErrorMesg(err);
00356 return true;
00357 }
00358
00359
00360
00361 MPI_Get_count(&status, MPI_UNSIGNED_CHAR, &_recv_bytes);
00362
00363
00364
00365
00366
00367
00368
00369 AMHIterator it = _handlemap.find(handle);
00370 if (it != _handlemap.end()) {
00371 (*it).second->_kbuf->SetCurSize(_recv_bytes);
00372 (*it).second->_kbuf->Unlock();
00373 delete (*it).second;
00374 _handlemap.erase(it);
00375 } else {
00376 cerr << "MPICommunicator::Wait() : Warning! Message handle not found!\n";
00377 cerr << "\tMessage recevied successfully but we may have a corrupt map.\n";
00378 }
00379
00380 handle = KOKO_NULL_HANDLE;
00381
00382 return false;
00383 }
00384
00385
00386
00387
00388
00389
00390
00391
00392 bool MPICommunicator::Test(KokoMesgHandle &handle)
00393 {
00394 if (handle == KOKO_NULL_HANDLE) {
00395 cerr << "MPICommunicator::Test() : Warning, invalid message handle.\n";
00396 return false;
00397 }
00398
00399 int flag;
00400 MPI_Status status;
00401
00402 int err = MPI_Test((MPI_Request *)&handle, &flag, &status);
00403
00404 if (err != MPI_SUCCESS) {
00405 cerr << "MPICommunicator::Test() : Error testing completion status.\n";
00406 ErrorMesg(err);
00407 return false;
00408 }
00409
00410 if (flag) {
00411
00412
00413
00414
00415
00416
00417 MPI_Get_count(&status, MPI_UNSIGNED_CHAR, &_recv_bytes);
00418
00419
00420
00421
00422
00423
00424
00425 AMHIterator it = _handlemap.find(handle);
00426 if (it != _handlemap.end()) {
00427 (*it).second->_kbuf->SetCurSize(_recv_bytes);
00428 (*it).second->_kbuf->Unlock();
00429 delete (*it).second;
00430 _handlemap.erase(it);
00431 } else {
00432 cerr << "MPICommunicator::Test() : Warning! Message handle not found!\n";
00433 cerr << "\tMessage received but we may have a corrupt map.\n";
00434 }
00435
00436 return true;
00437 } else {
00438 return false;
00439 }
00440 }
00441
00442
00443
00444
00445
00446
00447
00448
00449 bool MPICommunicator::Broadcast(KokoBuffer &buf, KokoProcID root, KokoTag tag)
00450 {
00451
00452 if (buf.Locked()) {
00453 cerr << "MPICommunicator::Broadcast() : KokoBuffer is locked.\n";
00454 return false;
00455 }
00456
00457
00458
00459
00460
00461
00462
00463 if (root == this->ID())
00464 buf.SetTag(tag);
00465
00466
00467
00468
00469
00470 int err = MPI_Bcast(buf.GetBuffer(), buf.TotalSize(), MPI_UNSIGNED_CHAR,
00471 root, MPI_COMM_WORLD);
00472
00473 if (err != MPI_SUCCESS) {
00474 cerr << "MPICommunicator::Broadcast() : Error sending broadcast.\n";
00475 ErrorMesg(err);
00476 return false;
00477 } else {
00478
00479
00480 if (this->ID() != root) {
00481 _recv_tag = buf.GetTag();
00482 if (!buf.SetCurSize(buf.TotalSize())) {
00483 cerr << "MPICommunicator::Broadcast() : Warning! "
00484 << "Can't set buffer size.\n";
00485 }
00486 }
00487 return true;
00488 }
00489 }
00490
00491
00492
00493
00494
00495
00496
00497
00498 void MPICommunicator::ErrorMesg(int code)
00499 {
00500 char mesg[MPI_MAX_ERROR_STRING];
00501 int len;
00502 MPI_Error_string(code, mesg, &len);
00503 cerr << "MPI Error: " << mesg << endl;
00504 }