GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/fetch.h
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 17 24 70.8%
Branches: 3 14 21.4%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_FETCH_H_
6 #define CVMFS_FETCH_H_
7
8 #include <pthread.h>
9
10 #include <map>
11 #include <string>
12 #include <vector>
13
14 #include "cache.h"
15 #include "crypto/hash.h"
16 #include "gtest/gtest_prod.h"
17 #include "network/download.h"
18 #include "network/sink.h"
19
20 class BackoffThrottle;
21
22 namespace perf {
23 class Statistics;
24 }
25
26 namespace cvmfs {
27
28 /**
29 * TransacionSink uses an open transaction in a cache manager as a sink. It
30 * allows the download manager to write data without knowing about the cache
31 * manager.
32 */
33 class TransactionSink : public Sink {
34 public:
35 1228 TransactionSink(CacheManager *cache_mgr, void *open_txn)
36 1228 : Sink(false), cache_mgr_(cache_mgr), open_txn_(open_txn) { }
37 2456 virtual ~TransactionSink() { }
38
39 /**
40 * Appends data to the sink
41 *
42 * @returns on success: number of bytes written (can be less than requested)
43 * on failure: -errno.
44 */
45 2295 virtual int64_t Write(const void *buf, uint64_t sz) {
46 2295 return cache_mgr_->Write(buf, sz, open_txn_);
47 }
48
49 /**
50 * Truncate all written data and start over at position zero.
51 *
52 * @returns Success = 0
53 * Failure = -errno
54 */
55 53 virtual int Reset() { return cache_mgr_->Reset(open_txn_); }
56
57 /**
58 * Purges all resources leaving the sink in an invalid state.
59 * More aggressive version of Reset().
60 * For some sinks it might do the same as Reset().
61 *
62 * @returns Success = 0
63 * Failure = -errno
64 */
65 52 virtual int Purge() { return Reset(); }
66 /**
67 * @returns true if the object is correctly initialized.
68 */
69
2/4
✓ Branch 0 taken 1228 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1228 times.
✗ Branch 3 not taken.
1228 virtual bool IsValid() { return cache_mgr_ != NULL && open_txn_ != NULL; }
70
71 1228 virtual int Flush() { return 0; }
72 virtual bool Reserve(size_t /*size*/) { return true; }
73 4762 virtual bool RequiresReserve() { return false; }
74
75 /**
76 * Return a string representation describing the type of sink and its status
77 */
78 virtual std::string Describe() {
79 std::string result = "Transaction sink that is ";
80 result += IsValid() ? "valid" : "invalid";
81 return result;
82 }
83
84 private:
85 CacheManager *cache_mgr_;
86 void *open_txn_;
87 };
88
89
90 /**
91 * The Fetcher uses a cache manager and a download manager in order to provide a
92 * (virtual) file descriptor to a requested object, which is valid in the
93 * context of the cache manager.
94 * If the object is not in the cache, it is downloaded and stored in the cache.
95 *
96 * Concurrent download requests for the same id are collapsed.
97 */
98 class Fetcher : SingleCopy {
99 FRIEND_TEST(T_Fetcher, GetTls);
100 FRIEND_TEST(T_Fetcher, SignalWaitingThreads);
101 friend void *TestGetTls(void *data);
102 friend void *TestFetchCollapse(void *data);
103 friend void *TestFetchCollapse2(void *data);
104 friend void TLSDestructor(void *data);
105
106 public:
107 Fetcher(CacheManager *cache_mgr,
108 download::DownloadManager *download_mgr,
109 BackoffThrottle *backoff_throttle,
110 perf::StatisticsTemplate statistics);
111 ~Fetcher();
112
113 int Fetch(const CacheManager::LabeledObject &object,
114 const std::string &alt_url = "");
115
116 void ReplaceCacheManager(CacheManager *new_cache_mgr) {
117 cache_mgr_ = new_cache_mgr;
118 }
119 3613 CacheManager *cache_mgr() { return cache_mgr_; }
120 733 download::DownloadManager *download_mgr() { return download_mgr_; }
121
122 private:
123 /**
124 * Multiple threads might want to download the same object at the same time.
125 * If that happens, only the first thread performs the download. The other
126 * threads wait on a pipe for a notification from the first thread.
127 */
128 struct ThreadLocalStorage {
129
1/2
✓ Branch 2 taken 887 times.
✗ Branch 3 not taken.
887 ThreadLocalStorage() {
130 887 pipe_wait[0] = -1;
131 887 pipe_wait[1] = -1;
132 887 fetcher = NULL;
133 887 }
134
135 /**
136 * Used during cleanup to find tls_blocks_.
137 */
138 Fetcher *fetcher;
139 /**
140 * Wait on the reading end if another thread is already downloading the same
141 * object.
142 */
143 int pipe_wait[2];
144 /**
145 * Writer ends of all the pipes of threads that want to download the same
146 * object.
147 */
148 std::vector<int> other_pipes_waiting;
149 /**
150 * It is sufficient to construct the JobInfo object once per thread, not
151 * on every call to Fetch().
152 */
153 download::JobInfo download_job;
154 };
155
156 /**
157 * Maps currently downloaded chunks to the other_pipes_waiting member of the
158 * thread local storage of the downloading thread. This way, a thread can
159 * enqueue itself to such an other_pipes_waiting list and gets informed when
160 * the download is completed.
161 */
162 typedef std::map<shash::Any, std::vector<int> *> ThreadQueues;
163
164 ThreadLocalStorage *GetTls();
165 void CleanupTls(ThreadLocalStorage *tls);
166 void SignalWaitingThreads(const int fd, const shash::Any &id,
167 ThreadLocalStorage *tls);
168 int OpenSelect(const CacheManager::LabeledObject &object);
169
170 /**
171 * Key to the thread's ThreadLocalStorage memory
172 */
173 pthread_key_t thread_local_storage_;
174
175 ThreadQueues queues_download_;
176 pthread_mutex_t *lock_queues_download_;
177
178 /**
179 * All the threads register their thread local storage here, so that it can
180 * be cleaned up properly in the destructor of Fetcher.
181 */
182 std::vector<ThreadLocalStorage *> tls_blocks_;
183 pthread_mutex_t *lock_tls_blocks_;
184
185 CacheManager *cache_mgr_;
186 download::DownloadManager *download_mgr_;
187 BackoffThrottle *backoff_throttle_;
188 perf::Counter *n_downloads;
189 perf::Counter *n_invocations;
190 };
191
192 } // namespace cvmfs
193
194 #endif // CVMFS_FETCH_H_
195