# Pyng/app.py
#
# 192 lines
# 5.8 KiB
# Python

import re
import os
import sys
import time
import socket
import iperf3
import requests
import subprocess
import scapy.all as scapy
from simplepam import authenticate
from flask_fontawesome import FontAwesome
from flask_navigation import Navigation
from flask_socketio import SocketIO, emit
from flask import Flask, render_template, Response, request, redirect, url_for, session, escape
#Init
app = Flask(__name__)
# Key used by Flask to sign the session cookie. Read it from the
# PYNG_SECRET_KEY environment variable when set, so that each deployment can
# use its own secret; the historical hard-coded value is kept only as a
# fallback so existing installations keep working unchanged.
app.secret_key = os.environ.get('PYNG_SECRET_KEY') or 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT'
# Socket.IO server running in "threading" mode and accepting any origin.
socketio = SocketIO(app,cors_allowed_origins="*",async_mode="threading")
fa = FontAwesome(app)
nav = Navigation(app)
#Variable
# Number of hosts that answered the most recent network scan
# (reset and incremented by bg_scan_network).
uphost=0
# CSV lines (header first) produced by the most recent network scan and
# served by the /export route.
export_list=[]
#Menu
# Top navigation bar: one entry per (label, endpoint) pair, in display order.
nav.Bar('top', [
    nav.Item(label, endpoint)
    for label, endpoint in (
        ('Network', 'index'),
        ('Port', 'port'),
        ('Tools', 'tools'),
    )
])
##Check if URL or IP
def check_url_ip(target):
    """Turn an IPv4 address, host name or http(s) URL into an IPv4 string.

    The input is trimmed, a leading ``http://`` / ``https://`` scheme is
    removed, and anything after the host (path, query, fragment, numeric
    port) is discarded before the host is examined.

    Args:
        target: Text typed by the user, e.g. ``"10.0.0.1"``,
            ``"example.org"`` or ``"https://example.org:8443/login"``.

    Returns:
        The dotted-quad IPv4 address as a string, or ``'0.0.0.0'`` when the
        input is empty, is a dotted quad with an octet above 255, or cannot
        be resolved.
    """
    host = target.strip()

    # Remove the scheme (case-insensitively) if one is present.
    lowered = host.lower()
    for scheme in ("http://", "https://"):
        if lowered.startswith(scheme):
            host = host[len(scheme):]
            break

    # Keep only the host[:port] part of a URL.
    host = re.split(r"[/?#]", host, maxsplit=1)[0]

    # Drop a numeric port; names with several colons are left untouched.
    if host.count(":") == 1:
        name, _, port_text = host.partition(":")
        if port_text.isdigit():
            host = name

    if not host:
        return '0.0.0.0'

    # A literal IPv4 address needs no DNS lookup, only a range check.
    if re.fullmatch(r"[0-9]{1,3}(?:\.[0-9]{1,3}){3}", host):
        if all(int(octet) <= 255 for octet in host.split(".")):
            return host
        return '0.0.0.0'

    try:
        return socket.gethostbyname(host)
    except (OSError, UnicodeError):
        # OSError covers resolver failures (socket.gaierror); UnicodeError is
        # raised for names that cannot be IDNA-encoded (e.g. over-long labels).
        return '0.0.0.0'
def iperf():
    """Run a short iperf3 client test against ping.online.net:5200.

    Returns:
        A ``(send_speed, received_speed)`` tuple. On success both values are
        the result's ``sent_Mbps`` / ``received_Mbps`` rounded to two
        decimals; if the run fails or the result lacks those fields, both
        are the string ``"Could not be tested"``.
    """
    client = iperf3.Client()
    client.duration = 2
    client.server_hostname = 'ping.online.net'
    client.port = 5200
    try:
        result = client.run()
        send_speed = round(result.sent_Mbps,2)
        received_speed = round(result.received_Mbps,2)
    except Exception:
        # Best effort: any failure is reported to the user as text. Catching
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit
        # propagate.
        send_speed = "Could not be tested"
        received_speed = "Could not be tested"
    return send_speed, received_speed
##Check MAC address
def get_mac_details(mac_address):
    """Look up the vendor of a MAC address through api.macvendors.com.

    Args:
        mac_address: MAC address string appended to the API URL.

    Returns:
        The response body decoded as text when the API answers with HTTP
        200; a single space ``" "`` for any other status or when the request
        itself fails (network error, timeout).
    """
    ##Sleep to avoid Free API Limitation
    time.sleep(.65)
    url = "https://api.macvendors.com/"
    try:
        # Without a timeout an unresponsive API would block the scan forever.
        response = requests.get(url+mac_address, timeout=5)
    except requests.RequestException:
        # Vendor lookup is optional; a network failure must not abort the
        # surrounding network scan.
        return " "
    if response.status_code != 200:
        return " "
    return response.content.decode(errors="replace")
##Check DNS resolution
def dns_resolv(target):
    """Return the primary host name found by a reverse lookup of *target*.

    Args:
        target: IP address (or host name) passed to ``socket.gethostbyaddr``.

    Returns:
        The first element of the ``gethostbyaddr`` result, or ``""`` when
        the lookup fails.
    """
    try:
        # One lookup is enough; the result is (hostname, aliases, addresses).
        hostname, _aliases, _addresses = socket.gethostbyaddr(target)
    except (OSError, UnicodeError):
        # OSError covers socket.herror / socket.gaierror; UnicodeError is
        # raised for names that cannot be IDNA-encoded.
        return ""
    return hostname
#Background port scan
def bg_scan_port(target, timeout=0.5):
    """Try a TCP connection to every port of *target* and report open ones.

    Each port for which ``connect_ex`` returns 0 is announced with a
    ``scan_response`` Socket.IO event whose ``data`` is ``'Port: <n>'``.

    Args:
        target: IPv4 address (string) of the host to scan.
        timeout: Seconds to wait for each connection attempt, so that
            filtered ports do not stall the scan for the OS default
            connect timeout.
    """
    # Valid TCP ports are 1..65535 (range's upper bound is exclusive).
    for port in range(1, 65536):
        # The context manager closes the socket even if connect_ex raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            if sock.connect_ex((target, port)) == 0:
                socketio.emit('scan_response',{'data': 'Port: ' + str(port)})
#Background network scan
def bg_scan_network(subnet):
    """ARP-scan *subnet* and publish every host that answers.

    Resets the module-level ``uphost`` counter and ``export_list`` CSV
    buffer, broadcasts an ARP request for ``subnet`` with scapy (1 second
    timeout), and for each answer:

    * increments ``uphost``,
    * looks up the MAC vendor and the reverse DNS name,
    * appends an ``IP,MAC,VENDOR,DNS`` line to ``export_list`` (commas in
      the vendor name are replaced by spaces to keep the columns aligned),
    * emits a ``net_response`` Socket.IO event with the same fields.

    Args:
        subnet: Destination assigned to the ARP request's ``pdst`` field.
    """
    global export_list
    global uphost
    uphost=0
    # Named arp_request so it does not shadow Flask's `request` import.
    arp_request = scapy.ARP()
    arp_request.pdst = subnet
    broadcast = scapy.Ether()
    broadcast.dst = 'ff:ff:ff:ff:ff:ff'
    request_broadcast = broadcast / arp_request
    # srp returns (answered, unanswered); only the answered pairs are used.
    clients = scapy.srp(request_broadcast, timeout = 1)[0]
    export_list=['IP,MAC,VENDOR,DNS']
    for element in clients:
        uphost=uphost+1
        # element is a (sent, received) pair; the reply carries the host data.
        reply = element[1]
        macdet=get_mac_details(reply.hwsrc)
        dns=dns_resolv(reply.psrc)
        export_list.append(reply.psrc+','+reply.hwsrc+','+macdet.replace(",", " ")+','+dns)
        socketio.emit('net_response',{'ip': reply.psrc,'mac': reply.hwsrc,'vendor':macdet,'dns':dns})
#Main route to index.
@app.route("/")
def index():
    """Serve the network page to logged-in users and the login page otherwise."""
    logged_in = 'username' in session
    if not logged_in:
        return render_template('login.html',title="Welcome")
    return render_template('index.html')
@app.route("/tools", methods=['GET', 'POST'])
def tools():
    """Run the iperf bandwidth test and render its result on the tools page.

    Like the ``index`` and ``port`` pages, the tools page is only served to
    logged-in users; anyone else gets the login page (and no bandwidth test
    is started on their behalf).
    """
    if 'username' not in session:
        return render_template('login.html',title="Welcome")
    send, received = iperf()
    resultat = '<p>Send : '+str(send)+' Mbp/s</p><p>Received : '+str(received)+' Mbp/s</p>'
    return render_template('tool.html',resultat=resultat,title="Tools")
@app.route("/port")
def port():
    """Serve the port-scan page to logged-in users, the login page otherwise.

    Two fixed log records (info and warning level) are written on every hit.
    """
    log = app.logger
    log.info('Info level log')
    log.warning('Warning level log')
    if 'username' not in session:
        return render_template('login.html',title="Welcome")
    return render_template('port.html')
##Socket handler for the port scan
@socketio.on('scan_port')
def connect(data):
    """Handle a ``scan_port`` event: resolve the target and scan it in the background.

    Args:
        data: Target typed by the user (IP address, host name or URL).

    Emits ``scan_response`` events to the requesting client: either a
    "not found" message, or a progress message followed - once the
    background scan has finished - by ``'Scan Complete'``.
    """
    target = check_url_ip(data)
    if target == '0.0.0.0':
        emit('scan_response', {'data': 'Server '+format(data)+' not found'})
        return

    emit('scan_response', {'data': 'Scan on progress for '+format(data)+'('+target+')'})
    # request.sid identifies the requesting client; capture it while the
    # handler is running so the background task can address that client.
    sid = request.sid

    def _scan_then_notify():
        # Runs in the background task: scan, then tell the client it is done.
        bg_scan_port(target)
        socketio.emit('scan_response', {'data': 'Scan Complete'}, to=sid)

    # start_background_task expects the callable itself. Passing
    # bg_scan_port(target) would run the entire scan inside this handler and
    # then schedule its None return value.
    socketio.start_background_task(_scan_then_notify)
##Socket handler for the network scan
@socketio.on('net_port')
def connect(data):
    """Handle a ``net_port`` event: ARP-scan the given subnet in the background.

    Args:
        data: Subnet to scan, passed unchanged to ``bg_scan_network``.

    Emits a ``net_response`` header row immediately, then - once the
    background scan has finished - a summary row with the number of hosts
    that answered ("Up") and the remainder out of 256 ("Down").
    """
    emit('net_response', {'ip':'IP','mac':'Mac address','vendor':'Vendor resolution','dns':'Local DNS'})
    # request.sid identifies the requesting client; capture it while the
    # handler is running so the background task can address that client.
    sid = request.sid

    def _scan_then_summarise():
        # Runs in the background task: scan, then send the Up/Down totals.
        bg_scan_network(data)
        # uphost is the module-level counter updated by bg_scan_network.
        # The "Down" figure assumes a 256-address range.
        downhost = 256 - uphost
        socketio.emit('net_response', {'ip':' ','mac':'Up:'+str(uphost),'vendor': 'Down:'+str(downhost),'dns':''}, to=sid)

    # start_background_task expects the callable itself. Passing
    # bg_scan_network(data) would run the entire scan inside this handler and
    # then schedule its None return value.
    socketio.start_background_task(_scan_then_summarise)
#Export
@app.route("/export", methods=['GET', 'POST'])
def export_file():
    """Download the result of the last network scan as ``CMDB.csv``.

    Returns:
        A ``text/csv`` attachment response whose body is the lines of the
        module-level ``export_list`` joined by newlines (empty if no scan
        has run yet).
    """
    # NOTE(review): unlike the index and port pages this route does not
    # check for a logged-in session - confirm whether that is intended.
    # join() avoids the blank first line that prefixing every row with '\n'
    # used to produce.
    csv_body = "\n".join(export_list)
    return Response(
        csv_body,
        mimetype="text/csv",
        headers={"Content-disposition":
                 "attachment; filename=CMDB.csv"})
##Login
@app.route("/login", methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or check submitted credentials (POST).

    On POST the ``username`` and ``password`` form fields are checked with
    ``simplepam.authenticate``. On success the user name is stored in the
    session and the client is redirected to the index page; on failure the
    login page is rendered again with an error message.
    """
    if request.method != 'POST':
        # A plain GET just displays the form. Previously this path fell
        # through without a valid response and produced a server error.
        return render_template('login.html',title="Welcome")

    username = request.form['username']
    password = request.form['password']
    if authenticate(str(username), str(password)):
        session['username'] = username
        return redirect(url_for('index'))

    resultats='<p id="alert" style="color:red">Invalid Username/Password ! </p>'
    return render_template('login.html', alertmessage=resultats)
##Logout
@app.route('/logout')
def logout():
    """Forget the logged-in user (if any) and go back to the index page."""
    session.pop('username', None)
    home_url = url_for('index')
    return redirect(home_url)
##Start
if __name__ == '__main__':
    # Listen on every interface, TCP port 5005 (given as an integer, the type
    # the server APIs expect). allow_unsafe_werkzeug permits running on the
    # Werkzeug development server.
    socketio.run(app, host='0.0.0.0', port=5005, allow_unsafe_werkzeug=True)