GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/upload_s3.h
Date: 2025-07-13 02:35:07
Exec Total Coverage
Lines: 9 10 90.0%
Branches: 3 8 37.5%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UPLOAD_S3_H_
6 #define CVMFS_UPLOAD_S3_H_
7
8 #include <pthread.h>
9
10 #include <string>
11 #include <utility>
12 #include <vector>
13
14 #include "network/s3fanout.h"
15 #include "upload_facility.h"
16 #include "util/atomic.h"
17 #include "util/file_backed_buffer.h"
18 #include "util/pointer.h"
19 #include "util/single_copy.h"
20
21 namespace upload {
22
23 struct S3StreamHandle : public UploadStreamHandle {
24 505 S3StreamHandle(const CallbackTN *commit_callback,
25 uint64_t in_memory_threshold,
26 const std::string &tmp_dir = "/tmp/")
27
1/2
✓ Branch 2 taken 505 times.
✗ Branch 3 not taken.
505 : UploadStreamHandle(commit_callback) {
28
2/4
✓ Branch 1 taken 505 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 505 times.
✗ Branch 5 not taken.
505 buffer = FileBackedBuffer::Create(in_memory_threshold, tmp_dir);
29 505 }
30
31 // Ownership is later transferred to the S3 fanout
32 UniquePtr<FileBackedBuffer> buffer;
33 };
34
35 /**
36 * The S3Uploader implements the AbstractUploader interface to push files
37 * into an S3 CVMFS repository backend.
38 * For a detailed description of the class's interface please have a look into
39 * the AbstractUploader base class.
40 */
41 class S3Uploader : public AbstractUploader {
42 public:
43 explicit S3Uploader(const SpoolerDefinition &spooler_definition);
44 virtual ~S3Uploader();
45 static bool WillHandle(const SpoolerDefinition &spooler_definition);
46
47 virtual std::string name() const { return "S3"; }
48
49 virtual bool Create();
50
51 /**
52 * Upload() is not done concurrently in the current implementation of the
53 * S3Uploader, since it is a simple move or copy of a file without
54 * CPU-intensive operations.
55 * This method calls NotifyListeners and invokes a callback for all
56 * registered listeners (see the Observable template for details).
57 */
58 virtual void DoUpload(const std::string &remote_path,
59 IngestionSource *source,
60 const CallbackTN *callback);
61
62 virtual UploadStreamHandle *InitStreamedUpload(
63 const CallbackTN *callback = NULL);
64 virtual void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer,
65 const CallbackTN *callback);
66 virtual void FinalizeStreamedUpload(UploadStreamHandle *handle,
67 const shash::Any &content_hash);
68
69 virtual void DoRemoveAsync(const std::string &file_to_delete);
70 virtual bool Peek(const std::string &path);
71 virtual bool Mkdir(const std::string &path);
72 virtual bool PlaceBootstrappingShortcut(const shash::Any &object);
73
74 virtual unsigned int GetNumberOfErrors() const;
75 int64_t DoGetObjectSize(const std::string &file_name);
76
77 // Only for testing
78 15 s3fanout::S3FanoutManager *GetS3FanoutManager() {
79 15 return s3fanout_mgr_.weak_ref();
80 }
81
82 private:
83 static const unsigned kDefaultPort = 80;
84 static const unsigned kHttpsPort = 443;
85 static const unsigned kDefaultNumParallelUploads = 16;
86 static const unsigned kDefaultNumRetries = 3;
87 static const unsigned kDefaultTimeoutSec = 60;
88 static const unsigned kDefaultBackoffInitMs = 100;
89 static const unsigned kDefaultBackoffMaxMs = 2000;
90 static const unsigned kInMemoryObjectThreshold = 500 * 1024; // 500KiB
91
92 // Used to make the async HTTP requests synchronous in Peek(), Create(),
93 // and Upload() of single bits
94 struct RequestCtrl : SingleCopy {
95 2560 RequestCtrl() : return_code(-1), callback_forward(NULL) {
96 2560 pipe_wait[0] = pipe_wait[1] = -1;
97 2560 }
98
99 void WaitFor();
100
101 int return_code;
102 const CallbackTN *callback_forward;
103 std::string original_path;
104 int pipe_wait[2];
105 };
106
107 void OnReqComplete(const upload::UploaderResults &results, RequestCtrl *ctrl);
108
109 static void *MainCollectResults(void *data);
110
111 bool ParseSpoolerDefinition(const SpoolerDefinition &spooler_definition);
112 void UploadJobInfo(s3fanout::JobInfo *info);
113
114 s3fanout::JobInfo *CreateJobInfo(const std::string &path) const;
115
116 UniquePtr<s3fanout::S3FanoutManager> s3fanout_mgr_;
117 std::string repository_alias_;
118 std::string host_name_port_;
119 std::string host_name_;
120 std::string region_;
121 std::string flavor_;
122 std::string bucket_;
123 bool dns_buckets_;
124 int num_parallel_uploads_;
125 unsigned num_retries_;
126 unsigned timeout_sec_;
127 std::string access_key_;
128 std::string secret_key_;
129 s3fanout::AuthzMethods authz_method_;
130 bool peek_before_put_;
131 bool use_https_;
132 std::string proxy_;
133
134 const std::string temporary_path_;
135 mutable atomic_int32 io_errors_;
136 pthread_t thread_collect_results_;
137
138 std::string x_amz_acl_;
139 }; // S3Uploader
140
141 } // namespace upload
142
143 #endif // CVMFS_UPLOAD_S3_H_
144