Skip to content

Commit f7eb7ba

Browse files
committed
singledb fixes
1 parent 251e04e commit f7eb7ba

File tree

6 files changed

+188
-103
lines changed

6 files changed

+188
-103
lines changed

backup.go

+14-5
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,14 @@ type GCSBackupProviderOptions struct {
3838
// UseHostCredentials specifies whether to use the host's default credentials.
3939
UseHostCredentials bool
4040
ApplicationCredentialsJSON string
41-
BucketURL string
41+
// Bucket is the GCS bucket to use for backups. Should be of the form `gs://bucket-name`.
42+
Bucket string
43+
// BackupFormat specifies the format of the backup.
44+
// TODO :: implement backup format. Fixed to DuckDB for now.
45+
BackupFormat BackupFormat
46+
// UniqueIdentifier is used to store backups in a unique location.
47+
// This must be set when multiple databases are writing to the same bucket.
48+
UniqueIdentifier string
4249
}
4350

4451
// NewGCSBackupProvider creates a new BackupProvider based on GCS.
@@ -48,17 +55,19 @@ func NewGCSBackupProvider(ctx context.Context, opts *GCSBackupProviderOptions) (
4855
return nil, err
4956
}
5057

51-
u, err := url.Parse(opts.BucketURL)
58+
u, err := url.Parse(opts.Bucket)
5259
if err != nil {
53-
return nil, fmt.Errorf("failed to parse bucket url %q, %w", opts.BucketURL, err)
60+
return nil, fmt.Errorf("failed to parse bucket url %q, %w", opts.Bucket, err)
5461
}
5562

5663
bucket, err := gcsblob.OpenBucket(ctx, client, u.Host, nil)
5764
if err != nil {
58-
return nil, fmt.Errorf("failed to open bucket %q, %w", opts.BucketURL, err)
65+
return nil, fmt.Errorf("failed to open bucket %q, %w", opts.Bucket, err)
5966
}
6067

61-
bucket = blob.PrefixedBucket(bucket, u.Path)
68+
if opts.UniqueIdentifier != "" {
69+
bucket = blob.PrefixedBucket(bucket, opts.UniqueIdentifier)
70+
}
6271
return &BackupProvider{
6372
bucket: bucket,
6473
}, nil

db.go

+53-32
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,13 @@ type DBOptions struct {
9090
// TODO :: revisit this logic
9191
func (d *DBOptions) ValidateSettings() error {
9292
read := &settings{}
93-
err := mapstructure.WeakDecode(d.ReadSettings, &read)
93+
err := mapstructure.Decode(d.ReadSettings, read)
9494
if err != nil {
9595
return fmt.Errorf("read settings: %w", err)
9696
}
9797

9898
write := &settings{}
99-
err = mapstructure.WeakDecode(d.WriteSettings, &write)
99+
err = mapstructure.Decode(d.WriteSettings, write)
100100
if err != nil {
101101
return fmt.Errorf("write settings: %w", err)
102102
}
@@ -146,7 +146,21 @@ func (d *DBOptions) ValidateSettings() error {
146146
write.MaxMemory = fmt.Sprintf("%d bytes", int64(bytes)/2)
147147
}
148148

149-
if read.Threads == 0 && write.Threads == 0 {
149+
var readThread, writeThread int
150+
if read.Threads != "" {
151+
readThread, err = strconv.Atoi(read.Threads)
152+
if err != nil {
153+
return fmt.Errorf("unable to parse read threads: %w", err)
154+
}
155+
}
156+
if write.Threads != "" {
157+
writeThread, err = strconv.Atoi(write.Threads)
158+
if err != nil {
159+
return fmt.Errorf("unable to parse write threads: %w", err)
160+
}
161+
}
162+
163+
if readThread == 0 && writeThread == 0 {
150164
connector, err := duckdb.NewConnector("", nil)
151165
if err != nil {
152166
return fmt.Errorf("unable to create duckdb connector: %w", err)
@@ -162,31 +176,31 @@ func (d *DBOptions) ValidateSettings() error {
162176
return fmt.Errorf("unable to get threads: %w", err)
163177
}
164178

165-
read.Threads = threads / 2
166-
write.Threads = threads / 2
179+
read.Threads = strconv.Itoa(threads / 2)
180+
write.Threads = strconv.Itoa(threads / 2)
167181
}
168182

169-
if read.Threads == 0 != (write.Threads == 0) {
183+
if readThread == 0 != (writeThread == 0) {
170184
// only one is defined
171185
var threads int
172-
if read.Threads != 0 {
173-
threads = read.Threads
186+
if readThread != 0 {
187+
threads = readThread
174188
} else {
175-
threads = write.Threads
189+
threads = writeThread
176190
}
177191

178-
read.Threads = threads / 2
179-
write.Threads = threads / 2
192+
read.Threads = strconv.Itoa(threads / 2)
193+
write.Threads = strconv.Itoa(threads / 2)
180194
}
181195

182-
err = mapstructure.Decode(read, &d.ReadSettings)
196+
err = mapstructure.WeakDecode(read, &d.ReadSettings)
183197
if err != nil {
184-
return fmt.Errorf("read settings: %w", err)
198+
return fmt.Errorf("failed to update read settings: %w", err)
185199
}
186200

187-
err = mapstructure.Decode(write, &d.WriteSettings)
201+
err = mapstructure.WeakDecode(write, &d.WriteSettings)
188202
if err != nil {
189-
return fmt.Errorf("write settings: %w", err)
203+
return fmt.Errorf("failed to update write settings: %w", err)
190204
}
191205
return nil
192206
}
@@ -212,6 +226,7 @@ type InsertTableOptions struct {
212226

213227
// NewDB creates a new DB instance.
214228
// This can be a slow operation if the backup is large.
229+
// dbIdentifier is a unique identifier for the database reported in metrics.
215230
func NewDB(ctx context.Context, dbIdentifier string, opts *DBOptions) (DB, error) {
216231
if dbIdentifier == "" {
217232
return nil, fmt.Errorf("db identifier cannot be empty")
@@ -225,13 +240,13 @@ func NewDB(ctx context.Context, dbIdentifier string, opts *DBOptions) (DB, error
225240
db := &db{
226241
dbIdentifier: dbIdentifier,
227242
opts: opts,
228-
readPath: filepath.Join(opts.LocalPath, dbIdentifier, "read"),
229-
writePath: filepath.Join(opts.LocalPath, dbIdentifier, "write"),
243+
readPath: filepath.Join(opts.LocalPath, "read"),
244+
writePath: filepath.Join(opts.LocalPath, "write"),
230245
writeDirty: true,
231246
logger: opts.Logger,
232247
}
233248
if opts.BackupProvider != nil {
234-
db.backup = blob.PrefixedBucket(opts.BackupProvider.bucket, dbIdentifier)
249+
db.backup = opts.BackupProvider.bucket
235250
}
236251
// create read and write paths
237252
err = os.MkdirAll(db.readPath, fs.ModePerm)
@@ -875,44 +890,43 @@ func (d *db) attachDBs(ctx context.Context, db *sqlx.DB, path string, read bool)
875890
if err != nil {
876891
return err
877892
}
893+
894+
var views []string
878895
for _, entry := range entries {
879896
if !entry.IsDir() {
880897
continue
881898
}
882899
version, exist, err := tableVersion(path, entry.Name())
883900
if err != nil {
884901
d.logger.Error("error in fetching db version", slog.String("table", entry.Name()), slog.Any("error", err))
885-
_ = os.RemoveAll(path)
902+
_ = os.RemoveAll(filepath.Join(path, entry.Name()))
886903
continue
887904
}
888905
if !exist {
889-
_ = os.RemoveAll(path)
906+
_ = os.RemoveAll(filepath.Join(path, entry.Name()))
890907
continue
891908
}
892-
path := filepath.Join(path, entry.Name(), version)
909+
versionPath := filepath.Join(path, entry.Name(), version)
893910

894911
// read meta file
895-
f, err := os.ReadFile(filepath.Join(path, "meta.json"))
912+
f, err := os.ReadFile(filepath.Join(versionPath, "meta.json"))
896913
if err != nil {
897-
_ = os.RemoveAll(path)
914+
_ = os.RemoveAll(versionPath)
898915
d.logger.Error("error in reading meta file", slog.String("table", entry.Name()), slog.Any("error", err))
899916
// Maybe make it configurable whether to return an error or continue?
900917
continue
901918
}
902919
var meta meta
903920
err = json.Unmarshal(f, &meta)
904921
if err != nil {
905-
_ = os.RemoveAll(path)
922+
_ = os.RemoveAll(versionPath)
906923
d.logger.Error("error in unmarshalling meta file", slog.String("table", entry.Name()), slog.Any("error", err))
907924
continue
908925
}
909926

910927
if meta.ViewSQL != "" {
911928
// table is a view
912-
_, err := db.ExecContext(ctx, fmt.Sprintf("CREATE OR REPLACE VIEW %s AS %s", safeSQLName(entry.Name()), meta.ViewSQL))
913-
if err != nil {
914-
return err
915-
}
929+
views = append(views, fmt.Sprintf("CREATE OR REPLACE VIEW %s AS (%s\n)", safeSQLName(entry.Name()), meta.ViewSQL))
916930
continue
917931
}
918932
switch BackupFormat(meta.Format) {
@@ -922,10 +936,10 @@ func (d *db) attachDBs(ctx context.Context, db *sqlx.DB, path string, read bool)
922936
if read {
923937
readMode = " (READ_ONLY)"
924938
}
925-
_, err := db.ExecContext(ctx, fmt.Sprintf("ATTACH %s AS %s %s", safeSQLString(filepath.Join(path, "data.db")), safeSQLName(dbName), readMode))
939+
_, err := db.ExecContext(ctx, fmt.Sprintf("ATTACH %s AS %s %s", safeSQLString(filepath.Join(versionPath, "data.db")), safeSQLName(dbName), readMode))
926940
if err != nil {
927941
d.logger.Error("error in attaching db", slog.String("table", entry.Name()), slog.Any("error", err))
928-
_ = os.RemoveAll(filepath.Join(path))
942+
_ = os.RemoveAll(versionPath)
929943
continue
930944
}
931945

@@ -944,6 +958,13 @@ func (d *db) attachDBs(ctx context.Context, db *sqlx.DB, path string, read bool)
944958
return fmt.Errorf("unknown backup format %q", meta.Format)
945959
}
946960
}
961+
// create views after attaching all the DBs since views can depend on other tables
962+
for _, view := range views {
963+
_, err := db.ExecContext(ctx, view)
964+
if err != nil {
965+
return err
966+
}
967+
}
947968
return nil
948969
}
949970

@@ -1091,12 +1112,12 @@ func retry(maxRetries int, delay time.Duration, fn func() error) error {
10911112
}
10921113

10931114
func dbName(name string) string {
1094-
return safeSQLName(fmt.Sprintf("%s__data__db", name))
1115+
return fmt.Sprintf("%s__data__db", name)
10951116
}
10961117

10971118
type settings struct {
10981119
MaxMemory string `mapstructure:"max_memory"`
1099-
Threads int `mapstructure:"threads"`
1120+
Threads string `mapstructure:"threads"`
11001121
// Can be more settings
11011122
}
11021123

go.mod

+3-3
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ module github.com/rilldata/duckdb-replicator
33
go 1.23.2
44

55
require (
6-
github.com/XSAM/otelsql v0.34.0
6+
github.com/XSAM/otelsql v0.27.0
77
github.com/google/uuid v1.6.0
8-
github.com/jmoiron/sqlx v1.4.0
8+
github.com/jmoiron/sqlx v1.3.5
99
github.com/marcboeker/go-duckdb v1.8.2
1010
github.com/mitchellh/mapstructure v1.5.0
1111
github.com/stretchr/testify v1.9.0
1212
go.opentelemetry.io/otel v1.30.0
13-
gocloud.dev v0.40.0
13+
gocloud.dev v0.36.0
1414
golang.org/x/oauth2 v0.22.0
1515
golang.org/x/sync v0.8.0
1616
)

0 commit comments

Comments
 (0)