This source file includes the following definitions.
- ompi_crcp_bkmrk_pml_peer_ref_construct
- ompi_crcp_bkmrk_pml_peer_ref_destruct
- ompi_crcp_bkmrk_pml_message_content_ref_construct
- ompi_crcp_bkmrk_pml_message_content_ref_destruct
- ompi_crcp_bkmrk_pml_traffic_message_ref_construct
- ompi_crcp_bkmrk_pml_traffic_message_ref_destruct
- ompi_crcp_bkmrk_pml_drain_message_ref_construct
- ompi_crcp_bkmrk_pml_drain_message_ref_destruct
- ompi_crcp_bkmrk_pml_drain_message_ack_ref_construct
- ompi_crcp_bkmrk_pml_drain_message_ack_ref_destruct
- ompi_crcp_bkmrk_pml_init
- ompi_crcp_bkmrk_pml_finalize
- ompi_crcp_bkmrk_pml_enable
- ompi_crcp_bkmrk_pml_progress
- ompi_crcp_bkmrk_pml_iprobe
- ompi_crcp_bkmrk_pml_probe
- ompi_crcp_bkmrk_pml_dump
- ompi_crcp_bkmrk_pml_add_comm
- ompi_crcp_bkmrk_pml_del_comm
- ompi_crcp_bkmrk_pml_add_procs
- ompi_crcp_bkmrk_pml_del_procs
- ompi_crcp_bkmrk_pml_isend_init
- ompi_crcp_bkmrk_pml_start_isend_init
- ompi_crcp_bkmrk_request_complete_isend_init
- ompi_crcp_bkmrk_pml_isend
- ompi_crcp_bkmrk_request_complete_isend
- ompi_crcp_bkmrk_pml_send
- ompi_crcp_bkmrk_pml_irecv_init
- ompi_crcp_bkmrk_pml_start_drain_irecv_init
- ompi_crcp_bkmrk_pml_start_irecv_init
- ompi_crcp_bkmrk_request_complete_irecv_init
- ompi_crcp_bkmrk_pml_irecv
- ompi_crcp_bkmrk_request_complete_irecv
- ompi_crcp_bkmrk_pml_recv
- ompi_crcp_bkmrk_pml_start
- ompi_crcp_bkmrk_request_complete
- ompi_crcp_bkmrk_pml_quiesce_start
- ompi_crcp_bkmrk_pml_quiesce_end
- ompi_crcp_bkmrk_pml_ft_event
- traffic_message_append
- traffic_message_start
- traffic_message_move
- traffic_message_find_mark_persistent
- traffic_message_grab_content
- traffic_message_create_drain_message
- traffic_message_find_recv
- traffic_message_find
- drain_message_append
- drain_message_remove
- drain_message_check_recv
- drain_message_find_any
- drain_message_find
- drain_message_grab_content
- drain_message_copy_remove_persistent
- drain_message_copy_remove
- find_peer
- find_peer_in_comm
- ft_event_coordinate_peers
- ft_event_finalize_exchange
- ft_event_exchange_bookmarks
- ft_event_check_bookmarks
- ft_event_post_drain_acks
- drain_message_ack_cbfunc
- ft_event_post_drained
- ft_event_post_drain_message
- ft_event_wait_quiesce
- wait_quiesce_drained
- coord_request_wait_all
- coord_request_wait
- wait_quiesce_drain_ack
- send_bookmarks
- recv_bookmarks
- recv_bookmarks_cbfunc
- send_msg_details
- do_send_msg_detail
- recv_msg_details
- do_recv_msg_detail
- do_recv_msg_detail_check_drain
- do_recv_msg_detail_resp
- start_time
- end_time
- get_time
- clear_timers
- display_all_timers
- display_indv_timer
- display_indv_timer_core
- traffic_message_dump_msg_content_indv
- traffic_message_dump_msg_indv
- traffic_message_dump_drain_msg_indv
- traffic_message_dump_msg_list
- traffic_message_dump_peer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "ompi_config.h"
24
25 #include <sys/types.h>
26 #ifdef HAVE_UNISTD_H
27 #include <unistd.h>
28 #endif
29
30 #include "opal/dss/dss.h"
31 #include "opal/runtime/opal_cr.h"
32 #include "opal/mca/event/event.h"
33 #include "opal/util/output.h"
34 #include "opal/util/printf.h"
35
36 #include "opal/util/opal_environ.h"
37 #include "ompi/mca/mca.h"
38 #include "opal/mca/pmix/pmix.h"
39
40 #include "ompi/request/request.h"
41 #include "ompi/mca/rte/rte.h"
42 #include "ompi/mca/pml/pml.h"
43 #include "ompi/mca/pml/base/base.h"
44 #include "ompi/mca/pml/base/pml_base_request.h"
45 #include "ompi/mca/crcp/crcp.h"
46 #include "ompi/mca/crcp/base/base.h"
47
48 #include "opal/class/opal_free_list.h"
49 #include "ompi/runtime/ompi_cr.h"
50 #include "orte/runtime/orte_wait.h"
51
52 #include "crcp_bkmrk.h"
53 #include "crcp_bkmrk_pml.h"
54
55
56
57
/*
 * File-scope constants and state shared by the routines in this file.
 */
/* Zero-valued size_t constants.
 * NOTE(review): presumably "match any size/count" wildcards for probes;
 * their use is outside this excerpt -- confirm. */
58 #define PROBE_ANY_SIZE ((size_t) 0)
59 #define PROBE_ANY_COUNT ((size_t) 0)
60
/* NOTE(review): use not visible here; presumably marks persistent requests -- confirm. */
61 #define PERSIST_MARKER ((int) -1)
62
/* NOTE(review): response codes, presumably for the message-detail exchange -- confirm. */
63 #define RECV_MATCH_RESP_DONE 0
64 #define RECV_MATCH_RESP_MORE 1
65 #define RECV_MATCH_RESP_ERROR 2
66
/* Sentinel written into the integer state fields (matched, done, active, ...)
 * of message references by their constructors and destructors.
 * NOTE(review): the negative literal is not parenthesized. */
67 #define INVALID_INT -123456789
68
/* NOTE(review): tri-state result codes; presumably returned by the *_find helpers -- confirm. */
69 #define FIND_MSG_TRUE 0
70 #define FIND_MSG_FALSE 1
71 #define FIND_MSG_UNKNOWN 2
72
73
/* NOTE(review): presumably the PML component/module this layer wraps;
 * they are not assigned anywhere in this excerpt -- confirm. */
74 static mca_pml_base_component_t *wrapped_pml_component = NULL;
75 static mca_pml_base_module_t *wrapped_pml_module = NULL;
76
77
/* Next msg_id handed out by CREATE_NEW_MSG / CREATE_NEW_DRAIN_MSG
 * (message_seq_num) and by HOKE_CONTENT_REF_ALLOC (content_ref_seq_num).
 * message_seq_num is reset to 1 by ompi_crcp_bkmrk_pml_init(). */
78 static uint64_t message_seq_num = 1;
79 static uint64_t content_ref_seq_num = 1;
80
81
/* Reset to 0 / COORD_MSG_TYPE_UNKNOWN by ompi_crcp_bkmrk_pml_init(). */
82 static uint64_t current_msg_id = 0;
83 static ompi_crcp_bkmrk_pml_message_type_t current_msg_type = 0;
84
85
86
/* Set to false by ompi_crcp_bkmrk_pml_init(). */
87 static bool stall_for_completion;
88
89
90
91
/* Starts as OPAL_CRS_RUNNING and is reset to it by ompi_crcp_bkmrk_pml_init(). */
92 static int ft_event_state = OPAL_CRS_RUNNING;
93
94
95
96
/* The lists below are constructed as opal_list_t in ompi_crcp_bkmrk_pml_init().
 * NOTE(review): these objects have external linkage (no `static`). */
97 opal_list_t ompi_crcp_bkmrk_pml_peer_refs;
98
99
100
101
102 opal_list_t unknown_recv_from_list;
103 opal_list_t unknown_persist_recv_list;
104
105
106
107
108 opal_list_t drained_msg_ack_list;
109
110
111
112
/* Free lists backing the CRCP_COORD_STATE_* and HOKE_* ALLOC/RETURN macros;
 * each is constructed and initialized in ompi_crcp_bkmrk_pml_init(). */
113 opal_free_list_t coord_state_free_list;
114 opal_free_list_t content_ref_free_list;
115 opal_free_list_t peer_ref_free_list;
116 opal_free_list_t traffic_msg_ref_free_list;
117 opal_free_list_t drain_msg_ref_free_list;
118 opal_free_list_t drain_ack_msg_ref_free_list;
119
120
121
122
/* NOTE(review): presumably the requests/statuses waited on while quiescing;
 * their use is outside this excerpt -- confirm. */
123 ompi_request_t ** quiesce_requests = NULL;
124 ompi_status_public_t ** quiesce_statuses = NULL;
125 int quiesce_request_count = 0;
126
127
128
129
130
131 static int ompi_crcp_bkmrk_pml_start_isend_init(ompi_request_t **request);
132 static int ompi_crcp_bkmrk_pml_start_irecv_init(ompi_request_t **request);
133 static int ompi_crcp_bkmrk_pml_start_drain_irecv_init(ompi_request_t **request, bool *found_drain);
134
135 static int ompi_crcp_bkmrk_request_complete_isend_init(struct ompi_request_t *request,
136 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
137 int src, int tag, int tmp_ddt_size);
138 static int ompi_crcp_bkmrk_request_complete_isend(struct ompi_request_t *request,
139 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
140 int src, int tag, int tmp_ddt_size);
141 static int ompi_crcp_bkmrk_request_complete_irecv_init(struct ompi_request_t *request,
142 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
143 int src, int tag, int tmp_ddt_size);
144 static int ompi_crcp_bkmrk_request_complete_irecv(struct ompi_request_t *request,
145 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
146 int src, int tag, int tmp_ddt_size);
147
148
149
150
151 static int traffic_message_append(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
152 opal_list_t * append_list,
153 ompi_crcp_bkmrk_pml_message_type_t msg_type,
154 size_t count,
155 ompi_datatype_t *datatype,
156 size_t ddt_size,
157 int tag,
158 int dest,
159 struct ompi_communicator_t* comm,
160 ompi_crcp_bkmrk_pml_traffic_message_ref_t **msg_ref);
161
162
163
164
165 static int traffic_message_start(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
166 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
167 ompi_request_t **request,
168 opal_list_t * peer_list,
169 ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref);
170
171
172
173
174
175 static int traffic_message_move(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
176 ompi_crcp_bkmrk_pml_message_type_t msg_type,
177 ompi_crcp_bkmrk_pml_peer_ref_t *from_peer_ref,
178 opal_list_t * from_list,
179 ompi_crcp_bkmrk_pml_peer_ref_t *to_peer_ref,
180 opal_list_t * to_list,
181 ompi_crcp_bkmrk_pml_traffic_message_ref_t **new_msg_ref,
182 bool keep_active,
183 bool remove);
184
185
186
187
188 static int traffic_message_grab_content(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
189 ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref,
190 bool remove,
191 bool already_drained);
192
193
194
195
196 static int traffic_message_find_mark_persistent(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
197 ompi_request_t **request,
198 bool cur_active,
199 bool set_is_active,
200 ompi_crcp_bkmrk_pml_message_content_ref_t **content_ref);
201
202
203
204
205 static int traffic_message_find(opal_list_t * search_list,
206 size_t count, int tag, int peer, uint32_t comm_id,
207 size_t ddt_size,
208 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** found_msg_ref,
209 int active);
210
211
212
213
214
215 static int traffic_message_find_recv(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
216 int rank, uint32_t comm_id, int tag,
217 size_t count, size_t datatype_size,
218 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_recv_msg_ref,
219 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_irecv_msg_ref,
220 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_precv_msg_ref,
221 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_unknown_recv_msg_ref,
222 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_unknown_precv_msg_ref);
223
224
225
226
227 static int traffic_message_create_drain_message(bool post_drain,
228 int max_post,
229 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
230 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_msg_ref,
231 int *num_posted);
232
233
234
235
236 static int drain_message_append(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
237 ompi_crcp_bkmrk_pml_message_type_t msg_type,
238 size_t count, size_t ddt_size,
239 int tag,int dest,
240 struct ompi_communicator_t* comm,
241 ompi_crcp_bkmrk_pml_drain_message_ref_t **msg_ref);
242
243
244
245
246 static int drain_message_remove(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
247 ompi_crcp_bkmrk_pml_drain_message_ref_t *msg_ref,
248 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref);
249
250
251
252
253 static int drain_message_check_recv(void **buf, size_t count,
254 ompi_datatype_t *datatype,
255 int *src, int *tag,
256 struct ompi_communicator_t* comm,
257 struct ompi_request_t **request,
258 ompi_status_public_t** status,
259 bool *found_drain);
260
261
262
263
264 static int drain_message_find(opal_list_t * search_list,
265 size_t count, int tag, int peer,
266 uint32_t comm_id, size_t ddt_size,
267 ompi_crcp_bkmrk_pml_drain_message_ref_t ** found_msg_ref,
268 ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref);
269
270
271
272
273 static int drain_message_find_any(size_t count, int tag, int peer,
274 struct ompi_communicator_t* comm, size_t ddt_size,
275 ompi_crcp_bkmrk_pml_drain_message_ref_t ** found_msg_ref,
276 ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref,
277 ompi_crcp_bkmrk_pml_peer_ref_t **peer_ref);
278
279
280
281
282 static int drain_message_grab_content(ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref,
283 ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref);
284
285
286
287
288 static int drain_message_copy_remove(ompi_crcp_bkmrk_pml_drain_message_ref_t *msg_ref,
289 ompi_crcp_bkmrk_pml_message_content_ref_t * content_ref,
290 int *src, int *tag,
291 struct ompi_request_t **request,
292 ompi_status_public_t **status,
293 ompi_datatype_t *datatype, int count, void **buf,
294 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref);
295
296
297
298
299 static int drain_message_copy_remove_persistent(ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref,
300 ompi_crcp_bkmrk_pml_message_content_ref_t *drain_content_ref,
301 ompi_crcp_bkmrk_pml_traffic_message_ref_t *traffic_msg_ref,
302 ompi_request_t *request,
303 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref);
304
305
306
307
308 static ompi_crcp_bkmrk_pml_peer_ref_t* find_peer(ompi_process_name_t proc);
309
310
311
312
313 static int find_peer_in_comm(struct ompi_communicator_t* comm, int proc_idx,
314 ompi_crcp_bkmrk_pml_peer_ref_t **peer_ref);
315
316
317
318
319
320 static int ft_event_coordinate_peers(void);
321
322
323
324
325
326 static int ft_event_finalize_exchange(void);
327
328
329
330
331
332
333
334 static int ft_event_exchange_bookmarks(void);
335
336
337
338
339 static int send_bookmarks(int peer_idx);
340
341
342
343
344 static int recv_bookmarks(int peer_idx);
345
346
347
348
349 static void recv_bookmarks_cbfunc(int status,
350 ompi_process_name_t* sender,
351 opal_buffer_t *buffer,
352 ompi_rml_tag_t tag,
353 void* cbdata);
354 static int total_recv_bookmarks = 0;
355
356
357
358
359
360 static int ft_event_check_bookmarks(void);
361
362
363
364
365
366 static int send_msg_details(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
367 int total_sent, int total_matched);
368
369
370
371
372
373 static int do_send_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
374 ompi_crcp_bkmrk_pml_traffic_message_ref_t*msg_ref,
375 int *num_matches,
376 int *total_found,
377 bool *finished);
378
379
380
381
382 static int recv_msg_details(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
383 int total_recv, int total_matched);
384
385
386
387
388 static int do_recv_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
389 int *rank, uint32_t *comm_id, int *tag,
390 size_t *count, size_t *datatype_size,
391 int *p_num_sent);
392
393
394
395
396
397
398 static int do_recv_msg_detail_check_drain(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
399 int rank, uint32_t comm_id, int tag,
400 size_t count, size_t datatype_size,
401 int p_num_sent,
402 int *num_resolved);
403
404
405
406
407 static int do_recv_msg_detail_resp(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
408 int resp,
409 int num_resolv,
410 int total_found);
411
412
413
414
415
416
417 static int ft_event_post_drain_acks(void);
418
419
420
421
422 static void drain_message_ack_cbfunc(int status,
423 ompi_process_name_t* sender,
424 opal_buffer_t *buffer,
425 ompi_rml_tag_t tag,
426 void* cbdata);
427
428
429
430
431
432 static int ft_event_post_drained(void);
433
434 static int ft_event_post_drain_message(ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref,
435 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref);
436
437
438
439
440
441
442 static int ft_event_wait_quiesce(void);
443
444
445
446
447 static int wait_quiesce_drained(void);
448
449
450
451
452
453
454 static int coord_request_wait_all( size_t count,
455 ompi_request_t ** requests,
456 ompi_status_public_t ** statuses);
457
458
459
460
461
462
463
464 static int coord_request_wait( ompi_request_t * request,
465 ompi_status_public_t * status);
466
467
468
469
470 static int wait_quiesce_drain_ack(void);
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
/*
 * Timer slot indices. CRCP_TIMER_MAX is the number of slots and sizes the
 * timer_start / timer_end / timer_label arrays below.
 */
507 #define CRCP_TIMER_TOTAL_CKPT 0
508 #define CRCP_TIMER_CKPT_EX_B 1
509 #define CRCP_TIMER_CKPT_EX_PEER_S 2
510 #define CRCP_TIMER_CKPT_EX_PEER_R 3
511 #define CRCP_TIMER_CKPT_EX_WAIT 4
512 #define CRCP_TIMER_CKPT_CHECK_B 5
513 #define CRCP_TIMER_CKPT_CHECK_PEER_S 6
514 #define CRCP_TIMER_CKPT_CHECK_PEER_R 7
515 #define CRCP_TIMER_CKPT_POST_DRAIN 8
516 #define CRCP_TIMER_CKPT_WAIT_QUI 9
517 #define CRCP_TIMER_TOTAL_CONT 10
518 #define CRCP_TIMER_TOTAL_RST 11
519 #define CRCP_TIMER_MAX 12
520
/* Timer helpers; their definitions are outside this excerpt. */
521 static double get_time(void);
522 static void start_time(int idx);
523 static void end_time(int idx);
524 static void display_indv_timer(int idx, int proc, int msgs);
525 static void display_indv_timer_core(int idx, int proc, int msgs, bool direct);
526 static void display_all_timers(int state);
527 static void clear_timers(void);
528
/* Per-slot timer storage, indexed by the CRCP_TIMER_* values above.
 * timer_label entries are filled with strdup()'d strings in
 * ompi_crcp_bkmrk_pml_init() when timing_enabled > 0.
 * NOTE(review): these arrays have external linkage (no `static`). */
529 double timer_start[CRCP_TIMER_MAX];
530 double timer_end[CRCP_TIMER_MAX];
531 char * timer_label[CRCP_TIMER_MAX];
532
/* The four macros below forward to the matching helper only when
 * timing_enabled > 0 (timing_enabled is declared outside this excerpt).
 * Each expands to a brace-enclosed block rather than do { } while(0). */
533 #define START_TIMER(idx) \
534 { \
535 if(OPAL_UNLIKELY(timing_enabled > 0)) { \
536 start_time(idx); \
537 } \
538 }
539
540 #define END_TIMER(idx) \
541 { \
542 if(OPAL_UNLIKELY(timing_enabled > 0)) { \
543 end_time(idx); \
544 } \
545 }
546
547 #define DISPLAY_INDV_TIMER(idx, proc, msg) \
548 { \
549 if(OPAL_UNLIKELY(timing_enabled > 0)) { \
550 display_indv_timer(idx, proc, msg); \
551 } \
552 }
553
554 #define DISPLAY_ALL_TIMERS(var) \
555 { \
556 if(OPAL_UNLIKELY(timing_enabled > 0)) { \
557 display_all_timers(var); \
558 } \
559 }
560
561
562
563
/*
 * Debug dump helpers and their wrapper macros.
 *
 * When OPAL_ENABLE_DEBUG is set, each TRAFFIC_MSG_DUMP_* macro calls the
 * matching traffic_message_dump_* function if `lv` is less than or equal to
 * mca_crcp_bkmrk_component.super.verbose. The `a` argument is pasted directly
 * after the function name, so it must be a complete parenthesized argument
 * list, e.g. TRAFFIC_MSG_DUMP_PEER(10, (peer_ref, "label", true)).
 *
 * Otherwise every macro expands to a lone `;` and its arguments are never
 * evaluated.
 *
 * NOTE(review): `lv` is not parenthesized in the comparison; pass a simple
 * expression.
 */
564 #if OPAL_ENABLE_DEBUG
565 static void traffic_message_dump_peer(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref, char * msg, bool root_only);
566 static void traffic_message_dump_msg_list(opal_list_t *msg_list, bool is_drain);
567 static void traffic_message_dump_msg_indv(ompi_crcp_bkmrk_pml_traffic_message_ref_t * msg_ref, char * msg, bool vshort);
568 static void traffic_message_dump_msg_content_indv(ompi_crcp_bkmrk_pml_message_content_ref_t * content_ref);
569
570 static void traffic_message_dump_drain_msg_indv(ompi_crcp_bkmrk_pml_drain_message_ref_t * msg_ref, char * msg, bool vshort);
571
572 #define TRAFFIC_MSG_DUMP_PEER(lv, a) { \
573 if( lv <= mca_crcp_bkmrk_component.super.verbose ) { \
574 traffic_message_dump_peer a; \
575 } \
576 }
577 #define TRAFFIC_MSG_DUMP_MSG_LIST(lv, a) { \
578 if( lv <= mca_crcp_bkmrk_component.super.verbose ) { \
579 traffic_message_dump_msg_list a; \
580 } \
581 }
582 #define TRAFFIC_MSG_DUMP_MSG_INDV(lv, a) { \
583 if( lv <= mca_crcp_bkmrk_component.super.verbose ) { \
584 traffic_message_dump_msg_indv a; \
585 } \
586 }
587 #define TRAFFIC_MSG_DUMP_MSG_CONTENT_INDV(lv, a) { \
588 if( lv <= mca_crcp_bkmrk_component.super.verbose ) { \
589 traffic_message_dump_msg_content_indv a; \
590 } \
591 }
592 #define TRAFFIC_MSG_DUMP_DRAIN_MSG_INDV(lv, a) { \
593 if( lv <= mca_crcp_bkmrk_component.super.verbose ) { \
594 traffic_message_dump_drain_msg_indv a; \
595 } \
596 }
597 #else
598 #define TRAFFIC_MSG_DUMP_PEER(lv, a) ;
599 #define TRAFFIC_MSG_DUMP_MSG_LIST(lv, a) ;
600 #define TRAFFIC_MSG_DUMP_MSG_INDV(lv, a) ;
601 #define TRAFFIC_MSG_DUMP_MSG_CONTENT_INDV(lv, a) ;
602 #define TRAFFIC_MSG_DUMP_DRAIN_MSG_INDV(lv, a) ;
603 #endif
604
/*
 * Report an internal inconsistency through opal_output() on stream 0.
 *
 * `msg` is joined to the trailing banner by string-literal concatenation, so
 * it must be a string literal. The banner's "%s:%d" is filled with __FILE__
 * and __LINE__ of the expansion site.
 */
605 #define ERROR_SHOULD_NEVER_HAPPEN(msg) { \
606 opal_output(0, msg \
607 " ---------- This should never happen ---------- (%s:%d)", \
608 __FILE__, __LINE__); \
609 }
610
/*
 * Same as ERROR_SHOULD_NEVER_HAPPEN, but `arg` is passed ahead of
 * __FILE__/__LINE__, so `msg` must contain exactly one conversion
 * specification that consumes `arg`.
 */
611 #define ERROR_SHOULD_NEVER_HAPPEN_ARG(msg, arg) { \
612 opal_output(0, msg \
613 " ---------- This should never happen ---------- (%s:%d)", \
614 arg, __FILE__, __LINE__); \
615 }
616
617
618
619
620
621
622
/*
 * Allocation helpers, one ALLOC/RETURN pair per reference type.
 *
 * *_ALLOC(x)  assigns to `x` an item obtained with opal_free_list_wait() from
 *             the matching file-scope free list, cast to the reference type.
 *             `x` must therefore be an assignable lvalue.
 * *_RETURN(x) hands `x` back to that free list with opal_free_list_return(),
 *             casting it to opal_free_list_item_t*.
 *
 * NOTE(review): the macro argument is not parenthesized inside the casts;
 * pass a plain identifier or an already parenthesized expression.
 */
623 #define HOKE_PEER_REF_ALLOC(peer_ref) \
624 do { \
625 peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t *) \
626 opal_free_list_wait (&peer_ref_free_list); \
627 } while(0)
628
629 #define HOKE_PEER_REF_RETURN(peer_ref) \
630 do { \
631 opal_free_list_return (&peer_ref_free_list, \
632 (opal_free_list_item_t*)peer_ref); \
633 } while(0)
634
635
/* In addition to allocating, stamps the new content reference with the
 * current content_ref_seq_num as its msg_id and then increments the counter. */
636 #define HOKE_CONTENT_REF_ALLOC(content_ref) \
637 do { \
638 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*) \
639 opal_free_list_wait (&content_ref_free_list); \
640 content_ref->msg_id = content_ref_seq_num; \
641 content_ref_seq_num++; \
642 } while(0)
643
644 #define HOKE_CONTENT_REF_RETURN(content_ref) \
645 do { \
646 opal_free_list_return (&content_ref_free_list, \
647 (opal_free_list_item_t*)content_ref); \
648 } while(0)
649
650
651 #define HOKE_TRAFFIC_MSG_REF_ALLOC(msg_ref) \
652 do { \
653 msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*) \
654 opal_free_list_wait (&traffic_msg_ref_free_list); \
655 } while(0)
656
657 #define HOKE_TRAFFIC_MSG_REF_RETURN(msg_ref) \
658 do { \
659 opal_free_list_return (&traffic_msg_ref_free_list, \
660 (opal_free_list_item_t*)msg_ref); \
661 } while(0)
662
663
664 #define HOKE_DRAIN_MSG_REF_ALLOC(msg_ref) \
665 do { \
666 msg_ref = (ompi_crcp_bkmrk_pml_drain_message_ref_t *) \
667 opal_free_list_wait (&drain_msg_ref_free_list); \
668 } while(0)
669
670 #define HOKE_DRAIN_MSG_REF_RETURN(msg_ref) \
671 do { \
672 opal_free_list_return (&drain_msg_ref_free_list, \
673 (opal_free_list_item_t*)msg_ref); \
674 } while(0)
675
676
677 #define HOKE_DRAIN_ACK_MSG_REF_ALLOC(msg_ref) \
678 do { \
679 msg_ref = (ompi_crcp_bkmrk_pml_drain_message_ack_ref_t *) \
680 opal_free_list_wait (&drain_ack_msg_ref_free_list); \
681 } while(0)
682
683 #define HOKE_DRAIN_ACK_MSG_REF_RETURN(msg_ref) \
684 do { \
685 opal_free_list_return (&drain_ack_msg_ref_free_list, \
686 (opal_free_list_item_t*)msg_ref); \
687 } while(0)
688
689
690
691
692
693 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_peer_ref_t,
694 opal_list_item_t,
695 ompi_crcp_bkmrk_pml_peer_ref_construct,
696 ompi_crcp_bkmrk_pml_peer_ref_destruct);
697
698 void ompi_crcp_bkmrk_pml_peer_ref_construct(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref) {
699 peer_ref->proc_name.jobid = ORTE_JOBID_INVALID;
700 peer_ref->proc_name.vpid = ORTE_VPID_INVALID;
701
702 OBJ_CONSTRUCT(&peer_ref->send_list, opal_list_t);
703 OBJ_CONSTRUCT(&peer_ref->isend_list, opal_list_t);
704 OBJ_CONSTRUCT(&peer_ref->send_init_list, opal_list_t);
705
706 OBJ_CONSTRUCT(&peer_ref->recv_list, opal_list_t);
707 OBJ_CONSTRUCT(&peer_ref->irecv_list, opal_list_t);
708 OBJ_CONSTRUCT(&peer_ref->recv_init_list, opal_list_t);
709
710 OBJ_CONSTRUCT(&peer_ref->drained_list, opal_list_t);
711
712 peer_ref->total_msgs_sent = 0;
713 peer_ref->matched_msgs_sent = 0;
714
715 peer_ref->total_msgs_recvd = 0;
716 peer_ref->matched_msgs_recvd = 0;
717
718 peer_ref->total_drained_msgs = 0;
719
720 peer_ref->ack_required = false;
721 }
722
723 void ompi_crcp_bkmrk_pml_peer_ref_destruct( ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref) {
724 opal_list_item_t* item = NULL;
725
726 peer_ref->proc_name.jobid = ORTE_JOBID_INVALID;
727 peer_ref->proc_name.vpid = ORTE_VPID_INVALID;
728
729 while( NULL != (item = opal_list_remove_first(&peer_ref->send_list)) ) {
730 HOKE_TRAFFIC_MSG_REF_RETURN(item);
731 }
732 OBJ_DESTRUCT(&peer_ref->send_list);
733 while( NULL != (item = opal_list_remove_first(&peer_ref->isend_list)) ) {
734 HOKE_TRAFFIC_MSG_REF_RETURN(item);
735 }
736 OBJ_DESTRUCT(&peer_ref->isend_list);
737 while( NULL != (item = opal_list_remove_first(&peer_ref->send_init_list)) ) {
738 HOKE_TRAFFIC_MSG_REF_RETURN(item);
739 }
740 OBJ_DESTRUCT(&peer_ref->send_init_list);
741
742 while( NULL != (item = opal_list_remove_first(&peer_ref->recv_list)) ) {
743 HOKE_TRAFFIC_MSG_REF_RETURN(item);
744 }
745 OBJ_DESTRUCT(&peer_ref->recv_list);
746 while( NULL != (item = opal_list_remove_first(&peer_ref->irecv_list)) ) {
747 HOKE_TRAFFIC_MSG_REF_RETURN(item);
748 }
749 OBJ_DESTRUCT(&peer_ref->irecv_list);
750 while( NULL != (item = opal_list_remove_first(&peer_ref->recv_init_list)) ) {
751 HOKE_TRAFFIC_MSG_REF_RETURN(item);
752 }
753 OBJ_DESTRUCT(&peer_ref->recv_init_list);
754
755 while( NULL != (item = opal_list_remove_first(&peer_ref->drained_list)) ) {
756 HOKE_DRAIN_MSG_REF_RETURN(item);
757 }
758 OBJ_DESTRUCT(&peer_ref->drained_list);
759
760 peer_ref->total_msgs_sent = 0;
761 peer_ref->matched_msgs_sent = 0;
762
763 peer_ref->total_msgs_recvd = 0;
764 peer_ref->matched_msgs_recvd = 0;
765
766 peer_ref->total_drained_msgs = 0;
767
768 peer_ref->ack_required = false;
769 }
770
771
772
773
774 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_message_content_ref_t,
775 opal_list_item_t,
776 ompi_crcp_bkmrk_pml_message_content_ref_construct,
777 ompi_crcp_bkmrk_pml_message_content_ref_destruct);
778
779 void ompi_crcp_bkmrk_pml_message_content_ref_construct(ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref)
780 {
781 content_ref->buffer = NULL;
782 content_ref->request = NULL;
783 content_ref->active = false;
784
785 content_ref->done = false;
786 content_ref->active = false;
787 content_ref->already_posted = false;
788 content_ref->already_drained = false;
789
790 content_ref->msg_id = 0;
791 }
792
793 void ompi_crcp_bkmrk_pml_message_content_ref_destruct( ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref)
794 {
795 if( NULL != content_ref->buffer ) {
796 free(content_ref->buffer);
797 }
798 content_ref->buffer = NULL;
799
800 if( NULL != content_ref->request ) {
801 OBJ_RELEASE(content_ref->request);
802 }
803 content_ref->request = NULL;
804
805 content_ref->active = false;
806
807 content_ref->done = false;
808 content_ref->active = false;
809 content_ref->already_posted = false;
810 content_ref->already_drained = false;
811
812 content_ref->msg_id = 0;
813 }
814
815
816
817
818 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_traffic_message_ref_t,
819 opal_list_item_t,
820 ompi_crcp_bkmrk_pml_traffic_message_ref_construct,
821 ompi_crcp_bkmrk_pml_traffic_message_ref_destruct);
822
823 void ompi_crcp_bkmrk_pml_traffic_message_ref_construct(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref) {
824 msg_ref->msg_id = 0;
825 msg_ref->msg_type = COORD_MSG_TYPE_UNKNOWN;
826
827 msg_ref->count = 0;
828 msg_ref->ddt_size = 0;
829 msg_ref->tag = 0;
830 msg_ref->rank = 0;
831 msg_ref->comm = NULL;
832
833 OBJ_CONSTRUCT(&msg_ref->msg_contents, opal_list_t);
834
835 msg_ref->proc_name.jobid = ORTE_JOBID_INVALID;
836 msg_ref->proc_name.vpid = ORTE_VPID_INVALID;
837
838 msg_ref->matched = INVALID_INT;
839 msg_ref->done = INVALID_INT;
840 msg_ref->active = INVALID_INT;
841 msg_ref->posted = INVALID_INT;
842 msg_ref->active_drain = INVALID_INT;
843 }
844
845 void ompi_crcp_bkmrk_pml_traffic_message_ref_destruct( ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref) {
846 opal_list_item_t* item = NULL;
847
848 msg_ref->msg_id = 0;
849 msg_ref->msg_type = COORD_MSG_TYPE_UNKNOWN;
850
851 msg_ref->count = 0;
852 msg_ref->ddt_size = 0;
853 msg_ref->tag = 0;
854 msg_ref->rank = 0;
855 msg_ref->comm = NULL;
856
857 while( NULL != (item = opal_list_remove_first(&(msg_ref->msg_contents)) ) ) {
858 HOKE_CONTENT_REF_RETURN(item);
859 }
860 OBJ_DESTRUCT(&(msg_ref->msg_contents));
861
862 msg_ref->proc_name.jobid = ORTE_JOBID_INVALID;
863 msg_ref->proc_name.vpid = ORTE_VPID_INVALID;
864
865 msg_ref->matched = INVALID_INT;
866 msg_ref->done = INVALID_INT;
867 msg_ref->active = INVALID_INT;
868 msg_ref->posted = INVALID_INT;
869 msg_ref->active_drain = INVALID_INT;
870 }
871
872
873
874
875 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_drain_message_ref_t,
876 opal_list_item_t,
877 ompi_crcp_bkmrk_pml_drain_message_ref_construct,
878 ompi_crcp_bkmrk_pml_drain_message_ref_destruct);
879
880 void ompi_crcp_bkmrk_pml_drain_message_ref_construct(ompi_crcp_bkmrk_pml_drain_message_ref_t *msg_ref) {
881 msg_ref->msg_id = 0;
882 msg_ref->msg_type = COORD_MSG_TYPE_UNKNOWN;
883
884 msg_ref->count = 0;
885
886 msg_ref->datatype = NULL;
887 msg_ref->ddt_size = 0;
888
889 msg_ref->tag = 0;
890 msg_ref->rank = 0;
891 msg_ref->comm = NULL;
892
893 OBJ_CONSTRUCT(&msg_ref->msg_contents, opal_list_t);
894
895 msg_ref->proc_name.jobid = ORTE_JOBID_INVALID;
896 msg_ref->proc_name.vpid = ORTE_VPID_INVALID;
897
898 msg_ref->done = INVALID_INT;
899 msg_ref->active = INVALID_INT;
900 msg_ref->already_posted = INVALID_INT;
901 }
902
903 void ompi_crcp_bkmrk_pml_drain_message_ref_destruct( ompi_crcp_bkmrk_pml_drain_message_ref_t *msg_ref) {
904 opal_list_item_t* item = NULL;
905
906 msg_ref->msg_id = 0;
907 msg_ref->msg_type = COORD_MSG_TYPE_UNKNOWN;
908
909 msg_ref->count = 0;
910
911 if( NULL != msg_ref->datatype ) {
912 OBJ_RELEASE(msg_ref->datatype);
913 msg_ref->datatype = NULL;
914 }
915 msg_ref->ddt_size = 0;
916
917 msg_ref->tag = 0;
918 msg_ref->rank = 0;
919 msg_ref->comm = NULL;
920
921 while( NULL != (item = opal_list_remove_first(&(msg_ref->msg_contents)) ) ) {
922 HOKE_CONTENT_REF_RETURN(item);
923 }
924 OBJ_DESTRUCT(&(msg_ref->msg_contents));
925
926 msg_ref->proc_name.jobid = ORTE_JOBID_INVALID;
927 msg_ref->proc_name.vpid = ORTE_VPID_INVALID;
928
929 msg_ref->done = INVALID_INT;
930 msg_ref->active = INVALID_INT;
931 msg_ref->already_posted = INVALID_INT;
932 }
933
934
935
936
937 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_drain_message_ack_ref_t,
938 opal_list_item_t,
939 ompi_crcp_bkmrk_pml_drain_message_ack_ref_construct,
940 ompi_crcp_bkmrk_pml_drain_message_ack_ref_destruct);
941
942 void ompi_crcp_bkmrk_pml_drain_message_ack_ref_construct(ompi_crcp_bkmrk_pml_drain_message_ack_ref_t *msg_ack_ref) {
943 msg_ack_ref->complete = false;
944
945 msg_ack_ref->peer.jobid = ORTE_JOBID_INVALID;
946 msg_ack_ref->peer.vpid = ORTE_VPID_INVALID;
947 }
948
949 void ompi_crcp_bkmrk_pml_drain_message_ack_ref_destruct( ompi_crcp_bkmrk_pml_drain_message_ack_ref_t *msg_ack_ref) {
950 msg_ack_ref->complete = false;
951
952 msg_ack_ref->peer.jobid = ORTE_JOBID_INVALID;
953 msg_ack_ref->peer.vpid = ORTE_VPID_INVALID;
954 }
955
956
957
958
959
960 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_state_t,
961 ompi_crcp_base_pml_state_t,
962 NULL,
963 NULL
964 );
965
966
967
968
/*
 * Obtain (ALLOC) or hand back (RETURN) an ompi_crcp_bkmrk_pml_state_t using
 * coord_state_free_list. `state_ref` must be an assignable lvalue for ALLOC.
 */
969 #define CRCP_COORD_STATE_ALLOC(state_ref) \
970 do { \
971 state_ref = (ompi_crcp_bkmrk_pml_state_t *) \
972 opal_free_list_wait (&coord_state_free_list); \
973 } while(0)
974
975 #define CRCP_COORD_STATE_RETURN(state_ref) \
976 do { \
977 opal_free_list_return (&coord_state_free_list, \
978 (opal_free_list_item_t *)state_ref); \
979 } while(0)
980
/*
 * Allocate `coord_state` and fill it from `pml_state`: the original pointer
 * is remembered in prev_ptr, the super/state/error_code/wrapped_pml_* fields
 * are copied into p_super, and the given peer and message references are
 * attached. Expands to a brace-enclosed block.
 */
981 #define CREATE_COORD_STATE(coord_state, pml_state, v_peer_ref, v_msg_ref) \
982 { \
983 CRCP_COORD_STATE_ALLOC(coord_state); \
984 \
985 coord_state->prev_ptr = pml_state; \
986 coord_state->p_super.super = pml_state->super; \
987 coord_state->p_super.state = pml_state->state; \
988 coord_state->p_super.error_code = pml_state->error_code; \
989 coord_state->p_super.wrapped_pml_component = pml_state->wrapped_pml_component; \
990 coord_state->p_super.wrapped_pml_module = pml_state->wrapped_pml_module; \
991 \
992 coord_state->peer_ref = v_peer_ref; \
993 coord_state->msg_ref = v_msg_ref; \
994 }
995
/*
 * Inverse of CREATE_COORD_STATE: cast `pml_state` back to
 * ompi_crcp_bkmrk_pml_state_t* and read out the saved previous state pointer,
 * peer reference and message reference. Nothing is freed here.
 */
996 #define EXTRACT_COORD_STATE(pml_state, v_coord_state, v_rtn_state, v_peer_ref, v_msg_ref) \
997 { \
998 v_coord_state = (ompi_crcp_bkmrk_pml_state_t*)pml_state; \
999 v_rtn_state = v_coord_state->prev_ptr; \
1000 v_peer_ref = v_coord_state->peer_ref; \
1001 v_msg_ref = v_coord_state->msg_ref; \
1002 }
1003
1004
/*
 * Allocate a traffic message reference into `msg_ref` (via
 * HOKE_TRAFFIC_MSG_REF_ALLOC) and initialize it:
 *   - msg_id takes the current message_seq_num, which is then incremented;
 *   - type, count, ddt_size, tag, rank, comm and the process name are set
 *     from the corresponding macro arguments;
 *   - matched, done, active, posted and active_drain are set to 0.
 * The msg_contents list is not touched. Expands to a brace-enclosed block;
 * `msg_ref` must be an assignable lvalue.
 */
1005 #define CREATE_NEW_MSG(msg_ref, v_type, v_count, v_ddt_size, v_tag, v_rank, v_comm, p_jobid, p_vpid) \
1006 { \
1007 HOKE_TRAFFIC_MSG_REF_ALLOC(msg_ref); \
1008 \
1009 msg_ref->msg_id = message_seq_num; \
1010 message_seq_num++; \
1011 \
1012 msg_ref->msg_type = v_type; \
1013 \
1014 msg_ref->count = v_count; \
1015 \
1016 msg_ref->ddt_size = v_ddt_size; \
1017 \
1018 msg_ref->tag = v_tag; \
1019 msg_ref->rank = v_rank; \
1020 msg_ref->comm = v_comm; \
1021 \
1022 msg_ref->proc_name.jobid = p_jobid; \
1023 msg_ref->proc_name.vpid = p_vpid; \
1024 \
1025 msg_ref->matched = 0; \
1026 msg_ref->done = 0; \
1027 msg_ref->active = 0; \
1028 msg_ref->posted = 0; \
1029 msg_ref->active_drain = 0; \
1030 }
1031
/*
 * Allocate a drain message reference into `msg_ref` (via
 * HOKE_DRAIN_MSG_REF_ALLOC) and initialize it:
 *   - msg_id takes the current message_seq_num, which is then incremented;
 *   - type, count, tag, rank, comm and the process name are set from the
 *     corresponding macro arguments;
 *   - datatype is set to NULL.
 * The done, active and already_posted fields and the msg_contents list are
 * not touched. Expands to a brace-enclosed block; `msg_ref` must be an
 * assignable lvalue.
 *
 * NOTE(review): the ddt_size field is assigned from an identifier named
 * `ddt_size` at the expansion site, NOT from the v_ddt_size parameter, which
 * is unused. The macro therefore only compiles where a `ddt_size` variable is
 * in scope. Callers are not visible in this excerpt -- confirm what they pass
 * as v_ddt_size before changing this.
 */
1032 #define CREATE_NEW_DRAIN_MSG(msg_ref, v_type, v_count, v_ddt_size, v_tag, v_rank, v_comm, p_jobid, p_vpid) \
1033 { \
1034 HOKE_DRAIN_MSG_REF_ALLOC(msg_ref); \
1035 \
1036 msg_ref->msg_id = message_seq_num; \
1037 message_seq_num++; \
1038 \
1039 msg_ref->msg_type = v_type; \
1040 \
1041 msg_ref->count = v_count; \
1042 \
1043 msg_ref->datatype = NULL; \
1044 msg_ref->ddt_size = ddt_size; \
1045 \
1046 msg_ref->tag = v_tag; \
1047 msg_ref->rank = v_rank; \
1048 msg_ref->comm = v_comm; \
1049 \
1050 msg_ref->proc_name.jobid = p_jobid; \
1051 msg_ref->proc_name.vpid = p_vpid; \
1052 }
1053
1054
/*
 * Pack `count` elements of DSS type `type`, starting at `var`, into `buffer`
 * with opal_dss.pack().
 *
 * On failure the error message and return code are written to the
 * component's output stream, the code is stored in `exit_status`, and
 * control jumps to the `cleanup` label.
 *
 * Requirements at the expansion site:
 *   - variables `ret` and `exit_status` and a `cleanup:` label are in scope;
 *   - `var` is an lvalue (its address is taken).
 *
 * The result is compared against OPAL_SUCCESS because opal_dss.pack() is an
 * OPAL-layer call; this matches UNPACK_BUFFER.
 */
#define PACK_BUFFER(buffer, var, count, type, error_msg)                       \
 {                                                                             \
    if (OPAL_SUCCESS != (ret = opal_dss.pack(buffer, &(var), count, type)) ) { \
        opal_output(mca_crcp_bkmrk_component.super.output_handle,              \
                    "%s (Return %d)", error_msg, ret);                         \
        exit_status = ret;                                                     \
        goto cleanup;                                                          \
    }                                                                          \
 }
1064
/*
 * Unpack up to `count` elements of DSS type `type` from `buffer` into `var`
 * with opal_dss.unpack(). The element count is passed through a block-local
 * int32_t `n` initialized from `count`.
 *
 * On failure the error message and return code are written to the
 * component's output stream, the code is stored in `exit_status`, and
 * control jumps to the `cleanup` label.
 *
 * Requirements at the expansion site: variables `ret` and `exit_status` and
 * a `cleanup:` label are in scope, and `var` is an lvalue.
 *
 * NOTE(review): because the macro declares its own `n`, an argument
 * expression that mentions a caller variable named `n` would bind to the
 * macro's local instead.
 */
1065 #define UNPACK_BUFFER(buffer, var, count, type, error_msg) \
1066 { \
1067 int32_t n = count; \
1068 if (OPAL_SUCCESS != (ret = opal_dss.unpack(buffer, &(var), &n, type)) ) { \
1069 opal_output(mca_crcp_bkmrk_component.super.output_handle, \
1070 "%s (Return %d)", error_msg, ret); \
1071 exit_status = ret; \
1072 goto cleanup; \
1073 } \
1074 }
1075
1076
1077
1078
1079 int ompi_crcp_bkmrk_pml_init(void) {
1080 message_seq_num = 1;
1081 current_msg_id = 0;
1082 current_msg_type = COORD_MSG_TYPE_UNKNOWN;
1083 stall_for_completion = false;
1084 ft_event_state = OPAL_CRS_RUNNING;
1085
1086 OBJ_CONSTRUCT(&ompi_crcp_bkmrk_pml_peer_refs, opal_list_t);
1087
1088 OBJ_CONSTRUCT(&unknown_recv_from_list, opal_list_t);
1089 OBJ_CONSTRUCT(&unknown_persist_recv_list, opal_list_t);
1090
1091 OBJ_CONSTRUCT(&drained_msg_ack_list, opal_list_t);
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101 OBJ_CONSTRUCT(&coord_state_free_list, opal_free_list_t);
1102 opal_free_list_init (&coord_state_free_list,
1103 sizeof(ompi_crcp_bkmrk_pml_state_t),
1104 opal_cache_line_size,
1105 OBJ_CLASS(ompi_crcp_bkmrk_pml_state_t),
1106 0,opal_cache_line_size,
1107 4,
1108 -1,
1109 4,
1110 NULL, 0, NULL, NULL, NULL);
1111
1112 OBJ_CONSTRUCT(&content_ref_free_list, opal_free_list_t);
1113 opal_free_list_init (&content_ref_free_list,
1114 sizeof(ompi_crcp_bkmrk_pml_message_content_ref_t),
1115 opal_cache_line_size,
1116 OBJ_CLASS(ompi_crcp_bkmrk_pml_message_content_ref_t),
1117 0,opal_cache_line_size,
1118 80,
1119 -1,
1120 32,
1121 NULL, 0, NULL, NULL, NULL);
1122
1123 OBJ_CONSTRUCT(&peer_ref_free_list, opal_free_list_t);
1124 opal_free_list_init (&peer_ref_free_list,
1125 sizeof(ompi_crcp_bkmrk_pml_peer_ref_t),
1126 opal_cache_line_size,
1127 OBJ_CLASS(ompi_crcp_bkmrk_pml_peer_ref_t),
1128 0,opal_cache_line_size,
1129 16,
1130 -1,
1131 16,
1132 NULL, 0, NULL, NULL, NULL);
1133
1134 OBJ_CONSTRUCT(&traffic_msg_ref_free_list, opal_free_list_t);
1135 opal_free_list_init (&traffic_msg_ref_free_list,
1136 sizeof(ompi_crcp_bkmrk_pml_traffic_message_ref_t),
1137 opal_cache_line_size,
1138 OBJ_CLASS(ompi_crcp_bkmrk_pml_traffic_message_ref_t),
1139 0,opal_cache_line_size,
1140 32,
1141 -1,
1142 64,
1143 NULL, 0, NULL, NULL, NULL);
1144
1145 OBJ_CONSTRUCT(&drain_msg_ref_free_list, opal_free_list_t);
1146 opal_free_list_init (&drain_msg_ref_free_list,
1147 sizeof(ompi_crcp_bkmrk_pml_drain_message_ref_t),
1148 opal_cache_line_size,
1149 OBJ_CLASS(ompi_crcp_bkmrk_pml_drain_message_ref_t),
1150 0,opal_cache_line_size,
1151 32,
1152 -1,
1153 64,
1154 NULL, 0, NULL, NULL, NULL);
1155
1156 OBJ_CONSTRUCT(&drain_ack_msg_ref_free_list, opal_free_list_t);
1157 opal_free_list_init (&drain_ack_msg_ref_free_list,
1158 sizeof(ompi_crcp_bkmrk_pml_drain_message_ack_ref_t),
1159 opal_cache_line_size,
1160 OBJ_CLASS(ompi_crcp_bkmrk_pml_drain_message_ack_ref_t),
1161 0,opal_cache_line_size,
1162 16,
1163 -1,
1164 16,
1165 NULL, 0, NULL, NULL, NULL);
1166
1167 clear_timers();
1168
1169 if( timing_enabled > 0 ) {
1170 timer_label[CRCP_TIMER_TOTAL_CKPT] = strdup("Total Ckpt.");
1171 timer_label[CRCP_TIMER_CKPT_EX_B] = strdup("Exchange Bookmarks");
1172 timer_label[CRCP_TIMER_CKPT_EX_PEER_S] = strdup(" Ex.Bk. Send Peer");
1173 timer_label[CRCP_TIMER_CKPT_EX_PEER_R] = strdup(" Ex.Bk. Recv Peer");
1174 timer_label[CRCP_TIMER_CKPT_EX_WAIT] = strdup(" Ex.Bk. Wait");
1175
1176 timer_label[CRCP_TIMER_CKPT_CHECK_B] = strdup("Check Bookmarks");
1177 timer_label[CRCP_TIMER_CKPT_CHECK_PEER_S] = strdup(" Ck.Bk. Send Peer");
1178 timer_label[CRCP_TIMER_CKPT_CHECK_PEER_R] = strdup(" Ck.Bk. Recv Peer");
1179
1180 timer_label[CRCP_TIMER_CKPT_POST_DRAIN] = strdup("Post Drain Msgs.");
1181 timer_label[CRCP_TIMER_CKPT_WAIT_QUI] = strdup("Wait for Quiescence");
1182
1183 timer_label[CRCP_TIMER_TOTAL_CONT] = strdup("Total Continue");
1184
1185 timer_label[CRCP_TIMER_TOTAL_RST] = strdup("Total Restart");
1186 }
1187
1188 return OMPI_SUCCESS;
1189 }
1190
1191 int ompi_crcp_bkmrk_pml_finalize(void) {
1192 int i;
1193
1194 current_msg_id = 0;
1195 current_msg_type = COORD_MSG_TYPE_UNKNOWN;
1196 stall_for_completion = false;
1197 ft_event_state = OPAL_CRS_RUNNING;
1198
1199 OBJ_DESTRUCT(&ompi_crcp_bkmrk_pml_peer_refs);
1200
1201 OBJ_DESTRUCT(&unknown_recv_from_list);
1202 OBJ_DESTRUCT(&unknown_persist_recv_list);
1203
1204 OBJ_DESTRUCT(&drained_msg_ack_list);
1205
1206
1207 OBJ_DESTRUCT(&peer_ref_free_list);
1208 OBJ_DESTRUCT(&traffic_msg_ref_free_list);
1209 OBJ_DESTRUCT(&drain_msg_ref_free_list);
1210 OBJ_DESTRUCT(&drain_ack_msg_ref_free_list);
1211 OBJ_DESTRUCT(&content_ref_free_list);
1212
1213 if( timing_enabled > 0 ) {
1214 for(i = 0; i < CRCP_TIMER_MAX; ++i) {
1215 free(timer_label[i]);
1216 timer_label[i] = NULL;
1217 }
1218 }
1219
1220 return OMPI_SUCCESS;
1221 }
1222
1223
1224
1225
1226
1227 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_enable(
1228 bool enable,
1229 ompi_crcp_base_pml_state_t* pml_state )
1230 {
1231
1232 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1233 "crcp:bkmrk: pml_enable()"));
1234
1235 pml_state->error_code = OMPI_SUCCESS;
1236 return pml_state;
1237 }
1238
1239
1240 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_progress(
1241 ompi_crcp_base_pml_state_t* pml_state)
1242 {
1243
1244
1245 OPAL_OUTPUT_VERBOSE((35, mca_crcp_bkmrk_component.super.output_handle,
1246 "crcp:bkmrk: pml_progress()"));
1247
1248 pml_state->error_code = OMPI_SUCCESS;
1249 return pml_state;
1250 }
1251
1252
1253
1254 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_iprobe(
1255 int dst, int tag,
1256 struct ompi_communicator_t* comm,
1257 int *matched,
1258 ompi_status_public_t* status,
1259 ompi_crcp_base_pml_state_t* pml_state )
1260 {
1261 ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref = NULL;
1262 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
1263 int exit_status = OMPI_SUCCESS;
1264 int ret;
1265
1266 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1267 "crcp:bkmrk: pml_iprobe(%d, %d)", dst, tag));
1268
1269
1270
1271
1272
1273
1274 if( OMPI_CRCP_PML_PRE == pml_state->state) {
1275
1276
1277
1278 if( OMPI_SUCCESS != (ret = drain_message_find_any(PROBE_ANY_COUNT, tag, dst,
1279 comm, PROBE_ANY_SIZE,
1280 &drain_msg_ref,
1281 &content_ref,
1282 NULL) ) ) {
1283 ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_iprobe(): Failed trying to find a drained message.");
1284 exit_status = ret;
1285 goto DONE;
1286 }
1287
1288
1289
1290
1291
1292
1293 if( NULL != drain_msg_ref ) {
1294 OPAL_OUTPUT_VERBOSE((12, mca_crcp_bkmrk_component.super.output_handle,
1295 "crcp:bkmrk: pml_iprobe(): Matched a drained message..."));
1296
1297
1298 if( MPI_STATUS_IGNORE != status ) {
1299 memcpy(status, &content_ref->status, sizeof(ompi_status_public_t));
1300 }
1301
1302
1303 *matched = 1;
1304
1305
1306 pml_state->state = OMPI_CRCP_PML_DONE;
1307 pml_state->error_code = OMPI_SUCCESS;
1308 return pml_state;
1309 }
1310
1311
1312
1313 else {
1314
1315 *matched = 0;
1316 }
1317 }
1318
1319 DONE:
1320 pml_state->error_code = exit_status;
1321 return pml_state;
1322 }
1323
1324 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_probe(
1325 int dst, int tag,
1326 struct ompi_communicator_t* comm,
1327 ompi_status_public_t* status,
1328 ompi_crcp_base_pml_state_t* pml_state )
1329 {
1330 ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref = NULL;
1331 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
1332 int exit_status = OMPI_SUCCESS;
1333 int ret;
1334
1335 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1336 "crcp:bkmrk: pml_probe(%d, %d)", dst, tag));
1337
1338
1339
1340
1341
1342
1343 if( OMPI_CRCP_PML_PRE == pml_state->state) {
1344
1345
1346
1347 if( OMPI_SUCCESS != (ret = drain_message_find_any(PROBE_ANY_COUNT, tag, dst,
1348 comm, PROBE_ANY_SIZE,
1349 &drain_msg_ref,
1350 &content_ref,
1351 NULL) ) ) {
1352 ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_probe(): Failed trying to find a drained message.");
1353 exit_status = ret;
1354 goto DONE;
1355 }
1356
1357
1358
1359
1360
1361 if( NULL != drain_msg_ref ) {
1362 OPAL_OUTPUT_VERBOSE((12, mca_crcp_bkmrk_component.super.output_handle,
1363 "crcp:bkmrk: pml_iprobe(): Matched a drained message..."));
1364
1365
1366 if( MPI_STATUS_IGNORE != status ) {
1367 memcpy(status, &content_ref->status, sizeof(ompi_status_public_t));
1368 }
1369
1370
1371 pml_state->state = OMPI_CRCP_PML_DONE;
1372 pml_state->error_code = OMPI_SUCCESS;
1373 return pml_state;
1374 }
1375 }
1376
1377 DONE:
1378 pml_state->error_code = exit_status;
1379 return pml_state;
1380 }
1381
1382
1383 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_dump(
1384 struct ompi_communicator_t* comm,
1385 int verbose,
1386 ompi_crcp_base_pml_state_t* pml_state )
1387 {
1388 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1389 "crcp:bkmrk: pml_dump()"));
1390
1391 pml_state->error_code = OMPI_SUCCESS;
1392 return pml_state;
1393 }
1394
1395
1396
1397 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_add_comm(
1398 struct ompi_communicator_t* comm,
1399 ompi_crcp_base_pml_state_t* pml_state )
1400 {
1401
1402
1403 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1404 "crcp:bkmrk: pml_add_comm()"));
1405
1406 pml_state->error_code = OMPI_SUCCESS;
1407 return pml_state;
1408 }
1409
1410 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_del_comm(
1411 struct ompi_communicator_t* comm,
1412 ompi_crcp_base_pml_state_t* pml_state )
1413 {
1414
1415
1416 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1417 "crcp:bkmrk: pml_del_comm()"));
1418
1419 pml_state->error_code = OMPI_SUCCESS;
1420 return pml_state;
1421 }
1422
1423
1424 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_add_procs(
1425 struct ompi_proc_t **procs,
1426 size_t nprocs,
1427 ompi_crcp_base_pml_state_t* pml_state )
1428 {
1429 ompi_crcp_bkmrk_pml_peer_ref_t *new_peer_ref;
1430 size_t i;
1431
1432 if( OMPI_CRCP_PML_PRE != pml_state->state ){
1433 goto DONE;
1434 }
1435
1436 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1437 "crcp:bkmrk: pml_add_procs()"));
1438
1439
1440
1441
1442 wrapped_pml_component = pml_state->wrapped_pml_component;
1443 wrapped_pml_module = pml_state->wrapped_pml_module;
1444
1445
1446
1447
1448 for( i = 0; i < nprocs; ++i) {
1449 HOKE_PEER_REF_ALLOC(new_peer_ref);
1450
1451 new_peer_ref->proc_name.jobid = OMPI_CAST_RTE_NAME(&procs[i]->super.proc_name)->jobid;
1452 new_peer_ref->proc_name.vpid = OMPI_CAST_RTE_NAME(&procs[i]->super.proc_name)->vpid;
1453
1454 opal_list_append(&ompi_crcp_bkmrk_pml_peer_refs, &(new_peer_ref->super));
1455 }
1456
1457 DONE:
1458 pml_state->error_code = OMPI_SUCCESS;
1459 return pml_state;
1460 }
1461
1462 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_del_procs(
1463 struct ompi_proc_t **procs,
1464 size_t nprocs,
1465 ompi_crcp_base_pml_state_t* pml_state )
1466 {
1467 opal_list_item_t *item = NULL;
1468 ompi_crcp_bkmrk_pml_peer_ref_t *old_peer_ref;
1469 int exit_status = OMPI_SUCCESS;
1470 size_t i;
1471
1472 if( OMPI_CRCP_PML_PRE != pml_state->state ){
1473 goto DONE;
1474 }
1475
1476 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1477 "crcp:bkmrk: pml_del_procs()"));
1478
1479 for( i = 0; i < nprocs; ++i) {
1480 item = (opal_list_item_t*)find_peer(*(ompi_process_name_t*)&procs[i]->super.proc_name);
1481 if(NULL == item) {
1482 opal_output(mca_crcp_bkmrk_component.super.output_handle,
1483 "crcp:bkmrk: del_procs: Unable to find peer %s\n",
1484 OMPI_NAME_PRINT(&procs[i]->super.proc_name));
1485 exit_status = OMPI_ERROR;
1486 goto DONE;
1487 }
1488
1489
1490 opal_list_remove_item(&ompi_crcp_bkmrk_pml_peer_refs, item);
1491 old_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
1492 HOKE_PEER_REF_RETURN(old_peer_ref);
1493 }
1494
1495 DONE:
1496 pml_state->error_code = exit_status;
1497 return pml_state;
1498 }
1499
1500
1501 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_isend_init(
1502 void *buf, size_t count,
1503 ompi_datatype_t *datatype,
1504 int dst, int tag,
1505 mca_pml_base_send_mode_t mode,
1506 struct ompi_communicator_t* comm,
1507 struct ompi_request_t **request,
1508 ompi_crcp_base_pml_state_t* pml_state )
1509 {
1510 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref = NULL;
1511 ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL;
1512 ompi_crcp_bkmrk_pml_state_t *coord_state = NULL;
1513 int exit_status = OMPI_SUCCESS;
1514 int ret;
1515
1516 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1517 "crcp:bkmrk: pml_isend_init()"));
1518
1519
1520
1521
1522
1523 if( OMPI_CRCP_PML_PRE == pml_state->state ) {
1524
1525
1526
1527 if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, dst, &peer_ref) ) ){
1528 opal_output(mca_crcp_bkmrk_component.super.output_handle,
1529 "crcp:bkmrk: isend: Failed to find peer_ref\n");
1530 exit_status = ret;
1531 goto DONE;
1532 }
1533
1534
1535
1536
1537 traffic_message_append(peer_ref, &(peer_ref->send_init_list),
1538 COORD_MSG_TYPE_P_SEND,
1539 count, datatype, 0, tag, dst, comm,
1540 &msg_ref);
1541
1542
1543 CREATE_COORD_STATE(coord_state, pml_state,
1544 peer_ref, msg_ref);
1545
1546 coord_state->p_super.error_code = OMPI_SUCCESS;
1547 return &coord_state->p_super;
1548 }
1549
1550
1551
1552 else if( OMPI_CRCP_PML_POST == pml_state->state ) {
1553 ompi_crcp_base_pml_state_t *rtn_state = NULL;
1554 ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL;
1555
1556 EXTRACT_COORD_STATE(pml_state, coord_state, rtn_state,
1557 peer_ref, msg_ref);
1558
1559
1560
1561
1562 HOKE_CONTENT_REF_ALLOC(new_content);
1563 new_content->buffer = buf;
1564 new_content->request = *request;
1565 new_content->done = false;
1566 new_content->active = false;
1567 new_content->already_posted = true;
1568 new_content->already_drained = false;
1569 OBJ_RETAIN(*request);
1570 opal_list_append(&(msg_ref->msg_contents), &(new_content->super) );
1571
1572 CRCP_COORD_STATE_RETURN(coord_state);
1573
1574 rtn_state->error_code = OMPI_SUCCESS;
1575 return rtn_state;
1576 }
1577
1578 DONE:
1579 pml_state->error_code = exit_status;
1580 return pml_state;
1581 }
1582
1583 static int ompi_crcp_bkmrk_pml_start_isend_init(ompi_request_t **request)
1584 {
1585 int ret, exit_status = OMPI_SUCCESS;
1586 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref = NULL;
1587 ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL;
1588 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
1589 mca_pml_base_request_t *breq = NULL;
1590 size_t tmp_ddt_size = 0;
1591
1592 breq = (mca_pml_base_request_t *)(*request);
1593 ompi_datatype_type_size(breq->req_datatype, &tmp_ddt_size);
1594
1595
1596
1597
1598 if( OMPI_SUCCESS != (ret = find_peer_in_comm(breq->req_comm,
1599 breq->req_peer,
1600 &peer_ref) ) ){
1601 opal_output(mca_crcp_bkmrk_component.super.output_handle,
1602 "crcp:bkmrk: req_start(): Failed to find peer_ref\n");
1603 exit_status = ret;
1604 goto DONE;
1605 }
1606
1607
1608 if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->send_init_list),
1609 breq->req_count,
1610 breq->req_tag,
1611 breq->req_peer,
1612 breq->req_comm->c_contextid,
1613 tmp_ddt_size,
1614 &msg_ref,
1615 PERSIST_MARKER
1616 ) ) ) {
1617 opal_output(mca_crcp_bkmrk_component.super.output_handle,
1618 "crcp:bkmrk: pml_start(): Unable to find the proper (send_init) message ref for this recv\n");
1619 exit_status = ret;
1620 goto DONE;
1621 }
1622
1623 if( NULL == msg_ref ) {
1624 ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_start(): Could not find message ref");
1625 exit_status = OMPI_ERROR;
1626 goto DONE;
1627 } else {
1628 traffic_message_start(msg_ref,
1629 peer_ref,
1630 request,
1631 &(peer_ref->send_init_list),
1632 &content_ref);
1633
1634 if( !content_ref->already_drained ) {
1635
1636 peer_ref->total_msgs_sent += 1;
1637 }
1638 }
1639
1640 DONE:
1641 return exit_status;
1642 }
1643
1644 static int ompi_crcp_bkmrk_request_complete_isend_init(struct ompi_request_t *request,
1645 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
1646 int src, int tag, int tmp_ddt_size)
1647 {
1648 int ret, exit_status = OMPI_SUCCESS;
1649 mca_pml_base_request_t *breq = NULL;
1650 ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL;
1651 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
1652
1653 breq = (mca_pml_base_request_t *)request;
1654
1655
1656 if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->send_init_list),
1657 breq->req_count,
1658 tag, src,
1659 breq->req_comm->c_contextid,
1660 tmp_ddt_size,
1661 &msg_ref,
1662 FIND_MSG_TRUE
1663 ) ) ) {
1664 opal_output(mca_crcp_bkmrk_component.super.output_handle,
1665 "crcp:bkmrk: req_complete: Unable to find the proper (send_init) message ref for this complete\n");
1666 exit_status = ret;
1667 goto DONE;
1668 }
1669
1670 if( NULL == msg_ref ) {
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682 OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
1683 "crcp:bkmrk: request_complete: No match found for this request :( %d, %d ): [%d/%d,%d]\n",
1684 peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd,
1685 breq->req_peer, src, breq->req_comm->c_contextid));
1686 exit_status = OMPI_SUCCESS;
1687 goto DONE;
1688 }
1689
1690
1691 traffic_message_find_mark_persistent(msg_ref, &request,
1692 true,
1693 false,
1694 &content_ref);
1695
1696 TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (Send_init) --", true));
1697
1698 if( !content_ref->already_drained ) {
1699 msg_ref->done++;
1700 msg_ref->active--;
1701 } else {
1702 msg_ref->active_drain--;
1703 content_ref->already_drained = false;
1704 }
1705
1706 OPAL_OUTPUT_VERBOSE((25, mca_crcp_bkmrk_component.super.output_handle,
1707 "crcp:bkmrk: req_complete: Marked Message... ( %d, %d )\n",
1708 peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd));
1709 DONE:
1710 return exit_status;
1711 }
1712
1713
1714 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_isend(
1715 void *buf, size_t count,
1716 ompi_datatype_t *datatype,
1717 int dst, int tag,
1718 mca_pml_base_send_mode_t mode,
1719 struct ompi_communicator_t* comm,
1720 struct ompi_request_t **request,
1721 ompi_crcp_base_pml_state_t* pml_state )
1722 {
1723 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref = NULL;
1724 ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL;
1725 ompi_crcp_bkmrk_pml_state_t *coord_state = NULL;
1726 int exit_status = OMPI_SUCCESS;
1727 int ret;
1728
1729 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1730 "crcp:bkmrk: pml_isend()"));
1731
1732
1733
1734
1735
1736 if( OMPI_CRCP_PML_PRE == pml_state->state ) {
1737
1738
1739
1740 if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, dst, &peer_ref) ) ){
1741 opal_output(mca_crcp_bkmrk_component.super.output_handle,
1742 "crcp:bkmrk: isend: Failed to find peer_ref\n");
1743 exit_status = ret;
1744 goto DONE;
1745 }
1746
1747
1748
1749
1750 traffic_message_append(peer_ref, &(peer_ref->isend_list),
1751 COORD_MSG_TYPE_I_SEND,
1752 count, datatype, 0, tag, dst, comm,
1753 &msg_ref);
1754
1755
1756 peer_ref->total_msgs_sent += 1;
1757
1758
1759 CREATE_COORD_STATE(coord_state, pml_state,
1760 peer_ref, msg_ref);
1761
1762 coord_state->p_super.error_code = OMPI_SUCCESS;
1763 return &coord_state->p_super;
1764 }
1765
1766
1767
1768 else if( OMPI_CRCP_PML_POST == pml_state->state ) {
1769 ompi_crcp_base_pml_state_t *rtn_state = NULL;
1770 ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL;
1771
1772 EXTRACT_COORD_STATE(pml_state, coord_state, rtn_state,
1773 peer_ref, msg_ref);
1774
1775
1776
1777
1778 HOKE_CONTENT_REF_ALLOC(new_content);
1779 new_content->buffer = NULL;
1780 new_content->request = *request;
1781 new_content->done = false;
1782 new_content->active = true;
1783 new_content->already_posted = true;
1784 new_content->already_drained = false;
1785 OBJ_RETAIN(*request);
1786 opal_list_append(&(msg_ref->msg_contents), &(new_content->super) );
1787
1788 TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (isend) --", true));
1789
1790 CRCP_COORD_STATE_RETURN(coord_state);
1791
1792 rtn_state->error_code = OMPI_SUCCESS;
1793 return rtn_state;
1794 }
1795
1796 DONE:
1797 pml_state->error_code = exit_status;
1798 return pml_state;
1799 }
1800
1801 static int ompi_crcp_bkmrk_request_complete_isend(struct ompi_request_t *request,
1802 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
1803 int src, int tag, int tmp_ddt_size)
1804 {
1805 int ret, exit_status = OMPI_SUCCESS;
1806 mca_pml_base_request_t *breq = NULL;
1807 ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL;
1808 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
1809
1810 breq = (mca_pml_base_request_t *)request;
1811
1812
1813 if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->isend_list),
1814 breq->req_count,
1815 tag, src,
1816 breq->req_comm->c_contextid,
1817 tmp_ddt_size,
1818 &msg_ref,
1819 FIND_MSG_TRUE
1820 ) ) ) {
1821 opal_output(mca_crcp_bkmrk_component.super.output_handle,
1822 "crcp:bkmrk: req_complete: Unable to find the proper (isend) message ref for this complete\n");
1823 exit_status = ret;
1824 goto DONE;
1825 }
1826
1827 if( NULL == msg_ref ) {
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839 OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
1840 "crcp:bkmrk: request_complete: No match found for this request :( %d, %d ): [%d/%d,%d]\n",
1841 peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd,
1842 breq->req_peer, src, breq->req_comm->c_contextid));
1843 exit_status = OMPI_SUCCESS;
1844 goto DONE;
1845 }
1846
1847 OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
1848 "crcp:bkmrk: req_complete: Matched an iSend: total = %d",
1849 peer_ref->total_msgs_sent));
1850
1851
1852 traffic_message_grab_content(msg_ref, &content_ref, true, true);
1853
1854 if( !content_ref->already_drained ) {
1855 msg_ref->done++;
1856 msg_ref->active--;
1857 } else {
1858 msg_ref->active_drain--;
1859 content_ref->already_drained = false;
1860 }
1861 HOKE_CONTENT_REF_RETURN(content_ref);
1862
1863 TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (iSend) --", true));
1864
1865 OPAL_OUTPUT_VERBOSE((25, mca_crcp_bkmrk_component.super.output_handle,
1866 "crcp:bkmrk: req_complete: Marked Message... ( %d, %d )\n",
1867 peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd));
1868 DONE:
1869 return exit_status;
1870 }
1871
1872
1873 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_send(
1874 void *buf, size_t count,
1875 ompi_datatype_t *datatype,
1876 int dst, int tag,
1877 mca_pml_base_send_mode_t mode,
1878 struct ompi_communicator_t* comm,
1879 ompi_crcp_base_pml_state_t* pml_state )
1880 {
1881 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref = NULL;
1882 ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL;
1883 ompi_crcp_bkmrk_pml_state_t *coord_state = NULL;
1884 int exit_status = OMPI_SUCCESS;
1885 int ret;
1886
1887 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1888 "crcp:bkmrk: pml_send()"));
1889
1890
1891
1892
1893
1894 if( OMPI_CRCP_PML_PRE == pml_state->state ) {
1895
1896
1897
1898 if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, dst, &peer_ref) ) ){
1899 opal_output(mca_crcp_bkmrk_component.super.output_handle,
1900 "crcp:bkmrk: send: Failed to find peer_ref\n");
1901 exit_status = ret;
1902 goto DONE;
1903 }
1904
1905
1906
1907
1908 traffic_message_append(peer_ref, &(peer_ref->send_list),
1909 COORD_MSG_TYPE_B_SEND,
1910 count, datatype, 0, tag, dst, comm,
1911 &msg_ref);
1912
1913
1914 peer_ref->total_msgs_sent += 1;
1915 current_msg_id = msg_ref->msg_id;
1916 current_msg_type = COORD_MSG_TYPE_B_SEND;
1917
1918
1919 CREATE_COORD_STATE(coord_state, pml_state,
1920 peer_ref, msg_ref);
1921 coord_state->p_super.error_code = OMPI_SUCCESS;
1922
1923 return &coord_state->p_super;
1924 }
1925
1926
1927
1928 else if( OMPI_CRCP_PML_POST == pml_state->state ) {
1929 ompi_crcp_base_pml_state_t *rtn_state = NULL;
1930
1931 EXTRACT_COORD_STATE(pml_state, coord_state, rtn_state,
1932 peer_ref, msg_ref);
1933
1934
1935
1936
1937 msg_ref->done++;
1938 msg_ref->active--;
1939
1940 current_msg_id = 0;
1941 current_msg_type = COORD_MSG_TYPE_UNKNOWN;
1942
1943 TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "Send done", true));
1944
1945 CRCP_COORD_STATE_RETURN(coord_state);
1946 rtn_state->error_code = OMPI_SUCCESS;
1947
1948 return rtn_state;
1949 }
1950
1951 DONE:
1952 pml_state->error_code = exit_status;
1953 return pml_state;
1954 }
1955
1956
1957 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_irecv_init(
1958 void *buf, size_t count,
1959 ompi_datatype_t *datatype,
1960 int src, int tag,
1961 struct ompi_communicator_t* comm,
1962 struct ompi_request_t **request,
1963 ompi_crcp_base_pml_state_t* pml_state)
1964 {
1965 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref = NULL;
1966 ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL;
1967 ompi_crcp_bkmrk_pml_state_t *coord_state = NULL;
1968 int exit_status = OMPI_SUCCESS;
1969 int ret;
1970
1971 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
1972 "crcp:bkmrk: pml_irecv_init()"));
1973
1974
1975
1976
1977
1978
1979 if( OMPI_CRCP_PML_PRE == pml_state->state) {
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990 if( MPI_ANY_SOURCE == src || src < 0) {
1991
1992
1993
1994 traffic_message_append(NULL, &(unknown_persist_recv_list),
1995 COORD_MSG_TYPE_P_RECV,
1996 count, datatype, 0, tag, src, comm,
1997 &msg_ref);
1998
1999 CREATE_COORD_STATE(coord_state, pml_state,
2000 NULL, msg_ref);
2001 }
2002 else {
2003 if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, src, &peer_ref) ) ){
2004 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2005 "crcp:bkmrk: recv: Failed to find peer_ref\n");
2006 exit_status = ret;
2007 goto DONE;
2008 }
2009
2010
2011
2012
2013 traffic_message_append(peer_ref, &(peer_ref->recv_init_list),
2014 COORD_MSG_TYPE_P_RECV,
2015 count, datatype, 0, tag, src, comm,
2016 &msg_ref);
2017
2018 CREATE_COORD_STATE(coord_state, pml_state,
2019 peer_ref, msg_ref);
2020 }
2021
2022 coord_state->p_super.error_code = OMPI_SUCCESS;
2023 return &coord_state->p_super;
2024 }
2025
2026
2027
2028
2029 else if( OMPI_CRCP_PML_POST == pml_state->state) {
2030 ompi_crcp_base_pml_state_t *rtn_state = NULL;
2031 ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL;
2032
2033 EXTRACT_COORD_STATE(pml_state, coord_state, rtn_state,
2034 peer_ref, msg_ref);
2035
2036
2037
2038
2039 HOKE_CONTENT_REF_ALLOC(new_content);
2040 new_content->buffer = buf;
2041 new_content->request = *request;
2042 new_content->done = false;
2043 new_content->active = false;
2044 new_content->already_posted = true;
2045 new_content->already_drained = false;
2046 OBJ_RETAIN(*request);
2047 opal_list_append(&(msg_ref->msg_contents), &(new_content->super) );
2048
2049 CRCP_COORD_STATE_RETURN(coord_state);
2050
2051 rtn_state->error_code = OMPI_SUCCESS;
2052 return rtn_state;
2053 }
2054
2055 DONE:
2056 pml_state->error_code = exit_status;
2057 return pml_state;
2058 }
2059
2060 static int ompi_crcp_bkmrk_pml_start_drain_irecv_init(ompi_request_t **request, bool *found_drain)
2061 {
2062 int ret, exit_status = OMPI_SUCCESS;
2063 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref = NULL;
2064 ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL;
2065 ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref = NULL;
2066 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
2067 mca_pml_base_request_t *breq = NULL;
2068 size_t tmp_ddt_size = 0;
2069
2070 *found_drain = false;
2071
2072 breq = (mca_pml_base_request_t *)(*request);
2073 ompi_datatype_type_size(breq->req_datatype, &tmp_ddt_size);
2074
2075
2076
2077
2078 if( 0 <= breq->req_peer ) {
2079 if( OMPI_SUCCESS != (ret = find_peer_in_comm(breq->req_comm,
2080 breq->req_peer,
2081 &peer_ref) ) ){
2082 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2083 "crcp:bkmrk: pml_start(): Failed to find peer_ref\n");
2084 exit_status = ret;
2085 goto DONE;
2086 }
2087
2088 if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->recv_init_list),
2089 breq->req_count,
2090 breq->req_tag,
2091 breq->req_peer,
2092 breq->req_comm->c_contextid,
2093 tmp_ddt_size,
2094 &msg_ref,
2095 PERSIST_MARKER
2096 ) ) ) {
2097 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2098 "crcp:bkmrk: pml_start(): Unable to find the proper (recv) message ref for this recv\n");
2099 exit_status = ret;
2100 goto DONE;
2101 }
2102 }
2103
2104
2105
2106 else {
2107 if( OMPI_SUCCESS != (ret = traffic_message_find(&(unknown_persist_recv_list),
2108 breq->req_count,
2109 breq->req_tag,
2110 INVALID_INT,
2111 breq->req_comm->c_contextid,
2112 tmp_ddt_size,
2113 &msg_ref,
2114 PERSIST_MARKER
2115 ) ) ) {
2116 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2117 "crcp:bkmrk: pml_start(): Unable to find the proper (recv) message ref for this recv\n");
2118 exit_status = ret;
2119 goto DONE;
2120 }
2121 }
2122
2123
2124
2125
2126 if( NULL == msg_ref ) {
2127 ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_start(): Could not find message ref");
2128 exit_status = OMPI_ERROR;
2129 goto DONE;
2130 }
2131
2132
2133
2134
2135 if( NULL != peer_ref ) {
2136 if( OMPI_SUCCESS != (ret = drain_message_find(&(peer_ref->drained_list),
2137 msg_ref->count, msg_ref->tag, msg_ref->rank,
2138 msg_ref->comm->c_contextid, msg_ref->ddt_size,
2139 &drain_msg_ref,
2140 &content_ref) ) ) {
2141 ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_start(): Failed trying to find a drained message.");
2142 exit_status = ret;
2143 goto DONE;
2144 }
2145 } else {
2146 if( OMPI_SUCCESS != (ret = drain_message_find_any(msg_ref->count, msg_ref->tag, msg_ref->rank,
2147 msg_ref->comm, msg_ref->ddt_size,
2148 &drain_msg_ref,
2149 &content_ref,
2150 &peer_ref) ) ) {
2151 ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_start(): Failed trying to find a drained message.");
2152 exit_status = ret;
2153 goto DONE;
2154 }
2155 }
2156
2157
2158
2159
2160 if( NULL != drain_msg_ref ) {
2161 *found_drain = true;
2162 OPAL_OUTPUT_VERBOSE((12, mca_crcp_bkmrk_component.super.output_handle,
2163 "crcp:bkmrk: pml_start(): Matched a drained message..."));
2164
2165 if( OMPI_SUCCESS != (ret = drain_message_copy_remove_persistent(drain_msg_ref,
2166 content_ref,
2167 msg_ref,
2168 *request,
2169 peer_ref) ) ) {
2170 opal_output( mca_crcp_bkmrk_component.super.output_handle,
2171 "crcp:bkmrk: pml_start(): Datatype copy failed (%d)",
2172 ret);
2173 }
2174
2175 peer_ref->total_drained_msgs -= 1;
2176 }
2177
2178 DONE:
2179 return exit_status;
2180 }
2181
2182 static int ompi_crcp_bkmrk_pml_start_irecv_init(ompi_request_t **request)
2183 {
2184 int ret, exit_status = OMPI_SUCCESS;
2185 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref = NULL;
2186 ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL;
2187 mca_pml_base_request_t *breq = NULL;
2188 size_t tmp_ddt_size = 0;
2189
2190 breq = (mca_pml_base_request_t *)(*request);
2191 ompi_datatype_type_size(breq->req_datatype, &tmp_ddt_size);
2192
2193
2194
2195
2196 if( 0 <= breq->req_peer ) {
2197 if( OMPI_SUCCESS != (ret = find_peer_in_comm(breq->req_comm,
2198 breq->req_peer,
2199 &peer_ref) ) ){
2200 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2201 "crcp:bkmrk: pml_start(): Failed to find peer_ref\n");
2202 exit_status = ret;
2203 goto DONE;
2204 }
2205
2206 if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->recv_init_list),
2207 breq->req_count,
2208 breq->req_tag,
2209 breq->req_peer,
2210 breq->req_comm->c_contextid,
2211 tmp_ddt_size,
2212 &msg_ref,
2213 PERSIST_MARKER
2214 ) ) ) {
2215 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2216 "crcp:bkmrk: pml_start(): Unable to find the proper (recv) message ref for this recv\n");
2217 exit_status = ret;
2218 goto DONE;
2219 }
2220
2221 if( NULL != msg_ref ) {
2222 traffic_message_start(msg_ref,
2223 peer_ref,
2224 request,
2225 &(peer_ref->recv_init_list),
2226 NULL);
2227 }
2228 }
2229
2230
2231
2232 else {
2233 if( OMPI_SUCCESS != (ret = traffic_message_find(&(unknown_persist_recv_list),
2234 breq->req_count,
2235 breq->req_tag,
2236 INVALID_INT,
2237 breq->req_comm->c_contextid,
2238 tmp_ddt_size,
2239 &msg_ref,
2240 PERSIST_MARKER
2241 ) ) ) {
2242 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2243 "crcp:bkmrk: pml_start(): Unable to find the proper (recv) message ref for this recv\n");
2244 exit_status = ret;
2245 goto DONE;
2246 }
2247
2248 if( NULL != msg_ref ) {
2249 traffic_message_start(msg_ref,
2250 NULL,
2251 request,
2252 &(unknown_persist_recv_list),
2253 NULL);
2254 }
2255 }
2256
2257 if( NULL == msg_ref ) {
2258 ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_start(): Could not find message ref");
2259 exit_status = OMPI_ERROR;
2260 goto DONE;
2261 }
2262
2263 DONE:
2264 return exit_status;
2265 }
2266
2267 static int ompi_crcp_bkmrk_request_complete_irecv_init(struct ompi_request_t *request,
2268 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
2269 int src, int tag, int tmp_ddt_size)
2270 {
2271 int ret, exit_status = OMPI_SUCCESS;
2272 mca_pml_base_request_t *breq = NULL;
2273 ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL, *new_msg_ref = NULL;
2274 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
2275
2276 breq = (mca_pml_base_request_t *)request;
2277
2278
2279
2280
2281 if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->recv_init_list),
2282 breq->req_count,
2283 tag, src,
2284 breq->req_comm->c_contextid,
2285 tmp_ddt_size,
2286 &msg_ref,
2287 FIND_MSG_TRUE
2288 ) ) ) {
2289 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2290 "crcp:bkmrk: req_complete: Unable to find the proper (recv_init) message ref for this complete\n");
2291 exit_status = ret;
2292 goto DONE;
2293 }
2294
2295
2296
2297
2298 if( NULL == msg_ref ) {
2299 if( OMPI_SUCCESS != (ret = traffic_message_find(&(unknown_persist_recv_list),
2300 breq->req_count,
2301 tag,
2302 INVALID_INT,
2303 breq->req_comm->c_contextid,
2304 tmp_ddt_size,
2305 &msg_ref,
2306 FIND_MSG_TRUE
2307 ) ) ) {
2308 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2309 "crcp:bkmrk: requ_complete: Unable to find the proper (recv_init) message ref for this complete\n");
2310 exit_status = ret;
2311 goto DONE;
2312 }
2313
2314 if( NULL != msg_ref ) {
2315 traffic_message_move(msg_ref,
2316 COORD_MSG_TYPE_P_RECV,
2317 NULL, &(unknown_persist_recv_list),
2318 peer_ref, &(peer_ref->recv_init_list),
2319 &new_msg_ref,
2320 true,
2321 false);
2322 msg_ref = new_msg_ref;
2323 }
2324 }
2325
2326
2327
2328
2329 if( NULL == msg_ref ) {
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341 OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
2342 "crcp:bkmrk: request_complete: No match found for this request :( %d, %d ): [%d/%d,%d]\n",
2343 peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd,
2344 breq->req_peer, src, breq->req_comm->c_contextid));
2345 exit_status = OMPI_SUCCESS;
2346 goto DONE;
2347 }
2348
2349
2350
2351
2352
2353 traffic_message_find_mark_persistent(msg_ref, &request,
2354 true,
2355 false,
2356 &content_ref);
2357 if( NULL == content_ref ) {
2358 exit_status = OMPI_ERROR;
2359 goto DONE;
2360 }
2361
2362 if( !content_ref->already_drained ) {
2363 peer_ref->total_msgs_recvd += 1;
2364 msg_ref->done++;
2365 msg_ref->active--;
2366 } else {
2367 msg_ref->active_drain--;
2368 content_ref->already_drained = false;
2369 }
2370
2371
2372
2373 if( NULL == new_msg_ref ) {
2374 TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (Recv_Init) --", true));
2375 } else {
2376 TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (Recv_init - Unknown) --", true));
2377 }
2378
2379 OPAL_OUTPUT_VERBOSE((25, mca_crcp_bkmrk_component.super.output_handle,
2380 "crcp:bkmrk: req_complete: Marked Message... ( %d, %d )\n",
2381 peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd));
2382 DONE:
2383 return exit_status;
2384 }
2385
2386 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_irecv(
2387 void *buf, size_t count,
2388 ompi_datatype_t *datatype,
2389 int src, int tag,
2390 struct ompi_communicator_t* comm,
2391 struct ompi_request_t **request,
2392 ompi_crcp_base_pml_state_t* pml_state )
2393 {
2394 int ret, exit_status = OMPI_SUCCESS;
2395 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref = NULL;
2396 ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL;
2397 ompi_crcp_bkmrk_pml_state_t *coord_state = NULL;
2398 bool found_drain = false;
2399
2400 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
2401 "crcp:bkmrk: pml_irecv()"));
2402
2403
2404
2405
2406
2407
2408 if( OMPI_CRCP_PML_PRE == pml_state->state) {
2409
2410
2411
2412 found_drain = false;
2413 if( OMPI_SUCCESS != (ret = drain_message_check_recv(buf, count, datatype,
2414 &src, &tag, comm, request, NULL,
2415 &found_drain) ) ) {
2416 ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_recv(): Failed trying to find a drained message.");
2417 exit_status = ret;
2418 goto DONE;
2419 }
2420
2421 if( found_drain ) {
2422
2423
2424
2425
2426
2427
2428 pml_state->state = OMPI_CRCP_PML_DONE;
2429 pml_state->error_code = OMPI_SUCCESS;
2430 return pml_state;
2431 }
2432
2433
2434
2435 else {
2436
2437
2438
2439 if( MPI_ANY_SOURCE == src || src < 0) {
2440
2441
2442
2443 traffic_message_append(NULL, &(unknown_recv_from_list),
2444 COORD_MSG_TYPE_I_RECV,
2445 count, datatype, 0, tag, src, comm,
2446 &msg_ref);
2447
2448 CREATE_COORD_STATE(coord_state, pml_state,
2449 NULL, msg_ref);
2450 }
2451 else {
2452 if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, src, &peer_ref) ) ){
2453 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2454 "crcp:bkmrk: pml_irecv(): Failed to find peer_ref\n");
2455 exit_status = ret;
2456 goto DONE;
2457 }
2458
2459
2460
2461
2462 traffic_message_append(peer_ref, &(peer_ref->irecv_list),
2463 COORD_MSG_TYPE_I_RECV,
2464 count, datatype, 0, tag, src, comm,
2465 &msg_ref);
2466
2467 CREATE_COORD_STATE(coord_state, pml_state,
2468 peer_ref, msg_ref);
2469 }
2470
2471 coord_state->p_super.error_code = OMPI_SUCCESS;
2472 return &coord_state->p_super;
2473 }
2474 }
2475
2476
2477
2478
2479 else if( OMPI_CRCP_PML_POST == pml_state->state) {
2480 ompi_crcp_base_pml_state_t *rtn_state = NULL;
2481 ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL;
2482
2483 EXTRACT_COORD_STATE(pml_state, coord_state, rtn_state,
2484 peer_ref, msg_ref);
2485
2486
2487
2488
2489 HOKE_CONTENT_REF_ALLOC(new_content);
2490 new_content->buffer = NULL;
2491 new_content->request = *request;
2492 new_content->done = false;
2493 new_content->active = true;
2494 new_content->already_posted = true;
2495 new_content->already_drained = false;
2496 OBJ_RETAIN(*request);
2497 opal_list_append(&(msg_ref->msg_contents), &(new_content->super) );
2498
2499 TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (irecv) --", true));
2500
2501 CRCP_COORD_STATE_RETURN(coord_state);
2502
2503 rtn_state->error_code = OMPI_SUCCESS;
2504 return rtn_state;
2505 }
2506
2507 DONE:
2508 pml_state->error_code = exit_status;
2509 return pml_state;
2510 }
2511
2512 static int ompi_crcp_bkmrk_request_complete_irecv(struct ompi_request_t *request,
2513 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
2514 int src, int tag, int tmp_ddt_size)
2515 {
2516 int ret, exit_status = OMPI_SUCCESS;
2517 mca_pml_base_request_t *breq = NULL;
2518 ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL, *new_msg_ref = NULL;
2519 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
2520
2521 breq = (mca_pml_base_request_t *)request;
2522
2523
2524
2525
2526 if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->irecv_list),
2527 breq->req_count,
2528 tag, src,
2529 breq->req_comm->c_contextid,
2530 tmp_ddt_size,
2531 &msg_ref,
2532 FIND_MSG_TRUE
2533 ) ) ) {
2534 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2535 "crcp:bkmrk: req_complete: Unable to find the proper (irecv) message ref for this complete\n");
2536 exit_status = ret;
2537 goto DONE;
2538 }
2539
2540
2541
2542
2543 if( NULL == msg_ref ) {
2544 if( OMPI_SUCCESS != (ret = traffic_message_find(&(unknown_recv_from_list),
2545 breq->req_count,
2546 tag,
2547 INVALID_INT,
2548 breq->req_comm->c_contextid,
2549 tmp_ddt_size,
2550 &msg_ref,
2551 FIND_MSG_TRUE
2552 ) ) ) {
2553 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2554 "crcp:bkmrk: req_complete: Unable to find the proper (recv_init) message ref for this complete\n");
2555 exit_status = ret;
2556 goto DONE;
2557 }
2558
2559 if( NULL != msg_ref ) {
2560 traffic_message_move(msg_ref,
2561 COORD_MSG_TYPE_I_RECV,
2562 NULL, &(unknown_recv_from_list),
2563 peer_ref, &(peer_ref->irecv_list),
2564 &new_msg_ref,
2565 true,
2566 true);
2567 msg_ref = new_msg_ref;
2568 }
2569 }
2570
2571
2572
2573
2574 if( NULL == msg_ref ) {
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586 OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
2587 "crcp:bkmrk: request_complete: No match found for this request :( %d, %d ): [%d/%d,%d]\n",
2588 peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd,
2589 breq->req_peer, src, breq->req_comm->c_contextid));
2590 exit_status = OMPI_SUCCESS;
2591 goto DONE;
2592 }
2593
2594
2595
2596
2597 traffic_message_grab_content(msg_ref, &content_ref, true, true);
2598
2599 if( !content_ref->already_drained ) {
2600 peer_ref->total_msgs_recvd += 1;
2601 msg_ref->done++;
2602 msg_ref->active--;
2603 } else {
2604 msg_ref->active_drain--;
2605 content_ref->already_drained = false;
2606 }
2607
2608 HOKE_CONTENT_REF_RETURN(content_ref);
2609
2610 if( NULL == new_msg_ref ) {
2611 TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (iRecv) --", true));
2612 } else {
2613 TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (iRecv - Unknown) --", true));
2614 }
2615
2616 OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
2617 "crcp:bkmrk: req_complete: Matched an iRecv: total = %d",
2618 peer_ref->total_msgs_recvd));
2619
2620 DONE:
2621 return exit_status;
2622 }
2623
2624 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_recv(
2625 void *buf, size_t count,
2626 ompi_datatype_t *datatype,
2627 int src, int tag,
2628 struct ompi_communicator_t* comm,
2629 ompi_status_public_t* status,
2630 ompi_crcp_base_pml_state_t* pml_state)
2631 {
2632 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref = NULL;
2633 ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL, *new_msg_ref = NULL;
2634 ompi_crcp_bkmrk_pml_state_t *coord_state = NULL;
2635 bool found_drain = false;
2636 int exit_status = OMPI_SUCCESS;
2637 int ret;
2638
2639 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
2640 "crcp:bkmrk: pml_recv()"));
2641
2642
2643
2644
2645
2646
2647 if( OMPI_CRCP_PML_PRE == pml_state->state) {
2648
2649
2650
2651 found_drain = false;
2652 if( OMPI_SUCCESS != (ret = drain_message_check_recv(buf, count, datatype,
2653 &src, &tag, comm, NULL, &status,
2654 &found_drain) ) ) {
2655 ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_recv(): Failed trying to find a drained message.");
2656 exit_status = ret;
2657 goto DONE;
2658 }
2659
2660 if( found_drain ) {
2661
2662
2663
2664
2665
2666
2667 pml_state->state = OMPI_CRCP_PML_DONE;
2668 pml_state->error_code = OMPI_SUCCESS;
2669 return pml_state;
2670 }
2671
2672
2673
2674 else {
2675
2676
2677
2678 if( MPI_ANY_SOURCE == src || src < 0) {
2679 traffic_message_append(NULL, &(unknown_recv_from_list),
2680 COORD_MSG_TYPE_B_RECV,
2681 count, datatype, 0, tag, src, comm,
2682 &msg_ref);
2683
2684 CREATE_COORD_STATE(coord_state, pml_state,
2685 NULL, msg_ref);
2686 }
2687 else {
2688 if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, src, &peer_ref) ) ){
2689 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2690 "crcp:bkmrk: pml_recv(): Failed to find peer_ref\n");
2691 exit_status = ret;
2692 goto DONE;
2693 }
2694
2695 traffic_message_append(peer_ref, &(peer_ref->recv_list),
2696 COORD_MSG_TYPE_B_RECV,
2697 count, datatype, 0, tag, src, comm,
2698 &msg_ref);
2699
2700 CREATE_COORD_STATE(coord_state, pml_state,
2701 peer_ref, msg_ref);
2702 }
2703
2704
2705 current_msg_id = msg_ref->msg_id;
2706 current_msg_type = COORD_MSG_TYPE_B_RECV;
2707
2708 coord_state->p_super.error_code = OMPI_SUCCESS;
2709 return &coord_state->p_super;
2710 }
2711 }
2712
2713
2714
2715
2716 else if( OMPI_CRCP_PML_POST == pml_state->state) {
2717 ompi_crcp_base_pml_state_t *rtn_state = NULL;
2718
2719 EXTRACT_COORD_STATE(pml_state, coord_state, rtn_state,
2720 peer_ref, msg_ref);
2721
2722
2723
2724
2725
2726 if( NULL == peer_ref ) {
2727 src = status->MPI_SOURCE;
2728
2729 if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, src, &peer_ref) ) ){
2730 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2731 "crcp:bkmrk: pml_recv(): Failed to resolve peer_ref (rank %d)\n",
2732 src);
2733 exit_status = ret;
2734 goto DONE;
2735 }
2736
2737 traffic_message_move(msg_ref,
2738 COORD_MSG_TYPE_B_RECV,
2739 NULL, &(unknown_recv_from_list),
2740 peer_ref, &(peer_ref->recv_list),
2741 &new_msg_ref,
2742 false,
2743 true);
2744 new_msg_ref->done++;
2745 new_msg_ref->active--;
2746 } else {
2747
2748
2749
2750 msg_ref->done++;
2751 msg_ref->active--;
2752 }
2753
2754 peer_ref->total_msgs_recvd += 1;
2755 current_msg_id = 0;
2756 current_msg_type = COORD_MSG_TYPE_UNKNOWN;
2757
2758 TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "Recv Done", true));
2759
2760 CRCP_COORD_STATE_RETURN(coord_state);
2761
2762 rtn_state->error_code = OMPI_SUCCESS;
2763 return rtn_state;
2764 }
2765
2766 DONE:
2767 pml_state->error_code = exit_status;
2768 return pml_state;
2769 }
2770
2771
2772
2773
2774 static ompi_request_type_t * coord_start_req_types = NULL;
2775
2776 ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_start(
2777 size_t count,
2778 ompi_request_t** requests,
2779 ompi_crcp_base_pml_state_t* pml_state )
2780 {
2781 int ret, exit_status = OMPI_SUCCESS;
2782 mca_pml_base_request_t *breq = NULL;
2783 size_t tmp_ddt_size = 0;
2784 size_t iter_req;
2785 bool found_drain = false;
2786
2787 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
2788 "crcp:bkmrk: pml_start()"));
2789
2790
2791
2792
2793 if( OMPI_CRCP_PML_POST == pml_state->state ) {
2794 for(iter_req = 0; iter_req < count; iter_req++) {
2795 breq = (mca_pml_base_request_t *)requests[iter_req];
2796 if(breq->req_type == MCA_PML_REQUEST_SEND ) {
2797 if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_pml_start_isend_init(&(requests[iter_req]))) ) {
2798 exit_status = ret;
2799 goto DONE;
2800 }
2801 }
2802 }
2803 }
2804
2805
2806
2807
2808
2809
2810 if( OMPI_CRCP_PML_PRE == pml_state->state ) {
2811
2812
2813
2814 coord_start_req_types = (ompi_request_type_t *)malloc(sizeof(ompi_request_type_t) * count);
2815 for(iter_req = 0; iter_req < count; iter_req++) {
2816 coord_start_req_types[iter_req] = OMPI_REQUEST_NOOP;
2817 }
2818
2819 for(iter_req = 0; iter_req < count; iter_req++) {
2820 breq = (mca_pml_base_request_t *)requests[iter_req];
2821 ompi_datatype_type_size(breq->req_datatype, &tmp_ddt_size);
2822
2823 if( breq->req_type == MCA_PML_REQUEST_RECV ) {
2824 found_drain = false;
2825 if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_pml_start_drain_irecv_init(&(requests[iter_req]), &found_drain)) ) {
2826 exit_status = ret;
2827 goto DONE;
2828 }
2829
2830 if( found_drain ) {
2831 coord_start_req_types[iter_req] = requests[iter_req]->req_type;
2832 requests[iter_req]->req_type = OMPI_REQUEST_NOOP;
2833 requests[iter_req]->req_complete = true;
2834 }
2835 }
2836 }
2837 goto DONE;
2838 }
2839 else if( OMPI_CRCP_PML_POST == pml_state->state) {
2840 for(iter_req = 0; iter_req < count; iter_req++) {
2841 breq = (mca_pml_base_request_t *)requests[iter_req];
2842 ompi_datatype_type_size(breq->req_datatype, &tmp_ddt_size);
2843
2844 if (breq->req_type == MCA_PML_REQUEST_RECV) {
2845
2846
2847
2848
2849
2850
2851 if( NULL != coord_start_req_types ) {
2852 if( OMPI_REQUEST_NOOP != coord_start_req_types[iter_req] ) {
2853 requests[iter_req]->req_type = coord_start_req_types[iter_req];
2854 continue;
2855 }
2856 }
2857
2858 if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_pml_start_irecv_init(&(requests[iter_req]))) ) {
2859 exit_status = ret;
2860 goto DONE;
2861 }
2862 }
2863 }
2864
2865
2866
2867
2868 if( NULL != coord_start_req_types ) {
2869 free(coord_start_req_types);
2870 coord_start_req_types = NULL;
2871 }
2872 }
2873
2874 DONE:
2875 pml_state->error_code = exit_status;
2876 return pml_state;
2877 }
2878
2879
2880 int ompi_crcp_bkmrk_request_complete(struct ompi_request_t *request)
2881 {
2882 int ret, exit_status = OMPI_SUCCESS;
2883 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref = NULL;
2884 mca_pml_base_request_t *breq;
2885 size_t tmp_ddt_size = 0;
2886 int src, tag;
2887
2888 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
2889 "crcp:bkmrk: pml_request_complete()"));
2890
2891
2892
2893
2894 breq = (mca_pml_base_request_t *)request;
2895
2896 if( (breq->req_type != MCA_PML_REQUEST_SEND &&
2897 breq->req_type != MCA_PML_REQUEST_RECV ) ||
2898 request->req_type == OMPI_REQUEST_NOOP ||
2899 request->req_type == OMPI_REQUEST_NULL) {
2900 exit_status = OMPI_SUCCESS;
2901 goto DONE;
2902 }
2903
2904
2905 src = breq->req_peer;
2906 tag = breq->req_tag;
2907 ompi_datatype_type_size(breq->req_datatype, &tmp_ddt_size);
2908
2909
2910
2911
2912 if( MPI_ANY_SOURCE == src ) {
2913 if( OMPI_SUCCESS != (ret = find_peer_in_comm(breq->req_comm, request->req_status.MPI_SOURCE, &peer_ref) ) ){
2914 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2915 "crcp:bkmrk: req_complete(): Failed to find peer_ref\n");
2916 exit_status = ret;
2917 goto DONE;
2918 }
2919 } else {
2920 if( OMPI_SUCCESS != (ret = find_peer_in_comm(breq->req_comm, src, &peer_ref) ) ){
2921 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2922 "crcp:bkmrk: req_complete(): Failed to find peer_ref\n");
2923 exit_status = ret;
2924 goto DONE;
2925 }
2926 }
2927
2928
2929
2930
2931 if(breq->req_type == MCA_PML_REQUEST_SEND ) {
2932
2933
2934
2935 if( false == request->req_persistent ) {
2936 if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_request_complete_isend(request, peer_ref,
2937 src, tag, tmp_ddt_size) ) ) {
2938 exit_status = ret;
2939 goto DONE;
2940 }
2941 }
2942
2943
2944
2945 else {
2946 if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_request_complete_isend_init(request, peer_ref,
2947 src, tag, tmp_ddt_size) ) ) {
2948 exit_status = ret;
2949 goto DONE;
2950 }
2951 }
2952 }
2953
2954
2955
2956 else if(breq->req_type == MCA_PML_REQUEST_RECV) {
2957
2958
2959
2960 if( false == request->req_persistent ) {
2961 if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_request_complete_irecv(request, peer_ref,
2962 src, tag, tmp_ddt_size) ) ) {
2963 exit_status = ret;
2964 goto DONE;
2965 }
2966 }
2967
2968
2969
2970 else {
2971 if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_request_complete_irecv_init(request, peer_ref,
2972 src, tag, tmp_ddt_size) ) ) {
2973 exit_status = ret;
2974 goto DONE;
2975 }
2976 }
2977 }
2978
2979 DONE:
2980 return exit_status;
2981 }
2982
2983
2984 int ompi_crcp_bkmrk_pml_quiesce_start(ompi_crcp_bkmrk_pml_quiesce_tag_type_t tag ) {
2985 int ret, exit_status = OMPI_SUCCESS;
2986
2987 if( OMPI_SUCCESS != (ret = ft_event_coordinate_peers()) ) {
2988 exit_status = ret;
2989 }
2990
2991 return exit_status;
2992 }
2993
2994 int ompi_crcp_bkmrk_pml_quiesce_end(ompi_crcp_bkmrk_pml_quiesce_tag_type_t tag ) {
2995 int ret, exit_status = OMPI_SUCCESS;
2996
2997 if( OMPI_SUCCESS != (ret = ft_event_finalize_exchange() ) ) {
2998 exit_status = ret;
2999 }
3000
3001 return exit_status;
3002 }
3003
/*
 * Fault-tolerance event handler for the bookmark coordination protocol.
 *
 * state     : the checkpoint/restart phase (OPAL_CRS_CHECKPOINT,
 *             OPAL_CRS_CONTINUE, OPAL_CRS_RESTART, OPAL_CRS_TERM, ...)
 * pml_state : PRE/POST selector; returned with error_code set to the status
 *
 * Behaviour per phase:
 *  - CHECKPOINT (acts only when pml_state->state is PRE): quiesce the
 *    traffic via ompi_crcp_bkmrk_pml_quiesce_start(). If the quiesce asks
 *    to stall (stall_for_completion), the function returns OMPI_EXISTS and
 *    remembers in a static that the next call must resume at STEP_1.
 *  - CONTINUE (acts only when pml_state->state is POST): finish the quiesce
 *    via ompi_crcp_bkmrk_pml_quiesce_end().
 *  - RESTART (acts only when pml_state->state is POST): refresh the jobid
 *    stored in every peer reference, then finalize the exchange.
 *  - TERM and anything else: nothing to do.
 *
 * The function keeps static state across calls, so it is not reentrant.
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_ft_event(
                                  int state,
                                  ompi_crcp_base_pml_state_t* pml_state)
{
    /* 1 when the previous call stalled and this call must resume at STEP_1 */
    static int step_to_return_to = 0;
    /* Toggled on every CONTINUE/POST call; see the CONTINUE branch */
    static bool first_continue_pass = false;
    opal_list_item_t* item = NULL;
    int exit_status = OMPI_SUCCESS;
    int ret;

    /* Publish the phase in the file-level ft_event_state variable */
    ft_event_state = state;

    /* Resuming after a stall: jump straight back into the checkpoint
     * branch, regardless of the state argument of this call. */
    if( step_to_return_to == 1 ) {
        goto STEP_1;
    }

    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: pml_ft_event()"));

    /*
     * Checkpoint
     */
    if(OPAL_CRS_CHECKPOINT == state) {
        /* Only the PRE call does any work */
        if( OMPI_CRCP_PML_PRE != pml_state->state){
            goto DONE;
        }

        /* Optional fence before the timers start, when timing barriers are enabled */
        if( opal_cr_timing_barrier_enabled ) {
            OPAL_CR_SET_TIMER(OPAL_CR_TIMER_CRCPBR0);
            if( OMPI_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
                exit_status = ret;
                goto DONE;
            }
        }
        OPAL_CR_SET_TIMER(OPAL_CR_TIMER_CRCP0);

        START_TIMER(CRCP_TIMER_TOTAL_CKPT);
    STEP_1:
        step_to_return_to = 0;

        /* Coordinate with the peers */
        if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_pml_quiesce_start(QUIESCE_TAG_CKPT)) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: %s ft_event: Checkpoint Coordination Failed %d",
                        OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                        ret);
            exit_status = ret;
            goto DONE;
        }

        /* A stall was requested: arm the resume point and leave through
         * DONE_STALL, which skips the reset of step_to_return_to and
         * ft_event_state performed at DONE. */
        if( stall_for_completion ) {
            stall_for_completion = false;
            opal_cr_stall_check = true;
            step_to_return_to = 1;

            exit_status = OMPI_EXISTS;
            goto DONE_STALL;
        }
        END_TIMER(CRCP_TIMER_TOTAL_CKPT);

        DISPLAY_ALL_TIMERS(state);
        clear_timers();
    }
    /*
     * Continue
     */
    else if(OPAL_CRS_CONTINUE == state) {
        /* Only the POST call does any work */
        if( OMPI_CRCP_PML_POST != pml_state->state){
            goto DONE;
        }

        first_continue_pass = !first_continue_pass;

        /* With opal_cr_continue_like_restart set, every other POST call
         * (the ones that flip the toggle to true) is skipped. */
        if (opal_cr_continue_like_restart && first_continue_pass) {
            goto DONE;
        }

        START_TIMER(CRCP_TIMER_TOTAL_CONT);

        /* Finish the quiesce started by the checkpoint phase */
        if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_pml_quiesce_end(QUIESCE_TAG_CONTINUE) ) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_ft_event: Checkpoint Finalization Failed %d",
                        ret);
            exit_status = ret;
            goto DONE;
        }
        END_TIMER(CRCP_TIMER_TOTAL_CONT);

        DISPLAY_ALL_TIMERS(state);
        clear_timers();

        /* Optional fence after the work, when timing barriers are enabled */
        if( opal_cr_timing_barrier_enabled ) {
            OPAL_CR_SET_TIMER(OPAL_CR_TIMER_COREBR1);
            if( OMPI_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
                exit_status = ret;
                goto DONE;
            }
        }
        OPAL_CR_SET_TIMER(OPAL_CR_TIMER_CORE2);
    }
    /*
     * Restart
     */
    else if(OPAL_CRS_RESTART == state) {
        /* Only the POST call does any work */
        if( OMPI_CRCP_PML_POST != pml_state->state){
            goto DONE;
        }

        START_TIMER(CRCP_TIMER_TOTAL_RST);

        /* Overwrite the jobid stored in each peer reference with this
         * process's current jobid. */
        for(item  = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
            item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
            item  = opal_list_get_next(item) ) {
            ompi_crcp_bkmrk_pml_peer_ref_t *cur_peer_ref;
            cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;

            cur_peer_ref->proc_name.jobid = OMPI_PROC_MY_NAME->jobid;
        }

        /* Finalize the exchange directly (not through quiesce_end) */
        if( OMPI_SUCCESS != (ret = ft_event_finalize_exchange() ) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_ft_event: Checkpoint Finalization Failed %d",
                        ret);
            exit_status = ret;
            goto DONE;
        }

        END_TIMER(CRCP_TIMER_TOTAL_RST);

        DISPLAY_ALL_TIMERS(state);
        clear_timers();
    }
    /*
     * Terminate: nothing to do
     */
    else if(OPAL_CRS_TERM == state ) {
        goto DONE;
    }
    /*
     * Any other state: nothing to do
     */
    else {
        goto DONE;
    }

 DONE:
    /* Normal exit: forget any resume point and mark the event as over */
    step_to_return_to = 0;
    ft_event_state = OPAL_CRS_RUNNING;

 DONE_STALL:
    pml_state->error_code = exit_status;
    return pml_state;
}
3170
3171
3172
3173
3174
3175
3176
3177
3178 static int traffic_message_append(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
3179 opal_list_t * append_list,
3180 ompi_crcp_bkmrk_pml_message_type_t msg_type,
3181 size_t count,
3182 ompi_datatype_t *datatype,
3183 size_t in_ddt_size,
3184 int tag,
3185 int dest,
3186 struct ompi_communicator_t* comm,
3187 ompi_crcp_bkmrk_pml_traffic_message_ref_t **msg_ref)
3188 {
3189 int ret, exit_status = OMPI_SUCCESS;
3190 size_t ddt_size = 0;
3191
3192 if( NULL != datatype ) {
3193 ompi_datatype_type_size(datatype,
3194 &ddt_size);
3195 } else {
3196 ddt_size = in_ddt_size;
3197
3198 }
3199
3200
3201
3202
3203
3204
3205 if( OMPI_SUCCESS != (ret = traffic_message_find(append_list,
3206 count, tag, dest,
3207 comm->c_contextid,
3208 ddt_size,
3209 msg_ref,
3210 FIND_MSG_UNKNOWN
3211 ) ) ) {
3212 opal_output(mca_crcp_bkmrk_component.super.output_handle,
3213 "crcp:bkmrk: traffic_message_append: Unable to find the proper message reference.\n");
3214 return OMPI_ERROR;
3215 }
3216
3217 if( NULL != *msg_ref ) {
3218 if( msg_type == COORD_MSG_TYPE_P_SEND ||
3219 msg_type == COORD_MSG_TYPE_P_RECV ) {
3220 (*msg_ref)->posted++;
3221 } else {
3222 (*msg_ref)->active++;
3223 }
3224 } else {
3225 if( NULL != peer_ref ) {
3226 CREATE_NEW_MSG((*msg_ref), msg_type,
3227 count, ddt_size, tag, dest, comm,
3228 peer_ref->proc_name.jobid,
3229 peer_ref->proc_name.vpid);
3230 } else {
3231 CREATE_NEW_MSG((*msg_ref), msg_type,
3232 count, ddt_size, tag, dest, comm,
3233 ORTE_JOBID_INVALID, ORTE_VPID_INVALID);
3234 }
3235
3236 if( msg_type == COORD_MSG_TYPE_P_SEND ||
3237 msg_type == COORD_MSG_TYPE_P_RECV ) {
3238 (*msg_ref)->matched = 0;
3239 (*msg_ref)->done = 0;
3240 (*msg_ref)->active = 0;
3241 (*msg_ref)->posted = 1;
3242 } else {
3243 (*msg_ref)->matched = 0;
3244 (*msg_ref)->done = 0;
3245 (*msg_ref)->active = 1;
3246 (*msg_ref)->posted = 0;
3247 }
3248
3249 opal_list_append(append_list, &((*msg_ref)->super));
3250 }
3251
3252 if( NULL != peer_ref ) {
3253 if( msg_type == COORD_MSG_TYPE_B_SEND ) {
3254 TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (send) --", true));
3255 }
3256 else if( msg_type == COORD_MSG_TYPE_P_SEND ) {
3257 TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (send_init) --", true));
3258 }
3259 else if( msg_type == COORD_MSG_TYPE_B_RECV ) {
3260 TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (recv) --", true));
3261 }
3262 else if( msg_type == COORD_MSG_TYPE_P_RECV ) {
3263 TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (recv_init) --", true));
3264 }
3265 else if( msg_type == COORD_MSG_TYPE_I_SEND || msg_type == COORD_MSG_TYPE_I_RECV ) {
3266 ;
3267 }
3268 else {
3269 TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (Unknown) --", true));
3270 }
3271 }
3272
3273 return exit_status;
3274 }
3275
3276 static int traffic_message_start(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
3277 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
3278 ompi_request_t **request,
3279 opal_list_t * peer_list,
3280 ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref)
3281 {
3282
3283
3284
3285 msg_ref->active++;
3286
3287 traffic_message_find_mark_persistent(msg_ref, request,
3288 false,
3289 true,
3290 content_ref);
3291 return OMPI_SUCCESS;
3292 }
3293
3294 static int traffic_message_move(ompi_crcp_bkmrk_pml_traffic_message_ref_t *old_msg_ref,
3295 ompi_crcp_bkmrk_pml_message_type_t msg_type,
3296 ompi_crcp_bkmrk_pml_peer_ref_t *from_peer_ref,
3297 opal_list_t * from_list,
3298 ompi_crcp_bkmrk_pml_peer_ref_t *to_peer_ref,
3299 opal_list_t * to_list,
3300 ompi_crcp_bkmrk_pml_traffic_message_ref_t **new_msg_ref,
3301 bool keep_active,
3302 bool remove)
3303 {
3304 int ret;
3305 ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL, *prev_content = NULL;
3306 ompi_request_t *request = NULL;
3307 bool loc_already_drained = false;
3308
3309
3310 if( COORD_MSG_TYPE_B_RECV != msg_type ) {
3311 traffic_message_grab_content(old_msg_ref, &prev_content, remove, true);
3312 request = prev_content->request;
3313
3314 loc_already_drained = prev_content->already_drained;
3315
3316 if( remove ) {
3317 prev_content->request = NULL;
3318 HOKE_CONTENT_REF_RETURN(prev_content);
3319 }
3320 }
3321
3322 ret = traffic_message_append(to_peer_ref, to_list,
3323 old_msg_ref->msg_type,
3324 old_msg_ref->count,
3325 NULL,
3326 old_msg_ref->ddt_size,
3327 old_msg_ref->tag,
3328 old_msg_ref->rank,
3329 old_msg_ref->comm,
3330 new_msg_ref);
3331
3332 if( loc_already_drained ) {
3333 old_msg_ref->active_drain--;
3334 (*new_msg_ref)->active--;
3335 (*new_msg_ref)->active_drain++;
3336 } else {
3337
3338 old_msg_ref->active--;
3339 }
3340
3341 if( msg_type == COORD_MSG_TYPE_P_SEND ||
3342 msg_type == COORD_MSG_TYPE_P_RECV ) {
3343 if( keep_active ) {
3344 (*new_msg_ref)->active++;
3345 }
3346 }
3347
3348 if( COORD_MSG_TYPE_B_RECV != msg_type && NULL == request ) {
3349 ERROR_SHOULD_NEVER_HAPPEN("Error: Must match a non-blocking send, and there is no matching request.");
3350 }
3351
3352 if( NULL != request ) {
3353 HOKE_CONTENT_REF_ALLOC(new_content);
3354 new_content->buffer = NULL;
3355 new_content->request = request;
3356 new_content->done = false;
3357 new_content->active = keep_active;
3358 new_content->already_posted = true;
3359 new_content->already_drained = loc_already_drained;
3360 OBJ_RETAIN(request);
3361 opal_list_append(&((*new_msg_ref)->msg_contents), &(new_content->super) );
3362 }
3363
3364 if( NULL == from_peer_ref && NULL != to_peer_ref ) {
3365 (*new_msg_ref)->proc_name.jobid = to_peer_ref->proc_name.jobid;
3366 (*new_msg_ref)->proc_name.vpid = to_peer_ref->proc_name.vpid;
3367 }
3368
3369 return ret;
3370 }
3371
3372 static int traffic_message_find_mark_persistent(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
3373 ompi_request_t **request,
3374 bool cur_active,
3375 bool set_is_active,
3376 ompi_crcp_bkmrk_pml_message_content_ref_t **c_ref)
3377 {
3378 mca_pml_base_request_t * breq = NULL;
3379 opal_list_item_t* item = NULL;
3380
3381 breq = (mca_pml_base_request_t *)(*request);
3382
3383 for(item = opal_list_get_first(&(msg_ref->msg_contents));
3384 item != opal_list_get_end( &(msg_ref->msg_contents));
3385 item = opal_list_get_next(item) ) {
3386 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
3387 mca_pml_base_request_t * loc_breq = NULL;
3388
3389 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)item;
3390 loc_breq = (mca_pml_base_request_t *)(content_ref->request);
3391
3392 if( content_ref->active != cur_active ) {
3393 continue;
3394 }
3395 else if( loc_breq->req_sequence == breq->req_sequence ) {
3396 OPAL_OUTPUT_VERBOSE((25, mca_crcp_bkmrk_component.super.output_handle,
3397 "%s %8s Request [%d] (%s) %d : %d",
3398 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
3399 (set_is_active ? "Start" : (NULL != c_ref ? "Drain" : "Complete")),
3400 (int)msg_ref->msg_id,
3401 (content_ref->active ? "T" : "F"),
3402 (int)loc_breq->req_sequence,
3403 (int)breq->req_sequence));
3404
3405 content_ref->active = set_is_active;
3406 if( NULL != c_ref ) {
3407 *c_ref = content_ref;
3408 }
3409 break;
3410 }
3411 }
3412
3413 return OMPI_SUCCESS;
3414 }
3415
3416 static int traffic_message_grab_content(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
3417 ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref,
3418 bool remove,
3419 bool already_drained)
3420 {
3421 ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL;
3422 ompi_crcp_bkmrk_pml_message_content_ref_t *loc_content_ref = NULL;
3423 opal_list_item_t* item = NULL;
3424
3425
3426
3427
3428 if( 0 >= opal_list_get_size(&msg_ref->msg_contents) ) {
3429 return OMPI_SUCCESS;
3430 }
3431
3432
3433
3434
3435
3436 if( already_drained ) {
3437 item = opal_list_get_first(&(msg_ref->msg_contents));
3438 new_content = (ompi_crcp_bkmrk_pml_message_content_ref_t*)item;
3439 }
3440
3441 for(item = opal_list_get_first(&(msg_ref->msg_contents));
3442 item != opal_list_get_end( &(msg_ref->msg_contents));
3443 item = opal_list_get_next(item) ) {
3444 loc_content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)item;
3445
3446 if( !already_drained ) {
3447 TRAFFIC_MSG_DUMP_MSG_CONTENT_INDV(10, (loc_content_ref));
3448 }
3449
3450 if( loc_content_ref->already_drained == already_drained ) {
3451 new_content = (ompi_crcp_bkmrk_pml_message_content_ref_t*)item;
3452 break;
3453 }
3454 }
3455
3456 if( remove ) {
3457 opal_list_remove_item(&msg_ref->msg_contents, &(new_content->super));
3458 }
3459
3460 if( NULL != content_ref ) {
3461 *content_ref = new_content;
3462 } else if( remove && NULL != new_content ) {
3463 HOKE_CONTENT_REF_RETURN(new_content);
3464 }
3465
3466 return OMPI_SUCCESS;
3467 }
3468
3469 static int traffic_message_create_drain_message(bool post_drain,
3470 int max_post,
3471 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
3472 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_msg_ref,
3473 int *num_posted)
3474 {
3475 ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref = NULL;
3476 ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL, *prev_content = NULL;
3477 int m_iter, m_total;
3478
3479 *num_posted = 0;
3480
3481
3482
3483
3484 if( NULL == (*posted_msg_ref) || max_post <= 0) {
3485 return OMPI_SUCCESS;
3486 }
3487
3488
3489
3490
3491 m_total = max_post;
3492 if( !post_drain && max_post > (*posted_msg_ref)->active ) {
3493 m_total = (*posted_msg_ref)->active;
3494 }
3495
3496 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
3497 "crcp:bkmrk: %s <-- %s "
3498 " --> Create Drain Msg: %s %4d = min(%4d / %4d)",
3499 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
3500 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
3501 (post_drain ? "Posting" : "Not Posting"),
3502 m_total, (*posted_msg_ref)->active, max_post ));
3503
3504 TRAFFIC_MSG_DUMP_MSG_INDV(10, ((*posted_msg_ref), "Drain", true));
3505
3506
3507
3508
3509 drain_message_append(peer_ref,
3510 COORD_MSG_TYPE_I_RECV,
3511 (*posted_msg_ref)->count,
3512 (*posted_msg_ref)->ddt_size,
3513 (*posted_msg_ref)->tag,
3514 (*posted_msg_ref)->rank,
3515 (*posted_msg_ref)->comm,
3516 &drain_msg_ref);
3517
3518
3519
3520
3521 for(m_iter = 0; m_iter < m_total; ++m_iter) {
3522 new_content = NULL;
3523
3524 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
3525 "crcp:bkmrk: %s <-- %s "
3526 " \t--> Find Content: %s (%4d of %4d)",
3527 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
3528 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
3529 (post_drain ? "Posting" : "Not Posting"),
3530 m_iter, m_total));
3531
3532
3533
3534
3535
3536
3537 traffic_message_grab_content((*posted_msg_ref), &prev_content, false, false);
3538 if( NULL != prev_content ) {
3539 prev_content->already_drained = true;
3540 }
3541
3542
3543 if( !post_drain && (*posted_msg_ref)->msg_type != COORD_MSG_TYPE_B_RECV ) {
3544 assert( NULL != prev_content );
3545 }
3546
3547
3548 if( NULL != prev_content ) {
3549 (*posted_msg_ref)->active--;
3550 }
3551 (*posted_msg_ref)->active_drain++;
3552
3553
3554 HOKE_CONTENT_REF_ALLOC(new_content);
3555 new_content->buffer = NULL;
3556 if( NULL == prev_content ) {
3557 new_content->request = NULL;
3558 } else {
3559 new_content->request = prev_content->request;
3560 if( NULL != new_content->request ) {
3561 OBJ_RETAIN(new_content->request);
3562 }
3563 }
3564 opal_list_append(&(drain_msg_ref->msg_contents), &(new_content->super) );
3565
3566 if( !post_drain ) {
3567 new_content->done = false;
3568 new_content->active = true;
3569 new_content->already_posted = true;
3570 new_content->already_drained = true;
3571
3572 drain_msg_ref->active++;
3573 drain_msg_ref->already_posted++;
3574 } else {
3575 new_content->done = false;
3576 new_content->active = false;
3577 new_content->already_posted = false;
3578 new_content->already_drained = true;
3579
3580
3581
3582
3583
3584
3585 ompi_datatype_duplicate(&(ompi_mpi_packed.dt), &(drain_msg_ref->datatype));
3586
3587
3588 if(drain_msg_ref->count > 0 ) {
3589 new_content->buffer = (void *) malloc(drain_msg_ref->count * drain_msg_ref->ddt_size);
3590 } else {
3591 new_content->buffer = (void *) malloc(1 * drain_msg_ref->ddt_size);
3592 }
3593
3594
3595 }
3596
3597 (*num_posted)++;
3598 }
3599
3600 peer_ref->total_drained_msgs += *num_posted;
3601
3602 OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
3603 "crcp:bkmrk: %s <-- %s "
3604 "Added %d messages to the drained list (size = %d)",
3605 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
3606 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
3607 (*num_posted),
3608 (int)opal_list_get_size(&(peer_ref->drained_list)) ));
3609
3610 return OMPI_SUCCESS;
3611 }
3612
3613 static int traffic_message_find_recv(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
3614 int rank, uint32_t comm_id, int tag,
3615 size_t count, size_t datatype_size,
3616 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_recv_msg_ref,
3617 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_irecv_msg_ref,
3618 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_precv_msg_ref,
3619 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_unknown_recv_msg_ref,
3620 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_unknown_precv_msg_ref)
3621 {
3622 int ret;
3623
3624 *posted_recv_msg_ref = NULL;
3625 *posted_irecv_msg_ref = NULL;
3626 *posted_precv_msg_ref = NULL;
3627 *posted_unknown_recv_msg_ref = NULL;
3628 *posted_unknown_precv_msg_ref = NULL;
3629
3630
3631
3632
3633 if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->recv_list),
3634 count, tag, INVALID_INT,
3635 comm_id, datatype_size,
3636 posted_recv_msg_ref,
3637 FIND_MSG_UNKNOWN) ) ) {
3638 opal_output(mca_crcp_bkmrk_component.super.output_handle,
3639 "crcp:bkmrk: traffic_message_find_recv: Unable to find the proper message reference.\n");
3640 return OMPI_ERROR;
3641 }
3642
3643
3644
3645
3646 if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->irecv_list),
3647 count, tag, INVALID_INT,
3648 comm_id, datatype_size,
3649 posted_irecv_msg_ref,
3650 FIND_MSG_UNKNOWN) ) ) {
3651 opal_output(mca_crcp_bkmrk_component.super.output_handle,
3652 "crcp:bkmrk: traffic_message_find_recv: Unable to find the proper message reference.\n");
3653 return OMPI_ERROR;
3654 }
3655
3656
3657
3658
3659 if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->recv_init_list),
3660 count, tag, INVALID_INT,
3661 comm_id, datatype_size,
3662 posted_precv_msg_ref,
3663 FIND_MSG_UNKNOWN) ) ) {
3664 opal_output(mca_crcp_bkmrk_component.super.output_handle,
3665 "crcp:bkmrk: traffic_message_find_recv: Unable to find the proper message reference.\n");
3666 return OMPI_ERROR;
3667 }
3668
3669
3670
3671
3672 if( OMPI_SUCCESS != (ret = traffic_message_find(&(unknown_recv_from_list),
3673 count, tag, INVALID_INT,
3674 comm_id, datatype_size,
3675 posted_unknown_recv_msg_ref,
3676 FIND_MSG_UNKNOWN) ) ) {
3677 opal_output(mca_crcp_bkmrk_component.super.output_handle,
3678 "crcp:bkmrk: traffic_message_find_recv: Unable to find the proper message reference.\n");
3679 return OMPI_ERROR;
3680 }
3681
3682
3683
3684
3685 if( OMPI_SUCCESS != (ret = traffic_message_find(&(unknown_persist_recv_list),
3686 count, tag, INVALID_INT,
3687 comm_id, datatype_size,
3688 posted_unknown_precv_msg_ref,
3689 FIND_MSG_UNKNOWN) ) ) {
3690 opal_output(mca_crcp_bkmrk_component.super.output_handle,
3691 "crcp:bkmrk: traffic_message_find_recv: Unable to find the proper message reference.\n");
3692 return OMPI_ERROR;
3693 }
3694
3695
3696
3697
3698
3699
3700
3701 return OMPI_SUCCESS;
3702 }
3703
3704 static int traffic_message_find(opal_list_t * search_list,
3705 size_t count, int tag, int peer,
3706 uint32_t comm_id, size_t ddt_size,
3707 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** found_msg_ref,
3708 int active )
3709 {
3710 opal_list_item_t* item = NULL;
3711
3712 *found_msg_ref = NULL;
3713
3714 #if OPAL_ENABLE_DEBUG == 1
3715
3716
3717
3718 if( NULL == search_list) {
3719 opal_output(0, "WARNING (Debug): Search_list NULL! (%s:%d)", __FILE__, __LINE__);
3720 return OMPI_ERROR;
3721 }
3722 #endif
3723
3724
3725
3726
3727 for(item = opal_list_get_last(search_list);
3728 item != opal_list_get_begin(search_list);
3729 item = opal_list_get_prev(item) ) {
3730 ompi_crcp_bkmrk_pml_traffic_message_ref_t * msg_ref;
3731 msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)item;
3732
3733 if( active != FIND_MSG_UNKNOWN ) {
3734 if( active == PERSIST_MARKER ) {
3735 if( 0 >= msg_ref->posted ) {
3736 continue;
3737 }
3738 }
3739 else if( (active == FIND_MSG_TRUE && 0 >= (msg_ref->active + msg_ref->active_drain) ) ||
3740 (active == FIND_MSG_FALSE && 0 <= (msg_ref->active + msg_ref->active_drain) ) ) {
3741 continue;
3742 }
3743 }
3744
3745 if(msg_ref->count == count &&
3746 (NULL != msg_ref->comm && msg_ref->comm->c_contextid == comm_id) &&
3747 (msg_ref->tag == MPI_ANY_TAG || msg_ref->tag == tag) &&
3748 (peer == INVALID_INT || msg_ref->rank == peer) &&
3749 msg_ref->ddt_size == ddt_size) {
3750
3751 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
3752 "crcp:bkmrk: traffic_message_find: Found Message -- Comm list (%d, %d)\n",
3753 tag, peer));
3754
3755 *found_msg_ref = msg_ref;
3756 return OMPI_SUCCESS;
3757 }
3758 }
3759
3760 return OMPI_SUCCESS;
3761 }
3762
3763
3764
3765
3766
3767 static int drain_message_append(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
3768 ompi_crcp_bkmrk_pml_message_type_t msg_type,
3769 size_t count, size_t ddt_size,
3770 int tag,int dest,
3771 struct ompi_communicator_t* comm,
3772 ompi_crcp_bkmrk_pml_drain_message_ref_t **msg_ref)
3773 {
3774 int ret, exit_status = OMPI_SUCCESS;
3775 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
3776
3777
3778
3779
3780
3781
3782 if( OMPI_SUCCESS != (ret = drain_message_find(&(peer_ref->drained_list),
3783 count, tag, dest,
3784 comm->c_contextid,
3785 ddt_size,
3786 msg_ref,
3787 &content_ref) ) ) {
3788 opal_output(mca_crcp_bkmrk_component.super.output_handle,
3789 "crcp:bkmrk: traffic_message_append: Unable to find the proper message reference.\n");
3790 return OMPI_ERROR;
3791 }
3792
3793 if( NULL == *msg_ref ) {
3794 CREATE_NEW_DRAIN_MSG((*msg_ref), msg_type,
3795 count, NULL, tag, dest, comm,
3796 peer_ref->proc_name.jobid,
3797 peer_ref->proc_name.vpid);
3798
3799 (*msg_ref)->done = 0;
3800 (*msg_ref)->active = 0;
3801 (*msg_ref)->already_posted = 0;
3802
3803 opal_list_append(&(peer_ref->drained_list), &((*msg_ref)->super));
3804 }
3805
3806
3807 return exit_status;
3808 }
3809
3810 static int drain_message_remove(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
3811 ompi_crcp_bkmrk_pml_drain_message_ref_t *msg_ref,
3812 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref)
3813 {
3814
3815
3816
3817 opal_list_remove_item(&(msg_ref->msg_contents), &(content_ref->super));
3818 HOKE_CONTENT_REF_RETURN(content_ref);
3819
3820
3821
3822
3823
3824 if( 0 >= opal_list_get_size(&(msg_ref->msg_contents) ) ) {
3825 TRAFFIC_MSG_DUMP_DRAIN_MSG_INDV(10, (msg_ref, "D*remove", true));
3826 opal_list_remove_item(&(peer_ref->drained_list), &(msg_ref->super));
3827 HOKE_DRAIN_MSG_REF_RETURN(msg_ref);
3828 } else {
3829 TRAFFIC_MSG_DUMP_DRAIN_MSG_INDV(10, (msg_ref, "Dremove", true));
3830 }
3831
3832 return OMPI_SUCCESS;
3833 }
3834
3835 static int drain_message_check_recv(void **buf, size_t count,
3836 ompi_datatype_t *datatype,
3837 int *src, int *tag,
3838 struct ompi_communicator_t* comm,
3839 struct ompi_request_t **request,
3840 ompi_status_public_t** status,
3841 bool *found_drain)
3842 {
3843 int ret, exit_status = OMPI_SUCCESS;
3844 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref = NULL;
3845 ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref = NULL;
3846 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
3847 size_t tmp_ddt_size = 0;
3848
3849 *found_drain = false;
3850
3851 ompi_datatype_type_size(datatype, &tmp_ddt_size);
3852
3853
3854
3855
3856 if( OMPI_SUCCESS != (ret = drain_message_find_any(count, *tag, *src,
3857 comm, tmp_ddt_size,
3858 &drain_msg_ref,
3859 &content_ref,
3860 &peer_ref) ) ) {
3861 ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: drain_check_recv(): Failed trying to find a drained message.");
3862 exit_status = ret;
3863 goto DONE;
3864 }
3865
3866
3867
3868
3869
3870
3871
3872 if( NULL != drain_msg_ref ) {
3873 OPAL_OUTPUT_VERBOSE((12, mca_crcp_bkmrk_component.super.output_handle,
3874 "crcp:bkmrk: drain_check_recv(): Matched a drained message..."));
3875
3876 if( OMPI_SUCCESS != (ret = drain_message_copy_remove(drain_msg_ref,
3877 content_ref,
3878 src, tag, request, status,
3879 datatype, count, buf,
3880 peer_ref) ) ) {
3881 opal_output( mca_crcp_bkmrk_component.super.output_handle,
3882 "crcp:bkmrk: drain_check_recv(): Datatype copy failed (%d)",
3883 ret);
3884 exit_status = ret;
3885 goto DONE;
3886 }
3887
3888 peer_ref->total_drained_msgs -= 1;
3889
3890 *found_drain = true;
3891 }
3892
3893 DONE:
3894 return exit_status;
3895 }
3896
3897 static int drain_message_find_any(size_t count, int tag, int peer,
3898 struct ompi_communicator_t* comm, size_t ddt_size,
3899 ompi_crcp_bkmrk_pml_drain_message_ref_t ** found_msg_ref,
3900 ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref,
3901 ompi_crcp_bkmrk_pml_peer_ref_t **peer_ref)
3902 {
3903 ompi_crcp_bkmrk_pml_peer_ref_t *cur_peer_ref = NULL;
3904 opal_list_item_t* item = NULL;
3905
3906 *found_msg_ref = NULL;
3907
3908 for(item = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
3909 item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
3910 item = opal_list_get_next(item) ) {
3911 cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
3912
3913
3914
3915
3916
3917 if( MPI_ANY_SOURCE != peer && peer >= 0) {
3918
3919 if( comm->c_local_group->grp_proc_count <= peer ) {
3920 continue;
3921 }
3922
3923 if( OPAL_EQUAL != ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
3924 &(cur_peer_ref->proc_name),
3925 OMPI_CAST_RTE_NAME(&comm->c_local_group->grp_proc_pointers[peer]->super.proc_name))) {
3926 continue;
3927 }
3928 }
3929
3930 drain_message_find(&(cur_peer_ref->drained_list),
3931 count, tag, peer,
3932 comm->c_contextid, ddt_size,
3933 found_msg_ref,
3934 content_ref);
3935 if( NULL != *found_msg_ref) {
3936 if( NULL != peer_ref ) {
3937 *peer_ref = cur_peer_ref;
3938 }
3939 return OMPI_SUCCESS;
3940 }
3941 }
3942
3943 return OMPI_SUCCESS;
3944 }
3945
3946 static int drain_message_find(opal_list_t * search_list,
3947 size_t count, int tag, int peer,
3948 uint32_t comm_id, size_t ddt_size,
3949 ompi_crcp_bkmrk_pml_drain_message_ref_t ** found_msg_ref,
3950 ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref)
3951 {
3952 ompi_crcp_bkmrk_pml_drain_message_ref_t * drain_msg = NULL;
3953 opal_list_item_t* item = NULL;
3954
3955 *found_msg_ref = NULL;
3956 *content_ref = NULL;
3957
3958
3959
3960
3961 if( 0 >= opal_list_get_size(search_list) ) {
3962 return OMPI_SUCCESS;
3963 }
3964
3965 for(item = opal_list_get_first(search_list);
3966 item != opal_list_get_end(search_list);
3967 item = opal_list_get_next(item) ) {
3968 drain_msg = (ompi_crcp_bkmrk_pml_drain_message_ref_t*)item;
3969
3970 OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
3971 "crcp:bkmrk: find_drain_msg(): Compare [%d, %d, %d, %d] to [%d, %d, %d, %d]",
3972 (int)ddt_size, (int)count, tag, peer,
3973 (int)drain_msg->ddt_size, (int)drain_msg->count, (int)drain_msg->tag, (int)drain_msg->rank));
3974
3975
3976 if( NULL != drain_msg->comm ) {
3977 if( drain_msg->comm->c_contextid != comm_id ) {
3978 continue;
3979 }
3980 }
3981
3982
3983 if( MPI_ANY_TAG != tag &&
3984 drain_msg->tag != tag) {
3985 continue;
3986 }
3987
3988
3989 if( INVALID_INT != peer ) {
3990 if( MPI_ANY_SOURCE != peer &&
3991 drain_msg->rank != peer) {
3992 continue;
3993 }
3994 }
3995
3996
3997 if( ddt_size != PROBE_ANY_SIZE &&
3998 count != PROBE_ANY_COUNT) {
3999
4000 if((drain_msg->count ) != count ||
4001 (drain_msg->ddt_size) != ddt_size) {
4002 continue;
4003 }
4004 }
4005
4006
4007 *found_msg_ref = drain_msg;
4008 break;
4009 }
4010
4011
4012
4013
4014 if( NULL != *found_msg_ref ) {
4015 drain_message_grab_content((*found_msg_ref), content_ref );
4016
4017
4018 if( NULL == *content_ref ) {
4019 *found_msg_ref = NULL;
4020 }
4021 }
4022
4023 return OMPI_SUCCESS;
4024 }
4025
4026 static int drain_message_grab_content(ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref,
4027 ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref)
4028 {
4029 ompi_crcp_bkmrk_pml_message_content_ref_t *loc_content_ref = NULL;
4030 opal_list_item_t* item = NULL;
4031
4032 *content_ref = NULL;
4033
4034 for(item = opal_list_get_first(&(drain_msg_ref->msg_contents));
4035 item != opal_list_get_end(&(drain_msg_ref->msg_contents));
4036 item = opal_list_get_next(item) ) {
4037 loc_content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)item;
4038
4039
4040
4041 if(NULL != loc_content_ref->buffer) {
4042 *content_ref = loc_content_ref;
4043 break;
4044 }
4045 }
4046
4047 return OMPI_SUCCESS;
4048 }
4049
4050 static int drain_message_copy_remove_persistent(ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref,
4051 ompi_crcp_bkmrk_pml_message_content_ref_t *drain_content_ref,
4052 ompi_crcp_bkmrk_pml_traffic_message_ref_t *traffic_msg_ref,
4053 ompi_request_t *request,
4054 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref)
4055 {
4056 int ret, exit_status = OMPI_SUCCESS;
4057 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
4058
4059
4060
4061
4062 traffic_message_find_mark_persistent(traffic_msg_ref, &request,
4063 false,
4064 false,
4065 &content_ref);
4066
4067
4068 content_ref->request = request;
4069
4070 memcpy(&(content_ref->status), &drain_content_ref->status, sizeof(ompi_status_public_t));
4071
4072 if( 0 != (ret = ompi_datatype_copy_content_same_ddt(drain_msg_ref->datatype,
4073 drain_msg_ref->count,
4074 content_ref->buffer,
4075 drain_content_ref->buffer) ) ) {
4076 opal_output( mca_crcp_bkmrk_component.super.output_handle,
4077 "crcp:bkmrk: drain_message_copy_remove_p(): Datatype copy failed (%d)",
4078 ret);
4079 exit_status = ret;
4080 }
4081
4082
4083 drain_content_ref->request = NULL;
4084 drain_message_remove(peer_ref, drain_msg_ref, drain_content_ref);
4085
4086 return exit_status;
4087 }
4088
4089 static int drain_message_copy_remove(ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref,
4090 ompi_crcp_bkmrk_pml_message_content_ref_t * drain_content_ref,
4091 int *src, int *tag,
4092 struct ompi_request_t **request,
4093 ompi_status_public_t **status,
4094 ompi_datatype_t *datatype, int count, void **buf,
4095 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref)
4096 {
4097 int ret, exit_status = OMPI_SUCCESS;
4098
4099 if( NULL != src ) {
4100 *src = drain_msg_ref->rank;
4101 }
4102
4103 if( NULL != tag ) {
4104 *tag = drain_msg_ref->tag;
4105 }
4106
4107 if( NULL != request ) {
4108 *request = drain_content_ref->request;
4109 OBJ_RETAIN(*request);
4110 }
4111
4112 if( NULL != status && MPI_STATUS_IGNORE != *status ) {
4113 memcpy(*status, &drain_content_ref->status, sizeof(ompi_status_public_t));
4114 }
4115
4116
4117 if( OPAL_LIKELY(NULL != buf) ) {
4118 if( 0 != (ret = ompi_datatype_copy_content_same_ddt(datatype, count,
4119 (void*)buf, drain_content_ref->buffer) ) ) {
4120 opal_output( mca_crcp_bkmrk_component.super.output_handle,
4121 "crcp:bkmrk: drain_message_copy_remove(): Datatype copy failed (%d)",
4122 ret);
4123 exit_status = ret;
4124 }
4125 }
4126 else {
4127 OPAL_OUTPUT_VERBOSE((20, mca_crcp_bkmrk_component.super.output_handle,
4128 "crcp:bkmrk: drain_message_copy_remove(): Skip copy - NULL buffer"));
4129 }
4130
4131
4132 drain_content_ref->request = NULL;
4133 drain_message_remove(peer_ref, drain_msg_ref, drain_content_ref);
4134
4135 return exit_status;
4136 }
4137
4138
4139
4140
4141
4142 static ompi_crcp_bkmrk_pml_peer_ref_t * find_peer(ompi_process_name_t proc)
4143 {
4144 opal_list_item_t* item = NULL;
4145 ompi_rte_cmp_bitmask_t mask;
4146
4147 for(item = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
4148 item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
4149 item = opal_list_get_next(item) ) {
4150 ompi_crcp_bkmrk_pml_peer_ref_t *cur_peer_ref;
4151 cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
4152
4153 mask = OMPI_RTE_CMP_JOBID | OMPI_RTE_CMP_VPID;
4154
4155 if( OPAL_EQUAL == ompi_rte_compare_name_fields(mask,
4156 &(cur_peer_ref->proc_name),
4157 &proc) ) {
4158 return cur_peer_ref;
4159 }
4160 }
4161
4162 return NULL;
4163 }
4164
4165 static int find_peer_in_comm(struct ompi_communicator_t* comm, int proc_idx,
4166 ompi_crcp_bkmrk_pml_peer_ref_t **peer_ref)
4167 {
4168 *peer_ref = find_peer(*(ompi_process_name_t *)&comm->c_remote_group->grp_proc_pointers[proc_idx]->super.proc_name);
4169
4170 if( NULL == *peer_ref) {
4171 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4172 "crcp:bkmrk: find_peer_in_comm(): Failed to find peer_ref - peer_ref is NULL\n");
4173 return OMPI_ERROR;
4174 }
4175
4176 return OMPI_SUCCESS;
4177 }
4178
4179
4180
4181
4182
4183 static int ft_event_coordinate_peers(void)
4184 {
4185 static int step_to_return_to = 0;
4186 int exit_status = OMPI_SUCCESS;
4187 int ret;
4188
4189 if( step_to_return_to == 1 ) {
4190 goto STEP_1;
4191 }
4192
4193
4194
4195
4196 START_TIMER(CRCP_TIMER_CKPT_EX_B);
4197 if( OMPI_SUCCESS != (ret = ft_event_exchange_bookmarks() ) ) {
4198 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4199 "crcp:bkmrk: ft_event_coordinate_peers: Bookmark Exchange Failed %d",
4200 ret);
4201 exit_status = ret;
4202 goto DONE;
4203 }
4204 END_TIMER(CRCP_TIMER_CKPT_EX_B);
4205
4206
4207
4208
4209 START_TIMER(CRCP_TIMER_CKPT_CHECK_B);
4210 if( OMPI_SUCCESS != (ret = ft_event_check_bookmarks() ) ) {
4211 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4212 "crcp:bkmrk: ft_event_coordinate_peers: Bookmark Check Failed %d",
4213 ret);
4214 exit_status = ret;
4215 goto DONE;
4216 }
4217 END_TIMER(CRCP_TIMER_CKPT_CHECK_B);
4218
4219
4220
4221
4222 START_TIMER(CRCP_TIMER_CKPT_POST_DRAIN);
4223 if( OMPI_SUCCESS != (ret = ft_event_post_drain_acks() ) ) {
4224 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4225 "crcp:bkmrk: ft_event_coordinate_peers: Bookmark Post Drain ACKS Failed %d",
4226 ret);
4227 exit_status = ret;
4228 goto DONE;
4229 }
4230
4231 if( OMPI_SUCCESS != (ret = ft_event_post_drained() ) ) {
4232 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4233 "crcp:bkmrk: ft_event_coordinate_peers: Bookmark Post Drain Msgs Failed %d",
4234 ret);
4235 exit_status = ret;
4236 goto DONE;
4237 }
4238 END_TIMER(CRCP_TIMER_CKPT_POST_DRAIN);
4239 DISPLAY_INDV_TIMER(CRCP_TIMER_CKPT_POST_DRAIN, -1, 0);
4240
4241
4242
4243
4244
4245
4246
4247
4248
4249 if( 0 < current_msg_id &&
4250 current_msg_type == COORD_MSG_TYPE_B_SEND) {
4251 stall_for_completion = true;
4252 }
4253 START_TIMER(CRCP_TIMER_CKPT_WAIT_QUI);
4254 if( stall_for_completion ) {
4255 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4256 "crcp:bkmrk: %s **** STALLING %s in PID %d ***",
4257 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4258 (current_msg_type == COORD_MSG_TYPE_B_SEND ? "Send" : "Recv"),
4259 getpid() ));
4260 step_to_return_to = 1;
4261 exit_status = OMPI_SUCCESS;
4262 goto DONE;
4263 }
4264
4265 STEP_1:
4266 step_to_return_to = 0;
4267
4268
4269
4270
4271
4272
4273 if( OMPI_SUCCESS != (ret = ft_event_wait_quiesce() ) ) {
4274 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4275 "crcp:bkmrk: ft_event_coordinate_peers: Wait Quiesce Failed %d",
4276 ret);
4277 exit_status = ret;
4278 goto DONE;
4279 }
4280 END_TIMER(CRCP_TIMER_CKPT_WAIT_QUI);
4281
4282 OPAL_OUTPUT_VERBOSE((5, mca_crcp_bkmrk_component.super.output_handle,
4283 "crcp:bkmrk: %s Coordination Finished...\n",
4284 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME)));
4285
4286
4287
4288
4289
4290
4291
4292
4293
4294
4295 DONE:
4296 return exit_status;
4297 }
4298
4299 static int ft_event_finalize_exchange(void)
4300 {
4301 int exit_status = OMPI_SUCCESS;
4302 opal_list_item_t* item = NULL, *rm_item = NULL;
4303 ompi_crcp_bkmrk_pml_traffic_message_ref_t * msg_ref;
4304 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
4305 opal_list_item_t* cont_item = NULL;
4306
4307
4308
4309
4310 for(item = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
4311 item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
4312 item = opal_list_get_next(item) ) {
4313 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref;
4314 peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
4315
4316 if( OPAL_EQUAL != ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
4317 (OMPI_PROC_MY_NAME),
4318 &(peer_ref->proc_name)) ) {
4319 TRAFFIC_MSG_DUMP_PEER(10, (peer_ref, "finalize_exchange", false));
4320 }
4321
4322 peer_ref->total_msgs_sent = 0;
4323 peer_ref->total_msgs_recvd = 0;
4324
4325 peer_ref->matched_msgs_sent = 0;
4326 peer_ref->matched_msgs_recvd = 0;
4327
4328 peer_ref->ack_required = false;
4329
4330
4331 for(rm_item = opal_list_get_last(&peer_ref->send_list);
4332 rm_item != opal_list_get_begin(&peer_ref->send_list);
4333 rm_item = opal_list_get_prev(rm_item) ) {
4334 msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4335 msg_ref->matched = 0;
4336 msg_ref->done = 0;
4337 msg_ref->active_drain += msg_ref->active;
4338 msg_ref->active = 0;
4339
4340 for(cont_item = opal_list_get_first(&(msg_ref->msg_contents));
4341 cont_item != opal_list_get_end( &(msg_ref->msg_contents));
4342 cont_item = opal_list_get_next(cont_item) ) {
4343 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)cont_item;
4344 if( content_ref->active ) {
4345 content_ref->already_drained = true;
4346 }
4347 }
4348 }
4349
4350
4351 for(rm_item = opal_list_get_last(&peer_ref->isend_list);
4352 rm_item != opal_list_get_begin(&peer_ref->isend_list);
4353 rm_item = opal_list_get_prev(rm_item) ) {
4354 msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4355 msg_ref->matched = 0;
4356 msg_ref->done = 0;
4357 msg_ref->active_drain += msg_ref->active;
4358 msg_ref->active = 0;
4359
4360 for(cont_item = opal_list_get_first(&(msg_ref->msg_contents));
4361 cont_item != opal_list_get_end( &(msg_ref->msg_contents));
4362 cont_item = opal_list_get_next(cont_item) ) {
4363 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)cont_item;
4364 if( content_ref->active ) {
4365 content_ref->already_drained = true;
4366 }
4367 }
4368 }
4369
4370
4371 for(rm_item = opal_list_get_last(&peer_ref->send_list);
4372 rm_item != opal_list_get_begin(&peer_ref->send_list);
4373 rm_item = opal_list_get_prev(rm_item) ) {
4374 msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4375 msg_ref->matched = 0;
4376 msg_ref->done = 0;
4377 msg_ref->active_drain += msg_ref->active;
4378 msg_ref->active = 0;
4379
4380 for(cont_item = opal_list_get_first(&(msg_ref->msg_contents));
4381 cont_item != opal_list_get_end( &(msg_ref->msg_contents));
4382 cont_item = opal_list_get_next(cont_item) ) {
4383 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)cont_item;
4384 if( content_ref->active ) {
4385 content_ref->already_drained = true;
4386 }
4387 }
4388 }
4389
4390
4391 for(rm_item = opal_list_get_last(&peer_ref->recv_list);
4392 rm_item != opal_list_get_begin(&peer_ref->recv_list);
4393 rm_item = opal_list_get_prev(rm_item) ) {
4394 msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4395 msg_ref->matched = 0;
4396 msg_ref->done = 0;
4397 }
4398
4399
4400 for(rm_item = opal_list_get_last(&peer_ref->irecv_list);
4401 rm_item != opal_list_get_begin(&peer_ref->irecv_list);
4402 rm_item = opal_list_get_prev(rm_item) ) {
4403 msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4404 msg_ref->matched = 0;
4405 msg_ref->done = 0;
4406 }
4407
4408
4409 for(rm_item = opal_list_get_last(&peer_ref->recv_list);
4410 rm_item != opal_list_get_begin(&peer_ref->recv_list);
4411 rm_item = opal_list_get_prev(rm_item) ) {
4412 msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4413 msg_ref->matched = 0;
4414 msg_ref->done = 0;
4415 }
4416 }
4417
4418 return exit_status;
4419 }
4420
4421 static int ft_event_exchange_bookmarks(void)
4422 {
4423 int peer_idx = 0;
4424 int my_idx = OMPI_PROC_MY_NAME->vpid;
4425 int iter = 0;
4426 int num_peers = 0;
4427
4428 num_peers = opal_list_get_size(&ompi_crcp_bkmrk_pml_peer_refs);
4429
4430 for( peer_idx = (num_peers - my_idx - 1), iter = 0;
4431 iter < num_peers;
4432 peer_idx = (peer_idx + 1) % num_peers, ++iter)
4433 {
4434 if(my_idx > peer_idx) {
4435
4436 send_bookmarks(peer_idx);
4437
4438 recv_bookmarks(peer_idx);
4439 }
4440 else if(my_idx < peer_idx) {
4441
4442 recv_bookmarks(peer_idx);
4443
4444 send_bookmarks(peer_idx);
4445 }
4446 }
4447
4448
4449 START_TIMER(CRCP_TIMER_CKPT_EX_WAIT);
4450 while( total_recv_bookmarks > 0 ) {
4451 opal_event_loop(opal_sync_event_base, OPAL_EVLOOP_NONBLOCK);
4452 }
4453 total_recv_bookmarks = 0;
4454 END_TIMER(CRCP_TIMER_CKPT_EX_WAIT);
4455
4456 return OMPI_SUCCESS;
4457 }
4458
4459 static int ft_event_check_bookmarks(void)
4460 {
4461 opal_list_item_t* item = NULL;
4462 int ret;
4463 int p_n_to_p_m = 0;
4464 int p_n_from_p_m = 0;
4465
4466 if( 10 <= mca_crcp_bkmrk_component.super.verbose ) {
4467 sleep(OMPI_PROC_MY_NAME->vpid);
4468 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4469 "---------------------------------------------"));
4470 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4471 "Process %s Match Table",
4472 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME)));
4473 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4474 "%s %5s | %7s | %7s | %7s | %7s |",
4475 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4476 "Vpid", "T_Send", "M_Recv", "M_Send", "T_Recv"));
4477
4478 for(item = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
4479 item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
4480 item = opal_list_get_next(item) ) {
4481 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref;
4482 int t_send, m_send;
4483 int t_recv, m_recv;
4484 peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
4485
4486 t_send = peer_ref->total_msgs_sent;
4487 m_send = peer_ref->matched_msgs_sent;
4488 t_recv = peer_ref->total_msgs_recvd;
4489 m_recv = peer_ref->matched_msgs_recvd;
4490
4491 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4492 "%s %5d | %7d | %7d | %7d | %7d |",
4493 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4494 peer_ref->proc_name.vpid,
4495 t_send, m_recv, m_send, t_recv));
4496 }
4497 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4498 "---------------------------------------------"));
4499 }
4500
4501
4502
4503
4504
4505
4506 for(item = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
4507 item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
4508 item = opal_list_get_next(item) ) {
4509 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref;
4510 peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
4511
4512 if( OPAL_EQUAL == ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
4513 (OMPI_PROC_MY_NAME),
4514 &(peer_ref->proc_name)) ) {
4515 continue;
4516 }
4517
4518 TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Bookmark Details --", false));
4519
4520
4521 if( OMPI_PROC_MY_NAME->vpid < peer_ref->proc_name.vpid ) {
4522
4523
4524
4525
4526 p_n_to_p_m = peer_ref->total_msgs_sent;
4527 p_n_from_p_m = peer_ref->matched_msgs_recvd;
4528
4529
4530 if( p_n_to_p_m < p_n_from_p_m ) {
4531 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4532 "crcp:bkmrk: %s --> %s "
4533 "Total Sent (%4d) = Matched Recv. (%4d) => Diff (%4d). "
4534 " WARNING: Peer received more than was sent. :(\n",
4535 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4536 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4537 p_n_to_p_m,
4538 p_n_from_p_m,
4539 (p_n_to_p_m - p_n_from_p_m)
4540 );
4541 }
4542
4543
4544
4545 if( p_n_to_p_m > p_n_from_p_m) {
4546 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4547 "crcp:bkmrk: %s --> %s "
4548 "Total Sent (%4d) = Matched Recv. (%4d). Peer needs %4d.\n",
4549 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4550 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4551 p_n_to_p_m,
4552 p_n_from_p_m,
4553 (p_n_to_p_m - p_n_from_p_m)
4554 ));
4555
4556
4557
4558
4559
4560 if( OMPI_SUCCESS != (ret = send_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
4561 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4562 "crcp:bkmrk: check_bookmarks: Unable to send message details to peer %s: Return %d\n",
4563 OMPI_NAME_PRINT(&peer_ref->proc_name),
4564 ret);
4565 return ret;
4566 }
4567 }
4568
4569
4570
4571
4572
4573 p_n_to_p_m = peer_ref->matched_msgs_sent;
4574 p_n_from_p_m = peer_ref->total_msgs_recvd;
4575
4576
4577 if( p_n_to_p_m < p_n_from_p_m ) {
4578 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4579 "crcp:bkmrk: %s --> %s "
4580 "Matched Sent (%4d) = Total Recv. (%4d) => Diff (%4d). "
4581 " WARNING: I received more than the peer sent. :(\n",
4582 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4583 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4584 p_n_to_p_m,
4585 p_n_from_p_m,
4586 (p_n_to_p_m - p_n_from_p_m)
4587 );
4588 }
4589
4590
4591
4592 if( p_n_to_p_m > p_n_from_p_m) {
4593 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4594 "crcp:bkmrk: %s <-- %s "
4595 "Matched Sent (%4d) = Total Recv. (%4d). I need %4d.\n",
4596 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4597 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4598 p_n_to_p_m,
4599 p_n_from_p_m,
4600 (p_n_to_p_m - p_n_from_p_m)
4601 ));
4602
4603
4604
4605
4606 if( OMPI_SUCCESS != (ret = recv_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
4607 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4608 "crcp:bkmrk: check_bookmarks: Unable to recv message details from peer %s: Return %d\n",
4609 OMPI_NAME_PRINT(&peer_ref->proc_name),
4610 ret);
4611 return ret;
4612 }
4613 }
4614 }
4615
4616 else {
4617
4618
4619
4620
4621 p_n_to_p_m = peer_ref->matched_msgs_sent;
4622 p_n_from_p_m = peer_ref->total_msgs_recvd;
4623
4624
4625 if( p_n_to_p_m < p_n_from_p_m ) {
4626 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4627 "crcp:bkmrk: %s --> %s "
4628 "Matched Sent (%4d) = Total Recv. (%4d) => Diff (%4d). "
4629 " WARNING: I received more than the peer sent. :(\n",
4630 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4631 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4632 p_n_to_p_m,
4633 p_n_from_p_m,
4634 (p_n_to_p_m - p_n_from_p_m)
4635 );
4636 }
4637
4638
4639
4640 if( p_n_to_p_m > p_n_from_p_m) {
4641 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4642 "crcp:bkmrk: %s <-- %s "
4643 "Matched Sent (%4d) = Total Recv. (%4d). I need %4d.\n",
4644 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4645 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4646 p_n_to_p_m,
4647 p_n_from_p_m,
4648 (p_n_to_p_m - p_n_from_p_m)
4649 ));
4650
4651
4652
4653
4654 if( OMPI_SUCCESS != (ret = recv_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
4655 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4656 "crcp:bkmrk: check_bookmarks: Unable to recv message details from peer %s: Return %d\n",
4657 OMPI_NAME_PRINT(&peer_ref->proc_name),
4658 ret);
4659 return ret;
4660 }
4661 }
4662
4663
4664
4665
4666
4667 p_n_to_p_m = peer_ref->total_msgs_sent;
4668 p_n_from_p_m = peer_ref->matched_msgs_recvd;
4669
4670
4671 if( p_n_to_p_m < p_n_from_p_m ) {
4672 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4673 "crcp:bkmrk: %s --> %s "
4674 "Total Sent (%4d) = Matched Recv. (%4d) => Diff (%4d). "
4675 " WARNING: Peer received more than was sent. :(\n",
4676 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4677 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4678 p_n_to_p_m,
4679 p_n_from_p_m,
4680 (p_n_to_p_m - p_n_from_p_m)
4681 );
4682 }
4683
4684
4685
4686 if( p_n_to_p_m > p_n_from_p_m) {
4687 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4688 "crcp:bkmrk: %s --> %s "
4689 "Total Sent (%4d) = Matched Recv. (%4d). Peer needs %4d.\n",
4690 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4691 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4692 p_n_to_p_m,
4693 p_n_from_p_m,
4694 (p_n_to_p_m - p_n_from_p_m)
4695 ));
4696
4697
4698
4699
4700
4701 if( OMPI_SUCCESS != (ret = send_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
4702 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4703 "crcp:bkmrk: check_bookmarks: Unable to send message details to peer %s: Return %d\n",
4704 OMPI_NAME_PRINT(&peer_ref->proc_name),
4705 ret);
4706 return ret;
4707 }
4708 }
4709 }
4710 }
4711
4712 return OMPI_SUCCESS;
4713 }
4714
4715 static int ft_event_post_drain_acks(void)
4716 {
4717 ompi_crcp_bkmrk_pml_drain_message_ack_ref_t * drain_msg_ack = NULL;
4718 opal_list_item_t* item = NULL;
4719 size_t req_size;
4720
4721 req_size = opal_list_get_size(&drained_msg_ack_list);
4722 if(req_size <= 0) {
4723 return OMPI_SUCCESS;
4724 }
4725
4726 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4727 "crcp:bkmrk: %s Wait on %d Drain ACK Messages.\n",
4728 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4729 (int)req_size));
4730
4731
4732
4733
4734
4735 for(item = opal_list_get_first(&drained_msg_ack_list);
4736 item != opal_list_get_end(&drained_msg_ack_list);
4737 item = opal_list_get_next(item) ) {
4738 drain_msg_ack = (ompi_crcp_bkmrk_pml_drain_message_ack_ref_t*)item;
4739
4740
4741 ompi_rte_recv_buffer_nb(&drain_msg_ack->peer, OMPI_CRCP_COORD_BOOKMARK_TAG,
4742 0, drain_message_ack_cbfunc, NULL);
4743 }
4744
4745 return OMPI_SUCCESS;
4746 }
4747
4748 static void drain_message_ack_cbfunc(int status,
4749 ompi_process_name_t* sender,
4750 opal_buffer_t *buffer,
4751 ompi_rml_tag_t tag,
4752 void* cbdata)
4753 {
4754 int ret, exit_status = OMPI_SUCCESS;
4755 opal_list_item_t* item = NULL;
4756 size_t ckpt_status;
4757
4758
4759
4760
4761 UNPACK_BUFFER(buffer, ckpt_status, 1, OPAL_SIZE, "");
4762
4763
4764
4765
4766 for(item = opal_list_get_first(&drained_msg_ack_list);
4767 item != opal_list_get_end(&drained_msg_ack_list);
4768 item = opal_list_get_next(item) ) {
4769 ompi_crcp_bkmrk_pml_drain_message_ack_ref_t * drain_msg_ack;
4770 drain_msg_ack = (ompi_crcp_bkmrk_pml_drain_message_ack_ref_t*)item;
4771
4772
4773 if(!drain_msg_ack->complete) {
4774
4775 if( OPAL_EQUAL == ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
4776 &(drain_msg_ack->peer),
4777 sender) ) {
4778
4779 drain_msg_ack->complete = true;
4780 OPAL_OUTPUT_VERBOSE((5, mca_crcp_bkmrk_component.super.output_handle,
4781 "crcp:bkmrk: %s --> %s Received ACK of FLUSH from peer\n",
4782 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4783 OMPI_NAME_PRINT(sender) ));
4784 return;
4785 }
4786 }
4787 }
4788
4789 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4790 "crcp:bkmrk: %s --> %s ERROR: Unable to match ACK to peer (%d)\n",
4791 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4792 OMPI_NAME_PRINT(sender), exit_status);
4793
4794 cleanup:
4795 return;
4796 }
4797
4798 static int ft_event_post_drained(void)
4799 {
4800 int ret, exit_status = OMPI_SUCCESS;
4801 ompi_crcp_bkmrk_pml_peer_ref_t *cur_peer_ref = NULL;
4802 ompi_crcp_bkmrk_pml_drain_message_ref_t * drain_msg_ref = NULL;
4803 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
4804 opal_list_item_t* item = NULL, *d_item = NULL, *c_item = NULL;
4805 int i, total_number_to_drain = 0, peer_total = 0;
4806
4807
4808 for(item = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
4809 item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
4810 item = opal_list_get_next(item) ) {
4811 cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
4812
4813 if( OPAL_EQUAL == ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
4814 (OMPI_PROC_MY_NAME),
4815 &(cur_peer_ref->proc_name)) ) {
4816 continue;
4817 }
4818
4819 for(d_item = opal_list_get_first(&(cur_peer_ref->drained_list));
4820 d_item != opal_list_get_end(&(cur_peer_ref->drained_list));
4821 d_item = opal_list_get_next(d_item) ) {
4822 drain_msg_ref = (ompi_crcp_bkmrk_pml_drain_message_ref_t*)d_item;
4823
4824 for(c_item = opal_list_get_first(&(drain_msg_ref->msg_contents));
4825 c_item != opal_list_get_end(&(drain_msg_ref->msg_contents));
4826 c_item = opal_list_get_next(c_item) ) {
4827 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)c_item;
4828
4829 if( !content_ref->done ) {
4830 ++total_number_to_drain;
4831 }
4832 }
4833 }
4834 }
4835
4836
4837
4838
4839 if( 0 >= total_number_to_drain ) {
4840 return OMPI_SUCCESS;
4841 }
4842
4843
4844 if( NULL != quiesce_requests ) {
4845 free(quiesce_requests);
4846 quiesce_requests = NULL;
4847 }
4848 quiesce_requests = (ompi_request_t **)malloc( (total_number_to_drain) * sizeof(ompi_request_t *));
4849 if( NULL == quiesce_requests){
4850 exit_status = OMPI_ERROR;
4851 goto cleanup;
4852 }
4853
4854 if( NULL != quiesce_statuses ) {
4855 free(quiesce_statuses);
4856 quiesce_statuses = NULL;
4857 }
4858 quiesce_statuses = (ompi_status_public_t **)malloc( (total_number_to_drain) * sizeof(ompi_status_public_t *));
4859 if( NULL == quiesce_statuses){
4860 exit_status = OMPI_ERROR;
4861 goto cleanup;
4862 }
4863
4864
4865 for(i = 0; i < (total_number_to_drain); ++i) {
4866 quiesce_requests[i] = &(ompi_request_null.request);
4867 quiesce_statuses[i] = &ompi_status_empty;
4868 }
4869 quiesce_request_count = 0;
4870
4871
4872 for(item = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
4873 item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
4874 item = opal_list_get_next(item) ) {
4875 cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
4876 peer_total = 0;
4877
4878 if( OPAL_EQUAL == ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
4879 (OMPI_PROC_MY_NAME),
4880 &(cur_peer_ref->proc_name)) ) {
4881 continue;
4882 }
4883
4884 for(d_item = opal_list_get_first(&(cur_peer_ref->drained_list));
4885 d_item != opal_list_get_end(&(cur_peer_ref->drained_list));
4886 d_item = opal_list_get_next(d_item) ) {
4887 drain_msg_ref = (ompi_crcp_bkmrk_pml_drain_message_ref_t*)d_item;
4888
4889 for(c_item = opal_list_get_first(&(drain_msg_ref->msg_contents));
4890 c_item != opal_list_get_end(&(drain_msg_ref->msg_contents));
4891 c_item = opal_list_get_next(c_item) ) {
4892 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)c_item;
4893
4894 if( content_ref->done ) {
4895 continue;
4896 }
4897
4898 if( OMPI_SUCCESS != (ret = ft_event_post_drain_message(drain_msg_ref, content_ref) ) ) {
4899 exit_status = ret;
4900 goto cleanup;
4901 }
4902
4903 cur_peer_ref->ack_required = true;
4904
4905
4906 if( NULL != content_ref->request) {
4907 quiesce_requests[quiesce_request_count] = content_ref->request;
4908 quiesce_statuses[quiesce_request_count] = &content_ref->status;
4909 quiesce_request_count++;
4910 peer_total++;
4911 }
4912
4913 else if( content_ref->already_posted ) {
4914 stall_for_completion = true;
4915 }
4916 else {
4917 ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: ft_event_post_drained(): Found a drain message with a NULL request.");
4918 }
4919 }
4920 }
4921
4922 if( peer_total > 0 || stall_for_completion ) {
4923 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4924 "crcp:bkmrk: %s <-- %s Will be draining %4d messages from this peer. Total %4d %s\n",
4925 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4926 OMPI_NAME_PRINT(&(cur_peer_ref->proc_name)),
4927 peer_total,
4928 quiesce_request_count,
4929 (stall_for_completion ? "(And Stalling)" : "") ));
4930 }
4931 }
4932
4933 cleanup:
4934 return exit_status;
4935 }
4936
4937 static int ft_event_post_drain_message(ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref,
4938 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref)
4939 {
4940 int ret;
4941
4942
4943
4944
4945
4946 if( content_ref->done ) {
4947 return OMPI_SUCCESS;
4948 }
4949
4950
4951
4952
4953 if( content_ref->already_posted ) {
4954 OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
4955 "crcp:bkmrk: %s <-- %s Found a message that we do not need to post.\n",
4956 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4957 OMPI_NAME_PRINT(&(drain_msg_ref->proc_name)) ));
4958 return OMPI_SUCCESS;
4959 }
4960
4961
4962 content_ref->active = true;
4963 drain_msg_ref->active++;
4964
4965
4966
4967
4968 OPAL_OUTPUT_VERBOSE((20, mca_crcp_bkmrk_component.super.output_handle,
4969 "crcp:bkmrk: %s <-- %s Posting a message to be drained from rank %d.\n",
4970 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4971 OMPI_NAME_PRINT(&(drain_msg_ref->proc_name)),
4972 drain_msg_ref->rank));
4973 if( OMPI_SUCCESS != (ret = wrapped_pml_module->pml_irecv(content_ref->buffer,
4974 (drain_msg_ref->count * drain_msg_ref->ddt_size),
4975 drain_msg_ref->datatype,
4976 drain_msg_ref->rank,
4977 drain_msg_ref->tag,
4978 drain_msg_ref->comm,
4979 &(content_ref->request) ) ) ) {
4980 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4981 "crcp:bkmrk: %s <-- %s Failed to post the Draining PML iRecv\n",
4982 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4983 OMPI_NAME_PRINT(&(drain_msg_ref->proc_name)) );
4984 return ret;
4985 }
4986
4987 return OMPI_SUCCESS;
4988 }
4989
4990 static int ft_event_wait_quiesce(void)
4991 {
4992 int exit_status = OMPI_SUCCESS;
4993 int ret;
4994
4995
4996
4997
4998 if( OMPI_SUCCESS != (ret = wait_quiesce_drained() ) ) {
4999 opal_output(mca_crcp_bkmrk_component.super.output_handle,
5000 "crcp:bkmrk: wait_quiesce: %s Failed to quiesce drained messages\n",
5001 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME) );
5002 exit_status = ret;
5003 goto cleanup;
5004 }
5005
5006
5007
5008
5009 if( OMPI_SUCCESS != (ret = wait_quiesce_drain_ack() ) ) {
5010 opal_output(mca_crcp_bkmrk_component.super.output_handle,
5011 "crcp:bkmrk: wait_quiesce: %s Failed to recv all drain ACKs\n",
5012 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME) );
5013 exit_status = ret;
5014 goto cleanup;
5015 }
5016
5017 cleanup:
5018 return exit_status;
5019 }
5020
5021 static int wait_quiesce_drained(void)
5022 {
5023 int ret, exit_status = OMPI_SUCCESS;
5024 ompi_crcp_bkmrk_pml_peer_ref_t *cur_peer_ref = NULL;
5025 ompi_crcp_bkmrk_pml_drain_message_ref_t * drain_msg_ref = NULL;
5026 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
5027 opal_list_item_t* item = NULL, *d_item = NULL, *d_next = NULL, *c_item = NULL, *c_next = NULL;
5028 bool prev_stall = false;
5029
5030
5031
5032 OPAL_OUTPUT_VERBOSE((5, mca_crcp_bkmrk_component.super.output_handle,
5033 "crcp:bkmrk: %s Waiting on %d messages to drain\n",
5034 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5035 (int)quiesce_request_count));
5036
5037
5038
5039
5040
5041 prev_stall = opal_cr_stall_check;
5042 opal_cr_stall_check = true;
5043 if( OMPI_SUCCESS != (ret = coord_request_wait_all(quiesce_request_count,
5044 quiesce_requests,
5045 quiesce_statuses) ) ) {
5046 exit_status = ret;
5047 goto cleanup;
5048 }
5049 opal_cr_stall_check = prev_stall;
5050
5051
5052
5053
5054
5055
5056
5057
5058 for(item = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
5059 item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
5060 item = opal_list_get_next(item) ) {
5061 cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
5062
5063 if( OPAL_EQUAL == ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
5064 (OMPI_PROC_MY_NAME),
5065 &(cur_peer_ref->proc_name)) ) {
5066 continue;
5067 }
5068
5069
5070
5071
5072 if( cur_peer_ref->ack_required ) {
5073 opal_buffer_t *buffer = NULL;
5074 size_t response = 1;
5075
5076 OPAL_OUTPUT_VERBOSE((5, mca_crcp_bkmrk_component.super.output_handle,
5077 "crcp:bkmrk: %s --> %s Send ACKs to Peer\n",
5078 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5079 OMPI_NAME_PRINT(&(cur_peer_ref->proc_name)) ));
5080
5081
5082 if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
5083 exit_status = OMPI_ERROR;
5084 goto cleanup;
5085 }
5086
5087 PACK_BUFFER(buffer, response, 1, OPAL_SIZE, "");
5088
5089
5090 if (ORTE_SUCCESS != (ret = ompi_rte_send_buffer_nb(&(cur_peer_ref->proc_name),
5091 buffer, OMPI_CRCP_COORD_BOOKMARK_TAG,
5092 orte_rml_send_callback, NULL))) {
5093 exit_status = ret;
5094 goto cleanup;
5095 }
5096 if( NULL != buffer) {
5097 OBJ_RELEASE(buffer);
5098 buffer = NULL;
5099 }
5100 }
5101
5102 cur_peer_ref->ack_required = false;
5103
5104
5105
5106
5107 for(d_item = opal_list_get_first(&(cur_peer_ref->drained_list));
5108 d_item != opal_list_get_end(&(cur_peer_ref->drained_list));
5109 d_item = d_next ) {
5110 drain_msg_ref = (ompi_crcp_bkmrk_pml_drain_message_ref_t*)d_item;
5111 d_next = opal_list_get_next(d_item);
5112
5113 for(c_item = opal_list_get_first(&(drain_msg_ref->msg_contents));
5114 c_item != opal_list_get_end(&(drain_msg_ref->msg_contents));
5115 c_item = c_next ) {
5116 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)c_item;
5117 c_next = opal_list_get_next(c_item);
5118
5119
5120
5121
5122
5123 if( content_ref->done ) {
5124 continue;
5125 }
5126
5127 if( content_ref->already_posted ) {
5128 drain_message_remove(cur_peer_ref, drain_msg_ref, content_ref);
5129
5130
5131 drain_msg_ref->active--;
5132 drain_msg_ref->already_posted--;
5133 } else {
5134 content_ref->done = true;
5135 content_ref->active = false;
5136
5137
5138 drain_msg_ref->done++;
5139 drain_msg_ref->active--;
5140 }
5141 }
5142 }
5143 }
5144
5145 cleanup:
5146 if( NULL != quiesce_requests ) {
5147 free(quiesce_requests);
5148 quiesce_requests = NULL;
5149 }
5150
5151 if( NULL != quiesce_statuses ) {
5152 free(quiesce_statuses);
5153 quiesce_statuses = NULL;
5154 }
5155
5156 quiesce_request_count = 0;
5157
5158 return exit_status;
5159 }
5160
5161 static int coord_request_wait_all( size_t count,
5162 ompi_request_t ** requests,
5163 ompi_status_public_t ** statuses )
5164 {
5165 int exit_status = OMPI_SUCCESS;
5166 ompi_status_public_t * status;
5167 ompi_request_t *req;
5168 size_t i;
5169
5170
5171
5172
5173 for( i = 0; i < count; ++i) {
5174 req = requests[i];
5175 status = statuses[i];
5176
5177 coord_request_wait(req, status);
5178
5179 OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
5180 "crcp:bkmrk: %s Request Wait: Done with idx %d of %d\n",
5181 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5182 (int)i, (int)count));
5183 }
5184
5185 return exit_status;
5186 }
5187
5188 static int coord_request_wait( ompi_request_t * req,
5189 ompi_status_public_t * status)
5190 {
5191 ompi_request_wait_completion(req);
5192
5193 if( MPI_STATUS_IGNORE != status ) {
5194 status->MPI_TAG = req->req_status.MPI_TAG;
5195 status->MPI_SOURCE = req->req_status.MPI_SOURCE;
5196 status->_cancelled = req->req_status._cancelled;
5197 status->_ucount = req->req_status._ucount;
5198 }
5199
5200 return OMPI_SUCCESS;
5201 }
5202
5203 static int wait_quiesce_drain_ack(void)
5204 {
5205 opal_list_item_t* item = NULL;
5206 opal_list_item_t* next = NULL;
5207 ompi_crcp_bkmrk_pml_drain_message_ack_ref_t * drain_msg_ack;
5208 int num_outstanding;
5209
5210
5211
5212 num_outstanding = opal_list_get_size(&drained_msg_ack_list);
5213 if(num_outstanding <= 0) {
5214
5215 return OMPI_SUCCESS;
5216 }
5217
5218 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
5219 "crcp:bkmrk: %s Waiting on %d Drain ACK messages\n",
5220 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5221 num_outstanding));
5222
5223 while(0 < num_outstanding) {
5224 for(item = opal_list_get_first(&drained_msg_ack_list);
5225 item != opal_list_get_end(&drained_msg_ack_list);
5226 item = next) {
5227 drain_msg_ack = (ompi_crcp_bkmrk_pml_drain_message_ack_ref_t*)item;
5228 next = opal_list_get_next(item);
5229
5230 if(drain_msg_ack->complete) {
5231 num_outstanding--;
5232 opal_list_remove_item(&drained_msg_ack_list, &(drain_msg_ack->super) );
5233 HOKE_DRAIN_ACK_MSG_REF_RETURN(item);
5234 break;
5235 }
5236 }
5237
5238 opal_event_loop(opal_sync_event_base, OPAL_EVLOOP_NONBLOCK);
5239 }
5240
5241
5242 while (NULL != (item = opal_list_remove_first(&drained_msg_ack_list) ) ) {
5243 HOKE_DRAIN_ACK_MSG_REF_RETURN(item);
5244 }
5245
5246 return OMPI_SUCCESS;
5247 }
5248
5249
5250 static int send_bookmarks(int peer_idx)
5251 {
5252 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref;
5253 ompi_process_name_t peer_name;
5254 opal_buffer_t *buffer = NULL;
5255 int exit_status = OMPI_SUCCESS;
5256 int ret;
5257
5258 START_TIMER(CRCP_TIMER_CKPT_EX_PEER_S);
5259
5260
5261
5262 peer_name.jobid = OMPI_PROC_MY_NAME->jobid;
5263 peer_name.vpid = peer_idx;
5264
5265 if( NULL == (peer_ref = find_peer(peer_name))) {
5266 opal_output(mca_crcp_bkmrk_component.super.output_handle,
5267 "crcp:bkmrk: send_bookmarks: Error: Could not find peer indexed %d\n",
5268 peer_idx);
5269 exit_status = OMPI_ERROR;
5270 goto cleanup;
5271 }
5272
5273 OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
5274 "crcp:bkmrk: %s --> %s Sending bookmark (S[%6d] R[%6d])\n",
5275 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5276 OMPI_NAME_PRINT(&peer_name),
5277 peer_ref->total_msgs_sent,
5278 peer_ref->total_msgs_recvd));
5279
5280
5281
5282
5283 if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
5284 exit_status = OMPI_ERROR;
5285 goto cleanup;
5286 }
5287
5288 PACK_BUFFER(buffer, (peer_ref->total_msgs_sent), 1, OPAL_UINT32,
5289 "crcp:bkmrk: send_bookmarks: Unable to pack total_msgs_sent");
5290 PACK_BUFFER(buffer, (peer_ref->total_msgs_recvd), 1, OPAL_UINT32,
5291 "crcp:bkmrk: send_bookmarks: Unable to pack total_msgs_recvd");
5292
5293 if (ORTE_SUCCESS != (ret = ompi_rte_send_buffer_nb(&peer_name, buffer,
5294 OMPI_CRCP_COORD_BOOKMARK_TAG,
5295 orte_rml_send_callback, NULL))) {
5296 opal_output(mca_crcp_bkmrk_component.super.output_handle,
5297 "crcp:bkmrk: send_bookmarks: Failed to send bookmark to peer %s: Return %d\n",
5298 OMPI_NAME_PRINT(&peer_name),
5299 ret);
5300 exit_status = ret;
5301 goto cleanup;
5302 }
5303
5304 cleanup:
5305 if(NULL != buffer) {
5306 OBJ_RELEASE(buffer);
5307 }
5308
5309 END_TIMER(CRCP_TIMER_CKPT_EX_PEER_S);
5310 DISPLAY_INDV_TIMER(CRCP_TIMER_CKPT_EX_PEER_S, peer_idx, 1);
5311
5312 return exit_status;
5313 }
5314
5315
5316 static int recv_bookmarks(int peer_idx)
5317 {
5318 ompi_process_name_t peer_name;
5319
5320 START_TIMER(CRCP_TIMER_CKPT_EX_PEER_R);
5321
5322 peer_name.jobid = OMPI_PROC_MY_NAME->jobid;
5323 peer_name.vpid = peer_idx;
5324
5325 ompi_rte_recv_buffer_nb(&peer_name, OMPI_CRCP_COORD_BOOKMARK_TAG,
5326 0, recv_bookmarks_cbfunc, NULL);
5327
5328 ++total_recv_bookmarks;
5329
5330 END_TIMER(CRCP_TIMER_CKPT_EX_PEER_R);
5331
5332
5333
5334 return OMPI_SUCCESS;
5335 }
5336
5337 static void recv_bookmarks_cbfunc(int status,
5338 ompi_process_name_t* sender,
5339 opal_buffer_t *buffer,
5340 ompi_rml_tag_t tag,
5341 void* cbdata)
5342 {
5343 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref;
5344 int exit_status = OMPI_SUCCESS;
5345 int ret, tmp_int;
5346 ompi_vpid_t peer_idx;
5347
5348 peer_idx = sender->vpid;
5349
5350
5351
5352
5353 if( NULL == (peer_ref = find_peer(*sender))) {
5354 opal_output(mca_crcp_bkmrk_component.super.output_handle,
5355 "crcp:bkmrk: recv_bookmarks: Could not find peer indexed %d\n",
5356 peer_idx);
5357 exit_status = OMPI_ERROR;
5358 goto cleanup;
5359 }
5360
5361 UNPACK_BUFFER(buffer, tmp_int, 1, OPAL_UINT32,
5362 "crcp:bkmrk: recv_bookmarks: Unable to unpack total_msgs_sent");
5363 peer_ref->matched_msgs_sent = tmp_int;
5364
5365 UNPACK_BUFFER(buffer, tmp_int, 1, OPAL_UINT32,
5366 "crcp:bkmrk: recv_bookmarks: Unable to unpack total_msgs_recvd");
5367 peer_ref->matched_msgs_recvd = tmp_int;
5368
5369 OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
5370 "crcp:bkmrk: %s <-- %s Received bookmark (S[%6d] R[%6d]) vs. (S[%6d] R[%6d]) (%d)\n",
5371 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5372 OMPI_NAME_PRINT(sender),
5373 peer_ref->matched_msgs_sent,
5374 peer_ref->matched_msgs_recvd,
5375 peer_ref->total_msgs_sent,
5376 peer_ref->total_msgs_recvd,
5377 exit_status));
5378
5379 cleanup:
5380 --total_recv_bookmarks;
5381
5382 return;
5383 }
5384
5385 static int send_msg_details(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
5386 int total_sent, int total_matched)
5387 {
5388 int ret, exit_status = OMPI_SUCCESS;
5389 ompi_crcp_bkmrk_pml_drain_message_ack_ref_t * d_msg_ack = NULL;
5390 opal_list_t *search_list = NULL;
5391 opal_list_item_t* msg_item = NULL;
5392 bool finished;
5393 int pass_num = 1;
5394 int need, found;
5395 int total_details_sent = 0;
5396 int num_matches = 0;
5397 int p_total_found = 0;
5398
5399 need = total_sent - total_matched;
5400 found = 0;
5401 finished = false;
5402 assert(need > 0);
5403
5404 START_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_S);
5405
5406
5407
5408
5409 search_list = &(peer_ref->send_list);
5410 pass_num = 1;
5411
5412 SEARCH_AGAIN:
5413 for(msg_item = opal_list_get_last(search_list);
5414 msg_item != opal_list_get_begin(search_list);
5415 msg_item = opal_list_get_prev(msg_item) ) {
5416 ompi_crcp_bkmrk_pml_traffic_message_ref_t * msg_ref;
5417 msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)msg_item;
5418
5419 num_matches = 0;
5420
5421 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
5422 "crcp:bkmrk: send_msg_details: Stage 1: [M/A/D/AD] [%3d/%3d/%3d/%3d] (%s)",
5423 msg_ref->matched, msg_ref->active, msg_ref->done, msg_ref->active_drain,
5424 (msg_ref->msg_type == COORD_MSG_TYPE_B_SEND ? " Send" :
5425 (msg_ref->msg_type == COORD_MSG_TYPE_I_SEND ? "iSend" : "pSend"))
5426 ));
5427
5428
5429 if( 0 >= (msg_ref->active + msg_ref->done) ) {
5430 continue;
5431 }
5432
5433
5434 if(OMPI_SUCCESS != (ret = do_send_msg_detail(peer_ref, msg_ref, &num_matches, &p_total_found, &finished)) ) {
5435 opal_output(mca_crcp_bkmrk_component.super.output_handle,
5436 "crcp:bkmrk: send_msg_details: %s --> %s Failed to send message details to peer. Return %d\n",
5437 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5438 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
5439 ret);
5440 }
5441
5442 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
5443 "crcp:bkmrk: send_msg_details: Stage 2: [M/A/D/AD] [%3d/%3d/%3d/%3d] (%s) [%3d, %3d, %s] [%3d, %3d]",
5444 msg_ref->matched, msg_ref->active, msg_ref->done, msg_ref->active_drain,
5445 (msg_ref->msg_type == COORD_MSG_TYPE_B_SEND ? " Send" :
5446 (msg_ref->msg_type == COORD_MSG_TYPE_I_SEND ? "iSend" : "pSend")),
5447 num_matches, p_total_found, (finished ? "T" : "F"),
5448 total_details_sent, found
5449 ));
5450
5451 total_details_sent += num_matches;
5452 if(0 < num_matches ) {
5453 found += num_matches;
5454 }
5455 if(finished) {
5456 goto ALL_SENT;
5457 }
5458 }
5459
5460
5461
5462
5463
5464 if( 1 == pass_num ) {
5465 search_list = &(peer_ref->isend_list);
5466 pass_num = 2;
5467 goto SEARCH_AGAIN;
5468 }
5469
5470
5471
5472
5473
5474 if( 2 == pass_num ) {
5475 search_list = &(peer_ref->send_init_list);
5476 pass_num = 3;
5477 goto SEARCH_AGAIN;
5478 }
5479
5480 ALL_SENT:
5481 if( need > found ) {
5482 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
5483 "crcp:bkmrk: send_msg_details: ERROR: ****** Need (%d) vs Found (%d)",
5484 need, found));
5485 }
5486 assert(need <= found);
5487
5488
5489
5490
5491
5492 HOKE_DRAIN_ACK_MSG_REF_ALLOC(d_msg_ack);
5493 d_msg_ack->peer.jobid = peer_ref->proc_name.jobid;
5494 d_msg_ack->peer.vpid = peer_ref->proc_name.vpid;
5495
5496 d_msg_ack->complete = false;
5497 opal_list_append(&drained_msg_ack_list, &(d_msg_ack->super));
5498 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
5499 "crcp:bkmrk: %s <-> %s Message Inflight! Will wait on ACK from this peer.\n",
5500 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5501 OMPI_NAME_PRINT(&(peer_ref->proc_name))));
5502
5503 END_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_S);
5504 DISPLAY_INDV_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_S, peer_ref->proc_name.vpid, total_details_sent);
5505
5506 return exit_status;
5507 }
5508
5509 static int do_send_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
5510 ompi_crcp_bkmrk_pml_traffic_message_ref_t*msg_ref,
5511 int *num_matches,
5512 int *total_found,
5513 bool *finished)
5514 {
5515 int ret, exit_status = OMPI_SUCCESS;
5516 opal_buffer_t *buffer = NULL;
5517 orte_rml_recv_cb_t *rb = NULL;
5518 int32_t recv_response = RECV_MATCH_RESP_ERROR;
5519 int32_t num_resolv = -1;
5520 int32_t p_total_found = -1;
5521 int comm_my_rank = -1;
5522 int total_sent;
5523
5524 *num_matches = 0;
5525 *total_found = 0;;
5526 *finished = false;
5527
5528 if( NULL != buffer) {
5529 OBJ_RELEASE(buffer);
5530 buffer = NULL;
5531 }
5532
5533 if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
5534 exit_status = OMPI_ERROR;
5535 goto cleanup;
5536 }
5537
5538
5539
5540
5541
5542
5543 comm_my_rank = ompi_comm_rank(msg_ref->comm);
5544
5545 PACK_BUFFER(buffer, msg_ref->comm->c_contextid, 1, OPAL_UINT32,
5546 "crcp:bkmrk: send_msg_details: Unable to pack communicator ID");
5547 PACK_BUFFER(buffer, comm_my_rank, 1, OPAL_INT,
5548 "crcp:bkmrk: send_msg_details: Unable to pack comm rank ID");
5549
5550
5551
5552
5553
5554
5555
5556 PACK_BUFFER(buffer, msg_ref->tag, 1, OPAL_INT,
5557 "crcp:bkmrk: send_msg_details: Unable to pack tag");
5558 PACK_BUFFER(buffer, msg_ref->count, 1, OPAL_SIZE,
5559 "crcp:bkmrk: send_msg_details: Unable to pack count");
5560 PACK_BUFFER(buffer, msg_ref->ddt_size, 1, OPAL_SIZE,
5561 "crcp:bkmrk: send_msg_details: Unable to pack datatype size");
5562
5563
5564
5565
5566
5567
5568 total_sent = msg_ref->done + msg_ref->active;
5569 PACK_BUFFER(buffer, total_sent, 1, OPAL_INT,
5570 "crcp:bkmrk: send_msg_details: Unable to pack done+active count");
5571
5572
5573
5574
5575 if (ORTE_SUCCESS != (ret = ompi_rte_send_buffer_nb(&peer_ref->proc_name, buffer,
5576 OMPI_CRCP_COORD_BOOKMARK_TAG,
5577 orte_rml_send_callback, NULL))) {
5578 opal_output(mca_crcp_bkmrk_component.super.output_handle,
5579 "crcp:bkmrk: do_send_msg_detail: Unable to send message details to peer %s: Return %d\n",
5580 OMPI_NAME_PRINT(&peer_ref->proc_name),
5581 ret);
5582
5583 exit_status = OMPI_ERROR;
5584 goto cleanup;
5585 }
5586
5587
5588
5589
5590 rb = OBJ_NEW(orte_rml_recv_cb_t);
5591 rb->active = true;
5592 ompi_rte_recv_buffer_nb(&peer_ref->proc_name, OMPI_CRCP_COORD_BOOKMARK_TAG, 0,
5593 orte_rml_recv_callback, rb);
5594 ORTE_WAIT_FOR_COMPLETION(rb->active);
5595
5596 UNPACK_BUFFER(&rb->data, recv_response, 1, OPAL_UINT32,
5597 "crcp:bkmrk: send_msg_details: Failed to unpack the ACK from peer buffer.");
5598 UNPACK_BUFFER(&rb->data, num_resolv, 1, OPAL_UINT32,
5599 "crcp:bkmrk: send_msg_details: Failed to unpack the num_resolv from peer buffer.");
5600 UNPACK_BUFFER(&rb->data, p_total_found, 1, OPAL_UINT32,
5601 "crcp:bkmrk: send_msg_details: Failed to unpack the total_found from peer buffer.");
5602
5603 OBJ_RELEASE(rb);
5604
5605 msg_ref->matched += num_resolv;
5606 *num_matches = num_resolv;
5607 *total_found = p_total_found;
5608
5609
5610
5611
5612 if( RECV_MATCH_RESP_DONE == recv_response ) {
5613 *finished = true;
5614 }
5615 else if( RECV_MATCH_RESP_MORE == recv_response ) {
5616 *finished = false;
5617 }
5618
5619 OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
5620 "**************************\n"));
5621 OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
5622 "send_msg_details: %d, %d = %s [%d / %d]\n",
5623 *num_matches, *total_found,
5624 (*finished ? "Done" : "Continue..."),
5625 msg_ref->done, msg_ref->active));
5626 TRAFFIC_MSG_DUMP_MSG_INDV(15, (msg_ref, "", false));
5627 OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
5628 "**************************\n"));
5629
5630 cleanup:
5631 return exit_status;
5632 }
5633
5634
5635
5636
5637
5638 static int recv_msg_details(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
5639 int total_recv, int total_matched)
5640 {
5641 int need, found;
5642 int response;
5643 int exit_status = OMPI_SUCCESS;
5644 int ret;
5645 int total_details_recv = 0;
5646
5647 need = total_recv - total_matched;
5648 found = 0;
5649
5650 assert( need > 0);
5651
5652 START_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_R);
5653
5654
5655
5656
5657 while(need > found) {
5658 uint32_t p_comm_id;
5659 size_t p_count;
5660 size_t p_datatype_size;
5661 int p_rank;
5662 int p_tag;
5663 int p_num_sent;
5664 int num_resolved = 0;
5665
5666
5667
5668
5669 if( OMPI_SUCCESS != (ret = do_recv_msg_detail(peer_ref,
5670 &p_rank, &p_comm_id,
5671 &p_tag, &p_count,
5672 &p_datatype_size,
5673 &p_num_sent)) ) {
5674 opal_output(mca_crcp_bkmrk_component.super.output_handle,
5675 "crcp:bkmrk: recv_msg_details: %s <-- %s "
5676 "Failed to receive message detail from peer. Return %d\n",
5677 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5678 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
5679 ret);
5680 exit_status = ret;
5681 goto cleanup;
5682 }
5683
5684
5685
5686
5687
5688 num_resolved = 0;
5689 if( OMPI_SUCCESS != (ret = do_recv_msg_detail_check_drain(peer_ref,
5690 p_rank, p_comm_id,
5691 p_tag, p_count,
5692 p_datatype_size,
5693 p_num_sent,
5694 &num_resolved)) ) {
5695 opal_output(mca_crcp_bkmrk_component.super.output_handle,
5696 "crcp:bkmrk: recv_msg_details: %s <-- %s "
5697 "Failed to check message detail from peer. Return %d\n",
5698 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5699 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
5700 ret);
5701 exit_status = ret;
5702 goto cleanup;
5703 }
5704
5705 found += num_resolved;
5706 total_details_recv += num_resolved;
5707
5708 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
5709 "crcp:bkmrk: %s <-- %s Recv Detail: Stage --: [%3d / %3d] [%3d, %3d, %s]",
5710 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5711 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
5712 need, found,
5713 num_resolved, total_details_recv,
5714 ( need <= found ? "T" : "F") ));
5715
5716
5717 if( need <= found ) {
5718 response = RECV_MATCH_RESP_DONE;
5719 }
5720
5721 else {
5722 response = RECV_MATCH_RESP_MORE;
5723 }
5724
5725 if(OMPI_SUCCESS != (ret = do_recv_msg_detail_resp(peer_ref, response, num_resolved, found))) {
5726 opal_output(mca_crcp_bkmrk_component.super.output_handle,
5727 "crcp:bkmrk: recv_msg_details: %s <-- %s Failed to respond to peer. Return %d\n",
5728 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5729 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
5730 ret);
5731 exit_status = ret;
5732 goto cleanup;
5733 }
5734 }
5735
5736 cleanup:
5737
5738 END_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_R);
5739 DISPLAY_INDV_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_R, peer_ref->proc_name.vpid, total_details_recv);
5740
5741 return exit_status;
5742 }
5743
5744 static int do_recv_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
5745 int *rank, uint32_t *comm_id, int *tag,
5746 size_t *count, size_t *datatype_size,
5747 int *p_num_sent)
5748 {
5749 orte_rml_recv_cb_t *rb = NULL;
5750 int exit_status = OMPI_SUCCESS;
5751 int ret;
5752
5753
5754
5755
5756 rb = OBJ_NEW(orte_rml_recv_cb_t);
5757 rb->active = true;
5758 ompi_rte_recv_buffer_nb(&peer_ref->proc_name, OMPI_CRCP_COORD_BOOKMARK_TAG, 0, orte_rml_recv_callback, rb);
5759 ORTE_WAIT_FOR_COMPLETION(rb->active);
5760
5761
5762 UNPACK_BUFFER(&rb->data, (*comm_id), 1, OPAL_UINT32,
5763 "crcp:bkmrk: recv_msg_details: Failed to unpack the communicator ID");
5764 UNPACK_BUFFER(&rb->data, (*rank), 1, OPAL_INT,
5765 "crcp:bkmrk: recv_msg_details: Failed to unpack the communicator rank ID");
5766
5767
5768 UNPACK_BUFFER(&rb->data, (*tag), 1, OPAL_INT,
5769 "crcp:bkmrk: recv_msg_details: Failed to unpack the tag");
5770 UNPACK_BUFFER(&rb->data, (*count), 1, OPAL_SIZE,
5771 "crcp:bkmrk: recv_msg_details: Failed to unpack the count");
5772 UNPACK_BUFFER(&rb->data, (*datatype_size), 1, OPAL_SIZE,
5773 "crcp:bkmrk: recv_msg_details: Failed to unpack the datatype size");
5774
5775
5776 UNPACK_BUFFER(&rb->data, (*p_num_sent), 1, OPAL_INT,
5777 "crcp:bkmrk: recv_msg_details: Failed to unpack the sent count");
5778
5779 cleanup:
5780 OBJ_RELEASE(rb);
5781 return exit_status;
5782 }
5783
/*
 * Reconcile a peer's report that it sent 'p_num_sent' messages with the given
 * signature (rank, comm_id, tag, count, datatype_size) against the receive
 * references found for that signature, and hand whatever is not accounted for
 * to traffic_message_create_drain_message().
 *
 * peer_ref      : peer being reconciled; its total_msgs_recvd counter is
 *                 advanced by every 'num_posted' value reported back.
 * p_num_sent    : number of messages the peer reports for this signature.
 * num_resolved  : (out) reset to 0, then incremented by each 'num_posted'
 *                 value reported by traffic_message_create_drain_message().
 *
 * Returns OMPI_SUCCESS, the error code of traffic_message_find_recv(), or
 * OMPI_ERROR if the unresolved count is found to be negative after Stage 2.
 *
 * Side effects visible here: the 'matched' counters of the located message
 * references are advanced, and 'stall_for_completion' (not declared in this
 * function) may be set to true.
 */
static int do_recv_msg_detail_check_drain(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                                    int rank, uint32_t comm_id, int tag,
                                    size_t count, size_t datatype_size,
                                    int p_num_sent,
                                    int *num_resolved)
{
    int ret, exit_status = OMPI_SUCCESS;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_tmp_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_recv_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_irecv_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_precv_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_unknown_recv_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_unknown_precv_msg_ref = NULL;
    /* Messages the peer reported that have not yet been accounted for */
    int num_left_unresolved = 0;
    /* Messages accounted for in Stage 2 via the 'active' counters */
    int num_still_active = 0;
    /* Count reported back by each traffic_message_create_drain_message() call */
    int num_posted = 0;

    *num_resolved = 0;
    num_left_unresolved = p_num_sent;

    OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s <-- %s "
                         "Stage 0: Ck.Drain: [TR %3d/MS %3d] sent %4d, unres %4d, res %4d",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                         OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                         peer_ref->total_msgs_recvd,
                         peer_ref->matched_msgs_sent,
                         p_num_sent,
                         num_left_unresolved,
                         *num_resolved));
    TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "Recv Check...", true));

    /*
     * Look up the receive references matching this signature. Any of the
     * five output pointers may remain NULL; every use below is NULL-guarded.
     */
    ret = traffic_message_find_recv(peer_ref,
                                    rank, comm_id, tag, count, datatype_size,
                                    &posted_recv_msg_ref,
                                    &posted_irecv_msg_ref,
                                    &posted_precv_msg_ref,
                                    &posted_unknown_recv_msg_ref,
                                    &posted_unknown_precv_msg_ref);
    if( OMPI_SUCCESS != ret) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: recv_msg_detail_check: %s -- %s "
                    "Failed to determine if we have received this message. Return %d\n",
                    OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                    OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                    ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Stage 1: for every reference that was found, add its 'done' counter to
     * its 'matched' counter and subtract the same amount from the number of
     * messages still unresolved.
     */
    if( NULL != posted_recv_msg_ref ) {
        posted_recv_msg_ref->matched += posted_recv_msg_ref->done;
        num_left_unresolved          -= posted_recv_msg_ref->done;
        TRAFFIC_MSG_DUMP_MSG_INDV(11, (posted_recv_msg_ref, "Ck.  Recv", true));
    }
    if( NULL != posted_irecv_msg_ref ) {
        posted_irecv_msg_ref->matched += posted_irecv_msg_ref->done;
        num_left_unresolved           -= posted_irecv_msg_ref->done;
        TRAFFIC_MSG_DUMP_MSG_INDV(11, (posted_irecv_msg_ref, "Ck. iRecv", true));
    }
    if( NULL != posted_precv_msg_ref ) {
        posted_precv_msg_ref->matched += posted_precv_msg_ref->done;
        num_left_unresolved           -= posted_precv_msg_ref->done;
        TRAFFIC_MSG_DUMP_MSG_INDV(11, (posted_precv_msg_ref, "Ck. pRecv", true));
    }
    if( NULL != posted_unknown_recv_msg_ref ) {
        posted_unknown_recv_msg_ref->matched += posted_unknown_recv_msg_ref->done;
        num_left_unresolved                  -= posted_unknown_recv_msg_ref->done;
        TRAFFIC_MSG_DUMP_MSG_INDV(11, (posted_unknown_recv_msg_ref, "Ck. uRecv", true));
    }
    if( NULL != posted_unknown_precv_msg_ref ) {
        posted_unknown_precv_msg_ref->matched += posted_unknown_precv_msg_ref->done;
        num_left_unresolved                   -= posted_unknown_precv_msg_ref->done;
        TRAFFIC_MSG_DUMP_MSG_INDV(11, (posted_unknown_precv_msg_ref, "Ck. upRecv", true));
    }

    OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s <-- %s "
                         "Stage 1: Ck.Drain: [TR %3d/MS %3d] sent %4d, unres %4d, res %4d",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                         OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                         peer_ref->total_msgs_recvd,
                         peer_ref->matched_msgs_sent,
                         p_num_sent,
                         num_left_unresolved,
                         *num_resolved));

    /*
     * Nothing left to account for: done. Note that a negative value also
     * exits here with OMPI_SUCCESS; the explicit '< 0' check only happens
     * after Stage 2.
     */
    if( num_left_unresolved <= 0 ) {
        goto cleanup;
    }

    /*
     * Stage 2: account for the remaining messages using the 'active'
     * counters, visiting the references in a fixed order (recv, irecv,
     * precv, unknown recv, unknown precv). Each reference absorbs at most
     * num_left_unresolved, so the count is clamped at zero as long as the
     * 'active' counters are non-negative. num_left_unresolved is known to
     * be > 0 on entry, which is why the first test has no '> 0' guard.
     */
    if( NULL != posted_recv_msg_ref ) {
        if( posted_recv_msg_ref->active > num_left_unresolved ) {
            posted_recv_msg_ref->matched += num_left_unresolved;
            num_still_active             += num_left_unresolved;
            num_left_unresolved           = 0;
        } else {
            posted_recv_msg_ref->matched += posted_recv_msg_ref->active;
            num_still_active             += posted_recv_msg_ref->active;
            num_left_unresolved          -= posted_recv_msg_ref->active;
        }
    }
    if( num_left_unresolved > 0 && NULL != posted_irecv_msg_ref ) {
        if( posted_irecv_msg_ref->active > num_left_unresolved ) {
            posted_irecv_msg_ref->matched += num_left_unresolved;
            num_still_active              += num_left_unresolved;
            num_left_unresolved            = 0;
        } else {
            posted_irecv_msg_ref->matched += posted_irecv_msg_ref->active;
            num_still_active              += posted_irecv_msg_ref->active;
            num_left_unresolved           -= posted_irecv_msg_ref->active;
        }
    }
    if( num_left_unresolved > 0 && NULL != posted_precv_msg_ref ) {
        if( posted_precv_msg_ref->active > num_left_unresolved ) {
            posted_precv_msg_ref->matched += num_left_unresolved;
            num_still_active              += num_left_unresolved;
            num_left_unresolved            = 0;
        } else {
            posted_precv_msg_ref->matched += posted_precv_msg_ref->active;
            num_still_active              += posted_precv_msg_ref->active;
            num_left_unresolved           -= posted_precv_msg_ref->active;
        }
    }
    if( num_left_unresolved > 0 && NULL != posted_unknown_recv_msg_ref ) {
        if( posted_unknown_recv_msg_ref->active > num_left_unresolved ) {
            posted_unknown_recv_msg_ref->matched += num_left_unresolved;
            num_still_active                     += num_left_unresolved;
            num_left_unresolved                   = 0;
        } else {
            posted_unknown_recv_msg_ref->matched += posted_unknown_recv_msg_ref->active;
            num_still_active                     += posted_unknown_recv_msg_ref->active;
            num_left_unresolved                  -= posted_unknown_recv_msg_ref->active;
        }
    }
    if( num_left_unresolved > 0 && NULL != posted_unknown_precv_msg_ref ) {
        if( posted_unknown_precv_msg_ref->active > num_left_unresolved ) {
            posted_unknown_precv_msg_ref->matched += num_left_unresolved;
            num_still_active                      += num_left_unresolved;
            num_left_unresolved                    = 0;
        } else {
            posted_unknown_precv_msg_ref->matched += posted_unknown_precv_msg_ref->active;
            num_still_active                      += posted_unknown_precv_msg_ref->active;
            num_left_unresolved                   -= posted_unknown_precv_msg_ref->active;
        }
    }

    OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s <-- %s "
                         "Stage 2: Ck.Drain: [TR %3d/MS %3d] sent %4d, unres %4d, res %4d, active %4d",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                         OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                         peer_ref->total_msgs_recvd,
                         peer_ref->matched_msgs_sent,
                         p_num_sent,
                         num_left_unresolved,
                         *num_resolved,
                         num_still_active
                         ));

    /*
     * Defensive sanity check: a negative unresolved count is reported as an
     * internal error.
     */
    if(num_left_unresolved < 0 ) {
        ERROR_SHOULD_NEVER_HAPPEN_ARG("crcp:bkmrk: Ck.Drain: Unresolved (%3d) < 0", num_left_unresolved);
        exit_status = OMPI_ERROR;
        goto cleanup;
    }

    /*
     * Everything was accounted for and nothing was attributed to 'active'
     * counters: no drain entries are needed.
     */
    if( num_left_unresolved <= 0 &&
        num_still_active    <= 0) {
        goto cleanup;
    }

    /*
     * Stage 3: some of the reported messages were attributed to 'active'
     * counters in Stage 2.
     */
    if( num_still_active > 0 ) {
        /*
         * If the located recv reference is the message identified by
         * 'current_msg_id' and its type is COORD_MSG_TYPE_B_RECV, request a
         * stall via 'stall_for_completion'. Both 'current_msg_id' and
         * 'stall_for_completion' are defined outside this function.
         * NOTE(review): presumably this is the blocking receive the process
         * is currently inside -- confirm against where current_msg_id is set.
         */
        if( NULL != posted_recv_msg_ref ) {
            /* Is this the current blocking recv */
            if (current_msg_id  == posted_recv_msg_ref->msg_id &&
                COORD_MSG_TYPE_B_RECV == posted_recv_msg_ref->msg_type) {
                OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                                     "crcp:bkmrk: %s <-- %s "
                                     "Recv Check: Found a message that is 'active'! Prepare to STALL.\n",
                                     OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                                     OMPI_NAME_PRINT(&(peer_ref->proc_name)) ));
                stall_for_completion = true;
            }
            else {
                OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                                     "crcp:bkmrk: %s <-- %s "
                                     "Recv Check: Found a message that is 'active', but is not the current recv! "
                                     "No stall required [%3d, %3d, %3d, %3d].\n",
                                     OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                                     OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                                     (int)current_msg_id,
                                     (int)current_msg_type,
                                     (int)posted_recv_msg_ref->msg_id,
                                     (int)posted_recv_msg_ref->msg_type));
            }
        }

        /*
         * Hand each reference, in the same fixed order as Stage 2, to
         * traffic_message_create_drain_message() with its first argument
         * 'false'. After each call the reported 'num_posted' is moved from
         * num_still_active into *num_resolved and peer_ref->total_msgs_recvd.
         * The calls are made even when the reference pointer is NULL, and
         * their return values are not checked; 'num_posted' is read after
         * every call, so the callee is relied upon to set it each time.
         */
        traffic_message_create_drain_message(false, num_still_active,
                                             peer_ref,
                                             &posted_recv_msg_ref,
                                             &num_posted);
        num_still_active  -= num_posted;
        *num_resolved     += num_posted;
        peer_ref->total_msgs_recvd += num_posted;

        traffic_message_create_drain_message(false, num_still_active,
                                             peer_ref,
                                             &posted_irecv_msg_ref,
                                             &num_posted);
        num_still_active  -= num_posted;
        *num_resolved     += num_posted;
        peer_ref->total_msgs_recvd += num_posted;

        traffic_message_create_drain_message(false, num_still_active,
                                             peer_ref,
                                             &posted_precv_msg_ref,
                                             &num_posted);
        num_still_active  -= num_posted;
        *num_resolved     += num_posted;
        peer_ref->total_msgs_recvd += num_posted;

        traffic_message_create_drain_message(false, num_still_active,
                                             peer_ref,
                                             &posted_unknown_recv_msg_ref,
                                             &num_posted);
        num_still_active  -= num_posted;
        *num_resolved     += num_posted;
        peer_ref->total_msgs_recvd += num_posted;

        traffic_message_create_drain_message(false, num_still_active,
                                             peer_ref,
                                             &posted_unknown_precv_msg_ref,
                                             &num_posted);
        num_still_active  -= num_posted;
        *num_resolved     += num_posted;
        peer_ref->total_msgs_recvd += num_posted;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s <-- %s "
                         "Stage 3: Ck.Drain: [TR %3d/MS %3d] sent %4d, unres %4d, res %4d, active %4d",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                         OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                         peer_ref->total_msgs_recvd,
                         peer_ref->matched_msgs_sent,
                         p_num_sent,
                         num_left_unresolved,
                         *num_resolved,
                         num_still_active
                         ));

    /*
     * Stage 4: messages that no located reference could account for. Build
     * a temporary COORD_MSG_TYPE_I_RECV reference carrying this signature,
     * pass it to traffic_message_create_drain_message() with its first
     * argument 'true', then hand the temporary back through
     * HOKE_TRAFFIC_MSG_REF_RETURN.
     */
    if( num_left_unresolved > 0 ) {
        /* Create a stamp for the drained message */
        CREATE_NEW_MSG(posted_tmp_msg_ref, COORD_MSG_TYPE_I_RECV,
                       count, datatype_size, tag, rank,
                       ompi_comm_lookup(comm_id),
                       peer_ref->proc_name.jobid,
                       peer_ref->proc_name.vpid);

        traffic_message_create_drain_message(true, num_left_unresolved,
                                             peer_ref,
                                             &posted_tmp_msg_ref,
                                             &num_posted);
        num_left_unresolved  -= num_posted;
        *num_resolved        += num_posted;
        peer_ref->total_msgs_recvd += num_posted;

        HOKE_TRAFFIC_MSG_REF_RETURN(posted_tmp_msg_ref);
    }

    OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s <-- %s "
                         "Stage 4: Ck.Drain: [TR %3d/MS %3d] sent %4d, unres %4d, res %4d, active %4d",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                         OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                         peer_ref->total_msgs_recvd,
                         peer_ref->matched_msgs_sent,
                         p_num_sent,
                         num_left_unresolved,
                         *num_resolved,
                         num_still_active
                         ));

 cleanup:
    return exit_status;
}
6136
6137 static int do_recv_msg_detail_resp(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
6138 int resp, int num_resolv, int total_found)
6139 {
6140 opal_buffer_t * buffer = NULL;
6141 int exit_status = OMPI_SUCCESS;
6142 int ret;
6143
6144 if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
6145 exit_status = OMPI_ERROR;
6146 goto cleanup;
6147 }
6148
6149 PACK_BUFFER(buffer, resp, 1, OPAL_UINT32,
6150 "crcp:bkmrk: recv_msg_details: Unable to ask peer for more messages");
6151 PACK_BUFFER(buffer, num_resolv, 1, OPAL_UINT32,
6152 "crcp:bkmrk: recv_msg_details: Unable to ask peer for more messages");
6153 PACK_BUFFER(buffer, total_found, 1, OPAL_UINT32,
6154 "crcp:bkmrk: recv_msg_details: Unable to ask peer for more messages");
6155
6156 if (ORTE_SUCCESS != (ret = ompi_rte_send_buffer_nb(&peer_ref->proc_name, buffer,
6157 OMPI_CRCP_COORD_BOOKMARK_TAG,
6158 orte_rml_send_callback, NULL))) {
6159 opal_output(mca_crcp_bkmrk_component.super.output_handle,
6160 "crcp:bkmrk: recv_msg_detail_resp: Unable to send message detail response to peer %s: Return %d\n",
6161 OMPI_NAME_PRINT(&peer_ref->proc_name),
6162 ret);
6163 exit_status = OMPI_ERROR;
6164 goto cleanup;
6165 }
6166
6167 cleanup:
6168 if( NULL != buffer) {
6169 OBJ_RELEASE(buffer);
6170 buffer = NULL;
6171 }
6172
6173 return exit_status;
6174 }
6175
6176
6177
6178
6179
6180 static void start_time(int idx) {
6181 if(idx < CRCP_TIMER_MAX ) {
6182 timer_start[idx] = get_time();
6183 }
6184 }
6185
6186 static void end_time(int idx) {
6187 if(idx < CRCP_TIMER_MAX ) {
6188 timer_end[idx] = get_time();
6189 }
6190 }
6191
/*
 * Return the current time in seconds as a double.
 * Uses opal_timer_base_get_usec() when OPAL_TIMER_USEC_NATIVE is set,
 * otherwise gettimeofday(); both paths convert microseconds to seconds.
 */
static double get_time() {
#if OPAL_TIMER_USEC_NATIVE
    return (double)opal_timer_base_get_usec() / 1000000.0;
#else
    struct timeval now;
    double seconds;

    gettimeofday(&now, NULL);
    seconds = now.tv_sec;
    seconds += (double)now.tv_usec / 1000000.0;

    return seconds;
#endif
}
6206
6207 static void clear_timers(void) {
6208 int i;
6209 for(i = 0; i < CRCP_TIMER_MAX; ++i) {
6210 timer_start[i] = 0.0;
6211 timer_end[i] = 0.0;
6212 }
6213 }
6214
6215 static void display_all_timers(int state) {
6216 bool report_ready = false;
6217 double barrier_start, barrier_stop;
6218 int i, ret;
6219
6220 if( 0 != OMPI_PROC_MY_NAME->vpid ) {
6221 if( 2 > timing_enabled ) {
6222 return;
6223 }
6224 else if( 2 == timing_enabled ) {
6225 if( OPAL_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
6226 OPAL_ERROR_LOG(ret);
6227 }
6228 return;
6229 }
6230 }
6231
6232 for( i = 0; i < CRCP_TIMER_MAX; ++i) {
6233 if(timer_end[i] > 0.001) {
6234 report_ready = true;
6235 }
6236 }
6237 if( !report_ready ) {
6238 return;
6239 }
6240
6241 opal_output(0, "crcp:bkmrk: timing(%20s): ******************** Begin: [State = %12s]\n", "Summary", opal_crs_base_state_str(state));
6242 for( i = 0; i < CRCP_TIMER_MAX; ++i) {
6243 display_indv_timer_core(i, 0, 0, false);
6244 }
6245
6246 if( timing_enabled >= 2) {
6247 barrier_start = get_time();
6248 if( OPAL_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
6249 OPAL_ERROR_LOG(ret);
6250 }
6251 barrier_stop = get_time();
6252 opal_output(0,
6253 "crcp:bkmrk: timing(%20s): %20s = %10.2f s\n",
6254 "",
6255 "Group Barrier",
6256 (barrier_stop - barrier_start));
6257 }
6258
6259 opal_output(0, "crcp:bkmrk: timing(%20s): ******************** End: [State = %12s]\n", "Summary", opal_crs_base_state_str(state));
6260
6261 }
6262
/*
 * Display a single timer on behalf of a direct caller: forwards to
 * display_indv_timer_core() with the 'direct' flag set.
 */
static void display_indv_timer(int idx, int proc, int msgs)
{
    const bool called_directly = true;

    display_indv_timer_core(idx, proc, msgs, called_directly);
}
6266
6267 static void display_indv_timer_core(int idx, int proc, int msgs, bool direct) {
6268 double diff = timer_end[idx] - timer_start[idx];
6269 char * str = NULL;
6270
6271 if( 0 != OMPI_PROC_MY_NAME->vpid && timing_enabled < 3 ) {
6272 return;
6273 }
6274
6275
6276 if(timer_end[idx] <= 0.001) {
6277 return;
6278 }
6279
6280 switch(idx) {
6281 case CRCP_TIMER_CKPT_EX_PEER_S:
6282 case CRCP_TIMER_CKPT_EX_PEER_R:
6283 case CRCP_TIMER_CKPT_CHECK_PEER_S:
6284 case CRCP_TIMER_CKPT_CHECK_PEER_R:
6285
6286
6287 if( direct && timing_enabled >= 2) {
6288 opal_asprintf(&str, "Proc %2d, Msg %5d", proc, msgs);
6289 } else {
6290 return;
6291 }
6292 break;
6293 default:
6294 str = strdup("");
6295 break;
6296 }
6297
6298 opal_output(0,
6299 "crcp:bkmrk: timing(%20s): %20s = %10.2f s\n",
6300 str,
6301 timer_label[idx],
6302 diff);
6303 free(str);
6304 str = NULL;
6305 }
6306
6307
6308 #if OPAL_ENABLE_DEBUG
6309 static void traffic_message_dump_msg_content_indv(ompi_crcp_bkmrk_pml_message_content_ref_t * content_ref)
6310 {
6311 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
6312 "\t\t(%3d) Content: [A/D/P/Dr] [%s / %s / %s /%s]",
6313 (int)content_ref->msg_id,
6314 (content_ref->active ? "T" : "F"),
6315 (content_ref->done ? "T" : "F"),
6316 (content_ref->already_posted ? "T" : "F"),
6317 (content_ref->already_drained ? "T" : "F")));
6318 }
6319
6320 static void traffic_message_dump_msg_indv(ompi_crcp_bkmrk_pml_traffic_message_ref_t * msg_ref, char * msg, bool vshort)
6321 {
6322 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
6323 opal_list_item_t* cont_item = NULL;
6324 char * type_name = NULL;
6325
6326 switch(msg_ref->msg_type) {
6327 case COORD_MSG_TYPE_B_SEND:
6328 type_name = strdup(" Send");
6329 break;
6330 case COORD_MSG_TYPE_I_SEND:
6331 type_name = strdup("iSend");
6332 break;
6333 case COORD_MSG_TYPE_P_SEND:
6334 type_name = strdup("pSend");
6335 break;
6336 case COORD_MSG_TYPE_B_RECV:
6337 type_name = strdup(" Recv");
6338 break;
6339 case COORD_MSG_TYPE_I_RECV:
6340 type_name = strdup("iRecv");
6341 break;
6342 case COORD_MSG_TYPE_P_RECV:
6343 type_name = strdup("pRecv");
6344 break;
6345 default:
6346 type_name = strdup("Unknown");
6347 break;
6348 }
6349
6350 if( !vshort ) {
6351 opal_output(0, "\t%s %10s (%3d): [m %3d/d %3d/a %3d/ad %3d/p %3d] Contents %2d ... count %6d, tag %6d, rank %3d",
6352 type_name,
6353 msg,
6354 (int)msg_ref->msg_id,
6355 msg_ref->matched,
6356 msg_ref->done,
6357 msg_ref->active,
6358 msg_ref->active_drain,
6359 msg_ref->posted,
6360 (int)opal_list_get_size(&msg_ref->msg_contents),
6361 (int)msg_ref->count,
6362 msg_ref->tag,
6363 msg_ref->rank);
6364 } else {
6365 opal_output(0, "\t%s %10s (%3d): [m %3d/d %3d/a %3d/ad %3d/p %3d] Contents %2d ... count %6d",
6366 type_name,
6367 msg,
6368 (int)msg_ref->msg_id,
6369 msg_ref->matched,
6370 msg_ref->done,
6371 msg_ref->active,
6372 msg_ref->active_drain,
6373 msg_ref->posted,
6374 (int)opal_list_get_size(&msg_ref->msg_contents),
6375 (int)msg_ref->count);
6376 }
6377
6378 free(type_name);
6379
6380 for(cont_item = opal_list_get_first(&(msg_ref->msg_contents));
6381 cont_item != opal_list_get_end( &(msg_ref->msg_contents));
6382 cont_item = opal_list_get_next(cont_item) ) {
6383 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)cont_item;
6384
6385 traffic_message_dump_msg_content_indv(content_ref);
6386 }
6387 }
6388
6389 static void traffic_message_dump_drain_msg_indv(ompi_crcp_bkmrk_pml_drain_message_ref_t * msg_ref, char * msg, bool vshort)
6390 {
6391 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
6392 opal_list_item_t* cont_item = NULL;
6393 char * type_name = NULL;
6394
6395 switch(msg_ref->msg_type) {
6396 case COORD_MSG_TYPE_B_SEND:
6397 type_name = strdup(" Send");
6398 break;
6399 case COORD_MSG_TYPE_I_SEND:
6400 type_name = strdup("iSend");
6401 break;
6402 case COORD_MSG_TYPE_P_SEND:
6403 type_name = strdup("pSend");
6404 break;
6405 case COORD_MSG_TYPE_B_RECV:
6406 type_name = strdup(" Recv");
6407 break;
6408 case COORD_MSG_TYPE_I_RECV:
6409 type_name = strdup("iRecv");
6410 break;
6411 case COORD_MSG_TYPE_P_RECV:
6412 type_name = strdup("pRecv");
6413 break;
6414 default:
6415 type_name = strdup("Unknown");
6416 break;
6417 }
6418
6419 if( !vshort ) {
6420 opal_output(0, "\t%s %10s (%3d): [d %3d/a %3d] Contents %2d ... count %6d, tag %6d, rank %3d",
6421 type_name,
6422 msg,
6423 (int)msg_ref->msg_id,
6424 msg_ref->done,
6425 msg_ref->active,
6426 (int)opal_list_get_size(&msg_ref->msg_contents),
6427 (int)msg_ref->count,
6428 msg_ref->tag,
6429 msg_ref->rank);
6430 } else {
6431 opal_output(0, "\t%s %10s (%3d): [d %3d/a %3d] Contents %2d ... count %6d",
6432 type_name,
6433 msg,
6434 (int)msg_ref->msg_id,
6435 msg_ref->done,
6436 msg_ref->active,
6437 (int)opal_list_get_size(&msg_ref->msg_contents),
6438 (int)msg_ref->count);
6439 }
6440
6441 free(type_name);
6442
6443 for(cont_item = opal_list_get_first(&(msg_ref->msg_contents));
6444 cont_item != opal_list_get_end( &(msg_ref->msg_contents));
6445 cont_item = opal_list_get_next(cont_item) ) {
6446 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)cont_item;
6447
6448 traffic_message_dump_msg_content_indv(content_ref);
6449 }
6450 }
6451
6452 static void traffic_message_dump_msg_list(opal_list_t *msg_list, bool is_drain)
6453 {
6454 opal_list_item_t* item = NULL;
6455 ompi_crcp_bkmrk_pml_traffic_message_ref_t * msg_ref = NULL;
6456 ompi_crcp_bkmrk_pml_drain_message_ref_t * drain_msg_ref = NULL;
6457
6458 for(item = opal_list_get_last(msg_list);
6459 item != opal_list_get_begin(msg_list);
6460 item = opal_list_get_prev(item) ) {
6461 if( !is_drain ) {
6462 msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)item;
6463 traffic_message_dump_msg_indv(msg_ref, "", false);
6464 } else {
6465 drain_msg_ref = (ompi_crcp_bkmrk_pml_drain_message_ref_t*)item;
6466 traffic_message_dump_drain_msg_indv(drain_msg_ref, "Drain", false);
6467 }
6468 }
6469 }
6470
6471 static void traffic_message_dump_peer(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref, char * msg, bool root_only)
6472 {
6473 if( root_only && ompi_process_info.my_name.vpid != 0 ) {
6474 return;
6475 } else {
6476 sleep(ompi_process_info.my_name.vpid * 2);
6477 }
6478
6479 opal_output(0, "------------- %s ---------------------------------", msg);
6480 opal_output(0, "%s <-> %s Totals Sent [ %3d / %3d ] Recv [ %3d / %3d ]",
6481 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
6482 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
6483 peer_ref->total_msgs_sent,
6484 peer_ref->matched_msgs_sent,
6485 peer_ref->total_msgs_recvd,
6486 peer_ref->matched_msgs_recvd);
6487 opal_output(0, "\n");
6488
6489 traffic_message_dump_msg_list(&(peer_ref->send_list), false);
6490 traffic_message_dump_msg_list(&(peer_ref->isend_list), false);
6491 traffic_message_dump_msg_list(&(peer_ref->send_init_list), false);
6492
6493 traffic_message_dump_msg_list(&(peer_ref->recv_list), false);
6494 traffic_message_dump_msg_list(&(peer_ref->irecv_list), false);
6495 traffic_message_dump_msg_list(&(peer_ref->recv_init_list), false);
6496
6497 traffic_message_dump_msg_list(&(peer_ref->drained_list), true);
6498
6499 opal_output(0, "--------------------------------------------------");
6500 usleep(250000);
6501 }
6502 #endif