4 from zope.interface import implements
5 from twisted.internet import defer
7 from allmydata.mutable.common import NotMutableError
8 from allmydata.mutable.node import MutableFileNode
9 from allmydata.interfaces import IMutableFileNode, IDirectoryNode,\
10 IURI, IFileNode, IMutableFileURI, IFilesystemNode, \
11 ExistingChildError, NoSuchChildError, ICheckable, IDeepCheckable
12 from allmydata.checker_results import DeepCheckResults, \
13 DeepCheckAndRepairResults
14 from allmydata.monitor import Monitor
15 from allmydata.util import hashutil, mathutil, base32, log
16 from allmydata.util.hashutil import netstring
17 from allmydata.util.limiter import ConcurrencyLimiter
18 from allmydata.util.netstring import split_netstring
19 from allmydata.uri import NewDirectoryURI, LiteralFileURI, from_string
20 from pycryptopp.cipher.aes import AES
# Deleter fragment: removes one named child from a directory inside a
# MutableFileNode.modify() transaction.  NOTE(review): this excerpt is a
# numbered listing with lines elided (gaps in the left-hand numbers); the
# class statement and the self.node/self.name assignments (lines 24-25)
# are not shown, nor is the tail of modify() (line 30, 32-33, 37+).
23 def __init__(self, node, name, must_exist=True):
# NOTE(review): the must_exist parameter is ignored here -- the attribute
# is hard-coded to True.  Looks like a bug; confirm against upstream
# before relying on must_exist=False.
26 self.must_exist = True
# Called by the mutable-file modify machinery with the current serialized
# contents.  Unpacks the children dict, raises NoSuchChildError when the
# target name is absent, removes the entry (stashing the removed node in
# self.old_child for the caller), and repacks.
27 def modify(self, old_contents, servermap, first_time):
28 children = self.node._unpack_contents(old_contents)
29 if self.name not in children:
31 raise NoSuchChildError(self.name)
34 self.old_child, metadata = children[self.name]
35 del children[self.name]
# NOTE(review): the `return new_contents` that modify() must end with is
# elided from this excerpt.
36 new_contents = self.node._pack_contents(children)
# MetadataSetter fragment: replaces the metadata dict of one existing
# child, leaving the child node itself untouched.  NOTE(review): the
# class statement and the self.node/self.name assignments (lines 41-42)
# are elided from this numbered excerpt.
40 def __init__(self, node, name, metadata):
43 self.metadata = metadata
# Transaction body for MutableFileNode.modify(): keep the child node
# (tuple slot 0), swap in the new metadata (slot 1).
45 def modify(self, old_contents, servermap, first_time):
46 children = self.node._unpack_contents(old_contents)
47 if self.name not in children:
48 raise NoSuchChildError(self.name)
49 children[self.name] = (children[self.name][0], self.metadata)
# NOTE(review): the trailing `return new_contents` (line 51) is elided.
50 new_contents = self.node._pack_contents(children)
# Adder fragment: accumulates (name, node, metadata) entries and adds them
# all to a directory in a single modify() transaction.  NOTE(review):
# lines 56-58 are elided -- presumably the self.node assignment and the
# `entries is None -> []` normalization; without the latter, a default
# construction would leave self.entries as None.  Confirm upstream.
55 def __init__(self, node, entries=None, overwrite=True):
59 self.entries = entries
60 self.overwrite = overwrite
def set_node(self, name, node, metadata):
    """Queue one more (name, node, metadata) triple for this Adder to
    install when its modify() transaction runs."""
    entry = [name, node, metadata]
    self.entries.append(entry)
# Transaction body: merge every queued entry into the unpacked children
# dict, enforcing overwrite policy and maintaining ctime/mtime metadata.
# NOTE(review): this numbered excerpt elides several lines (67, 69-73,
# 76, 80, 82, 84, 88-89, 93+), including the `now = time.time()` that the
# metadata code below reads, the overwrite/existing-child test guarding
# line 78, the else-branches, and the final `return new_contents`.
65 def modify(self, old_contents, servermap, first_time):
66 children = self.node._unpack_contents(old_contents)
68 for e in self.entries:
74 name, child, new_metadata = e
75 assert isinstance(name, unicode)
77 if not self.overwrite:
78 raise ExistingChildError("child '%s' already exists" % name)
# Existing child: start from a copy of its current metadata.
79 metadata = children[name][1].copy()
# New child: fresh ctime/mtime dict (continuation lines elided).
81 metadata = {"ctime": now,
83 if new_metadata is None:
85 if "ctime" not in metadata:
86 metadata["ctime"] = now
87 metadata["mtime"] = now
# Caller-supplied metadata replaces whatever we computed.
90 metadata = new_metadata.copy()
91 children[name] = (child, metadata)
92 new_contents = self.node._pack_contents(children)
# A directory node backed by a MutableFileNode: the directory's children
# are serialized into the mutable file's contents.
95 class NewDirectoryNode:
96 implements(IDirectoryNode, ICheckable, IDeepCheckable)
97 filenode_class = MutableFileNode
99 def __init__(self, client):
100 self._client = client
# Cached length of the last-downloaded serialized contents; see
# _set_size().
101 self._most_recent_size = None
# NOTE(review): the `def __repr__(self):` header (line 103) is elided
# from this numbered excerpt; this is its body.
104 return "<%s %s %s>" % (self.__class__.__name__, self.is_readonly() and "RO" or "RW", hasattr(self, '_uri') and self._uri.abbrev())
# Populate from an existing dirnode URI: wrap a filenode around the
# embedded mutable-file cap.  NOTE(review): the trailing `return self`
# (lines 109-110) appears to be elided.
105 def init_from_uri(self, myuri):
106 self._uri = IURI(myuri)
107 self._node = self.filenode_class(self._client)
108 self._node.init_from_uri(self._uri.get_filenode_uri())
# Create a brand-new (empty) directory.  NOTE(review): docstring open/
# close quotes and the final `return d` are elided from this excerpt.
111 def create(self, keypair_generator=None):
113 Returns a deferred that eventually fires with self once the directory
114 has been created (distributed across a set of storage servers).
116 # first we create a MutableFileNode with empty_contents, then use its
117 # URI to create our own.
118 self._node = self.filenode_class(self._client)
119 empty_contents = self._pack_contents({})
120 d = self._node.create(empty_contents, keypair_generator)
121 d.addCallback(self._filenode_created)
# Once the backing filenode exists, derive our directory URI from its
# write cap.  NOTE(review): the trailing `return self` (line 125) is
# elided.
123 def _filenode_created(self, res):
124 self._uri = NewDirectoryURI(IMutableFileURI(self._node.get_uri()))
# NOTE(review): the `def get_size(self):` header (line ~127) is elided;
# these two lines are its body -- returns None until a download has
# populated _most_recent_size.
128 # return the size of our backing mutable file, in bytes, if we've
130 return self._most_recent_size
# Deferred callback: remember the serialized size, presumably passing
# `data` through to the next callback (the `return data` line appears to
# be elided).
132 def _set_size(self, data):
133 self._most_recent_size = len(data)
# NOTE(review): the `def list(self):` header (line ~136) is elided; this
# chain downloads the best version, caches its size, and unpacks it into
# the children dict (final `return d` also elided).
137 d = self._node.download_best_version()
138 d.addCallback(self._set_size)
139 d.addCallback(self._unpack_contents)
# Encrypt a child's read-write cap with AES keyed from our writekey, so
# read-only holders cannot recover it.  Output layout: IV + ciphertext +
# 32-byte HMAC.  NOTE(review): the IV generation (line 144) and the AES
# cryptor construction (line 146) are elided from this excerpt.
142 def _encrypt_rwcap(self, rwcap):
143 assert isinstance(rwcap, str)
145 key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
147 crypttext = cryptor.process(rwcap)
148 mac = hashutil.hmac(key, IV + crypttext)
149 assert len(mac) == 32
150 return IV + crypttext + mac
# Inverse of _encrypt_rwcap: split IV/ciphertext/mac, verify the HMAC,
# then decrypt.  NOTE(review): the IV and mac slicing lines (153, 155),
# the cryptor construction (159), and the final `return plaintext` (161)
# are elided.
152 def _decrypt_rwcapdata(self, encwrcap):
154 crypttext = encwrcap[16:-32]
156 key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
157 if mac != hashutil.hmac(key, IV+crypttext):
158 raise hashutil.IntegrityCheckError("HMAC does not match, crypttext is corrupted")
160 plaintext = cryptor.process(crypttext)
163 def _create_node(self, child_uri):
164 return self._client.create_node_from_uri(child_uri)
# Deserialize the mutable file's contents into {childname: (node,
# metadata)}.  NOTE(review): this numbered excerpt elides several lines
# (174-175, 177-178, 182, 185, 190-191), including the children-dict
# initialization, the `while data:` loop header, the writeable branch
# test guarding the rwcap decryption, and the final `return children`.
166 def _unpack_contents(self, data):
167 # the directory is serialized as a list of netstrings, one per child.
168 # Each child is serialized as a list of four netstrings: (name,
169 # rocap, rwcap, metadata), in which the name,rocap,metadata are in
170 # cleartext. The 'name' is UTF-8 encoded. The rwcap is formatted as:
171 # pack("16ss32s", iv, AES(H(writekey+iv), plaintextrwcap), mac)
172 assert isinstance(data, str)
173 # an empty directory is serialized as an empty string
# Only writeable holders can decrypt the rwcap; read-only holders fall
# back to the cleartext rocap below.
176 writeable = not self.is_readonly()
179 entry, data = split_netstring(data, 1, True)
180 name, rocap, rwcapdata, metadata_s = split_netstring(entry, 4)
181 name = name.decode("utf-8")
183 rwcap = self._decrypt_rwcapdata(rwcapdata)
184 child = self._create_node(rwcap)
186 child = self._create_node(rocap)
187 metadata = simplejson.loads(metadata_s)
188 assert isinstance(metadata, dict)
189 children[name] = (child, metadata)
# Inverse of _unpack_contents: serialize the children dict back into the
# netstring-of-netstrings format, sorted by name for determinism.
# NOTE(review): the `entries = []` initialization (line ~195) and the
# netstring(rocap) element of the join (line 208) are elided from this
# excerpt.
192 def _pack_contents(self, children):
193 # expects children in the same format as _unpack_contents
194 assert isinstance(children, dict)
196 for name in sorted(children.keys()):
197 child, metadata = children[name]
198 assert isinstance(name, unicode)
199 assert (IFileNode.providedBy(child)
200 or IMutableFileNode.providedBy(child)
201 or IDirectoryNode.providedBy(child)), (name,child)
202 assert isinstance(metadata, dict)
203 rwcap = child.get_uri() # might be RO if the child is not writeable
204 rocap = child.get_readonly_uri()
205 assert isinstance(rocap, str), rocap
206 assert isinstance(rwcap, str), rwcap
207 entry = "".join([netstring(name.encode("utf-8")),
209 netstring(self._encrypt_rwcap(rwcap)),
210 netstring(simplejson.dumps(metadata))])
211 entries.append(netstring(entry))
212 return "".join(entries)
def is_readonly(self):
    """True when we hold only a read cap for the backing mutable file."""
    backing = self._node
    return backing.is_readonly()
def is_mutable(self):
    """True when the backing mutable file reports itself mutable."""
    backing = self._node
    return backing.is_mutable()
# NOTE(review): the `def get_uri(self):` header (line ~219) is elided
# from this numbered excerpt; this line is its body -- it serializes the
# directory's own URI.
220 return self._uri.to_string()
def get_readonly_uri(self):
    """Return this directory's read-only cap, serialized to a string."""
    ro_uri = self._uri.get_readonly()
    return ro_uri.to_string()
def get_verifier(self):
    """Return the verifier cap derived from this directory's URI."""
    uri = self._uri
    return uri.get_verifier()
def get_storage_index(self):
    """Return the storage index of the embedded mutable-file URI."""
    filenode_uri = self._uri._filenode_uri
    return filenode_uri.storage_index
def check(self, monitor, verify=False):
    """Perform a file check on the backing mutable file.  See
    IChecker.check for details."""
    backing = self._node
    return backing.check(monitor, verify)
def check_and_repair(self, monitor, verify=False):
    """Check the backing mutable file and repair it if damaged,
    delegating entirely to the filenode."""
    backing = self._node
    return backing.check_and_repair(monitor, verify)
# NOTE(review): the `def list(self):` header (line 237) is elided from
# this numbered excerpt; these two lines are its docstring.
238 """I return a Deferred that fires with a dictionary mapping child
239 name to a tuple of (IFileNode or IDirectoryNode, metadata)."""
# Existence test via a full list(); NOTE(review): the `d = self.list()`
# line (246) and the trailing `return d` (248) are elided.
242 def has_child(self, name):
243 """I return a Deferred that fires with a boolean, True if there
244 exists a child of the given name, False if not."""
245 assert isinstance(name, unicode)
247 d.addCallback(lambda children: children.has_key(name))
# Callback helpers for get()/get_child_and_metadata(): look a name up in
# an already-listed children dict.  NOTE(review): the `if child is None:`
# guards (lines 252, 258) and the returns (254-255, 260-261) are elided
# from this numbered excerpt; _get presumably returns child[0] (the
# node), _get_with_metadata the whole (node, metadata) tuple -- confirm
# upstream.
250 def _get(self, children, name):
251 child = children.get(name)
253 raise NoSuchChildError(name)
256 def _get_with_metadata(self, children, name):
257 child = children.get(name)
259 raise NoSuchChildError(name)
# NOTE(review): the `def get(self, name):` header (line 262) is elided;
# these lines are its body.  The `d = self.list()` lines (266, 275, 281)
# and the `return d` tails (268, 277, 283) are likewise elided from all
# three accessors below.
263 """I return a Deferred that fires with the named child node,
264 which is either an IFileNode or an IDirectoryNode."""
265 assert isinstance(name, unicode)
267 d.addCallback(self._get, name)
# Like get(), but also delivers the child's metadata dict.
270 def get_child_and_metadata(self, name):
271 """I return a Deferred that fires with the (node, metadata) pair for
272 the named child. The node is either an IFileNode or an
273 IDirectoryNode, and the metadata is a dictionary."""
274 assert isinstance(name, unicode)
276 d.addCallback(self._get_with_metadata, name)
# Metadata-only accessor; raises KeyError via the lambda if the child is
# missing (unlike _get's NoSuchChildError).
279 def get_metadata_for(self, name):
280 assert isinstance(name, unicode)
282 d.addCallback(lambda children: children[name][1])
# Replace one child's metadata in a single modify() transaction, firing
# with self.  NOTE(review): the trailing `return d` (line 293) is elided
# from this numbered excerpt.
285 def set_metadata_for(self, name, metadata):
286 assert isinstance(name, unicode)
287 if self.is_readonly():
288 return defer.fail(NotMutableError())
289 assert isinstance(metadata, dict)
290 s = MetadataSetter(self, name, metadata)
291 d = self._node.modify(s.modify)
292 d.addCallback(lambda res: self)
295 def get_child_at_path(self, path):
296 """Transform a child path into an IDirectoryNode or IFileNode.
298 I perform a recursive series of 'get' operations to find the named
299 descendant node. I return a Deferred that fires with the node, or
300 errbacks with IndexError if the node could not be found.
302 The path can be either a single string (slash-separated) or a list of
305 d = self.get_child_and_metadata_at_path(path)
306 d.addCallback(lambda (node, metadata): node)
309 def get_child_and_metadata_at_path(self, path):
310 """Transform a child path into an IDirectoryNode or IFileNode and
311 a metadata dictionary from the last edge that was traversed.
315 return defer.succeed((self, {}))
316 if isinstance(path, (list, tuple)):
319 path = path.split("/")
321 assert isinstance(p, unicode)
323 remaining_path = path[1:]
325 d = self.get(childname)
326 d.addCallback(lambda node:
327 node.get_child_and_metadata_at_path(remaining_path))
329 d = self.get_child_and_metadata(childname)
# Add a child by cap string.  NOTE(review): docstring delimiters and the
# `return d` tail (lines 336, 339, 341, 346) are elided from this
# numbered excerpt.
332 def set_uri(self, name, child_uri, metadata=None, overwrite=True):
333 """I add a child (by URI) at the specific name. I return a Deferred
334 that fires with the child node when the operation finishes. I will
335 replace any existing child of the same name.
337 The child_uri could be for a file, or for a directory (either
338 read-write or read-only, using a URI that came from get_uri() ).
340 If this directory node is read-only, the Deferred will errback with a
342 assert isinstance(name, unicode)
343 child_node = self._create_node(child_uri)
344 d = self.set_node(name, child_node, metadata, overwrite)
345 d.addCallback(lambda res: child_node)
# Batch variant: queue every (name, child_uri, metadata) entry into one
# Adder transaction.  NOTE(review): lines 349 and 351-357 (docstring,
# read-only guard, loop header) are elided.
348 def set_children(self, entries, overwrite=True):
350 a = Adder(self, overwrite=overwrite)
358 name, child_uri, metadata = e
359 assert isinstance(name, unicode)
360 a.set_node(name, self._create_node(child_uri), metadata)
361 return self._node.modify(a.modify)
# Add an already-constructed child node.  NOTE(review): docstring lines
# 367-368, 370-371 and the `return d` tail (380) are elided from this
# numbered excerpt.
363 def set_node(self, name, child, metadata=None, overwrite=True):
364 """I add a child at the specific name. I return a Deferred that fires
365 when the operation finishes. This Deferred will fire with the child
366 node that was just added. I will replace any existing child of the
369 If this directory node is read-only, the Deferred will errback with a
372 if self.is_readonly():
373 return defer.fail(NotMutableError())
374 assert isinstance(name, unicode)
375 assert IFilesystemNode.providedBy(child), child
376 a = Adder(self, overwrite=overwrite)
377 a.set_node(name, child, metadata)
378 d = self._node.modify(a.modify)
379 d.addCallback(lambda res: child)
# Batch variant of set_node: entries already contain node objects.
# NOTE(review): the trailing `return d` (line 388) is elided.
382 def set_nodes(self, entries, overwrite=True):
383 if self.is_readonly():
384 return defer.fail(NotMutableError())
385 a = Adder(self, entries, overwrite=overwrite)
386 d = self._node.modify(a.modify)
387 d.addCallback(lambda res: None)
# Upload new file content, then link the resulting filenode under the
# given name.  NOTE(review): the trailing `return d` (line ~404) is
# elided from this numbered excerpt.
391 def add_file(self, name, uploadable, metadata=None, overwrite=True):
392 """I upload a file (using the given IUploadable), then attach the
393 resulting FileNode to the directory at the given name. I return a
394 Deferred that fires (with the IFileNode of the uploaded file) when
395 the operation completes."""
396 assert isinstance(name, unicode)
397 if self.is_readonly():
398 return defer.fail(NotMutableError())
399 d = self._client.upload(uploadable)
400 d.addCallback(lambda results: results.uri)
401 d.addCallback(self._client.create_node_from_uri)
402 d.addCallback(lambda node:
403 self.set_node(name, node, metadata, overwrite))
# Unlink one child via a Deleter transaction; fires with the removed
# node.  NOTE(review): the trailing `return d` (line 415) is elided from
# this numbered excerpt.
406 def delete(self, name):
407 """I remove the child at the specific name. I return a Deferred that
408 fires (with the node just removed) when the operation finishes."""
409 assert isinstance(name, unicode)
410 if self.is_readonly():
411 return defer.fail(NotMutableError())
412 deleter = Deleter(self, name)
413 d = self._node.modify(deleter.modify)
414 d.addCallback(lambda res: deleter.old_child)
# Create a fresh empty dirnode, then link it under `name`.
# NOTE(review): the `def _created(child):` header (line 425), its
# `return d` (430), and the method's final `return d` (432) are elided
# from this numbered excerpt -- lines 426-429 are the body of that
# nested callback.
417 def create_empty_directory(self, name, overwrite=True):
418 """I create and attach an empty directory at the given name. I return
419 a Deferred that fires (with the new directory node) when the
420 operation finishes."""
421 assert isinstance(name, unicode)
422 if self.is_readonly():
423 return defer.fail(NotMutableError())
424 d = self._client.create_empty_dirnode()
426 entries = [(name, child, None)]
427 a = Adder(self, entries, overwrite=overwrite)
428 d = self._node.modify(a.modify)
429 d.addCallback(lambda res: child)
431 d.addCallback(_created)
# Move = set on the new parent, then delete here.  NOTE(review): this
# numbered excerpt elides the nested `def _got_child(child):` header
# (line 447), the overwrite keyword/close of the set_node call (449-450),
# the _got_child hookup (likely `d.addCallback(_got_child)`), and the
# final `return d` (452-453).
434 def move_child_to(self, current_child_name, new_parent,
435 new_child_name=None, overwrite=True):
436 """I take one of my children and move them to a new parent. The child
437 is referenced by name. On the new parent, the child will live under
438 'new_child_name', which defaults to 'current_child_name'. I return a
439 Deferred that fires when the operation finishes."""
440 assert isinstance(current_child_name, unicode)
441 if self.is_readonly() or new_parent.is_readonly():
442 return defer.fail(NotMutableError())
443 if new_child_name is None:
444 new_child_name = current_child_name
445 assert isinstance(new_child_name, unicode)
446 d = self.get(current_child_name)
448 return new_parent.set_node(new_child_name, child,
# The delete only runs after the add succeeded, so a failure cannot
# orphan the child.
451 d.addCallback(lambda child: self.delete(current_child_name))
# Generic deep-walk engine shared by manifest, deep-stats and deep-check.
# NOTE(review): this numbered excerpt elides the docstring delimiters,
# the Monitor construction (~line 479) that `monitor` below refers to,
# and the final `return monitor` (~489-490).
455 def deep_traverse(self, walker):
456 """Perform a recursive walk, using this dirnode as a root, notifying
457 the 'walker' instance of everything I encounter.
459 I call walker.enter_directory(parent, children) once for each dirnode
460 I visit, immediately after retrieving the list of children. I pass in
461 the parent dirnode and the dict of childname->(childnode,metadata).
462 This function should *not* traverse the children: I will do that.
463 enter_directory() is most useful for the deep-stats number that
464 counts how large a directory is.
466 I call walker.add_node(node, path) for each node (both files and
467 directories) I can reach. Most work should be done here.
469 I avoid loops by keeping track of verifier-caps and refusing to call
470 each() or traverse a node that I've seen before.
472 I return a Deferred that will fire with the value of walker.finish().
475 # this is just a tree-walker, except that following each edge
476 # requires a Deferred. We use a ConcurrencyLimiter to make sure the
477 # fan-out doesn't cause problems.
480 walker.set_monitor(monitor)
# Seed the visited-set with our own verifier cap so cycles back to the
# root are suppressed.
482 found = set([self.get_verifier()])
483 limiter = ConcurrencyLimiter(10)
484 d = self._deep_traverse_dirnode(self, [],
485 walker, monitor, found, limiter)
486 d.addCallback(lambda ignored: walker.finish())
487 d.addBoth(monitor.finish)
# Errors are delivered through the monitor; swallow them here so the
# chain does not emit an unhandled-error log.
488 d.addErrback(lambda f: None)
# Visit one directory: report it to the walker, list it, then recurse
# into the children.  NOTE(review): the `return d` (line 500) is elided
# from this numbered excerpt.
492 def _deep_traverse_dirnode(self, node, path,
493 walker, monitor, found, limiter):
494 # process this directory, then walk its children
495 monitor.raise_if_cancelled()
496 d = limiter.add(walker.add_node, node, path)
497 d.addCallback(lambda ignored: limiter.add(node.list))
498 d.addCallback(self._deep_traverse_dirnode_children, node, path,
499 walker, monitor, found, limiter)
# NOTE(review): this excerpt elides the `continue`/`found.add(verifier)`
# lines (510-511) after the seen-before test, and the continuation of
# the recursive _deep_traverse_dirnode call plus the `else:` (515-517).
502 def _deep_traverse_dirnode_children(self, children, parent, path,
503 walker, monitor, found, limiter):
504 monitor.raise_if_cancelled()
505 dl = [limiter.add(walker.enter_directory, parent, children)]
506 for name, (child, metadata) in children.iteritems():
507 verifier = child.get_verifier()
508 # allow LIT files (for which verifier==None) to be processed
509 if (verifier is not None) and (verifier in found):
512 childpath = path + [name]
513 if IDirectoryNode.providedBy(child):
514 dl.append(self._deep_traverse_dirnode(child, childpath,
518 dl.append(limiter.add(walker.add_node, child, childpath))
# fireOnOneErrback propagates the first failure; consumeErrors stops the
# remaining branches from logging unhandled errors.
519 return defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
# Deep-walk with a ManifestWalker to collect (path, cap) for every
# reachable node.  NOTE(review): the docstring's final line/closing
# quotes (line 525) are elided from this numbered excerpt.
522 def build_manifest(self):
523 """Return a Monitor, with a ['status'] that will be a list of (path,
524 cap) tuples, for all nodes (directories and files) reachable from
526 walker = ManifestWalker(self)
527 return self.deep_traverse(walker)
def start_deep_stats(self):
    """Recursively accumulate size/count statistics for the whole tree.

    deep_traverse() tracks verifier caps, so a child reachable through
    both a write-cap and a read-cap is only counted once.
    """
    return self.deep_traverse(DeepStats(self))

def start_deep_check(self, verify=False):
    """Deep-walk the tree, checking every reachable node."""
    walker = DeepChecker(self, verify, repair=False)
    return self.deep_traverse(walker)

def start_deep_check_and_repair(self, verify=False):
    """Deep-walk the tree, checking and repairing every reachable node."""
    walker = DeepChecker(self, verify, repair=True)
    return self.deep_traverse(walker)
# DeepStats fragment: a deep_traverse walker that accumulates counters,
# maxima, and a size histogram.  NOTE(review): this numbered excerpt
# elides many lines (the class statement, self.origin/self.stats
# initialization at 544-545, several counter names at 549-550, 554-555,
# 559-561, and the loop that zeroes self.stats for each key).
543 def __init__(self, origin):
546 for k in ["count-immutable-files",
547 "count-mutable-files",
548 "count-literal-files",
551 "size-immutable-files",
552 #"size-mutable-files",
553 "size-literal-files",
556 "largest-directory-children",
557 "largest-immutable-file",
558 #"largest-mutable-file",
562 for k in ["size-files-histogram"]:
563 self.histograms[k] = {} # maps (min,max) to count
# Histogram buckets grow lazily in which_bucket(); two buckets per
# decade via this sqrt(10) growth factor.
564 self.buckets = [ (0,0), (1,3)]
565 self.root = math.sqrt(10)
def set_monitor(self, monitor):
    """Attach the progress Monitor, seeding it with the origin's storage
    index and a live view of our results dictionary."""
    origin = self.origin
    self.monitor = monitor
    monitor.origin_si = origin.get_storage_index()
    monitor.set_status(self.get_results())
# Per-node statistics callback from deep_traverse.  NOTE(review): the
# `else:` between lines 587 and 589 (line 588) and the method's trailing
# line(s) are elided from this numbered excerpt.
572 def add_node(self, node, childpath):
573 if IDirectoryNode.providedBy(node):
574 self.add("count-directories")
# Mutable files must be tested before IFileNode: order matters here.
575 elif IMutableFileNode.providedBy(node):
576 self.add("count-files")
577 self.add("count-mutable-files")
578 # TODO: update the servermap, compute a size, add it to
579 # size-mutable-files, max it into "largest-mutable-file"
580 elif IFileNode.providedBy(node): # CHK and LIT
581 self.add("count-files")
582 size = node.get_size()
583 self.histogram("size-files-histogram", size)
584 theuri = from_string(node.get_uri())
585 if isinstance(theuri, LiteralFileURI):
586 self.add("count-literal-files")
587 self.add("size-literal-files", size)
589 self.add("count-immutable-files")
590 self.add("size-immutable-files", size)
591 self.max("largest-immutable-file", size)
def enter_directory(self, parent, children):
    """Record one directory's statistics: its serialized size in bytes
    and how many children it holds."""
    size_bytes = parent.get_size()
    n_children = len(children)
    self.add("size-directories", size_bytes)
    self.max("largest-directory", size_bytes)
    self.max("largest-directory-children", n_children)
def add(self, key, value=1):
    """Increment the named counter by `value` (default 1)."""
    self.stats[key] = self.stats[key] + value
def max(self, key, value):
    """Raise the named statistic to `value` if `value` exceeds it.

    (The method intentionally shares its name with the builtin; inside a
    class body that is harmless, since the builtin is still reachable.)
    """
    if value > self.stats[key]:
        self.stats[key] = value
# Map a size to its histogram bucket, growing self.buckets on demand.
# NOTE(review): this numbered excerpt elides lines 610-612 (comment and
# the `i = 0` / `while True:` loop header), 614 (inside the grow branch),
# and 620-621 (`return maybe` / `i += 1`), so the loop structure is
# partly reconstructed here from context -- confirm upstream.
606 def which_bucket(self, size):
607 # return (min,max) such that min <= size <= max
608 # values are from the set (0,0), (1,3), (4,10), (11,31), (32,100),
609 # (101,316), (317, 1000), etc: two per decade
613 if i >= len(self.buckets):
# Extend with the next bucket: lower bound just past the previous upper,
# upper bound at the next power of sqrt(10).
615 new_lower = self.buckets[i-1][1]+1
616 new_upper = int(mathutil.next_power_of_k(new_lower, self.root))
617 self.buckets.append( (new_lower, new_upper) )
618 maybe = self.buckets[i]
619 if maybe[0] <= size <= maybe[1]:
# Bump the bucket counter for `size` in the named histogram.
# NOTE(review): the increment itself (lines 626-629, presumably an
# `if bucket in h` / initialize-then-increment sequence) is elided from
# this numbered excerpt.
623 def histogram(self, key, size):
624 bucket = self.which_bucket(size)
625 h = self.histograms[key]
# Snapshot the stats, flattening each histogram dict into a list of
# (min, max, count) triples.  NOTE(review): lines 635-639 (sorting the
# list, storing it into stats, `return stats`, and the `def finish(self):`
# header) are elided from this numbered excerpt; line 640 below is the
# body of finish().
630 def get_results(self):
631 stats = self.stats.copy()
632 for key in self.histograms:
633 h = self.histograms[key]
634 out = [ (bucket[0], bucket[1], h[bucket]) for bucket in h ]
640 return self.get_results()
# A DeepStats subclass that additionally collects the manifest: every
# (path, cap) pair plus the sets of storage indexes and verify caps.
# NOTE(review): line 645 (presumably `self.manifest = []`, which
# add_node below appends to) is elided from this numbered excerpt.
642 class ManifestWalker(DeepStats):
643 def __init__(self, origin):
644 DeepStats.__init__(self, origin)
646 self.storage_index_strings = set()
647 self.verifycaps = set()
# Record this node in the manifest, then delegate to DeepStats for the
# counters.  NOTE(review): the guards at lines 652 and 655 (presumably
# `if si:` / `if v:`, skipping LIT nodes whose storage index and
# verifier are None) are elided from this numbered excerpt.
649 def add_node(self, node, path):
650 self.manifest.append( (tuple(path), node.get_uri()) )
651 si = node.get_storage_index()
653 self.storage_index_strings.add(base32.b2a(si))
654 v = node.get_verifier()
656 self.verifycaps.add(v.to_string())
657 return DeepStats.add_node(self, node, path)
# Merge the manifest collections with the DeepStats numbers.
# NOTE(review): the closing of this dict literal (line 664+, presumably
# a "stats": stats entry and the closing brace) is elided from this
# numbered excerpt.
659 def get_results(self):
660 stats = DeepStats.get_results(self)
661 return {"manifest": self.manifest,
662 "verifycaps": self.verifycaps,
663 "storage-index": self.storage_index_strings,
# DeepChecker fragment: a deep_traverse walker that checks (and
# optionally repairs) every node while also gathering DeepStats.
# NOTE(review): the class statement and the `if self._repair:` / `else:`
# lines (676, 678) selecting between the two results classes are elided
# from this numbered excerpt.
669 def __init__(self, root, verify, repair):
670 root_si = root.get_storage_index()
671 self._lp = log.msg(format="deep-check starting (%(si)s),"
672 " verify=%(verify)s, repair=%(repair)s",
673 si=base32.b2a(root_si), verify=verify, repair=repair)
674 self._verify = verify
675 self._repair = repair
677 self._results = DeepCheckAndRepairResults(root_si)
679 self._results = DeepCheckResults(root_si)
680 self._stats = DeepStats(root)
def set_monitor(self, monitor):
    """Attach the progress Monitor and publish our results object as its
    live status."""
    results = self._results
    self.monitor = monitor
    monitor.set_status(results)
# Check (or check-and-repair) one node, feed the outcome into the
# results object, then update the embedded DeepStats.  NOTE(review): the
# `if self._repair:` (687) / `else:` (690) branch lines and the final
# `return d` (694) are elided from this numbered excerpt.
686 def add_node(self, node, childpath):
688 d = node.check_and_repair(self.monitor, self._verify)
689 d.addCallback(self._results.add_check_and_repair, childpath)
691 d = node.check(self.monitor, self._verify)
692 d.addCallback(self._results.add_check, childpath)
693 d.addCallback(lambda ignored: self._stats.add_node(node, childpath))
def enter_directory(self, parent, children):
    """Delegate per-directory accounting to the embedded DeepStats."""
    stats = self._stats
    return stats.enter_directory(parent, children)
# NOTE(review): the `def finish(self):` header (lines 698-699) is elided
# from this numbered excerpt; these lines are its body -- fold the final
# DeepStats numbers into the results object (the trailing
# `return self._results` also appears to be elided).
700 log.msg("deep-check done", parent=self._lp)
701 self._results.update_stats(self._stats.get_results())
705 # use client.create_dirnode() to make one of these