00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include <iostream>
00032 #include <netinet/in.h>
00033
00034 #include "Node.h"
00035 #include "Link.h"
00036 #include "RoutingTable.h"
00037 #include "VolatileBundleStore.h"
00038 #include "PersistentBundleStore.h"
00039 #include "Exception.h"
00040
00041 using namespace DTN;
00042
00043 Node::Node() :
00044 m_seqNum( 0U ),
00045 m_consumer( 0 ),
00046 m_volatileCap( 4096 ),
00047 m_volatileStore( 0 ),
00048 m_persistentCap( 4096 ),
00049 m_persistentStore( 0 ),
00050 m_vstorePol( 0 ),
00051 m_forwPol( 0 ),
00052 m_custPol( 0 )
00053 {
00054 }
00055
00056 Node::~Node()
00057 {
00058 if ( 0 != m_consumer )
00059 {
00060 delete m_consumer;
00061 }
00062 if ( 0 != m_volatileStore )
00063 {
00064 delete m_volatileStore;
00065 }
00066 if ( 0 != m_persistentStore )
00067 {
00068 delete m_persistentStore;
00069 }
00070 if ( 0 != m_vstorePol )
00071 {
00072 delete m_vstorePol;
00073 }
00074 if ( 0 != m_forwPol )
00075 {
00076 delete m_forwPol;
00077 }
00078 if ( 0 != m_custPol )
00079 {
00080 delete m_custPol;
00081 }
00082 LinkList::iterator llItr = m_links.begin();
00083 LinkList::iterator llEnd = m_links.end();
00084 for ( ; llItr != llEnd ; ++llItr )
00085 {
00086 Link* l = *llItr;
00087 if ( 0 != l ) delete l;
00088 }
00089 }
00090
00091 void
00092 Node::forward( Bundle* b )
00093 {
00094 if ( 0 == b ) return;
00095 b->send() = m_addr;
00096 if ( 0 == m_forwPol )
00097 {
00098
00099 std::cerr << "No forwarding policy exists for this node.\n"
00100 << "All bundles will be dropped." << std::endl;
00101 drop(b);
00102 return;
00103 }
00104
00105
00106 if ( b->expired() )
00107 {
00108 drop(b,ExpiryDrop::inst);
00109 return;
00110 }
00111
00112
00113 if ( 0 != m_custPol )
00114 {
00115 CustodyPolicy::CustodyReturn cr = m_custPol->takeCustody(*b);
00116 if ( CustodyPolicy::kCustodyTaken == cr )
00117 {
00118 if ( 0 != m_consumer )
00119 {
00120 m_consumer->custody(*b);
00121 m_consumer->persistentStore(*b);
00122 }
00123 acknowledge(*b);
00124 b->custodian() = addr();
00125 }
00126 else if ( CustodyPolicy::kDropBundle == cr )
00127 {
00128 drop(b,CustodyPolicyDrop::inst);
00129 return;
00130 }
00131 }
00132
00133
00134 if ( ! m_forwPol->forward(b) )
00135 {
00136
00137 if ( 0 != m_vstorePol )
00138 {
00139
00140 BundlePointer p = m_vstorePol->store(b);
00141 if ( p.isNull() )
00142 {
00143 drop(b,FullQueueDrop::inst);
00144 }
00145 else
00146 {
00147
00148
00149 m_forwPol->cache(p);
00150 }
00151 }
00152 else
00153 {
00154
00155 drop(b,FullQueueDrop::inst);
00156 }
00157 }
00158 }
00159
00160 void
00161 Node::broadcast( Bundle* b )
00162 {
00163 if ( 0 == b ) return;
00164
00165 if ( ! b->expired() )
00166 {
00167 LinkList::const_iterator itr = m_links.begin();
00168 LinkList::const_iterator end = m_links.end();
00169 for ( ; itr != end ; ++itr )
00170 {
00171 if ( 0 != *itr )
00172 {
00173 Bundle* pB = b->clone();
00174 pB->destination() = (*itr)->remoteAddr();
00175 pB->recv() = (*itr)->remoteAddr();
00176 pB->custodian() = m_addr;
00177 forward( pB );
00178 }
00179 }
00180 }
00181
00182 delete b;
00183 }
00184
00185 bool
00186 Node::addPersistent( const Bundle& b )
00187 {
00188 if ( 0 == m_persistentStore ) return false;
00189 unsigned int size = b.size();
00190 unsigned int avail = persistentCap() - usedPersistentCap();
00191 if ( size > avail ) return false;
00192 BundlePointer ptr = m_persistentStore->addBundle( b.clone() );
00193 if ( ! ptr.isNull() )
00194 {
00195 if ( 0 != m_consumer ) m_consumer->persistentStore( b );
00196 return true;
00197 }
00198 return false;
00199 }
00200
00201 bool
00202 Node::validate( const BundlePointer& p )
00203 {
00204 if ( p.isNull() ) return false;
00205 if ( 0 == m_volatileStore ) return false;
00206 if ( ! m_volatileStore->validatePointer(p) ) return false;
00207 return true;
00208 }
00209
00210 bool
00211 Node::validatePersistent( const BundlePointer& p )
00212 {
00213 if ( p.isNull() ) return false;
00214 if ( 0 == m_persistentStore ) return false;
00215 if ( ! m_persistentStore->validatePointer(p) ) return false;
00216 return true;
00217 }
00218
00219 void
00220 Node::remove( Bundle* b, bool dealloc, const DropCause& c )
00221 {
00222 if ( 0 == m_volatileStore ) return;
00223 m_volatileStore->deleteBundle( b );
00224 if ( dealloc ) drop(b,c);
00225 }
00226
00227 void
00228 Node::removePersistent( Bundle* b, bool dropping )
00229 {
00230 if ( 0 == m_persistentStore ) return;
00231 if ( 0 != m_custPol )
00232 {
00233 if ( ! m_custPol->policyRemove(*b) ) return;
00234 }
00235 m_persistentStore->deleteBundle( b );
00236 recordPersistentRemove(*b);
00237 if ( dropping )
00238 {
00239 drop(b, CustodyPolicyDrop::inst);
00240 }
00241 else
00242 {
00243 delete b;
00244 }
00245 }
00246
00247 void
00248 Node::send( Bundle* b, Link& l )
00249 {
00250 if ( 0 == b ) return;
00251 if ( ! ( b->type() & Bundle::kBcast ) )
00252 {
00253 b->recv() = l.remoteAddr();
00254 }
00255 if ( 0 != m_consumer ) m_consumer->send( *b );
00256 l.send(b);
00257 }
00258
00259 void
00260 Node::recv( Bundle* b )
00261 {
00262 if ( 0 == b ) return;
00263
00264 if ( ! ( ours(b->recv()) || ( b->type() & Bundle::kBcast ) ) )
00265 {
00266
00267 delete b;
00268 return;
00269 }
00270
00271 b->incrHop();
00272 if ( 0 != m_consumer ) m_consumer->recv(*b);
00273
00274 if ( ours(b->destination()) || ( b->type() & Bundle::kBcast ) )
00275 {
00276 consume(b);
00277 }
00278 else
00279 {
00280 forward(b);
00281 }
00282 }
00283
00284 void
00285 Node::forwardOn( Link& l )
00286 {
00287 if ( 0 != m_forwPol )
00288 {
00289 while ( 1 )
00290 {
00291 Bundle* b = m_forwPol->forwardOn(l);
00292 if ( 0 == b ) return;
00293 if ( b->expired() )
00294 {
00295 drop(b, ExpiryDrop::inst);
00296 }
00297 else
00298 {
00299 forward(b);
00300 if ( ! l.available() )
00301 {
00302 return;
00303 }
00304 }
00305 }
00306 }
00307 }
00308
00309 void
00310 Node::retryStored()
00311 {
00312 if ( 0 != m_custPol )
00313 {
00314 m_custPol->retry();
00315 }
00316 }
00317
00318 void
00319 Node::drop( Bundle* b, const DropCause& c )
00320 {
00321 if ( 0 != m_consumer )
00322 {
00323 m_consumer->drop(*b,c);
00324 }
00325 delete b;
00326 }
00327
00328 void
00329 Node::recordPersistentRemove( const Bundle& b, bool cleanup )
00330 {
00331 if ( 0 != m_consumer )
00332 {
00333 m_consumer->persistentRemove(b,cleanup);
00334 }
00335 }
00336
00337 void
00338 Node::setAddr( const ByteString& addr )
00339 {
00340 m_addr = addr;
00341 }
00342
00343 bool
00344 Node::ours( const ByteString& destination )
00345 {
00346 return ( destination == m_addr );
00347 }
00348
00349 void
00350 Node::setConsumer( Consumer* c )
00351 {
00352 if ( 0 == c ) return;
00353 if ( 0 != m_consumer )
00354 {
00355 delete m_consumer;
00356 }
00357 m_consumer = c;
00358 }
00359
00360 size_t
00361 Node::usedVolatileCap( bool cleanup ) const
00362 {
00363 if ( 0 == m_volatileStore ) return 0;
00364 return m_volatileStore->bytesUsed(cleanup);
00365 }
00366
00367 size_t
00368 Node::volatileBundles( bool cleanup ) const
00369 {
00370 if ( 0 == m_volatileStore ) return 0;
00371 return m_volatileStore->bundles(cleanup);
00372 }
00373
00374 size_t
00375 Node::usedPersistentCap( bool cleanup ) const
00376 {
00377 if ( 0 == m_persistentStore ) return 0;
00378 return m_persistentStore->bytesUsed(cleanup);
00379 }
00380
00381 size_t
00382 Node::persistentBundles( bool cleanup ) const
00383 {
00384 if ( 0 == m_persistentStore ) return 0;
00385 return m_persistentStore->bundles(cleanup);
00386 }
00387
00388 BundlePointer
00389 Node::cachedVolatile()
00390 {
00391 if ( 0 == m_volatileStore )
00392 {
00393 return BundlePointer();
00394 }
00395 return m_volatileStore->getPointer( BundlePointer() );
00396 }
00397
00398 const BundlePointer
00399 Node::cachedVolatile() const
00400 {
00401 if ( 0 == m_volatileStore )
00402 {
00403 return BundlePointer();
00404 }
00405 return m_volatileStore->getPointer( BundlePointer() );
00406 }
00407
00408 BundlePointer
00409 Node::cachedVolatile( const BundlePointer& p )
00410 {
00411 if ( 0 == m_volatileStore )
00412 {
00413 return BundlePointer();
00414 }
00415 return m_volatileStore->getPointer( p );
00416 }
00417
00418 const BundlePointer
00419 Node::cachedVolatile( const BundlePointer& p ) const
00420 {
00421 if ( 0 == m_volatileStore )
00422 {
00423 return BundlePointer();
00424 }
00425 return m_volatileStore->getPointer( p );
00426 }
00427
00428 BundlePointer
00429 Node::cachedVolatile( const ByteString& sender,
00430 uint32_t seqNum )
00431 {
00432 if ( 0 == m_volatileStore )
00433 {
00434 return BundlePointer();
00435 }
00436 return m_volatileStore->getPointer( sender, seqNum );
00437 }
00438
00439 const BundlePointer
00440 Node::cachedVolatile( const ByteString& sender,
00441 uint32_t seqNum ) const
00442 {
00443 if ( 0 == m_volatileStore )
00444 {
00445 return BundlePointer();
00446 }
00447 return m_volatileStore->getPointer( sender, seqNum );
00448 }
00449
00450 BundlePointer
00451 Node::cachedPersistent()
00452 {
00453 if ( 0 == m_persistentStore )
00454 {
00455 return BundlePointer();
00456 }
00457 return m_persistentStore->getPointer( BundlePointer() );
00458 }
00459
00460 const BundlePointer
00461 Node::cachedPersistent() const
00462 {
00463 if ( 0 == m_persistentStore )
00464 {
00465 return BundlePointer();
00466 }
00467 return m_persistentStore->getPointer( BundlePointer() );
00468 }
00469
00470 BundlePointer
00471 Node::cachedPersistent( const BundlePointer& p )
00472 {
00473 if ( 0 == m_persistentStore )
00474 {
00475 return BundlePointer();
00476 }
00477 return m_persistentStore->getPointer( p );
00478 }
00479
00480 const BundlePointer
00481 Node::cachedPersistent( const BundlePointer& p ) const
00482 {
00483 if ( 0 == m_persistentStore )
00484 {
00485 return BundlePointer();
00486 }
00487 return m_persistentStore->getPointer( p );
00488 }
00489
00490 BundlePointer
00491 Node::cachedPersistent( const ByteString& sender,
00492 uint32_t seqNum )
00493 {
00494 if ( 0 == m_persistentStore )
00495 {
00496 return BundlePointer();
00497 }
00498 return m_persistentStore->getPointer( sender, seqNum );
00499 }
00500
00501 const BundlePointer
00502 Node::cachedPersistent( const ByteString& sender,
00503 uint32_t seqNum ) const
00504 {
00505 if ( 0 == m_persistentStore )
00506 {
00507 return BundlePointer();
00508 }
00509 return m_persistentStore->getPointer( sender, seqNum );
00510 }
00511
00512 void
00513 Node::setVolatileStore( VolatileBundleStore* s )
00514 {
00515 if ( 0 == s ) return;
00516 if ( 0 != m_volatileStore )
00517 {
00518 delete m_volatileStore;
00519 }
00520 m_volatileStore = s;
00521 if ( 0 != m_vstorePol ) m_vstorePol->setStore( m_volatileStore );
00522 if ( 0 != m_forwPol ) m_forwPol->setStore( m_volatileStore );
00523 }
00524
00525 void
00526 Node::setPersistentStore( PersistentBundleStore* s )
00527 {
00528 if ( 0 == s ) return;
00529 if ( 0 != m_persistentStore )
00530 {
00531 delete m_persistentStore;
00532 }
00533 m_persistentStore = s;
00534 if ( 0 != m_custPol ) m_custPol->setStore( m_persistentStore );
00535 }
00536
00537 void
00538 Node::setVolatileCap( size_t cap )
00539 {
00540 if ( 0 != m_volatileStore )
00541 {
00542 if ( cap < m_volatileCap ) m_volatileStore->shrinkStore(cap);
00543 }
00544 m_volatileCap = cap;
00545 }
00546
00547 void
00548 Node::setPersistentCap( size_t cap )
00549 {
00550 if ( 0 != m_persistentStore )
00551 {
00552 if ( cap < m_persistentCap ) m_persistentStore->shrinkStore(cap);
00553 }
00554 m_persistentCap = cap;
00555 }
00556
00557 void
00558 Node::setVolatileStorePolicy( VolatileStorePolicy* p )
00559 {
00560 if ( 0 == p ) return;
00561 if ( 0 != m_vstorePol ) delete m_vstorePol;
00562 m_vstorePol = p;
00563 m_vstorePol->setStore( m_volatileStore );
00564 }
00565
00566 void
00567 Node::setForwardingPolicy( ForwardingPolicy* p )
00568 {
00569 if ( 0 == p ) return;
00570 if ( 0 != m_forwPol ) delete m_forwPol;
00571 m_forwPol = p;
00572 m_forwPol->setStore( m_volatileStore );
00573 }
00574
00575 void
00576 Node::setCustodyPolicy( CustodyPolicy* p )
00577 {
00578 if ( 0 == p ) return;
00579 if ( 0 != m_custPol ) delete m_custPol;
00580 m_custPol = p;
00581 m_custPol->setStore( m_persistentStore );
00582 }
00583
00584 void
00585 Node::addLink( Link* l )
00586 {
00587 if ( 0 != l )
00588 {
00589 m_links.insert( l );
00590 }
00591 }
00592
00593 void
00594 Node::signalExhausted( const Bundle& b )
00595 {
00596 if ( 0 != m_consumer )
00597 {
00598 m_consumer->exhausted(b);
00599 }
00600 }
00601
00602 void
00603 Node::consume( Bundle* b )
00604 {
00605 if ( 0 == b ) return;
00606 Bundle::BundleType t = b->type();
00607
00608 if ( 0 != m_persistentStore )
00609 {
00610 m_persistentStore->consume(b);
00611 }
00612 if ( 0 != m_consumer )
00613 {
00614 (*m_consumer)(*b);
00615 }
00616
00617 if ( Bundle::kACK & t )
00618 {
00619
00620 if ( 0 != m_persistentStore )
00621 {
00622 BundlePointer p = m_persistentStore->getPointer( b->source(),
00623 b->seqNum() );
00624 if ( ! p.isNull() )
00625 {
00626 Bundle* pB = p.repr()->bundle();
00627 if ( (0 != m_custPol) && m_custPol->policyRemove( *pB ) )
00628 {
00629 m_persistentStore->deleteBundle( pB );
00630 recordPersistentRemove(*pB);
00631 delete pB;
00632 }
00633 }
00634 }
00635 }
00636 else
00637 {
00638
00639 acknowledge( *b );
00640 }
00641
00642 delete b;
00643 }
00644
00645 void
00646 Node::acknowledge( const Bundle& b )
00647 {
00648
00649 Bundle::BundleType t = b.type();
00650 if ( Bundle::kNoACK & t )
00651 {
00652 return;
00653 }
00654
00655 struct timeval tv = dtn_time();
00656
00657
00658
00659 struct timeval e;
00660 e.tv_sec = tv.tv_sec + b.expiry().tv_sec - b.created().tv_sec;
00661 e.tv_usec = tv.tv_usec + b.expiry().tv_usec - b.created().tv_usec;
00662 e.tv_sec += ( e.tv_usec / 1000000 );
00663 e.tv_usec = ( e.tv_usec % 1000000 );
00664
00665 Bundle* ack = new Bundle( ntohl(b.seqNum()),
00666 b.source(),
00667 b.custodian(),
00668 tv,
00669 e,
00670 b.type() | Bundle::kACK );
00671 ack->custodian() = addr();
00672 forward( ack );
00673 }