2 from twisted.trial import unittest
4 from twisted.application import service
5 from twisted.internet import defer
6 from foolscap import Referenceable
7 import time, os.path, stat
9 from allmydata import interfaces
10 from allmydata.util import fileutil, hashutil, idlib
11 from allmydata.storage import BucketWriter, BucketReader, \
12 WriteBucketProxy, ReadBucketProxy, StorageServer
13 from allmydata.interfaces import BadWriteEnablerError
class Bucket(unittest.TestCase):
    """Tests for the immutable-share BucketWriter/BucketReader pair,
    exercising them directly (no proxy, no wire protocol)."""
def make_workdir(self, name):
    """Create storage/Bucket/<name>/ (plus its tmp/ subdirectory) and
    return the (incoming, final) bucket file paths inside it."""
    base = os.path.join("storage", "Bucket", name)
    tmpdir = os.path.join(base, "tmp")
    fileutil.make_dirs(base)
    fileutil.make_dirs(tmpdir)
    # shares are written into tmp/bucket and promoted to bucket on close
    return os.path.join(tmpdir, "bucket"), os.path.join(base, "bucket")
def bucket_writer_closed(self, bw, consumed):
    # NOTE(review): the body of this callback is not visible in this
    # chunk -- BucketWriter calls it on close; presumably a no-op here.
    # The lines below appear to belong to a make_lease() method whose
    # `def` line (and the owner_num assignment) are also not visible.
    renew_secret = os.urandom(32)
    cancel_secret = os.urandom(32)
    expiration_time = time.time() + 5000
    # NOTE(review): owner_num is assigned on a line not shown here.
    return (owner_num, renew_secret, cancel_secret, expiration_time)
def test_create(self):
    """Create a 200-byte bucket and fill it with four sequential writes."""
    incoming, final = self.make_workdir("test_create")
    bw = BucketWriter(self, incoming, final, 200, self.make_lease())
    bw.remote_write(0, "a"*25)
    bw.remote_write(25, "b"*25)
    bw.remote_write(50, "c"*25)
    bw.remote_write(75, "d"*7)
    # NOTE(review): the closing call (presumably bw.remote_close()) is
    # not visible in this chunk -- confirm against the full file.
def test_readwrite(self):
    """Write a 57-byte share, then read the same ranges back."""
    incoming, final = self.make_workdir("test_readwrite")
    bw = BucketWriter(self, incoming, final, 200, self.make_lease())
    bw.remote_write(0, "a"*25)
    bw.remote_write(25, "b"*25)
    bw.remote_write(50, "c"*7) # last block may be short
    # NOTE(review): the bw.remote_close() between write and read is not
    # visible in this chunk; the reader below requires the final file.
    br = BucketReader(final)
    self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
    self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
    self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
def callRemote(self, methname, *args, **kwargs):
    # Fake "remote" dispatch: invoke remote_<methname> on the wrapped
    # target, via a Deferred, like foolscap would.
    # NOTE(review): the class header (presumably a RemoteBucket stub) and
    # the nested `def _call():` line are not visible in this chunk;
    # `_call` is otherwise undefined here.
    meth = getattr(self.target, "remote_" + methname)
    return meth(*args, **kwargs)
    return defer.maybeDeferred(_call)
class BucketProxy(unittest.TestCase):
    """Tests for WriteBucketProxy/ReadBucketProxy, the share-layout
    wrappers used over (here, faked) remote bucket references."""
def make_bucket(self, name, size):
    """Build a BucketWriter of `size` bytes under storage/BucketProxy/<name>."""
    basedir = os.path.join("storage", "BucketProxy", name)
    incoming = os.path.join(basedir, "tmp", "bucket")
    final = os.path.join(basedir, "bucket")
    fileutil.make_dirs(basedir)
    fileutil.make_dirs(os.path.join(basedir, "tmp"))
    bw = BucketWriter(self, incoming, final, size, self.make_lease())
    # NOTE(review): the remainder of make_bucket (wrapping bw in the fake
    # remote reference and returning (bw, rb, final)) plus the `def
    # make_lease` line are not visible in this chunk. The lines below
    # belong to make_lease; owner_num is assigned on a line not shown.
    renew_secret = os.urandom(32)
    cancel_secret = os.urandom(32)
    expiration_time = time.time() + 5000
    return (owner_num, renew_secret, cancel_secret, expiration_time)
def bucket_writer_closed(self, bw, consumed):
    # NOTE(review): callback body not visible in this chunk; BucketWriter
    # calls this on close -- presumably a no-op for these tests.
def test_create(self):
    """A WriteBucketProxy should provide IStorageBucketWriter."""
    bw, rb, final = self.make_bucket("test_create", 500)
    bp = WriteBucketProxy(rb,
                          # NOTE(review): several constructor arguments
                          # (sizes/counts) are not visible in this chunk.
                          uri_extension_size=500)
    self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
def test_readwrite(self):
    """Round-trip a full share through WriteBucketProxy/ReadBucketProxy."""
    # NOTE(review): several lines are not visible in this chunk: the
    # `for i in range(...)]` tails of the four hash-list comprehensions,
    # the remaining WriteBucketProxy constructor arguments, and the
    # initialization of `d` (presumably the proxy's start() Deferred).
    # Let's pretend each share has 100 bytes of data, and that there are
    # 4 segments (25 bytes each), and 8 shares total. So the three
    # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
    # block_hashes) will have 4 leaves and 7 nodes each. The per-share
    # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
    # nodes. Furthermore, let's assume the uri_extension is 500 bytes
    # long. That should make the whole share:
    # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
    plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
    crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
    block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
    share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
    uri_extension = "s" + "E"*498 + "e"
    bw, rb, final = self.make_bucket("test_readwrite", 1414)
    bp = WriteBucketProxy(rb,
                          uri_extension_size=len(uri_extension))
    # write the whole share through the proxy, then close it
    d.addCallback(lambda res: bp.put_block(0, "a"*25))
    d.addCallback(lambda res: bp.put_block(1, "b"*25))
    d.addCallback(lambda res: bp.put_block(2, "c"*25))
    d.addCallback(lambda res: bp.put_block(3, "d"*20))
    d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
    d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
    d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
    d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
    d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
    d.addCallback(lambda res: bp.close())
    # now read everything back
    def _start_reading(res):
        br = BucketReader(final)
        rbp = ReadBucketProxy(rb)
        self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
        d1 = rbp.startIfNecessary()
        d1.addCallback(lambda res: rbp.get_block(0))
        d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
        d1.addCallback(lambda res: rbp.get_block(1))
        d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
        d1.addCallback(lambda res: rbp.get_block(2))
        d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
        d1.addCallback(lambda res: rbp.get_block(3))
        d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
        d1.addCallback(lambda res: rbp.get_plaintext_hashes())
        d1.addCallback(lambda res:
                       self.failUnlessEqual(res, plaintext_hashes))
        d1.addCallback(lambda res: rbp.get_crypttext_hashes())
        d1.addCallback(lambda res:
                       self.failUnlessEqual(res, crypttext_hashes))
        d1.addCallback(lambda res: rbp.get_block_hashes())
        d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
        d1.addCallback(lambda res: rbp.get_share_hashes())
        d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
        d1.addCallback(lambda res: rbp.get_uri_extension())
        d1.addCallback(lambda res:
                       self.failUnlessEqual(res, uri_extension))
        # NOTE(review): the closing `return d1` is not visible here.
    d.addCallback(_start_reading)
    # NOTE(review): the final `return d` is not visible here either;
    # without it Trial would not wait for the Deferred chain.
class Server(unittest.TestCase):
    """Tests for the immutable-share side of StorageServer."""
    # NOTE(review): the setUp/tearDown `def` lines and the `return`
    # statements of workdir()/create() are not visible in this chunk.
    # `itertools` is presumably imported on a line not shown here.
    self.sparent = service.MultiService()
    self._secret = itertools.count()
    return self.sparent.stopService()

    def workdir(self, name):
        # per-test directory under storage/Server/
        basedir = os.path.join("storage", "Server", name)

    def create(self, name, sizelimit=None):
        # build a StorageServer and attach it to the test's MultiService
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
def test_create(self):
    # smoke test: server construction and service-parenting succeed
    ss = self.create("test_create")
def allocate(self, ss, storage_index, sharenums, size):
    """Ask `ss` to allocate `sharenums` buckets of `size` bytes for
    `storage_index`, using fresh (never-repeating) lease secrets."""
    def _fresh_secret():
        # each call consumes one counter value, so secrets are unique
        return hashutil.tagged_hash("blah", "%d" % self._secret.next())
    rs = _fresh_secret()
    cs = _fresh_secret()
    canary = Referenceable()
    return ss.remote_allocate_buckets(storage_index, rs, cs,
                                      sharenums, size, canary)
def test_remove_incoming(self):
    """After writers close, the shares/incoming/ dir should be empty."""
    ss = self.create("test_remove_incoming")
    already, writers = self.allocate(ss, "vid", range(3), 10)
    for i,wb in writers.items():
        wb.remote_write(0, "%10d" % i)
        # NOTE(review): the wb.remote_close() call is not visible in this
        # chunk; closing is what removes the incoming directory.
    incomingdir = os.path.join(self.workdir("test_remove_incoming"),
                               "shares", "incoming")
    leftover_dirs = os.listdir(incomingdir)
    self.failUnlessEqual(leftover_dirs, [])
def test_allocate(self):
    """Allocation, readability-after-close, and already-present reporting."""
    # NOTE(review): the writer-closing loops (wb.remote_close()) between
    # the phases below are not visible in this chunk.
    ss = self.create("test_allocate")
    self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
    canary = Referenceable()
    already,writers = self.allocate(ss, "vid", [0,1,2], 75)
    self.failUnlessEqual(already, set())
    self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
    # while the buckets are open, they should not count as readable
    self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
    for i,wb in writers.items():
        wb.remote_write(0, "%25d" % i)
    # now they should be readable
    b = ss.remote_get_buckets("vid")
    self.failUnlessEqual(set(b.keys()), set([0,1,2]))
    self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
    # now if we ask about writing again, the server should offer those
    # three buckets as already present. It should offer them even if we
    # don't ask about those specific ones.
    already,writers = self.allocate(ss, "vid", [2,3,4], 75)
    self.failUnlessEqual(already, set([0,1,2]))
    self.failUnlessEqual(set(writers.keys()), set([3,4]))
    # while those two buckets are open for writing, the server should
    # tell new uploaders that they already exist (so that we don't try to
    # upload into them a second time)
    already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
    self.failUnlessEqual(already, set([0,1,2,3,4]))
    self.failUnlessEqual(set(writers.keys()), set([5]))
def test_sizelimits(self):
    """Provisional vs. real space accounting against a 5000-byte limit."""
    # NOTE(review): this chunk is missing several lines: the definition
    # of OVERHEAD, the abort calls for the abandoned first set, the
    # remote_close() calls for the second/third sets, and parts of the
    # explanatory comments. Review notes below mark the visible seams.
    ss = self.create("test_sizelimits", 5000)
    canary = Referenceable()
    # a newly created and filled share incurs this much overhead, beyond
    # the size we request.
    LEASE_SIZE = 4+32+32+4
    already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
    self.failUnlessEqual(len(writers), 3)
    # now the StorageServer should have 3000 bytes provisionally
    # allocated, allowing only 2000 more to be claimed
    self.failUnlessEqual(len(ss._active_writers), 3)
    # allocating 1001-byte shares only leaves room for one
    already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
    self.failUnlessEqual(len(writers2), 1)
    self.failUnlessEqual(len(ss._active_writers), 4)
    # we abandon the first set, so their provisional allocation should be
    self.failUnlessEqual(len(ss._active_writers), 1)
    # now we have a provisional allocation of 1001 bytes
    # and we close the second set, so their provisional allocation should
    # become real, long-term allocation, and grows to include the
    for bw in writers2.values():
        bw.remote_write(0, "a"*25)
    self.failUnlessEqual(len(ss._active_writers), 0)
    # OVERHEAD is defined on a line not visible in this chunk
    allocated = 1001 + OVERHEAD + LEASE_SIZE
    # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
    # 5000-1085=3915 free, therefore we can fit 39 100byte shares
    already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
    self.failUnlessEqual(len(writers3), 39)
    self.failUnlessEqual(len(ss._active_writers), 39)
    self.failUnlessEqual(len(ss._active_writers), 0)
    ss.disownServiceParent()
    # creating a new StorageServer in the same directory should see the
    # metadata that goes into the share file is counted upon share close,
    # as well as at startup. metadata that goes into other files will not
    # be counted until the next startup, so if we were creating any
    # extra-file metadata, the allocation would be more than 'allocated'
    # and this test would need to be changed.
    ss = self.create("test_sizelimits", 5000)
    already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
    self.failUnlessEqual(len(writers4), 39)
    self.failUnlessEqual(len(ss._active_writers), 39)
# NOTE(review): the `def` line for this test (apparently a file-seek
# behavior probe) is not visible in this chunk, nor are the initial
# write/close and the seek/write calls that produce the 103-byte file.
basedir = self.workdir("test_seek_behavior")
fileutil.make_dirs(basedir)
filename = os.path.join(basedir, "testfile")
f = open(filename, "wb")
# mode="w" allows seeking-to-create-holes, but truncates pre-existing
# files. mode="a" preserves previous contents but does not allow
# seeking-to-create-holes. mode="r+" allows both.
f = open(filename, "rb+")
filelen = os.stat(filename)[stat.ST_SIZE]
self.failUnlessEqual(filelen, 100+3)
f2 = open(filename, "rb")
self.failUnlessEqual(f2.read(5), "start")
def test_leases(self):
    """Immutable-share lease lifecycle: add, renew, cancel, expiry of buckets."""
    # NOTE(review): this chunk is missing the definitions of `sharenums`
    # and `size`, and the bodies of the `for wb in writers.values():`
    # close loops (presumably wb.remote_close()).
    ss = self.create("test_leases")
    canary = Referenceable()
    rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
               hashutil.tagged_hash("blah", "%d" % self._secret.next()))
    already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                 sharenums, size, canary)
    self.failUnlessEqual(len(already), 0)
    self.failUnlessEqual(len(writers), 5)
    for wb in writers.values():
    # first allocation leaves exactly one lease, under rs0
    leases = list(ss.get_leases("si0"))
    self.failUnlessEqual(len(leases), 1)
    self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))
    rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
               hashutil.tagged_hash("blah", "%d" % self._secret.next()))
    already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                 sharenums, size, canary)
    for wb in writers.values():
    # take out a second lease on si1
    rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
               hashutil.tagged_hash("blah", "%d" % self._secret.next()))
    already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                 sharenums, size, canary)
    self.failUnlessEqual(len(already), 5)
    self.failUnlessEqual(len(writers), 0)
    leases = list(ss.get_leases("si1"))
    self.failUnlessEqual(len(leases), 2)
    self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))
    # check that si0 is readable
    readers = ss.remote_get_buckets("si0")
    self.failUnlessEqual(len(readers), 5)
    # renew the first lease. Only the proper renew_secret should work
    ss.remote_renew_lease("si0", rs0)
    self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
    self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
    # check that si0 is still readable
    readers = ss.remote_get_buckets("si0")
    self.failUnlessEqual(len(readers), 5)
    # cancel needs the cancel secret, not renew or foreign secrets
    self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
    self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
    ss.remote_cancel_lease("si0", cs0)
    # si0 should now be gone
    readers = ss.remote_get_buckets("si0")
    self.failUnlessEqual(len(readers), 0)
    # and the renew should no longer work
    self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
    # cancel the first lease on si1, leaving the second in place
    ss.remote_cancel_lease("si1", cs1)
    readers = ss.remote_get_buckets("si1")
    self.failUnlessEqual(len(readers), 5)
    # the corresponding renew should no longer work
    self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
    leases = list(ss.get_leases("si1"))
    self.failUnlessEqual(len(leases), 1)
    self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))
    ss.remote_renew_lease("si1", rs2)
    # cancelling the second should make it go away
    ss.remote_cancel_lease("si1", cs2)
    readers = ss.remote_get_buckets("si1")
    self.failUnlessEqual(len(readers), 0)
    self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
    self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
    leases = list(ss.get_leases("si1"))
    self.failUnlessEqual(len(leases), 0)
    # test overlapping uploads
    rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
               hashutil.tagged_hash("blah", "%d" % self._secret.next()))
    rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
               hashutil.tagged_hash("blah", "%d" % self._secret.next()))
    already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                 sharenums, size, canary)
    self.failUnlessEqual(len(already), 0)
    self.failUnlessEqual(len(writers), 5)
    already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                   sharenums, size, canary)
    self.failUnlessEqual(len(already2), 5)
    self.failUnlessEqual(len(writers2), 0)
    for wb in writers.values():
    # both uploads' leases should be present
    leases = list(ss.get_leases("si3"))
    self.failUnlessEqual(len(leases), 2)
class MutableServer(unittest.TestCase):
    """Tests for the mutable-slot side of StorageServer."""
    # NOTE(review): the setUp/tearDown `def` lines, the workdir() return,
    # and the create() return are not visible in this chunk.
    self.sparent = service.MultiService()
    self._secret = itertools.count()
    return self.sparent.stopService()

    def workdir(self, name):
        # per-test directory under storage/MutableServer/
        basedir = os.path.join("storage", "MutableServer", name)

    def create(self, name, sizelimit=None):
        # build a StorageServer with a fixed node id for lease records
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        ss.setNodeID("\x00" * 32)
def test_create(self):
    # smoke test: mutable-capable server construction succeeds
    ss = self.create("test_create")
def write_enabler(self, we_tag):
    """Derive a deterministic write-enabler secret from `we_tag`."""
    secret = hashutil.tagged_hash("we_blah", we_tag)
    return secret
def renew_secret(self, tag):
    """Derive a deterministic lease-renewal secret from `tag`."""
    secret = hashutil.tagged_hash("renew_blah", str(tag))
    return secret
def cancel_secret(self, tag):
    """Derive a deterministic lease-cancellation secret from `tag`."""
    secret = hashutil.tagged_hash("cancel_blah", str(tag))
    return secret
def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
    """Allocate a mutable slot on `ss` with secrets derived from the tags."""
    write_enabler = self.write_enabler(we_tag)
    renew_secret = self.renew_secret(lease_tag)
    cancel_secret = self.cancel_secret(lease_tag)
    # NOTE(review): the argument lines between storage_index and the
    # secrets (the write_enabler) and after them (sharenums, size) are
    # not visible in this chunk.
    return ss.remote_allocate_mutable_slot(storage_index,
                                           renew_secret, cancel_secret,
def test_allocate(self):
    """Allocate a 3-share mutable slot and exercise read/testv-writev."""
    # NOTE(review): this chunk is missing the allocate() trailing
    # arguments, the `s0 = shares[0]` assignment, and the testv/writev
    # argument lists of the first two remote_testv_and_writev calls.
    ss = self.create("test_allocate")
    shares = self.allocate(ss, "si1", "we1", self._secret.next(),
    self.failUnlessEqual(len(shares), 3)
    self.failUnlessEqual(set(shares.keys()), set([0,1,2]))
    shares2 = ss.remote_get_mutable_slot("si1")
    self.failUnlessEqual(len(shares2), 3)
    self.failUnlessEqual(set(shares2.keys()), set([0,1,2]))
    # reads on an empty slot return empty strings, even past the end
    self.failUnlessEqual(s0.remote_read(0, 10), "")
    self.failUnlessEqual(s0.remote_read(100, 10), "")
    WE = self.write_enabler("we1")
    data = "".join([ ("%d" % i) * 10 for i in range(10) ])
    answer = s0.remote_testv_and_writev(WE,
    self.failUnlessEqual(answer, (True, []))
    self.failUnlessEqual(s0.remote_read(0, 20), "00000000001111111111")
    self.failUnlessEqual(s0.remote_read(95, 10), "99999")
    self.failUnlessEqual(s0.remote_get_length(), 100)
    # a wrong write-enabler must be rejected
    self.failUnlessRaises(BadWriteEnablerError,
                          s0.remote_testv_and_writev,
    # this testv should fail
    answer = s0.remote_testv_and_writev(WE,
                                        [(0, 12, "eq", "444444444444"),
                                         (20, 5, "eq", "22222"),
                                        [(0, "x"*100)], None)
    self.failUnlessEqual(answer, (False, ["000000000011",
    self.failUnlessEqual(s0.remote_read(0, 100), data)
    # a failing comparison must leave the data untouched
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "lt", "11111"),
                                        [(0, "x"*100)], None)
    self.failUnlessEqual(answer, (False, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), data)
def test_operators(self):
    # test operators, the data we're comparing is '11111' in all cases.
    # test both fail+pass, reset data after each one.
    #
    # Pattern for each case: run testv-and-writev with one comparison
    # against bytes 10..15 ("11111"); on success the write of "x"*100 or
    # "y"*100 lands, on failure the data is untouched; then reset.
    # NOTE(review): this chunk is missing the allocate() trailing
    # arguments, the `s0 = shares[0]` assignment, the argument list of
    # the first (reset) call, and the second element of each test-vector
    # list.
    ss = self.create("test_operators")
    shares = self.allocate(ss, "si1", "we1", self._secret.next(),
    WE = self.write_enabler("we1")
    data = "".join([ ("%d" % i) * 10 for i in range(10) ])
    answer = s0.remote_testv_and_writev(WE,
    # nop: always matches, write happens
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "nop", "11111"),
                                        [(0, "x"*100)], None)
    self.failUnlessEqual(answer, (True, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), "x"*100)
    s0.remote_testv_and_writev(WE, [], [(0,data)], None)
    # lt: "11111" < "11110" is false
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "lt", "11110"),
                                        [(0, "x"*100)], None)
    self.failUnlessEqual(answer, (False, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), data)
    s0.remote_testv_and_writev(WE, [], [(0,data)], None)
    # lt: "11111" < "11111" is false
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "lt", "11111"),
                                        [(0, "x"*100)], None)
    self.failUnlessEqual(answer, (False, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), data)
    s0.remote_testv_and_writev(WE, [], [(0,data)], None)
    # lt: "11111" < "11112" is true
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "lt", "11112"),
                                        [(0, "y"*100)], None)
    self.failUnlessEqual(answer, (True, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
    s0.remote_testv_and_writev(WE, [], [(0,data)], None)
    # le: "11111" <= "11110" is false
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "le", "11110"),
                                        [(0, "x"*100)], None)
    self.failUnlessEqual(answer, (False, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), data)
    s0.remote_testv_and_writev(WE, [], [(0,data)], None)
    # le: "11111" <= "11111" is true
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "le", "11111"),
                                        [(0, "y"*100)], None)
    self.failUnlessEqual(answer, (True, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
    s0.remote_testv_and_writev(WE, [], [(0,data)], None)
    # le: "11111" <= "11112" is true
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "le", "11112"),
                                        [(0, "y"*100)], None)
    self.failUnlessEqual(answer, (True, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
    s0.remote_testv_and_writev(WE, [], [(0,data)], None)
    # eq: "11111" == "11112" is false
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "eq", "11112"),
                                        [(0, "x"*100)], None)
    self.failUnlessEqual(answer, (False, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), data)
    s0.remote_testv_and_writev(WE, [], [(0,data)], None)
    # eq: "11111" == "11111" is true
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "eq", "11111"),
                                        [(0, "y"*100)], None)
    self.failUnlessEqual(answer, (True, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
    s0.remote_testv_and_writev(WE, [], [(0,data)], None)
    # ne: "11111" != "11111" is false
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "ne", "11111"),
                                        [(0, "x"*100)], None)
    self.failUnlessEqual(answer, (False, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), data)
    s0.remote_testv_and_writev(WE, [], [(0,data)], None)
    # ne: "11111" != "11112" is true
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "ne", "11112"),
                                        [(0, "y"*100)], None)
    self.failUnlessEqual(answer, (True, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
    s0.remote_testv_and_writev(WE, [], [(0,data)], None)
    # ge: "11111" >= "11110" is true
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "ge", "11110"),
                                        [(0, "y"*100)], None)
    self.failUnlessEqual(answer, (True, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
    s0.remote_testv_and_writev(WE, [], [(0,data)], None)
    # ge: "11111" >= "11111" is true
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "ge", "11111"),
                                        [(0, "y"*100)], None)
    self.failUnlessEqual(answer, (True, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
    s0.remote_testv_and_writev(WE, [], [(0,data)], None)
    # ge: "11111" >= "11112" is false
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "ge", "11112"),
                                        [(0, "y"*100)], None)
    self.failUnlessEqual(answer, (False, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), data)
    s0.remote_testv_and_writev(WE, [], [(0,data)], None)
    # gt: "11111" > "11110" is true
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "gt", "11110"),
                                        [(0, "y"*100)], None)
    self.failUnlessEqual(answer, (True, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
    s0.remote_testv_and_writev(WE, [], [(0,data)], None)
    # gt: "11111" > "11111" is false
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "gt", "11111"),
                                        [(0, "x"*100)], None)
    self.failUnlessEqual(answer, (False, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), data)
    s0.remote_testv_and_writev(WE, [], [(0,data)], None)
    # gt: "11111" > "11112" is false
    answer = s0.remote_testv_and_writev(WE,
                                        [(10, 5, "gt", "11112"),
                                        [(0, "x"*100)], None)
    self.failUnlessEqual(answer, (False, ["11111"]))
    self.failUnlessEqual(s0.remote_read(0, 100), data)
    s0.remote_testv_and_writev(WE, [], [(0,data)], None)
def test_readv(self):
    """Write distinct data to three shares, then vector-read them back."""
    # NOTE(review): the workdir name "test_allocate" is reused here, so
    # this test shares test_allocate's directory -- it probably should be
    # self.create("test_readv"); confirm against the full file.
    # Also missing from this chunk: the allocate() trailing arguments,
    # the `for i in range(...)` loop line, the trailing writev argument,
    # and the rest of the expected-answer dict.
    ss = self.create("test_allocate")
    shares = self.allocate(ss, "si1", "we1", self._secret.next(),
    WE = self.write_enabler("we1")
    data = [("%d" % i) * 100 for i in range(3)]
    rc = shares[i].remote_testv_and_writev(WE, [], [(0, data[i])],
    self.failUnlessEqual(rc, (True, []))
    answer = ss.remote_readv_slots("si1", [(0, 10)])
    self.failUnlessEqual(answer, {0: ["0"*10],
def compare_leases_without_timestamps(self, a, b):
    """Assert that lease lists `a` and `b` match element-for-element,
    ignoring the expiration-time field of each lease."""
    self.failUnlessEqual(len(a), len(b))
    # lengths are equal, so zip visits every pair
    for lease_a, lease_b in zip(a, b):
        (num_a, (ownerid_a, _expiration_a,
                 renew_a, cancel_a, nodeid_a)) = lease_a
        (num_b, (ownerid_b, _expiration_b,
                 renew_b, cancel_b, nodeid_b)) = lease_b
        self.failUnlessEqual( (num_a, ownerid_a, renew_a,
                               cancel_a, nodeid_a),
                              (num_b, ownerid_b, renew_b,
                               cancel_b, nodeid_b) )
def test_leases(self):
    """Mutable-slot lease lifecycle: re-allocation, renewal, cancellation,
    container growth, and final slot removal."""
    # NOTE(review): this chunk is missing the `secret = ...` and
    # `s0 = shares[0]` assignments, the f.close() after the ignore_me
    # file, the testv/writev argument lists of the container-growing
    # calls, and several explanatory comment tails.
    ss = self.create("test_leases")
    shares = self.allocate(ss, "si1", "we1", secret, set([0,1,2]), 100)
    WE = self.write_enabler("we1")
    data = "".join([ ("%d" % i) * 10 for i in range(10) ])
    answer = s0.remote_testv_and_writev(WE,
    # create a random non-numeric file in the bucket directory, to
    # exercise the code that's supposed to ignore those.
    bucket_dir = os.path.join(self.workdir("test_leases"),
                              "shares", idlib.b2a("si1"))
    f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
    f.write("you ought to be ignoring me\n")
    # re-allocate the slots and use the same secrets, that should update
    shares2 = self.allocate(ss, "si1", "we1", secret, set([0,1,2]), 100)
    ss.remote_renew_lease("si1", self.renew_secret(secret))
    # now allocate them with a bunch of different secrets, to trigger the
    # extended lease code
    shares2 = self.allocate(ss, "si1", "we1", secret+1, set([0,1,2]), 100)
    shares2 = self.allocate(ss, "si1", "we1", secret+2, set([0,1,2]), 100)
    shares2 = self.allocate(ss, "si1", "we1", secret+3, set([0,1,2]), 100)
    shares2 = self.allocate(ss, "si1", "we1", secret+4, set([0,1,2]), 100)
    shares2 = self.allocate(ss, "si1", "we1", secret+5, set([0,1,2]), 100)
    ss.remote_cancel_lease("si1", self.cancel_secret(secret+5))
    all_leases = s0.debug_get_leases()
    self.failUnlessEqual(len(all_leases), 5)
    # and write enough data to expand the container, forcing the server
    answer = s0.remote_testv_and_writev(WE,
    # read back the leases, make sure they're still intact.
    self.compare_leases_without_timestamps(all_leases,
                                           s0.debug_get_leases())
    ss.remote_renew_lease("si1", self.renew_secret(secret))
    ss.remote_renew_lease("si1", self.renew_secret(secret+1))
    ss.remote_renew_lease("si1", self.renew_secret(secret+2))
    ss.remote_renew_lease("si1", self.renew_secret(secret+3))
    ss.remote_renew_lease("si1", self.renew_secret(secret+4))
    self.compare_leases_without_timestamps(all_leases,
                                           s0.debug_get_leases())
    # get a new copy of the leases, with the current timestamps. Reading
    # data and failing to renew/cancel leases should leave the timestamps
    all_leases = s0.debug_get_leases()
    # renewing with a bogus token should prompt an error message
    # TODO: examine the exception thus raised, make sure the old nodeid
    # is present, to provide for share migration
    self.failUnlessRaises(IndexError,
                          ss.remote_renew_lease, "si1",
                          self.renew_secret(secret+20))
    # same for cancelling
    self.failUnlessRaises(IndexError,
                          ss.remote_cancel_lease, "si1",
                          self.cancel_secret(secret+20))
    self.failUnlessEqual(all_leases, s0.debug_get_leases())
    s0.remote_read(0, 200)
    self.failUnlessEqual(all_leases, s0.debug_get_leases())
    # grow the container twice; leases must survive each expansion
    answer = s0.remote_testv_and_writev(WE,
                                        [(200, "make me bigger"),],
    self.compare_leases_without_timestamps(all_leases,
                                           s0.debug_get_leases())
    answer = s0.remote_testv_and_writev(WE,
                                        [(500, "make me really bigger"),],
    self.compare_leases_without_timestamps(all_leases,
                                           s0.debug_get_leases())
    # now cancel them all
    ss.remote_cancel_lease("si1", self.cancel_secret(secret))
    ss.remote_cancel_lease("si1", self.cancel_secret(secret+1))
    ss.remote_cancel_lease("si1", self.cancel_secret(secret+2))
    ss.remote_cancel_lease("si1", self.cancel_secret(secret+3))
    # the slot should still be there
    shares3 = ss.remote_get_mutable_slot("si1")
    self.failUnlessEqual(len(shares3), 3)
    self.failUnlessEqual(len(s0.debug_get_leases()), 1)
    ss.remote_cancel_lease("si1", self.cancel_secret(secret+4))
    # now the slot should be gone
    self.failUnlessEqual(ss.remote_get_mutable_slot("si1"), {})