4 from zope.interface import implements
5 from twisted.internet import defer
6 from foolscap.api import fireEventually
8 from allmydata.mutable.common import NotMutableError
9 from allmydata.mutable.filenode import MutableFileNode
10 from allmydata.interfaces import IMutableFileNode, IDirectoryNode,\
11 IURI, IFileNode, IMutableFileURI, IFilesystemNode, \
12 ExistingChildError, NoSuchChildError, ICheckable, IDeepCheckable
13 from allmydata.check_results import DeepCheckResults, \
14 DeepCheckAndRepairResults
15 from allmydata.monitor import Monitor
16 from allmydata.util import hashutil, mathutil, base32, log
17 from allmydata.util.assertutil import _assert, precondition
18 from allmydata.util.hashutil import netstring
19 from allmydata.util.netstring import split_netstring
20 from allmydata.uri import NewDirectoryURI, LiteralFileURI, from_string
21 from pycryptopp.cipher.aes import AES
def __init__(self, node, name, must_exist=True):
    """Build a modifier callback that removes child `name` from `node`.

    node: the dirnode whose serialized contents will be modified.
    name: unicode name of the child to delete.
    must_exist: when True, modify() raises NoSuchChildError if the child
        is absent on the first attempt.
    """
    self.node = node
    self.name = name
    # BUG FIX: the original assigned the literal True here, silently
    # ignoring the caller's must_exist argument, so a best-effort delete
    # (must_exist=False) would still raise NoSuchChildError.
    self.must_exist = must_exist
def modify(self, old_contents, servermap, first_time):
    # Called by MutableFileNode.modify (possibly more than once, on
    # retry) with the current serialized directory contents; removes
    # self.name and returns the re-serialized table. The removed node is
    # stashed in self.old_child for the caller.
    children = self.node._unpack_contents(old_contents)
    if self.name not in children:
        if first_time and self.must_exist:
            raise NoSuchChildError(self.name)
        # (the lines handling the already-absent case are elided in this
        # view -- presumably clear old_child and abort; TODO confirm)
    self.old_child, metadata = children[self.name]
    del children[self.name]
    new_contents = self.node._pack_contents(children)
    # (the `return new_contents` line is elided in this view)
def __init__(self, node, name, metadata):
    # node/name identify the child whose metadata will be replaced by
    # modify(). (the self.node/self.name assignment lines are elided in
    # this view)
    self.metadata = metadata
def modify(self, old_contents, servermap, first_time):
    # Replace the metadata of child self.name, leaving the child node
    # itself untouched, and re-serialize the table.
    children = self.node._unpack_contents(old_contents)
    if self.name not in children:
        raise NoSuchChildError(self.name)
    children[self.name] = (children[self.name][0], self.metadata)
    new_contents = self.node._pack_contents(children)
    # (the `return new_contents` line is elided in this view)
def __init__(self, node, entries=None, overwrite=True):
    # node: the dirnode whose contents will be modified.
    # entries: optional pre-loaded list of [name, child, metadata]; more
    #   can be queued via set_node().
    # overwrite: when False, modify() raises ExistingChildError on a
    #   name collision.
    # (the self.node assignment and the `entries is None` default
    # handling are elided in this view)
    self.entries = entries
    self.overwrite = overwrite
def set_node(self, name, node, metadata):
    """Queue one (name, node, metadata) entry to be merged by modify()."""
    # Validate eagerly so a bad entry fails here, not mid-modify().
    precondition(isinstance(name, unicode), name)
    precondition(IFilesystemNode.providedBy(node), node)
    entry = [name, node, metadata]
    self.entries.append(entry)
def modify(self, old_contents, servermap, first_time):
    # Merge every queued entry into the serialized children table,
    # enforcing the overwrite policy and maintaining the timestamp
    # metadata, then re-serialize.
    children = self.node._unpack_contents(old_contents)
    for e in self.entries:
        # (normalization lines for 2-element entries are elided in this
        # view)
        name, child, new_metadata = e
        assert _assert(IFilesystemNode.providedBy(child), child)
        assert isinstance(name, unicode)
        # (the `if name in children:` guard line is elided in this view)
        if not self.overwrite:
            raise ExistingChildError("child '%s' already exists" % name)
        # Overwriting an existing child: start from a copy of its
        # current metadata so system fields can be preserved below.
        metadata = children[name][1].copy()
        # Brand-new child: fresh timestamps. (the continuation lines of
        # this dict literal, and the `now = time.time()` assignment, are
        # elided in this view)
        metadata = {"ctime": now,
        if new_metadata is not None:
            # Overwrite all metadata.
            newmd = new_metadata.copy()
            # ...except the 'tahoe' sub-dict, which is system metadata:
            # the existing one is carried over into newmd.
            if newmd.has_key('tahoe'):
            if metadata.has_key('tahoe'):
                newmd['tahoe'] = metadata['tahoe']
        # For backwards compatibility with Tahoe < 1.4.0:
        if "ctime" not in metadata:
            metadata["ctime"] = now
        metadata["mtime"] = now
        # sysmd is the 'tahoe' system-metadata sub-dict.
        sysmd = metadata.get('tahoe', {})
        if not 'linkcrtime' in sysmd:
            if "ctime" in metadata:
                # In Tahoe < 1.4.0 we used the word "ctime" to mean what Tahoe >= 1.4.0
                # calls "linkcrtime".
                sysmd["linkcrtime"] = metadata["ctime"]
            # (the `else:` line is elided in this view)
            sysmd["linkcrtime"] = now
        sysmd["linkmotime"] = now
        children[name] = (child, metadata)
    new_contents = self.node._pack_contents(children)
    # (the `return new_contents` line is elided in this view)
class NewDirectoryNode:
    """A mutable directory.

    The directory's children are stored as the contents of a backing
    MutableFileNode: a sequence of netstrings, one per child, each
    holding (name, rocap, encrypted rwcap, JSON metadata); see
    _pack_contents/_unpack_contents for the exact layout.

    NOTE(review): this chunk is an extraction with some original lines
    elided; comments below flag the visible gaps.
    """
    implements(IDirectoryNode, ICheckable, IDeepCheckable)
    filenode_class = MutableFileNode  # class used for the backing store

    def __init__(self, client):
        self._client = client
        self._most_recent_size = None  # updated by _set_size() on read

    # body of __repr__ (its `def` line is elided in this view)
        return "<%s %s %s>" % (self.__class__.__name__, self.is_readonly() and "RO" or "RW", hasattr(self, '_uri') and self._uri.abbrev())

    def init_from_uri(self, myuri):
        # Initialize from an existing directory URI: remember the dirnode
        # URI and build the backing mutable filenode from the filenode
        # URI embedded inside it.
        self._uri = IURI(myuri)
        self._node = self.filenode_class(self._client)
        self._node.init_from_uri(self._uri.get_filenode_uri())

    def create(self, keypair_generator=None):
        """
        Returns a deferred that eventually fires with self once the directory
        has been created (distributed across a set of storage servers).
        """
        # first we create a MutableFileNode with empty_contents, then use its
        # URI to create our own.
        self._node = self.filenode_class(self._client)
        empty_contents = self._pack_contents({})
        d = self._node.create(empty_contents, keypair_generator)
        d.addCallback(self._filenode_created)
        # (the `return d` line is elided in this view)

    def _filenode_created(self, res):
        # Derive our directory URI from the new filenode's URI.
        self._uri = NewDirectoryURI(IMutableFileURI(self._node.get_uri()))

    # body of get_size (its `def` line is elided in this view):
        # return the size of our backing mutable file, in bytes, if we've
        # fetched it (None before the first read).
        return self._most_recent_size

    def _set_size(self, data):
        # Cache the serialized size as a side-effect of each download.
        self._most_recent_size = len(data)

    # body of the internal read helper (its `def` line is elided in this
    # view): download, record size, and deserialize the children table.
        d = self._node.download_best_version()
        d.addCallback(self._set_size)
        d.addCallback(self._unpack_contents)

    def _encrypt_rwcap(self, rwcap):
        """Encrypt a write-cap for storage as IV + ciphertext + HMAC."""
        assert isinstance(rwcap, str)
        # (the IV-generation and AES-construction lines are elided in
        # this view -- presumably 16 random bytes, matching the 16-byte
        # slice in _decrypt_rwcapdata; TODO confirm)
        key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
        crypttext = cryptor.process(rwcap)
        mac = hashutil.hmac(key, IV + crypttext)
        assert len(mac) == 32
        return IV + crypttext + mac
        # The MAC is not checked by readers in Tahoe >= 1.3.0, but we still produce it for the sake of older readers.

    def _decrypt_rwcapdata(self, encwrcap):
        """Inverse of _encrypt_rwcap: strip IV/MAC and decrypt."""
        # Layout: first 16 bytes IV, last 32 bytes MAC, middle ciphertext.
        crypttext = encwrcap[16:-32]
        key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
        plaintext = cryptor.process(crypttext)
        # (the IV extraction, AES construction, and `return plaintext`
        # lines are elided in this view)

    def _create_node(self, child_uri):
        # The client knows how to map a URI string to the right node class.
        return self._client.create_node_from_uri(child_uri)

    def _unpack_contents(self, data):
        # the directory is serialized as a list of netstrings, one per child.
        # Each child is serialized as a list of four netstrings: (name,
        # rocap, rwcap, metadata), in which the name,rocap,metadata are in
        # cleartext. The 'name' is UTF-8 encoded. The rwcap is formatted as:
        # pack("16ss32s", iv, AES(H(writekey+iv), plaintextrwcap), mac)
        assert isinstance(data, str)
        # an empty directory is serialized as an empty string
        writeable = not self.is_readonly()
        # (the result-dict initialization and the loop header over the
        # remaining data are elided in this view)
        entry, data = split_netstring(data, 1, True)
        name, rocap, rwcapdata, metadata_s = split_netstring(entry, 4)
        name = name.decode("utf-8")
        # Writers decrypt the rwcap and get a read-write child node;
        # readers fall back to the cleartext rocap. (the branch lines
        # separating these two cases are elided in this view)
        rwcap = self._decrypt_rwcapdata(rwcapdata)
        child = self._create_node(rwcap)
        child = self._create_node(rocap)
        metadata = simplejson.loads(metadata_s)
        assert isinstance(metadata, dict)
        children[name] = (child, metadata)

    def _pack_contents(self, children):
        # expects children in the same format as _unpack_contents
        assert isinstance(children, dict)
        # (the output-list initialization is elided in this view)
        # Serialize children sorted by name for a deterministic encoding.
        for name in sorted(children.keys()):
            child, metadata = children[name]
            assert isinstance(name, unicode)
            assert (IFileNode.providedBy(child)
                    or IMutableFileNode.providedBy(child)
                    or IDirectoryNode.providedBy(child)), (name,child)
            assert isinstance(metadata, dict)
            rwcap = child.get_uri() # might be RO if the child is not writeable
            assert isinstance(rwcap, str), rwcap
            rocap = child.get_readonly_uri()
            assert isinstance(rocap, str), rocap
            entry = "".join([netstring(name.encode("utf-8")),
                             # (the rocap element of this join is elided
                             # in this view)
                             netstring(self._encrypt_rwcap(rwcap)),
                             netstring(simplejson.dumps(metadata))])
            entries.append(netstring(entry))
        return "".join(entries)

    def is_readonly(self):
        # Read-only iff the backing mutable file is read-only.
        return self._node.is_readonly()
    def is_mutable(self):
        return self._node.is_mutable()

    # body of get_uri (its `def` line is elided in this view)
        return self._uri.to_string()

    def get_readonly_uri(self):
        return self._uri.get_readonly().to_string()

    def get_verify_cap(self):
        return self._uri.get_verify_cap()

    def get_repair_cap(self):
        # A read-only dirnode cannot repair itself.
        if self._node.is_readonly():
        # (the branch bodies are elided in this view)

    def get_storage_index(self):
        return self._uri._filenode_uri.storage_index

    def check(self, monitor, verify=False, add_lease=False):
        """Perform a file check. See IChecker.check for details."""
        return self._node.check(monitor, verify, add_lease)
    def check_and_repair(self, monitor, verify=False, add_lease=False):
        # Delegated to the backing mutable file, like check().
        return self._node.check_and_repair(monitor, verify, add_lease)

    # docstring of list() (its `def` line and body are elided in this view)
        """I return a Deferred that fires with a dictionary mapping child
        name to a tuple of (IFileNode or IDirectoryNode, metadata)."""

    def has_child(self, name):
        """I return a Deferred that fires with a boolean, True if there
        exists a child of the given name, False if not."""
        assert isinstance(name, unicode)
        # (the read that produces `d` is elided in this view)
        d.addCallback(lambda children: children.has_key(name))

    def _get(self, children, name):
        # Helper: resolve one child node (no metadata) from a listing.
        child = children.get(name)
        # (the `child is None` guard line is elided in this view)
        raise NoSuchChildError(name)

    def _get_with_metadata(self, children, name):
        # Helper: resolve one (node, metadata) pair from a listing.
        child = children.get(name)
        # (the `child is None` guard line is elided in this view)
        raise NoSuchChildError(name)

    # get() (its `def` line is elided in this view):
        """I return a Deferred that fires with the named child node,
        which is either an IFileNode or an IDirectoryNode."""
        assert isinstance(name, unicode)
        d.addCallback(self._get, name)

    def get_child_and_metadata(self, name):
        """I return a Deferred that fires with the (node, metadata) pair for
        the named child. The node is either an IFileNode or an
        IDirectoryNode, and the metadata is a dictionary."""
        assert isinstance(name, unicode)
        # (the read that produces `d` is elided in this view)
        d.addCallback(self._get_with_metadata, name)

    def get_metadata_for(self, name):
        # Returns a Deferred firing with just the metadata dict of the
        # named child. (the read that produces `d` is elided in this view)
        assert isinstance(name, unicode)
        d.addCallback(lambda children: children[name][1])

    def set_metadata_for(self, name, metadata):
        # Replace the named child's metadata wholesale via a modify() pass.
        assert isinstance(name, unicode)
        if self.is_readonly():
            return defer.fail(NotMutableError())
        assert isinstance(metadata, dict)
        s = MetadataSetter(self, name, metadata)
        d = self._node.modify(s.modify)
        d.addCallback(lambda res: self)

    def get_child_at_path(self, path):
        """Transform a child path into an IDirectoryNode or IFileNode.

        I perform a recursive series of 'get' operations to find the named
        descendant node. I return a Deferred that fires with the node, or
        errbacks with IndexError if the node could not be found.

        The path can be either a single string (slash-separated) or a list of
        path-name elements."""
        d = self.get_child_and_metadata_at_path(path)
        # Discard the metadata; callers of this method only want the node.
        d.addCallback(lambda (node, metadata): node)

    def get_child_and_metadata_at_path(self, path):
        """Transform a child path into an IDirectoryNode or IFileNode and
        a metadata dictionary from the last edge that was traversed.
        """
        # An empty path refers to this node itself, with empty metadata.
        # (the `if not path:` guard line is elided in this view)
        return defer.succeed((self, {}))
        if isinstance(path, (list, tuple)):
        # A string path is split on slashes into name components. (the
        # list-branch and `else:` lines are elided in this view)
            path = path.split("/")
        assert isinstance(p, unicode)
        remaining_path = path[1:]
        # With components remaining, recurse into the first child. (the
        # loop header, childname assignment, and branch/return lines are
        # elided in this view)
        d = self.get(childname)
        d.addCallback(lambda node:
                      node.get_child_and_metadata_at_path(remaining_path))
        # Last component: resolve it directly with its edge metadata.
        d = self.get_child_and_metadata(childname)

    def set_uri(self, name, child_uri, metadata=None, overwrite=True):
        """I add a child (by URI) at the specific name. I return a Deferred
        that fires with the child node when the operation finishes. I will
        replace any existing child of the same name.

        The child_uri could be for a file, or for a directory (either
        read-write or read-only, using a URI that came from get_uri() ).

        If this directory node is read-only, the Deferred will errback with a
        NotMutableError."""
        precondition(isinstance(name, unicode), name)
        precondition(isinstance(child_uri, str), child_uri)
        child_node = self._create_node(child_uri)
        d = self.set_node(name, child_node, metadata, overwrite)
        d.addCallback(lambda res: child_node)

    def set_children(self, entries, overwrite=True):
        # Add multiple children (given by URI) in a single modify() pass.
        a = Adder(self, overwrite=overwrite)
        # (the entry-normalization loop header and 2-element branch lines
        # are elided in this view)
            name, child_uri, metadata = e
            assert isinstance(name, unicode)
            a.set_node(name, self._create_node(child_uri), metadata)
        return self._node.modify(a.modify)

    def set_node(self, name, child, metadata=None, overwrite=True):
        """I add a child at the specific name. I return a Deferred that fires
        when the operation finishes. This Deferred will fire with the child
        node that was just added. I will replace any existing child of the
        same name.

        If this directory node is read-only, the Deferred will errback with a
        NotMutableError."""
        precondition(IFilesystemNode.providedBy(child), child)

        if self.is_readonly():
            return defer.fail(NotMutableError())
        assert isinstance(name, unicode)
        assert IFilesystemNode.providedBy(child), child
        a = Adder(self, overwrite=overwrite)
        a.set_node(name, child, metadata)
        d = self._node.modify(a.modify)
        d.addCallback(lambda res: child)

    def set_nodes(self, entries, overwrite=True):
        # Add multiple (name, node, metadata) entries in one modify() pass.
        if self.is_readonly():
            return defer.fail(NotMutableError())
        a = Adder(self, entries, overwrite=overwrite)
        d = self._node.modify(a.modify)
        d.addCallback(lambda res: None)

    def add_file(self, name, uploadable, metadata=None, overwrite=True):
        """I upload a file (using the given IUploadable), then attach the
        resulting FileNode to the directory at the given name. I return a
        Deferred that fires (with the IFileNode of the uploaded file) when
        the operation completes."""
        assert isinstance(name, unicode)
        if self.is_readonly():
            return defer.fail(NotMutableError())
        d = self._client.upload(uploadable)
        d.addCallback(lambda results: results.uri)
        d.addCallback(self._client.create_node_from_uri)
        d.addCallback(lambda node:
                      self.set_node(name, node, metadata, overwrite))

    def delete(self, name):
        """I remove the child at the specific name. I return a Deferred that
        fires (with the node just removed) when the operation finishes."""
        assert isinstance(name, unicode)
        if self.is_readonly():
            return defer.fail(NotMutableError())
        deleter = Deleter(self, name)
        d = self._node.modify(deleter.modify)
        d.addCallback(lambda res: deleter.old_child)

    def create_empty_directory(self, name, overwrite=True):
        """I create and attach an empty directory at the given name. I return
        a Deferred that fires (with the new directory node) when the
        operation finishes."""
        assert isinstance(name, unicode)
        if self.is_readonly():
            return defer.fail(NotMutableError())
        d = self._client.create_empty_dirnode()
        # body of the `_created` callback (its `def` line is elided in
        # this view): attach the freshly created dirnode under `name`.
        entries = [(name, child, None)]
        a = Adder(self, entries, overwrite=overwrite)
        d = self._node.modify(a.modify)
        d.addCallback(lambda res: child)
        d.addCallback(_created)

    def move_child_to(self, current_child_name, new_parent,
                      new_child_name=None, overwrite=True):
        """I take one of my children and move them to a new parent. The child
        is referenced by name. On the new parent, the child will live under
        'new_child_name', which defaults to 'current_child_name'. I return a
        Deferred that fires when the operation finishes."""
        assert isinstance(current_child_name, unicode)
        if self.is_readonly() or new_parent.is_readonly():
            return defer.fail(NotMutableError())
        if new_child_name is None:
            # keep the same name in the destination directory
            new_child_name = current_child_name
        assert isinstance(new_child_name, unicode)
        d = self.get(current_child_name)
        # callback body (its `def` line and the continuation of this call
        # are elided in this view)
        return new_parent.set_node(new_child_name, child,
        # Only delete from this directory after the add has succeeded.
        d.addCallback(lambda child: self.delete(current_child_name))

    def deep_traverse(self, walker):
        """Perform a recursive walk, using this dirnode as a root, notifying
        the 'walker' instance of everything I encounter.

        I call walker.enter_directory(parent, children) once for each dirnode
        I visit, immediately after retrieving the list of children. I pass in
        the parent dirnode and the dict of childname->(childnode,metadata).
        This function should *not* traverse the children: I will do that.
        enter_directory() is most useful for the deep-stats number that
        counts how large a directory is.

        I call walker.add_node(node, path) for each node (both files and
        directories) I can reach. Most work should be done here.

        I avoid loops by keeping track of verifier-caps and refusing to call
        walker.add_node() or traverse a node that I've seen before. This
        means that any file or directory will only be given to the walker
        once. If files or directories are referenced multiple times by a
        directory structure, this may appear to under-count or miss some of
        them.

        I return a Monitor which can be used to wait for the operation to
        finish, learn about its progress, or cancel the operation.
        """
        # this is just a tree-walker, except that following each edge
        # requires a Deferred. We used to use a ConcurrencyLimiter to limit
        # fanout to 10 simultaneous operations, but the memory load of the
        # queued operations was excessive (in one case, with 330k dirnodes,
        # it caused the process to run into the 3.0GB-ish per-process 32bit
        # linux memory limit, and crashed). So we use a single big Deferred
        # chain, and do a strict depth-first traversal, one node at a time.
        # This can be slower, because we aren't pipelining directory reads,
        # but it brought the memory footprint down by roughly 50%.
        # (the Monitor construction line is elided in this view)
        walker.set_monitor(monitor)
        # Seed the visited-set with our own verify-cap so the root is
        # never revisited through a cycle.
        found = set([self.get_verify_cap()])
        d = self._deep_traverse_dirnode(self, [], walker, monitor, found)
        d.addCallback(lambda ignored: walker.finish())
        d.addBoth(monitor.finish)
        # Failures have already been delivered via the monitor; swallow
        # them so the chain does not report an unhandled error.
        d.addErrback(lambda f: None)

    def _deep_traverse_dirnode(self, node, path, walker, monitor, found):
        # process this directory, then walk its children
        monitor.raise_if_cancelled()
        d = defer.maybeDeferred(walker.add_node, node, path)
        d.addCallback(lambda ignored: node.list())
        d.addCallback(self._deep_traverse_dirnode_children, node, path,
                      walker, monitor, found)

    def _deep_traverse_dirnode_children(self, children, parent, path,
                                        walker, monitor, found):
        monitor.raise_if_cancelled()
        d = defer.maybeDeferred(walker.enter_directory, parent, children)
        # we process file-like children first, so we can drop their FileNode
        # objects as quickly as possible. Tests suggest that a FileNode (held
        # in the client's nodecache) consumes about 2440 bytes. dirnodes (not
        # in the nodecache) seem to consume about 2000 bytes.
        # (the dirkids/filekids list initializations are elided in this
        # view)
        for name, (child, metadata) in sorted(children.iteritems()):
            verifier = child.get_verify_cap()
            # allow LIT files (for which verifier==None) to be processed
            if (verifier is not None) and (verifier in found):
            # (the skip and visited-set update lines are elided in this
            # view)
            childpath = path + [name]
            if IDirectoryNode.providedBy(child):
                dirkids.append( (child, childpath) )
            # (the `else:` line is elided in this view)
                filekids.append( (child, childpath) )
        for i, (child, childpath) in enumerate(filekids):
            # child/childpath are bound as lambda defaults to avoid the
            # late-binding closure pitfall inside this loop.
            d.addCallback(lambda ignored, child=child, childpath=childpath:
                          walker.add_node(child, childpath))
            # to work around the Deferred tail-recursion problem
            # (specifically the defer.succeed flavor) requires us to avoid
            # doing more than 158 LIT files in a row. We insert a turn break
            # once every 100 files (LIT or CHK) to preserve some stack space
            # for other code. This is a different expression of the same
            # Twisted problem as in #237.
            # (the every-100-files condition line is elided in this view)
            d.addCallback(lambda ignored: fireEventually())
        for (child, childpath) in dirkids:
            d.addCallback(lambda ignored, child=child, childpath=childpath:
                          self._deep_traverse_dirnode(child, childpath,

    def build_manifest(self):
        """Return a Monitor, with a ['status'] that will be a list of (path,
        cap) tuples, for all nodes (directories and files) reachable from
        this dirnode."""
        walker = ManifestWalker(self)
        return self.deep_traverse(walker)

    def start_deep_stats(self):
        # Since deep_traverse tracks verifier caps, we avoid double-counting
        # children for which we've got both a write-cap and a read-cap
        return self.deep_traverse(DeepStats(self))

    def start_deep_check(self, verify=False, add_lease=False):
        # Recursive check without repair; see DeepChecker.
        return self.deep_traverse(DeepChecker(self, verify, repair=False, add_lease=add_lease))

    def start_deep_check_and_repair(self, verify=False, add_lease=False):
        # Recursive check with repair enabled; see DeepChecker.
        return self.deep_traverse(DeepChecker(self, verify, repair=True, add_lease=add_lease))
def __init__(self, origin):
    # origin: the root dirnode of the traversal (used for origin_si in
    # set_monitor). Initialize every counter we accumulate to zero.
    # (the self.origin/self.stats initialization lines, several counter
    # names, and the `self.stats[k] = 0` loop body are elided in this
    # view)
    for k in ["count-immutable-files",
              "count-mutable-files",
              "count-literal-files",
              "size-immutable-files",
              #"size-mutable-files",
              "size-literal-files",
              "largest-directory-children",
              "largest-immutable-file",
              #"largest-mutable-file",
    for k in ["size-files-histogram"]:
        self.histograms[k] = {} # maps (min,max) to count
    # Histogram buckets: two per decade, grown lazily by which_bucket()
    # using factor sqrt(10).
    self.buckets = [ (0,0), (1,3)]
    self.root = math.sqrt(10)
def set_monitor(self, monitor):
    # Called by deep_traverse before the walk starts: remember the
    # monitor, tag it with the origin's storage index, and publish our
    # (currently empty) results as its initial status.
    self.monitor = monitor
    monitor.origin_si = self.origin.get_storage_index()
    monitor.set_status(self.get_results())
def add_node(self, node, childpath):
    # Classify one reachable node and update the counters. Called once
    # per unique node by deep_traverse.
    if IDirectoryNode.providedBy(node):
        self.add("count-directories")
    elif IMutableFileNode.providedBy(node):
        self.add("count-files")
        self.add("count-mutable-files")
        # TODO: update the servermap, compute a size, add it to
        # size-mutable-files, max it into "largest-mutable-file"
    elif IFileNode.providedBy(node): # CHK and LIT
        self.add("count-files")
        size = node.get_size()
        self.histogram("size-files-histogram", size)
        # Distinguish LIT (data embedded in the URI) from CHK files by
        # parsing the URI.
        theuri = from_string(node.get_uri())
        if isinstance(theuri, LiteralFileURI):
            self.add("count-literal-files")
            self.add("size-literal-files", size)
        # (the `else:` line is elided in this view)
            self.add("count-immutable-files")
            self.add("size-immutable-files", size)
            self.max("largest-immutable-file", size)
def enter_directory(self, parent, children):
    """Record per-directory statistics: total serialized size, plus the
    largest directory seen so far (by bytes and by child count)."""
    size_in_bytes = parent.get_size()
    child_count = len(children)
    self.add("size-directories", size_in_bytes)
    self.max("largest-directory", size_in_bytes)
    self.max("largest-directory-children", child_count)
def add(self, key, value=1):
    """Increment the counter named `key` by `value` (default 1)."""
    self.stats[key] = self.stats[key] + value
def max(self, key, value):
    # Track a running maximum for `key`. The bare `max` call below still
    # resolves to the builtin: class attributes are not in a method
    # body's name-lookup chain, so this does not recurse.
    self.stats[key] = max(self.stats[key], value)
def which_bucket(self, size):
    # return (min,max) such that min <= size <= max
    # values are from the set (0,0), (1,3), (4,10), (11,31), (32,100),
    # (101,316), (317, 1000), etc: two per decade
    # (the loop-header lines that scan self.buckets are elided in this
    # view)
        if i >= len(self.buckets):
            # Grow the list lazily: the next bucket starts just past the
            # previous upper bound and extends by a factor of self.root
            # (sqrt(10), giving two buckets per decade).
            new_lower = self.buckets[i-1][1]+1
            new_upper = int(mathutil.next_power_of_k(new_lower, self.root))
            self.buckets.append( (new_lower, new_upper) )
        maybe = self.buckets[i]
        if maybe[0] <= size <= maybe[1]:
        # (the match-return and index-increment lines are elided in this
        # view)
def histogram(self, key, size):
    # Count `size` into the appropriate bucket of histogram `key`.
    bucket = self.which_bucket(size)
    h = self.histograms[key]
    # (the bucket-count increment lines are elided in this view)
def get_results(self):
    # Return a copy of the stats, with each histogram flattened into
    # (min, max, count) triples.
    stats = self.stats.copy()
    for key in self.histograms:
        h = self.histograms[key]
        out = [ (bucket[0], bucket[1], h[bucket]) for bucket in h ]
        # (the sort/assignment and `return stats` lines are elided in
        # this view)

# body of finish() (its `def` line is elided in this view): the final
# status of the traversal is simply the accumulated results.
    return self.get_results()
class ManifestWalker(DeepStats):
    """A deep-traverse walker that collects a manifest -- (path, cap) for
    every reachable node -- plus the set of verify-caps and storage-index
    strings, while also accumulating the inherited DeepStats counters."""
    def __init__(self, origin):
        DeepStats.__init__(self, origin)
        # (the manifest-list initialization line is elided in this view)
        self.storage_index_strings = set()
        self.verifycaps = set()

    def add_node(self, node, path):
        self.manifest.append( (tuple(path), node.get_uri()) )
        si = node.get_storage_index()
        # (a guard for nodes without a storage index is elided in this
        # view)
        self.storage_index_strings.add(base32.b2a(si))
        v = node.get_verify_cap()
        # (a guard for nodes without a verify-cap is elided in this view)
        self.verifycaps.add(v.to_string())
        # Let DeepStats classify and count this node too.
        return DeepStats.add_node(self, node, path)

    def get_results(self):
        stats = DeepStats.get_results(self)
        return {"manifest": self.manifest,
                "verifycaps": self.verifycaps,
                "storage-index": self.storage_index_strings,
def __init__(self, root, verify, repair, add_lease):
    # root: the dirnode at which the deep-check starts.
    # verify/repair/add_lease: flags forwarded to each node's checker.
    root_si = root.get_storage_index()
    self._lp = log.msg(format="deep-check starting (%(si)s),"
                       " verify=%(verify)s, repair=%(repair)s",
                       si=base32.b2a(root_si), verify=verify, repair=repair)
    self._verify = verify
    self._repair = repair
    self._add_lease = add_lease
    # Repairing runs use the richer results container. (the `if repair:`
    # and `else:` lines selecting between the two are elided in this
    # view)
    self._results = DeepCheckAndRepairResults(root_si)
    self._results = DeepCheckResults(root_si)
    self._stats = DeepStats(root)
def set_monitor(self, monitor):
    # Publish our results object as the monitor's status so observers
    # can watch check progress incrementally.
    self.monitor = monitor
    monitor.set_status(self._results)
def add_node(self, node, childpath):
    # Check (and optionally repair) one node, record the outcome, and
    # feed the node into the embedded stats counters as well.
    # (the `if self._repair:` and `else:` lines selecting between the
    # two paths below, and the `return d`, are elided in this view)
    d = node.check_and_repair(self.monitor, self._verify, self._add_lease)
    d.addCallback(self._results.add_check_and_repair, childpath)
    d = node.check(self.monitor, self._verify, self._add_lease)
    d.addCallback(self._results.add_check, childpath)
    d.addCallback(lambda ignored: self._stats.add_node(node, childpath))
def enter_directory(self, parent, children):
    # Directory-size statistics are delegated to the embedded DeepStats.
    return self._stats.enter_directory(parent, children)

# body of finish() (its `def` line is elided in this view): log
# completion and fold the accumulated stats into the check results.
    log.msg("deep-check done", parent=self._lp)
    self._results.update_stats(self._stats.get_results())
770 # use client.create_dirnode() to make one of these