This source file includes the following definitions.
- ompi_osc_pt2pt_comm_complete
- ompi_osc_pt2pt_req_comm_complete
- ompi_osc_pt2pt_data_isend
- ompi_osc_pt2pt_dt_send_complete
- ompi_osc_pt2pt_put_self
- ompi_osc_pt2pt_get_self
- ompi_osc_pt2pt_cas_self
- ompi_osc_pt2pt_acc_self
- ompi_osc_pt2pt_gacc_self
- ompi_osc_pt2pt_put_w_req
- ompi_osc_pt2pt_put
- ompi_osc_pt2pt_accumulate_w_req
- ompi_osc_pt2pt_accumulate
- ompi_osc_pt2pt_compare_and_swap
- ompi_osc_pt2pt_fetch_and_op
- ompi_osc_pt2pt_rput
- ompi_osc_pt2pt_rget_internal
- ompi_osc_pt2pt_rget
- ompi_osc_pt2pt_get
- ompi_osc_pt2pt_raccumulate
- ompi_osc_pt2pt_rget_accumulate_internal
- ompi_osc_pt2pt_get_accumulate
- ompi_osc_pt2pt_rget_accumulate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 #include "osc_pt2pt.h"
27 #include "osc_pt2pt_request.h"
28 #include "osc_pt2pt_header.h"
29 #include "osc_pt2pt_frag.h"
30 #include "osc_pt2pt_data_move.h"
31
32 #include "opal_stdint.h"
33 #include "ompi/memchecker.h"
34 #include "ompi/mca/osc/base/osc_base_obj_convert.h"
35
36 #include <stdio.h>
37
38
39 static int ompi_osc_pt2pt_comm_complete (ompi_request_t *request)
40 {
41 ompi_osc_pt2pt_module_t *module =
42 (ompi_osc_pt2pt_module_t*) request->req_complete_cb_data;
43
44 OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output,
45 "isend_completion_cb called"));
46
47 mark_outgoing_completion(module);
48
49 ompi_request_free (&request);
50
51 return 1;
52 }
53
54 static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request)
55 {
56 ompi_osc_pt2pt_request_t *pt2pt_request = (ompi_osc_pt2pt_request_t *) request->req_complete_cb_data;
57
58 OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output,
59 "ompi_osc_pt2pt_req_comm_complete called tag = %d",
60 request->req_status.MPI_TAG));
61
62
63 request->req_complete_cb_data = pt2pt_request->module;
64
65 if (0 == OPAL_THREAD_ADD_FETCH32(&pt2pt_request->outstanding_requests, -1)) {
66 ompi_osc_pt2pt_request_complete (pt2pt_request, request->req_status.MPI_ERROR);
67 }
68
69 return ompi_osc_pt2pt_comm_complete (request);
70 }
71
72 static inline int ompi_osc_pt2pt_data_isend (ompi_osc_pt2pt_module_t *module, const void *buf,
73 size_t count, ompi_datatype_t *datatype, int dest,
74 int tag, ompi_osc_pt2pt_request_t *request)
75 {
76
77 ompi_osc_signal_outgoing (module, dest, 1);
78
79 if (NULL != request) {
80 ++request->outstanding_requests;
81 return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, module->comm,
82 ompi_osc_pt2pt_req_comm_complete, request);
83 }
84
85 return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, module->comm,
86 ompi_osc_pt2pt_comm_complete, module);
87 }
88
89
90 static int ompi_osc_pt2pt_dt_send_complete (ompi_request_t *request)
91 {
92 ompi_datatype_t *datatype = (ompi_datatype_t *) request->req_complete_cb_data;
93 ompi_osc_pt2pt_module_t *module = NULL;
94
95 OMPI_DATATYPE_RELEASE(datatype);
96
97 OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.lock);
98 (void) opal_hash_table_get_value_uint32(&mca_osc_pt2pt_component.modules,
99 ompi_comm_get_cid(request->req_mpi_object.comm),
100 (void **) &module);
101 OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.lock);
102 assert (NULL != module);
103
104 ompi_request_free (&request);
105
106 return 1;
107 }
108
109
110 static inline int ompi_osc_pt2pt_put_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, const void *source, int source_count,
111 ompi_datatype_t *source_datatype, ptrdiff_t target_disp, int target_count,
112 ompi_datatype_t *target_datatype, ompi_osc_pt2pt_module_t *module,
113 ompi_osc_pt2pt_request_t *request)
114 {
115 void *target = (unsigned char*) module->baseptr +
116 ((unsigned long) target_disp * module->disp_unit);
117 int ret;
118
119
120 ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
121
122 ret = ompi_datatype_sndrcv ((void *)source, source_count, source_datatype,
123 target, target_count, target_datatype);
124 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
125 return ret;
126 }
127
128 if (request) {
129 ompi_osc_pt2pt_request_complete (request, MPI_SUCCESS);
130 }
131
132 return OMPI_SUCCESS;
133 }
134
135 static inline int ompi_osc_pt2pt_get_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, void *target, int target_count, ompi_datatype_t *target_datatype,
136 ptrdiff_t source_disp, int source_count, ompi_datatype_t *source_datatype,
137 ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_request_t *request)
138 {
139 void *source = (unsigned char*) module->baseptr +
140 ((unsigned long) source_disp * module->disp_unit);
141 int ret;
142
143
144 ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
145
146 ret = ompi_datatype_sndrcv (source, source_count, source_datatype,
147 target, target_count, target_datatype);
148 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
149 return ret;
150 }
151
152 if (request) {
153 ompi_osc_pt2pt_request_complete (request, MPI_SUCCESS);
154 }
155
156 return OMPI_SUCCESS;
157 }
158
159 static inline int ompi_osc_pt2pt_cas_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, const void *source, const void *compare, void *result,
160 ompi_datatype_t *datatype, ptrdiff_t target_disp, ompi_osc_pt2pt_module_t *module)
161 {
162 void *target = (unsigned char*) module->baseptr +
163 ((unsigned long) target_disp * module->disp_unit);
164
165
166 ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
167
168 ompi_osc_pt2pt_accumulate_lock (module);
169
170 memcpy (result, target, datatype->super.size);
171
172 if (0 == memcmp (compare, target, datatype->super.size)) {
173 memcpy (target, source, datatype->super.size);
174 }
175
176 ompi_osc_pt2pt_accumulate_unlock (module);
177
178 return OMPI_SUCCESS;
179 }
180
181 static inline int ompi_osc_pt2pt_acc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, const void *source, int source_count, ompi_datatype_t *source_datatype,
182 ptrdiff_t target_disp, int target_count, ompi_datatype_t *target_datatype,
183 ompi_op_t *op, ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_request_t *request)
184 {
185 void *target = (unsigned char*) module->baseptr +
186 ((unsigned long) target_disp * module->disp_unit);
187 int ret;
188
189
190 ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
191
192 ompi_osc_pt2pt_accumulate_lock (module);
193
194 if (&ompi_mpi_op_replace.op != op) {
195 ret = ompi_osc_base_sndrcv_op (source, source_count, source_datatype, target, target_count, target_datatype, op);
196 } else {
197 ret = ompi_datatype_sndrcv ((void *)source, source_count, source_datatype, target, target_count, target_datatype);
198 }
199
200 ompi_osc_pt2pt_accumulate_unlock (module);
201
202 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
203 OPAL_OUTPUT_VERBOSE((5, ompi_osc_base_framework.framework_output,
204 "ompi_osc_pt2pt_acc_self: failed performing accumulate operation. ret = %d", ret));
205 return ret;
206 }
207
208 if (request) {
209 ompi_osc_pt2pt_request_complete (request, MPI_SUCCESS);
210 }
211
212 return OMPI_SUCCESS;
213 }
214
215 static inline int ompi_osc_pt2pt_gacc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, const void *source, int source_count, ompi_datatype_t *source_datatype,
216 void *result, int result_count, ompi_datatype_t *result_datatype,
217 ptrdiff_t target_disp, int target_count, ompi_datatype_t *target_datatype,
218 ompi_op_t *op, ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_request_t *request)
219 {
220 void *target = (unsigned char*) module->baseptr +
221 ((unsigned long) target_disp * module->disp_unit);
222 int ret;
223
224 OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_gacc_self: starting local "
225 "get accumulate"));
226
227 ompi_osc_pt2pt_accumulate_lock (module);
228
229 do {
230 ret = ompi_datatype_sndrcv (target, target_count, target_datatype,
231 result, result_count, result_datatype);
232
233 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
234 OPAL_OUTPUT_VERBOSE((5, ompi_osc_base_framework.framework_output,
235 "ompi_osc_pt2pt_gacc_self: failed copying to the target buffer. ret = %d", ret));
236 break;
237 }
238
239 if (&ompi_mpi_op_no_op.op != op) {
240 if (&ompi_mpi_op_replace.op != op) {
241 ret = ompi_osc_base_sndrcv_op (source, source_count, source_datatype, target, target_count, target_datatype, op);
242 } else {
243 ret = ompi_datatype_sndrcv ((void *)source, source_count, source_datatype, target, target_count, target_datatype);
244 }
245 }
246
247 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
248 OPAL_OUTPUT_VERBOSE((5, ompi_osc_base_framework.framework_output,
249 "ompi_osc_pt2pt_gacc_self: failed performing accumulate operation. ret = %d", ret));
250 break;
251 }
252 } while (0);
253
254 ompi_osc_pt2pt_accumulate_unlock (module);
255
256 OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_gacc_self: local get "
257 "accumulate complete"));
258
259 if (request) {
260
261 ompi_osc_pt2pt_request_complete (request, ret);
262 }
263
264 return OMPI_SUCCESS;
265 }
266
267
268 static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_count,
269 struct ompi_datatype_t *origin_dt,
270 int target, ptrdiff_t target_disp,
271 int target_count, struct ompi_datatype_t *target_dt,
272 ompi_win_t *win, ompi_osc_pt2pt_request_t *request)
273 {
274 ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
275 ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, target);
276 ompi_osc_pt2pt_frag_t *frag;
277 ompi_osc_pt2pt_header_put_t *header;
278 ompi_osc_pt2pt_sync_t *pt2pt_sync;
279 size_t ddt_len, payload_len, frag_len;
280 bool is_long_datatype = false;
281 bool is_long_msg = false;
282 const void *packed_ddt;
283 int tag = -1, ret;
284 char *ptr;
285
286 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
287 "put: 0x%lx, %d, %s, %d, %d, %d, %s, %s",
288 (unsigned long) origin_addr, origin_count,
289 origin_dt->name, target, (int) target_disp,
290 target_count, target_dt->name, win->w_name));
291
292 pt2pt_sync = ompi_osc_pt2pt_module_sync_lookup (module, target, NULL);
293 if (OPAL_UNLIKELY(NULL == pt2pt_sync)) {
294 return OMPI_ERR_RMA_SYNC;
295 }
296
297
298 if (0 == origin_count || 0 == target_count) {
299 if (request) {
300 ompi_osc_pt2pt_request_complete (request, MPI_SUCCESS);
301 }
302
303 return OMPI_SUCCESS;
304 }
305
306
307 if (ompi_comm_rank (module->comm) == target) {
308 return ompi_osc_pt2pt_put_self (pt2pt_sync, origin_addr, origin_count, origin_dt,
309 target_disp, target_count, target_dt,
310 module, request);
311 }
312
313
314
315 ddt_len = ompi_datatype_pack_description_length(target_dt);
316 payload_len = origin_dt->super.size * origin_count;
317 frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len + payload_len;
318
319 ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, true);
320 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
321 frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len;
322 ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, false);
323 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
324
325 frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8;
326 ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, false);
327 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
328 return OMPI_ERR_OUT_OF_RESOURCE;
329 }
330
331 is_long_datatype = true;
332 }
333
334 is_long_msg = true;
335 tag = get_tag(module);
336 }
337
338 if (is_long_msg) {
339
340 if (pt2pt_sync->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK) {
341 OPAL_THREAD_LOCK(&pt2pt_sync->lock);
342 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
343 while (!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_EAGER)) {
344 opal_condition_wait(&pt2pt_sync->cond, &pt2pt_sync->lock);
345 }
346 OPAL_THREAD_UNLOCK(&pt2pt_sync->lock);
347 } else {
348 ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
349 }
350 }
351
352 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
353 "osc pt2pt: put long protocol: %d, large datatype: %d",
354 (int) is_long_msg, (int) is_long_datatype));
355
356 header = (ompi_osc_pt2pt_header_put_t *) ptr;
357 header->base.flags = 0;
358 header->len = frag_len;
359 header->count = target_count;
360 header->displacement = target_disp;
361 ptr += sizeof(ompi_osc_pt2pt_header_put_t);
362
363 do {
364 ret = ompi_datatype_get_pack_description(target_dt, &packed_ddt);
365 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
366 break;
367 }
368
369 if (is_long_datatype) {
370
371 header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_LARGE_DATATYPE;
372
373 OMPI_DATATYPE_RETAIN(target_dt);
374
375 ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE,
376 target, tag_to_target(tag), module->comm,
377 ompi_osc_pt2pt_dt_send_complete, target_dt);
378 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
379 break;
380 }
381
382 *((uint64_t *) ptr) = ddt_len;
383 ptr += 8;
384 } else {
385 memcpy((unsigned char*) ptr, packed_ddt, ddt_len);
386 ptr += ddt_len;
387 }
388
389 if (!is_long_msg) {
390 header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_PUT;
391 osc_pt2pt_hton(header, proc);
392
393 osc_pt2pt_copy_for_send (ptr, payload_len, origin_addr, proc, origin_count,
394 origin_dt);
395
396
397
398 if (request) {
399 ompi_osc_pt2pt_request_complete (request, MPI_SUCCESS);
400 }
401 } else {
402 header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_PUT_LONG;
403 header->tag = tag;
404 osc_pt2pt_hton(header, proc);
405
406 ret = ompi_osc_pt2pt_data_isend (module,origin_addr, origin_count, origin_dt,
407 target, tag_to_target(tag), request);
408 }
409 } while (0);
410
411 if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
412 header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID;
413 }
414
415 return ompi_osc_pt2pt_frag_finish(module, frag);
416 }
417
418 int
419 ompi_osc_pt2pt_put(const void *origin_addr, int origin_count,
420 struct ompi_datatype_t *origin_dt,
421 int target, ptrdiff_t target_disp,
422 int target_count,
423 struct ompi_datatype_t *target_dt, ompi_win_t *win)
424 {
425 return ompi_osc_pt2pt_put_w_req (origin_addr, origin_count,
426 origin_dt, target, target_disp,
427 target_count, target_dt, win, NULL);
428 }
429
430
431 static int
432 ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
433 struct ompi_datatype_t *origin_dt,
434 int target, ptrdiff_t target_disp,
435 int target_count,
436 struct ompi_datatype_t *target_dt,
437 struct ompi_op_t *op, ompi_win_t *win,
438 ompi_osc_pt2pt_request_t *request)
439 {
440 int ret;
441 ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
442 ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, target);
443 bool is_long_datatype = false;
444 bool is_long_msg = false;
445 ompi_osc_pt2pt_frag_t *frag;
446 ompi_osc_pt2pt_header_acc_t *header;
447 ompi_osc_pt2pt_sync_t *pt2pt_sync;
448 size_t ddt_len, payload_len, frag_len;
449 char *ptr;
450 const void *packed_ddt;
451 int tag = -1;
452
453 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
454 "acc: 0x%lx, %d, %s, %d, %d, %d, %s, %s, %s",
455 (unsigned long) origin_addr, origin_count,
456 origin_dt->name, target, (int) target_disp,
457 target_count, target_dt->name, op->o_name,
458 win->w_name));
459
460 pt2pt_sync = ompi_osc_pt2pt_module_sync_lookup (module, target, NULL);
461 if (OPAL_UNLIKELY(NULL == pt2pt_sync)) {
462 return OMPI_ERR_RMA_SYNC;
463 }
464
465
466 if (0 == origin_count || 0 == target_count) {
467 if (request) {
468 ompi_osc_pt2pt_request_complete (request, MPI_SUCCESS);
469 }
470
471 return OMPI_SUCCESS;
472 }
473
474
475 if (ompi_comm_rank (module->comm) == target) {
476 return ompi_osc_pt2pt_acc_self (pt2pt_sync, origin_addr, origin_count, origin_dt,
477 target_disp, target_count, target_dt,
478 op, module, request);
479 }
480
481
482
483 ddt_len = ompi_datatype_pack_description_length(target_dt);
484 payload_len = origin_dt->super.size * origin_count;
485
486 frag_len = sizeof(*header) + ddt_len + payload_len;
487 ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, true);
488 if (OMPI_SUCCESS != ret) {
489 frag_len = sizeof(*header) + ddt_len;
490 ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, !request);
491 if (OMPI_SUCCESS != ret) {
492
493 frag_len = sizeof(*header) + 8;
494 ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, !request);
495 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
496 return OMPI_ERR_OUT_OF_RESOURCE;
497 }
498
499 is_long_datatype = true;
500 }
501
502 is_long_msg = true;
503 tag = get_tag (module);
504 } else {
505
506 tag = !!(module->passive_target_access_epoch);
507 }
508
509 if (is_long_msg) {
510
511 if (pt2pt_sync->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK) {
512 OPAL_THREAD_LOCK(&pt2pt_sync->lock);
513 ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
514 while (!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_EAGER)) {
515 opal_condition_wait(&pt2pt_sync->cond, &pt2pt_sync->lock);
516 }
517 OPAL_THREAD_UNLOCK(&pt2pt_sync->lock);
518 } else {
519 ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
520 }
521 }
522
523 header = (ompi_osc_pt2pt_header_acc_t*) ptr;
524 header->base.flags = 0;
525 header->len = frag_len;
526 header->count = target_count;
527 header->displacement = target_disp;
528 header->op = op->o_f_to_c_index;
529 header->tag = tag;
530 ptr += sizeof (*header);
531
532 do {
533 ret = ompi_datatype_get_pack_description(target_dt, &packed_ddt);
534 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
535 break;
536 }
537
538 if (is_long_datatype) {
539
540 header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_LARGE_DATATYPE;
541
542 OMPI_DATATYPE_RETAIN(target_dt);
543
544 ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE,
545 target, tag_to_target(tag), module->comm,
546 ompi_osc_pt2pt_dt_send_complete, target_dt);
547 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
548 break;
549 }
550
551 *((uint64_t *) ptr) = ddt_len;
552 ptr += 8;
553 } else {
554 memcpy((unsigned char*) ptr, packed_ddt, ddt_len);
555 ptr += ddt_len;
556 }
557
558 if (!is_long_msg) {
559 header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_ACC;
560 osc_pt2pt_hton(header, proc);
561
562 osc_pt2pt_copy_for_send (ptr, payload_len, origin_addr, proc,
563 origin_count, origin_dt);
564
565
566
567 if (request) {
568 ompi_osc_pt2pt_request_complete (request, MPI_SUCCESS);
569 }
570 } else {
571 header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG;
572 osc_pt2pt_hton(header, proc);
573
574 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
575 "acc: starting long accumulate with tag %d", tag));
576
577 ret = ompi_osc_pt2pt_data_isend (module, origin_addr, origin_count, origin_dt,
578 target, tag_to_target(tag), request);
579 }
580 } while (0);
581
582 if (OMPI_SUCCESS != ret) {
583 OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
584 "acc: failed with eror %d", ret));
585 } else {
586
587 header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID;
588 }
589
590 return ompi_osc_pt2pt_frag_finish(module, frag);
591 }
592
593 int
594 ompi_osc_pt2pt_accumulate(const void *origin_addr, int origin_count,
595 struct ompi_datatype_t *origin_dt,
596 int target, ptrdiff_t target_disp,
597 int target_count,
598 struct ompi_datatype_t *target_dt,
599 struct ompi_op_t *op, ompi_win_t *win)
600 {
601 return ompi_osc_pt2pt_accumulate_w_req (origin_addr, origin_count, origin_dt,
602 target, target_disp, target_count,
603 target_dt, op, win, NULL);
604 }
605
606 int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compare_addr,
607 void *result_addr, struct ompi_datatype_t *dt,
608 int target, ptrdiff_t target_disp,
609 struct ompi_win_t *win)
610 {
611 ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
612 ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, target);
613 ompi_osc_pt2pt_frag_t *frag;
614 ompi_osc_pt2pt_header_cswap_t *header;
615 ompi_osc_pt2pt_sync_t *pt2pt_sync;
616 size_t ddt_len, payload_len, frag_len;
617 ompi_osc_pt2pt_request_t *request;
618 const void *packed_ddt;
619 int ret, tag;
620 char *ptr;
621
622 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
623 "cswap: 0x%lx, 0x%lx, 0x%lx, %s, %d, %d, %s",
624 (unsigned long) origin_addr, (unsigned long) compare_addr,
625 (unsigned long) result_addr, dt->name, target, (int) target_disp,
626 win->w_name));
627
628 pt2pt_sync = ompi_osc_pt2pt_module_sync_lookup (module, target, NULL);
629 if (OPAL_UNLIKELY(NULL == pt2pt_sync)) {
630 return OMPI_ERR_RMA_SYNC;
631 }
632
633
634 if (ompi_comm_rank (module->comm) == target) {
635 return ompi_osc_pt2pt_cas_self (pt2pt_sync, origin_addr, compare_addr, result_addr, dt, target_disp,
636 module);
637 }
638
639
640 OMPI_OSC_PT2PT_REQUEST_ALLOC(win, request);
641
642 request->type = OMPI_OSC_PT2PT_HDR_TYPE_CSWAP;
643 request->origin_addr = origin_addr;
644 request->internal = true;
645 OMPI_DATATYPE_RETAIN(dt);
646 request->origin_dt = dt;
647
648
649
650 ddt_len = ompi_datatype_pack_description_length(dt);
651
652
653 payload_len = dt->super.size * 2;
654
655 ret = ompi_datatype_get_pack_description(dt, &packed_ddt);
656 if (OMPI_SUCCESS != ret) {
657 return ret;
658 }
659
660 frag_len = sizeof(ompi_osc_pt2pt_header_cswap_t) + ddt_len + payload_len;
661 ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, false);
662 if (OMPI_SUCCESS != ret) {
663 return OMPI_ERR_OUT_OF_RESOURCE;
664 }
665
666 tag = get_tag (module);
667 ompi_osc_signal_outgoing (module, target, 1);
668
669 header = (ompi_osc_pt2pt_header_cswap_t *) ptr;
670 header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_CSWAP;
671 header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
672 header->len = frag_len;
673 header->displacement = target_disp;
674 header->tag = tag;
675 osc_pt2pt_hton(header, proc);
676 ptr += sizeof(ompi_osc_pt2pt_header_cswap_t);
677
678 memcpy((unsigned char*) ptr, packed_ddt, ddt_len);
679 ptr += ddt_len;
680
681
682 osc_pt2pt_copy_for_send (ptr, dt->super.size, origin_addr, proc, 1, dt);
683 ptr += dt->super.size;
684 osc_pt2pt_copy_for_send (ptr, dt->super.size, compare_addr, proc, 1, dt);
685
686 request->outstanding_requests = 1;
687 ret = ompi_osc_pt2pt_irecv_w_cb (result_addr, 1, dt,
688 target, tag_to_origin(tag), module->comm,
689 NULL, ompi_osc_pt2pt_req_comm_complete, request);
690 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
691 return ret;
692 }
693
694 return ompi_osc_pt2pt_frag_finish (module, frag);
695 }
696
697
/**
 * Fetch-and-op on a single element.
 *
 * Implemented as a get-accumulate in which the origin, result and target
 * all use a count of one and the same datatype @p dt.
 *
 * @return result of ompi_osc_pt2pt_get_accumulate()
 */
int ompi_osc_pt2pt_fetch_and_op(const void *origin_addr, void *result_addr,
                                struct ompi_datatype_t *dt, int target,
                                ptrdiff_t target_disp, struct ompi_op_t *op,
                                struct ompi_win_t *win)
{
    const int single = 1;

    return ompi_osc_pt2pt_get_accumulate (origin_addr, single, dt,
                                          result_addr, single, dt,
                                          target, target_disp, single, dt,
                                          op, win);
}
706
707 int ompi_osc_pt2pt_rput(const void *origin_addr, int origin_count,
708 struct ompi_datatype_t *origin_dt,
709 int target, ptrdiff_t target_disp,
710 int target_count, struct ompi_datatype_t *target_dt,
711 struct ompi_win_t *win, struct ompi_request_t **request)
712 {
713 ompi_osc_pt2pt_request_t *pt2pt_request;
714 int ret;
715
716 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
717 "rput: 0x%lx, %d, %s, %d, %d, %d, %s, %s",
718 (unsigned long) origin_addr, origin_count,
719 origin_dt->name, target, (int) target_disp,
720 target_count, target_dt->name, win->w_name));
721
722 OMPI_OSC_PT2PT_REQUEST_ALLOC(win, pt2pt_request);
723
724
725 if (0 == origin_count || 0 == target_count) {
726 ompi_osc_pt2pt_request_complete (pt2pt_request, MPI_SUCCESS);
727 *request = &pt2pt_request->super;
728 return OMPI_SUCCESS;
729 }
730
731 pt2pt_request->type = OMPI_OSC_PT2PT_HDR_TYPE_PUT;
732
733 ret = ompi_osc_pt2pt_put_w_req (origin_addr, origin_count, origin_dt, target,
734 target_disp, target_count, target_dt, win,
735 pt2pt_request);
736 if (OMPI_SUCCESS != ret) {
737 OMPI_OSC_PT2PT_REQUEST_RETURN(pt2pt_request);
738 return ret;
739 }
740
741 *request = (ompi_request_t *) pt2pt_request;
742
743 return OMPI_SUCCESS;
744 }
745
746 static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_count,
747 struct ompi_datatype_t *origin_dt,
748 int target,
749 ptrdiff_t target_disp,
750 int target_count,
751 struct ompi_datatype_t *target_dt,
752 struct ompi_win_t *win, bool release_req,
753 struct ompi_request_t **request)
754 {
755 int ret, tag;
756 ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
757 bool is_long_datatype = false;
758 ompi_osc_pt2pt_frag_t *frag;
759 ompi_osc_pt2pt_header_get_t *header;
760 ompi_osc_pt2pt_sync_t *pt2pt_sync;
761 size_t ddt_len, frag_len;
762 char *ptr;
763 const void *packed_ddt;
764 ompi_osc_pt2pt_request_t *pt2pt_request;
765
766 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
767 "get: 0x%lx, %d, %s, %d, %d, %d, %s, %s",
768 (unsigned long) origin_addr, origin_count,
769 origin_dt->name, target, (int) target_disp,
770 target_count, target_dt->name, win->w_name));
771
772 pt2pt_sync = ompi_osc_pt2pt_module_sync_lookup (module, target, NULL);
773 if (OPAL_UNLIKELY(NULL == pt2pt_sync)) {
774 return OMPI_ERR_RMA_SYNC;
775 }
776
777
778 OMPI_OSC_PT2PT_REQUEST_ALLOC(win, pt2pt_request);
779
780 pt2pt_request->internal = release_req;
781
782
783 if (0 == origin_count || 0 == target_count) {
784 ompi_osc_pt2pt_request_complete (pt2pt_request, MPI_SUCCESS);
785 *request = &pt2pt_request->super;
786 return OMPI_SUCCESS;
787 }
788
789
790 if (ompi_comm_rank (module->comm) == target) {
791 *request = &pt2pt_request->super;
792 return ompi_osc_pt2pt_get_self (pt2pt_sync, origin_addr, origin_count, origin_dt,
793 target_disp, target_count, target_dt,
794 module, pt2pt_request);
795 }
796
797 pt2pt_request->type = OMPI_OSC_PT2PT_HDR_TYPE_GET;
798 pt2pt_request->origin_addr = origin_addr;
799 pt2pt_request->origin_count = origin_count;
800 OMPI_DATATYPE_RETAIN(origin_dt);
801 pt2pt_request->origin_dt = origin_dt;
802
803
804
805 ddt_len = ompi_datatype_pack_description_length(target_dt);
806
807 frag_len = sizeof(ompi_osc_pt2pt_header_get_t) + ddt_len;
808 ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, release_req);
809 if (OMPI_SUCCESS != ret) {
810
811 frag_len = sizeof(ompi_osc_pt2pt_header_get_t) + 8;
812 ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, release_req);
813 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
814 return OMPI_ERR_OUT_OF_RESOURCE;
815 }
816
817 is_long_datatype = true;
818 }
819
820 tag = get_tag (module);
821
822
823 ompi_osc_signal_outgoing (module, target, 1);
824
825 if (!release_req) {
826
827 ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
828 }
829
830 header = (ompi_osc_pt2pt_header_get_t*) ptr;
831 header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_GET;
832 header->base.flags = 0;
833 header->len = frag_len;
834 header->count = target_count;
835 header->displacement = target_disp;
836 header->tag = tag;
837 OSC_PT2PT_HTON(header, module, target);
838 ptr += sizeof(ompi_osc_pt2pt_header_get_t);
839
840 do {
841 ret = ompi_datatype_get_pack_description(target_dt, &packed_ddt);
842 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
843 break;
844 }
845
846 if (is_long_datatype) {
847
848 header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_LARGE_DATATYPE;
849
850 OMPI_DATATYPE_RETAIN(target_dt);
851
852 ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE,
853 target, tag_to_target(tag), module->comm,
854 ompi_osc_pt2pt_dt_send_complete, target_dt);
855 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
856 break;
857 }
858
859 *((uint64_t *) ptr) = ddt_len;
860 ptr += 8;
861 } else {
862 memcpy((unsigned char*) ptr, packed_ddt, ddt_len);
863 ptr += ddt_len;
864 }
865
866
867 pt2pt_request->outstanding_requests = 1;
868 ret = ompi_osc_pt2pt_irecv_w_cb (origin_addr, origin_count, origin_dt,
869 target, tag_to_origin(tag), module->comm,
870 NULL, ompi_osc_pt2pt_req_comm_complete, pt2pt_request);
871 } while (0);
872
873 if (OMPI_SUCCESS == ret) {
874 header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID;
875 *request = &pt2pt_request->super;
876 }
877
878 return ompi_osc_pt2pt_frag_finish(module, frag);
879 }
880
/**
 * Request-based get.
 *
 * Forwards to ompi_osc_pt2pt_rget_internal() with release_req set to
 * false, so the request handed back through @p request is not marked
 * internal.
 *
 * @return result of ompi_osc_pt2pt_rget_internal()
 */
int ompi_osc_pt2pt_rget (void *origin_addr, int origin_count, struct ompi_datatype_t *origin_dt,
                         int target, ptrdiff_t target_disp, int target_count,
                         struct ompi_datatype_t *target_dt, struct ompi_win_t *win,
                         struct ompi_request_t **request)
{
    const bool release_req = false;

    return ompi_osc_pt2pt_rget_internal (origin_addr, origin_count, origin_dt,
                                         target, target_disp, target_count,
                                         target_dt, win, release_req, request);
}
890
891
892 int ompi_osc_pt2pt_get (void *origin_addr, int origin_count, struct ompi_datatype_t *origin_dt,
893 int target, ptrdiff_t target_disp, int target_count,
894 struct ompi_datatype_t *target_dt, struct ompi_win_t *win)
895 {
896 ompi_request_t *request;
897
898 return ompi_osc_pt2pt_rget_internal (origin_addr, origin_count, origin_dt, target, target_disp,
899 target_count, target_dt, win, true, &request);
900 }
901
902 int ompi_osc_pt2pt_raccumulate(const void *origin_addr, int origin_count,
903 struct ompi_datatype_t *origin_dt, int target,
904 ptrdiff_t target_disp, int target_count,
905 struct ompi_datatype_t *target_dt, struct ompi_op_t *op,
906 struct ompi_win_t *win, struct ompi_request_t **request)
907 {
908 ompi_osc_pt2pt_request_t *pt2pt_request;
909 int ret;
910
911 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
912 "raccumulate: 0x%lx, %d, %s, %d, %d, %d, %s, %s, %s",
913 (unsigned long) origin_addr, origin_count,
914 origin_dt->name, target, (int) target_disp,
915 target_count, target_dt->name, op->o_name,
916 win->w_name));
917
918 OMPI_OSC_PT2PT_REQUEST_ALLOC(win, pt2pt_request);
919
920
921 if (0 == origin_count || 0 == target_count) {
922 ompi_osc_pt2pt_request_complete (pt2pt_request, MPI_SUCCESS);
923 *request = (ompi_request_t *) pt2pt_request;
924 return OMPI_SUCCESS;
925 }
926
927 pt2pt_request->type = OMPI_OSC_PT2PT_HDR_TYPE_ACC;
928
929 ret = ompi_osc_pt2pt_accumulate_w_req (origin_addr, origin_count, origin_dt, target,
930 target_disp, target_count, target_dt, op, win,
931 pt2pt_request);
932 if (OMPI_SUCCESS != ret) {
933 OMPI_OSC_PT2PT_REQUEST_RETURN(pt2pt_request);
934 return ret;
935 }
936
937 *request = (ompi_request_t *) pt2pt_request;
938
939 return OMPI_SUCCESS;
940 }
941
942
943 static inline
944 int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin_count,
945 struct ompi_datatype_t *origin_datatype,
946 void *result_addr, int result_count,
947 struct ompi_datatype_t *result_datatype,
948 int target_rank, MPI_Aint target_disp,
949 int target_count, struct ompi_datatype_t *target_datatype,
950 struct ompi_op_t *op, struct ompi_win_t *win,
951 bool release_req, struct ompi_request_t **request)
952 {
953 int ret;
954 ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
955 ompi_proc_t *proc = ompi_comm_peer_lookup(module->comm, target_rank);
956 bool is_long_datatype = false;
957 bool is_long_msg = false;
958 ompi_osc_pt2pt_frag_t *frag;
959 ompi_osc_pt2pt_header_acc_t *header;
960 ompi_osc_pt2pt_sync_t *pt2pt_sync;
961 size_t ddt_len, payload_len, frag_len;
962 char *ptr;
963 const void *packed_ddt;
964 int tag;
965 ompi_osc_pt2pt_request_t *pt2pt_request;
966
967 OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
968 "rget_acc: 0x%lx, %d, %s, 0x%lx, %d, %s, 0x%x, %d, %d, %s, %s, %s",
969 (unsigned long) origin_addr, origin_count, origin_datatype->name,
970 (unsigned long) result_addr, result_count, result_datatype->name,
971 target_rank, (int) target_disp, target_count, target_datatype->name,
972 op->o_name, win->w_name));
973
974 pt2pt_sync = ompi_osc_pt2pt_module_sync_lookup (module, target_rank, NULL);
975 if (OPAL_UNLIKELY(NULL == pt2pt_sync)) {
976 return OMPI_ERR_RMA_SYNC;
977 }
978
979
980 OMPI_OSC_PT2PT_REQUEST_ALLOC(win, pt2pt_request);
981
982 pt2pt_request->internal = release_req;
983
984
985 if (0 == result_count || 0 == target_count) {
986 ompi_osc_pt2pt_request_complete (pt2pt_request, MPI_SUCCESS);
987 *request = &pt2pt_request->super;
988 return OMPI_SUCCESS;
989 }
990
991 if (!release_req) {
992
993 ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
994 }
995
996
997 if (ompi_comm_rank (module->comm) == target_rank) {
998 *request = &pt2pt_request->super;
999 return ompi_osc_pt2pt_gacc_self (pt2pt_sync, origin_addr, origin_count, origin_datatype,
1000 result_addr, result_count, result_datatype,
1001 target_disp, target_count, target_datatype,
1002 op, module, pt2pt_request);
1003 }
1004
1005 pt2pt_request->type = OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC;
1006 pt2pt_request->origin_addr = origin_addr;
1007 pt2pt_request->origin_count = origin_count;
1008 OMPI_DATATYPE_RETAIN(origin_datatype);
1009 pt2pt_request->origin_dt = origin_datatype;
1010
1011
1012
1013 ddt_len = ompi_datatype_pack_description_length(target_datatype);
1014
1015 if (&ompi_mpi_op_no_op.op != op) {
1016 payload_len = origin_datatype->super.size * origin_count;
1017 } else {
1018 payload_len = 0;
1019 }
1020
1021 frag_len = sizeof(*header) + ddt_len + payload_len;
1022 ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, false, release_req);
1023 if (OMPI_SUCCESS != ret) {
1024 frag_len = sizeof(*header) + ddt_len;
1025 ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true, release_req);
1026 if (OMPI_SUCCESS != ret) {
1027
1028 frag_len = sizeof(*header) + 8;
1029 ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true, release_req);
1030 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1031 return OMPI_ERR_OUT_OF_RESOURCE;
1032 }
1033
1034 is_long_datatype = true;
1035 }
1036
1037 is_long_msg = true;
1038 }
1039
1040 tag = get_tag (module);
1041
1042
1043
1044 pt2pt_request->outstanding_requests = 1 + is_long_msg;
1045
1046
1047 ompi_osc_signal_outgoing (module, target_rank, pt2pt_request->outstanding_requests);
1048
1049 header = (ompi_osc_pt2pt_header_acc_t *) ptr;
1050 header->base.flags = 0;
1051 header->len = frag_len;
1052 header->count = target_count;
1053 header->displacement = target_disp;
1054 header->op = op->o_f_to_c_index;
1055 header->tag = tag;
1056
1057 ptr = (char *)(header + 1);
1058
1059 do {
1060 ret = ompi_datatype_get_pack_description(target_datatype, &packed_ddt);
1061 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1062 break;
1063 }
1064
1065 if (is_long_datatype) {
1066
1067 header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_LARGE_DATATYPE;
1068
1069 OMPI_DATATYPE_RETAIN(target_datatype);
1070
1071 ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE,
1072 target_rank, tag_to_target(tag), module->comm,
1073 ompi_osc_pt2pt_dt_send_complete, target_datatype);
1074 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1075 break;
1076 }
1077
1078 *((uint64_t *) ptr) = ddt_len;
1079 ptr += 8;
1080 } else {
1081 memcpy((unsigned char*) ptr, packed_ddt, ddt_len);
1082 ptr += ddt_len;
1083 }
1084
1085 ret = ompi_osc_pt2pt_irecv_w_cb (result_addr, result_count, result_datatype,
1086 target_rank, tag_to_origin(tag), module->comm,
1087 NULL, ompi_osc_pt2pt_req_comm_complete, pt2pt_request);
1088 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
1089 break;
1090 }
1091
1092 if (!is_long_msg) {
1093 header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC;
1094 osc_pt2pt_hton(header, proc);
1095
1096 if (&ompi_mpi_op_no_op.op != op) {
1097 osc_pt2pt_copy_for_send (ptr, payload_len, origin_addr, proc, origin_count,
1098 origin_datatype);
1099 }
1100 } else {
1101 header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG;
1102 osc_pt2pt_hton(header, proc);
1103
1104 ret = ompi_osc_pt2pt_isend_w_cb (origin_addr, origin_count, origin_datatype,
1105 target_rank, tag_to_target(tag), module->comm,
1106 ompi_osc_pt2pt_req_comm_complete, pt2pt_request);
1107 }
1108 } while (0);
1109
1110 if (OMPI_SUCCESS == ret) {
1111 header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID;
1112 *request = (ompi_request_t *) pt2pt_request;
1113 }
1114
1115 return ompi_osc_pt2pt_frag_finish(module, frag);
1116 }
1117
1118 int ompi_osc_pt2pt_get_accumulate(const void *origin_addr, int origin_count,
1119 struct ompi_datatype_t *origin_dt,
1120 void *result_addr, int result_count,
1121 struct ompi_datatype_t *result_dt,
1122 int target, MPI_Aint target_disp,
1123 int target_count, struct ompi_datatype_t *target_dt,
1124 struct ompi_op_t *op, struct ompi_win_t *win)
1125 {
1126 ompi_request_t *request;
1127
1128 return ompi_osc_pt2pt_rget_accumulate_internal (origin_addr, origin_count, origin_dt,
1129 result_addr, result_count, result_dt,
1130 target, target_disp, target_count,
1131 target_dt, op, win, true, &request);
1132 }
1133
1134
1135 int ompi_osc_pt2pt_rget_accumulate(const void *origin_addr, int origin_count,
1136 struct ompi_datatype_t *origin_dt,
1137 void *result_addr, int result_count,
1138 struct ompi_datatype_t *result_dt,
1139 int target, MPI_Aint target_disp,
1140 int target_count, struct ompi_datatype_t *target_dt,
1141 struct ompi_op_t *op, struct ompi_win_t *win,
1142 ompi_request_t **request)
1143 {
1144 return ompi_osc_pt2pt_rget_accumulate_internal (origin_addr, origin_count, origin_dt,
1145 result_addr, result_count, result_dt,
1146 target, target_disp, target_count,
1147 target_dt, op, win, false, request);
1148 }