OSDN Git Service

add bootstrap step to libdca
[handbrake-jp/handbrake-jp-git.git] / libhb / work.c
index af4665b..c9d32de 100644 (file)
@@ -49,6 +49,24 @@ hb_thread_t * hb_work_init( hb_list_t * jobs, int cpu_count,
     return hb_thread_init( "work", work_func, work, HB_LOW_PRIORITY );
 }
 
+static void InitWorkState( hb_handle_t * h )
+{
+    hb_state_t state;
+
+    state.state = HB_STATE_WORKING;
+#define p state.param.working
+    p.progress  = 0.0;
+    p.rate_cur  = 0.0;
+    p.rate_avg  = 0.0;
+    p.hours     = -1;
+    p.minutes   = -1;
+    p.seconds   = -1; 
+#undef p
+
+    hb_set_state( h, &state );
+
+}
+
 /**
  * Iterates through job list and calls do_job for each job.
  * @param _work Handle work object.
@@ -65,6 +83,7 @@ static void work_func( void * _work )
         hb_list_rem( work->jobs, job );
         job->die = work->die;
         *(work->current_job) = job;
+        InitWorkState( job->h );
         do_job( job, work->cpu_count );
         *(work->current_job) = NULL;
     }
@@ -109,7 +128,8 @@ hb_work_object_t * hb_codec_encoder( int codec )
         case HB_ACODEC_FAAC:   return hb_get_work( WORK_ENCFAAC );
         case HB_ACODEC_LAME:   return hb_get_work( WORK_ENCLAME );
         case HB_ACODEC_VORBIS: return hb_get_work( WORK_ENCVORBIS );
-        case HB_ACODEC_CA_AAC:  return hb_get_work( WORK_ENC_CA_AAC );
+        case HB_ACODEC_CA_AAC: return hb_get_work( WORK_ENC_CA_AAC );
+        case HB_ACODEC_AC3:    return hb_get_work( WORK_ENCAC3 );
     }
     return NULL;
 }
@@ -286,12 +306,24 @@ void hb_display_job_info( hb_job_t * job )
 
         if( subtitle )
         {
-            hb_log( " * subtitle track %i, %s (id %x) %s [%s] -> %s ", subtitle->track, subtitle->lang, subtitle->id,
-                    subtitle->format == PICTURESUB ? "Picture" : "Text",
-                    subtitle->source == VOBSUB ? "VOBSUB" : 
-                    ((subtitle->source == CC608SUB ||
-                      subtitle->source == CC708SUB) ? "CC" : "SRT"),
-                    subtitle->config.dest == RENDERSUB ? "Render/Burn in" : "Pass-Through");
+            if( subtitle->source == SRTSUB )
+            {
+                /* For SRT, print offset and charset too */
+                hb_log( " * subtitle track %i, %s (id %x) %s [%s] -> %s%s, offset: %"PRId64", charset: %s",
+                        subtitle->track, subtitle->lang, subtitle->id, "Text", "SRT", "Pass-Through",
+                        subtitle->config.default_track ? ", Default" : "",
+                        subtitle->config.offset, subtitle->config.src_codeset );
+            }
+            else
+            {
+                hb_log( " * subtitle track %i, %s (id %x) %s [%s] -> %s%s%s", subtitle->track, subtitle->lang, subtitle->id,
+                        subtitle->format == PICTURESUB ? "Picture" : "Text",
+                        hb_subsource_name( subtitle->source ),
+                        job->indepth_scan ? "Foreign Audio Search" :
+                        subtitle->config.dest == RENDERSUB ? "Render/Burn in" : "Pass-Through",
+                        subtitle->config.force ? ", Forced Only" : "",
+                        subtitle->config.default_track ? ", Default" : "" );
+            }
         }
     }
 
@@ -313,7 +345,7 @@ void hb_display_job_info( hb_job_t * job )
                 hb_log( "     + bitrate: %d kbps, samplerate: %d Hz", audio->config.in.bitrate / 1000, audio->config.in.samplerate );
             }
 
-            if( (audio->config.out.codec != HB_ACODEC_AC3) && (audio->config.out.codec != HB_ACODEC_DCA) )
+            if( (audio->config.out.codec != HB_ACODEC_AC3_PASS) && (audio->config.out.codec != HB_ACODEC_DCA_PASS) )
             {
                 for (j = 0; j < hb_audio_mixdowns_count; j++)
                 {
@@ -324,22 +356,24 @@ void hb_display_job_info( hb_job_t * job )
                 }
             }
 
-            if ( audio->config.out.dynamic_range_compression && (audio->config.out.codec != HB_ACODEC_AC3) && (audio->config.out.codec != HB_ACODEC_DCA))
+            if ( audio->config.out.dynamic_range_compression && (audio->config.out.codec != HB_ACODEC_AC3_PASS) && (audio->config.out.codec != HB_ACODEC_DCA_PASS))
             {
                 hb_log("   + dynamic range compression: %f", audio->config.out.dynamic_range_compression);
             }
             
-            if( (audio->config.out.codec == HB_ACODEC_AC3) || (audio->config.out.codec == HB_ACODEC_DCA) )
+            if( (audio->config.out.codec == HB_ACODEC_AC3_PASS) || (audio->config.out.codec == HB_ACODEC_DCA_PASS) )
             {
-                hb_log( "   + %s passthrough", (audio->config.out.codec == HB_ACODEC_AC3) ?
+                hb_log( "   + %s passthrough", (audio->config.out.codec == HB_ACODEC_AC3_PASS) ?
                     "AC3" : "DCA" );
             }
             else
             {
-                hb_log( "   + encoder: %s", ( audio->config.out.codec == HB_ACODEC_FAAC ) ?
-                    "faac" : ( ( audio->config.out.codec == HB_ACODEC_LAME ) ?
-                    "lame" : ( ( audio->config.out.codec == HB_ACODEC_CA_AAC ) ?
-                              "ca_aac" : "vorbis"  ) ) );
+                hb_log( "   + encoder: %s", 
+                    ( audio->config.out.codec == HB_ACODEC_FAAC ) ?  "faac" : 
+                    ( ( audio->config.out.codec == HB_ACODEC_LAME ) ?  "lame" : 
+                    ( ( audio->config.out.codec == HB_ACODEC_CA_AAC ) ?  "ca_aac" : 
+                    ( ( audio->config.out.codec == HB_ACODEC_AC3 ) ?  "ffac3" : 
+                    "vorbis"  ) ) ) );
                 hb_log( "     + bitrate: %d kbps, samplerate: %d Hz", audio->config.out.bitrate, audio->config.out.samplerate );            
             }
         }
@@ -381,11 +415,13 @@ static void do_job( hb_job_t * job, int cpu_count )
     hb_title_t    * title;
     int             i, j;
     hb_work_object_t * w;
+    hb_work_object_t * sync;
     hb_work_object_t * muxer;
     hb_interjob_t * interjob;
 
     hb_audio_t   * audio;
     hb_subtitle_t * subtitle;
+    hb_attachment_t * attachment;
     unsigned int subtitle_highest = 0;
     unsigned int subtitle_highest_id = 0;
     unsigned int subtitle_lowest = -1;
@@ -480,8 +516,8 @@ static void do_job( hb_job_t * job, int cpu_count )
     for( i = 0; i < hb_list_count( title->list_audio ); )
     {
         audio = hb_list_item( title->list_audio, i );
-        if( ( ( audio->config.out.codec == HB_ACODEC_AC3 ) && ( audio->config.in.codec != HB_ACODEC_AC3 ) ) ||
-            ( ( audio->config.out.codec == HB_ACODEC_DCA ) && ( audio->config.in.codec != HB_ACODEC_DCA ) ) )
+        if( ( ( audio->config.out.codec == HB_ACODEC_AC3_PASS ) && ( audio->config.in.codec != HB_ACODEC_AC3 ) ) ||
+            ( ( audio->config.out.codec == HB_ACODEC_DCA_PASS ) && ( audio->config.in.codec != HB_ACODEC_DCA ) ) )
         {
             hb_log( "Passthru requested and input codec is not the same as output codec for track %d",
                     audio->config.out.track );
@@ -489,6 +525,21 @@ static void do_job( hb_job_t * job, int cpu_count )
             free( audio );
             continue;
         }
+        if( audio->config.out.codec != HB_ACODEC_AC3_PASS && 
+            audio->config.out.codec != HB_ACODEC_DCA_PASS &&
+            audio->config.out.samplerate > 48000 )
+        {
+            hb_log( "Sample rate %d not supported.  Down-sampling to 48kHz.",
+                    audio->config.out.samplerate );
+            audio->config.out.samplerate = 48000;
+        }
+        if( audio->config.out.codec == HB_ACODEC_AC3 && 
+            audio->config.out.bitrate > 640 )
+        {
+            hb_log( "Bitrate %d not supported.  Reducing to 640Kbps.",
+                    audio->config.out.bitrate );
+            audio->config.out.bitrate = 640;
+        }
         if ( audio->config.in.codec == HB_ACODEC_FFMPEG )
         {
             if ( aud_id_uses[audio->id] )
@@ -508,123 +559,64 @@ static void do_job( hb_job_t * job, int cpu_count )
     }
 
     int requested_mixdown = 0;
+    int best_mixdown = 0;
     int requested_mixdown_index = 0;
 
     for( i = 0; i < hb_list_count( title->list_audio ); i++ )
     {
         audio = hb_list_item( title->list_audio, i );
 
-        if( audio->config.out.codec != audio->config.in.codec )
-        {
-            /* sense-check the current mixdown options */
+        best_mixdown = hb_get_best_mixdown( audio->config.out.codec,
+                                            audio->config.in.channel_layout );
 
-            /* log the requested mixdown */
-            for (j = 0; j < hb_audio_mixdowns_count; j++) {
-                if (hb_audio_mixdowns[j].amixdown == audio->config.out.mixdown) {
-                    requested_mixdown = audio->config.out.mixdown;
-                    requested_mixdown_index = j;
-                }
-            }
-
-            /* sense-check the requested mixdown */
+        /* sense-check the current mixdown options */
 
-            if( audio->config.out.mixdown == 0 &&
-                audio->config.out.codec != HB_ACODEC_AC3 && 
-                audio->config.out.codec != HB_ACODEC_DCA )
+        /* sense-check the requested mixdown */
+        if( audio->config.out.mixdown == 0 &&
+            audio->config.out.codec != HB_ACODEC_AC3_PASS && 
+            audio->config.out.codec != HB_ACODEC_DCA_PASS )
+        {
+            /*
+             * Mixdown wasn't specified and this is not pass-through,
+             * set a default mixdown
+             */
+            audio->config.out.mixdown = best_mixdown;
+            for (j = 0; j < hb_audio_mixdowns_count; j++)
             {
-                /*
-                 * Mixdown wasn't specified and this is not pass-through,
-                 * set a default mixdown of stereo.
-                 */
-                audio->config.out.mixdown = HB_AMIXDOWN_STEREO;
+                if (hb_audio_mixdowns[j].amixdown == audio->config.out.mixdown)
+                {
+                    hb_log("work: mixdown not specified, track %i setting mixdown %s", i, hb_audio_mixdowns[j].human_readable_name);
+                    break;
+                }
             }
+        }
 
-            // Here we try to sanitize the audio input to output mapping.
-            // Constraints are:
-            //   1. only the AC3 & DCA decoder libraries currently support mixdown
-            //   2. the lame encoder library only supports stereo.
-            // So if the encoder is lame we need the output to be stereo (or multichannel
-            // matrixed into stereo like dpl). If the decoder is not AC3 or DCA the
-            // encoder has to handle the input format since we can't do a mixdown.
-#define STEREO_ONLY(a) ( a->config.out.codec & HB_ACODEC_LAME )
+        /* log the requested mixdown */
+        for (j = 0; j < hb_audio_mixdowns_count; j++) {
+            if (hb_audio_mixdowns[j].amixdown == audio->config.out.mixdown) {
+                requested_mixdown = audio->config.out.mixdown;
+                requested_mixdown_index = j;
+                break;
+            }
+        }
 
-            switch (audio->config.in.channel_layout & HB_INPUT_CH_LAYOUT_DISCRETE_NO_LFE_MASK)
+        if ( !( audio->config.out.codec & HB_ACODEC_PASS_FLAG ) )
+        {
+            if ( audio->config.out.mixdown > best_mixdown )
             {
-                // stereo input or something not handled below
-                default:
-                case HB_INPUT_CH_LAYOUT_STEREO:
-                    // mono gets mixed up to stereo & more than stereo gets mixed down
-                    if ( STEREO_ONLY( audio ) ||
-                         audio->config.out.mixdown > HB_AMIXDOWN_STEREO)
-                    {
-                        audio->config.out.mixdown = HB_AMIXDOWN_STEREO;
-                    }
-                    break;
-
-                // mono input
-                case HB_INPUT_CH_LAYOUT_MONO:
-                    if ( STEREO_ONLY( audio ) )
-                    {
-                        audio->config.out.mixdown = HB_AMIXDOWN_STEREO;
-                    }
-                    else
-                    {
-                        // everything else passes through
-                        audio->config.out.mixdown = HB_AMIXDOWN_MONO;
-                    }
-                    break;
-
-                // dolby (DPL1 aka Dolby Surround = 4.0 matrix-encoded) input
-                // the A52 flags don't allow for a way to distinguish between DPL1 and
-                // DPL2 on a DVD so we always assume a DPL1 source for A52_DOLBY.
-                case HB_INPUT_CH_LAYOUT_DOLBY:
-                    if ( STEREO_ONLY( audio ) ||
-                         audio->config.out.mixdown > HB_AMIXDOWN_DOLBY )
-                    {
-                        audio->config.out.mixdown = HB_AMIXDOWN_DOLBY;
-                    }
-                    break;
-
-                // 4 channel discrete
-                case HB_INPUT_CH_LAYOUT_2F2R:
-                case HB_INPUT_CH_LAYOUT_3F1R:
-                    if ( STEREO_ONLY( audio ) ||
-                         audio->config.out.mixdown > HB_AMIXDOWN_DOLBY )
-                    {
-                        audio->config.out.mixdown = HB_AMIXDOWN_DOLBY;
-                    }
-                    break;
-
-                // 5 or 6 channel discrete
-                case HB_INPUT_CH_LAYOUT_3F2R:
-                    if ( STEREO_ONLY( audio ) )
-                    {
-                        if ( audio->config.out.mixdown < HB_AMIXDOWN_STEREO )
-                        {
-                            audio->config.out.mixdown = HB_AMIXDOWN_STEREO;
-                        }
-                        else if ( audio->config.out.mixdown > HB_AMIXDOWN_DOLBYPLII )
-                        {
-                            audio->config.out.mixdown = HB_AMIXDOWN_DOLBYPLII;
-                        }
-                    }
-                    else if ( ! ( audio->config.in.channel_layout &
-                                    HB_INPUT_CH_LAYOUT_HAS_LFE ) )
-                    {
-                        // we don't do 5 channel discrete so mixdown to DPLII
-                        audio->config.out.mixdown = HB_AMIXDOWN_DOLBYPLII;
-                    }
-                    break;
+                audio->config.out.mixdown = best_mixdown;
             }
+        }
 
+        if ( audio->config.out.mixdown != requested_mixdown )
+        {
             /* log the output mixdown */
-            for (j = 0; j < hb_audio_mixdowns_count; j++) {
-                if (hb_audio_mixdowns[j].amixdown == audio->config.out.mixdown) {
-                    if ( audio->config.out.mixdown != requested_mixdown )
-                    {
-                        hb_log("work: sanitizing track %i mixdown %s to %s", i, hb_audio_mixdowns[requested_mixdown_index].human_readable_name, hb_audio_mixdowns[j].human_readable_name);
-                    }
-                    break;
+            for (j = 0; j < hb_audio_mixdowns_count; j++)
+            {
+                if (hb_audio_mixdowns[j].amixdown == audio->config.out.mixdown)
+                {
+                    hb_log("work: sanitizing track %i mixdown %s to %s", i, hb_audio_mixdowns[requested_mixdown_index].human_readable_name, hb_audio_mixdowns[j].human_readable_name);
+                break;
                 }
             }
         }
@@ -641,15 +633,17 @@ static void do_job( hb_job_t * job, int cpu_count )
 
     }
     /* Synchronization */
-    if( hb_sync_init( job ) )
-    {
-        hb_error( "Failure to initialise sync" );
-        *job->die = 1;
-        goto cleanup;
-    }
+    sync = hb_sync_init( job );
 
     /* Video decoder */
     int vcodec = title->video_codec? title->video_codec : WORK_DECMPEG2;
+#if defined(USE_FF_MPEG2)
+    if (vcodec == WORK_DECMPEG2)
+    {
+        vcodec = WORK_DECAVCODECV;
+        title->video_codec_param = CODEC_ID_MPEG2VIDEO;
+    }
+#endif
     hb_list_add( job->list_work, ( w = hb_get_work( vcodec ) ) );
     w->codec_param = title->video_codec_param;
     w->fifo_in  = job->fifo_mpeg2;
@@ -658,7 +652,10 @@ static void do_job( hb_job_t * job, int cpu_count )
     /* Video renderer */
     hb_list_add( job->list_work, ( w = hb_get_work( WORK_RENDER ) ) );
     w->fifo_in  = job->fifo_sync;
-    w->fifo_out = job->fifo_render;
+    if( !job->indepth_scan )
+        w->fifo_out = job->fifo_render;
+    else
+        w->fifo_out = NULL;
 
     if( !job->indepth_scan )
     {
@@ -685,6 +682,7 @@ static void do_job( hb_job_t * job, int cpu_count )
 
     /*
      * Look for the scanned subtitle in the existing subtitle list
+     * select_subtitle implies that we did a scan.
      */
     if ( !job->indepth_scan && interjob->select_subtitle &&
          ( job->pass == 0 || job->pass == 2 ) )
@@ -692,8 +690,6 @@ static void do_job( hb_job_t * job, int cpu_count )
         /*
          * Disable forced subtitles if we didn't find any in the scan
          * so that we display normal subtitles instead.
-         *
-         * select_subtitle implies that we did a scan.
          */
         if( interjob->select_subtitle->config.force && 
             interjob->select_subtitle->forced_hits == 0 )
@@ -707,12 +703,14 @@ static void do_job( hb_job_t * job, int cpu_count )
             if( subtitle )
             {
                 /*
-                * Disable forced subtitles if we didn't find any in the scan
-                * so that we display normal subtitles instead.
-                *
-                * select_subtitle implies that we did a scan.
+                * Remove the scanned subtitle from the subtitle list if
+                * it would result in an identical duplicate subtitle track
+                * or an emty track (forced and no forced hits).
                 */
-                if( interjob->select_subtitle->id == subtitle->id )
+                if( ( interjob->select_subtitle->id == subtitle->id ) &&
+                    ( ( interjob->select_subtitle->forced_hits == 0 &&
+                        subtitle->config.force ) ||
+                    ( subtitle->config.force == interjob->select_subtitle->config.force ) ) )
                 {
                     *subtitle = *(interjob->select_subtitle);
                     free( interjob->select_subtitle );
@@ -777,6 +775,31 @@ static void do_job( hb_job_t * job, int cpu_count )
                 w->subtitle = subtitle;
                 hb_list_add( job->list_work, w );
             }
+            
+            if( !job->indepth_scan && subtitle->source == UTF8SUB )
+            {
+                w = hb_get_work( WORK_DECUTF8SUB );
+                w->fifo_in  = subtitle->fifo_in;
+                w->fifo_out = subtitle->fifo_raw;
+                hb_list_add( job->list_work, w );
+            }
+            
+            if( !job->indepth_scan && subtitle->source == TX3GSUB )
+            {
+                w = hb_get_work( WORK_DECTX3GSUB );
+                w->fifo_in  = subtitle->fifo_in;
+                w->fifo_out = subtitle->fifo_raw;
+                hb_list_add( job->list_work, w );
+            }
+            
+            if( !job->indepth_scan && subtitle->source == SSASUB )
+            {
+                w = hb_get_work( WORK_DECSSASUB );
+                w->fifo_in  = subtitle->fifo_in;
+                w->fifo_out = subtitle->fifo_raw;
+                w->subtitle = subtitle;
+                hb_list_add( job->list_work, w );
+            }
 
             if( !job->indepth_scan && 
                 subtitle->format == PICTURESUB
@@ -821,8 +844,8 @@ static void do_job( hb_job_t * job, int cpu_count )
             /*
             * Audio Encoder Thread
             */
-            if( audio->config.out.codec != HB_ACODEC_AC3 &&
-                audio->config.out.codec != HB_ACODEC_DCA )
+            if( audio->config.out.codec != HB_ACODEC_AC3_PASS &&
+                audio->config.out.codec != HB_ACODEC_DCA_PASS )
             {
                 /*
                 * Add the encoder thread if not doing AC-3 pass through
@@ -860,7 +883,7 @@ static void do_job( hb_job_t * job, int cpu_count )
     job->done = 0;
 
     /* Launch processing threads */
-    for( i = 1; i < hb_list_count( job->list_work ); i++ )
+    for( i = 0; i < hb_list_count( job->list_work ); i++ )
     {
         w = hb_list_item( job->list_work, i );
         w->done = &job->done;
@@ -875,12 +898,32 @@ static void do_job( hb_job_t * job, int cpu_count )
                                     HB_LOW_PRIORITY );
     }
 
-    // The muxer requires track information that's set up by the encoder
-    // init routines so we have to init the muxer last.
-    muxer = job->indepth_scan? NULL : hb_muxer_init( job );
+    if ( job->indepth_scan )
+    {
+        muxer = NULL;
+        w = sync;
+        sync->done = &job->done;
+    }
+    else
+    {
+        sync->done = &job->done;
+        sync->thread_sleep_interval = 10;
+        if( sync->init( w, job ) )
+        {
+            hb_error( "Failure to initialise thread '%s'", w->name );
+            *job->die = 1;
+            goto cleanup;
+        }
+        sync->thread = hb_thread_init( sync->name, work_loop, sync,
+                                    HB_LOW_PRIORITY );
+
+        // The muxer requires track information that's set up by the encoder
+        // init routines so we have to init the muxer last.
+        muxer = hb_muxer_init( job );
+        w = muxer;
+    }
 
-    w = hb_list_item( job->list_work, 0 );
-    while( !*job->die && w->status != HB_WORK_DONE )
+    while ( !*job->die && !*w->done && w->status != HB_WORK_DONE )
     {
         hb_buffer_t      * buf_in, * buf_out;
 
@@ -896,12 +939,17 @@ static void do_job( hb_job_t * job, int cpu_count )
             break;
         }
 
+        buf_out = NULL;
         w->status = w->work( w, &buf_in, &buf_out );
 
         if( buf_in )
         {
             hb_buffer_close( &buf_in );
         }
+        if ( buf_out && w->fifo_out == NULL )
+        {
+            hb_buffer_close( &buf_out );
+        }
         if( buf_out )
         {
             while ( !*job->die )
@@ -914,9 +962,6 @@ static void do_job( hb_job_t * job, int cpu_count )
             }
         }
     }
-    hb_list_rem( job->list_work, w );
-    w->close( w );
-    free( w );
 
     hb_handle_t * h = job->h;
     hb_state_t state;
@@ -924,14 +969,22 @@ static void do_job( hb_job_t * job, int cpu_count )
     
     hb_log("work: average encoding speed for job is %f fps", state.param.working.rate_avg);
 
-cleanup:
-    /* Stop the write thread (thread_close will block until the muxer finishes) */
+    job->done = 1;
     if( muxer != NULL )
     {
-        hb_thread_close( &muxer->thread );
         muxer->close( muxer );
+        free( muxer );
+
+        if( sync->thread != NULL )
+        {
+            hb_thread_close( &sync->thread );
+            sync->close( sync );
+        }
+        free( sync );
     }
 
+cleanup:
+    /* Stop the write thread (thread_close will block until the muxer finishes) */
     job->done = 1;
 
     /* Close work objects */
@@ -1107,6 +1160,9 @@ static void work_loop( void * _w )
             break;
         }
 
+        // Invalidate buf_out so that if there is no output
+        // we don't try to pass along junk.
+        buf_out = NULL;
         w->status = w->work( w, &buf_in, &buf_out );
 
         // Propagate any chapter breaks for the worker if and only if the
@@ -1117,7 +1173,7 @@ static void work_loop( void * _w )
         if( buf_in && buf_out && buf_in->new_chap && buf_in->start == buf_out->start)
         {
             // restore log below to debug chapter mark propagation problems
-            //hb_log("work %s: Copying Chapter Break @ %lld", w->name, buf_in->start);
+            //hb_log("work %s: Copying Chapter Break @ %"PRId64, w->name, buf_in->start);
             buf_out->new_chap = buf_in->new_chap;
         }
 
@@ -1125,6 +1181,10 @@ static void work_loop( void * _w )
         {
             hb_buffer_close( &buf_in );
         }
+        if ( buf_out && w->fifo_out == NULL )
+        {
+            hb_buffer_close( &buf_out );
+        }
         if( buf_out )
         {
             while ( !*w->done )
@@ -1137,4 +1197,12 @@ static void work_loop( void * _w )
             }
         }
     }
+    // Consume data in incoming fifo till job complete so that
+    // residual data does not stall the pipeline
+    while( !*w->done )
+    {
+        buf_in = hb_fifo_get_wait( w->fifo_in );
+        if ( buf_in != NULL )
+            hb_buffer_close( &buf_in );
+    }
 }