/* * tsswitchtest * Copyright 2015 Andreas Frisch * * This program is licensed under the Creative Commons * Attribution-NonCommercial-ShareAlike 3.0 Unported * License. To view a copy of this license, visit * http://creativecommons.org/licenses/by-nc-sa/3.0/ or send a letter to * Creative Commons,559 Nathan Abbott Way,Stanford,California 94305,USA. * * Alternatively, this program may be distributed and executed on * hardware which is licensed by Dream Multimedia GmbH. * * This program is NOT free software. It is open source, you are allowed * to modify it (if you keep the license), but it may not be commercially * distributed other than under the conditions noted above. * * gcc -Wall -g `pkg-config gstreamer-1.0 gstreamer-app-1.0 --cflags --libs` tsswitchtest.c -o tsswitchtest */ #include #include #include #include #include #include #include #include #define TS_PACKET_SIZE 188 #define MPEGTIME_TO_GSTTIME(time) (gst_util_uint64_scale ((time), GST_MSECOND/10, 9LL)) typedef struct { GMainLoop *loop; GstClockTime last_pcr; GstElement *appsrc; GstPipeline *pipeline; int fd; GMutex ts_mutex; } App; // static GstPadProbeReturn switch_to_testsrc (GstPad * pad, GstPadProbeInfo * info, gpointer user_data); static void switch_to_testsrc (GstElement * queue, gpointer user_data); static void appsrc_need_data (GstAppSrc *, guint, gpointer); GST_DEBUG_CATEGORY (tsswitchtest_debug); #define GST_CAT_DEFAULT tsswitchtest_debug int pcr_extract(unsigned char *pcr_raw, App *app) { unsigned long long pcr_base; pcr_base = pcr_raw[0]; pcr_base <<= 25; pcr_base |= pcr_raw[1] << 17; pcr_base |= pcr_raw[2] << 9; pcr_base |= pcr_raw[3] << 1; pcr_base |= (pcr_raw[4] & 0x80) >> 7; app->last_pcr = MPEGTIME_TO_GSTTIME(pcr_base); GST_INFO ("pcr_extract pcr_base=%lld gst time=%"GST_TIME_FORMAT"", pcr_base, GST_TIME_ARGS (app->last_pcr)); return 0; } int handle_af(unsigned char *af, App *app) { int len; int flags; int rp = 2; len = af[0]; if(len == 0) return 0; len += 1; //incl. len field flags = af[1]; if(flags & 0x10) { //PCR //printf("PCR!\n"); //out_hex(af, 20); pcr_extract(&af[rp], app); rp += 6; } //check for stuffing if(len != rp) { //printf("%d stuff bytes\n", len - rp); } return 0; } int handle_packet(unsigned char *ts, App *app) { int pid; int afc; if(ts[0] != 0x47) return -1; pid = ((ts[1] & 0x1f) << 8); pid|= ts[2]; afc = ((ts[3] & 0x30) >> 4); if(afc & 2) { if(handle_af(&ts[4], app)) return -1; } return 0; } static GstPadProbeReturn cb_have_data (GstPad *pad, GstPadProbeInfo *info, gpointer user_data) { App *app = user_data; GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info); GstMapInfo map; if (buffer) { gst_buffer_map (buffer, &map, GST_MAP_READ); int p = 0; while (map.data[p] != 0x47) p++; while (p < map.size-TS_PACKET_SIZE) { // GST_INFO ("p=%i", p); if(handle_packet(map.data+p, app)) { break; } p += TS_PACKET_SIZE; } gst_buffer_unmap (buffer, &map); } return GST_PAD_PROBE_OK; } static void appsrc_need_data (GstAppSrc *appsrc, guint suggested_size, gpointer user_data) { App *app = user_data; gsize off = 0; // suggested_size = (suggested_size/TS_PACKET_SIZE) * TS_PACKET_SIZE; char *data = malloc(suggested_size * sizeof(unsigned char)); if (!data) g_error("Couldn't allocate %lu bytes of memory for buffer", suggested_size * sizeof(unsigned char)); ssize_t upstream_r; upstream_r = read(app->fd, data, suggested_size); if (upstream_r < 0) g_error("Couldn't read payload! reason: %s", upstream_r==1 ? strerror(errno) : "read 0 (peer disconnect)"); if (upstream_r == 0) { g_signal_handlers_disconnect_by_func(app->appsrc, G_CALLBACK (appsrc_need_data), app); GstElement *queue = gst_bin_get_by_name_recurse_up (GST_BIN (app->pipeline), "ts_queue"); g_signal_connect (queue, "underrun", G_CALLBACK (switch_to_testsrc), app); GstElement *parser = gst_bin_get_by_name_recurse_up (GST_BIN (app->pipeline), "tsparse"); // GstPad *pad; pad = gst_element_get_static_pad (parser, "sink"); gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BUFFER, cb_have_data, app, NULL); gst_object_unref (pad); return; } GstBuffer *buffer = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, data, upstream_r, off, upstream_r-off, data, (GDestroyNotify) free); GstFlowReturn pret; pret = gst_app_src_push_buffer (appsrc, gst_buffer_ref(buffer)); GST_INFO ("pushing buffer readlength=%zd to appsink returned %s (suggest_size=%u) %" GST_PTR_FORMAT "", upstream_r, gst_flow_get_name (pret), suggested_size, buffer); gst_buffer_unref(buffer); return; } static void switch_to_testsrc (GstElement * queue, gpointer user_data) // static GstPadProbeReturn switch_to_testsrc (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) { App *app = user_data; g_mutex_lock (&app->ts_mutex); g_signal_handlers_disconnect_by_func(queue, G_CALLBACK (switch_to_testsrc), app); GstClock *clock = gst_element_get_clock (GST_ELEMENT(app->pipeline)); GstClockTime running_time = gst_clock_get_time (clock) - gst_element_get_base_time(app->appsrc); // GstElement *queue = gst_bin_get_by_name_recurse_up (GST_BIN (mediabin), "ts_queue"); gst_element_unlink (app->appsrc, queue); gst_bin_remove (GST_BIN (app->pipeline), app->appsrc); GstElement *vsrc = gst_element_factory_make ("videotestsrc", NULL); GstElement *vflt = gst_element_factory_make ("capsfilter", NULL); GstElement *venc = gst_element_factory_make ("x264enc", NULL); GstElement *mux = gst_element_factory_make ("mpegtsmux", NULL); GstElement *asrc = gst_element_factory_make ("audiotestsrc", NULL); GstElement *aflt = gst_element_factory_make ("capsfilter", NULL); GstElement *aenc = gst_element_factory_make ("faac", NULL); GstCaps *vcaps = gst_caps_new_simple ("video/x-raw", "format", G_TYPE_STRING, "YV12", "framerate", GST_TYPE_FRACTION, 25, 1, "width", G_TYPE_INT, 720, "height", G_TYPE_INT, 576, NULL); g_object_set(G_OBJECT(vflt), "caps", vcaps, NULL); GstCaps *acaps = gst_caps_new_simple ("audio/x-raw", "format", G_TYPE_STRING, "S16LE", "rate", G_TYPE_INT, 48000, "channels", G_TYPE_INT, 2, NULL); g_object_set(G_OBJECT(aflt), "caps", acaps, NULL); gst_bin_add_many (GST_BIN (app->pipeline), vsrc, vflt, venc, asrc, aflt, aenc, mux, NULL); int lret = gst_element_link_many (vsrc, vflt, venc, mux, NULL); GST_LOG ("gst_element_link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT " returned %i", vsrc, vflt, venc, mux, lret); lret = gst_element_link_many (asrc, aflt, aenc, mux, NULL); GST_LOG ("gst_element_link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT " returned %i", asrc, aflt, aenc, mux, lret); lret = gst_element_link (mux, queue); GST_LOG ("gst_element_link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT " returned %i", mux, queue, lret); GstPad *srcpad = gst_element_get_static_pad (mux, "src"); // gst_pad_set_offset (srcpad, G_GINT64_CONSTANT(0)-(app->last_pcr-running_time)); gst_object_unref (srcpad); g_object_set (G_OBJECT (vsrc), "do-timestamp", TRUE, NULL); // g_object_set (G_OBJECT (vsrc), "timestamp-offset", app->last_pcr, NULL); g_object_set (G_OBJECT (vsrc), "timestamp-offset", running_time, NULL); g_object_set (G_OBJECT (asrc), "do-timestamp", TRUE, NULL); // g_object_set (G_OBJECT (asrc), "timestamp-offset", app->last_pcr, NULL); g_object_set (G_OBJECT (asrc), "timestamp-offset", running_time, NULL); GST_INFO ("switch to videotestsrc running time=%"GST_TIME_FORMAT" last_pcr=%"GST_TIME_FORMAT"", GST_TIME_ARGS (running_time), GST_TIME_ARGS (app->last_pcr)); GST_INFO ("mux_pad_offset=%"PRId64" (^=-%"GST_TIME_FORMAT")", gst_pad_get_offset (srcpad), GST_TIME_ARGS (G_GINT64_CONSTANT(-1)*gst_pad_get_offset (srcpad))); GstStateChangeReturn sret; sret = gst_element_set_state (GST_ELEMENT(app->pipeline), GST_STATE_PLAYING); GST_LOG ("gst_element_set_state mux ret %i", sret); gst_element_set_state (app->appsrc, GST_STATE_NULL); gst_object_unref (app->appsrc); /* syslog(LOG_DEBUG, "inject_warning running time %"GST_TIME_FORMAT" (last valid=%"GST_TIME_FORMAT")", GST_TIME_ARGS (running_time), GST_TIME_ARGS (app->last_pcr)); GstClock *clock = gst_element_get_clock (app->appsrc); GstClockTime running_time = gst_clock_get_time (clock); int fd; if ((fd = open("upstreamdisconnected.ts", O_RDONLY)) != -1) { gsize rlen; while (TRUE) { char data[4096]; rlen = read(fd, data, sizeof(data)); GstBuffer *buffer = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, data, rlen, 0, rlen, NULL, NULL); GST_BUFFER_PTS(buffer) = app->last_pcr+G_GINT64_CONSTANT(40)*GST_MSECOND; GST_BUFFER_DTS(buffer) = app->last_pcr+G_GINT64_CONSTANT(40)*GST_MSECOND; GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT); GstFlowReturn pret; if (rlen <= 0) break; pret = gst_app_src_push_buffer (GST_APP_SRC(app->appsrc), gst_buffer_ref(buffer)); GST_LOG ("pushing disconnection warning buffer readlength=%zd to appsink returned %s %" GST_PTR_FORMAT "", rlen, gst_flow_get_name (pret), buffer); } while (rlen > 0); }*/ g_mutex_unlock (&app->ts_mutex); // return GST_PAD_PROBE_REMOVE; } int main (int argc, char *argv[]) { App *app = malloc(sizeof(App)); GError *error = NULL; gst_init (&argc, &argv); GST_DEBUG_CATEGORY_INIT (tsswitchtest_debug, "tsswitchtest", GST_DEBUG_BOLD | GST_DEBUG_FG_YELLOW | GST_DEBUG_BG_BLUE, "ts videotestsrc switching test"); if ((app->fd = open(argv[1], O_RDONLY)) == -1) g_error("can't open %s", argv[1]); GST_INFO ("opened %s fd=%i", argv[1], app->fd); app->loop = g_main_loop_new (NULL, FALSE); app->pipeline = (GstPipeline*) gst_parse_launch ("appsrc name=tsmediaappsrc do-timestamp=FALSE is-live=TRUE ! queue name=ts_queue ! tsparse name=tsparse set-timestamps=TRUE ! tsdemux name=d d. ! queue ! faad ! audioconvert ! alsasink d. ! queue ! h264parse ! avdec_h264 ! autovideosink", &error); if (!app->pipeline) { g_print ("Parse error: %s\n", error->message); exit (1); } app->appsrc = gst_bin_get_by_name_recurse_up (GST_BIN (app->pipeline), "tsmediaappsrc"); g_signal_connect (app->appsrc, "need-data", G_CALLBACK (appsrc_need_data), app); g_object_set (G_OBJECT (app->appsrc), "emit-signals", TRUE, NULL); gst_element_set_state (GST_ELEMENT(app->pipeline), GST_STATE_PLAYING); g_mutex_init (&app->ts_mutex); g_main_loop_run (app->loop); g_main_loop_unref (app->loop); g_mutex_clear (&app->ts_mutex); return 0; }