10 #include "curl/curl.h"
24 const uint32_t
kMaxNumJobs = std::numeric_limits<uint32_t>::max();
29 size_t SendCB(
void* ptr,
size_t size,
size_t nmemb,
void* userp) {
32 size_t max_chunk_size = size * nmemb;
33 if (max_chunk_size < 1) {
37 size_t current_chunk_size = 0;
38 while (current_chunk_size < max_chunk_size) {
41 const size_t read_size =
42 std::min(max_chunk_size - current_chunk_size,
44 current_chunk_size += read_size;
47 payload->
index += read_size;
50 const size_t max_read_size = max_chunk_size - current_chunk_size;
52 max_read_size, static_cast<unsigned char*>(ptr) + current_chunk_size);
53 current_chunk_size += nbytes;
61 return current_chunk_size;
64 size_t RecvCB(
void* buffer,
size_t size,
size_t nmemb,
void* userp) {
65 std::string* my_buffer =
static_cast<std::string*
>(userp);
67 if (size * nmemb < 1) {
71 *my_buffer =
static_cast<char*
>(buffer);
73 return my_buffer->size();
88 initialized_(false) {}
93 const std::string& session_token,
94 const std::string& key_id,
95 const std::string& secret,
96 uint64_t max_pack_size,
97 uint64_t max_queue_size) {
101 pthread_mutexattr_t attr;
102 if (pthread_mutexattr_init(&attr) ||
103 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) ||
105 pthread_mutexattr_destroy(&attr)) {
107 "Could not initialize SessionContext lock.");
127 "Could not initialize SessionContext - Existing open object packs.");
139 const std::string& new_root_hash,
159 results = future->
Get() && results;
164 if (old_root_hash.empty() || new_root_hash.empty()) {
167 bool commit_result =
Commit(old_root_hash, new_root_hash, tag);
168 if (!commit_result) {
170 "SessionContext: could not commit session. Aborting.");
200 const std::string& name,
201 const bool force_dispatch) {
206 "Error: Called SessionBaseContext::CommitBucket without an open "
220 if (force_dispatch) {
225 uint64_t new_size = 0;
291 const std::string& new_root_hash,
294 request_input.
Add(
"old_root_hash", old_root_hash);
295 request_input.
Add(
"new_root_hash", new_root_hash);
296 request_input.
Add(
"tag_name", tag.
name());
300 request_input.
Add(
"tag_channel", 0);
323 const std::string json_msg =
325 "\", \"payload_digest\" : \"" + payload_digest.
ToString(
false) +
338 const size_t payload_size =
342 CURL* h_curl = curl_easy_init();
349 std::string header_str = std::string(
"Authorization: ") +
key_id_ +
" " +
350 Base64(hmac.ToString(
false));
351 struct curl_slist* auth_header = NULL;
352 auth_header = curl_slist_append(auth_header, header_str.c_str());
353 header_str = std::string(
"Message-Size: ") +
StringifyInt(json_msg.size());
354 auth_header = curl_slist_append(auth_header, header_str.c_str());
355 curl_easy_setopt(h_curl, CURLOPT_HTTPHEADER, auth_header);
358 curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 1L);
359 curl_easy_setopt(h_curl, CURLOPT_USERAGENT,
"cvmfs/" CVMFS_VERSION);
360 curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
361 curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST,
"POST");
362 curl_easy_setopt(h_curl, CURLOPT_URL, (
api_url_ +
"/payloads").c_str());
363 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, NULL);
364 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
365 static_cast<curl_off_t>(payload_size));
366 curl_easy_setopt(h_curl, CURLOPT_READDATA, &payload);
367 curl_easy_setopt(h_curl, CURLOPT_READFUNCTION,
SendCB);
368 curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION,
RecvCB);
369 curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, &reply);
372 CURLcode ret = curl_easy_perform(h_curl);
375 "SessionContext::DoUpload - curl_easy_perform failed: %d",
380 const JSON *reply_status =
382 const bool ok = (reply_status != NULL &&
383 std::string(reply_status->string_value) ==
"ok");
386 "SessionContext::DoUpload - error reply: %s",
390 curl_easy_cleanup(h_curl);
406 "SessionContext: could not submit payload. Aborting.");
static void * UploadLoop(void *data)
UniquePtr< Tube< UploadJob > > upload_jobs_
void Add(const std::string &key, const std::string &val)
struct cvmcache_context * ctx
std::string description() const
bool MakeEndRequest(const std::string &method, const std::string &key_id, const std::string &secret, const std::string &session_token, const std::string &repo_service_url, const std::string &request_payload, CurlBuffer *reply)
std::string session_token_
static JSON * SearchInObject(const JSON *json_object, const std::string &name, const json_type type)
virtual Future< bool > * DispatchObjectPack(ObjectPack *pack)=0
void TransferBucket(const BucketHandle handle, ObjectPack *other)
std::string ToString(const bool with_suffix=false) const
pthread_mutex_t current_pack_mtx_
virtual bool InitializeDerived(uint64_t max_queue_size)
assert((mem||(size==0))&&"Out Of Memory")
uint64_t bytes_dispatched_
void Set(const T &object)
virtual bool Commit(const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)=0
ObjectPack::BucketHandle NewBucket()
virtual Future< bool > * DispatchObjectPack(ObjectPack *pack)
virtual bool Commit(const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
static JsonDocument * Create(const std::string &text)
uint64_t bytes_committed_
Link * EnqueueFront(ItemT *item)
bool CommitBucket(const ObjectPack::BucketContentType type, const shash::Any &id, const ObjectPack::BucketHandle handle, const std::string &name="", const bool force_dispatch=false)
virtual bool DoUpload(const UploadJob *job)
const std::string * json_message
ObjectPackProducer * pack_serializer
virtual bool FinalizeDerived()=0
void GetDigest(shash::Any *hash)
string StringifyInt(const int64_t value)
std::vector< ObjectPack::BucketHandle > active_handles_
virtual ~SessionContextBase()
bool Finalize(bool commit, const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
string Base64(const string &data)
void HmacString(const std::string &key, const std::string &content, Any *any_digest)
virtual bool InitializeDerived(uint64_t max_queue_size)=0
static UploadJob terminator_
size_t SendCB(void *ptr, size_t size, size_t nmemb, void *userp)
bool CommitBucket(const BucketContentType type, const shash::Any &id, const BucketHandle handle, const std::string &name="")
ObjectPack * current_pack_
Tube< Future< bool > > upload_results_
size_t GetNoObjects() const
bool Initialize(const std::string &api_url, const std::string &session_token, const std::string &key_id, const std::string &secret, uint64_t max_pack_size=ObjectPack::kDefaultLimit, uint64_t max_queue_size=10)
std::string GenerateString() const
virtual bool FinalizeDerived()
size_t RecvCB(void *buffer, size_t size, size_t nmemb, void *userp)
const uint32_t kMaxNumJobs
unsigned ProduceNext(const unsigned buf_size, unsigned char *buf)
const JSON * root() const
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)