-
Notifications
You must be signed in to change notification settings - Fork 214
Add initial extstore types #2900
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
d087573
3d3a5b0
0328e77
660ac6b
2109261
7eae7a1
96ec5a6
1ee8e92
b4e7cc6
594450a
f434a51
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,95 @@ | ||
| package io.temporal.payload.storage; | ||
|
|
||
| import com.google.common.base.Preconditions; | ||
| import io.temporal.common.Experimental; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import javax.annotation.Nonnull; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| /** Configuration for offloading large payloads to external storage. */ | ||
| @Experimental | ||
| public final class ExternalStorage { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rename to |
||
| private static final int DEFAULT_PAYLOAD_SIZE_THRESHOLD = 256 * 1024; | ||
|
cconstable marked this conversation as resolved.
Outdated
|
||
|
|
||
| public static Builder newBuilder() { | ||
| return new Builder(); | ||
| } | ||
|
|
||
| private final @Nonnull List<StorageDriver> drivers; | ||
| private final @Nullable StorageDriverSelector driverSelector; | ||
| private final @Nullable Integer payloadSizeThreshold; | ||
|
|
||
| private ExternalStorage( | ||
| @Nonnull List<StorageDriver> drivers, | ||
| @Nullable StorageDriverSelector driverSelector, | ||
| @Nullable Integer payloadSizeThreshold) { | ||
| this.drivers = Collections.unmodifiableList(new ArrayList<>(drivers)); | ||
| this.driverSelector = driverSelector; | ||
| this.payloadSizeThreshold = payloadSizeThreshold; | ||
| } | ||
|
|
||
| @Nonnull | ||
| public List<StorageDriver> getDrivers() { | ||
| return drivers; | ||
| } | ||
|
|
||
| @Nullable | ||
| public StorageDriverSelector getDriverSelector() { | ||
| return driverSelector; | ||
| } | ||
|
|
||
| /** | ||
| * Minimum payload size in bytes before external storage is considered; {@code null} stores all. | ||
| */ | ||
| @Nullable | ||
| public Integer getPayloadSizeThreshold() { | ||
| return payloadSizeThreshold; | ||
| } | ||
|
|
||
| public static final class Builder { | ||
| private List<StorageDriver> drivers = Collections.emptyList(); | ||
| private StorageDriverSelector driverSelector; | ||
| private Integer payloadSizeThreshold = DEFAULT_PAYLOAD_SIZE_THRESHOLD; | ||
|
|
||
| private Builder() {} | ||
|
|
||
| /** At least one driver is required. When more than one is set, a selector is also required. */ | ||
| public Builder setDrivers(@Nonnull List<StorageDriver> drivers) { | ||
|
cconstable marked this conversation as resolved.
|
||
| this.drivers = Objects.requireNonNull(drivers, "drivers"); | ||
| return this; | ||
| } | ||
|
|
||
| public Builder setDriverSelector(@Nullable StorageDriverSelector driverSelector) { | ||
| this.driverSelector = driverSelector; | ||
| return this; | ||
| } | ||
|
|
||
| /** Defaults to 256 KiB. Set to {@code null} to store all payloads. */ | ||
| public Builder setPayloadSizeThreshold(@Nullable Integer payloadSizeThreshold) { | ||
| this.payloadSizeThreshold = payloadSizeThreshold; | ||
| return this; | ||
| } | ||
|
|
||
| public ExternalStorage build() { | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right now, these all throw state exceptions (which are tested). In a subsequent PR, I would like to replace these with stable diagnostic codes.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For these in particular, do we need diagnostic codes? The messages are fairly clear as to what the issue is and prevent the use of external storage at startup rather than having to report during execution runtime.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am a proponent of best-effort, stable diagnostic codes if the mechanism for adding them very easy. I have a little write-up about this that I wanted to share but the tl;dr is I think we should make a best effort to tag anything that's actionable if for nothing else it supports automated tooling (e.g. AI) and feels like a healthy industry trend (hand wavy gesture). I would not have felt this way 4 years ago. |
||
| Preconditions.checkArgument(!drivers.isEmpty(), "At least one driver must be provided"); | ||
| Preconditions.checkArgument( | ||
| payloadSizeThreshold == null || payloadSizeThreshold >= 0, | ||
| "payloadSizeThreshold must be greater than or equal to zero"); | ||
| Set<String> names = new HashSet<>(); | ||
| for (StorageDriver driver : drivers) { | ||
| String name = driver.getName(); | ||
| Preconditions.checkArgument( | ||
| names.add(name), "Multiple drivers registered with name '%s'", name); | ||
| } | ||
| Preconditions.checkArgument( | ||
| drivers.size() == 1 || driverSelector != null, | ||
| "driverSelector must be specified when more than one driver is registered"); | ||
| return new ExternalStorage(drivers, driverSelector, payloadSizeThreshold); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| package io.temporal.payload.storage; | ||
|
|
||
| import io.temporal.api.common.v1.Payload; | ||
| import io.temporal.common.Experimental; | ||
| import java.util.List; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import javax.annotation.Nonnull; | ||
|
|
||
| /** Stores and retrieves payloads in an external storage system. */ | ||
| @Experimental | ||
| public interface StorageDriver { | ||
| /** | ||
| * Name of this driver instance, unique among the drivers registered in a single {@link | ||
| * ExternalStorage}. Used as the routing key recorded in a stored payload's reference and resolved | ||
| * back to this driver on retrieval. | ||
| */ | ||
| @Nonnull | ||
| String getName(); | ||
|
|
||
| /** | ||
| * Stable, implementation-level identifier for this driver, the same across all instances of the | ||
| * driver class and ideally across SDKs (e.g. {@code "aws.s3driver"}). Used for metrics and worker | ||
| * heartbeat reporting. | ||
| */ | ||
| @Nonnull | ||
| default String getType() { | ||
| return getClass().getName(); | ||
| } | ||
|
|
||
| /** | ||
| * Stores {@code payloads} and returns one {@link StorageDriverClaim} per payload, in the same | ||
| * order. The returned list must be the same length as {@code payloads}. | ||
| */ | ||
| @Nonnull | ||
| CompletableFuture<List<StorageDriverClaim>> store( | ||
| @Nonnull StorageDriverStoreContext context, @Nonnull List<Payload> payloads); | ||
|
|
||
| /** | ||
| * Retrieves the payloads identified by {@code claims} and returns one {@link Payload} per claim, | ||
| * in the same order. The returned list must be the same length as {@code claims}. | ||
| */ | ||
| @Nonnull | ||
| CompletableFuture<List<Payload>> retrieve( | ||
| @Nonnull StorageDriverRetrieveContext context, @Nonnull List<StorageDriverClaim> claims); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| package io.temporal.payload.storage; | ||
|
|
||
| import io.temporal.common.Experimental; | ||
| import java.util.Objects; | ||
| import javax.annotation.Nonnull; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| @Experimental | ||
| public final class StorageDriverActivityInfo implements StorageDriverTargetInfo { | ||
| private final @Nonnull String namespace; | ||
| private final @Nullable String id; | ||
| private final @Nullable String runId; | ||
| private final @Nullable String type; | ||
|
|
||
| public StorageDriverActivityInfo( | ||
| @Nonnull String namespace, | ||
| @Nullable String id, | ||
| @Nullable String runId, | ||
| @Nullable String type) { | ||
| this.namespace = Objects.requireNonNull(namespace, "namespace"); | ||
| this.id = id; | ||
| this.runId = runId; | ||
| this.type = type; | ||
| } | ||
|
|
||
| @Nonnull | ||
| public String getNamespace() { | ||
| return namespace; | ||
| } | ||
|
|
||
| @Nullable | ||
| public String getId() { | ||
| return id; | ||
| } | ||
|
|
||
| @Nullable | ||
| public String getRunId() { | ||
| return runId; | ||
| } | ||
|
|
||
| @Nullable | ||
| public String getType() { | ||
| return type; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| package io.temporal.payload.storage; | ||
|
|
||
| import io.temporal.common.Experimental; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import javax.annotation.Nonnull; | ||
|
|
||
| /** | ||
| * Driver-defined reference to an externally stored payload, used to retrieve it later. | ||
| * | ||
| * @see StorageDriver | ||
| */ | ||
| @Experimental | ||
| public final class StorageDriverClaim { | ||
| private final @Nonnull Map<String, String> claimData; | ||
|
|
||
| public StorageDriverClaim(@Nonnull Map<String, String> claimData) { | ||
| this.claimData = | ||
| Collections.unmodifiableMap(new HashMap<>(Objects.requireNonNull(claimData, "claimData"))); | ||
| } | ||
|
|
||
| @Nonnull | ||
| public Map<String, String> getClaimData() { | ||
| return claimData; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| package io.temporal.payload.storage; | ||
|
|
||
| import io.temporal.common.Experimental; | ||
|
|
||
| /** Context passed to {@link StorageDriver#retrieve}. */ | ||
| @Experimental | ||
| public final class StorageDriverRetrieveContext {} | ||
|
cconstable marked this conversation as resolved.
Outdated
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| package io.temporal.payload.storage; | ||
|
|
||
| import io.temporal.api.common.v1.Payload; | ||
| import io.temporal.common.Experimental; | ||
| import javax.annotation.Nonnull; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| /** Chooses which {@link StorageDriver} stores a given payload. */ | ||
| @Experimental | ||
| @FunctionalInterface | ||
| public interface StorageDriverSelector { | ||
| /** | ||
| * Returns the driver to store {@code payload}, which must be one of the drivers registered in the | ||
| * {@link ExternalStorage}, or {@code null} to leave the payload stored inline. | ||
| */ | ||
| @Nullable | ||
| StorageDriver selectDriver(@Nonnull StorageDriverStoreContext context, @Nonnull Payload payload); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| package io.temporal.payload.storage; | ||
|
|
||
| import io.temporal.common.Experimental; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| /** Context passed to {@link StorageDriver#store} and {@link StorageDriverSelector}. */ | ||
| @Experimental | ||
| public final class StorageDriverStoreContext { | ||
| private final @Nullable StorageDriverTargetInfo target; | ||
|
|
||
| public StorageDriverStoreContext(@Nullable StorageDriverTargetInfo target) { | ||
| this.target = target; | ||
| } | ||
|
|
||
| @Nullable | ||
| public StorageDriverTargetInfo getTarget() { | ||
| return target; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| package io.temporal.payload.storage; | ||
|
|
||
| import io.temporal.common.Experimental; | ||
|
|
||
| /** | ||
| * Identity of the workflow or activity a payload is being stored on behalf of. Provided on a | ||
| * best-effort basis on the storing side only; some fields may be absent. Implemented by {@link | ||
| * StorageDriverWorkflowInfo} and {@link StorageDriverActivityInfo}. | ||
| */ | ||
| @Experimental | ||
| public interface StorageDriverTargetInfo {} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| package io.temporal.payload.storage; | ||
|
|
||
| import io.temporal.common.Experimental; | ||
| import java.util.Objects; | ||
| import javax.annotation.Nonnull; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| @Experimental | ||
| public final class StorageDriverWorkflowInfo implements StorageDriverTargetInfo { | ||
| private final @Nonnull String namespace; | ||
| private final @Nullable String id; | ||
| private final @Nullable String runId; | ||
| private final @Nullable String type; | ||
|
|
||
| public StorageDriverWorkflowInfo( | ||
| @Nonnull String namespace, | ||
| @Nullable String id, | ||
| @Nullable String runId, | ||
| @Nullable String type) { | ||
| this.namespace = Objects.requireNonNull(namespace, "namespace"); | ||
| this.id = id; | ||
| this.runId = runId; | ||
| this.type = type; | ||
| } | ||
|
|
||
| @Nonnull | ||
| public String getNamespace() { | ||
| return namespace; | ||
| } | ||
|
|
||
| @Nullable | ||
| public String getId() { | ||
| return id; | ||
| } | ||
|
|
||
| @Nullable | ||
| public String getRunId() { | ||
| return runId; | ||
| } | ||
|
|
||
| @Nullable | ||
| public String getType() { | ||
| return type; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the package be located here? or should we consider something like
temporal.common.extstore? Here intemporal.payloadit's next to the codec which seems like the right place.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current
io.temporal.payload.storagemakes sense.