Skip to content

Add automatic volume creation for local input source #4961

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 2, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/util/opts/storage_specconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

"github.com/bacalhau-project/bacalhau/pkg/models"
storage_ipfs "github.com/bacalhau-project/bacalhau/pkg/storage/ipfs"
storage_local "github.com/bacalhau-project/bacalhau/pkg/storage/local_directory"
storage_local "github.com/bacalhau-project/bacalhau/pkg/storage/local"
storage_s3 "github.com/bacalhau-project/bacalhau/pkg/storage/s3"
storage_url "github.com/bacalhau-project/bacalhau/pkg/storage/url/urldownload"
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/bidstrategy/semantic/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func getBidStrategyRequestWithInput(t testing.TB) bidstrategy.BidStrategyRequest
request := getBidStrategyRequest(t)
request.Job.Task().InputSources = []*models.InputSource{
{
Source: models.NewSpecConfig(models.StorageSourceLocalDirectory).WithParam("SourcePath", "/dummy/path"),
Source: models.NewSpecConfig(models.StorageSourceLocal).WithParam("SourcePath", "/dummy/path"),
Target: "target",
},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/bidstrategy/semantic/input_locality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (s *InputLocalityStrategySuite) SetupSuite() {
statefulJob := mock.Job()
statefulJob.Task().InputSources = []*models.InputSource{
{
Source: models.NewSpecConfig(models.StorageSourceLocalDirectory).WithParam("SourcePath", "/dummy/path"),
Source: models.NewSpecConfig(models.StorageSourceLocal).WithParam("SourcePath", "/dummy/path"),
Target: "target",
},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/bidstrategy/semantic/storage_installed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
var (
OneStorageSpec = []*models.InputSource{
{
Source: models.NewSpecConfig(models.StorageSourceLocalDirectory).WithParam("SourcePath", "/dummy/path"),
Source: models.NewSpecConfig(models.StorageSourceLocal).WithParam("SourcePath", "/dummy/path"),
Target: "target",
},
}
Expand Down
15 changes: 9 additions & 6 deletions pkg/executor/util/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/storage"
"github.com/bacalhau-project/bacalhau/pkg/storage/inline"
ipfs_storage "github.com/bacalhau-project/bacalhau/pkg/storage/ipfs"
localdirectory "github.com/bacalhau-project/bacalhau/pkg/storage/local_directory"
local_storage "github.com/bacalhau-project/bacalhau/pkg/storage/local"
noop_storage "github.com/bacalhau-project/bacalhau/pkg/storage/noop"
"github.com/bacalhau-project/bacalhau/pkg/storage/s3"
"github.com/bacalhau-project/bacalhau/pkg/storage/tracing"
Expand Down Expand Up @@ -62,15 +62,18 @@ func NewStandardStorageProvider(cfg types.Bacalhau) (storage.StorageProvider, er
))
}

if cfg.InputSources.IsNotDisabled(models.StorageSourceLocalDirectory) {
var err error
providers[models.StorageSourceLocalDirectory], err = localdirectory.NewStorageProvider(
localdirectory.StorageProviderParams{
AllowedPaths: localdirectory.ParseAllowPaths(cfg.Compute.AllowListedLocalPaths),
if cfg.InputSources.IsNotDisabled(models.StorageSourceLocal) {
localStorage, err := local_storage.NewStorageProvider(
local_storage.StorageProviderParams{
AllowedPaths: local_storage.ParseAllowPaths(cfg.Compute.AllowListedLocalPaths),
})
if err != nil {
return nil, err
}

// Register under "local" and "localDirectory" names
providers[models.StorageSourceLocal] = localStorage
providers[models.StorageSourceLocalDirectory] = localStorage
}

if cfg.InputSources.IsNotDisabled(models.StorageSourceIPFS) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/models/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ const (
StorageSourceS3 = "s3"
StorageSourceS3PreSigned = "s3PreSigned"
StorageSourceInline = "inline"
StorageSourceLocalDirectory = "localDirectory"
StorageSourceLocalDirectory = "localDirectory" // Deprecated: use StorageSourceLocal instead
StorageSourceLocal = "local"
)

var StoragesNames = []string{
StorageSourceIPFS,
StorageSourceInline,
StorageSourceLocalDirectory,
StorageSourceLocal,
StorageSourceS3,
StorageSourceS3PreSigned,
StorageSourceURL,
Expand Down
81 changes: 81 additions & 0 deletions pkg/storage/local/create_strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package local

import (
"path/filepath"
"strings"

"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/rs/zerolog/log"
)

// CreateStrategy names how a missing local path should be created.
// It is a plain string type; the recognised values are the constants below.
type CreateStrategy string

const (
	// Infer tries to work out from the path itself whether a file or a
	// directory should be created.
	Infer CreateStrategy = "infer"

	// Dir creates the path as a directory.
	Dir CreateStrategy = "dir"

	// File creates the path as a file.
	File CreateStrategy = "file"

	// NoCreate creates nothing.
	NoCreate CreateStrategy = "nocreate"
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should this be noCreate or NoCreate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted to keep all lowercase for simplicity.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought I left a comment about supporting case insensitivity, but I might've missed that. Similar to all other specs, we should be case insensitive when parsing and decoding the job spec. Meaning NoCreate, noCreate, nocreate should all be treated the same.

StorageSourceS3PreSigned = "s3PreSigned"

For readability, though — both when printing values and in our docs — nocreate is not really readable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I made the spec decoding case-insensitive and also made sure that whenever nocreate is shown to the user it looks like noCreate.

Hint: CreateAs must be one of [dir, file, noCreate]


const (
	// DefaultCreateStrategy is the strategy CreateStrategyFromString returns
	// for an empty input string.
	DefaultCreateStrategy = Infer

	// CreateStrategySpecKey is the key name for the create strategy setting
	// (presumably the parameter key inside the storage spec; confirm against
	// the spec decoder).
	CreateStrategySpecKey = "CreateAs"
)

func AllowedCreateStrategies() []string {
return []string{
Infer.String(),
Dir.String(),
File.String(),
NoCreate.String(),
}
}

// CreateStrategyFromString parses a CreateAs value into a CreateStrategy.
//
// Matching is case-insensitive, so "NoCreate", "noCreate" and "nocreate" all
// resolve to NoCreate. An empty string yields DefaultCreateStrategy. Any other
// value returns an empty strategy together with an error carrying the
// bacerrors.ValidationError code and a hint listing the allowed values.
func CreateStrategyFromString(s string) (CreateStrategy, error) {
	if s == "" {
		return DefaultCreateStrategy, nil
	}

	// EqualFold keeps the comparison independent of the casing chosen for the
	// constants themselves, unlike lower-casing the input first.
	for _, strategy := range []CreateStrategy{Infer, Dir, File, NoCreate} {
		if strings.EqualFold(s, strategy.String()) {
			return strategy, nil
		}
	}

	// TODO: Create a constant for JobSpec to be used in WithComponent for this and similar errors
	return "", bacerrors.Newf("invalid CreateAs value %s", s).
		WithHint("CreateAs must be one of [%s]", strings.Join(AllowedCreateStrategies(), ", ")).
		WithCode(bacerrors.ValidationError)
}

// InferCreateStrategyFromPath makes a best-effort guess at whether path is
// meant to be a file or a directory, based on common conventions.
//
// The guess relies solely on filepath.Split: if nothing follows the final
// separator (trailing slash, empty path, or the root) the result is Dir,
// otherwise it is File.
func InferCreateStrategyFromPath(path string) CreateStrategy {
	// filepath.Split already copes with trailing slashes, paths without any
	// slash, and similar edge cases, which is smart enough for now; it can be
	// improved later if needed.
	//
	// Known limitation: a directory written without a trailing slash
	// (e.g. /etc/conf.d) is reported as a file. According to the original
	// author's note, such paths are likely to be non-empty already, so no
	// creation is expected to be attempted for them.
	// Additionally, the Target value could be consulted for more insight into
	// whether a file or a directory should be created.
	guess := File
	if _, lastElem := filepath.Split(path); lastElem == "" {
		guess = Dir
	}

	log.Debug().Str("path", path).Msgf("inferred create strategy: %s", guess)
	return guess
}

// String returns the strategy's underlying string value, which makes
// CreateStrategy satisfy fmt.Stringer.
func (c CreateStrategy) String() string { return string(c) }
128 changes: 128 additions & 0 deletions pkg/storage/local/create_strategy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package local

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestAllowedCreateStrategies checks that AllowedCreateStrategies lists every
// strategy constant, in declaration order.
func TestAllowedCreateStrategies(t *testing.T) {
	want := []string{
		Infer.String(),
		Dir.String(),
		File.String(),
		NoCreate.String(),
	}

	assert.Equal(t, want, AllowedCreateStrategies(), "AllowedCreateStrategies should return all valid strategies")
}

// TestCreateStrategyFromString drives CreateStrategyFromString through each
// valid value, the empty-string default, and an unrecognised value.
func TestCreateStrategyFromString(t *testing.T) {
	cases := []struct {
		name      string
		input     string
		want      CreateStrategy
		wantErr   bool
		errSubstr string
	}{
		{name: "infer strategy", input: "infer", want: Infer},
		{name: "directory strategy", input: "dir", want: Dir},
		{name: "file strategy", input: "file", want: File},
		{name: "nocreate strategy", input: "nocreate", want: NoCreate},
		{name: "empty string uses default", input: "", want: DefaultCreateStrategy},
		{
			name:      "invalid strategy",
			input:     "invalid",
			want:      "",
			wantErr:   true,
			errSubstr: "invalid CreateAs value",
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			got, err := CreateStrategyFromString(c.input)

			// Success path first; the error path only inspects the message.
			if !c.wantErr {
				require.NoError(t, err)
				assert.Equal(t, c.want, got)
				return
			}

			require.Error(t, err)
			assert.Contains(t, err.Error(), c.errSubstr)
		})
	}
}

// TestInferCreateStrategyFromPath verifies the file-vs-directory guess for
// empty, absolute, relative, and trailing-slash paths.
func TestInferCreateStrategyFromPath(t *testing.T) {
	cases := []struct {
		name string
		path string
		want CreateStrategy
	}{
		{name: "empty path should be directory", path: "", want: Dir},
		{name: "path with trailing slash should be directory", path: "/path/to/dir/", want: Dir},
		{name: "path without trailing slash should be file", path: "/path/to/file", want: File},
		{name: "path with extension should be file", path: "/path/to/file.txt", want: File},
		{name: "root directory should be directory", path: "/", want: Dir},
		{name: "relative path to file", path: "file.txt", want: File},
		{name: "relative path to directory with trailing slash", path: "dir/", want: Dir},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			assert.Equal(t, c.want, InferCreateStrategyFromPath(c.path))
		})
	}
}
Loading
Loading