]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/dirnode.py
dirnode: refactor deep-stats a bit
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / dirnode.py
1
2 import os, time
3
4 from zope.interface import implements
5 from twisted.internet import defer
6 import simplejson
7 from allmydata.mutable.common import NotMutableError
8 from allmydata.mutable.node import MutableFileNode
9 from allmydata.interfaces import IMutableFileNode, IDirectoryNode,\
10      IURI, IFileNode, IMutableFileURI, IVerifierURI, IFilesystemNode
11 from allmydata.util import hashutil
12 from allmydata.util.hashutil import netstring
13 from allmydata.util.limiter import ConcurrencyLimiter
14 from allmydata.uri import NewDirectoryURI
15 from pycryptopp.cipher.aes import AES
16
17 def split_netstring(data, numstrings, allow_leftover=False):
18     """like string.split(), but extracts netstrings. If allow_leftover=False,
19     returns numstrings elements, and throws ValueError if there was leftover
20     data. If allow_leftover=True, returns numstrings+1 elements, in which the
21     last element is the leftover data (possibly an empty string)"""
22     elements = []
23     assert numstrings >= 0
24     while data:
25         colon = data.index(":")
26         length = int(data[:colon])
27         string = data[colon+1:colon+1+length]
28         assert len(string) == length
29         elements.append(string)
30         assert data[colon+1+length] == ","
31         data = data[colon+1+length+1:]
32         if len(elements) == numstrings:
33             break
34     if len(elements) < numstrings:
35         raise ValueError("ran out of netstrings")
36     if allow_leftover:
37         return tuple(elements + [data])
38     if data:
39         raise ValueError("leftover data in netstrings")
40     return tuple(elements)
41
42 class Deleter:
43     def __init__(self, node, name, must_exist=True):
44         self.node = node
45         self.name = name
46         self.must_exist = True
47     def modify(self, old_contents):
48         children = self.node._unpack_contents(old_contents)
49         if self.name not in children:
50             if self.must_exist:
51                 raise KeyError(self.name)
52             self.old_child = None
53             return None
54         self.old_child, metadata = children[self.name]
55         del children[self.name]
56         new_contents = self.node._pack_contents(children)
57         return new_contents
58
59 class MetadataSetter:
60     def __init__(self, node, name, metadata):
61         self.node = node
62         self.name = name
63         self.metadata = metadata
64
65     def modify(self, old_contents):
66         children = self.node._unpack_contents(old_contents)
67         children[self.name] = (children[self.name][0], self.metadata)
68         new_contents = self.node._pack_contents(children)
69         return new_contents
70
71
72 class Adder:
73     def __init__(self, node, entries=None):
74         self.node = node
75         if entries is None:
76             entries = []
77         self.entries = entries
78
79     def set_node(self, name, node, metadata):
80         self.entries.append( [name, node, metadata] )
81
82     def modify(self, old_contents):
83         children = self.node._unpack_contents(old_contents)
84         now = time.time()
85         for e in self.entries:
86             if len(e) == 2:
87                 name, child = e
88                 new_metadata = None
89             else:
90                 assert len(e) == 3
91                 name, child, new_metadata = e
92             assert isinstance(name, unicode)
93             if name in children:
94                 metadata = children[name][1].copy()
95             else:
96                 metadata = {"ctime": now,
97                             "mtime": now}
98             if new_metadata is None:
99                 # update timestamps
100                 if "ctime" not in metadata:
101                     metadata["ctime"] = now
102                 metadata["mtime"] = now
103             else:
104                 # just replace it
105                 metadata = new_metadata.copy()
106             children[name] = (child, metadata)
107         new_contents = self.node._pack_contents(children)
108         return new_contents
109
110 class NewDirectoryNode:
111     implements(IDirectoryNode)
112     filenode_class = MutableFileNode
113
114     def __init__(self, client):
115         self._client = client
116         self._most_recent_size = None
117
118     def __repr__(self):
119         return "<%s %s %s>" % (self.__class__.__name__, self.is_readonly() and "RO" or "RW", hasattr(self, '_uri') and self._uri.abbrev())
120     def init_from_uri(self, myuri):
121         self._uri = IURI(myuri)
122         self._node = self.filenode_class(self._client)
123         self._node.init_from_uri(self._uri.get_filenode_uri())
124         return self
125
126     def create(self, keypair_generator=None):
127         """
128         Returns a deferred that eventually fires with self once the directory
129         has been created (distributed across a set of storage servers).
130         """
131         # first we create a MutableFileNode with empty_contents, then use its
132         # URI to create our own.
133         self._node = self.filenode_class(self._client)
134         empty_contents = self._pack_contents({})
135         d = self._node.create(empty_contents, keypair_generator)
136         d.addCallback(self._filenode_created)
137         return d
138     def _filenode_created(self, res):
139         self._uri = NewDirectoryURI(IMutableFileURI(self._node.get_uri()))
140         return self
141
142     def get_size(self):
143         # return the size of our backing mutable file, in bytes, if we've
144         # fetched it.
145         return self._most_recent_size
146
147     def _set_size(self, data):
148         self._most_recent_size = len(data)
149         return data
150
151     def _read(self):
152         d = self._node.download_best_version()
153         d.addCallback(self._set_size)
154         d.addCallback(self._unpack_contents)
155         return d
156
157     def _encrypt_rwcap(self, rwcap):
158         assert isinstance(rwcap, str)
159         IV = os.urandom(16)
160         key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
161         cryptor = AES(key)
162         crypttext = cryptor.process(rwcap)
163         mac = hashutil.hmac(key, IV + crypttext)
164         assert len(mac) == 32
165         return IV + crypttext + mac
166
167     def _decrypt_rwcapdata(self, encwrcap):
168         IV = encwrcap[:16]
169         crypttext = encwrcap[16:-32]
170         mac = encwrcap[-32:]
171         key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
172         if mac != hashutil.hmac(key, IV+crypttext):
173             raise hashutil.IntegrityCheckError("HMAC does not match, crypttext is corrupted")
174         cryptor = AES(key)
175         plaintext = cryptor.process(crypttext)
176         return plaintext
177
178     def _create_node(self, child_uri):
179         return self._client.create_node_from_uri(child_uri)
180
181     def _unpack_contents(self, data):
182         # the directory is serialized as a list of netstrings, one per child.
183         # Each child is serialized as a list of four netstrings: (name,
184         # rocap, rwcap, metadata), in which the name,rocap,metadata are in
185         # cleartext. The 'name' is UTF-8 encoded. The rwcap is formatted as:
186         # pack("16ss32s", iv, AES(H(writekey+iv), plaintextrwcap), mac)
187         assert isinstance(data, str)
188         # an empty directory is serialized as an empty string
189         if data == "":
190             return {}
191         writeable = not self.is_readonly()
192         children = {}
193         while len(data) > 0:
194             entry, data = split_netstring(data, 1, True)
195             name, rocap, rwcapdata, metadata_s = split_netstring(entry, 4)
196             name = name.decode("utf-8")
197             if writeable:
198                 rwcap = self._decrypt_rwcapdata(rwcapdata)
199                 child = self._create_node(rwcap)
200             else:
201                 child = self._create_node(rocap)
202             metadata = simplejson.loads(metadata_s)
203             assert isinstance(metadata, dict)
204             children[name] = (child, metadata)
205         return children
206
207     def _pack_contents(self, children):
208         # expects children in the same format as _unpack_contents
209         assert isinstance(children, dict)
210         entries = []
211         for name in sorted(children.keys()):
212             child, metadata = children[name]
213             assert isinstance(name, unicode)
214             assert (IFileNode.providedBy(child)
215                     or IMutableFileNode.providedBy(child)
216                     or IDirectoryNode.providedBy(child)), (name,child)
217             assert isinstance(metadata, dict)
218             rwcap = child.get_uri() # might be RO if the child is not writeable
219             rocap = child.get_readonly_uri()
220             entry = "".join([netstring(name.encode("utf-8")),
221                              netstring(rocap),
222                              netstring(self._encrypt_rwcap(rwcap)),
223                              netstring(simplejson.dumps(metadata))])
224             entries.append(netstring(entry))
225         return "".join(entries)
226
227     def is_readonly(self):
228         return self._node.is_readonly()
229     def is_mutable(self):
230         return self._node.is_mutable()
231
232     def get_uri(self):
233         return self._uri.to_string()
234
235     def get_readonly_uri(self):
236         return self._uri.get_readonly().to_string()
237
238     def get_verifier(self):
239         return self._uri.get_verifier().to_string()
240
241     def check(self):
242         """Perform a file check. See IChecker.check for details."""
243         return defer.succeed(None) # TODO
244
245     def list(self):
246         """I return a Deferred that fires with a dictionary mapping child
247         name to a tuple of (IFileNode or IDirectoryNode, metadata)."""
248         return self._read()
249
250     def has_child(self, name):
251         """I return a Deferred that fires with a boolean, True if there
252         exists a child of the given name, False if not."""
253         assert isinstance(name, unicode)
254         d = self._read()
255         d.addCallback(lambda children: children.has_key(name))
256         return d
257
258     def _get(self, children, name):
259         child = children.get(name)
260         if child is None:
261             raise KeyError(name)
262         return child[0]
263
264     def get(self, name):
265         """I return a Deferred that fires with the named child node,
266         which is either an IFileNode or an IDirectoryNode."""
267         assert isinstance(name, unicode)
268         d = self._read()
269         d.addCallback(self._get, name)
270         return d
271
272     def get_metadata_for(self, name):
273         assert isinstance(name, unicode)
274         d = self._read()
275         d.addCallback(lambda children: children[name][1])
276         return d
277
278     def set_metadata_for(self, name, metadata):
279         assert isinstance(name, unicode)
280         if self.is_readonly():
281             return defer.fail(NotMutableError())
282         assert isinstance(metadata, dict)
283         s = MetadataSetter(self, name, metadata)
284         d = self._node.modify(s.modify)
285         d.addCallback(lambda res: self)
286         return d
287
288     def get_child_at_path(self, path):
289         """Transform a child path into an IDirectoryNode or IFileNode.
290
291         I perform a recursive series of 'get' operations to find the named
292         descendant node. I return a Deferred that fires with the node, or
293         errbacks with IndexError if the node could not be found.
294
295         The path can be either a single string (slash-separated) or a list of
296         path-name elements.
297         """
298
299         if not path:
300             return defer.succeed(self)
301         if isinstance(path, (list, tuple)):
302             pass
303         else:
304             path = path.split("/")
305         for p in path:
306             assert isinstance(p, unicode)
307         childname = path[0]
308         remaining_path = path[1:]
309         d = self.get(childname)
310         if remaining_path:
311             def _got(node):
312                 return node.get_child_at_path(remaining_path)
313             d.addCallback(_got)
314         return d
315
316     def set_uri(self, name, child_uri, metadata=None):
317         """I add a child (by URI) at the specific name. I return a Deferred
318         that fires with the child node when the operation finishes. I will
319         replace any existing child of the same name.
320
321         The child_uri could be for a file, or for a directory (either
322         read-write or read-only, using a URI that came from get_uri() ).
323
324         If this directory node is read-only, the Deferred will errback with a
325         NotMutableError."""
326         assert isinstance(name, unicode)
327         child_node = self._create_node(child_uri)
328         d = self.set_node(name, child_node, metadata)
329         d.addCallback(lambda res: child_node)
330         return d
331
332     def set_children(self, entries):
333         # this takes URIs
334         a = Adder(self)
335         node_entries = []
336         for e in entries:
337             if len(e) == 2:
338                 name, child_uri = e
339                 metadata = None
340             else:
341                 assert len(e) == 3
342                 name, child_uri, metadata = e
343             assert isinstance(name, unicode)
344             a.set_node(name, self._create_node(child_uri), metadata)
345         return self._node.modify(a.modify)
346
347     def set_node(self, name, child, metadata=None):
348         """I add a child at the specific name. I return a Deferred that fires
349         when the operation finishes. This Deferred will fire with the child
350         node that was just added. I will replace any existing child of the
351         same name.
352
353         If this directory node is read-only, the Deferred will errback with a
354         NotMutableError."""
355
356         if self.is_readonly():
357             return defer.fail(NotMutableError())
358         assert isinstance(name, unicode)
359         assert IFilesystemNode.providedBy(child), child
360         a = Adder(self)
361         a.set_node(name, child, metadata)
362         d = self._node.modify(a.modify)
363         d.addCallback(lambda res: child)
364         return d
365
366     def set_nodes(self, entries):
367         if self.is_readonly():
368             return defer.fail(NotMutableError())
369         a = Adder(self, entries)
370         d = self._node.modify(a.modify)
371         d.addCallback(lambda res: None)
372         return d
373
374
375     def add_file(self, name, uploadable, metadata=None):
376         """I upload a file (using the given IUploadable), then attach the
377         resulting FileNode to the directory at the given name. I return a
378         Deferred that fires (with the IFileNode of the uploaded file) when
379         the operation completes."""
380         assert isinstance(name, unicode)
381         if self.is_readonly():
382             return defer.fail(NotMutableError())
383         d = self._client.upload(uploadable)
384         d.addCallback(lambda results: results.uri)
385         d.addCallback(self._client.create_node_from_uri)
386         d.addCallback(lambda node: self.set_node(name, node, metadata))
387         return d
388
389     def delete(self, name):
390         """I remove the child at the specific name. I return a Deferred that
391         fires (with the node just removed) when the operation finishes."""
392         assert isinstance(name, unicode)
393         if self.is_readonly():
394             return defer.fail(NotMutableError())
395         deleter = Deleter(self, name)
396         d = self._node.modify(deleter.modify)
397         d.addCallback(lambda res: deleter.old_child)
398         return d
399
400     def create_empty_directory(self, name):
401         """I create and attach an empty directory at the given name. I return
402         a Deferred that fires (with the new directory node) when the
403         operation finishes."""
404         assert isinstance(name, unicode)
405         if self.is_readonly():
406             return defer.fail(NotMutableError())
407         d = self._client.create_empty_dirnode()
408         def _created(child):
409             entries = [(name, child, None)]
410             a = Adder(self, entries)
411             d = self._node.modify(a.modify)
412             d.addCallback(lambda res: child)
413             return d
414         d.addCallback(_created)
415         return d
416
417     def move_child_to(self, current_child_name, new_parent,
418                       new_child_name=None):
419         """I take one of my children and move them to a new parent. The child
420         is referenced by name. On the new parent, the child will live under
421         'new_child_name', which defaults to 'current_child_name'. I return a
422         Deferred that fires when the operation finishes."""
423         assert isinstance(current_child_name, unicode)
424         if self.is_readonly() or new_parent.is_readonly():
425             return defer.fail(NotMutableError())
426         if new_child_name is None:
427             new_child_name = current_child_name
428         assert isinstance(new_child_name, unicode)
429         d = self.get(current_child_name)
430         def sn(child):
431             return new_parent.set_node(new_child_name, child)
432         d.addCallback(sn)
433         d.addCallback(lambda child: self.delete(current_child_name))
434         return d
435
436     def build_manifest(self):
437         """Return a frozenset of verifier-capability strings for all nodes
438         (directories and files) reachable from this one."""
439
440         # this is just a tree-walker, except that following each edge
441         # requires a Deferred. We use a ConcurrencyLimiter to make sure the
442         # fan-out doesn't cause problems.
443
444         manifest = set()
445         manifest.add(self.get_verifier())
446         limiter = ConcurrencyLimiter(10) # allow 10 in parallel
447
448         d = self._build_manifest_from_node(self, manifest, limiter)
449         def _done(res):
450             # LIT nodes have no verifier-capability: their data is stored
451             # inside the URI itself, so there is no need to refresh anything.
452             # They indicate this by returning None from their get_verifier
453             # method. We need to remove any such Nones from our set. We also
454             # want to convert all these caps into strings.
455             return frozenset([IVerifierURI(cap).to_string()
456                               for cap in manifest
457                               if cap is not None])
458         d.addCallback(_done)
459         return d
460
461     def _build_manifest_from_node(self, node, manifest, limiter):
462         d = limiter.add(node.list)
463         def _got_list(res):
464             dl = []
465             for name, (child, metadata) in res.iteritems():
466                 verifier = child.get_verifier()
467                 if verifier not in manifest:
468                     manifest.add(verifier)
469                     if IDirectoryNode.providedBy(child):
470                         dl.append(self._build_manifest_from_node(child,
471                                                                  manifest,
472                                                                  limiter))
473             if dl:
474                 return defer.DeferredList(dl)
475         d.addCallback(_got_list)
476         return d
477
478     def deep_stats(self):
479         stats = DeepStats()
480         # we track verifier caps, to avoid double-counting children for which
481         # we've got both a write-cap and a read-cap
482         found = set()
483         found.add(self.get_verifier())
484
485         limiter = ConcurrencyLimiter(10)
486
487         d = self._add_deepstats_from_node(self, found, stats, limiter)
488         d.addCallback(lambda res: stats.get_results())
489         return d
490
491     def _add_deepstats_from_node(self, node, found, stats, limiter):
492         d = limiter.add(node.list)
493         def _got_list(children):
494             dl = []
495             dirsize_bytes = node.get_size()
496             dirsize_children = len(children)
497             stats.add("count-directories")
498             stats.add("size-directories", dirsize_bytes)
499             stats.max("largest-directory", dirsize_bytes)
500             stats.max("largest-directory-children", dirsize_children)
501             for name, (child, metadata) in children.iteritems():
502                 verifier = child.get_verifier()
503                 if verifier in found:
504                     continue
505                 found.add(verifier)
506                 if IDirectoryNode.providedBy(child):
507                     dl.append(self._add_deepstats_from_node(child, found,
508                                                             stats, limiter))
509                 elif IMutableFileNode.providedBy(child):
510                     stats.add("count-files")
511                     stats.add("count-mutable-files")
512                     # TODO: update the servermap, compute a size, add it to
513                     # size-mutable-files, max it into "largest-mutable-file"
514                 elif IFileNode.providedBy(child): # CHK and LIT
515                     stats.add("count-files")
516                     size = child.get_size()
517                     if child.get_uri().startswith("URI:LIT:"):
518                         stats.add("count-literal-files")
519                         stats.add("size-literal-files", size)
520                     else:
521                         stats.add("count-immutable-files")
522                         stats.add("size-immutable-files", size)
523                         stats.max("largest-immutable-file", size)
524             if dl:
525                 return defer.DeferredList(dl)
526         d.addCallback(_got_list)
527         return d
528
529 class DeepStats:
530     def __init__(self):
531         self.stats = {}
532         for k in ["count-immutable-files",
533                   "count-mutable-files",
534                   "count-literal-files",
535                   "count-files",
536                   "count-directories",
537                   "size-immutable-files",
538                   #"size-mutable-files",
539                   "size-literal-files",
540                   "size-directories",
541                   "largest-directory",
542                   "largest-directory-children",
543                   "largest-immutable-file",
544                   #"largest-mutable-file",
545                   ]:
546             self.stats[k] = 0
547
548     def add(self, key, value=1):
549         self.stats[key] += value
550
551     def max(self, key, value):
552         self.stats[key] = max(self.stats[key], value)
553
554     def get_results(self):
555         return self.stats
556
557
558 # use client.create_dirnode() to make one of these
559
560