_______               __                   _______
       |   |   |.---.-..----.|  |--..-----..----. |    |  |.-----..--.--.--..-----.
       |       ||  _  ||  __||    < |  -__||   _| |       ||  -__||  |  |  ||__ --|
       |___|___||___._||____||__|__||_____||__|   |__|____||_____||________||_____|
                                                             on Gopher (inofficial)
   URI Visit Hacker News on the Web
       
       
       COMMENT PAGE FOR:
   URI   Multi-tenant queues in Postgres
       
       
        rco8786 wrote 22 hours 48 min ago:
        Interesting read, but it seems like the grouping challenge could be
        fairly trivially solved in the application layer by registering job
        types (groups) with the poller, and having the round robin logic there.
       
          remram wrote 9 hours 59 min ago:
          How?
          
          You mean having the worker "know" that it's Alice's turn, and pull
          10 Alice tasks from the database? Doesn't seem right.
       
            rco8786 wrote 2 hours 17 min ago:
            Why not?
       
        aranw wrote 1 day ago:
        Thanks for this write-up. Really interesting. I've built queues using
        Postgres before, but never anything this complex, so I'm sure this
        article will come in handy in the future!
       
        nosefrog wrote 1 day ago:
        Is a fair queue worth it vs spinning up more capacity? I've worked on
        multiple projects where we've ended up ripping out a queue and just
        spinning up more machines to handle the load synchronously instead.
       
          strken wrote 1 day ago:
          Is it a choice? Most projects I've worked on had times when they
          became overwhelmed with requests; a queue handles this case, but more
          capacity just makes it rarer. Ideally you want enough capacity to
          handle X% of requests within Y milliseconds and a queue to deal with
          the leftovers, and I suppose if your X is low enough then a fair
          queue becomes a necessity.
       
          vidarh wrote 1 day ago:
          More capacity won't address operations that the originator isn't
          willing to (or can't) hang around to wait for and/or that are
          long-running enough that restarts due to failures might be needed.
          That's the most immediate reason: Tasks where no amount of capacity
          will remove the need to have some form of queueing mechanism.
          
          For complex enough workflows, queues are also often helpful at
          addressing potentially failing stages in ways that are easier to
          debug. But in that case you want your queue to be closer to a state
          machine where actually waiting in the queue for much time is the
          exception. You can just build a state machine for that too, ensuring
          the inputs to the stage about to execute are recorded in a durable,
          restartable way. But sometimes you may need more copies of the same
          type of job, and soon you have something that looks and smells much
          like a queue anyway.
          
          Then lastly, spikes. But a queue only really helps there if you
          still spin up more machines aggressively enough that the wait time
          doesn't grow so long that it's perceived as just as bad as, or
          worse than, an immediate error, so it does make sense to ask your
          question.
          
          A queue also doesn't need to be complex. If it gets complex, that
          increases the reasons to ask your question for that specific
          system. The same goes if it can potentially grow large (sometimes
          the solution there is simply to refuse to enqueue once the queue
          exceeds a certain size or the processing time goes above a certain
          threshold).
          
          Queues are great when appropriate, but they do often get used as a
          "solution" to a scaling problem that hasn't been sufficiently
          analyzed, which sounds like it might have been the case in your
          examples.
       
          datascienced wrote 1 day ago:
          I guess a queue handles spikes, and is OK for operations that can
          be async, such as generating a PDF and emailing it, as opposed to,
          say, loading a web page.
          
          A queue may give you time to scale up too?
       
        mind-blight wrote 1 day ago:
        Super cool! I was looking at the self-hosted quickstart, and it
        looks like Docker Compose installs both Hatchet and RabbitMQ. Does
        Hatchet use Rabbit alongside Postgres?
       
          abelanger wrote 1 day ago:
          Yeah we're using RabbitMQ for pub/sub (but are considering getting
          rid of it) and Postgres for the actual task queue. There's some more
          about that here: [1] .
          
   URI    [1]: https://news.ycombinator.com/item?id=39643940
       
        wbeckler wrote 1 day ago:
        I love your animations! How did you do those?
       
          abelanger wrote 1 day ago:
          Thank you! I've been using [1] with the Lottie exporter. It also has
          a Figma plugin so you can reuse components.
          
   URI    [1]: https://jitter.video
       
            victor106 wrote 20 hours 27 min ago:
            Lovely animations.
            
            Can you expand on this? Why did you have to use both Jitter and
            Lottie?
       
        phibz wrote 1 day ago:
        Why not something like Kafka or Redis?
       
          klysm wrote 1 day ago:
          Using Kafka as a work queue is widely documented as a mistake.
          Using a single database results in a lot of operational
          simplicity, and you get to skip a lot of distributed-systems
          problems when all your state is in a single system.
       
            victor106 wrote 20 hours 31 min ago:
            > Using Kafka as a work queue is widely documented as a mistake
            
            Could not find any good sources on this. Can you please provide any
            references?
       
              klysm wrote 19 hours 30 min ago:
              Don’t have any off the top of my head, sorry. I could be
              making it up, but I’m pretty sure I remember reading that from
              several sources. IIRC it has to do with the fact that there
              isn’t really per-message acknowledgement?
       
          teraflop wrote 1 day ago:
          The most straightforward reason is that if you need a transactional
          database anyway, then moving the queue into the DB allows you to
          atomically en/dequeue messages at the same time as making other
          updates. Which can massively simplify your architecture because it
          eliminates an enormous category of possible failure modes. (Or it can
          massively improve your system correctness, if you didn't realize
          those failure modes were possible.)
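
          For illustration, a minimal sketch of that pattern, assuming a
          hypothetical orders table and payload column alongside the tasks
          table discussed in the article:

            BEGIN;
            -- the business write and the enqueue commit or roll back
            -- together, so a task never exists without its triggering change
            UPDATE orders SET status = 'PAID' WHERE id = 42;
            INSERT INTO tasks (group_key, status, payload)
            VALUES ('tenant-42', 'QUEUED', '{"action": "send_receipt"}');
            COMMIT;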
       
          dewey wrote 1 day ago:
          Also, most of the time you already have a database, so why not use
          that instead of adding another service to the pile?
       
          hipadev23 wrote 1 day ago:
          Because that’s sane, easy, and boring.
       
        time0ut wrote 1 day ago:
        Very cool. Bookmarked in case I ever need to do this.
        
        I have implemented a transactional outbox in postgres using a simpler
        version of this plus a trigger to notify listening workers. It worked
        well and vastly outpaced the inserting processes. It easily handled
        several million tasks per hour without issue.
        
        It is also nice that the article showed the correct CTE-based form
        of the query. It is possible to naively write the query without it
        and
        sometimes get way more tasks than you asked for when there are
        concurrent workers. I discovered that pretty quickly but it had me
        pulling my hair out…
       
        jvolkman wrote 1 day ago:
        I have my queue workers maintain a list of task owners they've already
        processed, and prefer to get a task from an owner they've
        least-recently seen (or haven't seen) using `ORDER BY
        array_position(:seen_owner_ids, owner_id) desc`. Each new task's
        owner_id is inserted into the front of the list (and removed elsewhere
        if it exists).
        
        But I have a relatively small number of possible `owner_id` values at
        any given time.
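
        As a rough sketch, that claim query could look something like this
        (:seen_owner_ids being the worker's in-memory list; other names as
        above):

          -- prefer owners seen longest ago; unseen owners (NULL position)
          -- sort first under DESC ordering
          SELECT id, owner_id
          FROM tasks
          WHERE status = 'QUEUED'
          ORDER BY array_position(:seen_owner_ids, owner_id) DESC
          LIMIT 1
          FOR UPDATE SKIP LOCKED;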
       
        ndriscoll wrote 1 day ago:
        You've got something wonky going on with that query plan for the 2nd
        partition by attempt. In particular the seq scan on tasks to do the
        `(tasks.id = eligible_tasks.id)` hash join seems odd. The filter on
        queued status in `CTE eligible_tasks` (and not in the last join) also
        seems weird. Is that plan for the same query in the article?
        
        If you add an index on `group_key, id WHERE status = 'queued'` and
        remove the 2nd `WHERE tasks."status" = 'QUEUED'` (I believe that's
        redundant?), you might get a better plan. You'd want something to make
        sure you're not preferring one group_key as well.
        
        I think you should be able to solve your problem with workers having
        zero tasks by moving the LIMIT into the second CTE?
        
        It's also useful in practice to have something like a worker_id and
        timestamp and not just set status to RUNNING in case a worker gets
        stuck/dies and you need to unclaim the work.
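
        A sketch of what that could look like (claimed_by and claimed_at are
        invented column names here):

          -- claim: record who took the task and when
          UPDATE tasks
          SET status = 'RUNNING', claimed_by = :worker_id, claimed_at = now()
          WHERE id = :task_id;

          -- reaper: requeue tasks whose worker died or got stuck
          UPDATE tasks
          SET status = 'QUEUED', claimed_by = NULL, claimed_at = NULL
          WHERE status = 'RUNNING'
            AND claimed_at < now() - interval '5 minutes';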
       
          abelanger wrote 1 day ago:
          Ah, great catch - I just pushed an update to match the query from the
          article. I wasn't looking at much except for the WindowAgg line,
          thank you!
          
          I tried with a similar indexing strategy - it did make a very
          noticeable difference, breaking at about 40000 enqueued tasks instead
          of 25000. I left indexing out of the article because it can open up a
          different can of worms with performance degradation over a longer
          time horizon.
          
          I also tried with an `ORDER BY RANDOM()` across group keys first,
          which does help with fairness but breaks the "deterministic" element.
       
            ndriscoll wrote 1 day ago:
            With your updated plan, you have
            
            > Hash Cond: (tasks.id = t1.id)
            
            >       ->  Seq Scan on tasks  (cost=0.00..254.60 rows=48
            width=14) (actual time=0.566..10.550 rows=10000 loops=1)
            
            >         Filter: (status = 'QUEUED'::"TaskStatus")
            
            So it's still doing a seq scan on tasks when I'd expect it to join
            using the PK. It must be getting tripped up by the redundant filter
            on queued status. Try removing that.
            
            I ninja edited my previous comment, but if you move the LIMIT to
            the 2nd CTE, that should fix your issue with workers not getting
            work. If you do that and add the other index I think in principle
            it should be able to do everything by maintaining a priority queue
            of the heads of each partition (which are each pre-sorted by the
            index now). idk if pg does that though. If it does, then that
            portion of the query should be streamed, so you don't need to try
            to limit it early to avoid a sort of 10k elements when you only
            need 10. Then if you remove the redundant QUEUED check, it should
            be doing everything else through indexes.
            
            Basically, if doing this manually I'd expect the "good" solution to
            do this in a way where starting from an index, each row is streamed
            (i.e. no full sort) with logn complexity. So I look at it from a
            perspective of "how do I get the database to do what I'd do by
            hand?"
       
              abelanger wrote 1 day ago:
              I created a gist with these recommendations - certainly an
              improvement, but I don't think it gets around the `WindowAgg`
              running across all 10k rows: [1] . Does this accurately implement
              your suggestions?
              
              Happy to try out other suggestions, but I haven't found a way to
              get a window function or `JOIN LATERAL` to scale in near-constant
              time for this queueing strategy.
              
   URI        [1]: https://gist.github.com/abelanger5/5c1a75755072239716cb5...
       
                ndriscoll wrote 1 day ago:
                It looks like now it does still only pull 100 rows out of the
                sort (so moving the limit into the 2nd cte didn't hurt). It
                isn't doing all 10000 rows now though, which is interesting. By
                any chance, do you have 9200 different tenants? If so that
                makes sense. What I suggested would work when you have a small
                number of tenants with queued work (it scales n log n with
                tenants with queued work, but log n with amount of tasks that a
                single tenant has queued). So if you're currently testing with
                many tenants queueing at once, you could see how it behaves
                with like 20 tenants where one has 9k items and the others have
                ~50 each. Then it sort of depends on how your distribution
                looks in practice to know whether that's acceptable.
                
                You could also probably do tricks where individual workers
                filter to specific tenant IDs in the first CTE (e.g. filter
                group_key mod num_workers = worker_id) to reduce that
                cardinality if you expect it to be large. Or you could e.g.
                select 100 random group_keys as a first step and use that to
                filter the window, but then that needs another partial index on
                just `group_key where status = queued`.
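
                  E.g. something like this (a sketch, assuming group_key is
                  numeric; otherwise hash it first):

                    -- each worker only scans its own slice of tenants
                    SELECT id, group_key
                    FROM tasks
                    WHERE status = 'QUEUED'
                      AND group_key % :num_workers = :worker_id
                    ORDER BY group_key, id
                    LIMIT 100;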
                
                Edit: I see it's still doing a seq scan on tasks. There is a PK
                on id, right? It knows there's only 100 rows from the rest of
                the query so it seems odd to me that it would decide to scan
                the table. You could try putting a hash index on id if it's
                refusing to use the btree index I guess. Or it might change its
                mind if you add 1M SUCCEEDED tasks or something.
                
                Another thing to consider is that out of the box, pg's default
                config for the planner is tuned to like 20-year-old hardware.
                You need to tweak the io costs for SSDs and tell it you have
                more RAM if you haven't done that. See e.g. [1] for better
                starting values.
                
   URI          [1]: https://pgtune.leopard.in.ua/
       
        plandis wrote 1 day ago:
        At a previous job we did something similar but ended up having workers
        first poll another table to determine which tenant to query against. We
        called these items tokens and they represented a finite amount of
        dedicated thread time for processing a specific tenant’s queue.
        
        What this looked like was a worker thread would first query the token
        table for which tenant to process eligible tasks from, and then update
        the token to take a timed lock and during that time would solely
        process eligible tasks from a specific tenant.
        
        This has some nice properties:
        
        1. You can scale different tenants using different numbers of
        tokens, which means different but controlled amounts of thread time.
        
        2. It allows for locality on your worker thread. Within a specific
        tenant the processing was usually similar so any shared resources could
        be cached and reused after polling for additional eligible tasks from
        the tenant’s queue.
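
        A rough sketch of the shape (all names here are invented, not from
        the article):

          -- grab a tenant token to work on exclusively for the next minute
          UPDATE tenant_tokens
          SET locked_by = :worker_id,
              locked_until = now() + interval '1 minute'
          WHERE id = (
              SELECT id FROM tenant_tokens
              WHERE locked_until IS NULL OR locked_until < now()
              ORDER BY locked_until NULLS FIRST
              LIMIT 1
              FOR UPDATE SKIP LOCKED
          )
          RETURNING tenant_id;

        The worker then polls eligible tasks for just that tenant until the
        lock expires.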
       
          klysm wrote 1 day ago:
          I like this approach a lot, but I’m unsure about time-based vs
          number-of-items-based fairness. I guess it really depends on the
          application.
       
          magicalhippo wrote 1 day ago:
          Reminded me of the token bucket[1] algorithm. Good point about
          locality.
          
          [1] 
          
   URI    [1]: https://en.wikipedia.org/wiki/Token_bucket
       
        jperras wrote 1 day ago:
        Is the `FOR UPDATE SKIP LOCKED` in the CTE necessary? Granted my
        understanding of Postgres row-level locking and their interaction with
        CTEs may be a bit lacking, but according to the docs[1]:
        
          The sub-statements in WITH are executed concurrently with each other
        and with the main query. Therefore, when using data-modifying
        statements in WITH, the order in which the specified updates actually
        happen is unpredictable. All the statements are executed with the same
        snapshot (see Chapter 13), so they cannot “see” one another's
        effects on the target tables.
        
        1.
        
   URI  [1]: https://www.postgresql.org/docs/current/queries-with.html#QUER...
       
          mslot wrote 1 day ago:
          Read committed mode (PostgreSQL's default) can get pretty funky.
          
          If two transactions concurrently perform a SELECT (may be in a CTE)
          followed by an UPDATE, then they might see and try to update the same
          rows. That's often undesirable, for instance in the example of a
          queue where messages are supposed to arrive ~once. Serializable mode
          would "solve" the problem by letting one transaction fail, and
          expects the application to retry or otherwise deal with the
          consequences.
          
          FOR UPDATE is a precision tool for working around read committed
          limitations. It ensures rows are locked by whichever transaction
          reads them first, such that the second reader blocks and (here's the
          funky part) when the first transaction is done it actually reads the
          latest row version instead of the one that was in the snapshot.
          That's semantically a bit weird, but nonetheless very useful, and
          actually matches how updates work in PostgreSQL.
          
          The biggest issue with SELECT..FOR UPDATE is that it blocks waiting
          for concurrent updaters to finish, even if the rows no longer match
          its filter after the update. The SKIP LOCKED avoids all that by
          simply skipping the locked rows in the SELECT. Semantically even
          weirder, but very useful for queues.
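
          A common dequeue built on this, as a sketch against the tasks
          table from the article:

            -- grab up to 10 unclaimed tasks; rows locked by other workers
            -- are skipped rather than waited on
            UPDATE tasks
            SET status = 'RUNNING'
            WHERE id IN (
                SELECT id FROM tasks
                WHERE status = 'QUEUED'
                ORDER BY id
                LIMIT 10
                FOR UPDATE SKIP LOCKED
            )
            RETURNING id;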
       
            jperras wrote 1 day ago:
            Ah, I see - the default transactional isolation level is what I
            wasn't accounting for.
            
            Thanks for the explanation! Very much appreciated.
       
        ucarion wrote 1 day ago:
        This is pretty fancy stuff! Sorry if I'm just not reading carefully
        enough, but does this approach account for tenants whose messages take
        longer to process, as opposed to a tenant that sends a larger volume of
        messages?
       
          abelanger wrote 1 day ago:
          No, this assumes you've already split your tasks into quanta of
          approximately the same duration - so all tasks are weighted equally
          in terms of execution time. If the tasks have different weights,
          you might be looking at something like deficit round-robin
          [1], which I've looked at implementations for in RabbitMQ and we're
          thinking about how to implement in PG as well [2]. [1]
          
   URI    [1]: https://www.cs.bu.edu/fac/matta/Teaching/cs655-papers/DRR.pd...
   URI    [2]: https://nithril.github.io/amqp/2015/07/05/fair-consuming-wit...
       
            ucarion wrote 1 day ago:
            Makes sense!
            
            My gut tells me that in a lot of cases it would make sense to
            jump straight to shuffle sharding, where you'd converge on fair
            solutions dynamically. I'm looking forward to that follow-on
            article!
       
        andrewstuart wrote 1 day ago:
        I wonder if Postgres row-level access control (RLS) is another
        solution to this.
       
          remram wrote 9 hours 57 min ago:
          This isn't related to access control. Workers process jobs for every
          user, that's actually the source of the problem.
       
       
   DIR <- back to front page