diff --git a/hack/ccp/go.mod b/hack/ccp/go.mod index 882416d06..1aa4b7cf0 100644 --- a/hack/ccp/go.mod +++ b/hack/ccp/go.mod @@ -7,9 +7,10 @@ require ( connectrpc.com/connect v1.16.2 connectrpc.com/grpchealth v1.3.0 connectrpc.com/grpcreflect v1.2.0 - github.com/artefactual-labs/gearmin v0.0.0-20240507145739-e15b2dbf710c + github.com/artefactual-labs/gearmin v0.0.0-20240525053553-51122ec3bbfd github.com/bufbuild/protovalidate-go v0.6.2 github.com/cenkalti/backoff/v4 v4.3.0 + github.com/docker/docker v25.0.5+incompatible github.com/doug-martin/goqu/v9 v9.19.0 github.com/elliotchance/orderedmap/v2 v2.2.0 github.com/fsnotify/fsnotify v1.7.0 @@ -19,6 +20,7 @@ require ( github.com/google/uuid v1.6.0 github.com/hashicorp/go-retryablehttp v0.7.6 github.com/mikespook/gearman-go v0.0.0-20220520031403-2a518e866145 + github.com/otiai10/copy v1.14.0 github.com/peterbourgon/ff/v3 v3.4.0 github.com/rs/cors v1.11.0 github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a @@ -55,7 +57,6 @@ require ( github.com/cpuguy83/dockercfg v0.3.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/distribution/reference v0.5.0 // indirect - github.com/docker/docker v25.0.5+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect diff --git a/hack/ccp/go.sum b/hack/ccp/go.sum index 6902f8587..4045fc40e 100644 --- a/hack/ccp/go.sum +++ b/hack/ccp/go.sum @@ -30,8 +30,8 @@ github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8 github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= github.com/armon/go-radix v1.0.1-0.20221118154546-54df44f2176c h1:651/eoCRnQ7YtSjAnSzRucrJz+3iGEFt+ysraELS81M= github.com/armon/go-radix v1.0.1-0.20221118154546-54df44f2176c/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= -github.com/artefactual-labs/gearmin 
v0.0.0-20240507145739-e15b2dbf710c h1:vf3GatdfaJojurMPb5g4V2yujXPGwSQZnEs37crmfcA= -github.com/artefactual-labs/gearmin v0.0.0-20240507145739-e15b2dbf710c/go.mod h1:C+JaJpQwRvWN3sa2z2jy+ziy7mQuuTIsTiRFJbUBO38= +github.com/artefactual-labs/gearmin v0.0.0-20240525053553-51122ec3bbfd h1:b9WEVrgVymFz99oBFbfHuzHvnMlDZDNGlGVUB922Rb8= +github.com/artefactual-labs/gearmin v0.0.0-20240525053553-51122ec3bbfd/go.mod h1:0bctkvvsETfNnBXLrUNvHY8FSxFC372jrIhddQLIyBI= github.com/bep/clocks v0.5.0 h1:hhvKVGLPQWRVsBP/UB7ErrHYIO42gINVbvqxvYTPVps= github.com/bep/clocks v0.5.0/go.mod h1:SUq3q+OOq41y2lRQqH5fsOoxN8GbxSiT6jvoVVLCVhU= github.com/bep/debounce v1.2.0 h1:wXds8Kq8qRfwAOpAxHrJDbCXgC5aHSzgQb/0gKsHQqo= @@ -285,6 +285,10 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= +github.com/otiai10/copy v1.14.0 h1:dCI/t1iTdYGtkvCuBG2BgR6KZa83PTclw4U5n2wAllU= +github.com/otiai10/copy v1.14.0/go.mod h1:ECfuL02W+/FkTWZWgQqXPWZgW9oeKCSQ5qVfSc4qc4w= +github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks= +github.com/otiai10/mint v1.5.1/go.mod h1:MJm72SBthJjz8qhefc4z1PYEieWmy8Bku7CjcAqyUSM= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= diff --git a/hack/ccp/hack/integration.sh b/hack/ccp/hack/integration.sh index 1b1930bd5..dcb68a386 100755 --- a/hack/ccp/hack/integration.sh +++ b/hack/ccp/hack/integration.sh @@ -5,15 +5,13 @@ set -e readonly __dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" 
readonly __root="$(cd "$(dirname "${__dir}")" && pwd)" -docker compose stop archivematica-ccp 2>/dev/null - CCP_DIR=${HOME}/.ccp CCP_AM_DIR=${CCP_DIR}/am-pipeline-data CCP_SS_DIR=${CCP_DIR}/ss-location-data env \ CCP_INTEGRATION_ENABLED=1 \ - CCP_INTEGRATION_TRANSFER_SOURCE=${CCP_SS_DIR}/archivematica/ \ - CCP_INTEGRATION_USE_COMPOSE=yes \ + CCP_INTEGRATION_TRANSFER_SOURCE=${CCP_SS_DIR} \ + CCP_INTEGRATION_ENABLE_TESTCONTAINERS_LOGGING=yes \ CCP_INTEGRATION_USE_STDOUT=yes \ go test -count=1 -v ${__root}/integration/ -run=TestServerCreatePackage diff --git a/hack/ccp/integration/config_test.go b/hack/ccp/integration/config_test.go deleted file mode 100644 index 002b76c37..000000000 --- a/hack/ccp/integration/config_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package integration_test - -import ( - "fmt" - "log" - "os" - "testing" - - "golang.org/x/exp/slices" -) - -const prefix = "CCP_INTEGRATION" - -var ( - enabled = getEnvBool("ENABLED", "no") - useCompose = getEnvBool("USE_COMPOSE", "no") - useStdout = getEnvBool("USE_STDOUT", "yes") - transferSource = getEnvRequired("TRANSFER_SOURCE") -) - -func getEnv(name, fallback string) string { - v := os.Getenv(fmt.Sprintf("%s_%s", prefix, name)) - if v == "" { - return fallback - } - return v -} - -func getEnvRequired(name string) string { - v := getEnv(name, "") - if v == "" && enabled { - log.Fatalf("Required env %s_%s is empty.", prefix, name) - } - return v -} - -func getEnvBool(name, fallback string) bool { - if v := getEnv(name, fallback); slices.Contains([]string{"yes", "1", "on", "true"}, v) { - return true - } else { - return false - } -} - -func requireFlag(t *testing.T) { - if !enabled { - t.Skip("Skipping integration tests (CCP_INTEGRATION_ENABLED=no).") - } -} diff --git a/hack/ccp/integration/data/Dockerfile.worker b/hack/ccp/integration/data/Dockerfile.worker new file mode 100644 index 000000000..39f5912c6 --- /dev/null +++ b/hack/ccp/integration/data/Dockerfile.worker @@ -0,0 +1,169 @@ +# This is a copy of the 
Archivematica Dockerfile targeting MCPClient only +# I couldn't use the original because testcontainers-go refuses to build it, +# apparently because it's not using BuildKit. + +ARG UBUNTU_VERSION=22.04 +ARG USER_ID=1000 +ARG GROUP_ID=1000 +ARG PYTHON_VERSION=3.9 +ARG PYENV_DIR=/pyenv + +# ----------------------------------------------------------------------------- + +FROM ubuntu:${UBUNTU_VERSION} AS base-builder + +ARG PYENV_DIR + +ENV DEBIAN_FRONTEND noninteractive +ENV PYTHONUNBUFFERED 1 + +RUN set -ex \ + && apt-get update \ + && apt-get install -y --no-install-recommends \ + ca-certificates \ + curl \ + git \ + gnupg \ + libldap2-dev \ + libmysqlclient-dev \ + libsasl2-dev \ + libsqlite3-dev \ + locales \ + pkg-config \ + tzdata \ + && rm -rf /var/lib/apt/lists/* /var/cache/apt/* + +RUN locale-gen en_US.UTF-8 +ENV LANG en_US.UTF-8 +ENV LANGUAGE en_US:en +ENV LC_ALL en_US.UTF-8 + +ENV PYENV_ROOT=${PYENV_DIR}/data +ENV PATH=$PYENV_ROOT/shims:$PYENV_ROOT/bin:$PATH + +# ----------------------------------------------------------------------------- + +FROM base-builder AS pyenv-builder + +ARG PYTHON_VERSION + +RUN set -ex \ + && apt-get update \ + && apt-get install -y --no-install-recommends \ + build-essential \ + libbz2-dev \ + libffi-dev \ + liblzma-dev \ + libncursesw5-dev \ + libreadline-dev \ + libsqlite3-dev \ + libssl-dev \ + libxml2-dev \ + libxmlsec1-dev \ + tk-dev \ + xz-utils \ + zlib1g-dev \ + && rm -rf /var/lib/apt/lists/* /var/cache/apt/* + +RUN set -ex \ + && curl --retry 3 -L https://github.com/pyenv/pyenv-installer/raw/master/bin/pyenv-installer | bash \ + && pyenv install ${PYTHON_VERSION} \ + && pyenv global ${PYTHON_VERSION} + +COPY --link requirements-dev.txt /src/requirements-dev.txt + +RUN set -ex \ + && pyenv exec python3 -m pip install --upgrade pip setuptools \ + && pyenv exec python3 -m pip install --requirement /src/requirements-dev.txt \ + && pyenv rehash + + +# 
----------------------------------------------------------------------------- + +FROM base-builder as base + +ARG USER_ID +ARG GROUP_ID +ARG PYENV_DIR + +RUN set -ex \ + && curl --retry 3 -fsSL https://packages.archivematica.org/1.16.x/key.asc | gpg --dearmor -o /etc/apt/keyrings/archivematica-1.16.x.gpg \ + && echo "deb [arch=amd64 signed-by=/etc/apt/keyrings/archivematica-1.16.x.gpg] http://packages.archivematica.org/1.16.x/ubuntu-externals jammy main" > /etc/apt/sources.list.d/archivematica-external.list \ + && curl --retry 3 -so /tmp/repo-mediaarea_1.0-21_all.deb -L https://mediaarea.net/repo/deb/repo-mediaarea_1.0-21_all.deb \ + && dpkg -i /tmp/repo-mediaarea_1.0-21_all.deb \ + && apt-get update \ + && apt-get install -y --no-install-recommends \ + atool \ + bulk-extractor \ + clamav \ + coreutils \ + ffmpeg \ + fits \ + g++ \ + gcc \ + gearman \ + gettext \ + ghostscript \ + hashdeep \ + imagemagick \ + inkscape \ + jhove \ + libffi-dev \ + libimage-exiftool-perl \ + libldap2-dev \ + libmysqlclient-dev \ + libsasl2-dev \ + libssl-dev \ + libxml2-dev \ + libxslt1-dev \ + logapp \ + md5deep \ + mediaconch \ + mediainfo \ + nailgun \ + nfs-common \ + openjdk-8-jre-headless \ + p7zip-full \ + pbzip2 \ + pst-utils \ + python3-lxml \ + rsync \ + siegfried \ + sleuthkit \ + tesseract-ocr \ + tree \ + unar \ + unrar-free \ + uuid \ + && rm -rf /var/lib/apt/lists/* /var/cache/apt/* + +RUN set -ex \ + && groupadd --gid ${GROUP_ID} --system archivematica \ + && useradd --uid ${USER_ID} --gid ${GROUP_ID} --home-dir /var/archivematica --system archivematica \ + && mkdir -p /var/archivematica/sharedDirectory \ + && chown -R archivematica:archivematica /var/archivematica + +# Download ClamAV virus signatures +RUN freshclam --quiet + +USER archivematica + +COPY --chown=${USER_ID}:${GROUP_ID} --from=pyenv-builder --link ${PYENV_DIR} ${PYENV_DIR} +COPY --chown=${USER_ID}:${GROUP_ID} --link . 
/src + +# ----------------------------------------------------------------------------- + +FROM base + +ENV DJANGO_SETTINGS_MODULE settings.common +ENV PYTHONPATH /src/src/MCPClient/lib/:/src/src/MCPClient/lib/clientScripts:/src/src/archivematicaCommon/lib/:/src/src/dashboard/src/ +ENV ARCHIVEMATICA_MCPCLIENT_ARCHIVEMATICACLIENTMODULES /src/src/MCPClient/lib/archivematicaClientModules +ENV ARCHIVEMATICA_MCPCLIENT_CLIENTASSETSDIRECTORY /src/src/MCPClient/lib/assets/ +ENV ARCHIVEMATICA_MCPCLIENT_CLIENTSCRIPTSDIRECTORY /src/src/MCPClient/lib/clientScripts/ + +# Some scripts in archivematica-fpr-admin executed by MCPClient rely on certain +# files being available in this image (e.g. see https://git.io/vA1wF). +COPY --link src/archivematicaCommon/lib/externals/fido/ /usr/lib/archivematica/archivematicaCommon/externals/fido/ +COPY --link src/archivematicaCommon/lib/externals/fiwalk_plugins/ /usr/lib/archivematica/archivematicaCommon/externals/fiwalk_plugins/ + +ENTRYPOINT ["pyenv", "exec", "python3", "/src/src/MCPClient/lib/archivematicaClient.py"] diff --git a/hack/ccp/integration/mcp.sql.bz2 b/hack/ccp/integration/data/mcp.sql.bz2 similarity index 100% rename from hack/ccp/integration/mcp.sql.bz2 rename to hack/ccp/integration/data/mcp.sql.bz2 diff --git a/hack/ccp/integration/main_test.go b/hack/ccp/integration/main_test.go deleted file mode 100644 index 0878412c7..000000000 --- a/hack/ccp/integration/main_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package integration_test - -import ( - "testing" - - "go.uber.org/goleak" -) - -func TestMain(m *testing.M) { - goleak.VerifyTestMain(m) -} diff --git a/hack/ccp/integration/mysql_test.go b/hack/ccp/integration/mysql_test.go deleted file mode 100644 index b37f0f6c8..000000000 --- a/hack/ccp/integration/mysql_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package integration_test - -import ( - "context" - "database/sql" - "testing" - - "github.com/testcontainers/testcontainers-go" - 
"github.com/testcontainers/testcontainers-go/modules/mysql" -) - -func useMySQL(t *testing.T) string { - t.Helper() - - if useCompose { - return "root:12345@tcp(127.0.0.1:63001)/MCP" - } - - ctx := context.Background() - - container, err := mysql.RunContainer(ctx, - testcontainers.WithImage("mysql:8.3.0"), - mysql.WithDatabase("MCP"), - mysql.WithUsername("root"), - mysql.WithPassword("12345"), - mysql.WithScripts("mcp.sql.bz2"), - ) - if err != nil { - t.Fatalf("Failed to start container: %s", err) - } - - t.Cleanup(func() { - if err := container.Terminate(ctx); err != nil { - t.Logf("Failed to terminate container: %v", err) - } - }) - - var ( - db *sql.DB - dsn string - ) - dsn, err = container.ConnectionString(ctx) - if err != nil { - t.Fatalf("Failed to create connection string to MySQL server.") - } - db, err = sql.Open("mysql", dsn) - if err != nil { - t.Fatalf("Failed to connect to MySQL: %v", err) - } - defer db.Close() - - if err = db.Ping(); err != nil { - t.Fatalf("Failed to ping MySQL: %v", err) - } - - return dsn -} diff --git a/hack/ccp/integration/server_test.go b/hack/ccp/integration/server_test.go index df1e098cb..9dd7d51f0 100644 --- a/hack/ccp/integration/server_test.go +++ b/hack/ccp/integration/server_test.go @@ -1,48 +1,24 @@ package integration_test import ( - "bytes" - "context" - "errors" - "io" - "net/http" - "os" - "os/exec" - "path/filepath" - "strings" "testing" "time" "connectrpc.com/connect" - "github.com/cenkalti/backoff/v4" "google.golang.org/protobuf/types/known/wrapperspb" "gotest.tools/v3/assert" - "gotest.tools/v3/fs" "gotest.tools/v3/poll" adminv1 "github.com/artefactual/archivematica/hack/ccp/internal/api/gen/archivematica/ccp/admin/v1beta1" - adminv1connect "github.com/artefactual/archivematica/hack/ccp/internal/api/gen/archivematica/ccp/admin/v1beta1/adminv1beta1connect" - "github.com/artefactual/archivematica/hack/ccp/internal/cmd/rootcmd" - "github.com/artefactual/archivematica/hack/ccp/internal/cmd/servercmd" ) func 
TestServerCreatePackage(t *testing.T) { - // This test is not going to work until I can have MCPClient connect to CCP. - // - // Options: - // 1. Create an external network in Compose and use host.docker.internal:12345. - // 2. Set up test using Dagger services. - // 3. Fake Storage Service - // t.Skip("Create integration environment.") - requireFlag(t) - client := runServer(t) - restartMCPClient(t) - ctx := context.Background() + env := createEnv(t) - transferDir := createTransfer(t) + transferDir := env.createTransfer() - cpResp, err := client.CreatePackage(ctx, &connect.Request[adminv1.CreatePackageRequest]{ + cpResp, err := env.ccpClient.CreatePackage(env.ctx, &connect.Request[adminv1.CreatePackageRequest]{ Msg: &adminv1.CreatePackageRequest{ Name: "Foobar", Path: []string{transferDir}, @@ -53,7 +29,7 @@ func TestServerCreatePackage(t *testing.T) { poll.WaitOn(t, func(t poll.LogT) poll.Result { - rpResp, err := client.ReadPackage(ctx, &connect.Request[adminv1.ReadPackageRequest]{ + rpResp, err := env.ccpClient.ReadPackage(env.ctx, &connect.Request[adminv1.ReadPackageRequest]{ Msg: &adminv1.ReadPackageRequest{ Id: cpResp.Msg.Id, }, @@ -63,159 +39,18 @@ func TestServerCreatePackage(t *testing.T) { } pkg := rpResp.Msg.Pkg - if pkg.Status == adminv1.PackageStatus_PACKAGE_STATUS_DONE || pkg.Status == adminv1.PackageStatus_PACKAGE_STATUS_COMPLETED_SUCCESSFULLY || pkg.Status == adminv1.PackageStatus_PACKAGE_STATUS_FAILED { + if pkg.Status == adminv1.PackageStatus_PACKAGE_STATUS_FAILED { + return poll.Error(err) + } + if pkg.Status == adminv1.PackageStatus_PACKAGE_STATUS_DONE || pkg.Status == adminv1.PackageStatus_PACKAGE_STATUS_COMPLETED_SUCCESSFULLY { return poll.Success() } return poll.Continue("work is still ongoing") }, poll.WithDelay(time.Second/4), - poll.WithTimeout(time.Second*10), + poll.WithTimeout(time.Second*120), ) -} - -func runServer(t *testing.T) adminv1connect.AdminServiceClient { - ctx, cancel := context.WithCancel(context.Background()) - - dsn := 
useMySQL(t) - - var sharedDir string - if useCompose { - home, _ := os.UserHomeDir() - sharedDir = filepath.Join(home, ".ccp/am-pipeline-data") - } else { - sharedDir = fs.NewDir(t, "amccp-servercmd").Path() - } - - args := []string{ - // root flags - "-v=10", - "--debug", - // server flags - "--shared-dir=" + sharedDir, - "--db.driver=mysql", - "--db.dsn=" + dsn, - "--api.admin.addr=:22300", - "--gearmin.addr=:22301", - "--ssclient.url=http://127.0.0.1:63081", - "--ssclient.username=test", - "--ssclient.key=test", - } - - var stdout io.Writer - if useStdout { - stdout = os.Stdout - } else { - stdout = bytes.NewBuffer([]byte{}) - } - - cmd := servercmd.New(&rootcmd.Config{}, stdout) - assert.NilError(t, cmd.Parse(args)) - done := make(chan error) - go func() { - done <- cmd.Exec(ctx, []string{}) - }() - - // Server is likely running, but let's try to receive to see if it failed. - select { - case <-time.After(time.Second / 2): - case err := <-done: - assert.NilError(t, err) - } - - t.Cleanup(func() { - cancel() - err := <-done - assert.NilError(t, err) - }) - - baseURL := "http://127.0.0.1:22300" - waitForHealthStatus(t, baseURL) - return adminv1connect.NewAdminServiceClient(&http.Client{}, baseURL) -} - -// waitForHealthStatus blocks until the heatlh check status succeeds. 
-func waitForHealthStatus(t *testing.T, baseURL string) { - client := &http.Client{} - - var retryPolicy backoff.BackOff = backoff.NewConstantBackOff(time.Second) - retryPolicy = backoff.WithMaxRetries(retryPolicy, 10) - - backoff.RetryNotify( - func() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) - defer cancel() - - req, err := http.NewRequestWithContext(ctx, http.MethodPost, baseURL+"/grpc.health.v1.Health/Check", bytes.NewReader([]byte("{}"))) - req.Header.Set("Content-Type", "application/json") - if err != nil { - return err - } - resp, err := client.Do(req) - if err != nil { - return err - } - - if resp.StatusCode != http.StatusOK { - return errors.New("unexpected status code") - } - - blob, err := io.ReadAll(resp.Body) - if err != nil { - return err - } - - if string(blob) != `{"status":"SERVING_STATUS_SERVING"}` { - return errors.New("unexpected status") - } - - return nil - }, - retryPolicy, - func(err error, d time.Duration) { - t.Logf("Retrying... (%v)", err) - }, - ) -} - -// createTransfer creates a sample transfer in the transfer source directory. 
-func createTransfer(t *testing.T) string { - t.Helper() - - const tsRealPath = "/home/archivematica" - - err := os.MkdirAll(filepath.Join(transferSource, "ccp"), os.FileMode(0o770)) - assert.NilError(t, err) - - tmpDir, err := os.MkdirTemp(filepath.Join(transferSource, "ccp"), "transfer-*") - assert.NilError(t, err) - - writeFile(t, filepath.Join(tmpDir, "f1.txt"), "") - writeFile(t, filepath.Join(tmpDir, "f2.txt"), "") - - err = os.Link("../hack/processingMCP.xml", filepath.Join(tmpDir, "processingMCP.xml")) - assert.NilError(t, err) - - tmpDir = strings.TrimPrefix(tmpDir, transferSource) - tmpDir = filepath.Join(tsRealPath, tmpDir) - - return tmpDir -} - -func writeFile(t *testing.T, name, contents string) { - file, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0o644) - assert.NilError(t, err) - - _, err = file.WriteString(contents) - assert.NilError(t, err) - - file.Close() -} - -func restartMCPClient(t *testing.T) { - t.Log("Restarting MCPClient...") - cmd := exec.Command("docker-compose", "restart", "archivematica-mcp-client") - err := cmd.Run() - assert.NilError(t, err) + t.Log("Test completed successfully!") } diff --git a/hack/ccp/integration/storage/storage.go b/hack/ccp/integration/storage/storage.go new file mode 100644 index 000000000..b981aef9b --- /dev/null +++ b/hack/ccp/integration/storage/storage.go @@ -0,0 +1,263 @@ +package storage + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + + "github.com/google/uuid" + "github.com/otiai10/copy" + "gotest.tools/v3/assert" + + "github.com/artefactual/archivematica/hack/ccp/internal/ssclient" + "github.com/artefactual/archivematica/hack/ccp/internal/ssclient/enums" +) + +// I'm not making this up, it's the pipeline identifier found in mcp.sql.bz2. 
+var pipelineID = uuid.MustParse("dac039b9-81d1-405b-a2c9-72d7d7920c15") + +type storageService struct { + t testing.TB + sharedDir string + transferSourceDir string + locations []*ssclient.Location +} + +func New(t testing.TB, sharedDir, transferSourceDir string) *httptest.Server { + storage := &storageService{ + t: t, + sharedDir: sharedDir, + transferSourceDir: transferSourceDir, + locations: []*ssclient.Location{ + { + ID: uuid.MustParse("5cbbf1f6-7abe-474e-8dda-9904083a1831"), + URI: "/api/v2/location/5cbbf1f6-7abe-474e-8dda-9904083a1831/", + Purpose: enums.LocationPurposeTS, + Pipelines: []string{fmt.Sprintf("/api/v2/pipeline/%s/", pipelineID)}, + Path: transferSourceDir, + }, + { + ID: uuid.MustParse("df192133-3b13-4292-a219-50887d285cb3"), + URI: "/api/v2/location/df192133-3b13-4292-a219-50887d285cb3/", + Purpose: enums.LocationPurposeCP, + Pipelines: []string{fmt.Sprintf("/api/v2/pipeline/%s/", pipelineID)}, + Path: sharedDir, + }, + { + ID: uuid.MustParse("4b3508e0-e32b-4382-ae62-1e639ddab211"), + URI: "/api/v2/location/4b3508e0-e32b-4382-ae62-1e639ddab211/", + Purpose: enums.LocationPurposeBL, + Pipelines: []string{fmt.Sprintf("/api/v2/pipeline/%s/", pipelineID)}, + Path: sharedDir, + }, + }, + } + + mux := http.NewServeMux() + mux.HandleFunc("GET /api/v2/pipeline/{id}/", storage.readPipeline) + mux.HandleFunc("GET /api/v2/location/{id}/", storage.readLocation) + mux.HandleFunc("GET /api/v2/location/", storage.listLocations) + mux.HandleFunc("GET /api/v2/location/default/{purpose}/", storage.readDefaultLocation) + mux.HandleFunc("POST /api/v2/location/{id}/", storage.moveFiles) + mux.HandleFunc("POST /api/v2/file/", storage.createPackage) + mux.HandleFunc("PUT /api/v2/file/{id}/", storage.updatePackagecontents) + + srv := httptest.NewServer(mux) + + t.Cleanup(func() { + srv.Close() + }) + + return srv +} + +func (s *storageService) readPipeline(w http.ResponseWriter, req *http.Request) { + id := req.PathValue("id") + pID, err := uuid.Parse(id) + if err 
!= nil { + http.Error(w, "invalid id", http.StatusBadRequest) + return + } + if pID != pipelineID { + http.Error(w, "pipeline not found", http.StatusNotFound) + return + } + writeJSON(w, jPipeline{ + &ssclient.Pipeline{ + ID: pipelineID, + URI: fmt.Sprintf("/api/v2/pipeline/%s/", pipelineID), + }, + }) +} + +func (s *storageService) readLocation(w http.ResponseWriter, req *http.Request) { + id := req.PathValue("id") + locationID, err := uuid.Parse(id) + if err != nil { + http.Error(w, "invalid id", http.StatusBadRequest) + return + } + var match *ssclient.Location + for _, loc := range s.locations { + if loc.ID == locationID { + match = loc + } + } + if match == nil { + http.Error(w, "location not found", http.StatusNotFound) + return + } + writeJSON(w, jLocation{match}) +} + +func writeJSON(w http.ResponseWriter, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(v); err != nil { + http.Error(w, "Failed to encode response.", http.StatusInternalServerError) + } +} + +func (s *storageService) listLocations(w http.ResponseWriter, req *http.Request) { + purpose := req.URL.Query().Get("purpose") + var matches []*ssclient.Location + for _, item := range s.locations { + if purpose == item.Purpose.String() { + matches = append(matches, item) + } + } + writeJSON(w, jLocationList{matches}) +} + +func (s *storageService) readDefaultLocation(w http.ResponseWriter, req *http.Request) { + purpose := req.PathValue("purpose") + var match string + for _, item := range s.locations { + if purpose == item.Purpose.String() { + match = item.URI + } + } + if match == "" { + http.Error(w, "defaut location not found", http.StatusNotFound) + return + } + w.Header().Set("Location", match) + w.WriteHeader(http.StatusFound) +} + +func (s *storageService) moveFiles(w http.ResponseWriter, req *http.Request) { + id := req.PathValue("id") + fmt.Println(id) + + v := map[string]any{} + if err := 
json.NewDecoder(req.Body).Decode(&v); err != nil { + http.Error(w, "cannot decode payload", http.StatusBadRequest) + return + } + + files, ok := v["files"] + if !ok { + http.Error(w, "bad request", http.StatusBadRequest) + return + } + + if files, ok := files.([]interface{}); ok { + for _, file := range files { + if f, ok := file.(map[string]interface{}); ok { + src := f["source"].(string) // E.g.: "archivematica/ccp/transfer-3333975231" (relative to /home) + dst := f["destination"].(string) // E.g.: "/var/archivematica/sharedDirectory/tmp/3323409791/Foobar/" + + src = filepath.Join(s.transferSourceDir, src) + + s.t.Logf("storage-stub: copy(%s, %s)", src, dst) + err := copy.Copy( + src, + dst, + copy.Options{}, + ) + assert.NilError(s.t, err) + } + } + } +} + +func (s *storageService) createPackage(w http.ResponseWriter, req *http.Request) { + writeJSON(w, map[string]string{"status": "OK"}) +} + +func (s *storageService) updatePackagecontents(w http.ResponseWriter, req *http.Request) { + writeJSON(w, map[string]string{"status": "OK"}) +} + +type jPipeline struct { + *ssclient.Pipeline +} + +func (j jPipeline) MarshalJSON() ([]byte, error) { + type transformer struct { + ID uuid.UUID `json:"uuid"` + URI string `json:"resource_uri"` + } + tr := transformer{ + ID: j.ID, + URI: j.URI, + } + return json.Marshal(tr) +} + +type jLocation struct { + *ssclient.Location +} + +func (j jLocation) MarshalJSON() ([]byte, error) { + type transformer struct { + ID uuid.UUID `json:"uuid"` + URI string `json:"resource_uri"` + Purpose enums.LocationPurpose `json:"purpose"` + Path string `json:"path"` + RelativePath string `json:"relative_path"` + Pipelines []string `json:"pipeline"` + } + tr := transformer{ + ID: j.ID, + URI: j.URI, + Purpose: j.Purpose, + Path: j.Path, + RelativePath: j.RelativePath, + Pipelines: j.Pipelines, + } + return json.Marshal(tr) +} + +type jLocationList struct { + locations []*ssclient.Location +} + +func (j jLocationList) MarshalJSON() ([]byte, error) { + 
type meta struct { + Limit int `json:"limit"` + Next *string `json:"next"` + Offset int `json:"offset"` + Previous *string `json:"previous"` + Total int `json:"total_count"` + } + type objects []*jLocation + type transformer struct { + Meta meta `json:"meta"` + Objects objects `json:"objects"` + } + tr := transformer{ + Meta: meta{ + Limit: 100, + Total: len(j.locations), + }, + Objects: make([]*jLocation, 0, len(j.locations)), + } + for _, item := range j.locations { + tr.Objects = append(tr.Objects, &jLocation{item}) + } + return json.Marshal(tr) +} diff --git a/hack/ccp/integration/support_test.go b/hack/ccp/integration/support_test.go new file mode 100644 index 000000000..d492e92d1 --- /dev/null +++ b/hack/ccp/integration/support_test.go @@ -0,0 +1,438 @@ +package integration_test + +import ( + "bytes" + "context" + "database/sql" + "errors" + "fmt" + "io" + "log" + "net" + "net/http" + "net/url" + "os" + "os/user" + "path/filepath" + "strconv" + "strings" + "testing" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/mount" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/modules/mysql" + "go.uber.org/goleak" + "golang.org/x/exp/slices" + "gotest.tools/v3/assert" + + "github.com/artefactual/archivematica/hack/ccp/integration/storage" + adminv1connect "github.com/artefactual/archivematica/hack/ccp/internal/api/gen/archivematica/ccp/admin/v1beta1/adminv1beta1connect" + "github.com/artefactual/archivematica/hack/ccp/internal/cmd/rootcmd" + "github.com/artefactual/archivematica/hack/ccp/internal/cmd/servercmd" +) + +const envPrefix = "CCP_INTEGRATION" + +var ( + enableIntegration = getEnvBool("ENABLED", "no") + enableCCPLogging = getEnvBool("ENABLE_CCP_LOGGING", "yes") + enableTestContainersLogging = getEnvBool("ENABLE_TESTCONTAINERS_LOGGING", "no") +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} + +type hostPort struct 
{ + host string + port string + portN int +} + +func newHostPort(host string, port int) hostPort { + return hostPort{host, strconv.Itoa(port), port} +} + +func (hp hostPort) String() string { + return hp.Addr() +} + +func (hp hostPort) Addr() string { + return net.JoinHostPort(hp.host, hp.port) +} + +func (hp hostPort) URL(scheme string) string { + u, _ := url.Parse(scheme + "://" + hp.Addr()) + return u.String() +} + +type env struct { + t testing.TB + ctx context.Context + + // Information about the current user that is running this test. + user *user.User + + // Path to the shared directory in the local filesystem. + sharedDir string + + // Path to the transfer source directory in the local filesystem. + transferSourceDir string + + // Listen address for the CCP Admin API server. + // TODO: use free port. + ccpAdminServerAddr hostPort + + // Listen address for the CCP Job server. + // TODO: use free port. + ccpJobServerAddr hostPort + + // CCP Admin API client, should be ready to use once the env is created. + ccpClient adminv1connect.AdminServiceClient + + storageServiceBaseURL string + + // MySQL client and connection details. + mysqlClient *sql.DB + mysqlDSN string + mysqlContainerIP string +} + +// createEnv brings up all the dependencies needed by CCP to run our integration +// tests successfully. It uses testcontainers to run containers. +func createEnv(t *testing.T) *env { + env := &env{ + t: t, + ctx: context.Background(), + ccpAdminServerAddr: newHostPort("127.0.0.1", 22300), + ccpJobServerAddr: newHostPort("127.0.0.1", 22301), + } + env.lookUpUser() + + testcontainers.Logger = logger{t} + + env.sharedDir = env.tempDir("ccp-sharedDir") + env.transferSourceDir = env.tempDir("ccp-transferSourceDir") + + // These are all blocking. 
+ env.runMySQL() + env.runStorageService() + env.runCCP() + env.runMCPClient() + + return env +} + +func (e *env) tempDir(name string) string { + tmpDir, err := os.MkdirTemp("", name+"-*") + assert.NilError(e.t, err) + return tmpDir +} + +func (e *env) lookUpUser() { + e.t.Log("Looking up user...") + + user, err := user.Current() + assert.NilError(e.t, err) + e.user = user +} + +func (e *env) runMySQL() { + e.t.Log("Running MySQL server...") + + container, err := mysql.RunContainer(e.ctx, + testcontainers.WithImage("mysql:8.4.0"), + testcontainers.CustomizeRequestOption(func(req *testcontainers.GenericContainerRequest) error { + req.LogConsumerCfg = &testcontainers.LogConsumerConfig{ + Opts: []testcontainers.LogProductionOption{testcontainers.WithLogProductionTimeout(10 * time.Second)}, + Consumers: []testcontainers.LogConsumer{&logConsumer{e.t, "mysql"}}, + } + return nil + }), + mysql.WithDatabase("MCP"), + mysql.WithUsername("root"), + mysql.WithPassword("12345"), + mysql.WithScripts("data/mcp.sql.bz2"), + ) + assert.NilError(e.t, err, "Failed to start container.") + e.t.Cleanup(func() { + _ = container.Terminate(e.ctx) + }) + + e.mysqlContainerIP, err = container.ContainerIP(e.ctx) + assert.NilError(e.t, err) + + e.mysqlDSN, err = container.ConnectionString(e.ctx) + assert.NilError(e.t, err, "Failed to create connection string to MySQL server.") + + e.mysqlClient, err = sql.Open("mysql", e.mysqlDSN) + assert.NilError(e.t, err, "Failed to connect to MySQL.") + e.t.Cleanup(func() { e.mysqlClient.Close() }) + + err = e.mysqlClient.Ping() + assert.NilError(e.t, err, "Failed to ping MySQL.") +} + +func (e *env) runStorageService() { + e.t.Log("Running Storage Service stub...") + + srv := storage.New(e.t, e.sharedDir, e.transferSourceDir) + e.storageServiceBaseURL = srv.URL +} + +func (e *env) runMCPClient() { + e.t.Log("Running Archivematica MCPClient...") + + // Update the database with the URL of the Storage Service stub. 
+ u, err := url.Parse(e.storageServiceBaseURL) + assert.NilError(e.t, err) + ssPort, err := strconv.Atoi(u.Port()) + assert.NilError(e.t, err) + u.Host = fmt.Sprintf("%s:%d", testcontainers.HostInternal, ssPort) + _, err = e.mysqlClient.ExecContext(e.ctx, "UPDATE DashboardSettings SET value = ? WHERE name = 'storage_service_url';", u.String()) + assert.NilError(e.t, err) + + req := testcontainers.ContainerRequest{ + Name: "ccp-archivematica-mcp-client", + FromDockerfile: testcontainers.FromDockerfile{ + // We could start using a public image instead once 1917 and 1931 are merged. + Context: "../../../", + Dockerfile: "hack/ccp/integration/data/Dockerfile.worker", + PrintBuildLog: false, + KeepImage: true, + }, + HostAccessPorts: []int{ + e.ccpJobServerAddr.portN, // Proxy Gearmin job server. + ssPort, // Proxy Storage server. + }, + HostConfigModifier: func(hostConfig *container.HostConfig) { + hostConfig.Mounts = []mount.Mount{ + { + Type: mount.TypeBind, + Source: e.sharedDir, + Target: "/var/archivematica/sharedDirectory", + ReadOnly: false, + }, + } + }, + LogConsumerCfg: &testcontainers.LogConsumerConfig{ + Opts: []testcontainers.LogProductionOption{testcontainers.WithLogProductionTimeout(10 * time.Second)}, + Consumers: []testcontainers.LogConsumer{&logConsumer{e.t, "worker"}}, + }, + Env: map[string]string{ + "DJANGO_SECRET_KEY": "12345", + "ARCHIVEMATICA_MCPCLIENT_CLIENT_USER": "root", + "ARCHIVEMATICA_MCPCLIENT_CLIENT_PASSWORD": "12345", + "ARCHIVEMATICA_MCPCLIENT_CLIENT_HOST": e.mysqlContainerIP, + "ARCHIVEMATICA_MCPCLIENT_CLIENT_PORT": "3306", + "ARCHIVEMATICA_MCPCLIENT_CLIENT_DATABASE": "MCP", + "ARCHIVEMATICA_MCPCLIENT_MCPCLIENT_MCPARCHIVEMATICASERVER": fmt.Sprintf("%s:%d", testcontainers.HostInternal, e.ccpJobServerAddr.portN), + "ARCHIVEMATICA_MCPCLIENT_MCPCLIENT_SEARCH_ENABLED": "false", + }, + } + + container, err := testcontainers.GenericContainer(e.ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + 
assert.NilError(e.t, err) + + e.t.Cleanup(func() { + if e.t.Failed() { + e.logContainerOutput(container) + } + + _ = container.Terminate(e.ctx) + }) +} + +func (e *env) logContainerOutput(container testcontainers.Container) { + reader, err := container.Logs(e.ctx) + assert.NilError(e.t, err) + + blob, err := io.ReadAll(reader) + assert.NilError(e.t, err) + + e.t.Log(string(blob)) +} + +func (e *env) runCCP() { + ctx, cancel := context.WithCancel(e.ctx) + + args := []string{ + // root flags + "-v=10", + "--debug", + // server flags + "--shared-dir=" + e.sharedDir, + "--db.driver=mysql", + "--db.dsn=" + e.mysqlDSN, + "--api.admin.addr=" + e.ccpAdminServerAddr.Addr(), + "--gearmin.addr=" + e.ccpJobServerAddr.Addr(), + "--ssclient.url=" + e.storageServiceBaseURL, + "--ssclient.username=test", + "--ssclient.key=test", + } + + var stdout io.Writer + if enableCCPLogging { + stdout = os.Stdout + } else { + stdout = bytes.NewBuffer([]byte{}) + } + + cmd := servercmd.New(&rootcmd.Config{}, stdout) + assert.NilError(e.t, cmd.Parse(args)) + done := make(chan error) + go func() { + done <- cmd.Exec(ctx, []string{}) + }() + + // Server is likely running, but let's try to receive to see if it failed. + select { + case <-time.After(time.Second / 2): + case err := <-done: + assert.NilError(e.t, err) + } + + e.t.Cleanup(func() { + cancel() + err := <-done + assert.NilError(e.t, err) + }) + + baseURL := e.ccpAdminServerAddr.URL("http") + waitForHealthStatus(e.t, baseURL) + + e.ccpClient = adminv1connect.NewAdminServiceClient(&http.Client{}, baseURL) +} + +// createTransfer creates a sample transfer in the transfer source directory. 
+func (e *env) createTransfer() string { + tmpDir, err := os.MkdirTemp(e.transferSourceDir, "transfer-*") + assert.NilError(e.t, err) + + writeFile(e.t, filepath.Join(tmpDir, "f1.txt"), "") + writeFile(e.t, filepath.Join(tmpDir, "f2.txt"), "") + + err = os.Link("../hack/processingMCP.xml", filepath.Join(tmpDir, "processingMCP.xml")) + assert.NilError(e.t, err) + + e.t.Logf("Created transfer: %s", tmpDir) + + return tmpDir +} + +// waitForHealthStatus blocks until the health check status succeeds. +func waitForHealthStatus(t testing.TB, baseURL string) { + client := &http.Client{} + + var retryPolicy backoff.BackOff = backoff.NewConstantBackOff(time.Second) + retryPolicy = backoff.WithMaxRetries(retryPolicy, 10) + + backoff.RetryNotify( + func() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, baseURL+"/grpc.health.v1.Health/Check", bytes.NewReader([]byte("{}"))) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return errors.New("unexpected status code") + } + + blob, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + + if string(blob) != `{"status":"SERVING_STATUS_SERVING"}` { + return errors.New("unexpected status") + } + + return nil + }, + retryPolicy, + func(err error, d time.Duration) { + t.Logf("Retrying...
(%v)", err) + }, + ) +} + +func writeFile(t testing.TB, name, contents string) { + file, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0o644) + assert.NilError(t, err) + + _, err = file.WriteString(contents) + assert.NilError(t, err) + + file.Close() +} + +func getEnv(name, fallback string) string { + v := os.Getenv(fmt.Sprintf("%s_%s", envPrefix, name)) + if v == "" { + return fallback + } + return v +} + +func getEnvRequired(name string) string { //nolint: unused + v := getEnv(name, "") + if v == "" && enableIntegration { + log.Fatalf("Required env %s_%s is empty.", envPrefix, name) + } + return v +} + +func getEnvBool(name, fallback string) bool { + if v := getEnv(name, fallback); slices.Contains([]string{"yes", "1", "on", "true"}, v) { + return true + } else { + return false + } +} + +func requireFlag(t *testing.T) { + if !enableIntegration { + t.Skip("Skipping integration tests (CCP_INTEGRATION_ENABLED=no).") + } +} + +// logger implements testcontainers.Logging. This implementation logs only if +// requested by the user via enableTestContainersLogging. +type logger struct { + testing.TB +} + +func (l logger) Printf(format string, v ...interface{}) { + if enableTestContainersLogging { + l.Logf(format, v...) + } +} + +// logConsumer implements testcontainers.LogConsumer. 
+type logConsumer struct { + t testing.TB + container string +} + +func (c *logConsumer) Accept(l testcontainers.Log) { + if enableTestContainersLogging { + content := string(l.Content) + content = strings.TrimSuffix(content, "\n") + c.t.Logf("[%s] %s", c.container, content) + } +} diff --git a/hack/ccp/internal/cmd/servercmd/server.go b/hack/ccp/internal/cmd/servercmd/server.go index 9ebeb2269..f3fe5c380 100644 --- a/hack/ccp/internal/cmd/servercmd/server.go +++ b/hack/ccp/internal/cmd/servercmd/server.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net" + "net/url" "path/filepath" "time" @@ -110,7 +111,7 @@ func (s *Server) Run() error { s.logger.V(1).Info("Creating ssclient.") retryableClient := retryablehttp.NewClient() - retryableClient.Logger = nil + retryableClient.Logger = httpClientLogger{s.logger.WithName("ssclient").V(2)} ssclient, err := ssclient.NewClient(retryableClient.StandardClient(), s.store, s.config.ssclient) if err != nil { return fmt.Errorf("error creating ssclient: %v", err) @@ -167,3 +168,20 @@ func (s *Server) Close() error { return errs } + +type httpClientLogger struct { + logr.Logger +} + +func (l httpClientLogger) Printf(msg string, keysAndValues ...any) { + method, path := "", "" + if len(keysAndValues) >= 2 { + if v, ok := keysAndValues[0].(string); ok { + method = v + } + if v, ok := keysAndValues[1].(*url.URL); ok { + path = v.Path + } + } + l.Info("ssclient", "method", method, "path", path, "client", "github.com/hashicorp/go-retryablehttp") +} diff --git a/hack/ccp/internal/controller/controller.go b/hack/ccp/internal/controller/controller.go index b64ea8f92..293e18f3e 100644 --- a/hack/ccp/internal/controller/controller.go +++ b/hack/ccp/internal/controller/controller.go @@ -2,6 +2,7 @@ package controller import ( "context" + "errors" "fmt" "path/filepath" "strings" @@ -276,10 +277,11 @@ func (c *Controller) Decisions() []string { func (c *Controller) Close() error { var err error - c.closeOnce.Do(func() { c.groupCancel() - err = 
c.group.Wait() + if waitErr := c.group.Wait(); errors.Is(waitErr, context.Canceled) { + err = waitErr + } }) return err diff --git a/hack/ccp/internal/controller/iterator.go b/hack/ccp/internal/controller/iterator.go index 52fd61260..4971cbe62 100644 --- a/hack/ccp/internal/controller/iterator.go +++ b/hack/ccp/internal/controller/iterator.go @@ -13,7 +13,10 @@ import ( "github.com/artefactual/archivematica/hack/ccp/internal/workflow" ) -var errWait = errors.New("wait") +var ( + errWait = errors.New("wait") + errEnd = errors.New("terminator") +) // A chain is used for passing information between jobs. // @@ -89,12 +92,6 @@ func (i *iterator) Process(ctx context.Context) (err error) { if err := i.p.markAsProcessing(ctx); err != nil { return err } - defer func() { - // TODO: can we be more specific? E.g. failed or completed. - if markErr := i.p.markAsDone(ctx); err != nil { - err = errors.Join(err, markErr) - } - }() next := i.startAtChainID @@ -131,9 +128,22 @@ func (i *iterator) Process(ctx context.Context) (err error) { } n, err := i.runJob(ctx, next) - if err == io.EOF { + + // End of chain. + if errors.Is(err, io.EOF) { + // TODO: can we have this iterator span across chains? + return nil + } + + // End of processing. + if errors.Is(err, errEnd) { + if markErr := i.p.markAsDone(ctx); markErr != nil { + return markErr + } return nil } + + // Prompt. if errors.Is(err, errWait) { choice, waitErr := i.wait(ctx) // puts the loop on hold. if waitErr != nil { @@ -142,7 +152,9 @@ func (i *iterator) Process(ctx context.Context) (err error) { next = choice continue } + if err != nil { + // TODO: mark as failed?
return fmt.Errorf("run job: %v", err) } next = n @@ -169,15 +181,15 @@ func (i *iterator) runJob(ctx context.Context, id uuid.UUID) (uuid.UUID, error) } next, err := s.exec(ctx) - if err != nil { - if err == io.EOF { + if errors.Is(err, io.EOF) { + if wl.End { + return uuid.Nil, errEnd + } else { return uuid.Nil, err } - return uuid.Nil, fmt.Errorf("link %s with manager %s (%s) couldn't be executed: %v", id, wl.Manager, wl.Description, err) } - - if wl.End { - return uuid.Nil, io.EOF + if err != nil { + return uuid.Nil, fmt.Errorf("link %s with manager %s (%s) couldn't be executed: %v", id, wl.Manager, wl.Description, err) } // Workflow needs to be reactivated by another watched directory. diff --git a/hack/ccp/internal/controller/package.go b/hack/ccp/internal/controller/package.go index 0c311c427..5b8fc7dd8 100644 --- a/hack/ccp/internal/controller/package.go +++ b/hack/ccp/internal/controller/package.go @@ -170,7 +170,7 @@ func NewTransferPackage( defer func() { if err != nil { - logger.Info("Opsie!", "err", err) + logger.Info("Oopsie!", "err", err) } else { logger.Info("Done!") } @@ -731,6 +731,7 @@ func dirBasename(path string) string { // baseReplacements returns replacements needed by all unit types. func baseReplacements(p *Package) replacementMapping { path := p.Path() + return map[string]replacement{ "%SIPUUID%": replacement(p.id.String()), "%SIPName%": replacement(p.Name()), @@ -875,7 +876,23 @@ func (rm replacementMapping) replaceValues(input string) string { } for k, v := range rm { - input = strings.ReplaceAll(input, k, v.escape()) + escaped := v.escape() + + // Unfortunately MCPClient expects paths to sit under "/var/archivematica/sharedDirectory". + // If CCP is using a different basepath, we have to replace it otherwise MCPClient won't find it. 
+ sep := string(filepath.Separator) + if strings.HasPrefix(escaped, sep) { + tailed := strings.HasSuffix(escaped, sep) + parts := strings.Split(escaped, sep) + if len(parts) > 3 && parts[1] == "tmp" && strings.HasPrefix(parts[2], "ccp-sharedDir") { + escaped = joinPath("/var/archivematica/sharedDirectory", filepath.Join(parts[3:]...)) + if tailed { + escaped += sep + } + } + } + + input = strings.ReplaceAll(input, k, escaped) } return input diff --git a/hack/ccp/internal/ssclient/ssclient.go b/hack/ccp/internal/ssclient/ssclient.go index ff5b56ffd..03ae575c2 100644 --- a/hack/ccp/internal/ssclient/ssclient.go +++ b/hack/ccp/internal/ssclient/ssclient.go @@ -99,7 +99,7 @@ func (c *clientImpl) ReadDefaultLocation(ctx context.Context, purpose enums.Loca return nil, err } - // We're asking for a models.Locationable using ByUuuid while rewriting the + // We're asking for a models.Locationable using ByUuid while rewriting the // URL template to hit the Default Location API instead. ssclient-go follows // the redirects automatically, so we don't have to. //