sungo is a professional perl programmer, the author of several CPAN modules and a POE committer. He may be reached at [email protected] or on irc.perl.org #perl.
Introduction
Enterprise Perl is a phrase that is thrown around a lot in the Perl community these days. Perl is so flexible that corporations must come to terms with how much of Perl they're going to allow in their development environment. Most of these discussions, however, assume that Perl is only a good choice for automation tasks (single-run applications) or WWW applications. Perl simply can't handle the amount of data, transactions, users, etc. that other languages can. One would never implement high-performance, long-running applications in Perl. (Or so the thinking goes.)
In the mid 90s, these thoughts were more or less true. Since then, Perl has grown up. Perl is now capable of handling all but the most speed-thirsty applications and is a prime choice for server-based application design. Several frameworks now exist to make these applications easier to build and easier to maintain.
My framework of choice is the Perl Object Environment, or POE. POE is a single-threaded, event-driven, cooperative multitasking environment for Perl. Basically, POE is an application framework in which a single-threaded perl process waits for events to occur so it can act accordingly. This event loop comprises the core of a POE process.
If all POE offered was an event loop, there would not be much to talk about, though. Nor would POE be particularly special. Several event loop modules already exist on CPAN. Event, Coro, IO::Events, and IO::Poll all offer similar functionality. However, any worthwhile application demands more than a simple set of actions.
The Lingo
As an application framework, POE offers several abstractions to make design and implementation easier. It is important to understand POE's unique lingo to use it properly.
Kernel
The Kernel is the core of a POE system, living in the POE::Kernel namespace. It is analogous to the kernel of an operating system (hence the name) and it handles all event queuing and dispatching and all manner of other low-level functionality. Nearly three-quarters of the POE API is utilized by operating on the kernel. As such, the kernel object (which is created for you behind the scenes) is always available via the global variable $poe_kernel. The POE system is initialized and launched using the run() method on the kernel. This method blocks and will only return when POE has completely shut down.
Sessions
A Session is the fundamental unit of the POE environment and lives in the POE::Session namespace. It is essentially a worker class and embodies a series of states and events. Sessions are slightly analogous to threads in that they have a unique runtime context and a semi-private data store (called the "heap"). Each session operates independently from other sessions, receiving time-slices from the POE kernel. It is important to remember that, despite the similarity to threads, all POE sessions run in the same single-threaded process and share CPU time.
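To make the cooperative model concrete, here is a minimal sketch with two sessions that each count to three. The session names, the "tick" event name, and the @log array are made up for illustration; the POE calls themselves (create(), yield(), run()) are the standard ones. Each handler does a small piece of work and returns, which is what lets the kernel hand the next time-slice to the other session.

```perl
use strict;
use warnings;
use POE;    # loads POE::Kernel and POE::Session, exports KERNEL, HEAP, ...

my @log;    # illustrative only: records the order in which handlers ran

for my $name (qw(alpha beta)) {
    POE::Session->create(
        inline_states => {
            _start => sub {
                my ($kernel, $heap) = @_[KERNEL, HEAP];
                $heap->{name}  = $name;    # the heap is private to this session
                $heap->{count} = 0;
                $kernel->yield('tick');    # queue this session's first event
            },
            tick => sub {
                my ($kernel, $heap) = @_[KERNEL, HEAP];
                $heap->{count}++;
                push @log, "$heap->{name} $heap->{count}";
                # Returning gives control back to the kernel. Queue another
                # event only while there is work left to do.
                $kernel->yield('tick') if $heap->{count} < 3;
            },
            _stop => sub {
                my $heap = $_[HEAP];
                push @log, "$heap->{name} stopped";
            },
        },
    );
}

POE::Kernel->run();    # blocks until no session has anything left to do
print "$_\n" for @log;
```

The log should show the two sessions' tick events interleaved rather than one session running to completion first. Neither session is ever preempted: a handler that looped forever would starve the other.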
Wheels
For some tasks, a full session is unnecessary. Sometimes, it makes more sense to alter the abilities of an existing session to provide the desired functionality. Wheels mutate or alter the abilities of a session to provide some new functionality. They live in the POE::Wheel namespace.
Wheels share their entire operating context with the session but do not have the same features as a session. Wheels do not have their own heap and cannot create aliases for themselves. In many ways, they are like a parasite clinging to the side of the user's code.
The upside to using Wheels is the loss of internal POE overhead. Sessions require a certain amount of maintenance to keep running. POE checks sessions to see if they still have work to do, if there are timers or alarms outstanding for them, if they should be garbage collected, etc. The more sessions that exist in a system, the more that overhead grows. Wheels have none of this overhead. They piggyback on top of the user's session so, apart from any events they may trigger as part of their normal operation, there is no inherent internal POE overhead in using a wheel.
POE ships with a few core wheels. POE::Wheel::SocketFactory allows a session to talk to network sockets or listen on network sockets. This is generally used in conjunction with POE::Wheel::ReadWrite, which notifies a session of the readability or writability of file descriptors. POE::Wheel::FollowTail allows a session to "tail" a file (like the "tail" command in most unices) and receive notification when new content is available.
Filters
Many wheels handle incoming and outgoing data. For instance, FollowTail catches new content from whatever file it is watching and presents it to the session. When watching a log file, simply getting a blob of text back is perfectly acceptable. In real systems applications, however, POE is generally watching more complex data streams like HTTP, XML-RPC, MP3 streams or proprietary protocols.
Filters provide translation services to allow users to get back sane objects or data structures instead of giant blobs of network data. Basically, filters are very simple data-parsing modules. Most POE filters are limited enough to be used outside of a POE environment. They know nothing of POE or of the running POE environment. Most wheels accept filters for both input and output and allow for filters to be hot-plugged at runtime.
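As a small illustration of a filter working with no POE kernel in sight, the sketch below drives POE::Filter::Line (one of the filters that ships with POE) by hand. The sample strings are invented. Only the filter module is loaded, and no session or event loop is started.

```perl
use strict;
use warnings;
use POE::Filter::Line;    # ships with POE; needs no running kernel

# Treat "\n" as the record terminator for both input and output
my $filter = POE::Filter::Line->new( Literal => "\n" );

# Raw chunks arrive split at arbitrary points, as they would off a socket
my $records = $filter->get( [ "first li", "ne\nsecond line\nthi" ] );
print "got: $_\n" for @$records;

# The unterminated tail stays buffered until its newline shows up
push @$records, @{ $filter->get( [ "rd line\n" ] ) };

# put() goes the other way: records in, raw chunks out
my $chunks = $filter->put( [ 'hello', 'world' ] );
print "raw: ", join( '', @$chunks );
```

The first get() call can only return the two complete lines; the third appears once the second call supplies the rest of it.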
Drivers
Drivers provide low-level IO primitives. Generally, wheels use drivers to read and write from file descriptors and the like without needing to know the details of the operation. Currently, POE ships with a single driver, POE::Driver::SysRW, that abstracts "sysread" and "syswrite" semantics. SysRW defaults to a block size of 65536 which can be customized via a constructor parameter.
Components
Components are sessions that provide services. Unlike a wheel, which plugs into a session and adds functionality, Components run separately in the background and offer some functionality through an abstracted API. They are analogous to system daemons on modern unices. For instance, POE::Component::Client::DNS offers asynchronous DNS resolution.
Standard State Parameters
All POE states receive a series of standard parameters. For reasons of speed, these are passed as part of @_ and accessed via subroutine constant indexes.

    # Fetch a single field ...
    my $kernel = $_[KERNEL];

    # ... or fetch several at once with an array slice
    my ($kernel, $session) = @_[KERNEL, SESSION];

These are the fields that make up the standard parameter list:
- KERNEL: A reference to $poe_kernel
- SESSION: A reference to the current session
- SENDER: A reference to the session that sent an event
- STATE: The event name that caused the state to occur
- HEAP: A reference to the session's storage space
- OBJECT: For object states, this contains the object whose method is being invoked. For package states, this contains the name of the package whose method is being invoked. This parameter will always be undefined for inline states.
- CALLER_FILE / CALLER_LINE / CALLER_STATE: The file, line number, and state from which this event was sent
- ARG0 .. ARG9: The first ten custom parameters to a state. They will always be at the end of @_ so it is possible to send more than 10 parameters. Often they are accessed using $#_ like so:

      my @args = @_[ARG0 .. $#_];

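The mechanism behind these names is nothing more than constant subroutines used as array indexes. The following stand-alone sketch imitates it without loading POE at all. The three constants and their index values are stand-ins invented for this example (POE defines its own values, which may differ), and handler() is a hypothetical state.

```perl
use strict;
use warnings;

# Hypothetical stand-ins for the constants POE exports. The real index
# values are POE's business and may differ from these.
use constant { KERNEL => 0, HEAP => 1, ARG0 => 2 };

# A state handler written in the usual POE style
sub handler {
    my ($kernel, $heap) = @_[KERNEL, HEAP];    # array slice by constant index
    my @args = @_[ARG0 .. $#_];                # everything from ARG0 onward
    return "$kernel/$heap->{name}: @args";
}

# Simulate a dispatch: fixed fields first, custom arguments last
print handler( 'kernel', { name => 'demo' }, 1 .. 12 ), "\n";
```

Because the custom arguments sit at the end of @_, the slice picks up all twelve of them even though only ARG0 through ARG9 have names.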
Working with POE
With those concepts in mind, it is time to look at some code. To make life a bit easier, let's say we would like to accept and parse data that resembles CGI query strings. This data will be key-value pairs in which the key and value are separated by "=" and the pairs themselves are delimited by "&". An example string is as follows:

    foo=bar&baz=1&bat=2

A Filter
First, we need a parser for our data streams. As discussed earlier, Filters are much easier to deal with because they are unaware of their environment and the POE context in which they are run.

    package POE::Filter::SimpleQueryString;

    use warnings;
    use strict;

    use Carp qw(carp croak);

Next we need a constructor.

    sub new {
        my $class = shift;

        my $self = bless {
            buffer => undef,
        }, $class;

        return $self;
    }

This is about the most simplistic constructor possible. This very simple filter requires no parameters to operate. It is perfectly reasonable, however, to demand parameters of the user. For instance, if the filter could decrypt the incoming data before parsing, a parameter could turn that feature on. NOTE: Use constructor parameters sparingly in filters. Some wheels and components only take the name of the filter and do not allow parameter passing.
POE has two possible APIs for filters. The easiest, and the oldest, interface uses get() and put() to hand off the data. get() is passed a large data chunk and it parses the data, returning as many records as the filter can find as an array reference. put() works similarly. It is handed a set of records and translates them back into a raw data stream, returning an array reference of data chunks.
The second, and more complex, API for filters allows for runtime switchout of filters. The second API uses get_one_start() and get_one() to parse data into records and put() to translate records into a data stream. This API version also calls for a get_pending() method that returns the contents of the current internal buffer, allowing wheels to trade out filters without data loss. get_one_start() accepts the initial data buffer as an array reference containing data chunks. It adds this data to an internal buffer and returns. get_one() processes that buffer, returning an array reference of records. Unlike get() in the previous API, get_one() is not greedy and should return either zero or one record.
We will be using the newer get_one_start()/get_one() version of the Filter API. If this code were destined for CPAN, we would also implement the older API for backwards compatibility.
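Before building the real filter, it may help to see the calling pattern of the newer API in isolation. The sketch below is a deliberately tiny, hypothetical filter (My::ToyLineFilter is not a real module) whose records are plain newline-terminated lines. It is driven by hand the way a wheel would drive it: load the buffer once, then ask for one record at a time until none are left.

```perl
use strict;
use warnings;

# Hypothetical toy filter: each record is one newline-terminated line.
package My::ToyLineFilter;

sub new { my $class = shift; return bless { buffer => '' }, $class }

# Append raw chunks to the internal buffer; parse nothing yet
sub get_one_start {
    my ($self, $chunks) = @_;
    $self->{buffer} .= join '', @$chunks;
    return;
}

# Return at most one record, wrapped in an array reference
sub get_one {
    my $self = shift;
    return [ $1 ] if $self->{buffer} =~ s/^([^\n]*)\n//;
    return [];
}

# Hand back unparsed data so a wheel can pass it to a replacement filter
sub get_pending {
    my $self = shift;
    return length $self->{buffer} ? [ $self->{buffer} ] : undef;
}

package main;

my $filter = My::ToyLineFilter->new;
$filter->get_one_start( [ "one\ntw", "o\nthr" ] );

# Drain the buffer one record at a time, as a wheel would
my @records;
while ( my @got = @{ $filter->get_one } ) {
    push @records, @got;
}

print "records: @records\n";
print "pending: @{ $filter->get_pending || [] }\n";
```

The incomplete trailing data never becomes a record. It stays in the buffer, and get_pending() is how a wheel would retrieve it when swapping in a different filter.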
get_one_start / get_one
As stated above, get_one_start()'s job is simply to add data to the internal buffer.

    sub get_one_start {
        my $self     = shift;
        my $incoming = shift;

        # Concatenate the incoming chunks onto whatever is already buffered
        $self->{buffer} .= join '', @$incoming;

        return;
    }

Note that we are creating one big string buffer from the incoming data chunks. The filter has no control over how data is chunked. Our parser, however, has specific requirements about what a chunk should look like. Smashing everything back into a string buffer allows the parser inside get_one() to chunk data however it wants.
get_one()'s job is to transform raw data into cooked record sets. The example string above, foo=bar&baz=1&bat=2, will become a hash.

    sub get_one {
        my $self = shift;

In our super-easy format, an individual record is terminated by a newline, "\n". Key-value pairs are delimited by "&". The key and value themselves are separated by an "=". Note that we aren't dealing with issues like character escaping or data taint.

        my @chunks;

Each parsed line makes up a chunk of data. We want to represent each record as a distinct entity to the user.

        # Pull one newline-terminated line off the front of the buffer,
        # skipping any blank lines. $1 is only trustworthy when the
        # substitution succeeds, so test the substitution itself.
        my $line;
        if (defined $self->{buffer} and $self->{buffer} =~ s/^\n*(.+?)\n//) {
            $line = $1;
        }

        if (defined $line && length $line) {
            my @pairs = split(/&/, $line);
            my %chunk;
            foreach my $pair (@pairs) {
                my ($key, $value) = split(/=/, $pair, 2);

So what happens if there is more than one instance of a given key in a record? Simple. We make an array reference. The user will need to inspect the value of each key to determine if they have more than one value.

                if (defined $chunk{ $key }) {
                    if (ref $chunk{ $key } eq 'ARRAY') {
                        push @{ $chunk{ $key } }, $value;
                    }
                    else {
                        $chunk{ $key } = [ $chunk{ $key }, $value ];
                    }
                }
                else {
                    $chunk{ $key } = $value;
                }
            }
            push @chunks, \%chunk;
        }

        return \@chunks;
    }

A call to get_one() returns an array reference containing hashes of our data.

    $VAR1 = [
              {
                'bat' => '2',
                'baz' => '1',
                'foo' => 'bar'
              }
            ];

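Since a value may be either a plain scalar or an array reference, code that consumes these records has to check which one it received. One way to hide that check is a small helper that always returns a list. The helper name values_for() and the sample record are hypothetical and not part of the filter itself.

```perl
use strict;
use warnings;

# Hypothetical helper: return every value stored under $key as a flat list,
# whether the filter stored a plain scalar or an array reference.
sub values_for {
    my ($record, $key) = @_;
    return unless exists $record->{$key};
    my $value = $record->{$key};
    return ref $value eq 'ARRAY' ? @$value : ($value);
}

# A record shaped like the one get_one() builds from "foo=bar&baz=1&baz=2"
my $record = { foo => 'bar', baz => [ 1, 2 ] };

for my $key (sort keys %$record) {
    my @values = values_for( $record, $key );
    print "$key has ", scalar @values, " value(s): @values\n";
}
```

With a helper like this, the calling code can treat every key as multi-valued and never has to inspect references directly.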
put
put() goes the other direction with our data. It accepts an array reference of records like we created above and creates raw data chunks.

    sub put {
        my $self    = shift;
        my $records = shift;

        my $buffer = '';
        foreach my $record (@$records) {
            my @pairs;
            foreach my $key (keys %$record) {
                if (ref $record->{$key} eq 'ARRAY') {
                    # A multi-valued key becomes one key=value pair per value
                    foreach my $value (@{ $record->{$key} }) {
                        push @pairs, "$key=$value";
                    }
                }
                else {
                    my $value = $record->{$key};
                    push @pairs, "$key=$value";
                }
            }
            $buffer .= join("&", @pairs) . "\n";
        }

        return [ $buffer ];
    }

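One detail worth knowing about put() is that it walks each record with keys, and Perl makes no promise about hash key order. The pairs may therefore come back in a different order than they arrived. That is harmless for this format, but awkward when comparing output in tests. The stand-alone sketch below shows the same serialization step with the keys sorted. The function name record_to_line() is invented for the example, and sorting is an optional variation rather than something the filter above does.

```perl
use strict;
use warnings;

# Hypothetical stand-alone variant of the serialization step in put(),
# with keys sorted so that the output is reproducible.
sub record_to_line {
    my $record = shift;
    my @pairs;
    for my $key (sort keys %$record) {
        my $value = $record->{$key};
        # A key with several values is emitted once per value
        push @pairs, map { "$key=$_" } ref $value eq 'ARRAY' ? @$value : ($value);
    }
    return join( '&', @pairs ) . "\n";
}

print record_to_line( { foo => 'bar', baz => [ 1, 2 ], bat => 3 } );
# prints: bat=3&baz=1&baz=2&foo=bar
```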
A Session
Now that we have the ability to parse data, we need to get data from somewhere. Let's provide a listening socket on the network to catch incoming data. First we need a session.
Construction
The Session constructor defines a small state machine. The "inline_states" parameter creates this by using an explicit name-to-code mapping. Each key of the hash below is a state name and each value is a code reference to execute when that event happens.

    POE::Session->create(
        inline_states => {
            _start       => \&start,
            _stop        => sub {},
            mystate      => \&mystate,
            myotherstate => \&myotherstate,
        },
    );

The package_states parameter creates states using the name of functions in a given package. In this example, the state name is the same as the function name.

    POE::Session->create(
        package_states => {
            MyPackage => [
                'function1',
                'function2',
            ],
        },
    );

It is also possible to specify different state names.

    POE::Session->create(
        package_states => {
            MyPackage => {
                mystate      => 'function1',
                myotherstate => 'function2',
            },
        },
    );

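The object_states parameter creates states using the name of methods on a given object. Note that object_states takes an array reference rather than a hash reference: an object used as a hash key would be turned into a plain string and could no longer be called.

    POE::Session->create(
        object_states => [
            $some_object => [ 'method1', 'method2' ],
        ],
    );
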
As with package states, it is possible to specify different state names.

    POE::Session->create(
        object_states => [
            $some_object => {
                mystate      => 'method1',
                myotherstate => 'method2',
            },
        ],
    );

The two states _start and _stop are mandatory and session construction will fail without them. _start is called when the POE environment is fully active. Use this state to set up any events or timers that the session needs. _stop is called as the final event in the lifespan of a session. Use this to clean up any filehandles or other objects that need to be explicitly closed or otherwise shut down.
The Heap
Sessions also have a place to store internal data. This storage is called the heap and it can be allocated in the Session constructor.

    POE::Session->create(
        # ...
        heap => {
            hostname => 'localhost',
        },
    );

The heap is unique to the session and should be used to store all data that the session needs to operate. Some wheels will put data in the heap, though that is generally discouraged. Make sure to check the documentation on the wheels you are using and avoid name collisions.
Setup
We are going to plug POE::Wheel::SocketFactory and POE::Wheel::ReadWrite objects into our session, so we need several states to handle their needs.

    POE::Session->create(
        inline_states => {
            _start          => \&start,
            factory_success => \&factory_success,
            client_input    => \&client_input,
            fatal_error     => sub { die "A fatal error occurred" },
            _stop           => sub {},
        },
    );

When the session starts up, we launch a POE::Wheel::SocketFactory wheel. With the Reuse flag on, SocketFactory will continuously listen on the specified port and address, handing us events for each client.

    sub start {
        # Keep the wheel in the heap; it stops working once it goes out of scope
        $_[HEAP]->{factory} = POE::Wheel::SocketFactory->new(
            BindAddress    => '127.0.0.1',
            BindPort       => '31337',
            SuccessEvent   => 'factory_success',
            FailureEvent   => 'fatal_error',
            SocketProtocol => 'tcp',
            Reuse          => 'on',
        );
    }

Notice that wheels take the name of the states we created above as parameters. POE uses the word "Event" to signify the name and the word "State" to signify the code that is run.
When a client makes a connection, the SocketFactory notifies the session via the SuccessEvent. It is our job to figure out what to do with the filehandle that SocketFactory built for us. In this case, we want read/write functionality using the filter we built above. POE::Wheel::ReadWrite provides this functionality, including the ability to plug in our filter. Each wheel has a unique identifier, and every event a wheel generates carries that identifier, so it is possible to handle more than one client per session. We store each client's ReadWrite wheel under its own ID, which is the same ID its InputEvent will report later.

    sub factory_success {
        my $handle = $_[ARG0];    # the newly accepted client socket

        my $wheel = POE::Wheel::ReadWrite->new(
            Handle     => $handle,
            Driver     => POE::Driver::SysRW->new(),
            Filter     => POE::Filter::SimpleQueryString->new(),
            InputEvent => 'client_input',
        );

        # Key the wheel by its own ID. The wheel ID that SuccessEvent
        # passes along belongs to the SocketFactory, not to the client.
        $_[HEAP]->{clients}->{ $wheel->ID } = $wheel;
    }

POE::Wheel::ReadWrite will take data from the incoming socket, run it through our filter, and then give it to us via the InputEvent we provided in the constructor. What we do with the data is up to us. Let's print it out and echo it back to the client.

    sub client_input {
        my ($input, $wheel_id) = @_[ARG0, ARG1];

        use Data::Dumper;
        print Dumper $input;

        # Send the record back out through the same client's wheel
        $_[HEAP]->{clients}->{ $wheel_id }->put( $input );
    }

Data::Dumper handles printing out the structure for us. The put() call sends our data through our filter and back out to the client. If our algorithms are correct, we should get the same data back that we put in.

    sungo@nodens% telnet localhost 31337
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    foo=bar

The server prints out:

    sungo@nodens% perl -Ilib examples/server.pl
    $VAR1 = {
              'foo' => 'bar'
            };

And then echoes back to us:

    foo=bar

We're in business!
A Component
That was a lot of code to get a simple TCP server up and running. Surely this can be simplified. POE itself comes to the rescue. POE ships with a component specifically designed to simplify TCP server creation. We can replace all that code above with a simple call to the component's constructor.

    POE::Component::Server::TCP->new(
        Address      => '127.0.0.1',
        Port         => '31337',
        ClientFilter => "POE::Filter::SimpleQueryString",
        ClientInput  => sub {
            my $input = $_[ARG0];

            use Data::Dumper;
            print Dumper $input;

            # Server::TCP keeps each client's ReadWrite wheel in its heap
            $_[HEAP]->{client}->put($input);
        },
    );

The downside is that Server::TCP doesn't allow for argument passing to the filter's constructor and we lose the flexibility of doing things by hand. For a lot of situations, however, this component does the trick quite nicely.
To release this code to the world, we need to make our own component. For the purpose of this example, we're going to wrap the smaller code above instead of the larger wheel-based example. There is no reason why you couldn't use the wheel-based code in your component, however.

    package POE::Component::SimpleQueryString;

    use warnings;
    use strict;

    use vars qw($VERSION);
    $VERSION = '0.01';

    use POE;
    use POE::Component::Server::TCP;
    use POE::Filter::SimpleQueryString;
    use Carp qw(croak);

    sub new {
        my $class = shift;
        my %args  = @_;

        # All three parameters are required
        my $addr = delete $args{ListenAddr} or croak "ListenAddr required";
        my $port = delete $args{ListenPort} or croak "ListenPort required";
        my $input_event = delete $args{InputEvent}
            or croak "InputEvent required";

        my $server = POE::Component::Server::TCP->new(
            Address      => $addr,
            Port         => $port,
            ClientInput  => $input_event,
            ClientFilter => "POE::Filter::SimpleQueryString",
        );

        return $server;
    }

    1;

Now our users can just load up the component like so:

    POE::Component::SimpleQueryString->new(
        ListenAddr => '127.0.0.1',
        ListenPort => '31337',
        InputEvent => sub {
            my $input = $_[ARG0];

            use Data::Dumper;
            print Dumper $input;

            $_[HEAP]->{client}->put($input);
        },
    );

Conclusion
I hope that this introduction to POE has piqued your interest. POE allows you to create powerful, event-driven programs that can be used in many corporate and private environments.
POE is available on the CPAN (http://search.cpan.org/dist/POE) and has a rich, community-maintained website (http://poe.perl.org).
TPJ