This issue is a natural follow-up to #8652.
I would like to address the scenarios where compressing serialized tuples actually provides a performance benefit and introduce a way to control it.
To frame the discussion, I will first look at where serialization and compression should not happen, to avoid efficiency drops:
- Intra-worker traffic (local): Tuples sent between bolts/spouts in the same worker process skip KryoTupleSerializer serialization entirely to maximize performance. Since these tuples are never serialized, they should never be compressed either.
- Inter-worker traffic (Remote) with small payloads: For tuples exchanged between remote workers that carry tiny payloads (e.g., single words or IDs), compression overhead outweighs the benefits and can even increase the payload size due to metadata headers.
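To illustrate the small-payload point above, here is a minimal, self-contained sketch. It assumes gzip as the compression format purely for illustration (this issue does not prescribe one), and the class and method names are hypothetical. It compares the compressed size of a single word with that of a long, repetitive block of text.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class, not part of Storm.
class SmallPayloadOverhead {

    // Compresses the input with gzip (an assumed format, chosen only for the demo).
    static byte[] gzip(byte[] input) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(input);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Builds a large, repetitive payload similar to many lines of text.
    static byte[] repeated(String line, int times) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < times; i++) {
            sb.append(line);
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] word = "storm".getBytes(StandardCharsets.UTF_8);
        byte[] text = repeated("the quick brown fox jumps over the lazy dog\n", 200);

        // The gzip header and trailer alone are larger than the single word.
        System.out.println("word: " + word.length + " -> " + gzip(word).length + " bytes");
        // The repetitive text shrinks considerably.
        System.out.println("text: " + text.length + " -> " + gzip(text).length + " bytes");
    }
}
```

The exact numbers depend on the codec and the data, but the direction should hold for any format that adds a fixed header or trailer.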
The proposed use case
The only scenario where compressing a serialized tuple makes sense is during inter-worker communication where the developer expects very large tuple sizes.
A good example within the codebase is examples/FileReadWordCountTopo. In this topology, the FileReadSpout emits entire lines/sentences of text to the SplitSentenceBolt. If these two components end up on different remote workers, compressing the serialized tuples on this specific stream could substantially reduce network I/O.
I propose introducing an optional configuration property, disabled by default, that allows developers to explicitly declare whether a specific component should enable tuple compression prior to network transfer.
The compression logic must strictly mirror the serialization lifecycle: it should automatically be skipped for intra-worker (local) traffic, regardless of this setting, to prevent unnecessary CPU cycles.
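The rule described above could be sketched roughly as follows. This is not Storm code: the class, the method names, and the choice of java.util.zip.Deflater are all assumptions made for illustration. The point is only that the per-component flag is honored for remote transfers and ignored for local ones.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical sketch, not an existing Storm class.
class TupleCompressionSketch {

    // Compress only when the component opted in AND the tuple leaves the worker.
    static boolean shouldCompress(boolean compressionEnabled, boolean remoteTransfer) {
        return compressionEnabled && remoteTransfer;
    }

    // Returns the payload unchanged unless shouldCompress(...) is true.
    static byte[] prepareForTransfer(byte[] serialized, boolean compressionEnabled,
                                     boolean remoteTransfer) {
        if (!shouldCompress(compressionEnabled, remoteTransfer)) {
            return serialized;
        }
        return deflate(serialized);
    }

    // Deflater is an assumed codec; the proposal does not pick one.
    static byte[] deflate(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(buf);
            out.write(buf, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    // Counterpart used on the receiving side in this sketch.
    static byte[] inflate(byte[] input) {
        Inflater inflater = new Inflater();
        inflater.setInput(input);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buf);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalArgumentException("truncated or unsupported input");
                }
                out.write(buf, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException(e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }
}
```

A real implementation would also need a way for the receiving worker to know whether a given payload was compressed; that detail is left open here.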
Assuming a new configuration key, "topology.tuple.compression.enable", the topology definition for FileReadWordCountTopo could be:
...
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SPOUT_ID, new FileReadSpout(inputFile), spoutNum)
       .addConfiguration("topology.tuple.compression.enable", true);
builder.setBolt(SPLIT_ID, new SplitSentenceBolt(), spBoltNum)
       .localOrShuffleGrouping(SPOUT_ID);
builder.setBolt(COUNT_ID, new CountBolt(), cntBoltNum)
       .fieldsGrouping(SPLIT_ID, new Fields(SplitSentenceBolt.FIELDS));
...