]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_helper.py
ea4486980df23c745e2ea7aa3767dde6df1c2b46
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_helper.py
1 import os
2 from twisted.trial import unittest
3 from twisted.application import service
4
5 from foolscap.api import Tub, fireEventually, flushEventualQueue
6 from foolscap.logging import log
7
8 from allmydata.storage.server import si_b2a
9 from allmydata.immutable import offloaded, upload
10 from allmydata import uri
11 from allmydata.util import hashutil, fileutil, mathutil
12 from pycryptopp.cipher.aes import AES
13
14 MiB = 1024*1024
15
16 DATA = "I need help\n" * 1000
17
18 class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
19     def start_encrypted(self, eu):
20         d = eu.get_size()
21         def _got_size(size):
22             d2 = eu.get_all_encoding_parameters()
23             def _got_parms(parms):
24                 needed_shares, happy, total_shares, segsize = parms
25                 ueb_data = {"needed_shares": needed_shares,
26                             "total_shares": total_shares,
27                             "segment_size": segsize,
28                             "size": size,
29                             }
30                 self._results.uri_extension_data = ueb_data
31                 self._results.verifycapstr = uri.CHKFileVerifierURI(self._storage_index, "x"*32,
32                                                                  needed_shares, total_shares,
33                                                                  size).to_string()
34                 return self._results
35             d2.addCallback(_got_parms)
36             return d2
37         d.addCallback(_got_size)
38         return d
39
40 class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper):
41     def start(self):
42         res = upload.UploadResults()
43         res.uri_extension_hash = hashutil.uri_extension_hash("")
44
45         # we're pretending that the file they're trying to upload was already
46         # present in the grid. We return some information about the file, so
47         # the client can decide if they like the way it looks. The parameters
48         # used here are chosen to match the defaults.
49         PARAMS = FakeClient.DEFAULT_ENCODING_PARAMETERS
50         ueb_data = {"needed_shares": PARAMS["k"],
51                     "total_shares": PARAMS["n"],
52                     "segment_size": min(PARAMS["max_segment_size"], len(DATA)),
53                     "size": len(DATA),
54                     }
55         res.uri_extension_data = ueb_data
56         return (res, None)
57
58 class FakeClient(service.MultiService):
59     DEFAULT_ENCODING_PARAMETERS = {"k":25,
60                                    "happy": 75,
61                                    "n": 100,
62                                    "max_segment_size": 1*MiB,
63                                    }
64     stats_provider = None
65     def log(self, *args, **kwargs):
66         return log.msg(*args, **kwargs)
67     def get_encoding_parameters(self):
68         return self.DEFAULT_ENCODING_PARAMETERS
69     def get_permuted_peers(self, service_name, storage_index):
70         return []
71
72 def flush_but_dont_ignore(res):
73     d = flushEventualQueue()
74     def _done(ignored):
75         return res
76     d.addCallback(_done)
77     return d
78
79 def wait_a_few_turns(ignored=None):
80     d = fireEventually()
81     d.addCallback(fireEventually)
82     d.addCallback(fireEventually)
83     d.addCallback(fireEventually)
84     d.addCallback(fireEventually)
85     d.addCallback(fireEventually)
86     return d
87
88 def upload_data(uploader, data, convergence):
89     u = upload.Data(data, convergence=convergence)
90     return uploader.upload(u)
91
92 class AssistedUpload(unittest.TestCase):
93     def setUp(self):
94         self.s = FakeClient()
95         self.s.startService()
96
97         self.tub = t = Tub()
98         t.setOption("expose-remote-exception-types", False)
99         t.setServiceParent(self.s)
100         self.s.tub = t
101         # we never actually use this for network traffic, so it can use a
102         # bogus host/port
103         t.setLocation("bogus:1234")
104
105     def setUpHelper(self, basedir):
106         fileutil.make_dirs(basedir)
107         self.helper = h = offloaded.Helper(basedir)
108         h.chk_upload_helper_class = CHKUploadHelper_fake
109         h.setServiceParent(self.s)
110         self.helper_furl = self.tub.registerReference(h)
111
112     def tearDown(self):
113         d = self.s.stopService()
114         d.addCallback(fireEventually)
115         d.addBoth(flush_but_dont_ignore)
116         return d
117
118
119     def test_one(self):
120         self.basedir = "helper/AssistedUpload/test_one"
121         self.setUpHelper(self.basedir)
122         u = upload.Uploader(self.helper_furl)
123         u.setServiceParent(self.s)
124
125         d = wait_a_few_turns()
126
127         def _ready(res):
128             assert u._helper
129
130             return upload_data(u, DATA, convergence="some convergence string")
131         d.addCallback(_ready)
132         def _uploaded(results):
133             the_uri = results.uri
134             assert "CHK" in the_uri
135         d.addCallback(_uploaded)
136
137         def _check_empty(res):
138             files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
139             self.failUnlessEqual(files, [])
140             files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
141             self.failUnlessEqual(files, [])
142         d.addCallback(_check_empty)
143
144         return d
145
146     def test_previous_upload_failed(self):
147         self.basedir = "helper/AssistedUpload/test_previous_upload_failed"
148         self.setUpHelper(self.basedir)
149
150         # we want to make sure that an upload which fails (leaving the
151         # ciphertext in the CHK_encoding/ directory) does not prevent a later
152         # attempt to upload that file from working. We simulate this by
153         # populating the directory manually. The hardest part is guessing the
154         # storage index.
155
156         k = FakeClient.DEFAULT_ENCODING_PARAMETERS["k"]
157         n = FakeClient.DEFAULT_ENCODING_PARAMETERS["n"]
158         max_segsize = FakeClient.DEFAULT_ENCODING_PARAMETERS["max_segment_size"]
159         segsize = min(max_segsize, len(DATA))
160         # this must be a multiple of 'required_shares'==k
161         segsize = mathutil.next_multiple(segsize, k)
162
163         key = hashutil.convergence_hash(k, n, segsize, DATA, "test convergence string")
164         assert len(key) == 16
165         encryptor = AES(key)
166         SI = hashutil.storage_index_hash(key)
167         SI_s = si_b2a(SI)
168         encfile = os.path.join(self.basedir, "CHK_encoding", SI_s)
169         f = open(encfile, "wb")
170         f.write(encryptor.process(DATA))
171         f.close()
172
173         u = upload.Uploader(self.helper_furl)
174         u.setServiceParent(self.s)
175
176         d = wait_a_few_turns()
177
178         def _ready(res):
179             assert u._helper
180             return upload_data(u, DATA, convergence="test convergence string")
181         d.addCallback(_ready)
182         def _uploaded(results):
183             the_uri = results.uri
184             assert "CHK" in the_uri
185         d.addCallback(_uploaded)
186
187         def _check_empty(res):
188             files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
189             self.failUnlessEqual(files, [])
190             files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
191             self.failUnlessEqual(files, [])
192         d.addCallback(_check_empty)
193
194         return d
195
196     def test_already_uploaded(self):
197         self.basedir = "helper/AssistedUpload/test_already_uploaded"
198         self.setUpHelper(self.basedir)
199         self.helper.chk_upload_helper_class = CHKUploadHelper_already_uploaded
200         u = upload.Uploader(self.helper_furl)
201         u.setServiceParent(self.s)
202
203         d = wait_a_few_turns()
204
205         def _ready(res):
206             assert u._helper
207
208             return upload_data(u, DATA, convergence="some convergence string")
209         d.addCallback(_ready)
210         def _uploaded(results):
211             the_uri = results.uri
212             assert "CHK" in the_uri
213         d.addCallback(_uploaded)
214
215         def _check_empty(res):
216             files = os.listdir(os.path.join(self.basedir, "CHK_encoding"))
217             self.failUnlessEqual(files, [])
218             files = os.listdir(os.path.join(self.basedir, "CHK_incoming"))
219             self.failUnlessEqual(files, [])
220         d.addCallback(_check_empty)
221
222         return d