]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/publish.py
IServer refactoring: pass IServer instances around, instead of peerids
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / publish.py
1
2
3 import os, time
4 from StringIO import StringIO
5 from itertools import count
6 from zope.interface import implements
7 from twisted.internet import defer
8 from twisted.python import failure
9 from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
10                                  IMutableUploadable
11 from allmydata.util import base32, hashutil, mathutil, log
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from foolscap.api import eventually, fireEventually
17
18 from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
19      UncoordinatedWriteError, NotEnoughServersError
20 from allmydata.mutable.servermap import ServerMap
21 from allmydata.mutable.layout import get_version_from_checkstring,\
22                                      unpack_mdmf_checkstring, \
23                                      unpack_sdmf_checkstring, \
24                                      MDMFSlotWriteProxy, \
25                                      SDMFSlotWriteProxy
26
27 KiB = 1024
28 DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
29 PUSHING_BLOCKS_STATE = 0
30 PUSHING_EVERYTHING_ELSE_STATE = 1
31 DONE_STATE = 2
32
33 class PublishStatus:
34     implements(IPublishStatus)
35     statusid_counter = count(0)
36     def __init__(self):
37         self.timings = {}
38         self.timings["send_per_server"] = {}
39         self.timings["encrypt"] = 0.0
40         self.timings["encode"] = 0.0
41         self.servermap = None
42         self._problems = {}
43         self.active = True
44         self.storage_index = None
45         self.helper = False
46         self.encoding = ("?", "?")
47         self.size = None
48         self.status = "Not started"
49         self.progress = 0.0
50         self.counter = self.statusid_counter.next()
51         self.started = time.time()
52
53     def add_per_server_time(self, server, elapsed):
54         serverid = server.get_serverid()
55         if serverid not in self.timings["send_per_server"]:
56             self.timings["send_per_server"][serverid] = []
57         self.timings["send_per_server"][serverid].append(elapsed)
58     def accumulate_encode_time(self, elapsed):
59         self.timings["encode"] += elapsed
60     def accumulate_encrypt_time(self, elapsed):
61         self.timings["encrypt"] += elapsed
62
63     def get_started(self):
64         return self.started
65     def get_storage_index(self):
66         return self.storage_index
67     def get_encoding(self):
68         return self.encoding
69     def using_helper(self):
70         return self.helper
71     def get_servermap(self):
72         return self.servermap
73     def get_size(self):
74         return self.size
75     def get_status(self):
76         return self.status
77     def get_progress(self):
78         return self.progress
79     def get_active(self):
80         return self.active
81     def get_counter(self):
82         return self.counter
83     def get_problems(self):
84         return self._problems
85
86     def set_storage_index(self, si):
87         self.storage_index = si
88     def set_helper(self, helper):
89         self.helper = helper
90     def set_servermap(self, servermap):
91         self.servermap = servermap
92     def set_encoding(self, k, n):
93         self.encoding = (k, n)
94     def set_size(self, size):
95         self.size = size
96     def set_status(self, status):
97         self.status = status
98     def set_progress(self, value):
99         self.progress = value
100     def set_active(self, value):
101         self.active = value
102
103 class LoopLimitExceededError(Exception):
104     pass
105
106 class Publish:
107     """I represent a single act of publishing the mutable file to the grid. I
108     will only publish my data if the servermap I am using still represents
109     the current state of the world.
110
111     To make the initial publish, set servermap to None.
112     """
113
114     def __init__(self, filenode, storage_broker, servermap):
115         self._node = filenode
116         self._storage_broker = storage_broker
117         self._servermap = servermap
118         self._storage_index = self._node.get_storage_index()
119         self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
120         num = self.log("Publish(%s): starting" % prefix, parent=None)
121         self._log_number = num
122         self._running = True
123         self._first_write_error = None
124         self._last_failure = None
125
126         self._status = PublishStatus()
127         self._status.set_storage_index(self._storage_index)
128         self._status.set_helper(False)
129         self._status.set_progress(0.0)
130         self._status.set_active(True)
131         self._version = self._node.get_version()
132         assert self._version in (SDMF_VERSION, MDMF_VERSION)
133
134
135     def get_status(self):
136         return self._status
137
138     def log(self, *args, **kwargs):
139         if 'parent' not in kwargs:
140             kwargs['parent'] = self._log_number
141         if "facility" not in kwargs:
142             kwargs["facility"] = "tahoe.mutable.publish"
143         return log.msg(*args, **kwargs)
144
145
146     def update(self, data, offset, blockhashes, version):
147         """
148         I replace the contents of this file with the contents of data,
149         starting at offset. I return a Deferred that fires with None
150         when the replacement has been completed, or with an error if
151         something went wrong during the process.
152
153         Note that this process will not upload new shares. If the file
154         being updated is in need of repair, callers will have to repair
155         it on their own.
156         """
157         # How this works:
158         # 1: Make server assignments. We'll assign each share that we know
159         # about on the grid to that server that currently holds that
160         # share, and will not place any new shares.
161         # 2: Setup encoding parameters. Most of these will stay the same
162         # -- datalength will change, as will some of the offsets.
163         # 3. Upload the new segments.
164         # 4. Be done.
165         assert IMutableUploadable.providedBy(data)
166
167         self.data = data
168
169         # XXX: Use the MutableFileVersion instead.
170         self.datalength = self._node.get_size()
171         if data.get_size() > self.datalength:
172             self.datalength = data.get_size()
173
174         self.log("starting update")
175         self.log("adding new data of length %d at offset %d" % \
176                     (data.get_size(), offset))
177         self.log("new data length is %d" % self.datalength)
178         self._status.set_size(self.datalength)
179         self._status.set_status("Started")
180         self._started = time.time()
181
182         self.done_deferred = defer.Deferred()
183
184         self._writekey = self._node.get_writekey()
185         assert self._writekey, "need write capability to publish"
186
187         # first, which servers will we publish to? We require that the
188         # servermap was updated in MODE_WRITE, so we can depend upon the
189         # serverlist computed by that process instead of computing our own.
190         assert self._servermap
191         assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK)
192         # we will push a version that is one larger than anything present
193         # in the grid, according to the servermap.
194         self._new_seqnum = self._servermap.highest_seqnum() + 1
195         self._status.set_servermap(self._servermap)
196
197         self.log(format="new seqnum will be %(seqnum)d",
198                  seqnum=self._new_seqnum, level=log.NOISY)
199
200         # We're updating an existing file, so all of the following
201         # should be available.
202         self.readkey = self._node.get_readkey()
203         self.required_shares = self._node.get_required_shares()
204         assert self.required_shares is not None
205         self.total_shares = self._node.get_total_shares()
206         assert self.total_shares is not None
207         self._status.set_encoding(self.required_shares, self.total_shares)
208
209         self._pubkey = self._node.get_pubkey()
210         assert self._pubkey
211         self._privkey = self._node.get_privkey()
212         assert self._privkey
213         self._encprivkey = self._node.get_encprivkey()
214
215         sb = self._storage_broker
216         full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
217         self.full_serverlist = full_serverlist # for use later, immutable
218         self.bad_servers = set() # servers who have errbacked/refused requests
219
220         # This will set self.segment_size, self.num_segments, and
221         # self.fec. TODO: Does it know how to do the offset? Probably
222         # not. So do that part next.
223         self.setup_encoding_parameters(offset=offset)
224
225         # if we experience any surprises (writes which were rejected because
226         # our test vector did not match, or shares which we didn't expect to
227         # see), we set this flag and report an UncoordinatedWriteError at the
228         # end of the publish process.
229         self.surprised = False
230
231         # we keep track of three tables. The first is our goal: which share
232         # we want to see on which servers. This is initially populated by the
233         # existing servermap.
234         self.goal = set() # pairs of (server, shnum) tuples
235
236         # the number of outstanding queries: those that are in flight and
237         # may or may not be delivered, accepted, or acknowledged. This is
238         # incremented when a query is sent, and decremented when the response
239         # returns or errbacks.
240         self.num_outstanding = 0
241
242         # the third is a table of successes: share which have actually been
243         # placed. These are populated when responses come back with success.
244         # When self.placed == self.goal, we're done.
245         self.placed = set() # (server, shnum) tuples
246
247         self.bad_share_checkstrings = {}
248
249         # This is set at the last step of the publishing process.
250         self.versioninfo = ""
251
252         # we use the servermap to populate the initial goal: this way we will
253         # try to update each existing share in place. Since we're
254         # updating, we ignore damaged and missing shares -- callers must
255         # do a repair to repair and recreate these.
256         self.goal = set(self._servermap.get_known_shares())
257         self.writers = {}
258
259         # SDMF files are updated differently.
260         self._version = MDMF_VERSION
261         writer_class = MDMFSlotWriteProxy
262
263         # For each (server, shnum) in self.goal, we make a
264         # write proxy for that server. We'll use this to write
265         # shares to the server.
266         for (server,shnum) in self.goal:
267             write_enabler = self._node.get_write_enabler(server)
268             renew_secret = self._node.get_renewal_secret(server)
269             cancel_secret = self._node.get_cancel_secret(server)
270             secrets = (write_enabler, renew_secret, cancel_secret)
271
272             self.writers[shnum] =  writer_class(shnum,
273                                                 server.get_rref(),
274                                                 self._storage_index,
275                                                 secrets,
276                                                 self._new_seqnum,
277                                                 self.required_shares,
278                                                 self.total_shares,
279                                                 self.segment_size,
280                                                 self.datalength)
281             self.writers[shnum].server = server
282             known_shares = self._servermap.get_known_shares()
283             assert (server, shnum) in known_shares
284             old_versionid, old_timestamp = known_shares[(server,shnum)]
285             (old_seqnum, old_root_hash, old_salt, old_segsize,
286              old_datalength, old_k, old_N, old_prefix,
287              old_offsets_tuple) = old_versionid
288             self.writers[shnum].set_checkstring(old_seqnum,
289                                                 old_root_hash,
290                                                 old_salt)
291
292         # Our remote shares will not have a complete checkstring until
293         # after we are done writing share data and have started to write
294         # blocks. In the meantime, we need to know what to look for when
295         # writing, so that we can detect UncoordinatedWriteErrors.
296         self._checkstring = self.writers.values()[0].get_checkstring()
297
298         # Now, we start pushing shares.
299         self._status.timings["setup"] = time.time() - self._started
300         # First, we encrypt, encode, and publish the shares that we need
301         # to encrypt, encode, and publish.
302
303         # Our update process fetched these for us. We need to update
304         # them in place as publishing happens.
305         self.blockhashes = {} # (shnum, [blochashes])
306         for (i, bht) in blockhashes.iteritems():
307             # We need to extract the leaves from our old hash tree.
308             old_segcount = mathutil.div_ceil(version[4],
309                                              version[3])
310             h = hashtree.IncompleteHashTree(old_segcount)
311             bht = dict(enumerate(bht))
312             h.set_hashes(bht)
313             leaves = h[h.get_leaf_index(0):]
314             for j in xrange(self.num_segments - len(leaves)):
315                 leaves.append(None)
316
317             assert len(leaves) >= self.num_segments
318             self.blockhashes[i] = leaves
319             # This list will now be the leaves that were set during the
320             # initial upload + enough empty hashes to make it a
321             # power-of-two. If we exceed a power of two boundary, we
322             # should be encoding the file over again, and should not be
323             # here. So, we have
324             #assert len(self.blockhashes[i]) == \
325             #    hashtree.roundup_pow2(self.num_segments), \
326             #        len(self.blockhashes[i])
327             # XXX: Except this doesn't work. Figure out why.
328
329         # These are filled in later, after we've modified the block hash
330         # tree suitably.
331         self.sharehash_leaves = None # eventually [sharehashes]
332         self.sharehashes = {} # shnum -> [sharehash leaves necessary to 
333                               # validate the share]
334
335         self.log("Starting push")
336
337         self._state = PUSHING_BLOCKS_STATE
338         self._push()
339
340         return self.done_deferred
341
342
343     def publish(self, newdata):
344         """Publish the filenode's current contents.  Returns a Deferred that
345         fires (with None) when the publish has done as much work as it's ever
346         going to do, or errbacks with ConsistencyError if it detects a
347         simultaneous write.
348         """
349
350         # 0. Setup encoding parameters, encoder, and other such things.
351         # 1. Encrypt, encode, and publish segments.
352         assert IMutableUploadable.providedBy(newdata)
353
354         self.data = newdata
355         self.datalength = newdata.get_size()
356         #if self.datalength >= DEFAULT_MAX_SEGMENT_SIZE:
357         #    self._version = MDMF_VERSION
358         #else:
359         #    self._version = SDMF_VERSION
360
361         self.log("starting publish, datalen is %s" % self.datalength)
362         self._status.set_size(self.datalength)
363         self._status.set_status("Started")
364         self._started = time.time()
365
366         self.done_deferred = defer.Deferred()
367
368         self._writekey = self._node.get_writekey()
369         assert self._writekey, "need write capability to publish"
370
371         # first, which servers will we publish to? We require that the
372         # servermap was updated in MODE_WRITE, so we can depend upon the
373         # serverlist computed by that process instead of computing our own.
374         if self._servermap:
375             assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK)
376             # we will push a version that is one larger than anything present
377             # in the grid, according to the servermap.
378             self._new_seqnum = self._servermap.highest_seqnum() + 1
379         else:
380             # If we don't have a servermap, that's because we're doing the
381             # initial publish
382             self._new_seqnum = 1
383             self._servermap = ServerMap()
384         self._status.set_servermap(self._servermap)
385
386         self.log(format="new seqnum will be %(seqnum)d",
387                  seqnum=self._new_seqnum, level=log.NOISY)
388
389         # having an up-to-date servermap (or using a filenode that was just
390         # created for the first time) also guarantees that the following
391         # fields are available
392         self.readkey = self._node.get_readkey()
393         self.required_shares = self._node.get_required_shares()
394         assert self.required_shares is not None
395         self.total_shares = self._node.get_total_shares()
396         assert self.total_shares is not None
397         self._status.set_encoding(self.required_shares, self.total_shares)
398
399         self._pubkey = self._node.get_pubkey()
400         assert self._pubkey
401         self._privkey = self._node.get_privkey()
402         assert self._privkey
403         self._encprivkey = self._node.get_encprivkey()
404
405         sb = self._storage_broker
406         full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
407         self.full_serverlist = full_serverlist # for use later, immutable
408         self.bad_servers = set() # servers who have errbacked/refused requests
409
410         # This will set self.segment_size, self.num_segments, and
411         # self.fec.
412         self.setup_encoding_parameters()
413
414         # if we experience any surprises (writes which were rejected because
415         # our test vector did not match, or shares which we didn't expect to
416         # see), we set this flag and report an UncoordinatedWriteError at the
417         # end of the publish process.
418         self.surprised = False
419
420         # we keep track of three tables. The first is our goal: which share
421         # we want to see on which servers. This is initially populated by the
422         # existing servermap.
423         self.goal = set() # pairs of (server, shnum) tuples
424
425         # the number of outstanding queries: those that are in flight and
426         # may or may not be delivered, accepted, or acknowledged. This is
427         # incremented when a query is sent, and decremented when the response
428         # returns or errbacks.
429         self.num_outstanding = 0
430
431         # the third is a table of successes: share which have actually been
432         # placed. These are populated when responses come back with success.
433         # When self.placed == self.goal, we're done.
434         self.placed = set() # (server, shnum) tuples
435
436         self.bad_share_checkstrings = {}
437
438         # This is set at the last step of the publishing process.
439         self.versioninfo = ""
440
441         # we use the servermap to populate the initial goal: this way we will
442         # try to update each existing share in place.
443         self.goal = set(self._servermap.get_known_shares())
444
445         # then we add in all the shares that were bad (corrupted, bad
446         # signatures, etc). We want to replace these.
447         for key, old_checkstring in self._servermap.get_bad_shares().items():
448             (server, shnum) = key
449             self.goal.add( (server,shnum) )
450             self.bad_share_checkstrings[(server,shnum)] = old_checkstring
451
452         # TODO: Make this part do server selection.
453         self.update_goal()
454         self.writers = {}
455         if self._version == MDMF_VERSION:
456             writer_class = MDMFSlotWriteProxy
457         else:
458             writer_class = SDMFSlotWriteProxy
459
460         # For each (server, shnum) in self.goal, we make a
461         # write proxy for that server. We'll use this to write
462         # shares to the server.
463         for (server,shnum) in self.goal:
464             write_enabler = self._node.get_write_enabler(server)
465             renew_secret = self._node.get_renewal_secret(server)
466             cancel_secret = self._node.get_cancel_secret(server)
467             secrets = (write_enabler, renew_secret, cancel_secret)
468
469             self.writers[shnum] =  writer_class(shnum,
470                                                 server.get_rref(),
471                                                 self._storage_index,
472                                                 secrets,
473                                                 self._new_seqnum,
474                                                 self.required_shares,
475                                                 self.total_shares,
476                                                 self.segment_size,
477                                                 self.datalength)
478             self.writers[shnum].server = server
479             known_shares = self._servermap.get_known_shares()
480             if (server, shnum) in known_shares:
481                 old_versionid, old_timestamp = known_shares[(server,shnum)]
482                 (old_seqnum, old_root_hash, old_salt, old_segsize,
483                  old_datalength, old_k, old_N, old_prefix,
484                  old_offsets_tuple) = old_versionid
485                 self.writers[shnum].set_checkstring(old_seqnum,
486                                                     old_root_hash,
487                                                     old_salt)
488             elif (server, shnum) in self.bad_share_checkstrings:
489                 old_checkstring = self.bad_share_checkstrings[(server, shnum)]
490                 self.writers[shnum].set_checkstring(old_checkstring)
491
492         # Our remote shares will not have a complete checkstring until
493         # after we are done writing share data and have started to write
494         # blocks. In the meantime, we need to know what to look for when
495         # writing, so that we can detect UncoordinatedWriteErrors.
496         self._checkstring = self.writers.values()[0].get_checkstring()
497
498         # Now, we start pushing shares.
499         self._status.timings["setup"] = time.time() - self._started
500         # First, we encrypt, encode, and publish the shares that we need
501         # to encrypt, encode, and publish.
502
503         # This will eventually hold the block hash chain for each share
504         # that we publish. We define it this way so that empty publishes
505         # will still have something to write to the remote slot.
506         self.blockhashes = dict([(i, []) for i in xrange(self.total_shares)])
507         for i in xrange(self.total_shares):
508             blocks = self.blockhashes[i]
509             for j in xrange(self.num_segments):
510                 blocks.append(None)
511         self.sharehash_leaves = None # eventually [sharehashes]
512         self.sharehashes = {} # shnum -> [sharehash leaves necessary to 
513                               # validate the share]
514
515         self.log("Starting push")
516
517         self._state = PUSHING_BLOCKS_STATE
518         self._push()
519
520         return self.done_deferred
521
522
523     def _update_status(self):
524         self._status.set_status("Sending Shares: %d placed out of %d, "
525                                 "%d messages outstanding" %
526                                 (len(self.placed),
527                                  len(self.goal),
528                                  self.num_outstanding))
529         self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
530
531
532     def setup_encoding_parameters(self, offset=0):
533         if self._version == MDMF_VERSION:
534             segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
535         else:
536             segment_size = self.datalength # SDMF is only one segment
537         # this must be a multiple of self.required_shares
538         segment_size = mathutil.next_multiple(segment_size,
539                                               self.required_shares)
540         self.segment_size = segment_size
541
542         # Calculate the starting segment for the upload.
543         if segment_size:
544             # We use div_ceil instead of integer division here because
545             # it is semantically correct.
546             # If datalength isn't an even multiple of segment_size, but
547             # is larger than segment_size, datalength // segment_size
548             # will be the largest number such that num <= datalength and
549             # num % segment_size == 0. But that's not what we want,
550             # because it ignores the extra data. div_ceil will give us
551             # the right number of segments for the data that we're
552             # given.
553             self.num_segments = mathutil.div_ceil(self.datalength,
554                                                   segment_size)
555
556             self.starting_segment = offset // segment_size
557
558         else:
559             self.num_segments = 0
560             self.starting_segment = 0
561
562
563         self.log("building encoding parameters for file")
564         self.log("got segsize %d" % self.segment_size)
565         self.log("got %d segments" % self.num_segments)
566
567         if self._version == SDMF_VERSION:
568             assert self.num_segments in (0, 1) # SDMF
569         # calculate the tail segment size.
570
571         if segment_size and self.datalength:
572             self.tail_segment_size = self.datalength % segment_size
573             self.log("got tail segment size %d" % self.tail_segment_size)
574         else:
575             self.tail_segment_size = 0
576
577         if self.tail_segment_size == 0 and segment_size:
578             # The tail segment is the same size as the other segments.
579             self.tail_segment_size = segment_size
580
581         # Make FEC encoders
582         fec = codec.CRSEncoder()
583         fec.set_params(self.segment_size,
584                        self.required_shares, self.total_shares)
585         self.piece_size = fec.get_block_size()
586         self.fec = fec
587
588         if self.tail_segment_size == self.segment_size:
589             self.tail_fec = self.fec
590         else:
591             tail_fec = codec.CRSEncoder()
592             tail_fec.set_params(self.tail_segment_size,
593                                 self.required_shares,
594                                 self.total_shares)
595             self.tail_fec = tail_fec
596
597         self._current_segment = self.starting_segment
598         self.end_segment = self.num_segments - 1
599         # Now figure out where the last segment should be.
600         if self.data.get_size() != self.datalength:
601             # We're updating a few segments in the middle of a mutable
602             # file, so we don't want to republish the whole thing.
603             # (we don't have enough data to do that even if we wanted
604             # to)
605             end = self.data.get_size()
606             self.end_segment = end // segment_size
607             if end % segment_size == 0:
608                 self.end_segment -= 1
609
610         self.log("got start segment %d" % self.starting_segment)
611         self.log("got end segment %d" % self.end_segment)
612
613
614     def _push(self, ignored=None):
615         """
616         I manage state transitions. In particular, I see that we still
617         have a good enough number of writers to complete the upload
618         successfully.
619         """
620         # Can we still successfully publish this file?
621         # TODO: Keep track of outstanding queries before aborting the
622         #       process.
623         if len(self.writers) < self.required_shares or self.surprised:
624             return self._failure()
625
626         # Figure out what we need to do next. Each of these needs to
627         # return a deferred so that we don't block execution when this
628         # is first called in the upload method.
629         if self._state == PUSHING_BLOCKS_STATE:
630             return self.push_segment(self._current_segment)
631
632         elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
633             return self.push_everything_else()
634
635         # If we make it to this point, we were successful in placing the
636         # file.
637         return self._done()
638
639
640     def push_segment(self, segnum):
641         if self.num_segments == 0 and self._version == SDMF_VERSION:
642             self._add_dummy_salts()
643
644         if segnum > self.end_segment:
645             # We don't have any more segments to push.
646             self._state = PUSHING_EVERYTHING_ELSE_STATE
647             return self._push()
648
649         d = self._encode_segment(segnum)
650         d.addCallback(self._push_segment, segnum)
651         def _increment_segnum(ign):
652             self._current_segment += 1
653         # XXX: I don't think we need to do addBoth here -- any errBacks
654         # should be handled within push_segment.
655         d.addCallback(_increment_segnum)
656         d.addCallback(self._turn_barrier)
657         d.addCallback(self._push)
658         d.addErrback(self._failure)
659
660
661     def _turn_barrier(self, result):
662         """
663         I help the publish process avoid the recursion limit issues
664         described in #237.
665         """
666         return fireEventually(result)
667
668
669     def _add_dummy_salts(self):
670         """
671         SDMF files need a salt even if they're empty, or the signature
672         won't make sense. This method adds a dummy salt to each of our
673         SDMF writers so that they can write the signature later.
674         """
675         salt = os.urandom(16)
676         assert self._version == SDMF_VERSION
677
678         for writer in self.writers.itervalues():
679             writer.put_salt(salt)
680
681
682     def _encode_segment(self, segnum):
683         """
684         I encrypt and encode the segment segnum.
685         """
686         started = time.time()
687
688         if segnum + 1 == self.num_segments:
689             segsize = self.tail_segment_size
690         else:
691             segsize = self.segment_size
692
693
694         self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
695         data = self.data.read(segsize)
696         # XXX: This is dumb. Why return a list?
697         data = "".join(data)
698
699         assert len(data) == segsize, len(data)
700
701         salt = os.urandom(16)
702
703         key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
704         self._status.set_status("Encrypting")
705         enc = AES(key)
706         crypttext = enc.process(data)
707         assert len(crypttext) == len(data)
708
709         now = time.time()
710         self._status.accumulate_encrypt_time(now - started)
711         started = now
712
713         # now apply FEC
714         if segnum + 1 == self.num_segments:
715             fec = self.tail_fec
716         else:
717             fec = self.fec
718
719         self._status.set_status("Encoding")
720         crypttext_pieces = [None] * self.required_shares
721         piece_size = fec.get_block_size()
722         for i in range(len(crypttext_pieces)):
723             offset = i * piece_size
724             piece = crypttext[offset:offset+piece_size]
725             piece = piece + "\x00"*(piece_size - len(piece)) # padding
726             crypttext_pieces[i] = piece
727             assert len(piece) == piece_size
728         d = fec.encode(crypttext_pieces)
729         def _done_encoding(res):
730             elapsed = time.time() - started
731             self._status.accumulate_encode_time(elapsed)
732             return (res, salt)
733         d.addCallback(_done_encoding)
734         return d
735
736
737     def _push_segment(self, encoded_and_salt, segnum):
738         """
739         I push (data, salt) as segment number segnum.
740         """
741         results, salt = encoded_and_salt
742         shares, shareids = results
743         self._status.set_status("Pushing segment")
744         for i in xrange(len(shares)):
745             sharedata = shares[i]
746             shareid = shareids[i]
747             if self._version == MDMF_VERSION:
748                 hashed = salt + sharedata
749             else:
750                 hashed = sharedata
751             block_hash = hashutil.block_hash(hashed)
752             self.blockhashes[shareid][segnum] = block_hash
753             # find the writer for this share
754             writer = self.writers[shareid]
755             writer.put_block(sharedata, segnum, salt)
756
757
758     def push_everything_else(self):
759         """
760         I put everything else associated with a share.
761         """
762         self._pack_started = time.time()
763         self.push_encprivkey()
764         self.push_blockhashes()
765         self.push_sharehashes()
766         self.push_toplevel_hashes_and_signature()
767         d = self.finish_publishing()
768         def _change_state(ignored):
769             self._state = DONE_STATE
770         d.addCallback(_change_state)
771         d.addCallback(self._push)
772         return d
773
774
775     def push_encprivkey(self):
776         encprivkey = self._encprivkey
777         self._status.set_status("Pushing encrypted private key")
778         for writer in self.writers.itervalues():
779             writer.put_encprivkey(encprivkey)
780
781
782     def push_blockhashes(self):
783         self.sharehash_leaves = [None] * len(self.blockhashes)
784         self._status.set_status("Building and pushing block hash tree")
785         for shnum, blockhashes in self.blockhashes.iteritems():
786             t = hashtree.HashTree(blockhashes)
787             self.blockhashes[shnum] = list(t)
788             # set the leaf for future use.
789             self.sharehash_leaves[shnum] = t[0]
790
791             writer = self.writers[shnum]
792             writer.put_blockhashes(self.blockhashes[shnum])
793
794
795     def push_sharehashes(self):
796         self._status.set_status("Building and pushing share hash chain")
797         share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
798         for shnum in xrange(len(self.sharehash_leaves)):
799             needed_indices = share_hash_tree.needed_hashes(shnum)
800             self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
801                                              for i in needed_indices] )
802             writer = self.writers[shnum]
803             writer.put_sharehashes(self.sharehashes[shnum])
804         self.root_hash = share_hash_tree[0]
805
806
807     def push_toplevel_hashes_and_signature(self):
808         # We need to to three things here:
809         #   - Push the root hash and salt hash
810         #   - Get the checkstring of the resulting layout; sign that.
811         #   - Push the signature
812         self._status.set_status("Pushing root hashes and signature")
813         for shnum in xrange(self.total_shares):
814             writer = self.writers[shnum]
815             writer.put_root_hash(self.root_hash)
816         self._update_checkstring()
817         self._make_and_place_signature()
818
819
820     def _update_checkstring(self):
821         """
822         After putting the root hash, MDMF files will have the
823         checkstring written to the storage server. This means that we
824         can update our copy of the checkstring so we can detect
825         uncoordinated writes. SDMF files will have the same checkstring,
826         so we need not do anything.
827         """
828         self._checkstring = self.writers.values()[0].get_checkstring()
829
830
831     def _make_and_place_signature(self):
832         """
833         I create and place the signature.
834         """
835         started = time.time()
836         self._status.set_status("Signing prefix")
837         signable = self.writers[0].get_signable()
838         self.signature = self._privkey.sign(signable)
839
840         for (shnum, writer) in self.writers.iteritems():
841             writer.put_signature(self.signature)
842         self._status.timings['sign'] = time.time() - started
843
844
845     def finish_publishing(self):
846         # We're almost done -- we just need to put the verification key
847         # and the offsets
848         started = time.time()
849         self._status.set_status("Pushing shares")
850         self._started_pushing = started
851         ds = []
852         verification_key = self._pubkey.serialize()
853
854         for (shnum, writer) in self.writers.copy().iteritems():
855             writer.put_verification_key(verification_key)
856             self.num_outstanding += 1
857             def _no_longer_outstanding(res):
858                 self.num_outstanding -= 1
859                 return res
860
861             d = writer.finish_publishing()
862             d.addBoth(_no_longer_outstanding)
863             d.addErrback(self._connection_problem, writer)
864             d.addCallback(self._got_write_answer, writer, started)
865             ds.append(d)
866         self._record_verinfo()
867         self._status.timings['pack'] = time.time() - started
868         return defer.DeferredList(ds)
869
870
871     def _record_verinfo(self):
872         self.versioninfo = self.writers.values()[0].get_verinfo()
873
874
875     def _connection_problem(self, f, writer):
876         """
877         We ran into a connection problem while working with writer, and
878         need to deal with that.
879         """
880         self.log("found problem: %s" % str(f))
881         self._last_failure = f
882         del(self.writers[writer.shnum])
883
884
885     def log_goal(self, goal, message=""):
886         logmsg = [message]
887         for (shnum, server) in sorted([(s,p) for (p,s) in goal]):
888             logmsg.append("sh%d to [%s]" % (shnum, server.get_name()))
889         self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
890         self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
891                  level=log.NOISY)
892
893     def update_goal(self):
894         # if log.recording_noisy
895         if True:
896             self.log_goal(self.goal, "before update: ")
897
898         # first, remove any bad servers from our goal
899         self.goal = set([ (server, shnum)
900                           for (server, shnum) in self.goal
901                           if server not in self.bad_servers ])
902
903         # find the homeless shares:
904         homefull_shares = set([shnum for (server, shnum) in self.goal])
905         homeless_shares = set(range(self.total_shares)) - homefull_shares
906         homeless_shares = sorted(list(homeless_shares))
907         # place them somewhere. We prefer unused servers at the beginning of
908         # the available server list.
909
910         if not homeless_shares:
911             return
912
913         # if an old share X is on a node, put the new share X there too.
914         # TODO: 1: redistribute shares to achieve one-per-server, by copying
915         #       shares from existing servers to new (less-crowded) ones. The
916         #       old shares must still be updated.
917         # TODO: 2: move those shares instead of copying them, to reduce future
918         #       update work
919
920         # this is a bit CPU intensive but easy to analyze. We create a sort
921         # order for each server. If the server is marked as bad, we don't
922         # even put them in the list. Then we care about the number of shares
923         # which have already been assigned to them. After that we care about
924         # their permutation order.
925         old_assignments = DictOfSets()
926         for (server, shnum) in self.goal:
927             old_assignments.add(server, shnum)
928
929         serverlist = []
930         for i, server in enumerate(self.full_serverlist):
931             serverid = server.get_serverid()
932             if server in self.bad_servers:
933                 continue
934             entry = (len(old_assignments.get(server, [])), i, serverid, server)
935             serverlist.append(entry)
936         serverlist.sort()
937
938         if not serverlist:
939             raise NotEnoughServersError("Ran out of non-bad servers, "
940                                         "first_error=%s" %
941                                         str(self._first_write_error),
942                                         self._first_write_error)
943
944         # we then index this serverlist with an integer, because we may have
945         # to wrap. We update the goal as we go.
946         i = 0
947         for shnum in homeless_shares:
948             (ignored1, ignored2, ignored3, server) = serverlist[i]
949             # if we are forced to send a share to a server that already has
950             # one, we may have two write requests in flight, and the
951             # servermap (which was computed before either request was sent)
952             # won't reflect the new shares, so the second response will be
953             # surprising. There is code in _got_write_answer() to tolerate
954             # this, otherwise it would cause the publish to fail with an
955             # UncoordinatedWriteError. See #546 for details of the trouble
956             # this used to cause.
957             self.goal.add( (server, shnum) )
958             i += 1
959             if i >= len(serverlist):
960                 i = 0
961         if True:
962             self.log_goal(self.goal, "after update: ")
963
964
965     def _got_write_answer(self, answer, writer, started):
966         if not answer:
967             # SDMF writers only pretend to write when readers set their
968             # blocks, salts, and so on -- they actually just write once,
969             # at the end of the upload process. In fake writes, they
970             # return defer.succeed(None). If we see that, we shouldn't
971             # bother checking it.
972             return
973
974         server = writer.server
975         lp = self.log("_got_write_answer from %s, share %d" %
976                       (server.get_name(), writer.shnum))
977
978         now = time.time()
979         elapsed = now - started
980
981         self._status.add_per_server_time(server, elapsed)
982
983         wrote, read_data = answer
984
985         surprise_shares = set(read_data.keys()) - set([writer.shnum])
986
987         # We need to remove from surprise_shares any shares that we are
988         # knowingly also writing to that server from other writers.
989
990         # TODO: Precompute this.
991         known_shnums = [x.shnum for x in self.writers.values()
992                         if x.server == server]
993         surprise_shares -= set(known_shnums)
994         self.log("found the following surprise shares: %s" %
995                  str(surprise_shares))
996
997         # Now surprise shares contains all of the shares that we did not
998         # expect to be there.
999
1000         surprised = False
1001         for shnum in surprise_shares:
1002             # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
1003             checkstring = read_data[shnum][0]
1004             # What we want to do here is to see if their (seqnum,
1005             # roothash, salt) is the same as our (seqnum, roothash,
1006             # salt), or the equivalent for MDMF. The best way to do this
1007             # is to store a packed representation of our checkstring
1008             # somewhere, then not bother unpacking the other
1009             # checkstring.
1010             if checkstring == self._checkstring:
1011                 # they have the right share, somehow
1012
1013                 if (server,shnum) in self.goal:
1014                     # and we want them to have it, so we probably sent them a
1015                     # copy in an earlier write. This is ok, and avoids the
1016                     # #546 problem.
1017                     continue
1018
1019                 # They aren't in our goal, but they are still for the right
1020                 # version. Somebody else wrote them, and it's a convergent
1021                 # uncoordinated write. Pretend this is ok (don't be
1022                 # surprised), since I suspect there's a decent chance that
1023                 # we'll hit this in normal operation.
1024                 continue
1025
1026             else:
1027                 # the new shares are of a different version
1028                 if server in self._servermap.get_reachable_servers():
1029                     # we asked them about their shares, so we had knowledge
1030                     # of what they used to have. Any surprising shares must
1031                     # have come from someone else, so UCW.
1032                     surprised = True
1033                 else:
1034                     # we didn't ask them, and now we've discovered that they
1035                     # have a share we didn't know about. This indicates that
1036                     # mapupdate should have wokred harder and asked more
1037                     # servers before concluding that it knew about them all.
1038
1039                     # signal UCW, but make sure to ask this server next time,
1040                     # so we'll remember to update it if/when we retry.
1041                     surprised = True
1042                     # TODO: ask this server next time. I don't yet have a good
1043                     # way to do this. Two insufficient possibilities are:
1044                     #
1045                     # self._servermap.add_new_share(server, shnum, verinfo, now)
1046                     #  but that requires fetching/validating/parsing the whole
1047                     #  version string, and all we have is the checkstring
1048                     # self._servermap.mark_bad_share(server, shnum, checkstring)
1049                     #  that will make publish overwrite the share next time,
1050                     #  but it won't re-query the server, and it won't make
1051                     #  mapupdate search further
1052
1053                     # TODO later: when publish starts, do
1054                     # servermap.get_best_version(), extract the seqnum,
1055                     # subtract one, and store as highest-replaceable-seqnum.
1056                     # Then, if this surprise-because-we-didn't-ask share is
1057                     # of highest-replaceable-seqnum or lower, we're allowed
1058                     # to replace it: send out a new writev (or rather add it
1059                     # to self.goal and loop).
1060                     pass
1061
1062                 surprised = True
1063
1064         if surprised:
1065             self.log("they had shares %s that we didn't know about" %
1066                      (list(surprise_shares),),
1067                      parent=lp, level=log.WEIRD, umid="un9CSQ")
1068             self.surprised = True
1069
1070         if not wrote:
1071             # TODO: there are two possibilities. The first is that the server
1072             # is full (or just doesn't want to give us any room), which means
1073             # we shouldn't ask them again, but is *not* an indication of an
1074             # uncoordinated write. The second is that our testv failed, which
1075             # *does* indicate an uncoordinated write. We currently don't have
1076             # a way to tell these two apart (in fact, the storage server code
1077             # doesn't have the option of refusing our share).
1078             #
1079             # If the server is full, mark the server as bad (so we don't ask
1080             # them again), but don't set self.surprised. The loop() will find
1081             # a new server.
1082             #
1083             # If the testv failed, log it, set self.surprised, but don't
1084             # bother adding to self.bad_servers .
1085
1086             self.log("our testv failed, so the write did not happen",
1087                      parent=lp, level=log.WEIRD, umid="8sc26g")
1088             self.surprised = True
1089             self.bad_servers.add(server) # don't ask them again
1090             # use the checkstring to add information to the log message
1091             unknown_format = False
1092             for (shnum,readv) in read_data.items():
1093                 checkstring = readv[0]
1094                 version = get_version_from_checkstring(checkstring)
1095                 if version == MDMF_VERSION:
1096                     (other_seqnum,
1097                      other_roothash) = unpack_mdmf_checkstring(checkstring)
1098                 elif version == SDMF_VERSION:
1099                     (other_seqnum,
1100                      other_roothash,
1101                      other_IV) = unpack_sdmf_checkstring(checkstring)
1102                 else:
1103                     unknown_format = True
1104                 expected_version = self._servermap.version_on_server(server,
1105                                                                      shnum)
1106                 if expected_version:
1107                     (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
1108                      offsets_tuple) = expected_version
1109                     msg = ("somebody modified the share on us:"
1110                            " shnum=%d: I thought they had #%d:R=%s," %
1111                            (shnum,
1112                             seqnum, base32.b2a(root_hash)[:4]))
1113                     if unknown_format:
1114                         msg += (" but I don't know how to read share"
1115                                 " format %d" % version)
1116                     else:
1117                         msg += " but testv reported #%d:R=%s" % \
1118                                (other_seqnum, other_roothash)
1119                     self.log(msg, parent=lp, level=log.NOISY)
1120                 # if expected_version==None, then we didn't expect to see a
1121                 # share on that server, and the 'surprise_shares' clause
1122                 # above will have logged it.
1123             return
1124
1125         # and update the servermap
1126         # self.versioninfo is set during the last phase of publishing.
1127         # If we get there, we know that responses correspond to placed
1128         # shares, and can safely execute these statements.
1129         if self.versioninfo:
1130             self.log("wrote successfully: adding new share to servermap")
1131             self._servermap.add_new_share(server, writer.shnum,
1132                                           self.versioninfo, started)
1133             self.placed.add( (server, writer.shnum) )
1134         self._update_status()
1135         # the next method in the deferred chain will check to see if
1136         # we're done and successful.
1137         return
1138
1139
1140     def _done(self):
1141         if not self._running:
1142             return
1143         self._running = False
1144         now = time.time()
1145         self._status.timings["total"] = now - self._started
1146
1147         elapsed = now - self._started_pushing
1148         self._status.timings['push'] = elapsed
1149
1150         self._status.set_active(False)
1151         self.log("Publish done, success")
1152         self._status.set_status("Finished")
1153         self._status.set_progress(1.0)
1154         # Get k and segsize, then give them to the caller.
1155         hints = {}
1156         hints['segsize'] = self.segment_size
1157         hints['k'] = self.required_shares
1158         self._node.set_downloader_hints(hints)
1159         eventually(self.done_deferred.callback, None)
1160
1161     def _failure(self, f=None):
1162         if f:
1163             self._last_failure = f
1164
1165         if not self.surprised:
1166             # We ran out of servers
1167             msg = "Publish ran out of good servers"
1168             if self._last_failure:
1169                 msg += ", last failure was: %s" % str(self._last_failure)
1170             self.log(msg)
1171             e = NotEnoughServersError(msg)
1172
1173         else:
1174             # We ran into shares that we didn't recognize, which means
1175             # that we need to return an UncoordinatedWriteError.
1176             self.log("Publish failed with UncoordinatedWriteError")
1177             e = UncoordinatedWriteError()
1178         f = failure.Failure(e)
1179         eventually(self.done_deferred.callback, f)
1180
1181
1182 class MutableFileHandle:
1183     """
1184     I am a mutable uploadable built around a filehandle-like object,
1185     usually either a StringIO instance or a handle to an actual file.
1186     """
1187     implements(IMutableUploadable)
1188
1189     def __init__(self, filehandle):
1190         # The filehandle is defined as a generally file-like object that
1191         # has these two methods. We don't care beyond that.
1192         assert hasattr(filehandle, "read")
1193         assert hasattr(filehandle, "close")
1194
1195         self._filehandle = filehandle
1196         # We must start reading at the beginning of the file, or we risk
1197         # encountering errors when the data read does not match the size
1198         # reported to the uploader.
1199         self._filehandle.seek(0)
1200
1201         # We have not yet read anything, so our position is 0.
1202         self._marker = 0
1203
1204
1205     def get_size(self):
1206         """
1207         I return the amount of data in my filehandle.
1208         """
1209         if not hasattr(self, "_size"):
1210             old_position = self._filehandle.tell()
1211             # Seek to the end of the file by seeking 0 bytes from the
1212             # file's end
1213             self._filehandle.seek(0, 2) # 2 == os.SEEK_END in 2.5+
1214             self._size = self._filehandle.tell()
1215             # Restore the previous position, in case this was called
1216             # after a read.
1217             self._filehandle.seek(old_position)
1218             assert self._filehandle.tell() == old_position
1219
1220         assert hasattr(self, "_size")
1221         return self._size
1222
1223
1224     def pos(self):
1225         """
1226         I return the position of my read marker -- i.e., how much data I
1227         have already read and returned to callers.
1228         """
1229         return self._marker
1230
1231
1232     def read(self, length):
1233         """
1234         I return some data (up to length bytes) from my filehandle.
1235
1236         In most cases, I return length bytes, but sometimes I won't --
1237         for example, if I am asked to read beyond the end of a file, or
1238         an error occurs.
1239         """
1240         results = self._filehandle.read(length)
1241         self._marker += len(results)
1242         return [results]
1243
1244
1245     def close(self):
1246         """
1247         I close the underlying filehandle. Any further operations on the
1248         filehandle fail at this point.
1249         """
1250         self._filehandle.close()
1251
1252
1253 class MutableData(MutableFileHandle):
1254     """
1255     I am a mutable uploadable built around a string, which I then cast
1256     into a StringIO and treat as a filehandle.
1257     """
1258
1259     def __init__(self, s):
1260         # Take a string and return a file-like uploadable.
1261         assert isinstance(s, str)
1262
1263         MutableFileHandle.__init__(self, StringIO(s))
1264
1265
1266 class TransformingUploadable:
1267     """
1268     I am an IMutableUploadable that wraps another IMutableUploadable,
1269     and some segments that are already on the grid. When I am called to
1270     read, I handle merging of boundary segments.
1271     """
1272     implements(IMutableUploadable)
1273
1274
1275     def __init__(self, data, offset, segment_size, start, end):
1276         assert IMutableUploadable.providedBy(data)
1277
1278         self._newdata = data
1279         self._offset = offset
1280         self._segment_size = segment_size
1281         self._start = start
1282         self._end = end
1283
1284         self._read_marker = 0
1285
1286         self._first_segment_offset = offset % segment_size
1287
1288         num = self.log("TransformingUploadable: starting", parent=None)
1289         self._log_number = num
1290         self.log("got fso: %d" % self._first_segment_offset)
1291         self.log("got offset: %d" % self._offset)
1292
1293
1294     def log(self, *args, **kwargs):
1295         if 'parent' not in kwargs:
1296             kwargs['parent'] = self._log_number
1297         if "facility" not in kwargs:
1298             kwargs["facility"] = "tahoe.mutable.transforminguploadable"
1299         return log.msg(*args, **kwargs)
1300
1301
1302     def get_size(self):
1303         return self._offset + self._newdata.get_size()
1304
1305
1306     def read(self, length):
1307         # We can get data from 3 sources here. 
1308         #   1. The first of the segments provided to us.
1309         #   2. The data that we're replacing things with.
1310         #   3. The last of the segments provided to us.
1311
1312         # are we in state 0?
1313         self.log("reading %d bytes" % length)
1314
1315         old_start_data = ""
1316         old_data_length = self._first_segment_offset - self._read_marker
1317         if old_data_length > 0:
1318             if old_data_length > length:
1319                 old_data_length = length
1320             self.log("returning %d bytes of old start data" % old_data_length)
1321
1322             old_data_end = old_data_length + self._read_marker
1323             old_start_data = self._start[self._read_marker:old_data_end]
1324             length -= old_data_length
1325         else:
1326             # otherwise calculations later get screwed up.
1327             old_data_length = 0
1328
1329         # Is there enough new data to satisfy this read? If not, we need
1330         # to pad the end of the data with data from our last segment.
1331         old_end_length = length - \
1332             (self._newdata.get_size() - self._newdata.pos())
1333         old_end_data = ""
1334         if old_end_length > 0:
1335             self.log("reading %d bytes of old end data" % old_end_length)
1336
1337             # TODO: We're not explicitly checking for tail segment size
1338             # here. Is that a problem?
1339             old_data_offset = (length - old_end_length + \
1340                                old_data_length) % self._segment_size
1341             self.log("reading at offset %d" % old_data_offset)
1342             old_end = old_data_offset + old_end_length
1343             old_end_data = self._end[old_data_offset:old_end]
1344             length -= old_end_length
1345             assert length == self._newdata.get_size() - self._newdata.pos()
1346
1347         self.log("reading %d bytes of new data" % length)
1348         new_data = self._newdata.read(length)
1349         new_data = "".join(new_data)
1350
1351         self._read_marker += len(old_start_data + new_data + old_end_data)
1352
1353         return old_start_data + new_data + old_end_data
1354
1355     def close(self):
1356         pass