From: Kevan Carstensen Date: Tue, 2 Aug 2011 02:11:20 +0000 (-0700) Subject: mutable/layout: Define MDMF share format, write tools for working with MDMF share... X-Git-Tag: trac-5200~22 X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri/flags/schema.xhtml?a=commitdiff_plain;h=b1b77d3b89cd7a6cc467e17c62d5d53750559a24;p=tahoe-lafs%2Ftahoe-lafs.git mutable/layout: Define MDMF share format, write tools for working with MDMF share format The changes in layout.py are mostly concerned with the MDMF share format. In particular, we define read and write proxy objects used by retrieval, publishing, and other code to write and read the MDMF share format. We create equivalent proxies for SDMF objects so that these objects can be suitably general. --- diff --git a/src/allmydata/mutable/layout.py b/src/allmydata/mutable/layout.py index 9565843f..02069edd 100644 --- a/src/allmydata/mutable/layout.py +++ b/src/allmydata/mutable/layout.py @@ -1,13 +1,79 @@ -import struct +import struct, math from allmydata.mutable.common import NeedMoreDataError, UnknownVersionError +from allmydata.interfaces import HASH_SIZE, SALT_SIZE, SDMF_VERSION, \ + MDMF_VERSION, IMutableSlotWriter +from allmydata.util import mathutil, observer +from twisted.python import failure +from twisted.internet import defer +from zope.interface import implements + + +# These strings describe the format of the packed structs they help process +# Here's what they mean: +# +# PREFIX: +# >: Big-endian byte order; the most significant byte is first (leftmost). +# B: The version information; an 8 bit version identifier. Stored as +# an unsigned char. This is currently 00 00 00 00; our modifications +# will turn it into 00 00 00 01. +# Q: The sequence number; this is sort of like a revision history for +# mutable files; they start at 1 and increase as they are changed after +# being uploaded. Stored as an unsigned long long, which is 8 bytes in +# length. +# 32s: The root hash of the share hash tree. We use sha-256d, so we use 32 +# characters = 32 bytes to store the value. +# 16s: The salt for the readkey. This is a 16-byte random value, stored as +# 16 characters. +# +# SIGNED_PREFIX additions, things that are covered by the signature: +# B: The "k" encoding parameter. We store this as an 8-bit character, +# which is convenient because our erasure coding scheme cannot +# encode if you ask for more than 255 pieces. +# B: The "N" encoding parameter. Stored as an 8-bit character for the +# same reasons as above. +# Q: The segment size of the uploaded file. This will essentially be the +# length of the file in SDMF. An unsigned long long, so we can store +# files of quite large size. +# Q: The data length of the uploaded file. Modulo padding, this will be +# the same of the data length field. Like the data length field, it is +# an unsigned long long and can be quite large. +# +# HEADER additions: +# L: The offset of the signature of this. An unsigned long. +# L: The offset of the share hash chain. An unsigned long. +# L: The offset of the block hash tree. An unsigned long. +# L: The offset of the share data. An unsigned long. +# Q: The offset of the encrypted private key. An unsigned long long, to +# account for the possibility of a lot of share data. +# Q: The offset of the EOF. An unsigned long long, to account for the +# possibility of a lot of share data. +# +# After all of these, we have the following: +# - The verification key: Occupies the space between the end of the header +# and the start of the signature (i.e.: data[HEADER_LENGTH:o['signature']]. +# - The signature, which goes from the signature offset to the share hash +# chain offset. +# - The share hash chain, which goes from the share hash chain offset to +# the block hash tree offset. +# - The share data, which goes from the share data offset to the encrypted +# private key offset. +# - The encrypted private key offset, which goes until the end of the file. +# +# The block hash tree in this encoding has only one share, so the offset of +# the share data will be 32 bits more than the offset of the block hash tree. +# Given this, we may need to check to see how many bytes a reasonably sized +# block hash tree will take up. PREFIX = ">BQ32s16s" # each version has a different prefix SIGNED_PREFIX = ">BQ32s16s BBQQ" # this is covered by the signature SIGNED_PREFIX_LENGTH = struct.calcsize(SIGNED_PREFIX) HEADER = ">BQ32s16s BBQQ LLLLQQ" # includes offsets HEADER_LENGTH = struct.calcsize(HEADER) +OFFSETS = ">LLLLQQ" +OFFSETS_LENGTH = struct.calcsize(OFFSETS) +# These are still used for some tests. def unpack_header(data): o = {} (version, @@ -23,30 +89,6 @@ def unpack_header(data): o['EOF']) = struct.unpack(HEADER, data[:HEADER_LENGTH]) return (version, seqnum, root_hash, IV, k, N, segsize, datalen, o) -def unpack_prefix_and_signature(data): - assert len(data) >= HEADER_LENGTH, len(data) - prefix = data[:SIGNED_PREFIX_LENGTH] - - (version, - seqnum, - root_hash, - IV, - k, N, segsize, datalen, - o) = unpack_header(data) - - if version != 0: - raise UnknownVersionError("got mutable share version %d, but I only understand version 0" % version) - - if len(data) < o['share_hash_chain']: - raise NeedMoreDataError(o['share_hash_chain'], - o['enc_privkey'], o['EOF']-o['enc_privkey']) - - pubkey_s = data[HEADER_LENGTH:o['signature']] - signature = data[o['signature']:o['share_hash_chain']] - - return (seqnum, root_hash, IV, k, N, segsize, datalen, - pubkey_s, signature, prefix) - def unpack_share(data): assert len(data) >= HEADER_LENGTH o = {} @@ -94,45 +136,6 @@ def unpack_share(data): pubkey, signature, share_hash_chain, block_hash_tree, share_data, enc_privkey) -def unpack_share_data(verinfo, hash_and_data): - (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, o_t) = verinfo - - # hash_and_data starts with the share_hash_chain, so figure out what the - # offsets really are - o = dict(o_t) - o_share_hash_chain = 0 - o_block_hash_tree = o['block_hash_tree'] - o['share_hash_chain'] - o_share_data = o['share_data'] - o['share_hash_chain'] - o_enc_privkey = o['enc_privkey'] - o['share_hash_chain'] - - share_hash_chain_s = hash_and_data[o_share_hash_chain:o_block_hash_tree] - share_hash_format = ">H32s" - hsize = struct.calcsize(share_hash_format) - assert len(share_hash_chain_s) % hsize == 0, len(share_hash_chain_s) - share_hash_chain = [] - for i in range(0, len(share_hash_chain_s), hsize): - chunk = share_hash_chain_s[i:i+hsize] - (hid, h) = struct.unpack(share_hash_format, chunk) - share_hash_chain.append( (hid, h) ) - share_hash_chain = dict(share_hash_chain) - block_hash_tree_s = hash_and_data[o_block_hash_tree:o_share_data] - assert len(block_hash_tree_s) % 32 == 0, len(block_hash_tree_s) - block_hash_tree = [] - for i in range(0, len(block_hash_tree_s), 32): - block_hash_tree.append(block_hash_tree_s[i:i+32]) - - share_data = hash_and_data[o_share_data:o_enc_privkey] - - return (share_hash_chain, block_hash_tree, share_data) - - -def pack_checkstring(seqnum, root_hash, IV): - return struct.pack(PREFIX, - 0, # version, - seqnum, - root_hash, - IV) - def unpack_checkstring(checkstring): cs_len = struct.calcsize(PREFIX) version, seqnum, root_hash, IV = struct.unpack(PREFIX, checkstring[:cs_len]) @@ -140,21 +143,6 @@ def unpack_checkstring(checkstring): raise UnknownVersionError("got mutable share version %d, but I only understand version 0" % version) return (seqnum, root_hash, IV) -def pack_prefix(seqnum, root_hash, IV, - required_shares, total_shares, - segment_size, data_length): - prefix = struct.pack(SIGNED_PREFIX, - 0, # version, - seqnum, - root_hash, - IV, - - required_shares, - total_shares, - segment_size, - data_length, - ) - return prefix def pack_offsets(verification_key_length, signature_length, share_hash_chain_length, block_hash_tree_length, @@ -201,3 +189,1581 @@ def pack_share(prefix, verification_key, signature, encprivkey]) return final_share +def pack_prefix(seqnum, root_hash, IV, + required_shares, total_shares, + segment_size, data_length): + prefix = struct.pack(SIGNED_PREFIX, + 0, # version, + seqnum, + root_hash, + IV, + required_shares, + total_shares, + segment_size, + data_length, + ) + return prefix + + +class SDMFSlotWriteProxy: + implements(IMutableSlotWriter) + """ + I represent a remote write slot for an SDMF mutable file. I build a + share in memory, and then write it in one piece to the remote + server. This mimics how SDMF shares were built before MDMF (and the + new MDMF uploader), but provides that functionality in a way that + allows the MDMF uploader to be built without much special-casing for + file format, which makes the uploader code more readable. + """ + def __init__(self, + shnum, + rref, # a remote reference to a storage server + storage_index, + secrets, # (write_enabler, renew_secret, cancel_secret) + seqnum, # the sequence number of the mutable file + required_shares, + total_shares, + segment_size, + data_length): # the length of the original file + self.shnum = shnum + self._rref = rref + self._storage_index = storage_index + self._secrets = secrets + self._seqnum = seqnum + self._required_shares = required_shares + self._total_shares = total_shares + self._segment_size = segment_size + self._data_length = data_length + + # This is an SDMF file, so it should have only one segment, so, + # modulo padding of the data length, the segment size and the + # data length should be the same. + expected_segment_size = mathutil.next_multiple(data_length, + self._required_shares) + assert expected_segment_size == segment_size + + self._block_size = self._segment_size / self._required_shares + + # This is meant to mimic how SDMF files were built before MDMF + # entered the picture: we generate each share in its entirety, + # then push it off to the storage server in one write. When + # callers call set_*, they are just populating this dict. + # finish_publishing will stitch these pieces together into a + # coherent share, and then write the coherent share to the + # storage server. + self._share_pieces = {} + + # This tells the write logic what checkstring to use when + # writing remote shares. + self._testvs = [] + + self._readvs = [(0, struct.calcsize(PREFIX))] + + + def set_checkstring(self, checkstring_or_seqnum, + root_hash=None, + salt=None): + """ + Set the checkstring that I will pass to the remote server when + writing. + + @param checkstring_or_seqnum: A packed checkstring to use, + or a sequence number. I will treat this as a checkstr + + Note that implementations can differ in which semantics they + wish to support for set_checkstring -- they can, for example, + build the checkstring themselves from its constituents, or + some other thing. + """ + if root_hash and salt: + checkstring = struct.pack(PREFIX, + 0, + checkstring_or_seqnum, + root_hash, + salt) + else: + checkstring = checkstring_or_seqnum + self._testvs = [(0, len(checkstring), "eq", checkstring)] + + + def get_checkstring(self): + """ + Get the checkstring that I think currently exists on the remote + server. + """ + if self._testvs: + return self._testvs[0][3] + return "" + + + def put_block(self, data, segnum, salt): + """ + Add a block and salt to the share. + """ + # SDMF files have only one segment + assert segnum == 0 + assert len(data) == self._block_size + assert len(salt) == SALT_SIZE + + self._share_pieces['sharedata'] = data + self._share_pieces['salt'] = salt + + # TODO: Figure out something intelligent to return. + return defer.succeed(None) + + + def put_encprivkey(self, encprivkey): + """ + Add the encrypted private key to the share. + """ + self._share_pieces['encprivkey'] = encprivkey + + return defer.succeed(None) + + + def put_blockhashes(self, blockhashes): + """ + Add the block hash tree to the share. + """ + assert isinstance(blockhashes, list) + for h in blockhashes: + assert len(h) == HASH_SIZE + + # serialize the blockhashes, then set them. + blockhashes_s = "".join(blockhashes) + self._share_pieces['block_hash_tree'] = blockhashes_s + + return defer.succeed(None) + + + def put_sharehashes(self, sharehashes): + """ + Add the share hash chain to the share. + """ + assert isinstance(sharehashes, dict) + for h in sharehashes.itervalues(): + assert len(h) == HASH_SIZE + + # serialize the sharehashes, then set them. + sharehashes_s = "".join([struct.pack(">H32s", i, sharehashes[i]) + for i in sorted(sharehashes.keys())]) + self._share_pieces['share_hash_chain'] = sharehashes_s + + return defer.succeed(None) + + + def put_root_hash(self, root_hash): + """ + Add the root hash to the share. + """ + assert len(root_hash) == HASH_SIZE + + self._share_pieces['root_hash'] = root_hash + + return defer.succeed(None) + + + def put_salt(self, salt): + """ + Add a salt to an empty SDMF file. + """ + assert len(salt) == SALT_SIZE + + self._share_pieces['salt'] = salt + self._share_pieces['sharedata'] = "" + + + def get_signable(self): + """ + Return the part of the share that needs to be signed. + + SDMF writers need to sign the packed representation of the + first eight fields of the remote share, that is: + - version number (0) + - sequence number + - root of the share hash tree + - salt + - k + - n + - segsize + - datalen + + This method is responsible for returning that to callers. + """ + return struct.pack(SIGNED_PREFIX, + 0, + self._seqnum, + self._share_pieces['root_hash'], + self._share_pieces['salt'], + self._required_shares, + self._total_shares, + self._segment_size, + self._data_length) + + + def put_signature(self, signature): + """ + Add the signature to the share. + """ + self._share_pieces['signature'] = signature + + return defer.succeed(None) + + + def put_verification_key(self, verification_key): + """ + Add the verification key to the share. + """ + self._share_pieces['verification_key'] = verification_key + + return defer.succeed(None) + + + def get_verinfo(self): + """ + I return my verinfo tuple. This is used by the ServermapUpdater + to keep track of versions of mutable files. + + The verinfo tuple for MDMF files contains: + - seqnum + - root hash + - a blank (nothing) + - segsize + - datalen + - k + - n + - prefix (the thing that you sign) + - a tuple of offsets + + We include the nonce in MDMF to simplify processing of version + information tuples. + + The verinfo tuple for SDMF files is the same, but contains a + 16-byte IV instead of a hash of salts. + """ + return (self._seqnum, + self._share_pieces['root_hash'], + self._share_pieces['salt'], + self._segment_size, + self._data_length, + self._required_shares, + self._total_shares, + self.get_signable(), + self._get_offsets_tuple()) + + def _get_offsets_dict(self): + post_offset = HEADER_LENGTH + offsets = {} + + verification_key_length = len(self._share_pieces['verification_key']) + o1 = offsets['signature'] = post_offset + verification_key_length + + signature_length = len(self._share_pieces['signature']) + o2 = offsets['share_hash_chain'] = o1 + signature_length + + share_hash_chain_length = len(self._share_pieces['share_hash_chain']) + o3 = offsets['block_hash_tree'] = o2 + share_hash_chain_length + + block_hash_tree_length = len(self._share_pieces['block_hash_tree']) + o4 = offsets['share_data'] = o3 + block_hash_tree_length + + share_data_length = len(self._share_pieces['sharedata']) + o5 = offsets['enc_privkey'] = o4 + share_data_length + + encprivkey_length = len(self._share_pieces['encprivkey']) + offsets['EOF'] = o5 + encprivkey_length + return offsets + + + def _get_offsets_tuple(self): + offsets = self._get_offsets_dict() + return tuple([(key, value) for key, value in offsets.items()]) + + + def _pack_offsets(self): + offsets = self._get_offsets_dict() + return struct.pack(">LLLLQQ", + offsets['signature'], + offsets['share_hash_chain'], + offsets['block_hash_tree'], + offsets['share_data'], + offsets['enc_privkey'], + offsets['EOF']) + + + def finish_publishing(self): + """ + Do anything necessary to finish writing the share to a remote + server. I require that no further publishing needs to take place + after this method has been called. + """ + for k in ["sharedata", "encprivkey", "signature", "verification_key", + "share_hash_chain", "block_hash_tree"]: + assert k in self._share_pieces + # This is the only method that actually writes something to the + # remote server. + # First, we need to pack the share into data that we can write + # to the remote server in one write. + offsets = self._pack_offsets() + prefix = self.get_signable() + final_share = "".join([prefix, + offsets, + self._share_pieces['verification_key'], + self._share_pieces['signature'], + self._share_pieces['share_hash_chain'], + self._share_pieces['block_hash_tree'], + self._share_pieces['sharedata'], + self._share_pieces['encprivkey']]) + + # Our only data vector is going to be writing the final share, + # in its entirely. + datavs = [(0, final_share)] + + if not self._testvs: + # Our caller has not provided us with another checkstring + # yet, so we assume that we are writing a new share, and set + # a test vector that will allow a new share to be written. + self._testvs = [] + self._testvs.append(tuple([0, 1, "eq", ""])) + + tw_vectors = {} + tw_vectors[self.shnum] = (self._testvs, datavs, None) + return self._rref.callRemote("slot_testv_and_readv_and_writev", + self._storage_index, + self._secrets, + tw_vectors, + # TODO is it useful to read something? + self._readvs) + + +MDMFHEADER = ">BQ32sBBQQ QQQQQQQQ" +MDMFHEADERWITHOUTOFFSETS = ">BQ32sBBQQ" +MDMFHEADERSIZE = struct.calcsize(MDMFHEADER) +MDMFHEADERWITHOUTOFFSETSSIZE = struct.calcsize(MDMFHEADERWITHOUTOFFSETS) +MDMFCHECKSTRING = ">BQ32s" +MDMFSIGNABLEHEADER = ">BQ32sBBQQ" +MDMFOFFSETS = ">QQQQQQQQ" +MDMFOFFSETS_LENGTH = struct.calcsize(MDMFOFFSETS) + +PRIVATE_KEY_SIZE = 1220 +SIGNATURE_SIZE = 260 +VERIFICATION_KEY_SIZE = 292 +# We know we won't have more than 256 shares, and we know that we won't +# need to store more than lg 256 of them to validate, so that's our +# bound. We add 1 to the int cast to round to the next integer. +SHARE_HASH_CHAIN_SIZE = int(math.log(HASH_SIZE * 256)) + 1 + +class MDMFSlotWriteProxy: + implements(IMutableSlotWriter) + + """ + I represent a remote write slot for an MDMF mutable file. + + I abstract away from my caller the details of block and salt + management, and the implementation of the on-disk format for MDMF + shares. + """ + # Expected layout, MDMF: + # offset: size: name: + #-- signed part -- + # 0 1 version number (01) + # 1 8 sequence number + # 9 32 share tree root hash + # 41 1 The "k" encoding parameter + # 42 1 The "N" encoding parameter + # 43 8 The segment size of the uploaded file + # 51 8 The data length of the original plaintext + #-- end signed part -- + # 59 8 The offset of the encrypted private key + # 67 8 The offset of the signature + # 75 8 The offset of the verification key + # 83 8 The offset of the end of the v. key. + # 92 8 The offset of the share data + # 100 8 The offset of the block hash tree + # 108 8 The offset of the share hash chain + # 116 8 The offset of EOF + # + # followed by the encrypted private key, signature, verification + # key, share hash chain, data, and block hash tree. We order the + # fields that way to make smart downloaders -- downloaders which + # prempetively read a big part of the share -- possible. + # + # The checkstring is the first three fields -- the version number, + # sequence number, root hash and root salt hash. This is consistent + # in meaning to what we have with SDMF files, except now instead of + # using the literal salt, we use a value derived from all of the + # salts -- the share hash root. + # + # The salt is stored before the block for each segment. The block + # hash tree is computed over the combination of block and salt for + # each segment. In this way, we get integrity checking for both + # block and salt with the current block hash tree arrangement. + # + # The ordering of the offsets is different to reflect the dependencies + # that we'll run into with an MDMF file. The expected write flow is + # something like this: + # + # 0: Initialize with the sequence number, encoding parameters and + # data length. From this, we can deduce the number of segments, + # and where they should go.. We can also figure out where the + # encrypted private key should go, because we can figure out how + # big the share data will be. + # + # 1: Encrypt, encode, and upload the file in chunks. Do something + # like + # + # put_block(data, segnum, salt) + # + # to write a block and a salt to the disk. We can do both of + # these operations now because we have enough of the offsets to + # know where to put them. + # + # 2: Put the encrypted private key. Use: + # + # put_encprivkey(encprivkey) + # + # Now that we know the length of the private key, we can fill + # in the offset for the block hash tree. + # + # 3: We're now in a position to upload the block hash tree for + # a share. Put that using something like: + # + # put_blockhashes(block_hash_tree) + # + # Note that block_hash_tree is a list of hashes -- we'll take + # care of the details of serializing that appropriately. When + # we get the block hash tree, we are also in a position to + # calculate the offset for the share hash chain, and fill that + # into the offsets table. + # + # 4: We're now in a position to upload the share hash chain for + # a share. Do that with something like: + # + # put_sharehashes(share_hash_chain) + # + # share_hash_chain should be a dictionary mapping shnums to + # 32-byte hashes -- the wrapper handles serialization. + # We'll know where to put the signature at this point, also. + # The root of this tree will be put explicitly in the next + # step. + # + # 5: Before putting the signature, we must first put the + # root_hash. Do this with: + # + # put_root_hash(root_hash). + # + # In terms of knowing where to put this value, it was always + # possible to place it, but it makes sense semantically to + # place it after the share hash tree, so that's why you do it + # in this order. + # + # 6: With the root hash put, we can now sign the header. Use: + # + # get_signable() + # + # to get the part of the header that you want to sign, and use: + # + # put_signature(signature) + # + # to write your signature to the remote server. + # + # 6: Add the verification key, and finish. Do: + # + # put_verification_key(key) + # + # and + # + # finish_publish() + # + # Checkstring management: + # + # To write to a mutable slot, we have to provide test vectors to ensure + # that we are writing to the same data that we think we are. These + # vectors allow us to detect uncoordinated writes; that is, writes + # where both we and some other shareholder are writing to the + # mutable slot, and to report those back to the parts of the program + # doing the writing. + # + # With SDMF, this was easy -- all of the share data was written in + # one go, so it was easy to detect uncoordinated writes, and we only + # had to do it once. With MDMF, not all of the file is written at + # once. + # + # If a share is new, we write out as much of the header as we can + # before writing out anything else. This gives other writers a + # canary that they can use to detect uncoordinated writes, and, if + # they do the same thing, gives us the same canary. We them update + # the share. We won't be able to write out two fields of the header + # -- the share tree hash and the salt hash -- until we finish + # writing out the share. We only require the writer to provide the + # initial checkstring, and keep track of what it should be after + # updates ourselves. + # + # If we haven't written anything yet, then on the first write (which + # will probably be a block + salt of a share), we'll also write out + # the header. On subsequent passes, we'll expect to see the header. + # This changes in two places: + # + # - When we write out the salt hash + # - When we write out the root of the share hash tree + # + # since these values will change the header. It is possible that we + # can just make those be written in one operation to minimize + # disruption. + def __init__(self, + shnum, + rref, # a remote reference to a storage server + storage_index, + secrets, # (write_enabler, renew_secret, cancel_secret) + seqnum, # the sequence number of the mutable file + required_shares, + total_shares, + segment_size, + data_length): # the length of the original file + self.shnum = shnum + self._rref = rref + self._storage_index = storage_index + self._seqnum = seqnum + self._required_shares = required_shares + assert self.shnum >= 0 and self.shnum < total_shares + self._total_shares = total_shares + # We build up the offset table as we write things. It is the + # last thing we write to the remote server. + self._offsets = {} + self._testvs = [] + # This is a list of write vectors that will be sent to our + # remote server once we are directed to write things there. + self._writevs = [] + self._secrets = secrets + # The segment size needs to be a multiple of the k parameter -- + # any padding should have been carried out by the publisher + # already. + assert segment_size % required_shares == 0 + self._segment_size = segment_size + self._data_length = data_length + + # These are set later -- we define them here so that we can + # check for their existence easily + + # This is the root of the share hash tree -- the Merkle tree + # over the roots of the block hash trees computed for shares in + # this upload. + self._root_hash = None + + # We haven't yet written anything to the remote bucket. By + # setting this, we tell the _write method as much. The write + # method will then know that it also needs to add a write vector + # for the checkstring (or what we have of it) to the first write + # request. We'll then record that value for future use. If + # we're expecting something to be there already, we need to call + # set_checkstring before we write anything to tell the first + # write about that. + self._written = False + + # When writing data to the storage servers, we get a read vector + # for free. We'll read the checkstring, which will help us + # figure out what's gone wrong if a write fails. + self._readv = [(0, struct.calcsize(MDMFCHECKSTRING))] + + # We calculate the number of segments because it tells us + # where the salt part of the file ends/share segment begins, + # and also because it provides a useful amount of bounds checking. + self._num_segments = mathutil.div_ceil(self._data_length, + self._segment_size) + self._block_size = self._segment_size / self._required_shares + # We also calculate the share size, to help us with block + # constraints later. + tail_size = self._data_length % self._segment_size + if not tail_size: + self._tail_block_size = self._block_size + else: + self._tail_block_size = mathutil.next_multiple(tail_size, + self._required_shares) + self._tail_block_size /= self._required_shares + + # We already know where the sharedata starts; right after the end + # of the header (which is defined as the signable part + the offsets) + # We can also calculate where the encrypted private key begins + # from what we know know. + self._actual_block_size = self._block_size + SALT_SIZE + data_size = self._actual_block_size * (self._num_segments - 1) + data_size += self._tail_block_size + data_size += SALT_SIZE + self._offsets['enc_privkey'] = MDMFHEADERSIZE + + # We don't define offsets for these because we want them to be + # tightly packed -- this allows us to ignore the responsibility + # of padding individual values, and of removing that padding + # later. So nonconstant_start is where we start writing + # nonconstant data. + nonconstant_start = self._offsets['enc_privkey'] + nonconstant_start += PRIVATE_KEY_SIZE + nonconstant_start += SIGNATURE_SIZE + nonconstant_start += VERIFICATION_KEY_SIZE + nonconstant_start += SHARE_HASH_CHAIN_SIZE + + self._offsets['share_data'] = nonconstant_start + + # Finally, we know how big the share data will be, so we can + # figure out where the block hash tree needs to go. + # XXX: But this will go away if Zooko wants to make it so that + # you don't need to know the size of the file before you start + # uploading it. + self._offsets['block_hash_tree'] = self._offsets['share_data'] + \ + data_size + + # Done. We can snow start writing. + + + def set_checkstring(self, + seqnum_or_checkstring, + root_hash=None, + salt=None): + """ + Set checkstring checkstring for the given shnum. + + This can be invoked in one of two ways. + + With one argument, I assume that you are giving me a literal + checkstring -- e.g., the output of get_checkstring. I will then + set that checkstring as it is. This form is used by unit tests. + + With two arguments, I assume that you are giving me a sequence + number and root hash to make a checkstring from. In that case, I + will build a checkstring and set it for you. This form is used + by the publisher. + + By default, I assume that I am writing new shares to the grid. + If you don't explcitly set your own checkstring, I will use + one that requires that the remote share not exist. You will want + to use this method if you are updating a share in-place; + otherwise, writes will fail. + """ + # You're allowed to overwrite checkstrings with this method; + # I assume that users know what they are doing when they call + # it. + if root_hash: + checkstring = struct.pack(MDMFCHECKSTRING, + 1, + seqnum_or_checkstring, + root_hash) + else: + checkstring = seqnum_or_checkstring + + if checkstring == "": + # We special-case this, since len("") = 0, but we need + # length of 1 for the case of an empty share to work on the + # storage server, which is what a checkstring that is the + # empty string means. + self._testvs = [] + else: + self._testvs = [] + self._testvs.append((0, len(checkstring), "eq", checkstring)) + + + def __repr__(self): + return "MDMFSlotWriteProxy for share %d" % self.shnum + + + def get_checkstring(self): + """ + Given a share number, I return a representation of what the + checkstring for that share on the server will look like. + + I am mostly used for tests. + """ + if self._root_hash: + roothash = self._root_hash + else: + roothash = "\x00" * 32 + return struct.pack(MDMFCHECKSTRING, + 1, + self._seqnum, + roothash) + + + def put_block(self, data, segnum, salt): + """ + I queue a write vector for the data, salt, and segment number + provided to me. I return None, as I do not actually cause + anything to be written yet. + """ + if segnum >= self._num_segments: + raise LayoutInvalid("I won't overwrite the block hash tree") + if len(salt) != SALT_SIZE: + raise LayoutInvalid("I was given a salt of size %d, but " + "I wanted a salt of size %d") + if segnum + 1 == self._num_segments: + if len(data) != self._tail_block_size: + raise LayoutInvalid("I was given the wrong size block to write") + elif len(data) != self._block_size: + raise LayoutInvalid("I was given the wrong size block to write") + + # We want to write at len(MDMFHEADER) + segnum * block_size. + offset = self._offsets['share_data'] + \ + (self._actual_block_size * segnum) + data = salt + data + + self._writevs.append(tuple([offset, data])) + + + def put_encprivkey(self, encprivkey): + """ + I queue a write vector for the encrypted private key provided to + me. + """ + assert self._offsets + assert self._offsets['enc_privkey'] + # You shouldn't re-write the encprivkey after the block hash + # tree is written, since that could cause the private key to run + # into the block hash tree. Before it writes the block hash + # tree, the block hash tree writing method writes the offset of + # the share hash chain. So that's a good indicator of whether or + # not the block hash tree has been written. + if "signature" in self._offsets: + raise LayoutInvalid("You can't put the encrypted private key " + "after putting the share hash chain") + + self._offsets['share_hash_chain'] = self._offsets['enc_privkey'] + \ + len(encprivkey) + + self._writevs.append(tuple([self._offsets['enc_privkey'], encprivkey])) + + + def put_blockhashes(self, blockhashes): + """ + I queue a write vector to put the block hash tree in blockhashes + onto the remote server. + + The encrypted private key must be queued before the block hash + tree, since we need to know how large it is to know where the + block hash tree should go. The block hash tree must be put + before the share hash chain, since its size determines the + offset of the share hash chain. + """ + assert self._offsets + assert "block_hash_tree" in self._offsets + + assert isinstance(blockhashes, list) + + blockhashes_s = "".join(blockhashes) + self._offsets['EOF'] = self._offsets['block_hash_tree'] + len(blockhashes_s) + + self._writevs.append(tuple([self._offsets['block_hash_tree'], + blockhashes_s])) + + + def put_sharehashes(self, sharehashes): + """ + I queue a write vector to put the share hash chain in my + argument onto the remote server. + + The block hash tree must be queued before the share hash chain, + since we need to know where the block hash tree ends before we + can know where the share hash chain starts. The share hash chain + must be put before the signature, since the length of the packed + share hash chain determines the offset of the signature. Also, + semantically, you must know what the root of the block hash tree + is before you can generate a valid signature. + """ + assert isinstance(sharehashes, dict) + assert self._offsets + if "share_hash_chain" not in self._offsets: + raise LayoutInvalid("You must put the block hash tree before " + "putting the share hash chain") + + # The signature comes after the share hash chain. If the + # signature has already been written, we must not write another + # share hash chain. The signature writes the verification key + # offset when it gets sent to the remote server, so we look for + # that. + if "verification_key" in self._offsets: + raise LayoutInvalid("You must write the share hash chain " + "before you write the signature") + sharehashes_s = "".join([struct.pack(">H32s", i, sharehashes[i]) + for i in sorted(sharehashes.keys())]) + self._offsets['signature'] = self._offsets['share_hash_chain'] + \ + len(sharehashes_s) + self._writevs.append(tuple([self._offsets['share_hash_chain'], + sharehashes_s])) + + + def put_root_hash(self, roothash): + """ + Put the root hash (the root of the share hash tree) in the + remote slot. + """ + # It does not make sense to be able to put the root + # hash without first putting the share hashes, since you need + # the share hashes to generate the root hash. + # + # Signature is defined by the routine that places the share hash + # chain, so it's a good thing to look for in finding out whether + # or not the share hash chain exists on the remote server. + if len(roothash) != HASH_SIZE: + raise LayoutInvalid("hashes and salts must be exactly %d bytes" + % HASH_SIZE) + self._root_hash = roothash + # To write both of these values, we update the checkstring on + # the remote server, which includes them + checkstring = self.get_checkstring() + self._writevs.append(tuple([0, checkstring])) + # This write, if successful, changes the checkstring, so we need + # to update our internal checkstring to be consistent with the + # one on the server. + + + def get_signable(self): + """ + Get the first seven fields of the mutable file; the parts that + are signed. + """ + if not self._root_hash: + raise LayoutInvalid("You need to set the root hash " + "before getting something to " + "sign") + return struct.pack(MDMFSIGNABLEHEADER, + 1, + self._seqnum, + self._root_hash, + self._required_shares, + self._total_shares, + self._segment_size, + self._data_length) + + + def put_signature(self, signature): + """ + I queue a write vector for the signature of the MDMF share. + + I require that the root hash and share hash chain have been put + to the grid before I will write the signature to the grid. + """ + if "signature" not in self._offsets: + raise LayoutInvalid("You must put the share hash chain " + # It does not make sense to put a signature without first + # putting the root hash and the salt hash (since otherwise + # the signature would be incomplete), so we don't allow that. + "before putting the signature") + if not self._root_hash: + raise LayoutInvalid("You must complete the signed prefix " + "before computing a signature") + # If we put the signature after we put the verification key, we + # could end up running into the verification key, and will + # probably screw up the offsets as well. So we don't allow that. + if "verification_key_end" in self._offsets: + raise LayoutInvalid("You can't put the signature after the " + "verification key") + # The method that writes the verification key defines the EOF + # offset before writing the verification key, so look for that. + self._offsets['verification_key'] = self._offsets['signature'] +\ + len(signature) + self._writevs.append(tuple([self._offsets['signature'], signature])) + + + def put_verification_key(self, verification_key): + """ + I queue a write vector for the verification key. + + I require that the signature have been written to the storage + server before I allow the verification key to be written to the + remote server. + """ + if "verification_key" not in self._offsets: + raise LayoutInvalid("You must put the signature before you " + "can put the verification key") + + self._offsets['verification_key_end'] = \ + self._offsets['verification_key'] + len(verification_key) + assert self._offsets['verification_key_end'] <= self._offsets['share_data'] + self._writevs.append(tuple([self._offsets['verification_key'], + verification_key])) + + + def _get_offsets_tuple(self): + return tuple([(key, value) for key, value in self._offsets.items()]) + + + def get_verinfo(self): + return (self._seqnum, + self._root_hash, + self._required_shares, + self._total_shares, + self._segment_size, + self._data_length, + self.get_signable(), + self._get_offsets_tuple()) + + + def finish_publishing(self): + """ + I add a write vector for the offsets table, and then cause all + of the write vectors that I've dealt with so far to be published + to the remote server, ending the write process. + """ + if "verification_key_end" not in self._offsets: + raise LayoutInvalid("You must put the verification key before " + "you can publish the offsets") + offsets_offset = struct.calcsize(MDMFHEADERWITHOUTOFFSETS) + offsets = struct.pack(MDMFOFFSETS, + self._offsets['enc_privkey'], + self._offsets['share_hash_chain'], + self._offsets['signature'], + self._offsets['verification_key'], + self._offsets['verification_key_end'], + self._offsets['share_data'], + self._offsets['block_hash_tree'], + self._offsets['EOF']) + self._writevs.append(tuple([offsets_offset, offsets])) + encoding_parameters_offset = struct.calcsize(MDMFCHECKSTRING) + params = struct.pack(">BBQQ", + self._required_shares, + self._total_shares, + self._segment_size, + self._data_length) + self._writevs.append(tuple([encoding_parameters_offset, params])) + return self._write(self._writevs) + + + def _write(self, datavs, on_failure=None, on_success=None): + """I write the data vectors in datavs to the remote slot.""" + tw_vectors = {} + if not self._testvs: + self._testvs = [] + self._testvs.append(tuple([0, 1, "eq", ""])) + if not self._written: + # Write a new checkstring to the share when we write it, so + # that we have something to check later. + new_checkstring = self.get_checkstring() + datavs.append((0, new_checkstring)) + def _first_write(): + self._written = True + self._testvs = [(0, len(new_checkstring), "eq", new_checkstring)] + on_success = _first_write + tw_vectors[self.shnum] = (self._testvs, datavs, None) + d = self._rref.callRemote("slot_testv_and_readv_and_writev", + self._storage_index, + self._secrets, + tw_vectors, + self._readv) + def _result(results): + if isinstance(results, failure.Failure) or not results[0]: + # Do nothing; the write was unsuccessful. + if on_failure: on_failure() + else: + if on_success: on_success() + return results + d.addCallback(_result) + return d + + +class MDMFSlotReadProxy: + """ + I read from a mutable slot filled with data written in the MDMF data + format (which is described above). + + I can be initialized with some amount of data, which I will use (if + it is valid) to eliminate some of the need to fetch it from servers. + """ + def __init__(self, + rref, + storage_index, + shnum, + data=""): + # Start the initialization process. + self._rref = rref + self._storage_index = storage_index + self.shnum = shnum + + # Before doing anything, the reader is probably going to want to + # verify that the signature is correct. To do that, they'll need + # the verification key, and the signature. To get those, we'll + # need the offset table. So fetch the offset table on the + # assumption that that will be the first thing that a reader is + # going to do. + + # The fact that these encoding parameters are None tells us + # that we haven't yet fetched them from the remote share, so we + # should. We could just not set them, but the checks will be + # easier to read if we don't have to use hasattr. + self._version_number = None + self._sequence_number = None + self._root_hash = None + # Filled in if we're dealing with an SDMF file. Unused + # otherwise. + self._salt = None + self._required_shares = None + self._total_shares = None + self._segment_size = None + self._data_length = None + self._offsets = None + + # If the user has chosen to initialize us with some data, we'll + # try to satisfy subsequent data requests with that data before + # asking the storage server for it. If + self._data = data + # The way callers interact with cache in the filenode returns + # None if there isn't any cached data, but the way we index the + # cached data requires a string, so convert None to "". + if self._data == None: + self._data = "" + + self._queue_observers = observer.ObserverList() + self._queue_errbacks = observer.ObserverList() + self._readvs = [] + + + def _maybe_fetch_offsets_and_header(self, force_remote=False): + """ + I fetch the offset table and the header from the remote slot if + I don't already have them. If I do have them, I do nothing and + return an empty Deferred. + """ + if self._offsets: + return defer.succeed(None) + # At this point, we may be either SDMF or MDMF. Fetching 107 + # bytes will be enough to get header and offsets for both SDMF and + # MDMF, though we'll be left with 4 more bytes than we + # need if this ends up being MDMF. This is probably less + # expensive than the cost of a second roundtrip. + readvs = [(0, 123)] + d = self._read(readvs, force_remote) + d.addCallback(self._process_encoding_parameters) + d.addCallback(self._process_offsets) + return d + + + def _process_encoding_parameters(self, encoding_parameters): + assert self.shnum in encoding_parameters + encoding_parameters = encoding_parameters[self.shnum][0] + # The first byte is the version number. It will tell us what + # to do next. + (verno,) = struct.unpack(">B", encoding_parameters[:1]) + if verno == MDMF_VERSION: + read_size = MDMFHEADERWITHOUTOFFSETSSIZE + (verno, + seqnum, + root_hash, + k, + n, + segsize, + datalen) = struct.unpack(MDMFHEADERWITHOUTOFFSETS, + encoding_parameters[:read_size]) + if segsize == 0 and datalen == 0: + # Empty file, no segments. + self._num_segments = 0 + else: + self._num_segments = mathutil.div_ceil(datalen, segsize) + + elif verno == SDMF_VERSION: + read_size = SIGNED_PREFIX_LENGTH + (verno, + seqnum, + root_hash, + salt, + k, + n, + segsize, + datalen) = struct.unpack(">BQ32s16s BBQQ", + encoding_parameters[:SIGNED_PREFIX_LENGTH]) + self._salt = salt + if segsize == 0 and datalen == 0: + # empty file + self._num_segments = 0 + else: + # non-empty SDMF files have one segment. + self._num_segments = 1 + else: + raise UnknownVersionError("You asked me to read mutable file " + "version %d, but I only understand " + "%d and %d" % (verno, SDMF_VERSION, + MDMF_VERSION)) + + self._version_number = verno + self._sequence_number = seqnum + self._root_hash = root_hash + self._required_shares = k + self._total_shares = n + self._segment_size = segsize + self._data_length = datalen + + self._block_size = self._segment_size / self._required_shares + # We can upload empty files, and need to account for this fact + # so as to avoid zero-division and zero-modulo errors. + if datalen > 0: + tail_size = self._data_length % self._segment_size + else: + tail_size = 0 + if not tail_size: + self._tail_block_size = self._block_size + else: + self._tail_block_size = mathutil.next_multiple(tail_size, + self._required_shares) + self._tail_block_size /= self._required_shares + + return encoding_parameters + + + def _process_offsets(self, offsets): + if self._version_number == 0: + read_size = OFFSETS_LENGTH + read_offset = SIGNED_PREFIX_LENGTH + end = read_size + read_offset + (signature, + share_hash_chain, + block_hash_tree, + share_data, + enc_privkey, + EOF) = struct.unpack(">LLLLQQ", + offsets[read_offset:end]) + self._offsets = {} + self._offsets['signature'] = signature + self._offsets['share_data'] = share_data + self._offsets['block_hash_tree'] = block_hash_tree + self._offsets['share_hash_chain'] = share_hash_chain + self._offsets['enc_privkey'] = enc_privkey + self._offsets['EOF'] = EOF + + elif self._version_number == 1: + read_offset = MDMFHEADERWITHOUTOFFSETSSIZE + read_length = MDMFOFFSETS_LENGTH + end = read_offset + read_length + (encprivkey, + sharehashes, + signature, + verification_key, + verification_key_end, + sharedata, + blockhashes, + eof) = struct.unpack(MDMFOFFSETS, + offsets[read_offset:end]) + self._offsets = {} + self._offsets['enc_privkey'] = encprivkey + self._offsets['block_hash_tree'] = blockhashes + self._offsets['share_hash_chain'] = sharehashes + self._offsets['signature'] = signature + self._offsets['verification_key'] = verification_key + self._offsets['verification_key_end']= \ + verification_key_end + self._offsets['EOF'] = eof + self._offsets['share_data'] = sharedata + + + def get_block_and_salt(self, segnum, queue=False): + """ + I return (block, salt), where block is the block data and + salt is the salt used to encrypt that segment. + """ + d = self._maybe_fetch_offsets_and_header() + def _then(ignored): + base_share_offset = self._offsets['share_data'] + + if segnum + 1 > self._num_segments: + raise LayoutInvalid("Not a valid segment number") + + if self._version_number == 0: + share_offset = base_share_offset + self._block_size * segnum + else: + share_offset = base_share_offset + (self._block_size + \ + SALT_SIZE) * segnum + if segnum + 1 == self._num_segments: + data = self._tail_block_size + else: + data = self._block_size + + if self._version_number == 1: + data += SALT_SIZE + + readvs = [(share_offset, data)] + return readvs + d.addCallback(_then) + d.addCallback(lambda readvs: + self._read(readvs, queue=queue)) + def _process_results(results): + assert self.shnum in results + if self._version_number == 0: + # We only read the share data, but we know the salt from + # when we fetched the header + data = results[self.shnum] + if not data: + data = "" + else: + assert len(data) == 1 + data = data[0] + salt = self._salt + else: + data = results[self.shnum] + if not data: + salt = data = "" + else: + salt_and_data = results[self.shnum][0] + salt = salt_and_data[:SALT_SIZE] + data = salt_and_data[SALT_SIZE:] + return data, salt + d.addCallback(_process_results) + return d + + + def get_blockhashes(self, needed=None, queue=False, force_remote=False): + """ + I return the block hash tree + + I take an optional argument, needed, which is a set of indices + correspond to hashes that I should fetch. If this argument is + missing, I will fetch the entire block hash tree; otherwise, I + may attempt to fetch fewer hashes, based on what needed says + that I should do. Note that I may fetch as many hashes as I + want, so long as the set of hashes that I do fetch is a superset + of the ones that I am asked for, so callers should be prepared + to tolerate additional hashes. + """ + # TODO: Return only the parts of the block hash tree necessary + # to validate the blocknum provided? + # This is a good idea, but it is hard to implement correctly. It + # is bad to fetch any one block hash more than once, so we + # probably just want to fetch the whole thing at once and then + # serve it. + if needed == set([]): + return defer.succeed([]) + d = self._maybe_fetch_offsets_and_header() + def _then(ignored): + blockhashes_offset = self._offsets['block_hash_tree'] + if self._version_number == 1: + blockhashes_length = self._offsets['EOF'] - blockhashes_offset + else: + blockhashes_length = self._offsets['share_data'] - blockhashes_offset + readvs = [(blockhashes_offset, blockhashes_length)] + return readvs + d.addCallback(_then) + d.addCallback(lambda readvs: + self._read(readvs, queue=queue, force_remote=force_remote)) + def _build_block_hash_tree(results): + assert self.shnum in results + + rawhashes = results[self.shnum][0] + results = [rawhashes[i:i+HASH_SIZE] + for i in range(0, len(rawhashes), HASH_SIZE)] + return results + d.addCallback(_build_block_hash_tree) + return d + + + def get_sharehashes(self, needed=None, queue=False, force_remote=False): + """ + I return the part of the share hash chain placed to validate + this share. + + I take an optional argument, needed. Needed is a set of indices + that correspond to the hashes that I should fetch. If needed is + not present, I will fetch and return the entire share hash + chain. Otherwise, I may fetch and return any part of the share + hash chain that is a superset of the part that I am asked to + fetch. Callers should be prepared to deal with more hashes than + they've asked for. + """ + if needed == set([]): + return defer.succeed([]) + d = self._maybe_fetch_offsets_and_header() + + def _make_readvs(ignored): + sharehashes_offset = self._offsets['share_hash_chain'] + if self._version_number == 0: + sharehashes_length = self._offsets['block_hash_tree'] - sharehashes_offset + else: + sharehashes_length = self._offsets['signature'] - sharehashes_offset + readvs = [(sharehashes_offset, sharehashes_length)] + return readvs + d.addCallback(_make_readvs) + d.addCallback(lambda readvs: + self._read(readvs, queue=queue, force_remote=force_remote)) + def _build_share_hash_chain(results): + assert self.shnum in results + + sharehashes = results[self.shnum][0] + results = [sharehashes[i:i+(HASH_SIZE + 2)] + for i in range(0, len(sharehashes), HASH_SIZE + 2)] + results = dict([struct.unpack(">H32s", data) + for data in results]) + return results + d.addCallback(_build_share_hash_chain) + return d + + + def get_encprivkey(self, queue=False): + """ + I return the encrypted private key. + """ + d = self._maybe_fetch_offsets_and_header() + + def _make_readvs(ignored): + privkey_offset = self._offsets['enc_privkey'] + if self._version_number == 0: + privkey_length = self._offsets['EOF'] - privkey_offset + else: + privkey_length = self._offsets['share_hash_chain'] - privkey_offset + readvs = [(privkey_offset, privkey_length)] + return readvs + d.addCallback(_make_readvs) + d.addCallback(lambda readvs: + self._read(readvs, queue=queue)) + def _process_results(results): + assert self.shnum in results + privkey = results[self.shnum][0] + return privkey + d.addCallback(_process_results) + return d + + + def get_signature(self, queue=False): + """ + I return the signature of my share. + """ + d = self._maybe_fetch_offsets_and_header() + + def _make_readvs(ignored): + signature_offset = self._offsets['signature'] + if self._version_number == 1: + signature_length = self._offsets['verification_key'] - signature_offset + else: + signature_length = self._offsets['share_hash_chain'] - signature_offset + readvs = [(signature_offset, signature_length)] + return readvs + d.addCallback(_make_readvs) + d.addCallback(lambda readvs: + self._read(readvs, queue=queue)) + def _process_results(results): + assert self.shnum in results + signature = results[self.shnum][0] + return signature + d.addCallback(_process_results) + return d + + + def get_verification_key(self, queue=False): + """ + I return the verification key. + """ + d = self._maybe_fetch_offsets_and_header() + + def _make_readvs(ignored): + if self._version_number == 1: + vk_offset = self._offsets['verification_key'] + vk_length = self._offsets['verification_key_end'] - vk_offset + else: + vk_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ") + vk_length = self._offsets['signature'] - vk_offset + readvs = [(vk_offset, vk_length)] + return readvs + d.addCallback(_make_readvs) + d.addCallback(lambda readvs: + self._read(readvs, queue=queue)) + def _process_results(results): + assert self.shnum in results + verification_key = results[self.shnum][0] + return verification_key + d.addCallback(_process_results) + return d + + + def get_encoding_parameters(self): + """ + I return (k, n, segsize, datalen) + """ + d = self._maybe_fetch_offsets_and_header() + d.addCallback(lambda ignored: + (self._required_shares, + self._total_shares, + self._segment_size, + self._data_length)) + return d + + + def get_seqnum(self): + """ + I return the sequence number for this share. + """ + d = self._maybe_fetch_offsets_and_header() + d.addCallback(lambda ignored: + self._sequence_number) + return d + + + def get_root_hash(self): + """ + I return the root of the block hash tree + """ + d = self._maybe_fetch_offsets_and_header() + d.addCallback(lambda ignored: self._root_hash) + return d + + + def get_checkstring(self): + """ + I return the packed representation of the following: + + - version number + - sequence number + - root hash + - salt hash + + which my users use as a checkstring to detect other writers. + """ + d = self._maybe_fetch_offsets_and_header() + def _build_checkstring(ignored): + if self._salt: + checkstring = struct.pack(PREFIX, + self._version_number, + self._sequence_number, + self._root_hash, + self._salt) + else: + checkstring = struct.pack(MDMFCHECKSTRING, + self._version_number, + self._sequence_number, + self._root_hash) + + return checkstring + d.addCallback(_build_checkstring) + return d + + + def get_prefix(self, force_remote): + d = self._maybe_fetch_offsets_and_header(force_remote) + d.addCallback(lambda ignored: + self._build_prefix()) + return d + + + def _build_prefix(self): + # The prefix is another name for the part of the remote share + # that gets signed. It consists of everything up to and + # including the datalength, packed by struct. + if self._version_number == SDMF_VERSION: + return struct.pack(SIGNED_PREFIX, + self._version_number, + self._sequence_number, + self._root_hash, + self._salt, + self._required_shares, + self._total_shares, + self._segment_size, + self._data_length) + + else: + return struct.pack(MDMFSIGNABLEHEADER, + self._version_number, + self._sequence_number, + self._root_hash, + self._required_shares, + self._total_shares, + self._segment_size, + self._data_length) + + + def _get_offsets_tuple(self): + # The offsets tuple is another component of the version + # information tuple. It is basically our offsets dictionary, + # itemized and in a tuple. + return self._offsets.copy() + + + def get_verinfo(self): + """ + I return my verinfo tuple. This is used by the ServermapUpdater + to keep track of versions of mutable files. + + The verinfo tuple for MDMF files contains: + - seqnum + - root hash + - a blank (nothing) + - segsize + - datalen + - k + - n + - prefix (the thing that you sign) + - a tuple of offsets + + We include the nonce in MDMF to simplify processing of version + information tuples. + + The verinfo tuple for SDMF files is the same, but contains a + 16-byte IV instead of a hash of salts. + """ + d = self._maybe_fetch_offsets_and_header() + def _build_verinfo(ignored): + if self._version_number == SDMF_VERSION: + salt_to_use = self._salt + else: + salt_to_use = None + return (self._sequence_number, + self._root_hash, + salt_to_use, + self._segment_size, + self._data_length, + self._required_shares, + self._total_shares, + self._build_prefix(), + self._get_offsets_tuple()) + d.addCallback(_build_verinfo) + return d + + + def flush(self): + """ + I flush my queue of read vectors. + """ + d = self._read(self._readvs) + def _then(results): + self._readvs = [] + if isinstance(results, failure.Failure): + self._queue_errbacks.notify(results) + else: + self._queue_observers.notify(results) + self._queue_observers = observer.ObserverList() + self._queue_errbacks = observer.ObserverList() + d.addBoth(_then) + + + def _read(self, readvs, force_remote=False, queue=False): + unsatisfiable = filter(lambda x: x[0] + x[1] > len(self._data), readvs) + # TODO: It's entirely possible to tweak this so that it just + # fulfills the requests that it can, and not demand that all + # requests are satisfiable before running it. + if not unsatisfiable and not force_remote: + results = [self._data[offset:offset+length] + for (offset, length) in readvs] + results = {self.shnum: results} + return defer.succeed(results) + else: + if queue: + start = len(self._readvs) + self._readvs += readvs + end = len(self._readvs) + def _get_results(results, start, end): + if not self.shnum in results: + return {self._shnum: [""]} + return {self.shnum: results[self.shnum][start:end]} + d = defer.Deferred() + d.addCallback(_get_results, start, end) + self._queue_observers.subscribe(d.callback) + self._queue_errbacks.subscribe(d.errback) + return d + return self._rref.callRemote("slot_readv", + self._storage_index, + [self.shnum], + readvs) + + + def is_sdmf(self): + """I tell my caller whether or not my remote file is SDMF or MDMF + """ + d = self._maybe_fetch_offsets_and_header() + d.addCallback(lambda ignored: + self._version_number == 0) + return d + + +class LayoutInvalid(Exception): + """ + This isn't a valid MDMF mutable file + """ diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 81a889ed..549b839f 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -1,4 +1,4 @@ -import time, os.path, platform, stat, re, simplejson, struct +import time, os.path, platform, stat, re, simplejson, struct, shutil import mock @@ -20,8 +20,16 @@ from allmydata.storage.crawler import BucketCountingCrawler from allmydata.storage.expirer import LeaseCheckingCrawler from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \ ReadBucketProxy +from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \ + LayoutInvalid, MDMFSIGNABLEHEADER, \ + SIGNED_PREFIX, MDMFHEADER, \ + MDMFOFFSETS, SDMFSlotWriteProxy, \ + PRIVATE_KEY_SIZE, \ + SIGNATURE_SIZE, \ + VERIFICATION_KEY_SIZE, \ + SHARE_HASH_CHAIN_SIZE from allmydata.interfaces import BadWriteEnablerError -from allmydata.test.common import LoggingServiceParent +from allmydata.test.common import LoggingServiceParent, ShouldFailMixin from allmydata.test.common_web import WebRenderingMixin from allmydata.test.no_network import NoNetworkServer from allmydata.web.storage import StorageStatus, remove_prefix @@ -100,12 +108,23 @@ class Bucket(unittest.TestCase): class RemoteBucket: + def __init__(self): + self.read_count = 0 + self.write_count = 0 + def callRemote(self, methname, *args, **kwargs): def _call(): meth = getattr(self.target, "remote_" + methname) return meth(*args, **kwargs) + + if methname == "slot_readv": + self.read_count += 1 + if "writev" in methname: + self.write_count += 1 + return defer.maybeDeferred(_call) + class BucketProxy(unittest.TestCase): def make_bucket(self, name, size): basedir = os.path.join("storage", "BucketProxy", name) @@ -1288,6 +1307,1462 @@ class MutableServer(unittest.TestCase): self.failUnless(os.path.exists(prefixdir), prefixdir) self.failIf(os.path.exists(bucketdir), bucketdir) + +class MDMFProxies(unittest.TestCase, ShouldFailMixin): + def setUp(self): + self.sparent = LoggingServiceParent() + self._lease_secret = itertools.count() + self.ss = self.create("MDMFProxies storage test server") + self.rref = RemoteBucket() + self.rref.target = self.ss + self.secrets = (self.write_enabler("we_secret"), + self.renew_secret("renew_secret"), + self.cancel_secret("cancel_secret")) + self.segment = "aaaaaa" + self.block = "aa" + self.salt = "a" * 16 + self.block_hash = "a" * 32 + self.block_hash_tree = [self.block_hash for i in xrange(6)] + self.share_hash = self.block_hash + self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)]) + self.signature = "foobarbaz" + self.verification_key = "vvvvvv" + self.encprivkey = "private" + self.root_hash = self.block_hash + self.salt_hash = self.root_hash + self.salt_hash_tree = [self.salt_hash for i in xrange(6)] + self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree) + self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain) + # blockhashes and salt hashes are serialized in the same way, + # only we lop off the first element and store that in the + # header. + self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:]) + + + def tearDown(self): + self.sparent.stopService() + shutil.rmtree(self.workdir("MDMFProxies storage test server")) + + + def write_enabler(self, we_tag): + return hashutil.tagged_hash("we_blah", we_tag) + + + def renew_secret(self, tag): + return hashutil.tagged_hash("renew_blah", str(tag)) + + + def cancel_secret(self, tag): + return hashutil.tagged_hash("cancel_blah", str(tag)) + + + def workdir(self, name): + basedir = os.path.join("storage", "MutableServer", name) + return basedir + + + def create(self, name): + workdir = self.workdir(name) + ss = StorageServer(workdir, "\x00" * 20) + ss.setServiceParent(self.sparent) + return ss + + + def build_test_mdmf_share(self, tail_segment=False, empty=False): + # Start with the checkstring + data = struct.pack(">BQ32s", + 1, + 0, + self.root_hash) + self.checkstring = data + # Next, the encoding parameters + if tail_segment: + data += struct.pack(">BBQQ", + 3, + 10, + 6, + 33) + elif empty: + data += struct.pack(">BBQQ", + 3, + 10, + 0, + 0) + else: + data += struct.pack(">BBQQ", + 3, + 10, + 6, + 36) + # Now we'll build the offsets. + sharedata = "" + if not tail_segment and not empty: + for i in xrange(6): + sharedata += self.salt + self.block + elif tail_segment: + for i in xrange(5): + sharedata += self.salt + self.block + sharedata += self.salt + "a" + + # The encrypted private key comes after the shares + salts + offset_size = struct.calcsize(MDMFOFFSETS) + encrypted_private_key_offset = len(data) + offset_size + # The share has chain comes after the private key + sharehashes_offset = encrypted_private_key_offset + \ + len(self.encprivkey) + + # The signature comes after the share hash chain. + signature_offset = sharehashes_offset + len(self.share_hash_chain_s) + + verification_key_offset = signature_offset + len(self.signature) + verification_key_end = verification_key_offset + \ + len(self.verification_key) + + share_data_offset = offset_size + share_data_offset += PRIVATE_KEY_SIZE + share_data_offset += SIGNATURE_SIZE + share_data_offset += VERIFICATION_KEY_SIZE + share_data_offset += SHARE_HASH_CHAIN_SIZE + + blockhashes_offset = share_data_offset + len(sharedata) + eof_offset = blockhashes_offset + len(self.block_hash_tree_s) + + data += struct.pack(MDMFOFFSETS, + encrypted_private_key_offset, + sharehashes_offset, + signature_offset, + verification_key_offset, + verification_key_end, + share_data_offset, + blockhashes_offset, + eof_offset) + + self.offsets = {} + self.offsets['enc_privkey'] = encrypted_private_key_offset + self.offsets['block_hash_tree'] = blockhashes_offset + self.offsets['share_hash_chain'] = sharehashes_offset + self.offsets['signature'] = signature_offset + self.offsets['verification_key'] = verification_key_offset + self.offsets['share_data'] = share_data_offset + self.offsets['verification_key_end'] = verification_key_end + self.offsets['EOF'] = eof_offset + + # the private key, + data += self.encprivkey + # the sharehashes + data += self.share_hash_chain_s + # the signature, + data += self.signature + # and the verification key + data += self.verification_key + # Then we'll add in gibberish until we get to the right point. + nulls = "".join([" " for i in xrange(len(data), share_data_offset)]) + data += nulls + + # Then the share data + data += sharedata + # the blockhashes + data += self.block_hash_tree_s + return data + + + def write_test_share_to_server(self, + storage_index, + tail_segment=False, + empty=False): + """ + I write some data for the read tests to read to self.ss + + If tail_segment=True, then I will write a share that has a + smaller tail segment than other segments. + """ + write = self.ss.remote_slot_testv_and_readv_and_writev + data = self.build_test_mdmf_share(tail_segment, empty) + # Finally, we write the whole thing to the storage server in one + # pass. + testvs = [(0, 1, "eq", "")] + tws = {} + tws[0] = (testvs, [(0, data)], None) + readv = [(0, 1)] + results = write(storage_index, self.secrets, tws, readv) + self.failUnless(results[0]) + + + def build_test_sdmf_share(self, empty=False): + if empty: + sharedata = "" + else: + sharedata = self.segment * 6 + self.sharedata = sharedata + blocksize = len(sharedata) / 3 + block = sharedata[:blocksize] + self.blockdata = block + prefix = struct.pack(">BQ32s16s BBQQ", + 0, # version, + 0, + self.root_hash, + self.salt, + 3, + 10, + len(sharedata), + len(sharedata), + ) + post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ") + signature_offset = post_offset + len(self.verification_key) + sharehashes_offset = signature_offset + len(self.signature) + blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s) + sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s) + encprivkey_offset = sharedata_offset + len(block) + eof_offset = encprivkey_offset + len(self.encprivkey) + offsets = struct.pack(">LLLLQQ", + signature_offset, + sharehashes_offset, + blockhashes_offset, + sharedata_offset, + encprivkey_offset, + eof_offset) + final_share = "".join([prefix, + offsets, + self.verification_key, + self.signature, + self.share_hash_chain_s, + self.block_hash_tree_s, + block, + self.encprivkey]) + self.offsets = {} + self.offsets['signature'] = signature_offset + self.offsets['share_hash_chain'] = sharehashes_offset + self.offsets['block_hash_tree'] = blockhashes_offset + self.offsets['share_data'] = sharedata_offset + self.offsets['enc_privkey'] = encprivkey_offset + self.offsets['EOF'] = eof_offset + return final_share + + + def write_sdmf_share_to_server(self, + storage_index, + empty=False): + # Some tests need SDMF shares to verify that we can still + # read them. This method writes one, which resembles but is not + assert self.rref + write = self.ss.remote_slot_testv_and_readv_and_writev + share = self.build_test_sdmf_share(empty) + testvs = [(0, 1, "eq", "")] + tws = {} + tws[0] = (testvs, [(0, share)], None) + readv = [] + results = write(storage_index, self.secrets, tws, readv) + self.failUnless(results[0]) + + + def test_read(self): + self.write_test_share_to_server("si1") + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + # Check that every method equals what we expect it to. + d = defer.succeed(None) + def _check_block_and_salt((block, salt)): + self.failUnlessEqual(block, self.block) + self.failUnlessEqual(salt, self.salt) + + for i in xrange(6): + d.addCallback(lambda ignored, i=i: + mr.get_block_and_salt(i)) + d.addCallback(_check_block_and_salt) + + d.addCallback(lambda ignored: + mr.get_encprivkey()) + d.addCallback(lambda encprivkey: + self.failUnlessEqual(self.encprivkey, encprivkey)) + + d.addCallback(lambda ignored: + mr.get_blockhashes()) + d.addCallback(lambda blockhashes: + self.failUnlessEqual(self.block_hash_tree, blockhashes)) + + d.addCallback(lambda ignored: + mr.get_sharehashes()) + d.addCallback(lambda sharehashes: + self.failUnlessEqual(self.share_hash_chain, sharehashes)) + + d.addCallback(lambda ignored: + mr.get_signature()) + d.addCallback(lambda signature: + self.failUnlessEqual(signature, self.signature)) + + d.addCallback(lambda ignored: + mr.get_verification_key()) + d.addCallback(lambda verification_key: + self.failUnlessEqual(verification_key, self.verification_key)) + + d.addCallback(lambda ignored: + mr.get_seqnum()) + d.addCallback(lambda seqnum: + self.failUnlessEqual(seqnum, 0)) + + d.addCallback(lambda ignored: + mr.get_root_hash()) + d.addCallback(lambda root_hash: + self.failUnlessEqual(self.root_hash, root_hash)) + + d.addCallback(lambda ignored: + mr.get_seqnum()) + d.addCallback(lambda seqnum: + self.failUnlessEqual(0, seqnum)) + + d.addCallback(lambda ignored: + mr.get_encoding_parameters()) + def _check_encoding_parameters((k, n, segsize, datalen)): + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + self.failUnlessEqual(segsize, 6) + self.failUnlessEqual(datalen, 36) + d.addCallback(_check_encoding_parameters) + + d.addCallback(lambda ignored: + mr.get_checkstring()) + d.addCallback(lambda checkstring: + self.failUnlessEqual(checkstring, checkstring)) + return d + + + def test_read_with_different_tail_segment_size(self): + self.write_test_share_to_server("si1", tail_segment=True) + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + d = mr.get_block_and_salt(5) + def _check_tail_segment(results): + block, salt = results + self.failUnlessEqual(len(block), 1) + self.failUnlessEqual(block, "a") + d.addCallback(_check_tail_segment) + return d + + + def test_get_block_with_invalid_segnum(self): + self.write_test_share_to_server("si1") + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + d = defer.succeed(None) + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "test invalid segnum", + None, + mr.get_block_and_salt, 7)) + return d + + + def test_get_encoding_parameters_first(self): + self.write_test_share_to_server("si1") + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + d = mr.get_encoding_parameters() + def _check_encoding_parameters((k, n, segment_size, datalen)): + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + self.failUnlessEqual(segment_size, 6) + self.failUnlessEqual(datalen, 36) + d.addCallback(_check_encoding_parameters) + return d + + + def test_get_seqnum_first(self): + self.write_test_share_to_server("si1") + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + d = mr.get_seqnum() + d.addCallback(lambda seqnum: + self.failUnlessEqual(seqnum, 0)) + return d + + + def test_get_root_hash_first(self): + self.write_test_share_to_server("si1") + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + d = mr.get_root_hash() + d.addCallback(lambda root_hash: + self.failUnlessEqual(root_hash, self.root_hash)) + return d + + + def test_get_checkstring_first(self): + self.write_test_share_to_server("si1") + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + d = mr.get_checkstring() + d.addCallback(lambda checkstring: + self.failUnlessEqual(checkstring, self.checkstring)) + return d + + + def test_write_read_vectors(self): + # When writing for us, the storage server will return to us a + # read vector, along with its result. If a write fails because + # the test vectors failed, this read vector can help us to + # diagnose the problem. This test ensures that the read vector + # is working appropriately. + mw = self._make_new_mw("si1", 0) + + for i in xrange(6): + mw.put_block(self.block, i, self.salt) + mw.put_encprivkey(self.encprivkey) + mw.put_blockhashes(self.block_hash_tree) + mw.put_sharehashes(self.share_hash_chain) + mw.put_root_hash(self.root_hash) + mw.put_signature(self.signature) + mw.put_verification_key(self.verification_key) + d = mw.finish_publishing() + def _then(results): + self.failUnless(len(results), 2) + result, readv = results + self.failUnless(result) + self.failIf(readv) + self.old_checkstring = mw.get_checkstring() + mw.set_checkstring("") + d.addCallback(_then) + d.addCallback(lambda ignored: + mw.finish_publishing()) + def _then_again(results): + self.failUnlessEqual(len(results), 2) + result, readvs = results + self.failIf(result) + self.failUnlessIn(0, readvs) + readv = readvs[0][0] + self.failUnlessEqual(readv, self.old_checkstring) + d.addCallback(_then_again) + # The checkstring remains the same for the rest of the process. + return d + + + def test_private_key_after_share_hash_chain(self): + mw = self._make_new_mw("si1", 0) + d = defer.succeed(None) + for i in xrange(6): + d.addCallback(lambda ignored, i=i: + mw.put_block(self.block, i, self.salt)) + d.addCallback(lambda ignored: + mw.put_encprivkey(self.encprivkey)) + d.addCallback(lambda ignored: + mw.put_sharehashes(self.share_hash_chain)) + + # Now try to put the private key again. + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "test repeat private key", + None, + mw.put_encprivkey, self.encprivkey)) + return d + + + def test_signature_after_verification_key(self): + mw = self._make_new_mw("si1", 0) + d = defer.succeed(None) + # Put everything up to and including the verification key. + for i in xrange(6): + d.addCallback(lambda ignored, i=i: + mw.put_block(self.block, i, self.salt)) + d.addCallback(lambda ignored: + mw.put_encprivkey(self.encprivkey)) + d.addCallback(lambda ignored: + mw.put_blockhashes(self.block_hash_tree)) + d.addCallback(lambda ignored: + mw.put_sharehashes(self.share_hash_chain)) + d.addCallback(lambda ignored: + mw.put_root_hash(self.root_hash)) + d.addCallback(lambda ignored: + mw.put_signature(self.signature)) + d.addCallback(lambda ignored: + mw.put_verification_key(self.verification_key)) + # Now try to put the signature again. This should fail + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "signature after verification", + None, + mw.put_signature, self.signature)) + return d + + + def test_uncoordinated_write(self): + # Make two mutable writers, both pointing to the same storage + # server, both at the same storage index, and try writing to the + # same share. + mw1 = self._make_new_mw("si1", 0) + mw2 = self._make_new_mw("si1", 0) + + def _check_success(results): + result, readvs = results + self.failUnless(result) + + def _check_failure(results): + result, readvs = results + self.failIf(result) + + def _write_share(mw): + for i in xrange(6): + mw.put_block(self.block, i, self.salt) + mw.put_encprivkey(self.encprivkey) + mw.put_blockhashes(self.block_hash_tree) + mw.put_sharehashes(self.share_hash_chain) + mw.put_root_hash(self.root_hash) + mw.put_signature(self.signature) + mw.put_verification_key(self.verification_key) + return mw.finish_publishing() + d = _write_share(mw1) + d.addCallback(_check_success) + d.addCallback(lambda ignored: + _write_share(mw2)) + d.addCallback(_check_failure) + return d + + + def test_invalid_salt_size(self): + # Salts need to be 16 bytes in size. Writes that attempt to + # write more or less than this should be rejected. + mw = self._make_new_mw("si1", 0) + invalid_salt = "a" * 17 # 17 bytes + another_invalid_salt = "b" * 15 # 15 bytes + d = defer.succeed(None) + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "salt too big", + None, + mw.put_block, self.block, 0, invalid_salt)) + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "salt too small", + None, + mw.put_block, self.block, 0, + another_invalid_salt)) + return d + + + def test_write_test_vectors(self): + # If we give the write proxy a bogus test vector at + # any point during the process, it should fail to write when we + # tell it to write. + def _check_failure(results): + self.failUnlessEqual(len(results), 2) + res, d = results + self.failIf(res) + + def _check_success(results): + self.failUnlessEqual(len(results), 2) + res, d = results + self.failUnless(results) + + mw = self._make_new_mw("si1", 0) + mw.set_checkstring("this is a lie") + for i in xrange(6): + mw.put_block(self.block, i, self.salt) + mw.put_encprivkey(self.encprivkey) + mw.put_blockhashes(self.block_hash_tree) + mw.put_sharehashes(self.share_hash_chain) + mw.put_root_hash(self.root_hash) + mw.put_signature(self.signature) + mw.put_verification_key(self.verification_key) + d = mw.finish_publishing() + d.addCallback(_check_failure) + d.addCallback(lambda ignored: + mw.set_checkstring("")) + d.addCallback(lambda ignored: + mw.finish_publishing()) + d.addCallback(_check_success) + return d + + + def serialize_blockhashes(self, blockhashes): + return "".join(blockhashes) + + + def serialize_sharehashes(self, sharehashes): + ret = "".join([struct.pack(">H32s", i, sharehashes[i]) + for i in sorted(sharehashes.keys())]) + return ret + + + def test_write(self): + # This translates to a file with 6 6-byte segments, and with 2-byte + # blocks. + mw = self._make_new_mw("si1", 0) + # Test writing some blocks. + read = self.ss.remote_slot_readv + expected_private_key_offset = struct.calcsize(MDMFHEADER) + expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \ + PRIVATE_KEY_SIZE + \ + SIGNATURE_SIZE + \ + VERIFICATION_KEY_SIZE + \ + SHARE_HASH_CHAIN_SIZE + written_block_size = 2 + len(self.salt) + written_block = self.block + self.salt + for i in xrange(6): + mw.put_block(self.block, i, self.salt) + + mw.put_encprivkey(self.encprivkey) + mw.put_blockhashes(self.block_hash_tree) + mw.put_sharehashes(self.share_hash_chain) + mw.put_root_hash(self.root_hash) + mw.put_signature(self.signature) + mw.put_verification_key(self.verification_key) + d = mw.finish_publishing() + def _check_publish(results): + self.failUnlessEqual(len(results), 2) + result, ign = results + self.failUnless(result, "publish failed") + for i in xrange(6): + self.failUnlessEqual(read("si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]), + {0: [written_block]}) + + self.failUnlessEqual(len(self.encprivkey), 7) + self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]), + {0: [self.encprivkey]}) + + expected_block_hash_offset = expected_sharedata_offset + \ + (6 * written_block_size) + self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6) + self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]), + {0: [self.block_hash_tree_s]}) + + expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey) + self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]), + {0: [self.share_hash_chain_s]}) + + self.failUnlessEqual(read("si1", [0], [(9, 32)]), + {0: [self.root_hash]}) + expected_signature_offset = expected_share_hash_offset + \ + len(self.share_hash_chain_s) + self.failUnlessEqual(len(self.signature), 9) + self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]), + {0: [self.signature]}) + + expected_verification_key_offset = expected_signature_offset + len(self.signature) + self.failUnlessEqual(len(self.verification_key), 6) + self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]), + {0: [self.verification_key]}) + + signable = mw.get_signable() + verno, seq, roothash, k, n, segsize, datalen = \ + struct.unpack(">BQ32sBBQQ", + signable) + self.failUnlessEqual(verno, 1) + self.failUnlessEqual(seq, 0) + self.failUnlessEqual(roothash, self.root_hash) + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + self.failUnlessEqual(segsize, 6) + self.failUnlessEqual(datalen, 36) + expected_eof_offset = expected_block_hash_offset + \ + len(self.block_hash_tree_s) + + # Check the version number to make sure that it is correct. + expected_version_number = struct.pack(">B", 1) + self.failUnlessEqual(read("si1", [0], [(0, 1)]), + {0: [expected_version_number]}) + # Check the sequence number to make sure that it is correct + expected_sequence_number = struct.pack(">Q", 0) + self.failUnlessEqual(read("si1", [0], [(1, 8)]), + {0: [expected_sequence_number]}) + # Check that the encoding parameters (k, N, segement size, data + # length) are what they should be. These are 3, 10, 6, 36 + expected_k = struct.pack(">B", 3) + self.failUnlessEqual(read("si1", [0], [(41, 1)]), + {0: [expected_k]}) + expected_n = struct.pack(">B", 10) + self.failUnlessEqual(read("si1", [0], [(42, 1)]), + {0: [expected_n]}) + expected_segment_size = struct.pack(">Q", 6) + self.failUnlessEqual(read("si1", [0], [(43, 8)]), + {0: [expected_segment_size]}) + expected_data_length = struct.pack(">Q", 36) + self.failUnlessEqual(read("si1", [0], [(51, 8)]), + {0: [expected_data_length]}) + expected_offset = struct.pack(">Q", expected_private_key_offset) + self.failUnlessEqual(read("si1", [0], [(59, 8)]), + {0: [expected_offset]}) + expected_offset = struct.pack(">Q", expected_share_hash_offset) + self.failUnlessEqual(read("si1", [0], [(67, 8)]), + {0: [expected_offset]}) + expected_offset = struct.pack(">Q", expected_signature_offset) + self.failUnlessEqual(read("si1", [0], [(75, 8)]), + {0: [expected_offset]}) + expected_offset = struct.pack(">Q", expected_verification_key_offset) + self.failUnlessEqual(read("si1", [0], [(83, 8)]), + {0: [expected_offset]}) + expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key)) + self.failUnlessEqual(read("si1", [0], [(91, 8)]), + {0: [expected_offset]}) + expected_offset = struct.pack(">Q", expected_sharedata_offset) + self.failUnlessEqual(read("si1", [0], [(99, 8)]), + {0: [expected_offset]}) + expected_offset = struct.pack(">Q", expected_block_hash_offset) + self.failUnlessEqual(read("si1", [0], [(107, 8)]), + {0: [expected_offset]}) + expected_offset = struct.pack(">Q", expected_eof_offset) + self.failUnlessEqual(read("si1", [0], [(115, 8)]), + {0: [expected_offset]}) + d.addCallback(_check_publish) + return d + + def _make_new_mw(self, si, share, datalength=36): + # This is a file of size 36 bytes. Since it has a segment + # size of 6, we know that it has 6 byte segments, which will + # be split into blocks of 2 bytes because our FEC k + # parameter is 3. + mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10, + 6, datalength) + return mw + + + def test_write_rejected_with_too_many_blocks(self): + mw = self._make_new_mw("si0", 0) + + # Try writing too many blocks. We should not be able to write + # more than 6 + # blocks into each share. + d = defer.succeed(None) + for i in xrange(6): + d.addCallback(lambda ignored, i=i: + mw.put_block(self.block, i, self.salt)) + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "too many blocks", + None, + mw.put_block, self.block, 7, self.salt)) + return d + + + def test_write_rejected_with_invalid_salt(self): + # Try writing an invalid salt. Salts are 16 bytes -- any more or + # less should cause an error. + mw = self._make_new_mw("si1", 0) + bad_salt = "a" * 17 # 17 bytes + d = defer.succeed(None) + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "test_invalid_salt", + None, mw.put_block, self.block, 7, bad_salt)) + return d + + + def test_write_rejected_with_invalid_root_hash(self): + # Try writing an invalid root hash. This should be SHA256d, and + # 32 bytes long as a result. + mw = self._make_new_mw("si2", 0) + # 17 bytes != 32 bytes + invalid_root_hash = "a" * 17 + d = defer.succeed(None) + # Before this test can work, we need to put some blocks + salts, + # a block hash tree, and a share hash tree. Otherwise, we'll see + # failures that match what we are looking for, but are caused by + # the constraints imposed on operation ordering. + for i in xrange(6): + d.addCallback(lambda ignored, i=i: + mw.put_block(self.block, i, self.salt)) + d.addCallback(lambda ignored: + mw.put_encprivkey(self.encprivkey)) + d.addCallback(lambda ignored: + mw.put_blockhashes(self.block_hash_tree)) + d.addCallback(lambda ignored: + mw.put_sharehashes(self.share_hash_chain)) + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "invalid root hash", + None, mw.put_root_hash, invalid_root_hash)) + return d + + + def test_write_rejected_with_invalid_blocksize(self): + # The blocksize implied by the writer that we get from + # _make_new_mw is 2bytes -- any more or any less than this + # should be cause for failure, unless it is the tail segment, in + # which case it may not be failure. + invalid_block = "a" + mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with + # one byte blocks + # 1 bytes != 2 bytes + d = defer.succeed(None) + d.addCallback(lambda ignored, invalid_block=invalid_block: + self.shouldFail(LayoutInvalid, "test blocksize too small", + None, mw.put_block, invalid_block, 0, + self.salt)) + invalid_block = invalid_block * 3 + # 3 bytes != 2 bytes + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "test blocksize too large", + None, + mw.put_block, invalid_block, 0, self.salt)) + for i in xrange(5): + d.addCallback(lambda ignored, i=i: + mw.put_block(self.block, i, self.salt)) + # Try to put an invalid tail segment + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "test invalid tail segment", + None, + mw.put_block, self.block, 5, self.salt)) + valid_block = "a" + d.addCallback(lambda ignored: + mw.put_block(valid_block, 5, self.salt)) + return d + + + def test_write_enforces_order_constraints(self): + # We require that the MDMFSlotWriteProxy be interacted with in a + # specific way. + # That way is: + # 0: __init__ + # 1: write blocks and salts + # 2: Write the encrypted private key + # 3: Write the block hashes + # 4: Write the share hashes + # 5: Write the root hash and salt hash + # 6: Write the signature and verification key + # 7: Write the file. + # + # Some of these can be performed out-of-order, and some can't. + # The dependencies that I want to test here are: + # - Private key before block hashes + # - share hashes and block hashes before root hash + # - root hash before signature + # - signature before verification key + mw0 = self._make_new_mw("si0", 0) + # Write some shares + d = defer.succeed(None) + for i in xrange(6): + d.addCallback(lambda ignored, i=i: + mw0.put_block(self.block, i, self.salt)) + + # Try to write the share hash chain without writing the + # encrypted private key + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "share hash chain before " + "private key", + None, + mw0.put_sharehashes, self.share_hash_chain)) + # Write the private key. + d.addCallback(lambda ignored: + mw0.put_encprivkey(self.encprivkey)) + + # Now write the block hashes and try again + d.addCallback(lambda ignored: + mw0.put_blockhashes(self.block_hash_tree)) + + # We haven't yet put the root hash on the share, so we shouldn't + # be able to sign it. + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "signature before root hash", + None, mw0.put_signature, self.signature)) + + d.addCallback(lambda ignored: + self.failUnlessRaises(LayoutInvalid, mw0.get_signable)) + + # ..and, since that fails, we also shouldn't be able to put the + # verification key. + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "key before signature", + None, mw0.put_verification_key, + self.verification_key)) + + # Now write the share hashes. + d.addCallback(lambda ignored: + mw0.put_sharehashes(self.share_hash_chain)) + # We should be able to write the root hash now too + d.addCallback(lambda ignored: + mw0.put_root_hash(self.root_hash)) + + # We should still be unable to put the verification key + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "key before signature", + None, mw0.put_verification_key, + self.verification_key)) + + d.addCallback(lambda ignored: + mw0.put_signature(self.signature)) + + # We shouldn't be able to write the offsets to the remote server + # until the offset table is finished; IOW, until we have written + # the verification key. + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "offsets before verification key", + None, + mw0.finish_publishing)) + + d.addCallback(lambda ignored: + mw0.put_verification_key(self.verification_key)) + return d + + + def test_end_to_end(self): + mw = self._make_new_mw("si1", 0) + # Write a share using the mutable writer, and make sure that the + # reader knows how to read everything back to us. + d = defer.succeed(None) + for i in xrange(6): + d.addCallback(lambda ignored, i=i: + mw.put_block(self.block, i, self.salt)) + d.addCallback(lambda ignored: + mw.put_encprivkey(self.encprivkey)) + d.addCallback(lambda ignored: + mw.put_blockhashes(self.block_hash_tree)) + d.addCallback(lambda ignored: + mw.put_sharehashes(self.share_hash_chain)) + d.addCallback(lambda ignored: + mw.put_root_hash(self.root_hash)) + d.addCallback(lambda ignored: + mw.put_signature(self.signature)) + d.addCallback(lambda ignored: + mw.put_verification_key(self.verification_key)) + d.addCallback(lambda ignored: + mw.finish_publishing()) + + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + def _check_block_and_salt((block, salt)): + self.failUnlessEqual(block, self.block) + self.failUnlessEqual(salt, self.salt) + + for i in xrange(6): + d.addCallback(lambda ignored, i=i: + mr.get_block_and_salt(i)) + d.addCallback(_check_block_and_salt) + + d.addCallback(lambda ignored: + mr.get_encprivkey()) + d.addCallback(lambda encprivkey: + self.failUnlessEqual(self.encprivkey, encprivkey)) + + d.addCallback(lambda ignored: + mr.get_blockhashes()) + d.addCallback(lambda blockhashes: + self.failUnlessEqual(self.block_hash_tree, blockhashes)) + + d.addCallback(lambda ignored: + mr.get_sharehashes()) + d.addCallback(lambda sharehashes: + self.failUnlessEqual(self.share_hash_chain, sharehashes)) + + d.addCallback(lambda ignored: + mr.get_signature()) + d.addCallback(lambda signature: + self.failUnlessEqual(signature, self.signature)) + + d.addCallback(lambda ignored: + mr.get_verification_key()) + d.addCallback(lambda verification_key: + self.failUnlessEqual(verification_key, self.verification_key)) + + d.addCallback(lambda ignored: + mr.get_seqnum()) + d.addCallback(lambda seqnum: + self.failUnlessEqual(seqnum, 0)) + + d.addCallback(lambda ignored: + mr.get_root_hash()) + d.addCallback(lambda root_hash: + self.failUnlessEqual(self.root_hash, root_hash)) + + d.addCallback(lambda ignored: + mr.get_encoding_parameters()) + def _check_encoding_parameters((k, n, segsize, datalen)): + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + self.failUnlessEqual(segsize, 6) + self.failUnlessEqual(datalen, 36) + d.addCallback(_check_encoding_parameters) + + d.addCallback(lambda ignored: + mr.get_checkstring()) + d.addCallback(lambda checkstring: + self.failUnlessEqual(checkstring, mw.get_checkstring())) + return d + + + def test_is_sdmf(self): + # The MDMFSlotReadProxy should also know how to read SDMF files, + # since it will encounter them on the grid. Callers use the + # is_sdmf method to test this. + self.write_sdmf_share_to_server("si1") + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + d = mr.is_sdmf() + d.addCallback(lambda issdmf: + self.failUnless(issdmf)) + return d + + + def test_reads_sdmf(self): + # The slot read proxy should, naturally, know how to tell us + # about data in the SDMF format + self.write_sdmf_share_to_server("si1") + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + d = defer.succeed(None) + d.addCallback(lambda ignored: + mr.is_sdmf()) + d.addCallback(lambda issdmf: + self.failUnless(issdmf)) + + # What do we need to read? + # - The sharedata + # - The salt + d.addCallback(lambda ignored: + mr.get_block_and_salt(0)) + def _check_block_and_salt(results): + block, salt = results + # Our original file is 36 bytes long. Then each share is 12 + # bytes in size. The share is composed entirely of the + # letter a. self.block contains 2 as, so 6 * self.block is + # what we are looking for. + self.failUnlessEqual(block, self.block * 6) + self.failUnlessEqual(salt, self.salt) + d.addCallback(_check_block_and_salt) + + # - The blockhashes + d.addCallback(lambda ignored: + mr.get_blockhashes()) + d.addCallback(lambda blockhashes: + self.failUnlessEqual(self.block_hash_tree, + blockhashes, + blockhashes)) + # - The sharehashes + d.addCallback(lambda ignored: + mr.get_sharehashes()) + d.addCallback(lambda sharehashes: + self.failUnlessEqual(self.share_hash_chain, + sharehashes)) + # - The keys + d.addCallback(lambda ignored: + mr.get_encprivkey()) + d.addCallback(lambda encprivkey: + self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey)) + d.addCallback(lambda ignored: + mr.get_verification_key()) + d.addCallback(lambda verification_key: + self.failUnlessEqual(verification_key, + self.verification_key, + verification_key)) + # - The signature + d.addCallback(lambda ignored: + mr.get_signature()) + d.addCallback(lambda signature: + self.failUnlessEqual(signature, self.signature, signature)) + + # - The sequence number + d.addCallback(lambda ignored: + mr.get_seqnum()) + d.addCallback(lambda seqnum: + self.failUnlessEqual(seqnum, 0, seqnum)) + + # - The root hash + d.addCallback(lambda ignored: + mr.get_root_hash()) + d.addCallback(lambda root_hash: + self.failUnlessEqual(root_hash, self.root_hash, root_hash)) + return d + + + def test_only_reads_one_segment_sdmf(self): + # SDMF shares have only one segment, so it doesn't make sense to + # read more segments than that. The reader should know this and + # complain if we try to do that. + self.write_sdmf_share_to_server("si1") + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + d = defer.succeed(None) + d.addCallback(lambda ignored: + mr.is_sdmf()) + d.addCallback(lambda issdmf: + self.failUnless(issdmf)) + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "test bad segment", + None, + mr.get_block_and_salt, 1)) + return d + + + def test_read_with_prefetched_mdmf_data(self): + # The MDMFSlotReadProxy will prefill certain fields if you pass + # it data that you have already fetched. This is useful for + # cases like the Servermap, which prefetches ~2kb of data while + # finding out which shares are on the remote peer so that it + # doesn't waste round trips. + mdmf_data = self.build_test_mdmf_share() + self.write_test_share_to_server("si1") + def _make_mr(ignored, length): + mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length]) + return mr + + d = defer.succeed(None) + # This should be enough to fill in both the encoding parameters + # and the table of offsets, which will complete the version + # information tuple. + d.addCallback(_make_mr, 123) + d.addCallback(lambda mr: + mr.get_verinfo()) + def _check_verinfo(verinfo): + self.failUnless(verinfo) + self.failUnlessEqual(len(verinfo), 9) + (seqnum, + root_hash, + salt_hash, + segsize, + datalen, + k, + n, + prefix, + offsets) = verinfo + self.failUnlessEqual(seqnum, 0) + self.failUnlessEqual(root_hash, self.root_hash) + self.failUnlessEqual(segsize, 6) + self.failUnlessEqual(datalen, 36) + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + expected_prefix = struct.pack(MDMFSIGNABLEHEADER, + 1, + seqnum, + root_hash, + k, + n, + segsize, + datalen) + self.failUnlessEqual(expected_prefix, prefix) + self.failUnlessEqual(self.rref.read_count, 0) + d.addCallback(_check_verinfo) + # This is not enough data to read a block and a share, so the + # wrapper should attempt to read this from the remote server. + d.addCallback(_make_mr, 123) + d.addCallback(lambda mr: + mr.get_block_and_salt(0)) + def _check_block_and_salt((block, salt)): + self.failUnlessEqual(block, self.block) + self.failUnlessEqual(salt, self.salt) + self.failUnlessEqual(self.rref.read_count, 1) + # This should be enough data to read one block. + d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140) + d.addCallback(lambda mr: + mr.get_block_and_salt(0)) + d.addCallback(_check_block_and_salt) + return d + + + def test_read_with_prefetched_sdmf_data(self): + sdmf_data = self.build_test_sdmf_share() + self.write_sdmf_share_to_server("si1") + def _make_mr(ignored, length): + mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length]) + return mr + + d = defer.succeed(None) + # This should be enough to get us the encoding parameters, + # offset table, and everything else we need to build a verinfo + # string. + d.addCallback(_make_mr, 123) + d.addCallback(lambda mr: + mr.get_verinfo()) + def _check_verinfo(verinfo): + self.failUnless(verinfo) + self.failUnlessEqual(len(verinfo), 9) + (seqnum, + root_hash, + salt, + segsize, + datalen, + k, + n, + prefix, + offsets) = verinfo + self.failUnlessEqual(seqnum, 0) + self.failUnlessEqual(root_hash, self.root_hash) + self.failUnlessEqual(salt, self.salt) + self.failUnlessEqual(segsize, 36) + self.failUnlessEqual(datalen, 36) + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + expected_prefix = struct.pack(SIGNED_PREFIX, + 0, + seqnum, + root_hash, + salt, + k, + n, + segsize, + datalen) + self.failUnlessEqual(expected_prefix, prefix) + self.failUnlessEqual(self.rref.read_count, 0) + d.addCallback(_check_verinfo) + # This shouldn't be enough to read any share data. + d.addCallback(_make_mr, 123) + d.addCallback(lambda mr: + mr.get_block_and_salt(0)) + def _check_block_and_salt((block, salt)): + self.failUnlessEqual(block, self.block * 6) + self.failUnlessEqual(salt, self.salt) + # TODO: Fix the read routine so that it reads only the data + # that it has cached if it can't read all of it. + self.failUnlessEqual(self.rref.read_count, 2) + + # This should be enough to read share data. + d.addCallback(_make_mr, self.offsets['share_data']) + d.addCallback(lambda mr: + mr.get_block_and_salt(0)) + d.addCallback(_check_block_and_salt) + return d + + + def test_read_with_empty_mdmf_file(self): + # Some tests upload a file with no contents to test things + # unrelated to the actual handling of the content of the file. + # The reader should behave intelligently in these cases. + self.write_test_share_to_server("si1", empty=True) + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + # We should be able to get the encoding parameters, and they + # should be correct. + d = defer.succeed(None) + d.addCallback(lambda ignored: + mr.get_encoding_parameters()) + def _check_encoding_parameters(params): + self.failUnlessEqual(len(params), 4) + k, n, segsize, datalen = params + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + self.failUnlessEqual(segsize, 0) + self.failUnlessEqual(datalen, 0) + d.addCallback(_check_encoding_parameters) + + # We should not be able to fetch a block, since there are no + # blocks to fetch + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "get block on empty file", + None, + mr.get_block_and_salt, 0)) + return d + + + def test_read_with_empty_sdmf_file(self): + self.write_sdmf_share_to_server("si1", empty=True) + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + # We should be able to get the encoding parameters, and they + # should be correct + d = defer.succeed(None) + d.addCallback(lambda ignored: + mr.get_encoding_parameters()) + def _check_encoding_parameters(params): + self.failUnlessEqual(len(params), 4) + k, n, segsize, datalen = params + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + self.failUnlessEqual(segsize, 0) + self.failUnlessEqual(datalen, 0) + d.addCallback(_check_encoding_parameters) + + # It does not make sense to get a block in this format, so we + # should not be able to. + d.addCallback(lambda ignored: + self.shouldFail(LayoutInvalid, "get block on an empty file", + None, + mr.get_block_and_salt, 0)) + return d + + + def test_verinfo_with_sdmf_file(self): + self.write_sdmf_share_to_server("si1") + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + # We should be able to get the version information. + d = defer.succeed(None) + d.addCallback(lambda ignored: + mr.get_verinfo()) + def _check_verinfo(verinfo): + self.failUnless(verinfo) + self.failUnlessEqual(len(verinfo), 9) + (seqnum, + root_hash, + salt, + segsize, + datalen, + k, + n, + prefix, + offsets) = verinfo + self.failUnlessEqual(seqnum, 0) + self.failUnlessEqual(root_hash, self.root_hash) + self.failUnlessEqual(salt, self.salt) + self.failUnlessEqual(segsize, 36) + self.failUnlessEqual(datalen, 36) + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + expected_prefix = struct.pack(">BQ32s16s BBQQ", + 0, + seqnum, + root_hash, + salt, + k, + n, + segsize, + datalen) + self.failUnlessEqual(prefix, expected_prefix) + self.failUnlessEqual(offsets, self.offsets) + d.addCallback(_check_verinfo) + return d + + + def test_verinfo_with_mdmf_file(self): + self.write_test_share_to_server("si1") + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + d = defer.succeed(None) + d.addCallback(lambda ignored: + mr.get_verinfo()) + def _check_verinfo(verinfo): + self.failUnless(verinfo) + self.failUnlessEqual(len(verinfo), 9) + (seqnum, + root_hash, + IV, + segsize, + datalen, + k, + n, + prefix, + offsets) = verinfo + self.failUnlessEqual(seqnum, 0) + self.failUnlessEqual(root_hash, self.root_hash) + self.failIf(IV) + self.failUnlessEqual(segsize, 6) + self.failUnlessEqual(datalen, 36) + self.failUnlessEqual(k, 3) + self.failUnlessEqual(n, 10) + expected_prefix = struct.pack(">BQ32s BBQQ", + 1, + seqnum, + root_hash, + k, + n, + segsize, + datalen) + self.failUnlessEqual(prefix, expected_prefix) + self.failUnlessEqual(offsets, self.offsets) + d.addCallback(_check_verinfo) + return d + + + def test_reader_queue(self): + self.write_test_share_to_server('si1') + mr = MDMFSlotReadProxy(self.rref, "si1", 0) + d1 = mr.get_block_and_salt(0, queue=True) + d2 = mr.get_blockhashes(queue=True) + d3 = mr.get_sharehashes(queue=True) + d4 = mr.get_signature(queue=True) + d5 = mr.get_verification_key(queue=True) + dl = defer.DeferredList([d1, d2, d3, d4, d5]) + mr.flush() + def _print(results): + self.failUnlessEqual(len(results), 5) + # We have one read for version information and offsets, and + # one for everything else. + self.failUnlessEqual(self.rref.read_count, 2) + block, salt = results[0][1] # results[0] is a boolean that says + # whether or not the operation + # worked. + self.failUnlessEqual(self.block, block) + self.failUnlessEqual(self.salt, salt) + + blockhashes = results[1][1] + self.failUnlessEqual(self.block_hash_tree, blockhashes) + + sharehashes = results[2][1] + self.failUnlessEqual(self.share_hash_chain, sharehashes) + + signature = results[3][1] + self.failUnlessEqual(self.signature, signature) + + verification_key = results[4][1] + self.failUnlessEqual(self.verification_key, verification_key) + dl.addCallback(_print) + return dl + + + def test_sdmf_writer(self): + # Go through the motions of writing an SDMF share to the storage + # server. Then read the storage server to see that the share got + # written in the way that we think it should have. + + # We do this first so that the necessary instance variables get + # set the way we want them for the tests below. + data = self.build_test_sdmf_share() + sdmfr = SDMFSlotWriteProxy(0, + self.rref, + "si1", + self.secrets, + 0, 3, 10, 36, 36) + # Put the block and salt. + sdmfr.put_block(self.blockdata, 0, self.salt) + + # Put the encprivkey + sdmfr.put_encprivkey(self.encprivkey) + + # Put the block and share hash chains + sdmfr.put_blockhashes(self.block_hash_tree) + sdmfr.put_sharehashes(self.share_hash_chain) + sdmfr.put_root_hash(self.root_hash) + + # Put the signature + sdmfr.put_signature(self.signature) + + # Put the verification key + sdmfr.put_verification_key(self.verification_key) + + # Now check to make sure that nothing has been written yet. + self.failUnlessEqual(self.rref.write_count, 0) + + # Now finish publishing + d = sdmfr.finish_publishing() + def _then(ignored): + self.failUnlessEqual(self.rref.write_count, 1) + read = self.ss.remote_slot_readv + self.failUnlessEqual(read("si1", [0], [(0, len(data))]), + {0: [data]}) + d.addCallback(_then) + return d + + + def test_sdmf_writer_preexisting_share(self): + data = self.build_test_sdmf_share() + self.write_sdmf_share_to_server("si1") + + # Now there is a share on the storage server. To successfully + # write, we need to set the checkstring correctly. When we + # don't, no write should occur. + sdmfw = SDMFSlotWriteProxy(0, + self.rref, + "si1", + self.secrets, + 1, 3, 10, 36, 36) + sdmfw.put_block(self.blockdata, 0, self.salt) + + # Put the encprivkey + sdmfw.put_encprivkey(self.encprivkey) + + # Put the block and share hash chains + sdmfw.put_blockhashes(self.block_hash_tree) + sdmfw.put_sharehashes(self.share_hash_chain) + + # Put the root hash + sdmfw.put_root_hash(self.root_hash) + + # Put the signature + sdmfw.put_signature(self.signature) + + # Put the verification key + sdmfw.put_verification_key(self.verification_key) + + # We shouldn't have a checkstring yet + self.failUnlessEqual(sdmfw.get_checkstring(), "") + + d = sdmfw.finish_publishing() + def _then(results): + self.failIf(results[0]) + # this is the correct checkstring + self._expected_checkstring = results[1][0][0] + return self._expected_checkstring + + d.addCallback(_then) + d.addCallback(sdmfw.set_checkstring) + d.addCallback(lambda ignored: + sdmfw.get_checkstring()) + d.addCallback(lambda checkstring: + self.failUnlessEqual(checkstring, self._expected_checkstring)) + d.addCallback(lambda ignored: + sdmfw.finish_publishing()) + def _then_again(results): + self.failUnless(results[0]) + read = self.ss.remote_slot_readv + self.failUnlessEqual(read("si1", [0], [(1, 8)]), + {0: [struct.pack(">Q", 1)]}) + self.failUnlessEqual(read("si1", [0], [(9, len(data) - 9)]), + {0: [data[9:]]}) + d.addCallback(_then_again) + return d + + class Stats(unittest.TestCase): def setUp(self):