]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - allmydata/download.py
allmydata.Crypto: fix all internal imports
[tahoe-lafs/tahoe-lafs.git] / allmydata / download.py
1
2 import os
3 from zope.interface import Interface, implements
4 from twisted.python import failure, log
5 from twisted.internet import defer
6 from twisted.application import service
7
8 from allmydata.util import idlib
9 from allmydata import encode
10
class NotEnoughPeersError(Exception):
    """Raised when the candidate peer list is exhausted before enough
    peers holding buckets have been found."""
13
class HaveAllPeersError(Exception):
    """Control-flow signal rather than a real error: it is raised to jump
    out of the peer-checking loop once enough peers have been found."""
17
class FileDownloader:
    """Find peers holding buckets for one verifierid and decode them into
    a download target.

    Expected call order (as used by Downloader.download): construct, then
    set_download_target(), then make_decoder(), then start().
    """
    # when true, progress is also printed to stdout
    debug = False

    def __init__(self, peer, verifierid):
        """Remember the local peer object and the verifierid to fetch.

        peer must provide permute_peerids() and get_remote_service();
        verifierid must be a str.
        """
        self._peer = peer
        assert isinstance(verifierid, str)
        self._verifierid = verifierid

    def set_download_target(self, target):
        """Attach the target that will receive the data, and hand it our
        canceller."""
        self._target = target
        self._target.register_canceller(self._cancel)

    def _cancel(self):
        """Canceller registered with the target; currently a no-op."""
        pass

    def make_decoder(self):
        """Open the target and create the Decoder that writes into it."""
        # share parameters are hard-coded here: 4 shares, 2 desired
        n = self._shares = 4
        k = self._desired_shares = 2
        self._target.open()
        self._decoder = encode.Decoder(self._target, k, n,
                                       self._verifierid)

    def start(self):
        """Search for peers, then decode.

        Returns a Deferred that fires with whatever the target's finish()
        returns. On any failure the target's fail() is called exactly once
        and the Deferred errbacks with the original failure.
        """
        log.msg("starting download")
        if self.debug:
            print("starting download")
        # first step: who should we download from?

        # maybe limit max_peers to 2*len(self.shares), to reduce memory
        # footprint
        max_peers = None

        self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
        for p in self.permuted:
            assert isinstance(p, str)
        self.landlords = [] # list of (peerid, buckets)

        d = defer.maybeDeferred(self._check_next_peer)
        # addCallbacks (not addCallback+addErrback): the errback must only
        # see peer-search failures, because _got_all_peers already calls
        # target.fail() itself when decoding fails.
        d.addCallbacks(self._got_all_peers, self._peer_search_failed)
        return d

    def _peer_search_failed(self, f):
        """Tell the target that no data is coming, then pass the failure on.

        make_decoder() has already opened the target, so without this call
        a failed peer search would leave it open forever.
        """
        self._target.fail()
        return f

    def _check_next_peer(self):
        """Ask the next candidate peer for its buckets.

        Raises NotEnoughPeersError when no candidates remain. Otherwise
        returns a Deferred that fires (with None) once enough peers have
        been collected in self.landlords; until then each finished peer
        chains on to the next one.
        """
        if not self.permuted:
            # there are no more to check
            raise NotEnoughPeersError
        peerid = self.permuted.pop(0)

        d = self._peer.get_remote_service(peerid, "storageserver")
        def _got_peer(remote):
            if self.debug:
                print("asking %s" % idlib.b2a(peerid))
            d2 = remote.callRemote("get_buckets", verifierid=self._verifierid)
            def _got_response(buckets):
                if buckets:
                    if self.debug:
                        bucket_nums = [num for (num, bucket) in buckets]
                        print(" peerid %s has buckets %s" % (idlib.b2a(peerid),
                                                             bucket_nums))

                    self.landlords.append( (peerid, buckets) )
                if len(self.landlords) >= self._desired_shares:
                    if self.debug:
                        print(" we're done!")
                    raise HaveAllPeersError
                # otherwise we fall through to search more peers
            d2.addCallback(_got_response)
            return d2
        d.addCallback(_got_peer)

        def _done_with_peer(res):
            if self.debug:
                print("done with peer %s:" % idlib.b2a(peerid))
            if isinstance(res, failure.Failure):
                if res.check(HaveAllPeersError):
                    if self.debug:
                        print(" all done")
                    # we're done!
                    return
                # any other failure is swallowed: a bad peer just means we
                # move on to the next candidate
                if res.check(IndexError):
                    # presumably how a missing connection shows up, judging
                    # by the debug text -- confirm against
                    # get_remote_service()
                    if self.debug:
                        print(" no connection")
                else:
                    if self.debug:
                        print(" other error: %s" % (res,))
            else:
                if self.debug:
                    print(" they had data for us")
            # we get here for either good peers (when we still need more), or
            # after checking a bad peer (and thus still need more). So now we
            # need to grab a new peer.
            return self._check_next_peer()
        d.addBoth(_done_with_peer)
        return d

    def _got_all_peers(self, res):
        """Feed every collected bucket to the decoder.

        On success the target is closed and the result of its finish() is
        returned; on decoder failure the target's fail() is called and the
        failure is passed through.
        """
        all_buckets = []
        for peerid, buckets in self.landlords:
            all_buckets.extend(buckets)
        d = self._decoder.start(all_buckets)
        def _done(res):
            self._target.close()
            return self._target.finish()
        def _fail(res):
            self._target.fail()
            return res
        d.addCallbacks(_done, _fail)
        return d
119
def netstring(s):
    """Return s framed as "<length>:<s>," (decimal length, colon, payload,
    trailing comma)."""
    length = len(s)
    return "%d:%s," % (length, s)
122
class IDownloadTarget(Interface):
    """Destination for downloaded data, driven by a FileDownloader."""
    def open():
        """Called before any calls to write() or close()."""
    def write(data):
        """Accept a piece of downloaded data. Only called after open(), and
        never after close(), fail(), or the canceller has been invoked."""
    def close():
        """Called when no more data will be written. The FileDownloader
        calls this after a successful decode, just before finish()."""
    def fail():
        """fail() is called to indicate that the download has failed. No
        further methods will be invoked on the IDownloadTarget after fail()."""
    def register_canceller(cb):
        """The FileDownloader uses this to register a no-argument function
        that the target can call to cancel the download. Once this canceller
        is invoked, no further calls to write() or close() will be made."""
    # declared without 'self', like every other method of this interface:
    # interface methods describe the signature seen by callers
    def finish():
        """When the FileDownloader is done, this finish() function will be
        called. Whatever it returns will be returned to the invoker of
        Downloader.download.
        """
142
class FileName:
    """Download target that stores the data in the file at a given path."""
    implements(IDownloadTarget)

    def __init__(self, filename):
        self._filename = filename

    def open(self):
        """Create/truncate the file for binary writing and return the
        file object."""
        handle = open(self._filename, "wb")
        self.f = handle
        return handle

    def write(self, data):
        """Append data to the open file."""
        self.f.write(data)

    def close(self):
        """Close the file, keeping what was written."""
        self.f.close()

    def fail(self):
        """Close the file and delete it from disk."""
        self.f.close()
        os.unlink(self._filename)

    def register_canceller(self, cb):
        """Ignore the canceller; this target won't use it."""

    def finish(self):
        """Nothing to hand back to the caller."""
        return None
161
class Data:
    """Download target that collects the data in memory and returns it as
    one string from finish()."""
    implements(IDownloadTarget)

    def __init__(self):
        # pieces are accumulated here and joined once, in close()
        self._data = []

    def open(self):
        """Nothing to prepare."""

    def write(self, data):
        """Remember one more piece of data."""
        self._data.append(data)

    def close(self):
        """Join the accumulated pieces into self.data and drop the list."""
        pieces = self._data
        self.data = "".join(pieces)
        del self._data

    def fail(self):
        """Throw away whatever was accumulated."""
        del self._data

    def register_canceller(self, cb):
        """Ignore the canceller; this target won't use it."""

    def finish(self):
        """Return the string assembled by close()."""
        return self.data
179
class FileHandle:
    """Download target that writes into a caller-supplied file-like object
    and never closes it."""
    implements(IDownloadTarget)

    def __init__(self, filehandle):
        self._filehandle = filehandle

    def open(self):
        """Nothing to do: the handle is supplied already open."""

    def write(self, data):
        """Pass data straight through to the wrapped handle."""
        self._filehandle.write(data)

    def close(self):
        """Deliberately a no-op: the originator of the filehandle reserves
        the right to close it."""

    def fail(self):
        """Nothing to clean up; the handle belongs to the caller."""

    def register_canceller(self, cb):
        """Ignore the canceller."""

    def finish(self):
        """Nothing to hand back to the caller."""
        return None
197
class IDownloader(Interface):
    """Interface of an object that can start file downloads."""
    def download(verifierid, target):
        """Download the file identified by verifierid into target."""
201
class Downloader(service.MultiService):
    """I am the "downloader" service: I start file downloads on behalf of
    my parent, which must be set and running before download() is called.
    """
    implements(IDownloader)
    name = "downloader"

    def download(self, verifierid, t):
        """Start downloading verifierid into target t.

        t is adapted to IDownloadTarget. Returns the Deferred produced by
        FileDownloader.start().
        """
        assert self.parent
        assert self.running
        assert isinstance(verifierid, str)
        target = IDownloadTarget(t)
        assert target.write
        assert target.close
        downloader = FileDownloader(self.parent, verifierid)
        downloader.set_download_target(target)
        downloader.make_decoder()
        return downloader.start()

    # utility functions
    def download_to_data(self, verifierid):
        """Download into an in-memory Data target."""
        target = Data()
        return self.download(verifierid, target)

    def download_to_filename(self, verifierid, filename):
        """Download into the file named filename."""
        target = FileName(filename)
        return self.download(verifierid, target)

    def download_to_filehandle(self, verifierid, filehandle):
        """Download into an already-open filehandle."""
        target = FileHandle(filehandle)
        return self.download(verifierid, target)
228
229