diff --git a/composer.json b/composer.json index d5e1992..dd827af 100644 --- a/composer.json +++ b/composer.json @@ -8,6 +8,11 @@ "email": "k.wnuk@ascetic.pl" } ], + "autoload": { + "psr-4": { + "Kafka\\": "src/" + } + }, "require": { "ext-rdkafka": "*" }, diff --git a/src/Constant/GlobalConfig.php b/src/Constant/GlobalConfig.php new file mode 100644 index 0000000..93a678a --- /dev/null +++ b/src/Constant/GlobalConfig.php @@ -0,0 +1,623 @@ +=0.10.0. If the request is not supported by (an older) broker the `broker.version.fallback` fallback is used. + * + * Type: boolean + * Range: true, false + * Default: true + */ + const API_VERSION_REQUEST = 'api.version.request'; + + /** + * Timeout for broker API version requests. + * + * Type: integer + * Range: 1 .. 300000 + * Default: 10000 + */ + const API_VERSION_REQUEST_TIMEOUT_MS = 'api.version.request.timeout.ms'; + + /** + * Dictates how long the `broker.version.fallback` fallback is used in the case the ApiVersionRequest fails. **NOTE**: The ApiVersionRequest is only issued when a new connection to the broker is made (such as after an upgrade). + * + * Type: integer + * Range: 0 .. 604800000 + * Default: 1200000 + */ + const API_VERSION_FALLBACK_MS = 'api.version.fallback.ms'; + + /** + * Older broker versions (<0.10.0) provides no way for a client to query for supported protocol features (ApiVersionRequest, see `api.version.request`) making it impossible for the client to know what features it may use. As a workaround a user may set this property to the expected broker version and the client will automatically adjust its feature set accordingly if the ApiVersionRequest fails (or is disabled). The fallback broker version will be used for `api.version.fallback.ms`. Valid values are: 0.9.0, 0.8.2, 0.8.1, 0.8.0. Any other value, such as 0.10.2.1, enables ApiVersionRequests. + * + * Type: string + * Default: 0.9.0 + */ + const BROKER_VERSION_FALLBACK = 'broker.version.fallback'; + + /** + * Protocol used to communicate with brokers. + * + * Type: enum value + * Range: plaintext, ssl, sasl_plaintext, sasl_ssl + * Default: plaintext + */ + const SECURITY_PROTOCOL = 'security.protocol'; + + /** + * A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. See manual page for `ciphers(1)` and `SSL_CTX_set_cipher_list(3). + * + * Type: string + */ + const SSL_CIPHER_SUITES = 'ssl.cipher.suites'; + + /** + * The supported-curves extension in the TLS ClientHello message specifies the curves (standard/named, or 'explicit' GF(2^k) or GF(p)) the client is willing to have the server use. See manual page for `SSL_CTX_set1_curves_list(3)`. OpenSSL >= 1.0.2 required. + * + * Type: string + */ + const SSL_CURVES_LIST = 'ssl.curves.list'; + + /** + * The client uses the TLS ClientHello signature_algorithms extension to indicate to the server which signature/hash algorithm pairs may be used in digital signatures. See manual page for `SSL_CTX_set1_sigalgs_list(3)`. OpenSSL >= 1.0.2 required. + * + * Type: string + */ + const SSL_SIGALGS_LIST = 'ssl.sigalgs.list'; + + /** + * Path to client's private key (PEM) used for authentication. + * + * Type: string + */ + const SSL_KEY_LOCATION = 'ssl.key.location'; + + /** + * Private key passphrase + * + * Type: string + */ + const SSL_KEY_PASSWORD = 'ssl.key.password'; + + /** + * Path to client's public key (PEM) used for authentication. + * + * Type: string + */ + const SSL_CERTIFICATE_LOCATION = 'ssl.certificate.location'; + + /** + * File or directory path to CA certificate(s) for verifying the broker's key. + * + * Type: string + */ + const SSL_CA_LOCATION = 'ssl.ca.location'; + + /** + * Path to CRL for verifying broker's certificate validity. + * + * Type: string + */ + const SSL_CRL_LOCATION = 'ssl.crl.location'; + + /** + * Path to client's keystore (PKCS#12) used for authentication. + * + * Type: string + */ + const SSL_KEYSTORE_LOCATION = 'ssl.keystore.location'; + + /** + * Client's keystore (PKCS#12) password. + * + * Type: string + */ + const SSL_KEYSTORE_PASSWORD = 'ssl.keystore.password'; + + /** + * SASL mechanism to use for authentication. Supported: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. **NOTE**: Despite the name only one mechanism must be configured. + * + * Type: string + * Default: GSSAPI + */ + const SASL_MECHANISMS = 'sasl.mechanisms'; + + /** + * Alias for `sasl.mechanisms` + */ + const SASL_MECHANISM = 'sasl.mechanism'; + + /** + * Default: kafka + */ + const SASL_KERBEROS_SERVICE_NAME = 'sasl.kerberos.service.name'; + + /** + * This client's Kerberos principal name. (Not supported on Windows, will use the logon user's principal). + * + * Type: string + * Default: kafkaclient + */ + const SASL_KERBEROS_PRINCIPAL = 'sasl.kerberos.principal'; + + /** + * Full kerberos kinit command string, %{config.prop.name} is replaced by corresponding config object value, %{broker.name} returns the broker's hostname. + * + * Type: string + * Default: kinit -S "%{sasl.kerberos.service.name}/%{broker.name}" -k -t "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal} + */ + const SASL_KERBEROS_KINIT_CMD = 'sasl.kerberos.kinit.cmd'; + + /** + * Path to Kerberos keytab file. Uses system default if not set.**NOTE**: This is not automatically used but must be added to the template in sasl.kerberos.kinit.cmd as ` ... -t %{sasl.kerberos.keytab}`. + * + * Type: string + */ + const SASL_KERBEROS_KEYTAB = 'sasl.kerberos.keytab'; + + /** + * Minimum time in milliseconds between key refresh attempts. + * + * Type: integer + * Range: 1 .. 86400000 + * Default: 60000 + */ + const SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN = 'sasl.kerberos.min.time.before.relogin'; + + /** + * SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms + * + * Type: string + */ + const SASL_USERNAME = 'sasl.username'; + + /** + * SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism + * + * Type: string + */ + const SASL_PASSWORD = 'sasl.password'; + + /** + * List of plugin libaries to load (; separated). The library search path is platform dependent (see dlopen(3) for Unix and LoadLibrary() for Windows). If no filename extension is specified the platform-specific extension (such as .dll or .so) will be appended automatically. + * + * Type: string + */ + const PLUGIN_LIBRARY_PATHS = 'plugin.library.paths'; + + /** + * Interceptors added through rd_kafka_conf_interceptor_add_..() and any configuration handled by interceptors. + * + * Type: + */ + const INTERCEPTORS = 'interceptors'; + + /** + * Client group id string. All clients sharing the same group.id belong to the same group. + * + * Type: string + */ + const GROUP_ID = 'group.id'; + + /** + * Name of partition assignment strategy to use when elected group leader assigns partitions to group members. + * + * Type: string + * Default: range,roundrobin + */ + const PARTITION_ASSIGNMENT_STRATEGY = 'partition.assignment.strategy'; + + /** + * Client group session and failure detection timeout. + * + * Type: integer + * Range: 1 .. 3600000 + * Default: 30000 + */ + const SESSION_TIMEOUT_MS = 'session.timeout.ms'; + + /** + * Group session keepalive heartbeat interval. + * + * Type: integer + * Range: 1 .. 3600000 + * Default: 1000 + */ + const HEARTBEAT_INTERVAL_MS = 'heartbeat.interval.ms'; + + /** + * Group protocol type + * + * Type: string + * Default: consumer + */ + const GROUP_PROTOCOL_TYPE = 'group.protocol.type'; + + /** + * How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment. + * + * Type: integer + * Range: 1 .. 3600000 + * Default: 600000 + */ + const COORDINATOR_QUERY_INTERVAL_MS = 'coordinator.query.interval.ms'; + + final protected function __construct() + { + } +} diff --git a/src/Constant/GlobalConsumer.php b/src/Constant/GlobalConsumer.php new file mode 100644 index 0000000..cd4a63e --- /dev/null +++ b/src/Constant/GlobalConsumer.php @@ -0,0 +1,154 @@ +err'. + * + * Type: enum value + * Range: smallest, earliest, beginning, largest, latest, end, error + * Default: largest + */ + const AUTO_OFFSET_RESET = 'auto.offset.reset'; + + /** + * Path to local file for storing offsets. If the path is a directory a filename will be automatically generated in that directory based on the topic and partition. + * + * Type: string + * Default: . + */ + const OFFSET_STORE_PATH = 'offset.store.path'; + + /** + * Fsync() interval for the offset file, in milliseconds. Use -1 to disable syncing, and 0 for immediate sync after each write. + * + * Type: integer + * Range: -1 .. 86400000 + * Default: -1 + */ + const OFFSET_STORE_SYNC_INTERVAL_MS = 'offset.store.sync.interval.ms'; + + /** + * Offset commit store method: 'file' - local file store (offset.store.path, et.al), 'broker' - broker commit store (requires "group.id" to be configured and Apache Kafka 0.8.2 or later on the broker.). + * + * Type: enum value + * Range: file, broker + * Default: broker + */ + const OFFSET_STORE_METHOD = 'offset.store.method'; + + /** + * Maximum number of messages to dispatch in one `rd_kafka_consume_callback*()` call (0 = unlimited) + * + * Type: integer + * Range: 0 .. 1000000 + * Default: 0 + */ + const CONSUME_CALLBACK_MAX_MESSAGES = 'consume.callback.max.messages'; +} diff --git a/src/Constant/TopicProducer.php b/src/Constant/TopicProducer.php new file mode 100644 index 0000000..7738e10 --- /dev/null +++ b/src/Constant/TopicProducer.php @@ -0,0 +1,95 @@ +