4 from zope.interface import implements
5 from twisted.internet import defer
7 from allmydata.mutable.common import NotMutableError
8 from allmydata.mutable.filenode import MutableFileNode
9 from allmydata.interfaces import IMutableFileNode, IDirectoryNode,\
10 IURI, IFileNode, IMutableFileURI, IFilesystemNode, \
11 ExistingChildError, NoSuchChildError, ICheckable, IDeepCheckable
12 from allmydata.check_results import DeepCheckResults, \
13 DeepCheckAndRepairResults
14 from allmydata.monitor import Monitor
15 from allmydata.util import hashutil, mathutil, base32, log
16 from allmydata.util.assertutil import _assert, precondition
17 from allmydata.util.hashutil import netstring
18 from allmydata.util.limiter import ConcurrencyLimiter
19 from allmydata.util.netstring import split_netstring
20 from allmydata.uri import NewDirectoryURI, LiteralFileURI, from_string
21 from pycryptopp.cipher.aes import AES
24 def __init__(self, node, name, must_exist=True):
27 self.must_exist = True
    def modify(self, old_contents, servermap, first_time):
        """Modifier for MutableFileNode.modify(): remove self.name from the
        serialized directory contents and remember the removed child in
        self.old_child."""
        children = self.node._unpack_contents(old_contents)
        if self.name not in children:
            if first_time and self.must_exist:
                raise NoSuchChildError(self.name)
            # NOTE(review): the tolerant branch (presumably setting
            # self.old_child = None and returning early) is not visible in
            # this chunk -- confirm against the full file.
        # remember the removed child so the caller's Deferred can fire with it
        self.old_child, metadata = children[self.name]
        del children[self.name]
        new_contents = self.node._pack_contents(children)
        # NOTE(review): `return new_contents` is not visible in this chunk;
        # the modify() protocol needs the new contents returned -- confirm.
41 def __init__(self, node, name, metadata):
44 self.metadata = metadata
46 def modify(self, old_contents, servermap, first_time):
47 children = self.node._unpack_contents(old_contents)
48 if self.name not in children:
49 raise NoSuchChildError(self.name)
50 children[self.name] = (children[self.name][0], self.metadata)
51 new_contents = self.node._pack_contents(children)
56 def __init__(self, node, entries=None, overwrite=True):
60 self.entries = entries
61 self.overwrite = overwrite
63 def set_node(self, name, node, metadata):
64 precondition(isinstance(name, unicode), name)
65 precondition(IFilesystemNode.providedBy(node), node)
66 self.entries.append( [name, node, metadata] )
    def modify(self, old_contents, servermap, first_time):
        """Modifier for MutableFileNode.modify(): merge self.entries into the
        serialized directory contents, managing ctime/mtime metadata."""
        children = self.node._unpack_contents(old_contents)
        # NOTE(review): `now = time.time()` (or similar) is not visible in
        # this chunk, but `now` is used below -- confirm.
        for e in self.entries:
            name, child, new_metadata = e
            # NOTE(review): _assert already raises on failure, so wrapping it
            # in `assert` is redundant (and a no-op under -O); the sibling
            # set_node() uses precondition() instead -- consider aligning.
            assert _assert(IFilesystemNode.providedBy(child), child)
            assert isinstance(name, unicode)
            # NOTE(review): the `if name in children:` guard around this
            # branch is not visible in this chunk.
            if not self.overwrite:
                raise ExistingChildError("child '%s' already exists" % name)
            # existing child: start from its current metadata
            metadata = children[name][1].copy()
            # NOTE(review): this dict literal is unterminated here; the
            # new-child branch (fresh ctime/mtime) is partially missing.
            metadata = {"ctime": now,
            if new_metadata is None:
                # preserve the stored metadata, backfilling timestamps
                if "ctime" not in metadata:
                    metadata["ctime"] = now
                metadata["mtime"] = now
            # caller-supplied metadata replaces the stored dict
            metadata = new_metadata.copy()
            children[name] = (child, metadata)
        new_contents = self.node._pack_contents(children)
        # NOTE(review): `return new_contents` is not visible in this chunk.
class NewDirectoryNode:
    """A mutable directory: wraps a MutableFileNode whose contents are a
    serialized table mapping child name -> (encrypted child cap, metadata).
    See _pack_contents/_unpack_contents for the wire format."""
    implements(IDirectoryNode, ICheckable, IDeepCheckable)
    # the backing mutable file type; a class attribute so tests/subclasses
    # can substitute their own
    filenode_class = MutableFileNode
103 def __init__(self, client):
104 self._client = client
105 self._most_recent_size = None
        # __repr__ body (its `def __repr__(self):` line is not in this
        # chunk): class name, RO/RW flag, and abbreviated URI; the hasattr
        # guard tolerates being called before init_from_uri()/create().
        return "<%s %s %s>" % (self.__class__.__name__, self.is_readonly() and "RO" or "RW", hasattr(self, '_uri') and self._uri.abbrev())
109 def init_from_uri(self, myuri):
110 self._uri = IURI(myuri)
111 self._node = self.filenode_class(self._client)
112 self._node.init_from_uri(self._uri.get_filenode_uri())
    def create(self, keypair_generator=None):
        """
        Returns a deferred that eventually fires with self once the directory
        has been created (distributed across a set of storage servers).
        """
        # first we create a MutableFileNode with empty_contents, then use its
        # URI to create our own.
        self._node = self.filenode_class(self._client)
        empty_contents = self._pack_contents({})
        d = self._node.create(empty_contents, keypair_generator)
        d.addCallback(self._filenode_created)
        # NOTE(review): `return d` is not visible in this chunk; callers need
        # the Deferred back -- confirm against the full file.
127 def _filenode_created(self, res):
128 self._uri = NewDirectoryURI(IMutableFileURI(self._node.get_uri()))
        # get_size() body (its `def get_size(self):` line is not in this
        # chunk): report the cached size, or None if nothing was read yet.
        # return the size of our backing mutable file, in bytes, if we've
        return self._most_recent_size
136 def _set_size(self, data):
137 self._most_recent_size = len(data)
        # _read() body (its `def _read(self):` line is not in this chunk):
        # download the current contents of the backing mutable file, cache
        # its size, then unpack into a {name: (child, metadata)} dict.
        d = self._node.download_best_version()
        d.addCallback(self._set_size)
        d.addCallback(self._unpack_contents)
        # NOTE(review): `return d` is not visible in this chunk -- confirm.
    def _encrypt_rwcap(self, rwcap):
        """Encrypt a child's write-cap for storage in a directory entry.
        Returns IV + AES(H(writekey+IV), rwcap) + HMAC."""
        assert isinstance(rwcap, str)
        # NOTE(review): `IV` and `cryptor` are not defined in the visible
        # lines -- presumably IV = os.urandom(16) and cryptor = AES(key);
        # confirm against the full file.
        key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
        crypttext = cryptor.process(rwcap)
        mac = hashutil.hmac(key, IV + crypttext)
        assert len(mac) == 32
        return IV + crypttext + mac
        # The MAC is not checked by readers in Tahoe >= 1.3.0, but we still produce it for the sake of older readers.
    def _decrypt_rwcapdata(self, encwrcap):
        """Decrypt an IV+ciphertext+MAC blob back into a write-cap string."""
        # layout: encwrcap = IV (16 bytes) + crypttext + MAC (32 bytes)
        crypttext = encwrcap[16:-32]
        # NOTE(review): `IV` and `cryptor` are not defined in the visible
        # lines (IV is presumably encwrcap[:16]); the final
        # `return plaintext` is also not visible -- confirm.
        key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
        plaintext = cryptor.process(crypttext)
165 def _create_node(self, child_uri):
166 return self._client.create_node_from_uri(child_uri)
    def _unpack_contents(self, data):
        # the directory is serialized as a list of netstrings, one per child.
        # Each child is serialized as a list of four netstrings: (name,
        # rocap, rwcap, metadata), in which the name,rocap,metadata are in
        # cleartext. The 'name' is UTF-8 encoded. The rwcap is formatted as:
        # pack("16ss32s", iv, AES(H(writekey+iv), plaintextrwcap), mac)
        assert isinstance(data, str)
        # an empty directory is serialized as an empty string
        # NOTE(review): the `children = {}` init and the `while data:` loop
        # header are not visible in this chunk.
        writeable = not self.is_readonly()
        entry, data = split_netstring(data, 1, True)
        name, rocap, rwcapdata, metadata_s = split_netstring(entry, 4)
        name = name.decode("utf-8")
        # NOTE(review): the `if writeable:` / `else:` branch lines selecting
        # between the rwcap and rocap paths are not visible in this chunk.
        # writeable dirnode: decrypt and use the child's write-cap
        rwcap = self._decrypt_rwcapdata(rwcapdata)
        child = self._create_node(rwcap)
        # read-only dirnode: fall back to the cleartext read-cap
        child = self._create_node(rocap)
        # NOTE(review): `simplejson` is not imported in the visible import
        # block; presumably imported elsewhere in the file -- confirm.
        metadata = simplejson.loads(metadata_s)
        assert isinstance(metadata, dict)
        children[name] = (child, metadata)
        # NOTE(review): `return children` is not visible in this chunk.
    def _pack_contents(self, children):
        # expects children in the same format as _unpack_contents
        assert isinstance(children, dict)
        # NOTE(review): the `entries = []` initialization is not visible in
        # this chunk, but the loop appends to it.
        # sort by name so equal children dicts serialize identically
        for name in sorted(children.keys()):
            child, metadata = children[name]
            assert isinstance(name, unicode)
            assert (IFileNode.providedBy(child)
                    or IMutableFileNode.providedBy(child)
                    or IDirectoryNode.providedBy(child)), (name,child)
            assert isinstance(metadata, dict)
            rwcap = child.get_uri() # might be RO if the child is not writeable
            assert isinstance(rwcap, str), rwcap
            rocap = child.get_readonly_uri()
            assert isinstance(rocap, str), rocap
            # each entry: name, rocap (cleartext), encrypted rwcap, metadata
            # NOTE(review): the `netstring(rocap),` element is not visible
            # in this chunk -- confirm against the four-field wire format
            # described in _unpack_contents.
            entry = "".join([netstring(name.encode("utf-8")),
                             netstring(self._encrypt_rwcap(rwcap)),
                             netstring(simplejson.dumps(metadata))])
            entries.append(netstring(entry))
        return "".join(entries)
    def is_readonly(self):
        # read-only iff the backing mutable file is read-only
        return self._node.is_readonly()
    def is_mutable(self):
        return self._node.is_mutable()

        # get_uri() body (its `def get_uri(self):` line is not in this
        # chunk): the full directory cap as a string.
        return self._uri.to_string()

    def get_readonly_uri(self):
        # diminished (read-only) form of this directory's cap
        return self._uri.get_readonly().to_string()

    def get_verify_cap(self):
        return self._uri.get_verify_cap()

    def get_storage_index(self):
        # NOTE(review): reaches into the URI object's private _filenode_uri
        # attribute -- works, but a public accessor would be cleaner.
        return self._uri._filenode_uri.storage_index
    def check(self, monitor, verify=False):
        """Perform a file check. See IChecker.check for details."""
        # delegate to the backing mutable filenode
        return self._node.check(monitor, verify)
    def check_and_repair(self, monitor, verify=False):
        # check the backing mutable file and repair it if damaged
        return self._node.check_and_repair(monitor, verify)
240 """I return a Deferred that fires with a dictionary mapping child
241 name to a tuple of (IFileNode or IDirectoryNode, metadata)."""
244 def has_child(self, name):
245 """I return a Deferred that fires with a boolean, True if there
246 exists a child of the given name, False if not."""
247 assert isinstance(name, unicode)
249 d.addCallback(lambda children: children.has_key(name))
    def _get(self, children, name):
        # helper: pull one child node out of a children dict, raising
        # NoSuchChildError when absent
        child = children.get(name)
        # NOTE(review): the guard (presumably `if child is None:`) around
        # this raise is not visible in this chunk.
            raise NoSuchChildError(name)
        # NOTE(review): `return child[0]` (node without metadata) is not
        # visible -- confirm.

    def _get_with_metadata(self, children, name):
        # helper: like _get, but returns the (node, metadata) tuple
        child = children.get(name)
        # NOTE(review): same missing guard / return as _get above.
            raise NoSuchChildError(name)
265 """I return a Deferred that fires with the named child node,
266 which is either an IFileNode or an IDirectoryNode."""
267 assert isinstance(name, unicode)
269 d.addCallback(self._get, name)
    def get_child_and_metadata(self, name):
        """I return a Deferred that fires with the (node, metadata) pair for
        the named child. The node is either an IFileNode or an
        IDirectoryNode, and the metadata is a dictionary."""
        assert isinstance(name, unicode)
        # NOTE(review): `d = self.list()` and `return d` are not visible in
        # this chunk -- confirm.
        d.addCallback(self._get_with_metadata, name)
    def get_metadata_for(self, name):
        # Fetch only the metadata dict stored alongside child `name`.
        assert isinstance(name, unicode)
        # NOTE(review): `d = self.list()` and `return d` are not visible in
        # this chunk -- confirm. Missing children would surface as KeyError
        # rather than NoSuchChildError here.
        d.addCallback(lambda children: children[name][1])
287 def set_metadata_for(self, name, metadata):
288 assert isinstance(name, unicode)
289 if self.is_readonly():
290 return defer.fail(NotMutableError())
291 assert isinstance(metadata, dict)
292 s = MetadataSetter(self, name, metadata)
293 d = self._node.modify(s.modify)
294 d.addCallback(lambda res: self)
    def get_child_at_path(self, path):
        """Transform a child path into an IDirectoryNode or IFileNode.

        I perform a recursive series of 'get' operations to find the named
        descendant node. I return a Deferred that fires with the node, or
        errbacks with IndexError if the node could not be found.

        The path can be either a single string (slash-separated) or a list of
        path-name components.
        """
        d = self.get_child_and_metadata_at_path(path)
        # discard the metadata, keep only the node
        d.addCallback(lambda (node, metadata): node)
        # NOTE(review): `return d` is not visible in this chunk -- confirm.
    def get_child_and_metadata_at_path(self, path):
        """Transform a child path into an IDirectoryNode or IFileNode and
        a metadata dictionary from the last edge that was traversed.
        """
        # NOTE(review): the empty-path guard (presumably `if not path:`) is
        # not visible in this chunk; an empty path resolves to self with
        # empty metadata.
        return defer.succeed((self, {}))
        if isinstance(path, (list, tuple)):
        # a plain string is split on "/" into components
            path = path.split("/")
        # NOTE(review): the assignments of `p`/`childname` and the branch
        # structure around the two cases below are not visible in this chunk.
        assert isinstance(p, unicode)
        remaining_path = path[1:]
        # multi-component path: fetch the first child, then recurse into it
        d = self.get(childname)
        d.addCallback(lambda node:
                      node.get_child_and_metadata_at_path(remaining_path))
        # single remaining component: fetch the child plus its edge metadata
        d = self.get_child_and_metadata(childname)
        # NOTE(review): the `return d` lines are not visible in this chunk.
    def set_uri(self, name, child_uri, metadata=None, overwrite=True):
        """I add a child (by URI) at the specific name. I return a Deferred
        that fires with the child node when the operation finishes. I will
        replace any existing child of the same name.

        The child_uri could be for a file, or for a directory (either
        read-write or read-only, using a URI that came from get_uri() ).

        If this directory node is read-only, the Deferred will errback with a
        NotMutableError."""
        precondition(isinstance(name, unicode), name)
        precondition(isinstance(child_uri, str), child_uri)
        child_node = self._create_node(child_uri)
        d = self.set_node(name, child_node, metadata, overwrite)
        # fire with the child node, not set_node's result
        d.addCallback(lambda res: child_node)
        # NOTE(review): `return d` is not visible in this chunk -- confirm.
    def set_children(self, entries, overwrite=True):
        # Batch operation: add many (name, child_uri, metadata) entries in a
        # single mutable-file modification.
        a = Adder(self, overwrite=overwrite)
        # NOTE(review): the loop header (presumably `for e in entries:`) and
        # any entry-normalization lines are not visible in this chunk; the
        # read-only guard seen in the other mutators is also absent here.
            name, child_uri, metadata = e
            assert isinstance(name, unicode)
            a.set_node(name, self._create_node(child_uri), metadata)
        return self._node.modify(a.modify)
    def set_node(self, name, child, metadata=None, overwrite=True):
        """I add a child at the specific name. I return a Deferred that fires
        when the operation finishes. This Deferred will fire with the child
        node that was just added. I will replace any existing child of the
        same name.

        If this directory node is read-only, the Deferred will errback with a
        NotMutableError."""
        precondition(IFilesystemNode.providedBy(child), child)

        if self.is_readonly():
            return defer.fail(NotMutableError())
        assert isinstance(name, unicode)
        # NOTE(review): this assert duplicates the precondition above --
        # one of the two is redundant.
        assert IFilesystemNode.providedBy(child), child
        a = Adder(self, overwrite=overwrite)
        a.set_node(name, child, metadata)
        d = self._node.modify(a.modify)
        d.addCallback(lambda res: child)
        # NOTE(review): `return d` is not visible in this chunk -- confirm.
    def set_nodes(self, entries, overwrite=True):
        # Add many (name, node, metadata) entries in one mutable-file write.
        if self.is_readonly():
            return defer.fail(NotMutableError())
        a = Adder(self, entries, overwrite=overwrite)
        d = self._node.modify(a.modify)
        # fire with None: callers only care about completion
        d.addCallback(lambda res: None)
        # NOTE(review): `return d` is not visible in this chunk -- confirm.
    def add_file(self, name, uploadable, metadata=None, overwrite=True):
        """I upload a file (using the given IUploadable), then attach the
        resulting FileNode to the directory at the given name. I return a
        Deferred that fires (with the IFileNode of the uploaded file) when
        the operation completes."""
        assert isinstance(name, unicode)
        if self.is_readonly():
            return defer.fail(NotMutableError())
        d = self._client.upload(uploadable)
        # upload results -> URI -> node -> attached child
        d.addCallback(lambda results: results.uri)
        d.addCallback(self._client.create_node_from_uri)
        d.addCallback(lambda node:
                      self.set_node(name, node, metadata, overwrite))
        # NOTE(review): `return d` is not visible in this chunk -- confirm.
    def delete(self, name):
        """I remove the child at the specific name. I return a Deferred that
        fires (with the node just removed) when the operation finishes."""
        assert isinstance(name, unicode)
        if self.is_readonly():
            return defer.fail(NotMutableError())
        deleter = Deleter(self, name)
        d = self._node.modify(deleter.modify)
        # the Deleter stashed the removed node in old_child during modify()
        d.addCallback(lambda res: deleter.old_child)
        # NOTE(review): `return d` is not visible in this chunk -- confirm.
    def create_empty_directory(self, name, overwrite=True):
        """I create and attach an empty directory at the given name. I return
        a Deferred that fires (with the new directory node) when the
        operation finishes."""
        assert isinstance(name, unicode)
        if self.is_readonly():
            return defer.fail(NotMutableError())
        d = self._client.create_empty_dirnode()
        # NOTE(review): the nested-callback header (presumably
        # `def _created(child):`) is not visible in this chunk; the lines
        # below are its body.
            entries = [(name, child, None)]
            a = Adder(self, entries, overwrite=overwrite)
            d = self._node.modify(a.modify)
            # fire with the freshly created directory node
            d.addCallback(lambda res: child)
        d.addCallback(_created)
        # NOTE(review): the `return d` lines are not visible in this chunk.
    def move_child_to(self, current_child_name, new_parent,
                      new_child_name=None, overwrite=True):
        """I take one of my children and move them to a new parent. The child
        is referenced by name. On the new parent, the child will live under
        'new_child_name', which defaults to 'current_child_name'. I return a
        Deferred that fires when the operation finishes."""
        assert isinstance(current_child_name, unicode)
        # both ends of the move must be writeable
        if self.is_readonly() or new_parent.is_readonly():
            return defer.fail(NotMutableError())
        if new_child_name is None:
            new_child_name = current_child_name
        assert isinstance(new_child_name, unicode)
        d = self.get(current_child_name)
        # NOTE(review): the callback wrapping this set_node call (which
        # binds `child`) and its trailing arguments are not visible in this
        # chunk -- confirm. Note this is attach-then-delete, not atomic.
            return new_parent.set_node(new_child_name, child,
        d.addCallback(lambda child: self.delete(current_child_name))
        # NOTE(review): `return d` is not visible in this chunk -- confirm.
    def deep_traverse(self, walker):
        """Perform a recursive walk, using this dirnode as a root, notifying
        the 'walker' instance of everything I encounter.

        I call walker.enter_directory(parent, children) once for each dirnode
        I visit, immediately after retrieving the list of children. I pass in
        the parent dirnode and the dict of childname->(childnode,metadata).
        This function should *not* traverse the children: I will do that.
        enter_directory() is most useful for the deep-stats number that
        counts how large a directory is.

        I call walker.add_node(node, path) for each node (both files and
        directories) I can reach. Most work should be done here.

        I avoid loops by keeping track of verifier-caps and refusing to call
        each() or traverse a node that I've seen before.

        I return a Deferred that will fire with the value of walker.finish().
        """
        # this is just a tree-walker, except that following each edge
        # requires a Deferred. We use a ConcurrencyLimiter to make sure the
        # fan-out doesn't cause problems.
        # NOTE(review): the Monitor construction (presumably
        # `monitor = Monitor()`) is not visible in this chunk, but `monitor`
        # is used below.
        walker.set_monitor(monitor)
        # seed the seen-set with our own verifier cap to break cycles
        found = set([self.get_verify_cap()])
        limiter = ConcurrencyLimiter(10)
        d = self._deep_traverse_dirnode(self, [],
                                        walker, monitor, found, limiter)
        d.addCallback(lambda ignored: walker.finish())
        d.addBoth(monitor.finish)
        # errors are reported through the monitor, not this Deferred
        d.addErrback(lambda f: None)
        # NOTE(review): `return monitor` is not visible in this chunk.
    def _deep_traverse_dirnode(self, node, path,
                               walker, monitor, found, limiter):
        # process this directory, then walk its children
        monitor.raise_if_cancelled()
        d = limiter.add(walker.add_node, node, path)
        # list the children under the concurrency limiter too
        d.addCallback(lambda ignored: limiter.add(node.list))
        d.addCallback(self._deep_traverse_dirnode_children, node, path,
                      walker, monitor, found, limiter)
        # NOTE(review): `return d` is not visible in this chunk -- confirm.
    def _deep_traverse_dirnode_children(self, children, parent, path,
                                        walker, monitor, found, limiter):
        # notify the walker about this directory, then recurse/visit each child
        monitor.raise_if_cancelled()
        dl = [limiter.add(walker.enter_directory, parent, children)]
        for name, (child, metadata) in children.iteritems():
            verifier = child.get_verify_cap()
            # allow LIT files (for which verifier==None) to be processed
            if (verifier is not None) and (verifier in found):
            # NOTE(review): the `continue` for already-seen nodes and the
            # `found.add(verifier)` bookkeeping are not visible in this
            # chunk -- confirm.
            childpath = path + [name]
            if IDirectoryNode.providedBy(child):
                # directories recurse; the argument-continuation lines and
                # the `else:` for plain files are not visible in this chunk.
                dl.append(self._deep_traverse_dirnode(child, childpath,
                dl.append(limiter.add(walker.add_node, child, childpath))
        return defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
    def build_manifest(self):
        """Return a Monitor, with a ['status'] that will be a list of (path,
        cap) tuples, for all nodes (directories and files) reachable from
        this one."""
        walker = ManifestWalker(self)
        return self.deep_traverse(walker)
534 def start_deep_stats(self):
535 # Since deep_traverse tracks verifier caps, we avoid double-counting
536 # children for which we've got both a write-cap and a read-cap
537 return self.deep_traverse(DeepStats(self))
539 def start_deep_check(self, verify=False):
540 return self.deep_traverse(DeepChecker(self, verify, repair=False))
542 def start_deep_check_and_repair(self, verify=False):
543 return self.deep_traverse(DeepChecker(self, verify, repair=True))
    def __init__(self, origin):
        # NOTE(review): `self.origin = origin`, the `self.stats = {}` /
        # `self.histograms = {}` initializations, and the `self.stats[k] = 0`
        # loop body are not visible in this chunk, but set_monitor(), add()
        # and histogram() all rely on them -- confirm.
        for k in ["count-immutable-files",
                  "count-mutable-files",
                  "count-literal-files",
                  "size-immutable-files",
                  #"size-mutable-files",
                  "size-literal-files",
                  "largest-directory-children",
                  "largest-immutable-file",
                  # NOTE(review): additional counter names and the closing
                  # bracket of this list are not visible in this chunk.
        for k in ["size-files-histogram"]:
            self.histograms[k] = {} # maps (min,max) to count
        # seed buckets; which_bucket() extends this list on demand
        self.buckets = [ (0,0), (1,3)]
        # bucket widths grow by sqrt(10): two buckets per decade
        self.root = math.sqrt(10)
        # NOTE(review): `math` is not imported in the visible import block;
        # presumably imported elsewhere in the file -- confirm.
572 def set_monitor(self, monitor):
573 self.monitor = monitor
574 monitor.origin_si = self.origin.get_storage_index()
575 monitor.set_status(self.get_results())
577 def add_node(self, node, childpath):
578 if IDirectoryNode.providedBy(node):
579 self.add("count-directories")
580 elif IMutableFileNode.providedBy(node):
581 self.add("count-files")
582 self.add("count-mutable-files")
583 # TODO: update the servermap, compute a size, add it to
584 # size-mutable-files, max it into "largest-mutable-file"
585 elif IFileNode.providedBy(node): # CHK and LIT
586 self.add("count-files")
587 size = node.get_size()
588 self.histogram("size-files-histogram", size)
589 theuri = from_string(node.get_uri())
590 if isinstance(theuri, LiteralFileURI):
591 self.add("count-literal-files")
592 self.add("size-literal-files", size)
594 self.add("count-immutable-files")
595 self.add("size-immutable-files", size)
596 self.max("largest-immutable-file", size)
598 def enter_directory(self, parent, children):
599 dirsize_bytes = parent.get_size()
600 dirsize_children = len(children)
601 self.add("size-directories", dirsize_bytes)
602 self.max("largest-directory", dirsize_bytes)
603 self.max("largest-directory-children", dirsize_children)
605 def add(self, key, value=1):
606 self.stats[key] += value
608 def max(self, key, value):
609 self.stats[key] = max(self.stats[key], value)
    def which_bucket(self, size):
        # return (min,max) such that min <= size <= max
        # values are from the set (0,0), (1,3), (4,10), (11,31), (32,100),
        # (101,316), (317, 1000), etc: two per decade
        # NOTE(review): the loop scaffolding (presumably `i = 0` and a
        # `while True:` header) is not visible in this chunk.
            if i >= len(self.buckets):
                # lazily extend the bucket list: next bucket starts just past
                # the previous one and ends at the next power of sqrt(10)
                new_lower = self.buckets[i-1][1]+1
                new_upper = int(mathutil.next_power_of_k(new_lower, self.root))
                self.buckets.append( (new_lower, new_upper) )
            maybe = self.buckets[i]
            if maybe[0] <= size <= maybe[1]:
            # NOTE(review): the `return maybe` on a hit and the `i += 1`
            # advance are not visible in this chunk -- confirm.
    def histogram(self, key, size):
        # Increment the (min,max) histogram bucket that `size` falls into.
        bucket = self.which_bucket(size)
        h = self.histograms[key]
        # NOTE(review): the bucket-count increment (presumably an
        # initialize-then-increment of h[bucket]) is not visible in this
        # chunk -- confirm.
    def get_results(self):
        """Return a stats dict: plain counters plus each histogram flattened
        into (min, max, count) triples."""
        stats = self.stats.copy()
        for key in self.histograms:
            h = self.histograms[key]
            out = [ (bucket[0], bucket[1], h[bucket]) for bucket in h ]
            # NOTE(review): the sort of `out`, its assignment into `stats`,
            # and the `return stats` are not visible in this chunk.

        # finish() body (its `def finish(self):` line is not in this chunk):
        # a DeepStats walker finishes by returning its results dict.
        return self.get_results()
class ManifestWalker(DeepStats):
    """Deep-traverse walker that collects a manifest of (path, cap) tuples
    plus verify-cap and storage-index sets, on top of DeepStats counters."""

    def __init__(self, origin):
        DeepStats.__init__(self, origin)
        # NOTE(review): `self.manifest = []` is not visible in this chunk,
        # but add_node() and get_results() both use it -- confirm.
        self.storage_index_strings = set()
        self.verifycaps = set()

    def add_node(self, node, path):
        # record (path-tuple, cap) for every reachable node
        self.manifest.append( (tuple(path), node.get_uri()) )
        si = node.get_storage_index()
        # NOTE(review): the guards (presumably `if si:` and `if v:`, to skip
        # LIT nodes which have neither) are not visible in this chunk.
        self.storage_index_strings.add(base32.b2a(si))
        v = node.get_verify_cap()
        self.verifycaps.add(v.to_string())
        # keep the DeepStats counters up to date as well
        return DeepStats.add_node(self, node, path)

    def get_results(self):
        stats = DeepStats.get_results(self)
        return {"manifest": self.manifest,
                "verifycaps": self.verifycaps,
                "storage-index": self.storage_index_strings,
                # NOTE(review): the closing entry (presumably
                # "stats": stats} ) is not visible in this chunk.
    def __init__(self, root, verify, repair):
        """Set up a deep-check (optionally verifying and/or repairing)
        rooted at 'root'."""
        root_si = root.get_storage_index()
        self._lp = log.msg(format="deep-check starting (%(si)s),"
                           " verify=%(verify)s, repair=%(repair)s",
                           si=base32.b2a(root_si), verify=verify, repair=repair)
        self._verify = verify
        self._repair = repair
        # NOTE(review): the `if repair:` / `else:` lines that select between
        # the two results objects below are not visible in this chunk --
        # as written the second assignment would always win; confirm.
        self._results = DeepCheckAndRepairResults(root_si)
        self._results = DeepCheckResults(root_si)
        # embedded stats collector, merged into the results by finish()
        self._stats = DeepStats(root)
687 def set_monitor(self, monitor):
688 self.monitor = monitor
689 monitor.set_status(self._results)
    def add_node(self, node, childpath):
        # Check (and optionally repair) one reachable node, record the
        # outcome, then feed the node to the embedded DeepStats.
        # NOTE(review): the `if self._repair:` / `else:` lines selecting
        # between the two branches below, and the trailing `return d`, are
        # not visible in this chunk -- confirm.
        d = node.check_and_repair(self.monitor, self._verify)
        d.addCallback(self._results.add_check_and_repair, childpath)
        d = node.check(self.monitor, self._verify)
        d.addCallback(self._results.add_check, childpath)
        d.addCallback(lambda ignored: self._stats.add_node(node, childpath))
701 def enter_directory(self, parent, children):
702 return self._stats.enter_directory(parent, children)
        # finish() body (its `def finish(self):` line is not in this chunk):
        # merge the deep-stats into the results object and log completion.
        log.msg("deep-check done", parent=self._lp)
        self._results.update_stats(self._stats.get_results())
        # NOTE(review): `return self._results` is not visible here -- confirm.
710 # use client.create_dirnode() to make one of these