Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions filter-plugin/logstash-filter-dsql-guardium/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Gradle
.gradle/
build/
out/

# IDE
.idea/
*.iml
*.ipr
*.iws
.vscode/
.settings/
.classpath
.project

# OS
.DS_Store
Thumbs.db

# Logstash
vendor/
Gemfile
Gemfile.lock
*.gemspec
*.gem
lib/

# Logs
*.log

# Temporary files
*.tmp
*.bak
*.swp
*~

# Made with Bob
47 changes: 47 additions & 0 deletions filter-plugin/logstash-filter-dsql-guardium/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Changelog

All notable changes to the DSQL Guardium filter plugin will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.0.0] - 2024-01-01

### Added
- Initial release of DSQL Guardium filter plugin
- Support for AWS DSQL Database Activity Streams
- PostgreSQL-compatible SQL parsing (DSQL uses PostgreSQL protocol)
- SQS input integration for receiving audit events
- Parsing of successful SQL statements
- Parsing of SQL errors and exceptions
- Authentication failure detection
- Session tracking with client IP and port
- Database user and database name extraction
- Timestamp parsing from audit logs
- Client application identification
- Comprehensive test coverage
- Documentation and configuration examples

### Features
- **Server Type**: POSTGRESQL
- **Data Protocol**: POSTGRESQL
- **Language**: PGRS (PostgreSQL)
- **Input Method**: AWS SQS
- **Supported Events**:
- Successful queries
- SQL errors
- Authentication failures
- Connection events

### Known Limitations
- IPv6 is not supported
- Client hostname and OS user fields are not available in DSQL audit logs
- Multiline characters in queries may not be preserved when using SQS
- Single line comments in queries are not fully supported

## [Unreleased]

### Planned
- Support for additional DSQL-specific features as they become available
- Enhanced error handling and logging
- Performance optimizations for high-volume environments
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
filter {
if [type] == "dsql" {

# Parse the JSON message from SQS
json {
source => "message"
target => "parsed_message"
}

# Extract fields from the parsed message to top level for easier access
if [parsed_message] {
mutate {
add_field => {
"databaseName" => "%{[parsed_message][databaseName]}"
"dbUserName" => "%{[parsed_message][dbUserName]}"
"remoteHost" => "%{[parsed_message][remoteHost]}"
"remotePort" => "%{[parsed_message][remotePort]}"
"sessionId" => "%{[parsed_message][sessionId]}"
"statementText" => "%{[parsed_message][statementText]}"
"exitCode" => "%{[parsed_message][exitCode]}"
"logTime" => "%{[parsed_message][logTime]}"
}
}

# Add optional fields if they exist
if [parsed_message][clientApplication] {
mutate {
add_field => { "clientApplication" => "%{[parsed_message][clientApplication]}" }
}
}

if [parsed_message][errorMessage] {
mutate {
add_field => { "errorMessage" => "%{[parsed_message][errorMessage]}" }
}
}

if [parsed_message][startTime] {
mutate {
add_field => { "startTime" => "%{[parsed_message][startTime]}" }
}
}

if [parsed_message][commandTag] {
mutate {
add_field => { "commandTag" => "%{[parsed_message][commandTag]}" }
}
}

if [parsed_message][statementId] {
mutate {
add_field => { "statementId" => "%{[parsed_message][statementId]}" }
}
}
}

# Apply the DSQL Guardium filter
dsql_guardium_plugin_filter {}

# Clean up temporary fields
mutate {
remove_field => [
"parsed_message",
"message",
"databaseName",
"dbUserName",
"remoteHost",
"remotePort",
"sessionId",
"statementText",
"exitCode",
"logTime",
"clientApplication",
"errorMessage",
"startTime",
"commandTag",
"statementId",
"type",
"@timestamp",
"@version"
]
}
}
}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "Guardium_DSQL_filter",
"alias": "DSQL",
"type": "filter",
"pipeline_type":"pull",
"plugin_version": "1.0.0",
"datasourceTypes": [{"type":"DSQL","supportedVersions": [""]}],
"supported_input_plugins": ["SQS_input"],
"developer": "IBM",
"license": "Apache2.0",
"description": "Parses AWS DSQL Database Activity Stream events into Guardium.",
"configuration_notes": "Requires AWS SQS input plugin configured to receive DSQL Database Activity Stream events.",
"documentation_path": "https://github.com/IBM/universal-connectors/blob/main/filter-plugin/logstash-filter-dsql-guardium/README.md"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#/*
#Copyright 2020-2021 IBM Inc. All rights reserved
#SPDX-License-Identifier: Apache-2.0
#*/

input {
sqs {
# Insert the access key and secret that has access to SQS queue
access_key_id => "<ACCESS_KEY>"
secret_access_key => "<SECRET_KEY>"
region => "<REGION>" # Region that has the Queue, Default value: us-east-1
queue => "<QUEUE_NAME>" # This parameter simply holds the Queue name and not the URL
codec => "json"
type => "dsql"
# Insert the account id of the AWS account
add_field => {"account_id" => "<ACCOUNT_ID>"}
# Insert the Instance/Cluster name of the DSQL database that is to be monitored
add_field => {"instance_name" => "<INSTANCE_NAME>"}
# Optional: Specify a custom endpoint (e.g., proxy)
# endpoint => "https://proxy.company.com"
# Set to true to use AWS's bundled CA certificates for SSL/TLS connections
# use_aws_bundled_ca => false
# Optional: Provide additional settings (e.g., custom SSL certificate bundle)
# additional_settings => {
# ssl_ca_bundle => "/usr/share/logstash/third_party/<ca_bundle_filename>"
# }
}
}

filter {
if [type] == "dsql" {

# Parse the JSON message from SQS
json {
source => "message"
target => "parsed_message"
}

# Extract fields from the parsed message to top level for easier access
if [parsed_message] {
mutate {
add_field => {
"databaseName" => "%{[parsed_message][databaseName]}"
"dbUserName" => "%{[parsed_message][dbUserName]}"
"remoteHost" => "%{[parsed_message][remoteHost]}"
"remotePort" => "%{[parsed_message][remotePort]}"
"sessionId" => "%{[parsed_message][sessionId]}"
"statementText" => "%{[parsed_message][statementText]}"
"exitCode" => "%{[parsed_message][exitCode]}"
"logTime" => "%{[parsed_message][logTime]}"
}
}

# Add optional fields if they exist
if [parsed_message][clientApplication] {
mutate {
add_field => { "clientApplication" => "%{[parsed_message][clientApplication]}" }
}
}

if [parsed_message][errorMessage] {
mutate {
add_field => { "errorMessage" => "%{[parsed_message][errorMessage]}" }
}
}

if [parsed_message][startTime] {
mutate {
add_field => { "startTime" => "%{[parsed_message][startTime]}" }
}
}

if [parsed_message][commandTag] {
mutate {
add_field => { "commandTag" => "%{[parsed_message][commandTag]}" }
}
}

if [parsed_message][statementId] {
mutate {
add_field => { "statementId" => "%{[parsed_message][statementId]}" }
}
}
}

# Apply the DSQL Guardium filter
dsql_guardium_plugin_filter {}

# Clean up temporary fields
mutate {
remove_field => [
"parsed_message",
"message",
"databaseName",
"dbUserName",
"remoteHost",
"remotePort",
"sessionId",
"statementText",
"exitCode",
"logTime",
"clientApplication",
"errorMessage",
"startTime",
"commandTag",
"statementId",
"type",
"@timestamp",
"@version"
]
}
}
}

output {
# Send to Guardium
if [GuardRecord] {
guardium_connector {
# Guardium configuration
}
}

# Optional: Output to stdout for debugging
# stdout {
# codec => rubydebug
# }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"templates": [
{
"name": "DSQL over SQS",
"description": "Template for AWS DSQL Database Activity Streams over SQS",
"type": "filter",
"datasource_type": "DSQL",
"input_type": "SQS",
"version": "1.0.0"
}
]
}
Loading
Loading