Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

MessageProcessor.cc

00001 /*
00002  * Copyright 2003 Michael A. Marsh, Cornell University. All rights reserved.
00003  * This software is released under the modified BSD license.
00004  * See the file LICENSE in the top-level directory for details.
00005  */
00006 //
00007 // $Id: MessageProcessor.cc,v 1.3 2004/05/19 15:56:24 mmarsh Exp $
00008 //
00009 // $Log: MessageProcessor.cc,v $
00010 // Revision 1.3  2004/05/19 15:56:24  mmarsh
00011 // Added copyright and license statements.
00012 //
00013 // Revision 1.2  2003/11/04 22:31:45  mmarsh
00014 // *** empty log message ***
00015 //
00016 //
00017 
00018 #include "MessageProcessor.h"
00019 #include "InitActivity.h"
00020 #include "InitEvent.h"
00021 #include "StateInfo.h"
00022 #include "CODEX_Server/ServerResponseEvent.h"
00023 #include "FailureEvent.h"
00024 
00025 #include "timing.h"
00026 
00027 using namespace CODEX_APSS;
00028 
00029 MessageProcessor::MessageProcessor(
00030    CODEX_Events::DeadPileType& deadPile,
00031    CODEX_Events::QType& eventQueue,
00032    CODEX_Server::ServerResponseHandler* destination,
00033    CODEX_Server::UnicastRequestHandler* requestHandler ) :
00034    CODEX_Events::Activity( deadPile, eventQueue ),
00035    m_destination( destination ),
00036    m_requestHandler( requestHandler )
00037 {
00038 }
00039 
00040 MessageProcessor::~MessageProcessor()
00041 {
00042 }
00043 
00044 bool
00045 MessageProcessor::handler( RoutedMessageEvent< SignedInitMsg >& event )
00046 {
00047 #ifdef TIMING
00048    ActiveTimer.start();
00049 #endif
00050    StateInfo* stateInfo = StateInfo::instance();
00051    CODEX_Server::ServerState* serverState =
00052       CODEX_Server::ServerState::instance();
00053    const SignedInitMsg& signedMessage = event.message();
00054    const InitMsg& message = signedMessage.message();
00055    unsigned int coordinator = message.coordinator().value();
00056 
00057    // Get the shares for this label.
00058    ShareSetType shares;
00059    try
00060    {
00061       const LSType& labeledShares =
00062          stateInfo->sharing( message.label(), coordinator );
00063       shares.addShare( labeledShares.share() );
00064    }
00065    catch ( ... )
00066    {
00067       sendEvent( 0, event.source(), true ); // send a NACK
00068 #ifdef TIMING
00069       ActiveTimer.stop();
00070 #endif
00071       return true;
00072    }
00073 
00074    // Get the list of subsharings for this label.
00075    SubshareList& sList = stateInfo->splittings( message.label() );
00076 
00077    const unsigned int NumShares = ShareType::NumShares;
00078 
00079    // If there is not a subsharing for each share, we must generate
00080    // new subsharings.
00081    typedef CODEX_VSS::ShareSplitting< ShareType >  SplitType;
00082    unsigned int num = message.label().num().value();
00083    if ( sList.size() != NumShares )
00084    {
00085       sList.clear();
00086       sList.resize( NumShares );
00087       const CODEX_VSS::Range& range = stateInfo->subshareRange(num);
00088       OneWay oneWay( stateInfo->witness(num).args() );
00089       for ( unsigned int i = 0 ; i < NumShares ; ++i )
00090       {
00091          // We only care about shares that we have _now_.  No recovery
00092          // is performed.
00093          if ( shares(i).initialized() )
00094          {
00095             ShareSetType sSet;
00096             SplitType::split( shares(i).value(), sSet, range );
00097             SublabelType sLabel( message.label(), i, sSet, oneWay );
00098             sList[i] = LabeledSet( sSet, sLabel );
00099          }
00100       }
00101    }
00102 
00103    // Create the Activity that will coordinate subshare distribution.
00104    // We don't need to keep the pointer anywhere, since it's self-closing.
00105    InitActivity* ia = new InitActivity( m_deadPile,
00106                                         m_queue,
00107                                         message.version().value(),
00108                                         coordinator,
00109                                         serverState->hostNum(),
00110                                         num,
00111                                         event.seqNum(),
00112                                         sList,
00113                                         m_destination,
00114                                         m_requestHandler );
00115 
00116    // Send the event.
00117    sendEvent( new InitEvent( this, ia ), event.source() );
00118 #ifdef TIMING
00119    ActiveTimer.stop();
00120 #endif
00121    return true;
00122 }
00123 
00124 bool
00125 MessageProcessor::handler( RoutedMessageEvent< SignedEstablishMsg >& event )
00126 {
00127 #ifdef TIMING
00128    ActiveTimer.start();
00129 #endif
00130    StateInfo* stateInfo = StateInfo::instance();
00131    CODEX_Server::ServerState* serverState =
00132       CODEX_Server::ServerState::instance();
00133    const SignedEstablishMsg& signedMessage = event.message();
00134    const EstablishMsg& message = signedMessage.message();
00135    unsigned int establisher = message.establisher().value();
00136 
00137    BIGNUM * digest = 0;
00138    try
00139    {
00140       // The subshares are stored when the message is stored in the
00141       // verifier.  We just need to compose the response, cache it,
00142       // and respond.
00143       EstablishedMsg response( message );
00144 
00145       const CODEX_Ciphers::HashFunction& hashFunc = serverState->hashFunc();
00146       digest = response.digest( hashFunc );
00147       const CODEX_Ciphers::RSAPrivateKey& key = serverState->privateKey();
00148       CODEX_Ciphers::RSASignature* signature = key.sign( digest );
00149       BN_free(digest);
00150       digest = 0;
00151       if ( 0 == signature )
00152       {
00153          sendEvent( 0, event.source(), true ); // NACK
00154 #ifdef TIMING
00155          ActiveTimer.stop();
00156 #endif
00157          return true;
00158       }
00159       SignedEstablishedMsg signedResponse( response, *signature );
00160       delete signature;
00161 
00162       stateInfo->addEstablishResponse( signedResponse );
00163 
00164       // Send the response.
00165       CODEX_Quorum::Message m;
00166       unsigned char server[CODEX_Server::ServerState::nSID];
00167       memcpy( server, event.server(), CODEX_Server::ServerState::nSID );
00168       // Make sure the "outgoing" mask isn't set.
00169       server[0] &= ~(CODEX_Server::ServerState::OutgoingMask);
00170       int serverNum = server[0];
00171 
00172       // Fill message headers.
00173       m.fill( server, CODEX_Server::ServerState::nSID );
00174       m.fill( event.seqNum(), CODEX_Server::ServerState::nMID );
00175       m.fill( stateInfo->domain() );
00176       m.fill( kEstablishedMsg | SignatureMask );
00177       int length = signedResponse.marshal(0);
00178       unsigned char* buffer = new unsigned char[length];
00179       unsigned char* pBuffer = buffer;
00180       signedResponse.marshal(&pBuffer);
00181       m.fill( buffer, length );
00182       delete [] buffer;
00183 
00184       sendEvent( new CODEX_Server::ServerResponseEvent( this,
00185                                                         m_destination,
00186                                                         m,
00187                                                         serverNum ),
00188                  event.source() );
00189 #ifdef TIMING
00190       ActiveTimer.stop();
00191 #endif
00192       return true;
00193    }
00194    catch ( ... )
00195    {
00196       if ( 0 != digest ) BN_free( digest );
00197       sendEvent( 0, event.source(), true ); // send a NACK
00198 #ifdef TIMING
00199       ActiveTimer.stop();
00200 #endif
00201       return true;
00202    }
00203 }
00204 
00205 bool
00206 MessageProcessor::handler( RoutedMessageEvent< SignedComputeMsg >& event )
00207 {
00208 #ifdef TIMING
00209    ActiveTimer.start();
00210 #endif
00211    StateInfo* stateInfo = StateInfo::instance();
00212    CODEX_Server::ServerState* serverState =
00213       CODEX_Server::ServerState::instance();
00214    const SignedComputeMsg& signedMessage = event.message();
00215    const ComputeMsg& message = signedMessage.message();
00216    unsigned int coordinator = message.coordinator().value();
00217    unsigned int version =
00218       message.subshareLabel(0).label().version().value() + 1;
00219    unsigned int num = message.subshareLabel(0).label().num().value();
00220    unsigned int self = serverState->hostNum();
00221 
00222    // Sanity check -- have we moved past this version already?
00223    if ( version <= stateInfo->version(num) )
00224    {
00225       sendEvent( 0, event.source(), true ); // send a NACK
00226 #ifdef TIMING
00227       ActiveTimer.stop();
00228 #endif
00229       return true;
00230    }
00231 
00232    // Check for missing subshares.  Initiate subshare recovery if necessary.
00233    //   This will return false, not true, since the request still needs
00234    //   to be handled.
00235    const ShareSetType* subsharesets[ ShareType::NumShares ];
00236    bool needRecovery = false;
00237    for ( unsigned int i = 0 ; i < ShareType::NumShares ; ++i )
00238    {
00239       try
00240       {
00241          subsharesets[i] =
00242             &(stateInfo->subsharing( message.subshareLabel(i), coordinator ));
00243          for ( unsigned int j = 0 ; j < ShareType::NumShares ; ++j )
00244          {
00245             if ( ( ! (*subsharesets[i])(j).initialized() ) &&
00246                  subsharesets[i]->shouldHave( j, self ) )
00247             {
00248                // At this point we know that recovery must be performed for
00249                // subshare label i.
00250                needRecovery = true;
00251                stateInfo->recover( message.subshareLabel(i) );
00252                break;
00253             }
00254          }
00255       }
00256       catch ( CODEX_Server::KeySharesNotFoundException& )
00257       {
00258          needRecovery = true;
00259          stateInfo->recover( message.subshareLabel(i) );
00260       }
00261    }
00262    if ( needRecovery )
00263    {
00264       // At least one subsharing is being recovered, so stop processing
00265       // for now.
00266 #ifdef TIMING
00267       ActiveTimer.stop();
00268 #endif
00269       return false;
00270    }
00271 
00272    // Compute the new shares and label
00273    ShareSetType newShareSet;
00274    LabelType::VType::ValueType labelVals[ ShareType::NumShares ];
00275    OneWay oneWay( stateInfo->witness(num).args() );
00276    LabelType::VType vr( oneWay );
00277    for ( unsigned int i = 0 ; i < ShareType::NumShares ; ++i )
00278    {
00279       // Compute the share
00280       if ( newShareSet.shouldHave( i, self ) )
00281       {
00282          // We have already ensured that if a shareset *should* have i,
00283          // then it *does*.
00284          ShareSetType shareSet;
00285          for ( unsigned int j = 0 ; j < ShareType::NumShares ; ++j )
00286          {
00287             shareSet.setShare( j, (*subsharesets[j])(i) );
00288          }
00289          // Compute this share
00290          ShareType::ValueType shareVal;
00291          shareSet.recover( shareVal );
00292          newShareSet.setShare( i, shareVal );
00293       }
00294 
00295       // Compute the label
00296       LabelType::VType::ValueType sublabelVals[ ShareType::NumShares ];
00297       for ( unsigned int j = 0 ; j < ShareType::NumShares ; ++j )
00298       {
00299          sublabelVals[j] = message.subshareLabel(j).vc(i);
00300       }
00301       vr( sublabelVals, labelVals[i] );
00302    }
00303    ShareType newShare( newShareSet, self );
00304    LabelType label( num, version, coordinator, labelVals );
00305 
00306    // Make sure the label is valid.
00307    if ( ! label.verify( stateInfo->witness(num).witness(), oneWay ) )
00308    {
00309       sendEvent( 0, event.source(), true ); // send a NACK
00310 #ifdef TIMING
00311       ActiveTimer.stop();
00312 #endif
00313       return true;
00314    }
00315    if ( ! label.check( newShare, oneWay ) )
00316    {
00317       sendEvent( 0, event.source(), true ); // send a NACK
00318 #ifdef TIMING
00319       ActiveTimer.stop();
00320 #endif
00321       return true;
00322    }
00323 
00324    LSType* newLabeledShare = new LSType( newShare, label );
00325    stateInfo->addSharing( newLabeledShare );
00326 
00327    // Compose the ComputedMsg response
00328    ComputedMsg response( label, CODEX_ASN1::Integer(serverState->hostNum()) );
00329 
00330    // Sign the response
00331    BIGNUM * digest = 0;
00332    try
00333    {
00334       const CODEX_Ciphers::HashFunction& hashFunc = serverState->hashFunc();
00335       digest = response.digest( hashFunc );
00336       const CODEX_Ciphers::RSAPrivateKey& key = serverState->privateKey();
00337       CODEX_Ciphers::RSASignature* signature = key.sign( digest );
00338       BN_free(digest);
00339       digest = 0;
00340       if ( 0 == signature )
00341       {
00342          sendEvent( 0, event.source(), true ); // NACK
00343 #ifdef TIMING
00344          ActiveTimer.stop();
00345 #endif
00346          return true;
00347       }
00348       SignedComputedMsg signedResponse( response, *signature );
00349       delete signature;
00350 
00351       // Cache the signed response
00352       stateInfo->addComputeResponse( signedResponse );
00353 
00354       // Send the signed response
00355       CODEX_Quorum::Message m;
00356       unsigned char server[CODEX_Server::ServerState::nSID];
00357       memcpy( server, event.server(), CODEX_Server::ServerState::nSID );
00358       // Make sure the "outgoing" mask isn't set.
00359       server[0] &= ~(CODEX_Server::ServerState::OutgoingMask);
00360       int serverNum = server[0];
00361 
00362       // Fill message headers.
00363       m.fill( server, CODEX_Server::ServerState::nSID );
00364       m.fill( event.seqNum(), CODEX_Server::ServerState::nMID );
00365       m.fill( stateInfo->domain() );
00366       m.fill( kComputedMsg | SignatureMask );
00367       int length = signedResponse.marshal(0);
00368       unsigned char* buffer = new unsigned char[length];
00369       unsigned char* pBuffer = buffer;
00370       signedResponse.marshal(&pBuffer);
00371       m.fill( buffer, length );
00372       delete [] buffer;
00373 
00374       sendEvent( new CODEX_Server::ServerResponseEvent( this,
00375                                                         m_destination,
00376                                                         m,
00377                                                         serverNum ),
00378                  event.source() );
00379 #ifdef TIMING
00380       ActiveTimer.stop();
00381 #endif
00382       return true;
00383    }
00384    catch ( ... )
00385    {
00386       if ( 0 != digest ) BN_free( digest );
00387       sendEvent( 0, event.source(), true ); // send a NACK
00388 #ifdef TIMING
00389       ActiveTimer.stop();
00390 #endif
00391       return true;
00392    }
00393 }
00394 
00395 bool
00396 MessageProcessor::handler( RoutedMessageEvent< SignedRecoverMsg >& event )
00397 {
00398 #ifdef TIMING
00399    ActiveTimer.start();
00400 #endif
00401    StateInfo* stateInfo = StateInfo::instance();
00402    CODEX_Server::ServerState* serverState =
00403       CODEX_Server::ServerState::instance();
00404    const SignedRecoverMsg& signedMessage = event.message();
00405    const RecoverMsg& message = signedMessage.message();
00406    unsigned int requester = message.requester().value();
00407 
00408    BIGNUM * digest = 0;
00409    try
00410    {
00411       // Get the subsharing.
00412       const ShareSetType& shareset =
00413          stateInfo->subsharing( message.sublabel(), requester );
00414 
00415       // Get the subset for the requesting server.
00416       unsigned char server[CODEX_Server::ServerState::nSID];
00417       memcpy( server, event.server(), CODEX_Server::ServerState::nSID );
00418       // Make sure the "outgoing" mask isn't set.
00419       server[0] &= ~(CODEX_Server::ServerState::OutgoingMask);
00420       int serverNum = server[0];
00421       ShareType shares( shareset, serverNum );
00422 
00423       // Compose the response
00424       RecoveredMsg response( message, shares );
00425 
00426       // Sign the response
00427       const CODEX_Ciphers::HashFunction& hashFunc = serverState->hashFunc();
00428       digest = response.digest( hashFunc );
00429       const CODEX_Ciphers::RSAPrivateKey& key = serverState->privateKey();
00430       CODEX_Ciphers::RSASignature* signature = key.sign( digest );
00431       BN_free(digest);
00432       digest = 0;
00433       if ( 0 == signature )
00434       {
00435          sendEvent( 0, event.source(), true ); // NACK
00436 #ifdef TIMING
00437          ActiveTimer.stop();
00438 #endif
00439          return true;
00440       }
00441       SignedRecoveredMsg signedResponse( response, *signature );
00442       delete signature;
00443 
00444       // Send the signed response.
00445       CODEX_Quorum::Message m;
00446 
00447       // Fill message headers.
00448       m.fill( server, CODEX_Server::ServerState::nSID );
00449       m.fill( event.seqNum(), CODEX_Server::ServerState::nMID );
00450       m.fill( stateInfo->domain() );
00451       m.fill( kRecoveredMsg | SignatureMask );
00452       int length = signedResponse.marshal(0);
00453       unsigned char* buffer = new unsigned char[length];
00454       unsigned char* pBuffer = buffer;
00455       signedResponse.marshal(&pBuffer);
00456       m.fill( buffer, length );
00457       delete [] buffer;
00458 
00459       sendEvent( new CODEX_Server::ServerResponseEvent( this,
00460                                                         m_destination,
00461                                                         m,
00462                                                         serverNum ),
00463                  event.source() );
00464 #ifdef TIMING
00465       ActiveTimer.stop();
00466 #endif
00467       return true;
00468    }
00469    catch ( CODEX_Server::KeySharesNotFoundException& )
00470    {
00471       if ( 0 != digest ) BN_free( digest );
00472       sendEvent( new FailureEvent( this, m_destination, event ),
00473                  event.source(), true ); // send a NACK
00474 #ifdef TIMING
00475       ActiveTimer.stop();
00476 #endif
00477       return true;
00478    }
00479    catch ( ... )
00480    {
00481       if ( 0 != digest ) BN_free( digest );
00482       sendEvent( 0, event.source(), true ); // send a NACK
00483 #ifdef TIMING
00484       ActiveTimer.stop();
00485 #endif
00486       return true;
00487    }
00488 }
00489 
00490 bool
00491 MessageProcessor::handler( RoutedMessageEvent< SignedFinishedMsg >& event )
00492 {
00493    // Everything that needs to be done with a FinishedMsg should have
00494    // been done at verification time.  No response is needed.
00495    return true;
00496 }

Generated on Fri May 6 17:40:43 2005 for COrnell Data EXchange (CODEX) by  doxygen 1.4.1