00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include "simlpy/interpreter_defs.h"
00032 #include "simlpy/interpreter_hooks.h"
00033
00034 #include "EpidemicStore.h"
00035 #include "dtn/Bundle.h"
00036 #include "simlpy/Clock.h"
00037
00038 #include <iostream>
00039
00040 using namespace Epidemic;
00041
00042
00043
00044
00045 EpidemicBundle::EpidemicBundle( DTN::Bundle* b ) :
00046 m_bundle( b ),
00047 m_timeAdded( Clock::time() ),
00048 m_numTries( 0 )
00049 {
00050 if ( 0 != b )
00051 {
00052 m_originator = b->source();
00053 m_seqNum = b->seqNum();
00054 m_digest = EpidemicStore::digest(*b);
00055 }
00056 }
00057
00058 EpidemicBundle::EpidemicBundle( const DTN::ByteString& originator,
00059 uint32_t seqNum ) :
00060 m_bundle( 0 ),
00061 m_originator( originator ),
00062 m_seqNum( seqNum ),
00063 m_numTries( 0 )
00064 {
00065 m_digest = EpidemicStore::digest(originator,seqNum);
00066 }
00067
00068 EpidemicBundle::EpidemicBundle( const DTN::ByteString& digest ) :
00069 m_bundle( 0 ),
00070 m_numTries( 0 ),
00071 m_digest( digest )
00072 {
00073 }
00074
00075 EpidemicBundle::~EpidemicBundle()
00076 {
00077 }
00078
00079 DTN::Bundle*
00080 EpidemicBundle::bundle() const
00081 {
00082 return m_bundle;
00083 }
00084
00085 bool
00086 EpidemicBundle::operator<( const EpidemicBundle& b ) const
00087 {
00088 return m_digest < b.m_digest ;
00089 }
00090
00091
00092
00093
00094
00095
00096 EpidemicStore::EpidemicStore( DTN::Node* owner, uint32_t h ) :
00097 DTN::PersistentBundleStore(owner),
00098 m_max_hops(h),
00099 m_bytesUsed(0)
00100 {
00101 }
00102
00103 EpidemicStore::~EpidemicStore()
00104 {
00105 }
00106
00107 DTN::BundlePointer
00108 EpidemicStore::p_addBundle( DTN::Bundle* b )
00109 {
00110 if ( 0 == b ) return DTN::BundlePointer();
00111
00112
00113 if ( m_history.end() != m_history.find( digest(*b) ) )
00114 return DTN::BundlePointer();
00115
00116
00117 std::pair< iterator , bool > r = m_list.insert( b );
00118 if ( r.second )
00119 {
00120 m_bytesUsed += b->size();
00121
00122
00123 m_history.insert( r.first->digest() );
00124
00125 if ( ( 0 == m_max_hops ) || ( b->hopCount() < m_max_hops ) )
00126 {
00127 m_digestMap.insert( DigestMap::value_type( r.first->digest(),
00128 r.first ) );
00129 }
00130 else
00131 {
00132 m_destMap[b->destination()].push_back( r.first );
00133 }
00134 return DTN::BundlePointer( new EpidemicStoreItr(*this,r.first) );
00135 }
00136 return DTN::BundlePointer();
00137 }
00138
00139 void
00140 EpidemicStore::p_deleteBundle( DTN::Bundle* b )
00141 {
00142 if ( 0 == b ) return;
00143
00144
00145 m_digestMap.erase( digest(*b) );
00146
00147
00148
00149
00150 DestMap::iterator ditr = m_destMap.find( b->destination() );
00151 if ( m_destMap.end() != ditr )
00152 {
00153
00154
00155
00156 DestList::iterator itr = ditr->second.begin();
00157 DestList::iterator end = ditr->second.end();
00158 for ( ; itr != end ; ++itr )
00159 {
00160
00161
00162 if ( (*itr)->bundle() == b )
00163 {
00164
00165
00166 ditr->second.erase(itr);
00167
00168
00169
00170
00171 if ( 0 == ditr->second.size() )
00172 {
00173 m_destMap.erase(ditr);
00174 }
00175
00176
00177
00178
00179
00180
00181 break;
00182 }
00183 }
00184 }
00185
00186
00187
00188 if ( 0 != m_list.erase( b ) )
00189 {
00190 m_bytesUsed -= b->size();
00191 }
00192 }
00193
00194 void
00195 EpidemicStore::consume( DTN::Bundle* b )
00196 {
00197 if ( 0 == b ) return;
00198
00199
00200 if ( DTN::Bundle::kACK & b->type() ) return;
00201
00202
00203 if ( ! (DTN::Bundle::kCustodial & b->type()) ) return;
00204
00205 m_history.insert( digest(*b) );
00206 }
00207
00208 DTN::BundlePointer
00209 EpidemicStore::p_getPointer( const DTN::BundlePointer& p )
00210 {
00211 if ( p.isNull() )
00212 {
00213 if ( m_list.empty() )
00214 {
00215 return DTN::BundlePointer();
00216 }
00217 return DTN::BundlePointer(
00218 new EpidemicStoreItr(*this,m_list.begin()) );
00219 }
00220 iterator itr = m_list.lower_bound( p.repr()->bundle() );
00221 if ( m_list.end() != itr )
00222 {
00223 return DTN::BundlePointer( new EpidemicStoreItr(*this,itr) );
00224 }
00225 return DTN::BundlePointer();
00226 }
00227
00228 DTN::BundlePointer
00229 EpidemicStore::p_getPointer( const DTN::ByteString& sender, uint32_t seqNum )
00230 {
00231 iterator itr = m_list.find( EpidemicBundle(sender,seqNum) );
00232 if ( m_list.end() == itr )
00233 {
00234 return DTN::BundlePointer();
00235 }
00236 return DTN::BundlePointer( new EpidemicStoreItr(*this,itr) );
00237 }
00238
00239 DTN::BundlePointer
00240 EpidemicStore::getPointer( const DTN::ByteString& digest )
00241 {
00242 clean();
00243 iterator itr = m_list.find( EpidemicBundle(digest) );
00244 if ( m_list.end() == itr )
00245 {
00246 return DTN::BundlePointer();
00247 }
00248 return DTN::BundlePointer( new EpidemicStoreItr(*this,itr) );
00249 }
00250
00251 bool
00252 EpidemicStore::p_validatePointer( const DTN::BundlePointer& p )
00253 {
00254 if ( p.isNull() ) return false;
00255 iterator itr = m_list.find( p.repr()->bundle() );
00256 if ( m_list.end() == itr ) return false;
00257 return true;
00258 }
00259
00260 void
00261 EpidemicStore::p_shrinkStore( size_t s )
00262 {
00263
00264
00265
00266
00267
00268 while ( m_bytesUsed > s )
00269 {
00270 if ( 0 == m_list.size() ) return;
00271 BundleList::reverse_iterator itr = m_list.rbegin();
00272 BundleList::reverse_iterator end = m_list.rend();
00273 BundleList::reverse_iterator last = itr;
00274 for ( ; itr != end ; ++itr )
00275 {
00276 if ( itr->timeAdded() > last->timeAdded() )
00277 {
00278 last = itr;
00279 }
00280 }
00281 m_owner->recordPersistentRemove( *(last->bundle()) );
00282 p_deleteBundle(last->bundle());
00283 delete last->bundle();
00284 }
00285 }
00286
00287 EpidemicStore::iterator
00288 EpidemicStore::begin()
00289 {
00290 return m_list.begin();
00291 }
00292
00293 EpidemicStore::iterator
00294 EpidemicStore::lower_bound( const key_type& x )
00295 {
00296 return m_list.lower_bound( x );
00297 }
00298
00299 EpidemicStore::iterator
00300 EpidemicStore::upper_bound( const key_type& x )
00301 {
00302 return m_list.upper_bound( x );
00303 }
00304
00305 EpidemicStore::iterator
00306 EpidemicStore::end()
00307 {
00308 return m_list.end();
00309 }
00310
00311 EpidemicStore::const_iterator
00312 EpidemicStore::begin() const
00313 {
00314 return m_list.begin();
00315 }
00316
00317 EpidemicStore::const_iterator
00318 EpidemicStore::lower_bound( const key_type& x ) const
00319 {
00320 return m_list.lower_bound( x );
00321 }
00322
00323 EpidemicStore::const_iterator
00324 EpidemicStore::upper_bound( const key_type& x ) const
00325 {
00326 return m_list.upper_bound( x );
00327 }
00328
00329 EpidemicStore::const_iterator
00330 EpidemicStore::end() const
00331 {
00332 return m_list.end();
00333 }
00334
00335 DTN::ByteString*
00336 EpidemicStore::summaryVector() const
00337 {
00338 DTN::ByteString* retval = new DTN::ByteString;
00339 if ( 0 == retval ) return 0;
00340 const_iterator itr = m_list.begin();
00341 const_iterator end = m_list.end();
00342 for ( ; itr != end ; ++itr )
00343 {
00345 retval->append( itr->digest() );
00346 }
00347 return retval;
00348 }
00349
00350 DTN::ByteString
00351 EpidemicStore::digest( const DTN::Bundle& b )
00352 {
00353 return digest( b.source(), b.seqNum() );
00354 }
00355
00356 DTN::ByteString
00357 EpidemicStore::digest( const DTN::ByteString& originator, uint32_t seqNum )
00358 {
00359 DTN::ByteString retval;
00360 unsigned char n = 4 + originator.length();
00361 retval.push_back(n);
00362 union
00363 {
00364 uint32_t i;
00365 unsigned char c[4];
00366 } seq;
00367 seq.i = seqNum;
00368 retval.append(seq.c,4);
00369 retval.append(originator);
00370 return retval;
00371 }
00372
00373 EpidemicStore::DigestList*
00374 EpidemicStore::parseSummaryVector( const DTN::ByteString& sv )
00375 {
00376 DigestList* retval = 0;
00377 if ( 0 == sv.length() ) return retval;
00378 retval = new DigestList;
00379 for ( unsigned int i = 0 ; i < sv.length() ; )
00380 {
00381 unsigned char c = sv[i];
00382 retval->push_back( DTN::ByteString(sv,i,c+1) );
00383 i += c+1;
00384 }
00385 return retval;
00386 }
00387
00388 void
00389 EpidemicStore::forwardLastHop( const DTN::ByteString& addr, DTN::Node& node )
00390 {
00391 DestMap::iterator ditr = m_destMap.find( addr );
00392 if ( m_destMap.end() == ditr ) return;
00393
00394 DestList::iterator itr = ditr->second.begin();
00395 DestList::iterator end = ditr->second.end();
00396 for ( ; itr != end ; ++itr )
00397 {
00398
00399
00400 DTN::Bundle* pB = (*itr)->bundle()->clone();
00401
00402 pB->recv() = addr;
00403
00404 node.forward(pB);
00405 }
00406 }
00407
00408 bool
00409 EpidemicStore::seen( const DTN::ByteString& digest )
00410 {
00411 return ( m_history.end() != m_history.find( digest ) );
00412 }
00413
00414
00415
00416
00417
00418 EpidemicStoreItr::EpidemicStoreItr(
00419 EpidemicStore& store,
00420 EpidemicStore::iterator itr ) :
00421 DTN::BundlePointerRepr(),
00422 m_store( store ),
00423 m_itr( itr )
00424 {
00425 }
00426
00427 EpidemicStoreItr::~EpidemicStoreItr()
00428 {
00429 }
00430
00431 bool
00432 EpidemicStoreItr::operator==( const DTN::BundlePointerRepr& b )
00433 {
00434 return false;
00435 }
00436
00437 DTN::BundlePointerRepr*
00438 EpidemicStoreItr::next()
00439 {
00440 EpidemicStore::iterator itr = m_store.upper_bound( *m_itr );
00441 if ( itr != m_store.end() )
00442 {
00443 return new EpidemicStoreItr( m_store, itr );
00444 }
00445 return 0;
00446 }
00447
00448 DTN::BundlePointerRepr*
00449 EpidemicStoreItr::clone()
00450 {
00451 return new EpidemicStoreItr( m_store, m_itr );
00452 }
00453
00454 DTN::Bundle*
00455 EpidemicStoreItr::bundle() const
00456 {
00457 return m_itr->bundle();
00458 }