-
Notifications
You must be signed in to change notification settings - Fork 135
Add perceptual hashing (phash) support for image similarity detection #318
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,16 +57,17 @@ | |
| "file-name": ["chat", "item", "message", "pdf"], | ||
| "gtracker": ["domain", "item"], | ||
| "hhhash": ["domain"], | ||
| "image": ["barcode", "chat", "chat-subchannel", "chat-thread", "message", "ocr", "qrcode", "user-account"], # TODO subchannel + threads ???? | ||
| "image": ["barcode", "chat", "chat-subchannel", "chat-thread", "message", "ocr", "phash", "qrcode", "user-account", "image", "screenshot"], # TODO subchannel + threads ???? | ||
| "ip": ["ssh-key"], | ||
| "item": ["cve", "cryptocurrency", "decoded", "domain", "dom-hash", "favicon", "file-name", "gtracker", "mail", "message", "pdf", "pgp", "screenshot", "title", "username"], # chat ??? | ||
| "mail": ["domain", "item", "message"], # chat ?? | ||
| "message": ["barcode", "chat", "chat-subchannel", "chat-thread", "cve", "cryptocurrency", "decoded", "domain", "file-name", "image", "item", "mail", "ocr", "pdf", "pgp", "user-account"], | ||
| "ocr": ["chat", "chat-subchannel", "chat-thread", "cve", "cryptocurrency", "decoded", "image", "message", "pgp", "user-account"], | ||
| "pdf": ["author", "chat", "file-name", "item", "message"], | ||
| "phash": ["image", "phash"], | ||
| "pgp": ["chat", "domain", "item", "message", "ocr"], | ||
| "qrcode": ["chat", "cve", "cryptocurrency", "decoded", "domain", "image", "message", "screenshot"], # "chat-subchannel", "chat-thread" ????? | ||
| "screenshot": ["barcode", "domain", "item", "qrcode"], | ||
| "screenshot": ["barcode", "domain", "item", "qrcode", "image"], | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A screenshot object shouldn't correlate with an image. |
||
| "ssh-key": ["domain", "ip"], | ||
| "title": ["domain", "item"], | ||
| "user-account": ["chat", "chat-subchannel", "chat-thread", "image", "message", "ocr", "username"], | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,15 @@ | |
| from flask import url_for | ||
| from pymisp import MISPObject | ||
|
|
||
| try: | ||
| from PIL import Image as PILImage | ||
| from PIL.ExifTags import TAGS | ||
| import imagehash | ||
| IMAGEHASH_AVAILABLE = True | ||
| except ImportError: | ||
| IMAGEHASH_AVAILABLE = False | ||
| TAGS = None | ||
|
|
||
| sys.path.append(os.environ['AIL_BIN']) | ||
| ################################## | ||
| # Import Project packages | ||
|
|
@@ -116,6 +125,66 @@ def get_description(self, model=None): | |
| description = description.replace("`", ' ') | ||
| return description | ||
|
|
||
def calculate_phash(self):
    """Compute the perceptual hash (pHash) of this image's file.

    Returns:
        str | None: The hash rendered with ``str()``, or ``None`` when the
        imaging libraries could not be imported, the object does not exist,
        or opening/hashing the file raises.
    """
    # NOTE(review): a maintainer asked that all pHash-related functions be
    # moved off this class now that pHash is a dedicated object (the target
    # module is not named in the visible review text) -- confirm before
    # extending this method.
    if not (IMAGEHASH_AVAILABLE and self.exists()):
        return None

    try:
        with PILImage.open(self.get_filepath()) as picture:
            return str(imagehash.phash(picture))
    except Exception as err:
        # Best effort: a file that cannot be hashed must not stop the caller.
        self.logger.warning(f"Failed to calculate phash for image {self.id}: {err}")
        return None
|
|
||
def get_phash(self):
    """Return this image's perceptual hash, computing and storing it on a miss.

    Returns:
        The value stored under the ``phash`` field when it is truthy;
        otherwise whatever ``calculate_phash()`` returns (stored first when
        truthy, so it may be ``None``).
    """
    stored = self._get_field('phash')
    if stored:
        return stored

    # Nothing stored yet: compute it now and keep it for later lookups.
    computed = self.calculate_phash()
    if computed:
        self._set_field('phash', computed)
    return computed
|
|
||
def set_phash(self, phash_value):
    """Store ``phash_value`` under the image's ``phash`` field.

    Falsy values (``None``, empty string) are ignored so an existing stored
    hash is never overwritten with nothing.
    """
    if not phash_value:
        return
    self._set_field('phash', phash_value)
|
|
||
def compare_phash(self, other_phash):
    """Return the Hamming distance between this image's phash and another.

    Args:
        other_phash: Hex string of the phash to compare against.

    Returns:
        int | None: The distance produced by subtracting the two decoded
        hashes (0-64 according to the original documentation -- presumably
        for 64-bit hashes), or ``None`` when the imaging libraries are
        unavailable, either hash is missing, or decoding/comparison raises.
    """
    if not IMAGEHASH_AVAILABLE:
        return None

    own_phash = self.get_phash()
    if not (own_phash and other_phash):
        return None

    try:
        # Decode both hex strings; subtracting the hash objects yields the
        # Hamming distance.
        return imagehash.hex_to_hash(own_phash) - imagehash.hex_to_hash(other_phash)
    except Exception as err:
        self.logger.warning(f"Failed to compare phash for image {self.id}: {err}")
        return None
|
|
||
| def get_search_document(self): | ||
| global_id = self.get_global_id() | ||
| content = self.get_description() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,121 @@ | ||
| #!/usr/bin/env python3 | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| # -*-coding:UTF-8 -* | ||
|
|
||
| import os | ||
| import sys | ||
|
|
||
| from flask import url_for | ||
| from pymisp import MISPObject | ||
|
|
||
| sys.path.append(os.environ['AIL_BIN']) | ||
| ################################## | ||
| # Import Project packages | ||
| ################################## | ||
| from lib.ConfigLoader import ConfigLoader | ||
| from lib.objects.abstract_daterange_object import AbstractDaterangeObject, AbstractDaterangeObjects | ||
|
|
||
| config_loader = ConfigLoader() | ||
| r_objects = config_loader.get_db_conn("Kvrocks_Objects") | ||
| baseurl = config_loader.get_config_str("Notifications", "ail_domain") | ||
| config_loader = None | ||
|
|
||
|
|
||
class Phash(AbstractDaterangeObject):
    """
    AIL Phash Object.
    Represents a perceptual hash value for images.
    """

    def __init__(self, id):
        """Build the object with type ``'phash'`` and the given identifier."""
        super(Phash, self).__init__('phash', id)

    def delete(self):
        """Placeholder: deletion is not implemented yet and does nothing."""
        # TODO: Implement delete functionality
        pass

    def get_link(self, flask_context=False):
        """Return the URL of this object's correlation view.

        Args:
            flask_context: When true, build the URL with ``url_for`` (needs
                an active Flask context); otherwise build it from the
                configured base URL.
        """
        if flask_context:
            url = url_for('correlation.show_correlation', type=self.type, id=self.id)
        else:
            url = f'{baseurl}/correlation/show?type={self.type}&id={self.id}'
        return url

    def get_svg_icon(self):
        """Return the icon description used by the correlation graph."""
        # Icon for correlation graph visualization (like DomHash and HHHash)
        return {'style': 'fas', 'icon': '\uf1c0', 'color': '#E1F5DF', 'radius': 5}

    def get_misp_object(self):
        """Export this phash as a ``MISPObject`` named ``'phash'``.

        First/last seen dates are copied when present; a warning is logged
        when either is missing. Every tag of this object is added to the
        exported attribute.
        """
        obj_attrs = []
        obj = MISPObject('phash')
        first_seen = self.get_first_seen()
        last_seen = self.get_last_seen()
        if first_seen:
            obj.first_seen = first_seen
        if last_seen:
            obj.last_seen = last_seen
        if not first_seen or not last_seen:
            self.logger.warning(
                f'Export error, None seen {self.type}:{self.subtype}:{self.id}, first={first_seen}, last={last_seen}')

        obj_attrs.append(obj.add_attribute('phash', value=self.get_id()))
        # Note: DomHash doesn't include tool attribute, HHHash does. Phash follows DomHash pattern.
        for obj_attr in obj_attrs:
            for tag in self.get_tags():
                obj_attr.add_tag(tag)
        return obj

    def get_nb_seen(self):
        """Return the number of ``image`` correlations of this phash."""
        return self.get_nb_correlation('image')

    def get_meta(self, options=None):
        """Return the metadata dict, extended with ``id`` and ``tags``.

        Args:
            options: Optional set of option names forwarded to
                ``_get_meta``. Defaults to a fresh empty set on every call,
                so no set object is shared between calls.
        """
        if options is None:
            options = set()
        meta = self._get_meta(options=options)
        meta['id'] = self.id
        meta['tags'] = self.get_tags(r_list=True)
        return meta

    def create(self, _first_seen=None, _last_seen=None):
        """Create the object via ``_create()``.

        ``_first_seen`` and ``_last_seen`` are accepted but not used here.
        """
        self._create()
|
|
||
|
|
||
def create(phash_value, obj_id=None):
    """
    Return the Phash object for a hash value, creating it if it is missing.

    Args:
        phash_value: The phash string value
        obj_id: Optional phash ID; when None, phash_value is used as the ID

    Returns:
        Phash object
    """
    phash_id = phash_value if obj_id is None else obj_id
    phash = Phash(phash_id)
    if not phash.exists():
        phash.create()
    return phash
|
|
||
|
|
||
class Phashs(AbstractDaterangeObjects):
    """
    Collection-level helper for Phash objects.
    """

    def __init__(self):
        """Register the ``'phash'`` type together with its object class."""
        super().__init__('phash', Phash)

    def get_name(self):
        """Return the display name of this object family."""
        return 'Phashs'

    def get_icon(self):
        """Return the icon description for this object family."""
        return {'fa': 'fa-solid', 'icon': 'image'}

    def get_link(self, flask_context=False):
        """Return the URL of the phash listing page.

        With ``flask_context`` the URL comes from ``url_for``; otherwise it
        is built from the configured base URL.
        """
        if not flask_context:
            return f'{baseurl}/objects/phashes'
        return url_for('objects_phash.objects_phashes')

    def sanitize_id_to_search(self, name_to_search):
        """Return the search term unchanged."""
        return name_to_search
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,13 @@ | |
| from flask import url_for | ||
| from pymisp import MISPObject | ||
|
|
||
| try: | ||
| from PIL import Image as PILImage | ||
| import imagehash | ||
| IMAGEHASH_AVAILABLE = True | ||
| except ImportError: | ||
| IMAGEHASH_AVAILABLE = False | ||
|
|
||
| sys.path.append(os.environ['AIL_BIN']) | ||
| ################################## | ||
| # Import Project packages | ||
|
|
@@ -114,6 +121,40 @@ def get_description(self, model=None): | |
| model = get_default_image_description_model() | ||
| return self._get_field(f'desc:{model}') | ||
|
|
||
def calculate_phash(self):
    """Calculate the perceptual hash (pHash) of this screenshot's file.

    Returns:
        str | None: The hash rendered with ``str()``, or ``None`` when the
        imaging libraries could not be imported, the object does not exist,
        or opening/hashing the file raises. Failures are logged as warnings
        instead of being silently dropped.
    """
    # NOTE(review): a maintainer asked that all pHash-related functions be
    # moved off this class now that pHash is a dedicated object; the author
    # replied this was done in commit 6556837 -- confirm before extending
    # this method.
    import logging  # local import: only needed for the fallback logger

    if not IMAGEHASH_AVAILABLE:
        return None

    if not self.exists():
        return None

    try:
        filepath = self.get_filepath()
        with PILImage.open(filepath) as img:
            return str(imagehash.phash(img))
    except Exception as e:
        # Best effort: hashing must never break the caller, but the failure
        # has to be visible. Prefer the object's own logger when it has one
        # (presumably set by the base class -- verify), else the module's.
        logger = getattr(self, 'logger', None) or logging.getLogger(__name__)
        logger.warning(f"Failed to calculate phash for screenshot {self.id}: {e}")
        return None
|
|
||
def get_phash(self):
    """Return this screenshot's perceptual hash, computing it when absent.

    Returns:
        The truthy value stored under the ``phash`` field if there is one;
        otherwise the result of ``calculate_phash()``, which is stored
        first when truthy and may be ``None``.
    """
    cached = self._get_field('phash')
    if cached:
        return cached

    # No stored hash: compute one and remember it for subsequent calls.
    fresh = self.calculate_phash()
    if fresh:
        self._set_field('phash', fresh)
    return fresh
|
|
||
def set_phash(self, phash_value):
    """Store ``phash_value`` under the screenshot's ``phash`` field.

    Falsy values (``None``, empty string) are ignored.
    """
    if not phash_value:
        return
    self._set_field('phash', phash_value)
|
|
||
| def get_search_document(self): | ||
| global_id = self.get_global_id() | ||
| content = self.get_description() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| #!/usr/bin/env python3 | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I would prefer if the […] The combined module should first check whether the image already has an existing pHash correlation using: […] If no pHash exists, it should: […]
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I agree with this. This is a better design. |
||
| # -*-coding:UTF-8 -* | ||
| """ | ||
| The ImagePhash Module | ||
| ====================== | ||
|
|
||
| Calculates perceptual hash (phash) for images when they are imported. | ||
| Creates Phash objects and correlates them with Images. | ||
| """ | ||
|
|
||
| ################################## | ||
| # Import External packages | ||
| ################################## | ||
| import os | ||
| import sys | ||
|
|
||
| sys.path.append(os.environ['AIL_BIN']) | ||
| ################################## | ||
| # Import Project packages | ||
| ################################## | ||
| from modules.abstract_module import AbstractModule | ||
| from lib.objects import Images | ||
| from lib.objects import Phashs | ||
|
|
||
|
|
||
class Phash(AbstractModule):
    """
    Phash module for the AIL framework.

    For each processed image: computes its perceptual hash, stores it on the
    image, links a Phash object to the image and forwards that object to the
    ``PhashCorrelation`` queue.
    """

    def __init__(self):
        super().__init__()

        # Waiting time in seconds between two processed messages
        self.pending_seconds = 1

        # Report the module state in the logs
        self.logger.info(f'Module {self.module_name} initialized')

    def compute(self, message):
        """Process one queued image; ``message`` is used as the date.

        Returns ``None`` in every case; when no hash can be computed a
        warning is logged and nothing else happens.
        """
        img = self.get_obj()
        seen_date = message

        digest = img.calculate_phash()
        if not digest:
            self.logger.warning(f'Failed to calculate phash for image {img.id}')
            return None

        # Keep the hash on the image itself (backward compatibility, quick access)
        img.set_phash(digest)

        # Fetch the matching Phash object, creating it when needed, then
        # register the image on it (per the original comment, add() also
        # creates the correlation -- verify against the base class).
        phash_obj = Phashs.create(digest)
        phash_obj.add(seen_date, img)

        self.logger.debug(f'Created Phash object {digest} for image {img.id}')

        # Hand the Phash object over for correlation processing
        self.add_message_to_queue(obj=phash_obj, queue='PhashCorrelation', message=seen_date)
|
|
||
|
|
||
if __name__ == '__main__':
    # Script entry point: build the module and start its processing loop.
    Phash().run()
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The image object shouldn't correlate with another image or screenshot.
"phash": ["image", "phash"], is used to correlate phash with images and screenshots. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is in the new code