-
Notifications
You must be signed in to change notification settings - Fork 76
Feature: Handle read only columns #437
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 6 commits
023e0b5
e9fd3c1
696608c
85be83c
c0b7b00
89ee08f
2717bfd
bbd5c31
0167a0f
53d68b0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,3 +18,4 @@ group :development do | |
| end | ||
|
|
||
| gem "mutex_m", "~> 0.3.0" | ||
| gem "logger" | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,11 +14,21 @@ type RowBatch struct { | |
| } | ||
|
|
||
| func NewRowBatch(table *TableSchema, values []RowData, paginationKeyIndex int) *RowBatch { | ||
| return NewRowBatchWithColumns(table, values, ConvertTableColumnsToStrings(table.Columns), paginationKeyIndex) | ||
| } | ||
|
|
||
| // NewRowBatchWithColumns creates a RowBatch with an explicit ordered list of | ||
| // selected column names. Use this when the query that produced the row data | ||
| // returns columns in a different order from the schema — for example, the | ||
| // sharding copy filter issues SELECT * … JOIN … USING(id) which moves 'id' | ||
| // to the front of the result set. The selectedColumns slice must match the | ||
| // order and count of values in each RowData entry. | ||
| func NewRowBatchWithColumns(table *TableSchema, values []RowData, selectedColumns []string, paginationKeyIndex int) *RowBatch { | ||
| return &RowBatch{ | ||
| values: values, | ||
| paginationKeyIndex: paginationKeyIndex, | ||
| table: table, | ||
| columns: ConvertTableColumnsToStrings(table.Columns), | ||
| columns: selectedColumns, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -64,23 +74,42 @@ func (e *RowBatch) AsSQLQuery(schemaName, tableName string) (string, []interface | |
| return "", nil, err | ||
| } | ||
|
|
||
| valuesStr := "(" + strings.Repeat("?,", len(e.columns)-1) + "?)" | ||
| // Build the INSERT column list from e.columns — the actual query-result | ||
| // order — skipping generated columns by name. | ||
| // | ||
| // We must NOT use table.NonGeneratedColumnNames() here because that | ||
| // always returns schema order. When the SELECT query returns columns in a | ||
| // different order (for example, the sharding copy filter uses | ||
| // SELECT * FROM t JOIN (SELECT id …) AS batch USING(id) | ||
| // which moves 'id' to the front), the column names and row values would | ||
| // be misaligned, corrupting every row written to the target. | ||
| insertColumns := make([]string, 0, len(e.columns)) | ||
| for _, col := range e.columns { | ||
| if e.table.IsColumnNameGenerated(col) { | ||
| continue | ||
| } | ||
| insertColumns = append(insertColumns, col) | ||
| } | ||
|
|
||
| valuesStr := "(" + strings.Repeat("?,", len(insertColumns)-1) + "?)" | ||
| valuesStr = strings.Repeat(valuesStr+",", len(e.values)-1) + valuesStr | ||
|
|
||
| query := "INSERT IGNORE INTO " + | ||
| QuotedTableNameFromString(schemaName, tableName) + | ||
| " (" + strings.Join(QuoteFields(e.columns), ",") + ") VALUES " + valuesStr | ||
| " (" + strings.Join(QuoteFields(insertColumns), ",") + ") VALUES " + valuesStr | ||
|
|
||
| return query, e.flattenRowData(), nil | ||
| } | ||
|
|
||
| func (e *RowBatch) flattenRowData() []interface{} { | ||
| rowSize := len(e.values[0]) | ||
| flattened := make([]interface{}, rowSize*len(e.values)) | ||
|
|
||
| for rowIdx, row := range e.values { | ||
| for colIdx, col := range row { | ||
| flattened[rowIdx*rowSize+colIdx] = col | ||
| flattened := make([]interface{}, 0, len(e.values)*len(e.columns)) | ||
|
|
||
| for _, row := range e.values { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is now going through all rows, all columns, then all columns again. #rows * #cols^2. Or am I reading it wrong?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Asking because this will add up when you have 86 columns in a large table, and we're doing string comparisons. no? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The original change followed Didn't seem to have any relevant performance hit on my test setup, but if this is a consideration, we could use a hashmap instead. Note it'd have to be lazily-initialized, as
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. bbd5c31 adds a new field to cache non-generated columns within a |
||
| for colIdx, col := range e.columns { | ||
| if e.table.IsColumnNameGenerated(col) { | ||
| continue | ||
| } | ||
| flattened = append(flattened, row[colIdx]) | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't put generated columns in the WHERE...