]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/dirnode.py
022a6e00f6e4d602c805d34ac3ae22d3fe5f89b5
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / dirnode.py
1
2 import time, math, unicodedata
3
4 from zope.interface import implements
5 from twisted.internet import defer
6 from foolscap.api import fireEventually
7 import simplejson
8 from allmydata.mutable.common import NotWriteableError
9 from allmydata.mutable.filenode import MutableFileNode
10 from allmydata.unknown import UnknownNode, strip_prefix_for_ro
11 from allmydata.interfaces import IFilesystemNode, IDirectoryNode, IFileNode, \
12      IImmutableFileNode, IMutableFileNode, \
13      ExistingChildError, NoSuchChildError, ICheckable, IDeepCheckable, \
14      MustBeDeepImmutableError, CapConstraintError, ChildOfWrongTypeError
15 from allmydata.check_results import DeepCheckResults, \
16      DeepCheckAndRepairResults
17 from allmydata.monitor import Monitor
18 from allmydata.util import hashutil, mathutil, base32, log
19 from allmydata.util.encodingutil import quote_output
20 from allmydata.util.assertutil import precondition
21 from allmydata.util.netstring import netstring, split_netstring
22 from allmydata.util.consumer import download_to_data
23 from allmydata.uri import LiteralFileURI, from_string, wrap_dirnode_cap
24 from pycryptopp.cipher.aes import AES
25 from allmydata.util.dictutil import AuxValueDict
26
27
28 def update_metadata(metadata, new_metadata, now):
29     """Updates 'metadata' in-place with the information in 'new_metadata'.
30     Timestamps are set according to the time 'now'."""
31
32     if metadata is None:
33         metadata = {}
34
35     old_ctime = None
36     if 'ctime' in metadata:
37         old_ctime = metadata['ctime']
38
39     if new_metadata is not None:
40         # Overwrite all metadata.
41         newmd = new_metadata.copy()
42
43         # Except 'tahoe'.
44         if 'tahoe' in newmd:
45             del newmd['tahoe']
46         if 'tahoe' in metadata:
47             newmd['tahoe'] = metadata['tahoe']
48
49         metadata = newmd
50
51     # update timestamps
52     sysmd = metadata.get('tahoe', {})
53     if 'linkcrtime' not in sysmd:
54         # In Tahoe < 1.4.0 we used the word 'ctime' to mean what Tahoe >= 1.4.0
55         # calls 'linkcrtime'. This field is only used if it was in the old metadata,
56         # and 'tahoe:linkcrtime' was not.
57         if old_ctime is not None:
58             sysmd['linkcrtime'] = old_ctime
59         else:
60             sysmd['linkcrtime'] = now
61
62     sysmd['linkmotime'] = now
63     metadata['tahoe'] = sysmd
64
65     return metadata
66
67
68 # 'x' at the end of a variable name indicates that it holds a Unicode string that may not
69 # be NFC-normalized.
70
71 def normalize(namex):
72     return unicodedata.normalize('NFC', namex)
73
74 # TODO: {Deleter,MetadataSetter,Adder}.modify all start by unpacking the
75 # contents and end by repacking them. It might be better to apply them to
76 # the unpacked contents.
77
78 class Deleter:
79     def __init__(self, node, namex, must_exist=True, must_be_directory=False, must_be_file=False):
80         self.node = node
81         self.name = normalize(namex)
82         self.must_exist = must_exist
83         self.must_be_directory = must_be_directory
84         self.must_be_file = must_be_file
85
86     def modify(self, old_contents, servermap, first_time):
87         children = self.node._unpack_contents(old_contents)
88         if self.name not in children:
89             if first_time and self.must_exist:
90                 raise NoSuchChildError(self.name)
91             self.old_child = None
92             return None
93         self.old_child, metadata = children[self.name]
94
95         # Unknown children can be removed regardless of must_be_directory or must_be_file.
96         if self.must_be_directory and IFileNode.providedBy(self.old_child):
97             raise ChildOfWrongTypeError("delete required a directory, not a file")
98         if self.must_be_file and IDirectoryNode.providedBy(self.old_child):
99             raise ChildOfWrongTypeError("delete required a file, not a directory")
100
101         del children[self.name]
102         new_contents = self.node._pack_contents(children)
103         return new_contents
104
105
106 class MetadataSetter:
107     def __init__(self, node, namex, metadata, create_readonly_node=None):
108         self.node = node
109         self.name = normalize(namex)
110         self.metadata = metadata
111         self.create_readonly_node = create_readonly_node
112
113     def modify(self, old_contents, servermap, first_time):
114         children = self.node._unpack_contents(old_contents)
115         name = self.name
116         if name not in children:
117             raise NoSuchChildError(name)
118
119         now = time.time()
120         child = children[name][0]
121
122         metadata = update_metadata(children[name][1].copy(), self.metadata, now)
123         if self.create_readonly_node and metadata.get('no-write', False):
124             child = self.create_readonly_node(child, name)
125
126         children[name] = (child, metadata)
127         new_contents = self.node._pack_contents(children)
128         return new_contents
129
130
131 class Adder:
132     def __init__(self, node, entries=None, overwrite=True, create_readonly_node=None):
133         self.node = node
134         if entries is None:
135             entries = {}
136         precondition(isinstance(entries, dict), entries)
137         # keys of 'entries' may not be normalized.
138         self.entries = entries
139         self.overwrite = overwrite
140         self.create_readonly_node = create_readonly_node
141
142     def set_node(self, namex, node, metadata):
143         precondition(IFilesystemNode.providedBy(node), node)
144         self.entries[namex] = (node, metadata)
145
146     def modify(self, old_contents, servermap, first_time):
147         children = self.node._unpack_contents(old_contents)
148         now = time.time()
149         for (namex, (child, new_metadata)) in self.entries.iteritems():
150             name = normalize(namex)
151             precondition(IFilesystemNode.providedBy(child), child)
152
153             # Strictly speaking this is redundant because we would raise the
154             # error again in _pack_normalized_children.
155             child.raise_error()
156
157             metadata = None
158             if name in children:
159                 if not self.overwrite:
160                     raise ExistingChildError("child %s already exists" % quote_output(name, encoding='utf-8'))
161
162                 if self.overwrite == "only-files" and IDirectoryNode.providedBy(children[name][0]):
163                     raise ExistingChildError("child %s already exists" % quote_output(name, encoding='utf-8'))
164                 metadata = children[name][1].copy()
165
166             metadata = update_metadata(metadata, new_metadata, now)
167             if self.create_readonly_node and metadata.get('no-write', False):
168                 child = self.create_readonly_node(child, name)
169
170             children[name] = (child, metadata)
171         new_contents = self.node._pack_contents(children)
172         return new_contents
173
174 def _encrypt_rw_uri(writekey, rw_uri):
175     precondition(isinstance(rw_uri, str), rw_uri)
176     precondition(isinstance(writekey, str), writekey)
177
178     salt = hashutil.mutable_rwcap_salt_hash(rw_uri)
179     key = hashutil.mutable_rwcap_key_hash(salt, writekey)
180     cryptor = AES(key)
181     crypttext = cryptor.process(rw_uri)
182     mac = hashutil.hmac(key, salt + crypttext)
183     assert len(mac) == 32
184     return salt + crypttext + mac
185     # The MAC is not checked by readers in Tahoe >= 1.3.0, but we still
186     # produce it for the sake of older readers.
187
188 def pack_children(childrenx, writekey, deep_immutable=False):
189     # initial_children must have metadata (i.e. {} instead of None)
190     children = {}
191     for (namex, (node, metadata)) in childrenx.iteritems():
192         precondition(isinstance(metadata, dict),
193                      "directory creation requires metadata to be a dict, not None", metadata)
194         children[normalize(namex)] = (node, metadata)
195
196     return _pack_normalized_children(children, writekey=writekey, deep_immutable=deep_immutable)
197
198
199 ZERO_LEN_NETSTR=netstring('')
200 def _pack_normalized_children(children, writekey, deep_immutable=False):
201     """Take a dict that maps:
202          children[unicode_nfc_name] = (IFileSystemNode, metadata_dict)
203     and pack it into a single string, for use as the contents of the backing
204     file. This is the same format as is returned by _unpack_contents. I also
205     accept an AuxValueDict, in which case I'll use the auxilliary cached data
206     as the pre-packed entry, which is faster than re-packing everything each
207     time.
208
209     If writekey is provided then I will superencrypt the child's writecap with
210     writekey.
211
212     If deep_immutable is True, I will require that all my children are deeply
213     immutable, and will raise a MustBeDeepImmutableError if not.
214     """
215     precondition((writekey is None) or isinstance(writekey, str), writekey)
216
217     has_aux = isinstance(children, AuxValueDict)
218     entries = []
219     for name in sorted(children.keys()):
220         assert isinstance(name, unicode)
221         entry = None
222         (child, metadata) = children[name]
223         child.raise_error()
224         if deep_immutable and not child.is_allowed_in_immutable_directory():
225             raise MustBeDeepImmutableError("child %s is not allowed in an immutable directory" %
226                                            quote_output(name, encoding='utf-8'), name)
227         if has_aux:
228             entry = children.get_aux(name)
229         if not entry:
230             assert IFilesystemNode.providedBy(child), (name,child)
231             assert isinstance(metadata, dict)
232             rw_uri = child.get_write_uri()
233             if rw_uri is None:
234                 rw_uri = ""
235             assert isinstance(rw_uri, str), rw_uri
236
237             # should be prevented by MustBeDeepImmutableError check above
238             assert not (rw_uri and deep_immutable)
239
240             ro_uri = child.get_readonly_uri()
241             if ro_uri is None:
242                 ro_uri = ""
243             assert isinstance(ro_uri, str), ro_uri
244             if writekey is not None:
245                 writecap = netstring(_encrypt_rw_uri(writekey, rw_uri))
246             else:
247                 writecap = ZERO_LEN_NETSTR
248             entry = "".join([netstring(name.encode("utf-8")),
249                              netstring(strip_prefix_for_ro(ro_uri, deep_immutable)),
250                              writecap,
251                              netstring(simplejson.dumps(metadata))])
252         entries.append(netstring(entry))
253     return "".join(entries)
254
255 class DirectoryNode:
256     implements(IDirectoryNode, ICheckable, IDeepCheckable)
257     filenode_class = MutableFileNode
258
259     def __init__(self, filenode, nodemaker, uploader):
260         assert IFileNode.providedBy(filenode), filenode
261         assert not IDirectoryNode.providedBy(filenode), filenode
262         self._node = filenode
263         filenode_cap = filenode.get_cap()
264         self._uri = wrap_dirnode_cap(filenode_cap)
265         self._nodemaker = nodemaker
266         self._uploader = uploader
267
268     def __repr__(self):
269         return "<%s %s-%s %s>" % (self.__class__.__name__,
270                                   self.is_readonly() and "RO" or "RW",
271                                   self.is_mutable() and "MUT" or "IMM",
272                                   hasattr(self, '_uri') and self._uri.abbrev())
273
274     def get_size(self):
275         """Return the size of our backing mutable file, in bytes, if we've
276         fetched it. Otherwise return None. This returns synchronously."""
277         return self._node.get_size()
278
279     def get_current_size(self):
280         """Calculate the size of our backing mutable file, in bytes. Returns
281         a Deferred that fires with the result."""
282         return self._node.get_current_size()
283
284     def _read(self):
285         if self._node.is_mutable():
286             # use the IMutableFileNode API.
287             d = self._node.download_best_version()
288         else:
289             d = download_to_data(self._node)
290         d.addCallback(self._unpack_contents)
291         return d
292
293     def _decrypt_rwcapdata(self, encwrcap):
294         salt = encwrcap[:16]
295         crypttext = encwrcap[16:-32]
296         key = hashutil.mutable_rwcap_key_hash(salt, self._node.get_writekey())
297         cryptor = AES(key)
298         plaintext = cryptor.process(crypttext)
299         return plaintext
300
301     def _create_and_validate_node(self, rw_uri, ro_uri, name):
302         # name is just for error reporting
303         node = self._nodemaker.create_from_cap(rw_uri, ro_uri,
304                                                deep_immutable=not self.is_mutable(),
305                                                name=name)
306         node.raise_error()
307         return node
308
309     def _create_readonly_node(self, node, name):
310         # name is just for error reporting
311         if not node.is_unknown() and node.is_readonly():
312             return node
313         return self._create_and_validate_node(None, node.get_readonly_uri(), name=name)
314
315     def _unpack_contents(self, data):
316         # the directory is serialized as a list of netstrings, one per child.
317         # Each child is serialized as a list of four netstrings: (name, ro_uri,
318         # rwcapdata, metadata), in which the name, ro_uri, metadata are in
319         # cleartext. The 'name' is UTF-8 encoded, and should be normalized to NFC.
320         # The rwcapdata is formatted as:
321         # pack("16ss32s", iv, AES(H(writekey+iv), plaintext_rw_uri), mac)
322         assert isinstance(data, str), (repr(data), type(data))
323         # an empty directory is serialized as an empty string
324         if data == "":
325             return AuxValueDict()
326         writeable = not self.is_readonly()
327         mutable = self.is_mutable()
328         children = AuxValueDict()
329         position = 0
330         while position < len(data):
331             entries, position = split_netstring(data, 1, position)
332             entry = entries[0]
333             (namex_utf8, ro_uri, rwcapdata, metadata_s), subpos = split_netstring(entry, 4)
334             if not mutable and len(rwcapdata) > 0:
335                 raise ValueError("the rwcapdata field of a dirnode in an immutable directory was not empty")
336
337             # A name containing characters that are unassigned in one version of Unicode might
338             # not be normalized wrt a later version. See the note in section 'Normalization Stability'
339             # at <http://unicode.org/policies/stability_policy.html>.
340             # Therefore we normalize names going both in and out of directories.
341             name = normalize(namex_utf8.decode("utf-8"))
342
343             rw_uri = ""
344             if writeable:
345                 rw_uri = self._decrypt_rwcapdata(rwcapdata)
346
347             # Since the encryption uses CTR mode, it currently leaks the length of the
348             # plaintext rw_uri -- and therefore whether it is present, i.e. whether the
349             # dirnode is writeable (ticket #925). By stripping trailing spaces in
350             # Tahoe >= 1.6.0, we may make it easier for future versions to plug this leak.
351             # ro_uri is treated in the same way for consistency.
352             # rw_uri and ro_uri will be either None or a non-empty string.
353
354             rw_uri = rw_uri.rstrip(' ') or None
355             ro_uri = ro_uri.rstrip(' ') or None
356
357             try:
358                 child = self._create_and_validate_node(rw_uri, ro_uri, name)
359                 if mutable or child.is_allowed_in_immutable_directory():
360                     metadata = simplejson.loads(metadata_s)
361                     assert isinstance(metadata, dict)
362                     children[name] = (child, metadata)
363                     children.set_with_aux(name, (child, metadata), auxilliary=entry)
364                 else:
365                     log.msg(format="mutable cap for child %(name)s unpacked from an immutable directory",
366                                    name=quote_output(name, encoding='utf-8'),
367                                    facility="tahoe.webish", level=log.UNUSUAL)
368             except CapConstraintError, e:
369                 log.msg(format="unmet constraint on cap for child %(name)s unpacked from a directory:\n"
370                                "%(message)s", message=e.args[0], name=quote_output(name, encoding='utf-8'),
371                                facility="tahoe.webish", level=log.UNUSUAL)
372
373         return children
374
375     def _pack_contents(self, children):
376         # expects children in the same format as _unpack_contents returns
377         return _pack_normalized_children(children, self._node.get_writekey())
378
379     def is_readonly(self):
380         return self._node.is_readonly()
381
382     def is_mutable(self):
383         return self._node.is_mutable()
384
385     def is_unknown(self):
386         return False
387
388     def is_allowed_in_immutable_directory(self):
389         return not self._node.is_mutable()
390
391     def raise_error(self):
392         pass
393
394     def get_uri(self):
395         return self._uri.to_string()
396
397     def get_write_uri(self):
398         if self.is_readonly():
399             return None
400         return self._uri.to_string()
401
402     def get_readonly_uri(self):
403         return self._uri.get_readonly().to_string()
404
405     def get_cap(self):
406         return self._uri
407
408     def get_readcap(self):
409         return self._uri.get_readonly()
410
411     def get_verify_cap(self):
412         return self._uri.get_verify_cap()
413
414     def get_repair_cap(self):
415         if self._node.is_readonly():
416             return None # readonly (mutable) dirnodes are not yet repairable
417         return self._uri
418
419     def get_storage_index(self):
420         return self._uri.get_storage_index()
421
422     def check(self, monitor, verify=False, add_lease=False):
423         """Perform a file check. See IChecker.check for details."""
424         return self._node.check(monitor, verify, add_lease)
425     def check_and_repair(self, monitor, verify=False, add_lease=False):
426         return self._node.check_and_repair(monitor, verify, add_lease)
427
428     def list(self):
429         """I return a Deferred that fires with a dictionary mapping child
430         name to a tuple of (IFilesystemNode, metadata)."""
431         return self._read()
432
433     def has_child(self, namex):
434         """I return a Deferred that fires with a boolean, True if there
435         exists a child of the given name, False if not."""
436         name = normalize(namex)
437         d = self._read()
438         d.addCallback(lambda children: children.has_key(name))
439         return d
440
441     def _get(self, children, name):
442         child = children.get(name)
443         if child is None:
444             raise NoSuchChildError(name)
445         return child[0]
446
447     def _get_with_metadata(self, children, name):
448         child = children.get(name)
449         if child is None:
450             raise NoSuchChildError(name)
451         return child
452
453     def get(self, namex):
454         """I return a Deferred that fires with the named child node,
455         which is an IFilesystemNode."""
456         name = normalize(namex)
457         d = self._read()
458         d.addCallback(self._get, name)
459         return d
460
461     def get_child_and_metadata(self, namex):
462         """I return a Deferred that fires with the (node, metadata) pair for
463         the named child. The node is an IFilesystemNode, and the metadata
464         is a dictionary."""
465         name = normalize(namex)
466         d = self._read()
467         d.addCallback(self._get_with_metadata, name)
468         return d
469
470     def get_metadata_for(self, namex):
471         name = normalize(namex)
472         d = self._read()
473         d.addCallback(lambda children: children[name][1])
474         return d
475
476     def set_metadata_for(self, namex, metadata):
477         name = normalize(namex)
478         if self.is_readonly():
479             return defer.fail(NotWriteableError())
480         assert isinstance(metadata, dict)
481         s = MetadataSetter(self, name, metadata,
482                            create_readonly_node=self._create_readonly_node)
483         d = self._node.modify(s.modify)
484         d.addCallback(lambda res: self)
485         return d
486
487     def get_child_at_path(self, pathx):
488         """Transform a child path into an IFilesystemNode.
489
490         I perform a recursive series of 'get' operations to find the named
491         descendant node. I return a Deferred that fires with the node, or
492         errbacks with IndexError if the node could not be found.
493
494         The path can be either a single string (slash-separated) or a list of
495         path-name elements.
496         """
497         d = self.get_child_and_metadata_at_path(pathx)
498         d.addCallback(lambda (node, metadata): node)
499         return d
500
501     def get_child_and_metadata_at_path(self, pathx):
502         """Transform a child path into an IFilesystemNode and
503         a metadata dictionary from the last edge that was traversed.
504         """
505
506         if not pathx:
507             return defer.succeed((self, {}))
508         if isinstance(pathx, (list, tuple)):
509             pass
510         else:
511             pathx = pathx.split("/")
512         for p in pathx:
513             assert isinstance(p, unicode), p
514         childnamex = pathx[0]
515         remaining_pathx = pathx[1:]
516         if remaining_pathx:
517             d = self.get(childnamex)
518             d.addCallback(lambda node:
519                           node.get_child_and_metadata_at_path(remaining_pathx))
520             return d
521         d = self.get_child_and_metadata(childnamex)
522         return d
523
524     def set_uri(self, namex, writecap, readcap, metadata=None, overwrite=True):
525         precondition(isinstance(writecap, (str,type(None))), writecap)
526         precondition(isinstance(readcap, (str,type(None))), readcap)
527
528         # We now allow packing unknown nodes, provided they are valid
529         # for this type of directory.
530         child_node = self._create_and_validate_node(writecap, readcap, namex)
531         d = self.set_node(namex, child_node, metadata, overwrite)
532         d.addCallback(lambda res: child_node)
533         return d
534
535     def set_children(self, entries, overwrite=True):
536         # this takes URIs
537         a = Adder(self, overwrite=overwrite,
538                   create_readonly_node=self._create_readonly_node)
539         for (namex, e) in entries.iteritems():
540             assert isinstance(namex, unicode), namex
541             if len(e) == 2:
542                 writecap, readcap = e
543                 metadata = None
544             else:
545                 assert len(e) == 3
546                 writecap, readcap, metadata = e
547             precondition(isinstance(writecap, (str,type(None))), writecap)
548             precondition(isinstance(readcap, (str,type(None))), readcap)
549             
550             # We now allow packing unknown nodes, provided they are valid
551             # for this type of directory.
552             child_node = self._create_and_validate_node(writecap, readcap, namex)
553             a.set_node(namex, child_node, metadata)
554         d = self._node.modify(a.modify)
555         d.addCallback(lambda ign: self)
556         return d
557
558     def set_node(self, namex, child, metadata=None, overwrite=True):
559         """I add a child at the specific name. I return a Deferred that fires
560         when the operation finishes. This Deferred will fire with the child
561         node that was just added. I will replace any existing child of the
562         same name.
563
564         If this directory node is read-only, the Deferred will errback with a
565         NotWriteableError."""
566
567         precondition(IFilesystemNode.providedBy(child), child)
568
569         if self.is_readonly():
570             return defer.fail(NotWriteableError())
571         assert IFilesystemNode.providedBy(child), child
572         a = Adder(self, overwrite=overwrite,
573                   create_readonly_node=self._create_readonly_node)
574         a.set_node(namex, child, metadata)
575         d = self._node.modify(a.modify)
576         d.addCallback(lambda res: child)
577         return d
578
579     def set_nodes(self, entries, overwrite=True):
580         precondition(isinstance(entries, dict), entries)
581         if self.is_readonly():
582             return defer.fail(NotWriteableError())
583         a = Adder(self, entries, overwrite=overwrite,
584                   create_readonly_node=self._create_readonly_node)
585         d = self._node.modify(a.modify)
586         d.addCallback(lambda res: self)
587         return d
588
589
590     def add_file(self, namex, uploadable, metadata=None, overwrite=True):
591         """I upload a file (using the given IUploadable), then attach the
592         resulting FileNode to the directory at the given name. I return a
593         Deferred that fires (with the IFileNode of the uploaded file) when
594         the operation completes."""
595         name = normalize(namex)
596         if self.is_readonly():
597             return defer.fail(NotWriteableError())
598         d = self._uploader.upload(uploadable)
599         d.addCallback(lambda results:
600                       self._create_and_validate_node(results.uri, None, name))
601         d.addCallback(lambda node:
602                       self.set_node(name, node, metadata, overwrite))
603         return d
604
605     def delete(self, namex, must_exist=True, must_be_directory=False, must_be_file=False):
606         """I remove the child at the specific name. I return a Deferred that
607         fires (with the node just removed) when the operation finishes."""
608         if self.is_readonly():
609             return defer.fail(NotWriteableError())
610         deleter = Deleter(self, namex, must_exist=must_exist,
611                           must_be_directory=must_be_directory, must_be_file=must_be_file)
612         d = self._node.modify(deleter.modify)
613         d.addCallback(lambda res: deleter.old_child)
614         return d
615
616     def create_subdirectory(self, namex, initial_children={}, overwrite=True,
617                             mutable=True, metadata=None):
618         name = normalize(namex)
619         if self.is_readonly():
620             return defer.fail(NotWriteableError())
621         if mutable:
622             d = self._nodemaker.create_new_mutable_directory(initial_children)
623         else:
624             d = self._nodemaker.create_immutable_directory(initial_children)
625         def _created(child):
626             entries = {name: (child, metadata)}
627             a = Adder(self, entries, overwrite=overwrite,
628                       create_readonly_node=self._create_readonly_node)
629             d = self._node.modify(a.modify)
630             d.addCallback(lambda res: child)
631             return d
632         d.addCallback(_created)
633         return d
634
635     def move_child_to(self, current_child_namex, new_parent,
636                       new_child_namex=None, overwrite=True):
637         """I take one of my children and move them to a new parent. The child
638         is referenced by name. On the new parent, the child will live under
639         'new_child_name', which defaults to 'current_child_name'. I return a
640         Deferred that fires when the operation finishes."""
641
642         if self.is_readonly() or new_parent.is_readonly():
643             return defer.fail(NotWriteableError())
644
645         current_child_name = normalize(current_child_namex)
646         if new_child_namex is None:
647             new_child_namex = current_child_name
648         d = self.get(current_child_name)
649         def sn(child):
650             return new_parent.set_node(new_child_namex, child,
651                                        overwrite=overwrite)
652         d.addCallback(sn)
653         d.addCallback(lambda child: self.delete(current_child_name))
654         return d
655
656
657     def deep_traverse(self, walker):
658         """Perform a recursive walk, using this dirnode as a root, notifying
659         the 'walker' instance of everything I encounter.
660
661         I call walker.enter_directory(parent, children) once for each dirnode
662         I visit, immediately after retrieving the list of children. I pass in
663         the parent dirnode and the dict of childname->(childnode,metadata).
664         This function should *not* traverse the children: I will do that.
665         enter_directory() is most useful for the deep-stats number that
666         counts how large a directory is.
667
668         I call walker.add_node(node, path) for each node (both files and
669         directories) I can reach. Most work should be done here.
670
671         I avoid loops by keeping track of verifier-caps and refusing to call
672         walker.add_node() or traverse a node that I've seen before. This
673         means that any file or directory will only be given to the walker
674         once. If files or directories are referenced multiple times by a
675         directory structure, this may appear to under-count or miss some of
676         them.
677
678         I return a Monitor which can be used to wait for the operation to
679         finish, learn about its progress, or cancel the operation.
680         """
681
682         # this is just a tree-walker, except that following each edge
683         # requires a Deferred. We used to use a ConcurrencyLimiter to limit
684         # fanout to 10 simultaneous operations, but the memory load of the
685         # queued operations was excessive (in one case, with 330k dirnodes,
686         # it caused the process to run into the 3.0GB-ish per-process 32bit
687         # linux memory limit, and crashed). So we use a single big Deferred
688         # chain, and do a strict depth-first traversal, one node at a time.
689         # This can be slower, because we aren't pipelining directory reads,
690         # but it brought the memory footprint down by roughly 50%.
691
692         monitor = Monitor()
693         walker.set_monitor(monitor)
694
695         found = set([self.get_verify_cap()])
696         d = self._deep_traverse_dirnode(self, [], walker, monitor, found)
697         d.addCallback(lambda ignored: walker.finish())
698         d.addBoth(monitor.finish)
699         d.addErrback(lambda f: None)
700
701         return monitor
702
703     def _deep_traverse_dirnode(self, node, path, walker, monitor, found):
704         # process this directory, then walk its children
705         monitor.raise_if_cancelled()
706         d = defer.maybeDeferred(walker.add_node, node, path)
707         d.addCallback(lambda ignored: node.list())
708         d.addCallback(self._deep_traverse_dirnode_children, node, path,
709                       walker, monitor, found)
710         return d
711
712     def _deep_traverse_dirnode_children(self, children, parent, path,
713                                         walker, monitor, found):
714         monitor.raise_if_cancelled()
715         d = defer.maybeDeferred(walker.enter_directory, parent, children)
716         # we process file-like children first, so we can drop their FileNode
717         # objects as quickly as possible. Tests suggest that a FileNode (held
718         # in the client's nodecache) consumes about 2440 bytes. dirnodes (not
719         # in the nodecache) seem to consume about 2000 bytes.
720         dirkids = []
721         filekids = []
722         for name, (child, metadata) in sorted(children.iteritems()):
723             childpath = path + [name]
724             if isinstance(child, UnknownNode):
725                 walker.add_node(child, childpath)
726                 continue
727             verifier = child.get_verify_cap()
728             # allow LIT files (for which verifier==None) to be processed
729             if (verifier is not None) and (verifier in found):
730                 continue
731             found.add(verifier)
732             if IDirectoryNode.providedBy(child):
733                 dirkids.append( (child, childpath) )
734             else:
735                 filekids.append( (child, childpath) )
736         for i, (child, childpath) in enumerate(filekids):
737             d.addCallback(lambda ignored, child=child, childpath=childpath:
738                           walker.add_node(child, childpath))
739             # to work around the Deferred tail-recursion problem
740             # (specifically the defer.succeed flavor) requires us to avoid
741             # doing more than 158 LIT files in a row. We insert a turn break
742             # once every 100 files (LIT or CHK) to preserve some stack space
743             # for other code. This is a different expression of the same
744             # Twisted problem as in #237.
745             if i % 100 == 99:
746                 d.addCallback(lambda ignored: fireEventually())
747         for (child, childpath) in dirkids:
748             d.addCallback(lambda ignored, child=child, childpath=childpath:
749                           self._deep_traverse_dirnode(child, childpath,
750                                                       walker, monitor,
751                                                       found))
752         return d
753
754
755     def build_manifest(self):
756         """Return a Monitor, with a ['status'] that will be a list of (path,
757         cap) tuples, for all nodes (directories and files) reachable from
758         this one."""
759         walker = ManifestWalker(self)
760         return self.deep_traverse(walker)
761
762     def start_deep_stats(self):
763         # Since deep_traverse tracks verifier caps, we avoid double-counting
764         # children for which we've got both a write-cap and a read-cap
765         return self.deep_traverse(DeepStats(self))
766
767     def start_deep_check(self, verify=False, add_lease=False):
768         return self.deep_traverse(DeepChecker(self, verify, repair=False, add_lease=add_lease))
769
770     def start_deep_check_and_repair(self, verify=False, add_lease=False):
771         return self.deep_traverse(DeepChecker(self, verify, repair=True, add_lease=add_lease))
772
773
774
775 class DeepStats:
776     def __init__(self, origin):
777         self.origin = origin
778         self.stats = {}
779         for k in ["count-immutable-files",
780                   "count-mutable-files",
781                   "count-literal-files",
782                   "count-files",
783                   "count-directories",
784                   "count-unknown",
785                   "size-immutable-files",
786                   #"size-mutable-files",
787                   "size-literal-files",
788                   "size-directories",
789                   "largest-directory",
790                   "largest-directory-children",
791                   "largest-immutable-file",
792                   #"largest-mutable-file",
793                   ]:
794             self.stats[k] = 0
795         self.histograms = {}
796         for k in ["size-files-histogram"]:
797             self.histograms[k] = {} # maps (min,max) to count
798         self.buckets = [ (0,0), (1,3)]
799         self.root = math.sqrt(10)
800
801     def set_monitor(self, monitor):
802         self.monitor = monitor
803         monitor.origin_si = self.origin.get_storage_index()
804         monitor.set_status(self.get_results())
805
806     def add_node(self, node, childpath):
807         if isinstance(node, UnknownNode):
808             self.add("count-unknown")
809         elif IDirectoryNode.providedBy(node):
810             self.add("count-directories")
811         elif IMutableFileNode.providedBy(node):
812             self.add("count-files")
813             self.add("count-mutable-files")
814             # TODO: update the servermap, compute a size, add it to
815             # size-mutable-files, max it into "largest-mutable-file"
816         elif IImmutableFileNode.providedBy(node): # CHK and LIT
817             self.add("count-files")
818             size = node.get_size()
819             self.histogram("size-files-histogram", size)
820             theuri = from_string(node.get_uri())
821             if isinstance(theuri, LiteralFileURI):
822                 self.add("count-literal-files")
823                 self.add("size-literal-files", size)
824             else:
825                 self.add("count-immutable-files")
826                 self.add("size-immutable-files", size)
827                 self.max("largest-immutable-file", size)
828
829     def enter_directory(self, parent, children):
830         dirsize_bytes = parent.get_size()
831         if dirsize_bytes is not None:
832             self.add("size-directories", dirsize_bytes)
833             self.max("largest-directory", dirsize_bytes)
834         dirsize_children = len(children)
835         self.max("largest-directory-children", dirsize_children)
836
837     def add(self, key, value=1):
838         self.stats[key] += value
839
840     def max(self, key, value):
841         self.stats[key] = max(self.stats[key], value)
842
843     def which_bucket(self, size):
844         # return (min,max) such that min <= size <= max
845         # values are from the set (0,0), (1,3), (4,10), (11,31), (32,100),
846         # (101,316), (317, 1000), etc: two per decade
847         assert size >= 0
848         i = 0
849         while True:
850             if i >= len(self.buckets):
851                 # extend the list
852                 new_lower = self.buckets[i-1][1]+1
853                 new_upper = int(mathutil.next_power_of_k(new_lower, self.root))
854                 self.buckets.append( (new_lower, new_upper) )
855             maybe = self.buckets[i]
856             if maybe[0] <= size <= maybe[1]:
857                 return maybe
858             i += 1
859
860     def histogram(self, key, size):
861         bucket = self.which_bucket(size)
862         h = self.histograms[key]
863         if bucket not in h:
864             h[bucket] = 0
865         h[bucket] += 1
866
867     def get_results(self):
868         stats = self.stats.copy()
869         for key in self.histograms:
870             h = self.histograms[key]
871             out = [ (bucket[0], bucket[1], h[bucket]) for bucket in h ]
872             out.sort()
873             stats[key] = out
874         return stats
875
876     def finish(self):
877         return self.get_results()
878
879 class ManifestWalker(DeepStats):
880     def __init__(self, origin):
881         DeepStats.__init__(self, origin)
882         self.manifest = []
883         self.storage_index_strings = set()
884         self.verifycaps = set()
885
886     def add_node(self, node, path):
887         self.manifest.append( (tuple(path), node.get_uri()) )
888         si = node.get_storage_index()
889         if si:
890             self.storage_index_strings.add(base32.b2a(si))
891         v = node.get_verify_cap()
892         if v:
893             self.verifycaps.add(v.to_string())
894         return DeepStats.add_node(self, node, path)
895
896     def get_results(self):
897         stats = DeepStats.get_results(self)
898         return {"manifest": self.manifest,
899                 "verifycaps": self.verifycaps,
900                 "storage-index": self.storage_index_strings,
901                 "stats": stats,
902                 }
903
904
905 class DeepChecker:
906     def __init__(self, root, verify, repair, add_lease):
907         root_si = root.get_storage_index()
908         if root_si:
909             root_si_base32 = base32.b2a(root_si)
910         else:
911             root_si_base32 = ""
912         self._lp = log.msg(format="deep-check starting (%(si)s),"
913                            " verify=%(verify)s, repair=%(repair)s",
914                            si=root_si_base32, verify=verify, repair=repair)
915         self._verify = verify
916         self._repair = repair
917         self._add_lease = add_lease
918         if repair:
919             self._results = DeepCheckAndRepairResults(root_si)
920         else:
921             self._results = DeepCheckResults(root_si)
922         self._stats = DeepStats(root)
923
924     def set_monitor(self, monitor):
925         self.monitor = monitor
926         monitor.set_status(self._results)
927
928     def add_node(self, node, childpath):
929         if self._repair:
930             d = node.check_and_repair(self.monitor, self._verify, self._add_lease)
931             d.addCallback(self._results.add_check_and_repair, childpath)
932         else:
933             d = node.check(self.monitor, self._verify, self._add_lease)
934             d.addCallback(self._results.add_check, childpath)
935         d.addCallback(lambda ignored: self._stats.add_node(node, childpath))
936         return d
937
938     def enter_directory(self, parent, children):
939         return self._stats.enter_directory(parent, children)
940
941     def finish(self):
942         log.msg("deep-check done", parent=self._lp)
943         self._results.update_stats(self._stats.get_results())
944         return self._results
945
946
947 # use client.create_dirnode() to make one of these
948
949