]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/dirnode.py
first pass at deep-checker, no webapi yet, probably big problems with it, only minima...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / dirnode.py
1
2 import os, time, math
3
4 from zope.interface import implements
5 from twisted.internet import defer
6 import simplejson
7 from allmydata.mutable.common import NotMutableError
8 from allmydata.mutable.node import MutableFileNode
9 from allmydata.interfaces import IMutableFileNode, IDirectoryNode,\
10      IURI, IFileNode, IMutableFileURI, IVerifierURI, IFilesystemNode, \
11      ExistingChildError, ICheckable
12 from allmydata.immutable.checker import DeepCheckResults
13 from allmydata.util import hashutil, mathutil
14 from allmydata.util.hashutil import netstring
15 from allmydata.util.limiter import ConcurrencyLimiter
16 from allmydata.uri import NewDirectoryURI
17 from pycryptopp.cipher.aes import AES
18
19 def split_netstring(data, numstrings, allow_leftover=False):
20     """like string.split(), but extracts netstrings. If allow_leftover=False,
21     returns numstrings elements, and throws ValueError if there was leftover
22     data. If allow_leftover=True, returns numstrings+1 elements, in which the
23     last element is the leftover data (possibly an empty string)"""
24     elements = []
25     assert numstrings >= 0
26     while data:
27         colon = data.index(":")
28         length = int(data[:colon])
29         string = data[colon+1:colon+1+length]
30         assert len(string) == length
31         elements.append(string)
32         assert data[colon+1+length] == ","
33         data = data[colon+1+length+1:]
34         if len(elements) == numstrings:
35             break
36     if len(elements) < numstrings:
37         raise ValueError("ran out of netstrings")
38     if allow_leftover:
39         return tuple(elements + [data])
40     if data:
41         raise ValueError("leftover data in netstrings")
42     return tuple(elements)
43
44 class Deleter:
45     def __init__(self, node, name, must_exist=True):
46         self.node = node
47         self.name = name
48         self.must_exist = True
49     def modify(self, old_contents):
50         children = self.node._unpack_contents(old_contents)
51         if self.name not in children:
52             if self.must_exist:
53                 raise KeyError(self.name)
54             self.old_child = None
55             return None
56         self.old_child, metadata = children[self.name]
57         del children[self.name]
58         new_contents = self.node._pack_contents(children)
59         return new_contents
60
61 class MetadataSetter:
62     def __init__(self, node, name, metadata):
63         self.node = node
64         self.name = name
65         self.metadata = metadata
66
67     def modify(self, old_contents):
68         children = self.node._unpack_contents(old_contents)
69         children[self.name] = (children[self.name][0], self.metadata)
70         new_contents = self.node._pack_contents(children)
71         return new_contents
72
73
74 class Adder:
75     def __init__(self, node, entries=None, overwrite=True):
76         self.node = node
77         if entries is None:
78             entries = []
79         self.entries = entries
80         self.overwrite = overwrite
81
82     def set_node(self, name, node, metadata):
83         self.entries.append( [name, node, metadata] )
84
85     def modify(self, old_contents):
86         children = self.node._unpack_contents(old_contents)
87         now = time.time()
88         for e in self.entries:
89             if len(e) == 2:
90                 name, child = e
91                 new_metadata = None
92             else:
93                 assert len(e) == 3
94                 name, child, new_metadata = e
95             assert isinstance(name, unicode)
96             if name in children:
97                 if not self.overwrite:
98                     raise ExistingChildError("child '%s' already exists" % name)
99                 metadata = children[name][1].copy()
100             else:
101                 metadata = {"ctime": now,
102                             "mtime": now}
103             if new_metadata is None:
104                 # update timestamps
105                 if "ctime" not in metadata:
106                     metadata["ctime"] = now
107                 metadata["mtime"] = now
108             else:
109                 # just replace it
110                 metadata = new_metadata.copy()
111             children[name] = (child, metadata)
112         new_contents = self.node._pack_contents(children)
113         return new_contents
114
115 class NewDirectoryNode:
116     implements(IDirectoryNode, ICheckable)
117     filenode_class = MutableFileNode
118
119     def __init__(self, client):
120         self._client = client
121         self._most_recent_size = None
122
123     def __repr__(self):
124         return "<%s %s %s>" % (self.__class__.__name__, self.is_readonly() and "RO" or "RW", hasattr(self, '_uri') and self._uri.abbrev())
125     def init_from_uri(self, myuri):
126         self._uri = IURI(myuri)
127         self._node = self.filenode_class(self._client)
128         self._node.init_from_uri(self._uri.get_filenode_uri())
129         return self
130
131     def create(self, keypair_generator=None):
132         """
133         Returns a deferred that eventually fires with self once the directory
134         has been created (distributed across a set of storage servers).
135         """
136         # first we create a MutableFileNode with empty_contents, then use its
137         # URI to create our own.
138         self._node = self.filenode_class(self._client)
139         empty_contents = self._pack_contents({})
140         d = self._node.create(empty_contents, keypair_generator)
141         d.addCallback(self._filenode_created)
142         return d
143     def _filenode_created(self, res):
144         self._uri = NewDirectoryURI(IMutableFileURI(self._node.get_uri()))
145         return self
146
147     def get_size(self):
148         # return the size of our backing mutable file, in bytes, if we've
149         # fetched it.
150         return self._most_recent_size
151
152     def _set_size(self, data):
153         self._most_recent_size = len(data)
154         return data
155
156     def _read(self):
157         d = self._node.download_best_version()
158         d.addCallback(self._set_size)
159         d.addCallback(self._unpack_contents)
160         return d
161
162     def _encrypt_rwcap(self, rwcap):
163         assert isinstance(rwcap, str)
164         IV = os.urandom(16)
165         key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
166         cryptor = AES(key)
167         crypttext = cryptor.process(rwcap)
168         mac = hashutil.hmac(key, IV + crypttext)
169         assert len(mac) == 32
170         return IV + crypttext + mac
171
172     def _decrypt_rwcapdata(self, encwrcap):
173         IV = encwrcap[:16]
174         crypttext = encwrcap[16:-32]
175         mac = encwrcap[-32:]
176         key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
177         if mac != hashutil.hmac(key, IV+crypttext):
178             raise hashutil.IntegrityCheckError("HMAC does not match, crypttext is corrupted")
179         cryptor = AES(key)
180         plaintext = cryptor.process(crypttext)
181         return plaintext
182
183     def _create_node(self, child_uri):
184         return self._client.create_node_from_uri(child_uri)
185
186     def _unpack_contents(self, data):
187         # the directory is serialized as a list of netstrings, one per child.
188         # Each child is serialized as a list of four netstrings: (name,
189         # rocap, rwcap, metadata), in which the name,rocap,metadata are in
190         # cleartext. The 'name' is UTF-8 encoded. The rwcap is formatted as:
191         # pack("16ss32s", iv, AES(H(writekey+iv), plaintextrwcap), mac)
192         assert isinstance(data, str)
193         # an empty directory is serialized as an empty string
194         if data == "":
195             return {}
196         writeable = not self.is_readonly()
197         children = {}
198         while len(data) > 0:
199             entry, data = split_netstring(data, 1, True)
200             name, rocap, rwcapdata, metadata_s = split_netstring(entry, 4)
201             name = name.decode("utf-8")
202             if writeable:
203                 rwcap = self._decrypt_rwcapdata(rwcapdata)
204                 child = self._create_node(rwcap)
205             else:
206                 child = self._create_node(rocap)
207             metadata = simplejson.loads(metadata_s)
208             assert isinstance(metadata, dict)
209             children[name] = (child, metadata)
210         return children
211
212     def _pack_contents(self, children):
213         # expects children in the same format as _unpack_contents
214         assert isinstance(children, dict)
215         entries = []
216         for name in sorted(children.keys()):
217             child, metadata = children[name]
218             assert isinstance(name, unicode)
219             assert (IFileNode.providedBy(child)
220                     or IMutableFileNode.providedBy(child)
221                     or IDirectoryNode.providedBy(child)), (name,child)
222             assert isinstance(metadata, dict)
223             rwcap = child.get_uri() # might be RO if the child is not writeable
224             rocap = child.get_readonly_uri()
225             entry = "".join([netstring(name.encode("utf-8")),
226                              netstring(rocap),
227                              netstring(self._encrypt_rwcap(rwcap)),
228                              netstring(simplejson.dumps(metadata))])
229             entries.append(netstring(entry))
230         return "".join(entries)
231
232     def is_readonly(self):
233         return self._node.is_readonly()
234     def is_mutable(self):
235         return self._node.is_mutable()
236
237     def get_uri(self):
238         return self._uri.to_string()
239
240     def get_readonly_uri(self):
241         return self._uri.get_readonly().to_string()
242
243     def get_verifier(self):
244         return self._uri.get_verifier().to_string()
245
246     def check(self, verify=False, repair=False):
247         """Perform a file check. See IChecker.check for details."""
248         return self._node.check(verify, repair)
249
250     def list(self):
251         """I return a Deferred that fires with a dictionary mapping child
252         name to a tuple of (IFileNode or IDirectoryNode, metadata)."""
253         return self._read()
254
255     def has_child(self, name):
256         """I return a Deferred that fires with a boolean, True if there
257         exists a child of the given name, False if not."""
258         assert isinstance(name, unicode)
259         d = self._read()
260         d.addCallback(lambda children: children.has_key(name))
261         return d
262
263     def _get(self, children, name):
264         child = children.get(name)
265         if child is None:
266             raise KeyError(name)
267         return child[0]
268
269     def get(self, name):
270         """I return a Deferred that fires with the named child node,
271         which is either an IFileNode or an IDirectoryNode."""
272         assert isinstance(name, unicode)
273         d = self._read()
274         d.addCallback(self._get, name)
275         return d
276
277     def get_metadata_for(self, name):
278         assert isinstance(name, unicode)
279         d = self._read()
280         d.addCallback(lambda children: children[name][1])
281         return d
282
283     def set_metadata_for(self, name, metadata):
284         assert isinstance(name, unicode)
285         if self.is_readonly():
286             return defer.fail(NotMutableError())
287         assert isinstance(metadata, dict)
288         s = MetadataSetter(self, name, metadata)
289         d = self._node.modify(s.modify)
290         d.addCallback(lambda res: self)
291         return d
292
293     def get_child_at_path(self, path):
294         """Transform a child path into an IDirectoryNode or IFileNode.
295
296         I perform a recursive series of 'get' operations to find the named
297         descendant node. I return a Deferred that fires with the node, or
298         errbacks with IndexError if the node could not be found.
299
300         The path can be either a single string (slash-separated) or a list of
301         path-name elements.
302         """
303
304         if not path:
305             return defer.succeed(self)
306         if isinstance(path, (list, tuple)):
307             pass
308         else:
309             path = path.split("/")
310         for p in path:
311             assert isinstance(p, unicode)
312         childname = path[0]
313         remaining_path = path[1:]
314         d = self.get(childname)
315         if remaining_path:
316             def _got(node):
317                 return node.get_child_at_path(remaining_path)
318             d.addCallback(_got)
319         return d
320
321     def set_uri(self, name, child_uri, metadata=None, overwrite=True):
322         """I add a child (by URI) at the specific name. I return a Deferred
323         that fires with the child node when the operation finishes. I will
324         replace any existing child of the same name.
325
326         The child_uri could be for a file, or for a directory (either
327         read-write or read-only, using a URI that came from get_uri() ).
328
329         If this directory node is read-only, the Deferred will errback with a
330         NotMutableError."""
331         assert isinstance(name, unicode)
332         child_node = self._create_node(child_uri)
333         d = self.set_node(name, child_node, metadata, overwrite)
334         d.addCallback(lambda res: child_node)
335         return d
336
337     def set_children(self, entries, overwrite=True):
338         # this takes URIs
339         a = Adder(self, overwrite=overwrite)
340         node_entries = []
341         for e in entries:
342             if len(e) == 2:
343                 name, child_uri = e
344                 metadata = None
345             else:
346                 assert len(e) == 3
347                 name, child_uri, metadata = e
348             assert isinstance(name, unicode)
349             a.set_node(name, self._create_node(child_uri), metadata)
350         return self._node.modify(a.modify)
351
352     def set_node(self, name, child, metadata=None, overwrite=True):
353         """I add a child at the specific name. I return a Deferred that fires
354         when the operation finishes. This Deferred will fire with the child
355         node that was just added. I will replace any existing child of the
356         same name.
357
358         If this directory node is read-only, the Deferred will errback with a
359         NotMutableError."""
360
361         if self.is_readonly():
362             return defer.fail(NotMutableError())
363         assert isinstance(name, unicode)
364         assert IFilesystemNode.providedBy(child), child
365         a = Adder(self, overwrite=overwrite)
366         a.set_node(name, child, metadata)
367         d = self._node.modify(a.modify)
368         d.addCallback(lambda res: child)
369         return d
370
371     def set_nodes(self, entries, overwrite=True):
372         if self.is_readonly():
373             return defer.fail(NotMutableError())
374         a = Adder(self, entries, overwrite=overwrite)
375         d = self._node.modify(a.modify)
376         d.addCallback(lambda res: None)
377         return d
378
379
380     def add_file(self, name, uploadable, metadata=None, overwrite=True):
381         """I upload a file (using the given IUploadable), then attach the
382         resulting FileNode to the directory at the given name. I return a
383         Deferred that fires (with the IFileNode of the uploaded file) when
384         the operation completes."""
385         assert isinstance(name, unicode)
386         if self.is_readonly():
387             return defer.fail(NotMutableError())
388         d = self._client.upload(uploadable)
389         d.addCallback(lambda results: results.uri)
390         d.addCallback(self._client.create_node_from_uri)
391         d.addCallback(lambda node:
392                       self.set_node(name, node, metadata, overwrite))
393         return d
394
395     def delete(self, name):
396         """I remove the child at the specific name. I return a Deferred that
397         fires (with the node just removed) when the operation finishes."""
398         assert isinstance(name, unicode)
399         if self.is_readonly():
400             return defer.fail(NotMutableError())
401         deleter = Deleter(self, name)
402         d = self._node.modify(deleter.modify)
403         d.addCallback(lambda res: deleter.old_child)
404         return d
405
406     def create_empty_directory(self, name, overwrite=True):
407         """I create and attach an empty directory at the given name. I return
408         a Deferred that fires (with the new directory node) when the
409         operation finishes."""
410         assert isinstance(name, unicode)
411         if self.is_readonly():
412             return defer.fail(NotMutableError())
413         d = self._client.create_empty_dirnode()
414         def _created(child):
415             entries = [(name, child, None)]
416             a = Adder(self, entries, overwrite=overwrite)
417             d = self._node.modify(a.modify)
418             d.addCallback(lambda res: child)
419             return d
420         d.addCallback(_created)
421         return d
422
423     def move_child_to(self, current_child_name, new_parent,
424                       new_child_name=None, overwrite=True):
425         """I take one of my children and move them to a new parent. The child
426         is referenced by name. On the new parent, the child will live under
427         'new_child_name', which defaults to 'current_child_name'. I return a
428         Deferred that fires when the operation finishes."""
429         assert isinstance(current_child_name, unicode)
430         if self.is_readonly() or new_parent.is_readonly():
431             return defer.fail(NotMutableError())
432         if new_child_name is None:
433             new_child_name = current_child_name
434         assert isinstance(new_child_name, unicode)
435         d = self.get(current_child_name)
436         def sn(child):
437             return new_parent.set_node(new_child_name, child,
438                                        overwrite=overwrite)
439         d.addCallback(sn)
440         d.addCallback(lambda child: self.delete(current_child_name))
441         return d
442
443     def build_manifest(self):
444         """Return a frozenset of verifier-capability strings for all nodes
445         (directories and files) reachable from this one."""
446
447         # this is just a tree-walker, except that following each edge
448         # requires a Deferred. We use a ConcurrencyLimiter to make sure the
449         # fan-out doesn't cause problems.
450
451         manifest = set()
452         manifest.add(self.get_verifier())
453         limiter = ConcurrencyLimiter(10) # allow 10 in parallel
454
455         d = self._build_manifest_from_node(self, manifest, limiter)
456         def _done(res):
457             # LIT nodes have no verifier-capability: their data is stored
458             # inside the URI itself, so there is no need to refresh anything.
459             # They indicate this by returning None from their get_verifier
460             # method. We need to remove any such Nones from our set. We also
461             # want to convert all these caps into strings.
462             return frozenset([IVerifierURI(cap).to_string()
463                               for cap in manifest
464                               if cap is not None])
465         d.addCallback(_done)
466         return d
467
468     def _build_manifest_from_node(self, node, manifest, limiter):
469         d = limiter.add(node.list)
470         def _got_list(res):
471             dl = []
472             for name, (child, metadata) in res.iteritems():
473                 verifier = child.get_verifier()
474                 if verifier not in manifest:
475                     manifest.add(verifier)
476                     if IDirectoryNode.providedBy(child):
477                         dl.append(self._build_manifest_from_node(child,
478                                                                  manifest,
479                                                                  limiter))
480             if dl:
481                 return defer.DeferredList(dl)
482         d.addCallback(_got_list)
483         return d
484
485     def deep_stats(self):
486         stats = DeepStats()
487         # we track verifier caps, to avoid double-counting children for which
488         # we've got both a write-cap and a read-cap
489         found = set()
490         found.add(self.get_verifier())
491
492         limiter = ConcurrencyLimiter(10)
493
494         d = self._add_deepstats_from_node(self, found, stats, limiter)
495         d.addCallback(lambda res: stats.get_results())
496         return d
497
498     def _add_deepstats_from_node(self, node, found, stats, limiter):
499         d = limiter.add(node.list)
500         def _got_list(children):
501             dl = []
502             dirsize_bytes = node.get_size()
503             dirsize_children = len(children)
504             stats.add("count-directories")
505             stats.add("size-directories", dirsize_bytes)
506             stats.max("largest-directory", dirsize_bytes)
507             stats.max("largest-directory-children", dirsize_children)
508             for name, (child, metadata) in children.iteritems():
509                 verifier = child.get_verifier()
510                 if verifier in found:
511                     continue
512                 found.add(verifier)
513                 if IDirectoryNode.providedBy(child):
514                     dl.append(self._add_deepstats_from_node(child, found,
515                                                             stats, limiter))
516                 elif IMutableFileNode.providedBy(child):
517                     stats.add("count-files")
518                     stats.add("count-mutable-files")
519                     # TODO: update the servermap, compute a size, add it to
520                     # size-mutable-files, max it into "largest-mutable-file"
521                 elif IFileNode.providedBy(child): # CHK and LIT
522                     stats.add("count-files")
523                     size = child.get_size()
524                     stats.histogram("size-files-histogram", size)
525                     if child.get_uri().startswith("URI:LIT:"):
526                         stats.add("count-literal-files")
527                         stats.add("size-literal-files", size)
528                     else:
529                         stats.add("count-immutable-files")
530                         stats.add("size-immutable-files", size)
531                         stats.max("largest-immutable-file", size)
532             if dl:
533                 return defer.DeferredList(dl)
534         d.addCallback(_got_list)
535         return d
536
537     def deep_check(self, verify=False, repair=False):
538         results = DeepCheckResults()
539         found = set()
540         found.add(self.get_verifier())
541
542         limiter = ConcurrencyLimiter(10)
543
544         d = self._add_check_from_node(self, results, limiter, verify, repair)
545         d.addCallback(lambda res:
546                       self._add_deepcheck_from_dirnode(self,
547                                                        found, results, limiter,
548                                                        verify, repair))
549         d.addCallback(lambda res: results)
550         return d
551
552     def _add_check_from_node(self, node, results, limiter, verify, repair):
553         d = limiter.add(node.check, verify, repair)
554         d.addCallback(results.add_check)
555         return d
556
557     def _add_deepcheck_from_dirnode(self, node, found, results, limiter,
558                                     verify, repair):
559         d = limiter.add(node.list)
560         def _got_list(children):
561             dl = []
562             for name, (child, metadata) in children.iteritems():
563                 verifier = child.get_verifier()
564                 if verifier in found:
565                     # avoid loops
566                     continue
567                 dl.append(self._add_check_from_node(child,
568                                                     results, limiter,
569                                                     verify, repair))
570                 if IDirectoryNode.providedBy(child):
571                     dl.append(self._add_deepcheck_from_node(child, found,
572                                                             results, limiter,
573                                                             verify, repair))
574             if dl:
575                 return defer.DeferredList(dl)
576         d.addCallback(_got_list)
577         return d
578
579 class DeepStats:
580     def __init__(self):
581         self.stats = {}
582         for k in ["count-immutable-files",
583                   "count-mutable-files",
584                   "count-literal-files",
585                   "count-files",
586                   "count-directories",
587                   "size-immutable-files",
588                   #"size-mutable-files",
589                   "size-literal-files",
590                   "size-directories",
591                   "largest-directory",
592                   "largest-directory-children",
593                   "largest-immutable-file",
594                   #"largest-mutable-file",
595                   ]:
596             self.stats[k] = 0
597         self.histograms = {}
598         for k in ["size-files-histogram"]:
599             self.histograms[k] = {} # maps (min,max) to count
600         self.buckets = [ (0,0), (1,3)]
601         self.root = math.sqrt(10)
602
603     def add(self, key, value=1):
604         self.stats[key] += value
605
606     def max(self, key, value):
607         self.stats[key] = max(self.stats[key], value)
608
609     def which_bucket(self, size):
610         # return (min,max) such that min <= size <= max
611         # values are from the set (0,0), (1,3), (4,10), (11,31), (32,100),
612         # (101,316), (317, 1000), etc: two per decade
613         assert size >= 0
614         i = 0
615         while True:
616             if i >= len(self.buckets):
617                 # extend the list
618                 new_lower = self.buckets[i-1][1]+1
619                 new_upper = int(mathutil.next_power_of_k(new_lower, self.root))
620                 self.buckets.append( (new_lower, new_upper) )
621             maybe = self.buckets[i]
622             if maybe[0] <= size <= maybe[1]:
623                 return maybe
624             i += 1
625
626     def histogram(self, key, size):
627         bucket = self.which_bucket(size)
628         h = self.histograms[key]
629         if bucket not in h:
630             h[bucket] = 0
631         h[bucket] += 1
632
633     def get_results(self):
634         stats = self.stats.copy()
635         for key in self.histograms:
636             h = self.histograms[key]
637             out = [ (bucket[0], bucket[1], h[bucket]) for bucket in h ]
638             out.sort()
639             stats[key] = out
640         return stats
641
642
643 # use client.create_dirnode() to make one of these
644
645