This source file includes the following definitions.
- jtrk_cons
- jtrk_des
- init
- orte_ras_slurm_allocate
- deallocate
- orte_ras_slurm_finalize
- orte_ras_slurm_discover
- orte_ras_slurm_parse_ranges
- orte_ras_slurm_parse_range
- timeout
- recv_data
- dyn_allocate
- parse_alloc_msg
- get_node_list
- read_ip_port
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 #include "orte_config.h"
27 #include "orte/constants.h"
28 #include "orte/types.h"
29
30 #include <netdb.h>
31 #include <unistd.h>
32 #include <string.h>
33 #include <ctype.h>
34 #include <sys/types.h>
35 #include <sys/socket.h>
36 #ifdef HAVE_NETINET_IN_H
37 #include <netinet/in.h>
38 #endif
39 #ifdef HAVE_ARPA_INET_H
40 #include <arpa/inet.h>
41 #endif
42 #include <fcntl.h>
43 #include <stdlib.h>
44 #include <string.h>
45
46 #include "opal/util/argv.h"
47 #include "opal/util/net.h"
48 #include "opal/util/output.h"
49 #include "opal/opal_socket_errno.h"
50
51 #include "orte/util/show_help.h"
52 #include "orte/mca/errmgr/errmgr.h"
53 #include "orte/mca/rmaps/base/base.h"
54 #include "orte/mca/state/state.h"
55 #include "orte/util/name_fns.h"
56 #include "orte/runtime/orte_globals.h"
57
58 #include "orte/mca/ras/base/ras_private.h"
59 #include "ras_slurm.h"
60
61 #define ORTE_SLURM_DYN_MAX_SIZE 256
62
63
64
65
66 static int init(void);
67 static int orte_ras_slurm_allocate(orte_job_t *jdata, opal_list_t *nodes);
68 static void deallocate(orte_job_t *jdata,
69 orte_app_context_t *app);
70 static int orte_ras_slurm_finalize(void);
71
72
73
74
75 orte_ras_base_module_t orte_ras_slurm_module = {
76 init,
77 orte_ras_slurm_allocate,
78 deallocate,
79 orte_ras_slurm_finalize
80 };
81
82
83 static int orte_ras_slurm_discover(char *regexp, char* tasks_per_node,
84 opal_list_t *nodelist);
85 static int orte_ras_slurm_parse_ranges(char *base, char *ranges, char ***nodelist);
86 static int orte_ras_slurm_parse_range(char *base, char *range, char ***nodelist);
87
88 static int dyn_allocate(orte_job_t *jdata);
89 static char* get_node_list(orte_app_context_t *app);
90 static int parse_alloc_msg(char *msg, int *idx, int *sjob,
91 char **nodelist, char **tpn);
92
93 static void recv_data(int fd, short args, void *cbdata);
94 static void timeout(int fd, short args, void *cbdata);
95 static int read_ip_port(char *filename, char **ip, uint16_t *port);
96
97
98
99 typedef struct {
100 opal_object_t super;
101 int sjob;
102 } local_apptracker_t;
103 OBJ_CLASS_INSTANCE(local_apptracker_t,
104 opal_object_t,
105 NULL, NULL);
106
107 typedef struct {
108 opal_list_item_t super;
109 char *cmd;
110 opal_event_t timeout_ev;
111 orte_jobid_t jobid;
112 opal_pointer_array_t apps;
113 int napps;
114 } local_jobtracker_t;
115 static void jtrk_cons(local_jobtracker_t *ptr)
116 {
117 ptr->cmd = NULL;
118 OBJ_CONSTRUCT(&ptr->apps, opal_pointer_array_t);
119 opal_pointer_array_init(&ptr->apps, 1, INT_MAX, 1);
120 ptr->napps = 0;
121 }
122 static void jtrk_des(local_jobtracker_t *ptr)
123 {
124 int i;
125 local_apptracker_t *ap;
126
127 if (NULL != ptr->cmd) {
128 free(ptr->cmd);
129 }
130 for (i=0; i < ptr->apps.size; i++) {
131 if (NULL != (ap = (local_apptracker_t*)opal_pointer_array_get_item(&ptr->apps, i))) {
132 OBJ_RELEASE(ap);
133 }
134 }
135 OBJ_DESTRUCT(&ptr->apps);
136 }
137 OBJ_CLASS_INSTANCE(local_jobtracker_t,
138 opal_list_item_t,
139 jtrk_cons, jtrk_des);
140
141
142 static int socket_fd;
143 static opal_list_t jobs;
144 static opal_event_t recv_ev;
145
146
147 static int init(void)
148 {
149 char *slurm_host=NULL;
150 uint16_t port=0;
151 struct sockaddr_in address;
152 int flags;
153 struct hostent *h;
154
155 if (mca_ras_slurm_component.dyn_alloc_enabled) {
156 if (NULL == mca_ras_slurm_component.config_file) {
157 orte_show_help("help-ras-slurm.txt", "dyn-alloc-no-config", true);
158 return ORTE_ERR_SILENT;
159 }
160
161 if (ORTE_SUCCESS != read_ip_port(mca_ras_slurm_component.config_file,
162 &slurm_host, &port) ||
163 NULL == slurm_host || 0 == port) {
164 if (NULL != slurm_host) {
165 free(slurm_host);
166 }
167 return ORTE_ERR_SILENT;
168 }
169 OPAL_OUTPUT_VERBOSE((2, orte_ras_base_framework.framework_output,
170 "ras:slurm got [ ip = %s, port = %u ] from %s\n",
171 slurm_host, port, mca_ras_slurm_component.config_file));
172
173
174 if ((socket_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
175 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
176 free(slurm_host);
177 return ORTE_ERR_OUT_OF_RESOURCE;
178 }
179
180
181 bzero(&address, sizeof(address));
182 address.sin_family = AF_INET;
183 if (!opal_net_isaddr(slurm_host)) {
184
185
186
187 if (NULL == (h = gethostbyname(slurm_host))) {
188
189 orte_show_help("help-ras-slurm.txt", "host-not-resolved",
190 true, slurm_host);
191 free(slurm_host);
192 return ORTE_ERR_SILENT;
193 }
194 free(slurm_host);
195 slurm_host = strdup(inet_ntoa(*(struct in_addr*)h->h_addr_list[0]));
196 }
197 address.sin_addr.s_addr = inet_addr(slurm_host);
198 address.sin_port = htons(port);
199 if (connect(socket_fd, (struct sockaddr*)&address, sizeof(address)) < 0) {
200 orte_show_help("help-ras-slurm.txt", "connection-failed",
201 true, slurm_host, (int)port);
202 free(slurm_host);
203 return ORTE_ERR_SILENT;
204 }
205 free(slurm_host);
206
207
208 if ((flags = fcntl(socket_fd, F_GETFL, 0)) < 0) {
209 opal_output(0, "ras:slurm:dyn: fcntl(F_GETFL) failed: %s (%d)",
210 strerror(opal_socket_errno), opal_socket_errno);
211 return ORTE_ERROR;
212 } else {
213 flags |= O_NONBLOCK;
214 if (fcntl(socket_fd, F_SETFL, flags) < 0) {
215 opal_output(0, "ras:slurm:dyn: fcntl(F_SETFL) failed: %s (%d)",
216 strerror(opal_socket_errno), opal_socket_errno);
217 return ORTE_ERROR;
218 }
219 }
220
221
222 opal_event_set(orte_event_base, &recv_ev, socket_fd,
223 OPAL_EV_READ, recv_data, NULL);
224 opal_event_add(&recv_ev, 0);
225
226
227 OBJ_CONSTRUCT(&jobs, opal_list_t);
228 }
229 return ORTE_SUCCESS;
230 }
231
232
233
234
235
236
237 static int orte_ras_slurm_allocate(orte_job_t *jdata, opal_list_t *nodes)
238 {
239 int ret, cpus_per_task;
240 char *slurm_node_str, *regexp;
241 char *tasks_per_node, *node_tasks;
242 char *tmp;
243 char *slurm_jobid;
244
245 if (NULL == (slurm_jobid = getenv("SLURM_JOBID"))) {
246
247
248
249 if (!mca_ras_slurm_component.dyn_alloc_enabled) {
250
251 opal_output_verbose(2, orte_ras_base_framework.framework_output,
252 "%s ras:slurm: no prior allocation and dynamic alloc disabled",
253 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
254 return ORTE_ERR_TAKE_NEXT_OPTION;
255 }
256 } else {
257
258
259
260 orte_job_ident = strdup(slurm_jobid);
261 }
262
263 slurm_node_str = getenv("SLURM_NODELIST");
264 if (NULL == slurm_node_str) {
265
266 if (mca_ras_slurm_component.dyn_alloc_enabled) {
267
268
269
270
271 ret = dyn_allocate(jdata);
272
273
274
275 return ret;
276 }
277 orte_show_help("help-ras-slurm.txt", "slurm-env-var-not-found", 1,
278 "SLURM_NODELIST");
279 return ORTE_ERR_NOT_FOUND;
280 }
281 regexp = strdup(slurm_node_str);
282 if(NULL == regexp) {
283 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
284 return ORTE_ERR_OUT_OF_RESOURCE;
285 }
286
287 if (mca_ras_slurm_component.use_all) {
288
289
290
291
292
293
294 tasks_per_node = getenv("SLURM_JOB_CPUS_PER_NODE");
295 if (NULL == tasks_per_node) {
296
297 orte_show_help("help-ras-slurm.txt", "slurm-env-var-not-found", 1,
298 "SLURM_JOB_CPUS_PER_NODE");
299 free(regexp);
300 return ORTE_ERR_NOT_FOUND;
301 }
302 node_tasks = strdup(tasks_per_node);
303 if (NULL == node_tasks) {
304 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
305 free(regexp);
306 return ORTE_ERR_OUT_OF_RESOURCE;
307 }
308 cpus_per_task = 1;
309 } else {
310
311 tasks_per_node = getenv("SLURM_TASKS_PER_NODE");
312 if (NULL == tasks_per_node) {
313
314 orte_show_help("help-ras-slurm.txt", "slurm-env-var-not-found", 1,
315 "SLURM_TASKS_PER_NODE");
316 free(regexp);
317 return ORTE_ERR_NOT_FOUND;
318 }
319 node_tasks = strdup(tasks_per_node);
320 if (NULL == node_tasks) {
321 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
322 free(regexp);
323 return ORTE_ERR_OUT_OF_RESOURCE;
324 }
325
326
327 tmp = getenv("SLURM_CPUS_PER_TASK");
328 if(NULL != tmp) {
329 cpus_per_task = atoi(tmp);
330 if(0 >= cpus_per_task) {
331 opal_output(0, "ras:slurm:allocate: Got bad value from SLURM_CPUS_PER_TASK. "
332 "Variable was: %s\n", tmp);
333 ORTE_ERROR_LOG(ORTE_ERROR);
334 free(node_tasks);
335 free(regexp);
336 return ORTE_ERROR;
337 }
338 } else {
339 cpus_per_task = 1;
340 }
341 }
342
343 ret = orte_ras_slurm_discover(regexp, node_tasks, nodes);
344 free(regexp);
345 free(node_tasks);
346 if (ORTE_SUCCESS != ret) {
347 OPAL_OUTPUT_VERBOSE((1, orte_ras_base_framework.framework_output,
348 "%s ras:slurm:allocate: discover failed!",
349 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
350 return ret;
351 }
352
353 orte_num_allocated_nodes = opal_list_get_size(nodes);
354
355
356
357 OPAL_OUTPUT_VERBOSE((1, orte_ras_base_framework.framework_output,
358 "%s ras:slurm:allocate: success",
359 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
360 return ORTE_SUCCESS;
361 }
362
363 static void deallocate(orte_job_t *jdata,
364 orte_app_context_t *app)
365 {
366 }
367
368 static int orte_ras_slurm_finalize(void)
369 {
370 opal_list_item_t *item;
371
372 if (mca_ras_slurm_component.dyn_alloc_enabled) {
373
374 opal_event_del(&recv_ev);
375 while (NULL != (item = opal_list_remove_first(&jobs))) {
376 OBJ_RELEASE(item);
377 }
378 OBJ_DESTRUCT(&jobs);
379
380 shutdown(socket_fd, 2);
381 close(socket_fd);
382 }
383 return ORTE_SUCCESS;
384 }
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402 static int orte_ras_slurm_discover(char *regexp, char *tasks_per_node,
403 opal_list_t* nodelist)
404 {
405 int i, j, len, ret, count, reps, num_nodes;
406 char *base, **names = NULL;
407 char *begptr, *endptr, *orig;
408 int *slots;
409 bool found_range = false;
410 bool more_to_come = false;
411 char *ptr;
412
413 orig = base = strdup(regexp);
414 if (NULL == base) {
415 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
416 return ORTE_ERR_OUT_OF_RESOURCE;
417 }
418
419 OPAL_OUTPUT_VERBOSE((1, orte_ras_base_framework.framework_output,
420 "%s ras:slurm:allocate:discover: checking nodelist: %s",
421 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
422 regexp));
423
424 do {
425
426 len = strlen(base);
427 for (i = 0; i <= len; ++i) {
428 if (base[i] == '[') {
429
430 base[i] = '\0';
431 found_range = true;
432 break;
433 }
434 if (base[i] == ',') {
435
436 base[i] = '\0';
437 found_range = false;
438 more_to_come = true;
439 break;
440 }
441 if (base[i] == '\0') {
442
443 found_range = false;
444 more_to_come = false;
445 break;
446 }
447 }
448 if(i == 0) {
449
450 orte_show_help("help-ras-slurm.txt", "slurm-env-var-bad-value",
451 1, regexp, tasks_per_node, "SLURM_NODELIST");
452 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
453 free(orig);
454 return ORTE_ERR_BAD_PARAM;
455 }
456
457 if (found_range) {
458
459 for (j = i; j < len; ++j) {
460 if (base[j] == ']') {
461 base[j] = '\0';
462 break;
463 }
464 }
465 if (j >= len) {
466
467 orte_show_help("help-ras-slurm.txt", "slurm-env-var-bad-value",
468 1, regexp, tasks_per_node, "SLURM_NODELIST");
469 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
470 free(orig);
471 return ORTE_ERR_BAD_PARAM;
472 }
473
474 ret = orte_ras_slurm_parse_ranges(base, base + i + 1, &names);
475 if(ORTE_SUCCESS != ret) {
476 orte_show_help("help-ras-slurm.txt", "slurm-env-var-bad-value",
477 1, regexp, tasks_per_node, "SLURM_NODELIST");
478 ORTE_ERROR_LOG(ret);
479 free(orig);
480 return ret;
481 }
482 if(base[j + 1] == ',') {
483 more_to_come = true;
484 base = &base[j + 2];
485 } else {
486 more_to_come = false;
487 }
488 } else {
489
490
491 OPAL_OUTPUT_VERBOSE((1, orte_ras_base_framework.framework_output,
492 "%s ras:slurm:allocate:discover: found node %s",
493 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
494 base));
495
496 if(ORTE_SUCCESS != (ret = opal_argv_append_nosize(&names, base))) {
497 ORTE_ERROR_LOG(ret);
498 free(orig);
499 return ret;
500 }
501
502 base = &base[i + 1];
503 }
504 } while(more_to_come);
505
506 free(orig);
507
508 num_nodes = opal_argv_count(names);
509
510
511
512 slots = malloc(sizeof(int) * num_nodes);
513 if (NULL == slots) {
514 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
515 return ORTE_ERR_OUT_OF_RESOURCE;
516 }
517 memset(slots, 0, sizeof(int) * num_nodes);
518
519 orig = begptr = strdup(tasks_per_node);
520 if (NULL == begptr) {
521 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
522 free(slots);
523 return ORTE_ERR_OUT_OF_RESOURCE;
524 }
525
526 j = 0;
527 while (begptr) {
528 count = strtol(begptr, &endptr, 10);
529 if ((endptr[0] == '(') && (endptr[1] == 'x')) {
530 reps = strtol((endptr+2), &endptr, 10);
531 if (endptr[0] == ')') {
532 endptr++;
533 }
534 } else {
535 reps = 1;
536 }
537
538
539
540
541
542
543
544
545
546
547
548
549
550 for (i = 0; i < reps && j < num_nodes; i++) {
551 slots[j++] = count;
552 }
553
554 if (*endptr == ',') {
555 begptr = endptr + 1;
556 } else if (*endptr == '\0' || j >= num_nodes) {
557 break;
558 } else {
559 orte_show_help("help-ras-slurm.txt", "slurm-env-var-bad-value", 1,
560 regexp, tasks_per_node, "SLURM_TASKS_PER_NODE");
561 ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
562 free(slots);
563 free(orig);
564 return ORTE_ERR_BAD_PARAM;
565 }
566 }
567
568 free(orig);
569
570
571
572 for (i = 0; NULL != names && NULL != names[i]; ++i) {
573 orte_node_t *node;
574
575 if( !orte_keep_fqdn_hostnames && !opal_net_isaddr(names[i]) ) {
576 if (NULL != (ptr = strchr(names[i], '.'))) {
577 *ptr = '\0';
578 }
579 }
580
581 OPAL_OUTPUT_VERBOSE((1, orte_ras_base_framework.framework_output,
582 "%s ras:slurm:allocate:discover: adding node %s (%d slot%s)",
583 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
584 names[i], slots[i], (1 == slots[i]) ? "" : "s"));
585
586 node = OBJ_NEW(orte_node_t);
587 if (NULL == node) {
588 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
589 free(slots);
590 return ORTE_ERR_OUT_OF_RESOURCE;
591 }
592 node->name = strdup(names[i]);
593 node->state = ORTE_NODE_STATE_UP;
594 node->slots_inuse = 0;
595 node->slots_max = 0;
596 node->slots = slots[i];
597 opal_list_append(nodelist, &node->super);
598 }
599 free(slots);
600 opal_argv_free(names);
601
602
603 return ret;
604 }
605
606
607
608
609
610
611
612
613
614
615 static int orte_ras_slurm_parse_ranges(char *base, char *ranges, char ***names)
616 {
617 int i, len, ret;
618 char *start, *orig;
619
620
621
622 len = strlen(ranges);
623 for (orig = start = ranges, i = 0; i < len; ++i) {
624 if (',' == ranges[i]) {
625 ranges[i] = '\0';
626 ret = orte_ras_slurm_parse_range(base, start, names);
627 if (ORTE_SUCCESS != ret) {
628 ORTE_ERROR_LOG(ret);
629 return ret;
630 }
631 start = ranges + i + 1;
632 }
633 }
634
635
636
637 if (start < orig + len) {
638
639 OPAL_OUTPUT_VERBOSE((1, orte_ras_base_framework.framework_output,
640 "%s ras:slurm:allocate:discover: parse range %s (2)",
641 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
642 start));
643
644 ret = orte_ras_slurm_parse_range(base, start, names);
645 if (ORTE_SUCCESS != ret) {
646 ORTE_ERROR_LOG(ret);
647 return ret;
648 }
649 }
650
651
652 return ORTE_SUCCESS;
653 }
654
655
656
657
658
659
660
661
662
663
664 static int orte_ras_slurm_parse_range(char *base, char *range, char ***names)
665 {
666 char *str, temp1[BUFSIZ];
667 size_t i, j, start, end;
668 size_t base_len, len, num_len;
669 size_t num_str_len;
670 bool found;
671 int ret;
672
673 len = strlen(range);
674 base_len = strlen(base);
675
676
677 start = end = 0;
678
679
680
681 for (found = false, i = 0; i < len; ++i) {
682 if (isdigit((int) range[i])) {
683 if (!found) {
684 start = atoi(range + i);
685 found = true;
686 break;
687 }
688 }
689 }
690 if (!found) {
691 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
692 return ORTE_ERR_NOT_FOUND;
693 }
694
695
696
697 for (found = false, num_str_len = 0; i < len; ++i, ++num_str_len) {
698 if (!isdigit((int) range[i])) {
699 break;
700 }
701 }
702
703
704
705 if (i >= len) {
706 end = start;
707 found = true;
708 }
709
710
711
712
713 else {
714 for (; i < len; ++i) {
715 if (isdigit((int) range[i])) {
716 end = atoi(range + i);
717 found = true;
718 break;
719 }
720 }
721 }
722 if (!found) {
723 ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
724 return ORTE_ERR_NOT_FOUND;
725 }
726
727
728
729 len = base_len + num_str_len + 32;
730 str = malloc(len);
731 if (NULL == str) {
732 ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
733 return ORTE_ERR_OUT_OF_RESOURCE;
734 }
735 strcpy(str, base);
736 for (i = start; i <= end; ++i) {
737 str[base_len] = '\0';
738 snprintf(temp1, BUFSIZ - 1, "%lu", (long) i);
739
740
741
742 if ((num_len = strlen(temp1)) < num_str_len) {
743 for (j = base_len; j < base_len + (num_str_len - num_len); ++j) {
744 str[j] = '0';
745 }
746 str[j] = '\0';
747 }
748 strcat(str, temp1);
749 ret = opal_argv_append_nosize(names, str);
750 if(ORTE_SUCCESS != ret) {
751 ORTE_ERROR_LOG(ret);
752 free(str);
753 return ret;
754 }
755 }
756 free(str);
757
758
759 return ORTE_SUCCESS;
760 }
761
762 static void timeout(int fd, short args, void *cbdata)
763 {
764 local_jobtracker_t *jtrk = (local_jobtracker_t*)cbdata;
765 orte_job_t *jdata;
766
767 orte_show_help("help-ras-slurm.txt", "slurm-dyn-alloc-timeout", true);
768 opal_output_verbose(2, orte_ras_base_framework.framework_output,
769 "%s Timed out on dynamic allocation",
770 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
771
772 jdata = orte_get_job_data_object(jtrk->jobid);
773 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOC_FAILED);
774 }
775
776 static void recv_data(int fd, short args, void *cbdata)
777 {
778 bool found;
779 int i, rc;
780 orte_node_t *nd, *nd2;
781 opal_list_t nds, ndtmp;
782 opal_list_item_t *item, *itm;
783 char recv_msg[8192];
784 int nbytes, idx, sjob;
785 char **alloc, *nodelist, *tpn;
786 local_jobtracker_t *ptr, *jtrk;
787 local_apptracker_t *aptrk;
788 orte_app_context_t *app;
789 orte_jobid_t jobid;
790 orte_job_t *jdata;
791 char **dash_host = NULL;
792
793 opal_output_verbose(2, orte_ras_base_framework.framework_output,
794 "%s ras:slurm: dynamic allocation - data recvd",
795 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
796
797
798
799
800 memset(recv_msg, 0, sizeof(recv_msg));
801 nbytes = read(fd, recv_msg, sizeof(recv_msg) - 1);
802
803 opal_output_verbose(2, orte_ras_base_framework.framework_output,
804 "%s ras:slurm: dynamic allocation msg: %s",
805 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), recv_msg);
806
807
808 if (0 == nbytes || 0 == strlen(recv_msg) || strstr(recv_msg, "failure") != NULL) {
809
810
811
812 orte_show_help("help-ras-slurm.txt", "slurm-dyn-alloc-failed", true,
813 (0 == strlen(recv_msg)) ? "NO MSG" : recv_msg);
814 ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALLOC_FAILED);
815 return;
816 }
817
818
819 alloc = opal_argv_split(recv_msg, ':');
820
821
822 tpn = strchr(alloc[0], '=');
823 orte_util_convert_string_to_jobid(&jobid, tpn+1);
824
825 jdata = orte_get_job_data_object(jobid);
826 jtrk = NULL;
827
828 for (item = opal_list_get_first(&jobs);
829 item != opal_list_get_end(&jobs);
830 item = opal_list_get_next(item)) {
831 ptr = (local_jobtracker_t*)item;
832 if (ptr->jobid == jobid) {
833 jtrk = ptr;
834 break;
835 }
836 }
837 if (NULL == jtrk) {
838 orte_show_help("help-ras-slurm.txt", "slurm-dyn-alloc-failed", true, "NO JOB TRACKER");
839 ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALLOC_FAILED);
840 opal_argv_free(alloc);
841 return;
842 }
843
844
845 opal_event_del(&jtrk->timeout_ev);
846
847
848
849
850 OBJ_CONSTRUCT(&nds, opal_list_t);
851 OBJ_CONSTRUCT(&ndtmp, opal_list_t);
852 idx = -1;
853 sjob = -1;
854 nodelist = NULL;
855 tpn = NULL;
856 for (i=1; NULL != alloc[i]; i++) {
857 if (ORTE_SUCCESS != parse_alloc_msg(alloc[i], &idx, &sjob, &nodelist, &tpn)) {
858 orte_show_help("help-ras-slurm.txt", "slurm-dyn-alloc-failed", true, jtrk->cmd);
859 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOC_FAILED);
860 opal_argv_free(alloc);
861 if (NULL != nodelist) {
862 free(nodelist);
863 }
864 if (NULL != tpn) {
865 free(tpn);
866 }
867 return;
868 }
869 if (idx < 0) {
870 orte_show_help("help-ras-slurm.txt", "slurm-dyn-alloc-failed", true, jtrk->cmd);
871 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOC_FAILED);
872 opal_argv_free(alloc);
873 free(nodelist);
874 free(tpn);
875 return;
876 }
877 if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, idx))) {
878 orte_show_help("help-ras-slurm.txt", "slurm-dyn-alloc-failed", true, jtrk->cmd);
879 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOC_FAILED);
880 opal_argv_free(alloc);
881 free(nodelist);
882 free(tpn);
883 return;
884 }
885
886 orte_remove_attribute(&app->attributes, ORTE_APP_DASH_HOST);
887
888 if (NULL == (aptrk = (local_apptracker_t*)opal_pointer_array_get_item(&jtrk->apps, idx))) {
889 aptrk = OBJ_NEW(local_apptracker_t);
890 opal_pointer_array_set_item(&jtrk->apps, idx, aptrk);
891 }
892 aptrk->sjob = sjob;
893
894 if (ORTE_SUCCESS != (rc = orte_ras_slurm_discover(nodelist, tpn, &ndtmp))) {
895 ORTE_ERROR_LOG(rc);
896 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOC_FAILED);
897 opal_argv_free(alloc);
898 free(nodelist);
899 free(tpn);
900 return;
901 }
902
903
904
905 while (NULL != (item = opal_list_remove_first(&ndtmp))) {
906 nd = (orte_node_t*)item;
907 opal_argv_append_nosize(&dash_host, nd->name);
908
909 found = false;
910 for (itm = opal_list_get_first(&nds);
911 itm != opal_list_get_end(&nds);
912 itm = opal_list_get_next(itm)) {
913 nd2 = (orte_node_t*)itm;
914 if (0 == strcmp(nd->name, nd2->name)) {
915 found = true;
916 nd2->slots += nd->slots;
917 OBJ_RELEASE(item);
918 break;
919 }
920 }
921 if (!found) {
922
923 opal_list_append(&nds, item);
924 }
925 }
926
927 free(nodelist);
928 free(tpn);
929 }
930
931 opal_argv_free(alloc);
932 OBJ_DESTRUCT(&ndtmp);
933 if (NULL != dash_host) {
934 tpn = opal_argv_join(dash_host, ',');
935 for (idx=0; idx < jdata->apps->size; idx++) {
936 if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, idx))) {
937 orte_show_help("help-ras-slurm.txt", "slurm-dyn-alloc-failed", true, jtrk->cmd);
938 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOC_FAILED);
939 opal_argv_free(dash_host);
940 free(tpn);
941 return;
942 }
943 orte_set_attribute(&app->attributes, ORTE_APP_DASH_HOST, ORTE_ATTR_LOCAL, (void*)tpn, OPAL_STRING);
944 }
945 opal_argv_free(dash_host);
946 free(tpn);
947 }
948
949 if (opal_list_is_empty(&nds)) {
950
951
952
953
954
955
956
957 OBJ_DESTRUCT(&nds);
958 orte_show_help("help-ras-base.txt", "ras-base:no-allocation", true);
959 ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
960 }
961
962
963 if (ORTE_SUCCESS != (rc = orte_ras_base_node_insert(&nds, jdata))) {
964 ORTE_ERROR_LOG(rc);
965 OBJ_DESTRUCT(&nds);
966 ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
967 return;
968 }
969 OBJ_DESTRUCT(&nds);
970
971
972 if (!(ORTE_MAPPING_SUBSCRIBE_GIVEN & ORTE_GET_MAPPING_DIRECTIVE(orte_rmaps_base.mapping))) {
973 ORTE_SET_MAPPING_DIRECTIVE(orte_rmaps_base.mapping, ORTE_MAPPING_NO_OVERSUBSCRIBE);
974 }
975
976 orte_managed_allocation = true;
977
978 ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_ALLOCATION_COMPLETE);
979
980 return;
981 }
982
983
984
985
986
987 static int dyn_allocate(orte_job_t *jdata)
988 {
989 char *cmd_str, **cmd=NULL, *tmp, *jstring;
990 char *node_list;
991 orte_app_context_t *app;
992 int i;
993 struct timeval tv;
994 local_jobtracker_t *jtrk;
995 int64_t i64, *i64ptr;
996
997 if (NULL == mca_ras_slurm_component.config_file) {
998 opal_output(0, "Cannot perform dynamic allocation as no Slurm configuration file provided");
999 return ORTE_ERR_NOT_FOUND;
1000 }
1001
1002
1003 jtrk = OBJ_NEW(local_jobtracker_t);
1004 jtrk->jobid = jdata->jobid;
1005 opal_list_append(&jobs, &jtrk->super);
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017 opal_argv_append_nosize(&cmd, "allocate");
1018
1019 orte_util_convert_jobid_to_string(&jstring, jdata->jobid);
1020 opal_asprintf(&tmp, "jobid=%s", jstring);
1021 opal_argv_append_nosize(&cmd, tmp);
1022 free(tmp);
1023 free(jstring);
1024
1025
1026
1027
1028
1029
1030 #if 0
1031 if (!mca_ras_slurm_component.rolling_alloc) {
1032 opal_argv_append_nosize(&cmd, "return=all");
1033 }
1034 #else
1035 opal_argv_append_nosize(&cmd, "return=all");
1036 #endif
1037
1038
1039 opal_asprintf(&tmp, "timeout=%d", mca_ras_slurm_component.timeout);
1040 opal_argv_append_nosize(&cmd, tmp);
1041 free(tmp);
1042
1043
1044 i64ptr = &i64;
1045 for (i=0; i < jdata->apps->size; i++) {
1046 if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(jdata->apps, i))) {
1047 continue;
1048 }
1049
1050 opal_asprintf(&tmp, ": app=%d", (int)app->idx);
1051 opal_argv_append_nosize(&cmd, tmp);
1052 free(tmp);
1053
1054 opal_asprintf(&tmp, "np=%d", app->num_procs);
1055 opal_argv_append_nosize(&cmd, tmp);
1056 free(tmp);
1057
1058 if (orte_get_attribute(&app->attributes, ORTE_APP_MIN_NODES, (void**)&i64ptr, OPAL_INT64)) {
1059 opal_asprintf(&tmp, "N=%ld", (long int)i64);
1060 opal_argv_append_nosize(&cmd, tmp);
1061 free(tmp);
1062 }
1063
1064
1065
1066 node_list = get_node_list(app);
1067 if (NULL != node_list) {
1068 opal_asprintf(&tmp, "node_list=%s", node_list);
1069 opal_argv_append_nosize(&cmd, tmp);
1070 free(node_list);
1071 free(tmp);
1072 }
1073
1074 if (orte_get_attribute(&app->attributes, ORTE_APP_MANDATORY, NULL, OPAL_BOOL)) {
1075 opal_argv_append_nosize(&cmd, "flag=mandatory");
1076 } else {
1077 opal_argv_append_nosize(&cmd, "flag=optional");
1078 }
1079 }
1080
1081
1082 cmd_str = opal_argv_join(cmd, ' ');
1083 opal_argv_free(cmd);
1084
1085
1086
1087
1088
1089 opal_event_evtimer_set(orte_event_base, &jtrk->timeout_ev, timeout, jtrk);
1090 tv.tv_sec = mca_ras_slurm_component.timeout * 2;
1091 tv.tv_usec = 0;
1092 opal_event_evtimer_add(&jtrk->timeout_ev, &tv);
1093
1094 opal_output_verbose(2, orte_ras_base_framework.framework_output,
1095 "%s slurm:dynalloc cmd_str = %s",
1096 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
1097 cmd_str);
1098
1099 if (send(socket_fd, cmd_str, strlen(cmd_str)+1, 0) < 0) {
1100 ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
1101 }
1102 free(cmd_str);
1103
1104
1105
1106
1107
1108
1109
1110 return ORTE_ERR_ALLOCATION_PENDING;
1111 }
1112
1113 static int parse_alloc_msg(char *msg, int *idx, int *sjob,
1114 char **nodelist, char **tpn)
1115 {
1116 char *tmp;
1117 char *p_str;
1118 char *pos;
1119 int found=0;
1120
1121 if (msg == NULL || strlen(msg) == 0) {
1122 return ORTE_ERR_BAD_PARAM;
1123 }
1124
1125 tmp = strdup(msg);
1126 p_str = strtok(tmp, " ");
1127 while (p_str) {
1128 if (NULL != strstr(p_str, "slurm_jobid")) {
1129 pos = strchr(p_str, '=');
1130 *sjob = strtol(pos+1, NULL, 10);
1131 found++;
1132 } else if (NULL != strstr(p_str, "allocated_node_list")) {
1133 pos = strchr(p_str, '=');
1134 *nodelist = strdup(pos+1);
1135 found++;
1136 } else if (NULL != strstr(p_str, "tasks_per_node")) {
1137 pos = strchr(p_str, '=');
1138 *tpn = strdup(pos+1);
1139 found++;
1140 } else if (NULL != strstr(p_str, "app")) {
1141 pos = strchr(p_str, '=');
1142 *idx = strtol(pos+1, NULL, 10);
1143 found++;
1144 }
1145 p_str = strtok(NULL, " ");
1146 }
1147 free(tmp);
1148
1149 if (4 != found) {
1150 return ORTE_ERR_NOT_FOUND;
1151 }
1152 return ORTE_SUCCESS;
1153 }
1154
1155 static char* get_node_list(orte_app_context_t *app)
1156 {
1157 int j;
1158 char **total_host = NULL;
1159 char *nodes;
1160 char **dash_host, *dh;
1161
1162 if (!orte_get_attribute(&app->attributes, ORTE_APP_DASH_HOST, (void**)&dh, OPAL_STRING)) {
1163 return NULL;
1164 }
1165 dash_host = opal_argv_split(dh, ',');
1166 free(dh);
1167 for (j=0; NULL != dash_host[j]; j++) {
1168 opal_argv_append_unique_nosize(&total_host, dash_host[j], false);
1169 }
1170 opal_argv_free(dash_host);
1171 if (NULL == total_host) {
1172 return NULL;
1173 }
1174
1175 nodes = opal_argv_join(total_host, ',');
1176 opal_argv_free(total_host);
1177 return nodes;
1178 }
1179
1180 static int read_ip_port(char *filename, char **ip, uint16_t *port)
1181 {
1182 FILE *fp;
1183 char line[ORTE_SLURM_DYN_MAX_SIZE];
1184 char *pos;
1185 bool found_port = false;
1186 bool found_ip = false;
1187
1188 if (NULL == (fp = fopen(filename, "r"))) {
1189 orte_show_help("help-ras-slurm.txt", "config-file-not-found", true, filename);
1190 return ORTE_ERR_SILENT;
1191 }
1192
1193 memset(line, 0, ORTE_SLURM_DYN_MAX_SIZE);
1194 while (NULL != fgets(line, ORTE_SLURM_DYN_MAX_SIZE, fp) &&
1195 (!found_ip || !found_port)) {
1196 if (0 == strlen(line)) {
1197 continue;
1198 }
1199 line[strlen(line)-1] = '\0';
1200 if (0 == strncmp(line, "JobSubmitDynAllocPort", strlen("JobSubmitDynAllocPort"))) {
1201 pos = strstr(line, "=") + 1;
1202 *port = strtol(pos, NULL, 10);
1203 found_port = true;
1204 } else if (0 == strncmp(line, "ControlMachine", strlen("ControlMachine"))) {
1205 pos = strstr(line, "=") + 1;
1206 *ip = strdup(pos);
1207 found_ip = true;
1208 }
1209 memset(line, 0, ORTE_SLURM_DYN_MAX_SIZE);
1210 }
1211
1212 fclose(fp);
1213 if (!found_ip) {
1214 opal_output(0, "The IP address or name of the Slurm control machine was not provided");
1215 return ORTE_ERR_NOT_FOUND;
1216 }
1217 if (!found_port) {
1218 opal_output(0, "The IP port of the Slurm dynamic allocation service was not provided");
1219 return ORTE_ERR_NOT_FOUND;
1220 }
1221
1222 return ORTE_SUCCESS;
1223 }