00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "Socket.h"
00026 #include "RemoteServer.h"
00027 #include "Message.h"
00028 #include <errno.h>
00029 #include <unistd.h>
00030 #include <netinet/in.h>
00031 #include <fcntl.h>
00032 #include <iostream>
00033
00034 using namespace CODEX_Quorum;
00035
00036 SocketBase::SocketBase( int domain ,
00037 int type ,
00038 int protocol ,
00039 bool blocking ) :
00040 m_msgOffset(0),
00041 m_domain(domain),
00042 m_type(type),
00043 m_protocol(protocol),
00044 m_blocking(blocking),
00045 m_port(0),
00046 m_backlog(0)
00047 {
00048 m_socket = ::socket(m_domain,m_type,m_protocol);
00049 if ( m_socket < 0 )
00050 throw QSESocketBaseBadSocket( __FILE__ ,
00051 __LINE__ ,
00052 m_domain , m_type , m_protocol , errno );
00053 }
00054
00055 SocketBase::SocketBase( const SocketBase& aOther ) :
00056 m_msgOffset(0),
00057 m_domain(aOther.m_domain),
00058 m_type(aOther.m_type),
00059 m_protocol(aOther.m_protocol),
00060 m_blocking(aOther.m_blocking),
00061 m_port(aOther.m_port),
00062 m_backlog(aOther.m_backlog),
00063 m_socket(aOther.m_socket)
00064 {}
00065
00066 SocketBase::~SocketBase()
00067 {
00068 ::close( m_socket );
00069 }
00070
00071 void
00072 SocketBase::setup( int port, int backlog )
00073 {
00074 protected_bind(port);
00075 protected_listen(backlog);
00076 }
00077
00078 void
00079 SocketBase::setup(struct sockaddr* my_addr,
00080 socklen_t addrlen,
00081 int backlog)
00082 {
00083 protected_bind(my_addr, addrlen);
00084 protected_listen(backlog);
00085 }
00086
00087 void
00088 SocketBase::connect(const RemoteServer& server)
00089 {
00090 if ( ::connect(m_socket, server.sockaddr(), server.addrlen()) < 0 )
00091 throw QSESocketBaseCannotConnect( __FILE__ ,
00092 __LINE__ ,
00093 m_socket , server , errno );
00094 }
00095
00096 SocketBase*
00097 SocketBase::accept()
00098 {
00099 return protected_accept();
00100 }
00101
00102 size_t
00103 SocketBase::readFrom( void* output, size_t maxSize ) const
00104 {
00105
00106 int bytesRead = ::read( m_socket, output, maxSize );
00107 if ( bytesRead < 0 )
00108 {
00109 throw QSESocketBaseReadFailed( __FILE__ , __LINE__ , m_socket , errno );
00110 }
00111 if ( 0 == bytesRead )
00112 {
00113 throw QSESocketBaseSocketClosed( __FILE__ , __LINE__ , m_socket , 0 );
00114 }
00115 return bytesRead;
00116 }
00117
00118 size_t
00119 SocketBase::readAll( Message& msg, size_t length ) const
00120 {
00121 const size_t buffSize = 1024;
00122 unsigned char buff[buffSize];
00123 size_t readLength;
00124 if ( 0 == msg.length() )
00125 {
00126 readLength = 1;
00127 }
00128 else
00129 {
00130 if ( 0 == length )
00131 {
00132 readLength = buffSize;
00133 }
00134 else
00135 {
00136 readLength = ( length < buffSize ) ? length : buffSize;
00137 }
00138 }
00139 size_t toRead = length;
00140 size_t nRead = readFrom( buff, readLength );
00141 if ( 0 == nRead )
00142 {
00143
00144 return 0;
00145 }
00146 int offset = 0;
00147 if ( 0 == msg.length() )
00148 {
00149
00150
00151 unsigned char nLength = buff[0];
00152 if ( nLength > 4 )
00153 {
00154 throw QSESocketBaseMessageTooLong( __FILE__ , __LINE__ ,
00155 m_socket , 0 );
00156 }
00157 nLength = ( nLength < 1 ) ? 1 : nLength;
00158 nRead += readFrom( buff+1, nLength + 1 );
00159 if ( nRead < nLength + 2U )
00160
00161
00162 {
00163 throw QSESocketBaseMessageTooShort( __FILE__ , __LINE__ ,
00164 m_socket , 0 );
00165 }
00166 toRead = 0;
00167 for ( int i = 1 ; i <= nLength ; ++i )
00168 {
00169 toRead = toRead*256 + buff[i];
00170 }
00171 offset = nLength + 1;
00172 }
00173
00174
00175 msg.fill( buff+offset, nRead-offset );
00176 toRead -= nRead-offset;
00177
00178
00179 while ( toRead > 0 )
00180 {
00181 fd_set read_set;
00182 FD_ZERO( &read_set );
00183 int s = set_fd( &read_set, SocketBase::kRead );
00184 struct timeval tv;
00185 tv.tv_sec = 0;
00186 tv.tv_usec = 0;
00187 if ( 0 > select(s+1, &read_set, 0, 0, &tv) )
00188 {
00189 throw QSESocketBaseReadFailed( __FILE__ , __LINE__ ,
00190 m_socket , errno );
00191 }
00192
00193 if ( ! isset_fd( &read_set, SocketBase::kRead ) )
00194 {
00195
00196 return toRead;
00197 }
00198
00199
00200 nRead = readFrom( buff, (toRead<buffSize)?toRead:buffSize );
00201 if ( nRead > 0 )
00202 {
00203 msg.fill(buff,nRead);
00204 }
00205
00206 toRead -= nRead;
00207 }
00208 return 0;
00209 }
00210
00211 void
00212 SocketBase::writeTo( const Message& input ) const
00213 {
00214 int messageSize = input.length();
00215
00216
00217
00218 unsigned char length_info[5];
00219 length_info[0] = 4;
00220 length_info[4] = messageSize%256;
00221 length_info[3] = (messageSize/256)%256;
00222 length_info[2] = (messageSize/65536)%256;
00223 length_info[1] = (messageSize/16777216)%256;
00224
00225
00226
00227
00228
00229 Message* m = new Message( length_info, 5 );
00230 m->fill( input );
00231 m_msgQueue.push( m );
00232 }
00233
00234 int
00235 SocketBase::set_fd( fd_set* fd_bitmap, SocketBase::StateType s ) const
00236 {
00237 FD_SET( m_socket, fd_bitmap );
00238 return m_socket;
00239 }
00240
00241 bool
00242 SocketBase::isset_fd( const fd_set* fd_bitmap, SocketBase::StateType s ) const
00243 {
00244 bool retVal = FD_ISSET( m_socket, fd_bitmap );
00245 if ( retVal && ( kWrite == s ) )
00246 {
00247
00248 while ( retVal && !m_msgQueue.empty() )
00249 {
00250 Message* first = m_msgQueue.front();
00251 const unsigned char* output = first->buffer() + m_msgOffset;
00252 size_t maxSize = first->length() - m_msgOffset;
00253 int bytesWritten = internal_write( output, maxSize );
00254 m_msgOffset += bytesWritten;
00255 if ( first->length() == m_msgOffset )
00256 {
00257
00258 delete first;
00259 m_msgQueue.pop();
00260 m_msgOffset = 0;
00261 }
00262
00263
00264 fd_set write_set;
00265 FD_ZERO( &write_set );
00266 int s = set_fd( &write_set, SocketBase::kWrite );
00267 struct timeval tv;
00268 tv.tv_sec = 0;
00269 tv.tv_usec = 0;
00270 if ( 0 > select( s+1, 0, &write_set, 0, &tv ) )
00271 {
00272 throw QSESocketBaseWriteFailed( __FILE__ , __LINE__ ,
00273 m_socket , errno );
00274 }
00275 retVal = FD_ISSET( m_socket, &write_set );
00276 }
00277 }
00278 return retVal;
00279 }
00280
00281 void
00282 SocketBase::flush() const
00283 {
00284
00285 while ( ! m_msgQueue.empty() )
00286 {
00287 Message* first = m_msgQueue.front();
00288 const unsigned char* output = first->buffer() + m_msgOffset;
00289 size_t maxSize = first->length() - m_msgOffset;
00290 try
00291 {
00292 int bytesWritten = internal_write( output, maxSize );
00293 m_msgOffset += bytesWritten;
00294 }
00295 catch ( QSESocketBaseWriteFailed& e )
00296 {
00297
00298 if ( e.error() != EAGAIN )
00299 {
00300 throw;
00301 }
00302 }
00303 if ( first->length() == m_msgOffset )
00304 {
00305
00306 delete first;
00307 m_msgQueue.pop();
00308 m_msgOffset = 0;
00309 }
00310 }
00311 }
00312
00313 int
00314 SocketBase::internal_write( const unsigned char* output, size_t maxSize ) const
00315 {
00316 int bytesWritten = ::send( m_socket, output, maxSize, 0 );
00317 if ( bytesWritten < 0 )
00318 {
00319 throw QSESocketBaseWriteFailed( __FILE__ , __LINE__ , m_socket , errno );
00320 }
00321 return bytesWritten;
00322 }
00323
00324 SocketBase*
00325 SocketBase::protected_accept()
00326 {
00327 SocketBase* newSocket = clone();
00328
00329
00330 struct sockaddr* addr = 0;
00331 socklen_t* paddrlen = 0;
00332 socklen_t addrlen = 0;
00333 switch( m_domain )
00334 {
00335 case PF_INET :
00336 addr = (struct sockaddr*) malloc(sizeof(struct sockaddr_in));
00337 addrlen = sizeof(struct sockaddr_in);
00338 paddrlen = &addrlen;
00339 break;
00340 }
00341 int newSocketFD = ::accept( m_socket, addr, paddrlen );
00342 if ( ( newSocketFD < 0 ) && ( errno != EOPNOTSUPP ) )
00343 {
00344 free( addr );
00345 throw QSESocketBaseAcceptFailed( __FILE__ , __LINE__ , m_socket, errno );
00346 }
00347 newSocket->setSocket( newSocketFD );
00348 newSocket->setBacklog( 0 );
00349 switch( m_domain )
00350 {
00351 case PF_INET :
00352 struct sockaddr_in* addr_in = (struct sockaddr_in*) addr;
00353 newSocket->setPort(addr_in->sin_port);
00354 break;
00355 }
00356 free(addr);
00357 if ( ! m_blocking )
00358 {
00359 int flags;
00360 if ( -1 == (flags = fcntl(newSocket->socket(), F_GETFL, 0)) )
00361 flags = 0;
00362 fcntl(newSocket->socket(), F_SETFL, flags | O_NONBLOCK);
00363 }
00364 newSocket->finish_accept();
00365 return newSocket;
00366 }
00367
00368 void
00369 SocketBase::finish_accept()
00370 {
00371 }
00372
00373 SocketBase*
00374 SocketBase::clone()
00375 {
00376 SocketBase* copy = new SocketBase( *this );
00377 return copy;
00378 }
00379
00380 void
00381 SocketBase::protected_bind(int port)
00382 {
00383 m_port = port;
00384 struct sockaddr_in* my_addr_in = new struct sockaddr_in;
00385 my_addr_in->sin_family = AF_INET;
00386 my_addr_in->sin_port = htons(m_port);
00387 my_addr_in->sin_addr.s_addr = htonl(INADDR_ANY);
00388
00389 struct sockaddr* my_addr = (struct sockaddr*)my_addr_in;
00390 socklen_t addrlen = sizeof(*my_addr_in);
00391
00392 protected_bind(my_addr, addrlen);
00393 delete my_addr_in;
00394 }
00395
00401 void
00402 SocketBase::protected_bind(struct sockaddr* my_addr, socklen_t addrlen)
00403 {
00404 int on = 1;
00405 if ( m_type == SOCK_STREAM )
00406 {
00407 ::setsockopt( m_socket,
00408 SOL_SOCKET,
00409 SO_REUSEADDR,
00410 (void*)&on,
00411 sizeof(on) );
00412 }
00413 if ( ::bind(m_socket, my_addr, addrlen) < 0 )
00414 throw QSESocketBaseCannotBind( __FILE__ , __LINE__ , m_socket , errno );
00415 }
00416
00417 void
00418 SocketBase::protected_listen(int backlog)
00419 {
00420 if ( m_type != SOCK_STREAM )
00421 {
00422 return;
00423 }
00424 m_backlog = backlog;
00425 if ( ::listen(m_socket, m_backlog) < 0 )
00426 throw QSESocketBaseListenFailed( __FILE__ , __LINE__ ,
00427 m_socket , m_backlog , errno );
00428 }
00429
00430 void
00431 SocketBase::setSocket( int socketFD )
00432 {
00433 m_socket = socketFD;
00434 }
00435
00436 void
00437 SocketBase::setBacklog( int backlog )
00438 {
00439 m_backlog = backlog;
00440 }
00441
00442 void
00443 SocketBase::setPort( int port )
00444 {
00445 m_port = port;
00446 }
00447
00448
00449
00450 void
00451 QSESocketBase::derivedMsg() const
00452 {
00453 cerr << "SocketBase -- ";
00454 errMsg();
00455 cerr << "\n errno = " << m_error;
00456 }
00457
00458 void
00459 QSESocketBaseBadSocket::errMsg() const
00460 {
00461 cerr << "call to socket( "
00462 << m_domain << " , "
00463 << m_type << " , "
00464 << m_protocol << " ) failed";
00465 }
00466
00467 void
00468 QSESocketBaseCannotBind::errMsg() const
00469 {
00470 cerr << "call to bind failed for socket " << m_socket;
00471 }
00472
00473 void
00474 QSESocketBaseListenFailed::errMsg() const
00475 {
00476 cerr << "call to listen failed for socket " << m_socket
00477 << ", backlog " << m_backlog;
00478 }
00479
00480 void
00481 QSESocketBaseAcceptFailed::errMsg() const
00482 {
00483 cerr << "call to accept failed for socket " << m_socket;
00484 }
00485
00486 void
00487 QSESocketBaseCannotConnect::errMsg() const
00488 {
00489 cerr << "cannot connect:\n"
00490 << " socket = " << m_socket << "\n"
00491 << " server = " << m_server.name() << ":" << m_server.port();
00492 }
00493
00494 void
00495 QSESocketBaseSocketClosed::errMsg() const
00496 {
00497 cerr << "socket " << m_socket << " is closed";
00498 }
00499
00500 void
00501 QSESocketBaseReadFailed::errMsg() const
00502 {
00503 cerr << "read failed for socket " << m_socket;
00504 }
00505
00506 void
00507 QSESocketBaseWriteFailed::errMsg() const
00508 {
00509 cerr << "write failed for socket " << m_socket;
00510 }
00511
00512 void
00513 QSESocketBaseMessageTooLong::errMsg() const
00514 {
00515 cerr << "message too long on socket " << m_socket;
00516 }
00517
00518 void
00519 QSESocketBaseMessageTooShort::errMsg() const
00520 {
00521 cerr << "message too short on socket " << m_socket;
00522 }