| 1 | /* |
| 2 | * librdkafka - Apache Kafka C library |
| 3 | * |
| 4 | * Copyright (c) 2012,2013 Magnus Edenhill |
| 5 | * All rights reserved. |
| 6 | * |
| 7 | * Redistribution and use in source and binary forms, with or without |
| 8 | * modification, are permitted provided that the following conditions are met: |
| 9 | * |
| 10 | * 1. Redistributions of source code must retain the above copyright notice, |
| 11 | * this list of conditions and the following disclaimer. |
| 12 | * 2. Redistributions in binary form must reproduce the above copyright notice, |
| 13 | * this list of conditions and the following disclaimer in the documentation |
| 14 | * and/or other materials provided with the distribution. |
| 15 | * |
| 16 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| 17 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| 18 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| 19 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
| 20 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| 21 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| 22 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| 23 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| 24 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| 25 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| 26 | * POSSIBILITY OF SUCH DAMAGE. |
| 27 | */ |
| 28 | |
| 29 | #ifndef _RDKAFKA_TOPIC_H_ |
| 30 | #define _RDKAFKA_TOPIC_H_ |
| 31 | |
| 32 | #include "rdlist.h" |
| 33 | |
| 34 | extern const char *rd_kafka_topic_state_names[]; |
| 35 | |
| 36 | |
| 37 | /* rd_kafka_itopic_t: internal representation of a topic */ |
| 38 | struct rd_kafka_itopic_s { |
| 39 | TAILQ_ENTRY(rd_kafka_itopic_s) rkt_link; |
| 40 | |
| 41 | rd_refcnt_t rkt_refcnt; |
| 42 | |
| 43 | rwlock_t rkt_lock; |
| 44 | rd_kafkap_str_t *rkt_topic; |
| 45 | |
| 46 | shptr_rd_kafka_toppar_t *rkt_ua; /* unassigned partition */ |
| 47 | shptr_rd_kafka_toppar_t **rkt_p; |
| 48 | int32_t rkt_partition_cnt; |
| 49 | |
| 50 | rd_list_t rkt_desp; /* Desired partitions |
| 51 | * that are not yet seen |
| 52 | * in the cluster. */ |
| 53 | |
| 54 | rd_ts_t rkt_ts_metadata; /* Timestamp of last metadata |
| 55 | * update for this topic. */ |
| 56 | |
| 57 | mtx_t rkt_app_lock; /* Protects rkt_app_* */ |
| 58 | rd_kafka_topic_t *rkt_app_rkt; /* A shared topic pointer |
| 59 | * to be used for callbacks |
| 60 | * to the application. */ |
| 61 | |
| 62 | int rkt_app_refcnt; /* Number of active rkt's new()ed |
| 63 | * by application. */ |
| 64 | |
| 65 | enum { |
| 66 | RD_KAFKA_TOPIC_S_UNKNOWN, /* No cluster information yet */ |
| 67 | RD_KAFKA_TOPIC_S_EXISTS, /* Topic exists in cluster */ |
| 68 | RD_KAFKA_TOPIC_S_NOTEXISTS, /* Topic is not known in cluster */ |
| 69 | } rkt_state; |
| 70 | |
| 71 | int rkt_flags; |
| 72 | #define RD_KAFKA_TOPIC_F_LEADER_UNAVAIL 0x1 /* Leader lost/unavailable |
| 73 | * for at least one partition. */ |
| 74 | |
| 75 | rd_kafka_t *rkt_rk; |
| 76 | |
| 77 | rd_avg_t rkt_avg_batchsize; /**< Average batch size */ |
| 78 | rd_avg_t rkt_avg_batchcnt; /**< Average batch message count */ |
| 79 | |
| 80 | shptr_rd_kafka_itopic_t *rkt_shptr_app; /* Application's topic_new() */ |
| 81 | |
| 82 | rd_kafka_topic_conf_t rkt_conf; |
| 83 | }; |
| 84 | |
| 85 | #define rd_kafka_topic_rdlock(rkt) rwlock_rdlock(&(rkt)->rkt_lock) |
| 86 | #define rd_kafka_topic_wrlock(rkt) rwlock_wrlock(&(rkt)->rkt_lock) |
| 87 | #define rd_kafka_topic_rdunlock(rkt) rwlock_rdunlock(&(rkt)->rkt_lock) |
| 88 | #define rd_kafka_topic_wrunlock(rkt) rwlock_wrunlock(&(rkt)->rkt_lock) |
| 89 | |
| 90 | |
| 91 | /* Converts a shptr..itopic_t to an internal itopic_t */ |
| 92 | #define rd_kafka_topic_s2i(s_rkt) rd_shared_ptr_obj(s_rkt) |
| 93 | |
| 94 | /* Converts an application topic_t (a shptr topic) to an internal itopic_t */ |
| 95 | #define rd_kafka_topic_a2i(app_rkt) \ |
| 96 | rd_kafka_topic_s2i((shptr_rd_kafka_itopic_t *)app_rkt) |
| 97 | |
| 98 | /* Converts a shptr..itopic_t to an app topic_t (they are the same thing) */ |
| 99 | #define rd_kafka_topic_s2a(s_rkt) ((rd_kafka_topic_t *)(s_rkt)) |
| 100 | |
| 101 | /* Converts an app topic_t to a shptr..itopic_t (they are the same thing) */ |
| 102 | #define rd_kafka_topic_a2s(app_rkt) ((shptr_rd_kafka_itopic_t *)(app_rkt)) |
| 103 | |
| 104 | |
| 105 | |
| 106 | |
| 107 | |
| 108 | /** |
| 109 | * Returns a shared pointer for the topic. |
| 110 | */ |
| 111 | #define rd_kafka_topic_keep(rkt) \ |
| 112 | rd_shared_ptr_get(rkt, &(rkt)->rkt_refcnt, shptr_rd_kafka_itopic_t) |
| 113 | |
| 114 | /* Same, but casts to an app topic_t */ |
| 115 | #define rd_kafka_topic_keep_a(rkt) \ |
| 116 | ((rd_kafka_topic_t *)rd_shared_ptr_get(rkt, &(rkt)->rkt_refcnt, \ |
| 117 | shptr_rd_kafka_itopic_t)) |
| 118 | |
| 119 | void rd_kafka_topic_destroy_final (rd_kafka_itopic_t *rkt); |
| 120 | |
| 121 | |
| 122 | /** |
| 123 | * Frees a shared pointer previously returned by ..topic_keep() |
| 124 | */ |
| 125 | static RD_INLINE RD_UNUSED void |
| 126 | rd_kafka_topic_destroy0 (shptr_rd_kafka_itopic_t *s_rkt) { |
| 127 | rd_shared_ptr_put(s_rkt, |
| 128 | &rd_kafka_topic_s2i(s_rkt)->rkt_refcnt, |
| 129 | rd_kafka_topic_destroy_final( |
| 130 | rd_kafka_topic_s2i(s_rkt))); |
| 131 | } |
| 132 | |
| 133 | |
| 134 | shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk, const char *topic, |
| 135 | rd_kafka_topic_conf_t *conf, |
| 136 | int *existing, int do_lock); |
| 137 | |
| 138 | shptr_rd_kafka_itopic_t *rd_kafka_topic_find_fl (const char *func, int line, |
| 139 | rd_kafka_t *rk, |
| 140 | const char *topic, |
| 141 | int do_lock); |
| 142 | shptr_rd_kafka_itopic_t *rd_kafka_topic_find0_fl (const char *func, int line, |
| 143 | rd_kafka_t *rk, |
| 144 | const rd_kafkap_str_t *topic); |
| 145 | #define rd_kafka_topic_find(rk,topic,do_lock) \ |
| 146 | rd_kafka_topic_find_fl(__FUNCTION__,__LINE__,rk,topic,do_lock) |
| 147 | #define rd_kafka_topic_find0(rk,topic) \ |
| 148 | rd_kafka_topic_find0_fl(__FUNCTION__,__LINE__,rk,topic) |
| 149 | int rd_kafka_topic_cmp_s_rkt (const void *_a, const void *_b); |
| 150 | |
| 151 | void rd_kafka_topic_partitions_remove (rd_kafka_itopic_t *rkt); |
| 152 | |
| 153 | void rd_kafka_topic_metadata_none (rd_kafka_itopic_t *rkt); |
| 154 | |
| 155 | int rd_kafka_topic_metadata_update2 (rd_kafka_broker_t *rkb, |
| 156 | const struct rd_kafka_metadata_topic *mdt); |
| 157 | |
| 158 | void rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now); |
| 159 | |
| 160 | |
| 161 | typedef struct rd_kafka_topic_info_s { |
| 162 | const char *topic; /**< Allocated along with struct */ |
| 163 | int partition_cnt; |
| 164 | } rd_kafka_topic_info_t; |
| 165 | |
| 166 | |
| 167 | int rd_kafka_topic_info_cmp (const void *_a, const void *_b); |
| 168 | rd_kafka_topic_info_t *rd_kafka_topic_info_new (const char *topic, |
| 169 | int partition_cnt); |
| 170 | void rd_kafka_topic_info_destroy (rd_kafka_topic_info_t *ti); |
| 171 | |
| 172 | int rd_kafka_topic_match (rd_kafka_t *rk, const char *pattern, |
| 173 | const char *topic); |
| 174 | |
| 175 | int rd_kafka_toppar_leader_update (rd_kafka_toppar_t *rktp, |
| 176 | int32_t leader_id, rd_kafka_broker_t *rkb); |
| 177 | |
| 178 | rd_kafka_resp_err_t |
| 179 | rd_kafka_topics_leader_query_sync (rd_kafka_t *rk, int all_topics, |
| 180 | const rd_list_t *topics, int timeout_ms); |
| 181 | void rd_kafka_topic_leader_query0 (rd_kafka_t *rk, rd_kafka_itopic_t *rkt, |
| 182 | int do_rk_lock); |
| 183 | #define rd_kafka_topic_leader_query(rk,rkt) \ |
| 184 | rd_kafka_topic_leader_query0(rk,rkt,1/*lock*/) |
| 185 | |
| 186 | #define rd_kafka_topic_fast_leader_query(rk) \ |
| 187 | rd_kafka_metadata_fast_leader_query(rk) |
| 188 | |
| 189 | void rd_kafka_local_topics_to_list (rd_kafka_t *rk, rd_list_t *topics); |
| 190 | |
| 191 | void rd_ut_kafka_topic_set_topic_exists (rd_kafka_itopic_t *rkt, |
| 192 | int partition_cnt, |
| 193 | int32_t leader_id); |
| 194 | |
| 195 | #endif /* _RDKAFKA_TOPIC_H_ */ |
| 196 | |