link89
09/09/2022, 2:58 AM
Louis Vines
09/09/2022, 8:42 AM
Richard Alexander
09/09/2022, 2:24 PM
Eric Bolton
09/09/2022, 2:42 PM
prefect-agent task that registers and runs the flows
• Whenever we make an update through our CI pipeline, a new ECS task is spun up and the old task gets deprovisioned
• Even when the old task is marked as "Stopped" in AWS (no hardware is running the old prefect-agent any longer), the old Flow Run still says "Running" in the Prefect UI and hangs. Given our concurrency flags, it blocks all scheduled flow runs until it is manually canceled
• Questions:
◦ Is there a way for Prefect to detect that the old prefect-agent is now stopped and automatically force cancel the flow run?
◦ Should we be managing our deployment process differently?
John Munera
09/09/2022, 3:21 PM
Stefan
09/12/2022, 11:34 AM
from prefect import task

def sql_builder(date):
    statement = ...  # pseudocode: statement generator with date and tables
    return statement

@task
def get_data(date):
    sql = sql_builder(date)
    get_data(sql, con)
Surat Mukker
09/13/2022, 12:32 AM
Surat Mukker
09/15/2022, 3:54 AM
Mike He
09/16/2022, 9:09 AM
codes
!!! The deployment will overwrite the directory !!! Note down the following 4 credentials for connecting to the FTP server.
FTP_HOST = "127.0.0.1"
FTP_PORT = 21
FTP_USERNAME = "prefect"
FTP_PASSWORD = "--your-password"
• Test code structure:
codes/
┣ .prefectignore
┣ deploy.py
┗ my_flow.py
• The code is as follows; please replace the 4 FTP_xxx parameters with your FTP credentials
# codes/.prefectignore
__pycache__
deploy.py

# codes/deploy.py
from prefect.deployments import Deployment
from prefect.filesystems import RemoteFileSystem

from my_flow import main_flow

FTP_HOST = "127.0.0.1"
FTP_PORT = 21
FTP_USERNAME = "prefect"
FTP_PASSWORD = "--your-password"

ftp_storage_block = RemoteFileSystem(
    basepath=f"ftp://{FTP_HOST}/codes",
    settings={
        "host": FTP_HOST,
        "port": FTP_PORT,
        "username": FTP_USERNAME,
        "password": FTP_PASSWORD,
    },
)
ftp_storage_block.save("ftp-localhost", overwrite=True)

deployment = Deployment.build_from_flow(
    main_flow, name="Main Flow", storage=ftp_storage_block
)

if __name__ == "__main__":
    deployment.apply()

# codes/my_flow.py
from prefect import flow
from prefect.logging import get_run_logger

@flow
def main_flow():
    logger = get_run_logger()
    logger.info("Hello")
• prefect orion start
• cd codes and run python deploy.py
• Start an agent: prefect agent start --work-queue "default"
• Run the deployment now with defaults through Prefect Orion UI
Error Messages:
17:15:22.238 | INFO | prefect.agent - Submitting flow run '681f7ee9-ad7c-4961-bebc-6381a954e0b4'
17:15:22.320 | INFO | prefect.infrastructure.process - Opening process 'copper-unicorn'...
17:15:22.326 | INFO | prefect.agent - Completed submission of flow run '681f7ee9-ad7c-4961-bebc-6381a954e0b4'
17:15:25.478 | ERROR | Flow run 'copper-unicorn' - Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\engine.py", line 256, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\client.py", line 103, in with_injected_client
return await fn(*args, **kwargs)
File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\deployments.py", line 54, in load_flow_from_flow_run
await storage_block.get_directory(from_path=deployment.path, local_path=".")
File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\prefect\filesystems.py", line 296, in get_directory
return self.filesystem.get(from_path, local_path, recursive=True)
File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\fsspec\spec.py", line 801, in get
self.get_file(rpath, lpath, **kwargs)
File "C:\Users\Mike\AppData\Local\Programs\Python\Python310\lib\site-packages\fsspec\implementations\ftp.py", line 136, in get_file
outfile = open(lpath, "wb")
PermissionError: [Errno 13] Permission denied: 'C:/Users/Mike/AppData/Local/Temp/tmpz2x16tivprefect'
17:15:25.827 | INFO | prefect.infrastructure.process - Process 'copper-unicorn' exited cleanly.
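The last frames of the traceback suggest that fsspec's FTP `get_file` received the temporary directory itself as the local path and tried to `open` it for writing. A minimal standard-library sketch of why such a call fails follows; it illustrates the symptom only and is not a confirmed diagnosis of the Prefect or fsspec code path. The function name is hypothetical.

```python
import tempfile


def try_open_directory_for_writing() -> str:
    """Open a directory path in "wb" mode and report the exception class name."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        try:
            # Same call shape as the failing frame, open(lpath, "wb"),
            # except that lpath here is a directory rather than a file.
            with open(tmp_dir, "wb"):
                pass
        except OSError as exc:
            return type(exc).__name__
    return "no error"


if __name__ == "__main__":
    # Windows raises PermissionError (Errno 13); POSIX systems raise IsADirectoryError.
    print(try_open_directory_for_writing())
```

If this is what happens here, the local target passed to `get_file` would need to be a file path inside the temporary directory rather than the directory itself.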
I have also written a post asking this question on Discourse (Link).
Mike He
09/16/2022, 9:46 AM
prefect.filesystems.RemoteFileSystem with the following code the other day. But after some digging into the fsspec source code, I am not sure whether the problem should be attributed to Prefect or to fsspec. Also, the patch is a little bit ugly, and I don't want to apply it that way in every agent environment XD. So I am asking here: does this happen only for me on the Windows platform? And if the error is universal, will the Prefect team fix it in the future, or wait for fsspec to fix it? BTW: I have also raised an issue with fsspec here.
Summary: Added 2 lines to RemoteFileSystem.get_directory, as well as a new function RemoteFileSystem.get_directory_ftp
# prefect/filesystems.py
...
class RemoteFileSystem(WritableFileSystem, WritableDeploymentStorage):
    ...
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Downloads a directory from a given remote path to a local directory.

        Defaults to downloading the entire contents of the block's basepath to the current working directory.
        """
        if from_path is None:
            from_path = str(self.basepath)
        else:
            from_path = self._resolve_path(from_path)

        if local_path is None:
            local_path = Path(".").absolute()

        if urllib.parse.urlsplit(self.basepath).scheme == "ftp":  # Add
            return await self.get_directory_ftp(from_path, local_path)  # Add

        return self.filesystem.get(from_path, local_path, recursive=True)

    async def get_directory_ftp(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        from_path_raw = urllib.parse.urlsplit(from_path).path
        for file_directory_item in self.filesystem.ls(from_path):
            type_ = file_directory_item["type"]
            name = file_directory_item["name"]
            other_path = name[len(from_path_raw) :]
            if other_path.startswith("/"):  # Change to relative path
                other_path = other_path[1:]
            dest_path = Path(local_path).joinpath(other_path)
            if type_ == "directory":
                if (not dest_path.exists()) or dest_path.is_file():
                    dest_path.mkdir()
                await self.get_directory_ftp(name, dest_path)
            if type_ == "file":
                try:
                    self.filesystem.get_file(name, dest_path)
                except Exception:
                    print(f"FTP Error downloading {name} to {dest_path}")
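The patch above replaces the single recursive `get` with a directory walk that downloads one file at a time. A minimal standalone sketch of that walk follows. `FakeRemoteFS` and `download_tree` are hypothetical names used only for illustration; the fake only mimics the `ls`/`get_file` shape used in the patch and is not the real fsspec FTP implementation.

```python
import tempfile
from pathlib import Path, PurePosixPath
from typing import Dict, List


class FakeRemoteFS:
    """Hypothetical in-memory stand-in for an fsspec-style filesystem."""

    def __init__(self, files: Dict[str, bytes]) -> None:
        self.files = files  # maps absolute remote file path -> content

    def ls(self, path: str) -> List[dict]:
        """List the direct children of `path` as dicts with "name" and "type"."""
        root = PurePosixPath(path)
        entries = {}
        for file_path in self.files:
            p = PurePosixPath(file_path)
            if root in p.parents:
                child = root / p.relative_to(root).parts[0]
                entries[str(child)] = "file" if child == p else "directory"
        return [{"name": n, "type": t} for n, t in sorted(entries.items())]

    def get_file(self, rpath: str, lpath: Path) -> None:
        Path(lpath).write_bytes(self.files[rpath])


def download_tree(fs: FakeRemoteFS, remote_dir: str, local_dir: Path) -> None:
    """Copy `remote_dir` into `local_dir`, one file at a time."""
    local_dir.mkdir(parents=True, exist_ok=True)
    for item in fs.ls(remote_dir):
        # Only the last path component is needed to build the local target.
        dest = local_dir / PurePosixPath(item["name"]).name
        if item["type"] == "directory":
            download_tree(fs, item["name"], dest)
        else:
            fs.get_file(item["name"], dest)


if __name__ == "__main__":
    fs = FakeRemoteFS({"/codes/my_flow.py": b"# flow", "/codes/pkg/util.py": b"# util"})
    with tempfile.TemporaryDirectory() as tmp:
        download_tree(fs, "/codes", Path(tmp))
        print(sorted(p.relative_to(tmp).as_posix() for p in Path(tmp).rglob("*")))
```

Because every `get_file` call receives a file path as its local target, the walk never tries to open a directory for writing.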
Misha B
09/22/2022, 5:23 PM
Misha B
09/22/2022, 5:23 PM
Bertangela Loret de Mola
09/23/2022, 2:14 AM
John Kang
09/23/2022, 1:47 PM
Zi Yuan
09/26/2022, 7:54 AM
Richard Alexander
09/27/2022, 2:01 PM
cannot pickle '_thread.lock' object
I have found some methods to get around this for v1, but I can't find anything for v2. How can I get this flow to run?
Jon Young
09/27/2022, 2:34 PM
b_2_c_task_descriptor.py. This might scale up well, but I think it is better to break tasks and workflows into their own directories.
Thoughts? What has scaled well for you?
gertjan
09/28/2022, 1:57 PM
export PREFECT_LOGGING_EXTRA_LOGGERS='my-loggers-name'
Now, when I run my codebase with multiprocessing, these logs are not shown in the UI.
Does anyone have tips on how I can still stream the logs of my-loggers-name to the UI while using multiprocessing?
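One general-purpose pattern for this situation, from the standard library rather than from Prefect, is to have each child process push its log records onto a queue and to replay them through handlers in the parent process. The sketch below shows only that mechanism. All function names are hypothetical, and whether Prefect's own log handler accepts records replayed this way (for example, whether it can associate them with the flow run) is an assumption that nothing in this thread confirms.

```python
import logging
import logging.handlers
import multiprocessing


def configure_worker_logging(log_queue, logger_name: str = "my-loggers-name") -> logging.Logger:
    """Route the named logger's records into `log_queue` (call inside each worker)."""
    logger = logging.getLogger(logger_name)
    logger.handlers.clear()
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    logger.setLevel(logging.INFO)
    logger.propagate = False  # avoid handling the same record twice in the worker
    return logger


def worker(log_queue, item: int) -> None:
    """Example unit of work that logs through the queue."""
    logger = configure_worker_logging(log_queue)
    logger.info("processed item %d", item)


def start_listener(log_queue, *handlers: logging.Handler) -> logging.handlers.QueueListener:
    """In the parent process, replay queued records through `handlers`."""
    listener = logging.handlers.QueueListener(log_queue, *handlers)
    listener.start()
    return listener


def run_with_processes(items, *handlers: logging.Handler) -> None:
    """Parent-side driver; call it under `if __name__ == "__main__":`."""
    log_queue = multiprocessing.Queue()
    listener = start_listener(log_queue, *handlers)
    procs = [multiprocessing.Process(target=worker, args=(log_queue, i)) for i in items]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    listener.stop()  # drains the remaining records before returning
```

For example, `run_with_processes(range(3), logging.StreamHandler())` would print the three worker messages from the parent process; any handler attached in the parent can be passed in the same way.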
EDIT: the root logger is also ignored.
yair friedman
09/29/2022, 4:54 AM
yair friedman
09/29/2022, 4:54 AM
yair friedman
09/29/2022, 4:54 AM
yair friedman
09/29/2022, 4:54 AM
yair friedman
09/29/2022, 4:55 AM
Thomas Fredriksen
09/30/2022, 8:16 AM
Thomas Fredriksen
10/03/2022, 6:19 PM
Jordan Charlier
10/04/2022, 2:51 PM
Rico Farina
10/06/2022, 9:55 AM
Jaime Raldua Veuthey
10/06/2022, 6:34 PM
rkoolbergen
10/07/2022, 2:06 PM
John
10/08/2022, 7:24 AM
/tmp/), any code that relies on relative paths is now broken. What's the best practice here? Using absolute paths throughout? Changing the working directory?
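Both options mentioned in the question can be sketched with the standard library: anchoring paths to the location of the source file instead of the working directory, or temporarily switching the working directory. This is a minimal sketch, not a Prefect recommendation; `BASE_DIR`, `resource_path`, and `working_directory` are hypothetical names, and whether `__file__` points somewhere useful depends on how the flow code is loaded at run time.

```python
import contextlib
import os
from pathlib import Path

# Directory that contains this source file, independent of the working directory.
try:
    BASE_DIR = Path(__file__).resolve().parent
except NameError:  # e.g. an interactive session, where __file__ is undefined
    BASE_DIR = Path.cwd()


def resource_path(relative: str) -> Path:
    """Resolve `relative` against the code's own directory, not the CWD."""
    return BASE_DIR / relative


@contextlib.contextmanager
def working_directory(path: Path):
    """Temporarily change the working directory, restoring it afterwards."""
    previous = Path.cwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(previous)
```

With this, `resource_path("data/input.csv")` (a made-up file name) resolves next to the code wherever it was downloaded, while `with working_directory(BASE_DIR): ...` keeps existing relative-path code working inside the block.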
Related threads: thread 1, thread 2, GitHub open issue 6391
Anna Geller
10/08/2022, 10:09 AM