# File: voteano.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Server for anonymous votes
   (Simple proof of concept)

Emmanuel Viennet, may 2009
"""

import sys, re, pdb, random, smtplib, urlparse
from cgi import parse_qs
from sys import stderr
from email.MIMEMultipart import MIMEMultipart
from email.MIMEText import MIMEText
from email.Header import Header

from votetexts import *

# Tunable settings:
EID_NBITS = 80  # size, in bits, of each randomly generated eid
SMTP_HOST = 'localhost'  # host handed to smtplib.SMTP when mailing ballots
SERVER_URL = 'http://copiapo.viennet.net'  # scheme and host, without port
SERVER_PORT = 1848  # TCP port the HTTP server listens on
SERVER_BASE = ''  # additional path component if necessary

FROM_ADDR = 'emmanuel.viennet@univ-paris13.fr'  # origin of messages

# Base URL of our pages (inserted into the ballot message as 'url'):
BASE_URL = '%s:%d%s' % (SERVER_URL, SERVER_PORT, SERVER_BASE)

def get_electors():
    """Return the electors' email addresses, in random order.

    Reads ``electors.txt`` (relative to the current working directory), one
    address per line.  Blank lines and lines starting with ``#`` (after
    stripping surrounding whitespace) are ignored.

    Returns:
        list of str: the addresses, shuffled with ``random.shuffle``.

    Raises:
        ValueError: if a line is rejected by ``is_valid_mail``; the message
            carries the 1-based line number.
        IOError: if ``electors.txt`` cannot be opened.
    """
    filename = 'electors.txt'
    electors = []
    # The ``with`` block closes the file even when validation raises.
    with open(filename) as source:
        for ln, line in enumerate(source, 1):
            line = line.strip()
            if not line or line[0] == '#':
                continue
            if not is_valid_mail(line):
                raise ValueError('invalid email on line %d of %s' % (ln, filename))
            electors.append(line)
    random.shuffle(electors)
    return electors

def is_valid_mail(email):
    """Tell whether *email* looks like a well-formed email address.

    The check is deliberately loose: some text, an ``@``, some more text, a
    dot, and a final part of at least two characters.  The final part has no
    upper length limit, so addresses such as ``user@example.info`` are
    accepted.

    Returns:
        bool: True when the pattern matches, False otherwise.
    """
    # Raw string: ``\.`` must reach the regex engine as an escaped dot.
    return re.match(r"^.+@.+\..{2,}$", email) is not None

def generate_eid(nbits=None):
    """Generate a random number (eid), returned as a decimal string.

    The eid is the only credential a voter presents to the web server, so it
    is drawn from the operating system's entropy source
    (``random.SystemRandom``) rather than from the default, predictable
    generator.

    Args:
        nbits: number of random bits; defaults to ``EID_NBITS``.

    Returns:
        str: decimal representation of an integer in ``[0, 2**nbits)``.
    """
    if nbits is None:
        nbits = EID_NBITS
    return str(random.SystemRandom().getrandbits(nbits))

def setup_new_vote():
    """Set up a new vote.

    For each elector returned by ``get_electors``, generate an eid and mail
    it with ``send_ballot``.  Once every ballot has been sent, write
    ``votes.txt`` (through ``write_votes``) with each eid mapped to an empty
    list of votes.

    Any exception raised while reading electors or sending mail propagates,
    in which case ``votes.txt`` is not written.
    """
    eids = []
    for elector in get_electors():
        eid = generate_eid()
        send_ballot(elector, eid)
        eids.append(eid)

    # One fresh list per eid: ``{}.fromkeys(eids, [])`` would make every eid
    # share the very same list object.
    write_votes(dict((eid, []) for eid in eids))


def send_ballot(elector, eid):
    """Mail to `elector` the ballot message carrying his private number (eid).

    The text is BALLOT_MESSAGE filled in with the elector address, the eid
    and BASE_URL; it is sent from FROM_ADDR through send_email().
    """
    fields = {'eid': eid, 'url': BASE_URL, 'elector': elector}
    body_text = BALLOT_MESSAGE % fields

    message = MIMEMultipart()
    message['Subject'] = Header('[VOTE]:', TXT_ENCODING)
    message['To'] = elector
    message['From'] = FROM_ADDR
    message.epilogue = ''
    body_part = MIMEText(body_text, 'plain', TXT_ENCODING)
    message.attach(body_part)

    send_email(FROM_ADDR, elector, message)


def send_email( from_addr, to_addr, msg):
    """Send an email through the SMTP server at SMTP_HOST.

    Args:
        from_addr: envelope sender address.
        to_addr: envelope recipient address.
        msg: message object; its ``as_string()`` result is what gets sent.

    A line is written to stderr on success and on failure.  Any exception
    raised while connecting or sending is re-raised after being reported.
    """
    try:
        server = smtplib.SMTP( SMTP_HOST )
        try:
            server.sendmail( from_addr, to_addr, msg.as_string() )
        finally:
            # Always release the connection, whether or not sending worked.
            try:
                server.quit()
            except (smtplib.SMTPException, EnvironmentError):
                # The session is already broken: just drop the socket so a
                # failing QUIT cannot hide the outcome of sendmail().
                server.close()
        stderr.write('mail sent to %s\n' % to_addr)
    except Exception:
        stderr.write('an exception occurred sending mail to %s\n' % to_addr)
        raise # or         sys.exit(1)

def load_votes():
    """Return the votes stored in ``votes.txt`` as a dict { eid : [ vote ] }.

    Each line holds an eid, a tab, and a comma-separated list of votes (the
    format produced by ``write_votes``).  Votes are returned as strings;
    empty items are dropped, so an eid that has not voted maps to ``[]``.
    Blank lines are skipped, and a line without a tab is read as an eid with
    no vote.

    Raises:
        IOError: if ``votes.txt`` cannot be opened.
    """
    V = {}
    # The ``with`` block closes the file even if reading fails.
    with open('votes.txt') as source:
        for line in source:
            line = line.rstrip('\r\n')
            if not line.strip():
                continue  # tolerate blank lines, e.g. at the end of the file
            eid, _, vote = line.partition('\t')
            V[eid] = [ x for x in vote.strip().split(',') if x ]
    return V

def write_votes(V):
    """Write votes to file ``votes.txt``, replacing its previous content.

    Args:
        V: dict { eid : [ vote ] }.

    One line is written per eid: the eid, a tab, then the votes joined with
    commas (each converted with ``str``).
    """
    # list(V) gives a real list whatever type keys() returns, so it can be
    # shuffled in place.
    eids = list(V)
    # Write eids (shuffled, no correlation between mails orders and votes)
    random.shuffle(eids)
    # The ``with`` block closes the file even if a write fails.
    with open('votes.txt', 'w') as f:
        for eid in eids:
            vote = ','.join([str(x) for x in V[eid]])
            f.write('%s\t%s\n' % (eid, vote))

def print_vote_result():
    """Tally the ballots returned by load_votes() and print them to stdout.

    Prints a title line, then one "option<TAB>count" line per option, from
    the most voted option to the least voted one.
    """
    tally = {}  # { option : nb ballots }
    for ballot in load_votes().values():
        for option in ballot:
            tally[option] = tally.get(option, 0) + 1
    # Stable sort: options with the same count keep the dict's own order.
    ranking = sorted(tally.items(), key=lambda item: item[1], reverse=True)
    print('Results of elections')
    for option, ballots in ranking:
        print('%s\t%s' % (option, ballots))

# ------------------------------------------------------------------------
#        THE WEB SERVER
# ------------------------------------------------------------------------

from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler

class MyHandler(SimpleHTTPRequestHandler):
    """HTTP request handler serving the vote form and recording ballots.

    Every page except the jQuery file requires an ``eid`` query parameter
    that is one of the keys returned by ``load_votes``.
    """

    def do_GET(self):
        """Dispatch a GET request on its path (surrounding '/' removed).

        * ``jquery-1.3.2.min.js``: served as a static file;
        * ``vote``: record the ballot, see ``vote``;
        * empty path: send the vote form.

        A missing or unknown eid, or any other path, gets a 404.  An
        unexpected exception gets a 500 and is then re-raised.
        """
        print('request')
        try:
            r = urlparse.urlsplit(self.path)
            method = r.path.strip('/')
            v = parse_qs(r.query)
            #
            if method == 'jquery-1.3.2.min.js':
                f = self.send_head()
                if f:
                    try:
                        self.copyfile(f, self.wfile)
                    finally:
                        f.close()
                # When send_head() returns nothing it has already answered
                # with an error, so never fall through to the eid check: that
                # would send a second response for the same request.
                return
            # all methods need a valid eid:
            eid = v.get( 'eid', [None])[0]
            V = load_votes()
            if eid not in V:
                return self.send_error(404, 'invalid code')

            if method == 'vote':
                self.vote(eid,v,V)
            elif method == '':
                self.send_text( HTML_VOTE_FORM % { 'eid' : eid },
                                head_script=SCRIPT_VOTE)
            else:
                print('method=%s' % method)
                self.send_error(404, 'Not Found')
        except Exception:
            self.send_error(500, 'Server error')
            raise # debug

    def log_message(self, format, *args):
        """disable all server logging"""
        pass

    def send_text(self, text, head_script=''):
        """Send a 200 HTML response.

        The body is HTML_HEADER, `head_script`, a closing ``</head>`` tag,
        `text`, then HTML_FOOTER.
        """
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write( HTML_HEADER + head_script + '</head>')
        self.wfile.write( text )
        self.wfile.write( HTML_FOOTER )

    def vote(self, eid, v, V):
        """Record the ballot of `eid` and send a confirmation page.

        Args:
            eid: elector code, already checked to be a key of `V`.
            v: parsed query string; ``v['vote']`` is the list of chosen
               options, each of which must parse as an integer.
            V: dict { eid : [ vote ] } as returned by ``load_votes``.

        A missing or non-integer vote gets a 404 and nothing is written.
        An elector who has already voted may vote again: the new ballot
        replaces the previous one.
        """
        vote = v.get('vote', None)
        # debug: print 'vote: eid=%s vote=%s' % (eid, vote)
        # check vote value
        if not vote:
            return self.send_error(404, 'vote invalide')
        try:
            options = [ int(x) for x in vote ]
        except (TypeError, ValueError):
            return self.send_error(404, 'vote invalide')

        # Keep each option once: the tally counts ballots per option, so a
        # request repeating an option (vote=1&vote=1) must not count twice.
        ballot = []
        for option in options:
            if option not in ballot:
                ballot.append(option)
        V[eid] = ballot

        write_votes(V)

        nb_valid_votes = len([ x for x in V.values() if x ])
        self.send_text( """<h2>Votre vote a été pris en compte</h2>
        <p>Votes enregistrés: %d sur %d électeurs inscrits.</p>
        """ % (nb_valid_votes, len(V)))


def server_main():
    """Run the HTTP server on SERVER_PORT until interrupted with Ctrl-C.

    Requests are handled by MyHandler.  The listening socket is closed when
    the server stops, whatever the reason.
    """
    # Created outside the ``try`` so that the cleanup below never refers to
    # an unbound name if construction itself fails or is interrupted.
    server = HTTPServer(('', SERVER_PORT), MyHandler)
    try:
        print('started httpserver on port tcp/%s...' % SERVER_PORT)
        server.serve_forever()
    except KeyboardInterrupt:
        print('^C received, shutting down server')
    finally:
        server.server_close()

# -------------------------
from optparse import OptionParser
parser = OptionParser()
parser.add_option('-n', '--new', help='launch new elections',
                  action="store_true")

parser.add_option('-r', '--results', help='Print election results',
                  action="store_true")


def main(argv=None):
    """Command-line entry point.

    Args:
        argv: list of command-line arguments to parse; ``None`` (the
            default) lets optparse read ``sys.argv[1:]``.

    With ``--results`` the tally is printed and nothing else is done.
    Otherwise the web server is started, after setting up a new election
    when ``--new`` is given.

    Returns:
        int: the process exit status (0).
    """
    (options, args) = parser.parse_args(argv)
    if options.results:
        print_vote_result()
        return 0
    if options.new:
        stderr.write('Initializing new election\n')
        setup_new_vote()
    server_main()
    return 0


if __name__ == '__main__':
    # Arguments are parsed only when run as a script, so importing this
    # module neither reads sys.argv nor exits on unknown options.
    # sys.exit is used instead of exit(), which only exists when the site
    # module has been loaded.
    sys.exit(main())