GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/fetch.cc
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 151 153 98.7%
Branches: 101 173 58.4%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "cvmfs_config.h"
6 #include "fetch.h"
7
8 #include <unistd.h>
9
10 #include "backoff.h"
11 #include "cache.h"
12 #include "clientctx.h"
13 #include "interrupt.h"
14 #include "network/download.h"
15 #include "quota.h"
16 #include "statistics.h"
17 #include "util/concurrency.h"
18 #include "util/logging.h"
19 #include "util/posix.h"
20
21 using namespace std; // NOLINT
22
23 namespace cvmfs {
24
25 2 void TLSDestructor(void *data) {
26 2 Fetcher::ThreadLocalStorage *tls =
27 static_cast<Fetcher::ThreadLocalStorage *>(data);
28 2 std::vector<Fetcher::ThreadLocalStorage *> *tls_blocks =
29 2 &tls->fetcher->tls_blocks_;
30
31 {
32 2 MutexLockGuard m(tls->fetcher->lock_tls_blocks_);
33 2 for (vector<Fetcher::ThreadLocalStorage *>::iterator
34 2 i = tls_blocks->begin(),
35 2 iEnd = tls_blocks->end();
36
1/2
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
4 i != iEnd; ++i) {
37
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
4 if ((*i) == tls) {
38
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 tls_blocks->erase(i);
39 2 break;
40 }
41 }
42 2 }
43 2 tls->fetcher->CleanupTls(tls);
44 2 }
45
46
47 /**
48 * Called when a thread exits, releases a ThreadLocalStorage object and
49 * removes the pointer to it from tls_blocks_.
50 */
51 31 void Fetcher::CleanupTls(ThreadLocalStorage *tls) {
52 31 ClosePipe(tls->pipe_wait);
53
1/2
✓ Branch 0 taken 31 times.
✗ Branch 1 not taken.
31 delete tls;
54 31 }
55
56
57 /**
58 * Initializes thread-local storage if called the first time in a new thread.
59 */
60 54 Fetcher::ThreadLocalStorage *Fetcher::GetTls() {
61 ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
62 54 pthread_getspecific(thread_local_storage_));
63
2/2
✓ Branch 0 taken 23 times.
✓ Branch 1 taken 31 times.
54 if (tls != NULL)
64 23 return tls;
65
66
2/4
✓ Branch 1 taken 31 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 31 times.
✗ Branch 5 not taken.
31 tls = new ThreadLocalStorage();
67 31 tls->fetcher = this;
68
1/2
✓ Branch 1 taken 31 times.
✗ Branch 2 not taken.
31 MakePipe(tls->pipe_wait);
69 31 tls->download_job.SetCompressed(true);
70 31 tls->download_job.SetProbeHosts(true);
71 31 int retval = pthread_setspecific(thread_local_storage_, tls);
72
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31 times.
31 assert(retval == 0);
73
74 31 MutexLockGuard m(lock_tls_blocks_);
75
1/2
✓ Branch 1 taken 31 times.
✗ Branch 2 not taken.
31 tls_blocks_.push_back(tls);
76
77 31 return tls;
78 31 }
79
80
81 64 int Fetcher::Fetch(
82 const CacheManager::LabeledObject &object,
83 const std::string &alt_url)
84 {
85 int fd_return; // Read-only file descriptor that is returned
86 int retval;
87
88 64 perf::Inc(n_invocations);
89
90 // Try to open from local cache
91
3/4
✓ Branch 1 taken 64 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 20 times.
✓ Branch 4 taken 44 times.
64 if ((fd_return = OpenSelect(object)) >= 0) {
92
1/2
✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
20 LogCvmfs(kLogCache, kLogDebug, "hit: %s", object.label.path.c_str());
93 20 return fd_return;
94 }
95
96
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 44 times.
44 if (object.id.IsNull()) {
97 // This has been seen when trying to load the root catalog signed by an
98 // invalid certificate on an empty cache
99 // TODO(jblomer): check if still necessary after the catalog reload refactor
100 LogCvmfs(kLogCache, kLogDebug, "cancel attempt to download null hash");
101 return -EIO;
102 }
103
104
1/2
✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
44 ThreadLocalStorage *tls = GetTls();
105
106 // Synchronization point: either act as a master thread for this object or
107 // enqueue to the list of waiting threads.
108 44 pthread_mutex_lock(lock_queues_download_);
109
1/2
✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
44 ThreadQueues::iterator iDownloadQueue = queues_download_.find(object.id);
110
2/2
✓ Branch 2 taken 1 times.
✓ Branch 3 taken 43 times.
44 if (iDownloadQueue != queues_download_.end()) {
111
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 LogCvmfs(kLogCache, kLogDebug, "waiting for download of %s",
112 object.label.path.c_str());
113
114
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 iDownloadQueue->second->push_back(tls->pipe_wait[1]);
115 1 pthread_mutex_unlock(lock_queues_download_);
116
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 ReadPipe(tls->pipe_wait[0], &fd_return, sizeof(int));
117
118
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 LogCvmfs(kLogCache, kLogDebug, "received from another thread fd %d for %s",
119 fd_return, object.label.path.c_str());
120 1 return fd_return;
121 } else {
122 // Seems we are the first one, check again in the cache (race condition)
123
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 fd_return = OpenSelect(object);
124
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 42 times.
43 if (fd_return >= 0) {
125 1 pthread_mutex_unlock(lock_queues_download_);
126 1 return fd_return;
127 }
128
129 // Create a new queue for this chunk
130
1/2
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
42 queues_download_[object.id] = &tls->other_pipes_waiting;
131 42 pthread_mutex_unlock(lock_queues_download_);
132 }
133
134 42 perf::Inc(n_downloads);
135
136 // Involve the download manager
137
1/2
✓ Branch 2 taken 42 times.
✗ Branch 3 not taken.
42 LogCvmfs(kLogCache, kLogDebug, "downloading %s", object.label.path.c_str());
138 42 std::string url;
139
2/2
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 39 times.
42 if (object.label.IsExternal()) {
140
2/4
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3 times.
✗ Branch 5 not taken.
3 url = !alt_url.empty() ? alt_url : object.label.path;
141 } else {
142
8/14
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 38 times.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 38 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 38 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 39 times.
✗ Branch 14 not taken.
✓ Branch 18 taken 38 times.
✓ Branch 19 taken 1 times.
✗ Branch 22 not taken.
✗ Branch 23 not taken.
39 url = "/" + (alt_url.size() ? alt_url : "data/" + object.id.MakePath());
143 }
144
1/2
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
42 void *txn = alloca(cache_mgr_->SizeOfTxn());
145
1/2
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
42 retval = cache_mgr_->StartTxn(object.id, object.label.size, txn);
146
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 41 times.
42 if (retval < 0) {
147
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 LogCvmfs(kLogCache, kLogDebug, "could not start transaction on %s",
148 object.label.path.c_str());
149
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 SignalWaitingThreads(retval, object.id, tls);
150 1 return retval;
151 }
152
1/2
✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
41 cache_mgr_->CtrlTxn(object.label, 0, txn);
153
154
1/2
✓ Branch 3 taken 41 times.
✗ Branch 4 not taken.
41 LogCvmfs(kLogCache, kLogDebug, "miss: %s %s",
155 object.label.path.c_str(), url.c_str());
156 41 TransactionSink sink(cache_mgr_, txn);
157 41 tls->download_job.SetUrl(&url);
158 41 tls->download_job.SetSink(&sink);
159 41 tls->download_job.SetExpectedHash(&object.id);
160 41 tls->download_job.SetExtraInfo(&object.label.path);
161
1/2
✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
41 ClientCtx *ctx = ClientCtx::GetInstance();
162
3/4
✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
✓ Branch 4 taken 37 times.
41 if (ctx->IsSet()) {
163
1/2
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
4 ctx->Get(tls->download_job.GetUidPtr(),
164 tls->download_job.GetGidPtr(),
165 tls->download_job.GetPidPtr(),
166 tls->download_job.GetInterruptCuePtr());
167 }
168 41 tls->download_job.SetCompressed(
169 41 object.label.zip_algorithm == zlib::kZlibDefault);
170 41 tls->download_job.SetRangeOffset(object.label.range_offset);
171 41 tls->download_job.SetRangeSize(static_cast<int64_t>(object.label.size));
172
1/2
✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
41 download_mgr_->Fetch(&tls->download_job);
173
174
2/2
✓ Branch 1 taken 36 times.
✓ Branch 2 taken 5 times.
41 if (tls->download_job.error_code() == download::kFailOk) {
175
1/2
✓ Branch 2 taken 36 times.
✗ Branch 3 not taken.
36 LogCvmfs(kLogCache, kLogDebug, "finished downloading of %s", url.c_str());
176
177
1/2
✓ Branch 1 taken 36 times.
✗ Branch 2 not taken.
36 fd_return = cache_mgr_->OpenFromTxn(txn);
178
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 35 times.
36 if (fd_return < 0) {
179
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 cache_mgr_->AbortTxn(txn);
180
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 SignalWaitingThreads(fd_return, object.id, tls);
181 1 return fd_return;
182 }
183
184
1/2
✓ Branch 1 taken 35 times.
✗ Branch 2 not taken.
35 retval = cache_mgr_->CommitTxn(txn);
185
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 33 times.
35 if (retval < 0) {
186
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 cache_mgr_->Close(fd_return);
187
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 SignalWaitingThreads(retval, object.id, tls);
188 2 return retval;
189 }
190
1/2
✓ Branch 1 taken 33 times.
✗ Branch 2 not taken.
33 SignalWaitingThreads(fd_return, object.id, tls);
191 33 return fd_return;
192 }
193
194 // Download failed
195
1/5
✗ Branch 4 not taken.
✓ Branch 5 taken 5 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
10 LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
196 "failed to fetch %s (hash: %s, error %d [%s])",
197 object.label.path.c_str(),
198
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
10 object.id.ToString().c_str(),
199 5 tls->download_job.error_code(),
200 download::Code2Ascii(tls->download_job.error_code()));
201
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 cache_mgr_->AbortTxn(txn);
202
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 backoff_throttle_->Throttle();
203
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 SignalWaitingThreads(-EIO, object.id, tls);
204 5 return -EIO;
205 42 }
206
207
208 82 Fetcher::Fetcher(
209 CacheManager *cache_mgr,
210 download::DownloadManager *download_mgr,
211 BackoffThrottle *backoff_throttle,
212 82 perf::StatisticsTemplate statistics)
213 82 : lock_queues_download_(NULL)
214 82 , lock_tls_blocks_(NULL)
215 82 , cache_mgr_(cache_mgr)
216 82 , download_mgr_(download_mgr)
217 82 , backoff_throttle_(backoff_throttle)
218 {
219 int retval;
220 82 retval = pthread_key_create(&thread_local_storage_, TLSDestructor);
221
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 82 times.
82 assert(retval == 0);
222 82 lock_queues_download_ = reinterpret_cast<pthread_mutex_t *>(
223 82 smalloc(sizeof(pthread_mutex_t)));
224 82 retval = pthread_mutex_init(lock_queues_download_, NULL);
225
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 82 times.
82 assert(retval == 0);
226 82 lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>(
227 82 smalloc(sizeof(pthread_mutex_t)));
228 82 retval = pthread_mutex_init(lock_tls_blocks_, NULL);
229
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 82 times.
82 assert(retval == 0);
230
3/6
✓ Branch 2 taken 82 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 82 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 82 times.
✗ Branch 10 not taken.
82 n_downloads = statistics.RegisterTemplated("n_downloads",
231 "overall number of downloaded files (incl. catalogs, chunks)");
232
3/6
✓ Branch 2 taken 82 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 82 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 82 times.
✗ Branch 10 not taken.
82 n_invocations = statistics.RegisterTemplated("n_invocations",
233 "overall number of object requests (incl. catalogs, chunks)");
234 82 }
235
236
237 82 Fetcher::~Fetcher() {
238 int retval;
239
240 {
241 82 MutexLockGuard m(lock_tls_blocks_);
242
2/2
✓ Branch 1 taken 29 times.
✓ Branch 2 taken 82 times.
111 for (unsigned i = 0; i < tls_blocks_.size(); ++i)
243 29 CleanupTls(tls_blocks_[i]);
244 82 }
245
246 82 retval = pthread_mutex_destroy(lock_tls_blocks_);
247
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 82 times.
82 assert(retval == 0);
248 82 free(lock_tls_blocks_);
249
250 82 retval = pthread_mutex_destroy(lock_queues_download_);
251
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 82 times.
82 assert(retval == 0);
252 82 free(lock_queues_download_);
253
254 82 retval = pthread_key_delete(thread_local_storage_);
255
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 82 times.
82 assert(retval == 0);
256 82 }
257
258
259 /**
260 * Depending on the object type, uses either Open() or OpenPinned() from the
261 * cache manager
262 */
263 107 int Fetcher::OpenSelect(const CacheManager::LabeledObject &object) {
264
5/6
✓ Branch 1 taken 28 times.
✓ Branch 2 taken 79 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 28 times.
✓ Branch 6 taken 79 times.
✓ Branch 7 taken 28 times.
107 if (object.label.IsCatalog() || object.label.IsPinned()) {
265 79 return cache_mgr_->OpenPinned(object);
266 } else {
267 28 return cache_mgr_->Open(object);
268 }
269 }
270
271
272 45 void Fetcher::SignalWaitingThreads(
273 const int fd,
274 const shash::Any &id,
275 ThreadLocalStorage *tls)
276 {
277 45 MutexLockGuard m(lock_queues_download_);
278
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 45 times.
49 for (unsigned i = 0, s = tls->other_pipes_waiting.size(); i < s; ++i) {
279
3/4
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 times.
✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
4 int fd_dup = (fd >= 0) ? cache_mgr_->Dup(fd) : fd;
280
1/2
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
4 WritePipe(tls->other_pipes_waiting[i], &fd_dup, sizeof(int));
281 }
282 45 tls->other_pipes_waiting.clear();
283
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
45 queues_download_.erase(id);
284 45 }
285
286 } // namespace cvmfs
287