tahoe-lafs/tahoe-lafs.git: src/allmydata/mutable/publish.py
1
2
3 import os, time
4 from StringIO import StringIO
5 from itertools import count
6 from zope.interface import implements
7 from twisted.internet import defer
8 from twisted.python import failure
9 from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
10                                  IMutableUploadable
11 from allmydata.util import base32, hashutil, mathutil, idlib, log
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from foolscap.api import eventually, fireEventually
17
18 from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
19      UncoordinatedWriteError, NotEnoughServersError
20 from allmydata.mutable.servermap import ServerMap
21 from allmydata.mutable.layout import get_version_from_checkstring,\
22                                      unpack_mdmf_checkstring, \
23                                      unpack_sdmf_checkstring, \
24                                      MDMFSlotWriteProxy, \
25                                      SDMFSlotWriteProxy, MDMFCHECKSTRING
26
27 KiB = 1024
28 DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
29 PUSHING_BLOCKS_STATE = 0
30 PUSHING_EVERYTHING_ELSE_STATE = 1
31 DONE_STATE = 2
32
33 class PublishStatus:
34     implements(IPublishStatus)
35     statusid_counter = count(0)
36     def __init__(self):
37         self.timings = {}
38         self.timings["send_per_server"] = {}
39         self.timings["encrypt"] = 0.0
40         self.timings["encode"] = 0.0
41         self.servermap = None
42         self.problems = {}
43         self.active = True
44         self.storage_index = None
45         self.helper = False
46         self.encoding = ("?", "?")
47         self.size = None
48         self.status = "Not started"
49         self.progress = 0.0
50         self.counter = self.statusid_counter.next()
51         self.started = time.time()
52
53     def add_per_server_time(self, peerid, elapsed):
54         if peerid not in self.timings["send_per_server"]:
55             self.timings["send_per_server"][peerid] = []
56         self.timings["send_per_server"][peerid].append(elapsed)
57     def accumulate_encode_time(self, elapsed):
58         self.timings["encode"] += elapsed
59     def accumulate_encrypt_time(self, elapsed):
60         self.timings["encrypt"] += elapsed
61
62     def get_started(self):
63         return self.started
64     def get_storage_index(self):
65         return self.storage_index
66     def get_encoding(self):
67         return self.encoding
68     def using_helper(self):
69         return self.helper
70     def get_servermap(self):
71         return self.servermap
72     def get_size(self):
73         return self.size
74     def get_status(self):
75         return self.status
76     def get_progress(self):
77         return self.progress
78     def get_active(self):
79         return self.active
80     def get_counter(self):
81         return self.counter
82
83     def set_storage_index(self, si):
84         self.storage_index = si
85     def set_helper(self, helper):
86         self.helper = helper
87     def set_servermap(self, servermap):
88         self.servermap = servermap
89     def set_encoding(self, k, n):
90         self.encoding = (k, n)
91     def set_size(self, size):
92         self.size = size
93     def set_status(self, status):
94         self.status = status
95     def set_progress(self, value):
96         self.progress = value
97     def set_active(self, value):
98         self.active = value
99
100 class LoopLimitExceededError(Exception):
101     pass
102
103 class Publish:
104     """I represent a single act of publishing the mutable file to the grid. I
105     will only publish my data if the servermap I am using still represents
106     the current state of the world.
107
108     To make the initial publish, set servermap to None.
109     """
110
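    # Illustrative usage sketch (added; not part of the original code, and
    # the variable names are hypothetical). A full overwrite typically
    # looks like:
    #
    #   from allmydata.mutable.publish import Publish, MutableData
    #
    #   p = Publish(filenode, storage_broker, servermap)  # servermap from a
    #                                                     # MODE_WRITE update
    #   d = p.publish(MutableData("new contents"))        # fires with None
    #
    # For the very first publish of a brand-new filenode, pass
    # servermap=None, as the docstring above describes.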
111     def __init__(self, filenode, storage_broker, servermap):
112         self._node = filenode
113         self._storage_broker = storage_broker
114         self._servermap = servermap
115         self._storage_index = self._node.get_storage_index()
116         self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
117         num = self.log("Publish(%s): starting" % prefix, parent=None)
118         self._log_number = num
119         self._running = True
120         self._first_write_error = None
121         self._last_failure = None
122
123         self._status = PublishStatus()
124         self._status.set_storage_index(self._storage_index)
125         self._status.set_helper(False)
126         self._status.set_progress(0.0)
127         self._status.set_active(True)
128         self._version = self._node.get_version()
129         assert self._version in (SDMF_VERSION, MDMF_VERSION)
130
131
132     def get_status(self):
133         return self._status
134
135     def log(self, *args, **kwargs):
136         if 'parent' not in kwargs:
137             kwargs['parent'] = self._log_number
138         if "facility" not in kwargs:
139             kwargs["facility"] = "tahoe.mutable.publish"
140         return log.msg(*args, **kwargs)
141
142
143     def update(self, data, offset, blockhashes, version):
144         """
145         I replace the contents of this file with the contents of data,
146         starting at offset. I return a Deferred that fires with None
147         when the replacement has been completed, or with an error if
148         something went wrong during the process.
149
150         Note that this process will not upload new shares. If the file
151         being updated is in need of repair, callers will have to repair
152         it on their own.
153         """
154         # How this works:
155         # 1. Make peer assignments. We assign each share that we know
156         # about on the grid to the peer that currently holds it; we do
157         # not place any new shares.
158         # 2. Set up encoding parameters. Most of these will stay the
159         # same -- datalength will change, as will some of the offsets.
160         # 3. Upload the new segments.
161         # 4. Be done.
162         assert IMutableUploadable.providedBy(data)
163
164         self.data = data
165
166         # XXX: Use the MutableFileVersion instead.
167         self.datalength = self._node.get_size()
168         if data.get_size() > self.datalength:
169             self.datalength = data.get_size()
170
171         self.log("starting update")
172         self.log("adding new data of length %d at offset %d" % \
173                     (data.get_size(), offset))
174         self.log("new data length is %d" % self.datalength)
175         self._status.set_size(self.datalength)
176         self._status.set_status("Started")
177         self._started = time.time()
178
179         self.done_deferred = defer.Deferred()
180
181         self._writekey = self._node.get_writekey()
182         assert self._writekey, "need write capability to publish"
183
184         # first, which servers will we publish to? We require that the
185         # servermap was updated in MODE_WRITE, so we can depend upon the
186         # peerlist computed by that process instead of computing our own.
187         assert self._servermap
188         assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
189         # we will push a version that is one larger than anything present
190         # in the grid, according to the servermap.
191         self._new_seqnum = self._servermap.highest_seqnum() + 1
192         self._status.set_servermap(self._servermap)
193
194         self.log(format="new seqnum will be %(seqnum)d",
195                  seqnum=self._new_seqnum, level=log.NOISY)
196
197         # We're updating an existing file, so all of the following
198         # should be available.
199         self.readkey = self._node.get_readkey()
200         self.required_shares = self._node.get_required_shares()
201         assert self.required_shares is not None
202         self.total_shares = self._node.get_total_shares()
203         assert self.total_shares is not None
204         self._status.set_encoding(self.required_shares, self.total_shares)
205
206         self._pubkey = self._node.get_pubkey()
207         assert self._pubkey
208         self._privkey = self._node.get_privkey()
209         assert self._privkey
210         self._encprivkey = self._node.get_encprivkey()
211
212         sb = self._storage_broker
213         full_peerlist = [(s.get_serverid(), s.get_rref())
214                          for s in sb.get_servers_for_psi(self._storage_index)]
215         self.full_peerlist = full_peerlist # for use later, immutable
216         self.bad_peers = set() # peerids who have errbacked/refused requests
217
218         # This will set self.segment_size, self.num_segments, and
219         # self.fec. TODO: Does it know how to do the offset? Probably
220         # not. So do that part next.
221         self.setup_encoding_parameters(offset=offset)
222
223         # if we experience any surprises (writes which were rejected because
224         # our test vector did not match, or shares which we didn't expect to
225         # see), we set this flag and report an UncoordinatedWriteError at the
226         # end of the publish process.
227         self.surprised = False
228
229         # we keep track of three tables. The first is our goal: which share
230         # we want to see on which servers. This is initially populated by the
231         # existing servermap.
232         self.goal = set() # pairs of (peerid, shnum) tuples
233
234         # the number of outstanding queries: those that are in flight and
235         # may or may not be delivered, accepted, or acknowledged. This is
236         # incremented when a query is sent, and decremented when the response
237         # returns or errbacks.
238         self.num_outstanding = 0
239
240         # the third is a table of successes: shares which have actually been
241         # placed. These are populated when responses come back with success.
242         # When self.placed == self.goal, we're done.
243         self.placed = set() # (peerid, shnum) tuples
244
245         # we also keep a mapping from peerid to RemoteReference. Each time we
246         # pull a connection out of the full peerlist, we add it to this for
247         # use later.
248         self.connections = {}
249
250         self.bad_share_checkstrings = {}
251
252         # This is set at the last step of the publishing process.
253         self.versioninfo = ""
254
255         # we use the servermap to populate the initial goal: this way we will
256         # try to update each existing share in place. Since we're
257         # updating, we ignore damaged and missing shares -- callers must
258         # run a repair to recreate these.
259         for (peerid, shnum) in self._servermap.servermap:
260             self.goal.add( (peerid, shnum) )
261             self.connections[peerid] = self._servermap.connections[peerid]
262         self.writers = {}
263
264         # SDMF files are updated differently; this update path is MDMF-only.
265         self._version = MDMF_VERSION
266         writer_class = MDMFSlotWriteProxy
267
268         # For each (peerid, shnum) in self.goal, we make a
269         # write proxy for that peer. We'll use this to write
270         # shares to the peer.
271         for key in self.goal:
272             peerid, shnum = key
273             write_enabler = self._node.get_write_enabler(peerid)
274             renew_secret = self._node.get_renewal_secret(peerid)
275             cancel_secret = self._node.get_cancel_secret(peerid)
276             secrets = (write_enabler, renew_secret, cancel_secret)
277
278             self.writers[shnum] =  writer_class(shnum,
279                                                 self.connections[peerid],
280                                                 self._storage_index,
281                                                 secrets,
282                                                 self._new_seqnum,
283                                                 self.required_shares,
284                                                 self.total_shares,
285                                                 self.segment_size,
286                                                 self.datalength)
287             self.writers[shnum].peerid = peerid
288             assert (peerid, shnum) in self._servermap.servermap
289             old_versionid, old_timestamp = self._servermap.servermap[key]
290             (old_seqnum, old_root_hash, old_salt, old_segsize,
291              old_datalength, old_k, old_N, old_prefix,
292              old_offsets_tuple) = old_versionid
293             self.writers[shnum].set_checkstring(old_seqnum,
294                                                 old_root_hash,
295                                                 old_salt)
296
297         # Our remote shares will not have a complete checkstring until
298         # after we are done writing share data and have started to write
299         # blocks. In the meantime, we need to know what to look for when
300         # writing, so that we can detect UncoordinatedWriteErrors.
301         self._checkstring = self.writers.values()[0].get_checkstring()
302
303         # Now, we start pushing shares.
304         self._status.timings["setup"] = time.time() - self._started
305         # First, we encrypt, encode, and publish the shares that we need
306         # to encrypt, encode, and publish.
307
308         # Our update process fetched these for us. We need to update
309         # them in place as publishing happens.
310         self.blockhashes = {} # shnum -> [blockhashes]
311         for (i, bht) in blockhashes.iteritems():
312             # We need to extract the leaves from our old hash tree.
313             old_segcount = mathutil.div_ceil(version[4],
314                                              version[3])
315             h = hashtree.IncompleteHashTree(old_segcount)
316             bht = dict(enumerate(bht))
317             h.set_hashes(bht)
318             leaves = h[h.get_leaf_index(0):]
319             for j in xrange(self.num_segments - len(leaves)):
320                 leaves.append(None)
321
322             assert len(leaves) >= self.num_segments
323             self.blockhashes[i] = leaves
324             # This list will now be the leaves that were set during the
325             # initial upload + enough empty hashes to make it a
326             # power-of-two. If we exceed a power of two boundary, we
327             # should be encoding the file over again, and should not be
328             # here. So, we have
329             #assert len(self.blockhashes[i]) == \
330             #    hashtree.roundup_pow2(self.num_segments), \
331             #        len(self.blockhashes[i])
332             # XXX: Except this doesn't work. Figure out why.
333
334         # These are filled in later, after we've modified the block hash
335         # tree suitably.
336         self.sharehash_leaves = None # eventually [sharehashes]
337         self.sharehashes = {} # shnum -> [sharehash leaves necessary to 
338                               # validate the share]
339
340         self.log("Starting push")
341
342         self._state = PUSHING_BLOCKS_STATE
343         self._push()
344
345         return self.done_deferred
346
347
348     def publish(self, newdata):
349         """Publish the filenode's current contents.  Returns a Deferred that
350         fires (with None) when the publish has done as much work as it's ever
351     going to do, or errbacks with UncoordinatedWriteError if it detects a
352         simultaneous write.
353         """
354
355         # 0. Setup encoding parameters, encoder, and other such things.
356         # 1. Encrypt, encode, and publish segments.
357         assert IMutableUploadable.providedBy(newdata)
358
359         self.data = newdata
360         self.datalength = newdata.get_size()
361         #if self.datalength >= DEFAULT_MAX_SEGMENT_SIZE:
362         #    self._version = MDMF_VERSION
363         #else:
364         #    self._version = SDMF_VERSION
365
366         self.log("starting publish, datalen is %s" % self.datalength)
367         self._status.set_size(self.datalength)
368         self._status.set_status("Started")
369         self._started = time.time()
370
371         self.done_deferred = defer.Deferred()
372
373         self._writekey = self._node.get_writekey()
374         assert self._writekey, "need write capability to publish"
375
376         # first, which servers will we publish to? We require that the
377         # servermap was updated in MODE_WRITE, so we can depend upon the
378         # peerlist computed by that process instead of computing our own.
379         if self._servermap:
380             assert self._servermap.last_update_mode in (MODE_WRITE, MODE_CHECK)
381             # we will push a version that is one larger than anything present
382             # in the grid, according to the servermap.
383             self._new_seqnum = self._servermap.highest_seqnum() + 1
384         else:
385             # If we don't have a servermap, that's because we're doing the
386             # initial publish
387             self._new_seqnum = 1
388             self._servermap = ServerMap()
389         self._status.set_servermap(self._servermap)
390
391         self.log(format="new seqnum will be %(seqnum)d",
392                  seqnum=self._new_seqnum, level=log.NOISY)
393
394         # having an up-to-date servermap (or using a filenode that was just
395         # created for the first time) also guarantees that the following
396         # fields are available
397         self.readkey = self._node.get_readkey()
398         self.required_shares = self._node.get_required_shares()
399         assert self.required_shares is not None
400         self.total_shares = self._node.get_total_shares()
401         assert self.total_shares is not None
402         self._status.set_encoding(self.required_shares, self.total_shares)
403
404         self._pubkey = self._node.get_pubkey()
405         assert self._pubkey
406         self._privkey = self._node.get_privkey()
407         assert self._privkey
408         self._encprivkey = self._node.get_encprivkey()
409
410         sb = self._storage_broker
411         full_peerlist = [(s.get_serverid(), s.get_rref())
412                          for s in sb.get_servers_for_psi(self._storage_index)]
413         self.full_peerlist = full_peerlist # for use later, immutable
414         self.bad_peers = set() # peerids who have errbacked/refused requests
415
416         # This will set self.segment_size, self.num_segments, and
417         # self.fec.
418         self.setup_encoding_parameters()
419
420         # if we experience any surprises (writes which were rejected because
421         # our test vector did not match, or shares which we didn't expect to
422         # see), we set this flag and report an UncoordinatedWriteError at the
423         # end of the publish process.
424         self.surprised = False
425
426         # we keep track of three tables. The first is our goal: which share
427         # we want to see on which servers. This is initially populated by the
428         # existing servermap.
429         self.goal = set() # pairs of (peerid, shnum) tuples
430
431         # the number of outstanding queries: those that are in flight and
432         # may or may not be delivered, accepted, or acknowledged. This is
433         # incremented when a query is sent, and decremented when the response
434         # returns or errbacks.
435         self.num_outstanding = 0
436
437         # the third is a table of successes: shares which have actually been
438         # placed. These are populated when responses come back with success.
439         # When self.placed == self.goal, we're done.
440         self.placed = set() # (peerid, shnum) tuples
441
442         # we also keep a mapping from peerid to RemoteReference. Each time we
443         # pull a connection out of the full peerlist, we add it to this for
444         # use later.
445         self.connections = {}
446
447         self.bad_share_checkstrings = {}
448
449         # This is set at the last step of the publishing process.
450         self.versioninfo = ""
451
452         # we use the servermap to populate the initial goal: this way we will
453         # try to update each existing share in place.
454         for (peerid, shnum) in self._servermap.servermap:
455             self.goal.add( (peerid, shnum) )
456             self.connections[peerid] = self._servermap.connections[peerid]
457         # then we add in all the shares that were bad (corrupted, bad
458         # signatures, etc). We want to replace these.
459         for key, old_checkstring in self._servermap.bad_shares.items():
460             (peerid, shnum) = key
461             self.goal.add(key)
462             self.bad_share_checkstrings[key] = old_checkstring
463             self.connections[peerid] = self._servermap.connections[peerid]
464
465         # TODO: Make this part do peer selection.
466         self.update_goal()
467         self.writers = {}
468         if self._version == MDMF_VERSION:
469             writer_class = MDMFSlotWriteProxy
470         else:
471             writer_class = SDMFSlotWriteProxy
472
473         # For each (peerid, shnum) in self.goal, we make a
474         # write proxy for that peer. We'll use this to write
475         # shares to the peer.
476         for key in self.goal:
477             peerid, shnum = key
478             write_enabler = self._node.get_write_enabler(peerid)
479             renew_secret = self._node.get_renewal_secret(peerid)
480             cancel_secret = self._node.get_cancel_secret(peerid)
481             secrets = (write_enabler, renew_secret, cancel_secret)
482
483             self.writers[shnum] =  writer_class(shnum,
484                                                 self.connections[peerid],
485                                                 self._storage_index,
486                                                 secrets,
487                                                 self._new_seqnum,
488                                                 self.required_shares,
489                                                 self.total_shares,
490                                                 self.segment_size,
491                                                 self.datalength)
492             self.writers[shnum].peerid = peerid
493             if (peerid, shnum) in self._servermap.servermap:
494                 old_versionid, old_timestamp = self._servermap.servermap[key]
495                 (old_seqnum, old_root_hash, old_salt, old_segsize,
496                  old_datalength, old_k, old_N, old_prefix,
497                  old_offsets_tuple) = old_versionid
498                 self.writers[shnum].set_checkstring(old_seqnum,
499                                                     old_root_hash,
500                                                     old_salt)
501             elif (peerid, shnum) in self.bad_share_checkstrings:
502                 old_checkstring = self.bad_share_checkstrings[(peerid, shnum)]
503                 self.writers[shnum].set_checkstring(old_checkstring)
504
505         # Our remote shares will not have a complete checkstring until
506         # after we are done writing share data and have started to write
507         # blocks. In the meantime, we need to know what to look for when
508         # writing, so that we can detect UncoordinatedWriteErrors.
509         self._checkstring = self.writers.values()[0].get_checkstring()
510
511         # Now, we start pushing shares.
512         self._status.timings["setup"] = time.time() - self._started
513         # First, we encrypt, encode, and publish the shares that we need
514         # to encrypt, encode, and publish.
515
516         # This will eventually hold the block hash chain for each share
517         # that we publish. We define it this way so that empty publishes
518         # will still have something to write to the remote slot.
519         self.blockhashes = dict([(i, []) for i in xrange(self.total_shares)])
520         for i in xrange(self.total_shares):
521             blocks = self.blockhashes[i]
522             for j in xrange(self.num_segments):
523                 blocks.append(None)
524         self.sharehash_leaves = None # eventually [sharehashes]
525         self.sharehashes = {} # shnum -> [sharehash leaves necessary to 
526                               # validate the share]
527
528         self.log("Starting push")
529
530         self._state = PUSHING_BLOCKS_STATE
531         self._push()
532
533         return self.done_deferred
534
535
536     def _update_status(self):
537         self._status.set_status("Sending Shares: %d placed out of %d, "
538                                 "%d messages outstanding" %
539                                 (len(self.placed),
540                                  len(self.goal),
541                                  self.num_outstanding))
542         self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
543
544
545     def setup_encoding_parameters(self, offset=0):
546         if self._version == MDMF_VERSION:
547             segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
548         else:
549             segment_size = self.datalength # SDMF is only one segment
550         # this must be a multiple of self.required_shares
551         segment_size = mathutil.next_multiple(segment_size,
552                                               self.required_shares)
553         self.segment_size = segment_size
554
555         # Calculate the starting segment for the upload.
556         if segment_size:
557             # We use div_ceil instead of integer division here because
558             # it is semantically correct.
559             # If datalength isn't an even multiple of segment_size,
560             # datalength // segment_size rounds down, which drops the
561             # partial tail segment and ignores that extra data.
562             # div_ceil rounds up instead, so the leftover data gets a
563             # segment of its own -- the right number of segments for
564             # the data that we're given. (See the worked example just
565             # after this if/else block.)
566             self.num_segments = mathutil.div_ceil(self.datalength,
567                                                   segment_size)
568
569             self.starting_segment = offset // segment_size
570
571         else:
572             self.num_segments = 0
573             self.starting_segment = 0
574
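        # Worked example (illustrative numbers only, ignoring the round-up
        # to a multiple of k above): with datalength = 300*1024 and
        # segment_size = 128*1024,
        #   datalength // segment_size               == 2  (drops the 44 KiB tail)
        #   mathutil.div_ceil(datalength, 128*1024)  == 3  (the tail gets its own segment)
        # and an update starting at offset = 200*1024 begins at
        #   starting_segment = 200*1024 // (128*1024) == 1.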
575
576         self.log("building encoding parameters for file")
577         self.log("got segsize %d" % self.segment_size)
578         self.log("got %d segments" % self.num_segments)
579
580         if self._version == SDMF_VERSION:
581             assert self.num_segments in (0, 1) # SDMF
582         # calculate the tail segment size.
583
584         if segment_size and self.datalength:
585             self.tail_segment_size = self.datalength % segment_size
586             self.log("got tail segment size %d" % self.tail_segment_size)
587         else:
588             self.tail_segment_size = 0
589
590         if self.tail_segment_size == 0 and segment_size:
591             # The tail segment is the same size as the other segments.
592             self.tail_segment_size = segment_size
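        # For example (illustrative numbers, again ignoring the k-multiple
        # round-up): datalength = 300 KiB with segment_size = 128 KiB gives
        # tail_segment_size = 300 KiB % 128 KiB = 44 KiB, while a 256 KiB
        # file divides evenly and so uses tail_segment_size = segment_size.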
593
594         # Make FEC encoders
595         fec = codec.CRSEncoder()
596         fec.set_params(self.segment_size,
597                        self.required_shares, self.total_shares)
598         self.piece_size = fec.get_block_size()
599         self.fec = fec
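        # Note (added for clarity): since segment_size was rounded up to a
        # multiple of self.required_shares, the encoder's block size comes
        # out to exactly segment_size / required_shares -- e.g. a
        # 131073-byte segment with k=3 yields 43691-byte pieces.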
600
601         if self.tail_segment_size == self.segment_size:
602             self.tail_fec = self.fec
603         else:
604             tail_fec = codec.CRSEncoder()
605             tail_fec.set_params(self.tail_segment_size,
606                                 self.required_shares,
607                                 self.total_shares)
608             self.tail_fec = tail_fec
609
610         self._current_segment = self.starting_segment
611         self.end_segment = self.num_segments - 1
612         # Now figure out where the last segment should be.
613         if self.data.get_size() != self.datalength:
614             # We're updating a few segments in the middle of a mutable
615             # file, so we don't want to republish the whole thing.
616             # (we don't have enough data to do that even if we wanted
617             # to)
618             end = self.data.get_size()
619             self.end_segment = end // segment_size
620             if end % segment_size == 0:
621                 self.end_segment -= 1
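        # Worked example (hypothetical numbers): in the update path,
        # self.data is typically a TransformingUploadable (defined at the
        # bottom of this file), whose get_size() covers offset + new data.
        # With segment_size = 128 KiB and get_size() = 300 KiB we touch
        # segments up through end_segment = 300 KiB // 128 KiB = 2; if
        # get_size() were exactly 256 KiB, the division gives 2 but the
        # last touched segment is index 1, hence the "-= 1" above.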
622
623         self.log("got start segment %d" % self.starting_segment)
624         self.log("got end segment %d" % self.end_segment)
625
626
627     def _push(self, ignored=None):
628         """
629         I manage state transitions. In particular, I check that we still
630         have enough working writers to complete the upload
631         successfully.
632         """
633         # Can we still successfully publish this file?
634         # TODO: Keep track of outstanding queries before aborting the
635         #       process.
636         if len(self.writers) < self.required_shares or self.surprised:
637             return self._failure()
638
639         # Figure out what we need to do next. Each of these needs to
640         # return a deferred so that we don't block execution when this
641         # is first called in the upload method.
642         if self._state == PUSHING_BLOCKS_STATE:
643             return self.push_segment(self._current_segment)
644
645         elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
646             return self.push_everything_else()
647
648         # If we make it to this point, we were successful in placing the
649         # file.
650         return self._done()
651
652
653     def push_segment(self, segnum):
654         if self.num_segments == 0 and self._version == SDMF_VERSION:
655             self._add_dummy_salts()
656
657         if segnum > self.end_segment:
658             # We don't have any more segments to push.
659             self._state = PUSHING_EVERYTHING_ELSE_STATE
660             return self._push()
661
662         d = self._encode_segment(segnum)
663         d.addCallback(self._push_segment, segnum)
664         def _increment_segnum(ign):
665             self._current_segment += 1
666         # XXX: I don't think we need to do addBoth here -- any errBacks
667         # should be handled within push_segment.
668         d.addCallback(_increment_segnum)
669         d.addCallback(self._turn_barrier)
670         d.addCallback(self._push)
671         d.addErrback(self._failure)
672
673
674     def _turn_barrier(self, result):
675         """
676         I help the publish process avoid the recursion limit issues
677         described in #237.
678         """
679         return fireEventually(result)
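        # (Added note: foolscap's fireEventually(result) returns a Deferred
        # that fires with `result` on a later reactor turn, so inserting
        # _turn_barrier into each per-segment callback chain keeps a long
        # push from building one deep recursive call stack.)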
680
681
682     def _add_dummy_salts(self):
683         """
684         SDMF files need a salt even if they're empty, or the signature
685         won't make sense. This method adds a dummy salt to each of our
686         SDMF writers so that they can write the signature later.
687         """
688         salt = os.urandom(16)
689         assert self._version == SDMF_VERSION
690
691         for writer in self.writers.itervalues():
692             writer.put_salt(salt)
693
694
695     def _encode_segment(self, segnum):
696         """
697         I encrypt and encode the segment segnum.
698         """
699         started = time.time()
700
701         if segnum + 1 == self.num_segments:
702             segsize = self.tail_segment_size
703         else:
704             segsize = self.segment_size
705
706
707         self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
708         data = self.data.read(segsize)
709         # XXX: This is dumb. Why return a list?
710         data = "".join(data)
711
712         assert len(data) == segsize, len(data)
713
714         salt = os.urandom(16)
715
716         key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
717         self._status.set_status("Encrypting")
718         enc = AES(key)
719         crypttext = enc.process(data)
720         assert len(crypttext) == len(data)
721
722         now = time.time()
723         self._status.accumulate_encrypt_time(now - started)
724         started = now
725
726         # now apply FEC
727         if segnum + 1 == self.num_segments:
728             fec = self.tail_fec
729         else:
730             fec = self.fec
731
732         self._status.set_status("Encoding")
733         crypttext_pieces = [None] * self.required_shares
734         piece_size = fec.get_block_size()
735         for i in range(len(crypttext_pieces)):
736             offset = i * piece_size
737             piece = crypttext[offset:offset+piece_size]
738             piece = piece + "\x00"*(piece_size - len(piece)) # padding
739             crypttext_pieces[i] = piece
740             assert len(piece) == piece_size
741         d = fec.encode(crypttext_pieces)
742         def _done_encoding(res):
743             elapsed = time.time() - started
744             self._status.accumulate_encode_time(elapsed)
745             return (res, salt)
746         d.addCallback(_done_encoding)
747         return d
748
749
750     def _push_segment(self, encoded_and_salt, segnum):
751         """
752         I push (data, salt) as segment number segnum.
753         """
754         results, salt = encoded_and_salt
755         shares, shareids = results
756         self._status.set_status("Pushing segment")
757         for i in xrange(len(shares)):
758             sharedata = shares[i]
759             shareid = shareids[i]
760             if self._version == MDMF_VERSION:
761                 hashed = salt + sharedata
762             else:
763                 hashed = sharedata
764             block_hash = hashutil.block_hash(hashed)
765             self.blockhashes[shareid][segnum] = block_hash
766             # find the writer for this share
767             writer = self.writers[shareid]
768             writer.put_block(sharedata, segnum, salt)
769
770
771     def push_everything_else(self):
772         """
773         I put everything else associated with a share.
774         """
775         self._pack_started = time.time()
776         self.push_encprivkey()
777         self.push_blockhashes()
778         self.push_sharehashes()
779         self.push_toplevel_hashes_and_signature()
780         d = self.finish_publishing()
781         def _change_state(ignored):
782             self._state = DONE_STATE
783         d.addCallback(_change_state)
784         d.addCallback(self._push)
785         return d
786
787
788     def push_encprivkey(self):
789         encprivkey = self._encprivkey
790         self._status.set_status("Pushing encrypted private key")
791         for writer in self.writers.itervalues():
792             writer.put_encprivkey(encprivkey)
793
794
795     def push_blockhashes(self):
796         self.sharehash_leaves = [None] * len(self.blockhashes)
797         self._status.set_status("Building and pushing block hash tree")
798         for shnum, blockhashes in self.blockhashes.iteritems():
799             t = hashtree.HashTree(blockhashes)
800             self.blockhashes[shnum] = list(t)
801             # set the leaf for future use.
802             self.sharehash_leaves[shnum] = t[0]
803
804             writer = self.writers[shnum]
805             writer.put_blockhashes(self.blockhashes[shnum])
806
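    # Background note (added for clarity): hashtree.HashTree(leaves) builds
    # the complete Merkle tree over the leaves (padded out to a power of
    # two), represented as a flat list. Index 0 is the root -- which is why
    # t[0] above becomes this share's leaf in the share hash tree -- and
    # the leaves themselves sit at the end of the list.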
807
808     def push_sharehashes(self):
809         self._status.set_status("Building and pushing share hash chain")
810         share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
811         for shnum in xrange(len(self.sharehash_leaves)):
812             needed_indices = share_hash_tree.needed_hashes(shnum)
813             self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
814                                              for i in needed_indices] )
815             writer = self.writers[shnum]
816             writer.put_sharehashes(self.sharehashes[shnum])
817         self.root_hash = share_hash_tree[0]
818
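    # Background note (added for clarity): needed_hashes(shnum) returns the
    # indices of the tree nodes a reader needs to check share shnum's leaf
    # against the root -- roughly the sibling of that leaf plus the sibling
    # of each ancestor on the way up. With four shares, for instance,
    # validating share 0 needs its sibling leaf and the hash of the other
    # internal node.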
819
820     def push_toplevel_hashes_and_signature(self):
821         # We need to do three things here:
822         #   - Push the root hash and salt hash
823         #   - Get the checkstring of the resulting layout; sign that.
824         #   - Push the signature
825         self._status.set_status("Pushing root hashes and signature")
826         for shnum in xrange(self.total_shares):
827             writer = self.writers[shnum]
828             writer.put_root_hash(self.root_hash)
829         self._update_checkstring()
830         self._make_and_place_signature()
831
832
833     def _update_checkstring(self):
834         """
835         After putting the root hash, MDMF files will have the
836         checkstring written to the storage server. This means that we
837         can update our copy of the checkstring so we can detect
838         uncoordinated writes. SDMF files will have the same checkstring,
839         so we need not do anything.
840         """
841         self._checkstring = self.writers.values()[0].get_checkstring()
842
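    # (Added note: per the layout module imported above, an SDMF
    # checkstring packs (seqnum, root_hash, IV) while an MDMF checkstring
    # packs (seqnum, root_hash) -- see the unpack_*_checkstring calls in
    # _got_write_answer below.)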
843
844     def _make_and_place_signature(self):
845         """
846         I create and place the signature.
847         """
848         started = time.time()
849         self._status.set_status("Signing prefix")
850         signable = self.writers[0].get_signable()
851         self.signature = self._privkey.sign(signable)
852
853         for (shnum, writer) in self.writers.iteritems():
854             writer.put_signature(self.signature)
855         self._status.timings['sign'] = time.time() - started
856
857
858     def finish_publishing(self):
859         # We're almost done -- we just need to put the verification key
860         # and the offsets
861         started = time.time()
862         self._status.set_status("Pushing shares")
863         self._started_pushing = started
864         ds = []
865         verification_key = self._pubkey.serialize()
866
867         for (shnum, writer) in self.writers.copy().iteritems():
868             writer.put_verification_key(verification_key)
869             self.num_outstanding += 1
870             def _no_longer_outstanding(res):
871                 self.num_outstanding -= 1
872                 return res
873
874             d = writer.finish_publishing()
875             d.addBoth(_no_longer_outstanding)
876             d.addErrback(self._connection_problem, writer)
877             d.addCallback(self._got_write_answer, writer, started)
878             ds.append(d)
879         self._record_verinfo()
880         self._status.timings['pack'] = time.time() - started
881         return defer.DeferredList(ds)
882
883
884     def _record_verinfo(self):
885         self.versioninfo = self.writers.values()[0].get_verinfo()
886
887
888     def _connection_problem(self, f, writer):
889         """
890         We ran into a connection problem while working with writer, and
891         need to deal with that.
892         """
893         self.log("found problem: %s" % str(f))
894         self._last_failure = f
895         del(self.writers[writer.shnum])
896
897
898     def log_goal(self, goal, message=""):
899         logmsg = [message]
900         for (shnum, peerid) in sorted([(s,p) for (p,s) in goal]):
901             logmsg.append("sh%d to [%s]" % (shnum,
902                                             idlib.shortnodeid_b2a(peerid)))
903         self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
904         self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
905                  level=log.NOISY)
906
907     def update_goal(self):
908         # if log.recording_noisy
909         if True:
910             self.log_goal(self.goal, "before update: ")
911
912         # first, remove any bad peers from our goal
913         self.goal = set([ (peerid, shnum)
914                           for (peerid, shnum) in self.goal
915                           if peerid not in self.bad_peers ])
916
917         # find the homeless shares:
918         homefull_shares = set([shnum for (peerid, shnum) in self.goal])
919         homeless_shares = set(range(self.total_shares)) - homefull_shares
920         homeless_shares = sorted(list(homeless_shares))
921         # place them somewhere. We prefer unused servers at the beginning of
922         # the available peer list.
923
924         if not homeless_shares:
925             return
926
927         # if an old share X is on a node, put the new share X there too.
928         # TODO: 1: redistribute shares to achieve one-per-peer, by copying
929         #       shares from existing peers to new (less-crowded) ones. The
930         #       old shares must still be updated.
931         # TODO: 2: move those shares instead of copying them, to reduce future
932         #       update work
933
934         # this is a bit CPU intensive but easy to analyze. We create a sort
935         # order for each peerid. If the peerid is marked as bad, we don't
936         # even put them in the list. Then we care about the number of shares
937         # which have already been assigned to them. After that we care about
938         # their permutation order.
939         old_assignments = DictOfSets()
940         for (peerid, shnum) in self.goal:
941             old_assignments.add(peerid, shnum)
942
943         peerlist = []
944         for i, (peerid, ss) in enumerate(self.full_peerlist):
945             if peerid in self.bad_peers:
946                 continue
947             entry = (len(old_assignments.get(peerid, [])), i, peerid, ss)
948             peerlist.append(entry)
949         peerlist.sort()
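        # Illustrative example (hypothetical peers): given the entries
        #   (1, 0, peerA, ssA)   # peerA already holds a share, permuted 1st
        #   (0, 3, peerB, ssB)   # peerB holds nothing, permuted 4th
        # the tuple sort puts peerB first, so unused servers soak up
        # homeless shares before already-loaded servers get seconds.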
950
951         if not peerlist:
952             raise NotEnoughServersError("Ran out of non-bad servers, "
953                                         "first_error=%s" %
954                                         str(self._first_write_error),
955                                         self._first_write_error)
956
957         # we then index this peerlist with an integer, because we may have to
958         # wrap. We update the goal as we go.
959         i = 0
960         for shnum in homeless_shares:
961             (ignored1, ignored2, peerid, ss) = peerlist[i]
962             # if we are forced to send a share to a server that already has
963             # one, we may have two write requests in flight, and the
964             # servermap (which was computed before either request was sent)
965             # won't reflect the new shares, so the second response will be
966             # surprising. There is code in _got_write_answer() to tolerate
967             # this, otherwise it would cause the publish to fail with an
968             # UncoordinatedWriteError. See #546 for details of the trouble
969             # this used to cause.
970             self.goal.add( (peerid, shnum) )
971             self.connections[peerid] = ss
972             i += 1
973             if i >= len(peerlist):
974                 i = 0
975         if True:
976             self.log_goal(self.goal, "after update: ")
977
978
979     def _got_write_answer(self, answer, writer, started):
980         if not answer:
981             # SDMF writers only pretend to write when we set their
982             # blocks, salts, and so on -- they actually write everything
983             # just once, at the end of the upload process. For these fake
984             # writes they return defer.succeed(None), so there is nothing
985             # to check here.
986             return
987
988         peerid = writer.peerid
989         lp = self.log("_got_write_answer from %s, share %d" %
990                       (idlib.shortnodeid_b2a(peerid), writer.shnum))
991
992         now = time.time()
993         elapsed = now - started
994
995         self._status.add_per_server_time(peerid, elapsed)
996
997         wrote, read_data = answer
998
999         surprise_shares = set(read_data.keys()) - set([writer.shnum])
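        # For example (hypothetical shnums): if this writer holds share 3
        # and read_data == {3: [...], 7: [...]}, then share 7 is a surprise
        # -- unless one of our own writers is also sending share 7 to this
        # peer, which the known_shnums filter below accounts for.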
1000
1001         # We need to remove from surprise_shares any shares that we are
1002         # knowingly also writing to that peer from other writers.
1003
1004         # TODO: Precompute this.
1005         known_shnums = [x.shnum for x in self.writers.values()
1006                         if x.peerid == peerid]
1007         surprise_shares -= set(known_shnums)
1008         self.log("found the following surprise shares: %s" %
1009                  str(surprise_shares))
1010
1011         # Now surprise shares contains all of the shares that we did not
1012         # expect to be there.
1013
1014         surprised = False
1015         for shnum in surprise_shares:
1016             # read_data[shnum][0] is that share's checkstring (SIGNED_PREFIX)
1017             checkstring = read_data[shnum][0]
1018             # What we want to do here is to see if their (seqnum,
1019             # roothash, salt) is the same as our (seqnum, roothash,
1020             # salt), or the equivalent for MDMF. The best way to do this
1021             # is to store a packed representation of our checkstring
1022             # somewhere, then not bother unpacking the other
1023             # checkstring.
1024             if checkstring == self._checkstring:
1025                 # they have the right share, somehow
1026
1027                 if (peerid,shnum) in self.goal:
1028                     # and we want them to have it, so we probably sent them a
1029                     # copy in an earlier write. This is ok, and avoids the
1030                     # #546 problem.
1031                     continue
1032
1033                 # They aren't in our goal, but they are still for the right
1034                 # version. Somebody else wrote them, and it's a convergent
1035                 # uncoordinated write. Pretend this is ok (don't be
1036                 # surprised), since I suspect there's a decent chance that
1037                 # we'll hit this in normal operation.
1038                 continue
1039
1040             else:
1041                 # the new shares are of a different version
1042                 if peerid in self._servermap.reachable_peers:
1043                     # we asked them about their shares, so we had knowledge
1044                     # of what they used to have. Any surprising shares must
1045                     # have come from someone else, so UCW.
1046                     surprised = True
1047                 else:
1048                     # we didn't ask them, and now we've discovered that they
1049                     # have a share we didn't know about. This indicates that
1050                     # mapupdate should have worked harder and asked more
1051                     # servers before concluding that it knew about them all.
1052
1053                     # signal UCW, but make sure to ask this peer next time,
1054                     # so we'll remember to update it if/when we retry.
1055                     surprised = True
1056                     # TODO: ask this peer next time. I don't yet have a good
1057                     # way to do this. Two insufficient possibilities are:
1058                     #
1059                     # self._servermap.add_new_share(peerid, shnum, verinfo, now)
1060                     #  but that requires fetching/validating/parsing the whole
1061                     #  version string, and all we have is the checkstring
1062                     # self._servermap.mark_bad_share(peerid, shnum, checkstring)
1063                     #  that will make publish overwrite the share next time,
1064                     #  but it won't re-query the server, and it won't make
1065                     #  mapupdate search further
1066
1067                     # TODO later: when publish starts, do
1068                     # servermap.get_best_version(), extract the seqnum,
1069                     # subtract one, and store as highest-replaceable-seqnum.
1070                     # Then, if this surprise-because-we-didn't-ask share is
1071                     # of highest-replaceable-seqnum or lower, we're allowed
1072                     # to replace it: send out a new writev (or rather add it
1073                     # to self.goal and loop).
1074                     pass
1075
1076                 surprised = True
1077
1078         if surprised:
1079             self.log("they had shares %s that we didn't know about" %
1080                      (list(surprise_shares),),
1081                      parent=lp, level=log.WEIRD, umid="un9CSQ")
1082             self.surprised = True
1083
1084         if not wrote:
1085             # TODO: there are two possibilities. The first is that the server
1086             # is full (or just doesn't want to give us any room), which means
1087             # we shouldn't ask them again, but is *not* an indication of an
1088             # uncoordinated write. The second is that our testv failed, which
1089             # *does* indicate an uncoordinated write. We currently don't have
1090             # a way to tell these two apart (in fact, the storage server code
1091             # doesn't have the option of refusing our share).
1092             #
1093             # If the server is full, mark the peer as bad (so we don't ask
1094             # them again), but don't set self.surprised. The loop() will find
1095             # a new server.
1096             #
1097             # If the testv failed, log it, set self.surprised, but don't
1098             # bother adding to self.bad_peers .
1099
1100             self.log("our testv failed, so the write did not happen",
1101                      parent=lp, level=log.WEIRD, umid="8sc26g")
1102             self.surprised = True
1103             self.bad_peers.add(writer) # don't ask them again
1104             # use the checkstring to add information to the log message
1105             unknown_format = False
1106             for (shnum,readv) in read_data.items():
1107                 checkstring = readv[0]
1108                 version = get_version_from_checkstring(checkstring)
1109                 if version == MDMF_VERSION:
1110                     (other_seqnum,
1111                      other_roothash) = unpack_mdmf_checkstring(checkstring)
1112                 elif version == SDMF_VERSION:
1113                     (other_seqnum,
1114                      other_roothash,
1115                      other_IV) = unpack_sdmf_checkstring(checkstring)
1116                 else:
1117                     unknown_format = True
1118                 expected_version = self._servermap.version_on_peer(peerid,
1119                                                                    shnum)
1120                 if expected_version:
1121                     (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
1122                      offsets_tuple) = expected_version
1123                     msg = ("somebody modified the share on us:"
1124                            " shnum=%d: I thought they had #%d:R=%s," %
1125                            (shnum,
1126                             seqnum, base32.b2a(root_hash)[:4]))
1127                     if unknown_format:
1128                         msg += (" but I don't know how to read share"
1129                                 " format %d" % version)
1130                     else:
1131                         msg += " but testv reported #%d:R=%s" % \
1132                                (other_seqnum, other_roothash)
1133                     self.log(msg, parent=lp, level=log.NOISY)
1134                 # if expected_version==None, then we didn't expect to see a
1135                 # share on that peer, and the 'surprise_shares' clause above
1136                 # will have logged it.
1137             return
1138
1139         # and update the servermap
1140         # self.versioninfo is set during the last phase of publishing.
1141         # If we get there, we know that responses correspond to placed
1142         # shares, and can safely execute these statements.
1143         if self.versioninfo:
1144             self.log("wrote successfully: adding new share to servermap")
1145             self._servermap.add_new_share(peerid, writer.shnum,
1146                                           self.versioninfo, started)
1147             self.placed.add( (peerid, writer.shnum) )
1148         self._update_status()
1149         # the next method in the deferred chain will check to see if
1150         # we're done and successful.
1151         return
1152
1153
1154     def _done(self):
1155         if not self._running:
1156             return
1157         self._running = False
1158         now = time.time()
1159         self._status.timings["total"] = now - self._started
1160
1161         elapsed = now - self._started_pushing
1162         self._status.timings['push'] = elapsed
1163
1164         self._status.set_active(False)
1165         self.log("Publish done, success")
1166         self._status.set_status("Finished")
1167         self._status.set_progress(1.0)
1168         # Get k and segsize, then give them to the caller.
1169         hints = {}
1170         hints['segsize'] = self.segment_size
1171         hints['k'] = self.required_shares
1172         self._node.set_downloader_hints(hints)
1173         eventually(self.done_deferred.callback, None)
1174
1175     def _failure(self, f=None):
1176         if f:
1177             self._last_failure = f
1178
1179         if not self.surprised:
1180             # We ran out of servers
1181             msg = "Publish ran out of good servers"
1182             if self._last_failure:
1183                 msg += ", last failure was: %s" % str(self._last_failure)
1184             self.log(msg)
1185             e = NotEnoughServersError(msg)
1186
1187         else:
1188             # We ran into shares that we didn't recognize, which means
1189             # that we need to return an UncoordinatedWriteError.
1190             self.log("Publish failed with UncoordinatedWriteError")
1191             e = UncoordinatedWriteError()
1192         f = failure.Failure(e)
1193         eventually(self.done_deferred.callback, f)
1194
1195
1196 class MutableFileHandle:
1197     """
1198     I am a mutable uploadable built around a filehandle-like object,
1199     usually either a StringIO instance or a handle to an actual file.
1200     """
1201     implements(IMutableUploadable)
1202
1203     def __init__(self, filehandle):
1204         # The filehandle can be any file-like object that provides these
1205         # two methods -- we don't care beyond that.
1206         assert hasattr(filehandle, "read")
1207         assert hasattr(filehandle, "close")
1208
1209         self._filehandle = filehandle
1210         # We must start reading at the beginning of the file, or we risk
1211         # encountering errors when the data read does not match the size
1212         # reported to the uploader.
1213         self._filehandle.seek(0)
1214
1215         # We have not yet read anything, so our position is 0.
1216         self._marker = 0
1217
1218
1219     def get_size(self):
1220         """
1221         I return the amount of data in my filehandle.
1222         """
1223         if not hasattr(self, "_size"):
1224             old_position = self._filehandle.tell()
1225             # Seek to the end of the file by seeking 0 bytes from the
1226             # file's end
1227             self._filehandle.seek(0, 2) # 2 == os.SEEK_END in 2.5+
1228             self._size = self._filehandle.tell()
1229             # Restore the previous position, in case this was called
1230             # after a read.
1231             self._filehandle.seek(old_position)
1232             assert self._filehandle.tell() == old_position
1233
1234         assert hasattr(self, "_size")
1235         return self._size
1236
1237
1238     def pos(self):
1239         """
1240         I return the position of my read marker -- i.e., how much data I
1241         have already read and returned to callers.
1242         """
1243         return self._marker
1244
1245
1246     def read(self, length):
1247         """
1248         I return some data (up to length bytes) from my filehandle.
1249
1250         In most cases, I return length bytes, but sometimes I won't --
1251         for example, if I am asked to read beyond the end of a file, or
1252         an error occurs.
1253         """
1254         results = self._filehandle.read(length)
1255         self._marker += len(results)
1256         return [results]
1257
1258
1259     def close(self):
1260         """
1261         I close the underlying filehandle. Any further operations on the
1262         filehandle fail at this point.
1263         """
1264         self._filehandle.close()
1265
1266
1267 class MutableData(MutableFileHandle):
1268     """
1269     I am a mutable uploadable built around a string, which I then cast
1270     into a StringIO and treat as a filehandle.
1271     """
1272
1273     def __init__(self, s):
1274         # Take a string and return a file-like uploadable.
1275         assert isinstance(s, str)
1276
1277         MutableFileHandle.__init__(self, StringIO(s))
1278
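# Illustrative usage (added; hypothetical values):
#
#   u = MutableData("abcde")
#   u.get_size()   # -> 5
#   u.read(3)      # -> ["abc"]   (read() returns a list of strings)
#   u.pos()        # -> 3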
1279
1280 class TransformingUploadable:
1281     """
1282     I am an IMutableUploadable that wraps another IMutableUploadable,
1283     and some segments that are already on the grid. When I am called to
1284     read, I handle merging of boundary segments.
1285     """
1286     implements(IMutableUploadable)
1287
1288
1289     def __init__(self, data, offset, segment_size, start, end):
1290         assert IMutableUploadable.providedBy(data)
1291
1292         self._newdata = data
1293         self._offset = offset
1294         self._segment_size = segment_size
1295         self._start = start
1296         self._end = end
1297
1298         self._read_marker = 0
1299
1300         self._first_segment_offset = offset % segment_size
1301
1302         num = self.log("TransformingUploadable: starting", parent=None)
1303         self._log_number = num
1304         self.log("got fso: %d" % self._first_segment_offset)
1305         self.log("got offset: %d" % self._offset)
1306
1307
1308     def log(self, *args, **kwargs):
1309         if 'parent' not in kwargs:
1310             kwargs['parent'] = self._log_number
1311         if "facility" not in kwargs:
1312             kwargs["facility"] = "tahoe.mutable.transforminguploadable"
1313         return log.msg(*args, **kwargs)
1314
1315
1316     def get_size(self):
1317         return self._offset + self._newdata.get_size()
1318
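    # Worked example (hypothetical values): replacing 10 bytes at offset 6
    # in a file with segment_size = 4 gives _first_segment_offset = 6 % 4
    # = 2 and get_size() = 6 + 10 = 16, so the first read(4) returns the
    # 2 bytes of the old start segment that precede the write offset,
    # followed by the first 2 bytes of the new data.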
1319
1320     def read(self, length):
1321         # We can get data from 3 sources here. 
1322         #   1. The first of the segments provided to us.
1323         #   2. The data that we're replacing things with.
1324         #   3. The last of the segments provided to us.
1325
1326         # First: is there still old start-segment data to return?
1327         self.log("reading %d bytes" % length)
1328
1329         old_start_data = ""
1330         old_data_length = self._first_segment_offset - self._read_marker
1331         if old_data_length > 0:
1332             if old_data_length > length:
1333                 old_data_length = length
1334             self.log("returning %d bytes of old start data" % old_data_length)
1335
1336             old_data_end = old_data_length + self._read_marker
1337             old_start_data = self._start[self._read_marker:old_data_end]
1338             length -= old_data_length
1339         else:
1340             # otherwise calculations later get screwed up.
1341             old_data_length = 0
1342
1343         # Is there enough new data to satisfy this read? If not, we need
1344         # to pad the end of the data with data from our last segment.
1345         old_end_length = length - \
1346             (self._newdata.get_size() - self._newdata.pos())
1347         old_end_data = ""
1348         if old_end_length > 0:
1349             self.log("reading %d bytes of old end data" % old_end_length)
1350
1351             # TODO: We're not explicitly checking for tail segment size
1352             # here. Is that a problem?
1353             old_data_offset = (length - old_end_length + \
1354                                old_data_length) % self._segment_size
1355             self.log("reading at offset %d" % old_data_offset)
1356             old_end = old_data_offset + old_end_length
1357             old_end_data = self._end[old_data_offset:old_end]
1358             length -= old_end_length
1359             assert length == self._newdata.get_size() - self._newdata.pos()
1360
1361         self.log("reading %d bytes of new data" % length)
1362         new_data = self._newdata.read(length)
1363         new_data = "".join(new_data)
1364
1365         self._read_marker += len(old_start_data + new_data + old_end_data)
1366
1367         return old_start_data + new_data + old_end_data
1368
1369     def close(self):
1370         pass