Working on classes

This commit is contained in:
anoduck 2023-10-12 04:57:15 -04:00
parent 61f48ed359
commit 6f61609f80
4 changed files with 652 additions and 125 deletions

213
ctiger.py
View file

@ -11,9 +11,6 @@ import os
import sys
import fcntl
import argparse
from scapy.all import *
from scapy.packet import Packet
from scapy.plist import PacketList
from scapy.sendrecv import sniff
from scapy.sendrecv import sendp
from scapy.layers.dot11 import Dot11Beacon
@ -23,21 +20,19 @@ from scapy.layers.dot11 import RadioTap
from scapy.layers.dot11 import Dot11Deauth
from scapy.layers.dot11 import Dot11FCS
from scapy.layers.eap import EAPOL
import scapy_ex
from art.art import tprint
import multiprocessing as mp
import asyncio
import threading
import threading
from random import choice
from alive_progress import alive_it
from configobj import ConfigObj, validate
from collections import Counter
import pandas as pd
import socket
import signal
import logging
from rich.logging import RichHandler
from time import sleep
from daemonize import Daemonize
sys.path.append(os.path.expanduser('~/.local/lib/python3.11/site-packages'))
@ -50,7 +45,8 @@ config_file = os.path.abspath("/etc/ctiger/config.ini")
pc = Counter()
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
scapy.config.Conf.layers.filter([Dot11, Dot11Beacon, Dot11Elt, RadioTap, Dot11Deauth, Dot11FCS, EAPOL])
# scapy.config.Conf.layers.filter([Dot11, Dot11Beacon, Dot11Elt,
# RadioTap, Dot11Deauth, Dot11FCS, EAPOL])
# ------------------------------------------------------------
# ██████╗███████╗ ██████╗ ███████╗██████╗ ███████╗ ██████╗
@ -277,6 +273,20 @@ def stop_monitor(if_mon):
return False
# ██████╗ ██╗ ██╗██████╗ ██████╗ ███████╗
# ██╔══██╗██║ ██║██╔══██╗██╔════╝ ██╔════╝
# ██████╔╝██║ ██║██████╔╝██║ ███╗█████╗
# ██╔═══╝ ██║ ██║██╔══██╗██║ ██║██╔══╝
# ██║ ╚██████╔╝██║ ██║╚██████╔╝███████╗
# ╚═╝ ╚═════╝ ╚═╝ ╚═╝ ╚═════╝ ╚══════╝
# -------------------------------------------
class Purge:
def __init__(self, mon_dev, mon_type, mac_targ, log):
self.mon_dev = mon_dev
self.mon_if = start_monitor(self.mon_dev, mon_type, mac_targ)
self.scan_df = get_df()
self.log = log
# ------------------------------------------------------------------
# ███████╗████████╗██████╗ █████╗ ██╗███╗ ██╗███████╗██████╗
# ██╔════╝╚══██╔══╝██╔══██╗██╔══██╗██║████╗ ██║██╔════╝██╔══██╗
@ -285,55 +295,37 @@ def stop_monitor(if_mon):
# ███████║ ██║ ██║ ██║██║ ██║██║██║ ╚████║███████╗██║ ██║
# ╚══════╝ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝╚═╝ ╚═══╝╚══════╝╚═╝ ╚═╝
# ----------------------------------------------------------------
def strainer(pkt):
if pkt[Dot11].type == 0 and pkt[Dot11].subtype == 4:
bssid = pkt[Dot11FCS].addr2
log.info('BSSID for sieve: ' + str(bssid))
new_pkt = RadioTap()/Dot11(proto=0, type=1, subtype=11,
addr1=bssid,
addr2=macaddr,
ID=65535)
log.debug('Sending CTS frame to ' + str(bssid) + ' with type 11')
sendp(new_pkt, iface=mon_if, verbose=0, count=2)
if pkt[Dot11].type == 1 and pkt[Dot11].subtype == 12:
try:
def strainer(pkt, log):
if pkt[Dot11].type == 0 and pkt[Dot11].subtype == 4:
bssid = pkt[Dot11FCS].addr2
except:
bssid = pkt[Dot11].addr2
dbm_signal = pkt.dBm_AntSignal
if pkt.haslayer(Dot11Beacon):
stats = pkt[Dot11Beacon].network_stats()
channel = stats.get('channel')
else:
channel = extract_channel(pkt[Dot11Elt])
scan_df.loc[bssid] = ['N/A', dbm_signal, channel, 'N/A']
# -----------------------------------------------------------------
# ███████╗██╗ ██╗ ██████╗ ██████╗ ████████╗███████╗██████╗
# ██╔════╝██║ ██║██╔═══██╗██╔═══██╗╚══██╔══╝██╔════╝██╔══██╗
# ███████╗███████║██║ ██║██║ ██║ ██║ █████╗ ██████╔╝
# ╚════██║██╔══██║██║ ██║██║ ██║ ██║ ██╔══╝ ██╔══██╗
# ███████║██║ ██║╚██████╔╝╚██████╔╝ ██║ ███████╗██║ ██║
# ╚══════╝╚═╝ ╚═╝ ╚═════╝ ╚═════╝ ╚═╝ ╚══════╝╚═╝ ╚═╝
# -----------------------------------------------------------------
def shooter(mon_if, bssid):
log.info('Shooter Running')
new_pkt = RadioTap()/Dot11(type=1, subtype=12,
addr1="ff:ff:ff:ff:ff:ff",
addr2=bssid, addr3=bssid)
log.debug('Sending CTS frame to ' + str(bssid) + ' with type 11')
sendp(new_pkt, iface=mon_if, verbose=0, inter=0.1, count=2, realtime=True)
def df_writer(scan_df, valid_file):
while True:
if scan_df.empty:
pass
else:
scan_df.to_csv(valid_file)
print('results written to file ' + valid_file)
log.info('BSSID for sieve: ' + str(bssid))
new_pkt = RadioTap()/Dot11(proto=0, type=1, subtype=11,
addr1=bssid,
addr2=macaddr,
ID=65535)
log.debug('Sending CTS frame to ' + str(bssid) + ' with type 11')
sendp(new_pkt, iface=mon_if, verbose=0, count=1)
if pkt[Dot11].type == 1 and pkt[Dot11].subtype == 12:
try:
bssid = pkt[Dot11FCS].addr2
except:
bssid = pkt[Dot11].addr2
dbm_signal = pkt.dBm_AntSignal
# if pkt.haslayer(Dot11Beacon):
# stats = pkt[Dot11Beacon].network_stats()
# channel = stats.get('channel')
# else:
# channel = extract_channel(pkt[Dot11])
channel = pkt[Dot11].channel() or pkt[RadioTap].Channel
scan_df.loc[bssid] = ['N/A', dbm_signal, channel, 'N/A']
def df_writer(scan_df, valid_file):
while True:
if scan_df.empty:
pass
else:
scan_df.to_csv(valid_file)
print('results written to file ' + valid_file)
# ---------------------------------------------------------------------------
# ██████╗██╗ ██╗ ██████╗ ██╗ ██╗███╗ ██╗███╗ ██╗███████╗██████╗
@ -343,53 +335,26 @@ def df_writer(scan_df, valid_file):
# ╚██████╗██║ ██║ ██║ ██║╚██████╔╝██║ ╚████║██║ ╚████║███████╗██║ ██║
# ╚═════╝╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝╚═╝ ╚═══╝╚══════╝╚═╝ ╚═╝
# ----------------------------------------------------------------------------
def channel_runner(mon_if, channels):
log.info('Channel Runner NG started.')
log.info('Preliminary channel list: ' + str(channels))
log.debug('Preliminary list type is: ' + str(type(channels)))
chanlist = channels.split(',')
chlist = list(set(chanlist))
log.info('Channel list: ' + str(chlist))
chans = [int(chan) for chan in chlist]
thread = threading.current_thread()
print(f'name={thread.name}, daemon={thread.daemon}')
while True:
ichan = choice(chans)
os.system('iw dev ' + mon_if + ' set channel ' + str(ichan))
# log.debug('Channel set to ' + str(ichan))
sleep(14.7)
def signal_handler(signal, frame):
print('You pressed Ctrl+C!')
log.info('Shutting down')
sys.exit(0)
def sieve(pkt):
try:
bssid = pkt[Dot11FCS].addr2
except:
bssid = pkt[Dot11].addr2
if bssid is not None:
log.info('BSSID for sieve: ' + str(bssid))
new_pkt = RadioTap()/Dot11(proto=0, type=1, subtype=11,
addr1=bssid,
addr2=macaddr,
ID=65535)/Dot11FCS()
log.debug('Sending CTS frame to ' + str(bssid) + ' with type 11')
sendp(new_pkt, iface=mon_if, verbose=0, count=1)
# return bssid
def thumper(pkt):
try:
bssid = pkt[Dot11FCS].addr2
except:
bssid = pkt[Dot11].addr2
if bssid is not None:
return bssid
def channel_runner(mon_if, channels, log):
log.info('Channel Runner NG started.')
log.info('Preliminary channel list: ' + str(channels))
log.debug('Preliminary list type is: ' + str(type(channels)))
chanlist = channels.split(',')
chlist = list(set(chanlist))
log.info('Channel list: ' + str(chlist))
chans = [int(chan) for chan in chlist]
thread = threading.current_thread()
print(f'name={thread.name}, daemon={thread.daemon}')
while True:
ichan = choice(chans)
os.system('iw dev ' + mon_if + ' set channel ' + str(ichan))
# log.debug('Channel set to ' + str(ichan))
sleep(14.7)
def signal_handler(signal, frame, log):
print('You pressed Ctrl+C!')
log.info('Shutting down')
sys.exit(0)
# -----------------------------------------------------------------------------
# ███╗ ███╗ █████╗ ██████╗ ██████╗ ██╗ ██╗██████╗ ██████╗ ███████╗
@ -399,26 +364,24 @@ def thumper(pkt):
# ██║ ╚═╝ ██║██║ ██║╚██████╗ ██║ ╚██████╔╝██║ ██║╚██████╔╝███████╗
# ╚═╝ ╚═╝╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝ ╚═════╝ ╚══════╝
# ----------------------------------------------------------------------------
async def mac_purge(mon_dev, mon_type, valid_file, channels, mac_targ):
signal.signal(signal.SIGINT, signal_handler)
print('Enter Ctrl+C TWICE to fully stop the script.')
global mon_if
mon_if = start_monitor(mon_dev, mon_type, mac_targ)
log.info(f'You now have {mon_if}.')
chop = asyncio.to_thread(channel_runner, mon_dev, channels)
chopper = asyncio.create_task(chop)
log.info('Channel runner started.')
await asyncio.sleep(0)
while True:
log.info('starting sniffer')
asniff = AsyncSniffer(iface=mon_if, prn=strainer,
store=False, monitor=True)
asniff.start()
log.info('asniffer started')
nloop = asyncio.get_running_loop()
nloop.add_signal_handler(0, df_writer, scan_df, valid_file)
forever_wait = threading.Event()
forever_wait.wait()
async def mac_purge(self, mon_dev, valid_file, channels, log):
signal.signal(signal.SIGINT, Purge.signal_handler)
print('Enter Ctrl+C TWICE to fully stop the script.')
chop = asyncio.to_thread(Purge.channel_runner, mon_dev, channels)
chopper = asyncio.create_task(chop)
log.info('Channel runner started.')
await asyncio.sleep(0)
while True:
log.info('starting sniffer')
asniff = asyncio.AsyncSniffer(iface=self.mon_if, prn=self.strainer,
store=False, monitor=True)
asniff.start()
log.info('asniffer started')
nloop = asyncio.get_running_loop()
nloop.add_signal_handler(0, self.df_writer,
self.scan_df, valid_file)
forever_wait = threading.Event()
forever_wait.wait()
# -----------------------------------------------------------------------------
@ -695,9 +658,9 @@ def process_args(args: argparse.Namespace, config):
case "mac":
log.info('Beginning Mac Purge')
mac_targ = 'unique'
asyncio.run(mac_purge(args.interface, args.if_type,
args.valid_file, args.channels,
mac_targ))
asyncio.run(Purge.mac_purge(args.interface, args.if_type,
args.valid_file, args.channels,
mac_targ))
case "scn":
log.info('Start scanning...')
if not args.save_results:
@ -836,7 +799,7 @@ else:
mac_parse.add_argument('-c', '--chans', dest='channels',
default=config['MAC_PURGE']['channel_list'],
help='A single or comma seperated list of channels.')
mac_parse.set_defaults(fun=mac_purge)
mac_parse.set_defaults(fun=Purge.mac_purge)
# scn Subcommands
scn_parse = subparse.add_parser('scn', help='Scan for target')

View file

@ -1698,6 +1698,203 @@
"# Close the client socket\n",
"client_socket.close()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Scapy Examples from popular programs (Circa 2018)\n",
"\n",
"Below are examples of popular scapy functions."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 1. Aggr-Inject"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def __init__(self, recv_mac, trans_mac, dst_mac):\n",
" self.rt = RadioTap(len=18, present='Flags+Rate+Channel+dBm_AntSignal+Antenna', notdecoded='\\x00\\x6c' + get_frequency(CHANNEL) + '\\xc0\\x00\\xc0\\x01\\x00\\x00')\n",
" self.dot11hdr = Dot11(type=\"Data\", subtype=DOT11_SUBTYPE_DATA, addr1=recv_mac, addr2=trans_mac, addr3=dst_mac, SC=0x3060, FCfield=0x01)\n",
" self.data = self.rt / self.dot11hdr\n",
" self.recv_mac = recv_mac\n",
" self.trans_mac = trans_mac\n",
" self.dst_mac = dst_mac\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 2. Aggr-Inject"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
" def __init__(self, recv_mac, src_mac, dst_mac, ds=0x01):\n",
" self.rt = RadioTap(len=18, present='Flags+Rate+Channel+dBm_AntSignal+Antenna', notdecoded='\\x00\\x6c' + get_frequency(CHANNEL) + '\\xc0\\x00\\xc0\\x01\\x00\\x00')\n",
" self.dot11hdr = Dot11(type=\"Data\", subtype=DOT11_SUBTYPE_QOS_DATA, addr1=recv_mac, addr2=src_mac, addr3=dst_mac, SC=0x3060, FCfield=ds) / Raw(\"\\x80\\x00\")\n",
" self.data = self.rt / self.dot11hdr\n",
" self.num_subframes = 0\n",
" self.recv_mac = recv_mac\n",
" self.src_mac = src_mac\n",
" self.dst_mac = dst_mac"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 3. Aggr-Inject"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
" def __init__(self, recv_mac, src_mac, dst_mac, ds=0x01):\n",
" self.rt = RadioTap(len=18, present='Flags+Rate+Channel+dBm_AntSignal+Antenna', notdecoded='\\x00\\x6c' + get_frequency(CHANNEL) + '\\xc0\\x00\\xc0\\x01\\x00\\x00')\n",
" self.dot11hdr = Dot11(type=\"Data\", subtype=DOT11_SUBTYPE_QOS_DATA, addr1=recv_mac, addr2=src_mac, addr3=dst_mac, SC=0x3060, FCfield=ds) / Raw(\"\\x00\\x00\")\n",
" self.data = self.rt\n",
" self.num_subframes = 0\n",
" self.recv_mac = recv_mac\n",
" self.src_mac = src_mac\n",
" self.dst_mac = dst_mac\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 4. Pyrit"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# May result in 'anonymous' AP\n",
" self._add_ap(ap_mac, dot11_pckt)\n",
" ap = self.air[ap_mac]\n",
"\n",
" self._add_station(ap, sta_mac)\n",
" sta = ap[sta_mac]\n",
"\n",
" if EAPOL_WPAKey in dot11_pckt:\n",
" wpakey_pckt = dot11_pckt[EAPOL_WPAKey]\n",
" elif EAPOL_RSNKey in dot11_pckt:\n",
" wpakey_pckt = dot11_pckt[EAPOL_RSNKey]\n",
" elif dot11_pckt.isFlagSet('type', 'Data') \\\n",
" and dot11_pckt.haslayer(scapy.layers.dot11.Dot11WEP):\n",
" # An encrypted data packet - maybe useful for CCMP-attack\n",
"\n",
" dot11_wep = str(dot11_pckt[scapy.layers.dot11.Dot11WEP])\n",
" # Ignore packets which has less than len(header + data + signature)\n",
" if len(dot11_wep) < 8 + 6 + 8:\n",
" return\n",
"\n",
" # Ignore packets with high CCMP-counter. A high CCMP-counter\n",
" # means that we missed a lot of packets since the last\n",
" # authentication which also means a whole new authentication\n",
" # might already have happened.\n",
" ccmp_counter = (dot11_wep[0:2] + dot11_wep[4:8])[::-1]\n",
" if int(binascii.hexlify(ccmp_counter), 16) < 30:\n",
" self._add_ccmppckt(sta, pckt)\n",
" return\n",
" else:\n",
" return\n",
"\n",
" # Frame 1: pairwise set, install unset, ack set, mic unset\n",
" # results in ANonce\n",
" if wpakey_pckt.areFlagsSet('KeyInfo', ('pairwise', 'ack')) \\\n",
" and wpakey_pckt.areFlagsNotSet('KeyInfo', ('install', 'mic')):\n",
" self._add_keypckt(sta, 0, pckt)\n",
" return\n",
"\n",
" # Frame 2: pairwise set, install unset, ack unset, mic set,\n",
" # SNonce != 0. Results in SNonce, MIC and keymic_frame\n",
" elif wpakey_pckt.areFlagsSet('KeyInfo', ('pairwise', 'mic')) \\\n",
" and wpakey_pckt.areFlagsNotSet('KeyInfo', ('install', 'ack')) \\\n",
" and not all(c == '\\x00' for c in wpakey_pckt.Nonce):\n",
" self._add_keypckt(sta, 1, pckt)\n",
" return\n",
"\n",
" # Frame 3: pairwise set, install set, ack set, mic set\n",
" # Results in ANonce\n",
" elif wpakey_pckt.areFlagsSet('KeyInfo', ('pairwise', 'install', \\\n",
" 'ack', 'mic')):\n",
" self._add_keypckt(sta, 2, pckt)\n",
" return\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 5. Aggr-Inject"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def ssid_packet():\n",
" ap_mac = '00:00:00:00:00:00'\n",
" rt = RadioTap(len=18, present='Flags+Rate+Channel+dBm_AntSignal+Antenna', notdecoded='\\x00\\x6c' + get_frequency(CHANNEL) + '\\xc0\\x00\\xc0\\x01\\x00\\x00')\n",
" beacon_packet = Dot11(subtype=8, addr1='ff:ff:ff:ff:ff:ff', addr2=ap_mac, addr3=ap_mac) \\\n",
" / Dot11Beacon(cap=0x2105) \\\n",
" / Dot11Elt(ID='SSID', info=\"injected SSID\") \\\n",
" / Dot11Elt(ID='Rates', info=AP_RATES) \\\n",
" / Dot11Elt(ID='DSset', info=chr(CHANNEL))\n",
"\n",
" # Update sequence number\n",
" beacon_packet.SC = 0x3060\n",
"\n",
" # Update timestamp\n",
" beacon_packet[Dot11Beacon].timestamp = time.time()\n",
"\n",
" mpdu_len = len(beacon_packet) + 4\n",
"\n",
" if mpdu_len % 4 != 0:\n",
" padding = \"\\x00\" * (4 - (mpdu_len % 4)) # Align to 4 octets\n",
" else:\n",
" padding = \"\"\n",
" mpdu_len <<= 4\n",
" crc_fun = crcmod.mkCrcFun(0b100000111, rev=True, initCrc=0x00, xorOut=0xFF)\n",
"\n",
" crc = crc_fun(struct.pack('<H', mpdu_len))\n",
" maccrc = dot11crc(str(beacon_packet))\n",
" delim_sig = 0x4E\n",
"\n",
" #print('a-mpdu: len %d crc %02x delim %02x' % (mpdu_len >> 4, crc, delim_sig))\n",
" #hexdump(maccrc)\n",
" ampdu_header = struct.pack('<HBB', mpdu_len, crc, delim_sig)\n",
" #hexdump(ampdu_header)\n",
"\n",
" data = ampdu_header / beacon_packet / maccrc / padding\n",
" data /= \"\\x00\\x00\\x20\\x4e\" * 8\n",
" data = str(data)\n",
"\n",
" return data"
]
}
],
"metadata": {

36
printer.py Normal file
View file

@ -0,0 +1,36 @@
"""
Define a singleton that handles basic output
"""
import sys
import traceback
class Printer:
"""A class for printing messages that respects verbosity levels"""
verbose_level = 0
@staticmethod
def verbose(message, verbose_level=1):
"""Print a message only if it is within an acceptabe verbosity level"""
if Printer.verbose_level >= verbose_level:
sys.stdout.write(message)
sys.stdout.write('\n')
@staticmethod
def write(message):
"""Write a message to stdout"""
sys.stdout.write(message)
sys.stdout.write('\n')
@staticmethod
def error(message):
"""Write a message to stderr"""
sys.stderr.write(message)
sys.stderr.write('\n')
@staticmethod
def exception(e):
"""Write a summary of an exception with a stack trace"""
Printer.error(repr(e))
traceback.print_exc(file=sys.stderr)

331
scapy_ex.py Normal file
View file

@ -0,0 +1,331 @@
"""
A set of additions and modifications to scapy to assist in parsing dot11
"""
import scapy
from scapy.fields import BitField
from scapy.fields import ByteField
from scapy.fields import ConditionalField
from scapy.fields import EnumField
from scapy.fields import Field
from scapy.fields import FieldLenField
from scapy.fields import FieldListField
from scapy.fields import FlagsField
from scapy.fields import LEFieldLenField
from scapy.fields import LELongField
from scapy.fields import LEShortField
from scapy.fields import StrFixedLenField
from scapy.layers.dot11 import Dot11Elt
from scapy.layers.dot11 import Dot11ProbeReq
from scapy.packet import Packet
from printer import Printer
class SignedByteField(Field):
"""Fields for a signed byte"""
def __init__(self, name, default):
Field.__init__(self, name, default, '<b')
class LESignedShortField(Field):
"""Field for a little-endian short"""
def __init__(self, name, default):
Field.__init__(self, name, default, '<h')
def scapy_packet_Packet_hasflag(self, field_name, value):
"""Is the specified flag value set in the named field"""
field, val = self.getfield_and_val(field_name)
if isinstance(field, EnumField):
if val not in field.i2s:
return False
return field.i2s[val] == value
else:
return (1 << field.names.index([value])) & self.__getattr__(field_name) != 0
scapy.packet.Packet.hasflag = scapy_packet_Packet_hasflag
del scapy_packet_Packet_hasflag
def scapy_fields_FieldListField_i2repr(self, pkt, x):
"""Return a list with the representation of contained fields"""
return repr([self.field.i2repr(pkt, v) for v in x])
FieldListField.i2repr = scapy_fields_FieldListField_i2repr
del scapy_fields_FieldListField_i2repr
class ChannelFromMhzField(LEShortField):
"""A little-endian short field that converts from mhz to channel"""
def m2i(self, pkt, x):
return min(14, max(1, (x - 2407) / 5))
class PresentFlagField(ConditionalField):
"""Utility field for use by RadioTap"""
def __init__(self, field, flag_name):
ConditionalField.__init__(self, field, lambda pkt: pkt.hasflag('present', flag_name))
# TODO(ivanlei): This fields_desc does not cover chained present flags decode will fail in this cases
scapy.layers.dot11.RadioTap.name = '802.11 RadioTap'
# Greatly improved fields_desc for RadioTap which parses known present flags
scapy.layers.dot11.RadioTap.fields_desc = [
ByteField('version', 0),
ByteField('pad', 0),
LEShortField('RadioTap_len', 0),
FlagsField('present', None, -32, ['TSFT','Flags','Rate','Channel','FHSS','dBm_AntSignal',
'dBm_AntNoise','Lock_Quality','TX_Attenuation','dB_TX_Attenuation',
'dBm_TX_Power', 'Antenna', 'dB_AntSignal', 'dB_AntNoise',
'b14', 'b15','b16','b17','b18','b19','b20','b21','b22','b23',
'b24','b25','b26','b27','b28','b29','b30','Ext']),
PresentFlagField(LELongField('TSFT', 0), 'TSFT'),
PresentFlagField(ByteField('Flags', 0), 'Flags'),
PresentFlagField(ByteField('Rate', 0), 'Rate'),
PresentFlagField(ChannelFromMhzField('Channel', 0), 'Channel'),
PresentFlagField(LEShortField('Channel_flags', 0), 'Channel'),
PresentFlagField(ByteField('FHSS_hop_set', 0), 'FHSS'),
PresentFlagField(ByteField('FHSS_hop_pattern', 0), 'FHSS'),
PresentFlagField(SignedByteField('dBm_AntSignal', 0), 'dBm_AntSignal'),
PresentFlagField(SignedByteField('dBm_AntNoise', 0), 'dBm_AntNoise'),
PresentFlagField(LEShortField('Lock_Quality', 0), 'Lock_Quality'),
PresentFlagField(LEShortField('TX_Attenuation', 0), 'TX_Attenuation'),
PresentFlagField(LEShortField('db_TX_Attenuation', 0), 'dB_TX_Attenuation'),
PresentFlagField(SignedByteField('dBm_TX_Power', 0), 'dBm_TX_Power'),
PresentFlagField(ByteField('Antenna', 0), 'Antenna'),
PresentFlagField(ByteField('dB_AntSignal', 0), 'dB_AntSignal'),
PresentFlagField(ByteField('dB_AntNoise', 0), 'dB_AntNoise'),
PresentFlagField(LEShortField('RX_Flags', 0), 'b14')
]
def scapy_layers_dot11_RadioTap_extract_padding(self, s):
"""Ignore any unparsed conditionally present fields
If all fields have been parsed, the payload length should have decreased RadioTap_len bytes
If it has not, there are unparsed fields which should be treated as padding
"""
padding = len(s) - (self.pre_dissect_len - self.RadioTap_len)
if padding:
return s[padding:], s[:padding]
else:
return s, None
scapy.layers.dot11.RadioTap.extract_padding = scapy_layers_dot11_RadioTap_extract_padding
del scapy_layers_dot11_RadioTap_extract_padding
def scapy_layers_dot11_RadioTap_pre_dissect(self, s):
"""Cache to total payload length prior to dissection for use in finding padding latter"""
self.pre_dissect_len = len(s)
return s
scapy.layers.dot11.RadioTap.pre_dissect = scapy_layers_dot11_RadioTap_pre_dissect
del scapy_layers_dot11_RadioTap_pre_dissect
class Dot11EltRates(Packet):
"""The rates member contains an array of supported rates"""
name = '802.11 Rates Information Element'
# Known rates come from table in 6.5.5.2 of the 802.11 spec
known_rates = {
2 : 1,
3 : 1.5,
4 : 2,
5 : 2.5,
6 : 3,
9 : 4.5,
11 : 5.5,
12 : 6,
18 : 9,
22 : 11,
24 : 12,
27 : 13.5,
36 : 18,
44 : 22,
48 : 24,
54 : 27,
66 : 33,
72 : 36,
96 : 48,
108 : 54
}
fields_desc = [
ByteField('ID', 0),
FieldLenField("len", None, "info", "B"),
FieldListField('supported_rates', None, ByteField('', 0), count_from=lambda pkt: pkt.len),
]
def post_dissection(self, pkt):
self.rates = []
for supported_rate in self.supported_rates:
# check the msb for each rate
rate_msb = supported_rate & 0x80
rate_value = supported_rate & 0x7F
if rate_msb:
# a value of 127 means HT PHY feature is required to join the BSS
if 127 != rate_value:
self.rates.append(rate_value/2)
elif rate_value in Dot11EltRates.known_rates:
self.rates.append(Dot11EltRates.known_rates[rate_value])
class Dot11EltExtendedRates(Dot11EltRates):
"""The rates member contains an additional array of supported rates"""
name = '802.11 Extended Rates Information Element'
class Dot11EltRSN(Packet):
"""The enc, cipher, and auth members contain the decoded 'security' details"""
name = '802.11 RSN Information Element'
cipher_suites = { '\x00\x0f\xac\x00': 'GROUP',
'\x00\x0f\xac\x01': 'WEP',
'\x00\x0f\xac\x02': 'TKIP',
'\x00\x0f\xac\x04': 'CCMP',
'\x00\x0f\xac\x05': 'WEP' }
auth_suites = { '\x00\x0f\xac\x01': 'MGT',
'\x00\x0f\xac\x02': 'PSK' }
fields_desc = [
ByteField('ID', 0),
FieldLenField("len", None, "info", "B"),
LEShortField('version', 1),
StrFixedLenField('group_cipher_suite', '', length=4),
LEFieldLenField('pairwise_cipher_suite_count', 1, count_of='pairwise_cipher_suite'),
FieldListField('pairwise_cipher_suite', None, StrFixedLenField('','', length=4), count_from=lambda pkt: pkt.pairwise_cipher_suite_count),
LEFieldLenField('auth_cipher_suite_count', 1, count_of='auth_cipher_suite'),
FieldListField('auth_cipher_suite', None, StrFixedLenField('','',length=4), count_from=lambda pkt: pkt.auth_cipher_suite_count),
BitField('rsn_cap_pre_auth', 0, 1),
BitField('rsn_cap_no_pairwise', 0, 1),
BitField('rsn_cap_ptksa_replay_counter', 0, 2),
BitField('rsn_cap_gtksa_replay_counter', 0, 2),
BitField('rsn_cap_mgmt_frame_protect_required', 0, 1),
BitField('rsn_cap_mgmt_frame_protect_capable', 0, 1),
BitField('rsn_cap_reserved_1', 0, 1),
BitField('rsn_cap_peer_key_enabled', 0, 1),
BitField('rsn_cap_reserved_2', 0, 6),
]
def post_dissection(self, pkt):
"""Parse cipher suites to determine encryption, cipher, and authentication methods"""
self.enc = 'WPA2' # Everything is assumed to be WPA
self.cipher = ''
self.auth = ''
ciphers = [self.cipher_suites.get(pairwise_cipher) for pairwise_cipher in self.getfieldval('pairwise_cipher_suite')]
if 'GROUP' in ciphers:
ciphers = [self.cipher_suites.get(group_cipher, '') for group_cipher in self.getfieldval('group_cipher_suite')]
for cipher in ['CCMP', 'TKIP', 'WEP']:
if cipher in ciphers:
self.cipher = cipher
break
if 'WEP' == self.cipher:
self.enc = 'WEP'
for auth_cipher in self.getfieldval('auth_cipher_suite'):
self.auth = self.auth_suites.get(auth_cipher, '')
break
def scapy_layers_dot11_Dot11_elts(self):
"""An iterator of Dot11Elt"""
dot11elt = self.getlayer(Dot11Elt)
while dot11elt and dot11elt.haslayer(Dot11Elt):
yield dot11elt
dot11elt = dot11elt.payload
scapy.layers.dot11.Dot11.elts = scapy_layers_dot11_Dot11_elts
del scapy_layers_dot11_Dot11_elts
def scapy_layers_dot11_Dot11_find_elt_by_id(self, id):
"""Iterate over elt and return the first with a specific ID"""
for elt in self.elts():
if elt.ID == id:
return elt
return None
scapy.layers.dot11.Dot11.find_elt_by_id = scapy_layers_dot11_Dot11_find_elt_by_id
del scapy_layers_dot11_Dot11_find_elt_by_id
def scapy_layers_dot11_Dot11_essid(self):
"""Return the payload of the SSID Dot11Elt if it exists"""
elt = self.find_elt_by_id(0)
return elt.info if elt else None
scapy.layers.dot11.Dot11.essid = scapy_layers_dot11_Dot11_essid
del scapy_layers_dot11_Dot11_essid
def scapy_layers_dot11_Dot11_rates(self, id=1):
"""Return the payload of the rates Dot11Elt if it exists"""
elt = self.find_elt_by_id(id)
if elt:
try:
return Dot11EltRates(str(elt)).rates
except (exception, e):
Printer.error('Bad Dot11EltRates got[{0:s}]'.format(elt.info))
Printer.exception(e)
return []
scapy.layers.dot11.Dot11.rates = scapy_layers_dot11_Dot11_rates
del scapy_layers_dot11_Dot11_rates
def scapy_layers_dot11_Dot11_extended_rates(self):
"""Return the payload of the extended rates Dot11Elt if it exists"""
return scapy.layers.dot11.Dot11.rates(self, 50)
scapy.layers.dot11.Dot11.extended_rates = scapy_layers_dot11_Dot11_extended_rates
del scapy_layers_dot11_Dot11_extended_rates
def scapy_layers_dot11_Dot11_sta_bssid(self):
"""Return the bssid for a station associated with the packet"""
if self.haslayer(Dot11ProbeReq) or self.hasflag('FCfield', 'to-DS'):
return self.addr2
else:
return self.addr1
scapy.layers.dot11.Dot11.sta_bssid = scapy_layers_dot11_Dot11_sta_bssid
del scapy_layers_dot11_Dot11_sta_bssid
def scapy_layers_dot11_Dot11_ap_bssid(self):
"""Return the bssid for a access point associated with the packet"""
if self.haslayer(Dot11ProbeReq) or self.hasflag('FCfield', 'to-DS'):
return self.addr1
else:
return self.addr2
scapy.layers.dot11.Dot11.ap_bssid = scapy_layers_dot11_Dot11_ap_bssid
del scapy_layers_dot11_Dot11_ap_bssid
def scapy_layers_dot11_Dot11_channel(self):
"""Return the payload of the channel Dot11Elt if it exists"""
elt = self.find_elt_by_id(3)
if elt:
try:
return int(ord(elt.info))
except (exception, e):
Printer.error('Bad Dot11Elt channel got[{0:s}]'.format(elt.info))
Printer.exception(e)
return None
scapy.layers.dot11.Dot11.channel = scapy_layers_dot11_Dot11_channel
del scapy_layers_dot11_Dot11_channel
def scapy_layers_dot11_Dot11_rsn(self):
"""Return the payload of the RSN Dot11Elt as a Dot11EltRSN"""
elt = self.find_elt_by_id(48)
if elt:
try:
return Dot11EltRSN(str(elt))
except (exception, e):
Printer.error('Bad Dot11EltRSN got[{0:s}]'.format(elt.info))
Printer.exception(e)
return None
scapy.layers.dot11.Dot11.rsn = scapy_layers_dot11_Dot11_rsn
del scapy_layers_dot11_Dot11_rsn