package JobQueue;

# A disk-based work queue.
#
# See http://paperlined.org/perl/job_queue/
#
# TODO:
#   - Don't remove a message from the disk queue until AFTER its job has
#     finished processing, so that it survives a Ctrl-C or a machine reboot.
#     (Currently _dequeue_obj() calls $job->finish() before the handler runs.)
#
#   - We also need some concept of "transactions", so that if the process is
#     killed halfway through processing a job and that job is started up
#     again, the messages the job initially generated aren't duplicated.
#
#     As I see it, the easiest way to implement this is to NOT have job
#     handlers call add_job() directly. Instead, they should queue up their
#     messages in their own internal list and, when they return, RETURN THAT
#     LIST. This fits in neatly with the idea that handlers are purely
#     functional.
#
#     To accomplish this we need an analogue of add_job() that, instead of
#     adding a message to the queue, returns it BACK to the calling function,
#     which simply adds it to its list of messages to be returned at the end.
#
#   - Is there a way to implement "continuations" (persistable coroutines)
#     within a job handler? Even in purely functional programs, some
#     functions "stay alive" for the entire span of execution, because the
#     results need to be collected near the end to produce summary
#     statistics.
#
#     It would be nice if a job handler could say "I'm done processing for
#     now; here's a bunch of new messages that need to be processed before I
#     can progress further", and thereby put itself back in the disk queue.
#
#     See also:
#     http://search.cpan.org/~sartak/Jifty-0.71129/lib/Jifty/Manual/Continuations.pod

use strict;
use warnings;

use IPC::DirQueue;
use Storable;

# Constructor.
#
#   $queue_directory   Directory handed to IPC::DirQueue as its 'dir'.
#
# May also be called on an existing instance ($obj->new(...)), in which case
# the new object is blessed into that instance's class.
sub new {
    my $proto = shift;
    my $class = ref($proto) || $proto;
    my ($queue_directory) = @_;

    my $self = {
        queue => IPC::DirQueue->new({ dir => $queue_directory }),
    };
    return bless $self, $class;
}

# Main public entry point for adding a message to the queue.
#
#   job_name     A string; must be usable as part of a Perl subroutine name.
#                (If the job name is "Spider_Page", process_job() will call
#                main::JOBHANDLER_Spider_Page.)
#   job_details  ANY data needed to specify the task that needs to be done.
#                It is serialized with Storable, so it must be something
#                Storable can serialize (plain hashes/arrays/scalars). It is
#                passed as the first argument to the job handler.
#
# Returns whatever IPC::DirQueue's enqueue_string() returns.
sub add_job {
    my ($self, $job_name, $job_details) = @_;

    return $self->_enqueue_obj({
        job_name    => $job_name,
        job_details => $job_details,
    });
}

# Fetch one job from the queue and call that job's handler, passing the job's
# job_details as the handler's first argument.
#
# Returns undef (empty list in list context) if the queue was empty, or 1 if
# a job was processed. Dies if no main::JOBHANDLER_<job_name> sub exists.
# Note that by then the job has already been removed from the queue (see the
# TODO at the top of this file).
sub process_job {
    my ($self) = @_;

    my $obj = $self->_dequeue_obj();
    if (!$obj) {
        print "Job queue is empty.\n";
        return;
    }

    my $handler = _does_sub_exist("", "JOBHANDLER_" . $obj->{job_name})
        or die "Unable to process job name $obj->{job_name}\n\t";

    print "Processing job '$obj->{job_name}'.\n";
    $handler->($obj->{job_details});
    return 1;
}

# Serialize a hashref/listref/whatever and add it to the disk-based queue.
#
# nfreeze (network byte order) is used instead of freeze so that entries can
# be read by machines of a different architecture sharing the queue
# directory; Storable::thaw reads both formats, so entries written by older
# versions of this module are still readable.
sub _enqueue_obj {
    my ($self, $obj_ref) = @_;
    return $self->{queue}->enqueue_string(Storable::nfreeze($obj_ref));
}

# Take the next job from the disk-based queue and return its deserialized
# object reference, or return nothing if the queue is empty.
#
# The job is finish()ed right after its data is read -- i.e. BEFORE the caller
# processes it.
sub _dequeue_obj {
    my ($self) = @_;

    my $job = $self->{queue}->pickup_queued_job()
        or return;

    my $obj_ref = Storable::thaw($job->get_data());
    $job->finish();
    return $obj_ref;
}

# Check whether the subroutine $sub_name is defined in package $package_name
# ("" or undef means the top-level package, main). Returns a reference to the
# subroutine if it is defined, or nothing otherwise.
#
# Replaces the old symbol-table walk, whose defined(%stash) is a fatal error
# since Perl 5.22.
sub _does_sub_exist {
    my ($package_name, $sub_name) = @_;

    # Only a plain identifier can name a sub directly inside the package.
    # Rejecting anything else also keeps "::" or "'" in $sub_name from
    # redirecting the lookup into some other package.
    return unless defined $sub_name && $sub_name =~ /\A\w+\z/;

    $package_name = 'main'
        unless defined $package_name && length $package_name;

    my $full_name = "${package_name}::${sub_name}";

    no strict 'refs';
    # A merely declared (body-less) sub, or a same-named variable, is not
    # "defined" here, so only real subroutines are returned.
    return unless defined &{$full_name};
    return \&{$full_name};
}

1;