diff --git a/filter-plugin/logstash-filter-documentdb-aws-guardium/README.md b/filter-plugin/logstash-filter-documentdb-aws-guardium/README.md index 5308630a0..7a0eb1c76 100644 --- a/filter-plugin/logstash-filter-documentdb-aws-guardium/README.md +++ b/filter-plugin/logstash-filter-documentdb-aws-guardium/README.md @@ -99,7 +99,32 @@ The Guardium universal connector is the Guardium entry point for native audit/pr 9. Click ```Save```. Guardium validates the new connector and displays it in the Configure Universal Connector page. 10. After the offline plug-in is installed and the configuration is uploaded and saved in the Guardium machine, restart the Universal Connector using the ```Disable/Enable``` button. -## 6. Limitations +## 6. Handling Large Events and JSON Truncation + +The DocumentDB filter includes enhanced handling for large JSON events that may be truncated during transmission. For detailed information about truncation detection, configuration, and troubleshooting, see [TRUNCATION_HANDLING.md](TRUNCATION_HANDLING.md). + +### Quick Configuration + +To handle large events, you can configure the maximum JSON size threshold: + +```ruby +filter { + documentdb_guardium_filter { + max_json_size_bytes => 2097152 # 2MB (default is 1MB) + } +} +``` + +### Error Tags + +Events that cannot be parsed are tagged for monitoring: +- `_documentdbguardium_json_truncated` - JSON appears truncated +- `_documentdbguardium_json_parse_error` - General parsing errors +- `_documentdbguardium_json_depth_error` - JSON nesting or structure issues + +See [TRUNCATION_HANDLING.md](TRUNCATION_HANDLING.md) for comprehensive troubleshooting guidance. + +## 7. Limitations - DocumentDB Profiler logs capture any database operations that take longer than some period of time(e. g. 100 ms). If the threshold value is not configurable and set value is too high, then profiler logs may not get captured for every database operation. - The Following important fields couldn't be mapped with DocumentDB audit/profiler logs - Source program : Only available in case of "aggregate" query diff --git a/filter-plugin/logstash-filter-documentdb-aws-guardium/TRUNCATION_HANDLING.md b/filter-plugin/logstash-filter-documentdb-aws-guardium/TRUNCATION_HANDLING.md new file mode 100644 index 000000000..5dc9262dc --- /dev/null +++ b/filter-plugin/logstash-filter-documentdb-aws-guardium/TRUNCATION_HANDLING.md @@ -0,0 +1,192 @@ +# DocumentDB Guardium Filter - JSON Truncation Handling + +## Overview + +This document describes the enhanced error handling for JSON truncation issues in the DocumentDB Guardium filter. + +## Problem + +Large DocumentDB audit/profiler events can be truncated during transmission or logging, resulting in malformed JSON that causes parsing errors like: + +``` +JsonSyntaxException: Unterminated array at line 1 column 1518 path $.param.args.updates[2] +``` + +These errors occur when JSON messages are cut off mid-value (e.g., `"upsert":tru...` instead of `"upsert":true`). + +## Solution + +The filter now includes comprehensive truncation detection and handling: + +### 1. **Pre-Parse Validation** +- Validates JSON structure before attempting to parse +- Detects common truncation patterns: + - Missing closing brackets + - Unterminated strings + - Truncated boolean/null values (`tru`, `fal`, `nul`) + - Ellipsis patterns (`...`) + +### 2. **Size Monitoring** +- Configurable maximum JSON size threshold (default: 1MB) +- Warns when messages exceed the threshold +- Helps identify potential truncation sources + +### 3. **Enhanced Error Detection** +- Identifies truncation-specific errors in `JsonSyntaxException` +- Distinguishes between truncation and other parsing errors +- Provides detailed error context + +### 4. **Improved Logging** +- Separate tags for different error types: + - `_documentdbguardium_json_truncated` - Truncated JSON + - `_documentdbguardium_json_parse_error` - Other parsing errors + - `_documentdbguardium_json_depth_error` - Nesting/structure errors +- Concise event identifiers (event_id, log_group, type, cluster) +- Message size reporting for truncation errors + +## Configuration + +### New Configuration Option + +```ruby +filter { + documentdb_guardium_filter { + # Maximum JSON message size in bytes (default: 1048576 = 1MB) + max_json_size_bytes => 2097152 # 2MB + } +} +``` + +### Recommended Settings + +For environments with large DocumentDB operations: +- Increase `max_json_size_bytes` to 2-5MB +- Monitor CloudWatch log group settings for size limits +- Consider enabling CloudWatch Logs data protection + +## Error Tags + +Events are tagged based on the error type: + +| Tag | Description | Action | +|-----|-------------|--------| +| `_documentdbguardium_json_truncated` | JSON appears truncated | Increase log size limits or filter large operations | +| `_documentdbguardium_json_parse_error` | General parsing error | Review JSON structure | +| `_documentdbguardium_json_depth_error` | Nesting too deep or invalid structure | Review event complexity | + +## Monitoring + +### Log Patterns to Watch + +**Truncation Detected:** +``` +DocumentDB filter: JSON truncation detected in audit event. Message size: 2048 bytes. Error: Unterminated array... +``` + +**Size Warning:** +``` +DocumentDB filter: JSON message exceeds max size (1500000 bytes), truncating may occur. +``` + +### Metrics to Track + +1. Count of `_documentdbguardium_json_truncated` tags +2. Average message sizes for truncated events +3. Frequency of truncation by cluster/log_group + +## Troubleshooting + +### High Truncation Rate + +**Symptoms:** Many events tagged with `_documentdbguardium_json_truncated` + +**Solutions:** +1. **Increase CloudWatch Logs limits:** + - Check CloudWatch log group retention and size settings + - Verify no intermediate log processors are truncating + +2. **Filter large operations:** + ```ruby + filter { + if [message] =~ /large_collection/ { + drop { } + } + } + ``` + +3. **Increase buffer sizes:** + - Adjust Logstash pipeline batch sizes + - Review input plugin buffer settings + +### Specific Operations Truncating + +**Symptoms:** Truncation occurs for specific operation types (e.g., large updates) + +**Solutions:** +1. **Exclude problematic operations:** + ```ruby + filter { + if [message] =~ /"update".*"updates":\[/ and [message] !~ /\}\}$/ { + mutate { + add_tag => ["_skip_large_update"] + } + } + } + ``` + +2. **Sample large operations:** + - Use sampling to reduce volume of large events + - Focus on metadata rather than full payloads + +### False Positives + +**Symptoms:** Valid JSON tagged as truncated + +**Solutions:** +1. Review truncation pattern detection in [`validateJson()`](src/main/java/com/ibm/guardium/documentdb/DocumentdbGuardiumFilter.java:362) +2. Adjust pattern matching if needed +3. Report false positives for filter improvement + +## Best Practices + +1. **Set appropriate size limits** based on your DocumentDB operation patterns +2. **Monitor truncation rates** and adjust configuration accordingly +3. **Use event identifiers** in logs to correlate with source events +4. **Review CloudWatch settings** to ensure logs aren't truncated upstream +5. **Consider operation filtering** for extremely large operations that don't need auditing + +## Technical Details + +### Truncation Detection Methods + +1. **Structural validation:** + - Checks for matching opening/closing brackets + - Validates JSON starts with `{` or `[` + +2. **Pattern matching:** + - Regex patterns for incomplete values + - End-of-string analysis for truncation markers + +3. **Exception analysis:** + - Parses `JsonSyntaxException` messages + - Identifies truncation-specific error patterns + +### Performance Impact + +- Pre-validation adds minimal overhead (~1-2ms per event) +- Pattern matching is optimized for common cases +- No impact on successfully parsed events + +## Related Files + +- [`DocumentdbGuardiumFilter.java`](src/main/java/com/ibm/guardium/documentdb/DocumentdbGuardiumFilter.java) - Main filter implementation +- [`Parser.java`](src/main/java/com/ibm/guardium/documentdb/Parser.java) - JSON parsing logic +- [`README.md`](README.md) - General filter documentation + +## Support + +For issues or questions: +1. Check logs for specific error messages and event identifiers +2. Review configuration settings +3. Monitor truncation patterns and frequencies +4. Report persistent issues with example events (sanitized) \ No newline at end of file diff --git a/filter-plugin/logstash-filter-documentdb-aws-guardium/gradle.properties b/filter-plugin/logstash-filter-documentdb-aws-guardium/gradle.properties new file mode 100644 index 000000000..50ca5f179 --- /dev/null +++ b/filter-plugin/logstash-filter-documentdb-aws-guardium/gradle.properties @@ -0,0 +1,3 @@ +LOGSTASH_CORE_PATH=/Users/taees/Development/logstash/logstash-core +GUARDIUM_UNIVERSALCONNECTOR_COMMONS_PATH=/Users/taees/Development/public_repo/universal-connectors/common/build/libs + diff --git a/filter-plugin/logstash-filter-documentdb-aws-guardium/src/main/java/com/ibm/guardium/documentdb/DocumentdbGuardiumFilter.java b/filter-plugin/logstash-filter-documentdb-aws-guardium/src/main/java/com/ibm/guardium/documentdb/DocumentdbGuardiumFilter.java index cb2ba1984..7616d9e9d 100644 --- a/filter-plugin/logstash-filter-documentdb-aws-guardium/src/main/java/com/ibm/guardium/documentdb/DocumentdbGuardiumFilter.java +++ b/filter-plugin/logstash-filter-documentdb-aws-guardium/src/main/java/com/ibm/guardium/documentdb/DocumentdbGuardiumFilter.java @@ -37,8 +37,10 @@ public class DocumentdbGuardiumFilter implements Filter { public static final PluginConfigSpec SOURCE_CONFIG = PluginConfigSpec.stringSetting("source", "message"); + public static final PluginConfigSpec MAX_JSON_SIZE_CONFIG = PluginConfigSpec.numSetting("max_json_size_bytes", 1048576L); // 1MB default public static final String LOGSTASH_TAG_JSON_PARSE_ERROR = "_documentdbguardium_json_parse_error"; public static final String LOGSTASH_TAG_JSON_DEPTH_ERROR = "_documentdbguardium_json_depth_error"; + public static final String LOGSTASH_TAG_JSON_TRUNCATED = "_documentdbguardium_json_truncated"; /* * skipping non-relevant log events like * "successful authenticate", and other events @@ -79,17 +81,19 @@ public class DocumentdbGuardiumFilter implements Filter { private static Logger log = LogManager.getLogger(DocumentdbGuardiumFilter.class); Parser parser; private String id; + private long maxJsonSizeBytes; public DocumentdbGuardiumFilter(String id, Configuration config, Context context) { // constructors should validate configuration options this.id = id; this.parser = new Parser(); + this.maxJsonSizeBytes = config.get(MAX_JSON_SIZE_CONFIG); } @Override public Collection> configSchema() { // should return a list of all configuration options for this plugin - return Collections.singletonList(SOURCE_CONFIG); + return Arrays.asList(SOURCE_CONFIG, MAX_JSON_SIZE_CONFIG); } @Override @@ -108,10 +112,24 @@ public Collection filter(Collection events, FilterMatchListener ma if (e.getField("message") instanceof String) { messageString = e.getField("message").toString(); + // Check JSON size before processing + if (messageString.length() > maxJsonSizeBytes) { + log.warn("DocumentDB filter: JSON message exceeds max size ({} bytes), truncating may occur. Event: {}", + messageString.length(), getEventIdentifier(e)); + } - if (!isProperlyClosedJson(messageString)) { - log.error("DocumentDB filter: JSON validation failed (truncated or too large)"); - e.tag(LOGSTASH_TAG_JSON_DEPTH_ERROR); + // Validate JSON structure + JsonValidationResult validationResult = validateJson(messageString); + if (!validationResult.isValid) { + if (validationResult.isTruncated) { + log.error("DocumentDB filter: JSON appears truncated ({}). Event: {}", + validationResult.reason, getEventIdentifier(e)); + e.tag(LOGSTASH_TAG_JSON_TRUNCATED); + } else { + log.error("DocumentDB filter: JSON validation failed ({}). Event: {}", + validationResult.reason, getEventIdentifier(e)); + e.tag(LOGSTASH_TAG_JSON_DEPTH_ERROR); + } continue; } if (messageString.contains(DOCUMENTDB_AUDIT_SIGNAL)) {// This is an audit event @@ -150,11 +168,22 @@ public Collection filter(Collection events, FilterMatchListener ma e.tag(LOGSTASH_TAG_JSON_PARSE_ERROR); System.gc(); // Suggest garbage collection } catch (JsonSyntaxException jse) { - log.error( - "DocumentDB filter: Error parsing docDb audit event {} \n {} ", - logEvent(e), - formatJsonSyntaxException(jse)); - e.tag(LOGSTASH_TAG_JSON_PARSE_ERROR); + String errorMsg = formatJsonSyntaxException(jse); + if (isTruncationError(jse)) { + log.error( + "DocumentDB filter: JSON truncation detected in audit event. " + + "Message size: {} bytes. Error: {}. Event: {}", + messageString != null ? messageString.length() : 0, + errorMsg, + getEventIdentifier(e)); + e.tag(LOGSTASH_TAG_JSON_TRUNCATED); + } else { + log.error( + "DocumentDB filter: Error parsing docDb audit event. Error: {}. Event: {}", + errorMsg, + getEventIdentifier(e)); + e.tag(LOGSTASH_TAG_JSON_PARSE_ERROR); + } } catch (Exception exception) { // don't let event pass filter // events.remove(e); @@ -202,11 +231,22 @@ public Collection filter(Collection events, FilterMatchListener ma e.tag(LOGSTASH_TAG_JSON_PARSE_ERROR); System.gc(); // Suggest garbage collection } catch (JsonSyntaxException jse) { - log.error( - "DocumentDB filter: Error parsing docDb profiler event {} \n {} ", - logEvent(e), - formatJsonSyntaxException(jse)); - e.tag(LOGSTASH_TAG_JSON_PARSE_ERROR); + String errorMsg = formatJsonSyntaxException(jse); + if (isTruncationError(jse)) { + log.error( + "DocumentDB filter: JSON truncation detected in profiler event. " + + "Message size: {} bytes. Error: {}. Event: {}", + messageString != null ? messageString.length() : 0, + errorMsg, + getEventIdentifier(e)); + e.tag(LOGSTASH_TAG_JSON_TRUNCATED); + } else { + log.error( + "DocumentDB filter: Error parsing docDb profiler event. Error: {}. Event: {}", + errorMsg, + getEventIdentifier(e)); + e.tag(LOGSTASH_TAG_JSON_PARSE_ERROR); + } } catch (Exception exception) { // don't let event pass filter // events.remove(e); @@ -329,32 +369,125 @@ private static String logEvent(Event event) { } } - private static boolean isProperlyClosedJson(String json) { + /** + * Validation result class for JSON validation + */ + private static class JsonValidationResult { + boolean isValid; + boolean isTruncated; + String reason; + + JsonValidationResult(boolean isValid, boolean isTruncated, String reason) { + this.isValid = isValid; + this.isTruncated = isTruncated; + this.reason = reason; + } + } + + /** + * Validates JSON structure and detects truncation + */ + private static JsonValidationResult validateJson(String json) { if (json == null || json.isEmpty()) { - return false; + return new JsonValidationResult(false, false, "empty or null"); } - // Trim whitespace String trimmed = json.trim(); if (trimmed.isEmpty()) { - return false; + return new JsonValidationResult(false, false, "empty after trim"); } char first = trimmed.charAt(0); char last = trimmed.charAt(trimmed.length() - 1); - // Check if starts with { or [ and ends with matching } or ] + // Check for proper JSON structure if (first == '{' && last == '}') { - return true; + // Additional check for truncation patterns + if (isTruncatedPattern(trimmed)) { + return new JsonValidationResult(false, true, "truncation pattern detected"); + } + return new JsonValidationResult(true, false, "valid"); } if (first == '[' && last == ']') { - return true; + if (isTruncatedPattern(trimmed)) { + return new JsonValidationResult(false, true, "truncation pattern detected"); + } + return new JsonValidationResult(true, false, "valid"); } - // Not properly closed or not valid JSON structure + // Check if it looks truncated + if (first == '{' || first == '[') { + return new JsonValidationResult(false, true, "missing closing bracket"); + } + + return new JsonValidationResult(false, false, "invalid JSON structure"); + } + + /** + * Detects common truncation patterns in JSON strings + */ + private static boolean isTruncatedPattern(String json) { + // Check for common truncation patterns at the end + String end = json.substring(Math.max(0, json.length() - 20)); + + // Patterns like: "...tru, "...fal, "...\"hel, etc. + if (end.matches(".*\"[^\"]*$") || // Unterminated string + end.matches(".*:tru$") || // Truncated "true" + end.matches(".*:fal$") || // Truncated "false" + end.matches(".*:nul$") || // Truncated "null" + end.matches(".*\\.\\.\\.$")) { // Ellipsis indicating truncation + return true; + } + return false; } + /** + * Checks if JsonSyntaxException indicates truncation + */ + private static boolean isTruncationError(JsonSyntaxException jse) { + String message = jse.getMessage(); + if (message == null) { + return false; + } + + // Common truncation error patterns + return message.contains("Unterminated") || + message.contains("Expected") && message.contains("but was END_DOCUMENT") || + message.contains("End of input") || + message.contains("Unexpected end of JSON"); + } + + /** + * Gets a concise event identifier for logging + */ + private static String getEventIdentifier(Event event) { + StringBuilder sb = new StringBuilder(128); + sb.append("{"); + + // Include key identifying fields + appendField(sb, event, "event_id", true); + appendField(sb, event, "log_group", false); + appendField(sb, event, "type", false); + appendField(sb, event, "cluster", false); + + sb.append("}"); + return sb.toString(); + } + + /** + * Helper to append field to identifier string + */ + private static void appendField(StringBuilder sb, Event event, String fieldName, boolean first) { + Object value = event.getField(fieldName); + if (value != null) { + if (!first) { + sb.append(", "); + } + sb.append(fieldName).append("=").append(value); + } + } + /** * Formats JsonSyntaxException message, truncating everything from the troubleshooting link * onwards diff --git a/filter-plugin/logstash-filter-dsql-guardium/.gitignore b/filter-plugin/logstash-filter-dsql-guardium/.gitignore new file mode 100644 index 000000000..288053a14 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/.gitignore @@ -0,0 +1,37 @@ +# Gradle +.gradle/ +build/ +out/ + +# IDE +.idea/ +*.iml +*.ipr +*.iws +.vscode/ +.settings/ +.classpath +.project + +# OS +.DS_Store +Thumbs.db + +# Logstash +vendor/ +Gemfile +Gemfile.lock +*.gemspec +*.gem +lib/ + +# Logs +*.log + +# Temporary files +*.tmp +*.bak +*.swp +*~ + +# Made with Bob diff --git a/filter-plugin/logstash-filter-dsql-guardium/CHANGELOG.md b/filter-plugin/logstash-filter-dsql-guardium/CHANGELOG.md new file mode 100644 index 000000000..0d5653a06 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/CHANGELOG.md @@ -0,0 +1,47 @@ +# Changelog + +All notable changes to the DSQL Guardium filter plugin will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [1.0.0] - 2024-01-01 + +### Added +- Initial release of DSQL Guardium filter plugin +- Support for AWS DSQL Database Activity Streams +- PostgreSQL-compatible SQL parsing (DSQL uses PostgreSQL protocol) +- SQS input integration for receiving audit events +- Parsing of successful SQL statements +- Parsing of SQL errors and exceptions +- Authentication failure detection +- Session tracking with client IP and port +- Database user and database name extraction +- Timestamp parsing from audit logs +- Client application identification +- Comprehensive test coverage +- Documentation and configuration examples + +### Features +- **Server Type**: POSTGRESQL +- **Data Protocol**: POSTGRESQL +- **Language**: PGRS (PostgreSQL) +- **Input Method**: AWS SQS +- **Supported Events**: + - Successful queries + - SQL errors + - Authentication failures + - Connection events + +### Known Limitations +- IPv6 is not supported +- Client hostname and OS user fields are not available in DSQL audit logs +- Multiline characters in queries may not be preserved when using SQS +- Single line comments in queries are not fully supported + +## [Unreleased] + +### Planned +- Support for additional DSQL-specific features as they become available +- Enhanced error handling and logging +- Performance optimizations for high-volume environments \ No newline at end of file diff --git a/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/DSQL/filter.conf b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/DSQL/filter.conf new file mode 100644 index 000000000..bd5acca17 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/DSQL/filter.conf @@ -0,0 +1,86 @@ +filter { + if [type] == "dsql" { + + # Parse the JSON message from SQS + json { + source => "message" + target => "parsed_message" + } + + # Extract fields from the parsed message to top level for easier access + if [parsed_message] { + mutate { + add_field => { + "databaseName" => "%{[parsed_message][databaseName]}" + "dbUserName" => "%{[parsed_message][dbUserName]}" + "remoteHost" => "%{[parsed_message][remoteHost]}" + "remotePort" => "%{[parsed_message][remotePort]}" + "sessionId" => "%{[parsed_message][sessionId]}" + "statementText" => "%{[parsed_message][statementText]}" + "exitCode" => "%{[parsed_message][exitCode]}" + "logTime" => "%{[parsed_message][logTime]}" + } + } + + # Add optional fields if they exist + if [parsed_message][clientApplication] { + mutate { + add_field => { "clientApplication" => "%{[parsed_message][clientApplication]}" } + } + } + + if [parsed_message][errorMessage] { + mutate { + add_field => { "errorMessage" => "%{[parsed_message][errorMessage]}" } + } + } + + if [parsed_message][startTime] { + mutate { + add_field => { "startTime" => "%{[parsed_message][startTime]}" } + } + } + + if [parsed_message][commandTag] { + mutate { + add_field => { "commandTag" => "%{[parsed_message][commandTag]}" } + } + } + + if [parsed_message][statementId] { + mutate { + add_field => { "statementId" => "%{[parsed_message][statementId]}" } + } + } + } + + # Apply the DSQL Guardium filter + dsql_guardium_plugin_filter {} + + # Clean up temporary fields + mutate { + remove_field => [ + "parsed_message", + "message", + "databaseName", + "dbUserName", + "remoteHost", + "remotePort", + "sessionId", + "statementText", + "exitCode", + "logTime", + "clientApplication", + "errorMessage", + "startTime", + "commandTag", + "statementId", + "type", + "@timestamp", + "@version" + ] + } + } +} + +# Made with Bob diff --git a/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/DSQL/manifest.json b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/DSQL/manifest.json new file mode 100644 index 000000000..f2d5a2189 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/DSQL/manifest.json @@ -0,0 +1,14 @@ +{ + "name": "Guardium_DSQL_filter", + "alias": "DSQL", + "type": "filter", + "pipeline_type":"pull", + "plugin_version": "1.0.0", + "datasourceTypes": [{"type":"DSQL","supportedVersions": [""]}], + "supported_input_plugins": ["SQS_input"], + "developer": "IBM", + "license": "Apache2.0", + "description": "Parses AWS DSQL Database Activity Stream events into Guardium.", + "configuration_notes": "Requires AWS SQS input plugin configured to receive DSQL Database Activity Stream events.", + "documentation_path": "https://github.com/IBM/universal-connectors/blob/main/filter-plugin/logstash-filter-dsql-guardium/README.md" +} \ No newline at end of file diff --git a/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/dsqlSQS.conf b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/dsqlSQS.conf new file mode 100644 index 000000000..d3256791c --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/dsqlSQS.conf @@ -0,0 +1,129 @@ +#/* +#Copyright 2020-2021 IBM Inc. All rights reserved +#SPDX-License-Identifier: Apache-2.0 +#*/ + +input { + sqs { + # Insert the access key and secret that has access to SQS queue + access_key_id => "" + secret_access_key => "" + region => "" # Region that has the Queue, Default value: us-east-1 + queue => "" # This parameter simply holds the Queue name and not the URL + codec => "json" + type => "dsql" + # Insert the account id of the AWS account + add_field => {"account_id" => ""} + # Insert the Instance/Cluster name of the DSQL database that is to be monitored + add_field => {"instance_name" => ""} + # Optional: Specify a custom endpoint (e.g., proxy) + # endpoint => "https://proxy.company.com" + # Set to true to use AWS's bundled CA certificates for SSL/TLS connections + # use_aws_bundled_ca => false + # Optional: Provide additional settings (e.g., custom SSL certificate bundle) + # additional_settings => { + # ssl_ca_bundle => "/usr/share/logstash/third_party/" + # } + } +} + +filter { + if [type] == "dsql" { + + # Parse the JSON message from SQS + json { + source => "message" + target => "parsed_message" + } + + # Extract fields from the parsed message to top level for easier access + if [parsed_message] { + mutate { + add_field => { + "databaseName" => "%{[parsed_message][databaseName]}" + "dbUserName" => "%{[parsed_message][dbUserName]}" + "remoteHost" => "%{[parsed_message][remoteHost]}" + "remotePort" => "%{[parsed_message][remotePort]}" + "sessionId" => "%{[parsed_message][sessionId]}" + "statementText" => "%{[parsed_message][statementText]}" + "exitCode" => "%{[parsed_message][exitCode]}" + "logTime" => "%{[parsed_message][logTime]}" + } + } + + # Add optional fields if they exist + if [parsed_message][clientApplication] { + mutate { + add_field => { "clientApplication" => "%{[parsed_message][clientApplication]}" } + } + } + + if [parsed_message][errorMessage] { + mutate { + add_field => { "errorMessage" => "%{[parsed_message][errorMessage]}" } + } + } + + if [parsed_message][startTime] { + mutate { + add_field => { "startTime" => "%{[parsed_message][startTime]}" } + } + } + + if [parsed_message][commandTag] { + mutate { + add_field => { "commandTag" => "%{[parsed_message][commandTag]}" } + } + } + + if [parsed_message][statementId] { + mutate { + add_field => { "statementId" => "%{[parsed_message][statementId]}" } + } + } + } + + # Apply the DSQL Guardium filter + dsql_guardium_plugin_filter {} + + # Clean up temporary fields + mutate { + remove_field => [ + "parsed_message", + "message", + "databaseName", + "dbUserName", + "remoteHost", + "remotePort", + "sessionId", + "statementText", + "exitCode", + "logTime", + "clientApplication", + "errorMessage", + "startTime", + "commandTag", + "statementId", + "type", + "@timestamp", + "@version" + ] + } + } +} + +output { + # Send to Guardium + if [GuardRecord] { + guardium_connector { + # Guardium configuration + } + } + + # Optional: Output to stdout for debugging + # stdout { + # codec => rubydebug + # } +} + +# Made with Bob diff --git a/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/gi_templates.json b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/gi_templates.json new file mode 100644 index 000000000..a8b708cdf --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/gi_templates.json @@ -0,0 +1,12 @@ +{ + "templates": [ + { + "name": "DSQL over SQS", + "description": "Template for AWS DSQL Database Activity Streams over SQS", + "type": "filter", + "datasource_type": "DSQL", + "input_type": "SQS", + "version": "1.0.0" + } + ] +} \ No newline at end of file diff --git a/filter-plugin/logstash-filter-dsql-guardium/LICENSE b/filter-plugin/logstash-filter-dsql-guardium/LICENSE new file mode 100644 index 000000000..eebb4fc0b --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/LICENSE @@ -0,0 +1,190 @@ +Apache License +Version 2.0, January 2004 +http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + +"License" shall mean the terms and conditions for use, reproduction, +and distribution as defined by Sections 1 through 9 of this document. + +"Licensor" shall mean the copyright owner or entity authorized by +the copyright owner that is granting the License. + +"Legal Entity" shall mean the union of the acting entity and all +other entities that control, are controlled by, or are under common +control with that entity. For the purposes of this definition, +"control" means (i) the power, direct or indirect, to cause the +direction or management of such entity, whether by contract or +otherwise, or (ii) ownership of fifty percent (50%) or more of the +outstanding shares, or (iii) beneficial ownership of such entity. + +"You" (or "Your") shall mean an individual or Legal Entity +exercising permissions granted by this License. + +"Source" form shall mean the preferred form for making modifications, +including but not limited to software source code, documentation +source, and configuration files. + +"Object" form shall mean any form resulting from mechanical +transformation or translation of a Source form, including but +not limited to compiled object code, generated documentation, +and conversions to other media types. + +"Work" shall mean the work of authorship, whether in Source or +Object form, made available under the License, as indicated by a +copyright notice that is included in or attached to the work +(an example is provided in the Appendix below). + +"Derivative Works" shall mean any work, whether in Source or Object +form, that is based on (or derived from) the Work and for which the +editorial revisions, annotations, elaborations, or other modifications +represent, as a whole, an original work of authorship. For the purposes +of this License, Derivative Works shall not include works that remain +separable from, or merely link (or bind by name) to the interfaces of, +the Work and Derivative Works thereof. + +"Contribution" shall mean any work of authorship, including +the original version of the Work and any modifications or additions +to that Work or Derivative Works thereof, that is intentionally +submitted to Licensor for inclusion in the Work by the copyright owner +or by an individual or Legal Entity authorized to submit on behalf of +the copyright owner. For the purposes of this definition, "submitted" +means any form of electronic, verbal, or written communication sent +to the Licensor or its representatives, including but not limited to +communication on electronic mailing lists, source code control systems, +and issue tracking systems that are managed by, or on behalf of, the +Licensor for the purpose of discussing and improving the Work, but +excluding communication that is conspicuously marked or otherwise +designated in writing by the copyright owner as "Not a Contribution." + +"Contributor" shall mean Licensor and any individual or Legal Entity +on behalf of whom a Contribution has been received by Licensor and +subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of +this License, each Contributor hereby grants to You a perpetual, +worldwide, non-exclusive, no-charge, royalty-free, irrevocable +copyright license to reproduce, prepare Derivative Works of, +publicly display, publicly perform, sublicense, and distribute the +Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of +this License, each Contributor hereby grants to You a perpetual, +worldwide, non-exclusive, no-charge, royalty-free, irrevocable +(except as stated in this section) patent license to make, have made, +use, offer to sell, sell, import, and otherwise transfer the Work, +where such license applies only to those patent claims licensable +by such Contributor that are necessarily infringed by their +Contribution(s) alone or by combination of their Contribution(s) +with the Work to which such Contribution(s) was submitted. If You +institute patent litigation against any entity (including a +cross-claim or counterclaim in a lawsuit) alleging that the Work +or a Contribution incorporated within the Work constitutes direct +or contributory patent infringement, then any patent licenses +granted to You under this License for that Work shall terminate +as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the +Work or Derivative Works thereof in any medium, with or without +modifications, and in Source or Object form, provided that You +meet the following conditions: + +(a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + +(b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + +(c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + +(d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + +You may add Your own copyright statement to Your modifications and +may provide additional or different license terms and conditions +for use, reproduction, or distribution of Your modifications, or +for any such Derivative Works as a whole, provided Your use, +reproduction, and distribution of the Work otherwise complies with +the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, +any Contribution intentionally submitted for inclusion in the Work +by You to the Licensor shall be under the terms and conditions of +this License, without any additional terms or conditions. +Notwithstanding the above, nothing herein shall supersede or modify +the terms of any separate license agreement you may have executed +with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade +names, trademarks, service marks, or product names of the Licensor, +except as required for reasonable and customary use in describing the +origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or +agreed to in writing, Licensor provides the Work (and each +Contributor provides its Contributions) on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied, including, without limitation, any warranties or conditions +of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A +PARTICULAR PURPOSE. You are solely responsible for determining the +appropriateness of using or redistributing the Work and assume any +risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, +whether in tort (including negligence), contract, or otherwise, +unless required by applicable law (such as deliberate and grossly +negligent acts) or agreed to in writing, shall any Contributor be +liable to You for damages, including any direct, indirect, special, +incidental, or consequential damages of any character arising as a +result of this License or out of the use or inability to use the +Work (including but not limited to damages for loss of goodwill, +work stoppage, computer failure or malfunction, or any and all +other commercial damages or losses), even if such Contributor +has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing +the Work or Derivative Works thereof, You may choose to offer, +and charge a fee for, acceptance of support, warranty, indemnity, +or other liability obligations and/or rights consistent with this +License. However, in accepting such obligations, You may act only +on Your own behalf and on Your sole responsibility, not on behalf +of any other Contributor, and only if You agree to indemnify, +defend, and hold each Contributor harmless for any liability +incurred by, or claims asserted against, such Contributor by reason +of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +Copyright 2020-2021 IBM Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. \ No newline at end of file diff --git a/filter-plugin/logstash-filter-dsql-guardium/README.md b/filter-plugin/logstash-filter-dsql-guardium/README.md new file mode 100644 index 000000000..e41f0ffe4 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/README.md @@ -0,0 +1,236 @@ +## DSQL-Guardium Logstash filter plug-in + +### Meet DSQL +* Tested versions: 1.x +* Environment: AWS +* Supported inputs: SQS (pull) +* Supported Guardium versions: + * Guardium Data Protection: 11.4 and above + * Guardium Insights SaaS: 1.0 + +This is a [Logstash](https://github.com/elastic/logstash) filter plug-in for the universal connector that is featured in IBM Security Guardium. It parses events and messages from the AWS DSQL Database Activity Streams into a [Guardium record](https://github.com/IBM/universal-connectors/blob/main/common/src/main/java/com/ibm/guardium/universalconnector/commons/structures/Record.java) instance (which is a standard structure made out of several parts). The information is then sent over to Guardium. Guardium records include the accessor (the person who tried to access the data), the session, data, and exceptions. If there are no errors, the data contains details about the query "construct". The construct details the main action (verb) and collections (objects) involved. + +The plug-in is free and open-source (Apache 2.0). It can be used as a starting point to develop additional filter plug-ins for the Guardium universal connector. + +## 1. Overview + +AWS DSQL (Database SQL) is a distributed SQL database service that provides PostgreSQL-compatible database capabilities. This plugin processes audit logs from DSQL Database Activity Streams, which are delivered via Amazon SQS (Simple Queue Service). + +### Key Features +- Parses DSQL Database Activity Stream events in PostgreSQL format +- Supports both successful queries and error/exception events +- Captures detailed session information including client IP, port, and user details +- Handles authentication failures and SQL errors +- Compatible with AWS SQS input plugin + +## 2. Configuring DSQL Database Activity Streams + +### 2.1 Enable Database Activity Streams + +1. Navigate to the AWS RDS console +2. Select your DSQL cluster +3. Enable Database Activity Streams: + - Choose **Actions** → **Start activity stream** + - Select **Asynchronous** mode (recommended for production) + - Choose or create a KMS key for encryption + - Note the Kinesis Data Stream name created + +### 2.2 Configure SQS Queue + +1. Create an SQS queue to receive the activity stream events: + ```bash + aws sqs create-queue --queue-name dsql-activity-stream-queue + ``` + +2. Configure the Kinesis Data Stream to send events to SQS: + - Create a Lambda function to forward Kinesis events to SQS + - Or use Kinesis Data Firehose to deliver to SQS + +3. Set appropriate IAM permissions for the SQS queue + +### 2.3 Required IAM Permissions + +The AWS credentials used by Logstash must have the following permissions: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "sqs:ReceiveMessage", + "sqs:DeleteMessage", + "sqs:GetQueueAttributes" + ], + "Resource": "arn:aws:sqs:REGION:ACCOUNT_ID:dsql-activity-stream-queue" + } + ] +} +``` + +## 3. Configuring the DSQL Filter in Guardium + +### 3.1 Logstash Configuration + +The Guardium universal connector is the Logstash runtime environment that comes with a Logstash service. The DSQL filter plug-in is a configuration file that defines the settings for the Logstash service. + +#### Procedure + +1. On the collector, go to **Setup** → **Tools and Views** → **Configure Universal Connector**. + +2. Enable the universal connector if it is disabled. + +3. Click **Upload File** and select the offline [dsql-offline-plugins-7.5.2.zip](./DSQLOverSQSPackage/DSQL/dsql-offline-plugins-7.5.2.zip) plug-in. After it is uploaded, click **OK**. + +4. Click the Plus sign to open the Connector Configuration dialog box. + +5. Type a name in the **Connector name** field. + +6. Update the input section to add details from your SQS queue: + +``` +input { + sqs { + access_key_id => "" + secret_access_key => "" + region => "" + queue => "" + codec => "json" + type => "dsql" + add_field => {"account_id" => ""} + add_field => {"instance_name" => ""} + } +} +``` + +7. The filter section is provided in the [filter.conf](./DSQLOverSQSPackage/DSQL/filter.conf) file. Include this section in your configuration. + +8. The output section should point to your Guardium collector: + +``` +output { + if [GuardRecord] { + guardium_connector { + guardium_ip => "" + guardium_port => + } + } +} +``` + +## 4. DSQL Audit Log Format + +The DSQL Database Activity Streams provide audit logs in JSON format with the following key fields: + +### 4.1 Successful Query Event +```json +{ + "type": "record", + "databaseName": "mydb", + "dbUserName": "postgres", + "remoteHost": "10.0.1.100", + "remotePort": "54321", + "sessionId": "session-12345", + "statementText": "SELECT * FROM users WHERE id = 1;", + "commandTag": "SELECT", + "exitCode": "0", + "logTime": "2023-11-10T10:15:30.123Z", + "clientApplication": "psql", + "statementId": "stmt-001" +} +``` + +### 4.2 Error Event +```json +{ + "type": "record", + "databaseName": "mydb", + "dbUserName": "postgres", + "remoteHost": "10.0.1.100", + "remotePort": "54321", + "sessionId": "session-12345", + "statementText": "SELECT * FROM nonexistent_table;", + "exitCode": "1", + "errorMessage": "relation \"nonexistent_table\" does not exist", + "logTime": "2023-11-10T10:15:30.123Z" +} +``` + +### 4.3 Authentication Failure Event +```json +{ + "type": "record", + "databaseName": "mydb", + "dbUserName": "baduser", + "remoteHost": "10.0.1.100", + "remotePort": "54321", + "exitCode": "1", + "errorMessage": "password authentication failed for user \"baduser\"", + "logTime": "2023-11-10T10:15:30.123Z" +} +``` + +## 5. Supported Fields + +The plugin extracts and maps the following fields from DSQL audit logs: + +| DSQL Field | Guardium Field | Description | +|------------|----------------|-------------| +| databaseName | dbName | Database name | +| dbUserName | accessor.dbUser | Database user | +| remoteHost | sessionLocator.clientIp | Client IP address | +| remotePort | sessionLocator.clientPort | Client port | +| sessionId | sessionId | Session identifier | +| statementText | data.originalSqlCommand | SQL statement | +| exitCode | - | Exit code (0=success, non-zero=error) | +| errorMessage | exception.description | Error description | +| logTime | time.timestamp | Event timestamp | +| clientApplication | accessor.sourceProgram | Client application name | + +## 6. Limitations + +- The DSQL plug-in does not support IPv6 +- Client hostname and OS user fields are not available in DSQL audit logs and are set as empty +- The plugin uses PostgreSQL as the server type and protocol since DSQL is PostgreSQL-compatible + +## 7. Troubleshooting + +### 7.1 No Events Received + +**Problem**: Logstash is not receiving events from SQS. + +**Solution**: +- Verify SQS queue name and region are correct +- Check AWS credentials have proper permissions +- Ensure Database Activity Streams are enabled on DSQL cluster +- Verify the Kinesis-to-SQS forwarding is configured correctly + +### 7.2 Parsing Errors + +**Problem**: Events are received but not parsed correctly. + +**Solution**: +- Check the Logstash logs for parsing errors +- Verify the JSON format matches the expected DSQL audit log format +- Ensure the `codec => "json"` is set in the SQS input configuration + +### 7.3 Missing Fields + +**Problem**: Some fields are not appearing in Guardium reports. + +**Solution**: +- Verify all required fields are present in the DSQL audit logs +- Check that the filter configuration includes all field mappings +- Review the Constants.java file for field name mappings + +## 8. Additional Resources + +- [AWS DSQL Documentation](https://docs.aws.amazon.com/dsql/) +- [AWS RDS Database Activity Streams](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/DBActivityStreams.html) +- [Guardium Universal Connector Documentation](https://github.com/IBM/universal-connectors) +- [Logstash SQS Input Plugin](https://www.elastic.co/guide/en/logstash/current/plugins-inputs-sqs.html) + +## 9. License + +This plugin is licensed under the Apache License 2.0. See the [LICENSE](./LICENSE) file for details. \ No newline at end of file diff --git a/filter-plugin/logstash-filter-dsql-guardium/VERSION b/filter-plugin/logstash-filter-dsql-guardium/VERSION new file mode 100644 index 000000000..afaf360d3 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/VERSION @@ -0,0 +1 @@ +1.0.0 \ No newline at end of file diff --git a/filter-plugin/logstash-filter-dsql-guardium/build.gradle b/filter-plugin/logstash-filter-dsql-guardium/build.gradle new file mode 100644 index 000000000..63d51ddf3 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/build.gradle @@ -0,0 +1,193 @@ +import java.nio.file.Files +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING + +apply plugin: 'java' + +buildscript { + repositories { + maven { + url "https://plugins.gradle.org/m2/" + } + mavenCentral() + jcenter() + } + + dependencies { + classpath 'com.github.johnrengelman.shadow:com.github.johnrengelman.shadow.gradle.plugin:8.1.1' + classpath "org.barfuin.gradle.jacocolog:gradle-jacoco-log:4.0.1" + classpath group: 'org.yaml', name: 'snakeyaml', version: '2.2' + } +} + +def universalConnectorsDir=project.projectDir.parentFile?.parentFile.toString(); +def versions = new org.yaml.snakeyaml.Yaml().load( new File("${universalConnectorsDir}/versions.yml").newInputStream() ) +gradle.ext.versions = new org.yaml.snakeyaml.Yaml().load( new File(LOGSTASH_CORE_PATH + "/../versions.yml").newInputStream() ) + +apply from: LOGSTASH_CORE_PATH + "/../rubyUtils.gradle" + +apply plugin: 'jacoco' +apply plugin: 'com.github.johnrengelman.shadow' + +// =========================================================================== +group = 'com.ibm.guardium.dsql' +version = file("VERSION").text.trim() +description = "DSQL-Guardium filter plugin" + +// =========================================================================== +pluginInfo.licenses = ['Apache-2.0'] +pluginInfo.longDescription = "This gem is a Logstash DSQL filter plugin required to be installed as part of IBM Security Guardium, Guardium Universal connector configuration. This gem is not a stand-alone program." +pluginInfo.authors = ['IBM'] +pluginInfo.email = [''] +pluginInfo.homepage = "https://github.com/IBM/universal-connectors" +pluginInfo.pluginType = "filter" +pluginInfo.pluginClass = "DSQLGuardiumPluginFilter" +pluginInfo.pluginName = "dsql_guardium_plugin_filter" + +sourceCompatibility = JavaVersion.VERSION_17 +targetCompatibility = JavaVersion.VERSION_17 + +def jacocoVersion = '0.8.4' +def minimumCoverageStr = System.getenv("MINIMUM_COVERAGE") ?: "50.0%" +if (minimumCoverageStr.endsWith("%")) { + minimumCoverageStr = minimumCoverageStr[0..-2] +} +def minimumCoverage = Float.valueOf(minimumCoverageStr) / 100 + +buildscript { + repositories { + maven { url "https://plugins.gradle.org/m2/" } + mavenCentral() + jcenter() + } + dependencies { + classpath 'com.github.johnrengelman.shadow:com.github.johnrengelman.shadow.gradle.plugin:8.1.1' + classpath "org.barfuin.gradle.jacocolog:gradle-jacoco-log:4.0.1" + } +} + +repositories { + mavenCentral() +} + +dependencies { + implementation 'commons-validator:commons-validator:1.7' + implementation 'org.apache.logging.log4j:log4j-core:2.17.1' + implementation 'org.apache.commons:commons-lang3:3.7' + implementation 'com.google.code.gson:gson:2.8.9' + implementation 'commons-beanutils:commons-beanutils:1.11.0' + implementation fileTree(dir: LOGSTASH_CORE_PATH, include: "build/libs/logstash-core.jar") + implementation fileTree(dir: GUARDIUM_UNIVERSALCONNECTOR_COMMONS_PATH, include: "common-*.*.*.jar") + + // ✅ JUnit 4 only + testImplementation 'junit:junit:4.13.2' + testImplementation 'org.mockito:mockito-core:5.17.0' + testImplementation 'org.jruby:jruby-complete:9.2.7.0' + testImplementation fileTree(dir: GUARDIUM_UNIVERSALCONNECTOR_COMMONS_PATH, include: "common-*.*.*.jar") +} + +test { + useJUnit() // ✅ Make sure JUnit 4 is used explicitly + testLogging { + events "passed", "skipped", "failed" + } +} + +tasks.register("vendor") { + dependsOn shadowJar + doLast { + String vendorPathPrefix = "vendor/jar-dependencies" + String projectGroupPath = project.group.replaceAll('\\.', '/') + File projectJarFile = file("${vendorPathPrefix}/${projectGroupPath}/${pluginInfo.pluginFullName()}/${project.version}/${pluginInfo.pluginFullName()}-${project.version}.jar") + projectJarFile.mkdirs() + Files.copy(file("$buildDir/libs/${project.name}-${project.version}.jar").toPath(), projectJarFile.toPath(), REPLACE_EXISTING) + validatePluginJar(projectJarFile, project.group) + } +} + +shadowJar { + archiveClassifier = null +} + +clean { + delete "${projectDir}/Gemfile" + delete "${projectDir}/${pluginInfo.pluginFullName()}.gemspec" + delete "${projectDir}/lib/" + delete "${projectDir}/vendor/" + new FileNameFinder().getFileNames(projectDir.toString(), "${pluginInfo.pluginFullName()}-*.*.*.gem").each { + delete it + } +} + +tasks.withType(JavaCompile) { + options.encoding = 'UTF-8' +} + +tasks.register("generateRubySupportFiles") { + doLast { + generateRubySupportFilesForPlugin(project.description, project.group, version) + } +} + +tasks.register("removeObsoleteJars") { + doLast { + new FileNameFinder().getFileNames( + projectDir.toString(), + "vendor/**/${pluginInfo.pluginFullName()}*.jar", + "vendor/**/${pluginInfo.pluginFullName()}-${version}.jar" + ).each { delete it } + } +} + +tasks.register("gem") { + dependsOn = [downloadAndInstallJRuby, removeObsoleteJars, vendor, generateRubySupportFiles] + doLast { + buildGem(projectDir, buildDir, "${pluginInfo.pluginFullName()}.gemspec") + } +} + +// ✅ JaCoCo Setup +jacoco { + toolVersion = jacocoVersion +} + +jacocoTestReport { + reports { + html.required = true + xml.required = true + csv.required = true + html.destination file("${buildDir}/reports/jacoco") + csv.destination file("${buildDir}/reports/jacoco/all.csv") + } + executionData.from fileTree(dir: "${buildDir}/jacoco/", includes: ['**/*.exec']) + + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it, exclude: []) + })) + } + + doLast { + println "Report -> file://${buildDir}/reports/jacoco/index.html" + } +} + +jacocoTestCoverageVerification { + violationRules { + rule { + limit { + minimum = minimumCoverage + } + } + } + + executionData.from fileTree(dir: "${buildDir}/jacoco/", includes: ['**/*.exec']) + + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it, exclude: []) + })) + } +} + +test.finalizedBy jacocoTestReport +check.dependsOn jacocoTestCoverageVerification, jacocoTestReport \ No newline at end of file diff --git a/filter-plugin/logstash-filter-dsql-guardium/gradle/wrapper/gradle-wrapper.jar b/filter-plugin/logstash-filter-dsql-guardium/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 000000000..7454180f2 Binary files /dev/null and b/filter-plugin/logstash-filter-dsql-guardium/gradle/wrapper/gradle-wrapper.jar differ diff --git a/filter-plugin/logstash-filter-dsql-guardium/gradle/wrapper/gradle-wrapper.properties b/filter-plugin/logstash-filter-dsql-guardium/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 000000000..81aa1c044 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,5 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/filter-plugin/logstash-filter-dsql-guardium/gradlew b/filter-plugin/logstash-filter-dsql-guardium/gradlew new file mode 100755 index 000000000..cccdd3d51 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/gradlew @@ -0,0 +1,172 @@ +#!/usr/bin/env sh + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn () { + echo "$*" +} + +die () { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin, switch paths to Windows format before running java +if $cygwin ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=$((i+1)) + done + case $i in + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" "$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=$(save "$@") + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then + cd "$(dirname "$0")" +fi + +exec "$JAVACMD" "$@" diff --git a/filter-plugin/logstash-filter-dsql-guardium/gradlew.bat b/filter-plugin/logstash-filter-dsql-guardium/gradlew.bat new file mode 100755 index 000000000..f9553162f --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/gradlew.bat @@ -0,0 +1,84 @@ +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/Constants.java b/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/Constants.java new file mode 100644 index 000000000..3339484fb --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/Constants.java @@ -0,0 +1,97 @@ +/* +#Copyright 2020-2021 IBM Inc. All rights reserved +#SPDX-License-Identifier: Apache-2.0 +#*/ +package com.ibm.guardium.dsql; + +public interface Constants { + + public static final String TYPE = "type"; + + public static final String CLASS = "class"; + + public static final String CLIENT_APPLICATION = "clientApplication"; + + public static final String DATABASE_NAME = "databaseName"; + + public static final String DB_PROTOCOL = "dbProtocol"; + + public static final String DB_USER_NAME = "dbUserName"; + + public static final String ERROR_MESSAGE = "errorMessage"; + + public static final String EXIT_CODE = "exitCode"; + + public static final String OBJECT_TYPE = "objectType"; + + public static final String OBJECT_NAME = "objectName"; + + public static final String PID = "pid"; + + public static final String REMOTE_HOST = "remoteHost"; + + public static final String REMOTE_PORT = "remotePort"; + + public static final String SERVER_HOST = "serverHost"; + + public static final String SERVER_VERSION = "serverVersion"; + + public static final String SESSION_ID = "sessionId"; + + public static final String START_TIME = "startTime"; + + public static final String STATEMENT_ID = "statementId"; + + public static final String STATEMENT_TEXT = "statementText"; + + public static final String SUBSTATEMENT_ID = "substatementId"; + + public static final String COMMAND_TAG = "commandTag"; + + public static final String PARAMETER_LIST = "parameterList"; + + public static final String LOG_TIME = "logTime"; + + public static final String ENGINE_NATIVE_AUDIT_FIELDS = "engineNativeAuditFields"; + + public static final String DEFAULT_IP = "0.0.0.0"; + + public static final int DEFAULT_PORT = -1; + + public static final String UNKNOWN_STRING = ""; + + public static final String SERVER_TYPE_STRING = "POSTGRESQL"; + + public static final String DATA_PROTOCOL_STRING = "POSTGRESQL"; + + public static final String LANGUAGE = "PGRS"; + + public static final String SQL_ERROR = "SQL_ERROR"; + + public static final String LOGIN_FAILED = "LOGIN_FAILED"; + + public static final String COMM_PROTOCOL = "AWSApiCall"; + + public static final String APP_USER_NAME = "AWSService"; + + public static final String NA = "N.A."; + + public static final String ACCOUNT_ID = "account_id"; + + public static final String INSTANCE_NAME = "instance_name"; + + public static final String SERVER_HOST_NAME = "server_hostname"; + + public static final String MESSAGE = "message"; + + public static final String RECORDS = "Records"; + + public static final String BODY = "body"; + + public static final String LOG_EVENTS = "logEvents"; + + public static final String EXIT_CODE_SUCCESS = "0"; + +} + +// Made with Bob diff --git a/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/DSQLGuardiumPluginFilter.java b/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/DSQLGuardiumPluginFilter.java new file mode 100644 index 000000000..a0bd4154b --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/DSQLGuardiumPluginFilter.java @@ -0,0 +1,113 @@ +/* +#Copyright 2020-2021 IBM Inc. All rights reserved +#SPDX-License-Identifier: Apache-2.0 +#*/ +package com.ibm.guardium.dsql; + +import java.io.File; +import java.util.Collection; +import java.util.Collections; + + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LoggerContext; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.ibm.guardium.universalconnector.commons.GuardConstants; +import com.ibm.guardium.universalconnector.commons.structures.Record; + +import co.elastic.logstash.api.Configuration; +import co.elastic.logstash.api.Context; +import co.elastic.logstash.api.Event; +import co.elastic.logstash.api.Filter; +import co.elastic.logstash.api.FilterMatchListener; +import co.elastic.logstash.api.LogstashPlugin; +import co.elastic.logstash.api.PluginConfigSpec; + +//class name must match plugin name +@LogstashPlugin(name = "dsql_guardium_plugin_filter") + +public class DSQLGuardiumPluginFilter implements Filter { + + public static final String LOG42_CONF = "log4j2uc.properties"; + + static { + try { + String uc_etc = System.getenv("UC_ETC"); + + LoggerContext context = (LoggerContext) LogManager.getContext(false); + + File file = new File(uc_etc + File.separator + LOG42_CONF); + + context.setConfigLocation(file.toURI()); + + } catch (Exception e) { + + System.err.println("Failed to load log4j configuration " + e.getMessage()); + + e.printStackTrace(); + } + } + + private String id; + + public static final PluginConfigSpec SOURCE_CONFIG = PluginConfigSpec.stringSetting("source", "message"); + + private static Logger log = LogManager.getLogger(DSQLGuardiumPluginFilter.class); + + public DSQLGuardiumPluginFilter(String id, Configuration config, Context context) { + + // constructors should validate configuration options + + this.id = id; + } + + @Override + + public Collection filter(Collection events, FilterMatchListener matchListener) { + + for (Event e : events) { + if(null != e && null != e.getData()){ + try { + log.debug("Event Now: {} " + e.getData()); + + Record record = Parser.parseRecord(e); + + final GsonBuilder builder = new GsonBuilder(); + + builder.serializeNulls(); + + final Gson gson = builder.create(); + + e.setField(GuardConstants.GUARDIUM_RECORD_FIELD_NAME, gson.toJson(record)); + + matchListener.filterMatched(e); + + } catch (Exception exception) { + log.error("Failed to parse DSQL event: {}", exception.getMessage(), exception); + log.warn("Failed event data: {}" , e.getData()); + } + } + + } + return events; + } + + @Override + + public Collection> configSchema() { + + // should return a list of all configuration options for this plugin + + return Collections.singletonList(SOURCE_CONFIG); + } + + @Override + public String getId() { + return this.id; + } +} + +// Made with Bob diff --git a/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/Parser.java b/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/Parser.java new file mode 100644 index 000000000..d13e2abf2 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/Parser.java @@ -0,0 +1,324 @@ +/* +#Copyright 2020-2021 IBM Inc. All rights reserved +#SPDX-License-Identifier: Apache-2.0 +#*/ +package com.ibm.guardium.dsql; + +import co.elastic.logstash.api.Event; +import com.ibm.guardium.universalconnector.commons.structures.Accessor; +import com.ibm.guardium.universalconnector.commons.structures.Data; +import com.ibm.guardium.universalconnector.commons.structures.ExceptionRecord; +import com.ibm.guardium.universalconnector.commons.structures.Record; +import com.ibm.guardium.universalconnector.commons.structures.SessionLocator; +import com.ibm.guardium.universalconnector.commons.structures.Time; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.text.ParseException; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class Parser { + + private static Logger log = LogManager.getLogger(Parser.class); + + /** + * Parse the DSQL Database Activity Stream event into a Guardium Record + * The event is expected to have already been parsed by Logstash's json filter + * + * @param event The Logstash event containing parsed DSQL audit log data + * @return Record object containing parsed audit information + * @throws ParseException if parsing fails + */ + public static Record parseRecord(final Event event) throws ParseException { + Record record = new Record(); + + try { + // The message should already be parsed as a Map by Logstash's json filter + // Access the parsed fields directly from the event + Map eventData = event.getData(); + + if (eventData == null || eventData.isEmpty()) { + throw new ParseException("Event data is null or empty", 0); + } + + // Set basic record fields + record.setAppUserName(Constants.APP_USER_NAME); + record.setAccessor(parseAccessor(event)); + record.setSessionLocator(parseSessionLocator(event)); + record.setDbName(parseDbName(event)); + record.setTime(parseTimestamp(event)); + + // Set session ID + Object sessionIdObj = event.getField(Constants.SESSION_ID); + if (sessionIdObj != null) { + record.setSessionId(sessionIdObj.toString()); + } else { + record.setSessionId(Constants.UNKNOWN_STRING); + } + + // Parse SQL statement or exception based on exit code + Object exitCodeObj = event.getField(Constants.EXIT_CODE); + String exitCode = exitCodeObj != null ? exitCodeObj.toString() : null; + + if (exitCode != null && !exitCode.equals(Constants.EXIT_CODE_SUCCESS)) { + // This is an error/exception + record.setException(parseException(event)); + } else { + // This is a successful SQL statement + Object statementText = event.getField(Constants.STATEMENT_TEXT); + if (statementText != null && !statementText.toString().isEmpty()) { + record.setData(parseData(event)); + } + } + + } catch (Exception e) { + log.error("Error parsing DSQL record: {}", e.getMessage(), e); + throw new ParseException("Failed to parse DSQL record: " + e.getMessage(), 0); + } + + return record; + } + + /** + * Parse accessor information from the audit event + */ + private static Accessor parseAccessor(Event event) { + Accessor accessor = new Accessor(); + + // Set database user + Object dbUserObj = event.getField(Constants.DB_USER_NAME); + if (dbUserObj != null && !dbUserObj.toString().isEmpty()) { + accessor.setDbUser(dbUserObj.toString()); + } else { + accessor.setDbUser(Constants.UNKNOWN_STRING); + } + + // Set server type and protocol + accessor.setServerType(Constants.SERVER_TYPE_STRING); + accessor.setDbProtocol(Constants.DATA_PROTOCOL_STRING); + accessor.setLanguage(Constants.LANGUAGE); + accessor.setCommProtocol(Constants.COMM_PROTOCOL); + + // Set server hostname + String accountId = getAccountId(event); + String instanceName = getInstanceName(event); + String serverHostName = accountId + ":" + instanceName; + accessor.setServerHostName(serverHostName); + + // Set service name (same as dbName) + String dbName = parseDbName(event); + accessor.setServiceName(dbName); + + // Set client application if available + Object clientAppObj = event.getField(Constants.CLIENT_APPLICATION); + if (clientAppObj != null && !clientAppObj.toString().isEmpty()) { + accessor.setSourceProgram(clientAppObj.toString()); + } else { + accessor.setSourceProgram(Constants.UNKNOWN_STRING); + } + + // Set other fields + accessor.setServerOs(Constants.UNKNOWN_STRING); + accessor.setClientOs(Constants.UNKNOWN_STRING); + accessor.setClientHostName(Constants.UNKNOWN_STRING); + accessor.setDbProtocolVersion(Constants.UNKNOWN_STRING); + accessor.setOsUser(Constants.UNKNOWN_STRING); + accessor.setClientMac(Constants.UNKNOWN_STRING); + accessor.setServerDescription(Constants.UNKNOWN_STRING); + accessor.setDataType(Accessor.DATA_TYPE_GUARDIUM_SHOULD_PARSE_SQL); + + return accessor; + } + + /** + * Parse session locator information + */ + private static SessionLocator parseSessionLocator(Event event) { + SessionLocator sessionLocator = new SessionLocator(); + + // Parse client IP and port + String clientIp = Constants.DEFAULT_IP; + int clientPort = Constants.DEFAULT_PORT; + + Object remoteHostObj = event.getField(Constants.REMOTE_HOST); + if (remoteHostObj != null && !remoteHostObj.toString().isEmpty()) { + clientIp = remoteHostObj.toString(); + } + + Object remotePortObj = event.getField(Constants.REMOTE_PORT); + if (remotePortObj != null) { + try { + clientPort = Integer.parseInt(remotePortObj.toString()); + } catch (NumberFormatException e) { + log.warn("Failed to parse remote port: {}", remotePortObj); + } + } + + sessionLocator.setClientIp(clientIp); + sessionLocator.setClientPort(clientPort); + + // Set server IP (using account:instance:dbname format) + String accountId = getAccountId(event); + String instanceName = getInstanceName(event); + + Object dbNameObj = event.getField(Constants.DATABASE_NAME); + String dbName = dbNameObj != null ? dbNameObj.toString() : Constants.UNKNOWN_STRING; + + String serverIp = accountId + ":" + instanceName + ":" + dbName; + sessionLocator.setServerIp(serverIp); + sessionLocator.setServerPort(Constants.DEFAULT_PORT); + + // Set IPv6 fields + sessionLocator.setIpv6(false); + sessionLocator.setClientIpv6(Constants.UNKNOWN_STRING); + sessionLocator.setServerIpv6(Constants.UNKNOWN_STRING); + + return sessionLocator; + } + + /** + * Parse database name + */ + private static String parseDbName(Event event) { + String accountId = getAccountId(event); + String instanceName = getInstanceName(event); + String dbName = Constants.UNKNOWN_STRING; + + Object dbNameObj = event.getField(Constants.DATABASE_NAME); + if (dbNameObj != null && !dbNameObj.toString().isEmpty()) { + dbName = dbNameObj.toString(); + } + + if (dbName.isEmpty() || dbName.equals(Constants.UNKNOWN_STRING)) { + return Constants.NA; + } + + return accountId + ":" + instanceName + ":" + dbName; + } + + /** + * Parse timestamp from the audit event + */ + private static Time parseTimestamp(Event event) { + long millis = 0; + + try { + // Try logTime first, then startTime + Object timeField = event.getField(Constants.LOG_TIME); + + if (timeField == null) { + timeField = event.getField(Constants.START_TIME); + } + + if (timeField != null) { + String dateString = timeField.toString(); + + // Parse ISO 8601 format timestamp + ZonedDateTime parsedTime = ZonedDateTime.parse(dateString, DateTimeFormatter.ISO_DATE_TIME); + millis = parsedTime.toInstant().toEpochMilli(); + } + } catch (Exception e) { + log.error("Failed to parse timestamp: {}", e.getMessage(), e); + } + + return new Time(millis, 0, 0); + } + + /** + * Parse SQL data from successful statements + */ + private static Data parseData(Event event) { + Data data = new Data(); + + Object statementTextObj = event.getField(Constants.STATEMENT_TEXT); + if (statementTextObj != null && !statementTextObj.toString().isEmpty()) { + String sqlText = statementTextObj.toString(); + data.setOriginalSqlCommand(sqlText); + } else { + data.setOriginalSqlCommand(Constants.UNKNOWN_STRING); + } + + return data; + } + + /** + * Parse exception information from failed statements + */ + private static ExceptionRecord parseException(Event event) { + ExceptionRecord exception = new ExceptionRecord(); + + // Get error message + Object errorMessageObj = event.getField(Constants.ERROR_MESSAGE); + String errorMessage = errorMessageObj != null ? + errorMessageObj.toString() : Constants.UNKNOWN_STRING; + + // Determine exception type based on error + if (errorMessage.toLowerCase().contains("authentication") || + errorMessage.toLowerCase().contains("login") || + errorMessage.toLowerCase().contains("password")) { + exception.setExceptionTypeId(Constants.LOGIN_FAILED); + } else { + exception.setExceptionTypeId(Constants.SQL_ERROR); + } + + exception.setDescription(errorMessage); + + // Set SQL string if available + Object statementTextObj = event.getField(Constants.STATEMENT_TEXT); + if (statementTextObj != null && !statementTextObj.toString().isEmpty()) { + exception.setSqlString(statementTextObj.toString()); + } else { + exception.setSqlString(Constants.UNKNOWN_STRING); + } + + return exception; + } + + /** + * Get account ID from event + */ + private static String getAccountId(Event event) { + String accountId = Constants.UNKNOWN_STRING; + + Object accountIdObj = event.getField(Constants.ACCOUNT_ID); + if (accountIdObj instanceof String) { + accountId = accountIdObj.toString(); + } else if (accountIdObj instanceof List) { + List rawList = (List) accountIdObj; + List arrayList = new ArrayList<>(rawList); + + if (!arrayList.isEmpty()) { + accountId = String.valueOf(arrayList.get(0)); + } + } + + return accountId; + } + + /** + * Get instance name from event + */ + private static String getInstanceName(Event event) { + String instanceName = Constants.UNKNOWN_STRING; + + Object instanceNameObj = event.getField(Constants.INSTANCE_NAME); + if (instanceNameObj instanceof String) { + instanceName = instanceNameObj.toString(); + } else if (instanceNameObj instanceof List) { + List rawList = (List) instanceNameObj; + List arrayList = new ArrayList<>(rawList); + + if (!arrayList.isEmpty()) { + instanceName = String.valueOf(arrayList.get(0)); + } + } + + return instanceName; + } +} + +// Made with Bob diff --git a/filter-plugin/logstash-filter-dsql-guardium/src/test/java/com/ibm/guardium/dsql/DSQLGuardiumPluginFilterTest.java b/filter-plugin/logstash-filter-dsql-guardium/src/test/java/com/ibm/guardium/dsql/DSQLGuardiumPluginFilterTest.java new file mode 100644 index 000000000..6d85e39d8 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/src/test/java/com/ibm/guardium/dsql/DSQLGuardiumPluginFilterTest.java @@ -0,0 +1,221 @@ +package com.ibm.guardium.dsql; + +import co.elastic.logstash.api.Event; +import com.ibm.guardium.universalconnector.commons.structures.Record; +import org.junit.Test; + +import java.text.ParseException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +public class DSQLGuardiumPluginFilterTest { + + @Test + public void testParseRecordWithSuccessfulStatement() throws ParseException { + Event mockEvent = mock(Event.class); + + // Mock event data with DSQL audit log fields + Map eventData = new HashMap<>(); + eventData.put(Constants.TYPE, "record"); + eventData.put(Constants.DATABASE_NAME, "testdb"); + eventData.put(Constants.DB_USER_NAME, "testuser"); + eventData.put(Constants.REMOTE_HOST, "10.0.0.1"); + eventData.put(Constants.REMOTE_PORT, "5432"); + eventData.put(Constants.SESSION_ID, "session123"); + eventData.put(Constants.STATEMENT_TEXT, "SELECT * FROM users WHERE id = 1;"); + eventData.put(Constants.EXIT_CODE, "0"); + eventData.put(Constants.LOG_TIME, "2023-11-10T10:15:30Z"); + eventData.put(Constants.CLIENT_APPLICATION, "psql"); + + when(mockEvent.getData()).thenReturn(eventData); + when(mockEvent.getField(Constants.DATABASE_NAME)).thenReturn("testdb"); + when(mockEvent.getField(Constants.DB_USER_NAME)).thenReturn("testuser"); + when(mockEvent.getField(Constants.REMOTE_HOST)).thenReturn("10.0.0.1"); + when(mockEvent.getField(Constants.REMOTE_PORT)).thenReturn("5432"); + when(mockEvent.getField(Constants.SESSION_ID)).thenReturn("session123"); + when(mockEvent.getField(Constants.STATEMENT_TEXT)).thenReturn("SELECT * FROM users WHERE id = 1;"); + when(mockEvent.getField(Constants.EXIT_CODE)).thenReturn("0"); + when(mockEvent.getField(Constants.LOG_TIME)).thenReturn("2023-11-10T10:15:30Z"); + when(mockEvent.getField(Constants.CLIENT_APPLICATION)).thenReturn("psql"); + when(mockEvent.getField(Constants.ACCOUNT_ID)).thenReturn("123456789012"); + when(mockEvent.getField(Constants.INSTANCE_NAME)).thenReturn("dsql-cluster-1"); + + Record record = Parser.parseRecord(mockEvent); + + assertNotNull(record); + assertNotNull(record.getData()); + assertEquals("SELECT * FROM users WHERE id = 1;", record.getData().getOriginalSqlCommand()); + assertEquals("testuser", record.getAccessor().getDbUser()); + assertEquals("10.0.0.1", record.getSessionLocator().getClientIp()); + assertEquals(5432, record.getSessionLocator().getClientPort()); + assertEquals("session123", record.getSessionId()); + assertEquals(Constants.SERVER_TYPE_STRING, record.getAccessor().getServerType()); + assertEquals(Constants.DATA_PROTOCOL_STRING, record.getAccessor().getDbProtocol()); + assertEquals(Constants.LANGUAGE, record.getAccessor().getLanguage()); + assertEquals("psql", record.getAccessor().getSourceProgram()); + } + + @Test + public void testParseRecordWithError() throws ParseException { + Event mockEvent = mock(Event.class); + + Map eventData = new HashMap<>(); + eventData.put(Constants.TYPE, "record"); + eventData.put(Constants.DATABASE_NAME, "testdb"); + eventData.put(Constants.DB_USER_NAME, "testuser"); + eventData.put(Constants.REMOTE_HOST, "10.0.0.1"); + eventData.put(Constants.REMOTE_PORT, "5432"); + eventData.put(Constants.SESSION_ID, "session123"); + eventData.put(Constants.STATEMENT_TEXT, "SELECT * FROM nonexistent_table;"); + eventData.put(Constants.EXIT_CODE, "1"); + eventData.put(Constants.ERROR_MESSAGE, "relation \"nonexistent_table\" does not exist"); + eventData.put(Constants.LOG_TIME, "2023-11-10T10:15:30Z"); + + when(mockEvent.getData()).thenReturn(eventData); + when(mockEvent.getField(Constants.DATABASE_NAME)).thenReturn("testdb"); + when(mockEvent.getField(Constants.DB_USER_NAME)).thenReturn("testuser"); + when(mockEvent.getField(Constants.REMOTE_HOST)).thenReturn("10.0.0.1"); + when(mockEvent.getField(Constants.REMOTE_PORT)).thenReturn("5432"); + when(mockEvent.getField(Constants.SESSION_ID)).thenReturn("session123"); + when(mockEvent.getField(Constants.STATEMENT_TEXT)).thenReturn("SELECT * FROM nonexistent_table;"); + when(mockEvent.getField(Constants.EXIT_CODE)).thenReturn("1"); + when(mockEvent.getField(Constants.ERROR_MESSAGE)).thenReturn("relation \"nonexistent_table\" does not exist"); + when(mockEvent.getField(Constants.LOG_TIME)).thenReturn("2023-11-10T10:15:30Z"); + when(mockEvent.getField(Constants.ACCOUNT_ID)).thenReturn("123456789012"); + when(mockEvent.getField(Constants.INSTANCE_NAME)).thenReturn("dsql-cluster-1"); + + Record record = Parser.parseRecord(mockEvent); + + assertNotNull(record); + assertNotNull(record.getException()); + assertEquals(Constants.SQL_ERROR, record.getException().getExceptionTypeId()); + assertEquals("relation \"nonexistent_table\" does not exist", record.getException().getDescription()); + assertEquals("SELECT * FROM nonexistent_table;", record.getException().getSqlString()); + } + + @Test + public void testParseRecordWithLoginFailure() throws ParseException { + Event mockEvent = mock(Event.class); + + Map eventData = new HashMap<>(); + eventData.put(Constants.TYPE, "record"); + eventData.put(Constants.DATABASE_NAME, "testdb"); + eventData.put(Constants.DB_USER_NAME, "testuser"); + eventData.put(Constants.REMOTE_HOST, "10.0.0.1"); + eventData.put(Constants.REMOTE_PORT, "5432"); + eventData.put(Constants.SESSION_ID, "session123"); + eventData.put(Constants.EXIT_CODE, "1"); + eventData.put(Constants.ERROR_MESSAGE, "password authentication failed for user \"testuser\""); + eventData.put(Constants.LOG_TIME, "2023-11-10T10:15:30Z"); + + when(mockEvent.getData()).thenReturn(eventData); + when(mockEvent.getField(Constants.DATABASE_NAME)).thenReturn("testdb"); + when(mockEvent.getField(Constants.DB_USER_NAME)).thenReturn("testuser"); + when(mockEvent.getField(Constants.REMOTE_HOST)).thenReturn("10.0.0.1"); + when(mockEvent.getField(Constants.REMOTE_PORT)).thenReturn("5432"); + when(mockEvent.getField(Constants.SESSION_ID)).thenReturn("session123"); + when(mockEvent.getField(Constants.EXIT_CODE)).thenReturn("1"); + when(mockEvent.getField(Constants.ERROR_MESSAGE)).thenReturn("password authentication failed for user \"testuser\""); + when(mockEvent.getField(Constants.LOG_TIME)).thenReturn("2023-11-10T10:15:30Z"); + when(mockEvent.getField(Constants.ACCOUNT_ID)).thenReturn("123456789012"); + when(mockEvent.getField(Constants.INSTANCE_NAME)).thenReturn("dsql-cluster-1"); + + Record record = Parser.parseRecord(mockEvent); + + assertNotNull(record); + assertNotNull(record.getException()); + assertEquals(Constants.LOGIN_FAILED, record.getException().getExceptionTypeId()); + assertTrue(record.getException().getDescription().contains("password authentication failed")); + } + + @Test + public void testParseTimestampValid() throws ParseException { + Event mockEvent = mock(Event.class); + + Map eventData = new HashMap<>(); + eventData.put(Constants.LOG_TIME, "2023-11-10T10:15:30Z"); + eventData.put(Constants.DATABASE_NAME, "testdb"); + eventData.put(Constants.DB_USER_NAME, "testuser"); + eventData.put(Constants.EXIT_CODE, "0"); + eventData.put(Constants.STATEMENT_TEXT, "SELECT 1;"); + + when(mockEvent.getData()).thenReturn(eventData); + when(mockEvent.getField(Constants.LOG_TIME)).thenReturn("2023-11-10T10:15:30Z"); + when(mockEvent.getField(Constants.DATABASE_NAME)).thenReturn("testdb"); + when(mockEvent.getField(Constants.DB_USER_NAME)).thenReturn("testuser"); + when(mockEvent.getField(Constants.EXIT_CODE)).thenReturn("0"); + when(mockEvent.getField(Constants.STATEMENT_TEXT)).thenReturn("SELECT 1;"); + when(mockEvent.getField(Constants.ACCOUNT_ID)).thenReturn("123456789012"); + when(mockEvent.getField(Constants.INSTANCE_NAME)).thenReturn("dsql-cluster-1"); + + Record record = Parser.parseRecord(mockEvent); + + assertNotNull(record.getTime()); + assertTrue(record.getTime().getTimstamp() > 0); + } + + @Test + public void testParseDbName() throws ParseException { + Event mockEvent = mock(Event.class); + + Map eventData = new HashMap<>(); + eventData.put(Constants.DATABASE_NAME, "production_db"); + eventData.put(Constants.DB_USER_NAME, "admin"); + eventData.put(Constants.EXIT_CODE, "0"); + eventData.put(Constants.STATEMENT_TEXT, "SELECT 1;"); + eventData.put(Constants.LOG_TIME, "2023-11-10T10:15:30Z"); + + when(mockEvent.getData()).thenReturn(eventData); + when(mockEvent.getField(Constants.DATABASE_NAME)).thenReturn("production_db"); + when(mockEvent.getField(Constants.DB_USER_NAME)).thenReturn("admin"); + when(mockEvent.getField(Constants.EXIT_CODE)).thenReturn("0"); + when(mockEvent.getField(Constants.STATEMENT_TEXT)).thenReturn("SELECT 1;"); + when(mockEvent.getField(Constants.LOG_TIME)).thenReturn("2023-11-10T10:15:30Z"); + when(mockEvent.getField(Constants.ACCOUNT_ID)).thenReturn("123456789012"); + when(mockEvent.getField(Constants.INSTANCE_NAME)).thenReturn("dsql-cluster-1"); + + Record record = Parser.parseRecord(mockEvent); + + assertNotNull(record.getDbName()); + assertEquals("123456789012:dsql-cluster-1:production_db", record.getDbName()); + assertEquals("123456789012:dsql-cluster-1:production_db", record.getAccessor().getServiceName()); + } + + @Test + public void testParseSessionLocator() throws ParseException { + Event mockEvent = mock(Event.class); + + Map eventData = new HashMap<>(); + eventData.put(Constants.DATABASE_NAME, "testdb"); + eventData.put(Constants.DB_USER_NAME, "testuser"); + eventData.put(Constants.REMOTE_HOST, "192.168.1.100"); + eventData.put(Constants.REMOTE_PORT, "54321"); + eventData.put(Constants.EXIT_CODE, "0"); + eventData.put(Constants.STATEMENT_TEXT, "SELECT 1;"); + eventData.put(Constants.LOG_TIME, "2023-11-10T10:15:30Z"); + + when(mockEvent.getData()).thenReturn(eventData); + when(mockEvent.getField(Constants.DATABASE_NAME)).thenReturn("testdb"); + when(mockEvent.getField(Constants.DB_USER_NAME)).thenReturn("testuser"); + when(mockEvent.getField(Constants.REMOTE_HOST)).thenReturn("192.168.1.100"); + when(mockEvent.getField(Constants.REMOTE_PORT)).thenReturn("54321"); + when(mockEvent.getField(Constants.EXIT_CODE)).thenReturn("0"); + when(mockEvent.getField(Constants.STATEMENT_TEXT)).thenReturn("SELECT 1;"); + when(mockEvent.getField(Constants.LOG_TIME)).thenReturn("2023-11-10T10:15:30Z"); + when(mockEvent.getField(Constants.ACCOUNT_ID)).thenReturn("123456789012"); + when(mockEvent.getField(Constants.INSTANCE_NAME)).thenReturn("dsql-cluster-1"); + + Record record = Parser.parseRecord(mockEvent); + + assertNotNull(record.getSessionLocator()); + assertEquals("192.168.1.100", record.getSessionLocator().getClientIp()); + assertEquals(54321, record.getSessionLocator().getClientPort()); + assertEquals("123456789012:dsql-cluster-1:testdb", record.getSessionLocator().getServerIp()); + assertFalse(record.getSessionLocator().isIpv6()); + } +} + +// Made with Bob diff --git a/filter-plugin/logstash-filter-yugabyte-guardium/YugabyteOverSyslogPackage/YugabyteDB/filter.conf b/filter-plugin/logstash-filter-yugabyte-guardium/YugabyteOverSyslogPackage/YugabyteDB/filter.conf index bc767fd22..2bc3663c7 100644 --- a/filter-plugin/logstash-filter-yugabyte-guardium/YugabyteOverSyslogPackage/YugabyteDB/filter.conf +++ b/filter-plugin/logstash-filter-yugabyte-guardium/YugabyteOverSyslogPackage/YugabyteDB/filter.conf @@ -1,21 +1,21 @@ filter { if [type] == "syslog_yugabyte" { if [message] =~ /LOG/ and [message] !~ /AUDIT/ { - drop { } + drop {} } # Dorpping the loglines contains the queries for pg_catalog schema if [message] =~ /(?i)(from\s+)?pg_catalog|search_path|application_name|current_schema|session_user|TRANSACTION\s+ISOLATION\s+LEVEL|extra_float_digits/ { - drop { } + drop {} } # Dorpping the loglines contains the queries for information_schema schema if [message] =~ /(?i)(from\s+)?information_schema/ { - drop { } + drop {} } if [message] !~ /(?i)(AUDIT:|ERROR:|STATEMENT:|FATAL:|DETAIL:)/ { - drop { } + drop {} } mutate { @@ -47,11 +47,26 @@ filter { # required data by separating the log logline initially by spaces or tabs up to occurrence of the # word "AUDIT", and then later it will look for the commas as a separator. match => { - "message" => "%{IPORHOST:server_hostname},%{IPV4:server_ip},(%{NUMBER:timestamp}[\t\s]+)(%{IP:client_ip})\(%{NUMBER:client_port}\)[\t\s]+\[(%{NUMBER:process_id})\][\t\s]+(%{GREEDYDATA:application_name})[\t\s]+(%{USERNAME:username})[\t\s]+(%{WORD:db_name})[\t\s]+(?%{BASE16NUM}.%{BASE16NUM})[\t\s]+(%{NUMBER:trasaction_id})[\s\t]+(%{WORD}):[ \t]+AUDIT:[\s\t]+(?SESSION|OBJECT),(%{INT:statement_id}),(%{INT:sub_statement_id}),(%{WORD:event_category}),(?[A-Z\sa-z]*),(?([A-Z.a-z\s]*)),(?([^,]*)),(%{GREEDYDATA:query}),(%{GREEDYDATA:parameters})" + "message" => [ + "%{IPORHOST:server_hostname},%{IPV4:server_ip},(%{NUMBER:timestamp}[\t\s]+)(%{IP:client_ip})\(%{NUMBER:client_port}\)[\t\s]+\[(%{NUMBER:process_id})\][\t\s]+(%{GREEDYDATA:application_name})[\t\s]+(%{USERNAME:username})[\t\s]+(%{WORD:db_name})[\t\s]+(?%{BASE16NUM}.%{BASE16NUM})[\t\s]+(%{NUMBER:trasaction_id})[\s\t]+(%{WORD}):[ \t]+AUDIT:[\s\t]+(?SESSION|OBJECT),(%{INT:statement_id}),(%{INT:sub_statement_id}),(%{WORD:event_category}),(?[A-Z\sa-z]*),(?([A-Z.a-z\s]*)),(?([^,]*)),(%{GREEDYDATA:query}),(%{GREEDYDATA:parameters})", + "(%{TIMESTAMP_ISO8601:timestamp_str}[\t\s]+%{WORD:timezone}[\t\s]+)(%{IP:client_ip})\(%{NUMBER:client_port};?\)[\t\s]+\[(%{NUMBER:process_id})\][\t\s]+(%{GREEDYDATA:application_name})[\t\s]+(%{USERNAME:username})[\t\s]+(%{WORD:db_name})[\t\s]+(?%{BASE16NUM}.%{BASE16NUM})[\t\s]+(%{NUMBER:trasaction_id})[\s\t]+(%{WORD}):[ \t]+AUDIT:[\s\t]+(?SESSION|OBJECT),(%{INT:statement_id}),(%{INT:sub_statement_id}),(%{WORD:event_category}),(?[A-Z\sa-z]*),(?([A-Z.a-z\s]*)),(?([^,]*)),(%{GREEDYDATA:query}),(%{GREEDYDATA:parameters})" + ] } tag_on_failure => ["sql_error_event"] } + # Convert timestamp_str to epoch timestamp if it exists + if [timestamp_str] { + date { + match => ["timestamp_str", "yyyy-MM-dd HH:mm:ss.SSS"] + target => "timestamp" + timezone => "%{timezone}" + } + ruby { + code => "event.set('timestamp', event.get('@timestamp').to_f)" + } + } + if "sql_error_event" not in [tags] { mutate { gsub => ["query", '\\"', '"'] } } @@ -68,7 +83,10 @@ filter { # the regex looks for the sequence of the workds "authentication failed for user" grok { match => { - "message" => "FATAL:[\s\t]+(?(.*))[\s\t\\n]+(%{NUMBER:timestamp}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port}\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+DETAIL:[\s\t]+(?.*?\.)" + "message" => [ + "FATAL:[\s\t]+(?(.*))[\s\t\\n]+(%{NUMBER:timestamp}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port}\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+DETAIL:[\s\t]+(?.*?\.)", + "FATAL:[\s\t]+(?(.*))[\s\t\\n]+(%{TIMESTAMP_ISO8601:timestamp_str}[\s\t]+%{WORD:timezone}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port};?\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+DETAIL:[\s\t]+(?.*?\.)" + ] } add_field => { error_description => "%{error_fatal}. %{error_detail}" @@ -76,6 +94,16 @@ filter { } remove_field => ["error_fatal", "error_detail"] } + if [timestamp_str] { + date { + match => ["timestamp_str", "yyyy-MM-dd HH:mm:ss.SSS"] + target => "timestamp" + timezone => "%{timezone}" + } + ruby { + code => "event.set('timestamp', event.get('@timestamp').to_f)" + } + } } else if [message] =~ /(?i)((role|user).*does\s+not\s+exist)/ and [message] !~ /STATEMENT:/ { # Fatal logs for authentication failed issue other then password authentication. For example, Role not found # The statments which are specific to CREATING and DROPPING the roles won't come under this condition. @@ -83,12 +111,25 @@ filter { # https://www.postgresql.org/docs/current/client-authentication-problems.html grok { match => { - "message" => "%{IPORHOST:server_hostname},%{IPV4:server_ip},(%{NUMBER:timestamp}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port}\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(FATAL:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t]*)" + "message" => [ + "%{IPORHOST:server_hostname},%{IPV4:server_ip},(%{NUMBER:timestamp}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port}\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(FATAL:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t]*)", + "(%{TIMESTAMP_ISO8601:timestamp_str}[\s\t]+%{WORD:timezone}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port};?\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(FATAL:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t]*)" + ] } add_field => { event_type => "AUTH_ERROR" } } + if [timestamp_str] { + date { + match => ["timestamp_str", "yyyy-MM-dd HH:mm:ss.SSS"] + target => "timestamp" + timezone => "%{timezone}" + } + ruby { + code => "event.set('timestamp', event.get('@timestamp').to_f)" + } + } } else if [message] =~ /STATEMENT:/ { # Error logs containing the lastly fired query, Example, Table not found error # The regex starts looking from the matching timestamp in the log event. It will extract the @@ -96,23 +137,49 @@ filter { # word "STATEMENT" and then later, the text will be considered as fired query. grok { match => { - "message" => "%{IPORHOST:server_hostname},%{IPV4:server_ip},(%{NUMBER:timestamp}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port}\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(ERROR:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t\\n]+)(%{NUMBER}[\s\t]+)(%{IP})\(%{NUMBER}\)[\s\t]+\[(%{NUMBER})\][\s\t]+(%{GREEDYDATA})[\s\t]+(%{USERNAME})[\s\t]+(%{WORD})[\s\t]+(%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER})[\s\t]+(STATEMENT:)[\t\s]+(%{GREEDYDATA:query})" + "message" => [ + "%{IPORHOST:server_hostname},%{IPV4:server_ip},(%{NUMBER:timestamp}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port}\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(ERROR:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t\\n]+)(%{NUMBER}[\s\t]+)(%{IP})\(%{NUMBER}\)[\s\t]+\[(%{NUMBER})\][\s\t]+(%{GREEDYDATA})[\s\t]+(%{USERNAME})[\s\t]+(%{WORD})[\s\t]+(%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER})[\s\t]+(STATEMENT:)[\t\s]+(%{GREEDYDATA:query})", + "(%{TIMESTAMP_ISO8601:timestamp_str}[\s\t]+%{WORD:timezone}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port};?\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(ERROR:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t\\n]+)(%{TIMESTAMP_ISO8601}[\s\t]+%{WORD}[\s\t]+)(%{IP})\(%{NUMBER};?\)[\s\t]+\[(%{NUMBER})\][\s\t]+(%{GREEDYDATA})[\s\t]+(%{USERNAME})[\s\t]+(%{WORD})[\s\t]+(%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER})[\s\t]+(STATEMENT:)[\t\s]+(%{GREEDYDATA:query})" + ] } add_field => { event_type => "SQL_ERROR" } } + if [timestamp_str] { + date { + match => ["timestamp_str", "yyyy-MM-dd HH:mm:ss.SSS"] + target => "timestamp" + timezone => "%{timezone}" + } + ruby { + code => "event.set('timestamp', event.get('@timestamp').to_f)" + } + } } else { # Error logs containing only error messages, Example, Syntax errors # the regex looks for the keyword "Error" grok { match => { - "message" => "%{IPORHOST:server_hostname},%{IPV4:server_ip},(%{NUMBER:timestamp}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port}\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(ERROR:|FATAL:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t]*)" + "message" => [ + "%{IPORHOST:server_hostname},%{IPV4:server_ip},(%{NUMBER:timestamp}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port}\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(ERROR:|FATAL:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t]*)", + "(%{TIMESTAMP_ISO8601:timestamp_str}[\s\t]+%{WORD:timezone}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port};?\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(ERROR:|FATAL:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t]*)" + ] } add_field => { event_type => "SQL_ERROR" } } + if [timestamp_str] { + date { + match => ["timestamp_str", "yyyy-MM-dd HH:mm:ss.SSS"] + target => "timestamp" + timezone => "%{timezone}" + } + ruby { + code => "event.set('timestamp', event.get('@timestamp').to_f)" + } + } } } @@ -131,7 +198,7 @@ filter { } } if "_grokparsefailure" in [tags] { - drop { } + drop {} } yugabytedb_guardium_filter{} diff --git a/filter-plugin/logstash-filter-yugabyte-guardium/YugabytedbOverFilebeatPackage/YugabyteDB/filter.conf b/filter-plugin/logstash-filter-yugabyte-guardium/YugabytedbOverFilebeatPackage/YugabyteDB/filter.conf index 531bbbd76..c39dde1a5 100644 --- a/filter-plugin/logstash-filter-yugabyte-guardium/YugabytedbOverFilebeatPackage/YugabyteDB/filter.conf +++ b/filter-plugin/logstash-filter-yugabyte-guardium/YugabytedbOverFilebeatPackage/YugabyteDB/filter.conf @@ -43,12 +43,44 @@ filter { # required data by separating the log logline initially by spaces or tabs up to occurrence of the # word "AUDIT", and then later it will look for the commas as a separator. match => { - "message" => "(%{NUMBER:timestamp}[\t\s]+)(%{IP:client_ip})\(%{NUMBER:client_port}\)[\t\s]+\[(%{NUMBER:process_id})\][\t\s]+(%{GREEDYDATA:application_name})[\t\s]+(%{USERNAME:username})[\t\s]+(%{WORD:db_name})[\t\s]+(?%{BASE16NUM}.%{BASE16NUM})[\t\s]+(%{NUMBER:trasaction_id})[\s\t]+(%{WORD:event_category}):[ \t]+AUDIT:[\s\t]+(?SESSION|OBJECT),(%{INT:statement_id}),(%{INT:sub_statement_id}),(%{WORD:event_category}),(?[A-Z\sa-z]*),(?([A-Z.a-z\s]*)),(?([^,]*)),(%{GREEDYDATA:query}),(%{GREEDYDATA:parameters})" + "message" => [ + "(%{NUMBER:timestamp}[\t\s]+)(%{IP:client_ip})\(%{NUMBER:client_port}\)[\t\s]+\[(%{NUMBER:process_id})\][\t\s]+(%{GREEDYDATA:application_name})[\t\s]+(%{USERNAME:username})[\t\s]+(%{WORD:db_name})[\t\s]+(?%{BASE16NUM}.%{BASE16NUM})[\t\s]+(%{NUMBER:trasaction_id})[\s\t]+(%{WORD:event_category}):[ \t]+AUDIT:[\s\t]+(?SESSION|OBJECT),(%{INT:statement_id}),(%{INT:sub_statement_id}),(%{WORD:event_category}),(?[A-Z\sa-z]*),(?([A-Z.a-z\s]*)),(?([^,]*)),(%{GREEDYDATA:query}),(%{GREEDYDATA:parameters})", + "(%{YEAR}-%{MONTHNUM}-%{MONTHDAY}[\t\s]+%{TIME}[\t\s]+%{WORD:timezone}[\t\s]+)(%{IP:client_ip})\(%{NUMBER:client_port};?\)[\t\s]+\[(%{NUMBER:process_id})\][\t\s]+(%{GREEDYDATA:application_name})[\t\s]+(%{USERNAME:username})[\t\s]+(%{WORD:db_name})[\t\s]+(?%{BASE16NUM}.%{BASE16NUM})[\t\s]+(%{NUMBER:trasaction_id})[\s\t]+(%{WORD:event_category}):[ \t]+AUDIT:[\s\t]+(?SESSION|OBJECT),(%{INT:statement_id}),(%{INT:sub_statement_id}),(%{WORD:event_category}),(?[A-Z\sa-z]*),(?([A-Z.a-z\s]*)),(?([^,]*)),(%{GREEDYDATA:query}),(%{GREEDYDATA:parameters})" + ] } tag_on_failure => ["sql_error_event"] } } + # Convert formatted timestamp to Unix epoch if timezone field exists + if [timezone] { + # Extract the timestamp portion from the message + grok { + match => { + "message" => "^(?%{YEAR}-%{MONTHNUM}-%{MONTHDAY}[\s\t]+%{TIME})[\s\t]+%{WORD:tz}" + } + overwrite => ["timezone"] + } + # Parse the timestamp and convert to epoch seconds + date { + match => ["timestamp_str", "yyyy-MM-dd HH:mm:ss.SSS"] + target => "@timestamp_temp" + } + ruby { + code => " + ts = event.get('@timestamp_temp') + if ts + epoch_seconds = ts.to_f + event.set('timestamp', epoch_seconds) + end + event.remove('@timestamp_temp') + " + } + mutate { + remove_field => ["timestamp_str", "tz"] + } + } + if "sql_error_event" in [tags] { mutate { remove_tag => [ "sql_event" ] @@ -60,7 +92,10 @@ filter { # the regex looks for the sequence of the workds "authentication failed for user" grok { match => { - "message" => "FATAL:[\s\t]+(?(.*))[\s\t]+(%{NUMBER:timestamp}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port}\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+DETAIL:[\s\t]+(?.*?\.)" + "message" => [ + "FATAL:[\s\t]+(?(.*))[\s\t]+(%{NUMBER:timestamp}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port}\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+DETAIL:[\s\t]+(?.*?\.)", + "FATAL:[\s\t]+(?(.*))[\s\t]+(%{YEAR}-%{MONTHNUM}-%{MONTHDAY}[\s\t]+%{TIME}[\s\t]+%{WORD:timezone}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port};?\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+DETAIL:[\s\t]+(?.*?\.)" + ] } add_field => { error_description => "%{error_fatal}. %{error_detail}" @@ -74,7 +109,10 @@ filter { # https://www.postgresql.org/docs/current/client-authentication-problems.html grok { match => { - "message" => "(%{NUMBER:timestamp}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port}\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(FATAL:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t]*)" + "message" => [ + "(%{NUMBER:timestamp}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port}\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(FATAL:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t]*)", + "(%{YEAR}-%{MONTHNUM}-%{MONTHDAY}[\s\t]+%{TIME}[\s\t]+%{WORD:timezone}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port};?\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(FATAL:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t]*)" + ] } add_field => { event_type => "AUTH_ERROR" @@ -87,7 +125,10 @@ filter { # word "STATEMENT" and then later, the text will be considered as fired query. grok { match => { - "message" => "(%{NUMBER:timestamp}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port}\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(ERROR:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t]+)(%{NUMBER}[\s\t]+)(%{IP})\(%{NUMBER}\)[\s\t]+\[(%{NUMBER})\][\s\t]+(%{GREEDYDATA})[\s\t]+(%{USERNAME})[\s\t]+(%{WORD})[\s\t]+(%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER})[\s\t]+(STATEMENT:)[\t\s]+(%{GREEDYDATA:query})" + "message" => [ + "(%{NUMBER:timestamp}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port}\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(ERROR:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t]+)(%{NUMBER}[\s\t]+)(%{IP})\(%{NUMBER}\)[\s\t]+\[(%{NUMBER})\][\s\t]+(%{GREEDYDATA})[\s\t]+(%{USERNAME})[\s\t]+(%{WORD})[\s\t]+(%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER})[\s\t]+(STATEMENT:)[\t\s]+(%{GREEDYDATA:query})", + "(%{YEAR}-%{MONTHNUM}-%{MONTHDAY}[\s\t]+%{TIME}[\s\t]+%{WORD:timezone}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port};?\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(ERROR:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t]+)(%{YEAR}-%{MONTHNUM}-%{MONTHDAY}[\s\t]+%{TIME}[\s\t]+%{WORD}[\s\t]+)(%{IP})\(%{NUMBER};?\)[\s\t]+\[(%{NUMBER})\][\s\t]+(%{GREEDYDATA})[\s\t]+(%{USERNAME})[\s\t]+(%{WORD})[\s\t]+(%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER})[\s\t]+(STATEMENT:)[\t\s]+(%{GREEDYDATA:query})" + ] } add_field => { event_type => "SQL_ERROR" @@ -98,7 +139,10 @@ filter { # the regex looks for the keyword "Error" grok { match => { - "message" => "(%{NUMBER:timestamp}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port}\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(ERROR:|FATAL:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t]*)" + "message" => [ + "(%{NUMBER:timestamp}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port}\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(ERROR:|FATAL:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t]*)", + "(%{YEAR}-%{MONTHNUM}-%{MONTHDAY}[\s\t]+%{TIME}[\s\t]+%{WORD:timezone}[\s\t]+)(%{IP:client_ip}[\s\t]*)\(%{NUMBER:client_port};?\)[\s\t]+\[(%{NUMBER:process_id})\][\s\t]+(%{GREEDYDATA:application_name})[\s\t]+(%{USERNAME:username})[\s\t]+(%{WORD:db_name})[\s\t]+(?%{BASE16NUM}.%{BASE16NUM})[\s\t]+(%{NUMBER:trasaction_id})[\s\t]+(ERROR:|FATAL:)[\s\t]+(%{GREEDYDATA:error_description}[\s\t]*)" + ] } add_field => { event_type => "SQL_ERROR"