import os, time, math

from zope.interface import implements
from twisted.internet import defer
import simplejson
from allmydata.mutable.common import NotMutableError
from allmydata.mutable.node import MutableFileNode
from allmydata.interfaces import IMutableFileNode, IDirectoryNode,\
     IURI, IFileNode, IMutableFileURI, IVerifierURI, IFilesystemNode, \
     ExistingChildError, ICheckable
from allmydata.checker_results import DeepCheckResults, \
     DeepCheckAndRepairResults
from allmydata.util import hashutil, mathutil, base32, log
from allmydata.util.hashutil import netstring
from allmydata.util.limiter import ConcurrencyLimiter
from allmydata.uri import NewDirectoryURI
from pycryptopp.cipher.aes import AES
def split_netstring(data, numstrings, allow_leftover=False):
    """like string.split(), but extracts netstrings. If allow_leftover=False,
    returns numstrings elements, and throws ValueError if there was leftover
    data. If allow_leftover=True, returns numstrings+1 elements, in which the
    last element is the leftover data (possibly an empty string)"""
    assert numstrings >= 0
    elements = []
    while data:
        # each netstring is "<decimal length>:<body>,"
        colon = data.index(":")
        length = int(data[:colon])
        string = data[colon+1:colon+1+length]
        assert len(string) == length
        elements.append(string)
        assert data[colon+1+length] == ","
        data = data[colon+1+length+1:]
        if len(elements) == numstrings:
            break
    if len(elements) < numstrings:
        raise ValueError("ran out of netstrings")
    if allow_leftover:
        return tuple(elements + [data])
    if data:
        raise ValueError("leftover data in netstrings")
    return tuple(elements)
class Deleter:
    """Modifier for MutableFileNode.modify(): remove one child by name.

    The removed child node is remembered in self.old_child. If must_exist
    is False, deleting a missing child is a no-op: modify() returns None
    ("no change") and old_child is set to None.
    """
    def __init__(self, node, name, must_exist=True):
        self.node = node
        self.name = name
        # BUGFIX: the parameter was previously ignored (hard-coded True),
        # which made must_exist=False callers raise KeyError anyway.
        self.must_exist = must_exist

    def modify(self, old_contents):
        children = self.node._unpack_contents(old_contents)
        if self.name not in children:
            if self.must_exist:
                raise KeyError(self.name)
            self.old_child = None
            # returning None tells modify() to leave the contents untouched
            return None
        self.old_child, metadata = children[self.name]
        del children[self.name]
        new_contents = self.node._pack_contents(children)
        return new_contents
class MetadataSetter:
    """Modifier for MutableFileNode.modify(): replace one child's metadata.

    The child node itself is kept; only the metadata dict is swapped.
    Raises KeyError (from the children lookup) if the child is missing.
    """
    def __init__(self, node, name, metadata):
        self.node = node
        self.name = name
        self.metadata = metadata

    def modify(self, old_contents):
        children = self.node._unpack_contents(old_contents)
        # keep the existing child node, install the new metadata dict
        children[self.name] = (children[self.name][0], self.metadata)
        new_contents = self.node._pack_contents(children)
        return new_contents
class Adder:
    """Modifier for MutableFileNode.modify(): add or overwrite children.

    Each entry is a [name, childnode, metadata] triple (metadata=None means
    "preserve existing metadata, refreshing timestamps"). With
    overwrite=False, an existing child of the same name raises
    ExistingChildError.
    """
    def __init__(self, node, entries=None, overwrite=True):
        self.node = node
        if entries is None:
            entries = []
        self.entries = entries
        self.overwrite = overwrite

    def set_node(self, name, node, metadata):
        # queue one child for addition; applied when modify() runs
        self.entries.append( [name, node, metadata] )

    def modify(self, old_contents):
        children = self.node._unpack_contents(old_contents)
        now = time.time()
        for e in self.entries:
            if len(e) == 2:
                name, child = e
                new_metadata = None
            else:
                assert len(e) == 3
                name, child, new_metadata = e
            assert isinstance(name, unicode)
            if name in children:
                if not self.overwrite:
                    raise ExistingChildError("child '%s' already exists" % name)
                metadata = children[name][1].copy()
            else:
                metadata = {"ctime": now,
                            "mtime": now}
            if new_metadata is None:
                # update timestamps on the existing metadata
                if "ctime" not in metadata:
                    metadata["ctime"] = now
                metadata["mtime"] = now
            else:
                # just replace it
                metadata = new_metadata.copy()
            children[name] = (child, metadata)
        new_contents = self.node._pack_contents(children)
        return new_contents
class NewDirectoryNode:
    """A mutable directory, backed by a MutableFileNode.

    The children are serialized into the mutable file's contents as a list
    of netstrings; each child's write-cap is encrypted with a key derived
    from our writekey, so read-only observers see only read-caps.
    """
    implements(IDirectoryNode, ICheckable)
    filenode_class = MutableFileNode

    def __init__(self, client):
        self._client = client
        self._most_recent_size = None

    def __repr__(self):
        return "<%s %s %s>" % (self.__class__.__name__, self.is_readonly() and "RO" or "RW", hasattr(self, '_uri') and self._uri.abbrev())

    def init_from_uri(self, myuri):
        self._uri = IURI(myuri)
        self._node = self.filenode_class(self._client)
        self._node.init_from_uri(self._uri.get_filenode_uri())
        return self

    def create(self, keypair_generator=None):
        """
        Returns a deferred that eventually fires with self once the directory
        has been created (distributed across a set of storage servers).
        """
        # first we create a MutableFileNode with empty_contents, then use its
        # URI to create our own.
        self._node = self.filenode_class(self._client)
        empty_contents = self._pack_contents({})
        d = self._node.create(empty_contents, keypair_generator)
        d.addCallback(self._filenode_created)
        return d

    def _filenode_created(self, res):
        self._uri = NewDirectoryURI(IMutableFileURI(self._node.get_uri()))
        return self

    def get_size(self):
        # return the size of our backing mutable file, in bytes, if we've
        # fetched it
        return self._most_recent_size

    def _set_size(self, data):
        # remember the size, then pass the data through unchanged so we can
        # be inserted into a callback chain
        self._most_recent_size = len(data)
        return data

    def _read(self):
        d = self._node.download_best_version()
        d.addCallback(self._set_size)
        d.addCallback(self._unpack_contents)
        return d

    def _encrypt_rwcap(self, rwcap):
        assert isinstance(rwcap, str)
        IV = os.urandom(16)
        key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
        cryptor = AES(key)
        crypttext = cryptor.process(rwcap)
        mac = hashutil.hmac(key, IV + crypttext)
        assert len(mac) == 32
        return IV + crypttext + mac

    def _decrypt_rwcapdata(self, encwrcap):
        # layout: 16-byte IV + crypttext + 32-byte HMAC
        IV = encwrcap[:16]
        crypttext = encwrcap[16:-32]
        mac = encwrcap[-32:]
        key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
        if mac != hashutil.hmac(key, IV+crypttext):
            raise hashutil.IntegrityCheckError("HMAC does not match, crypttext is corrupted")
        cryptor = AES(key)
        plaintext = cryptor.process(crypttext)
        return plaintext

    def _create_node(self, child_uri):
        return self._client.create_node_from_uri(child_uri)

    def _unpack_contents(self, data):
        # the directory is serialized as a list of netstrings, one per child.
        # Each child is serialized as a list of four netstrings: (name,
        # rocap, rwcap, metadata), in which the name,rocap,metadata are in
        # cleartext. The 'name' is UTF-8 encoded. The rwcap is formatted as:
        # pack("16ss32s", iv, AES(H(writekey+iv), plaintextrwcap), mac)
        assert isinstance(data, str)
        # an empty directory is serialized as an empty string
        if data == "":
            return {}
        writeable = not self.is_readonly()
        children = {}
        while len(data) > 0:
            entry, data = split_netstring(data, 1, True)
            name, rocap, rwcapdata, metadata_s = split_netstring(entry, 4)
            name = name.decode("utf-8")
            if writeable:
                rwcap = self._decrypt_rwcapdata(rwcapdata)
                child = self._create_node(rwcap)
            else:
                child = self._create_node(rocap)
            metadata = simplejson.loads(metadata_s)
            assert isinstance(metadata, dict)
            children[name] = (child, metadata)
        return children

    def _pack_contents(self, children):
        # expects children in the same format as _unpack_contents
        assert isinstance(children, dict)
        entries = []
        for name in sorted(children.keys()):
            child, metadata = children[name]
            assert isinstance(name, unicode)
            assert (IFileNode.providedBy(child)
                    or IMutableFileNode.providedBy(child)
                    or IDirectoryNode.providedBy(child)), (name,child)
            assert isinstance(metadata, dict)
            rwcap = child.get_uri() # might be RO if the child is not writeable
            rocap = child.get_readonly_uri()
            entry = "".join([netstring(name.encode("utf-8")),
                             netstring(rocap),
                             netstring(self._encrypt_rwcap(rwcap)),
                             netstring(simplejson.dumps(metadata))])
            entries.append(netstring(entry))
        return "".join(entries)

    def is_readonly(self):
        return self._node.is_readonly()

    def is_mutable(self):
        return self._node.is_mutable()

    def get_uri(self):
        return self._uri.to_string()

    def get_readonly_uri(self):
        return self._uri.get_readonly().to_string()

    def get_verifier(self):
        return self._uri.get_verifier()

    def get_storage_index(self):
        return self._uri._filenode_uri.storage_index

    def check(self, verify=False):
        """Perform a file check. See IChecker.check for details."""
        return self._node.check(verify)

    def check_and_repair(self, verify=False):
        return self._node.check_and_repair(verify)

    def list(self):
        """I return a Deferred that fires with a dictionary mapping child
        name to a tuple of (IFileNode or IDirectoryNode, metadata)."""
        return self._read()

    def has_child(self, name):
        """I return a Deferred that fires with a boolean, True if there
        exists a child of the given name, False if not."""
        assert isinstance(name, unicode)
        d = self._read()
        d.addCallback(lambda children: children.has_key(name))
        return d

    def _get(self, children, name):
        child = children.get(name)
        if child is None:
            raise KeyError(name)
        return child[0]

    def get(self, name):
        """I return a Deferred that fires with the named child node,
        which is either an IFileNode or an IDirectoryNode."""
        assert isinstance(name, unicode)
        d = self._read()
        d.addCallback(self._get, name)
        return d

    def get_metadata_for(self, name):
        assert isinstance(name, unicode)
        d = self._read()
        d.addCallback(lambda children: children[name][1])
        return d

    def set_metadata_for(self, name, metadata):
        assert isinstance(name, unicode)
        if self.is_readonly():
            return defer.fail(NotMutableError())
        assert isinstance(metadata, dict)
        s = MetadataSetter(self, name, metadata)
        d = self._node.modify(s.modify)
        d.addCallback(lambda res: self)
        return d

    def get_child_at_path(self, path):
        """Transform a child path into an IDirectoryNode or IFileNode.

        I perform a recursive series of 'get' operations to find the named
        descendant node. I return a Deferred that fires with the node, or
        errbacks with IndexError if the node could not be found.

        The path can be either a single string (slash-separated) or a list of
        path-name elements.
        """
        if not path:
            return defer.succeed(self)
        if isinstance(path, (list, tuple)):
            pass
        else:
            path = path.split("/")
        for p in path:
            assert isinstance(p, unicode)
        childname = path[0]
        remaining_path = path[1:]
        d = self.get(childname)
        if remaining_path:
            def _got(node):
                return node.get_child_at_path(remaining_path)
            d.addCallback(_got)
        return d

    def set_uri(self, name, child_uri, metadata=None, overwrite=True):
        """I add a child (by URI) at the specific name. I return a Deferred
        that fires with the child node when the operation finishes. I will
        replace any existing child of the same name.

        The child_uri could be for a file, or for a directory (either
        read-write or read-only, using a URI that came from get_uri() ).

        If this directory node is read-only, the Deferred will errback with a
        NotMutableError."""
        assert isinstance(name, unicode)
        child_node = self._create_node(child_uri)
        d = self.set_node(name, child_node, metadata, overwrite)
        d.addCallback(lambda res: child_node)
        return d

    def set_children(self, entries, overwrite=True):
        # this takes URIs; entries are (name, child_uri) or
        # (name, child_uri, metadata) tuples
        a = Adder(self, overwrite=overwrite)
        for e in entries:
            if len(e) == 2:
                name, child_uri = e
                metadata = None
            else:
                assert len(e) == 3
                name, child_uri, metadata = e
            assert isinstance(name, unicode)
            a.set_node(name, self._create_node(child_uri), metadata)
        return self._node.modify(a.modify)

    def set_node(self, name, child, metadata=None, overwrite=True):
        """I add a child at the specific name. I return a Deferred that fires
        when the operation finishes. This Deferred will fire with the child
        node that was just added. I will replace any existing child of the
        same name.

        If this directory node is read-only, the Deferred will errback with a
        NotMutableError."""
        if self.is_readonly():
            return defer.fail(NotMutableError())
        assert isinstance(name, unicode)
        assert IFilesystemNode.providedBy(child), child
        a = Adder(self, overwrite=overwrite)
        a.set_node(name, child, metadata)
        d = self._node.modify(a.modify)
        d.addCallback(lambda res: child)
        return d

    def set_nodes(self, entries, overwrite=True):
        if self.is_readonly():
            return defer.fail(NotMutableError())
        a = Adder(self, entries, overwrite=overwrite)
        d = self._node.modify(a.modify)
        d.addCallback(lambda res: None)
        return d

    def add_file(self, name, uploadable, metadata=None, overwrite=True):
        """I upload a file (using the given IUploadable), then attach the
        resulting FileNode to the directory at the given name. I return a
        Deferred that fires (with the IFileNode of the uploaded file) when
        the operation completes."""
        assert isinstance(name, unicode)
        if self.is_readonly():
            return defer.fail(NotMutableError())
        d = self._client.upload(uploadable)
        d.addCallback(lambda results: results.uri)
        d.addCallback(self._client.create_node_from_uri)
        d.addCallback(lambda node:
                      self.set_node(name, node, metadata, overwrite))
        return d

    def delete(self, name):
        """I remove the child at the specific name. I return a Deferred that
        fires (with the node just removed) when the operation finishes."""
        assert isinstance(name, unicode)
        if self.is_readonly():
            return defer.fail(NotMutableError())
        deleter = Deleter(self, name)
        d = self._node.modify(deleter.modify)
        d.addCallback(lambda res: deleter.old_child)
        return d

    def create_empty_directory(self, name, overwrite=True):
        """I create and attach an empty directory at the given name. I return
        a Deferred that fires (with the new directory node) when the
        operation finishes."""
        assert isinstance(name, unicode)
        if self.is_readonly():
            return defer.fail(NotMutableError())
        d = self._client.create_empty_dirnode()
        def _created(child):
            entries = [(name, child, None)]
            a = Adder(self, entries, overwrite=overwrite)
            d = self._node.modify(a.modify)
            d.addCallback(lambda res: child)
            return d
        d.addCallback(_created)
        return d

    def move_child_to(self, current_child_name, new_parent,
                      new_child_name=None, overwrite=True):
        """I take one of my children and move them to a new parent. The child
        is referenced by name. On the new parent, the child will live under
        'new_child_name', which defaults to 'current_child_name'. I return a
        Deferred that fires when the operation finishes."""
        assert isinstance(current_child_name, unicode)
        if self.is_readonly() or new_parent.is_readonly():
            return defer.fail(NotMutableError())
        if new_child_name is None:
            new_child_name = current_child_name
        assert isinstance(new_child_name, unicode)
        d = self.get(current_child_name)
        def _got_child(child):
            return new_parent.set_node(new_child_name, child,
                                       overwrite=overwrite)
        d.addCallback(_got_child)
        d.addCallback(lambda child: self.delete(current_child_name))
        return d

    def build_manifest(self):
        """Return a frozenset of verifier-capability strings for all nodes
        (directories and files) reachable from this one."""

        # this is just a tree-walker, except that following each edge
        # requires a Deferred. We use a ConcurrencyLimiter to make sure the
        # fan-out doesn't cause problems.

        manifest = set()
        manifest.add(self.get_verifier())
        limiter = ConcurrencyLimiter(10) # allow 10 in parallel

        d = self._build_manifest_from_node(self, manifest, limiter)
        def _done(res):
            # LIT nodes have no verifier-capability: their data is stored
            # inside the URI itself, so there is no need to refresh anything.
            # They indicate this by returning None from their get_verifier
            # method. We need to remove any such Nones from our set. We also
            # want to convert all these caps into strings.
            return frozenset([IVerifierURI(cap).to_string()
                              for cap in manifest
                              if cap is not None])
        d.addCallback(_done)
        return d

    def _build_manifest_from_node(self, node, manifest, limiter):
        d = limiter.add(node.list)
        def _got_list(res):
            dl = []
            for name, (child, metadata) in res.iteritems():
                verifier = child.get_verifier()
                if verifier not in manifest:
                    manifest.add(verifier)
                    if IDirectoryNode.providedBy(child):
                        dl.append(self._build_manifest_from_node(child,
                                                                 manifest,
                                                                 limiter))
            return defer.DeferredList(dl)
        d.addCallback(_got_list)
        return d

    def deep_stats(self):
        stats = DeepStats()
        # we track verifier caps, to avoid double-counting children for which
        # we've got both a write-cap and a read-cap
        found = set()
        found.add(self.get_verifier())

        limiter = ConcurrencyLimiter(10)

        d = self._add_deepstats_from_node(self, found, stats, limiter)
        d.addCallback(lambda res: stats.get_results())
        return d

    def _add_deepstats_from_node(self, node, found, stats, limiter):
        d = limiter.add(node.list)
        def _got_list(children):
            dl = []
            dirsize_bytes = node.get_size()
            dirsize_children = len(children)
            stats.add("count-directories")
            stats.add("size-directories", dirsize_bytes)
            stats.max("largest-directory", dirsize_bytes)
            stats.max("largest-directory-children", dirsize_children)
            for name, (child, metadata) in children.iteritems():
                verifier = child.get_verifier()
                if verifier in found:
                    # already seen via another cap; don't double-count
                    continue
                found.add(verifier)
                if IDirectoryNode.providedBy(child):
                    dl.append(self._add_deepstats_from_node(child, found,
                                                            stats, limiter))
                elif IMutableFileNode.providedBy(child):
                    stats.add("count-files")
                    stats.add("count-mutable-files")
                    # TODO: update the servermap, compute a size, add it to
                    # size-mutable-files, max it into "largest-mutable-file"
                elif IFileNode.providedBy(child): # CHK and LIT
                    stats.add("count-files")
                    size = child.get_size()
                    stats.histogram("size-files-histogram", size)
                    if child.get_uri().startswith("URI:LIT:"):
                        stats.add("count-literal-files")
                        stats.add("size-literal-files", size)
                    else:
                        stats.add("count-immutable-files")
                        stats.add("size-immutable-files", size)
                        stats.max("largest-immutable-file", size)
            return defer.DeferredList(dl)
        d.addCallback(_got_list)
        return d

    def deep_check(self, verify=False):
        return self.deep_check_base(verify, False)

    def deep_check_and_repair(self, verify=False):
        return self.deep_check_base(verify, True)

    def deep_check_base(self, verify, repair):
        # shallow-check each object first, then traverse children
        root_si = self._node.get_storage_index()
        self._lp = log.msg(format="deep-check starting (%(si)s),"
                           " verify=%(verify)s, repair=%(repair)s",
                           si=base32.b2a(root_si), verify=verify, repair=repair)
        if repair:
            results = DeepCheckAndRepairResults(root_si)
        else:
            results = DeepCheckResults(root_si)
        found = set()
        limiter = ConcurrencyLimiter(10)

        d = self._add_deepcheck_from_node([], self, results, found, limiter,
                                          verify, repair)
        def _done(res):
            log.msg("deep-check done", parent=self._lp)
            return results
        d.addCallback(_done)
        return d

    def _add_deepcheck_from_node(self, path, node, results, found, limiter,
                                 verify, repair):
        verifier = node.get_verifier()
        if verifier in found:
            # avoid loops: we've already checked this node
            return None
        found.add(verifier)

        if repair:
            d = limiter.add(node.check_and_repair, verify)
            d.addCallback(results.add_check_and_repair, path)
        else:
            d = limiter.add(node.check, verify)
            d.addCallback(results.add_check, path)

        # TODO: stats: split the DeepStats.foo calls out of
        # _add_deepstats_from_node into a separate non-recursing method, call
        # it from both here and _add_deepstats_from_node.

        if IDirectoryNode.providedBy(node):
            d.addCallback(lambda res: node.list())
            def _got_children(children):
                dl = []
                for name, (child, metadata) in children.iteritems():
                    childpath = path + [name]
                    d2 = self._add_deepcheck_from_node(childpath, child,
                                                       results, found,
                                                       limiter,
                                                       verify, repair)
                    if d2:
                        dl.append(d2)
                return defer.DeferredList(dl, fireOnOneErrback=True)
            d.addCallback(_got_children)
        return d
class DeepStats:
    """Accumulator for the counters gathered by deep_stats().

    Tracks simple counters/maxima in self.stats, and a bucketed size
    histogram (two buckets per decade) in self.histograms.
    """
    def __init__(self):
        self.stats = {}
        for k in ["count-immutable-files",
                  "count-mutable-files",
                  "count-literal-files",
                  "count-files",
                  "count-directories",
                  "size-immutable-files",
                  #"size-mutable-files",
                  "size-literal-files",
                  "size-directories",
                  "largest-directory",
                  "largest-directory-children",
                  "largest-immutable-file",
                  #"largest-mutable-file",
                  ]:
            self.stats[k] = 0
        self.histograms = {}
        for k in ["size-files-histogram"]:
            self.histograms[k] = {} # maps (min,max) to count
        self.buckets = [ (0,0), (1,3)]
        # two buckets per decade: each upper bound is the previous one
        # multiplied by sqrt(10)
        self.root = math.sqrt(10)

    def add(self, key, value=1):
        self.stats[key] += value

    def max(self, key, value):
        self.stats[key] = max(self.stats[key], value)

    def which_bucket(self, size):
        # return (min,max) such that min <= size <= max
        # values are from the set (0,0), (1,3), (4,10), (11,31), (32,100),
        # (101,316), (317, 1000), etc: two per decade
        i = 0
        while True:
            if i >= len(self.buckets):
                # extend the list with the next bucket
                new_lower = self.buckets[i-1][1]+1
                new_upper = int(mathutil.next_power_of_k(new_lower, self.root))
                self.buckets.append( (new_lower, new_upper) )
            maybe = self.buckets[i]
            if maybe[0] <= size <= maybe[1]:
                return maybe
            i += 1

    def histogram(self, key, size):
        bucket = self.which_bucket(size)
        h = self.histograms[key]
        if bucket not in h:
            h[bucket] = 0
        h[bucket] += 1

    def get_results(self):
        stats = self.stats.copy()
        for key in self.histograms:
            h = self.histograms[key]
            out = [ (bucket[0], bucket[1], h[bucket]) for bucket in h ]
            out.sort()
            stats[key] = out
        return stats
# use client.create_dirnode() to make one of these