I'm working on a toy module to encrypt tempfiles. The idea is to expose an interface similar to that found in the tempfile
module, but to have data transparently encrypted using session keys. Again, it's just a toy project and not production code.
First, a few technical details. This project is implemented with pycrypto
and is employing:
- AES-CTR
- A counter with a 1-bit randomized prefix
The basic class is structured so as to mimic the file object interface. When a method such as read
or write
is called, we initialize a Crypto.Cipher.AES
object using the class' _cipher
property. This property is implemented as follows:
def _cipher():
doc = doc = "Returns a stateful AES object ready to decrypt at the required \
stream position"
def fget(self):
ctr = Counter.new(nbits=64,
initial_value=self.tell() # alias of self._file.tell
prefix=self._nonce) # set by `Random.new().read(8)
return AES.new(self._key, counter=ctr, mode=self._opmode) # self._opmode = Crypto.Cipher.AES.MODE_CTR
def fset(self, value):
raise AttributeError('Cannot set _cipher')
def fdel(self, value):
raise AttributeError('Cannot delete _cipher')
return locals()
_cipher = property(**_cipher())
Here is an example of how the _cipher
property is used to encrypt transparently during a call to the write
method.
def write(self, data):
if not isinstance(data, str):
raise TypeError('Data must be str (or bytestring)')
self._file.write(self._cipher.encrypt(data))
When decrypting, we apply the opposite transaction as such:
def read(self, size=-1):
return self._cipher.decrypt(self._file.read(size))
This works when making single calls to write
, but fails when multiple calls to write
are chained. For instance:
ep = EphemeralFile() # the class in question
ep.write('Now is the winter of our discontent')
ep.seek(0)
print ep.read()
>> Now is the winter of our discontent
So far so good... but here's where it fails
ep.write(' made glorious summer by this sun of York')
ep.seek(0)
print ep.read()
>> Now is the winter of our discontent"d_"��U�L~ �w���S��h��]"U(��P^��9k
What am I doing wrong? Shouldn't the use of self._file.tell()
in the _cipher
property yield the appropriate counter position for decryption?
Please note that I first suspected that I might be off by one counter tick, so I tried modifying the initial_value=self.tell()
line to initial_value=self.tell() + 1
(also tried with -1
), but to no avail.
For convenience, here is the full class definition. It's fairly short and may yield some insight.
import tempfile
from Crypto.Cipher import AES
from Crypto import Random
from Crypto.Util import Counter
PRNG = Random.new()
class EphemeralFile(object):
def __init__(self, mode='w+b', bufsize=-1, suffix='', prefix='', dir=None,
key_size=32):
self._key = PRNG.read(key_size)
self._nonce = PRNG.read(8)
self._opmode = AES.MODE_CTR
self._file = tempfile.TemporaryFile(mode=mode, bufsize=bufsize,
suffix=suffix, prefix=prefix, dir=dir)
# alias tempfile methods and parameters
self.close = self._file.close
self.closed = self._file.closed
self.encoding = self._file.encoding
self.errors = self._file.errors
self.fileno = self._file.fileno
self.flush = self._file.flush
self.isatty = self._file.isatty
self.mode = self._file.mode
self.name = self._file.name
self.softspace = self._file.softspace
self.truncate = self._file.truncate
self.seek = self._file.seek
self.tell = self._file.tell
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self._file.close()
def __iter__(self):
return (line for line in self.readlines())
def _cipher():
doc = "Returns a stateful AES object ready to decrypt at the required \
stream position"
def fget(self):
ctr = Counter.new(nbits=64,
initial_value=self.tell(),
prefix=self._nonce)
return AES.new(self._key, counter=ctr, mode=self._opmode)
def fset(self, value):
raise AttributeError('Cannot set EphemeralFile._cipher')
def fdel(self):
raise AttributeError('Cannot delete EphemeralFile._cipher')
return locals()
_cipher = property(**_cipher())
def write(self, data):
if not isinstance(data, str):
raise TypeError('Data must be str (or bytestring)')
self._file.write(self._cipher.encrypt(data))
def writelines(self, lines):
self.write("\n".join(lines))
def read(self, size=-1):
return self._cipher.decrypt(self._file.read(size))
def readline(self, size=-1):
fptr = self.tell()
bytes = []
got_line = False
while not got_line:
bytes.append(self.read(1))
if not bytes[-1] or ('\n' in bytes[-1]):
bytes[-1] = bytes[-1][0:bytes[-1].find('\n') + 1]
got_line = True
plaintext = ''.join(bytes)
self.seek(fptr + len(plaintext)) # rewind
return plaintext
def readlines(self, size=-1):
return [line for line in self]
def read_ciphertext(self, size=-1):
"""Read ciphertext without decrypting.
size : int (default -1)
Number of bytes to read. Negative values read the entire stream
return : str
Ciphertext
"""
return self._file.read(size)
def next(self):
return self.readline()
At this point I really don't see where the problem is, so please feel free to nag me with questions and suggest possible solutions.
Many thanks in advance!
I believe one problem is that the
Counter
object should receive asinitial_value
the AES block number, not the byte offset. In other words, you need to have:That's needed because in AES CTR mode you increase the value each time you cross the AES data boundary (16 bytes).
That also means that the correct sequence for any
write
operation roughly is:self.tell() % 16
bytes of any data and throw away the result.Similarly for reading:
self.tell() % 16
bytes of any data and throw away the result.From a quick look at the code, a second problem could also be that you use the same AES object for both encryption and decryption. You need two separate objects, one for each direction.