481 lines
15 KiB
Python
Executable file
481 lines
15 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
from flask import Flask, request, abort, make_response, \
|
|
render_template, redirect, url_for, \
|
|
json, jsonify, session, flash
|
|
from collections import namedtuple, OrderedDict
|
|
from subprocess import check_call, call
|
|
from fcntl import flock, LOCK_EX, LOCK_SH, LOCK_UN
|
|
|
|
import hashlib
|
|
import random
|
|
import shutil
|
|
import time
|
|
import glob
|
|
import sys
|
|
import re
|
|
import os
|
|
|
|
#app = Flask("ISABEL-2 Verifier") # That app name breaks Ubuntu 14.04 :-o
app = Flask("main")

# Key used by Flask to sign the session cookie.  It can be overridden with
# the DPTO2_SECRET_KEY environment variable so deployments do not have to
# share the value committed to the source tree; the historical key stays as
# the fallback so existing sessions remain valid.
app.secret_key = os.environ.get(
    "DPTO2_SECRET_KEY",
    "6ab77f3c45447429c2ae163c260a626029519a66450e474c",
)

# Files read and rewritten by this application.
users_file = "/etc/freeradius/dpto2/users"
dhcp_hosts_file = "/etc/dnsmasq.d/dpto2/dhcp-hosts"
dhcp_opts_file = "/etc/dnsmasq.d/dpto2/dhcp-opts"
# Lease database, only ever read by this application.
dnsmasq_leases = "/var/lib/misc/dnsmasq.leases"
# Placeholder MAC written for reserved addresses that have no client MAC.
impossible_mac = "ff:ff:ff:ff:ff:fe"
|
|
|
|
class Lock:
    """Context manager holding an exclusive ``flock`` on *lockfile*.

    *lockfile* is an open file object (or file descriptor); it is locked on
    entry and unlocked on exit, whether or not the body raised.

    NOTE(review): ``flock`` locks belong to the open file, so threads that
    share one file object do not exclude each other -- confirm the
    deployment model (one process per worker) matches this assumption.
    """

    # flock() operation used on entry; subclasses override it.
    mode = LOCK_EX

    def __init__(self, lockfile):
        self.lockfile = lockfile

    def __enter__(self):
        flock(self.lockfile, self.mode)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        flock(self.lockfile, LOCK_UN)


class SharedLock(Lock):
    """Same as :class:`Lock` but takes a shared (read) lock.

    The original ``__enter__`` was declared with the ``__exit__`` signature,
    so entering the context raised ``TypeError``; only the lock mode differs
    from the base class now.
    """

    mode = LOCK_SH

    def __init__(self, lockfile):
        super().__init__(lockfile)
|
|
|
|
# A single lock file guards every configuration file this module rewrites.
# It is opened once at import time and kept open for the process lifetime.
lock_file = open("/run/lock/dpto2.lock", mode="w")

# Writers take the exclusive lock; the shared variant is meant for readers.
exclusive_lock = Lock(lockfile=lock_file)
shared_lock = SharedLock(lockfile=lock_file)
|
|
|
|
def reload_freeradius():
    """Run the ``./sendhup`` helper for freeradius.

    The helper path is relative to the current working directory and its
    exit status is ignored.
    """
    command = ["./sendhup", "freeradius"]
    call(command)
|
|
|
|
def reload_dnsmasq():
    """Run the ``./sendhup`` helper for dnsmasq.

    The helper path is relative to the current working directory and its
    exit status is ignored.
    """
    command = ["./sendhup", "dnsmasq"]
    call(command)
|
|
|
|
def _md4_digest(data):
    """Return the 16-byte MD4 digest of *data* (pure Python, RFC 1320).

    Only used when the interpreter's OpenSSL build does not provide MD4.
    """
    import struct

    mask = 0xFFFFFFFF

    def rotl(value, bits):
        return ((value << bits) | (value >> (32 - bits))) & mask

    # (round function, additive constant, message word order, shift amounts)
    rounds = (
        (lambda x, y, z: (x & y) | (~x & z),
         0x00000000,
         (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15),
         (3, 7, 11, 19)),
        (lambda x, y, z: (x & y) | (x & z) | (y & z),
         0x5A827999,
         (0, 4, 8, 12, 1, 5, 9, 13, 2, 6, 10, 14, 3, 7, 11, 15),
         (3, 5, 9, 13)),
        (lambda x, y, z: x ^ y ^ z,
         0x6ED9EBA1,
         (0, 8, 4, 12, 2, 10, 6, 14, 1, 9, 5, 13, 3, 11, 7, 15),
         (3, 9, 11, 15)),
    )

    # Pad to 56 mod 64 bytes, then append the bit length as 64-bit LE.
    padded = data + b"\x80"
    padded += b"\x00" * ((56 - len(padded)) % 64)
    padded += struct.pack("<Q", (len(data) * 8) & 0xFFFFFFFFFFFFFFFF)

    a, b, c, d = 0x67452301, 0xEFCDAB89, 0x98BADCFE, 0x10325476
    for offset in range(0, len(padded), 64):
        words = struct.unpack("<16I", padded[offset:offset + 64])
        a0, b0, c0, d0 = a, b, c, d
        for func, const, order, shifts in rounds:
            for step, k in enumerate(order):
                a = rotl((a + func(b, c, d) + words[k] + const) & mask,
                         shifts[step % 4])
                # Rotate register roles: ABCD -> DABC -> CDAB -> BCDA.
                a, b, c, d = d, a, b, c
        a = (a + a0) & mask
        b = (b + b0) & mask
        c = (c + c0) & mask
        d = (d + d0) & mask
    return struct.pack("<4I", a, b, c, d)


def nthash(password):
    """Return the NT hash of *password* as upper-case hex.

    The NT hash is MD4 over the UTF-16LE encoding of the password.  MD4 is
    requested from ``hashlib`` first; when the linked OpenSSL does not offer
    it (``hashlib.new`` raises ``ValueError``) a pure-Python implementation
    is used instead so user creation and login keep working.
    """
    data = password.encode('utf-16le')
    try:
        digest = hashlib.new('md4', data).hexdigest()
    except ValueError:
        digest = _md4_digest(data).hex()
    return digest.upper()
|
|
|
|
def create_user(username, password, creator):
    """Append a new NT-Password entry for *username* to the users file.

    A ``# created by <creator>`` comment is written right before the entry,
    then freeradius is asked to reload.  The whole operation runs under the
    exclusive lock.

    Raises:
        ValueError: if *username* is empty or contains whitespace or a
            double quote.  Such a name would either not parse back as a
            single entry or would inject extra lines into the users file.
    """
    if not username or re.search(r'[\s"]', username):
        raise ValueError("Invalid username")
    entry = '# created by {}\n{} NT-Password := "{}"\n'.format(
        creator, username, nthash(password)
    )
    with exclusive_lock:
        # The with-statement closes the file even if the write fails.
        with open(users_file, "a") as f:
            f.write(entry)
        reload_freeradius()
|
|
|
|
def delete_user(deluser):
    """Remove the entry for *deluser* (and its creator comment) from the users file.

    The file is rewritten into a timestamped temporary file which then
    replaces the original through ``rename`` (which also keeps a backup),
    and freeradius is reloaded.  The read-modify-write cycle runs under the
    exclusive lock, like ``create_user``.

    Only lines whose first whitespace-separated token equals *deluser* are
    removed; the original prefix test also removed e.g. ``bobby`` when
    deleting ``bob`` and removed every entry for an empty name.  An empty
    *deluser* is ignored.
    """
    if not deluser:
        return
    with exclusive_lock:
        with open(users_file) as f:
            lines = f.readlines()
        tempfile = "{}.{}".format(users_file, time.time())
        # Last "# created by" comment seen, held back until we know whether
        # the entry it belongs to is kept.
        createdby = None
        with open(tempfile, "w") as out:
            for line in lines:
                if re.search(r"#\s+created by", line):
                    createdby = line
                    continue
                fields = line.split(None, 1)
                if fields and fields[0] == deluser:
                    # Drop the entry together with its pending comment.
                    createdby = None
                    continue
                if line.startswith("#"):
                    out.write(line)
                else:
                    if createdby:
                        out.write(createdby)
                    out.write(line)
                    createdby = None
        rename(tempfile, users_file)
        reload_freeradius()
|
|
|
|
def load_users():
    """Parse the users file into a list of ``(username, password, creator)``.

    *password* is whatever is quoted after ``...-Password :=`` on the entry
    line.  *creator* comes from a ``# created by <name>`` comment that
    precedes the entry, or is ``None``.  Other comment lines are ignored.
    """
    # Raw strings: "\s" and "\S" are invalid escape sequences in plain
    # string literals and trigger warnings on current Python versions.
    creator_re = re.compile(r"#\s+created by\s+(\S+)")
    entry_re = re.compile(r'(^\S+).*-Password\s+:=\s+"(\S+)"')

    users = []
    creator = None
    with open(users_file) as f:
        for l in f:
            l = l.strip()
            if l.startswith("#"):
                m = creator_re.match(l)
                if m:
                    creator = m.group(1)
                continue
            m = entry_re.match(l)
            if m:
                users.append((m.group(1), m.group(2), creator))
            # NOTE(review): the creator is forgotten after any non-comment
            # line, mirroring delete_user; the source's nesting was
            # ambiguous here -- confirm.
            creator = None
    return users
|
|
|
|
def login_required(f):
    """Decorator redirecting anonymous visitors to the login page.

    When the session is not marked as logged in, the wrapped view is not
    called; the client is redirected to ``login`` with the current URL in
    the ``redirect_to`` query parameter.
    """
    from functools import wraps

    # wraps() keeps __name__ (Flask derives the endpoint name from it) and
    # also the docstring and other metadata of the view.
    @wraps(f)
    def g(*args, **kwargs):
        if not session.get('logged_in', False):
            return redirect(url_for('login', redirect_to=request.url))
        return f(*args, **kwargs)

    return g
|
|
|
|
def _login_redirect_target():
    """Return the URL to send the user to after a successful login.

    ``redirect_to`` is honoured only when it is a relative URL or points at
    the host serving this request; anything else falls back to the admin
    page so the parameter cannot be used as an open redirect.
    """
    from urllib.parse import urlparse

    target = request.args.get('redirect_to')
    if target and '\\' not in target:
        parts = urlparse(target)
        same_host = not parts.netloc or parts.netloc == request.host
        if parts.scheme in ('', 'http', 'https') and same_host:
            return target
    return url_for('admin')


@app.route("/login",methods=['GET','POST'])
def login():
    """Show the login form (GET) or authenticate the posted credentials.

    On success the session is marked as logged in and the client is
    redirected to ``redirect_to`` (see ``_login_redirect_target``).  The
    ``guest`` account is refused.
    """
    if session.get('logged_in', False):
        # redirect_to carries a URL (login_required passes request.url), so
        # it must not be fed to url_for(), which expects an endpoint name.
        return redirect(_login_redirect_target())

    if request.method != 'POST':
        # GET and the implicit HEAD both just render the form.
        return render_template("login.html")

    username = request.form.get("username", None)
    password = request.form.get("password", None)

    if username is None or password is None:
        return render_template("login.html",error=True,errormsg="invalid username or password")

    if username == 'guest':
        return render_template("login.html",error=True,errormsg="guest user has no admin privileges")

    try:
        check_login(username, password)
    except Exception as e:
        return render_template("login.html",error=True,errormsg=str(e))

    session['logged_in'] = True
    session['username'] = username
    return redirect(_login_redirect_target())
|
|
|
|
@app.route("/")
def index():
    """Render the public landing page showing the guest password.

    The password is the quoted value at the end of the ``guest`` entry in
    the users file; ``"?"`` is shown when no such entry is found.
    """
    guestpass = "?"
    # The with-statement closes the file; the original never closed it.
    with open(users_file) as f:
        for line in f:
            fields = line.split(None, 1)
            # Compare the whole user name so that e.g. "guest2" is not
            # mistaken for the guest account.
            if not fields or fields[0] != "guest":
                continue
            m = re.search(r':=\s+"(.+?)"\s*$', line)
            if m:
                guestpass = m.group(1)
                break

    return render_template("index.html", guestpass=guestpass)
|
|
|
|
def check_login(username, password):
    """Return True when *username*/*password* match an entry in the users file.

    The stored value is compared with the NT hash of *password*.

    Raises:
        ValueError: when no entry matches.
    """
    import hmac

    # Hash once instead of once per user, and compare in constant time.
    expected = nthash(password).encode("utf-8")
    for u, p, _creator in load_users():
        if u == username and hmac.compare_digest(p.encode("utf-8"), expected):
            return True
    raise ValueError("Invalid username or password")
|
|
|
|
def _valid_new_username(username):
    """Tell whether *username* can safely be written to the users file.

    Rejects empty names, names containing whitespace, double quotes or
    ``#`` (they would break or inject lines into the file), and the name
    ``DEFAULT``, which FreeRADIUS treats as an entry matching every user.
    """
    if username == "DEFAULT":
        return False
    return re.fullmatch(r'[^\s"#]+', username) is not None


@app.route("/admin",methods=['GET','POST'])
@login_required
def admin():
    """Admin page: delete a user and/or create a user from the posted form.

    Form fields: ``deluser`` to delete; ``username``, ``password1`` and
    ``password2`` to create.  The ``guest`` account can be neither deleted
    nor created here.
    """
    if request.method == 'POST':
        deluser = request.form.get('deluser',None)
        if deluser is not None:
            if deluser == 'guest':
                return render_template("admin.html", delete_error=True, errormsg="Cannot delete guest user")
            if not deluser.strip():
                # An empty name must never reach delete_user().
                return render_template("admin.html", delete_error=True, errormsg="No user selected")
            delete_user(deluser)
            flash("User deleted succesfully")

        username = request.form.get('username',None)
        pass1 = request.form.get('password1',None)
        pass2 = request.form.get('password2',None)
        creator = session['username']
        if username is not None:
            if username == 'guest':
                return render_template("admin.html", create_error=True, errormsg="Cannot create guest user")
            if not _valid_new_username(username):
                return render_template("admin.html", create_error=True, errormsg="Invalid username")
            if any(existing == username for existing, _p, _c in load_users()):
                # A second entry for the same name would only add a
                # duplicate line to the file.
                return render_template("admin.html", create_error=True, errormsg="User already exists")
            if pass1 is None or \
               pass2 is None or \
               pass1 != pass2:
                return render_template("admin.html", create_error=True, errormsg="Passwords do not match")
            if pass1 == '':
                return render_template("admin.html", create_error=True, errormsg="Password cannot be empty")
            create_user(username,pass1,creator)
            flash("User created successfully")

    return render_template("admin.html")
|
|
|
|
def render_users_tree(tree):
    """Flatten *tree* into ``(edges, user)`` rows in depth-first order.

    *tree* maps each user name to the list of its children; the root is the
    empty name ``""``.  ``edges`` is the box-drawing prefix to print in
    front of the user.
    """
    rows = []
    # Explicit stack instead of recursion; children are pushed in reverse
    # so they are popped, and therefore emitted, in their original order.
    pending = [("", "", True)]
    while pending:
        user, edges, is_last = pending.pop()
        branch = " └─" if is_last else " ├─"
        rows.append((edges + branch, user))
        children = tree[user]
        child_edges = edges + (" " if is_last else " │ ")
        final = len(children) - 1
        for position in range(final, -1, -1):
            pending.append((children[position], child_edges, position == final))
    return rows
|
|
|
|
@app.route("/users")
@login_required
def list_users():
    """Render the tree of users grouped by the user who created them.

    Users without a recorded creator hang from the root.  A creator that is
    not itself in the users file is shown at the root with an
    "(invalid username)" suffix.  The ``guest`` entry is skipped.
    """
    users = load_users()
    users_set = {name for name, _passwd, _creator in users}
    tree = { "": [] }
    for user, _passwd, creator in users:
        if user == "guest":
            continue
        # setdefault keeps children already recorded for this user; plain
        # assignment dropped them when a child was listed before its
        # creator.
        tree.setdefault(user, [])
        if creator is None:
            tree[""].append(user)
            continue
        if creator not in users_set:
            creator += " (invalid username)"
            # Attach the placeholder to the root once, not once per child.
            if creator not in tree:
                tree[creator] = []
                tree[""].append(creator)
        tree.setdefault(creator, []).append(user)
    lines = render_users_tree(tree)
    return render_template("users.html", users=lines)
|
|
|
|
# Six hex byte pairs separated by ":" or "-".
mac_re = re.compile(r"(?:[0-9a-fA-F]{2}[:-]){5}[0-9a-fA-F]{2}")
# Dotted-quad shape only; octet ranges are not checked.  Raw string because
# "\d" and "\." are invalid escape sequences in a plain string literal.
ip_re_str = r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}"
ip_re = re.compile(ip_re_str)
# One parsed line of the dhcp-hosts file.
DhcpHostLine = namedtuple("DhcpHostLine","macs ip tag")
|
|
|
|
def parse_dchp_host(line):
    """Split one dhcp-hosts line into a ``DhcpHostLine(macs, ip, tag)``.

    Comma-separated fields are classified by what they start with: a MAC
    address (the placeholder ``impossible_mac`` is dropped), a dotted-quad
    address (the last one wins) or a ``set:<tag>`` item.  Anything else is
    ignored.
    """
    found_macs = []
    address = ""
    tag_name = None
    for field in (chunk.strip() for chunk in line.split(",")):
        mac_match = mac_re.match(field)
        if mac_match:
            mac = mac_match.group(0)
            if mac != impossible_mac:
                found_macs.append(mac)
            continue
        ip_match = ip_re.match(field)
        if ip_match:
            address = ip_match.group(0)
        elif field.startswith("set:"):
            tag_name = field[4:]
    return DhcpHostLine(found_macs, address, tag_name)
|
|
|
|
# State of one address of the managed /24, keyed by its last byte in the
# map built by load_ips():
#   reserved_by - free text from the "reserved-by" metadata ("" if unset)
#   dhcp_pool   - True when the address has no entry in the dhcp-hosts file
#   locked      - "locked" metadata flag; locked entries are not edited
#   macs        - list of client MAC addresses bound to the address
#   gw          - custom gateway from the dhcp-opts file, or None
#   color       - "color" metadata value, or None
Ip = namedtuple("Ip","reserved_by dhcp_pool locked macs gw color")
|
|
|
|
def load_ips():
    """Build the address map from the dnsmasq dhcp-hosts and dhcp-opts files.

    Returns a dict keyed by the last byte (0-255) of each address, holding
    ``Ip`` records, plus the key ``"prefix"`` with the first three octets
    taken from the first host entry (``None`` when the file has none).

    Addresses without a host entry are reported as belonging to the DHCP
    pool, except 0 and 255.  A ``#`` comment holding a JSON object supplies
    the metadata (``reserved-by``, ``locked``, ``color``) for the host line
    that follows it.
    """
    def _unassigned(in_pool):
        return Ip(
            reserved_by="",
            dhcp_pool=in_pool,
            locked=False,
            macs=[],
            gw=None,
            color=None
        )

    ipmap = {i: _unassigned(True) for i in range(1, 255)}
    ipmap[0] = _unassigned(False)
    ipmap[255] = _unassigned(False)
    prefix = None

    # Read custom gateways
    gw_re = re.compile(
        "tag:(client[0-9a-f]{12}),option:router,(" + ip_re_str + ")"
    )
    gws = {}
    with open(dhcp_opts_file) as f:
        for line in f:
            line = line.strip()
            if line.startswith("#"):
                continue
            m = gw_re.match(line)
            if m:
                gws[m.group(1)] = m.group(2)

    host_re = re.compile(r"^[0-9a-fA-F]{2}:")
    with open(dhcp_hosts_file) as f:
        meta = {}
        for line in f:
            line = line.strip()
            if line.startswith("#"):
                try:
                    meta = json.loads(line[1:])
                except ValueError:
                    # Not JSON: an ordinary comment, no metadata.
                    meta = {}
                if not isinstance(meta, dict):
                    meta = {}
                continue
            if not host_re.match(line):
                continue
            r = parse_dchp_host(line)
            octets = r.ip.split(".")
            try:
                ip = int(octets[-1])
            except ValueError:
                # Host line without an IPv4 address: nothing to record.
                meta = {}
                continue
            if prefix is None:
                prefix = ".".join(octets[:-1])
            ipmap[ip] = Ip(
                reserved_by=meta.get("reserved-by",""),
                dhcp_pool=False,
                locked=meta.get("locked",False),
                macs=r.macs,
                gw=gws.get(r.tag, None),
                color=meta.get("color", None)
            )
            # NOTE(review): metadata is consumed by the host line it
            # precedes; the source's nesting was ambiguous -- confirm.
            meta = {}
    ipmap["prefix"] = prefix
    return ipmap
|
|
|
|
def load_leases():
    """Read the dnsmasq lease file into a dict keyed by the address's last byte.

    Each value is the list ``[expiry, mac, ip, hostname, client_id]`` with
    *expiry* converted to ``int``.  Lines that are too short, whose address
    does not end in a decimal number, or whose expiry is not an integer are
    skipped (the original let a bad expiry raise ``ValueError``).
    """
    leases = {}
    with open(dnsmasq_leases) as f:
        for line in f:
            parts = line.strip().split()
            try:
                ip = int(parts[2].split(".")[-1])
                lease = [
                    int(parts[0]),  # expiration timestamp
                    parts[1],       # client mac address
                    parts[2],       # leased ip
                    parts[3],       # host name or * if no name sent
                    parts[4],       # client id or * if no id sent
                ]
            except (IndexError, ValueError):
                # Line is shorter than expected or malformed; skip it.
                continue
            leases[ip] = lease
    return leases
|
|
|
|
@app.route("/ips")
@login_required
def ips():
    """Render the overview of all addresses with their current leases."""
    return render_template(
        "ips.html",
        ipmap=load_ips(),
        leases=load_leases(),
    )
|
|
|
|
def parse_macs(macs):
    """Extract MAC addresses from a comma-separated string.

    Each field is stripped; fields that start with a MAC address contribute
    that address with ``-`` separators turned into ``:``.  Other fields are
    dropped.
    """
    candidates = (mac_re.match(field.strip()) for field in macs.strip().split(','))
    return [found.group(0).replace("-", ":") for found in candidates if found]
|
|
|
|
def rename(src, dst):
    """Replace *dst* with *src*, keeping *src* as a backup copy.

    *src* is expected to be named ``<dst>.<timestamp>``.  After the move the
    new content is copied back to *src*, so it remains as a timestamped
    backup.  Only the five most recent ``<dst>.<number>`` backups are kept;
    files whose suffix is not a number are left alone (they used to abort
    the pruning with ``ValueError``).
    """
    shutil.move(src, dst)
    shutil.copy(dst, src)

    backups = []
    # glob.escape: dst is a literal path, not a pattern.
    for path in glob.glob(glob.escape(dst) + ".[0-9]*"):
        try:
            stamp = float(path[len(dst) + 1:])
        except ValueError:
            continue
        backups.append((stamp, path))

    # Most recent first; keep only the most recent 5.
    backups.sort(reverse=True)
    for _stamp, path in backups[5:]:
        print("Deleting", path)
        os.remove(path)
|
|
|
|
def write_dhcp_files(ipmap):
    """Write *ipmap* back to the dhcp-hosts and dhcp-opts files.

    Both files are first written to ``<file>.<timestamp>`` and then swapped
    in with ``rename``.  Addresses 1-254 that are not in the DHCP pool get a
    host line, preceded by a JSON metadata comment when they are reserved
    or locked.  Entries with a gateway and at least one MAC are tagged, and
    one ``option:router`` line per tag goes to the dhcp-opts file.
    """
    stamp = time.time()
    prefix = ipmap["prefix"]
    routers = {}

    hosts_tmp = "{}.{}".format(dhcp_hosts_file, stamp)
    with open(hosts_tmp, "w") as out:
        for last_byte in range(1, 255):
            entry = ipmap[last_byte]
            if entry.dhcp_pool:
                continue

            # NOTE(review): the color is stored only for reserved or locked
            # entries, as in the original -- confirm this is intended.
            if entry.reserved_by or entry.locked:
                meta = OrderedDict()
                if entry.reserved_by:
                    meta['reserved-by'] = entry.reserved_by
                if entry.locked:
                    meta['locked'] = True
                if entry.color:
                    meta['color'] = entry.color
                print("# " + json.dumps(meta), file=out)

            has_macs = len(entry.macs) > 0
            mac_field = ",".join(entry.macs) if has_macs else impossible_mac

            settag = ""
            if entry.gw and has_macs:
                # The tag is derived from the first MAC of the entry.
                tag = "client" + entry.macs[0].replace(":", "").lower()
                settag = ",set:" + tag
                routers[tag] = entry.gw

            print("{},{}.{}{}".format(mac_field, prefix, last_byte, settag), file=out)
    rename(hosts_tmp, dhcp_hosts_file)

    # Write custom gateways file
    opts_tmp = "{}.{}".format(dhcp_opts_file, stamp)
    with open(opts_tmp, "w") as out:
        for tag, gateway in routers.items():
            print("tag:{},option:router,{}".format(tag, gateway), file=out)
    rename(opts_tmp, dhcp_opts_file)
|
|
|
|
@app.route("/ip/<int:last_byte>", methods=['GET', 'POST'])
@login_required
def ip(last_byte):
    """Show (GET) or update (POST) the settings of one address.

    *last_byte* is the last octet of the address; values outside 0-255
    yield a 404 instead of a ``KeyError``.  On POST the entry is replaced
    with the submitted values unless it is locked, the dnsmasq files are
    rewritten and dnsmasq is reloaded, all under the exclusive lock.
    """
    if not 0 <= last_byte <= 255:
        abort(404)

    valid_colors = "black red gold green blue purple".split()
    if request.method == 'POST':
        reserved_by = request.form.get('reserved-by', '')[:100]
        dhcp_pool = request.form.get('dhcp-client', '') == 'pool'
        macs = parse_macs(request.form.get('dhcp-client-mac', ''))
        gw = request.form.get('dhcp-gw','')[:15]
        # The whole value must be a dotted quad with octets <= 255: a prefix
        # match would let trailing text (even a newline) through to the
        # dnsmasq options file.
        if not (ip_re.fullmatch(gw)
                and all(int(octet) <= 255 for octet in gw.split("."))):
            gw = None
        color = request.form.get('label-color', '')[:20]
        color = color if color in valid_colors else None

        with exclusive_lock:
            ipmap = load_ips()
            if not ipmap[last_byte].locked:
                ipmap[last_byte] = Ip(
                    reserved_by=reserved_by,
                    dhcp_pool=dhcp_pool,
                    locked=False,
                    macs=macs,
                    gw=gw,
                    color=color
                )
            write_dhcp_files(ipmap)
            reload_dnsmasq()

        return redirect(url_for('ips'))
    else:
        ipmap = load_ips()
        leases = load_leases()
        lease = None
        try:
            if last_byte in leases:
                lease = leases[last_byte]
                # Replace the expiry timestamp with a human-readable text.
                today = time.localtime().tm_wday
                expiry_t = time.localtime(int(lease[0]))
                expiry_day = expiry_t.tm_wday
                expiry_hour = time.strftime("%I:%M %p", expiry_t)
                if today == expiry_day:
                    lease[0] = "las " + expiry_hour
                elif (today + 1) % 7 == expiry_day:
                    lease[0] = "ma\N{LATIN SMALL LETTER N WITH TILDE}ana a las " + expiry_hour
                else:
                    lease[0] = time.strftime("%B %d, ", expiry_t) + expiry_hour
        except ValueError:
            pass
        except IndexError:
            pass

        addr = "{}.{}".format(ipmap["prefix"], last_byte)
        return render_template(
            "ip.html",
            addr=addr,
            valid_colors=valid_colors,
            meta=ipmap[last_byte],
            lease=lease,
        )
|
|
|
|
@app.route("/logout")
def logout():
    """Clear the logged-in flag from the session and go to the index page."""
    # pop() with a default: logging out twice is not an error.
    session.pop("logged_in", None)
    landing = url_for("index")
    return redirect(landing)
|
|
|
|
if __name__ == '__main__':
    # "debug" on the command line switches to local copies of the
    # configuration files and enables Flask's debug mode.
    debug_requested = "debug" in sys.argv
    if debug_requested:
        users_file = "users.dpto2"
        dhcp_hosts_file = "dhcp-hosts.dpto2"
        dhcp_opts_file = "dhcp-opts.dpto2"
        app.debug = True
    # NOTE(review): this listens on all interfaces, also in debug mode --
    # confirm that exposing the debugger beyond localhost is intended.
    app.run(host="0.0.0.0")
|