import os
from twisted.internet import defer
from twisted.trial import unittest
from twisted.application import service

from foolscap.api import Tub, fireEventually, flushEventualQueue

from allmydata.storage.server import si_b2a
from allmydata.storage_client import StorageFarmBroker
from allmydata.immutable import offloaded, upload
from allmydata import uri, client
from allmydata.util import hashutil, fileutil, mathutil
from pycryptopp.cipher.aes import AES

MiB = 1024*1024

DATA = "I need help\n" * 1000

class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
    """A CHKUploadHelper that never actually encodes or pushes shares: it
    just synthesizes upload results (UEB data and a fake verify cap) from
    the file's size and encoding parameters."""
    def start_encrypted(self, eu):
        d = eu.get_size()
        def _got_size(size):
            d2 = eu.get_all_encoding_parameters()
            def _got_parms(parms):
                needed_shares, happy, total_shares, segsize = parms
                ueb_data = {"needed_shares": needed_shares,
                            "total_shares": total_shares,
                            "segment_size": segsize,
                            "size": size,
                            }
                self._results.uri_extension_data = ueb_data
                self._results.verifycapstr = uri.CHKFileVerifierURI(self._storage_index, "x"*32,
                                                                    needed_shares, total_shares,
                                                                    size).to_string()
                return self._results
            d2.addCallback(_got_parms)
            return d2
        d.addCallback(_got_size)
        return d

class Helper_fake_upload(offloaded.Helper):
    def _make_chk_upload_helper(self, storage_index, r, lp):
        si_s = si_b2a(storage_index)
        incoming_file = os.path.join(self._chk_incoming, si_s)
        encoding_file = os.path.join(self._chk_encoding, si_s)
        uh = CHKUploadHelper_fake(storage_index, self,
                                  self._storage_broker,
                                  self._secret_holder,
                                  incoming_file, encoding_file,
                                  r, lp)
        return uh

class Helper_already_uploaded(Helper_fake_upload):
    def _check_chk(self, storage_index, results, lp):
        res = upload.UploadResults()
        res.uri_extension_hash = hashutil.uri_extension_hash("")

        # we're pretending that the file they're trying to upload was already
        # present in the grid. We return some information about the file, so
        # the client can decide if they like the way it looks. The parameters
        # used here are chosen to match the defaults.
        PARAMS = FakeClient.DEFAULT_ENCODING_PARAMETERS
        ueb_data = {"needed_shares": PARAMS["k"],
                    "total_shares": PARAMS["n"],
                    "segment_size": min(PARAMS["max_segment_size"], len(DATA)),
                    "size": len(DATA),
                    }
        res.uri_extension_data = ueb_data
        return defer.succeed(res)

class FakeClient(service.MultiService):
    DEFAULT_ENCODING_PARAMETERS = {"k":25,
                                   "happy": 75,
                                   "n": 100,
                                   "max_segment_size": 1*MiB,
                                   }
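    # Note: these values are deliberately far from the defaults Tahoe ships
    # with (3-of-10, happy=7), presumably so that any code path which ignores
    # get_encoding_parameters() and falls back to built-in values would show
    # up as a mismatch in the tests below.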

    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

def flush_but_dont_ignore(res):
    d = flushEventualQueue()
    def _done(ignored):
        return res
    d.addCallback(_done)
    return d

def wait_a_few_turns(ignored=None):
    """Return a Deferred that fires after several trips through foolscap's
    eventual-send queue, giving services (such as the Uploader's connection
    to the helper) a chance to finish their setup before the test proceeds."""
    d = fireEventually()
    d.addCallback(fireEventually)
    d.addCallback(fireEventually)
    d.addCallback(fireEventually)
    d.addCallback(fireEventually)
    d.addCallback(fireEventually)
    return d

def upload_data(uploader, data, convergence):
    u = upload.Data(data, convergence=convergence)
    return uploader.upload(u)

class AssistedUpload(unittest.TestCase):
    timeout = 240 # It takes longer than 120 seconds on Francois's ARM box.
    def setUp(self):
        self.s = FakeClient()
        self.storage_broker = StorageFarmBroker(None, True)
        self.secret_holder = client.SecretHolder("lease secret", "convergence")
        self.s.startService()

        self.tub = t = Tub()
        t.setOption("expose-remote-exception-types", False)
        t.setServiceParent(self.s)
        self.s.tub = t
        # we never actually use this for network traffic, so it can use a
        # bogus host/port
        t.setLocation("bogus:1234")

    def setUpHelper(self, basedir, helper_class=Helper_fake_upload):
        fileutil.make_dirs(basedir)
        self.helper = h = helper_class(basedir,
                                       self.storage_broker,
                                       self.secret_holder,
                                       None, None)
        self.helper_furl = self.tub.registerReference(h)
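        # registerReference() publishes the helper object on our Tub and
        # returns a FURL string; the Uploader created in each test is pointed
        # at this FURL, so it talks to our fake helper rather than a real one.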

    def tearDown(self):
        d = self.s.stopService()
        d.addCallback(fireEventually)
        d.addBoth(flush_but_dont_ignore)
        return d


    def test_one(self):
        self.basedir = "helper/AssistedUpload/test_one"
        self.setUpHelper(self.basedir)
        u = upload.Uploader(self.helper_furl)
        u.setServiceParent(self.s)

        d = wait_a_few_turns()

        def _ready(res):
            assert u._helper

            return upload_data(u, DATA, convergence="some convergence string")
        d.addCallback(_ready)
        def _uploaded(results):
            the_uri = results.uri
            assert "CHK" in the_uri
        d.addCallback(_uploaded)

        def _check_empty(res):
            files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
            self.failUnlessEqual(files, [])
            files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
            self.failUnlessEqual(files, [])
        d.addCallback(_check_empty)

        return d

    def test_previous_upload_failed(self):
        self.basedir = "helper/AssistedUpload/test_previous_upload_failed"
        self.setUpHelper(self.basedir)

        # we want to make sure that an upload which fails (leaving the
        # ciphertext in the CHK_encoding/ directory) does not prevent a later
        # attempt to upload that file from working. We simulate this by
        # populating the directory manually. The hardest part is guessing the
        # storage index.
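        # (The derivation below mirrors what the real CHK uploader does: the
        # plaintext is hashed together with the convergence secret and the
        # encoding parameters to produce the 16-byte AES key, and the storage
        # index is derived from that key, so a retried upload of the same
        # file lands on the same SI.)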

        k = FakeClient.DEFAULT_ENCODING_PARAMETERS["k"]
        n = FakeClient.DEFAULT_ENCODING_PARAMETERS["n"]
        max_segsize = FakeClient.DEFAULT_ENCODING_PARAMETERS["max_segment_size"]
        segsize = min(max_segsize, len(DATA))
        # this must be a multiple of 'required_shares'==k
        segsize = mathutil.next_multiple(segsize, k)
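        # (With the values above: DATA is 12*1000 = 12000 bytes, so segsize =
        # min(1 MiB, 12000) = 12000, which is already a multiple of k=25, so
        # next_multiple() leaves it unchanged.)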

        key = hashutil.convergence_hash(k, n, segsize, DATA, "test convergence string")
        assert len(key) == 16
        encryptor = AES(key)
        SI = hashutil.storage_index_hash(key)
        SI_s = si_b2a(SI)
        encfile = os.path.join(self.basedir, "CHK_encoding", SI_s)
        f = open(encfile, "wb")
        f.write(encryptor.process(DATA))
        f.close()

        u = upload.Uploader(self.helper_furl)
        u.setServiceParent(self.s)

        d = wait_a_few_turns()

        def _ready(res):
            assert u._helper
            return upload_data(u, DATA, convergence="test convergence string")
        d.addCallback(_ready)
        def _uploaded(results):
            the_uri = results.uri
            assert "CHK" in the_uri
        d.addCallback(_uploaded)

        def _check_empty(res):
            files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
            self.failUnlessEqual(files, [])
            files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
            self.failUnlessEqual(files, [])
        d.addCallback(_check_empty)

        return d

    def test_already_uploaded(self):
        self.basedir = "helper/AssistedUpload/test_already_uploaded"
        self.setUpHelper(self.basedir, helper_class=Helper_already_uploaded)
        u = upload.Uploader(self.helper_furl)
        u.setServiceParent(self.s)

        d = wait_a_few_turns()

        def _ready(res):
            assert u._helper

            return upload_data(u, DATA, convergence="some convergence string")
        d.addCallback(_ready)
        def _uploaded(results):
            the_uri = results.uri
            assert "CHK" in the_uri
        d.addCallback(_uploaded)

        def _check_empty(res):
            files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
            self.failUnlessEqual(files, [])
            files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
            self.failUnlessEqual(files, [])
        d.addCallback(_check_empty)

        return d