Tuesday, 25 July 2017

Install Tensorflow

$pip install virtualenv

$mkdir ~/envs

Create a virtual environment
$virtualenv ~/envs/tensorflow

Activate the environment
$source ~/envs/tensorflow/bin/activate

Install Tensorflow
(tensorflow)$pip install tensorflow

Exit the virtual environment
(tensorflow)$ deactivate
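
Before deactivating, it can be handy to confirm that the active interpreter is the one inside the virtual environment and that the package is visible to it. The following is a minimal sketch; the helper names in_virtualenv and is_installed are my own and not part of virtualenv or TensorFlow.

```python
import importlib.util
import sys


def in_virtualenv():
    """Return True when the interpreter runs inside a virtual environment."""
    # Older virtualenv releases set sys.real_prefix; venv and newer
    # virtualenv releases make sys.base_prefix differ from sys.prefix.
    base = getattr(sys, "real_prefix", None) or getattr(sys, "base_prefix", sys.prefix)
    return base != sys.prefix


def is_installed(package):
    """Return True when `package` can be found by this interpreter."""
    return importlib.util.find_spec(package) is not None


if __name__ == "__main__":
    print("virtualenv active:", in_virtualenv())
    print("tensorflow installed:", is_installed("tensorflow"))
```

Run it with the environment activated; both lines should report True after a successful install.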

Monday, 8 May 2017

Kafka Partition Leader and Controller

Zookeeper and Controller:

Every time a broker process starts, it registers itself under its ID in Zookeeper by creating an ephemeral node. When a broker loses connectivity to Zookeeper, the ephemeral node it created at startup is automatically removed. Kafka components that are watching the list of brokers are notified of the removal.

The controller is one of the Kafka brokers; in addition to the usual broker duties, it is responsible for electing partition leaders. The first broker that starts in the cluster becomes the controller by creating an ephemeral node in Zookeeper called /controller. The other brokers create a Zookeeper watch on the controller node, so they get notified of changes to this node. Kafka uses Zookeeper's ephemeral node feature to elect a controller and to notify the controller when nodes join and leave the cluster.
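
The election can be illustrated with a toy, in-memory model. This is only a sketch of the idea: ToyZooKeeper and Broker are made-up classes, not Kafka or Zookeeper APIs, and in a real cluster the winner of a re-election is decided by a race rather than by the order of a Python list.

```python
class ToyZooKeeper:
    """In-memory stand-in for the small part of Zookeeper used here."""

    def __init__(self):
        self.nodes = {}     # path -> id of the broker whose session owns the node
        self.watches = {}   # path -> list of (owner id, callback)

    def create_ephemeral(self, path, owner):
        """Create the node unless it already exists; return True on success."""
        if path in self.nodes:
            return False
        self.nodes[path] = owner
        return True

    def watch(self, path, owner, callback):
        self.watches.setdefault(path, []).append((owner, callback))

    def session_lost(self, owner):
        """Simulate a broker losing its Zookeeper session."""
        # Watches set by the dead broker disappear with its session.
        for path, entries in self.watches.items():
            self.watches[path] = [e for e in entries if e[0] != owner]
        # Its ephemeral nodes are removed and the remaining watchers are notified.
        for path in [p for p, o in self.nodes.items() if o == owner]:
            del self.nodes[path]
            for _, callback in self.watches.pop(path, []):
                callback()


class Broker:
    def __init__(self, broker_id, zk):
        self.broker_id = broker_id
        self.zk = zk
        zk.create_ephemeral("/brokers/ids/%d" % broker_id, broker_id)
        self.try_become_controller()

    def try_become_controller(self):
        if not self.zk.create_ephemeral("/controller", self.broker_id):
            # Another broker won: watch the node and try again when it is removed.
            self.zk.watch("/controller", self.broker_id, self.try_become_controller)
```

Starting brokers 1, 2 and 3 in that order makes broker 1 the controller; calling session_lost(1) removes /controller and lets one of the watching brokers take over.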

When the controller announces that a partition has a new leader, it sends a LeaderAndIsr request to the new leader and the followers.

Request Processing:

All requests sent to the broker from a specific client are processed in the order in which they were received. Both produce requests (from producers) and fetch requests (from consumers and followers) have to be sent to the leader replica of a partition.

Kafka clients use another request type, the metadata request, to find out where to send these requests.
A metadata request includes a list of topics the client is interested in. It can be sent to any broker, since all brokers have a metadata cache that contains this information. Clients typically cache this information and need to refresh it occasionally by sending new metadata requests.
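
A rough sketch of the client-side caching follows. ToyCluster and ToyClient are hypothetical classes written for this note, and the "NOT_LEADER" string is a stand-in for the error a real broker returns when a request reaches a replica that is not the leader.

```python
class ToyCluster:
    """Hypothetical cluster: any broker can answer a metadata request."""

    def __init__(self, leaders):
        self.leaders = dict(leaders)   # (topic, partition) -> leader broker id
        self.metadata_requests = 0

    def metadata(self, topics):
        self.metadata_requests += 1
        return {tp: b for tp, b in self.leaders.items() if tp[0] in topics}

    def produce(self, broker_id, tp, value):
        # Only the leader replica accepts the request.
        return "OK" if self.leaders[tp] == broker_id else "NOT_LEADER"


class ToyClient:
    def __init__(self, cluster, topics):
        self.cluster = cluster
        self.topics = set(topics)
        self.cache = cluster.metadata(self.topics)   # initial metadata request

    def send(self, tp, value):
        result = self.cluster.produce(self.cache[tp], tp, value)
        if result == "NOT_LEADER":
            # The cached leader is stale: refresh the metadata and retry once.
            self.cache = self.cluster.metadata(self.topics)
            result = self.cluster.produce(self.cache[tp], tp, value)
        return result
```

As long as the leader does not change, every send uses the cached answer and no extra metadata request is made.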

In-Sync Replica(ISR)

To stay in sync with the leader, the replicas send it fetch requests. Only in-sync replicas are eligible to be elected as partition leaders in case the existing leader fails.

The producer's "acks" setting defines when a message counts as "written successfully": when it was accepted by just the leader (acks=1) or by all in-sync replicas (acks=all). Consumers can only read messages that were written to all in-sync replicas.

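The difference between the two acks settings, and the rule about what consumers may read, can be sketched with a toy partition. ToyPartition is a made-up class; the point up to which all in-sync replicas have the data is what Kafka calls the high watermark.

```python
class ToyPartition:
    """Toy model of one partition; the first ISR entry acts as the leader."""

    def __init__(self, isr):
        self.isr = list(isr)
        self.logs = {broker: [] for broker in isr}   # broker id -> its copy of the log

    @property
    def leader(self):
        return self.isr[0]

    def produce(self, message, acks):
        """Append to the leader; with acks='all', wait for the ISR before acking."""
        self.logs[self.leader].append(message)
        if acks == "all":
            self.replicate()
        return "ack"

    def replicate(self):
        """Followers fetch everything they are missing from the leader."""
        for follower in self.isr[1:]:
            self.logs[follower] = list(self.logs[self.leader])

    def high_watermark(self):
        # Number of messages present on every in-sync replica.
        return min(len(self.logs[broker]) for broker in self.isr)

    def consume(self):
        # Consumers only see messages below the high watermark.
        return self.logs[self.leader][: self.high_watermark()]
```

With acks=1 the producer gets its ack while the message is still invisible to consumers; once the followers have fetched it, consume() returns it.
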
Saturday, 6 May 2017

Why Kubernetes?

What is a Container?

A container at its core is an allocation, portioning, and assignment of host resources such as CPU Shares, Network I/O, Bandwidth, Block I/O, and Memory, so that kernel level constructs may jail-off, isolate or “contain” these protected resources so that specific running services and namespaces may solely utilize them without interfering with the rest of the system.

Commonly known as “operating-system-level virtualization,” containers differ from hypervisor-level virtualization. The main difference is that the container model eliminates the hypervisor layer and the redundant OS kernels, binaries, and libraries typically needed to run workloads in a VM.
The New Way is to deploy containers based on operating-system-level virtualization rather than hardware virtualization. These containers are isolated from each other and from the host: they have their own filesystems, they can’t see each others’ processes, and their computational resource usage can be bounded. They are easier to build than VMs, and because they are decoupled from the underlying infrastructure and from the host filesystem, they are portable across clouds and OS distributions.

Kubernetes can schedule and run application containers on clusters of physical or virtual machines. However, Kubernetes also allows developers to ‘cut the cord’ to physical and virtual machines, moving from a host-centric infrastructure to a container-centric infrastructure, which provides the full advantages and benefits inherent to containers.

If an application can run in a container, it should run great on Kubernetes. Additionally, Kubernetes is not a mere “orchestration system”; it eliminates the need for orchestration.

Wednesday, 3 May 2017

Kafka Consumer Challenges

Consumer Retry
1. One option when you encounter a retriable error is to commit the last record you processed successfully. Then store the records that still need to be processed in a buffer, use the consumer pause() method to ensure that additional polls won't return data, and keep trying to process the buffered records. Once you succeed, or have retried enough times and decided to give up (log an error and discard the record), call resume() to unpause the consumer so the next poll returns new records to process.

2. A second option when you encounter a retriable error is to write the record to a separate topic and continue. A separate consumer group can be used to handle retries from the retry topic.
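
The second option can be sketched in a few lines. RetriableError and process_batch are hypothetical names, and the retry topic is modeled as a plain list; in a real application the append would be a produce call to the retry topic.

```python
class RetriableError(Exception):
    """Hypothetical marker for errors that are worth retrying later."""


def process_batch(records, handler, retry_topic):
    """Process records in order; park retriable failures on `retry_topic`."""
    processed = 0
    for record in records:
        try:
            handler(record)
            processed += 1
        except RetriableError:
            # Stand-in for producing the record to the retry topic.
            retry_topic.append(record)
    return processed
```

The main consumer never blocks on a failing record; a second consumer group reads the retry topic at its own pace.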

Long Processing Times

Processing the records can take a long time, but you can't stop polling for more than a few seconds. You must keep polling so the client can send heartbeats to the broker and a rebalance is not triggered. A common pattern is to hand off the data to a thread pool, with multiple threads when possible. You can then pause the consumer and keep polling without fetching additional data until the worker threads have finished.
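
A rough sketch of that pattern, using a toy consumer so it runs without a broker. ToyConsumer and run are made-up names; only the pause/poll/resume sequence is the point, and the short sleep stands in for the blocking timeout of a real poll().

```python
import time
from concurrent.futures import ThreadPoolExecutor


class ToyConsumer:
    """Toy consumer that hands out two records per poll unless paused."""

    def __init__(self, records):
        self.records = list(records)
        self.paused = False
        self.polls = 0

    def poll(self):
        self.polls += 1   # every poll keeps the consumer alive in its group
        if self.paused or not self.records:
            return []
        batch, self.records = self.records[:2], self.records[2:]
        return batch

    def pause(self):
        self.paused = True

    def resume(self):
        self.paused = False


def run(consumer, work):
    """Keep polling while worker threads process the current batch."""
    results, pending = [], []
    with ThreadPoolExecutor(max_workers=2) as pool:
        while pending or consumer.records:
            batch = consumer.poll()
            if batch:
                pending = [pool.submit(work, record) for record in batch]
                consumer.pause()          # later polls return no data
            elif pending and all(f.done() for f in pending):
                results.extend(f.result() for f in pending)
                pending = []
                consumer.resume()         # ready for the next batch
            else:
                time.sleep(0.001)         # stand-in for poll()'s timeout
    return results
```

The loop never stops calling poll(), so a real consumer following the same shape would stay in the group while the workers are busy.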

Exactly Once Delivery

1. Write results to a system that has some support for unique keys. Either the record itself contains a unique key, or you can create one from the topic, partition, and offset combination, which uniquely identifies a Kafka record. When the data store receives a duplicate, it simply overwrites the existing record with the same key. This pattern is called idempotent writes.

2. Write results to a system that has transactions. Write the records and their offsets in the same transaction. When starting up, retrieve the offsets of the latest records written to the external store, then use consumer.seek() to start consuming again from those offsets.
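
Both patterns can be tried out with SQLite from the Python standard library. This is a sketch under assumptions: the table and column names and the helper functions are invented for illustration, and a real consumer would pass the value returned by resume_offset to its seek call.

```python
import sqlite3


def open_store(path=":memory:"):
    """Open the external store and create the two tables if needed."""
    db = sqlite3.connect(path)
    db.execute(
        "CREATE TABLE IF NOT EXISTS results ("
        " topic TEXT, part INTEGER, rec_offset INTEGER, payload TEXT,"
        " PRIMARY KEY (topic, part, rec_offset))"
    )
    db.execute(
        "CREATE TABLE IF NOT EXISTS offsets ("
        " topic TEXT, part INTEGER, next_offset INTEGER,"
        " PRIMARY KEY (topic, part))"
    )
    return db


def write_record(db, topic, part, rec_offset, payload):
    """Store the result and the next offset to read in one transaction."""
    with db:  # commits on success, rolls back if an exception is raised
        # Idempotent write: a duplicate (topic, part, rec_offset) overwrites the old row.
        db.execute(
            "INSERT OR REPLACE INTO results VALUES (?, ?, ?, ?)",
            (topic, part, rec_offset, payload),
        )
        db.execute(
            "INSERT OR REPLACE INTO offsets VALUES (?, ?, ?)",
            (topic, part, rec_offset + 1),
        )


def resume_offset(db, topic, part):
    """Offset to seek to after a restart (0 if nothing was stored yet)."""
    row = db.execute(
        "SELECT next_offset FROM offsets WHERE topic = ? AND part = ?",
        (topic, part),
    ).fetchone()
    return row[0] if row else 0
```

Writing the same record twice leaves a single row in results, and because the offset is committed together with the record, a crash can never leave one stored without the other.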

Sunday, 30 April 2017

Kafka Connect

Kafka, being a streaming data platform, acts as a giant buffer that decouples the time-sensitivity requirements between producers and consumers. Kafka itself applies back-pressure on producers, and consumption rate is driven entirely by the consumers. If producer throughput exceeds that of the consumer, data will accumulate in Kafka until the consumer can catch up.

Kafka can provide "at least once" on its own, and "exactly once" when combined with an external data store that has a transactional model or unique keys.


ETL: Extract-Transform-Load, the data pipeline is responsible for making modifications to the data as it passes through.

ELT: Extract-Load-Transform, the data pipeline does only minimal transformation (e.g. data type conversion), with the goal of making sure the data that arrives at the target is as similar as possible to the source data. A data-lake architecture preserves as much of the raw data as possible and allows downstream apps to make their own decisions regarding data processing and aggregation.

Kafka Connect vs. Client APIs

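Use Connect when you connect Kafka to datastores that you did not write and whose code you cannot or will not modify. Use the client APIs when you can modify the code of the application you want to connect to Kafka. Connect provides out-of-the-box features such as configuration management, offset storage, parallelization, error handling, and a REST API.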

The connector is responsible for:

  1. Determining how many tasks will run for the connector
  2. Deciding how to split the data-copying work between the tasks
  3. Getting configurations for the tasks from the workers
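
The first two responsibilities boil down to dividing the source partitions (tables, files, and so on) among a bounded number of tasks. Below is a minimal sketch of one way to do it, round-robin; group_partitions is a hypothetical helper written for this note, not a Connect API.

```python
def group_partitions(partitions, max_tasks):
    """Split `partitions` into at most `max_tasks` groups of near-equal size."""
    num_groups = min(len(partitions), max_tasks)
    groups = [[] for _ in range(num_groups)]
    for i, partition in enumerate(partitions):
        # Round-robin assignment keeps group sizes within one of each other.
        groups[i % num_groups].append(partition)
    return groups
```

Five tables and a limit of two tasks yield one task with three tables and one with two; a single table never produces more than one task, whatever the limit.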

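All tasks are initialized by receiving a context from the worker. The source context includes an object that allows the source task to store the offsets of source records. The sink context includes methods that let the connector control the records it receives from Kafka, which is used for things like retrying and storing offsets externally for exactly-once delivery.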


Kafka Connect's worker processes are the container processes that execute the connectors and tasks.
If a worker process crashes, other workers will recognize that and will reassign the connectors and tasks that ran on that worker.

Connectors and tasks are responsible for the "moving data" part of data integration, while the workers are responsible for the REST API, configuration management, reliability, and load balancing.

Offset Management

For source connectors, the records that the connector returns to the Connect workers include a logical partition and a logical offset in the source system, e.g. a partition can be a file and an offset a line number, or a partition can be a database table and an offset the ID of a record.
The worker writes both the records and their offsets to Kafka. This allows connectors to start processing events from the most recently stored offset after a restart or a crash.

Monday, 24 April 2017

Avro Schema Registry

Sending the schema along with the data every time (as we effectively do with JSON) is wasteful in both memory and network bandwidth. It is smarter to send just an ID along with the data, which the other parties use to look up how the data is encoded.

On serialization: we contact the SR to register (if not already) the Avro schema of the data we’re going to write (to get a unique ID). We write this ID as the first bytes in the payload, then we append the data. A schema has a unique ID (so multiple messages use the same schema ID).

On deserialization: we read the first bytes of the payload to learn the ID of the schema that was used to write the data. We contact the SR with this ID to fetch the schema if we don’t have it yet, parse it into an org.apache.avro.Schema, and read the data using the Avro API and this schema (or with another compatible schema if we know it’s backward/forward compatible).
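
The framing itself is easy to show. To my knowledge the Confluent serializers put a magic byte of 0 followed by the schema ID as a 4-byte big-endian integer in front of the Avro bytes; the sketch below assumes that layout, and the function names frame and unframe are my own. The Avro payload is treated as opaque bytes here.

```python
import struct

MAGIC_BYTE = 0


def frame(schema_id, avro_payload):
    """Prefix Avro-encoded bytes with the magic byte and a 4-byte schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


def unframe(message):
    """Split a framed message into (schema_id, avro_payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unknown magic byte: %d" % magic)
    return schema_id, message[5:]
```

A consumer would call unframe first, look up the schema for the returned ID (from its cache or from the SR), and only then decode the remaining bytes with Avro.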

A subject represents a collection of compatible (according to custom validation rules) schemas in the SR. The schema registry depends on Zookeeper and looks for Kafka brokers. If it can’t find one, it won’t start.

By default, the client caches the schemas it sees to avoid querying the HTTP endpoint each time. The schema validation is done on the schema registry itself according to its configuration (none, backward, forward, full).

A Kafka message is a key-value pair. Consequently, a topic can have two schemas, one for the key and one for the value.

Avro combined with a schema registry provides:
  • Space and network efficiency thanks to a reduced payload.
  • Schema evolution intelligence and compatibility enforcement.
  • Schema registry visibility, centralization, and reuse.
  • Compatibility with Kafka and Hadoop.

Avro Serialization/Deserialization

Specific Avro means that we use Avro's code generation to generate the object class from the Avro schema file, then populate an instance and produce it to Kafka. With the Avro Maven plugin configured in the pom.xml file, run $ mvn clean package to generate the Java source files.

Generic Avro, by contrast, needs no code generation; only the Avro schema file is required, and the producer sends GenericRecord objects.


Monday, 17 April 2017

Config Log4j to Send Kafka logs to Syslog

1. Configure Syslog Daemon for UDP Input

$ sudo vim /etc/rsyslog.conf

Uncomment these lines to accept UDP messages on the default port 514, and add a rule that writes the local7 facility to its own log file.
$ModLoad imudp
$UDPServerRun 514
local7.*      /var/log/kafka.log

2. Restart the rsyslog service so the changes take effect

$ sudo service rsyslog restart

3. Open your log4j.properties file:

$ vi /opt/kafka/config/log4j.properties

log4j.rootLogger=INFO, stdout, SYSLOG
log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
log4j.appender.SYSLOG.SyslogHost=localhost
log4j.appender.SYSLOG.Facility=LOCAL7
log4j.appender.SYSLOG.Threshold=INFO
log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
log4j.appender.SYSLOG.layout.ConversionPattern=kafka-broker: [%d] %p %m (%c)%n

Log4j comes with a SyslogAppender out of the box.
The configuration above makes the SyslogAppender send these messages over UDP to the syslog daemon on localhost. The first field in the conversion pattern (kafka-broker) is the syslog app name.

4. Restart Kafka
$ sudo stop kafka-broker
$ sudo start kafka-broker
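
For debugging the rsyslog rule it helps to know what arrives on the wire. A syslog datagram starts with a priority field, computed as facility * 8 + severity, followed by the message. The sketch below builds such a datagram for facility LOCAL7 and severity INFO using the constants from Python's logging module; syslog_datagram is a hypothetical helper and the log line is a made-up example, not real broker output.

```python
from logging.handlers import SysLogHandler


def syslog_datagram(facility, severity, message):
    """Build the bytes a syslog client sends: '<PRI>' followed by the message."""
    priority = (facility << 3) | severity   # PRI = facility * 8 + severity
    return ("<%d>%s" % (priority, message)).encode("utf-8")


# Made-up log line in the shape of the conversion pattern used above.
datagram = syslog_datagram(
    SysLogHandler.LOG_LOCAL7,
    SysLogHandler.LOG_INFO,
    "kafka-broker: [2017-04-17 10:00:00,000] INFO started (kafka.server.KafkaServer)",
)
```

LOCAL7 is facility 23 and INFO is severity 6, so the datagram starts with <190>, which is what the local7.* rule in rsyslog.conf matches on.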