GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/fetch.cc
Date: 2025-10-19 02:35:28
Exec Total Coverage
Lines: 155 157 98.7%
Branches: 101 173 58.4%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5
6 #include "fetch.h"
7
8 #include <unistd.h>
9
10 #include "backoff.h"
11 #include "cache.h"
12 #include "clientctx.h"
13 #include "interrupt.h"
14 #include "network/download.h"
15 #include "quota.h"
16 #include "statistics.h"
17 #include "util/concurrency.h"
18 #include "util/logging.h"
19 #include "util/posix.h"
20
21 using namespace std; // NOLINT
22
23 namespace cvmfs {
24
25 80 void TLSDestructor(void *data) {
26 80 Fetcher::ThreadLocalStorage *tls = static_cast<Fetcher::ThreadLocalStorage *>(
27 data);
28 80 std::vector<Fetcher::ThreadLocalStorage *> *tls_blocks = &tls->fetcher
29 ->tls_blocks_;
30
31 {
32 80 const MutexLockGuard m(tls->fetcher->lock_tls_blocks_);
33 80 for (vector<Fetcher::ThreadLocalStorage *>::iterator
34 80 i = tls_blocks->begin(),
35 80 iEnd = tls_blocks->end();
36
1/2
✓ Branch 1 taken 160 times.
✗ Branch 2 not taken.
160 i != iEnd;
37 80 ++i) {
38
2/2
✓ Branch 1 taken 80 times.
✓ Branch 2 taken 80 times.
160 if ((*i) == tls) {
39
1/2
✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
80 tls_blocks->erase(i);
40 80 break;
41 }
42 }
43 80 }
44 80 tls->fetcher->CleanupTls(tls);
45 80 }
46
47
48 /**
49 * Called when a thread exists, releases a ThreadLocalStorage object and
50 * removes the pointer to it from tls_blocks_.
51 */
52 1186 void Fetcher::CleanupTls(ThreadLocalStorage *tls) {
53 1186 ClosePipe(tls->pipe_wait);
54
1/2
✓ Branch 0 taken 1186 times.
✗ Branch 1 not taken.
1186 delete tls;
55 1186 }
56
57
58 /**
59 * Initialized thread-local storage if called the first time in a new thread.
60 */
61 2043 Fetcher::ThreadLocalStorage *Fetcher::GetTls() {
62 ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
63 2043 pthread_getspecific(thread_local_storage_));
64
2/2
✓ Branch 0 taken 857 times.
✓ Branch 1 taken 1186 times.
2043 if (tls != NULL)
65 857 return tls;
66
67
2/4
✓ Branch 1 taken 1186 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1186 times.
✗ Branch 5 not taken.
1186 tls = new ThreadLocalStorage();
68 1186 tls->fetcher = this;
69
1/2
✓ Branch 1 taken 1186 times.
✗ Branch 2 not taken.
1186 MakePipe(tls->pipe_wait);
70 1186 tls->download_job.SetCompressed(true);
71 1186 tls->download_job.SetProbeHosts(true);
72 1186 const int retval = pthread_setspecific(thread_local_storage_, tls);
73
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1186 times.
1186 assert(retval == 0);
74
75 1186 const MutexLockGuard m(lock_tls_blocks_);
76
1/2
✓ Branch 1 taken 1186 times.
✗ Branch 2 not taken.
1186 tls_blocks_.push_back(tls);
77
78 1186 return tls;
79 1186 }
80
81
82 2433 int Fetcher::Fetch(const CacheManager::LabeledObject &object,
83 const std::string &alt_url) {
84 int fd_return; // Read-only file descriptor that is returned
85 int retval;
86
87 2433 perf::Inc(n_invocations);
88
89 // Try to open from local cache
90
3/4
✓ Branch 1 taken 2433 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 790 times.
✓ Branch 4 taken 1643 times.
2433 if ((fd_return = OpenSelect(object)) >= 0) {
91
1/2
✓ Branch 2 taken 790 times.
✗ Branch 3 not taken.
790 LogCvmfs(kLogCache, kLogDebug, "hit: %s", object.label.path.c_str());
92 790 return fd_return;
93 }
94
95
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1643 times.
1643 if (object.id.IsNull()) {
96 // This has been seen when trying to load the root catalog signed by an
97 // invalid certificate on an empty cache
98 // TODO(jblomer): check if still necessary after the catalog reload refactor
99 LogCvmfs(kLogCache, kLogDebug, "cancel attempt to download null hash");
100 return -EIO;
101 }
102
103
1/2
✓ Branch 1 taken 1643 times.
✗ Branch 2 not taken.
1643 ThreadLocalStorage *tls = GetTls();
104
105 // Synchronization point: either act as a master thread for this object or
106 // enqueue to the list of waiting threads.
107 1643 pthread_mutex_lock(lock_queues_download_);
108 1643 const ThreadQueues::iterator iDownloadQueue = queues_download_.find(
109
1/2
✓ Branch 1 taken 1643 times.
✗ Branch 2 not taken.
1643 object.id);
110
2/2
✓ Branch 2 taken 40 times.
✓ Branch 3 taken 1603 times.
1643 if (iDownloadQueue != queues_download_.end()) {
111
1/2
✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
40 LogCvmfs(kLogCache, kLogDebug, "waiting for download of %s",
112 object.label.path.c_str());
113
114
1/2
✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
40 iDownloadQueue->second->push_back(tls->pipe_wait[1]);
115 40 pthread_mutex_unlock(lock_queues_download_);
116
1/2
✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
40 ReadPipe(tls->pipe_wait[0], &fd_return, sizeof(int));
117
118
1/2
✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
40 LogCvmfs(kLogCache, kLogDebug, "received from another thread fd %d for %s",
119 fd_return, object.label.path.c_str());
120 40 return fd_return;
121 } else {
122 // Seems we are the first one, check again in the cache (race condition)
123
1/2
✓ Branch 1 taken 1603 times.
✗ Branch 2 not taken.
1603 fd_return = OpenSelect(object);
124
2/2
✓ Branch 0 taken 40 times.
✓ Branch 1 taken 1563 times.
1603 if (fd_return >= 0) {
125 40 pthread_mutex_unlock(lock_queues_download_);
126 40 return fd_return;
127 }
128
129 // Create a new queue for this chunk
130
1/2
✓ Branch 1 taken 1563 times.
✗ Branch 2 not taken.
1563 queues_download_[object.id] = &tls->other_pipes_waiting;
131 1563 pthread_mutex_unlock(lock_queues_download_);
132 }
133
134 1563 perf::Inc(n_downloads);
135
136 // Involve the download manager
137
1/2
✓ Branch 2 taken 1563 times.
✗ Branch 3 not taken.
1563 LogCvmfs(kLogCache, kLogDebug, "downloading %s", object.label.path.c_str());
138 1563 std::string url;
139
2/2
✓ Branch 1 taken 120 times.
✓ Branch 2 taken 1443 times.
1563 if (object.label.IsExternal()) {
140
2/4
✓ Branch 1 taken 120 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 120 times.
✗ Branch 5 not taken.
120 url = !alt_url.empty() ? alt_url : object.label.path;
141 } else {
142
8/14
✓ Branch 1 taken 40 times.
✓ Branch 2 taken 1403 times.
✓ Branch 4 taken 40 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1403 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 1403 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 1443 times.
✗ Branch 14 not taken.
✓ Branch 18 taken 1403 times.
✓ Branch 19 taken 40 times.
✗ Branch 22 not taken.
✗ Branch 23 not taken.
1443 url = "/" + (alt_url.size() ? alt_url : "data/" + object.id.MakePath());
143 }
144
1/2
✓ Branch 1 taken 1563 times.
✗ Branch 2 not taken.
1563 void *txn = alloca(cache_mgr_->SizeOfTxn());
145
1/2
✓ Branch 1 taken 1563 times.
✗ Branch 2 not taken.
1563 retval = cache_mgr_->StartTxn(object.id, object.label.size, txn);
146
2/2
✓ Branch 0 taken 40 times.
✓ Branch 1 taken 1523 times.
1563 if (retval < 0) {
147
1/2
✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
40 LogCvmfs(kLogCache, kLogDebug, "could not start transaction on %s",
148 object.label.path.c_str());
149
1/2
✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
40 SignalWaitingThreads(retval, object.id, tls);
150 40 return retval;
151 }
152
1/2
✓ Branch 1 taken 1523 times.
✗ Branch 2 not taken.
1523 cache_mgr_->CtrlTxn(object.label, 0, txn);
153
154
1/2
✓ Branch 3 taken 1523 times.
✗ Branch 4 not taken.
1523 LogCvmfs(kLogCache, kLogDebug, "miss: %s %s", object.label.path.c_str(),
155 url.c_str());
156 1523 TransactionSink sink(cache_mgr_, txn);
157 1523 tls->download_job.SetUrl(&url);
158 1523 tls->download_job.SetSink(&sink);
159 1523 tls->download_job.SetExpectedHash(&object.id);
160 1523 tls->download_job.SetExtraInfo(&object.label.path);
161
1/2
✓ Branch 1 taken 1523 times.
✗ Branch 2 not taken.
1523 ClientCtx *ctx = ClientCtx::GetInstance();
162
3/4
✓ Branch 1 taken 1523 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 124 times.
✓ Branch 4 taken 1399 times.
1523 if (ctx->IsSet()) {
163
1/2
✓ Branch 5 taken 124 times.
✗ Branch 6 not taken.
124 ctx->Get(tls->download_job.GetUidPtr(),
164 tls->download_job.GetGidPtr(),
165 tls->download_job.GetPidPtr(),
166 tls->download_job.GetInterruptCuePtr());
167 } else {
168 1399 *(tls->download_job.GetUidPtr()) = -1;
169 1399 *(tls->download_job.GetGidPtr()) = -1;
170 1399 *(tls->download_job.GetPidPtr()) = -1;
171 1399 *(tls->download_job.GetInterruptCuePtr()) = NULL;
172 }
173 1523 tls->download_job.SetCompressed(object.label.zip_algorithm
174 == zlib::kZlibDefault);
175 1523 tls->download_job.SetRangeOffset(object.label.range_offset);
176 1523 tls->download_job.SetRangeSize(static_cast<int64_t>(object.label.size));
177
1/2
✓ Branch 1 taken 1523 times.
✗ Branch 2 not taken.
1523 download_mgr_->Fetch(&tls->download_job);
178
179
2/2
✓ Branch 1 taken 1319 times.
✓ Branch 2 taken 204 times.
1523 if (tls->download_job.error_code() == download::kFailOk) {
180
1/2
✓ Branch 2 taken 1319 times.
✗ Branch 3 not taken.
1319 LogCvmfs(kLogCache, kLogDebug, "finished downloading of %s", url.c_str());
181
182
1/2
✓ Branch 1 taken 1319 times.
✗ Branch 2 not taken.
1319 fd_return = cache_mgr_->OpenFromTxn(txn);
183
2/2
✓ Branch 0 taken 40 times.
✓ Branch 1 taken 1279 times.
1319 if (fd_return < 0) {
184
1/2
✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
40 cache_mgr_->AbortTxn(txn);
185
1/2
✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
40 SignalWaitingThreads(fd_return, object.id, tls);
186 40 return fd_return;
187 }
188
189
1/2
✓ Branch 1 taken 1279 times.
✗ Branch 2 not taken.
1279 retval = cache_mgr_->CommitTxn(txn);
190
2/2
✓ Branch 0 taken 80 times.
✓ Branch 1 taken 1199 times.
1279 if (retval < 0) {
191
1/2
✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
80 cache_mgr_->Close(fd_return);
192
1/2
✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
80 SignalWaitingThreads(retval, object.id, tls);
193 80 return retval;
194 }
195
1/2
✓ Branch 1 taken 1199 times.
✗ Branch 2 not taken.
1199 SignalWaitingThreads(fd_return, object.id, tls);
196 1199 return fd_return;
197 }
198
199 // Download failed
200
1/5
✗ Branch 4 not taken.
✓ Branch 5 taken 204 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
408 LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
201 "failed to fetch %s (hash: %s, error %d [%s])",
202
1/2
✓ Branch 1 taken 204 times.
✗ Branch 2 not taken.
408 object.label.path.c_str(), object.id.ToString().c_str(),
203 204 tls->download_job.error_code(),
204 download::Code2Ascii(tls->download_job.error_code()));
205
1/2
✓ Branch 1 taken 204 times.
✗ Branch 2 not taken.
204 cache_mgr_->AbortTxn(txn);
206
1/2
✓ Branch 1 taken 204 times.
✗ Branch 2 not taken.
204 backoff_throttle_->Throttle();
207
1/2
✓ Branch 1 taken 204 times.
✗ Branch 2 not taken.
204 SignalWaitingThreads(-EIO, object.id, tls);
208 204 return -EIO;
209 1563 }
210
211
212 3202 Fetcher::Fetcher(CacheManager *cache_mgr,
213 download::DownloadManager *download_mgr,
214 BackoffThrottle *backoff_throttle,
215 3202 perf::StatisticsTemplate statistics)
216 3202 : lock_queues_download_(NULL)
217 3202 , lock_tls_blocks_(NULL)
218 3202 , cache_mgr_(cache_mgr)
219 3202 , download_mgr_(download_mgr)
220 3202 , backoff_throttle_(backoff_throttle) {
221 int retval;
222 3202 retval = pthread_key_create(&thread_local_storage_, TLSDestructor);
223
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3202 times.
3202 assert(retval == 0);
224 3202 lock_queues_download_ = reinterpret_cast<pthread_mutex_t *>(
225 3202 smalloc(sizeof(pthread_mutex_t)));
226 3202 retval = pthread_mutex_init(lock_queues_download_, NULL);
227
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3202 times.
3202 assert(retval == 0);
228 3202 lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>(
229 3202 smalloc(sizeof(pthread_mutex_t)));
230 3202 retval = pthread_mutex_init(lock_tls_blocks_, NULL);
231
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3202 times.
3202 assert(retval == 0);
232
3/6
✓ Branch 2 taken 3202 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 3202 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 3202 times.
✗ Branch 10 not taken.
3202 n_downloads = statistics.RegisterTemplated(
233 "n_downloads",
234 "overall number of downloaded files (incl. catalogs, chunks)");
235
3/6
✓ Branch 2 taken 3202 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 3202 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 3202 times.
✗ Branch 10 not taken.
3202 n_invocations = statistics.RegisterTemplated(
236 "n_invocations",
237 "overall number of object requests (incl. catalogs, chunks)");
238 3202 }
239
240
241 3202 Fetcher::~Fetcher() {
242 int retval;
243
244 {
245 3202 const MutexLockGuard m(lock_tls_blocks_);
246
2/2
✓ Branch 1 taken 1106 times.
✓ Branch 2 taken 3202 times.
4308 for (unsigned i = 0; i < tls_blocks_.size(); ++i)
247 1106 CleanupTls(tls_blocks_[i]);
248 3202 }
249
250 3202 retval = pthread_mutex_destroy(lock_tls_blocks_);
251
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3202 times.
3202 assert(retval == 0);
252 3202 free(lock_tls_blocks_);
253
254 3202 retval = pthread_mutex_destroy(lock_queues_download_);
255
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3202 times.
3202 assert(retval == 0);
256 3202 free(lock_queues_download_);
257
258 3202 retval = pthread_key_delete(thread_local_storage_);
259
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3202 times.
3202 assert(retval == 0);
260 3202 }
261
262
263 /**
264 * Depending on the object type, uses either Open() or OpenPinned() from the
265 * cache manager
266 */
267 4036 int Fetcher::OpenSelect(const CacheManager::LabeledObject &object) {
268
5/6
✓ Branch 1 taken 1135 times.
✓ Branch 2 taken 2901 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 1135 times.
✓ Branch 6 taken 2901 times.
✓ Branch 7 taken 1135 times.
4036 if (object.label.IsCatalog() || object.label.IsPinned()) {
269 2901 return cache_mgr_->OpenPinned(object);
270 } else {
271 1135 return cache_mgr_->Open(object);
272 }
273 }
274
275
276 1683 void Fetcher::SignalWaitingThreads(const int fd,
277 const shash::Any &id,
278 ThreadLocalStorage *tls) {
279 1683 const MutexLockGuard m(lock_queues_download_);
280
2/2
✓ Branch 1 taken 160 times.
✓ Branch 2 taken 1683 times.
1843 for (unsigned i = 0, s = tls->other_pipes_waiting.size(); i < s; ++i) {
281
3/4
✓ Branch 0 taken 120 times.
✓ Branch 1 taken 40 times.
✓ Branch 3 taken 120 times.
✗ Branch 4 not taken.
160 int fd_dup = (fd >= 0) ? cache_mgr_->Dup(fd) : fd;
282
1/2
✓ Branch 2 taken 160 times.
✗ Branch 3 not taken.
160 WritePipe(tls->other_pipes_waiting[i], &fd_dup, sizeof(int));
283 }
284 1683 tls->other_pipes_waiting.clear();
285
1/2
✓ Branch 1 taken 1683 times.
✗ Branch 2 not taken.
1683 queues_download_.erase(id);
286 1683 }
287
288 } // namespace cvmfs
289