feat(signature): first US for ExpectationSignature (#206)#257
Conversation
|
Initial message edited to reflect the new behavior based on review/usages. Also as for the NetworkInjectorConfig, we can use a builder to quickly create them from a list of targets (from build_network_configs(["10.0.0.1", "2001:db8::1", "web.example.com"]) |
|
|
||
|
|
||
| class NetworkInjectorConfig(BaseModel): | ||
| """A single network target. Exactly one of ``target_ipv4``, ``target_ipv6``, or ``target_hostname``.""" |
There was a problem hiding this comment.
Should we use a pydantic model_validator (eventually a mode=before) to ensure there is at least one but only one of those three options?
Something like
@model_validator(mode='before')
def check_one(cls, data):
assert sum(value != None for key, value in data.items() if key in ['target_ipv4', 'target_ipv6', 'target_hostname']) == 1
return data
This comment was marked as outdated.
This comment was marked as outdated.
|
@Kakudou to keep you updated: when you speak about yet according to the engineering made beforehand (as in US-5), multiple expectation types could be found there you anticipated that properly in your models, but this doesn't translate into your function leading to a single ExpectationSignatureGroup in your signature target I've made a small commit 8b7848f in order to allow for multiple expectation types in the |
8b7848f to
9af20a8
Compare
d162a92 to
68a7c63
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #257 +/- ##
==========================================
+ Coverage 70.34% 73.04% +2.69%
==========================================
Files 49 53 +4
Lines 1966 2341 +375
==========================================
+ Hits 1383 1710 +327
- Misses 583 631 +48
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
…m/OpenAEV-Platform/client-python into feat/us-int-1-signature-lifecycle
| post_signatures = [post_signatures] | ||
|
|
||
| # Validate expectation types | ||
| expectation_types_valid = [ |
There was a problem hiding this comment.
I see your point @Megafredo and it makes sense, if an expectation isn't valid this will raise an error (make sense, that's the objective 👍 ).
But when we iterate later on line 214 (for expectation_type in expectation_types_valid:), it's for two reasons:
extra_signatures[expectation_type](meaning the key of the dict isExpectationType.DETECTION- usually we preferstras keys afaik 🤔 )expectation_type.value(so theexpectation_typevalue provided originally in the input of the function, if it goes through the check)
So, I've got two suggestions for you that seems to be a bit better if you're ok with it.
Option 1
- Line 200, we just check but don't keep the value:
[ExpectationType(expectation_type.upper()) for expectation_type in expectation_types]. This will raise an issue if it doesn't work, so we're good. - Line 229 it's just
expectation_typewithout the.value - the input dict
extra_signaturewould have strings rather thanExpectationTypeas key
Option 2
Basically, we do the heavy-lifting outside of the signature_manage.py
- We add a new model in
pyoaev.signatures.models, something likeExtraSignaturesData - We either hardcode parameters in the model such as
prevention: dict[str, JsonValue], expectation: dict[str, JsonValue], vulnerabilities: dict[str, JsonValue] - or we use a trick I suggested on collector-side (see below) that check on-the-fly the key names in the pydantic model
- in
signature_manager.pywe just type extra_signature as the newExtraSignaturesDatarather than a dict - you don't have to check if the keys match
ExpectationTypehere (we do it in the model, one way or an other) + the developers have a model to check for, easier than to look inside this function
guzmud's opinion
On my side, I prefer option 2 or something like that because it splits the issue of formatting/serializing properly the input (and it's done models side, plus it's easier to document and so forth) and the issue of using the data (which is the point of the current function).
on-the-fly checking
The trick in question is too on-the-fly check if the parameters are named after signature type. In our current case, it would be ExpectationType, so we need to change a bit the definition of _allowed_values, but that's the rough idea:
class OAEVData(BaseModel, extra="allow"):
"""
Source-side version of OAEV formatted data.
Apart from context, the allowed fields are signature types (e.g. parent_process_name)
"""
__pydantic_extra__: dict[str, str] = Field(default_factory=dict)
_allowed_values: ClassVar[frozenset[str]] = frozenset(
[sig.value for sig in SignatureTypes]
)
@model_validator(mode="before")
@classmethod
def check_field_names(cls, data: Any) -> Any:
"""Check whether the fields provided through extra are actually signature types"""
if isinstance(data, dict):
for key in data:
if key not in cls._allowed_values:
raise ValueError("Only signature types are allowed")
return data
Note that this trick may confuse IDE and could make auto-documentation a bit harder, so hardcoding the parameters (prevention, detection, etc.) could be just simpler and better.
There was a problem hiding this comment.
To be more straightforward,
in pyoaev.signatures.models,
from pydantic import BaseModel, JsonValue, Field
class ExtraSignatureData(BaseModel):
detection: dict[str, JsonValue] | None = Field(default_factory=dict)
prevention: dict[str, JsonValue] | None = Field(default_factory=dict)
vulnerability: dict[str, JsonValue] | None = Field(default_factory=dict)and in pyoaev.signatures.signature_manager,
def build_payload(
post_signatures: dict[str, Any] | list[dict[str, Any]],
expectation_types: list[str],
extra_signature: ExtraSignatureData| None = None, # we change the typing for extra_signature
) -> dict[str, Any]:`The question is how to easily access the value once inside the function. We can't do signature_data.update(extra_signatures[expectation_type]) since extra_signatures is not a dict anymore.
We have multiple options, but after a bit of thoughts, this seems to be the easiest for the dev-experience (although it asks us to maintain a bit more hardcoded things).
from pydantic import BaseModel, JsonValue, Field
class ExtraSignatureData(BaseModel):
detection: dict[str, JsonValue] | None = Field(default_factory=dict)
prevention: dict[str, JsonValue] | None = Field(default_factory=dict)
vulnerability: dict[str, JsonValue] | None = Field(default_factory=dict)
def get_extra(self, expectation_type: str):
if expectation_type == "detection":
return self.detection
if expectation_type == "prevention":
return self.prevention
if expectation_type == "vulnerability":
return self.vulnerability
raise ValueError(
f"Expectation type should be one of the available parameters: {list(self.__fields__.keys())}"
)f5d832c to
fb74062
Compare
|
Nota bene for @mariot and @Kakudou ! In order to have a better control regarding
This gives us the ability to spread the extra signatures into the various expectation types while providing a stronger typing/definition for the dev-experience. |
…tation type through the build_payload (#206) rather than handling global extra signatures through tool output in post_exec_compile
fb74062 to
a76467d
Compare
That's the first US to introduce the SignatureExpectation for the injectors.
Proposed changes
SignatureManager
Unified signature lifecycle for OpenAEV injectors: compile pre-execution signatures, merge post-execution results, and ship structured output to the backend.
Architecture
flowchart LR subgraph pyoaev/signatures SM[SignatureManager] M[models.py] CFG["InjectorConfig\n(Network / Cloud / External)"] end subgraph pyoaev/apis API[SignatureApiManager] end subgraph Backend CB["/api/injects/{id}/callback"] end CFG -->|typed input| SM SM -->|compile_pre/post| M SM -->|send_signatures| API API -->|callback\nretry + chunk| CBInjector configs (
models.py) are the typed contract: one config = one signature row.SignatureManager owns the domain logic (compile, merge, resolve IP).
SignatureApiManager owns the transport (validation, chunking, retry).
Quick Start
Injector configs
The category is encoded in the config type. Pass a single config for a single-target inject,
or a homogeneous list for a multi-target inject. Mixing config types in a single call is rejected.
NetworkInjectorConfigtarget_ipv4/target_ipv6/target_hostnameCloudInjectorConfigcloud_provider,cloud_account_id,cloud_regiontarget_serviceExternalInjectorConfigquerytarget_ipv4,target_hostnameInjectorConfigis the union type:NetworkInjectorConfig | CloudInjectorConfig | ExternalInjectorConfig.SignatureManageraddsstart_timeautomatically (plussource_ipv4/source_ipv6for network).Network
####### Network builder
build_network_configs(targets)turns a heterogeneous list of strings, dicts, or already-typedNetworkInjectorConfiginto a clean list of typed configs. Strings are auto-classified intoIPv4 / IPv6 / hostname via the stdlib
ipaddressmodule. Each input is treated as one distinctasset — a target never mixes identities.
Cloud
External
Compiled output shapes
compile_pre_execution_signaturesreturns a single flat dict for one config, or a list of dictsfor a list of configs.
Nonefields are stripped.compile_post_execution_signatures(pre, tool_output)preserves the input shape (dict in, dict out;list in, list out) and adds
end_time,execution_status, and optionalpartial_results.Anything in
tool_output["extra_signatures"]is merged into the final dict verbatim, useful forinjector-specific fields like
parent_process_nameor custom signal types.Failure modes
compile_pre_execution_signaturesValueErrorNetwork+Cloud)ValueErrorNetworkInjectorConfigwith zero or more than one identity fieldValidationErrorbuild_network_configsitem that's neitherstr,dict, nor aNetworkInjectorConfigTypeErrortool_outputin post-executionOpenAEVErrorLifecycle Flow
sequenceDiagram participant Injector participant SM as SignatureManager participant API as SignatureApiManager participant Backend Injector->>SM: compile_pre_execution_signatures(config) SM-->>Injector: pre_signatures dict/list Note over Injector: Tool executes... Injector->>SM: compile_post_execution_signatures(pre, tool_output) SM-->>Injector: merged signatures Injector->>SM: build_payload(post, target_meta, expectation_type) SM-->>Injector: nested wire payload Injector->>SM: send_signatures(inject_id, phase, signatures) SM->>API: send_signatures(inject_id, phase, signatures) API->>API: validate + normalize + chunk if needed API->>Backend: POST /api/injects/{id}/callback Backend-->>API: 200/202Transport Behaviour
max_payload_size(default 1 MiB) are split by target and sent sequentially withchunk_index/total_chunksmetadata.SignatureTransmissionErrorimmediately.Wire Format
Payloads follow the nested schema expected by the callback endpoint:
{ "phase": "execution_complete", "expectation_signature": { "targets": [ { "signature_values": [ { "expectation_type": "DETECTION", "values": [ { "signature_type": "source_ipv4", "signature_value": "172.17.0.2" }, { "signature_type": "target_ipv4", "signature_value": "10.0.0.1" }, { "signature_type": "start_time", "signature_value": "2024-06-26T06:06:06Z" }, { "signature_type": "end_time", "signature_value": "2024-06-26T06:06:09Z" }, { "signature_type": "execution_status", "signature_value": "success" } ] } ] } ] } }Known
signature_typelabels live inpyoaev.signatures.SignatureTypes(
source_ipv4_address,target_hostname_address,cloud_region,query, ...). The wire formatitself accepts any string, so injectors are free to add custom types via
tool_output.extra_signatures.Utility
Resolution strategy:
CONTAINER_IPenv var >socket.gethostbyname>hostname -i>"unknown".The result is cached for the lifetime of the manager and IPv6 is sniffed best-effort alongside.
Related issues
ContractOutputType:ExpectationSignature#206Checklist
Further comments