import os, re, weakref, stat, struct, time
from itertools import chain

from foolscap import Referenceable
from twisted.application import service
from twisted.internet import defer

from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
     RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \
     BadWriteEnablerError
from allmydata.util import fileutil, idlib, mathutil
from allmydata.util.assertutil import precondition, _assert

class DataTooLargeError(Exception):
    pass

# storage/
# storage/shares/incoming
#   incoming/ holds temp dirs named $STORAGEINDEX/$SHARENUM which will be
#   moved to storage/shares/$STORAGEINDEX/$SHARENUM upon success
# storage/shares/$STORAGEINDEX
# storage/shares/$STORAGEINDEX/$SHARENUM

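# Illustrative sketch only (not used by the server): given the layout above,
# the incoming and final paths for one share could be built like this. The
# helper name and arguments are hypothetical.
def _example_share_paths(storedir, si_s, shnum):
    # si_s is the base32-encoded storage index, shnum is the share number
    incoming = os.path.join(storedir, "shares", "incoming", si_s, "%d" % shnum)
    final = os.path.join(storedir, "shares", si_s, "%d" % shnum)
    return (incoming, final)
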
# $SHARENUM matches this regex:
NUM_RE=re.compile("^[0-9]+$")

# each share file (in storage/shares/$SI/$SHNUM) contains lease information
# and share data. The share data is accessed by RIBucketWriter.write and
# RIBucketReader.read . The lease information is not accessible through these
# interfaces.

# The share file has the following layout:
#  0x00: share file version number, four bytes, current version is 1
#  0x04: share data length, four bytes big-endian = A
#  0x08: number of leases, four bytes big-endian
#  0x0c: beginning of share data (described below, at WriteBucketProxy)
#  A+0x0c = B: first lease. Lease format is:
#   B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
#   B+0x04: renew secret, 32 bytes (SHA256)
#   B+0x24: cancel secret, 32 bytes (SHA256)
#   B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
#   B+0x48: next lease, or end of record

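# Illustrative sketch only (not part of the server API): read the header and
# lease records of an immutable share file laid out as described above. The
# function name is hypothetical; ShareFile below is the real implementation.
def _example_read_immutable_share(filename):
    lease_format = ">L32s32sL"
    f = open(filename, 'rb')
    (version, data_length, num_leases) = struct.unpack(">LLL", f.read(0xc))
    assert version == 1
    f.seek(0xc + data_length) # leases start right after the share data
    leases = []
    for i in range(num_leases):
        leases.append(struct.unpack(lease_format,
                                    f.read(struct.calcsize(lease_format))))
    f.close()
    return (data_length, leases)
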
class ShareFile:
    LEASE_SIZE = struct.calcsize(">L32s32sL")

    def __init__(self, filename):
        self.home = filename
        f = open(self.home, 'rb')
        (version, size, num_leases) = struct.unpack(">LLL", f.read(0xc))
        assert version == 1
        self._size = size
        self._num_leases = num_leases
        self._data_offset = 0xc
        self._lease_offset = 0xc + self._size

    def read_share_data(self, offset, length):
        precondition(offset >= 0)
        precondition(offset+length <= self._size)
        f = open(self.home, 'rb')
        f.seek(self._data_offset+offset)
        return f.read(length)

    def write_share_data(self, offset, data):
        length = len(data)
        precondition(offset >= 0)
        precondition(offset+length <= self._size)
        f = open(self.home, 'rb+')
        real_offset = self._data_offset+offset
        f.seek(real_offset)
        assert f.tell() == real_offset
        f.write(data)
        f.close()

    def _write_lease_record(self, f, lease_number, lease_info):
        (owner_num, renew_secret, cancel_secret, expiration_time) = lease_info
        offset = self._lease_offset + lease_number * self.LEASE_SIZE
        f.seek(offset)
        assert f.tell() == offset
        f.write(struct.pack(">L32s32sL",
                            owner_num, renew_secret, cancel_secret,
                            int(expiration_time)))

    def _read_num_leases(self, f):
        f.seek(0x08)
        (num_leases,) = struct.unpack(">L", f.read(4))
        return num_leases

    def _write_num_leases(self, f, num_leases):
        f.seek(0x08)
        f.write(struct.pack(">L", num_leases))

    def _truncate_leases(self, f, num_leases):
        f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)

    def iter_leases(self):
        """Yields (ownernum, renew_secret, cancel_secret, expiration_time)
        for all leases."""
        f = open(self.home, 'rb')
        (version, size, num_leases) = struct.unpack(">LLL", f.read(0xc))
        f.seek(self._lease_offset)
        for i in range(num_leases):
            data = f.read(self.LEASE_SIZE)
            if data:
                yield struct.unpack(">L32s32sL", data)

    def add_lease(self, lease_info):
        f = open(self.home, 'rb+')
        num_leases = self._read_num_leases(f)
        self._write_lease_record(f, num_leases, lease_info)
        self._write_num_leases(f, num_leases+1)
        f.close()

    def renew_lease(self, renew_secret, new_expire_time):
        for i,(on,rs,cs,et) in enumerate(self.iter_leases()):
            if rs == renew_secret:
                # yup. See if we need to update the owner time.
                if new_expire_time > et:
                    # yes
                    new_lease = (on,rs,cs,new_expire_time)
                    f = open(self.home, 'rb+')
                    self._write_lease_record(f, i, new_lease)
                    f.close()
                return
        raise IndexError("unable to renew non-existent lease")

    def add_or_renew_lease(self, lease_info):
        owner_num, renew_secret, cancel_secret, expire_time = lease_info
        try:
            self.renew_lease(renew_secret, expire_time)
        except IndexError:
            self.add_lease(lease_info)

    def cancel_lease(self, cancel_secret):
        """Remove a lease with the given cancel_secret. Return
        (num_remaining_leases, space_freed). Raise IndexError if there was no
        lease with the given cancel_secret."""

        leases = list(self.iter_leases())
        num_leases = len(leases)
        num_leases_removed = 0
        for i,lease_info in enumerate(leases[:]):
            (on,rs,cs,et) = lease_info
            if cs == cancel_secret:
                leases[i] = None
                num_leases_removed += 1
        if not num_leases_removed:
            raise IndexError("unable to find matching lease to cancel")
        if num_leases_removed:
            # pack and write out the remaining leases. We write these out in
            # the same order as they were added, so that if we crash while
            # doing this, we won't lose any non-cancelled leases.
            leases = [l for l in leases if l] # remove the cancelled leases
            f = open(self.home, 'rb+')
            for i,lease in enumerate(leases):
                self._write_lease_record(f, i, lease)
            self._write_num_leases(f, len(leases))
            self._truncate_leases(f, len(leases))
            f.close()
        return len(leases), self.LEASE_SIZE * num_leases_removed


class BucketWriter(Referenceable):
    implements(RIBucketWriter)

    def __init__(self, ss, incominghome, finalhome, size, lease_info):
        self.ss = ss
        self.incominghome = incominghome
        self.finalhome = finalhome
        self._size = size
        self.closed = False
        self.throw_out_all_data = False
        # touch the file, so later callers will see that we're working on it.
        # Also construct the metadata.
        assert not os.path.exists(self.incominghome)
        f = open(self.incominghome, 'wb')
        f.write(struct.pack(">LLL", 1, size, 0))
        f.close()
        self._sharefile = ShareFile(self.incominghome)
        # also, add our lease to the file now, so that other ones can be
        # added by simultaneous uploaders
        self._sharefile.add_lease(lease_info)

    def allocated_size(self):
        return self._size

    def remote_write(self, offset, data):
        precondition(not self.closed)
        if self.throw_out_all_data:
            return
        self._sharefile.write_share_data(offset, data)

    def remote_close(self):
        precondition(not self.closed)
        fileutil.rename(self.incominghome, self.finalhome)
        self._sharefile = None
        self.closed = True

        filelen = os.stat(self.finalhome)[stat.ST_SIZE]
        self.ss.bucket_writer_closed(self, filelen)

        # if we were the last share to be moved, remove the incoming/
        # directory that was our parent
        parentdir = os.path.split(self.incominghome)[0]
        if not os.listdir(parentdir):
            os.rmdir(parentdir)


class BucketReader(Referenceable):
    implements(RIBucketReader)

    def __init__(self, home):
        self._share_file = ShareFile(home)

    def remote_read(self, offset, length):
        return self._share_file.read_share_data(offset, length)


# the MutableShareFile is like the ShareFile, but used for mutable data. It
# has a different layout. See docs/mutable.txt for more details.

# #   offset    size    name
# 1   0         32      magic verstr "tahoe mutable container v1" plus binary
# 2   32        32      write enabler's nodeid
# 3   64        32      write enabler
# 4   96        8       data size (actual share data present) (a)
# 5   104       8       offset of (8) count of extra leases (after data)
# 6   112       416     four leases, 104 bytes each
#                        0    4   ownerid (0 means "no lease here")
#                        4    4   expiration timestamp
#                        8   32   renewal token
#                        40  32   cancel token
#                        72  32   nodeid which accepted the tokens
# 7   528       (a)     data
# 8   ??        4       count of extra leases
# 9   ??        n*104    extra leases


assert struct.calcsize(">L") == 4, struct.calcsize(">L")
assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")

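# Illustrative sketch only (not part of the server API): unpack the fixed
# header of a mutable container laid out as described above. The function
# name is hypothetical; MutableShareFile below is the real implementation.
def _example_read_mutable_header(filename):
    f = open(filename, 'rb')
    header = f.read(struct.calcsize(">32s32s32sQQ"))
    f.close()
    (magic, write_enabler_nodeid, write_enabler,
     data_length, extra_lease_offset) = struct.unpack(">32s32s32sQQ", header)
    # the four fixed lease slots follow this header; share data starts at 528
    return (magic, write_enabler_nodeid, write_enabler,
            data_length, extra_lease_offset)
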
class MutableShareFile:

    DATA_LENGTH_OFFSET = struct.calcsize(">32s32s32s")
    EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
    HEADER_SIZE = struct.calcsize(">32s32s32sQQ") # doesn't include leases
    LEASE_SIZE = struct.calcsize(">LL32s32s32s")
    assert LEASE_SIZE == 104
    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
    assert DATA_OFFSET == 528, DATA_OFFSET
    # our sharefiles start with a recognizable string, plus some random
    # binary data to reduce the chance that a regular text file will look
    # like a sharefile.
    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
    assert len(MAGIC) == 32
    MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
    # TODO: decide upon a policy for max share size

    def __init__(self, filename):
        self.home = filename
        if os.path.exists(self.home):
            # we don't cache anything, just check the magic
            f = open(self.home, 'rb')
            data = f.read(self.HEADER_SIZE)
            (magic,
             write_enabler_nodeid, write_enabler,
             data_length, extra_lease_offset) = \
             struct.unpack(">32s32s32sQQ", data)
            assert magic == self.MAGIC


    def create(self, my_nodeid, write_enabler):
        assert not os.path.exists(self.home)
        data_length = 0
        extra_lease_offset = (self.HEADER_SIZE
                              + 4 * self.LEASE_SIZE
                              + data_length)
        assert extra_lease_offset == self.DATA_OFFSET # true at creation
        num_extra_leases = 0
        f = open(self.home, 'wb')
        header = struct.pack(">32s32s32sQQ",
                             self.MAGIC, my_nodeid, write_enabler,
                             data_length, extra_lease_offset,
                             )
        leases = ("\x00"*self.LEASE_SIZE) * 4
        f.write(header + leases)
        # data goes here, empty after creation
        f.write(struct.pack(">L", num_extra_leases))
        # extra leases go here, none at creation
        f.close()

    def _read_data_length(self, f):
        f.seek(self.DATA_LENGTH_OFFSET)
        (data_length,) = struct.unpack(">Q", f.read(8))
        return data_length

    def _write_data_length(self, f, data_length):
        f.seek(self.DATA_LENGTH_OFFSET)
        f.write(struct.pack(">Q", data_length))

    def _read_share_data(self, f, offset, length):
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        if offset+length > data_length:
            # reads beyond the end of the data are truncated. Reads that
            # start beyond the end of the data return an empty string.
            length = max(0, data_length-offset)
        if length == 0:
            return ""
        precondition(offset+length <= data_length)
        f.seek(self.DATA_OFFSET+offset)
        data = f.read(length)
        return data

    def _read_extra_lease_offset(self, f):
        f.seek(self.EXTRA_LEASE_OFFSET)
        (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
        return extra_lease_offset

    def _write_extra_lease_offset(self, f, offset):
        f.seek(self.EXTRA_LEASE_OFFSET)
        f.write(struct.pack(">Q", offset))

    def _read_num_extra_leases(self, f):
        offset = self._read_extra_lease_offset(f)
        f.seek(offset)
        (num_extra_leases,) = struct.unpack(">L", f.read(4))
        return num_extra_leases

    def _write_num_extra_leases(self, f, num_leases):
        extra_lease_offset = self._read_extra_lease_offset(f)
        f.seek(extra_lease_offset)
        f.write(struct.pack(">L", num_leases))

    def _change_container_size(self, f, new_container_size):
        if new_container_size > self.MAX_SIZE:
            raise DataTooLargeError()
        old_extra_lease_offset = self._read_extra_lease_offset(f)
        new_extra_lease_offset = self.DATA_OFFSET + new_container_size
        if new_extra_lease_offset < old_extra_lease_offset:
            # TODO: allow containers to shrink. For now they remain large.
            return
        num_extra_leases = self._read_num_extra_leases(f)
        f.seek(old_extra_lease_offset)
        extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
        f.seek(new_extra_lease_offset)
        f.write(extra_lease_data)
        # an interrupt here will corrupt the leases, iff the move caused the
        # extra leases to overlap.
        self._write_extra_lease_offset(f, new_extra_lease_offset)

    def _write_share_data(self, f, offset, data):
        length = len(data)
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        extra_lease_offset = self._read_extra_lease_offset(f)

        if offset+length >= data_length:
            # They are expanding their data size.
            if self.DATA_OFFSET+offset+length > extra_lease_offset:
                # Their new data won't fit in the current container, so we
                # have to move the leases. With luck, they're expanding it
                # more than the size of the extra lease block, which will
                # minimize the corrupt-the-share window
                self._change_container_size(f, offset+length)
                extra_lease_offset = self._read_extra_lease_offset(f)

                # an interrupt here is ok.. the container has been enlarged
                # but the data remains untouched

            assert self.DATA_OFFSET+offset+length <= extra_lease_offset
            # Their data now fits in the current container. We must write
            # their new data and modify the recorded data size.
            new_data_length = offset+length
            self._write_data_length(f, new_data_length)
            # an interrupt here will result in a corrupted share

        # now all that's left to do is write out their data
        f.seek(self.DATA_OFFSET+offset)
        f.write(data)
        return

    def _write_lease_record(self, f, lease_number, lease_info):
        (ownerid, expiration_time,
         renew_secret, cancel_secret, nodeid) = lease_info
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            # must add an extra lease record
            self._write_num_extra_leases(f, num_extra_leases+1)
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        f.seek(offset)
        assert f.tell() == offset
        f.write(struct.pack(">LL32s32s32s",
                            ownerid, int(expiration_time),
                            renew_secret, cancel_secret, nodeid))

    def _read_lease_record(self, f, lease_number):
        # returns a 5-tuple of lease info, or None
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            raise IndexError("No such lease number %d" % lease_number)
        f.seek(offset)
        assert f.tell() == offset
        data = f.read(self.LEASE_SIZE)
        lease_info = struct.unpack(">LL32s32s32s", data)
        (ownerid, expiration_time,
         renew_secret, cancel_secret, nodeid) = lease_info
        if ownerid == 0:
            return None
        return lease_info

    def _get_num_lease_slots(self, f):
        # how many places do we have allocated for leases? Not all of them
        # are filled.
        num_extra_leases = self._read_num_extra_leases(f)
        return 4+num_extra_leases

    def _get_first_empty_lease_slot(self, f):
        # return an int with the index of an empty slot, or None if we do not
        # currently have an empty slot

        for i in range(self._get_num_lease_slots(f)):
            if self._read_lease_record(f, i) is None:
                return i
        return None

    def _enumerate_leases(self, f):
        """Yields (leasenum, (ownerid, expiration_time, renew_secret,
        cancel_secret, accepting_nodeid)) for all leases."""
        for i in range(self._get_num_lease_slots(f)):
            try:
                data = self._read_lease_record(f, i)
                if data is not None:
                    yield (i,data)
            except IndexError:
                return

    def debug_get_leases(self):
        f = open(self.home, 'rb')
        leases = list(self._enumerate_leases(f))
        f.close()
        return leases

    def add_lease(self, lease_info):
        f = open(self.home, 'rb+')
        num_lease_slots = self._get_num_lease_slots(f)
        empty_slot = self._get_first_empty_lease_slot(f)
        if empty_slot is not None:
            self._write_lease_record(f, empty_slot, lease_info)
        else:
            self._write_lease_record(f, num_lease_slots, lease_info)
        f.close()

    def renew_lease(self, renew_secret, new_expire_time):
        accepting_nodeids = set()
        f = open(self.home, 'rb+')
        for (leasenum,(oid,et,rs,cs,anid)) in self._enumerate_leases(f):
            if rs == renew_secret:
                # yup. See if we need to update the owner time.
                if new_expire_time > et:
                    # yes
                    new_lease = (oid,new_expire_time,rs,cs,anid)
                    self._write_lease_record(f, leasenum, new_lease)
                f.close()
                return
            accepting_nodeids.add(anid)
        f.close()
        # Return the accepting_nodeids set, to give the client a chance to
        # update the leases on a share which has been migrated from its
        # original server to a new one.
        msg = ("Unable to renew non-existent lease. I have leases accepted by"
               " nodeids: ")
        msg += ",".join([("'%s'" % idlib.b2a(anid))
                         for anid in accepting_nodeids])
        msg += " ."
        raise IndexError(msg)

    def add_or_renew_lease(self, lease_info):
        ownerid, expire_time, renew_secret, cancel_secret, anid = lease_info
        try:
            self.renew_lease(renew_secret, expire_time)
        except IndexError:
            self.add_lease(lease_info)

    def cancel_lease(self, cancel_secret):
        """Remove any leases with the given cancel_secret. Return
        (num_remaining_leases, space_freed). Raise IndexError if there was no
        lease with the given cancel_secret."""

        accepting_nodeids = set()
        modified = 0
        remaining = 0
        blank = "\x00"*32
        blank_lease = (0, 0, blank, blank, blank)
        f = open(self.home, 'rb+')
        for (leasenum,(oid,et,rs,cs,anid)) in self._enumerate_leases(f):
            accepting_nodeids.add(anid)
            if cs == cancel_secret:
                self._write_lease_record(f, leasenum, blank_lease)
                modified += 1
            else:
                remaining += 1
        if modified:
            freed_space = self._pack_leases(f)
            f.close()
            return (remaining, freed_space)
        msg = ("Unable to cancel non-existent lease. I have leases "
               "accepted by nodeids: ")
        msg += ",".join([("'%s'" % idlib.b2a(anid))
                         for anid in accepting_nodeids])
        msg += " ."
        raise IndexError(msg)

    def _pack_leases(self, f):
        # TODO: reclaim space from cancelled leases
        return 0

    def _read_write_enabler_and_nodeid(self, f):
        f.seek(0)
        data = f.read(self.HEADER_SIZE)
        (magic,
         write_enabler_nodeid, write_enabler,
         data_length, extra_lease_offset) = \
         struct.unpack(">32s32s32sQQ", data)
        assert magic == self.MAGIC
        return (write_enabler, write_enabler_nodeid)

    def readv(self, readv):
        datav = []
        f = open(self.home, 'rb')
        for (offset, length) in readv:
            datav.append(self._read_share_data(f, offset, length))
        f.close()
        return datav

#    def remote_get_length(self):
#        f = open(self.home, 'rb')
#        data_length = self._read_data_length(f)
#        f.close()
#        return data_length

    def check_write_enabler(self, write_enabler):
        f = open(self.home, 'rb+')
        (real_write_enabler, write_enabler_nodeid) = \
                             self._read_write_enabler_and_nodeid(f)
        f.close()
        if write_enabler != real_write_enabler:
            # accommodate share migration by reporting the nodeid used for
            # the old write enabler.
            msg = "The write enabler was recorded by nodeid '%s'." % \
                  (idlib.b2a(write_enabler_nodeid),)
            raise BadWriteEnablerError(msg)

    def check_testv(self, testv):
        test_good = True
        f = open(self.home, 'rb+')
        for (offset, length, operator, specimen) in testv:
            data = self._read_share_data(f, offset, length)
            if not self.compare(data, operator, specimen):
                test_good = False
                break
        f.close()
        return test_good

    def writev(self, datav, new_length):
        f = open(self.home, 'rb+')
        for (offset, data) in datav:
            self._write_share_data(f, offset, data)
        if new_length is not None:
            self._change_container_size(f, new_length)
            f.seek(self.DATA_LENGTH_OFFSET)
            f.write(struct.pack(">Q", new_length))
        f.close()

    def compare(self, a, op, b):
        assert op in ("lt", "le", "eq", "ne", "ge", "gt")
        if op == "lt":
            return a < b
        if op == "le":
            return a <= b
        if op == "eq":
            return a == b
        if op == "ne":
            return a != b
        if op == "ge":
            return a >= b
        if op == "gt":
            return a > b
        # never reached

class EmptyShare:

    def check_testv(self, testv):
        test_good = True
        for (offset, length, operator, specimen) in testv:
            data = ""
            if not self.compare(data, operator, specimen):
                test_good = False
                break
        return test_good

    def compare(self, a, op, b):
        assert op in ("lt", "le", "eq", "ne", "ge", "gt")
        if op == "lt":
            return a < b
        if op == "le":
            return a <= b
        if op == "eq":
            return a == b
        if op == "ne":
            return a != b
        if op == "ge":
            return a >= b
        if op == "gt":
            return a > b
        # never reached

def create_mutable_sharefile(filename, my_nodeid, write_enabler):
    ms = MutableShareFile(filename)
    ms.create(my_nodeid, write_enabler)
    del ms
    return MutableShareFile(filename)


class StorageServer(service.MultiService, Referenceable):
    implements(RIStorageServer)
    name = 'storageserver'

    def __init__(self, storedir, sizelimit=None, no_storage=False):
        service.MultiService.__init__(self)
        self.storedir = storedir
        sharedir = os.path.join(storedir, "shares")
        fileutil.make_dirs(sharedir)
        self.sharedir = sharedir
        self.sizelimit = sizelimit
        self.no_storage = no_storage
        self.incomingdir = os.path.join(sharedir, 'incoming')
        self._clean_incomplete()
        fileutil.make_dirs(self.incomingdir)
        self._active_writers = weakref.WeakKeyDictionary()
        self.measure_size()

    def setNodeID(self, nodeid):
        # somebody must set this before any slots can be created or leases
        # added
        self.my_nodeid = nodeid

    def startService(self):
        service.MultiService.startService(self)
        if self.parent:
            nodeid = self.parent.nodeid # 20 bytes, binary
            assert len(nodeid) == 20
            self.setNodeID(nodeid + "\x00"*12) # make it 32 bytes
            # TODO: review this 20-vs-32 thing, settle on one or the other

    def _clean_incomplete(self):
        fileutil.rm_dir(self.incomingdir)

    def measure_size(self):
        self.consumed = fileutil.du(self.sharedir)

    def allocated_size(self):
        space = self.consumed
        for bw in self._active_writers:
            space += bw.allocated_size()
        return space

    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_s = idlib.b2a(storage_index)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = time.time() + 31*24*60*60
        lease_info = (owner_num, renew_secret, cancel_secret, expire_time)

        space_per_bucket = allocated_size
        no_limits = self.sizelimit is None
        yes_limits = not no_limits
        if yes_limits:
            remaining_space = self.sizelimit - self.allocated_size()

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in chain(self._get_bucket_shares(storage_index),
                                 self._get_incoming_shares(storage_index)):
            alreadygot.add(shnum)
            sf = ShareFile(fn)
            sf.add_or_renew_lease(lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum)
            if os.path.exists(incominghome) or os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif no_limits or remaining_space >= space_per_bucket:
                # ok! we need to create the new share file.
                fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
                bw = BucketWriter(self, incominghome, finalhome,
                                  space_per_bucket, lease_info)
                if self.no_storage:
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if yes_limits:
                    remaining_space -= space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_s))

        return alreadygot, bucketwriters

    def remote_renew_lease(self, storage_index, renew_secret):
        new_expire_time = time.time() + 31*24*60*60
        found_buckets = False
        for shnum, filename in self._get_bucket_shares(storage_index):
            found_buckets = True
            f = open(filename, 'rb')
            header = f.read(32)
            f.close()
            if header[:32] == MutableShareFile.MAGIC:
                sf = MutableShareFile(filename)
                # note: if the share has been migrated, the renew_lease()
                # call will throw an exception, with information to help the
                # client update the lease.
            elif header[:4] == struct.pack(">L", 1):
                sf = ShareFile(filename)
            else:
                continue # skip non-sharefiles rather than reuse a stale 'sf'
            sf.renew_lease(renew_secret, new_expire_time)
        if not found_buckets:
            raise IndexError("no such lease to renew")

    def remote_cancel_lease(self, storage_index, cancel_secret):
        storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))

        remaining_files = 0
        total_space_freed = 0
        found_buckets = False
        for shnum, filename in self._get_bucket_shares(storage_index):
            # note: if we can't find a lease on one share, we won't bother
            # looking in the others. Unless something broke internally
            # (perhaps we ran out of disk space while adding a lease), the
            # leases on all shares will be identical.
            found_buckets = True
            f = open(filename, 'rb')
            header = f.read(32)
            f.close()
            if header[:32] == MutableShareFile.MAGIC:
                sf = MutableShareFile(filename)
                # note: if the share has been migrated, the cancel_lease()
                # call will throw an exception, with information to help the
                # client update the lease.
            elif header[:4] == struct.pack(">L", 1):
                sf = ShareFile(filename)
            else:
                continue # skip non-sharefiles rather than reuse a stale 'sf'
            # this raises IndexError if the lease wasn't present
            remaining_leases, space_freed = sf.cancel_lease(cancel_secret)
            total_space_freed += space_freed
            if remaining_leases:
                remaining_files += 1
            else:
                # now remove the sharefile. We'll almost certainly be
                # removing the entire directory soon.
                filelen = os.stat(filename)[stat.ST_SIZE]
                os.unlink(filename)
                total_space_freed += filelen
        if not remaining_files:
            fileutil.rm_dir(storagedir)
        self.consumed -= total_space_freed
        if not found_buckets:
            raise IndexError("no such lease to cancel")

    def bucket_writer_closed(self, bw, consumed_size):
        self.consumed += consumed_size
        del self._active_writers[bw]

    def _get_bucket_shares(self, storage_index):
        """Yield (shnum, pathname) tuples for files that hold shares for
        this storage_index. In each tuple, 'shnum' will always be the
        integer form of the last component of 'pathname'."""
        storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
        try:
            for f in os.listdir(storagedir):
                if NUM_RE.match(f):
                    filename = os.path.join(storagedir, f)
                    yield (int(f), filename)
        except OSError:
            # Commonly caused by there being no buckets at all.
            pass

    def _get_incoming_shares(self, storage_index):
        incomingdir = os.path.join(self.incomingdir, idlib.b2a(storage_index))
        try:
            for f in os.listdir(incomingdir):
                if NUM_RE.match(f):
                    filename = os.path.join(incomingdir, f)
                    yield (int(f), filename)
        except OSError:
            pass

    def remote_get_buckets(self, storage_index):
        bucketreaders = {} # k: sharenum, v: BucketReader
        for shnum, filename in self._get_bucket_shares(storage_index):
            bucketreaders[shnum] = BucketReader(filename)
        return bucketreaders

    def get_leases(self, storage_index):
        """Provide an iterator that yields all of the leases attached to this
        bucket. Each lease is returned as a tuple of (owner_num,
        renew_secret, cancel_secret, expiration_time).

        This method is not for client use.
        """

        # since all shares get the same lease data, we just grab the leases
        # from the first share
        try:
            shnum, filename = self._get_bucket_shares(storage_index).next()
            sf = ShareFile(filename)
            return sf.iter_leases()
        except StopIteration:
            return iter([])

    def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                               secrets,
                                               test_and_write_vectors,
                                               read_vector):
        si_s = idlib.b2a(storage_index)
        (write_enabler, renew_secret, cancel_secret) = secrets
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_s)
        shares = {}
        if os.path.isdir(bucketdir):
            for sharenum_s in os.listdir(bucketdir):
                try:
                    sharenum = int(sharenum_s)
                except ValueError:
                    continue
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename)
                msf.check_write_enabler(write_enabler)
                shares[sharenum] = msf
        # write_enabler is good for all existing shares.

        # Now evaluate test vectors.
        testv_is_good = True
        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if sharenum in shares:
                if not shares[sharenum].check_testv(testv):
                    testv_is_good = False
                    break
            else:
                # compare the vectors against an empty share, in which all
                # reads return empty strings.
                if not EmptyShare().check_testv(testv):
                    testv_is_good = False
                    break

        # now gather the read vectors, before we do any writes
        read_data = {}
        for sharenum, share in shares.items():
            read_data[sharenum] = share.readv(read_vector)

        if testv_is_good:
            # now apply the write vectors
            for sharenum in test_and_write_vectors:
                (testv, datav, new_length) = test_and_write_vectors[sharenum]
                if sharenum not in shares:
                    # allocate a new share
                    allocated_size = 2000 # arbitrary, really
                    share = self._allocate_slot_share(bucketdir, secrets,
                                                      sharenum,
                                                      allocated_size,
                                                      owner_num=0)
                    shares[sharenum] = share
                shares[sharenum].writev(datav, new_length)
            # and update the leases on all shares
            ownerid = 1 # TODO
            expire_time = time.time() + 31*24*60*60   # one month
            my_nodeid = self.my_nodeid
            anid = my_nodeid
            lease_info = (ownerid, expire_time, renew_secret, cancel_secret,
                          anid)
            for share in shares.values():
                share.add_or_renew_lease(lease_info)

        # all done
        return (testv_is_good, read_data)

    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
                             allocated_size, owner_num=0):
        (write_enabler, renew_secret, cancel_secret) = secrets
        my_nodeid = self.my_nodeid
        fileutil.make_dirs(bucketdir)
        filename = os.path.join(bucketdir, "%d" % sharenum)
        share = create_mutable_sharefile(filename, my_nodeid, write_enabler)
        return share

    def remote_slot_readv(self, storage_index, shares, readv):
        si_s = idlib.b2a(storage_index)
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_s)
        if not os.path.isdir(bucketdir):
            return {}
        datavs = {}
        for sharenum_s in os.listdir(bucketdir):
            try:
                sharenum = int(sharenum_s)
            except ValueError:
                continue
            if sharenum in shares or not shares:
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename)
                datavs[sharenum] = msf.readv(readv)
        return datavs



# the code before here runs on the storage server, not the client
# the code beyond here runs on the client, not the storage server

"""
Share data is written into a single file. At the start of the file, there is
a series of four-byte big-endian offset values, which indicate where each
section starts. Each offset is measured from the beginning of the file.

0x00: version number (=00 00 00 01)
0x04: segment size
0x08: data size
0x0c: offset of data (=00 00 00 24)
0x10: offset of plaintext_hash_tree
0x14: offset of crypttext_hash_tree
0x18: offset of block_hashes
0x1c: offset of share_hashes
0x20: offset of uri_extension_length + uri_extension
0x24: start of data
?   : start of plaintext_hash_tree
?   : start of crypttext_hash_tree
?   : start of block_hashes
?   : start of share_hashes
       each share_hash is written as a two-byte (big-endian) hashnum
       followed by the 32-byte SHA-256 hash. We only store the hashes
       necessary to validate the share hash root
?   : start of uri_extension_length (four-byte big-endian value)
?   : start of uri_extension
"""

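# Illustrative sketch only (not used by the proxies below): unpack the 0x24
# byte offset table described above. The function name is hypothetical;
# ReadBucketProxy._parse_offsets is the real implementation.
def _example_parse_offset_table(header):
    assert len(header) == 0x24
    (version, segment_size, data_size,
     data_o, plaintext_hash_tree_o, crypttext_hash_tree_o,
     block_hashes_o, share_hashes_o, uri_extension_o) = \
     struct.unpack(">LLLLLLLLL", header)
    assert version == 1
    return {"segment_size": segment_size, "data_size": data_size,
            "data": data_o, "plaintext_hash_tree": plaintext_hash_tree_o,
            "crypttext_hash_tree": crypttext_hash_tree_o,
            "block_hashes": block_hashes_o, "share_hashes": share_hashes_o,
            "uri_extension": uri_extension_o}
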
def allocated_size(data_size, num_segments, num_share_hashes,
                   uri_extension_size):
    wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
                           uri_extension_size)
    uri_extension_starts_at = wbp._offsets['uri_extension']
    return uri_extension_starts_at + 4 + uri_extension_size

class WriteBucketProxy:
    implements(IStorageBucketWriter)
    def __init__(self, rref, data_size, segment_size, num_segments,
                 num_share_hashes, uri_extension_size):
        self._rref = rref
        self._data_size = data_size
        self._segment_size = segment_size
        self._num_segments = num_segments

        effective_segments = mathutil.next_power_of_k(num_segments,2)
        self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
        # how many share hashes are included in each share? This will be
        # about ln2(num_shares).
        self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
        # we commit to not sending a uri extension larger than this
        self._uri_extension_size = uri_extension_size

        offsets = self._offsets = {}
        x = 0x24
        offsets['data'] = x
        x += data_size
        offsets['plaintext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['crypttext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['block_hashes'] = x
        x += self._segment_hash_size
        offsets['share_hashes'] = x
        x += self._share_hash_size
        offsets['uri_extension'] = x

        offset_data = struct.pack(">LLLLLLLLL",
                                  1, # version number
                                  segment_size,
                                  data_size,
                                  offsets['data'],
                                  offsets['plaintext_hash_tree'],
                                  offsets['crypttext_hash_tree'],
                                  offsets['block_hashes'],
                                  offsets['share_hashes'],
                                  offsets['uri_extension'],
                                  )
        assert len(offset_data) == 0x24
        self._offset_data = offset_data

    def start(self):
        return self._write(0, self._offset_data)

    def put_block(self, segmentnum, data):
        offset = self._offsets['data'] + segmentnum * self._segment_size
        assert offset + len(data) <= self._offsets['uri_extension']
        assert isinstance(data, str)
        if segmentnum < self._num_segments-1:
            precondition(len(data) == self._segment_size,
                         len(data), self._segment_size)
        else:
            precondition(len(data) == (self._data_size -
                                       (self._segment_size *
                                        (self._num_segments - 1))),
                         len(data), self._segment_size)
        return self._write(offset, data)

    def put_plaintext_hashes(self, hashes):
        offset = self._offsets['plaintext_hash_tree']
        assert isinstance(hashes, list)
        data = "".join(hashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset+len(data) <= self._offsets['crypttext_hash_tree'],
                     offset, len(data), offset+len(data),
                     self._offsets['crypttext_hash_tree'])
        return self._write(offset, data)

    def put_crypttext_hashes(self, hashes):
        offset = self._offsets['crypttext_hash_tree']
        assert isinstance(hashes, list)
        data = "".join(hashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset + len(data) <= self._offsets['block_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['block_hashes'])
        return self._write(offset, data)

    def put_block_hashes(self, blockhashes):
        offset = self._offsets['block_hashes']
        assert isinstance(blockhashes, list)
        data = "".join(blockhashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset + len(data) <= self._offsets['share_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['share_hashes'])
        return self._write(offset, data)

    def put_share_hashes(self, sharehashes):
        # sharehashes is a list of (index, hash) tuples, so they get stored
        # as 2+32=34 bytes each
        offset = self._offsets['share_hashes']
        assert isinstance(sharehashes, list)
        data = "".join([struct.pack(">H", hashnum) + hashvalue
                        for hashnum,hashvalue in sharehashes])
        precondition(len(data) == self._share_hash_size,
                     len(data), self._share_hash_size)
        precondition(offset + len(data) <= self._offsets['uri_extension'],
                     offset, len(data), offset+len(data),
                     self._offsets['uri_extension'])
        return self._write(offset, data)

    def put_uri_extension(self, data):
        offset = self._offsets['uri_extension']
        assert isinstance(data, str)
        precondition(len(data) <= self._uri_extension_size,
                     len(data), self._uri_extension_size)
        length = struct.pack(">L", len(data))
        return self._write(offset, length+data)

    def _write(self, offset, data):
        # TODO: for small shares, buffer the writes and do just a single call
        return self._rref.callRemote("write", offset, data)

    def close(self):
        return self._rref.callRemote("close")

class ReadBucketProxy:
    implements(IStorageBucketReader)
    def __init__(self, rref):
        self._rref = rref
        self._started = False

    def startIfNecessary(self):
        if self._started:
            return defer.succeed(self)
        d = self.start()
        d.addCallback(lambda res: self)
        return d

    def start(self):
        # TODO: for small shares, read the whole bucket in start()
        d = self._read(0, 0x24)
        d.addCallback(self._parse_offsets)
        return d

    def _parse_offsets(self, data):
        precondition(len(data) == 0x24)
        self._offsets = {}
        (version, self._segment_size, self._data_size) = \
                  struct.unpack(">LLL", data[0:0xc])
        _assert(version == 1)
        x = 0x0c
        for field in ( 'data',
                       'plaintext_hash_tree',
                       'crypttext_hash_tree',
                       'block_hashes',
                       'share_hashes',
                       'uri_extension',
                       ):
            offset = struct.unpack(">L", data[x:x+4])[0]
            x += 4
            self._offsets[field] = offset
        return self._offsets

    def get_block(self, blocknum):
        num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
        if blocknum < num_segments-1:
            size = self._segment_size
        else:
            size = self._data_size % self._segment_size
            if size == 0:
                size = self._segment_size
        offset = self._offsets['data'] + blocknum * self._segment_size
        return self._read(offset, size)

    def _str2l(self, s):
        """ split string (pulled from storage) into a list of blockids """
        return [ s[i:i+HASH_SIZE]
                 for i in range(0, len(s), HASH_SIZE) ]

    def get_plaintext_hashes(self):
        offset = self._offsets['plaintext_hash_tree']
        size = self._offsets['crypttext_hash_tree'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_crypttext_hashes(self):
        offset = self._offsets['crypttext_hash_tree']
        size = self._offsets['block_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_block_hashes(self):
        offset = self._offsets['block_hashes']
        size = self._offsets['share_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_share_hashes(self):
        offset = self._offsets['share_hashes']
        size = self._offsets['uri_extension'] - offset
        assert size % (2+HASH_SIZE) == 0
        d = self._read(offset, size)
        def _unpack_share_hashes(data):
            assert len(data) == size
            hashes = []
            for i in range(0, size, 2+HASH_SIZE):
                hashnum = struct.unpack(">H", data[i:i+2])[0]
                hashvalue = data[i+2:i+2+HASH_SIZE]
                hashes.append( (hashnum, hashvalue) )
            return hashes
        d.addCallback(_unpack_share_hashes)
        return d

    def get_uri_extension(self):
        offset = self._offsets['uri_extension']
        d = self._read(offset, 4)
        def _got_length(data):
            length = struct.unpack(">L", data)[0]
            return self._read(offset+4, length)
        d.addCallback(_got_length)
        return d

    def _read(self, offset, length):
        return self._rref.callRemote("read", offset, length)