diff --git a/gst/mpegtsdemux/mpegtspacketizer.h b/gst/mpegtsdemux/mpegtspacketizer.h index 0cf83a60db11fdbcfb4aa3b4dcef5285116d2774..1f12548b948f70c0d730fe410b846d502f675014 100644 --- a/gst/mpegtsdemux/mpegtspacketizer.h +++ b/gst/mpegtsdemux/mpegtspacketizer.h @@ -40,7 +40,7 @@ #define MPEGTS_MAX_PACKETSIZE MPEGTS_ATSC_PACKETSIZE #define MPEGTS_AFC_DISCONTINUITY_FLAG 0x80 -#define MPEGTS_AFC_RANDOM_ACCES_FLAGS 0x40 +#define MPEGTS_AFC_RANDOM_ACCESS_FLAG 0x40 #define MPEGTS_AFC_ELEMENTARY_STREAM_PRIORITY 0x20 #define MPEGTS_AFC_PCR_FLAG 0x10 #define MPEGTS_AFC_OPCR_FLAG 0x08 diff --git a/gst/mpegtsdemux/mpegtsparse.c b/gst/mpegtsdemux/mpegtsparse.c index d106b3ac01f15a8d4c0120f471e2eaf9ecaaca78..a253a2172b8ba6df567fb6bdc126ea43975ce017 100644 --- a/gst/mpegtsdemux/mpegtsparse.c +++ b/gst/mpegtsdemux/mpegtsparse.c @@ -39,6 +39,7 @@ #define TABLE_ID_UNSET 0xFF #define RUNNING_STATUS_RUNNING 4 +#define SYNC_BYTE 0x47 GST_DEBUG_CATEGORY_STATIC (mpegts_parse_debug); #define GST_CAT_DEFAULT mpegts_parse_debug @@ -64,6 +65,8 @@ struct _MpegTSParsePad /* the return of the latest push */ GstFlowReturn flow_return; + + MpegTSParse2Adapter ts_adapter; }; static GstStaticPadTemplate src_template = @@ -84,6 +87,7 @@ enum PROP_SET_TIMESTAMPS, PROP_SMOOTHING_LATENCY, PROP_PCR_PID, + PROP_ALIGNMENT, /* FILL ME */ }; @@ -102,6 +106,8 @@ mpegts_parse_push (MpegTSBase * base, MpegTSPacketizerPacket * packet, GstMpegtsSection * section); static void mpegts_parse_inspect_packet (MpegTSBase * base, MpegTSPacketizerPacket * packet); +static GstFlowReturn mpegts_parse_have_buffer (MpegTSBase * base, + GstBuffer * buffer); static MpegTSParsePad *mpegts_parse_create_tspad (MpegTSParse2 * parse, const gchar * name); @@ -125,13 +131,16 @@ static GstFlowReturn drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all); static void -mpegts_parse_dispose (GObject * object) +mpegts_parse_finalize (GObject * object) { MpegTSParse2 *parse = (MpegTSParse2 *) object; gst_flow_combiner_free (parse->flowcombiner); - GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object)); + gst_adapter_clear (parse->ts_adapter.adapter); + g_object_unref (parse->ts_adapter.adapter); + + GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object)); } static void @@ -143,7 +152,7 @@ mpegts_parse_class_init (MpegTSParse2Class * klass) gobject_class->set_property = mpegts_parse_set_property; gobject_class->get_property = mpegts_parse_get_property; - gobject_class->dispose = mpegts_parse_dispose; + gobject_class->finalize = mpegts_parse_finalize; g_object_class_install_property (gobject_class, PROP_SET_TIMESTAMPS, g_param_spec_boolean ("set-timestamps", @@ -159,6 +168,10 @@ mpegts_parse_class_init (MpegTSParse2Class * klass) g_param_spec_int ("pcr-pid", "PID containing PCR", "Set the PID to use for PCR values (-1 for auto)", -1, G_MAXINT, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_ALIGNMENT, + g_param_spec_uint ("alignment", "Alignment", + "Number of packets per buffer (padded with dummy packets on EOS) (0 = auto)", + 0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); element_class = GST_ELEMENT_CLASS (klass); element_class->pad_removed = mpegts_parse_pad_removed; @@ -190,9 +203,8 @@ mpegts_parse_init (MpegTSParse2 * parse) MpegTSBase *base = (MpegTSBase *) parse; base->program_size = sizeof (MpegTSParseProgram); - /* We will only need to handle data/section if we have request pads */ - base->push_data = FALSE; - base->push_section = FALSE; + base->push_data = TRUE; + base->push_section = TRUE; parse->user_pcr_pid = parse->pcr_pid = -1; @@ -207,6 +219,12 @@ mpegts_parse_init (MpegTSParse2 * parse) parse->have_group_id = FALSE; parse->group_id = G_MAXUINT; + + parse->ts_adapter.adapter = gst_adapter_new (); + parse->ts_adapter.packets_in_adapter = 0; + parse->alignment = 0; + parse->is_eos = FALSE; + parse->header = 0; } static void @@ -250,6 +268,11 @@ mpegts_parse_reset (MpegTSBase * base) parse->bytes_since_pcr = 0; parse->pcr_pid = parse->user_pcr_pid; parse->ts_offset = 0; + + gst_adapter_clear (parse->ts_adapter.adapter); + parse->ts_adapter.packets_in_adapter = 0; + parse->is_eos = FALSE; + parse->header = 0; } static void @@ -270,6 +293,9 @@ mpegts_parse_set_property (GObject * object, guint prop_id, case PROP_PCR_PID: parse->pcr_pid = parse->user_pcr_pid = g_value_get_int (value); break; + case PROP_ALIGNMENT: + parse->alignment = g_value_get_uint (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); } @@ -291,6 +317,9 @@ mpegts_parse_get_property (GObject * object, guint prop_id, case PROP_PCR_PID: g_value_set_int (value, parse->pcr_pid); break; + case PROP_ALIGNMENT: + g_value_set_uint (value, parse->alignment); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); } @@ -373,8 +402,52 @@ push_event (MpegTSBase * base, GstEvent * event) } prepare_src_pad (base, parse); } - if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_EOS)) + if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_EOS)) { + parse->is_eos = TRUE; + + if (parse->alignment > 0 && parse->ts_adapter.packets_in_adapter > 0 + && parse->ts_adapter.packets_in_adapter < parse->alignment) { + GstBuffer *buf; + GstMapInfo map; + guint8 *data; + gint missing_packets = + parse->alignment - parse->ts_adapter.packets_in_adapter; + gint i = missing_packets; + gsize packet_size = base->packetizer->packet_size; + + GST_DEBUG_OBJECT (parse, "Adding %d dummy packets", missing_packets); + + buf = gst_buffer_new_and_alloc (missing_packets * packet_size); + gst_buffer_map (buf, &map, GST_MAP_READWRITE); + data = map.data; + + g_assert (packet_size > 0); + + for (; i > 0; i--) { + gint offset; + + if (packet_size > MPEGTS_NORMAL_PACKETSIZE) { + parse->header++; + GST_WRITE_UINT32_BE (data, parse->header); + offset = 4; + } else { + offset = 0; + } + GST_WRITE_UINT8 (data + offset, SYNC_BYTE); + /* null packet PID */ + GST_WRITE_UINT16_BE (data + offset + 1, 0x1FFF); + /* no adaptation field exists | continuity counter undefined */ + GST_WRITE_UINT8 (data + offset + 3, 0x10); + /* payload */ + memset (data + offset + 4, 0, MPEGTS_NORMAL_PACKETSIZE - 4); + data += packet_size; + } + gst_buffer_unmap (buf, &map); + gst_adapter_push (parse->ts_adapter.adapter, buf); + parse->ts_adapter.packets_in_adapter += missing_packets; + } drain_pending_buffers (parse, TRUE); + } if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT)) parse->ts_offset = 0; @@ -409,6 +482,8 @@ mpegts_parse_create_tspad (MpegTSParse2 * parse, const gchar * pad_name) tspad->program = NULL; tspad->pushed = FALSE; tspad->flow_return = GST_FLOW_NOT_LINKED; + tspad->ts_adapter.adapter = gst_adapter_new (); + tspad->ts_adapter.packets_in_adapter = 0; gst_pad_set_element_private (pad, tspad); gst_flow_combiner_add_pad (parse->flowcombiner, pad); @@ -418,6 +493,9 @@ mpegts_parse_create_tspad (MpegTSParse2 * parse, const gchar * pad_name) static void mpegts_parse_destroy_tspad (MpegTSParse2 * parse, MpegTSParsePad * tspad) { + gst_adapter_clear (tspad->ts_adapter.adapter); + g_object_unref (tspad->ts_adapter.adapter); + /* free the wrapper */ g_free (tspad); } @@ -426,7 +504,6 @@ static void mpegts_parse_pad_removed (GstElement * element, GstPad * pad) { MpegTSParsePad *tspad; - MpegTSBase *base = (MpegTSBase *) element; MpegTSParse2 *parse = GST_MPEGTS_PARSE (element); if (gst_pad_get_direction (pad) == GST_PAD_SINK) @@ -438,10 +515,6 @@ mpegts_parse_pad_removed (GstElement * element, GstPad * pad) parse->srcpads = g_list_remove_all (parse->srcpads, pad); } - if (parse->srcpads == NULL) { - base->push_data = FALSE; - base->push_section = FALSE; - } if (GST_ELEMENT_CLASS (parent_class)->pad_removed) GST_ELEMENT_CLASS (parent_class)->pad_removed (element, pad); @@ -451,7 +524,6 @@ static GstPad * mpegts_parse_request_new_pad (GstElement * element, GstPadTemplate * template, const gchar * padname, const GstCaps * caps) { - MpegTSBase *base = (MpegTSBase *) element; MpegTSParse2 *parse; MpegTSParsePad *tspad; MpegTSParseProgram *parseprogram; @@ -484,8 +556,6 @@ mpegts_parse_request_new_pad (GstElement * element, GstPadTemplate * template, pad = tspad->pad; parse->srcpads = g_list_append (parse->srcpads, pad); - base->push_data = TRUE; - base->push_section = TRUE; gst_pad_set_active (pad, TRUE); @@ -516,6 +586,16 @@ mpegts_parse_request_new_pad (GstElement * element, GstPadTemplate * template, return pad; } +static GstBuffer * +mpegts_packet_to_buffer (MpegTSPacketizerPacket * packet) +{ + GstBuffer *buf = + gst_buffer_new_and_alloc (packet->data_end - packet->data_start); + gst_buffer_fill (buf, 0, packet->data_start, + packet->data_end - packet->data_start); + return buf; +} + static void mpegts_parse_release_pad (GstElement * element, GstPad * pad) { @@ -527,6 +607,53 @@ mpegts_parse_release_pad (GstElement * element, GstPad * pad) gst_element_remove_pad (element, pad); } +static GstFlowReturn +empty_adapter_into_pad (MpegTSParse2Adapter * ts_adapter, GstPad * pad) +{ + GstAdapter *adapter = ts_adapter->adapter; + GstBuffer *buf = NULL; + GstClockTime ts = gst_adapter_prev_pts (adapter, NULL); + gsize avail = gst_adapter_available (adapter); + GstFlowReturn ret = GST_FLOW_OK; + + if (avail > 0) + buf = gst_adapter_take_buffer (adapter, avail); + + ts_adapter->packets_in_adapter = 0; + + if (buf) { + GST_BUFFER_PTS (buf) = ts; + ret = gst_pad_push (pad, buf); + } + + return ret; +} + +static GstFlowReturn +enqueue_and_maybe_push_buffer (MpegTSParse2 * parse, GstPad * pad, + MpegTSParse2Adapter * ts_adapter, GstBuffer * buffer) +{ + GstFlowReturn ret = GST_FLOW_OK; + + if (buffer != NULL) { + if (parse->alignment == 1) { + ret = gst_pad_push (pad, buffer); + ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret); + } else { + gst_adapter_push (ts_adapter->adapter, buffer); + ts_adapter->packets_in_adapter++; + + if (ts_adapter->packets_in_adapter == parse->alignment + && ts_adapter->packets_in_adapter > 0) { + ret = empty_adapter_into_pad (ts_adapter, pad); + ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret); + } + } + } + + return ret; +} + static GstFlowReturn mpegts_parse_tspad_push_section (MpegTSParse2 * parse, MpegTSParsePad * tspad, GstMpegtsSection * section, MpegTSPacketizerPacket * packet) @@ -556,12 +683,10 @@ mpegts_parse_tspad_push_section (MpegTSParse2 * parse, MpegTSParsePad * tspad, tspad->program_number, section->table_id); if (to_push) { - GstBuffer *buf = - gst_buffer_new_and_alloc (packet->data_end - packet->data_start); - gst_buffer_fill (buf, 0, packet->data_start, - packet->data_end - packet->data_start); - ret = gst_pad_push (tspad->pad, buf); - ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret); + GstBuffer *buf = mpegts_packet_to_buffer (packet); + ret = + enqueue_and_maybe_push_buffer (parse, tspad->pad, &tspad->ts_adapter, + buf); } GST_LOG_OBJECT (parse, "Returning %s", gst_flow_get_name (ret)); @@ -586,10 +711,7 @@ mpegts_parse_tspad_push (MpegTSParse2 * parse, MpegTSParsePad * tspad, if (bp) { if (packet->pid == bp->pmt_pid || bp->streams == NULL || bp->streams[packet->pid]) { - GstBuffer *buf = - gst_buffer_new_and_alloc (packet->data_end - packet->data_start); - gst_buffer_fill (buf, 0, packet->data_start, - packet->data_end - packet->data_start); + GstBuffer *buf = mpegts_packet_to_buffer (packet); /* push if there's no filter or if the pid is in the filter */ ret = gst_pad_push (tspad->pad, buf); ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret); @@ -620,6 +742,7 @@ mpegts_parse_push (MpegTSBase * base, MpegTSPacketizerPacket * packet, MpegTSParsePad *tspad; GstFlowReturn ret; GList *srcpads; + GstBuffer *buf; GST_OBJECT_LOCK (parse); srcpads = parse->srcpads; @@ -639,6 +762,9 @@ mpegts_parse_push (MpegTSBase * base, MpegTSPacketizerPacket * packet, } GST_OBJECT_UNLOCK (parse); + buf = mpegts_packet_to_buffer (packet); + ret = mpegts_parse_have_buffer (base, buf); + while (pad && !done) { tspad = gst_pad_get_element_private (pad); @@ -842,10 +968,12 @@ drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all) GST_BUFFER_PTS (buffer) = out_ts + parse->ts_offset; GST_BUFFER_DTS (buffer) = out_ts + parse->ts_offset; if (ret == GST_FLOW_OK) { - ret = gst_pad_push (parse->srcpad, buffer); - ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret); - } else + ret = + enqueue_and_maybe_push_buffer (parse, parse->srcpad, + &parse->ts_adapter, buffer); + } else { gst_buffer_unref (buffer); + } /* Free this list node and move to the next */ p = g_list_previous (l); @@ -853,6 +981,10 @@ drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all) l = p; } + if (parse->is_eos) { + empty_adapter_into_pad (&parse->ts_adapter, parse->srcpad); + } + parse->pending_buffers = end; parse->bytes_since_pcr = bytes_since_pcr; parse->previous_pcr = pcr; @@ -860,13 +992,26 @@ drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all) } static GstFlowReturn -mpegts_parse_input_done (MpegTSBase * base, GstBuffer * buffer) +mpegts_parse_have_buffer (MpegTSBase * base, GstBuffer * buffer) { MpegTSParse2 *parse = GST_MPEGTS_PARSE (base); GstFlowReturn ret = GST_FLOW_OK; GST_LOG_OBJECT (parse, "Received buffer %" GST_PTR_FORMAT, buffer); + /* Assume all packets have equal size */ + if (parse->alignment > 0 && + base->packetizer->packet_size != MPEGTS_NORMAL_PACKETSIZE) { + GstMapInfo map; + guint8 *data; + + gst_buffer_map (buffer, &map, GST_MAP_READ); + data = map.data; + + parse->header = GST_READ_UINT32_BE (data); + gst_buffer_unmap (buffer, &map); + } + if (parse->current_pcr != GST_CLOCK_TIME_NONE) { GST_DEBUG_OBJECT (parse, "InputTS %" GST_TIME_FORMAT " PCR %" GST_TIME_FORMAT, @@ -894,11 +1039,37 @@ mpegts_parse_input_done (MpegTSBase * base, GstBuffer * buffer) } } - if (buffer != NULL) { - ret = gst_pad_push (parse->srcpad, buffer); + ret = + enqueue_and_maybe_push_buffer (parse, parse->srcpad, &parse->ts_adapter, + buffer); + + return ret; +} + +static void +empty_pad (GstPad * pad, MpegTSParse2 * parse) +{ + MpegTSParsePad *tspad = (MpegTSParsePad *) gst_pad_get_element_private (pad); + GstFlowReturn ret; + + ret = empty_adapter_into_pad (&tspad->ts_adapter, tspad->pad); + ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret); +} + +static GstFlowReturn +mpegts_parse_input_done (MpegTSBase * base, GstBuffer * buffer) +{ + MpegTSParse2 *parse = GST_MPEGTS_PARSE (base); + GstFlowReturn ret = GST_FLOW_OK; + + if (!prepare_src_pad (base, parse)) + return GST_FLOW_OK; + + if (parse->alignment == 0) { + ret = empty_adapter_into_pad (&parse->ts_adapter, parse->srcpad); ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret); + g_list_foreach (parse->srcpads, (GFunc) empty_pad, parse); } - return ret; } diff --git a/gst/mpegtsdemux/mpegtsparse.h b/gst/mpegtsdemux/mpegtsparse.h index f89bd4c3a365f4ba83f11b63a90793b20519c716..dfd221ed57e5c44087151c01f772ce7cb24b919d 100644 --- a/gst/mpegtsdemux/mpegtsparse.h +++ b/gst/mpegtsdemux/mpegtsparse.h @@ -46,6 +46,11 @@ G_BEGIN_DECLS typedef struct _MpegTSParse2 MpegTSParse2; typedef struct _MpegTSParse2Class MpegTSParse2Class; +typedef struct _MpegTSParse2Adapter { + GstAdapter *adapter; + guint packets_in_adapter; +} MpegTSParse2Adapter; + struct _MpegTSParse2 { MpegTSBase parent; @@ -66,7 +71,7 @@ struct _MpegTSParse2 { GList *srcpads; GstFlowCombiner *flowcombiner; - + /* state */ gboolean first; gboolean set_timestamps; @@ -75,6 +80,12 @@ struct _MpegTSParse2 { GList *pending_buffers; GstClockTime previous_pcr; guint bytes_since_pcr; + + /* Combine several packets into a larger buffer */ + MpegTSParse2Adapter ts_adapter; + guint alignment; + gboolean is_eos; + guint32 header; }; struct _MpegTSParse2Class {