00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "MessageProcessor.h"
00019 #include "InitActivity.h"
00020 #include "InitEvent.h"
00021 #include "StateInfo.h"
00022 #include "CODEX_Server/ServerResponseEvent.h"
00023 #include "FailureEvent.h"
00024
00025 #include "timing.h"
00026
00027 using namespace CODEX_APSS;
00028
00029 MessageProcessor::MessageProcessor(
00030 CODEX_Events::DeadPileType& deadPile,
00031 CODEX_Events::QType& eventQueue,
00032 CODEX_Server::ServerResponseHandler* destination,
00033 CODEX_Server::UnicastRequestHandler* requestHandler ) :
00034 CODEX_Events::Activity( deadPile, eventQueue ),
00035 m_destination( destination ),
00036 m_requestHandler( requestHandler )
00037 {
00038 }
00039
00040 MessageProcessor::~MessageProcessor()
00041 {
00042 }
00043
00044 bool
00045 MessageProcessor::handler( RoutedMessageEvent< SignedInitMsg >& event )
00046 {
00047 #ifdef TIMING
00048 ActiveTimer.start();
00049 #endif
00050 StateInfo* stateInfo = StateInfo::instance();
00051 CODEX_Server::ServerState* serverState =
00052 CODEX_Server::ServerState::instance();
00053 const SignedInitMsg& signedMessage = event.message();
00054 const InitMsg& message = signedMessage.message();
00055 unsigned int coordinator = message.coordinator().value();
00056
00057
00058 ShareSetType shares;
00059 try
00060 {
00061 const LSType& labeledShares =
00062 stateInfo->sharing( message.label(), coordinator );
00063 shares.addShare( labeledShares.share() );
00064 }
00065 catch ( ... )
00066 {
00067 sendEvent( 0, event.source(), true );
00068 #ifdef TIMING
00069 ActiveTimer.stop();
00070 #endif
00071 return true;
00072 }
00073
00074
00075 SubshareList& sList = stateInfo->splittings( message.label() );
00076
00077 const unsigned int NumShares = ShareType::NumShares;
00078
00079
00080
00081 typedef CODEX_VSS::ShareSplitting< ShareType > SplitType;
00082 unsigned int num = message.label().num().value();
00083 if ( sList.size() != NumShares )
00084 {
00085 sList.clear();
00086 sList.resize( NumShares );
00087 const CODEX_VSS::Range& range = stateInfo->subshareRange(num);
00088 OneWay oneWay( stateInfo->witness(num).args() );
00089 for ( unsigned int i = 0 ; i < NumShares ; ++i )
00090 {
00091
00092
00093 if ( shares(i).initialized() )
00094 {
00095 ShareSetType sSet;
00096 SplitType::split( shares(i).value(), sSet, range );
00097 SublabelType sLabel( message.label(), i, sSet, oneWay );
00098 sList[i] = LabeledSet( sSet, sLabel );
00099 }
00100 }
00101 }
00102
00103
00104
00105 InitActivity* ia = new InitActivity( m_deadPile,
00106 m_queue,
00107 message.version().value(),
00108 coordinator,
00109 serverState->hostNum(),
00110 num,
00111 event.seqNum(),
00112 sList,
00113 m_destination,
00114 m_requestHandler );
00115
00116
00117 sendEvent( new InitEvent( this, ia ), event.source() );
00118 #ifdef TIMING
00119 ActiveTimer.stop();
00120 #endif
00121 return true;
00122 }
00123
00124 bool
00125 MessageProcessor::handler( RoutedMessageEvent< SignedEstablishMsg >& event )
00126 {
00127 #ifdef TIMING
00128 ActiveTimer.start();
00129 #endif
00130 StateInfo* stateInfo = StateInfo::instance();
00131 CODEX_Server::ServerState* serverState =
00132 CODEX_Server::ServerState::instance();
00133 const SignedEstablishMsg& signedMessage = event.message();
00134 const EstablishMsg& message = signedMessage.message();
00135 unsigned int establisher = message.establisher().value();
00136
00137 BIGNUM * digest = 0;
00138 try
00139 {
00140
00141
00142
00143 EstablishedMsg response( message );
00144
00145 const CODEX_Ciphers::HashFunction& hashFunc = serverState->hashFunc();
00146 digest = response.digest( hashFunc );
00147 const CODEX_Ciphers::RSAPrivateKey& key = serverState->privateKey();
00148 CODEX_Ciphers::RSASignature* signature = key.sign( digest );
00149 BN_free(digest);
00150 digest = 0;
00151 if ( 0 == signature )
00152 {
00153 sendEvent( 0, event.source(), true );
00154 #ifdef TIMING
00155 ActiveTimer.stop();
00156 #endif
00157 return true;
00158 }
00159 SignedEstablishedMsg signedResponse( response, *signature );
00160 delete signature;
00161
00162 stateInfo->addEstablishResponse( signedResponse );
00163
00164
00165 CODEX_Quorum::Message m;
00166 unsigned char server[CODEX_Server::ServerState::nSID];
00167 memcpy( server, event.server(), CODEX_Server::ServerState::nSID );
00168
00169 server[0] &= ~(CODEX_Server::ServerState::OutgoingMask);
00170 int serverNum = server[0];
00171
00172
00173 m.fill( server, CODEX_Server::ServerState::nSID );
00174 m.fill( event.seqNum(), CODEX_Server::ServerState::nMID );
00175 m.fill( stateInfo->domain() );
00176 m.fill( kEstablishedMsg | SignatureMask );
00177 int length = signedResponse.marshal(0);
00178 unsigned char* buffer = new unsigned char[length];
00179 unsigned char* pBuffer = buffer;
00180 signedResponse.marshal(&pBuffer);
00181 m.fill( buffer, length );
00182 delete [] buffer;
00183
00184 sendEvent( new CODEX_Server::ServerResponseEvent( this,
00185 m_destination,
00186 m,
00187 serverNum ),
00188 event.source() );
00189 #ifdef TIMING
00190 ActiveTimer.stop();
00191 #endif
00192 return true;
00193 }
00194 catch ( ... )
00195 {
00196 if ( 0 != digest ) BN_free( digest );
00197 sendEvent( 0, event.source(), true );
00198 #ifdef TIMING
00199 ActiveTimer.stop();
00200 #endif
00201 return true;
00202 }
00203 }
00204
00205 bool
00206 MessageProcessor::handler( RoutedMessageEvent< SignedComputeMsg >& event )
00207 {
00208 #ifdef TIMING
00209 ActiveTimer.start();
00210 #endif
00211 StateInfo* stateInfo = StateInfo::instance();
00212 CODEX_Server::ServerState* serverState =
00213 CODEX_Server::ServerState::instance();
00214 const SignedComputeMsg& signedMessage = event.message();
00215 const ComputeMsg& message = signedMessage.message();
00216 unsigned int coordinator = message.coordinator().value();
00217 unsigned int version =
00218 message.subshareLabel(0).label().version().value() + 1;
00219 unsigned int num = message.subshareLabel(0).label().num().value();
00220 unsigned int self = serverState->hostNum();
00221
00222
00223 if ( version <= stateInfo->version(num) )
00224 {
00225 sendEvent( 0, event.source(), true );
00226 #ifdef TIMING
00227 ActiveTimer.stop();
00228 #endif
00229 return true;
00230 }
00231
00232
00233
00234
00235 const ShareSetType* subsharesets[ ShareType::NumShares ];
00236 bool needRecovery = false;
00237 for ( unsigned int i = 0 ; i < ShareType::NumShares ; ++i )
00238 {
00239 try
00240 {
00241 subsharesets[i] =
00242 &(stateInfo->subsharing( message.subshareLabel(i), coordinator ));
00243 for ( unsigned int j = 0 ; j < ShareType::NumShares ; ++j )
00244 {
00245 if ( ( ! (*subsharesets[i])(j).initialized() ) &&
00246 subsharesets[i]->shouldHave( j, self ) )
00247 {
00248
00249
00250 needRecovery = true;
00251 stateInfo->recover( message.subshareLabel(i) );
00252 break;
00253 }
00254 }
00255 }
00256 catch ( CODEX_Server::KeySharesNotFoundException& )
00257 {
00258 needRecovery = true;
00259 stateInfo->recover( message.subshareLabel(i) );
00260 }
00261 }
00262 if ( needRecovery )
00263 {
00264
00265
00266 #ifdef TIMING
00267 ActiveTimer.stop();
00268 #endif
00269 return false;
00270 }
00271
00272
00273 ShareSetType newShareSet;
00274 LabelType::VType::ValueType labelVals[ ShareType::NumShares ];
00275 OneWay oneWay( stateInfo->witness(num).args() );
00276 LabelType::VType vr( oneWay );
00277 for ( unsigned int i = 0 ; i < ShareType::NumShares ; ++i )
00278 {
00279
00280 if ( newShareSet.shouldHave( i, self ) )
00281 {
00282
00283
00284 ShareSetType shareSet;
00285 for ( unsigned int j = 0 ; j < ShareType::NumShares ; ++j )
00286 {
00287 shareSet.setShare( j, (*subsharesets[j])(i) );
00288 }
00289
00290 ShareType::ValueType shareVal;
00291 shareSet.recover( shareVal );
00292 newShareSet.setShare( i, shareVal );
00293 }
00294
00295
00296 LabelType::VType::ValueType sublabelVals[ ShareType::NumShares ];
00297 for ( unsigned int j = 0 ; j < ShareType::NumShares ; ++j )
00298 {
00299 sublabelVals[j] = message.subshareLabel(j).vc(i);
00300 }
00301 vr( sublabelVals, labelVals[i] );
00302 }
00303 ShareType newShare( newShareSet, self );
00304 LabelType label( num, version, coordinator, labelVals );
00305
00306
00307 if ( ! label.verify( stateInfo->witness(num).witness(), oneWay ) )
00308 {
00309 sendEvent( 0, event.source(), true );
00310 #ifdef TIMING
00311 ActiveTimer.stop();
00312 #endif
00313 return true;
00314 }
00315 if ( ! label.check( newShare, oneWay ) )
00316 {
00317 sendEvent( 0, event.source(), true );
00318 #ifdef TIMING
00319 ActiveTimer.stop();
00320 #endif
00321 return true;
00322 }
00323
00324 LSType* newLabeledShare = new LSType( newShare, label );
00325 stateInfo->addSharing( newLabeledShare );
00326
00327
00328 ComputedMsg response( label, CODEX_ASN1::Integer(serverState->hostNum()) );
00329
00330
00331 BIGNUM * digest = 0;
00332 try
00333 {
00334 const CODEX_Ciphers::HashFunction& hashFunc = serverState->hashFunc();
00335 digest = response.digest( hashFunc );
00336 const CODEX_Ciphers::RSAPrivateKey& key = serverState->privateKey();
00337 CODEX_Ciphers::RSASignature* signature = key.sign( digest );
00338 BN_free(digest);
00339 digest = 0;
00340 if ( 0 == signature )
00341 {
00342 sendEvent( 0, event.source(), true );
00343 #ifdef TIMING
00344 ActiveTimer.stop();
00345 #endif
00346 return true;
00347 }
00348 SignedComputedMsg signedResponse( response, *signature );
00349 delete signature;
00350
00351
00352 stateInfo->addComputeResponse( signedResponse );
00353
00354
00355 CODEX_Quorum::Message m;
00356 unsigned char server[CODEX_Server::ServerState::nSID];
00357 memcpy( server, event.server(), CODEX_Server::ServerState::nSID );
00358
00359 server[0] &= ~(CODEX_Server::ServerState::OutgoingMask);
00360 int serverNum = server[0];
00361
00362
00363 m.fill( server, CODEX_Server::ServerState::nSID );
00364 m.fill( event.seqNum(), CODEX_Server::ServerState::nMID );
00365 m.fill( stateInfo->domain() );
00366 m.fill( kComputedMsg | SignatureMask );
00367 int length = signedResponse.marshal(0);
00368 unsigned char* buffer = new unsigned char[length];
00369 unsigned char* pBuffer = buffer;
00370 signedResponse.marshal(&pBuffer);
00371 m.fill( buffer, length );
00372 delete [] buffer;
00373
00374 sendEvent( new CODEX_Server::ServerResponseEvent( this,
00375 m_destination,
00376 m,
00377 serverNum ),
00378 event.source() );
00379 #ifdef TIMING
00380 ActiveTimer.stop();
00381 #endif
00382 return true;
00383 }
00384 catch ( ... )
00385 {
00386 if ( 0 != digest ) BN_free( digest );
00387 sendEvent( 0, event.source(), true );
00388 #ifdef TIMING
00389 ActiveTimer.stop();
00390 #endif
00391 return true;
00392 }
00393 }
00394
00395 bool
00396 MessageProcessor::handler( RoutedMessageEvent< SignedRecoverMsg >& event )
00397 {
00398 #ifdef TIMING
00399 ActiveTimer.start();
00400 #endif
00401 StateInfo* stateInfo = StateInfo::instance();
00402 CODEX_Server::ServerState* serverState =
00403 CODEX_Server::ServerState::instance();
00404 const SignedRecoverMsg& signedMessage = event.message();
00405 const RecoverMsg& message = signedMessage.message();
00406 unsigned int requester = message.requester().value();
00407
00408 BIGNUM * digest = 0;
00409 try
00410 {
00411
00412 const ShareSetType& shareset =
00413 stateInfo->subsharing( message.sublabel(), requester );
00414
00415
00416 unsigned char server[CODEX_Server::ServerState::nSID];
00417 memcpy( server, event.server(), CODEX_Server::ServerState::nSID );
00418
00419 server[0] &= ~(CODEX_Server::ServerState::OutgoingMask);
00420 int serverNum = server[0];
00421 ShareType shares( shareset, serverNum );
00422
00423
00424 RecoveredMsg response( message, shares );
00425
00426
00427 const CODEX_Ciphers::HashFunction& hashFunc = serverState->hashFunc();
00428 digest = response.digest( hashFunc );
00429 const CODEX_Ciphers::RSAPrivateKey& key = serverState->privateKey();
00430 CODEX_Ciphers::RSASignature* signature = key.sign( digest );
00431 BN_free(digest);
00432 digest = 0;
00433 if ( 0 == signature )
00434 {
00435 sendEvent( 0, event.source(), true );
00436 #ifdef TIMING
00437 ActiveTimer.stop();
00438 #endif
00439 return true;
00440 }
00441 SignedRecoveredMsg signedResponse( response, *signature );
00442 delete signature;
00443
00444
00445 CODEX_Quorum::Message m;
00446
00447
00448 m.fill( server, CODEX_Server::ServerState::nSID );
00449 m.fill( event.seqNum(), CODEX_Server::ServerState::nMID );
00450 m.fill( stateInfo->domain() );
00451 m.fill( kRecoveredMsg | SignatureMask );
00452 int length = signedResponse.marshal(0);
00453 unsigned char* buffer = new unsigned char[length];
00454 unsigned char* pBuffer = buffer;
00455 signedResponse.marshal(&pBuffer);
00456 m.fill( buffer, length );
00457 delete [] buffer;
00458
00459 sendEvent( new CODEX_Server::ServerResponseEvent( this,
00460 m_destination,
00461 m,
00462 serverNum ),
00463 event.source() );
00464 #ifdef TIMING
00465 ActiveTimer.stop();
00466 #endif
00467 return true;
00468 }
00469 catch ( CODEX_Server::KeySharesNotFoundException& )
00470 {
00471 if ( 0 != digest ) BN_free( digest );
00472 sendEvent( new FailureEvent( this, m_destination, event ),
00473 event.source(), true );
00474 #ifdef TIMING
00475 ActiveTimer.stop();
00476 #endif
00477 return true;
00478 }
00479 catch ( ... )
00480 {
00481 if ( 0 != digest ) BN_free( digest );
00482 sendEvent( 0, event.source(), true );
00483 #ifdef TIMING
00484 ActiveTimer.stop();
00485 #endif
00486 return true;
00487 }
00488 }
00489
00490 bool
00491 MessageProcessor::handler( RoutedMessageEvent< SignedFinishedMsg >& event )
00492 {
00493
00494
00495 return true;
00496 }