//......... some code omitted here .........
goto found;
}
/* if I am the HNP or a tool, then I stored a route to
* this job family, so look it up
*/
jfamily = ORTE_JOB_FAMILY(target->jobid);
for (i=0; i < orte_routed_jobfams.size; i++) {
if (NULL == (jfam = (orte_routed_jobfam_t*)opal_pointer_array_get_item(&orte_routed_jobfams, i))) {
continue;
}
if (jfam->job_family == jfamily) {
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_framework.framework_output,
"%s routed_binomial: route to %s found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOB_FAMILY_PRINT(target->jobid)));
ret = &jfam->route;
goto found;
}
}
/* not found - so we have no route */
ret = ORTE_NAME_INVALID;
goto found;
}
/* THIS CAME FROM OUR OWN JOB FAMILY... */
/* if this is going to the HNP, then route it via the tree (through our
* parent) unless we have a direct connection to the HNP and are not
* using static ports - in that case, send it direct
*/
if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_HNP, target)) {
if (!hnp_direct || orte_static_ports) {
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_framework.framework_output,
"%s routing to the HNP through my parent %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_PARENT)));
ret = ORTE_PROC_MY_PARENT;
goto found;
} else {
OPAL_OUTPUT_VERBOSE((2, orte_routed_base_framework.framework_output,
"%s routing direct to the HNP",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
ret = ORTE_PROC_MY_HNP;
goto found;
}
}
daemon.jobid = ORTE_PROC_MY_NAME->jobid;
/* find out what daemon hosts this proc */
if (ORTE_VPID_INVALID == (daemon.vpid = orte_get_proc_daemon_vpid(target))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
ret = ORTE_NAME_INVALID;
goto found;
}
/* if the daemon is me, then send direct to the target! */
if (ORTE_PROC_MY_NAME->vpid == daemon.vpid) {
ret = target;
goto found;
} else if (orte_process_info.num_procs < mca_routed_radix_component.max_connections) {
/* if the job is small enough, send direct to the target's daemon */
ret = &daemon;
goto found;
} else {
/* search routing tree for next step to that daemon */
for (item = opal_list_get_first(&my_children);
item != opal_list_get_end(&my_children);
item = opal_list_get_next(item)) {
child = (orte_routed_tree_t*)item;
if (child->vpid == daemon.vpid) {
/* the child is hosting the proc - just send it there */
ret = &daemon;
goto found;
}
/* otherwise, see if the daemon we need is below the child */
if (opal_bitmap_is_set_bit(&child->relatives, daemon.vpid)) {
/* yep - we need to step through this child */
daemon.vpid = child->vpid;
ret = &daemon;
goto found;
}
}
}
/* if we get here, then the target daemon is not beneath
* any of our children, so we have to step up through our parent
*/
daemon.vpid = ORTE_PROC_MY_PARENT->vpid;
ret = &daemon;
found:
OPAL_OUTPUT_VERBOSE((1, orte_routed_base_framework.framework_output,
"%s routed_radix_get(%s) --> %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(target),
ORTE_NAME_PRINT(ret)));
return *ret;
}
static void update_routing_plan(void)
{
orte_routed_tree_t *child;
int j;
opal_list_item_t *item;
int Level, Sum, NInLevel, Ii;
int NInPrevLevel;
/* if I am anything other than a daemon or the HNP, this
* is a meaningless command as I am not allowed to route
*/
if (!ORTE_PROC_IS_DAEMON && !ORTE_PROC_IS_HNP) {
return;
}
/* clear the list of children if any are already present */
while (NULL != (item = opal_list_remove_first(&my_children))) {
OBJ_RELEASE(item);
}
num_children = 0;
/* compute my parent */
Ii = ORTE_PROC_MY_NAME->vpid;
Level=0;
Sum=1;
NInLevel=1;
while ( Sum < (Ii+1) ) {
Level++;
NInLevel *= mca_routed_radix_component.radix;
Sum += NInLevel;
}
Sum -= NInLevel;
NInPrevLevel = NInLevel/mca_routed_radix_component.radix;
if( 0 == Ii ) {
ORTE_PROC_MY_PARENT->vpid = -1;
} else {
ORTE_PROC_MY_PARENT->vpid = (Ii-Sum) % NInPrevLevel;
ORTE_PROC_MY_PARENT->vpid += (Sum - NInPrevLevel);
}
/* compute my direct children and the bitmap that shows which vpids
* lie underneath their branch
*/
radix_tree(Ii, &num_children, &my_children, NULL);
if (0 < opal_output_get_verbosity(orte_routed_base_framework.framework_output)) {
opal_output(0, "%s: parent %d num_children %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_PROC_MY_PARENT->vpid, num_children);
for (item = opal_list_get_first(&my_children);
item != opal_list_get_end(&my_children);
item = opal_list_get_next(item)) {
child = (orte_routed_tree_t*)item;
opal_output(0, "%s: \tchild %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), child->vpid);
for (j=0; j < (int)orte_process_info.num_procs; j++) {
if (opal_bitmap_is_set_bit(&child->relatives, j)) {
opal_output(0, "%s: \t\trelation %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), j);
}
}
}
}
}
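The parent computation above walks the radix tree level by level, which can be hard to follow inline. The helper below is an illustrative sketch only (the function name and plain-int types are assumptions, not part of the original component); it reproduces the same arithmetic for an arbitrary radix so it can be checked in isolation. With radix 2, for example, it yields parent 0 for vpids 1 and 2, parent 1 for vpids 3 and 5, and parent 2 for vpids 4 and 6.

/* Sketch only: mirrors the parent computation in update_routing_plan(). */
static int radix_tree_parent_vpid(int vpid, int radix)
{
    int level = 0;        /* depth of this vpid in the radix tree */
    int sum = 1;          /* number of vpids in levels 0..level */
    int n_in_level = 1;   /* number of vpids in the current level */
    int n_in_prev_level;

    if (0 == vpid) {
        return -1;        /* the root has no parent */
    }
    /* descend until the cumulative vpid count covers this vpid */
    while (sum < (vpid + 1)) {
        level++;
        n_in_level *= radix;
        sum += n_in_level;
    }
    sum -= n_in_level;                     /* first vpid in our level */
    n_in_prev_level = n_in_level / radix;  /* size of the parent's level */

    /* fold our offset within this level onto the previous level */
    return ((vpid - sum) % n_in_prev_level) + (sum - n_in_prev_level);
}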
/*
* If we have a valid section, see if we have a matching section
* somewhere (i.e., same vendor ID and vendor part ID). If we do,
* update the values. If not, save the values in a new instance and
* add it to the list.
*/
static int save_section(parsed_section_values_t *s)
{
int i, j;
opal_list_item_t *item;
device_values_t *h;
bool found;
/* Is the parsed section valid? */
if (NULL == s->name || 0 == s->vendor_ids_len ||
0 == s->vendor_part_ids_len) {
return OPAL_ERR_BAD_PARAM;
}
/* Iterate over each of the vendor/part IDs in the parsed
values */
for (i = 0; i < s->vendor_ids_len; ++i) {
for (j = 0; j < s->vendor_part_ids_len; ++j) {
found = false;
/* Iterate over all the saved devices */
for (item = opal_list_get_first(&devices);
item != opal_list_get_end(&devices);
item = opal_list_get_next(item)) {
h = (device_values_t*) item;
if (s->vendor_ids[i] == h->vendor_id &&
s->vendor_part_ids[j] == h->vendor_part_id) {
/* Found a match. Update any newly-set values. */
if (s->values.mtu_set) {
h->values.mtu = s->values.mtu;
h->values.mtu_set = true;
}
if (s->values.use_eager_rdma_set) {
h->values.use_eager_rdma = s->values.use_eager_rdma;
h->values.use_eager_rdma_set = true;
}
if (NULL != s->values.receive_queues) {
/* free any previously-stored string so it does not leak */
free(h->values.receive_queues);
h->values.receive_queues =
strdup(s->values.receive_queues);
}
if (s->values.max_inline_data_set) {
h->values.max_inline_data = s->values.max_inline_data;
h->values.max_inline_data_set = true;
}
if (s->values.rdmacm_reject_causes_connect_error_set) {
h->values.rdmacm_reject_causes_connect_error =
s->values.rdmacm_reject_causes_connect_error;
h->values.rdmacm_reject_causes_connect_error_set =
true;
}
if (s->values.ignore_device_set) {
h->values.ignore_device = s->values.ignore_device;
h->values.ignore_device_set = true;
}
found = true;
break;
}
}
/* Did we find/update it in the existing list? If not,
create a new one. */
if (!found) {
h = OBJ_NEW(device_values_t);
h->section_name = strdup(s->name);
h->vendor_id = s->vendor_ids[i];
h->vendor_part_id = s->vendor_part_ids[j];
/* NOTE: There is a bug in the PGI 6.2 series that
causes the compiler to choke when copying structs
containing bool members by value. So do a memcpy
here instead. */
memcpy(&h->values, &s->values, sizeof(s->values));
/* Need to strdup the string, though */
if (NULL != h->values.receive_queues) {
h->values.receive_queues = strdup(s->values.receive_queues);
}
opal_list_append(&devices, &h->super);
}
}
}
/* All done */
return OPAL_SUCCESS;
}
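For illustration, a hypothetical caller of save_section() might look like the sketch below. Every field name comes from the code above, but the helper itself, the numeric IDs, and the MTU are invented values; the sketch also assumes the headers and types already present in the surrounding file.

/* Hypothetical usage sketch for save_section(); all values are invented. */
static int save_example_section(void)
{
    parsed_section_values_t s;

    memset(&s, 0, sizeof(s));
    s.name = strdup("Example HCA section");

    /* two vendor IDs x one part ID => up to two device_values_t entries */
    s.vendor_ids_len = 2;
    s.vendor_ids = malloc(2 * sizeof(*s.vendor_ids));
    s.vendor_ids[0] = 0x02c9;        /* made-up vendor IDs */
    s.vendor_ids[1] = 0x1077;

    s.vendor_part_ids_len = 1;
    s.vendor_part_ids = malloc(sizeof(*s.vendor_part_ids));
    s.vendor_part_ids[0] = 0x6274;   /* made-up part ID */

    s.values.mtu = 2048;             /* only the MTU is explicitly set */
    s.values.mtu_set = true;

    /* creates one entry per vendor/part pair, or updates existing ones */
    return save_section(&s);
}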
/*
* determine the proper starting point for the next mapping operation
*/
orte_node_t* orte_rmaps_base_get_starting_point(opal_list_t *node_list,
orte_job_t *jdata)
{
opal_list_item_t *item, *cur_node_item;
orte_node_t *node, *nd1, *ndmin;
int overload;
/* if a bookmark exists from some prior mapping, set us to start there */
if (NULL != jdata->bookmark) {
cur_node_item = NULL;
/* find this node on the list */
for (item = opal_list_get_first(node_list);
item != opal_list_get_end(node_list);
item = opal_list_get_next(item)) {
node = (orte_node_t*)item;
if (node->index == jdata->bookmark->index) {
cur_node_item = item;
break;
}
}
/* see if we found it - if not, just start at the beginning */
if (NULL == cur_node_item) {
cur_node_item = opal_list_get_first(node_list);
}
} else {
/* if no bookmark, then just start at the beginning of the list */
cur_node_item = opal_list_get_first(node_list);
}
OPAL_OUTPUT_VERBOSE((5, orte_rmaps_base_framework.framework_output,
"%s Starting bookmark at node %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
((orte_node_t*)cur_node_item)->name));
/* is this node fully subscribed? If so, then the first
* proc we assign will oversubscribe it, so let's look
* for another candidate
*/
node = (orte_node_t*)cur_node_item;
ndmin = node;
overload = ndmin->slots_inuse - ndmin->slots;
if (node->slots_inuse >= node->slots) {
/* work down the list - is there another node that
* would not be oversubscribed?
*/
if (cur_node_item != opal_list_get_last(node_list)) {
item = opal_list_get_next(cur_node_item);
} else {
item = opal_list_get_first(node_list);
}
nd1 = NULL;
while (item != cur_node_item) {
nd1 = (orte_node_t*)item;
if (nd1->slots_inuse < nd1->slots) {
/* this node is not oversubscribed! use it! */
cur_node_item = item;
goto process;
}
/* this one was also oversubscribed, keep track of the
* node that has the least usage - if we can't
* find anyone who isn't fully utilized, we will
* start with the least used node
*/
if (overload >= (nd1->slots_inuse - nd1->slots)) {
ndmin = nd1;
overload = ndmin->slots_inuse - ndmin->slots;
}
if (item == opal_list_get_last(node_list)) {
item = opal_list_get_first(node_list);
} else {
item = opal_list_get_next(item);
}
}
/* if we get here, then we cycled all the way around the
* list without finding a better answer - just use the node
* that is minimally overloaded if it is better than
* what we already have
*/
if (NULL != nd1 &&
(ndmin->slots_inuse - ndmin->slots) < (node->slots_inuse - node->slots)) {
/* the minimally-overloaded node is a better start than the bookmark */
cur_node_item = (opal_list_item_t*)ndmin;
}
}
process:
OPAL_OUTPUT_VERBOSE((5, orte_rmaps_base_framework.framework_output,
"%s Starting at node %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
((orte_node_t*)cur_node_item)->name));
/* make life easier - put the bookmark at the top of the list,
* shifting everything above it to the end of the list while
* preserving order
*/
while (cur_node_item != (item = opal_list_get_first(node_list))) {
opal_list_remove_item(node_list, item);
//......... some code omitted here .........
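The rotation loop above is truncated. A minimal sketch of its likely remainder, based only on the comment that precedes it (remove each leading item and re-append it until the bookmark is first, preserving relative order), would be:

/* Sketch of the truncated rotation; the original body may differ. */
while (cur_node_item != (item = opal_list_get_first(node_list))) {
    opal_list_remove_item(node_list, item);
    opal_list_append(node_list, item);   /* preserve relative order */
}
return (orte_node_t*)cur_node_item;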
static int create_appfile(orte_sstore_base_global_snapshot_info_t *snapshot)
{
int exit_status = ORTE_SUCCESS;
FILE *appfile = NULL;
opal_list_item_t* item = NULL;
char *tmp_str = NULL;
char *amca_param = NULL;
char *tune_param = NULL;
char *reference_fmt_str = NULL;
char *location_str = NULL;
char *ref_location_fmt_str = NULL;
orte_sstore_base_local_snapshot_info_t *vpid_snapshot = NULL;
/*
* Create the appfile
*/
orte_sstore.get_attr(snapshot->ss_handle,
SSTORE_METADATA_GLOBAL_SNAP_LOC_ABS,
&tmp_str);
/* no need to strdup() a string literal here - it would simply leak */
asprintf(&orte_restart_globals.appfile, "%s/%s",
tmp_str,
"restart-appfile");
if( NULL != tmp_str ) {
free(tmp_str);
tmp_str = NULL;
}
orte_sstore.get_attr(snapshot->ss_handle,
SSTORE_METADATA_GLOBAL_AMCA_PARAM,
&amca_param);
orte_sstore.get_attr(snapshot->ss_handle,
SSTORE_METADATA_GLOBAL_TUNE_PARAM,
&tune_param);
if (NULL == (appfile = fopen(orte_restart_globals.appfile, "w")) ) {
exit_status = ORTE_ERROR;
goto cleanup;
}
/* This will give a format string that we can use */
orte_sstore.get_attr(snapshot->ss_handle,
SSTORE_METADATA_LOCAL_SNAP_REF_FMT,
&reference_fmt_str);
orte_sstore.get_attr(snapshot->ss_handle,
SSTORE_METADATA_LOCAL_SNAP_LOC,
&location_str);
orte_sstore.get_attr(snapshot->ss_handle,
SSTORE_METADATA_LOCAL_SNAP_REF_LOC_FMT,
&ref_location_fmt_str);
/*
* Sort the snapshots so that they are in order
*/
opal_list_sort(&snapshot->local_snapshots, snapshot_sort_compare_fn);
/*
* Construct the appfile
*/
for(item = opal_list_get_first(&snapshot->local_snapshots);
item != opal_list_get_end(&snapshot->local_snapshots);
item = opal_list_get_next(item) ) {
vpid_snapshot = (orte_sstore_base_local_snapshot_info_t*)item;
fprintf(appfile, "#\n");
fprintf(appfile, "# Old Process Name: %u.%u\n",
vpid_snapshot->process_name.jobid,
vpid_snapshot->process_name.vpid);
fprintf(appfile, "#\n");
fprintf(appfile, "-np 1 ");
fprintf(appfile, "--sstore-load ");
/* loc:ref:postfix:seq */
fprintf(appfile, "%s:%s:",
location_str,
orte_restart_globals.snapshot_ref);
fprintf(appfile, reference_fmt_str, vpid_snapshot->process_name.vpid);
fprintf(appfile, ":%s:%s:%d ",
(vpid_snapshot->compress_comp == NULL ? "" : vpid_snapshot->compress_comp),
(vpid_snapshot->compress_postfix == NULL ? "" : vpid_snapshot->compress_postfix),
orte_restart_globals.seq_number);
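/*
 * Illustrative example (hypothetical values, not from the original
 * source): with location_str "/home/user/ckpts", snapshot_ref
 * "ompi_global_snapshot_1234", a per-rank reference format of
 * "opal_snapshot_%d.ckpt", no compression, and seq_number 0, the
 * arguments emitted so far for vpid 2 would read:
 *
 *   -np 1 --sstore-load /home/user/ckpts:ompi_global_snapshot_1234:opal_snapshot_2.ckpt:::0
 */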
if( NULL == amca_param ) {
amca_param = strdup("ft-enable-cr");
opal_show_help("help-orte-restart.txt", "amca_param_not_found", true,
amca_param);
}
fprintf(appfile, "-am %s ", amca_param);
if( NULL == tune_param ) {
tune_param = strdup("ft-enable-cr");
opal_show_help("help-orte-restart.txt", "tune_param_not_found", true,
tune_param);
}
fprintf(appfile, "-tune %s ", tune_param);
fprintf(appfile, " opal-restart ");
/*
* By default, point to the central storage location of the checkpoint.
//......... some code omitted here .........
// Developer: ORNL, Project: ompi, Lines: 101, Source: orte-restart.c
// Example 8: daemon_coll_recv
//......... some code omitted here .........
while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&coll->targets))) {
OBJ_RELEASE(nm);
}
/* relay the data, if required */
if (np == coll->num_peer_buckets) {
orte_routed.get_routing_list(ORTE_GRPCOMM_COLL_RELAY, coll);
while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&coll->targets))) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:daemon_coll: RELAYING COLLECTIVE TO %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&nm->name)));
relay = OBJ_NEW(opal_buffer_t);
orte_grpcomm_base_pack_collective(relay, jobid,
coll, ORTE_GRPCOMM_INTERNAL_STG_GLOBAL);
if (ORTE_VPID_WILDCARD == nm->name.vpid) {
/* this is going to everyone in this job, so use xcast */
orte_grpcomm.xcast(nm->name.jobid, relay, ORTE_RML_TAG_DAEMON_COLL);
OBJ_RELEASE(relay);
}
/* otherwise, send to each member, but don't send it back to the
* sender as that can create an infinite loop
*/
if (nm->name.vpid == sender->vpid) {
OBJ_RELEASE(relay);
} else {
if (0 > orte_rml.send_buffer_nb(&nm->name, relay, ORTE_RML_TAG_DAEMON_COLL, 0,
orte_rml_send_callback, NULL)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_RELEASE(relay);
}
}
OBJ_RELEASE(nm);
}
}
/* clear the list for reuse */
while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&coll->targets))) {
OBJ_RELEASE(nm);
}
/* determine how many contributors we need to recv - we know
* that all job objects were found, so we can skip that test
* while counting
*/
np = 0;
for (item = opal_list_get_first(&coll->participants);
item != opal_list_get_end(&coll->participants);
item = opal_list_get_next(item)) {
nm = (orte_namelist_t*)item;
/* get the job object for this participant */
jdata = orte_get_job_data_object(nm->name.jobid);
if (ORTE_VPID_WILDCARD == nm->name.vpid) {
/* all procs from this job are required to participate */
np += jdata->num_procs;
} else {
np++;
}
}
/* are we done? */
if (np != coll->num_global_recvd) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:daemon_coll: MISSING CONTRIBUTORS: np %s ngr %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_VPID_PRINT(np),
ORTE_VPID_PRINT(coll->num_global_recvd)));
return;
}
/* since we discovered that the collective is complete, we
* need to send it to all the participants
*/
for (item = opal_list_get_first(&coll->participants);
item != opal_list_get_end(&coll->participants);
item = opal_list_get_next(item)) {
nm = (orte_namelist_t*)item;
relay = OBJ_NEW(opal_buffer_t);
opal_dss.pack(relay, &coll->id, 1, ORTE_GRPCOMM_COLL_ID_T);
opal_dss.copy_payload(relay, &coll->buffer);
/* if the vpid is wildcard, then this goes to
* all daemons for relay
*/
if (ORTE_VPID_WILDCARD == nm->name.vpid) {
orte_grpcomm.xcast(nm->name.jobid, relay, ORTE_RML_TAG_COLLECTIVE);
OBJ_RELEASE(relay);
} else {
/* send it to this proc */
if (0 > orte_rml.send_buffer_nb(&nm->name, relay, ORTE_RML_TAG_COLLECTIVE, 0,
orte_rml_send_callback, NULL)) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
OBJ_RELEASE(relay);
}
}
}
/* remove this collective */
opal_list_remove_item(&orte_grpcomm_base.active_colls, &coll->super);
OBJ_RELEASE(coll);
}
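The contributor-counting loop above is central to deciding when a collective is complete: a wildcard vpid means every proc of that job must report, otherwise a single named proc does. The standalone helper below restates it as an isolated sketch (the function itself is hypothetical and uses only calls already shown in this file).

/* Sketch only: count required contributions, mirroring the loop above. */
static orte_vpid_t count_required_contributors(opal_list_t *participants)
{
    opal_list_item_t *item;
    orte_namelist_t *nm;
    orte_job_t *jdata;
    orte_vpid_t np = 0;

    for (item = opal_list_get_first(participants);
         item != opal_list_get_end(participants);
         item = opal_list_get_next(item)) {
        nm = (orte_namelist_t*)item;
        if (NULL == (jdata = orte_get_job_data_object(nm->name.jobid))) {
            continue;   /* unknown job - nothing to count yet */
        }
        if (ORTE_VPID_WILDCARD == nm->name.vpid) {
            /* every proc in that job must contribute */
            np += jdata->num_procs;
        } else {
            np++;
        }
    }
    return np;
}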
void orte_grpcomm_base_progress_collectives(void)
{
opal_list_item_t *item;
orte_grpcomm_collective_t *coll;
orte_namelist_t *nm;
orte_job_t *jdata;
opal_buffer_t *relay;
int rc;
/* cycle thru all known collectives - any collective on the list
* must have come from either a local proc or receiving a global
* collective. Either way, the number of required recipients
* is the number of local procs for that job
*/
item = opal_list_get_first(&orte_grpcomm_base.active_colls);
while (item != opal_list_get_end(&orte_grpcomm_base.active_colls)) {
coll = (orte_grpcomm_collective_t*)item;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s PROGRESSING COLL id %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
coll->id));
/* if this collective is already locally complete, then ignore it */
if (coll->locally_complete) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s COLL %d IS LOCALLY COMPLETE",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
coll->id));
goto next_coll;
}
/* get the jobid of the participants in this collective */
if (NULL == (nm = (orte_namelist_t*)opal_list_get_first(&coll->participants))) {
opal_output(0, "NO PARTICIPANTS");
goto next_coll;
}
/* get the job object for this participant */
if (NULL == (jdata = orte_get_job_data_object(nm->name.jobid))) {
/* if the job object isn't found, then we can't progress
* this collective
*/
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s COLL %d JOBID %s NOT FOUND",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
coll->id, ORTE_JOBID_PRINT(nm->name.jobid)));
goto next_coll;
}
/* all local procs from this job are required to participate */
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s ALL LOCAL PROCS FOR JOB %s CONTRIBUTE %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jdata->jobid),
(int)jdata->num_local_procs));
/* see if all reqd participants are done */
if (jdata->num_local_procs == coll->num_local_recvd) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s COLLECTIVE %d LOCALLY COMPLETE - SENDING TO GLOBAL COLLECTIVE",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), coll->id));
/* mark it as locally complete */
coll->locally_complete = true;
/* pack the collective */
relay = OBJ_NEW(opal_buffer_t);
orte_grpcomm_base_pack_collective(relay, jdata->jobid,
coll, ORTE_GRPCOMM_INTERNAL_STG_LOCAL);
/* send it to our global collective handler */
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_NAME, relay,
ORTE_RML_TAG_DAEMON_COLL, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(relay);
}
}
next_coll:
item = opal_list_get_next(item);
}
}
/* process incoming coll returns */
static void app_recv(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
orte_grpcomm_collective_t *coll, *cptr;
opal_list_item_t *item;
int n, rc;
orte_grpcomm_coll_id_t id;
orte_namelist_t *nm;
/* get the collective id */
n = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &id, &n, ORTE_GRPCOMM_COLL_ID_T))) {
ORTE_ERROR_LOG(rc);
return;
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:receive processing collective return for id %d recvd from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), id, ORTE_NAME_PRINT(sender)));
/* if the sender is my daemon, then this collective is
* a global one and is complete
*/
if (ORTE_PROC_MY_DAEMON->jobid == sender->jobid &&
ORTE_PROC_MY_DAEMON->vpid == sender->vpid) {
/* search my list of active collectives */
for (item = opal_list_get_first(&orte_grpcomm_base.active_colls);
item != opal_list_get_end(&orte_grpcomm_base.active_colls);
item = opal_list_get_next(item)) {
coll = (orte_grpcomm_collective_t*)item;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s CHECKING COLL id %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
coll->id));
if (id == coll->id) {
/* see if the collective needs another step */
if (NULL != coll->next_cb) {
/* have to go here next */
coll->next_cb(buffer, coll->next_cbdata);
break;
}
/* flag the collective as complete */
coll->active = false;
/* cleanup */
opal_list_remove_item(&orte_grpcomm_base.active_colls, item);
/* callback the specified function */
if (NULL != coll->cbfunc) {
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:receive executing callback",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
coll->cbfunc(buffer, coll->cbdata);
}
/* do NOT release the collective - it is the responsibility
* of whoever passed it down to us
*/
break;
}
}
return;
}
/* this came from another application process, so it
* belongs to a non-global collective taking place
* only between procs. Since there is a race condition
* between when we might create our own collective and
* when someone might send it to us, we may not have
* the collective on our list - see if we do
*/
coll = NULL;
for (item = opal_list_get_first(&orte_grpcomm_base.active_colls);
item != opal_list_get_end(&orte_grpcomm_base.active_colls);
item = opal_list_get_next(item)) {
cptr = (orte_grpcomm_collective_t*)item;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s CHECKING COLL id %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
cptr->id));
if (id == cptr->id) {
/* aha - we do have it */
coll = cptr;
break;
}
}
if (NULL == coll) {
/* nope - add it */
coll = OBJ_NEW(orte_grpcomm_collective_t);
coll->id = id;
opal_list_append(&orte_grpcomm_base.active_colls, &coll->super);
}
/* append the sender to the list of targets so
* we know we already have their contribution
*/
nm = OBJ_NEW(orte_namelist_t);
nm->name.jobid = sender->jobid;
nm->name.vpid = sender->vpid;
//......... some code omitted here .........
void orte_iof_orted_read_handler(int fd, short event, void *cbdata)
{
orte_iof_read_event_t *rev = (orte_iof_read_event_t*)cbdata;
unsigned char data[ORTE_IOF_BASE_MSG_MAX];
opal_buffer_t *buf=NULL;
int rc;
int32_t numbytes;
opal_list_item_t *item;
orte_iof_proc_t *proct;
orte_ns_cmp_bitmask_t mask;
/* read up to the fragment size */
#if !defined(__WINDOWS__)
numbytes = read(fd, data, sizeof(data));
#else
{
DWORD readed;
HANDLE handle = (HANDLE)_get_osfhandle(fd);
ReadFile(handle, data, sizeof(data), &readed, NULL);
numbytes = (int)readed;
}
#endif /* !defined(__WINDOWS__) */
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s iof:orted:read handler read %d bytes from %s, fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
numbytes, ORTE_NAME_PRINT(&rev->name), fd));
if (numbytes <= 0) {
if (0 > numbytes) {
/* either we have a connection error or it was a non-blocking read */
if (EAGAIN == errno || EINTR == errno) {
/* non-blocking, retry */
opal_event_add(rev->ev, 0);
return;
}
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s iof:orted:read handler %s Error on connection:%d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&rev->name), fd));
}
/* numbytes must have been zero, so go down and close the fd etc */
goto CLEAN_RETURN;
}
/* see if the user wanted the output directed to files */
if (NULL != orte_output_filename) {
/* find the sink for this rank */
for (item = opal_list_get_first(&mca_iof_orted_component.sinks);
item != opal_list_get_end(&mca_iof_orted_component.sinks);
item = opal_list_get_next(item)) {
orte_iof_sink_t *sink = (orte_iof_sink_t*)item;
/* if the target is set, then this sink is for another purpose - ignore it */
if (ORTE_JOBID_INVALID != sink->daemon.jobid) {
continue;
}
/* if this sink isn't for output, ignore it */
if (ORTE_IOF_STDIN & sink->tag) {
continue;
}
mask = ORTE_NS_CMP_ALL;
/* is this the desired proc? */
if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &sink->name, &rev->name)) {
/* output to the corresponding file */
orte_iof_base_write_output(&rev->name, rev->tag, data, numbytes, sink->wev);
/* done */
break;
}
}
goto RESTART;
}
/* prep the buffer */
buf = OBJ_NEW(opal_buffer_t);
/* pack the stream first - we do this so that flow control messages can
* consist solely of the tag
*/
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rev->tag, 1, ORTE_IOF_TAG))) {
ORTE_ERROR_LOG(rc);
goto CLEAN_RETURN;
}
/* pack name of process that gave us this data */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &rev->name, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
goto CLEAN_RETURN;
}
/* pack the data - only pack the #bytes we read! */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &data, numbytes, OPAL_BYTE))) {
ORTE_ERROR_LOG(rc);
goto CLEAN_RETURN;
}
/* start non-blocking RML call to forward received data */
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
//......... some code omitted here .........
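The forwarding call is cut off above, but the pack order (tag, then name, then the raw bytes) fixes what any receiver must do. The sketch below shows such an unpack; it is an illustration only, not the original HNP-side handler, and assumes the same DSS types used above.

/* Sketch only: unpack a fragment built by the read handler above. */
static int unpack_iof_fragment(opal_buffer_t *buffer)
{
    orte_iof_tag_t stream;
    orte_process_name_t origin;
    unsigned char data[ORTE_IOF_BASE_MSG_MAX];
    int32_t n, numbytes;
    int rc;

    n = 1;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &stream, &n, ORTE_IOF_TAG))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }
    n = 1;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &origin, &n, ORTE_NAME))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }
    /* the count is in/out: pass the buffer capacity, get back #bytes */
    numbytes = ORTE_IOF_BASE_MSG_MAX;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, data, &numbytes, OPAL_BYTE))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }
    /* a real handler would now route (origin, stream, data, numbytes)
     * to the appropriate sink or write event
     */
    return ORTE_SUCCESS;
}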