From 30cd8a7eba3f4e9e7c7cab492cd384af0174606c Mon Sep 17 00:00:00 2001 From: nandika Date: Wed, 18 Aug 2010 14:39:59 +0000 Subject: license added git-svn-id: http://svn.apache.org/repos/asf/axis/axis2/c/core/trunk@986708 13f79535-47bb-0310-9956-ffa450edef68 --- .../receiver/qpid_receiver/axis2_qpid_receiver.cpp | 334 ++++++++++----------- 1 file changed, 167 insertions(+), 167 deletions(-) (limited to 'src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.cpp') diff --git a/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.cpp b/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.cpp index 18d3f01..7ccedd8 100644 --- a/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.cpp +++ b/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.cpp @@ -1,167 +1,167 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -Axis2QpidReceiver::Axis2QpidReceiver( - const axutil_env_t* env, - axis2_conf_ctx_t* conf_ctx) -{ - this->env = env; - this->conf_ctx = conf_ctx; -} - -Axis2QpidReceiver::~Axis2QpidReceiver( - void) -{ -} - -bool -Axis2QpidReceiver::start( - void) -{ - if(!conf_ctx) - return false; - - Connection connection; - axis2_bool_t serverSide = AXIS2_TRUE; - - serverSide = axis2_amqp_util_conf_ctx_get_server_side(conf_ctx, env); - - while(true) - { - try - { - std::list queueNameList; - string qpidBrokerIP = axis2_amqp_util_conf_ctx_get_qpid_broker_ip(conf_ctx, env); - int qpidBrokerPort = axis2_amqp_util_conf_ctx_get_qpid_broker_port(conf_ctx, env); - - /* Check if Client Side and Resolve Dynamic Queue Name */ - if(serverSide == AXIS2_TRUE) /* Server side */ - { - std::cout << "Connecting to Qpid Broker on " << qpidBrokerIP << ":" - << qpidBrokerPort << " ... "; - } - - /* Create Connection to Qpid Broker */ - connection.open(qpidBrokerIP, qpidBrokerPort); - - if(serverSide == AXIS2_TRUE) /* Server side */ - { - /* Create queue for each service. Queue name is equal to service name */ - axis2_conf_t* conf = axis2_conf_ctx_get_conf(conf_ctx, env); - if(!conf) - return false; - - axutil_hash_t* serviceMap = axis2_conf_get_all_svcs(conf, env); - if(!serviceMap) - return false; - - axutil_hash_index_t* serviceHI = NULL; - void* serviceValue = NULL; - - for(serviceHI = axutil_hash_first(serviceMap, env); serviceHI; serviceHI - = axutil_hash_next(env, serviceHI)) - { - axutil_hash_this(serviceHI, NULL, NULL, &serviceValue); - - axis2_svc_t* service = (axis2_svc_t*)serviceValue; - if(!service) - return false; - - axis2_char_t* serviceName = axutil_qname_get_localpart(axis2_svc_get_qname( - service, env), env); - if(!serviceName) - return false; - - queueNameList.push_back(serviceName); - } - - std::cout << "CONNECTED" << std::endl; - } - else /* Client side separate listener in dual-channel case */ - { - string queueName = axis2_amqp_util_conf_ctx_get_dual_channel_queue_name(conf_ctx, - env); - - queueNameList.push_back(queueName); - } - - /* Create new session */ - Session session = connection.newSession(); - - /* Create Subscription manager */ - SubscriptionManager subscriptionManager(session); - - Axis2QpidReceiverListener qpidReceiverListener(env, conf_ctx); - - /* Subscribe to queues */ - while(!queueNameList.empty()) - { - string queueName = queueNameList.front(); - - session.queueDeclare(arg::queue = queueName, arg::autoDelete = true); - session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT, arg::queue - = queueName, arg::bindingKey = queueName); - - subscriptionManager.subscribe(qpidReceiverListener, queueName); - - queueNameList.pop_front(); - } - - /* Listen and Wait */ - if(serverSide == AXIS2_TRUE) /* Server side */ - { - std::cout << "Started Axis2 AMQP Server ..." << std::endl; - } - - subscriptionManager.run(); - - return true; - } - catch(const std::exception& e) - { - connection.close(); - - if(serverSide == AXIS2_TRUE) /* Server side */ - { - std::cout << "FAILED" << std::endl; - } - - sleep(5); - } - } - - connection.close(); - - return false; -} - -bool -Axis2QpidReceiver::shutdown( - void) -{ - return true; -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +Axis2QpidReceiver::Axis2QpidReceiver( + const axutil_env_t* env, + axis2_conf_ctx_t* conf_ctx) +{ + this->env = env; + this->conf_ctx = conf_ctx; +} + +Axis2QpidReceiver::~Axis2QpidReceiver( + void) +{ +} + +bool +Axis2QpidReceiver::start( + void) +{ + if(!conf_ctx) + return false; + + Connection connection; + axis2_bool_t serverSide = AXIS2_TRUE; + + serverSide = axis2_amqp_util_conf_ctx_get_server_side(conf_ctx, env); + + while(true) + { + try + { + std::list queueNameList; + string qpidBrokerIP = axis2_amqp_util_conf_ctx_get_qpid_broker_ip(conf_ctx, env); + int qpidBrokerPort = axis2_amqp_util_conf_ctx_get_qpid_broker_port(conf_ctx, env); + + /* Check if Client Side and Resolve Dynamic Queue Name */ + if(serverSide == AXIS2_TRUE) /* Server side */ + { + std::cout << "Connecting to Qpid Broker on " << qpidBrokerIP << ":" + << qpidBrokerPort << " ... "; + } + + /* Create Connection to Qpid Broker */ + connection.open(qpidBrokerIP, qpidBrokerPort); + + if(serverSide == AXIS2_TRUE) /* Server side */ + { + /* Create queue for each service. Queue name is equal to service name */ + axis2_conf_t* conf = axis2_conf_ctx_get_conf(conf_ctx, env); + if(!conf) + return false; + + axutil_hash_t* serviceMap = axis2_conf_get_all_svcs(conf, env); + if(!serviceMap) + return false; + + axutil_hash_index_t* serviceHI = NULL; + void* serviceValue = NULL; + + for(serviceHI = axutil_hash_first(serviceMap, env); serviceHI; serviceHI + = axutil_hash_next(env, serviceHI)) + { + axutil_hash_this(serviceHI, NULL, NULL, &serviceValue); + + axis2_svc_t* service = (axis2_svc_t*)serviceValue; + if(!service) + return false; + + axis2_char_t* serviceName = axutil_qname_get_localpart(axis2_svc_get_qname( + service, env), env); + if(!serviceName) + return false; + + queueNameList.push_back(serviceName); + } + + std::cout << "CONNECTED" << std::endl; + } + else /* Client side separate listener in dual-channel case */ + { + string queueName = axis2_amqp_util_conf_ctx_get_dual_channel_queue_name(conf_ctx, + env); + + queueNameList.push_back(queueName); + } + + /* Create new session */ + Session session = connection.newSession(); + + /* Create Subscription manager */ + SubscriptionManager subscriptionManager(session); + + Axis2QpidReceiverListener qpidReceiverListener(env, conf_ctx); + + /* Subscribe to queues */ + while(!queueNameList.empty()) + { + string queueName = queueNameList.front(); + + session.queueDeclare(arg::queue = queueName, arg::autoDelete = true); + session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT, arg::queue + = queueName, arg::bindingKey = queueName); + + subscriptionManager.subscribe(qpidReceiverListener, queueName); + + queueNameList.pop_front(); + } + + /* Listen and Wait */ + if(serverSide == AXIS2_TRUE) /* Server side */ + { + std::cout << "Started Axis2 AMQP Server ..." << std::endl; + } + + subscriptionManager.run(); + + return true; + } + catch(const std::exception& e) + { + connection.close(); + + if(serverSide == AXIS2_TRUE) /* Server side */ + { + std::cout << "FAILED" << std::endl; + } + + sleep(5); + } + } + + connection.close(); + + return false; +} + +bool +Axis2QpidReceiver::shutdown( + void) +{ + return true; +} -- cgit v1.1-32-gdbae