Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Namespace Members   Compound Members   Related Pages  

Socket.cc

00001 /*
00002  * Copyright 2003 Michael A. Marsh, Cornell University. All rights reserved.
00003  * This software is released under the modified BSD license.
00004  * See the file LICENSE in the top-level directory for details.
00005  */
00006 //
00007 // $Id: Socket.cc,v 1.4 2004/05/19 15:56:56 mmarsh Exp $
00008 //
00009 // $Log: Socket.cc,v $
00010 // Revision 1.4  2004/05/19 15:56:56  mmarsh
00011 // *** empty log message ***
00012 //
00013 // Revision 1.3  2004/01/23 14:54:57  mmarsh
00014 // flush() is supposed to be a blocking write, so we should ignore
00015 // EAGAIN/EWOULDBLOCK errors.
00016 //
00017 // Revision 1.2  2003/11/04 22:17:23  mmarsh
00018 // General code cleanup.
00019 //
00020 //
00021 
00022 #include "Socket.h"
00023 #include "RemoteServer.h"
00024 #include "Message.h"
00025 #include <errno.h>
00026 #include <unistd.h>
00027 #include <netinet/in.h>
00028 #include <fcntl.h>
00029 #include <iostream>
00030 
00031 using namespace CODEX_Quorum;
00032 
00033 SocketBase::SocketBase( int   domain   ,
00034                         int   type     ,
00035                         int   protocol ,
00036                         bool  blocking ) :
00037    m_msgOffset(0),
00038    m_domain(domain),
00039    m_type(type),
00040    m_protocol(protocol),
00041    m_blocking(blocking),
00042    m_port(0),
00043    m_backlog(0)
00044 {
00045    m_socket = ::socket(m_domain,m_type,m_protocol);
00046    if ( m_socket < 0 )
00047       throw QSESocketBaseBadSocket( __FILE__ ,
00048                                     __LINE__ ,
00049                                     m_domain , m_type , m_protocol , errno );
00050 }
00051 
00052 SocketBase::SocketBase( const SocketBase& aOther ) :
00053    m_msgOffset(0),
00054    m_domain(aOther.m_domain),
00055    m_type(aOther.m_type),
00056    m_protocol(aOther.m_protocol),
00057    m_blocking(aOther.m_blocking),
00058    m_port(aOther.m_port),
00059    m_backlog(aOther.m_backlog),
00060    m_socket(aOther.m_socket)
00061 {}
00062 
00063 SocketBase::~SocketBase()
00064 {
00065    ::close( m_socket );
00066 }
00067 
00068 void
00069 SocketBase::setup( int port, int backlog )
00070 {
00071    protected_bind(port);
00072    protected_listen(backlog);
00073 }
00074 
00075 void
00076 SocketBase::setup(struct sockaddr* my_addr,
00077                   socklen_t addrlen,
00078                   int backlog)
00079 {
00080    protected_bind(my_addr, addrlen);
00081    protected_listen(backlog);
00082 }
00083 
00084 void
00085 SocketBase::connect(const RemoteServer& server)
00086 {
00087    if ( ::connect(m_socket, server.sockaddr(), server.addrlen()) < 0 )
00088       throw QSESocketBaseCannotConnect( __FILE__ ,
00089                                         __LINE__ ,
00090                                         m_socket , server , errno );
00091 }
00092 
00093 SocketBase*
00094 SocketBase::accept()
00095 {
00096    return protected_accept();
00097 }
00098 
00099 size_t
00100 SocketBase::readFrom( void* output, size_t maxSize ) const
00101 {
00102    //int bytesRead = ::recv( m_socket, output, maxSize, 0 );
00103    int bytesRead = ::read( m_socket, output, maxSize );
00104    if ( bytesRead < 0 )
00105    {
00106       throw QSESocketBaseReadFailed( __FILE__ , __LINE__ , m_socket , errno );
00107    }
00108    if ( 0 == bytesRead )
00109    {
00110       throw QSESocketBaseSocketClosed( __FILE__ , __LINE__ , m_socket , 0 );
00111    }
00112    return bytesRead;
00113 }
00114 
// Reads message data from the socket into msg without waiting for more
// data than is currently available.
//
// If msg is empty on entry, the length header is read first: one byte
// giving the number of length bytes (at most 4; 0 is treated as 1),
// followed by that many length bytes, most significant first.  The value
// of `length` is then replaced by the length decoded from the header.
// If msg already holds data, up to `length` more bytes are expected
// (a `length` of 0 requests one read of up to buffSize bytes).
//
// Returns the number of bytes still outstanding when select() reports
// that no more data is ready, or 0 once toRead has dropped to zero.
//
// Throws whatever readFrom() throws, QSESocketBaseMessageTooLong for a
// length-of-length above 4, QSESocketBaseMessageTooShort when the header
// read comes up short, and QSESocketBaseReadFailed if select() fails.
00115 size_t
00116 SocketBase::readAll( Message& msg, size_t length ) const
00117 {
00118    const size_t buffSize = 1024;
00119    unsigned char buff[buffSize];
00120    size_t readLength;
         // An empty message means the header has not been read yet, so start
         // with just the single length-of-length byte.
00121    if ( 0 == msg.length() )
00122    {
00123       readLength = 1;
00124    }
00125    else
00126    {
00127       if ( 0 == length )
00128       {
00129          readLength = buffSize;
00130       }
00131       else
00132       {
00133          readLength = ( length < buffSize ) ? length : buffSize;
00134       }
00135    }
00136    size_t toRead = length;
00137    size_t nRead = readFrom( buff, readLength ); // this may throw
         // NOTE(review): SocketBase::readFrom in this file throws instead of
         // returning 0, so this branch looks unreachable unless readFrom is
         // overridden elsewhere -- confirm.
00138    if ( 0 == nRead )
00139    {
00140       // There was a problem reading -- we're not ready yet.
00141       return 0;
00142    }
00143    int offset = 0;
00144    if ( 0 == msg.length() )
00145    {
00146       // we need to find out how long the message is -- this should be
00147       // the only thing read out at the moment
00148       unsigned char nLength = buff[0];
00149       if ( nLength > 4 ) // int on any platform -- > 4GB is ridiculous
00150       {
00151          throw QSESocketBaseMessageTooLong( __FILE__ , __LINE__ ,
00152                                             m_socket , 0 );
00153       }
00154       nLength = ( nLength < 1 ) ? 1 : nLength; // 0 == 1
            // Request the nLength length bytes plus one byte of payload in a
            // single read; a short read is reported as "too short" below.
00155       nRead += readFrom( buff+1, nLength + 1 );
00156       if ( nRead < nLength + 2U ) // at least one byte of actual data -- we're
00157                                   // willing to block at this point, since
00158                                   // it's critical that we read this data now
00159       {
00160          throw QSESocketBaseMessageTooShort( __FILE__ , __LINE__ ,
00161                                              m_socket , 0 );
00162       }
            // Decode the big-endian length held in buff[1..nLength].
00163       toRead = 0;
00164       for ( int i = 1 ; i <= nLength ; ++i )
00165       {
00166          toRead = toRead*256 + buff[i];
00167       }
            // Payload starts right after the length-of-length byte and the
            // length bytes.
00168       offset = nLength + 1;
00169    }
00170 
00171    // now fill the Message with what we've just read
00172    msg.fill( buff+offset, nRead-offset );
         // NOTE(review): toRead is unsigned.  If more payload bytes were read
         // than toRead (a header announcing length 0, or length == 0 with a
         // non-empty msg) this subtraction wraps to a very large value.
00173    toRead -= nRead-offset;
00174 
00175    // read until we're done or we block
00176    while ( toRead > 0 )
00177    {
            // Poll (zero timeout) to see whether more data is ready right now.
00178       fd_set read_set;
00179       FD_ZERO( &read_set );
00180       int s = set_fd( &read_set, SocketBase::kRead );
00181       struct timeval tv;
00182       tv.tv_sec = 0;
00183       tv.tv_usec = 0;
00184       if ( 0 > select(s+1, &read_set, 0, 0, &tv) )
00185       {
00186          throw QSESocketBaseReadFailed( __FILE__ , __LINE__ ,
00187                                         m_socket , errno );
00188       }
00189 
00190       if ( ! isset_fd( &read_set, SocketBase::kRead ) )
00191       {
00192          // we're blocked
00193          return toRead;
00194       }
00195 
00196       // get up to toRead or buffSize bytes, whichever is smaller
00197       nRead = readFrom( buff, (toRead<buffSize)?toRead:buffSize );
00198       if ( nRead > 0 )
00199       {
00200          msg.fill(buff,nRead);
00201       }
00202       // this is safe -- nRead <= toRead according to semantics of readFrom
00203       toRead -= nRead;
00204    }
00205    return 0;
00206 }
00207 
00208 void
00209 SocketBase::writeTo( const Message& input ) const
00210 {
00211    int messageSize = input.length();
00212 
00213    // Send the length info -- This is the most conservative way to set it,
00214    // with the maximum number of length bytes permitted.
00215    unsigned char length_info[5];
00216    length_info[0] = 4;
00217    length_info[4] = messageSize%256;
00218    length_info[3] = (messageSize/256)%256;
00219    length_info[2] = (messageSize/65536)%256;
00220    length_info[1] = (messageSize/16777216)%256;
00221 
00222    // We need to put the entire message in a single buffer so that
00223    // we can write a large chunk all at once.  Otherwise, there's
00224    // likely to be an error on the receiving end when it can't read
00225    // enough data.
00226    Message* m = new Message( length_info, 5 );
00227    m->fill( input );
00228    m_msgQueue.push( m );
00229 }
00230 
00231 int
00232 SocketBase::set_fd( fd_set* fd_bitmap, SocketBase::StateType s ) const
00233 {
00234    FD_SET( m_socket, fd_bitmap );
00235    return m_socket;
00236 }
00237 
00238 bool
00239 SocketBase::isset_fd( const fd_set* fd_bitmap, SocketBase::StateType s ) const
00240 {
00241    bool retVal = FD_ISSET( m_socket, fd_bitmap );
00242    if ( retVal && ( kWrite == s ) )
00243    {
00244       // Write as much as possible from the message queue without blocking.
00245       while ( retVal && !m_msgQueue.empty() )
00246       {
00247          Message* first = m_msgQueue.front();
00248          const unsigned char* output = first->buffer() + m_msgOffset;
00249          size_t maxSize = first->length() - m_msgOffset;
00250          int bytesWritten = internal_write( output, maxSize );
00251          m_msgOffset += bytesWritten;
00252          if ( first->length() == m_msgOffset )
00253          {
00254             // The message is written completely.  Clean up.
00255             delete first;
00256             m_msgQueue.pop();
00257             m_msgOffset = 0;
00258          }
00259 
00260          // Check the status of the socket again.
00261          fd_set write_set;
00262          FD_ZERO( &write_set );
00263          int s = set_fd( &write_set, SocketBase::kWrite );
00264          struct timeval tv;
00265          tv.tv_sec = 0;
00266          tv.tv_usec = 0;
00267          if ( 0 > select( s+1, 0, &write_set, 0, &tv ) )
00268          {
00269             throw QSESocketBaseWriteFailed( __FILE__ , __LINE__ ,
00270                                             m_socket , errno );
00271          }
00272          retVal = FD_ISSET( m_socket, &write_set );
00273       }
00274    }
00275    return retVal;
00276 }
00277 
00278 void
00279 SocketBase::flush() const
00280 {
00281    // This will basically spin until all of the data is consumed.
00282    while ( ! m_msgQueue.empty() )
00283    {
00284       Message* first = m_msgQueue.front();
00285       const unsigned char* output = first->buffer() + m_msgOffset;
00286       size_t maxSize = first->length() - m_msgOffset;
00287       try
00288       {
00289          int bytesWritten = internal_write( output, maxSize );
00290          m_msgOffset += bytesWritten;
00291       }
00292       catch ( QSESocketBaseWriteFailed& e )
00293       {
00294          // This is a blocking routine, so if we'd block, that's fine.
00295          if ( e.error() != EAGAIN )
00296          {
00297             throw;
00298          }
00299       }
00300       if ( first->length() == m_msgOffset )
00301       {
00302          // The message is written completely.  Clean up.
00303          delete first;
00304          m_msgQueue.pop();
00305          m_msgOffset = 0;
00306       }
00307    }
00308 }
00309 
00310 int
00311 SocketBase::internal_write( const unsigned char* output, size_t maxSize ) const
00312 {
00313    int bytesWritten = ::send( m_socket, output, maxSize, 0 );
00314    if ( bytesWritten < 0 )
00315    {
00316       throw QSESocketBaseWriteFailed( __FILE__ , __LINE__ , m_socket , errno );
00317    }
00318    return bytesWritten;
00319 }
00320 
00321 SocketBase*
00322 SocketBase::protected_accept()
00323 {
00324    SocketBase* newSocket = clone();
00325    // We don't know what accept(2) is going to do, memory-wise, so
00326    // we need to use malloc(3) instead of new.
00327    struct sockaddr* addr = 0;
00328    socklen_t* paddrlen = 0;
00329    socklen_t addrlen = 0;
00330    switch( m_domain )
00331    {
00332       case PF_INET :
00333          addr = (struct sockaddr*) malloc(sizeof(struct sockaddr_in));
00334          addrlen = sizeof(struct sockaddr_in);
00335          paddrlen = &addrlen;
00336          break;
00337    }
00338    int newSocketFD = ::accept( m_socket, addr, paddrlen );
00339    if ( newSocketFD < 0 )
00340    {
00341       free( addr );
00342       throw QSESocketBaseAcceptFailed( __FILE__ , __LINE__ , m_socket, errno );
00343    }
00344    newSocket->setSocket( newSocketFD );
00345    newSocket->setBacklog( 0 );
00346    switch( m_domain )
00347    {
00348       case PF_INET :
00349          struct sockaddr_in* addr_in = (struct sockaddr_in*) addr;
00350          newSocket->setPort(addr_in->sin_port);
00351          break;
00352    }
00353    free(addr);
00354    if ( ! m_blocking )
00355    { // only using the POSIX variety
00356       int flags;
00357       if ( -1 == (flags = fcntl(newSocket->socket(), F_GETFL, 0)) )
00358          flags = 0;
00359       fcntl(newSocket->socket(), F_SETFL, flags | O_NONBLOCK);
00360    }
00361    newSocket->finish_accept();
00362    return newSocket;
00363 }
00364 
00365 void
00366 SocketBase::finish_accept()
00367 {
00368 }
00369 
00370 SocketBase*
00371 SocketBase::clone()
00372 {
00373    SocketBase* copy = new SocketBase( *this );
00374    return copy;
00375 }
00376 
00377 void
00378 SocketBase::protected_bind(int port)
00379 {
00380    m_port = port;
00381    struct sockaddr_in* my_addr_in = new struct sockaddr_in;
00382    my_addr_in->sin_family         = AF_INET;
00383    my_addr_in->sin_port           = htons(m_port);
00384    my_addr_in->sin_addr.s_addr    = htonl(INADDR_ANY);
00385 
00386    struct sockaddr* my_addr = (struct sockaddr*)my_addr_in;
00387    socklen_t addrlen = sizeof(*my_addr_in);
00388 
00389    protected_bind(my_addr, addrlen);
00390    delete my_addr_in;
00391 }
00392 
00398 void
00399 SocketBase::protected_bind(struct sockaddr* my_addr, socklen_t addrlen)
00400 {
00401    int on = 1;
00402    ::setsockopt( m_socket, SOL_SOCKET, SO_REUSEADDR, (void*)&on, sizeof(on) );
00403    if ( ::bind(m_socket, my_addr, addrlen) < 0 )
00404       throw QSESocketBaseCannotBind( __FILE__ , __LINE__ , m_socket , errno );
00405 }
00406 
00407 void
00408 SocketBase::protected_listen(int backlog)
00409 {
00410    m_backlog = backlog;
00411    if ( ::listen(m_socket, m_backlog) < 0 )
00412       throw QSESocketBaseListenFailed( __FILE__ , __LINE__ ,
00413                                        m_socket , m_backlog , errno );
00414 }
00415 
00416 void
00417 SocketBase::setSocket( int socketFD )
00418 {
00419    m_socket = socketFD;
00420 }
00421 
00422 void
00423 SocketBase::setBacklog( int backlog )
00424 {
00425    m_backlog = backlog;
00426 }
00427 
00428 void
00429 SocketBase::setPort( int port )
00430 {
00431    m_port = port;
00432 }
00433 
00434 //------ Exceptions ------//
00435 
00436 void
00437 QSESocketBase::derivedMsg() const
00438 {
00439    cerr << "SocketBase -- ";
00440    errMsg();
00441    cerr << "\n   errno = " << m_error;
00442 }
00443 
00444 void
00445 QSESocketBaseBadSocket::errMsg() const
00446 {
00447    cerr << "call to socket( "
00448         << m_domain   << " , "
00449         << m_type     << " , "
00450         << m_protocol << " ) failed";
00451 }
00452 
00453 void
00454 QSESocketBaseCannotBind::errMsg() const
00455 {
00456    cerr << "call to bind failed for socket " << m_socket;
00457 }
00458 
00459 void
00460 QSESocketBaseListenFailed::errMsg() const
00461 {
00462    cerr << "call to listen failed for socket " << m_socket
00463         << ", backlog " << m_backlog;
00464 }
00465 
00466 void
00467 QSESocketBaseAcceptFailed::errMsg() const
00468 {
00469    cerr << "call to accept failed for socket " << m_socket;
00470 }
00471 
00472 void
00473 QSESocketBaseCannotConnect::errMsg() const
00474 {
00475    cerr << "cannot connect:\n"
00476         << "   socket = " << m_socket << "\n"
00477         << "   server = " << m_server.name() << ":" << m_server.port();
00478 }
00479 
00480 void
00481 QSESocketBaseSocketClosed::errMsg() const
00482 {
00483    cerr << "socket " << m_socket << " is closed";
00484 }
00485 
00486 void
00487 QSESocketBaseReadFailed::errMsg() const
00488 {
00489    cerr << "read failed for socket " << m_socket;
00490 }
00491 
00492 void
00493 QSESocketBaseWriteFailed::errMsg() const
00494 {
00495    cerr << "write failed for socket " << m_socket;
00496 }
00497 
00498 void
00499 QSESocketBaseMessageTooLong::errMsg() const
00500 {
00501    cerr << "message too long on socket " << m_socket;
00502 }
00503 
00504 void
00505 QSESocketBaseMessageTooShort::errMsg() const
00506 {
00507    cerr << "message too short on socket " << m_socket;
00508 }

Generated on Wed Jun 2 16:32:56 2004 for COrnell Data EXchange (CODEX) by doxygen1.2.18