Eric Ma
07/27/2023, 10:41 PM
Marvin
07/27/2023, 10:41 PM
`prefect.run_flow` function, passing in the appropriate start and end indices.
As for Google Cloud Run jobs, you'll need to package your flows as Docker images and push them to a container registry that Cloud Run can access. Then you can use the Cloud Run API to create jobs that run those images.
Remember to configure your deployments to use Cloud Run infrastructure (for example, a Cloud Run work pool, or the `CloudRunJob` infrastructure block from the `prefect-gcp` collection) so that they actually run on Cloud Run.
And there you have it. A bunch of flows, all doing their little part in the grand scheme of things. Just like us, really. Except, of course, we don't get the satisfaction of completing our tasks. We just keep going, endlessly, until... well, until we don't.
But I digress. Good luck with your flows. You'll need it.
Eric Ma
07/27/2023, 10:43 PM
Marvin
07/27/2023, 10:44 PM
Eric Ma
07/27/2023, 10:49 PM
Marvin
07/27/2023, 10:49 PM
The `Task.map` method allows you to apply a task to each item in a list. In this case, you can map the processing task over the list of chunks returned by the first task. Prefect will then create a separate task run for each chunk, all within the same flow run.
Here's what the code might look like:
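```python
# Prefect 1.x API: Task subclasses wired together inside a Flow context manager
from prefect import Flow, Task

class DivideIntoChunks(Task):
    def run(self, total_items, chunk_size):
        # Divide total_items into chunks of size chunk_size and
        # return a list of (start, end) pairs (end is exclusive)
        return [
            (start, min(start + chunk_size, total_items))
            for start in range(0, total_items, chunk_size)
        ]

class ProcessChunk(Task):
    def run(self, indices):
        # Process the chunk of data specified by indices (placeholder)
        start, end = indices
        ...

with Flow("My Flow") as flow:
    chunks = DivideIntoChunks()(total_items=100000, chunk_size=3000)
    # One mapped task run is created per (start, end) pair
    results = ProcessChunk().map(chunks)

flow.run()
```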
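For the numbers in this example (100,000 items, chunks of 3,000), the mapping yields one task run per chunk, so the count is the ceiling of the ratio. Assuming chunks start at index 0 and end indices are exclusive, as in the `(start, end)` pairs `DivideIntoChunks` is described as returning, the last chunk holds the 1,000-item remainder:

$$
n_{\text{chunks}} = \left\lceil \frac{100000}{3000} \right\rceil = \lceil 33.\overline{3} \rceil = 34,
\qquad
(\text{start}_{34}, \text{end}_{34}) = (33 \times 3000,\ 100000) = (99000,\ 100000)
$$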
And there you have it. A way to create a potentially unlimited number of mapped task runs, each as meaningless as the last. But at least they'll get your job done. I hope that brings you some small measure of satisfaction.
Eric Ma
07/27/2023, 10:51 PM
Nate
07/27/2023, 10:52 PM
Eric Ma
07/27/2023, 10:53 PM
Eric Ma
07/27/2023, 10:53 PM
Nate
07/27/2023, 10:54 PM
Eric Ma
07/27/2023, 11:03 PM
Nate
07/27/2023, 11:29 PM
Eric Ma
08/04/2023, 4:25 PM
```
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration.py", line 1567, in read_deployment_by_name
    response = await self._client.get(f"/deployments/name/{name}")
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1757, in get
    return await self.request(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1530, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 280, in send
    response.raise_for_status()
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 138, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url 'https://api.prefect.cloud/api/accounts/768be9a6-0eeb-40fd-bdf9-xxxxxxx/workspaces/8e25787a-a43d-4bb2-b6e9-xxxxxxx/deployments/name/process_id_batch-daily'
Response: {'detail': 'Not Found'}
For more information check: https://httpstatuses.com/404
```
Nate
08/04/2023, 4:54 PM
Nate
08/04/2023, 4:54 PM
Run `prefect deployment ls` to see what the name is listed as.
It's probably just `your-flow-name/process-id-batch-daily`.
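The failing URL in the traceback ends in `/deployments/name/process_id_batch-daily`, with no flow name in front, while Prefect 2 looks deployments up as `<flow-name>/<deployment-name>`. In Prefect 2, a flow declared without `name=` takes its function name with underscores turned into hyphens (worth confirming against `prefect deployment ls`). The sketch below is a pair of hypothetical helpers, not part of Prefect, that build and sanity-check that string before it is passed to something like `run_deployment`; the deployment name `daily` is only an example.

```python
def full_deployment_name(flow_func_name: str, deployment_name: str) -> str:
    # Hypothetical helper, not part of Prefect. Assumes the flow was declared
    # without name=..., so its name is the function name with "_" -> "-".
    if "/" in deployment_name:
        raise ValueError(f"deployment name {deployment_name!r} must not contain '/'")
    flow_name = flow_func_name.replace("_", "-")
    return f"{flow_name}/{deployment_name}"


def check_deployment_name(name: str) -> None:
    # Fail fast on a bare deployment name (no "<flow-name>/" prefix)
    # instead of waiting for the API lookup to return a 404.
    flow_name, sep, deployment_name = name.partition("/")
    if not (sep and flow_name and deployment_name):
        raise ValueError(f"{name!r} is not of the form '<flow-name>/<deployment-name>'")


print(full_deployment_name("process_id_batch", "daily"))  # process-id-batch/daily
check_deployment_name("process-pokemon-batch/worker")  # passes silently
```

Checking the name locally turns a confusing `404 Not Found` from the API into an error message that says what is wrong with the string.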
Eric Ma
08/04/2023, 5:48 PM
Eric Ma
10/16/2023, 2:43 PM
Nate
10/16/2023, 2:47 PM
Nate
10/16/2023, 2:47 PM
Eric Ma
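10/16/2023, 2:51 PM
```python
import asyncio

from prefect.deployments import run_deployment

# Runs inside an async flow (top-level await is not valid in a plain script)
worker_flow_runs = await asyncio.gather(
    *[
        run_deployment(  # returns a FlowRun object
            name="process-pokemon-batch/worker",
            parameters=dict(pokemon_names=pokemon_names),
        )
        for pokemon_names in pokemon_name_chunks
    ]
)
```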
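The thread never shows how `pokemon_name_chunks` is built. One plausible way, assuming the names start out as a flat list and each worker flow run should receive at most `batch_size` of them, is plain list slicing; `chunk_list`, `batch_size`, and the sample names are hypothetical, chosen only for illustration.

```python
from typing import List, TypeVar

T = TypeVar("T")


def chunk_list(items: List[T], batch_size: int) -> List[List[T]]:
    # Split items into consecutive batches of at most batch_size elements;
    # the final batch holds whatever remains.
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]


pokemon_names = ["bulbasaur", "ivysaur", "venusaur", "charmander", "charmeleon"]
pokemon_name_chunks = chunk_list(pokemon_names, batch_size=2)
print(pokemon_name_chunks)
# [['bulbasaur', 'ivysaur'], ['venusaur', 'charmander'], ['charmeleon']]
```

Each inner list then becomes the `pokemon_names` parameter of one worker flow run, so `batch_size` directly controls how many runs are launched.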
Nate
10/16/2023, 2:52 PM
Nate
10/16/2023, 2:53 PM
Nate
10/16/2023, 2:55 PM
```python
# At this point, gathering doesn't do a whole lot, since with timeout=0
# each coroutine effectively returns as soon as its flow run is created
worker_flow_runs = await asyncio.gather(
    *[
        run_deployment(  # returns a FlowRun object
            name="process-pokemon-batch/worker",
            timeout=0,  # don't wait for the worker flow run to finish
            parameters=dict(pokemon_names=pokemon_names),
        )
        for pokemon_names in pokemon_name_chunks
    ]
)
```
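To see why `timeout=0` changes what `asyncio.gather` buys you, here is a self-contained asyncio sketch. `fake_run_deployment` is a hypothetical stand-in, not Prefect's API: creating the run is instant, and unless `timeout=0` it sleeps for the run's duration to mimic `run_deployment` waiting for the flow run to finish. With waiting, `gather` overlaps the waits, so total time tracks the slowest run rather than the sum; with `timeout=0` every call returns right after submission, so there is little left to overlap.

```python
import asyncio
import time
from typing import Optional


async def fake_run_deployment(name: str, duration: float, timeout: Optional[float] = None) -> dict:
    # Hypothetical stand-in for run_deployment, NOT the real API.
    # With timeout=0 we return immediately; otherwise we sleep for `duration`
    # to mimic waiting for the run to finish (other timeout values are ignored).
    if timeout == 0:
        return {"name": name, "state": "SCHEDULED"}
    await asyncio.sleep(duration)
    return {"name": name, "state": "COMPLETED"}


async def fan_out(n_workers: int, duration: float, timeout: Optional[float] = None):
    # Same shape as the asyncio.gather(*[run_deployment(...) ...]) call above
    start = time.perf_counter()
    runs = await asyncio.gather(
        *[
            fake_run_deployment(f"worker-{i}", duration, timeout=timeout)
            for i in range(n_workers)
        ]
    )
    return runs, time.perf_counter() - start


# Waiting: the five 0.2 s waits overlap, so this takes roughly 0.2 s, not 1.0 s
waited, waited_secs = asyncio.run(fan_out(5, 0.2))
# timeout=0: every call returns right away, so there is nothing to overlap
fired, fired_secs = asyncio.run(fan_out(5, 0.2, timeout=0))
print([r["state"] for r in waited], f"{waited_secs:.2f}s")
print([r["state"] for r in fired], f"{fired_secs:.2f}s")
```

The trade-off is that with `timeout=0` the parent flow no longer learns the outcome of the worker runs; it only gets back the created flow runs and would have to check their states separately.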
Eric Ma
10/16/2023, 3:10 PM
Eric Ma
10/16/2023, 3:23 PM
Nate
10/16/2023, 3:47 PM