00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include "simlpy/interpreter_defs.h"
00032
00033 #include <iostream>
00034
00035 #include "Globals.h"
00036 #include "WrapNode.h"
00037 #include "WrapLink.h"
00038 #include "parsers.h"
00039 #include "MockBundle.h"
00040 #include "PydtnBundle.h"
00041 #include "ForwardingEntity.h"
00042 #include "CustodyEntity.h"
00043 #include "BundleEvent.h"
00044 #include "ResendEvent.h"
00045 #include "FlatStore.h"
00046 #include "PersistentStore.h"
00047 #include "DumpConsumer.h"
00048 #include "ConsumerAlias.h"
00049 #include "GlobalStatistics.h"
00050 #include "Tracer.h"
00051 #include "TraceConsumer.h"
00052
00053 #include "simlpy/Clock.h"
00054 #include "simlpy/Registry.h"
00055 #include "simlpy/interpreter_hooks.h"
00056 #include "simlpy/SimulationException.h"
00057
00058 #include "dtn/DropTail.h"
00059
00060 struct timeval
00061 DTN::dtn_time()
00062 {
00063 return Clock::time().tv;
00064 }
00065
00066 WrapNode::WrapNode() :
00067 m_bundleLifetime( Globals::bundleLifetime() ),
00068 m_resendPeriod( Globals::resendPeriod() ),
00069 m_resending( false )
00070 {
00071 m_node.setVolatileCap( Globals::volatileCap() );
00072 m_node.setPersistentCap( Globals::persistentCap() );
00073 m_consumers = new DTN::ConsumerChain(&m_node);
00074 m_node.setConsumer( m_consumers );
00075 m_consumers->addConsumer( new DumpConsumer(&m_node) );
00076 GlobalStatistics* gs = GlobalStatistics::instance();
00077 if ( 0 != gs )
00078 {
00079 gs->addNode(&m_node);
00080 m_consumers->addConsumer( new ConsumerAlias(&m_node,gs) );
00081 }
00082 if ( 0 != GlobalTracer::instance() )
00083 {
00084 m_consumers->addConsumer( new TraceConsumer(&m_node) );
00085 }
00086 m_node.setVolatileStorePolicy( new DTN::DropTail(&m_node) );
00087 m_node.setForwardingPolicy( 0 );
00088 m_node.setVolatileStore( new FlatStore(&m_node) );
00089 m_node.setPersistentStore( new PersistentStore(this) );
00090 }
00091
00092 WrapNode::~WrapNode()
00093 {
00094
00095 CollectorList::iterator itr = m_collectors.begin();
00096 CollectorList::iterator end = m_collectors.end();
00097 for ( ; itr != end ; ++itr )
00098 {
00099 if ( 0 != *itr )
00100 {
00101 delete *itr;
00102 }
00103 }
00104 }
00105
00106 Entity*
00107 WrapNode::create() const
00108 {
00109 return new WrapNode;
00110 }
00111
00112 void
00113 WrapNode::configure( const ArgList& args )
00114 {
00115 unsigned int nargs = args.size();
00116 if ( nargs < 2 )
00117 {
00118 std::cerr << "config() takes at least 2 arguments ("
00119 << nargs << " given)"
00120 << std::endl;
00121 throw SimulationException( __FILE__ , __LINE__ );
00122 }
00123 std::string command = parse_string(args[0]);
00124 if ( "addr" == command )
00125 {
00126 std::string addrString = parse_string( args[1] );
00127 DTN::ByteString a;
00128 for ( unsigned int i = 0 ; i < addrString.length() ; ++i )
00129 {
00130 a += addrString[i];
00131 }
00132 m_node.setAddr( a );
00133 }
00134 else if ( "capacity" == command )
00135 {
00136 double dcap = parse_double( args[1] );
00137
00138 size_t cap = (size_t)(1024 * dcap);
00139 m_node.setVolatileCap(cap);
00140 }
00141 else if ( "forwarding" == command )
00142 {
00143 ForwardingEntity* fe = 0;
00144 try
00145 {
00146 ItemWrapper item = resolve_symbol( args[1] );
00147 if ( 0 != item.type.find( "forwarding:" ) )
00148 {
00149 throw SimulationException( __FILE__ , __LINE__ );
00150 }
00151 fe = (ForwardingEntity*)(item.item);
00152
00153 DTN::ForwardingPolicy* fwd = fe->generatePolicy( &m_node );
00154 if ( 0 == fwd )
00155 {
00156 throw SimulationException( __FILE__ , __LINE__ );
00157 }
00158 m_node.setForwardingPolicy( fwd );
00159 }
00160 catch ( ... )
00161 {
00162 std::cerr << "The second argument of config('forwarding',pol) "
00163 << "must be an entity\ncapable of generating a "
00164 << "forwarding policy"
00165 << std::endl;
00166 }
00167 }
00168 else if ( "stable_store" == command )
00169 {
00170 double dcap = parse_double( args[1] );
00171
00172 size_t cap = (size_t)(1024 * dcap);
00173 m_node.setPersistentCap(cap);
00174 }
00175 else if ( "custody" == command )
00176 {
00177 Entity* e = 0;
00178 std::string ptype = parse_string( args[1] );
00179 try
00180 {
00181 e = Registry::lookup( "custody:" + ptype );
00182 }
00183 catch ( ... )
00184 {
00185 std::cerr << "The custody policy type " << ptype
00186 << " is not recognized"
00187 << std::endl;
00188 throw;
00189 }
00190 CustodyEntity* ce = (CustodyEntity*)( e->create() );
00191 if ( 0 == ce )
00192 {
00193 throw SimulationException( __FILE__ , __LINE__ );
00194 }
00195 m_node.setCustodyPolicy( ce->generatePolicy( &m_node ) );
00196 delete ce;
00197 }
00198 else if ( "bundle_lifetime" == command )
00199 {
00200 m_bundleLifetime = parse_time(args[1]);
00201 }
00202 else if ( "resend" == command )
00203 {
00204 m_resendPeriod = parse_time(args[1]);
00205 }
00206 else
00207 {
00208 std::cerr << "Unrecognized configuration option: " << command
00209 << std::endl;
00210 throw SimulationException( __FILE__ , __LINE__ );
00211 }
00212
00213
00214 GlobalTracer* gt = GlobalTracer::instance();
00215 if ( 0 != gt )
00216 {
00217 gt->node(*this);
00218 }
00219 }
00220
00221 void
00222 WrapNode::emit( const ArgList& args )
00223 {
00224 unsigned int nargs = args.size();
00225 if ( nargs < 3 )
00226 {
00227 std::cerr << "emit() takes at least 3 arguments ("
00228 << nargs << " given)"
00229 << std::endl;
00230 throw SimulationException( __FILE__ , __LINE__ );
00231 }
00232
00233 Time t = parse_time(args[0]);
00234
00235 std::string command = parse_string(args[1]);
00236 if ( "data" == command )
00237 {
00238 if ( nargs < 4 )
00239 {
00240 std::cerr << "emit(t,\"data\") takes at least 4 arguments ("
00241 << nargs << " given)"
00242 << std::endl;
00243 throw SimulationException( __FILE__ , __LINE__ );
00244 }
00245
00246 WrapNode* dest = parse_node( args[2] );
00247 if ( 0 == dest )
00248 {
00249 std::cerr << "The third argument of emit() must be a node"
00250 << std::endl;
00251 throw SimulationException( __FILE__ , __LINE__ );
00252 }
00253 std::string dataString = parse_string(args[3]);
00254 DTN::ByteString dat;
00255 for ( unsigned int i = 0 ; i < dataString.length() ; ++i )
00256 {
00257 dat += dataString[i];
00258 }
00259 DTN::Bundle::BundleType type =
00260 DTN::Bundle::kData | DTN::Bundle::kCustodial;
00261 Time e = t;
00262 if ( 4 == nargs )
00263 {
00264 e += m_bundleLifetime;
00265 }
00266 else
00267 {
00268 e += parse_time(args[4]);
00269 }
00270 DTN::Bundle* b =
00271 new DTN::Bundle( m_node.nextSeq(),
00272 addr(),
00273 dest->addr(),
00274 dat,
00275 t.tv,
00276 e.tv,
00277 type );
00278 if ( 0 != b )
00279 {
00280 Clock::schedule( new BundleEvent(this,this,t,t,b,true) );
00281 }
00282 }
00283 else if ( "datafake" == command )
00284 {
00285 if ( nargs < 4 )
00286 {
00287 std::cerr << "emit(t,\"datafake\") takes at least 4 arguments ("
00288 << nargs << " given)"
00289 << std::endl;
00290 throw SimulationException( __FILE__ , __LINE__ );
00291 }
00292
00293 WrapNode* dest = parse_node( args[2] );
00294 if ( 0 == dest )
00295 {
00296 std::cerr << "The third argument of emit() must be a node"
00297 << std::endl;
00298 throw SimulationException( __FILE__ , __LINE__ );
00299 }
00300
00301 unsigned long int dat = parse_long(args[3]);
00302
00303 DTN::Bundle::BundleType type =
00304 DTN::Bundle::kData | DTN::Bundle::kCustodial;
00305 Time e = t;
00306 if ( 4 == nargs )
00307 {
00308 e += m_bundleLifetime;
00309 }
00310 else
00311 {
00312 e += parse_time(args[4]);
00313 }
00314 DTN::Bundle* b =
00315 new MockBundle( m_node.nextSeq(),
00316 addr(),
00317 dest->addr(),
00318 dat,
00319 t.tv,
00320 e.tv,
00321 type );
00322 if ( 0 != b )
00323 {
00324 Clock::schedule( new BundleEvent(this,this,t,t,b,true) );
00325 }
00326 }
00327 else if ( "bcast" == command )
00328 {
00329 if ( nargs < 3 )
00330 {
00331 std::cerr << "emit(t,\"bcast\") takes at least 3 arguments ("
00332 << nargs << " given)"
00333 << std::endl;
00334 throw SimulationException( __FILE__ , __LINE__ );
00335 }
00336
00337 std::string dataString = parse_string(args[2]);
00338 DTN::ByteString dat;
00339 for ( unsigned int i = 0 ; i < dataString.length() ; ++i )
00340 {
00341 dat += dataString[i];
00342 }
00343 DTN::Bundle::BundleType type = DTN::Bundle::kData | DTN::Bundle::kBcast;
00344 Time e = t;
00345 if ( 3 == nargs )
00346 {
00347 e += m_bundleLifetime;
00348 }
00349 else
00350 {
00351 e += parse_time(args[3]);
00352 }
00353 DTN::Bundle* b =
00354 new DTN::Bundle( m_node.nextSeq(),
00355 addr(),
00356 addr(),
00357 dat,
00358 t.tv,
00359 e.tv,
00360 type );
00361 if ( 0 != b )
00362 {
00363 Clock::schedule( new BundleEvent(this,this,t,t,b,false) );
00364 }
00365 }
00366 else if ( "bundle" == command )
00367 {
00368 if ( ! PydtnBundle_Check(args[2]) )
00369 {
00370 PyErr_SetString(PyExc_TypeError,
00371 "pydtn.Bundle expected as third argument");
00372 throw SimulationException( __FILE__ , __LINE__ );
00373 }
00374 PydtnBundle* pb = (PydtnBundle*)(args[2]);
00375 if ( 0 != pb->b )
00376 {
00377 Clock::schedule( new BundleEvent(this,this,t,t,pb->b->clone(),true) );
00378 }
00379 }
00380 else
00381 {
00382 std::cerr << "Unrecognized event: " << command << std::endl;
00383 throw SimulationException( __FILE__ , __LINE__ );
00384 }
00385 }
00386
00387 InterpreterItem
00388 WrapNode::get( const ArgList& args )
00389 {
00390 unsigned int nargs = args.size();
00391 if ( 0 == nargs )
00392 {
00393 std::cerr << "No attribute specified" << std::endl;
00394 throw SimulationException( __FILE__ , __LINE__ );
00395 }
00396
00397 std::string attr = parse_string(args[0]);
00398
00399 if ( "addr" == attr )
00400 {
00401 return construct_string( addrString() );
00402 }
00403 if ( "bundle_lifetime" == attr )
00404 {
00405 double t =
00406 m_bundleLifetime.tv.tv_sec +
00407 1.0 * m_bundleLifetime.tv.tv_usec / Time::MILLION;
00408 return construct_double( t );
00409 }
00410 if ( "resend_period" == attr )
00411 {
00412 double t =
00413 m_resendPeriod.tv.tv_sec +
00414 1.0 * m_resendPeriod.tv.tv_usec / Time::MILLION;
00415 return construct_double( t );
00416 }
00417 if ( ( "volatile_capacity" == attr ) ||
00418 ( "queue_capacity" == attr ) )
00419 {
00420 return construct_long( m_node.volatileCap() );
00421 }
00422 if ( ( "persistent_capacity" == attr ) ||
00423 ( "stable_capacity" == attr ) )
00424 {
00425 return construct_long( m_node.persistentCap() );
00426 }
00427 if ( ( "volatile_used" == attr ) ||
00428 ( "queue_used" == attr ) )
00429 {
00430 ArgList alist;
00431 alist.push_back( construct_long( m_node.usedVolatileCap() ) );
00432 alist.push_back( construct_long( m_node.volatileBundles() ) );
00433 return construct_list( alist );
00434 }
00435 if ( ( "persistent_used" == attr ) ||
00436 ( "stable_used" == attr ) )
00437 {
00438 ArgList alist;
00439 alist.push_back( construct_long( m_node.usedPersistentCap() ) );
00440 alist.push_back( construct_long( m_node.persistentBundles() ) );
00441 return construct_list( alist );
00442 }
00443
00444 if ( "forwarding" == attr )
00445 {
00446 std::cerr << "not yet implemented" << std::endl;
00447 Entity::ArgList subList( args.begin()+1, args.end() );
00448 }
00449 if ( "storage" == attr )
00450 {
00451 std::cerr << "not yet implemented" << std::endl;
00452 }
00453 if ( "custody" == attr )
00454 {
00455 std::cerr << "not yet implemented" << std::endl;
00456 }
00457
00458
00459 return Entity::get(args);
00460 }
00461
00462 void
00463 WrapNode::addLink( DTN::Link* l )
00464 {
00465 if ( 0 == l ) return;
00466 m_node.addLink(l);
00467 }
00468
00469 void
00470 WrapNode::addConsumer( DTN::Consumer* c )
00471 {
00472 if ( 0 == c ) return;
00473 if ( 0 == m_consumers )
00474 {
00475 m_node.setConsumer(c);
00476 return;
00477 }
00478 m_consumers->addConsumer(c);
00479 }
00480
00481 void
00482 WrapNode::addApplication( DTN::Application* a )
00483 {
00484 if ( 0 == a ) return;
00485 a->setOwner(&m_node);
00486 addConsumer(a);
00487 }
00488
00489 void
00490 WrapNode::addCollector( NodeCollector* c )
00491 {
00492 if ( 0 == c ) return;
00493 m_collectors.push_back(c);
00494 }
00495
00496 void
00497 WrapNode::setVolatileStorePolicy( DTN::VolatileStorePolicy* p )
00498 {
00499 if ( 0 == p ) return;
00500 m_node.setVolatileStorePolicy(p);
00501 }
00502
00503 void
00504 WrapNode::setForwardingPolicy( DTN::ForwardingPolicy* p )
00505 {
00506 if ( 0 == p ) return;
00507 m_node.setForwardingPolicy(p);
00508 }
00509
00510 void
00511 WrapNode::setVolatileStore( DTN::VolatileBundleStore* s )
00512 {
00513 if ( 0 == s ) return;
00514 m_node.setVolatileStore(s);
00515 }
00516
00517 void
00518 WrapNode::setPersistentStore( DTN::PersistentBundleStore* s )
00519 {
00520 if ( 0 == s ) return;
00521 m_node.setPersistentStore(s);
00522 }
00523
00524 bool
00525 WrapNode::handler( BundleEvent& event )
00526 {
00527 if ( event.storeLocally() )
00528 {
00529 if ( ! m_node.addPersistent(event.data()) )
00530 {
00531 m_node.drop( event.pData() );
00532 return false;
00533 }
00534 }
00535 if ( ( event.source() == this ) &&
00536 ( m_node.addr() == event.data().send() ) )
00537 {
00538 if ( event.data().type() & DTN::Bundle::kBcast )
00539 {
00540 m_node.broadcast(event.pData());
00541 }
00542 else
00543 {
00544 m_node.forward(event.pData());
00545 }
00546 }
00547 else
00548 {
00549 if ( ( m_node.addr() == event.data().recv() ) ||
00550 ( event.data().type() & DTN::Bundle::kBcast ) )
00551 {
00552 m_node.recv(event.pData());
00553 }
00554 else
00555 {
00556
00557 return true;
00558 }
00559 }
00560 if ( 0 != m_node.usedPersistentCap() )
00561 {
00562 if ( ! m_resending )
00563 {
00564 m_resending = true;
00565 if ( m_resendPeriod > Time(0,0) )
00566 {
00569 Clock::schedule(
00570 new ResendEvent( this,
00571 this,
00572 Clock::time() + m_resendPeriod ) );
00573 }
00574 }
00575 }
00576 return true;
00577 }
00578
00579 bool
00580 WrapNode::handler( ResendEvent& event )
00581 {
00582
00583
00584 m_node.retryStored();
00585 if ( 0 == m_node.usedPersistentCap() )
00586 {
00587 m_resending = false;
00588 }
00589 else
00590 {
00591 if ( m_resendPeriod > Time(0,0) )
00592 {
00593 Time t = Clock::time() + m_resendPeriod;
00594 Clock::schedule( new ResendEvent(this,this,t) );
00595 }
00596 }
00597 return true;
00598 }
00599
00600 void
00601 WrapNode::finalize() const
00602 {
00603 collect();
00604 }
00605
00606 void
00607 WrapNode::collect() const
00608 {
00609 CollectorList::const_iterator itr = m_collectors.begin();
00610 CollectorList::const_iterator end = m_collectors.end();
00611 for ( ; end != itr ; ++itr )
00612 {
00613 if ( 0 != (*itr) ) (**itr)(*this);
00614 }
00615 }
00616
00617 void
00618 WrapNode::setAddr( const DTN::ByteString& a )
00619 {
00620 m_node.setAddr(a);
00621 }
00622
00623 const DTN::ByteString&
00624 WrapNode::addr() const
00625 {
00626 return m_node.addr();
00627 }
00628
00629 const std::string&
00630 WrapNode::addrString() const
00631 {
00632 if ( 0 == m_addrString.length() )
00633 {
00634 const DTN::ByteString& b = m_node.addr();
00635 bool printable = true;
00636 for ( unsigned int i = 0 ; i < b.length() ; ++i )
00637 {
00638 if ( !isprint(b[i]) ) printable = false;
00639 if ( isspace(b[i]) ) printable = false;
00640 }
00641 if ( ! printable )
00642 {
00643 m_addrString += "0x";
00644 }
00645 for ( unsigned int i = 0 ; i < b.length() ; ++i )
00646 {
00647 if ( printable )
00648 {
00649 m_addrString += b[i];
00650 }
00651 else
00652 {
00653 char tmp[3];
00654 ::sprintf(tmp,"%02X",b[i]);
00655 m_addrString += tmp[0];
00656 m_addrString += tmp[1];
00657 }
00658 }
00659 }
00660 return m_addrString;
00661 }
00662
00663 void
00664 WrapNode::forwardOn( DTN::Link& l )
00665 {
00666 m_node.forwardOn(l);
00667 }
00668
00669 void
00670 WrapNode::drop( DTN::Bundle* b, const DTN::DropCause& c )
00671 {
00672 if ( 0 == b ) return;
00673 m_node.drop(b,c);
00674 }
00675
00676 size_t
00677 WrapNode::volatileBundles() const
00678 {
00679 return m_node.volatileBundles();
00680 }
00681
00682 DTN::BundlePointer
00683 WrapNode::cachedVolatile()
00684 {
00685 return m_node.cachedVolatile();
00686 }
00687
00688 const DTN::BundlePointer
00689 WrapNode::cachedVolatile() const
00690 {
00691 return m_node.cachedVolatile();
00692 }
00693
00694 size_t
00695 WrapNode::persistentBundles() const
00696 {
00697 return m_node.persistentBundles();
00698 }
00699
00700 DTN::BundlePointer
00701 WrapNode::cachedPersistent()
00702 {
00703 return m_node.cachedPersistent();
00704 }
00705
00706 const DTN::BundlePointer
00707 WrapNode::cachedPersistent() const
00708 {
00709 return m_node.cachedPersistent();
00710 }