Dl app.ori
parent
58c285e027
commit
73597bcd6a
187
app.py.ori
187
app.py.ori
|
@ -1,187 +0,0 @@
|
||||||
import re
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
import socket
|
|
||||||
import iperf3
|
|
||||||
import logging
|
|
||||||
import requests
|
|
||||||
import subprocess
|
|
||||||
import scapy.all as scapy
|
|
||||||
from simplepam import authenticate
|
|
||||||
from flask_fontawesome import FontAwesome
|
|
||||||
from flask_navigation import Navigation
|
|
||||||
from flask_socketio import SocketIO, emit
|
|
||||||
from flask import Flask, render_template, Response, request, redirect, url_for, session, escape
|
|
||||||
|
|
||||||
#Global config
|
|
||||||
__version__ = "1.0.0"
|
|
||||||
path = os.path.dirname(__file__)
|
|
||||||
logging.basicConfig(filename=path+'/log/Pyng.log', level=logging.DEBUG, format=f'%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s')
|
|
||||||
logging.info('PROGRAM START')
|
|
||||||
|
|
||||||
#Init
|
|
||||||
app = Flask(__name__)
|
|
||||||
app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT'
|
|
||||||
socketio = SocketIO(app,cors_allowed_origins="*",async_mode="threading")
|
|
||||||
fa = FontAwesome(app)
|
|
||||||
nav = Navigation(app)
|
|
||||||
|
|
||||||
|
|
||||||
#Variable
|
|
||||||
uphost=0
|
|
||||||
#Menu
|
|
||||||
nav.Bar('top', [
|
|
||||||
nav.Item('Network', 'index'),
|
|
||||||
nav.Item('Port', 'port'),
|
|
||||||
nav.Item('Tools','tools'),
|
|
||||||
])
|
|
||||||
|
|
||||||
##Check if URL or IP
|
|
||||||
def check_url_ip(target):
|
|
||||||
ip_pattern = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
|
|
||||||
if ip_pattern.match(target):
|
|
||||||
ip_target = target
|
|
||||||
else:
|
|
||||||
if target.startswith("http://"):
|
|
||||||
target = target[7:]
|
|
||||||
elif target.startswith("https://"):
|
|
||||||
target = target[8:]
|
|
||||||
try:
|
|
||||||
socket.gethostbyname(target)
|
|
||||||
ip_target = socket.gethostbyname(target)
|
|
||||||
except:
|
|
||||||
ip_target='0.0.0.0'
|
|
||||||
return ip_target
|
|
||||||
|
|
||||||
def iperf():
|
|
||||||
client = iperf3.Client()
|
|
||||||
client.duration = 2
|
|
||||||
client.server_hostname = 'ping.online.net'
|
|
||||||
client.port = 5200
|
|
||||||
try:
|
|
||||||
result = client.run()
|
|
||||||
send_speed = round(result.sent_Mbps,2)
|
|
||||||
received_speed = round(result.received_Mbps,2)
|
|
||||||
except:
|
|
||||||
send_speed = "Could not be tested"
|
|
||||||
received_speed = "Could not be tested"
|
|
||||||
return send_speed, received_speed
|
|
||||||
|
|
||||||
##Check Mac adress
|
|
||||||
def get_mac_details(mac_address):
|
|
||||||
##Sleep to avoid Free API Limitation
|
|
||||||
time.sleep(.65)
|
|
||||||
url = "https://api.macvendors.com/"
|
|
||||||
response = requests.get(url+mac_address)
|
|
||||||
#DEBUG But too fast :/
|
|
||||||
# print("DEBUG : MAC"+str(mac_address)+" CODE:"+str(response.status_code)+" RESULTAT : "+str(response.content.decode()))
|
|
||||||
if response.status_code != 200:
|
|
||||||
retour = " "
|
|
||||||
else:
|
|
||||||
retour = response.content.decode()
|
|
||||||
return retour
|
|
||||||
|
|
||||||
##Check DNS resolution
|
|
||||||
def dns_resolv(target):
|
|
||||||
try:
|
|
||||||
socket.gethostbyaddr(target)
|
|
||||||
dns=socket.gethostbyaddr(target)[0]
|
|
||||||
except:
|
|
||||||
dns=""
|
|
||||||
return dns
|
|
||||||
|
|
||||||
#Background port scan
|
|
||||||
def bg_scan_port(target):
|
|
||||||
for port in range(0, 65535):
|
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
result = sock.connect_ex((target, port))
|
|
||||||
if result == 0:
|
|
||||||
socketio.emit('scan_response',{'data': 'Port: ' + format(port)})
|
|
||||||
sock.close()
|
|
||||||
|
|
||||||
#Background network scan
|
|
||||||
def bg_scan_network(subnet):
|
|
||||||
global uphost
|
|
||||||
uphost=0
|
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
request = scapy.ARP()
|
|
||||||
request.pdst = subnet
|
|
||||||
broadcast = scapy.Ether()
|
|
||||||
broadcast.dst = 'ff:ff:ff:ff:ff:ff'
|
|
||||||
request_broadcast = broadcast / request
|
|
||||||
clients = scapy.srp(request_broadcast, timeout = 1)[0]
|
|
||||||
for element in clients:
|
|
||||||
uphost=uphost+1
|
|
||||||
macdet=get_mac_details(element[1].hwsrc)
|
|
||||||
dns=dns_resolv(element[1].psrc)
|
|
||||||
socketio.emit('net_response',{'ip': element[1].psrc,'mac': element[1].hwsrc,'vendor':macdet,'dns':dns})
|
|
||||||
sock.close()
|
|
||||||
|
|
||||||
#Main route to index.
|
|
||||||
@app.route("/")
|
|
||||||
def index():
|
|
||||||
app.logger.info('Info level log')
|
|
||||||
app.logger.warning('Warning level log')
|
|
||||||
if 'username' in session:
|
|
||||||
return render_template('index.html')
|
|
||||||
return render_template('login.html',title="Welcome")
|
|
||||||
|
|
||||||
@app.route("/tools", methods=['GET', 'POST'])
|
|
||||||
def tools():
|
|
||||||
resultat =''
|
|
||||||
send,received=iperf();
|
|
||||||
resultat = '<p>Send : '+str(send)+' Mbp/s</p><p>Received : '+str(received)+' Mbp/s</p>'
|
|
||||||
|
|
||||||
return render_template('tool.html',resultat=resultat,title="Tools")
|
|
||||||
|
|
||||||
@app.route("/port")
|
|
||||||
def port():
|
|
||||||
app.logger.info('Info level log')
|
|
||||||
app.logger.warning('Warning level log')
|
|
||||||
if 'username' in session:
|
|
||||||
return render_template('port.html')
|
|
||||||
return render_template('login.html',title="Welcome")
|
|
||||||
|
|
||||||
##Socket pour le scan de port
|
|
||||||
@socketio.on('scan_port')
|
|
||||||
def connect(data):
|
|
||||||
target=check_url_ip(data)
|
|
||||||
if target == '0.0.0.0':
|
|
||||||
emit('scan_response', {'data': 'Server '+format(data)+' not found'})
|
|
||||||
else:
|
|
||||||
emit('scan_response', {'data': 'Scan on progress for '+format(data)+'('+target+')'})
|
|
||||||
socketio.start_background_task(bg_scan_port(target))
|
|
||||||
emit('scan_response', {'data': 'Scan Complete'})
|
|
||||||
|
|
||||||
##Socket pour le scan réseau
|
|
||||||
@socketio.on('net_port')
|
|
||||||
def connect(data):
|
|
||||||
global uphost
|
|
||||||
emit('net_response', {'ip':'IP','mac':'Mac address','vendor':'Vendor resolution','dns':'Local DNS'})
|
|
||||||
socketio.start_background_task(bg_scan_network(data))
|
|
||||||
downhost=256-uphost
|
|
||||||
emit('net_response', {'ip':' ','mac':'Up:'+str(uphost),'vendor': 'Down:'+str(downhost),'dns':' '})
|
|
||||||
|
|
||||||
##Login
|
|
||||||
@app.route("/login", methods=['GET', 'POST'])
|
|
||||||
def login():
|
|
||||||
if request.method == 'POST':
|
|
||||||
username = request.form['username']
|
|
||||||
password = request.form['password']
|
|
||||||
if authenticate(str(username), str(password)):
|
|
||||||
session['username'] = request.form['username']
|
|
||||||
logging.info('AUTH LOG - New auth from '+username+' !')
|
|
||||||
return redirect(url_for('index'))
|
|
||||||
else:
|
|
||||||
resultats='<p id="alert" style="color:red">Invalid Username/Password ! </p>'
|
|
||||||
return render_template('login.html', alertmessage=resultats)
|
|
||||||
##Logout
|
|
||||||
@app.route('/logout')
|
|
||||||
def logout():
|
|
||||||
session.pop('username', None)
|
|
||||||
return redirect(url_for('index'))
|
|
||||||
|
|
||||||
##Start
|
|
||||||
if __name__ == '__main__':
|
|
||||||
socketio.run(app, host='0.0.0.0', port='5005', allow_unsafe_werkzeug=True)
|
|
Loading…
Reference in New Issue