diff --git a/filter-plugin/logstash-filter-dsql-guardium/.gitignore b/filter-plugin/logstash-filter-dsql-guardium/.gitignore new file mode 100644 index 000000000..288053a14 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/.gitignore @@ -0,0 +1,37 @@ +# Gradle +.gradle/ +build/ +out/ + +# IDE +.idea/ +*.iml +*.ipr +*.iws +.vscode/ +.settings/ +.classpath +.project + +# OS +.DS_Store +Thumbs.db + +# Logstash +vendor/ +Gemfile +Gemfile.lock +*.gemspec +*.gem +lib/ + +# Logs +*.log + +# Temporary files +*.tmp +*.bak +*.swp +*~ + +# Made with Bob diff --git a/filter-plugin/logstash-filter-dsql-guardium/CHANGELOG.md b/filter-plugin/logstash-filter-dsql-guardium/CHANGELOG.md new file mode 100644 index 000000000..0d5653a06 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/CHANGELOG.md @@ -0,0 +1,47 @@ +# Changelog + +All notable changes to the DSQL Guardium filter plugin will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+ +## [1.0.0] - 2024-01-01 + +### Added +- Initial release of DSQL Guardium filter plugin +- Support for AWS DSQL Database Activity Streams +- PostgreSQL-compatible SQL parsing (DSQL uses PostgreSQL protocol) +- SQS input integration for receiving audit events +- Parsing of successful SQL statements +- Parsing of SQL errors and exceptions +- Authentication failure detection +- Session tracking with client IP and port +- Database user and database name extraction +- Timestamp parsing from audit logs +- Client application identification +- Comprehensive test coverage +- Documentation and configuration examples + +### Features +- **Server Type**: POSTGRESQL +- **Data Protocol**: POSTGRESQL +- **Language**: PGRS (PostgreSQL) +- **Input Method**: AWS SQS +- **Supported Events**: + - Successful queries + - SQL errors + - Authentication failures + - Connection events + +### Known Limitations +- IPv6 is not supported +- Client hostname and OS user fields are not available in DSQL audit logs +- Multiline characters in queries may not be preserved when using SQS +- Single line comments in queries are not fully supported + +## [Unreleased] + +### Planned +- Support for additional DSQL-specific features as they become available +- Enhanced error handling and logging +- Performance optimizations for high-volume environments \ No newline at end of file diff --git a/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/DSQL/filter.conf b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/DSQL/filter.conf new file mode 100644 index 000000000..30d58c15d --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/DSQL/filter.conf @@ -0,0 +1,84 @@ +filter { + if [type] == "dsql" { + + # Parse the JSON message from SQS + json { + source => "message" + target => "parsed_message" + } + + # Extract fields from the parsed message to top level for easier access + if [parsed_message] { + mutate { + add_field => { + "databaseName" => 
"%{[parsed_message][databaseName]}" + "dbUserName" => "%{[parsed_message][dbUserName]}" + "remoteHost" => "%{[parsed_message][remoteHost]}" + "remotePort" => "%{[parsed_message][remotePort]}" + "sessionId" => "%{[parsed_message][sessionId]}" + "statementText" => "%{[parsed_message][statementText]}" + "exitCode" => "%{[parsed_message][exitCode]}" + "logTime" => "%{[parsed_message][logTime]}" + } + } + + # Add optional fields if they exist + if [parsed_message][clientApplication] { + mutate { + add_field => { "clientApplication" => "%{[parsed_message][clientApplication]}" } + } + } + + if [parsed_message][errorMessage] { + mutate { + add_field => { "errorMessage" => "%{[parsed_message][errorMessage]}" } + } + } + + if [parsed_message][startTime] { + mutate { + add_field => { "startTime" => "%{[parsed_message][startTime]}" } + } + } + + if [parsed_message][commandTag] { + mutate { + add_field => { "commandTag" => "%{[parsed_message][commandTag]}" } + } + } + + if [parsed_message][statementId] { + mutate { + add_field => { "statementId" => "%{[parsed_message][statementId]}" } + } + } + } + + # Apply the DSQL Guardium filter + dsql_guardium_plugin_filter {} + + # Clean up temporary fields + mutate { + remove_field => [ + "parsed_message", + "message", + "databaseName", + "dbUserName", + "remoteHost", + "remotePort", + "sessionId", + "statementText", + "exitCode", + "logTime", + "clientApplication", + "errorMessage", + "startTime", + "commandTag", + "statementId", + "type", + "@timestamp", + "@version" + ] + } + } +} \ No newline at end of file diff --git a/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/DSQL/logstash-filter-dsql_guardium_plugin_filter.zip b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/DSQL/logstash-filter-dsql_guardium_plugin_filter.zip new file mode 100644 index 000000000..cbb34a13c Binary files /dev/null and 
b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/DSQL/logstash-filter-dsql_guardium_plugin_filter.zip differ diff --git a/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/DSQL/manifest.json b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/DSQL/manifest.json new file mode 100644 index 000000000..f2d5a2189 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/DSQL/manifest.json @@ -0,0 +1,14 @@ +{ + "name": "Guardium_DSQL_filter", + "alias": "DSQL", + "type": "filter", + "pipeline_type":"pull", + "plugin_version": "1.0.0", + "datasourceTypes": [{"type":"DSQL","supportedVersions": [""]}], + "supported_input_plugins": ["SQS_input"], + "developer": "IBM", + "license": "Apache2.0", + "description": "Parses AWS DSQL Database Activity Stream events into Guardium.", + "configuration_notes": "Requires AWS SQS input plugin configured to receive DSQL Database Activity Stream events.", + "documentation_path": "https://github.com/IBM/universal-connectors/blob/main/filter-plugin/logstash-filter-dsql-guardium/README.md" +} \ No newline at end of file diff --git a/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/dsqlSQS.conf b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/dsqlSQS.conf new file mode 100644 index 000000000..64de8ccf4 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/dsqlSQS.conf @@ -0,0 +1,127 @@ +#/* +#Copyright 2020-2021 IBM Inc. 
All rights reserved +#SPDX-License-Identifier: Apache-2.0 +#*/ + +input { + sqs { + # Insert the access key and secret that has access to SQS queue + access_key_id => "" + secret_access_key => "" + region => "" # Region that has the Queue, Default value: us-east-1 + queue => "" # This parameter simply holds the Queue name and not the URL + codec => "json" + type => "dsql" + # Insert the account id of the AWS account + add_field => {"account_id" => ""} + # Insert the Instance/Cluster name of the DSQL database that is to be monitored + add_field => {"instance_name" => ""} + # Optional: Specify a custom endpoint (e.g., proxy) + # endpoint => "https://proxy.company.com" + # Set to true to use AWS's bundled CA certificates for SSL/TLS connections + # use_aws_bundled_ca => false + # Optional: Provide additional settings (e.g., custom SSL certificate bundle) + # additional_settings => { + # ssl_ca_bundle => "/usr/share/logstash/third_party/" + # } + } +} + +filter { + if [type] == "dsql" { + + # Parse the JSON message from SQS + json { + source => "message" + target => "parsed_message" + } + + # Extract fields from the parsed message to top level for easier access + if [parsed_message] { + mutate { + add_field => { + "databaseName" => "%{[parsed_message][databaseName]}" + "dbUserName" => "%{[parsed_message][dbUserName]}" + "remoteHost" => "%{[parsed_message][remoteHost]}" + "remotePort" => "%{[parsed_message][remotePort]}" + "sessionId" => "%{[parsed_message][sessionId]}" + "statementText" => "%{[parsed_message][statementText]}" + "exitCode" => "%{[parsed_message][exitCode]}" + "logTime" => "%{[parsed_message][logTime]}" + } + } + + # Add optional fields if they exist + if [parsed_message][clientApplication] { + mutate { + add_field => { "clientApplication" => "%{[parsed_message][clientApplication]}" } + } + } + + if [parsed_message][errorMessage] { + mutate { + add_field => { "errorMessage" => "%{[parsed_message][errorMessage]}" } + } + } + + if 
[parsed_message][startTime] { + mutate { + add_field => { "startTime" => "%{[parsed_message][startTime]}" } + } + } + + if [parsed_message][commandTag] { + mutate { + add_field => { "commandTag" => "%{[parsed_message][commandTag]}" } + } + } + + if [parsed_message][statementId] { + mutate { + add_field => { "statementId" => "%{[parsed_message][statementId]}" } + } + } + } + + # Apply the DSQL Guardium filter + dsql_guardium_plugin_filter {} + + # Clean up temporary fields + mutate { + remove_field => [ + "parsed_message", + "message", + "databaseName", + "dbUserName", + "remoteHost", + "remotePort", + "sessionId", + "statementText", + "exitCode", + "logTime", + "clientApplication", + "errorMessage", + "startTime", + "commandTag", + "statementId", + "type", + "@timestamp", + "@version" + ] + } + } +} + +output { + # Send to Guardium + if [GuardRecord] { + guardium_connector { + # Guardium configuration + } + } + + # Optional: Output to stdout for debugging + # stdout { + # codec => rubydebug + # } +} diff --git a/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/gi_templates.json b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/gi_templates.json new file mode 100644 index 000000000..a8b708cdf --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/DSQLOverSQSPackage/gi_templates.json @@ -0,0 +1,12 @@ +{ + "templates": [ + { + "name": "DSQL over SQS", + "description": "Template for AWS DSQL Database Activity Streams over SQS", + "type": "filter", + "datasource_type": "DSQL", + "input_type": "SQS", + "version": "1.0.0" + } + ] +} \ No newline at end of file diff --git a/filter-plugin/logstash-filter-dsql-guardium/LICENSE b/filter-plugin/logstash-filter-dsql-guardium/LICENSE new file mode 100644 index 000000000..eebb4fc0b --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/LICENSE @@ -0,0 +1,190 @@ +Apache License +Version 2.0, January 2004 +http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, 
AND DISTRIBUTION + +1. Definitions. + +"License" shall mean the terms and conditions for use, reproduction, +and distribution as defined by Sections 1 through 9 of this document. + +"Licensor" shall mean the copyright owner or entity authorized by +the copyright owner that is granting the License. + +"Legal Entity" shall mean the union of the acting entity and all +other entities that control, are controlled by, or are under common +control with that entity. For the purposes of this definition, +"control" means (i) the power, direct or indirect, to cause the +direction or management of such entity, whether by contract or +otherwise, or (ii) ownership of fifty percent (50%) or more of the +outstanding shares, or (iii) beneficial ownership of such entity. + +"You" (or "Your") shall mean an individual or Legal Entity +exercising permissions granted by this License. + +"Source" form shall mean the preferred form for making modifications, +including but not limited to software source code, documentation +source, and configuration files. + +"Object" form shall mean any form resulting from mechanical +transformation or translation of a Source form, including but +not limited to compiled object code, generated documentation, +and conversions to other media types. + +"Work" shall mean the work of authorship, whether in Source or +Object form, made available under the License, as indicated by a +copyright notice that is included in or attached to the work +(an example is provided in the Appendix below). + +"Derivative Works" shall mean any work, whether in Source or Object +form, that is based on (or derived from) the Work and for which the +editorial revisions, annotations, elaborations, or other modifications +represent, as a whole, an original work of authorship. For the purposes +of this License, Derivative Works shall not include works that remain +separable from, or merely link (or bind by name) to the interfaces of, +the Work and Derivative Works thereof. 
+ +"Contribution" shall mean any work of authorship, including +the original version of the Work and any modifications or additions +to that Work or Derivative Works thereof, that is intentionally +submitted to Licensor for inclusion in the Work by the copyright owner +or by an individual or Legal Entity authorized to submit on behalf of +the copyright owner. For the purposes of this definition, "submitted" +means any form of electronic, verbal, or written communication sent +to the Licensor or its representatives, including but not limited to +communication on electronic mailing lists, source code control systems, +and issue tracking systems that are managed by, or on behalf of, the +Licensor for the purpose of discussing and improving the Work, but +excluding communication that is conspicuously marked or otherwise +designated in writing by the copyright owner as "Not a Contribution." + +"Contributor" shall mean Licensor and any individual or Legal Entity +on behalf of whom a Contribution has been received by Licensor and +subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of +this License, each Contributor hereby grants to You a perpetual, +worldwide, non-exclusive, no-charge, royalty-free, irrevocable +copyright license to reproduce, prepare Derivative Works of, +publicly display, publicly perform, sublicense, and distribute the +Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. 
Subject to the terms and conditions of +this License, each Contributor hereby grants to You a perpetual, +worldwide, non-exclusive, no-charge, royalty-free, irrevocable +(except as stated in this section) patent license to make, have made, +use, offer to sell, sell, import, and otherwise transfer the Work, +where such license applies only to those patent claims licensable +by such Contributor that are necessarily infringed by their +Contribution(s) alone or by combination of their Contribution(s) +with the Work to which such Contribution(s) was submitted. If You +institute patent litigation against any entity (including a +cross-claim or counterclaim in a lawsuit) alleging that the Work +or a Contribution incorporated within the Work constitutes direct +or contributory patent infringement, then any patent licenses +granted to You under this License for that Work shall terminate +as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the +Work or Derivative Works thereof in any medium, with or without +modifications, and in Source or Object form, provided that You +meet the following conditions: + +(a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + +(b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + +(c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + +(d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: 
within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + +You may add Your own copyright statement to Your modifications and +may provide additional or different license terms and conditions +for use, reproduction, or distribution of Your modifications, or +for any such Derivative Works as a whole, provided Your use, +reproduction, and distribution of the Work otherwise complies with +the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, +any Contribution intentionally submitted for inclusion in the Work +by You to the Licensor shall be under the terms and conditions of +this License, without any additional terms or conditions. +Notwithstanding the above, nothing herein shall supersede or modify +the terms of any separate license agreement you may have executed +with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade +names, trademarks, service marks, or product names of the Licensor, +except as required for reasonable and customary use in describing the +origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. 
Unless required by applicable law or +agreed to in writing, Licensor provides the Work (and each +Contributor provides its Contributions) on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied, including, without limitation, any warranties or conditions +of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A +PARTICULAR PURPOSE. You are solely responsible for determining the +appropriateness of using or redistributing the Work and assume any +risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, +whether in tort (including negligence), contract, or otherwise, +unless required by applicable law (such as deliberate and grossly +negligent acts) or agreed to in writing, shall any Contributor be +liable to You for damages, including any direct, indirect, special, +incidental, or consequential damages of any character arising as a +result of this License or out of the use or inability to use the +Work (including but not limited to damages for loss of goodwill, +work stoppage, computer failure or malfunction, or any and all +other commercial damages or losses), even if such Contributor +has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing +the Work or Derivative Works thereof, You may choose to offer, +and charge a fee for, acceptance of support, warranty, indemnity, +or other liability obligations and/or rights consistent with this +License. However, in accepting such obligations, You may act only +on Your own behalf and on Your sole responsibility, not on behalf +of any other Contributor, and only if You agree to indemnify, +defend, and hold each Contributor harmless for any liability +incurred by, or claims asserted against, such Contributor by reason +of your accepting any such warranty or additional liability. 
+ +END OF TERMS AND CONDITIONS + +Copyright 2020-2021 IBM Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. \ No newline at end of file diff --git a/filter-plugin/logstash-filter-dsql-guardium/README.md b/filter-plugin/logstash-filter-dsql-guardium/README.md new file mode 100644 index 000000000..fb321d117 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/README.md @@ -0,0 +1,353 @@ +## DSQL-Guardium Logstash filter plug-in + +### Meet DSQL +* Tested versions: +* Environment: AWS +* Supported inputs: SQS (pull) +* Supported Guardium versions: + * Guardium Data Protection: + +This is a [Logstash](https://github.com/elastic/logstash) filter plug-in for the universal connector that is featured in IBM Security Guardium. It parses events and messages from the AWS DSQL Database Activity Streams into a [Guardium record](https://github.com/IBM/universal-connectors/blob/main/common/src/main/java/com/ibm/guardium/universalconnector/commons/structures/Record.java) instance (which is a standard structure made out of several parts). The information is then sent over to Guardium. Guardium records include the accessor (the person who tried to access the data), the session, data, and exceptions. If there are no errors, the data contains details about the query "construct". The construct details the main action (verb) and collections (objects) involved. + +The plug-in is free and open-source (Apache 2.0). 
It can be used as a starting point to develop additional filter plug-ins for the Guardium universal connector. + +## 1. Overview + +AWS DSQL (Amazon Aurora DSQL) is a distributed SQL database service that provides PostgreSQL-compatible database capabilities. This plugin processes audit logs from DSQL Database Activity Streams, which are delivered via Amazon SQS (Simple Queue Service). + +### Key Features +- Parses DSQL Database Activity Stream events in PostgreSQL format +- Supports both successful queries and error/exception events +- Captures detailed session information including client IP, port, and user details +- Handles authentication failures and SQL errors +- Compatible with AWS SQS input plugin + +## 2. Configuring DSQL Database Activity Streams + +### 2.1 Enable Database Activity Streams + +1. Navigate to the AWS RDS console +2. Select your DSQL cluster +3. Enable Database Activity Streams: + - Choose **Actions** → **Start activity stream** + - Select **Asynchronous** mode (recommended for production) + - Choose or create a KMS key for encryption + - Note the Kinesis Data Stream name created + +### 2.2 Configure SQS Queue + +1. Create an SQS queue to receive the activity stream events: + ```bash + aws sqs create-queue --queue-name dsql-activity-stream-queue + ``` + +2. Configure the Kinesis Data Stream to send events to SQS: + - Create a Lambda function to forward Kinesis events to SQS + - Or use Kinesis Data Firehose to deliver to SQS + +3. Set appropriate IAM permissions for the SQS queue + +### 2.3 Required IAM Permissions + +The AWS credentials used by Logstash must have the following permissions: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "sqs:ReceiveMessage", + "sqs:DeleteMessage", + "sqs:GetQueueAttributes" + ], + "Resource": "arn:aws:sqs:REGION:ACCOUNT_ID:dsql-activity-stream-queue" + } + ] +} +``` + +## 3. 
Configuring the DSQL Filter in Guardium + +### 3.1 Logstash Configuration + +The Guardium universal connector is the Logstash runtime environment that comes with a Logstash service. The DSQL filter plug-in is a configuration file that defines the settings for the Logstash service. + +#### Procedure + +1. On the collector, go to **Setup** → **Tools and Views** → **Configure Universal Connector**. + +2. Enable the universal connector if it is disabled. + +3. Click **Upload File** and select the offline [logstash-filter-dsql_guardium_plugin_filter.zip](./DSQLOverSQSPackage/DSQL/logstash-filter-dsql_guardium_plugin_filter.zip) plug-in. After it is uploaded, click **OK**. + +4. Click the Plus sign to open the Connector Configuration dialog box. + +5. Type a name in the **Connector name** field. + +6. Update the input section to add details from your SQS queue: + +``` +input { + sqs { + access_key_id => "" + secret_access_key => "" + region => "" + queue => "" + codec => "json" + type => "dsql" + add_field => {"account_id" => ""} + add_field => {"instance_name" => ""} + } +} +``` + +7. The filter section is provided in the [filter.conf](./DSQLOverSQSPackage/DSQL/filter.conf) file. Include this section in your configuration. + +8. The output section should point to your Guardium collector: + +``` +output { + if [GuardRecord] { + guardium_connector { + guardium_ip => "" + guardium_port => + } + } +} +``` + +## 4. DSQL Audit Log Format + +The DSQL Database Activity Streams provide audit logs in JSON format. The plugin supports two formats: flat format and nested DatabaseActivityMonitoringRecord format. 
+ +### 4.1 Flat Format + +#### 4.1.1 Successful Query Event +```json +{ + "type": "record", + "databaseName": "mydb", + "dbUserName": "postgres", + "remoteHost": "10.0.1.100", + "remotePort": 54321, + "sessionId": "session-12345", + "statementText": "SELECT * FROM users WHERE id = 1;", + "commandTag": "SELECT", + "exitCode": 0, + "logTime": "2023-11-10T10:15:30.123Z", + "clientApplication": "psql", + "statementId": "stmt-001" +} +``` + +#### 4.1.2 Error Event +```json +{ + "type": "record", + "databaseName": "mydb", + "dbUserName": "postgres", + "remoteHost": "10.0.1.100", + "remotePort": 54321, + "sessionId": "session-12345", + "statementText": "SELECT * FROM nonexistent_table;", + "exitCode": 1, + "errorMessage": "relation \"nonexistent_table\" does not exist", + "logTime": "2023-11-10T10:15:30.123Z" +} +``` + +#### 4.1.3 Authentication Failure Event +```json +{ + "type": "record", + "databaseName": "mydb", + "dbUserName": "baduser", + "remoteHost": "10.0.1.100", + "remotePort": 54321, + "exitCode": 1, + "errorMessage": "password authentication failed for user \"baduser\"", + "logTime": "2023-11-10T10:15:30.123Z" +} +``` + +### 4.2 Nested DatabaseActivityMonitoringRecord Format + +The plugin also supports a nested format where events are wrapped in a `DatabaseActivityMonitoringRecord` structure with a `databaseActivityEventList` array. 
+ +#### 4.2.1 Nested Format - Successful Query Event +```json +{ + "type": "DatabaseActivityMonitoringRecord", + "clusterId": "cluster-abc123", + "instanceId": "db-INSTANCE123", + "databaseActivityEventList": [ + { + "type": "record", + "class": "READ", + "command": "SELECT", + "commandText": "SELECT * FROM users WHERE id = 1;", + "databaseName": "mydb", + "dbProtocol": "POSTGRESQL", + "dbUserName": "postgres", + "exitCode": 0, + "logTime": "2023-11-10T10:15:30.123Z", + "remoteHost": "10.0.1.100", + "remotePort": 5432, + "serverHost": "172.31.30.159", + "serverType": "POSTGRESQL", + "sessionId": "session-456", + "clientApplication": "psql" + } + ] +} +``` + +#### 4.2.2 Nested Format - Login Failure Event +```json +{ + "type": "DatabaseActivityMonitoringRecord", + "clusterId": "cluster-abc123", + "instanceId": "db-INSTANCE123", + "databaseActivityEventList": [ + { + "type": "record", + "class": "LOGIN", + "command": "LOGIN FAILED", + "commandText": "Login attempt failed", + "databaseName": "postgres", + "dbProtocol": "POSTGRESQL", + "dbUserName": "baduser", + "errorMessage": "password authentication failed for user \"baduser\"", + "exitCode": 1, + "logTime": "2023-11-10T10:15:30.123Z", + "remoteHost": "10.0.1.100", + "remotePort": 5432, + "serverHost": "172.31.30.159", + "serverType": "POSTGRESQL", + "sessionId": "session-123", + "clientApplication": "psql" + } + ] +} +``` + +#### 4.2.3 Nested Format - DDL Statement +```json +{ + "type": "DatabaseActivityMonitoringRecord", + "clusterId": "cluster-abc123", + "instanceId": "db-INSTANCE123", + "databaseActivityEventList": [ + { + "type": "record", + "class": "SCHEMA", + "command": "CREATE", + "commandText": "CREATE TABLE users (id serial PRIMARY KEY, name varchar(100));", + "databaseName": "mydb", + "dbProtocol": "POSTGRESQL", + "dbUserName": "postgres", + "exitCode": 0, + "logTime": "2023-11-10T10:15:30.123Z", + "remoteHost": "10.0.1.100", + "remotePort": 5432, + "serverHost": "172.31.30.159", + "serverType": 
"POSTGRESQL", + "sessionId": "session-789", + "objectName": "users", + "objectType": "TABLE", + "clientApplication": "psql" + } + ] +} +``` + +### 4.3 Field Mapping + +The parser automatically detects the format (flat or nested) and extracts events accordingly. For nested format, the parser processes the first event in the `databaseActivityEventList` array. + +**Key differences between formats:** +- **Flat format**: Uses `statementText` for SQL commands +- **Nested format**: Uses `commandText` for SQL commands (parser handles both) +- **Nested format**: Includes additional fields like `class`, `command`, `objectName`, `objectType` +- **Nested format**: Parent-level fields (`instanceId`, `clusterId`) are preserved + +**Error detection:** The parser determines if an event is an error based on the presence of a non-empty `errorMessage` field, not the `exitCode` value. + +## 5. Supported Fields + +The plugin extracts and maps the following fields from DSQL audit logs: + +### 5.1 Common Fields (Both Formats) + +| DSQL Field | Guardium Field | Description | +|------------|----------------|-------------| +| databaseName | dbName | Database name | +| dbUserName | accessor.dbUser | Database user | +| remoteHost | sessionLocator.clientIp | Client IP address | +| remotePort | sessionLocator.clientPort | Client port | +| sessionId | sessionId | Session identifier | +| statementText / commandText | data.originalSqlCommand | SQL statement (flat format uses `statementText`, nested format uses `commandText`) | +| exitCode | - | Exit code (0=success, non-zero=error) | +| errorMessage | exception.description | Error description | +| logTime | time.timestamp | Event timestamp | +| clientApplication | accessor.sourceProgram | Client application name | + +### 5.2 Nested Format Additional Fields + +These fields are only present in the nested `DatabaseActivityMonitoringRecord` format and are used by the parser for processing: + +| DSQL Field | Guardium Field | Description | Usage | 
+|------------|----------------|-------------|-------| +| class | - | Event class (e.g., LOGIN, READ, WRITE, SCHEMA, TABLE) | Used to detect LOGIN failure events | +| command | - | Command type (e.g., SELECT, INSERT, CREATE, ALTER, LOGIN FAILED) | Used to detect LOGIN failure events and error conditions | +| clusterId | - | Cluster identifier (parent-level field) | Preserved from parent level for context | +| instanceId | - | Instance identifier (parent-level field) | Preserved from parent level for context | + +## 6. Limitations + +- The DSQL plug-in does not support IPv6 +- Client hostname and OS user fields are not available in DSQL audit logs and are set as empty +- The plugin uses PostgreSQL as the server type and protocol since DSQL is PostgreSQL-compatible + +## 7. Troubleshooting + +### 7.1 No Events Received + +**Problem**: Logstash is not receiving events from SQS. + +**Solution**: +- Verify SQS queue name and region are correct +- Check AWS credentials have proper permissions +- Ensure Database Activity Streams are enabled on DSQL cluster +- Verify the Kinesis-to-SQS forwarding is configured correctly + +### 7.2 Parsing Errors + +**Problem**: Events are received but not parsed correctly. + +**Solution**: +- Check the Logstash logs for parsing errors +- Verify the JSON format matches the expected DSQL audit log format +- Ensure the `codec => "json"` is set in the SQS input configuration + +### 7.3 Missing Fields + +**Problem**: Some fields are not appearing in Guardium reports. + +**Solution**: +- Verify all required fields are present in the DSQL audit logs +- Check that the filter configuration includes all field mappings +- Review the Constants.java file for field name mappings + +## 8. 
Additional Resources + +- [AWS DSQL Documentation](https://docs.aws.amazon.com/dsql/) +- [AWS RDS Database Activity Streams](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/DBActivityStreams.html) +- [Guardium Universal Connector Documentation](https://github.com/IBM/universal-connectors) +- [Logstash SQS Input Plugin](https://www.elastic.co/guide/en/logstash/current/plugins-inputs-sqs.html) + +## 9. License + +This plugin is licensed under the Apache License 2.0. See the [LICENSE](./LICENSE) file for details. diff --git a/filter-plugin/logstash-filter-dsql-guardium/VERSION b/filter-plugin/logstash-filter-dsql-guardium/VERSION new file mode 100644 index 000000000..afaf360d3 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/VERSION @@ -0,0 +1 @@ +1.0.0 \ No newline at end of file diff --git a/filter-plugin/logstash-filter-dsql-guardium/build.gradle b/filter-plugin/logstash-filter-dsql-guardium/build.gradle new file mode 100644 index 000000000..63d51ddf3 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/build.gradle @@ -0,0 +1,193 @@ +import java.nio.file.Files +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING + +apply plugin: 'java' + +buildscript { + repositories { + maven { + url "https://plugins.gradle.org/m2/" + } + mavenCentral() + jcenter() + } + + dependencies { + classpath 'com.github.johnrengelman.shadow:com.github.johnrengelman.shadow.gradle.plugin:8.1.1' + classpath "org.barfuin.gradle.jacocolog:gradle-jacoco-log:4.0.1" + classpath group: 'org.yaml', name: 'snakeyaml', version: '2.2' + } +} + +def universalConnectorsDir=project.projectDir.parentFile?.parentFile.toString(); +def versions = new org.yaml.snakeyaml.Yaml().load( new File("${universalConnectorsDir}/versions.yml").newInputStream() ) +gradle.ext.versions = new org.yaml.snakeyaml.Yaml().load( new File(LOGSTASH_CORE_PATH + "/../versions.yml").newInputStream() ) + +apply from: LOGSTASH_CORE_PATH + "/../rubyUtils.gradle" + +apply plugin: 'jacoco' 
+apply plugin: 'com.github.johnrengelman.shadow' + +// =========================================================================== +group = 'com.ibm.guardium.dsql' +version = file("VERSION").text.trim() +description = "DSQL-Guardium filter plugin" + +// =========================================================================== +pluginInfo.licenses = ['Apache-2.0'] +pluginInfo.longDescription = "This gem is a Logstash DSQL filter plugin required to be installed as part of IBM Security Guardium, Guardium Universal connector configuration. This gem is not a stand-alone program." +pluginInfo.authors = ['IBM'] +pluginInfo.email = [''] +pluginInfo.homepage = "https://github.com/IBM/universal-connectors" +pluginInfo.pluginType = "filter" +pluginInfo.pluginClass = "DSQLGuardiumPluginFilter" +pluginInfo.pluginName = "dsql_guardium_plugin_filter" + +sourceCompatibility = JavaVersion.VERSION_17 +targetCompatibility = JavaVersion.VERSION_17 + +def jacocoVersion = '0.8.4' +def minimumCoverageStr = System.getenv("MINIMUM_COVERAGE") ?: "50.0%" +if (minimumCoverageStr.endsWith("%")) { + minimumCoverageStr = minimumCoverageStr[0..-2] +} +def minimumCoverage = Float.valueOf(minimumCoverageStr) / 100 + +buildscript { + repositories { + maven { url "https://plugins.gradle.org/m2/" } + mavenCentral() + jcenter() + } + dependencies { + classpath 'com.github.johnrengelman.shadow:com.github.johnrengelman.shadow.gradle.plugin:8.1.1' + classpath "org.barfuin.gradle.jacocolog:gradle-jacoco-log:4.0.1" + } +} + +repositories { + mavenCentral() +} + +dependencies { + implementation 'commons-validator:commons-validator:1.7' + implementation 'org.apache.logging.log4j:log4j-core:2.17.1' + implementation 'org.apache.commons:commons-lang3:3.7' + implementation 'com.google.code.gson:gson:2.8.9' + implementation 'commons-beanutils:commons-beanutils:1.11.0' + implementation fileTree(dir: LOGSTASH_CORE_PATH, include: "build/libs/logstash-core.jar") + implementation fileTree(dir: 
GUARDIUM_UNIVERSALCONNECTOR_COMMONS_PATH, include: "common-*.*.*.jar") + + // ✅ JUnit 4 only + testImplementation 'junit:junit:4.13.2' + testImplementation 'org.mockito:mockito-core:5.17.0' + testImplementation 'org.jruby:jruby-complete:9.2.7.0' + testImplementation fileTree(dir: GUARDIUM_UNIVERSALCONNECTOR_COMMONS_PATH, include: "common-*.*.*.jar") +} + +test { + useJUnit() // ✅ Make sure JUnit 4 is used explicitly + testLogging { + events "passed", "skipped", "failed" + } +} + +tasks.register("vendor") { + dependsOn shadowJar + doLast { + String vendorPathPrefix = "vendor/jar-dependencies" + String projectGroupPath = project.group.replaceAll('\\.', '/') + File projectJarFile = file("${vendorPathPrefix}/${projectGroupPath}/${pluginInfo.pluginFullName()}/${project.version}/${pluginInfo.pluginFullName()}-${project.version}.jar") + projectJarFile.mkdirs() + Files.copy(file("$buildDir/libs/${project.name}-${project.version}.jar").toPath(), projectJarFile.toPath(), REPLACE_EXISTING) + validatePluginJar(projectJarFile, project.group) + } +} + +shadowJar { + archiveClassifier = null +} + +clean { + delete "${projectDir}/Gemfile" + delete "${projectDir}/${pluginInfo.pluginFullName()}.gemspec" + delete "${projectDir}/lib/" + delete "${projectDir}/vendor/" + new FileNameFinder().getFileNames(projectDir.toString(), "${pluginInfo.pluginFullName()}-*.*.*.gem").each { + delete it + } +} + +tasks.withType(JavaCompile) { + options.encoding = 'UTF-8' +} + +tasks.register("generateRubySupportFiles") { + doLast { + generateRubySupportFilesForPlugin(project.description, project.group, version) + } +} + +tasks.register("removeObsoleteJars") { + doLast { + new FileNameFinder().getFileNames( + projectDir.toString(), + "vendor/**/${pluginInfo.pluginFullName()}*.jar", + "vendor/**/${pluginInfo.pluginFullName()}-${version}.jar" + ).each { delete it } + } +} + +tasks.register("gem") { + dependsOn = [downloadAndInstallJRuby, removeObsoleteJars, vendor, generateRubySupportFiles] + doLast { + 
buildGem(projectDir, buildDir, "${pluginInfo.pluginFullName()}.gemspec") + } +} + +// ✅ JaCoCo Setup +jacoco { + toolVersion = jacocoVersion +} + +jacocoTestReport { + reports { + html.required = true + xml.required = true + csv.required = true + html.destination file("${buildDir}/reports/jacoco") + csv.destination file("${buildDir}/reports/jacoco/all.csv") + } + executionData.from fileTree(dir: "${buildDir}/jacoco/", includes: ['**/*.exec']) + + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it, exclude: []) + })) + } + + doLast { + println "Report -> file://${buildDir}/reports/jacoco/index.html" + } +} + +jacocoTestCoverageVerification { + violationRules { + rule { + limit { + minimum = minimumCoverage + } + } + } + + executionData.from fileTree(dir: "${buildDir}/jacoco/", includes: ['**/*.exec']) + + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it, exclude: []) + })) + } +} + +test.finalizedBy jacocoTestReport +check.dependsOn jacocoTestCoverageVerification, jacocoTestReport \ No newline at end of file diff --git a/filter-plugin/logstash-filter-dsql-guardium/gradle/wrapper/gradle-wrapper.jar b/filter-plugin/logstash-filter-dsql-guardium/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 000000000..7454180f2 Binary files /dev/null and b/filter-plugin/logstash-filter-dsql-guardium/gradle/wrapper/gradle-wrapper.jar differ diff --git a/filter-plugin/logstash-filter-dsql-guardium/gradle/wrapper/gradle-wrapper.properties b/filter-plugin/logstash-filter-dsql-guardium/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 000000000..81aa1c044 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,5 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip +zipStoreBase=GRADLE_USER_HOME 
+zipStorePath=wrapper/dists diff --git a/filter-plugin/logstash-filter-dsql-guardium/gradlew b/filter-plugin/logstash-filter-dsql-guardium/gradlew new file mode 100755 index 000000000..cccdd3d51 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/gradlew @@ -0,0 +1,172 @@ +#!/usr/bin/env sh + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn () { + echo "$*" +} + +die () { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! 
-x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin, switch paths to Windows format before running java +if $cygwin ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ 
$CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=$((i+1)) + done + case $i in + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" "$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=$(save "$@") + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then + cd "$(dirname "$0")" +fi + +exec "$JAVACMD" "$@" diff --git a/filter-plugin/logstash-filter-dsql-guardium/gradlew.bat b/filter-plugin/logstash-filter-dsql-guardium/gradlew.bat new file mode 100755 index 000000000..f9553162f --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/gradlew.bat @@ -0,0 +1,84 @@ +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows 
NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! 
+if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/Constants.java b/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/Constants.java new file mode 100644 index 000000000..6e498493d --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/Constants.java @@ -0,0 +1,96 @@ +/* +#Copyright 2020-2021 IBM Inc. All rights reserved +#SPDX-License-Identifier: Apache-2.0 +#*/ +package com.ibm.guardium.dsql; + +public interface Constants { + + public static final String TYPE = "type"; + + public static final String CLASS = "class"; + + public static final String CLIENT_APPLICATION = "clientApplication"; + + public static final String DATABASE_NAME = "databaseName"; + + public static final String DB_PROTOCOL = "dbProtocol"; + + public static final String DB_USER_NAME = "dbUserName"; + + public static final String ERROR_MESSAGE = "errorMessage"; + + public static final String EXIT_CODE = "exitCode"; + + public static final String REMOTE_HOST = "remoteHost"; + + public static final String REMOTE_PORT = "remotePort"; + + public static final String SERVER_HOST = "serverHost"; + + public static final String SESSION_ID = "sessionId"; + + public static final String START_TIME = "startTime"; + + public static final String STATEMENT_TEXT = "statementText"; + + public static final String COMMAND_TAG = "commandTag"; + + public static final String PARAMETER_LIST = "parameterList"; + + public static final String LOG_TIME = "logTime"; + + public static final String ENGINE_NATIVE_AUDIT_FIELDS = "engineNativeAuditFields"; + + public static final String DEFAULT_IP = "0.0.0.0"; + + public static final int DEFAULT_PORT = -1; + + public static final String UNKNOWN_STRING = ""; + + public static final String SERVER_TYPE_STRING = "POSTGRESQL"; + + public static final String 
DATA_PROTOCOL_STRING = "POSTGRESQL"; + + public static final String LANGUAGE = "PGRS"; + + public static final String SQL_ERROR = "SQL_ERROR"; + + public static final String LOGIN_FAILED = "LOGIN_FAILED"; + + public static final String COMM_PROTOCOL = "AWSApiCall"; + + public static final String APP_USER_NAME = "AWSService"; + + public static final String NA = "N.A."; + + public static final String ACCOUNT_ID = "account_id"; + + public static final String INSTANCE_NAME = "instance_name"; + + public static final String SERVER_HOST_NAME = "server_hostname"; + + public static final String MESSAGE = "message"; + + public static final String RECORDS = "Records"; + + public static final String BODY = "body"; + + public static final String LOG_EVENTS = "logEvents"; + + public static final String EXIT_CODE_SUCCESS = "0"; + + // Constants for nested DatabaseActivityMonitoringRecord format + public static final String DATABASE_ACTIVITY_EVENT_LIST = "databaseActivityEventList"; + + public static final String COMMAND_TEXT = "commandText"; + + public static final String COMMAND = "command"; + + public static final String CLUSTER_ID = "clusterId"; + + public static final String INSTANCE_ID = "instanceId"; + + public static final String DB_ACTIVITY_MONITORING_RECORD = "DatabaseActivityMonitoringRecord"; + +} \ No newline at end of file diff --git a/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/DSQLGuardiumPluginFilter.java b/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/DSQLGuardiumPluginFilter.java new file mode 100644 index 000000000..85b2b086c --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/DSQLGuardiumPluginFilter.java @@ -0,0 +1,111 @@ +/* +#Copyright 2020-2021 IBM Inc. 
All rights reserved +#SPDX-License-Identifier: Apache-2.0 +#*/ +package com.ibm.guardium.dsql; + +import java.io.File; +import java.util.Collection; +import java.util.Collections; + + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LoggerContext; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.ibm.guardium.universalconnector.commons.GuardConstants; +import com.ibm.guardium.universalconnector.commons.structures.Record; + +import co.elastic.logstash.api.Configuration; +import co.elastic.logstash.api.Context; +import co.elastic.logstash.api.Event; +import co.elastic.logstash.api.Filter; +import co.elastic.logstash.api.FilterMatchListener; +import co.elastic.logstash.api.LogstashPlugin; +import co.elastic.logstash.api.PluginConfigSpec; + +//class name must match plugin name +@LogstashPlugin(name = "dsql_guardium_plugin_filter") + +public class DSQLGuardiumPluginFilter implements Filter { + + public static final String LOG42_CONF = "log4j2uc.properties"; + + static { + try { + String uc_etc = System.getenv("UC_ETC"); + + LoggerContext context = (LoggerContext) LogManager.getContext(false); + + File file = new File(uc_etc + File.separator + LOG42_CONF); + + context.setConfigLocation(file.toURI()); + + } catch (Exception e) { + + System.err.println("Failed to load log4j configuration " + e.getMessage()); + + e.printStackTrace(); + } + } + + private String id; + + public static final PluginConfigSpec<String> SOURCE_CONFIG = PluginConfigSpec.stringSetting("source", "message"); + + private static Logger log = LogManager.getLogger(DSQLGuardiumPluginFilter.class); + + public DSQLGuardiumPluginFilter(String id, Configuration config, Context context) { + + // constructors should validate configuration options + + this.id = id; + } + + @Override + + public Collection<Event> filter(Collection<Event> events, FilterMatchListener matchListener) { + + for (Event e : events) { + if(null != e && null
!= e.getData()){ + try { + log.debug("Event Now: {} " + e.getData()); + + Record record = Parser.parseRecord(e); + + final GsonBuilder builder = new GsonBuilder(); + + builder.serializeNulls(); + + final Gson gson = builder.create(); + + e.setField(GuardConstants.GUARDIUM_RECORD_FIELD_NAME, gson.toJson(record)); + + matchListener.filterMatched(e); + + } catch (Exception exception) { + log.error("Failed to parse DSQL event: {}", exception.getMessage(), exception); + log.warn("Failed event data: {}" , e.getData()); + } + } + + } + return events; + } + + @Override + + public Collection<PluginConfigSpec<?>> configSchema() { + + // should return a list of all configuration options for this plugin + + return Collections.singletonList(SOURCE_CONFIG); + } + + @Override + public String getId() { + return this.id; + } +} \ No newline at end of file diff --git a/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/Parser.java b/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/Parser.java new file mode 100644 index 000000000..328c25f14 --- /dev/null +++ b/filter-plugin/logstash-filter-dsql-guardium/src/main/java/com/ibm/guardium/dsql/Parser.java @@ -0,0 +1,529 @@ +/* +#Copyright 2020-2021 IBM Inc.
All rights reserved +#SPDX-License-Identifier: Apache-2.0 +#*/ +package com.ibm.guardium.dsql; + +import co.elastic.logstash.api.Event; +import com.ibm.guardium.universalconnector.commons.structures.Accessor; +import com.ibm.guardium.universalconnector.commons.structures.Data; +import com.ibm.guardium.universalconnector.commons.structures.ExceptionRecord; +import com.ibm.guardium.universalconnector.commons.structures.Record; +import com.ibm.guardium.universalconnector.commons.structures.SessionLocator; +import com.ibm.guardium.universalconnector.commons.structures.Time; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.text.ParseException; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class Parser { + + private static Logger log = LogManager.getLogger(Parser.class); + + /** + * Parse the DSQL Database Activity Stream event into a Guardium Record + * The event is expected to have already been parsed by Logstash's json filter + * Supports both flat format and nested DatabaseActivityMonitoringRecord format + * + * @param event The Logstash event containing parsed DSQL audit log data + * @return Record object containing parsed audit information + * @throws ParseException if parsing fails + */ + public static Record parseRecord(final Event event) throws ParseException { + Record record = new Record(); + + try { + // The message should already be parsed as a Map by Logstash's json filter + // Access the parsed fields directly from the event + Map eventData = event.getData(); + + if (eventData == null || eventData.isEmpty()) { + throw new ParseException("Event data is null or empty", 0); + } + + // Check if this is a nested DatabaseActivityMonitoringRecord format + Event processedEvent = event; + if (isNestedFormat(event)) { + log.debug("Detected nested DatabaseActivityMonitoringRecord format"); + 
processedEvent = extractNestedEvent(event); + if (processedEvent == null) { + throw new ParseException("Failed to extract nested event from DatabaseActivityMonitoringRecord", 0); + } + } + + // Set basic record fields + record.setAppUserName(Constants.APP_USER_NAME); + record.setAccessor(parseAccessor(processedEvent)); + record.setSessionLocator(parseSessionLocator(processedEvent)); + record.setDbName(parseDbName(processedEvent)); + record.setTime(parseTimestamp(processedEvent)); + + // Set session ID + Object sessionIdObj = getFieldValue(processedEvent, Constants.SESSION_ID); + if (sessionIdObj != null) { + record.setSessionId(sessionIdObj.toString()); + } else { + record.setSessionId(Constants.UNKNOWN_STRING); + } + + // Parse SQL statement or exception based on error indicators + Object errorMessageObj = getFieldValue(processedEvent, Constants.ERROR_MESSAGE); + boolean hasErrorMessage = errorMessageObj != null && + !errorMessageObj.toString().isEmpty() && + !errorMessageObj.toString().equalsIgnoreCase("null"); + + // Check for login failure indicators + Object commandObj = getFieldValue(processedEvent, Constants.COMMAND); + Object classObj = getFieldValue(processedEvent, Constants.CLASS); + String command = commandObj != null ? commandObj.toString() : ""; + String eventClass = classObj != null ? classObj.toString() : ""; + + boolean isLoginFailure = command.toUpperCase().contains("LOGIN FAILED") || + (eventClass.equalsIgnoreCase("LOGIN") && + command.toUpperCase().contains("FAILED")); + + // Check commandText for login failure indicators if errorMessage is null + Object commandTextObj = getFieldValue(processedEvent, Constants.COMMAND_TEXT); + String commandText = commandTextObj != null ? 
commandTextObj.toString() : ""; + boolean hasLoginFailureInText = commandText.toLowerCase().contains("login failed") || + commandText.toLowerCase().contains("authentication failed"); + + if (hasErrorMessage || isLoginFailure || hasLoginFailureInText) { + // This is an error/exception + record.setException(parseException(processedEvent)); + } else { + // This is a successful SQL statement + Object statementText = getStatementText(processedEvent); + if (statementText != null && !statementText.toString().isEmpty()) { + record.setData(parseData(processedEvent)); + } + } + + } catch (Exception e) { + log.error("Error parsing DSQL record: {}", e.getMessage(), e); + throw new ParseException("Failed to parse DSQL record: " + e.getMessage(), 0); + } + + return record; + } + + /** + * Check if the event is in nested DatabaseActivityMonitoringRecord format + */ + private static boolean isNestedFormat(Event event) { + Object typeObj = event.getField(Constants.TYPE); + Object eventListObj = event.getField(Constants.DATABASE_ACTIVITY_EVENT_LIST); + + return typeObj != null && + typeObj.toString().equals(Constants.DB_ACTIVITY_MONITORING_RECORD) && + eventListObj != null; + } + + /** + * Extract the first event from the nested databaseActivityEventList array + * Creates a new Event object with flattened fields for easier processing + */ + private static Event extractNestedEvent(Event parentEvent) { + try { + Object eventListObj = parentEvent.getField(Constants.DATABASE_ACTIVITY_EVENT_LIST); + + if (eventListObj instanceof List) { + List<?> eventList = (List<?>) eventListObj; + + if (!eventList.isEmpty() && eventList.get(0) instanceof Map) { + @SuppressWarnings("unchecked") + Map<String, Object> nestedEventData = (Map<String, Object>) eventList.get(0); + + // Create a new event with the nested data + Event flatEvent = new org.logstash.Event(); + + // Copy all fields from nested event + for (Map.Entry<String, Object> entry : nestedEventData.entrySet()) { + flatEvent.setField(entry.getKey(), entry.getValue()); + } + + // Also preserve
parent-level fields that might be needed + Object instanceId = parentEvent.getField(Constants.INSTANCE_ID); + if (instanceId != null) { + flatEvent.setField(Constants.INSTANCE_ID, instanceId); + } + + Object clusterId = parentEvent.getField(Constants.CLUSTER_ID); + if (clusterId != null) { + flatEvent.setField(Constants.CLUSTER_ID, clusterId); + } + + // Preserve account_id and instance_name if they exist at parent level + Object accountId = parentEvent.getField(Constants.ACCOUNT_ID); + if (accountId != null) { + flatEvent.setField(Constants.ACCOUNT_ID, accountId); + } + + Object instanceName = parentEvent.getField(Constants.INSTANCE_NAME); + if (instanceName != null) { + flatEvent.setField(Constants.INSTANCE_NAME, instanceName); + } + + return flatEvent; + } + } + } catch (Exception e) { + log.error("Error extracting nested event: {}", e.getMessage(), e); + } + + return null; + } + + /** + * Get field value, checking both statementText and commandText fields + */ + private static Object getStatementText(Event event) { + Object statementText = event.getField(Constants.STATEMENT_TEXT); + if (statementText == null || statementText.toString().isEmpty()) { + statementText = event.getField(Constants.COMMAND_TEXT); + } + return statementText; + } + + /** + * Get field value with fallback support for nested format field names + */ + private static Object getFieldValue(Event event, String fieldName) { + return event.getField(fieldName); + } + + /** + * Parse accessor information from the audit event + */ + private static Accessor parseAccessor(Event event) { + Accessor accessor = new Accessor(); + + // Set database user + Object dbUserObj = event.getField(Constants.DB_USER_NAME); + if (dbUserObj != null && !dbUserObj.toString().isEmpty()) { + accessor.setDbUser(dbUserObj.toString()); + } else { + accessor.setDbUser(Constants.UNKNOWN_STRING); + } + + // Set server type and protocol + accessor.setServerType(Constants.SERVER_TYPE_STRING); + 
accessor.setDbProtocol(Constants.DATA_PROTOCOL_STRING); + accessor.setLanguage(Constants.LANGUAGE); + accessor.setCommProtocol(Constants.COMM_PROTOCOL); + + // Set server hostname + String accountId = getAccountId(event); + String instanceName = getInstanceName(event); + String serverHostName = accountId + ":" + instanceName; + accessor.setServerHostName(serverHostName); + + // Set service name (same as dbName) + String dbName = parseDbName(event); + accessor.setServiceName(dbName); + + // Set client application if available + Object clientAppObj = event.getField(Constants.CLIENT_APPLICATION); + if (clientAppObj != null && !clientAppObj.toString().isEmpty()) { + accessor.setSourceProgram(clientAppObj.toString()); + } else { + accessor.setSourceProgram(Constants.UNKNOWN_STRING); + } + + // Set other fields + accessor.setServerOs(Constants.UNKNOWN_STRING); + accessor.setClientOs(Constants.UNKNOWN_STRING); + accessor.setClientHostName(Constants.UNKNOWN_STRING); + accessor.setDbProtocolVersion(Constants.UNKNOWN_STRING); + accessor.setOsUser(Constants.UNKNOWN_STRING); + accessor.setClient_mac(Constants.UNKNOWN_STRING); + accessor.setServerDescription(Constants.UNKNOWN_STRING); + accessor.setDataType(Accessor.DATA_TYPE_GUARDIUM_SHOULD_PARSE_SQL); + + return accessor; + } + + /** + * Parse session locator information + */ + private static SessionLocator parseSessionLocator(Event event) { + SessionLocator sessionLocator = new SessionLocator(); + + // Parse client IP and port + String clientIp = Constants.DEFAULT_IP; + int clientPort = Constants.DEFAULT_PORT; + + Object remoteHostObj = event.getField(Constants.REMOTE_HOST); + if (remoteHostObj != null && !remoteHostObj.toString().isEmpty()) { + clientIp = remoteHostObj.toString(); + } + + Object remotePortObj = event.getField(Constants.REMOTE_PORT); + if (remotePortObj != null) { + try { + clientPort = Integer.parseInt(remotePortObj.toString()); + } catch (NumberFormatException e) { + log.warn("Failed to parse remote port: 
{}", remotePortObj); + } + } + + sessionLocator.setClientIp(clientIp); + sessionLocator.setClientPort(clientPort); + + // Set server IP (using account:instance:dbname format) + String accountId = getAccountId(event); + String instanceName = getInstanceName(event); + + Object dbNameObj = event.getField(Constants.DATABASE_NAME); + String dbName = dbNameObj != null ? dbNameObj.toString() : Constants.UNKNOWN_STRING; + + String serverIp = accountId + ":" + instanceName + ":" + dbName; + sessionLocator.setServerIp(serverIp); + sessionLocator.setServerPort(Constants.DEFAULT_PORT); + + // Set IPv6 fields + sessionLocator.setIpv6(false); + sessionLocator.setClientIpv6(Constants.UNKNOWN_STRING); + sessionLocator.setServerIpv6(Constants.UNKNOWN_STRING); + + return sessionLocator; + } + + /** + * Parse database name + */ + private static String parseDbName(Event event) { + String accountId = getAccountId(event); + String instanceName = getInstanceName(event); + String dbName = Constants.UNKNOWN_STRING; + + Object dbNameObj = event.getField(Constants.DATABASE_NAME); + if (dbNameObj != null && !dbNameObj.toString().isEmpty()) { + dbName = dbNameObj.toString(); + } + + if (dbName.isEmpty() || dbName.equals(Constants.UNKNOWN_STRING)) { + return Constants.NA; + } + + return accountId + ":" + instanceName + ":" + dbName; + } + + /** + * Parse timestamp from the audit event + * Supports multiple timestamp formats including ISO 8601 and SQL Server format + */ + private static Time parseTimestamp(Event event) { + long millis = 0; + + try { + // Try logTime first, then startTime + Object timeField = event.getField(Constants.LOG_TIME); + + if (timeField == null) { + timeField = event.getField(Constants.START_TIME); + } + + if (timeField != null) { + String dateString = timeField.toString(); + + // Try multiple timestamp formats + millis = parseTimestampString(dateString); + } + } catch (Exception e) { + log.error("Failed to parse timestamp: {}", e.getMessage(), e); + } + + return new 
Time(millis, 0, 0); + } + + /** + * Parse timestamp string with support for multiple formats + * Supports: + * - ISO 8601: 2022-10-06T21:34:42.711Z + * - SQL Server format: 2022-10-06 21:44:38.4120677+00 + * - Other common formats + */ + private static long parseTimestampString(String dateString) { + if (dateString == null || dateString.isEmpty()) { + return 0; + } + + try { + // First, try ISO 8601 format (most common) + try { + ZonedDateTime parsedTime = ZonedDateTime.parse(dateString, DateTimeFormatter.ISO_DATE_TIME); + return parsedTime.toInstant().toEpochMilli(); + } catch (Exception e) { + // Not ISO 8601, try other formats + } + + // Try SQL Server format: "2022-10-06 21:44:38.4120677+00" + // Replace space with 'T' and normalize timezone + String normalizedDate = dateString; + + // If it has a space instead of 'T', replace it + if (normalizedDate.contains(" ") && !normalizedDate.contains("T")) { + normalizedDate = normalizedDate.replace(" ", "T"); + } + + // Handle timezone format: +00 -> +00:00 + if (normalizedDate.matches(".*[+-]\\d{2}$")) { + normalizedDate = normalizedDate + ":00"; + } + + // Try parsing the normalized format + try { + ZonedDateTime parsedTime = ZonedDateTime.parse(normalizedDate, DateTimeFormatter.ISO_DATE_TIME); + return parsedTime.toInstant().toEpochMilli(); + } catch (Exception e) { + // Still failed, try with offset format + } + + // Try with explicit offset format + try { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSXXX"); + ZonedDateTime parsedTime = ZonedDateTime.parse(normalizedDate, formatter); + return parsedTime.toInstant().toEpochMilli(); + } catch (Exception e) { + // Try without microseconds + } + + // Try simpler format without microseconds + try { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); + ZonedDateTime parsedTime = ZonedDateTime.parse(normalizedDate, formatter); + return parsedTime.toInstant().toEpochMilli(); + } catch 
(Exception e) { + log.warn("Could not parse timestamp with any known format: {}", dateString); + } + + } catch (Exception e) { + log.error("Unexpected error parsing timestamp: {}", e.getMessage(), e); + } + + return 0; + } + + /** + * Parse SQL data from successful statements + * Supports both statementText and commandText fields + */ + private static Data parseData(Event event) { + Data data = new Data(); + + // Try statementText first, then commandText (for nested format) + Object statementTextObj = getStatementText(event); + if (statementTextObj != null && !statementTextObj.toString().isEmpty()) { + String sqlText = statementTextObj.toString(); + data.setOriginalSqlCommand(sqlText); + } else { + data.setOriginalSqlCommand(Constants.UNKNOWN_STRING); + } + + return data; + } + + /** + * Parse exception information from failed statements + * Supports both statementText and commandText fields + */ + private static ExceptionRecord parseException(Event event) { + ExceptionRecord exception = new ExceptionRecord(); + + // Get error message - if null, try to get from commandText + Object errorMessageObj = event.getField(Constants.ERROR_MESSAGE); + String errorMessage = null; + + if (errorMessageObj != null && !errorMessageObj.toString().isEmpty() && + !errorMessageObj.toString().equalsIgnoreCase("null")) { + errorMessage = errorMessageObj.toString(); + } else { + // If errorMessage is null, use commandText as error description + Object commandTextObj = getStatementText(event); + if (commandTextObj != null && !commandTextObj.toString().isEmpty()) { + errorMessage = commandTextObj.toString(); + } else { + errorMessage = Constants.UNKNOWN_STRING; + } + } + + // Check class field for LOGIN events (nested format) + Object classObj = event.getField(Constants.CLASS); + String eventClass = classObj != null ? classObj.toString() : ""; + + // Check command field for LOGIN FAILED + Object commandObj = event.getField(Constants.COMMAND); + String command = commandObj != null ? 
commandObj.toString() : ""; + + // Determine exception type based on error message, class, or command + if (errorMessage.toLowerCase().contains("authentication") || + errorMessage.toLowerCase().contains("login") || + errorMessage.toLowerCase().contains("password") || + eventClass.equalsIgnoreCase("LOGIN") || + command.toUpperCase().contains("LOGIN FAILED")) { + exception.setExceptionTypeId(Constants.LOGIN_FAILED); + } else { + exception.setExceptionTypeId(Constants.SQL_ERROR); + } + + exception.setDescription(errorMessage); + + // Set SQL string if available - try both field names + Object statementTextObj = getStatementText(event); + if (statementTextObj != null && !statementTextObj.toString().isEmpty()) { + exception.setSqlString(statementTextObj.toString()); + } else { + exception.setSqlString(Constants.UNKNOWN_STRING); + } + + return exception; + } + + /** + * Get account ID from event + */ + private static String getAccountId(Event event) { + String accountId = Constants.UNKNOWN_STRING; + + Object accountIdObj = event.getField(Constants.ACCOUNT_ID); + if (accountIdObj instanceof String) { + accountId = accountIdObj.toString(); + } else if (accountIdObj instanceof List) { + List rawList = (List) accountIdObj; + List arrayList = new ArrayList<>(rawList); + + if (!arrayList.isEmpty()) { + accountId = String.valueOf(arrayList.get(0)); + } + } + + return accountId; + } + + /** + * Get instance name from event + */ + private static String getInstanceName(Event event) { + String instanceName = Constants.UNKNOWN_STRING; + + Object instanceNameObj = event.getField(Constants.INSTANCE_NAME); + if (instanceNameObj instanceof String) { + instanceName = instanceNameObj.toString(); + } else if (instanceNameObj instanceof List) { + List rawList = (List) instanceNameObj; + List arrayList = new ArrayList<>(rawList); + + if (!arrayList.isEmpty()) { + instanceName = String.valueOf(arrayList.get(0)); + } + } + + return instanceName; + } +} \ No newline at end of file diff --git 
package com.ibm.guardium.dsql;

import co.elastic.logstash.api.Event;
import com.ibm.guardium.universalconnector.commons.structures.Record;
import org.junit.Test;

import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

/**
 * Unit tests for {@link Parser} using mocked Logstash events in the flat
 * (top-level field) DSQL audit format. Field maps are declared once per test
 * and the getField() stubs are derived from them, so the map and the stubs can
 * never drift apart.
 */
public class DSQLGuardiumPluginFilterTest {

    private static final String TEST_ACCOUNT_ID = "123456789012";
    private static final String TEST_INSTANCE_NAME = "dsql-cluster-1";

    /**
     * Builds a mocked Event whose getData() and per-key getField() answers are
     * backed by the given field map. Account id and instance name receive the
     * standard test defaults when the map does not provide them.
     */
    private static Event mockEvent(Map<String, Object> fields) {
        fields.putIfAbsent(Constants.ACCOUNT_ID, TEST_ACCOUNT_ID);
        fields.putIfAbsent(Constants.INSTANCE_NAME, TEST_INSTANCE_NAME);

        Event event = mock(Event.class);
        when(event.getData()).thenReturn(fields);
        for (Map.Entry<String, Object> entry : fields.entrySet()) {
            when(event.getField(entry.getKey())).thenReturn(entry.getValue());
        }
        return event;
    }

    @Test
    public void testParseRecordWithSuccessfulStatement() throws ParseException {
        Map<String, Object> fields = new HashMap<>();
        fields.put(Constants.TYPE, "record");
        fields.put(Constants.DATABASE_NAME, "testdb");
        fields.put(Constants.DB_USER_NAME, "testuser");
        fields.put(Constants.REMOTE_HOST, "10.0.0.1");
        fields.put(Constants.REMOTE_PORT, "5432");
        fields.put(Constants.SESSION_ID, "session123");
        fields.put(Constants.STATEMENT_TEXT, "SELECT * FROM users WHERE id = 1;");
        fields.put(Constants.EXIT_CODE, "0");
        fields.put(Constants.LOG_TIME, "2023-11-10T10:15:30Z");
        fields.put(Constants.CLIENT_APPLICATION, "psql");

        Record record = Parser.parseRecord(mockEvent(fields));

        assertNotNull(record);
        assertNotNull(record.getData());
        assertEquals("SELECT * FROM users WHERE id = 1;", record.getData().getOriginalSqlCommand());
        assertEquals("testuser", record.getAccessor().getDbUser());
        assertEquals("10.0.0.1", record.getSessionLocator().getClientIp());
        assertEquals(5432, record.getSessionLocator().getClientPort());
        assertEquals("session123", record.getSessionId());
        assertEquals(Constants.SERVER_TYPE_STRING, record.getAccessor().getServerType());
        assertEquals(Constants.DATA_PROTOCOL_STRING, record.getAccessor().getDbProtocol());
        assertEquals(Constants.LANGUAGE, record.getAccessor().getLanguage());
        assertEquals("psql", record.getAccessor().getSourceProgram());
    }

    @Test
    public void testParseRecordWithError() throws ParseException {
        Map<String, Object> fields = new HashMap<>();
        fields.put(Constants.TYPE, "record");
        fields.put(Constants.DATABASE_NAME, "testdb");
        fields.put(Constants.DB_USER_NAME, "testuser");
        fields.put(Constants.REMOTE_HOST, "10.0.0.1");
        fields.put(Constants.REMOTE_PORT, "5432");
        fields.put(Constants.SESSION_ID, "session123");
        fields.put(Constants.STATEMENT_TEXT, "SELECT * FROM nonexistent_table;");
        fields.put(Constants.EXIT_CODE, "1");
        fields.put(Constants.ERROR_MESSAGE, "relation \"nonexistent_table\" does not exist");
        fields.put(Constants.LOG_TIME, "2023-11-10T10:15:30Z");

        Record record = Parser.parseRecord(mockEvent(fields));

        assertNotNull(record);
        assertNotNull(record.getException());
        assertEquals(Constants.SQL_ERROR, record.getException().getExceptionTypeId());
        assertEquals("relation \"nonexistent_table\" does not exist", record.getException().getDescription());
        assertEquals("SELECT * FROM nonexistent_table;", record.getException().getSqlString());
    }

    @Test
    public void testParseRecordWithLoginFailure() throws ParseException {
        Map<String, Object> fields = new HashMap<>();
        fields.put(Constants.TYPE, "record");
        fields.put(Constants.DATABASE_NAME, "testdb");
        fields.put(Constants.DB_USER_NAME, "testuser");
        fields.put(Constants.REMOTE_HOST, "10.0.0.1");
        fields.put(Constants.REMOTE_PORT, "5432");
        fields.put(Constants.SESSION_ID, "session123");
        fields.put(Constants.EXIT_CODE, "1");
        fields.put(Constants.ERROR_MESSAGE, "password authentication failed for user \"testuser\"");
        fields.put(Constants.LOG_TIME, "2023-11-10T10:15:30Z");

        Record record = Parser.parseRecord(mockEvent(fields));

        assertNotNull(record);
        assertNotNull(record.getException());
        assertEquals(Constants.LOGIN_FAILED, record.getException().getExceptionTypeId());
        assertTrue(record.getException().getDescription().contains("password authentication failed"));
    }

    @Test
    public void testParseTimestampValid() throws ParseException {
        Map<String, Object> fields = new HashMap<>();
        fields.put(Constants.LOG_TIME, "2023-11-10T10:15:30Z");
        fields.put(Constants.DATABASE_NAME, "testdb");
        fields.put(Constants.DB_USER_NAME, "testuser");
        fields.put(Constants.EXIT_CODE, "0");
        fields.put(Constants.STATEMENT_TEXT, "SELECT 1;");

        Record record = Parser.parseRecord(mockEvent(fields));

        assertNotNull(record.getTime());
        // getTimstamp (sic) is the commons-structures API name.
        assertTrue(record.getTime().getTimstamp() > 0);
    }

    @Test
    public void testParseDbName() throws ParseException {
        Map<String, Object> fields = new HashMap<>();
        fields.put(Constants.DATABASE_NAME, "production_db");
        fields.put(Constants.DB_USER_NAME, "admin");
        fields.put(Constants.EXIT_CODE, "0");
        fields.put(Constants.STATEMENT_TEXT, "SELECT 1;");
        fields.put(Constants.LOG_TIME, "2023-11-10T10:15:30Z");

        Record record = Parser.parseRecord(mockEvent(fields));

        assertNotNull(record.getDbName());
        assertEquals("123456789012:dsql-cluster-1:production_db", record.getDbName());
        assertEquals("123456789012:dsql-cluster-1:production_db", record.getAccessor().getServiceName());
    }

    @Test
    public void testParseSessionLocator() throws ParseException {
        Map<String, Object> fields = new HashMap<>();
        fields.put(Constants.DATABASE_NAME, "testdb");
        fields.put(Constants.DB_USER_NAME, "testuser");
        fields.put(Constants.REMOTE_HOST, "192.168.1.100");
        fields.put(Constants.REMOTE_PORT, "54321");
        fields.put(Constants.EXIT_CODE, "0");
        fields.put(Constants.STATEMENT_TEXT, "SELECT 1;");
        fields.put(Constants.LOG_TIME, "2023-11-10T10:15:30Z");

        Record record = Parser.parseRecord(mockEvent(fields));

        assertNotNull(record.getSessionLocator());
        assertEquals("192.168.1.100", record.getSessionLocator().getClientIp());
        assertEquals(54321, record.getSessionLocator().getClientPort());
        assertEquals("123456789012:dsql-cluster-1:testdb", record.getSessionLocator().getServerIp());
        assertFalse(record.getSessionLocator().isIpv6());
    }
}
package com.ibm.guardium.dsql;

import co.elastic.logstash.api.Event;
import com.ibm.guardium.universalconnector.commons.structures.Record;
import org.junit.Test;

import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static
org.junit.Assert.*;

/**
 * Tests for {@link Parser} using real org.logstash.Event objects, covering the
 * nested DatabaseActivityMonitoringRecord format, the flat format, multiple
 * timestamp formats, and edge cases (null port, empty database name).
 */
public class ParserTest {

    @Test
    public void testParseNestedPostgreSQLLoginFailure() throws ParseException {
        // Create a nested DatabaseActivityMonitoringRecord format event
        Event event = new org.logstash.Event();

        event.setField(Constants.TYPE, "DatabaseActivityMonitoringRecord");
        event.setField(Constants.CLUSTER_ID, "");
        event.setField(Constants.INSTANCE_ID, "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q");
        event.setField(Constants.ACCOUNT_ID, "123456789012");
        event.setField(Constants.INSTANCE_NAME, "postgres-cluster-1");

        // Create the nested event list
        List<Map<String, Object>> eventList = new ArrayList<>();
        Map<String, Object> nestedEvent = new HashMap<>();

        nestedEvent.put(Constants.CLASS, "LOGIN");
        nestedEvent.put(Constants.CLIENT_APPLICATION, "psql");
        nestedEvent.put(Constants.COMMAND, "LOGIN FAILED");
        nestedEvent.put(Constants.COMMAND_TEXT, "Login failed for user 'test'. Reason: Password did not match that for the login provided.");
        nestedEvent.put(Constants.DATABASE_NAME, "testdb");
        nestedEvent.put(Constants.DB_PROTOCOL, "POSTGRESQL");
        nestedEvent.put(Constants.DB_USER_NAME, "test");
        nestedEvent.put(Constants.ERROR_MESSAGE, "password authentication failed for user \"test\"");
        nestedEvent.put(Constants.EXIT_CODE, 1);
        nestedEvent.put(Constants.LOG_TIME, "2022-10-06T21:34:42.711Z");
        nestedEvent.put(Constants.REMOTE_HOST, "10.0.1.100");
        nestedEvent.put(Constants.REMOTE_PORT, 5432);
        nestedEvent.put(Constants.SERVER_HOST, "172.31.30.159");
        nestedEvent.put(Constants.SESSION_ID, "session-123");
        nestedEvent.put(Constants.START_TIME, null);
        nestedEvent.put(Constants.TYPE, "record");

        eventList.add(nestedEvent);
        event.setField(Constants.DATABASE_ACTIVITY_EVENT_LIST, eventList);

        // Parse the record
        Record record = Parser.parseRecord(event);

        // Verify the record was parsed correctly
        assertNotNull(record);
        assertNotNull(record.getException());
        assertEquals(Constants.LOGIN_FAILED, record.getException().getExceptionTypeId());
        assertTrue(record.getException().getDescription().contains("password authentication failed"));
        assertEquals("test", record.getAccessor().getDbUser());
        assertEquals("10.0.1.100", record.getSessionLocator().getClientIp());
        assertEquals(5432, record.getSessionLocator().getClientPort());
        assertEquals("session-123", record.getSessionId());
        assertEquals(Constants.SERVER_TYPE_STRING, record.getAccessor().getServerType());
        assertEquals(Constants.DATA_PROTOCOL_STRING, record.getAccessor().getDbProtocol());
    }

    @Test
    public void testParseNestedPostgreSQLSuccessfulQuery() throws ParseException {
        // Create a nested DatabaseActivityMonitoringRecord format event
        Event event = new org.logstash.Event();

        event.setField(Constants.TYPE, "DatabaseActivityMonitoringRecord");
        event.setField(Constants.CLUSTER_ID, "");
        event.setField(Constants.INSTANCE_ID, "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q");
        event.setField(Constants.ACCOUNT_ID, "123456789012");
        event.setField(Constants.INSTANCE_NAME, "postgres-cluster-1");

        // Create the nested event list
        List<Map<String, Object>> eventList = new ArrayList<>();
        Map<String, Object> nestedEvent = new HashMap<>();

        nestedEvent.put(Constants.CLASS, "READ");
        nestedEvent.put(Constants.CLIENT_APPLICATION, "psql");
        nestedEvent.put(Constants.COMMAND, "SELECT");
        nestedEvent.put(Constants.COMMAND_TEXT, "SELECT * FROM users WHERE id = 1;");
        nestedEvent.put(Constants.DATABASE_NAME, "testdb");
        nestedEvent.put(Constants.DB_PROTOCOL, "POSTGRESQL");
        nestedEvent.put(Constants.DB_USER_NAME, "postgres");
        nestedEvent.put(Constants.EXIT_CODE, 0);
        nestedEvent.put(Constants.LOG_TIME, "2022-10-06T21:34:42.711Z");
        nestedEvent.put(Constants.REMOTE_HOST, "10.0.1.100");
        nestedEvent.put(Constants.REMOTE_PORT, 5432);
        nestedEvent.put(Constants.SERVER_HOST, "172.31.30.159");
        nestedEvent.put(Constants.SESSION_ID, "session-456");
        nestedEvent.put(Constants.TYPE, "record");

        eventList.add(nestedEvent);
        event.setField(Constants.DATABASE_ACTIVITY_EVENT_LIST, eventList);

        // Parse the record
        Record record = Parser.parseRecord(event);

        // Verify the record was parsed correctly
        assertNotNull(record);
        assertNotNull(record.getData());
        assertEquals("SELECT * FROM users WHERE id = 1;", record.getData().getOriginalSqlCommand());
        assertEquals("postgres", record.getAccessor().getDbUser());
        assertEquals("10.0.1.100", record.getSessionLocator().getClientIp());
        assertEquals(5432, record.getSessionLocator().getClientPort());
        assertEquals("session-456", record.getSessionId());
    }

    @Test
    public void testAcceptSQLServerRecord() throws ParseException {
        // Create a nested event with SQL Server protocol
        Event event = new org.logstash.Event();

        event.setField(Constants.TYPE, "DatabaseActivityMonitoringRecord");
        event.setField(Constants.INSTANCE_ID, "db-test");
        event.setField(Constants.ACCOUNT_ID, "123456789012");
        event.setField(Constants.INSTANCE_NAME, "sqlserver-instance");

        List<Map<String, Object>> eventList = new ArrayList<>();
        Map<String, Object> nestedEvent = new HashMap<>();

        nestedEvent.put(Constants.DB_PROTOCOL, "SQLSERVER");
        nestedEvent.put(Constants.DB_USER_NAME, "test");
        nestedEvent.put(Constants.DATABASE_NAME, "testdb");
        nestedEvent.put(Constants.EXIT_CODE, 0);
        nestedEvent.put(Constants.STATEMENT_TEXT, "SELECT 1");
        nestedEvent.put(Constants.LOG_TIME, "2023-11-10T10:15:30Z");
        nestedEvent.put(Constants.REMOTE_HOST, "10.0.0.1");
        nestedEvent.put(Constants.REMOTE_PORT, 1433);
        nestedEvent.put(Constants.SESSION_ID, "session-789");

        eventList.add(nestedEvent);
        event.setField(Constants.DATABASE_ACTIVITY_EVENT_LIST, eventList);

        // This should now be accepted even though it's SQL Server
        Record record = Parser.parseRecord(event);

        assertNotNull(record);
        assertEquals("test", record.getAccessor().getDbUser());
        assertNotNull(record.getData());
    }

    @Test
    public void testFlatFormatStillWorks() throws ParseException {
        // Verify that the original flat format still works
        Event event = new org.logstash.Event();

        event.setField(Constants.TYPE, "record");
        event.setField(Constants.DATABASE_NAME, "testdb");
        event.setField(Constants.DB_USER_NAME, "postgres");
        event.setField(Constants.DB_PROTOCOL, "POSTGRESQL");
        event.setField(Constants.REMOTE_HOST, "10.0.0.1");
        event.setField(Constants.REMOTE_PORT, 5432);
        event.setField(Constants.SESSION_ID, "session789");
        event.setField(Constants.STATEMENT_TEXT, "SELECT 1;");
        event.setField(Constants.EXIT_CODE, 0);
        event.setField(Constants.LOG_TIME, "2023-11-10T10:15:30Z");
        event.setField(Constants.ACCOUNT_ID, "123456789012");
        event.setField(Constants.INSTANCE_NAME, "dsql-cluster-1");

        Record record = Parser.parseRecord(event);

        assertNotNull(record);
        assertNotNull(record.getData());
        assertEquals("SELECT 1;", record.getData().getOriginalSqlCommand());
        assertEquals("postgres", record.getAccessor().getDbUser());
    }

    @Test
    public void testParseSQLServerTimestampFormat() throws ParseException {
        // Test SQL Server timestamp format with 7 decimal places: "2022-10-06 21:34:42.7113072+00"
        Event event = new org.logstash.Event();

        event.setField(Constants.TYPE, "DatabaseActivityMonitoringRecord");
        event.setField(Constants.INSTANCE_ID, "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q");
        event.setField(Constants.ACCOUNT_ID, "123456789012");
        event.setField(Constants.INSTANCE_NAME, "postgres-cluster-1");

        List<Map<String, Object>> eventList = new ArrayList<>();
        Map<String, Object> nestedEvent = new HashMap<>();

        nestedEvent.put(Constants.CLASS, "LOGIN");
        nestedEvent.put(Constants.CLIENT_APPLICATION, "psql");
        nestedEvent.put(Constants.COMMAND, "LOGIN FAILED");
        nestedEvent.put(Constants.COMMAND_TEXT, "Login failed for user 'test'.");
        nestedEvent.put(Constants.DATABASE_NAME, "testdb");
        nestedEvent.put(Constants.DB_PROTOCOL, "POSTGRESQL");
        nestedEvent.put(Constants.DB_USER_NAME, "test");
        nestedEvent.put(Constants.ERROR_MESSAGE, "password authentication failed");
        nestedEvent.put(Constants.EXIT_CODE, 1);
        // SQL Server timestamp format with space and 7 decimal places
        nestedEvent.put(Constants.LOG_TIME, "2022-10-06 21:34:42.7113072+00");
        nestedEvent.put(Constants.REMOTE_HOST, "10.0.1.100");
        nestedEvent.put(Constants.REMOTE_PORT, 5432);
        nestedEvent.put(Constants.SESSION_ID, "session-123");
        nestedEvent.put(Constants.TYPE, "record");

        eventList.add(nestedEvent);
        event.setField(Constants.DATABASE_ACTIVITY_EVENT_LIST, eventList);

        Record record = Parser.parseRecord(event);

        assertNotNull(record);
        assertNotNull(record.getTime());
        // Verify timestamp was parsed (should be > 0)
        assertTrue(record.getTime().getTimstamp() > 0);
        // Verify it's approximately correct (October 2022)
        assertTrue(record.getTime().getTimstamp() > 1665000000000L);
        assertTrue(record.getTime().getTimstamp() < 1666000000000L);
    }

    @Test
    public void testParseNestedDDLStatement() throws ParseException {
        // Test DDL statement (CREATE TABLE) with error message (creates exception)
        Event event = new org.logstash.Event();

        event.setField(Constants.TYPE, "DatabaseActivityMonitoringRecord");
        event.setField(Constants.CLUSTER_ID, "");
        event.setField(Constants.INSTANCE_ID, "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q");
        event.setField(Constants.ACCOUNT_ID, "123456789012");
        event.setField(Constants.INSTANCE_NAME, "postgres-cluster-1");

        List<Map<String, Object>> eventList = new ArrayList<>();
        Map<String, Object> nestedEvent = new HashMap<>();

        nestedEvent.put(Constants.CLASS, "SCHEMA");
        nestedEvent.put(Constants.CLIENT_APPLICATION, "pgAdmin");
        nestedEvent.put(Constants.COMMAND, "CREATE");
        nestedEvent.put(Constants.COMMAND_TEXT, "CREATE TABLE testDB.public.TestTable2(textA varchar(6000), textB varchar(6000))");
        nestedEvent.put(Constants.DATABASE_NAME, "testDB");
        nestedEvent.put(Constants.DB_PROTOCOL, "POSTGRESQL");
        nestedEvent.put(Constants.DB_USER_NAME, "test");
        nestedEvent.put(Constants.ERROR_MESSAGE, "permission denied for schema public");
        nestedEvent.put(Constants.EXIT_CODE, 1);
        nestedEvent.put(Constants.LOG_TIME, "2022-10-06 21:44:38.4120677+00");
        nestedEvent.put(Constants.REMOTE_HOST, "10.0.1.100");
        nestedEvent.put(Constants.REMOTE_PORT, 5432);
        nestedEvent.put(Constants.SERVER_HOST, "172.31.30.159");
        nestedEvent.put(Constants.SESSION_ID, 84);
        nestedEvent.put(Constants.TYPE, "record");

        eventList.add(nestedEvent);
        event.setField(Constants.DATABASE_ACTIVITY_EVENT_LIST, eventList);

        Record record = Parser.parseRecord(event);

        assertNotNull(record);
        // Since exitCode is 1, should have exception not data
        assertNotNull(record.getException());
        assertEquals(Constants.SQL_ERROR, record.getException().getExceptionTypeId());
        assertEquals("permission denied for schema public", record.getException().getDescription());
        assertEquals("CREATE TABLE testDB.public.TestTable2(textA varchar(6000), textB varchar(6000))",
                record.getException().getSqlString());
        assertEquals("test", record.getAccessor().getDbUser());
        assertEquals("10.0.1.100", record.getSessionLocator().getClientIp());
        assertEquals(5432, record.getSessionLocator().getClientPort());
    }

    @Test
    public void testParseSelectWithExitCode1NoError() throws ParseException {
        // Test SELECT statement with exitCode 1 but no error message (should create Data, not Exception)
        Event event = new org.logstash.Event();

        event.setField(Constants.TYPE, "DatabaseActivityMonitoringRecord");
        event.setField(Constants.CLUSTER_ID, "");
        event.setField(Constants.INSTANCE_ID, "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q");
        event.setField(Constants.ACCOUNT_ID, "123456789012");
        event.setField(Constants.INSTANCE_NAME, "postgres-cluster-1");

        List<Map<String, Object>> eventList = new ArrayList<>();
        Map<String, Object> nestedEvent = new HashMap<>();

        nestedEvent.put(Constants.CLASS, "TABLE");
        nestedEvent.put(Constants.CLIENT_APPLICATION, "psql");
        nestedEvent.put(Constants.COMMAND, "SELECT");
        nestedEvent.put(Constants.COMMAND_TEXT, "select * from testDB.public.TestTable");
        nestedEvent.put(Constants.DATABASE_NAME, "testDB");
        nestedEvent.put(Constants.DB_PROTOCOL, "POSTGRESQL");
        nestedEvent.put(Constants.DB_USER_NAME, "test");
        nestedEvent.put(Constants.ERROR_MESSAGE, null); // No error message
        nestedEvent.put(Constants.EXIT_CODE, 1); // exitCode 1 but no error
        nestedEvent.put(Constants.LOG_TIME, "2022-10-06 21:24:59.9422268+00");
        nestedEvent.put(Constants.REMOTE_HOST, "10.0.1.100");
        nestedEvent.put(Constants.REMOTE_PORT, 5432);
        nestedEvent.put(Constants.SERVER_HOST, "172.31.30.159");
        nestedEvent.put(Constants.SESSION_ID, 62);
        nestedEvent.put(Constants.TYPE, "record");

        eventList.add(nestedEvent);
        event.setField(Constants.DATABASE_ACTIVITY_EVENT_LIST, eventList);

        Record record = Parser.parseRecord(event);

        assertNotNull(record);
        // Since errorMessage is null, should create Data record (not Exception)
        assertNotNull(record.getData());
        assertEquals("select * from testDB.public.TestTable", record.getData().getOriginalSqlCommand());
        assertEquals("test", record.getAccessor().getDbUser());
        assertEquals("10.0.1.100", record.getSessionLocator().getClientIp());
        assertEquals(5432, record.getSessionLocator().getClientPort());
    }

    @Test
    public void testParseNestedDDLStatementSuccess() throws ParseException {
        // Test successful DDL statement (CREATE TABLE) with no error message
        Event event = new org.logstash.Event();

        event.setField(Constants.TYPE, "DatabaseActivityMonitoringRecord");
        event.setField(Constants.INSTANCE_ID, "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q");
        event.setField(Constants.ACCOUNT_ID, "123456789012");
        event.setField(Constants.INSTANCE_NAME, "postgres-cluster-1");

        List<Map<String, Object>> eventList = new ArrayList<>();
        Map<String, Object> nestedEvent = new HashMap<>();

        nestedEvent.put(Constants.CLASS, "SCHEMA");
        nestedEvent.put(Constants.CLIENT_APPLICATION, "psql");
        nestedEvent.put(Constants.COMMAND, "CREATE");
        nestedEvent.put(Constants.COMMAND_TEXT, "CREATE TABLE users(id serial PRIMARY KEY, name varchar(100))");
        nestedEvent.put(Constants.DATABASE_NAME, "mydb");
        nestedEvent.put(Constants.DB_PROTOCOL, "POSTGRESQL");
        nestedEvent.put(Constants.DB_USER_NAME, "postgres");
        nestedEvent.put(Constants.EXIT_CODE, 0);
        nestedEvent.put(Constants.LOG_TIME, "2022-10-06T21:44:38.412Z");
        nestedEvent.put(Constants.REMOTE_HOST, "10.0.1.100");
        nestedEvent.put(Constants.REMOTE_PORT, 5432);
        nestedEvent.put(Constants.SESSION_ID, "session-789");
        nestedEvent.put(Constants.TYPE, "record");

        eventList.add(nestedEvent);
        event.setField(Constants.DATABASE_ACTIVITY_EVENT_LIST, eventList);

        Record record = Parser.parseRecord(event);

        assertNotNull(record);
        // Since exitCode is 0, should have data not exception
        assertNotNull(record.getData());
        assertEquals("CREATE TABLE users(id serial PRIMARY KEY, name varchar(100))",
                record.getData().getOriginalSqlCommand());
        assertEquals("postgres", record.getAccessor().getDbUser());
    }

    @Test
    public void testParseWithNullRemotePort() throws ParseException {
        // Test handling of null remotePort (common in SQL Server logs)
        Event event = new org.logstash.Event();

        event.setField(Constants.TYPE, "DatabaseActivityMonitoringRecord");
        event.setField(Constants.INSTANCE_ID, "db-test");
        event.setField(Constants.ACCOUNT_ID, "123456789012");
        event.setField(Constants.INSTANCE_NAME, "postgres-cluster-1");

        List<Map<String, Object>> eventList = new ArrayList<>();
        Map<String, Object> nestedEvent = new HashMap<>();

        nestedEvent.put(Constants.DB_PROTOCOL, "POSTGRESQL");
        nestedEvent.put(Constants.DB_USER_NAME, "test");
        nestedEvent.put(Constants.DATABASE_NAME, "testdb");
        nestedEvent.put(Constants.REMOTE_HOST, "local machine");
        nestedEvent.put(Constants.REMOTE_PORT, null); // null port
        nestedEvent.put(Constants.EXIT_CODE, 0);
        nestedEvent.put(Constants.STATEMENT_TEXT, "SELECT 1");
        nestedEvent.put(Constants.LOG_TIME, "2023-11-10T10:15:30Z");
        nestedEvent.put(Constants.SESSION_ID, "session-456");
        nestedEvent.put(Constants.TYPE, "record");

        eventList.add(nestedEvent);
        event.setField(Constants.DATABASE_ACTIVITY_EVENT_LIST, eventList);

        Record record = Parser.parseRecord(event);

        assertNotNull(record);
        assertEquals("local machine", record.getSessionLocator().getClientIp());
        // Should use default port when null
        assertEquals(Constants.DEFAULT_PORT, record.getSessionLocator().getClientPort());
    }

    @Test
    public void testParseWithEmptyDatabaseName() throws ParseException {
        // Test handling of empty database name (common in login failures)
        Event event = new org.logstash.Event();

        event.setField(Constants.TYPE, "DatabaseActivityMonitoringRecord");
        event.setField(Constants.INSTANCE_ID, "db-test");
        event.setField(Constants.ACCOUNT_ID, "123456789012");
        event.setField(Constants.INSTANCE_NAME, "postgres-cluster-1");

        List<Map<String, Object>> eventList = new ArrayList<>();
        Map<String, Object> nestedEvent = new HashMap<>();

        nestedEvent.put(Constants.DB_PROTOCOL, "POSTGRESQL");
        nestedEvent.put(Constants.DB_USER_NAME, "test");
        nestedEvent.put(Constants.DATABASE_NAME, ""); // Empty database name
        nestedEvent.put(Constants.REMOTE_HOST, "10.0.0.1");
        nestedEvent.put(Constants.REMOTE_PORT, 5432);
        nestedEvent.put(Constants.EXIT_CODE, 1);
        nestedEvent.put(Constants.ERROR_MESSAGE, "authentication failed");
        nestedEvent.put(Constants.LOG_TIME, "2023-11-10T10:15:30Z");
        nestedEvent.put(Constants.SESSION_ID, 0);
        nestedEvent.put(Constants.TYPE, "record");

        eventList.add(nestedEvent);
        event.setField(Constants.DATABASE_ACTIVITY_EVENT_LIST, eventList);

        Record record = Parser.parseRecord(event);

        assertNotNull(record);
        // Empty database name should result in N.A.
+ assertEquals(Constants.NA, record.getDbName()); + } + + @Test + public void testParseMultilineCommandText() throws ParseException { + // Test handling of multi-line SQL statements (common in DDL) + Event event = new org.logstash.Event(); + + event.setField(Constants.TYPE, "DatabaseActivityMonitoringRecord"); + event.setField(Constants.INSTANCE_ID, "db-test"); + event.setField(Constants.ACCOUNT_ID, "123456789012"); + event.setField(Constants.INSTANCE_NAME, "postgres-cluster-1"); + + List> eventList = new ArrayList<>(); + Map nestedEvent = new HashMap<>(); + + String multilineSQL = "CREATE TABLE testDB.dbo.TestTable2(\r\n" + + "textA varchar(6000),\r\n" + + " textB varchar(6000)\r\n" + + ")"; + + nestedEvent.put(Constants.DB_PROTOCOL, "POSTGRESQL"); + nestedEvent.put(Constants.DB_USER_NAME, "postgres"); + nestedEvent.put(Constants.DATABASE_NAME, "testDB"); + nestedEvent.put(Constants.COMMAND_TEXT, multilineSQL); + nestedEvent.put(Constants.REMOTE_HOST, "10.0.0.1"); + nestedEvent.put(Constants.REMOTE_PORT, 5432); + nestedEvent.put(Constants.EXIT_CODE, 0); + nestedEvent.put(Constants.LOG_TIME, "2023-11-10T10:15:30Z"); + nestedEvent.put(Constants.SESSION_ID, "session-123"); + nestedEvent.put(Constants.TYPE, "record"); + + eventList.add(nestedEvent); + event.setField(Constants.DATABASE_ACTIVITY_EVENT_LIST, eventList); + + Record record = Parser.parseRecord(event); + + assertNotNull(record); + assertNotNull(record.getData()); + // Verify multi-line SQL is preserved + assertTrue(record.getData().getOriginalSqlCommand().contains("\r\n")); + assertTrue(record.getData().getOriginalSqlCommand().contains("textA varchar(6000)")); + } + + @Test + public void testParseSQLServerLoginFailureWithNullErrorMessage() throws ParseException { + // Test SQL Server login failure where errorMessage is null but command indicates failure + Event event = new org.logstash.Event(); + + event.setField(Constants.TYPE, "DatabaseActivityMonitoringRecord"); + event.setField(Constants.CLUSTER_ID, 
""); + event.setField(Constants.INSTANCE_ID, "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q"); + event.setField(Constants.ACCOUNT_ID, "123456789012"); + event.setField(Constants.INSTANCE_NAME, "sqlserver-cluster-1"); + + List> eventList = new ArrayList<>(); + Map nestedEvent = new HashMap<>(); + + nestedEvent.put(Constants.CLASS, "LOGIN"); + nestedEvent.put(Constants.CLIENT_APPLICATION, "Microsoft SQL Server Management Studio"); + nestedEvent.put(Constants.COMMAND, "LOGIN FAILED"); + nestedEvent.put(Constants.COMMAND_TEXT, "Login failed for user 'test'. Reason: Password did not match that for the login provided. [CLIENT: local-machine]"); + nestedEvent.put(Constants.DATABASE_NAME, ""); + nestedEvent.put(Constants.DB_PROTOCOL, "SQLSERVER"); + nestedEvent.put(Constants.DB_USER_NAME, "test"); + nestedEvent.put(Constants.ERROR_MESSAGE, null); // null error message + nestedEvent.put(Constants.EXIT_CODE, 0); + nestedEvent.put(Constants.LOG_TIME, "2022-10-06 21:34:42.7113072+00"); + nestedEvent.put(Constants.REMOTE_HOST, "local machine"); + nestedEvent.put(Constants.REMOTE_PORT, null); + nestedEvent.put(Constants.SERVER_HOST, "172.31.30.159"); + nestedEvent.put(Constants.SESSION_ID, 0); + nestedEvent.put(Constants.TYPE, "record"); + + eventList.add(nestedEvent); + event.setField(Constants.DATABASE_ACTIVITY_EVENT_LIST, eventList); + + Record record = Parser.parseRecord(event); + + // Verify this is treated as an exception (login failure) + assertNotNull(record); + assertNotNull(record.getException()); + assertEquals(Constants.LOGIN_FAILED, record.getException().getExceptionTypeId()); + assertTrue(record.getException().getDescription().contains("Login failed for user 'test'")); + assertEquals("test", record.getAccessor().getDbUser()); + assertEquals("local machine", record.getSessionLocator().getClientIp()); + assertEquals(Constants.DEFAULT_PORT, record.getSessionLocator().getClientPort()); + assertEquals("0", record.getSessionId()); + assertEquals(Constants.NA, record.getDbName()); // 
Empty database name + } +} \ No newline at end of file