import os, time, math

import simplejson

from zope.interface import implements
from twisted.internet import defer
from allmydata.mutable.common import NotMutableError
from allmydata.mutable.node import MutableFileNode
from allmydata.interfaces import IMutableFileNode, IDirectoryNode, \
     IURI, IFileNode, IMutableFileURI, IVerifierURI, IFilesystemNode, \
     ExistingChildError, ICheckable
from allmydata.immutable.checker import DeepCheckResults
from allmydata.util import hashutil, mathutil
from allmydata.util.hashutil import netstring
from allmydata.util.limiter import ConcurrencyLimiter
from allmydata.uri import NewDirectoryURI
from pycryptopp.cipher.aes import AES
def split_netstring(data, numstrings, allow_leftover=False):
    """like string.split(), but extracts netstrings. If allow_leftover=False,
    returns numstrings elements, and throws ValueError if there was leftover
    data. If allow_leftover=True, returns numstrings+1 elements, in which the
    last element is the leftover data (possibly an empty string)"""
    assert numstrings >= 0
    elements = []
    while data:
        # each netstring is "<decimal length>:<payload>,"
        colon = data.index(":")
        length = int(data[:colon])
        string = data[colon+1:colon+1+length]
        assert len(string) == length
        elements.append(string)
        assert data[colon+1+length] == ","
        data = data[colon+1+length+1:]
        if len(elements) == numstrings:
            break
    if len(elements) < numstrings:
        raise ValueError("ran out of netstrings")
    if allow_leftover:
        return tuple(elements + [data])
    if data:
        raise ValueError("leftover data in netstrings")
    return tuple(elements)
class Deleter:
    """Modifier for MutableFileNode.modify(): removes one child from a
    dirnode's serialized child table. After a successful modify(),
    .old_child holds the node that was removed (None if it was absent
    and must_exist was False)."""
    def __init__(self, node, name, must_exist=True):
        self.node = node
        self.name = name
        # bugfix: this was hardwired to True, silently ignoring the
        # must_exist argument; default callers are unaffected.
        self.must_exist = must_exist
    def modify(self, old_contents):
        children = self.node._unpack_contents(old_contents)
        if self.name not in children:
            if self.must_exist:
                raise KeyError(self.name)
            # tolerated absence: record it, leave the contents untouched
            self.old_child = None
            return None
        self.old_child, metadata = children[self.name]
        del children[self.name]
        new_contents = self.node._pack_contents(children)
        return new_contents
class MetadataSetter:
    """Modifier for MutableFileNode.modify(): replaces the metadata dict
    of one existing child, leaving the child node itself alone."""
    def __init__(self, node, name, metadata):
        self.node = node
        self.name = name
        self.metadata = metadata
    def modify(self, old_contents):
        children = self.node._unpack_contents(old_contents)
        # raises KeyError if the named child does not exist
        children[self.name] = (children[self.name][0], self.metadata)
        new_contents = self.node._pack_contents(children)
        return new_contents
class Adder:
    """Modifier for MutableFileNode.modify(): adds (or overwrites) one or
    more (name, child, metadata) entries in a dirnode's child table,
    maintaining ctime/mtime timestamps."""
    def __init__(self, node, entries=None, overwrite=True):
        self.node = node
        if entries is None:
            entries = []
        self.entries = entries
        self.overwrite = overwrite

    def set_node(self, name, node, metadata):
        # queue an entry for the next modify() pass
        self.entries.append( [name, node, metadata] )

    def modify(self, old_contents):
        children = self.node._unpack_contents(old_contents)
        now = time.time()
        for e in self.entries:
            if len(e) == 2:
                name, child = e
                new_metadata = None
            else:
                assert len(e) == 3
                name, child, new_metadata = e
            assert isinstance(name, unicode)
            if name in children:
                if not self.overwrite:
                    raise ExistingChildError("child '%s' already exists" % name)
                metadata = children[name][1].copy()
            else:
                metadata = {"ctime": now,
                            "mtime": now}
            if new_metadata is None:
                # update timestamps
                if "ctime" not in metadata:
                    metadata["ctime"] = now
                metadata["mtime"] = now
            else:
                # just replace it
                metadata = new_metadata.copy()
            children[name] = (child, metadata)
        new_contents = self.node._pack_contents(children)
        return new_contents
class NewDirectoryNode:
    """I represent a distributed mutable directory: a table mapping unicode
    child names to (child node, metadata dict) pairs, serialized into the
    contents of a backing mutable file."""
    implements(IDirectoryNode, ICheckable)
    filenode_class = MutableFileNode  # backing-store node type
119 def __init__(self, client):
120 self._client = client
121 self._most_recent_size = None
124 return "<%s %s %s>" % (self.__class__.__name__, self.is_readonly() and "RO" or "RW", hasattr(self, '_uri') and self._uri.abbrev())
125 def init_from_uri(self, myuri):
126 self._uri = IURI(myuri)
127 self._node = self.filenode_class(self._client)
128 self._node.init_from_uri(self._uri.get_filenode_uri())
131 def create(self, keypair_generator=None):
133 Returns a deferred that eventually fires with self once the directory
134 has been created (distributed across a set of storage servers).
136 # first we create a MutableFileNode with empty_contents, then use its
137 # URI to create our own.
138 self._node = self.filenode_class(self._client)
139 empty_contents = self._pack_contents({})
140 d = self._node.create(empty_contents, keypair_generator)
141 d.addCallback(self._filenode_created)
143 def _filenode_created(self, res):
144 self._uri = NewDirectoryURI(IMutableFileURI(self._node.get_uri()))
148 # return the size of our backing mutable file, in bytes, if we've
150 return self._most_recent_size
152 def _set_size(self, data):
153 self._most_recent_size = len(data)
157 d = self._node.download_best_version()
158 d.addCallback(self._set_size)
159 d.addCallback(self._unpack_contents)
162 def _encrypt_rwcap(self, rwcap):
163 assert isinstance(rwcap, str)
165 key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
167 crypttext = cryptor.process(rwcap)
168 mac = hashutil.hmac(key, IV + crypttext)
169 assert len(mac) == 32
170 return IV + crypttext + mac
172 def _decrypt_rwcapdata(self, encwrcap):
174 crypttext = encwrcap[16:-32]
176 key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
177 if mac != hashutil.hmac(key, IV+crypttext):
178 raise hashutil.IntegrityCheckError("HMAC does not match, crypttext is corrupted")
180 plaintext = cryptor.process(crypttext)
183 def _create_node(self, child_uri):
184 return self._client.create_node_from_uri(child_uri)
186 def _unpack_contents(self, data):
187 # the directory is serialized as a list of netstrings, one per child.
188 # Each child is serialized as a list of four netstrings: (name,
189 # rocap, rwcap, metadata), in which the name,rocap,metadata are in
190 # cleartext. The 'name' is UTF-8 encoded. The rwcap is formatted as:
191 # pack("16ss32s", iv, AES(H(writekey+iv), plaintextrwcap), mac)
192 assert isinstance(data, str)
193 # an empty directory is serialized as an empty string
196 writeable = not self.is_readonly()
199 entry, data = split_netstring(data, 1, True)
200 name, rocap, rwcapdata, metadata_s = split_netstring(entry, 4)
201 name = name.decode("utf-8")
203 rwcap = self._decrypt_rwcapdata(rwcapdata)
204 child = self._create_node(rwcap)
206 child = self._create_node(rocap)
207 metadata = simplejson.loads(metadata_s)
208 assert isinstance(metadata, dict)
209 children[name] = (child, metadata)
212 def _pack_contents(self, children):
213 # expects children in the same format as _unpack_contents
214 assert isinstance(children, dict)
216 for name in sorted(children.keys()):
217 child, metadata = children[name]
218 assert isinstance(name, unicode)
219 assert (IFileNode.providedBy(child)
220 or IMutableFileNode.providedBy(child)
221 or IDirectoryNode.providedBy(child)), (name,child)
222 assert isinstance(metadata, dict)
223 rwcap = child.get_uri() # might be RO if the child is not writeable
224 rocap = child.get_readonly_uri()
225 entry = "".join([netstring(name.encode("utf-8")),
227 netstring(self._encrypt_rwcap(rwcap)),
228 netstring(simplejson.dumps(metadata))])
229 entries.append(netstring(entry))
230 return "".join(entries)
232 def is_readonly(self):
233 return self._node.is_readonly()
234 def is_mutable(self):
235 return self._node.is_mutable()
238 return self._uri.to_string()
240 def get_readonly_uri(self):
241 return self._uri.get_readonly().to_string()
243 def get_verifier(self):
244 return self._uri.get_verifier().to_string()
246 def check(self, verify=False, repair=False):
247 """Perform a file check. See IChecker.check for details."""
248 return self._node.check(verify, repair)
251 """I return a Deferred that fires with a dictionary mapping child
252 name to a tuple of (IFileNode or IDirectoryNode, metadata)."""
255 def has_child(self, name):
256 """I return a Deferred that fires with a boolean, True if there
257 exists a child of the given name, False if not."""
258 assert isinstance(name, unicode)
260 d.addCallback(lambda children: children.has_key(name))
263 def _get(self, children, name):
264 child = children.get(name)
270 """I return a Deferred that fires with the named child node,
271 which is either an IFileNode or an IDirectoryNode."""
272 assert isinstance(name, unicode)
274 d.addCallback(self._get, name)
277 def get_metadata_for(self, name):
278 assert isinstance(name, unicode)
280 d.addCallback(lambda children: children[name][1])
283 def set_metadata_for(self, name, metadata):
284 assert isinstance(name, unicode)
285 if self.is_readonly():
286 return defer.fail(NotMutableError())
287 assert isinstance(metadata, dict)
288 s = MetadataSetter(self, name, metadata)
289 d = self._node.modify(s.modify)
290 d.addCallback(lambda res: self)
293 def get_child_at_path(self, path):
294 """Transform a child path into an IDirectoryNode or IFileNode.
296 I perform a recursive series of 'get' operations to find the named
297 descendant node. I return a Deferred that fires with the node, or
298 errbacks with IndexError if the node could not be found.
300 The path can be either a single string (slash-separated) or a list of
305 return defer.succeed(self)
306 if isinstance(path, (list, tuple)):
309 path = path.split("/")
311 assert isinstance(p, unicode)
313 remaining_path = path[1:]
314 d = self.get(childname)
317 return node.get_child_at_path(remaining_path)
321 def set_uri(self, name, child_uri, metadata=None, overwrite=True):
322 """I add a child (by URI) at the specific name. I return a Deferred
323 that fires with the child node when the operation finishes. I will
324 replace any existing child of the same name.
326 The child_uri could be for a file, or for a directory (either
327 read-write or read-only, using a URI that came from get_uri() ).
329 If this directory node is read-only, the Deferred will errback with a
331 assert isinstance(name, unicode)
332 child_node = self._create_node(child_uri)
333 d = self.set_node(name, child_node, metadata, overwrite)
334 d.addCallback(lambda res: child_node)
337 def set_children(self, entries, overwrite=True):
339 a = Adder(self, overwrite=overwrite)
347 name, child_uri, metadata = e
348 assert isinstance(name, unicode)
349 a.set_node(name, self._create_node(child_uri), metadata)
350 return self._node.modify(a.modify)
352 def set_node(self, name, child, metadata=None, overwrite=True):
353 """I add a child at the specific name. I return a Deferred that fires
354 when the operation finishes. This Deferred will fire with the child
355 node that was just added. I will replace any existing child of the
358 If this directory node is read-only, the Deferred will errback with a
361 if self.is_readonly():
362 return defer.fail(NotMutableError())
363 assert isinstance(name, unicode)
364 assert IFilesystemNode.providedBy(child), child
365 a = Adder(self, overwrite=overwrite)
366 a.set_node(name, child, metadata)
367 d = self._node.modify(a.modify)
368 d.addCallback(lambda res: child)
371 def set_nodes(self, entries, overwrite=True):
372 if self.is_readonly():
373 return defer.fail(NotMutableError())
374 a = Adder(self, entries, overwrite=overwrite)
375 d = self._node.modify(a.modify)
376 d.addCallback(lambda res: None)
380 def add_file(self, name, uploadable, metadata=None, overwrite=True):
381 """I upload a file (using the given IUploadable), then attach the
382 resulting FileNode to the directory at the given name. I return a
383 Deferred that fires (with the IFileNode of the uploaded file) when
384 the operation completes."""
385 assert isinstance(name, unicode)
386 if self.is_readonly():
387 return defer.fail(NotMutableError())
388 d = self._client.upload(uploadable)
389 d.addCallback(lambda results: results.uri)
390 d.addCallback(self._client.create_node_from_uri)
391 d.addCallback(lambda node:
392 self.set_node(name, node, metadata, overwrite))
395 def delete(self, name):
396 """I remove the child at the specific name. I return a Deferred that
397 fires (with the node just removed) when the operation finishes."""
398 assert isinstance(name, unicode)
399 if self.is_readonly():
400 return defer.fail(NotMutableError())
401 deleter = Deleter(self, name)
402 d = self._node.modify(deleter.modify)
403 d.addCallback(lambda res: deleter.old_child)
406 def create_empty_directory(self, name, overwrite=True):
407 """I create and attach an empty directory at the given name. I return
408 a Deferred that fires (with the new directory node) when the
409 operation finishes."""
410 assert isinstance(name, unicode)
411 if self.is_readonly():
412 return defer.fail(NotMutableError())
413 d = self._client.create_empty_dirnode()
415 entries = [(name, child, None)]
416 a = Adder(self, entries, overwrite=overwrite)
417 d = self._node.modify(a.modify)
418 d.addCallback(lambda res: child)
420 d.addCallback(_created)
423 def move_child_to(self, current_child_name, new_parent,
424 new_child_name=None, overwrite=True):
425 """I take one of my children and move them to a new parent. The child
426 is referenced by name. On the new parent, the child will live under
427 'new_child_name', which defaults to 'current_child_name'. I return a
428 Deferred that fires when the operation finishes."""
429 assert isinstance(current_child_name, unicode)
430 if self.is_readonly() or new_parent.is_readonly():
431 return defer.fail(NotMutableError())
432 if new_child_name is None:
433 new_child_name = current_child_name
434 assert isinstance(new_child_name, unicode)
435 d = self.get(current_child_name)
437 return new_parent.set_node(new_child_name, child,
440 d.addCallback(lambda child: self.delete(current_child_name))
443 def build_manifest(self):
444 """Return a frozenset of verifier-capability strings for all nodes
445 (directories and files) reachable from this one."""
447 # this is just a tree-walker, except that following each edge
448 # requires a Deferred. We use a ConcurrencyLimiter to make sure the
449 # fan-out doesn't cause problems.
452 manifest.add(self.get_verifier())
453 limiter = ConcurrencyLimiter(10) # allow 10 in parallel
455 d = self._build_manifest_from_node(self, manifest, limiter)
457 # LIT nodes have no verifier-capability: their data is stored
458 # inside the URI itself, so there is no need to refresh anything.
459 # They indicate this by returning None from their get_verifier
460 # method. We need to remove any such Nones from our set. We also
461 # want to convert all these caps into strings.
462 return frozenset([IVerifierURI(cap).to_string()
468 def _build_manifest_from_node(self, node, manifest, limiter):
469 d = limiter.add(node.list)
472 for name, (child, metadata) in res.iteritems():
473 verifier = child.get_verifier()
474 if verifier not in manifest:
475 manifest.add(verifier)
476 if IDirectoryNode.providedBy(child):
477 dl.append(self._build_manifest_from_node(child,
481 return defer.DeferredList(dl)
482 d.addCallback(_got_list)
485 def deep_stats(self):
487 # we track verifier caps, to avoid double-counting children for which
488 # we've got both a write-cap and a read-cap
490 found.add(self.get_verifier())
492 limiter = ConcurrencyLimiter(10)
494 d = self._add_deepstats_from_node(self, found, stats, limiter)
495 d.addCallback(lambda res: stats.get_results())
498 def _add_deepstats_from_node(self, node, found, stats, limiter):
499 d = limiter.add(node.list)
500 def _got_list(children):
502 dirsize_bytes = node.get_size()
503 dirsize_children = len(children)
504 stats.add("count-directories")
505 stats.add("size-directories", dirsize_bytes)
506 stats.max("largest-directory", dirsize_bytes)
507 stats.max("largest-directory-children", dirsize_children)
508 for name, (child, metadata) in children.iteritems():
509 verifier = child.get_verifier()
510 if verifier in found:
513 if IDirectoryNode.providedBy(child):
514 dl.append(self._add_deepstats_from_node(child, found,
516 elif IMutableFileNode.providedBy(child):
517 stats.add("count-files")
518 stats.add("count-mutable-files")
519 # TODO: update the servermap, compute a size, add it to
520 # size-mutable-files, max it into "largest-mutable-file"
521 elif IFileNode.providedBy(child): # CHK and LIT
522 stats.add("count-files")
523 size = child.get_size()
524 stats.histogram("size-files-histogram", size)
525 if child.get_uri().startswith("URI:LIT:"):
526 stats.add("count-literal-files")
527 stats.add("size-literal-files", size)
529 stats.add("count-immutable-files")
530 stats.add("size-immutable-files", size)
531 stats.max("largest-immutable-file", size)
533 return defer.DeferredList(dl)
534 d.addCallback(_got_list)
537 def deep_check(self, verify=False, repair=False):
538 results = DeepCheckResults()
540 found.add(self.get_verifier())
542 limiter = ConcurrencyLimiter(10)
544 d = self._add_check_from_node(self, results, limiter, verify, repair)
545 d.addCallback(lambda res:
546 self._add_deepcheck_from_dirnode(self,
547 found, results, limiter,
549 d.addCallback(lambda res: results)
552 def _add_check_from_node(self, node, results, limiter, verify, repair):
553 d = limiter.add(node.check, verify, repair)
554 d.addCallback(results.add_check)
557 def _add_deepcheck_from_dirnode(self, node, found, results, limiter,
559 d = limiter.add(node.list)
560 def _got_list(children):
562 for name, (child, metadata) in children.iteritems():
563 verifier = child.get_verifier()
564 if verifier in found:
567 dl.append(self._add_check_from_node(child,
570 if IDirectoryNode.providedBy(child):
571 dl.append(self._add_deepcheck_from_node(child, found,
575 return defer.DeferredList(dl)
576 d.addCallback(_got_list)
class DeepStats:
    """Accumulator for deep_stats(): counters, maxima, and logarithmic
    size histograms over a directory tree."""
    def __init__(self):
        self.stats = {}
        for k in ["count-immutable-files",
                  "count-mutable-files",
                  "count-literal-files",
                  "count-files",
                  "count-directories",
                  "size-immutable-files",
                  #"size-mutable-files",
                  "size-literal-files",
                  "size-directories",
                  "largest-directory",
                  "largest-directory-children",
                  "largest-immutable-file",
                  #"largest-mutable-file",
                  ]:
            self.stats[k] = 0
        self.histograms = {}
        for k in ["size-files-histogram"]:
            self.histograms[k] = {} # maps (min,max) to count
        self.buckets = [ (0,0), (1,3)]
        self.root = math.sqrt(10)

    def add(self, key, value=1):
        self.stats[key] += value

    def max(self, key, value):
        self.stats[key] = max(self.stats[key], value)

    def which_bucket(self, size):
        # return (min,max) such that min <= size <= max
        # values are from the set (0,0), (1,3), (4,10), (11,31), (32,100),
        # (101,316), (317, 1000), etc: two per decade
        assert self.buckets[0] == (0, 0)
        i = 0
        while True:
            if i >= len(self.buckets):
                # extend the bucket list on demand
                new_lower = self.buckets[i-1][1]+1
                new_upper = int(mathutil.next_power_of_k(new_lower, self.root))
                self.buckets.append( (new_lower, new_upper) )
            maybe = self.buckets[i]
            if maybe[0] <= size <= maybe[1]:
                return maybe
            i += 1

    def histogram(self, key, size):
        bucket = self.which_bucket(size)
        h = self.histograms[key]
        if bucket not in h:
            h[bucket] = 0
        h[bucket] += 1

    def get_results(self):
        # copy the counters, then flatten each histogram into a sorted
        # list of (bucket_min, bucket_max, count) triples
        stats = self.stats.copy()
        for key in self.histograms:
            h = self.histograms[key]
            out = [ (bucket[0], bucket[1], h[bucket]) for bucket in h ]
            out.sort()
            stats[key] = out
        return stats
# use client.create_dirnode() to make one of these