]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/dirnode.py
dirnode.py: Fix #1034 (MetadataSetter does not enforce restriction on setting 'tahoe...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / dirnode.py
1
2 import time, math
3
4 from zope.interface import implements
5 from twisted.internet import defer
6 from foolscap.api import fireEventually
7 import simplejson
8 from allmydata.mutable.common import NotWriteableError
9 from allmydata.mutable.filenode import MutableFileNode
10 from allmydata.unknown import UnknownNode, strip_prefix_for_ro
11 from allmydata.interfaces import IFilesystemNode, IDirectoryNode, IFileNode, \
12      IImmutableFileNode, IMutableFileNode, \
13      ExistingChildError, NoSuchChildError, ICheckable, IDeepCheckable, \
14      MustBeDeepImmutableError, CapConstraintError
15 from allmydata.check_results import DeepCheckResults, \
16      DeepCheckAndRepairResults
17 from allmydata.monitor import Monitor
18 from allmydata.util import hashutil, mathutil, base32, log
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.netstring import netstring, split_netstring
21 from allmydata.util.consumer import download_to_data
22 from allmydata.uri import LiteralFileURI, from_string, wrap_dirnode_cap
23 from pycryptopp.cipher.aes import AES
24 from allmydata.util.dictutil import AuxValueDict
25
26
27 def update_metadata(metadata, new_metadata, now):
28     """Updates 'metadata' in-place with the information in 'new_metadata'.
29     Timestamps are set according to the time 'now'."""
30
31     if metadata is None:
32         metadata = {"ctime": now,
33                     "mtime": now,
34                     "tahoe": {
35                         "linkcrtime": now,
36                         "linkmotime": now,
37                         }
38                     }
39
40     if new_metadata is not None:
41         # Overwrite all metadata.
42         newmd = new_metadata.copy()
43
44         # Except 'tahoe'.
45         if newmd.has_key('tahoe'):
46             del newmd['tahoe']
47         if metadata.has_key('tahoe'):
48             newmd['tahoe'] = metadata['tahoe']
49
50         metadata = newmd
51     else:
52         # For backwards compatibility with Tahoe < 1.4.0:
53         if "ctime" not in metadata:
54             metadata["ctime"] = now
55         metadata["mtime"] = now
56
57     # update timestamps
58     sysmd = metadata.get('tahoe', {})
59     if not 'linkcrtime' in sysmd:
60         if "ctime" in metadata:
61             # In Tahoe < 1.4.0 we used the word "ctime" to mean what Tahoe >= 1.4.0
62             # calls "linkcrtime".
63             sysmd["linkcrtime"] = metadata["ctime"]
64         else:
65             sysmd["linkcrtime"] = now
66     sysmd["linkmotime"] = now
67
68     return metadata
69
70
71 # TODO: {Deleter,MetadataSetter,Adder}.modify all start by unpacking the
72 # contents and end by repacking them. It might be better to apply them to
73 # the unpacked contents.
74
75 class Deleter:
76     def __init__(self, node, name, must_exist=True):
77         self.node = node
78         self.name = name
79         self.must_exist = True
80     def modify(self, old_contents, servermap, first_time):
81         children = self.node._unpack_contents(old_contents)
82         if self.name not in children:
83             if first_time and self.must_exist:
84                 raise NoSuchChildError(self.name)
85             self.old_child = None
86             return None
87         self.old_child, metadata = children[self.name]
88         del children[self.name]
89         new_contents = self.node._pack_contents(children)
90         return new_contents
91
92
93 class MetadataSetter:
94     def __init__(self, node, name, metadata, create_readonly_node=None):
95         self.node = node
96         self.name = name
97         self.metadata = metadata
98         self.create_readonly_node = create_readonly_node
99
100     def modify(self, old_contents, servermap, first_time):
101         children = self.node._unpack_contents(old_contents)
102         name = self.name
103         if name not in children:
104             raise NoSuchChildError(name)
105
106         now = time.time()
107         metadata = update_metadata(children[name][1].copy(), self.metadata, now)
108         child = children[name][0]
109         if self.create_readonly_node and metadata and metadata.get('no-write', False):
110             child = self.create_readonly_node(child, name)
111
112         children[name] = (child, metadata)
113         new_contents = self.node._pack_contents(children)
114         return new_contents
115
116
117 class Adder:
118     def __init__(self, node, entries=None, overwrite=True, create_readonly_node=None):
119         self.node = node
120         if entries is None:
121             entries = {}
122         precondition(isinstance(entries, dict), entries)
123         self.entries = entries
124         self.overwrite = overwrite
125         self.create_readonly_node = create_readonly_node
126
127     def set_node(self, name, node, metadata):
128         precondition(isinstance(name, unicode), name)
129         precondition(IFilesystemNode.providedBy(node), node)
130         self.entries[name] = (node, metadata)
131
132     def modify(self, old_contents, servermap, first_time):
133         children = self.node._unpack_contents(old_contents)
134         now = time.time()
135         for (name, (child, new_metadata)) in self.entries.iteritems():
136             precondition(isinstance(name, unicode), name)
137             precondition(IFilesystemNode.providedBy(child), child)
138
139             # Strictly speaking this is redundant because we would raise the
140             # error again in pack_children.
141             child.raise_error()
142
143             metadata = None
144             if name in children:
145                 if not self.overwrite:
146                     raise ExistingChildError("child '%s' already exists" % name)
147
148                 if self.overwrite == "only-files" and IDirectoryNode.providedBy(children[name][0]):
149                     raise ExistingChildError("child '%s' already exists" % name)
150                 metadata = children[name][1].copy()
151
152             if self.create_readonly_node and metadata and metadata.get('no-write', False):
153                 child = self.create_readonly_node(child, name)
154
155             children[name] = (child, update_metadata(metadata, new_metadata, now))
156         new_contents = self.node._pack_contents(children)
157         return new_contents
158
159 def _encrypt_rw_uri(filenode, rw_uri):
160     assert isinstance(rw_uri, str)
161     writekey = filenode.get_writekey()
162     if not writekey:
163         return ""
164     salt = hashutil.mutable_rwcap_salt_hash(rw_uri)
165     key = hashutil.mutable_rwcap_key_hash(salt, writekey)
166     cryptor = AES(key)
167     crypttext = cryptor.process(rw_uri)
168     mac = hashutil.hmac(key, salt + crypttext)
169     assert len(mac) == 32
170     return salt + crypttext + mac
171     # The MAC is not checked by readers in Tahoe >= 1.3.0, but we still
172     # produce it for the sake of older readers.
173
174 def pack_children(filenode, children, deep_immutable=False):
175     """Take a dict that maps:
176          children[unicode_name] = (IFileSystemNode, metadata_dict)
177     and pack it into a single string, for use as the contents of the backing
178     file. This is the same format as is returned by _unpack_contents. I also
179     accept an AuxValueDict, in which case I'll use the auxilliary cached data
180     as the pre-packed entry, which is faster than re-packing everything each
181     time.
182
183     If deep_immutable is True, I will require that all my children are deeply
184     immutable, and will raise a MustBeDeepImmutableError if not.
185     """
186
187     has_aux = isinstance(children, AuxValueDict)
188     entries = []
189     for name in sorted(children.keys()):
190         assert isinstance(name, unicode)
191         entry = None
192         (child, metadata) = children[name]
193         child.raise_error()
194         if deep_immutable and not child.is_allowed_in_immutable_directory():
195             raise MustBeDeepImmutableError("child '%s' is not allowed in an immutable directory" % (name,), name)
196         if has_aux:
197             entry = children.get_aux(name)
198         if not entry:
199             assert IFilesystemNode.providedBy(child), (name,child)
200             assert isinstance(metadata, dict)
201             rw_uri = child.get_write_uri()
202             if rw_uri is None:
203                 rw_uri = ""
204             assert isinstance(rw_uri, str), rw_uri
205             
206             # should be prevented by MustBeDeepImmutableError check above
207             assert not (rw_uri and deep_immutable)
208
209             ro_uri = child.get_readonly_uri()
210             if ro_uri is None:
211                 ro_uri = ""
212             assert isinstance(ro_uri, str), ro_uri
213             entry = "".join([netstring(name.encode("utf-8")),
214                              netstring(strip_prefix_for_ro(ro_uri, deep_immutable)),
215                              netstring(_encrypt_rw_uri(filenode, rw_uri)),
216                              netstring(simplejson.dumps(metadata))])
217         entries.append(netstring(entry))
218     return "".join(entries)
219
220 class DirectoryNode:
221     implements(IDirectoryNode, ICheckable, IDeepCheckable)
222     filenode_class = MutableFileNode
223
224     def __init__(self, filenode, nodemaker, uploader):
225         assert IFileNode.providedBy(filenode), filenode
226         assert not IDirectoryNode.providedBy(filenode), filenode
227         self._node = filenode
228         filenode_cap = filenode.get_cap()
229         self._uri = wrap_dirnode_cap(filenode_cap)
230         self._nodemaker = nodemaker
231         self._uploader = uploader
232
233     def __repr__(self):
234         return "<%s %s-%s %s>" % (self.__class__.__name__,
235                                   self.is_readonly() and "RO" or "RW",
236                                   self.is_mutable() and "MUT" or "IMM",
237                                   hasattr(self, '_uri') and self._uri.abbrev())
238
239     def get_size(self):
240         """Return the size of our backing mutable file, in bytes, if we've
241         fetched it. Otherwise return None. This returns synchronously."""
242         return self._node.get_size()
243
244     def get_current_size(self):
245         """Calculate the size of our backing mutable file, in bytes. Returns
246         a Deferred that fires with the result."""
247         return self._node.get_current_size()
248
249     def _read(self):
250         if self._node.is_mutable():
251             # use the IMutableFileNode API.
252             d = self._node.download_best_version()
253         else:
254             d = download_to_data(self._node)
255         d.addCallback(self._unpack_contents)
256         return d
257
258     def _decrypt_rwcapdata(self, encwrcap):
259         salt = encwrcap[:16]
260         crypttext = encwrcap[16:-32]
261         key = hashutil.mutable_rwcap_key_hash(salt, self._node.get_writekey())
262         cryptor = AES(key)
263         plaintext = cryptor.process(crypttext)
264         return plaintext
265
266     def _create_and_validate_node(self, rw_uri, ro_uri, name):
267         node = self._nodemaker.create_from_cap(rw_uri, ro_uri,
268                                                deep_immutable=not self.is_mutable(),
269                                                name=name)
270         node.raise_error()
271         return node
272
273     def _create_readonly_node(self, node, name):
274         if not node.is_unknown() and node.is_readonly():
275             return node
276         return self._create_and_validate_node(None, node.get_readonly_uri(), name=name)
277
278     def _unpack_contents(self, data):
279         # the directory is serialized as a list of netstrings, one per child.
280         # Each child is serialized as a list of four netstrings: (name, ro_uri,
281         # rwcapdata, metadata), in which the name, ro_uri, metadata are in
282         # cleartext. The 'name' is UTF-8 encoded. The rwcapdata is formatted as:
283         # pack("16ss32s", iv, AES(H(writekey+iv), plaintext_rw_uri), mac)
284         assert isinstance(data, str), (repr(data), type(data))
285         # an empty directory is serialized as an empty string
286         if data == "":
287             return AuxValueDict()
288         writeable = not self.is_readonly()
289         mutable = self.is_mutable()
290         children = AuxValueDict()
291         position = 0
292         while position < len(data):
293             entries, position = split_netstring(data, 1, position)
294             entry = entries[0]
295             (name_utf8, ro_uri, rwcapdata, metadata_s), subpos = split_netstring(entry, 4)
296             if not mutable and len(rwcapdata) > 0:
297                 raise ValueError("the rwcapdata field of a dirnode in an immutable directory was not empty")
298             name = name_utf8.decode("utf-8")
299             rw_uri = ""
300             if writeable:
301                 rw_uri = self._decrypt_rwcapdata(rwcapdata)
302
303             # Since the encryption uses CTR mode, it currently leaks the length of the
304             # plaintext rw_uri -- and therefore whether it is present, i.e. whether the
305             # dirnode is writeable (ticket #925). By stripping trailing spaces in
306             # Tahoe >= 1.6.0, we may make it easier for future versions to plug this leak.
307             # ro_uri is treated in the same way for consistency.
308             # rw_uri and ro_uri will be either None or a non-empty string.
309
310             rw_uri = rw_uri.rstrip(' ') or None
311             ro_uri = ro_uri.rstrip(' ') or None
312
313             try:
314                 child = self._create_and_validate_node(rw_uri, ro_uri, name)
315                 if mutable or child.is_allowed_in_immutable_directory():
316                     metadata = simplejson.loads(metadata_s)
317                     assert isinstance(metadata, dict)
318                     children[name] = (child, metadata)
319                     children.set_with_aux(name, (child, metadata), auxilliary=entry)
320                 else:
321                     log.msg(format="mutable cap for child '%(name)s' unpacked from an immutable directory",
322                                    name=name_utf8,
323                                    facility="tahoe.webish", level=log.UNUSUAL)
324             except CapConstraintError, e:
325                 log.msg(format="unmet constraint on cap for child '%(name)s' unpacked from a directory:\n"
326                                "%(message)s", message=e.args[0], name=name_utf8,
327                                facility="tahoe.webish", level=log.UNUSUAL)
328
329         return children
330
331     def _pack_contents(self, children):
332         # expects children in the same format as _unpack_contents
333         return pack_children(self._node, children)
334
335     def is_readonly(self):
336         return self._node.is_readonly()
337
338     def is_mutable(self):
339         return self._node.is_mutable()
340
341     def is_unknown(self):
342         return False
343
344     def is_allowed_in_immutable_directory(self):
345         return not self._node.is_mutable()
346
347     def raise_error(self):
348         pass
349
350     def get_uri(self):
351         return self._uri.to_string()
352
353     def get_write_uri(self):
354         if self.is_readonly():
355             return None
356         return self._uri.to_string()
357
358     def get_readonly_uri(self):
359         return self._uri.get_readonly().to_string()
360
361     def get_cap(self):
362         return self._uri
363
364     def get_readcap(self):
365         return self._uri.get_readonly()
366
367     def get_verify_cap(self):
368         return self._uri.get_verify_cap()
369
370     def get_repair_cap(self):
371         if self._node.is_readonly():
372             return None # readonly (mutable) dirnodes are not yet repairable
373         return self._uri
374
375     def get_storage_index(self):
376         return self._uri.get_storage_index()
377
378     def check(self, monitor, verify=False, add_lease=False):
379         """Perform a file check. See IChecker.check for details."""
380         return self._node.check(monitor, verify, add_lease)
381     def check_and_repair(self, monitor, verify=False, add_lease=False):
382         return self._node.check_and_repair(monitor, verify, add_lease)
383
384     def list(self):
385         """I return a Deferred that fires with a dictionary mapping child
386         name to a tuple of (IFilesystemNode, metadata)."""
387         return self._read()
388
389     def has_child(self, name):
390         """I return a Deferred that fires with a boolean, True if there
391         exists a child of the given name, False if not."""
392         assert isinstance(name, unicode)
393         d = self._read()
394         d.addCallback(lambda children: children.has_key(name))
395         return d
396
397     def _get(self, children, name):
398         child = children.get(name)
399         if child is None:
400             raise NoSuchChildError(name)
401         return child[0]
402
403     def _get_with_metadata(self, children, name):
404         child = children.get(name)
405         if child is None:
406             raise NoSuchChildError(name)
407         return child
408
409     def get(self, name):
410         """I return a Deferred that fires with the named child node,
411         which is an IFilesystemNode."""
412         assert isinstance(name, unicode)
413         d = self._read()
414         d.addCallback(self._get, name)
415         return d
416
417     def get_child_and_metadata(self, name):
418         """I return a Deferred that fires with the (node, metadata) pair for
419         the named child. The node is an IFilesystemNode, and the metadata
420         is a dictionary."""
421         assert isinstance(name, unicode)
422         d = self._read()
423         d.addCallback(self._get_with_metadata, name)
424         return d
425
426     def get_metadata_for(self, name):
427         assert isinstance(name, unicode)
428         d = self._read()
429         d.addCallback(lambda children: children[name][1])
430         return d
431
432     def set_metadata_for(self, name, metadata):
433         assert isinstance(name, unicode)
434         if self.is_readonly():
435             return defer.fail(NotWriteableError())
436         assert isinstance(metadata, dict)
437         s = MetadataSetter(self, name, metadata,
438                            create_readonly_node=self._create_readonly_node)
439         d = self._node.modify(s.modify)
440         d.addCallback(lambda res: self)
441         return d
442
443     def get_child_at_path(self, path):
444         """Transform a child path into an IFilesystemNode.
445
446         I perform a recursive series of 'get' operations to find the named
447         descendant node. I return a Deferred that fires with the node, or
448         errbacks with IndexError if the node could not be found.
449
450         The path can be either a single string (slash-separated) or a list of
451         path-name elements.
452         """
453         d = self.get_child_and_metadata_at_path(path)
454         d.addCallback(lambda (node, metadata): node)
455         return d
456
457     def get_child_and_metadata_at_path(self, path):
458         """Transform a child path into an IFilesystemNode and
459         a metadata dictionary from the last edge that was traversed.
460         """
461
462         if not path:
463             return defer.succeed((self, {}))
464         if isinstance(path, (list, tuple)):
465             pass
466         else:
467             path = path.split("/")
468         for p in path:
469             assert isinstance(p, unicode)
470         childname = path[0]
471         remaining_path = path[1:]
472         if remaining_path:
473             d = self.get(childname)
474             d.addCallback(lambda node:
475                           node.get_child_and_metadata_at_path(remaining_path))
476             return d
477         d = self.get_child_and_metadata(childname)
478         return d
479
480     def set_uri(self, name, writecap, readcap, metadata=None, overwrite=True):
481         precondition(isinstance(name, unicode), name)
482         precondition(isinstance(writecap, (str,type(None))), writecap)
483         precondition(isinstance(readcap, (str,type(None))), readcap)
484
485         # We now allow packing unknown nodes, provided they are valid
486         # for this type of directory.
487         child_node = self._create_and_validate_node(writecap, readcap, name)
488         d = self.set_node(name, child_node, metadata, overwrite)
489         d.addCallback(lambda res: child_node)
490         return d
491
492     def set_children(self, entries, overwrite=True):
493         # this takes URIs
494         a = Adder(self, overwrite=overwrite,
495                   create_readonly_node=self._create_readonly_node)
496         for (name, e) in entries.iteritems():
497             assert isinstance(name, unicode)
498             if len(e) == 2:
499                 writecap, readcap = e
500                 metadata = None
501             else:
502                 assert len(e) == 3
503                 writecap, readcap, metadata = e
504             precondition(isinstance(writecap, (str,type(None))), writecap)
505             precondition(isinstance(readcap, (str,type(None))), readcap)
506             
507             # We now allow packing unknown nodes, provided they are valid
508             # for this type of directory.
509             child_node = self._create_and_validate_node(writecap, readcap, name)
510             a.set_node(name, child_node, metadata)
511         d = self._node.modify(a.modify)
512         d.addCallback(lambda ign: self)
513         return d
514
515     def set_node(self, name, child, metadata=None, overwrite=True):
516         """I add a child at the specific name. I return a Deferred that fires
517         when the operation finishes. This Deferred will fire with the child
518         node that was just added. I will replace any existing child of the
519         same name.
520
521         If this directory node is read-only, the Deferred will errback with a
522         NotWriteableError."""
523
524         precondition(IFilesystemNode.providedBy(child), child)
525
526         if self.is_readonly():
527             return defer.fail(NotWriteableError())
528         assert isinstance(name, unicode)
529         assert IFilesystemNode.providedBy(child), child
530         a = Adder(self, overwrite=overwrite,
531                   create_readonly_node=self._create_readonly_node)
532         a.set_node(name, child, metadata)
533         d = self._node.modify(a.modify)
534         d.addCallback(lambda res: child)
535         return d
536
537     def set_nodes(self, entries, overwrite=True):
538         precondition(isinstance(entries, dict), entries)
539         if self.is_readonly():
540             return defer.fail(NotWriteableError())
541         a = Adder(self, entries, overwrite=overwrite,
542                   create_readonly_node=self._create_readonly_node)
543         d = self._node.modify(a.modify)
544         d.addCallback(lambda res: self)
545         return d
546
547
548     def add_file(self, name, uploadable, metadata=None, overwrite=True):
549         """I upload a file (using the given IUploadable), then attach the
550         resulting FileNode to the directory at the given name. I return a
551         Deferred that fires (with the IFileNode of the uploaded file) when
552         the operation completes."""
553         assert isinstance(name, unicode)
554         if self.is_readonly():
555             return defer.fail(NotWriteableError())
556         d = self._uploader.upload(uploadable)
557         d.addCallback(lambda results:
558                       self._create_and_validate_node(results.uri, None, name))
559         d.addCallback(lambda node:
560                       self.set_node(name, node, metadata, overwrite))
561         return d
562
563     def delete(self, name):
564         """I remove the child at the specific name. I return a Deferred that
565         fires (with the node just removed) when the operation finishes."""
566         assert isinstance(name, unicode)
567         if self.is_readonly():
568             return defer.fail(NotWriteableError())
569         deleter = Deleter(self, name)
570         d = self._node.modify(deleter.modify)
571         d.addCallback(lambda res: deleter.old_child)
572         return d
573
574     def create_subdirectory(self, name, initial_children={}, overwrite=True,
575                             mutable=True):
576         assert isinstance(name, unicode)
577         if self.is_readonly():
578             return defer.fail(NotWriteableError())
579         if mutable:
580             d = self._nodemaker.create_new_mutable_directory(initial_children)
581         else:
582             d = self._nodemaker.create_immutable_directory(initial_children)
583         def _created(child):
584             entries = {name: (child, None)}
585             a = Adder(self, entries, overwrite=overwrite,
586                       create_readonly_node=self._create_readonly_node)
587             d = self._node.modify(a.modify)
588             d.addCallback(lambda res: child)
589             return d
590         d.addCallback(_created)
591         return d
592
593     def move_child_to(self, current_child_name, new_parent,
594                       new_child_name=None, overwrite=True):
595         """I take one of my children and move them to a new parent. The child
596         is referenced by name. On the new parent, the child will live under
597         'new_child_name', which defaults to 'current_child_name'. I return a
598         Deferred that fires when the operation finishes."""
599         assert isinstance(current_child_name, unicode)
600         if self.is_readonly() or new_parent.is_readonly():
601             return defer.fail(NotWriteableError())
602         if new_child_name is None:
603             new_child_name = current_child_name
604         assert isinstance(new_child_name, unicode)
605         d = self.get(current_child_name)
606         def sn(child):
607             return new_parent.set_node(new_child_name, child,
608                                        overwrite=overwrite)
609         d.addCallback(sn)
610         d.addCallback(lambda child: self.delete(current_child_name))
611         return d
612
613
614     def deep_traverse(self, walker):
615         """Perform a recursive walk, using this dirnode as a root, notifying
616         the 'walker' instance of everything I encounter.
617
618         I call walker.enter_directory(parent, children) once for each dirnode
619         I visit, immediately after retrieving the list of children. I pass in
620         the parent dirnode and the dict of childname->(childnode,metadata).
621         This function should *not* traverse the children: I will do that.
622         enter_directory() is most useful for the deep-stats number that
623         counts how large a directory is.
624
625         I call walker.add_node(node, path) for each node (both files and
626         directories) I can reach. Most work should be done here.
627
628         I avoid loops by keeping track of verifier-caps and refusing to call
629         walker.add_node() or traverse a node that I've seen before. This
630         means that any file or directory will only be given to the walker
631         once. If files or directories are referenced multiple times by a
632         directory structure, this may appear to under-count or miss some of
633         them.
634
635         I return a Monitor which can be used to wait for the operation to
636         finish, learn about its progress, or cancel the operation.
637         """
638
639         # this is just a tree-walker, except that following each edge
640         # requires a Deferred. We used to use a ConcurrencyLimiter to limit
641         # fanout to 10 simultaneous operations, but the memory load of the
642         # queued operations was excessive (in one case, with 330k dirnodes,
643         # it caused the process to run into the 3.0GB-ish per-process 32bit
644         # linux memory limit, and crashed). So we use a single big Deferred
645         # chain, and do a strict depth-first traversal, one node at a time.
646         # This can be slower, because we aren't pipelining directory reads,
647         # but it brought the memory footprint down by roughly 50%.
648
649         monitor = Monitor()
650         walker.set_monitor(monitor)
651
652         found = set([self.get_verify_cap()])
653         d = self._deep_traverse_dirnode(self, [], walker, monitor, found)
654         d.addCallback(lambda ignored: walker.finish())
655         d.addBoth(monitor.finish)
656         d.addErrback(lambda f: None)
657
658         return monitor
659
660     def _deep_traverse_dirnode(self, node, path, walker, monitor, found):
661         # process this directory, then walk its children
662         monitor.raise_if_cancelled()
663         d = defer.maybeDeferred(walker.add_node, node, path)
664         d.addCallback(lambda ignored: node.list())
665         d.addCallback(self._deep_traverse_dirnode_children, node, path,
666                       walker, monitor, found)
667         return d
668
669     def _deep_traverse_dirnode_children(self, children, parent, path,
670                                         walker, monitor, found):
671         monitor.raise_if_cancelled()
672         d = defer.maybeDeferred(walker.enter_directory, parent, children)
673         # we process file-like children first, so we can drop their FileNode
674         # objects as quickly as possible. Tests suggest that a FileNode (held
675         # in the client's nodecache) consumes about 2440 bytes. dirnodes (not
676         # in the nodecache) seem to consume about 2000 bytes.
677         dirkids = []
678         filekids = []
679         for name, (child, metadata) in sorted(children.iteritems()):
680             childpath = path + [name]
681             if isinstance(child, UnknownNode):
682                 walker.add_node(child, childpath)
683                 continue
684             verifier = child.get_verify_cap()
685             # allow LIT files (for which verifier==None) to be processed
686             if (verifier is not None) and (verifier in found):
687                 continue
688             found.add(verifier)
689             if IDirectoryNode.providedBy(child):
690                 dirkids.append( (child, childpath) )
691             else:
692                 filekids.append( (child, childpath) )
693         for i, (child, childpath) in enumerate(filekids):
694             d.addCallback(lambda ignored, child=child, childpath=childpath:
695                           walker.add_node(child, childpath))
696             # to work around the Deferred tail-recursion problem
697             # (specifically the defer.succeed flavor) requires us to avoid
698             # doing more than 158 LIT files in a row. We insert a turn break
699             # once every 100 files (LIT or CHK) to preserve some stack space
700             # for other code. This is a different expression of the same
701             # Twisted problem as in #237.
702             if i % 100 == 99:
703                 d.addCallback(lambda ignored: fireEventually())
704         for (child, childpath) in dirkids:
705             d.addCallback(lambda ignored, child=child, childpath=childpath:
706                           self._deep_traverse_dirnode(child, childpath,
707                                                       walker, monitor,
708                                                       found))
709         return d
710
711
712     def build_manifest(self):
713         """Return a Monitor, with a ['status'] that will be a list of (path,
714         cap) tuples, for all nodes (directories and files) reachable from
715         this one."""
716         walker = ManifestWalker(self)
717         return self.deep_traverse(walker)
718
719     def start_deep_stats(self):
720         # Since deep_traverse tracks verifier caps, we avoid double-counting
721         # children for which we've got both a write-cap and a read-cap
722         return self.deep_traverse(DeepStats(self))
723
724     def start_deep_check(self, verify=False, add_lease=False):
725         return self.deep_traverse(DeepChecker(self, verify, repair=False, add_lease=add_lease))
726
727     def start_deep_check_and_repair(self, verify=False, add_lease=False):
728         return self.deep_traverse(DeepChecker(self, verify, repair=True, add_lease=add_lease))
729
730
731
732 class DeepStats:
733     def __init__(self, origin):
734         self.origin = origin
735         self.stats = {}
736         for k in ["count-immutable-files",
737                   "count-mutable-files",
738                   "count-literal-files",
739                   "count-files",
740                   "count-directories",
741                   "count-unknown",
742                   "size-immutable-files",
743                   #"size-mutable-files",
744                   "size-literal-files",
745                   "size-directories",
746                   "largest-directory",
747                   "largest-directory-children",
748                   "largest-immutable-file",
749                   #"largest-mutable-file",
750                   ]:
751             self.stats[k] = 0
752         self.histograms = {}
753         for k in ["size-files-histogram"]:
754             self.histograms[k] = {} # maps (min,max) to count
755         self.buckets = [ (0,0), (1,3)]
756         self.root = math.sqrt(10)
757
758     def set_monitor(self, monitor):
759         self.monitor = monitor
760         monitor.origin_si = self.origin.get_storage_index()
761         monitor.set_status(self.get_results())
762
763     def add_node(self, node, childpath):
764         if isinstance(node, UnknownNode):
765             self.add("count-unknown")
766         elif IDirectoryNode.providedBy(node):
767             self.add("count-directories")
768         elif IMutableFileNode.providedBy(node):
769             self.add("count-files")
770             self.add("count-mutable-files")
771             # TODO: update the servermap, compute a size, add it to
772             # size-mutable-files, max it into "largest-mutable-file"
773         elif IImmutableFileNode.providedBy(node): # CHK and LIT
774             self.add("count-files")
775             size = node.get_size()
776             self.histogram("size-files-histogram", size)
777             theuri = from_string(node.get_uri())
778             if isinstance(theuri, LiteralFileURI):
779                 self.add("count-literal-files")
780                 self.add("size-literal-files", size)
781             else:
782                 self.add("count-immutable-files")
783                 self.add("size-immutable-files", size)
784                 self.max("largest-immutable-file", size)
785
786     def enter_directory(self, parent, children):
787         dirsize_bytes = parent.get_size()
788         if dirsize_bytes is not None:
789             self.add("size-directories", dirsize_bytes)
790             self.max("largest-directory", dirsize_bytes)
791         dirsize_children = len(children)
792         self.max("largest-directory-children", dirsize_children)
793
794     def add(self, key, value=1):
795         self.stats[key] += value
796
797     def max(self, key, value):
798         self.stats[key] = max(self.stats[key], value)
799
800     def which_bucket(self, size):
801         # return (min,max) such that min <= size <= max
802         # values are from the set (0,0), (1,3), (4,10), (11,31), (32,100),
803         # (101,316), (317, 1000), etc: two per decade
804         assert size >= 0
805         i = 0
806         while True:
807             if i >= len(self.buckets):
808                 # extend the list
809                 new_lower = self.buckets[i-1][1]+1
810                 new_upper = int(mathutil.next_power_of_k(new_lower, self.root))
811                 self.buckets.append( (new_lower, new_upper) )
812             maybe = self.buckets[i]
813             if maybe[0] <= size <= maybe[1]:
814                 return maybe
815             i += 1
816
817     def histogram(self, key, size):
818         bucket = self.which_bucket(size)
819         h = self.histograms[key]
820         if bucket not in h:
821             h[bucket] = 0
822         h[bucket] += 1
823
824     def get_results(self):
825         stats = self.stats.copy()
826         for key in self.histograms:
827             h = self.histograms[key]
828             out = [ (bucket[0], bucket[1], h[bucket]) for bucket in h ]
829             out.sort()
830             stats[key] = out
831         return stats
832
833     def finish(self):
834         return self.get_results()
835
836 class ManifestWalker(DeepStats):
837     def __init__(self, origin):
838         DeepStats.__init__(self, origin)
839         self.manifest = []
840         self.storage_index_strings = set()
841         self.verifycaps = set()
842
843     def add_node(self, node, path):
844         self.manifest.append( (tuple(path), node.get_uri()) )
845         si = node.get_storage_index()
846         if si:
847             self.storage_index_strings.add(base32.b2a(si))
848         v = node.get_verify_cap()
849         if v:
850             self.verifycaps.add(v.to_string())
851         return DeepStats.add_node(self, node, path)
852
853     def get_results(self):
854         stats = DeepStats.get_results(self)
855         return {"manifest": self.manifest,
856                 "verifycaps": self.verifycaps,
857                 "storage-index": self.storage_index_strings,
858                 "stats": stats,
859                 }
860
861
862 class DeepChecker:
863     def __init__(self, root, verify, repair, add_lease):
864         root_si = root.get_storage_index()
865         if root_si:
866             root_si_base32 = base32.b2a(root_si)
867         else:
868             root_si_base32 = ""
869         self._lp = log.msg(format="deep-check starting (%(si)s),"
870                            " verify=%(verify)s, repair=%(repair)s",
871                            si=root_si_base32, verify=verify, repair=repair)
872         self._verify = verify
873         self._repair = repair
874         self._add_lease = add_lease
875         if repair:
876             self._results = DeepCheckAndRepairResults(root_si)
877         else:
878             self._results = DeepCheckResults(root_si)
879         self._stats = DeepStats(root)
880
881     def set_monitor(self, monitor):
882         self.monitor = monitor
883         monitor.set_status(self._results)
884
885     def add_node(self, node, childpath):
886         if self._repair:
887             d = node.check_and_repair(self.monitor, self._verify, self._add_lease)
888             d.addCallback(self._results.add_check_and_repair, childpath)
889         else:
890             d = node.check(self.monitor, self._verify, self._add_lease)
891             d.addCallback(self._results.add_check, childpath)
892         d.addCallback(lambda ignored: self._stats.add_node(node, childpath))
893         return d
894
895     def enter_directory(self, parent, children):
896         return self._stats.enter_directory(parent, children)
897
898     def finish(self):
899         log.msg("deep-check done", parent=self._lp)
900         self._results.update_stats(self._stats.get_results())
901         return self._results
902
903
904 # use client.create_dirnode() to make one of these
905
906