]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/dirnode.py
dirnode.py: fix a bug in the no-write change for Adder, and improve test coverage...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / dirnode.py
1
2 import time, math
3
4 from zope.interface import implements
5 from twisted.internet import defer
6 from foolscap.api import fireEventually
7 import simplejson
8 from allmydata.mutable.common import NotWriteableError
9 from allmydata.mutable.filenode import MutableFileNode
10 from allmydata.unknown import UnknownNode, strip_prefix_for_ro
11 from allmydata.interfaces import IFilesystemNode, IDirectoryNode, IFileNode, \
12      IImmutableFileNode, IMutableFileNode, \
13      ExistingChildError, NoSuchChildError, ICheckable, IDeepCheckable, \
14      MustBeDeepImmutableError, CapConstraintError, ChildOfWrongTypeError
15 from allmydata.check_results import DeepCheckResults, \
16      DeepCheckAndRepairResults
17 from allmydata.monitor import Monitor
18 from allmydata.util import hashutil, mathutil, base32, log
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.netstring import netstring, split_netstring
21 from allmydata.util.consumer import download_to_data
22 from allmydata.uri import LiteralFileURI, from_string, wrap_dirnode_cap
23 from pycryptopp.cipher.aes import AES
24 from allmydata.util.dictutil import AuxValueDict
25
26
27 def update_metadata(metadata, new_metadata, now):
28     """Updates 'metadata' in-place with the information in 'new_metadata'.
29     Timestamps are set according to the time 'now'."""
30
31     if metadata is None:
32         metadata = {'ctime': now,
33                     'mtime': now,
34                     'tahoe': {
35                         'linkcrtime': now,
36                         'linkmotime': now,
37                         }
38                     }
39
40     if new_metadata is not None:
41         # Overwrite all metadata.
42         newmd = new_metadata.copy()
43
44         # Except 'tahoe'.
45         if 'tahoe' in newmd:
46             del newmd['tahoe']
47         if 'tahoe' in metadata:
48             newmd['tahoe'] = metadata['tahoe']
49
50         # For backwards compatibility with Tahoe < 1.4.0:
51         if 'ctime' not in newmd:
52             if 'ctime' in metadata:
53                 newmd['ctime'] = metadata['ctime']
54             else:
55                 newmd['ctime'] = now
56         if 'mtime' not in newmd:
57             newmd['mtime'] = now
58
59         metadata = newmd
60     else:
61         # For backwards compatibility with Tahoe < 1.4.0:
62         if 'ctime' not in metadata:
63             metadata['ctime'] = now
64         metadata['mtime'] = now
65
66     # update timestamps
67     sysmd = metadata.get('tahoe', {})
68     if 'linkcrtime' not in sysmd:
69         # In Tahoe < 1.4.0 we used the word 'ctime' to mean what Tahoe >= 1.4.0
70         # calls 'linkcrtime'.
71         assert 'ctime' in metadata
72         sysmd['linkcrtime'] = metadata['ctime']
73     sysmd['linkmotime'] = now
74     metadata['tahoe'] = sysmd
75
76     return metadata
77
78
79 # TODO: {Deleter,MetadataSetter,Adder}.modify all start by unpacking the
80 # contents and end by repacking them. It might be better to apply them to
81 # the unpacked contents.
82
83 class Deleter:
84     def __init__(self, node, name, must_exist=True, must_be_directory=False, must_be_file=False):
85         self.node = node
86         self.name = name
87         self.must_exist = must_exist
88         self.must_be_directory = must_be_directory
89         self.must_be_file = must_be_file
90
91     def modify(self, old_contents, servermap, first_time):
92         children = self.node._unpack_contents(old_contents)
93         if self.name not in children:
94             if first_time and self.must_exist:
95                 raise NoSuchChildError(self.name)
96             self.old_child = None
97             return None
98         self.old_child, metadata = children[self.name]
99
100         # Unknown children can be removed regardless of must_be_directory or must_be_file.
101         if self.must_be_directory and IFileNode.providedBy(self.old_child):
102             raise ChildOfWrongTypeError("delete required a directory, not a file")
103         if self.must_be_file and IDirectoryNode.providedBy(self.old_child):
104             raise ChildOfWrongTypeError("delete required a file, not a directory")
105
106         del children[self.name]
107         new_contents = self.node._pack_contents(children)
108         return new_contents
109
110
111 class MetadataSetter:
112     def __init__(self, node, name, metadata, create_readonly_node=None):
113         self.node = node
114         self.name = name
115         self.metadata = metadata
116         self.create_readonly_node = create_readonly_node
117
118     def modify(self, old_contents, servermap, first_time):
119         children = self.node._unpack_contents(old_contents)
120         name = self.name
121         if name not in children:
122             raise NoSuchChildError(name)
123
124         now = time.time()
125         child = children[name][0]
126
127         metadata = update_metadata(children[name][1].copy(), self.metadata, now)
128         if self.create_readonly_node and metadata.get('no-write', False):
129             child = self.create_readonly_node(child, name)
130
131         children[name] = (child, metadata)
132         new_contents = self.node._pack_contents(children)
133         return new_contents
134
135
136 class Adder:
137     def __init__(self, node, entries=None, overwrite=True, create_readonly_node=None):
138         self.node = node
139         if entries is None:
140             entries = {}
141         precondition(isinstance(entries, dict), entries)
142         self.entries = entries
143         self.overwrite = overwrite
144         self.create_readonly_node = create_readonly_node
145
146     def set_node(self, name, node, metadata):
147         precondition(isinstance(name, unicode), name)
148         precondition(IFilesystemNode.providedBy(node), node)
149         self.entries[name] = (node, metadata)
150
151     def modify(self, old_contents, servermap, first_time):
152         children = self.node._unpack_contents(old_contents)
153         now = time.time()
154         for (name, (child, new_metadata)) in self.entries.iteritems():
155             precondition(isinstance(name, unicode), name)
156             precondition(IFilesystemNode.providedBy(child), child)
157
158             # Strictly speaking this is redundant because we would raise the
159             # error again in pack_children.
160             child.raise_error()
161
162             metadata = None
163             if name in children:
164                 if not self.overwrite:
165                     raise ExistingChildError("child '%s' already exists" % name)
166
167                 if self.overwrite == "only-files" and IDirectoryNode.providedBy(children[name][0]):
168                     raise ExistingChildError("child '%s' already exists" % name)
169                 metadata = children[name][1].copy()
170
171             metadata = update_metadata(metadata, new_metadata, now)
172             if self.create_readonly_node and metadata.get('no-write', False):
173                 child = self.create_readonly_node(child, name)
174
175             children[name] = (child, metadata)
176         new_contents = self.node._pack_contents(children)
177         return new_contents
178
179 def _encrypt_rw_uri(filenode, rw_uri):
180     assert isinstance(rw_uri, str)
181     writekey = filenode.get_writekey()
182     if not writekey:
183         return ""
184     salt = hashutil.mutable_rwcap_salt_hash(rw_uri)
185     key = hashutil.mutable_rwcap_key_hash(salt, writekey)
186     cryptor = AES(key)
187     crypttext = cryptor.process(rw_uri)
188     mac = hashutil.hmac(key, salt + crypttext)
189     assert len(mac) == 32
190     return salt + crypttext + mac
191     # The MAC is not checked by readers in Tahoe >= 1.3.0, but we still
192     # produce it for the sake of older readers.
193
194 def pack_children(filenode, children, deep_immutable=False):
195     """Take a dict that maps:
196          children[unicode_name] = (IFileSystemNode, metadata_dict)
197     and pack it into a single string, for use as the contents of the backing
198     file. This is the same format as is returned by _unpack_contents. I also
199     accept an AuxValueDict, in which case I'll use the auxilliary cached data
200     as the pre-packed entry, which is faster than re-packing everything each
201     time.
202
203     If deep_immutable is True, I will require that all my children are deeply
204     immutable, and will raise a MustBeDeepImmutableError if not.
205     """
206
207     has_aux = isinstance(children, AuxValueDict)
208     entries = []
209     for name in sorted(children.keys()):
210         assert isinstance(name, unicode)
211         entry = None
212         (child, metadata) = children[name]
213         child.raise_error()
214         if deep_immutable and not child.is_allowed_in_immutable_directory():
215             raise MustBeDeepImmutableError("child '%s' is not allowed in an immutable directory" % (name,), name)
216         if has_aux:
217             entry = children.get_aux(name)
218         if not entry:
219             assert IFilesystemNode.providedBy(child), (name,child)
220             assert isinstance(metadata, dict)
221             rw_uri = child.get_write_uri()
222             if rw_uri is None:
223                 rw_uri = ""
224             assert isinstance(rw_uri, str), rw_uri
225             
226             # should be prevented by MustBeDeepImmutableError check above
227             assert not (rw_uri and deep_immutable)
228
229             ro_uri = child.get_readonly_uri()
230             if ro_uri is None:
231                 ro_uri = ""
232             assert isinstance(ro_uri, str), ro_uri
233             entry = "".join([netstring(name.encode("utf-8")),
234                              netstring(strip_prefix_for_ro(ro_uri, deep_immutable)),
235                              netstring(_encrypt_rw_uri(filenode, rw_uri)),
236                              netstring(simplejson.dumps(metadata))])
237         entries.append(netstring(entry))
238     return "".join(entries)
239
240 class DirectoryNode:
241     implements(IDirectoryNode, ICheckable, IDeepCheckable)
242     filenode_class = MutableFileNode
243
244     def __init__(self, filenode, nodemaker, uploader):
245         assert IFileNode.providedBy(filenode), filenode
246         assert not IDirectoryNode.providedBy(filenode), filenode
247         self._node = filenode
248         filenode_cap = filenode.get_cap()
249         self._uri = wrap_dirnode_cap(filenode_cap)
250         self._nodemaker = nodemaker
251         self._uploader = uploader
252
253     def __repr__(self):
254         return "<%s %s-%s %s>" % (self.__class__.__name__,
255                                   self.is_readonly() and "RO" or "RW",
256                                   self.is_mutable() and "MUT" or "IMM",
257                                   hasattr(self, '_uri') and self._uri.abbrev())
258
259     def get_size(self):
260         """Return the size of our backing mutable file, in bytes, if we've
261         fetched it. Otherwise return None. This returns synchronously."""
262         return self._node.get_size()
263
264     def get_current_size(self):
265         """Calculate the size of our backing mutable file, in bytes. Returns
266         a Deferred that fires with the result."""
267         return self._node.get_current_size()
268
269     def _read(self):
270         if self._node.is_mutable():
271             # use the IMutableFileNode API.
272             d = self._node.download_best_version()
273         else:
274             d = download_to_data(self._node)
275         d.addCallback(self._unpack_contents)
276         return d
277
278     def _decrypt_rwcapdata(self, encwrcap):
279         salt = encwrcap[:16]
280         crypttext = encwrcap[16:-32]
281         key = hashutil.mutable_rwcap_key_hash(salt, self._node.get_writekey())
282         cryptor = AES(key)
283         plaintext = cryptor.process(crypttext)
284         return plaintext
285
286     def _create_and_validate_node(self, rw_uri, ro_uri, name):
287         node = self._nodemaker.create_from_cap(rw_uri, ro_uri,
288                                                deep_immutable=not self.is_mutable(),
289                                                name=name)
290         node.raise_error()
291         return node
292
293     def _create_readonly_node(self, node, name):
294         if not node.is_unknown() and node.is_readonly():
295             return node
296         return self._create_and_validate_node(None, node.get_readonly_uri(), name=name)
297
298     def _unpack_contents(self, data):
299         # the directory is serialized as a list of netstrings, one per child.
300         # Each child is serialized as a list of four netstrings: (name, ro_uri,
301         # rwcapdata, metadata), in which the name, ro_uri, metadata are in
302         # cleartext. The 'name' is UTF-8 encoded. The rwcapdata is formatted as:
303         # pack("16ss32s", iv, AES(H(writekey+iv), plaintext_rw_uri), mac)
304         assert isinstance(data, str), (repr(data), type(data))
305         # an empty directory is serialized as an empty string
306         if data == "":
307             return AuxValueDict()
308         writeable = not self.is_readonly()
309         mutable = self.is_mutable()
310         children = AuxValueDict()
311         position = 0
312         while position < len(data):
313             entries, position = split_netstring(data, 1, position)
314             entry = entries[0]
315             (name_utf8, ro_uri, rwcapdata, metadata_s), subpos = split_netstring(entry, 4)
316             if not mutable and len(rwcapdata) > 0:
317                 raise ValueError("the rwcapdata field of a dirnode in an immutable directory was not empty")
318             name = name_utf8.decode("utf-8")
319             rw_uri = ""
320             if writeable:
321                 rw_uri = self._decrypt_rwcapdata(rwcapdata)
322
323             # Since the encryption uses CTR mode, it currently leaks the length of the
324             # plaintext rw_uri -- and therefore whether it is present, i.e. whether the
325             # dirnode is writeable (ticket #925). By stripping trailing spaces in
326             # Tahoe >= 1.6.0, we may make it easier for future versions to plug this leak.
327             # ro_uri is treated in the same way for consistency.
328             # rw_uri and ro_uri will be either None or a non-empty string.
329
330             rw_uri = rw_uri.rstrip(' ') or None
331             ro_uri = ro_uri.rstrip(' ') or None
332
333             try:
334                 child = self._create_and_validate_node(rw_uri, ro_uri, name)
335                 if mutable or child.is_allowed_in_immutable_directory():
336                     metadata = simplejson.loads(metadata_s)
337                     assert isinstance(metadata, dict)
338                     children[name] = (child, metadata)
339                     children.set_with_aux(name, (child, metadata), auxilliary=entry)
340                 else:
341                     log.msg(format="mutable cap for child '%(name)s' unpacked from an immutable directory",
342                                    name=name_utf8,
343                                    facility="tahoe.webish", level=log.UNUSUAL)
344             except CapConstraintError, e:
345                 log.msg(format="unmet constraint on cap for child '%(name)s' unpacked from a directory:\n"
346                                "%(message)s", message=e.args[0], name=name_utf8,
347                                facility="tahoe.webish", level=log.UNUSUAL)
348
349         return children
350
351     def _pack_contents(self, children):
352         # expects children in the same format as _unpack_contents
353         return pack_children(self._node, children)
354
355     def is_readonly(self):
356         return self._node.is_readonly()
357
358     def is_mutable(self):
359         return self._node.is_mutable()
360
361     def is_unknown(self):
362         return False
363
364     def is_allowed_in_immutable_directory(self):
365         return not self._node.is_mutable()
366
367     def raise_error(self):
368         pass
369
370     def get_uri(self):
371         return self._uri.to_string()
372
373     def get_write_uri(self):
374         if self.is_readonly():
375             return None
376         return self._uri.to_string()
377
378     def get_readonly_uri(self):
379         return self._uri.get_readonly().to_string()
380
381     def get_cap(self):
382         return self._uri
383
384     def get_readcap(self):
385         return self._uri.get_readonly()
386
387     def get_verify_cap(self):
388         return self._uri.get_verify_cap()
389
390     def get_repair_cap(self):
391         if self._node.is_readonly():
392             return None # readonly (mutable) dirnodes are not yet repairable
393         return self._uri
394
395     def get_storage_index(self):
396         return self._uri.get_storage_index()
397
398     def check(self, monitor, verify=False, add_lease=False):
399         """Perform a file check. See IChecker.check for details."""
400         return self._node.check(monitor, verify, add_lease)
401     def check_and_repair(self, monitor, verify=False, add_lease=False):
402         return self._node.check_and_repair(monitor, verify, add_lease)
403
404     def list(self):
405         """I return a Deferred that fires with a dictionary mapping child
406         name to a tuple of (IFilesystemNode, metadata)."""
407         return self._read()
408
409     def has_child(self, name):
410         """I return a Deferred that fires with a boolean, True if there
411         exists a child of the given name, False if not."""
412         assert isinstance(name, unicode)
413         d = self._read()
414         d.addCallback(lambda children: children.has_key(name))
415         return d
416
417     def _get(self, children, name):
418         child = children.get(name)
419         if child is None:
420             raise NoSuchChildError(name)
421         return child[0]
422
423     def _get_with_metadata(self, children, name):
424         child = children.get(name)
425         if child is None:
426             raise NoSuchChildError(name)
427         return child
428
429     def get(self, name):
430         """I return a Deferred that fires with the named child node,
431         which is an IFilesystemNode."""
432         assert isinstance(name, unicode)
433         d = self._read()
434         d.addCallback(self._get, name)
435         return d
436
437     def get_child_and_metadata(self, name):
438         """I return a Deferred that fires with the (node, metadata) pair for
439         the named child. The node is an IFilesystemNode, and the metadata
440         is a dictionary."""
441         assert isinstance(name, unicode)
442         d = self._read()
443         d.addCallback(self._get_with_metadata, name)
444         return d
445
446     def get_metadata_for(self, name):
447         assert isinstance(name, unicode)
448         d = self._read()
449         d.addCallback(lambda children: children[name][1])
450         return d
451
452     def set_metadata_for(self, name, metadata):
453         assert isinstance(name, unicode)
454         if self.is_readonly():
455             return defer.fail(NotWriteableError())
456         assert isinstance(metadata, dict)
457         s = MetadataSetter(self, name, metadata,
458                            create_readonly_node=self._create_readonly_node)
459         d = self._node.modify(s.modify)
460         d.addCallback(lambda res: self)
461         return d
462
463     def get_child_at_path(self, path):
464         """Transform a child path into an IFilesystemNode.
465
466         I perform a recursive series of 'get' operations to find the named
467         descendant node. I return a Deferred that fires with the node, or
468         errbacks with IndexError if the node could not be found.
469
470         The path can be either a single string (slash-separated) or a list of
471         path-name elements.
472         """
473         d = self.get_child_and_metadata_at_path(path)
474         d.addCallback(lambda (node, metadata): node)
475         return d
476
477     def get_child_and_metadata_at_path(self, path):
478         """Transform a child path into an IFilesystemNode and
479         a metadata dictionary from the last edge that was traversed.
480         """
481
482         if not path:
483             return defer.succeed((self, {}))
484         if isinstance(path, (list, tuple)):
485             pass
486         else:
487             path = path.split("/")
488         for p in path:
489             assert isinstance(p, unicode)
490         childname = path[0]
491         remaining_path = path[1:]
492         if remaining_path:
493             d = self.get(childname)
494             d.addCallback(lambda node:
495                           node.get_child_and_metadata_at_path(remaining_path))
496             return d
497         d = self.get_child_and_metadata(childname)
498         return d
499
500     def set_uri(self, name, writecap, readcap, metadata=None, overwrite=True):
501         precondition(isinstance(name, unicode), name)
502         precondition(isinstance(writecap, (str,type(None))), writecap)
503         precondition(isinstance(readcap, (str,type(None))), readcap)
504
505         # We now allow packing unknown nodes, provided they are valid
506         # for this type of directory.
507         child_node = self._create_and_validate_node(writecap, readcap, name)
508         d = self.set_node(name, child_node, metadata, overwrite)
509         d.addCallback(lambda res: child_node)
510         return d
511
512     def set_children(self, entries, overwrite=True):
513         # this takes URIs
514         a = Adder(self, overwrite=overwrite,
515                   create_readonly_node=self._create_readonly_node)
516         for (name, e) in entries.iteritems():
517             assert isinstance(name, unicode)
518             if len(e) == 2:
519                 writecap, readcap = e
520                 metadata = None
521             else:
522                 assert len(e) == 3
523                 writecap, readcap, metadata = e
524             precondition(isinstance(writecap, (str,type(None))), writecap)
525             precondition(isinstance(readcap, (str,type(None))), readcap)
526             
527             # We now allow packing unknown nodes, provided they are valid
528             # for this type of directory.
529             child_node = self._create_and_validate_node(writecap, readcap, name)
530             a.set_node(name, child_node, metadata)
531         d = self._node.modify(a.modify)
532         d.addCallback(lambda ign: self)
533         return d
534
535     def set_node(self, name, child, metadata=None, overwrite=True):
536         """I add a child at the specific name. I return a Deferred that fires
537         when the operation finishes. This Deferred will fire with the child
538         node that was just added. I will replace any existing child of the
539         same name.
540
541         If this directory node is read-only, the Deferred will errback with a
542         NotWriteableError."""
543
544         precondition(IFilesystemNode.providedBy(child), child)
545
546         if self.is_readonly():
547             return defer.fail(NotWriteableError())
548         assert isinstance(name, unicode)
549         assert IFilesystemNode.providedBy(child), child
550         a = Adder(self, overwrite=overwrite,
551                   create_readonly_node=self._create_readonly_node)
552         a.set_node(name, child, metadata)
553         d = self._node.modify(a.modify)
554         d.addCallback(lambda res: child)
555         return d
556
557     def set_nodes(self, entries, overwrite=True):
558         precondition(isinstance(entries, dict), entries)
559         if self.is_readonly():
560             return defer.fail(NotWriteableError())
561         a = Adder(self, entries, overwrite=overwrite,
562                   create_readonly_node=self._create_readonly_node)
563         d = self._node.modify(a.modify)
564         d.addCallback(lambda res: self)
565         return d
566
567
568     def add_file(self, name, uploadable, metadata=None, overwrite=True):
569         """I upload a file (using the given IUploadable), then attach the
570         resulting FileNode to the directory at the given name. I return a
571         Deferred that fires (with the IFileNode of the uploaded file) when
572         the operation completes."""
573         assert isinstance(name, unicode)
574         if self.is_readonly():
575             return defer.fail(NotWriteableError())
576         d = self._uploader.upload(uploadable)
577         d.addCallback(lambda results:
578                       self._create_and_validate_node(results.uri, None, name))
579         d.addCallback(lambda node:
580                       self.set_node(name, node, metadata, overwrite))
581         return d
582
583     def delete(self, name, must_exist=True, must_be_directory=False, must_be_file=False):
584         """I remove the child at the specific name. I return a Deferred that
585         fires (with the node just removed) when the operation finishes."""
586         assert isinstance(name, unicode)
587         if self.is_readonly():
588             return defer.fail(NotWriteableError())
589         deleter = Deleter(self, name, must_exist=must_exist,
590                           must_be_directory=must_be_directory, must_be_file=must_be_file)
591         d = self._node.modify(deleter.modify)
592         d.addCallback(lambda res: deleter.old_child)
593         return d
594
595     def create_subdirectory(self, name, initial_children={}, overwrite=True,
596                             mutable=True, metadata=None):
597         assert isinstance(name, unicode)
598         if self.is_readonly():
599             return defer.fail(NotWriteableError())
600         if mutable:
601             d = self._nodemaker.create_new_mutable_directory(initial_children)
602         else:
603             d = self._nodemaker.create_immutable_directory(initial_children)
604         def _created(child):
605             entries = {name: (child, metadata)}
606             a = Adder(self, entries, overwrite=overwrite,
607                       create_readonly_node=self._create_readonly_node)
608             d = self._node.modify(a.modify)
609             d.addCallback(lambda res: child)
610             return d
611         d.addCallback(_created)
612         return d
613
614     def move_child_to(self, current_child_name, new_parent,
615                       new_child_name=None, overwrite=True):
616         """I take one of my children and move them to a new parent. The child
617         is referenced by name. On the new parent, the child will live under
618         'new_child_name', which defaults to 'current_child_name'. I return a
619         Deferred that fires when the operation finishes."""
620         assert isinstance(current_child_name, unicode)
621         if self.is_readonly() or new_parent.is_readonly():
622             return defer.fail(NotWriteableError())
623         if new_child_name is None:
624             new_child_name = current_child_name
625         assert isinstance(new_child_name, unicode)
626         d = self.get(current_child_name)
627         def sn(child):
628             return new_parent.set_node(new_child_name, child,
629                                        overwrite=overwrite)
630         d.addCallback(sn)
631         d.addCallback(lambda child: self.delete(current_child_name))
632         return d
633
634
635     def deep_traverse(self, walker):
636         """Perform a recursive walk, using this dirnode as a root, notifying
637         the 'walker' instance of everything I encounter.
638
639         I call walker.enter_directory(parent, children) once for each dirnode
640         I visit, immediately after retrieving the list of children. I pass in
641         the parent dirnode and the dict of childname->(childnode,metadata).
642         This function should *not* traverse the children: I will do that.
643         enter_directory() is most useful for the deep-stats number that
644         counts how large a directory is.
645
646         I call walker.add_node(node, path) for each node (both files and
647         directories) I can reach. Most work should be done here.
648
649         I avoid loops by keeping track of verifier-caps and refusing to call
650         walker.add_node() or traverse a node that I've seen before. This
651         means that any file or directory will only be given to the walker
652         once. If files or directories are referenced multiple times by a
653         directory structure, this may appear to under-count or miss some of
654         them.
655
656         I return a Monitor which can be used to wait for the operation to
657         finish, learn about its progress, or cancel the operation.
658         """
659
660         # this is just a tree-walker, except that following each edge
661         # requires a Deferred. We used to use a ConcurrencyLimiter to limit
662         # fanout to 10 simultaneous operations, but the memory load of the
663         # queued operations was excessive (in one case, with 330k dirnodes,
664         # it caused the process to run into the 3.0GB-ish per-process 32bit
665         # linux memory limit, and crashed). So we use a single big Deferred
666         # chain, and do a strict depth-first traversal, one node at a time.
667         # This can be slower, because we aren't pipelining directory reads,
668         # but it brought the memory footprint down by roughly 50%.
669
670         monitor = Monitor()
671         walker.set_monitor(monitor)
672
673         found = set([self.get_verify_cap()])
674         d = self._deep_traverse_dirnode(self, [], walker, monitor, found)
675         d.addCallback(lambda ignored: walker.finish())
676         d.addBoth(monitor.finish)
677         d.addErrback(lambda f: None)
678
679         return monitor
680
681     def _deep_traverse_dirnode(self, node, path, walker, monitor, found):
682         # process this directory, then walk its children
683         monitor.raise_if_cancelled()
684         d = defer.maybeDeferred(walker.add_node, node, path)
685         d.addCallback(lambda ignored: node.list())
686         d.addCallback(self._deep_traverse_dirnode_children, node, path,
687                       walker, monitor, found)
688         return d
689
690     def _deep_traverse_dirnode_children(self, children, parent, path,
691                                         walker, monitor, found):
692         monitor.raise_if_cancelled()
693         d = defer.maybeDeferred(walker.enter_directory, parent, children)
694         # we process file-like children first, so we can drop their FileNode
695         # objects as quickly as possible. Tests suggest that a FileNode (held
696         # in the client's nodecache) consumes about 2440 bytes. dirnodes (not
697         # in the nodecache) seem to consume about 2000 bytes.
698         dirkids = []
699         filekids = []
700         for name, (child, metadata) in sorted(children.iteritems()):
701             childpath = path + [name]
702             if isinstance(child, UnknownNode):
703                 walker.add_node(child, childpath)
704                 continue
705             verifier = child.get_verify_cap()
706             # allow LIT files (for which verifier==None) to be processed
707             if (verifier is not None) and (verifier in found):
708                 continue
709             found.add(verifier)
710             if IDirectoryNode.providedBy(child):
711                 dirkids.append( (child, childpath) )
712             else:
713                 filekids.append( (child, childpath) )
714         for i, (child, childpath) in enumerate(filekids):
715             d.addCallback(lambda ignored, child=child, childpath=childpath:
716                           walker.add_node(child, childpath))
717             # to work around the Deferred tail-recursion problem
718             # (specifically the defer.succeed flavor) requires us to avoid
719             # doing more than 158 LIT files in a row. We insert a turn break
720             # once every 100 files (LIT or CHK) to preserve some stack space
721             # for other code. This is a different expression of the same
722             # Twisted problem as in #237.
723             if i % 100 == 99:
724                 d.addCallback(lambda ignored: fireEventually())
725         for (child, childpath) in dirkids:
726             d.addCallback(lambda ignored, child=child, childpath=childpath:
727                           self._deep_traverse_dirnode(child, childpath,
728                                                       walker, monitor,
729                                                       found))
730         return d
731
732
733     def build_manifest(self):
734         """Return a Monitor, with a ['status'] that will be a list of (path,
735         cap) tuples, for all nodes (directories and files) reachable from
736         this one."""
737         walker = ManifestWalker(self)
738         return self.deep_traverse(walker)
739
740     def start_deep_stats(self):
741         # Since deep_traverse tracks verifier caps, we avoid double-counting
742         # children for which we've got both a write-cap and a read-cap
743         return self.deep_traverse(DeepStats(self))
744
745     def start_deep_check(self, verify=False, add_lease=False):
746         return self.deep_traverse(DeepChecker(self, verify, repair=False, add_lease=add_lease))
747
748     def start_deep_check_and_repair(self, verify=False, add_lease=False):
749         return self.deep_traverse(DeepChecker(self, verify, repair=True, add_lease=add_lease))
750
751
752
753 class DeepStats:
754     def __init__(self, origin):
755         self.origin = origin
756         self.stats = {}
757         for k in ["count-immutable-files",
758                   "count-mutable-files",
759                   "count-literal-files",
760                   "count-files",
761                   "count-directories",
762                   "count-unknown",
763                   "size-immutable-files",
764                   #"size-mutable-files",
765                   "size-literal-files",
766                   "size-directories",
767                   "largest-directory",
768                   "largest-directory-children",
769                   "largest-immutable-file",
770                   #"largest-mutable-file",
771                   ]:
772             self.stats[k] = 0
773         self.histograms = {}
774         for k in ["size-files-histogram"]:
775             self.histograms[k] = {} # maps (min,max) to count
776         self.buckets = [ (0,0), (1,3)]
777         self.root = math.sqrt(10)
778
779     def set_monitor(self, monitor):
780         self.monitor = monitor
781         monitor.origin_si = self.origin.get_storage_index()
782         monitor.set_status(self.get_results())
783
784     def add_node(self, node, childpath):
785         if isinstance(node, UnknownNode):
786             self.add("count-unknown")
787         elif IDirectoryNode.providedBy(node):
788             self.add("count-directories")
789         elif IMutableFileNode.providedBy(node):
790             self.add("count-files")
791             self.add("count-mutable-files")
792             # TODO: update the servermap, compute a size, add it to
793             # size-mutable-files, max it into "largest-mutable-file"
794         elif IImmutableFileNode.providedBy(node): # CHK and LIT
795             self.add("count-files")
796             size = node.get_size()
797             self.histogram("size-files-histogram", size)
798             theuri = from_string(node.get_uri())
799             if isinstance(theuri, LiteralFileURI):
800                 self.add("count-literal-files")
801                 self.add("size-literal-files", size)
802             else:
803                 self.add("count-immutable-files")
804                 self.add("size-immutable-files", size)
805                 self.max("largest-immutable-file", size)
806
807     def enter_directory(self, parent, children):
808         dirsize_bytes = parent.get_size()
809         if dirsize_bytes is not None:
810             self.add("size-directories", dirsize_bytes)
811             self.max("largest-directory", dirsize_bytes)
812         dirsize_children = len(children)
813         self.max("largest-directory-children", dirsize_children)
814
815     def add(self, key, value=1):
816         self.stats[key] += value
817
818     def max(self, key, value):
819         self.stats[key] = max(self.stats[key], value)
820
821     def which_bucket(self, size):
822         # return (min,max) such that min <= size <= max
823         # values are from the set (0,0), (1,3), (4,10), (11,31), (32,100),
824         # (101,316), (317, 1000), etc: two per decade
825         assert size >= 0
826         i = 0
827         while True:
828             if i >= len(self.buckets):
829                 # extend the list
830                 new_lower = self.buckets[i-1][1]+1
831                 new_upper = int(mathutil.next_power_of_k(new_lower, self.root))
832                 self.buckets.append( (new_lower, new_upper) )
833             maybe = self.buckets[i]
834             if maybe[0] <= size <= maybe[1]:
835                 return maybe
836             i += 1
837
838     def histogram(self, key, size):
839         bucket = self.which_bucket(size)
840         h = self.histograms[key]
841         if bucket not in h:
842             h[bucket] = 0
843         h[bucket] += 1
844
845     def get_results(self):
846         stats = self.stats.copy()
847         for key in self.histograms:
848             h = self.histograms[key]
849             out = [ (bucket[0], bucket[1], h[bucket]) for bucket in h ]
850             out.sort()
851             stats[key] = out
852         return stats
853
854     def finish(self):
855         return self.get_results()
856
857 class ManifestWalker(DeepStats):
858     def __init__(self, origin):
859         DeepStats.__init__(self, origin)
860         self.manifest = []
861         self.storage_index_strings = set()
862         self.verifycaps = set()
863
864     def add_node(self, node, path):
865         self.manifest.append( (tuple(path), node.get_uri()) )
866         si = node.get_storage_index()
867         if si:
868             self.storage_index_strings.add(base32.b2a(si))
869         v = node.get_verify_cap()
870         if v:
871             self.verifycaps.add(v.to_string())
872         return DeepStats.add_node(self, node, path)
873
874     def get_results(self):
875         stats = DeepStats.get_results(self)
876         return {"manifest": self.manifest,
877                 "verifycaps": self.verifycaps,
878                 "storage-index": self.storage_index_strings,
879                 "stats": stats,
880                 }
881
882
883 class DeepChecker:
884     def __init__(self, root, verify, repair, add_lease):
885         root_si = root.get_storage_index()
886         if root_si:
887             root_si_base32 = base32.b2a(root_si)
888         else:
889             root_si_base32 = ""
890         self._lp = log.msg(format="deep-check starting (%(si)s),"
891                            " verify=%(verify)s, repair=%(repair)s",
892                            si=root_si_base32, verify=verify, repair=repair)
893         self._verify = verify
894         self._repair = repair
895         self._add_lease = add_lease
896         if repair:
897             self._results = DeepCheckAndRepairResults(root_si)
898         else:
899             self._results = DeepCheckResults(root_si)
900         self._stats = DeepStats(root)
901
902     def set_monitor(self, monitor):
903         self.monitor = monitor
904         monitor.set_status(self._results)
905
906     def add_node(self, node, childpath):
907         if self._repair:
908             d = node.check_and_repair(self.monitor, self._verify, self._add_lease)
909             d.addCallback(self._results.add_check_and_repair, childpath)
910         else:
911             d = node.check(self.monitor, self._verify, self._add_lease)
912             d.addCallback(self._results.add_check, childpath)
913         d.addCallback(lambda ignored: self._stats.add_node(node, childpath))
914         return d
915
916     def enter_directory(self, parent, children):
917         return self._stats.enter_directory(parent, children)
918
919     def finish(self):
920         log.msg("deep-check done", parent=self._lp)
921         self._results.update_stats(self._stats.get_results())
922         return self._results
923
924
925 # use client.create_dirnode() to make one of these
926
927