00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef __CODEX_QUORUM_STATICBYZANTINEQUORUMSYSTEM_H__
00019 #define __CODEX_QUORUM_STATICBYZANTINEQUORUMSYSTEM_H__
00020
00021 #include <vector>
00022 #include <errno.h>
00023 #include <sys/time.h>
00024
00025 #include "QuorumSystem.h"
00026 #include "RemoteServer.h"
00027 #include "Socket.h"
00028 #include "CODEX_Exceptions/Check.h"
00029
00030 namespace CODEX_Quorum
00031 {
00032
00038 template< int N, int T >
00039 class StaticByzantineQuorumSystem : public ByzantineQuorumSystem
00040 {
00041 public :
00043 StaticByzantineQuorumSystem( ResponseTracker* aRT );
00045 virtual ~StaticByzantineQuorumSystem();
00046
00047 unsigned int quorumSystemSize();
00048 unsigned int quorumSize();
00049 unsigned int faultsTolerated();
00050
00051 bool broadcastMessage( const Message& msg,
00052 ReturnVector& rets,
00053 ResponseCallback* cb );
00054 bool unicastMessage( const MessageVector& msgs,
00055 ReturnVector& rets,
00056 ResponseCallback* cb = 0 );
00057 void poll( MessageDispatcher& requestDispatcher,
00058 QuorumDispatcher& responseDispatcher );
00059
00066 void setServer( unsigned int i , RemoteServer* server );
00067
00069 const RemoteServer* server(unsigned int i) const;
00070
00071 protected :
00072
00073 private :
00074 RemoteServer* m_servers[N];
00075 pair<size_t,Message*> m_buffers[N];
00076 };
00077
00079 class QSESBQSReturnsBad : public QSExceptionBase
00080 {
00081 public :
00083 QSESBQSReturnsBad( const string& fname,
00084 int line,
00085 int got,
00086 int needed) :
00087 QSExceptionBase( fname, line ),
00088 m_got( got ),
00089 m_needed( needed )
00090 {}
00091
00093 virtual ~QSESBQSReturnsBad() {}
00094
00095 protected :
00096 void derivedMsg() const;
00097
00098 private :
00099 int m_got;
00100 int m_needed;
00101 };
00102
00103 template<int N, int T>
00104 StaticByzantineQuorumSystem<N,T>::StaticByzantineQuorumSystem(
00105 ResponseTracker* aRT) :
00106 ByzantineQuorumSystem( aRT )
00107 {
00108
00109 CODEX_Exceptions::Check< ( N > 3*T ) >();
00110 for ( int i = 0 ; i < N ; ++i )
00111 {
00112 m_servers[i] = 0;
00113 m_buffers[i].first = 0;
00114 m_buffers[i].second = 0;
00115 }
00116 }
00117
00118 template<int N, int T>
00119 StaticByzantineQuorumSystem<N,T>::~StaticByzantineQuorumSystem()
00120 {
00121 for ( int i = 0 ; i < N ; ++i )
00122 {
00123 if ( 0 != m_buffers[i].second )
00124 {
00125 delete m_buffers[i].second;
00126 }
00127 if ( 0 != m_servers[i] )
00128 {
00129 delete m_servers[i];
00130 }
00131 }
00132 }
00133
00134 template<int N, int T>
00135 unsigned int
00136 StaticByzantineQuorumSystem<N,T>::quorumSystemSize()
00137 {
00138 return N;
00139 }
00140
00141 template<int N, int T>
00142 unsigned int
00143 StaticByzantineQuorumSystem<N,T>::quorumSize()
00144 {
00145 int num = N + T + 1;
00146 return ( num/2 + num%2 );
00147 }
00148
00149 template<int N, int T>
00150 unsigned int
00151 StaticByzantineQuorumSystem<N,T>::faultsTolerated()
00152 {
00153 return T;
00154 }
00155
00156 template<int N, int T>
00157 bool
00158 StaticByzantineQuorumSystem<N,T>::broadcastMessage(
00159 const Message& msg, ReturnVector& rets, ResponseCallback* cb )
00160 {
00161 if ( rets.size() < N )
00162 {
00163 throw QSESBQSReturnsBad( __FILE__ , __LINE__ , rets.size() , N );
00164 }
00165
00166 unsigned char* key = extractKey( msg.buffer() );
00167 bool ownKey = true;
00168
00169 if ( 0 != cb )
00170 {
00171
00172 ResponseInfo* pRI = new ResponseInfo( cb );
00173
00174 m_responseTracker->insert( key, pRI );
00175 ownKey = false;
00176 }
00177
00178 struct timeval tv;
00179 gettimeofday( &tv, 0 );
00180
00181 unsigned int nerror = 0;
00182 ResponseInfo* pRI = (*m_responseTracker)( key );
00183 for ( int i = 0 ; i < N ; ++i )
00184 {
00185 if ( rets[i].returnCode() == RemoteServerReturn::kFailure )
00186 {
00187 if ( 0 != pRI )
00188 {
00189
00190
00191 typedef ResponseInfo::ServerMap ServerMap;
00192 const ServerMap& sMap = pRI->map();
00193 ServerMap::const_iterator smItr = sMap.find( i );
00194 if ( smItr != sMap.end() )
00195 {
00196
00197 rets[i].setReturnCode( RemoteServerReturn::kSuccess );
00198 continue;
00199 }
00200 }
00201 rets[i].updateTime( tv );
00202 }
00203 else
00204 {
00205
00206 continue;
00207 }
00208 if ( 0 != m_servers[i] )
00209 {
00210 try
00211 {
00212 m_servers[i]->sendTo(msg, rets[i]);
00213 if ( rets[i].errorCode() != RemoteServerReturn::kNone )
00214 {
00215 ++nerror;
00216 }
00217 if ( 0 != pRI )
00218 {
00219
00220 rets[i].setReturnCode(RemoteServerReturn::kFailure);
00221 }
00222 }
00223 catch ( ... )
00224 {
00225 ++nerror;
00226 delete m_servers[i];
00227 m_servers[i] = 0;
00228 }
00229 }
00230 else
00231 {
00232
00233
00234 if ( ( rets[i].returnCode() != RemoteServerReturn::kSuccess ) &&
00235 ( rets[i].errorCode() != RemoteServerReturn::kNone ) )
00236 {
00237 ++nerror;
00238 }
00239 }
00240 }
00241 if ( ownKey ) delete [] key;
00242
00243
00244 return ( 0 == nerror );
00245 }
00246
00247 template<int N, int T>
00248 bool
00249 StaticByzantineQuorumSystem<N,T>::unicastMessage(
00250 const MessageVector& msgs, ReturnVector& rets, ResponseCallback* cb )
00251 {
00252 if ( rets.size() < N )
00253 {
00254 throw QSESBQSReturnsBad( __FILE__ , __LINE__ , rets.size() , N );
00255 }
00256
00257
00258 unsigned char* key = 0;
00259 MessageVector::const_iterator item = msgs.begin();
00260 MessageVector::const_iterator end = msgs.end();
00261 for ( ; (item != end) && (0 == key) ; ++item )
00262 {
00263 if ( 0 < item->length() )
00264 {
00265 key = extractKey( item->buffer() );
00266 }
00267 }
00268 if ( 0 == key )
00269 {
00270 return false;
00271 }
00272 bool ownKey = true;
00273
00274 if ( 0 != cb )
00275 {
00276
00277 ResponseInfo* pRI = new ResponseInfo( cb );
00278 m_responseTracker->insert( key, pRI );
00279 ownKey = false;
00280 }
00281
00282 struct timeval tv;
00283 gettimeofday( &tv, 0 );
00284
00285 unsigned int nerror = 0;
00286 ResponseInfo* pRI = (*m_responseTracker)( key );
00287 for ( int i = 0 ; i < N ; ++i )
00288 {
00289 if ( rets[i].returnCode() == RemoteServerReturn::kFailure )
00290 {
00291 if ( 0 != pRI )
00292 {
00293
00294
00295 typedef ResponseInfo::ServerMap ServerMap;
00296 const ServerMap& sMap = pRI->map();
00297 ServerMap::const_iterator smItr = sMap.find( i );
00298 if ( smItr != sMap.end() )
00299 {
00300
00301 rets[i].setReturnCode( RemoteServerReturn::kSuccess );
00302 continue;
00303 }
00304 }
00305 rets[i].updateTime( tv );
00306 }
00307 else
00308 {
00309
00310 continue;
00311 }
00312 if ( (0 != m_servers[i]) && (0 < msgs[i].length()) )
00313 {
00314 try
00315 {
00316 m_servers[i]->sendTo( msgs[i], rets[i] );
00317 if ( rets[i].errorCode() != RemoteServerReturn::kNone )
00318 {
00319 ++nerror;
00320 }
00321 if ( 0 != pRI )
00322 {
00323
00324 rets[i].setReturnCode(RemoteServerReturn::kFailure);
00325 }
00326 }
00327 catch ( ... )
00328 {
00329 ++nerror;
00330 delete m_servers[i];
00331 m_servers[i] = 0;
00332 }
00333 }
00334
00335 else if ( ( 0 < msgs[i].length() ) && ( 0 == m_servers[i] ) )
00336 {
00337
00338
00339 if ( ( rets[i].returnCode() != RemoteServerReturn::kSuccess ) &&
00340 ( rets[i].errorCode() != RemoteServerReturn::kNone ) )
00341 {
00342 ++nerror;
00343 }
00344 }
00345
00346 }
00347
00348
00349 if ( ownKey ) delete [] key;
00350 return ( 0 == nerror );
00351 }
00352
00358 template<int N, int T>
00359 void
00360 StaticByzantineQuorumSystem<N,T>::poll(
00361 MessageDispatcher& requestDispatcher,
00362 QuorumDispatcher& responseDispatcher )
00363 {
00364
00365 fd_set read_set;
00366 fd_set write_set;
00367 fd_set error_set;
00368 FD_ZERO(&read_set);
00369 FD_ZERO(&write_set);
00370 FD_ZERO(&error_set);
00371
00372
00373
00374 int width = 0;
00375 for ( int i = 0 ; i < N ; ++i )
00376 {
00377 if ( 0 != m_servers[i] )
00378 {
00379 try
00380 {
00381 int skt = m_servers[i]->set_fd(&read_set, SocketBase::kRead);
00382 m_servers[i]->set_fd(&write_set, SocketBase::kWrite);
00383 m_servers[i]->set_fd(&error_set, SocketBase::kError);
00384 width = ( skt > width )? skt : width;
00385 }
00386 catch ( ... )
00387 {
00388 delete m_servers[i];
00389 m_servers[i] = 0;
00390 m_buffers[i].first = 0;
00391 if ( 0 != m_buffers[i].second )
00392 {
00393 delete m_buffers[i].second;
00394 m_buffers[i].second = 0;
00395 }
00396 }
00397 }
00398 }
00399
00400
00401 struct timeval tv;
00402 tv.tv_sec = 0;
00403 tv.tv_usec = 0;
00404 if ( 0 > select(width+1,&read_set,&write_set,&error_set,&tv) )
00405 {
00406
00407 throw QSESocketBaseReadFailed( __FILE__ , __LINE__ , 0 , errno );
00408 }
00409
00410
00411 for ( int i = 0 ; i < N ; ++i )
00412 {
00413 if ( 0 != m_servers[i] )
00414 {
00415 try
00416 {
00417
00418 if ( m_servers[i]->isset_fd(&read_set, SocketBase::kRead) )
00419 {
00420
00421 if ( 0 == m_buffers[i].second )
00422 {
00423 m_buffers[i].first = 0;
00424 m_buffers[i].second = new Message;
00425 }
00426
00427
00428
00429 RemoteServerReturn rsReturn;
00430 m_buffers[i].first =
00431 m_servers[i]->receiveFrom( *(m_buffers[i].second),
00432 rsReturn,
00433 m_buffers[i].first );
00434
00435 if ( ( 0 == m_buffers[i].first ) &&
00436 ( m_buffers[i].second->length() > 0 ) )
00437 {
00438 Message* msg = m_buffers[i].second;
00439
00440 m_buffers[i].second = 0;
00441 if ( m_responseTracker->check( msg->buffer(),
00442 msg->length(),
00443 i ) )
00444 {
00445 unsigned char* msgID = extractKey( msg->buffer() );
00446
00447 delete msg;
00448 ResponseInfo* respInfo = (*m_responseTracker)(msgID);
00449
00450
00451
00452 if ( ( respInfo->failures() > faultsTolerated() ) ||
00453 ( respInfo->successes() >= quorumSize() ) )
00454 {
00455 m_responseTracker->remove(msgID);
00456 responseDispatcher( msgID, respInfo );
00457 return;
00458 }
00459
00460 delete [] msgID;
00461 }
00462 else
00463 {
00464 requestDispatcher( msg );
00465 }
00466 }
00467 }
00468
00469 m_servers[i]->isset_fd(&write_set, SocketBase::kWrite);
00470 if ( m_servers[i]->isset_fd(&error_set, SocketBase::kError) )
00471 {
00472 }
00473 }
00474 catch ( ... )
00475 {
00476 delete m_servers[i];
00477 m_servers[i] = 0;
00478 m_buffers[i].first = 0;
00479 if ( 0 != m_buffers[i].second )
00480 {
00481 delete m_buffers[i].second;
00482 m_buffers[i].second = 0;
00483 }
00484 }
00485 }
00486 }
00487 }
00488
00489 template<int N, int T>
00490 void
00491 StaticByzantineQuorumSystem<N,T>::setServer( unsigned int i ,
00492 RemoteServer* server )
00493 {
00494 if ( i < N )
00495 {
00496 m_servers[i] = server;
00497 }
00498 }
00499
00500 template<int N, int T>
00501 const RemoteServer*
00502 StaticByzantineQuorumSystem<N,T>::server(unsigned int i) const
00503 {
00504 if ( i < N )
00505 {
00506 return m_servers[i];
00507 }
00508 return 0;
00509 }
00510
00511
00515 template< int T >
00516 class SimpleStaticByzantineQuorumSystem :
00517 public StaticByzantineQuorumSystem< 3*T + 1 , T >
00518 {
00519 public :
00521 SimpleStaticByzantineQuorumSystem( ResponseTracker* aRT ) :
00522 StaticByzantineQuorumSystem<3*T+1,T>( aRT ) {}
00524 virtual ~SimpleStaticByzantineQuorumSystem() {}
00525 };
00526
00527 }
00528
00529 #endif