test: use table-driven tests in Helm scanner tests (#8592)

Signed-off-by: nikpivkin <nikita.pivkin@smartforce.io>
Co-authored-by: Simar <simar@linux.com>
This commit is contained in:
Nikita Pivkin
2025-03-22 06:49:36 +06:00
committed by GitHub
parent 68b164ddf4
commit 7bafdcaaf9
2 changed files with 134 additions and 275 deletions

View File

@@ -151,12 +151,7 @@ func Test_helm_tarball_parser(t *testing.T) {
t.Logf("Running test: %s", test.testName)
testPath := filepath.Join("testdata", test.archiveFile)
testTemp := t.TempDir()
testFileName := filepath.Join(testTemp, test.archiveFile)
require.NoError(t, copyArchive(testPath, testFileName))
testFs := os.DirFS(testTemp)
testFs := fsysForAcrhive(t, testPath)
helmParser, err := parser.New(test.archiveFile)
require.NoError(t, err)

View File

@@ -2,175 +2,79 @@ package test
import (
"io"
"io/fs"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"testing"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/aquasecurity/trivy/pkg/iac/rego"
"github.com/aquasecurity/trivy/pkg/iac/scan"
"github.com/aquasecurity/trivy/pkg/iac/scanners/helm"
"github.com/aquasecurity/trivy/pkg/iac/scanners/options"
"github.com/aquasecurity/trivy/pkg/set"
)
func Test_helm_scanner_with_archive(t *testing.T) {
// TODO(simar7): Figure out why this test fails on Winndows only
if runtime.GOOS == "windows" {
t.Skip("skipping test on windows")
}
func TestScanner_ScanFS(t *testing.T) {
tests := []struct {
testName string
chartName string
path string
archiveName string
name string
fsys fs.FS
opts []options.ScannerOption
assert func(t *testing.T, results scan.Results)
}{
{
testName: "Parsing tarball 'mysql-8.8.26.tar'",
chartName: "mysql",
path: filepath.Join("testdata", "mysql-8.8.26.tar"),
archiveName: "mysql-8.8.26.tar",
name: "archived chart",
// TODO: Scan the archive directly
fsys: fsysForAcrhive(t, filepath.Join("testdata", "mysql-8.8.26.tar")),
assert: assertIds([]string{
"AVD-KSV-0001", "AVD-KSV-0003",
"AVD-KSV-0011", "AVD-KSV-0012", "AVD-KSV-0014",
"AVD-KSV-0015", "AVD-KSV-0016", "AVD-KSV-0018",
"AVD-KSV-0020", "AVD-KSV-0021", "AVD-KSV-0030",
"AVD-KSV-0104", "AVD-KSV-0106",
}),
},
}
for _, test := range tests {
t.Logf("Running test: %s", test.testName)
helmScanner := helm.New(rego.WithEmbeddedPolicies(true), rego.WithEmbeddedLibraries(true))
testTemp := t.TempDir()
testFileName := filepath.Join(testTemp, test.archiveName)
require.NoError(t, copyArchive(test.path, testFileName))
testFs := os.DirFS(testTemp)
results, err := helmScanner.ScanFS(t.Context(), testFs, ".")
require.NoError(t, err)
require.NotNil(t, results)
failed := results.GetFailed()
assert.Len(t, failed, 13)
visited := make(map[string]bool)
var errorCodes []string
for _, result := range failed {
id := result.Flatten().RuleID
if _, exists := visited[id]; !exists {
visited[id] = true
errorCodes = append(errorCodes, id)
}
}
assert.Len(t, errorCodes, 13)
sort.Strings(errorCodes)
assert.Equal(t, []string{
"AVD-KSV-0001", "AVD-KSV-0003",
"AVD-KSV-0011", "AVD-KSV-0012", "AVD-KSV-0014",
"AVD-KSV-0015", "AVD-KSV-0016", "AVD-KSV-0018",
"AVD-KSV-0020", "AVD-KSV-0021", "AVD-KSV-0030",
"AVD-KSV-0104", "AVD-KSV-0106",
}, errorCodes)
}
}
func Test_helm_scanner_with_missing_name_can_recover(t *testing.T) {
tests := []struct {
testName string
chartName string
path string
archiveName string
}{
{
testName: "Parsing tarball 'aws-cluster-autoscaler-bad.tar.gz'",
chartName: "aws-cluster-autoscaler",
path: filepath.Join("testdata", "aws-cluster-autoscaler-bad.tar.gz"),
archiveName: "aws-cluster-autoscaler-bad.tar.gz",
name: "chart in directory",
fsys: os.DirFS(filepath.Join("testdata", "testchart")),
assert: func(t *testing.T, results scan.Results) {
assertIds([]string{
"AVD-KSV-0001", "AVD-KSV-0003",
"AVD-KSV-0011", "AVD-KSV-0012", "AVD-KSV-0014",
"AVD-KSV-0015", "AVD-KSV-0016",
"AVD-KSV-0020", "AVD-KSV-0021", "AVD-KSV-0030",
"AVD-KSV-0104", "AVD-KSV-0106",
"AVD-KSV-0117", "AVD-KSV-0110",
})(t, results)
ignored := results.GetIgnored()
assert.Len(t, ignored, 1)
assert.Equal(t, "AVD-KSV-0018", ignored[0].Rule().AVDID)
assert.Equal(t, "templates/deployment.yaml", ignored[0].Metadata().Range().GetFilename())
},
},
}
for _, test := range tests {
t.Logf("Running test: %s", test.testName)
helmScanner := helm.New(rego.WithEmbeddedPolicies(true), rego.WithEmbeddedLibraries(true))
testTemp := t.TempDir()
testFileName := filepath.Join(testTemp, test.archiveName)
require.NoError(t, copyArchive(test.path, testFileName))
testFs := os.DirFS(testTemp)
_, err := helmScanner.ScanFS(t.Context(), testFs, ".")
require.NoError(t, err)
}
}
func Test_helm_scanner_with_dir(t *testing.T) {
// TODO(simar7): Figure out why this test fails on Winndows only
if runtime.GOOS == "windows" {
t.Skip("skipping test on windows")
}
tests := []struct {
testName string
chartName string
}{
{
testName: "Parsing directory testchart'",
chartName: "testchart",
// TODO: The chart name isn't actually empty
name: "scanner with missing chart name can recover",
fsys: fsysForAcrhive(t, filepath.Join("testdata", "aws-cluster-autoscaler-bad.tar.gz")),
assert: assertIds([]string{
"AVD-KSV-0014", "AVD-KSV-0023", "AVD-KSV-0030",
"AVD-KSV-0104", "AVD-KSV-0003", "AVD-KSV-0018",
"AVD-KSV-0118", "AVD-KSV-0012", "AVD-KSV-0106",
"AVD-KSV-0016", "AVD-KSV-0001", "AVD-KSV-0011",
"AVD-KSV-0015", "AVD-KSV-0021", "AVD-KSV-0110", "AVD-KSV-0020",
}),
},
}
for _, test := range tests {
t.Logf("Running test: %s", test.testName)
helmScanner := helm.New(rego.WithEmbeddedPolicies(true), rego.WithEmbeddedLibraries(true))
testFs := os.DirFS(filepath.Join("testdata", test.chartName))
results, err := helmScanner.ScanFS(t.Context(), testFs, ".")
require.NoError(t, err)
require.NotNil(t, results)
failed := results.GetFailed()
assert.Len(t, failed, 14)
visited := make(map[string]bool)
for _, result := range failed {
visited[result.Rule().AVDID] = true
}
errorCodes := lo.Keys(visited)
assert.ElementsMatch(t, []string{
"AVD-KSV-0001", "AVD-KSV-0003",
"AVD-KSV-0011", "AVD-KSV-0012", "AVD-KSV-0014",
"AVD-KSV-0015", "AVD-KSV-0016",
"AVD-KSV-0020", "AVD-KSV-0021", "AVD-KSV-0030",
"AVD-KSV-0104", "AVD-KSV-0106",
"AVD-KSV-0117", "AVD-KSV-0110",
}, errorCodes)
ignored := results.GetIgnored()
assert.Len(t, ignored, 1)
assert.Equal(t, "AVD-KSV-0018", ignored[0].Rule().AVDID)
assert.Equal(t, "templates/deployment.yaml", ignored[0].Metadata().Range().GetFilename())
}
}
func Test_helm_scanner_with_custom_policies(t *testing.T) {
// TODO(simar7): Figure out why this test fails on Winndows only
if runtime.GOOS == "windows" {
t.Skip("skipping test on windows")
}
regoRule := `
package user.kubernetes.ID001
{
name: "with custom check",
fsys: fsysForAcrhive(t, filepath.Join("testdata", "mysql-8.8.26.tar")),
opts: []options.ScannerOption{
rego.WithPolicyNamespaces("user"),
rego.WithPolicyReader(strings.NewReader(`package user.kubernetes.ID001
__rego_metadata__ := {
"id": "ID001",
"avd_id": "AVD-USR-ID001",
@@ -189,99 +93,36 @@ deny[res] {
input.kind == "Service"
msg := sprintf("Found service '%s' but services are not allowed", [input.metadata.name])
res := result.new(msg, input)
}
`
tests := []struct {
testName string
chartName string
path string
archiveName string
}{
{
testName: "Parsing tarball 'mysql-8.8.26.tar'",
chartName: "mysql",
path: filepath.Join("testdata", "mysql-8.8.26.tar"),
archiveName: "mysql-8.8.26.tar",
},
}
for _, test := range tests {
t.Run(test.testName, func(t *testing.T) {
t.Logf("Running test: %s", test.testName)
helmScanner := helm.New(rego.WithEmbeddedPolicies(true), rego.WithEmbeddedLibraries(true),
rego.WithPolicyDirs("rules"),
rego.WithPolicyNamespaces("user"))
testTemp := t.TempDir()
testFileName := filepath.Join(testTemp, test.archiveName)
require.NoError(t, copyArchive(test.path, testFileName))
policyDirName := filepath.Join(testTemp, "rules")
require.NoError(t, os.Mkdir(policyDirName, 0o700))
require.NoError(t, os.WriteFile(filepath.Join(policyDirName, "rule.rego"), []byte(regoRule), 0o600))
testFs := os.DirFS(testTemp)
results, err := helmScanner.ScanFS(t.Context(), testFs, ".")
require.NoError(t, err)
require.NotNil(t, results)
failed := results.GetFailed()
assert.Len(t, failed, 15)
visited := make(map[string]bool)
for _, result := range failed {
visited[result.Rule().AVDID] = true
}
errorCodes := lo.Keys(visited)
assert.ElementsMatch(t, []string{
}`)),
},
assert: assertIds([]string{
"AVD-KSV-0001", "AVD-KSV-0003",
"AVD-KSV-0011", "AVD-KSV-0012", "AVD-KSV-0014",
"AVD-KSV-0015", "AVD-KSV-0016", "AVD-KSV-0018",
"AVD-KSV-0020", "AVD-KSV-0021", "AVD-KSV-0030",
"AVD-KSV-0104", "AVD-KSV-0106", "AVD-USR-ID001",
}, errorCodes)
})
}
}
func copyArchive(src, dst string) error {
in, err := os.Open(src)
if err != nil {
return err
}
defer func() { _ = in.Close() }()
out, err := os.Create(dst)
if err != nil {
return err
}
defer func() { _ = out.Close() }()
if _, err := io.Copy(out, in); err != nil {
return err
}
return nil
}
func Test_helm_chart_with_templated_name(t *testing.T) {
helmScanner := helm.New(rego.WithEmbeddedPolicies(true), rego.WithEmbeddedLibraries(true))
testFs := os.DirFS(filepath.Join("testdata", "templated-name"))
_, err := helmScanner.ScanFS(t.Context(), testFs, ".")
require.NoError(t, err)
}
func TestCodeShouldNotBeMissing(t *testing.T) {
policy := `# METADATA
}),
},
{
name: "template-based name",
fsys: os.DirFS(filepath.Join("testdata", "templated-name")),
opts: []options.ScannerOption{
rego.WithEmbeddedLibraries(false),
rego.WithEmbeddedPolicies(false),
},
},
{
name: "failed result contains the code",
fsys: os.DirFS("testdata/simmilar-templates"),
opts: []options.ScannerOption{
rego.WithEmbeddedPolicies(false),
rego.WithEmbeddedLibraries(true),
rego.WithPolicyNamespaces("user"),
rego.WithPolicyReader(strings.NewReader(`# METADATA
# title: "Test rego"
# description: "Test rego"
# scope: package
# schemas:
# - input: schema["kubernetes"]
# custom:
# id: ID001
# avd_id: AVD-USR-ID001
# severity: LOW
# input:
@@ -292,36 +133,28 @@ package user.kubernetes.ID001
deny[res] {
input.spec.replicas == 3
res := result.new("Replicas are not allowed", input)
}
`
helmScanner := helm.New(
rego.WithEmbeddedPolicies(false),
rego.WithEmbeddedLibraries(false),
rego.WithPolicyNamespaces("user"),
rego.WithPolicyReader(strings.NewReader(policy)),
)
results, err := helmScanner.ScanFS(t.Context(), os.DirFS("testdata/simmilar-templates"), ".")
require.NoError(t, err)
failedResults := results.GetFailed()
require.Len(t, failedResults, 1)
failed := failedResults[0]
code, err := failed.GetCode()
require.NoError(t, err)
assert.NotNil(t, code)
}
func TestScanSubchartOnce(t *testing.T) {
check := `# METADATA
}`)),
},
assert: func(t *testing.T, results scan.Results) {
failedResults := results.GetFailed()
require.Len(t, failedResults, 1)
code, err := failedResults[0].GetCode()
require.NoError(t, err)
assert.NotNil(t, code)
},
},
{
name: "scan the subchart once",
fsys: os.DirFS(filepath.Join("testdata", "with-subchart")),
opts: []options.ScannerOption{
rego.WithEmbeddedPolicies(false),
rego.WithEmbeddedLibraries(true),
rego.WithPolicyNamespaces("user"),
rego.WithPolicyReader(strings.NewReader(`# METADATA
# title: "Test rego"
# description: "Test rego"
# scope: package
# schemas:
# - input: schema["kubernetes"]
# custom:
# id: ID001
# avd_id: AVD-USR-ID001
# severity: LOW
# input:
@@ -337,19 +170,50 @@ deny[res] {
container := kubernetes.containers[_]
container.securityContext.readOnlyRootFilesystem == false
res := result.new("set 'securityContext.readOnlyRootFilesystem' to true", container)
}`)),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
opts := []options.ScannerOption{
rego.WithEmbeddedPolicies(true),
rego.WithEmbeddedLibraries(true),
}
opts = append(opts, tt.opts...)
scanner := helm.New(opts...)
results, err := scanner.ScanFS(t.Context(), tt.fsys, ".")
require.NoError(t, err)
if tt.assert != nil {
tt.assert(t, results)
}
})
}
}
`
scanner := helm.New(
rego.WithEmbeddedPolicies(false),
rego.WithEmbeddedLibraries(true),
rego.WithPolicyNamespaces("user"),
rego.WithPolicyReader(strings.NewReader(check)),
)
func assertIds(expected []string) func(t *testing.T, results scan.Results) {
return func(t *testing.T, results scan.Results) {
errorCodes := set.New[string]()
for _, result := range results.GetFailed() {
errorCodes.Append(result.Rule().AVDID)
}
assert.ElementsMatch(t, expected, errorCodes.Items())
}
}
results, err := scanner.ScanFS(t.Context(), os.DirFS("testdata/with-subchart"), ".")
func fsysForAcrhive(t *testing.T, src string) fs.FS {
in, err := os.Open(src)
require.NoError(t, err)
require.Len(t, results, 1)
defer in.Close()
assert.Empty(t, results.GetFailed())
tmpDir := t.TempDir()
out, err := os.Create(filepath.Join(tmpDir, filepath.Base(src)))
require.NoError(t, err)
defer out.Close()
_, err = io.Copy(out, in)
require.NoError(t, err)
return os.DirFS(tmpDir)
}