ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Tree-M/MT/BulkLoad.cpp
Revision: 1.1
Committed: Sun May 6 00:45:52 2001 UTC (23 years, 1 month ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 /*********************************************************************
2     * *
3     * Copyright (c) 1997,1998, 1999 *
4     * Multimedia DB Group and DEIS - CSITE-CNR, *
5     * University of Bologna, Bologna, ITALY. *
6     * *
7     * All Rights Reserved. *
8     * *
9     * Permission to use, copy, and distribute this software and its *
10     * documentation for NON-COMMERCIAL purposes and without fee is *
11     * hereby granted provided that this copyright notice appears in *
12     * all copies. *
13     * *
14     * THE AUTHORS MAKE NO REPRESENTATIONS OR WARRANTIES ABOUT THE *
15     * SUITABILITY OF THE SOFTWARE, EITHER EXPRESS OR IMPLIED, INCLUDING *
16     * BUT NOT LIMITED TO THE IMPLIED WARRANTIES OF MERCHANTABILITY, *
17     * FITNESS FOR A PARTICULAR PURPOSE, OR NON-INFRINGEMENT. THE AUTHOR *
18     * SHALL NOT BE LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A *
19     * RESULT OF USING, MODIFYING OR DISTRIBUTING THIS SOFTWARE OR ITS *
20     * DERIVATIVES. *
21     * *
22     *********************************************************************/
23    
24     #ifdef UNIX
25     #include <unistd.h>
26     #endif
27     #include "MT.h"
28    
29     extern double MIN_UTIL;
30    
31     // find the node having the minimum number of children
32     // this is used in the redistributing phase of the BulkLoad algorithm
33     int
34     FindMin(int *children, int max)
35     {
36     int j, jmin=0;
37    
38     for(j=1; j<max; j++)
39     if(children[j]<children[jmin]) jmin=j;
40     return jmin;
41     }
42    
43     // return root level+1 (the height of the tree)
44     // this is used in the "splitting" phase of the BulkLoad algorithm
45     int
46     MT::MaxLevel() const
47     {
48     GiSTnode *root;
49     GiSTpath path;
50     int i;
51    
52     path.MakeRoot();
53     root=ReadNode(path);
54     i=root->Level();
55     delete root;
56     return i+1;
57     }
58    
59     // split this M-tree into a list of trees having height level
60     // this is used in the "splitting" phase of the BulkLoad algorithm
61     GiSTlist<char *> *
62     MT::SplitTree(int *ncreated, int level, GiSTlist<MTentry *> *children, char *name)
63     // ncreated is the number of created sub-trees,
64     // level is the split level for the tree,
65     // children is the list of the parents of each sub-tree,
66     // name is the root for the sub-trees names
67     // the return value is the list of splitted sub-trees
68     {
69     GiSTlist<char *> *trees=new GiSTlist<char *>; // this is the results list
70     GiSTlist<MTnode *> *oldList=new GiSTlist<MTnode *>; // this is the nodes list
71     GiSTpath path;
72     MTnode *node=new MTnode; // this is because the first operation on node is a delete
73     char newname[50];
74    
75     path.MakeRoot();
76     oldList->Append((MTnode *)ReadNode(path));
77     do { // build the roots list
78     GiSTlist<MTnode *> *newList=new GiSTlist<MTnode *>; // this is the list of the current level nodes
79    
80     while(!oldList->IsEmpty()) {
81     delete node; // delete the old node created by ReadNode
82     node=oldList->RemoveFront(); // retrieve next node to be examined
83     path=node->Path();
84     for(int i=0; i<node->NumEntries(); i++) { // append all its children to the new list
85     MTentry *e=(MTentry *)(*node)[i].Ptr();
86    
87     path.MakeChild(e->Ptr());
88     newList->Append((MTnode *)ReadNode(path));
89     path.MakeParent();
90     }
91     }
92     delete oldList;
93     oldList=newList;
94     } while(node->Level()>level); // stop if we're at the split level
95     delete node;
96     while(!oldList->IsEmpty()) { // now append each sub-tree to its root
97     MT *subtree=new MT;
98     MTnode *newnode;
99    
100     node=oldList->RemoveFront();
101     sprintf(newname, "%s.%i", name, ++(*ncreated));
102     unlink(newname); // if this M-tree already exists, delete it
103     subtree->Create(newname); // create a new M-tree
104     path.MakeRoot();
105     newnode=(MTnode *)subtree->ReadNode(path); // read the root of the tree
106     subtree->Append(newnode, (MTnode *)node->Copy()); // append the sub-tree of current node to the root of this M-tree
107     children->Append(node->Entry()); // insert the root entry into the list
108     trees->Append(strdup(newname)); // insert the new M-tree name into the list
109     delete node;
110     delete newnode;
111     delete subtree;
112     }
113     delete oldList;
114     return trees;
115     }
116    
117     // load this M-tree with n data using the BulkLoad algorithm [CP98]
118     void
119     MT::BulkLoad(MTentry **data, int n, double padFactor, char *name)
120     // data is an array of n entries
121     // padFactor is the maximum node utilization (use 1)
122     // name is the name of the tree
123     {
124     int i, Size=0, totSize, addEntrySize=(sizeofEntry()? sizeof(GiSTpage): sizeof(GiSTlte)+sizeof(GiSTpage)), minSize=(int)(Store()->PageSize()*MIN_UTIL), NumEntries; // this is the total size of entries
125    
126     if(sizeofEntry()) Size=n*(sizeof(GiSTpage)+sizeofEntry()); // (only valid if we've fixed size entries)
127     else for(i=0; i<n; i++) Size+=sizeof(GiSTlte)+sizeof(GiSTpage)+data[i]->CompressedLength();
128     totSize=Size+GIST_PAGE_HEADER_SIZE+sizeof(GiSTlte);
129     NumEntries=(int)(Store()->PageSize()*padFactor*n)/totSize;
130     // cout << "exp. size=" << totSize << endl;
131     if(totSize>Store()->PageSize()) { // we need to split the entries into several sub-trees
132     GiSTlist<char *> nameslist, othernameslist; // list of the sub-trees names
133     GiSTlist<MTentry *> plist, parentslist; // lists of the root entries of each sub-tree
134     GiSTlist<int> *lists=NULL; // list of entries for each sub-tree
135     GiSTlist<double> *dists=NULL; // list of distances for each sub-tree
136     char **trees; // array of the sub-trees names
137     GiSTlist<MTnode *> *oldList=new GiSTlist<MTnode *>;
138     GiSTpath path;
139     MTentry ***arrays; // array of the entries for each sub-tree
140     MTentry **parents; // array of the root entries for each sub-tree
141     MTnode *node=NULL;
142     GiSTlist<double *> *distm; // distance matrix
143     int s=(int)MAX(MIN(NumEntries, ceil(((float)n)/NumEntries)), NumEntries*MIN_UTIL); // initial number of samples
144     int j, nsamples, *samples=new int[s], *sizes=NULL, *ns=NULL, ncreated=0, minLevel=MAXINT, nInit, l, iters=0, MAXITER=s*s;
145     char newname[50];
146     BOOL *sampled=new BOOL[n]; // is this entry in the samples set?
147    
148     // cout << "NE*pF=" << NumEntries*padFactor << ", n/NE*pF=" << n/floor(NumEntries*padFactor) << ", s=" << s << endl;
149     distm=(GiSTlist<double *> *)calloc(s,sizeof(GiSTlist<double *>));
150     do { // sampling phase
151     iters++;
152     if(iters>1) { // this is a new sampling phase
153     // cout << "Re-sampling: " << iters << "/" << MAXITER << endl;
154     while(!lists[0].IsEmpty()) {
155     lists[0].RemoveFront();
156     dists[0].RemoveFront();
157     }
158     delete []lists;
159     delete []dists;
160     delete []sizes;
161     delete []ns;
162     while(!distm[0].IsEmpty()) delete []distm[0].RemoveFront(); // empty the distance list
163     for(i=1; i<s; i++) distm[i].front=distm[i].rear=NULL;
164     }
165     // if(iters>=MAXITER) minSize=0;
166     if(iters>=MAXITER) {
167     cout << "Too many loops in BulkLoad!\nPlease select a lower minimum node utilization or a bigger node size.\n";
168     exit(1);
169     }
170     nsamples=0;
171     for(i=0; i<n; i++) sampled[i]=FALSE;
172     // pick samples to create parents
173     while(nsamples<s) {
174     do i=PickRandom(0, n);
175     while(sampled[i]);
176     sampled[i]=TRUE;
177     samples[nsamples++]=i;
178     }
179     // cout << "Samples:\n";
180     // for(i=0; i<s; i++) cout << "\t" << i << ":\t" << data[samples[i]];
181     lists=new GiSTlist<int>[s];
182     dists=new GiSTlist<double>[s];
183     sizes=new int[s];
184     ns=new int[s];
185     for(i=0; i<s; i++) {
186     sizes[i]=GIST_PAGE_HEADER_SIZE+sizeof(GiSTlte);
187     ns[i]=1;
188     }
189     for(i=0; i<s; i++) distm[i].Prepend(new double[s]);
190     for(i=0; i<s; i++) { // compute the relative distances between samples
191     for(j=0; j<i; j++)
192     distm[j].front->entry[i]=(distm[i].front->entry[j]=data[samples[j]]->object().distance(data[samples[i]]->object()));
193     distm[i].front->entry[i]=0;
194     }
195     for(i=0; i<n; i++) { // assign each entry to its nearest parent
196     // cout << "Now assigning " << data[i];
197     if(sampled[i]) {
198     for(j=0; samples[j]!=i; j++);
199     lists[j].Prepend(i); // insert the entry in the right list
200     dists[j].Prepend(0);
201     sizes[j]+=addEntrySize+data[i]->CompressedLength();
202     // cout << "\tAssigned (0) to " << j << ", " << data[samples[j]];
203     }
204     else { // here we optimize the distance computations (like we do in the insert algorithm)
205     double *dist=new double[s];
206     int minindex=0;
207    
208     dist[0]=data[samples[0]]->object().distance(data[i]->object());
209     for(j=1; j<s; j++) {
210     BOOL cont=TRUE;
211    
212     dist[j]=-1;
213     if(fabs(data[samples[j]]->Key()->distance-data[i]->Key()->distance)>dist[minindex]) continue; // pruning for reference point (parent)
214     for(int k=0; (k<j)&&cont; k++) // pruning for reference points (other samples)
215     if(dist[k]<0) continue;
216     else cont=(fabs(dist[k]-distm[j].front->entry[k])<dist[minindex]);
217     if(!cont) continue;
218     dist[j]=data[samples[j]]->object().distance(data[i]->object()); // we have to compute this distance
219     if(dist[j]<dist[minindex]) minindex=j;
220     }
221     // cout << "\tAssigned (" << dist[minindex] << ") to " << minindex << ", " << data[samples[minindex]];
222     lists[minindex].Append(i);
223     dists[minindex].Append(dist[minindex]);
224     sizes[minindex]+=addEntrySize+data[i]->CompressedLength();
225     ns[minindex]++;
226     if(sizes[minindex]>=minSize) delete []dist;
227     else distm[minindex].Append(dist);
228     }
229     }
230     // redistribute underfilled parents
231     // cout << "Underfilled parents redistribution...\n";
232     while(sizes[i=FindMin(sizes, nsamples)]<minSize) {
233     GiSTlist<int> list=lists[i];
234     GiSTlist<double *> dlist=distm[i];
235    
236     while(!dists[i].IsEmpty()) dists[i].RemoveFront();
237     // cout << "Redistributing " << i << "' set (" << sizes[i] << "/" << minSize << ")...\n";
238     for(j=0; j<nsamples; j++)
239     for(GiSTlistnode<double *> *lnode=distm[j].front; lnode; lnode=lnode->next) lnode->entry[i]=lnode->entry[nsamples-1];
240     // substitute this set with last set
241     distm[i]=distm[nsamples-1];
242     lists[i]=lists[nsamples-1];
243     dists[i]=dists[nsamples-1];
244     samples[i]=samples[nsamples-1];
245     sizes[i]=sizes[nsamples-1];
246     ns[i]=ns[nsamples-1];
247     nsamples--;
248     while(!list.IsEmpty()) { // assign each entry to its nearest parent
249     double *d=dlist.RemoveFront();
250     int k=list.RemoveFront(), index, minindex=-1;
251     // cout << "Now assigning " << data[k];
252    
253     for(index=0; (index<nsamples)&&(minindex<0); index++) // search for a computed distance
254     if(d[index]>0) minindex=index;
255     if(minindex<0) { // no distance was computed (i.e. all distances were pruned)
256     d[0]=data[samples[0]]->object().distance(data[k]->object());
257     minindex=0;
258     }
259     for(index=0; index<nsamples; index++) {
260     BOOL cont=TRUE;
261    
262     if(index==minindex) continue;
263     if(d[index]<0) { // distance wasn't computed
264     if(fabs(data[samples[index]]->Key()->distance-data[k]->Key()->distance)>d[minindex]) continue; // pruning for reference point (parent)
265     for(l=0; (l<index)&&cont; l++) // pruning for reference points (other samples)
266     if(d[l]<0) continue;
267     else cont=(fabs(d[l]-distm[index].front->entry[l])<d[minindex]);
268     if(!cont) continue;
269     d[index]=data[samples[index]]->object().distance(data[k]->object()); // we have to compute this distance
270     }
271     if(d[index]<d[minindex]) minindex=index;
272     }
273     // cout << "\tAssigned (" << d[minindex] << ") to " << minindex << ", " << data[samples[minindex]];
274     lists[minindex].Append(k);
275     dists[minindex].Append(d[minindex]);
276     sizes[minindex]+=addEntrySize+data[k]->CompressedLength();
277     ns[minindex]++;
278     if(sizes[minindex]>=minSize) delete []d;
279     else distm[minindex].Append(d);
280     }
281     assert(dlist.IsEmpty());
282     }
283     } while(nsamples==1); // if there's only one child, repeat the sampling phase
284     // cout << "Samples:\n";
285     // for(i=0; i<nsamples; i++) cout << "\t" << i << ": " << data[samples[i]];
286     arrays=new MTentry **[nsamples];
287     for(i=0; i<nsamples; i++) { // convert the lists into arrays
288     arrays[i]=new MTentry *[ns[i]];
289     for(j=0; j<ns[i]; j++) {
290     int k=lists[i].RemoveFront();
291    
292     arrays[i][j]=(MTentry *)data[k]->Copy();
293     arrays[i][j]->Key()->distance=dists[i].RemoveFront();
294     }
295     assert(lists[i].IsEmpty());
296     assert(dists[i].IsEmpty());
297     }
298     delete []dists;
299     delete []lists;
300     for(i=0; i<nsamples; i++)
301     while(!distm[i].IsEmpty())
302     delete [](distm[i].RemoveFront());
303     free(distm);
304     // build an M-tree under each parent
305     nInit=nsamples;
306     // cout << "Now building " << nsamples << " sub-trees...\n";
307     MT *tree=new MT;
308    
309     // for(i=0; i<nsamples; i++) cout << i+1 << "' set: " << ns[i] << endl;
310     for(i=0; i<nInit; i++) {
311     MTnode *root;
312     GiSTpath path;
313    
314     sprintf(newname, "%s.%i", name, ++ncreated);
315     unlink(newname);
316     tree->Create(newname); // create the new sub-tree
317     // cout << "Now building sub-tree " << newname << " on " << ns[i] << " data (exp. size=" << sizes[i] << ")...\n";
318     tree->BulkLoad(arrays[i], ns[i], padFactor, newname); // insert the data into the sub-tree
319     // cout << "Tree level=" << tree->MaxLevel() << endl;
320     // if the root node is underfilled, we have to split the tree
321     path.MakeRoot();
322     root=(MTnode *)tree->ReadNode(path);
323     if(root->IsUnderFull(*Store())) {
324     GiSTlist<MTentry *> *roots=new GiSTlist<MTentry *>;
325     GiSTlist<char *> *list=tree->SplitTree(&ncreated, tree->MaxLevel()-1, roots, name); // split the tree
326    
327     nsamples--;
328     while(!list->IsEmpty()) { // insert all the new trees in the sub-trees list
329     MTentry *e=roots->RemoveFront();
330    
331     othernameslist.Append(list->RemoveFront());
332     for(j=0; j<n; j++) if(data[j]->object()==e->object()) { // append also the root entry to the parents list
333     // cout << "parent=" << data[j];
334     plist.Append(data[j]);
335     j=n;
336     }
337     delete e;
338     nsamples++;
339     }
340     delete list;
341     delete roots;
342     minLevel=MIN(minLevel, tree->MaxLevel()-1);
343     }
344     else {
345     char *tmp=new char[50];
346    
347     strcpy(tmp, newname);
348     othernameslist.Append(tmp);
349     plist.Append(data[samples[i]]);
350     minLevel=MIN(minLevel, tree->MaxLevel());
351     }
352     delete root;
353     tree->Close();
354     delete tree->Store(); // it was created in tree->Create()
355     }
356     for(i=0; i<nInit; i++) {
357     for(j=0; j<ns[i]; j++) delete arrays[i][j];
358     delete []arrays[i];
359     }
360     delete []arrays;
361     while(!plist.IsEmpty()) { // insert the trees in the list (splitting if necessary)
362     MTentry *parent=plist.RemoveFront();
363     char *tmp=othernameslist.RemoveFront();
364    
365     strcpy(newname, tmp);
366     delete []tmp;
367     tree->Open(newname);
368     // cout << ": Tree level=" << tree->MaxLevel() << " (" << minLevel << ")\n";
369     if(tree->MaxLevel()>minLevel) { // we have to split the tree to reduce its height
370     // cout << "level too high!!! (min=" << minLevel << ") Splitting the tree...\n";
371     GiSTlist<MTentry *> *roots=new GiSTlist<MTentry *>;
372     GiSTlist<char *> *list=tree->SplitTree(&ncreated, minLevel, roots, name); // split the tree
373    
374     nsamples--;
375     while(!list->IsEmpty()) { // insert all the new trees in the sub-trees list
376     MTentry *e=roots->RemoveFront();
377    
378     nameslist.Append(list->RemoveFront());
379     for(j=0; j<n; j++) if(data[j]->object()==e->object()) { // append also the root entry to the parents list
380     // cout << "parent=" << data[j];
381     parentslist.Append(data[j]);
382     j=n;
383     }
384     delete e;
385     nsamples++;
386     }
387     delete list;
388     delete roots;
389     }
390     else { // simply insert the tree and its root to the lists
391     char *tmp=new char[50];
392    
393     strcpy(tmp, newname);
394     nameslist.Append(tmp);
395     // cout << "parent=" << data[samples[i]];
396     parentslist.Append(parent);
397     }
398     tree->Close();
399     delete tree->Store(); // it was created in tree->Open()
400     }
401     parents=new MTentry *[nsamples];
402     trees=new char *[nsamples];
403     for(i=0; i<nsamples; i++) { // convert the lists into arrays
404     trees[i]=nameslist.RemoveFront();
405     parents[i]=parentslist.RemoveFront();
406     }
407     // build the super-tree upon the parents
408     sprintf(newname, "%s.0", name);
409     // cout << "Now building super-tree " << newname << " on " << nsamples << " data...\n";
410     BulkLoad(parents, nsamples, padFactor, newname);
411     // attach each sub-tree to the leaves of the super-tree
412     path.MakeRoot();
413     node=(MTnode *)ReadNode(path);
414     oldList->Append(node);
415     // cout << "super-tree built!\n";
416     l=node->Level();
417     while(l>0) { // build the leaves list for super-tree
418     GiSTlist<MTnode *> *newList=new GiSTlist<MTnode *>;
419    
420     while(!oldList->IsEmpty()) {
421     node=oldList->RemoveFront();
422     path=node->Path();
423     node->SetLevel(node->Level()+minLevel); // update level of the upper nodes of the super-tree
424     WriteNode(node);
425     for(i=0; i<node->NumEntries(); i++) {
426     MTentry *e=(MTentry *)(*node)[i].Ptr();
427    
428     path.MakeChild(e->Ptr());
429     newList->Append((MTnode *)ReadNode(path));
430     path.MakeParent();
431     }
432     delete node;
433     }
434     delete oldList;
435     oldList=newList;
436     l--;
437     }
438     // cout << "Finished " << newname << endl;
439     while(!oldList->IsEmpty()) { // attach each sub-tree to its leaf
440     GiSTpath rootpath;
441    
442     rootpath.MakeRoot();
443     node=oldList->RemoveFront(); // retrieve next leaf (root of sub tree)
444     node->SetLevel(minLevel); // update level of the root of the sub-tree
445     path=node->Path();
446     for(i=0; i<node->NumEntries(); i++) {
447     MTnode *newnode=(MTnode *)CreateNode();
448     MTentry *e=(MTentry *)(*node)[i].Ptr();
449     GiSTpath newpath;
450    
451     path.MakeChild(Store()->Allocate());
452     newnode->Path()=path;
453     e->SetPtr(path.Page());
454     path.MakeParent();
455     for(j=0; e->object()!=parents[j]->object(); j++); // search the tree to append
456     tree->Open(trees[j]);
457     // cout << "Now appending sub-tree " << trees[j] << endl;
458     Append(newnode, (MTnode *)tree->ReadNode(rootpath)); // append this sub-tree to the super-tree
459     tree->Close();
460     delete tree->Store(); // it was created in tree->Open()
461     newpath=newnode->Path();
462     delete newnode;
463     }
464     WriteNode(node);
465     delete node;
466     }
467     tree->Open(trees[0]); // in order to destroy the object tree
468     delete tree;
469     for(i=0; i<nsamples; i++) delete []trees[i];
470     delete []trees;
471     // update radii of the upper nodes of the result M-tree
472     path.MakeRoot();
473     node=(MTnode *)ReadNode(path);
474     oldList->Append(node);
475     l=node->Level();
476     while(l>=minLevel) { // build the list of the nodes which radii should be recomputed
477     GiSTlist<MTnode *> *newList=new GiSTlist<MTnode *>;
478    
479     while(!oldList->IsEmpty()) {
480    
481     node=oldList->RemoveFront();
482     path=node->Path();
483     for(i=0; i<node->NumEntries(); i++) {
484     MTentry *e=(MTentry *)(*node)[i].Ptr();
485    
486     path.MakeChild(e->Ptr());
487     newList->Append((MTnode *)ReadNode(path));
488     path.MakeParent();
489     }
490     delete node;
491     }
492     delete oldList;
493     oldList=newList;
494     l--;
495     }
496     while(!oldList->IsEmpty()) { // adjust the radii of the nodes
497     MTnode *node=oldList->RemoveFront();
498    
499     AdjKeys(node);
500     delete node;
501     }
502     // be tidy...
503     delete oldList;
504     delete []parents;
505     delete []sizes;
506     delete []ns;
507     delete []sampled;
508     delete []samples;
509     for(i=0; i<=ncreated; i++) { // delete all temporary sub-trees
510     sprintf(newname, "%s.%i", name, i);
511     unlink(newname);
512     }
513     }
514     else { // we can insert all the entries in a single node
515     GiSTpath path;
516     GiSTnode *node;
517    
518     path.MakeRoot();
519     node=ReadNode(path);
520     for(i=0; i<n; i++)
521     node->Insert(*(data[i]));
522     assert(!node->IsOverFull(*Store()));
523     // cout << "real size=" << node->Size() << endl;
524     WriteNode(node);
525     delete node;
526     }
527     }
528    
529     // append the sub-tree rooted at from to the node to
530     // this is used in the "append" phase of the BulkLoad algorithm
531     void
532     MT::Append(MTnode *to, MTnode *from)
533     {
534     GiSTlist<MTnode *> *oldList=new GiSTlist<MTnode *>; // list of the nodes to append
535     GiSTlist<GiSTpath> pathList;
536     MT *fromtree=(MT *)from->Tree();
537     MTnode *node=new MTnode, *newnode;
538    
539     // cout << "Appending " << from;
540     oldList->Append(from);
541     pathList.Append(to->Path());
542     do {
543     GiSTlist<MTnode *> *newList=new GiSTlist<MTnode *>;
544    
545     while(!oldList->IsEmpty()) {
546     GiSTpath newpath=pathList.RemoveFront();
547    
548     delete node;
549     node=oldList->RemoveFront();
550     newnode=(MTnode *)ReadNode(newpath);
551     // cout << "Inserting " << node->NumEntries() << " entries:\n";
552     for(int i=0; i<node->NumEntries(); i++) {
553     MTentry *e=(MTentry *)(*node)[i].Ptr()->Copy();
554    
555     if(node->Level()>0) { // if node isn't a leaf, we've to allocate its children
556     GiSTpath nodepath=node->Path();
557     MTnode *childnode=(MTnode *)CreateNode(), *fromnode;
558    
559     nodepath.MakeChild(e->Ptr());
560     fromnode=(MTnode *)fromtree->ReadNode(nodepath);
561     newList->Append(fromnode);
562     e->SetPtr(Store()->Allocate());
563     newpath.MakeChild(e->Ptr());
564     childnode->Path()=newpath;
565     childnode->SetTree(this);
566     WriteNode(childnode); // write the empty node
567     pathList.Append(newpath);
568     newpath.MakeParent();
569     nodepath.MakeParent();
570     delete childnode;
571     }
572     newnode->Insert(*e);
573     // cout << "\tInserted " << e;
574     delete e;
575     }
576     newnode->SetLevel(node->Level());
577     WriteNode(newnode); // write the node
578     // cout << "Created " << newnode;
579     delete newnode;
580     }
581     delete oldList;
582     oldList=newList;
583     } while(node->Level()>0); // until we reach the leaves' level
584     // cout << node;
585     delete node;
586     delete oldList;
587     }
588    
589     // adjust the keys of node
590     // this is used during the final phase of the BulkLoad algorithm
591     void
592     MT::AdjKeys(GiSTnode *node)
593     {
594     GiSTnode *P;
595     GiSTpath parent_path=node->Path();
596     GiSTentry *entry, *actual;
597    
598     if(node->Path().IsRoot()) return;
599     parent_path.MakeParent();
600     P=ReadNode(parent_path);
601     entry=P->SearchPtr(node->Path().Page());
602     assert(entry!=NULL);
603     actual=node->Union();
604     actual->SetPtr(node->Path().Page());
605     ((MTkey *)actual->Key())->distance=((MTkey *)entry->Key())->distance; // necessary to keep track of the distance from the parent
606     if(!entry->IsEqual(*actual)) { // replace this entry
607     int pos=entry->Position();
608    
609     P->DeleteEntry(pos);
610     P->Insert(*actual);
611     /* if(P->IsOverFull(*Store())) { // this code should be unnecessary (if we have fixed length entries)
612     GiSTpage page=node->Path().Page();
613    
614     Split(&P, *actual);
615     node->Path()=P->Path();
616     node->Path().MakeChild(page);
617     }
618     else {
619     WriteNode(P);
620     AdjKeys(P);
621     } */
622     WriteNode(P);
623     AdjKeys(P);
624     }
625     delete P;
626     delete actual;
627     delete entry;
628     }