4 from zope.interface import implements
5 from twisted.internet import defer
7 from allmydata.mutable.common import NotMutableError
8 from allmydata.mutable.node import MutableFileNode
9 from allmydata.interfaces import IMutableFileNode, IDirectoryNode,\
10 IURI, IFileNode, IMutableFileURI, IVerifierURI, IFilesystemNode
11 from allmydata.util import hashutil
12 from allmydata.util.hashutil import netstring
13 from allmydata.util.limiter import ConcurrencyLimiter
14 from allmydata.uri import NewDirectoryURI
15 from pycryptopp.cipher.aes import AES
def split_netstring(data, numstrings, allow_leftover=False):
    """Split the first `numstrings` netstrings off the front of `data`.

    A netstring is "<decimal length>:<payload>,". Parsing stops as soon as
    `numstrings` payloads have been collected.

    If allow_leftover=False, returns a tuple of exactly `numstrings`
    payloads, and raises ValueError if any data follows them. If
    allow_leftover=True, returns numstrings+1 elements, in which the last
    element is the unparsed remainder (possibly an empty string).

    Raises ValueError if numstrings is negative, if the data runs out before
    `numstrings` netstrings were found, or if a netstring is malformed (no
    colon, non-numeric or negative length, truncated payload, missing
    trailing comma).
    """
    if numstrings < 0:
        raise ValueError("numstrings must be non-negative")
    elements = []
    # walk an offset through the buffer instead of re-slicing `data` after
    # every element, so the unparsed tail is only copied once at the end
    position = 0
    end = len(data)
    while position < end and len(elements) < numstrings:
        colon = data.index(":", position) # ValueError if there is no colon
        length = int(data[position:colon]) # ValueError if not a number
        if length < 0:
            raise ValueError("negative netstring length")
        start = colon + 1
        stop = start + length
        payload = data[start:stop]
        if len(payload) != length:
            raise ValueError("truncated netstring")
        # slice rather than index, so running off the end is reported as a
        # malformed netstring instead of an IndexError
        if data[stop:stop+1] != ",":
            raise ValueError("netstring lacks its trailing comma")
        elements.append(payload)
        position = stop + 1
    if len(elements) < numstrings:
        raise ValueError("ran out of netstrings")
    leftover = data[position:]
    if allow_leftover:
        return tuple(elements + [leftover])
    if leftover:
        raise ValueError("leftover data in netstrings")
    return tuple(elements)
class Deleter:
    """Modifier that removes one named child from a directory.

    The modify() method is meant to be handed to the backing mutable file's
    modify() call (NewDirectoryNode.delete does this). After modify() has
    run, `old_child` holds the node that was removed, or None if the name
    was absent and must_exist was False.
    """

    def __init__(self, node, name, must_exist=True):
        # node: the directory node whose _unpack_contents/_pack_contents are
        #       used to decode and re-encode the directory
        # name: the child name to remove
        # must_exist: if true, a missing child is an error (KeyError)
        self.node = node
        self.name = name
        # honour the caller's choice: this used to be hard-coded to True,
        # which silently ignored must_exist=False
        self.must_exist = must_exist

    def modify(self, old_contents):
        """Return the serialized directory with self.name removed.

        Raises KeyError(name) if the child is missing and must_exist is
        true.
        """
        children = self.node._unpack_contents(old_contents)
        if self.name not in children:
            if self.must_exist:
                raise KeyError(self.name)
            # NOTE(review): this branch was not visible in the source this
            # was restored from; returning None is assumed to tell the
            # mutable file's modify() to leave the contents alone -- confirm
            self.old_child = None
            return None
        self.old_child, _metadata = children.pop(self.name)
        return self.node._pack_contents(children)
class MetadataSetter:
    """Modifier that replaces the metadata of one existing child.

    The modify() method is meant to be handed to the backing mutable file's
    modify() call (NewDirectoryNode.set_metadata_for does this).
    """

    def __init__(self, node, name, metadata):
        # node supplies _unpack_contents/_pack_contents; name is the child
        # whose metadata is replaced; metadata is the new value
        self.node, self.name, self.metadata = node, name, metadata

    def modify(self, old_contents):
        """Return the serialized directory with the child's metadata
        swapped for self.metadata. The child node itself is kept. A missing
        child surfaces as the KeyError raised by the dict lookup."""
        entries = self.node._unpack_contents(old_contents)
        existing_child = entries[self.name][0]
        entries[self.name] = (existing_child, self.metadata)
        return self.node._pack_contents(entries)
73 def __init__(self, node, entries=None):
77 self.entries = entries
79 def set_node(self, name, node, metadata):
80 self.entries.append( [name, node, metadata] )
82 def modify(self, old_contents):
83 children = self.node._unpack_contents(old_contents)
85 for e in self.entries:
91 name, child, new_metadata = e
92 assert isinstance(name, unicode)
94 metadata = children[name][1].copy()
96 metadata = {"ctime": now,
98 if new_metadata is None:
100 if "ctime" not in metadata:
101 metadata["ctime"] = now
102 metadata["mtime"] = now
105 metadata = new_metadata.copy()
106 children[name] = (child, metadata)
107 new_contents = self.node._pack_contents(children)
110 class NewDirectoryNode:
111 implements(IDirectoryNode)
112 filenode_class = MutableFileNode
114 def __init__(self, client):
115 self._client = client
116 self._most_recent_size = None
119 return "<%s %s %s>" % (self.__class__.__name__, self.is_readonly() and "RO" or "RW", hasattr(self, '_uri') and self._uri.abbrev())
120 def init_from_uri(self, myuri):
121 self._uri = IURI(myuri)
122 self._node = self.filenode_class(self._client)
123 self._node.init_from_uri(self._uri.get_filenode_uri())
126 def create(self, keypair_generator=None):
128 Returns a deferred that eventually fires with self once the directory
129 has been created (distributed across a set of storage servers).
131 # first we create a MutableFileNode with empty_contents, then use its
132 # URI to create our own.
133 self._node = self.filenode_class(self._client)
134 empty_contents = self._pack_contents({})
135 d = self._node.create(empty_contents, keypair_generator)
136 d.addCallback(self._filenode_created)
138 def _filenode_created(self, res):
139 self._uri = NewDirectoryURI(IMutableFileURI(self._node.get_uri()))
143 # return the size of our backing mutable file, in bytes, if we've
145 return self._most_recent_size
147 def _set_size(self, data):
148 self._most_recent_size = len(data)
152 d = self._node.download_best_version()
153 d.addCallback(self._set_size)
154 d.addCallback(self._unpack_contents)
157 def _encrypt_rwcap(self, rwcap):
158 assert isinstance(rwcap, str)
160 key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
162 crypttext = cryptor.process(rwcap)
163 mac = hashutil.hmac(key, IV + crypttext)
164 assert len(mac) == 32
165 return IV + crypttext + mac
167 def _decrypt_rwcapdata(self, encwrcap):
169 crypttext = encwrcap[16:-32]
171 key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
172 if mac != hashutil.hmac(key, IV+crypttext):
173 raise hashutil.IntegrityCheckError("HMAC does not match, crypttext is corrupted")
175 plaintext = cryptor.process(crypttext)
178 def _create_node(self, child_uri):
179 return self._client.create_node_from_uri(child_uri)
181 def _unpack_contents(self, data):
182 # the directory is serialized as a list of netstrings, one per child.
183 # Each child is serialized as a list of four netstrings: (name,
184 # rocap, rwcap, metadata), in which the name,rocap,metadata are in
185 # cleartext. The 'name' is UTF-8 encoded. The rwcap is formatted as:
186 # pack("16ss32s", iv, AES(H(writekey+iv), plaintextrwcap), mac)
187 assert isinstance(data, str)
188 # an empty directory is serialized as an empty string
191 writeable = not self.is_readonly()
194 entry, data = split_netstring(data, 1, True)
195 name, rocap, rwcapdata, metadata_s = split_netstring(entry, 4)
196 name = name.decode("utf-8")
198 rwcap = self._decrypt_rwcapdata(rwcapdata)
199 child = self._create_node(rwcap)
201 child = self._create_node(rocap)
202 metadata = simplejson.loads(metadata_s)
203 assert isinstance(metadata, dict)
204 children[name] = (child, metadata)
def _pack_contents(self, children):
    """Serialize a children dict into the directory's string form.

    `children` maps unicode child names to (node, metadata-dict) tuples, the
    same shape _unpack_contents produces. Children are emitted in sorted
    name order. Each child becomes one netstring wrapping four inner
    netstrings: UTF-8 name, read-only cap, encrypted cap from get_uri(), and
    JSON metadata. An empty dict yields an empty string.
    """
    assert isinstance(children, dict)
    packed = []
    for childname in sorted(children):
        node, meta = children[childname]
        assert isinstance(childname, unicode)
        assert (IFileNode.providedBy(node)
                or IMutableFileNode.providedBy(node)
                or IDirectoryNode.providedBy(node)), (childname,node)
        assert isinstance(meta, dict)
        # get_uri() might hand back a read-only cap if the child is not
        # writeable; it is encrypted and stored in the rwcap slot regardless
        writecap = node.get_uri()
        readcap = node.get_readonly_uri()
        fields = [
            netstring(childname.encode("utf-8")),
            netstring(readcap),
            netstring(self._encrypt_rwcap(writecap)),
            netstring(simplejson.dumps(meta)),
            ]
        packed.append(netstring("".join(fields)))
    return "".join(packed)
227 def is_readonly(self):
228 return self._node.is_readonly()
229 def is_mutable(self):
230 return self._node.is_mutable()
233 return self._uri.to_string()
235 def get_readonly_uri(self):
236 return self._uri.get_readonly().to_string()
238 def get_verifier(self):
239 return self._uri.get_verifier().to_string()
242 """Perform a file check. See IChecker.check for details."""
243 return defer.succeed(None) # TODO
246 """I return a Deferred that fires with a dictionary mapping child
247 name to a tuple of (IFileNode or IDirectoryNode, metadata)."""
250 def has_child(self, name):
251 """I return a Deferred that fires with a boolean, True if there
252 exists a child of the given name, False if not."""
253 assert isinstance(name, unicode)
255 d.addCallback(lambda children: children.has_key(name))
258 def _get(self, children, name):
259 child = children.get(name)
265 """I return a Deferred that fires with the named child node,
266 which is either an IFileNode or an IDirectoryNode."""
267 assert isinstance(name, unicode)
269 d.addCallback(self._get, name)
272 def get_metadata_for(self, name):
273 assert isinstance(name, unicode)
275 d.addCallback(lambda children: children[name][1])
def set_metadata_for(self, name, metadata):
    """Replace the metadata dict stored for the child called `name`.

    Returns a Deferred that fires with this directory node once the backing
    file has been modified. If this directory is read-only, the returned
    Deferred has already failed with NotMutableError.
    """
    assert isinstance(name, unicode)
    if self.is_readonly():
        return defer.fail(NotMutableError())
    assert isinstance(metadata, dict)
    setter = MetadataSetter(self, name, metadata)
    def _return_self(ignored):
        return self
    modified = self._node.modify(setter.modify)
    modified.addCallback(_return_self)
    return modified
288 def get_child_at_path(self, path):
289 """Transform a child path into an IDirectoryNode or IFileNode.
291 I perform a recursive series of 'get' operations to find the named
292 descendant node. I return a Deferred that fires with the node, or
293 errbacks with IndexError if the node could not be found.
295 The path can be either a single string (slash-separated) or a list of
300 return defer.succeed(self)
301 if isinstance(path, (list, tuple)):
304 path = path.split("/")
306 assert isinstance(p, unicode)
308 remaining_path = path[1:]
309 d = self.get(childname)
312 return node.get_child_at_path(remaining_path)
def set_uri(self, name, child_uri, metadata=None):
    """Add a child, given by URI, under `name`, replacing any existing
    child of the same name.

    The child_uri may refer to a file or a directory (read-write or
    read-only, as produced by get_uri()). A node is built from it with
    _create_node and attached via set_node.

    Returns a Deferred that fires with the child node when the operation
    finishes. If this directory node is read-only, the Deferred errbacks
    with NotMutableError (raised by set_node).
    """
    assert isinstance(name, unicode)
    new_child = self._create_node(child_uri)
    def _return_child(ignored):
        return new_child
    attached = self.set_node(name, new_child, metadata)
    attached.addCallback(_return_child)
    return attached
332 def set_children(self, entries):
342 name, child_uri, metadata = e
343 assert isinstance(name, unicode)
344 a.set_node(name, self._create_node(child_uri), metadata)
345 return self._node.modify(a.modify)
def set_node(self, name, child, metadata=None):
    """Attach the node `child` under `name`, replacing any existing child
    of the same name.

    Returns a Deferred that fires with the child node that was just added
    when the operation finishes. If this directory node is read-only, the
    Deferred errbacks with NotMutableError.
    """
    if self.is_readonly():
        return defer.fail(NotMutableError())
    assert isinstance(name, unicode)
    assert IFilesystemNode.providedBy(child), child
    adder = Adder(self)
    adder.set_node(name, child, metadata)
    def _return_child(ignored):
        return child
    modified = self._node.modify(adder.modify)
    modified.addCallback(_return_child)
    return modified
def set_nodes(self, entries):
    """Attach several children in a single modification of the backing
    file.

    `entries` is passed straight to Adder. Returns a Deferred that fires
    with None when the operation finishes, or one that has already failed
    with NotMutableError if this directory is read-only.
    """
    if self.is_readonly():
        return defer.fail(NotMutableError())
    adder = Adder(self, entries)
    def _discard_result(ignored):
        return None
    modified = self._node.modify(adder.modify)
    modified.addCallback(_discard_result)
    return modified
def add_file(self, name, uploadable, metadata=None):
    """Upload a file (using the given IUploadable), then attach the
    resulting node to this directory under `name`.

    Returns a Deferred that fires (with the node of the uploaded file, as
    delivered by set_node) when the operation completes. If this directory
    is read-only, nothing is uploaded and the Deferred has already failed
    with NotMutableError.
    """
    assert isinstance(name, unicode)
    if self.is_readonly():
        return defer.fail(NotMutableError())
    def _extract_uri(upload_results):
        return upload_results.uri
    def _attach(filenode):
        return self.set_node(name, filenode, metadata)
    uploaded = self._client.upload(uploadable)
    uploaded.addCallback(_extract_uri)
    uploaded.addCallback(self._client.create_node_from_uri)
    uploaded.addCallback(_attach)
    return uploaded
def delete(self, name):
    """Remove the child called `name`.

    Returns a Deferred that fires (with the node just removed) when the
    operation finishes. If this directory is read-only, the Deferred has
    already failed with NotMutableError.
    """
    assert isinstance(name, unicode)
    if self.is_readonly():
        return defer.fail(NotMutableError())
    remover = Deleter(self, name)
    def _return_removed(ignored):
        # the Deleter remembers the node it took out while modifying
        return remover.old_child
    modified = self._node.modify(remover.modify)
    modified.addCallback(_return_removed)
    return modified
def create_empty_directory(self, name):
    """Create an empty directory and attach it under `name`.

    The new directory comes from the client's create_empty_dirnode(); once
    it exists it is linked in with a single Adder modification. Returns a
    Deferred that fires (with the new directory node) when the operation
    finishes. If this directory is read-only, nothing is created and the
    Deferred has already failed with NotMutableError.
    """
    assert isinstance(name, unicode)
    if self.is_readonly():
        return defer.fail(NotMutableError())
    def _link_in(new_dirnode):
        # None as the metadata slot leaves the choice of metadata to Adder
        adder = Adder(self, [(name, new_dirnode, None)])
        linked = self._node.modify(adder.modify)
        linked.addCallback(lambda ignored: new_dirnode)
        return linked
    created = self._client.create_empty_dirnode()
    created.addCallback(_link_in)
    return created
def move_child_to(self, current_child_name, new_parent,
                  new_child_name=None):
    """Move one of my children to a new parent.

    The child is referenced by name. On the new parent, the child will live
    under 'new_child_name', which defaults to 'current_child_name'. The
    child is first attached to new_parent with set_node, then deleted from
    this directory.

    Returns a Deferred that fires when the operation finishes (with the
    node removed by delete, or with the untouched child when the move is a
    no-op). If either directory is read-only, the Deferred has already
    failed with NotMutableError.
    """
    assert isinstance(current_child_name, unicode)
    if self.is_readonly() or new_parent.is_readonly():
        return defer.fail(NotMutableError())
    if new_child_name is None:
        new_child_name = current_child_name
    assert isinstance(new_child_name, unicode)
    # Moving a child onto its own name in the same directory must not run
    # the attach-then-delete sequence: the attach would change nothing and
    # the delete would then remove the child outright. Just look the child
    # up, so a missing name still errbacks as before.
    # NOTE(review): assumes two nodes for the same directory return equal
    # get_uri() strings -- both are known to be writeable here; confirm.
    if (new_child_name == current_child_name
        and new_parent.get_uri() == self.get_uri()):
        return self.get(current_child_name)
    d = self.get(current_child_name)
    def _attach(child):
        return new_parent.set_node(new_child_name, child)
    d.addCallback(_attach)
    # only unlink from here once the new link has been written
    d.addCallback(lambda child: self.delete(current_child_name))
    return d
436 def build_manifest(self):
437 """Return a frozenset of verifier-capability strings for all nodes
438 (directories and files) reachable from this one."""
440 # this is just a tree-walker, except that following each edge
441 # requires a Deferred. We use a ConcurrencyLimiter to make sure the
442 # fan-out doesn't cause problems.
445 manifest.add(self.get_verifier())
446 limiter = ConcurrencyLimiter(10) # allow 10 in parallel
448 d = self._build_manifest_from_node(self, manifest, limiter)
450 # LIT nodes have no verifier-capability: their data is stored
451 # inside the URI itself, so there is no need to refresh anything.
452 # They indicate this by returning None from their get_verifier
453 # method. We need to remove any such Nones from our set. We also
454 # want to convert all these caps into strings.
455 return frozenset([IVerifierURI(cap).to_string()
461 def _build_manifest_from_node(self, node, manifest, limiter):
462 d = limiter.add(node.list)
465 for name, (child, metadata) in res.iteritems():
466 verifier = child.get_verifier()
467 if verifier not in manifest:
468 manifest.add(verifier)
469 if IDirectoryNode.providedBy(child):
470 dl.append(self._build_manifest_from_node(child,
474 return defer.DeferredList(dl)
475 d.addCallback(_got_list)
478 def deep_stats(self):
480 # we track verifier caps, to avoid double-counting children for which
481 # we've got both a write-cap and a read-cap
483 found.add(self.get_verifier())
485 limiter = ConcurrencyLimiter(10)
487 d = self._add_deepstats_from_node(self, found, stats, limiter)
488 d.addCallback(lambda res: stats.get_results())
491 def _add_deepstats_from_node(self, node, found, stats, limiter):
492 d = limiter.add(node.list)
493 def _got_list(children):
495 dirsize_bytes = node.get_size()
496 dirsize_children = len(children)
497 stats.add("count-directories")
498 stats.add("size-directories", dirsize_bytes)
499 stats.max("largest-directory", dirsize_bytes)
500 stats.max("largest-directory-children", dirsize_children)
501 for name, (child, metadata) in children.iteritems():
502 verifier = child.get_verifier()
503 if verifier in found:
506 if IDirectoryNode.providedBy(child):
507 dl.append(self._add_deepstats_from_node(child, found,
509 elif IMutableFileNode.providedBy(child):
510 stats.add("count-files")
511 stats.add("count-mutable-files")
512 # TODO: update the servermap, compute a size, add it to
513 # size-mutable-files, max it into "largest-mutable-file"
514 elif IFileNode.providedBy(child): # CHK and LIT
515 stats.add("count-files")
516 size = child.get_size()
517 if child.get_uri().startswith("URI:LIT:"):
518 stats.add("count-literal-files")
519 stats.add("size-literal-files", size)
521 stats.add("count-immutable-files")
522 stats.add("size-immutable-files", size)
523 stats.max("largest-immutable-file", size)
525 return defer.DeferredList(dl)
526 d.addCallback(_got_list)
532 for k in ["count-immutable-files",
533 "count-mutable-files",
534 "count-literal-files",
537 "size-immutable-files",
538 #"size-mutable-files",
539 "size-literal-files",
542 "largest-directory-children",
543 "largest-immutable-file",
544 #"largest-mutable-file",
548 def add(self, key, value=1):
549 self.stats[key] += value
551 def max(self, key, value):
552 self.stats[key] = max(self.stats[key], value)
554 def get_results(self):
558 # use client.create_dirnode() to make one of these