]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - allmydata/upload.py
allmydata.Crypto: fix all internal imports
[tahoe-lafs/tahoe-lafs.git] / allmydata / upload.py
1
2 from zope.interface import Interface, implements
3 from twisted.python import failure, log
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap import Referenceable
7
8 from allmydata.util import idlib
9 from allmydata import encode
10
11 from cStringIO import StringIO
12 import sha
13
class NotEnoughPeersError(Exception):
    """Raised when the candidate peer list is exhausted before enough
    peers have granted us a lease."""
16
class HaveAllPeersError(Exception):
    """Not a real error: raised to jump out of the peer-selection loop
    once enough peers have accepted."""
20
# this wants to live in storage, not here
class TooFullError(Exception):
    """Seen (via Failure.check) when a storage peer has no room to
    allocate the requested bucket."""
24
class FileUploader:
    """Coordinate the upload of one file to a set of storage peers.

    Drive me like this: construct with the local client/node object, then
    call set_filehandle(), set_verifierid(), make_encoder(), and finally
    start(), which returns a Deferred that fires with the verifierid once
    the encoder has pushed the shares to the accepted peers.
    """
    # when True, progress messages are also printed to stdout
    debug = False

    def __init__(self, peer):
        # 'peer' is our client node: it must provide permute_peerids(),
        # get_remote_service(), and a .nodeid attribute
        self._peer = peer

    def set_filehandle(self, filehandle):
        # remember the source data, measuring its size by seeking to EOF
        self._filehandle = filehandle
        filehandle.seek(0, 2)
        self._size = filehandle.tell()
        # rewind so the data is later read from the beginning
        filehandle.seek(0)

    def make_encoder(self):
        # NOTE(review): shares == needed_shares == 4 is hardwired, and each
        # share appears to be a full-size copy of the file (share_size ==
        # file size), i.e. plain replication rather than erasure coding.
        self._needed_shares = 4
        self._shares = 4
        self._encoder = encode.Encoder(self._filehandle, self._shares)
        self._share_size = self._size

    def set_verifierid(self, vid):
        # vid names the file; Uploader computes it as a SHA-1 digest of
        # the plaintext (see Uploader._compute_verifierid)
        assert isinstance(vid, str)
        self._verifierid = vid


    def start(self):
        """Locate willing peers, then upload shares to them.

        Returns a Deferred that fires with the verifierid on success, or
        with a NotEnoughPeersError failure if we run out of peers first.
        """
        log.msg("starting upload")
        if self.debug:
            print "starting upload"
        # first step: who should we upload to?

        # maybe limit max_peers to 2*len(self.shares), to reduce memory
        # footprint
        max_peers = None

        # candidate peers are tried in an order permuted by the verifierid
        self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
        self._total_peers = len(self.permuted)
        for p in self.permuted:
            assert isinstance(p, str)
        # we will shrink self.permuted as we give up on peers
        self.peer_index = 0
        # each granted lease earns one goodness point; we stop once we
        # have one point per share to place
        self.goodness_points = 0
        self.target_goodness = self._shares
        self.landlords = [] # list of (peerid, bucket_num, remotebucket)

        d = defer.maybeDeferred(self._check_next_peer)
        d.addCallback(self._got_all_peers)
        return d

    def _check_next_peer(self):
        """Ask the next candidate peer to grant us a lease.

        Re-invokes itself (through the Deferred chain in _done_with_peer)
        until either enough peers have accepted (HaveAllPeersError breaks
        the loop) or the candidate list is exhausted (NotEnoughPeersError).
        """
        if len(self.permuted) == 0:
            # there are no more to check
            raise NotEnoughPeersError("%s goodness, want %s, have %d "
                                      "landlords, %d total peers" %
                                      (self.goodness_points,
                                       self.target_goodness,
                                       len(self.landlords),
                                       self._total_peers))
        if self.peer_index >= len(self.permuted):
            # removals may have shrunk the list below our index: wrap around
            self.peer_index = 0

        peerid = self.permuted[self.peer_index]

        d = self._peer.get_remote_service(peerid, "storageserver")
        def _got_peer(service):
            # ask this peer to allocate a bucket big enough for one share
            bucket_num = len(self.landlords)
            if self.debug: print "asking %s" % idlib.b2a(peerid)
            d2 = service.callRemote("allocate_bucket",
                                    verifierid=self._verifierid,
                                    bucket_num=bucket_num,
                                    size=self._share_size,
                                    leaser=self._peer.nodeid,
                                    canary=Referenceable())
            def _allocate_response(bucket):
                if self.debug:
                    print " peerid %s will grant us a lease" % idlib.b2a(peerid)
                self.landlords.append( (peerid, bucket_num, bucket) )
                self.goodness_points += 1
                if self.goodness_points >= self.target_goodness:
                    if self.debug: print " we're done!"
                    # not a real error: caught in _done_with_peer below to
                    # terminate the loop early
                    raise HaveAllPeersError()
                # otherwise we fall through to allocate more peers
            d2.addCallback(_allocate_response)
            return d2
        d.addCallback(_got_peer)
        def _done_with_peer(res):
            # runs for both success and failure of the allocation attempt
            if self.debug: print "done with peer %s:" % idlib.b2a(peerid)
            if isinstance(res, failure.Failure):
                if res.check(HaveAllPeersError):
                    if self.debug: print " all done"
                    # we're done!
                    return
                if res.check(TooFullError):
                    if self.debug: print " too full"
                elif res.check(IndexError):
                    if self.debug: print " no connection"
                else:
                    if self.debug: print " other error:", res
                self.permuted.remove(peerid) # this peer was unusable
            else:
                if self.debug: print " they gave us a lease"
                # we get here for either good peers (when we still need
                # more), or after checking a bad peer (and thus still need
                # more). So now we need to grab a new peer.
                self.peer_index += 1
            return self._check_next_peer()
        d.addBoth(_done_with_peer)
        return d

    def _got_all_peers(self, res):
        """Enough peers accepted: hand the landlord list to the encoder."""
        d = self._encoder.do_upload(self.landlords)
        # fire the caller's Deferred with the verifierid of the file
        d.addCallback(lambda res: self._verifierid)
        return d
136
def netstring(s):
    """Encode the string 's' in netstring format: "<length>:<data>,"."""
    return str(len(s)) + ":" + s + ","
139
class IUploadable(Interface):
    def get_filehandle():
        """Return a file-like object from which the data to be uploaded
        may be read."""
    def close_filehandle(f):
        """Release the filehandle previously returned by
        get_filehandle()."""
145
class FileName:
    """An IUploadable wrapping a file named on the local disk."""
    implements(IUploadable)

    def __init__(self, filename):
        self._filename = filename

    def get_filehandle(self):
        # a freshly-opened handle each time; closed by close_filehandle()
        return open(self._filename, "rb")

    def close_filehandle(self, f):
        f.close()
154
class Data:
    """An IUploadable wrapping an in-memory string of data."""
    implements(IUploadable)

    def __init__(self, data):
        self._data = data

    def get_filehandle(self):
        # wrap the string in a file-like object
        return StringIO(self._data)

    def close_filehandle(self, f):
        # nothing to release: the StringIO is simply garbage-collected
        pass
163
class FileHandle:
    """An IUploadable wrapping an already-open filehandle."""
    implements(IUploadable)

    def __init__(self, filehandle):
        self._filehandle = filehandle

    def get_filehandle(self):
        return self._filehandle

    def close_filehandle(self, f):
        # the originator of the filehandle reserves the right to close it
        pass
173
class Uploader(service.MultiService):
    """I am a service that allows file uploading.
    """
    name = "uploader"

    def _compute_verifierid(self, f):
        """Hash the full contents of the filehandle 'f' and return the
        binary digest, leaving 'f' rewound to the start."""
        h = sha.new(netstring("allmydata_v1_verifierid"))
        f.seek(0)
        h.update(f.read())
        f.seek(0)
        # note: this is only of the plaintext data, no encryption yet
        return h.digest()

    def upload(self, f):
        """Adapt 'f' to IUploadable and upload it. Returns a Deferred
        that fires with the verifierid when the upload completes."""
        assert self.parent
        assert self.running
        uploadable = IUploadable(f)
        filehandle = uploadable.get_filehandle()
        uploader = FileUploader(self.parent)
        uploader.set_filehandle(filehandle)
        uploader.set_verifierid(self._compute_verifierid(filehandle))
        uploader.make_encoder()
        d = uploader.start()
        def _cleanup(res):
            # release the filehandle on success or failure alike
            uploadable.close_filehandle(filehandle)
            return res
        d.addBoth(_cleanup)
        return d

    # utility functions
    def upload_data(self, data):
        """Upload a string of bytes."""
        return self.upload(Data(data))
    def upload_filename(self, filename):
        """Upload the file with the given name."""
        return self.upload(FileName(filename))
    def upload_filehandle(self, filehandle):
        """Upload an already-open filehandle."""
        return self.upload(FileHandle(filehandle))