gstcurlbasesink.c 41.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* GStreamer
 * Copyright (C) 2011 Axis Communications <dev-gstreamer@axis.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
Tim-Philipp Müller's avatar
Tim-Philipp Müller committed
16
17
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
18
19
 */

20
21
/**
 * SECTION:element-curlsink
22
 * @title: curlsink
23
24
25
26
27
28
 * @short_description: sink that uploads data to a server using libcurl
 * @see_also:
 *
 * This is a network sink that uses libcurl as a client to upload data to
 * a server (e.g. a HTTP/FTP server).
 *
29
 * ## Example launch line (upload a JPEG file to an HTTP server)
30
 * |[
31
 * gst-launch-1.0 filesrc location=image.jpg ! jpegparse ! curlsink  \
32
33
34
35
36
37
 *     file-name=image.jpg  \
 *     location=http://192.168.0.1:8080/cgi-bin/patupload.cgi/  \
 *     user=test passwd=test  \
 *     content-type=image/jpeg  \
 *     use-content-length=false
 * ]|
38
 *
39
40
41
42
43
44
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

45
46
47
48
#include <curl/curl.h>
#include <string.h>
#include <stdio.h>

LRN's avatar
LRN committed
49
#if HAVE_SYS_SOCKET_H
50
#include <sys/socket.h>
LRN's avatar
LRN committed
51
#endif
52
#include <sys/types.h>
LRN's avatar
LRN committed
53
#if HAVE_NETINET_IN_H
54
#include <netinet/in.h>
LRN's avatar
LRN committed
55
#endif
56
#include <unistd.h>
LRN's avatar
LRN committed
57
#if HAVE_NETINET_IP_H
58
#include <netinet/ip.h>
LRN's avatar
LRN committed
59
60
#endif
#if HAVE_NETINET_TCP_H
61
#include <netinet/tcp.h>
LRN's avatar
LRN committed
62
#endif
63
64
65
#include <sys/stat.h>
#include <fcntl.h>

66
#include "gstcurlbasesink.h"
67

68
/* Default values */
69
#define GST_CAT_DEFAULT                gst_curl_base_sink_debug
70
71
72
73
74
75
#define DEFAULT_URL                    "localhost:5555"
#define DEFAULT_TIMEOUT                30
#define DEFAULT_QOS_DSCP               0

#define DSCP_MIN                       0
#define DSCP_MAX                       63
76

77
78
79
80
81
82
83

/* Plugin specific settings */
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

84
GST_DEBUG_CATEGORY_STATIC (gst_curl_base_sink_debug);
85
86
87
88
89
90
91
92
93

enum
{
  PROP_0,
  PROP_LOCATION,
  PROP_USER_NAME,
  PROP_USER_PASSWD,
  PROP_FILE_NAME,
  PROP_TIMEOUT,
94
  PROP_QOS_DSCP
95
96
97
};

/* Object class function declarations */
98
99
static void gst_curl_base_sink_finalize (GObject * gobject);
static void gst_curl_base_sink_set_property (GObject * object, guint prop_id,
100
    const GValue * value, GParamSpec * pspec);
101
static void gst_curl_base_sink_get_property (GObject * object, guint prop_id,
102
103
104
    GValue * value, GParamSpec * pspec);

/* BaseSink class function declarations */
105
static GstFlowReturn gst_curl_base_sink_render (GstBaseSink * bsink,
106
    GstBuffer * buf);
107
108
109
110
111
112
static gboolean gst_curl_base_sink_event (GstBaseSink * bsink,
    GstEvent * event);
static gboolean gst_curl_base_sink_start (GstBaseSink * bsink);
static gboolean gst_curl_base_sink_stop (GstBaseSink * bsink);
static gboolean gst_curl_base_sink_unlock (GstBaseSink * bsink);
static gboolean gst_curl_base_sink_unlock_stop (GstBaseSink * bsink);
113
114

/* private functions */
115
116
117
118
119
120
121

static gboolean gst_curl_base_sink_transfer_setup_unlocked
    (GstCurlBaseSink * sink);
static gboolean gst_curl_base_sink_transfer_start_unlocked
    (GstCurlBaseSink * sink);
static void gst_curl_base_sink_transfer_cleanup (GstCurlBaseSink * sink);
static size_t gst_curl_base_sink_transfer_read_cb (void *ptr, size_t size,
122
    size_t nmemb, void *stream);
123
static size_t gst_curl_base_sink_transfer_write_cb (void *ptr, size_t size,
124
    size_t nmemb, void *stream);
125
126
static size_t gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink,
    void *curl_ptr, size_t block_size, guint * last_chunk);
127
128
129
130
#ifndef GST_DISABLE_GST_DEBUG
static int gst_curl_base_sink_debug_cb (CURL * handle, curl_infotype type,
    char *data, size_t size, void *clientp);
#endif
131
static int gst_curl_base_sink_transfer_socket_cb (void *clientp,
132
    curl_socket_t curlfd, curlsocktype purpose);
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
static gpointer gst_curl_base_sink_transfer_thread_func (gpointer data);
static gint gst_curl_base_sink_setup_dscp_unlocked (GstCurlBaseSink * sink);
static CURLcode gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink);

static gboolean gst_curl_base_sink_wait_for_data_unlocked
    (GstCurlBaseSink * sink);
static void gst_curl_base_sink_new_file_notify_unlocked
    (GstCurlBaseSink * sink);
static void gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked
    (GstCurlBaseSink * sink);
static void gst_curl_base_sink_data_sent_notify (GstCurlBaseSink * sink);
static void gst_curl_base_sink_wait_for_response (GstCurlBaseSink * sink);
static void gst_curl_base_sink_got_response_notify (GstCurlBaseSink * sink);

static void handle_transfer (GstCurlBaseSink * sink);
static size_t transfer_data_buffer (void *curl_ptr, TransferBuffer * buf,
    size_t max_bytes_to_send, guint * last_chunk);

#define parent_class gst_curl_base_sink_parent_class
G_DEFINE_TYPE (GstCurlBaseSink, gst_curl_base_sink, GST_TYPE_BASE_SINK);
153

154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
static gboolean
gst_curl_base_sink_default_has_buffered_data_unlocked (GstCurlBaseSink * sink)
{
  return sink->transfer_buf->len > 0;
}

static gboolean
gst_curl_base_sink_has_buffered_data_unlocked (GstCurlBaseSink * sink)
{
  GstCurlBaseSinkClass *klass;
  gboolean res = FALSE;

  klass = GST_CURL_BASE_SINK_GET_CLASS (sink);

  if (klass->has_buffered_data_unlocked)
    res = klass->has_buffered_data_unlocked (sink);

  return res;
}

174
static void
175
gst_curl_base_sink_class_init (GstCurlBaseSinkClass * klass)
176
{
177
178
179
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
  GstBaseSinkClass *gstbasesink_class = (GstBaseSinkClass *) klass;
  GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
180

181
182
183
  GST_DEBUG_CATEGORY_INIT (gst_curl_base_sink_debug, "curlbasesink", 0,
      "curl base sink element");
  GST_DEBUG_OBJECT (klass, "class_init");
184

185
  gst_element_class_set_static_metadata (element_class,
186
      "Curl base sink",
187
      "Sink/Network",
188
189
      "Upload data over the network to a server using libcurl",
      "Patricia Muscalu <patricia@axis.com>");
190

191
192
193
194
195
  gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_curl_base_sink_event);
  gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_curl_base_sink_render);
  gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_curl_base_sink_start);
  gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_curl_base_sink_stop);
  gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock);
196
  gstbasesink_class->unlock_stop =
197
198
199
200
201
      GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock_stop);
  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_base_sink_finalize);

  gobject_class->set_property = gst_curl_base_sink_set_property;
  gobject_class->get_property = gst_curl_base_sink_get_property;
202

203
204
205
  klass->handle_transfer = handle_transfer;
  klass->transfer_read_cb = gst_curl_base_sink_transfer_read_cb;
  klass->transfer_data_buffer = gst_curl_base_sink_transfer_data_buffer;
206
207
  klass->has_buffered_data_unlocked =
      gst_curl_base_sink_default_has_buffered_data_unlocked;
208
209
210
211
212
213
214
215
216
217

  /* FIXME: check against souphttpsrc and use same names for same properties */
  g_object_class_install_property (gobject_class, PROP_LOCATION,
      g_param_spec_string ("location", "Location",
          "URI location to write to", NULL,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_USER_NAME,
      g_param_spec_string ("user", "User name",
          "User name to use for server authentication", NULL,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
218
  g_object_class_install_property (gobject_class, PROP_USER_PASSWD,
219
220
221
      g_param_spec_string ("passwd", "User password",
          "User password to use for server authentication", NULL,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222
  g_object_class_install_property (gobject_class, PROP_FILE_NAME,
223
224
225
226
227
228
229
230
      g_param_spec_string ("file-name", "Base file name",
          "The base file name for the uploaded images", NULL,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_TIMEOUT,
      g_param_spec_int ("timeout", "Timeout",
          "Number of seconds waiting to write before timeout",
          0, G_MAXINT, DEFAULT_TIMEOUT,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
231
  g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
232
233
234
235
236
      g_param_spec_int ("qos-dscp",
          "QoS diff srv code point",
          "Quality of Service, differentiated services code point (0 default)",
          DSCP_MIN, DSCP_MAX, DEFAULT_QOS_DSCP,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
237

238
  gst_element_class_add_static_pad_template (element_class, &sinktemplate);
239
240
241
}

static void
242
gst_curl_base_sink_init (GstCurlBaseSink * sink)
243
{
244
245
  sink->transfer_buf = g_malloc (sizeof (TransferBuffer));
  sink->transfer_cond = g_malloc (sizeof (TransferCondition));
Wim Taymans's avatar
Wim Taymans committed
246
  g_cond_init (&sink->transfer_cond->cond);
247
248
  sink->transfer_cond->data_sent = FALSE;
  sink->transfer_cond->data_available = FALSE;
249
  sink->transfer_cond->wait_for_response = FALSE;
250
251
252
253
254
  sink->timeout = DEFAULT_TIMEOUT;
  sink->qos_dscp = DEFAULT_QOS_DSCP;
  sink->url = g_strdup (DEFAULT_URL);
  sink->transfer_thread_close = FALSE;
  sink->new_file = TRUE;
255
  sink->error = NULL;
256
257
  sink->flow_ret = GST_FLOW_OK;
  sink->is_live = FALSE;
258
259
260
}

static void
261
gst_curl_base_sink_finalize (GObject * gobject)
262
{
263
  GstCurlBaseSink *this = GST_CURL_BASE_SINK (gobject);
264
265

  GST_DEBUG ("finalizing curlsink");
266
267
  if (this->transfer_thread != NULL) {
    g_thread_join (this->transfer_thread);
268
269
  }

270
  gst_curl_base_sink_transfer_cleanup (this);
Wim Taymans's avatar
Wim Taymans committed
271
  g_cond_clear (&this->transfer_cond->cond);
272
273
274
275
276
277
278
279
280
281
  g_free (this->transfer_cond);
  g_free (this->transfer_buf);

  g_free (this->url);
  g_free (this->user);
  g_free (this->passwd);
  g_free (this->file_name);
  if (this->fdset != NULL) {
    gst_poll_free (this->fdset);
    this->fdset = NULL;
282
283
284
285
  }
  G_OBJECT_CLASS (parent_class)->finalize (gobject);
}

286
287
288
289
290
291
292
293
void
gst_curl_base_sink_transfer_thread_notify_unlocked (GstCurlBaseSink * sink)
{
  GST_LOG ("more data to send");

  sink->transfer_cond->data_available = TRUE;
  sink->transfer_cond->data_sent = FALSE;
  sink->transfer_cond->wait_for_response = TRUE;
Wim Taymans's avatar
Wim Taymans committed
294
  g_cond_signal (&sink->transfer_cond->cond);
295
296
297
298
299
300
301
302
}

void
gst_curl_base_sink_transfer_thread_close (GstCurlBaseSink * sink)
{
  GST_OBJECT_LOCK (sink);
  GST_LOG_OBJECT (sink, "setting transfer thread close flag");
  sink->transfer_thread_close = TRUE;
Wim Taymans's avatar
Wim Taymans committed
303
  g_cond_signal (&sink->transfer_cond->cond);
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
  GST_OBJECT_UNLOCK (sink);

  if (sink->transfer_thread != NULL) {
    GST_LOG_OBJECT (sink, "waiting for transfer thread to finish");
    g_thread_join (sink->transfer_thread);
    sink->transfer_thread = NULL;
  }
}

void
gst_curl_base_sink_set_live (GstCurlBaseSink * sink, gboolean live)
{
  g_return_if_fail (GST_IS_CURL_BASE_SINK (sink));

  GST_OBJECT_LOCK (sink);
  sink->is_live = live;
  GST_OBJECT_UNLOCK (sink);
}

gboolean
gst_curl_base_sink_is_live (GstCurlBaseSink * sink)
{
  gboolean result;

  g_return_val_if_fail (GST_IS_CURL_BASE_SINK (sink), FALSE);

  GST_OBJECT_LOCK (sink);
  result = sink->is_live;
  GST_OBJECT_UNLOCK (sink);

  return result;
}

337
static GstFlowReturn
338
gst_curl_base_sink_render (GstBaseSink * bsink, GstBuffer * buf)
339
{
340
  GstCurlBaseSink *sink;
341
  GstMapInfo map;
342
343
344
  guint8 *data;
  size_t size;
  GstFlowReturn ret;
345
  gchar *error;
346
347
348

  GST_LOG ("enter render");

349
  sink = GST_CURL_BASE_SINK (bsink);
350

351
352
353
  gst_buffer_map (buf, &map, GST_MAP_READ);
  data = map.data;
  size = map.size;
354

355
356
357
358
359
360
361
  if (size == 0) {
    gst_buffer_unmap (buf, &map);
    return GST_FLOW_OK;
  }

  GST_OBJECT_LOCK (sink);

362
363
  /* check if the transfer thread has encountered problems while the
   * pipeline thread was working elsewhere */
364
  if (sink->flow_ret != GST_FLOW_OK) {
365
366
367
    goto done;
  }

368
  g_assert (sink->transfer_cond->data_available == FALSE);
369
370

  /* if there is no transfer thread created, lets create one */
371
  if (sink->transfer_thread == NULL) {
372
    if (!gst_curl_base_sink_transfer_start_unlocked (sink)) {
373
      sink->flow_ret = GST_FLOW_ERROR;
374
375
376
377
378
      goto done;
    }
  }

  /* make data available for the transfer thread and notify */
379
380
381
  sink->transfer_buf->ptr = data;
  sink->transfer_buf->len = size;
  sink->transfer_buf->offset = 0;
382
  gst_curl_base_sink_transfer_thread_notify_unlocked (sink);
383
384
385

  /* wait for the transfer thread to send the data. This will be notified
   * either when transfer is completed by the curl read callback or by
Sebastian Rasmussen's avatar
Sebastian Rasmussen committed
386
   * the thread function if an error has occurred. */
387
  gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked (sink);
388
389

done:
390
391
392
393
394
  gst_buffer_unmap (buf, &map);

  /* Hand over error from transfer thread to streaming thread */
  error = sink->error;
  sink->error = NULL;
395
  ret = sink->flow_ret;
396
  GST_OBJECT_UNLOCK (sink);
397
398
399
400
401
402

  if (error != NULL) {
    GST_ERROR_OBJECT (sink, "%s", error);
    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("%s", error), (NULL));
    g_free (error);
  }
403
404
405
406
407
408
409

  GST_LOG ("exit render");

  return ret;
}

static gboolean
410
gst_curl_base_sink_event (GstBaseSink * bsink, GstEvent * event)
411
{
412
413
  GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink);
  GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
414
415
416
417

  switch (event->type) {
    case GST_EVENT_EOS:
      GST_DEBUG_OBJECT (sink, "received EOS");
418
419
420
421
422
423
424
425
      gst_curl_base_sink_transfer_thread_close (sink);
      gst_curl_base_sink_wait_for_response (sink);
      break;
    case GST_EVENT_CAPS:
      if (klass->set_mime_type) {
        GstCaps *caps;
        gst_event_parse_caps (event, &caps);
        klass->set_mime_type (sink, caps);
426
427
428
429
430
      }
      break;
    default:
      break;
  }
431
432

  return GST_BASE_SINK_CLASS (parent_class)->event (bsink, event);
433
434
435
}

static gboolean
436
gst_curl_base_sink_start (GstBaseSink * bsink)
437
{
438
439
440
  GstCurlBaseSink *sink;

  sink = GST_CURL_BASE_SINK (bsink);
441

442
443
444
445
446
447
448
  /* reset flags */
  sink->transfer_cond->data_sent = FALSE;
  sink->transfer_cond->data_available = FALSE;
  sink->transfer_cond->wait_for_response = FALSE;
  sink->transfer_thread_close = FALSE;
  sink->new_file = TRUE;
  sink->flow_ret = GST_FLOW_OK;
449

450
  if ((sink->fdset = gst_poll_new (TRUE)) == NULL) {
451
452
453
454
455
    GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE,
        ("gst_poll_new failed: %s", g_strerror (errno)), (NULL));
    return FALSE;
  }

456
457
  gst_poll_fd_init (&sink->fd);

458
459
460
461
  return TRUE;
}

static gboolean
462
gst_curl_base_sink_stop (GstBaseSink * bsink)
463
{
464
  GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink);
465

466
  gst_curl_base_sink_transfer_thread_close (sink);
467
468
469
  if (sink->fdset != NULL) {
    gst_poll_free (sink->fdset);
    sink->fdset = NULL;
470
471
472
473
474
475
  }

  return TRUE;
}

static gboolean
476
gst_curl_base_sink_unlock (GstBaseSink * bsink)
477
{
478
  GstCurlBaseSink *sink;
479

480
  sink = GST_CURL_BASE_SINK (bsink);
481
482

  GST_LOG_OBJECT (sink, "Flushing");
483
  gst_poll_set_flushing (sink->fdset, TRUE);
484
485
486
487
488

  return TRUE;
}

static gboolean
489
gst_curl_base_sink_unlock_stop (GstBaseSink * bsink)
490
{
491
  GstCurlBaseSink *sink;
492

493
  sink = GST_CURL_BASE_SINK (bsink);
494
495

  GST_LOG_OBJECT (sink, "No longer flushing");
496
  gst_poll_set_flushing (sink->fdset, FALSE);
497
498
499
500
501

  return TRUE;
}

static void
502
gst_curl_base_sink_set_property (GObject * object, guint prop_id,
503
504
    const GValue * value, GParamSpec * pspec)
{
505
  GstCurlBaseSink *sink;
506
507
  GstState cur_state;

508
509
  g_return_if_fail (GST_IS_CURL_BASE_SINK (object));
  sink = GST_CURL_BASE_SINK (object);
510
511
512
513
514
515
516

  gst_element_get_state (GST_ELEMENT (sink), &cur_state, NULL, 0);
  if (cur_state != GST_STATE_PLAYING && cur_state != GST_STATE_PAUSED) {
    GST_OBJECT_LOCK (sink);

    switch (prop_id) {
      case PROP_LOCATION:
517
518
519
        g_free (sink->url);
        sink->url = g_value_dup_string (value);
        GST_DEBUG_OBJECT (sink, "url set to %s", sink->url);
520
521
        break;
      case PROP_USER_NAME:
522
523
524
        g_free (sink->user);
        sink->user = g_value_dup_string (value);
        GST_DEBUG_OBJECT (sink, "user set to %s", sink->user);
525
526
        break;
      case PROP_USER_PASSWD:
527
528
529
        g_free (sink->passwd);
        sink->passwd = g_value_dup_string (value);
        GST_DEBUG_OBJECT (sink, "passwd set to %s", sink->passwd);
530
531
        break;
      case PROP_FILE_NAME:
532
533
534
        g_free (sink->file_name);
        sink->file_name = g_value_dup_string (value);
        GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name);
535
536
        break;
      case PROP_TIMEOUT:
537
538
        sink->timeout = g_value_get_int (value);
        GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout);
539
540
        break;
      case PROP_QOS_DSCP:
541
        sink->qos_dscp = g_value_get_int (value);
542
        gst_curl_base_sink_setup_dscp_unlocked (sink);
543
        GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
        break;
      default:
        GST_DEBUG_OBJECT (sink, "invalid property id %d", prop_id);
        break;
    }

    GST_OBJECT_UNLOCK (sink);

    return;
  }

  /* in PLAYING or PAUSED state */
  GST_OBJECT_LOCK (sink);

  switch (prop_id) {
    case PROP_FILE_NAME:
560
561
562
      g_free (sink->file_name);
      sink->file_name = g_value_dup_string (value);
      GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name);
563
      gst_curl_base_sink_new_file_notify_unlocked (sink);
564
565
      break;
    case PROP_TIMEOUT:
566
567
      sink->timeout = g_value_get_int (value);
      GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout);
568
569
      break;
    case PROP_QOS_DSCP:
570
      sink->qos_dscp = g_value_get_int (value);
571
      gst_curl_base_sink_setup_dscp_unlocked (sink);
572
      GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
573
574
575
576
577
578
579
580
581
582
      break;
    default:
      GST_WARNING_OBJECT (sink, "cannot set property when PLAYING");
      break;
  }

  GST_OBJECT_UNLOCK (sink);
}

static void
583
gst_curl_base_sink_get_property (GObject * object, guint prop_id,
584
585
    GValue * value, GParamSpec * pspec)
{
586
  GstCurlBaseSink *sink;
587

588
589
  g_return_if_fail (GST_IS_CURL_BASE_SINK (object));
  sink = GST_CURL_BASE_SINK (object);
590
591
592

  switch (prop_id) {
    case PROP_LOCATION:
593
      g_value_set_string (value, sink->url);
594
595
      break;
    case PROP_USER_NAME:
596
      g_value_set_string (value, sink->user);
597
598
      break;
    case PROP_USER_PASSWD:
599
      g_value_set_string (value, sink->passwd);
600
601
      break;
    case PROP_FILE_NAME:
602
      g_value_set_string (value, sink->file_name);
603
604
      break;
    case PROP_TIMEOUT:
605
      g_value_set_int (value, sink->timeout);
606
607
      break;
    case PROP_QOS_DSCP:
608
      g_value_set_int (value, sink->qos_dscp);
609
610
611
612
613
614
615
616
      break;
    default:
      GST_DEBUG_OBJECT (sink, "invalid property id");
      break;
  }
}

static gboolean
617
gst_curl_base_sink_transfer_set_common_options_unlocked (GstCurlBaseSink * sink)
618
{
619
  GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
620
  CURLcode res;
621

622
#ifndef GST_DISABLE_GST_DEBUG
623
624
625
626
627
628
  res = curl_easy_setopt (sink->curl, CURLOPT_VERBOSE, 1);
  if (res != CURLE_OK) {
    sink->error = g_strdup_printf ("failed to set verbose: %s",
        curl_easy_strerror (res));
    return FALSE;
  }
629
630
631
632
633
634
635
636
637
638
639
640
641
  res = curl_easy_setopt (sink->curl, CURLOPT_DEBUGDATA, sink);
  if (res != CURLE_OK) {
    sink->error = g_strdup_printf ("failed to set debug user_data: %s",
        curl_easy_strerror (res));
    return FALSE;
  }
  res = curl_easy_setopt (sink->curl, CURLOPT_DEBUGFUNCTION,
      gst_curl_base_sink_debug_cb);
  if (res != CURLE_OK) {
    sink->error = g_strdup_printf ("failed to set debug functions: %s",
        curl_easy_strerror (res));
    return FALSE;
  }
642
643
#endif

644
645
646
647
648
649
  res = curl_easy_setopt (sink->curl, CURLOPT_URL, sink->url);
  if (res != CURLE_OK) {
    sink->error = g_strdup_printf ("failed to set URL: %s",
        curl_easy_strerror (res));
    return FALSE;
  }
650

651
652
653
654
655
656
  res = curl_easy_setopt (sink->curl, CURLOPT_CONNECTTIMEOUT, sink->timeout);
  if (res != CURLE_OK) {
    sink->error = g_strdup_printf ("failed to set connection timeout: %s",
        curl_easy_strerror (res));
    return FALSE;
  }
657

Sebastian Rasmussen's avatar
Sebastian Rasmussen committed
658
  /* using signals in a multi-threaded application is dangerous */
659
660
661
662
663
664
  res = curl_easy_setopt (sink->curl, CURLOPT_NOSIGNAL, 1);
  if (res != CURLE_OK) {
    sink->error = g_strdup_printf ("failed to set no signalling: %s",
        curl_easy_strerror (res));
    return FALSE;
  }
665
666

  /* socket settings */
667
668
669
670
671
672
673
  res = curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTDATA, sink);
  if (res != CURLE_OK) {
    sink->error = g_strdup_printf ("failed to set sockopt user data: %s",
        curl_easy_strerror (res));
    return FALSE;
  }
  res = curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTFUNCTION,
674
      gst_curl_base_sink_transfer_socket_cb);
675
676
677
678
679
  if (res != CURLE_OK) {
    sink->error = g_strdup_printf ("failed to set sockopt function: %s",
        curl_easy_strerror (res));
    return FALSE;
  }
680

681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
  res = curl_easy_setopt (sink->curl, CURLOPT_READDATA, sink);
  if (res != CURLE_OK) {
    sink->error = g_strdup_printf ("failed to set read user data: %s",
        curl_easy_strerror (res));
    return FALSE;
  }
  res = curl_easy_setopt (sink->curl, CURLOPT_READFUNCTION,
      klass->transfer_read_cb);
  if (res != CURLE_OK) {
    sink->error = g_strdup_printf ("failed to set read function: %s",
        curl_easy_strerror (res));
    return FALSE;
  }

  res = curl_easy_setopt (sink->curl, CURLOPT_WRITEDATA, sink);
  if (res != CURLE_OK) {
    sink->error = g_strdup_printf ("failed to set write user data: %s",
        curl_easy_strerror (res));
    return FALSE;
  }
  res = curl_easy_setopt (sink->curl, CURLOPT_WRITEFUNCTION,
702
      gst_curl_base_sink_transfer_write_cb);
703
704
705
706
707
  if (res != CURLE_OK) {
    sink->error = g_strdup_printf ("failed to set write function: %s",
        curl_easy_strerror (res));
    return FALSE;
  }
708
709
710
711
712
713
714
715
716
  /* Time out in case transfer speed in bytes per second stay below
   * CURLOPT_LOW_SPEED_LIMIT during CURLOPT_LOW_SPEED_TIME */
  res = curl_easy_setopt (sink->curl, CURLOPT_LOW_SPEED_LIMIT, 1L);
  if (res != CURLE_OK) {
    sink->error = g_strdup_printf ("failed to set low speed limit: %s",
        curl_easy_strerror (res));
    return FALSE;
  }
  res = curl_easy_setopt (sink->curl, CURLOPT_LOW_SPEED_TIME,
Sam Hurst's avatar
Sam Hurst committed
717
      (long) sink->timeout);
718
719
720
721
722
  if (res != CURLE_OK) {
    sink->error = g_strdup_printf ("failed to set low speed time: %s",
        curl_easy_strerror (res));
    return FALSE;
  }
723

724
  GST_LOG ("common options set");
725
726
727
728
729
730
731
  return TRUE;
}

static gboolean
gst_curl_base_sink_transfer_set_options_unlocked (GstCurlBaseSink * sink)
{
  GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
732
  CURLcode res;
733

734
735
736
  if (!gst_curl_base_sink_transfer_set_common_options_unlocked (sink)) {
    return FALSE;
  }
737
738

  /* authentication settings */
739
  if (sink->user != NULL && strlen (sink->user)) {
740
741
742
743
744
745
746
747
748
749
750
751
    res = curl_easy_setopt (sink->curl, CURLOPT_USERNAME, sink->user);
    if (res != CURLE_OK) {
      sink->error = g_strdup_printf ("failed to set user name: %s",
          curl_easy_strerror (res));
      return FALSE;
    }
    res = curl_easy_setopt (sink->curl, CURLOPT_PASSWORD, sink->passwd);
    if (res != CURLE_OK) {
      sink->error = g_strdup_printf ("failed to set password: %s",
          curl_easy_strerror (res));
      return FALSE;
    }
752
753
  }

754
  if (klass->set_options_unlocked) {
755
756
757
    return klass->set_options_unlocked (sink);
  } else {
    return FALSE;
758
  }
759
}
760

761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
static size_t
transfer_data_buffer (void *curl_ptr, TransferBuffer * buf,
    size_t max_bytes_to_send, guint * last_chunk)
{
  guint buf_len = buf->len;
  size_t bytes_to_send = MIN (max_bytes_to_send, buf->len);

  memcpy ((guint8 *) curl_ptr, buf->ptr + buf->offset, bytes_to_send);
  buf->offset = buf->offset + bytes_to_send;
  buf->len = buf->len - bytes_to_send;

  /* the last data chunk */
  if (bytes_to_send == buf_len) {
    buf->offset = 0;
    buf->len = 0;
    *last_chunk = 1;
  }
778

779
  GST_LOG ("sent : %" G_GSIZE_FORMAT, bytes_to_send);
780

781
  return bytes_to_send;
782
783
784
}

static size_t
785
786
gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink,
    void *curl_ptr, size_t block_size, guint * last_chunk)
787
788
789
{
  TransferBuffer *buffer;

790
791
792
793
794
795
  buffer = sink->transfer_buf;
  GST_LOG ("write buf len=%" G_GSIZE_FORMAT ", offset=%" G_GSIZE_FORMAT,
      buffer->len, buffer->offset);

  if (buffer->len <= 0) {
    GST_WARNING ("got zero- or negative-length buffer");
796
797
798
799

    return 0;
  }

800
  /* more data in buffer(s) */
801
  return transfer_data_buffer (curl_ptr, buffer, block_size, last_chunk);
802
}
803

804
805
806
807
808
809
810
811
812
static size_t
gst_curl_base_sink_transfer_read_cb (void *curl_ptr, size_t size, size_t nmemb,
    void *stream)
{
  GstCurlBaseSink *sink;
  GstCurlBaseSinkClass *klass;
  size_t max_bytes_to_send;
  size_t bytes_to_send;
  guint last_chunk = 0;
813

814
815
  sink = (GstCurlBaseSink *) stream;
  klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
816

817
  max_bytes_to_send = size * nmemb;
818

819
820
821
822
  /* wait for data to come available, if new file or thread close is set
   * then zero will be returned to indicate end of current transfer */
  GST_OBJECT_LOCK (sink);
  if (gst_curl_base_sink_wait_for_data_unlocked (sink) == FALSE) {
823
824
825
826
827
828
829
830
831
832

    if (gst_curl_base_sink_has_buffered_data_unlocked (sink) &&
        sink->transfer_thread_close) {
      GST_WARNING_OBJECT (sink,
          "discarding render data due to thread close flag");

      GST_OBJECT_UNLOCK (sink);
      return CURL_READFUNC_ABORT;
    }

833
834
    if (klass->flush_data_unlocked) {
      bytes_to_send = klass->flush_data_unlocked (sink, curl_ptr,
835
          max_bytes_to_send, sink->new_file, sink->transfer_thread_close);
836
837

      GST_OBJECT_UNLOCK (sink);
838
839

      return bytes_to_send;
840
841
    }

842
843
    GST_OBJECT_UNLOCK (sink);
    GST_LOG ("returning 0, no more data to send in this file");
844
845
846

    return 0;
  }
847
848
849
850
851
852
853
854
855
856
857
858

  GST_OBJECT_UNLOCK (sink);

  bytes_to_send = klass->transfer_data_buffer (sink, curl_ptr,
      max_bytes_to_send, &last_chunk);

  /* the last data chunk */
  if (last_chunk) {
    gst_curl_base_sink_data_sent_notify (sink);
  }

  return bytes_to_send;
859
860
861
}

static size_t
862
gst_curl_base_sink_transfer_write_cb (void G_GNUC_UNUSED * ptr, size_t size,
863
864
    size_t nmemb, void G_GNUC_UNUSED * stream)
{
865
866
  GstCurlBaseSink *sink;
  GstCurlBaseSinkClass *klass;
867
868
  size_t realsize = size * nmemb;

869
870
871
872
873
874
875
876
877
878
879
880
  sink = (GstCurlBaseSink *) stream;
  klass = GST_CURL_BASE_SINK_GET_CLASS (sink);

  if (klass->transfer_verify_response_code) {
    if (!klass->transfer_verify_response_code (sink)) {
      GST_DEBUG_OBJECT (sink, "response error");
      GST_OBJECT_LOCK (sink);
      sink->flow_ret = GST_FLOW_ERROR;
      GST_OBJECT_UNLOCK (sink);
    }
  }

881
  GST_DEBUG ("response %s", (gchar *) ptr);
882

883
884
885
  return realsize;
}

886
887
CURLcode
gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink)
888
889
890
891
892
893
894
895
896
{
  CURLcode code = CURLE_OK;
  CURL *easy;
  CURLMsg *msg;
  gint msgs_left;
  gchar *eff_url = NULL;

  do {
    easy = NULL;
897
    while ((msg = curl_multi_info_read (sink->multi_handle, &msgs_left))) {
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
      if (msg->msg == CURLMSG_DONE) {
        easy = msg->easy_handle;
        code = msg->data.result;
        break;
      }
    }
    if (easy) {
      curl_easy_getinfo (easy, CURLINFO_EFFECTIVE_URL, &eff_url);
      GST_DEBUG ("transfer done %s (%s-%d)\n", eff_url,
          curl_easy_strerror (code), code);
    }
  } while (easy);

  return code;
}

914
915
static void
handle_transfer (GstCurlBaseSink * sink)
916
{
917
  GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
918
  gint retval;
919
  gint activated_fds;
920
921
922
923
924
925
  gint running_handles;
  gint timeout;
  CURLMcode m_code;
  CURLcode e_code;

  GST_OBJECT_LOCK (sink);
926
  timeout = sink->timeout;
927
928
  GST_OBJECT_UNLOCK (sink);

Stefan Sauer's avatar
Stefan Sauer committed
929
930
  GST_DEBUG_OBJECT (sink, "handling transfers");

931
932
933
934
  /* Receiving CURLM_CALL_MULTI_PERFORM means that libcurl may have more data
     available to send or receive - call simply curl_multi_perform before
     poll() on more actions */
  do {
935
    m_code = curl_multi_perform (sink->multi_handle, &running_handles);
936
  } while (m_code == CURLM_CALL_MULTI_PERFORM);
Stefan Sauer's avatar
Stefan Sauer committed
937
  GST_DEBUG_OBJECT (sink, "running handles: %d", running_handles);
938
939

  while (running_handles && (m_code == CURLM_OK)) {
940
941
    if (klass->transfer_prepare_poll_wait) {
      klass->transfer_prepare_poll_wait (sink);
942
943
    }

944
945
    activated_fds = gst_poll_wait (sink->fdset, timeout * GST_SECOND);
    if (G_UNLIKELY (activated_fds == -1)) {
946
947
948
      if (errno == EAGAIN || errno == EINTR) {
        GST_DEBUG_OBJECT (sink, "interrupted by signal");
      } else if (errno == EBUSY) {
949
950
        GST_DEBUG_OBJECT (sink, "poll stopped");
        retval = GST_FLOW_EOS;
951
952
953
954
955
956
957

        GST_OBJECT_LOCK (sink);
        if (gst_curl_base_sink_has_buffered_data_unlocked (sink))
          GST_WARNING_OBJECT (sink,
              "discarding render data due to thread close flag");
        GST_OBJECT_UNLOCK (sink);

958
        goto fail;
959
      } else {
960
        sink->error = g_strdup_printf ("poll failed: %s", g_strerror (errno));
961
962
        retval = GST_FLOW_ERROR;
        goto fail;
963
      }
964
    } else if (G_UNLIKELY (activated_fds == 0)) {
965
966
      sink->error = g_strdup_printf ("poll timed out after %" GST_TIME_FORMAT,
          GST_TIME_ARGS (timeout * GST_SECOND));
967
968
      retval = GST_FLOW_ERROR;
      goto fail;
969
970
971
972
    }

    /* readable/writable sockets */
    do {
973
      m_code = curl_multi_perform (sink->multi_handle, &running_handles);
974
    } while (m_code == CURLM_CALL_MULTI_PERFORM);
Stefan Sauer's avatar
Stefan Sauer committed
975
    GST_DEBUG_OBJECT (sink, "running handles: %d", running_handles);
976
977
978
  }

  if (m_code != CURLM_OK) {
979
980
    sink->error = g_strdup_printf ("failed to write data: %s",
        curl_multi_strerror (m_code));
981
982
    retval = GST_FLOW_ERROR;
    goto fail;
983
984
  }

985
986
987
  /* problems still might have occurred on individual transfers even when
   * curl_multi_perform returns CURLM_OK */
  if ((e_code = gst_curl_base_sink_transfer_check (sink)) != CURLE_OK) {
988
989
    sink->error = g_strdup_printf ("failed to transfer data: %s",
        curl_easy_strerror (e_code));
990
991
    retval = GST_FLOW_ERROR;
    goto fail;
992
993
  }

994
  gst_curl_base_sink_got_response_notify (sink);
995

996
997
  GST_OBJECT_LOCK (sink);
  if (sink->socket_type == CURLSOCKTYPE_ACCEPT) {
998
999
1000
1001
1002
1003
1004
1005
1006
    /* FIXME: remove this again once we can depend on libcurl > 7.44.0,
     * see https://github.com/bagder/curl/issues/405.
     */
    if (G_UNLIKELY (sink->fd.fd < 0)) {
      sink->error = g_strdup_printf ("unknown error");
      retval = GST_FLOW_ERROR;
      GST_OBJECT_UNLOCK (sink);
      goto fail;
    }
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
    if (!gst_poll_remove_fd (sink->fdset, &sink->fd)) {
      sink->error = g_strdup_printf ("failed to remove fd");
      retval = GST_FLOW_ERROR;
      GST_OBJECT_UNLOCK (sink);
      goto fail;
    }
    sink->fd.fd = -1;
  }
  GST_OBJECT_UNLOCK (sink);

1017
1018
1019
1020
1021
1022
  return;

fail:
  GST_OBJECT_LOCK (sink);
  if (sink->flow_ret == GST_FLOW_OK) {
    sink->flow_ret = retval;
1023
  }
1024
1025
  GST_OBJECT_UNLOCK (sink);
  return;
1026
1027
}

1028
1029
1030
1031
1032
1033
#ifndef GST_DISABLE_GST_DEBUG
static int
gst_curl_base_sink_debug_cb (CURL * handle, curl_infotype type, char *data,
    size_t size, void *clientp)
{
  GstCurlBaseSink *sink = (GstCurlBaseSink *) clientp;
1034
  gchar *msg = NULL;
1035
1036
1037

  switch (type) {
    case CURLINFO_TEXT:
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
    case CURLINFO_HEADER_IN:
    case CURLINFO_HEADER_OUT:
      msg = g_memdup (data, size);
      if (size > 0) {
        msg[size - 1] = '\0';
        g_strchomp (msg);
      }
      break;
    default:
      break;
  }

  switch (type) {
    case CURLINFO_TEXT:
      GST_DEBUG_OBJECT (sink, "%s", msg);
1053
1054
      break;
    case CURLINFO_HEADER_IN:
1055
      GST_DEBUG_OBJECT (sink, "incoming header: %s", msg);
1056
1057
      break;
    case CURLINFO_HEADER_OUT:
1058
      GST_DEBUG_OBJECT (sink, "outgoing header: %s", msg);
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
      break;
    case CURLINFO_DATA_IN:
      GST_MEMDUMP_OBJECT (sink, "incoming data", (guint8 *) data, size);
      break;
    case CURLINFO_DATA_OUT:
      GST_MEMDUMP_OBJECT (sink, "outgoing data", (guint8 *) data, size);
      break;
    case CURLINFO_SSL_DATA_IN:
      GST_MEMDUMP_OBJECT (sink, "incoming ssl data", (guint8 *) data, size);
      break;
    case CURLINFO_SSL_DATA_OUT:
      GST_MEMDUMP_OBJECT (sink, "outgoing ssl data", (guint8 *) data, size);
      break;
    default:
      GST_DEBUG_OBJECT (sink, "unknown debug info type %d", type);
      GST_MEMDUMP_OBJECT (sink, "unknown data", (guint8 *) data, size);
      break;
  }
1077
  g_free (msg);
1078
1079
1080
1081
  return 0;
}
#endif

1082
1083
1084
/* This function gets called by libcurl after the socket() call but before
 * the connect() call. */
static int
1085
gst_curl_base_sink_transfer_socket_cb (void *clientp, curl_socket_t curlfd,
1086
    curlsocktype socket_type)
1087
{
1088
  GstCurlBaseSink *sink;
1089
1090
  gboolean ret = TRUE;

1091
  sink = (GstCurlBaseSink *) clientp;
1092
1093
1094
1095
1096
1097
1098

  g_assert (sink);

  if (curlfd < 0) {
    /* signal an unrecoverable error to the library which will close the socket
       and return CURLE_COULDNT_CONNECT
     */
Stefan Sauer's avatar
Stefan Sauer committed
1099
    GST_DEBUG_OBJECT (sink, "no curlfd");
1100
1101
1102
    return 1;
  }

1103
1104
  GST_OBJECT_LOCK (sink);
  sink->socket_type = socket_type;
1105

1106
1107
1108
1109
1110
1111
1112
1113
1114
  if (sink->fd.fd != curlfd) {
    if (sink->fd.fd > 0 && sink->socket_type != CURLSOCKTYPE_ACCEPT) {
      ret &= gst_poll_remove_fd (sink->fdset, &sink->fd);
    }
    sink->fd.fd = curlfd;
    ret &= gst_poll_add_fd (sink->fdset, &sink->fd);
    ret &= gst_poll_fd_ctl_write (sink->fdset, &sink->fd, TRUE);
    ret &= gst_poll_fd_ctl_read (sink->fdset, &sink->fd, TRUE);
  }
Stefan Sauer's avatar
Stefan Sauer committed
1115
  GST_DEBUG_OBJECT (sink, "fd: %d", sink->fd.fd);
1116
  gst_curl_base_sink_setup_dscp_unlocked (sink);
1117
1118
1119
  GST_OBJECT_UNLOCK (sink);

  /* success */
1120
  return ret ? 0 : 1;
1121
1122
1123
}

static gboolean
1124
gst_curl_base_sink_transfer_start_unlocked (GstCurlBaseSink * sink)
1125
1126
1127
1128
1129
{
  GError *error = NULL;
  gboolean ret = TRUE;

  GST_LOG ("creating transfer thread");
1130
1131
  sink->transfer_thread_close = FALSE;
  sink->new_file = TRUE;
1132
  sink->transfer_thread = g_thread_try_new ("curl-transfer", (GThreadFunc)
Wim Taymans's avatar
Wim Taymans committed
1133
      gst_curl_base_sink_transfer_thread_func, sink, &error);
1134

1135
  if (sink->transfer_thread == NULL || error != NULL) {
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
    ret = FALSE;
    if (error) {
      GST_ERROR_OBJECT (sink, "could not create thread %s", error->message);
      g_error_free (error);
    } else {
      GST_ERROR_OBJECT (sink, "could not create thread for unknown reason");
    }
  }

  return ret;
}

static gpointer
1149
gst_curl_base_sink_transfer_thread_func (gpointer data)
1150
{