#----------------------------------------------------------------------
package DBIx::DataModel::Statement;
#----------------------------------------------------------------------
# see POD doc at end of file

use warnings;
use strict;
use Carp;
use List::Util       qw/min first/;
use List::MoreUtils  qw/firstval any/;
use Scalar::Util     qw/weaken refaddr reftype dualvar/;
use Params::Validate qw/validate ARRAYREF HASHREF/;
use POSIX            qw/LONG_MAX/;
use Acme::Damn       qw/damn/;
use Clone            qw/clone/;
use Try::Tiny;

use DBIx::DataModel;
use DBIx::DataModel::Meta::Utils;
use namespace::clean;

{no strict 'refs'; *CARP_NOT = \@DBIx::DataModel::CARP_NOT;}

use overload

  # overload the stringification operator so that Devel::StackTrace is happy;
  # also useful to show the SQL (if in sqlized state). When sql() croaks
  # (statement not sqlized yet), fall back to the default stringification.
  '""' => sub {
    my $self = shift;
    my $string = try {my ($sql, @bind) = $self->sql;
                      __PACKAGE__ . "($sql // " . join(", ", @bind) . ")"; }
              || overload::StrVal($self);
  }
;

# sequence of states. Stored as dualvars for both ordering and printing
use constant {
  NEW      => dualvar(1, "new"     ),
  REFINED  => dualvar(2, "refined" ),
  SQLIZED  => dualvar(3, "sqlized" ),
  PREPARED => dualvar(4, "prepared"),
  EXECUTED => dualvar(5, "executed"),
};


#----------------------------------------------------------------------
# PUBLIC METHODS
#----------------------------------------------------------------------

# new($connected_source, %other_args)
# Constructor. $connected_source must be a DBIx::DataModel::ConnectedSource
# (croaks otherwise); %other_args, if any, are passed to refine().
sub new {
  my ($class, $connected_source, %other_args) = @_;

  # check $connected_source
  $connected_source
    && $connected_source->isa('DBIx::DataModel::ConnectedSource')
    or croak "invalid connected_source for DBIx::DataModel::Statement->new()";

  # build the object
  my $self = bless {status           => NEW,
                    args             => {},
                    pre_bound_params => {},
                    bound_params     => [],
                    connected_source => $connected_source}, $class;

  # add placeholder_regex. Only built for a non-empty prefix : with an
  # undefined or empty prefix the pattern would degenerate to /^(.+)/ and
  # every bind value would be mistaken for a named placeholder
  # (sqlize() treats an absent regex as "no named placeholders").
  my $prefix = $connected_source->schema->{placeholder_prefix};
  $self->{placeholder_regex} = qr/^\Q$prefix\E(.+)/
    if defined $prefix && length $prefix;

  # parse remaining args, if any
  $self->refine(%other_args) if %other_args;

  return $self;
}


# accessors
DBIx::DataModel::Meta::Utils->define_readonly_accessors(
  __PACKAGE__, qw/connected_source status/,
);

# delegations to the connected source
sub meta_source {shift->{connected_source}->meta_source}
sub schema      {shift->{connected_source}->schema}


# THINK : not documented yet, is this method useful ?
# reset(%other_args)
# Re-initializes this object in place : its content is replaced by the
# content of a fresh statement built on the same connected source.
sub reset {
  my ($self, %other_args) = @_;

  my $new = (ref $self)->new($self->{connected_source}, %other_args);
  %$self = (%$new);

  return $self;
}


#----------------------------------------------------------------------
# PUBLIC METHODS IN RELATION WITH SELECT()
#----------------------------------------------------------------------

# sql()
# Returns ($sql, @bound_params) in list context, or just $sql in scalar
# context. Croaks if the statement is not at least in SQLIZED state.
sub sql {
  my ($self) = @_;

  $self->{status} >= SQLIZED
    or croak "can't call sql() when in status $self->{status}";

  return wantarray ? ($self->{sql}, @{$self->{bound_params}})
                   : $self->{sql};
}


# bind(...)
# Binds values to named placeholders. Accepted call forms :
#   ->bind(\%values)                    # name => value pairs
#   ->bind(\@values)                    # positional : index => value
#   ->bind($name => $value, \%datatype) # exactly 3 args
#   ->bind(name1 => $val1, ...)         # even-sized list
# Before SQLIZED state the values are just remembered; afterwards they are
# stored at the positions of the corresponding placeholders.
# Returns $self.
sub bind {
  my ($self, @args) = @_;

  # arguments can be a list, a hashref or an arrayref
  if (@args == 1) {
    for (reftype($args[0]) || "") {
      /^HASH$/  and do {@args = %{$args[0]}; last;};
      /^ARRAY$/ and do {my $i = 0; @args = map {($i++, $_)} @{$args[0]}; last};
      #otherwise
      croak "unexpected arg type to bind()";
    }
  }
  elsif (@args == 3) { # name => value, \%datatype (see L<DBI/bind_param>)
    # transform into ->bind($name => [$value, \%datatype])
    @args = ($args[0], [$args[1], $args[2]]);
  }
  elsif (@args % 2 == 1) {
    croak "odd number of args to bind()";
  }

  # do bind (different behaviour according to status)
  my %args = @args;
  if ($self->{status} < SQLIZED) {
    while (my ($k, $v) = each %args) {
      $self->{pre_bound_params}{$k} = $v;
    }
  }
  else {
    while (my ($k, $v) = each %args) {
      my $indices = $self->{param_indices}{$k}
        or next; # silently ignore that binding (named placeholder unused)
      $self->{bound_params}[$_] = $v foreach @$indices;
    }
  }

  return $self;
}


# refine(%more_args)
# Merges additional select() arguments into the statement; only allowed
# in NEW or REFINED state (croaks otherwise, and on unknown arguments).
# Returns $self.
sub refine {
  my ($self, %more_args) = @_;

  $self->{status} <= REFINED
    or croak "can't refine() when in status $self->{status}";
  $self->{status} = REFINED;

  my $args = $self->{args};

  while (my ($k, $v) = each %more_args) {

  SWITCH:
    for ($k) {

      # -where : combine with previous 'where' clauses in same statement
      /^-where$/ and do {
        my $sqla = $self->schema->sql_abstract;
        $args->{-where} = $sqla->merge_conditions($args->{-where}, $v);
        last SWITCH;
      };

      # -fetch : special select() on primary key
      /^-fetch$/ and do {
        # build a -where clause on primary key
        my $primary_key = ref($v) ? $v : [$v];
        my @pk_columns  = $self->meta_source->primary_key;
        @pk_columns
          or croak "fetch: no primary key in source " . $self->meta_source;
        @pk_columns == @$primary_key
          or croak sprintf "fetch from %s: primary key should have %d values",
                           $self->meta_source, scalar(@pk_columns);
        List::MoreUtils::all {defined $_} @$primary_key
          or croak "fetch from " . $self->meta_source . ": "
                 . "undefined val in primary key";

        my %where = ();
        @where{@pk_columns} = @$primary_key;
        my $sqla = $self->schema->sql_abstract;
        $args->{-where} = $sqla->merge_conditions($args->{-where}, \%where);

        # want a single record as result
        $args->{-result_as} = "firstrow";

        last SWITCH;
      };

      # -columns : store in $self->{args}{-columns}; can restrict previous list
      /^-columns$/ and do {
        my @cols = ref $v ? @$v : ($v);
        if (my $old_cols = $args->{-columns}) {
          unless (@$old_cols == 1 && $old_cols->[0] eq '*' ) {
            foreach my $col (@cols) {
              any {$_ eq $col} @$old_cols
                or croak "can't restrict -columns on '$col' (was not in the) "
                       . "previous -columns list";
            }
          }
        }
        $args->{-columns} = \@cols;
        last SWITCH;
      };

      # other args are just stored, will be used later
      /^-( order_by | group_by | having | for
         | union(?:_all)? | intersect | except | minus
         | result_as | post_SQL | pre_exec | post_exec | post_bless
         | limit | offset | page_size | page_index
         | column_types | prepare_attrs | dbi_prepare_method
         | _left_cols | where_on
         )$/x
         and do {$args->{$k} = $v; last SWITCH};

      # otherwise
      croak "invalid arg : $k";

    } # end SWITCH
  } # end while

  return $self;
}


# sqlize(@args)
# Generates the SQL through the schema's sql_abstract object, stores SQL,
# bind values and aliases in $self, records the positions of named
# placeholders, applies pre-bound values and moves to SQLIZED state.
# @args, if any, are first passed to refine(). Returns $self.
sub sqlize {
  my ($self, @args) = @_;

  $self->{status} < SQLIZED
    or croak "can't sqlize() when in status $self->{status}";

  # merge new args into $self->{args}
  $self->refine(@args) if @args;

  # shortcuts
  my $args         = $self->{args};
  my $meta_source  = $self->meta_source;
  my $source_where = $meta_source->{where};
  my $sql_abstract = $self->schema->sql_abstract;
  my $result_as    = $args->{-result_as} || "";

  # build arguments for SQL::Abstract::More
  $self->refine(-where => $source_where) if $source_where;
  my @args_to_copy = qw/-columns -where
                        -union -union_all -intersect -except -minus
                        -order_by -group_by -having
                        -limit -offset -page_size -page_index/;
  my %sqla_args = (-from         => clone($meta_source->db_from),
                   -want_details => 1);
  $args->{$_} and $sqla_args{$_} = $args->{$_} for @args_to_copy;
  $sqla_args{-columns} ||= $meta_source->default_columns;
  $sqla_args{-limit}   ||= 1
    if $result_as eq 'firstrow' && $self->schema->autolimit_firstrow;

  # "-for" (e.g. "update", "read only")
  if ($result_as ne 'subquery') {
    if ($args->{-for}) {
      $sqla_args{-for} = $args->{-for};
    }
    elsif (!exists $args->{-for}) {
      # no explicit -for at all (not even a false one) : use schema default
      $sqla_args{-for} = $self->schema->select_implicitly_for;
    }
  }

  # EXPERIMENTAL: "where_on"
  if (my $where_on = $args->{-where_on}) {
    # retrieve components of the join
    my ($join_op, $first_table, @other_join_args) = @{$sqla_args{-from}};
    $join_op eq '-join'
      or croak "datasource for '-where_on' was not a join";
    my %by_dest_table = reverse @other_join_args;

    # insert additional conditions into appropriate places
    while (my ($table, $additional_cond) = each %$where_on) {
      my $join_cond = $by_dest_table{$table}
        or croak "-where_on => {'$table' => ..}: this table is not in the join";
      $join_cond->{condition}
        = $sql_abstract->merge_conditions($join_cond->{condition},
                                          $additional_cond);
    }

    # TODO: should be able to use paths and aliases as keys, instead of
    # database table names.
    # TOCHECK: is this stuff still compatible with the bind() method ?
  }

  # generate SQL
  my $sqla_result = $sql_abstract->select(%sqla_args);

  # maybe post-process the SQL
  if ($args->{-post_SQL}) {
    ($sqla_result->{sql}, @{$sqla_result->{bind}})
      = $args->{-post_SQL}->($sqla_result->{sql}, @{$sqla_result->{bind}});
  }

  # keep $sql / @bind / aliases in $self, and set new status
  $self->{bound_params} = $sqla_result->{bind};
  $self->{$_} = $sqla_result->{$_} for qw/sql aliased_tables aliased_columns/;
  $self->{status} = SQLIZED;

  # analyze placeholders, and replace by pre_bound params if applicable.
  # Undefined bind values are skipped : they cannot be placeholders, and
  # matching them would only raise an "uninitialized value" warning.
  if (my $regex = $self->{placeholder_regex}) {
    my $bound = $self->{bound_params};
    for my $i (0 .. $#$bound) {
      defined $bound->[$i] && $bound->[$i] =~ $regex
        and push @{$self->{param_indices}{$1}}, $i;
    }
  }
  $self->bind($self->{pre_bound_params}) if $self->{pre_bound_params};

  # compute callback to apply to data rows
  my $callback = $self->{args}{-post_bless};
  weaken(my $weak_self = $self); # weaken to avoid a circular ref in closure
  $self->{row_callback} = sub {
    my $row = shift;
    $weak_self->bless_from_DB($row);
    $callback->($row) if $callback;
  };

  return $self;
}


# prepare(@args)
# Sqlizes the statement if needed (or if @args are given), then asks the
# database handle for a statement handle, stored in $self->{sth}.
# Moves to PREPARED state and returns $self.
sub prepare {
  my ($self, @args) = @_;

  $self->sqlize(@args) if @args or $self->{status} < SQLIZED;

  $self->{status} == SQLIZED
    or croak "can't prepare() when in status $self->{status}";

  # log the statement and bind values
  $self->schema->_debug("PREPARE $self->{sql} / @{$self->{bound_params}}");

  # call the database
  my $dbh    = $self->schema->dbh or croak "Schema has no dbh";
  my $method = $self->{args}{-dbi_prepare_method}
            || $self->schema->dbi_prepare_method;
  my @prepare_args = ($self->{sql});
  if (my $prepare_attrs = $self->{args}{-prepare_attrs}) {
    push @prepare_args, $prepare_attrs;
  }
  $self->{sth} = $dbh->$method(@prepare_args);

  # new status and return
  $self->{status} = PREPARED;
  return $self;
}


# execute(@bind_args)
# Prepares the statement if needed, binds @bind_args (same forms as
# bind()), runs the -pre_exec / -post_exec callbacks around the actual
# execution, and moves to EXECUTED state. Returns $self.
sub execute {
  my ($self, @bind_args) = @_;

  # if not prepared yet, prepare it
  $self->prepare if $self->{status} < PREPARED;

  # TODO: DON'T REMEMBER why the line below was here. Keep it around for a while ...
  push @bind_args, offset => $self->{offset} if $self->{offset};

  $self->bind(@bind_args) if @bind_args;

  # shortcuts
  my $args = $self->{args};
  my $sth  = $self->{sth};

  # previous row_count, row_num and reuse_row are no longer valid
  delete $self->{reuse_row};
  delete $self->{row_count};
  $self->{row_num} = $self->offset;

  # pre_exec callback
  $args->{-pre_exec}->($sth) if $args->{-pre_exec};

  # check that all placeholders were properly bound to values
  my @unbound;
  while (my ($k, $indices) = each %{$self->{param_indices} || {}}) {
    exists $self->{bound_params}[$indices->[0]] or push @unbound, $k;
  }
  not @unbound
    or croak "unbound placeholders (probably a missing foreign key) : "
           . CORE::join(", ", @unbound);

  # bind parameters and execute
  my $sqla = $self->schema->sql_abstract;
  $sqla->bind_params($sth, @{$self->{bound_params}});
  $sth->execute;

  # post_exec callback
  $args->{-post_exec}->($sth) if $args->{-post_exec};

  $self->{status} = EXECUTED;

  return $self;
}


# select(%args)
# Refines the statement with %args, then returns a result whose shape
# depends on -result_as (default "rows"); when -result_as is an arrayref,
# its first item is the result kind and the remaining items are key
# columns (used by "hashref"). The statement is sqlized / executed only
# as far as the requested result kind requires.
sub select {
  my $self = shift;

  $self->refine(@_) if @_;

  my $args = $self->{args}; # all combined args

  my $callbacks = CORE::join ", ", grep {exists $args->{$_}}
                                        qw/-pre_exec -post_exec -post_bless/;

  my ($result_as, @key_cols)
    = ref $args->{-result_as} ? @{$args->{-result_as}}
                              : ($args->{-result_as} || "rows");

 SWITCH:
  for ($result_as) {

    # CASE statement : the DBIx::DataModel::Statement object
    /^statement$/i and do {
      delete $self->{args}{-result_as};
      return $self;
    };

    # for all other cases, must first sqlize the statement
    $self->sqlize if $self->{status} < SQLIZED;

    # CASE sql : just return the SQL and bind values
    /^sql$/i and do {
      not $callbacks
        or croak "$callbacks incompatible with -result_as=>'sql'";
      return $self->sql;
    };

    # CASE subquery : return a ref to an arrayref with SQL and bind values
    /^subquery$/i and do {
      not $callbacks
        or croak "$callbacks incompatible with -result_as=>'subquery'";
      my ($sql, @bind) = $self->sql;
      return \ ["($sql)", @bind];
    };

    # for all other cases, must first execute the statement
    $self->execute;

    # CASE sth : return the DBI statement handle
    /^sth$/i and do {
      not $args->{-post_bless}
        or croak "-post_bless incompatible with -result_as=>'sth'";
      return $self->{sth};
    };

    # CASE rows : all data rows (this is the default)
    /^(rows|arrayref)$/i and return $self->all;

    # CASE firstrow : just the first row
    /^firstrow$/i and do {
      my $row = $self->next;
      $self->{sth}->finish;
      return $row;
    };

    # CASE hashref : all data rows, put into a hashref
    /^hashref$/i and do {
      @key_cols or @key_cols = $self->meta_source->primary_key
        or croak "-result_as=>'hashref' impossible: no primary key";
      my %hash;
      while (my $row = $self->next) {
        my @key;
        foreach my $col (@key_cols) {
          my $val = $row->{$col};
          $val = '' if not defined $val; # $val might be 0, so no '||'
          push @key, $val;
        }
        # nested hashes for all key items but the last one, which
        # indexes the row itself
        my $last_key_item = pop @key;
        my $node          = \%hash;
        $node = $node->{$_} ||= {} foreach @key;
        $node->{$last_key_item} = $row;
      }
      return \%hash;
    };

    # CASE fast_statement : creates a reusable row
    /^fast[-_]statement$/i and do {
      $self->_build_reuse_row;
      return $self;
    };

    # CASE flat_arrayref : flattened columns from each row
    # (case-insensitive, like all other result kinds)
    /^flat(?:_array(?:ref)?)?$/i and do {
      $self->_build_reuse_row;
      my @vals;
      my $hash_key_name = $self->{sth}{FetchHashKeyName} || 'NAME';
      my $cols          = $self->{sth}{$hash_key_name};
      while (my $row = $self->next) {
        push @vals, @{$row}{@$cols};
      }
      return \@vals;
    };

    # OTHERWISE
    croak "unknown -result_as value: $_";
  }
}


# row_count()
# Returns the number of rows of the query without its LIMIT clause,
# obtained by rewriting the SQL into a SELECT COUNT(*) and running it.
# The result is cached in $self->{row_count} (dropped by execute()).
sub row_count {
  my ($self) = @_;

  if (! exists $self->{row_count}) {
    $self->sqlize if $self->{status} < SQLIZED;
    my ($sql, @bind) = $self->sql;

    # get syntax used for LIMIT clauses ...
    my $sqla = $self->schema->sql_abstract;
    my ($limit_sql, undef, undef) = $sqla->limit_offset(0, 0);
    $limit_sql =~ s/([()?*])/\\$1/g;

    # ...and use it to remove the LIMIT clause and associated bind vals, if any
    if ($limit_sql =~ /ROWNUM/) { # special case for Oracle syntax, complex ...
                                  # see source code of SQL::Abstract::More
      $limit_sql =~ s/%s/(.*)/;
      if ($sql =~ s/^$limit_sql/$1/) {
        splice @bind, -2;
      }
    }
    elsif ($sql =~ s[\b$limit_sql][]i) { # regular LIMIT/OFFSET syntaxes
      splice @bind, -2;
    }

    # select COUNT(*) instead of initial columns
    if ($sql =~ /\b(UNION|INTERSECT|MINUS|EXCEPT)\b/) {
      $sql = "SELECT COUNT(*) FROM ( $sql )";
    }
    else {
      $sql =~ s[^SELECT\b.*?\bFROM\b][SELECT COUNT(*) FROM]i
        or croak "can't count rows from sql: $sql";
    }

    # log the statement and bind values
    $self->schema->_debug("PREPARE $sql / @bind");

    # call the database
    my $dbh    = $self->schema->dbh or croak "Schema has no dbh";
    my $method = $self->schema->dbi_prepare_method;
    my $sth    = $dbh->$method($sql);
    $sth->execute(@bind);
    ($self->{row_count}) = $sth->fetchrow_array;
  }

  return $self->{row_count};
}


# row_num()
# Returns the current row counter (offset plus number of rows fetched
# since the last execute()).
sub row_num {
  my ($self) = @_;
  return $self->{row_num};
}


# next($n_rows)
# Executes the statement if needed. Without argument : fetches a single
# row (or undef when exhausted). With $n_rows : fetches up to $n_rows
# rows and returns them as an arrayref. Each row goes through the row
# callback installed by sqlize().
sub next {
  my ($self, $n_rows) = @_;

  $self->execute if $self->{status} < EXECUTED;

  my $sth      = $self->{sth}          or croak "absent sth in statement";
  my $callback = $self->{row_callback} or croak "absent callback in statement";

  if (not defined $n_rows) {  # if user wants a single row
    # fetch a single record, either into the reusable row, or into a fresh hash
    my $row = $self->{reuse_row} ? ($sth->fetch ? $self->{reuse_row} : undef)
                                 : $sth->fetchrow_hashref;
    if ($row) {
      $callback->($row);
      $self->{row_num} +=1;
    }
    return $row;
  }
  else {              # if user wants an arrayref of size $n_rows
    $n_rows > 0            or croak "->next() : invalid argument, $n_rows";
    not $self->{reuse_row} or croak "reusable row, cannot retrieve several";
    my @rows;
    while ($n_rows--) {
      my $row = $sth->fetchrow_hashref or last;
      push @rows, $row;
    }
    $callback->($_) foreach @rows;
    $self->{row_num} += @rows;
    return \@rows;
  }
}


# all()
# Returns an arrayref of all remaining rows.
sub all {
  my ($self) = @_;

  # just call next() with a huge number
  return $self->next(POSIX::LONG_MAX);
}


# page_size()  : the -page_size argument, or LONG_MAX when there is none
sub page_size   { shift->{args}{-page_size}  || POSIX::LONG_MAX  }

# page_index() : the page recorded by the last goto_page(), otherwise the
# -page_index argument, otherwise 1. Reading the value recorded by
# goto_page() is what lets shift_pages() / next_page() actually advance.
sub page_index  {
  my ($self) = @_;
  return $self->{page_index} || $self->{args}{-page_index} || 1;
}

# offset()     : the offset recorded by the last goto_page(), or 0
sub offset      { shift->{offset}            || 0                }


# page_count()
# Number of pages needed for row_count() rows (0 when there are no rows).
sub page_count {
  my ($self) = @_;

  my $row_count = $self->row_count or return 0;
  my $page_size = $self->page_size || 1;

  return int(($row_count - 1) / $page_size) + 1;
}


# goto_page($page_index)
# Records the new page index and offset; a negative index counts down
# from the last page. Re-executes the statement unless the row counter
# is already at the new offset. Returns $self.
sub goto_page {
  my ($self, $page_index) = @_;

  # if negative index, count down from last page
  $page_index += $self->page_count + 1    if $page_index < 0;

  $page_index >= 1 or croak "illegal page_index: $page_index";

  $self->{page_index} = $page_index;
  $self->{offset}     = ($page_index - 1) * $self->page_size;

  # row_num is undefined as long as the statement was never executed :
  # treat it as 0 instead of comparing undef numerically
  $self->execute     unless ($self->{row_num} || 0) == $self->{offset};

  return $self;
}


# shift_pages($delta)
# Moves $delta pages forward (or backward if negative) from the current
# page; croaks when going before the first page.
sub shift_pages {
  my ($self, $delta) = @_;

  my $page_index = $self->page_index + $delta;
  $page_index >= 1 or croak "illegal page index: $page_index";

  $self->goto_page($page_index);
}


# next_page() : shortcut for shift_pages(1)
sub next_page {
  my ($self) = @_;

  $self->shift_pages(1);
}


# page_boundaries()
# Returns ($first, $last) : 1-based row numbers of the current page,
# $last being capped by row_count().
sub page_boundaries {
  my ($self) = @_;

  my $first = $self->offset + 1;
  my $last  = min($self->row_count, $first + $self->page_size - 1);
  return ($first, $last);
}


# page_rows() : arrayref of at most page_size() rows from the cursor
sub page_rows {
  my ($self) = @_;
  return $self->next($self->page_size);
}


# bless_from_DB($row)
# Blesses the $row hashref into the class of the source and applies the
# "from_DB" column handlers (computed lazily on first call).
# Returns the row.
sub bless_from_DB {
  my ($self, $row) = @_;

  # inject ref to $schema if in multi-schema mode
  $row->{__schema} = $self->schema unless $self->schema->{is_singleton};

  # bless into appropriate class
  bless $row, $self->meta_source->class;

  # apply handlers
  $self->{from_DB_handlers} or $self->_compute_from_DB_handlers;
  while (my ($column_name, $handler)
           = each %{$self->{from_DB_handlers}}) {
    exists $row->{$column_name}
      and $handler->($row->{$column_name}, $row, $column_name, 'from_DB');
  }

  return $row;
}


#----------------------------------------------------------------------
# PRIVATE METHODS IN RELATION WITH SELECT()
#----------------------------------------------------------------------

# _build_reuse_row()
# Creates a single hash bound to the columns of the statement handle, so
# that every fetch overwrites the same row. Only valid in EXECUTED state.
sub _build_reuse_row {
  my ($self) = @_;

  $self->{status} == EXECUTED
    or croak "cannot _build_reuse_row() when in state $self->{status}";

  # create a reusable hash and bind_columns to it (see L<DBI/bind_columns>)
  my %row;
  my $hash_key_name = $self->{sth}{FetchHashKeyName} || 'NAME';
  $self->{sth}->bind_columns(\(@row{@{$self->{sth}{$hash_key_name}}}));
  $self->{reuse_row} = \%row;
}


# _compute_from_DB_handlers()
# Builds $self->{from_DB_handlers}, a hashref {column_name => coderef},
# from the column handlers of the source, the handlers of aliased
# columns, and the -column_types argument. Returns $self.
sub _compute_from_DB_handlers {
  my ($self) = @_;
  my $meta_source    = $self->meta_source;
  my $meta_schema    = $self->schema->metadm;
  my %handlers       = $meta_source->_consolidate_hash('column_handlers');
  my %aliased_tables = $meta_source->aliased_tables;

  # iterate over aliased_columns
  while (my ($alias, $column) = each %{$self->{aliased_columns} || {}}) {
    my $table_name;
    $column =~ s/^(.+)\.// and $table_name = $1;
    if (!$table_name) {
      # unqualified column : the alias inherits the handlers of the column
      $handlers{$alias} = $handlers{$column};
    }
    else {
      $table_name = $aliased_tables{$table_name} || $table_name;

      my $table = $meta_schema->table($table_name)
               || firstval {($_->{db_name} || '') eq $table_name}
                           ($meta_source, $meta_source->ancestors)
                  # THINK: might perform a case-insensitive search
                  #        (as second pass)
        or croak "unknown table name: $table_name";

      $handlers{$alias} = $table->{column_handlers}->{$column};
    }
  }

  # handlers may be overridden from args{-column_types}
  if (my $col_types = $self->{args}{-column_types}) {
    while (my ($type_name, $columns) = each %$col_types) {
      ref $columns or $columns = [$columns];
      my $type = $self->schema->metadm->type($type_name)
        or croak "no such column type: $type_name";
      $handlers{$_} = $type->{handlers} foreach @$columns;
    }
  }

  # just keep the "from_DB" handlers
  my $from_DB_handlers = {};
  while (my ($column, $col_handlers) = each %handlers) {
    my $from_DB_handler = $col_handlers->{from_DB} or next;
    $from_DB_handlers->{$column} = $from_DB_handler;
  }
  $self->{from_DB_handlers} = $from_DB_handlers;

  return $self;
}


1; # End of DBIx::DataModel::Statement

__END__

=head1 NAME

DBIx::DataModel::Statement - DBIx::DataModel statement objects

=head1 DESCRIPTION

The purpose of a I<statement> object is to retrieve rows from the
database and bless them as objects of appropriate classes.

Internally the statement builds and then encapsulates a C<DBI>
statement handle (sth).

The design principles for statements are described in the
L<DESIGN|DBIx::DataModel::Doc::Design> section of the manual
(purpose, lifecycle, etc.).

=head1 METHODS

=head2 new

  my $statement
    = DBIx::DataModel::Statement->new($connected_source, %options);

This is the statement constructor; C<$connected_source> is an instance
of L<DBIx::DataModel::ConnectedSource>. If present, C<%options> are
delegated to the L<refine()|DBIx::DataModel::Doc::Reference/"refine()">
method.

Explicit calls to the statement constructor are exceptional; the usual
way to create a statement is through the C<select()> method of
L<DBIx::DataModel::ConnectedSource>.

=head1 PRIVATE METHOD NAMES

The following methods or functions are used internally by this module
and should be considered as reserved names, not to be redefined in
subclasses :

=over

=item _bless_from_DB

=item _build_reuse_row

=item _compute_from_DB_handlers

=back