]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/dirnode2.py
decentralized directories: integration and testing
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / dirnode2.py
1
2 import os
3
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.python import log
7 import simplejson
8 from allmydata.mutable import NotMutableError
9 from allmydata.interfaces import IMutableFileNode, IDirectoryNode,\
10      INewDirectoryURI, IURI, IFileNode, \
11      IVerifierURI
12 from allmydata.util import hashutil
13 from allmydata.util.hashutil import netstring
14 from allmydata.uri import NewDirectoryURI
15 from allmydata.Crypto.Cipher import AES
16
17 from allmydata.mutable import MutableFileNode
18
19 def split_netstring(data, numstrings, allow_leftover=False):
20     """like string.split(), but extracts netstrings. If allow_leftover=False,
21     returns numstrings elements, and throws ValueError if there was leftover
22     data. If allow_leftover=True, returns numstrings+1 elements, in which the
23     last element is the leftover data (possibly an empty string)"""
24     elements = []
25     assert numstrings >= 0
26     while data:
27         colon = data.index(":")
28         length = int(data[:colon])
29         string = data[colon+1:colon+1+length]
30         assert len(string) == length
31         elements.append(string)
32         assert data[colon+1+length] == ","
33         data = data[colon+1+length+1:]
34         if len(elements) == numstrings:
35             break
36     if len(elements) < numstrings:
37         raise ValueError("ran out of netstrings")
38     if allow_leftover:
39         return tuple(elements + [data])
40     if data:
41         raise ValueError("leftover data in netstrings")
42     return tuple(elements)
43
44 class NewDirectoryNode:
45     implements(IDirectoryNode)
46     filenode_class = MutableFileNode
47
48     def __init__(self, client):
49         self._client = client
50     def __repr__(self):
51         return "<%s %s %s>" % (self.__class__.__name__, self.is_readonly() and "RO" or "RW", hasattr(self, '_uri') and self._uri.abbrev())
52     def init_from_uri(self, myuri):
53         self._uri = IURI(myuri)
54         self._node = self.filenode_class(self._client)
55         self._node.init_from_uri(self._uri.get_filenode_uri())
56         return self
57
58     def create(self, wait_for_numpeers=None):
59         """
60         Returns a deferred that eventually fires with self once the directory
61         has been created (distributed across a set of storage servers).
62         """
63         # first we create a MutableFileNode with empty_contents, then use its
64         # URI to create our own.
65         self._node = self.filenode_class(self._client)
66         empty_contents = self._pack_contents({})
67         d = self._node.create(empty_contents, wait_for_numpeers=wait_for_numpeers)
68         d.addCallback(self._filenode_created)
69         return d
70     def _filenode_created(self, res):
71         self._uri = NewDirectoryURI(self._node._uri)
72         return self
73
74     def _read(self):
75         d = self._node.download_to_data()
76         d.addCallback(self._unpack_contents)
77         return d
78
79     def _encrypt_rwcap(self, rwcap):
80         assert isinstance(rwcap, str)
81         IV = os.urandom(16)
82         key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
83         counterstart = "\x00"*16
84         cryptor = AES.new(key=key, mode=AES.MODE_CTR, counterstart=counterstart)
85         crypttext = cryptor.encrypt(rwcap)
86         mac = hashutil.hmac(key, IV + crypttext)
87         assert len(mac) == 32
88         return IV + crypttext + mac
89
90     def _decrypt_rwcapdata(self, encwrcap):
91         IV = encwrcap[:16]
92         crypttext = encwrcap[16:-32]
93         mac = encwrcap[-32:]
94         key = hashutil.mutable_rwcap_key_hash(IV, self._node.get_writekey())
95         if mac != hashutil.hmac(key, IV+crypttext):
96             raise hashutil.IntegrityCheckError("HMAC does not match, crypttext is corrupted")
97         counterstart = "\x00"*16
98         cryptor = AES.new(key=key, mode=AES.MODE_CTR, counterstart=counterstart)
99         plaintext = cryptor.decrypt(crypttext)
100         return plaintext
101
102     def _create_node(self, child_uri):
103         return self._client.create_node_from_uri(child_uri)
104
105     def _unpack_contents(self, data):
106         # the directory is serialized as a list of netstrings, one per child.
107         # Each child is serialized as a list of four netstrings: (name,
108         # rocap, rwcap, metadata), in which the name,rocap,metadata are in
109         # cleartext. The rwcap is formatted as:
110         #  pack("16ss32s", iv, AES(H(writekey+iv), plaintextrwcap), mac)
111         assert isinstance(data, str)
112         # an empty directory is serialized as an empty string
113         if data == "":
114             return {}
115         writeable = not self.is_readonly()
116         children = {}
117         while len(data) > 0:
118             entry, data = split_netstring(data, 1, True)
119             name, rocap, rwcapdata, metadata_s = split_netstring(entry, 4)
120             if writeable:
121                 rwcap = self._decrypt_rwcapdata(rwcapdata)
122                 child = self._create_node(rwcap)
123             else:
124                 child = self._create_node(rocap)
125             metadata = simplejson.loads(metadata_s)
126             assert isinstance(metadata, dict)
127             children[name] = (child, metadata)
128         return children
129
130     def _pack_contents(self, children):
131         # expects children in the same format as _unpack_contents
132         assert isinstance(children, dict)
133         entries = []
134         for name in sorted(children.keys()):
135             child, metadata = children[name]
136             assert (IFileNode.providedBy(child)
137                     or IMutableFileNode.providedBy(child)
138                     or IDirectoryNode.providedBy(child)), children
139             assert isinstance(metadata, dict)
140             rwcap = child.get_uri() # might be RO if the child is not writeable
141             rocap = child.get_readonly_uri()
142             entry = "".join([netstring(name),
143                              netstring(rocap),
144                              netstring(self._encrypt_rwcap(rwcap)),
145                              netstring(simplejson.dumps(metadata))])
146             entries.append(netstring(entry))
147         return "".join(entries)
148
149     def is_readonly(self):
150         return self._node.is_readonly()
151     def is_mutable(self):
152         return self._node.is_mutable()
153
154     def get_uri(self):
155         return self._uri.to_string()
156
157     def get_readonly_uri(self):
158         return self._uri.get_readonly().to_string()
159
160     def get_verifier(self):
161         return self._uri.get_verifier().to_string()
162
163     def check(self):
164         """Perform a file check. See IChecker.check for details."""
165         pass # TODO
166
167     def list(self):
168         """I return a Deferred that fires with a dictionary mapping child
169         name to a tuple of (IFileNode or IDirectoryNode, metadata)."""
170         return self._read()
171
172     def has_child(self, name):
173         """I return a Deferred that fires with a boolean, True if there
174         exists a child of the given name, False if not."""
175         d = self._read()
176         d.addCallback(lambda children: children.has_key(name))
177         return d
178
179     def _get(self, children, name):
180         child = children.get(name)
181         if child is None:
182             raise KeyError(name)
183         return child[0]
184
185     def get(self, name):
186         """I return a Deferred that fires with the named child node,
187         which is either an IFileNode or an IDirectoryNode."""
188         d = self._read()
189         d.addCallback(self._get, name)
190         return d
191
192     def get_metadata_for(self, name):
193         d = self._read()
194         d.addCallback(lambda children: children[name][1])
195         return d
196
197     def get_child_at_path(self, path):
198         """Transform a child path into an IDirectoryNode or IFileNode.
199
200         I perform a recursive series of 'get' operations to find the named
201         descendant node. I return a Deferred that fires with the node, or
202         errbacks with IndexError if the node could not be found.
203
204         The path can be either a single string (slash-separated) or a list of
205         path-name elements.
206         """
207
208         if not path:
209             return defer.succeed(self)
210         if isinstance(path, (str, unicode)):
211             path = path.split("/")
212         childname = path[0]
213         remaining_path = path[1:]
214         d = self.get(childname)
215         if remaining_path:
216             def _got(node):
217                 return node.get_child_at_path(remaining_path)
218             d.addCallback(_got)
219         return d
220
221     def set_uri(self, name, child_uri, metadata={}, wait_for_numpeers=None):
222         """I add a child (by URI) at the specific name. I return a Deferred
223         that fires with the child node when the operation finishes. I will
224         replace any existing child of the same name.
225
226         The child_uri could be for a file, or for a directory (either
227         read-write or read-only, using a URI that came from get_uri() ).
228
229         If this directory node is read-only, the Deferred will errback with a
230         NotMutableError."""
231         return self.set_node(name, self._create_node(child_uri), metadata, wait_for_numpeers=wait_for_numpeers)
232
233     def set_node(self, name, child, metadata={}, wait_for_numpeers=None):
234         """I add a child at the specific name. I return a Deferred that fires
235         when the operation finishes. This Deferred will fire with the child
236         node that was just added. I will replace any existing child of the
237         same name.
238
239         If this directory node is read-only, the Deferred will errback with a
240         NotMutableError."""
241         if self.is_readonly():
242             return defer.fail(NotMutableError())
243         d = self._read()
244         def _add(children):
245             children[name] = (child, metadata)
246             new_contents = self._pack_contents(children)
247             return self._node.replace(new_contents, wait_for_numpeers=wait_for_numpeers)
248         d.addCallback(_add)
249         d.addCallback(lambda res: child)
250         return d
251
252     def add_file(self, name, uploadable, wait_for_numpeers=None):
253         """I upload a file (using the given IUploadable), then attach the
254         resulting FileNode to the directory at the given name. I return a
255         Deferred that fires (with the IFileNode of the uploaded file) when
256         the operation completes."""
257         if self.is_readonly():
258             return defer.fail(NotMutableError())
259         d = self._client.upload(uploadable, wait_for_numpeers=wait_for_numpeers)
260         d.addCallback(self._client.create_node_from_uri)
261         d.addCallback(lambda node: self.set_node(name, node, wait_for_numpeers=wait_for_numpeers))
262         return d
263
264     def delete(self, name):
265         """I remove the child at the specific name. I return a Deferred that
266         fires (with the node just removed) when the operation finishes."""
267         if self.is_readonly():
268             return defer.fail(NotMutableError())
269         d = self._read()
270         def _delete(children):
271             old_child, metadata = children[name]
272             del children[name]
273             new_contents = self._pack_contents(children)
274             d = self._node.replace(new_contents)
275             def _done(res):
276                 return old_child
277             d.addCallback(_done)
278             return d
279         d.addCallback(_delete)
280         return d
281
282     def create_empty_directory(self, name, wait_for_numpeers=None):
283         """I create and attach an empty directory at the given name. I return
284         a Deferred that fires (with the new directory node) when the
285         operation finishes."""
286         if self.is_readonly():
287             return defer.fail(NotMutableError())
288         d = self._client.create_empty_dirnode(wait_for_numpeers=wait_for_numpeers)
289         def _created(child):
290             d = self.set_node(name, child, wait_for_numpeers=wait_for_numpeers)
291             d.addCallback(lambda res: child)
292             return d
293         d.addCallback(_created)
294         return d
295
296     def move_child_to(self, current_child_name, new_parent,
297                       new_child_name=None, wait_for_numpeers=None):
298         """I take one of my children and move them to a new parent. The child
299         is referenced by name. On the new parent, the child will live under
300         'new_child_name', which defaults to 'current_child_name'. I return a
301         Deferred that fires when the operation finishes."""
302         if self.is_readonly() or new_parent.is_readonly():
303             return defer.fail(NotMutableError())
304         if new_child_name is None:
305             new_child_name = current_child_name
306         d = self.get(current_child_name)
307         def sn(child):
308             return new_parent.set_node(new_child_name, child,
309                                 wait_for_numpeers=wait_for_numpeers)
310         d.addCallback(sn)
311         d.addCallback(lambda child: self.delete(current_child_name))
312         return d
313
314     def build_manifest(self):
315         """Return a frozenset of verifier-capability strings for all nodes
316         (directories and files) reachable from this one."""
317
318         # this is just a tree-walker, except that following each edge
319         # requires a Deferred.
320
321         manifest = set()
322         manifest.add(self.get_verifier())
323
324         d = self._build_manifest_from_node(self, manifest)
325         def _done(res):
326             # LIT nodes have no verifier-capability: their data is stored
327             # inside the URI itself, so there is no need to refresh anything.
328             # They indicate this by returning None from their get_verifier
329             # method. We need to remove any such Nones from our set. We also
330             # want to convert all these caps into strings.
331             return frozenset([IVerifierURI(cap).to_string()
332                               for cap in manifest
333                               if cap is not None])
334         d.addCallback(_done)
335         return d
336
337     def _build_manifest_from_node(self, node, manifest):
338         d = node.list()
339         def _got_list(res):
340             dl = []
341             for name, (child, metadata) in res.iteritems():
342                 verifier = child.get_verifier()
343                 if verifier not in manifest:
344                     manifest.add(verifier)
345                     if IDirectoryNode.providedBy(child):
346                         dl.append(self._build_manifest_from_node(child,
347                                                                  manifest))
348             if dl:
349                 return defer.DeferredList(dl)
350         d.addCallback(_got_list)
351         return d
352
353 # use client.create_dirnode() to make one of these
354
355