summaryrefslogtreecommitdiffstats
path: root/src/core/transport/amqp/receiver
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/transport/amqp/receiver')
-rw-r--r--src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.cpp334
-rw-r--r--src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.h30
-rw-r--r--src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_interface.cpp204
-rw-r--r--src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.cpp246
-rw-r--r--src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.h30
5 files changed, 422 insertions, 422 deletions
diff --git a/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.cpp b/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.cpp
index 18d3f01..7ccedd8 100644
--- a/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.cpp
+++ b/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.cpp
@@ -1,167 +1,167 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <qpid/client/Connection.h>
-#include <qpid/client/Session.h>
-#include <qpid/client/SubscriptionManager.h>
-#include <axis2_amqp_request_processor.h>
-#include <axis2_amqp_defines.h>
-#include <axis2_amqp_util.h>
-#include <axis2_qpid_receiver_listener.h>
-#include <axis2_qpid_receiver.h>
-#include <list>
-
-Axis2QpidReceiver::Axis2QpidReceiver(
- const axutil_env_t* env,
- axis2_conf_ctx_t* conf_ctx)
-{
- this->env = env;
- this->conf_ctx = conf_ctx;
-}
-
-Axis2QpidReceiver::~Axis2QpidReceiver(
- void)
-{
-}
-
-bool
-Axis2QpidReceiver::start(
- void)
-{
- if(!conf_ctx)
- return false;
-
- Connection connection;
- axis2_bool_t serverSide = AXIS2_TRUE;
-
- serverSide = axis2_amqp_util_conf_ctx_get_server_side(conf_ctx, env);
-
- while(true)
- {
- try
- {
- std::list<string> queueNameList;
- string qpidBrokerIP = axis2_amqp_util_conf_ctx_get_qpid_broker_ip(conf_ctx, env);
- int qpidBrokerPort = axis2_amqp_util_conf_ctx_get_qpid_broker_port(conf_ctx, env);
-
- /* Check if Client Side and Resolve Dynamic Queue Name */
- if(serverSide == AXIS2_TRUE) /* Server side */
- {
- std::cout << "Connecting to Qpid Broker on " << qpidBrokerIP << ":"
- << qpidBrokerPort << " ... ";
- }
-
- /* Create Connection to Qpid Broker */
- connection.open(qpidBrokerIP, qpidBrokerPort);
-
- if(serverSide == AXIS2_TRUE) /* Server side */
- {
- /* Create queue for each service. Queue name is equal to service name */
- axis2_conf_t* conf = axis2_conf_ctx_get_conf(conf_ctx, env);
- if(!conf)
- return false;
-
- axutil_hash_t* serviceMap = axis2_conf_get_all_svcs(conf, env);
- if(!serviceMap)
- return false;
-
- axutil_hash_index_t* serviceHI = NULL;
- void* serviceValue = NULL;
-
- for(serviceHI = axutil_hash_first(serviceMap, env); serviceHI; serviceHI
- = axutil_hash_next(env, serviceHI))
- {
- axutil_hash_this(serviceHI, NULL, NULL, &serviceValue);
-
- axis2_svc_t* service = (axis2_svc_t*)serviceValue;
- if(!service)
- return false;
-
- axis2_char_t* serviceName = axutil_qname_get_localpart(axis2_svc_get_qname(
- service, env), env);
- if(!serviceName)
- return false;
-
- queueNameList.push_back(serviceName);
- }
-
- std::cout << "CONNECTED" << std::endl;
- }
- else /* Client side separate listener in dual-channel case */
- {
- string queueName = axis2_amqp_util_conf_ctx_get_dual_channel_queue_name(conf_ctx,
- env);
-
- queueNameList.push_back(queueName);
- }
-
- /* Create new session */
- Session session = connection.newSession();
-
- /* Create Subscription manager */
- SubscriptionManager subscriptionManager(session);
-
- Axis2QpidReceiverListener qpidReceiverListener(env, conf_ctx);
-
- /* Subscribe to queues */
- while(!queueNameList.empty())
- {
- string queueName = queueNameList.front();
-
- session.queueDeclare(arg::queue = queueName, arg::autoDelete = true);
- session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT, arg::queue
- = queueName, arg::bindingKey = queueName);
-
- subscriptionManager.subscribe(qpidReceiverListener, queueName);
-
- queueNameList.pop_front();
- }
-
- /* Listen and Wait */
- if(serverSide == AXIS2_TRUE) /* Server side */
- {
- std::cout << "Started Axis2 AMQP Server ..." << std::endl;
- }
-
- subscriptionManager.run();
-
- return true;
- }
- catch(const std::exception& e)
- {
- connection.close();
-
- if(serverSide == AXIS2_TRUE) /* Server side */
- {
- std::cout << "FAILED" << std::endl;
- }
-
- sleep(5);
- }
- }
-
- connection.close();
-
- return false;
-}
-
-bool
-Axis2QpidReceiver::shutdown(
- void)
-{
- return true;
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <qpid/client/Connection.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/SubscriptionManager.h>
+#include <axis2_amqp_request_processor.h>
+#include <axis2_amqp_defines.h>
+#include <axis2_amqp_util.h>
+#include <axis2_qpid_receiver_listener.h>
+#include <axis2_qpid_receiver.h>
+#include <list>
+
+Axis2QpidReceiver::Axis2QpidReceiver(
+ const axutil_env_t* env,
+ axis2_conf_ctx_t* conf_ctx)
+{
+ this->env = env;
+ this->conf_ctx = conf_ctx;
+}
+
+Axis2QpidReceiver::~Axis2QpidReceiver(
+ void)
+{
+}
+
+bool
+Axis2QpidReceiver::start(
+ void)
+{
+ if(!conf_ctx)
+ return false;
+
+ Connection connection;
+ axis2_bool_t serverSide = AXIS2_TRUE;
+
+ serverSide = axis2_amqp_util_conf_ctx_get_server_side(conf_ctx, env);
+
+ while(true)
+ {
+ try
+ {
+ std::list<string> queueNameList;
+ string qpidBrokerIP = axis2_amqp_util_conf_ctx_get_qpid_broker_ip(conf_ctx, env);
+ int qpidBrokerPort = axis2_amqp_util_conf_ctx_get_qpid_broker_port(conf_ctx, env);
+
+ /* Check if Client Side and Resolve Dynamic Queue Name */
+ if(serverSide == AXIS2_TRUE) /* Server side */
+ {
+ std::cout << "Connecting to Qpid Broker on " << qpidBrokerIP << ":"
+ << qpidBrokerPort << " ... ";
+ }
+
+ /* Create Connection to Qpid Broker */
+ connection.open(qpidBrokerIP, qpidBrokerPort);
+
+ if(serverSide == AXIS2_TRUE) /* Server side */
+ {
+ /* Create queue for each service. Queue name is equal to service name */
+ axis2_conf_t* conf = axis2_conf_ctx_get_conf(conf_ctx, env);
+ if(!conf)
+ return false;
+
+ axutil_hash_t* serviceMap = axis2_conf_get_all_svcs(conf, env);
+ if(!serviceMap)
+ return false;
+
+ axutil_hash_index_t* serviceHI = NULL;
+ void* serviceValue = NULL;
+
+ for(serviceHI = axutil_hash_first(serviceMap, env); serviceHI; serviceHI
+ = axutil_hash_next(env, serviceHI))
+ {
+ axutil_hash_this(serviceHI, NULL, NULL, &serviceValue);
+
+ axis2_svc_t* service = (axis2_svc_t*)serviceValue;
+ if(!service)
+ return false;
+
+ axis2_char_t* serviceName = axutil_qname_get_localpart(axis2_svc_get_qname(
+ service, env), env);
+ if(!serviceName)
+ return false;
+
+ queueNameList.push_back(serviceName);
+ }
+
+ std::cout << "CONNECTED" << std::endl;
+ }
+ else /* Client side separate listener in dual-channel case */
+ {
+ string queueName = axis2_amqp_util_conf_ctx_get_dual_channel_queue_name(conf_ctx,
+ env);
+
+ queueNameList.push_back(queueName);
+ }
+
+ /* Create new session */
+ Session session = connection.newSession();
+
+ /* Create Subscription manager */
+ SubscriptionManager subscriptionManager(session);
+
+ Axis2QpidReceiverListener qpidReceiverListener(env, conf_ctx);
+
+ /* Subscribe to queues */
+ while(!queueNameList.empty())
+ {
+ string queueName = queueNameList.front();
+
+ session.queueDeclare(arg::queue = queueName, arg::autoDelete = true);
+ session.exchangeBind(arg::exchange = AXIS2_AMQP_EXCHANGE_DIRECT, arg::queue
+ = queueName, arg::bindingKey = queueName);
+
+ subscriptionManager.subscribe(qpidReceiverListener, queueName);
+
+ queueNameList.pop_front();
+ }
+
+ /* Listen and Wait */
+ if(serverSide == AXIS2_TRUE) /* Server side */
+ {
+ std::cout << "Started Axis2 AMQP Server ..." << std::endl;
+ }
+
+ subscriptionManager.run();
+
+ return true;
+ }
+ catch(const std::exception& e)
+ {
+ connection.close();
+
+ if(serverSide == AXIS2_TRUE) /* Server side */
+ {
+ std::cout << "FAILED" << std::endl;
+ }
+
+ sleep(5);
+ }
+ }
+
+ connection.close();
+
+ return false;
+}
+
+bool
+Axis2QpidReceiver::shutdown(
+ void)
+{
+ return true;
+}
diff --git a/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.h b/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.h
index 3fe9b44..a5352cd 100644
--- a/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.h
+++ b/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver.h
@@ -1,19 +1,19 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * tcp://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
#ifndef AXIS2_QPID_RECEIVER_H
#define AXIS2_QPID_RECEIVER_H
diff --git a/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_interface.cpp b/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_interface.cpp
index 79faa86..607e526 100644
--- a/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_interface.cpp
+++ b/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_interface.cpp
@@ -1,102 +1,102 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include <axis2_qpid_receiver.h>
-#include <axis2_qpid_receiver_interface.h>
-
-#ifdef __cplusplus
-extern "C"
-{
-#endif
-
-AXIS2_EXTERN axis2_qpid_receiver_resource_pack_t* AXIS2_CALL
-axis2_qpid_receiver_create(
- const axutil_env_t* env,
- axis2_conf_ctx_t* conf_ctx)
-{
- AXIS2_ENV_CHECK(env, NULL);
-
- axis2_qpid_receiver_resource_pack_t* resource_pack = NULL;
-
- resource_pack = (axis2_qpid_receiver_resource_pack_t*)AXIS2_MALLOC
- (env->allocator, sizeof(axis2_qpid_receiver_resource_pack_t));
-
- if (!resource_pack)
- {
- AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
- return NULL;
- }
-
- /* Create Qpid Receiver */
- Axis2QpidReceiver* qpid_receiver = new Axis2QpidReceiver(env, conf_ctx);
-
- resource_pack->qpid_receiver = qpid_receiver;
-
- return resource_pack;
-}
-
-
-AXIS2_EXTERN axis2_status_t AXIS2_CALL
-axis2_qpid_receiver_start(
- axis2_qpid_receiver_resource_pack_t* receiver_resource_pack,
- const axutil_env_t* env)
-{
- AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
-
- axis2_status_t status = AXIS2_FAILURE;
-
- /* Start Qpid Receiver */
- Axis2QpidReceiver* qpid_receiver = (Axis2QpidReceiver*)receiver_resource_pack->qpid_receiver;
-
- if ((qpid_receiver) && (qpid_receiver->start()))
- {
- status = AXIS2_SUCCESS;
- }
-
- return status;
-}
-
-
-AXIS2_EXTERN axis2_bool_t AXIS2_CALL
-axis2_qpid_receiver_is_running(
- axis2_qpid_receiver_resource_pack_t* receiver_resource_pack,
- const axutil_env_t* env)
-{
- return AXIS2_TRUE;
-}
-
-
-AXIS2_EXTERN void AXIS2_CALL
-axis2_qpid_receiver_free(
- axis2_qpid_receiver_resource_pack_t* receiver_resource_pack,
- const axutil_env_t* env)
-{
- AXIS2_ENV_CHECK(env, void);
-
- if (receiver_resource_pack)
- {
- Axis2QpidReceiver* qpid_receiver = (Axis2QpidReceiver*)receiver_resource_pack->qpid_receiver;
- if (qpid_receiver)
- delete qpid_receiver;
-
- AXIS2_FREE(env->allocator, receiver_resource_pack);
- }
-}
-
-#ifdef __cplusplus
-}
-#endif
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include <axis2_qpid_receiver.h>
+#include <axis2_qpid_receiver_interface.h>
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+AXIS2_EXTERN axis2_qpid_receiver_resource_pack_t* AXIS2_CALL
+axis2_qpid_receiver_create(
+ const axutil_env_t* env,
+ axis2_conf_ctx_t* conf_ctx)
+{
+ AXIS2_ENV_CHECK(env, NULL);
+
+ axis2_qpid_receiver_resource_pack_t* resource_pack = NULL;
+
+ resource_pack = (axis2_qpid_receiver_resource_pack_t*)AXIS2_MALLOC
+ (env->allocator, sizeof(axis2_qpid_receiver_resource_pack_t));
+
+ if (!resource_pack)
+ {
+ AXIS2_ERROR_SET(env->error, AXIS2_ERROR_NO_MEMORY, AXIS2_FAILURE);
+ return NULL;
+ }
+
+ /* Create Qpid Receiver */
+ Axis2QpidReceiver* qpid_receiver = new Axis2QpidReceiver(env, conf_ctx);
+
+ resource_pack->qpid_receiver = qpid_receiver;
+
+ return resource_pack;
+}
+
+
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axis2_qpid_receiver_start(
+ axis2_qpid_receiver_resource_pack_t* receiver_resource_pack,
+ const axutil_env_t* env)
+{
+ AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
+
+ axis2_status_t status = AXIS2_FAILURE;
+
+ /* Start Qpid Receiver */
+ Axis2QpidReceiver* qpid_receiver = (Axis2QpidReceiver*)receiver_resource_pack->qpid_receiver;
+
+ if ((qpid_receiver) && (qpid_receiver->start()))
+ {
+ status = AXIS2_SUCCESS;
+ }
+
+ return status;
+}
+
+
+AXIS2_EXTERN axis2_bool_t AXIS2_CALL
+axis2_qpid_receiver_is_running(
+ axis2_qpid_receiver_resource_pack_t* receiver_resource_pack,
+ const axutil_env_t* env)
+{
+ return AXIS2_TRUE;
+}
+
+
+AXIS2_EXTERN void AXIS2_CALL
+axis2_qpid_receiver_free(
+ axis2_qpid_receiver_resource_pack_t* receiver_resource_pack,
+ const axutil_env_t* env)
+{
+ AXIS2_ENV_CHECK(env, void);
+
+ if (receiver_resource_pack)
+ {
+ Axis2QpidReceiver* qpid_receiver = (Axis2QpidReceiver*)receiver_resource_pack->qpid_receiver;
+ if (qpid_receiver)
+ delete qpid_receiver;
+
+ AXIS2_FREE(env->allocator, receiver_resource_pack);
+ }
+}
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.cpp b/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.cpp
index 63717ca..6511208 100644
--- a/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.cpp
+++ b/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.cpp
@@ -1,123 +1,123 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * tcp://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <axis2_amqp_request_processor.h>
-#include <axis2_amqp_defines.h>
-#include <axis2_amqp_util.h>
-#include <axis2_qpid_receiver_listener.h>
-#include <axis2_qpid_receiver.h>
-#include <string>
-
-Axis2QpidReceiverListener::Axis2QpidReceiverListener(
- const axutil_env_t* env,
- axis2_conf_ctx_t* conf_ctx)
-{
- this->env = env;
- this->conf_ctx = conf_ctx;
-}
-
-Axis2QpidReceiverListener::~Axis2QpidReceiverListener(
- void)
-{
-}
-
-void
-Axis2QpidReceiverListener::received(
- Message& message)
-{
- AXIS2_ENV_CHECK(env, void);
-
- axis2_amqp_request_processor_resource_pack_t* request_data = NULL;
-#ifdef AXIS2_SVR_MULTI_THREADED
- axutil_thread_t* worker_thread = NULL;
-#endif
-
- request_data = (axis2_amqp_request_processor_resource_pack_t*)AXIS2_MALLOC(env->allocator,
- sizeof(axis2_amqp_request_processor_resource_pack_t));
-
- if(!request_data)
- {
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Memory Allocation Error");
- return;
- }
-
- request_data->env = (axutil_env_t*)env;
- request_data->conf_ctx = conf_ctx;
-
- /* Create a Local Copy of Request Content */
- std::string message_data = message.getData();
- axis2_char_t* request_content =
- (axis2_char_t*)AXIS2_MALLOC(env->allocator, message_data.size());
- memcpy(request_content, message_data.c_str(), message_data.size());
-
- request_data->request_content = request_content;
- request_data->content_length = message_data.size();
-
- /* Set ReplyTo */
- request_data->reply_to = NULL;
- if(message.getMessageProperties().hasReplyTo())
- {
- /* Create a Local Copy of ReplyTo */
- std::string reply_to_tmp = message.getMessageProperties().getReplyTo().getRoutingKey();
- axis2_char_t* reply_to = (axis2_char_t*)AXIS2_MALLOC(env->allocator, reply_to_tmp.size()
- + 1);
- strcpy(reply_to, reply_to_tmp.c_str());
-
- request_data->reply_to = reply_to;
- }
-
- /* Copy AMQP headers */
- /* Content-Type */
- request_data->content_type = NULL;
- std::string content_type_tmp = message.getHeaders().getAsString(AXIS2_AMQP_HEADER_CONTENT_TYPE);
- if(!content_type_tmp.empty())
- {
- axis2_char_t* content_type = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
- content_type_tmp.size() + 1);
- strcpy(content_type, content_type_tmp.c_str());
-
- request_data->content_type = content_type;
- }
-
- /* SOAPAction */
- request_data->soap_action = NULL;
- std::string soap_action_tmp = message.getHeaders().getAsString(AXIS2_AMQP_HEADER_SOAP_ACTION);
- if(!soap_action_tmp.empty())
- {
- axis2_char_t* soap_action = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
- soap_action_tmp.size() + 1);
- strcpy(soap_action, soap_action_tmp.c_str());
-
- request_data->soap_action = soap_action;
- }
-
-#ifdef AXIS2_SVR_MULTI_THREADED
- worker_thread = axutil_thread_pool_get_thread(env->thread_pool,
- axis2_amqp_request_processor_thread_function,
- (void*)request_data);
-
- if (!worker_thread)
- {
- AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Failed to Create Thread");
- return;
- }
-
- axutil_thread_pool_thread_detach(env->thread_pool, worker_thread);
-#else
- axis2_amqp_request_processor_thread_function(NULL, (void*)request_data);
-#endif
-}
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include <axis2_amqp_request_processor.h>
+#include <axis2_amqp_defines.h>
+#include <axis2_amqp_util.h>
+#include <axis2_qpid_receiver_listener.h>
+#include <axis2_qpid_receiver.h>
+#include <string>
+
+Axis2QpidReceiverListener::Axis2QpidReceiverListener(
+ const axutil_env_t* env,
+ axis2_conf_ctx_t* conf_ctx)
+{
+ this->env = env;
+ this->conf_ctx = conf_ctx;
+}
+
+Axis2QpidReceiverListener::~Axis2QpidReceiverListener(
+ void)
+{
+}
+
+void
+Axis2QpidReceiverListener::received(
+ Message& message)
+{
+ AXIS2_ENV_CHECK(env, void);
+
+ axis2_amqp_request_processor_resource_pack_t* request_data = NULL;
+#ifdef AXIS2_SVR_MULTI_THREADED
+ axutil_thread_t* worker_thread = NULL;
+#endif
+
+ request_data = (axis2_amqp_request_processor_resource_pack_t*)AXIS2_MALLOC(env->allocator,
+ sizeof(axis2_amqp_request_processor_resource_pack_t));
+
+ if(!request_data)
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Memory Allocation Error");
+ return;
+ }
+
+ request_data->env = (axutil_env_t*)env;
+ request_data->conf_ctx = conf_ctx;
+
+ /* Create a Local Copy of Request Content */
+ std::string message_data = message.getData();
+ axis2_char_t* request_content =
+ (axis2_char_t*)AXIS2_MALLOC(env->allocator, message_data.size());
+ memcpy(request_content, message_data.c_str(), message_data.size());
+
+ request_data->request_content = request_content;
+ request_data->content_length = message_data.size();
+
+ /* Set ReplyTo */
+ request_data->reply_to = NULL;
+ if(message.getMessageProperties().hasReplyTo())
+ {
+ /* Create a Local Copy of ReplyTo */
+ std::string reply_to_tmp = message.getMessageProperties().getReplyTo().getRoutingKey();
+ axis2_char_t* reply_to = (axis2_char_t*)AXIS2_MALLOC(env->allocator, reply_to_tmp.size()
+ + 1);
+ strcpy(reply_to, reply_to_tmp.c_str());
+
+ request_data->reply_to = reply_to;
+ }
+
+ /* Copy AMQP headers */
+ /* Content-Type */
+ request_data->content_type = NULL;
+ std::string content_type_tmp = message.getHeaders().getAsString(AXIS2_AMQP_HEADER_CONTENT_TYPE);
+ if(!content_type_tmp.empty())
+ {
+ axis2_char_t* content_type = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
+ content_type_tmp.size() + 1);
+ strcpy(content_type, content_type_tmp.c_str());
+
+ request_data->content_type = content_type;
+ }
+
+ /* SOAPAction */
+ request_data->soap_action = NULL;
+ std::string soap_action_tmp = message.getHeaders().getAsString(AXIS2_AMQP_HEADER_SOAP_ACTION);
+ if(!soap_action_tmp.empty())
+ {
+ axis2_char_t* soap_action = (axis2_char_t*)AXIS2_MALLOC(env->allocator,
+ soap_action_tmp.size() + 1);
+ strcpy(soap_action, soap_action_tmp.c_str());
+
+ request_data->soap_action = soap_action;
+ }
+
+#ifdef AXIS2_SVR_MULTI_THREADED
+ worker_thread = axutil_thread_pool_get_thread(env->thread_pool,
+ axis2_amqp_request_processor_thread_function,
+ (void*)request_data);
+
+ if (!worker_thread)
+ {
+ AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Failed to Create Thread");
+ return;
+ }
+
+ axutil_thread_pool_thread_detach(env->thread_pool, worker_thread);
+#else
+ axis2_amqp_request_processor_thread_function(NULL, (void*)request_data);
+#endif
+}
diff --git a/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.h b/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.h
index 5d3f615..9717dc8 100644
--- a/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.h
+++ b/src/core/transport/amqp/receiver/qpid_receiver/axis2_qpid_receiver_listener.h
@@ -1,19 +1,19 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * tcp://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
#ifndef AXIS2_QPID_RECEIVER_LISTENER_H
#define AXIS2_QPID_RECEIVER_LISTENER_H