GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/upload_s3.h
Date: 2026-03-15 02:35:27
Exec Total Coverage
Lines: 9 10 90.0%
Branches: 3 8 37.5%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UPLOAD_S3_H_
6 #define CVMFS_UPLOAD_S3_H_
7
8 #include <pthread.h>
9
10 #include <string>
11 #include <utility>
12 #include <vector>
13
14 #include "network/s3fanout.h"
15 #include "upload_facility.h"
16 #include "util/atomic.h"
17 #include "util/file_backed_buffer.h"
18 #include "util/pointer.h"
19 #include "util/single_copy.h"
20
21 namespace upload {
22
23 struct S3StreamHandle : public UploadStreamHandle {
24 4141 S3StreamHandle(const CallbackTN *commit_callback,
25 uint64_t in_memory_threshold,
26 const std::string &tmp_dir = "/tmp/")
27
1/2
✓ Branch 2 taken 4141 times.
✗ Branch 3 not taken.
4141 : UploadStreamHandle(commit_callback) {
28
2/4
✓ Branch 1 taken 4141 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4141 times.
✗ Branch 5 not taken.
4141 buffer = FileBackedBuffer::Create(in_memory_threshold, tmp_dir);
29 4141 }
30
31 // Ownership is later transferred to the S3 fanout
32 UniquePtr<FileBackedBuffer> buffer;
33 };
34
35 /**
36 * The S3Spooler implements the AbstractSpooler interface to push files
37 * into a S3 CVMFS repository backend.
38 * For a detailed description of the classes interface please have a look into
39 * the AbstractSpooler base class.
40 */
41 class S3Uploader : public AbstractUploader {
42 public:
43 explicit S3Uploader(const SpoolerDefinition &spooler_definition);
44 virtual ~S3Uploader();
45 static bool WillHandle(const SpoolerDefinition &spooler_definition);
46
47 virtual std::string name() const { return "S3"; }
48
49 virtual bool Create();
50
51 /**
52 * Upload() is not done concurrently in the current implementation of the
53 * S3Spooler, since it is a simple move or copy of a file without CPU
54 * intensive operation
55 * This method calls NotifyListeners and invokes a callback for all
56 * registered listeners (see the Observable template for details).
57 */
58 virtual void DoUpload(const std::string &remote_path,
59 IngestionSource *source,
60 const CallbackTN *callback);
61
62 virtual UploadStreamHandle *InitStreamedUpload(
63 const CallbackTN *callback = NULL);
64 virtual void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer,
65 const CallbackTN *callback);
66 virtual void FinalizeStreamedUpload(UploadStreamHandle *handle,
67 const shash::Any &content_hash);
68
69 virtual void DoRemoveAsync(const std::string &file_to_delete);
70 virtual void WaitForUpload() const;
71 virtual bool Peek(const std::string &path);
72 virtual bool Mkdir(const std::string &path);
73 virtual bool PlaceBootstrappingShortcut(const shash::Any &object);
74
75 virtual unsigned int GetNumberOfErrors() const;
76 int64_t DoGetObjectSize(const std::string &file_name);
77
78 // Only for testing
79 123 s3fanout::S3FanoutManager *GetS3FanoutManager() {
80 123 return s3fanout_mgr_.weak_ref();
81 }
82
83 private:
84 static const unsigned kDefaultPort = 80;
85 static const unsigned kHttpsPort = 443;
86 static const unsigned kDefaultNumParallelUploads = 16;
87 static const unsigned kDefaultNumRetries = 3;
88 static const unsigned kDefaultTimeoutSec = 60;
89 static const unsigned kDefaultBackoffInitMs = 100;
90 static const unsigned kDefaultBackoffMaxMs = 2000;
91 static const unsigned kInMemoryObjectThreshold = 500 * 1024; // 500KiB
92
93 // Used to make the async HTTP requests synchronous in Peek() Create(),
94 // and Upload() of single bits
95 struct RequestCtrl : SingleCopy {
96 20992 RequestCtrl() : return_code(-1), callback_forward(NULL) {
97 20992 pipe_wait[0] = pipe_wait[1] = -1;
98 20992 }
99
100 void WaitFor();
101
102 int return_code;
103 const CallbackTN *callback_forward;
104 std::string original_path;
105 int pipe_wait[2];
106 };
107
108 void OnReqComplete(const upload::UploaderResults &results, RequestCtrl *ctrl);
109
110 static void *MainCollectResults(void *data);
111
112 bool ParseSpoolerDefinition(const SpoolerDefinition &spooler_definition);
113 void UploadJobInfo(s3fanout::JobInfo *info);
114
115 static const unsigned kMaxBatchDeleteSize = 1000;
116
117 s3fanout::JobInfo *CreateJobInfo(const std::string &path) const;
118 void FlushDeleteBatch() const;
119
120 mutable UniquePtr<s3fanout::S3FanoutManager> s3fanout_mgr_;
121 std::string repository_alias_;
122 std::string host_name_port_;
123 std::string host_name_;
124 std::string region_;
125 std::string flavor_;
126 std::string bucket_;
127 bool dns_buckets_;
128 int num_parallel_uploads_;
129 unsigned num_retries_;
130 unsigned timeout_sec_;
131 std::string access_key_;
132 std::string secret_key_;
133 s3fanout::AuthzMethods authz_method_;
134 bool peek_before_put_;
135 bool use_https_;
136 bool batch_delete_enabled_;
137 std::string proxy_;
138
139 const std::string temporary_path_;
140 mutable atomic_int32 io_errors_;
141 pthread_t thread_collect_results_;
142
143 std::string x_amz_acl_;
144
145 mutable pthread_mutex_t delete_batch_mutex_;
146 mutable std::vector<std::string> pending_deletes_;
147 }; // S3Uploader
148
149 } // namespace upload
150
151 #endif // CVMFS_UPLOAD_S3_H_
152