10 #include "curl/curl.h"
11 #include "cvmfs_config.h"
// Largest value a uint32_t can represent.
// NOTE(review): presumably used as an upper bound on the number of jobs -
// confirm at the point of use, which is not part of this excerpt.
const uint32_t kMaxNumJobs = std::numeric_limits<uint32_t>::max();
// Read callback handed to libcurl through CURLOPT_READFUNCTION; userp is the
// pointer that was registered with CURLOPT_READDATA. The function returns
// current_chunk_size, the byte count accumulated by the loop below, which is
// bounded by size * nmemb.
// NOTE(review): several statements of this function are not visible in this
// excerpt (e.g. the declaration of `payload` and the branch conditions); the
// comments below describe only the statements that are shown.
29 size_t SendCB(
void* ptr,
size_t size,
size_t nmemb,
void* userp) {
// Capacity, in bytes, of the buffer passed in as ptr.
32 size_t max_chunk_size = size * nmemb;
33 if (max_chunk_size < 1) {
// Running total of bytes accounted for in this invocation.
37 size_t current_chunk_size = 0;
38 while (current_chunk_size < max_chunk_size) {
// read_size is capped by the space still free in the buffer.
41 const size_t read_size =
42 std::min(max_chunk_size - current_chunk_size,
44 current_chunk_size += read_size;
// Advance the payload's read position by the amount just consumed.
47 payload->
index += read_size;
// Remaining capacity that the call below may fill.
50 const size_t max_read_size = max_chunk_size - current_chunk_size;
// The destination is offset by current_chunk_size, i.e. past the bytes
// already counted.
52 max_read_size, static_cast<unsigned char*>(ptr) + current_chunk_size);
53 current_chunk_size += nbytes;
61 return current_chunk_size;
/**
 * Write callback handed to libcurl through CURLOPT_WRITEFUNCTION. Collects the
 * received bytes in the std::string registered with CURLOPT_WRITEDATA.
 *
 * libcurl does not null-terminate the data it passes in and may deliver a
 * single reply in several invocations. Therefore exactly size * nmemb bytes
 * are appended to the string; the buffer is never interpreted as a C string.
 *
 * @param buffer  start of the received data (not null-terminated)
 * @param size    size of one data element
 * @param nmemb   number of data elements in buffer
 * @param userp   pointer to the std::string that accumulates the reply
 * @return the number of bytes consumed, which is always size * nmemb; libcurl
 *         treats any other value as a write error and aborts the transfer
 */
size_t RecvCB(void* buffer, size_t size, size_t nmemb, void* userp) {
  const size_t num_bytes = size * nmemb;
  if (num_bytes < 1) {
    return 0;
  }

  std::string* my_buffer = static_cast<std::string*>(userp);
  // Length-bounded append: keeps earlier chunks and embedded NUL bytes, and
  // never reads past the end of libcurl's buffer.
  my_buffer->append(static_cast<const char*>(buffer), num_bytes);

  return num_bytes;
}
86 objects_dispatched_(0),
89 initialized_(false) {}
94 const std::string& session_token,
95 const std::string& key_id,
96 const std::string& secret,
97 uint64_t max_pack_size,
98 uint64_t max_queue_size) {
102 pthread_mutexattr_t attr;
103 if (pthread_mutexattr_init(&attr) ||
104 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) ||
106 pthread_mutexattr_destroy(&attr)) {
108 "Could not initialize SessionContext lock.");
130 "Could not initialize SessionContext - Existing open object packs.");
142 const std::string& new_root_hash,
160 int64_t jobs_finished = 0;
163 results = future->
Get() && results;
169 if (old_root_hash.empty() || new_root_hash.empty()) {
172 bool commit_result =
Commit(old_root_hash, new_root_hash, tag);
173 if (!commit_result) {
175 "SessionContext: could not commit session. Aborting.");
205 const std::string& name,
206 const bool force_dispatch) {
211 "Error: Called SessionBaseContext::CommitBucket without an open "
225 if (force_dispatch) {
230 uint64_t new_size = 0;
302 const std::string& new_root_hash,
305 request_input.
Add(
"old_root_hash", old_root_hash);
306 request_input.
Add(
"new_root_hash", new_root_hash);
307 request_input.
Add(
"tag_name", tag.
name());
311 request_input.
Add(
"tag_channel", 0);
333 const std::string json_msg =
335 "\", \"payload_digest\" : \"" + payload_digest.
ToString(
false) +
348 const size_t payload_size =
352 CURL* h_curl = curl_easy_init();
359 std::string header_str = std::string(
"Authorization: ") +
key_id_ +
" " +
360 Base64(hmac.ToString(
false));
361 struct curl_slist* auth_header = NULL;
362 auth_header = curl_slist_append(auth_header, header_str.c_str());
363 header_str = std::string(
"Message-Size: ") +
StringifyInt(json_msg.size());
364 auth_header = curl_slist_append(auth_header, header_str.c_str());
365 curl_easy_setopt(h_curl, CURLOPT_HTTPHEADER, auth_header);
368 curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 1L);
369 curl_easy_setopt(h_curl, CURLOPT_USERAGENT,
"cvmfs/" VERSION);
370 curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
371 curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST,
"POST");
372 curl_easy_setopt(h_curl, CURLOPT_URL, (
api_url_ +
"/payloads").c_str());
373 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, NULL);
374 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
375 static_cast<curl_off_t>(payload_size));
376 curl_easy_setopt(h_curl, CURLOPT_READDATA, &payload);
377 curl_easy_setopt(h_curl, CURLOPT_READFUNCTION,
SendCB);
378 curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION,
RecvCB);
379 curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, &reply);
382 CURLcode ret = curl_easy_perform(h_curl);
385 "SessionContext::DoUpload - curl_easy_perform failed: %d",
390 const JSON *reply_status =
392 const bool ok = (reply_status != NULL &&
393 std::string(reply_status->string_value) ==
"ok");
396 "SessionContext::DoUpload - error reply: %s",
400 curl_easy_cleanup(h_curl);
416 "SessionContext: could not submit payload. Aborting.");
#define LogCvmfs(source, mask,...)
void Enqueue(const T &data)
static void * UploadLoop(void *data)
void Add(const std::string &key, const std::string &val)
struct cvmcache_context * ctx
std::string description() const
bool MakeEndRequest(const std::string &method, const std::string &key_id, const std::string &secret, const std::string &session_token, const std::string &repo_service_url, const std::string &request_payload, CurlBuffer *reply)
std::string session_token_
static JSON * SearchInObject(const JSON *json_object, const std::string &name, const json_type type)
virtual Future< bool > * DispatchObjectPack(ObjectPack *pack)=0
void TransferBucket(const BucketHandle handle, ObjectPack *other)
std::string ToString(const bool with_suffix=false) const
pthread_mutex_t current_pack_mtx_
virtual bool InitializeDerived(uint64_t max_queue_size)
assert((mem||(size==0))&&"Out Of Memory")
uint64_t bytes_dispatched_
void Set(const T &object)
virtual bool Commit(const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)=0
ObjectPack::BucketHandle NewBucket()
virtual Future< bool > * DispatchObjectPack(ObjectPack *pack)
virtual bool Commit(const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
static JsonDocument * Create(const std::string &text)
uint64_t bytes_committed_
bool CommitBucket(const ObjectPack::BucketContentType type, const shash::Any &id, const ObjectPack::BucketHandle handle, const std::string &name="", const bool force_dispatch=false)
virtual bool DoUpload(const UploadJob *job)
UniquePtr< FifoChannel< UploadJob * > > upload_jobs_
const std::string * json_message
ObjectPackProducer * pack_serializer
virtual bool FinalizeDerived()=0
void GetDigest(shash::Any *hash)
atomic_int64 objects_dispatched_
string StringifyInt(const int64_t value)
FifoChannel< Future< bool > * > upload_results_
std::vector< ObjectPack::BucketHandle > active_handles_
virtual ~SessionContextBase()
bool Finalize(bool commit, const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
string Base64(const string &data)
void HmacString(const std::string &key, const std::string &content, Any *any_digest)
virtual bool InitializeDerived(uint64_t max_queue_size)=0
static UploadJob terminator_
size_t SendCB(void *ptr, size_t size, size_t nmemb, void *userp)
bool CommitBucket(const BucketContentType type, const shash::Any &id, const BucketHandle handle, const std::string &name="")
ObjectPack * current_pack_
size_t GetNoObjects() const
bool Initialize(const std::string &api_url, const std::string &session_token, const std::string &key_id, const std::string &secret, uint64_t max_pack_size=ObjectPack::kDefaultLimit, uint64_t max_queue_size=10)
std::string GenerateString() const
virtual bool FinalizeDerived()
int64_t NumJobsSubmitted() const
size_t RecvCB(void *buffer, size_t size, size_t nmemb, void *userp)
const uint32_t kMaxNumJobs
unsigned ProduceNext(const unsigned buf_size, unsigned char *buf)
const JSON * root() const