Node.cc

00001 // Copyright 2008 Michael Marsh, University of Maryland.
00002 //
00003 // This file is part of pydtn.
00004 //
00005 // pydtn is free software: you can redistribute it and/or modify
00006 // it under the terms of the GNU General Public License as published by
00007 // the Free Software Foundation, either version 3 of the License, or
00008 // (at your option) any later version.
00009 //
00010 // pydtn is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013 // GNU General Public License for more details.
00014 //
00015 // You should have received a copy of the GNU General Public License
00016 // along with pydtn.  If not, see <http://www.gnu.org/licenses/>.
00017 //
00018 // The views and conclusions contained in the software and documentation
00019 // are those of the authors and should not be interpreted as representing
00020 // official policies, either expressed or implied, of the University
00021 // of Maryland.
00022 //
00023 // pydtn extends and embeds the Python interpreter, which is
00024 // Copyright 2001-2006 Python Software Foundation, All Rights Reserved,
00025 // and is released under the PSF License Agreement.
00026 //
00027 // RANLUX random number generation uses the Boost library,
00028 // Copyright 1994-2006 by various authors (details in individual files),
00029 // which is released under the Boost Software License, Version 1.0.
00030 
00031 #include <iostream>
00032 #include <netinet/in.h>
00033 
00034 #include "Node.h"
00035 #include "Link.h"
00036 #include "RoutingTable.h"
00037 #include "VolatileBundleStore.h"
00038 #include "PersistentBundleStore.h"
00039 #include "Exception.h"
00040 
00041 using namespace DTN;
00042 
00043 Node::Node() :
00044    m_seqNum( 0U ),
00045    m_consumer( 0 ),
00046    m_volatileCap( 4096 ),
00047    m_volatileStore( 0 ),
00048    m_persistentCap( 4096 ),
00049    m_persistentStore( 0 ),
00050    m_vstorePol( 0 ),
00051    m_forwPol( 0 ),
00052    m_custPol( 0 )
00053 {
00054 }
00055 
00056 Node::~Node()
00057 {
00058    if ( 0 != m_consumer )
00059    {
00060       delete m_consumer;
00061    }
00062    if ( 0 != m_volatileStore )
00063    {
00064       delete m_volatileStore;
00065    }
00066    if ( 0 != m_persistentStore )
00067    {
00068       delete m_persistentStore;
00069    }
00070    if ( 0 != m_vstorePol )
00071    {
00072       delete m_vstorePol;
00073    }
00074    if ( 0 != m_forwPol )
00075    {
00076       delete m_forwPol;
00077    }
00078    if ( 0 != m_custPol )
00079    {
00080       delete m_custPol;
00081    }
00082    LinkList::iterator llItr = m_links.begin();
00083    LinkList::iterator llEnd = m_links.end();
00084    for ( ; llItr != llEnd ; ++llItr )
00085    {
00086       Link* l = *llItr;
00087       if ( 0 != l ) delete l;
00088    }
00089 }
00090 
00091 void
00092 Node::forward( Bundle* b )
00093 {
00094    if ( 0 == b ) return;
00095    b->send() = m_addr;
00096    if ( 0 == m_forwPol )
00097    {
00098       // We can't forward bundles at all, so we drop them.
00099       std::cerr << "No forwarding policy exists for this node.\n"
00100                 << "All bundles will be dropped." << std::endl;
00101       drop(b);
00102       return;
00103    }
00104 
00105    // Check for bundle expiration.
00106    if ( b->expired() )
00107    {
00108       drop(b,ExpiryDrop::inst);
00109       return;
00110    }
00111 
00112    // Check for custody transfer.
00113    if ( 0 != m_custPol )
00114    {
00115       CustodyPolicy::CustodyReturn cr = m_custPol->takeCustody(*b);
00116       if ( CustodyPolicy::kCustodyTaken == cr )
00117       {
00118          if ( 0 != m_consumer )
00119          {
00120             m_consumer->custody(*b);
00121             m_consumer->persistentStore(*b);
00122          }
00123          acknowledge(*b);
00124          b->custodian() = addr();
00125       }
00126       else if ( CustodyPolicy::kDropBundle == cr )
00127       {
00128          drop(b,CustodyPolicyDrop::inst);
00129          return;
00130       }
00131    }
00132 
00133    // Try to forward.
00134    if ( ! m_forwPol->forward(b) )
00135    {
00136       // Do we have local storage (including a simple FIFO queue)?
00137       if ( 0 != m_vstorePol )
00138       {
00139          // We couldn't forward immediately, so try to store.
00140          BundlePointer p = m_vstorePol->store(b);
00141          if ( p.isNull() )
00142          {
00143             drop(b,FullQueueDrop::inst);
00144          }
00145          else
00146          {
00147             // We stored the bundle; let the forwarding policy know so
00148             // that it can update its cache.
00149             m_forwPol->cache(p);
00150          }
00151       }
00152       else
00153       {
00154          // We can't store, so we have to drop.
00155          drop(b,FullQueueDrop::inst);
00156       }
00157    }
00158 }
00159 
00160 void
00161 Node::broadcast( Bundle* b )
00162 {
00163    if ( 0 == b ) return;
00164 
00165    if ( ! b->expired() )
00166    {
00167       LinkList::const_iterator itr = m_links.begin();
00168       LinkList::const_iterator end = m_links.end();
00169       for ( ; itr != end ; ++itr )
00170       {
00171          if ( 0 != *itr )
00172          {
00173             Bundle* pB = b->clone();
00174             pB->destination() = (*itr)->remoteAddr();
00175             pB->recv() =  (*itr)->remoteAddr();
00176             pB->custodian() = m_addr;
00177             forward( pB );
00178          }
00179       }
00180    }
00181 
00182    delete b;
00183 }
00184 
00185 bool
00186 Node::addPersistent( const Bundle& b )
00187 {
00188    if ( 0 == m_persistentStore ) return false;
00189    unsigned int size = b.size();
00190    unsigned int avail = persistentCap() - usedPersistentCap();
00191    if ( size > avail ) return false;
00192    BundlePointer ptr = m_persistentStore->addBundle( b.clone() );
00193    if ( ! ptr.isNull() )
00194    {
00195       if ( 0 != m_consumer ) m_consumer->persistentStore( b );
00196       return true;
00197    }
00198    return false;
00199 }
00200 
00201 bool
00202 Node::validate( const BundlePointer& p )
00203 {
00204    if ( p.isNull() ) return false;
00205    if ( 0 == m_volatileStore ) return false;
00206    if ( ! m_volatileStore->validatePointer(p) ) return false;
00207    return true;
00208 }
00209 
00210 bool
00211 Node::validatePersistent( const BundlePointer& p )
00212 {
00213    if ( p.isNull() ) return false;
00214    if ( 0 == m_persistentStore ) return false;
00215    if ( ! m_persistentStore->validatePointer(p) ) return false;
00216    return true;
00217 }
00218 
00219 void
00220 Node::remove( Bundle* b, bool dealloc, const DropCause& c )
00221 {
00222    if ( 0 == m_volatileStore ) return;
00223    m_volatileStore->deleteBundle( b );
00224    if ( dealloc ) drop(b,c);
00225 }
00226 
00227 void
00228 Node::removePersistent( Bundle* b, bool dropping )
00229 {
00230    if ( 0 == m_persistentStore ) return;
00231    if ( 0 != m_custPol )
00232    {
00233       if ( ! m_custPol->policyRemove(*b) ) return;
00234    }
00235    m_persistentStore->deleteBundle( b );
00236    recordPersistentRemove(*b);
00237    if ( dropping )
00238    {
00239       drop(b, CustodyPolicyDrop::inst);
00240    }
00241    else
00242    {
00243       delete b;
00244    }
00245 }
00246 
00247 void
00248 Node::send( Bundle* b, Link& l )
00249 {
00250    if ( 0 == b ) return;
00251    if ( ! ( b->type() & Bundle::kBcast ) )
00252    {
00253       b->recv() = l.remoteAddr();
00254    }
00255    if ( 0 != m_consumer ) m_consumer->send( *b );
00256    l.send(b);
00257 }
00258 
00259 void
00260 Node::recv( Bundle* b )
00261 {
00262    if ( 0 == b ) return;
00263 
00264    if ( ! ( ours(b->recv()) || ( b->type() & Bundle::kBcast ) ) )
00265    {
00266       // This wasn't meant for us.  Drop silently.
00267       delete b;
00268       return;
00269    }
00270 
00271    b->incrHop();
00272    if ( 0 != m_consumer ) m_consumer->recv(*b);
00273 
00274    if ( ours(b->destination()) || ( b->type() & Bundle::kBcast ) )
00275    {
00276       consume(b);
00277    }
00278    else
00279    {
00280       forward(b);
00281    }
00282 }
00283 
00284 void
00285 Node::forwardOn( Link& l )
00286 {
00287    if ( 0 != m_forwPol )
00288    {
00289       while ( 1 )
00290       {
00291          Bundle* b = m_forwPol->forwardOn(l);
00292          if ( 0 == b ) return;
00293          if ( b->expired() )
00294          {
00295             drop(b, ExpiryDrop::inst);
00296          }
00297          else
00298          {
00299             forward(b);
00300             if ( ! l.available() )
00301             {
00302                return;
00303             }
00304          }
00305       }
00306    }
00307 }
00308 
00309 void
00310 Node::retryStored()
00311 {
00312    if ( 0 != m_custPol )
00313    {
00314       m_custPol->retry();
00315    }
00316 }
00317 
00318 void
00319 Node::drop( Bundle* b, const DropCause& c )
00320 {
00321    if ( 0 != m_consumer )
00322    {
00323       m_consumer->drop(*b,c);
00324    }
00325    delete b;
00326 }
00327 
00328 void
00329 Node::recordPersistentRemove( const Bundle& b, bool cleanup )
00330 {
00331    if ( 0 != m_consumer )
00332    {
00333       m_consumer->persistentRemove(b,cleanup);
00334    }
00335 }
00336 
00337 void
00338 Node::setAddr( const ByteString& addr )
00339 {
00340    m_addr = addr;
00341 }
00342 
00343 bool
00344 Node::ours( const ByteString& destination )
00345 {
00346    return ( destination == m_addr );
00347 }
00348 
00349 void
00350 Node::setConsumer( Consumer* c )
00351 {
00352    if ( 0 == c ) return;
00353    if ( 0 != m_consumer )
00354    {
00355       delete m_consumer;
00356    }
00357    m_consumer = c;
00358 }
00359 
00360 size_t
00361 Node::usedVolatileCap( bool cleanup ) const
00362 {
00363    if ( 0 == m_volatileStore ) return 0;
00364    return m_volatileStore->bytesUsed(cleanup);
00365 }
00366 
00367 size_t
00368 Node::volatileBundles( bool cleanup ) const
00369 {
00370    if ( 0 == m_volatileStore ) return 0;
00371    return m_volatileStore->bundles(cleanup);
00372 }
00373 
00374 size_t
00375 Node::usedPersistentCap( bool cleanup ) const
00376 {
00377    if ( 0 == m_persistentStore ) return 0;
00378    return m_persistentStore->bytesUsed(cleanup);
00379 }
00380 
00381 size_t
00382 Node::persistentBundles( bool cleanup ) const
00383 {
00384    if ( 0 == m_persistentStore ) return 0;
00385    return m_persistentStore->bundles(cleanup);
00386 }
00387 
00388 BundlePointer
00389 Node::cachedVolatile()
00390 {
00391    if ( 0 == m_volatileStore )
00392    {
00393       return BundlePointer();
00394    }
00395    return m_volatileStore->getPointer( BundlePointer() );
00396 }
00397 
00398 const BundlePointer
00399 Node::cachedVolatile() const
00400 {
00401    if ( 0 == m_volatileStore )
00402    {
00403       return BundlePointer();
00404    }
00405    return m_volatileStore->getPointer( BundlePointer() );
00406 }
00407 
00408 BundlePointer
00409 Node::cachedVolatile( const BundlePointer& p )
00410 {
00411    if ( 0 == m_volatileStore )
00412    {
00413       return BundlePointer();
00414    }
00415    return m_volatileStore->getPointer( p );
00416 }
00417 
00418 const BundlePointer
00419 Node::cachedVolatile( const BundlePointer& p ) const
00420 {
00421    if ( 0 == m_volatileStore )
00422    {
00423       return BundlePointer();
00424    }
00425    return m_volatileStore->getPointer( p );
00426 }
00427 
00428 BundlePointer
00429 Node::cachedVolatile( const ByteString& sender,
00430                       uint32_t seqNum )
00431 {
00432    if ( 0 == m_volatileStore )
00433    {
00434       return BundlePointer();
00435    }
00436    return m_volatileStore->getPointer( sender, seqNum );
00437 }
00438 
00439 const BundlePointer
00440 Node::cachedVolatile( const ByteString& sender,
00441                       uint32_t seqNum ) const
00442 {
00443    if ( 0 == m_volatileStore )
00444    {
00445       return BundlePointer();
00446    }
00447    return m_volatileStore->getPointer( sender, seqNum );
00448 }
00449 
00450 BundlePointer
00451 Node::cachedPersistent()
00452 {
00453    if ( 0 == m_persistentStore )
00454    {
00455       return BundlePointer();
00456    }
00457    return m_persistentStore->getPointer( BundlePointer() );
00458 }
00459 
00460 const BundlePointer
00461 Node::cachedPersistent() const
00462 {
00463    if ( 0 == m_persistentStore )
00464    {
00465       return BundlePointer();
00466    }
00467    return m_persistentStore->getPointer( BundlePointer() );
00468 }
00469 
00470 BundlePointer
00471 Node::cachedPersistent( const BundlePointer& p )
00472 {
00473    if ( 0 == m_persistentStore )
00474    {
00475       return BundlePointer();
00476    }
00477    return m_persistentStore->getPointer( p );
00478 }
00479 
00480 const BundlePointer
00481 Node::cachedPersistent( const BundlePointer& p ) const
00482 {
00483    if ( 0 == m_persistentStore )
00484    {
00485       return BundlePointer();
00486    }
00487    return m_persistentStore->getPointer( p );
00488 }
00489 
00490 BundlePointer
00491 Node::cachedPersistent( const ByteString& sender,
00492                         uint32_t seqNum )
00493 {
00494    if ( 0 == m_persistentStore )
00495    {
00496       return BundlePointer();
00497    }
00498    return m_persistentStore->getPointer( sender, seqNum );
00499 }
00500 
00501 const BundlePointer
00502 Node::cachedPersistent( const ByteString& sender,
00503                         uint32_t seqNum ) const
00504 {
00505    if ( 0 == m_persistentStore )
00506    {
00507       return BundlePointer();
00508    }
00509    return m_persistentStore->getPointer( sender, seqNum );
00510 }
00511 
00512 void
00513 Node::setVolatileStore( VolatileBundleStore* s )
00514 {
00515    if ( 0 == s ) return;
00516    if ( 0 != m_volatileStore )
00517    {
00518       delete m_volatileStore;
00519    }
00520    m_volatileStore = s;
00521    if ( 0 != m_vstorePol ) m_vstorePol->setStore( m_volatileStore );
00522    if ( 0 != m_forwPol ) m_forwPol->setStore( m_volatileStore );
00523 }
00524 
00525 void
00526 Node::setPersistentStore( PersistentBundleStore* s )
00527 {
00528    if ( 0 == s ) return;
00529    if ( 0 != m_persistentStore )
00530    {
00531       delete m_persistentStore;
00532    }
00533    m_persistentStore = s;
00534    if ( 0 != m_custPol ) m_custPol->setStore( m_persistentStore );
00535 }
00536 
00537 void
00538 Node::setVolatileCap( size_t cap )
00539 {
00540    if ( 0 != m_volatileStore )
00541    {
00542       if ( cap < m_volatileCap ) m_volatileStore->shrinkStore(cap);
00543    }
00544    m_volatileCap = cap;
00545 }
00546 
00547 void
00548 Node::setPersistentCap( size_t cap )
00549 {
00550    if ( 0 != m_persistentStore )
00551    {
00552       if ( cap < m_persistentCap ) m_persistentStore->shrinkStore(cap);
00553    }
00554    m_persistentCap = cap;
00555 }
00556 
00557 void
00558 Node::setVolatileStorePolicy( VolatileStorePolicy* p )
00559 {
00560    if ( 0 == p ) return;
00561    if ( 0 != m_vstorePol ) delete m_vstorePol;
00562    m_vstorePol = p;
00563    m_vstorePol->setStore( m_volatileStore );
00564 }
00565 
00566 void
00567 Node::setForwardingPolicy( ForwardingPolicy* p )
00568 {
00569    if ( 0 == p ) return;
00570    if ( 0 != m_forwPol ) delete m_forwPol;
00571    m_forwPol = p;
00572    m_forwPol->setStore( m_volatileStore );
00573 }
00574 
00575 void
00576 Node::setCustodyPolicy( CustodyPolicy* p )
00577 {
00578    if ( 0 == p ) return;
00579    if ( 0 != m_custPol ) delete m_custPol;
00580    m_custPol = p;
00581    m_custPol->setStore( m_persistentStore );
00582 }
00583 
00584 void
00585 Node::addLink( Link* l )
00586 {
00587    if ( 0 != l )
00588    {
00589       m_links.insert( l );
00590    }
00591 }
00592 
00593 void
00594 Node::signalExhausted( const Bundle& b )
00595 {
00596    if ( 0 != m_consumer )
00597    {
00598       m_consumer->exhausted(b);
00599    }
00600 }
00601 
00602 void
00603 Node::consume( Bundle* b )
00604 {
00605    if ( 0 == b ) return;
00606    Bundle::BundleType t = b->type();
00607 
00608    if ( 0 != m_persistentStore )
00609    {
00610       m_persistentStore->consume(b);
00611    }
00612    if ( 0 != m_consumer )
00613    {
00614       (*m_consumer)(*b);
00615    }
00616 
00617    if ( Bundle::kACK & t )
00618    {
00619       // Clean out the persistent store.
00620       if ( 0 != m_persistentStore )
00621       {
00622          BundlePointer p = m_persistentStore->getPointer( b->source(),
00623                                                           b->seqNum() );
00624          if ( ! p.isNull() )
00625          {
00626             Bundle* pB = p.repr()->bundle();
00627             if ( (0 != m_custPol) && m_custPol->policyRemove( *pB ) )
00628             {
00629                m_persistentStore->deleteBundle( pB );
00630                recordPersistentRemove(*pB);
00631                delete pB;
00632             }
00633          }
00634       }
00635    }
00636    else
00637    {
00638       // Generate an acknowledgment.
00639       acknowledge( *b );
00640    }
00641 
00642    delete b;
00643 }
00644 
00645 void
00646 Node::acknowledge( const Bundle& b )
00647 {
00648    // Make sure this is an acknowledgeable bundle.
00649    Bundle::BundleType t = b.type();
00650    if ( Bundle::kNoACK & t )
00651    {
00652       return;
00653    }
00654 
00655    struct timeval tv = dtn_time();
00656 
00657    // Use the same time-to-live as the original Bundle, since we really
00658    // have no better guess.
00659    struct timeval e;
00660    e.tv_sec = tv.tv_sec + b.expiry().tv_sec - b.created().tv_sec;
00661    e.tv_usec = tv.tv_usec + b.expiry().tv_usec - b.created().tv_usec;
00662    e.tv_sec += ( e.tv_usec / 1000000 );
00663    e.tv_usec = ( e.tv_usec % 1000000 );
00664 
00665    Bundle* ack = new Bundle( ntohl(b.seqNum()),
00666                              b.source(),
00667                              b.custodian(),
00668                              tv,
00669                              e,
00670                              b.type() | Bundle::kACK );
00671    ack->custodian() = addr();
00672    forward( ack );
00673 }

Generated on Mon Mar 24 11:15:45 2008 for Pydtn Simulator by  doxygen 1.5.4