]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/dirnode.py
Rename stringutils to encodingutil, and drop listdir_unicode and open_unicode (since...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / dirnode.py
1
2 import time, math, unicodedata
3
4 from zope.interface import implements
5 from twisted.internet import defer
6 from foolscap.api import fireEventually
7 import simplejson
8 from allmydata.mutable.common import NotWriteableError
9 from allmydata.mutable.filenode import MutableFileNode
10 from allmydata.unknown import UnknownNode, strip_prefix_for_ro
11 from allmydata.interfaces import IFilesystemNode, IDirectoryNode, IFileNode, \
12      IImmutableFileNode, IMutableFileNode, \
13      ExistingChildError, NoSuchChildError, ICheckable, IDeepCheckable, \
14      MustBeDeepImmutableError, CapConstraintError, ChildOfWrongTypeError
15 from allmydata.check_results import DeepCheckResults, \
16      DeepCheckAndRepairResults
17 from allmydata.monitor import Monitor
18 from allmydata.util import hashutil, mathutil, base32, log
19 from allmydata.util.encodingutil import quote_output
20 from allmydata.util.assertutil import precondition
21 from allmydata.util.netstring import netstring, split_netstring
22 from allmydata.util.consumer import download_to_data
23 from allmydata.uri import LiteralFileURI, from_string, wrap_dirnode_cap
24 from pycryptopp.cipher.aes import AES
25 from allmydata.util.dictutil import AuxValueDict
26
27
28 def update_metadata(metadata, new_metadata, now):
29     """Updates 'metadata' in-place with the information in 'new_metadata'.
30     Timestamps are set according to the time 'now'."""
31
32     if metadata is None:
33         metadata = {}
34
35     old_ctime = None
36     if 'ctime' in metadata:
37         old_ctime = metadata['ctime']
38
39     if new_metadata is not None:
40         # Overwrite all metadata.
41         newmd = new_metadata.copy()
42
43         # Except 'tahoe'.
44         if 'tahoe' in newmd:
45             del newmd['tahoe']
46         if 'tahoe' in metadata:
47             newmd['tahoe'] = metadata['tahoe']
48
49         metadata = newmd
50
51     # update timestamps
52     sysmd = metadata.get('tahoe', {})
53     if 'linkcrtime' not in sysmd:
54         # In Tahoe < 1.4.0 we used the word 'ctime' to mean what Tahoe >= 1.4.0
55         # calls 'linkcrtime'. This field is only used if it was in the old metadata,
56         # and 'tahoe:linkcrtime' was not.
57         if old_ctime is not None:
58             sysmd['linkcrtime'] = old_ctime
59         else:
60             sysmd['linkcrtime'] = now
61
62     sysmd['linkmotime'] = now
63     metadata['tahoe'] = sysmd
64
65     return metadata
66
67
68 # 'x' at the end of a variable name indicates that it holds a Unicode string that may not
69 # be NFC-normalized.
70
71 def normalize(namex):
72     return unicodedata.normalize('NFC', namex)
73
74 # TODO: {Deleter,MetadataSetter,Adder}.modify all start by unpacking the
75 # contents and end by repacking them. It might be better to apply them to
76 # the unpacked contents.
77
78 class Deleter:
79     def __init__(self, node, namex, must_exist=True, must_be_directory=False, must_be_file=False):
80         self.node = node
81         self.name = normalize(namex)
82         self.must_exist = must_exist
83         self.must_be_directory = must_be_directory
84         self.must_be_file = must_be_file
85
86     def modify(self, old_contents, servermap, first_time):
87         children = self.node._unpack_contents(old_contents)
88         if self.name not in children:
89             if first_time and self.must_exist:
90                 raise NoSuchChildError(self.name)
91             self.old_child = None
92             return None
93         self.old_child, metadata = children[self.name]
94
95         # Unknown children can be removed regardless of must_be_directory or must_be_file.
96         if self.must_be_directory and IFileNode.providedBy(self.old_child):
97             raise ChildOfWrongTypeError("delete required a directory, not a file")
98         if self.must_be_file and IDirectoryNode.providedBy(self.old_child):
99             raise ChildOfWrongTypeError("delete required a file, not a directory")
100
101         del children[self.name]
102         new_contents = self.node._pack_contents(children)
103         return new_contents
104
105
106 class MetadataSetter:
107     def __init__(self, node, namex, metadata, create_readonly_node=None):
108         self.node = node
109         self.name = normalize(namex)
110         self.metadata = metadata
111         self.create_readonly_node = create_readonly_node
112
113     def modify(self, old_contents, servermap, first_time):
114         children = self.node._unpack_contents(old_contents)
115         name = self.name
116         if name not in children:
117             raise NoSuchChildError(name)
118
119         now = time.time()
120         child = children[name][0]
121
122         metadata = update_metadata(children[name][1].copy(), self.metadata, now)
123         if self.create_readonly_node and metadata.get('no-write', False):
124             child = self.create_readonly_node(child, name)
125
126         children[name] = (child, metadata)
127         new_contents = self.node._pack_contents(children)
128         return new_contents
129
130
131 class Adder:
132     def __init__(self, node, entries=None, overwrite=True, create_readonly_node=None):
133         self.node = node
134         if entries is None:
135             entries = {}
136         precondition(isinstance(entries, dict), entries)
137         # keys of 'entries' may not be normalized.
138         self.entries = entries
139         self.overwrite = overwrite
140         self.create_readonly_node = create_readonly_node
141
142     def set_node(self, namex, node, metadata):
143         precondition(IFilesystemNode.providedBy(node), node)
144         self.entries[namex] = (node, metadata)
145
146     def modify(self, old_contents, servermap, first_time):
147         children = self.node._unpack_contents(old_contents)
148         now = time.time()
149         for (namex, (child, new_metadata)) in self.entries.iteritems():
150             name = normalize(namex)
151             precondition(IFilesystemNode.providedBy(child), child)
152
153             # Strictly speaking this is redundant because we would raise the
154             # error again in _pack_normalized_children.
155             child.raise_error()
156
157             metadata = None
158             if name in children:
159                 if not self.overwrite:
160                     raise ExistingChildError("child %s already exists" % quote_output(name, encoding='utf-8'))
161
162                 if self.overwrite == "only-files" and IDirectoryNode.providedBy(children[name][0]):
163                     raise ExistingChildError("child %s already exists" % quote_output(name, encoding='utf-8'))
164                 metadata = children[name][1].copy()
165
166             metadata = update_metadata(metadata, new_metadata, now)
167             if self.create_readonly_node and metadata.get('no-write', False):
168                 child = self.create_readonly_node(child, name)
169
170             children[name] = (child, metadata)
171         new_contents = self.node._pack_contents(children)
172         return new_contents
173
174
175 def _encrypt_rw_uri(filenode, rw_uri):
176     assert isinstance(rw_uri, str)
177     writekey = filenode.get_writekey()
178     if not writekey:
179         return ""
180     salt = hashutil.mutable_rwcap_salt_hash(rw_uri)
181     key = hashutil.mutable_rwcap_key_hash(salt, writekey)
182     cryptor = AES(key)
183     crypttext = cryptor.process(rw_uri)
184     mac = hashutil.hmac(key, salt + crypttext)
185     assert len(mac) == 32
186     return salt + crypttext + mac
187     # The MAC is not checked by readers in Tahoe >= 1.3.0, but we still
188     # produce it for the sake of older readers.
189
190
191 def pack_children(filenode, childrenx, deep_immutable=False):
192     # initial_children must have metadata (i.e. {} instead of None)
193     children = {}
194     for (namex, (node, metadata)) in childrenx.iteritems():
195         precondition(isinstance(metadata, dict),
196                      "directory creation requires metadata to be a dict, not None", metadata)
197         children[normalize(namex)] = (node, metadata)
198
199     return _pack_normalized_children(filenode, children, deep_immutable=deep_immutable)
200
201
202 def _pack_normalized_children(filenode, children, deep_immutable=False):
203     """Take a dict that maps:
204          children[unicode_nfc_name] = (IFileSystemNode, metadata_dict)
205     and pack it into a single string, for use as the contents of the backing
206     file. This is the same format as is returned by _unpack_contents. I also
207     accept an AuxValueDict, in which case I'll use the auxilliary cached data
208     as the pre-packed entry, which is faster than re-packing everything each
209     time.
210
211     If deep_immutable is True, I will require that all my children are deeply
212     immutable, and will raise a MustBeDeepImmutableError if not.
213     """
214
215     has_aux = isinstance(children, AuxValueDict)
216     entries = []
217     for name in sorted(children.keys()):
218         assert isinstance(name, unicode)
219         entry = None
220         (child, metadata) = children[name]
221         child.raise_error()
222         if deep_immutable and not child.is_allowed_in_immutable_directory():
223             raise MustBeDeepImmutableError("child %s is not allowed in an immutable directory" %
224                                            quote_output(name, encoding='utf-8'), name)
225         if has_aux:
226             entry = children.get_aux(name)
227         if not entry:
228             assert IFilesystemNode.providedBy(child), (name,child)
229             assert isinstance(metadata, dict)
230             rw_uri = child.get_write_uri()
231             if rw_uri is None:
232                 rw_uri = ""
233             assert isinstance(rw_uri, str), rw_uri
234             
235             # should be prevented by MustBeDeepImmutableError check above
236             assert not (rw_uri and deep_immutable)
237
238             ro_uri = child.get_readonly_uri()
239             if ro_uri is None:
240                 ro_uri = ""
241             assert isinstance(ro_uri, str), ro_uri
242             entry = "".join([netstring(name.encode("utf-8")),
243                              netstring(strip_prefix_for_ro(ro_uri, deep_immutable)),
244                              netstring(_encrypt_rw_uri(filenode, rw_uri)),
245                              netstring(simplejson.dumps(metadata))])
246         entries.append(netstring(entry))
247     return "".join(entries)
248
249 class DirectoryNode:
250     implements(IDirectoryNode, ICheckable, IDeepCheckable)
251     filenode_class = MutableFileNode
252
253     def __init__(self, filenode, nodemaker, uploader):
254         assert IFileNode.providedBy(filenode), filenode
255         assert not IDirectoryNode.providedBy(filenode), filenode
256         self._node = filenode
257         filenode_cap = filenode.get_cap()
258         self._uri = wrap_dirnode_cap(filenode_cap)
259         self._nodemaker = nodemaker
260         self._uploader = uploader
261
262     def __repr__(self):
263         return "<%s %s-%s %s>" % (self.__class__.__name__,
264                                   self.is_readonly() and "RO" or "RW",
265                                   self.is_mutable() and "MUT" or "IMM",
266                                   hasattr(self, '_uri') and self._uri.abbrev())
267
268     def get_size(self):
269         """Return the size of our backing mutable file, in bytes, if we've
270         fetched it. Otherwise return None. This returns synchronously."""
271         return self._node.get_size()
272
273     def get_current_size(self):
274         """Calculate the size of our backing mutable file, in bytes. Returns
275         a Deferred that fires with the result."""
276         return self._node.get_current_size()
277
278     def _read(self):
279         if self._node.is_mutable():
280             # use the IMutableFileNode API.
281             d = self._node.download_best_version()
282         else:
283             d = download_to_data(self._node)
284         d.addCallback(self._unpack_contents)
285         return d
286
287     def _decrypt_rwcapdata(self, encwrcap):
288         salt = encwrcap[:16]
289         crypttext = encwrcap[16:-32]
290         key = hashutil.mutable_rwcap_key_hash(salt, self._node.get_writekey())
291         cryptor = AES(key)
292         plaintext = cryptor.process(crypttext)
293         return plaintext
294
295     def _create_and_validate_node(self, rw_uri, ro_uri, name):
296         # name is just for error reporting
297         node = self._nodemaker.create_from_cap(rw_uri, ro_uri,
298                                                deep_immutable=not self.is_mutable(),
299                                                name=name)
300         node.raise_error()
301         return node
302
303     def _create_readonly_node(self, node, name):
304         # name is just for error reporting
305         if not node.is_unknown() and node.is_readonly():
306             return node
307         return self._create_and_validate_node(None, node.get_readonly_uri(), name=name)
308
309     def _unpack_contents(self, data):
310         # the directory is serialized as a list of netstrings, one per child.
311         # Each child is serialized as a list of four netstrings: (name, ro_uri,
312         # rwcapdata, metadata), in which the name, ro_uri, metadata are in
313         # cleartext. The 'name' is UTF-8 encoded, and should be normalized to NFC.
314         # The rwcapdata is formatted as:
315         # pack("16ss32s", iv, AES(H(writekey+iv), plaintext_rw_uri), mac)
316         assert isinstance(data, str), (repr(data), type(data))
317         # an empty directory is serialized as an empty string
318         if data == "":
319             return AuxValueDict()
320         writeable = not self.is_readonly()
321         mutable = self.is_mutable()
322         children = AuxValueDict()
323         position = 0
324         while position < len(data):
325             entries, position = split_netstring(data, 1, position)
326             entry = entries[0]
327             (namex_utf8, ro_uri, rwcapdata, metadata_s), subpos = split_netstring(entry, 4)
328             if not mutable and len(rwcapdata) > 0:
329                 raise ValueError("the rwcapdata field of a dirnode in an immutable directory was not empty")
330
331             # A name containing characters that are unassigned in one version of Unicode might
332             # not be normalized wrt a later version. See the note in section 'Normalization Stability'
333             # at <http://unicode.org/policies/stability_policy.html>.
334             # Therefore we normalize names going both in and out of directories.
335             name = normalize(namex_utf8.decode("utf-8"))
336
337             rw_uri = ""
338             if writeable:
339                 rw_uri = self._decrypt_rwcapdata(rwcapdata)
340
341             # Since the encryption uses CTR mode, it currently leaks the length of the
342             # plaintext rw_uri -- and therefore whether it is present, i.e. whether the
343             # dirnode is writeable (ticket #925). By stripping trailing spaces in
344             # Tahoe >= 1.6.0, we may make it easier for future versions to plug this leak.
345             # ro_uri is treated in the same way for consistency.
346             # rw_uri and ro_uri will be either None or a non-empty string.
347
348             rw_uri = rw_uri.rstrip(' ') or None
349             ro_uri = ro_uri.rstrip(' ') or None
350
351             try:
352                 child = self._create_and_validate_node(rw_uri, ro_uri, name)
353                 if mutable or child.is_allowed_in_immutable_directory():
354                     metadata = simplejson.loads(metadata_s)
355                     assert isinstance(metadata, dict)
356                     children[name] = (child, metadata)
357                     children.set_with_aux(name, (child, metadata), auxilliary=entry)
358                 else:
359                     log.msg(format="mutable cap for child %(name)s unpacked from an immutable directory",
360                                    name=quote_output(name, encoding='utf-8'),
361                                    facility="tahoe.webish", level=log.UNUSUAL)
362             except CapConstraintError, e:
363                 log.msg(format="unmet constraint on cap for child %(name)s unpacked from a directory:\n"
364                                "%(message)s", message=e.args[0], name=quote_output(name, encoding='utf-8'),
365                                facility="tahoe.webish", level=log.UNUSUAL)
366
367         return children
368
369     def _pack_contents(self, children):
370         # expects children in the same format as _unpack_contents returns
371         return _pack_normalized_children(self._node, children)
372
373     def is_readonly(self):
374         return self._node.is_readonly()
375
376     def is_mutable(self):
377         return self._node.is_mutable()
378
379     def is_unknown(self):
380         return False
381
382     def is_allowed_in_immutable_directory(self):
383         return not self._node.is_mutable()
384
385     def raise_error(self):
386         pass
387
388     def get_uri(self):
389         return self._uri.to_string()
390
391     def get_write_uri(self):
392         if self.is_readonly():
393             return None
394         return self._uri.to_string()
395
396     def get_readonly_uri(self):
397         return self._uri.get_readonly().to_string()
398
399     def get_cap(self):
400         return self._uri
401
402     def get_readcap(self):
403         return self._uri.get_readonly()
404
405     def get_verify_cap(self):
406         return self._uri.get_verify_cap()
407
408     def get_repair_cap(self):
409         if self._node.is_readonly():
410             return None # readonly (mutable) dirnodes are not yet repairable
411         return self._uri
412
413     def get_storage_index(self):
414         return self._uri.get_storage_index()
415
416     def check(self, monitor, verify=False, add_lease=False):
417         """Perform a file check. See IChecker.check for details."""
418         return self._node.check(monitor, verify, add_lease)
419     def check_and_repair(self, monitor, verify=False, add_lease=False):
420         return self._node.check_and_repair(monitor, verify, add_lease)
421
422     def list(self):
423         """I return a Deferred that fires with a dictionary mapping child
424         name to a tuple of (IFilesystemNode, metadata)."""
425         return self._read()
426
427     def has_child(self, namex):
428         """I return a Deferred that fires with a boolean, True if there
429         exists a child of the given name, False if not."""
430         name = normalize(namex)
431         d = self._read()
432         d.addCallback(lambda children: children.has_key(name))
433         return d
434
435     def _get(self, children, name):
436         child = children.get(name)
437         if child is None:
438             raise NoSuchChildError(name)
439         return child[0]
440
441     def _get_with_metadata(self, children, name):
442         child = children.get(name)
443         if child is None:
444             raise NoSuchChildError(name)
445         return child
446
447     def get(self, namex):
448         """I return a Deferred that fires with the named child node,
449         which is an IFilesystemNode."""
450         name = normalize(namex)
451         d = self._read()
452         d.addCallback(self._get, name)
453         return d
454
455     def get_child_and_metadata(self, namex):
456         """I return a Deferred that fires with the (node, metadata) pair for
457         the named child. The node is an IFilesystemNode, and the metadata
458         is a dictionary."""
459         name = normalize(namex)
460         d = self._read()
461         d.addCallback(self._get_with_metadata, name)
462         return d
463
464     def get_metadata_for(self, namex):
465         name = normalize(namex)
466         d = self._read()
467         d.addCallback(lambda children: children[name][1])
468         return d
469
470     def set_metadata_for(self, namex, metadata):
471         name = normalize(namex)
472         if self.is_readonly():
473             return defer.fail(NotWriteableError())
474         assert isinstance(metadata, dict)
475         s = MetadataSetter(self, name, metadata,
476                            create_readonly_node=self._create_readonly_node)
477         d = self._node.modify(s.modify)
478         d.addCallback(lambda res: self)
479         return d
480
481     def get_child_at_path(self, pathx):
482         """Transform a child path into an IFilesystemNode.
483
484         I perform a recursive series of 'get' operations to find the named
485         descendant node. I return a Deferred that fires with the node, or
486         errbacks with IndexError if the node could not be found.
487
488         The path can be either a single string (slash-separated) or a list of
489         path-name elements.
490         """
491         d = self.get_child_and_metadata_at_path(pathx)
492         d.addCallback(lambda (node, metadata): node)
493         return d
494
495     def get_child_and_metadata_at_path(self, pathx):
496         """Transform a child path into an IFilesystemNode and
497         a metadata dictionary from the last edge that was traversed.
498         """
499
500         if not pathx:
501             return defer.succeed((self, {}))
502         if isinstance(pathx, (list, tuple)):
503             pass
504         else:
505             pathx = pathx.split("/")
506         for p in pathx:
507             assert isinstance(p, unicode), p
508         childnamex = pathx[0]
509         remaining_pathx = pathx[1:]
510         if remaining_pathx:
511             d = self.get(childnamex)
512             d.addCallback(lambda node:
513                           node.get_child_and_metadata_at_path(remaining_pathx))
514             return d
515         d = self.get_child_and_metadata(childnamex)
516         return d
517
518     def set_uri(self, namex, writecap, readcap, metadata=None, overwrite=True):
519         precondition(isinstance(writecap, (str,type(None))), writecap)
520         precondition(isinstance(readcap, (str,type(None))), readcap)
521
522         # We now allow packing unknown nodes, provided they are valid
523         # for this type of directory.
524         child_node = self._create_and_validate_node(writecap, readcap, namex)
525         d = self.set_node(namex, child_node, metadata, overwrite)
526         d.addCallback(lambda res: child_node)
527         return d
528
529     def set_children(self, entries, overwrite=True):
530         # this takes URIs
531         a = Adder(self, overwrite=overwrite,
532                   create_readonly_node=self._create_readonly_node)
533         for (namex, e) in entries.iteritems():
534             assert isinstance(namex, unicode), namex
535             if len(e) == 2:
536                 writecap, readcap = e
537                 metadata = None
538             else:
539                 assert len(e) == 3
540                 writecap, readcap, metadata = e
541             precondition(isinstance(writecap, (str,type(None))), writecap)
542             precondition(isinstance(readcap, (str,type(None))), readcap)
543             
544             # We now allow packing unknown nodes, provided they are valid
545             # for this type of directory.
546             child_node = self._create_and_validate_node(writecap, readcap, namex)
547             a.set_node(namex, child_node, metadata)
548         d = self._node.modify(a.modify)
549         d.addCallback(lambda ign: self)
550         return d
551
552     def set_node(self, namex, child, metadata=None, overwrite=True):
553         """I add a child at the specific name. I return a Deferred that fires
554         when the operation finishes. This Deferred will fire with the child
555         node that was just added. I will replace any existing child of the
556         same name.
557
558         If this directory node is read-only, the Deferred will errback with a
559         NotWriteableError."""
560
561         precondition(IFilesystemNode.providedBy(child), child)
562
563         if self.is_readonly():
564             return defer.fail(NotWriteableError())
565         assert IFilesystemNode.providedBy(child), child
566         a = Adder(self, overwrite=overwrite,
567                   create_readonly_node=self._create_readonly_node)
568         a.set_node(namex, child, metadata)
569         d = self._node.modify(a.modify)
570         d.addCallback(lambda res: child)
571         return d
572
573     def set_nodes(self, entries, overwrite=True):
574         precondition(isinstance(entries, dict), entries)
575         if self.is_readonly():
576             return defer.fail(NotWriteableError())
577         a = Adder(self, entries, overwrite=overwrite,
578                   create_readonly_node=self._create_readonly_node)
579         d = self._node.modify(a.modify)
580         d.addCallback(lambda res: self)
581         return d
582
583
584     def add_file(self, namex, uploadable, metadata=None, overwrite=True):
585         """I upload a file (using the given IUploadable), then attach the
586         resulting FileNode to the directory at the given name. I return a
587         Deferred that fires (with the IFileNode of the uploaded file) when
588         the operation completes."""
589         name = normalize(namex)
590         if self.is_readonly():
591             return defer.fail(NotWriteableError())
592         d = self._uploader.upload(uploadable)
593         d.addCallback(lambda results:
594                       self._create_and_validate_node(results.uri, None, name))
595         d.addCallback(lambda node:
596                       self.set_node(name, node, metadata, overwrite))
597         return d
598
599     def delete(self, namex, must_exist=True, must_be_directory=False, must_be_file=False):
600         """I remove the child at the specific name. I return a Deferred that
601         fires (with the node just removed) when the operation finishes."""
602         if self.is_readonly():
603             return defer.fail(NotWriteableError())
604         deleter = Deleter(self, namex, must_exist=must_exist,
605                           must_be_directory=must_be_directory, must_be_file=must_be_file)
606         d = self._node.modify(deleter.modify)
607         d.addCallback(lambda res: deleter.old_child)
608         return d
609
610     def create_subdirectory(self, namex, initial_children={}, overwrite=True,
611                             mutable=True, metadata=None):
612         name = normalize(namex)
613         if self.is_readonly():
614             return defer.fail(NotWriteableError())
615         if mutable:
616             d = self._nodemaker.create_new_mutable_directory(initial_children)
617         else:
618             d = self._nodemaker.create_immutable_directory(initial_children)
619         def _created(child):
620             entries = {name: (child, metadata)}
621             a = Adder(self, entries, overwrite=overwrite,
622                       create_readonly_node=self._create_readonly_node)
623             d = self._node.modify(a.modify)
624             d.addCallback(lambda res: child)
625             return d
626         d.addCallback(_created)
627         return d
628
629     def move_child_to(self, current_child_namex, new_parent,
630                       new_child_namex=None, overwrite=True):
631         """I take one of my children and move them to a new parent. The child
632         is referenced by name. On the new parent, the child will live under
633         'new_child_name', which defaults to 'current_child_name'. I return a
634         Deferred that fires when the operation finishes."""
635
636         if self.is_readonly() or new_parent.is_readonly():
637             return defer.fail(NotWriteableError())
638
639         current_child_name = normalize(current_child_namex)
640         if new_child_namex is None:
641             new_child_namex = current_child_name
642         d = self.get(current_child_name)
643         def sn(child):
644             return new_parent.set_node(new_child_namex, child,
645                                        overwrite=overwrite)
646         d.addCallback(sn)
647         d.addCallback(lambda child: self.delete(current_child_name))
648         return d
649
650
651     def deep_traverse(self, walker):
652         """Perform a recursive walk, using this dirnode as a root, notifying
653         the 'walker' instance of everything I encounter.
654
655         I call walker.enter_directory(parent, children) once for each dirnode
656         I visit, immediately after retrieving the list of children. I pass in
657         the parent dirnode and the dict of childname->(childnode,metadata).
658         This function should *not* traverse the children: I will do that.
659         enter_directory() is most useful for the deep-stats number that
660         counts how large a directory is.
661
662         I call walker.add_node(node, path) for each node (both files and
663         directories) I can reach. Most work should be done here.
664
665         I avoid loops by keeping track of verifier-caps and refusing to call
666         walker.add_node() or traverse a node that I've seen before. This
667         means that any file or directory will only be given to the walker
668         once. If files or directories are referenced multiple times by a
669         directory structure, this may appear to under-count or miss some of
670         them.
671
672         I return a Monitor which can be used to wait for the operation to
673         finish, learn about its progress, or cancel the operation.
674         """
675
676         # this is just a tree-walker, except that following each edge
677         # requires a Deferred. We used to use a ConcurrencyLimiter to limit
678         # fanout to 10 simultaneous operations, but the memory load of the
679         # queued operations was excessive (in one case, with 330k dirnodes,
680         # it caused the process to run into the 3.0GB-ish per-process 32bit
681         # linux memory limit, and crashed). So we use a single big Deferred
682         # chain, and do a strict depth-first traversal, one node at a time.
683         # This can be slower, because we aren't pipelining directory reads,
684         # but it brought the memory footprint down by roughly 50%.
685
686         monitor = Monitor()
687         walker.set_monitor(monitor)
688
689         found = set([self.get_verify_cap()])
690         d = self._deep_traverse_dirnode(self, [], walker, monitor, found)
691         d.addCallback(lambda ignored: walker.finish())
692         d.addBoth(monitor.finish)
693         d.addErrback(lambda f: None)
694
695         return monitor
696
697     def _deep_traverse_dirnode(self, node, path, walker, monitor, found):
698         # process this directory, then walk its children
699         monitor.raise_if_cancelled()
700         d = defer.maybeDeferred(walker.add_node, node, path)
701         d.addCallback(lambda ignored: node.list())
702         d.addCallback(self._deep_traverse_dirnode_children, node, path,
703                       walker, monitor, found)
704         return d
705
706     def _deep_traverse_dirnode_children(self, children, parent, path,
707                                         walker, monitor, found):
708         monitor.raise_if_cancelled()
709         d = defer.maybeDeferred(walker.enter_directory, parent, children)
710         # we process file-like children first, so we can drop their FileNode
711         # objects as quickly as possible. Tests suggest that a FileNode (held
712         # in the client's nodecache) consumes about 2440 bytes. dirnodes (not
713         # in the nodecache) seem to consume about 2000 bytes.
714         dirkids = []
715         filekids = []
716         for name, (child, metadata) in sorted(children.iteritems()):
717             childpath = path + [name]
718             if isinstance(child, UnknownNode):
719                 walker.add_node(child, childpath)
720                 continue
721             verifier = child.get_verify_cap()
722             # allow LIT files (for which verifier==None) to be processed
723             if (verifier is not None) and (verifier in found):
724                 continue
725             found.add(verifier)
726             if IDirectoryNode.providedBy(child):
727                 dirkids.append( (child, childpath) )
728             else:
729                 filekids.append( (child, childpath) )
730         for i, (child, childpath) in enumerate(filekids):
731             d.addCallback(lambda ignored, child=child, childpath=childpath:
732                           walker.add_node(child, childpath))
733             # to work around the Deferred tail-recursion problem
734             # (specifically the defer.succeed flavor) requires us to avoid
735             # doing more than 158 LIT files in a row. We insert a turn break
736             # once every 100 files (LIT or CHK) to preserve some stack space
737             # for other code. This is a different expression of the same
738             # Twisted problem as in #237.
739             if i % 100 == 99:
740                 d.addCallback(lambda ignored: fireEventually())
741         for (child, childpath) in dirkids:
742             d.addCallback(lambda ignored, child=child, childpath=childpath:
743                           self._deep_traverse_dirnode(child, childpath,
744                                                       walker, monitor,
745                                                       found))
746         return d
747
748
749     def build_manifest(self):
750         """Return a Monitor, with a ['status'] that will be a list of (path,
751         cap) tuples, for all nodes (directories and files) reachable from
752         this one."""
753         walker = ManifestWalker(self)
754         return self.deep_traverse(walker)
755
756     def start_deep_stats(self):
757         # Since deep_traverse tracks verifier caps, we avoid double-counting
758         # children for which we've got both a write-cap and a read-cap
759         return self.deep_traverse(DeepStats(self))
760
761     def start_deep_check(self, verify=False, add_lease=False):
762         return self.deep_traverse(DeepChecker(self, verify, repair=False, add_lease=add_lease))
763
764     def start_deep_check_and_repair(self, verify=False, add_lease=False):
765         return self.deep_traverse(DeepChecker(self, verify, repair=True, add_lease=add_lease))
766
767
768
769 class DeepStats:
770     def __init__(self, origin):
771         self.origin = origin
772         self.stats = {}
773         for k in ["count-immutable-files",
774                   "count-mutable-files",
775                   "count-literal-files",
776                   "count-files",
777                   "count-directories",
778                   "count-unknown",
779                   "size-immutable-files",
780                   #"size-mutable-files",
781                   "size-literal-files",
782                   "size-directories",
783                   "largest-directory",
784                   "largest-directory-children",
785                   "largest-immutable-file",
786                   #"largest-mutable-file",
787                   ]:
788             self.stats[k] = 0
789         self.histograms = {}
790         for k in ["size-files-histogram"]:
791             self.histograms[k] = {} # maps (min,max) to count
792         self.buckets = [ (0,0), (1,3)]
793         self.root = math.sqrt(10)
794
795     def set_monitor(self, monitor):
796         self.monitor = monitor
797         monitor.origin_si = self.origin.get_storage_index()
798         monitor.set_status(self.get_results())
799
800     def add_node(self, node, childpath):
801         if isinstance(node, UnknownNode):
802             self.add("count-unknown")
803         elif IDirectoryNode.providedBy(node):
804             self.add("count-directories")
805         elif IMutableFileNode.providedBy(node):
806             self.add("count-files")
807             self.add("count-mutable-files")
808             # TODO: update the servermap, compute a size, add it to
809             # size-mutable-files, max it into "largest-mutable-file"
810         elif IImmutableFileNode.providedBy(node): # CHK and LIT
811             self.add("count-files")
812             size = node.get_size()
813             self.histogram("size-files-histogram", size)
814             theuri = from_string(node.get_uri())
815             if isinstance(theuri, LiteralFileURI):
816                 self.add("count-literal-files")
817                 self.add("size-literal-files", size)
818             else:
819                 self.add("count-immutable-files")
820                 self.add("size-immutable-files", size)
821                 self.max("largest-immutable-file", size)
822
823     def enter_directory(self, parent, children):
824         dirsize_bytes = parent.get_size()
825         if dirsize_bytes is not None:
826             self.add("size-directories", dirsize_bytes)
827             self.max("largest-directory", dirsize_bytes)
828         dirsize_children = len(children)
829         self.max("largest-directory-children", dirsize_children)
830
831     def add(self, key, value=1):
832         self.stats[key] += value
833
834     def max(self, key, value):
835         self.stats[key] = max(self.stats[key], value)
836
837     def which_bucket(self, size):
838         # return (min,max) such that min <= size <= max
839         # values are from the set (0,0), (1,3), (4,10), (11,31), (32,100),
840         # (101,316), (317, 1000), etc: two per decade
841         assert size >= 0
842         i = 0
843         while True:
844             if i >= len(self.buckets):
845                 # extend the list
846                 new_lower = self.buckets[i-1][1]+1
847                 new_upper = int(mathutil.next_power_of_k(new_lower, self.root))
848                 self.buckets.append( (new_lower, new_upper) )
849             maybe = self.buckets[i]
850             if maybe[0] <= size <= maybe[1]:
851                 return maybe
852             i += 1
853
854     def histogram(self, key, size):
855         bucket = self.which_bucket(size)
856         h = self.histograms[key]
857         if bucket not in h:
858             h[bucket] = 0
859         h[bucket] += 1
860
861     def get_results(self):
862         stats = self.stats.copy()
863         for key in self.histograms:
864             h = self.histograms[key]
865             out = [ (bucket[0], bucket[1], h[bucket]) for bucket in h ]
866             out.sort()
867             stats[key] = out
868         return stats
869
870     def finish(self):
871         return self.get_results()
872
873 class ManifestWalker(DeepStats):
874     def __init__(self, origin):
875         DeepStats.__init__(self, origin)
876         self.manifest = []
877         self.storage_index_strings = set()
878         self.verifycaps = set()
879
880     def add_node(self, node, path):
881         self.manifest.append( (tuple(path), node.get_uri()) )
882         si = node.get_storage_index()
883         if si:
884             self.storage_index_strings.add(base32.b2a(si))
885         v = node.get_verify_cap()
886         if v:
887             self.verifycaps.add(v.to_string())
888         return DeepStats.add_node(self, node, path)
889
890     def get_results(self):
891         stats = DeepStats.get_results(self)
892         return {"manifest": self.manifest,
893                 "verifycaps": self.verifycaps,
894                 "storage-index": self.storage_index_strings,
895                 "stats": stats,
896                 }
897
898
899 class DeepChecker:
900     def __init__(self, root, verify, repair, add_lease):
901         root_si = root.get_storage_index()
902         if root_si:
903             root_si_base32 = base32.b2a(root_si)
904         else:
905             root_si_base32 = ""
906         self._lp = log.msg(format="deep-check starting (%(si)s),"
907                            " verify=%(verify)s, repair=%(repair)s",
908                            si=root_si_base32, verify=verify, repair=repair)
909         self._verify = verify
910         self._repair = repair
911         self._add_lease = add_lease
912         if repair:
913             self._results = DeepCheckAndRepairResults(root_si)
914         else:
915             self._results = DeepCheckResults(root_si)
916         self._stats = DeepStats(root)
917
918     def set_monitor(self, monitor):
919         self.monitor = monitor
920         monitor.set_status(self._results)
921
922     def add_node(self, node, childpath):
923         if self._repair:
924             d = node.check_and_repair(self.monitor, self._verify, self._add_lease)
925             d.addCallback(self._results.add_check_and_repair, childpath)
926         else:
927             d = node.check(self.monitor, self._verify, self._add_lease)
928             d.addCallback(self._results.add_check, childpath)
929         d.addCallback(lambda ignored: self._stats.add_node(node, childpath))
930         return d
931
932     def enter_directory(self, parent, children):
933         return self._stats.enter_directory(parent, children)
934
935     def finish(self):
936         log.msg("deep-check done", parent=self._lp)
937         self._results.update_stats(self._stats.get_results())
938         return self._results
939
940
941 # use client.create_dirnode() to make one of these
942
943