#define PERL_NO_GET_CONTEXT #include "EXTERN.h" #include "perl.h" #include "XSUB.h" #include "ppport.h" #include #include #include #include #define DEBUG 0 #include "xshelper/xshelper.h" #include "nlws.h" #include "nlws_frame.h" #include "nlws_courier.h" #include "nlws_perl_loop.h" #include "nlws_context.h" #include "nlws_logger.h" #define WARN_DESTROY_AT_DESTRUCT(sv) \ warn("Destroying %" SVf " at global destruction!\n", sv); #if DEBUG //#include #define LOG_FUNC fprintf(stderr, "%s\n", __func__) #else #define LOG_FUNC #endif #if defined(LWS_WITHOUT_EXTENSIONS) # define NLWS_LWS_HAS_EXTENSIONS false #else # define NLWS_LWS_HAS_EXTENSIONS true #endif #ifdef NLWS_LWS_HAS_PMD #define _LWS_HAS_PMD 1 #else #define _LWS_HAS_PMD 0 #endif #define WS_CLOSE_IS_FAILURE(code) (code != LWS_CLOSE_STATUS_NOSTATUS && code != LWS_CLOSE_STATUS_NORMAL) typedef struct { SV* courier_sv; } pause_t; static inline void _finish_deferred_sv (pTHX_ SV* loop_sv, SV** deferred_svp, const char* methname, SV* payload) { if (DEBUG) warn("finishing deferred (payload=%p)\n", payload); if (!*deferred_svp) croak("Can’t %s(); already finished!", methname); SV* deferred_sv = *deferred_svp; // The deferred’s callbacks might execute synchronously and also // might depend on the referent pointer being NULL as an indicator // that the deferred was already finished. // *deferred_svp = NULL; SV* deferred_args[] = { newSVsv(deferred_sv), newSVpv(methname, 0), payload, NULL, }; SvREFCNT_dec(deferred_sv); xsh_call_object_method_void( aTHX_ loop_sv, "schedule_destroy_and_finish", deferred_args ); } SV* _create_err_obj (pTHX_ const char* type, unsigned argscount, ...) { va_list args; va_start(args, argscount); SV* create_args[argscount+2]; create_args[0] = newSVpv(type, 0); create_args[argscount+1] = NULL; for (unsigned a=0; aperlobj, &my_perl_context->done_d, deferred_method, promise_value ); } void _on_ws_error (pTHX_ my_perl_context_t* my_perl_context, nlws_abstract_loop_t* myloop_p, size_t reasonlen, const char* reason) { LOG_FUNC; SV** deferred_svp = &my_perl_context->done_d; SV* err_obj = _create_err_obj( aTHX_ "ConnectionFailed", 1, newSVpvn(reason, reasonlen) ); _finish_deferred_sv( aTHX_ myloop_p->perlobj, deferred_svp, "reject", err_obj ); } void _on_ws_message(pTHX_ my_perl_context_t* my_perl_context, SV* msgsv) { courier_t* courier = my_perl_context->courier; // Because of the assert() below this initialization isn’t needed, // but some compilers aren’t smart enough to realize that. unsigned cbcount = 0; SV** cbs; switch (my_perl_context->message_type) { case NET_LWS_MESSAGE_TYPE_TEXT: cbcount = courier->on_text_count; cbs = courier->on_text; break; case NET_LWS_MESSAGE_TYPE_BINARY: cbcount = courier->on_binary_count; cbs = courier->on_binary; break; default: assert(0); } SV* cbargs[] = { NULL, NULL }; for (unsigned c=0; c no context given; skipping ...\n"); return 0; } PERL_CONTEXT_FROM_STRUCT(my_perl_context); switch (reason) { case LWS_CALLBACK_WSI_DESTROY: if (my_perl_context->courier_sv) { SvREFCNT_dec(my_perl_context->courier_sv); } break; case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: { unsigned char **p = (unsigned char **)in; unsigned char *end = (*p) + len; AV* headers_av = (AV*) SvRV(my_perl_context->headers_ar); int headers_len = 1 + av_top_index(headers_av); STRLEN valuelen; SV** key; SV** value; int failed = 0; for (int h=0; hheaders_ar); if (failed) return -1; } break; case LWS_CALLBACK_CLIENT_ESTABLISHED: { courier_t* courier = nlws_create_courier(aTHX_ wsi); my_perl_context->courier = courier; SV* courier_sv = xsh_ptr_to_svrv(courier, gv_stashpv(COURIER_CLASS, FALSE)); my_perl_context->courier_sv = courier_sv; SV* args[] = { newSVsv(courier_sv), NULL, }; xsh_call_sv_trap_void( my_perl_context->on_ready, args, "on_ready" ); SvREFCNT_dec(my_perl_context->on_ready); } break; case LWS_CALLBACK_CLIENT_WRITEABLE: { courier_t* courier = my_perl_context->courier; // Idea taken from LWS’s lws-minimal-client-echo demo: // permessage-deflate requires that we forgo consuming the // item from the ring buffer until the next writeable. // if (courier->consume_pending_count) { courier->consume_pending_count -= lws_ring_consume(courier->ring, NULL, NULL, courier->consume_pending_count); } const frame_t *frame_p = lws_ring_get_element(courier->ring, NULL); if (frame_p) { int wrote = lws_write( wsi, LWS_PRE + frame_p->pre_plus_payload, frame_p->len, frame_p->flags ); if (wrote < (int)frame_p->len) { warn("ERROR %d while writing to WebSocket!", wrote); return -1; } courier->consume_pending_count++; lws_callback_on_writable(wsi); } // Don’t close until we’ve flushed the buffer: else if (courier->close_requested) { if (courier->close_status != LWS_CLOSE_STATUS_NOSTATUS) { lws_close_reason( wsi, courier->close_status, courier->close_reason, courier->close_reason_length ); } return -1; } } break; case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: { nlws_abstract_loop_t* myloop_p = (nlws_abstract_loop_t*) lws_evlib_wsi_to_evlib_pt(wsi); _on_ws_error(aTHX_ my_perl_context, myloop_p, len, in); } break; case LWS_CALLBACK_CLIENT_RECEIVE: { if (lws_is_first_fragment(wsi)) { my_perl_context->message_type = lws_frame_is_binary(wsi) ? NET_LWS_MESSAGE_TYPE_BINARY : NET_LWS_MESSAGE_TYPE_TEXT; // In this (generally prevalent) case we can create our SV // directly from the incoming frame. if (lws_is_final_fragment(wsi)) { _on_ws_message(aTHX_ my_perl_context, newSVpvn_flags(in, len, lws_frame_is_binary(wsi) ? 0 : SVf_UTF8)); break; } my_perl_context->content_length = len; } else { my_perl_context->content_length += len; } Renew(my_perl_context->message_content, my_perl_context->content_length, char); Copy( in, my_perl_context->message_content + my_perl_context->content_length - len, len, char ); if (lws_is_final_fragment(wsi)) { SV* msgsv = newSVpvn_flags( my_perl_context->message_content, my_perl_context->content_length, my_perl_context->message_type == NET_LWS_MESSAGE_TYPE_TEXT ? SVf_UTF8 : 0 ); _on_ws_message(aTHX_ my_perl_context, msgsv ); } } break; case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: { courier_t* courier = my_perl_context->courier; courier->close_status = ntohs( *(uint16_t *) in ); courier->close_reason_length = len - sizeof(uint16_t); memcpy(courier->close_reason, sizeof(uint16_t) + in, courier->close_reason_length); } break; case LWS_CALLBACK_CLIENT_CLOSED: { courier_t* courier = my_perl_context->courier; nlws_abstract_loop_t* myloop_p = (nlws_abstract_loop_t*) lws_evlib_wsi_to_evlib_pt(wsi); _on_ws_close(aTHX_ my_perl_context, myloop_p, courier->close_status, courier->close_reason_length, courier->close_reason ); } break; default: break; } return 0; } void _courier_send( pTHX_ courier_t* courier, U8* buf, STRLEN len, enum lws_write_protocol protocol ) { frame_t frame = { .len = len, .flags = lws_write_ws_flags(protocol, 1, 1), }; Newx(frame.pre_plus_payload, len + LWS_PRE, U8); Copy(buf, LWS_PRE + frame.pre_plus_payload, len, U8); if (!lws_ring_insert( courier->ring, &frame, 1 )) { nlws_destroy_frame(&frame); size_t count = lws_ring_get_count_free_elements(courier->ring); croak("Failed to add message to ring buffer! (%zu ring nodes free)", count); } lws_callback_on_writable(courier->wsi); } static inline void _lws_service_fd (pTHX_ UV lws_context_uv, int fd, short event) { uintptr_t lws_context_int = lws_context_uv; struct lws_context *context = (void *) lws_context_int; struct lws_pollfd pollfd = { .fd = fd, .events = event, .revents = event, }; lws_service_fd(context, &pollfd); } const struct lws_protocols wsclient_protocols[] = { { .name = NET_LWS_LOCAL_PROTOCOL_NAME, .callback = net_lws_wsclient_callback, .per_session_data_size = sizeof(void*), .rx_buffer_size = 0, }, { NULL } }; void _populate_extensions (pTHX_ struct lws_extension* extensions, AV* compressions_av) { #if _LWS_HAS_PMD SSize_t compressions_len = 1 + av_top_index(compressions_av); for (SSize_t c=0; c= (1 << LLL_COUNT))) { croak("%s: Invalid level: %" IVdf, xsh_PL_package, level); } lws_set_log_level(level, NULL); # ---------------------------------------------------------------------- # Privates: void _lws_service_fd_read( UV lws_context_uv, int fd ) CODE: _lws_service_fd(aTHX_ lws_context_uv, fd, POLLIN); void _lws_service_fd_write( UV lws_context_uv, int fd ) CODE: _lws_service_fd(aTHX_ lws_context_uv, fd, POLLOUT); int _get_timeout( UV lws_context_uv ) CODE: uintptr_t lws_context_int = lws_context_uv; struct lws_context *context = (void *) lws_context_int; RETVAL = lws_service_adjust_timeout( context, DEFAULT_POLL_TIMEOUT, 0 ); OUTPUT: RETVAL void _lws_context_destroy( UV lws_context_uv ) CODE: lws_context_destroy( (struct lws_context*) lws_context_uv ); MODULE = Net::Libwebsockets PACKAGE = Net::Libwebsockets::WebSocket::Client PROTOTYPES: DISABLE void _new (SV* hostname, int port, SV* path, SV* compression_sv, SV* subprotocols_sv, SV* headers_ar, int tls_opts, unsigned ping_interval, unsigned ping_timeout, SV* loop_obj, SV* done_d, SV* on_ready_sv, SV* logger_obj) CODE: assert(SvROK(compression_sv)); assert(SVt_PVAV == SvTYPE(SvRV(compression_sv))); AV* compressions_av = (AV*) SvRV(compression_sv); SSize_t compressions_len = 1 + av_top_index(compressions_av); struct lws_extension* extensions_p; if (NLWS_LWS_HAS_EXTENSIONS) { Newxz(extensions_p, 1 + compressions_len, struct lws_extension); _populate_extensions(aTHX_ extensions_p, compressions_av); } else { extensions_p = NULL; } lws_log_cx_t* log_cx; if (logger_obj && SvOK(logger_obj)) { log_cx = xsh_svrv_to_ptr(logger_obj); } else { log_cx = NULL; } // This struct gets copied; we defer bumping loop_obj’s refcount // until that time. See nlws_perl_loop.c. nlws_abstract_loop_t abstract_loop = { PERL_CONTEXT_IN_STRUCT .perlobj = loop_obj, }; struct lws_context_creation_info info = { .options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT | LWS_SERVER_OPTION_VALIDATE_UTF8, .extensions = extensions_p, .event_lib_custom = &evlib_custom, .foreign_loops = (void *[]) { &abstract_loop, }, .port = CONTEXT_PORT_NO_LISTEN, /* we do not run any server */ .protocols = wsclient_protocols, // Ref-counting of logger_obj happens via the logger’s // own reference count. .log_cx = log_cx, }; // This function never croaks; it only rejects the promise. // We thus bump the promise’s refcount here. // SvREFCNT_inc(done_d); struct lws_context *context = lws_create_context(&info); if (!context) { if (extensions_p) Safefree(extensions_p); _finish_deferred_sv( aTHX_ loop_obj, &done_d, "reject", _create_err_obj( aTHX_ "General", 1, newSVpvs("lws_create_context failed") ) ); return; } my_perl_context_t* my_perl_context; Newxz(my_perl_context, 1, my_perl_context_t); *my_perl_context = (my_perl_context_t) { PERL_CONTEXT_IN_STRUCT .pid = getpid(), .extensions = extensions_p, .courier = NULL, .courier_sv = NULL, // We bump the refcounts of these below: .on_ready = on_ready_sv, .done_d = done_d, .headers_ar = headers_ar, .logger_obj = logger_obj, .lws_retry = (lws_retry_bo_t) { .retry_ms_table_count = 0, .conceal_count = 0, .secs_since_valid_ping = ping_interval, .secs_since_valid_hangup = ping_timeout, }, }; const char* hostname_str = xsh_sv_to_str(hostname); struct lws_client_connect_info client = { .context = context, .port = port, .address = hostname_str, .path = xsh_sv_to_str(path), .host = hostname_str, .origin = hostname_str, .ssl_connection = tls_opts, .retry_and_idle_policy = &my_perl_context->lws_retry, // The callback’s `user`: .userdata = my_perl_context, .protocol = SvOK(subprotocols_sv) ? xsh_sv_to_str( subprotocols_sv) : NULL, }; if (!lws_client_connect_via_info(&client)) { if (extensions_p) Safefree(extensions_p); lws_context_destroy(context); return; } SvREFCNT_inc(on_ready_sv); SvREFCNT_inc(headers_ar); # ---------------------------------------------------------------------- MODULE = Net::Libwebsockets PACKAGE = Net::Libwebsockets::WebSocket::Pause PROTOTYPES: DISABLE void DESTROY (SV* self_sv) CODE: pause_t* my_pause = xsh_svrv_to_ptr(self_sv); courier_t* courier = xsh_svrv_to_ptr(my_pause->courier_sv); courier->pauses--; if (!courier->pauses) { lws_rx_flow_control(courier->wsi, 1); } SvREFCNT_dec(my_pause->courier_sv); Safefree(my_pause); # ---------------------------------------------------------------------- MODULE = Net::Libwebsockets PACKAGE = Net::Libwebsockets::WebSocket::Courier PROTOTYPES: DISABLE void on_text (SV* self_sv, SV* cbref) CODE: courier_t* courier = xsh_svrv_to_ptr(self_sv); SvREFCNT_inc(cbref); courier->on_text_count++; Renew(courier->on_text, courier->on_text_count, SV*); courier->on_text[courier->on_text_count - 1] = cbref; void on_binary (SV* self_sv, SV* cbref) CODE: courier_t* courier = xsh_svrv_to_ptr(self_sv); SvREFCNT_inc(cbref); courier->on_binary_count++; Renew(courier->on_binary, courier->on_binary_count, SV*); courier->on_binary[courier->on_binary_count - 1] = cbref; void send_text (SV* self_sv, SV* payload_sv) CODE: courier_t* courier = xsh_svrv_to_ptr(self_sv); STRLEN len; U8* buf = (U8*) SvPVutf8(payload_sv, len); _courier_send(aTHX_ courier, buf, len, LWS_WRITE_TEXT); void send_binary (SV* self_sv, SV* payload_sv) CODE: courier_t* courier = xsh_svrv_to_ptr(self_sv); STRLEN len; U8* buf = (U8*) SvPVbyte(payload_sv, len); _courier_send(aTHX_ courier, buf, len, LWS_WRITE_BINARY); SV* pause (SV* self_sv) CODE: if (GIMME_V == G_VOID) croak("Don’t call pause() in void context!"); courier_t* courier = xsh_svrv_to_ptr(self_sv); pause_t* my_pause; Newx(my_pause, 1, pause_t); my_pause->courier_sv = self_sv; SvREFCNT_inc(self_sv); if (!courier->pauses) { lws_rx_flow_control(courier->wsi, 0); } courier->pauses++; RETVAL = xsh_ptr_to_svrv(my_pause, gv_stashpv(PAUSE_CLASS, FALSE)); OUTPUT: RETVAL void close (SV* self_sv, U16 code=LWS_CLOSE_STATUS_NOSTATUS, SV* reason_sv=NULL) CODE: courier_t* courier = xsh_svrv_to_ptr(self_sv); if (reason_sv && SvOK(reason_sv)) { U8* reason = (U8*) SvPVutf8(reason_sv, courier->close_reason_length); if (courier->close_reason_length > MAX_CLOSE_REASON_LENGTH) { warn("Truncating %zu-byte close reason (%.*s) to %d bytes …", courier->close_reason_length, (int) courier->close_reason_length, reason, MAX_CLOSE_REASON_LENGTH); courier->close_reason_length = MAX_CLOSE_REASON_LENGTH; } memcpy(courier->close_reason, reason, courier->close_reason_length); } else { courier->close_reason_length = 0; } courier->close_requested = true; courier->close_status = code; // Force a writable callback, which will trigger our close. lws_callback_on_writable(courier->wsi); void DESTROY (SV* self_sv) CODE: courier_t* courier = xsh_svrv_to_ptr(self_sv); if (IS_GLOBAL_DESTRUCTION && (getpid() == courier->pid)) { WARN_DESTROY_AT_DESTRUCT(self_sv); } nlws_destroy_courier(aTHX_ courier); # ---------------------------------------------------------------------- MODULE = Net::Libwebsockets PACKAGE = Net::Libwebsockets::Logger PROTOTYPES: DISABLE SV* _new (SV* classname_sv, SV* level_sv, SV* callback) CODE: U32 level; if (SvOK(level_sv)) { level = xsh_sv_to_uv(level_sv); } else { level = nlws_get_global_lwsl_level(); } lws_log_cx_t* my_logger_p; Newx(my_logger_p, 1, lws_log_cx_t); SV* cb_or_null = (callback && SvOK(callback)) ? SvREFCNT_inc(callback) : NULL; nlws_logger_opaque_t* opaque; Newx(opaque, 1, nlws_logger_opaque_t); SV* self_sv = xsh_ptr_to_svrv(my_logger_p, gv_stashpv(SvPVbyte_nolen(classname_sv), FALSE)); *opaque = (nlws_logger_opaque_t) { PERL_CONTEXT_IN_STRUCT .pid = getpid(), .perlobj = self_sv, }; *my_logger_p = (lws_log_cx_t) { .lll_flags = level, .refcount_cb = nlws_logger_on_refcount_change, .opaque = opaque, }; if (cb_or_null) { opaque->callback = cb_or_null; my_logger_p->lll_flags |= LLLF_LOG_CONTEXT_AWARE; my_logger_p->u.emit_cx = nlws_logger_emit; } else { my_logger_p->u.emit = lwsl_emit_stderr; } RETVAL = self_sv; OUTPUT: RETVAL void DESTROY (SV* self_sv) CODE: lws_log_cx_t* my_logger_p = xsh_svrv_to_ptr(self_sv); if (my_logger_p->opaque) { nlws_logger_opaque_t* opaque = my_logger_p->opaque; if (IS_GLOBAL_DESTRUCTION && (getpid() == opaque->pid)) { WARN_DESTROY_AT_DESTRUCT(self_sv); } if (opaque->callback) SvREFCNT_dec(opaque->callback); Safefree(opaque); } Safefree(my_logger_p);