]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_helper.py
Teach StorageFarmBroker to fire a deferred when a connection threshold is reached...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_helper.py
1 import os
2 from twisted.internet import defer
3 from twisted.trial import unittest
4 from twisted.application import service
5
6 from foolscap.api import Tub, fireEventually, flushEventualQueue
7
8 from allmydata.storage.server import si_b2a
9 from allmydata.storage_client import StorageFarmBroker
10 from allmydata.immutable import offloaded, upload
11 from allmydata import uri, client
12 from allmydata.util import hashutil, fileutil, mathutil
13 from pycryptopp.cipher.aes import AES
14
15 MiB = 1024*1024
16
17 DATA = "I need help\n" * 1000
18
class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
    """A CHKUploadHelper that fakes the actual share upload.

    start_encrypted() contacts no storage servers; it just collects the
    ciphertext size and encoding parameters from the encrypted uploadable
    and fabricates an UploadResults that looks like a successful upload.
    """

    def start_encrypted(self, eu):
        # Chain: size -> encoding parameters -> fabricated results.
        d = eu.get_size()

        def _got_size(size):
            d2 = eu.get_all_encoding_parameters()
            d2.addCallback(_got_params, size)
            return d2

        def _got_params(params, size):
            # just pretend we did the upload
            needed_shares, happy, total_shares, segsize = params
            ueb_data = {"needed_shares": needed_shares,
                        "total_shares": total_shares,
                        "segment_size": segsize,
                        "size": size,
                        }
            verifycap = uri.CHKFileVerifierURI(self._storage_index, "x"*32,
                                               needed_shares, total_shares,
                                               size)
            ur = upload.UploadResults(file_size=size,
                                      ciphertext_fetched=0,
                                      preexisting_shares=0,
                                      pushed_shares=total_shares,
                                      sharemap={},
                                      servermap={},
                                      timings={},
                                      uri_extension_data=ueb_data,
                                      uri_extension_hash="fake",
                                      verifycapstr=verifycap.to_string())
            self._upload_status.set_results(ur)
            return ur

        d.addCallback(_got_size)
        return d
52
class Helper_fake_upload(offloaded.Helper):
    """A Helper whose per-file upload helpers are CHKUploadHelper_fake."""

    def _make_chk_upload_helper(self, storage_index, lp):
        # Same incoming/encoding scratch-file layout as the real Helper,
        # but the upload itself is faked.
        si_s = si_b2a(storage_index)
        return CHKUploadHelper_fake(storage_index, self,
                                    self._storage_broker,
                                    self._secret_holder,
                                    os.path.join(self._chk_incoming, si_s),
                                    os.path.join(self._chk_encoding, si_s),
                                    lp)
64
class Helper_already_uploaded(Helper_fake_upload):
    """A Helper that reports every file as already present in the grid."""

    def _check_chk(self, storage_index, lp):
        # we're pretending that the file they're trying to upload was already
        # present in the grid. We return some information about the file, so
        # the client can decide if they like the way it looks. The parameters
        # used here are chosen to match the defaults.
        params = FakeClient.DEFAULT_ENCODING_PARAMETERS
        res = upload.HelperUploadResults()
        res.uri_extension_hash = hashutil.uri_extension_hash("")
        res.uri_extension_data = {
            "needed_shares": params["k"],
            "total_shares": params["n"],
            "segment_size": min(params["max_segment_size"], len(DATA)),
            "size": len(DATA),
            }
        return defer.succeed(res)
82
class FakeClient(service.MultiService):
    """Minimal stand-in for a Tahoe client node.

    Provides only the encoding parameters and the storage broker that the
    uploader/helper machinery asks for; the `storage_broker` attribute is
    attached externally (see AssistedUpload.setUp).
    """

    DEFAULT_ENCODING_PARAMETERS = {
        "k": 25,
        "happy": 75,
        "n": 100,
        "max_segment_size": 1*MiB,
        }

    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def get_storage_broker(self):
        return self.storage_broker
94
def flush_but_dont_ignore(res):
    """Flush the foolscap eventual-send queue, then pass `res` through."""
    d = flushEventualQueue()
    d.addCallback(lambda ignored: res)
    return d
101
def wait_a_few_turns(ignored=None):
    """Return a Deferred that fires after six reactor "eventual" turns."""
    d = fireEventually()
    # five more hops through the eventual-send queue
    for _ in range(5):
        d.addCallback(fireEventually)
    return d
110
def upload_data(uploader, data, convergence):
    """Upload `data` through `uploader` with the given convergence secret."""
    return uploader.upload(upload.Data(data, convergence=convergence))
114
class AssistedUpload(unittest.TestCase):
    """Exercise uploads routed through an in-process (fake) upload helper.

    Each test registers a Helper on a local Tub, points an Uploader at its
    FURL, uploads DATA, and confirms the upload yields a CHK cap and leaves
    the helper's CHK_encoding/ and CHK_incoming/ scratch directories empty.
    """
    timeout = 240 # It takes longer than 120 seconds on Francois's arm box.

    def setUp(self):
        self.s = FakeClient()
        self.s.storage_broker = StorageFarmBroker(None, True, 0, None)
        self.s.secret_holder = client.SecretHolder("lease secret", "converge")
        self.s.startService()

        self.tub = t = Tub()
        t.setOption("expose-remote-exception-types", False)
        t.setServiceParent(self.s)
        self.s.tub = t
        # we never actually use this for network traffic, so it can use a
        # bogus host/port
        t.setLocation("bogus:1234")

    def setUpHelper(self, basedir, helper_class=Helper_fake_upload):
        """Create a helper of `helper_class` under `basedir` and register
        it on our Tub, recording its FURL in self.helper_furl."""
        fileutil.make_dirs(basedir)
        self.helper = h = helper_class(basedir,
                                       self.s.storage_broker,
                                       self.s.secret_holder,
                                       None, None)
        self.helper_furl = self.tub.registerReference(h)

    def tearDown(self):
        d = self.s.stopService()
        d.addCallback(fireEventually)
        d.addBoth(flush_but_dont_ignore)
        return d

    def _upload_with_helper(self, convergence):
        """Upload DATA through the registered helper and verify the result.

        Shared body for all three tests: returns a Deferred that fires
        after asserting the upload produced a CHK URI and that the helper
        cleaned up its scratch directories.
        """
        u = upload.Uploader(self.helper_furl)
        u.setServiceParent(self.s)

        # give the Uploader a few turns to connect to the helper
        d = wait_a_few_turns()

        def _ready(res):
            # a bare `assert` would vanish under python -O; use trial's
            self.failUnless(u._helper)
            return upload_data(u, DATA, convergence=convergence)
        d.addCallback(_ready)

        def _uploaded(results):
            self.failUnlessIn("CHK", results.get_uri())
        d.addCallback(_uploaded)

        d.addCallback(self._check_scratch_dirs_empty)
        return d

    def _check_scratch_dirs_empty(self, ignored=None):
        # a successful upload must leave no residue in the helper's
        # working directories
        for subdir in ("CHK_encoding", "CHK_incoming"):
            files = os.listdir(os.path.join(self.basedir, subdir))
            self.failUnlessEqual(files, [])

    def test_one(self):
        self.basedir = "helper/AssistedUpload/test_one"
        self.setUpHelper(self.basedir)
        return self._upload_with_helper("some convergence string")

    def test_previous_upload_failed(self):
        self.basedir = "helper/AssistedUpload/test_previous_upload_failed"
        self.setUpHelper(self.basedir)

        # we want to make sure that an upload which fails (leaving the
        # ciphertext in the CHK_encoding/ directory) does not prevent a later
        # attempt to upload that file from working. We simulate this by
        # populating the directory manually. The hardest part is guessing the
        # storage index.

        k = FakeClient.DEFAULT_ENCODING_PARAMETERS["k"]
        n = FakeClient.DEFAULT_ENCODING_PARAMETERS["n"]
        max_segsize = FakeClient.DEFAULT_ENCODING_PARAMETERS["max_segment_size"]
        segsize = min(max_segsize, len(DATA))
        # this must be a multiple of 'required_shares'==k
        segsize = mathutil.next_multiple(segsize, k)

        key = hashutil.convergence_hash(k, n, segsize, DATA,
                                        "test convergence string")
        self.failUnlessEqual(len(key), 16)
        encryptor = AES(key)
        SI = hashutil.storage_index_hash(key)
        SI_s = si_b2a(SI)
        encfile = os.path.join(self.basedir, "CHK_encoding", SI_s)
        # `with` guarantees the file is closed even if write() raises
        with open(encfile, "wb") as f:
            f.write(encryptor.process(DATA))

        return self._upload_with_helper("test convergence string")

    def test_already_uploaded(self):
        self.basedir = "helper/AssistedUpload/test_already_uploaded"
        self.setUpHelper(self.basedir, helper_class=Helper_already_uploaded)
        return self._upload_with_helper("some convergence string")