Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master (unreleased)

### New Features
- freeze: add `--reproducible` flag that zeros dynamic header metadata

### Breaking Changes

Expand Down Expand Up @@ -113,7 +114,7 @@
- tests: update binja version to 5.3 @mr-tz #3011
- ci: use explicit and per job permissions @mike-hunhoff #3002
- replace black/isort/flake8 with ruff @mike-hunhoff #2992

- tests: add snapshot tests for feature extraction @williballenthin #3069
- ci: update GitHub Actions to support Node.js 24 (deprecate Node.js 20) @mr-tz #2984

### Raw diffs
Expand Down Expand Up @@ -259,7 +260,6 @@ Additionally a Binary Ninja bug has been fixed. Released binaries now include AR
- nursery/get-dotnet-assembly-entry-point mehunhoff@google.com

### Bug Fixes

- binja: fix a crash during feature extraction when the MLIL is unavailable @xusheng6 #2714

### capa Explorer Web
Expand Down
2 changes: 1 addition & 1 deletion capa/features/extractors/dotnetfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def extract_file_namespace_features(pe: dnfile.dnPE, **kwargs) -> Iterator[tuple
# namespaces may be empty, discard
namespaces.discard("")

for namespace in namespaces:
for namespace in sorted(namespaces):
# namespace do not have an associated token, so we yield 0x0
yield Namespace(namespace), NO_ADDRESS

Expand Down
93 changes: 77 additions & 16 deletions capa/features/freeze/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,7 @@ def from_capa(cls, a: capa.features.address.Address) -> "Address":
return cls(type=AddressType.THREAD, value=(a.process.ppid, a.process.pid, a.tid))

elif isinstance(a, capa.features.address.DynamicCallAddress):
return cls(
type=AddressType.CALL,
value=(a.thread.process.ppid, a.thread.process.pid, a.thread.tid, a.id),
)
return cls(type=AddressType.CALL, value=(a.thread.process.ppid, a.thread.process.pid, a.thread.tid, a.id))

elif a == capa.features.address.NO_ADDRESS or isinstance(a, capa.features.address._NoAddress):
return cls(type=AddressType.NO_ADDRESS, value=None)
Expand Down Expand Up @@ -346,9 +343,14 @@ class Freeze(BaseModel):
model_config = ConfigDict(populate_by_name=True)


def dumps_static(extractor: StaticFeatureExtractor) -> str:
def dumps_static(extractor: StaticFeatureExtractor, reproducible: bool = False) -> str:
"""
serialize the given extractor to a string

When `reproducible` is true, the freeze's dynamic header metadata (e.g. the
embedded capa version) is zeroed out so that output is identical across
capa versions for a given extractor. This is used by the feature snapshot
tests to keep fixtures stable across version bumps.
"""
global_features: list[GlobalFeature] = []
for feature, _ in extractor.extract_global_features():
Expand All @@ -357,6 +359,7 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
feature=feature_from_capa(feature),
)
)
global_features.sort(key=lambda gf: gf.feature.model_dump_json())

file_features: list[FileFeature] = []
for feature, address in extractor.extract_file_features():
Expand All @@ -366,6 +369,7 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
address=Address.from_capa(address),
)
)
file_features.sort(key=lambda ff: (ff.address, ff.feature.model_dump_json()))

function_features: list[FunctionFeatures] = []
for f in extractor.get_functions():
Expand All @@ -378,6 +382,7 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
)
for feature, addr in extractor.extract_function_features(f)
]
ffeatures.sort(key=lambda ff: (ff.address, ff.feature.model_dump_json()))

basic_blocks = []
for bb in extractor.get_basic_blocks(f):
Expand All @@ -390,6 +395,7 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
)
for feature, addr in extractor.extract_basic_block_features(f, bb)
]
bbfeatures.sort(key=lambda bf: (bf.address, bf.feature.model_dump_json()))

instructions = []
for insn in extractor.get_instructions(f, bb):
Expand All @@ -402,6 +408,7 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
)
for feature, addr in extractor.extract_insn_features(f, bb, insn)
]
ifeatures.sort(key=lambda i: (i.address, i.feature.model_dump_json()))

instructions.append(
InstructionFeatures(
Expand All @@ -410,6 +417,7 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
)
)

instructions.sort(key=lambda i: i.address)
basic_blocks.append(
BasicBlockFeatures(
address=bbaddr,
Expand All @@ -418,6 +426,7 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
)
)

basic_blocks.sort(key=lambda bb: bb.address)
function_features.append(
FunctionFeatures(
address=faddr,
Expand All @@ -426,28 +435,33 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
)
)

function_features.sort(key=lambda ff: ff.address)

features = StaticFeatures(
global_=global_features, # type: ignore[call-arg] # pydantic alias "global" not recognized by type checkers
file=tuple(file_features),
functions=tuple(function_features),
)

extractor_version = "" if reproducible else capa.version.__version__
freeze = Freeze(
version=CURRENT_VERSION,
base_address=Address.from_capa(extractor.get_base_address()), # type: ignore[call-arg] # pydantic alias "base address" not recognized by type checkers
sample_hashes=extractor.get_sample_hashes(),
flavor="static",
extractor=Extractor(name=extractor.__class__.__name__),
extractor=Extractor(name=extractor.__class__.__name__, version=extractor_version),
features=features,
)
# type checkers are unable to recognise `base_address` as an argument due to alias

return freeze.model_dump_json()


def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
def dumps_dynamic(extractor: DynamicFeatureExtractor, reproducible: bool = False) -> str:
"""
serialize the given extractor to a string

See `dumps_static` for `reproducible`.
"""
global_features: list[GlobalFeature] = []
for feature, _ in extractor.extract_global_features():
Expand All @@ -456,6 +470,7 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
feature=feature_from_capa(feature),
)
)
global_features.sort(key=lambda gf: gf.feature.model_dump_json())

file_features: list[FileFeature] = []
for feature, address in extractor.extract_file_features():
Expand All @@ -465,6 +480,7 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
address=Address.from_capa(address),
)
)
file_features.sort(key=lambda ff: (ff.address, ff.feature.model_dump_json()))

process_features: list[ProcessFeatures] = []
for p in extractor.get_processes():
Expand All @@ -478,6 +494,7 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
)
for feature, addr in extractor.extract_process_features(p)
]
pfeatures.sort(key=lambda pf: (pf.address, pf.feature.model_dump_json()))

threads = []
for t in extractor.get_threads(p):
Expand All @@ -490,6 +507,7 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
)
for feature, addr in extractor.extract_thread_features(p, t)
]
tfeatures.sort(key=lambda tf: (tf.address, tf.feature.model_dump_json()))

calls = []
for call in extractor.get_calls(p, t):
Expand All @@ -503,6 +521,7 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
)
for feature, addr in extractor.extract_call_features(p, t, call)
]
cfeatures.sort(key=lambda cf: (cf.address, cf.feature.model_dump_json()))

calls.append(
CallFeatures(
Expand All @@ -512,6 +531,7 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
)
)

calls.sort(key=lambda c: c.address)
threads.append(
ThreadFeatures(
address=taddr,
Expand All @@ -520,6 +540,7 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
)
)

threads.sort(key=lambda t: t.address)
process_features.append(
ProcessFeatures(
address=paddr,
Expand All @@ -529,6 +550,8 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
)
)

process_features.sort(key=lambda pf: pf.address)

features = DynamicFeatures(
global_=global_features, # type: ignore[call-arg] # pydantic alias "global" not recognized by type checkers
file=tuple(file_features),
Expand All @@ -539,12 +562,13 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
get_base_addr = getattr(extractor, "get_base_address", None)
base_addr = get_base_addr() if get_base_addr else capa.features.address.NO_ADDRESS

extractor_version = "" if reproducible else capa.version.__version__
freeze = Freeze(
version=CURRENT_VERSION,
base_address=Address.from_capa(base_addr), # type: ignore[call-arg] # pydantic alias "base address" not recognized by type checkers
sample_hashes=extractor.get_sample_hashes(),
flavor="dynamic",
extractor=Extractor(name=extractor.__class__.__name__),
extractor=Extractor(name=extractor.__class__.__name__, version=extractor_version),
features=features,
)
# type checkers are unable to recognise `base_address` as an argument due to alias
Expand Down Expand Up @@ -627,28 +651,28 @@ def loads_dynamic(s: str) -> DynamicFeatureExtractor:
MAGIC = "capa0000".encode("ascii")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bump this now?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or should it stay stable and we change it accordingly?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think the format has changed, so i don't think it should be bumped. what were you thinking?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about the now sorted features?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imho, we didn't previously guarantee this, and now we do, but the data is the same, just the order is different, so i believe this is backwards compatible.

however, this doesn't change the way the null feature extractors expose the features - they can still sort the data upon loading. so it's really just about having a stable, reproducible hash for the same data.



def dumps(extractor: FeatureExtractor) -> str:
def dumps(extractor: FeatureExtractor, reproducible: bool = False) -> str:
"""serialize the given extractor to a string."""
if isinstance(extractor, StaticFeatureExtractor):
doc = dumps_static(extractor)
doc = dumps_static(extractor, reproducible=reproducible)
elif isinstance(extractor, DynamicFeatureExtractor):
doc = dumps_dynamic(extractor)
doc = dumps_dynamic(extractor, reproducible=reproducible)
else:
raise ValueError("Invalid feature extractor")

return doc


def dump(extractor: FeatureExtractor) -> bytes:
def dump(extractor: FeatureExtractor, reproducible: bool = False) -> bytes:
"""serialize the given extractor to a byte array."""
return MAGIC + zlib.compress(dumps(extractor).encode("utf-8"))
return MAGIC + zlib.compress(dumps(extractor, reproducible=reproducible).encode("utf-8"))


def is_freeze(buf: bytes) -> bool:
return buf[: len(MAGIC)] == MAGIC


def loads(s: str):
def loads(s: str) -> FeatureExtractor:
doc = json.loads(s)

if doc["version"] != CURRENT_VERSION:
Expand All @@ -662,7 +686,7 @@ def loads(s: str):
raise ValueError(f"unsupported freeze format flavor: {doc['flavor']}")


def load(buf: bytes):
def load(buf: bytes) -> FeatureExtractor:
"""deserialize a set of features (as a NullFeatureExtractor) from a byte array."""
if not is_freeze(buf):
raise ValueError("missing magic header")
Expand All @@ -685,6 +709,11 @@ def main(argv=None):
parser = argparse.ArgumentParser(description="save capa features to a file")
capa.main.install_common_args(parser, {"input_file", "format", "backend", "os", "signatures"})
parser.add_argument("output", type=str, help="Path to output file")
parser.add_argument(
"--reproducible",
action="store_true",
help="zero out dynamic header metadata (e.g. capa version) so output is stable across capa versions",
)
args = parser.parse_args(args=argv)

try:
Expand All @@ -696,11 +725,43 @@ def main(argv=None):
except capa.main.ShouldExitError as e:
return e.status_code

Path(args.output).write_bytes(dump(extractor))
output_path = Path(args.output)
output_path.write_bytes(dump(extractor, reproducible=args.reproducible))

# Log a manifest entry for the feature snapshot tests at INFO level. This
# makes it easy to copy/paste into
# `tests/fixtures/snapshots/features/manifest.json` when adding a new
# fixture or refreshing an existing one.
entry: dict[str, str] = {
"name": output_path.stem,
"sample": str(args.input_file),
"freeze": output_path.name,
}
if args.format and args.format != "auto":
entry["format"] = args.format
if args.backend and args.backend != "auto":
entry["backend"] = args.backend
if args.os and args.os != "auto":
entry["os"] = args.os
commit = _git_head_commit()
if commit:
entry["generated_at_commit"] = commit
logger.info("manifest entry: %s", json.dumps(entry))

return 0


def _git_head_commit() -> str:
"""Return the HEAD commit, or empty string if this isn't a git checkout."""
import subprocess

try:
out = subprocess.check_output(["git", "rev-parse", "HEAD"], stderr=subprocess.DEVNULL)
except (subprocess.CalledProcessError, FileNotFoundError, OSError):
return ""
return out.decode("ascii", errors="replace").strip()


if __name__ == "__main__":
import sys

Expand Down
19 changes: 19 additions & 0 deletions capa/features/freeze/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Entry point shim so the freeze tool can be run as ``python -m capa.features.freeze``.

Delegates to :func:`capa.features.freeze.main` and propagates its return
value as the process exit code.
"""

import sys

from capa.features.freeze import main

sys.exit(main())
Loading
Loading