Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.linkedin.hoptimator;

import java.util.List;
import java.util.Map;


/** Represents a CREATE JOB request for deploying a SqlJob to Kubernetes. */
public class SqlJobDeployable implements Deployable {
Copy link
Copy Markdown
Collaborator

@jogrogan jogrogan May 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More so naming wise but does feel awkward that we now have SqlJobDeployable and Job (which deploys SqlJobs via templates) implementing the same interface but do semi-different things - not a deal breaker


private final String name;
private final String dialect;
private final String executionMode;
private final List<String> sql;
private final Map<String, String> options;

public SqlJobDeployable(String name, String dialect, String executionMode,
List<String> sql, Map<String, String> options) {
this.name = name;
this.dialect = dialect;
this.executionMode = executionMode;
this.sql = sql;
this.options = options;
}

public String name() {
return name;
}

/** Dialect, e.g. "Flink", "FlinkBeam". May be null for default. */
public String dialect() {
return dialect;
}

/** Execution mode, e.g. "Streaming", "Batch". May be null for default. */
public String executionMode() {
return executionMode;
}

public List<String> sql() {
return sql;
}

public Map<String, String> options() {
return options;
}

@Override
public String toString() {
return "SqlJob[" + name + "]";
}
}
10 changes: 10 additions & 0 deletions hoptimator-jdbc/src/main/codegen/config.fmpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ data: {
"org.apache.calcite.sql.ddl.SqlDdlNodes"
"com.linkedin.hoptimator.jdbc.ddl.SqlCreateDatabase"
"com.linkedin.hoptimator.jdbc.ddl.SqlCreateFunction"
"com.linkedin.hoptimator.jdbc.ddl.SqlCreateJob"
"com.linkedin.hoptimator.jdbc.ddl.SqlCreateMaterializedView"
"com.linkedin.hoptimator.jdbc.ddl.SqlCreateTable"
"com.linkedin.hoptimator.jdbc.ddl.SqlCreateTrigger"
Expand Down Expand Up @@ -67,6 +68,10 @@ data: {
"REFRESH"
"PAUSE"
"RESUME"
"JOB"
"FLINK"
"STREAMING"
"BATCH"
]

# List of non-reserved keywords to add;
Expand All @@ -86,12 +91,17 @@ data: {
"REFRESH"
"PAUSE"
"RESUME"
"JOB"
"FLINK"
"STREAMING"
"BATCH"
]

# List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
# Each must accept arguments "(SqlParserPos pos, boolean replace)".
# Example: "SqlCreateForeignSchema".
createStatementParserMethods: [
"SqlCreateJob"
"SqlCreateDatabase"
"SqlCreateMaterializedView"
"SqlCreateTrigger"
Expand Down
29 changes: 29 additions & 0 deletions hoptimator-jdbc/src/main/codegen/includes/parserImpls.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,35 @@ SqlCreate SqlCreateTrigger(Span s, boolean replace) :
}
}

SqlCreate SqlCreateJob(Span s, boolean replace) :
{
final boolean ifNotExists;
final SqlIdentifier id;
final SqlNode sqlBody;
String dialect = null;
String executionMode = null;
SqlNodeList optionList = null;
}
{
[ <FLINK> { dialect = "Flink"; } ]
(
<STREAMING> { executionMode = "Streaming"; }
|
<BATCH> { executionMode = "Batch"; }
|
{ }
)
<JOB> ifNotExists = IfNotExistsOpt()
id = CompoundIdentifier()
<AS>
sqlBody = StringLiteral()
[ optionList = Options() ]
{
return new SqlCreateJob(s.end(this), replace, ifNotExists, id, sqlBody,
dialect, executionMode, optionList);
}
}

SqlCreate SqlCreateDatabase(Span s, boolean replace) :
{
final boolean ifNotExists;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.linkedin.hoptimator.View;
import com.linkedin.hoptimator.jdbc.ddl.HoptimatorDdlParserImpl;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateDatabase;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateJob;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateMaterializedView;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTable;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTrigger;
Expand Down Expand Up @@ -282,6 +283,19 @@ public void execute(SqlCreateDatabase create, CalcitePrepare.Context context) {
logger.info("CREATE DATABASE {} completed", create.name);
}

/** Executes a {@code CREATE JOB} command. */
public void execute(SqlCreateJob create, CalcitePrepare.Context context) {
HoptimatorDdlUtils.DdlMode mode = create.getReplace()
? HoptimatorDdlUtils.DdlMode.UPDATE : HoptimatorDdlUtils.DdlMode.CREATE;
try {
HoptimatorDdlUtils.processCreateJob(connection, create, mode);
} catch (SQLException | RuntimeException e) {
logger.info("Failed to deploy job {}", create.name);
throw new DdlException(create, e.getMessage(), e);
}
logger.info("CREATE JOB {} completed", create.name);
}

/** Executes a {@code PAUSE TRIGGER} command. */
public void execute(SqlPauseTrigger pause, CalcitePrepare.Context context) {
updateTriggerPausedState(pause, pause.name, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import com.linkedin.hoptimator.DatabaseDeployable;
import com.linkedin.hoptimator.Deployer;
import com.linkedin.hoptimator.MaterializedView;
import com.linkedin.hoptimator.SqlJobDeployable;
import com.linkedin.hoptimator.Pipeline;
import com.linkedin.hoptimator.Source;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateDatabase;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateJob;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateMaterializedView;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTable;
import com.linkedin.hoptimator.util.DeploymentService;
Expand Down Expand Up @@ -672,6 +674,69 @@ static SpecifyResult processCreateDatabase(HoptimatorConnection conn,
}
}

/**
* Shared implementation of the {@code CREATE JOB} pipeline for both real deployment
* and dry-run (SPECIFY) modes.
*
* @param conn the JDBC connection
* @param create the parsed DDL node
* @param mode whether to CREATE, UPDATE, or SPECIFY
* @return a SpecifyResult (specs are empty for CREATE/UPDATE, YAML for SPECIFY)
* @throws SQLException on validation or deployment errors
*/
static SpecifyResult processCreateJob(HoptimatorConnection conn,
SqlCreateJob create, DdlMode mode) throws SQLException {
HoptimatorConnection.HoptimatorConnectionDualLogger logger = conn.getLogger(HoptimatorDdlUtils.class);

logger.info("Validating statement: {}", create);
ValidationService.validateOrThrow(create);

if (create.name.names.size() > 1) {
throw new SQLException("Job names cannot be compound identifiers.");
}
String name = create.name.names.get(0);
String sqlBody = ((SqlLiteral) create.sqlBody).getValueAs(String.class);

// Split SQL body on semicolons into individual statements
List<String> sqlStatements = new ArrayList<>();
for (String stmt : sqlBody.split(";")) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

guess this split(";") may create edge-case conditions for any ';' character inside a Flink connector option value. noticed this since we saw something similar here, https://github.com/linkedin/Hoptimator/pull/199/changes

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call will fix

String trimmed = stmt.trim();
if (!trimmed.isEmpty()) {
sqlStatements.add(trimmed);
}
}
if (sqlStatements.isEmpty()) {
throw new SQLException("Job " + name + " has no SQL statements.");
}

Map<String, String> jobOptions = options(create.options);
SqlJobDeployable job = new SqlJobDeployable(name, create.dialect, create.executionMode,
sqlStatements, jobOptions);

Collection<Deployer> deployers = null;
try {
logger.info("Validating job {}", name);
ValidationService.validateOrThrow(job);
deployers = DeploymentService.deployers(job, conn);
ValidationService.validateOrThrow(deployers);

List<String> specs = mode.executeDeployers(deployers, conn);
if (mode.mutable()) {
logger.info("Deployed job {}", name);
} else {
DeploymentService.restore(deployers);
}
return new SpecifyResult(specs, null, Collections.singletonList(name));
} catch (SQLException | RuntimeException e) {
logger.info("Failed to deploy job {}", name);
if (deployers != null) {
DeploymentService.restore(deployers);
logger.info("Restored deployable resources for job {}", name);
}
throw e;
}
}

/**
* Returns the YAML specs that would be created for any supported SQL statement —
* {@code CREATE TABLE}, {@code CREATE MATERIALIZED VIEW}, or {@code INSERT INTO}.
Expand All @@ -698,6 +763,10 @@ public static SpecifyResult specifyFromSql(String sql, HoptimatorConnection conn
return processCreateDatabase(conn, (SqlCreateDatabase) sqlNode, DdlMode.SPECIFY);
}

if (sqlNode instanceof SqlCreateJob) {
return processCreateJob(conn, (SqlCreateJob) sqlNode, DdlMode.SPECIFY);
}

if (sqlNode instanceof SqlCreateTable) {
return processCreateTable(conn.createPrepareContext(), conn, (SqlCreateTable) sqlNode, DdlMode.SPECIFY);
}
Expand Down
Loading
Loading