int zmq::router_t::xsend (msg_t *msg_)
{
// If this is the first part of the message it's the ID of the
// peer to send the message to.
if (!more_out) {
zmq_assert (!current_out);
// If we have malformed message (prefix with no subsequent message)
// then just silently ignore it.
// TODO: The connections should be killed instead.
if (msg_->flags () & msg_t::more) {
more_out = true;
// Find the pipe associated with the identity stored in the prefix.
// If there's no such pipe just silently ignore the message, unless
// router_mandatory is set.
blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
outpipes_t::iterator it = outpipes.find (identity);
if (it != outpipes.end ()) {
current_out = it->second.pipe;
if (!current_out->check_write ()) {
it->second.active = false;
current_out = NULL;
if (mandatory) {
more_out = false;
errno = EAGAIN;
return -1;
}
}
}
else
if (mandatory) {
more_out = false;
errno = EHOSTUNREACH;
return -1;
}
}
int rc = msg_->close ();
errno_assert (rc == 0);
rc = msg_->init ();
errno_assert (rc == 0);
return 0;
}
// Ignore the MORE flag for raw-sock or assert?
if (options.raw_sock)
msg_->reset_flags (msg_t::more);
// Check whether this is the last part of the message.
more_out = msg_->flags () & msg_t::more ? true : false;
// Push the message into the pipe. If there's no out pipe, just drop it.
if (current_out) {
// Close the remote connection if user has asked to do so
// by sending zero length message.
// Pending messages in the pipe will be dropped (on receiving term- ack)
if (raw_sock && msg_->size() == 0) {
current_out->terminate (false);
int rc = msg_->close ();
errno_assert (rc == 0);
current_out = NULL;
return 0;
}
bool ok = current_out->write (msg_);
if (unlikely (!ok))
current_out = NULL;
else
if (!more_out) {
current_out->flush ();
current_out = NULL;
}
}
else {
int rc = msg_->close ();
errno_assert (rc == 0);
}
// Detach the message from the data buffer.
int rc = msg_->init ();
errno_assert (rc == 0);
return 0;
}
开发者ID:gonzus,项目名称:libzmq,代码行数:88,代码来源:router.cpp
示例2: nn_global_init
//.........这里部分代码省略.........
self.print_errors = envvar && *envvar;
/* Print socket statistics to stderr */
envvar = getenv("NN_PRINT_STATISTICS");
self.print_statistics = envvar && *envvar;
/* Allocate the stack of unused file descriptors. */
self.unused = (uint16_t*) (self.socks + NN_MAX_SOCKETS);
alloc_assert (self.unused);
for (i = 0; i != NN_MAX_SOCKETS; ++i)
self.unused [i] = NN_MAX_SOCKETS - i - 1;
/* Initialise other parts of the global state. */
nn_list_init (&self.transports);
nn_list_init (&self.socktypes);
/* Plug in individual transports. */
nn_global_add_transport (nn_inproc);
nn_global_add_transport (nn_ipc);
nn_global_add_transport (nn_tcp);
nn_global_add_transport (nn_rdma);
nn_global_add_transport (nn_ws);
nn_global_add_transport (nn_tcpmux);
/* Plug in individual socktypes. */
nn_global_add_socktype (nn_pair_socktype);
nn_global_add_socktype (nn_xpair_socktype);
nn_global_add_socktype (nn_pub_socktype);
nn_global_add_socktype (nn_sub_socktype);
nn_global_add_socktype (nn_xpub_socktype);
nn_global_add_socktype (nn_xsub_socktype);
nn_global_add_socktype (nn_rep_socktype);
nn_global_add_socktype (nn_req_socktype);
nn_global_add_socktype (nn_xrep_socktype);
nn_global_add_socktype (nn_xreq_socktype);
nn_global_add_socktype (nn_push_socktype);
nn_global_add_socktype (nn_xpush_socktype);
nn_global_add_socktype (nn_pull_socktype);
nn_global_add_socktype (nn_xpull_socktype);
nn_global_add_socktype (nn_respondent_socktype);
nn_global_add_socktype (nn_surveyor_socktype);
nn_global_add_socktype (nn_xrespondent_socktype);
nn_global_add_socktype (nn_xsurveyor_socktype);
nn_global_add_socktype (nn_bus_socktype);
nn_global_add_socktype (nn_xbus_socktype);
/* Start the worker threads. */
nn_pool_init (&self.pool);
/* Start FSM */
nn_fsm_init_root (&self.fsm, nn_global_handler, nn_global_shutdown,
&self.ctx);
self.state = NN_GLOBAL_STATE_IDLE;
nn_ctx_init (&self.ctx, nn_global_getpool (), NULL);
nn_timer_init (&self.stat_timer, NN_GLOBAL_SRC_STAT_TIMER, &self.fsm);
/* Initializing special sockets. */
addr = getenv ("NN_STATISTICS_SOCKET");
if (addr) {
self.statistics_socket = nn_global_create_socket (AF_SP, NN_PUB);
errno_assert (self.statistics_socket >= 0);
rc = nn_global_create_ep (self.socks[self.statistics_socket], addr, 0);
errno_assert (rc >= 0);
} else {
self.statistics_socket = -1;
}
addr = getenv ("NN_APPLICATION_NAME");
if (addr) {
strncpy (self.appname, addr, 63);
self.appname[63] = '\0';
} else {
/* No cross-platform way to find out application binary.
Also, MSVC suggests using _getpid() instead of getpid(),
however, it's not clear whether the former is supported
by older versions of Windows/MSVC. */
#if defined _MSC_VER
#pragma warning (push)
#pragma warning (disable:4996)
#endif
sprintf (self.appname, "nanomsg.%d", getpid());
#if defined _MSC_VER
#pragma warning (pop)
#endif
}
addr = getenv ("NN_HOSTNAME");
if (addr) {
strncpy (self.hostname, addr, 63);
self.hostname[63] = '\0';
} else {
rc = gethostname (self.hostname, 63);
errno_assert (rc == 0);
self.hostname[63] = '\0';
}
nn_fsm_start(&self.fsm);
}
void zmq::session_base_t::start_connecting (bool wait_)
{
zmq_assert (connect);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create the connecter object.
if (addr->protocol == "tcp") {
tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t (
io_thread, this, options, addr, wait_);
alloc_assert (connecter);
launch_child (connecter);
return;
}
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
if (addr->protocol == "ipc") {
ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t (
io_thread, this, options, addr, wait_);
alloc_assert (connecter);
launch_child (connecter);
return;
}
#endif
#if defined ZMQ_HAVE_OPENPGM
// Both PGM and EPGM transports are using the same infrastructure.
if (addr->protocol == "pgm" || addr->protocol == "epgm") {
// For EPGM transport with UDP encapsulation of PGM is used.
bool udp_encapsulation = (addr->protocol == "epgm");
// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
// exists with PGM anyway.
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
// PGM sender.
pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
io_thread, options);
alloc_assert (pgm_sender);
int rc = pgm_sender->init (udp_encapsulation, addr->address.c_str ());
errno_assert (rc == 0);
send_attach (this, pgm_sender);
}
else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) {
// PGM receiver.
pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
io_thread, options);
alloc_assert (pgm_receiver);
int rc = pgm_receiver->init (udp_encapsulation, addr->address.c_str ());
errno_assert (rc == 0);
send_attach (this, pgm_receiver);
}
else
zmq_assert (false);
return;
}
#endif
zmq_assert (false);
}
int zmq::socket_base_t::connect (const char *addr_)
{
if (unlikely (ctx_terminated)) {
errno = ETERM;
return -1;
}
// Process pending commands, if any.
int rc = process_commands (0, false);
if (unlikely (rc != 0))
return -1;
// Parse addr_ string.
std::string protocol;
std::string address;
rc = parse_uri (addr_, protocol, address);
if (rc != 0)
return -1;
rc = check_protocol (protocol);
if (rc != 0)
return -1;
if (protocol == "inproc") {
// TODO: inproc connect is specific with respect to creating pipes
// as there's no 'reconnect' functionality implemented. Once that
// is in place we should follow generic pipe creation algorithm.
// Find the peer endpoint.
endpoint_t peer = find_endpoint (addr_);
if (!peer.socket)
return -1;
// The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM.
int sndhwm = 0;
if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
sndhwm = options.sndhwm + peer.options.rcvhwm;
int rcvhwm = 0;
if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
rcvhwm = options.rcvhwm + peer.options.sndhwm;
// Create a bi-directional pipe to connect the peers.
object_t *parents [2] = {this, peer.socket};
pipe_t *new_pipes [2] = {NULL, NULL};
bool conflate = options.conflate &&
(options.type == ZMQ_DEALER ||
options.type == ZMQ_PULL ||
options.type == ZMQ_PUSH ||
options.type == ZMQ_PUB ||
options.type == ZMQ_SUB);
int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
bool conflates [2] = {conflate, conflate};
int rc = pipepair (parents, new_pipes, hwms, delays, conflates);
errno_assert (rc == 0);
// Attach local end of the pipe to this socket object.
attach_pipe (new_pipes [0]);
// If required, send the identity of the local socket to the peer.
if (peer.options.recv_identity) {
msg_t id;
rc = id.init_size (options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), options.identity, options.identity_size);
id.set_flags (msg_t::identity);
bool written = new_pipes [0]->write (&id);
zmq_assert (written);
new_pipes [0]->flush ();
}
// If required, send the identity of the peer to the local socket.
if (options.recv_identity) {
msg_t id;
rc = id.init_size (peer.options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), peer.options.identity, peer.options.identity_size);
id.set_flags (msg_t::identity);
bool written = new_pipes [1]->write (&id);
zmq_assert (written);
new_pipes [1]->flush ();
}
// Attach remote end of the pipe to the peer socket. Note that peer's
// seqnum was incremented in find_endpoint function. We don't need it
// increased here.
send_bind (peer.socket, new_pipes [1], false);
// Save last endpoint URI
last_endpoint.assign (addr_);
// remember inproc connections for disconnect
inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes[0]));
return 0;
}
//.........这里部分代码省略.........
static int make_fdpair (xs::fd_t *r_, xs::fd_t *w_)
{
#if defined XS_HAVE_EVENTFD
// Create eventfd object.
#if defined EFD_CLOEXEC
xs::fd_t fd = eventfd (0, EFD_CLOEXEC);
if (fd == -1)
return -1;
#else
xs::fd_t fd = eventfd (0, 0);
if (fd == -1)
return -1;
#if defined FD_CLOEXEC
int rc = fcntl (fd, F_SETFD, FD_CLOEXEC);
errno_assert (rc != -1);
#endif
#endif
*w_ = fd;
*r_ = fd;
return 0;
#elif defined XS_HAVE_WINDOWS
// On Windows we are using TCP sockets for in-process communication.
// That is a security hole -- other processes on the same box may connect
// to the bound TCP port and hook into internal signal processing of
// the library. To solve this problem we should use a proper in-process
// signaling mechanism such as private semaphore. However, on Windows,
// these cannot be polled on using select(). Other functions that allow
// polling on these objects (e.g. WaitForMulitpleObjects) don't allow
// to poll on sockets. Thus, the only way to fix the problem is to
// implement IOCP polling mechanism that allows to poll on both sockets
// and in-process synchronisation objects.
// Make the following critical section accessible to everyone.
SECURITY_ATTRIBUTES sa = {0};
sa.nLength = sizeof (sa);
sa.bInheritHandle = FALSE;
SECURITY_DESCRIPTOR sd;
BOOL ok = InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION);
win_assert (ok);
ok = SetSecurityDescriptorDacl(&sd, TRUE, (PACL) NULL, FALSE);
win_assert (ok);
sa.lpSecurityDescriptor = &sd;
// This function has to be in a system-wide critical section so that
// two instances of the library don't accidentally create signaler
// crossing the process boundary.
// We'll use named event object to implement the critical section.
HANDLE sync = CreateEvent (&sa, FALSE, TRUE, "xs-signaler-port-sync");
win_assert (sync != NULL);
// Enter the critical section.
DWORD dwrc = WaitForSingleObject (sync, INFINITE);
xs_assert (dwrc == WAIT_OBJECT_0);
// Windows has no 'socketpair' function. CreatePipe is no good as pipe
// handles cannot be polled on. Here we create the socketpair by hand.
*w_ = INVALID_SOCKET;
*r_ = INVALID_SOCKET;
// Create listening socket.
SOCKET listener;
listener = xs::open_socket (AF_INET, SOCK_STREAM, 0);
if (listener == xs::retired_fd)
return -1;
// Set SO_REUSEADDR and TCP_NODELAY on listening socket.
BOOL so_reuseaddr = 1;
int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
(char *)&so_reuseaddr, sizeof (so_reuseaddr));
wsa_assert (rc != SOCKET_ERROR);
BOOL tcp_nodelay = 1;
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
(char *)&tcp_nodelay, sizeof (tcp_nodelay));
wsa_assert (rc != SOCKET_ERROR);
// Bind listening socket to the local port.
struct sockaddr_in addr;
memset (&addr, 0, sizeof (addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
addr.sin_port = htons (xs::signaler_port);
rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
wsa_assert (rc != SOCKET_ERROR);
// Listen for incomming connections.
rc = listen (listener, 1);
wsa_assert (rc != SOCKET_ERROR);
// Create the writer socket.
*w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0);
if (*w_ == xs::retired_fd) {
rc = closesocket (listener);
wsa_assert (rc != SOCKET_ERROR);
return -1;
}
// Set TCP_NODELAY on writer socket.
//.........这里部分代码省略.........
int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
{
// If this is the first part of the message it's the ID of the
// peer to send the message to.
if (!more_out) {
zmq_assert (!current_out);
// If we have malformed message (prefix with no subsequent message)
// then just silently ignore it.
// TODO: The connections should be killed instead.
if (msg_->flags () & msg_t::label) {
more_out = true;
// Find the pipe associated with the peer ID stored in the prefix.
// If there's no such pipe just silently ignore the message.
if (msg_->size () == 4) {
uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
outpipes_t::iterator it = outpipes.find (peer_id);
if (it != outpipes.end ()) {
current_out = it->second.pipe;
msg_t empty;
int rc = empty.init ();
errno_assert (rc == 0);
if (!current_out->check_write (&empty)) {
it->second.active = false;
more_out = false;
current_out = NULL;
}
rc = empty.close ();
errno_assert (rc == 0);
}
}
}
int rc = msg_->close ();
errno_assert (rc == 0);
rc = msg_->init ();
errno_assert (rc == 0);
return 0;
}
// Check whether this is the last part of the message.
more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
// Push the message into the pipe. If there's no out pipe, just drop it.
if (current_out) {
bool ok = current_out->write (msg_);
if (unlikely (!ok))
current_out = NULL;
else if (!more_out) {
current_out->flush ();
current_out = NULL;
}
}
else {
int rc = msg_->close ();
errno_assert (rc == 0);
}
// Detach the message from the data buffer.
int rc = msg_->init ();
errno_assert (rc == 0);
return 0;
}
int zmq::req_t::xsend (msg_t *msg_)
{
// If we've sent a request and we still haven't got the reply,
// we can't send another request unless the strict option is disabled.
if (receiving_reply) {
if (strict) {
errno = EFSM;
return -1;
}
receiving_reply = false;
message_begins = true;
}
// First part of the request is the request identity.
if (message_begins) {
reply_pipe = NULL;
if (request_id_frames_enabled) {
request_id++;
// Copy request id before sending (see issue #1695 for details).
uint32_t *request_id_copy = (uint32_t *) malloc (sizeof (uint32_t));
*request_id_copy = request_id;
msg_t id;
int rc = id.init_data (request_id_copy, sizeof (uint32_t),
free_id, NULL);
errno_assert (rc == 0);
id.set_flags (msg_t::more);
rc = dealer_t::sendpipe (&id, &reply_pipe);
if (rc != 0)
return -1;
}
msg_t bottom;
int rc = bottom.init ();
errno_assert (rc == 0);
bottom.set_flags (msg_t::more);
rc = dealer_t::sendpipe (&bottom, &reply_pipe);
if (rc != 0)
return -1;
zmq_assert (reply_pipe);
message_begins = false;
// Eat all currently available messages before the request is fully
// sent. This is done to avoid:
// REQ sends request to A, A replies, B replies too.
// A's reply was first and matches, that is used.
// An hour later REQ sends a request to B. B's old reply is used.
msg_t drop;
while (true) {
rc = drop.init ();
errno_assert (rc == 0);
rc = dealer_t::xrecv (&drop);
if (rc != 0)
break;
drop.close ();
}
}
bool more = msg_->flags () & msg_t::more ? true : false;
int rc = dealer_t::xsend (msg_);
if (rc != 0)
return rc;
// If the request was fully sent, flip the FSM into reply-receiving state.
if (!more) {
receiving_reply = true;
message_begins = true;
}
return 0;
}
int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_)
{
if (strcmp (protocol_, "tcp") == 0 ) {
// Resolve the sockaddr to bind to.
int rc = resolve_ip_interface (&addr, &addr_len, addr_);
if (rc != 0)
return -1;
// Create a listening socket.
s = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);
if (s == -1)
return -1;
// Allow reusing of the address.
int flag = 1;
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
errno_assert (rc == 0);
// Set the non-blocking flag.
flag = fcntl (s, F_GETFL, 0);
if (flag == -1)
flag = 0;
rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
errno_assert (rc != -1);
// Bind the socket to the network interface and port.
rc = bind (s, (struct sockaddr*) &addr, addr_len);
if (rc != 0) {
close ();
return -1;
}
// Listen for incomming connections.
rc = listen (s, tcp_connection_backlog);
if (rc != 0) {
close ();
return -1;
}
return 0;
}
else if (strcmp (protocol_, "ipc") == 0) {
// Get rid of the file associated with the UNIX domain socket that
// may have been left behind by the previous run of the application.
::unlink (addr_);
// Convert the address into sockaddr_un structure.
int rc = resolve_local_path (&addr, &addr_len, addr_);
if (rc != 0)
return -1;
// Create a listening socket.
s = socket (AF_UNIX, SOCK_STREAM, 0);
if (s == -1)
return -1;
// Set the non-blocking flag.
int flag = fcntl (s, F_GETFL, 0);
if (flag == -1)
flag = 0;
rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
errno_assert (rc != -1);
// Bind the socket to the file path.
rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_un));
if (rc != 0) {
close ();
return -1;
}
// Listen for incomming connections.
rc = listen (s, tcp_connection_backlog);
if (rc != 0) {
close ();
return -1;
}
return 0;
}
else {
errno = EPROTONOSUPPORT;
return -1;
}
}
请发表评论