2 from zope.interface import Interface, implements
3 from twisted.python import failure, log
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap import Referenceable
8 from allmydata.util import idlib
9 from allmydata import encode
11 from cStringIO import StringIO
class NotEnoughPeersError(Exception):
    """Raised when peer selection exhausts all candidate peers before
    enough of them have granted us bucket leases for the upload."""


class HaveAllPeersError(Exception):
    """Control-flow exception: we use this to jump out of the
    peer-selection loop once enough peers have granted leases."""


# this wants to live in storage, not here
class TooFullError(Exception):
    """A storage server declined to allocate a bucket because it is
    too full to accept our lease."""
def __init__(self, peer):
    # NOTE(review): the constructor body is missing from this excerpt.
    # Later methods read self._peer and self.debug, so it presumably
    # stores the peer reference (and a debug flag) here -- confirm
    # against the full file.

def set_filehandle(self, filehandle):
    # Remember the plaintext source and record its size.
    self._filehandle = filehandle
    # NOTE(review): a seek-to-end presumably happened on the line
    # missing from this excerpt; tell() only yields the file size when
    # the handle is positioned at EOF -- confirm.
    self._size = filehandle.tell()
def make_encoder(self):
    # Construct the erasure-coding encoder for this upload.
    self._needed_shares = 4
    # NOTE(review): the line defining self._shares (total share count)
    # is missing from this excerpt; start() also reads self._shares.
    self._encoder = encode.Encoder(self._filehandle, self._shares)
    # each share is sized from the plaintext size recorded by
    # set_filehandle()
    self._share_size = self._size
def set_verifierid(self, vid):
    """Record the verifierid (a binary hash string) for this upload.

    Raises TypeError if vid is not a string. The original used a bare
    assert, which is silently stripped under ``python -O``; an explicit
    raise keeps the validation in optimized runs.
    """
    if not isinstance(vid, str):
        raise TypeError("verifierid must be a string, got %r" % (type(vid),))
    self._verifierid = vid
# --- body of start() (the def line is missing from this excerpt) ---
log.msg("starting upload")
# NOTE(review): an `if self.debug:` guard likely preceded this print on
# a line missing from the excerpt -- confirm before assuming it always
# fires.
print "starting upload"
# first step: who should we upload to?
# maybe limit max_peers to 2*len(self.shares), to reduce memory
# NOTE(review): the lines computing max_peers are missing from this
# excerpt.
self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
self._total_peers = len(self.permuted)
for p in self.permuted:
    assert isinstance(p, str)
# we will shrink self.permuted as we give up on peers
self.goodness_points = 0
self.target_goodness = self._shares
self.landlords = [] # list of (peerid, bucket_num, remotebucket)

# kick off the peer-selection loop; _got_all_peers runs once enough
# peers have granted leases
d = defer.maybeDeferred(self._check_next_peer)
d.addCallback(self._got_all_peers)
# NOTE(review): a `return d` presumably follows on a missing line.
def _check_next_peer(self):
    # One step of the peer-selection loop: ask the next candidate peer
    # to allocate a bucket, tally the result in _done_with_peer, and
    # recurse (via the deferred chain) until we either have enough
    # landlords (HaveAllPeersError) or run out of candidates.
    if len(self.permuted) == 0:
        # there are no more to check
        raise NotEnoughPeersError("%s goodness, want %s, have %d "
                                  "landlords, %d total peers" %
                                  (self.goodness_points,
    # NOTE(review): the remaining elements of the format tuple (target
    # goodness, landlord count, total peers) are on lines missing from
    # this excerpt.
    if self.peer_index >= len(self.permuted):
    # NOTE(review): the wrap-around body (presumably resetting
    # self.peer_index) is missing from this excerpt -- confirm.
    peerid = self.permuted[self.peer_index]
    # NOTE(review): the peer_index advance presumably sits on the
    # missing line here.
    d = self._peer.get_remote_service(peerid, "storageserver")
    def _got_peer(service):
        # connected: ask this peer's storage server for a bucket
        bucket_num = len(self.landlords)
        if self.debug: print "asking %s" % idlib.b2a(peerid)
        d2 = service.callRemote("allocate_bucket",
                                verifierid=self._verifierid,
                                bucket_num=bucket_num,
                                size=self._share_size,
                                leaser=self._peer.nodeid,
                                canary=Referenceable())
        def _allocate_response(bucket):
            # NOTE(review): an `if self.debug:` guard likely preceded
            # this print on a missing line.
            print " peerid %s will grant us a lease" % idlib.b2a(peerid)
            self.landlords.append( (peerid, bucket_num, bucket) )
            self.goodness_points += 1
            if self.goodness_points >= self.target_goodness:
                if self.debug: print " we're done!"
                # control-flow exception; caught in _done_with_peer
                raise HaveAllPeersError()
            # otherwise we fall through to allocate more peers
        d2.addCallback(_allocate_response)
        # NOTE(review): a `return d2` presumably follows on a missing
        # line, keeping the allocation in the deferred chain.
    d.addCallback(_got_peer)
    def _done_with_peer(res):
        # addBoth handler: res is either a success value or a Failure
        if self.debug: print "done with peer %s:" % idlib.b2a(peerid)
        if isinstance(res, failure.Failure):
            if res.check(HaveAllPeersError):
                if self.debug: print " all done"
                # NOTE(review): the success return for this branch is
                # on lines missing from this excerpt.
            if res.check(TooFullError):
                if self.debug: print " too full"
            elif res.check(IndexError):
                if self.debug: print " no connection"
            # NOTE(review): an `else:` for unexpected failures likely
            # preceded this print on a missing line.
            if self.debug: print " other error:", res
            self.permuted.remove(peerid) # this peer was unusable
            # NOTE(review): an `else:` (non-Failure, lease granted)
            # likely preceded the next line on a missing line.
            if self.debug: print " they gave us a lease"
        # we get here for either good peers (when we still need
        # more), or after checking a bad peer (and thus still need
        # more). So now we need to grab a new peer.
        return self._check_next_peer()
    d.addBoth(_done_with_peer)
    # NOTE(review): a `return d` presumably follows on a missing line.
def _got_all_peers(self, res):
    # Enough landlords acquired: hand the bucket list to the encoder
    # and, once the upload completes, fire with the verifierid.
    d = self._encoder.do_upload(self.landlords)
    d.addCallback(lambda res: self._verifierid)
    # NOTE(review): a `return d` presumably follows on a missing line.
# body of netstring(s) -- the def line is missing from this excerpt.
# Netstring framing: "<length>:<payload>,".
return "%d:%s," % (len(s), s)
class IUploadable(Interface):
    """A source of plaintext data to be uploaded."""
    def get_filehandle():
        """Return a file-like object from which the data can be read."""
    def close_filehandle(f):
        """Called when the uploader is done with the filehandle
        previously returned by get_filehandle()."""
# interior of the FileName uploadable (the class statement is missing
# from this excerpt): wraps a filename on disk
implements(IUploadable)
def __init__(self, filename):
    # path of the file to open at upload time
    self._filename = filename
def get_filehandle(self):
    """Open the named file for binary reading and hand back the handle."""
    handle = open(self._filename, "rb")
    return handle
def close_filehandle(self, f):
    # NOTE(review): the body (presumably f.close(), since we opened the
    # handle ourselves in get_filehandle) is missing from this excerpt.
# interior of the Data uploadable (the class statement is missing from
# this excerpt): wraps an in-memory string
implements(IUploadable)
def __init__(self, data):
    # NOTE(review): the line storing the data (presumably
    # self._data = data) is missing from this excerpt.
def get_filehandle(self):
    # wrap the in-memory string in a file-like object
    return StringIO(self._data)
def close_filehandle(self, f):
    # NOTE(review): the body is missing from this excerpt; a StringIO
    # needs no explicit close.
# interior of the FileHandle uploadable (the class statement is missing
# from this excerpt): wraps a caller-supplied open filehandle
implements(IUploadable)
def __init__(self, filehandle):
    # keep a reference to the caller's already-open filehandle
    self._filehandle = filehandle
def get_filehandle(self):
    """Hand back the caller-supplied filehandle, unchanged."""
    handle = self._filehandle
    return handle
def close_filehandle(self, f):
    # the originator of the filehandle reserves the right to close it
    # NOTE(review): the body (presumably a bare `pass`) is missing from
    # this excerpt.
# Parent service that accepts upload requests and drives a FileUploader
# per request. NOTE(review): the class docstring below is truncated in
# this excerpt (its closing quotes are on a missing line), so nothing
# may be inserted after it here.
class Uploader(service.MultiService):
    """I am a service that allows file uploading.
def _compute_verifierid(self, f):
    # Derive the verifierid by hashing a domain-separation tag plus the
    # entire plaintext. NOTE(review): `sha` is not imported in the
    # visible lines of this excerpt -- confirm against the full file.
    hasher = sha.new(netstring("allmydata_v1_verifierid"))
    # NOTE(review): a seek-to-start presumably happened on the missing
    # line above; as shown this reads the whole file in one call.
    hasher.update(f.read())
    # note: this is only of the plaintext data, no encryption yet
    return hasher.digest()
# --- body of upload(f) (the def line is missing from this excerpt);
# f is an IUploadable provider ---
fh = f.get_filehandle()
u = FileUploader(self.parent)
# NOTE(review): a u.set_filehandle(fh) call presumably sits on the
# missing line here.
u.set_verifierid(self._compute_verifierid(fh))
# NOTE(review): missing lines presumably start the upload and arrange
# for the close to happen when it finishes; as shown the close appears
# unconditional.
f.close_filehandle(fh)
def upload_data(self, data):
    """Convenience wrapper: upload a string held in memory."""
    source = Data(data)
    return self.upload(source)
def upload_filename(self, filename):
    """Convenience wrapper: upload the contents of a named file."""
    source = FileName(filename)
    return self.upload(source)
def upload_filehandle(self, filehandle):
    """Convenience wrapper: upload from an already-open filehandle."""
    source = FileHandle(filehandle)
    return self.upload(source)