significant improvement in class formation

This commit is contained in:
anoduck 2023-10-20 18:27:48 -04:00
parent 598bd784a3
commit 6afad9fad4

217
ctiger.py
View file

@ -15,7 +15,6 @@ from scapy.sendrecv import sniff
from scapy.sendrecv import AsyncSniffer
from scapy.sendrecv import sendp
from scapy.sendrecv import sr1
from scapy.interfaces import get_working_if
from scapy.layers.dot11 import Dot11Beacon
from scapy.layers.dot11 import Dot11
from scapy.layers.dot11 import Dot11Elt
@ -55,7 +54,6 @@ sys.path.append(os.path.expanduser('~/.local/lib/python3.11/site-packages'))
# ------------------------------------------------------------
config_file = os.path.abspath("/etc/ctiger/config.ini")
pc = Counter()
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
# scapy.config.Conf.layers.filter([Dot11, Dot11Beacon, Dot11Elt,
# RadioTap, Dot11Deauth, Dot11FCS, EAPOL])
@ -223,69 +221,69 @@ def strainer(pkt) -> None:
log.info('Intercepted CTS from: ' + bssid)
dbm_signal = pkt.dBm_AntSignal
channel = extract_channel(res[Dot11])
scan_df = Purge.__dict__['scan_df']
scan_df = Purge.get_df
scan_df.loc[bssid] = ['N/A', dbm_signal, channel, 'N/A']
# -------------------------------------------------------------------
# ███╗ ██╗███████╗████████╗██████╗ ███████╗██╗ ██╗
# ████╗ ██║██╔════╝╚══██╔══╝██╔══██╗██╔════╝██║ ██║
# ██╔██╗ ██║█████╗ ██║ ██║ ██║█████╗ ██║ ██║
# ██║╚██╗██║██╔══╝ ██║ ██║ ██║██╔══╝ ╚██╗ ██╔╝
# ██║ ╚████║███████╗ ██║ ██████╔╝███████╗ ╚████╔╝
# ╚═╝ ╚═══╝╚══════╝ ╚═╝ ╚═════╝ ╚══════╝ ╚═══╝
# ----------------------------------------------------------------
class NetDev:
def __init__(self, name: str, macaddr,
mon_if: str, channels: str, mon_crtd: str) -> None:
self.name = name
self.macaddr = macaddr
self.mon_if = mon_if
self.channels = channels
self.mon_crtd = mon_crtd
# type: ignore
def macaddr(self) -> str:
def __init__(self) -> None:
self.log = Log.get_log
self.macaddr = fake.wifi_essid()
pge = Purge()
self.name = pge.get_name()
self.mon_crtd = str(self.name) + 'mon'
self.mon_type = pge.get_type()
self.channels = pge.get_channels()
# type: ignore
def get_mac(self) -> str:
return self.macaddr
# ---------------------------------------------------------------------------------
# ███████╗███████╗████████╗██╗ ██╗██████╗ ███╗ ███╗ ██████╗ ███╗ ██╗
# ██╔════╝██╔════╝╚══██╔══╝██║ ██║██╔══██╗ ████╗ ████║██╔═══██╗████╗ ██║
# ███████╗█████╗ ██║ ██║ ██║██████╔╝ ██╔████╔██║██║ ██║██╔██╗ ██║
# ╚════██║██╔══╝ ██║ ██║ ██║██╔═══╝ ██║╚██╔╝██║██║ ██║██║╚██╗██║
# ███████║███████╗ ██║ ╚██████╔╝██║ ██║ ╚═╝ ██║╚██████╔╝██║ ╚████║
# ╚══════╝╚══════╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝
# ---------------------------------------------------------------------------------
def create_if(self) -> bool:
try:
os.system(f'ip link set {self.name} up')
os.system(f'iw dev {self.name} interface add {self.mon_crtd} type monitor')
log.debug('Created ', self.mon_crtd)
self.log.debug('Created ', self.mon_crtd)
os.system(f'ip link set {self.mon_crtd} down')
os.system(f'ip link set {self.mon_crtd} address {self.macaddr}')
log.debug('Set device address to ', self.macaddr)
self.log.debug('Set device address to ', self.macaddr)
os.system(f'ip link set {self.mon_crtd} up')
log.debug('Set device up')
self.log.debug('Set device up')
os.system('iw set reg US')
log.debug('Set device registry to US')
log.info('Device is fully configured and up')
self.log.debug('Set device registry to US')
self.log.info('Device is fully configured and up')
return True
except os.error as e:
log.debug('Failed to create ', self.mon_crtd, e)
self.log.debug('Failed to create ', self.mon_crtd, e)
print('Creation of new monitor self.name failed:', e)
sys.exit(1)
def switch_if(self) -> bool:
try:
os.system(f'ip link set {self.name} down')
log.debug('Set device down')
self.log.debug('Set device down')
os.system(f'ip link set {self.name} address {self.macaddr}')
log.debug('Set device address to ', self.macaddr)
self.log.debug('Set device address to ', self.macaddr)
os.system(f'iw dev {self.name} set type monitor')
log.debug(self.name, ' switched to monitor')
self.log.debug(self.name, ' switched to monitor')
os.system(f'ip link set {self.name} up')
# (below) setting registry is known to cause issues.
os.system('iw set reg US')
log.debug('Set device registry to US')
self.log.debug('Set device registry to US')
scapyconfig.iface = self.name
log.info('Set scapy config self.name to: ', self.name)
log.info('Device is fully configured and up')
self.log.info('Set scapy config self.name to: ', self.name)
self.log.info('Device is fully configured and up')
return True
except os.error as e:
log.debug('Failed to switch ', self.name, ' type', e)
self.log.debug('Failed to switch ', self.name, ' type', e)
print('Failed to change ', self.name, ' mode', e)
sys.exit(1)
@ -301,58 +299,46 @@ class NetDev:
Returns:
str: The name of the created or switched monitor interface.
"""
self.name = Purge.__dict__['name']
self.mon_type = Purge.__dict__['mon_type']
log.debug('mac_address type: ', str(type(self.macaddr)))
log.debug('mac_address: ', str(self.macaddr))
log.info('Starting monitor interface')
self.mon_crtd = self.name + 'mon'
self.log.debug('mac_address type: ', str(type(self.macaddr)))
self.log.debug('mac_address: ', str(self.macaddr))
self.log.info('Starting monitor interface')
if self.mon_type == 'create':
NetDev.create_if(self.name)
if_mon = self.mon_crtd
return if_mon
self.create_if()
mon_if = self.mon_crtd
return mon_if
elif self.mon_type == 'switch':
if self.switch_if():
return self.name
else:
log.debug('Something got fucked with the interface')
sys.exit(1)
self.switch_if()
mon_if = self.name
return mon_if
else:
Exception('Invalid monitor type')
log.debug('Invalid monitor type')
self.log.debug('Invalid monitor type')
sys.exit(1)
# ---------------------------------------------------------------------------
# ██████╗██╗ ██╗ ██████╗ ██╗ ██╗███╗ ██╗███╗ ██╗███████╗██████╗
# ██╔════╝██║ ██║ ██╔══██╗██║ ██║████╗ ██║████╗ ██║██╔════╝██╔══██╗
# ██║ ███████║ ██████╔╝██║ ██║██╔██╗ ██║██╔██╗ ██║█████╗ ██████╔╝
# ██║ ██╔══██║ ██╔══██╗██║ ██║██║╚██╗██║██║╚██╗██║██╔══╝ ██╔══██╗
# ╚██████╗██║ ██║ ██║ ██║╚██████╔╝██║ ╚████║██║ ╚████║███████╗██║ ██║
# ╚═════╝╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝╚═╝ ╚═══╝╚══════╝╚═╝ ╚═╝
# ----------------------------------------------------------------------------
def channel_runner(self) -> None:
log.info('Channel Runner NG started.')
log.info('Preliminary channel list: ', str(self.channels))
def channel_runner(self, mon_if) -> None:
self.log.info('Channel Runner NG started.')
self.log.info('Preliminary channel list: ', str(self.channels))
chanlist = self.channels.split(',')
chlist = list(set(chanlist))
log.info('Channel list: ', str(chlist))
self.log.info('Channel list: ', str(chlist))
chans = [int(chan) for chan in chlist]
thread = threading.current_thread()
print(f'name={thread.name}, daemon={thread.daemon}')
while True:
ichan = choice(chans)
os.system(f'iw dev {self.mon_if} set channel {str(ichan)}')
# log.debug('Channel set to ', str(ichan))
os.system(f'iw dev {mon_if} set channel {str(ichan)}')
# self.log.debug('Channel set to ', str(ichan))
sleep(14.7)
def signal_handler(signal, frame) -> None:
log = Log.get_log
print('You pressed Ctrl+C!')
log.info('Shutting down')
df2w = Purge.__dict__['scan_df']
df2w = Purge.get_df
df2w.to_csv('ct_purge.csv')
log.info('Saved results to: ',
Purge.__dict__['valid_file'])
Purge.get_file)
log.info('Going Down!!')
sys.exit(0)
@ -370,40 +356,71 @@ class Purge:
self.mon_type = kwargs.get('mon_type')
self.valid_file = kwargs.get('valid_file')
self.channels = kwargs.get('channels')
self.log = log
self.log = Log.get_log
self.scan_df = DataFrame.get_df
def start_purge(self) -> None:
asyncio.run(self.mac_purge())
def get_file(self):
return self.valid_file
def get_df(self):
return self.scan_df
def get_channels(self):
return self.channels
def get_type(self):
return self.mon_type
def get_name(self):
return self.interface
# -----------------------------------------------------------------------------
# ███╗ ███╗ █████╗ ██████╗ ██████╗ ██╗ ██╗██████╗ ██████╗ ███████╗
# ████╗ ████║██╔══██╗██╔════╝ ██╔══██╗██║ ██║██╔══██╗██╔════╝ ██╔════╝
# ██╔████╔██║███████║██║ ██████╔╝██║ ██║██████╔╝██║ ███╗█████╗
# ██║╚██╔╝██║██╔══██║██║ ██╔═══╝ ██║ ██║██╔══██╗██║ ██║██╔══╝
# ██║ ╚═╝ ██║██║ ██║╚██████╗ ██║ ╚██████╔╝██║ ██║╚██████╔╝███████╗
# ╚═╝ ╚═╝╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝ ╚═════╝ ╚══════╝
# ----------------------------------------------------------------------------
async def mac_purge(self) -> None:
signal.signal(signal.SIGINT, signal_handler)
print('Enter Ctrl+C TWICE to fully stop the script.')
device = NetDev
self.mon_if = device.start_monitor
log.info('interface ', self.mon_if, ' is up and running.')
self.scan_df = DataFrame()
self.valid_file = self.valid_file
log.info('We will be writing captured macs to ', self.valid_file)
chop = asyncio.to_thread(NetDev.channel_runner) # type: ignore
dev = NetDev()
mon_if = dev.start_monitor()
self.log.info('interface ', mon_if, ' is up and running.')
# vfile = self.get_file()
# self.log.info('We will be writing captured macs to ', str(self.valid_file))
chop = asyncio.to_thread(dev.channel_runner(mon_if)) # type: ignore
chopper = asyncio.create_task(chop)
log.info('Channel runner started.')
self.log.info('Channel runner started.')
await asyncio.sleep(0)
while True:
log.info('starting sniffer')
asniff = AsyncSniffer(iface=self.mon_if,
self.log.info('starting sniffer')
asniff = AsyncSniffer(iface=mon_if,
prn=strainer,
store=False, monitor=True)
asniff.start()
log.info('asniffer started')
self.log.info('asniffer started')
forever_wait = threading.Event()
forever_wait.wait()
class Log:
def __init__(self) -> None:
self.log = logging.getLogger(__name__)
self.file = os.path.abspath('/var/log/ctiger.log')
def get_log(self):
return self.log
def set_log(self):
logging.basicConfig(filename=self.file, level=args.log_level,
format='%(asctime)s %(levelname)s %(message)s')
# create logger
logging.getLogger("scapy.runtime").setLevel(logging.DEBUG)
handler = logging.FileHandler(self.file, mode='a', encoding='utf-8')
logging.getLogger("scapy.runtime").addHandler(handler)
self.log.setLevel(logging.DEBUG)
self.log.info('Started crouching tiger')
self.log.info('Started logger...')
return self.log
# -----------------------------------------------------------------------------
# ███████╗███╗ ██╗██╗███████╗███████╗ ███████╗████████╗ ██████╗ ██████╗
# ██╔════╝████╗ ██║██║██╔════╝██╔════╝ ██╔════╝╚══██╔══╝██╔═══██╗██╔══██╗
@ -615,12 +632,21 @@ class DataFrame:
scan_df (pandas DataFrame): An empty DataFrame object with the specified columns and index.
"""
def __init__(self):
def __init__(self, scan_df):
self.scan_df = scan_df
def get_df(self):
"""
Initializes a new empty DataFrame for storing scan data.
Returns:
pandas.DataFrame: The newly created DataFrame with columns 'BSSID', 'SSID', 'dBm_Signal', 'Channel', and 'Crypto'.
"""
self.scan_df = pd.DataFrame(columns=['BSSID', 'SSID',
'dBm_Signal', 'Channel',
'Crypto'])
self.scan_df.set_index("BSSID", inplace=True)
return scan_df
return self.scan_df
# -------------------------------------------------------------------
@ -659,16 +685,11 @@ def process_args(args: argparse.Namespace) -> None:
log.info('Beginning Mac Purge')
#mon_dev, mon_type, valid_file, channels
#mon_dev, mon_type, valid_file, channels
# local_args = {'interface': args.interface,
# 'mon_type': args.mon_type,
# 'valid_file': args.valid_file,
# 'channels': args.channels}
data = vars(args)
asyncio.run(Purge.mac_purge(**data))
# asyncio.run(Purge.mac_purge(interface=args.interface,
# mon_type=args.mon_type,
# valid_file=args.valid_file,
# channels=args.channels))
pge = Purge(interface=args.name,
mon_type=args.mon_type,
valid_file=args.valid_file,
channels=args.channels)
pge.start_purge()
case "scn":
log.info('Start scanning...')
if not args.save_results:
@ -765,7 +786,7 @@ else:
epilog='Processing will take several seconds, please be patient.\n',
conflict_handler='resolve')
# options parser
ap.add_argument('-i', '--interface', dest='interface',
ap.add_argument('-i', '--interface', dest='name',
default=config['interface'],
help='Interface(s) to scan on')
ap.add_argument('-f', '--file', dest='config_file', default='config_path',