#!/usr/bin/env python3
"""Web admin panel for a freeradius users file and dnsmasq DHCP reservations.

The application edits three plain-text files in place (the freeradius
``users`` file and the dnsmasq ``dhcp-hosts`` / ``dhcp-opts`` files) and asks
the daemons to reload through the local ``./sendhup`` helper.
"""

from flask import Flask, request, abort, make_response, \
                  render_template, redirect, url_for, \
                  json, jsonify, session, flash
from collections import namedtuple, OrderedDict
from subprocess import check_call, call
from fcntl import flock, LOCK_EX, LOCK_SH, LOCK_UN
from functools import wraps
from urllib.parse import urlparse, urljoin
from datetime import date
import hashlib
import random
import shutil
import hmac
import time
import glob
import sys
import re
import os

# NOTE: Flask("ISABEL-2 Verifier") as the app name breaks Ubuntu 14.04.
app = Flask("main")
# NOTE(review): the session signing key is hard-coded in the source; consider
# loading it from the environment or a root-only file instead.
app.secret_key = "6ab77f3c45447429c2ae163c260a626029519a66450e474c"

users_file = "/etc/freeradius/dpto2/users"
dhcp_hosts_file = "/etc/dnsmasq.d/dpto2/dhcp-hosts"
dhcp_opts_file = "/etc/dnsmasq.d/dpto2/dhcp-opts"
dnsmasq_leases = "/var/lib/misc/dnsmasq.leases"
# Placeholder MAC written for reservations that have no client MAC yet.
impossible_mac = "ff:ff:ff:ff:ff:fe"

created_by_re = re.compile(r"#\s+created by")
username_re = re.compile(r"[^\s#]\S*")


class Lock:
    """Context manager holding an exclusive ``flock`` on ``lockfile``."""

    def __init__(self, lockfile):
        self.lockfile = lockfile

    def __enter__(self):
        flock(self.lockfile, LOCK_EX)

    def __exit__(self, exc_type, exc_val, exc_tb):
        flock(self.lockfile, LOCK_UN)


class SharedLock(Lock):
    """Context manager holding a shared ``flock`` on ``lockfile``."""

    def __init__(self, lockfile):
        super().__init__(lockfile)

    # __enter__ takes no extra arguments; the exception triple belongs to
    # __exit__ only (inherited from Lock).
    def __enter__(self):
        flock(self.lockfile, LOCK_SH)


lock_file = open("/run/lock/dpto2.lock", "w")
exclusive_lock = Lock(lock_file)
shared_lock = SharedLock(lock_file)


def reload_freeradius():
    """Run ``./sendhup freeradius`` so freeradius re-reads its users file."""
    call("./sendhup freeradius".split())


def reload_dnsmasq():
    """Run ``./sendhup dnsmasq`` so dnsmasq re-reads its DHCP files."""
    call("./sendhup dnsmasq".split())


def nthash(password):
    """Return the upper-case hex MD4 digest of ``password`` in UTF-16LE.

    NOTE(review): ``hashlib.new('md4')`` depends on the OpenSSL build and
    raises ValueError where MD4 is not available.
    """
    return hashlib.new('md4', password.encode('utf-16le')).hexdigest().upper()


def _valid_username(username):
    """Return True if ``username`` is a single token safe for the users file.

    The name must be non-empty, contain no whitespace (one user per line,
    name is the first token) and must not start with ``#`` (comment marker).
    """
    return bool(username) and username_re.fullmatch(username) is not None


def create_user(username, password, creator):
    """Append ``username`` with its NT hash to the users file and reload.

    Raises:
        ValueError: if ``username`` could corrupt the users file.
    """
    if not _valid_username(username):
        raise ValueError("Invalid username")
    with exclusive_lock:
        with open(users_file, "a") as f:
            f.write('# created by {}\n{} NT-Password := "{}"\n'.format(
                creator, username, nthash(password)))
    reload_freeradius()


def delete_user(deluser):
    """Remove ``deluser`` and its ``# created by`` line from the users file.

    Only lines whose first whitespace-separated token equals ``deluser`` are
    removed, so deleting ``bob`` leaves ``bobby`` alone and an empty name
    removes nothing.  The file is rewritten through a timestamped temporary
    file under the exclusive lock.
    """
    with exclusive_lock:
        with open(users_file) as f:
            lines = f.readlines()
        timestamp = time.time()
        tempfile = "{}.{}".format(users_file, timestamp)
        with open(tempfile, "w") as f:
            # A "created by" comment is held back until we know whether the
            # user line that follows it is kept or deleted.
            createdby = None
            for line in lines:
                if created_by_re.search(line):
                    createdby = line
                    continue
                if line.startswith("#"):
                    f.write(line)
                    continue
                tokens = line.split(None, 1)
                if tokens and tokens[0] == deluser:
                    createdby = None
                    continue
                if createdby:
                    f.write(createdby)
                f.write(line)
                createdby = None
        rename(tempfile, users_file)
    reload_freeradius()


def load_users():
    """Parse the users file into ``(username, password, creator)`` tuples.

    ``creator`` is taken from the closest preceding ``# created by`` comment,
    or is None when there is none.
    """
    users = []
    creator = None
    with open(users_file) as f:
        for l in f:
            l = l.strip()
            if l.startswith("#"):
                m = re.match(r"#\s+created by\s+(\S+)", l)
                if m:
                    creator = m.group(1)
                continue
            m = re.match(r"(^\S+).*-Password\s+:=\s+\"(\S+)\"", l)
            if m:
                users.append((m.group(1), m.group(2), creator))
                creator = None
    return users


def login_required(f):
    """Decorator redirecting anonymous visitors to the login page."""
    @wraps(f)
    def g(*args, **kwargs):
        if not session.get('logged_in', False):
            return redirect(url_for('login', redirect_to=request.url))
        return f(*args, **kwargs)
    return g


def _redirect_target():
    """Return where to go after login.

    ``redirect_to`` is honoured only when it resolves to this same host;
    anything else (or nothing) falls back to the admin page, so the login
    form cannot be used as an open redirect.
    """
    target = request.args.get('redirect_to')
    if target:
        base = urlparse(request.host_url)
        dest = urlparse(urljoin(request.host_url, target))
        if dest.scheme in ('http', 'https') and dest.netloc == base.netloc:
            return dest.geturl()
    return url_for('admin')


@app.route("/login", methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the posted credentials."""
    if session.get('logged_in', False):
        # redirect_to is a URL, not an endpoint name, so it must not be
        # passed through url_for().
        return redirect(_redirect_target())
    if request.method == 'GET':
        return render_template("login.html")
    if request.method == 'POST':
        username = request.form.get("username", None)
        password = request.form.get("password", None)
        if username is None or password is None:
            return render_template("login.html", error=True,
                                   errormsg="invalid username or password")
        if username == 'guest':
            return render_template("login.html", error=True,
                                   errormsg="guest user has no admin privileges")
        try:
            check_login(username, password)
        except Exception as e:
            return render_template("login.html", error=True, errormsg=str(e))
        session['logged_in'] = True
        session['username'] = username
        return redirect(_redirect_target())


@app.route("/")
def index():
    """Render the landing page with the guest password from the users file."""
    guestpass = "?"
    with open(users_file) as f:
        for line in f:
            tokens = line.split(None, 1)
            # Exact match on the user name; "guest2" is a different user.
            if tokens and tokens[0] == "guest":
                m = re.search(r':=\s+"(.+?)"\s*$', line)
                if m:
                    guestpass = m.group(1)
                    break
    return render_template("index.html", guestpass=guestpass)


def check_login(username, password):
    """Return True if the credentials match an entry of the users file.

    Raises:
        ValueError: if no entry matches.
    """
    expected = nthash(password).encode('utf-8')
    for u, p, c in load_users():
        if u == username and hmac.compare_digest(p.encode('utf-8'), expected):
            return True
    raise ValueError("Invalid username or password")


@app.route("/admin", methods=['GET', 'POST'])
@login_required
def admin():
    """Admin page: optionally delete and/or create a user on POST."""
    if request.method == 'POST':
        deluser = request.form.get('deluser', None)
        # An empty field means "no deletion requested".
        if deluser:
            if deluser == 'guest':
                return render_template("admin.html",
                                       delete_error=True,
                                       errormsg="Cannot delete guest user")
            delete_user(deluser)
            flash("User deleted succesfully")
        username = request.form.get('username', None)
        pass1 = request.form.get('password1', None)
        pass2 = request.form.get('password2', None)
        creator = session['username']
        if username is not None:
            if username == 'guest':
                return render_template("admin.html",
                                       create_error=True,
                                       errormsg="Cannot create guest user")
            if not _valid_username(username):
                # Whitespace/newlines would inject lines into the users file.
                return render_template("admin.html",
                                       create_error=True,
                                       errormsg="Invalid username")
            if pass1 is None or \
               pass2 is None or \
               pass1 != pass2:
                return render_template("admin.html",
                                       create_error=True,
                                       errormsg="Passwords do not match")
            if pass1 == '':
                return render_template("admin.html",
                                       create_error=True,
                                       errormsg="Password cannot be empty")
            create_user(username, pass1, creator)
            flash("User created successfully")
    return render_template("admin.html")


def render_users_tree(tree):
    """Flatten ``tree`` into ``(edges, user)`` lines using box-drawing edges.

    ``tree`` maps a user name to the list of users it created; the key ``""``
    is the root.  The first returned line is the root itself.
    """
    lines = []

    def _render(user, edges, is_last, ancestors):
        lines.append((edges + (" └─" if is_last else " ├─"), user))
        ancestors = ancestors | {user}
        # Skip children already on the current path so that a user recorded
        # as its own (indirect) creator cannot cause unbounded recursion.
        children = [c for c in tree.get(user, []) if c not in ancestors]
        n = len(children)
        for i, u in enumerate(children):
            # Both continuation pieces are 3 characters wide so that nested
            # levels stay aligned.
            _render(u, edges + ("   " if is_last else " │ "), i == n - 1,
                    ancestors)

    _render("", "", True, frozenset())
    return lines


@app.route("/users")
@login_required
def list_users():
    """Render every user (except guest) as a tree grouped by creator."""
    users = load_users()
    users_set = set(x[0] for x in users)
    tree = {"": []}
    for user, passwd, creator in users:
        if user == "guest":
            continue
        # setdefault keeps children that were attached before the creator's
        # own line was reached.
        tree.setdefault(user, [])
        if creator is None:
            tree[""].append(user)
        else:
            if creator not in users_set:
                creator += " (invalid username)"
                # List an unknown creator under the root only once.
                if creator not in tree:
                    tree[""].append(creator)
            tree.setdefault(creator, []).append(user)
    lines = render_users_tree(tree)
    return render_template("users.html", users=lines)


mac_re = re.compile(r"(?:[0-9a-fA-F]{2}[:-]){5}[0-9a-fA-F]{2}")
ip_re_str = r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}"
ip_re = re.compile(ip_re_str)

DhcpHostLine = namedtuple("DhcpHostLine", "macs ip tag")


def parse_dchp_host(line):
    """Parse one comma-separated dhcp-hosts line into a ``DhcpHostLine``.

    MAC fields go to ``macs`` (the placeholder MAC is dropped), the last
    IPv4-looking field becomes ``ip`` (``""`` if none) and a ``set:<tag>``
    field becomes ``tag`` (None if absent).
    """
    macs = []
    ip = ""
    tag = None
    for p in line.split(","):
        p = p.strip()
        m = mac_re.match(p)
        if m:
            if m.group(0) != impossible_mac:
                macs.append(m.group(0))
            continue
        m = ip_re.match(p)
        if m:
            ip = m.group(0)
            continue
        if p.startswith("set:"):
            tag = p[4:]
    return DhcpHostLine(macs, ip, tag)


Ip = namedtuple("Ip", "reserved_by dhcp_pool locked macs gw color")


def _valid_ipv4(text):
    """Return True if ``text`` is exactly a dotted quad with octets <= 255."""
    if not ip_re.fullmatch(text):
        return False
    return all(int(octet) <= 255 for octet in text.split("."))


def load_ips():
    """Build the address map from the dhcp-opts and dhcp-hosts files.

    Returns a dict keyed by last octet (0..255) holding ``Ip`` tuples, plus
    the key ``"prefix"`` with the first three octets of the first reserved
    address (None if the hosts file has no reservation).  Addresses 1..254
    default to "in the DHCP pool"; a ``#`` line holding a JSON object in the
    hosts file is metadata for the host line that follows it.
    """
    ipmap = {}
    for i in range(1, 255):
        ipmap[i] = Ip(
            reserved_by="",
            dhcp_pool=True,
            locked=False,
            macs=[],
            gw=None,
            color=None
        )
    ipmap[0] = ipmap[255] = Ip(
        reserved_by="",
        dhcp_pool=False,
        locked=False,
        macs=[],
        gw=None,
        color=None
    )
    prefix = None

    # Read custom gateways
    gws = {}
    gw_re = re.compile(
        "tag:(client[0-9a-f]{12}),option:router,(" + ip_re_str + ")"
    )
    with open(dhcp_opts_file) as f:
        for line in f:
            line = line.strip()
            if line.startswith("#"):
                continue
            m = gw_re.match(line)
            if m:
                gws[m.group(1)] = m.group(2)

    with open(dhcp_hosts_file) as f:
        meta = {}
        for line in f:
            line = line.strip()
            if line.startswith("#"):
                try:
                    meta = json.loads(line[1:])
                except ValueError:
                    # Ordinary (non-JSON) comment.
                    meta = {}
                    continue
                if not isinstance(meta, dict):
                    meta = {}
                continue
            if re.match("^[0-9a-fA-F]{2}:", line):
                r = parse_dchp_host(line)
                octets = r.ip.split(".")
                try:
                    ip = int(octets[-1])
                except ValueError:
                    # Host line without a usable IP address: ignore it.
                    meta = {}
                    continue
                if not 0 <= ip <= 255:
                    meta = {}
                    continue
                if prefix is None:
                    prefix = ".".join(octets[:-1])
                ipmap[ip] = Ip(
                    reserved_by=meta.get("reserved-by", ""),
                    dhcp_pool=False,
                    locked=meta.get("locked", False),
                    macs=r.macs,
                    gw=gws.get(r.tag, None),
                    color=meta.get("color", None)
                )
                meta = {}
    ipmap["prefix"] = prefix
    return ipmap


def load_leases():
    """Read the dnsmasq leases file into a dict keyed by last IP octet.

    Each value is ``[expiry_timestamp, mac, ip, hostname, client_id]``.
    Malformed lines are skipped; a missing leases file yields ``{}``.
    """
    leases = {}
    try:
        f = open(dnsmasq_leases)
    except FileNotFoundError:
        return leases
    with f:
        for line in f:
            parts = line.strip().split()
            try:
                ip = int(parts[2].split(".")[-1])
                lease = [
                    int(parts[0]),  # expiration timestamp
                    parts[1],       # client mac address
                    parts[2],       # leased ip
                    parts[3],       # host name or * if no name sent
                    parts[4],       # client id or * if no id sent
                ]
            except (IndexError, ValueError):
                # Line is shorter than expected or not numeric: skip it.
                continue
            leases[ip] = lease
    return leases


@app.route("/ips")
@login_required
def ips():
    """Render the overview of all addresses and their current leases."""
    ipmap = load_ips()
    leases = load_leases()
    return render_template("ips.html", ipmap=ipmap, leases=leases)


def parse_macs(macs):
    """Extract MAC addresses from a comma-separated string.

    Fields that do not start with a MAC are dropped; ``-`` separators are
    normalised to ``:``.
    """
    r = []
    for p in macs.strip().split(','):
        m = mac_re.match(p.strip())
        if m:
            r.append(m.group(0).replace("-", ":"))
    return r


def rename(src, dst):
    """Install ``src`` as ``dst`` and keep ``src`` as a timestamped backup.

    Backups are the files named ``dst.<number>``; only the five most recent
    ones are kept.
    """
    shutil.move(src, dst)
    shutil.copy(dst, src)
    copies = []
    for path in glob.glob(glob.escape(dst) + ".[0-9]*"):
        try:
            stamp = float(path[len(dst) + 1:])
        except ValueError:
            # Not one of our timestamped backups (e.g. "dst.1.orig").
            continue
        copies.append((stamp, path))
    # sort from most recent to oldest
    copies.sort(reverse=True)
    # keep only the most recent 5
    for stamp, c in copies[5:]:
        print("Deleting", c)
        os.remove(c)


def write_dhcp_files(ipmap):
    """Write the dhcp-hosts and dhcp-opts files from ``ipmap``.

    Every address 1..254 that is not in the DHCP pool gets a host line,
    preceded by a ``# {json}`` metadata comment when it has any metadata.
    Addresses with a custom gateway and at least one MAC get a
    ``set:client<mac>`` tag and a matching router option.

    Raises:
        ValueError: if ``ipmap["prefix"]`` is None (writing it would put
            addresses like ``None.5`` into the dnsmasq configuration).
    """
    prefix = ipmap["prefix"]
    if prefix is None:
        raise ValueError("network prefix is unknown; refusing to write "
                         "DHCP files")
    timestamp = time.time()
    tempfile = "{}.{}".format(dhcp_hosts_file, timestamp)
    gws = {}
    with open(tempfile, "w") as f:
        for i in range(1, 255):
            ip = ipmap[i]
            if ip.dhcp_pool:
                continue
            meta = OrderedDict()
            if ip.reserved_by:
                meta['reserved-by'] = ip.reserved_by
            if ip.locked:
                meta['locked'] = True
            if ip.color:
                meta['color'] = ip.color
            # Written whenever there is any metadata, so a colour alone is
            # not lost.
            if meta:
                print("# " + json.dumps(meta), file=f)
            if ip.macs:
                macs = ",".join(ip.macs)
            else:
                macs = impossible_mac
            settag = ""
            if ip.gw and ip.macs:
                tag = "client" + ip.macs[0].replace(":", "").lower()
                settag = ",set:" + tag
                gws[tag] = ip.gw
            print("{},{}.{}{}".format(macs, prefix, i, settag), file=f)
    rename(tempfile, dhcp_hosts_file)

    # Write custom gateways file
    tempfile = "{}.{}".format(dhcp_opts_file, timestamp)
    with open(tempfile, "w") as f:
        for tag in gws:
            print(
                "tag:{},option:router,{}".format(tag, gws[tag]),
                file=f,
            )
    rename(tempfile, dhcp_opts_file)


# The URL variable is required: the view takes ``last_byte`` and the address
# map is keyed by integers.
@app.route("/ip/<int:last_byte>", methods=['GET', 'POST'])
@login_required
def ip(last_byte):
    """Show (GET) or update (POST) the reservation of one address."""
    if not 0 <= last_byte <= 255:
        abort(404)
    valid_colors = "black red gold green blue purple".split()

    if request.method == 'POST':
        reserved_by = request.form.get('reserved-by', '')[:100]
        dhcp_pool = request.form.get('dhcp-client', '') == 'pool'
        macs = parse_macs(request.form.get('dhcp-client-mac', ''))
        gw = request.form.get('dhcp-gw', '')[:15]
        # The whole value must be an address: it is written verbatim into
        # the dhcp-opts file.
        gw = gw if _valid_ipv4(gw) else None
        color = request.form.get('label-color', '')[:20]
        color = color if color in valid_colors else None
        with exclusive_lock:
            ipmap = load_ips()
            if not ipmap[last_byte].locked:
                ipmap[last_byte] = Ip(
                    reserved_by=reserved_by,
                    dhcp_pool=dhcp_pool,
                    locked=False,
                    macs=macs,
                    gw=gw,
                    color=color
                )
                write_dhcp_files(ipmap)
                reload_dnsmasq()
        return redirect(url_for('ips'))

    ipmap = load_ips()
    leases = load_leases()
    lease = leases.get(last_byte)
    if lease is not None:
        try:
            expiry_ts = int(lease[0])
            expiry_t = time.localtime(expiry_ts)
            expiry_hour = time.strftime("%I:%M %p", expiry_t)
            # Compare calendar dates, not weekdays, so that a lease expiring
            # a week from now is not reported as expiring today.
            days_ahead = (date.fromtimestamp(expiry_ts) - date.today()).days
            if days_ahead == 0:
                lease[0] = "las " + expiry_hour
            elif days_ahead == 1:
                lease[0] = "ma\N{LATIN SMALL LETTER N WITH TILDE}ana a las " + expiry_hour
            else:
                lease[0] = time.strftime("%B %d, ", expiry_t) + expiry_hour
        except (ValueError, IndexError, OverflowError, OSError):
            # Leave the raw timestamp in place if it cannot be formatted.
            pass
    addr = "{}.{}".format(ipmap["prefix"], last_byte)
    return render_template(
        "ip.html",
        addr=addr,
        valid_colors=valid_colors,
        meta=ipmap[last_byte],
        lease=lease,
    )


@app.route("/logout")
def logout():
    """Clear the login state and return to the landing page."""
    session.pop("logged_in", None)
    session.pop("username", None)
    return redirect(url_for("index"))


if __name__ == '__main__':
    if "debug" in sys.argv:
        # Work on local copies of the data files when debugging.
        users_file = "users.dpto2"
        dhcp_hosts_file = "dhcp-hosts.dpto2"
        dhcp_opts_file = "dhcp-opts.dpto2"
        app.debug = True
    app.run(host="0.0.0.0")