3 from zope.interface import Interface, implements
4 from twisted.python import failure, log
5 from twisted.internet import defer
6 from twisted.application import service
8 from allmydata.util import idlib
9 from allmydata import encode
# Raised by _check_next_peer when self.permuted is empty, i.e. there are no
# more candidate peers left to ask. (The class body is not visible in this
# listing.)
11 class NotEnoughPeersError(Exception):
# Control-flow exception rather than a real error: _got_response raises it
# once len(self.landlords) >= self._desired_shares, and _done_with_peer
# recognises it with res.check(HaveAllPeersError).
14 class HaveAllPeersError(Exception):
15 # we use this to jump out of the loop
# Set up a downloader for the file identified by verifierid.
# Only part of this method is visible in this listing.
# NOTE(review): `peer` is presumably stored as self._peer (which
# _check_next_peer calls), but that assignment is not shown here -- confirm.
21 def __init__(self, peer, verifierid):
# verifierid must be a str; it is passed on to permute_peerids() and to the
# remote "get_buckets" call.
23 assert isinstance(verifierid, str)
24 self._verifierid = verifierid
# Attach the download target and give it our cancel hook.
# NOTE(review): the assignment of self._target is not visible in this
# listing; presumably it is set from `target` just before the call below --
# confirm.
26 def set_download_target(self, target):
# register self._cancel as the target's canceller (see
# IDownloadTarget.register_canceller)
28 self._target.register_canceller(self._cancel)
# Create the encode.Decoder that will write into self._target.
# Parts of this method (including where `n` is assigned and the end of the
# Decoder(...) call) are not visible in this listing.
33 def make_decoder(self):
# Hard-coded: the peer search in _got_response stops once
# len(self.landlords) reaches this value.
35 k = self._desired_shares = 2
# NOTE(review): `n` is not defined on any line shown here -- confirm where it
# is set. The call below continues on a line that is not shown.
37 self._decoder = encode.Decoder(self._target, k, n,
# Body of the method that kicks off a download (its def line is not visible
# in this listing). It builds the permuted peer list, resets the landlord
# list, and chains peer discovery into _got_all_peers.
41 log.msg("starting download")
# NOTE(review): Python 2 print statement; presumably guarded by
# `if self.debug:` on a line not shown here, like the other prints -- confirm.
43 print "starting download"
44 # first step: who should we download from?
46 # maybe limit max_peers to 2*len(self.shares), to reduce memory
# max_peers is assigned on a line that is not visible here.
50 self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
# sanity check: every permuted peer id must be a str
51 for p in self.permuted:
52 assert isinstance(p, str)
53 self.landlords = [] # list of (peerid, buckets) tuples, as appended by _got_response
# maybeDeferred turns a synchronous raise from _check_next_peer (e.g.
# NotEnoughPeersError) into a failed Deferred instead of an exception here.
55 d = defer.maybeDeferred(self._check_next_peer)
56 d.addCallback(self._got_all_peers)
# Ask the next peer in self.permuted which buckets it holds for this
# verifierid, then move on to the following peer until enough landlords have
# been collected or the peer list runs out.
# Several lines of this method are elided from this listing and indentation
# is lost, so the exact nesting of the statements below cannot be confirmed
# from here.
59 def _check_next_peer(self):
60 if len(self.permuted) == 0:
61 # there are no more to check
62 raise NotEnoughPeersError
# peers are tried in permuted order, consuming the list from the front
63 peerid = self.permuted.pop(0)
# look up this peer's "storageserver" remote service
65 d = self._peer.get_remote_service(peerid, "storageserver")
# NOTE(review): the parameter name `service` shadows the `service` module
# imported from twisted.application at the top of the file.
66 def _got_peer(service):
# NOTE(review): bucket_num is not used on any line visible in this listing.
67 bucket_num = len(self.landlords)
68 if self.debug: print "asking %s" % idlib.b2a(peerid)
# remote query: which buckets does this peer hold for our verifierid?
69 d2 = service.callRemote("get_buckets", verifierid=self._verifierid)
70 def _got_response(buckets):
# `buckets` is iterated as (num, bucket) pairs; bucket_nums is only used for
# the debug print below
72 bucket_nums = [num for (num,bucket) in buckets]
74 print " peerid %s has buckets %s" % (idlib.b2a(peerid),
77 self.landlords.append( (peerid, buckets) )
# stop searching as soon as enough landlords have been recorded
78 if len(self.landlords) >= self._desired_shares:
79 if self.debug: print " we're done!"
80 raise HaveAllPeersError
81 # otherwise we fall through to search more peers
82 d2.addCallback(_got_response)
84 d.addCallback(_got_peer)
# Attached with addBoth below, so `res` is either the callback result or a
# failure.Failure; the isinstance() check tells the two apart.
86 def _done_with_peer(res):
87 if self.debug: print "done with peer %s:" % idlib.b2a(peerid)
88 if isinstance(res, failure.Failure):
# HaveAllPeersError is the "enough peers found" signal raised in _got_response
89 if res.check(HaveAllPeersError):
90 if self.debug: print " all done"
# an IndexError failure is reported as "no connection" to the peer
93 if res.check(IndexError):
94 if self.debug: print " no connection"
96 if self.debug: print " other error:", res
98 if self.debug: print " they had data for us"
99 # we get here for either good peers (when we still need more), or
100 # after checking a bad peer (and thus still need more). So now we
101 # need to grab a new peer.
102 return self._check_next_peer()
103 d.addBoth(_done_with_peer)
# Callback chained after the peer search (see d.addCallback(self._got_all_peers)):
# gathers the buckets of every landlord into one list and starts the decoder
# on them. Some lines of this method are elided from this listing.
106 def _got_all_peers(self, res):
# all_buckets is initialised on a line that is not visible here
108 for peerid, buckets in self.landlords:
109 all_buckets.extend(buckets)
110 d = self._decoder.start(all_buckets)
# NOTE(review): the return below presumably belongs to the _done callback,
# whose def line is not shown -- confirm.
113 return self._target.finish()
# _done handles success and _fail handles failure of the decoder's Deferred;
# both are defined on lines that are not visible here
117 d.addCallbacks(_done, _fail)
# Formats s as "<len(s)>:<s>," (length prefix, colon, payload, trailing
# comma). The def line of the enclosing function is not visible in this
# listing.
121 return "%d:%s," % (len(s), s)
# Interface for the sink that a FileDownloader delivers data to. The
# docstrings below mention write(), close(), fail(), register_canceller()
# and finish(); most of the method signatures, and the end of the last
# docstring, are elided from this listing.
123 class IDownloadTarget(Interface):
125 """Called before any calls to write() or close()."""
131 """fail() is called to indicate that the download has failed. No
132 further methods will be invoked on the IDownloadTarget after fail()."""
133 def register_canceller(cb):
134 """The FileDownloader uses this to register a no-argument function
135 that the target can call to cancel the download. Once this canceller
136 is invoked, no further calls to write() or close() will be made."""
138 """When the FileDownloader is done, this finish() function will be
139 called. Whatever it returns will be returned to the invoker of
# Fragment of an IDownloadTarget implementation that is constructed from a
# filename (the class header and several method lines are not visible in
# this listing).
144 implements(IDownloadTarget)
145 def __init__(self, filename):
146 self._filename = filename
# the destination file is opened for binary writing and kept on self.f;
# NOTE(review): which method performs this open is not visible here.
148 self.f = open(self._filename, "wb")
# body of write() is elided from this listing
150 def write(self, data):
# removes the file named at construction time.
# NOTE(review): presumably part of fail(); also, `os` is not among the
# imports visible in this listing -- confirm it is imported.
156 os.unlink(self._filename)
157 def register_canceller(self, cb):
158 pass # we won't use it
# Fragment of an IDownloadTarget implementation that collects the written
# chunks in a list (the class header and several method lines are not
# visible in this listing).
163 implements(IDownloadTarget)
168 def write(self, data):
# chunks are accumulated in self._data and joined later
169 self._data.append(data)
# concatenates all accumulated chunks into a single string on self.data;
# NOTE(review): which method does this is not visible here.
171 self.data = "".join(self._data)
175 def register_canceller(self, cb):
176 pass # we won't use it
# Fragment of an IDownloadTarget implementation that wraps a caller-supplied
# filehandle (the class header and several method lines are not visible in
# this listing).
181 implements(IDownloadTarget)
182 def __init__(self, filehandle):
183 self._filehandle = filehandle
# writes are passed straight through to the wrapped filehandle
186 def write(self, data):
187 self._filehandle.write(data)
189 # the originator of the filehandle reserves the right to close it
# body of register_canceller() is elided from this listing
193 def register_canceller(self, cb):
# Interface declared by the Downloader service: a single download() entry
# point taking a verifierid and a download target. (Any docstring on the
# method is not visible in this listing.)
198 class IDownloader(Interface):
199 def download(verifierid, target):
# MultiService subclass that declares IDownloader and exposes download()
# plus the download_to_data / download_to_filename / download_to_filehandle
# convenience wrappers. The end of the class docstring is not visible in
# this listing.
202 class Downloader(service.MultiService):
203 """I am a service that allows file downloading.
205 implements(IDownloader)
# Begin downloading the file identified by verifierid into target t.
# The remainder of this method (after set_download_target) is elided from
# this listing.
208 def download(self, verifierid, t):
211 assert isinstance(verifierid, str)
# calling the interface adapts t to IDownloadTarget (zope.interface raises
# if t neither provides the interface nor can be adapted to it)
212 t = IDownloadTarget(t)
# self.parent is handed to FileDownloader as its `peer` argument
215 dl = FileDownloader(self.parent, verifierid)
216 dl.set_download_target(t)
def download_to_data(self, verifierid):
    """Download the file identified by verifierid into a new Data target.

    Returns whatever self.download() returns for that target.
    """
    start_download = self.download
    return start_download(verifierid, Data())
def download_to_filename(self, verifierid, filename):
    """Download the file identified by verifierid into a FileName target.

    The target is built from `filename`; returns whatever self.download()
    returns for that target.
    """
    start_download = self.download
    return start_download(verifierid, FileName(filename))
def download_to_filehandle(self, verifierid, filehandle):
    """Download the file identified by verifierid into a FileHandle target.

    The target wraps the caller's `filehandle`; returns whatever
    self.download() returns for that target.
    """
    start_download = self.download
    return start_download(verifierid, FileHandle(filehandle))