[ Index ] |
PHP Cross Reference of Unnamed Project |
[Summary view] [Print] [Text view]
1 package DBD::Gofer::Transport::stream; 2 3 # $Id: stream.pm 10905 2008-03-10 22:01:04Z timbo $ 4 # 5 # Copyright (c) 2007, Tim Bunce, Ireland 6 # 7 # You may distribute under the terms of either the GNU General Public 8 # License or the Artistic License, as specified in the Perl README file. 9 10 use strict; 11 use warnings; 12 13 use Carp; 14 15 use base qw(DBD::Gofer::Transport::pipeone); 16 17 our $VERSION = sprintf("0.%06d", q$Revision: 10905 $ =~ /(\d+)/o); 18 19 __PACKAGE__->mk_accessors(qw( 20 go_persist 21 )); 22 23 my $persist_all = 5; 24 my %persist; 25 26 27 sub _connection_key { 28 my ($self) = @_; 29 return join "~", $self->go_url||"", @{ $self->go_perl || [] }; 30 } 31 32 33 sub _connection_get { 34 my ($self) = @_; 35 36 my $persist = $self->go_persist; # = 0 can force non-caching 37 $persist = $persist_all if not defined $persist; 38 my $key = ($persist) ? $self->_connection_key : ''; 39 if ($persist{$key} && $self->_connection_check($persist{$key})) { 40 $self->trace_msg("reusing persistent connection $key\n",0) if $self->trace >= 1; 41 return $persist{$key}; 42 } 43 44 my $connection = $self->_make_connection; 45 46 if ($key) { 47 %persist = () if keys %persist > $persist_all; # XXX quick hack to limit subprocesses 48 $persist{$key} = $connection; 49 } 50 51 return $connection; 52 } 53 54 55 sub _connection_check { 56 my ($self, $connection) = @_; 57 $connection ||= $self->connection_info; 58 my $pid = $connection->{pid}; 59 my $ok = (kill 0, $pid); 60 $self->trace_msg("_connection_check: $ok (pid $$)\n",0) if $self->trace; 61 return $ok; 62 } 63 64 65 sub _connection_kill { 66 my ($self) = @_; 67 my $connection = $self->connection_info; 68 my ($pid, $wfh, $rfh, $efh) = @{$connection}{qw(pid wfh rfh efh)}; 69 $self->trace_msg("_connection_kill: closing write handle\n",0) if $self->trace; 70 # closing the write file handle should be enough, generally 71 close $wfh; 72 # in future we may want to be more aggressive 73 #close $rfh; close $efh; kill 15, $pid 74 # but deleting from the persist cache... 75 delete $persist{ $self->_connection_key }; 76 # ... and removing the connection_info should suffice 77 $self->connection_info( undef ); 78 return; 79 } 80 81 82 sub _make_connection { 83 my ($self) = @_; 84 85 my $go_perl = $self->go_perl; 86 my $cmd = [ @$go_perl, qw(-MDBI::Gofer::Transport::stream -e run_stdio_hex)]; 87 88 #push @$cmd, "DBI_TRACE=2=/tmp/goferstream.log", "sh", "-c"; 89 if (my $url = $self->go_url) { 90 die "Only 'ssh:user\@host' style url supported by this transport" 91 unless $url =~ s/^ssh://; 92 my $ssh = $url; 93 my $setup_env = join "||", map { "source $_ 2>/dev/null" } qw(.bash_profile .bash_login .profile); 94 my $setup = $setup_env.q{; exec "$@"}; 95 # don't use $^X on remote system by default as it's possibly wrong 96 $cmd->[0] = 'perl' if "@$go_perl" eq $^X; 97 # -x not only 'Disables X11 forwarding' but also makes connections *much* faster 98 unshift @$cmd, qw(ssh -xq), split(' ', $ssh), qw(bash -c), $setup; 99 } 100 101 $self->trace_msg("new connection: @$cmd\n",0) if $self->trace; 102 103 # XXX add a handshake - some message from DBI::Gofer::Transport::stream that's 104 # sent as soon as it starts that we can wait for to report success - and soak up 105 # and report useful warnings etc from ssh before we get it? Increases latency though. 106 my $connection = $self->start_pipe_command($cmd); 107 return $connection; 108 } 109 110 111 sub transmit_request_by_transport { 112 my ($self, $request) = @_; 113 my $trace = $self->trace; 114 115 my $connection = $self->connection_info || do { 116 my $con = $self->_connection_get; 117 $self->connection_info( $con ); 118 $con; 119 }; 120 121 my $encoded_request = unpack("H*", $self->freeze_request($request)); 122 $encoded_request .= "\015\012"; 123 124 my $wfh = $connection->{wfh}; 125 $self->trace_msg(sprintf("transmit_request_by_transport: to fh %s fd%d\n", $wfh, fileno($wfh)),0) 126 if $trace >= 4; 127 128 # send frozen request 129 local $\; 130 print $wfh $encoded_request # autoflush enabled 131 or do { 132 # XXX should make new connection and retry 133 $self->_connection_kill; 134 die "Error sending request: $!"; 135 }; 136 $self->trace_msg("Request sent: $encoded_request\n",0) if $trace >= 4; 137 138 return; 139 } 140 141 142 sub receive_response_by_transport { 143 my $self = shift; 144 my $trace = $self->trace; 145 146 $self->trace_msg("receive_response_by_transport: awaiting response\n",0) if $trace >= 4; 147 my $connection = $self->connection_info || die; 148 my ($pid, $rfh, $efh, $cmd) = @{$connection}{qw(pid rfh efh cmd)}; 149 150 my $errno = 0; 151 my $encoded_response; 152 my $stderr_msg; 153 154 $self->read_response_from_fh( { 155 $efh => { 156 error => sub { warn "error reading response stderr: $!"; $errno||=$!; 1 }, 157 eof => sub { warn "eof reading efh" if $trace >= 4; 1 }, 158 read => sub { $stderr_msg .= $_; 0 }, 159 }, 160 $rfh => { 161 error => sub { warn "error reading response: $!"; $errno||=$!; 1 }, 162 eof => sub { warn "eof reading rfh" if $trace >= 4; 1 }, 163 read => sub { $encoded_response .= $_; ($encoded_response=~s/\015\012$//) ? 1 : 0 }, 164 }, 165 }); 166 167 # if we got no output on stdout at all then the command has 168 # probably exited, possibly with an error to stderr. 169 # Turn this situation into a reasonably useful DBI error. 170 if (not $encoded_response) { 171 my @msg; 172 push @msg, "error while reading response: $errno" if $errno; 173 if ($stderr_msg) { 174 chomp $stderr_msg; 175 push @msg, sprintf "error reported by \"%s\" (pid %d%s): %s", 176 $self->cmd_as_string, 177 $pid, ((kill 0, $pid) ? "" : ", exited"), 178 $stderr_msg; 179 } 180 die join(", ", "No response received", @msg)."\n"; 181 } 182 183 $self->trace_msg("Response received: $encoded_response\n",0) 184 if $trace >= 4; 185 186 $self->trace_msg("Gofer stream stderr message: $stderr_msg\n",0) 187 if $stderr_msg && $trace; 188 189 my $frozen_response = pack("H*", $encoded_response); 190 191 # XXX need to be able to detect and deal with corruption 192 my $response = $self->thaw_response($frozen_response); 193 194 if ($stderr_msg) { 195 # add stderr messages as warnings (for PrintWarn) 196 $response->add_err(0, $stderr_msg, undef, $trace) 197 # but ignore warning from old version of blib 198 unless $stderr_msg =~ /^Using .*blib/ && "@$cmd" =~ /-Mblib/; 199 } 200 201 return $response; 202 } 203 204 sub transport_timedout { 205 my $self = shift; 206 $self->_connection_kill; 207 return $self->SUPER::transport_timedout(@_); 208 } 209 210 1; 211 212 __END__ 213 214 =head1 NAME 215 216 DBD::Gofer::Transport::stream - DBD::Gofer transport for stdio streaming 217 218 =head1 SYNOPSIS 219 220 DBI->connect('dbi:Gofer:transport=stream;url=ssh:username@host.example.com;dsn=dbi:...',...) 221 222 or, enable by setting the DBI_AUTOPROXY environment variable: 223 224 export DBI_AUTOPROXY='dbi:Gofer:transport=stream;url=ssh:username@host.example.com' 225 226 =head1 DESCRIPTION 227 228 Without the C<url=> parameter it launches a subprocess as 229 230 perl -MDBI::Gofer::Transport::stream -e run_stdio_hex 231 232 and feeds requests into it and reads responses from it. But that's not very useful. 233 234 With a C<url=ssh:username@host.example.com> parameter it uses ssh to launch the subprocess 235 on a remote system. That's much more useful! 236 237 It gives you secure remote access to DBI databases on any system you can login to. 238 Using ssh also gives you optional compression and many other features (see the 239 ssh manual for how to configure that and many other options via ~/.ssh/config file). 240 241 The actual command invoked is something like: 242 243 ssh -xq ssh:username@host.example.com bash -c $setup $run 244 245 where $run is the command shown above, and $command is 246 247 . .bash_profile 2>/dev/null || . .bash_login 2>/dev/null || . .profile 2>/dev/null; exec "$@" 248 249 which is trying (in a limited and fairly unportable way) to setup the environment 250 (PATH, PERL5LIB etc) as it would be if you had logged in to that system. 251 252 The "C<perl>" used in the command will default to the value of $^X when not using ssh. 253 On most systems that's the full path to the perl that's currently executing. 254 255 256 =head1 PERSISTENCE 257 258 Currently gofer stream connections persist (remain connected) after all 259 database handles have been disconnected. This makes later connections in the 260 same process very fast. 261 262 Currently up to 5 different gofer stream connections (based on url) can 263 persist. If more than 5 are in the cache when a new connection is made then 264 the cache is cleared before adding the new connection. Simple but effective. 265 266 =head1 TO DO 267 268 Document go_perl attribute 269 270 Automatically reconnect (within reason) if there's a transport error. 271 272 Decide on default for persistent connection - on or off? limits? ttl? 273 274 =head1 AUTHOR 275 276 Tim Bunce, L<http://www.tim.bunce.name> 277 278 =head1 LICENCE AND COPYRIGHT 279 280 Copyright (c) 2007, Tim Bunce, Ireland. All rights reserved. 281 282 This module is free software; you can redistribute it and/or 283 modify it under the same terms as Perl itself. See L<perlartistic>. 284 285 =head1 SEE ALSO 286 287 L<DBD::Gofer::Transport::Base> 288 289 L<DBD::Gofer> 290 291 =cut
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Tue Mar 17 22:47:18 2015 | Cross-referenced by PHPXref 0.7.1 |