Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

StaticByzantineQuorumSystem.h

00001 /*
00002  * Copyright 2003 Michael A. Marsh, Cornell University. All rights reserved.
00003  * This software is released under the modified BSD license.
00004  * See the file LICENSE in the top-level directory for details.
00005  */
00006 //
00007 // $Id: StaticByzantineQuorumSystem.h,v 1.4 2005/01/21 19:44:17 mmarsh Exp $
00008 //
00009 // $Log: StaticByzantineQuorumSystem.h,v $
00010 // Revision 1.4  2005/01/21 19:44:17  mmarsh
00011 // Updated for compatibility with Doxygen 1.4.1
00012 //
00013 // Revision 1.3  2004/05/19 15:56:56  mmarsh
00014 // *** empty log message ***
00015 //
00016 // Revision 1.2  2003/11/04 22:31:50  mmarsh
00017 // *** empty log message ***
00018 //
00019 //
00020 
00021 #ifndef __CODEX_QUORUM_STATICBYZANTINEQUORUMSYSTEM_H__
00022 #define __CODEX_QUORUM_STATICBYZANTINEQUORUMSYSTEM_H__
00023 
00024 #include <vector>
00025 #include <errno.h>
00026 #include <sys/time.h>
00027 
00028 #include "QuorumSystem.h"
00029 #include "RemoteServer.h"
00030 #include "Socket.h"
00031 #include "CODEX_Exceptions/Check.h"
00032 
00033 namespace CODEX_Quorum
00034 {
00035    
00041    template< int N, int T >
00042    class StaticByzantineQuorumSystem : public ByzantineQuorumSystem
00043    {
00044       public :
00046          StaticByzantineQuorumSystem( ResponseTracker* aRT );
00048          virtual ~StaticByzantineQuorumSystem();
00049 
00050          unsigned int quorumSystemSize();
00051          unsigned int quorumSize();
00052          unsigned int faultsTolerated();
00053 
00054          bool broadcastMessage( const Message& msg,
00055                                 ReturnVector& rets,
00056                                 ResponseCallback* cb );
00057          bool unicastMessage( const MessageVector& msgs,
00058                               ReturnVector& rets,
00059                               ResponseCallback* cb = 0 );
00060          void poll( MessageDispatcher& requestDispatcher,
00061                     QuorumDispatcher& responseDispatcher );
00062 
00069          void setServer( unsigned int i , RemoteServer* server );
00070 
00072          const RemoteServer* server(unsigned int i) const;
00073 
00074       protected :
00075 
00076       private :
00077          RemoteServer*          m_servers[N];
00078          pair<size_t,Message*>  m_buffers[N];
00079    };
00080 
00082    class QSESBQSReturnsBad : public QSExceptionBase
00083    {
00084       public :
00086          QSESBQSReturnsBad( const string& fname,
00087                             int line,
00088                             int got,
00089                             int needed) :
00090             QSExceptionBase( fname, line ),
00091             m_got( got ),
00092             m_needed( needed )
00093          {}
00094 
00096          virtual ~QSESBQSReturnsBad() {}
00097 
00098       protected :
00099          void derivedMsg() const;
00100 
00101       private :
00102          int m_got;
00103          int m_needed;
00104    };
00105 
00106    template<int N, int T>
00107    StaticByzantineQuorumSystem<N,T>::StaticByzantineQuorumSystem(
00108       ResponseTracker* aRT) :
00109       ByzantineQuorumSystem( aRT )
00110    {
00111       // Link-time check of template parameters
00112       CODEX_Exceptions::Check< ( N > 3*T ) >();
00113       for ( int i = 0 ; i < N ; ++i )
00114       {
00115          m_servers[i] = 0;
00116          m_buffers[i].first = 0;
00117          m_buffers[i].second = 0;
00118       }
00119    }
00120 
00121    template<int N, int T>
00122    StaticByzantineQuorumSystem<N,T>::~StaticByzantineQuorumSystem()
00123    {
00124       for ( int i = 0 ; i < N ; ++i )
00125       {
00126          if ( 0 != m_buffers[i].second )
00127          {
00128             delete m_buffers[i].second;
00129          }
00130          if ( 0 != m_servers[i] )
00131          {
00132             delete m_servers[i];
00133          }
00134       }
00135    }
00136 
00137    template<int N, int T>
00138    unsigned int
00139    StaticByzantineQuorumSystem<N,T>::quorumSystemSize()
00140    {
00141       return N;
00142    }
00143 
00144    template<int N, int T>
00145    unsigned int
00146    StaticByzantineQuorumSystem<N,T>::quorumSize()
00147    {
00148       int num = N + T + 1;
00149       return ( num/2 + num%2 );
00150    }
00151 
00152    template<int N, int T>
00153    unsigned int
00154    StaticByzantineQuorumSystem<N,T>::faultsTolerated()
00155    {
00156       return T;
00157    }
00158 
00159    template<int N, int T>
00160    bool
00161    StaticByzantineQuorumSystem<N,T>::broadcastMessage(
00162       const Message& msg, ReturnVector& rets, ResponseCallback* cb )
00163    {
00164       if ( rets.size() < N )
00165       {
00166          throw QSESBQSReturnsBad( __FILE__ , __LINE__ , rets.size() , N );
00167       }
00168 
00169       unsigned char* key = extractKey( msg.buffer() );
00170       bool ownKey = true;
00171 
00172       if ( 0 != cb )
00173       {
00174          // Register this message with the ResponseTracker
00175          ResponseInfo* pRI = new ResponseInfo( cb );
00176 //         unsigned char* key = extractKey( msg.buffer() );
00177          m_responseTracker->insert( key, pRI );
00178          ownKey = false;
00179       }
00180 
00181       struct timeval tv;
00182       gettimeofday( &tv, 0 );
00183 
00184       unsigned int nerror = 0;
00185       ResponseInfo* pRI = (*m_responseTracker)( key );
00186       for ( int i = 0 ; i < N ; ++i )
00187       {
00188          if ( rets[i].returnCode() == RemoteServerReturn::kFailure )
00189          {
00190             if ( 0 != pRI )
00191             {
00192                // This relies on the internal map, because ResponseInfo doesn't
00193                // in principle know how many servers to keep track of.
00194                typedef ResponseInfo::ServerMap ServerMap;
00195                const ServerMap& sMap = pRI->map();
00196                ServerMap::const_iterator smItr = sMap.find( i );
00197                if ( smItr != sMap.end() )
00198                {
00199                   // This server has already been contacted successfully.
00200                   rets[i].setReturnCode( RemoteServerReturn::kSuccess );
00201                   continue;
00202                }
00203             }
00204             rets[i].updateTime( tv );
00205          }
00206          else
00207          {
00208             // This server has already been contacted successfully.
00209             continue;
00210          }
00211          if ( 0 != m_servers[i] )
00212          {
00213             try
00214             {
00215                m_servers[i]->sendTo(msg, rets[i]);
00216                if ( rets[i].errorCode() != RemoteServerReturn::kNone )
00217                {
00218                   ++nerror;
00219                }
00220                if ( 0 != pRI )
00221                {
00222                   // We only count success when a response is received.
00223                   rets[i].setReturnCode(RemoteServerReturn::kFailure);
00224                }
00225             }
00226             catch ( ... )
00227             {
00228                ++nerror;
00229                delete m_servers[i];
00230                m_servers[i] = 0;
00231             }
00232          }
00233          else
00234          {
00235             // We don't have a connection to this server, but do we
00236             // actually need one at this point?
00237             if ( ( rets[i].returnCode() != RemoteServerReturn::kSuccess ) &&
00238                  ( rets[i].errorCode() != RemoteServerReturn::kNone ) )
00239             {
00240                ++nerror;
00241             }
00242          }
00243       }
00244       if ( ownKey ) delete [] key;
00245       // Old semantics
00246       //return ( nerror <= faultsTolerated() );
00247       return ( 0 == nerror );
00248    }
00249 
00250    template<int N, int T>
00251    bool
00252    StaticByzantineQuorumSystem<N,T>::unicastMessage(
00253       const MessageVector& msgs, ReturnVector& rets, ResponseCallback* cb )
00254    {
00255       if ( rets.size() < N )
00256       {
00257          throw QSESBQSReturnsBad( __FILE__ , __LINE__ , rets.size() , N );
00258       }
00259 
00260       // Make sure there's a message to send.
00261       unsigned char* key = 0;
00262       MessageVector::const_iterator item = msgs.begin();
00263       MessageVector::const_iterator end = msgs.end();
00264       for ( ; (item != end) && (0 == key) ; ++item )
00265       {
00266          if ( 0 < item->length() )
00267          {
00268             key = extractKey( item->buffer() );
00269          }
00270       }
00271       if ( 0 == key )
00272       {
00273          return false;
00274       }
00275       bool ownKey = true;
00276 
00277       if ( 0 != cb )
00278       {
00279          // Register this message with the ResponseTracker
00280          ResponseInfo* pRI = new ResponseInfo( cb );
00281          m_responseTracker->insert( key, pRI );
00282          ownKey = false;
00283       }
00284 
00285       struct timeval tv;
00286       gettimeofday( &tv, 0 );
00287 
00288       unsigned int nerror = 0;
00289       ResponseInfo* pRI = (*m_responseTracker)( key );
00290       for ( int i = 0 ; i < N ; ++i )
00291       {
00292          if ( rets[i].returnCode() == RemoteServerReturn::kFailure )
00293          {
00294             if ( 0 != pRI )
00295             {
00296                // This relies on the internal map, because ResponseInfo doesn't
00297                // in principle know how many servers to keep track of.
00298                typedef ResponseInfo::ServerMap ServerMap;
00299                const ServerMap& sMap = pRI->map();
00300                ServerMap::const_iterator smItr = sMap.find( i );
00301                if ( smItr != sMap.end() )
00302                {
00303                   // This server has already been contacted successfully.
00304                   rets[i].setReturnCode( RemoteServerReturn::kSuccess );
00305                   continue;
00306                }
00307             }
00308             rets[i].updateTime( tv );
00309          }
00310          else
00311          {
00312             // This server has already been contacted successfully.
00313             continue;
00314          }
00315          if ( (0 != m_servers[i]) && (0 < msgs[i].length()) )
00316          {
00317             try
00318             {
00319                m_servers[i]->sendTo( msgs[i], rets[i] );
00320                if ( rets[i].errorCode() != RemoteServerReturn::kNone )
00321                {
00322                   ++nerror;
00323                }
00324                if ( 0 != pRI )
00325                {
00326                   // We only count success when a response is received.
00327                   rets[i].setReturnCode(RemoteServerReturn::kFailure);
00328                }
00329             }
00330             catch ( ... )
00331             {
00332                ++nerror;
00333                delete m_servers[i];
00334                m_servers[i] = 0;
00335             }
00336          }
00337          // If we have a message but the server's not there, it's an error.
00338          else if ( ( 0 < msgs[i].length() ) && ( 0 == m_servers[i] ) )
00339          {
00340             // We don't have a connection to this server, but do we
00341             // actually need one at this point?
00342             if ( ( rets[i].returnCode() != RemoteServerReturn::kSuccess ) &&
00343                  ( rets[i].errorCode() != RemoteServerReturn::kNone ) )
00344             {
00345                ++nerror;
00346             }
00347          }
00348             
00349       }
00350       // Old semantics
00351       //return ( nerror <= faultsTolerated() );
00352       if ( ownKey ) delete [] key;
00353       return ( 0 == nerror );
00354    }
00355 
00361    template<int N, int T>
00362    void
00363    StaticByzantineQuorumSystem<N,T>::poll(
00364       MessageDispatcher& requestDispatcher,
00365       QuorumDispatcher& responseDispatcher )
00366    {
00367       // bitmaps for select(2)
00368       fd_set read_set;
00369       fd_set write_set;
00370       fd_set error_set;
00371       FD_ZERO(&read_set);
00372       FD_ZERO(&write_set);
00373       FD_ZERO(&error_set);
00374 
00375       // loop over sockets to remote servers setting read, write, and
00376       // error bitmaps for select(2)
00377       int width = 0;
00378       for ( int i = 0 ; i < N ; ++i )
00379       {
00380          if ( 0 != m_servers[i] )
00381          {
00382             try
00383             {
00384                int skt = m_servers[i]->set_fd(&read_set, SocketBase::kRead);
00385                m_servers[i]->set_fd(&write_set, SocketBase::kWrite);
00386                m_servers[i]->set_fd(&error_set, SocketBase::kError);
00387                width = ( skt > width )? skt : width;
00388             }
00389             catch ( ... )
00390             {
00391                delete m_servers[i];
00392                m_servers[i] = 0;
00393                m_buffers[i].first = 0;
00394                if ( 0 != m_buffers[i].second )
00395                {
00396                   delete m_buffers[i].second;
00397                   m_buffers[i].second = 0;
00398                }
00399             }
00400          }
00401       }
00402 
00403       // we want select(2) to be non-blocking, so we set the timeout to 0
00404       struct timeval tv;
00405       tv.tv_sec = 0;
00406       tv.tv_usec = 0;
00407       if ( 0 > select(width+1,&read_set,&write_set,&error_set,&tv) )
00408       {
00409          // wrong exception to throw!
00410          throw QSESocketBaseReadFailed( __FILE__ , __LINE__ , 0 , errno );
00411       }
00412 
00413       // now check for data to be read
00414       for ( int i = 0 ; i < N ; ++i )
00415       {
00416          if ( 0 != m_servers[i] )
00417          {
00418             try
00419             {
00420                // is there data to read?
00421                if ( m_servers[i]->isset_fd(&read_set, SocketBase::kRead) )
00422                {
00423                   // we may have read a partial message previously
00424                   if ( 0 == m_buffers[i].second )
00425                   {
00426                      m_buffers[i].first = 0;
00427                      m_buffers[i].second = new Message;
00428                   }
00429                   // grab everything we can without blocking, up to the
00430                   // full length of a message -- this will append to an
00431                   // existing buffer
00432                   RemoteServerReturn rsReturn;
00433                   m_buffers[i].first =
00434                      m_servers[i]->receiveFrom( *(m_buffers[i].second),
00435                                                 rsReturn,
00436                                                 m_buffers[i].first );
00437                   // have we read the entire message?
00438                   if ( ( 0 == m_buffers[i].first ) &&
00439                        ( m_buffers[i].second->length() > 0 ) )
00440                   {
00441                      Message* msg = m_buffers[i].second;
00442                      // we no longer need the buffer
00443                      m_buffers[i].second = 0;
00444                      if ( m_responseTracker->check( msg->buffer(),
00445                                                     msg->length(),
00446                                                     i ) )
00447                      {
00448                         unsigned char* msgID = extractKey( msg->buffer() );
00449                         // We're done with msg -- all its data has been copied.
00450                         delete msg;
00451                         ResponseInfo* respInfo = (*m_responseTracker)(msgID);
00452                         // If we've collected enough responses, then dispatch
00453                         // the response information to the functional object
00454                         // passed to this method.
00455                         if ( ( respInfo->failures()  >  faultsTolerated() ) ||
00456                              ( respInfo->successes() >= quorumSize()      ) )
00457                         {
00458                            m_responseTracker->remove(msgID);
00459                            responseDispatcher( msgID, respInfo );
00460                            return;
00461                         }
00462                         // Now clean up the message ID, since no one else will.
00463                         delete [] msgID;
00464                      }
00465                      else
00466                      {
00467                         requestDispatcher( msg );
00468                      }
00469                   }
00470                }
00471                // used to manage internal state of the socket
00472                m_servers[i]->isset_fd(&write_set, SocketBase::kWrite);
00473                if ( m_servers[i]->isset_fd(&error_set, SocketBase::kError) )
00474                {
00475                }
00476             }
00477             catch ( ... )
00478             {
00479                delete m_servers[i];
00480                m_servers[i] = 0;
00481                m_buffers[i].first = 0;
00482                if ( 0 != m_buffers[i].second )
00483                {
00484                   delete m_buffers[i].second;
00485                   m_buffers[i].second = 0;
00486                }
00487             }
00488          }
00489       }
00490    }
00491 
00492    template<int N, int T>
00493    void
00494    StaticByzantineQuorumSystem<N,T>::setServer( unsigned int i ,
00495                                                 RemoteServer* server )
00496    {
00497       if ( i < N )
00498       {
00499          m_servers[i] = server;
00500       }
00501    }
00502 
00503    template<int N, int T>
00504    const RemoteServer*
00505    StaticByzantineQuorumSystem<N,T>::server(unsigned int i) const
00506    {
00507       if ( i < N )
00508       {
00509          return m_servers[i];
00510       }
00511       return 0;
00512    }
00513 
00514 
00518    template< int T >
00519    class SimpleStaticByzantineQuorumSystem :
00520          public StaticByzantineQuorumSystem< 3*T + 1 , T >
00521    {
00522       public :
00524          SimpleStaticByzantineQuorumSystem( ResponseTracker* aRT ) :
00525             StaticByzantineQuorumSystem<3*T+1,T>( aRT ) {}
00527          virtual ~SimpleStaticByzantineQuorumSystem() {}
00528    };
00529 
00530 }
00531 
00532 #endif /* __CODEX_QUORUM_STATICBYZANTINEQUORUMSYSTEM_H__ */

Generated on Fri May 6 17:41:27 2005 for COrnell Data EXchange (CODEX) by  doxygen 1.4.1