
FROM PROMPT TO PACKET

  • Writer: Moudiongui Martin
  • Jul 14
  • 6 min read

Updated: Aug 11


Hola, amigos!


Today we're going to dive into how an MCP server works and see how to connect it to TShark for network‑capture analysis.


The goal: let an LLM ask in natural language, “How many HTTP packets are in sample.pcap?” and get the answer instantly.


THEORY SECTION


CONCEPT 1: MCP in a Nutshell


MCP (Model Context Protocol) is a JSON-based text protocol with three objectives:


  1. Encapsulate each message cleanly so no line is lost or overflows.

  2. Tag messages with key/value fields (e.g., id, method) to keep context.

  3. Simplify life for both humans and machines: an MCP message stays readable in tcpdump while being trivial to parse in Python, Go, or Rust.


In short, MCP strikes the ideal balance between the minimalism of raw TCP streams and the overkill of full HTTP when you only need to exchange small structured packets.
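Concretely, as the server code later in this post shows, each MCP message is a JSON-RPC 2.0 envelope: a JSON object tagged with jsonrpc, id, and method. A minimal sketch in Python:

```python
import json

# A minimal MCP request: a JSON-RPC 2.0 envelope tagged with an id and a method.
msg = {"jsonrpc": "2.0", "id": 1, "method": "tools/list"}

wire = json.dumps(msg)     # the readable text that crosses the wire
parsed = json.loads(wire)  # trivial to parse back on the server side

print(parsed["method"])    # → tools/list
```

This is exactly the shape the handle() function in server.py dispatches on.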


CONCEPT 2: What Is an MCP Server?


It’s the receiving program that:

  • listens for incoming requests (in this tutorial, over HTTP on port 5001);

  • decodes every MCP frame;

  • triggers business tools based on the incoming content;

  • returns, when needed, a response also formatted as MCP.


In our project, the business tool is TShark: we want an LLM to drive TShark to analyze network captures.


CONCEPT 3: Why TShark?

“It’s a bit like eavesdropping on everything computers say to each other on a network.”

TShark is the command‑line version of Wireshark:


  • Live mode → we sniff packets in real time;

  • Offline mode → we read an existing PCAP file.


In this tutorial, we’ll start simple: read a PCAP file, filter a protocol, then count the matching packets.
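Offline mode boils down to three TShark flags: -r reads the capture, -Y applies a display filter, and -T json makes the output machine-parseable. A sketch of the command line we will assemble later (the capture name sample.pcap is just a placeholder):

```python
# Assemble the offline-analysis command; sample.pcap is a hypothetical capture.
pcap = "sample.pcap"
display_filter = "http"

cmd = ["tshark", "-r", pcap, "-T", "json", "-Y", display_filter]
print(" ".join(cmd))  # → tshark -r sample.pcap -T json -Y http
```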


PRACTICAL SECTION


1. Hardware & Prerequisites

Item               | Recommended version     | Required | Why?
OS                 | Debian 12 / Parrot 6.x  | No       | Stable and security-oriented (Windows or another Linux distro also works)
Python             | ≥ 3.11                  | Yes      | asyncio + strong typing
Wireshark / TShark | ≥ 4.2                   | Yes      | Faster display filters

Quick Install

sudo apt update
sudo apt install -y python3 python3-venv wireshark tshark

2. Project Structure

├── mcp-agent
│   ├── fastagent.config.yaml
│   ├── fastagent.secrets.yaml
│   └── env-agent
└── mcp-srv-tshark
    ├── pcaps/
    ├── server.py
    ├── tools.py
    └── env-mcp
Two folders, two roles: mcp-agent is the brain (LLM + FastAgent + OpenAI keys); mcp-srv-tshark is the hand that executes system commands.

Section 1: Setting Up the mcp‑agent


🎯 Goal


In this first section, we’ll set up an LLM agent based on FastAgent MCP that can both generate and interpret MCP requests.


We’ll use OpenAI as an example, which requires generating an API key. To learn how to create your own API key, see the following tutorial: [TUTORIAL].


📁 Create the Directory

mkdir -p tuto-mcp-agent/mcp-agent
mkdir -p tuto-mcp-agent/mcp-srv-tshark
cd tuto-mcp-agent

🐍 Python Virtual Environment

Create a virtual environment to isolate dependencies:

cd mcp-agent
python3 -m venv env-agent
source env-agent/bin/activate
💡 Prefer uv (faster)? Go ahead!

📦 Install Dependencies


Install the fast-agent-mcp library:

pip install fast-agent-mcp fastapi

If you have a requirements.txt:

pip install -r requirements.txt

🛠️ Configuration Files


Prepare two files:


  • fastagent.config.yaml: agent configuration.

  • fastagent.secrets.yaml: stores API keys (OpenAI, etc.).


Example: fastagent.config.yaml

# MCP server definition 
mcp:
  servers:
    tshark_mcp:
      transport: "http"              
      url: "http://localhost:5001/mcp"
      read_transport_sse_timeout_seconds: 120  # optional


# Agent definition 
agents:
  netcli:
    model: gpt-4o-mini
    servers: [tshark_mcp]           
    system: |
      You’re a network expert. Always use the “tshark_query” tool to analyze offline PCAP files.

providers:                       
  openai:
    reasoning_effort: "medium"   

Example: fastagent.secrets.yaml

openai:
  api_key: "sk-..."

Section 2: Setting Up the MCP Server


🎯 Goal


Here, we’ll create a minimalist MCP server that:


  1. Receives MCP requests via HTTP (FastAPI);

  2. Calls TShark to analyze a PCAP file;

  3. Returns a structured result.


📁 Create the Directory

mkdir -p ../mcp-srv-tshark
cd ../mcp-srv-tshark

📁 Folder for pcaps

mkdir pcaps

Put all the .pcap files to be analyzed here.


🐍 Python Virtual Environment

python3 -m venv env-mcp
source env-mcp/bin/activate

📦 Install Dependencies

pip install fastapi pydantic uvicorn

Or, from a requirements.txt:

pip install -r requirements.txt

🧠 Create the Necessary Files

Create the server entry point and the file defining the tools we will use:

touch server.py
touch tools.py

File: tools.py


BLOCK 1: TOOL_DEFS


Define the interface of tshark_query:




Parameter | Type | Description
file      | str  | Path to the PCAP file (required)
filter    | str  | Wireshark filter (e.g., http)
limit     | int  | Maximum number of packets to return
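Before wiring this schema into the server, the call shape is easy to sanity-check by hand. A minimal sketch of the same rules (validate_args is a hypothetical helper, not one of the project files):

```python
def validate_args(args: dict) -> dict:
    # Mirror the inputSchema: "file" is required, the others fall back to defaults.
    if "file" not in args:
        raise ValueError("missing required argument: file")
    return {
        "file": args["file"],
        "filter": args.get("filter", ""),    # empty string = no display filter
        "limit": int(args.get("limit", 0)),  # 0 = no packet cap
    }

print(validate_args({"file": "sample.pcap", "filter": "http"}))
```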

BLOCK 2: call_tool


The `call_tool` function:


  1. checks the requested tool name;

  2. validates the arguments;

  3. builds the appropriate tshark command;

  4. runs it asynchronously;

  5. parses the JSON output and returns a summary.


# ----------------- 1) Import standard libraries
import asyncio, json, os
from pathlib import Path
# Build absolute path to the sibling “pcaps/” directory (project_root/pcaps)
PCAP_ROOT = (Path(__file__).resolve().parent / "pcaps").as_posix()
# ----------------- 2) Security helper – only allow reading pcaps inside PCAP_ROOT
def _safe_path(fname: str) -> str:
    path = os.path.abspath(os.path.join(PCAP_ROOT, fname))
    # Block directory-traversal attacks (“../secret.txt”); the os.sep suffix
    # also rejects sibling directories such as “pcaps-old/”
    if not path.startswith(PCAP_ROOT + os.sep):
        raise ValueError("PCAP path outside sandbox")
    # Ensure the file actually exists
    if not os.path.isfile(path):
        raise FileNotFoundError(path)
    return path
# ----------------- 3) Interface & JSON-Schema for our tool: tshark_query
TOOL_DEFS = [{
    "name": "tshark_query",
    "description": "Execute TShark on a pcap file offline",
    "inputSchema": {
        "type": "object",
        "properties": {
            "file":   { "type": "string",  "description": "Name of the pcap file (inside pcaps/)" },
            "filter": { "type": "string",  "description": "TShark display filter (-Y)",           "default": "" },
            # --- schema definition ---
            "limit":  { "type": "integer", "description": "Max packets (0 = no limit)",            "default": 0 }
        },
        "required": ["file"]
    }
}]
# ----------------- 4) Implementation: validate args & dispatch to TShark
async def call_tool(name: str, args: dict):
    # --- 4a) Accept only the single declared tool for now
    if name != "tshark_query":
        raise ValueError(f"Unsupported tool: {name}")
    file  = _safe_path(args["file"])
    flt   = args.get("filter", "")
    # --- 4b) Packet cap; default 0 = unlimited
    limit = int(args.get("limit", 0))
    # --- 4c) Build the TShark command line
    cmd = ["tshark", "-r", file, "-T", "json"]
    if limit > 0:                      # add “-c” only when a cap is requested
        cmd += ["-c", str(limit)]
    if flt:                            # add “-Y” only when a filter is provided
        cmd += ["-Y", flt]
    # --- 4d) Launch TShark asynchronously and capture I/O
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # --- 4e) Wait for completion and decode byte streams
    out_b, err_b = await proc.communicate()
    out = out_b.decode("utf-8", "replace")
    err = err_b.decode("utf-8", "replace")
    # --- 4f) Propagate error if TShark exited non-zero
    if proc.returncode != 0:
        raise RuntimeError(err.strip() or "TShark error")
    # --- 4g) Parse JSON output and return a short summary
    frames  = json.loads(out)
    summary = f"{len(frames)} packets returned for filter '{flt}'"
    return summary
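The traversal guard in _safe_path is worth exercising on its own. A standalone sketch of the same check, pointed at a temporary directory instead of pcaps/ (and skipping the file-existence test):

```python
import os
import tempfile

def safe_join(root: str, fname: str) -> str:
    # Same idea as _safe_path: resolve the path, then refuse anything
    # that escapes the sandbox root (e.g. "../secret.txt").
    path = os.path.abspath(os.path.join(root, fname))
    if not path.startswith(root + os.sep):
        raise ValueError("PCAP path outside sandbox")
    return path

root = os.path.abspath(tempfile.mkdtemp())
ok = safe_join(root, "sample.pcap")   # inside the sandbox: accepted

try:
    safe_join(root, "../etc/passwd")  # traversal attempt: rejected
    blocked = False
except ValueError:
    blocked = True

print(blocked)  # → True
```

Note the root + os.sep suffix: a plain startswith(root) would also accept a sibling directory such as pcaps-old/.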

File: server.py


BLOCK 1: mcp_entry function


mcp_entry is the front door of your MCP JSON-RPC server:

  1. Reads the JSON request body.

  2. Determines if it’s a single message or a batch.

  3. Delegates each message to handle.

  4. Returns either one response or a list of responses, which FastAPI ships back as JSON.


BLOCK 2: handle function


  • mcp_entry (previous function) decides batch vs. single and funnels each message here.

  • handle inspects "method" and:

    • Performs the appropriate action,

    • Packages the outcome into a spec-compliant response,

    • Guarantees the "jsonrpc" version and "id" are echoed

    • Supplies a clear error when the method is unrecognized.


BLOCK 3: Main


  • Entry-point guard – if __name__ == "__main__": triggers only when the file is executed directly, preventing the server from auto-starting when the module is merely imported elsewhere.

  • Imports Uvicorn lazily – pulls in the lightweight ASGI server only when it’s about to be used, keeping top-level imports minimal.

  • Boots the event-loop server – uvicorn.run(app, host="0.0.0.0", port=5001) spins up Uvicorn, mounts your FastAPI app, and begins listening on all network interfaces at port 5001, so the whole script becomes a self-contained web service ready to receive MCP requests.


# ----------------- 1) Import libraries
from fastapi import FastAPI, Request
# Lazy import of uvicorn is done in the __main__ section
from tools import TOOL_DEFS, call_tool   # imported from tools.py
app = FastAPI(title="TShark MCP Server")
# ----------------- 2) Specify protocol for JSON-RPC 
PROTOCOL = "2025-03-26"
# ----------------- 3) Handle HTTP POST requests to the MCP endpoint
@app.post("/mcp")
async def mcp_entry(request: Request):
    payload = await request.json()
    # Detect whether the client sent a batch (list) or a single message
    if isinstance(payload, list):
        return [await handle(msg) for msg in payload]
    return await handle(payload)  # Single message → forward to handle()
# ----------------- 4) handle(msg) builds the proper JSON-RPC response
async def handle(msg):
# --- 4a) Handshake: client asks what the server can do 
    if msg["method"] == "initialize":
        return {
            "jsonrpc": "2.0",
            "id": msg["id"],
            "result": {
                "protocolVersion": PROTOCOL,
                "capabilities": { "tools": { "listChanged": False } },
                "serverInfo": { "name": "SK-Tshark-Mcp", "version": "0.1.0" }
            }
        }
# --- 4b) Client requests the catalogue of available tools 
    if msg["method"] == "tools/list":
        return {
            "jsonrpc": "2.0",
            "id": msg["id"],
            "result": { "tools": TOOL_DEFS }  # defined in tools.py
        }
# --- 4c) Client calls a specific tool 
    if msg["method"] == "tools/call":
        name   = msg["params"]["name"]
        args   = msg["params"].get("arguments", {})
        result = await call_tool(name, args)
# Wrap the tool’s raw result in the JSON-RPC success envelope
        return {
            "jsonrpc": "2.0",
            "id": msg["id"],
            "result": { "content": [{ "type": "text", "text": result }],
                        "isError": False }
        }
# --- 4d) Unknown method → standard JSON-RPC error 
    return {
        "jsonrpc": "2.0",
        "id": msg.get("id"),
        "error": {
            "code": -32601,                       # Method not found
            "message": f"Unknown method {msg['method']}"
        }
    }
# ----------------- 5) Main section: launch the ASGI server 
if __name__ == "__main__":
    import uvicorn    # Lazy import: only load Uvicorn when actually starting the app
    uvicorn.run(app, host="0.0.0.0", port=5001)
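With the server listening on port 5001, a client drives it by POSTing JSON-RPC messages to http://localhost:5001/mcp. A sketch of the three payloads, from handshake to tool call (argument values are illustrative); mcp_entry also accepts the batch form, which is just a JSON array:

```python
import json

initialize = {"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {}}
list_tools = {"jsonrpc": "2.0", "id": 2, "method": "tools/list"}
call_query = {
    "jsonrpc": "2.0", "id": 3, "method": "tools/call",
    "params": {"name": "tshark_query",
               "arguments": {"file": "sample.pcap", "filter": "http", "limit": 0}},
}

# A batch is simply a JSON array of messages.
batch = json.dumps([initialize, list_tools, call_query])
print(len(json.loads(batch)))  # → 3
```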

🚀 How to Launch Your MCP Server and AI Agent


Launch the server (from mcp-srv-tshark, with env-mcp activated):

python server.py

Launch the AI agent (from mcp-agent, with env-agent activated; fast-agent-mcp ships a CLI, and its interactive mode is assumed here):

fast-agent go

✅ Expected Result

At the end of this tutorial, we’ll be able to ask the LLM something like “How many HTTP packets are in sample.pcap?” and watch it call the tshark_query tool and answer with the packet count.

Mission accomplished! 🚀
