
Synchronized old 0.8 branch with master and Kafka 0.8.0-beta1 #51


Closed
wants to merge 93 commits

Conversation

@mrtheb (Collaborator) commented Oct 1, 2013

What this contains:

  • Updated the Kafka reference to the 0.8.0-beta1 commit (looks dirty, but it works)
  • Disabled tests that commit offsets to Kafka (not available in 0.8; maybe in trunk)
  • Updated the docs: Kafka now requires running ./sbt assembly-package-dependency
  • Updated tests and fixtures
  • Moved the integration test for the blocking API into a separate test function, as it was causing me trouble. It also makes more sense to have it split.

Integration tests pass on my machine. I used both tox (with pytest) and nosetests directly (which I prefer over pytest); not tested on Python 2.6, though.

@mahendra, @mumrah mrtheb/kafka-python@2b016b6 is for a fix you probably want in master as well.

mumrah and others added 30 commits April 11, 2013 16:03
This will be easier to use in cases where we need to fetch only a
specified number of messages. The API uses the __iter__ machinery
internally, but maintains state so it gives back only the requested
set of messages.

The API is get_messages(count=1)
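The idea described above, a get_messages() call layered on shared iterator state, can be sketched as follows. This is a toy model with hypothetical names, not the library's actual consumer:

```python
from itertools import islice

class IterBackedConsumer:
    """Toy consumer: get_messages() reuses the __iter__ state, so
    successive calls resume where the previous one stopped."""

    def __init__(self, messages):
        self._iter = iter(messages)   # shared iterator state

    def __iter__(self):
        return self._iter

    def get_messages(self, count=1):
        # islice pulls at most `count` items from the shared iterator,
        # so the next call continues with the following message
        return list(islice(self._iter, count))

consumer = IterBackedConsumer(["m1", "m2", "m3"])
print(consumer.get_messages(count=2))  # ['m1', 'm2']
print(consumer.get_messages(count=2))  # ['m3']
```

Because both get_messages() and plain iteration drain the same iterator, mixing the two never replays a message.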
The auto-commit timer is one-shot: after the first commit it never
fires again. This fixes that issue.

Also cleaned up some duplicate code in util.ReentrantTimer()
Auto commit timer is not periodic
Removed get_messages API, added test for get_pending
Conflicts:
	kafka/consumer.py
* When you initiate a producer with a non-existent queue, the queue is
  created. However, this partition info is not reflected in KafkaClient()
  immediately, so we wait one second and try loading it again.

  Without this fix, calling producer.send_messages() right after creating
  a new queue makes the library throw a StopIteration exception.

* In SimpleConsumer(), the defaults are not as documented in the comments.
  Fix this (or do we change the documentation?)

* There was a problem with the way the consumer iterator worked.
  For example, assume there are 10 messages in the queue/topic
  and you iterate over them as:

  for msg in consumer:
       print(msg)

  At the end of this, the saved 'offset' is 10, so if you run the
  loop again, the last message (10) is repeated.

  This can be fixed by adjusting the offset counter before fetching
  the message

* Avoid some code repeat in consumer.commit()

* Fix a bug in send_offset_commit_request() invocation in consumer.py

* Fix missing imports
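The iterator off-by-one described above comes down to when the offset counter is advanced relative to the fetch. A toy model of the fixed behavior (hypothetical names, not SimpleConsumer itself):

```python
class ToyConsumer:
    """Toy log consumer: `offset` always points at the NEXT message
    to fetch. Each iteration fetches at the current offset and only
    then advances it, so re-entering the loop after exhausting the
    log yields nothing instead of replaying the last message."""

    def __init__(self, log):
        self.log = log        # the "topic": a list of messages
        self.offset = 0       # next offset to fetch

    def __iter__(self):
        while self.offset < len(self.log):
            msg = self.log[self.offset]  # fetch at current offset...
            self.offset += 1             # ...then advance
            yield msg

c = ToyConsumer(["a", "b", "c"])
first = list(c)    # ['a', 'b', 'c']
second = list(c)   # [] -- offset is past the end, no repeat
```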
consumer.py and conn.py will be done later after pending merges
This alleviates IPv4 -vs- IPv6 issues in ZK and Kafka.
Is there a better way to do this?
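One common, family-agnostic pattern for this is to resolve the host with socket.getaddrinfo and try each returned address in turn. A stdlib-only sketch (this is an illustration of the approach, not the library's actual connection code):

```python
import socket

def connect(host, port, timeout=5.0):
    """Resolve `host` via getaddrinfo and attempt each candidate
    address, so IPv4 and IPv6 endpoints both work without any
    special-casing in the caller."""
    last_err = None
    for family, socktype, proto, _, addr in socket.getaddrinfo(
            host, port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        sock = socket.socket(family, socktype, proto)
        sock.settimeout(timeout)
        try:
            sock.connect(addr)
            return sock          # first address that answers wins
        except OSError as err:
            last_err = err
            sock.close()
    raise last_err or OSError("getaddrinfo returned no results")
```

Trying every getaddrinfo result in order is essentially what the stdlib's own socket.create_connection helper does.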
Fix auto-commit issues with multi-threading
If there are no messages being consumed, the timer keeps
creating new threads at the specified interval. This may
not be necessary; we can control this behaviour so that
the timer thread is started only when a message is consumed
The previous commit optimized the commit thread such that the timer
started only when there were messages to be consumed. This commit
goes a step further and ensures the following:
* Only one timer thread is created
* The main app does not block on exit (waiting for timer thread to finish)

This is ensured by having a single thread block on an event and
repeatedly call a function. We use events instead of time.sleep()
to prevent the Python interpreter from waking every 50 ms to check
whether the timer has expired (logic copied from threading.Timer)
mahendra and others added 21 commits June 28, 2013 13:59
In the current patch get_messages(count=1) would return zero messages
the first time it is invoked after a consumer was initialized.
Support for async producer

Merged locally, tests pass, +1
Conflicts:
	kafka/__init__.py
	kafka/consumer.py
	test/test_integration.py
Was hard-coded to 1024 bytes, which meant that larger messages were
unconsumable: they would always get split, causing the consumer to
stop.

It would probably be best to automatically retry truncated messages with
a larger request size so you don't have to know your max message size
ahead of time
Related to dpkp#42

Adds new ConsumerFetchSizeTooSmall exception that is thrown when
`_decode_message_set_iter` gets a BufferUnderflowError but has not yet
yielded a message

In this event, SimpleConsumer will increase the fetch size by 1.5 and
continue the fetching loop while _not_ increasing the offset (basically
just retries the request with a larger fetch size)

Once the consumer fetch size has been increased, it will remain
increased while SimpleConsumer fetches from that partition
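The grow-and-retry loop described above can be modeled in a few lines. This is a simplified sketch of the technique, not the actual SimpleConsumer code; the exception and function names here are hypothetical:

```python
class FetchSizeTooSmall(Exception):
    """Raised when the fetch buffer truncates the data before a
    single whole message could be decoded."""

def fetch_with_growth(fetch, fetch_size, max_size=1 << 20):
    """Retry `fetch(size)` with a 1.5x larger buffer whenever it
    reports truncation, without advancing the offset. Returns the
    result and the (possibly grown) size, so the caller can keep
    using the larger size for that partition."""
    while fetch_size <= max_size:
        try:
            return fetch(fetch_size), fetch_size
        except FetchSizeTooSmall:
            fetch_size = int(fetch_size * 1.5)  # grow, retry same offset
    raise FetchSizeTooSmall("message larger than max_size")

# toy fetch: fails until the buffer can hold a 3000-byte message
def toy_fetch(size):
    if size < 3000:
        raise FetchSizeTooSmall()
    return b"x" * 3000

msgs, new_size = fetch_with_growth(toy_fetch, 1024)  # grows 1024 -> 3456
```

Keeping the grown size for subsequent fetches avoids paying the retry cost again the next time a similarly large message appears.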
Allow a client id to be passed to the client

+1 thanks, @jimjh
Also move the exceptions to common instead of util
correct typo in readme example
Small fixes in the ## Multiprocess consumer example.
@mumrah (Collaborator) commented Oct 2, 2013

@mrtheb This pull request looks a little suspect (2,691 additions, 949 deletions). Can you rebase and open a new one?

@mumrah mumrah closed this Oct 2, 2013
@mumrah (Collaborator) commented Oct 2, 2013

@mrtheb I should clarify

From your pull request description, I would expect to see changes to

  • kafka-src revision
  • docs
  • integration tests
  • Kafka fixture

Can you rebase these changes against "master"?

@mrtheb (Collaborator, Author) commented Oct 2, 2013

It's probably because I merged from master locally and pushed the result to the branch... dunno. I'll check it out again. My changes are at the bottom of the list.

@mumrah (Collaborator) commented Oct 3, 2013

@mrtheb I have manually merged your changes. Tests are passing locally; let's see how Travis does.

@mrtheb (Collaborator, Author) commented Oct 3, 2013

@mumrah oh thanks, sorry you had to do that... I honestly never used rebase before; I'm still learning git workflows (we use Perforce at work). I have other pieces coming up. It's good to see all the activity lately.

@mumrah (Collaborator) commented Oct 3, 2013

@mrtheb no worries, I was happy to get the tests passing. A little git-fu is a small price ;)

5 participants