WWW 2008 / Refereed Track: XML and Web Data - XML II, April 21-25, 2008, Beijing, China

XML Data Dissemination using Automata on Top of Structured Overlay Networks

Iris Miliaraki (iris@di.uoa.gr), Zoi Kaoudi (zoi@di.uoa.gr), Manolis Koubarakis (koubarak@di.uoa.gr)
Dept. of Informatics and Telecommunications, National and Kapodistrian University of Athens, Athens, Greece

(This work was supported by Microsoft Research through its PhD Scholarship Programme.)

ABSTRACT
We present a novel approach for filtering XML documents using nondeterministic finite automata and distributed hash tables. Our approach differs architecturally from recent proposals that deal with distributed XML filtering; they assume an XML broker architecture, whereas our solution is built on top of distributed hash tables. The essence of our work is a distributed implementation of YFilter, a state-of-the-art automata-based XML filtering system, on top of Chord. We experimentally evaluate our approach and demonstrate that our algorithms can scale to millions of XPath queries under various filtering scenarios, and also exhibit very good load balancing properties.

Categories and Subject Descriptors
H.3.4 [Information Systems]: Systems and Software--Distributed Systems; H.2.3 [Information Systems]: Languages--Data description languages (DDL), query languages; F.1.1 [Theory of Computation]: Models of Computation--Automata (e.g., finite, push-down, resource-bounded)

General Terms
Algorithms, Design, Experimentation

1. INTRODUCTION
Publish/subscribe systems have emerged in recent years as a promising paradigm for offering various popular notification services, including news monitoring, e-commerce site monitoring, blog monitoring and web feed dissemination (RSS). Since XML is widely used as the standard format for data exchange on the Web, a lot of research has focused on designing efficient and scalable XML filtering systems. In XML filtering systems, subscribers submit continuous queries expressed in XPath/XQuery, asking to be notified whenever a query is satisfied by incoming XML data. In recent years, many approaches have been presented for providing efficient filtering of XML data against large sets of continuous queries within a centralized server [5, 15, 10, 9, 23, 21, 25, 28]. However, in order to offer XML filtering functionality at Internet scale and avoid the typical problems of centralized solutions, we need to deploy such a service in a distributed environment. Consequently, systems like [29, 12, 17, 16, 20, 32, 11] have employed distributed content-based routing protocols to disseminate XML data over a network of XML brokers or routers. XML brokers are organized in mesh or tree-based overlay networks [29, 16], and disseminate XML data using information stored in their routing tables. In the ONYX system [16], each broker has a broadcast tree for reaching all other brokers in the network, and also uses a routing table for forwarding messages only to brokers that are interested in the messages. The routing tables in ONYX are instances of the YFilter engine [15]. Other proposals like [20] employ summarization techniques like Bloom filters for summarizing the queries included in the routing tables.
A major weakness of previous proposals for offering distributed content-based XML data dissemination is that they do not deal with how much processing load is imposed on each XML broker, resulting in load imbalances. The processing load of a broker includes both filtering of incoming XML data and delivering notifications to interested users. Unbalanced load can cause performance deterioration as the size of the query set increases, causing a fraction of the peers to become overloaded. In ONYX [16], a centralized component is responsible for assigning queries and data sources to the brokers of the network, using criteria like topological distances and bandwidth availability in order to minimize latencies, but without dealing with load balancing. Other systems like [20] do not deal at all with the amount of load managed by each broker. However, load balancing in a distributed setting can be crucial for achieving scalability. Another weakness of previous systems [29, 12, 17, 16, 20, 11] is that the size of the routing tables depends on the number of indexed queries. Therefore, as the number of indexed queries increases, so does the size of the routing tables, making their maintenance a bottleneck in the system.

In this paper, we deal with the aforementioned problems in XML dissemination systems and propose an alternative architecture that exploits the power of distributed hash tables (DHTs) (in our case, Chord [30]) to achieve XML filtering functionality at Internet scale. DHTs have been used recently with much success to implement publish/subscribe systems for attribute-value data models [7, 22, 31]. We base our work on DHTs since we are interested in XML filtering systems that will run on large collections of loosely maintained, heterogeneous, unreliable machines spread throughout the Internet. One can imagine that collections of such machines may be used in the future to run non-commercial XML-based public filtering/alerting services for community systems like CiteSeer, blogs etc.

The main idea of our approach is to adopt the successful automata-based XML filtering engine YFilter [15] and study its implementation on top of a DHT. We show how to construct, maintain and execute a nondeterministic finite automaton (NFA) which encodes a set of XPath queries on top of a DHT. This distributed NFA is maintained by having peers be responsible for overlapping fragments of the corresponding NFA. The size of these fragments is a tunable system parameter that allows us to control the amount of generated network traffic and the load imposed on each peer. The main contributions of this paper are the following:

· We describe a set of DHT-based protocols for efficient filtering of XML data against very large sets of XPath queries. Our approach overcomes the weaknesses of typical content-based XML dissemination systems built on top of mesh or tree-based overlays. In particular, there is no need for a centralized component for assigning queries to network peers, since queries are distributed among the peers using the underlying DHT infrastructure (Section 4). Additionally, peers maintain routing tables of constant size, regardless of the number of indexed queries in the system.

· We utilize the successful automata-based XML filtering engine YFilter [15] and study its implementation on top of a DHT. We present and evaluate two methods for executing the NFA (Section 5).
In the iterative method, a publisher peer is responsible for managing the execution of the NFA, while states are retrieved from other network peers. The recursive method exploits the inherent parallelism of an NFA and executes several active paths of the NFA in parallel. The recursive approach preprocesses input XML documents and enriches them with positional information to achieve efficient parallel execution. Our focus is mainly on the recursive method, which outperforms the iterative one in terms of network latency while generating the same or even less network traffic.

· We show how to balance the storage load and the filtering load of the network peers without requiring any kind of centralized control (Section 6.4). To achieve this, we utilize two complementary techniques: virtual nodes [30] and load shedding [19, 31].

· We present an experimental evaluation studying the feasibility of our approach. We demonstrate that our techniques scale to millions of XPath queries for various workloads (Section 6).

2. BACKGROUND
In this section, we give a very short introduction to the XML data model, the subset of XPath we allow, nondeterministic finite automata and distributed hash tables.

2.1 XML and XPath
An XML document can be represented using a rooted, ordered, labeled tree where each node represents an element or a value and each edge represents a relationship between nodes, such as an element-subelement relationship. XPath [13] is a language for navigating through the tree structure of an XML document. XPath treats an XML document as a tree and offers a way to select paths of this tree. Each XPath expression consists of a sequence of location steps. We consider location steps of the following form:

  axis nodetest [predicate1] ... [predicaten]

where axis is a child (/) or a descendant (//) axis, nodetest is the name of the node or the wildcard character "*", and each predicatei is a predicate in a list of zero or more predicates used to refine the selection of the node. We support predicates of the form value1 operator value2, where value1 is an element name, an attribute name or the position() function, operator is one of the basic comparison operators {=, >, >=, <, <=, <>}, and value2 is an element value, an attribute value or a number referring to the position of an element. A linear path query q is an expression of the form l1 l2 ... ln, where each li is a location step. In this paper queries are written using this subset of XPath, and we will refer to such queries as path queries or XPath queries interchangeably. Queries containing branches can be managed by our algorithms by splitting them into a set of linear path queries. Example path queries for the DBLP XML database [1] are:

q1: /dblp/phdthesis[year=2007], which selects PhD theses published in the year 2007.
q2: /dblp/*/author[@name="John Smith"], which selects any publication of the author John Smith.
q3: //*[school="University of Athens"], which selects any publication written by authors from the University of Athens.

2.2 Nondeterministic Finite Automata
A nondeterministic finite automaton (NFA) is a 5-tuple A = (Q, Σ, δ, q0, F), where Q is a finite set of states, Σ is a finite set of input symbols, q0 ∈ Q is the start state, F ⊆ Q is the set of accepting states and δ, the transition function, is a function that takes as arguments a state in Q and a member of Σ ∪ {ε} and returns a subset of Q [24]. Any path query can be transformed into a regular expression, and consequently there exists an NFA that accepts the language described by this query [24]. Following [15], for a given set of path queries, we will construct an NFA A = (Q, Σ, δ, q0, F) where Σ contains element names and the wildcard (*) character, and each path query is associated with an accepting state q ∈ F. An example of this construction is depicted in Figure 1.

[Figure 1: An example NFA constructed from the set of XPath queries q1: /dblp/phdthesis/year, q2: /dblp/www/title, q3: /dblp/proceedings/school, q4: /dblp/proceedings/title, q5: /dblp/*/author, q6: //*/cite.]

The language L(A) of an NFA A = (Q, Σ, δ, q0, F) is L(A) = {w | δ̂(q0, w) ∩ F ≠ ∅}, i.e., L(A) is the set of strings w in Σ* such that δ̂(q0, w) contains at least one accepting state, where δ̂ is the extended transition function constructed from δ. Function δ̂ takes a state q and a string of input symbols w, and returns the set of states that the NFA is in if it starts in state q and processes the string w.

2.3 Distributed Hash Tables
DHTs like Chord [30] have emerged as a promising way of providing a highly efficient, scalable, robust and fault-tolerant infrastructure for the development of distributed applications. DHTs are structured P2P systems which try to solve the following lookup problem: given a data item x stored by a network of peers, find x. In Chord [30], each peer and each data item is assigned a unique m-bit identifier by using a hash function such as SHA-1. The identifier of a peer can be computed by hashing its IP address. For data items, we first have to select a key, and then hash this key to obtain an identifier for the data item. Identifiers are ordered on an identifier circle modulo 2^m, called the Chord ring. Key k with identifier id is assigned to the first peer whose identifier is equal to or follows id in the identifier space. This peer is called the successor peer of key k or identifier id, denoted by succ(k) or succ(id). Chord offers an operation lookup(id) that returns a pointer to the peer responsible for identifier id. When a Chord peer receives a lookup request for identifier id, it efficiently routes the request to succ(id) in O(log n) hops, where n is the number of peers in the network. This bound can be achieved because each peer in Chord maintains a routing table of size O(log n), called the finger table. In the rest of the paper we use Chord as the underlying DHT. However, our techniques are DHT-agnostic; they can be implemented on any DHT that offers the standard lookup operation.

3. NFA-BASED DISTRIBUTED INDEX
We have chosen to use an NFA-based model, similar to the one used in the system YFilter [15], for indexing queries in our system. The NFA is constructed from a set of XPath queries and is used as a matching engine that scans incoming XML documents and discovers matching queries. In this section and in Sections 4 and 5, we describe in detail how the NFA corresponding to a set of XPath queries is constructed, maintained and executed on top of Chord. The NFA corresponding to a set of path queries is essentially a tree structure that needs to be traversed both for inserting a query in the NFA and for executing the NFA to find matches against an incoming XML document. We will distribute the NFA on top of Chord and provide an efficient way of supporting these two basic operations performed by a filtering system. Our main motivation for distributing the automaton derives from the nondeterministic nature of NFAs, which allows them to be in several states at the same time, resulting in many different parallel executions. Moreover, an NFA was preferred to an equivalent DFA in order to reduce the number of states.

The states of the NFA are distributed by assigning each state qi, along with every other state included in δ̂(qi, w), where w is a string of length l over Σ ∪ {ε}, to a single peer in the network. Note that l is a parameter that determines how much of the NFA is the responsibility of each peer. If l = 0, each state is indexed only once at a single peer, with the exception of states that are reached by an ε-transition, which are also stored at the peer responsible for the state from which the ε-transition originates. For larger values of l, each state is stored at a single peer along with the other states reachable from it by following a path of length l. This results in storing each state at more than one peer. Therefore, peers store overlapping fragments of the NFA, and the parameter l characterizes the size of these fragments. Each state is uniquely identified by a key, and this key is used for determining the peer that will be responsible for this state. The responsible peer for a state with key k is the successor peer of Hash(k), where Hash() is the SHA-1 hash function used in Chord. The key of an automaton state is formed by the concatenation of the labels of the transitions included in the path leading to the state. For example, the key of state 2 in Figure 1 is the string "start"+"dblp"+"phdthesis", the key of the start state is "start", and state 11 has the key "start"+"$", since ε-transitions are represented using the character $. The operator + is used to denote the concatenation of strings.
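To make the key scheme concrete, the following is a minimal Java sketch (Java being the language of the simulator used in Section 6) of deriving a state key from the transition labels on the path to the state and hashing it to a Chord identifier. The helper names are ours, and the final lookup step is only indicated in a comment, since it depends on the underlying DHT implementation.

  import java.math.BigInteger;
  import java.security.MessageDigest;
  import java.security.NoSuchAlgorithmException;

  public class StateKeys {
      // Build the key of a state by concatenating the labels on the path
      // from the start state, e.g. "start" + "dblp" + "phdthesis".
      // An epsilon-transition would contribute the character "$".
      static String stateKey(String... pathLabels) {
          StringBuilder key = new StringBuilder("start");
          for (String label : pathLabels) key.append(label);
          return key.toString();
      }

      // Hash the key with SHA-1 to obtain a 160-bit Chord identifier.
      static BigInteger identifier(String key) throws NoSuchAlgorithmException {
          byte[] digest = MessageDigest.getInstance("SHA-1").digest(key.getBytes());
          return new BigInteger(1, digest); // non-negative identifier
      }

      public static void main(String[] args) throws NoSuchAlgorithmException {
          String key = stateKey("dblp", "phdthesis"); // key of state 2 in Figure 1
          BigInteger id = identifier(key);
          // The responsible peer is succ(id); a real deployment would now
          // invoke the DHT's lookup operation on this identifier.
          System.out.println(key + " -> " + id.toString(16));
      }
  }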
3.1 Peer local structures
Each peer p keeps a list, denoted by p.states, which contains the states assigned to p. Each state s included in states is associated with a data structure containing the state's identifier, the transitions from this state (including potential self-loops) and, in the case of accepting states, the identifiers and the subscribers of the relevant queries, in a list denoted by s.queries. An example of how the NFA is distributed on top of Chord for l = 1 is depicted in Figure 2. We assume a network of 11 peers, and each stored state is depicted on the Chord ring. Notice that state 10 is included in p3.states = {0, 1, 9, 10} because the ε-transition does not contribute to the specified length l. In the figure, we use unique integers instead of state keys for readability purposes.

[Figure 2: Distributing the NFA on top of a DHT network (l = 1). The NFA states are placed on a Chord ring of 11 peers; a table maps each state key (0 to 11) to its successor peer: 0:p3, 1:p5, 2:p1, 3:p2, 4:p6, 5:p7, 6:p7, 7:p8, 8:p10, 9:p11, 10:p4, 11:p10.]

4. CONSTRUCTING THE NFA
To achieve the above distribution of the NFA, the automaton is incrementally constructed as queries arrive in the system. We will first describe how YFilter constructs the NFA incrementally, and then describe how this construction has been adjusted in our work.

4.1 NFA construction in YFilter
The construction of the NFA in YFilter is done as follows. A location step can be represented by an NFA fragment [15]. The NFA for a path query can be constructed by concatenating the NFA fragments of the location steps it consists of, and making the final state of the NFA the accepting state of the path query. Inserting a new query into an existing NFA requires combining the NFA of the query with the already existing one. Formally, if L(R) is the language of the NFA already constructed from previously inserted queries, and L(S) is the language of the NFA of the query being indexed, then the resulting NFA has language L(R) ∪ L(S). To insert a new query represented by an NFA S into an existing NFA R, we start from the common start state shared by R and S and traverse R until either the accepting state of S is reached or we reach a state for which there is no transition that matches the corresponding transition of S. If the latter happens, a new transition is added to that state in R.
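To illustrate this merge, here is a small, purely local Java sketch of inserting a linear path query into a shared transition structure (our own simplification of the YFilter construction: class and field names are ours, and the ε/self-loop handling needed for the // axis is omitted for brevity).

  import java.util.*;

  class NfaState {
      final Map<String, NfaState> transitions = new HashMap<>(); // label -> target
      final List<String> queries = new ArrayList<>(); // ids of queries accepted here
  }

  class SharedNfa {
      final NfaState start = new NfaState();

      // Insert a linear path query, given as its sequence of transition
      // labels (element names or "*"), reusing existing transitions where
      // possible and adding new states only where the paths diverge.
      void insert(String queryId, List<String> labels) {
          NfaState current = start;
          for (String label : labels) {
              current = current.transitions.computeIfAbsent(label, l -> new NfaState());
          }
          current.queries.add(queryId); // 'current' becomes the accepting state
      }
  }

  // Example: inserting q1 and q4 of Figure 1 shares the common prefix /dblp:
  //   new SharedNfa().insert("q1", List.of("dblp", "phdthesis", "year"));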
4.2 Distributed NFA construction
In our work, an NFA corresponding to a given set of path queries is constructed incrementally as queries are submitted, and it is distributed throughout the DHT. Thus, we will use the term distributed NFA to refer to it. We will now describe how a query q is inserted into the distributed NFA. The exact steps followed are depicted in Procedure 1. Algorithms in this paper are described using a notation where p.Proc() means that peer p receives a message and executes procedure Proc().

Procedure 1: IndexQuery(): indexing a query

  procedure n.IndexQuery(q, d, s)
    st := state at depth d of q.NFA
    if n.states does not contain st then add st to n.states
    if d = q.NFA.depth() then
      add q to st.queries                  // st is the accepting state of q
    else
      t := transition label at depth d
      if there is a transition labeled t from st to a state st' then
        nextPeer := Lookup(st'.key)
        nextPeer.IndexQuery(q, d + 1, s)
      else
        add a transition labeled t from st to a new state st'
        if t = ε then st'.selfState := true
        st'.key := st.key + t
        nextPeer := Lookup(st'.key)
        nextPeer.IndexQuery(q, d + 1, s)
      end
    end
  end procedure

Using Chord, the subscriber peer s sends a message IndexQuery(q, d, s) to peer r, where q is the query being indexed in the form of an NFA, d is the current depth reached in the query NFA and s is the subscriber peer. Initially d = 0 and r is the peer responsible for the start state, i.e., succ("start"). Starting from this peer, each peer p1 that receives an IndexQuery message checks the state st of q at depth d, retrieves the state with key st.key from its local data structure states, and checks whether st is the accepting state of q. If so, p1 inserts q in the list st.queries. Otherwise, let t be the label of the transition from state st to a target state st' as it appears in the NFA of q. If there is no such transition from st, p1 adds a new transition labeled t from state st to the target state st'. Finally, p1 sends an IndexQuery(q, d, s) message to peer p2 with the same parameters, except that the depth d is increased by 1. If k is the key of state st, then k + t is the key of state st' and p2 is succ(k + t). For the sake of simplicity, Procedure 1 describes the base case for constructing the NFA, where the parameter l, which determines the size of the NFA fragments assigned to each peer, is 0. For larger values of l, each peer, instead of storing only the state that it is responsible for, also stores a number of additional states determined by l. Constructing the NFA as described above results in as many IndexQuery messages being sent to the network as there are states in the NFA of q. Notice that the number of messages needed during the construction of the NFA is independent of the value of the parameter l.

5. EXECUTING THE NFA
Again, we will first describe how YFilter operates when executing the NFA and then describe in detail our approach for executing a distributed NFA.

5.1 NFA execution in YFilter
The NFA execution proceeds in an event-driven fashion. The XML document is parsed using a SAX parser, and the produced events are fed, one event at a time, to the NFA. The parser produces events of the following types: StartOfElement, EndOfElement, StartOfDocument, EndOfDocument and Text. The nesting of elements in an XML document requires that when an EndOfElement event is raised, the NFA execution backtracks to the states it was in when the corresponding StartOfElement was raised. To achieve this, YFilter maintains a stack, called the runtime stack, while executing the NFA. Since many states can be active at the same time in an NFA, the stack is used for tracking multiple active paths. The states placed on the top of the stack represent the active states, while the states found during each step of execution, after following the transitions caused by the input event, are called the target states. Execution is initiated when a StartOfDocument event occurs and the start state of the NFA is pushed onto the stack as the only active state. Then, each time a StartOfElement event occurs for an element e, all active states are checked for transitions labeled with e, with the wildcard * and for ε-transitions. In the case of an ε-transition, the target state is recursively checked one more time. All active states containing a self-loop are also added to the target states. The target states are pushed onto the runtime stack and become the active states for the next execution step. If an EndOfElement event occurs, the top of the runtime stack is popped and backtracking takes place. Execution proceeds in this way until the document has been completely parsed.
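The following is a compact, purely local Java sketch of this runtime-stack discipline (again a simplification of ours: it handles element, wildcard and self-loop transitions, but omits ε-transitions and predicate handling).

  import java.util.*;

  class RuntimeNfa {
      // A trimmed-down state: labeled transitions plus a self-loop flag.
      static class State {
          Map<String, State> transitions = new HashMap<>();
          boolean selfLoop; // state stays active while its subtree is parsed
          List<String> queries = new ArrayList<>();
      }

      final Deque<Set<State>> runtimeStack = new ArrayDeque<>();
      final Set<String> satisfied = new HashSet<>();

      void startDocument(State start) {
          runtimeStack.push(Set.of(start)); // the start state is the only active state
      }

      void startElement(String name) {
          Set<State> targets = new HashSet<>();
          for (State st : runtimeStack.peek()) {
              State byName = st.transitions.get(name);
              if (byName != null) targets.add(byName);
              State byStar = st.transitions.get("*");
              if (byStar != null) targets.add(byStar);
              if (st.selfLoop) targets.add(st); // self-loops keep the state active
          }
          for (State t : targets) satisfied.addAll(t.queries);
          runtimeStack.push(targets); // targets become the next active states
      }

      void endElement() {
          runtimeStack.pop(); // backtrack to the states active at the matching start tag
      }
  }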
5.2 Distributed NFA execution
Let us now describe how we execute a distributed NFA in our approach. Similarly to YFilter, we maintain a stack in order to be able to backtrack during the execution of the NFA. For each active state, we want to retrieve all target states reached by a certain parsing event. Given that the NFA states in our approach are distributed among the peers of the network, at each step of the execution the relevant parsing event should be forwarded to all the peers responsible for the active states. Therefore, we can identify two ways of executing the NFA: the first proceeds in an iterative way, while the other executes the NFA in a recursive fashion.

In the iterative method, the publisher peer is responsible for parsing the document, maintaining the runtime stack and forwarding the parsing events to the responsible peers. In this case, the execution of the NFA proceeds in a similar way as in YFilter, with the exception that target states cannot be retrieved locally but need to be retrieved from other peers. Procedures 2 and 3 describe the actions required by the publisher peer and the actions required by each peer responsible for an active state.

Procedure 2: PublishDocument(): publishing an XML document (iterative method)

  procedure n.PublishDocument(doc)
    add startState to activeStates
    runtimeStack.push(activeStates)
    foreach event from parsing doc do
      if event is a startElement then
        foreach st in activeStates do
          add st.queries to satisfiedQueries
          nextPeer := Lookup(st.key)
          states := nextPeer.ExpandState(st, event, myId)
          add states to targetStates
        end
        activeStates := targetStates
        clear targetStates
        runtimeStack.push(activeStates)
      else                                // endElement: backtrack
        runtimeStack.pop()
        activeStates := top of runtimeStack
      end
    end
    foreach q in satisfiedQueries do
      notify subscriber of q
    end
  end procedure

Procedure 3: ExpandState(): expanding a state by following transitions (iterative method)

  procedure n.ExpandState(st, event, publisherId)
    e := element name of event
    foreach symbol in {e, *, ε} do
      st' := state reached from st by a transition labeled symbol
      if st' is not null then
        add st' to targetStates
        if symbol is ε then               // ε-targets are expanded one more step
          nextPeer := Lookup(st'.key)
          nextPeer.ExpandState(st', event, myId)
        end
      end
    end
    if st.selfState is true then add st to targetStates
    return targetStates
  end procedure

The publisher peer p publishes a document by following the steps described in procedure PublishDocument(doc), where doc is the XML document being published. For each active state, p sends an ExpandState(st, event, publisherId) message to peer r = succ(st.key), where st is the active state being expanded, event is the current event produced by the parser and publisherId is the identifier of the publisher peer. At first, the only active state is the start state. When r receives an ExpandState message, it computes the transitions from state st, which is stored in its local states, and returns to the publisher peer the set of all the target states computed. p continues the execution of the NFA until the document has been completely parsed. p is also responsible for notifying the subscribers of all the satisfied queries. Although the iterative method performs poorly, due to the fact that the majority of the load of the system is imposed on the publisher, we present it here for the reader's convenience (it might also be helpful in understanding the details of the recursive method). In the iterative approach, a stack mechanism is employed for maintaining multiple active paths during NFA execution.
Each active path consists of a chain of states, starting from the start state and linking it to the reached target states. The main idea of the recursive method is that these active paths are executed in parallel. The details of the recursive method are as follows. The publisher peer forwards the XML document to the peer responsible for the start state, to initiate the execution of the NFA. The execution continues recursively, with each peer responsible for an active state continuing the execution. Notice that the runtime stack is not explicitly maintained in this case, but it implicitly exists in the recursive executions of these paths. The execution of the NFA is parallelized in two cases. The first case is when the input event being processed has siblings with respect to the position of the element in the tree structure of the XML document. In this case, a different execution path is created for each sibling event. The second case is when more than one target state results from expanding a state. Then, a different path is created for each target state, and a different peer continues the execution of each such path. Procedure 4 describes the actions required by each peer responsible for an active state during the execution of a path.

Procedure 4: RecExpandState(): recursively expanding states along each execution path (recursive method)

  procedure n.RecExpandState(st, path, events)
    if st is an accepting state then
      add st.queries to satisfiedQueries
    end
    event := events.next()
    if event.hasSiblings() then
      siblingEvents := event.getSiblings()
    else
      siblingEvents := {event}
    end
    foreach e in siblingEvents do
      if e is a startElement then
        compute targetStates from st for input e
        foreach s in targetStates do
          newPath := path.add(e, s)
          next := Lookup(s.key)
          next.RecExpandState(s, newPath, events)
        end
      end
    end
    foreach q in satisfiedQueries do
      notify subscriber of q
    end
  end procedure

The publisher peer p publishes an XML document by sending a message RecExpandState(st, path, events) to peer r = succ("start"), where st is the start state of the distributed NFA, path is a stack containing only the pair consisting of the event StartOfDocument and the start state, and events is the list of the parsing events. The parsing events StartOfElement and EndOfElement are enriched with a positional representation so that structural relationships between two elements can be checked efficiently. Specifically, each event is annotated with the position of the corresponding element as a pair (L:R, D), where L and R are generated by counting tags from the beginning of the document up to the start tag and the end tag of this element, respectively, and D is its nesting depth. The publisher peer is responsible for enriching the parsing events. This representation was introduced in [14]. When peer r, which is responsible for state st, receives a RecExpandState message, it first checks whether the next input event e in events has siblings. Note that e is determined by the last event included in the stack path. If this is the case, then r computes the transitions from state st for each sibling using its local data structure states. If st' is a target state reached from st, then r pushes the relevant sibling event along with st' onto the stack path and sends a message RecExpandState(st', path, events) to peer r' = succ(st'.key). Suppose e1, ..., es are the sibling events and TS(e1), ..., TS(es) are the sets of target states computed for each event. Then, r will send Σi |TS(ei)| different messages, one for each of the different execution paths. The execution of each path continues until the document fragment has been completely parsed. The peers that participate in the execution process are responsible for notifying the subscribers of the satisfied queries. Again, the steps described for executing the NFA refer to the case where l = 0. For larger values of l, the operations ExpandState and RecExpandState simulate the extended transition function δ̂ instead of the transition function δ.
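To illustrate the (L:R, D) enrichment, here is a minimal Java sketch of how a publisher could assign such positions with a single tag counter while walking the document tree. The Node class and field names are ours, and the sketch reflects our reading of the representation of [14].

  import java.util.*;

  class PositionalEncoder {
      // A toy document node: element name plus children in document order.
      static class Node {
          String name;
          List<Node> children = new ArrayList<>();
          int l, r, depth; // the (L:R, D) annotation
          Node(String name, Node... kids) { this.name = name; children.addAll(List.of(kids)); }
      }

      int counter = 0; // increases at every start tag and every end tag

      void annotate(Node n, int depth) {
          n.l = ++counter;       // position of the start tag
          n.depth = depth;
          for (Node child : n.children) annotate(child, depth + 1);
          n.r = ++counter;       // position of the end tag
      }

      // With this numbering, element a is an ancestor of element b exactly
      // when a.l < b.l and b.r < a.r, so structural relationships between
      // two enriched events can be checked without re-parsing the document.
      static boolean isAncestor(Node a, Node b) {
          return a.l < b.l && b.r < a.r;
      }
  }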
Note that the recursive method assumes that the XML document being filtered is relatively small, and this is the reason for deciding to forward the whole document at each step of the execution. In realistic scenarios XML documents are usually small, as discussed in [6]. However, in case we want to filter larger XML documents, our method can easily be adjusted so that we forward smaller fragments of the document.

5.3 Predicate evaluation
We evaluate the predicates included in the queries after the execution of the NFA. This technique of delaying predicate processing until the structure of a query has been matched resembles the Selection Postponed (SP) approach presented in the original YFilter proposal [15]. An alternative approach called Inline, also presented in [15], evaluates the predicates of a query during the filtering process, at each step of the NFA execution. As demonstrated in [15], SP outperforms Inline, especially when queries contain a large number of predicates. This is mainly due to the fact that a lot of effort is spent evaluating predicates of queries whose structure may never be matched against the filtered data. In future work, we plan to consider the potential sharing of value-based predicates. Works like [23] and [25] identify the need for exploiting the potential commonalities between the predicates of the queries, and not only structural commonalities.

6. EXPERIMENTAL EVALUATION
In this section, we present an experimental evaluation of our algorithms using a Chord [30] simulator we implemented in Java. All experiments were conducted on a machine with a Pentium IV 2.99 GHz processor and 4 GB of memory running the Fedora Core 4 distribution of Linux. Our goal is to demonstrate the feasibility of our approach and study its overall performance for various workloads. The performance of our approach under churn, or for different parameters characterizing the underlying DHT overlay, is beyond the scope of this paper.

We generated two different document workloads to evaluate our approach. The first one is created using a set of 10 DTDs, including the DBLP DTD, the NITF (News Industry Text Format) DTD, the ebXML (Electronic Business using eXtensible Markup Language) DTD and the Auction DTD from the XMark benchmark [3], and will be referred to as the aggregated workload. Using this workload, we study the performance of our approach in a realistic scenario where users subscribe to the same system to receive notifications concerning different interests of theirs (e.g., information about scientific papers vs. news feeds). The second workload is created using only the NITF DTD, which has also been used in [10, 15, 25], and will be referred to as the NITF workload. We chose to run separate experiments using the NITF DTD because
it represents an interesting case where a large fraction of elements are allowed to be recursive. For each workload, 1000 documents are synthetically generated using the IBM XML generator [2]; 100 documents for each DTD in the aggregated workload. The path queries used in the experiments were generated using the XPath generator available in the YFilter release [4]. For each workload, we create two kinds of query sets: the distinct set, which contains no duplicate queries, and the random set, which may contain duplicates. The random query set represents a more realistic scenario, allowing subscribers to share interests. The default values of the parameters used for generating the document workloads and the query sets, and for creating the network in our experiments, are shown in Table 1.

Table 1: Experiment parameters

  Parameter                       Aggregated   NITF
  Number of documents             1000         1000
  Dataset size (MBs)              6            1,5
  Number of elements              383          123
  Number of attributes            694          513
  Number of queries               10^6         10^6
  Query depth                     10           10
  Query fanout                    1            1
  Wildcard prob.                  0.2          0.2
  Descendant axis prob.           0.2          0.2
  Skewness of element names (θ)   0            0
  Predicates per query            0            0
  Network size                    10^3         10^3

The number of indexed queries does not affect the size of the routing table, which for a network of n peers is at most log n (in our case log 10^3). Note that this is not true in XML content-based dissemination systems like [16], where the maintenance of large routing tables can become a bottleneck. The average document size is 4.8KB in the aggregated workload and 36.5KB in the NITF workload. Note that the average size of an XML document on the Web is only 4KB, while the maximum size reaches 500KB, as mentioned in the study presented in [6]. In addition, these sizes are also typical for XML data dissemination settings [10, 11].

The metrics used in the experiments and their definitions are as follows. The network traffic is defined as the total number of messages generated by the network peers while either indexing queries or filtering incoming XML documents. The filtering latency is measured in network hops as follows. While filtering an XML document, we measure the longest chain of hops needed during the execution of the NFA. In other words, we make the assumption that if a number of messages are sent simultaneously to a number of peers, the latency is equal to the maximum number of hops needed for a message to reach its destination. Lastly, we define the NFA size as the total number of states included in the NFA. In Figure 4, we show the sizes of the NFAs for different DTDs. For instance, indexing 10^6 queries from the Auction DTD results in an NFA with 592199 states. Considering that in an Internet-scale scenario we want to support millions of path queries from several DTDs, we can see the benefit of distributing such large NFAs over a large number of peers in a DHT network, which cooperate for the NFA execution.

[Figure 4: NFA size (states) per DTD, for 500K and 1M indexed queries (ebXML, RSS 2.0, Auction, DBLP, NITF, PSD, SIGMOD Record).]

The following experiments are divided into four groups. The first group compares the iterative and the recursive method for executing the distributed NFA. The second group studies the scalability of our approach for filtering XML data as the number of indexed queries increases. Then, in the third group of experiments, we evaluate our approach as the size of the network increases. Finally, the fourth group demonstrates the effectiveness of load balancing techniques for distributing the load imposed on the network peers. We have also evaluated our algorithms in the case where the non-determinism in the query set increases (i.e., with increasing probability of wildcards and descendant axes), and also for different values of l.
The results of these experiments will appear in the extended version of this paper.

6.1 Recursive vs. Iterative
In this group of experiments, we compare the recursive and the iterative method for executing the distributed NFA. Even though the recursive method has obvious advantages over the iterative one, we include the experiment for completeness. In this experiment, we created a network of 10^3 peers and incrementally indexed 2·10^5 to 10^6 distinct path queries. After each indexing iteration, which doubles the number of queries we have, we published 1000 XML documents and counted the average number of messages and the average filtering latency for filtering this set of documents. We show the results for the NITF workload in Figure 3.

[Figure 3: Iterative vs. Recursive. (a) Network traffic (messages) and (b) filtering latency (hops) for the NITF workload, for 200K to 1000K indexed queries.]

We observe that the iterative method needs approximately twice as many messages as the recursive one, mainly due to the response messages that need to be sent back to the publisher. In terms of filtering latency, the recursive method always outperforms the iterative method by more than 10 times. The reason for this is that the recursive method executes all active paths in parallel during the filtering of XML documents. Since the recursive approach always outperforms the iterative one, in the rest of our experimental results we only show the performance of the recursive method.
However the main motivation for using a larger network is to decrease the average load of each p eer assuming that load is equally shared among the p eers. The next section presents our techniques that allow us to ensure a balanced load in various settings. 000 2000N 3000 4000 5000 etwork size (peers) igure 6: Varying network size (Latency) 6.2 Varying the number of queries In this group of exp eriments, we study the p erformance of the system as the numb er of the indexed queries increases. We created a network of 103 p eers and incrementally indexed 2 105 to 106 distinct path queries. After each indexing iteration, which doubles the amount of queries we have, we published 1000 XML documents and counted the average numb er of messages and the average filtering latency for these documents. We run four different exp eriments: two for the aggregated workload and two for the NITF workload for b oth a random and a distinct query set. The results are shown in Figure 5. As depicted in Figure 5(a), the generated network traffic for b oth workloads scales linearly with the numb er of queries. The NITF workload results in almost twice more messages than the aggregated one mainly due to the larger size of the constructed NFA. In Figure 5(c), the constructed NFA for the NITF workload consists of almost 225000 states while the corresp onding size for the aggregated workload is 125000 states. Another factor that results in more network traffic for the random query set of NITF workload is its large document size. In Figure 5(b), we show the filtering latency for b oth workloads. We observe that the latency remains relatively unaffected as the numb er of indexed queries grows. This happ ens b ecause even though the size of the NFA increases, as more queries are added to it, the numb er of execution steps remains the same. In other words, the NFA reaches a state where the longest path corresp onding to latency remains constant. We omit the results for the generated network traffic and the latency during the indexing of queries in the network due 6.4 Load balancing We distinguish b etween two typ es of load: storage and filtering load [31, 22]. The storage load of a p eer is the total size of the states stored locally by the p eer. The size of each state is defined as the numb er of transitions associated with it. The filtering load of a p eer is the numb er of filtering requests that it processes (measured by the numb er of messages it receives during the filtering process). For achieving a balanced storage load in our system, we can use a simple load balancing scheme prop osed in Chord [30] which balances the numb er of keys p er node by associating keys with virtual nodes and then mapping multiple virtual nodes (with unrelated identifiers) to each real p eer. As demonstrated in [30], we need to allocate log N randomly chosen virtual nodes to each p eer for ensuring an equal partitioning of the identifier space b etween the p eers. 
We ran an experiment to demonstrate the effectiveness of the above load balancing technique for balancing the storage load in our approach. We created a network of 10^3 peers (each one assigned 3 virtual nodes) and indexed 10^6 distinct path queries. Figure 7(a) shows the distribution of the storage load across the peers. On the x-axis of this graph, peers are ranked starting from the peer with the highest load. For the NITF workload, we observe that prior to load balancing the first 200 peers received 118023 states, which is 53% of the total storage load and represents an unbalanced distribution. On the other hand, after assigning 3 virtual nodes to each network peer, we achieve a fairer distribution of the storage load. In particular, the first 200 peers received 86427 states, or 38% of the total storage load.

[Figure 7: Load balancing. (a) Storage load (NITF), (b) filtering load (NITF) and (c) filtering load (aggregated), each with and without load balancing, with peers ranked by load.]

Let us now consider the case where all peers share an equal storage load. Then, if the states of the NFA were uniformly accessed, the peers would equally share the filtering load. However, the case where all states of the NFA are uniformly accessed is unrealistic. This is due to the inherent tree structure of the NFA, which results in skewness in the access of its states. Peers responsible for states at smaller depths of the NFA will receive more filtering load than others. Furthermore, another factor that can cause additional imbalance while traversing the NFA is the potential skewness of the elements contained in the XML document set being filtered. For these reasons, the virtual nodes scheme alone is insufficient for balancing the filtering load, and we have implemented and evaluated a load balancing method based on the concept of load shedding, used successfully in [19, 31]. The main idea is that when a peer p becomes overloaded, it chooses the most frequently accessed state st and contacts a number of peers, requesting them to replicate state st. Then, p notifies the rest of the peers that st has been replicated, so when a peer needs to retrieve it, it will randomly select one of the responsible peers. To demonstrate the effectiveness of load shedding, we ran the following experiment. We created a network of 10^3 peers (each one assigned 3 virtual nodes) and indexed 10^6 distinct path queries. A peer considers itself overloaded if more than 10% of the incoming documents access the same state that it is responsible for, in which case it assigns 10 peers to store a replica of that state. After each indexing iteration, which doubles the number of queries we have, we publish 1000 XML documents and count the average filtering load suffered by each peer. We ran the experiment for both the aggregated and the NITF workloads, and the respective results are shown in Figures 7(b) and 7(c).
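Before turning to the results, the following minimal Java sketch illustrates the load-shedding rule used in this experiment (an illustration of ours: the 10% threshold and the 10 replicas follow the setup above, while choosePeers() and the replica announcement are hypothetical placeholders, and the access rate is approximated per request rather than per document).

  import java.util.*;
  import java.util.concurrent.ThreadLocalRandom;

  class LoadShedding {
      static final double OVERLOAD_THRESHOLD = 0.10; // access rate marking a hot state
      static final int REPLICA_COUNT = 10;           // peers asked to hold a replica

      final Map<String, Integer> accessCounts = new HashMap<>(); // state key -> hits
      final Map<String, List<String>> replicaHolders = new HashMap<>(); // key -> peers
      int requestsSeen = 0;

      // Called on the responsible peer for every filtering request it serves.
      void recordAccess(String stateKey) {
          requestsSeen++;
          int hits = accessCounts.merge(stateKey, 1, Integer::sum);
          if ((double) hits / requestsSeen > OVERLOAD_THRESHOLD
                  && !replicaHolders.containsKey(stateKey)) {
              // Overloaded: replicate the hot state and announce the replica set.
              replicaHolders.put(stateKey, choosePeers(REPLICA_COUNT));
          }
      }

      // A peer that needs the state picks one of the holders at random,
      // spreading subsequent filtering requests over the replicas.
      String pickPeer(String stateKey, String homePeer) {
          List<String> holders = replicaHolders.getOrDefault(stateKey, List.of(homePeer));
          return holders.get(ThreadLocalRandom.current().nextInt(holders.size()));
      }

      // Placeholder: a real implementation would select k peers via the DHT.
      List<String> choosePeers(int k) {
          List<String> chosen = new ArrayList<>();
          for (int i = 0; i < k; i++) chosen.add("peer-" + i);
          return chosen;
      }
  }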
We observe a significant improvement in load distribution when load balancing is used. For instance, prior to load balancing, the first 50 peers receive almost 20% of the total filtering requests, while the last 200 peers receive almost no load at all. In contrast, after applying the load balancing method, no peer receives more than 0.3% of the overall filtering load. We also experimented with different values for both the number of replicas and the access rate that determines whether a peer is overloaded, but we did not observe any further improvements in the load distribution.

7. RELATED WORK
We classify related work in the area of XML filtering into two basic categories: centralized approaches and distributed approaches. Many approaches exist for XML filtering in a centralized setting: YFilter [15] and its predecessor XFilter [5], XTrie [10], XPush [23], the BEA streaming processor [18], the work of Green et al. [21], XSQ [28], Index-Filter [9] and others. As we already said in the introduction, little attention has been paid so far to providing a distributed Internet-scale XML filtering service. Systems like [29, 12, 17, 16, 20, 32, 11] have employed distributed content-based routing protocols to disseminate XML data over broker-based networks. XML brokers are organized in mesh or tree-based overlay networks [29, 16], and disseminate XML data using information stored in their routing tables. In the ONYX system [16], each broker has a broadcast tree for reaching all other brokers in the network, and also uses a routing table for forwarding messages only to brokers that are interested in them. The routing tables in ONYX are instances of the YFilter engine. In [32], the authors concentrate on load balancing issues and present a system where XPath queries are transferred from overloaded to under-loaded servers by a centralized component called the XPE control server. Other techniques like [20] employ summarization techniques like Bloom filters for summarizing the queries included in the routing tables. Also, recent works like [27, 11] focus on optimizing the functionality of each of these XML brokers. A recent system called SONNET [33] is most closely related to our work, since it studies XML data dissemination on top of a DHT. Each peer keeps a Bloom-filter-based engine for forwarding XML packets. Its load balancing is concerned with balancing the number of packets forwarded by each peer, regardless of the size of each packet. Finally, [17] presents a hierarchical XML routing architecture based on XTrie. The authors describe both a data-sharing and a filter-sharing strategy for distributing the filtering task. The authors mention that they consider load balancing strategies, but they do not provide any further details regarding a specific strategy.
Our approach is the first that achieves load balancing b etween the network p eers for XML data dissemination in a distributed setting without requiring any kind of centralized control. Additionally, p eers keep routing tables of constant sizes whereas in systems like ONYX [16] the routing tables have size dep endent on the numb er of the indexed queries causing it to b ecome a b ottleneck. We exp erimentally evaluate our approach and demonstrate that our algorithms can scale to millions of XPath queries under various filtering scenarios, and also exhibit very good load balancing prop erties. Our future work concentrates on implementing the prop osed methods on a real DHT, evaluating them on a testb ed like Planetlab and comparing them with other available systems. [1] DBLP XML records. http://dblp.uni-trier.de/xml/. [2] IBM XML Generator. http://www.alphaworks.ibm.com/tech/xmlgenerator. [3] XMark: An XML Benchmark Pro ject. http://www.xml-b enchmark.org/. [4] YFilter 1.0 release. http://yfilter.cs.umass.edu/code release.htm. [5] M. Altinel and M. J. Franklin. Efficient Filtering of XML Documents for Selective Dissemination of Information. In VLDB 2000. [6] D. Barb osa, L. Mignet, and P. Veltri. Studying the XML Web: Gathering Statistics from an XML Sample. World Wide Web, 9(2):187­212, 2006. [7] A. R. Bharamb e, M. Agrawal, and S. Seshan. Mercury: Supp orting Scalable Multi-attribute Range Queries. In SIGCOMM 2004. [8] A. Bonifati, U. Matrangolo, A. Cuzzocrea, and M. Jain. XPath Lookup Queries in P2P Networks. In WIDM 2004. [9] N. Bruno, L. Gravano, N. Koudas, and D. Srivastava. Navigation- vs. Index-Based XML Multi-Query Processing. In ICDE 2003. [10] C. Y. Chan, P. Felb er, M. N. Garofalakis, and R. Rastogi. Efficient Filtering of XML Documents with XPath Expressions. In ICDE 2002. [11] C. Y. Chan and Y. Ni. Efficient XML Data Dissemination with Piggybacking. In SIGMOD 2007. [12] R. Chand and P. A. Felb er. A Scalable Protocol for Content-Based Routing in Overlay Networks. In NCA 2003. [13] J. Clark and S. J. DeRose. XML Path Language (XPath) Version 1.0. World Wide Web Consortium, Recommendation, Novemb er 1999. 9. REFERENCES [14] M. P. Consens and T. Milo. Optimizing Queries on Files. In SIGMOD 1994. [15] Y. Diao, M. Altinel, M. J. Franklin, H. Zhang, and P. Fischer. Path Sharing and Predicate Evaluation for High-Performance XML Filtering. ACM TODS, 28(4):467­516, 2003. [16] Y. Diao, S. Rizvi, and M. J. Franklin. Towards an Internet-Scale XML Dissemination Service. In VLDB 2004. [17] P. Felb er, C.-Y. Chan, M. Garofalakis, and R. Rastogi. Scalable Filtering of XML Data for Web Services. IEEE Internet Computing, 7(1):49­57, 2003. [18] D. Florescu, C. Hillery, D. Kossmann, P. Lucas, F. Riccardi, T. Westmann, J. Carey, and A. Sundarara jan. The BEA Streaming XQuery Processor. The VLDB Journal, 13(3):294­315, 2004. [19] L. Galanis, Y. Wang, S. Jeffery, and D. J. DeWitt. Locating Data Sources in Large Distributed Systems. In VLDB 2003. [20] X. Gong, W. Qian, Y. Yan, and A. Zhou. Bloom Filter-Based XML Packets Filtering for Millions of Path Queries. In ICDE 2005. [21] T. J. Green, A. Gupta, G. Miklau, M. Onizuka, and D. Suciu. Processing XML Streams with Deterministic Automata and Stream Indexes. ACM Trans. Database Syst., 29(4):752­788, 2004. [22] A. Gupta, O. D. Sahin, D. Agrawal, and A. E. Abbadi. Meghdoot: Content-based publish/subscrib e over P2P networks. In Midd leware 2004. [23] A. K. Gupta and D. Suciu. Stream Processing of XPath Queries with Predicates. In SIGMOD 2003. [24] J. E. 
[24] J. E. Hopcroft, R. Motwani, and J. D. Ullman. Introduction to Automata Theory, Languages, and Computation. Addison-Wesley Longman Publishing Co., Inc., Boston, MA, USA, 2000.
[25] S. Hou and H.-A. Jacobsen. Predicate-based Filtering of XPath Expressions. In ICDE 2006.
[26] G. Koloniari and E. Pitoura. Content-based Routing of Path Queries in Peer-to-Peer Systems. In EDBT 2004.
[27] M. M. Moro, P. Bakalov, and V. J. Tsotras. Early Profile Pruning on XML-aware Publish/Subscribe Systems. In VLDB 2007.
[28] F. Peng and S. S. Chawathe. XPath Queries on Streaming Data. In SIGMOD 2003.
[29] A. C. Snoeren, K. Conley, and D. K. Gifford. Mesh-Based Content Routing using XML. In SOSP 2001.
[30] I. Stoica, R. Morris, D. Karger, M. F. Kaashoek, and H. Balakrishnan. Chord: A Scalable Peer-to-peer Lookup Service for Internet Applications. In SIGCOMM 2001.
[31] C. Tryfonopoulos, S. Idreos, and M. Koubarakis. Publish/Subscribe Functionality in IR Environments using Structured Overlay Networks. In SIGIR 2005.
[32] H. Uchiyama, M. Onizuka, and T. Honishi. Distributed XML Stream Filtering System with High Scalability. In ICDE 2005.
[33] A. Zhou, W. Qian, X. Gong, and M. Zhou. SONNET: An Efficient Distributed Content-Based Dissemination Broker (Poster paper). In SIGMOD 2007.