This source file includes the following definitions.
- grequestx_progress
- ompi_grequestx_start
- ompi_grequestx_class_create
- ompi_grequestx_class_allocate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 #include "ompi_config.h"
24 #include "ompi/communicator/communicator.h"
25 #include "ompi/request/grequest.h"
26 #include "ompi/mpi/fortran/base/fint_2_int.h"
27 #include "ompi/request/grequestx.h"
28
29 static bool requests_initialized = false;
30 static opal_list_t requests;
31 static opal_atomic_int32_t active_requests = 0;
32 static bool in_progress = false;
33 static opal_mutex_t lock;
34
35 static int grequestx_progress(void) {
36 ompi_grequest_t *request, *next;
37
38 OPAL_THREAD_LOCK(&lock);
39 if (!in_progress) {
40 in_progress = true;
41
42 OPAL_LIST_FOREACH_SAFE(request, next, &requests, ompi_grequest_t) {
43 MPI_Status status;
44 OPAL_THREAD_UNLOCK(&lock);
45 request->greq_poll.c_poll(request->greq_state, &status);
46 if (REQUEST_COMPLETE(&request->greq_base)) {
47 OPAL_THREAD_LOCK(&lock);
48 opal_list_remove_item(&requests, &request->greq_base.super.super);
49 OPAL_THREAD_UNLOCK(&lock);
50 }
51 OPAL_THREAD_LOCK(&lock);
52 }
53 }
54 OPAL_THREAD_UNLOCK(&lock);
55
56 return OMPI_SUCCESS;
57 }
58
59 int ompi_grequestx_start(
60 MPI_Grequest_query_function *gquery_fn,
61 MPI_Grequest_free_function *gfree_fn,
62 MPI_Grequest_cancel_function *gcancel_fn,
63 ompi_grequestx_poll_function *gpoll_fn,
64 void* extra_state,
65 ompi_request_t** request)
66 {
67 int rc;
68
69 rc = ompi_grequest_start(gquery_fn, gfree_fn, gcancel_fn, extra_state, request);
70 if (OMPI_SUCCESS != rc) {
71 return rc;
72 }
73 ((ompi_grequest_t *)*request)->greq_poll.c_poll = gpoll_fn;
74
75 if (!requests_initialized) {
76 OBJ_CONSTRUCT(&requests, opal_list_t);
77 OBJ_CONSTRUCT(&lock, opal_mutex_t);
78 requests_initialized = true;
79 }
80
81 OPAL_THREAD_LOCK(&lock);
82 opal_list_append(&requests, &((*request)->super.super));
83 OPAL_THREAD_UNLOCK(&lock);
84 int32_t tmp = OPAL_THREAD_ADD_FETCH32(&active_requests, 1);
85 if (1 == tmp) {
86 opal_progress_register(grequestx_progress);
87 }
88
89 return OMPI_SUCCESS;
90 }
91
92
93 struct grequestx_class {
94 opal_object_t super;
95 MPI_Grequest_query_function *gquery_fn;
96 MPI_Grequest_free_function *gfree_fn;
97 MPI_Grequest_cancel_function *gcancel_fn;
98 ompi_grequestx_poll_function *gpoll_fn;
99 ompi_grequestx_wait_function *gwait_fn;
100 } ;
101
102 typedef struct grequestx_class grequestx_class;
103
104 static int next_class = 0;
105
106 static OBJ_CLASS_INSTANCE(grequestx_class, opal_object_t, NULL, NULL);
107
108 static opal_pointer_array_t classes;
109
110 int ompi_grequestx_class_create(
111 MPI_Grequest_query_function *gquery_fn,
112 MPI_Grequest_free_function *gfree_fn,
113 MPI_Grequest_cancel_function *gcancel_fn,
114 ompi_grequestx_poll_function *gpoll_fn,
115 ompi_grequestx_wait_function *gwait_fn,
116 ompi_grequestx_class *greq_class)
117 {
118 grequestx_class * class = OBJ_NEW(grequestx_class);
119 class->gquery_fn = gquery_fn;
120 class->gfree_fn = gfree_fn;
121 class->gcancel_fn = gcancel_fn;
122 class->gpoll_fn = gpoll_fn;
123 class->gwait_fn = gwait_fn;
124
125 if (0 == next_class) {
126 OBJ_CONSTRUCT(&classes, opal_pointer_array_t);
127 }
128 opal_pointer_array_add(&classes, class);
129 next_class ++;
130
131 return OMPI_SUCCESS;
132 }
133
134 int ompi_grequestx_class_allocate(
135 ompi_grequestx_class greq_class,
136 void *extra_state,
137 ompi_request_t **request)
138 {
139 grequestx_class *class = opal_pointer_array_get_item(&classes, greq_class);
140 ompi_grequestx_start(class->gquery_fn, class->gfree_fn, class->gcancel_fn, class->gpoll_fn, extra_state, request);
141
142 return OMPI_SUCCESS;
143 }