[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / publish.py (blob 066ee9045c22e4975db51195945c49ca9d3ee9e7)
1
2
3 import os, time
4 from StringIO import StringIO
5 from itertools import count
6 from zope.interface import implements
7 from twisted.internet import defer
8 from twisted.python import failure
9 from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
10                                  IMutableUploadable
11 from allmydata.util import base32, hashutil, mathutil, idlib, log
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from foolscap.api import eventually, fireEventually
17
18 from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
19      UncoordinatedWriteError, NotEnoughServersError
20 from allmydata.mutable.servermap import ServerMap
21 from allmydata.mutable.layout import unpack_checkstring, MDMFSlotWriteProxy, \
22                                      SDMFSlotWriteProxy
23
24 KiB = 1024
25 DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
26 PUSHING_BLOCKS_STATE = 0
27 PUSHING_EVERYTHING_ELSE_STATE = 1
28 DONE_STATE = 2
29
30 class PublishStatus:
31     implements(IPublishStatus)
32     statusid_counter = count(0)
33     def __init__(self):
34         self.timings = {}
35         self.timings["send_per_server"] = {}
36         self.servermap = None
37         self.problems = {}
38         self.active = True
39         self.storage_index = None
40         self.helper = False
41         self.encoding = ("?", "?")
42         self.size = None
43         self.status = "Not started"
44         self.progress = 0.0
45         self.counter = self.statusid_counter.next()
46         self.started = time.time()
47
48     def add_per_server_time(self, peerid, elapsed):
49         if peerid not in self.timings["send_per_server"]:
50             self.timings["send_per_server"][peerid] = []
51         self.timings["send_per_server"][peerid].append(elapsed)
52
53     def get_started(self):
54         return self.started
55     def get_storage_index(self):
56         return self.storage_index
57     def get_encoding(self):
58         return self.encoding
59     def using_helper(self):
60         return self.helper
61     def get_servermap(self):
62         return self.servermap
63     def get_size(self):
64         return self.size
65     def get_status(self):
66         return self.status
67     def get_progress(self):
68         return self.progress
69     def get_active(self):
70         return self.active
71     def get_counter(self):
72         return self.counter
73
74     def set_storage_index(self, si):
75         self.storage_index = si
76     def set_helper(self, helper):
77         self.helper = helper
78     def set_servermap(self, servermap):
79         self.servermap = servermap
80     def set_encoding(self, k, n):
81         self.encoding = (k, n)
82     def set_size(self, size):
83         self.size = size
84     def set_status(self, status):
85         self.status = status
86     def set_progress(self, value):
87         self.progress = value
88     def set_active(self, value):
89         self.active = value
90
91 class LoopLimitExceededError(Exception):
92     pass
93
94 class Publish:
95     """I represent a single act of publishing the mutable file to the grid. I
96     will only publish my data if the servermap I am using still represents
97     the current state of the world.
98
99     To make the initial publish, set servermap to None.
100     """
101
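    # A rough usage sketch (hypothetical caller code, not part of this
    # module; the filenode, storage_broker, and servermap setup is assumed):
    #
    #   p = Publish(filenode, storage_broker, servermap)
    #   d = p.publish(MutableData("new file contents"))
    #   # or, to modify part of an existing MDMF file in place:
    #   # d = p.update(MutableData("delta"), offset, blockhashes, version)
    #   d.addCallback(lambda ign: p.get_status())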
102     def __init__(self, filenode, storage_broker, servermap):
103         self._node = filenode
104         self._storage_broker = storage_broker
105         self._servermap = servermap
106         self._storage_index = self._node.get_storage_index()
107         self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
108         num = self.log("Publish(%s): starting" % prefix, parent=None)
109         self._log_number = num
110         self._running = True
111         self._first_write_error = None
112         self._last_failure = None
113
114         self._status = PublishStatus()
115         self._status.set_storage_index(self._storage_index)
116         self._status.set_helper(False)
117         self._status.set_progress(0.0)
118         self._status.set_active(True)
119         self._version = self._node.get_version()
120         assert self._version in (SDMF_VERSION, MDMF_VERSION)
121
122
123     def get_status(self):
124         return self._status
125
126     def log(self, *args, **kwargs):
127         if 'parent' not in kwargs:
128             kwargs['parent'] = self._log_number
129         if "facility" not in kwargs:
130             kwargs["facility"] = "tahoe.mutable.publish"
131         return log.msg(*args, **kwargs)
132
133
134     def update(self, data, offset, blockhashes, version):
135         """
136         I replace the contents of this file with the contents of data,
137         starting at offset. I return a Deferred that fires with None
138         when the replacement has been completed, or with an error if
139         something went wrong during the process.
140
141         Note that this process will not upload new shares. If the file
142         being updated is in need of repair, callers will have to repair
143         it on their own.
144         """
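        # (The blockhashes and version arguments are the block hash trees
        #  and verinfo tuple of the version being modified; the caller's
        #  update machinery is assumed to have fetched them already, as
        #  noted further down.)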
145         # How this works:
146         # 1: Make peer assignments. We'll assign each share that we know
147         # about on the grid to the peer that currently holds that
148         # share, and will not place any new shares.
149         # 2: Setup encoding parameters. Most of these will stay the same
150         # -- datalength will change, as will some of the offsets.
151         # 3. Upload the new segments.
152         # 4. Be done.
153         assert IMutableUploadable.providedBy(data)
154
155         self.data = data
156
157         # XXX: Use the MutableFileVersion instead.
158         self.datalength = self._node.get_size()
159         if data.get_size() > self.datalength:
160             self.datalength = data.get_size()
161
162         self.log("starting update")
163         self.log("adding new data of length %d at offset %d" % \
164                     (data.get_size(), offset))
165         self.log("new data length is %d" % self.datalength)
166         self._status.set_size(self.datalength)
167         self._status.set_status("Started")
168         self._started = time.time()
169
170         self.done_deferred = defer.Deferred()
171
172         self._writekey = self._node.get_writekey()
173         assert self._writekey, "need write capability to publish"
174
175         # first, which servers will we publish to? We require that the
176         # servermap was updated in MODE_WRITE, so we can depend upon the
177         # peerlist computed by that process instead of computing our own.
178         assert self._servermap
179         assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
180         # we will push a version that is one larger than anything present
181         # in the grid, according to the servermap.
182         self._new_seqnum = self._servermap.highest_seqnum() + 1
183         self._status.set_servermap(self._servermap)
184
185         self.log(format="new seqnum will be %(seqnum)d",
186                  seqnum=self._new_seqnum, level=log.NOISY)
187
188         # We're updating an existing file, so all of the following
189         # should be available.
190         self.readkey = self._node.get_readkey()
191         self.required_shares = self._node.get_required_shares()
192         assert self.required_shares is not None
193         self.total_shares = self._node.get_total_shares()
194         assert self.total_shares is not None
195         self._status.set_encoding(self.required_shares, self.total_shares)
196
197         self._pubkey = self._node.get_pubkey()
198         assert self._pubkey
199         self._privkey = self._node.get_privkey()
200         assert self._privkey
201         self._encprivkey = self._node.get_encprivkey()
202
203         sb = self._storage_broker
204         full_peerlist = [(s.get_serverid(), s.get_rref())
205                          for s in sb.get_servers_for_psi(self._storage_index)]
206         self.full_peerlist = full_peerlist # for use later, immutable
207         self.bad_peers = set() # peerids who have errbacked/refused requests
208
209         # This will set self.segment_size, self.num_segments, and
210         # self.fec. TODO: Does it know how to do the offset? Probably
211         # not. So do that part next.
212         self.setup_encoding_parameters(offset=offset)
213
214         # if we experience any surprises (writes which were rejected because
215         # our test vector did not match, or shares which we didn't expect to
216         # see), we set this flag and report an UncoordinatedWriteError at the
217         # end of the publish process.
218         self.surprised = False
219
220         # we keep track of three tables. The first is our goal: which share
221         # we want to see on which servers. This is initially populated by the
222         # existing servermap.
223         self.goal = set() # pairs of (peerid, shnum) tuples
224
225         # the second table is our list of outstanding queries: those which
226         # are in flight and may or may not be delivered, accepted, or
227         # acknowledged. Items are added to this table when the request is
228         # sent, and removed when the response returns (or errbacks).
229         self.outstanding = set() # (peerid, shnum) tuples
230
231         # the third is a table of successes: shares which have actually been
232         # placed. These are populated when responses come back with success.
233         # When self.placed == self.goal, we're done.
234         self.placed = set() # (peerid, shnum) tuples
235
236         # we also keep a mapping from peerid to RemoteReference. Each time we
237         # pull a connection out of the full peerlist, we add it to this for
238         # use later.
239         self.connections = {}
240
241         self.bad_share_checkstrings = {}
242
243         # This is set at the last step of the publishing process.
244         self.versioninfo = ""
245
246         # we use the servermap to populate the initial goal: this way we will
247         # try to update each existing share in place. Since we're
248         # updating, we ignore damaged and missing shares -- callers must
249         # run a repair to recreate these.
250         for (peerid, shnum) in self._servermap.servermap:
251             self.goal.add( (peerid, shnum) )
252             self.connections[peerid] = self._servermap.connections[peerid]
253         self.writers = {}
254
255         # SDMF files are updated differently (this in-place update path only handles MDMF).
256         self._version = MDMF_VERSION
257         writer_class = MDMFSlotWriteProxy
258
259         # For each (peerid, shnum) in self.goal, we make a
260         # write proxy for that peer. We'll use this to write
261         # shares to the peer.
262         for key in self.goal:
263             peerid, shnum = key
264             write_enabler = self._node.get_write_enabler(peerid)
265             renew_secret = self._node.get_renewal_secret(peerid)
266             cancel_secret = self._node.get_cancel_secret(peerid)
267             secrets = (write_enabler, renew_secret, cancel_secret)
268
269             self.writers[shnum] =  writer_class(shnum,
270                                                 self.connections[peerid],
271                                                 self._storage_index,
272                                                 secrets,
273                                                 self._new_seqnum,
274                                                 self.required_shares,
275                                                 self.total_shares,
276                                                 self.segment_size,
277                                                 self.datalength)
278             self.writers[shnum].peerid = peerid
279             assert (peerid, shnum) in self._servermap.servermap
280             old_versionid, old_timestamp = self._servermap.servermap[key]
281             (old_seqnum, old_root_hash, old_salt, old_segsize,
282              old_datalength, old_k, old_N, old_prefix,
283              old_offsets_tuple) = old_versionid
284             self.writers[shnum].set_checkstring(old_seqnum,
285                                                 old_root_hash,
286                                                 old_salt)
287
288         # Our remote shares will not have a complete checkstring until
289         # after we are done writing share data and have started to write
290         # blocks. In the meantime, we need to know what to look for when
291         # writing, so that we can detect UncoordinatedWriteErrors.
292         self._checkstring = self.writers.values()[0].get_checkstring()
293
294         # Now, we start pushing shares.
295         self._status.timings["setup"] = time.time() - self._started
296         # First, we encrypt, encode, and publish the shares that we need
297         # to encrypt, encode, and publish.
298
299         # Our update process fetched these for us. We need to update
300         # them in place as publishing happens.
301         self.blockhashes = {} # shnum -> [blockhashes]
302         for (i, bht) in blockhashes.iteritems():
303             # We need to extract the leaves from our old hash tree.
304             old_segcount = mathutil.div_ceil(version[4],
305                                              version[3])
306             h = hashtree.IncompleteHashTree(old_segcount)
307             bht = dict(enumerate(bht))
308             h.set_hashes(bht)
309             leaves = h[h.get_leaf_index(0):]
310             for j in xrange(self.num_segments - len(leaves)):
311                 leaves.append(None)
312
313             assert len(leaves) >= self.num_segments
314             self.blockhashes[i] = leaves
315             # This list will now be the leaves that were set during the
316             # initial upload + enough empty hashes to make it a
317             # power-of-two. If we exceed a power of two boundary, we
318             # should be encoding the file over again, and should not be
319             # here. So, we have
320             #assert len(self.blockhashes[i]) == \
321             #    hashtree.roundup_pow2(self.num_segments), \
322             #        len(self.blockhashes[i])
323             # XXX: Except this doesn't work. Figure out why.
324
325         # These are filled in later, after we've modified the block hash
326         # tree suitably.
327         self.sharehash_leaves = None # eventually [sharehashes]
328         self.sharehashes = {} # shnum -> [sharehash leaves necessary to 
329                               # validate the share]
330
331         self.log("Starting push")
332
333         self._state = PUSHING_BLOCKS_STATE
334         self._push()
335
336         return self.done_deferred
337
338
339     def publish(self, newdata):
340         """Publish the filenode's current contents.  Returns a Deferred that
341         fires (with None) when the publish has done as much work as it's ever
342         going to do, or errbacks with ConsistencyError if it detects a
343         simultaneous write.
344         """
345
346         # 0. Setup encoding parameters, encoder, and other such things.
347         # 1. Encrypt, encode, and publish segments.
348         assert IMutableUploadable.providedBy(newdata)
349
350         self.data = newdata
351         self.datalength = newdata.get_size()
352         #if self.datalength >= DEFAULT_MAX_SEGMENT_SIZE:
353         #    self._version = MDMF_VERSION
354         #else:
355         #    self._version = SDMF_VERSION
356
357         self.log("starting publish, datalen is %s" % self.datalength)
358         self._status.set_size(self.datalength)
359         self._status.set_status("Started")
360         self._started = time.time()
361
362         self.done_deferred = defer.Deferred()
363
364         self._writekey = self._node.get_writekey()
365         assert self._writekey, "need write capability to publish"
366
367         # first, which servers will we publish to? We require that the
368         # servermap was updated in MODE_WRITE, so we can depend upon the
369         # peerlist computed by that process instead of computing our own.
370         if self._servermap:
371             assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
372             # we will push a version that is one larger than anything present
373             # in the grid, according to the servermap.
374             self._new_seqnum = self._servermap.highest_seqnum() + 1
375         else:
376             # If we don't have a servermap, that's because we're doing the
377             # initial publish
378             self._new_seqnum = 1
379             self._servermap = ServerMap()
380         self._status.set_servermap(self._servermap)
381
382         self.log(format="new seqnum will be %(seqnum)d",
383                  seqnum=self._new_seqnum, level=log.NOISY)
384
385         # having an up-to-date servermap (or using a filenode that was just
386         # created for the first time) also guarantees that the following
387         # fields are available
388         self.readkey = self._node.get_readkey()
389         self.required_shares = self._node.get_required_shares()
390         assert self.required_shares is not None
391         self.total_shares = self._node.get_total_shares()
392         assert self.total_shares is not None
393         self._status.set_encoding(self.required_shares, self.total_shares)
394
395         self._pubkey = self._node.get_pubkey()
396         assert self._pubkey
397         self._privkey = self._node.get_privkey()
398         assert self._privkey
399         self._encprivkey = self._node.get_encprivkey()
400
401         sb = self._storage_broker
402         full_peerlist = [(s.get_serverid(), s.get_rref())
403                          for s in sb.get_servers_for_psi(self._storage_index)]
404         self.full_peerlist = full_peerlist # for use later, immutable
405         self.bad_peers = set() # peerids who have errbacked/refused requests
406
407         # This will set self.segment_size, self.num_segments, and
408         # self.fec.
409         self.setup_encoding_parameters()
410
411         # if we experience any surprises (writes which were rejected because
412         # our test vector did not match, or shares which we didn't expect to
413         # see), we set this flag and report an UncoordinatedWriteError at the
414         # end of the publish process.
415         self.surprised = False
416
417         # we keep track of three tables. The first is our goal: which share
418         # we want to see on which servers. This is initially populated by the
419         # existing servermap.
420         self.goal = set() # pairs of (peerid, shnum) tuples
421
422         # the second table is our list of outstanding queries: those which
423         # are in flight and may or may not be delivered, accepted, or
424         # acknowledged. Items are added to this table when the request is
425         # sent, and removed when the response returns (or errbacks).
426         self.outstanding = set() # (peerid, shnum) tuples
427
428         # the third is a table of successes: shares which have actually been
429         # placed. These are populated when responses come back with success.
430         # When self.placed == self.goal, we're done.
431         self.placed = set() # (peerid, shnum) tuples
432
433         # we also keep a mapping from peerid to RemoteReference. Each time we
434         # pull a connection out of the full peerlist, we add it to this for
435         # use later.
436         self.connections = {}
437
438         self.bad_share_checkstrings = {}
439
440         # This is set at the last step of the publishing process.
441         self.versioninfo = ""
442
443         # we use the servermap to populate the initial goal: this way we will
444         # try to update each existing share in place.
445         for (peerid, shnum) in self._servermap.servermap:
446             self.goal.add( (peerid, shnum) )
447             self.connections[peerid] = self._servermap.connections[peerid]
448         # then we add in all the shares that were bad (corrupted, bad
449         # signatures, etc). We want to replace these.
450         for key, old_checkstring in self._servermap.bad_shares.items():
451             (peerid, shnum) = key
452             self.goal.add(key)
453             self.bad_share_checkstrings[key] = old_checkstring
454             self.connections[peerid] = self._servermap.connections[peerid]
455
456         # TODO: Make this part do peer selection.
457         self.update_goal()
458         self.writers = {}
459         if self._version == MDMF_VERSION:
460             writer_class = MDMFSlotWriteProxy
461         else:
462             writer_class = SDMFSlotWriteProxy
463
464         # For each (peerid, shnum) in self.goal, we make a
465         # write proxy for that peer. We'll use this to write
466         # shares to the peer.
467         for key in self.goal:
468             peerid, shnum = key
469             write_enabler = self._node.get_write_enabler(peerid)
470             renew_secret = self._node.get_renewal_secret(peerid)
471             cancel_secret = self._node.get_cancel_secret(peerid)
472             secrets = (write_enabler, renew_secret, cancel_secret)
473
474             self.writers[shnum] =  writer_class(shnum,
475                                                 self.connections[peerid],
476                                                 self._storage_index,
477                                                 secrets,
478                                                 self._new_seqnum,
479                                                 self.required_shares,
480                                                 self.total_shares,
481                                                 self.segment_size,
482                                                 self.datalength)
483             self.writers[shnum].peerid = peerid
484             if (peerid, shnum) in self._servermap.servermap:
485                 old_versionid, old_timestamp = self._servermap.servermap[key]
486                 (old_seqnum, old_root_hash, old_salt, old_segsize,
487                  old_datalength, old_k, old_N, old_prefix,
488                  old_offsets_tuple) = old_versionid
489                 self.writers[shnum].set_checkstring(old_seqnum,
490                                                     old_root_hash,
491                                                     old_salt)
492             elif (peerid, shnum) in self.bad_share_checkstrings:
493                 old_checkstring = self.bad_share_checkstrings[(peerid, shnum)]
494                 self.writers[shnum].set_checkstring(old_checkstring)
495
496         # Our remote shares will not have a complete checkstring until
497         # after we are done writing share data and have started to write
498         # blocks. In the meantime, we need to know what to look for when
499         # writing, so that we can detect UncoordinatedWriteErrors.
500         self._checkstring = self.writers.values()[0].get_checkstring()
501
502         # Now, we start pushing shares.
503         self._status.timings["setup"] = time.time() - self._started
504         # First, we encrypt, encode, and publish the shares that we need
505         # to encrypt, encode, and publish.
506
507         # This will eventually hold the block hash chain for each share
508         # that we publish. We define it this way so that empty publishes
509         # will still have something to write to the remote slot.
510         self.blockhashes = dict([(i, []) for i in xrange(self.total_shares)])
511         for i in xrange(self.total_shares):
512             blocks = self.blockhashes[i]
513             for j in xrange(self.num_segments):
514                 blocks.append(None)
515         self.sharehash_leaves = None # eventually [sharehashes]
516         self.sharehashes = {} # shnum -> [sharehash leaves necessary to 
517                               # validate the share]
518
519         self.log("Starting push")
520
521         self._state = PUSHING_BLOCKS_STATE
522         self._push()
523
524         return self.done_deferred
525
526
527     def _update_status(self):
528         self._status.set_status("Sending Shares: %d placed out of %d, "
529                                 "%d messages outstanding" %
530                                 (len(self.placed),
531                                  len(self.goal),
532                                  len(self.outstanding)))
533         self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
534
535
536     def setup_encoding_parameters(self, offset=0):
537         if self._version == MDMF_VERSION:
538             segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
539         else:
540             segment_size = self.datalength # SDMF is only one segment
541         # this must be a multiple of self.required_shares
542         segment_size = mathutil.next_multiple(segment_size,
543                                               self.required_shares)
544         self.segment_size = segment_size
545
546         # Calculate the starting segment for the upload.
547         if segment_size:
548             # We use div_ceil instead of integer division here because
549             # it is semantically correct.
550             # If datalength isn't an even multiple of segment_size, but
551             # is larger than segment_size, datalength // segment_size
552             # counts only the full segments: it rounds down, and so
553             # ignores the extra data in the partial tail segment. That
554             # is not what we want. div_ceil rounds up and gives us
555             # the right number of segments for the data that we're
556             # given.
557             self.num_segments = mathutil.div_ceil(self.datalength,
558                                                   segment_size)
559
560             self.starting_segment = offset // segment_size
561
562         else:
563             self.num_segments = 0
564             self.starting_segment = 0
565
566
567         self.log("building encoding parameters for file")
568         self.log("got segsize %d" % self.segment_size)
569         self.log("got %d segments" % self.num_segments)
570
571         if self._version == SDMF_VERSION:
572             assert self.num_segments in (0, 1) # SDMF
573         # calculate the tail segment size.
574
575         if segment_size and self.datalength:
576             self.tail_segment_size = self.datalength % segment_size
577             self.log("got tail segment size %d" % self.tail_segment_size)
578         else:
579             self.tail_segment_size = 0
580
581         if self.tail_segment_size == 0 and segment_size:
582             # The tail segment is the same size as the other segments.
583             self.tail_segment_size = segment_size
584
585         # Make FEC encoders
586         fec = codec.CRSEncoder()
587         fec.set_params(self.segment_size,
588                        self.required_shares, self.total_shares)
589         self.piece_size = fec.get_block_size()
590         self.fec = fec
591
592         if self.tail_segment_size == self.segment_size:
593             self.tail_fec = self.fec
594         else:
595             tail_fec = codec.CRSEncoder()
596             tail_fec.set_params(self.tail_segment_size,
597                                 self.required_shares,
598                                 self.total_shares)
599             self.tail_fec = tail_fec
600
601         self._current_segment = self.starting_segment
602         self.end_segment = self.num_segments - 1
603         # Now figure out where the last segment should be.
604         if self.data.get_size() != self.datalength:
605             # We're updating a few segments in the middle of a mutable
606             # file, so we don't want to republish the whole thing.
607             # (we don't have enough data to do that even if we wanted
608             # to)
609             end = self.data.get_size()
610             self.end_segment = end // segment_size
611             if end % segment_size == 0:
612                 self.end_segment -= 1
613
614         self.log("got start segment %d" % self.starting_segment)
615         self.log("got end segment %d" % self.end_segment)
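        # A worked example of the arithmetic above, with hypothetical
        # numbers: for an MDMF file of 300 KiB (307200 bytes) with
        # required_shares=3, segment_size = next_multiple(131072, 3) =
        # 131073, num_segments = div_ceil(307200, 131073) = 3, and
        # tail_segment_size = 307200 % 131073 = 45054; each FEC block is
        # then piece_size = 131073 / 3 = 43691 bytes.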
616
617
618     def _push(self, ignored=None):
619         """
620         I manage state transitions. In particular, I check that we
621         still have enough writers to complete the upload
622         successfully.
623         """
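        # The publish proceeds through a small state machine, re-entered
        # via repeated calls to this method:
        #   PUSHING_BLOCKS_STATE          -> push_segment()
        #   PUSHING_EVERYTHING_ELSE_STATE -> push_everything_else()
        #   anything else (DONE_STATE)    -> _done()
        # We bail out early through _failure() if we have lost so many
        # writers that fewer than required_shares remain, or if we were
        # surprised by an apparently uncoordinated write.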
624         # Can we still successfully publish this file?
625         # TODO: Keep track of outstanding queries before aborting the
626         #       process.
627         if len(self.writers) < self.required_shares or self.surprised:
628             return self._failure()
629
630         # Figure out what we need to do next. Each of these needs to
631         # return a deferred so that we don't block execution when this
632         # is first called in the upload method.
633         if self._state == PUSHING_BLOCKS_STATE:
634             return self.push_segment(self._current_segment)
635
636         elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
637             return self.push_everything_else()
638
639         # If we make it to this point, we were successful in placing the
640         # file.
641         return self._done()
642
643
644     def push_segment(self, segnum):
645         if self.num_segments == 0 and self._version == SDMF_VERSION:
646             self._add_dummy_salts()
647
648         if segnum > self.end_segment:
649             # We don't have any more segments to push.
650             self._state = PUSHING_EVERYTHING_ELSE_STATE
651             return self._push()
652
653         d = self._encode_segment(segnum)
654         d.addCallback(self._push_segment, segnum)
655         def _increment_segnum(ign):
656             self._current_segment += 1
657         # XXX: I don't think we need to do addBoth here -- any errbacks
658         # should be handled within push_segment.
659         d.addCallback(_increment_segnum)
660         d.addCallback(self._turn_barrier)
661         d.addCallback(self._push)
662         d.addErrback(self._failure)
663
664
665     def _turn_barrier(self, result):
666         """
667         I help the publish process avoid the recursion limit issues
668         described in #237.
669         """
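        # fireEventually() hands back a Deferred that fires with `result`
        # on a later reactor turn, so each segment is pushed from a fresh
        # call stack instead of recursing through Deferred callbacks.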
670         return fireEventually(result)
671
672
673     def _add_dummy_salts(self):
674         """
675         SDMF files need a salt even if they're empty, or the signature
676         won't make sense. This method adds a dummy salt to each of our
677         SDMF writers so that they can write the signature later.
678         """
679         salt = os.urandom(16)
680         assert self._version == SDMF_VERSION
681
682         for writer in self.writers.itervalues():
683             writer.put_salt(salt)
684
685
686     def _encode_segment(self, segnum):
687         """
688         I encrypt and encode the segment segnum.
689         """
690         started = time.time()
691
692         if segnum + 1 == self.num_segments:
693             segsize = self.tail_segment_size
694         else:
695             segsize = self.segment_size
696
697
698         self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
699         data = self.data.read(segsize)
700         # XXX: This is dumb. Why return a list?
701         data = "".join(data)
702
703         assert len(data) == segsize, len(data)
704
705         salt = os.urandom(16)
706
707         key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
708         self._status.set_status("Encrypting")
709         enc = AES(key)
710         crypttext = enc.process(data)
711         assert len(crypttext) == len(data)
712
713         now = time.time()
714         self._status.timings["encrypt"] = now - started
715         started = now
716
717         # now apply FEC
718         if segnum + 1 == self.num_segments:
719             fec = self.tail_fec
720         else:
721             fec = self.fec
722
723         self._status.set_status("Encoding")
724         crypttext_pieces = [None] * self.required_shares
725         piece_size = fec.get_block_size()
726         for i in range(len(crypttext_pieces)):
727             offset = i * piece_size
728             piece = crypttext[offset:offset+piece_size]
729             piece = piece + "\x00"*(piece_size - len(piece)) # padding
730             crypttext_pieces[i] = piece
731             assert len(piece) == piece_size
732         d = fec.encode(crypttext_pieces)
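        # fec.encode() returns a Deferred that fires with (shares,
        # shareids): total_shares encoded blocks plus their share numbers.
        # _push_segment() below unpacks these and hands one block to each
        # writer.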
733         def _done_encoding(res):
734             elapsed = time.time() - started
735             self._status.timings["encode"] = elapsed
736             return (res, salt)
737         d.addCallback(_done_encoding)
738         return d
739
740
741     def _push_segment(self, encoded_and_salt, segnum):
742         """
743         I push (data, salt) as segment number segnum.
744         """
745         results, salt = encoded_and_salt
746         shares, shareids = results
747         self._status.set_status("Pushing segment")
748         for i in xrange(len(shares)):
749             sharedata = shares[i]
750             shareid = shareids[i]
751             if self._version == MDMF_VERSION:
752                 hashed = salt + sharedata
753             else:
754                 hashed = sharedata
755             block_hash = hashutil.block_hash(hashed)
756             self.blockhashes[shareid][segnum] = block_hash
757             # find the writer for this share
758             writer = self.writers[shareid]
759             writer.put_block(sharedata, segnum, salt)
760
761
762     def push_everything_else(self):
763         """
764         I put everything else associated with a share.
765         """
766         self._pack_started = time.time()
767         self.push_encprivkey()
768         self.push_blockhashes()
769         self.push_sharehashes()
770         self.push_toplevel_hashes_and_signature()
771         d = self.finish_publishing()
772         def _change_state(ignored):
773             self._state = DONE_STATE
774         d.addCallback(_change_state)
775         d.addCallback(self._push)
776         return d
777
778
779     def push_encprivkey(self):
780         encprivkey = self._encprivkey
781         self._status.set_status("Pushing encrypted private key")
782         for writer in self.writers.itervalues():
783             writer.put_encprivkey(encprivkey)
784
785
786     def push_blockhashes(self):
787         self.sharehash_leaves = [None] * len(self.blockhashes)
788         self._status.set_status("Building and pushing block hash tree")
789         for shnum, blockhashes in self.blockhashes.iteritems():
790             t = hashtree.HashTree(blockhashes)
791             self.blockhashes[shnum] = list(t)
792             # set the leaf for future use.
793             self.sharehash_leaves[shnum] = t[0]
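            # (t[0] is the root of this share's block hash tree; it becomes
            #  the corresponding leaf of the share hash tree built in
            #  push_sharehashes().)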
794
795             writer = self.writers[shnum]
796             writer.put_blockhashes(self.blockhashes[shnum])
797
798
799     def push_sharehashes(self):
800         self._status.set_status("Building and pushing share hash chain")
801         share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
802         for shnum in xrange(len(self.sharehash_leaves)):
803             needed_indices = share_hash_tree.needed_hashes(shnum)
804             self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
805                                              for i in needed_indices] )
806             writer = self.writers[shnum]
807             writer.put_sharehashes(self.sharehashes[shnum])
808         self.root_hash = share_hash_tree[0]
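        # root_hash is the root of the share hash tree; it gets written to
        # every share in push_toplevel_hashes_and_signature() and is covered
        # by the signed prefix.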
809
810
811     def push_toplevel_hashes_and_signature(self):
812         # We need to do three things here:
813         #   - Push the root hash and salt hash
814         #   - Get the checkstring of the resulting layout; sign that.
815         #   - Push the signature
816         self._status.set_status("Pushing root hashes and signature")
817         for shnum in xrange(self.total_shares):
818             writer = self.writers[shnum]
819             writer.put_root_hash(self.root_hash)
820         self._update_checkstring()
821         self._make_and_place_signature()
822
823
824     def _update_checkstring(self):
825         """
826         After putting the root hash, MDMF files will have the
827         checkstring written to the storage server. This means that we
828         can update our copy of the checkstring so we can detect
829         uncoordinated writes. SDMF files will have the same checkstring,
830         so we need not do anything.
831         """
832         self._checkstring = self.writers.values()[0].get_checkstring()
833
834
835     def _make_and_place_signature(self):
836         """
837         I create and place the signature.
838         """
839         started = time.time()
840         self._status.set_status("Signing prefix")
841         signable = self.writers[0].get_signable()
842         self.signature = self._privkey.sign(signable)
843
844         for (shnum, writer) in self.writers.iteritems():
845             writer.put_signature(self.signature)
846         self._status.timings['sign'] = time.time() - started
847
848
849     def finish_publishing(self):
850         # We're almost done -- we just need to put the verification key
851         # and the offsets
852         started = time.time()
853         self._status.set_status("Pushing shares")
854         self._started_pushing = started
855         ds = []
856         verification_key = self._pubkey.serialize()
857
858
859         # TODO: Bad, since we remove from this same dict. We need to
860         # make a copy, or just use a non-iterated value.
861         for (shnum, writer) in self.writers.iteritems():
862             writer.put_verification_key(verification_key)
863             d = writer.finish_publishing()
864             # Add the (peerid, shnum) tuple to our list of outstanding
865             # queries. This gets used by _loop if some of our queries
866             # fail to place shares.
867             self.outstanding.add((writer.peerid, writer.shnum))
868             d.addCallback(self._got_write_answer, writer, started)
869             d.addErrback(self._connection_problem, writer)
870             ds.append(d)
871         self._record_verinfo()
872         self._status.timings['pack'] = time.time() - started
873         return defer.DeferredList(ds)
874
875
876     def _record_verinfo(self):
877         self.versioninfo = self.writers.values()[0].get_verinfo()
878
879
880     def _connection_problem(self, f, writer):
881         """
882         We ran into a connection problem while working with writer, and
883         need to deal with that.
884         """
885         self.log("found problem: %s" % str(f))
886         self._last_failure = f
887         del(self.writers[writer.shnum])
888
889
890     def log_goal(self, goal, message=""):
891         logmsg = [message]
892         for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
893             logmsg.append("sh%d to [%s]" % (shnum,
894                                             idlib.shortnodeid_b2a(peerid)))
895         self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
896         self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
897                  level=log.NOISY)
898
899     def update_goal(self):
900         # if log.recording_noisy
901         if True:
902             self.log_goal(self.goal, "before update: ")
903
904         # first, remove any bad peers from our goal
905         self.goal = set([ (peerid, shnum)
906                           for (peerid, shnum) in self.goal
907                           if peerid not in self.bad_peers ])
908
909         # find the homeless shares:
910         homefull_shares = set([shnum for (peerid, shnum) in self.goal])
911         homeless_shares = set(range(self.total_shares)) - homefull_shares
912         homeless_shares = sorted(list(homeless_shares))
913         # place them somewhere. We prefer unused servers at the beginning of
914         # the available peer list.
915
916         if not homeless_shares:
917             return
918
919         # if an old share X is on a node, put the new share X there too.
920         # TODO: 1: redistribute shares to achieve one-per-peer, by copying
921         #       shares from existing peers to new (less-crowded) ones. The
922         #       old shares must still be updated.
923         # TODO: 2: move those shares instead of copying them, to reduce future
924         #       update work
925
926         # this is a bit CPU intensive but easy to analyze. We create a sort
927         # order for each peerid. If the peerid is marked as bad, we don't
928         # even put them in the list. Then we care about the number of shares
929         # which have already been assigned to them. After that we care about
930         # their permutation order.
931         old_assignments = DictOfSets()
932         for (peerid, shnum) in self.goal:
933             old_assignments.add(peerid, shnum)
934
935         peerlist = []
936         for i, (peerid, ss) in enumerate(self.full_peerlist):
937             if peerid in self.bad_peers:
938                 continue
939             entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
940             peerlist.append(entry)
941         peerlist.sort()
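        # After this sort, peers with the fewest already-assigned shares come
        # first, with permuted-list position (i) breaking ties.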
942
943         if not peerlist:
944             raise NotEnoughServersError("Ran out of non-bad servers, "
945                                         "first_error=%s" %
946                                         str(self._first_write_error),
947                                         self._first_write_error)
948
949         # we then index this peerlist with an integer, because we may have to
950         # wrap. We update the goal as we go.
951         i = 0
952         for shnum in homeless_shares:
953             (ignored1, ignored2, peerid, ss) = peerlist[i]
954             # if we are forced to send a share to a server that already has
955             # one, we may have two write requests in flight, and the
956             # servermap (which was computed before either request was sent)
957             # won't reflect the new shares, so the second response will be
958             # surprising. There is code in _got_write_answer() to tolerate
959             # this, otherwise it would cause the publish to fail with an
960             # UncoordinatedWriteError. See #546 for details of the trouble
961             # this used to cause.
962             self.goal.add( (peerid, shnum) )
963             self.connections[peerid] = ss
964             i += 1
965             if i >= len(peerlist):
966                 i = 0
967         if True:
968             self.log_goal(self.goal, "after update: ")
969
970
971     def _got_write_answer(self, answer, writer, started):
972         if not answer:
973             # SDMF writers only pretend to write when callers set their
974             # blocks, salts, and so on -- they actually just write once,
975             # at the end of the upload process. In fake writes, they
976             # return defer.succeed(None). If we see that, we shouldn't
977             # bother checking it.
978             return
979
980         peerid = writer.peerid
981         lp = self.log("_got_write_answer from %s, share %d" %
982                       (idlib.shortnodeid_b2a(peerid), writer.shnum))
983
984         now = time.time()
985         elapsed = now - started
986
987         self._status.add_per_server_time(peerid, elapsed)
988
989         wrote, read_data = answer
990
991         surprise_shares = set(read_data.keys()) - set([writer.shnum])
992
993         # We need to remove from surprise_shares any shares that we are
994         # knowingly also writing to that peer from other writers.
995
996         # TODO: Precompute this.
997         known_shnums = [x.shnum for x in self.writers.values()
998                         if x.peerid == peerid]
999         surprise_shares -= set(known_shnums)
1000         self.log("found the following surprise shares: %s" %
1001                  str(surprise_shares))
1002
1003         # Now surprise shares contains all of the shares that we did not
1004         # expect to be there.
1005
1006         surprised = False
1007         for shnum in surprise_shares:
1008             # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
1009             checkstring = read_data[shnum][0]
1010             # What we want to do here is to see if their (seqnum,
1011             # roothash, salt) is the same as our (seqnum, roothash,
1012             # salt), or the equivalent for MDMF. The best way to do this
1013             # is to store a packed representation of our checkstring
1014             # somewhere, then not bother unpacking the other
1015             # checkstring.
1016             if checkstring == self._checkstring:
1017                 # they have the right share, somehow
1018
1019                 if (peerid,shnum) in self.goal:
1020                     # and we want them to have it, so we probably sent them a
1021                     # copy in an earlier write. This is ok, and avoids the
1022                     # #546 problem.
1023                     continue
1024
1025                 # They aren't in our goal, but they are still for the right
1026                 # version. Somebody else wrote them, and it's a convergent
1027                 # uncoordinated write. Pretend this is ok (don't be
1028                 # surprised), since I suspect there's a decent chance that
1029                 # we'll hit this in normal operation.
1030                 continue
1031
1032             else:
1033                 # the new shares are of a different version
1034                 if peerid in self._servermap.reachable_peers:
1035                     # we asked them about their shares, so we had knowledge
1036                     # of what they used to have. Any surprising shares must
1037                     # have come from someone else, so UCW.
1038                     surprised = True
1039                 else:
1040                     # we didn't ask them, and now we've discovered that they
1041                     # have a share we didn't know about. This indicates that
1042                     # mapupdate should have worked harder and asked more
1043                     # servers before concluding that it knew about them all.
1044
1045                     # signal UCW, but make sure to ask this peer next time,
1046                     # so we'll remember to update it if/when we retry.
1047                     surprised = True
1048                     # TODO: ask this peer next time. I don't yet have a good
1049                     # way to do this. Two insufficient possibilities are:
1050                     #
1051                     # self._servermap.add_new_share(peerid, shnum, verinfo, now)
1052                     #  but that requires fetching/validating/parsing the whole
1053                     #  version string, and all we have is the checkstring
1054                     # self._servermap.mark_bad_share(peerid, shnum, checkstring)
1055                     #  that will make publish overwrite the share next time,
1056                     #  but it won't re-query the server, and it won't make
1057                     #  mapupdate search further
1058
1059                     # TODO later: when publish starts, do
1060                     # servermap.get_best_version(), extract the seqnum,
1061                     # subtract one, and store as highest-replaceable-seqnum.
1062                     # Then, if this surprise-because-we-didn't-ask share is
1063                     # of highest-replaceable-seqnum or lower, we're allowed
1064                     # to replace it: send out a new writev (or rather add it
1065                     # to self.goal and loop).
1066                     pass
1067
1068                 surprised = True
1069
1070         if surprised:
1071             self.log("they had shares %s that we didn't know about" %
1072                      (list(surprise_shares),),
1073                      parent=lp, level=log.WEIRD, umid="un9CSQ")
1074             self.surprised = True
1075
1076         if not wrote:
1077             # TODO: there are two possibilities. The first is that the server
1078             # is full (or just doesn't want to give us any room), which means
1079             # we shouldn't ask them again, but is *not* an indication of an
1080             # uncoordinated write. The second is that our testv failed, which
1081             # *does* indicate an uncoordinated write. We currently don't have
1082             # a way to tell these two apart (in fact, the storage server code
1083             # doesn't have the option of refusing our share).
1084             #
1085             # If the server is full, mark the peer as bad (so we don't ask
1086             # them again), but don't set self.surprised. The loop() will find
1087             # a new server.
1088             #
1089             # If the testv failed, log it, set self.surprised, but don't
1090             # bother adding to self.bad_peers .
1091
1092             self.log("our testv failed, so the write did not happen",
1093                      parent=lp, level=log.WEIRD, umid="8sc26g")
1094             self.surprised = True
1095             self.bad_peers.add(writer) # don't ask them again
1096             # use the checkstring to add information to the log message
1097             for (shnum,readv) in read_data.items():
1098                 checkstring = readv[0]
1099                 (other_seqnum,
1100                  other_roothash,
1101                  other_salt) = unpack_checkstring(checkstring)
1102                 expected_version = self._servermap.version_on_peer(peerid,
1103                                                                    shnum)
1104                 if expected_version:
1105                     (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
1106                      offsets_tuple) = expected_version
1107                     self.log("somebody modified the share on us:"
1108                              " shnum=%d: I thought they had #%d:R=%s,"
1109                              " but testv reported #%d:R=%s" %
1110                              (shnum,
1111                               seqnum, base32.b2a(root_hash)[:4],
1112                               other_seqnum, base32.b2a(other_roothash)[:4]),
1113                              parent=lp, level=log.NOISY)
1114                 # if expected_version==None, then we didn't expect to see a
1115                 # share on that peer, and the 'surprise_shares' clause above
1116                 # will have logged it.
1117             return
1118
1119         # and update the servermap
1120         # self.versioninfo is set during the last phase of publishing.
1121         # If we get there, we know that responses correspond to placed
1122         # shares, and can safely execute these statements.
1123         if self.versioninfo:
1124             self.log("wrote successfully: adding new share to servermap")
1125             self._servermap.add_new_share(peerid, writer.shnum,
1126                                           self.versioninfo, started)
1127             self.placed.add( (peerid, writer.shnum) )
1128         self._update_status()
1129         # the next method in the deferred chain will check to see if
1130         # we're done and successful.
1131         return
1132
1133
1134     def _done(self):
1135         if not self._running:
1136             return
1137         self._running = False
1138         now = time.time()
1139         self._status.timings["total"] = now - self._started
1140
1141         elapsed = now - self._started_pushing
1142         self._status.timings['push'] = elapsed
1143
1144         self._status.set_active(False)
1145         self.log("Publish done, success")
1146         self._status.set_status("Finished")
1147         self._status.set_progress(1.0)
1148         # Get k and segsize, then give them to the caller.
1149         hints = {}
1150         hints['segsize'] = self.segment_size
1151         hints['k'] = self.required_shares
1152         self._node.set_downloader_hints(hints)
1153         eventually(self.done_deferred.callback, None)
1154
1155     def _failure(self, f=None):
1156         if f:
1157             self._last_failure = f
1158
1159         if not self.surprised:
1160             # We ran out of servers
1161             msg = "Publish ran out of good servers"
1162             if self._last_failure:
1163                 msg += ", last failure was: %s" % str(self._last_failure)
1164             self.log(msg)
1165             e = NotEnoughServersError(msg)
1166
1167         else:
1168             # We ran into shares that we didn't recognize, which means
1169             # that we need to return an UncoordinatedWriteError.
1170             self.log("Publish failed with UncoordinatedWriteError")
1171             e = UncoordinatedWriteError()
1172         f = failure.Failure(e)
1173         eventually(self.done_deferred.callback, f)
1174
1175
1176 class MutableFileHandle:
1177     """
1178     I am a mutable uploadable built around a filehandle-like object,
1179     usually either a StringIO instance or a handle to an actual file.
1180     """
1181     implements(IMutableUploadable)
1182
1183     def __init__(self, filehandle):
1184         # The filehandle is defined as a generally file-like object that
1185         # has these two methods. We don't care beyond that.
1186         assert hasattr(filehandle, "read")
1187         assert hasattr(filehandle, "close")
1188
1189         self._filehandle = filehandle
1190         # We must start reading at the beginning of the file, or we risk
1191         # encountering errors when the data read does not match the size
1192         # reported to the uploader.
1193         self._filehandle.seek(0)
1194
1195         # We have not yet read anything, so our position is 0.
1196         self._marker = 0
1197
1198
1199     def get_size(self):
1200         """
1201         I return the amount of data in my filehandle.
1202         """
1203         if not hasattr(self, "_size"):
1204             old_position = self._filehandle.tell()
1205             # Seek to the end of the file by seeking 0 bytes from the
1206             # file's end
1207             self._filehandle.seek(0, 2) # 2 == os.SEEK_END in 2.5+
1208             self._size = self._filehandle.tell()
1209             # Restore the previous position, in case this was called
1210             # after a read.
1211             self._filehandle.seek(old_position)
1212             assert self._filehandle.tell() == old_position
1213
1214         assert hasattr(self, "_size")
1215         return self._size
1216
1217
1218     def pos(self):
1219         """
1220         I return the position of my read marker -- i.e., how much data I
1221         have already read and returned to callers.
1222         """
1223         return self._marker
1224
1225
1226     def read(self, length):
1227         """
1228         I return some data (up to length bytes) from my filehandle.
1229
1230         In most cases, I return length bytes, but sometimes I won't --
1231         for example, if I am asked to read beyond the end of a file, or
1232         an error occurs.
1233         """
1234         results = self._filehandle.read(length)
1235         self._marker += len(results)
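        # The data is returned wrapped in a list; callers accumulate these
        # chunks and "".join() them (see Publish._encode_segment and
        # TransformingUploadable.read).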
1236         return [results]
1237
1238
1239     def close(self):
1240         """
1241         I close the underlying filehandle. Any further operations on the
1242         filehandle fail at this point.
1243         """
1244         self._filehandle.close()
1245
1246
1247 class MutableData(MutableFileHandle):
1248     """
1249     I am a mutable uploadable built around a string, which I then cast
1250     into a StringIO and treat as a filehandle.
1251     """
1252
1253     def __init__(self, s):
1254         # Take a string and return a file-like uploadable.
1255         assert isinstance(s, str)
1256
1257         MutableFileHandle.__init__(self, StringIO(s))
1258
1259
1260 class TransformingUploadable:
1261     """
1262     I am an IMutableUploadable that wraps another IMutableUploadable,
1263     and some segments that are already on the grid. When I am called to
1264     read, I handle merging of boundary segments.
1265     """
1266     implements(IMutableUploadable)
1267
1268
1269     def __init__(self, data, offset, segment_size, start, end):
1270         assert IMutableUploadable.providedBy(data)
1271
1272         self._newdata = data
1273         self._offset = offset
1274         self._segment_size = segment_size
1275         self._start = start
1276         self._end = end
1277
1278         self._read_marker = 0
1279
1280         self._first_segment_offset = offset % segment_size
1281
1282         num = self.log("TransformingUploadable: starting", parent=None)
1283         self._log_number = num
1284         self.log("got fso: %d" % self._first_segment_offset)
1285         self.log("got offset: %d" % self._offset)
1286
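        # A hypothetical example: with segment_size=8 and offset=10,
        # _first_segment_offset is 2, so the first read() returns 2 bytes
        # of the old start segment before any new data, and any read that
        # runs past the end of the new data is filled in from the old end
        # segment.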
1287
1288     def log(self, *args, **kwargs):
1289         if 'parent' not in kwargs:
1290             kwargs['parent'] = self._log_number
1291         if "facility" not in kwargs:
1292             kwargs["facility"] = "tahoe.mutable.transforminguploadable"
1293         return log.msg(*args, **kwargs)
1294
1295
1296     def get_size(self):
1297         return self._offset + self._newdata.get_size()
1298
1299
1300     def read(self, length):
1301         # We can get data from 3 sources here. 
1302         #   1. The first of the segments provided to us.
1303         #   2. The data that we're replacing things with.
1304         #   3. The last of the segments provided to us.
1305
1306         # are we in state 0?
1307         self.log("reading %d bytes" % length)
1308
1309         old_start_data = ""
1310         old_data_length = self._first_segment_offset - self._read_marker
1311         if old_data_length > 0:
1312             if old_data_length > length:
1313                 old_data_length = length
1314             self.log("returning %d bytes of old start data" % old_data_length)
1315
1316             old_data_end = old_data_length + self._read_marker
1317             old_start_data = self._start[self._read_marker:old_data_end]
1318             length -= old_data_length
1319         else:
1320             # otherwise calculations later get screwed up.
1321             old_data_length = 0
1322
1323         # Is there enough new data to satisfy this read? If not, we need
1324         # to pad the end of the data with data from our last segment.
1325         old_end_length = length - \
1326             (self._newdata.get_size() - self._newdata.pos())
1327         old_end_data = ""
1328         if old_end_length > 0:
1329             self.log("reading %d bytes of old end data" % old_end_length)
1330
1331             # TODO: We're not explicitly checking for tail segment size
1332             # here. Is that a problem?
1333             old_data_offset = (length - old_end_length + \
1334                                old_data_length) % self._segment_size
1335             self.log("reading at offset %d" % old_data_offset)
1336             old_end = old_data_offset + old_end_length
1337             old_end_data = self._end[old_data_offset:old_end]
1338             length -= old_end_length
1339             assert length == self._newdata.get_size() - self._newdata.pos()
1340
1341         self.log("reading %d bytes of new data" % length)
1342         new_data = self._newdata.read(length)
1343         new_data = "".join(new_data)
1344
1345         self._read_marker += len(old_start_data + new_data + old_end_data)
1346
1347         return old_start_data + new_data + old_end_data
1348
1349     def close(self):
1350         pass