EpidemicApp.cc

00001 // Copyright 2008 Michael Marsh, University of Maryland.
00002 //
00003 // This file is part of pydtn.
00004 //
00005 // pydtn is free software: you can redistribute it and/or modify
00006 // it under the terms of the GNU General Public License as published by
00007 // the Free Software Foundation, either version 3 of the License, or
00008 // (at your option) any later version.
00009 //
00010 // pydtn is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013 // GNU General Public License for more details.
00014 //
00015 // You should have received a copy of the GNU General Public License
00016 // along with pydtn.  If not, see <http://www.gnu.org/licenses/>.
00017 //
00018 // The views and conclusions contained in the software and documentation
00019 // are those of the authors and should not be interpreted as representing
00020 // official policies, either expressed or implied, of the University
00021 // of Maryland.
00022 //
00023 // pydtn extends and embeds the Python interpreter, which is
00024 // Copyright 2001-2006 Python Software Foundation, All Rights Reserved,
00025 // and is released under the PSF License Agreement.
00026 //
00027 // RANLUX random number generation uses the Boost library,
00028 // Copyright 1994-2006 by various authors (details in individual files),
00029 // which is released under the Boost Software License, Version 1.0.
00030 
00031 #include "EpidemicApp.h"
00032 
00033 #include "simlpy/interpreter_hooks.h"
00034 #include "simlpy/SimulationException.h"
00035 #include "dtn/Node.h"
00036 #include "pydtn/parsers.h"
00037 #include <iostream>
00038 
00039 using namespace BasicEpidemic;
00040 
00041 // Bookkeeping: initialize static data
00042 DTN::ByteString EpidemicApp::kIdentifier((unsigned char*)"basicepidemic");
00043 unsigned char EpidemicApp::kSummaryVector = 0;
00044 unsigned char EpidemicApp::kDataRequest = 1;
00045 
00046 // Construct the protocol instance.  See python_init.cc for actual
00047 // protocol instantiation.
00048 EpidemicApp::EpidemicApp( Mobility::MobileNode* n,
00049                           DTN::Link* l,
00050                           const struct timeval& t ) :
00051    MobileForwarding::Protocol(n,l,t)
00052 {
00053 }
00054 
00055 // Clean up
00056 EpidemicApp::~EpidemicApp()
00057 {
00058    LinkMap::iterator itr = m_linkMap.begin();
00059    LinkMap::iterator end = m_linkMap.end();
00060    for ( ; itr != end ; ++itr )
00061    {
00062       if ( 0 != itr->second )
00063       {
00064          delete itr->second;
00065       }
00066    }
00067 }
00068 
00069 
00070 //======================================================================
00071 // Define the MobileForwarding::Protocol methods.
00072 //
00073 // We're going to do this in the order that they're listed in
00074 // Protocol.h, so that it's easier to follow.
00075 //======================================================================
00076 
00077 // Return the unique identifier, which we've stored as static data.
00078 // This could have been inlined in the header (and thus been more
00079 // efficient), but for clarity we put it here.
00080 const DTN::ByteString&
00081 EpidemicApp::appID() const
00082 {
00083    return kIdentifier;
00084 }
00085 
00086 // We've seen a new neighbor.  This will require exchanging state.
00087 void
00088 EpidemicApp::newNeighbor( const DTN::ByteString& addr )
00089 {
00090    if ( 0 == m_owner ) return;
00091 
00092    // Make sure there's a mock link for this address.
00093    if ( m_linkMap.end() == m_linkMap.find(addr) )
00094    {
00095       m_linkMap[addr] = new Mobility::MockLink(link(),addr);
00096    }
00097 
00098    // Construct the summary vector.
00099    DTN::ByteString data;
00100    data.push_back( kSummaryVector );
00101    DTN::BundlePointer itr = m_owner->cachedPersistent();
00102    for ( ; ! itr.isNull() ; itr = itr.next() )
00103    {
00104       data.append( digest(itr) );
00105    }
00106    if ( 1 == data.length() )
00107    {
00108       // We're not storing anything, so there's no point in sending the
00109       // summary vector.
00110       return;
00111    }
00112 
00113    if ( verbosity() > 1 )
00114    {
00115       std::cout << "sending summary vector from "
00116                 << stringify(m_owner->addr())
00117                 << " to "
00118                 << stringify(addr)
00119                 << " ["
00120                 << stringify(data)
00121                 << "]"
00122                 << std::endl;
00123    }
00124 
00125    // Create a bundle to carry the message.
00126    struct timeval now = DTN::dtn_time();
00127 
00128    struct timeval exp = DTN::dtn_time();
00129    exp.tv_sec += lifetime().tv_sec;
00130    exp.tv_usec += lifetime().tv_usec;
00131    while ( exp.tv_usec > 1000000 )
00132    {
00133       exp.tv_usec -= 1000000;
00134       exp.tv_sec += 1;
00135    }
00136 
00137    DTN::Bundle* b = new DTN::Bundle( m_owner->nextSeq(),
00138                                      m_owner->addr(),
00139                                      addr,
00140                                      data,
00141                                      now,
00142                                      exp,
00143                                      DTN::Bundle::kData,
00144                                      appID() );
00145    b->recv() = addr; // We have to set the receiver now.
00146 
00147    // Send the message via the node.
00148    m_owner->forward(b);
00149 }
00150 
00151 // We received a protocol message from another node.
00152 //
00153 // There are two types of protocol messages that we might receive:
00154 // 1) summary vectors
00155 // 2) data requests
00156 void
00157 EpidemicApp::process( const DTN::Bundle& b )
00158 {
00159    if ( 0 == m_owner ) return;
00160 
00161    // All we have is a message type, which means there's no meaningful
00162    // content.
00163    if ( b.payload().length() < 2 ) return;
00164 
00165    // If we've received a summary vector, look through it for
00166    // bundles we haven't seen and send a data request.
00167    if ( kSummaryVector == b.payload()[0] )
00168    {
00169       if ( verbosity() > 1 )
00170       {
00171          std::cout << "processing a summary vector at "
00172                    << stringify( m_owner->addr())
00173                    << " from "
00174                    << stringify( b.source() )
00175                    << std::endl;
00176       }
00177 
00178       // Break the summary vector into distinct message digests.
00179       DigestList* dl = parseSummaryVector( b.payload().substr(1) );
00180       if ( 0 == dl ) return;
00181 
00182       // Compose the request.  If we don't have the message with a
00183       // specified digest, add the digest to our request list.
00184       DTN::ByteString data;
00185       data.push_back( kDataRequest );
00186       DigestList::iterator itr = dl->begin();
00187       DigestList::iterator end = dl->end();
00188       bool haveReq = false;
00189       for ( ; itr != end ; ++itr )
00190       {
00191          // Look in the data store for this message.
00192          DTN::BundlePointer bp = storedBundle(*itr);
00193          if ( bp.isNull() )
00194          {
00195             // We don't have it, so add it to the request.
00196             haveReq = true;
00197             data.append( *itr );
00198          }
00199       }
00200       delete dl;
00201 
00202       // If there was nothing of interest in the summary vector, we
00203       // have nothing more to do.
00204       if ( ! haveReq ) return;
00205 
00206       if ( verbosity() > 1 )
00207       {
00208          std::cout << "sending data request from "
00209                    << stringify(m_owner->addr())
00210                    << " to "
00211                    << stringify(b.source())
00212                    << " ["
00213                    << stringify(data)
00214                    << "]"
00215                    << std::endl;
00216       }
00217 
00218       // Construct the bundle with the protocol's data request.
00219       struct timeval now = DTN::dtn_time();
00220 
00221       struct timeval exp = DTN::dtn_time();
00222       exp.tv_sec += lifetime().tv_sec;
00223       exp.tv_usec += lifetime().tv_usec;
00224       while ( exp.tv_usec > 1000000 )
00225       {
00226          exp.tv_usec -= 1000000;
00227          exp.tv_sec += 1;
00228       }
00229 
00230       DTN::Bundle* pB = new DTN::Bundle( m_owner->nextSeq(),
00231                                          m_owner->addr(),
00232                                          b.source(),
00233                                          data,
00234                                          now,
00235                                          exp,
00236                                          DTN::Bundle::kData,
00237                                          appID() );
00238       // We have to set the receiver now.
00239       pB->recv() = b.source();
00240 
00241       // Send the message via the node.
00242       m_owner->forward(pB);
00243       return;
00244    }
00245 
00246    // If we've received a data request, enqueue the specified bundles.
00247    if ( kDataRequest == b.payload()[0] )
00248    {
00249       // Break the summary vector into distinct message digests.
00250       DigestList* dl = parseSummaryVector( b.payload().substr(1) );
00251       if ( 0 == dl ) return;
00252 
00253       // Go through the specified bundles, scheduling them to be sent
00254       // to the node that sent us this request.
00255       DigestList::iterator itr = dl->begin();
00256       DigestList::iterator end = dl->end();
00257       for ( ; itr != end ; ++itr )
00258       {
00259          // Look in the data store for the message.
00260          DTN::BundlePointer bp = storedBundle(*itr);
00261          if ( ! bp.isNull() )
00262          {
00263             // Copy the bundle.
00264             DTN::Bundle* pB = bp.repr()->bundle()->clone();
00265             // Set the next-hop receiver to the requesting node.
00266             pB->recv() = b.source();
00267             // Enqueue the bundle.
00268             m_owner->forward(pB);
00269          }
00270       }
00271       delete dl;
00272 
00273       return;
00274    }
00275 
00276    // The message type was invalid.  We'll ignore this.
00277 }
00278 
00279 // Retrieve the aliased link for a particular destination.
00280 DTN::Link*
00281 EpidemicApp::getLink( const DTN::ByteString& addr )
00282 {
00283    LinkMap::iterator itr = m_linkMap.find(addr);
00284    if ( m_linkMap.end() == itr ) return 0;
00285    return itr->second;
00286 }
00287 
00288 // Make sure the no-ACK flag is set.
00289 void
00290 EpidemicApp::tweak( DTN::Bundle* b )
00291 {
00292    if ( 0 == b )
00293    {
00294       throw SimulationException( __FILE__ , __LINE__ );
00295    }
00296    b->type() |= DTN::Bundle::kNoACK;
00297 }
00298 
00299 
00300 //==================
00301 // Utility methods
00302 //==================
00303 
00304 // Compute the digest for a message.
00305 DTN::ByteString
00306 EpidemicApp::digest( const DTN::BundlePointer& p )
00307 {
00308    DTN::ByteString retval;
00309    if ( p.isNull() ) return retval;
00310    const DTN::Bundle* b = p.repr()->bundle();
00311    unsigned char n = 4 + b->source().length();
00312    retval.push_back(n);
00313    union
00314    {
00315          uint32_t i;
00316          unsigned char c[4];
00317    } seq;
00318    seq.i = b->seqNum();
00319    retval.append(seq.c,4);
00320    retval.append(b->source());
00321    return retval;
00322 }
00323 
00324 // Split a summary vector into individual message digests.
00325 EpidemicApp::DigestList*
00326 EpidemicApp::parseSummaryVector( const DTN::ByteString& sv )
00327 {
00328    DigestList* retval = 0;
00329    if ( 0 == sv.length() ) return retval;
00330    retval = new DigestList;
00331    for ( unsigned int i = 0 ; i < sv.length() ; )
00332    {
00333       unsigned char c = sv[i];
00334       retval->push_back( DTN::ByteString(sv,i,c+1) );
00335       i += c+1;
00336    }
00337    return retval;
00338 }
00339 
00340 // Find the stored bundle corresponding to a digest.
00341 DTN::BundlePointer
00342 EpidemicApp::storedBundle( const DTN::ByteString& d )
00343 {
00344    union
00345    {
00346          uint32_t i;
00347          unsigned char c[4];
00348    } seqNum;
00349    seqNum.c[0] = d[1];
00350    seqNum.c[1] = d[2];
00351    seqNum.c[2] = d[3];
00352    seqNum.c[3] = d[4];
00353    
00354    DTN::ByteString sender = d.substr(5);
00355 
00356    return m_owner->cachedPersistent( sender, seqNum.i );
00357 }

Generated on Mon Mar 24 11:15:45 2008 for Pydtn Simulator by  doxygen 1.5.4