00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "Socket.h"
00023 #include "RemoteServer.h"
00024 #include "Message.h"
00025 #include <errno.h>
00026 #include <unistd.h>
00027 #include <netinet/in.h>
00028 #include <fcntl.h>
00029 #include <iostream>
00030
00031 using namespace CODEX_Quorum;
00032
00033 SocketBase::SocketBase( int domain ,
00034 int type ,
00035 int protocol ,
00036 bool blocking ) :
00037 m_msgOffset(0),
00038 m_domain(domain),
00039 m_type(type),
00040 m_protocol(protocol),
00041 m_blocking(blocking),
00042 m_port(0),
00043 m_backlog(0)
00044 {
00045 m_socket = ::socket(m_domain,m_type,m_protocol);
00046 if ( m_socket < 0 )
00047 throw QSESocketBaseBadSocket( __FILE__ ,
00048 __LINE__ ,
00049 m_domain , m_type , m_protocol , errno );
00050 }
00051
00052 SocketBase::SocketBase( const SocketBase& aOther ) :
00053 m_msgOffset(0),
00054 m_domain(aOther.m_domain),
00055 m_type(aOther.m_type),
00056 m_protocol(aOther.m_protocol),
00057 m_blocking(aOther.m_blocking),
00058 m_port(aOther.m_port),
00059 m_backlog(aOther.m_backlog),
00060 m_socket(aOther.m_socket)
00061 {}
00062
00063 SocketBase::~SocketBase()
00064 {
00065 ::close( m_socket );
00066 }
00067
00068 void
00069 SocketBase::setup( int port, int backlog )
00070 {
00071 protected_bind(port);
00072 protected_listen(backlog);
00073 }
00074
00075 void
00076 SocketBase::setup(struct sockaddr* my_addr,
00077 socklen_t addrlen,
00078 int backlog)
00079 {
00080 protected_bind(my_addr, addrlen);
00081 protected_listen(backlog);
00082 }
00083
00084 void
00085 SocketBase::connect(const RemoteServer& server)
00086 {
00087 if ( ::connect(m_socket, server.sockaddr(), server.addrlen()) < 0 )
00088 throw QSESocketBaseCannotConnect( __FILE__ ,
00089 __LINE__ ,
00090 m_socket , server , errno );
00091 }
00092
00093 SocketBase*
00094 SocketBase::accept()
00095 {
00096 return protected_accept();
00097 }
00098
00099 size_t
00100 SocketBase::readFrom( void* output, size_t maxSize ) const
00101 {
00102
00103 int bytesRead = ::read( m_socket, output, maxSize );
00104 if ( bytesRead < 0 )
00105 {
00106 throw QSESocketBaseReadFailed( __FILE__ , __LINE__ , m_socket , errno );
00107 }
00108 if ( 0 == bytesRead )
00109 {
00110 throw QSESocketBaseSocketClosed( __FILE__ , __LINE__ , m_socket , 0 );
00111 }
00112 return bytesRead;
00113 }
00114
00115 size_t
00116 SocketBase::readAll( Message& msg, size_t length ) const
00117 {
00118 const size_t buffSize = 1024;
00119 unsigned char buff[buffSize];
00120 size_t readLength;
00121 if ( 0 == msg.length() )
00122 {
00123 readLength = 1;
00124 }
00125 else
00126 {
00127 if ( 0 == length )
00128 {
00129 readLength = buffSize;
00130 }
00131 else
00132 {
00133 readLength = ( length < buffSize ) ? length : buffSize;
00134 }
00135 }
00136 size_t toRead = length;
00137 size_t nRead = readFrom( buff, readLength );
00138 if ( 0 == nRead )
00139 {
00140
00141 return 0;
00142 }
00143 int offset = 0;
00144 if ( 0 == msg.length() )
00145 {
00146
00147
00148 unsigned char nLength = buff[0];
00149 if ( nLength > 4 )
00150 {
00151 throw QSESocketBaseMessageTooLong( __FILE__ , __LINE__ ,
00152 m_socket , 0 );
00153 }
00154 nLength = ( nLength < 1 ) ? 1 : nLength;
00155 nRead += readFrom( buff+1, nLength + 1 );
00156 if ( nRead < nLength + 2U )
00157
00158
00159 {
00160 throw QSESocketBaseMessageTooShort( __FILE__ , __LINE__ ,
00161 m_socket , 0 );
00162 }
00163 toRead = 0;
00164 for ( int i = 1 ; i <= nLength ; ++i )
00165 {
00166 toRead = toRead*256 + buff[i];
00167 }
00168 offset = nLength + 1;
00169 }
00170
00171
00172 msg.fill( buff+offset, nRead-offset );
00173 toRead -= nRead-offset;
00174
00175
00176 while ( toRead > 0 )
00177 {
00178 fd_set read_set;
00179 FD_ZERO( &read_set );
00180 int s = set_fd( &read_set, SocketBase::kRead );
00181 struct timeval tv;
00182 tv.tv_sec = 0;
00183 tv.tv_usec = 0;
00184 if ( 0 > select(s+1, &read_set, 0, 0, &tv) )
00185 {
00186 throw QSESocketBaseReadFailed( __FILE__ , __LINE__ ,
00187 m_socket , errno );
00188 }
00189
00190 if ( ! isset_fd( &read_set, SocketBase::kRead ) )
00191 {
00192
00193 return toRead;
00194 }
00195
00196
00197 nRead = readFrom( buff, (toRead<buffSize)?toRead:buffSize );
00198 if ( nRead > 0 )
00199 {
00200 msg.fill(buff,nRead);
00201 }
00202
00203 toRead -= nRead;
00204 }
00205 return 0;
00206 }
00207
00208 void
00209 SocketBase::writeTo( const Message& input ) const
00210 {
00211 int messageSize = input.length();
00212
00213
00214
00215 unsigned char length_info[5];
00216 length_info[0] = 4;
00217 length_info[4] = messageSize%256;
00218 length_info[3] = (messageSize/256)%256;
00219 length_info[2] = (messageSize/65536)%256;
00220 length_info[1] = (messageSize/16777216)%256;
00221
00222
00223
00224
00225
00226 Message* m = new Message( length_info, 5 );
00227 m->fill( input );
00228 m_msgQueue.push( m );
00229 }
00230
00231 int
00232 SocketBase::set_fd( fd_set* fd_bitmap, SocketBase::StateType s ) const
00233 {
00234 FD_SET( m_socket, fd_bitmap );
00235 return m_socket;
00236 }
00237
00238 bool
00239 SocketBase::isset_fd( const fd_set* fd_bitmap, SocketBase::StateType s ) const
00240 {
00241 bool retVal = FD_ISSET( m_socket, fd_bitmap );
00242 if ( retVal && ( kWrite == s ) )
00243 {
00244
00245 while ( retVal && !m_msgQueue.empty() )
00246 {
00247 Message* first = m_msgQueue.front();
00248 const unsigned char* output = first->buffer() + m_msgOffset;
00249 size_t maxSize = first->length() - m_msgOffset;
00250 int bytesWritten = internal_write( output, maxSize );
00251 m_msgOffset += bytesWritten;
00252 if ( first->length() == m_msgOffset )
00253 {
00254
00255 delete first;
00256 m_msgQueue.pop();
00257 m_msgOffset = 0;
00258 }
00259
00260
00261 fd_set write_set;
00262 FD_ZERO( &write_set );
00263 int s = set_fd( &write_set, SocketBase::kWrite );
00264 struct timeval tv;
00265 tv.tv_sec = 0;
00266 tv.tv_usec = 0;
00267 if ( 0 > select( s+1, 0, &write_set, 0, &tv ) )
00268 {
00269 throw QSESocketBaseWriteFailed( __FILE__ , __LINE__ ,
00270 m_socket , errno );
00271 }
00272 retVal = FD_ISSET( m_socket, &write_set );
00273 }
00274 }
00275 return retVal;
00276 }
00277
00278 void
00279 SocketBase::flush() const
00280 {
00281
00282 while ( ! m_msgQueue.empty() )
00283 {
00284 Message* first = m_msgQueue.front();
00285 const unsigned char* output = first->buffer() + m_msgOffset;
00286 size_t maxSize = first->length() - m_msgOffset;
00287 try
00288 {
00289 int bytesWritten = internal_write( output, maxSize );
00290 m_msgOffset += bytesWritten;
00291 }
00292 catch ( QSESocketBaseWriteFailed& e )
00293 {
00294
00295 if ( e.error() != EAGAIN )
00296 {
00297 throw;
00298 }
00299 }
00300 if ( first->length() == m_msgOffset )
00301 {
00302
00303 delete first;
00304 m_msgQueue.pop();
00305 m_msgOffset = 0;
00306 }
00307 }
00308 }
00309
00310 int
00311 SocketBase::internal_write( const unsigned char* output, size_t maxSize ) const
00312 {
00313 int bytesWritten = ::send( m_socket, output, maxSize, 0 );
00314 if ( bytesWritten < 0 )
00315 {
00316 throw QSESocketBaseWriteFailed( __FILE__ , __LINE__ , m_socket , errno );
00317 }
00318 return bytesWritten;
00319 }
00320
00321 SocketBase*
00322 SocketBase::protected_accept()
00323 {
00324 SocketBase* newSocket = clone();
00325
00326
00327 struct sockaddr* addr = 0;
00328 socklen_t* paddrlen = 0;
00329 socklen_t addrlen = 0;
00330 switch( m_domain )
00331 {
00332 case PF_INET :
00333 addr = (struct sockaddr*) malloc(sizeof(struct sockaddr_in));
00334 addrlen = sizeof(struct sockaddr_in);
00335 paddrlen = &addrlen;
00336 break;
00337 }
00338 int newSocketFD = ::accept( m_socket, addr, paddrlen );
00339 if ( newSocketFD < 0 )
00340 {
00341 free( addr );
00342 throw QSESocketBaseAcceptFailed( __FILE__ , __LINE__ , m_socket, errno );
00343 }
00344 newSocket->setSocket( newSocketFD );
00345 newSocket->setBacklog( 0 );
00346 switch( m_domain )
00347 {
00348 case PF_INET :
00349 struct sockaddr_in* addr_in = (struct sockaddr_in*) addr;
00350 newSocket->setPort(addr_in->sin_port);
00351 break;
00352 }
00353 free(addr);
00354 if ( ! m_blocking )
00355 {
00356 int flags;
00357 if ( -1 == (flags = fcntl(newSocket->socket(), F_GETFL, 0)) )
00358 flags = 0;
00359 fcntl(newSocket->socket(), F_SETFL, flags | O_NONBLOCK);
00360 }
00361 newSocket->finish_accept();
00362 return newSocket;
00363 }
00364
00365 void
00366 SocketBase::finish_accept()
00367 {
00368 }
00369
00370 SocketBase*
00371 SocketBase::clone()
00372 {
00373 SocketBase* copy = new SocketBase( *this );
00374 return copy;
00375 }
00376
00377 void
00378 SocketBase::protected_bind(int port)
00379 {
00380 m_port = port;
00381 struct sockaddr_in* my_addr_in = new struct sockaddr_in;
00382 my_addr_in->sin_family = AF_INET;
00383 my_addr_in->sin_port = htons(m_port);
00384 my_addr_in->sin_addr.s_addr = htonl(INADDR_ANY);
00385
00386 struct sockaddr* my_addr = (struct sockaddr*)my_addr_in;
00387 socklen_t addrlen = sizeof(*my_addr_in);
00388
00389 protected_bind(my_addr, addrlen);
00390 delete my_addr_in;
00391 }
00392
00398 void
00399 SocketBase::protected_bind(struct sockaddr* my_addr, socklen_t addrlen)
00400 {
00401 int on = 1;
00402 ::setsockopt( m_socket, SOL_SOCKET, SO_REUSEADDR, (void*)&on, sizeof(on) );
00403 if ( ::bind(m_socket, my_addr, addrlen) < 0 )
00404 throw QSESocketBaseCannotBind( __FILE__ , __LINE__ , m_socket , errno );
00405 }
00406
00407 void
00408 SocketBase::protected_listen(int backlog)
00409 {
00410 m_backlog = backlog;
00411 if ( ::listen(m_socket, m_backlog) < 0 )
00412 throw QSESocketBaseListenFailed( __FILE__ , __LINE__ ,
00413 m_socket , m_backlog , errno );
00414 }
00415
00416 void
00417 SocketBase::setSocket( int socketFD )
00418 {
00419 m_socket = socketFD;
00420 }
00421
00422 void
00423 SocketBase::setBacklog( int backlog )
00424 {
00425 m_backlog = backlog;
00426 }
00427
00428 void
00429 SocketBase::setPort( int port )
00430 {
00431 m_port = port;
00432 }
00433
00434
00435
00436 void
00437 QSESocketBase::derivedMsg() const
00438 {
00439 cerr << "SocketBase -- ";
00440 errMsg();
00441 cerr << "\n errno = " << m_error;
00442 }
00443
00444 void
00445 QSESocketBaseBadSocket::errMsg() const
00446 {
00447 cerr << "call to socket( "
00448 << m_domain << " , "
00449 << m_type << " , "
00450 << m_protocol << " ) failed";
00451 }
00452
00453 void
00454 QSESocketBaseCannotBind::errMsg() const
00455 {
00456 cerr << "call to bind failed for socket " << m_socket;
00457 }
00458
00459 void
00460 QSESocketBaseListenFailed::errMsg() const
00461 {
00462 cerr << "call to listen failed for socket " << m_socket
00463 << ", backlog " << m_backlog;
00464 }
00465
00466 void
00467 QSESocketBaseAcceptFailed::errMsg() const
00468 {
00469 cerr << "call to accept failed for socket " << m_socket;
00470 }
00471
00472 void
00473 QSESocketBaseCannotConnect::errMsg() const
00474 {
00475 cerr << "cannot connect:\n"
00476 << " socket = " << m_socket << "\n"
00477 << " server = " << m_server.name() << ":" << m_server.port();
00478 }
00479
00480 void
00481 QSESocketBaseSocketClosed::errMsg() const
00482 {
00483 cerr << "socket " << m_socket << " is closed";
00484 }
00485
00486 void
00487 QSESocketBaseReadFailed::errMsg() const
00488 {
00489 cerr << "read failed for socket " << m_socket;
00490 }
00491
00492 void
00493 QSESocketBaseWriteFailed::errMsg() const
00494 {
00495 cerr << "write failed for socket " << m_socket;
00496 }
00497
00498 void
00499 QSESocketBaseMessageTooLong::errMsg() const
00500 {
00501 cerr << "message too long on socket " << m_socket;
00502 }
00503
00504 void
00505 QSESocketBaseMessageTooShort::errMsg() const
00506 {
00507 cerr << "message too short on socket " << m_socket;
00508 }