/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include static const axis2_transport_receiver_ops_t amqp_receiver_ops = { axis2_amqp_receiver_init, axis2_amqp_receiver_start, axis2_amqp_receiver_get_reply_to_epr, axis2_amqp_receiver_get_conf_ctx, axis2_amqp_receiver_is_running, axis2_amqp_receiver_stop, axis2_amqp_receiver_free }; AXIS2_EXTERN axis2_transport_receiver_t* AXIS2_CALL axis2_amqp_receiver_create( const axutil_env_t* env, const axis2_char_t* repo, const axis2_char_t* qpid_broker_ip, int qpid_broker_port) { AXIS2_ENV_CHECK(env, NULL); axis2_amqp_receiver_resource_pack_t* receiver_resource_pack = NULL; receiver_resource_pack = (axis2_amqp_receiver_resource_pack_t*)AXIS2_MALLOC(env->allocator, sizeof(axis2_amqp_receiver_resource_pack_t)); if(!receiver_resource_pack) { AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE); return NULL; } receiver_resource_pack->receiver.ops = &amqp_receiver_ops; receiver_resource_pack->qpid_receiver = NULL; receiver_resource_pack->conf_ctx = NULL; receiver_resource_pack->conf_ctx_private = NULL; if(repo) { /** * 1. We first create a private conf ctx which is owned by this server * we only free this private conf context. We should never free the * receiver_impl->conf_ctx because it may be owned by any other object which * may lead to double free. * * 2. The Qpid broker IP and port are set in conf_ctx at two different places. * If the repo is specified, they are set here. Otherwise, they are set * in axis2_amqp_receiver_init method. */ axutil_property_t* property = NULL; const axis2_char_t* broker_ip = NULL; int* broker_port = (int*)AXIS2_MALLOC(env->allocator, sizeof(int)); *broker_port = AXIS2_QPID_NULL_CONF_INT; receiver_resource_pack->conf_ctx_private = axis2_build_conf_ctx(env, repo); if(!receiver_resource_pack->conf_ctx_private) { axis2_amqp_receiver_free((axis2_transport_receiver_t *)receiver_resource_pack, env); return NULL; } /* Set broker IP */ broker_ip = qpid_broker_ip ? qpid_broker_ip : AXIS2_QPID_DEFAULT_BROKER_IP; property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0, (void*)broker_ip); axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx_private, env, AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_IP, property); /* Set broker port */ *broker_port = (qpid_broker_port != AXIS2_QPID_NULL_CONF_INT) ? qpid_broker_port : AXIS2_QPID_DEFAULT_BROKER_PORT; property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0, (void*)broker_port); axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx_private, env, AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_PORT, property); receiver_resource_pack->conf_ctx = receiver_resource_pack->conf_ctx_private; } return &(receiver_resource_pack->receiver); } AXIS2_EXTERN axis2_status_t AXIS2_CALL axis2_amqp_receiver_init( axis2_transport_receiver_t* receiver, const axutil_env_t* env, axis2_conf_ctx_t* conf_ctx, axis2_transport_in_desc_t* in_desc) { axis2_amqp_receiver_resource_pack_t* receiver_resource_pack = NULL; axutil_property_t* property = NULL; const axis2_char_t* broker_ip = NULL; int* broker_port = (int*)AXIS2_MALLOC(env->allocator, sizeof(int)); *broker_port = AXIS2_QPID_NULL_CONF_INT; AXIS2_ENV_CHECK(env, AXIS2_FAILURE); receiver_resource_pack = AXIS2_AMQP_RECEIVER_TO_RESOURCE_PACK(receiver); receiver_resource_pack->conf_ctx = conf_ctx; /* Set broker IP */ broker_ip = axis2_amqp_util_get_in_desc_conf_value_string(in_desc, env, AXIS2_AMQP_CONF_QPID_BROKER_IP); if(!broker_ip) { broker_ip = AXIS2_QPID_DEFAULT_BROKER_IP; } property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0, (void*)broker_ip); axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx, env, AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_IP, property); /* Set broker port */ *broker_port = axis2_amqp_util_get_in_desc_conf_value_int(in_desc, env, AXIS2_AMQP_CONF_QPID_BROKER_PORT); if(*broker_port == AXIS2_QPID_NULL_CONF_INT) { *broker_port = AXIS2_QPID_DEFAULT_BROKER_PORT; } property = axutil_property_create_with_args(env, AXIS2_SCOPE_APPLICATION, 0, 0, (void*)broker_port); axis2_conf_ctx_set_property(receiver_resource_pack->conf_ctx, env, AXIS2_AMQP_CONF_CTX_PROPERTY_BROKER_PORT, property); return AXIS2_SUCCESS; } AXIS2_EXTERN axis2_status_t AXIS2_CALL axis2_amqp_receiver_start( axis2_transport_receiver_t* receiver, const axutil_env_t* env) { AXIS2_ENV_CHECK(env, AXIS2_FAILURE); axis2_status_t status = AXIS2_FAILURE; axis2_amqp_receiver_resource_pack_t* amqp_receiver_resource_pack = NULL; axis2_qpid_receiver_resource_pack_t* qpid_receiver_resource_pack = NULL; amqp_receiver_resource_pack = AXIS2_AMQP_RECEIVER_TO_RESOURCE_PACK(receiver); /* Create Qpid Receiver */ qpid_receiver_resource_pack = axis2_qpid_receiver_create(env, amqp_receiver_resource_pack->conf_ctx); if(qpid_receiver_resource_pack) { amqp_receiver_resource_pack->qpid_receiver = qpid_receiver_resource_pack; status = axis2_qpid_receiver_start(qpid_receiver_resource_pack, env); } return status; } AXIS2_EXTERN axis2_endpoint_ref_t* AXIS2_CALL axis2_amqp_receiver_get_reply_to_epr( axis2_transport_receiver_t* receiver, const axutil_env_t* env, const axis2_char_t* svc_name) { return NULL; } AXIS2_EXTERN axis2_conf_ctx_t* AXIS2_CALL axis2_amqp_receiver_get_conf_ctx( axis2_transport_receiver_t* receiver, const axutil_env_t* env) { AXIS2_ENV_CHECK(env, NULL); return AXIS2_AMQP_RECEIVER_TO_RESOURCE_PACK(receiver)->conf_ctx; } AXIS2_EXTERN axis2_bool_t AXIS2_CALL axis2_amqp_receiver_is_running( axis2_transport_receiver_t* receiver, const axutil_env_t* env) { return AXIS2_TRUE; } AXIS2_EXTERN axis2_status_t AXIS2_CALL axis2_amqp_receiver_stop( axis2_transport_receiver_t* receiver, const axutil_env_t* env) { return AXIS2_SUCCESS; } AXIS2_EXTERN void AXIS2_CALL axis2_amqp_receiver_free( axis2_transport_receiver_t* receiver, const axutil_env_t* env) { AXIS2_ENV_CHECK(env, void); axis2_amqp_receiver_resource_pack_t* receiver_resource_pack = NULL; receiver_resource_pack = AXIS2_AMQP_RECEIVER_TO_RESOURCE_PACK(receiver); if(receiver_resource_pack->qpid_receiver) { axis2_qpid_receiver_free(receiver_resource_pack->qpid_receiver, env); receiver_resource_pack->qpid_receiver = NULL; } if(receiver_resource_pack->conf_ctx_private) { axis2_conf_ctx_free(receiver_resource_pack->conf_ctx_private, env); receiver_resource_pack->conf_ctx_private = NULL; } receiver_resource_pack->conf_ctx = NULL; /* Do not free this. It may be owned by some other object */ AXIS2_FREE(env->allocator, receiver_resource_pack); } /* Library Exports */ AXIS2_EXPORT int #ifndef AXIS2_STATIC_DEPLOY axis2_get_instance( #else axis2_amqp_receiver_get_instance( #endif struct axis2_transport_receiver** inst, const axutil_env_t* env) { int status = AXIS2_SUCCESS; *inst = axis2_amqp_receiver_create(env, NULL, NULL, AXIS2_QPID_NULL_CONF_INT); if(!(*inst)) { status = AXIS2_FAILURE; } return status; } AXIS2_EXPORT int #ifndef AXIS2_STATIC_DEPLOY axis2_remove_instance( #else axis2_amqp_receiver_remove_instance( #endif axis2_transport_receiver_t* inst, const axutil_env_t* env) { if(inst) { axis2_transport_receiver_free(inst, env); } return AXIS2_SUCCESS; }