This source file includes following definitions.
- mca_scoll_basic_collect
- _algorithm_f_central_counter
- _algorithm_f_tournament
- _algorithm_f_ring
- _algorithm_f_recursive_doubling
- _algorithm_central_collector
   1 
   2 
   3 
   4 
   5 
   6 
   7 
   8 
   9 
  10 
  11 
  12 
  13 #include "oshmem_config.h"
  14 #include <stdio.h>
  15 #include <stdlib.h>
  16 
  17 #include "oshmem/constants.h"
  18 #include "oshmem/mca/spml/spml.h"
  19 #include "oshmem/mca/scoll/scoll.h"
  20 #include "oshmem/mca/scoll/base/base.h"
  21 #include "scoll_basic.h"
  22 
  23 static int _algorithm_central_collector(struct oshmem_group_t *group,
  24                                          void *target,
  25                                          const void *source,
  26                                          size_t nlong,
  27                                          long *pSync);
  28 static int _algorithm_f_central_counter(struct oshmem_group_t *group,
  29                                          void *target,
  30                                          const void *source,
  31                                          size_t nlong,
  32                                          long *pSync);
  33 static int _algorithm_f_tournament(struct oshmem_group_t *group,
  34                                     void *target,
  35                                     const void *source,
  36                                     size_t nlong,
  37                                     long *pSync);
  38 static int _algorithm_f_recursive_doubling(struct oshmem_group_t *group,
  39                                             void *target,
  40                                             const void *source,
  41                                             size_t nlong,
  42                                             long *pSync);
  43 static int _algorithm_f_ring(struct oshmem_group_t *group,
  44                               void *target,
  45                               const void *source,
  46                               size_t nlong,
  47                               long *pSync);
  48 
  49 int mca_scoll_basic_collect(struct oshmem_group_t *group,
  50                             void *target,
  51                             const void *source,
  52                             size_t nlong,
  53                             long *pSync,
  54                             bool nlong_type,
  55                             int alg)
  56 {
  57     int rc = OSHMEM_SUCCESS;
  58 
  59     
  60     if (!group || !pSync) {
  61         SCOLL_ERROR("Active set (group) of PE is not defined");
  62         rc = OSHMEM_ERR_BAD_PARAM;
  63     }
  64 
  65     
  66     if ((rc == OSHMEM_SUCCESS) && oshmem_proc_group_is_member(group)) {
  67         int i = 0;
  68 
  69         if (nlong_type) {
  70 
  71             
  72             if (OPAL_UNLIKELY(!nlong)) {
  73                 return OPAL_SUCCESS;
  74             }
  75 
  76             alg = (alg == SCOLL_DEFAULT_ALG ?
  77                     mca_scoll_basic_param_collect_algorithm : alg);
  78             switch (alg) {
  79             case SCOLL_ALG_COLLECT_CENTRAL_COUNTER:
  80                 {
  81                     rc = _algorithm_f_central_counter(group,
  82                                                        target,
  83                                                        source,
  84                                                        nlong,
  85                                                        pSync);
  86                     break;
  87                 }
  88             case SCOLL_ALG_COLLECT_TOURNAMENT:
  89                 {
  90                     rc = _algorithm_f_tournament(group,
  91                                                   target,
  92                                                   source,
  93                                                   nlong,
  94                                                   pSync);
  95                     break;
  96                 }
  97             case SCOLL_ALG_COLLECT_RECURSIVE_DOUBLING:
  98                 {
  99                     rc = _algorithm_f_recursive_doubling(group,
 100                                                           target,
 101                                                           source,
 102                                                           nlong,
 103                                                           pSync);
 104                     break;
 105                 }
 106             case SCOLL_ALG_COLLECT_RING:
 107                 {
 108                     rc = _algorithm_f_ring(group,
 109                                             target,
 110                                             source,
 111                                             nlong,
 112                                             pSync);
 113                     break;
 114                 }
 115             default:
 116                 {
 117                     rc = _algorithm_f_central_counter(group,
 118                                                        target,
 119                                                        source,
 120                                                        nlong,
 121                                                        pSync);
 122                 }
 123             }
 124         } else {
 125             rc = _algorithm_central_collector(group,
 126                                                target,
 127                                                source,
 128                                                nlong,
 129                                                pSync);
 130         }
 131 
 132         
 133         SCOLL_VERBOSE(12,
 134                       "[#%d] Restore special synchronization array",
 135                       group->my_pe);
 136         for (i = 0; i < _SHMEM_COLLECT_SYNC_SIZE; i++) {
 137             pSync[i] = _SHMEM_SYNC_VALUE;
 138         }
 139     }
 140 
 141     return rc;
 142 }
 143 
 144 
 145 
 146 
 147 
 148 
 149 
 150 static int _algorithm_f_central_counter(struct oshmem_group_t *group,
 151                                          void *target,
 152                                          const void *source,
 153                                          size_t nlong,
 154                                          long *pSync)
 155 {
 156     int rc = OSHMEM_SUCCESS;
 157     int i = 0;
 158     int PE_root = oshmem_proc_pe(group->proc_array[0]);
 159 
 160     SCOLL_VERBOSE(12,
 161                   "[#%d] Collect algorithm: Central Counter (identical size)",
 162                   group->my_pe);
 163     SCOLL_VERBOSE(15, "[#%d] pSync[0] = %ld", group->my_pe, pSync[0]);
 164 
 165     if (PE_root == group->my_pe) {
 166         int pe_cur = 0;
 167 
 168         memcpy((void*) ((unsigned char*) target + 0 * nlong),
 169                (void *) source,
 170                nlong);
 171 
 172         SCOLL_VERBOSE(14,
 173                       "[#%d] Gather data from all PEs in the group",
 174                       group->my_pe);
 175         for (i = 0; (i < group->proc_count) && (rc == OSHMEM_SUCCESS); i++) {
 176             
 177             pe_cur = oshmem_proc_pe(group->proc_array[i]);
 178 
 179             if (pe_cur == group->my_pe)
 180                 continue;
 181 
 182             SCOLL_VERBOSE(14,
 183                           "[#%d] Gather data (%d bytes) from #%d",
 184                           group->my_pe, (int)nlong, pe_cur);
 185 
 186             
 187             rc = MCA_SPML_CALL(get(oshmem_ctx_default, (void *)source, nlong, (void*)((unsigned char*)target + i * nlong), pe_cur));
 188         }
 189     }
 190 
 191     
 192     if (rc == OSHMEM_SUCCESS) {
 193         SCOLL_VERBOSE(14,
 194                       "[#%d] Broadcast from the root #%d",
 195                       group->my_pe, PE_root);
 196         rc = BCAST_FUNC(group,
 197                     PE_root,
 198                     target,
 199                     target,
 200                     group->proc_count * nlong,
 201                     (pSync + 1),
 202                     true,
 203                     SCOLL_DEFAULT_ALG);
 204     }
 205 
 206     SCOLL_VERBOSE(15, "[#%d] pSync[0] = %ld", group->my_pe, pSync[0]);
 207 
 208     return rc;
 209 }
 210 
 211 static int _algorithm_f_tournament(struct oshmem_group_t *group,
 212                                     void *target,
 213                                     const void *source,
 214                                     size_t nlong,
 215                                     long *pSync)
 216 {
 217     int rc = OSHMEM_SUCCESS;
 218     int round = 0;
 219     int exit_flag = group->proc_count - 1;
 220     long value = SHMEM_SYNC_INIT;
 221     int my_id = oshmem_proc_group_find_id(group, group->my_pe);
 222     int peer_id = 0;
 223     int peer_pe = 0;
 224     int PE_root = oshmem_proc_pe(group->proc_array[0]);
 225 
 226     SCOLL_VERBOSE(12,
 227                   "[#%d] Collect algorithm: Tournament (identical size)",
 228                   group->my_pe);
 229     SCOLL_VERBOSE(15, "[#%d] pSync[0] = %ld", group->my_pe, pSync[0]);
 230 
 231     
 232     pSync[0] = SHMEM_SYNC_WAIT;
 233 
 234     
 235     memcpy((void*) ((unsigned char*) target + my_id * nlong),
 236            (void *) source,
 237            nlong);
 238 
 239     while (exit_flag && (rc == OSHMEM_SUCCESS)) {
 240         
 241         peer_id = my_id ^ (1 << round);
 242 
 243         
 244         exit_flag >>= 1;
 245         round++;
 246 
 247         
 248         if (peer_id >= group->proc_count)
 249             continue;
 250 
 251         if (my_id < peer_id) {
 252             pSync[0] = peer_id;
 253             value = my_id;
 254 
 255             SCOLL_VERBOSE(14, "[#%d] round = %d wait", group->my_pe, round);
 256             rc = MCA_SPML_CALL(wait((void*)pSync, SHMEM_CMP_EQ, (void*)&value, SHMEM_LONG));
 257         } else {
 258             peer_pe = oshmem_proc_pe(group->proc_array[peer_id]);
 259 
 260 #if 1 
 261 
 262 
 263 
 264             do {
 265                 MCA_SPML_CALL(get(oshmem_ctx_default, (void*)pSync, sizeof(value), (void*)&value, peer_pe));
 266             } while (value != my_id);
 267 
 268             SCOLL_VERBOSE(14,
 269                           "[#%d] round = %d send data to #%d",
 270                           group->my_pe, round, peer_pe);
 271             rc = MCA_SPML_CALL(put(oshmem_ctx_default, (void*)((unsigned char*)target + my_id * nlong), (1 << (round - 1)) * nlong, (void*)((unsigned char*)target + my_id * nlong), peer_pe));
 272 
 273             MCA_SPML_CALL(fence(oshmem_ctx_default));
 274 
 275             SCOLL_VERBOSE(14,
 276                           "[#%d] round = %d signals to #%d",
 277                           group->my_pe, round, peer_pe);
 278             value = peer_id;
 279             rc = MCA_SPML_CALL(put(oshmem_ctx_default, (void*)pSync, sizeof(value), (void*)&value, peer_pe));
 280 #endif
 281             SCOLL_VERBOSE(14, "[#%d] round = %d wait", group->my_pe, round);
 282             value = SHMEM_SYNC_RUN;
 283             rc = MCA_SPML_CALL(wait((void*)pSync, SHMEM_CMP_EQ, (void*)&value, SHMEM_LONG));
 284 
 285             break;
 286         }
 287     }
 288 
 289     
 290     if ((my_id == 0) && (rc == OSHMEM_SUCCESS)) {
 291         SCOLL_VERBOSE(14, "[#%d] signals to all", group->my_pe);
 292 
 293         value = SHMEM_SYNC_RUN;
 294         for (peer_id = 1;
 295                 (peer_id < group->proc_count) && (rc == OSHMEM_SUCCESS);
 296                 peer_id++) {
 297             peer_pe = oshmem_proc_pe(group->proc_array[peer_id]);
 298             rc = MCA_SPML_CALL(put(oshmem_ctx_default, (void*)pSync, sizeof(value), (void*)&value, peer_pe));
 299         }
 300     }
 301 
 302     
 303     if (rc == OSHMEM_SUCCESS) {
 304         SCOLL_VERBOSE(14,
 305                       "[#%d] Broadcast from the root #%d",
 306                       group->my_pe, PE_root);
 307         rc = BCAST_FUNC(group,
 308                 PE_root,
 309                 target,
 310                 target,
 311                 group->proc_count * nlong,
 312                 (pSync + 1),
 313                 true,
 314                 SCOLL_DEFAULT_ALG);
 315     }
 316 
 317     SCOLL_VERBOSE(15, "[#%d] pSync[0] = %ld", group->my_pe, pSync[0]);
 318 
 319     return rc;
 320 }
 321 
 322 static int _algorithm_f_ring(struct oshmem_group_t *group,
 323                               void *target,
 324                               const void *source,
 325                               size_t nlong,
 326                               long *pSync)
 327 {
 328     int rc = OSHMEM_SUCCESS;
 329     int i = 0;
 330     long value = SHMEM_SYNC_INIT;
 331     int my_id = oshmem_proc_group_find_id(group, group->my_pe);
 332     int data_index = 0;
 333     int peer_id = 0;
 334     int peer_pe = 0;
 335 
 336     SCOLL_VERBOSE(12,
 337                   "[#%d] Collect algorithm: Ring (identical size)",
 338                   group->my_pe);
 339     SCOLL_VERBOSE(15, "[#%d] pSync[0] = %ld", group->my_pe, pSync[0]);
 340 
 341     peer_id = (my_id + 1) % group->proc_count;
 342     peer_pe = oshmem_proc_pe(group->proc_array[peer_id]);
 343     memcpy((void*) ((unsigned char*) target + my_id * nlong),
 344            (void *) source,
 345            nlong);
 346     data_index = my_id;
 347 
 348     for (i = 0; (i < (group->proc_count - 1)) && (rc == OSHMEM_SUCCESS); i++) {
 349         SCOLL_VERBOSE(14,
 350                       "[#%d] round = %d send data to #%d by index = %d",
 351                       group->my_pe, i, peer_pe, data_index);
 352         rc = MCA_SPML_CALL(put(oshmem_ctx_default, (void*)((unsigned char*)target + data_index * nlong), nlong, (void*)((unsigned char*)target + data_index * nlong), peer_pe));
 353 
 354         MCA_SPML_CALL(fence(oshmem_ctx_default));
 355 
 356         SCOLL_VERBOSE(14,
 357                       "[#%d] round = %d signals to #%d",
 358                       group->my_pe, i, peer_pe);
 359         value = i;
 360         rc = MCA_SPML_CALL(put(oshmem_ctx_default, (void*)pSync, sizeof(value), (void*)&value, peer_pe));
 361 
 362         data_index = (data_index ? (data_index - 1) : (group->proc_count - 1));
 363 
 364         SCOLL_VERBOSE(14,
 365                       "[#%d] round = %d wait for data by index = %d",
 366                       group->my_pe, i, data_index);
 367         if (i == 0) {
 368             value = _SHMEM_SYNC_VALUE;
 369             rc = MCA_SPML_CALL(wait((void*)pSync, SHMEM_CMP_NE, (void*)&value, SHMEM_LONG));
 370         } else {
 371             value = i;
 372             rc = MCA_SPML_CALL(wait((void*)pSync, SHMEM_CMP_GE, (void*)&value, SHMEM_LONG));
 373         }
 374     }
 375 
 376     SCOLL_VERBOSE(15, "[#%d] pSync[0] = %ld", group->my_pe, pSync[0]);
 377 
 378     return rc;
 379 }
 380 
 381 static int _algorithm_f_recursive_doubling(struct oshmem_group_t *group,
 382                                             void *target,
 383                                             const void *source,
 384                                             size_t nlong,
 385                                             long *pSync)
 386 {
 387     int rc = OSHMEM_SUCCESS;
 388     int round = 0;
 389     int floor2_proc = 0;
 390     int exit_flag = 0;
 391     long value = SHMEM_SYNC_INIT;
 392     int my_id = oshmem_proc_group_find_id(group, group->my_pe);
 393     int data_index = 0;
 394     int peer_id = 0;
 395     int peer_pe = 0;
 396     int i = 0;
 397 
 398     floor2_proc = 1;
 399     i = group->proc_count;
 400     i >>= 1;
 401     while (i) {
 402         i >>= 1;
 403         floor2_proc <<= 1;
 404     }
 405 
 406     SCOLL_VERBOSE(12,
 407                   "[#%d] Collect algorithm: Recursive Doubling (identical size)",
 408                   group->my_pe);
 409     SCOLL_VERBOSE(15,
 410                   "[#%d] pSync[0] = %ld floor2_proc = %d",
 411                   group->my_pe, pSync[0], floor2_proc);
 412 
 413     memcpy((void*) ((unsigned char*) target + my_id * nlong),
 414            (void *) source,
 415            nlong);
 416     data_index = my_id;
 417 
 418     if (my_id >= floor2_proc) {
 419         int pe_cur = 0;
 420 
 421         
 422         peer_id = my_id - floor2_proc;
 423         peer_pe = oshmem_proc_pe(group->proc_array[peer_id]);
 424 
 425         for (i = 0; (i < group->proc_count) && (rc == OSHMEM_SUCCESS); i++) {
 426             if (i == my_id)
 427                 continue;
 428 
 429             pe_cur = oshmem_proc_pe(group->proc_array[i]);
 430 
 431             SCOLL_VERBOSE(14,
 432                           "[#%d] is extra send data to #%d",
 433                           group->my_pe, pe_cur);
 434             rc = MCA_SPML_CALL(put(oshmem_ctx_default, (void*)((unsigned char*)target + data_index * nlong), nlong, (void *)source, pe_cur));
 435         }
 436 
 437         MCA_SPML_CALL(fence(oshmem_ctx_default));
 438 
 439         SCOLL_VERBOSE(14,
 440                       "[#%d] is extra and signal to #%d",
 441                       group->my_pe, peer_pe);
 442         value = SHMEM_SYNC_RUN;
 443         rc = MCA_SPML_CALL(put(oshmem_ctx_default, (void*)pSync, sizeof(value), (void*)&value, peer_pe));
 444 
 445         SCOLL_VERBOSE(14, "[#%d] wait", group->my_pe);
 446         value = SHMEM_SYNC_RUN;
 447         rc = MCA_SPML_CALL(wait((void*)pSync, SHMEM_CMP_EQ, (void*)&value, SHMEM_LONG));
 448     } else {
 449         
 450         if ((group->proc_count - floor2_proc) > my_id) {
 451             
 452             peer_id = my_id + floor2_proc;
 453             peer_pe = oshmem_proc_pe(group->proc_array[peer_id]);
 454 
 455             SCOLL_VERBOSE(14,
 456                           "[#%d] wait a signal from #%d",
 457                           group->my_pe, peer_pe);
 458             value = SHMEM_SYNC_RUN;
 459             rc = MCA_SPML_CALL(wait((void*)pSync, SHMEM_CMP_EQ, (void*)&value, SHMEM_LONG));
 460         }
 461 
 462         
 463         exit_flag = floor2_proc - 1;
 464         pSync[0] = round;
 465         while (exit_flag && (rc == OSHMEM_SUCCESS)) {
 466             
 467             peer_id = my_id ^ (1 << round);
 468 
 469             
 470             exit_flag >>= 1;
 471             round++;
 472 
 473             peer_pe = oshmem_proc_pe(group->proc_array[peer_id]);
 474 
 475 #if 1 
 476 
 477 
 478 
 479             do {
 480                 MCA_SPML_CALL(get(oshmem_ctx_default, (void*)pSync, sizeof(value), (void*)&value, peer_pe));
 481             } while (value != (round - 1));
 482 
 483             SCOLL_VERBOSE(14,
 484                           "[#%d] round = %d send data to #%d by index = %d",
 485                           group->my_pe, round, peer_pe, data_index);
 486             rc = MCA_SPML_CALL(put(oshmem_ctx_default, (void*)((unsigned char*)target + data_index * nlong), (1 << (round - 1)) * nlong, (void*)((unsigned char*)target + data_index * nlong), peer_pe));
 487 
 488             MCA_SPML_CALL(fence(oshmem_ctx_default));
 489 
 490             data_index = (my_id / (1 << round)) * (1 << round);
 491 
 492             SCOLL_VERBOSE(14,
 493                           "[#%d] round = %d signals to #%d",
 494                           group->my_pe, round, peer_pe);
 495             value = SHMEM_SYNC_RUN;
 496             rc = MCA_SPML_CALL(put(oshmem_ctx_default, (void*)pSync, sizeof(value), (void*)&value, peer_pe));
 497 #endif
 498 
 499             SCOLL_VERBOSE(14, "[#%d] round = %d wait", group->my_pe, round);
 500             value = SHMEM_SYNC_RUN;
 501             rc = MCA_SPML_CALL(wait((void*)pSync, SHMEM_CMP_EQ, (void*)&value, SHMEM_LONG));
 502 
 503             pSync[0] = round;
 504         }
 505 
 506         
 507         if ((group->proc_count - floor2_proc) > my_id) {
 508             
 509             peer_id = my_id + floor2_proc;
 510             peer_pe = oshmem_proc_pe(group->proc_array[peer_id]);
 511 
 512             SCOLL_VERBOSE(14,
 513                           "[#%d] is extra send data to #%d",
 514                           group->my_pe, peer_pe);
 515             rc = MCA_SPML_CALL(put(oshmem_ctx_default, target, group->proc_count * nlong, target, peer_pe));
 516 
 517             MCA_SPML_CALL(fence(oshmem_ctx_default));
 518 
 519             SCOLL_VERBOSE(14, "[#%d] signals to #%d", group->my_pe, peer_pe);
 520             value = SHMEM_SYNC_RUN;
 521             rc = MCA_SPML_CALL(put(oshmem_ctx_default, (void*)pSync, sizeof(value), (void*)&value, peer_pe));
 522         }
 523     }
 524 
 525     SCOLL_VERBOSE(15, "[#%d] pSync[0] = %ld", group->my_pe, pSync[0]);
 526 
 527     return rc;
 528 }
 529 
 530 
 531 
 532 
 533 
 534 
 535 
 536 static int _algorithm_central_collector(struct oshmem_group_t *group,
 537                                          void *target,
 538                                          const void *source,
 539                                          size_t nlong,
 540                                          long *pSync)
 541 {
 542     int rc = OSHMEM_SUCCESS;
 543     size_t offset = 0;
 544     int i = 0;
 545     int PE_root = oshmem_proc_pe(group->proc_array[0]);
 546 
 547     SCOLL_VERBOSE(12,
 548                   "[#%d] Collect algorithm: Central Counter (vary size)",
 549                   group->my_pe);
 550 
 551     
 552     pSync[0] = (nlong ? (long)nlong : SHMEM_SYNC_READY);
 553 
 554     if (PE_root == group->my_pe) {
 555         long value = 0;
 556         int pe_cur = 0;
 557         long wait_pe_count = 0;
 558         long* wait_pe_array = NULL;
 559 
 560         wait_pe_count = group->proc_count;
 561         wait_pe_array = malloc(sizeof(*wait_pe_array) * wait_pe_count);
 562         if (wait_pe_array) {
 563             memset((void*) wait_pe_array,
 564                    0,
 565                    sizeof(*wait_pe_array) * wait_pe_count);
 566             wait_pe_array[0] = nlong;
 567             wait_pe_count--;
 568 
 569             while (wait_pe_count) {
 570                 SCOLL_VERBOSE(14,
 571                               "[#%d] Gather data size info from all PEs in the group",
 572                               group->my_pe);
 573                 for (i = 1; (i < group->proc_count) && (rc == OSHMEM_SUCCESS);
 574                         i++) {
 575                     if (wait_pe_array[i] == 0) {
 576                         pe_cur = oshmem_proc_pe(group->proc_array[i]);
 577                         value = 0;
 578                         rc = MCA_SPML_CALL(get(oshmem_ctx_default, (void*)pSync, sizeof(value), (void*)&value, pe_cur));
 579                         if ((rc == OSHMEM_SUCCESS)
 580                                 && (value != _SHMEM_SYNC_VALUE)) {
 581                             wait_pe_array[i] = value;
 582                             wait_pe_count--;
 583                             SCOLL_VERBOSE(14,
 584                                           "Got source data size as %d from #%d (wait list counter: %d)",
 585                                           (int)value, pe_cur, (int)wait_pe_count);
 586                         }
 587                     }
 588                 }
 589             }
 590 
 591             memcpy((void*) ((unsigned char*) target + 0 * nlong),
 592                    (void *) source,
 593                    nlong);
 594             offset += nlong;
 595 
 596             for (i = 1; (i < group->proc_count) && (rc == OSHMEM_SUCCESS);
 597                     i++) {
 598 
 599                 
 600                 if (wait_pe_array[i] == SHMEM_SYNC_READY) {
 601                     continue;
 602                 }
 603 
 604                 
 605                 pe_cur = oshmem_proc_pe(group->proc_array[i]);
 606 
 607                 
 608                 rc = MCA_SPML_CALL(get(oshmem_ctx_default, (void *)source, (size_t)wait_pe_array[i], (void*)((unsigned char*)target + offset), pe_cur));
 609 
 610                 SCOLL_VERBOSE(14,
 611                               "Got %d bytes of data from #%d (offset: %d)",
 612                               (int)wait_pe_array[i], pe_cur, (int)offset);
 613 
 614                 offset += (size_t)wait_pe_array[i];
 615             }
 616 
 617             free(wait_pe_array);
 618         } else {
 619             rc = OSHMEM_ERR_OUT_OF_RESOURCE;
 620         }
 621     }
 622 
 623     
 624     if (rc == OSHMEM_SUCCESS) {
 625         SCOLL_VERBOSE(14,
 626                       "[#%d] Broadcast from the root #%d",
 627                       group->my_pe, PE_root);
 628 
 629         rc = BCAST_FUNC(group,
 630                 PE_root,
 631                 target,
 632                 target,
 633                 offset,
 634                 (pSync + 1),
 635                 false,
 636                 SCOLL_DEFAULT_ALG);
 637     }
 638 
 639     return rc;
 640 }