]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_helper.py
Overhaul IFilesystemNode handling, to simplify tests and use POLA internally.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_helper.py
1 import os
2 from twisted.trial import unittest
3 from twisted.application import service
4
5 from foolscap.api import Tub, fireEventually, flushEventualQueue
6 from foolscap.logging import log
7
8 from allmydata.storage.server import si_b2a
9 from allmydata.storage_client import StorageFarmBroker
10 from allmydata.immutable import offloaded, upload
11 from allmydata import uri, client
12 from allmydata.util import hashutil, fileutil, mathutil
13 from pycryptopp.cipher.aes import AES
14
15 MiB = 1024*1024
16
17 DATA = "I need help\n" * 1000
18
19 class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
20     def start_encrypted(self, eu):
21         d = eu.get_size()
22         def _got_size(size):
23             d2 = eu.get_all_encoding_parameters()
24             def _got_parms(parms):
25                 needed_shares, happy, total_shares, segsize = parms
26                 ueb_data = {"needed_shares": needed_shares,
27                             "total_shares": total_shares,
28                             "segment_size": segsize,
29                             "size": size,
30                             }
31                 self._results.uri_extension_data = ueb_data
32                 self._results.verifycapstr = uri.CHKFileVerifierURI(self._storage_index, "x"*32,
33                                                                  needed_shares, total_shares,
34                                                                  size).to_string()
35                 return self._results
36             d2.addCallback(_got_parms)
37             return d2
38         d.addCallback(_got_size)
39         return d
40
41 class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper):
42     def start(self):
43         res = upload.UploadResults()
44         res.uri_extension_hash = hashutil.uri_extension_hash("")
45
46         # we're pretending that the file they're trying to upload was already
47         # present in the grid. We return some information about the file, so
48         # the client can decide if they like the way it looks. The parameters
49         # used here are chosen to match the defaults.
50         PARAMS = FakeClient.DEFAULT_ENCODING_PARAMETERS
51         ueb_data = {"needed_shares": PARAMS["k"],
52                     "total_shares": PARAMS["n"],
53                     "segment_size": min(PARAMS["max_segment_size"], len(DATA)),
54                     "size": len(DATA),
55                     }
56         res.uri_extension_data = ueb_data
57         return (res, None)
58
59 class FakeClient(service.MultiService):
60     DEFAULT_ENCODING_PARAMETERS = {"k":25,
61                                    "happy": 75,
62                                    "n": 100,
63                                    "max_segment_size": 1*MiB,
64                                    }
65     stats_provider = None
66     storage_broker = StorageFarmBroker(None, True)
67     _secret_holder = client.SecretHolder("lease secret")
68     def log(self, *args, **kwargs):
69         return log.msg(*args, **kwargs)
70     def get_encoding_parameters(self):
71         return self.DEFAULT_ENCODING_PARAMETERS
72     def get_storage_broker(self):
73         return self.storage_broker
74
75 def flush_but_dont_ignore(res):
76     d = flushEventualQueue()
77     def _done(ignored):
78         return res
79     d.addCallback(_done)
80     return d
81
82 def wait_a_few_turns(ignored=None):
83     d = fireEventually()
84     d.addCallback(fireEventually)
85     d.addCallback(fireEventually)
86     d.addCallback(fireEventually)
87     d.addCallback(fireEventually)
88     d.addCallback(fireEventually)
89     return d
90
91 def upload_data(uploader, data, convergence):
92     u = upload.Data(data, convergence=convergence)
93     return uploader.upload(u)
94
95 class AssistedUpload(unittest.TestCase):
96     timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
97     def setUp(self):
98         self.s = FakeClient()
99         self.s.startService()
100
101         self.tub = t = Tub()
102         t.setOption("expose-remote-exception-types", False)
103         t.setServiceParent(self.s)
104         self.s.tub = t
105         # we never actually use this for network traffic, so it can use a
106         # bogus host/port
107         t.setLocation("bogus:1234")
108
109     def setUpHelper(self, basedir):
110         fileutil.make_dirs(basedir)
111         self.helper = h = offloaded.Helper(basedir)
112         h.chk_upload_helper_class = CHKUploadHelper_fake
113         h.setServiceParent(self.s)
114         self.helper_furl = self.tub.registerReference(h)
115
116     def tearDown(self):
117         d = self.s.stopService()
118         d.addCallback(fireEventually)
119         d.addBoth(flush_but_dont_ignore)
120         return d
121
122
123     def test_one(self):
124         self.basedir = "helper/AssistedUpload/test_one"
125         self.setUpHelper(self.basedir)
126         u = upload.Uploader(self.helper_furl)
127         u.setServiceParent(self.s)
128
129         d = wait_a_few_turns()
130
131         def _ready(res):
132             assert u._helper
133
134             return upload_data(u, DATA, convergence="some convergence string")
135         d.addCallback(_ready)
136         def _uploaded(results):
137             the_uri = results.uri
138             assert "CHK" in the_uri
139         d.addCallback(_uploaded)
140
141         def _check_empty(res):
142             files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
143             self.failUnlessEqual(files, [])
144             files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
145             self.failUnlessEqual(files, [])
146         d.addCallback(_check_empty)
147
148         return d
149
150     def test_previous_upload_failed(self):
151         self.basedir = "helper/AssistedUpload/test_previous_upload_failed"
152         self.setUpHelper(self.basedir)
153
154         # we want to make sure that an upload which fails (leaving the
155         # ciphertext in the CHK_encoding/ directory) does not prevent a later
156         # attempt to upload that file from working. We simulate this by
157         # populating the directory manually. The hardest part is guessing the
158         # storage index.
159
160         k = FakeClient.DEFAULT_ENCODING_PARAMETERS["k"]
161         n = FakeClient.DEFAULT_ENCODING_PARAMETERS["n"]
162         max_segsize = FakeClient.DEFAULT_ENCODING_PARAMETERS["max_segment_size"]
163         segsize = min(max_segsize, len(DATA))
164         # this must be a multiple of 'required_shares'==k
165         segsize = mathutil.next_multiple(segsize, k)
166
167         key = hashutil.convergence_hash(k, n, segsize, DATA, "test convergence string")
168         assert len(key) == 16
169         encryptor = AES(key)
170         SI = hashutil.storage_index_hash(key)
171         SI_s = si_b2a(SI)
172         encfile = os.path.join(self.basedir, "CHK_encoding", SI_s)
173         f = open(encfile, "wb")
174         f.write(encryptor.process(DATA))
175         f.close()
176
177         u = upload.Uploader(self.helper_furl)
178         u.setServiceParent(self.s)
179
180         d = wait_a_few_turns()
181
182         def _ready(res):
183             assert u._helper
184             return upload_data(u, DATA, convergence="test convergence string")
185         d.addCallback(_ready)
186         def _uploaded(results):
187             the_uri = results.uri
188             assert "CHK" in the_uri
189         d.addCallback(_uploaded)
190
191         def _check_empty(res):
192             files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
193             self.failUnlessEqual(files, [])
194             files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
195             self.failUnlessEqual(files, [])
196         d.addCallback(_check_empty)
197
198         return d
199
200     def test_already_uploaded(self):
201         self.basedir = "helper/AssistedUpload/test_already_uploaded"
202         self.setUpHelper(self.basedir)
203         self.helper.chk_upload_helper_class = CHKUploadHelper_already_uploaded
204         u = upload.Uploader(self.helper_furl)
205         u.setServiceParent(self.s)
206
207         d = wait_a_few_turns()
208
209         def _ready(res):
210             assert u._helper
211
212             return upload_data(u, DATA, convergence="some convergence string")
213         d.addCallback(_ready)
214         def _uploaded(results):
215             the_uri = results.uri
216             assert "CHK" in the_uri
217         d.addCallback(_uploaded)
218
219         def _check_empty(res):
220             files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
221             self.failUnlessEqual(files, [])
222             files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
223             self.failUnlessEqual(files, [])
224         d.addCallback(_check_empty)
225
226         return d