1
2
3 import os, time
4 from StringIO import StringIO
5 from itertools import count
6 from zope.interface import implements
7 from twisted.internet import defer
8 from twisted.python import failure
9 from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
10                                  IMutableUploadable
11 from allmydata.util import base32, hashutil, mathutil, idlib, log
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from foolscap.api import eventually, fireEventually
17
18 from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
19      UncoordinatedWriteError, NotEnoughServersError
20 from allmydata.mutable.servermap import ServerMap
21 from allmydata.mutable.layout import unpack_checkstring, MDMFSlotWriteProxy, \
22                                      SDMFSlotWriteProxy
23
24 KiB = 1024
25 DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
26 PUSHING_BLOCKS_STATE = 0
27 PUSHING_EVERYTHING_ELSE_STATE = 1
28 DONE_STATE = 2
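# The publish process is a small state machine: PUSHING_BLOCKS_STATE while
# segments are being encrypted, encoded, and uploaded; then
# PUSHING_EVERYTHING_ELSE_STATE while hash trees, the signature, and the
# verification key are pushed; then DONE_STATE once every writer has finished.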
29
30 class PublishStatus:
31     implements(IPublishStatus)
32     statusid_counter = count(0)
33     def __init__(self):
34         self.timings = {}
35         self.timings["send_per_server"] = {}
36         self.timings["encrypt"] = 0.0
37         self.timings["encode"] = 0.0
38         self.servermap = None
39         self.problems = {}
40         self.active = True
41         self.storage_index = None
42         self.helper = False
43         self.encoding = ("?", "?")
44         self.size = None
45         self.status = "Not started"
46         self.progress = 0.0
47         self.counter = self.statusid_counter.next()
48         self.started = time.time()
49
50     def add_per_server_time(self, peerid, elapsed):
51         if peerid not in self.timings["send_per_server"]:
52             self.timings["send_per_server"][peerid] = []
53         self.timings["send_per_server"][peerid].append(elapsed)
54     def accumulate_encode_time(self, elapsed):
55         self.timings["encode"] += elapsed
56     def accumulate_encrypt_time(self, elapsed):
57         self.timings["encrypt"] += elapsed
58
59     def get_started(self):
60         return self.started
61     def get_storage_index(self):
62         return self.storage_index
63     def get_encoding(self):
64         return self.encoding
65     def using_helper(self):
66         return self.helper
67     def get_servermap(self):
68         return self.servermap
69     def get_size(self):
70         return self.size
71     def get_status(self):
72         return self.status
73     def get_progress(self):
74         return self.progress
75     def get_active(self):
76         return self.active
77     def get_counter(self):
78         return self.counter
79
80     def set_storage_index(self, si):
81         self.storage_index = si
82     def set_helper(self, helper):
83         self.helper = helper
84     def set_servermap(self, servermap):
85         self.servermap = servermap
86     def set_encoding(self, k, n):
87         self.encoding = (k, n)
88     def set_size(self, size):
89         self.size = size
90     def set_status(self, status):
91         self.status = status
92     def set_progress(self, value):
93         self.progress = value
94     def set_active(self, value):
95         self.active = value
96
97 class LoopLimitExceededError(Exception):
98     pass
99
100 class Publish:
101     """I represent a single act of publishing the mutable file to the grid. I
102     will only publish my data if the servermap I am using still represents
103     the current state of the world.
104
105     To make the initial publish, set servermap to None.
106     """
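    # Illustrative usage (a sketch only; `filenode`, `storage_broker`, and
    # `servermap` are assumed to be supplied by the caller, which is normally
    # the mutable filenode machinery rather than application code):
    #
    #   p = Publish(filenode, storage_broker, servermap)
    #   d = p.publish(MutableData("brand new contents"))
    #   d.addCallback(lambda ign: p.get_status())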
107
108     def __init__(self, filenode, storage_broker, servermap):
109         self._node = filenode
110         self._storage_broker = storage_broker
111         self._servermap = servermap
112         self._storage_index = self._node.get_storage_index()
113         self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
114         num = self.log("Publish(%s): starting" % prefix, parent=None)
115         self._log_number = num
116         self._running = True
117         self._first_write_error = None
118         self._last_failure = None
119
120         self._status = PublishStatus()
121         self._status.set_storage_index(self._storage_index)
122         self._status.set_helper(False)
123         self._status.set_progress(0.0)
124         self._status.set_active(True)
125         self._version = self._node.get_version()
126         assert self._version in (SDMF_VERSION, MDMF_VERSION)
127
128
129     def get_status(self):
130         return self._status
131
132     def log(self, *args, **kwargs):
133         if 'parent' not in kwargs:
134             kwargs['parent'] = self._log_number
135         if "facility" not in kwargs:
136             kwargs["facility"] = "tahoe.mutable.publish"
137         return log.msg(*args, **kwargs)
138
139
140     def update(self, data, offset, blockhashes, version):
141         """
142         I replace the contents of this file with the contents of data,
143         starting at offset. I return a Deferred that fires with None
144         when the replacement has been completed, or with an error if
145         something went wrong during the process.
146
147         Note that this process will not upload new shares. If the file
148         being updated is in need of repair, callers will have to repair
149         it on their own.
150         """
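        # A sketch of how a caller might drive an in-place update. The
        # blockhashes and version arguments are assumed to have been fetched
        # beforehand by the caller's own update machinery; the names below
        # are illustrative only:
        #
        #   p = Publish(filenode, storage_broker, servermap)
        #   d = p.update(MutableData("patched bytes"), offset,
        #                fetched_blockhashes, old_version_info)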
151         # How this works:
152         # 1: Make peer assignments. We'll assign each share that we know
153         # about on the grid to the peer that currently holds it, and
154         # will not place any new shares.
155         # 2: Set up encoding parameters. Most of these will stay the same
156         # -- datalength will change, as will some of the offsets.
157         # 3. Upload the new segments.
158         # 4. Be done.
159         assert IMutableUploadable.providedBy(data)
160
161         self.data = data
162
163         # XXX: Use the MutableFileVersion instead.
164         self.datalength = self._node.get_size()
165         if data.get_size() > self.datalength:
166             self.datalength = data.get_size()
167
168         self.log("starting update")
169         self.log("adding new data of length %d at offset %d" % \
170                     (data.get_size(), offset))
171         self.log("new data length is %d" % self.datalength)
172         self._status.set_size(self.datalength)
173         self._status.set_status("Started")
174         self._started = time.time()
175
176         self.done_deferred = defer.Deferred()
177
178         self._writekey = self._node.get_writekey()
179         assert self._writekey, "need write capability to publish"
180
181         # first, which servers will we publish to? We require that the
182         # servermap was updated in MODE_WRITE, so we can depend upon the
183         # peerlist computed by that process instead of computing our own.
184         assert self._servermap
185         assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
186         # we will push a version that is one larger than anything present
187         # in the grid, according to the servermap.
188         self._new_seqnum = self._servermap.highest_seqnum() + 1
189         self._status.set_servermap(self._servermap)
190
191         self.log(format="new seqnum will be %(seqnum)d",
192                  seqnum=self._new_seqnum, level=log.NOISY)
193
194         # We're updating an existing file, so all of the following
195         # should be available.
196         self.readkey = self._node.get_readkey()
197         self.required_shares = self._node.get_required_shares()
198         assert self.required_shares is not None
199         self.total_shares = self._node.get_total_shares()
200         assert self.total_shares is not None
201         self._status.set_encoding(self.required_shares, self.total_shares)
202
203         self._pubkey = self._node.get_pubkey()
204         assert self._pubkey
205         self._privkey = self._node.get_privkey()
206         assert self._privkey
207         self._encprivkey = self._node.get_encprivkey()
208
209         sb = self._storage_broker
210         full_peerlist = [(s.get_serverid(), s.get_rref())
211                          for s in sb.get_servers_for_psi(self._storage_index)]
212         self.full_peerlist = full_peerlist # for use later, immutable
213         self.bad_peers = set() # peerids who have errbacked/refused requests
214
215         # This will set self.segment_size, self.num_segments, and
216         # self.fec. TODO: Does it know how to do the offset? Probably
217         # not. So do that part next.
218         self.setup_encoding_parameters(offset=offset)
219
220         # if we experience any surprises (writes which were rejected because
221         # our test vector did not match, or shares which we didn't expect to
222         # see), we set this flag and report an UncoordinatedWriteError at the
223         # end of the publish process.
224         self.surprised = False
225
226         # we keep track of three tables. The first is our goal: which share
227         # we want to see on which servers. This is initially populated by the
228         # existing servermap.
229         self.goal = set() # pairs of (peerid, shnum) tuples
230
231         # the second table is our list of outstanding queries: those which
232         # are in flight and may or may not be delivered, accepted, or
233         # acknowledged. Items are added to this table when the request is
234         # sent, and removed when the response returns (or errbacks).
235         self.outstanding = set() # (peerid, shnum) tuples
236
237         # the third is a table of successes: shares which have actually been
238         # placed. These are populated when responses come back with success.
239         # When self.placed == self.goal, we're done.
240         self.placed = set() # (peerid, shnum) tuples
241
242         # we also keep a mapping from peerid to RemoteReference. Each time we
243         # pull a connection out of the full peerlist, we add it to this for
244         # use later.
245         self.connections = {}
246
247         self.bad_share_checkstrings = {}
248
249         # This is set at the last step of the publishing process.
250         self.versioninfo = ""
251
252         # we use the servermap to populate the initial goal: this way we will
253         # try to update each existing share in place. Since we're
254         # updating, we ignore damaged and missing shares -- callers must
255         # run a repair to recreate these.
256         for (peerid, shnum) in self._servermap.servermap:
257             self.goal.add( (peerid, shnum) )
258             self.connections[peerid] = self._servermap.connections[peerid]
259         self.writers = {}
260
261         # SDMF files are updated differently; this in-place update path always uses the MDMF layout.
262         self._version = MDMF_VERSION
263         writer_class = MDMFSlotWriteProxy
264
265         # For each (peerid, shnum) in self.goal, we make a
266         # write proxy for that peer. We'll use this to write
267         # shares to the peer.
268         for key in self.goal:
269             peerid, shnum = key
270             write_enabler = self._node.get_write_enabler(peerid)
271             renew_secret = self._node.get_renewal_secret(peerid)
272             cancel_secret = self._node.get_cancel_secret(peerid)
273             secrets = (write_enabler, renew_secret, cancel_secret)
274
275             self.writers[shnum] =  writer_class(shnum,
276                                                 self.connections[peerid],
277                                                 self._storage_index,
278                                                 secrets,
279                                                 self._new_seqnum,
280                                                 self.required_shares,
281                                                 self.total_shares,
282                                                 self.segment_size,
283                                                 self.datalength)
284             self.writers[shnum].peerid = peerid
285             assert (peerid, shnum) in self._servermap.servermap
286             old_versionid, old_timestamp = self._servermap.servermap[key]
287             (old_seqnum, old_root_hash, old_salt, old_segsize,
288              old_datalength, old_k, old_N, old_prefix,
289              old_offsets_tuple) = old_versionid
290             self.writers[shnum].set_checkstring(old_seqnum,
291                                                 old_root_hash,
292                                                 old_salt)
293
294         # Our remote shares will not have a complete checkstring until
295         # after we are done writing share data and have started to write
296         # blocks. In the meantime, we need to know what to look for when
297         # writing, so that we can detect UncoordinatedWriteErrors.
298         self._checkstring = self.writers.values()[0].get_checkstring()
299
300         # Now, we start pushing shares.
301         self._status.timings["setup"] = time.time() - self._started
302         # First, we encrypt, encode, and push the segments that need to
303         # be published.
304
305         # Our update process fetched these for us. We need to update
306         # them in place as publishing happens.
307         self.blockhashes = {} # shnum -> [blockhashes]
308         for (i, bht) in blockhashes.iteritems():
309             # We need to extract the leaves from our old hash tree.
310             old_segcount = mathutil.div_ceil(version[4],
311                                              version[3])
312             h = hashtree.IncompleteHashTree(old_segcount)
313             bht = dict(enumerate(bht))
314             h.set_hashes(bht)
315             leaves = h[h.get_leaf_index(0):]
316             for j in xrange(self.num_segments - len(leaves)):
317                 leaves.append(None)
318
319             assert len(leaves) >= self.num_segments
320             self.blockhashes[i] = leaves
321             # This list will now be the leaves that were set during the
322             # initial upload + enough empty hashes to make it a
323             # power-of-two. If we exceed a power of two boundary, we
324             # should be encoding the file over again, and should not be
325             # here. So, we have
326             #assert len(self.blockhashes[i]) == \
327             #    hashtree.roundup_pow2(self.num_segments), \
328             #        len(self.blockhashes[i])
329             # XXX: Except this doesn't work. Figure out why.
330
331         # These are filled in later, after we've modified the block hash
332         # tree suitably.
333         self.sharehash_leaves = None # eventually [sharehashes]
334         self.sharehashes = {} # shnum -> [sharehash leaves necessary to 
335                               # validate the share]
336
337         self.log("Starting push")
338
339         self._state = PUSHING_BLOCKS_STATE
340         self._push()
341
342         return self.done_deferred
343
344
345     def publish(self, newdata):
346         """Publish the filenode's current contents.  Returns a Deferred that
347         fires (with None) when the publish has done as much work as it's ever
348         going to do, or errbacks with ConsistencyError if it detects a
349         simultaneous write.
350         """
351
352         # 0. Setup encoding parameters, encoder, and other such things.
353         # 1. Encrypt, encode, and publish segments.
354         assert IMutableUploadable.providedBy(newdata)
355
356         self.data = newdata
357         self.datalength = newdata.get_size()
358         #if self.datalength >= DEFAULT_MAX_SEGMENT_SIZE:
359         #    self._version = MDMF_VERSION
360         #else:
361         #    self._version = SDMF_VERSION
362
363         self.log("starting publish, datalen is %s" % self.datalength)
364         self._status.set_size(self.datalength)
365         self._status.set_status("Started")
366         self._started = time.time()
367
368         self.done_deferred = defer.Deferred()
369
370         self._writekey = self._node.get_writekey()
371         assert self._writekey, "need write capability to publish"
372
373         # first, which servers will we publish to? We require that the
374         # servermap was updated in MODE_WRITE, so we can depend upon the
375         # peerlist computed by that process instead of computing our own.
376         if self._servermap:
377             assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
378             # we will push a version that is one larger than anything present
379             # in the grid, according to the servermap.
380             self._new_seqnum = self._servermap.highest_seqnum() + 1
381         else:
382             # If we don't have a servermap, that's because we're doing the
383             # initial publish
384             self._new_seqnum = 1
385             self._servermap = ServerMap()
386         self._status.set_servermap(self._servermap)
387
388         self.log(format="new seqnum will be %(seqnum)d",
389                  seqnum=self._new_seqnum, level=log.NOISY)
390
391         # having an up-to-date servermap (or using a filenode that was just
392         # created for the first time) also guarantees that the following
393         # fields are available
394         self.readkey = self._node.get_readkey()
395         self.required_shares = self._node.get_required_shares()
396         assert self.required_shares is not None
397         self.total_shares = self._node.get_total_shares()
398         assert self.total_shares is not None
399         self._status.set_encoding(self.required_shares, self.total_shares)
400
401         self._pubkey = self._node.get_pubkey()
402         assert self._pubkey
403         self._privkey = self._node.get_privkey()
404         assert self._privkey
405         self._encprivkey = self._node.get_encprivkey()
406
407         sb = self._storage_broker
408         full_peerlist = [(s.get_serverid(), s.get_rref())
409                          for s in sb.get_servers_for_psi(self._storage_index)]
410         self.full_peerlist = full_peerlist # for use later, immutable
411         self.bad_peers = set() # peerids who have errbacked/refused requests
412
413         # This will set self.segment_size, self.num_segments, and
414         # self.fec.
415         self.setup_encoding_parameters()
416
417         # if we experience any surprises (writes which were rejected because
418         # our test vector did not match, or shares which we didn't expect to
419         # see), we set this flag and report an UncoordinatedWriteError at the
420         # end of the publish process.
421         self.surprised = False
422
423         # we keep track of three tables. The first is our goal: which share
424         # we want to see on which servers. This is initially populated by the
425         # existing servermap.
426         self.goal = set() # pairs of (peerid, shnum) tuples
427
428         # the second table is our list of outstanding queries: those which
429         # are in flight and may or may not be delivered, accepted, or
430         # acknowledged. Items are added to this table when the request is
431         # sent, and removed when the response returns (or errbacks).
432         self.outstanding = set() # (peerid, shnum) tuples
433
434         # the third is a table of successes: shares which have actually been
435         # placed. These are populated when responses come back with success.
436         # When self.placed == self.goal, we're done.
437         self.placed = set() # (peerid, shnum) tuples
438
439         # we also keep a mapping from peerid to RemoteReference. Each time we
440         # pull a connection out of the full peerlist, we add it to this for
441         # use later.
442         self.connections = {}
443
444         self.bad_share_checkstrings = {}
445
446         # This is set at the last step of the publishing process.
447         self.versioninfo = ""
448
449         # we use the servermap to populate the initial goal: this way we will
450         # try to update each existing share in place.
451         for (peerid, shnum) in self._servermap.servermap:
452             self.goal.add( (peerid, shnum) )
453             self.connections[peerid] = self._servermap.connections[peerid]
454         # then we add in all the shares that were bad (corrupted, bad
455         # signatures, etc). We want to replace these.
456         for key, old_checkstring in self._servermap.bad_shares.items():
457             (peerid, shnum) = key
458             self.goal.add(key)
459             self.bad_share_checkstrings[key] = old_checkstring
460             self.connections[peerid] = self._servermap.connections[peerid]
461
462         # TODO: Make this part do peer selection.
463         self.update_goal()
464         self.writers = {}
465         if self._version == MDMF_VERSION:
466             writer_class = MDMFSlotWriteProxy
467         else:
468             writer_class = SDMFSlotWriteProxy
469
470         # For each (peerid, shnum) in self.goal, we make a
471         # write proxy for that peer. We'll use this to write
472         # shares to the peer.
473         for key in self.goal:
474             peerid, shnum = key
475             write_enabler = self._node.get_write_enabler(peerid)
476             renew_secret = self._node.get_renewal_secret(peerid)
477             cancel_secret = self._node.get_cancel_secret(peerid)
478             secrets = (write_enabler, renew_secret, cancel_secret)
479
480             self.writers[shnum] =  writer_class(shnum,
481                                                 self.connections[peerid],
482                                                 self._storage_index,
483                                                 secrets,
484                                                 self._new_seqnum,
485                                                 self.required_shares,
486                                                 self.total_shares,
487                                                 self.segment_size,
488                                                 self.datalength)
489             self.writers[shnum].peerid = peerid
490             if (peerid, shnum) in self._servermap.servermap:
491                 old_versionid, old_timestamp = self._servermap.servermap[key]
492                 (old_seqnum, old_root_hash, old_salt, old_segsize,
493                  old_datalength, old_k, old_N, old_prefix,
494                  old_offsets_tuple) = old_versionid
495                 self.writers[shnum].set_checkstring(old_seqnum,
496                                                     old_root_hash,
497                                                     old_salt)
498             elif (peerid, shnum) in self.bad_share_checkstrings:
499                 old_checkstring = self.bad_share_checkstrings[(peerid, shnum)]
500                 self.writers[shnum].set_checkstring(old_checkstring)
501
502         # Our remote shares will not have a complete checkstring until
503         # after we are done writing share data and have started to write
504         # blocks. In the meantime, we need to know what to look for when
505         # writing, so that we can detect UncoordinatedWriteErrors.
506         self._checkstring = self.writers.values()[0].get_checkstring()
507
508         # Now, we start pushing shares.
509         self._status.timings["setup"] = time.time() - self._started
510         # First, we encrypt, encode, and push the segments that need to
511         # be published.
512
513         # This will eventually hold the block hash chain for each share
514         # that we publish. We define it this way so that empty publishes
515         # will still have something to write to the remote slot.
516         self.blockhashes = dict([(i, []) for i in xrange(self.total_shares)])
517         for i in xrange(self.total_shares):
518             blocks = self.blockhashes[i]
519             for j in xrange(self.num_segments):
520                 blocks.append(None)
521         self.sharehash_leaves = None # eventually [sharehashes]
522         self.sharehashes = {} # shnum -> [sharehash leaves necessary to 
523                               # validate the share]
524
525         self.log("Starting push")
526
527         self._state = PUSHING_BLOCKS_STATE
528         self._push()
529
530         return self.done_deferred
531
532
533     def _update_status(self):
534         self._status.set_status("Sending Shares: %d placed out of %d, "
535                                 "%d messages outstanding" %
536                                 (len(self.placed),
537                                  len(self.goal),
538                                  len(self.outstanding)))
539         self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
540
541
542     def setup_encoding_parameters(self, offset=0):
543         if self._version == MDMF_VERSION:
544             segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
545         else:
546             segment_size = self.datalength # SDMF is only one segment
547         # this must be a multiple of self.required_shares
548         segment_size = mathutil.next_multiple(segment_size,
549                                               self.required_shares)
550         self.segment_size = segment_size
551
552         # Calculate the starting segment for the upload.
553         if segment_size:
554             # We use div_ceil instead of integer division here because
555             # it is semantically correct.
556             # If datalength isn't an even multiple of segment_size, then
557             # datalength // segment_size counts only the whole segments
558             # and silently drops the partial segment at the end. That is
559             # not what we want, because it ignores the extra data.
560             # div_ceil, by rounding up, gives us the right number of
561             # segments for the data that we're given, including a short
562             # tail segment when one is needed.
563             self.num_segments = mathutil.div_ceil(self.datalength,
564                                                   segment_size)
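            # Worked example: datalength = 300 KiB with segment_size = 128 KiB
            # gives div_ceil(300, 128) = 3 segments (two full segments plus a
            # 44 KiB tail), whereas 300 // 128 = 2 would drop the tail data.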
565
566             self.starting_segment = offset // segment_size
567
568         else:
569             self.num_segments = 0
570             self.starting_segment = 0
571
572
573         self.log("building encoding parameters for file")
574         self.log("got segsize %d" % self.segment_size)
575         self.log("got %d segments" % self.num_segments)
576
577         if self._version == SDMF_VERSION:
578             assert self.num_segments in (0, 1) # SDMF
579         # calculate the tail segment size.
580
581         if segment_size and self.datalength:
582             self.tail_segment_size = self.datalength % segment_size
583             self.log("got tail segment size %d" % self.tail_segment_size)
584         else:
585             self.tail_segment_size = 0
586
587         if self.tail_segment_size == 0 and segment_size:
588             # The tail segment is the same size as the other segments.
589             self.tail_segment_size = segment_size
590
591         # Make FEC encoders
592         fec = codec.CRSEncoder()
593         fec.set_params(self.segment_size,
594                        self.required_shares, self.total_shares)
595         self.piece_size = fec.get_block_size()
596         self.fec = fec
597
598         if self.tail_segment_size == self.segment_size:
599             self.tail_fec = self.fec
600         else:
601             tail_fec = codec.CRSEncoder()
602             tail_fec.set_params(self.tail_segment_size,
603                                 self.required_shares,
604                                 self.total_shares)
605             self.tail_fec = tail_fec
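        # e.g. with 128 KiB full segments and a 44 KiB tail, the tail segment
        # gets its own CRSEncoder sized for 44 KiB while the full segments
        # share self.fec; both use the same (required_shares, total_shares)
        # encoding parameters.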
606
607         self._current_segment = self.starting_segment
608         self.end_segment = self.num_segments - 1
609         # Now figure out where the last segment should be.
610         if self.data.get_size() != self.datalength:
611             # We're updating a few segments in the middle of a mutable
612             # file, so we don't want to republish the whole thing.
613             # (we don't have enough data to do that even if we wanted
614             # to)
615             end = self.data.get_size()
616             self.end_segment = end // segment_size
617             if end % segment_size == 0:
618                 self.end_segment -= 1
619
620         self.log("got start segment %d" % self.starting_segment)
621         self.log("got end segment %d" % self.end_segment)
622
623
624     def _push(self, ignored=None):
625         """
626         I manage state transitions. In particular, I check that we still
627         have enough working writers to complete the upload
628         successfully.
629         """
630         # Can we still successfully publish this file?
631         # TODO: Keep track of outstanding queries before aborting the
632         #       process.
633         if len(self.writers) < self.required_shares or self.surprised:
634             return self._failure()
635
636         # Figure out what we need to do next. Each of these needs to
637         # return a deferred so that we don't block execution when this
638         # is first called in the upload method.
639         if self._state == PUSHING_BLOCKS_STATE:
640             return self.push_segment(self._current_segment)
641
642         elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
643             return self.push_everything_else()
644
645         # If we make it to this point, we were successful in placing the
646         # file.
647         return self._done()
648
649
650     def push_segment(self, segnum):
651         if self.num_segments == 0 and self._version == SDMF_VERSION:
652             self._add_dummy_salts()
653
654         if segnum > self.end_segment:
655             # We don't have any more segments to push.
656             self._state = PUSHING_EVERYTHING_ELSE_STATE
657             return self._push()
658
659         d = self._encode_segment(segnum)
660         d.addCallback(self._push_segment, segnum)
661         def _increment_segnum(ign):
662             self._current_segment += 1
663         # XXX: I don't think we need to do addBoth here -- any errBacks
664         # should be handled within push_segment.
665         d.addCallback(_increment_segnum)
666         d.addCallback(self._turn_barrier)
667         d.addCallback(self._push)
668         d.addErrback(self._failure)
669
670
671     def _turn_barrier(self, result):
672         """
673         I help the publish process avoid the recursion limit issues
674         described in #237.
675         """
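        # fireEventually returns a Deferred that fires on a later reactor
        # turn, so each segment's callback chain starts fresh instead of
        # recursing one level deeper for every segment pushed.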
676         return fireEventually(result)
677
678
679     def _add_dummy_salts(self):
680         """
681         SDMF files need a salt even if they're empty, or the signature
682         won't make sense. This method adds a dummy salt to each of our
683         SDMF writers so that they can write the signature later.
684         """
685         salt = os.urandom(16)
686         assert self._version == SDMF_VERSION
687
688         for writer in self.writers.itervalues():
689             writer.put_salt(salt)
690
691
692     def _encode_segment(self, segnum):
693         """
694         I encrypt and encode the segment segnum.
695         """
696         started = time.time()
697
698         if segnum + 1 == self.num_segments:
699             segsize = self.tail_segment_size
700         else:
701             segsize = self.segment_size
702
703
704         self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
705         data = self.data.read(segsize)
706         # XXX: This is dumb. Why return a list?
707         data = "".join(data)
708
709         assert len(data) == segsize, len(data)
710
711         salt = os.urandom(16)
712
713         key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
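        # The per-segment data key is derived from the readkey and a fresh
        # random salt, so each segment of each new version is encrypted under
        # a different AES key even though the readcap itself never changes.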
714         self._status.set_status("Encrypting")
715         enc = AES(key)
716         crypttext = enc.process(data)
717         assert len(crypttext) == len(data)
718
719         now = time.time()
720         self._status.accumulate_encrypt_time(now - started)
721         started = now
722
723         # now apply FEC
724         if segnum + 1 == self.num_segments:
725             fec = self.tail_fec
726         else:
727             fec = self.fec
728
729         self._status.set_status("Encoding")
730         crypttext_pieces = [None] * self.required_shares
731         piece_size = fec.get_block_size()
732         for i in range(len(crypttext_pieces)):
733             offset = i * piece_size
734             piece = crypttext[offset:offset+piece_size]
735             piece = piece + "\x00"*(piece_size - len(piece)) # padding
736             crypttext_pieces[i] = piece
737             assert len(piece) == piece_size
738         d = fec.encode(crypttext_pieces)
739         def _done_encoding(res):
740             elapsed = time.time() - started
741             self._status.accumulate_encode_time(elapsed)
742             return (res, salt)
743         d.addCallback(_done_encoding)
744         return d
745
746
747     def _push_segment(self, encoded_and_salt, segnum):
748         """
749         I push (data, salt) as segment number segnum.
750         """
751         results, salt = encoded_and_salt
752         shares, shareids = results
753         self._status.set_status("Pushing segment")
754         for i in xrange(len(shares)):
755             sharedata = shares[i]
756             shareid = shareids[i]
757             if self._version == MDMF_VERSION:
758                 hashed = salt + sharedata
759             else:
760                 hashed = sharedata
761             block_hash = hashutil.block_hash(hashed)
762             self.blockhashes[shareid][segnum] = block_hash
763             # find the writer for this share
764             writer = self.writers[shareid]
765             writer.put_block(sharedata, segnum, salt)
766
767
768     def push_everything_else(self):
769         """
770         I put everything else associated with a share.
771         """
772         self._pack_started = time.time()
773         self.push_encprivkey()
774         self.push_blockhashes()
775         self.push_sharehashes()
776         self.push_toplevel_hashes_and_signature()
777         d = self.finish_publishing()
778         def _change_state(ignored):
779             self._state = DONE_STATE
780         d.addCallback(_change_state)
781         d.addCallback(self._push)
782         return d
783
784
785     def push_encprivkey(self):
786         encprivkey = self._encprivkey
787         self._status.set_status("Pushing encrypted private key")
788         for writer in self.writers.itervalues():
789             writer.put_encprivkey(encprivkey)
790
791
792     def push_blockhashes(self):
793         self.sharehash_leaves = [None] * len(self.blockhashes)
794         self._status.set_status("Building and pushing block hash tree")
795         for shnum, blockhashes in self.blockhashes.iteritems():
796             t = hashtree.HashTree(blockhashes)
797             self.blockhashes[shnum] = list(t)
798             # set the leaf for future use.
799             self.sharehash_leaves[shnum] = t[0]
800
801             writer = self.writers[shnum]
802             writer.put_blockhashes(self.blockhashes[shnum])
803
804
805     def push_sharehashes(self):
806         self._status.set_status("Building and pushing share hash chain")
807         share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
808         for shnum in xrange(len(self.sharehash_leaves)):
809             needed_indices = share_hash_tree.needed_hashes(shnum)
810             self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
811                                              for i in needed_indices] )
812             writer = self.writers[shnum]
813             writer.put_sharehashes(self.sharehashes[shnum])
814         self.root_hash = share_hash_tree[0]
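        # Recap of the hash structure: each share has a block hash tree over
        # its own blocks, the roots of those per-share trees are the leaves
        # of this share hash tree, and its root (self.root_hash) is what the
        # signed prefix commits to.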
815
816
817     def push_toplevel_hashes_and_signature(self):
818         # We need to do three things here:
819         #   - Push the root hash and salt hash
820         #   - Get the checkstring of the resulting layout; sign that.
821         #   - Push the signature
822         self._status.set_status("Pushing root hashes and signature")
823         for shnum in xrange(self.total_shares):
824             writer = self.writers[shnum]
825             writer.put_root_hash(self.root_hash)
826         self._update_checkstring()
827         self._make_and_place_signature()
828
829
830     def _update_checkstring(self):
831         """
832         After putting the root hash, MDMF files will have the
833         checkstring written to the storage server. This means that we
834         can update our copy of the checkstring so we can detect
835         uncoordinated writes. SDMF files will have the same checkstring,
836         so we need not do anything.
837         """
838         self._checkstring = self.writers.values()[0].get_checkstring()
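        # The checkstring is a packed summary of the header (sequence number
        # and root hash, plus the salt for SDMF); _got_write_answer compares
        # it against the checkstrings reported by servers to spot writes that
        # we did not make ourselves.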
839
840
841     def _make_and_place_signature(self):
842         """
843         I create and place the signature.
844         """
845         started = time.time()
846         self._status.set_status("Signing prefix")
847         signable = self.writers[0].get_signable()
848         self.signature = self._privkey.sign(signable)
849
850         for (shnum, writer) in self.writers.iteritems():
851             writer.put_signature(self.signature)
852         self._status.timings['sign'] = time.time() - started
853
854
855     def finish_publishing(self):
856         # We're almost done -- we just need to put the verification key
857         # and the offsets
858         started = time.time()
859         self._status.set_status("Pushing shares")
860         self._started_pushing = started
861         ds = []
862         verification_key = self._pubkey.serialize()
863
864
865         # TODO: Bad, since we remove from this same dict. We need to
866         # make a copy, or just use a non-iterated value.
867         for (shnum, writer) in self.writers.iteritems():
868             writer.put_verification_key(verification_key)
869             d = writer.finish_publishing()
870             d.addErrback(self._connection_problem, writer)
871             # Add the (peerid, shnum) tuple to our list of outstanding
872             # queries. This gets used by _loop if some of our queries
873             # fail to place shares.
874             self.outstanding.add((writer.peerid, writer.shnum))
875             d.addCallback(self._got_write_answer, writer, started)
876             ds.append(d)
877         self._record_verinfo()
878         self._status.timings['pack'] = time.time() - started
879         return defer.DeferredList(ds)
880
881
882     def _record_verinfo(self):
883         self.versioninfo = self.writers.values()[0].get_verinfo()
884
885
886     def _connection_problem(self, f, writer):
887         """
888         We ran into a connection problem while working with writer, and
889         need to deal with that.
890         """
891         self.log("found problem: %s" % str(f))
892         self._last_failure = f
893         del(self.writers[writer.shnum])
894
895
896     def log_goal(self, goal, message=""):
897         logmsg = [message]
898         for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
899             logmsg.append("sh%d to [%s]" % (shnum,
900                                             idlib.shortnodeid_b2a(peerid)))
901         self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
902         self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
903                  level=log.NOISY)
904
905     def update_goal(self):
906         # if log.recording_noisy
907         if True:
908             self.log_goal(self.goal, "before update: ")
909
910         # first, remove any bad peers from our goal
911         self.goal = set([ (peerid, shnum)
912                           for (peerid, shnum) in self.goal
913                           if peerid not in self.bad_peers ])
914
915         # find the homeless shares:
916         homefull_shares = set([shnum for (peerid, shnum) in self.goal])
917         homeless_shares = set(range(self.total_shares)) - homefull_shares
918         homeless_shares = sorted(list(homeless_shares))
919         # place them somewhere. We prefer unused servers at the beginning of
920         # the available peer list.
921
922         if not homeless_shares:
923             return
924
925         # if an old share X is on a node, put the new share X there too.
926         # TODO: 1: redistribute shares to achieve one-per-peer, by copying
927         #       shares from existing peers to new (less-crowded) ones. The
928         #       old shares must still be updated.
929         # TODO: 2: move those shares instead of copying them, to reduce future
930         #       update work
931
932         # this is a bit CPU intensive but easy to analyze. We create a sort
933         # order for each peerid. If the peerid is marked as bad, we don't
934         # even put them in the list. Then we care about the number of shares
935         # which have already been assigned to them. After that we care about
936         # their permutation order.
937         old_assignments = DictOfSets()
938         for (peerid, shnum) in self.goal:
939             old_assignments.add(peerid, shnum)
940
941         peerlist = []
942         for i, (peerid, ss) in enumerate(self.full_peerlist):
943             if peerid in self.bad_peers:
944                 continue
945             entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
946             peerlist.append(entry)
947         peerlist.sort()
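        # Sorting on (shares already assigned, permuted-list position) fills
        # empty servers first and breaks ties using the storage index's
        # permuted server order.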
948
949         if not peerlist:
950             raise NotEnoughServersError("Ran out of non-bad servers, "
951                                         "first_error=%s" %
952                                         str(self._first_write_error),
953                                         self._first_write_error)
954
955         # we then index this peerlist with an integer, because we may have to
956         # wrap. We update the goal as we go.
957         i = 0
958         for shnum in homeless_shares:
959             (ignored1, ignored2, peerid, ss) = peerlist[i]
960             # if we are forced to send a share to a server that already has
961             # one, we may have two write requests in flight, and the
962             # servermap (which was computed before either request was sent)
963             # won't reflect the new shares, so the second response will be
964             # surprising. There is code in _got_write_answer() to tolerate
965             # this, otherwise it would cause the publish to fail with an
966             # UncoordinatedWriteError. See #546 for details of the trouble
967             # this used to cause.
968             self.goal.add( (peerid, shnum) )
969             self.connections[peerid] = ss
970             i += 1
971             if i >= len(peerlist):
972                 i = 0
973         if True:
974             self.log_goal(self.goal, "after update: ")
975
976
977     def _got_write_answer(self, answer, writer, started):
978         if not answer:
979             # SDMF writers only pretend to write when callers set their
980             # blocks, salts, and so on -- they actually just write once,
981             # at the end of the upload process. In fake writes, they
982             # return defer.succeed(None). If we see that, we shouldn't
983             # bother checking it.
984             return
985
986         peerid = writer.peerid
987         lp = self.log("_got_write_answer from %s, share %d" %
988                       (idlib.shortnodeid_b2a(peerid), writer.shnum))
989
990         now = time.time()
991         elapsed = now - started
992
993         self._status.add_per_server_time(peerid, elapsed)
994
995         wrote, read_data = answer
996
997         surprise_shares = set(read_data.keys()) - set([writer.shnum])
998
999         # We need to remove from surprise_shares any shares that we are
1000         # knowingly also writing to that peer from other writers.
1001
1002         # TODO: Precompute this.
1003         known_shnums = [x.shnum for x in self.writers.values()
1004                         if x.peerid == peerid]
1005         surprise_shares -= set(known_shnums)
1006         self.log("found the following surprise shares: %s" %
1007                  str(surprise_shares))
1008
1009         # Now surprise shares contains all of the shares that we did not
1010         # expect to be there.
1011
1012         surprised = False
1013         for shnum in surprise_shares:
1014             # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
1015             checkstring = read_data[shnum][0]
1016             # What we want to do here is to see if their (seqnum,
1017             # roothash, salt) is the same as our (seqnum, roothash,
1018             # salt), or the equivalent for MDMF. The best way to do this
1019             # is to store a packed representation of our checkstring
1020             # somewhere, then not bother unpacking the other
1021             # checkstring.
1022             if checkstring == self._checkstring:
1023                 # they have the right share, somehow
1024
1025                 if (peerid,shnum) in self.goal:
1026                     # and we want them to have it, so we probably sent them a
1027                     # copy in an earlier write. This is ok, and avoids the
1028                     # #546 problem.
1029                     continue
1030
1031                 # They aren't in our goal, but they are still for the right
1032                 # version. Somebody else wrote them, and it's a convergent
1033                 # uncoordinated write. Pretend this is ok (don't be
1034                 # surprised), since I suspect there's a decent chance that
1035                 # we'll hit this in normal operation.
1036                 continue
1037
1038             else:
1039                 # the new shares are of a different version
1040                 if peerid in self._servermap.reachable_peers:
1041                     # we asked them about their shares, so we had knowledge
1042                     # of what they used to have. Any surprising shares must
1043                     # have come from someone else, so UCW.
1044                     surprised = True
1045                 else:
1046                     # we didn't ask them, and now we've discovered that they
1047                     # have a share we didn't know about. This indicates that
1048                     # mapupdate should have worked harder and asked more
1049                     # servers before concluding that it knew about them all.
1050
1051                     # signal UCW, but make sure to ask this peer next time,
1052                     # so we'll remember to update it if/when we retry.
1053                     surprised = True
1054                     # TODO: ask this peer next time. I don't yet have a good
1055                     # way to do this. Two insufficient possibilities are:
1056                     #
1057                     # self._servermap.add_new_share(peerid, shnum, verinfo, now)
1058                     #  but that requires fetching/validating/parsing the whole
1059                     #  version string, and all we have is the checkstring
1060                     # self._servermap.mark_bad_share(peerid, shnum, checkstring)
1061                     #  that will make publish overwrite the share next time,
1062                     #  but it won't re-query the server, and it won't make
1063                     #  mapupdate search further
1064
1065                     # TODO later: when publish starts, do
1066                     # servermap.get_best_version(), extract the seqnum,
1067                     # subtract one, and store as highest-replaceable-seqnum.
1068                     # Then, if this surprise-because-we-didn't-ask share is
1069                     # of highest-replaceable-seqnum or lower, we're allowed
1070                     # to replace it: send out a new writev (or rather add it
1071                     # to self.goal and loop).
1072                     pass
1073
1074                 surprised = True
1075
1076         if surprised:
1077             self.log("they had shares %s that we didn't know about" %
1078                      (list(surprise_shares),),
1079                      parent=lp, level=log.WEIRD, umid="un9CSQ")
1080             self.surprised = True
1081
1082         if not wrote:
1083             # TODO: there are two possibilities. The first is that the server
1084             # is full (or just doesn't want to give us any room), which means
1085             # we shouldn't ask them again, but is *not* an indication of an
1086             # uncoordinated write. The second is that our testv failed, which
1087             # *does* indicate an uncoordinated write. We currently don't have
1088             # a way to tell these two apart (in fact, the storage server code
1089             # doesn't have the option of refusing our share).
1090             #
1091             # If the server is full, mark the peer as bad (so we don't ask
1092             # them again), but don't set self.surprised. The loop() will find
1093             # a new server.
1094             #
1095             # If the testv failed, log it, set self.surprised, but don't
1096             # bother adding to self.bad_peers .
1097
1098             self.log("our testv failed, so the write did not happen",
1099                      parent=lp, level=log.WEIRD, umid="8sc26g")
1100             self.surprised = True
1101             self.bad_peers.add(writer) # don't ask them again
1102             # use the checkstring to add information to the log message
1103             for (shnum,readv) in read_data.items():
1104                 checkstring = readv[0]
1105                 (other_seqnum,
1106                  other_roothash,
1107                  other_salt) = unpack_checkstring(checkstring)
1108                 expected_version = self._servermap.version_on_peer(peerid,
1109                                                                    shnum)
1110                 if expected_version:
1111                     (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
1112                      offsets_tuple) = expected_version
1113                     self.log("somebody modified the share on us:"
1114                              " shnum=%d: I thought they had #%d:R=%s,"
1115                              " but testv reported #%d:R=%s" %
1116                              (shnum,
1117                               seqnum, base32.b2a(root_hash)[:4],
1118                               other_seqnum, base32.b2a(other_roothash)[:4]),
1119                              parent=lp, level=log.NOISY)
1120                 # if expected_version==None, then we didn't expect to see a
1121                 # share on that peer, and the 'surprise_shares' clause above
1122                 # will have logged it.
1123             return
1124
1125         # and update the servermap
1126         # self.versioninfo is set during the last phase of publishing.
1127         # If we get there, we know that responses correspond to placed
1128         # shares, and can safely execute these statements.
1129         if self.versioninfo:
1130             self.log("wrote successfully: adding new share to servermap")
1131             self._servermap.add_new_share(peerid, writer.shnum,
1132                                           self.versioninfo, started)
1133             self.placed.add( (peerid, writer.shnum) )
1134         self._update_status()
1135         # the next method in the deferred chain will check to see if
1136         # we're done and successful.
1137         return
1138
1139
1140     def _done(self):
1141         if not self._running:
1142             return
1143         self._running = False
1144         now = time.time()
1145         self._status.timings["total"] = now - self._started
1146
1147         elapsed = now - self._started_pushing
1148         self._status.timings['push'] = elapsed
1149
1150         self._status.set_active(False)
1151         self.log("Publish done, success")
1152         self._status.set_status("Finished")
1153         self._status.set_progress(1.0)
1154         # Get k and segsize, then give them to the caller.
1155         hints = {}
1156         hints['segsize'] = self.segment_size
1157         hints['k'] = self.required_shares
1158         self._node.set_downloader_hints(hints)
1159         eventually(self.done_deferred.callback, None)
1160
1161     def _failure(self, f=None):
1162         if f:
1163             self._last_failure = f
1164
1165         if not self.surprised:
1166             # We ran out of servers
1167             msg = "Publish ran out of good servers"
1168             if self._last_failure:
1169                 msg += ", last failure was: %s" % str(self._last_failure)
1170             self.log(msg)
1171             e = NotEnoughServersError(msg)
1172
1173         else:
1174             # We ran into shares that we didn't recognize, which means
1175             # that we need to return an UncoordinatedWriteError.
1176             self.log("Publish failed with UncoordinatedWriteError")
1177             e = UncoordinatedWriteError()
1178         f = failure.Failure(e)
1179         eventually(self.done_deferred.callback, f)
1180
1181
1182 class MutableFileHandle:
1183     """
1184     I am a mutable uploadable built around a filehandle-like object,
1185     usually either a StringIO instance or a handle to an actual file.
1186     """
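    # Illustrative sketch (the path below is hypothetical):
    #
    #   fh = open("/tmp/new-contents.bin", "rb")
    #   uploadable = MutableFileHandle(fh)
    #   size = uploadable.get_size()
    #   first_kb = "".join(uploadable.read(1024))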
1187     implements(IMutableUploadable)
1188
1189     def __init__(self, filehandle):
1190         # The filehandle is defined as a generally file-like object that
1191         # has these two methods. We don't care beyond that.
1192         assert hasattr(filehandle, "read")
1193         assert hasattr(filehandle, "close")
1194
1195         self._filehandle = filehandle
1196         # We must start reading at the beginning of the file, or we risk
1197         # encountering errors when the data read does not match the size
1198         # reported to the uploader.
1199         self._filehandle.seek(0)
1200
1201         # We have not yet read anything, so our position is 0.
1202         self._marker = 0
1203
1204
1205     def get_size(self):
1206         """
1207         I return the amount of data in my filehandle.
1208         """
1209         if not hasattr(self, "_size"):
1210             old_position = self._filehandle.tell()
1211             # Seek to the end of the file by seeking 0 bytes from the
1212             # file's end
1213             self._filehandle.seek(0, 2) # 2 == os.SEEK_END in 2.5+
1214             self._size = self._filehandle.tell()
1215             # Restore the previous position, in case this was called
1216             # after a read.
1217             self._filehandle.seek(old_position)
1218             assert self._filehandle.tell() == old_position
1219
1220         assert hasattr(self, "_size")
1221         return self._size
1222
1223
1224     def pos(self):
1225         """
1226         I return the position of my read marker -- i.e., how much data I
1227         have already read and returned to callers.
1228         """
1229         return self._marker
1230
1231
1232     def read(self, length):
1233         """
1234         I return some data (up to length bytes) from my filehandle.
1235
1236         In most cases, I return length bytes, but sometimes I won't --
1237         for example, if I am asked to read beyond the end of a file, or
1238         an error occurs.
1239         """
1240         results = self._filehandle.read(length)
1241         self._marker += len(results)
1242         return [results]
1243
1244
1245     def close(self):
1246         """
1247         I close the underlying filehandle. Any further operations on the
1248         filehandle fail at this point.
1249         """
1250         self._filehandle.close()
1251
1252
1253 class MutableData(MutableFileHandle):
1254     """
1255     I am a mutable uploadable built around a string, which I then cast
1256     into a StringIO and treat as a filehandle.
1257     """
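    # e.g. MutableData("new contents") behaves exactly like
    # MutableFileHandle(StringIO("new contents")).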
1258
1259     def __init__(self, s):
1260         # Take a string and return a file-like uploadable.
1261         assert isinstance(s, str)
1262
1263         MutableFileHandle.__init__(self, StringIO(s))
1264
1265
1266 class TransformingUploadable:
1267     """
1268     I am an IMutableUploadable that wraps another IMutableUploadable,
1269     and some segments that are already on the grid. When I am called to
1270     read, I handle merging of boundary segments.
1271     """
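    # Worked example (illustrative numbers): with segment_size = 128 KiB and
    # offset = 200 KiB, read() first returns the leading 72 KiB (200 % 128)
    # of the old segment passed in as `start`, then the caller's new data,
    # and finally fills any shortfall from the old segment passed in as `end`.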
1272     implements(IMutableUploadable)
1273
1274
1275     def __init__(self, data, offset, segment_size, start, end):
1276         assert IMutableUploadable.providedBy(data)
1277
1278         self._newdata = data
1279         self._offset = offset
1280         self._segment_size = segment_size
1281         self._start = start
1282         self._end = end
1283
1284         self._read_marker = 0
1285
1286         self._first_segment_offset = offset % segment_size
1287
1288         num = self.log("TransformingUploadable: starting", parent=None)
1289         self._log_number = num
1290         self.log("got fso: %d" % self._first_segment_offset)
1291         self.log("got offset: %d" % self._offset)
1292
1293
1294     def log(self, *args, **kwargs):
1295         if 'parent' not in kwargs:
1296             kwargs['parent'] = self._log_number
1297         if "facility" not in kwargs:
1298             kwargs["facility"] = "tahoe.mutable.transforminguploadable"
1299         return log.msg(*args, **kwargs)
1300
1301
1302     def get_size(self):
1303         return self._offset + self._newdata.get_size()
1304
1305
1306     def read(self, length):
1307         # We can get data from 3 sources here. 
1308         #   1. The first of the segments provided to us.
1309         #   2. The data that we're replacing things with.
1310         #   3. The last of the segments provided to us.
1311
1312         # are we in state 0?
1313         self.log("reading %d bytes" % length)
1314
1315         old_start_data = ""
1316         old_data_length = self._first_segment_offset - self._read_marker
1317         if old_data_length > 0:
1318             if old_data_length > length:
1319                 old_data_length = length
1320             self.log("returning %d bytes of old start data" % old_data_length)
1321
1322             old_data_end = old_data_length + self._read_marker
1323             old_start_data = self._start[self._read_marker:old_data_end]
1324             length -= old_data_length
1325         else:
1326             # otherwise calculations later get screwed up.
1327             old_data_length = 0
1328
1329         # Is there enough new data to satisfy this read? If not, we need
1330         # to pad the end of the data with data from our last segment.
1331         old_end_length = length - \
1332             (self._newdata.get_size() - self._newdata.pos())
1333         old_end_data = ""
1334         if old_end_length > 0:
1335             self.log("reading %d bytes of old end data" % old_end_length)
1336
1337             # TODO: We're not explicitly checking for tail segment size
1338             # here. Is that a problem?
1339             old_data_offset = (length - old_end_length + \
1340                                old_data_length) % self._segment_size
1341             self.log("reading at offset %d" % old_data_offset)
1342             old_end = old_data_offset + old_end_length
1343             old_end_data = self._end[old_data_offset:old_end]
1344             length -= old_end_length
1345             assert length == self._newdata.get_size() - self._newdata.pos()
1346
1347         self.log("reading %d bytes of new data" % length)
1348         new_data = self._newdata.read(length)
1349         new_data = "".join(new_data)
1350
1351         self._read_marker += len(old_start_data + new_data + old_end_data)
1352
1353         return old_start_data + new_data + old_end_data
1354
1355     def close(self):
1356         pass