Skip to content

Commit

Permalink
Import code from Google Code.
Browse files Browse the repository at this point in the history
  • Loading branch information
themel committed May 18, 2015
0 parents commit bbb5a04
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 0 deletions.
6 changes: 6 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
1.00 (2009-04-26)

* first release. works, including batching.



7 changes: 7 additions & 0 deletions MANIFEST
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CHANGES
demo-worker.pl
demo-insert-job.pl
lib/TheSchwartz/Worker/PubSubHubbubPublish.pm
Makefile.PL
MANIFEST This list of files
t/00-use.t
13 changes: 13 additions & 0 deletions Makefile.PL
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use ExtUtils::MakeMaker;
WriteMakefile( 'NAME' => 'TheSchwartz::Worker::PubSubHubbubPublish',
'VERSION_FROM' => 'lib/TheSchwartz/Worker/PubSubHubbubPublish.pm',
'PREREQ_PM' => {
'TheSchwartz::Worker' => 0,
'Net::PubSubHubbub::Publisher' => 0.91,
},
ABSTRACT_FROM => 'lib/TheSchwartz/Worker/PubSubHubbubPublish.pm',
AUTHOR => 'Brad Fitzpatrick <[email protected]>',
);



2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
A perl module for the TheSchwartz job queue system that implements the
publishing ("pinging") part of the PubSubHubbub protocol.
34 changes: 34 additions & 0 deletions demo-insert-job.pl
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/usr/bin/perl

use strict;
use TheSchwartz;
use Getopt::Long;

my $n_jobs = 1;
my $hub_url = "http://pubsubhubbub.appspot.com/";

GetOptions("n=i" => \$n_jobs,
"hub=s" => \$hub_url,
) or die "Unknown options.";

my $client = TheSchwartz->new(databases => [{
user => "root",
dsn => "dbi:mysql:theschwartz",
}]);

for (1..$n_jobs) {
my $topic = "http://publisher.example.com/topic/" . rand() . ".atom";
print "Submitting dummy topic $topic ...\n";

my $job = TheSchwartz::Job->new(funcname => 'TheSchwartz::Worker::PubSubHubbubPublish',
arg => {
hub => $hub_url,
topic_url => $topic,
},
coalesce => $hub_url,
);

my $handle = $client->insert($job);
print " job handle: $handle\n";
}

14 changes: 14 additions & 0 deletions demo-worker.pl
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/perl

use strict;
use lib 'lib';
use TheSchwartz;
use TheSchwartz::Worker::PubSubHubbubPublish;

my $client = TheSchwartz->new(databases => [{
user => "root",
dsn => "dbi:mysql:theschwartz",
}]);

$client->can_do("TheSchwartz::Worker::PubSubHubbubPublish");
$client->work;
132 changes: 132 additions & 0 deletions lib/TheSchwartz/Worker/PubSubHubbubPublish.pm
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
=head1 NAME
TheSchwartz::Worker::PubSubHubbubPublish - ping pubsubhubbub hub servers
=head1 SYNOPSIS
use TheSchwartz;
use TheSchwartz::Worker::PubSubHubbubPublish;
my $sclient = TheSchwartz->new(databases => \@Conf::YOUR_DBS);
$sclient->can_do("TheSchwartz::Worker::PubSubHubbubPublish");
$sclient->work; # main loop of program; goes forever, pinging as needed
=head1 DESCRIPTION
This is a worker class for sending pings to PubSubHubbub hub servers.
See L<TheSchwartz> and L<Net::PubSubHubbub::Publisher> for more
information.
=head1 JOB ARGUMENTS
When constructing a job using L<TheSchwartz>'s insert_job
method, construct your L<TheSchwartz::Job> instance with its
'argument' of the following form:
{
hub => $hub_url, # the hub's endpoint URL
topic_url => $url, # Atom URL that was updated
}
Also, if you set your L<TheSchwartz::Job>'s C<coalesce> property to be
the hub URL, this worker will do batch pings instead, vastly reducing
the number of HTTP requests it does.
=cut

package TheSchwartz::Worker::PubSubHubbubPublish;
use strict;
use base 'TheSchwartz::Worker';
use Storable;
use Net::PubSubHubbub::Publisher 0.91;

our $VERSION = '1.00';

our $MAX_BATCH_SIZE = 50;

my $keep_exit_status_for = 0;
sub set_keep_exit_status_for { $keep_exit_status_for = shift; }

my %publisher; # $hub -> Net::PubSubHubbub::Publisher

sub work {
my ($class, $job) = @_;
my $client = $job->handle->client;
my $hub = $job->arg->{hub};
unless ($hub && $hub =~ m!^https?://\S+$!) {
$job->permanent_failure("Bogus hub $hub. Ignoring job.");
return;
}

my @jobs;
my @topics;

my $add_job = sub {
my $j = shift;
my $args = $j->arg;
unless ($args->{hub} eq $hub) {
# Each job must share the same hub.
warn "WARNING: coalesced job had different hub in its args. Skipping.";
return;
}

push @jobs, $j;
push @topics, $args->{topic_url};
};
$add_job->($job);

my $publisher = $publisher{$hub} ||=
Net::PubSubHubbub::Publisher->new(hub => $hub);

while (@topics < $MAX_BATCH_SIZE) {
my $j = $client->find_job_with_coalescing_value(__PACKAGE__, $hub);
last unless $j;
$add_job->($j);
}

if ($publisher->publish_update(@topics)) {
warn "Pinged $hub about topic(s): @topics.\n";
foreach my $j (@jobs) {
$j->completed;
}
return;
}

my $failure_reason = $publisher->last_response->status_line;
warn "Failed to ping $hub about @topics: $failure_reason\n";
$job->failed($failure_reason);
}

sub keep_exit_status_for {
return 0 unless $keep_exit_status_for;
return $keep_exit_status_for->() if ref $keep_exit_status_for eq "CODE";
return $keep_exit_status_for;
}

sub grab_for { 30 }
sub max_retries { 10 }
sub retry_delay {
my ($class, $fails) = @_;
return 30 * $fails;
}

=head1 AUTHOR
Brad Fitzpatrick -- [email protected]
=head1 COPYRIGHT, LICENSE, and WARRANTY
Copyright 2009, Brad Fitzpatrick.
License to use under the same terms as Perl itself.
This software comes with no warranty of any kind.
=head1 SEE ALSO
L<TheSchwartz>
L<http://code.google.com/p/pubsubhubbub/>
=cut

1;
5 changes: 5 additions & 0 deletions t/00-use.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/usr/bin/perl
use Test::More tests => 1;
BEGIN {
use_ok( 'TheSchwartz::Worker::PubSubHubbubPublish' );
}

0 comments on commit bbb5a04

Please sign in to comment.