Pyng/app.py

186 lines
5.7 KiB
Python
Raw Normal View History

2023-06-21 21:15:51 +00:00
import re
import os
import sys
import time
import socket
import iperf3
import logging
import requests
import subprocess
import scapy.all as scapy
from simplepam import authenticate
from flask_fontawesome import FontAwesome
from flask_navigation import Navigation
from flask_socketio import SocketIO, emit
from flask import Flask, render_template, Response, request, redirect, url_for, session, escape
# --- Global configuration ----------------------------------------------------
__version__ = "1.0.0"
# Directory holding this module; the log file is written to its "log" folder.
path = os.path.dirname(__file__)
logging.basicConfig(
    filename=path + '/log/Pyng.log',
    level=logging.DEBUG,
    format='%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s',
)
logging.info('PROGRAM START')

# --- Application objects -----------------------------------------------------
app = Flask(__name__)
# NOTE(review): the session-signing key is hard-coded in the source; consider
# loading it from the environment or a config file kept out of version control.
app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT'
# NOTE(review): cors_allowed_origins="*" accepts Socket.IO connections from
# any origin - confirm this is intended.
socketio = SocketIO(app, cors_allowed_origins="*", async_mode="threading")
fa = FontAwesome(app)
nav = Navigation(app)

# --- Shared state ------------------------------------------------------------
# Number of hosts that answered the most recent network scan
# (reset and incremented by bg_scan_network).
uphost = 0

# --- Navigation menu ---------------------------------------------------------
# (label, endpoint) pairs shown in the top navigation bar, in display order.
_top_menu_items = [
    nav.Item(label, endpoint)
    for label, endpoint in (
        ('Network', 'index'),
        ('Port', 'port'),
        ('Tools', 'tools'),
    )
]
nav.Bar('top', _top_menu_items)
# Matches the dotted-quad *shape* only; octet ranges are not validated.
_IPV4_PATTERN = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")


def check_url_ip(target):
    """Return the IPv4 address for *target*, or ``'0.0.0.0'`` if unresolvable.

    Args:
        target: A dotted-quad IPv4 string, a host name, or an ``http://`` /
            ``https://`` URL. Anything after the host part of a URL (the
            path) is ignored.

    Returns:
        ``target`` itself when it already looks like an IPv4 address,
        otherwise the address the host name resolves to. The sentinel
        ``'0.0.0.0'`` is returned when resolution fails.
    """
    if _IPV4_PATTERN.match(target):
        return target

    # Reduce a URL to its host part: drop the scheme first, then any path.
    for scheme in ("http://", "https://"):
        if target.startswith(scheme):
            target = target[len(scheme):]
            break
    host = target.split("/", 1)[0]

    try:
        # Single lookup; resolution errors are OSError subclasses, and names
        # that cannot be IDNA-encoded raise UnicodeError.
        return socket.gethostbyname(host)
    except (OSError, UnicodeError):
        return '0.0.0.0'
def iperf():
    """Run a short iperf3 test and report the measured throughput.

    The test lasts 2 seconds and targets ``ping.online.net`` on port 5200.

    Returns:
        A ``(send_speed, received_speed)`` tuple. Each element is the
        result's ``sent_Mbps`` / ``received_Mbps`` value rounded to two
        decimals, or the string ``"Could not be tested"`` when the run
        fails for any reason.
    """
    client = iperf3.Client()
    client.duration = 2
    client.server_hostname = 'ping.online.net'
    client.port = 5200
    try:
        result = client.run()
        send_speed = round(result.sent_Mbps, 2)
        received_speed = round(result.received_Mbps, 2)
    except Exception:
        # Best effort: any failure (server busy, missing result fields, ...)
        # degrades to a placeholder. Unlike a bare ``except``, this lets
        # KeyboardInterrupt/SystemExit propagate, and the cause is logged.
        logging.warning('iperf3 speed test failed', exc_info=True)
        send_speed = "Could not be tested"
        received_speed = "Could not be tested"
    return send_speed, received_speed
def get_mac_details(mac_address, timeout=5):
    """Look up the vendor of *mac_address* through api.macvendors.com.

    Args:
        mac_address: MAC address string appended to the API URL.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The decoded response body on HTTP 200, or ``" "`` (a single space)
        when the API answers with another status or the request itself fails.
    """
    # Sleep to stay under the free API's rate limitation.
    time.sleep(.65)
    url = "https://api.macvendors.com/"
    try:
        response = requests.get(url + mac_address, timeout=timeout)
    except requests.RequestException:
        # A network failure is treated like an unknown vendor so that one
        # failed lookup does not abort the scan that called us.
        return " "
    if response.status_code != 200:
        return " "
    return response.content.decode()
def dns_resolv(target):
    """Return the host name registered for *target*, or ``""`` if none is found.

    Args:
        target: Address (or name) handed to ``socket.gethostbyaddr``.

    Returns:
        The first element of the ``gethostbyaddr`` result (the primary host
        name), or an empty string when the lookup fails.
    """
    try:
        # One lookup only; element 0 of the result is the primary host name.
        return socket.gethostbyaddr(target)[0]
    except (OSError, UnicodeError):
        # Resolver errors are OSError subclasses; names that cannot be
        # IDNA-encoded raise UnicodeError.
        return ""
def bg_scan_port(target, timeout=1.0):
    """Probe every TCP port of *target* and emit each open one.

    Args:
        target: IPv4 address (or resolvable host) to scan.
        timeout: Seconds to wait for each connection attempt, so that a
            filtered port cannot stall the scan on the OS connect timeout.

    Emits:
        ``scan_response`` with ``{'data': 'Port: <n>'}`` for every port on
        which the TCP connection succeeds.
    """
    # Valid TCP ports are 1..65535; range() excludes its upper bound.
    for port in range(1, 65536):
        # The context manager closes the socket even if connect_ex raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            # connect_ex returns 0 on success and an errno value otherwise.
            if sock.connect_ex((target, port)) == 0:
                socketio.emit('scan_response', {'data': 'Port: ' + format(port)})
def bg_scan_network(subnet):
    """ARP-scan *subnet* and emit one ``net_response`` event per answering host.

    Args:
        subnet: Destination assigned to the ARP request's ``pdst`` field.

    Side effects:
        Resets the module-level ``uphost`` counter to 0 and increments it for
        every host that answered. For each of them, emits ``net_response``
        with its IP, MAC address, vendor lookup and reverse-DNS name.
    """
    global uphost
    uphost = 0

    # Broadcast Ethernet frame carrying the ARP request.
    arp_request = scapy.ARP()
    arp_request.pdst = subnet
    broadcast = scapy.Ether()
    broadcast.dst = 'ff:ff:ff:ff:ff:ff'
    request_broadcast = broadcast / arp_request

    # Only the first element of the srp() result (the answered exchanges)
    # is used.
    clients = scapy.srp(request_broadcast, timeout=1)[0]
    for element in clients:
        uphost = uphost + 1
        # The second item of each exchange is the reply from the host.
        reply = element[1]
        macdet = get_mac_details(reply.hwsrc)
        dns = dns_resolv(reply.psrc)
        socketio.emit('net_response', {'ip': reply.psrc, 'mac': reply.hwsrc, 'vendor': macdet, 'dns': dns})
@app.route("/")
def index():
    """Serve the network page to logged-in users and the login page otherwise."""
    app.logger.info('Info level log')
    app.logger.warning('Warning level log')
    # Guard clause: without a session user, fall back to the login form.
    if 'username' not in session:
        return render_template('login.html', title="Welcome")
    return render_template('index.html')
@app.route("/tools", methods=['GET', 'POST'])
def tools():
    """Run the iperf speed test and render its result on the tools page."""
    # NOTE(review): unlike index() and port(), this route does not check the
    # session for a logged-in user - confirm whether that is intended.
    upload, download = iperf()
    fragments = [
        '<p>Send : ',
        str(upload),
        ' Mbp/s</p><p>Received : ',
        str(download),
        ' Mbp/s</p>',
    ]
    return render_template('tool.html', resultat=''.join(fragments), title="Tools")
@app.route("/port")
def port():
    """Serve the port-scan page to logged-in users and the login page otherwise."""
    app.logger.info('Info level log')
    app.logger.warning('Warning level log')
    # Guard clause: without a session user, fall back to the login form.
    if 'username' not in session:
        return render_template('login.html', title="Welcome")
    return render_template('port.html')
def _run_port_scan(target, sid):
    """Scan *target*, then tell the requesting client the scan has finished."""
    bg_scan_port(target)
    socketio.emit('scan_response', {'data': 'Scan Complete'}, to=sid)


# Socket.IO handler for the port scan.
@socketio.on('scan_port')
def connect(data):
    """Handle a ``scan_port`` event: resolve the target and start the scan.

    Args:
        data: Target sent by the client (IP address, host name or URL).

    Emits:
        ``scan_response`` messages: either a "not found" notice, or a
        progress notice followed (from the background task) by the open
        ports and a final "Scan Complete".
    """
    target = check_url_ip(data)
    if target == '0.0.0.0':
        # check_url_ip returns this sentinel when resolution fails.
        emit('scan_response', {'data': 'Server '+format(data)+' not found'})
        return

    emit('scan_response', {'data': 'Scan on progress for '+format(data)+'('+target+')'})
    # Pass the callable and its arguments separately: calling the scan here
    # would run it inline and hand its None result to the task runner.
    # request.sid is captured now because the task runs outside this handler.
    socketio.start_background_task(_run_port_scan, target, request.sid)
def _run_network_scan(subnet, sid):
    """Scan *subnet*, then send the up/down summary row to the requester."""
    bg_scan_network(subnet)
    # bg_scan_network leaves the number of answering hosts in ``uphost``.
    # NOTE(review): 256 assumes the scanned range holds 256 addresses.
    downhost = 256 - uphost
    socketio.emit('net_response', {'ip':' ','mac':'Up:'+str(uphost),'vendor': 'Down:'+str(downhost),'dns':' '}, to=sid)


# Socket.IO handler for the network scan.
@socketio.on('net_port')
def connect(data):
    """Handle a ``net_port`` event: send the table header and start the scan.

    Args:
        data: Subnet sent by the client, forwarded to ``bg_scan_network``.

    Emits:
        ``net_response`` rows: the header immediately, then (from the
        background task) one row per host and a final Up/Down summary.
    """
    emit('net_response', {'ip':'IP','mac':'Mac address','vendor':'Vendor resolution','dns':'Local DNS'})
    # Pass the callable and its arguments separately: calling the scan here
    # would run it inline and hand its None result to the task runner.
    # request.sid is captured now because the task runs outside this handler.
    socketio.start_background_task(_run_network_scan, data, request.sid)
@app.route("/login", methods=['GET', 'POST'])
def login():
    """Authenticate the submitted credentials and open a session.

    GET requests simply display the login form. POST requests check the
    ``username`` / ``password`` form fields with ``authenticate``; on success
    the user name is stored in the session and the client is redirected to
    the index page, otherwise the form is shown again with an error message.
    """
    if request.method != 'POST':
        # Nothing was submitted yet: show the form without any alert.
        return render_template('login.html', title="Welcome")

    username = request.form['username']
    password = request.form['password']
    if authenticate(str(username), str(password)):
        session['username'] = username
        # Lazy %-formatting yields the same message without concatenation.
        logging.info('AUTH LOG - New auth from %s !', username)
        return redirect(url_for('index'))

    resultats = '<p id="alert" style="color:red">Invalid Username/Password ! </p>'
    return render_template('login.html', alertmessage=resultats)
@app.route('/logout')
def logout():
    """Drop the logged-in user from the session and return to the index page."""
    # pop() with a default so logging out without a session does not raise.
    session.pop('username', None)
    home_url = url_for('index')
    return redirect(home_url)
# Start the server when this module is executed directly.
if __name__ == '__main__':
    # NOTE(review): this listens on every interface and explicitly allows the
    # Werkzeug development server - confirm this is acceptable where deployed.
    server_options = {
        'host': '0.0.0.0',
        'port': '5005',
        'allow_unsafe_werkzeug': True,
    }
    socketio.run(app, **server_options)