Directory: | cvmfs/ |
---|---|
File: | cvmfs/upload_facility.cc |
Date: | 2025-06-22 02:36:02 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 72 | 88 | 81.8% |
Branches: | 26 | 55 | 47.3% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "upload_facility.h" | ||
6 | |||
7 | #include <cassert> | ||
8 | |||
9 | #include "upload_gateway.h" | ||
10 | #include "upload_local.h" | ||
11 | #include "upload_s3.h" | ||
12 | #include "util/exception.h" | ||
13 | |||
14 | namespace upload { | ||
15 | |||
16 | atomic_int64 UploadStreamHandle::g_upload_stream_tag = 0; | ||
17 | |||
18 | 896439 | AbstractUploader::UploadJob::UploadJob(UploadStreamHandle *handle, | |
19 | UploadBuffer buffer, | ||
20 | 896439 | const CallbackTN *callback) | |
21 | 896439 | : type(Upload) | |
22 | 896439 | , stream_handle(handle) | |
23 | 896439 | , tag_(handle->tag) | |
24 | 896439 | , buffer(buffer) | |
25 | 896439 | , callback(callback) { } | |
26 | |||
27 | 259010 | AbstractUploader::UploadJob::UploadJob(UploadStreamHandle *handle, | |
28 | 259010 | const shash::Any &content_hash) | |
29 | 259010 | : type(Commit) | |
30 | 259010 | , stream_handle(handle) | |
31 | 259010 | , tag_(handle->tag) | |
32 | 259010 | , buffer() | |
33 | 259009 | , callback(NULL) | |
34 | 259009 | , content_hash(content_hash) { } | |
35 | |||
36 | 892 | void AbstractUploader::RegisterPlugins() { | |
37 | 892 | RegisterPlugin<LocalUploader>(); | |
38 | 892 | RegisterPlugin<S3Uploader>(); | |
39 | 892 | RegisterPlugin<GatewayUploader>(); | |
40 | 892 | } | |
41 | |||
42 | 2715 | AbstractUploader::AbstractUploader(const SpoolerDefinition &spooler_definition) | |
43 | 2715 | : spooler_definition_(spooler_definition) | |
44 | 2715 | , num_upload_tasks_(spooler_definition.num_upload_tasks) | |
45 |
3/6✓ Branch 3 taken 2715 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 2715 times.
✗ Branch 7 not taken.
✓ Branch 11 taken 2715 times.
✗ Branch 12 not taken.
|
2715 | , jobs_in_flight_(spooler_definition.number_of_concurrent_uploads) { } |
46 | |||
47 | |||
48 | 2715 | bool AbstractUploader::Initialize() { | |
49 |
2/2✓ Branch 1 taken 3459 times.
✓ Branch 2 taken 2715 times.
|
6174 | for (unsigned i = 0; i < GetNumTasks(); ++i) { |
50 |
1/2✓ Branch 2 taken 3459 times.
✗ Branch 3 not taken.
|
3459 | Tube<UploadJob> *t = new Tube<UploadJob>(); |
51 | 3459 | tubes_upload_.TakeTube(t); | |
52 |
1/2✓ Branch 2 taken 3459 times.
✗ Branch 3 not taken.
|
3459 | tasks_upload_.TakeConsumer(new TaskUpload(this, t)); |
53 | } | ||
54 | 2715 | tubes_upload_.Activate(); | |
55 | 2715 | tasks_upload_.Spawn(); | |
56 | 2715 | return true; | |
57 | } | ||
58 | |||
59 | 1058 | bool AbstractUploader::FinalizeSession(bool /*commit*/, | |
60 | const std::string & /*old_root_hash*/, | ||
61 | const std::string & /*new_root_hash*/, | ||
62 | const RepositoryTag & /*tag*/) { | ||
63 | 1058 | return true; | |
64 | } | ||
65 | |||
66 | |||
67 | 6650 | int AbstractUploader::CreateAndOpenTemporaryChunkFile(std::string *path) const { | |
68 | const std::string tmp_path = CreateTempPath( | ||
69 |
3/6✓ Branch 1 taken 6650 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6650 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 6650 times.
✗ Branch 8 not taken.
|
13300 | spooler_definition_.temporary_path + "/" + "chunk", 0644); |
70 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 6650 times.
|
6650 | if (tmp_path.empty()) { |
71 | ✗ | LogCvmfs(kLogSpooler, kLogStderr, | |
72 | "Failed to create temp file in %s for upload of file chunk" | ||
73 | " (errno: %d).", | ||
74 | ✗ | spooler_definition_.temporary_path.c_str(), errno); | |
75 | ✗ | return -1; | |
76 | } | ||
77 | |||
78 |
1/2✓ Branch 2 taken 6650 times.
✗ Branch 3 not taken.
|
6650 | const int tmp_fd = open(tmp_path.c_str(), O_WRONLY); |
79 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6650 times.
|
6650 | if (tmp_fd < 0) { |
80 | ✗ | LogCvmfs(kLogSpooler, kLogStderr, | |
81 | "Failed to open temp file '%s' for upload of file chunk " | ||
82 | "(errno: %d)", | ||
83 | ✗ | tmp_path.c_str(), errno); | |
84 | ✗ | unlink(tmp_path.c_str()); | |
85 | } else { | ||
86 |
1/2✓ Branch 1 taken 6650 times.
✗ Branch 2 not taken.
|
6650 | *path = tmp_path; |
87 | } | ||
88 | |||
89 | 6650 | return tmp_fd; | |
90 | 6650 | } | |
91 | |||
92 | 2714 | void AbstractUploader::TearDown() { tasks_upload_.Terminate(); } | |
93 | |||
94 | 3513 | void AbstractUploader::WaitForUpload() const { jobs_in_flight_.WaitForZero(); } | |
95 | |||
96 | 98 | void AbstractUploader::InitCounters(perf::StatisticsTemplate *statistics) { | |
97 |
3/6✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 98 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 98 times.
✗ Branch 9 not taken.
|
98 | counters_ = new UploadCounters(*statistics); |
98 | 98 | } | |
99 | |||
100 | 6200 | void AbstractUploader::CountUploadedChunks() const { | |
101 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 6200 times.
|
6200 | if (counters_.IsValid()) { |
102 | ✗ | perf::Inc(counters_->n_chunks_added); | |
103 | } | ||
104 | 6200 | } | |
105 | |||
106 | ✗ | void AbstractUploader::DecUploadedChunks() const { | |
107 | ✗ | if (counters_.IsValid()) { | |
108 | ✗ | perf::Dec(counters_->n_chunks_added); | |
109 | } | ||
110 | } | ||
111 | |||
112 | 6200 | void AbstractUploader::CountUploadedBytes(int64_t bytes_written) const { | |
113 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 6200 times.
|
6200 | if (counters_.IsValid()) { |
114 | ✗ | perf::Xadd(counters_->sz_uploaded_bytes, bytes_written); | |
115 | } | ||
116 | 6200 | } | |
117 | |||
118 | ✗ | void AbstractUploader::CountDuplicates() const { | |
119 | ✗ | if (counters_.IsValid()) { | |
120 | ✗ | perf::Inc(counters_->n_chunks_duplicated); | |
121 | } | ||
122 | } | ||
123 | |||
124 | 1802 | void AbstractUploader::CountUploadedCatalogs() const { | |
125 |
2/2✓ Branch 1 taken 98 times.
✓ Branch 2 taken 1704 times.
|
1802 | if (counters_.IsValid()) { |
126 | 98 | perf::Inc(counters_->n_catalogs_added); | |
127 | } | ||
128 | 1802 | } | |
129 | |||
130 | 1802 | void AbstractUploader::CountUploadedCatalogBytes(int64_t bytes_written) const { | |
131 |
2/2✓ Branch 1 taken 98 times.
✓ Branch 2 taken 1704 times.
|
1802 | if (counters_.IsValid()) { |
132 | 98 | perf::Xadd(counters_->sz_uploaded_catalog_bytes, bytes_written); | |
133 | } | ||
134 | 1802 | } | |
135 | |||
136 | //------------------------------------------------------------------------------ | ||
137 | |||
138 | |||
139 | 1155442 | void TaskUpload::Process(AbstractUploader::UploadJob *upload_job) { | |
140 |
2/3✓ Branch 0 taken 896419 times.
✓ Branch 1 taken 259023 times.
✗ Branch 2 not taken.
|
1155442 | switch (upload_job->type) { |
141 | 896419 | case AbstractUploader::UploadJob::Upload: | |
142 | 896419 | uploader_->StreamedUpload(upload_job->stream_handle, upload_job->buffer, | |
143 | upload_job->callback); | ||
144 | 896515 | break; | |
145 | |||
146 | 259023 | case AbstractUploader::UploadJob::Commit: | |
147 | 259023 | uploader_->FinalizeStreamedUpload(upload_job->stream_handle, | |
148 | 259023 | upload_job->content_hash); | |
149 | 259023 | break; | |
150 | |||
151 | ✗ | default: | |
152 | ✗ | PANIC(NULL); | |
153 | } | ||
154 | |||
155 |
1/2✓ Branch 0 taken 1155538 times.
✗ Branch 1 not taken.
|
1155538 | delete upload_job; |
156 | 1155538 | } | |
157 | |||
158 | } // namespace upload | ||
159 |