]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/dirnode.py
dirnode: cleanup, make get_verifier() always return a URI instance, not a string
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / dirnode.py
1
2 import os, time, math
3
4 from zope.interface import implements
5 from twisted.internet import defer
6 import simplejson
7 from allmydata.mutable.common import NotMutableError
8 from allmydata.mutable.node import MutableFileNode
9 from allmydata.interfaces import IMutableFileNode, IDirectoryNode,\
10      IURI, IFileNode, IMutableFileURI, IVerifierURI, IFilesystemNode, \
11      ExistingChildError, ICheckable
12 from allmydata.checker_results import DeepCheckResults, \
13      DeepCheckAndRepairResults
14 from allmydata.util import hashutil, mathutil, base32, log
15 from allmydata.util.hashutil import netstring
16 from allmydata.util.limiter import ConcurrencyLimiter
17 from allmydata.uri import NewDirectoryURI
18 from pycryptopp.cipher.aes import AES
19
20 def split_netstring(data, numstrings, allow_leftover=False):
21     """like string.split(), but extracts netstrings. If allow_leftover=False,
22     returns numstrings elements, and throws ValueError if there was leftover
23     data. If allow_leftover=True, returns numstrings+1 elements, in which the
24     last element is the leftover data (possibly an empty string)"""
25     elements = []
26     assert numstrings >= 0
27     while data:
28         colon = data.index(":")
29         length = int(data[:colon])
30         string = data[colon+1:colon+1+length]
31         assert len(string) == length
32         elements.append(string)
33         assert data[colon+1+length] == ","
34         data = data[colon+1+length+1:]
35         if len(elements) == numstrings:
36             break
37     if len(elements) < numstrings:
38         raise ValueError("ran out of netstrings")
39     if allow_leftover:
40         return tuple(elements + [data])
41     if data:
42         raise ValueError("leftover data in netstrings")
43     return tuple(elements)
44
45 class Deleter:
46     def __init__(self, node, name, must_exist=True):
47         self.node = node
48         self.name = name
49         self.must_exist = True
50     def modify(self, old_contents):
51         children = self.node._unpack_contents(old_contents)
52         if self.name not in children:
53             if self.must_exist:
54                 raise KeyError(self.name)
55             self.old_child = None
56             return None
57         self.old_child, metadata = children[self.name]
58         del children[self.name]
59         new_contents = self.node._pack_contents(children)
60         return new_contents
61
62 class MetadataSetter:
63     def __init__(self, node, name, metadata):
64         self.node = node
65         self.name = name
66         self.metadata = metadata
67
68     def modify(self, old_contents):
69         children = self.node._unpack_contents(old_contents)
70         children[self.name] = (children[self.name][0], self.metadata)
71         new_contents = self.node._pack_contents(children)
72         return new_contents
73
74
75 class Adder:
76     def __init__(self, node, entries=None, overwrite=True):
77         self.node = node
78         if entries is None:
79             entries = []
80         self.entries = entries
81         self.overwrite = overwrite
82
83     def set_node(self, name, node, metadata):
84         self.entries.append( [name, node, metadata] )
85
86     def modify(self, old_contents):
87         children = self.node._unpack_contents(old_contents)
88         now = time.time()
89         for e in self.entries:
90             if len(e) == 2:
91                 name, child = e
92                 new_metadata = None
93             else:
94                 assert len(e) == 3
95                 name, child, new_metadata = e
96             assert isinstance(name, unicode)
97             if name in children:
98                 if not self.overwrite:
99                     raise ExistingChildError("child '%s' already exists" % name)
100                 metadata = children[name][1].copy()
101             else:
102                 metadata = {"ctime": now,
103                             "mtime": now}
104             if new_metadata is None:
105                 # update timestamps
106                 if "ctime" not in metadata:
107                     metadata["ctime"] = now
108                 metadata["mtime"] = now
109             else:
110                 # just replace it
111                 metadata = new_metadata.copy()
112             children[name] = (child, metadata)
113         new_contents = self.node._pack_contents(children)
114         return new_contents
115
116 class NewDirectoryNode:
117     implements(IDirectoryNode, ICheckable)
118     filenode_class = MutableFileNode
119
120     def __init__(self, client):
121         self._client = client
122         self._most_recent_size = None
123
124     def __repr__(self):
125         return "<%s %s %s>" % (self.__class__.__name__, self.is_readonly() and "RO" or "RW", hasattr(self, '_uri') and self._uri.abbrev())
126     def init_from_uri(self, myuri):
127         self._uri = IURI(myuri)
128         self._node = self.filenode_class(self._client)
129         self._node.init_from_uri(self._uri.get_filenode_uri())
130         return self
131
132     def create(self, keypair_generator=None):
133         """
134         Returns a deferred that eventually fires with self once the directory
135         has been created (distributed across a set of storage servers).
136         """
137         # first we create a MutableFileNode with empty_contents, then use its
138         # URI to create our own.
139         self._node = self.filenode_class(self._client)
140         empty_contents = self._pack_contents({})
141         d = self._node.create(empty_contents, keypair_generator)
142         d.addCallback(self._filenode_created)
143         return d
144     def _filenode_created(self, res):
145         self._uri = NewDirectoryURI(IMutableFileURI(self._node.get_uri()))
146         return self
147
148     def get_size(self):
149         # return the size of our backing mutable file, in bytes, if we've
150         # fetched it.
151         return self._most_recent_size
152
153     def _set_size(self, data):
154         self._most_recent_size = len(data)
155         return data
156
157     def _read(self):
158         d = self._node.download_best_version()
159         d.addCallback(self._set_size)
160         d.addCallback(self._unpack_contents)
161         return d
162
163     def _encrypt_rwcap(self, rwcap):
164         assert isinstance(rwcap, str)
165         IV = os.urandom(16)
166         key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
167         cryptor = AES(key)
168         crypttext = cryptor.process(rwcap)
169         mac = hashutil.hmac(key, IV + crypttext)
170         assert len(mac) == 32
171         return IV + crypttext + mac
172
173     def _decrypt_rwcapdata(self, encwrcap):
174         IV = encwrcap[:16]
175         crypttext = encwrcap[16:-32]
176         mac = encwrcap[-32:]
177         key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
178         if mac != hashutil.hmac(key, IV+crypttext):
179             raise hashutil.IntegrityCheckError("HMAC does not match, crypttext is corrupted")
180         cryptor = AES(key)
181         plaintext = cryptor.process(crypttext)
182         return plaintext
183
184     def _create_node(self, child_uri):
185         return self._client.create_node_from_uri(child_uri)
186
187     def _unpack_contents(self, data):
188         # the directory is serialized as a list of netstrings, one per child.
189         # Each child is serialized as a list of four netstrings: (name,
190         # rocap, rwcap, metadata), in which the name,rocap,metadata are in
191         # cleartext. The 'name' is UTF-8 encoded. The rwcap is formatted as:
192         # pack("16ss32s", iv, AES(H(writekey+iv), plaintextrwcap), mac)
193         assert isinstance(data, str)
194         # an empty directory is serialized as an empty string
195         if data == "":
196             return {}
197         writeable = not self.is_readonly()
198         children = {}
199         while len(data) > 0:
200             entry, data = split_netstring(data, 1, True)
201             name, rocap, rwcapdata, metadata_s = split_netstring(entry, 4)
202             name = name.decode("utf-8")
203             if writeable:
204                 rwcap = self._decrypt_rwcapdata(rwcapdata)
205                 child = self._create_node(rwcap)
206             else:
207                 child = self._create_node(rocap)
208             metadata = simplejson.loads(metadata_s)
209             assert isinstance(metadata, dict)
210             children[name] = (child, metadata)
211         return children
212
213     def _pack_contents(self, children):
214         # expects children in the same format as _unpack_contents
215         assert isinstance(children, dict)
216         entries = []
217         for name in sorted(children.keys()):
218             child, metadata = children[name]
219             assert isinstance(name, unicode)
220             assert (IFileNode.providedBy(child)
221                     or IMutableFileNode.providedBy(child)
222                     or IDirectoryNode.providedBy(child)), (name,child)
223             assert isinstance(metadata, dict)
224             rwcap = child.get_uri() # might be RO if the child is not writeable
225             rocap = child.get_readonly_uri()
226             entry = "".join([netstring(name.encode("utf-8")),
227                              netstring(rocap),
228                              netstring(self._encrypt_rwcap(rwcap)),
229                              netstring(simplejson.dumps(metadata))])
230             entries.append(netstring(entry))
231         return "".join(entries)
232
233     def is_readonly(self):
234         return self._node.is_readonly()
235     def is_mutable(self):
236         return self._node.is_mutable()
237
238     def get_uri(self):
239         return self._uri.to_string()
240
241     def get_readonly_uri(self):
242         return self._uri.get_readonly().to_string()
243
244     def get_verifier(self):
245         return self._uri.get_verifier()
246
247     def get_storage_index(self):
248         return self._uri._filenode_uri.storage_index
249
250     def check(self, verify=False):
251         """Perform a file check. See IChecker.check for details."""
252         return self._node.check(verify)
253     def check_and_repair(self, verify=False):
254         return self._node.check_and_repair(verify)
255
256     def list(self):
257         """I return a Deferred that fires with a dictionary mapping child
258         name to a tuple of (IFileNode or IDirectoryNode, metadata)."""
259         return self._read()
260
261     def has_child(self, name):
262         """I return a Deferred that fires with a boolean, True if there
263         exists a child of the given name, False if not."""
264         assert isinstance(name, unicode)
265         d = self._read()
266         d.addCallback(lambda children: children.has_key(name))
267         return d
268
269     def _get(self, children, name):
270         child = children.get(name)
271         if child is None:
272             raise KeyError(name)
273         return child[0]
274
275     def get(self, name):
276         """I return a Deferred that fires with the named child node,
277         which is either an IFileNode or an IDirectoryNode."""
278         assert isinstance(name, unicode)
279         d = self._read()
280         d.addCallback(self._get, name)
281         return d
282
283     def get_metadata_for(self, name):
284         assert isinstance(name, unicode)
285         d = self._read()
286         d.addCallback(lambda children: children[name][1])
287         return d
288
289     def set_metadata_for(self, name, metadata):
290         assert isinstance(name, unicode)
291         if self.is_readonly():
292             return defer.fail(NotMutableError())
293         assert isinstance(metadata, dict)
294         s = MetadataSetter(self, name, metadata)
295         d = self._node.modify(s.modify)
296         d.addCallback(lambda res: self)
297         return d
298
299     def get_child_at_path(self, path):
300         """Transform a child path into an IDirectoryNode or IFileNode.
301
302         I perform a recursive series of 'get' operations to find the named
303         descendant node. I return a Deferred that fires with the node, or
304         errbacks with IndexError if the node could not be found.
305
306         The path can be either a single string (slash-separated) or a list of
307         path-name elements.
308         """
309
310         if not path:
311             return defer.succeed(self)
312         if isinstance(path, (list, tuple)):
313             pass
314         else:
315             path = path.split("/")
316         for p in path:
317             assert isinstance(p, unicode)
318         childname = path[0]
319         remaining_path = path[1:]
320         d = self.get(childname)
321         if remaining_path:
322             def _got(node):
323                 return node.get_child_at_path(remaining_path)
324             d.addCallback(_got)
325         return d
326
327     def set_uri(self, name, child_uri, metadata=None, overwrite=True):
328         """I add a child (by URI) at the specific name. I return a Deferred
329         that fires with the child node when the operation finishes. I will
330         replace any existing child of the same name.
331
332         The child_uri could be for a file, or for a directory (either
333         read-write or read-only, using a URI that came from get_uri() ).
334
335         If this directory node is read-only, the Deferred will errback with a
336         NotMutableError."""
337         assert isinstance(name, unicode)
338         child_node = self._create_node(child_uri)
339         d = self.set_node(name, child_node, metadata, overwrite)
340         d.addCallback(lambda res: child_node)
341         return d
342
343     def set_children(self, entries, overwrite=True):
344         # this takes URIs
345         a = Adder(self, overwrite=overwrite)
346         node_entries = []
347         for e in entries:
348             if len(e) == 2:
349                 name, child_uri = e
350                 metadata = None
351             else:
352                 assert len(e) == 3
353                 name, child_uri, metadata = e
354             assert isinstance(name, unicode)
355             a.set_node(name, self._create_node(child_uri), metadata)
356         return self._node.modify(a.modify)
357
358     def set_node(self, name, child, metadata=None, overwrite=True):
359         """I add a child at the specific name. I return a Deferred that fires
360         when the operation finishes. This Deferred will fire with the child
361         node that was just added. I will replace any existing child of the
362         same name.
363
364         If this directory node is read-only, the Deferred will errback with a
365         NotMutableError."""
366
367         if self.is_readonly():
368             return defer.fail(NotMutableError())
369         assert isinstance(name, unicode)
370         assert IFilesystemNode.providedBy(child), child
371         a = Adder(self, overwrite=overwrite)
372         a.set_node(name, child, metadata)
373         d = self._node.modify(a.modify)
374         d.addCallback(lambda res: child)
375         return d
376
377     def set_nodes(self, entries, overwrite=True):
378         if self.is_readonly():
379             return defer.fail(NotMutableError())
380         a = Adder(self, entries, overwrite=overwrite)
381         d = self._node.modify(a.modify)
382         d.addCallback(lambda res: None)
383         return d
384
385
386     def add_file(self, name, uploadable, metadata=None, overwrite=True):
387         """I upload a file (using the given IUploadable), then attach the
388         resulting FileNode to the directory at the given name. I return a
389         Deferred that fires (with the IFileNode of the uploaded file) when
390         the operation completes."""
391         assert isinstance(name, unicode)
392         if self.is_readonly():
393             return defer.fail(NotMutableError())
394         d = self._client.upload(uploadable)
395         d.addCallback(lambda results: results.uri)
396         d.addCallback(self._client.create_node_from_uri)
397         d.addCallback(lambda node:
398                       self.set_node(name, node, metadata, overwrite))
399         return d
400
401     def delete(self, name):
402         """I remove the child at the specific name. I return a Deferred that
403         fires (with the node just removed) when the operation finishes."""
404         assert isinstance(name, unicode)
405         if self.is_readonly():
406             return defer.fail(NotMutableError())
407         deleter = Deleter(self, name)
408         d = self._node.modify(deleter.modify)
409         d.addCallback(lambda res: deleter.old_child)
410         return d
411
412     def create_empty_directory(self, name, overwrite=True):
413         """I create and attach an empty directory at the given name. I return
414         a Deferred that fires (with the new directory node) when the
415         operation finishes."""
416         assert isinstance(name, unicode)
417         if self.is_readonly():
418             return defer.fail(NotMutableError())
419         d = self._client.create_empty_dirnode()
420         def _created(child):
421             entries = [(name, child, None)]
422             a = Adder(self, entries, overwrite=overwrite)
423             d = self._node.modify(a.modify)
424             d.addCallback(lambda res: child)
425             return d
426         d.addCallback(_created)
427         return d
428
429     def move_child_to(self, current_child_name, new_parent,
430                       new_child_name=None, overwrite=True):
431         """I take one of my children and move them to a new parent. The child
432         is referenced by name. On the new parent, the child will live under
433         'new_child_name', which defaults to 'current_child_name'. I return a
434         Deferred that fires when the operation finishes."""
435         assert isinstance(current_child_name, unicode)
436         if self.is_readonly() or new_parent.is_readonly():
437             return defer.fail(NotMutableError())
438         if new_child_name is None:
439             new_child_name = current_child_name
440         assert isinstance(new_child_name, unicode)
441         d = self.get(current_child_name)
442         def sn(child):
443             return new_parent.set_node(new_child_name, child,
444                                        overwrite=overwrite)
445         d.addCallback(sn)
446         d.addCallback(lambda child: self.delete(current_child_name))
447         return d
448
449     def build_manifest(self):
450         """Return a frozenset of verifier-capability strings for all nodes
451         (directories and files) reachable from this one."""
452
453         # this is just a tree-walker, except that following each edge
454         # requires a Deferred. We use a ConcurrencyLimiter to make sure the
455         # fan-out doesn't cause problems.
456
457         manifest = set()
458         manifest.add(self.get_verifier())
459         limiter = ConcurrencyLimiter(10) # allow 10 in parallel
460
461         d = self._build_manifest_from_node(self, manifest, limiter)
462         def _done(res):
463             # LIT nodes have no verifier-capability: their data is stored
464             # inside the URI itself, so there is no need to refresh anything.
465             # They indicate this by returning None from their get_verifier
466             # method. We need to remove any such Nones from our set. We also
467             # want to convert all these caps into strings.
468             return frozenset([IVerifierURI(cap).to_string()
469                               for cap in manifest
470                               if cap is not None])
471         d.addCallback(_done)
472         return d
473
474     def _build_manifest_from_node(self, node, manifest, limiter):
475         d = limiter.add(node.list)
476         def _got_list(res):
477             dl = []
478             for name, (child, metadata) in res.iteritems():
479                 verifier = child.get_verifier()
480                 if verifier not in manifest:
481                     manifest.add(verifier)
482                     if IDirectoryNode.providedBy(child):
483                         dl.append(self._build_manifest_from_node(child,
484                                                                  manifest,
485                                                                  limiter))
486             if dl:
487                 return defer.DeferredList(dl)
488         d.addCallback(_got_list)
489         return d
490
491     def deep_stats(self):
492         stats = DeepStats()
493         # we track verifier caps, to avoid double-counting children for which
494         # we've got both a write-cap and a read-cap
495         found = set()
496         found.add(self.get_verifier())
497
498         limiter = ConcurrencyLimiter(10)
499
500         d = self._add_deepstats_from_node(self, found, stats, limiter)
501         d.addCallback(lambda res: stats.get_results())
502         return d
503
504     def _add_deepstats_from_node(self, node, found, stats, limiter):
505         d = limiter.add(node.list)
506         def _got_list(children):
507             dl = []
508             dirsize_bytes = node.get_size()
509             dirsize_children = len(children)
510             stats.add("count-directories")
511             stats.add("size-directories", dirsize_bytes)
512             stats.max("largest-directory", dirsize_bytes)
513             stats.max("largest-directory-children", dirsize_children)
514             for name, (child, metadata) in children.iteritems():
515                 verifier = child.get_verifier()
516                 if verifier in found:
517                     continue
518                 found.add(verifier)
519                 if IDirectoryNode.providedBy(child):
520                     dl.append(self._add_deepstats_from_node(child, found,
521                                                             stats, limiter))
522                 elif IMutableFileNode.providedBy(child):
523                     stats.add("count-files")
524                     stats.add("count-mutable-files")
525                     # TODO: update the servermap, compute a size, add it to
526                     # size-mutable-files, max it into "largest-mutable-file"
527                 elif IFileNode.providedBy(child): # CHK and LIT
528                     stats.add("count-files")
529                     size = child.get_size()
530                     stats.histogram("size-files-histogram", size)
531                     if child.get_uri().startswith("URI:LIT:"):
532                         stats.add("count-literal-files")
533                         stats.add("size-literal-files", size)
534                     else:
535                         stats.add("count-immutable-files")
536                         stats.add("size-immutable-files", size)
537                         stats.max("largest-immutable-file", size)
538             if dl:
539                 return defer.DeferredList(dl)
540         d.addCallback(_got_list)
541         return d
542
543     def deep_check(self, verify=False):
544         return self.deep_check_base(verify, False)
545     def deep_check_and_repair(self, verify=False):
546         return self.deep_check_base(verify, True)
547
548     def deep_check_base(self, verify, repair):
549         # shallow-check each object first, then traverse children
550         root_si = self._node.get_storage_index()
551         self._lp = log.msg(format="deep-check starting (%(si)s),"
552                            " verify=%(verify)s, repair=%(repair)s",
553                            si=base32.b2a(root_si), verify=verify, repair=repair)
554         if repair:
555             results = DeepCheckAndRepairResults(root_si)
556         else:
557             results = DeepCheckResults(root_si)
558         found = set()
559         limiter = ConcurrencyLimiter(10)
560
561         d = self._add_deepcheck_from_node([], self, results, found, limiter,
562                                           verify, repair)
563         def _done(res):
564             log.msg("deep-check done", parent=self._lp)
565             return results
566         d.addCallback(_done)
567         return d
568
569     def _add_deepcheck_from_node(self, path, node, results, found, limiter,
570                                  verify, repair):
571         verifier = node.get_verifier()
572         if verifier in found:
573             # avoid loops
574             return None
575         found.add(verifier)
576
577         if repair:
578             d = limiter.add(node.check_and_repair, verify)
579             d.addCallback(results.add_check_and_repair, path)
580         else:
581             d = limiter.add(node.check, verify)
582             d.addCallback(results.add_check, path)
583
584         # TODO: stats: split the DeepStats.foo calls out of
585         # _add_deepstats_from_node into a separate non-recursing method, call
586         # it from both here and _add_deepstats_from_node.
587
588         if IDirectoryNode.providedBy(node):
589             d.addCallback(lambda res: node.list())
590             def _got_children(children):
591                 dl = []
592                 for name, (child, metadata) in children.iteritems():
593                     childpath = path + [name]
594                     d2 = self._add_deepcheck_from_node(childpath, child,
595                                                        results,
596                                                        found, limiter,
597                                                        verify, repair)
598                     if d2:
599                         dl.append(d2)
600                 if dl:
601                     return defer.DeferredList(dl, fireOnOneErrback=True)
602             d.addCallback(_got_children)
603         return d
604
605
606 class DeepStats:
607     def __init__(self):
608         self.stats = {}
609         for k in ["count-immutable-files",
610                   "count-mutable-files",
611                   "count-literal-files",
612                   "count-files",
613                   "count-directories",
614                   "size-immutable-files",
615                   #"size-mutable-files",
616                   "size-literal-files",
617                   "size-directories",
618                   "largest-directory",
619                   "largest-directory-children",
620                   "largest-immutable-file",
621                   #"largest-mutable-file",
622                   ]:
623             self.stats[k] = 0
624         self.histograms = {}
625         for k in ["size-files-histogram"]:
626             self.histograms[k] = {} # maps (min,max) to count
627         self.buckets = [ (0,0), (1,3)]
628         self.root = math.sqrt(10)
629
630     def add(self, key, value=1):
631         self.stats[key] += value
632
633     def max(self, key, value):
634         self.stats[key] = max(self.stats[key], value)
635
636     def which_bucket(self, size):
637         # return (min,max) such that min <= size <= max
638         # values are from the set (0,0), (1,3), (4,10), (11,31), (32,100),
639         # (101,316), (317, 1000), etc: two per decade
640         assert size >= 0
641         i = 0
642         while True:
643             if i >= len(self.buckets):
644                 # extend the list
645                 new_lower = self.buckets[i-1][1]+1
646                 new_upper = int(mathutil.next_power_of_k(new_lower, self.root))
647                 self.buckets.append( (new_lower, new_upper) )
648             maybe = self.buckets[i]
649             if maybe[0] <= size <= maybe[1]:
650                 return maybe
651             i += 1
652
653     def histogram(self, key, size):
654         bucket = self.which_bucket(size)
655         h = self.histograms[key]
656         if bucket not in h:
657             h[bucket] = 0
658         h[bucket] += 1
659
660     def get_results(self):
661         stats = self.stats.copy()
662         for key in self.histograms:
663             h = self.histograms[key]
664             out = [ (bucket[0], bucket[1], h[bucket]) for bucket in h ]
665             out.sort()
666             stats[key] = out
667         return stats
668
669
670 # use client.create_dirnode() to make one of these
671
672