Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

Socket.cc

00001 /*
00002  * Copyright 2003 Michael A. Marsh, Cornell University. All rights reserved.
00003  * This software is released under the modified BSD license.
00004  * See the file LICENSE in the top-level directory for details.
00005  */
00006 //
00007 // $Id: Socket.cc,v 1.5 2004/10/01 15:42:25 mmarsh Exp $
00008 //
00009 // $Log: Socket.cc,v $
00010 // Revision 1.5  2004/10/01 15:42:25  mmarsh
00011 // Updated to handle non-stream sockets.
00012 //
00013 // Revision 1.4  2004/05/19 15:56:56  mmarsh
00014 // *** empty log message ***
00015 //
00016 // Revision 1.3  2004/01/23 14:54:57  mmarsh
00017 // flush() is supposed to be a blocking write, so we should ignore
00018 // EAGAIN/EWOULDBLOCK errors.
00019 //
00020 // Revision 1.2  2003/11/04 22:17:23  mmarsh
00021 // General code cleanup.
00022 //
00023 //
00024 
00025 #include "Socket.h"
00026 #include "RemoteServer.h"
00027 #include "Message.h"
00028 #include <errno.h>
00029 #include <unistd.h>
00030 #include <netinet/in.h>
00031 #include <fcntl.h>
00032 #include <iostream>
00033 
00034 using namespace CODEX_Quorum;
00035 
00036 SocketBase::SocketBase( int   domain   ,
00037                         int   type     ,
00038                         int   protocol ,
00039                         bool  blocking ) :
00040    m_msgOffset(0),
00041    m_domain(domain),
00042    m_type(type),
00043    m_protocol(protocol),
00044    m_blocking(blocking),
00045    m_port(0),
00046    m_backlog(0)
00047 {
00048    m_socket = ::socket(m_domain,m_type,m_protocol);
00049    if ( m_socket < 0 )
00050       throw QSESocketBaseBadSocket( __FILE__ ,
00051                                     __LINE__ ,
00052                                     m_domain , m_type , m_protocol , errno );
00053 }
00054 
00055 SocketBase::SocketBase( const SocketBase& aOther ) :
00056    m_msgOffset(0),
00057    m_domain(aOther.m_domain),
00058    m_type(aOther.m_type),
00059    m_protocol(aOther.m_protocol),
00060    m_blocking(aOther.m_blocking),
00061    m_port(aOther.m_port),
00062    m_backlog(aOther.m_backlog),
00063    m_socket(aOther.m_socket)
00064 {}
00065 
00066 SocketBase::~SocketBase()
00067 {
00068    ::close( m_socket );
00069 }
00070 
00071 void
00072 SocketBase::setup( int port, int backlog )
00073 {
00074    protected_bind(port);
00075    protected_listen(backlog);
00076 }
00077 
00078 void
00079 SocketBase::setup(struct sockaddr* my_addr,
00080                   socklen_t addrlen,
00081                   int backlog)
00082 {
00083    protected_bind(my_addr, addrlen);
00084    protected_listen(backlog);
00085 }
00086 
00087 void
00088 SocketBase::connect(const RemoteServer& server)
00089 {
00090    if ( ::connect(m_socket, server.sockaddr(), server.addrlen()) < 0 )
00091       throw QSESocketBaseCannotConnect( __FILE__ ,
00092                                         __LINE__ ,
00093                                         m_socket , server , errno );
00094 }
00095 
00096 SocketBase*
00097 SocketBase::accept()
00098 {
00099    return protected_accept();
00100 }
00101 
00102 size_t
00103 SocketBase::readFrom( void* output, size_t maxSize ) const
00104 {
00105    //int bytesRead = ::recv( m_socket, output, maxSize, 0 );
00106    int bytesRead = ::read( m_socket, output, maxSize );
00107    if ( bytesRead < 0 )
00108    {
00109       throw QSESocketBaseReadFailed( __FILE__ , __LINE__ , m_socket , errno );
00110    }
00111    if ( 0 == bytesRead )
00112    {
00113       throw QSESocketBaseSocketClosed( __FILE__ , __LINE__ , m_socket , 0 );
00114    }
00115    return bytesRead;
00116 }
00117 
00118 size_t
00119 SocketBase::readAll( Message& msg, size_t length ) const
00120 {
00121    const size_t buffSize = 1024;
00122    unsigned char buff[buffSize];
00123    size_t readLength;
00124    if ( 0 == msg.length() )
00125    {
00126       readLength = 1;
00127    }
00128    else
00129    {
00130       if ( 0 == length )
00131       {
00132          readLength = buffSize;
00133       }
00134       else
00135       {
00136          readLength = ( length < buffSize ) ? length : buffSize;
00137       }
00138    }
00139    size_t toRead = length;
00140    size_t nRead = readFrom( buff, readLength ); // this may throw
00141    if ( 0 == nRead )
00142    {
00143       // There was a problem reading -- we're not ready yet.
00144       return 0;
00145    }
00146    int offset = 0;
00147    if ( 0 == msg.length() )
00148    {
00149       // we need to find out how long the message is -- this should be
00150       // the only thing read out at the moment
00151       unsigned char nLength = buff[0];
00152       if ( nLength > 4 ) // int on any platform -- > 4GB is ridiculous
00153       {
00154          throw QSESocketBaseMessageTooLong( __FILE__ , __LINE__ ,
00155                                             m_socket , 0 );
00156       }
00157       nLength = ( nLength < 1 ) ? 1 : nLength; // 0 == 1
00158       nRead += readFrom( buff+1, nLength + 1 );
00159       if ( nRead < nLength + 2U ) // at least one byte of actual data -- we're
00160                                   // willing to block at this point, since
00161                                   // it's critical that we read this data now
00162       {
00163          throw QSESocketBaseMessageTooShort( __FILE__ , __LINE__ ,
00164                                              m_socket , 0 );
00165       }
00166       toRead = 0;
00167       for ( int i = 1 ; i <= nLength ; ++i )
00168       {
00169          toRead = toRead*256 + buff[i];
00170       }
00171       offset = nLength + 1;
00172    }
00173 
00174    // now fill the Message with what we've just read
00175    msg.fill( buff+offset, nRead-offset );
00176    toRead -= nRead-offset;
00177 
00178    // read until we're done or we block
00179    while ( toRead > 0 )
00180    {
00181       fd_set read_set;
00182       FD_ZERO( &read_set );
00183       int s = set_fd( &read_set, SocketBase::kRead );
00184       struct timeval tv;
00185       tv.tv_sec = 0;
00186       tv.tv_usec = 0;
00187       if ( 0 > select(s+1, &read_set, 0, 0, &tv) )
00188       {
00189          throw QSESocketBaseReadFailed( __FILE__ , __LINE__ ,
00190                                         m_socket , errno );
00191       }
00192 
00193       if ( ! isset_fd( &read_set, SocketBase::kRead ) )
00194       {
00195          // we're blocked
00196          return toRead;
00197       }
00198 
00199       // get up to toRead or buffSize bytes, whichever is smaller
00200       nRead = readFrom( buff, (toRead<buffSize)?toRead:buffSize );
00201       if ( nRead > 0 )
00202       {
00203          msg.fill(buff,nRead);
00204       }
00205       // this is safe -- nRead <= toRead according to semantics of readFrom
00206       toRead -= nRead;
00207    }
00208    return 0;
00209 }
00210 
00211 void
00212 SocketBase::writeTo( const Message& input ) const
00213 {
00214    int messageSize = input.length();
00215 
00216    // Send the length info -- This is the most conservative way to set it,
00217    // with the maximum number of length bytes permitted.
00218    unsigned char length_info[5];
00219    length_info[0] = 4;
00220    length_info[4] = messageSize%256;
00221    length_info[3] = (messageSize/256)%256;
00222    length_info[2] = (messageSize/65536)%256;
00223    length_info[1] = (messageSize/16777216)%256;
00224 
00225    // We need to put the entire message in a single buffer so that
00226    // we can write a large chunk all at once.  Otherwise, there's
00227    // likely to be an error on the receiving end when it can't read
00228    // enough data.
00229    Message* m = new Message( length_info, 5 );
00230    m->fill( input );
00231    m_msgQueue.push( m );
00232 }
00233 
00234 int
00235 SocketBase::set_fd( fd_set* fd_bitmap, SocketBase::StateType s ) const
00236 {
00237    FD_SET( m_socket, fd_bitmap );
00238    return m_socket;
00239 }
00240 
00241 bool
00242 SocketBase::isset_fd( const fd_set* fd_bitmap, SocketBase::StateType s ) const
00243 {
00244    bool retVal = FD_ISSET( m_socket, fd_bitmap );
00245    if ( retVal && ( kWrite == s ) )
00246    {
00247       // Write as much as possible from the message queue without blocking.
00248       while ( retVal && !m_msgQueue.empty() )
00249       {
00250          Message* first = m_msgQueue.front();
00251          const unsigned char* output = first->buffer() + m_msgOffset;
00252          size_t maxSize = first->length() - m_msgOffset;
00253          int bytesWritten = internal_write( output, maxSize );
00254          m_msgOffset += bytesWritten;
00255          if ( first->length() == m_msgOffset )
00256          {
00257             // The message is written completely.  Clean up.
00258             delete first;
00259             m_msgQueue.pop();
00260             m_msgOffset = 0;
00261          }
00262 
00263          // Check the status of the socket again.
00264          fd_set write_set;
00265          FD_ZERO( &write_set );
00266          int s = set_fd( &write_set, SocketBase::kWrite );
00267          struct timeval tv;
00268          tv.tv_sec = 0;
00269          tv.tv_usec = 0;
00270          if ( 0 > select( s+1, 0, &write_set, 0, &tv ) )
00271          {
00272             throw QSESocketBaseWriteFailed( __FILE__ , __LINE__ ,
00273                                             m_socket , errno );
00274          }
00275          retVal = FD_ISSET( m_socket, &write_set );
00276       }
00277    }
00278    return retVal;
00279 }
00280 
00281 void
00282 SocketBase::flush() const
00283 {
00284    // This will basically spin until all of the data is consumed.
00285    while ( ! m_msgQueue.empty() )
00286    {
00287       Message* first = m_msgQueue.front();
00288       const unsigned char* output = first->buffer() + m_msgOffset;
00289       size_t maxSize = first->length() - m_msgOffset;
00290       try
00291       {
00292          int bytesWritten = internal_write( output, maxSize );
00293          m_msgOffset += bytesWritten;
00294       }
00295       catch ( QSESocketBaseWriteFailed& e )
00296       {
00297          // This is a blocking routine, so if we'd block, that's fine.
00298          if ( e.error() != EAGAIN )
00299          {
00300             throw;
00301          }
00302       }
00303       if ( first->length() == m_msgOffset )
00304       {
00305          // The message is written completely.  Clean up.
00306          delete first;
00307          m_msgQueue.pop();
00308          m_msgOffset = 0;
00309       }
00310    }
00311 }
00312 
00313 int
00314 SocketBase::internal_write( const unsigned char* output, size_t maxSize ) const
00315 {
00316    int bytesWritten = ::send( m_socket, output, maxSize, 0 );
00317    if ( bytesWritten < 0 )
00318    {
00319       throw QSESocketBaseWriteFailed( __FILE__ , __LINE__ , m_socket , errno );
00320    }
00321    return bytesWritten;
00322 }
00323 
00324 SocketBase*
00325 SocketBase::protected_accept()
00326 {
00327    SocketBase* newSocket = clone();
00328    // We don't know what accept(2) is going to do, memory-wise, so
00329    // we need to use malloc(3) instead of new.
00330    struct sockaddr* addr = 0;
00331    socklen_t* paddrlen = 0;
00332    socklen_t addrlen = 0;
00333    switch( m_domain )
00334    {
00335       case PF_INET :
00336          addr = (struct sockaddr*) malloc(sizeof(struct sockaddr_in));
00337          addrlen = sizeof(struct sockaddr_in);
00338          paddrlen = &addrlen;
00339          break;
00340    }
00341    int newSocketFD = ::accept( m_socket, addr, paddrlen );
00342    if ( ( newSocketFD < 0 ) && ( errno != EOPNOTSUPP ) )
00343    {
00344       free( addr );
00345       throw QSESocketBaseAcceptFailed( __FILE__ , __LINE__ , m_socket, errno );
00346    }
00347    newSocket->setSocket( newSocketFD );
00348    newSocket->setBacklog( 0 );
00349    switch( m_domain )
00350    {
00351       case PF_INET :
00352          struct sockaddr_in* addr_in = (struct sockaddr_in*) addr;
00353          newSocket->setPort(addr_in->sin_port);
00354          break;
00355    }
00356    free(addr);
00357    if ( ! m_blocking )
00358    { // only using the POSIX variety
00359       int flags;
00360       if ( -1 == (flags = fcntl(newSocket->socket(), F_GETFL, 0)) )
00361          flags = 0;
00362       fcntl(newSocket->socket(), F_SETFL, flags | O_NONBLOCK);
00363    }
00364    newSocket->finish_accept();
00365    return newSocket;
00366 }
00367 
00368 void
00369 SocketBase::finish_accept()
00370 {
00371 }
00372 
00373 SocketBase*
00374 SocketBase::clone()
00375 {
00376    SocketBase* copy = new SocketBase( *this );
00377    return copy;
00378 }
00379 
00380 void
00381 SocketBase::protected_bind(int port)
00382 {
00383    m_port = port;
00384    struct sockaddr_in* my_addr_in = new struct sockaddr_in;
00385    my_addr_in->sin_family         = AF_INET;
00386    my_addr_in->sin_port           = htons(m_port);
00387    my_addr_in->sin_addr.s_addr    = htonl(INADDR_ANY);
00388 
00389    struct sockaddr* my_addr = (struct sockaddr*)my_addr_in;
00390    socklen_t addrlen = sizeof(*my_addr_in);
00391 
00392    protected_bind(my_addr, addrlen);
00393    delete my_addr_in;
00394 }
00395 
00401 void
00402 SocketBase::protected_bind(struct sockaddr* my_addr, socklen_t addrlen)
00403 {
00404    int on = 1;
00405    if ( m_type == SOCK_STREAM )
00406    {
00407       ::setsockopt( m_socket,
00408                     SOL_SOCKET,
00409                     SO_REUSEADDR,
00410                     (void*)&on,
00411                     sizeof(on) );
00412    }
00413    if ( ::bind(m_socket, my_addr, addrlen) < 0 )
00414       throw QSESocketBaseCannotBind( __FILE__ , __LINE__ , m_socket , errno );
00415 }
00416 
00417 void
00418 SocketBase::protected_listen(int backlog)
00419 {
00420    if ( m_type != SOCK_STREAM )
00421    {
00422       return;
00423    }
00424    m_backlog = backlog;
00425    if ( ::listen(m_socket, m_backlog) < 0 )
00426       throw QSESocketBaseListenFailed( __FILE__ , __LINE__ ,
00427                                        m_socket , m_backlog , errno );
00428 }
00429 
00430 void
00431 SocketBase::setSocket( int socketFD )
00432 {
00433    m_socket = socketFD;
00434 }
00435 
00436 void
00437 SocketBase::setBacklog( int backlog )
00438 {
00439    m_backlog = backlog;
00440 }
00441 
00442 void
00443 SocketBase::setPort( int port )
00444 {
00445    m_port = port;
00446 }
00447 
00448 //------ Exceptions ------//
00449 
00450 void
00451 QSESocketBase::derivedMsg() const
00452 {
00453    cerr << "SocketBase -- ";
00454    errMsg();
00455    cerr << "\n   errno = " << m_error;
00456 }
00457 
00458 void
00459 QSESocketBaseBadSocket::errMsg() const
00460 {
00461    cerr << "call to socket( "
00462         << m_domain   << " , "
00463         << m_type     << " , "
00464         << m_protocol << " ) failed";
00465 }
00466 
00467 void
00468 QSESocketBaseCannotBind::errMsg() const
00469 {
00470    cerr << "call to bind failed for socket " << m_socket;
00471 }
00472 
00473 void
00474 QSESocketBaseListenFailed::errMsg() const
00475 {
00476    cerr << "call to listen failed for socket " << m_socket
00477         << ", backlog " << m_backlog;
00478 }
00479 
00480 void
00481 QSESocketBaseAcceptFailed::errMsg() const
00482 {
00483    cerr << "call to accept failed for socket " << m_socket;
00484 }
00485 
00486 void
00487 QSESocketBaseCannotConnect::errMsg() const
00488 {
00489    cerr << "cannot connect:\n"
00490         << "   socket = " << m_socket << "\n"
00491         << "   server = " << m_server.name() << ":" << m_server.port();
00492 }
00493 
00494 void
00495 QSESocketBaseSocketClosed::errMsg() const
00496 {
00497    cerr << "socket " << m_socket << " is closed";
00498 }
00499 
00500 void
00501 QSESocketBaseReadFailed::errMsg() const
00502 {
00503    cerr << "read failed for socket " << m_socket;
00504 }
00505 
00506 void
00507 QSESocketBaseWriteFailed::errMsg() const
00508 {
00509    cerr << "write failed for socket " << m_socket;
00510 }
00511 
00512 void
00513 QSESocketBaseMessageTooLong::errMsg() const
00514 {
00515    cerr << "message too long on socket " << m_socket;
00516 }
00517 
00518 void
00519 QSESocketBaseMessageTooShort::errMsg() const
00520 {
00521    cerr << "message too short on socket " << m_socket;
00522 }

Generated on Fri May 6 17:41:15 2005 for COrnell Data EXchange (CODEX) by  doxygen 1.4.1