Skip to content

Added method GetControllerConn() in conn.go and related test . #495

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

Mbr2k
Copy link

@Mbr2k Mbr2k commented Aug 29, 2020

This provides the controller connection for the kafka cluster which is required to avoid error 41 when attempting to use segmentio to create topics on a connection to a node in the cluster which is not the controller on a server which has replication factor > 1.

…y provides the controller connection for the kafka cluster which is required to avoid error 41 when attempting to use segmentio to create topics on a connection to a node in the cluster which is not the controller on a server which has replication factor > 1.
@Mbr2k
Copy link
Author

Mbr2k commented Aug 29, 2020

I hope this addition can make it easier for people who might encounter kafka error 41 in kafka server environments which use replication to identify, resolve and make use of this extra functionality.

@Mbr2k Mbr2k changed the title Added code and test for GetControllerConn() in conn.go. This correctl… Added method and test for GetControllerConn() in conn.go. Sep 7, 2020
@Mbr2k Mbr2k changed the title Added method and test for GetControllerConn() in conn.go. Added method GetControllerConn() in conn.go and related test . Sep 7, 2020
@Pryz Pryz self-assigned this Sep 14, 2020
@Pryz
Copy link
Contributor

Pryz commented Sep 14, 2020

Hi @Zell45, thanks for your PR.

This looks a bit redundant with the existing https://pkg.go.dev/github.com/segmentio/kafka-go?tab=doc#Conn.Controller where the user would just have to use something like :

conn, _ := kafka.Dial("tcp", "localhost:9092")
controller, _ := conn.Controller()
conn, _ = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
// ... create topics, etc etc

Did you take a look at the existing Controller method ?

Thanks

@Mbr2k
Copy link
Author

Mbr2k commented Sep 14, 2020

Hi @Pryz, thanks for your review.

I posted this function to provide the convenience of guaranteeing a defined connection object accesses the controller of the cluster (throwing errors where appropriate along the way). It incorporates the existing Conn.Controller() method but rather than returning the broker it returns the direct connection object to the controller.

In my working group we actually found it to be a bit of a goose chase to create the dialler object and make the right connection after dealing with the surprise error 41 on our remote replicated cluster- mainly before this we were using the higher level abstractions of Reader and Writer objects, and the problem was not apparent on the locally hosted kafka servers.

Above you point out you can use the kafka.Dial method which might be more lightweight than the way I have created a Dialer object. Happy to hear your thoughts on it.

Thanks again!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants