This source file includes the following definitions.
- ompi_crcp_bkmrk_pml_peer_ref_construct
- ompi_crcp_bkmrk_pml_peer_ref_destruct
- ompi_crcp_bkmrk_pml_message_content_ref_construct
- ompi_crcp_bkmrk_pml_message_content_ref_destruct
- ompi_crcp_bkmrk_pml_traffic_message_ref_construct
- ompi_crcp_bkmrk_pml_traffic_message_ref_destruct
- ompi_crcp_bkmrk_pml_drain_message_ref_construct
- ompi_crcp_bkmrk_pml_drain_message_ref_destruct
- ompi_crcp_bkmrk_pml_drain_message_ack_ref_construct
- ompi_crcp_bkmrk_pml_drain_message_ack_ref_destruct
- ompi_crcp_bkmrk_pml_init
- ompi_crcp_bkmrk_pml_finalize
- ompi_crcp_bkmrk_pml_enable
- ompi_crcp_bkmrk_pml_progress
- ompi_crcp_bkmrk_pml_iprobe
- ompi_crcp_bkmrk_pml_probe
- ompi_crcp_bkmrk_pml_dump
- ompi_crcp_bkmrk_pml_add_comm
- ompi_crcp_bkmrk_pml_del_comm
- ompi_crcp_bkmrk_pml_add_procs
- ompi_crcp_bkmrk_pml_del_procs
- ompi_crcp_bkmrk_pml_isend_init
- ompi_crcp_bkmrk_pml_start_isend_init
- ompi_crcp_bkmrk_request_complete_isend_init
- ompi_crcp_bkmrk_pml_isend
- ompi_crcp_bkmrk_request_complete_isend
- ompi_crcp_bkmrk_pml_send
- ompi_crcp_bkmrk_pml_irecv_init
- ompi_crcp_bkmrk_pml_start_drain_irecv_init
- ompi_crcp_bkmrk_pml_start_irecv_init
- ompi_crcp_bkmrk_request_complete_irecv_init
- ompi_crcp_bkmrk_pml_irecv
- ompi_crcp_bkmrk_request_complete_irecv
- ompi_crcp_bkmrk_pml_recv
- ompi_crcp_bkmrk_pml_start
- ompi_crcp_bkmrk_request_complete
- ompi_crcp_bkmrk_pml_quiesce_start
- ompi_crcp_bkmrk_pml_quiesce_end
- ompi_crcp_bkmrk_pml_ft_event
- traffic_message_append
- traffic_message_start
- traffic_message_move
- traffic_message_find_mark_persistent
- traffic_message_grab_content
- traffic_message_create_drain_message
- traffic_message_find_recv
- traffic_message_find
- drain_message_append
- drain_message_remove
- drain_message_check_recv
- drain_message_find_any
- drain_message_find
- drain_message_grab_content
- drain_message_copy_remove_persistent
- drain_message_copy_remove
- find_peer
- find_peer_in_comm
- ft_event_coordinate_peers
- ft_event_finalize_exchange
- ft_event_exchange_bookmarks
- ft_event_check_bookmarks
- ft_event_post_drain_acks
- drain_message_ack_cbfunc
- ft_event_post_drained
- ft_event_post_drain_message
- ft_event_wait_quiesce
- wait_quiesce_drained
- coord_request_wait_all
- coord_request_wait
- wait_quiesce_drain_ack
- send_bookmarks
- recv_bookmarks
- recv_bookmarks_cbfunc
- send_msg_details
- do_send_msg_detail
- recv_msg_details
- do_recv_msg_detail
- do_recv_msg_detail_check_drain
- do_recv_msg_detail_resp
- start_time
- end_time
- get_time
- clear_timers
- display_all_timers
- display_indv_timer
- display_indv_timer_core
- traffic_message_dump_msg_content_indv
- traffic_message_dump_msg_indv
- traffic_message_dump_drain_msg_indv
- traffic_message_dump_msg_list
- traffic_message_dump_peer
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 
  14 
  15 
  16 
  17 
  18 
  19 
  20 
  21 
  22 
  23 #include "ompi_config.h"
  24 
  25 #include <sys/types.h>
  26 #ifdef HAVE_UNISTD_H
  27 #include <unistd.h>
  28 #endif  
  29 
  30 #include "opal/dss/dss.h"
  31 #include "opal/runtime/opal_cr.h"
  32 #include "opal/mca/event/event.h"
  33 #include "opal/util/output.h"
  34 #include "opal/util/printf.h"
  35 
  36 #include "opal/util/opal_environ.h"
  37 #include "ompi/mca/mca.h"
  38 #include "opal/mca/pmix/pmix.h"
  39 
  40 #include "ompi/request/request.h"
  41 #include "ompi/mca/rte/rte.h"
  42 #include "ompi/mca/pml/pml.h"
  43 #include "ompi/mca/pml/base/base.h"
  44 #include "ompi/mca/pml/base/pml_base_request.h"
  45 #include "ompi/mca/crcp/crcp.h"
  46 #include "ompi/mca/crcp/base/base.h"
  47 
  48 #include "opal/class/opal_free_list.h"
  49 #include "ompi/runtime/ompi_cr.h"
  50 #include "orte/runtime/orte_wait.h"
  51 
  52 #include "crcp_bkmrk.h"
  53 #include "crcp_bkmrk_pml.h"
  54 
  55 
  56 
  57 
      /*
       * File-local constants.
       * NOTE(review): the uses of most of these are outside this chunk; the
       * remarks below are inferred from the names only and must be confirmed
       * against the call sites.
       */

      /* Zero-valued size/count sentinels (presumably "match any" for probes). */
   58 #define PROBE_ANY_SIZE  ((size_t) 0)
   59 #define PROBE_ANY_COUNT ((size_t) 0)
   60 
      /* Integer marker value -1 (presumably tags persistent requests - TODO confirm). */
   61 #define PERSIST_MARKER ((int) -1)
   62 
      /* Three distinct response codes (presumably for the recv-match exchange). */
   63 #define RECV_MATCH_RESP_DONE  0
   64 #define RECV_MATCH_RESP_MORE  1
   65 #define RECV_MATCH_RESP_ERROR 2
   66 
      /*
       * Placeholder stored in the matched/done/active/posted/active_drain
       * fields of a traffic message reference by its constructor and
       * destructor.
       */
   67 #define INVALID_INT -123456789
   68 
      /* Tri-state values (presumably used as a search filter - TODO confirm). */
   69 #define FIND_MSG_TRUE     0
   70 #define FIND_MSG_FALSE    1
   71 #define FIND_MSG_UNKNOWN  2
  72 
  73 
      /*
       * File-scope state.
       * NOTE(review): initialisation and most uses of these objects live in
       * functions outside this chunk; purposes marked "presumably" are inferred
       * from names only and must be confirmed.
       */

      /* PML component/module pointers, NULL until set (presumably the wrapped PML). */
   74 static mca_pml_base_component_t  *wrapped_pml_component = NULL;
   75 static mca_pml_base_module_t     *wrapped_pml_module    = NULL;
   76 
   77 
      /*
       * Sequence counters, both starting at 1.  content_ref_seq_num is copied
       * into content_ref->msg_id and then incremented by
       * HOKE_CONTENT_REF_ALLOC.
       */
   78 static uint64_t message_seq_num = 1;
   79 static uint64_t content_ref_seq_num = 1;
   80 
   81 
      /* Id and type of a "current" message (presumably the one being processed). */
   82 static uint64_t current_msg_id = 0;
   83 static ompi_crcp_bkmrk_pml_message_type_t current_msg_type = 0;
   84 
   85 
   86 
      /* Flag with no explicit initialiser (static storage, so it starts as false). */
   87 static bool stall_for_completion;
   88 
   89 
   90 
   91 
      /* Current fault-tolerance event state; starts as OPAL_CRS_RUNNING. */
   92 static int ft_event_state = OPAL_CRS_RUNNING;
   93 
   94 
   95 
   96 
      /* List of peer references (element type presumably ompi_crcp_bkmrk_pml_peer_ref_t). */
   97 opal_list_t ompi_crcp_bkmrk_pml_peer_refs;
   98 
   99 
  100 
  101 
      /* Lists for receives whose source is not known (presumably ANY_SOURCE - TODO confirm). */
  102 opal_list_t unknown_recv_from_list;
  103 opal_list_t unknown_persist_recv_list;
  104 
  105 
  106 
  107 
      /* List of drain-message acknowledgement records (TODO confirm element type). */
  108 opal_list_t drained_msg_ack_list;
  109 
  110 
  111 
  112 
      /*
       * Free lists backing the bookkeeping objects.  All but
       * coord_state_free_list are accessed through the HOKE_*_ALLOC /
       * HOKE_*_RETURN macros in this file.
       */
  113 opal_free_list_t coord_state_free_list;
  114 opal_free_list_t content_ref_free_list;
  115 opal_free_list_t peer_ref_free_list;
  116 opal_free_list_t traffic_msg_ref_free_list;
  117 opal_free_list_t drain_msg_ref_free_list;
  118 opal_free_list_t drain_ack_msg_ref_free_list;
  119 
  120 
  121 
  122 
      /* Request/status pointer arrays and their element count; empty at start-up. */
  123 ompi_request_t       ** quiesce_requests = NULL;
  124 ompi_status_public_t ** quiesce_statuses = NULL;
  125 int                     quiesce_request_count = 0;
 126 
 127 
 128 
 129 
 130 
      /*
       * Prototypes for file-local helpers defined later in this file.
       * The three "start" helpers take the address of a request pointer; the
       * drain variant additionally reports through *found_drain.
       */
  131 static int ompi_crcp_bkmrk_pml_start_isend_init(ompi_request_t **request);
  132 static int ompi_crcp_bkmrk_pml_start_irecv_init(ompi_request_t **request);
  133 static int ompi_crcp_bkmrk_pml_start_drain_irecv_init(ompi_request_t **request, bool *found_drain);
  134 
      /*
       * Per-operation request-completion helpers.  All four share one
       * signature: the request, the peer reference, and src/tag/tmp_ddt_size
       * (presumably the message signature used to locate the bookkeeping
       * entry - confirm against the definitions).
       */
  135 static int ompi_crcp_bkmrk_request_complete_isend_init(struct ompi_request_t *request,
  136                                                ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
  137                                                int src, int tag, int tmp_ddt_size);
  138 static int ompi_crcp_bkmrk_request_complete_isend(struct ompi_request_t *request,
  139                                                ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
  140                                                int src, int tag, int tmp_ddt_size);
  141 static int ompi_crcp_bkmrk_request_complete_irecv_init(struct ompi_request_t *request,
  142                                                ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
  143                                                int src, int tag, int tmp_ddt_size);
  144 static int ompi_crcp_bkmrk_request_complete_irecv(struct ompi_request_t *request,
  145                                                ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
  146                                                int src, int tag, int tmp_ddt_size);
 147 
 148 
 149 
 150 
      /*
       * Prototypes for the file-local traffic_message_* helpers, which are
       * defined later in this file.  Behavioural notes below are inferred
       * from names and parameter lists only - confirm against the definitions.
       */

      /* Presumably creates a traffic message record on append_list; result via *msg_ref. */
  151 static int traffic_message_append(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
  152                                   opal_list_t * append_list,
  153                                   ompi_crcp_bkmrk_pml_message_type_t msg_type,
  154                                   size_t count,
  155                                   ompi_datatype_t *datatype,
  156                                   size_t ddt_size,
  157                                   int tag,
  158                                   int dest,
  159                                   struct ompi_communicator_t* comm,
  160                                   ompi_crcp_bkmrk_pml_traffic_message_ref_t **msg_ref);
  161 
  162 
  163 
  164 
      /* Takes a message reference, its peer, a request and a list; content via *content_ref. */
  165 static int traffic_message_start(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
  166                                  ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
  167                                  ompi_request_t **request,
  168                                  opal_list_t * peer_list,
  169                                  ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref);
  170 
  171 
  172 
  173 
  174 
      /* Presumably transfers a message between a from-list and a to-list; new reference via *new_msg_ref. */
  175 static int traffic_message_move(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
  176                                 ompi_crcp_bkmrk_pml_message_type_t msg_type,
  177                                 ompi_crcp_bkmrk_pml_peer_ref_t *from_peer_ref,
  178                                 opal_list_t * from_list,
  179                                 ompi_crcp_bkmrk_pml_peer_ref_t *to_peer_ref,
  180                                 opal_list_t * to_list,
  181                                 ompi_crcp_bkmrk_pml_traffic_message_ref_t **new_msg_ref,
  182                                 bool keep_active, 
  183                                 bool remove); 
  184 
  185 
  186 
  187 
      /* Yields a content reference of msg_ref via *content_ref; flags select removal/drained handling (TODO confirm). */
  188 static int traffic_message_grab_content(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
  189                                         ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref,
  190                                         bool remove,
  191                                         bool already_drained);
  192 
  193 
  194 
  195 
      /* Presumably locates the content entry for a persistent request and updates its active flag. */
  196 static int traffic_message_find_mark_persistent(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
  197                                                 ompi_request_t **request,
  198                                                 bool cur_active,
  199                                                 bool set_is_active,
  200                                                 ompi_crcp_bkmrk_pml_message_content_ref_t **content_ref);
  201 
  202 
  203 
  204 
      /* Searches search_list by count/tag/peer/comm_id/ddt_size; result via *found_msg_ref. */
  205 static int traffic_message_find(opal_list_t * search_list,
  206                                 size_t count, int tag, int peer, uint32_t comm_id,
  207                                 size_t ddt_size,
  208                                 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** found_msg_ref,
  209                                 int active);
  210 
  211 
  212 
  213 
  214 
      /* Receive lookup with five separate output references, one per receive category. */
  215 static int traffic_message_find_recv(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
  216                                      int rank, uint32_t comm_id, int tag,
  217                                      size_t count, size_t datatype_size,
  218                                      ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_recv_msg_ref,
  219                                      ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_irecv_msg_ref,
  220                                      ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_precv_msg_ref,
  221                                      ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_unknown_recv_msg_ref,
  222                                      ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_unknown_precv_msg_ref);
  223 
  224 
  225 
  226 
      /* Presumably builds drain messages from a posted message; count reported via *num_posted. */
  227 static int traffic_message_create_drain_message(bool post_drain,
  228                                                 int max_post,
  229                                                 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
  230                                                 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_msg_ref,
  231                                                 int *num_posted);
 232 
 233 
 234 
 235 
      /*
       * Prototypes for the file-local drain_message_* helpers, which are
       * defined later in this file.  Behavioural notes below are inferred
       * from names and parameter lists only - confirm against the definitions.
       */

      /* Presumably records a drain message for peer_ref; new reference via *msg_ref. */
  236 static int drain_message_append(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
  237                                 ompi_crcp_bkmrk_pml_message_type_t msg_type,
  238                                 size_t count, size_t ddt_size,
  239                                 int tag,int dest,
  240                                 struct ompi_communicator_t* comm,
  241                                 ompi_crcp_bkmrk_pml_drain_message_ref_t **msg_ref);
  242 
  243 
  244 
  245 
      /* Presumably removes content_ref (and possibly msg_ref) from peer_ref's bookkeeping. */
  246 static int drain_message_remove(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
  247                                 ompi_crcp_bkmrk_pml_drain_message_ref_t *msg_ref,
  248                                 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref);
  249 
  250 
  251 
  252 
      /* Takes receive arguments by pointer; reports through *found_drain. */
  253 static int drain_message_check_recv(void **buf, size_t count,
  254                                     ompi_datatype_t *datatype,
  255                                     int *src, int *tag,
  256                                     struct ompi_communicator_t* comm,
  257                                     struct ompi_request_t **request,
  258                                     ompi_status_public_t** status,
  259                                     bool *found_drain);
  260 
  261 
  262 
  263 
      /* Searches search_list by message signature; results via *found_msg_ref / *content_ref. */
  264 static int drain_message_find(opal_list_t * search_list,
  265                               size_t count, int tag, int peer,
  266                               uint32_t comm_id, size_t ddt_size,
  267                               ompi_crcp_bkmrk_pml_drain_message_ref_t ** found_msg_ref,
  268                               ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref);
  269 
  270 
  271 
  272 
      /* Like drain_message_find but takes a communicator and also returns a peer reference. */
  273 static int drain_message_find_any(size_t count, int tag, int peer,
  274                                   struct ompi_communicator_t* comm, size_t ddt_size,
  275                                   ompi_crcp_bkmrk_pml_drain_message_ref_t ** found_msg_ref,
  276                                   ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref,
  277                                   ompi_crcp_bkmrk_pml_peer_ref_t **peer_ref);
  278 
  279 
  280 
  281 
      /* Yields a content reference of drain_msg_ref via *content_ref. */
  282 static int drain_message_grab_content(ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref,
  283                                       ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref);
  284 
  285 
  286 
  287 
      /* Presumably copies a drained message into the caller's receive arguments, then removes it. */
  288 static int drain_message_copy_remove(ompi_crcp_bkmrk_pml_drain_message_ref_t *msg_ref,
  289                                      ompi_crcp_bkmrk_pml_message_content_ref_t * content_ref,
  290                                      int *src, int *tag,
  291                                      struct ompi_request_t **request,
  292                                      ompi_status_public_t **status,
  293                                      ompi_datatype_t *datatype, int count, void **buf,
  294                                      ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref);
  295 
  296 
  297 
  298 
      /* Persistent-request variant: works against a traffic message reference and an existing request. */
  299 static int drain_message_copy_remove_persistent(ompi_crcp_bkmrk_pml_drain_message_ref_t   *drain_msg_ref,
  300                                                 ompi_crcp_bkmrk_pml_message_content_ref_t *drain_content_ref,
  301                                                 ompi_crcp_bkmrk_pml_traffic_message_ref_t *traffic_msg_ref,
  302                                                 ompi_request_t *request,
  303                                                 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref);
 304 
 305 
 306 
 307 
      /*
       * Prototypes for the peer lookup helpers (defined later in this file).
       * find_peer takes a process name by value and returns a peer reference
       * pointer (result when nothing matches is not visible here - TODO confirm).
       */
  308 static ompi_crcp_bkmrk_pml_peer_ref_t* find_peer(ompi_process_name_t proc);
  309 
  310 
  311 
  312 
      /* Lookup by communicator and index; the peer reference is returned via *peer_ref. */
  313 static int find_peer_in_comm(struct ompi_communicator_t* comm, int proc_idx,
  314                              ompi_crcp_bkmrk_pml_peer_ref_t **peer_ref);
 315 
 316 
 317 
 318 
 319 
      /*
       * Prototypes for two argument-less fault-tolerance event steps, defined
       * later in this file (presumably the start and end of the peer
       * coordination protocol - confirm against the definitions).
       */
  320 static int ft_event_coordinate_peers(void);
  321 
  322 
  323 
  324 
  325 
  326 static int ft_event_finalize_exchange(void);
 327 
 328 
 329 
 330 
 331 
 332 
 333 
      /*
       * Prototypes for the bookmark exchange helpers (defined later in this
       * file).  send_bookmarks/recv_bookmarks take a peer index.
       */
  334 static int ft_event_exchange_bookmarks(void);
  335 
  336 
  337 
  338 
  339 static int send_bookmarks(int peer_idx);
  340 
  341 
  342 
  343 
  344 static int recv_bookmarks(int peer_idx);
  345 
  346 
  347 
  348 
      /*
       * Callback taking (status, sender, buffer, tag, cbdata).
       * NOTE(review): the shape looks like an RML receive callback - confirm
       * where it is registered.
       */
  349 static void recv_bookmarks_cbfunc(int status,
  350                                   ompi_process_name_t* sender,
  351                                   opal_buffer_t *buffer,
  352                                   ompi_rml_tag_t tag,
  353                                   void* cbdata);
      /* Counter starting at 0 (presumably outstanding bookmark receives - TODO confirm). */
  354 static int total_recv_bookmarks = 0;
 355 
 356 
 357 
 358 
 359 
      /*
       * Prototypes for the bookmark checking / message-detail exchange helpers
       * (defined later in this file).  Behavioural notes are inferred from the
       * signatures only - confirm against the definitions.
       */
  360 static int ft_event_check_bookmarks(void);
  361 
  362 
  363 
  364 
  365 
      /* Sender side: takes the peer plus total-sent and total-matched counts. */
  366 static int send_msg_details(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
  367                             int total_sent, int total_matched);
  368 
  369 
  370 
  371 
  372 
      /* Handles one message reference; outputs via *num_matches, *total_found and *finished. */
  373 static int do_send_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
  374                               ompi_crcp_bkmrk_pml_traffic_message_ref_t*msg_ref,
  375                               int *num_matches,
  376                               int *total_found,
  377                               bool *finished);
  378 
  379 
  380 
  381 
      /* Receiver side: takes the peer plus total-received and total-matched counts. */
  382 static int recv_msg_details(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
  383                             int total_recv, int total_matched);
  384 
  385 
  386 
  387 
      /* Fills the message signature (rank, comm_id, tag, count, datatype_size) and *p_num_sent. */
  388 static int do_recv_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
  389                               int *rank, uint32_t *comm_id, int *tag,
  390                               size_t *count, size_t *datatype_size,
  391                               int *p_num_sent);
  392 
  393 
  394 
  395 
  396 
  397 
      /* Takes the same signature by value and reports a count via *num_resolved. */
  398 static int do_recv_msg_detail_check_drain(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
  399                                     int rank, uint32_t comm_id, int tag,
  400                                     size_t count, size_t datatype_size,
  401                                     int p_num_sent,
  402                                     int *num_resolved);
  403 
  404 
  405 
  406 
      /* Takes a response code and two counts (resp is presumably a RECV_MATCH_RESP_* value - TODO confirm). */
  407 static int do_recv_msg_detail_resp(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
  408                                    int resp,
  409                                    int num_resolv,
  410                                    int total_found);
 411 
 412 
 413 
 414 
 415 
 416 
      /*
       * Prototypes for the drain posting helpers (defined later in this file).
       */
  417 static int ft_event_post_drain_acks(void);
  418 
  419 
  420 
  421 
      /*
       * Callback taking (status, sender, buffer, tag, cbdata).
       * NOTE(review): the shape looks like an RML receive callback - confirm
       * where it is registered.
       */
  422 static void drain_message_ack_cbfunc(int status,
  423                                      ompi_process_name_t* sender,
  424                                      opal_buffer_t *buffer,
  425                                      ompi_rml_tag_t tag,
  426                                      void* cbdata);
  427 
  428 
  429 
  430 
  431 
  432 static int ft_event_post_drained(void);
  433 
      /* Operates on one drain message reference and one of its content references. */
  434 static int ft_event_post_drain_message(ompi_crcp_bkmrk_pml_drain_message_ref_t   *drain_msg_ref,
  435                                        ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref);
 436 
 437 
 438 
 439 
 440 
 441 
      /*
       * Prototypes for the quiesce/wait helpers (defined later in this file).
       */
  442 static int ft_event_wait_quiesce(void);
  443 
  444 
  445 
  446 
  447 static int wait_quiesce_drained(void);
  448 
  449 
  450 
  451 
  452 
  453 
      /* Takes `count` request pointers and a parallel array of status pointers. */
  454 static int coord_request_wait_all( size_t count,
  455                                    ompi_request_t ** requests,
  456                                    ompi_status_public_t ** statuses);
  457 
  458 
  459 
  460 
  461 
  462 
  463 
      /* Single-request counterpart of coord_request_wait_all. */
  464 static int coord_request_wait( ompi_request_t * request,
  465                                ompi_status_public_t * status);
  466 
  467 
  468 
  469 
  470 static int wait_quiesce_drain_ack(void);
 471 
 472 
 473 
 474 
 475 
 476 
 477 
 478 
 479 
 480 
 481 
 482 
 483 
 484 
 485 
 486 
 487 
 488 
 489 
 490 
 491 
 492 
 493 
 494 
 495 
 496 
 497 
 498 
 499 
 500 
 501 
 502 
 503 
 504 
 505 
 506 
      /*
       * Timer slot indices.  CRCP_TIMER_MAX is the number of slots and sizes
       * the timer_start/timer_end/timer_label arrays below.
       */
  507 #define CRCP_TIMER_TOTAL_CKPT          0
  508 #define CRCP_TIMER_CKPT_EX_B           1
  509 #define CRCP_TIMER_CKPT_EX_PEER_S      2
  510 #define CRCP_TIMER_CKPT_EX_PEER_R      3
  511 #define CRCP_TIMER_CKPT_EX_WAIT        4
  512 #define CRCP_TIMER_CKPT_CHECK_B        5
  513 #define CRCP_TIMER_CKPT_CHECK_PEER_S   6
  514 #define CRCP_TIMER_CKPT_CHECK_PEER_R   7
  515 #define CRCP_TIMER_CKPT_POST_DRAIN     8
  516 #define CRCP_TIMER_CKPT_WAIT_QUI       9
  517 #define CRCP_TIMER_TOTAL_CONT         10
  518 #define CRCP_TIMER_TOTAL_RST          11
  519 #define CRCP_TIMER_MAX                12
  520 
      /* Prototypes for the timer helpers (defined later in this file). */
  521 static double get_time(void);
  522 static void start_time(int idx);
  523 static void end_time(int idx);
  524 static void display_indv_timer(int idx, int proc, int msgs);
  525 static void display_indv_timer_core(int idx, int proc, int msgs, bool direct);
  526 static void display_all_timers(int state);
  527 static void clear_timers(void);
  528 
      /* Per-slot storage, one entry per CRCP_TIMER_* index. */
  529 double timer_start[CRCP_TIMER_MAX];
  530 double timer_end[CRCP_TIMER_MAX];
  531 char * timer_label[CRCP_TIMER_MAX];
  532 
      /*
       * Wrappers that call the matching timer helper only when timing_enabled
       * (declared outside this chunk) is greater than zero.  Each expands to a
       * brace-enclosed block rather than do { } while(0), so a call followed
       * by ';' yields an extra empty statement; take care around if/else.
       */
  533 #define START_TIMER(idx)                    \
  534   {                                         \
  535     if(OPAL_UNLIKELY(timing_enabled > 0)) { \
  536       start_time(idx);                      \
  537     }                                       \
  538   }
  539 
  540 #define END_TIMER(idx)                      \
  541   {                                         \
  542     if(OPAL_UNLIKELY(timing_enabled > 0)) { \
  543       end_time(idx);                        \
  544     }                                       \
  545   }
  546 
  547 #define DISPLAY_INDV_TIMER(idx, proc, msg)  \
  548   {                                         \
  549     if(OPAL_UNLIKELY(timing_enabled > 0)) { \
  550       display_indv_timer(idx, proc, msg);   \
  551     }                                       \
  552   }
  553 
  554 #define DISPLAY_ALL_TIMERS(var)             \
  555   {                                         \
  556     if(OPAL_UNLIKELY(timing_enabled > 0)) { \
  557       display_all_timers(var);              \
  558     }                                       \
  559   }
 560 
 561 
 562 
 563 
 564 #if OPAL_ENABLE_DEBUG
 565 static void traffic_message_dump_peer(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref, char * msg, bool root_only);
 566 static void traffic_message_dump_msg_list(opal_list_t *msg_list, bool is_drain);
 567 static void traffic_message_dump_msg_indv(ompi_crcp_bkmrk_pml_traffic_message_ref_t * msg_ref, char * msg, bool vshort);
 568 static void traffic_message_dump_msg_content_indv(ompi_crcp_bkmrk_pml_message_content_ref_t * content_ref);
 569 
 570 static void traffic_message_dump_drain_msg_indv(ompi_crcp_bkmrk_pml_drain_message_ref_t * msg_ref, char * msg, bool vshort);
 571 
 572 #define TRAFFIC_MSG_DUMP_PEER(lv, a) {                 \
 573    if( lv <= mca_crcp_bkmrk_component.super.verbose ) { \
 574        traffic_message_dump_peer a;                    \
 575    }                                                   \
 576 }
 577 #define TRAFFIC_MSG_DUMP_MSG_LIST(lv, a) {             \
 578    if( lv <= mca_crcp_bkmrk_component.super.verbose ) { \
 579        traffic_message_dump_msg_list a;                \
 580    }                                                   \
 581 }
 582 #define TRAFFIC_MSG_DUMP_MSG_INDV(lv, a) {             \
 583    if( lv <= mca_crcp_bkmrk_component.super.verbose ) { \
 584        traffic_message_dump_msg_indv a;                \
 585    }                                                   \
 586 }
 587 #define TRAFFIC_MSG_DUMP_MSG_CONTENT_INDV(lv, a) {     \
 588    if( lv <= mca_crcp_bkmrk_component.super.verbose ) { \
 589        traffic_message_dump_msg_content_indv a;        \
 590    }                                                   \
 591 }
 592 #define TRAFFIC_MSG_DUMP_DRAIN_MSG_INDV(lv, a) {       \
 593    if( lv <= mca_crcp_bkmrk_component.super.verbose ) { \
 594        traffic_message_dump_drain_msg_indv a;          \
 595    }                                                   \
 596 }
 597 #else
 598 #define TRAFFIC_MSG_DUMP_PEER(lv, a)             ;
 599 #define TRAFFIC_MSG_DUMP_MSG_LIST(lv, a)         ;
 600 #define TRAFFIC_MSG_DUMP_MSG_INDV(lv, a)         ;
 601 #define TRAFFIC_MSG_DUMP_MSG_CONTENT_INDV(lv, a) ;
 602 #define TRAFFIC_MSG_DUMP_DRAIN_MSG_INDV(lv, a)   ;
 603 #endif
 604 
      /*
       * Emit a "This should never happen" diagnostic through opal_output(0, ...),
       * suffixed with the source file and line of the call site.
       * `msg` is pasted in front of another string literal, so it must itself
       * be a string literal.
       */
  605 #define ERROR_SHOULD_NEVER_HAPPEN(msg) {                                 \
  606   opal_output(0, msg                                                     \
  607               " ---------- This should never happen ---------- (%s:%d)", \
  608               __FILE__, __LINE__);                                       \
  609 }
  610 
      /*
       * Same as ERROR_SHOULD_NEVER_HAPPEN, but `msg` is expected to contain one
       * conversion specifier that consumes `arg`; `arg` is passed before the
       * file/line pair.
       */
  611 #define ERROR_SHOULD_NEVER_HAPPEN_ARG(msg, arg) {                        \
  612   opal_output(0, msg                                                     \
  613               " ---------- This should never happen ---------- (%s:%d)", \
  614               arg, __FILE__, __LINE__);                                  \
  615 }
 616 
 617 
 618 
 619 
 620 
 621 
 622 
 623 #define HOKE_PEER_REF_ALLOC(peer_ref)                 \
 624 do {                                                  \
 625     peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t *)     \
 626       opal_free_list_wait (&peer_ref_free_list);      \
 627 } while(0)
 628 
 629 #define HOKE_PEER_REF_RETURN(peer_ref)         \
 630 do {                                           \
 631    opal_free_list_return (&peer_ref_free_list, \
 632    (opal_free_list_item_t*)peer_ref);          \
 633 } while(0)
 634 
 635 
 636 #define HOKE_CONTENT_REF_ALLOC(content_ref)                       \
 637 do {                                                              \
 638     content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)    \
 639       opal_free_list_wait (&content_ref_free_list);               \
 640     content_ref->msg_id = content_ref_seq_num;                    \
 641     content_ref_seq_num++;                                        \
 642 } while(0)
 643 
 644 #define HOKE_CONTENT_REF_RETURN(content_ref)      \
 645 do {                                              \
 646    opal_free_list_return (&content_ref_free_list, \
 647    (opal_free_list_item_t*)content_ref);          \
 648 } while(0)
 649 
 650 
 651 #define HOKE_TRAFFIC_MSG_REF_ALLOC(msg_ref)                   \
 652 do {                                                          \
 653     msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)    \
 654       opal_free_list_wait (&traffic_msg_ref_free_list);       \
 655 } while(0)
 656 
 657 #define HOKE_TRAFFIC_MSG_REF_RETURN(msg_ref)          \
 658 do {                                                  \
 659    opal_free_list_return (&traffic_msg_ref_free_list, \
 660    (opal_free_list_item_t*)msg_ref);                  \
 661 } while(0)
 662 
 663 
 664 #define HOKE_DRAIN_MSG_REF_ALLOC(msg_ref)                      \
 665 do {                                                           \
 666     msg_ref = (ompi_crcp_bkmrk_pml_drain_message_ref_t *)      \
 667         opal_free_list_wait (&drain_msg_ref_free_list);        \
 668 } while(0)
 669 
 670 #define HOKE_DRAIN_MSG_REF_RETURN(msg_ref)                   \
 671 do {                                                         \
 672     opal_free_list_return (&drain_msg_ref_free_list,         \
 673                            (opal_free_list_item_t*)msg_ref); \
 674 } while(0)
 675 
 676 
 677 #define HOKE_DRAIN_ACK_MSG_REF_ALLOC(msg_ref)                   \
 678 do {                                                            \
 679     msg_ref = (ompi_crcp_bkmrk_pml_drain_message_ack_ref_t *)   \
 680         opal_free_list_wait (&drain_ack_msg_ref_free_list);     \
 681 } while(0)
 682 
 683 #define HOKE_DRAIN_ACK_MSG_REF_RETURN(msg_ref)                  \
 684 do {                                                            \
 685     opal_free_list_return (&drain_ack_msg_ref_free_list,        \
 686                            (opal_free_list_item_t*)msg_ref);    \
 687 } while(0)
 688 
 689 
 690 
 691 
 692 
      /*
       * OPAL class definition for ompi_crcp_bkmrk_pml_peer_ref_t: parent class
       * opal_list_item_t, with the constructor and destructor named here.
       */
  693 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_peer_ref_t,
  694                    opal_list_item_t,
  695                    ompi_crcp_bkmrk_pml_peer_ref_construct,
  696                    ompi_crcp_bkmrk_pml_peer_ref_destruct);
 697 
 698 void ompi_crcp_bkmrk_pml_peer_ref_construct(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref) {
 699     peer_ref->proc_name.jobid  = ORTE_JOBID_INVALID;
 700     peer_ref->proc_name.vpid   = ORTE_VPID_INVALID;
 701 
 702     OBJ_CONSTRUCT(&peer_ref->send_list,       opal_list_t);
 703     OBJ_CONSTRUCT(&peer_ref->isend_list,      opal_list_t);
 704     OBJ_CONSTRUCT(&peer_ref->send_init_list,  opal_list_t);
 705 
 706     OBJ_CONSTRUCT(&peer_ref->recv_list,       opal_list_t);
 707     OBJ_CONSTRUCT(&peer_ref->irecv_list,      opal_list_t);
 708     OBJ_CONSTRUCT(&peer_ref->recv_init_list,  opal_list_t);
 709 
 710     OBJ_CONSTRUCT(&peer_ref->drained_list,    opal_list_t);
 711 
 712     peer_ref->total_msgs_sent    = 0;
 713     peer_ref->matched_msgs_sent  = 0;
 714 
 715     peer_ref->total_msgs_recvd   = 0;
 716     peer_ref->matched_msgs_recvd = 0;
 717 
 718     peer_ref->total_drained_msgs = 0;
 719 
 720     peer_ref->ack_required = false;
 721 }
 722 
 723 void ompi_crcp_bkmrk_pml_peer_ref_destruct( ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref) {
 724     opal_list_item_t* item = NULL;
 725 
 726     peer_ref->proc_name.jobid  = ORTE_JOBID_INVALID;
 727     peer_ref->proc_name.vpid   = ORTE_VPID_INVALID;
 728 
 729     while( NULL != (item = opal_list_remove_first(&peer_ref->send_list)) ) {
 730         HOKE_TRAFFIC_MSG_REF_RETURN(item);
 731     }
 732     OBJ_DESTRUCT(&peer_ref->send_list);
 733     while( NULL != (item = opal_list_remove_first(&peer_ref->isend_list)) ) {
 734         HOKE_TRAFFIC_MSG_REF_RETURN(item);
 735     }
 736     OBJ_DESTRUCT(&peer_ref->isend_list);
 737     while( NULL != (item = opal_list_remove_first(&peer_ref->send_init_list)) ) {
 738         HOKE_TRAFFIC_MSG_REF_RETURN(item);
 739     }
 740     OBJ_DESTRUCT(&peer_ref->send_init_list);
 741 
 742     while( NULL != (item = opal_list_remove_first(&peer_ref->recv_list)) ) {
 743         HOKE_TRAFFIC_MSG_REF_RETURN(item);
 744     }
 745     OBJ_DESTRUCT(&peer_ref->recv_list);
 746     while( NULL != (item = opal_list_remove_first(&peer_ref->irecv_list)) ) {
 747         HOKE_TRAFFIC_MSG_REF_RETURN(item);
 748     }
 749     OBJ_DESTRUCT(&peer_ref->irecv_list);
 750     while( NULL != (item = opal_list_remove_first(&peer_ref->recv_init_list)) ) {
 751         HOKE_TRAFFIC_MSG_REF_RETURN(item);
 752     }
 753     OBJ_DESTRUCT(&peer_ref->recv_init_list);
 754 
 755     while( NULL != (item = opal_list_remove_first(&peer_ref->drained_list)) ) {
 756         HOKE_DRAIN_MSG_REF_RETURN(item);
 757     }
 758     OBJ_DESTRUCT(&peer_ref->drained_list);
 759 
 760     peer_ref->total_msgs_sent    = 0;
 761     peer_ref->matched_msgs_sent  = 0;
 762 
 763     peer_ref->total_msgs_recvd   = 0;
 764     peer_ref->matched_msgs_recvd = 0;
 765 
 766     peer_ref->total_drained_msgs = 0;
 767 
 768     peer_ref->ack_required = false;
 769 }
 770 
 771 
 772 
 773 
      /*
       * OPAL class definition for ompi_crcp_bkmrk_pml_message_content_ref_t:
       * parent class opal_list_item_t, with the constructor and destructor
       * named here.
       */
  774 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_message_content_ref_t,
  775                    opal_list_item_t,
  776                    ompi_crcp_bkmrk_pml_message_content_ref_construct,
  777                    ompi_crcp_bkmrk_pml_message_content_ref_destruct);
 778 
 779 void ompi_crcp_bkmrk_pml_message_content_ref_construct(ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref)
 780 {
 781     content_ref->buffer  = NULL;
 782     content_ref->request = NULL;
 783     content_ref->active  = false;
 784 
 785     content_ref->done    = false;
 786     content_ref->active  = false;
 787     content_ref->already_posted  = false;
 788     content_ref->already_drained = false;
 789 
 790     content_ref->msg_id = 0;
 791 }
 792 
 793 void ompi_crcp_bkmrk_pml_message_content_ref_destruct( ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref)
 794 {
 795     if( NULL != content_ref->buffer ) {
 796         free(content_ref->buffer);
 797     }
 798     content_ref->buffer  = NULL;
 799 
 800     if( NULL != content_ref->request ) {
 801         OBJ_RELEASE(content_ref->request);
 802     }
 803     content_ref->request = NULL;
 804 
 805     content_ref->active = false;
 806 
 807     content_ref->done    = false;
 808     content_ref->active  = false;
 809     content_ref->already_posted  = false;
 810     content_ref->already_drained = false;
 811 
 812     content_ref->msg_id = 0;
 813 }
 814 
 815 
 816 
 817 
      /*
       * OPAL class definition for ompi_crcp_bkmrk_pml_traffic_message_ref_t:
       * parent class opal_list_item_t, with the constructor and destructor
       * named here.
       */
  818 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_traffic_message_ref_t,
  819                    opal_list_item_t,
  820                    ompi_crcp_bkmrk_pml_traffic_message_ref_construct,
  821                    ompi_crcp_bkmrk_pml_traffic_message_ref_destruct);
 822 
 823 void ompi_crcp_bkmrk_pml_traffic_message_ref_construct(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref) {
 824     msg_ref->msg_id     = 0;
 825     msg_ref->msg_type   = COORD_MSG_TYPE_UNKNOWN;
 826 
 827     msg_ref->count      = 0;
 828     msg_ref->ddt_size   = 0;
 829     msg_ref->tag        = 0;
 830     msg_ref->rank       = 0;
 831     msg_ref->comm       = NULL;
 832 
 833     OBJ_CONSTRUCT(&msg_ref->msg_contents, opal_list_t);
 834 
 835     msg_ref->proc_name.jobid  = ORTE_JOBID_INVALID;
 836     msg_ref->proc_name.vpid   = ORTE_VPID_INVALID;
 837 
 838     msg_ref->matched        = INVALID_INT;
 839     msg_ref->done           = INVALID_INT;
 840     msg_ref->active         = INVALID_INT;
 841     msg_ref->posted         = INVALID_INT;
 842     msg_ref->active_drain   = INVALID_INT;
 843 }
 844 
 845 void ompi_crcp_bkmrk_pml_traffic_message_ref_destruct( ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref) {
 846     opal_list_item_t* item = NULL;
 847 
 848     msg_ref->msg_id     = 0;
 849     msg_ref->msg_type   = COORD_MSG_TYPE_UNKNOWN;
 850 
 851     msg_ref->count      = 0;
 852     msg_ref->ddt_size   = 0;
 853     msg_ref->tag        = 0;
 854     msg_ref->rank       = 0;
 855     msg_ref->comm       = NULL;
 856 
 857     while( NULL != (item = opal_list_remove_first(&(msg_ref->msg_contents)) ) ) {
 858         HOKE_CONTENT_REF_RETURN(item);
 859     }
 860     OBJ_DESTRUCT(&(msg_ref->msg_contents));
 861 
 862     msg_ref->proc_name.jobid  = ORTE_JOBID_INVALID;
 863     msg_ref->proc_name.vpid   = ORTE_VPID_INVALID;
 864 
 865     msg_ref->matched        = INVALID_INT;
 866     msg_ref->done           = INVALID_INT;
 867     msg_ref->active         = INVALID_INT;
 868     msg_ref->posted         = INVALID_INT;
 869     msg_ref->active_drain   = INVALID_INT;
 870 }
 871 
 872 
 873 
 874 
 875 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_drain_message_ref_t,
 876                    opal_list_item_t,
 877                    ompi_crcp_bkmrk_pml_drain_message_ref_construct,
 878                    ompi_crcp_bkmrk_pml_drain_message_ref_destruct);
 879 
 880 void ompi_crcp_bkmrk_pml_drain_message_ref_construct(ompi_crcp_bkmrk_pml_drain_message_ref_t *msg_ref) {
 881     msg_ref->msg_id     = 0;
 882     msg_ref->msg_type   = COORD_MSG_TYPE_UNKNOWN;
 883 
 884     msg_ref->count      = 0;
 885 
 886     msg_ref->datatype   = NULL;
 887     msg_ref->ddt_size   = 0;
 888 
 889     msg_ref->tag        = 0;
 890     msg_ref->rank       = 0;
 891     msg_ref->comm       = NULL;
 892 
 893     OBJ_CONSTRUCT(&msg_ref->msg_contents, opal_list_t);
 894 
 895     msg_ref->proc_name.jobid  = ORTE_JOBID_INVALID;
 896     msg_ref->proc_name.vpid   = ORTE_VPID_INVALID;
 897 
 898     msg_ref->done           = INVALID_INT;
 899     msg_ref->active         = INVALID_INT;
 900     msg_ref->already_posted = INVALID_INT;
 901 }
 902 
 903 void ompi_crcp_bkmrk_pml_drain_message_ref_destruct( ompi_crcp_bkmrk_pml_drain_message_ref_t *msg_ref) {
 904     opal_list_item_t* item = NULL;
 905 
 906     msg_ref->msg_id     = 0;
 907     msg_ref->msg_type   = COORD_MSG_TYPE_UNKNOWN;
 908 
 909     msg_ref->count      = 0;
 910 
 911     if( NULL != msg_ref->datatype ) {
 912         OBJ_RELEASE(msg_ref->datatype);
 913         msg_ref->datatype   = NULL;
 914     }
 915     msg_ref->ddt_size   = 0;
 916 
 917     msg_ref->tag        = 0;
 918     msg_ref->rank       = 0;
 919     msg_ref->comm       = NULL;
 920 
 921     while( NULL != (item = opal_list_remove_first(&(msg_ref->msg_contents)) ) ) {
 922         HOKE_CONTENT_REF_RETURN(item);
 923     }
 924     OBJ_DESTRUCT(&(msg_ref->msg_contents));
 925 
 926     msg_ref->proc_name.jobid  = ORTE_JOBID_INVALID;
 927     msg_ref->proc_name.vpid   = ORTE_VPID_INVALID;
 928 
 929     msg_ref->done           = INVALID_INT;
 930     msg_ref->active         = INVALID_INT;
 931     msg_ref->already_posted = INVALID_INT;
 932 }
 933 
 934 
 935 
 936 
 937 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_drain_message_ack_ref_t,
 938                    opal_list_item_t,
 939                    ompi_crcp_bkmrk_pml_drain_message_ack_ref_construct,
 940                    ompi_crcp_bkmrk_pml_drain_message_ack_ref_destruct);
 941 
 942 void ompi_crcp_bkmrk_pml_drain_message_ack_ref_construct(ompi_crcp_bkmrk_pml_drain_message_ack_ref_t *msg_ack_ref) {
 943     msg_ack_ref->complete    = false;
 944 
 945     msg_ack_ref->peer.jobid  = ORTE_JOBID_INVALID;
 946     msg_ack_ref->peer.vpid   = ORTE_VPID_INVALID;
 947 }
 948 
 949 void ompi_crcp_bkmrk_pml_drain_message_ack_ref_destruct( ompi_crcp_bkmrk_pml_drain_message_ack_ref_t *msg_ack_ref) {
 950     msg_ack_ref->complete   = false;
 951 
 952     msg_ack_ref->peer.jobid  = ORTE_JOBID_INVALID;
 953     msg_ack_ref->peer.vpid   = ORTE_VPID_INVALID;
 954 }
 955 
 956 
 957 
 958 
 959 
 960 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_state_t,
 961                    ompi_crcp_base_pml_state_t,
 962                    NULL,
 963                    NULL
 964                    );
 965 
 966 
 967 
 968 
 969 #define CRCP_COORD_STATE_ALLOC(state_ref)                    \
 970 do {                                                         \
 971     state_ref = (ompi_crcp_bkmrk_pml_state_t *)              \
 972         opal_free_list_wait (&coord_state_free_list);        \
 973 } while(0)
 974 
 975 #define CRCP_COORD_STATE_RETURN(state_ref)                      \
 976 do {                                                            \
 977     opal_free_list_return (&coord_state_free_list,              \
 978                            (opal_free_list_item_t *)state_ref); \
 979 } while(0)
 980 
 981 #define CREATE_COORD_STATE(coord_state, pml_state, v_peer_ref, v_msg_ref)         \
 982  {                                                                                \
 983    CRCP_COORD_STATE_ALLOC(coord_state);                                           \
 984                                                                                   \
 985    coord_state->prev_ptr           = pml_state;                                   \
 986    coord_state->p_super.super      = pml_state->super;                            \
 987    coord_state->p_super.state      = pml_state->state;                            \
 988    coord_state->p_super.error_code = pml_state->error_code;                       \
 989    coord_state->p_super.wrapped_pml_component = pml_state->wrapped_pml_component; \
 990    coord_state->p_super.wrapped_pml_module    = pml_state->wrapped_pml_module;    \
 991                                                                                   \
 992    coord_state->peer_ref         = v_peer_ref;                                    \
 993    coord_state->msg_ref          = v_msg_ref;                                     \
 994  }
 995 
 996 #define EXTRACT_COORD_STATE(pml_state, v_coord_state, v_rtn_state, v_peer_ref, v_msg_ref) \
 997  {                                                           \
 998    v_coord_state = (ompi_crcp_bkmrk_pml_state_t*)pml_state;   \
 999    v_rtn_state   = v_coord_state->prev_ptr;                  \
1000    v_peer_ref    = v_coord_state->peer_ref;                  \
1001    v_msg_ref     = v_coord_state->msg_ref;                   \
1002  }
1003 
1004 
/*
 * Allocate a traffic message reference into msg_ref, give it the next value
 * of message_seq_num as its id (the counter is advanced), fill in the
 * signature from the arguments and zero all of its counters.
 */
#define CREATE_NEW_MSG(msg_ref, v_type, v_count, v_ddt_size, v_tag, v_rank, v_comm, p_jobid, p_vpid) \
 {                                                               \
   HOKE_TRAFFIC_MSG_REF_ALLOC(msg_ref);                          \
                                                                 \
   msg_ref->msg_id   = message_seq_num++;                        \
   msg_ref->msg_type = v_type;                                   \
                                                                 \
   msg_ref->count    = v_count;                                  \
   msg_ref->ddt_size = v_ddt_size;                               \
   msg_ref->tag      = v_tag;                                    \
   msg_ref->rank     = v_rank;                                   \
   msg_ref->comm     = v_comm;                                   \
                                                                 \
   msg_ref->proc_name.jobid = p_jobid;                           \
   msg_ref->proc_name.vpid  = p_vpid;                            \
                                                                 \
   msg_ref->matched      = 0;                                    \
   msg_ref->done         = 0;                                    \
   msg_ref->active       = 0;                                    \
   msg_ref->posted       = 0;                                    \
   msg_ref->active_drain = 0;                                    \
 }
1031 
/*
 * Allocate a drain message reference into msg_ref, give it the next value of
 * message_seq_num as its id (the counter is advanced) and fill in the
 * signature from the arguments. The datatype pointer starts out NULL.
 *
 * The datatype size is taken from the v_ddt_size argument; it must not be
 * read from an identifier named ddt_size in the caller's scope.
 */
#define CREATE_NEW_DRAIN_MSG(msg_ref, v_type, v_count, v_ddt_size, v_tag, v_rank, v_comm, p_jobid, p_vpid) \
 {                                                               \
   HOKE_DRAIN_MSG_REF_ALLOC(msg_ref);                            \
                                                                 \
   msg_ref->msg_id   = message_seq_num;                          \
   message_seq_num++;                                            \
                                                                 \
   msg_ref->msg_type = v_type;                                   \
                                                                 \
   msg_ref->count    = v_count;                                  \
                                                                 \
   msg_ref->datatype = NULL;                                     \
   msg_ref->ddt_size = v_ddt_size;                               \
                                                                 \
   msg_ref->tag     = v_tag;                                     \
   msg_ref->rank    = v_rank;                                    \
   msg_ref->comm    = v_comm;                                    \
                                                                 \
   msg_ref->proc_name.jobid  = p_jobid;                          \
   msg_ref->proc_name.vpid   = p_vpid;                           \
 }
1053 
1054 
/*
 * Pack `count` items of `type` starting at `var` into `buffer`.
 * On failure the message and return code are logged, the code is stored in
 * exit_status and control jumps to the cleanup label. The enclosing function
 * must provide `ret`, `exit_status` and a `cleanup:` label.
 */
#define PACK_BUFFER(buffer, var, count, type, error_msg)                       \
 {                                                                             \
    ret = opal_dss.pack(buffer, &(var), count, type);                          \
    if (OMPI_SUCCESS != ret) {                                                 \
        opal_output(mca_crcp_bkmrk_component.super.output_handle,              \
                    "%s (Return %d)", error_msg, ret);                         \
        exit_status = ret;                                                     \
        goto cleanup;                                                          \
    }                                                                          \
 }
1064 
/*
 * Unpack up to `count` items of `type` from `buffer` into `var`.
 * On failure the message and return code are logged, the code is stored in
 * exit_status and control jumps to the cleanup label. The enclosing function
 * must provide `ret`, `exit_status` and a `cleanup:` label.
 * The macro declares a block-local int32_t named n for the item count.
 */
#define UNPACK_BUFFER(buffer, var, count, type, error_msg)                     \
 {                                                                             \
    int32_t n = count;                                                         \
    ret = opal_dss.unpack(buffer, &(var), &n, type);                           \
    if (OPAL_SUCCESS != ret) {                                                 \
        opal_output(mca_crcp_bkmrk_component.super.output_handle,              \
                    "%s (Return %d)", error_msg, ret);                         \
        exit_status = ret;                                                     \
        goto cleanup;                                                          \
    }                                                                          \
 }
1075 
1076 
1077 
1078 
1079 int ompi_crcp_bkmrk_pml_init(void) {
1080     message_seq_num = 1;
1081     current_msg_id  = 0;
1082     current_msg_type = COORD_MSG_TYPE_UNKNOWN;
1083     stall_for_completion = false;
1084     ft_event_state = OPAL_CRS_RUNNING;
1085 
1086     OBJ_CONSTRUCT(&ompi_crcp_bkmrk_pml_peer_refs, opal_list_t);
1087 
1088     OBJ_CONSTRUCT(&unknown_recv_from_list, opal_list_t);
1089     OBJ_CONSTRUCT(&unknown_persist_recv_list, opal_list_t);
1090 
1091     OBJ_CONSTRUCT(&drained_msg_ack_list, opal_list_t);
1092 
1093     
1094 
1095 
1096 
1097 
1098 
1099 
1100 
1101     OBJ_CONSTRUCT(&coord_state_free_list, opal_free_list_t);
1102     opal_free_list_init (&coord_state_free_list,
1103                          sizeof(ompi_crcp_bkmrk_pml_state_t),
1104                          opal_cache_line_size,
1105                          OBJ_CLASS(ompi_crcp_bkmrk_pml_state_t),
1106                          0,opal_cache_line_size,
1107                          4,  
1108                          -1, 
1109                          4,  
1110                          NULL, 0, NULL, NULL, NULL);
1111 
1112     OBJ_CONSTRUCT(&content_ref_free_list, opal_free_list_t);
1113     opal_free_list_init (&content_ref_free_list,
1114                          sizeof(ompi_crcp_bkmrk_pml_message_content_ref_t),
1115                          opal_cache_line_size,
1116                          OBJ_CLASS(ompi_crcp_bkmrk_pml_message_content_ref_t),
1117                          0,opal_cache_line_size,
1118                          80, 
1119                          -1, 
1120                          32, 
1121                          NULL, 0, NULL, NULL, NULL);
1122 
1123     OBJ_CONSTRUCT(&peer_ref_free_list, opal_free_list_t);
1124     opal_free_list_init (&peer_ref_free_list,
1125                          sizeof(ompi_crcp_bkmrk_pml_peer_ref_t),
1126                          opal_cache_line_size,
1127                          OBJ_CLASS(ompi_crcp_bkmrk_pml_peer_ref_t),
1128                          0,opal_cache_line_size,
1129                          16, 
1130                          -1, 
1131                          16, 
1132                          NULL, 0, NULL, NULL, NULL);
1133 
1134     OBJ_CONSTRUCT(&traffic_msg_ref_free_list, opal_free_list_t);
1135     opal_free_list_init (&traffic_msg_ref_free_list,
1136                          sizeof(ompi_crcp_bkmrk_pml_traffic_message_ref_t),
1137                          opal_cache_line_size,
1138                          OBJ_CLASS(ompi_crcp_bkmrk_pml_traffic_message_ref_t),
1139                          0,opal_cache_line_size,
1140                          32, 
1141                          -1, 
1142                          64, 
1143                          NULL, 0, NULL, NULL, NULL);
1144 
1145     OBJ_CONSTRUCT(&drain_msg_ref_free_list, opal_free_list_t);
1146     opal_free_list_init (&drain_msg_ref_free_list,
1147                          sizeof(ompi_crcp_bkmrk_pml_drain_message_ref_t),
1148                          opal_cache_line_size,
1149                          OBJ_CLASS(ompi_crcp_bkmrk_pml_drain_message_ref_t),
1150                          0,opal_cache_line_size,
1151                          32, 
1152                          -1, 
1153                          64, 
1154                          NULL, 0, NULL, NULL, NULL);
1155 
1156     OBJ_CONSTRUCT(&drain_ack_msg_ref_free_list, opal_free_list_t);
1157     opal_free_list_init (&drain_ack_msg_ref_free_list,
1158                          sizeof(ompi_crcp_bkmrk_pml_drain_message_ack_ref_t),
1159                          opal_cache_line_size,
1160                          OBJ_CLASS(ompi_crcp_bkmrk_pml_drain_message_ack_ref_t),
1161                          0,opal_cache_line_size,
1162                          16, 
1163                          -1, 
1164                          16, 
1165                          NULL, 0, NULL, NULL, NULL);
1166 
1167     clear_timers();
1168 
1169     if( timing_enabled > 0 ) {
1170         timer_label[CRCP_TIMER_TOTAL_CKPT]        = strdup("Total Ckpt.");
1171         timer_label[CRCP_TIMER_CKPT_EX_B]         = strdup("Exchange Bookmarks");
1172         timer_label[CRCP_TIMER_CKPT_EX_PEER_S]    = strdup("  Ex.Bk. Send Peer");
1173         timer_label[CRCP_TIMER_CKPT_EX_PEER_R]    = strdup("  Ex.Bk. Recv Peer");
1174         timer_label[CRCP_TIMER_CKPT_EX_WAIT]      = strdup("  Ex.Bk. Wait");
1175 
1176         timer_label[CRCP_TIMER_CKPT_CHECK_B]      = strdup("Check Bookmarks");
1177         timer_label[CRCP_TIMER_CKPT_CHECK_PEER_S] = strdup("  Ck.Bk. Send Peer");
1178         timer_label[CRCP_TIMER_CKPT_CHECK_PEER_R] = strdup("  Ck.Bk. Recv Peer");
1179 
1180         timer_label[CRCP_TIMER_CKPT_POST_DRAIN]   = strdup("Post Drain Msgs.");
1181         timer_label[CRCP_TIMER_CKPT_WAIT_QUI]     = strdup("Wait for Quiescence");
1182 
1183         timer_label[CRCP_TIMER_TOTAL_CONT]        = strdup("Total Continue");
1184 
1185         timer_label[CRCP_TIMER_TOTAL_RST]         = strdup("Total Restart");
1186     }
1187 
1188     return OMPI_SUCCESS;
1189 }
1190 
1191 int ompi_crcp_bkmrk_pml_finalize(void) {
1192     int i;
1193 
1194     current_msg_id = 0;
1195     current_msg_type = COORD_MSG_TYPE_UNKNOWN;
1196     stall_for_completion = false;
1197     ft_event_state = OPAL_CRS_RUNNING;
1198 
1199     OBJ_DESTRUCT(&ompi_crcp_bkmrk_pml_peer_refs);
1200 
1201     OBJ_DESTRUCT(&unknown_recv_from_list);
1202     OBJ_DESTRUCT(&unknown_persist_recv_list);
1203 
1204     OBJ_DESTRUCT(&drained_msg_ack_list);
1205 
1206     
1207     OBJ_DESTRUCT(&peer_ref_free_list);
1208     OBJ_DESTRUCT(&traffic_msg_ref_free_list);
1209     OBJ_DESTRUCT(&drain_msg_ref_free_list);
1210     OBJ_DESTRUCT(&drain_ack_msg_ref_free_list);
1211     OBJ_DESTRUCT(&content_ref_free_list);
1212 
1213     if( timing_enabled > 0 ) {
1214         for(i = 0; i < CRCP_TIMER_MAX; ++i) {
1215             free(timer_label[i]);
1216             timer_label[i] = NULL;
1217         }
1218     }
1219 
1220     return OMPI_SUCCESS;
1221 }
1222 
1223 
1224 
1225 
1226 
1227 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_enable(
1228                                   bool enable,
1229                                   ompi_crcp_base_pml_state_t* pml_state )
1230 {
1231     
1232     OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1233                         "crcp:bkmrk: pml_enable()"));
1234 
1235     pml_state->error_code = OMPI_SUCCESS;
1236     return pml_state;
1237 }
1238 
1239 
1240 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_progress(
1241                                   ompi_crcp_base_pml_state_t* pml_state)
1242 {
1243     
1244 
1245     OPAL_OUTPUT_VERBOSE((35, mca_crcp_bkmrk_component.super.output_handle,
1246                         "crcp:bkmrk: pml_progress()"));
1247 
1248     pml_state->error_code = OMPI_SUCCESS;
1249     return pml_state;
1250 }
1251 
1252 
1253 
1254 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_iprobe(
1255                                   int dst, int tag,
1256                                   struct ompi_communicator_t* comm,
1257                                   int *matched,
1258                                   ompi_status_public_t* status,
1259                                   ompi_crcp_base_pml_state_t* pml_state )
1260 {
1261     ompi_crcp_bkmrk_pml_drain_message_ref_t   *drain_msg_ref = NULL;
1262     ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
1263     int exit_status = OMPI_SUCCESS;
1264     int ret;
1265 
1266     OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1267                         "crcp:bkmrk: pml_iprobe(%d, %d)", dst, tag));
1268 
1269     
1270 
1271 
1272 
1273 
1274     if( OMPI_CRCP_PML_PRE == pml_state->state) {
1275         
1276 
1277 
1278         if( OMPI_SUCCESS != (ret = drain_message_find_any(PROBE_ANY_COUNT, tag, dst,
1279                                                           comm, PROBE_ANY_SIZE,
1280                                                           &drain_msg_ref,
1281                                                           &content_ref,
1282                                                           NULL) ) ) {
1283             ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_iprobe(): Failed trying to find a drained message.");
1284             exit_status = ret;
1285             goto DONE;
1286         }
1287 
1288         
1289 
1290 
1291 
1292 
1293         if( NULL != drain_msg_ref ) {
1294             OPAL_OUTPUT_VERBOSE((12, mca_crcp_bkmrk_component.super.output_handle,
1295                                  "crcp:bkmrk: pml_iprobe(): Matched a drained message..."));
1296 
1297             
1298             if( MPI_STATUS_IGNORE != status ) {
1299                 memcpy(status, &content_ref->status, sizeof(ompi_status_public_t));
1300             }
1301 
1302             
1303             *matched = 1;
1304 
1305             
1306             pml_state->state = OMPI_CRCP_PML_DONE;
1307             pml_state->error_code = OMPI_SUCCESS;
1308             return pml_state;
1309         }
1310         
1311 
1312 
1313         else {
1314             
1315             *matched = 0;
1316         }
1317     }
1318 
1319  DONE:
1320     pml_state->error_code = exit_status;
1321     return pml_state;
1322 }
1323 
1324 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_probe(
1325                                   int dst, int tag,
1326                                   struct ompi_communicator_t* comm,
1327                                   ompi_status_public_t* status,
1328                                   ompi_crcp_base_pml_state_t* pml_state )
1329 {
1330     ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref = NULL;
1331     ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
1332     int exit_status = OMPI_SUCCESS;
1333     int ret;
1334 
1335     OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1336                         "crcp:bkmrk: pml_probe(%d, %d)", dst, tag));
1337 
1338     
1339 
1340 
1341 
1342 
1343     if( OMPI_CRCP_PML_PRE == pml_state->state) {
1344         
1345 
1346 
1347         if( OMPI_SUCCESS != (ret = drain_message_find_any(PROBE_ANY_COUNT, tag, dst,
1348                                                           comm, PROBE_ANY_SIZE,
1349                                                           &drain_msg_ref,
1350                                                           &content_ref,
1351                                                           NULL) ) ) {
1352             ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_probe(): Failed trying to find a drained message.");
1353             exit_status = ret;
1354             goto DONE;
1355         }
1356 
1357         
1358 
1359 
1360 
1361         if( NULL != drain_msg_ref ) {
1362             OPAL_OUTPUT_VERBOSE((12, mca_crcp_bkmrk_component.super.output_handle,
1363                                  "crcp:bkmrk: pml_iprobe(): Matched a drained message..."));
1364 
1365             
1366             if( MPI_STATUS_IGNORE != status ) {
1367                 memcpy(status, &content_ref->status, sizeof(ompi_status_public_t));
1368             }
1369 
1370             
1371             pml_state->state = OMPI_CRCP_PML_DONE;
1372             pml_state->error_code = OMPI_SUCCESS;
1373             return pml_state;
1374         }
1375     }
1376 
1377  DONE:
1378     pml_state->error_code = exit_status;
1379     return pml_state;
1380 }
1381 
1382 
1383 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_dump(
1384                                   struct ompi_communicator_t* comm,
1385                                   int verbose,
1386                                   ompi_crcp_base_pml_state_t* pml_state )
1387 {
1388     OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1389                         "crcp:bkmrk: pml_dump()"));
1390 
1391     pml_state->error_code = OMPI_SUCCESS;
1392     return pml_state;
1393 }
1394 
1395 
1396 
1397 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_add_comm(
1398                                   struct ompi_communicator_t* comm,
1399                                   ompi_crcp_base_pml_state_t* pml_state )
1400 {
1401     
1402 
1403     OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1404                         "crcp:bkmrk: pml_add_comm()"));
1405 
1406     pml_state->error_code = OMPI_SUCCESS;
1407     return pml_state;
1408 }
1409 
1410 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_del_comm(
1411                                   struct ompi_communicator_t* comm,
1412                                   ompi_crcp_base_pml_state_t* pml_state )
1413 {
1414     
1415 
1416     OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1417                         "crcp:bkmrk: pml_del_comm()"));
1418 
1419     pml_state->error_code = OMPI_SUCCESS;
1420     return pml_state;
1421 }
1422 
1423 
1424 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_add_procs(
1425                                    struct ompi_proc_t **procs,
1426                                    size_t nprocs,
1427                                    ompi_crcp_base_pml_state_t* pml_state )
1428 {
1429     ompi_crcp_bkmrk_pml_peer_ref_t *new_peer_ref;
1430     size_t i;
1431 
1432     if( OMPI_CRCP_PML_PRE != pml_state->state ){
1433         goto DONE;
1434     }
1435 
1436     OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1437                         "crcp:bkmrk: pml_add_procs()"));
1438 
1439     
1440 
1441 
1442     wrapped_pml_component = pml_state->wrapped_pml_component;
1443     wrapped_pml_module    = pml_state->wrapped_pml_module;
1444 
1445     
1446 
1447 
1448     for( i = 0; i < nprocs; ++i) {
1449         HOKE_PEER_REF_ALLOC(new_peer_ref);
1450 
1451         new_peer_ref->proc_name.jobid  = OMPI_CAST_RTE_NAME(&procs[i]->super.proc_name)->jobid;
1452         new_peer_ref->proc_name.vpid   = OMPI_CAST_RTE_NAME(&procs[i]->super.proc_name)->vpid;
1453 
1454         opal_list_append(&ompi_crcp_bkmrk_pml_peer_refs, &(new_peer_ref->super));
1455     }
1456 
1457  DONE:
1458     pml_state->error_code = OMPI_SUCCESS;
1459     return pml_state;
1460 }
1461 
1462 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_del_procs(
1463                                   struct ompi_proc_t **procs,
1464                                   size_t nprocs,
1465                                   ompi_crcp_base_pml_state_t* pml_state )
1466 {
1467     opal_list_item_t *item = NULL;
1468     ompi_crcp_bkmrk_pml_peer_ref_t *old_peer_ref;
1469     int exit_status = OMPI_SUCCESS;
1470     size_t i;
1471 
1472     if( OMPI_CRCP_PML_PRE != pml_state->state ){
1473         goto DONE;
1474     }
1475 
1476     OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1477                         "crcp:bkmrk: pml_del_procs()"));
1478 
1479     for( i = 0; i < nprocs; ++i) {
1480         item = (opal_list_item_t*)find_peer(*(ompi_process_name_t*)&procs[i]->super.proc_name);
1481         if(NULL == item) {
1482             opal_output(mca_crcp_bkmrk_component.super.output_handle,
1483                         "crcp:bkmrk: del_procs: Unable to find peer %s\n",
1484                         OMPI_NAME_PRINT(&procs[i]->super.proc_name));
1485             exit_status = OMPI_ERROR;
1486             goto DONE;
1487         }
1488 
1489         
1490         opal_list_remove_item(&ompi_crcp_bkmrk_pml_peer_refs, item);
1491         old_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
1492         HOKE_PEER_REF_RETURN(old_peer_ref);
1493     }
1494 
1495  DONE:
1496     pml_state->error_code = exit_status;
1497     return pml_state;
1498 }
1499 
1500 
1501 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_isend_init(
1502                                   void *buf, size_t count,
1503                                   ompi_datatype_t *datatype,
1504                                   int dst, int tag,
1505                                   mca_pml_base_send_mode_t mode,
1506                                   struct ompi_communicator_t* comm,
1507                                   struct ompi_request_t **request,
1508                                   ompi_crcp_base_pml_state_t* pml_state )
1509 {
1510     ompi_crcp_bkmrk_pml_peer_ref_t    *peer_ref    = NULL;
1511     ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref     = NULL;
1512     ompi_crcp_bkmrk_pml_state_t       *coord_state = NULL;
1513     int exit_status = OMPI_SUCCESS;
1514     int ret;
1515 
1516     OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1517                         "crcp:bkmrk: pml_isend_init()"));
1518 
1519     
1520 
1521 
1522 
1523     if( OMPI_CRCP_PML_PRE == pml_state->state ) {
1524         
1525 
1526 
1527         if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, dst, &peer_ref) ) ){
1528             opal_output(mca_crcp_bkmrk_component.super.output_handle,
1529                         "crcp:bkmrk: isend: Failed to find peer_ref\n");
1530             exit_status = ret;
1531             goto DONE;
1532         }
1533 
1534         
1535 
1536 
1537         traffic_message_append(peer_ref, &(peer_ref->send_init_list),
1538                                COORD_MSG_TYPE_P_SEND,
1539                                count, datatype, 0, tag, dst, comm,
1540                                &msg_ref);
1541 
1542         
1543         CREATE_COORD_STATE(coord_state, pml_state,
1544                            peer_ref, msg_ref);
1545 
1546         coord_state->p_super.error_code = OMPI_SUCCESS;
1547         return &coord_state->p_super;
1548     }
1549     
1550 
1551 
1552     else if( OMPI_CRCP_PML_POST == pml_state->state ) {
1553         ompi_crcp_base_pml_state_t *rtn_state = NULL;
1554         ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL;
1555 
1556         EXTRACT_COORD_STATE(pml_state, coord_state, rtn_state,
1557                            peer_ref,  msg_ref);
1558 
1559         
1560 
1561 
1562         HOKE_CONTENT_REF_ALLOC(new_content);
1563         new_content->buffer  =  buf;
1564         new_content->request = *request;
1565         new_content->done    =  false;
1566         new_content->active  =  false;
1567         new_content->already_posted  = true;
1568         new_content->already_drained = false;
1569         OBJ_RETAIN(*request);
1570         opal_list_append(&(msg_ref->msg_contents), &(new_content->super) );
1571 
1572         CRCP_COORD_STATE_RETURN(coord_state);
1573 
1574         rtn_state->error_code = OMPI_SUCCESS;
1575         return rtn_state;
1576     }
1577 
1578  DONE:
1579     pml_state->error_code = exit_status;
1580     return pml_state;
1581 }
1582 
1583 static int ompi_crcp_bkmrk_pml_start_isend_init(ompi_request_t **request)
1584 {
1585     int ret, exit_status = OMPI_SUCCESS;
1586     ompi_crcp_bkmrk_pml_peer_ref_t            *peer_ref = NULL;
1587     ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref  = NULL;
1588     ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
1589     mca_pml_base_request_t *breq = NULL;
1590     size_t tmp_ddt_size  = 0;
1591 
1592     breq = (mca_pml_base_request_t *)(*request);
1593     ompi_datatype_type_size(breq->req_datatype, &tmp_ddt_size);
1594 
1595     
1596 
1597 
1598     if( OMPI_SUCCESS != (ret = find_peer_in_comm(breq->req_comm,
1599                                                  breq->req_peer,
1600                                                  &peer_ref) ) ){
1601         opal_output(mca_crcp_bkmrk_component.super.output_handle,
1602                     "crcp:bkmrk: req_start(): Failed to find peer_ref\n");
1603         exit_status = ret;
1604         goto DONE;
1605     }
1606 
1607     
1608     if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->send_init_list),
1609                                                     breq->req_count,
1610                                                     breq->req_tag,
1611                                                     breq->req_peer,
1612                                                     breq->req_comm->c_contextid,
1613                                                     tmp_ddt_size,
1614                                                     &msg_ref,
1615                                                     PERSIST_MARKER
1616                                                     ) ) ) {
1617         opal_output(mca_crcp_bkmrk_component.super.output_handle,
1618                     "crcp:bkmrk: pml_start(): Unable to find the proper (send_init) message ref for this recv\n");
1619         exit_status = ret;
1620         goto DONE;
1621     }
1622 
1623     if( NULL == msg_ref ) {
1624         ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_start(): Could not find message ref");
1625         exit_status = OMPI_ERROR;
1626         goto DONE;
1627     } else {
1628         traffic_message_start(msg_ref,
1629                               peer_ref,
1630                               request,
1631                               &(peer_ref->send_init_list),
1632                               &content_ref);
1633 
1634         if( !content_ref->already_drained ) {
1635             
1636             peer_ref->total_msgs_sent += 1;
1637         }
1638     }
1639 
1640  DONE:
1641     return exit_status;
1642 }
1643 
1644 static int ompi_crcp_bkmrk_request_complete_isend_init(struct ompi_request_t *request,
1645                                                ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
1646                                                int src, int tag, int tmp_ddt_size)
1647 {
1648     int ret, exit_status = OMPI_SUCCESS;
1649     mca_pml_base_request_t *breq = NULL;
1650     ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL;
1651     ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
1652 
1653     breq = (mca_pml_base_request_t *)request;
1654 
1655     
1656     if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->send_init_list),
1657                                                     breq->req_count,
1658                                                     tag, src,
1659                                                     breq->req_comm->c_contextid,
1660                                                     tmp_ddt_size,
1661                                                     &msg_ref,
1662                                                     FIND_MSG_TRUE
1663                                                     ) ) ) {
1664         opal_output(mca_crcp_bkmrk_component.super.output_handle,
1665                     "crcp:bkmrk: req_complete: Unable to find the proper (send_init) message ref for this complete\n");
1666         exit_status = ret;
1667         goto DONE;
1668     }
1669 
1670     if( NULL == msg_ref ) {
1671         
1672 
1673 
1674 
1675 
1676 
1677 
1678 
1679 
1680 
1681 
1682         OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
1683                             "crcp:bkmrk: request_complete: No match found for this request :( %d, %d ): [%d/%d,%d]\n",
1684                              peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd,
1685                              breq->req_peer, src, breq->req_comm->c_contextid));
1686         exit_status = OMPI_SUCCESS;
1687         goto DONE;
1688     }
1689 
1690     
1691     traffic_message_find_mark_persistent(msg_ref, &request,
1692                                          true,  
1693                                          false, 
1694                                          &content_ref);
1695 
1696     TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (Send_init) --", true));
1697 
1698     if( !content_ref->already_drained ) {
1699         msg_ref->done++;
1700         msg_ref->active--;
1701     } else {
1702         msg_ref->active_drain--;
1703         content_ref->already_drained = false;
1704     }
1705 
1706     OPAL_OUTPUT_VERBOSE((25, mca_crcp_bkmrk_component.super.output_handle,
1707                          "crcp:bkmrk: req_complete: Marked Message... ( %d, %d )\n",
1708                          peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd));
1709  DONE:
1710     return exit_status;
1711 }
1712 
1713 
1714 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_isend(
1715                                   void *buf, size_t count,
1716                                   ompi_datatype_t *datatype,
1717                                   int dst, int tag,
1718                                   mca_pml_base_send_mode_t mode,
1719                                   struct ompi_communicator_t* comm,
1720                                   struct ompi_request_t **request,
1721                                   ompi_crcp_base_pml_state_t* pml_state )
1722 {
1723     ompi_crcp_bkmrk_pml_peer_ref_t    *peer_ref    = NULL;
1724     ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref     = NULL;
1725     ompi_crcp_bkmrk_pml_state_t       *coord_state = NULL;
1726     int exit_status = OMPI_SUCCESS;
1727     int ret;
1728 
1729     OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1730                         "crcp:bkmrk: pml_isend()"));
1731 
1732     
1733 
1734 
1735 
1736     if( OMPI_CRCP_PML_PRE == pml_state->state ) {
1737         
1738 
1739 
1740         if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, dst, &peer_ref) ) ){
1741             opal_output(mca_crcp_bkmrk_component.super.output_handle,
1742                         "crcp:bkmrk: isend: Failed to find peer_ref\n");
1743             exit_status = ret;
1744             goto DONE;
1745         }
1746 
1747         
1748 
1749 
1750         traffic_message_append(peer_ref, &(peer_ref->isend_list),
1751                                COORD_MSG_TYPE_I_SEND,
1752                                count, datatype, 0, tag, dst, comm,
1753                                &msg_ref);
1754 
1755         
1756         peer_ref->total_msgs_sent += 1;
1757 
1758         
1759         CREATE_COORD_STATE(coord_state, pml_state,
1760                            peer_ref, msg_ref);
1761 
1762         coord_state->p_super.error_code = OMPI_SUCCESS;
1763         return &coord_state->p_super;
1764     }
1765     
1766 
1767 
1768     else if( OMPI_CRCP_PML_POST == pml_state->state ) {
1769         ompi_crcp_base_pml_state_t *rtn_state = NULL;
1770         ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL;
1771 
1772         EXTRACT_COORD_STATE(pml_state, coord_state, rtn_state,
1773                             peer_ref,  msg_ref);
1774 
1775         
1776 
1777 
1778         HOKE_CONTENT_REF_ALLOC(new_content);
1779         new_content->buffer  =  NULL; 
1780         new_content->request = *request;
1781         new_content->done    =  false;
1782         new_content->active  =  true;
1783         new_content->already_posted  = true;
1784         new_content->already_drained = false;
1785         OBJ_RETAIN(*request);
1786         opal_list_append(&(msg_ref->msg_contents), &(new_content->super) );
1787 
1788         TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (isend) --", true));
1789 
1790         CRCP_COORD_STATE_RETURN(coord_state);
1791 
1792         rtn_state->error_code = OMPI_SUCCESS;
1793         return rtn_state;
1794     }
1795 
1796  DONE:
1797     pml_state->error_code = exit_status;
1798     return pml_state;
1799 }
1800 
1801 static int ompi_crcp_bkmrk_request_complete_isend(struct ompi_request_t *request,
1802                                           ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
1803                                           int src, int tag, int tmp_ddt_size)
1804 {
1805     int ret, exit_status = OMPI_SUCCESS;
1806     mca_pml_base_request_t *breq = NULL;
1807     ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL;
1808     ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
1809 
1810     breq = (mca_pml_base_request_t *)request;
1811 
1812     
1813     if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->isend_list),
1814                                                     breq->req_count,
1815                                                     tag, src,
1816                                                     breq->req_comm->c_contextid,
1817                                                     tmp_ddt_size,
1818                                                     &msg_ref,
1819                                                     FIND_MSG_TRUE
1820                                                     ) ) ) {
1821         opal_output(mca_crcp_bkmrk_component.super.output_handle,
1822                     "crcp:bkmrk: req_complete: Unable to find the proper (isend) message ref for this complete\n");
1823         exit_status = ret;
1824         goto DONE;
1825     }
1826 
1827     if( NULL == msg_ref ) {
1828         
1829 
1830 
1831 
1832 
1833 
1834 
1835 
1836 
1837 
1838 
1839         OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
1840                             "crcp:bkmrk: request_complete: No match found for this request :( %d, %d ): [%d/%d,%d]\n",
1841                              peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd,
1842                              breq->req_peer, src, breq->req_comm->c_contextid));
1843         exit_status = OMPI_SUCCESS;
1844         goto DONE;
1845     }
1846 
1847     OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
1848                          "crcp:bkmrk: req_complete: Matched an iSend: total = %d",
1849                          peer_ref->total_msgs_sent));
1850 
1851     
1852     traffic_message_grab_content(msg_ref, &content_ref, true, true); 
1853 
1854     if( !content_ref->already_drained ) {
1855         msg_ref->done++;
1856         msg_ref->active--;
1857     } else {
1858         msg_ref->active_drain--;
1859         content_ref->already_drained = false;
1860     }
1861     HOKE_CONTENT_REF_RETURN(content_ref);
1862 
1863     TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (iSend) --", true));
1864 
1865     OPAL_OUTPUT_VERBOSE((25, mca_crcp_bkmrk_component.super.output_handle,
1866                          "crcp:bkmrk: req_complete: Marked Message... ( %d, %d )\n",
1867                          peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd));
1868  DONE:
1869     return exit_status;
1870 }
1871 
1872 
1873 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_send(
1874                                   void *buf, size_t count,
1875                                   ompi_datatype_t *datatype,
1876                                   int dst, int tag,
1877                                   mca_pml_base_send_mode_t mode,
1878                                   struct ompi_communicator_t* comm,
1879                                   ompi_crcp_base_pml_state_t* pml_state )
1880 {
1881     ompi_crcp_bkmrk_pml_peer_ref_t    *peer_ref    = NULL;
1882     ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref     = NULL;
1883     ompi_crcp_bkmrk_pml_state_t       *coord_state = NULL;
1884     int exit_status = OMPI_SUCCESS;
1885     int ret;
1886 
1887     OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1888                         "crcp:bkmrk: pml_send()"));
1889 
1890     
1891 
1892 
1893 
1894     if( OMPI_CRCP_PML_PRE == pml_state->state ) {
1895         
1896 
1897 
1898         if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, dst, &peer_ref) ) ){
1899             opal_output(mca_crcp_bkmrk_component.super.output_handle,
1900                         "crcp:bkmrk: send: Failed to find peer_ref\n");
1901             exit_status = ret;
1902             goto DONE;
1903         }
1904 
1905         
1906 
1907 
1908         traffic_message_append(peer_ref, &(peer_ref->send_list),
1909                                COORD_MSG_TYPE_B_SEND,
1910                                count, datatype, 0, tag, dst, comm,
1911                                &msg_ref);
1912 
1913         
1914         peer_ref->total_msgs_sent += 1;
1915         current_msg_id = msg_ref->msg_id;
1916         current_msg_type = COORD_MSG_TYPE_B_SEND;
1917 
1918         
1919         CREATE_COORD_STATE(coord_state, pml_state,
1920                            peer_ref, msg_ref);
1921         coord_state->p_super.error_code = OMPI_SUCCESS;
1922 
1923         return &coord_state->p_super;
1924     }
1925     
1926 
1927 
1928     else if( OMPI_CRCP_PML_POST == pml_state->state ) {
1929         ompi_crcp_base_pml_state_t *rtn_state = NULL;
1930 
1931         EXTRACT_COORD_STATE(pml_state, coord_state, rtn_state,
1932                             peer_ref,  msg_ref);
1933 
1934         
1935 
1936 
1937         msg_ref->done++;
1938         msg_ref->active--;
1939 
1940         current_msg_id = 0;
1941         current_msg_type = COORD_MSG_TYPE_UNKNOWN;
1942 
1943         TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "Send done", true));
1944 
1945         CRCP_COORD_STATE_RETURN(coord_state);
1946         rtn_state->error_code = OMPI_SUCCESS;
1947 
1948         return rtn_state;
1949     }
1950 
1951  DONE:
1952     pml_state->error_code = exit_status;
1953     return pml_state;
1954 }
1955 
1956 
1957 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_irecv_init(
1958                                   void *buf, size_t count,
1959                                   ompi_datatype_t *datatype,
1960                                   int src, int tag,
1961                                   struct ompi_communicator_t* comm,
1962                                   struct ompi_request_t **request,
1963                                   ompi_crcp_base_pml_state_t* pml_state)
1964 {
1965     ompi_crcp_bkmrk_pml_peer_ref_t    *peer_ref      = NULL;
1966     ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref       = NULL;
1967     ompi_crcp_bkmrk_pml_state_t       *coord_state   = NULL;
1968     int exit_status = OMPI_SUCCESS;
1969     int ret;
1970 
1971     OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1972                         "crcp:bkmrk: pml_irecv_init()"));
1973 
1974     
1975 
1976 
1977 
1978 
1979     if( OMPI_CRCP_PML_PRE == pml_state->state) {
1980         
1981 
1982 
1983 
1984 
1985 
1986 
1987         
1988 
1989 
1990         if( MPI_ANY_SOURCE == src || src < 0) {
1991             
1992 
1993 
1994             traffic_message_append(NULL, &(unknown_persist_recv_list),
1995                                    COORD_MSG_TYPE_P_RECV,
1996                                    count, datatype, 0, tag, src, comm,
1997                                    &msg_ref);
1998 
1999             CREATE_COORD_STATE(coord_state, pml_state,
2000                                NULL, msg_ref);
2001         }
2002         else {
2003             if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, src, &peer_ref) ) ){
2004                 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2005                             "crcp:bkmrk: recv: Failed to find peer_ref\n");
2006                 exit_status = ret;
2007                 goto DONE;
2008             }
2009 
2010             
2011 
2012 
2013             traffic_message_append(peer_ref, &(peer_ref->recv_init_list),
2014                                    COORD_MSG_TYPE_P_RECV,
2015                                    count, datatype, 0, tag, src, comm,
2016                                    &msg_ref);
2017 
2018             CREATE_COORD_STATE(coord_state, pml_state,
2019                                peer_ref, msg_ref);
2020         }
2021 
2022         coord_state->p_super.error_code = OMPI_SUCCESS;
2023         return &coord_state->p_super;
2024     }
2025     
2026 
2027 
2028 
2029     else if( OMPI_CRCP_PML_POST == pml_state->state) {
2030         ompi_crcp_base_pml_state_t *rtn_state = NULL;
2031         ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL;
2032 
2033         EXTRACT_COORD_STATE(pml_state, coord_state, rtn_state,
2034                             peer_ref,  msg_ref);
2035 
2036         
2037 
2038 
2039         HOKE_CONTENT_REF_ALLOC(new_content);
2040         new_content->buffer  =  buf;
2041         new_content->request = *request;
2042         new_content->done    =  false;
2043         new_content->active  =  false;
2044         new_content->already_posted  = true;
2045         new_content->already_drained = false;
2046         OBJ_RETAIN(*request);
2047         opal_list_append(&(msg_ref->msg_contents), &(new_content->super) );
2048 
2049         CRCP_COORD_STATE_RETURN(coord_state);
2050 
2051         rtn_state->error_code = OMPI_SUCCESS;
2052         return rtn_state;
2053     }
2054 
2055  DONE:
2056     pml_state->error_code = exit_status;
2057     return pml_state;
2058 }
2059 
2060 static int ompi_crcp_bkmrk_pml_start_drain_irecv_init(ompi_request_t **request, bool *found_drain)
2061 {
2062     int ret, exit_status = OMPI_SUCCESS;
2063     ompi_crcp_bkmrk_pml_peer_ref_t            *peer_ref = NULL;
2064     ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref  = NULL;
2065     ompi_crcp_bkmrk_pml_drain_message_ref_t   *drain_msg_ref = NULL;
2066     ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
2067     mca_pml_base_request_t *breq = NULL;
2068     size_t tmp_ddt_size  = 0;
2069 
2070     *found_drain = false;
2071 
2072     breq = (mca_pml_base_request_t *)(*request);
2073     ompi_datatype_type_size(breq->req_datatype, &tmp_ddt_size);
2074 
2075     
2076 
2077 
2078     if( 0 <= breq->req_peer ) {
2079         if( OMPI_SUCCESS != (ret = find_peer_in_comm(breq->req_comm,
2080                                                      breq->req_peer,
2081                                                      &peer_ref) ) ){
2082             opal_output(mca_crcp_bkmrk_component.super.output_handle,
2083                         "crcp:bkmrk: pml_start(): Failed to find peer_ref\n");
2084             exit_status = ret;
2085             goto DONE;
2086         }
2087 
2088         if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->recv_init_list),
2089                                                         breq->req_count,
2090                                                         breq->req_tag,
2091                                                         breq->req_peer,
2092                                                         breq->req_comm->c_contextid,
2093                                                         tmp_ddt_size,
2094                                                         &msg_ref,
2095                                                         PERSIST_MARKER
2096                                                         ) ) ) {
2097             opal_output(mca_crcp_bkmrk_component.super.output_handle,
2098                         "crcp:bkmrk: pml_start(): Unable to find the proper (recv) message ref for this recv\n");
2099             exit_status = ret;
2100             goto DONE;
2101         }
2102     }
2103     
2104 
2105 
2106     else {
2107         if( OMPI_SUCCESS != (ret = traffic_message_find(&(unknown_persist_recv_list),
2108                                                         breq->req_count,
2109                                                         breq->req_tag,
2110                                                         INVALID_INT,
2111                                                         breq->req_comm->c_contextid,
2112                                                         tmp_ddt_size,
2113                                                         &msg_ref,
2114                                                         PERSIST_MARKER
2115                                                         ) ) ) {
2116             opal_output(mca_crcp_bkmrk_component.super.output_handle,
2117                         "crcp:bkmrk: pml_start(): Unable to find the proper (recv) message ref for this recv\n");
2118             exit_status = ret;
2119             goto DONE;
2120         }
2121     }
2122 
2123     
2124 
2125 
2126     if( NULL == msg_ref ) {
2127         ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_start(): Could not find message ref");
2128         exit_status = OMPI_ERROR;
2129         goto DONE;
2130     }
2131 
2132     
2133 
2134 
2135     if( NULL != peer_ref ) {
2136         if( OMPI_SUCCESS != (ret = drain_message_find(&(peer_ref->drained_list),
2137                                                       msg_ref->count, msg_ref->tag, msg_ref->rank,
2138                                                       msg_ref->comm->c_contextid, msg_ref->ddt_size,
2139                                                       &drain_msg_ref,
2140                                                       &content_ref) ) ) {
2141             ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_start(): Failed trying to find a drained message.");
2142             exit_status = ret;
2143             goto DONE;
2144         }
2145     } else {
2146         if( OMPI_SUCCESS != (ret = drain_message_find_any(msg_ref->count, msg_ref->tag, msg_ref->rank,
2147                                                           msg_ref->comm, msg_ref->ddt_size,
2148                                                           &drain_msg_ref,
2149                                                           &content_ref,
2150                                                           &peer_ref) ) ) {
2151             ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_start(): Failed trying to find a drained message.");
2152             exit_status = ret;
2153             goto DONE;
2154         }
2155     }
2156 
2157     
2158 
2159 
2160     if( NULL != drain_msg_ref ) {
2161         *found_drain = true;
2162         OPAL_OUTPUT_VERBOSE((12, mca_crcp_bkmrk_component.super.output_handle,
2163                              "crcp:bkmrk: pml_start(): Matched a drained message..."));
2164 
2165         if( OMPI_SUCCESS != (ret = drain_message_copy_remove_persistent(drain_msg_ref,
2166                                                                         content_ref,
2167                                                                         msg_ref,
2168                                                                         *request,
2169                                                                         peer_ref) ) ) {
2170             opal_output( mca_crcp_bkmrk_component.super.output_handle,
2171                          "crcp:bkmrk: pml_start(): Datatype copy failed (%d)",
2172                          ret);
2173         }
2174 
2175         peer_ref->total_drained_msgs -= 1;
2176     }
2177 
2178  DONE:
2179     return exit_status;
2180 }
2181 
2182 static int ompi_crcp_bkmrk_pml_start_irecv_init(ompi_request_t **request)
2183 {
2184     int ret, exit_status = OMPI_SUCCESS;
2185     ompi_crcp_bkmrk_pml_peer_ref_t            *peer_ref = NULL;
2186     ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref  = NULL;
2187     mca_pml_base_request_t *breq = NULL;
2188     size_t tmp_ddt_size  = 0;
2189 
2190     breq = (mca_pml_base_request_t *)(*request);
2191     ompi_datatype_type_size(breq->req_datatype, &tmp_ddt_size);
2192 
2193     
2194 
2195 
2196     if( 0 <= breq->req_peer ) {
2197         if( OMPI_SUCCESS != (ret = find_peer_in_comm(breq->req_comm,
2198                                                      breq->req_peer,
2199                                                      &peer_ref) ) ){
2200             opal_output(mca_crcp_bkmrk_component.super.output_handle,
2201                         "crcp:bkmrk: pml_start(): Failed to find peer_ref\n");
2202             exit_status = ret;
2203             goto DONE;
2204         }
2205 
2206         if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->recv_init_list),
2207                                                         breq->req_count,
2208                                                         breq->req_tag,
2209                                                         breq->req_peer,
2210                                                         breq->req_comm->c_contextid,
2211                                                         tmp_ddt_size,
2212                                                         &msg_ref,
2213                                                         PERSIST_MARKER
2214                                                         ) ) ) {
2215             opal_output(mca_crcp_bkmrk_component.super.output_handle,
2216                         "crcp:bkmrk: pml_start(): Unable to find the proper (recv) message ref for this recv\n");
2217             exit_status = ret;
2218             goto DONE;
2219         }
2220 
2221         if( NULL != msg_ref ) {
2222             traffic_message_start(msg_ref,
2223                                   peer_ref,
2224                                   request,
2225                                   &(peer_ref->recv_init_list),
2226                                   NULL);
2227         }
2228     }
2229     
2230 
2231 
2232     else {
2233         if( OMPI_SUCCESS != (ret = traffic_message_find(&(unknown_persist_recv_list),
2234                                                         breq->req_count,
2235                                                         breq->req_tag,
2236                                                         INVALID_INT,
2237                                                         breq->req_comm->c_contextid,
2238                                                         tmp_ddt_size,
2239                                                         &msg_ref,
2240                                                         PERSIST_MARKER
2241                                                         ) ) ) {
2242             opal_output(mca_crcp_bkmrk_component.super.output_handle,
2243                         "crcp:bkmrk: pml_start(): Unable to find the proper (recv) message ref for this recv\n");
2244             exit_status = ret;
2245             goto DONE;
2246         }
2247 
2248         if( NULL != msg_ref ) {
2249             traffic_message_start(msg_ref,
2250                                   NULL,
2251                                   request,
2252                                   &(unknown_persist_recv_list),
2253                                   NULL);
2254         }
2255     }
2256 
2257     if( NULL == msg_ref ) {
2258         ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_start(): Could not find message ref");
2259         exit_status = OMPI_ERROR;
2260         goto DONE;
2261     }
2262 
2263  DONE:
2264     return exit_status;
2265 }
2266 
2267 static int ompi_crcp_bkmrk_request_complete_irecv_init(struct ompi_request_t *request,
2268                                                ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
2269                                                int src, int tag, int tmp_ddt_size)
2270 {
2271     int ret, exit_status = OMPI_SUCCESS;
2272     mca_pml_base_request_t *breq = NULL;
2273     ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref  = NULL, *new_msg_ref = NULL;
2274     ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
2275 
2276     breq = (mca_pml_base_request_t *)request;
2277 
2278     
2279 
2280 
2281     if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->recv_init_list),
2282                                                     breq->req_count,
2283                                                     tag, src,
2284                                                     breq->req_comm->c_contextid,
2285                                                     tmp_ddt_size,
2286                                                     &msg_ref,
2287                                                     FIND_MSG_TRUE
2288                                                     ) ) ) {
2289         opal_output(mca_crcp_bkmrk_component.super.output_handle,
2290                     "crcp:bkmrk: req_complete: Unable to find the proper (recv_init) message ref for this complete\n");
2291         exit_status = ret;
2292         goto DONE;
2293     }
2294 
2295     
2296 
2297 
2298     if( NULL == msg_ref ) {
2299         if( OMPI_SUCCESS != (ret = traffic_message_find(&(unknown_persist_recv_list),
2300                                                         breq->req_count,
2301                                                         tag,
2302                                                         INVALID_INT,
2303                                                         breq->req_comm->c_contextid,
2304                                                         tmp_ddt_size,
2305                                                         &msg_ref,
2306                                                         FIND_MSG_TRUE
2307                                                         ) ) ) {
2308             opal_output(mca_crcp_bkmrk_component.super.output_handle,
2309                         "crcp:bkmrk: requ_complete: Unable to find the proper (recv_init) message ref for this complete\n");
2310             exit_status = ret;
2311             goto DONE;
2312         }
2313 
2314         if( NULL != msg_ref ) {
2315             traffic_message_move(msg_ref,
2316                                  COORD_MSG_TYPE_P_RECV,
2317                                  NULL, &(unknown_persist_recv_list),
2318                                  peer_ref, &(peer_ref->recv_init_list),
2319                                  &new_msg_ref,
2320                                  true,
2321                                  false);
2322             msg_ref = new_msg_ref;
2323         }
2324     }
2325 
2326     
2327 
2328 
2329     if( NULL == msg_ref ) {
2330         
2331 
2332 
2333 
2334 
2335 
2336 
2337 
2338 
2339 
2340 
2341         OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
2342                             "crcp:bkmrk: request_complete: No match found for this request :( %d, %d ): [%d/%d,%d]\n",
2343                              peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd,
2344                              breq->req_peer, src, breq->req_comm->c_contextid));
2345         exit_status = OMPI_SUCCESS;
2346         goto DONE;
2347     }
2348 
2349     
2350 
2351 
2352 
2353     traffic_message_find_mark_persistent(msg_ref, &request,
2354                                          true,  
2355                                          false, 
2356                                          &content_ref);
2357     if( NULL == content_ref ) {
2358         exit_status = OMPI_ERROR;
2359         goto DONE;
2360     }
2361 
2362     if( !content_ref->already_drained ) {
2363         peer_ref->total_msgs_recvd += 1;
2364         msg_ref->done++;
2365         msg_ref->active--;
2366     } else {
2367         msg_ref->active_drain--;
2368         content_ref->already_drained = false;
2369     }
2370 
2371     
2372 
2373     if( NULL == new_msg_ref ) {
2374         TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (Recv_Init) --", true));
2375     } else {
2376         TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (Recv_init - Unknown) --", true));
2377     }
2378 
2379     OPAL_OUTPUT_VERBOSE((25, mca_crcp_bkmrk_component.super.output_handle,
2380                          "crcp:bkmrk: req_complete: Marked Message... ( %d, %d )\n",
2381                          peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd));
2382  DONE:
2383     return exit_status;
2384 }
2385 
2386 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_irecv(
2387                                   void *buf, size_t count,
2388                                   ompi_datatype_t *datatype,
2389                                   int src, int tag,
2390                                   struct ompi_communicator_t* comm,
2391                                   struct ompi_request_t **request,
2392                                   ompi_crcp_base_pml_state_t* pml_state )
2393 {
2394     int ret, exit_status = OMPI_SUCCESS;
2395     ompi_crcp_bkmrk_pml_peer_ref_t    *peer_ref      = NULL;
2396     ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref       = NULL;
2397     ompi_crcp_bkmrk_pml_state_t       *coord_state   = NULL;
2398     bool found_drain = false;
2399 
2400     OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
2401                          "crcp:bkmrk: pml_irecv()"));
2402 
2403     
2404 
2405 
2406 
2407 
2408     if( OMPI_CRCP_PML_PRE == pml_state->state) {
2409         
2410 
2411 
2412         found_drain = false;
2413         if( OMPI_SUCCESS != (ret = drain_message_check_recv(buf, count, datatype,
2414                                                             &src, &tag, comm, request, NULL,
2415                                                             &found_drain) ) ) {
2416             ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_recv(): Failed trying to find a drained message.");
2417             exit_status = ret;
2418             goto DONE;
2419         }
2420 
2421         if( found_drain ) {
2422             
2423 
2424 
2425 
2426 
2427             
2428             pml_state->state = OMPI_CRCP_PML_DONE;
2429             pml_state->error_code = OMPI_SUCCESS;
2430             return pml_state;
2431         }
2432         
2433 
2434 
2435         else {
2436             
2437 
2438 
2439             if( MPI_ANY_SOURCE == src || src < 0) {
2440                 
2441 
2442 
2443                 traffic_message_append(NULL, &(unknown_recv_from_list),
2444                                        COORD_MSG_TYPE_I_RECV,
2445                                        count, datatype, 0, tag, src, comm,
2446                                        &msg_ref);
2447 
2448                 CREATE_COORD_STATE(coord_state, pml_state,
2449                                   NULL, msg_ref);
2450             }
2451             else {
2452                 if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, src, &peer_ref) ) ){
2453                     opal_output(mca_crcp_bkmrk_component.super.output_handle,
2454                                 "crcp:bkmrk: pml_irecv(): Failed to find peer_ref\n");
2455                     exit_status = ret;
2456                     goto DONE;
2457                 }
2458 
2459                 
2460 
2461 
2462                 traffic_message_append(peer_ref, &(peer_ref->irecv_list),
2463                                        COORD_MSG_TYPE_I_RECV,
2464                                        count, datatype, 0, tag, src, comm,
2465                                        &msg_ref);
2466 
2467                 CREATE_COORD_STATE(coord_state, pml_state,
2468                                    peer_ref, msg_ref);
2469             }
2470 
2471             coord_state->p_super.error_code = OMPI_SUCCESS;
2472             return &coord_state->p_super;
2473         }
2474     }
2475     
2476 
2477 
2478 
2479     else if( OMPI_CRCP_PML_POST == pml_state->state) {
2480         ompi_crcp_base_pml_state_t *rtn_state = NULL;
2481         ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL;
2482 
2483         EXTRACT_COORD_STATE(pml_state, coord_state, rtn_state,
2484                             peer_ref,  msg_ref);
2485 
2486         
2487 
2488 
2489         HOKE_CONTENT_REF_ALLOC(new_content);
2490         new_content->buffer  =  NULL; 
2491         new_content->request = *request;
2492         new_content->done    =  false;
2493         new_content->active  =  true;
2494         new_content->already_posted  = true;
2495         new_content->already_drained = false;
2496         OBJ_RETAIN(*request);
2497         opal_list_append(&(msg_ref->msg_contents), &(new_content->super) );
2498 
2499         TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (irecv) --", true));
2500 
2501         CRCP_COORD_STATE_RETURN(coord_state);
2502 
2503         rtn_state->error_code = OMPI_SUCCESS;
2504         return rtn_state;
2505     }
2506 
2507  DONE:
2508     pml_state->error_code = exit_status;
2509     return pml_state;
2510 }
2511 
2512 static int ompi_crcp_bkmrk_request_complete_irecv(struct ompi_request_t *request,
2513                                           ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
2514                                           int src, int tag, int tmp_ddt_size)
2515 {
2516     int ret, exit_status = OMPI_SUCCESS;
2517     mca_pml_base_request_t *breq = NULL;
2518     ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref  = NULL, *new_msg_ref = NULL;
2519     ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
2520 
2521     breq = (mca_pml_base_request_t *)request;
2522 
2523     
2524 
2525 
2526     if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->irecv_list),
2527                                                     breq->req_count,
2528                                                     tag, src,
2529                                                     breq->req_comm->c_contextid,
2530                                                     tmp_ddt_size,
2531                                                     &msg_ref,
2532                                                     FIND_MSG_TRUE
2533                                                     ) ) ) {
2534         opal_output(mca_crcp_bkmrk_component.super.output_handle,
2535                     "crcp:bkmrk: req_complete: Unable to find the proper (irecv) message ref for this complete\n");
2536         exit_status = ret;
2537         goto DONE;
2538     }
2539 
2540     
2541 
2542 
2543     if( NULL == msg_ref ) {
2544         if( OMPI_SUCCESS != (ret = traffic_message_find(&(unknown_recv_from_list),
2545                                                         breq->req_count,
2546                                                         tag,
2547                                                         INVALID_INT,
2548                                                         breq->req_comm->c_contextid,
2549                                                         tmp_ddt_size,
2550                                                         &msg_ref,
2551                                                         FIND_MSG_TRUE
2552                                                         ) ) ) {
2553             opal_output(mca_crcp_bkmrk_component.super.output_handle,
2554                         "crcp:bkmrk: req_complete: Unable to find the proper (recv_init) message ref for this complete\n");
2555             exit_status = ret;
2556             goto DONE;
2557         }
2558 
2559         if( NULL != msg_ref ) {
2560             traffic_message_move(msg_ref,
2561                                  COORD_MSG_TYPE_I_RECV,
2562                                  NULL, &(unknown_recv_from_list),
2563                                  peer_ref, &(peer_ref->irecv_list),
2564                                  &new_msg_ref,
2565                                  true,
2566                                  true);
2567             msg_ref = new_msg_ref;
2568         }
2569     }
2570 
2571     
2572 
2573 
2574     if( NULL == msg_ref ) {
2575         
2576 
2577 
2578 
2579 
2580 
2581 
2582 
2583 
2584 
2585 
2586         OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
2587                             "crcp:bkmrk: request_complete: No match found for this request :( %d, %d ): [%d/%d,%d]\n",
2588                              peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd,
2589                              breq->req_peer, src, breq->req_comm->c_contextid));
2590         exit_status = OMPI_SUCCESS;
2591         goto DONE;
2592     }
2593 
2594     
2595 
2596 
2597     traffic_message_grab_content(msg_ref, &content_ref, true, true); 
2598 
2599     if( !content_ref->already_drained ) {
2600         peer_ref->total_msgs_recvd += 1;
2601         msg_ref->done++;
2602         msg_ref->active--;
2603     } else {
2604         msg_ref->active_drain--;
2605         content_ref->already_drained = false;
2606     }
2607 
2608     HOKE_CONTENT_REF_RETURN(content_ref);
2609 
2610     if( NULL == new_msg_ref ) {
2611         TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (iRecv) --", true));
2612     } else {
2613         TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (iRecv - Unknown) --", true));
2614     }
2615 
2616     OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
2617                          "crcp:bkmrk: req_complete: Matched an iRecv: total = %d",
2618                          peer_ref->total_msgs_recvd));
2619 
2620  DONE:
2621     return exit_status;
2622 }
2623 
2624 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_recv(
2625                                   void *buf, size_t count,
2626                                   ompi_datatype_t *datatype,
2627                                   int src, int tag,
2628                                   struct ompi_communicator_t* comm,
2629                                   ompi_status_public_t* status,
2630                                   ompi_crcp_base_pml_state_t* pml_state)
2631 {
2632     ompi_crcp_bkmrk_pml_peer_ref_t    *peer_ref      = NULL;
2633     ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref       = NULL, *new_msg_ref = NULL;
2634     ompi_crcp_bkmrk_pml_state_t       *coord_state   = NULL;
2635     bool found_drain = false;
2636     int exit_status = OMPI_SUCCESS;
2637     int ret;
2638 
2639     OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
2640                          "crcp:bkmrk: pml_recv()"));
2641 
2642     
2643 
2644 
2645 
2646 
2647     if( OMPI_CRCP_PML_PRE == pml_state->state) {
2648         
2649 
2650 
2651         found_drain = false;
2652         if( OMPI_SUCCESS != (ret = drain_message_check_recv(buf, count, datatype,
2653                                                             &src, &tag, comm, NULL, &status,
2654                                                             &found_drain) ) ) {
2655             ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_recv(): Failed trying to find a drained message.");
2656             exit_status = ret;
2657             goto DONE;
2658         }
2659 
2660         if( found_drain ) {
2661             
2662 
2663 
2664 
2665 
2666             
2667             pml_state->state = OMPI_CRCP_PML_DONE;
2668             pml_state->error_code = OMPI_SUCCESS;
2669             return pml_state;
2670         }
2671         
2672 
2673 
2674         else {
2675             
2676 
2677 
2678             if( MPI_ANY_SOURCE == src || src < 0) {
2679                 traffic_message_append(NULL, &(unknown_recv_from_list),
2680                                        COORD_MSG_TYPE_B_RECV,
2681                                        count, datatype, 0, tag, src, comm,
2682                                        &msg_ref);
2683 
2684                 CREATE_COORD_STATE(coord_state, pml_state,
2685                                    NULL, msg_ref);
2686             }
2687             else {
2688                 if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, src, &peer_ref) ) ){
2689                     opal_output(mca_crcp_bkmrk_component.super.output_handle,
2690                                 "crcp:bkmrk: pml_recv(): Failed to find peer_ref\n");
2691                     exit_status = ret;
2692                     goto DONE;
2693                 }
2694 
2695                 traffic_message_append(peer_ref, &(peer_ref->recv_list),
2696                                        COORD_MSG_TYPE_B_RECV,
2697                                        count, datatype, 0, tag, src, comm,
2698                                        &msg_ref);
2699 
2700                 CREATE_COORD_STATE(coord_state, pml_state,
2701                                    peer_ref, msg_ref);
2702             }
2703 
2704             
2705             current_msg_id = msg_ref->msg_id;
2706             current_msg_type = COORD_MSG_TYPE_B_RECV;
2707 
2708             coord_state->p_super.error_code = OMPI_SUCCESS;
2709             return &coord_state->p_super;
2710         }
2711     }
2712     
2713 
2714 
2715 
2716     else if( OMPI_CRCP_PML_POST == pml_state->state) {
2717         ompi_crcp_base_pml_state_t *rtn_state = NULL;
2718 
2719         EXTRACT_COORD_STATE(pml_state, coord_state, rtn_state,
2720                             peer_ref,  msg_ref);
2721 
2722         
2723 
2724 
2725 
2726         if( NULL == peer_ref ) {
2727             src = status->MPI_SOURCE;
2728 
2729             if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, src, &peer_ref) ) ){
2730                 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2731                             "crcp:bkmrk: pml_recv(): Failed to resolve peer_ref (rank %d)\n",
2732                             src);
2733                 exit_status = ret;
2734                 goto DONE;
2735             }
2736 
2737             traffic_message_move(msg_ref,
2738                                  COORD_MSG_TYPE_B_RECV,
2739                                  NULL, &(unknown_recv_from_list),
2740                                  peer_ref, &(peer_ref->recv_list),
2741                                  &new_msg_ref,
2742                                  false,
2743                                  true);
2744             new_msg_ref->done++;
2745             new_msg_ref->active--;
2746         } else {
2747             
2748 
2749 
2750             msg_ref->done++;
2751             msg_ref->active--;
2752         }
2753 
2754         peer_ref->total_msgs_recvd += 1;
2755         current_msg_id = 0;
2756         current_msg_type = COORD_MSG_TYPE_UNKNOWN;
2757 
2758         TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "Recv Done", true));
2759 
2760         CRCP_COORD_STATE_RETURN(coord_state);
2761 
2762         rtn_state->error_code = OMPI_SUCCESS;
2763         return rtn_state;
2764     }
2765 
2766  DONE:
2767     pml_state->error_code = exit_status;
2768     return pml_state;
2769 }
2770 
2771 
2772 
2773 
2774 static ompi_request_type_t * coord_start_req_types = NULL;
2775 
2776 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_start(
2777                                   size_t count,
2778                                   ompi_request_t** requests,
2779                                   ompi_crcp_base_pml_state_t* pml_state )
2780 {
2781     int ret, exit_status = OMPI_SUCCESS;
2782     mca_pml_base_request_t *breq = NULL;
2783     size_t tmp_ddt_size  = 0;
2784     size_t iter_req;
2785     bool found_drain = false;
2786 
2787     OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
2788                         "crcp:bkmrk: pml_start()"));
2789 
2790     
2791 
2792 
2793     if( OMPI_CRCP_PML_POST == pml_state->state ) {
2794         for(iter_req = 0; iter_req < count; iter_req++) {
2795             breq = (mca_pml_base_request_t *)requests[iter_req];
2796             if(breq->req_type == MCA_PML_REQUEST_SEND ) {
2797                 if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_pml_start_isend_init(&(requests[iter_req]))) ) {
2798                     exit_status = ret;
2799                     goto DONE;
2800                 }
2801             }
2802         }
2803     }
2804 
2805     
2806 
2807 
2808 
2809 
2810     if( OMPI_CRCP_PML_PRE == pml_state->state ) {
2811         
2812 
2813 
2814         coord_start_req_types = (ompi_request_type_t *)malloc(sizeof(ompi_request_type_t) * count);
2815         for(iter_req = 0; iter_req < count; iter_req++) {
2816             coord_start_req_types[iter_req] = OMPI_REQUEST_NOOP;
2817         }
2818 
2819         for(iter_req = 0; iter_req < count; iter_req++) {
2820             breq = (mca_pml_base_request_t *)requests[iter_req];
2821             ompi_datatype_type_size(breq->req_datatype, &tmp_ddt_size);
2822 
2823             if( breq->req_type == MCA_PML_REQUEST_RECV ) {
2824                 found_drain = false;
2825                 if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_pml_start_drain_irecv_init(&(requests[iter_req]), &found_drain)) ) {
2826                     exit_status = ret;
2827                     goto DONE;
2828                 }
2829 
2830                 if( found_drain ) {
2831                     coord_start_req_types[iter_req] = requests[iter_req]->req_type;
2832                     requests[iter_req]->req_type = OMPI_REQUEST_NOOP;
2833                     requests[iter_req]->req_complete = true;
2834                 }
2835             }
2836         }
2837         goto DONE;
2838     }
2839     else if( OMPI_CRCP_PML_POST == pml_state->state) {
2840         for(iter_req = 0; iter_req < count; iter_req++) {
2841             breq = (mca_pml_base_request_t *)requests[iter_req];
2842             ompi_datatype_type_size(breq->req_datatype, &tmp_ddt_size);
2843 
2844             if (breq->req_type == MCA_PML_REQUEST_RECV) {
2845                 
2846 
2847 
2848 
2849 
2850 
2851                 if( NULL != coord_start_req_types ) {
2852                     if( OMPI_REQUEST_NOOP != coord_start_req_types[iter_req] ) {
2853                         requests[iter_req]->req_type = coord_start_req_types[iter_req];
2854                         continue;
2855                     }
2856                 }
2857 
2858                 if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_pml_start_irecv_init(&(requests[iter_req]))) ) {
2859                     exit_status = ret;
2860                     goto DONE;
2861                 }
2862             }
2863         }
2864 
2865         
2866 
2867 
2868         if( NULL != coord_start_req_types ) {
2869             free(coord_start_req_types);
2870             coord_start_req_types = NULL;
2871         }
2872     }
2873 
2874  DONE:
2875     pml_state->error_code = exit_status;
2876     return pml_state;
2877 }
2878 
2879 
2880 int ompi_crcp_bkmrk_request_complete(struct ompi_request_t *request)
2881 {
2882     int ret, exit_status = OMPI_SUCCESS;
2883     ompi_crcp_bkmrk_pml_peer_ref_t    *peer_ref = NULL;
2884     mca_pml_base_request_t *breq;
2885     size_t tmp_ddt_size  = 0;
2886     int src, tag;
2887 
2888     OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
2889                         "crcp:bkmrk: pml_request_complete()"));
2890 
2891     
2892 
2893 
2894     breq = (mca_pml_base_request_t *)request;
2895 
2896     if( (breq->req_type   != MCA_PML_REQUEST_SEND &&
2897          breq->req_type   != MCA_PML_REQUEST_RECV ) || 
2898         request->req_type == OMPI_REQUEST_NOOP ||
2899         request->req_type == OMPI_REQUEST_NULL) {
2900         exit_status = OMPI_SUCCESS;
2901         goto DONE;
2902     }
2903 
2904     
2905     src = breq->req_peer;
2906     tag = breq->req_tag;
2907     ompi_datatype_type_size(breq->req_datatype, &tmp_ddt_size);
2908 
2909     
2910 
2911 
2912     if( MPI_ANY_SOURCE == src ) {
2913         if( OMPI_SUCCESS != (ret = find_peer_in_comm(breq->req_comm, request->req_status.MPI_SOURCE, &peer_ref) ) ){
2914             opal_output(mca_crcp_bkmrk_component.super.output_handle,
2915                         "crcp:bkmrk: req_complete(): Failed to find peer_ref\n");
2916             exit_status = ret;
2917             goto DONE;
2918         }
2919     } else {
2920         if( OMPI_SUCCESS != (ret = find_peer_in_comm(breq->req_comm, src,  &peer_ref) ) ){
2921             opal_output(mca_crcp_bkmrk_component.super.output_handle,
2922                         "crcp:bkmrk: req_complete(): Failed to find peer_ref\n");
2923             exit_status = ret;
2924             goto DONE;
2925         }
2926     }
2927 
2928     
2929 
2930 
2931     if(breq->req_type == MCA_PML_REQUEST_SEND ) {
2932         
2933 
2934 
2935         if( false == request->req_persistent ) {
2936             if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_request_complete_isend(request, peer_ref,
2937                                                                              src, tag, tmp_ddt_size) ) ) {
2938                 exit_status = ret;
2939                 goto DONE;
2940             }
2941         }
2942         
2943 
2944 
2945         else {
2946             if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_request_complete_isend_init(request, peer_ref,
2947                                                                                   src, tag, tmp_ddt_size) ) ) {
2948                 exit_status = ret;
2949                 goto DONE;
2950             }
2951         }
2952     }
2953     
2954 
2955 
2956     else if(breq->req_type == MCA_PML_REQUEST_RECV) {
2957         
2958 
2959 
2960         if( false == request->req_persistent ) {
2961             if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_request_complete_irecv(request, peer_ref,
2962                                                                              src, tag, tmp_ddt_size) ) ) {
2963                 exit_status = ret;
2964                 goto DONE;
2965             }
2966         }
2967         
2968 
2969 
2970         else {
2971             if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_request_complete_irecv_init(request, peer_ref,
2972                                                                                   src, tag, tmp_ddt_size) ) ) {
2973                 exit_status = ret;
2974                 goto DONE;
2975             }
2976         }
2977     }
2978 
2979  DONE:
2980     return exit_status;
2981 }
2982 
2983 
2984 int ompi_crcp_bkmrk_pml_quiesce_start(ompi_crcp_bkmrk_pml_quiesce_tag_type_t tag ) {
2985     int ret, exit_status = OMPI_SUCCESS;
2986 
2987     if( OMPI_SUCCESS != (ret = ft_event_coordinate_peers()) ) {
2988         exit_status = ret;
2989     }
2990 
2991     return exit_status;
2992 }
2993 
2994 int ompi_crcp_bkmrk_pml_quiesce_end(ompi_crcp_bkmrk_pml_quiesce_tag_type_t tag ) {
2995     int ret, exit_status = OMPI_SUCCESS;
2996 
2997     if( OMPI_SUCCESS != (ret = ft_event_finalize_exchange() ) ) {
2998         exit_status = ret;
2999     }
3000 
3001     return exit_status;
3002 }
3003 
/*
 * Fault-tolerance event handler for the PML coordination layer.
 *
 * state      One of the OPAL_CRS_* values handled below (CHECKPOINT,
 *            CONTINUE, RESTART, TERM); anything else is a no-op.
 * pml_state  State object; its 'state' field selects whether this is the
 *            PRE or POST invocation. It is returned with error_code set to
 *            the exit status.
 *
 * Each branch only acts in one phase: CHECKPOINT in OMPI_CRCP_PML_PRE,
 * CONTINUE and RESTART in OMPI_CRCP_PML_POST.
 *
 * The checkpoint branch can stall: when stall_for_completion is set after
 * the quiesce step, the function records step_to_return_to = 1, returns
 * OMPI_EXISTS through DONE_STALL, and the next invocation jumps straight
 * back to STEP_1.
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_ft_event(
                                  int state,
                                  ompi_crcp_base_pml_state_t* pml_state)
{
    /* Non-zero when the previous call stalled and must resume at STEP_1. */
    static int step_to_return_to = 0;
    /* Toggled on every CONTINUE/POST invocation (see below). */
    static bool first_continue_pass = false;
    opal_list_item_t* item = NULL;
    int exit_status = OMPI_SUCCESS;
    int ret;

    ft_event_state = state;

    /* Resuming a stalled checkpoint: skip the checks and timers below and
     * re-enter the checkpoint branch at STEP_1, whatever 'state' is. */
    if( step_to_return_to == 1 ) {
        goto STEP_1;
    }

    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_ft_event()"));

    /*
     * Checkpoint: handled in the PRE phase only.
     */
    if(OPAL_CRS_CHECKPOINT == state) {
        if( OMPI_CRCP_PML_PRE != pml_state->state){
            goto DONE;
        }

        /* Optional fence used when timing barriers are enabled. */
        if( opal_cr_timing_barrier_enabled ) {
            OPAL_CR_SET_TIMER(OPAL_CR_TIMER_CRCPBR0);
            if( OMPI_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
                exit_status = ret;
                goto DONE;
            }
        }
        OPAL_CR_SET_TIMER(OPAL_CR_TIMER_CRCP0);

        START_TIMER(CRCP_TIMER_TOTAL_CKPT);
    STEP_1:
        /* Reached either by falling through or via the resume jump above. */
        step_to_return_to = 0;

        /*
         * Run the quiesce/coordination step with the peers.
         */
        if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_pml_quiesce_start(QUIESCE_TAG_CKPT)) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: %s ft_event: Checkpoint Coordination Failed %d",
                        OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                        ret);
            exit_status = ret;
            goto DONE;
        }

        /* A stall was requested: arm the resume point and leave through
         * DONE_STALL so that step_to_return_to and ft_event_state are not
         * reset. The caller sees OMPI_EXISTS. */
        if( stall_for_completion ) {
            stall_for_completion = false;
            opal_cr_stall_check  = true;
            step_to_return_to    = 1;

            exit_status = OMPI_EXISTS;
            goto DONE_STALL;
        }
        END_TIMER(CRCP_TIMER_TOTAL_CKPT);

        DISPLAY_ALL_TIMERS(state);
        clear_timers();
    }
    /*
     * Continue: handled in the POST phase only.
     */
    else if(OPAL_CRS_CONTINUE == state) {
        if( OMPI_CRCP_PML_POST != pml_state->state){
            goto DONE;
        }

        first_continue_pass = !first_continue_pass;

        /* With opal_cr_continue_like_restart set, every other CONTINUE/POST
         * call (the ones where the toggle just became true) does nothing. */
        if (opal_cr_continue_like_restart && first_continue_pass) {
            goto DONE;
        }

        START_TIMER(CRCP_TIMER_TOTAL_CONT);

        /*
         * End the quiesce period started at checkpoint time.
         */
        if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_pml_quiesce_end(QUIESCE_TAG_CONTINUE) ) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_ft_event: Checkpoint Finalization Failed %d",
                        ret);
            exit_status = ret;
            goto DONE;
        }
        END_TIMER(CRCP_TIMER_TOTAL_CONT);

        DISPLAY_ALL_TIMERS(state);
        clear_timers();

        /* Optional fence used when timing barriers are enabled. */
        if( opal_cr_timing_barrier_enabled ) {
            OPAL_CR_SET_TIMER(OPAL_CR_TIMER_COREBR1);
            if( OMPI_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
                exit_status = ret;
                goto DONE;
            }
        }
        OPAL_CR_SET_TIMER(OPAL_CR_TIMER_CORE2);
    }
    /*
     * Restart: handled in the POST phase only.
     */
    else if(OPAL_CRS_RESTART == state) {
        if( OMPI_CRCP_PML_POST != pml_state->state){
            goto DONE;
        }

        START_TIMER(CRCP_TIMER_TOTAL_RST);
        /*
         * Walk every known peer reference.
         */
        for(item  = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
            item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
            item  = opal_list_get_next(item) ) {
            ompi_crcp_bkmrk_pml_peer_ref_t *cur_peer_ref;
            cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;

            /* Overwrite the stored jobid with this process's current one. */
            cur_peer_ref->proc_name.jobid = OMPI_PROC_MY_NAME->jobid;
        }

        /*
         * Same finalize step as the continue path, called directly.
         */
        if( OMPI_SUCCESS != (ret = ft_event_finalize_exchange() ) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_ft_event: Checkpoint Finalization Failed %d",
                        ret);
            exit_status = ret;
            goto DONE;
        }

        END_TIMER(CRCP_TIMER_TOTAL_RST);

        DISPLAY_ALL_TIMERS(state);
        clear_timers();
    }
    /*
     * Terminate: nothing to do.
     */
    else if(OPAL_CRS_TERM == state ) {
        goto DONE;
    }
    /*
     * Any other state: nothing to do.
     */
    else {
        goto DONE;
    }

 DONE:
    /* Normal exit: forget any resume point and mark the layer as running. */
    step_to_return_to = 0;
    ft_event_state = OPAL_CRS_RUNNING;

 DONE_STALL:
    /* Stall exit joins here, skipping the two resets above. */
    pml_state->error_code = exit_status;
    return pml_state;
}
3170 
3171 
3172 
3173 
3174 
3175 
3176 
3177 
3178 static int traffic_message_append(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
3179                                   opal_list_t * append_list,
3180                                   ompi_crcp_bkmrk_pml_message_type_t msg_type,
3181                                   size_t count,
3182                                   ompi_datatype_t *datatype,
3183                                   size_t in_ddt_size,
3184                                   int tag,
3185                                   int dest,
3186                                   struct ompi_communicator_t* comm,
3187                                   ompi_crcp_bkmrk_pml_traffic_message_ref_t **msg_ref)
3188 {
3189     int ret, exit_status = OMPI_SUCCESS;
3190     size_t ddt_size = 0;
3191 
3192     if( NULL != datatype ) {
3193         ompi_datatype_type_size(datatype,
3194                            &ddt_size);
3195     } else {
3196         ddt_size = in_ddt_size;
3197         
3198     }
3199 
3200     
3201 
3202 
3203 
3204 
3205     if( OMPI_SUCCESS != (ret = traffic_message_find(append_list,
3206                                                     count, tag, dest,
3207                                                     comm->c_contextid,
3208                                                     ddt_size,
3209                                                     msg_ref,
3210                                                     FIND_MSG_UNKNOWN  
3211                                                     ) ) ) {
3212         opal_output(mca_crcp_bkmrk_component.super.output_handle,
3213                     "crcp:bkmrk: traffic_message_append: Unable to find the proper message reference.\n");
3214         return OMPI_ERROR;
3215     }
3216 
3217     if( NULL != *msg_ref ) {
3218         if( msg_type == COORD_MSG_TYPE_P_SEND ||
3219             msg_type == COORD_MSG_TYPE_P_RECV ) {
3220             (*msg_ref)->posted++;
3221         } else {
3222             (*msg_ref)->active++;
3223         }
3224     } else {
3225         if( NULL != peer_ref ) {
3226             CREATE_NEW_MSG((*msg_ref), msg_type,
3227                            count, ddt_size, tag, dest, comm,
3228                            peer_ref->proc_name.jobid,
3229                            peer_ref->proc_name.vpid);
3230         } else {
3231             CREATE_NEW_MSG((*msg_ref), msg_type,
3232                            count, ddt_size, tag, dest, comm,
3233                            ORTE_JOBID_INVALID, ORTE_VPID_INVALID);
3234         }
3235 
3236         if( msg_type == COORD_MSG_TYPE_P_SEND ||
3237             msg_type == COORD_MSG_TYPE_P_RECV ) {
3238             (*msg_ref)->matched        = 0;
3239             (*msg_ref)->done           = 0;
3240             (*msg_ref)->active         = 0;
3241             (*msg_ref)->posted         = 1;
3242         } else {
3243             (*msg_ref)->matched        = 0;
3244             (*msg_ref)->done           = 0;
3245             (*msg_ref)->active         = 1;
3246             (*msg_ref)->posted         = 0;
3247         }
3248 
3249         opal_list_append(append_list, &((*msg_ref)->super));
3250     }
3251 
3252     if( NULL != peer_ref ) {
3253         if( msg_type == COORD_MSG_TYPE_B_SEND ) {
3254             TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (send)      --", true));
3255         }
3256         else if( msg_type == COORD_MSG_TYPE_P_SEND ) {
3257             TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (send_init) --", true));
3258         }
3259         else if( msg_type == COORD_MSG_TYPE_B_RECV ) {
3260             TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (recv)      --", true));
3261         }
3262         else if( msg_type == COORD_MSG_TYPE_P_RECV ) {
3263             TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (recv_init) --", true));
3264         }
3265         else if( msg_type == COORD_MSG_TYPE_I_SEND || msg_type == COORD_MSG_TYPE_I_RECV ) {
3266             ;
3267         }
3268         else {
3269             TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (Unknown)   --", true));
3270         }
3271     }
3272 
3273     return exit_status;
3274 }
3275 
3276 static int traffic_message_start(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
3277                                  ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
3278                                  ompi_request_t **request,
3279                                  opal_list_t * peer_list,
3280                                  ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref)
3281 {
3282     
3283 
3284 
3285     msg_ref->active++;
3286 
3287     traffic_message_find_mark_persistent(msg_ref, request,
3288                                          false,  
3289                                          true, 
3290                                          content_ref);
3291     return OMPI_SUCCESS;
3292 }
3293 
3294 static int traffic_message_move(ompi_crcp_bkmrk_pml_traffic_message_ref_t *old_msg_ref,
3295                                 ompi_crcp_bkmrk_pml_message_type_t msg_type,
3296                                 ompi_crcp_bkmrk_pml_peer_ref_t *from_peer_ref,
3297                                 opal_list_t * from_list,
3298                                 ompi_crcp_bkmrk_pml_peer_ref_t *to_peer_ref,
3299                                 opal_list_t * to_list,
3300                                 ompi_crcp_bkmrk_pml_traffic_message_ref_t **new_msg_ref,
3301                                 bool keep_active,
3302                                 bool remove)
3303 {
3304     int ret;
3305     ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL, *prev_content = NULL;
3306     ompi_request_t *request = NULL;
3307     bool loc_already_drained = false;
3308 
3309     
3310     if( COORD_MSG_TYPE_B_RECV != msg_type ) {
3311         traffic_message_grab_content(old_msg_ref, &prev_content, remove, true); 
3312         request = prev_content->request;
3313 
3314         loc_already_drained = prev_content->already_drained;
3315 
3316         if( remove ) {
3317             prev_content->request = NULL;
3318             HOKE_CONTENT_REF_RETURN(prev_content);
3319         }
3320     }
3321 
3322     ret = traffic_message_append(to_peer_ref, to_list,
3323                                  old_msg_ref->msg_type,
3324                                  old_msg_ref->count,
3325                                  NULL,
3326                                  old_msg_ref->ddt_size,
3327                                  old_msg_ref->tag,
3328                                  old_msg_ref->rank,
3329                                  old_msg_ref->comm,
3330                                  new_msg_ref);
3331 
3332     if( loc_already_drained ) {
3333         old_msg_ref->active_drain--;
3334         (*new_msg_ref)->active--; 
3335         (*new_msg_ref)->active_drain++;
3336     } else {
3337         
3338         old_msg_ref->active--;
3339     }
3340 
3341     if( msg_type == COORD_MSG_TYPE_P_SEND ||
3342         msg_type == COORD_MSG_TYPE_P_RECV ) {
3343         if( keep_active ) {
3344             (*new_msg_ref)->active++;
3345         }
3346     }
3347 
3348     if( COORD_MSG_TYPE_B_RECV != msg_type && NULL == request ) {
3349         ERROR_SHOULD_NEVER_HAPPEN("Error: Must match a non-blocking send, and there is no matching request.");
3350     }
3351 
3352     if( NULL != request ) {
3353         HOKE_CONTENT_REF_ALLOC(new_content);
3354         new_content->buffer  =  NULL;
3355         new_content->request =  request;
3356         new_content->done    =  false;
3357         new_content->active  =  keep_active;
3358         new_content->already_posted  = true;
3359         new_content->already_drained = loc_already_drained;
3360         OBJ_RETAIN(request);
3361         opal_list_append(&((*new_msg_ref)->msg_contents), &(new_content->super) );
3362     }
3363 
3364     if( NULL == from_peer_ref && NULL != to_peer_ref ) {
3365         (*new_msg_ref)->proc_name.jobid = to_peer_ref->proc_name.jobid;
3366         (*new_msg_ref)->proc_name.vpid  = to_peer_ref->proc_name.vpid;
3367     }
3368 
3369     return ret;
3370 }
3371 
3372 static int traffic_message_find_mark_persistent(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
3373                                                 ompi_request_t **request,
3374                                                 bool cur_active,
3375                                                 bool set_is_active,
3376                                                 ompi_crcp_bkmrk_pml_message_content_ref_t **c_ref)
3377 {
3378     mca_pml_base_request_t * breq = NULL;
3379     opal_list_item_t* item = NULL;
3380 
3381     breq = (mca_pml_base_request_t *)(*request);
3382 
3383     for(item  = opal_list_get_first(&(msg_ref->msg_contents));
3384         item != opal_list_get_end(  &(msg_ref->msg_contents));
3385         item  = opal_list_get_next(item) ) {
3386         ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
3387         mca_pml_base_request_t * loc_breq = NULL;
3388 
3389         content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)item;
3390         loc_breq    = (mca_pml_base_request_t *)(content_ref->request);
3391 
3392         if( content_ref->active != cur_active ) {
3393             continue;
3394         }
3395         else if( loc_breq->req_sequence == breq->req_sequence ) {
3396             OPAL_OUTPUT_VERBOSE((25, mca_crcp_bkmrk_component.super.output_handle,
3397                                  "%s %8s Request [%d] (%s) %d : %d",
3398                                  OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
3399                                  (set_is_active ? "Start" : (NULL != c_ref ? "Drain" : "Complete")),
3400                                  (int)msg_ref->msg_id,
3401                                  (content_ref->active ? "T" : "F"),
3402                                  (int)loc_breq->req_sequence,
3403                                  (int)breq->req_sequence));
3404 
3405             content_ref->active = set_is_active;
3406             if( NULL != c_ref ) {
3407                 *c_ref = content_ref;
3408             }
3409             break;
3410         }
3411     }
3412 
3413     return OMPI_SUCCESS;
3414 }
3415 
3416 static int traffic_message_grab_content(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
3417                                         ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref,
3418                                         bool remove,
3419                                         bool already_drained)
3420 {
3421     ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL;
3422     ompi_crcp_bkmrk_pml_message_content_ref_t *loc_content_ref = NULL;
3423     opal_list_item_t* item = NULL;
3424 
3425     
3426 
3427 
3428     if( 0 >= opal_list_get_size(&msg_ref->msg_contents) ) {
3429         return OMPI_SUCCESS;
3430     }
3431 
3432     
3433 
3434 
3435 
3436     if( already_drained ) {
3437         item  = opal_list_get_first(&(msg_ref->msg_contents));
3438         new_content = (ompi_crcp_bkmrk_pml_message_content_ref_t*)item;
3439     }
3440 
3441     for(item  = opal_list_get_first(&(msg_ref->msg_contents));
3442         item != opal_list_get_end(  &(msg_ref->msg_contents));
3443         item  = opal_list_get_next(item) ) {
3444         loc_content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)item;
3445 
3446         if( !already_drained ) {
3447             TRAFFIC_MSG_DUMP_MSG_CONTENT_INDV(10, (loc_content_ref));
3448         }
3449 
3450         if( loc_content_ref->already_drained == already_drained ) {
3451             new_content = (ompi_crcp_bkmrk_pml_message_content_ref_t*)item;
3452             break;
3453         }
3454     }
3455 
3456     if( remove ) {
3457         opal_list_remove_item(&msg_ref->msg_contents, &(new_content->super));
3458     }
3459 
3460     if( NULL != content_ref ) {
3461         *content_ref = new_content;
3462     } else if( remove && NULL != new_content ) {
3463         HOKE_CONTENT_REF_RETURN(new_content);
3464     }
3465 
3466     return OMPI_SUCCESS;
3467 }
3468 
3469 static int traffic_message_create_drain_message(bool post_drain,
3470                                                 int max_post,
3471                                                 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
3472                                                 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_msg_ref,
3473                                                 int *num_posted)
3474 {
3475     ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref = NULL;
3476     ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL, *prev_content = NULL;
3477     int m_iter, m_total;
3478 
3479     *num_posted = 0;
3480 
3481     
3482 
3483 
3484     if( NULL == (*posted_msg_ref) || max_post <= 0) {
3485         return OMPI_SUCCESS;
3486     }
3487 
3488     
3489 
3490 
3491     m_total = max_post;
3492     if( !post_drain && max_post > (*posted_msg_ref)->active ) {
3493         m_total = (*posted_msg_ref)->active;
3494     }
3495 
3496     OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
3497                          "crcp:bkmrk: %s <-- %s "
3498                          " --> Create Drain Msg: %s %4d = min(%4d / %4d)",
3499                          OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
3500                          OMPI_NAME_PRINT(&(peer_ref->proc_name)),
3501                          (post_drain ? "Posting" : "Not Posting"),
3502                          m_total, (*posted_msg_ref)->active, max_post ));
3503 
3504     TRAFFIC_MSG_DUMP_MSG_INDV(10, ((*posted_msg_ref), "Drain", true));
3505 
3506     
3507 
3508 
3509     drain_message_append(peer_ref,
3510                          COORD_MSG_TYPE_I_RECV,
3511                          (*posted_msg_ref)->count,
3512                          (*posted_msg_ref)->ddt_size,
3513                          (*posted_msg_ref)->tag,
3514                          (*posted_msg_ref)->rank,
3515                          (*posted_msg_ref)->comm,
3516                          &drain_msg_ref);
3517 
3518     
3519 
3520 
3521     for(m_iter = 0; m_iter < m_total; ++m_iter) {
3522         new_content = NULL;
3523 
3524         OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
3525                              "crcp:bkmrk: %s <-- %s "
3526                              " \t--> Find Content: %s (%4d of %4d)",
3527                              OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
3528                              OMPI_NAME_PRINT(&(peer_ref->proc_name)),
3529                              (post_drain ? "Posting" : "Not Posting"),
3530                              m_iter, m_total));
3531 
3532         
3533 
3534 
3535 
3536 
3537         traffic_message_grab_content((*posted_msg_ref), &prev_content, false, false); 
3538         if( NULL != prev_content ) {
3539             prev_content->already_drained = true;
3540         }
3541 
3542         
3543         if( !post_drain && (*posted_msg_ref)->msg_type != COORD_MSG_TYPE_B_RECV ) {
3544             assert( NULL != prev_content );
3545         }
3546 
3547         
3548         if( NULL != prev_content ) {
3549             (*posted_msg_ref)->active--;
3550         }
3551         (*posted_msg_ref)->active_drain++;
3552 
3553         
3554         HOKE_CONTENT_REF_ALLOC(new_content);
3555         new_content->buffer  = NULL;
3556         if( NULL == prev_content ) {
3557             new_content->request  = NULL;
3558         } else {
3559             new_content->request = prev_content->request;
3560             if( NULL != new_content->request ) {
3561                 OBJ_RETAIN(new_content->request);
3562             }
3563         }
3564         opal_list_append(&(drain_msg_ref->msg_contents), &(new_content->super) );
3565 
3566         if( !post_drain ) {
3567             new_content->done            = false;
3568             new_content->active          = true;
3569             new_content->already_posted  = true;
3570             new_content->already_drained = true;
3571 
3572             drain_msg_ref->active++;
3573             drain_msg_ref->already_posted++;
3574         } else {
3575             new_content->done            = false;
3576             new_content->active          = false;
3577             new_content->already_posted  = false;
3578             new_content->already_drained = true;
3579 
3580             
3581 
3582 
3583 
3584 
3585             ompi_datatype_duplicate(&(ompi_mpi_packed.dt), &(drain_msg_ref->datatype));
3586 
3587             
3588             if(drain_msg_ref->count > 0 ) {
3589                 new_content->buffer = (void *) malloc(drain_msg_ref->count * drain_msg_ref->ddt_size);
3590             } else {
3591                 new_content->buffer = (void *) malloc(1 * drain_msg_ref->ddt_size);
3592             }
3593 
3594             
3595         }
3596 
3597         (*num_posted)++;
3598     }
3599 
3600     peer_ref->total_drained_msgs += *num_posted;
3601 
3602     OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
3603                          "crcp:bkmrk: %s <-- %s "
3604                          "Added %d messages to the drained list (size = %d)",
3605                          OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
3606                          OMPI_NAME_PRINT(&(peer_ref->proc_name)),
3607                          (*num_posted),
3608                          (int)opal_list_get_size(&(peer_ref->drained_list)) ));
3609 
3610     return OMPI_SUCCESS;
3611 }
3612 
3613 static int traffic_message_find_recv(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
3614                              int rank, uint32_t comm_id, int tag,
3615                              size_t count, size_t datatype_size,
3616                              ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_recv_msg_ref,
3617                              ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_irecv_msg_ref,
3618                              ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_precv_msg_ref,
3619                              ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_unknown_recv_msg_ref,
3620                              ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_unknown_precv_msg_ref)
3621 {
3622     int ret;
3623 
3624     *posted_recv_msg_ref  = NULL;
3625     *posted_irecv_msg_ref = NULL;
3626     *posted_precv_msg_ref = NULL;
3627     *posted_unknown_recv_msg_ref  = NULL;
3628     *posted_unknown_precv_msg_ref = NULL;
3629 
3630     
3631 
3632 
3633     if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->recv_list),
3634                                                     count, tag, INVALID_INT,
3635                                                     comm_id, datatype_size,
3636                                                     posted_recv_msg_ref,
3637                                                     FIND_MSG_UNKNOWN) ) ) {
3638         opal_output(mca_crcp_bkmrk_component.super.output_handle,
3639                     "crcp:bkmrk: traffic_message_find_recv: Unable to find the proper message reference.\n");
3640         return OMPI_ERROR;
3641     }
3642 
3643     
3644 
3645 
3646     if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->irecv_list),
3647                                                     count, tag, INVALID_INT,
3648                                                     comm_id, datatype_size,
3649                                                     posted_irecv_msg_ref,
3650                                                     FIND_MSG_UNKNOWN) ) ) {
3651         opal_output(mca_crcp_bkmrk_component.super.output_handle,
3652                     "crcp:bkmrk: traffic_message_find_recv: Unable to find the proper message reference.\n");
3653         return OMPI_ERROR;
3654     }
3655 
3656     
3657 
3658 
3659     if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->recv_init_list),
3660                                                     count, tag, INVALID_INT,
3661                                                     comm_id, datatype_size,
3662                                                     posted_precv_msg_ref,
3663                                                     FIND_MSG_UNKNOWN) ) ) {
3664         opal_output(mca_crcp_bkmrk_component.super.output_handle,
3665                     "crcp:bkmrk: traffic_message_find_recv: Unable to find the proper message reference.\n");
3666         return OMPI_ERROR;
3667     }
3668 
3669     
3670 
3671 
3672     if( OMPI_SUCCESS != (ret = traffic_message_find(&(unknown_recv_from_list),
3673                                                     count, tag, INVALID_INT,
3674                                                     comm_id, datatype_size,
3675                                                     posted_unknown_recv_msg_ref,
3676                                                     FIND_MSG_UNKNOWN) ) ) {
3677         opal_output(mca_crcp_bkmrk_component.super.output_handle,
3678                     "crcp:bkmrk: traffic_message_find_recv: Unable to find the proper message reference.\n");
3679         return OMPI_ERROR;
3680     }
3681 
3682     
3683 
3684 
3685     if( OMPI_SUCCESS != (ret = traffic_message_find(&(unknown_persist_recv_list),
3686                                                     count, tag, INVALID_INT,
3687                                                     comm_id, datatype_size,
3688                                                     posted_unknown_precv_msg_ref,
3689                                                     FIND_MSG_UNKNOWN) ) ) {
3690         opal_output(mca_crcp_bkmrk_component.super.output_handle,
3691                     "crcp:bkmrk: traffic_message_find_recv: Unable to find the proper message reference.\n");
3692         return OMPI_ERROR;
3693     }
3694 
3695     
3696 
3697 
3698 
3699 
3700 
3701     return OMPI_SUCCESS;
3702 }
3703 
3704 static int traffic_message_find(opal_list_t * search_list,
3705                                 size_t count, int tag, int peer,
3706                                 uint32_t comm_id, size_t ddt_size,
3707                                 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** found_msg_ref,
3708                                 int active )
3709 {
3710     opal_list_item_t* item = NULL;
3711 
3712     *found_msg_ref = NULL;
3713 
3714 #if OPAL_ENABLE_DEBUG == 1
3715     
3716 
3717 
3718     if( NULL == search_list) {
3719         opal_output(0, "WARNING (Debug): Search_list NULL! (%s:%d)", __FILE__, __LINE__);
3720         return OMPI_ERROR;
3721     }
3722 #endif
3723 
3724     
3725 
3726 
3727     for(item  = opal_list_get_last(search_list);
3728         item != opal_list_get_begin(search_list);
3729         item  = opal_list_get_prev(item) ) {
3730         ompi_crcp_bkmrk_pml_traffic_message_ref_t * msg_ref;
3731         msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)item;
3732 
3733         if( active != FIND_MSG_UNKNOWN ) {
3734             if( active == PERSIST_MARKER ) {
3735                 if( 0 >= msg_ref->posted ) {
3736                     continue;
3737                 }
3738             }
3739             else if( (active == FIND_MSG_TRUE  && 0 >= (msg_ref->active + msg_ref->active_drain) ) ||
3740                      (active == FIND_MSG_FALSE && 0 <= (msg_ref->active + msg_ref->active_drain) ) ) {
3741                 continue;
3742             }
3743         }
3744 
3745         if(msg_ref->count == count  &&
3746            (NULL != msg_ref->comm && msg_ref->comm->c_contextid == comm_id) &&
3747            (msg_ref->tag  == MPI_ANY_TAG || msg_ref->tag  == tag)   &&
3748            (peer          == INVALID_INT || msg_ref->rank == peer)  &&
3749            msg_ref->ddt_size == ddt_size) {
3750 
3751             OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
3752                                 "crcp:bkmrk: traffic_message_find: Found Message -- Comm list (%d, %d)\n",
3753                                 tag, peer));
3754 
3755             *found_msg_ref = msg_ref;
3756             return OMPI_SUCCESS;
3757         }
3758     }
3759 
3760     return OMPI_SUCCESS;
3761 }
3762 
3763 
3764 
3765 
3766 
3767 static int drain_message_append(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
3768                                 ompi_crcp_bkmrk_pml_message_type_t msg_type,
3769                                 size_t count, size_t ddt_size,
3770                                 int tag,int dest,
3771                                 struct ompi_communicator_t* comm,
3772                                 ompi_crcp_bkmrk_pml_drain_message_ref_t **msg_ref)
3773 {
3774     int ret, exit_status = OMPI_SUCCESS;
3775     ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
3776 
3777     
3778 
3779 
3780 
3781 
3782     if( OMPI_SUCCESS != (ret = drain_message_find(&(peer_ref->drained_list),
3783                                                   count, tag, dest,
3784                                                   comm->c_contextid,
3785                                                   ddt_size,
3786                                                   msg_ref,
3787                                                   &content_ref) ) ) {
3788         opal_output(mca_crcp_bkmrk_component.super.output_handle,
3789                     "crcp:bkmrk: traffic_message_append: Unable to find the proper message reference.\n");
3790         return OMPI_ERROR;
3791     }
3792 
3793     if( NULL == *msg_ref ) {
3794         CREATE_NEW_DRAIN_MSG((*msg_ref), msg_type,
3795                              count, NULL, tag, dest, comm,
3796                              peer_ref->proc_name.jobid,
3797                              peer_ref->proc_name.vpid);
3798 
3799         (*msg_ref)->done           = 0;
3800         (*msg_ref)->active         = 0;
3801         (*msg_ref)->already_posted = 0;
3802 
3803         opal_list_append(&(peer_ref->drained_list), &((*msg_ref)->super));
3804     }
3805     
3806 
3807     return exit_status;
3808 }
3809 
3810 static int drain_message_remove(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
3811                                 ompi_crcp_bkmrk_pml_drain_message_ref_t *msg_ref,
3812                                 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref)
3813 {
3814     
3815 
3816 
3817     opal_list_remove_item(&(msg_ref->msg_contents), &(content_ref->super));
3818     HOKE_CONTENT_REF_RETURN(content_ref);
3819 
3820     
3821 
3822 
3823 
3824     if( 0 >= opal_list_get_size(&(msg_ref->msg_contents) ) ) {
3825         TRAFFIC_MSG_DUMP_DRAIN_MSG_INDV(10, (msg_ref, "D*remove", true));
3826         opal_list_remove_item(&(peer_ref->drained_list), &(msg_ref->super));
3827         HOKE_DRAIN_MSG_REF_RETURN(msg_ref);
3828     } else {
3829         TRAFFIC_MSG_DUMP_DRAIN_MSG_INDV(10, (msg_ref, "Dremove", true));
3830     }
3831 
3832     return OMPI_SUCCESS;
3833 }
3834 
3835 static int drain_message_check_recv(void **buf, size_t count,
3836                                     ompi_datatype_t *datatype,
3837                                     int *src, int *tag,
3838                                     struct ompi_communicator_t* comm,
3839                                     struct ompi_request_t **request,
3840                                     ompi_status_public_t** status,
3841                                     bool *found_drain)
3842 {
3843     int ret, exit_status = OMPI_SUCCESS;
3844     ompi_crcp_bkmrk_pml_peer_ref_t    *peer_ref      = NULL;
3845     ompi_crcp_bkmrk_pml_drain_message_ref_t   *drain_msg_ref = NULL;
3846     ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
3847     size_t tmp_ddt_size  = 0;
3848 
3849     *found_drain = false;
3850 
3851     ompi_datatype_type_size(datatype, &tmp_ddt_size);
3852 
3853     
3854 
3855 
3856     if( OMPI_SUCCESS != (ret = drain_message_find_any(count, *tag, *src,
3857                                                       comm, tmp_ddt_size,
3858                                                       &drain_msg_ref,
3859                                                       &content_ref,
3860                                                       &peer_ref) ) ) {
3861         ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: drain_check_recv(): Failed trying to find a drained message.");
3862         exit_status = ret;
3863         goto DONE;
3864     }
3865 
3866     
3867 
3868 
3869 
3870 
3871 
3872     if( NULL != drain_msg_ref ) {
3873         OPAL_OUTPUT_VERBOSE((12, mca_crcp_bkmrk_component.super.output_handle,
3874                              "crcp:bkmrk: drain_check_recv(): Matched a drained message..."));
3875 
3876         if( OMPI_SUCCESS != (ret = drain_message_copy_remove(drain_msg_ref,
3877                                                              content_ref,
3878                                                              src, tag, request, status,
3879                                                              datatype, count, buf,
3880                                                              peer_ref) ) ) {
3881             opal_output( mca_crcp_bkmrk_component.super.output_handle,
3882                          "crcp:bkmrk: drain_check_recv(): Datatype copy failed (%d)",
3883                          ret);
3884             exit_status = ret;
3885             goto DONE;
3886         }
3887 
3888         peer_ref->total_drained_msgs -= 1;
3889 
3890         *found_drain = true;
3891     }
3892 
3893  DONE:
3894     return exit_status;
3895 }
3896 
3897 static int drain_message_find_any(size_t count, int tag, int peer,
3898                                   struct ompi_communicator_t* comm, size_t ddt_size,
3899                                   ompi_crcp_bkmrk_pml_drain_message_ref_t ** found_msg_ref,
3900                                   ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref,
3901                                   ompi_crcp_bkmrk_pml_peer_ref_t **peer_ref)
3902 {
3903     ompi_crcp_bkmrk_pml_peer_ref_t *cur_peer_ref = NULL;
3904     opal_list_item_t* item = NULL;
3905 
3906     *found_msg_ref = NULL;
3907 
3908     for(item  = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
3909         item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
3910         item  = opal_list_get_next(item) ) {
3911         cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
3912 
3913         
3914 
3915 
3916 
3917         if( MPI_ANY_SOURCE != peer && peer >= 0) {
3918             
3919             if( comm->c_local_group->grp_proc_count <= peer ) {
3920                 continue;
3921             }
3922 
3923             if( OPAL_EQUAL != ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
3924                                                             &(cur_peer_ref->proc_name),
3925                                                             OMPI_CAST_RTE_NAME(&comm->c_local_group->grp_proc_pointers[peer]->super.proc_name))) {
3926                 continue;
3927             }
3928         }
3929 
3930         drain_message_find(&(cur_peer_ref->drained_list),
3931                            count, tag, peer,
3932                            comm->c_contextid, ddt_size,
3933                            found_msg_ref,
3934                            content_ref);
3935         if( NULL != *found_msg_ref) {
3936             if( NULL != peer_ref ) {
3937                 *peer_ref = cur_peer_ref;
3938             }
3939             return OMPI_SUCCESS;
3940         }
3941     }
3942 
3943     return OMPI_SUCCESS;
3944 }
3945 
3946 static int drain_message_find(opal_list_t * search_list,
3947                               size_t count, int tag, int peer,
3948                               uint32_t comm_id, size_t ddt_size,
3949                               ompi_crcp_bkmrk_pml_drain_message_ref_t ** found_msg_ref,
3950                               ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref)
3951 {
3952     ompi_crcp_bkmrk_pml_drain_message_ref_t * drain_msg = NULL;
3953     opal_list_item_t* item = NULL;
3954 
3955     *found_msg_ref = NULL;
3956     *content_ref   = NULL;
3957 
3958     
3959 
3960 
3961     if( 0 >= opal_list_get_size(search_list) ) {
3962         return OMPI_SUCCESS;
3963     }
3964 
3965     for(item  = opal_list_get_first(search_list);
3966         item != opal_list_get_end(search_list);
3967         item  = opal_list_get_next(item) ) {
3968         drain_msg = (ompi_crcp_bkmrk_pml_drain_message_ref_t*)item;
3969 
3970         OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
3971                              "crcp:bkmrk: find_drain_msg(): Compare [%d, %d, %d, %d] to [%d, %d, %d, %d]",
3972                              (int)ddt_size, (int)count, tag, peer,
3973                              (int)drain_msg->ddt_size, (int)drain_msg->count, (int)drain_msg->tag, (int)drain_msg->rank));
3974 
3975         
3976         if( NULL != drain_msg->comm ) {
3977             if( drain_msg->comm->c_contextid != comm_id ) {
3978                 continue;
3979             }
3980         }
3981 
3982         
3983         if( MPI_ANY_TAG    != tag &&
3984             drain_msg->tag != tag) {
3985             continue;
3986         }
3987 
3988         
3989         if( INVALID_INT != peer ) {
3990             if( MPI_ANY_SOURCE  != peer &&
3991                 drain_msg->rank != peer) {
3992                 continue;
3993             }
3994         }
3995 
3996         
3997         if( ddt_size != PROBE_ANY_SIZE &&
3998             count    != PROBE_ANY_COUNT) {
3999             
4000             if((drain_msg->count   ) != count   ||
4001                (drain_msg->ddt_size) != ddt_size) {
4002                 continue;
4003             }
4004         }
4005 
4006         
4007         *found_msg_ref = drain_msg;
4008         break;
4009     }
4010 
4011     
4012 
4013 
4014     if( NULL != *found_msg_ref ) {
4015         drain_message_grab_content((*found_msg_ref), content_ref );
4016 
4017         
4018         if( NULL == *content_ref ) {
4019             *found_msg_ref = NULL;
4020         }
4021     }
4022 
4023     return OMPI_SUCCESS;
4024 }
4025 
4026 static int drain_message_grab_content(ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref,
4027                                       ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref)
4028 {
4029     ompi_crcp_bkmrk_pml_message_content_ref_t *loc_content_ref = NULL;
4030     opal_list_item_t* item = NULL;
4031 
4032     *content_ref = NULL;
4033 
4034     for(item  = opal_list_get_first(&(drain_msg_ref->msg_contents));
4035         item != opal_list_get_end(&(drain_msg_ref->msg_contents));
4036         item  = opal_list_get_next(item) ) {
4037         loc_content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)item;
4038 
4039         
4040 
4041         if(NULL != loc_content_ref->buffer) {
4042             *content_ref = loc_content_ref;
4043             break;
4044         }
4045     }
4046 
4047     return OMPI_SUCCESS;
4048 }
4049 
4050 static int drain_message_copy_remove_persistent(ompi_crcp_bkmrk_pml_drain_message_ref_t   *drain_msg_ref,
4051                                                 ompi_crcp_bkmrk_pml_message_content_ref_t *drain_content_ref,
4052                                                 ompi_crcp_bkmrk_pml_traffic_message_ref_t *traffic_msg_ref,
4053                                                 ompi_request_t *request,
4054                                                 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref)
4055 {
4056     int ret, exit_status = OMPI_SUCCESS;
4057     ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
4058 
4059     
4060 
4061 
4062     traffic_message_find_mark_persistent(traffic_msg_ref, &request,
4063                                          false, 
4064                                          false, 
4065                                          &content_ref);
4066 
4067     
4068     content_ref->request = request;
4069 
4070     memcpy(&(content_ref->status), &drain_content_ref->status, sizeof(ompi_status_public_t));
4071 
4072     if( 0 != (ret = ompi_datatype_copy_content_same_ddt(drain_msg_ref->datatype,
4073                                                    drain_msg_ref->count,
4074                                                    content_ref->buffer,
4075                                                    drain_content_ref->buffer) ) ) {
4076         opal_output( mca_crcp_bkmrk_component.super.output_handle,
4077                      "crcp:bkmrk: drain_message_copy_remove_p(): Datatype copy failed (%d)",
4078                      ret);
4079         exit_status = ret;
4080     }
4081 
4082     
4083     drain_content_ref->request = NULL;
4084     drain_message_remove(peer_ref, drain_msg_ref, drain_content_ref);
4085 
4086     return exit_status;
4087 }
4088 
4089 static int drain_message_copy_remove(ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref,
4090                                      ompi_crcp_bkmrk_pml_message_content_ref_t * drain_content_ref,
4091                                      int *src, int *tag,
4092                                      struct ompi_request_t **request,
4093                                      ompi_status_public_t **status,
4094                                      ompi_datatype_t *datatype, int count, void **buf,
4095                                      ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref)
4096 {
4097     int ret, exit_status = OMPI_SUCCESS;
4098 
4099     if( NULL != src ) {
4100         *src = drain_msg_ref->rank;
4101     }
4102 
4103     if( NULL != tag ) {
4104         *tag = drain_msg_ref->tag;
4105     }
4106 
4107     if( NULL != request ) {
4108         *request = drain_content_ref->request;
4109         OBJ_RETAIN(*request);
4110     }
4111 
4112     if( NULL != status && MPI_STATUS_IGNORE != *status ) {
4113         memcpy(*status, &drain_content_ref->status, sizeof(ompi_status_public_t));
4114     }
4115 
4116     
4117     if( OPAL_LIKELY(NULL != buf) ) {
4118         if( 0 != (ret = ompi_datatype_copy_content_same_ddt(datatype, count,
4119                                                        (void*)buf, drain_content_ref->buffer) ) ) {
4120             opal_output( mca_crcp_bkmrk_component.super.output_handle,
4121                          "crcp:bkmrk: drain_message_copy_remove(): Datatype copy failed (%d)",
4122                          ret);
4123             exit_status = ret;
4124         }
4125     }
4126     else {
4127         OPAL_OUTPUT_VERBOSE((20, mca_crcp_bkmrk_component.super.output_handle,
4128                              "crcp:bkmrk: drain_message_copy_remove(): Skip copy - NULL buffer"));
4129     }
4130 
4131     
4132     drain_content_ref->request = NULL;
4133     drain_message_remove(peer_ref, drain_msg_ref, drain_content_ref);
4134 
4135     return exit_status;
4136 }
4137 
4138 
4139 
4140 
4141 
4142 static ompi_crcp_bkmrk_pml_peer_ref_t * find_peer(ompi_process_name_t proc)
4143 {
4144     opal_list_item_t* item = NULL;
4145     ompi_rte_cmp_bitmask_t mask;
4146 
4147     for(item  = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
4148         item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
4149         item  = opal_list_get_next(item) ) {
4150         ompi_crcp_bkmrk_pml_peer_ref_t *cur_peer_ref;
4151         cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
4152 
4153         mask = OMPI_RTE_CMP_JOBID | OMPI_RTE_CMP_VPID;
4154 
4155         if( OPAL_EQUAL == ompi_rte_compare_name_fields(mask,
4156                                                         &(cur_peer_ref->proc_name),
4157                                                         &proc) ) {
4158             return cur_peer_ref;
4159         }
4160     }
4161 
4162     return NULL;
4163 }
4164 
4165 static int find_peer_in_comm(struct ompi_communicator_t* comm, int proc_idx,
4166                              ompi_crcp_bkmrk_pml_peer_ref_t **peer_ref)
4167 {
4168     *peer_ref = find_peer(*(ompi_process_name_t *)&comm->c_remote_group->grp_proc_pointers[proc_idx]->super.proc_name);
4169 
4170     if( NULL == *peer_ref) {
4171         opal_output(mca_crcp_bkmrk_component.super.output_handle,
4172                     "crcp:bkmrk: find_peer_in_comm(): Failed to find peer_ref - peer_ref is NULL\n");
4173         return OMPI_ERROR;
4174     }
4175 
4176     return OMPI_SUCCESS;
4177 }
4178 
4179 
4180 
4181 
4182 
/*
 * Run the checkpoint coordination sequence with the peers:
 * exchange bookmarks, check them, post the drain ACK receives and the
 * drained message receives, then wait for quiescence.
 *
 * The function is resumable. When it decides to stall it records
 * step_to_return_to = 1 and returns OMPI_SUCCESS; the next call jumps
 * straight to STEP_1 and only performs the quiesce wait.
 *
 * Returns OMPI_SUCCESS, or the error code of the first step that failed.
 */
static int ft_event_coordinate_peers(void)
{
    /* Persists across calls: non-zero means "resume at STEP_1" */
    static int step_to_return_to = 0;
    int exit_status = OMPI_SUCCESS;
    int ret;

    /* A previous call stalled; skip directly to the quiesce wait */
    if( step_to_return_to == 1 ) {
        goto STEP_1;
    }

    /*
     * Exchange bookmarks with the peers
     */
    START_TIMER(CRCP_TIMER_CKPT_EX_B);
    if( OMPI_SUCCESS != (ret = ft_event_exchange_bookmarks() ) ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: ft_event_coordinate_peers: Bookmark Exchange Failed %d",
                    ret);
        exit_status = ret;
        goto DONE;
    }
    END_TIMER(CRCP_TIMER_CKPT_EX_B);

    /*
     * Check the exchanged bookmarks
     */
    START_TIMER(CRCP_TIMER_CKPT_CHECK_B);
    if( OMPI_SUCCESS != (ret = ft_event_check_bookmarks() ) ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: ft_event_coordinate_peers: Bookmark Check Failed %d",
                    ret);
        exit_status = ret;
        goto DONE;
    }
    END_TIMER(CRCP_TIMER_CKPT_CHECK_B);

    /*
     * Post the receives for the drain ACKs and for the drained messages
     */
    START_TIMER(CRCP_TIMER_CKPT_POST_DRAIN);
    if( OMPI_SUCCESS != (ret = ft_event_post_drain_acks() ) ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: ft_event_coordinate_peers: Bookmark Post Drain ACKS Failed %d",
                    ret);
        exit_status = ret;
        goto DONE;
    }

    if( OMPI_SUCCESS != (ret = ft_event_post_drained() ) ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: ft_event_coordinate_peers: Bookmark Post Drain Msgs Failed %d",
                    ret);
        exit_status = ret;
        goto DONE;
    }
    END_TIMER(CRCP_TIMER_CKPT_POST_DRAIN);
    DISPLAY_INDV_TIMER(CRCP_TIMER_CKPT_POST_DRAIN, -1, 0);

    /*
     * Decide whether to stall before waiting for quiescence.
     *
     * A positive current_msg_id together with a message type of
     * COORD_MSG_TYPE_B_SEND forces a stall.
     * NOTE(review): presumably this means a blocking send is currently
     * in flight - confirm against where current_msg_id is set.
     */
    if( 0 < current_msg_id &&
        current_msg_type == COORD_MSG_TYPE_B_SEND) {
        stall_for_completion = true;
    }
    START_TIMER(CRCP_TIMER_CKPT_WAIT_QUI);
    if( stall_for_completion ) {
        OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                             "crcp:bkmrk: %s **** STALLING %s in PID %d ***",
                             OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                             (current_msg_type == COORD_MSG_TYPE_B_SEND ? "Send" : "Recv"),
                             getpid() ));
        /* Remember where to resume and report success to the caller */
        step_to_return_to = 1;
        exit_status = OMPI_SUCCESS;
        goto DONE;
    }

 STEP_1:
    /* Resume point; clear the marker so the next call starts from the top */
    step_to_return_to = 0;

    /*
     * Wait for quiescence.
     * NOTE(review): when entered through the resume jump, the
     * START_TIMER(CRCP_TIMER_CKPT_WAIT_QUI) above is not executed in this
     * call although END_TIMER below is.
     */
    if( OMPI_SUCCESS != (ret = ft_event_wait_quiesce() ) ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: ft_event_coordinate_peers: Wait Quiesce Failed %d",
                    ret);
        exit_status = ret;
        goto DONE;
    }
    END_TIMER(CRCP_TIMER_CKPT_WAIT_QUI);

    OPAL_OUTPUT_VERBOSE((5, mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: %s Coordination Finished...\n",
                        OMPI_NAME_PRINT(OMPI_PROC_MY_NAME)));

    /* Common exit for the success, stall and error paths */
 DONE:
    return exit_status;
}
4298 
4299 static int ft_event_finalize_exchange(void)
4300 {
4301     int exit_status = OMPI_SUCCESS;
4302     opal_list_item_t* item = NULL, *rm_item = NULL;
4303     ompi_crcp_bkmrk_pml_traffic_message_ref_t * msg_ref;
4304     ompi_crcp_bkmrk_pml_message_content_ref_t  *content_ref = NULL;
4305     opal_list_item_t* cont_item = NULL;
4306 
4307     
4308 
4309 
4310     for(item  = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
4311         item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
4312         item  = opal_list_get_next(item) ) {
4313         ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref;
4314         peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
4315 
4316         if( OPAL_EQUAL != ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
4317                                                         (OMPI_PROC_MY_NAME),
4318                                                         &(peer_ref->proc_name)) ) {
4319             TRAFFIC_MSG_DUMP_PEER(10, (peer_ref, "finalize_exchange", false));
4320         }
4321 
4322         peer_ref->total_msgs_sent  = 0;
4323         peer_ref->total_msgs_recvd = 0;
4324 
4325         peer_ref->matched_msgs_sent  = 0;
4326         peer_ref->matched_msgs_recvd = 0;
4327 
4328         peer_ref->ack_required = false;
4329 
4330         
4331         for(rm_item  = opal_list_get_last(&peer_ref->send_list);
4332             rm_item != opal_list_get_begin(&peer_ref->send_list);
4333             rm_item  = opal_list_get_prev(rm_item) ) {
4334             msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4335             msg_ref->matched = 0;
4336             msg_ref->done    = 0;
4337             msg_ref->active_drain  += msg_ref->active;
4338             msg_ref->active  = 0;
4339 
4340             for(cont_item  = opal_list_get_first(&(msg_ref->msg_contents));
4341                 cont_item != opal_list_get_end(  &(msg_ref->msg_contents));
4342                 cont_item  = opal_list_get_next(cont_item) ) {
4343                 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)cont_item;
4344                 if( content_ref->active ) {
4345                     content_ref->already_drained = true;
4346                 }
4347             }
4348         }
4349 
4350         
4351         for(rm_item  = opal_list_get_last(&peer_ref->isend_list);
4352             rm_item != opal_list_get_begin(&peer_ref->isend_list);
4353             rm_item  = opal_list_get_prev(rm_item) ) {
4354             msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4355             msg_ref->matched = 0;
4356             msg_ref->done    = 0;
4357             msg_ref->active_drain  += msg_ref->active;
4358             msg_ref->active  = 0;
4359 
4360             for(cont_item  = opal_list_get_first(&(msg_ref->msg_contents));
4361                 cont_item != opal_list_get_end(  &(msg_ref->msg_contents));
4362                 cont_item  = opal_list_get_next(cont_item) ) {
4363                 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)cont_item;
4364                 if( content_ref->active ) {
4365                     content_ref->already_drained = true;
4366                 }
4367             }
4368         }
4369 
4370         
4371         for(rm_item  = opal_list_get_last(&peer_ref->send_list);
4372             rm_item != opal_list_get_begin(&peer_ref->send_list);
4373             rm_item  = opal_list_get_prev(rm_item) ) {
4374             msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4375             msg_ref->matched = 0;
4376             msg_ref->done    = 0;
4377             msg_ref->active_drain  += msg_ref->active;
4378             msg_ref->active  = 0;
4379 
4380             for(cont_item  = opal_list_get_first(&(msg_ref->msg_contents));
4381                 cont_item != opal_list_get_end(  &(msg_ref->msg_contents));
4382                 cont_item  = opal_list_get_next(cont_item) ) {
4383                 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)cont_item;
4384                 if( content_ref->active ) {
4385                     content_ref->already_drained = true;
4386                 }
4387             }
4388         }
4389 
4390         
4391         for(rm_item  = opal_list_get_last(&peer_ref->recv_list);
4392             rm_item != opal_list_get_begin(&peer_ref->recv_list);
4393             rm_item  = opal_list_get_prev(rm_item) ) {
4394             msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4395             msg_ref->matched = 0;
4396             msg_ref->done    = 0;
4397         }
4398 
4399         
4400         for(rm_item  = opal_list_get_last(&peer_ref->irecv_list);
4401             rm_item != opal_list_get_begin(&peer_ref->irecv_list);
4402             rm_item  = opal_list_get_prev(rm_item) ) {
4403             msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4404             msg_ref->matched = 0;
4405             msg_ref->done    = 0;
4406         }
4407 
4408         
4409         for(rm_item  = opal_list_get_last(&peer_ref->recv_list);
4410             rm_item != opal_list_get_begin(&peer_ref->recv_list);
4411             rm_item  = opal_list_get_prev(rm_item) ) {
4412             msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4413             msg_ref->matched = 0;
4414             msg_ref->done    = 0;
4415         }
4416     }
4417 
4418     return exit_status;
4419 }
4420 
4421 static int ft_event_exchange_bookmarks(void)
4422 {
4423     int peer_idx  = 0;
4424     int my_idx    = OMPI_PROC_MY_NAME->vpid;
4425     int iter      = 0;
4426     int num_peers = 0;
4427 
4428     num_peers = opal_list_get_size(&ompi_crcp_bkmrk_pml_peer_refs);
4429 
4430     for( peer_idx = (num_peers - my_idx - 1), iter = 0;
4431          iter < num_peers;
4432          peer_idx = (peer_idx + 1) % num_peers, ++iter)
4433         {
4434             if(my_idx > peer_idx) {
4435                 
4436                 send_bookmarks(peer_idx);
4437                 
4438                 recv_bookmarks(peer_idx);
4439             }
4440             else if(my_idx < peer_idx) {
4441                 
4442                 recv_bookmarks(peer_idx);
4443                 
4444                 send_bookmarks(peer_idx);
4445             }
4446         }
4447 
4448     
4449     START_TIMER(CRCP_TIMER_CKPT_EX_WAIT);
4450     while( total_recv_bookmarks > 0 ) {
4451         opal_event_loop(opal_sync_event_base, OPAL_EVLOOP_NONBLOCK);
4452     }
4453     total_recv_bookmarks = 0;
4454     END_TIMER(CRCP_TIMER_CKPT_EX_WAIT);
4455 
4456     return OMPI_SUCCESS;
4457 }
4458 
4459 static int ft_event_check_bookmarks(void)
4460 {
4461     opal_list_item_t* item = NULL;
4462     int ret;
4463     int p_n_to_p_m   = 0;
4464     int p_n_from_p_m = 0;
4465 
4466     if( 10 <= mca_crcp_bkmrk_component.super.verbose ) {
4467         sleep(OMPI_PROC_MY_NAME->vpid);
4468         OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4469                              "---------------------------------------------"));
4470         OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4471                              "Process %s Match Table",
4472                              OMPI_NAME_PRINT(OMPI_PROC_MY_NAME)));
4473         OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4474                             "%s  %5s | %7s | %7s | %7s | %7s |",
4475                             OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4476                             "Vpid", "T_Send", "M_Recv", "M_Send", "T_Recv"));
4477 
4478         for(item  = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
4479             item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
4480             item  = opal_list_get_next(item) ) {
4481             ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref;
4482             int t_send, m_send;
4483             int t_recv, m_recv;
4484             peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
4485 
4486             t_send = peer_ref->total_msgs_sent;
4487             m_send = peer_ref->matched_msgs_sent;
4488             t_recv = peer_ref->total_msgs_recvd;
4489             m_recv = peer_ref->matched_msgs_recvd;
4490 
4491             OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4492                                 "%s  %5d | %7d | %7d | %7d | %7d |",
4493                                 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4494                                 peer_ref->proc_name.vpid,
4495                                 t_send, m_recv, m_send, t_recv));
4496         }
4497         OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4498                              "---------------------------------------------"));
4499     }
4500 
4501     
4502 
4503 
4504 
4505 
4506     for(item  = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
4507         item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
4508         item  = opal_list_get_next(item) ) {
4509         ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref;
4510         peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
4511 
4512         if( OPAL_EQUAL == ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
4513                                                         (OMPI_PROC_MY_NAME),
4514                                                         &(peer_ref->proc_name)) ) {
4515             continue;
4516         }
4517 
4518         TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Bookmark Details --", false));
4519 
4520         
4521         if( OMPI_PROC_MY_NAME->vpid < peer_ref->proc_name.vpid ) {
4522             
4523 
4524 
4525 
4526             p_n_to_p_m   = peer_ref->total_msgs_sent;
4527             p_n_from_p_m = peer_ref->matched_msgs_recvd;
4528 
4529             
4530             if( p_n_to_p_m < p_n_from_p_m ) {
4531                 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4532                             "crcp:bkmrk: %s --> %s "
4533                             "Total Sent (%4d) = Matched Recv. (%4d) => Diff (%4d). "
4534                             " WARNING: Peer received more than was sent. :(\n",
4535                             OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4536                             OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4537                             p_n_to_p_m,
4538                             p_n_from_p_m,
4539                             (p_n_to_p_m - p_n_from_p_m)
4540                             );
4541             }
4542 
4543             
4544 
4545             if( p_n_to_p_m > p_n_from_p_m) {
4546                 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4547                                     "crcp:bkmrk: %s --> %s "
4548                                     "Total Sent (%4d) = Matched Recv. (%4d). Peer needs %4d.\n",
4549                                     OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4550                                     OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4551                                     p_n_to_p_m,
4552                                     p_n_from_p_m,
4553                                     (p_n_to_p_m - p_n_from_p_m)
4554                                     ));
4555                 
4556 
4557 
4558 
4559 
4560                 if( OMPI_SUCCESS != (ret = send_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
4561                     opal_output(mca_crcp_bkmrk_component.super.output_handle,
4562                                 "crcp:bkmrk: check_bookmarks: Unable to send message details to peer %s: Return %d\n",
4563                                 OMPI_NAME_PRINT(&peer_ref->proc_name),
4564                                 ret);
4565                     return ret;
4566                 }
4567             }
4568 
4569             
4570 
4571 
4572 
4573             p_n_to_p_m   = peer_ref->matched_msgs_sent;
4574             p_n_from_p_m = peer_ref->total_msgs_recvd;
4575 
4576             
4577             if( p_n_to_p_m < p_n_from_p_m ) {
4578                 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4579                             "crcp:bkmrk: %s --> %s "
4580                             "Matched Sent (%4d) = Total Recv. (%4d) => Diff (%4d). "
4581                             " WARNING: I received more than the peer sent. :(\n",
4582                             OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4583                             OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4584                             p_n_to_p_m,
4585                             p_n_from_p_m,
4586                             (p_n_to_p_m - p_n_from_p_m)
4587                             );
4588             }
4589 
4590             
4591 
4592             if( p_n_to_p_m > p_n_from_p_m) {
4593                 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4594                                      "crcp:bkmrk: %s <-- %s "
4595                                      "Matched Sent (%4d) = Total Recv. (%4d). I need %4d.\n",
4596                                     OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4597                                     OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4598                                     p_n_to_p_m,
4599                                     p_n_from_p_m,
4600                                     (p_n_to_p_m - p_n_from_p_m)
4601                                     ));
4602                 
4603 
4604 
4605 
4606                 if( OMPI_SUCCESS != (ret = recv_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
4607                     opal_output(mca_crcp_bkmrk_component.super.output_handle,
4608                                 "crcp:bkmrk: check_bookmarks: Unable to recv message details from peer %s: Return %d\n",
4609                                 OMPI_NAME_PRINT(&peer_ref->proc_name),
4610                                 ret);
4611                     return ret;
4612                 }
4613             }
4614         }
4615         
4616         else {
4617             
4618 
4619 
4620 
4621             p_n_to_p_m   = peer_ref->matched_msgs_sent;
4622             p_n_from_p_m = peer_ref->total_msgs_recvd;
4623 
4624             
4625             if( p_n_to_p_m < p_n_from_p_m ) {
4626                 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4627                             "crcp:bkmrk: %s --> %s "
4628                             "Matched Sent (%4d) = Total Recv. (%4d) => Diff (%4d). "
4629                             " WARNING: I received more than the peer sent. :(\n",
4630                             OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4631                             OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4632                             p_n_to_p_m,
4633                             p_n_from_p_m,
4634                             (p_n_to_p_m - p_n_from_p_m)
4635                             );
4636             }
4637 
4638             
4639 
4640             if( p_n_to_p_m > p_n_from_p_m) {
4641                 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4642                                      "crcp:bkmrk: %s <-- %s "
4643                                      "Matched Sent (%4d) = Total Recv. (%4d). I need %4d.\n",
4644                                     OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4645                                     OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4646                                     p_n_to_p_m,
4647                                     p_n_from_p_m,
4648                                     (p_n_to_p_m - p_n_from_p_m)
4649                                     ));
4650                 
4651 
4652 
4653 
4654                 if( OMPI_SUCCESS != (ret = recv_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
4655                     opal_output(mca_crcp_bkmrk_component.super.output_handle,
4656                                 "crcp:bkmrk: check_bookmarks: Unable to recv message details from peer %s: Return %d\n",
4657                                 OMPI_NAME_PRINT(&peer_ref->proc_name),
4658                                 ret);
4659                     return ret;
4660                 }
4661             }
4662 
4663             
4664 
4665 
4666 
4667             p_n_to_p_m   = peer_ref->total_msgs_sent;
4668             p_n_from_p_m = peer_ref->matched_msgs_recvd;
4669 
4670             
4671             if( p_n_to_p_m < p_n_from_p_m ) {
4672                 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4673                             "crcp:bkmrk: %s --> %s "
4674                             "Total Sent (%4d) = Matched Recv. (%4d) => Diff (%4d). "
4675                             " WARNING: Peer received more than was sent. :(\n",
4676                             OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4677                             OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4678                             p_n_to_p_m,
4679                             p_n_from_p_m,
4680                             (p_n_to_p_m - p_n_from_p_m)
4681                             );
4682             }
4683 
4684             
4685 
4686             if( p_n_to_p_m > p_n_from_p_m) {
4687                 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4688                                     "crcp:bkmrk: %s --> %s "
4689                                     "Total Sent (%4d) = Matched Recv. (%4d). Peer needs %4d.\n",
4690                                     OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4691                                     OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4692                                     p_n_to_p_m,
4693                                     p_n_from_p_m,
4694                                     (p_n_to_p_m - p_n_from_p_m)
4695                                     ));
4696                 
4697 
4698 
4699 
4700 
4701                 if( OMPI_SUCCESS != (ret = send_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
4702                     opal_output(mca_crcp_bkmrk_component.super.output_handle,
4703                                 "crcp:bkmrk: check_bookmarks: Unable to send message details to peer %s: Return %d\n",
4704                                 OMPI_NAME_PRINT(&peer_ref->proc_name),
4705                                 ret);
4706                     return ret;
4707                 }
4708             }
4709         }
4710     }
4711 
4712     return OMPI_SUCCESS;
4713 }
4714 
4715 static int ft_event_post_drain_acks(void)
4716 {
4717     ompi_crcp_bkmrk_pml_drain_message_ack_ref_t * drain_msg_ack = NULL;
4718     opal_list_item_t* item = NULL;
4719     size_t req_size;
4720 
4721     req_size  = opal_list_get_size(&drained_msg_ack_list);
4722     if(req_size <= 0) {
4723         return OMPI_SUCCESS;
4724     }
4725 
4726     OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4727                         "crcp:bkmrk: %s Wait on %d Drain ACK Messages.\n",
4728                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4729                         (int)req_size));
4730 
4731     
4732 
4733 
4734 
4735     for(item  = opal_list_get_first(&drained_msg_ack_list);
4736         item != opal_list_get_end(&drained_msg_ack_list);
4737         item  = opal_list_get_next(item) ) {
4738         drain_msg_ack = (ompi_crcp_bkmrk_pml_drain_message_ack_ref_t*)item;
4739 
4740         
4741         ompi_rte_recv_buffer_nb(&drain_msg_ack->peer, OMPI_CRCP_COORD_BOOKMARK_TAG,
4742                                 0, drain_message_ack_cbfunc, NULL);
4743     }
4744 
4745     return OMPI_SUCCESS;
4746 }
4747 
4748 static void drain_message_ack_cbfunc(int status,
4749                                      ompi_process_name_t* sender,
4750                                      opal_buffer_t *buffer,
4751                                      ompi_rml_tag_t tag,
4752                                      void* cbdata)
4753 {
4754     int ret, exit_status = OMPI_SUCCESS;
4755     opal_list_item_t* item = NULL;
4756     size_t ckpt_status;
4757 
4758     
4759 
4760 
4761     UNPACK_BUFFER(buffer, ckpt_status, 1, OPAL_SIZE, "");
4762 
4763     
4764 
4765 
4766     for(item  = opal_list_get_first(&drained_msg_ack_list);
4767         item != opal_list_get_end(&drained_msg_ack_list);
4768         item  = opal_list_get_next(item) ) {
4769         ompi_crcp_bkmrk_pml_drain_message_ack_ref_t * drain_msg_ack;
4770         drain_msg_ack = (ompi_crcp_bkmrk_pml_drain_message_ack_ref_t*)item;
4771 
4772         
4773         if(!drain_msg_ack->complete) {
4774             
4775             if( OPAL_EQUAL == ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
4776                                                             &(drain_msg_ack->peer),
4777                                                             sender) ) {
4778                 
4779                 drain_msg_ack->complete = true;
4780                 OPAL_OUTPUT_VERBOSE((5, mca_crcp_bkmrk_component.super.output_handle,
4781                                     "crcp:bkmrk: %s --> %s Received ACK of FLUSH from peer\n",
4782                                     OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4783                                     OMPI_NAME_PRINT(sender) ));
4784                 return;
4785             }
4786         }
4787     }
4788 
4789     opal_output(mca_crcp_bkmrk_component.super.output_handle,
4790                 "crcp:bkmrk: %s --> %s ERROR: Unable to match ACK to peer (%d)\n",
4791                 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4792                 OMPI_NAME_PRINT(sender), exit_status);
4793 
4794  cleanup:
4795     return;
4796 }
4797 
4798 static int ft_event_post_drained(void)
4799 {
4800     int ret, exit_status = OMPI_SUCCESS;
4801     ompi_crcp_bkmrk_pml_peer_ref_t *cur_peer_ref = NULL;
4802     ompi_crcp_bkmrk_pml_drain_message_ref_t * drain_msg_ref = NULL;
4803     ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
4804     opal_list_item_t* item = NULL, *d_item = NULL, *c_item = NULL;
4805     int i, total_number_to_drain = 0, peer_total = 0;
4806 
4807     
4808     for(item  = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
4809         item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
4810         item  = opal_list_get_next(item) ) {
4811         cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
4812 
4813         if( OPAL_EQUAL == ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
4814                                                         (OMPI_PROC_MY_NAME),
4815                                                         &(cur_peer_ref->proc_name)) ) {
4816             continue;
4817         }
4818 
4819         for(d_item  = opal_list_get_first(&(cur_peer_ref->drained_list));
4820             d_item != opal_list_get_end(&(cur_peer_ref->drained_list));
4821             d_item  = opal_list_get_next(d_item) ) {
4822             drain_msg_ref = (ompi_crcp_bkmrk_pml_drain_message_ref_t*)d_item;
4823 
4824             for(c_item  = opal_list_get_first(&(drain_msg_ref->msg_contents));
4825                 c_item != opal_list_get_end(&(drain_msg_ref->msg_contents));
4826                 c_item  = opal_list_get_next(c_item) ) {
4827                 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)c_item;
4828 
4829                 if( !content_ref->done ) {
4830                     ++total_number_to_drain;
4831                 }
4832             }
4833         }
4834     }
4835 
4836     
4837 
4838 
4839     if( 0 >= total_number_to_drain ) {
4840         return OMPI_SUCCESS;
4841     }
4842 
4843     
4844     if( NULL != quiesce_requests ) {
4845         free(quiesce_requests);
4846         quiesce_requests = NULL;
4847     }
4848     quiesce_requests = (ompi_request_t **)malloc( (total_number_to_drain) * sizeof(ompi_request_t *));
4849     if( NULL == quiesce_requests){
4850         exit_status = OMPI_ERROR;
4851         goto cleanup;
4852     }
4853 
4854     if( NULL != quiesce_statuses ) {
4855         free(quiesce_statuses);
4856         quiesce_statuses = NULL;
4857     }
4858     quiesce_statuses = (ompi_status_public_t **)malloc( (total_number_to_drain) * sizeof(ompi_status_public_t *));
4859     if( NULL == quiesce_statuses){
4860         exit_status = OMPI_ERROR;
4861         goto cleanup;
4862     }
4863 
4864     
4865     for(i = 0; i < (total_number_to_drain); ++i) {
4866         quiesce_requests[i] = &(ompi_request_null.request);
4867         quiesce_statuses[i] = &ompi_status_empty;
4868     }
4869     quiesce_request_count = 0;
4870 
4871     
4872     for(item  = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
4873         item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
4874         item  = opal_list_get_next(item) ) {
4875         cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
4876         peer_total = 0;
4877 
4878         if( OPAL_EQUAL == ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
4879                                                         (OMPI_PROC_MY_NAME),
4880                                                         &(cur_peer_ref->proc_name)) ) {
4881             continue;
4882         }
4883 
4884         for(d_item  = opal_list_get_first(&(cur_peer_ref->drained_list));
4885             d_item != opal_list_get_end(&(cur_peer_ref->drained_list));
4886             d_item  = opal_list_get_next(d_item) ) {
4887             drain_msg_ref = (ompi_crcp_bkmrk_pml_drain_message_ref_t*)d_item;
4888 
4889             for(c_item  = opal_list_get_first(&(drain_msg_ref->msg_contents));
4890                 c_item != opal_list_get_end(&(drain_msg_ref->msg_contents));
4891                 c_item  = opal_list_get_next(c_item) ) {
4892                 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)c_item;
4893 
4894                 if( content_ref->done ) {
4895                     continue;
4896                 }
4897 
4898                 if( OMPI_SUCCESS != (ret = ft_event_post_drain_message(drain_msg_ref, content_ref) ) ) {
4899                     exit_status = ret;
4900                     goto cleanup;
4901                 }
4902 
4903                 cur_peer_ref->ack_required = true;
4904 
4905                 
4906                 if( NULL != content_ref->request) {
4907                     quiesce_requests[quiesce_request_count] =  content_ref->request;
4908                     quiesce_statuses[quiesce_request_count] = &content_ref->status;
4909                     quiesce_request_count++;
4910                     peer_total++;
4911                 }
4912                 
4913                 else if( content_ref->already_posted ) {
4914                     stall_for_completion = true;
4915                 }
4916                 else {
4917                     ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: ft_event_post_drained(): Found a drain message with a NULL request.");
4918                 }
4919             }
4920         }
4921 
4922         if( peer_total > 0 || stall_for_completion ) {
4923             OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4924                                  "crcp:bkmrk: %s <-- %s Will be draining %4d messages from this peer. Total %4d %s\n",
4925                                  OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4926                                  OMPI_NAME_PRINT(&(cur_peer_ref->proc_name)),
4927                                  peer_total,
4928                                  quiesce_request_count,
4929                                  (stall_for_completion ? "(And Stalling)" : "") ));
4930         }
4931     }
4932 
4933  cleanup:
4934     return exit_status;
4935 }
4936 
4937 static int ft_event_post_drain_message(ompi_crcp_bkmrk_pml_drain_message_ref_t   *drain_msg_ref,
4938                                        ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref)
4939 {
4940     int ret;
4941 
4942     
4943 
4944 
4945 
4946     if( content_ref->done ) {
4947         return OMPI_SUCCESS;
4948     }
4949 
4950     
4951 
4952 
4953     if( content_ref->already_posted ) {
4954         OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
4955                              "crcp:bkmrk: %s <-- %s Found a message that we do not need to post.\n",
4956                              OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4957                              OMPI_NAME_PRINT(&(drain_msg_ref->proc_name)) ));
4958         return OMPI_SUCCESS;
4959     }
4960 
4961     
4962     content_ref->active = true;
4963     drain_msg_ref->active++;
4964 
4965     
4966 
4967 
4968     OPAL_OUTPUT_VERBOSE((20, mca_crcp_bkmrk_component.super.output_handle,
4969                          "crcp:bkmrk: %s <-- %s Posting a message to be drained from rank %d.\n",
4970                          OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4971                          OMPI_NAME_PRINT(&(drain_msg_ref->proc_name)),
4972                          drain_msg_ref->rank));
4973     if( OMPI_SUCCESS != (ret = wrapped_pml_module->pml_irecv(content_ref->buffer,
4974                                                              (drain_msg_ref->count * drain_msg_ref->ddt_size),
4975                                                              drain_msg_ref->datatype,
4976                                                              drain_msg_ref->rank,
4977                                                              drain_msg_ref->tag,
4978                                                              drain_msg_ref->comm,
4979                                                              &(content_ref->request) ) ) ) {
4980         opal_output(mca_crcp_bkmrk_component.super.output_handle,
4981                     "crcp:bkmrk: %s <-- %s Failed to post the Draining PML iRecv\n",
4982                     OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4983                     OMPI_NAME_PRINT(&(drain_msg_ref->proc_name)) );
4984         return ret;
4985     }
4986 
4987     return OMPI_SUCCESS;
4988 }
4989 
4990 static int ft_event_wait_quiesce(void)
4991 {
4992     int exit_status = OMPI_SUCCESS;
4993     int ret;
4994 
4995     
4996 
4997 
4998     if( OMPI_SUCCESS != (ret = wait_quiesce_drained() ) ) {
4999         opal_output(mca_crcp_bkmrk_component.super.output_handle,
5000                     "crcp:bkmrk: wait_quiesce: %s Failed to quiesce drained messages\n",
5001                     OMPI_NAME_PRINT(OMPI_PROC_MY_NAME) );
5002         exit_status = ret;
5003         goto cleanup;
5004     }
5005 
5006     
5007 
5008 
5009     if( OMPI_SUCCESS != (ret = wait_quiesce_drain_ack() ) ) {
5010         opal_output(mca_crcp_bkmrk_component.super.output_handle,
5011                     "crcp:bkmrk: wait_quiesce: %s Failed to recv all drain ACKs\n",
5012                     OMPI_NAME_PRINT(OMPI_PROC_MY_NAME) );
5013         exit_status = ret;
5014         goto cleanup;
5015     }
5016 
5017  cleanup:
5018     return exit_status;
5019 }
5020 
5021 static int wait_quiesce_drained(void)
5022 {
5023     int ret, exit_status = OMPI_SUCCESS;
5024     ompi_crcp_bkmrk_pml_peer_ref_t *cur_peer_ref = NULL;
5025     ompi_crcp_bkmrk_pml_drain_message_ref_t * drain_msg_ref = NULL;
5026     ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
5027     opal_list_item_t* item = NULL, *d_item = NULL, *d_next = NULL, *c_item = NULL, *c_next = NULL;
5028     bool prev_stall = false;
5029 
5030     
5031 
5032     OPAL_OUTPUT_VERBOSE((5, mca_crcp_bkmrk_component.super.output_handle,
5033                          "crcp:bkmrk: %s Waiting on %d messages to drain\n",
5034                          OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5035                          (int)quiesce_request_count));
5036 
5037     
5038 
5039 
5040 
5041     prev_stall = opal_cr_stall_check;
5042     opal_cr_stall_check = true;
5043     if( OMPI_SUCCESS != (ret = coord_request_wait_all(quiesce_request_count,
5044                                                       quiesce_requests,
5045                                                       quiesce_statuses) ) ) {
5046         exit_status = ret;
5047         goto cleanup;
5048     }
5049     opal_cr_stall_check = prev_stall;
5050 
5051     
5052 
5053 
5054 
5055 
5056 
5057 
5058     for(item  = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
5059         item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
5060         item  = opal_list_get_next(item) ) {
5061         cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
5062 
5063         if( OPAL_EQUAL == ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
5064                                                         (OMPI_PROC_MY_NAME),
5065                                                         &(cur_peer_ref->proc_name)) ) {
5066             continue;
5067         }
5068 
5069         
5070 
5071 
5072         if( cur_peer_ref->ack_required ) {
5073             opal_buffer_t *buffer = NULL;
5074             size_t response = 1;
5075 
5076             OPAL_OUTPUT_VERBOSE((5, mca_crcp_bkmrk_component.super.output_handle,
5077                                  "crcp:bkmrk: %s --> %s Send ACKs to Peer\n",
5078                                  OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5079                                  OMPI_NAME_PRINT(&(cur_peer_ref->proc_name)) ));
5080 
5081             
5082             if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
5083                 exit_status = OMPI_ERROR;
5084                 goto cleanup;
5085             }
5086 
5087             PACK_BUFFER(buffer, response, 1, OPAL_SIZE, "");
5088 
5089             
5090             if (ORTE_SUCCESS != (ret = ompi_rte_send_buffer_nb(&(cur_peer_ref->proc_name),
5091                                                                buffer, OMPI_CRCP_COORD_BOOKMARK_TAG,
5092                                                                orte_rml_send_callback, NULL))) {
5093                 exit_status = ret;
5094                 goto cleanup;
5095             }
5096             if( NULL != buffer) {
5097                 OBJ_RELEASE(buffer);
5098                 buffer = NULL;
5099             }
5100         }
5101 
5102         cur_peer_ref->ack_required = false;
5103 
5104         
5105 
5106 
5107         for(d_item  = opal_list_get_first(&(cur_peer_ref->drained_list));
5108             d_item != opal_list_get_end(&(cur_peer_ref->drained_list));
5109             d_item  = d_next ) {
5110             drain_msg_ref = (ompi_crcp_bkmrk_pml_drain_message_ref_t*)d_item;
5111             d_next = opal_list_get_next(d_item);
5112 
5113             for(c_item  = opal_list_get_first(&(drain_msg_ref->msg_contents));
5114                 c_item != opal_list_get_end(&(drain_msg_ref->msg_contents));
5115                 c_item  = c_next ) {
5116                 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)c_item;
5117                 c_next  = opal_list_get_next(c_item);
5118 
5119                 
5120 
5121 
5122 
5123                 if( content_ref->done ) {
5124                     continue;
5125                 }
5126 
5127                 if( content_ref->already_posted ) {
5128                     drain_message_remove(cur_peer_ref, drain_msg_ref, content_ref);
5129 
5130                     
5131                     drain_msg_ref->active--;
5132                     drain_msg_ref->already_posted--;
5133                 } else {
5134                     content_ref->done   = true;
5135                     content_ref->active = false;
5136 
5137                     
5138                     drain_msg_ref->done++;
5139                     drain_msg_ref->active--;
5140                 }
5141             }
5142         }
5143     }
5144 
5145  cleanup:
5146     if( NULL != quiesce_requests ) {
5147         free(quiesce_requests);
5148         quiesce_requests = NULL;
5149     }
5150 
5151     if( NULL != quiesce_statuses ) {
5152         free(quiesce_statuses);
5153         quiesce_statuses = NULL;
5154     }
5155 
5156     quiesce_request_count = 0;
5157 
5158     return exit_status;
5159 }
5160 
5161 static int coord_request_wait_all( size_t count,
5162                                    ompi_request_t ** requests,
5163                                    ompi_status_public_t ** statuses )
5164 {
5165     int exit_status = OMPI_SUCCESS;
5166     ompi_status_public_t * status;
5167     ompi_request_t *req;
5168     size_t i;
5169 
5170     
5171 
5172 
5173     for( i = 0; i < count; ++i) {
5174         req    = requests[i];
5175         status = statuses[i];
5176 
5177         coord_request_wait(req, status);
5178 
5179         OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
5180                             "crcp:bkmrk: %s Request Wait: Done with idx %d of %d\n",
5181                             OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5182                             (int)i, (int)count));
5183     }
5184 
5185     return exit_status;
5186 }
5187 
5188 static int coord_request_wait( ompi_request_t * req,
5189                                ompi_status_public_t * status)
5190 {
5191     ompi_request_wait_completion(req);
5192 
5193     if( MPI_STATUS_IGNORE != status ) {
5194         status->MPI_TAG    = req->req_status.MPI_TAG;
5195         status->MPI_SOURCE = req->req_status.MPI_SOURCE;
5196         status->_cancelled = req->req_status._cancelled;
5197         status->_ucount    = req->req_status._ucount;
5198     }
5199 
5200     return OMPI_SUCCESS;
5201 }
5202 
5203 static int wait_quiesce_drain_ack(void)
5204 {
5205     opal_list_item_t* item = NULL;
5206     opal_list_item_t* next = NULL;
5207     ompi_crcp_bkmrk_pml_drain_message_ack_ref_t * drain_msg_ack;
5208     int num_outstanding;
5209 
5210     
5211 
5212     num_outstanding = opal_list_get_size(&drained_msg_ack_list);
5213     if(num_outstanding <= 0) {
5214         
5215         return OMPI_SUCCESS;
5216     }
5217 
5218     OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
5219                         "crcp:bkmrk: %s Waiting on %d Drain ACK messages\n",
5220                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5221                         num_outstanding));
5222 
5223     while(0 < num_outstanding) {
5224         for(item  = opal_list_get_first(&drained_msg_ack_list);
5225             item != opal_list_get_end(&drained_msg_ack_list);
5226             item = next) {
5227             drain_msg_ack = (ompi_crcp_bkmrk_pml_drain_message_ack_ref_t*)item;
5228             next = opal_list_get_next(item);
5229 
5230             if(drain_msg_ack->complete) {
5231                 num_outstanding--;
5232                 opal_list_remove_item(&drained_msg_ack_list, &(drain_msg_ack->super) );
5233                 HOKE_DRAIN_ACK_MSG_REF_RETURN(item);
5234                 break;
5235             }
5236         }
5237 
5238         opal_event_loop(opal_sync_event_base, OPAL_EVLOOP_NONBLOCK);
5239     }
5240 
5241     
5242     while (NULL != (item = opal_list_remove_first(&drained_msg_ack_list) ) ) {
5243         HOKE_DRAIN_ACK_MSG_REF_RETURN(item);
5244     }
5245 
5246     return OMPI_SUCCESS;
5247 }
5248 
5249 
5250 static int send_bookmarks(int peer_idx)
5251 {
5252     ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref;
5253     ompi_process_name_t peer_name;
5254     opal_buffer_t *buffer = NULL;
5255     int exit_status = OMPI_SUCCESS;
5256     int ret;
5257 
5258     START_TIMER(CRCP_TIMER_CKPT_EX_PEER_S);
5259     
5260 
5261 
5262     peer_name.jobid  = OMPI_PROC_MY_NAME->jobid;
5263     peer_name.vpid   = peer_idx;
5264 
5265     if( NULL == (peer_ref = find_peer(peer_name))) {
5266         opal_output(mca_crcp_bkmrk_component.super.output_handle,
5267                     "crcp:bkmrk: send_bookmarks: Error: Could not find peer indexed %d\n",
5268                     peer_idx);
5269         exit_status = OMPI_ERROR;
5270         goto cleanup;
5271     }
5272 
5273     OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
5274                          "crcp:bkmrk: %s --> %s Sending bookmark  (S[%6d] R[%6d])\n",
5275                          OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5276                          OMPI_NAME_PRINT(&peer_name),
5277                          peer_ref->total_msgs_sent,
5278                          peer_ref->total_msgs_recvd));
5279 
5280     
5281 
5282 
5283     if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
5284         exit_status = OMPI_ERROR;
5285         goto cleanup;
5286     }
5287 
5288     PACK_BUFFER(buffer, (peer_ref->total_msgs_sent),      1, OPAL_UINT32,
5289                 "crcp:bkmrk: send_bookmarks: Unable to pack total_msgs_sent");
5290     PACK_BUFFER(buffer, (peer_ref->total_msgs_recvd),     1, OPAL_UINT32,
5291                 "crcp:bkmrk: send_bookmarks: Unable to pack total_msgs_recvd");
5292 
5293     if (ORTE_SUCCESS != (ret = ompi_rte_send_buffer_nb(&peer_name, buffer,
5294                                                        OMPI_CRCP_COORD_BOOKMARK_TAG,
5295                                                        orte_rml_send_callback, NULL))) {
5296         opal_output(mca_crcp_bkmrk_component.super.output_handle,
5297                     "crcp:bkmrk: send_bookmarks: Failed to send bookmark to peer %s: Return %d\n",
5298                     OMPI_NAME_PRINT(&peer_name),
5299                     ret);
5300         exit_status = ret;
5301         goto cleanup;
5302     }
5303 
5304  cleanup:
5305     if(NULL != buffer) {
5306         OBJ_RELEASE(buffer);
5307     }
5308 
5309     END_TIMER(CRCP_TIMER_CKPT_EX_PEER_S);
5310     DISPLAY_INDV_TIMER(CRCP_TIMER_CKPT_EX_PEER_S, peer_idx, 1);
5311 
5312     return exit_status;
5313 }
5314 
5315 
5316 static int recv_bookmarks(int peer_idx)
5317 {
5318     ompi_process_name_t peer_name;
5319 
5320     START_TIMER(CRCP_TIMER_CKPT_EX_PEER_R);
5321 
5322     peer_name.jobid  = OMPI_PROC_MY_NAME->jobid;
5323     peer_name.vpid   = peer_idx;
5324 
5325     ompi_rte_recv_buffer_nb(&peer_name, OMPI_CRCP_COORD_BOOKMARK_TAG,
5326                             0, recv_bookmarks_cbfunc, NULL);
5327 
5328     ++total_recv_bookmarks;
5329 
5330     END_TIMER(CRCP_TIMER_CKPT_EX_PEER_R);
5331     
5332     
5333 
5334     return OMPI_SUCCESS;
5335 }
5336 
5337 static void recv_bookmarks_cbfunc(int status,
5338                                   ompi_process_name_t* sender,
5339                                   opal_buffer_t *buffer,
5340                                   ompi_rml_tag_t tag,
5341                                   void* cbdata)
5342 {
5343     ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref;
5344     int exit_status = OMPI_SUCCESS;
5345     int ret, tmp_int;
5346     ompi_vpid_t peer_idx;
5347 
5348     peer_idx = sender->vpid;
5349 
5350     
5351 
5352 
5353     if( NULL == (peer_ref = find_peer(*sender))) {
5354         opal_output(mca_crcp_bkmrk_component.super.output_handle,
5355                     "crcp:bkmrk: recv_bookmarks: Could not find peer indexed %d\n",
5356                     peer_idx);
5357         exit_status = OMPI_ERROR;
5358         goto cleanup;
5359     }
5360 
5361     UNPACK_BUFFER(buffer, tmp_int, 1, OPAL_UINT32,
5362                   "crcp:bkmrk: recv_bookmarks: Unable to unpack total_msgs_sent");
5363     peer_ref->matched_msgs_sent = tmp_int;
5364 
5365     UNPACK_BUFFER(buffer, tmp_int, 1, OPAL_UINT32,
5366                   "crcp:bkmrk: recv_bookmarks: Unable to unpack total_msgs_recvd");
5367     peer_ref->matched_msgs_recvd = tmp_int;
5368 
5369     OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
5370                          "crcp:bkmrk: %s <-- %s Received bookmark (S[%6d] R[%6d]) vs. (S[%6d] R[%6d]) (%d)\n",
5371                          OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5372                          OMPI_NAME_PRINT(sender),
5373                          peer_ref->matched_msgs_sent,
5374                          peer_ref->matched_msgs_recvd,
5375                          peer_ref->total_msgs_sent,
5376                          peer_ref->total_msgs_recvd,
5377                          exit_status));
5378 
5379  cleanup:
5380     --total_recv_bookmarks;
5381 
5382     return;
5383 }
5384 
5385 static int send_msg_details(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
5386                             int total_sent, int total_matched)
5387 {
5388     int ret, exit_status = OMPI_SUCCESS;
5389     ompi_crcp_bkmrk_pml_drain_message_ack_ref_t * d_msg_ack = NULL;
5390     opal_list_t *search_list = NULL;
5391     opal_list_item_t* msg_item  = NULL;
5392     bool finished;
5393     int pass_num = 1;
5394     int need, found;
5395     int total_details_sent = 0;
5396     int num_matches = 0;
5397     int p_total_found = 0;
5398 
5399     need = total_sent - total_matched;
5400     found = 0;
5401     finished = false;
5402     assert(need > 0);
5403 
5404     START_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_S);
5405 
5406     
5407 
5408 
5409     search_list = &(peer_ref->send_list);
5410     pass_num = 1;
5411 
5412  SEARCH_AGAIN:
5413     for(msg_item  = opal_list_get_last(search_list);
5414         msg_item != opal_list_get_begin(search_list);
5415         msg_item  = opal_list_get_prev(msg_item) ) {
5416         ompi_crcp_bkmrk_pml_traffic_message_ref_t * msg_ref;
5417         msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)msg_item;
5418 
5419         num_matches = 0;
5420 
5421         OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
5422                              "crcp:bkmrk: send_msg_details: Stage 1: [M/A/D/AD] [%3d/%3d/%3d/%3d] (%s)",
5423                              msg_ref->matched, msg_ref->active, msg_ref->done, msg_ref->active_drain,
5424                              (msg_ref->msg_type == COORD_MSG_TYPE_B_SEND ? " Send" :
5425                               (msg_ref->msg_type == COORD_MSG_TYPE_I_SEND ? "iSend" : "pSend"))
5426                              ));
5427 
5428         
5429         if( 0 >= (msg_ref->active + msg_ref->done) ) {
5430             continue;
5431         }
5432         
5433 
5434         if(OMPI_SUCCESS != (ret = do_send_msg_detail(peer_ref, msg_ref, &num_matches, &p_total_found, &finished)) ) {
5435             opal_output(mca_crcp_bkmrk_component.super.output_handle,
5436                         "crcp:bkmrk: send_msg_details: %s --> %s Failed to send message details to peer. Return %d\n",
5437                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5438                         OMPI_NAME_PRINT(&(peer_ref->proc_name)),
5439                         ret);
5440         }
5441 
5442         OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
5443                              "crcp:bkmrk: send_msg_details: Stage 2: [M/A/D/AD] [%3d/%3d/%3d/%3d] (%s) [%3d, %3d, %s] [%3d, %3d]",
5444                              msg_ref->matched, msg_ref->active, msg_ref->done, msg_ref->active_drain,
5445                              (msg_ref->msg_type == COORD_MSG_TYPE_B_SEND ? " Send" :
5446                               (msg_ref->msg_type == COORD_MSG_TYPE_I_SEND ? "iSend" : "pSend")),
5447                              num_matches, p_total_found, (finished ? "T" : "F"),
5448                              total_details_sent, found
5449                              ));
5450 
5451         total_details_sent += num_matches;
5452         if(0 < num_matches ) {
5453             found += num_matches;
5454         }
5455         if(finished) {
5456             goto ALL_SENT;
5457         }
5458     }
5459 
5460     
5461 
5462 
5463 
5464     if( 1 == pass_num ) {
5465         search_list = &(peer_ref->isend_list);
5466         pass_num = 2;
5467         goto SEARCH_AGAIN;
5468     }
5469 
5470     
5471 
5472 
5473 
5474     if( 2 == pass_num ) {
5475         search_list = &(peer_ref->send_init_list);
5476         pass_num = 3;
5477         goto SEARCH_AGAIN;
5478     }
5479 
5480  ALL_SENT:
5481     if( need > found ) {
5482         OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
5483                             "crcp:bkmrk: send_msg_details: ERROR: ****** Need (%d) vs Found (%d)",
5484                             need, found));
5485     }
5486     assert(need <= found);
5487 
5488     
5489 
5490 
5491 
5492     HOKE_DRAIN_ACK_MSG_REF_ALLOC(d_msg_ack);
5493     d_msg_ack->peer.jobid  = peer_ref->proc_name.jobid;
5494     d_msg_ack->peer.vpid   = peer_ref->proc_name.vpid;
5495 
5496     d_msg_ack->complete    = false;
5497     opal_list_append(&drained_msg_ack_list, &(d_msg_ack->super));
5498     OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
5499                         "crcp:bkmrk: %s <-> %s Message Inflight! Will wait on ACK from this peer.\n",
5500                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5501                         OMPI_NAME_PRINT(&(peer_ref->proc_name))));
5502 
5503     END_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_S);
5504     DISPLAY_INDV_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_S, peer_ref->proc_name.vpid, total_details_sent);
5505 
5506     return exit_status;
5507 }
5508 
5509 static int do_send_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
5510                               ompi_crcp_bkmrk_pml_traffic_message_ref_t*msg_ref,
5511                               int *num_matches,
5512                               int *total_found,
5513                               bool *finished)
5514 {
5515     int ret, exit_status = OMPI_SUCCESS;
5516     opal_buffer_t *buffer = NULL;
5517     orte_rml_recv_cb_t *rb = NULL;
5518     int32_t recv_response = RECV_MATCH_RESP_ERROR;
5519     int32_t num_resolv = -1;
5520     int32_t p_total_found = -1;
5521     int comm_my_rank = -1;
5522     int total_sent;
5523 
5524     *num_matches = 0;
5525     *total_found = 0;;
5526     *finished    = false;
5527 
5528     if( NULL != buffer) {
5529         OBJ_RELEASE(buffer);
5530         buffer = NULL;
5531     }
5532 
5533     if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
5534         exit_status = OMPI_ERROR;
5535         goto cleanup;
5536     }
5537 
5538     
5539 
5540 
5541 
5542 
5543     comm_my_rank  = ompi_comm_rank(msg_ref->comm);
5544 
5545     PACK_BUFFER(buffer, msg_ref->comm->c_contextid, 1, OPAL_UINT32,
5546                 "crcp:bkmrk: send_msg_details: Unable to pack communicator ID");
5547     PACK_BUFFER(buffer, comm_my_rank, 1, OPAL_INT,
5548                 "crcp:bkmrk: send_msg_details: Unable to pack comm rank ID");
5549 
5550     
5551 
5552 
5553 
5554 
5555 
5556     PACK_BUFFER(buffer, msg_ref->tag, 1, OPAL_INT,
5557                 "crcp:bkmrk: send_msg_details: Unable to pack tag");
5558     PACK_BUFFER(buffer, msg_ref->count, 1, OPAL_SIZE,
5559                 "crcp:bkmrk: send_msg_details: Unable to pack count");
5560     PACK_BUFFER(buffer, msg_ref->ddt_size, 1, OPAL_SIZE,
5561                 "crcp:bkmrk: send_msg_details: Unable to pack datatype size");
5562 
5563     
5564 
5565 
5566 
5567 
5568     total_sent = msg_ref->done + msg_ref->active;
5569     PACK_BUFFER(buffer, total_sent, 1, OPAL_INT,
5570                 "crcp:bkmrk: send_msg_details: Unable to pack done+active count");
5571 
5572     
5573 
5574 
5575     if (ORTE_SUCCESS != (ret = ompi_rte_send_buffer_nb(&peer_ref->proc_name, buffer,
5576                                                        OMPI_CRCP_COORD_BOOKMARK_TAG,
5577                                                        orte_rml_send_callback, NULL))) {
5578         opal_output(mca_crcp_bkmrk_component.super.output_handle,
5579                     "crcp:bkmrk: do_send_msg_detail: Unable to send message details to peer %s: Return %d\n",
5580                     OMPI_NAME_PRINT(&peer_ref->proc_name),
5581                     ret);
5582 
5583         exit_status = OMPI_ERROR;
5584         goto cleanup;
5585     }
5586 
5587     
5588 
5589 
5590     rb = OBJ_NEW(orte_rml_recv_cb_t);
5591     rb->active = true;
5592     ompi_rte_recv_buffer_nb(&peer_ref->proc_name, OMPI_CRCP_COORD_BOOKMARK_TAG, 0,
5593                             orte_rml_recv_callback, rb);
5594     ORTE_WAIT_FOR_COMPLETION(rb->active);
5595 
5596     UNPACK_BUFFER(&rb->data, recv_response, 1, OPAL_UINT32,
5597                   "crcp:bkmrk: send_msg_details: Failed to unpack the ACK from peer buffer.");
5598     UNPACK_BUFFER(&rb->data, num_resolv,  1, OPAL_UINT32,
5599                   "crcp:bkmrk: send_msg_details: Failed to unpack the num_resolv from peer buffer.");
5600     UNPACK_BUFFER(&rb->data, p_total_found, 1, OPAL_UINT32,
5601                   "crcp:bkmrk: send_msg_details: Failed to unpack the total_found from peer buffer.");
5602 
5603     OBJ_RELEASE(rb);
5604     
5605     msg_ref->matched += num_resolv;
5606     *num_matches      = num_resolv;
5607     *total_found      = p_total_found;
5608 
5609     
5610 
5611 
5612     if( RECV_MATCH_RESP_DONE == recv_response ) {
5613         *finished    = true;
5614     }
5615     else if( RECV_MATCH_RESP_MORE == recv_response ) {
5616         *finished    = false;
5617     }
5618 
5619     OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
5620                          "**************************\n"));
5621     OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
5622                          "send_msg_details: %d, %d = %s [%d / %d]\n",
5623                          *num_matches, *total_found,
5624                          (*finished ? "Done" : "Continue..."),
5625                          msg_ref->done, msg_ref->active));
5626     TRAFFIC_MSG_DUMP_MSG_INDV(15, (msg_ref, "", false));
5627     OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
5628                          "**************************\n"));
5629 
5630  cleanup:
5631     return exit_status;
5632 }
5633 
5634 
5635 
5636 
5637 
5638 static int recv_msg_details(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
5639                             int total_recv, int total_matched)
5640 {
5641     int need, found;
5642     int response;
5643     int exit_status = OMPI_SUCCESS;
5644     int ret;
5645     int total_details_recv = 0;
5646 
5647     need  = total_recv - total_matched;
5648     found = 0;
5649 
5650     assert( need > 0);
5651 
5652     START_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_R);
5653 
5654     
5655 
5656 
5657     while(need > found) {
5658         uint32_t p_comm_id;
5659         size_t p_count;
5660         size_t p_datatype_size;
5661         int p_rank;
5662         int p_tag;
5663         int p_num_sent;
5664         int num_resolved = 0;
5665 
5666         
5667 
5668 
5669         if( OMPI_SUCCESS != (ret = do_recv_msg_detail(peer_ref,
5670                                                       &p_rank, &p_comm_id,
5671                                                       &p_tag, &p_count,
5672                                                       &p_datatype_size,
5673                                                       &p_num_sent)) ) {
5674             opal_output(mca_crcp_bkmrk_component.super.output_handle,
5675                         "crcp:bkmrk: recv_msg_details: %s <-- %s "
5676                         "Failed to receive message detail from peer. Return %d\n",
5677                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5678                         OMPI_NAME_PRINT(&(peer_ref->proc_name)),
5679                         ret);
5680             exit_status = ret;
5681             goto cleanup;
5682         }
5683 
5684         
5685 
5686 
5687 
5688         num_resolved = 0;
5689         if( OMPI_SUCCESS != (ret = do_recv_msg_detail_check_drain(peer_ref,
5690                                                                   p_rank, p_comm_id,
5691                                                                   p_tag, p_count,
5692                                                                   p_datatype_size,
5693                                                                   p_num_sent,
5694                                                                   &num_resolved)) ) {
5695             opal_output(mca_crcp_bkmrk_component.super.output_handle,
5696                         "crcp:bkmrk: recv_msg_details: %s <-- %s "
5697                         "Failed to check message detail from peer. Return %d\n",
5698                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5699                         OMPI_NAME_PRINT(&(peer_ref->proc_name)),
5700                         ret);
5701             exit_status = ret;
5702             goto cleanup;
5703         }
5704 
5705         found += num_resolved;
5706         total_details_recv += num_resolved;
5707 
5708         OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
5709                              "crcp:bkmrk: %s <-- %s Recv Detail: Stage --: [%3d / %3d] [%3d, %3d, %s]",
5710                              OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5711                              OMPI_NAME_PRINT(&(peer_ref->proc_name)),
5712                              need, found,
5713                              num_resolved, total_details_recv,
5714                              ( need <= found ? "T" : "F") ));
5715 
5716         
5717         if( need <= found ) {
5718             response = RECV_MATCH_RESP_DONE; 
5719         }
5720         
5721         else {
5722             response = RECV_MATCH_RESP_MORE;
5723         }
5724 
5725         if(OMPI_SUCCESS != (ret = do_recv_msg_detail_resp(peer_ref, response, num_resolved, found))) {
5726             opal_output(mca_crcp_bkmrk_component.super.output_handle,
5727                         "crcp:bkmrk: recv_msg_details: %s <-- %s Failed to respond to peer. Return %d\n",
5728                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5729                         OMPI_NAME_PRINT(&(peer_ref->proc_name)),
5730                         ret);
5731             exit_status = ret;
5732             goto cleanup;
5733         }
5734     }
5735 
5736  cleanup:
5737 
5738     END_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_R);
5739     DISPLAY_INDV_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_R, peer_ref->proc_name.vpid, total_details_recv);
5740 
5741     return exit_status;
5742 }
5743 
5744 static int do_recv_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
5745                               int *rank, uint32_t *comm_id, int *tag,
5746                               size_t *count, size_t *datatype_size,
5747                               int *p_num_sent)
5748 {
5749     orte_rml_recv_cb_t *rb = NULL;
5750     int exit_status = OMPI_SUCCESS;
5751     int ret;
5752 
5753     
5754 
5755 
5756     rb = OBJ_NEW(orte_rml_recv_cb_t);
5757     rb->active = true;
5758     ompi_rte_recv_buffer_nb(&peer_ref->proc_name, OMPI_CRCP_COORD_BOOKMARK_TAG, 0, orte_rml_recv_callback, rb);
5759     ORTE_WAIT_FOR_COMPLETION(rb->active);
5760 
5761     
5762     UNPACK_BUFFER(&rb->data, (*comm_id), 1, OPAL_UINT32,
5763                   "crcp:bkmrk: recv_msg_details: Failed to unpack the communicator ID");
5764     UNPACK_BUFFER(&rb->data, (*rank), 1, OPAL_INT,
5765                   "crcp:bkmrk: recv_msg_details: Failed to unpack the communicator rank ID");
5766 
5767     
5768     UNPACK_BUFFER(&rb->data, (*tag), 1, OPAL_INT,
5769                   "crcp:bkmrk: recv_msg_details: Failed to unpack the tag");
5770     UNPACK_BUFFER(&rb->data, (*count), 1, OPAL_SIZE,
5771                   "crcp:bkmrk: recv_msg_details: Failed to unpack the count");
5772     UNPACK_BUFFER(&rb->data, (*datatype_size), 1, OPAL_SIZE,
5773                   "crcp:bkmrk: recv_msg_details: Failed to unpack the datatype size");
5774 
5775     
5776     UNPACK_BUFFER(&rb->data, (*p_num_sent), 1, OPAL_INT,
5777                   "crcp:bkmrk: recv_msg_details: Failed to unpack the sent count");
5778 
5779 cleanup:
5780     OBJ_RELEASE(rb);
5781     return exit_status;
5782 }
5783 
5784 static int do_recv_msg_detail_check_drain(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
5785                                     int rank, uint32_t comm_id, int tag,
5786                                     size_t count, size_t datatype_size,
5787                                     int p_num_sent,
5788                                     int *num_resolved)
5789 {
5790     int ret, exit_status = OMPI_SUCCESS;
5791     ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_tmp_msg_ref   = NULL;
5792     ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_recv_msg_ref  = NULL;
5793     ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_irecv_msg_ref = NULL;
5794     ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_precv_msg_ref = NULL;
5795     ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_unknown_recv_msg_ref  = NULL;
5796     ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_unknown_precv_msg_ref = NULL;
5797     
5798     int num_left_unresolved = 0;
5799     
5800     int num_still_active = 0;
5801     
5802     int num_posted = 0;
5803 
5804     *num_resolved = 0;
5805     num_left_unresolved = p_num_sent;
5806 
5807     OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
5808                          "crcp:bkmrk: %s <-- %s "
5809                          "Stage 0: Ck.Drain: [TR %3d/MS %3d] sent %4d, unres %4d, res %4d",
5810                          OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5811                          OMPI_NAME_PRINT(&(peer_ref->proc_name)),
5812                          peer_ref->total_msgs_recvd,
5813                          peer_ref->matched_msgs_sent,
5814                          p_num_sent,
5815                          num_left_unresolved,
5816                          *num_resolved));
5817     TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "Recv Check...", true));
5818 
5819     
5820 
5821 
5822     ret = traffic_message_find_recv(peer_ref,                                 
5823                                     rank, comm_id, tag, count, datatype_size, 
5824                                     &posted_recv_msg_ref,                     
5825                                     &posted_irecv_msg_ref,
5826                                     &posted_precv_msg_ref,
5827                                     &posted_unknown_recv_msg_ref,
5828                                     &posted_unknown_precv_msg_ref);
5829     if( OMPI_SUCCESS != ret) {
5830         opal_output(mca_crcp_bkmrk_component.super.output_handle,
5831                     "crcp:bkmrk: recv_msg_detail_check: %s -- %s "
5832                     "Failed to determine if we have received this message. Return %d\n",
5833                     OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5834                     OMPI_NAME_PRINT(&(peer_ref->proc_name)),
5835                     ret);
5836         exit_status = ret;
5837         goto cleanup;
5838     }
5839 
5840     
5841 
5842 
5843 
5844 
5845 
5846 
5847 
5848 
5849 
5850 
5851     
5852 
5853 
5854     if( NULL != posted_recv_msg_ref ) {
5855         posted_recv_msg_ref->matched += posted_recv_msg_ref->done;
5856         num_left_unresolved          -= posted_recv_msg_ref->done;
5857         TRAFFIC_MSG_DUMP_MSG_INDV(11, (posted_recv_msg_ref, "Ck.  Recv", true));
5858     }
5859     if( NULL != posted_irecv_msg_ref ) {
5860         posted_irecv_msg_ref->matched += posted_irecv_msg_ref->done;
5861         num_left_unresolved           -= posted_irecv_msg_ref->done;
5862         TRAFFIC_MSG_DUMP_MSG_INDV(11, (posted_irecv_msg_ref, "Ck. iRecv", true));
5863     }
5864     if( NULL != posted_precv_msg_ref ) {
5865         posted_precv_msg_ref->matched += posted_precv_msg_ref->done;
5866         num_left_unresolved           -= posted_precv_msg_ref->done;
5867         TRAFFIC_MSG_DUMP_MSG_INDV(11, (posted_precv_msg_ref, "Ck. pRecv", true));
5868     }
5869     if( NULL != posted_unknown_recv_msg_ref ) {
5870         posted_unknown_recv_msg_ref->matched += posted_unknown_recv_msg_ref->done;
5871         num_left_unresolved                  -= posted_unknown_recv_msg_ref->done;
5872         TRAFFIC_MSG_DUMP_MSG_INDV(11, (posted_unknown_recv_msg_ref, "Ck. uRecv", true));
5873     }
5874     if( NULL != posted_unknown_precv_msg_ref ) {
5875         posted_unknown_precv_msg_ref->matched += posted_unknown_precv_msg_ref->done;
5876         num_left_unresolved                   -= posted_unknown_precv_msg_ref->done;
5877         TRAFFIC_MSG_DUMP_MSG_INDV(11, (posted_unknown_precv_msg_ref, "Ck. upRecv", true));
5878     }
5879 
5880     OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
5881                          "crcp:bkmrk: %s <-- %s "
5882                          "Stage 1: Ck.Drain: [TR %3d/MS %3d] sent %4d, unres %4d, res %4d",
5883                          OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5884                          OMPI_NAME_PRINT(&(peer_ref->proc_name)),
5885                          peer_ref->total_msgs_recvd,
5886                          peer_ref->matched_msgs_sent,
5887                          p_num_sent,
5888                          num_left_unresolved,
5889                          *num_resolved));
5890 
5891     
5892 
5893 
5894 
5895     if( num_left_unresolved <= 0 ) {
5896         goto cleanup;
5897     }
5898 
5899     
5900 
5901 
5902 
5903 
5904     if( NULL != posted_recv_msg_ref ) {
5905         if( posted_recv_msg_ref->active > num_left_unresolved ) {
5906             posted_recv_msg_ref->matched += num_left_unresolved;
5907             num_still_active             += num_left_unresolved;
5908             num_left_unresolved           = 0;
5909         } else {
5910             posted_recv_msg_ref->matched += posted_recv_msg_ref->active;
5911             num_still_active             += posted_recv_msg_ref->active;
5912             num_left_unresolved          -= posted_recv_msg_ref->active;
5913         }
5914     }
5915     if( num_left_unresolved > 0 && NULL != posted_irecv_msg_ref ) {
5916         if( posted_irecv_msg_ref->active > num_left_unresolved ) {
5917             posted_irecv_msg_ref->matched += num_left_unresolved;
5918             num_still_active              += num_left_unresolved;
5919             num_left_unresolved            = 0;
5920         } else {
5921             posted_irecv_msg_ref->matched += posted_irecv_msg_ref->active;
5922             num_still_active              += posted_irecv_msg_ref->active;
5923             num_left_unresolved           -= posted_irecv_msg_ref->active;
5924         }
5925     }
5926     if( num_left_unresolved > 0 && NULL != posted_precv_msg_ref ) {
5927         if( posted_precv_msg_ref->active > num_left_unresolved ) {
5928             posted_precv_msg_ref->matched += num_left_unresolved;
5929             num_still_active              += num_left_unresolved;
5930             num_left_unresolved            = 0;
5931         } else {
5932             posted_precv_msg_ref->matched += posted_precv_msg_ref->active;
5933             num_still_active              += posted_precv_msg_ref->active;
5934             num_left_unresolved           -= posted_precv_msg_ref->active;
5935         }
5936     }
5937     if( num_left_unresolved > 0 && NULL != posted_unknown_recv_msg_ref ) {
5938         if( posted_unknown_recv_msg_ref->active > num_left_unresolved ) {
5939             posted_unknown_recv_msg_ref->matched += num_left_unresolved;
5940             num_still_active                     += num_left_unresolved;
5941             num_left_unresolved                   = 0;
5942         } else {
5943             posted_unknown_recv_msg_ref->matched += posted_unknown_recv_msg_ref->active;
5944             num_still_active                     += posted_unknown_recv_msg_ref->active;
5945             num_left_unresolved                  -= posted_unknown_recv_msg_ref->active;
5946         }
5947     }
5948     if( num_left_unresolved > 0 && NULL != posted_unknown_precv_msg_ref ) {
5949         if( posted_unknown_precv_msg_ref->active > num_left_unresolved ) {
5950             posted_unknown_precv_msg_ref->matched += num_left_unresolved;
5951             num_still_active                      += num_left_unresolved;
5952             num_left_unresolved                    = 0;
5953         } else {
5954             posted_unknown_precv_msg_ref->matched += posted_unknown_precv_msg_ref->active;
5955             num_still_active                      += posted_unknown_precv_msg_ref->active;
5956             num_left_unresolved                   -= posted_unknown_precv_msg_ref->active;
5957         }
5958     }
5959 
5960     
5961 
5962 
5963 
5964 
5965     OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
5966                          "crcp:bkmrk: %s <-- %s "
5967                          "Stage 2: Ck.Drain: [TR %3d/MS %3d] sent %4d, unres %4d, res %4d, active %4d",
5968                          OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5969                          OMPI_NAME_PRINT(&(peer_ref->proc_name)),
5970                          peer_ref->total_msgs_recvd,
5971                          peer_ref->matched_msgs_sent,
5972                          p_num_sent,
5973                          num_left_unresolved,
5974                          *num_resolved,
5975                          num_still_active
5976                          ));
5977 
5978     
5979 
5980 
5981     if(num_left_unresolved < 0 ) {
5982         ERROR_SHOULD_NEVER_HAPPEN_ARG("crcp:bkmrk: Ck.Drain: Unresolved (%3d) < 0", num_left_unresolved);
5983         exit_status = OMPI_ERROR;
5984         goto cleanup;
5985     }
5986 
5987     
5988 
5989 
5990 
5991 
5992     if( num_left_unresolved <= 0 &&
5993         num_still_active    <= 0) {
5994         goto cleanup;
5995     }
5996 
5997     
5998 
5999 
6000 
6001 
6002 
6003 
6004     if( num_still_active > 0 ) {
6005         
6006 
6007 
6008 
6009 
6010         if( NULL != posted_recv_msg_ref ) {
6011             
6012             if (current_msg_id         == posted_recv_msg_ref->msg_id  &&
6013                 COORD_MSG_TYPE_B_RECV  == posted_recv_msg_ref->msg_type) {
6014                 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
6015                                      "crcp:bkmrk: %s <-- %s "
6016                                      "Recv Check: Found a message that is 'active'! Prepare to STALL.\n",
6017                                      OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
6018                                      OMPI_NAME_PRINT(&(peer_ref->proc_name)) ));
6019                 stall_for_completion = true;
6020             }
6021             else {
6022                 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
6023                                      "crcp:bkmrk: %s <-- %s "
6024                                      "Recv Check: Found a message that is 'active', but is not the current recv! "
6025                                      "No stall required [%3d, %3d, %3d, %3d].\n",
6026                                      OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
6027                                      OMPI_NAME_PRINT(&(peer_ref->proc_name)),
6028                                      (int)current_msg_id,
6029                                      (int)current_msg_type,
6030                                      (int)posted_recv_msg_ref->msg_id,
6031                                      (int)posted_recv_msg_ref->msg_type));
6032             }
6033         }
6034 
6035         
6036 
6037 
6038 
6039 
6040 
6041         traffic_message_create_drain_message(false, num_still_active,
6042                                              peer_ref,
6043                                              &posted_recv_msg_ref,
6044                                              &num_posted);
6045         num_still_active -= num_posted;
6046         *num_resolved    += num_posted;
6047         peer_ref->total_msgs_recvd += num_posted;
6048 
6049         traffic_message_create_drain_message(false, num_still_active,
6050                                              peer_ref,
6051                                              &posted_irecv_msg_ref,
6052                                              &num_posted);
6053         num_still_active -= num_posted;
6054         *num_resolved    += num_posted;
6055         peer_ref->total_msgs_recvd += num_posted;
6056 
6057         traffic_message_create_drain_message(false, num_still_active,
6058                                              peer_ref,
6059                                              &posted_precv_msg_ref,
6060                                              &num_posted);
6061         num_still_active -= num_posted;
6062         *num_resolved    += num_posted;
6063         peer_ref->total_msgs_recvd += num_posted;
6064 
6065         traffic_message_create_drain_message(false, num_still_active,
6066                                              peer_ref,
6067                                              &posted_unknown_recv_msg_ref,
6068                                              &num_posted);
6069         num_still_active -= num_posted;
6070         *num_resolved    += num_posted;
6071         peer_ref->total_msgs_recvd += num_posted;
6072 
6073         traffic_message_create_drain_message(false, num_still_active,
6074                                              peer_ref,
6075                                              &posted_unknown_precv_msg_ref,
6076                                              &num_posted);
6077         num_still_active -= num_posted;
6078         *num_resolved    += num_posted;
6079         peer_ref->total_msgs_recvd += num_posted;
6080     }
6081 
6082     OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
6083                          "crcp:bkmrk: %s <-- %s "
6084                          "Stage 3: Ck.Drain: [TR %3d/MS %3d] sent %4d, unres %4d, res %4d, active %4d",
6085                          OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
6086                          OMPI_NAME_PRINT(&(peer_ref->proc_name)),
6087                          peer_ref->total_msgs_recvd,
6088                          peer_ref->matched_msgs_sent,
6089                          p_num_sent,
6090                          num_left_unresolved,
6091                          *num_resolved,
6092                          num_still_active
6093                          ));
6094 
6095     
6096 
6097 
6098 
6099 
6100     if( num_left_unresolved > 0 ) {
6101         
6102         CREATE_NEW_MSG(posted_tmp_msg_ref, COORD_MSG_TYPE_I_RECV,
6103                        count, datatype_size, tag, rank,
6104                        ompi_comm_lookup(comm_id),
6105                        peer_ref->proc_name.jobid,
6106                        peer_ref->proc_name.vpid);
6107 
6108         traffic_message_create_drain_message(true, num_left_unresolved,
6109                                              peer_ref,
6110                                              &posted_tmp_msg_ref,
6111                                              &num_posted);
6112         num_left_unresolved  -= num_posted;
6113         *num_resolved        += num_posted;
6114         peer_ref->total_msgs_recvd += num_posted;
6115 
6116         HOKE_TRAFFIC_MSG_REF_RETURN(posted_tmp_msg_ref);
6117     }
6118 
6119     OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
6120                          "crcp:bkmrk: %s <-- %s "
6121                          "Stage 4: Ck.Drain: [TR %3d/MS %3d] sent %4d, unres %4d, res %4d, active %4d",
6122                          OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
6123                          OMPI_NAME_PRINT(&(peer_ref->proc_name)),
6124                          peer_ref->total_msgs_recvd,
6125                          peer_ref->matched_msgs_sent,
6126                          p_num_sent,
6127                          num_left_unresolved,
6128                          *num_resolved,
6129                          num_still_active
6130                          ));
6131 
6132     
6133  cleanup:
6134     return exit_status;
6135 }
6136 
6137 static int do_recv_msg_detail_resp(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
6138                                    int resp, int num_resolv, int total_found)
6139 {
6140     opal_buffer_t * buffer = NULL;
6141     int exit_status = OMPI_SUCCESS;
6142     int ret;
6143 
6144     if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
6145         exit_status = OMPI_ERROR;
6146         goto cleanup;
6147     }
6148 
6149     PACK_BUFFER(buffer, resp, 1, OPAL_UINT32,
6150                 "crcp:bkmrk: recv_msg_details: Unable to ask peer for more messages");
6151     PACK_BUFFER(buffer, num_resolv, 1, OPAL_UINT32,
6152                 "crcp:bkmrk: recv_msg_details: Unable to ask peer for more messages");
6153     PACK_BUFFER(buffer, total_found, 1, OPAL_UINT32,
6154                 "crcp:bkmrk: recv_msg_details: Unable to ask peer for more messages");
6155 
6156     if (ORTE_SUCCESS != (ret = ompi_rte_send_buffer_nb(&peer_ref->proc_name, buffer,
6157                                                        OMPI_CRCP_COORD_BOOKMARK_TAG,
6158                                                        orte_rml_send_callback, NULL))) {
6159         opal_output(mca_crcp_bkmrk_component.super.output_handle,
6160                     "crcp:bkmrk: recv_msg_detail_resp: Unable to send message detail response to peer %s: Return %d\n",
6161                     OMPI_NAME_PRINT(&peer_ref->proc_name),
6162                     ret);
6163         exit_status = OMPI_ERROR;
6164         goto cleanup;
6165     }
6166 
6167  cleanup:
6168     if( NULL != buffer) {
6169         OBJ_RELEASE(buffer);
6170         buffer = NULL;
6171     }
6172 
6173     return exit_status;
6174 }
6175 
6176 
6177 
6178 
6179 
6180 static void start_time(int idx) {
6181     if(idx < CRCP_TIMER_MAX ) {
6182         timer_start[idx] = get_time();
6183     }
6184 }
6185 
6186 static void end_time(int idx) {
6187     if(idx < CRCP_TIMER_MAX ) {
6188         timer_end[idx] = get_time();
6189     }
6190 }
6191 
/*
 * Current time in seconds, as a double.
 *
 * Uses opal_timer_base_get_usec() when OPAL_TIMER_USEC_NATIVE is set,
 * otherwise gettimeofday(); either way the microsecond value is scaled
 * to seconds.
 */
static double get_time() {
#if OPAL_TIMER_USEC_NATIVE
    return (double)opal_timer_base_get_usec() / 1000000.0;
#else
    struct timeval now;

    gettimeofday(&now, NULL);
    /* whole seconds plus the fractional microsecond part */
    return (double)now.tv_sec + (double)now.tv_usec / 1000000.0;
#endif
}
6206 
6207 static void clear_timers(void) {
6208     int i;
6209     for(i = 0; i < CRCP_TIMER_MAX; ++i) {
6210         timer_start[i] = 0.0;
6211         timer_end[i]   = 0.0;
6212     }
6213 }
6214 
6215 static void display_all_timers(int state) {
6216     bool report_ready = false;
6217     double barrier_start, barrier_stop;
6218     int i, ret;
6219 
6220     if( 0 != OMPI_PROC_MY_NAME->vpid ) {
6221         if( 2 > timing_enabled ) {
6222             return;
6223         }
6224         else if( 2 == timing_enabled ) {
6225             if( OPAL_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
6226                 OPAL_ERROR_LOG(ret);
6227             }
6228             return;
6229         }
6230     }
6231 
6232     for( i = 0; i < CRCP_TIMER_MAX; ++i) {
6233         if(timer_end[i] > 0.001) {
6234             report_ready = true;
6235         }
6236     }
6237     if( !report_ready ) {
6238         return;
6239     }
6240 
6241     opal_output(0, "crcp:bkmrk: timing(%20s): ******************** Begin: [State = %12s]\n", "Summary", opal_crs_base_state_str(state));
6242     for( i = 0; i < CRCP_TIMER_MAX; ++i) {
6243         display_indv_timer_core(i, 0, 0, false);
6244     }
6245 
6246     if( timing_enabled >= 2) {
6247         barrier_start = get_time();
6248         if( OPAL_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
6249             OPAL_ERROR_LOG(ret);
6250         }
6251         barrier_stop = get_time();
6252         opal_output(0,
6253                     "crcp:bkmrk: timing(%20s): %20s = %10.2f s\n",
6254                     "",
6255                     "Group Barrier",
6256                     (barrier_stop - barrier_start));
6257     }
6258 
6259     opal_output(0, "crcp:bkmrk: timing(%20s): ******************** End:   [State = %12s]\n", "Summary", opal_crs_base_state_str(state));
6260 
6261 }
6262 
/*
 * Print a single timer on direct request.
 *
 * Forwards to display_indv_timer_core() with its 'direct' flag set.
 */
static void display_indv_timer(int idx, int proc, int msgs)
{
    const bool direct_request = true;

    display_indv_timer_core(idx, proc, msgs, direct_request);
}
6266 
6267 static void display_indv_timer_core(int idx, int proc, int msgs, bool direct) {
6268     double diff = timer_end[idx] - timer_start[idx];
6269     char * str = NULL;
6270 
6271     if( 0 != OMPI_PROC_MY_NAME->vpid && timing_enabled < 3 ) {
6272         return;
6273     }
6274 
6275     
6276     if(timer_end[idx] <= 0.001) {
6277         return;
6278     }
6279 
6280     switch(idx) {
6281     case CRCP_TIMER_CKPT_EX_PEER_S:
6282     case CRCP_TIMER_CKPT_EX_PEER_R:
6283     case CRCP_TIMER_CKPT_CHECK_PEER_S:
6284     case CRCP_TIMER_CKPT_CHECK_PEER_R:
6285         
6286 
6287         if( direct && timing_enabled >= 2) {
6288             opal_asprintf(&str, "Proc %2d, Msg %5d", proc, msgs);
6289         } else {
6290             return;
6291         }
6292         break;
6293     default:
6294         str = strdup("");
6295         break;
6296     }
6297 
6298     opal_output(0,
6299                 "crcp:bkmrk: timing(%20s): %20s = %10.2f s\n",
6300                 str,
6301                 timer_label[idx],
6302                 diff);
6303     free(str);
6304     str = NULL;
6305 }
6306 
6307 
6308 #if OPAL_ENABLE_DEBUG
6309 static void traffic_message_dump_msg_content_indv(ompi_crcp_bkmrk_pml_message_content_ref_t * content_ref)
6310 {
6311     OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
6312                          "\t\t(%3d) Content: [A/D/P/Dr] [%s / %s / %s /%s]",
6313                          (int)content_ref->msg_id,
6314                          (content_ref->active ? "T" : "F"),
6315                          (content_ref->done ? "T" : "F"),
6316                          (content_ref->already_posted ? "T" : "F"),
6317                          (content_ref->already_drained ? "T" : "F")));
6318 }
6319 
6320 static void traffic_message_dump_msg_indv(ompi_crcp_bkmrk_pml_traffic_message_ref_t * msg_ref, char * msg, bool vshort)
6321 {
6322     ompi_crcp_bkmrk_pml_message_content_ref_t  *content_ref = NULL;
6323     opal_list_item_t* cont_item = NULL;
6324     char * type_name = NULL;
6325 
6326     switch(msg_ref->msg_type) {
6327     case COORD_MSG_TYPE_B_SEND:
6328         type_name = strdup(" Send");
6329         break;
6330     case COORD_MSG_TYPE_I_SEND:
6331         type_name = strdup("iSend");
6332         break;
6333     case COORD_MSG_TYPE_P_SEND:
6334         type_name = strdup("pSend");
6335         break;
6336     case COORD_MSG_TYPE_B_RECV:
6337         type_name = strdup(" Recv");
6338         break;
6339     case COORD_MSG_TYPE_I_RECV:
6340         type_name = strdup("iRecv");
6341         break;
6342     case COORD_MSG_TYPE_P_RECV:
6343         type_name = strdup("pRecv");
6344         break;
6345     default:
6346         type_name = strdup("Unknown");
6347         break;
6348     }
6349 
6350     if( !vshort ) {
6351         opal_output(0, "\t%s %10s (%3d): [m %3d/d %3d/a %3d/ad %3d/p %3d] Contents %2d ... count %6d, tag %6d, rank %3d",
6352                     type_name,
6353                     msg,
6354                     (int)msg_ref->msg_id,
6355                     msg_ref->matched,
6356                     msg_ref->done,
6357                     msg_ref->active,
6358                     msg_ref->active_drain,
6359                     msg_ref->posted,
6360                     (int)opal_list_get_size(&msg_ref->msg_contents),
6361                     (int)msg_ref->count,
6362                     msg_ref->tag,
6363                     msg_ref->rank);
6364     } else {
6365         opal_output(0, "\t%s %10s (%3d): [m %3d/d %3d/a %3d/ad %3d/p %3d] Contents %2d ... count %6d",
6366                     type_name,
6367                     msg,
6368                     (int)msg_ref->msg_id,
6369                     msg_ref->matched,
6370                     msg_ref->done,
6371                     msg_ref->active,
6372                     msg_ref->active_drain,
6373                     msg_ref->posted,
6374                     (int)opal_list_get_size(&msg_ref->msg_contents),
6375                     (int)msg_ref->count);
6376     }
6377 
6378     free(type_name);
6379 
6380     for(cont_item  = opal_list_get_first(&(msg_ref->msg_contents));
6381         cont_item != opal_list_get_end(  &(msg_ref->msg_contents));
6382         cont_item  = opal_list_get_next(cont_item) ) {
6383         content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)cont_item;
6384 
6385         traffic_message_dump_msg_content_indv(content_ref);
6386     }
6387 }
6388 
6389 static void traffic_message_dump_drain_msg_indv(ompi_crcp_bkmrk_pml_drain_message_ref_t * msg_ref, char * msg, bool vshort)
6390 {
6391     ompi_crcp_bkmrk_pml_message_content_ref_t  *content_ref = NULL;
6392     opal_list_item_t* cont_item = NULL;
6393     char * type_name = NULL;
6394 
6395     switch(msg_ref->msg_type) {
6396     case COORD_MSG_TYPE_B_SEND:
6397         type_name = strdup(" Send");
6398         break;
6399     case COORD_MSG_TYPE_I_SEND:
6400         type_name = strdup("iSend");
6401         break;
6402     case COORD_MSG_TYPE_P_SEND:
6403         type_name = strdup("pSend");
6404         break;
6405     case COORD_MSG_TYPE_B_RECV:
6406         type_name = strdup(" Recv");
6407         break;
6408     case COORD_MSG_TYPE_I_RECV:
6409         type_name = strdup("iRecv");
6410         break;
6411     case COORD_MSG_TYPE_P_RECV:
6412         type_name = strdup("pRecv");
6413         break;
6414     default:
6415         type_name = strdup("Unknown");
6416         break;
6417     }
6418 
6419     if( !vshort ) {
6420         opal_output(0, "\t%s %10s (%3d): [d %3d/a %3d] Contents %2d ... count %6d, tag %6d, rank %3d",
6421                     type_name,
6422                     msg,
6423                     (int)msg_ref->msg_id,
6424                     msg_ref->done,
6425                     msg_ref->active,
6426                     (int)opal_list_get_size(&msg_ref->msg_contents),
6427                     (int)msg_ref->count,
6428                     msg_ref->tag,
6429                     msg_ref->rank);
6430     } else {
6431         opal_output(0, "\t%s %10s (%3d): [d %3d/a %3d] Contents %2d ... count %6d",
6432                     type_name,
6433                     msg,
6434                     (int)msg_ref->msg_id,
6435                     msg_ref->done,
6436                     msg_ref->active,
6437                     (int)opal_list_get_size(&msg_ref->msg_contents),
6438                     (int)msg_ref->count);
6439     }
6440 
6441     free(type_name);
6442 
6443     for(cont_item  = opal_list_get_first(&(msg_ref->msg_contents));
6444         cont_item != opal_list_get_end(  &(msg_ref->msg_contents));
6445         cont_item  = opal_list_get_next(cont_item) ) {
6446         content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)cont_item;
6447 
6448         traffic_message_dump_msg_content_indv(content_ref);
6449     }
6450 }
6451 
6452 static void traffic_message_dump_msg_list(opal_list_t *msg_list, bool is_drain)
6453 {
6454     opal_list_item_t* item = NULL;
6455     ompi_crcp_bkmrk_pml_traffic_message_ref_t * msg_ref = NULL;
6456     ompi_crcp_bkmrk_pml_drain_message_ref_t   * drain_msg_ref = NULL;
6457 
6458     for(item  = opal_list_get_last(msg_list);
6459         item != opal_list_get_begin(msg_list);
6460         item  = opal_list_get_prev(item) ) {
6461         if( !is_drain ) {
6462             msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)item;
6463             traffic_message_dump_msg_indv(msg_ref, "", false);
6464         } else {
6465             drain_msg_ref = (ompi_crcp_bkmrk_pml_drain_message_ref_t*)item;
6466             traffic_message_dump_drain_msg_indv(drain_msg_ref, "Drain", false);
6467         }
6468     }
6469 }
6470 
6471 static void traffic_message_dump_peer(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref, char * msg, bool root_only)
6472 {
6473     if( root_only && ompi_process_info.my_name.vpid != 0 ) {
6474         return;
6475     } else {
6476         sleep(ompi_process_info.my_name.vpid * 2);
6477     }
6478 
6479     opal_output(0, "------------- %s ---------------------------------", msg);
6480     opal_output(0, "%s <-> %s Totals Sent [ %3d / %3d ] Recv [ %3d / %3d ]",
6481                 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
6482                 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
6483                 peer_ref->total_msgs_sent,
6484                 peer_ref->matched_msgs_sent,
6485                 peer_ref->total_msgs_recvd,
6486                 peer_ref->matched_msgs_recvd);
6487     opal_output(0, "\n");
6488 
6489     traffic_message_dump_msg_list(&(peer_ref->send_list),      false);
6490     traffic_message_dump_msg_list(&(peer_ref->isend_list),     false);
6491     traffic_message_dump_msg_list(&(peer_ref->send_init_list), false);
6492 
6493     traffic_message_dump_msg_list(&(peer_ref->recv_list),      false);
6494     traffic_message_dump_msg_list(&(peer_ref->irecv_list),     false);
6495     traffic_message_dump_msg_list(&(peer_ref->recv_init_list), false);
6496 
6497     traffic_message_dump_msg_list(&(peer_ref->drained_list),   true);
6498 
6499     opal_output(0, "--------------------------------------------------");
6500     usleep(250000);
6501 }
6502 #endif