Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Namespace Members   Compound Members   Related Pages  

StaticByzantineQuorumSystem.h

00001 /*
00002  * Copyright 2003 Michael A. Marsh, Cornell University. All rights reserved.
00003  * This software is released under the modified BSD license.
00004  * See the file LICENSE in the top-level directory for details.
00005  */
00006 //
00007 // $Id: StaticByzantineQuorumSystem.h,v 1.3 2004/05/19 15:56:56 mmarsh Exp $
00008 //
00009 // $Log: StaticByzantineQuorumSystem.h,v $
00010 // Revision 1.3  2004/05/19 15:56:56  mmarsh
00011 // *** empty log message ***
00012 //
00013 // Revision 1.2  2003/11/04 22:31:50  mmarsh
00014 // *** empty log message ***
00015 //
00016 //
00017 
00018 #ifndef __CODEX_QUORUM_STATICBYZANTINEQUORUMSYSTEM_H__
00019 #define __CODEX_QUORUM_STATICBYZANTINEQUORUMSYSTEM_H__
00020 
00021 #include <vector>
00022 #include <errno.h>
00023 #include <sys/time.h>
00024 
00025 #include "QuorumSystem.h"
00026 #include "RemoteServer.h"
00027 #include "Socket.h"
00028 #include "CODEX_Exceptions/Check.h"
00029 
00030 namespace CODEX_Quorum
00031 {
00032    
00038    template< int N, int T >
         // Byzantine quorum system over a fixed set of N server slots, with N
         // and the fault threshold T chosen at compile time.  The constructor
         // instantiates CODEX_Exceptions::Check< ( N > 3*T ) >.  The destructor
         // deletes whatever is stored in the server and buffer slots, so an
         // instance owns those objects and must not be copied.
00039    class StaticByzantineQuorumSystem : public ByzantineQuorumSystem
00040    {
00041       public :
               // Creates a system with all N server slots empty; aRT is passed
               // through to the ByzantineQuorumSystem base class.
00043          StaticByzantineQuorumSystem( ResponseTracker* aRT );
               // Deletes every non-null server and pending receive buffer.
00045          virtual ~StaticByzantineQuorumSystem();
00046 
               // Returns N.
00047          unsigned int quorumSystemSize();
               // Returns (N + T + 1) / 2, rounded up.
00048          unsigned int quorumSize();
               // Returns T.
00049          unsigned int faultsTolerated();
00050 
               // Sends msg to each server whose entry in rets is marked
               // kFailure.  A non-null cb is wrapped in a ResponseInfo and
               // registered with the response tracker under the message's key.
               // Returns true when no send error was counted.  Throws
               // QSESBQSReturnsBad when rets has fewer than N entries.
00051          bool broadcastMessage( const Message& msg,
00052                                 ReturnVector& rets,
00053                                 ResponseCallback* cb );
               // Like broadcastMessage(), but msgs[i] is the message for
               // server i and zero-length messages are not sent.  Returns
               // false when msgs contains no non-empty message.
00054          bool unicastMessage( const MessageVector& msgs,
00055                               ReturnVector& rets,
00056                               ResponseCallback* cb = 0 );
               // Runs one zero-timeout select() over the server sockets and
               // reads whatever is available.  A completed message goes to
               // responseDispatcher (once enough responses are collected) when
               // the response tracker's check() accepts it, and to
               // requestDispatcher otherwise.
00057          void poll( MessageDispatcher& requestDispatcher,
00058                     QuorumDispatcher& responseDispatcher );
00059 
               // Stores server in slot i; does nothing when i >= N.  The
               // destructor deletes stored servers.
00066          void setServer( unsigned int i , RemoteServer* server );
00067 
               // Returns the server in slot i, or 0 when i >= N.
00069          const RemoteServer* server(unsigned int i) const;
00070 
00071       protected :
00072 
00073       private :
               // Not copyable: the destructor deletes the contents of
               // m_servers and m_buffers, so a member-wise copy would delete
               // them twice.  Declared but intentionally never defined.
               StaticByzantineQuorumSystem( const StaticByzantineQuorumSystem& );
               StaticByzantineQuorumSystem& operator=(
                  const StaticByzantineQuorumSystem& );

               // One slot per server; 0 means no connection.
00074          RemoteServer*          m_servers[N];
               // Per-server receive state: .second is the message being
               // assembled (0 when none), .first is the value last returned by
               // RemoteServer::receiveFrom() for it (0 once a message is
               // complete).
00075          pair<size_t,Message*>  m_buffers[N];
00076    };
00077 
00079    class QSESBQSReturnsBad : public QSExceptionBase
00080    {
            // Exception thrown by StaticByzantineQuorumSystem::broadcastMessage()
            // and unicastMessage() when the ReturnVector they are given has
            // fewer entries than the quorum system has servers.
00081       public :
               // fname / line: source location, forwarded to QSExceptionBase.
               // got:          the size of the vector that was supplied.
               // needed:       the size that was required (N).
00083          QSESBQSReturnsBad( const string& fname,
00084                             int line,
00085                             int got,
00086                             int needed) :
00087             QSExceptionBase( fname, line ),
00088             m_got( got ),
00089             m_needed( needed )
00090          {}
00091 
00093          virtual ~QSESBQSReturnsBad() {}
00094 
00095       protected :
               // Defined outside this header; presumably the hook QSExceptionBase
               // uses to emit the got/needed details -- confirm against the base.
00096          void derivedMsg() const;
00097 
00098       private :
00099          int m_got;     // number of entries received
00100          int m_needed;  // number of entries required
00101    };
00102 
00103    template<int N, int T>
         // Constructor: forwards aRT to the ByzantineQuorumSystem base, checks
         // the template parameters, and clears every server and buffer slot.
00104    StaticByzantineQuorumSystem<N,T>::StaticByzantineQuorumSystem(
00105       ResponseTracker* aRT) :
00106       ByzantineQuorumSystem( aRT )
00107    {
00108       // Link-time check of template parameters
            // (instantiates Check<true> only when N > 3*T holds).
00109       CODEX_Exceptions::Check< ( N > 3*T ) >();
            // Start with no servers attached and no partially read messages.
00110       for ( int i = 0 ; i < N ; ++i )
00111       {
00112          m_servers[i] = 0;
00113          m_buffers[i].first = 0;
00114          m_buffers[i].second = 0;
00115       }
00116    }
00117 
00118    template<int N, int T>
         // Destructor: releases every pending receive buffer and every server
         // object still held in the slots.
00119    StaticByzantineQuorumSystem<N,T>::~StaticByzantineQuorumSystem()
00120    {
00121       for ( int i = 0 ; i < N ; ++i )
00122       {
               // Partially (or fully) read message that was never dispatched.
00123          if ( 0 != m_buffers[i].second )
00124          {
00125             delete m_buffers[i].second;
00126          }
               // Server stored via setServer() and not yet dropped on error.
00127          if ( 0 != m_servers[i] )
00128          {
00129             delete m_servers[i];
00130          }
00131       }
00132    }
00133 
00134    template<int N, int T>
         // Returns the number of server slots in the system, i.e. the template
         // parameter N.
00135    unsigned int
00136    StaticByzantineQuorumSystem<N,T>::quorumSystemSize()
00137    {
00138       return N;
00139    }
00140 
00141    template<int N, int T>
         // Returns the quorum size: (N + T + 1) / 2 rounded up to the next
         // integer.
00142    unsigned int
00143    StaticByzantineQuorumSystem<N,T>::quorumSize()
00144    {
00145       int num = N + T + 1;
            // Integer division truncates, so add the remainder bit to round up.
00146       return ( num/2 + num%2 );
00147    }
00148 
00149    template<int N, int T>
         // Returns the fault threshold, i.e. the template parameter T.
00150    unsigned int
00151    StaticByzantineQuorumSystem<N,T>::faultsTolerated()
00152    {
00153       return T;
00154    }
00155 
00156    template<int N, int T>
         // Sends msg to every server whose entry in rets is still marked
         // kFailure; entries with any other return code are left untouched.
         //
         //   msg   message to send to each server.
         //   rets  per-server status, indexed like the server slots; must hold
         //         at least N entries.
         //   cb    optional callback.  When non-null it is wrapped in a new
         //         ResponseInfo and registered with the response tracker under
         //         the message's key.
         //
         // Returns true when no error was counted.  Throws QSESBQSReturnsBad
         // when rets.size() < N.
00157    bool
00158    StaticByzantineQuorumSystem<N,T>::broadcastMessage(
00159       const Message& msg, ReturnVector& rets, ResponseCallback* cb )
00160    {
00161       if ( rets.size() < N )
00162       {
00163          throw QSESBQSReturnsBad( __FILE__ , __LINE__ , rets.size() , N );
00164       }
00165 
            // extractKey() hands back an array that this function deletes
            // itself unless it is given to the response tracker below.
00166       unsigned char* key = extractKey( msg.buffer() );
00167       bool ownKey = true;
00168 
00169       if ( 0 != cb )
00170       {
00171          // Register this message with the ResponseTracker
00172          ResponseInfo* pRI = new ResponseInfo( cb );
00173 //         unsigned char* key = extractKey( msg.buffer() );
00174          m_responseTracker->insert( key, pRI );
               // The key is not deleted here once it has been inserted
               // (presumably the tracker keeps it -- confirm).
00175          ownKey = false;
00176       }
00177 
            // One timestamp is shared by every entry updated in this call.
00178       struct timeval tv;
00179       gettimeofday( &tv, 0 );
00180 
00181       unsigned int nerror = 0;
            // Look up whatever the tracker holds for this key; it may be an
            // entry registered by an earlier call, or 0.
00182       ResponseInfo* pRI = (*m_responseTracker)( key );
00183       for ( int i = 0 ; i < N ; ++i )
00184       {
00185          if ( rets[i].returnCode() == RemoteServerReturn::kFailure )
00186          {
00187             if ( 0 != pRI )
00188             {
00189                // This relies on the internal map, because ResponseInfo doesn't
00190                // in principle know how many servers to keep track of.
00191                typedef ResponseInfo::ServerMap ServerMap;
00192                const ServerMap& sMap = pRI->map();
00193                ServerMap::const_iterator smItr = sMap.find( i );
00194                if ( smItr != sMap.end() )
00195                {
00196                   // This server has already been contacted successfully.
00197                   rets[i].setReturnCode( RemoteServerReturn::kSuccess );
00198                   continue;
00199                }
00200             }
00201             rets[i].updateTime( tv );
00202          }
00203          else
00204          {
00205             // This server has already been contacted successfully.
00206             continue;
00207          }
00208          if ( 0 != m_servers[i] )
00209          {
00210             try
00211             {
00212                m_servers[i]->sendTo(msg, rets[i]);
00213                if ( rets[i].errorCode() != RemoteServerReturn::kNone )
00214                {
00215                   ++nerror;
00216                }
00217                if ( 0 != pRI )
00218                {
00219                   // We only count success when a response is received.
00220                   rets[i].setReturnCode(RemoteServerReturn::kFailure);
00221                }
00222             }
00223             catch ( ... )
00224             {
                     // Any exception from the send drops the connection.
00225                ++nerror;
00226                delete m_servers[i];
00227                m_servers[i] = 0;
                     // Also discard any partially received message from the
                     // dropped connection, as poll() does on its error paths.
                     // Otherwise a server installed in this slot later would
                     // have its data appended to the stale buffer.
                     m_buffers[i].first = 0;
                     delete m_buffers[i].second;
                     m_buffers[i].second = 0;
00228             }
00229          }
00230          else
00231          {
00232             // We don't have a connection to this server, but do we
00233             // actually need one at this point?
00234             if ( ( rets[i].returnCode() != RemoteServerReturn::kSuccess ) &&
00235                  ( rets[i].errorCode() != RemoteServerReturn::kNone ) )
00236             {
00237                ++nerror;
00238             }
00239          }
00240       }
00241       if ( ownKey ) delete [] key;
00242       // Old semantics
00243       //return ( nerror <= faultsTolerated() );
00244       return ( 0 == nerror );
00245    }
00246 
00247    template<int N, int T>
         // Sends msgs[i] to server i for every server whose entry in rets is
         // still marked kFailure.  Zero-length messages are not sent, and an
         // index that msgs does not cover is treated like a zero-length message.
         //
         //   msgs  per-server messages, indexed like the server slots.
         //   rets  per-server status; must hold at least N entries.
         //   cb    optional callback.  When non-null it is wrapped in a new
         //         ResponseInfo and registered with the response tracker under
         //         the key of the first non-empty message.
         //
         // Returns false when msgs holds no non-empty message, otherwise true
         // when no error was counted.  Throws QSESBQSReturnsBad when
         // rets.size() < N.
00248    bool
00249    StaticByzantineQuorumSystem<N,T>::unicastMessage(
00250       const MessageVector& msgs, ReturnVector& rets, ResponseCallback* cb )
00251    {
00252       if ( rets.size() < N )
00253       {
00254          throw QSESBQSReturnsBad( __FILE__ , __LINE__ , rets.size() , N );
00255       }
00256 
00257       // Make sure there's a message to send.
            // The key is taken from the first non-empty message.
00258       unsigned char* key = 0;
00259       MessageVector::const_iterator item = msgs.begin();
00260       MessageVector::const_iterator end = msgs.end();
00261       for ( ; (item != end) && (0 == key) ; ++item )
00262       {
00263          if ( 0 < item->length() )
00264          {
00265             key = extractKey( item->buffer() );
00266          }
00267       }
00268       if ( 0 == key )
00269       {
00270          return false;
00271       }
00272       bool ownKey = true;
00273 
00274       if ( 0 != cb )
00275       {
00276          // Register this message with the ResponseTracker
00277          ResponseInfo* pRI = new ResponseInfo( cb );
00278          m_responseTracker->insert( key, pRI );
               // The key is not deleted here once it has been inserted
               // (presumably the tracker keeps it -- confirm).
00279          ownKey = false;
00280       }
00281 
            // One timestamp is shared by every entry updated in this call.
00282       struct timeval tv;
00283       gettimeofday( &tv, 0 );
00284 
00285       unsigned int nerror = 0;
00286       ResponseInfo* pRI = (*m_responseTracker)( key );
00287       for ( int i = 0 ; i < N ; ++i )
00288       {
00289          if ( rets[i].returnCode() == RemoteServerReturn::kFailure )
00290          {
00291             if ( 0 != pRI )
00292             {
00293                // This relies on the internal map, because ResponseInfo doesn't
00294                // in principle know how many servers to keep track of.
00295                typedef ResponseInfo::ServerMap ServerMap;
00296                const ServerMap& sMap = pRI->map();
00297                ServerMap::const_iterator smItr = sMap.find( i );
00298                if ( smItr != sMap.end() )
00299                {
00300                   // This server has already been contacted successfully.
00301                   rets[i].setReturnCode( RemoteServerReturn::kSuccess );
00302                   continue;
00303                }
00304             }
00305             rets[i].updateTime( tv );
00306          }
00307          else
00308          {
00309             // This server has already been contacted successfully.
00310             continue;
00311          }
               // msgs may hold fewer than N entries; a missing entry counts as
               // "nothing to send" rather than being read out of range.
               const bool haveMsg =
                  ( static_cast<unsigned int>( i ) < msgs.size() ) &&
                  ( 0 < msgs[i].length() );
00312          if ( (0 != m_servers[i]) && haveMsg )
00313          {
00314             try
00315             {
00316                m_servers[i]->sendTo( msgs[i], rets[i] );
00317                if ( rets[i].errorCode() != RemoteServerReturn::kNone )
00318                {
00319                   ++nerror;
00320                }
00321                if ( 0 != pRI )
00322                {
00323                   // We only count success when a response is received.
00324                   rets[i].setReturnCode(RemoteServerReturn::kFailure);
00325                }
00326             }
00327             catch ( ... )
00328             {
                     // Any exception from the send drops the connection.
00329                ++nerror;
00330                delete m_servers[i];
00331                m_servers[i] = 0;
                     // Also discard any partially received message from the
                     // dropped connection, as poll() does on its error paths.
                     // Otherwise a server installed in this slot later would
                     // have its data appended to the stale buffer.
                     m_buffers[i].first = 0;
                     delete m_buffers[i].second;
                     m_buffers[i].second = 0;
00332             }
00333          }
00334          // If we have a message but the server's not there, it's an error.
00335          else if ( haveMsg && ( 0 == m_servers[i] ) )
00336          {
00337             // We don't have a connection to this server, but do we
00338             // actually need one at this point?
00339             if ( ( rets[i].returnCode() != RemoteServerReturn::kSuccess ) &&
00340                  ( rets[i].errorCode() != RemoteServerReturn::kNone ) )
00341             {
00342                ++nerror;
00343             }
00344          }
00345             
00346       }
00347       // Old semantics
00348       //return ( nerror <= faultsTolerated() );
00349       if ( ownKey ) delete [] key;
00350       return ( 0 == nerror );
00351    }
00352 
00358    template<int N, int T>
         // Performs one non-blocking pass over all connected servers.
         //
         // Builds read/write/error descriptor sets from the servers, calls
         // select() with a zero timeout, and then reads from every server whose
         // descriptor is readable.  Messages may arrive in pieces; the partial
         // state is kept in m_buffers between calls.  A completed message is
         // offered to the response tracker's check(): if accepted and enough
         // responses have been collected, the response is handed to
         // responseDispatcher and the method returns; if not accepted, the
         // message is handed to requestDispatcher.
         //
         // A server whose socket calls throw is deleted and its slot and buffer
         // are cleared.  Throws QSESocketBaseReadFailed when select() fails.
00359    void
00360    StaticByzantineQuorumSystem<N,T>::poll(
00361       MessageDispatcher& requestDispatcher,
00362       QuorumDispatcher& responseDispatcher )
00363    {
00364       // bitmaps for select(2)
00365       fd_set read_set;
00366       fd_set write_set;
00367       fd_set error_set;
00368       FD_ZERO(&read_set);
00369       FD_ZERO(&write_set);
00370       FD_ZERO(&error_set);
00371 
00372       // loop over sockets to remote servers setting read, write, and
00373       // error bitmaps for select(2)
            // width tracks the largest value set_fd() returned for the read
            // set; width+1 is what select() receives as its first argument.
00374       int width = 0;
00375       for ( int i = 0 ; i < N ; ++i )
00376       {
00377          if ( 0 != m_servers[i] )
00378          {
00379             try
00380             {
00381                int skt = m_servers[i]->set_fd(&read_set, SocketBase::kRead);
00382                m_servers[i]->set_fd(&write_set, SocketBase::kWrite);
00383                m_servers[i]->set_fd(&error_set, SocketBase::kError);
00384                width = ( skt > width )? skt : width;
00385             }
00386             catch ( ... )
00387             {
                     // Drop the server together with any partial message.
00388                delete m_servers[i];
00389                m_servers[i] = 0;
00390                m_buffers[i].first = 0;
00391                if ( 0 != m_buffers[i].second )
00392                {
00393                   delete m_buffers[i].second;
00394                   m_buffers[i].second = 0;
00395                }
00396             }
00397          }
00398       }
00399 
00400       // we want select(2) to be non-blocking, so we set the timeout to 0
00401       struct timeval tv;
00402       tv.tv_sec = 0;
00403       tv.tv_usec = 0;
00404       if ( 0 > select(width+1,&read_set,&write_set,&error_set,&tv) )
00405       {
00406          // wrong exception to throw!
00407          throw QSESocketBaseReadFailed( __FILE__ , __LINE__ , 0 , errno );
00408       }
00409 
00410       // now check for data to be read
00411       for ( int i = 0 ; i < N ; ++i )
00412       {
00413          if ( 0 != m_servers[i] )
00414          {
00415             try
00416             {
00417                // is there data to read?
00418                if ( m_servers[i]->isset_fd(&read_set, SocketBase::kRead) )
00419                {
00420                   // we may have read a partial message previously
00421                   if ( 0 == m_buffers[i].second )
00422                   {
00423                      m_buffers[i].first = 0;
00424                      m_buffers[i].second = new Message;
00425                   }
00426                   // grab everything we can without blocking, up to the
00427                   // full length of a message -- this will append to an
00428                   // existing buffer
00429                   RemoteServerReturn rsReturn;
00430                   m_buffers[i].first =
00431                      m_servers[i]->receiveFrom( *(m_buffers[i].second),
00432                                                 rsReturn,
00433                                                 m_buffers[i].first );
00434                   // have we read the entire message?
                        // (receiveFrom() returned 0 and the buffer is non-empty)
00435                   if ( ( 0 == m_buffers[i].first ) &&
00436                        ( m_buffers[i].second->length() > 0 ) )
00437                   {
00438                      Message* msg = m_buffers[i].second;
00439                      // we no longer need the buffer
00440                      m_buffers[i].second = 0;
                           // NOTE(review): msg is no longer reachable from
                           // m_buffers, so if check() or extractKey() below
                           // throws, the catch block cannot free it -- confirm
                           // whether they can throw.
00441                      if ( m_responseTracker->check( msg->buffer(),
00442                                                     msg->length(),
00443                                                     i ) )
00444                      {
00445                         unsigned char* msgID = extractKey( msg->buffer() );
00446                         // We're done with msg -- all its data has been copied.
00447                         delete msg;
00448                         ResponseInfo* respInfo = (*m_responseTracker)(msgID);
00449                         // If we've collected enough responses, then dispatch
00450                         // the response information to the functional object
00451                         // passed to this method.
00452                         if ( ( respInfo->failures()  >  faultsTolerated() ) ||
00453                              ( respInfo->successes() >= quorumSize()      ) )
00454                         {
00455                            m_responseTracker->remove(msgID);
                                 // msgID and respInfo are not freed on this
                                 // path; presumably the dispatcher takes them
                                 // over -- confirm.  Returning here also means
                                 // servers after i are not read until the next
                                 // call to poll().
00456                            responseDispatcher( msgID, respInfo );
00457                            return;
00458                         }
00459                         // Now clean up the message ID, since no one else will.
00460                         delete [] msgID;
00461                      }
00462                      else
00463                      {
                              // msg is not deleted here; presumably the
                              // dispatcher takes ownership -- confirm.
00464                         requestDispatcher( msg );
00465                      }
00466                   }
00467                }
00468                // used to manage internal state of the socket
00469                m_servers[i]->isset_fd(&write_set, SocketBase::kWrite);
                     // The error-set result is queried but currently ignored.
00470                if ( m_servers[i]->isset_fd(&error_set, SocketBase::kError) )
00471                {
00472                }
00473             }
00474             catch ( ... )
00475             {
                     // Drop the server together with any partial message.
00476                delete m_servers[i];
00477                m_servers[i] = 0;
00478                m_buffers[i].first = 0;
00479                if ( 0 != m_buffers[i].second )
00480                {
00481                   delete m_buffers[i].second;
00482                   m_buffers[i].second = 0;
00483                }
00484             }
00485          }
00486       }
00487    }
00488 
00489    template<int N, int T>
         // Installs server in slot i.  The call is ignored when i >= N.
         //
         //   i       slot index, 0 <= i < N.
         //   server  connection to store; may be 0 to clear the slot.  The
         //           destructor and the send/poll error paths delete stored
         //           servers.
         //
         // When the slot's server actually changes, any partially received
         // message kept for the slot is discarded so that poll() does not
         // append the new connection's data to the old connection's buffer.
00490    void
00491    StaticByzantineQuorumSystem<N,T>::setServer( unsigned int i ,
00492                                                 RemoteServer* server )
00493    {
00494       if ( i < N )
00495       {
               if ( m_servers[i] != server )
               {
                  // The receive state belongs to the previous connection.
                  m_buffers[i].first = 0;
                  delete m_buffers[i].second;
                  m_buffers[i].second = 0;
                  // NOTE(review): a previous non-null server is overwritten
                  // without being deleted, as before; confirm whether callers
                  // release it, otherwise it leaks.
               }
00496          m_servers[i] = server;
00497       }
00498    }
00499 
00500    template<int N, int T>
         // Returns the server stored in slot i (which may be 0), or 0 when
         // i is not a valid slot index.
00501    const RemoteServer*
00502    StaticByzantineQuorumSystem<N,T>::server(unsigned int i) const
00503    {
00504       if ( i < N )
00505       {
00506          return m_servers[i];
00507       }
00508       return 0;
00509    }
00510 
00511 
00515    template< int T >
         // Convenience wrapper that fixes the number of servers at 3*T + 1,
         // so only the fault threshold T has to be supplied.
00516    class SimpleStaticByzantineQuorumSystem :
00517          public StaticByzantineQuorumSystem< 3*T + 1 , T >
00518    {
00519       public :
               // Forwards aRT to StaticByzantineQuorumSystem<3*T+1,T>.
00521          SimpleStaticByzantineQuorumSystem( ResponseTracker* aRT ) :
00522             StaticByzantineQuorumSystem<3*T+1,T>( aRT ) {}
00524          virtual ~SimpleStaticByzantineQuorumSystem() {}
00525    };
00526 
00527 }
00528 
00529 #endif /* __CODEX_QUORUM_STATICBYZANTINEQUORUMSYSTEM_H__ */

Generated on Wed Jun 2 16:32:56 2004 for COrnell Data EXchange (CODEX) by doxygen 1.2.18