crouching_tiger/ctiger/mac_purge.py
anoduck f2c940ae37 feat(Features): 🚧 Hidden Dragon Work
Many changes performed in development of Hidden Dragon

hdragon command is broken for development
2024-03-12 18:03:38 -04:00

145 lines
5.3 KiB
Python

# Copyright (c) 2024 Anoduck
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
from scapy.sendrecv import AsyncSniffer
from scapy.sendrecv import sendp
from scapy.layers.dot11 import Dot11
from scapy.layers.dot11 import RadioTap
from scapy.layers.dot11 import Dot11FCS
import threading
from random import choice
from time import sleep
import os
from .dataframe import CtigerDataFrame
from .netdev import NetDev
class Purge(object):
def __getitem__(self, pkt):
return pkt
def do_hop(self) -> None:
self.log.debug('Hop args: {0}'.format(
self.do_hop.__code__.co_varnames))
thread = threading.current_thread()
self.log.debug(f'Do Hop: name={thread.name}, daemon={thread.daemon}')
while True:
ichan = choice(self.chans)
self.log.debug('Hopping on: {0}'.format(ichan))
os.system(f'iw dev {self.mon_if} set channel {str(ichan)}')
self.log.debug('Channel set to {0}'.format(ichan))
sleep(14.7)
def channel_runner(self, log) -> None:
log.info('Channel Runner NG started.')
log.info('Preliminary channel list: {0}'.format(self.channels))
chanlist = self.channels.split(',')
chlist = list(set(chanlist))
chans = [int(chan) for chan in chlist]
self.chans = chans
log.info('Channel list: {0}'.format(chlist))
thread = threading.current_thread()
log.debug(
f'Channel Runner: name={thread.name}, daemon={thread.daemon}')
chop = threading.Thread(target=self.do_hop,
name='chop')
chop.start()
def send_pkt(self, bssid) -> None:
self.log.debug('Extracted bssid: {0}'.format(bssid))
durid = 65535
self.log.debug('Selected durid: {0}'.format(durid))
self.log.debug('Sending packet with: {0}'.format(self.macaddr))
new_pkt = RadioTap()/Dot11(proto=0, type=1, subtype=11,
addr1=bssid,
addr2=self.macaddr,
ID=durid)
self.log.debug(
'Sending RTS frame to {0} with type 1 and subtype 11'.format(bssid))
sendp(new_pkt, verbose=0)
def get_interface(self) -> tuple:
ndev = NetDev()
if_tup = ndev.start_monitor(interface=self.interface,
mon_type=self.mon_type,
log=self.log)
return if_tup
def extract_channel(self, layers):
retval = ''
counter = 0
try:
while True:
layer = layers[counter]
if hasattr(layer, "ID") and layer.ID == 3 and layer.len == 1:
retval = ord(layer.info)
break
else:
counter += 1
except IndexError:
pass
return retval
def cts_prn(self, pkt):
bssid = pkt[Dot11FCS].addr2
self.log.info('Intercepted CTS from {0}'.format(bssid))
dbm_signal = pkt.dBm_AntSignal
pkt_chan = self.extract_channel(pkt[Dot11])
self.log.debug('Extracted channel: {0}'.format(pkt_chan))
if bssid in self.devices:
self.log.info('Duplicate CTS from {0}'.format(bssid))
return
else:
self.devices.append(bssid)
self.scan_df.loc[bssid] = [
self.macaddr, dbm_signal, pkt_chan, 'N/A']
self.scan_df.to_csv(self.valid_file, mode='a', header=False)
self.log.info('Results written to {0}'.format(self.valid_file))
def probe_prn(self, pkt):
bssid = pkt[Dot11FCS].addr2
self.log.info('Intercepted probe-req from {0}'.format(bssid))
self.log.debug('Extracted bssid: {0}'.format(bssid))
self.log.info('Sending RTS frame to {0}'.format(bssid))
self.send_pkt(bssid)
def mac_revealer(self, interface, mon_type, valid_file, channels, log):
self.interface = interface
self.mon_type = mon_type
self.valid_file = valid_file
self.channels = channels
self.log = log
df = CtigerDataFrame()
log.info('mac revealer started')
log.info('setting up class attributes')
self.scan_df = df.get_df()
devices = df.load_df(valid_file, log)
if devices:
self.devices = devices
else:
log.info('No valid targets found')
self.devices = []
log.info('acquired Dataframe')
mon_if, macaddr = self.get_interface()
self.mon_if = mon_if
self.macaddr = macaddr
log.debug('mon_if: {0}'.format(mon_if))
log.debug('return type: {0}'.format(type(mon_if)))
log.info('interface {0} is up and running.'.format(self.mon_if))
probe_sniff = AsyncSniffer(
iface=mon_if, prn=self.probe_prn,
filter="type mgt subtype probe-req",
monitor=True)
probe_sniff.start()
cts_sniff = AsyncSniffer(filter='type ctl subtype cts',
iface=mon_if, prn=self.cts_prn,
monitor=True)
cts_sniff.start()
self.channel_runner(log)
log.info('Channel runner started.')
log.info('Probe sniffer started')
log.info('CTS sniffer started')