base62 encoding fits more information into alphanumeric chars while avoiding the troublesome non-alphanumeric chars of base64 encoding. In particular, this allows us to work around the ext3 "32,000 entries in a directory" limit while retaining the convenient property that the intermediate directory names are leading prefixes of the storage index file names.
from twisted.application import service
from twisted.python import log
from allmydata.interfaces import IVerifierURI
-from allmydata import uri, download
-from allmydata.util import hashutil, idlib
+from allmydata import uri, download, storage
+from allmydata.util import hashutil
class SimpleCHKFileChecker:
"""Return a list of (needed, total, found, sharemap), where sharemap maps
}
def init_logging(self):
- self._log_prefix = prefix = idlib.b2a(self._storage_index)[:6]
+ self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
num = self._client.log("SimpleCHKFileVerifier(%s): starting" % prefix)
self._log_number = num
def start(self):
- log.msg("starting download [%s]" % idlib.b2a(self._storage_index)[:6])
+ log.msg("starting download [%s]" % storage.si_b2a(self._storage_index)[:5])
# first step: who should we download from?
d = defer.maybeDeferred(self._get_all_shareholders)
}
def init_logging(self):
- self._log_prefix = prefix = idlib.b2a(self._storage_index)[:6]
+ self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
num = self._client.log(format="FileDownloader(%(si)s): starting",
- si=idlib.b2a(self._storage_index))
+ si=storage.si_b2a(self._storage_index))
self._log_number = num
def log(self, *args, **kwargs):
from zope.interface import implements
from twisted.internet import defer
from foolscap import eventual
-from allmydata import uri
+from allmydata import storage, uri
from allmydata.hashtree import HashTree
from allmydata.util import mathutil, hashutil, idlib, log
from allmydata.util.assertutil import _assert, precondition
def __repr__(self):
if hasattr(self, "_storage_index"):
- return "<Encoder for %s>" % idlib.b2a(self._storage_index)[:6]
+ return "<Encoder for %s>" % storage.si_b2a(self._storage_index)[:5]
return "<Encoder for unknown storage index>"
def log(self, *args, **kwargs):
from allmydata.interfaces import IMutableFileNode, IMutableFileURI
from allmydata.util import hashutil, mathutil, idlib, log
from allmydata.uri import WriteableSSKFileURI
-from allmydata import hashtree, codec
+from allmydata import hashtree, codec, storage
from allmydata.encode import NotEnoughPeersError
from pycryptopp.publickey import rsa
from pycryptopp.cipher.aes import AES
self._readkey = filenode.get_readkey()
self._last_failure = None
self._log_number = None
- self._log_prefix = prefix = idlib.b2a(self._storage_index)[:6]
+ self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
num = self._node._client.log("Retrieve(%s): starting" % prefix)
self._log_number = num
def __init__(self, filenode):
self._node = filenode
self._storage_index = self._node.get_storage_index()
- self._log_prefix = prefix = idlib.b2a(self._storage_index)[:6]
+ self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
num = self._node._client.log("Publish(%s): starting" % prefix)
self._log_number = num
# one-per-peer in the normal permuted order.
while shares_needing_homes:
if not reachable_peers:
- prefix = idlib.b2a(self._node.get_storage_index())[:6]
+ prefix = storage.si_b2a(self._node.get_storage_index())[:5]
raise NotEnoughPeersError("ran out of peers during upload of (%s); shares_needing_homes: %s, reachable_peers: %s" % (prefix, shares_needing_homes, reachable_peers,))
shnum = shares_needing_homes.pop(0)
possible_homes = reachable_peers.keys()
self._helper = helper
self._incoming_file = incoming_file
self._encoding_file = encoding_file
- upload_id = idlib.b2a(storage_index)[:6]
+ upload_id = storage.si_b2a(storage_index)[:5]
self._log_number = log_number
self._results = results
self._upload_status = upload.UploadStatus()
def remote_upload_chk(self, storage_index):
r = upload.UploadResults()
started = time.time()
- si_s = idlib.b2a(storage_index)
+ si_s = storage.si_b2a(storage_index)
lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
incoming_file = os.path.join(self._chk_incoming, si_s)
encoding_file = os.path.join(self._chk_encoding, si_s)
print >>out, " lease cancel secret:", b2a(cancel)
def dump_uri_instance(u, nodeid, secret, out, err, show_header=True):
- from allmydata import uri
+ from allmydata import storage, uri
from allmydata.util.idlib import b2a
from allmydata.util import hashutil
print >>out, " UEB hash:", b2a(u.uri_extension_hash)
print >>out, " size:", u.size
print >>out, " k/N: %d/%d" % (u.needed_shares, u.total_shares)
- print >>out, " storage index:", b2a(u.storage_index)
+ print >>out, " storage index:", storage.si_b2a(u.storage_index)
_dump_secrets(u.storage_index, secret, nodeid, out)
elif isinstance(u, uri.CHKFileVerifierURI):
if show_header:
print >>out, " UEB hash:", b2a(u.uri_extension_hash)
print >>out, " size:", u.size
print >>out, " k/N: %d/%d" % (u.needed_shares, u.total_shares)
- print >>out, " storage index:", b2a(u.storage_index)
+ print >>out, " storage index:", storage.si_b2a(u.storage_index)
elif isinstance(u, uri.LiteralFileURI):
if show_header:
print >>out, "SSK Writeable URI:"
print >>out, " writekey:", b2a(u.writekey)
print >>out, " readkey:", b2a(u.readkey)
- print >>out, " storage index:", b2a(u.storage_index)
+ print >>out, " storage index:", storage.si_b2a(u.storage_index)
print >>out, " fingerprint:", b2a(u.fingerprint)
print >>out
if nodeid:
if show_header:
print >>out, "SSK Read-only URI:"
print >>out, " readkey:", b2a(u.readkey)
- print >>out, " storage index:", b2a(u.storage_index)
+ print >>out, " storage index:", storage.si_b2a(u.storage_index)
print >>out, " fingerprint:", b2a(u.fingerprint)
elif isinstance(u, uri.SSKVerifierURI):
if show_header:
print >>out, "SSK Verifier URI:"
- print >>out, " storage index:", b2a(u.storage_index)
+ print >>out, " storage index:", storage.si_b2a(u.storage_index)
print >>out, " fingerprint:", b2a(u.fingerprint)
elif isinstance(u, uri.NewDirectoryURI):
from allmydata import storage
from allmydata.util import idlib
- sharedir = storage.storage_index_to_dir(idlib.a2b(config.si_s))
+ sharedir = storage.storage_index_to_dir(storage.si_a2b(config.si_s))
for d in config.nodedirs:
d = os.path.join(os.path.expanduser(d), "storage/shares", sharedir)
if os.path.exists(d):
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \
BadWriteEnablerError, IStatsProducer
-from allmydata.util import fileutil, idlib, mathutil, log
+from allmydata.util import base62, fileutil, idlib, mathutil, log
from allmydata.util.assertutil import precondition, _assert
import allmydata # for __version__
# B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
# B+0x48: next lease, or end of record
+def si_b2a(storageindex):
+ return base62.b2a(storageindex)
+
+def si_a2b(ascii_storageindex):
+ return base62.a2b(ascii_storageindex)
+
def storage_index_to_dir(storageindex):
- return os.path.join(idlib.b2a_l(storageindex[:2], 14), idlib.b2a(storageindex))
+ sia = si_b2a(storageindex)
+ return os.path.join(sia[:2], sia)
class ShareFile:
LEASE_SIZE = struct.calcsize(">L32s32sL")
secrets,
test_and_write_vectors,
read_vector):
- si_s = idlib.b2a(storage_index)
+ si_s = si_b2a(storage_index)
si_dir = storage_index_to_dir(storage_index)
(write_enabler, renew_secret, cancel_secret) = secrets
# shares exist if there is a file for them
return share
def remote_slot_readv(self, storage_index, shares, readv):
- si_s = idlib.b2a(storage_index)
+ si_s = si_b2a(storage_index)
lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
facility="tahoe.storage", level=log.OPERATIONAL)
si_dir = storage_index_to_dir(storage_index)
--- /dev/null
+#!/usr/bin/env python
+#
+# Copyright (c) 2002-2008 Bryce "Zooko" Wilcox-O'Hearn
+# mailto:zooko@zooko.com
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this work to deal in this work without restriction (including the rights
+# to use, modify, distribute, sublicense, and/or sell copies).
+
+import random, unittest
+
+from allmydata.util import base62, mathutil
+
+def insecurerandstr(n):
+ return ''.join(map(chr, map(random.randrange, [0]*n, [256]*n)))
+
+class T(unittest.TestCase):
+ def _test_num_octets_that_encode_to_this_many_chars(self, chars, octets):
+ assert base62.num_octets_that_encode_to_this_many_chars(chars) == octets, "%s != %s <- %s" % (octets, base62.num_octets_that_encode_to_this_many_chars(chars), chars)
+
+ def _test_ende(self, bs):
+ as=base62.b2a(bs)
+ bs2=base62.a2b(as)
+ assert bs2 == bs, "bs2: %s:%s, bs: %s:%s, as: %s:%s" % (len(bs2), `bs2`, len(bs), `bs`, len(as), `as`)
+
+ def test_num_octets_that_encode_to_this_many_chars(self):
+ return self._test_num_octets_that_encode_to_this_many_chars(2, 1)
+ return self._test_num_octets_that_encode_to_this_many_chars(3, 2)
+ return self._test_num_octets_that_encode_to_this_many_chars(5, 3)
+ return self._test_num_octets_that_encode_to_this_many_chars(6, 4)
+
+ def test_ende_0x00(self):
+ return self._test_ende('\x00')
+
+ def test_ende_0x01(self):
+ return self._test_ende('\x01')
+
+ def test_ende_0x0100(self):
+ return self._test_ende('\x01\x00')
+
+ def test_ende_0x010000(self):
+ return self._test_ende('\x01\x00\x00')
+
+ def test_ende_0x000000(self):
+ return self._test_ende('\x00\x00\x00')
+
+ def test_ende_0x010000(self):
+ return self._test_ende('\x01\x00\x00')
+
+ def test_ende_randstr(self):
+ return self._test_ende(insecurerandstr(2**4))
+
+ def test_ende_longrandstr(self):
+ return self._test_ende(insecurerandstr(random.randrange(0, 2**10)))
+
+ def test_odd_sizes(self):
+ for j in range(2**6):
+ lib = random.randrange(1, 2**8)
+ numos = mathutil.div_ceil(lib, 8)
+ bs = insecurerandstr(numos)
+ # zero-out unused least-sig bits
+ if lib%8:
+ b=ord(bs[-1])
+ b = b >> (8 - (lib%8))
+ b = b << (8 - (lib%8))
+ bs = bs[:-1] + chr(b)
+ asl = base62.b2a_l(bs, lib)
+ assert len(asl) == base62.num_chars_that_this_many_octets_encode_to(numos) # the size of the base-62 encoding must be just right
+ bs2l = base62.a2b_l(asl, lib)
+ assert len(bs2l) == numos # the size of the result must be just right
+ assert bs == bs2l
+
+def suite():
+ suite = unittest.makeSuite(base62TestCase, 'test')
+ return suite
+
+if __name__ == "__main__":
+ unittest.main()
self.failUnless("UEB hash: hd7rwri6djiapo6itg5hcxa7ze5im7z9qwcdu8oka6qinahsbiuo" in output)
self.failUnless("size: 1234" in output)
self.failUnless("k/N: 25/100" in output)
- self.failUnless("storage index: kckbjgwsckzrj5srjdniw9h3ke" in output)
+ self.failUnless("storage index: 2WlXTYP4ahK2VBkx1pckfC" in output, output)
output = self._dump_cap("--client-secret", "p3w849k9whqhw6b9fkf4xjs5xc",
u.to_string())
self.failUnless("UEB hash: hd7rwri6djiapo6itg5hcxa7ze5im7z9qwcdu8oka6qinahsbiuo" in output)
self.failUnless("size: 1234" in output)
self.failUnless("k/N: 25/100" in output)
- self.failUnless("storage index: kckbjgwsckzrj5srjdniw9h3ke" in output)
+ self.failUnless("storage index: 2WlXTYP4ahK2VBkx1pckfC" in output, output)
prefixed_u = "http://127.0.0.1/uri/%s" % urllib.quote(u.to_string())
output = self._dump_cap(prefixed_u)
self.failUnless("UEB hash: hd7rwri6djiapo6itg5hcxa7ze5im7z9qwcdu8oka6qinahsbiuo" in output)
self.failUnless("size: 1234" in output)
self.failUnless("k/N: 25/100" in output)
- self.failUnless("storage index: kckbjgwsckzrj5srjdniw9h3ke" in output)
+ self.failUnless("storage index: 2WlXTYP4ahK2VBkx1pckfC" in output, output)
def test_dump_cap_lit(self):
u = uri.LiteralFileURI("this is some data")
self.failUnless("SSK Writeable URI:" in output)
self.failUnless("writekey: yryonyebyryonyebyryonyebyr" in output)
self.failUnless("readkey: zhgqsyrkuywo3rha41b1d7xrar" in output)
- self.failUnless("storage index: toz9zpxrzjzwoxtuq6xr3ygdze" in output)
+ self.failUnless("storage index: 4GWqxTUinIqKqWj770lRIA" in output, output)
self.failUnless("fingerprint: 959x79z6959x79z6959x79z6959x79z6959x79z6959x79z6959y" in output)
output = self._dump_cap("--client-secret", "p3w849k9whqhw6b9fkf4xjs5xc",
output = self._dump_cap(u.to_string())
self.failUnless("SSK Read-only URI:" in output)
self.failUnless("readkey: zhgqsyrkuywo3rha41b1d7xrar" in output)
- self.failUnless("storage index: toz9zpxrzjzwoxtuq6xr3ygdze" in output)
+ self.failUnless("storage index: 4GWqxTUinIqKqWj770lRIA" in output, output)
self.failUnless("fingerprint: 959x79z6959x79z6959x79z6959x79z6959x79z6959x79z6959y" in output)
u = u.get_verifier()
output = self._dump_cap(u.to_string())
self.failUnless("SSK Verifier URI:" in output)
- self.failUnless("storage index: toz9zpxrzjzwoxtuq6xr3ygdze" in output)
+ self.failUnless("storage index: 4GWqxTUinIqKqWj770lRIA" in output, output)
self.failUnless("fingerprint: 959x79z6959x79z6959x79z6959x79z6959x79z6959x79z6959y" in output)
def test_dump_cap_directory(self):
self.failUnless("Directory Writeable URI:" in output)
self.failUnless("writekey: yryonyebyryonyebyryonyebyr" in output)
self.failUnless("readkey: zhgqsyrkuywo3rha41b1d7xrar" in output)
- self.failUnless("storage index: toz9zpxrzjzwoxtuq6xr3ygdze" in output)
+ self.failUnless("storage index: 4GWqxTUinIqKqWj770lRIA" in output, output)
self.failUnless("fingerprint: 959x79z6959x79z6959x79z6959x79z6959x79z6959x79z6959y" in output)
output = self._dump_cap("--client-secret", "p3w849k9whqhw6b9fkf4xjs5xc",
output = self._dump_cap(u.to_string())
self.failUnless("Directory Read-only URI:" in output)
self.failUnless("readkey: zhgqsyrkuywo3rha41b1d7xrar" in output)
- self.failUnless("storage index: toz9zpxrzjzwoxtuq6xr3ygdze" in output)
+ self.failUnless("storage index: 4GWqxTUinIqKqWj770lRIA" in output, output)
self.failUnless("fingerprint: 959x79z6959x79z6959x79z6959x79z6959x79z6959x79z6959y" in output)
u = u.get_verifier()
output = self._dump_cap(u.to_string())
self.failUnless("Directory Verifier URI:" in output)
- self.failUnless("storage index: toz9zpxrzjzwoxtuq6xr3ygdze" in output)
+ self.failUnless("storage index: 4GWqxTUinIqKqWj770lRIA" in output, output)
self.failUnless("fingerprint: 959x79z6959x79z6959x79z6959x79z6959x79z6959x79z6959y" in output)
from foolscap import Tub, eventual
from foolscap.logging import log
-from allmydata import upload, offloaded
+from allmydata import offloaded, storage, upload
from allmydata.util import hashutil, fileutil, idlib, mathutil
from pycryptopp.cipher.aes import AES
assert len(key) == 16
encryptor = AES(key)
SI = hashutil.storage_index_hash(key)
- SI_s = idlib.b2a(SI)
+ SI_s = storage.si_b2a(SI)
encfile = os.path.join(self.basedir, "CHK_encoding", SI_s)
f = open(encfile, "wb")
f.write(encryptor.process(DATA))
assert pieces[-5].startswith("client")
client_num = int(pieces[-5][-1])
storage_index_s = pieces[-1]
- storage_index = idlib.a2b(storage_index_s)
+ storage_index = storage.si_a2b(storage_index_s)
for sharename in filenames:
shnum = int(sharename)
filename = os.path.join(dirpath, sharename)
def __repr__(self):
return ("<PeerTracker for peer %s and SI %s>"
% (idlib.shortnodeid_b2a(self.peerid),
- idlib.b2a(self.storage_index)[:6]))
+ storage.si_b2a(self.storage_index)[:5]))
def query(self, sharenums):
d = self._storageserver.callRemote("allocate_buckets",
self._storage_index_elapsed = now - started
storage_index = encoder.get_param("storage_index")
self._storage_index = storage_index
- upload_id = idlib.b2a(storage_index)[:6]
+ upload_id = storage.si_b2a(storage_index)[:5]
self.log("using storage index %s" % upload_id)
peer_selector = self.peer_selector_class(upload_id, self._log_number,
self._upload_status)
import re, urllib
from zope.interface import implements
from twisted.python.components import registerAdapter
-from allmydata.util import idlib, hashutil
+from allmydata import storage
+from allmydata.util import base62, idlib, hashutil
from allmydata.interfaces import IURI, IDirnodeURI, IFileURI, IVerifierURI, \
IMutableFileURI, INewDirectoryURI, IReadonlyNewDirectoryURI
ZBASE32STR_128bits = '(%s{25}%s)' % (idlib.ZBASE32CHAR, idlib.ZBASE32CHAR_3bits)
ZBASE32STR_256bits = '(%s{51}%s)' % (idlib.ZBASE32CHAR, idlib.ZBASE32CHAR_1bits)
+ZBASE62STR_128bits = '(%s{22})' % (base62.ZBASE62CHAR)
SEP='(?::|%3A)'
NUMBER='([0-9]+)'
class CHKFileVerifierURI(_BaseURI):
implements(IVerifierURI)
- STRING_RE=re.compile('^URI:CHK-Verifier:'+ZBASE32STR_128bits+':'+
+ STRING_RE=re.compile('^URI:CHK-Verifier:'+ZBASE62STR_128bits+':'+
ZBASE32STR_256bits+':'+NUMBER+':'+NUMBER+':'+NUMBER)
HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'CHK-Verifier'+SEP+
- ZBASE32STR_128bits+SEP+ZBASE32STR_256bits+SEP+NUMBER+
+ ZBASE62STR_128bits+SEP+ZBASE32STR_256bits+SEP+NUMBER+
SEP+NUMBER+SEP+NUMBER)
def __init__(self, storage_index, uri_extension_hash,
@classmethod
def init_from_string(cls, uri):
mo = cls.STRING_RE.search(uri)
- assert mo, uri
- return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2)),
+ assert mo, (uri, cls, cls.STRING_RE)
+ return cls(storage.si_a2b(mo.group(1)), idlib.a2b(mo.group(2)),
int(mo.group(3)), int(mo.group(4)), int(mo.group(5)))
def to_string(self):
assert isinstance(self.size, (int,long))
return ('URI:CHK-Verifier:%s:%s:%d:%d:%d' %
- (idlib.b2a(self.storage_index),
+ (storage.si_b2a(self.storage_index),
idlib.b2a(self.uri_extension_hash),
self.needed_shares,
self.total_shares,
implements(IVerifierURI)
BASE_STRING='URI:SSK-Verifier:'
- STRING_RE=re.compile('^'+BASE_STRING+ZBASE32STR_128bits+':'+ZBASE32STR_256bits+'$')
- HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'SSK-RO'+SEP+ZBASE32STR_128bits+SEP+ZBASE32STR_256bits+'$')
+ STRING_RE=re.compile('^'+BASE_STRING+ZBASE62STR_128bits+':'+ZBASE32STR_256bits+'$')
+ HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'SSK-RO'+SEP+ZBASE62STR_128bits+SEP+ZBASE32STR_256bits+'$')
def __init__(self, storage_index, fingerprint):
assert len(storage_index) == 16
def init_from_human_encoding(cls, uri):
mo = cls.HUMAN_RE.search(uri)
assert mo, uri
- return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2)))
+ return cls(storage.si_a2b(mo.group(1)), idlib.a2b(mo.group(2)))
@classmethod
def init_from_string(cls, uri):
mo = cls.STRING_RE.search(uri)
- assert mo, uri
- return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2)))
+ assert mo, (uri, cls)
+ return cls(storage.si_a2b(mo.group(1)), idlib.a2b(mo.group(2)))
def to_string(self):
assert isinstance(self.storage_index, str)
assert isinstance(self.fingerprint, str)
- return 'URI:SSK-Verifier:%s:%s' % (idlib.b2a(self.storage_index),
+ return 'URI:SSK-Verifier:%s:%s' % (storage.si_b2a(self.storage_index),
idlib.b2a(self.fingerprint))
class _NewDirectoryBaseURI(_BaseURI):
--- /dev/null
+#!/usr/bin/env python
+
+# Copyright (c) 2002-2008 Bryce "Zooko" Wilcox-O'Hearn
+# mailto:zooko@zooko.com
+# Permission is hereby granted to any person obtaining a copy of this work to
+# deal in this work without restriction (including the rights to use, modify,
+# distribute, sublicense, and/or sell copies).
+
+# from the Python Standard Library
+import string
+
+from allmydata.util.mathutil import div_ceil, log_ceil, log_floor
+
+chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+
+ZBASE62CHAR = '[' + chars + ']'
+
+vals = ''.join([chr(i) for i in range(62)])
+c2vtranstable = string.maketrans(chars, vals)
+v2ctranstable = string.maketrans(vals, chars)
+identitytranstable = string.maketrans(chars, chars)
+
+def b2a(os):
+ """
+ @param os the data to be encoded (a string)
+
+ @return the contents of os in base-62 encoded form
+ """
+ cs = b2a_l(os, len(os)*8)
+ assert num_octets_that_encode_to_this_many_chars(len(cs)) == len(os), "%s != %s, numchars: %s" % (num_octets_that_encode_to_this_many_chars(len(cs)), len(os), len(cs))
+ return cs
+
+def b2a_l(os, lengthinbits):
+ """
+ @param os the data to be encoded (a string)
+ @param lengthinbits the number of bits of data in os to be encoded
+
+ b2a_l() will generate a base-62 encoded string big enough to encode
+ lengthinbits bits. So for example if os is 3 bytes long and lengthinbits is
+ 17, then b2a_l() will generate a 3-character- long base-62 encoded string
+ (since 3 chars is sufficient to encode more than 2^17 values). If os is 3
+ bytes long and lengthinbits is 18 (or None), then b2a_l() will generate a
+ 4-character string (since 4 chars are required to hold 2^18 values). Note
+ that if os is 3 bytes long and lengthinbits is 17, the least significant 7
+ bits of os are ignored.
+
+ Warning: if you generate a base-62 encoded string with b2a_l(), and then someone else tries to
+ decode it by calling a2b() instead of a2b_l(), then they will (potentially) get a different
+ string than the one you encoded! So use b2a_l() only when you are sure that the encoding and
+ decoding sides know exactly which lengthinbits to use. If you do not have a way for the
+ encoder and the decoder to agree upon the lengthinbits, then it is best to use b2a() and
+ a2b(). The only drawback to using b2a() over b2a_l() is that when you have a number of
+ bits to encode that is not a multiple of 8, b2a() can sometimes generate a base-62 encoded
+ string that is one or two characters longer than necessary.
+
+ @return the contents of os in base-62 encoded form
+ """
+ os = [ord(o) for o in reversed(os)] # treat os as big-endian -- and we want to process the least-significant o first
+
+ value = 0
+ numvalues = 1 # the number of possible values that value could be
+ for o in os:
+ o *= numvalues
+ value += o
+ numvalues *= 256
+
+ chars = []
+ while numvalues > 0:
+ chars.append(value % 62)
+ value //= 62
+ numvalues //= 62
+
+ return string.translate(''.join([chr(c) for c in reversed(chars)]), v2ctranstable) # make it big-endian
+
+def num_octets_that_encode_to_this_many_chars(numcs):
+ return log_floor(62**numcs, 256)
+
+def num_chars_that_this_many_octets_encode_to(numos):
+ return log_ceil(256**numos, 62)
+
+def a2b(cs):
+ """
+ @param cs the base-62 encoded data (a string)
+ """
+ return a2b_l(cs, num_octets_that_encode_to_this_many_chars(len(cs))*8)
+
+def a2b_l(cs, lengthinbits):
+ """
+ @param lengthinbits the number of bits of data in encoded into cs
+
+ a2b_l() will return a result just big enough to hold lengthinbits bits. So
+ for example if cs is 2 characters long (encoding between 5 and 12 bits worth
+ of data) and lengthinbits is 8, then a2b_l() will return a string of length
+ 1 (since 1 byte is sufficient to store 8 bits), but if lengthinbits is 9,
+ then a2b_l() will return a string of length 2.
+
+ Please see the warning in the docstring of b2a_l() regarding the use of
+ b2a() versus b2a_l().
+
+ @return the data encoded in cs
+ """
+ cs = [ord(c) for c in reversed(string.translate(cs, c2vtranstable))] # treat cs as big-endian -- and we want to process the least-significant c first
+
+ value = 0
+ numvalues = 1 # the number of possible values that value could be
+ for c in cs:
+ c *= numvalues
+ value += c
+ numvalues *= 62
+
+ numvalues = 2**lengthinbits
+ bytes = []
+ while numvalues > 1:
+ bytes.append(value % 256)
+ value //= 256
+ numvalues //= 256
+
+ return ''.join([chr(b) for b in reversed(bytes)]) # make it big-endian
def ave(l):
return sum(l) / len(l)
+def log_ceil(n, b):
+ """
+ The smallest integer k such that b^k >= n.
+
+ log_ceil(n, 2) is the number of bits needed to store any of n values, e.g.
+ the number of bits needed to store any of 128 possible values is 7.
+ """
+ p = 1
+ k = 0
+ while p < n:
+ p *= b
+ k += 1
+ return k
+
+def log_floor(n, b):
+ """
+ The largest integer k such that b^k <= n.
+ """
+ p = 1
+ k = 0
+ while p <= n:
+ p *= b
+ k += 1
+ return k - 1