4 from zope.interface import implements
5 from twisted.internet import defer
7 from allmydata.mutable.common import NotMutableError
8 from allmydata.mutable.filenode import MutableFileNode
9 from allmydata.interfaces import IMutableFileNode, IDirectoryNode,\
10 IURI, IFileNode, IMutableFileURI, IFilesystemNode, \
11 ExistingChildError, NoSuchChildError, ICheckable, IDeepCheckable
12 from allmydata.check_results import DeepCheckResults, \
13 DeepCheckAndRepairResults
14 from allmydata.monitor import Monitor
15 from allmydata.util import hashutil, mathutil, base32, log
16 from allmydata.util.assertutil import _assert, precondition
17 from allmydata.util.hashutil import netstring
18 from allmydata.util.netstring import split_netstring
19 from allmydata.uri import NewDirectoryURI, LiteralFileURI, from_string
20 from pycryptopp.cipher.aes import AES
def __init__(self, node, name, must_exist=True):
    """Prepare to delete the child 'name' from dirnode 'node'.

    If must_exist is False, modify() tolerates a missing child instead
    of raising NoSuchChildError on the first attempt.
    """
    self.node = node
    self.name = name
    # BUG FIX: the original assigned the literal True here, silently
    # ignoring the caller's must_exist argument, so must_exist=False
    # had no effect.
    self.must_exist = must_exist
def modify(self, old_contents, servermap, first_time):
    # Callback for MutableFileNode.modify: remove self.name from the
    # serialized directory contents and produce the new serialization.
    children = self.node._unpack_contents(old_contents)
    if self.name not in children:
        if first_time and self.must_exist:
            raise NoSuchChildError(self.name)
        # NOTE(review): there is no handling for the "missing but
        # tolerated" case; falling through to the lookup below would
        # raise KeyError. Presumably an early return (with
        # self.old_child = None) belongs here -- confirm.
    self.old_child, metadata = children[self.name]
    del children[self.name]
    new_contents = self.node._pack_contents(children)
    # NOTE(review): new_contents is computed but never returned; a
    # modify() callback is expected to return the new contents -- confirm.
def __init__(self, node, name, metadata):
    """Prepare to replace the metadata of child 'name' in dirnode 'node'."""
    # modify() below reads self.node and self.name; the original stored
    # only the metadata, which would leave modify() failing with
    # AttributeError.
    self.node = node
    self.name = name
    self.metadata = metadata
def modify(self, old_contents, servermap, first_time):
    """Callback for MutableFileNode.modify: replace the metadata of
    child self.name (the child node itself is untouched) and return the
    re-serialized directory contents.

    Raises NoSuchChildError if the child does not exist.
    """
    children = self.node._unpack_contents(old_contents)
    if self.name not in children:
        raise NoSuchChildError(self.name)
    # keep the existing child node, swap in the new metadata dict
    children[self.name] = (children[self.name][0], self.metadata)
    new_contents = self.node._pack_contents(children)
    # BUG FIX: the new serialization must be returned; the original
    # fell off the end, handing None back to MutableFileNode.modify.
    return new_contents
def __init__(self, node, entries=None, overwrite=True):
    """Prepare to add 'entries' ([name, node, metadata] lists) to
    dirnode 'node'. If overwrite is False, modify() rejects names that
    already exist.
    """
    self.node = node
    # Normalize the None default to a fresh list: set_node() appends to
    # self.entries, which would raise AttributeError on None. (A fresh
    # list, not a mutable default argument, so calls don't share state.)
    if entries is None:
        entries = []
    self.entries = entries
    self.overwrite = overwrite
def set_node(self, name, node, metadata):
    """Queue one [name, node, metadata] entry for the next modify()."""
    precondition(isinstance(name, unicode), name)
    precondition(IFilesystemNode.providedBy(node), node)
    entry = [name, node, metadata]
    self.entries.append(entry)
def modify(self, old_contents, servermap, first_time):
    # Callback for MutableFileNode.modify: merge self.entries into the
    # unpacked child table and re-serialize.
    children = self.node._unpack_contents(old_contents)
    for e in self.entries:
        name, child, new_metadata = e
        # NOTE(review): asserting on _assert()'s return value is
        # redundant (_assert already raises on failure), and both
        # asserts vanish under python -O.
        assert _assert(IFilesystemNode.providedBy(child), child)
        assert isinstance(name, unicode)
        if not self.overwrite:
            raise ExistingChildError("child '%s' already exists" % name)
        # NOTE(review): the expected 'if name in children:' / 'else:'
        # branch structure, the definition of 'now', and the rest of
        # the dict literal below are not present in this body --
        # confirm the intended control flow.
        metadata = children[name][1].copy()
        metadata = {"ctime": now,
        if new_metadata is None:
            if "ctime" not in metadata:
                metadata["ctime"] = now
            metadata["mtime"] = now
        metadata = new_metadata.copy()
        children[name] = (child, metadata)
    new_contents = self.node._pack_contents(children)
    # NOTE(review): new_contents is never returned -- a modify()
    # callback must return the new serialized contents. Confirm.
class NewDirectoryNode:
    """A mutable directory: a wrapper over a MutableFileNode whose
    contents are a serialized table of (name, child caps, metadata)."""
    implements(IDirectoryNode, ICheckable, IDeepCheckable)
    filenode_class = MutableFileNode

    def __init__(self, client):
        self._client = client
        # updated whenever the backing file's contents are downloaded
        self._most_recent_size = None

    # NOTE(review): the following return statement has no enclosing
    # def -- it reads like the body of __repr__; confirm.
    return "<%s %s %s>" % (self.__class__.__name__, self.is_readonly() and "RO" or "RW", hasattr(self, '_uri') and self._uri.abbrev())
def init_from_uri(self, myuri):
    # Populate this dirnode from an existing directory URI: keep the
    # directory cap and build the backing mutable filenode from the
    # embedded filenode cap.
    self._uri = IURI(myuri)
    self._node = self.filenode_class(self._client)
    self._node.init_from_uri(self._uri.get_filenode_uri())
    # NOTE(review): presumably this should return self for chaining --
    # confirm.
def create(self, keypair_generator=None):
    """
    Returns a deferred that eventually fires with self once the directory
    has been created (distributed across a set of storage servers).
    """
    # first we create a MutableFileNode with empty_contents, then use its
    # URI to create our own.
    self._node = self.filenode_class(self._client)
    empty_contents = self._pack_contents({})
    d = self._node.create(empty_contents, keypair_generator)
    d.addCallback(self._filenode_created)
    # NOTE(review): the deferred d is never returned -- confirm.

def _filenode_created(self, res):
    # derive our directory cap from the freshly created filenode's URI
    self._uri = NewDirectoryURI(IMutableFileURI(self._node.get_uri()))
    # NOTE(review): no return of self here, though create()'s docstring
    # promises the deferred fires with self -- confirm.
# return the size of our backing mutable file, in bytes, if we've
# NOTE(review): this return appears without its enclosing def
# (presumably get_size) -- confirm.
return self._most_recent_size

def _set_size(self, data):
    # callback: remember the size of the freshly downloaded contents
    self._most_recent_size = len(data)

# NOTE(review): the following lines appear without their enclosing def
# (presumably list(), which downloads and unpacks the current children
# and should return the deferred) -- confirm.
d = self._node.download_best_version()
d.addCallback(self._set_size)
d.addCallback(self._unpack_contents)
def _encrypt_rwcap(self, rwcap):
    # Encrypt a write-cap string for storage inside the dirnode's
    # serialized contents; output layout is IV + crypttext + mac.
    assert isinstance(rwcap, str)
    # NOTE(review): IV and cryptor are used below but not defined in
    # this body (presumably a random 16-byte IV and an AES instance
    # keyed on 'key') -- confirm.
    key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
    crypttext = cryptor.process(rwcap)
    mac = hashutil.hmac(key, IV + crypttext)
    assert len(mac) == 32
    return IV + crypttext + mac
# The MAC is not checked by readers in Tahoe >= 1.3.0, but we still produce it for the sake of older readers.
def _decrypt_rwcapdata(self, encwrcap):
    # Inverse of _encrypt_rwcap: encwrcap is IV + crypttext + mac; the
    # slice below strips the 16-byte IV and the 32-byte trailing MAC.
    crypttext = encwrcap[16:-32]
    # NOTE(review): IV and cryptor are referenced but not defined in
    # this body (presumably IV = encwrcap[:16] and an AES instance),
    # and the decrypted plaintext is never returned -- confirm.
    key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
    plaintext = cryptor.process(crypttext)
def _create_node(self, child_uri):
    """Turn a child cap string into a filesystem node via our client."""
    make_node = self._client.create_node_from_uri
    return make_node(child_uri)
def _unpack_contents(self, data):
    # the directory is serialized as a list of netstrings, one per child.
    # Each child is serialized as a list of four netstrings: (name,
    # rocap, rwcap, metadata), in which the name,rocap,metadata are in
    # cleartext. The 'name' is UTF-8 encoded. The rwcap is formatted as:
    # pack("16ss32s", iv, AES(H(writekey+iv), plaintextrwcap), mac)
    assert isinstance(data, str)
    # an empty directory is serialized as an empty string
    writeable = not self.is_readonly()
    # NOTE(review): the 'children = {}' initialization, the loop header
    # over the remaining netstrings, and the writeable/read-only branch
    # around the two _create_node calls are not present in this body --
    # confirm the intended control flow. 'simplejson' is also not among
    # this file's visible imports.
    entry, data = split_netstring(data, 1, True)
    name, rocap, rwcapdata, metadata_s = split_netstring(entry, 4)
    name = name.decode("utf-8")
    rwcap = self._decrypt_rwcapdata(rwcapdata)
    child = self._create_node(rwcap)
    child = self._create_node(rocap)
    metadata = simplejson.loads(metadata_s)
    assert isinstance(metadata, dict)
    children[name] = (child, metadata)
    # NOTE(review): children is never returned here -- confirm.
def _pack_contents(self, children):
    # expects children in the same format as _unpack_contents
    assert isinstance(children, dict)
    # NOTE(review): 'entries' is appended to below but never
    # initialized in this body (expected: entries = []) -- confirm.
    for name in sorted(children.keys()):
        child, metadata = children[name]
        assert isinstance(name, unicode)
        assert (IFileNode.providedBy(child)
                or IMutableFileNode.providedBy(child)
                or IDirectoryNode.providedBy(child)), (name,child)
        assert isinstance(metadata, dict)
        rwcap = child.get_uri() # might be RO if the child is not writeable
        assert isinstance(rwcap, str), rwcap
        rocap = child.get_readonly_uri()
        assert isinstance(rocap, str), rocap
        # NOTE(review): to match _unpack_contents' four-field format,
        # a netstring(rocap) component is expected between the name and
        # the encrypted rwcap; it is not present here -- confirm.
        entry = "".join([netstring(name.encode("utf-8")),
                         netstring(self._encrypt_rwcap(rwcap)),
                         netstring(simplejson.dumps(metadata))])
        entries.append(netstring(entry))
    return "".join(entries)
def is_readonly(self):
    """Report whether this directory grants only read access; delegates
    to the backing mutable filenode."""
    backing = self._node
    return backing.is_readonly()

def is_mutable(self):
    """Report whether this directory can be modified at all; delegates
    to the backing mutable filenode."""
    backing = self._node
    return backing.is_mutable()
# NOTE(review): this return appears without its enclosing def
# (presumably get_uri) -- confirm.
return self._uri.to_string()

def get_readonly_uri(self):
    # the diminished (read-only) form of our directory cap
    return self._uri.get_readonly().to_string()

def get_verify_cap(self):
    # verify caps permit integrity checking without read access
    return self._uri.get_verify_cap()
def get_repair_cap(self):
    # NOTE(review): only the read-only test is present here; the
    # return statements for both branches are missing -- confirm.
    if self._node.is_readonly():

def get_storage_index(self):
    # storage index of the underlying mutable file (identifies shares)
    return self._uri._filenode_uri.storage_index
def check(self, monitor, verify=False):
    """Perform a file check. See IChecker.check for details."""
    outcome = self._node.check(monitor, verify)
    return outcome

def check_and_repair(self, monitor, verify=False):
    """Check the backing mutable file and attempt repair if damaged."""
    outcome = self._node.check_and_repair(monitor, verify)
    return outcome
244 """I return a Deferred that fires with a dictionary mapping child
245 name to a tuple of (IFileNode or IDirectoryNode, metadata)."""
248 def has_child(self, name):
249 """I return a Deferred that fires with a boolean, True if there
250 exists a child of the given name, False if not."""
251 assert isinstance(name, unicode)
253 d.addCallback(lambda children: children.has_key(name))
def _get(self, children, name):
    # helper callback: pluck one child node out of a children dict
    child = children.get(name)
    # NOTE(review): the guard (presumably 'if child is None:') and the
    # success-path return of the node are not present here; as written
    # this always raises -- confirm.
    raise NoSuchChildError(name)

def _get_with_metadata(self, children, name):
    # like _get, but keep the whole (node, metadata) pair
    child = children.get(name)
    # NOTE(review): the same guard and success-path return are not
    # present here either -- confirm.
    raise NoSuchChildError(name)
269 """I return a Deferred that fires with the named child node,
270 which is either an IFileNode or an IDirectoryNode."""
271 assert isinstance(name, unicode)
273 d.addCallback(self._get, name)
276 def get_child_and_metadata(self, name):
277 """I return a Deferred that fires with the (node, metadata) pair for
278 the named child. The node is either an IFileNode or an
279 IDirectoryNode, and the metadata is a dictionary."""
280 assert isinstance(name, unicode)
282 d.addCallback(self._get_with_metadata, name)
285 def get_metadata_for(self, name):
286 assert isinstance(name, unicode)
288 d.addCallback(lambda children: children[name][1])
def set_metadata_for(self, name, metadata):
    """Replace the named child's metadata dict (the child node itself
    is untouched). Returns a Deferred that fires with self once the new
    contents have been written.

    Errbacks with NotMutableError if this directory is read-only, and
    with NoSuchChildError (raised by MetadataSetter.modify) if no such
    child exists.
    """
    assert isinstance(name, unicode)
    if self.is_readonly():
        return defer.fail(NotMutableError())
    assert isinstance(metadata, dict)
    s = MetadataSetter(self, name, metadata)
    d = self._node.modify(s.modify)
    d.addCallback(lambda res: self)
    # BUG FIX: the Deferred was built but never returned, so callers
    # could neither wait for completion nor observe errors.
    return d
def get_child_at_path(self, path):
    """Transform a child path into an IDirectoryNode or IFileNode.

    I perform a recursive series of 'get' operations to find the named
    descendant node. I return a Deferred that fires with the node, or
    errbacks with IndexError if the node could not be found.

    The path can be either a single string (slash-separated) or a list of
    """
    # NOTE(review): the docstring above is truncated mid-sentence.
    # The tuple-unpacking lambda below is Python-2-only syntax, and d
    # is never returned -- confirm.
    d = self.get_child_and_metadata_at_path(path)
    d.addCallback(lambda (node, metadata): node)

def get_child_and_metadata_at_path(self, path):
    """Transform a child path into an IDirectoryNode or IFileNode and
    a metadata dictionary from the last edge that was traversed.
    """
    # NOTE(review): several lines of this body are not present (the
    # empty-path guard's condition, the list-vs-string branch bodies,
    # the childname/p bindings, the final-segment branch, and the
    # returns); the statements below do not form a coherent control
    # flow as written -- confirm against the intended logic.
    return defer.succeed((self, {}))
    if isinstance(path, (list, tuple)):
        path = path.split("/")
    assert isinstance(p, unicode)
    remaining_path = path[1:]
    d = self.get(childname)
    d.addCallback(lambda node:
                  node.get_child_and_metadata_at_path(remaining_path))
    d = self.get_child_and_metadata(childname)
def set_uri(self, name, child_uri, metadata=None, overwrite=True):
    """I add a child (by URI) at the specific name. I return a Deferred
    that fires with the child node when the operation finishes. I will
    replace any existing child of the same name.

    The child_uri could be for a file, or for a directory (either
    read-write or read-only, using a URI that came from get_uri() ).

    If this directory node is read-only, the Deferred will errback with a
    """
    # NOTE(review): the docstring above is truncated (presumably
    # "...NotMutableError."); the read-only case is handled inside
    # set_node(). 'd' is never returned -- confirm.
    precondition(isinstance(name, unicode), name)
    precondition(isinstance(child_uri, str), child_uri)
    child_node = self._create_node(child_uri)
    d = self.set_node(name, child_node, metadata, overwrite)
    d.addCallback(lambda res: child_node)
def set_children(self, entries, overwrite=True):
    # Add multiple (name, child_uri, metadata) entries in a single
    # mutation of the backing file.
    a = Adder(self, overwrite=overwrite)
    # NOTE(review): the loop header ('for e in entries:') and any
    # 2-vs-3-tuple handling are not present in this body -- confirm.
    name, child_uri, metadata = e
    assert isinstance(name, unicode)
    a.set_node(name, self._create_node(child_uri), metadata)
    return self._node.modify(a.modify)
def set_node(self, name, child, metadata=None, overwrite=True):
    """I add a child at the specific name. I return a Deferred that fires
    when the operation finishes. This Deferred will fire with the child
    node that was just added. I will replace any existing child of the

    If this directory node is read-only, the Deferred will errback with a
    """
    # NOTE(review): the docstring above is truncated in two places.
    precondition(IFilesystemNode.providedBy(child), child)

    if self.is_readonly():
        return defer.fail(NotMutableError())
    assert isinstance(name, unicode)
    assert IFilesystemNode.providedBy(child), child
    a = Adder(self, overwrite=overwrite)
    a.set_node(name, child, metadata)
    d = self._node.modify(a.modify)
    d.addCallback(lambda res: child)
    # NOTE(review): d is never returned -- confirm.
def set_nodes(self, entries, overwrite=True):
    # Batch form of set_node: entries is handed straight to Adder.
    if self.is_readonly():
        return defer.fail(NotMutableError())
    a = Adder(self, entries, overwrite=overwrite)
    d = self._node.modify(a.modify)
    d.addCallback(lambda res: None)
    # NOTE(review): d is never returned -- confirm.
def add_file(self, name, uploadable, metadata=None, overwrite=True):
    """I upload a file (using the given IUploadable), then attach the
    resulting FileNode to the directory at the given name. I return a
    Deferred that fires (with the IFileNode of the uploaded file) when
    the operation completes."""
    assert isinstance(name, unicode)
    if self.is_readonly():
        return defer.fail(NotMutableError())
    # upload first, then turn the resulting URI into a node and attach it
    d = self._client.upload(uploadable)
    d.addCallback(lambda results: results.uri)
    d.addCallback(self._client.create_node_from_uri)
    d.addCallback(lambda node:
                  self.set_node(name, node, metadata, overwrite))
    # NOTE(review): d is never returned -- confirm.
def delete(self, name):
    """I remove the child at the specific name. I return a Deferred that
    fires (with the node just removed) when the operation finishes."""
    assert isinstance(name, unicode)
    if self.is_readonly():
        return defer.fail(NotMutableError())
    deleter = Deleter(self, name)
    d = self._node.modify(deleter.modify)
    # Deleter.modify stashed the removed node on deleter.old_child
    d.addCallback(lambda res: deleter.old_child)
    # NOTE(review): d is never returned -- confirm.
def create_empty_directory(self, name, overwrite=True):
    """I create and attach an empty directory at the given name. I return
    a Deferred that fires (with the new directory node) when the
    operation finishes."""
    assert isinstance(name, unicode)
    if self.is_readonly():
        return defer.fail(NotMutableError())
    d = self._client.create_empty_dirnode()
    # NOTE(review): the header of the nested callback below (presumably
    # 'def _created(child):') and its trailing 'return d' are not
    # present in this body -- confirm.
    entries = [(name, child, None)]
    a = Adder(self, entries, overwrite=overwrite)
    d = self._node.modify(a.modify)
    d.addCallback(lambda res: child)
    d.addCallback(_created)
    # NOTE(review): the outer d is never returned -- confirm.
def move_child_to(self, current_child_name, new_parent,
                  new_child_name=None, overwrite=True):
    """I take one of my children and move them to a new parent. The child
    is referenced by name. On the new parent, the child will live under
    'new_child_name', which defaults to 'current_child_name'. I return a
    Deferred that fires when the operation finishes."""
    assert isinstance(current_child_name, unicode)
    if self.is_readonly() or new_parent.is_readonly():
        return defer.fail(NotMutableError())
    if new_child_name is None:
        new_child_name = current_child_name
    assert isinstance(new_child_name, unicode)
    d = self.get(current_child_name)
    # NOTE(review): the callback that attaches the child to new_parent
    # is missing its header (def/lambda) and its trailing arguments;
    # the outer d is also never returned -- confirm.
    return new_parent.set_node(new_child_name, child,
    d.addCallback(lambda child: self.delete(current_child_name))
def deep_traverse(self, walker):
    """Perform a recursive walk, using this dirnode as a root, notifying
    the 'walker' instance of everything I encounter.

    I call walker.enter_directory(parent, children) once for each dirnode
    I visit, immediately after retrieving the list of children. I pass in
    the parent dirnode and the dict of childname->(childnode,metadata).
    This function should *not* traverse the children: I will do that.
    enter_directory() is most useful for the deep-stats number that
    counts how large a directory is.

    I call walker.add_node(node, path) for each node (both files and
    directories) I can reach. Most work should be done here.

    I avoid loops by keeping track of verifier-caps and refusing to call
    walker.add_node() or traverse a node that I've seen before. This
    means that any file or directory will only be given to the walker
    once. If files or directories are referenced multiple times by a
    directory structure, this may appear to under-count or miss some of

    I return a Monitor which can be used to wait for the operation to
    finish, learn about its progress, or cancel the operation.
    """
    # this is just a tree-walker, except that following each edge
    # requires a Deferred. We used to use a ConcurrencyLimiter to limit
    # fanout to 10 simultaneous operations, but the memory load of the
    # queued operations was excessive (in one case, with 330k dirnodes,
    # it caused the process to run into the 3.0GB-ish per-process 32bit
    # linux memory limit, and crashed). So we use a single big Deferred
    # chain, and do a strict depth-first traversal, one node at a time.
    # This can be slower, because we aren't pipelining directory reads,
    # but it brought the memory footprint down by roughly 50%.
    # NOTE(review): 'monitor' is used below but never created in this
    # body (expected: monitor = Monitor()); it is also never returned,
    # though the docstring promises a Monitor -- confirm.
    walker.set_monitor(monitor)
    # seed the seen-set with our own verify cap so we never revisit root
    found = set([self.get_verify_cap()])
    d = self._deep_traverse_dirnode(self, [], walker, monitor, found)
    d.addCallback(lambda ignored: walker.finish())
    d.addBoth(monitor.finish)
    d.addErrback(lambda f: None)
def _deep_traverse_dirnode(self, node, path, walker, monitor, found):
    # process this directory, then walk its children
    monitor.raise_if_cancelled()
    d = defer.maybeDeferred(walker.add_node, node, path)
    d.addCallback(lambda ignored: node.list())
    d.addCallback(self._deep_traverse_dirnode_children, node, path,
                  walker, monitor, found)
    # NOTE(review): d is never returned -- confirm.
def _deep_traverse_dirnode_children(self, children, parent, path,
                                    walker, monitor, found):
    monitor.raise_if_cancelled()
    d = defer.maybeDeferred(walker.enter_directory, parent, children)
    # we process file-like children first, so we can drop their FileNode
    # objects as quickly as possible. Tests suggest that a FileNode (held
    # in the client's nodecache) consumes about 2440 bytes. dirnodes (not
    # in the nodecache) seem to consume about 2000 bytes.
    # NOTE(review): the initialization of dirkids/filekids (expected:
    # two empty lists) is not present in this body; iteritems() is also
    # Python-2-only -- confirm.
    for name, (child, metadata) in children.iteritems():
        verifier = child.get_verify_cap()
        # allow LIT files (for which verifier==None) to be processed
        if (verifier is not None) and (verifier in found):
            # NOTE(review): the body of this already-seen guard
            # (presumably 'continue'), plus the found.add(verifier) for
            # new nodes and the dir/file 'else:' split, are not present
            # -- confirm.
        childpath = path + [name]
        if IDirectoryNode.providedBy(child):
            dirkids.append( (child, childpath) )
        filekids.append( (child, childpath) )
    for (child, childpath) in filekids:
        d.addCallback(lambda ignored, child=child, childpath=childpath:
                      walker.add_node(child, childpath))
    for (child, childpath) in dirkids:
        d.addCallback(lambda ignored, child=child, childpath=childpath:
                      self._deep_traverse_dirnode(child, childpath,
    # NOTE(review): the recursive call above is missing its trailing
    # arguments, and d is never returned -- confirm.
def build_manifest(self):
    """Return a Monitor, with a ['status'] that will be a list of (path,
    cap) tuples, for all nodes (directories and files) reachable from
    """
    # NOTE(review): the docstring above is truncated (presumably
    # "...this one.").
    walker = ManifestWalker(self)
    return self.deep_traverse(walker)
def start_deep_stats(self):
    # Since deep_traverse tracks verifier caps, we avoid double-counting
    # children for which we've got both a write-cap and a read-cap
    collector = DeepStats(self)
    return self.deep_traverse(collector)

def start_deep_check(self, verify=False):
    """Walk the tree, checking every reachable node; no repairs."""
    checker = DeepChecker(self, verify, repair=False)
    return self.deep_traverse(checker)

def start_deep_check_and_repair(self, verify=False):
    """Walk the tree, checking every reachable node and repairing damage."""
    repairer = DeepChecker(self, verify, repair=True)
    return self.deep_traverse(repairer)
def __init__(self, origin):
    # NOTE(review): this reads as DeepStats.__init__ (its 'class'
    # header is not present); the self.origin / self.stats /
    # self.histograms initializations and the bodies that zero the
    # counter keys below are also not present -- confirm. The first
    # list literal is left unterminated as a result.
    for k in ["count-immutable-files",
              "count-mutable-files",
              "count-literal-files",
              "size-immutable-files",
              #"size-mutable-files",
              "size-literal-files",
              "largest-directory-children",
              "largest-immutable-file",
              #"largest-mutable-file",
    for k in ["size-files-histogram"]:
        self.histograms[k] = {} # maps (min,max) to count
    self.buckets = [ (0,0), (1,3)]
    self.root = math.sqrt(10)
    # NOTE(review): 'math' is not among this file's visible imports
    # (mathutil is) -- confirm it is imported elsewhere.

def set_monitor(self, monitor):
    # bind the Monitor so get_results() snapshots publish as its status
    self.monitor = monitor
    monitor.origin_si = self.origin.get_storage_index()
    monitor.set_status(self.get_results())
def add_node(self, node, childpath):
    # Classify one reachable node into the running counters.
    if IDirectoryNode.providedBy(node):
        self.add("count-directories")
    elif IMutableFileNode.providedBy(node):
        self.add("count-files")
        self.add("count-mutable-files")
        # TODO: update the servermap, compute a size, add it to
        # size-mutable-files, max it into "largest-mutable-file"
    elif IFileNode.providedBy(node): # CHK and LIT
        self.add("count-files")
        size = node.get_size()
        self.histogram("size-files-histogram", size)
        theuri = from_string(node.get_uri())
        if isinstance(theuri, LiteralFileURI):
            self.add("count-literal-files")
            self.add("size-literal-files", size)
        # NOTE(review): the 'else:' that should introduce the immutable
        # (CHK) branch below is not present; as written these three
        # calls run unconditionally after the if -- confirm.
        self.add("count-immutable-files")
        self.add("size-immutable-files", size)
        self.max("largest-immutable-file", size)
def enter_directory(self, parent, children):
    """Accumulate size and fan-out statistics for one visited dirnode."""
    byte_size = parent.get_size()
    child_count = len(children)
    self.add("size-directories", byte_size)
    self.max("largest-directory", byte_size)
    self.max("largest-directory-children", child_count)
def add(self, key, value=1):
    """Bump the named counter by value (default 1)."""
    self.stats[key] = self.stats[key] + value

def max(self, key, value):
    """Raise the named counter to value if value is larger."""
    current = self.stats[key]
    if value > current:
        self.stats[key] = value
def which_bucket(self, size):
    # return (min,max) such that min <= size <= max
    # values are from the set (0,0), (1,3), (4,10), (11,31), (32,100),
    # (101,316), (317, 1000), etc: two per decade
    # NOTE(review): the index initialization and loop header (expected:
    # i = 0; while True:) are not present in this body -- confirm.
    if i >= len(self.buckets):
        # extend the bucket list on demand: the next bucket starts just
        # past the previous upper bound, ending at the next half-decade
        new_lower = self.buckets[i-1][1]+1
        new_upper = int(mathutil.next_power_of_k(new_lower, self.root))
        self.buckets.append( (new_lower, new_upper) )
    maybe = self.buckets[i]
    if maybe[0] <= size <= maybe[1]:
        # NOTE(review): the 'return maybe' on match and the 'i += 1'
        # advance are not present here -- confirm.
def histogram(self, key, size):
    # Count 'size' into the (min,max) bucket that contains it.
    bucket = self.which_bucket(size)
    h = self.histograms[key]
    # NOTE(review): the increment of h[bucket] (with first-time
    # initialization) is not present in this body -- confirm.

def get_results(self):
    # Snapshot the counters plus flattened (min, max, count) histograms.
    stats = self.stats.copy()
    for key in self.histograms:
        h = self.histograms[key]
        out = [ (bucket[0], bucket[1], h[bucket]) for bucket in h ]
        # NOTE(review): sorting of 'out', its assignment into stats,
        # and the final 'return stats' are not present -- confirm.

# NOTE(review): this return appears without its enclosing def
# (presumably finish()) -- confirm.
return self.get_results()
class ManifestWalker(DeepStats):
    """A deep-traverse walker that collects (path, cap) tuples plus
    verify-cap and storage-index sets, on top of DeepStats counters."""
    def __init__(self, origin):
        DeepStats.__init__(self, origin)
        # NOTE(review): self.manifest is appended to in add_node but
        # its initialization (expected: = []) is not present -- confirm.
        self.storage_index_strings = set()
        self.verifycaps = set()

    def add_node(self, node, path):
        self.manifest.append( (tuple(path), node.get_uri()) )
        si = node.get_storage_index()
        # NOTE(review): guards for these two adds ('if si:' / 'if v:')
        # are not present; LIT nodes have verifier==None per the
        # comment in _deep_traverse_dirnode_children -- confirm.
        self.storage_index_strings.add(base32.b2a(si))
        v = node.get_verify_cap()
        self.verifycaps.add(v.to_string())
        return DeepStats.add_node(self, node, path)

    def get_results(self):
        stats = DeepStats.get_results(self)
        # NOTE(review): the dict literal below is unterminated and
        # presumably should also carry a "stats": stats entry -- confirm.
        return {"manifest": self.manifest,
                "verifycaps": self.verifycaps,
                "storage-index": self.storage_index_strings,
def __init__(self, root, verify, repair):
    # NOTE(review): this reads as DeepChecker.__init__ (its 'class'
    # header is not present) -- confirm.
    root_si = root.get_storage_index()
    self._lp = log.msg(format="deep-check starting (%(si)s),"
                       " verify=%(verify)s, repair=%(repair)s",
                       si=base32.b2a(root_si), verify=verify, repair=repair)
    self._verify = verify
    self._repair = repair
    # NOTE(review): the 'if repair:' / 'else:' that should select
    # between these two results objects is not present; as written the
    # second assignment always wins -- confirm.
    self._results = DeepCheckAndRepairResults(root_si)
    self._results = DeepCheckResults(root_si)
    self._stats = DeepStats(root)

def set_monitor(self, monitor):
    # publish the (mutable) results object as the monitor's status
    self.monitor = monitor
    monitor.set_status(self._results)

def add_node(self, node, childpath):
    # NOTE(review): the 'if self._repair:' / 'else:' that should select
    # between the two check calls below, and the final 'return d', are
    # not present -- confirm.
    d = node.check_and_repair(self.monitor, self._verify)
    d.addCallback(self._results.add_check_and_repair, childpath)
    d = node.check(self.monitor, self._verify)
    d.addCallback(self._results.add_check, childpath)
    d.addCallback(lambda ignored: self._stats.add_node(node, childpath))

def enter_directory(self, parent, children):
    # delegate per-directory statistics to the embedded DeepStats
    return self._stats.enter_directory(parent, children)

# NOTE(review): the following lines appear without their enclosing def
# (presumably finish(), which should also return self._results) -- confirm.
log.msg("deep-check done", parent=self._lp)
self._results.update_stats(self._stats.get_results())
734 # use client.create_dirnode() to make one of these