# Source code for pagesign

# -*- coding: utf-8 -*-
#
# Copyright (C) 2021 Red Dove Consultants Limited
#
import base64
import functools
import json
import logging
import os
import re
import shutil
# import stat
import subprocess
import sys
import tempfile
import threading

# Module metadata.
__version__ = '0.1.0'
__author__ = 'Vinay Sajip'
__date__ = "$05-Dec-2021 12:39:53$"

# Refuse to import on interpreters older than Python 3.6.
if sys.version_info[:2] < (3, 6):
    raise ImportError('This module requires Python >= 3.6 to run.')

logger = logging.getLogger(__name__)

# Public API of this module.
__all__ = [
    'Identity',
    'remove_identities',
    'clear_identities',
    'list_identities',
    'encrypt',
    'decrypt',
    'sign',
    'verify',
    'encrypt_and_sign',
    'verify_and_decrypt'
]

# Per-user directory used for the 'keys' store and for temporary key files:
# under %LOCALAPPDATA% on Windows, ~/.pagesign everywhere else.
if os.name == 'nt':
    PAGESIGN_DIR = os.path.join(os.environ['LOCALAPPDATA'], 'pagesign')
else:
    PAGESIGN_DIR = os.path.expanduser('~/.pagesign')

# Patterns applied line-by-line to generated key files when a new identity is
# created. The first three are matched against the file written by
# 'age-keygen -o' (creation comment, public key comment, secret key line);
# the last one is searched for in the minisign public key file (key id).
CREATED_PATTERN = re.compile('# created: (.*)', re.I)
APK_PATTERN = re.compile('# public key: (.*)', re.I)
ASK_PATTERN = re.compile(r'AGE-SECRET-KEY-.*')
MPI_PATTERN = re.compile(r'minisign public key (\S+)')

# Import-time side effects: create the directory if it is missing ...
if not os.path.exists(PAGESIGN_DIR):
    os.makedirs(PAGESIGN_DIR)

# ... fail early if the path exists but is something other than a directory ...
if not os.path.isdir(PAGESIGN_DIR):
    raise ValueError('%s exists but is not a directory.' % PAGESIGN_DIR)

# ... and restrict it to the owner, since secret keys are stored inside.
os.chmod(PAGESIGN_DIR, 0o700)


def _load_keys():
    """
    Read the stored identities from the ``keys`` file in ``PAGESIGN_DIR``.

    :return: The decoded JSON content of the file, or an empty dict when the
             file does not exist.
    """
    keys_path = os.path.join(PAGESIGN_DIR, 'keys')
    if not os.path.exists(keys_path):
        return {}
    with open(keys_path, encoding='utf-8') as stream:
        return json.load(stream)


def _save_keys(keys):
    """
    Write *keys* as JSON to the ``keys`` file in ``PAGESIGN_DIR``.

    The output is indented and has sorted keys; after writing, the file is
    made readable and writable by its owner only.

    :param keys: The JSON-serialisable mapping to store.
    """
    target = os.path.join(PAGESIGN_DIR, 'keys')
    with open(target, 'w', encoding='utf-8') as stream:
        stream.write(json.dumps(keys, indent=2, sort_keys=True))
    os.chmod(target, 0o600)


# In-memory identity store, loaded once at import time. Identity.save()
# stores one dict of attributes per identity name in it.
KEYS = _load_keys()

# Attributes copied from the source mapping by Identity.imported().
PUBLIC_ATTRS = ('created', 'crypt_public', 'sign_public', 'sign_id')

# Every attribute a freshly generated Identity is expected to have.
ATTRS = PUBLIC_ATTRS + ('crypt_secret', 'sign_secret', 'sign_pass')


def clear_identities(keys=KEYS):
    """
    Empty the given identity mapping and write the result to disk.

    Nothing is done (and nothing is written) when the mapping is already
    empty.

    :param keys: The mapping to clear. Defaults to the module-level store,
                 which is bound when this function is defined.
    """
    if len(keys) == 0:
        return
    keys.clear()
    _save_keys(keys)


def remove_identities(*args):
    """
    Remove the named identities from the store.

    Names which are not in the store are ignored. The store is written to
    disk only if at least one identity was actually removed.

    :param args: The names of the identities to remove.
    """
    size_before = len(KEYS)
    for name in args:
        KEYS.pop(name, None)
    if len(KEYS) != size_before:
        _save_keys(KEYS)


def list_identities():
    """
    Return a view of the ``(name, attributes)`` pairs in the identity store.
    """
    store = KEYS
    return store.items()


def _make_password(length):
    """
    Return a random password as ASCII text.

    :param length: The number of random bytes to draw from ``os.urandom``;
                   the returned string is the Base64 encoding of those bytes.
    """
    random_bytes = os.urandom(length)
    encoded = base64.b64encode(random_bytes)
    return encoded.decode('ascii')


def _read_out(stream, result):
    """
    Read *stream* until it is exhausted and store what was read.

    :param stream: A binary stream providing ``read1``.
    :param result: A dict; the bytes read are stored under ``'stdout'``.
    """
    chunks = []
    chunk = stream.read1(100)
    while chunk:
        chunks.append(chunk)
        chunk = stream.read1(100)
    result['stdout'] = b''.join(chunks)


def _read_age_encrypt_err(passphrase, stream, stdin, result):
    """
    Answer the passphrase prompts which ``age`` prints when encrypting.

    The child's stderr is read until it consists of exactly the first prompt,
    and later of the first prompt followed by the confirmation prompt; each
    time the passphrase is written to the child's stdin. After the second
    write stdin is closed and reading stops.

    If stderr reaches end-of-file before both prompts have been answered
    (e.g. the child exited or printed an error instead), stdin is closed and
    reading stops, rather than looping forever on empty reads.

    :param passphrase: The passphrase to send (ASCII text).
    :param stream: The child's stderr (binary, providing ``read1``).
    :param stdin: The child's stdin (binary).
    :param result: A dict; everything read is stored under ``'stderr'``.
    """
    sep = os.linesep.encode('ascii')
    pwd = (passphrase + os.linesep).encode('ascii')
    prompt1 = b'Enter passphrase (leave empty to autogenerate a secure one): '
    prompt2 = prompt1 + sep + b'Confirm passphrase: '
    prompts = (prompt1, prompt2)
    data = b''
    pwd_written = 0
    while True:
        chunk = stream.read1(100)
        if not chunk:
            # EOF: no further prompt can arrive, so stop instead of spinning.
            stdin.close()
            break
        data += chunk
        if data in prompts:
            stdin.write(pwd)
            stdin.flush()
            pwd_written += 1
            if pwd_written == 2:
                stdin.close()
                break
    result['stderr'] = data


def _read_age_decrypt_err(passphrase, stream, stdin, result):
    """
    Answer the passphrase prompt which ``age`` prints when decrypting.

    The child's stderr is read until it consists of exactly the prompt; the
    passphrase is then written to the child's stdin, stdin is closed and
    reading stops.

    If stderr reaches end-of-file before the prompt has been seen (e.g. the
    child exited or printed an error instead), stdin is closed and reading
    stops, rather than looping forever on empty reads.

    :param passphrase: The passphrase to send (ASCII text).
    :param stream: The child's stderr (binary, providing ``read1``).
    :param stdin: The child's stdin (binary).
    :param result: A dict; everything read is stored under ``'stderr'``.
    """
    pwd = (passphrase + os.linesep).encode('ascii')
    data = b''
    while True:
        chunk = stream.read1(100)
        if not chunk:
            # EOF: the prompt will never arrive, so stop instead of spinning.
            stdin.close()
            break
        data += chunk
        if data == b'Enter passphrase: ':
            stdin.write(pwd)
            stdin.flush()
            stdin.close()
            break
    result['stderr'] = data


def _run_command(cmd, wd, err_reader=None):
    """
    Run an external command and return its decoded output.

    :param cmd: The command to run, either as a list of arguments or as a
                string. A string is split on whitespace, so it cannot carry
                arguments which themselves contain whitespace - pass a list
                in that case.
    :param wd: The working directory for the child process.
    :param err_reader: An optional callable, run in a thread as
                       ``err_reader(stderr, stdin, result)``. It is expected
                       to answer prompts appearing on the child's stderr by
                       writing to its stdin, and to store the bytes it read
                       under ``result['stderr']``. When given, the child's
                       stdin is a pipe and its stdout is collected by a
                       second thread.
    :return: A ``(stdout, stderr)`` tuple of strings, decoded as UTF-8.
    :raises subprocess.CalledProcessError: if the command exits with a
            non-zero return code. The raw stdout and stderr bytes are
            attached to the exception.
    """
    if not isinstance(cmd, list):
        cmd = cmd.split()
    logger.debug('Running: %s', cmd)
    kwargs = {
        'cwd': wd,
        'stdout': subprocess.PIPE,
        'stderr': subprocess.PIPE
    }
    if err_reader is not None:
        kwargs['stdin'] = subprocess.PIPE
    p = subprocess.Popen(cmd, **kwargs)
    if err_reader is None:
        stdout, stderr = p.communicate()
    else:
        data = {}
        rout = threading.Thread(target=_read_out, args=(p.stdout, data))
        rout.daemon = True
        rout.start()

        rerr = threading.Thread(target=err_reader, args=(p.stderr, p.stdin, data))
        rerr.daemon = True
        rerr.start()

        rerr.join()
        # If the reader ended without closing stdin (for instance because it
        # raised), close it here so the child is not left waiting for input.
        if not p.stdin.closed:
            try:
                p.stdin.close()
            except OSError:
                # The child has already gone away; nothing left to flush to.
                pass
        rout.join()

        p.wait()

        # A reader which failed may not have stored anything.
        stdout = data.get('stdout', b'')
        stderr = data.get('stderr', b'')

        p.stdout.close()
        p.stderr.close()

    if p.returncode != 0:
        raise subprocess.CalledProcessError(p.returncode, p.args,
                                            output=stdout, stderr=stderr)
    return stdout.decode('utf-8'), stderr.decode('utf-8')


class Identity:
    """
    A named set of keys: an ``age`` key pair for encryption and a ``minisign``
    key pair (plus its password) for signing.
    """

    encoding = 'utf-8'

    def __init__(self, name=None):
        """
        Load a stored identity, or generate a new one.

        :param name: The name of a stored identity to load. If not given (or
                     empty), new keys are generated by running ``age-keygen``
                     and ``minisign``; the new identity is not stored until
                     :meth:`save` is called.
        :raises ValueError: if *name* is given but no such identity is stored.
        """
        if name:
            if name in KEYS:
                self.__dict__.update(KEYS[name])
            else:
                raise ValueError('No such identity: %r' % name)
        else:
            # Generate a new identity
            wd = tempfile.mkdtemp(dir=PAGESIGN_DIR, prefix='work-')
            try:
                p = os.path.join(wd, 'age-key')
                # Commands are passed as lists so that paths containing
                # whitespace survive intact.
                _run_command(['age-keygen', '-o', p], wd)
                with open(p, encoding=self.encoding) as f:
                    lines = f.read().splitlines()
                for line in lines:
                    m = CREATED_PATTERN.match(line)
                    if m:
                        self.created = m.groups()[0]
                        continue
                    m = APK_PATTERN.match(line)
                    if m:
                        self.crypt_public = m.groups()[0]
                        continue
                    m = ASK_PATTERN.match(line)
                    if m:
                        self.crypt_secret = line
                fd, sfn = tempfile.mkstemp(prefix='msk-', dir=wd)
                os.close(fd)
                fd, pfn = tempfile.mkstemp(prefix='mpk-', dir=wd)
                os.close(fd)
                self.sign_pass = _make_password(12)
                cmd = ['minisign', '-fG', '-p', pfn, '-s', sfn]
                _run_command(cmd, wd, self._read_minisign_gen_err)
                with open(pfn, encoding=self.encoding) as f:
                    lines = f.read().splitlines()
                for line in lines:
                    m = MPI_PATTERN.search(line)
                    if m:
                        self.sign_id = m.groups()[0]
                    else:
                        self.sign_public = line
                with open(sfn, encoding=self.encoding) as f:
                    self.sign_secret = f.read()
            finally:
                # The work directory holds freshly generated secret keys, so
                # remove it even when one of the steps above failed.
                shutil.rmtree(wd)
            for attr in ATTRS:
                assert hasattr(self, attr)

    def save(self, name):
        """
        Store this identity under *name* and write the store to disk.

        :param name: The name to store the identity under. An existing entry
                     with the same name is replaced.
        """
        d = dict(self.__dict__)
        # might need to remove some attrs from d here ...
        KEYS[name] = d
        _save_keys(KEYS)

    def _read_minisign_gen_err(self, stream, stdin, result):
        """
        Answer the two password prompts printed by ``minisign`` when
        generating a key pair.

        Reading stops after the password has been written twice, or when
        *stream* reaches end-of-file; stdin is closed in both cases.

        :param stream: The child's stderr (binary, providing ``read1``).
        :param stdin: The child's stdin (binary).
        :param result: A dict; everything read is stored under ``'stderr'``.
        """
        sep = os.linesep.encode('ascii')
        pwd = (self.sign_pass + os.linesep).encode('ascii')
        prompt1 = b'Password: '
        prompt2 = prompt1 + sep + b'Password (one more time): '
        prompts = (prompt1, prompt2)
        data = b''
        pwd_written = 0
        while True:
            c = stream.read1(100)
            if not c:
                # EOF: no further prompt can arrive, so stop instead of
                # looping forever on empty reads.
                stdin.close()
                break
            data += c
            if data in prompts:
                stdin.write(pwd)
                stdin.flush()
                pwd_written += 1
                if pwd_written == 2:
                    stdin.close()
                    break
        result['stderr'] = data

    def _read_minisign_sign_err(self, stream, stdin, result):
        """
        Answer the password prompt printed by ``minisign`` when signing.

        Reading stops after the password has been written, or when *stream*
        reaches end-of-file; stdin is closed in both cases.

        :param stream: The child's stderr (binary).
        :param stdin: The child's stdin (binary).
        :param result: A dict; everything read is stored under ``'stderr'``.
        """
        pwd = (self.sign_pass + os.linesep).encode('ascii')
        data = b''
        while True:
            c = stream.read(1)
            if not c:
                # EOF: the prompt will never arrive, so stop instead of
                # looping forever on empty reads.
                stdin.close()
                break
            data += c
            if data == b'Password: ':
                stdin.write(pwd)
                stdin.close()
                break
        result['stderr'] = data

    def export(self):
        """
        Return a dict of this identity's attributes which is safe to share.

        Every attribute whose name contains ``_secret`` or ``_pass`` is left
        out. The identity itself is not modified.
        """
        return {k: v for k, v in self.__dict__.items()
                if '_secret' not in k and '_pass' not in k}

    @classmethod
    def imported(cls, d, name):
        """
        Create an identity from exported data and store it under *name*.

        :param d: A mapping as returned by :meth:`export`. Only the public
                  attributes are copied; a warning is logged for each one
                  which is missing.
        :param name: The name to store the new identity under.
        :return: The new identity.
        """
        result = object.__new__(cls)
        for k in PUBLIC_ATTRS:
            try:
                setattr(result, k, d[k])
            except KeyError:
                logger.warning('Attribute absent: %s', k)
        result.save(name)
        return result
def encrypt(path, recipients, outpath=None, armor=True):
    """
    Encrypt a file for one or more stored identities using ``age``.

    :param path: The file to encrypt.
    :param recipients: The name of a stored identity, or a list or tuple of
                       such names.
    :param outpath: Where to write the encrypted data. Defaults to *path*
                    with ``.age`` appended. A missing parent directory is
                    created.
    :param armor: Whether to ask ``age`` for ASCII-armored output.
    :return: The path of the encrypted file.
    :raises ValueError: if no recipients are given, *path* is not a file, the
            parent of *outpath* exists but is not a directory, *recipients*
            has an unsupported type or names an unknown identity.
    """
    if not recipients:
        raise ValueError('At least one recipient needs to be specified.')
    if not os.path.isfile(path):
        raise ValueError('No such file: %s' % path)
    if outpath is None:
        outpath = '%s.age' % path
    else:
        d = os.path.dirname(outpath)
        # A bare file name has no directory part; there is then nothing to
        # create or check (and os.makedirs('') would fail).
        if d:
            if not os.path.exists(d):
                os.makedirs(d)
            elif not os.path.isdir(d):
                raise ValueError('Not a directory: %s' % d)
        # if dir, assume writeable, for now
    cmd = ['age', '-e']
    if armor:
        cmd.append('-a')
    if isinstance(recipients, str):
        recipients = [recipients]
    if not isinstance(recipients, (list, tuple)):
        raise ValueError('invalid recipients: %s' % recipients)
    for r in recipients:
        if r not in KEYS:
            raise ValueError('No such recipient: %s' % r)
        info = KEYS[r]
        cmd.extend(['-r', info['crypt_public']])
    cmd.extend(['-o', outpath])
    cmd.append(path)
    _run_command(cmd, os.getcwd())
    return outpath
def decrypt(path, identities, outpath=None):
    """
    Decrypt a file using the secret keys of one or more stored identities.

    :param path: The file to decrypt.
    :param identities: The name of a stored identity, or a list or tuple of
                       such names.
    :param outpath: Where to write the decrypted data. Defaults to *path*
                    without its ``.age`` suffix, or to *path* with ``.dec``
                    appended if it has no such suffix. A missing parent
                    directory is created.
    :return: The path of the decrypted file.
    :raises ValueError: if no identities are given, *path* is not a file, the
            parent of *outpath* exists but is not a directory, *identities*
            has an unsupported type or names an unknown identity.
    """
    if not identities:
        raise ValueError('At least one identity needs to be specified.')
    if not os.path.isfile(path):
        raise ValueError('No such file: %s' % path)
    if outpath is None:
        if path.endswith('.age'):
            outpath = path[:-4]
        else:
            outpath = '%s.dec' % path
    else:
        d = os.path.dirname(outpath)
        # A bare file name has no directory part; there is then nothing to
        # create or check (and os.makedirs('') would fail).
        if d:
            if not os.path.exists(d):
                os.makedirs(d)
            elif not os.path.isdir(d):
                raise ValueError('Not a directory: %s' % d)
        # if dir, assume writeable, for now
    if isinstance(identities, str):
        identities = [identities]
    if not isinstance(identities, (list, tuple)):
        raise ValueError('invalid identities: %s' % identities)
    # Resolve every identity before creating the temporary file, so that an
    # unknown name cannot leave a stray file behind.
    ident_values = []
    for ident in identities:
        if ident not in KEYS:
            raise ValueError('No such identity: %s' % ident)
        ident_values.append(KEYS[ident]['crypt_secret'])
    fd, fn = tempfile.mkstemp(dir=PAGESIGN_DIR, prefix='ident-')
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            f.write('\n'.join(ident_values))
        cmd = ['age', '-d', '-i', fn, '-o', outpath, path]
        _run_command(cmd, os.getcwd())
        return outpath
    finally:
        # The file holds secret keys: always remove it.
        os.remove(fn)
def sign(path, identity, outpath=None):
    """
    Sign a file with the signing key of a stored identity using ``minisign``.

    :param path: The file to sign.
    :param identity: The name of the stored identity to sign with.
    :param outpath: Where to write the signature. Defaults to *path* with
                    ``.sig`` appended. A missing parent directory is created.
    :return: The path of the signature file.
    :raises ValueError: if no identity is given, the identity is unknown,
            *path* is not a file, or the parent of *outpath* exists but is
            not a directory.
    """
    if not identity:
        raise ValueError('An identity needs to be specified.')
    if identity not in KEYS:
        raise ValueError('No such identity: %s' % identity)
    ident = Identity(identity)
    if not os.path.isfile(path):
        raise ValueError('No such file: %s' % path)
    if outpath is None:
        outpath = '%s.sig' % path
    else:
        d = os.path.dirname(outpath)
        # A bare file name has no directory part; there is then nothing to
        # create or check (and os.makedirs('') would fail).
        if d:
            if not os.path.exists(d):
                os.makedirs(d)
            elif not os.path.isdir(d):
                raise ValueError('Not a directory: %s' % d)
        # if dir, assume writeable, for now
    fd, fn = tempfile.mkstemp(dir=PAGESIGN_DIR, prefix='seckey-')
    try:
        # Written inside the try block so that the file (which holds the
        # secret key) is removed even if writing it fails.
        with os.fdopen(fd, 'wb') as f:
            f.write((KEYS[identity]['sign_secret'] + os.linesep).encode('ascii'))
        cmd = ['minisign', '-S', '-x', outpath, '-s', fn, '-m', path]
        _run_command(cmd, os.getcwd(), ident._read_minisign_sign_err)
    finally:
        os.remove(fn)
    return outpath
def verify(path, identity, sigpath=None):
    """
    Verify the signature of a file against a stored identity's public
    signing key using ``minisign``.

    Nothing is returned; the function completes normally when the
    ``minisign`` command succeeds.

    :param path: The file which was signed.
    :param identity: The name of the stored identity of the signer.
    :param sigpath: The signature file. Defaults to *path* with ``.sig``
                    appended.
    :raises ValueError: if no identity is given, the identity is unknown, or
            *path* or the signature file is not a file.
    :raises subprocess.CalledProcessError: if the ``minisign`` command exits
            with a non-zero return code.
    """
    if not identity:
        raise ValueError('An identity needs to be specified.')
    if identity not in KEYS:
        raise ValueError('No such identity: %s' % identity)
    ident = Identity(identity)
    if not os.path.isfile(path):
        raise ValueError('No such file: %s' % path)
    if sigpath is None:
        sigpath = '%s.sig' % path
    if not os.path.isfile(sigpath):
        raise ValueError('No such file: %s' % sigpath)
    cmd = ['minisign', '-V', '-x', sigpath, '-P', ident.sign_public, '-m', path]
    _run_command(cmd, os.getcwd())
def encrypt_and_sign(path, recipients, signer, armor=True, outpath=None, sigpath=None):
    """
    Encrypt a file and then sign the encrypted result.

    :param path: The file to encrypt.
    :param recipients: The recipient identity name(s), passed on to
                       :func:`encrypt`.
    :param signer: The name of the signing identity, passed on to
                   :func:`sign`.
    :param armor: Passed on to :func:`encrypt`.
    :param outpath: Where to write the encrypted data, passed on to
                    :func:`encrypt`.
    :param sigpath: Where to write the signature, passed on to :func:`sign`.
    :return: A tuple ``(encrypted_path, signature_path)``.
    :raises ValueError: if recipients or signer are missing, or *path* is not
            a file.
    """
    if not recipients or not signer:
        raise ValueError('At least one recipient and one signer needs to be specified.')
    if not os.path.isfile(path):
        raise ValueError('No such file: %s' % path)
    outpath = encrypt(path, recipients, outpath=outpath, armor=armor)
    # The signature is made over the encrypted file, not the original.
    sigpath = sign(outpath, signer, outpath=sigpath)
    return outpath, sigpath
def verify_and_decrypt(path, recipients, signer, outpath=None, sigpath=None):
    """
    Verify the signature of an encrypted file and then decrypt it.

    :param path: The encrypted file.
    :param recipients: The identity name(s) whose secret keys are used for
                       decryption, passed on to :func:`decrypt`.
    :param signer: The name of the signing identity, passed on to
                   :func:`verify`.
    :param outpath: Where to write the decrypted data, passed on to
                    :func:`decrypt`.
    :param sigpath: The signature file. Defaults to *path* with ``.sig``
                    appended.
    :return: The path of the decrypted file.
    :raises ValueError: if recipients or signer are missing, or *path* is not
            a file.
    """
    if not signer or not recipients:
        raise ValueError('At least one recipient and one signer needs to be specified.')
    if not os.path.isfile(path):
        raise ValueError('No such file: %s' % path)
    if sigpath is None:
        sigpath = path + '.sig'
    # Decryption is only attempted once verification has not raised.
    verify(path, signer, sigpath)
    return decrypt(path, recipients, outpath)