EpidemicStore.cc

00001 // Copyright 2008 Michael Marsh, University of Maryland.
00002 //
00003 // This file is part of pydtn.
00004 //
00005 // pydtn is free software: you can redistribute it and/or modify
00006 // it under the terms of the GNU General Public License as published by
00007 // the Free Software Foundation, either version 3 of the License, or
00008 // (at your option) any later version.
00009 //
00010 // pydtn is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013 // GNU General Public License for more details.
00014 //
00015 // You should have received a copy of the GNU General Public License
00016 // along with pydtn.  If not, see <http://www.gnu.org/licenses/>.
00017 //
00018 // The views and conclusions contained in the software and documentation
00019 // are those of the authors and should not be interpreted as representing
00020 // official policies, either expressed or implied, of the University
00021 // of Maryland.
00022 //
00023 // pydtn extends and embeds the Python interpreter, which is
00024 // Copyright 2001-2006 Python Software Foundation, All Rights Reserved,
00025 // and is released under the PSF License Agreement.
00026 //
00027 // RANLUX random number generation uses the Boost library,
00028 // Copyright 1994-2006 by various authors (details in individual files),
00029 // which is released under the Boost Software License, Version 1.0.
00030 
00031 #include "simlpy/interpreter_defs.h"
00032 #include "simlpy/interpreter_hooks.h"
00033 
00034 #include "EpidemicStore.h"
00035 #include "dtn/Bundle.h"
00036 #include "simlpy/Clock.h"
00037 
00038 #include <iostream>
00039 
00040 using namespace Epidemic;
00041 
00042 //===================
00043 // EpidemicBundle
00044 //===================
00045 EpidemicBundle::EpidemicBundle( DTN::Bundle* b ) :
00046    m_bundle( b ),
00047    m_timeAdded( Clock::time() ),
00048    m_numTries( 0 )
00049 {
00050    if ( 0 != b )
00051    {
00052       m_originator = b->source();
00053       m_seqNum = b->seqNum();
00054       m_digest = EpidemicStore::digest(*b);
00055    }
00056 }
00057 
00058 EpidemicBundle::EpidemicBundle( const DTN::ByteString& originator,
00059                                 uint32_t seqNum ) :
00060    m_bundle( 0 ),
00061    m_originator( originator ),
00062    m_seqNum( seqNum ),
00063    m_numTries( 0 )
00064 {
00065    m_digest = EpidemicStore::digest(originator,seqNum);
00066 }
00067 
00068 EpidemicBundle::EpidemicBundle( const DTN::ByteString& digest ) :
00069    m_bundle( 0 ),
00070    m_numTries( 0 ),
00071    m_digest( digest )
00072 {
00073 }
00074 
00075 EpidemicBundle::~EpidemicBundle()
00076 {
00077 }
00078 
00079 DTN::Bundle*
00080 EpidemicBundle::bundle() const
00081 {
00082    return m_bundle;
00083 }
00084 
00085 bool
00086 EpidemicBundle::operator<( const EpidemicBundle& b ) const
00087 {
00088    return m_digest < b.m_digest ;
00089 }
00090 
00091 
00092 //==================
00093 // EpidemicStore
00094 //==================
00095 
00096 EpidemicStore::EpidemicStore( DTN::Node* owner, uint32_t h ) :
00097    DTN::PersistentBundleStore(owner),
00098    m_max_hops(h),
00099    m_bytesUsed(0)
00100 {
00101 }
00102 
00103 EpidemicStore::~EpidemicStore()
00104 {
00105 }
00106 
00107 DTN::BundlePointer
00108 EpidemicStore::p_addBundle( DTN::Bundle* b )
00109 {
00110    if ( 0 == b ) return DTN::BundlePointer();
00111 
00112    // Reject bundles we've already seen.
00113    if ( m_history.end() != m_history.find( digest(*b) ) )
00114       return DTN::BundlePointer();
00115 
00116    // Insert the new bundle into the store.
00117    std::pair< iterator , bool > r = m_list.insert( b );
00118    if ( r.second )
00119    {
00120       m_bytesUsed += b->size();
00121 
00122       // We'll never remove digests from this list.
00123       m_history.insert( r.first->digest() );
00124 
00125       if ( ( 0 == m_max_hops ) || ( b->hopCount() < m_max_hops ) )
00126       {
00127          m_digestMap.insert( DigestMap::value_type( r.first->digest(),
00128                                                     r.first ) );
00129       }
00130       else
00131       {
00132          m_destMap[b->destination()].push_back( r.first );
00133       }
00134       return DTN::BundlePointer( new EpidemicStoreItr(*this,r.first) );
00135    }
00136    return DTN::BundlePointer();
00137 }
00138 
00139 void
00140 EpidemicStore::p_deleteBundle( DTN::Bundle* b )
00141 {
00142    if ( 0 == b ) return;
00143 
00144    // If it's in the transferable bundle structure, remove it.
00145    m_digestMap.erase( digest(*b) );
00146 
00147    // Checking for the bundle in the last-hop delivery structure is
00148    // more complicated.  We begin by seeing if there are any bundles
00149    // for the particular destination.
00150    DestMap::iterator ditr = m_destMap.find( b->destination() );
00151    if ( m_destMap.end() != ditr )
00152    {
00153       // Having found an entry for this destination, we have to loop
00154       // over bundles looking for a match.  The set of iterators is
00155       // the second element of the pair iterator from the map.
00156       DestList::iterator itr = ditr->second.begin();
00157       DestList::iterator end = ditr->second.end();
00158       for ( ; itr != end ; ++itr )
00159       {
00160          // The addresses of the bundles should match, so we can do an
00161          // easy comparison.
00162          if ( (*itr)->bundle() == b )
00163          {
00164             // We've found the bundle's iterator, so first we erase
00165             // it.
00166             ditr->second.erase(itr);
00167 
00168             // Now we check to see if that was the last bundle for the
00169             // destination.  If so, we'll remove the map entry
00170             // completely.  This prevents cruft from accumulating.
00171             if ( 0 == ditr->second.size() )
00172             {
00173                m_destMap.erase(ditr);
00174             }
00175 
00176             // The bundle will only appear once, so we can exit the
00177             // loop.  What's more, we've invalidated the iterator
00178             // we're using in the loop, and might have invalidated the
00179             // map iterator as well.  All said, we've done all we need
00180             // to do here.
00181             break;
00182          }
00183       }
00184    }
00185 
00186    // Finally, get rid of the bundle from the persistent store and
00187    // decrement the usage.
00188    if ( 0 != m_list.erase( b ) )
00189    {
00190       m_bytesUsed -= b->size();
00191    }
00192 }
00193 
00194 void
00195 EpidemicStore::consume( DTN::Bundle* b )
00196 {
00197    if ( 0 == b ) return;
00198 
00199    // Don't store ACKs.
00200    if ( DTN::Bundle::kACK & b->type() ) return;
00201 
00202    // Don't store non-custodial bundles.
00203    if ( ! (DTN::Bundle::kCustodial & b->type()) ) return;
00204 
00205    m_history.insert( digest(*b) );
00206 }
00207 
00208 DTN::BundlePointer
00209 EpidemicStore::p_getPointer( const DTN::BundlePointer& p )
00210 {
00211    if ( p.isNull() )
00212    {
00213       if ( m_list.empty() )
00214       {
00215          return DTN::BundlePointer();
00216       }
00217       return DTN::BundlePointer(
00218          new EpidemicStoreItr(*this,m_list.begin()) );
00219    }
00220    iterator itr = m_list.lower_bound( p.repr()->bundle() );
00221    if ( m_list.end() != itr )
00222    {
00223       return DTN::BundlePointer( new EpidemicStoreItr(*this,itr) );
00224    }
00225    return DTN::BundlePointer();
00226 }
00227 
00228 DTN::BundlePointer
00229 EpidemicStore::p_getPointer( const DTN::ByteString& sender, uint32_t seqNum )
00230 {
00231    iterator itr = m_list.find( EpidemicBundle(sender,seqNum) );
00232    if ( m_list.end() == itr )
00233    {
00234       return DTN::BundlePointer();
00235    }
00236    return DTN::BundlePointer( new EpidemicStoreItr(*this,itr) );
00237 }
00238 
00239 DTN::BundlePointer
00240 EpidemicStore::getPointer( const DTN::ByteString& digest )
00241 {
00242    clean();
00243    iterator itr = m_list.find( EpidemicBundle(digest) );
00244    if ( m_list.end() == itr )
00245    {
00246       return DTN::BundlePointer();
00247    }
00248    return DTN::BundlePointer( new EpidemicStoreItr(*this,itr) );
00249 }
00250 
00251 bool
00252 EpidemicStore::p_validatePointer( const DTN::BundlePointer& p )
00253 {
00254    if ( p.isNull() ) return false;
00255    iterator itr = m_list.find( p.repr()->bundle() );
00256    if ( m_list.end() == itr ) return false;
00257    return true;
00258 }
00259 
00260 void
00261 EpidemicStore::p_shrinkStore( size_t s )
00262 {
00263    // This loop is not efficient.  If we wanted to be efficient, we'd
00264    // store a separate list of bundles sorted by time added.  However,
00265    // we don't expect this to be called often, so a little
00266    // inefficiency here does no real harm.  This is, after all, fairly
00267    // major node reconfiguration.
00268    while ( m_bytesUsed > s )
00269    {
00270       if ( 0 == m_list.size() ) return;
00271       BundleList::reverse_iterator itr = m_list.rbegin();
00272       BundleList::reverse_iterator end = m_list.rend();
00273       BundleList::reverse_iterator last = itr;
00274       for ( ; itr != end ; ++itr )
00275       {
00276          if ( itr->timeAdded() > last->timeAdded() )
00277          {
00278             last = itr;
00279          }
00280       }
00281       m_owner->recordPersistentRemove( *(last->bundle()) );
00282       p_deleteBundle(last->bundle());
00283       delete last->bundle();
00284    }
00285 }
00286 
00287 EpidemicStore::iterator
00288 EpidemicStore::begin()
00289 {
00290    return m_list.begin();
00291 }
00292 
00293 EpidemicStore::iterator
00294 EpidemicStore::lower_bound( const key_type& x )
00295 {
00296    return m_list.lower_bound( x );
00297 }
00298 
00299 EpidemicStore::iterator
00300 EpidemicStore::upper_bound( const key_type& x )
00301 {
00302    return m_list.upper_bound( x );
00303 }
00304 
00305 EpidemicStore::iterator
00306 EpidemicStore::end()
00307 {
00308    return m_list.end();
00309 }
00310 
00311 EpidemicStore::const_iterator
00312 EpidemicStore::begin() const
00313 {
00314    return m_list.begin();
00315 }
00316 
00317 EpidemicStore::const_iterator
00318 EpidemicStore::lower_bound( const key_type& x ) const
00319 {
00320    return m_list.lower_bound( x );
00321 }
00322 
00323 EpidemicStore::const_iterator
00324 EpidemicStore::upper_bound( const key_type& x ) const
00325 {
00326    return m_list.upper_bound( x );
00327 }
00328 
00329 EpidemicStore::const_iterator
00330 EpidemicStore::end() const
00331 {
00332    return m_list.end();
00333 }
00334 
00335 DTN::ByteString*
00336 EpidemicStore::summaryVector() const
00337 {
00338    DTN::ByteString* retval = new DTN::ByteString;
00339    if ( 0 == retval ) return 0;
00340    const_iterator itr = m_list.begin();
00341    const_iterator end = m_list.end();
00342    for ( ; itr != end ; ++itr )
00343    {
00345       retval->append( itr->digest() );
00346    }
00347    return retval;
00348 }
00349 
00350 DTN::ByteString
00351 EpidemicStore::digest( const DTN::Bundle& b )
00352 {
00353    return digest( b.source(), b.seqNum() );
00354 }
00355 
00356 DTN::ByteString
00357 EpidemicStore::digest( const DTN::ByteString& originator, uint32_t seqNum )
00358 {
00359    DTN::ByteString retval;
00360    unsigned char n = 4 + originator.length();
00361    retval.push_back(n);
00362    union
00363    {
00364          uint32_t i;
00365          unsigned char c[4];
00366    } seq;
00367    seq.i = seqNum;
00368    retval.append(seq.c,4);
00369    retval.append(originator);
00370    return retval;
00371 }
00372 
00373 EpidemicStore::DigestList*
00374 EpidemicStore::parseSummaryVector( const DTN::ByteString& sv )
00375 {
00376    DigestList* retval = 0;
00377    if ( 0 == sv.length() ) return retval;
00378    retval = new DigestList;
00379    for ( unsigned int i = 0 ; i < sv.length() ; )
00380    {
00381       unsigned char c = sv[i];
00382       retval->push_back( DTN::ByteString(sv,i,c+1) );
00383       i += c+1;
00384    }
00385    return retval;
00386 }
00387 
00388 void
00389 EpidemicStore::forwardLastHop( const DTN::ByteString& addr, DTN::Node& node )
00390 {
00391    DestMap::iterator ditr = m_destMap.find( addr );
00392    if ( m_destMap.end() == ditr ) return;
00393 
00394    DestList::iterator itr = ditr->second.begin();
00395    DestList::iterator end = ditr->second.end();
00396    for ( ; itr != end ; ++itr )
00397    {
00398       // Copy the bundle.  The iterator stores an iterator, hence the
00399       // double-dereference.
00400       DTN::Bundle* pB = (*itr)->bundle()->clone();
00401       // Set the next-hop receiver to the requesting node.
00402       pB->recv() = addr;
00403       // Enqueue the bundle.
00404       node.forward(pB);
00405    }
00406 }
00407 
00408 bool
00409 EpidemicStore::seen( const DTN::ByteString& digest )
00410 {
00411    return ( m_history.end() != m_history.find( digest ) );
00412 }
00413 
00414 //=====================
00415 // EpidemicStoreItr
00416 //=====================
00417 
00418 EpidemicStoreItr::EpidemicStoreItr(
00419    EpidemicStore& store,
00420    EpidemicStore::iterator itr ) :
00421    DTN::BundlePointerRepr(),
00422    m_store( store ),
00423    m_itr( itr )
00424 {
00425 }
00426 
00427 EpidemicStoreItr::~EpidemicStoreItr()
00428 {
00429 }
00430 
00431 bool
00432 EpidemicStoreItr::operator==( const DTN::BundlePointerRepr& b )
00433 {
00434    return false;
00435 }
00436 
00437 DTN::BundlePointerRepr*
00438 EpidemicStoreItr::next()
00439 {
00440    EpidemicStore::iterator itr = m_store.upper_bound( *m_itr );
00441    if ( itr != m_store.end() )
00442    {
00443       return new EpidemicStoreItr( m_store, itr );
00444    }
00445    return 0;
00446 }
00447 
00448 DTN::BundlePointerRepr*
00449 EpidemicStoreItr::clone()
00450 {
00451    return new EpidemicStoreItr( m_store, m_itr );
00452 }
00453 
00454 DTN::Bundle*
00455 EpidemicStoreItr::bundle() const
00456 {
00457    return m_itr->bundle();
00458 }

Generated on Mon Mar 24 11:15:45 2008 for Pydtn Simulator by  doxygen 1.5.4