Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ storm.nimbus.zookeeper.acls.fixup: true

storm.auth.simple-white-list.users: []
storm.cluster.state.store: "org.apache.storm.cluster.ZKStateStorageFactory"
storm.meta.serialization.delegate: "org.apache.storm.serialization.GzipThriftSerializationDelegate"
storm.meta.serialization.delegate: "org.apache.storm.serialization.ZstdBridgeThriftSerializationDelegate"
storm.compression.zstd.level: 3
storm.compression.zstd.max.decompressed.bytes: 104857600
storm.compression.gzip.max.decompressed.bytes: 104857600
storm.codedistributor.class: "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor"
storm.workers.artifacts.dir: "workers-artifacts"
storm.health.check.dir: "healthchecks"
Expand Down
124 changes: 124 additions & 0 deletions docs/Cluster-State-Serialization.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
---
title: Cluster State Serialization
layout: documentation
documentation: true
---

This page describes how Storm serializes the *meta* state it persists in
ZooKeeper (and other configured state stores) such as topology assignments, Nimbus
summaries, `StormBase` records, log configs, credentials, worker heartbeats,
profile requests, errors, etc.

It is distinct from
[tuple serialization](Serialization.html), which covers payloads exchanged
between spouts and bolts at runtime via Kryo.

## Background

All cluster state writes go through `Utils.serialize(...)` /
`Utils.deserialize(...)`, which in turn delegate to a pluggable
`SerializationDelegate` selected by the
`storm.meta.serialization.delegate` config.

## Configuration

| Key | Default | Range | Description |
|---|---|---|---|
| `storm.meta.serialization.delegate` | `org.apache.storm.serialization.ZstdBridgeThriftSerializationDelegate` | any `SerializationDelegate` impl | Class used to (de)serialize cluster state. |
| `storm.compression.zstd.level` | `3` | `1`–`19` | Zstandard compression level. Higher = smaller + slower. Levels 20–22 are rejected by the validator. |
| `storm.compression.zstd.max.decompressed.bytes` | `104857600` (100 MiB) | `> 0` | Hard cap on the size of any zstd-decompressed payload. |
| `storm.compression.gzip.max.decompressed.bytes` | `104857600` (100 MiB) | `> 0` | Hard cap on the size of any gzip-decompressed payload. Also enforced by `GzipSerializationDelegate`. |

## Choosing a delegate

* **`ZstdBridgeThriftSerializationDelegate`** *(default)* — recommended.
Writes zstd, reads anything previously written. Use this unless you
have a specific reason not to.
* **`ZstdThriftSerializationDelegate`** — pure zstd, refuses non-zstd
input. Only safe to deploy after every znode in your state store has
been rewritten by a bridge delegate (e.g. by submitting / killing each
topology, or by force-rewriting Nimbus state). Use only when you want
to *enforce* the new format.
* **`GzipBridgeThriftSerializationDelegate`** — legacy default; still
available for clusters that want to roll forward without touching the
codec.
* **`ThriftSerializationDelegate`** — raw Thrift.

## Migration to Zstandard compression

Starting with Apache Storm 3.X, Zstandard is supported as the default
compression codec for cluster state, replacing gzip for better
performance — faster compression and decompression at comparable or
better ratios. Earlier versions used `GzipThriftSerializationDelegate`,
wrapped by `GzipBridgeThriftSerializationDelegate` to allow rolling
upgrades from clusters that had previously stored raw Thrift bytes; the
new `ZstdBridgeThriftSerializationDelegate` plays the equivalent bridge
role for the gzip to zstd transition.

| Area | Gzip | Zstandard |
|---|---------------------------------------------------------|-----------------------------------------------------|
| Default delegate | `GzipThriftSerializationDelegate` (via `GzipBridge...`) | `ZstdBridgeThriftSerializationDelegate` |
| Compression codec | gzip (`java.util.zip`) | Zstandard (via `commons-compress` + `zstd-jni`) |
| Decompression bound | none | bounded (`BoundedInputStream`), default 100 MiB |
| Format detection | gzip magic only | gzip magic *and* zstd magic |
| Config validation | none for compression | `ZstdLevelValidator` (1–19), positive bounds checks |

### Zstandard `SerializationDelegate` implementations

* `ZstdThriftSerializationDelegate`: pure zstd Thrift codec. Serializes
any `TBase` with zstd at the configured level; deserialization
requires the input to begin with the zstd magic number
(`0xFD2FB528`).
* `ZstdBridgeThriftSerializationDelegate`: the new default, implemented to
allow rolling upgrades from clusters that had previously stored payloads
as gzip-compressed. Always *writes* zstd. On read, dispatches based on a
magic-byte sniff:

```
ZstdBridgeThriftSerializationDelegate.deserialize(bytes)
├── bytes starts with zstd magic (0xFD2FB528) delegates to ZstdThriftSerializationDelegate
└── otherwise, delegates to GzipBridgeThriftSerializationDelegate.deserialize(bytes)
├── bytes starts with gzip magic (0x1F8B) delegates to GzipThriftSerializationDelegate
└── otherwise delegates to ThriftSerializationDelegate (raw Thrift)
```

This delegation chain is the key property that makes the new default
rolling-upgrade safe: nodes running the new code can still read every
older payload that may already exist in ZooKeeper, while new writes use
zstd.

### Zip-bomb protection

`GzipUtils.decompress` and `ZstdUtils.decompress` (both in
`org.apache.storm.utils.Utils`) wrap the decompressor stream in an Apache
Commons `BoundedInputStream` with `maxCount` set to the configured cap.
After draining the bounded stream, the underlying decompressor is probed
with one extra `read()`; if any byte remains, the call fails with:

```
Decompression threshold exceeded! Possible security risk or invalid data size.
```

The same guard is applied to the legacy `GzipSerializationDelegate` (the
non-Thrift Java-serialization variant).

### Upgrading an existing cluster

1. **Roll Nimbus and Supervisors onto the new build.** The bridge
delegate is the default, so no config change is required for a safe
upgrade.
2. **(Optional) Tune `storm.compression.zstd.level`** if you want a
tighter compression / latency trade-off. Most state writes are
infrequent; level 3 is a good default.
3. **(Optional) Tune `storm.compression.zstd.max.decompressed.bytes`** if
you legitimately persist payloads larger than 100 MiB. The cap
guards against malformed or hostile data, raise it deliberately.
4. **(Optional) Switch to the strict `ZstdThriftSerializationDelegate`**
*only* after every legacy payload has been rewritten. The bridge
delegate is sufficient for the vast majority of deployments.

### Dependencies

The zstd codec is provided by Apache Commons Compress
(`org.apache.commons:commons-compress`) backed by the `com.github.luben:zstd-jni`
native binding.
2 changes: 2 additions & 0 deletions docs/Serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ documentation: true
---
This page is about how the serialization system in Storm works for versions 0.6.0 and onwards. Storm used a different serialization system prior to 0.6.0 which is documented on [Serialization (prior to 0.6.0)](Serialization-\(prior-to-0.6.0\).html).

> This page covers **tuple** serialization (data flowing between spouts and bolts). For how Storm serializes the meta state it persists in ZooKeeper and related configuration, see [Cluster State Serialization](Cluster-State-Serialization.html).

Tuples can be comprised of objects of any types. Since Storm is a distributed system, it needs to know how to serialize and deserialize objects when they're passed between tasks.

Storm uses [Kryo](https://github.com/EsotericSoftware/kryo) for serialization. Kryo is a flexible and fast serialization library that produces small serializations.
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@

<!-- dependency versions -->
<commons-compress.version>1.28.0</commons-compress.version>
<zstd-jni.version>1.5.7-8</zstd-jni.version>
<commons-io.version>2.22.0</commons-io.version>
<commons-lang3.version>3.20.0</commons-lang3.version>
<commons-exec.version>1.6.0</commons-exec.version>
Expand Down Expand Up @@ -514,6 +515,11 @@
<artifactId>commons-compress</artifactId>
<version>${commons-compress.version}</version>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>${zstd-jni.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
Expand Down
8 changes: 8 additions & 0 deletions storm-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
19 changes: 18 additions & 1 deletion storm-client/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1458,7 +1458,24 @@ public class Config extends HashMap<String, Object> {
*/
@IsString
public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate";

/**
* GZIP max decompression bytes. Defaults to 104857600 (100MB).
*/
@IsPositiveNumber(includeZero = false)
public static final String STORM_COMPRESSION_GZIP_MAX_DECOMPRESSED_BYTES = "storm.compression.gzip.max.decompressed.bytes";
/**
* Zstandard compression level.
* Supported range: 1 to 19. Default: 3.
* <b>Prohibited:</b> Levels 20-22 (Ultra mode) are not allowed as they
* require dramatically more working memory per call.
*/
@CustomValidator(validatorClass = ConfigValidation.ZstdLevelValidator.class)
public static final String STORM_COMPRESSION_ZSTD_LEVEL = "storm.compression.zstd.level";
/**
* Zstandard max decompression bytes. Defaults to 104857600 (100MB).
*/
@IsPositiveNumber(includeZero = false)
public static final String STORM_COMPRESSION_ZSTD_MAX_DECOMPRESSED_BYTES = "storm.compression.zstd.max.decompressed.bytes";
/**
* Configure the topology metrics reporters to be used on workers.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
Expand All @@ -20,15 +20,22 @@
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.storm.Config;
import org.apache.storm.shade.org.apache.commons.io.input.BoundedInputStream;
import org.apache.storm.utils.ObjectReader;

/**
* Note, this assumes it's deserializing a gzip byte stream, and will err if it encounters any other serialization.
*/
public class GzipSerializationDelegate implements SerializationDelegate {

private static final int DEFAULT_MAX_DECOMPRESSED_BYTES = 100 * 1024 * 1024;
private int maxDecompressedBytes;

@Override
public void prepare(Map<String, Object> topoConf) {
// No-op
this.maxDecompressedBytes = ObjectReader.getInt(topoConf.getOrDefault(Config.STORM_COMPRESSION_GZIP_MAX_DECOMPRESSED_BYTES,
DEFAULT_MAX_DECOMPRESSED_BYTES));
}

@Override
Expand All @@ -47,17 +54,21 @@ public byte[] serialize(Object object) {

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try {
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
GZIPInputStream gis = new GZIPInputStream(bis);
ObjectInputStream ois = new ObjectInputStream(gis);
try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
GZIPInputStream gis = new GZIPInputStream(bis);
BoundedInputStream lis = BoundedInputStream.builder()
.setMaxCount(this.maxDecompressedBytes)
.setInputStream(gis)
.setPropagateClose(true)
.get();
ObjectInputStream ois = new ObjectInputStream(lis)) {
Object ret = ois.readObject();
ois.close();
return (T) ret;
} catch (IOException ioe) {
throw new RuntimeException(ioe);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
if (gis.read() != -1) {
throw new IOException("Decompression threshold exceeded! Possible security risk or invalid data size.");
}
return clazz.cast(ret);
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("Deserialization failed: " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,32 @@
package org.apache.storm.serialization;

import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.thrift.TBase;
import org.apache.storm.thrift.TDeserializer;
import org.apache.storm.thrift.TException;
import org.apache.storm.thrift.TSerializer;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Utils;

/**
* Note, this assumes it's deserializing a gzip byte stream, and will err if it encounters any other serialization.
*/
public class GzipThriftSerializationDelegate implements SerializationDelegate {

private static final int DEFAULT_MAX_DECOMPRESSED_BYTES = 100 * 1024 * 1024;
private int maxDecompressedBytes;

@Override
public void prepare(Map<String, Object> topoConf) {
// No-op
this.maxDecompressedBytes = ObjectReader.getInt(topoConf.getOrDefault(Config.STORM_COMPRESSION_GZIP_MAX_DECOMPRESSED_BYTES,
DEFAULT_MAX_DECOMPRESSED_BYTES));
}

@Override
public byte[] serialize(Object object) {
try {
return Utils.gzip(new TSerializer().serialize((TBase) object));
return Utils.GzipUtils.compress(new TSerializer().serialize((TBase) object));
} catch (TException e) {
throw new RuntimeException(e);
}
Expand All @@ -48,7 +54,7 @@ public byte[] serialize(Object object) {
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try {
TBase instance = (TBase) clazz.newInstance();
new TDeserializer().deserialize(instance, Utils.gunzip(bytes));
new TDeserializer().deserialize(instance, Utils.GzipUtils.decompress(bytes, this.maxDecompressedBytes));
return (T) instance;
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/

package org.apache.storm.serialization;

import java.util.Map;
import org.apache.storm.utils.Utils;

/**
* Always writes Zstd out, but tests incoming bytes to determine the format.
* If Zstd magic is found, it uses {@link ZstdThriftSerializationDelegate}.
* If not, it falls back to {@link ThriftSerializationDelegate} for raw Thrift.
*/
public class ZstdBridgeThriftSerializationDelegate implements SerializationDelegate {

private final GzipBridgeThriftSerializationDelegate defaultDelegate = new GzipBridgeThriftSerializationDelegate();
private final ZstdThriftSerializationDelegate zstdDelegate = new ZstdThriftSerializationDelegate();

@Override
public void prepare(Map<String, Object> topoConf) {
defaultDelegate.prepare(topoConf);
zstdDelegate.prepare(topoConf);
}

@Override
public byte[] serialize(Object object) {
// Always compress new data with Zstd
return zstdDelegate.serialize(object);
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
if (Utils.ZstdUtils.isZstd(bytes)) {
return zstdDelegate.deserialize(bytes, clazz);
} else {
// Fallback to ZstdBridgeThriftSerializationDelegate
// it delegates to the proper SerializationDelegate (GzipThriftSerializationDelegate or ThriftSerializationDelegate)
return defaultDelegate.deserialize(bytes, clazz);
}
}
}
Loading