00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __CODEX_QUORUM_STATICBYZANTINEQUORUMSYSTEM_H__
00022 #define __CODEX_QUORUM_STATICBYZANTINEQUORUMSYSTEM_H__
00023
00024 #include <vector>
00025 #include <errno.h>
00026 #include <sys/time.h>
00027
00028 #include "QuorumSystem.h"
00029 #include "RemoteServer.h"
00030 #include "Socket.h"
00031 #include "CODEX_Exceptions/Check.h"
00032
00033 namespace CODEX_Quorum
00034 {
00035
00041 template< int N, int T >
00042 class StaticByzantineQuorumSystem : public ByzantineQuorumSystem
00043 {
00044 public :
00046 StaticByzantineQuorumSystem( ResponseTracker* aRT );
00048 virtual ~StaticByzantineQuorumSystem();
00049
00050 unsigned int quorumSystemSize();
00051 unsigned int quorumSize();
00052 unsigned int faultsTolerated();
00053
00054 bool broadcastMessage( const Message& msg,
00055 ReturnVector& rets,
00056 ResponseCallback* cb );
00057 bool unicastMessage( const MessageVector& msgs,
00058 ReturnVector& rets,
00059 ResponseCallback* cb = 0 );
00060 void poll( MessageDispatcher& requestDispatcher,
00061 QuorumDispatcher& responseDispatcher );
00062
00069 void setServer( unsigned int i , RemoteServer* server );
00070
00072 const RemoteServer* server(unsigned int i) const;
00073
00074 protected :
00075
00076 private :
00077 RemoteServer* m_servers[N];
00078 pair<size_t,Message*> m_buffers[N];
00079 };
00080
00082 class QSESBQSReturnsBad : public QSExceptionBase
00083 {
00084 public :
00086 QSESBQSReturnsBad( const string& fname,
00087 int line,
00088 int got,
00089 int needed) :
00090 QSExceptionBase( fname, line ),
00091 m_got( got ),
00092 m_needed( needed )
00093 {}
00094
00096 virtual ~QSESBQSReturnsBad() {}
00097
00098 protected :
00099 void derivedMsg() const;
00100
00101 private :
00102 int m_got;
00103 int m_needed;
00104 };
00105
00106 template<int N, int T>
00107 StaticByzantineQuorumSystem<N,T>::StaticByzantineQuorumSystem(
00108 ResponseTracker* aRT) :
00109 ByzantineQuorumSystem( aRT )
00110 {
00111
00112 CODEX_Exceptions::Check< ( N > 3*T ) >();
00113 for ( int i = 0 ; i < N ; ++i )
00114 {
00115 m_servers[i] = 0;
00116 m_buffers[i].first = 0;
00117 m_buffers[i].second = 0;
00118 }
00119 }
00120
00121 template<int N, int T>
00122 StaticByzantineQuorumSystem<N,T>::~StaticByzantineQuorumSystem()
00123 {
00124 for ( int i = 0 ; i < N ; ++i )
00125 {
00126 if ( 0 != m_buffers[i].second )
00127 {
00128 delete m_buffers[i].second;
00129 }
00130 if ( 0 != m_servers[i] )
00131 {
00132 delete m_servers[i];
00133 }
00134 }
00135 }
00136
00137 template<int N, int T>
00138 unsigned int
00139 StaticByzantineQuorumSystem<N,T>::quorumSystemSize()
00140 {
00141 return N;
00142 }
00143
00144 template<int N, int T>
00145 unsigned int
00146 StaticByzantineQuorumSystem<N,T>::quorumSize()
00147 {
00148 int num = N + T + 1;
00149 return ( num/2 + num%2 );
00150 }
00151
00152 template<int N, int T>
00153 unsigned int
00154 StaticByzantineQuorumSystem<N,T>::faultsTolerated()
00155 {
00156 return T;
00157 }
00158
00159 template<int N, int T>
00160 bool
00161 StaticByzantineQuorumSystem<N,T>::broadcastMessage(
00162 const Message& msg, ReturnVector& rets, ResponseCallback* cb )
00163 {
00164 if ( rets.size() < N )
00165 {
00166 throw QSESBQSReturnsBad( __FILE__ , __LINE__ , rets.size() , N );
00167 }
00168
00169 unsigned char* key = extractKey( msg.buffer() );
00170 bool ownKey = true;
00171
00172 if ( 0 != cb )
00173 {
00174
00175 ResponseInfo* pRI = new ResponseInfo( cb );
00176
00177 m_responseTracker->insert( key, pRI );
00178 ownKey = false;
00179 }
00180
00181 struct timeval tv;
00182 gettimeofday( &tv, 0 );
00183
00184 unsigned int nerror = 0;
00185 ResponseInfo* pRI = (*m_responseTracker)( key );
00186 for ( int i = 0 ; i < N ; ++i )
00187 {
00188 if ( rets[i].returnCode() == RemoteServerReturn::kFailure )
00189 {
00190 if ( 0 != pRI )
00191 {
00192
00193
00194 typedef ResponseInfo::ServerMap ServerMap;
00195 const ServerMap& sMap = pRI->map();
00196 ServerMap::const_iterator smItr = sMap.find( i );
00197 if ( smItr != sMap.end() )
00198 {
00199
00200 rets[i].setReturnCode( RemoteServerReturn::kSuccess );
00201 continue;
00202 }
00203 }
00204 rets[i].updateTime( tv );
00205 }
00206 else
00207 {
00208
00209 continue;
00210 }
00211 if ( 0 != m_servers[i] )
00212 {
00213 try
00214 {
00215 m_servers[i]->sendTo(msg, rets[i]);
00216 if ( rets[i].errorCode() != RemoteServerReturn::kNone )
00217 {
00218 ++nerror;
00219 }
00220 if ( 0 != pRI )
00221 {
00222
00223 rets[i].setReturnCode(RemoteServerReturn::kFailure);
00224 }
00225 }
00226 catch ( ... )
00227 {
00228 ++nerror;
00229 delete m_servers[i];
00230 m_servers[i] = 0;
00231 }
00232 }
00233 else
00234 {
00235
00236
00237 if ( ( rets[i].returnCode() != RemoteServerReturn::kSuccess ) &&
00238 ( rets[i].errorCode() != RemoteServerReturn::kNone ) )
00239 {
00240 ++nerror;
00241 }
00242 }
00243 }
00244 if ( ownKey ) delete [] key;
00245
00246
00247 return ( 0 == nerror );
00248 }
00249
00250 template<int N, int T>
00251 bool
00252 StaticByzantineQuorumSystem<N,T>::unicastMessage(
00253 const MessageVector& msgs, ReturnVector& rets, ResponseCallback* cb )
00254 {
00255 if ( rets.size() < N )
00256 {
00257 throw QSESBQSReturnsBad( __FILE__ , __LINE__ , rets.size() , N );
00258 }
00259
00260
00261 unsigned char* key = 0;
00262 MessageVector::const_iterator item = msgs.begin();
00263 MessageVector::const_iterator end = msgs.end();
00264 for ( ; (item != end) && (0 == key) ; ++item )
00265 {
00266 if ( 0 < item->length() )
00267 {
00268 key = extractKey( item->buffer() );
00269 }
00270 }
00271 if ( 0 == key )
00272 {
00273 return false;
00274 }
00275 bool ownKey = true;
00276
00277 if ( 0 != cb )
00278 {
00279
00280 ResponseInfo* pRI = new ResponseInfo( cb );
00281 m_responseTracker->insert( key, pRI );
00282 ownKey = false;
00283 }
00284
00285 struct timeval tv;
00286 gettimeofday( &tv, 0 );
00287
00288 unsigned int nerror = 0;
00289 ResponseInfo* pRI = (*m_responseTracker)( key );
00290 for ( int i = 0 ; i < N ; ++i )
00291 {
00292 if ( rets[i].returnCode() == RemoteServerReturn::kFailure )
00293 {
00294 if ( 0 != pRI )
00295 {
00296
00297
00298 typedef ResponseInfo::ServerMap ServerMap;
00299 const ServerMap& sMap = pRI->map();
00300 ServerMap::const_iterator smItr = sMap.find( i );
00301 if ( smItr != sMap.end() )
00302 {
00303
00304 rets[i].setReturnCode( RemoteServerReturn::kSuccess );
00305 continue;
00306 }
00307 }
00308 rets[i].updateTime( tv );
00309 }
00310 else
00311 {
00312
00313 continue;
00314 }
00315 if ( (0 != m_servers[i]) && (0 < msgs[i].length()) )
00316 {
00317 try
00318 {
00319 m_servers[i]->sendTo( msgs[i], rets[i] );
00320 if ( rets[i].errorCode() != RemoteServerReturn::kNone )
00321 {
00322 ++nerror;
00323 }
00324 if ( 0 != pRI )
00325 {
00326
00327 rets[i].setReturnCode(RemoteServerReturn::kFailure);
00328 }
00329 }
00330 catch ( ... )
00331 {
00332 ++nerror;
00333 delete m_servers[i];
00334 m_servers[i] = 0;
00335 }
00336 }
00337
00338 else if ( ( 0 < msgs[i].length() ) && ( 0 == m_servers[i] ) )
00339 {
00340
00341
00342 if ( ( rets[i].returnCode() != RemoteServerReturn::kSuccess ) &&
00343 ( rets[i].errorCode() != RemoteServerReturn::kNone ) )
00344 {
00345 ++nerror;
00346 }
00347 }
00348
00349 }
00350
00351
00352 if ( ownKey ) delete [] key;
00353 return ( 0 == nerror );
00354 }
00355
00361 template<int N, int T>
00362 void
00363 StaticByzantineQuorumSystem<N,T>::poll(
00364 MessageDispatcher& requestDispatcher,
00365 QuorumDispatcher& responseDispatcher )
00366 {
00367
00368 fd_set read_set;
00369 fd_set write_set;
00370 fd_set error_set;
00371 FD_ZERO(&read_set);
00372 FD_ZERO(&write_set);
00373 FD_ZERO(&error_set);
00374
00375
00376
00377 int width = 0;
00378 for ( int i = 0 ; i < N ; ++i )
00379 {
00380 if ( 0 != m_servers[i] )
00381 {
00382 try
00383 {
00384 int skt = m_servers[i]->set_fd(&read_set, SocketBase::kRead);
00385 m_servers[i]->set_fd(&write_set, SocketBase::kWrite);
00386 m_servers[i]->set_fd(&error_set, SocketBase::kError);
00387 width = ( skt > width )? skt : width;
00388 }
00389 catch ( ... )
00390 {
00391 delete m_servers[i];
00392 m_servers[i] = 0;
00393 m_buffers[i].first = 0;
00394 if ( 0 != m_buffers[i].second )
00395 {
00396 delete m_buffers[i].second;
00397 m_buffers[i].second = 0;
00398 }
00399 }
00400 }
00401 }
00402
00403
00404 struct timeval tv;
00405 tv.tv_sec = 0;
00406 tv.tv_usec = 0;
00407 if ( 0 > select(width+1,&read_set,&write_set,&error_set,&tv) )
00408 {
00409
00410 throw QSESocketBaseReadFailed( __FILE__ , __LINE__ , 0 , errno );
00411 }
00412
00413
00414 for ( int i = 0 ; i < N ; ++i )
00415 {
00416 if ( 0 != m_servers[i] )
00417 {
00418 try
00419 {
00420
00421 if ( m_servers[i]->isset_fd(&read_set, SocketBase::kRead) )
00422 {
00423
00424 if ( 0 == m_buffers[i].second )
00425 {
00426 m_buffers[i].first = 0;
00427 m_buffers[i].second = new Message;
00428 }
00429
00430
00431
00432 RemoteServerReturn rsReturn;
00433 m_buffers[i].first =
00434 m_servers[i]->receiveFrom( *(m_buffers[i].second),
00435 rsReturn,
00436 m_buffers[i].first );
00437
00438 if ( ( 0 == m_buffers[i].first ) &&
00439 ( m_buffers[i].second->length() > 0 ) )
00440 {
00441 Message* msg = m_buffers[i].second;
00442
00443 m_buffers[i].second = 0;
00444 if ( m_responseTracker->check( msg->buffer(),
00445 msg->length(),
00446 i ) )
00447 {
00448 unsigned char* msgID = extractKey( msg->buffer() );
00449
00450 delete msg;
00451 ResponseInfo* respInfo = (*m_responseTracker)(msgID);
00452
00453
00454
00455 if ( ( respInfo->failures() > faultsTolerated() ) ||
00456 ( respInfo->successes() >= quorumSize() ) )
00457 {
00458 m_responseTracker->remove(msgID);
00459 responseDispatcher( msgID, respInfo );
00460 return;
00461 }
00462
00463 delete [] msgID;
00464 }
00465 else
00466 {
00467 requestDispatcher( msg );
00468 }
00469 }
00470 }
00471
00472 m_servers[i]->isset_fd(&write_set, SocketBase::kWrite);
00473 if ( m_servers[i]->isset_fd(&error_set, SocketBase::kError) )
00474 {
00475 }
00476 }
00477 catch ( ... )
00478 {
00479 delete m_servers[i];
00480 m_servers[i] = 0;
00481 m_buffers[i].first = 0;
00482 if ( 0 != m_buffers[i].second )
00483 {
00484 delete m_buffers[i].second;
00485 m_buffers[i].second = 0;
00486 }
00487 }
00488 }
00489 }
00490 }
00491
00492 template<int N, int T>
00493 void
00494 StaticByzantineQuorumSystem<N,T>::setServer( unsigned int i ,
00495 RemoteServer* server )
00496 {
00497 if ( i < N )
00498 {
00499 m_servers[i] = server;
00500 }
00501 }
00502
00503 template<int N, int T>
00504 const RemoteServer*
00505 StaticByzantineQuorumSystem<N,T>::server(unsigned int i) const
00506 {
00507 if ( i < N )
00508 {
00509 return m_servers[i];
00510 }
00511 return 0;
00512 }
00513
00514
00518 template< int T >
00519 class SimpleStaticByzantineQuorumSystem :
00520 public StaticByzantineQuorumSystem< 3*T + 1 , T >
00521 {
00522 public :
00524 SimpleStaticByzantineQuorumSystem( ResponseTracker* aRT ) :
00525 StaticByzantineQuorumSystem<3*T+1,T>( aRT ) {}
00527 virtual ~SimpleStaticByzantineQuorumSystem() {}
00528 };
00529
00530 }
00531
00532 #endif