This source file includes the following definitions.
- osc_pt2pt_accumulate_data_constructor
- osc_pt2pt_accumulate_data_destructor
- osc_pt2pt_pending_acc_constructor
- osc_pt2pt_pending_acc_destructor
- ompi_osc_pt2pt_ddt_buffer_constructor
- ompi_osc_pt2pt_ddt_buffer_destructor
- datatype_buffer_length
- ompi_osc_pt2pt_control_send
- ompi_osc_pt2pt_control_send_unbuffered_cb
- ompi_osc_pt2pt_control_send_unbuffered
- datatype_create
- process_put
- process_put_long
- osc_pt2pt_incoming_req_complete
- osc_pt2pt_get_post_send_cb
- osc_pt2pt_get_post_send
- process_get
- osc_pt2pt_accumulate_buffer
- osc_pt2pt_accumulate_allocate
- accumulate_cb
- ompi_osc_pt2pt_acc_op_queue
- replace_cb
- ompi_osc_pt2pt_acc_start
- ompi_osc_pt2pt_acc_long_start
- ompi_osc_pt2pt_gacc_start
- ompi_osc_gacc_long_start
- ompi_osc_pt2pt_cswap_start
- ompi_osc_pt2pt_progress_pending_acc
- process_acc
- process_acc_long
- process_get_acc
- process_get_acc_long
- process_cswap
- process_complete
- process_flush
- process_unlock
- process_large_datatype_request_cb
- process_large_datatype_request
- process_frag
- ompi_osc_pt2pt_callback
- ompi_osc_pt2pt_receive_repost
- ompi_osc_pt2pt_process_receive
- ompi_osc_pt2pt_frag_start_receive
- ompi_osc_pt2pt_component_irecv
- ompi_osc_pt2pt_isend_w_cb
- ompi_osc_pt2pt_irecv_w_cb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 #include "osc_pt2pt.h"
26 #include "osc_pt2pt_header.h"
27 #include "osc_pt2pt_data_move.h"
28 #include "osc_pt2pt_frag.h"
29 #include "osc_pt2pt_request.h"
30
31 #include "opal/util/arch.h"
32 #include "opal/sys/atomic.h"
33 #include "opal/align.h"
34
35 #include "ompi/mca/pml/pml.h"
36 #include "ompi/mca/pml/base/pml_base_sendreq.h"
37 #include "opal/mca/btl/btl.h"
38 #include "ompi/mca/osc/base/osc_base_obj_convert.h"
39 #include "ompi/datatype/ompi_datatype.h"
40 #include "ompi/op/op.h"
41 #include "ompi/memchecker.h"
42
43
44
45
46
47
48 struct osc_pt2pt_accumulate_data_t {
49 opal_list_item_t super;
50 ompi_osc_pt2pt_module_t* module;
51 void *target;
52 void *source;
53 size_t source_len;
54 ompi_proc_t *proc;
55 int count;
56 int peer;
57 ompi_datatype_t *datatype;
58 ompi_op_t *op;
59 opal_atomic_int32_t request_count;
60 };
61 typedef struct osc_pt2pt_accumulate_data_t osc_pt2pt_accumulate_data_t;
62
63 static void osc_pt2pt_accumulate_data_constructor (osc_pt2pt_accumulate_data_t *acc_data)
64 {
65 acc_data->source = NULL;
66 acc_data->datatype = NULL;
67 acc_data->op = NULL;
68 }
69
70 static void osc_pt2pt_accumulate_data_destructor (osc_pt2pt_accumulate_data_t *acc_data)
71 {
72 if (acc_data->source) {
73
74 free (acc_data->source);
75 }
76
77 if (acc_data->datatype) {
78 OMPI_DATATYPE_RELEASE(acc_data->datatype);
79 }
80 }
81
82 OBJ_CLASS_DECLARATION(osc_pt2pt_accumulate_data_t);
83 OBJ_CLASS_INSTANCE(osc_pt2pt_accumulate_data_t, opal_list_item_t, osc_pt2pt_accumulate_data_constructor,
84 osc_pt2pt_accumulate_data_destructor);
85
86
87
88
89
90
91
92
93
94
95
96
97
98 struct osc_pt2pt_pending_acc_t {
99 opal_list_item_t super;
100 ompi_osc_pt2pt_header_t header;
101 int source;
102 void *data;
103 size_t data_len;
104 ompi_datatype_t *datatype;
105 bool active_target;
106 };
107 typedef struct osc_pt2pt_pending_acc_t osc_pt2pt_pending_acc_t;
108
109 static void osc_pt2pt_pending_acc_constructor (osc_pt2pt_pending_acc_t *pending)
110 {
111 pending->data = NULL;
112 pending->datatype = NULL;
113 }
114
115 static void osc_pt2pt_pending_acc_destructor (osc_pt2pt_pending_acc_t *pending)
116 {
117 if (NULL != pending->data) {
118 free (pending->data);
119 }
120
121 if (NULL != pending->datatype) {
122 OMPI_DATATYPE_RELEASE(pending->datatype);
123 }
124 }
125
126 OBJ_CLASS_DECLARATION(osc_pt2pt_pending_acc_t);
127 OBJ_CLASS_INSTANCE(osc_pt2pt_pending_acc_t, opal_list_item_t,
128 osc_pt2pt_pending_acc_constructor, osc_pt2pt_pending_acc_destructor);
129
130
131
132
133
134
135
136
137
138 struct ompi_osc_pt2pt_ddt_buffer_t {
139
140
141 opal_list_item_t super;
142
143
144 ompi_osc_pt2pt_module_t *module;
145
146 int source;
147
148 ompi_osc_pt2pt_header_t *header;
149 };
150 typedef struct ompi_osc_pt2pt_ddt_buffer_t ompi_osc_pt2pt_ddt_buffer_t;
151
152 static void ompi_osc_pt2pt_ddt_buffer_constructor (ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer)
153 {
154 ddt_buffer->header = NULL;
155 }
156
157 static void ompi_osc_pt2pt_ddt_buffer_destructor (ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer)
158 {
159 if (ddt_buffer->header) {
160 free (ddt_buffer->header);
161 ddt_buffer->header = NULL;
162 }
163 }
164
165 OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_ddt_buffer_t);
166 OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_ddt_buffer_t, opal_list_item_t, ompi_osc_pt2pt_ddt_buffer_constructor,
167 ompi_osc_pt2pt_ddt_buffer_destructor);
168
169
170
171
172
173
174
175
176
177
178
179
180 static inline int datatype_buffer_length (ompi_datatype_t *datatype, int count)
181 {
182 ompi_datatype_t *primitive_datatype = NULL;
183 uint32_t primitive_count;
184 size_t buflen;
185
186 ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count);
187 primitive_count *= count;
188
189
190 ompi_datatype_type_size(primitive_datatype, &buflen);
191
192 return buflen * primitive_count;
193 }
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211 int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target,
212 void *data, size_t len)
213 {
214 ompi_osc_pt2pt_frag_t *frag;
215 char *ptr;
216 int ret;
217
218 ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr, false, true);
219 if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
220 memcpy (ptr, data, len);
221
222 ret = ompi_osc_pt2pt_frag_finish(module, frag);
223 }
224
225 return ret;
226 }
227
228 static int ompi_osc_pt2pt_control_send_unbuffered_cb (ompi_request_t *request)
229 {
230 void *ctx = request->req_complete_cb_data;
231 ompi_osc_pt2pt_module_t *module;
232
233
234 module = *(ompi_osc_pt2pt_module_t **)ctx;
235
236
237 mark_outgoing_completion (module);
238
239
240 free (ctx);
241
242 ompi_request_free (&request);
243 return 1;
244 }
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263 int ompi_osc_pt2pt_control_send_unbuffered(ompi_osc_pt2pt_module_t *module,
264 int target, void *data, size_t len)
265 {
266 void *ctx, *data_copy;
267
268 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
269 "osc pt2pt: sending unbuffered fragment to %d", target));
270
271
272 ctx = malloc (sizeof(ompi_osc_pt2pt_module_t*) + len);
273 if (OPAL_UNLIKELY(NULL == ctx)) {
274 return OMPI_ERR_OUT_OF_RESOURCE;
275 }
276
277
278
279 ompi_osc_signal_outgoing (module, MPI_PROC_NULL, 1);
280
281
282 *(ompi_osc_pt2pt_module_t**)ctx = module;
283 data_copy = (ompi_osc_pt2pt_module_t**)ctx + 1;
284 memcpy (data_copy, data, len);
285
286 return ompi_osc_pt2pt_isend_w_cb (data_copy, len, MPI_BYTE, target, OSC_PT2PT_FRAG_TAG,
287 module->comm, ompi_osc_pt2pt_control_send_unbuffered_cb, ctx);
288 }
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304 static inline int datatype_create (ompi_osc_pt2pt_module_t *module, int peer, ompi_proc_t **proc, ompi_datatype_t **datatype, void **data)
305 {
306 ompi_datatype_t *new_datatype = NULL;
307 ompi_proc_t *peer_proc;
308 int ret = OMPI_SUCCESS;
309
310 do {
311 peer_proc = ompi_comm_peer_lookup(module->comm, peer);
312 if (OPAL_UNLIKELY(NULL == peer_proc)) {
313 OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
314 "%d: datatype_create: could not resolve proc pointer for peer %d",
315 ompi_comm_rank(module->comm),
316 peer));
317 ret = OMPI_ERROR;
318 break;
319 }
320
321 new_datatype = ompi_osc_base_datatype_create(peer_proc, data);
322 if (OPAL_UNLIKELY(NULL == new_datatype)) {
323 OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
324 "%d: datatype_create: could not resolve datatype for peer %d",
325 ompi_comm_rank(module->comm), peer));
326 ret = OMPI_ERROR;
327 }
328 } while (0);
329
330 *datatype = new_datatype;
331 if (proc) *proc = peer_proc;
332
333 return ret;
334 }
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349 static inline int process_put(ompi_osc_pt2pt_module_t* module, int source,
350 ompi_osc_pt2pt_header_put_t* put_header)
351 {
352 char *data = (char*) (put_header + 1);
353 ompi_proc_t *proc;
354 struct ompi_datatype_t *datatype;
355 size_t data_len;
356 void *target = (unsigned char*) module->baseptr +
357 ((unsigned long) put_header->displacement * module->disp_unit);
358 int ret;
359
360 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
361 "%d: process_put: received message from %d",
362 ompi_comm_rank(module->comm),
363 source));
364
365 ret = datatype_create (module, source, &proc, &datatype, (void **) &data);
366 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
367 return ret;
368 }
369
370 data_len = put_header->len - ((uintptr_t) data - (uintptr_t) put_header);
371
372 osc_pt2pt_copy_on_recv (target, data, data_len, proc, put_header->count, datatype);
373
374 OMPI_DATATYPE_RELEASE(datatype);
375
376 return put_header->len;
377 }
378
379 static inline int process_put_long(ompi_osc_pt2pt_module_t* module, int source,
380 ompi_osc_pt2pt_header_put_t* put_header)
381 {
382 char *data = (char*) (put_header + 1);
383 struct ompi_datatype_t *datatype;
384 void *target = (unsigned char*) module->baseptr +
385 ((unsigned long) put_header->displacement * module->disp_unit);
386 int ret;
387
388 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
389 "%d: process_put_long: received message from %d",
390 ompi_comm_rank(module->comm),
391 source));
392
393 ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
394 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
395 return ret;
396 }
397
398 ret = ompi_osc_pt2pt_component_irecv (module, target,
399 put_header->count,
400 datatype, source,
401 tag_to_target(put_header->tag),
402 module->comm);
403 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
404 OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
405 "%d: process_put_long: irecv error: %d",
406 ompi_comm_rank(module->comm),
407 ret));
408 return OMPI_ERROR;
409 }
410
411 OMPI_DATATYPE_RELEASE(datatype);
412
413 return put_header->len;
414 }
415
416
417
418
419
420
421
422
423
424
425
426
427
428 static int osc_pt2pt_incoming_req_complete (ompi_request_t *request)
429 {
430 ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t *) request->req_complete_cb_data;
431 int rank = MPI_PROC_NULL;
432
433 if (request->req_status.MPI_TAG & 0x01) {
434 rank = request->req_status.MPI_SOURCE;
435 }
436
437 mark_incoming_completion (module, rank);
438
439 ompi_request_free (&request);
440 return 1;
441 }
442
443 struct osc_pt2pt_get_post_send_cb_data_t {
444 ompi_osc_pt2pt_module_t *module;
445 int peer;
446 };
447
448 static int osc_pt2pt_get_post_send_cb (ompi_request_t *request)
449 {
450 struct osc_pt2pt_get_post_send_cb_data_t *data =
451 (struct osc_pt2pt_get_post_send_cb_data_t *) request->req_complete_cb_data;
452 ompi_osc_pt2pt_module_t *module = data->module;
453 int rank = data->peer;
454
455 free (data);
456
457
458 mark_incoming_completion (module, rank);
459
460 ompi_request_free (&request);
461 return 1;
462 }
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478 static int osc_pt2pt_get_post_send (ompi_osc_pt2pt_module_t *module, void *source, int count,
479 ompi_datatype_t *datatype, int peer, int tag)
480 {
481 struct osc_pt2pt_get_post_send_cb_data_t *data;
482 int ret;
483
484 data = malloc (sizeof (*data));
485 if (OPAL_UNLIKELY(NULL == data)) {
486 return OMPI_ERR_OUT_OF_RESOURCE;
487 }
488
489 data->module = module;
490
491
492 data->peer = (tag & 0x1) ? peer : MPI_PROC_NULL;
493
494
495 ret = ompi_osc_pt2pt_isend_w_cb (source, count, datatype, peer, tag, module->comm,
496 osc_pt2pt_get_post_send_cb, (void *) data);
497 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
498 free (data);
499 }
500
501 return ret;
502 }
503
504
505
506
507
508
509
510
511
512
513 static inline int process_get (ompi_osc_pt2pt_module_t* module, int target,
514 ompi_osc_pt2pt_header_get_t* get_header)
515 {
516 char *data = (char *) (get_header + 1);
517 struct ompi_datatype_t *datatype;
518 void *source = (unsigned char*) module->baseptr +
519 ((unsigned long) get_header->displacement * module->disp_unit);
520 int ret;
521
522 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
523 "%d: process_get: received message from %d",
524 ompi_comm_rank(module->comm),
525 target));
526
527 ret = datatype_create (module, target, NULL, &datatype, (void **) &data);
528 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
529 return ret;
530 }
531
532
533 ret = osc_pt2pt_get_post_send (module, source, get_header->count, datatype,
534 target, tag_to_origin(get_header->tag));
535
536 OMPI_DATATYPE_RELEASE(datatype);
537
538 return OMPI_SUCCESS == ret ? (int) get_header->len : ret;
539 }
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554 static inline int osc_pt2pt_accumulate_buffer (void *target, void *source, size_t source_len, ompi_proc_t *proc,
555 int count, ompi_datatype_t *datatype, ompi_op_t *op)
556 {
557 int ret;
558
559 assert (NULL != target && NULL != source);
560
561 if (op == &ompi_mpi_op_replace.op) {
562 osc_pt2pt_copy_on_recv (target, source, source_len, proc, count, datatype);
563 return OMPI_SUCCESS;
564 }
565
566 #if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
567 if (proc->super.proc_arch != ompi_proc_local()->super.proc_arch) {
568 ompi_datatype_t *primitive_datatype = NULL;
569 uint32_t primitive_count;
570 size_t buflen;
571 void *buffer;
572
573 ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count);
574 primitive_count *= count;
575
576
577 ompi_datatype_type_size(primitive_datatype, &buflen);
578 buflen *= primitive_count;
579
580 buffer = malloc (buflen);
581 if (OPAL_UNLIKELY(NULL == buffer)) {
582 return OMPI_ERR_OUT_OF_RESOURCE;
583 }
584
585 osc_pt2pt_copy_on_recv (buffer, source, source_len, proc, primitive_count, primitive_datatype);
586
587 ret = ompi_osc_base_process_op(target, buffer, source_len, datatype,
588 count, op);
589
590 free(buffer);
591 } else
592 #endif
593
594
595 ret = ompi_osc_base_process_op(target, source, source_len, datatype,
596 count, op);
597
598 return ret;
599 }
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619 static int osc_pt2pt_accumulate_allocate (ompi_osc_pt2pt_module_t *module, int peer, void *target, void *source, size_t source_len,
620 ompi_proc_t *proc, int count, ompi_datatype_t *datatype, ompi_op_t *op,
621 int request_count, osc_pt2pt_accumulate_data_t **acc_data_out)
622 {
623 osc_pt2pt_accumulate_data_t *acc_data;
624
625 acc_data = OBJ_NEW(osc_pt2pt_accumulate_data_t);
626 if (OPAL_UNLIKELY(NULL == acc_data)) {
627 return OMPI_ERR_OUT_OF_RESOURCE;
628 }
629
630 acc_data->module = module;
631 acc_data->peer = peer;
632 acc_data->target = target;
633 acc_data->source = source;
634 acc_data->source_len = source_len;
635 acc_data->proc = proc;
636 acc_data->count = count;
637 acc_data->datatype = datatype;
638 OMPI_DATATYPE_RETAIN(datatype);
639 acc_data->op = op;
640 acc_data->request_count = request_count;
641
642 *acc_data_out = acc_data;
643
644 return OMPI_SUCCESS;
645 }
646
647
648
649
650
651
652
653
654
655
656 static int accumulate_cb (ompi_request_t *request)
657 {
658 struct osc_pt2pt_accumulate_data_t *acc_data = (struct osc_pt2pt_accumulate_data_t *) request->req_complete_cb_data;
659 ompi_osc_pt2pt_module_t *module = acc_data->module;
660 int rank = MPI_PROC_NULL;
661 int ret = OMPI_SUCCESS;
662
663 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
664 "accumulate_cb, request_count = %d", acc_data->request_count));
665
666 if (request->req_status.MPI_TAG & 0x01) {
667 rank = acc_data->peer;
668 }
669
670 if (0 == OPAL_THREAD_ADD_FETCH32(&acc_data->request_count, -1)) {
671
672
673 if (acc_data->source) {
674 ompi_datatype_t *primitive_datatype = NULL;
675 uint32_t primitive_count;
676
677 assert (NULL != acc_data->target && NULL != acc_data->source);
678
679 ompi_osc_base_get_primitive_type_info(acc_data->datatype, &primitive_datatype, &primitive_count);
680 primitive_count *= acc_data->count;
681
682 if (acc_data->op == &ompi_mpi_op_replace.op) {
683 ret = ompi_datatype_sndrcv(acc_data->source, primitive_count, primitive_datatype, acc_data->target, acc_data->count, acc_data->datatype);
684 } else {
685 ret = ompi_osc_base_process_op(acc_data->target, acc_data->source, acc_data->source_len, acc_data->datatype, acc_data->count, acc_data->op);
686 }
687 }
688
689
690 ompi_osc_pt2pt_accumulate_unlock (module);
691
692 osc_pt2pt_gc_add_buffer (module, &acc_data->super);
693 }
694
695 mark_incoming_completion (module, rank);
696
697 ompi_request_free (&request);
698 return ret;
699 }
700
701
702 static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_header_t *header, int source,
703 char *data, size_t data_len, ompi_datatype_t *datatype, bool active_target)
704 {
705 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
706 osc_pt2pt_pending_acc_t *pending_acc;
707
708 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
709 "%d: queuing accumulate operation", ompi_comm_size (module->comm)));
710
711 pending_acc = OBJ_NEW(osc_pt2pt_pending_acc_t);
712 if (OPAL_UNLIKELY(NULL == pending_acc)) {
713 return OMPI_ERR_OUT_OF_RESOURCE;
714 }
715
716
717
718 if (active_target) {
719 OPAL_THREAD_ADD_FETCH32(&module->active_incoming_frag_count, -1);
720 } else {
721 OPAL_THREAD_ADD_FETCH32(&peer->passive_incoming_frag_count, -1);
722 }
723
724 pending_acc->active_target = active_target;
725 pending_acc->source = source;
726
727
728 pending_acc->data_len = data_len;
729
730 if (data_len) {
731 pending_acc->data = malloc (data_len);
732 memcpy (pending_acc->data, data, data_len);
733 }
734
735
736 pending_acc->datatype = datatype;
737 OMPI_DATATYPE_RETAIN(datatype);
738
739
740 switch (header->base.type) {
741 case OMPI_OSC_PT2PT_HDR_TYPE_ACC:
742 case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
743 case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC:
744 case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
745 pending_acc->header.acc = header->acc;
746 break;
747 case OMPI_OSC_PT2PT_HDR_TYPE_CSWAP:
748 pending_acc->header.cswap = header->cswap;
749 break;
750 default:
751
752 assert (0);
753 }
754
755
756 OPAL_THREAD_SCOPED_LOCK(&module->pending_acc_lock, opal_list_append (&module->pending_acc, &pending_acc->super));
757
758 return OMPI_SUCCESS;
759 }
760
761 static int replace_cb (ompi_request_t *request)
762 {
763 ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t *) request->req_complete_cb_data;
764 int rank = MPI_PROC_NULL;
765
766 if (request->req_status.MPI_TAG & 0x01) {
767 rank = request->req_status.MPI_SOURCE;
768 }
769
770 mark_incoming_completion (module, rank);
771
772
773 ompi_osc_pt2pt_accumulate_unlock (module);
774
775 ompi_request_free (&request);
776 return 1;
777 }
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794 static int ompi_osc_pt2pt_acc_start (ompi_osc_pt2pt_module_t *module, int source, void *data, size_t data_len,
795 ompi_datatype_t *datatype, ompi_osc_pt2pt_header_acc_t *acc_header)
796 {
797 void *target = (unsigned char*) module->baseptr +
798 ((unsigned long) acc_header->displacement * module->disp_unit);
799 struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
800 ompi_proc_t *proc;
801 int ret;
802
803 proc = ompi_comm_peer_lookup(module->comm, source);
804 assert (NULL != proc);
805
806 ret = osc_pt2pt_accumulate_buffer (target, data, data_len, proc, acc_header->count,
807 datatype, op);
808
809 ompi_osc_pt2pt_accumulate_unlock (module);
810
811 return ret;
812 }
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827 static int ompi_osc_pt2pt_acc_long_start (ompi_osc_pt2pt_module_t *module, int source, ompi_datatype_t *datatype,
828 ompi_osc_pt2pt_header_acc_t *acc_header) {
829 struct osc_pt2pt_accumulate_data_t *acc_data;
830 size_t buflen;
831 void *buffer;
832 ompi_proc_t *proc;
833 void *target = (unsigned char*) module->baseptr +
834 ((unsigned long) acc_header->displacement * module->disp_unit);
835 struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
836 ompi_datatype_t *primitive_datatype;
837 uint32_t primitive_count;
838 int ret;
839
840 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
841 "ompi_osc_pt2pt_acc_long_start starting..."));
842
843 proc = ompi_comm_peer_lookup(module->comm, source);
844 assert (NULL != proc);
845
846 do {
847 if (op == &ompi_mpi_op_replace.op) {
848 ret = ompi_osc_pt2pt_irecv_w_cb (target, acc_header->count, datatype,
849 source, tag_to_target(acc_header->tag), module->comm,
850 NULL, replace_cb, module);
851 break;
852 }
853
854 ret = ompi_osc_base_get_primitive_type_info (datatype, &primitive_datatype, &primitive_count);
855 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
856 break;
857 }
858
859 primitive_count *= acc_header->count;
860
861 buflen = datatype_buffer_length (datatype, acc_header->count);
862
863
864 buffer = malloc (buflen);
865 if (OPAL_UNLIKELY(NULL == buffer)) {
866 ret = OMPI_ERR_OUT_OF_RESOURCE;
867 break;
868 }
869
870 ret = osc_pt2pt_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count,
871 datatype, op, 1, &acc_data);
872 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
873 free (buffer);
874 break;
875 }
876
877 ret = ompi_osc_pt2pt_irecv_w_cb (buffer, primitive_count, primitive_datatype,
878 source, tag_to_target(acc_header->tag), module->comm,
879 NULL, accumulate_cb, acc_data);
880 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
881 OBJ_RELEASE(acc_data);
882 }
883 } while (0);
884
885 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
886 ompi_osc_pt2pt_accumulate_unlock (module);
887 }
888
889 return ret;
890 }
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907 static int ompi_osc_pt2pt_gacc_start (ompi_osc_pt2pt_module_t *module, int source, void *data, size_t data_len,
908 ompi_datatype_t *datatype, ompi_osc_pt2pt_header_acc_t *acc_header)
909 {
910 void *target = (unsigned char*) module->baseptr +
911 ((unsigned long) acc_header->displacement * module->disp_unit);
912 struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
913 struct osc_pt2pt_accumulate_data_t *acc_data;
914 ompi_proc_t *proc;
915 int ret;
916
917 proc = ompi_comm_peer_lookup(module->comm, source);
918 assert (NULL != proc);
919
920 do {
921 ret = osc_pt2pt_accumulate_allocate (module, source, target, data, data_len, proc, acc_header->count,
922 datatype, op, 1, &acc_data);
923 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
924 break;
925 }
926
927 ret = ompi_osc_pt2pt_isend_w_cb (target, acc_header->count, datatype,
928 source, tag_to_origin(acc_header->tag), module->comm,
929 accumulate_cb, acc_data);
930 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
931 OBJ_RELEASE(acc_data);
932 }
933 } while (0);
934
935 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
936 ompi_osc_pt2pt_accumulate_unlock (module);
937 }
938
939 return ret;
940 }
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955 static int ompi_osc_gacc_long_start (ompi_osc_pt2pt_module_t *module, int source, ompi_datatype_t *datatype,
956 ompi_osc_pt2pt_header_acc_t *acc_header)
957 {
958 void *target = (unsigned char*) module->baseptr +
959 ((unsigned long) acc_header->displacement * module->disp_unit);
960 struct ompi_op_t *op = ompi_osc_base_op_create(acc_header->op);
961 struct osc_pt2pt_accumulate_data_t *acc_data;
962 ompi_datatype_t *primitive_datatype;
963 ompi_request_t *recv_request;
964 uint32_t primitive_count;
965 ompi_proc_t *proc;
966 size_t buflen;
967 void *buffer;
968 int ret;
969
970 proc = ompi_comm_peer_lookup(module->comm, source);
971 assert (NULL != proc);
972
973
974 buflen = datatype_buffer_length (datatype, acc_header->count);
975
976 do {
977 ret = ompi_osc_base_get_primitive_type_info (datatype, &primitive_datatype, &primitive_count);
978 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
979 break;
980 }
981
982 primitive_count *= acc_header->count;
983
984 buffer = malloc (buflen);
985 if (OPAL_UNLIKELY(NULL == buffer)) {
986 ret = OMPI_ERR_OUT_OF_RESOURCE;
987 break;
988 }
989
990 ret = osc_pt2pt_accumulate_allocate (module, source, target, buffer, buflen, proc, acc_header->count,
991 datatype, op, 2, &acc_data);
992 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
993 free (buffer);
994 break;
995 }
996
997 ret = ompi_osc_pt2pt_irecv_w_cb (buffer, acc_header->count, datatype,
998 source, tag_to_target(acc_header->tag), module->comm,
999 &recv_request, accumulate_cb, acc_data);
1000 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1001 OBJ_RELEASE(acc_data);
1002 break;
1003 }
1004
1005 ret = ompi_osc_pt2pt_isend_w_cb (target, primitive_count, primitive_datatype,
1006 source, tag_to_origin(acc_header->tag), module->comm,
1007 accumulate_cb, acc_data);
1008 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1009
1010 ompi_request_cancel (recv_request);
1011 OBJ_RELEASE(acc_data);
1012 break;
1013 }
1014 } while (0);
1015
1016 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1017 ompi_osc_pt2pt_accumulate_unlock (module);
1018 }
1019
1020 return ret;
1021 }
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039 static int ompi_osc_pt2pt_cswap_start (ompi_osc_pt2pt_module_t *module, int source, void *data, ompi_datatype_t *datatype,
1040 ompi_osc_pt2pt_header_cswap_t *cswap_header)
1041 {
1042 void *target = (unsigned char*) module->baseptr +
1043 ((unsigned long) cswap_header->displacement * module->disp_unit);
1044 void *compare_addr, *origin_addr;
1045 size_t datatype_size;
1046 ompi_proc_t *proc;
1047 int ret;
1048
1049 proc = ompi_comm_peer_lookup(module->comm, source);
1050 assert (NULL != proc);
1051
1052 datatype_size = datatype->super.size;
1053
1054 origin_addr = data;
1055 compare_addr = (void *)((uintptr_t) data + datatype_size);
1056
1057 do {
1058
1059 ret = MCA_PML_CALL(send(target, 1, datatype, source, tag_to_origin(cswap_header->tag),
1060 MCA_PML_BASE_SEND_STANDARD, module->comm));
1061 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1062 break;
1063 }
1064
1065
1066 mark_incoming_completion (module, (cswap_header->tag & 0x1) ? source : MPI_PROC_NULL);
1067
1068 if (0 == memcmp (target, compare_addr, datatype_size)) {
1069 osc_pt2pt_copy_on_recv (target, origin_addr, datatype_size, proc, 1, datatype);
1070 }
1071 } while (0);
1072
1073 ompi_osc_pt2pt_accumulate_unlock (module);
1074
1075 return ret;
1076 }
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088 int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module)
1089 {
1090 osc_pt2pt_pending_acc_t *pending_acc;
1091 int ret;
1092
1093
1094
1095 if (ompi_osc_pt2pt_accumulate_trylock (module)) {
1096 return OMPI_SUCCESS;
1097 }
1098
1099 OPAL_THREAD_LOCK(&module->pending_acc_lock);
1100 pending_acc = (osc_pt2pt_pending_acc_t *) opal_list_remove_first (&module->pending_acc);
1101 OPAL_THREAD_UNLOCK(&module->pending_acc_lock);
1102 if (OPAL_UNLIKELY(NULL == pending_acc)) {
1103
1104 ompi_osc_pt2pt_accumulate_unlock (module);
1105 return OMPI_SUCCESS;
1106 }
1107
1108 switch (pending_acc->header.base.type) {
1109 case OMPI_OSC_PT2PT_HDR_TYPE_ACC:
1110 ret = ompi_osc_pt2pt_acc_start (module, pending_acc->source, pending_acc->data, pending_acc->data_len,
1111 pending_acc->datatype, &pending_acc->header.acc);
1112 free (pending_acc->data);
1113 break;
1114 case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
1115 ret = ompi_osc_pt2pt_acc_long_start (module, pending_acc->source, pending_acc->datatype,
1116 &pending_acc->header.acc);
1117 break;
1118 case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC:
1119 ret = ompi_osc_pt2pt_gacc_start (module, pending_acc->source, pending_acc->data,
1120 pending_acc->data_len, pending_acc->datatype,
1121 &pending_acc->header.acc);
1122 break;
1123 case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
1124 ret = ompi_osc_gacc_long_start (module, pending_acc->source, pending_acc->datatype,
1125 &pending_acc->header.acc);
1126 break;
1127 case OMPI_OSC_PT2PT_HDR_TYPE_CSWAP:
1128 ret = ompi_osc_pt2pt_cswap_start (module, pending_acc->source, pending_acc->data,
1129 pending_acc->datatype, &pending_acc->header.cswap);
1130 break;
1131 default:
1132 ret = OMPI_ERROR;
1133
1134 assert (0);
1135 }
1136
1137
1138 mark_incoming_completion (module, pending_acc->active_target ? MPI_PROC_NULL : pending_acc->source);
1139
1140 pending_acc->data = NULL;
1141 OBJ_RELEASE(pending_acc);
1142
1143 return ret;
1144 }
1145
1146 static inline int process_acc (ompi_osc_pt2pt_module_t *module, int source,
1147 ompi_osc_pt2pt_header_acc_t *acc_header)
1148 {
1149 bool active_target = !(acc_header->tag & 0x1);
1150 char *data = (char *) (acc_header + 1);
1151 struct ompi_datatype_t *datatype;
1152 uint64_t data_len;
1153 int ret;
1154
1155 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1156 "%d: process_acc: received message from %d",
1157 ompi_comm_rank(module->comm),
1158 source));
1159
1160 ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
1161 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1162 return ret;
1163 }
1164
1165 data_len = acc_header->len - ((char*) data - (char*) acc_header);
1166
1167
1168 if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
1169 ret = ompi_osc_pt2pt_acc_start (module, source, data, data_len, datatype,
1170 acc_header);
1171 } else {
1172
1173 ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header,
1174 source, data, data_len, datatype, active_target);
1175 }
1176
1177
1178 OMPI_DATATYPE_RELEASE(datatype);
1179
1180 return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret;
1181 }
1182
1183 static inline int process_acc_long (ompi_osc_pt2pt_module_t* module, int source,
1184 ompi_osc_pt2pt_header_acc_t* acc_header)
1185 {
1186 bool active_target = !(acc_header->tag & 0x1);
1187 char *data = (char *) (acc_header + 1);
1188 struct ompi_datatype_t *datatype;
1189 int ret;
1190
1191 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1192 "%d: process_acc_long: received message from %d",
1193 ompi_comm_rank(module->comm),
1194 source));
1195
1196 ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
1197 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1198 return ret;
1199 }
1200
1201 if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
1202 ret = ompi_osc_pt2pt_acc_long_start (module, source, datatype, acc_header);
1203 } else {
1204
1205 ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header, source,
1206 NULL, 0, datatype, active_target);
1207 }
1208
1209
1210 OMPI_DATATYPE_RELEASE(datatype);
1211
1212 return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret;
1213 }
1214
1215 static inline int process_get_acc(ompi_osc_pt2pt_module_t *module, int source,
1216 ompi_osc_pt2pt_header_acc_t *acc_header)
1217 {
1218 bool active_target = !(acc_header->tag & 0x1);
1219 char *data = (char *) (acc_header + 1);
1220 struct ompi_datatype_t *datatype;
1221 void *buffer = NULL;
1222 uint64_t data_len;
1223 ompi_proc_t * proc;
1224 int ret;
1225
1226 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1227 "%d: process_get_acc: received message from %d",
1228 ompi_comm_rank(module->comm),
1229 source));
1230
1231 ret = datatype_create (module, source, &proc, &datatype, (void **) &data);
1232 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1233 return ret;
1234 }
1235
1236 data_len = acc_header->len - ((char*) data - (char*) acc_header);
1237
1238 if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
1239
1240 if (data_len) {
1241 ompi_datatype_t *primitive_datatype = NULL;
1242 uint32_t primitive_count;
1243 buffer = malloc (data_len);
1244 if (OPAL_UNLIKELY(NULL == buffer)) {
1245 OMPI_DATATYPE_RELEASE(datatype);
1246 return OMPI_ERR_OUT_OF_RESOURCE;
1247 }
1248
1249 ompi_osc_base_get_primitive_type_info(datatype, &primitive_datatype, &primitive_count);
1250 primitive_count *= acc_header->count;
1251
1252 osc_pt2pt_copy_on_recv (buffer, data, data_len, proc, primitive_count, primitive_datatype);
1253 }
1254
1255 ret = ompi_osc_pt2pt_gacc_start (module, source, buffer, data_len, datatype,
1256 acc_header);
1257 } else {
1258
1259 ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header,
1260 source, data, data_len, datatype, active_target);
1261 }
1262
1263
1264 OMPI_DATATYPE_RELEASE(datatype);
1265
1266 return (OMPI_SUCCESS == ret) ? (int) acc_header->len : ret;
1267 }
1268
1269 static inline int process_get_acc_long(ompi_osc_pt2pt_module_t *module, int source,
1270 ompi_osc_pt2pt_header_acc_t *acc_header)
1271 {
1272 bool active_target = !(acc_header->tag & 0x1);
1273 char *data = (char *) (acc_header + 1);
1274 struct ompi_datatype_t *datatype;
1275 int ret;
1276
1277 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1278 "%d: process_acc: received message from %d",
1279 ompi_comm_rank(module->comm),
1280 source));
1281
1282 ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
1283 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1284 return ret;
1285 }
1286
1287 if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
1288 ret = ompi_osc_gacc_long_start (module, source, datatype, acc_header);
1289 } else {
1290
1291 ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) acc_header,
1292 source, NULL, 0, datatype, active_target);
1293 }
1294
1295
1296 OMPI_DATATYPE_RELEASE(datatype);
1297
1298 return OMPI_SUCCESS == ret ? (int) acc_header->len : ret;
1299 }
1300
1301
1302 static inline int process_cswap (ompi_osc_pt2pt_module_t *module, int source,
1303 ompi_osc_pt2pt_header_cswap_t *cswap_header)
1304 {
1305 bool active_target = !(cswap_header->tag & 0x1);
1306 char *data = (char*) (cswap_header + 1);
1307 struct ompi_datatype_t *datatype;
1308 int ret;
1309
1310 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1311 "%d: process_cswap: received message from %d",
1312 ompi_comm_rank(module->comm),
1313 source));
1314
1315 ret = datatype_create (module, source, NULL, &datatype, (void **) &data);
1316 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1317 return ret;
1318 }
1319
1320 if (0 == ompi_osc_pt2pt_accumulate_trylock (module)) {
1321 ret = ompi_osc_pt2pt_cswap_start (module, source, data, datatype, cswap_header);
1322 } else {
1323
1324 ret = ompi_osc_pt2pt_acc_op_queue (module, (ompi_osc_pt2pt_header_t *) cswap_header, source,
1325 data, 2 * datatype->super.size, datatype, active_target);
1326 }
1327
1328
1329 OMPI_DATATYPE_RELEASE(datatype);
1330
1331 return (OMPI_SUCCESS == ret) ? (int) cswap_header->len : ret;
1332 }
1333
1334 static inline int process_complete (ompi_osc_pt2pt_module_t *module, int source,
1335 ompi_osc_pt2pt_header_complete_t *complete_header)
1336 {
1337
1338 osc_pt2pt_incoming_complete (module, source, complete_header->frag_count + 1);
1339
1340 return sizeof (*complete_header);
1341 }
1342
1343
1344
1345
1346 static inline int process_flush (ompi_osc_pt2pt_module_t *module, int source,
1347 ompi_osc_pt2pt_header_flush_t *flush_header)
1348 {
1349 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
1350 int ret;
1351
1352 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1353 "process_flush header = {.frag_count = %d}", flush_header->frag_count));
1354
1355
1356 OPAL_THREAD_ADD_FETCH32(&peer->passive_incoming_frag_count, -(int32_t) flush_header->frag_count);
1357
1358 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1359 "%d: process_flush: received message from %d. passive_incoming_frag_count = %d",
1360 ompi_comm_rank(module->comm), source, peer->passive_incoming_frag_count));
1361
1362 ret = ompi_osc_pt2pt_process_flush (module, source, flush_header);
1363 if (OMPI_SUCCESS != ret) {
1364 ompi_osc_pt2pt_pending_t *pending;
1365
1366 pending = OBJ_NEW(ompi_osc_pt2pt_pending_t);
1367 pending->module = module;
1368 pending->source = source;
1369 pending->header.flush = *flush_header;
1370
1371 osc_pt2pt_add_pending (pending);
1372 }
1373
1374
1375 OPAL_THREAD_ADD_FETCH32(&peer->passive_incoming_frag_count, -1);
1376
1377 return sizeof (*flush_header);
1378 }
1379
1380 static inline int process_unlock (ompi_osc_pt2pt_module_t *module, int source,
1381 ompi_osc_pt2pt_header_unlock_t *unlock_header)
1382 {
1383 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
1384 int ret;
1385
1386 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1387 "process_unlock header = {.frag_count = %d}", unlock_header->frag_count));
1388
1389
1390 OPAL_THREAD_ADD_FETCH32(&peer->passive_incoming_frag_count, -(int32_t) unlock_header->frag_count);
1391
1392 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
1393 "osc pt2pt: processing unlock request from %d. frag count = %d, processed_count = %d",
1394 source, unlock_header->frag_count, (int) peer->passive_incoming_frag_count));
1395
1396 ret = ompi_osc_pt2pt_process_unlock (module, source, unlock_header);
1397 if (OMPI_SUCCESS != ret) {
1398 ompi_osc_pt2pt_pending_t *pending;
1399
1400 pending = OBJ_NEW(ompi_osc_pt2pt_pending_t);
1401 pending->module = module;
1402 pending->source = source;
1403 pending->header.unlock = *unlock_header;
1404
1405 osc_pt2pt_add_pending (pending);
1406 }
1407
1408
1409 OPAL_THREAD_ADD_FETCH32(&peer->passive_incoming_frag_count, -1);
1410
1411 return sizeof (*unlock_header);
1412 }
1413
1414 static int process_large_datatype_request_cb (ompi_request_t *request)
1415 {
1416 ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer = (ompi_osc_pt2pt_ddt_buffer_t *) request->req_complete_cb_data;
1417 ompi_osc_pt2pt_module_t *module = ddt_buffer->module;
1418 ompi_osc_pt2pt_header_t *header = ddt_buffer->header;
1419 int source = ddt_buffer->source;
1420
1421
1422 switch (header->base.type) {
1423 case OMPI_OSC_PT2PT_HDR_TYPE_PUT_LONG:
1424 (void) process_put_long (module, source, &header->put);
1425 break;
1426 case OMPI_OSC_PT2PT_HDR_TYPE_GET:
1427 (void) process_get (module, source, &header->get);
1428 break;
1429 case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
1430 (void) process_acc_long (module, source, &header->acc);
1431 break;
1432 case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
1433 (void) process_get_acc_long (module, source, &header->acc);
1434 break;
1435 default:
1436
1437 assert (0);
1438 return OMPI_ERROR;
1439 }
1440
1441
1442 osc_pt2pt_gc_add_buffer (module, &ddt_buffer->super);
1443
1444 ompi_request_free (&request);
1445 return 1;
1446 }
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460 static int process_large_datatype_request (ompi_osc_pt2pt_module_t *module, int source, ompi_osc_pt2pt_header_t *header)
1461 {
1462 ompi_osc_pt2pt_ddt_buffer_t *ddt_buffer;
1463 int header_len, tag, ret;
1464 uint64_t ddt_len;
1465
1466
1467 switch (header->base.type) {
1468 case OMPI_OSC_PT2PT_HDR_TYPE_PUT_LONG:
1469 header_len = sizeof (header->put);
1470 tag = header->put.tag;
1471 break;
1472 case OMPI_OSC_PT2PT_HDR_TYPE_GET:
1473 header_len = sizeof (header->get);
1474 tag = header->get.tag;
1475 break;
1476 case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
1477 header_len = sizeof (header->acc);
1478 tag = header->acc.tag;
1479 break;
1480 case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
1481 header_len = sizeof (header->acc);
1482 tag = header->acc.tag;
1483 break;
1484 default:
1485
1486 opal_output (0, "Unsupported header/flag combination");
1487 return OMPI_ERROR;
1488 }
1489
1490 ddt_len = *((uint64_t *)((uintptr_t) header + header_len));
1491
1492 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
1493 "process_large_datatype_request: processing fragment with type %d. ddt_len %lu",
1494 header->base.type, (unsigned long) ddt_len));
1495
1496 ddt_buffer = OBJ_NEW(ompi_osc_pt2pt_ddt_buffer_t);
1497 if (OPAL_UNLIKELY(NULL == ddt_buffer)) {
1498 return OMPI_ERR_OUT_OF_RESOURCE;
1499 }
1500
1501 ddt_buffer->module = module;
1502 ddt_buffer->source = source;
1503
1504 ddt_buffer->header = malloc (ddt_len + header_len);
1505 if (OPAL_UNLIKELY(NULL == ddt_buffer->header)) {
1506 OBJ_RELEASE(ddt_buffer);
1507 return OMPI_ERR_OUT_OF_RESOURCE;
1508 }
1509
1510 memcpy (ddt_buffer->header, header, header_len);
1511
1512 ret = ompi_osc_pt2pt_irecv_w_cb ((void *)((uintptr_t) ddt_buffer->header + header_len),
1513 ddt_len, MPI_BYTE,
1514 source, tag_to_target(tag), module->comm,
1515 NULL, process_large_datatype_request_cb, ddt_buffer);
1516 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1517 OBJ_RELEASE(ddt_buffer);
1518 return ret;
1519 }
1520
1521 return header_len + 8;
1522 }
1523
1524
1525
1526
1527 static inline int process_frag (ompi_osc_pt2pt_module_t *module,
1528 ompi_osc_pt2pt_frag_header_t *frag)
1529 {
1530 ompi_osc_pt2pt_header_t *header;
1531 int ret;
1532
1533 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1534 "osc pt2pt: process_frag: from %d, ops %d",
1535 (int) frag->source, (int) frag->num_ops));
1536
1537 header = (ompi_osc_pt2pt_header_t *) (frag + 1);
1538
1539 for (int i = 0 ; i < frag->num_ops ; ++i) {
1540 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1541 "osc pt2pt: process_frag: type 0x%x. flag 0x%x. offset %u",
1542 header->base.type, (unsigned) ((uintptr_t)header - (uintptr_t)frag),
1543 header->base.flags));
1544
1545 if (OPAL_LIKELY(!(header->base.flags & OMPI_OSC_PT2PT_HDR_FLAG_LARGE_DATATYPE))) {
1546 osc_pt2pt_ntoh(header);
1547 switch (header->base.type) {
1548 case OMPI_OSC_PT2PT_HDR_TYPE_PUT:
1549 ret = process_put(module, frag->source, &header->put);
1550 break;
1551 case OMPI_OSC_PT2PT_HDR_TYPE_PUT_LONG:
1552 ret = process_put_long(module, frag->source, &header->put);
1553 break;
1554
1555 case OMPI_OSC_PT2PT_HDR_TYPE_ACC:
1556 ret = process_acc(module, frag->source, &header->acc);
1557 break;
1558 case OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG:
1559 ret = process_acc_long (module, frag->source, &header->acc);
1560 break;
1561
1562 case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ:
1563 ret = process_unlock(module, frag->source, &header->unlock);
1564 break;
1565
1566 case OMPI_OSC_PT2PT_HDR_TYPE_GET:
1567 ret = process_get (module, frag->source, &header->get);
1568 break;
1569
1570 case OMPI_OSC_PT2PT_HDR_TYPE_CSWAP:
1571 ret = process_cswap (module, frag->source, &header->cswap);
1572 break;
1573
1574 case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC:
1575 ret = process_get_acc (module, frag->source, &header->acc);
1576 break;
1577
1578 case OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG:
1579 ret = process_get_acc_long (module, frag->source, &header->acc);
1580 break;
1581
1582 case OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ:
1583 ret = process_flush (module, frag->source, &header->flush);
1584 break;
1585
1586 case OMPI_OSC_PT2PT_HDR_TYPE_COMPLETE:
1587 ret = process_complete (module, frag->source, &header->complete);
1588 break;
1589
1590 default:
1591 opal_output(0, "Unsupported fragment type 0x%x\n", header->base.type);
1592 abort();
1593 }
1594 } else {
1595 ret = process_large_datatype_request (module, frag->source, header);
1596 }
1597
1598 if (ret <= 0) {
1599 opal_output(0, "Error processing fragment: %d", ret);
1600 abort();
1601 }
1602
1603
1604
1605 header = (ompi_osc_pt2pt_header_t *) OPAL_ALIGN(((uintptr_t) header + ret), 8, uintptr_t);
1606 }
1607
1608 return OMPI_SUCCESS;
1609 }
1610
1611
1612 static int ompi_osc_pt2pt_callback (ompi_request_t *request)
1613 {
1614 ompi_osc_pt2pt_receive_t *recv = (ompi_osc_pt2pt_receive_t *) request->req_complete_cb_data;
1615
1616 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "received pt2pt fragment"));
1617
1618
1619
1620 OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_receives_lock);
1621 opal_list_append (&mca_osc_pt2pt_component.pending_receives, &recv->super);
1622 OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_receives_lock);
1623
1624 return OMPI_SUCCESS;
1625 }
1626
1627 static int ompi_osc_pt2pt_receive_repost (ompi_osc_pt2pt_receive_t *recv)
1628 {
1629
1630 ompi_request_wait_completion (recv->pml_request);
1631
1632
1633 recv->pml_request->req_complete_cb = ompi_osc_pt2pt_callback;
1634 recv->pml_request->req_complete_cb_data = (void *) recv;
1635
1636 return MCA_PML_CALL(start(1, &recv->pml_request));
1637 }
1638
1639 int ompi_osc_pt2pt_process_receive (ompi_osc_pt2pt_receive_t *recv)
1640 {
1641 ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t *) recv->module;
1642 ompi_osc_pt2pt_header_t *base_header = (ompi_osc_pt2pt_header_t *) recv->buffer;
1643 size_t incoming_length = recv->pml_request->req_status._ucount;
1644 int source = recv->pml_request->req_status.MPI_SOURCE;
1645 int rc __opal_attribute_unused__;
1646
1647 assert(incoming_length >= sizeof(ompi_osc_pt2pt_header_base_t));
1648 (void)incoming_length;
1649
1650 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1651 "received pt2pt callback for fragment. source = %d, count = %u, type = 0x%x",
1652 source, (unsigned) incoming_length, base_header->base.type));
1653
1654 osc_pt2pt_ntoh(base_header);
1655 switch (base_header->base.type) {
1656 case OMPI_OSC_PT2PT_HDR_TYPE_FRAG:
1657 process_frag(module, (ompi_osc_pt2pt_frag_header_t *) base_header);
1658
1659
1660 mark_incoming_completion (module, (base_header->base.flags & OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET) ?
1661 source : MPI_PROC_NULL);
1662 break;
1663 case OMPI_OSC_PT2PT_HDR_TYPE_POST:
1664 osc_pt2pt_incoming_post (module, source);
1665 break;
1666 case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_REQ:
1667 ompi_osc_pt2pt_process_lock(module, source, (ompi_osc_pt2pt_header_lock_t *) base_header);
1668 break;
1669 case OMPI_OSC_PT2PT_HDR_TYPE_LOCK_ACK:
1670 ompi_osc_pt2pt_process_lock_ack(module, (ompi_osc_pt2pt_header_lock_ack_t *) base_header);
1671 break;
1672 case OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_ACK:
1673 ompi_osc_pt2pt_process_flush_ack (module, source, (ompi_osc_pt2pt_header_flush_ack_t *) base_header);
1674 break;
1675 case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_ACK:
1676 ompi_osc_pt2pt_process_unlock_ack (module, source, (ompi_osc_pt2pt_header_unlock_ack_t *) base_header);
1677 break;
1678 default:
1679 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1680 "received unexpected message of type %x",
1681 (int) base_header->base.type));
1682 }
1683
1684 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1685 "finished processing incoming messages"));
1686
1687 osc_pt2pt_gc_clean (module);
1688
1689 rc = ompi_osc_pt2pt_receive_repost (recv);
1690
1691 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1692 "finished posting receive request. rc: %d", rc));
1693
1694 return OMPI_SUCCESS;
1695 }
1696
1697 int ompi_osc_pt2pt_frag_start_receive (ompi_osc_pt2pt_module_t *module)
1698 {
1699 int rc;
1700
1701 module->recv_frag_count = mca_osc_pt2pt_component.receive_count;
1702 if (0 == module->recv_frag_count) {
1703 module->recv_frag_count = 1;
1704 }
1705
1706 module->recv_frags = malloc (sizeof (module->recv_frags[0]) * module->recv_frag_count);
1707 if (NULL == module->recv_frags) {
1708 return OMPI_ERR_OUT_OF_RESOURCE;
1709 }
1710
1711 for (unsigned int i = 0 ; i < module->recv_frag_count ; ++i) {
1712 OBJ_CONSTRUCT(module->recv_frags + i, ompi_osc_pt2pt_receive_t);
1713 module->recv_frags[i].module = module;
1714 module->recv_frags[i].buffer = malloc (mca_osc_pt2pt_component.buffer_size + sizeof (ompi_osc_pt2pt_frag_header_t));
1715 if (NULL == module->recv_frags[i].buffer) {
1716 return OMPI_ERR_OUT_OF_RESOURCE;
1717 }
1718
1719 rc = ompi_osc_pt2pt_irecv_w_cb (module->recv_frags[i].buffer, mca_osc_pt2pt_component.buffer_size + sizeof (ompi_osc_pt2pt_frag_header_t),
1720 MPI_BYTE, OMPI_ANY_SOURCE, OSC_PT2PT_FRAG_TAG, module->comm, &module->recv_frags[i].pml_request,
1721 ompi_osc_pt2pt_callback, module->recv_frags + i);
1722 if (OMPI_SUCCESS != rc) {
1723 return rc;
1724 }
1725 }
1726
1727 return OMPI_SUCCESS;
1728 }
1729
1730 int ompi_osc_pt2pt_component_irecv (ompi_osc_pt2pt_module_t *module, void *buf,
1731 size_t count, struct ompi_datatype_t *datatype,
1732 int src, int tag, struct ompi_communicator_t *comm)
1733 {
1734 return ompi_osc_pt2pt_irecv_w_cb (buf, count, datatype, src, tag, comm, NULL,
1735 osc_pt2pt_incoming_req_complete, module);
1736 }
1737
1738 int ompi_osc_pt2pt_isend_w_cb (const void *ptr, int count, ompi_datatype_t *datatype, int target, int tag,
1739 ompi_communicator_t *comm, ompi_request_complete_fn_t cb, void *ctx)
1740 {
1741 ompi_request_t *request;
1742 int ret;
1743
1744 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1745 "osc pt2pt: ompi_osc_pt2pt_isend_w_cb sending %d bytes to %d with tag %d",
1746 count, target, tag));
1747
1748 ret = MCA_PML_CALL(isend_init((void *)ptr, count, datatype, target, tag,
1749 MCA_PML_BASE_SEND_STANDARD, comm, &request));
1750 if (OMPI_SUCCESS != ret) {
1751 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1752 "error sending fragment. ret = %d", ret));
1753 return ret;
1754 }
1755
1756 request->req_complete_cb = cb;
1757 request->req_complete_cb_data = ctx;
1758
1759 ret = MCA_PML_CALL(start(1, &request));
1760
1761 return ret;
1762 }
1763
1764 int ompi_osc_pt2pt_irecv_w_cb (void *ptr, int count, ompi_datatype_t *datatype, int target, int tag,
1765 ompi_communicator_t *comm, ompi_request_t **request_out,
1766 ompi_request_complete_fn_t cb, void *ctx)
1767 {
1768 ompi_request_t *dummy;
1769 int ret;
1770
1771 if (NULL == request_out) {
1772 request_out = &dummy;
1773 }
1774
1775 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1776 "osc pt2pt: ompi_osc_pt2pt_irecv_w_cb receiving %d bytes from %d with tag %d",
1777 count, target, tag));
1778
1779 ret = MCA_PML_CALL(irecv_init(ptr, count, datatype, target, tag, comm, request_out));
1780 if (OMPI_SUCCESS != ret) {
1781 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1782 "error posting receive. ret = %d", ret));
1783 return ret;
1784 }
1785
1786 (*request_out)->req_complete_cb = cb;
1787 (*request_out)->req_complete_cb_data = ctx;
1788
1789 ret = MCA_PML_CALL(start(1, request_out));
1790
1791 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1792 "osc pt2pt: pml start returned %d", ret));
1793
1794 return ret;
1795 }