GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/fetch.cc
Date: 2026-06-28 02:36:10
Exec Total Coverage
Lines: 158 168 94.0%
Branches: 105 195 53.8%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5
6 #include "fetch.h"
7
8 #include <unistd.h>
9
10 #include "backoff.h"
11 #include "cache.h"
12 #include "clientctx.h"
13 #include "network/download.h"
14 #include "quota.h"
15 #include "statistics.h"
16 #include "util/logging.h"
17 #include "util/posix.h"
18
19 using namespace std; // NOLINT
20
21 namespace cvmfs {
22
23 2 void TLSDestructor(void *data) {
24 2 Fetcher::ThreadLocalStorage *tls = static_cast<Fetcher::ThreadLocalStorage *>(
25 data);
26 2 std::vector<Fetcher::ThreadLocalStorage *> *tls_blocks = &tls->fetcher
27 ->tls_blocks_;
28
29 {
30 2 const MutexLockGuard m(tls->fetcher->lock_tls_blocks_);
31 2 for (vector<Fetcher::ThreadLocalStorage *>::iterator
32 2 i = tls_blocks->begin(),
33 2 iEnd = tls_blocks->end();
34
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 i != iEnd;
35 2 ++i) {
36
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
4 if ((*i) == tls) {
37
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 tls_blocks->erase(i);
38 2 break;
39 }
40 }
41 2 }
42 2 tls->fetcher->CleanupTls(tls);
43 2 }
44
45
46 /**
47 * Called when a thread exists, releases a ThreadLocalStorage object and
48 * removes the pointer to it from tls_blocks_.
49 */
50 394 void Fetcher::CleanupTls(ThreadLocalStorage *tls) {
51 394 ClosePipe(tls->pipe_wait);
52
1/2
✓ Branch 0 taken 394 times.
✗ Branch 1 not taken.
394 delete tls;
53 394 }
54
55
56 /**
57 * Initialized thread-local storage if called the first time in a new thread.
58 */
59 625 Fetcher::ThreadLocalStorage *Fetcher::GetTls() {
60 ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
61 625 pthread_getspecific(thread_local_storage_));
62
2/2
✓ Branch 0 taken 231 times.
✓ Branch 1 taken 394 times.
625 if (tls != NULL)
63 231 return tls;
64
65
2/4
✓ Branch 1 taken 394 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 394 times.
✗ Branch 5 not taken.
394 tls = new ThreadLocalStorage();
66 394 tls->fetcher = this;
67
1/2
✓ Branch 1 taken 394 times.
✗ Branch 2 not taken.
394 MakePipe(tls->pipe_wait);
68 394 tls->download_job.SetCompressed(true);
69 394 tls->download_job.SetProbeHosts(true);
70 394 const int retval = pthread_setspecific(thread_local_storage_, tls);
71
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 394 times.
394 assert(retval == 0);
72
73 394 const MutexLockGuard m(lock_tls_blocks_);
74
1/2
✓ Branch 1 taken 394 times.
✗ Branch 2 not taken.
394 tls_blocks_.push_back(tls);
75
76 394 return tls;
77 394 }
78
79
80 830 int Fetcher::Fetch(const CacheManager::LabeledObject &object,
81 const std::string &alt_url) {
82 int fd_return; // Read-only file descriptor that is returned
83 int retval;
84
85 830 perf::Inc(n_invocations);
86
87 // Try to open from local cache
88
3/4
✓ Branch 1 taken 830 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 215 times.
✓ Branch 4 taken 615 times.
830 if ((fd_return = OpenSelect(object)) >= 0) {
89
1/2
✓ Branch 2 taken 215 times.
✗ Branch 3 not taken.
215 LogCvmfs(kLogCache, kLogDebug, "hit: %s", object.label.path.c_str());
90 215 return fd_return;
91 }
92
93
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 615 times.
615 if (object.id.IsNull()) {
94 // This has been seen when trying to load the root catalog signed by an
95 // invalid certificate on an empty cache
96 // TODO(jblomer): check if still necessary after the catalog reload refactor
97 LogCvmfs(kLogCache, kLogDebug, "cancel attempt to download null hash");
98 return -EIO;
99 }
100
101
1/2
✓ Branch 1 taken 615 times.
✗ Branch 2 not taken.
615 ThreadLocalStorage *tls = GetTls();
102
103 // Synchronization point: either act as a master thread for this object or
104 // enqueue to the list of waiting threads.
105 615 pthread_mutex_lock(lock_queues_download_);
106 615 const ThreadQueues::iterator iDownloadQueue = queues_download_.find(
107
1/2
✓ Branch 1 taken 615 times.
✗ Branch 2 not taken.
615 object.id);
108
2/2
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 614 times.
615 if (iDownloadQueue != queues_download_.end()) {
109
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 LogCvmfs(kLogCache, kLogDebug, "waiting for download of %s",
110 object.label.path.c_str());
111
112
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 iDownloadQueue->second->push_back(tls->pipe_wait[1]);
113 1 pthread_mutex_unlock(lock_queues_download_);
114
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 ReadPipe(tls->pipe_wait[0], &fd_return, sizeof(int));
115
116
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 LogCvmfs(kLogCache, kLogDebug, "received from another thread fd %d for %s",
117 fd_return, object.label.path.c_str());
118 1 return fd_return;
119 } else {
120 // Seems we are the first one, check again in the cache (race condition)
121
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 fd_return = OpenSelect(object);
122
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 613 times.
614 if (fd_return >= 0) {
123 1 pthread_mutex_unlock(lock_queues_download_);
124 1 return fd_return;
125 }
126
127 // Create a new queue for this chunk
128
1/2
✓ Branch 1 taken 613 times.
✗ Branch 2 not taken.
613 queues_download_[object.id] = &tls->other_pipes_waiting;
129 613 pthread_mutex_unlock(lock_queues_download_);
130 }
131
132 613 perf::Inc(n_downloads);
133
134 // Involve the download manager
135
1/2
✓ Branch 2 taken 613 times.
✗ Branch 3 not taken.
613 LogCvmfs(kLogCache, kLogDebug, "downloading %s", object.label.path.c_str());
136 613 std::string url;
137
2/2
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 610 times.
613 if (object.label.IsExternal()) {
138
2/4
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3 times.
✗ Branch 5 not taken.
3 url = !alt_url.empty() ? alt_url : object.label.path;
139 } else {
140
8/14
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 609 times.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 609 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 609 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 610 times.
✗ Branch 14 not taken.
✓ Branch 18 taken 609 times.
✓ Branch 19 taken 1 times.
✗ Branch 22 not taken.
✗ Branch 23 not taken.
610 url = "/" + (alt_url.size() ? alt_url : "data/" + object.id.MakePath());
141 }
142
1/2
✓ Branch 1 taken 613 times.
✗ Branch 2 not taken.
613 void *txn = alloca(cache_mgr_->SizeOfTxn());
143
1/2
✓ Branch 1 taken 613 times.
✗ Branch 2 not taken.
613 retval = cache_mgr_->StartTxn(object.id, object.label.size, txn);
144
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 612 times.
613 if (retval < 0) {
145
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 LogCvmfs(kLogCache, kLogDebug, "could not start transaction on %s",
146 object.label.path.c_str());
147
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 SignalWaitingThreads(retval, object.id, tls);
148 1 return retval;
149 }
150
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 cache_mgr_->CtrlTxn(object.label, 0, txn);
151
152
1/2
✓ Branch 3 taken 612 times.
✗ Branch 4 not taken.
612 LogCvmfs(kLogCache, kLogDebug, "miss: %s %s", object.label.path.c_str(),
153 url.c_str());
154 612 TransactionSink sink(cache_mgr_, txn);
155 612 tls->download_job.SetUrl(&url);
156 612 tls->download_job.SetSink(&sink);
157 612 tls->download_job.SetExpectedHash(&object.id);
158 612 tls->download_job.SetPathInfo(&object.label.path);
159
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 ClientCtx *ctx = ClientCtx::GetInstance();
160
3/4
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 188 times.
✓ Branch 4 taken 424 times.
612 if (ctx->IsSet()) {
161
1/2
✓ Branch 5 taken 188 times.
✗ Branch 6 not taken.
188 ctx->Get(tls->download_job.GetUidPtr(),
162 tls->download_job.GetGidPtr(),
163 tls->download_job.GetPidPtr(),
164 tls->download_job.GetInterruptCuePtr());
165 } else {
166 424 *(tls->download_job.GetUidPtr()) = -1;
167 424 *(tls->download_job.GetGidPtr()) = -1;
168 424 *(tls->download_job.GetPidPtr()) = -1;
169 424 *(tls->download_job.GetInterruptCuePtr()) = NULL;
170 }
171 612 tls->download_job.SetCompressed(object.label.zip_algorithm
172 == zlib::kZlibDefault);
173 612 tls->download_job.SetRangeOffset(object.label.range_offset);
174 612 tls->download_job.SetRangeSize(static_cast<int64_t>(object.label.size));
175
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 download_mgr_->Fetch(&tls->download_job);
176
177 // Partial replica failover: a partial Stratum-1 serves a 404 for objects it
178 // did not replicate. Retry once against the full replica and let the common
179 // success/error paths below handle the result. Only 404 triggers failover:
180 // other HTTP errors (403, 5xx, ...) also map to kFailHostHttp but indicate a
181 // server problem rather than an unreplicated object, so they must not be
182 // masked by silently fetching from elsewhere. The txn buffer is reused in
183 // place by StartTxn, so `sink` keeps pointing at the fresh transaction and
184 // needs no reconstruction.
185 612 if (tls->download_job.error_code() == download::kFailHostHttp
186
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 && tls->download_job.http_code() == 404
187
3/6
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 611 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 612 times.
613 && full_replica_download_mgr_ != NULL) {
188 LogCvmfs(kLogCache, kLogDebug | kLogSyslog,
189 "partial replica: %s not available on primary, "
190 "retrying from full replica",
191 object.label.path.c_str());
192 cache_mgr_->AbortTxn(txn);
193 retval = cache_mgr_->StartTxn(object.id, object.label.size, txn);
194 if (retval < 0) {
195 SignalWaitingThreads(retval, object.id, tls);
196 return retval;
197 }
198 cache_mgr_->CtrlTxn(object.label, 0, txn);
199 full_replica_download_mgr_->Fetch(&tls->download_job);
200 }
201
202
2/2
✓ Branch 1 taken 599 times.
✓ Branch 2 taken 13 times.
612 if (tls->download_job.error_code() == download::kFailOk) {
203
1/2
✓ Branch 2 taken 599 times.
✗ Branch 3 not taken.
599 LogCvmfs(kLogCache, kLogDebug, "finished downloading of %s", url.c_str());
204
205
1/2
✓ Branch 1 taken 599 times.
✗ Branch 2 not taken.
599 fd_return = cache_mgr_->OpenFromTxn(txn);
206
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 598 times.
599 if (fd_return < 0) {
207
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 cache_mgr_->AbortTxn(txn);
208
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 SignalWaitingThreads(fd_return, object.id, tls);
209 1 return fd_return;
210 }
211
212
1/2
✓ Branch 1 taken 598 times.
✗ Branch 2 not taken.
598 retval = cache_mgr_->CommitTxn(txn);
213
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 596 times.
598 if (retval < 0) {
214
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 cache_mgr_->Close(fd_return);
215
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 SignalWaitingThreads(retval, object.id, tls);
216 2 return retval;
217 }
218
1/2
✓ Branch 1 taken 596 times.
✗ Branch 2 not taken.
596 SignalWaitingThreads(fd_return, object.id, tls);
219 596 return fd_return;
220 }
221
222 // Download failed (primary, and full replica if configured)
223
1/5
✗ Branch 4 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
26 LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
224 "failed to fetch %s (hash: %s, error %d [%s])",
225
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
26 object.label.path.c_str(), object.id.ToString().c_str(),
226 13 tls->download_job.error_code(),
227 download::Code2Ascii(tls->download_job.error_code()));
228
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 cache_mgr_->AbortTxn(txn);
229
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 backoff_throttle_->Throttle();
230
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 SignalWaitingThreads(-EIO, object.id, tls);
231 13 return -EIO;
232 613 }
233
234
235 1340 Fetcher::Fetcher(CacheManager *cache_mgr,
236 download::DownloadManager *download_mgr,
237 BackoffThrottle *backoff_throttle,
238 1340 perf::StatisticsTemplate statistics)
239 1340 : lock_queues_download_(NULL)
240 1340 , lock_tls_blocks_(NULL)
241 1340 , cache_mgr_(cache_mgr)
242 1340 , download_mgr_(download_mgr)
243 1340 , full_replica_download_mgr_(NULL)
244 1340 , backoff_throttle_(backoff_throttle) {
245 int retval;
246 1340 retval = pthread_key_create(&thread_local_storage_, TLSDestructor);
247
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1340 times.
1340 assert(retval == 0);
248 1340 lock_queues_download_ = reinterpret_cast<pthread_mutex_t *>(
249 1340 smalloc(sizeof(pthread_mutex_t)));
250 1340 retval = pthread_mutex_init(lock_queues_download_, NULL);
251
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1340 times.
1340 assert(retval == 0);
252 1340 lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>(
253 1340 smalloc(sizeof(pthread_mutex_t)));
254 1340 retval = pthread_mutex_init(lock_tls_blocks_, NULL);
255
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1340 times.
1340 assert(retval == 0);
256
3/6
✓ Branch 2 taken 1340 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 1340 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 1340 times.
✗ Branch 10 not taken.
1340 n_downloads = statistics.RegisterTemplated(
257 "n_downloads",
258 "overall number of downloaded files (incl. catalogs, chunks)");
259
3/6
✓ Branch 2 taken 1340 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 1340 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 1340 times.
✗ Branch 10 not taken.
1340 n_invocations = statistics.RegisterTemplated(
260 "n_invocations",
261 "overall number of object requests (incl. catalogs, chunks)");
262 1340 }
263
264
265 4972 Fetcher::~Fetcher() {
266 int retval;
267
268 {
269 2680 const MutexLockGuard m(lock_tls_blocks_);
270
2/2
✓ Branch 1 taken 392 times.
✓ Branch 2 taken 1340 times.
3464 for (unsigned i = 0; i < tls_blocks_.size(); ++i)
271 784 CleanupTls(tls_blocks_[i]);
272 }
273
274 2680 retval = pthread_mutex_destroy(lock_tls_blocks_);
275
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1340 times.
2680 assert(retval == 0);
276 2680 free(lock_tls_blocks_);
277
278 2680 retval = pthread_mutex_destroy(lock_queues_download_);
279
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1340 times.
2680 assert(retval == 0);
280 2680 free(lock_queues_download_);
281
282 2680 retval = pthread_key_delete(thread_local_storage_);
283
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1340 times.
2680 assert(retval == 0);
284 4972 }
285
286
287 /**
288 * Depending on the object type, uses either Open() or OpenPinned() from the
289 * cache manager
290 */
291 1444 int Fetcher::OpenSelect(const CacheManager::LabeledObject &object) {
292
5/6
✓ Branch 1 taken 63 times.
✓ Branch 2 taken 1381 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 63 times.
✓ Branch 6 taken 1381 times.
✓ Branch 7 taken 63 times.
1444 if (object.label.IsCatalog() || object.label.IsPinned()) {
293 1381 return cache_mgr_->OpenPinned(object);
294 } else {
295 63 return cache_mgr_->Open(object);
296 }
297 }
298
299
300 616 void Fetcher::SignalWaitingThreads(const int fd,
301 const shash::Any &id,
302 ThreadLocalStorage *tls) {
303 616 const MutexLockGuard m(lock_queues_download_);
304
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 616 times.
620 for (unsigned i = 0, s = tls->other_pipes_waiting.size(); i < s; ++i) {
305
3/4
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 times.
✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
4 int fd_dup = (fd >= 0) ? cache_mgr_->Dup(fd) : fd;
306
1/2
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
4 WritePipe(tls->other_pipes_waiting[i], &fd_dup, sizeof(int));
307 }
308 616 tls->other_pipes_waiting.clear();
309
1/2
✓ Branch 1 taken 616 times.
✗ Branch 2 not taken.
616 queues_download_.erase(id);
310 616 }
311
312 } // namespace cvmfs
313