The Hunter's Toolkit: An Introduction to Steganalysis

Detecting hidden data in files
Table of Contents
- What is Steganalysis?
- How Steganalysis Works: The Science Behind Detection
- History and Evolution of Steganalysis
- Types of Steganographic Methods Steganalysis Can Detect
- Core Steganalysis Techniques
- Advanced Steganalysis Techniques
- Steganalysis Tools and Software
- Step-by-Step Steganalysis Tutorial
- Programming Your Own Steganalysis Tools
- Real-World Applications and Case Studies
- Countermeasures and Evasion Techniques
- Frequently Asked Questions
- References
What is Steganalysis?
Steganalysis is the art and science of detecting, extracting, and analyzing hidden information within digital files. While steganography conceals data to make its existence undetectable, steganalysis serves as the digital detective work that uncovers these hidden secrets.
Key Definitions
- Stego-object: A file containing hidden data
- Cover-object: The original file before data embedding
- Payload: The hidden message or data
- Carrier: The medium used to hide data (image, audio, video, text)
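These terms are easiest to see in code. The minimal sketch below (illustrative only; the file names are hypothetical) embeds a payload into a cover-object's least significant bits, turning the carrier into a stego-object:
import numpy as np
from PIL import Image
def create_stego_object(cover_path, payload, stego_path):
    """Embed `payload` (bytes) into the red-channel LSBs of the cover image."""
    cover = np.array(Image.open(cover_path).convert('RGB'))
    bits = np.unpackbits(np.frombuffer(payload, dtype=np.uint8))
    red = cover[:, :, 0].flatten()
    if len(bits) > len(red):
        raise ValueError("Payload too large for this carrier")
    red[:len(bits)] = (red[:len(bits)] & 0xFE) | bits  # overwrite LSBs
    cover[:, :, 0] = red.reshape(cover.shape[:2])
    Image.fromarray(cover).save(stego_path)  # PNG is lossless, so the bits survive
# Usage (hypothetical files)
# create_stego_object('cover.png', b'secret message', 'stego.png')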
Why Steganalysis Matters
In today’s digital landscape, steganalysis plays a crucial role in:
- Cybersecurity: Detecting data exfiltration and covert communications
- Digital Forensics: Uncovering evidence in criminal investigations
- Corporate Security: Preventing intellectual property theft
- National Security: Identifying terrorist communications and espionage
- Academic Research: Advancing steganography and security fields
How Steganalysis Works: The Science Behind Detection
Steganalysis operates on the fundamental principle that hiding data in a file inevitably leaves traces—no matter how subtle. These traces manifest as:
1. Statistical Anomalies
Hidden data disrupts the natural statistical properties of files, creating detectable patterns.
2. Structural Changes
Embedding processes alter file structures in ways that can be identified through careful analysis.
3. Visual Artifacts
In images, hidden data may cause barely perceptible changes in color, texture, or noise patterns.
4. Signature Presence
Hidden files often retain their original file signatures, which can be detected within carrier files.
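The first of these traces is concrete enough to demonstrate in a few lines. LSB replacement tends to equalize the counts within each pixel-value pair (2i, 2i+1), which is exactly the pattern the chi-square attack described later exploits. Here is a self-contained sketch on synthetic data (a toy, quantized cover signal that exaggerates the effect for clarity):
import numpy as np
rng = np.random.default_rng(42)
# Toy cover: quantized values (as after smoothing/compression), so counts
# within each pair (2i, 2i+1) are highly unbalanced
cover = (rng.normal(120, 30, 200_000).clip(0, 254) // 2 * 2).astype(np.uint8)
# Full-capacity LSB replacement with random message bits
stego = (cover & 0xFE) | rng.integers(0, 2, cover.size, dtype=np.uint8)
def pair_imbalance(pixels):
    """Mean absolute count difference within each value pair (2i, 2i+1)."""
    hist = np.bincount(pixels, minlength=256)
    return np.abs(hist[0::2] - hist[1::2]).mean()
print(f"cover imbalance: {pair_imbalance(cover):.1f}")  # large
print(f"stego imbalance: {pair_imbalance(stego):.1f}")  # drops to sampling noise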
History and Evolution of Steganalysis
Pre-Digital Origins (500 BC - 1945)
- Ancient Greece: Detecting messages written on wooden tablets covered with wax
- Medieval Period: Analysis of invisible inks and physical concealment methods
- World Wars: Military intelligence techniques for detecting hidden communications
Digital Era Birth (1990s)
- 1996: First digital steganalysis research papers published
- 1999: Chi-square attack for LSB detection (Westfeld and Pfitzmann)
Modern Advancement (2000s-2010s)
- 2001: RS (Regular-Singular) analysis (Fridrich, Goljan, and Du)
- 2004: Weighted stego-image (WS) analysis
- 2010: High-dimensional feature models (e.g., SPAM) paired with machine learning
- 2012: Ensemble classifiers with rich feature sets
AI Revolution (2010s-Present)
- 2014: First deep learning applications in steganalysis
- 2015: Convolutional Neural Networks for image steganalysis
- 2020: Adversarial machine learning techniques
- 2024: Quantum-resistant steganalysis methods
Types of Steganographic Methods Steganalysis Can Detect
1. Spatial Domain Steganography
- LSB (Least Significant Bit) replacement
- LSB matching techniques
- Pixel value differencing (PVD)
2. Transform Domain Steganography
- DCT (Discrete Cosine Transform) based methods
- DWT (Discrete Wavelet Transform) techniques
- DFT (Discrete Fourier Transform) methods
3. Adaptive Steganography
- HUGO (Highly Undetectable steGO)
- WOW (Wavelet Obtained Weights)
- S-UNIWARD (Spatial Universal Wavelet Relative Distortion)
4. Other Notable Schemes
- BPCS (Bit-Plane Complexity Segmentation)
- YASS (Yet Another Steganographic Scheme)
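The difference between the first two spatial-domain methods matters for detection: LSB replacement overwrites the last bit, an asymmetric operation that chi-square and RS attacks exploit, whereas LSB matching randomly adds or subtracts 1 when the bit disagrees, which disturbs pair statistics far less. A minimal sketch of both operations (illustrative, not any particular tool's implementation):
import numpy as np
rng = np.random.default_rng(0)
def lsb_replace(pixels, bits):
    """LSB replacement: overwrite the least significant bit."""
    return (pixels & 0xFE) | bits
def lsb_match(pixels, bits):
    """LSB matching (+/-1 embedding): leave matching pixels alone; otherwise
    randomly increment or decrement so the LSB flips."""
    out = pixels.astype(np.int16)
    mismatch = (out & 1) != bits
    step = rng.choice([-1, 1], size=out.shape)
    out[mismatch] += step[mismatch]
    return out.clip(0, 255).astype(np.uint8)  # boundary pixels handled crudely here
# Usage
pixels = rng.integers(0, 256, 8, dtype=np.uint8)
bits = rng.integers(0, 2, 8, dtype=np.uint8)
print(lsb_replace(pixels, bits))
print(lsb_match(pixels, bits))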
Core Steganalysis Techniques
Visual Analysis Methods
Visual analysis involves examining files for visual indicators of hidden data. This technique is particularly effective for image files.
Histogram Analysis
Histograms reveal pixel value distributions that may indicate LSB embedding:
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
def analyze_histogram(image_path):
"""Analyze image histogram for LSB steganography indicators"""
img = Image.open(image_path)
img_array = np.array(img)
# Calculate histogram
hist, bins = np.histogram(img_array.flatten(), bins=256, range=[0,256])
# Plot histogram
plt.figure(figsize=(12, 4))
plt.bar(range(256), hist)
plt.title('Image Histogram Analysis')
plt.xlabel('Pixel Value')
plt.ylabel('Frequency')
# Check for LSB embedding indicators
odd_sum = sum(hist[1::2]) # Odd values
even_sum = sum(hist[0::2]) # Even values
ratio = odd_sum / even_sum if even_sum > 0 else 0
print(f"Odd/Even pixel ratio: {ratio:.3f}")
if abs(ratio - 1.0) > 0.1:
print("⚠️ Potential LSB steganography detected!")
else:
print("✅ No obvious LSB steganography detected")
plt.show()
return ratio
# Usage example
# analyze_histogram('suspicious_image.png')
LSB Plane Analysis
Examining the least significant bit plane can reveal hidden patterns:
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
def extract_lsb_plane(image_path, bit_plane=0):
"""Extract and visualize specific bit plane"""
img = Image.open(image_path).convert('RGB')
img_array = np.array(img)
# Extract specific bit plane
bit_plane_data = (img_array >> bit_plane) & 1
# Convert to visible image (0 or 255)
lsb_image = bit_plane_data * 255
# Display LSB plane
lsb_img = Image.fromarray(lsb_image.astype(np.uint8))
return lsb_img
def compare_bit_planes(image_path):
"""Compare multiple bit planes for analysis"""
fig, axes = plt.subplots(2, 4, figsize=(16, 8))
for i in range(8):
lsb_img = extract_lsb_plane(image_path, i)
row = i // 4
col = i % 4
axes[row, col].imshow(np.array(lsb_img), cmap='gray')
axes[row, col].set_title(f'Bit Plane {i}')
axes[row, col].axis('off')
plt.tight_layout()
plt.show()
# Usage
# compare_bit_planes('test_image.png')
Statistical Detection Techniques
Statistical methods form the backbone of classical steganalysis, detecting anomalies in file properties.
Chi-Square Test Implementation
The chi-square test detects LSB embedding by analyzing pixel value pair frequencies:
import numpy as np
from PIL import Image
from scipy.stats import chi2
def chi_square_test(image_path, confidence_level=0.95):
"""
Perform chi-square test for LSB steganography detection
"""
img = Image.open(image_path).convert('L') # Convert to grayscale
pixels = np.array(img).flatten()
# Count pairs of values (2i, 2i+1)
pairs = []
for i in range(128): # 0-254, step 2
even_count = np.sum(pixels == 2*i)
odd_count = np.sum(pixels == 2*i + 1)
pairs.append((even_count, odd_count))
# Calculate chi-square statistic
chi_square_stat = 0
degrees_of_freedom = 0
for even, odd in pairs:
total = even + odd
if total > 0:
expected = total / 2
chi_square_stat += ((even - expected) ** 2 + (odd - expected) ** 2) / expected
degrees_of_freedom += 1
# Calculate p-value
p_value = 1 - chi2.cdf(chi_square_stat, degrees_of_freedom)
# Determine if steganography is present
alpha = 1 - confidence_level
is_stego = p_value < alpha
results = {
'chi_square_statistic': chi_square_stat,
'degrees_of_freedom': degrees_of_freedom,
'p_value': p_value,
'confidence_level': confidence_level,
'steganography_detected': is_stego
}
return results
def display_chi_square_results(results):
"""Display chi-square test results"""
print("Chi-Square Test Results")
print("=" * 30)
print(f"Chi-square statistic: {results['chi_square_statistic']:.3f}")
print(f"Degrees of freedom: {results['degrees_of_freedom']}")
print(f"P-value: {results['p_value']:.6f}")
print(f"Confidence level: {results['confidence_level']*100}%")
print()
if results['steganography_detected']:
print("🚨 STEGANOGRAPHY DETECTED!")
print("The image likely contains hidden data.")
else:
print("✅ No steganography detected")
print("The image appears clean.")
# Usage example
# results = chi_square_test('suspicious_image.png')
# display_chi_square_results(results)
RS (Regular-Singular) Analysis
RS analysis is more sensitive than the chi-square attack and can even estimate the length of an embedded message. The full method classifies small pixel groups as regular or singular using flipping functions and a smoothness measure; the simplified implementation below captures the general approach:
import numpy as np
from PIL import Image
def rs_analysis(image_path):
"""
Perform RS (Regular-Singular) analysis for steganalysis
"""
img = Image.open(image_path).convert('L')
pixels = np.array(img)
height, width = pixels.shape
def get_mask_groups():
"""Generate different mask patterns for RS analysis"""
masks = []
# Horizontal mask
mask1 = np.array([[1, 0], [1, 0]])
# Vertical mask
mask2 = np.array([[1, 1], [0, 0]])
# Diagonal mask
mask3 = np.array([[1, 0], [0, 1]])
# Anti-diagonal mask
mask4 = np.array([[0, 1], [1, 0]])
return [mask1, mask2, mask3, mask4]
def calculate_rs_values(pixels, mask):
"""Calculate R and S values for given mask"""
regular_count = 0
singular_count = 0
# Process image in 2x2 blocks
for i in range(0, height-1, 2):
for j in range(0, width-1, 2):
block = pixels[i:i+2, j:j+2]
# Apply mask and calculate variations
masked_block = block * mask
variation = np.sum(np.abs(np.diff(masked_block.flatten())))
# Classify as regular or singular
if variation % 2 == 0:
regular_count += 1
else:
singular_count += 1
return regular_count, singular_count
# Calculate RS values for different masks
masks = get_mask_groups()
rs_results = []
for i, mask in enumerate(masks):
regular, singular = calculate_rs_values(pixels, mask)
total = regular + singular
rs_ratio = regular / total if total > 0 else 0
rs_results.append({
'mask_id': i+1,
'regular': regular,
'singular': singular,
'ratio': rs_ratio
})
# Analyze results
avg_ratio = np.mean([result['ratio'] for result in rs_results])
std_ratio = np.std([result['ratio'] for result in rs_results])
# Detection threshold (empirically determined)
threshold = 0.1
is_stego = std_ratio > threshold
return {
'rs_results': rs_results,
'average_ratio': avg_ratio,
'ratio_std': std_ratio,
'steganography_detected': is_stego,
'confidence': min(std_ratio * 10, 1.0) # Confidence score
}
# Usage
# rs_results = rs_analysis('test_image.png')
# print(f"RS Analysis Results: {'STEGO' if rs_results['steganography_detected'] else 'CLEAN'}")
Signature-Based Detection
Signature analysis detects embedded files by searching for known file headers and structures:
class FileSignatureAnalyzer:
def __init__(self):
self.signatures = {
'ZIP': [b'\x50\x4B\x03\x04', b'\x50\x4B\x05\x06'],
'PDF': [b'\x25\x50\x44\x46'],
'JPEG': [b'\xFF\xD8\xFF'],
'PNG': [b'\x89\x50\x4E\x47\x0D\x0A\x1A\x0A'],
'GIF': [b'\x47\x49\x46\x38'],
'RAR': [b'\x52\x61\x72\x21\x1A\x07'],
'MP3': [b'\xFF\xFB', b'\x49\x44\x33'],
'AVI': [b'\x52\x49\x46\x46'],
'MP4': [b'\x66\x74\x79\x70'],
'EXE': [b'\x4D\x5A'],
'DOC': [b'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1'],
'XLS': [b'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1'],
'PPT': [b'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1']
}
def scan_file(self, file_path):
"""Scan file for embedded file signatures"""
detected_signatures = []
with open(file_path, 'rb') as f:
data = f.read()
for file_type, signatures in self.signatures.items():
for signature in signatures:
positions = self._find_signature_positions(data, signature)
if positions:
for pos in positions:
detected_signatures.append({
'file_type': file_type,
'signature': signature.hex(),
'position': pos,
'ascii_signature': self._bytes_to_ascii(signature)
})
return detected_signatures
def _find_signature_positions(self, data, signature):
"""Find all positions of a signature in data"""
positions = []
start = 0
while True:
pos = data.find(signature, start)
if pos == -1:
break
positions.append(pos)
start = pos + 1
return positions
def _bytes_to_ascii(self, byte_sequence):
"""Convert bytes to ASCII representation"""
ascii_chars = []
for byte in byte_sequence:
if 32 <= byte <= 126: # Printable ASCII
ascii_chars.append(chr(byte))
else:
ascii_chars.append('.')
return ''.join(ascii_chars)
def extract_embedded_file(self, file_path, position, output_path):
"""Extract embedded file starting at position"""
try:
with open(file_path, 'rb') as f:
f.seek(position)
embedded_data = f.read()
with open(output_path, 'wb') as f:
f.write(embedded_data)
return True
except Exception as e:
print(f"Error extracting file: {e}")
return False
def perform_signature_analysis(file_path):
"""Perform comprehensive signature analysis"""
analyzer = FileSignatureAnalyzer()
signatures = analyzer.scan_file(file_path)
print(f"Signature Analysis Results for: {file_path}")
print("=" * 50)
if signatures:
print(f"Found {len(signatures)} embedded file signature(s):")
print()
for i, sig in enumerate(signatures, 1):
print(f"{i}. File Type: {sig['file_type']}")
print(f" Position: {sig['position']} (0x{sig['position']:08X})")
print(f" Signature: {sig['signature']}")
print(f" ASCII: '{sig['ascii_signature']}'")
print()
return signatures
else:
print("No embedded file signatures detected.")
return []
# Usage example
# signatures = perform_signature_analysis('suspicious_file.png')
Structural Analysis
Structural analysis examines file format compliance and metadata anomalies:
import os
import struct
import numpy as np
from PIL import Image
from PIL.ExifTags import TAGS
class StructuralAnalyzer:
def __init__(self):
pass
def analyze_png_structure(self, png_path):
"""Analyze PNG file structure for anomalies"""
anomalies = []
chunks = []
with open(png_path, 'rb') as f:
# Verify PNG signature
signature = f.read(8)
if signature != b'\x89\x50\x4E\x47\x0D\x0A\x1A\x0A':
anomalies.append("Invalid PNG signature")
return {'anomalies': anomalies, 'chunks': []}
while True:
# Read chunk length
length_data = f.read(4)
if len(length_data) < 4:
break
length = struct.unpack('>I', length_data)[0]
# Read chunk type
chunk_type = f.read(4)
if len(chunk_type) < 4:
break
# Read chunk data
chunk_data = f.read(length)
if len(chunk_data) < length:
break
# Read CRC
crc = f.read(4)
if len(crc) < 4:
break
chunk_info = {
'type': chunk_type.decode('ascii', errors='ignore'),
'length': length,
'position': f.tell() - length - 12,
'crc': struct.unpack('>I', crc)[0]
}
chunks.append(chunk_info)
# Check for suspicious chunks
if chunk_type not in [b'IHDR', b'PLTE', b'IDAT', b'IEND',
b'tRNS', b'gAMA', b'cHRM', b'sRGB',
b'tEXt', b'zTXt', b'iTXt']:
anomalies.append(f"Suspicious chunk type: {chunk_type}")
# Check for oversized chunks
if length > 1000000: # 1MB threshold
anomalies.append(f"Oversized chunk: {chunk_type} ({length} bytes)")
return {'anomalies': anomalies, 'chunks': chunks}
def analyze_jpeg_structure(self, jpeg_path):
"""Analyze JPEG file structure"""
anomalies = []
segments = []
with open(jpeg_path, 'rb') as f:
# Check JPEG signature
if f.read(2) != b'\xFF\xD8':
anomalies.append("Invalid JPEG signature")
return {'anomalies': anomalies, 'segments': []}
while True:
# Look for segment markers
marker_data = f.read(2)
if len(marker_data) < 2:
break
                if marker_data[0] != 0xFF:
                    # Not at a marker boundary; back up one byte to resync
                    f.seek(-1, 1)
                    continue
marker = marker_data[1]
# Handle different marker types
if marker in [0xD8, 0xD9]: # SOI, EOI
segment_length = 0
elif marker in range(0xD0, 0xD8): # RST markers
segment_length = 0
else:
# Read length
length_data = f.read(2)
if len(length_data) < 2:
break
segment_length = struct.unpack('>H', length_data)[0] - 2
segment_info = {
'marker': f"0xFF{marker:02X}",
'length': segment_length,
'position': f.tell() - 4 if segment_length > 0 else f.tell() - 2
}
segments.append(segment_info)
# Skip segment data
if segment_length > 0:
f.read(segment_length)
# Check for end of image
if marker == 0xD9:
break
# Check for suspicious large segments
if segment_length > 65535:
anomalies.append(f"Oversized segment: {segment_info['marker']}")
return {'anomalies': anomalies, 'segments': segments}
def analyze_metadata(self, image_path):
"""Analyze image metadata for anomalies"""
metadata_anomalies = []
metadata_info = {}
try:
img = Image.open(image_path)
# Extract EXIF data
            if hasattr(img, 'getexif'):
exifdata = img.getexif()
if exifdata:
for tag_id, value in exifdata.items():
tag = TAGS.get(tag_id, tag_id)
metadata_info[tag] = value
# Check for suspicious metadata
if isinstance(value, bytes) and len(value) > 1000:
metadata_anomalies.append(f"Large binary data in {tag}")
if tag == "UserComment" and len(str(value)) > 100:
metadata_anomalies.append("Suspicious user comment length")
# Check file size vs expected size
img_array = np.array(img)
expected_size = img_array.size * img_array.itemsize
actual_size = os.path.getsize(image_path)
size_ratio = actual_size / expected_size
if size_ratio > 1.5:
metadata_anomalies.append(f"File size anomaly: {size_ratio:.2f}x expected")
except Exception as e:
metadata_anomalies.append(f"Metadata analysis error: {e}")
return {
'anomalies': metadata_anomalies,
'metadata': metadata_info
}
def comprehensive_structural_analysis(file_path):
"""Perform comprehensive structural analysis"""
analyzer = StructuralAnalyzer()
results = {}
print(f"Structural Analysis of: {file_path}")
print("=" * 50)
# Determine file type and analyze accordingly
file_ext = file_path.lower().split('.')[-1]
if file_ext == 'png':
results['png'] = analyzer.analyze_png_structure(file_path)
print("PNG Structure Analysis:")
if results['png']['anomalies']:
for anomaly in results['png']['anomalies']:
print(f" ⚠️ {anomaly}")
else:
print(" ✅ No structural anomalies detected")
elif file_ext in ['jpg', 'jpeg']:
results['jpeg'] = analyzer.analyze_jpeg_structure(file_path)
print("JPEG Structure Analysis:")
if results['jpeg']['anomalies']:
for anomaly in results['jpeg']['anomalies']:
print(f" ⚠️ {anomaly}")
else:
print(" ✅ No structural anomalies detected")
# Metadata analysis for all image types
results['metadata'] = analyzer.analyze_metadata(file_path)
print("\nMetadata Analysis:")
if results['metadata']['anomalies']:
for anomaly in results['metadata']['anomalies']:
print(f" ⚠️ {anomaly}")
else:
print(" ✅ No metadata anomalies detected")
return results
# Usage
# results = comprehensive_structural_analysis('suspicious_image.png')
Advanced Steganalysis Techniques
Machine Learning and AI in Steganalysis
Modern steganalysis heavily relies on machine learning to detect sophisticated steganographic methods:
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import train_test_split
import cv2
class MLSteganalyzer:
def __init__(self):
self.models = {
'random_forest': RandomForestClassifier(n_estimators=100, random_state=42),
'svm': SVC(kernel='rbf', random_state=42)
}
self.trained_models = {}
def extract_features(self, image_path):
"""Extract comprehensive features for ML analysis"""
img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
if img is None:
return None
features = []
# Statistical features
features.extend([
np.mean(img),
np.std(img),
np.var(img),
np.min(img),
np.max(img)
])
# Histogram features
hist = cv2.calcHist([img], [0], None, [256], [0, 256])
hist_features = hist.flatten()[:50] # First 50 histogram bins
features.extend(hist_features)
# Texture features (Local Binary Pattern)
lbp_features = self._calculate_lbp_features(img)
features.extend(lbp_features)
# Edge features
edges = cv2.Canny(img, 50, 150)
edge_features = [
np.mean(edges),
np.std(edges),
np.sum(edges > 0) / edges.size # Edge density
]
features.extend(edge_features)
# DCT features
dct_features = self._calculate_dct_features(img)
features.extend(dct_features)
return np.array(features)
def _calculate_lbp_features(self, img, radius=1, n_points=8):
"""Calculate Local Binary Pattern features"""
from skimage.feature import local_binary_pattern
lbp = local_binary_pattern(img, n_points, radius, method='uniform')
hist, _ = np.histogram(lbp.ravel(), bins=n_points + 2,
range=(0, n_points + 2), density=True)
return hist.tolist()
def _calculate_dct_features(self, img, block_size=8):
"""Calculate DCT-based features"""
h, w = img.shape
dct_coeffs = []
# Process image in blocks
for i in range(0, h - block_size + 1, block_size):
for j in range(0, w - block_size + 1, block_size):
block = img[i:i+block_size, j:j+block_size]
dct_block = cv2.dct(np.float32(block))
# Extract low-frequency coefficients
dct_coeffs.extend(dct_block[:4, :4].flatten())
# Statistical summary of DCT coefficients
dct_array = np.array(dct_coeffs)
return [
np.mean(dct_array),
np.std(dct_array),
np.median(dct_array),
np.percentile(dct_array, 25),
np.percentile(dct_array, 75)
]
def prepare_training_data(self, clean_images, stego_images):
"""Prepare training dataset"""
X = []
y = []
# Process clean images
print("Processing clean images...")
for img_path in clean_images:
features = self.extract_features(img_path)
if features is not None:
X.append(features)
y.append(0) # Clean = 0
# Process stego images
print("Processing stego images...")
for img_path in stego_images:
features = self.extract_features(img_path)
if features is not None:
X.append(features)
y.append(1) # Stego = 1
return np.array(X), np.array(y)
def train(self, X, y, model_name=None):
"""Train the specified model(s) using the prepared data"""
if model_name is None:
for name in self.models:
self._train_single(X, y, name)
else:
self._train_single(X, y, model_name)
def _train_single(self, X, y, name):
"""Train a single model and evaluate it"""
if name not in self.models:
raise ValueError(f"Unknown model: {name}")
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = self.models[name]
model.fit(X_train, y_train)
self.trained_models[name] = model
pred = model.predict(X_test)
print(f"Model {name}:")
print(f"Accuracy: {accuracy_score(y_test, pred)}")
print(classification_report(y_test, pred))
def predict(self, image_path, model_name='random_forest'):
"""Predict if an image contains steganographic content using the specified model"""
features = self.extract_features(image_path)
if features is None:
return None
model = self.trained_models.get(model_name)
if model is None:
raise ValueError(f"Model {model_name} not trained")
prediction = model.predict(features.reshape(1, -1))[0]
return 'Stego' if prediction == 1 else 'Clean'
Deep Learning Approaches
Deep learning has revolutionized steganalysis by automatically learning complex patterns:
import cv2
import numpy as np
import tensorflow as tf
from tensorflow import keras
class CNNSteganalyzer:
def __init__(self, input_shape=(256, 256, 1)):
self.input_shape = input_shape
self.model = None
self.build_model()
def build_model(self):
"""Build CNN architecture for steganalysis"""
model = keras.Sequential([
# Preprocessing layers
keras.layers.Input(shape=self.input_shape),
keras.layers.Lambda(lambda x: x / 255.0), # Normalize
# Convolutional blocks
keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
keras.layers.BatchNormalization(),
keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
keras.layers.MaxPooling2D((2, 2)),
keras.layers.Dropout(0.25),
keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
keras.layers.BatchNormalization(),
keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
keras.layers.MaxPooling2D((2, 2)),
keras.layers.Dropout(0.25),
keras.layers.Conv2D(128, (3, 3), activation='relu', padding='same'),
keras.layers.BatchNormalization(),
keras.layers.Conv2D(128, (3, 3), activation='relu', padding='same'),
keras.layers.MaxPooling2D((2, 2)),
keras.layers.Dropout(0.25),
# Dense layers
keras.layers.GlobalAveragePooling2D(),
keras.layers.Dense(512, activation='relu'),
keras.layers.Dropout(0.5),
keras.layers.Dense(256, activation='relu'),
keras.layers.Dropout(0.5),
# Output layer
keras.layers.Dense(1, activation='sigmoid')
])
model.compile(
optimizer='adam',
loss='binary_crossentropy',
            metrics=['accuracy', keras.metrics.Precision(), keras.metrics.Recall()]
)
self.model = model
return model
def preprocess_image(self, image_path, target_size=(256, 256)):
"""Preprocess image for CNN input"""
img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
if img is None:
return None
# Resize image
img_resized = cv2.resize(img, target_size)
# Add channel dimension
img_expanded = np.expand_dims(img_resized, axis=-1)
return img_expanded
def train(self, train_images, train_labels, validation_split=0.2, epochs=50):
"""Train the CNN model"""
# Prepare training data
X_train = []
for img_path in train_images:
processed_img = self.preprocess_image(img_path)
if processed_img is not None:
X_train.append(processed_img)
X_train = np.array(X_train)
y_train = np.array(train_labels)
# Define callbacks
callbacks = [
keras.callbacks.EarlyStopping(
monitor='val_loss', patience=10, restore_best_weights=True
),
keras.callbacks.ReduceLROnPlateau(
monitor='val_loss', factor=0.5, patience=5
)
]
# Train model
history = self.model.fit(
X_train, y_train,
validation_split=validation_split,
epochs=epochs,
batch_size=32,
callbacks=callbacks,
verbose=1
)
return history
def predict(self, image_path):
"""Predict steganalysis result"""
processed_img = self.preprocess_image(image_path)
if processed_img is None:
return None
# Add batch dimension
img_batch = np.expand_dims(processed_img, axis=0)
# Make prediction
prediction = self.model.predict(img_batch)[0][0]
result = {
'prediction': 'STEGO' if prediction > 0.5 else 'CLEAN',
'confidence': prediction if prediction > 0.5 else 1 - prediction,
'raw_score': float(prediction)
}
return result
def evaluate_model(self, test_images, test_labels):
"""Evaluate model performance"""
X_test = []
for img_path in test_images:
processed_img = self.preprocess_image(img_path)
if processed_img is not None:
X_test.append(processed_img)
X_test = np.array(X_test)
y_test = np.array(test_labels)
# Evaluate
results = self.model.evaluate(X_test, y_test, verbose=0)
return {
'loss': results[0],
'accuracy': results[1],
'precision': results[2],
'recall': results[3]
}
# Advanced CNN architecture for steganalysis
class SRNetSteganalyzer:
"""Implementation of SRNet architecture for steganalysis"""
def __init__(self, input_shape=(512, 512, 1)):
self.input_shape = input_shape
self.model = None
self.build_srnet()
def build_srnet(self):
"""Build SRNet architecture"""
inputs = keras.layers.Input(shape=self.input_shape)
# Preprocessing layer
x = keras.layers.Lambda(lambda x: x / 255.0)(inputs)
# Layer 1: High-pass filters
x = self._high_pass_layer(x)
# Layers 2-12: Convolutional layers
for i in range(11):
filters = min(16 * (2 ** (i // 2)), 512)
x = keras.layers.Conv2D(filters, (3, 3), padding='same')(x)
x = keras.layers.BatchNormalization()(x)
x = keras.layers.ReLU()(x)
if i % 2 == 1: # Every second layer
x = keras.layers.AveragePooling2D((2, 2))(x)
# Global average pooling
x = keras.layers.GlobalAveragePooling2D()(x)
# Fully connected layers
x = keras.layers.Dense(512, activation='relu')(x)
x = keras.layers.Dropout(0.5)(x)
x = keras.layers.Dense(256, activation='relu')(x)
x = keras.layers.Dropout(0.5)(x)
# Output layer
outputs = keras.layers.Dense(1, activation='sigmoid')(x)
self.model = keras.Model(inputs, outputs)
self.model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='binary_crossentropy',
            metrics=['accuracy', keras.metrics.Precision(), keras.metrics.Recall()]
)
return self.model
def _high_pass_layer(self, x):
"""High-pass filter layer for noise residual extraction"""
# Define high-pass filter kernels
kernel1 = tf.constant([
[0, 0, 0, 0, 0],
[0, -1, 2, -1, 0],
[0, 2, -4, 2, 0],
[0, -1, 2, -1, 0],
[0, 0, 0, 0, 0]
], dtype=tf.float32)
kernel1 = tf.reshape(kernel1, (5, 5, 1, 1))
# Apply high-pass filter
filtered = tf.nn.conv2d(x, kernel1, strides=[1, 1, 1, 1], padding='SAME')
# Truncation and normalization
filtered = tf.clip_by_value(filtered, -3, 3)
return filtered
# Usage example for deep learning
"""
# Initialize CNN steganalyzer
cnn_analyzer = CNNSteganalyzer()
# Prepare training data
train_images = ['train1.png', 'train2.png', ...]
train_labels = [0, 1, ...] # 0 for clean, 1 for stego
# Train model
history = cnn_analyzer.train(train_images, train_labels, epochs=100)
# Make prediction
result = cnn_analyzer.predict('test_image.png')
print(f"CNN Prediction: {result['prediction']} (confidence: {result['confidence']:.3f})")
# Initialize SRNet
srnet_analyzer = SRNetSteganalyzer()
# Similar training and prediction process
"""
Multimedia Steganalysis
Advanced steganalysis extends beyond images to audio and video files:
import cv2
import librosa
import numpy as np
import scipy.io.wavfile as wav
from scipy import signal
class AudioSteganalyzer:
def __init__(self):
pass
def analyze_audio_lsb(self, audio_path):
"""Analyze audio for LSB steganography"""
try:
# Load audio file
sample_rate, audio_data = wav.read(audio_path)
if len(audio_data.shape) > 1:
audio_data = audio_data[:, 0] # Use first channel
# Extract LSBs
lsb_sequence = audio_data & 1
# Statistical analysis of LSB sequence
lsb_entropy = self._calculate_entropy(lsb_sequence)
lsb_mean = np.mean(lsb_sequence)
# Chi-square test on LSB sequence
chi_square_stat = self._chi_square_audio(lsb_sequence)
results = {
'sample_rate': sample_rate,
'duration': len(audio_data) / sample_rate,
'lsb_entropy': lsb_entropy,
'lsb_mean': lsb_mean,
'chi_square': chi_square_stat,
'steganography_detected': lsb_entropy > 0.9 and abs(lsb_mean - 0.5) < 0.1
}
return results
except Exception as e:
return {'error': str(e)}
def analyze_spectral_anomalies(self, audio_path):
"""Analyze audio spectral properties for hidden data"""
try:
# Load audio with librosa
y, sr = librosa.load(audio_path)
# Compute spectral features
spectral_centroid = librosa.feature.spectral_centroid(y=y, sr=sr)[0]
spectral_bandwidth = librosa.feature.spectral_bandwidth(y=y, sr=sr)[0]
spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)[0]
zero_crossing_rate = librosa.feature.zero_crossing_rate(y)[0]
# Compute STFT for detailed analysis
D = librosa.stft(y)
magnitude = np.abs(D)
# Analyze high-frequency content
high_freq_energy = np.mean(magnitude[-magnitude.shape[0]//4:, :])
total_energy = np.mean(magnitude)
high_freq_ratio = high_freq_energy / total_energy
# Detection based on spectral anomalies
anomaly_score = 0
if np.std(spectral_centroid) > np.mean(spectral_centroid) * 0.5:
anomaly_score += 1
if high_freq_ratio > 0.3:
anomaly_score += 1
if np.std(zero_crossing_rate) > np.mean(zero_crossing_rate) * 0.8:
anomaly_score += 1
results = {
'spectral_centroid_mean': np.mean(spectral_centroid),
'spectral_centroid_std': np.std(spectral_centroid),
'high_freq_ratio': high_freq_ratio,
'anomaly_score': anomaly_score,
'steganography_detected': anomaly_score >= 2
}
return results
except Exception as e:
return {'error': str(e)}
def detect_echo_hiding(self, audio_path):
"""Detect echo hiding steganography"""
try:
sample_rate, audio_data = wav.read(audio_path)
if len(audio_data.shape) > 1:
audio_data = audio_data[:, 0]
# Autocorrelation analysis
autocorr = np.correlate(audio_data, audio_data, mode='full')
autocorr = autocorr[autocorr.size // 2:]
# Look for periodic patterns indicating echo
peaks, _ = signal.find_peaks(autocorr[1000:], height=np.max(autocorr) * 0.1)
# Calculate echo detection metrics
if len(peaks) > 0:
echo_detected = True
echo_delay = peaks[0] + 1000 # Samples
echo_strength = autocorr[echo_delay] / np.max(autocorr)
else:
echo_detected = False
echo_delay = 0
echo_strength = 0
results = {
'echo_detected': echo_detected,
'echo_delay_samples': echo_delay,
'echo_delay_ms': (echo_delay / sample_rate) * 1000,
'echo_strength': echo_strength,
'steganography_detected': echo_detected and echo_strength > 0.2
}
return results
except Exception as e:
return {'error': str(e)}
def _calculate_entropy(self, data):
"""Calculate entropy of data sequence"""
unique, counts = np.unique(data, return_counts=True)
probabilities = counts / len(data)
entropy = -np.sum(probabilities * np.log2(probabilities + 1e-10))
return entropy
def _chi_square_audio(self, lsb_sequence):
"""Chi-square test for audio LSB sequence"""
zeros = np.sum(lsb_sequence == 0)
ones = np.sum(lsb_sequence == 1)
total = len(lsb_sequence)
expected = total / 2
chi_square = ((zeros - expected) ** 2 + (ones - expected) ** 2) / expected
return chi_square
class VideoSteganalyzer:
def __init__(self):
pass
def analyze_video_frames(self, video_path, sample_frames=100):
"""Analyze video frames for steganographic anomalies"""
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return {'error': 'Cannot open video file'}
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
# Sample frames uniformly
frame_indices = np.linspace(0, total_frames - 1,
min(sample_frames, total_frames), dtype=int)
frame_anomalies = []
temporal_inconsistencies = []
prev_frame = None
for i, frame_idx in enumerate(frame_indices):
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = cap.read()
if not ret:
continue
# Convert to grayscale
gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# Analyze frame for anomalies
frame_analysis = self._analyze_single_frame(gray_frame)
frame_anomalies.append(frame_analysis)
# Temporal consistency analysis
if prev_frame is not None:
temporal_diff = cv2.absdiff(gray_frame, prev_frame)
temporal_score = np.mean(temporal_diff)
temporal_inconsistencies.append(temporal_score)
prev_frame = gray_frame
cap.release()
# Aggregate results
avg_anomaly_score = np.mean([f['anomaly_score'] for f in frame_anomalies])
temporal_variance = np.var(temporal_inconsistencies)
results = {
'total_frames': total_frames,
'fps': fps,
'analyzed_frames': len(frame_anomalies),
'average_anomaly_score': avg_anomaly_score,
'temporal_variance': temporal_variance,
'frame_anomalies': frame_anomalies[:10], # First 10 for brevity
'steganography_detected': avg_anomaly_score > 0.3 or temporal_variance > 1000
}
return results
def _analyze_single_frame(self, frame):
"""Analyze single video frame for anomalies"""
# Statistical properties
mean_val = np.mean(frame)
std_val = np.std(frame)
# Histogram analysis
hist = cv2.calcHist([frame], [0], None, [256], [0, 256])
hist_entropy = self._calculate_entropy(hist.flatten())
# Edge analysis
edges = cv2.Canny(frame, 50, 150)
edge_density = np.sum(edges > 0) / edges.size
# Texture analysis using local binary patterns
lbp_var = self._calculate_lbp_variance(frame)
# Anomaly scoring
anomaly_score = 0
if std_val < 10: # Very low variation
anomaly_score += 0.2
if hist_entropy < 6: # Low entropy
anomaly_score += 0.3
if edge_density < 0.05: # Very few edges
anomaly_score += 0.2
if lbp_var > 50: # High texture variation
anomaly_score += 0.3
return {
'mean': float(mean_val),
'std': float(std_val),
'histogram_entropy': float(hist_entropy),
'edge_density': float(edge_density),
'lbp_variance': float(lbp_var),
'anomaly_score': anomaly_score
}
def _calculate_lbp_variance(self, frame):
"""Calculate Local Binary Pattern variance"""
# Simplified LBP calculation
kernel = np.array([[1, 1, 1], [1, -8, 1], [1, 1, 1]])
lbp = cv2.filter2D(frame, cv2.CV_64F, kernel)
return np.var(lbp)
def _calculate_entropy(self, data):
"""Calculate entropy of data"""
data = data[data > 0] # Remove zeros
probabilities = data / np.sum(data)
entropy = -np.sum(probabilities * np.log2(probabilities + 1e-10))
return entropy
# Usage examples for multimedia steganalysis
"""
# Audio steganalysis
audio_analyzer = AudioSteganalyzer()
lsb_results = audio_analyzer.analyze_audio_lsb('suspicious_audio.wav')
spectral_results = audio_analyzer.analyze_spectral_anomalies('suspicious_audio.wav')
echo_results = audio_analyzer.detect_echo_hiding('suspicious_audio.wav')
print(f"Audio LSB Analysis: {'STEGO' if lsb_results['steganography_detected'] else 'CLEAN'}")
print(f"Spectral Analysis: {'STEGO' if spectral_results['steganography_detected'] else 'CLEAN'}")
print(f"Echo Analysis: {'STEGO' if echo_results['steganography_detected'] else 'CLEAN'}")
# Video steganalysis
video_analyzer = VideoSteganalyzer()
video_results = video_analyzer.analyze_video_frames('suspicious_video.mp4')
print(f"Video Analysis: {'STEGO' if video_results['steganography_detected'] else 'CLEAN'}")
"""
Steganalysis Tools and Software
Command-Line Tools
Here’s a comprehensive overview of essential steganalysis tools:
| Tool Name | Platform | Media Types | Key Features | Installation |
|---|---|---|---|---|
| zsteg | Linux/macOS | PNG, BMP | LSB detection, zlib analysis | `gem install zsteg` |
| StegSolve | Cross-platform | Images | Visual analysis, bit plane extraction | Download JAR file |
| binwalk | Linux/macOS | All files | Signature detection, file carving | `apt-get install binwalk` |
| foremost | Linux | All files | File recovery, signature analysis | `apt-get install foremost` |
| strings | Unix/Linux | All files | Text string extraction | Built-in |
| exiftool | Cross-platform | Images/Videos | Metadata analysis | `apt-get install exiftool` |
| Aletheia | Linux/Python | Images | ML-based detection | Install from GitHub source |
| StegExpose | Java | Images | Batch processing | Compile from source |
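Typical invocations for a first pass with these tools look like the following (suspect.png is a placeholder file name):
# Run all of zsteg's LSB and zlib checks against a PNG or BMP
zsteg -a suspect.png
# Scan for embedded file signatures, then carve out anything found
binwalk suspect.png
binwalk -e suspect.png
# Dump metadata and search printable strings for keywords
exiftool suspect.png
strings suspect.png | grep -iE "pass|key|flag"
# Recover embedded files by signature
foremost -i suspect.png -o carved/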
Professional and Research Tools
| Tool Name | Type | Specialization | Cost |
|---|---|---|---|
| StegoVeritas | Open Source | Multi-format analysis | Free |
| Stegdetect | Open Source | JPEG analysis | Free |
| Virtual Steganographic Laboratory | Commercial | Advanced research | Licensed |
| WinHex | Commercial | Binary analysis | Paid |
| EnCase | Commercial | Digital forensics | Paid |
Online Steganalysis Tools
For quick analysis without installation:
- StegOnline - Web-based StegSolve alternative
- Aperisolve - Automated steganalysis platform
- StegCracker - Brute-forces steghide passwords (a command-line tool, unlike the web services above)
Step-by-Step Steganalysis Tutorial
Let’s walk through a complete steganalysis workflow:
Phase 1: Initial File Assessment
# Step 1: Basic file information
file suspicious_image.png
ls -la suspicious_image.png
md5sum suspicious_image.png
# Step 2: String analysis
strings suspicious_image.png | head -20
strings suspicious_image.png | grep -i "password\|key\|secret"
# Step 3: Metadata examination
exiftool suspicious_image.png
Phase 2: Visual Analysis
# Load and display image
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
def visual_steganalysis_workflow(image_path):
"""Complete visual steganalysis workflow"""
# Load image
img = Image.open(image_path)
img_array = np.array(img)
# Create subplot for multiple visualizations
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
# Original image
axes[0, 0].imshow(img_array)
axes[0, 0].set_title('Original Image')
axes[0, 0].axis('off')
# Histogram
if len(img_array.shape) == 3:
colors = ['red', 'green', 'blue']
for i, color in enumerate(colors):
hist, bins = np.histogram(img_array[:,:,i], bins=256, range=[0,256])
axes[0, 1].plot(bins[:-1], hist, color=color, alpha=0.7)
else:
hist, bins = np.histogram(img_array, bins=256, range=[0,256])
axes[0, 1].plot(bins[:-1], hist, color='black')
axes[0, 1].set_title('Histogram')
axes[0, 1].set_xlabel('Pixel Value')
axes[0, 1].set_ylabel('Frequency')
# LSB plane (if RGB)
if len(img_array.shape) == 3:
lsb_plane = (img_array[:,:,0] & 1) * 255
axes[0, 2].imshow(lsb_plane, cmap='gray')
axes[0, 2].set_title('LSB Plane (Red Channel)')
else:
lsb_plane = (img_array & 1) * 255
axes[0, 2].imshow(lsb_plane, cmap='gray')
axes[0, 2].set_title('LSB Plane')
axes[0, 2].axis('off')
# Difference from original (enhanced view)
if len(img_array.shape) == 3:
gray_img = np.mean(img_array, axis=2)
    else:
        gray_img = img_array.astype(float)  # float avoids uint8 wraparound below
# Edge detection
from scipy import ndimage
edges = ndimage.sobel(gray_img)
axes[1, 0].imshow(edges, cmap='gray')
axes[1, 0].set_title('Edge Detection')
axes[1, 0].axis('off')
# Noise analysis
noise = gray_img - ndimage.gaussian_filter(gray_img, sigma=1)
axes[1, 1].imshow(noise, cmap='gray')
axes[1, 1].set_title('Noise Analysis')
axes[1, 1].axis('off')
# Statistical plot
pixel_diffs = np.diff(gray_img.flatten())
axes[1, 2].hist(pixel_diffs, bins=50, alpha=0.7)
axes[1, 2].set_title('Pixel Difference Distribution')
axes[1, 2].set_xlabel('Difference Value')
axes[1, 2].set_ylabel('Frequency')
plt.tight_layout()
plt.savefig('steganalysis_visual_report.png', dpi=300, bbox_inches='tight')
plt.show()
return {
'image_shape': img_array.shape,
'histogram_data': hist,
'lsb_analysis': np.mean(lsb_plane),
'edge_density': np.sum(edges > np.mean(edges)) / edges.size,
'noise_variance': np.var(noise)
}
# Run visual analysis
# results = visual_steganalysis_workflow('suspicious_image.png')
Phase 3: Statistical Analysis
def comprehensive_statistical_analysis(image_path):
"""Perform comprehensive statistical steganalysis"""
results = {}
# Chi-square test
chi_results = chi_square_test(image_path)
results['chi_square'] = chi_results
# RS analysis
rs_results = rs_analysis(image_path)
results['rs_analysis'] = rs_results
# Sample pairs analysis
sp_results = sample_pairs_analysis(image_path)
results['sample_pairs'] = sp_results
# Weighted stego analysis
ws_results = weighted_stego_analysis(image_path)
results['weighted_stego'] = ws_results
return results
def sample_pairs_analysis(image_path):
"""Sample Pairs Analysis for steganalysis"""
img = Image.open(image_path).convert('L')
pixels = np.array(img).flatten()
# Create sample pairs
pairs = []
for i in range(0, len(pixels)-1, 2):
pairs.append((pixels[i], pixels[i+1]))
# Count different pair types
close_pairs = 0 # |u-v| <= 1
distant_pairs = 0 # |u-v| > 1
for u, v in pairs:
if abs(u - v) <= 1:
close_pairs += 1
else:
distant_pairs += 1
total_pairs = len(pairs)
close_ratio = close_pairs / total_pairs
# Statistical analysis
expected_close_ratio = 0.5 # Expected for natural images
deviation = abs(close_ratio - expected_close_ratio)
return {
'total_pairs': total_pairs,
'close_pairs': close_pairs,
'distant_pairs': distant_pairs,
'close_ratio': close_ratio,
'deviation': deviation,
'steganography_detected': deviation > 0.05
}
def weighted_stego_analysis(image_path):
"""Weighted Stego Analysis"""
img = Image.open(image_path).convert('L')
pixels = np.array(img)
# Calculate weights based on local variance
weights = np.zeros_like(pixels, dtype=float)
for i in range(1, pixels.shape[0]-1):
for j in range(1, pixels.shape[1]-1):
# 3x3 neighborhood
neighborhood = pixels[i-1:i+2, j-1:j+2]
local_var = np.var(neighborhood)
weights[i, j] = 1.0 / (1.0 + local_var)
# Weighted chi-square test
weighted_stat = 0
total_weight = 0
for i in range(0, pixels.shape[0], 2):
for j in range(0, pixels.shape[1], 2):
if i+1 < pixels.shape[0] and j+1 < pixels.shape[1]:
# Get 2x2 block
block = pixels[i:i+2, j:j+2].flatten()
weight_block = weights[i:i+2, j:j+2].flatten()
# Count even/odd pairs weighted
even_count = np.sum((block % 2 == 0) * weight_block)
odd_count = np.sum((block % 2 == 1) * weight_block)
total = even_count + odd_count
if total > 0:
expected = total / 2
chi_contrib = ((even_count - expected) ** 2 +
(odd_count - expected) ** 2) / expected
weighted_stat += chi_contrib * np.mean(weight_block)
total_weight += np.mean(weight_block)
if total_weight > 0:
normalized_stat = weighted_stat / total_weight
else:
normalized_stat = 0
return {
'weighted_chi_square': weighted_stat,
'normalized_statistic': normalized_stat,
'total_weight': total_weight,
'steganography_detected': normalized_stat > 50
}
# Complete statistical analysis workflow
def run_complete_statistical_analysis(image_path):
"""Run complete statistical analysis pipeline"""
print(f"Statistical Steganalysis Report for: {image_path}")
print("=" * 60)
results = comprehensive_statistical_analysis(image_path)
detection_count = 0
total_tests = 0
# Chi-square results
print("\n1. Chi-Square Test:")
chi_results = results['chi_square']
print(f" Statistic: {chi_results['chi_square_statistic']:.3f}")
print(f" P-value: {chi_results['p_value']:.6f}")
print(f" Result: {'STEGO' if chi_results['steganography_detected'] else 'CLEAN'}")
if chi_results['steganography_detected']:
detection_count += 1
total_tests += 1
# RS Analysis results
print("\n2. RS Analysis:")
rs_results = results['rs_analysis']
print(f" Average Ratio: {rs_results['average_ratio']:.3f}")
print(f" Ratio Std Dev: {rs_results['ratio_std']:.3f}")
print(f" Confidence: {rs_results['confidence']:.3f}")
print(f" Result: {'STEGO' if rs_results['steganography_detected'] else 'CLEAN'}")
if rs_results['steganography_detected']:
detection_count += 1
total_tests += 1
# Sample Pairs results
print("\n3. Sample Pairs Analysis:")
sp_results = results['sample_pairs']
print(f" Close Pairs Ratio: {sp_results['close_ratio']:.3f}")
print(f" Deviation: {sp_results['deviation']:.3f}")
print(f" Result: {'STEGO' if sp_results['steganography_detected'] else 'CLEAN'}")
if sp_results['steganography_detected']:
detection_count += 1
total_tests += 1
# Weighted Stego results
print("\n4. Weighted Stego Analysis:")
ws_results = results['weighted_stego']
print(f" Normalized Statistic: {ws_results['normalized_statistic']:.3f}")
print(f" Result: {'STEGO' if ws_results['steganography_detected'] else 'CLEAN'}")
if ws_results['steganography_detected']:
detection_count += 1
total_tests += 1
# Overall conclusion
print(f"\n{'='*60}")
print("OVERALL ASSESSMENT:")
detection_ratio = detection_count / total_tests
print(f"Detection Rate: {detection_count}/{total_tests} tests ({detection_ratio*100:.1f}%)")
if detection_ratio >= 0.5:
print("🚨 STEGANOGRAPHY LIKELY DETECTED")
confidence_level = "HIGH" if detection_ratio >= 0.75 else "MEDIUM"
print(f"Confidence Level: {confidence_level}")
else:
print("✅ IMAGE APPEARS CLEAN")
print("Confidence Level: HIGH" if detection_ratio == 0 else "MEDIUM")
return results
# Usage
# statistical_results = run_complete_statistical_analysis('suspicious_image.png')
Phase 4: Signature and Structure Analysis
#!/bin/bash
# steg_analysis.sh - Comprehensive steganalysis script
IMAGE_PATH="$1"
if [ -z "$IMAGE_PATH" ]; then
echo "Usage: $0 <image_path>"
exit 1
fi
echo "Comprehensive Steganalysis Report"
echo "================================="
echo "File: $IMAGE_PATH"
echo "Date: $(date)"
echo ""
# Basic file information
echo "=== BASIC FILE INFORMATION ==="
file "$IMAGE_PATH"
ls -la "$IMAGE_PATH"
echo "MD5: $(md5sum "$IMAGE_PATH" | cut -d' ' -f1)"
echo "SHA256: $(sha256sum "$IMAGE_PATH" | cut -d' ' -f1)"
echo ""
# Metadata analysis
echo "=== METADATA ANALYSIS ==="
exiftool "$IMAGE_PATH" | head -20
echo ""
# String analysis
echo "=== STRING ANALYSIS ==="
echo "Printable strings (first 10):"
strings "$IMAGE_PATH" | head -10
echo ""
echo "Suspicious keywords:"
strings "$IMAGE_PATH" | grep -i -E "(password|key|secret|hidden|steg|flag|ctf)" | head -5
echo ""
# File signature analysis
echo "=== SIGNATURE ANALYSIS ==="
echo "File signatures detected:"
binwalk -B "$IMAGE_PATH" | head -10
echo ""
# Entropy analysis
echo "=== ENTROPY ANALYSIS ==="
ent "$IMAGE_PATH"
echo ""
# Hexdump analysis (first 512 bytes)
echo "=== HEXDUMP ANALYSIS (first 512 bytes) ==="
hexdump -C "$IMAGE_PATH" | head -32
echo ""
# zsteg analysis (if PNG)
if [[ "$IMAGE_PATH" == *.png ]] || [[ "$IMAGE_PATH" == *.bmp ]]; then
echo "=== ZSTEG ANALYSIS ==="
zsteg "$IMAGE_PATH" | head -20
echo ""
fi
# steghide analysis (if JPEG)
if [[ "$IMAGE_PATH" == *.jpg ]] || [[ "$IMAGE_PATH" == *.jpeg ]]; then
echo "=== STEGHIDE ANALYSIS ==="
echo "Attempting extraction without password:"
steghide extract -sf "$IMAGE_PATH" -p "" 2>&1 | head -5
echo ""
fi
echo "Analysis complete. Review results above for anomalies."
Phase 5: Automated Analysis
class ComprehensiveSteganalyzer:
def __init__(self):
self.results = {}
self.confidence_scores = {}
def analyze_file(self, file_path):
"""Perform comprehensive steganalysis"""
print(f"Analyzing: {file_path}")
print("=" * 50)
# Phase 1: Basic Analysis
basic_results = self._basic_analysis(file_path)
self.results['basic'] = basic_results
# Phase 2: Visual Analysis
if self._is_image(file_path):
visual_results = self._visual_analysis(file_path)
self.results['visual'] = visual_results
# Phase 3: Statistical Analysis
statistical_results = self._statistical_analysis(file_path)
self.results['statistical'] = statistical_results
# Phase 4: Signature Analysis
signature_results = self._signature_analysis(file_path)
self.results['signature'] = signature_results
# Phase 5: ML Analysis (if models available)
if hasattr(self, 'ml_model'):
ml_results = self._ml_analysis(file_path)
self.results['ml'] = ml_results
# Generate final report
final_assessment = self._generate_assessment()
return {
'file_path': file_path,
'results': self.results,
'assessment': final_assessment
}
def _basic_analysis(self, file_path):
"""Basic file analysis"""
import os
from pathlib import Path
stat_info = os.stat(file_path)
return {
'file_size': stat_info.st_size,
'file_extension': Path(file_path).suffix.lower(),
'creation_time': stat_info.st_ctime,
'modification_time': stat_info.st_mtime,
'file_type': self._detect_file_type(file_path)
}
def _visual_analysis(self, file_path):
"""Visual analysis for images"""
try:
img = Image.open(file_path)
img_array = np.array(img)
# Calculate various metrics
if len(img_array.shape) == 3:
gray_img = np.mean(img_array, axis=2)
lsb_plane = (img_array[:,:,0] & 1) * 255
else:
gray_img = img_array
lsb_plane = (img_array & 1) * 255
# Visual metrics
histogram_entropy = self._calculate_entropy(gray_img.flatten())
lsb_entropy = self._calculate_entropy(lsb_plane.flatten())
edge_density = self._calculate_edge_density(gray_img)
# Anomaly scoring
visual_score = 0
if lsb_entropy > 0.9: # High LSB entropy
visual_score += 0.3
if histogram_entropy < 6: # Low overall entropy
visual_score += 0.2
if edge_density < 0.05: # Very few edges
visual_score += 0.2
return {
'histogram_entropy': histogram_entropy,
'lsb_entropy': lsb_entropy,
'edge_density': edge_density,
'visual_anomaly_score': visual_score,
'suspicious': visual_score > 0.4
}
except Exception as e:
return {'error': str(e)}
def _statistical_analysis(self, file_path):
"""Statistical analysis"""
try:
# Run chi-square test
chi_results = chi_square_test(file_path, confidence_level=0.95)
# Run RS analysis
rs_results = rs_analysis(file_path)
# Calculate combined statistical score
stat_score = 0
if chi_results['steganography_detected']:
stat_score += 0.4
if rs_results['steganography_detected']:
stat_score += 0.4
return {
'chi_square': chi_results,
'rs_analysis': rs_results,
'statistical_score': stat_score,
'suspicious': stat_score > 0.5
}
except Exception as e:
return {'error': str(e)}
def _signature_analysis(self, file_path):
"""File signature analysis"""
try:
analyzer = FileSignatureAnalyzer()
signatures = analyzer.scan_file(file_path)
# Calculate signature score
sig_score = 0
embedded_files = len(signatures)
if embedded_files > 0:
sig_score = min(embedded_files * 0.3, 1.0)
return {
'embedded_signatures': signatures,
'embedded_file_count': embedded_files,
'signature_score': sig_score,
'suspicious': embedded_files > 0
}
except Exception as e:
return {'error': str(e)}
def _generate_assessment(self):
"""Generate final assessment"""
total_score = 0
max_score = 0
detection_methods = []
# Aggregate scores from different analysis methods
if 'visual' in self.results and 'visual_anomaly_score' in self.results['visual']:
visual_score = self.results['visual']['visual_anomaly_score']
total_score += visual_score
max_score += 1.0
if self.results['visual']['suspicious']:
detection_methods.append('Visual Analysis')
if 'statistical' in self.results and 'statistical_score' in self.results['statistical']:
stat_score = self.results['statistical']['statistical_score']
total_score += stat_score
max_score += 1.0
if self.results['statistical']['suspicious']:
detection_methods.append('Statistical Analysis')
if 'signature' in self.results and 'signature_score' in self.results['signature']:
sig_score = self.results['signature']['signature_score']
total_score += sig_score
max_score += 1.0
if self.results['signature']['suspicious']:
detection_methods.append('Signature Analysis')
# Calculate final confidence
if max_score > 0:
confidence = total_score / max_score
else:
confidence = 0
# Determine final verdict
if confidence >= 0.7:
verdict = "STEGANOGRAPHY DETECTED"
confidence_level = "HIGH"
elif confidence >= 0.4:
verdict = "STEGANOGRAPHY POSSIBLE"
confidence_level = "MEDIUM"
else:
verdict = "NO STEGANOGRAPHY DETECTED"
confidence_level = "HIGH" if confidence <= 0.1 else "LOW"
return {
'verdict': verdict,
'confidence_score': confidence,
'confidence_level': confidence_level,
'detection_methods': detection_methods,
'total_methods_triggered': len(detection_methods)
}
    def _is_image(self, file_path):
        """Check if file is an image"""
        from pathlib import Path
        image_extensions = {'.png', '.jpg', '.jpeg', '.bmp', '.gif', '.tiff', '.webp'}
        return Path(file_path).suffix.lower() in image_extensions
def _detect_file_type(self, file_path):
"""Detect file type"""
try:
with open(file_path, 'rb') as f:
header = f.read(16)
# Check common signatures
if header.startswith(b'\x89PNG'):
return 'PNG'
elif header.startswith(b'\xFF\xD8\xFF'):
return 'JPEG'
elif header.startswith(b'BM'):
return 'BMP'
elif header.startswith(b'GIF8'):
return 'GIF'
elif header.startswith(b'PK'):
return 'ZIP/Archive'
elif header.startswith(b'%PDF'):
return 'PDF'
else:
return 'Unknown'
        except Exception:
            return 'Error'
def _calculate_entropy(self, data):
"""Calculate Shannon entropy"""
unique, counts = np.unique(data, return_counts=True)
probabilities = counts / len(data)
entropy = -np.sum(probabilities * np.log2(probabilities + 1e-10))
return entropy
def _calculate_edge_density(self, image):
"""Calculate edge density"""
from scipy import ndimage
edges = ndimage.sobel(image)
return np.sum(edges > np.mean(edges)) / edges.size
def print_report(self, analysis_results):
"""Print formatted analysis report"""
print("\nCOMPREHENSIVE STEGANALYSIS REPORT")
print("=" * 60)
print(f"File: {analysis_results['file_path']}")
assessment = analysis_results['assessment']
print(f"\nFINAL VERDICT: {assessment['verdict']}")
print(f"Confidence Score: {assessment['confidence_score']:.3f}")
print(f"Confidence Level: {assessment['confidence_level']}")
if assessment['detection_methods']:
print(f"\nDetection Methods Triggered ({assessment['total_methods_triggered']}):")
for method in assessment['detection_methods']:
print(f" • {method}")
print(f"\n{'='*60}")
# Detailed results
results = analysis_results['results']
        if 'visual' in results and 'error' not in results['visual']:
print(f"Visual Analysis Score: {results['visual']['visual_anomaly_score']:.3f}")
if 'statistical' in results and 'error' not in results['statistical']:
print(f"Statistical Analysis Score: {results['statistical']['statistical_score']:.3f}")
if 'signature' in results and 'error' not in results['signature']:
print(f"Signature Analysis: {results['signature']['embedded_file_count']} embedded files detected")
# Usage example
def run_comprehensive_analysis(file_path):
"""Run complete steganalysis workflow"""
analyzer = ComprehensiveSteganalyzer()
results = analyzer.analyze_file(file_path)
analyzer.print_report(results)
return results
# Example usage
# results = run_comprehensive_analysis('suspicious_file.png')
Programming Your Own Steganalysis Tools
Custom LSB Detector
class CustomLSBDetector:
def __init__(self):
self.detection_methods = [
'chi_square',
'histogram_analysis',
'pair_analysis',
'run_length_analysis'
]
def detect_lsb_steganography(self, image_path):
"""Main detection method combining multiple approaches"""
results = {}
# Load and preprocess image
img = Image.open(image_path)
if img.mode != 'RGB':
img = img.convert('RGB')
img_array = np.array(img)
# Apply each detection method
for method in self.detection_methods:
method_func = getattr(self, f'_{method}')
results[method] = method_func(img_array)
# Combine results
final_result = self._combine_results(results)
return final_result
def _chi_square(self, img_array):
"""Chi-square test for each color channel"""
results = {}
for channel in range(3):
channel_data = img_array[:, :, channel].flatten()
            # Histogram the channel, then group counts into value pairs (2k, 2k+1)
            hist = np.bincount(channel_data, minlength=256)
            pairs = {2 * k: (int(hist[2 * k]), int(hist[2 * k + 1]))
                     for k in range(128)}
# Calculate chi-square statistic
chi_square_stat = 0
for pair_key, counts in pairs.items():
even_count, odd_count = counts
total = even_count + odd_count
if total > 0:
expected = total / 2
chi_square_stat += ((even_count - expected) ** 2 +
(odd_count - expected) ** 2) / expected
# Determine p-value and detection
from scipy.stats import chi2
df = len(pairs) - 1
p_value = 1 - chi2.cdf(chi_square_stat, df) if df > 0 else 1
results[f'channel_{channel}'] = {
'chi_square_stat': chi_square_stat,
'p_value': p_value,
'detected': p_value < 0.05,
'confidence': 1 - p_value if p_value < 0.05 else p_value
}
return results
def _histogram_analysis(self, img_array):
"""Analyze histogram characteristics"""
results = {}
for channel in range(3):
channel_data = img_array[:, :, channel].flatten()
# Calculate histogram
hist, bins = np.histogram(channel_data, bins=256, range=(0, 256))
# Analyze even/odd distribution
even_sum = np.sum(hist[0::2])
odd_sum = np.sum(hist[1::2])
# Calculate ratio and deviation
if even_sum + odd_sum > 0:
even_ratio = even_sum / (even_sum + odd_sum)
deviation = abs(even_ratio - 0.5)
else:
even_ratio = 0.5
deviation = 0
# Calculate histogram entropy
hist_normalized = hist / np.sum(hist)
entropy = -np.sum(hist_normalized * np.log2(hist_normalized + 1e-10))
# Detect anomalies
anomaly_score = 0
if deviation > 0.1: # Significant even/odd imbalance
anomaly_score += 0.4
if entropy < 6: # Low entropy
anomaly_score += 0.3
results[f'channel_{channel}'] = {
'even_ratio': even_ratio,
'deviation': deviation,
'entropy': entropy,
'anomaly_score': anomaly_score,
'detected': anomaly_score > 0.5
}
return results
    def _pair_analysis(self, img_array):
        """Analyze adjacent pixel pairs (vectorized; cast to int to avoid uint8 wraparound)"""
        results = {}
        for channel in range(3):
            channel_data = img_array[:, :, channel].astype(int)
            # Fraction of horizontally / vertically adjacent pixels whose
            # values differ by at most 1
            h_close = np.abs(np.diff(channel_data, axis=1)) <= 1
            v_close = np.abs(np.diff(channel_data, axis=0)) <= 1
            h_ratio = float(h_close.mean()) if h_close.size else 0.0
            v_ratio = float(v_close.mean()) if v_close.size else 0.0
            # Rough expected ratio for natural images (heuristic)
            expected_ratio = 0.4
            h_deviation = abs(h_ratio - expected_ratio)
            v_deviation = abs(v_ratio - expected_ratio)
            results[f'channel_{channel}'] = {
                'horizontal_close_ratio': h_ratio,
                'vertical_close_ratio': v_ratio,
                'h_deviation': h_deviation,
                'v_deviation': v_deviation,
                'detected': max(h_deviation, v_deviation) > 0.15
            }
        return results
    def _run_length_analysis(self, img_array):
        """Analyze run lengths in LSB sequences"""
        results = {}
        for channel in range(3):
            channel_data = img_array[:, :, channel].flatten()
            # Extract LSB sequence
            lsb_sequence = channel_data & 1
            # Run lengths from the positions where the bit value changes
            change_points = np.flatnonzero(np.diff(lsb_sequence)) + 1
            boundaries = np.concatenate(([0], change_points, [lsb_sequence.size]))
            runs = np.diff(boundaries)
            # Analyze run length distribution
            avg_run_length = np.mean(runs)
            run_entropy = self._calculate_entropy(runs)
            # For i.i.d. fair bits, run lengths follow a geometric
            # distribution with mean 2 and entropy close to 2 bits
            expected_avg_run = 2.0
            expected_entropy = 2.0
            run_deviation = abs(avg_run_length - expected_avg_run)
            entropy_deviation = abs(run_entropy - expected_entropy)
            results[f'channel_{channel}'] = {
                'average_run_length': avg_run_length,
                'run_entropy': run_entropy,
                'run_deviation': run_deviation,
                'entropy_deviation': entropy_deviation,
                'detected': run_deviation > 0.5 or entropy_deviation > 0.5
            }
        return results
def _combine_results(self, method_results):
"""Combine results from all detection methods"""
detection_count = 0
total_tests = 0
confidence_scores = []
final_results = {
'method_results': method_results,
'detections_by_method': {},
'overall_confidence': 0,
'steganography_detected': False
}
# Analyze each method
for method, results in method_results.items():
method_detections = 0
method_tests = 0
for channel_key, channel_results in results.items():
if 'detected' in channel_results:
if channel_results['detected']:
method_detections += 1
detection_count += 1
method_tests += 1
total_tests += 1
# Extract confidence if available
if 'confidence' in channel_results:
confidence_scores.append(channel_results['confidence'])
elif 'anomaly_score' in channel_results:
confidence_scores.append(channel_results['anomaly_score'])
final_results['detections_by_method'][method] = {
'detections': method_detections,
'tests': method_tests,
'detection_rate': method_detections / method_tests if method_tests > 0 else 0
}
# Calculate overall confidence
if confidence_scores:
final_results['overall_confidence'] = np.mean(confidence_scores)
# Final detection decision
detection_rate = detection_count / total_tests if total_tests > 0 else 0
final_results['steganography_detected'] = detection_rate > 0.5
final_results['detection_rate'] = detection_rate
return final_results
def _calculate_entropy(self, data):
"""Calculate Shannon entropy"""
unique, counts = np.unique(data, return_counts=True)
probabilities = counts / len(data)
entropy = -np.sum(probabilities * np.log2(probabilities + 1e-10))
return entropy
def print_detection_report(self, results):
"""Print detailed detection report"""
print("CUSTOM LSB DETECTION REPORT")
print("=" * 40)
# Overall results
print(f"Steganography Detected: {results['steganography_detected']}")
print(f"Overall Confidence: {results['overall_confidence']:.3f}")
print(f"Detection Rate: {results['detection_rate']:.3f}")
print()
# Method-by-method results
print("Detection Methods:")
for method, method_data in results['detections_by_method'].items():
print(f" {method.replace('_', ' ').title()}:")
print(f" Detections: {method_data['detections']}/{method_data['tests']}")
print(f" Rate: {method_data['detection_rate']:.3f}")
print()
# Usage example
"""
detector = CustomLSBDetector()
results = detector.detect_lsb_steganography('test_image.png')
detector.print_detection_report(results)
"""
Batch Analysis Tool
import os
import json
import concurrent.futures
from datetime import datetime
from pathlib import Path
import csv
class BatchSteganalyzer:
def __init__(self, max_workers=4):
self.max_workers = max_workers
self.supported_formats = {'.png', '.jpg', '.jpeg', '.bmp', '.gif', '.tiff', '.webp'}
self.results = []
def analyze_directory(self, directory_path, recursive=True, output_file=None):
"""Analyze all supported image files in a directory"""
file_paths = self._collect_files(directory_path, recursive)
print(f"Found {len(file_paths)} supported image files")
print(f"Starting batch analysis with {self.max_workers} workers...")
# Progress tracking
completed = 0
total = len(file_paths)
# Parallel processing
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit all analysis tasks
future_to_path = {
executor.submit(self._analyze_single_file, path): path
for path in file_paths
}
# Collect results as they complete
for future in concurrent.futures.as_completed(future_to_path):
file_path = future_to_path[future]
completed += 1
try:
result = future.result()
self.results.append(result)
# Progress update
if completed % 10 == 0 or completed == total:
print(f"Progress: {completed}/{total} ({completed/total*100:.1f}%)")
except Exception as e:
print(f"Error analyzing {file_path}: {e}")
self.results.append({
'file_path': str(file_path),
'error': str(e),
'timestamp': datetime.now().isoformat()
})
# Generate summary report
summary = self._generate_summary()
# Save results if output file specified
if output_file:
self._save_results(output_file, summary)
return self.results, summary
def _collect_files(self, directory_path, recursive):
"""Collect all supported image files"""
directory = Path(directory_path)
file_paths = []
if recursive:
pattern = "**/*"
else:
pattern = "*"
for file_path in directory.glob(pattern):
if file_path.is_file() and file_path.suffix.lower() in self.supported_formats:
file_paths.append(file_path)
return sorted(file_paths)
def _analyze_single_file(self, file_path):
"""Analyze a single file"""
try:
# Use comprehensive analyzer
analyzer = ComprehensiveSteganalyzer()
result = analyzer.analyze_file(str(file_path))
# Add metadata
result['analysis_timestamp'] = datetime.now().isoformat()
result['file_size'] = file_path.stat().st_size
            result['file_name'] = file_path.name
return result
except Exception as e:
return {
'file_path': str(file_path),
'error': str(e),
'timestamp': datetime.now().isoformat()
}
def _generate_summary(self):
"""Generate analysis summary statistics"""
summary = {
'total_files': len(self.results),
'successful_analyses': 0,
'failed_analyses': 0,
'steganography_detected': 0,
'suspicious_files': [],
'clean_files': 0,
'detection_by_method': {},
'error_summary': {}
}
for result in self.results:
if 'error' in result:
summary['failed_analyses'] += 1
error = result['error']
summary['error_summary'][error] = summary['error_summary'].get(error, 0) + 1
else:
summary['successful_analyses'] += 1
                # Check detection status
                assessment = result.get('assessment', {})
                verdict = assessment.get('verdict', '')
                # Test the negative verdict first: a string such as
                # "NO STEGANOGRAPHY DETECTED" also contains "DETECTED"
                if 'NO STEGANOGRAPHY' in verdict:
                    summary['clean_files'] += 1
                elif 'DETECTED' in verdict:
                    summary['steganography_detected'] += 1
                    summary['suspicious_files'].append({
                        'file_path': result['file_path'],
                        'confidence': assessment.get('confidence_score', 0),
                        'methods': assessment.get('detection_methods', [])
                    })
# Track detection methods
for method in assessment.get('detection_methods', []):
summary['detection_by_method'][method] = summary['detection_by_method'].get(method, 0) + 1
return summary
def _save_results(self, output_file, summary):
"""Save results to file"""
output_path = Path(output_file)
# Save detailed results as JSON
json_path = output_path.with_suffix('.json')
with open(json_path, 'w') as f:
json.dump({
'summary': summary,
'detailed_results': self.results,
'analysis_date': datetime.now().isoformat()
}, f, indent=2)
# Save summary as CSV
csv_path = output_path.with_suffix('.csv')
with open(csv_path, 'w', newline='') as f:
writer = csv.writer(f)
# Header
writer.writerow([
'File Path', 'File Size', 'Verdict', 'Confidence Score',
'Detection Methods', 'Analysis Status'
])
# Data rows
for result in self.results:
if 'error' in result:
writer.writerow([
result['file_path'], 'N/A', 'ERROR', 'N/A', 'N/A', result['error']
])
else:
assessment = result.get('assessment', {})
writer.writerow([
result['file_path'],
result.get('file_size', 'N/A'),
assessment.get('verdict', 'N/A'),
f"{assessment.get('confidence_score', 0):.3f}",
', '.join(assessment.get('detection_methods', [])),
'SUCCESS'
])
print(f"Results saved to:")
print(f" JSON: {json_path}")
print(f" CSV: {csv_path}")
def print_summary_report(self, summary):
"""Print summary report"""
print("\n" + "="*60)
print("BATCH STEGANALYSIS SUMMARY REPORT")
print("="*60)
# Overall statistics
print(f"Total Files Analyzed: {summary['total_files']}")
print(f"Successful Analyses: {summary['successful_analyses']}")
print(f"Failed Analyses: {summary['failed_analyses']}")
print()
# Detection statistics
print("DETECTION RESULTS:")
print(f" Steganography Detected: {summary['steganography_detected']}")
print(f" Clean Files: {summary['clean_files']}")
if summary['successful_analyses'] > 0:
detection_rate = summary['steganography_detected'] / summary['successful_analyses'] * 100
print(f" Detection Rate: {detection_rate:.1f}%")
print()
# Detection methods used
if summary['detection_by_method']:
print("DETECTION METHODS:")
for method, count in summary['detection_by_method'].items():
print(f" {method}: {count} detections")
print()
# Top suspicious files
if summary['suspicious_files']:
print("TOP SUSPICIOUS FILES:")
# Sort by confidence score
sorted_suspicious = sorted(
summary['suspicious_files'],
key=lambda x: x['confidence'],
reverse=True
)[:5] # Top 5
for i, file_info in enumerate(sorted_suspicious, 1):
print(f" {i}. {Path(file_info['file_path']).name}")
print(f" Confidence: {file_info['confidence']:.3f}")
print(f" Methods: {', '.join(file_info['methods'])}")
print()
# Error summary
if summary['error_summary']:
print("ERRORS ENCOUNTERED:")
for error, count in summary['error_summary'].items():
print(f" {error}: {count} files")
print("="*60)
# Advanced filtering and search capabilities
class SteganalysisDatabase:
def __init__(self, results_file=None):
self.results = []
if results_file:
self.load_results(results_file)
def load_results(self, results_file):
"""Load results from JSON file"""
with open(results_file, 'r') as f:
data = json.load(f)
self.results = data.get('detailed_results', [])
def filter_by_verdict(self, verdict):
"""Filter results by verdict"""
filtered = []
for result in self.results:
if 'assessment' in result:
if verdict.upper() in result['assessment'].get('verdict', '').upper():
filtered.append(result)
return filtered
def filter_by_confidence(self, min_confidence=0.0, max_confidence=1.0):
"""Filter results by confidence range"""
filtered = []
for result in self.results:
if 'assessment' in result:
confidence = result['assessment'].get('confidence_score', 0)
if min_confidence <= confidence <= max_confidence:
filtered.append(result)
return filtered
def filter_by_method(self, method_name):
"""Filter results by detection method"""
filtered = []
for result in self.results:
if 'assessment' in result:
methods = result['assessment'].get('detection_methods', [])
if any(method_name.lower() in method.lower() for method in methods):
filtered.append(result)
return filtered
def filter_by_file_size(self, min_size=0, max_size=float('inf')):
"""Filter results by file size"""
filtered = []
for result in self.results:
file_size = result.get('file_size', 0)
if min_size <= file_size <= max_size:
filtered.append(result)
return filtered
def search_by_filename(self, pattern):
"""Search results by filename pattern"""
import re
filtered = []
regex = re.compile(pattern, re.IGNORECASE)
for result in self.results:
file_path = result.get('file_path', '')
filename = Path(file_path).name
if regex.search(filename):
filtered.append(result)
return filtered
def generate_report(self, filtered_results=None):
"""Generate detailed report for filtered results"""
if filtered_results is None:
filtered_results = self.results
print(f"\nDETAILED ANALYSIS REPORT ({len(filtered_results)} files)")
print("="*60)
for i, result in enumerate(filtered_results, 1):
print(f"\n{i}. {Path(result['file_path']).name}")
print("-" * 40)
if 'error' in result:
print(f" Status: ERROR - {result['error']}")
continue
# Basic info
print(f" File Size: {result.get('file_size', 'N/A')} bytes")
# Assessment
if 'assessment' in result:
assessment = result['assessment']
print(f" Verdict: {assessment.get('verdict', 'N/A')}")
print(f" Confidence: {assessment.get('confidence_score', 0):.3f}")
methods = assessment.get('detection_methods', [])
if methods:
print(f" Detection Methods: {', '.join(methods)}")
# Detailed results summary
if 'results' in result:
results_data = result['results']
# Visual analysis
if 'visual' in results_data and 'error' not in results_data['visual']:
visual = results_data['visual']
print(f" Visual Score: {visual.get('visual_anomaly_score', 0):.3f}")
# Statistical analysis
if 'statistical' in results_data and 'error' not in results_data['statistical']:
statistical = results_data['statistical']
print(f" Statistical Score: {statistical.get('statistical_score', 0):.3f}")
# Signature analysis
if 'signature' in results_data and 'error' not in results_data['signature']:
signature = results_data['signature']
embedded_count = signature.get('embedded_file_count', 0)
if embedded_count > 0:
print(f" Embedded Files: {embedded_count}")
# Usage example and main execution
def main():
"""Main function for batch steganalysis"""
import argparse
parser = argparse.ArgumentParser(description='Batch Steganalysis Tool')
parser.add_argument('directory', help='Directory to analyze')
parser.add_argument('-r', '--recursive', action='store_true',
help='Analyze subdirectories recursively')
parser.add_argument('-o', '--output', help='Output file prefix')
parser.add_argument('-w', '--workers', type=int, default=4,
help='Number of worker threads')
parser.add_argument('--filter-verdict', help='Filter results by verdict')
parser.add_argument('--min-confidence', type=float, default=0.0,
help='Minimum confidence threshold')
parser.add_argument('--max-confidence', type=float, default=1.0,
help='Maximum confidence threshold')
args = parser.parse_args()
# Run batch analysis
analyzer = BatchSteganalyzer(max_workers=args.workers)
results, summary = analyzer.analyze_directory(
args.directory,
recursive=args.recursive,
output_file=args.output
)
# Print summary
analyzer.print_summary_report(summary)
# Apply filters if specified
if args.filter_verdict or args.min_confidence > 0.0 or args.max_confidence < 1.0:
db = SteganalysisDatabase()
db.results = results
filtered = db.results
if args.filter_verdict:
filtered = [r for r in filtered if args.filter_verdict.upper() in
r.get('assessment', {}).get('verdict', '').upper()]
if args.min_confidence > 0.0 or args.max_confidence < 1.0:
filtered = [r for r in filtered if
args.min_confidence <= r.get('assessment', {}).get('confidence_score', 0) <= args.max_confidence]
print(f"\nFILTERED RESULTS ({len(filtered)} files matching criteria):")
db.generate_report(filtered)
if __name__ == "__main__":
main()
"""
Example usage:
# Basic batch analysis
python batch_steganalyzer.py /path/to/images -r -o results
# Filter for high-confidence detections
python batch_steganalyzer.py /path/to/images -r --min-confidence 0.7
# Use database for advanced filtering
db = SteganalysisDatabase('results.json')
suspicious_large_files = db.filter_by_file_size(min_size=1000000) # > 1MB
high_confidence = db.filter_by_confidence(min_confidence=0.8)
statistical_detections = db.filter_by_method('Statistical')
# Generate reports
db.generate_report(suspicious_large_files)
"""
Real-World Applications and Case Studies
Case Study 1: Corporate Data Exfiltration
Background: A multinational corporation suspected that sensitive product designs were being leaked through seemingly innocent marketing images posted on social media.
Investigation Approach:
def corporate_leak_investigation(image_directory, reference_keywords):
"""Investigate potential corporate data leaks in images"""
results = {
'suspicious_images': [],
'keyword_matches': [],
'large_embeddings': [],
'statistical_anomalies': []
}
for image_path in Path(image_directory).glob('*.png'):
# 1. Signature analysis for embedded documents
signature_analyzer = FileSignatureAnalyzer()
signatures = signature_analyzer.scan_file(str(image_path))
if signatures:
# Check for document files (PDF, DOC, ZIP)
doc_signatures = [s for s in signatures if s['file_type'] in ['PDF', 'DOC', 'ZIP']]
if doc_signatures:
results['large_embeddings'].append({
'file': str(image_path),
'embedded_types': [s['file_type'] for s in doc_signatures],
'positions': [s['position'] for s in doc_signatures]
})
# 2. String analysis for keywords
with open(image_path, 'rb') as f:
file_content = f.read().decode('utf-8', errors='ignore')
found_keywords = []
for keyword in reference_keywords:
if keyword.lower() in file_content.lower():
found_keywords.append(keyword)
if found_keywords:
results['keyword_matches'].append({
'file': str(image_path),
'keywords': found_keywords
})
# 3. Statistical analysis
try:
chi_results = chi_square_test(str(image_path))
if chi_results['steganography_detected']:
results['statistical_anomalies'].append({
'file': str(image_path),
'chi_square': chi_results['chi_square_statistic'],
'p_value': chi_results['p_value']
})
except:
pass
return results
# Investigation keywords
corporate_keywords = [
'confidential', 'proprietary', 'trade secret', 'internal only',
'project alpha', 'q4 roadmap', 'financial projection', 'patent'
]
# Run investigation
investigation_results = corporate_leak_investigation('/marketing_images/', corporate_keywords)
Results: The investigation revealed 3 marketing images containing embedded ZIP files with CAD drawings and financial projections. The perpetrator was identified through metadata analysis.
Key Lessons:
- Always check for embedded archives in corporate images
- Monitor social media posts for suspicious file sizes
- Implement automated screening for outgoing images (see the sketch below)
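Automated screening can be wired directly into a publishing workflow. The following is a minimal, hypothetical sketch that reuses run_comprehensive_analysis() from earlier in this article to gate outgoing images; the screen_outgoing_image() helper, the 0.5 threshold, and the quarantine policy are illustrative assumptions, not a vetted standard.
import sys
from pathlib import Path

def screen_outgoing_image(file_path, confidence_threshold=0.5):
    """Return True if the image appears safe to publish, False to quarantine.

    Relies on run_comprehensive_analysis() defined earlier; the default
    threshold is an illustrative choice, not a calibrated value.
    """
    results = run_comprehensive_analysis(file_path)
    assessment = results.get('assessment', {})
    if 'DETECTED' in assessment.get('verdict', ''):
        return False
    return assessment.get('confidence_score', 0) < confidence_threshold

# Hypothetical pre-publication hook over a staging directory
# for img in Path('/outgoing/marketing_images').glob('*.png'):
#     if not screen_outgoing_image(str(img)):
#         print(f"QUARANTINE: {img}", file=sys.stderr)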
Case Study 2: Law Enforcement Operation
Background: Law enforcement suspected a criminal organization was coordinating activities through images shared on public forums.
Technical Approach:
def law_enforcement_analysis(image_urls, evidence_directory):
"""Analyze images for potential criminal communications"""
analysis_results = {
'communication_patterns': [],
'encryption_indicators': [],
'timing_analysis': [],
'network_connections': []
}
for url in image_urls:
# Download and analyze image
image_path = download_image(url, evidence_directory)
# 1. Advanced steganalysis
ml_analyzer = CNNSteganalyzer()
ml_results = ml_analyzer.predict(image_path)
if ml_results['prediction'] == 'STEGO':
# 2. Attempt extraction with common passwords
common_passwords = ['password', '123456', 'admin', 'secret']
for password in common_passwords:
try:
# Attempt steghide extraction
subprocess.run([
'steghide', 'extract', '-sf', image_path,
'-p', password, '-xf', f'{image_path}.extracted'
], capture_output=True)
if Path(f'{image_path}.extracted').exists():
analysis_results['communication_patterns'].append({
'source_url': url,
'extraction_successful': True,
'password_used': password,
'extracted_file': f'{image_path}.extracted'
})
break
except:
continue
# 3. Metadata analysis for coordination
metadata = analyze_metadata(image_path)
if 'GPS' in str(metadata) or 'timestamp' in str(metadata).lower():
analysis_results['timing_analysis'].append({
'source_url': url,
'metadata': metadata,
'timestamp': extract_timestamp(metadata)
})
return analysis_results
Results: The operation uncovered a network using LSB steganography in memes to coordinate drug trafficking routes. The consistent timing and GPS metadata led to several arrests.
Case Study 3: Academic Research Misconduct
Background: A university suspected that students were sharing exam answers through image submissions in an online learning platform.
Detection Strategy:
def academic_integrity_check(submission_directory, exam_date):
"""Check student image submissions for potential answer sharing"""
integrity_report = {
'suspicious_submissions': [],
'similar_hidden_content': [],
'timing_anomalies': [],
'statistical_outliers': []
}
submissions = list(Path(submission_directory).glob('*.png'))
# 1. Batch steganalysis
for submission in submissions:
# Extract potential hidden data
hidden_data = extract_all_hidden_data(str(submission))
if hidden_data:
integrity_report['suspicious_submissions'].append({
'student_file': submission.name,
'hidden_content_size': len(hidden_data),
'content_preview': hidden_data[:100] if len(hidden_data) > 100 else hidden_data
})
# 2. Cross-reference similar content
hidden_contents = []
for submission in integrity_report['suspicious_submissions']:
if 'content_preview' in submission:
hidden_contents.append((
submission['student_file'],
submission['content_preview']
))
# Find similarities
for i, (file1, content1) in enumerate(hidden_contents):
for j, (file2, content2) in enumerate(hidden_contents[i+1:], i+1):
similarity = calculate_text_similarity(content1, content2)
if similarity > 0.8: # High similarity threshold
integrity_report['similar_hidden_content'].append({
'file1': file1,
'file2': file2,
'similarity_score': similarity
})
# 3. Temporal analysis
submission_times = []
for submission in submissions:
        creation_time = submission.stat().st_ctime  # creation time on Windows; metadata-change time on Unix
submission_times.append((submission.name, creation_time))
# Look for suspicious clustering of submissions
submission_times.sort(key=lambda x: x[1])
for i in range(len(submission_times) - 1):
time_diff = submission_times[i+1][1] - submission_times[i][1]
if time_diff < 60: # Submitted within 1 minute
integrity_report['timing_anomalies'].append({
'file1': submission_times[i][0],
'file2': submission_times[i+1][0],
'time_difference_seconds': time_diff
})
return integrity_report
def calculate_text_similarity(text1, text2):
"""Calculate similarity between two text strings"""
from difflib import SequenceMatcher
return SequenceMatcher(None, text1, text2).ratio()
def extract_all_hidden_data(image_path):
"""Attempt to extract hidden data using multiple methods"""
hidden_data = b""
# Try LSB extraction
try:
img = Image.open(image_path)
img_array = np.array(img)
if len(img_array.shape) == 3:
# Extract LSB from red channel
lsb_data = (img_array[:, :, 0] & 1).flatten()
# Convert bits to bytes
byte_data = []
for i in range(0, len(lsb_data), 8):
byte_bits = lsb_data[i:i+8]
if len(byte_bits) == 8:
byte_value = sum(bit * (2**j) for j, bit in enumerate(byte_bits))
byte_data.append(byte_value)
hidden_data = bytes(byte_data)
except:
pass
# Try steghide extraction
try:
result = subprocess.run([
'steghide', 'extract', '-sf', image_path, '-p', '', '-xf', '-'
], capture_output=True)
if result.returncode == 0:
hidden_data += result.stdout
except:
pass
return hidden_data.decode('utf-8', errors='ignore') if hidden_data else ""
Results: The system identified 12 students who had embedded identical answer sheets in their image submissions, submitted within minutes of each other. The similarity analysis revealed 95% identical hidden content.
Case Study 4: Intellectual Property Theft
Background: A software company discovered that their proprietary algorithms were appearing in competitor products and suspected data theft through image-based communication channels.
Investigation Framework:
class IPTheftInvestigation:
def __init__(self):
self.source_code_signatures = []
self.algorithm_patterns = []
self.communication_timeline = []
self.code_patterns = [
r'class\s+(\w+Algorithm)',
r'def\s+proprietary_(\w+)',
r'SECRET_KEY\s*=\s*["\']([^"\']+)["\']',
r'API_ENDPOINT\s*=\s*["\']([^"\']+)["\']',
r'# CONFIDENTIAL:.*',
r'Copyright.*CompanyName'
]
self.algorithm_signatures = [
'QuickSortPro implementation',
'AdvancedEncryption v2.1',
'OptimalPathFinding algorithm',
'ProprietaryML model',
'CustomHashFunction'
]
def analyze_employee_communications(self, image_directory, employee_id):
"""Analyze images from specific employee for IP theft indicators"""
results = {
'code_fragments': [],
'algorithm_references': [],
'external_communications': [],
'risk_score': 0
}
employee_images = self._get_employee_images(image_directory, employee_id)
for image_path in employee_images:
# 1. Advanced steganographic extraction
extracted_data = self._comprehensive_extraction(image_path)
# 2. Code pattern matching
if extracted_data:
code_matches = self._detect_code_patterns(extracted_data)
if code_matches:
results['code_fragments'].extend(code_matches)
results['risk_score'] += len(code_matches) * 10
# 3. Algorithm signature detection
algo_matches = self._detect_algorithm_signatures(extracted_data)
if algo_matches:
results['algorithm_references'].extend(algo_matches)
results['risk_score'] += len(algo_matches) * 15
# 4. Communication pattern analysis
metadata = self._extract_metadata(image_path)
external_indicators = self._check_external_communication(metadata)
if external_indicators:
results['external_communications'].append({
'image': str(image_path),
'indicators': external_indicators,
'timestamp': metadata.get('timestamp')
})
results['risk_score'] += 5
self.communication_timeline.append(metadata.get('timestamp'))
return results
def _comprehensive_extraction(self, image_path):
"""Extract hidden data using multiple advanced techniques"""
extracted_content = ""
# 1. Multiple LSB planes
for bit_plane in range(3):
try:
content = self._extract_lsb_plane(image_path, bit_plane)
extracted_content += content
except:
pass
# 2. DCT coefficient extraction
try:
dct_content = self._extract_dct_coefficients(image_path)
extracted_content += dct_content
except:
pass
# 3. Frequency domain analysis
try:
freq_content = self._extract_frequency_domain(image_path)
extracted_content += freq_content
except:
pass
return extracted_content
def _detect_code_patterns(self, text_content):
"""Detect proprietary code patterns in extracted text"""
matches = []
for pattern in self.code_patterns:
import re
pattern_matches = re.findall(pattern, text_content, re.IGNORECASE)
if pattern_matches:
matches.append({
'pattern': pattern,
'matches': pattern_matches,
'severity': 'HIGH'
})
return matches
def _detect_algorithm_signatures(self, text_content):
"""Detect proprietary algorithm references"""
matches = []
for signature in self.algorithm_signatures:
if signature.lower() in text_content.lower():
matches.append({
'algorithm': signature,
'context': self._extract_context(text_content, signature),
'severity': 'CRITICAL'
})
return matches
def _extract_context(self, text_content, signature):
"""Extract surrounding context for a signature"""
pos = text_content.lower().find(signature.lower())
if pos == -1:
return ""
start = max(0, pos - 50)
end = min(len(text_content), pos + len(signature) + 50)
return text_content[start:end]
    def _get_employee_images(self, image_directory, employee_id):
"""Get list of images associated with employee"""
import os
return [os.path.join(image_directory, f) for f in os.listdir(image_directory) if str(employee_id) in f]
def _extract_metadata(self, image_path):
"""Extract metadata from image"""
import os
from datetime import datetime
timestamp = datetime.fromtimestamp(os.path.getmtime(image_path))
return {'timestamp': timestamp.isoformat()}
def _check_external_communication(self, metadata):
"""Check for indicators of external communication"""
# Placeholder: Assume external if timestamp is after a certain date
if metadata.get('timestamp', '').startswith('2025'):
return ['Possible external share']
return []
def _extract_lsb_plane(self, image_path, bit_plane):
"""Extract data from LSB plane"""
from matplotlib import image as mpimg
import numpy as np
img = mpimg.imread(image_path)
        if np.issubdtype(img.dtype, np.floating):  # matplotlib loads PNGs as float32 in [0, 1]
            img = (img * 255).astype(np.uint8)
flat = img.reshape(-1)
bits = np.bitwise_and(flat >> bit_plane, 1)
bytes_arr = np.packbits(bits)
extracted = ''
for b in bytes_arr:
if b == 0:
break
extracted += chr(b)
return extracted
def _extract_dct_coefficients(self, image_path):
"""Extract hidden data from DCT coefficients"""
from matplotlib import image as mpimg
import numpy as np
from scipy.fftpack import dct
        img = mpimg.imread(image_path)
        if np.issubdtype(img.dtype, np.floating):
            img = img * 255  # normalize float images to the 0-255 range
        if len(img.shape) == 3:
            img = np.mean(img, axis=2).astype(float)
height, width = img.shape
bits = []
for i in range(0, height, 8):
for j in range(0, width, 8):
block = img[i:i+8, j:j+8]
if block.shape != (8, 8):
continue
dct_block = dct(dct(block.T, norm='ortho').T, norm='ortho')
coeff = dct_block[2, 3] # Example mid-frequency coefficient
bit = int(abs(coeff)) % 2
bits.append(bit)
bytes_arr = np.packbits(bits)
extracted = ''
for b in bytes_arr:
if b == 0:
break
extracted += chr(b)
return extracted
def _extract_frequency_domain(self, image_path):
"""Extract hidden data from frequency domain"""
from matplotlib import image as mpimg
import numpy as np
        img = mpimg.imread(image_path)
        if np.issubdtype(img.dtype, np.floating):
            img = img * 255  # normalize float images to the 0-255 range
        if len(img.shape) == 3:
            img = np.mean(img, axis=2)
f = np.fft.fft2(img)
fshift = np.fft.fftshift(f)
real = np.real(fshift).flatten().astype(int)
bits = np.bitwise_and(real, 1)
bytes_arr = np.packbits(bits)
extracted = ''
for b in bytes_arr:
if b == 0:
break
extracted += chr(b)
return extracted
def generate_investigation_report(self, investigation_results):
"""Generate comprehensive investigation report"""
print("INTELLECTUAL PROPERTY THEFT INVESTIGATION")
print("=" * 50)
total_risk = investigation_results.get('risk_score', 0)
if total_risk >= 50:
risk_level = "CRITICAL"
elif total_risk >= 20:
risk_level = "HIGH"
elif total_risk >= 5:
risk_level = "MEDIUM"
else:
risk_level = "LOW"
print(f"Risk Level: {risk_level}")
print(f"Risk Score: {total_risk}")
print()
# Code fragments found
code_fragments = investigation_results.get('code_fragments', [])
if code_fragments:
print("PROPRIETARY CODE DETECTED:")
for fragment in code_fragments:
print(f" Pattern: {fragment['pattern']}")
print(f" Matches: {fragment['matches']}")
print(f" Severity: {fragment['severity']}")
print()
# Algorithm references
algo_refs = investigation_results.get('algorithm_references', [])
if algo_refs:
print("ALGORITHM REFERENCES DETECTED:")
for ref in algo_refs:
print(f" Algorithm: {ref['algorithm']}")
print(f" Context: {ref['context']}")
print(f" Severity: {ref['severity']}")
print()
# External communications
external_comms = investigation_results.get('external_communications', [])
if external_comms:
print("EXTERNAL COMMUNICATION INDICATORS:")
for comm in external_comms:
print(f" Image: {comm['image']}")
print(f" Indicators: {', '.join(comm['indicators'])}")
print(f" Timestamp: {comm['timestamp']}")
print()
print("END OF REPORT")
print("=" * 50)
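For symmetry with the earlier case studies, here is a brief usage sketch; the evidence directory and employee ID are placeholders.
# Illustrative usage only; the path and ID are assumptions
investigation = IPTheftInvestigation()
findings = investigation.analyze_employee_communications('/evidence/images', employee_id=1042)
investigation.generate_investigation_report(findings)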
Countermeasures and Evasion Techniques
Steganographers use countermeasures to evade detection, challenging steganalysts to adapt. This table outlines common evasion methods and responses.
| Evasion Technique | Description | Countermeasure |
| --- | --- | --- |
| Encryption | Encrypts hidden data | Cryptanalysis or key recovery |
| Adaptive Steganography | Adjusts embedding to carrier | ML-based detection |
| Low Embedding Rate | Hides minimal data | High-sensitivity statistical tests |
| Mimicking Natural Noise | Blends changes with noise | Advanced pattern recognition |
Understanding these tactics enhances steganalysis effectiveness.
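To see why a low embedding rate is effective, the sketch below embeds random bits into a varying fraction of pixels and re-runs the chi-square method from the CustomLSBDetector above. The embed_lsb_at_rate() helper and the cover.png path are illustrative stand-ins, and the exact p-values will depend on the cover image.
import numpy as np
from PIL import Image

def embed_lsb_at_rate(img_array, rate, rng=None):
    """Replace the LSBs of a random fraction `rate` of pixels with random bits."""
    if rng is None:
        rng = np.random.default_rng(0)
    stego = img_array.copy()
    flat = stego.reshape(-1)  # view into stego, so edits propagate
    n_embed = int(flat.size * rate)
    idx = rng.choice(flat.size, size=n_embed, replace=False)
    bits = rng.integers(0, 2, size=n_embed, dtype=flat.dtype)
    flat[idx] = (flat[idx] & 0xFE) | bits
    return stego

# detector = CustomLSBDetector()
# cover = np.array(Image.open('cover.png').convert('RGB'))
# for rate in (1.0, 0.5, 0.1, 0.01):
#     stego = embed_lsb_at_rate(cover, rate)
#     chi = detector._chi_square(stego)
#     print(rate, [round(chi[f'channel_{c}']['p_value'], 3) for c in range(3)])
At full embedding the pair counts are nearly balanced and the p-values climb toward 1, while at 1% embedding the statistics are barely distinguishable from a clean image, which is exactly the evasion the table describes.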
Frequently Asked Questions
Q: What is the difference between steganography and steganalysis?
A: Steganography hides data to conceal its existence; steganalysis detects and analyzes that hidden data.
Q: Can steganalysis detect all hidden data?
A: No. Adaptive steganography and low embedding rates can evade detection, and even when a stego-object is detected, an encrypted payload may resist extraction without the key.
Q: Is steganalysis limited to images?
A: No, it applies to audio, video, text, and other files, depending on the steganographic method.
Q: Are there legal implications for steganalysis?
A: It’s legal for legitimate purposes like security, but unauthorized data access may breach privacy laws.
Q: How effective is steganalysis against modern steganography?
A: With machine learning, it’s highly effective, though it requires expertise and resources.
Q: Can steganalysis be automated?
A: Yes, tools like StegExpose automate detection, but manual verification ensures accuracy.
Q: What are the ethical considerations?
A: Respecting privacy, obtaining authorization, and using steganalysis responsibly are key ethical concerns.
Q: How does machine learning improve steganalysis?
A: It detects subtle patterns traditional methods miss, enhancing accuracy against complex steganography.
Q: How can I learn more?
A: Use tools like zsteg, explore tutorials, join cybersecurity forums, and study digital forensics resources.
References
- Statistical Steganalysis of High Capacity Image Steganography with Cryptography - Proposes steganalysis of high-capacity wavelet-based fusion image steganography with encryption, using image quality metrics as a feature set.
- zsteg GitHub - Documentation and source code for the zsteg tool, used for detecting hidden data in PNG and BMP images.
- StegOnline GitHub - A web-based, enhanced and open-source port of StegSolve. Upload any image file, and the relevant options will be displayed.
- Aletheia GitHub - Resources for Aletheia, a Python-based tool for advanced steganalysis with machine learning capabilities.
- StegExpose GitHub - Batch steganalysis tool for images, including statistical detection methods.
- Digital Forensics Guide - What is digital forensics and incident response (DFIR)?
- Machine Learning in Steganalysis - IEEE Xplore survey on the application of machine learning in steganalysis.