# src/allmydata/test/test_helper.py (tahoe-lafs)
# revision note: add HelperUploadResults
1 import os
2 from twisted.internet import defer
3 from twisted.trial import unittest
4 from twisted.application import service
5
6 from foolscap.api import Tub, fireEventually, flushEventualQueue
7
8 from allmydata.storage.server import si_b2a
9 from allmydata.storage_client import StorageFarmBroker
10 from allmydata.immutable import offloaded, upload
11 from allmydata import uri, client
12 from allmydata.util import hashutil, fileutil, mathutil
13 from pycryptopp.cipher.aes import AES
14
15 MiB = 1024*1024
16
17 DATA = "I need help\n" * 1000
18
19 class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
20     def start_encrypted(self, eu):
21         d = eu.get_size()
22         def _got_size(size):
23             d2 = eu.get_all_encoding_parameters()
24             def _got_parms(parms):
25                 # just pretend we did the upload
26                 needed_shares, happy, total_shares, segsize = parms
27                 ueb_data = {"needed_shares": needed_shares,
28                             "total_shares": total_shares,
29                             "segment_size": segsize,
30                             "size": size,
31                             }
32
33                 r = upload.UploadResults()
34                 r.preexisting_shares = 0
35                 r.pushed_shares = total_shares
36                 r.file_size = size
37                 r.uri_extension_data = ueb_data
38                 v = uri.CHKFileVerifierURI(self._storage_index, "x"*32,
39                                            needed_shares, total_shares,
40                                            size)
41                 r.verifycapstr = v.to_string()
42                 return r
43             d2.addCallback(_got_parms)
44             return d2
45         d.addCallback(_got_size)
46         return d
47
48 class Helper_fake_upload(offloaded.Helper):
49     def _make_chk_upload_helper(self, storage_index, lp):
50         si_s = si_b2a(storage_index)
51         incoming_file = os.path.join(self._chk_incoming, si_s)
52         encoding_file = os.path.join(self._chk_encoding, si_s)
53         uh = CHKUploadHelper_fake(storage_index, self,
54                                   self._storage_broker,
55                                   self._secret_holder,
56                                   incoming_file, encoding_file,
57                                   lp)
58         return uh
59
60 class Helper_already_uploaded(Helper_fake_upload):
61     def _check_chk(self, storage_index, lp):
62         res = upload.HelperUploadResults()
63         res.uri_extension_hash = hashutil.uri_extension_hash("")
64
65         # we're pretending that the file they're trying to upload was already
66         # present in the grid. We return some information about the file, so
67         # the client can decide if they like the way it looks. The parameters
68         # used here are chosen to match the defaults.
69         PARAMS = FakeClient.DEFAULT_ENCODING_PARAMETERS
70         ueb_data = {"needed_shares": PARAMS["k"],
71                     "total_shares": PARAMS["n"],
72                     "segment_size": min(PARAMS["max_segment_size"], len(DATA)),
73                     "size": len(DATA),
74                     }
75         res.uri_extension_data = ueb_data
76         return defer.succeed(res)
77
78 class FakeClient(service.MultiService):
79     DEFAULT_ENCODING_PARAMETERS = {"k":25,
80                                    "happy": 75,
81                                    "n": 100,
82                                    "max_segment_size": 1*MiB,
83                                    }
84
85     def get_encoding_parameters(self):
86         return self.DEFAULT_ENCODING_PARAMETERS
87
88 def flush_but_dont_ignore(res):
89     d = flushEventualQueue()
90     def _done(ignored):
91         return res
92     d.addCallback(_done)
93     return d
94
95 def wait_a_few_turns(ignored=None):
96     d = fireEventually()
97     d.addCallback(fireEventually)
98     d.addCallback(fireEventually)
99     d.addCallback(fireEventually)
100     d.addCallback(fireEventually)
101     d.addCallback(fireEventually)
102     return d
103
104 def upload_data(uploader, data, convergence):
105     u = upload.Data(data, convergence=convergence)
106     return uploader.upload(u)
107
108 class AssistedUpload(unittest.TestCase):
109     timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
110     def setUp(self):
111         self.s = FakeClient()
112         self.storage_broker = StorageFarmBroker(None, True)
113         self.secret_holder = client.SecretHolder("lease secret", "convergence")
114         self.s.startService()
115
116         self.tub = t = Tub()
117         t.setOption("expose-remote-exception-types", False)
118         t.setServiceParent(self.s)
119         self.s.tub = t
120         # we never actually use this for network traffic, so it can use a
121         # bogus host/port
122         t.setLocation("bogus:1234")
123
124     def setUpHelper(self, basedir, helper_class=Helper_fake_upload):
125         fileutil.make_dirs(basedir)
126         self.helper = h = helper_class(basedir,
127                                        self.storage_broker,
128                                        self.secret_holder,
129                                        None, None)
130         self.helper_furl = self.tub.registerReference(h)
131
132     def tearDown(self):
133         d = self.s.stopService()
134         d.addCallback(fireEventually)
135         d.addBoth(flush_but_dont_ignore)
136         return d
137
138
139     def test_one(self):
140         self.basedir = "helper/AssistedUpload/test_one"
141         self.setUpHelper(self.basedir)
142         u = upload.Uploader(self.helper_furl)
143         u.setServiceParent(self.s)
144
145         d = wait_a_few_turns()
146
147         def _ready(res):
148             assert u._helper
149
150             return upload_data(u, DATA, convergence="some convergence string")
151         d.addCallback(_ready)
152         def _uploaded(results):
153             the_uri = results.uri
154             assert "CHK" in the_uri
155         d.addCallback(_uploaded)
156
157         def _check_empty(res):
158             files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
159             self.failUnlessEqual(files, [])
160             files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
161             self.failUnlessEqual(files, [])
162         d.addCallback(_check_empty)
163
164         return d
165
166     def test_previous_upload_failed(self):
167         self.basedir = "helper/AssistedUpload/test_previous_upload_failed"
168         self.setUpHelper(self.basedir)
169
170         # we want to make sure that an upload which fails (leaving the
171         # ciphertext in the CHK_encoding/ directory) does not prevent a later
172         # attempt to upload that file from working. We simulate this by
173         # populating the directory manually. The hardest part is guessing the
174         # storage index.
175
176         k = FakeClient.DEFAULT_ENCODING_PARAMETERS["k"]
177         n = FakeClient.DEFAULT_ENCODING_PARAMETERS["n"]
178         max_segsize = FakeClient.DEFAULT_ENCODING_PARAMETERS["max_segment_size"]
179         segsize = min(max_segsize, len(DATA))
180         # this must be a multiple of 'required_shares'==k
181         segsize = mathutil.next_multiple(segsize, k)
182
183         key = hashutil.convergence_hash(k, n, segsize, DATA, "test convergence string")
184         assert len(key) == 16
185         encryptor = AES(key)
186         SI = hashutil.storage_index_hash(key)
187         SI_s = si_b2a(SI)
188         encfile = os.path.join(self.basedir, "CHK_encoding", SI_s)
189         f = open(encfile, "wb")
190         f.write(encryptor.process(DATA))
191         f.close()
192
193         u = upload.Uploader(self.helper_furl)
194         u.setServiceParent(self.s)
195
196         d = wait_a_few_turns()
197
198         def _ready(res):
199             assert u._helper
200             return upload_data(u, DATA, convergence="test convergence string")
201         d.addCallback(_ready)
202         def _uploaded(results):
203             the_uri = results.uri
204             assert "CHK" in the_uri
205         d.addCallback(_uploaded)
206
207         def _check_empty(res):
208             files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
209             self.failUnlessEqual(files, [])
210             files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
211             self.failUnlessEqual(files, [])
212         d.addCallback(_check_empty)
213
214         return d
215
216     def test_already_uploaded(self):
217         self.basedir = "helper/AssistedUpload/test_already_uploaded"
218         self.setUpHelper(self.basedir, helper_class=Helper_already_uploaded)
219         u = upload.Uploader(self.helper_furl)
220         u.setServiceParent(self.s)
221
222         d = wait_a_few_turns()
223
224         def _ready(res):
225             assert u._helper
226
227             return upload_data(u, DATA, convergence="some convergence string")
228         d.addCallback(_ready)
229         def _uploaded(results):
230             the_uri = results.uri
231             assert "CHK" in the_uri
232         d.addCallback(_uploaded)
233
234         def _check_empty(res):
235             files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
236             self.failUnlessEqual(files, [])
237             files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
238             self.failUnlessEqual(files, [])
239         d.addCallback(_check_empty)
240
241         return d