186 lines
5.7 KiB
Python
186 lines
5.7 KiB
Python
import re
|
|
import os
|
|
import sys
|
|
import time
|
|
import socket
|
|
import iperf3
|
|
import logging
|
|
import requests
|
|
import subprocess
|
|
import scapy.all as scapy
|
|
from simplepam import authenticate
|
|
from flask_fontawesome import FontAwesome
|
|
from flask_navigation import Navigation
|
|
from flask_socketio import SocketIO, emit
|
|
from flask import Flask, render_template, Response, request, redirect, url_for, session, escape
|
|
|
|
#Global config
|
|
__version__ = "1.0.0"
|
|
path = os.path.dirname(__file__)
|
|
logging.basicConfig(filename=path+'/log/Pyng.log', level=logging.DEBUG, format=f'%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s')
|
|
logging.info('PROGRAM START')
|
|
|
|
#Init
|
|
app = Flask(__name__)
|
|
app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT'
|
|
socketio = SocketIO(app,cors_allowed_origins="*",async_mode="threading")
|
|
fa = FontAwesome(app)
|
|
nav = Navigation(app)
|
|
|
|
|
|
#Variable
|
|
uphost=0
|
|
#Menu
|
|
nav.Bar('top', [
|
|
nav.Item('Network', 'index'),
|
|
nav.Item('Port', 'port'),
|
|
nav.Item('Tools','tools'),
|
|
])
|
|
|
|
##Check if URL or IP
|
|
def check_url_ip(target):
|
|
ip_pattern = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
|
|
if ip_pattern.match(target):
|
|
ip_target = target
|
|
else:
|
|
if target.startswith("http://"):
|
|
target = target[7:]
|
|
elif target.startswith("https://"):
|
|
target = target[8:]
|
|
try:
|
|
socket.gethostbyname(target)
|
|
ip_target = socket.gethostbyname(target)
|
|
except:
|
|
ip_target='0.0.0.0'
|
|
return ip_target
|
|
|
|
def iperf():
|
|
client = iperf3.Client()
|
|
client.duration = 2
|
|
client.server_hostname = 'ping.online.net'
|
|
client.port = 5200
|
|
try:
|
|
result = client.run()
|
|
send_speed = round(result.sent_Mbps,2)
|
|
received_speed = round(result.received_Mbps,2)
|
|
except:
|
|
send_speed = "Could not be tested"
|
|
received_speed = "Could not be tested"
|
|
return send_speed, received_speed
|
|
|
|
##Check Mac adress
|
|
def get_mac_details(mac_address):
|
|
##Sleep to avoid Free API Limitation
|
|
time.sleep(.65)
|
|
url = "https://api.macvendors.com/"
|
|
response = requests.get(url+mac_address)
|
|
if response.status_code != 200:
|
|
retour = " "
|
|
else:
|
|
retour = response.content.decode()
|
|
return retour
|
|
|
|
##Check DNS resolution
|
|
def dns_resolv(target):
|
|
try:
|
|
socket.gethostbyaddr(target)
|
|
dns=socket.gethostbyaddr(target)[0]
|
|
except:
|
|
dns=""
|
|
return dns
|
|
|
|
#Background port scan
|
|
def bg_scan_port(target):
|
|
for port in range(0, 65535):
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
result = sock.connect_ex((target, port))
|
|
if result == 0:
|
|
socketio.emit('scan_response',{'data': 'Port: ' + format(port)})
|
|
sock.close()
|
|
|
|
#Background network scan
|
|
def bg_scan_network(subnet):
|
|
global uphost
|
|
uphost=0
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
request = scapy.ARP()
|
|
request.pdst = subnet
|
|
broadcast = scapy.Ether()
|
|
broadcast.dst = 'ff:ff:ff:ff:ff:ff'
|
|
request_broadcast = broadcast / request
|
|
clients = scapy.srp(request_broadcast, timeout = 1)[0]
|
|
for element in clients:
|
|
uphost=uphost+1
|
|
macdet=get_mac_details(element[1].hwsrc)
|
|
dns=dns_resolv(element[1].psrc)
|
|
socketio.emit('net_response',{'ip': element[1].psrc,'mac': element[1].hwsrc,'vendor':macdet,'dns':dns})
|
|
sock.close()
|
|
|
|
#Main route to index.
|
|
@app.route("/")
|
|
def index():
|
|
app.logger.info('Info level log')
|
|
app.logger.warning('Warning level log')
|
|
if 'username' in session:
|
|
return render_template('index.html')
|
|
return render_template('login.html',title="Welcome")
|
|
|
|
@app.route("/tools", methods=['GET', 'POST'])
|
|
def tools():
|
|
resultat =''
|
|
send,received=iperf();
|
|
resultat = '<p>Send : '+str(send)+' Mbp/s</p><p>Received : '+str(received)+' Mbp/s</p>'
|
|
|
|
return render_template('tool.html',resultat=resultat,title="Tools")
|
|
|
|
@app.route("/port")
|
|
def port():
|
|
app.logger.info('Info level log')
|
|
app.logger.warning('Warning level log')
|
|
if 'username' in session:
|
|
return render_template('port.html')
|
|
return render_template('login.html',title="Welcome")
|
|
|
|
##Socket pour le scan de port
|
|
@socketio.on('scan_port')
|
|
def connect(data):
|
|
target=check_url_ip(data)
|
|
if target == '0.0.0.0':
|
|
emit('scan_response', {'data': 'Server '+format(data)+' not found'})
|
|
else:
|
|
emit('scan_response', {'data': 'Scan on progress for '+format(data)+'('+target+')'})
|
|
socketio.start_background_task(bg_scan_port(target))
|
|
emit('scan_response', {'data': 'Scan Complete'})
|
|
|
|
##Socket pour le scan réseau
|
|
@socketio.on('net_port')
|
|
def connect(data):
|
|
global uphost
|
|
emit('net_response', {'ip':'IP','mac':'Mac address','vendor':'Vendor resolution','dns':'Local DNS'})
|
|
socketio.start_background_task(bg_scan_network(data))
|
|
downhost=256-uphost
|
|
emit('net_response', {'ip':' ','mac':'Up:'+str(uphost),'vendor': 'Down:'+str(downhost),'dns':' '})
|
|
|
|
##Login
|
|
@app.route("/login", methods=['GET', 'POST'])
|
|
def login():
|
|
if request.method == 'POST':
|
|
username = request.form['username']
|
|
password = request.form['password']
|
|
if authenticate(str(username), str(password)):
|
|
session['username'] = request.form['username']
|
|
logging.info('AUTH LOG - New auth from '+username+' !')
|
|
return redirect(url_for('index'))
|
|
else:
|
|
resultats='<p id="alert" style="color:red">Invalid Username/Password ! </p>'
|
|
return render_template('login.html', alertmessage=resultats)
|
|
##Logout
|
|
@app.route('/logout')
|
|
def logout():
|
|
session.pop('username', None)
|
|
return redirect(url_for('index'))
|
|
|
|
##Start
|
|
if __name__ == '__main__':
|
|
socketio.run(app, host='0.0.0.0', port='5005', allow_unsafe_werkzeug=True)
|