Directory: | cvmfs/ |
---|---|
File: | cvmfs/upload_facility.cc |
Date: | 2025-02-02 02:34:22 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 77 | 93 | 82.8% |
Branches: | 26 | 55 | 47.3% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "upload_facility.h" | ||
6 | |||
7 | #include <cassert> | ||
8 | |||
9 | #include "upload_gateway.h" | ||
10 | #include "upload_local.h" | ||
11 | #include "upload_s3.h" | ||
12 | #include "util/exception.h" | ||
13 | |||
14 | namespace upload { | ||
15 | |||
16 | atomic_int64 UploadStreamHandle::g_upload_stream_tag = 0; | ||
17 | |||
18 | 420407 | AbstractUploader::UploadJob::UploadJob( | |
19 | UploadStreamHandle *handle, | ||
20 | UploadBuffer buffer, | ||
21 | 420407 | const CallbackTN *callback) | |
22 | 420407 | : type(Upload) | |
23 | 420407 | , stream_handle(handle) | |
24 | 420407 | , tag_(handle->tag) | |
25 | 420407 | , buffer(buffer) | |
26 | 420407 | , callback(callback) | |
27 | 420428 | { } | |
28 | |||
29 | 250699 | AbstractUploader::UploadJob::UploadJob( | |
30 | UploadStreamHandle *handle, | ||
31 | 250699 | const shash::Any &content_hash) | |
32 | 250699 | : type(Commit) | |
33 | 250699 | , stream_handle(handle) | |
34 | 250699 | , tag_(handle->tag) | |
35 | 250699 | , buffer() | |
36 | 250699 | , callback(NULL) | |
37 | 250699 | , content_hash(content_hash) | |
38 | 250699 | { } | |
39 | |||
40 | 63 | void AbstractUploader::RegisterPlugins() { | |
41 | 63 | RegisterPlugin<LocalUploader>(); | |
42 | 63 | RegisterPlugin<S3Uploader>(); | |
43 | 63 | RegisterPlugin<GatewayUploader>(); | |
44 | 63 | } | |
45 | |||
46 | 112 | AbstractUploader::AbstractUploader(const SpoolerDefinition &spooler_definition) | |
47 | 112 | : spooler_definition_(spooler_definition) | |
48 | 112 | , num_upload_tasks_(spooler_definition.num_upload_tasks) | |
49 |
3/6✓ Branch 3 taken 112 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 112 times.
✗ Branch 7 not taken.
✓ Branch 11 taken 112 times.
✗ Branch 12 not taken.
|
112 | , jobs_in_flight_(spooler_definition.number_of_concurrent_uploads) |
50 | 112 | { } | |
51 | |||
52 | |||
53 | 112 | bool AbstractUploader::Initialize() { | |
54 |
2/2✓ Branch 1 taken 136 times.
✓ Branch 2 taken 112 times.
|
248 | for (unsigned i = 0; i < GetNumTasks(); ++i) { |
55 |
1/2✓ Branch 2 taken 136 times.
✗ Branch 3 not taken.
|
136 | Tube<UploadJob> *t = new Tube<UploadJob>(); |
56 | 136 | tubes_upload_.TakeTube(t); | |
57 |
1/2✓ Branch 2 taken 136 times.
✗ Branch 3 not taken.
|
136 | tasks_upload_.TakeConsumer(new TaskUpload(this, t)); |
58 | } | ||
59 | 112 | tubes_upload_.Activate(); | |
60 | 112 | tasks_upload_.Spawn(); | |
61 | 112 | return true; | |
62 | } | ||
63 | |||
64 | 24 | bool AbstractUploader::FinalizeSession(bool /*commit*/, | |
65 | const std::string & /*old_root_hash*/, | ||
66 | const std::string & /*new_root_hash*/, | ||
67 | const RepositoryTag & /*tag*/) { | ||
68 | 24 | return true; | |
69 | } | ||
70 | |||
71 | |||
72 | 147 | int AbstractUploader::CreateAndOpenTemporaryChunkFile(std::string *path) const { | |
73 | const std::string tmp_path = | ||
74 |
3/6✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 147 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 147 times.
✗ Branch 8 not taken.
|
294 | CreateTempPath(spooler_definition_.temporary_path + "/" + "chunk", 0644); |
75 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 147 times.
|
147 | if (tmp_path.empty()) { |
76 | ✗ | LogCvmfs(kLogSpooler, kLogStderr, | |
77 | "Failed to create temp file in %s for upload of file chunk" | ||
78 | " (errno: %d).", | ||
79 | ✗ | spooler_definition_.temporary_path.c_str(), errno); | |
80 | ✗ | return -1; | |
81 | } | ||
82 | |||
83 |
1/2✓ Branch 2 taken 147 times.
✗ Branch 3 not taken.
|
147 | const int tmp_fd = open(tmp_path.c_str(), O_WRONLY); |
84 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 147 times.
|
147 | if (tmp_fd < 0) { |
85 | ✗ | LogCvmfs(kLogSpooler, kLogStderr, | |
86 | "Failed to open temp file '%s' for upload of file chunk " | ||
87 | "(errno: %d)", | ||
88 | ✗ | tmp_path.c_str(), errno); | |
89 | ✗ | unlink(tmp_path.c_str()); | |
90 | } else { | ||
91 |
1/2✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
|
147 | *path = tmp_path; |
92 | } | ||
93 | |||
94 | 147 | return tmp_fd; | |
95 | 147 | } | |
96 | |||
97 | 112 | void AbstractUploader::TearDown() { | |
98 | 112 | tasks_upload_.Terminate(); | |
99 | 112 | } | |
100 | |||
101 | 111 | void AbstractUploader::WaitForUpload() const { jobs_in_flight_.WaitForZero(); } | |
102 | |||
103 | 2 | void AbstractUploader::InitCounters(perf::StatisticsTemplate *statistics) { | |
104 |
3/6✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 2 times.
✗ Branch 9 not taken.
|
2 | counters_ = new UploadCounters(*statistics); |
105 | 2 | } | |
106 | |||
107 | 200 | void AbstractUploader::CountUploadedChunks() const { | |
108 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 200 times.
|
200 | if (counters_.IsValid()) { |
109 | ✗ | perf::Inc(counters_->n_chunks_added); | |
110 | } | ||
111 | 200 | } | |
112 | |||
113 | ✗ | void AbstractUploader::DecUploadedChunks() const { | |
114 | ✗ | if (counters_.IsValid()) { | |
115 | ✗ | perf::Dec(counters_->n_chunks_added); | |
116 | } | ||
117 | } | ||
118 | |||
119 | 200 | void AbstractUploader::CountUploadedBytes(int64_t bytes_written) const { | |
120 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 200 times.
|
200 | if (counters_.IsValid()) { |
121 | ✗ | perf::Xadd(counters_->sz_uploaded_bytes, bytes_written); | |
122 | } | ||
123 | 200 | } | |
124 | |||
125 | ✗ | void AbstractUploader::CountDuplicates() const { | |
126 | ✗ | if (counters_.IsValid()) { | |
127 | ✗ | perf::Inc(counters_->n_chunks_duplicated); | |
128 | } | ||
129 | } | ||
130 | |||
131 | 46 | void AbstractUploader::CountUploadedCatalogs() const { | |
132 |
2/2✓ Branch 1 taken 2 times.
✓ Branch 2 taken 44 times.
|
46 | if (counters_.IsValid()) { |
133 | 2 | perf::Inc(counters_->n_catalogs_added); | |
134 | } | ||
135 | 46 | } | |
136 | |||
137 | 46 | void AbstractUploader::CountUploadedCatalogBytes(int64_t bytes_written) const { | |
138 |
2/2✓ Branch 1 taken 2 times.
✓ Branch 2 taken 44 times.
|
46 | if (counters_.IsValid()) { |
139 | 2 | perf::Xadd(counters_->sz_uploaded_catalog_bytes, bytes_written); | |
140 | } | ||
141 | 46 | } | |
142 | |||
143 | //------------------------------------------------------------------------------ | ||
144 | |||
145 | |||
146 | 671211 | void TaskUpload::Process(AbstractUploader::UploadJob *upload_job) { | |
147 |
2/3✓ Branch 0 taken 420486 times.
✓ Branch 1 taken 250726 times.
✗ Branch 2 not taken.
|
671211 | switch (upload_job->type) { |
148 | 420486 | case AbstractUploader::UploadJob::Upload: | |
149 | 420486 | uploader_->StreamedUpload( | |
150 | upload_job->stream_handle, upload_job->buffer, upload_job->callback); | ||
151 | 420486 | break; | |
152 | |||
153 | 250726 | case AbstractUploader::UploadJob::Commit: | |
154 | 250726 | uploader_->FinalizeStreamedUpload( | |
155 | 250726 | upload_job->stream_handle, upload_job->content_hash); | |
156 | 250726 | break; | |
157 | |||
158 | ✗ | default: | |
159 | ✗ | PANIC(NULL); | |
160 | } | ||
161 | |||
162 |
1/2✓ Branch 0 taken 671212 times.
✗ Branch 1 not taken.
|
671212 | delete upload_job; |
163 | 671212 | } | |
164 | |||
165 | } // namespace upload | ||
166 |