summaryrefslogtreecommitdiffstats
path: root/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.cpp')
-rw-r--r--src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.cpp334
1 files changed, 167 insertions, 167 deletions
diff --git a/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.cpp b/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.cpp
index 18d3f01..7ccedd8 100644
--- a/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.cpp
+++ b/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.cpp
@@ -1,167 +1,167 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <qpid/client/Connection.h>
-#include <qpid/client/Session.h>
-#include <qpid/client/SubscriptionManager.h>
-#include <axis2_amqp_request_processor.h>
-#include <axis2_amqp_defines.h>
-#include <axis2_amqp_util.h>
-#include <axis2_qpid_receiver_listener.h>
-#include <axis2_qpid_receiver.h>
-#include <list>
-
-Axis2QpidReceiver::Axis2QpidReceiver(
- const axutil_env_t* env,
- axis2_conf_ctx_t* conf_ctx)
-{
- this->env = env;
- this->conf_ctx = conf_ctx;
-}
-
-Axis2QpidReceiver::~Axis2QpidReceiver(
- void)
-{
-}
-
-bool
-Axis2QpidReceiver::start(
- void)
-{
- if(!conf_ctx)
- return false;
-
- Connection connection;
- axis2_bool_t serverSide = AXIS2_TRUE;
-
- serverSide = axis2_amqp_util_conf_ctx_get_server_side(conf_ctx, env);
-
- while(true)
- {
- try
- {
- std::list<string> queueNameList;
- string qpidBrokerIP = axis2_amqp_util_conf_ctx_get_qpid_broker_ip(conf_ctx, env);
- int qpidBrokerPort = axis2_amqp_util_conf_ctx_get_qpid_broker_port(conf_ctx, env);
-
- /* Check if Client Side and Resolve Dynamic Queue Name */
- if(serverSide == AXIS2_TRUE) /* Server side */
- {
- std::cout << "Connecting to Qpid Broker on " << qpidBrokerIP << ":"
- << qpidBrokerPort << " ... ";
- }
-
- /* Create Connection to Qpid Broker */
- connection.open(qpidBrokerIP, qpidBrokerPort);
-
- if(serverSide == AXIS2_TRUE) /* Server side */
- {
- /* Create queue for each service. Queue name is equal to service name */
- axis2_conf_t* conf = axis2_conf_ctx_get_conf(conf_ctx, env);
- if(!conf)
- return false;
-
- axutil_hash_t* serviceMap = axis2_conf_get_all_svcs(conf, env);
- if(!serviceMap)
- return false;
-
- axutil_hash_index_t* serviceHI = NULL;
- void* serviceValue = NULL;
-
- for(serviceHI = axutil_hash_first(serviceMap, env); serviceHI; serviceHI
- = axutil_hash_next(env, serviceHI))
- {
- axutil_hash_this(serviceHI, NULL, NULL, &serviceValue);
-
- axis2_svc_t* service = (axis2_svc_t*)serviceValue;
- if(!service)
- return false;
-
- axis2_char_t* serviceName = axutil_qname_get_localpart(axis2_svc_get_qname(
- service, env), env);
- if(!serviceName)
- return false;
-
- queueNameList.push_back(serviceName);
- }
-
- std::cout << "CONNECTED" << std::endl;
- }
- else /* Client side separate listener in dual-channel case */
- {
- string queueName = axis2_amqp_util_conf_ctx_get_dual_channel_queue_name(conf_ctx,
- env);
-
- queueNameList.push_back(queueName);
- }
-
- /* Create new session */
- Session session = connection.newSession();
-
- /* Create Subscription manager */
- SubscriptionManager subscriptionManager(session);
-
- Axis2QpidReceiverListener qpidReceiverListener(env, conf_ctx);
-
- /* Subscribe to queues */
- while(!queueNameList.empty())
- {
- string queueName = queueNameList.front();
-
- session.queueDeclare(arg::queue = queueName, arg::autoDelete = true);
- session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT, arg::queue
- = queueName, arg::bindingKey = queueName);
-
- subscriptionManager.subscribe(qpidReceiverListener, queueName);
-
- queueNameList.pop_front();
- }
-
- /* Listen and Wait */
- if(serverSide == AXIS2_TRUE) /* Server side */
- {
- std::cout << "Started Axis2 AMQP Server ..." << std::endl;
- }
-
- subscriptionManager.run();
-
- return true;
- }
- catch(const std::exception& e)
- {
- connection.close();
-
- if(serverSide == AXIS2_TRUE) /* Server side */
- {
- std::cout << "FAILED" << std::endl;
- }
-
- sleep(5);
- }
- }
-
- connection.close();
-
- return false;
-}
-
-bool
-Axis2QpidReceiver::shutdown(
- void)
-{
- return true;
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <qpid/client/Connection.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/SubscriptionManager.h>
+#include <axis2_amqp_request_processor.h>
+#include <axis2_amqp_defines.h>
+#include <axis2_amqp_util.h>
+#include <axis2_qpid_receiver_listener.h>
+#include <axis2_qpid_receiver.h>
+#include <list>
+
+Axis2QpidReceiver::Axis2QpidReceiver(
+ const axutil_env_t* env,
+ axis2_conf_ctx_t* conf_ctx)
+{
+ this->env = env;
+ this->conf_ctx = conf_ctx;
+}
+
+Axis2QpidReceiver::~Axis2QpidReceiver(
+ void)
+{
+}
+
+bool
+Axis2QpidReceiver::start(
+ void)
+{
+ if(!conf_ctx)
+ return false;
+
+ Connection connection;
+ axis2_bool_t serverSide = AXIS2_TRUE;
+
+ serverSide = axis2_amqp_util_conf_ctx_get_server_side(conf_ctx, env);
+
+ while(true)
+ {
+ try
+ {
+ std::list<string> queueNameList;
+ string qpidBrokerIP = axis2_amqp_util_conf_ctx_get_qpid_broker_ip(conf_ctx, env);
+ int qpidBrokerPort = axis2_amqp_util_conf_ctx_get_qpid_broker_port(conf_ctx, env);
+
+ /* Check if Client Side and Resolve Dynamic Queue Name */
+ if(serverSide == AXIS2_TRUE) /* Server side */
+ {
+ std::cout << "Connecting to Qpid Broker on " << qpidBrokerIP << ":"
+ << qpidBrokerPort << " ... ";
+ }
+
+ /* Create Connection to Qpid Broker */
+ connection.open(qpidBrokerIP, qpidBrokerPort);
+
+ if(serverSide == AXIS2_TRUE) /* Server side */
+ {
+ /* Create queue for each service. Queue name is equal to service name */
+ axis2_conf_t* conf = axis2_conf_ctx_get_conf(conf_ctx, env);
+ if(!conf)
+ return false;
+
+ axutil_hash_t* serviceMap = axis2_conf_get_all_svcs(conf, env);
+ if(!serviceMap)
+ return false;
+
+ axutil_hash_index_t* serviceHI = NULL;
+ void* serviceValue = NULL;
+
+ for(serviceHI = axutil_hash_first(serviceMap, env); serviceHI; serviceHI
+ = axutil_hash_next(env, serviceHI))
+ {
+ axutil_hash_this(serviceHI, NULL, NULL, &serviceValue);
+
+ axis2_svc_t* service = (axis2_svc_t*)serviceValue;
+ if(!service)
+ return false;
+
+ axis2_char_t* serviceName = axutil_qname_get_localpart(axis2_svc_get_qname(
+ service, env), env);
+ if(!serviceName)
+ return false;
+
+ queueNameList.push_back(serviceName);
+ }
+
+ std::cout << "CONNECTED" << std::endl;
+ }
+ else /* Client side separate listener in dual-channel case */
+ {
+ string queueName = axis2_amqp_util_conf_ctx_get_dual_channel_queue_name(conf_ctx,
+ env);
+
+ queueNameList.push_back(queueName);
+ }
+
+ /* Create new session */
+ Session session = connection.newSession();
+
+ /* Create Subscription manager */
+ SubscriptionManager subscriptionManager(session);
+
+ Axis2QpidReceiverListener qpidReceiverListener(env, conf_ctx);
+
+ /* Subscribe to queues */
+ while(!queueNameList.empty())
+ {
+ string queueName = queueNameList.front();
+
+ session.queueDeclare(arg::queue = queueName, arg::autoDelete = true);
+ session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT, arg::queue
+ = queueName, arg::bindingKey = queueName);
+
+ subscriptionManager.subscribe(qpidReceiverListener, queueName);
+
+ queueNameList.pop_front();
+ }
+
+ /* Listen and Wait */
+ if(serverSide == AXIS2_TRUE) /* Server side */
+ {
+ std::cout << "Started Axis2 AMQP Server ..." << std::endl;
+ }
+
+ subscriptionManager.run();
+
+ return true;
+ }
+ catch(const std::exception& e)
+ {
+ connection.close();
+
+ if(serverSide == AXIS2_TRUE) /* Server side */
+ {
+ std::cout << "FAILED" << std::endl;
+ }
+
+ sleep(5);
+ }
+ }
+
+ connection.close();
+
+ return false;
+}
+
+bool
+Axis2QpidReceiver::shutdown(
+ void)
+{
+ return true;
+}